Skip to content

Commit b401014

Browse files
authored
Merge pull request #2324 from oesteban/enh/logging-multiproc
[ENH] Logging - MultiProc report current tasks
2 parents 7ebf20c + cfcc2bc commit b401014

File tree

1 file changed

+26
-6
lines changed

1 file changed

+26
-6
lines changed

nipype/pipeline/plugins/multiproc.py

+26-6
Original file line numberDiff line numberDiff line change
@@ -12,16 +12,26 @@
1212
from multiprocessing import Process, Pool, cpu_count, pool
1313
from traceback import format_exception
1414
import sys
15+
from logging import INFO
1516
import gc
1617

1718
from copy import deepcopy
1819
import numpy as np
19-
2020
from ... import logging
2121
from ...utils.profiler import get_system_total_memory_gb
2222
from ..engine import MapNode
2323
from .base import DistributedPluginBase
2424

25+
try:
26+
from textwrap import indent
27+
except ImportError:
28+
def indent(text, prefix):
29+
""" A textwrap.indent replacement for Python < 3.3 """
30+
if not prefix:
31+
return text
32+
splittext = text.splitlines(True)
33+
return prefix + prefix.join(splittext)
34+
2535
# Init logger
2636
logger = logging.getLogger('workflow')
2737

@@ -129,7 +139,7 @@ def __init__(self, plugin_args=None):
129139
True)
130140

131141
# Instantiate different thread pools for non-daemon processes
132-
logger.debug('MultiProcPlugin starting in "%sdaemon" mode (n_procs=%d,'
142+
logger.debug('[MultiProc] Starting in "%sdaemon" mode (n_procs=%d, '
133143
'mem_gb=%0.2f)', 'non' * int(non_daemon), self.processors,
134144
self.memory_gb)
135145

@@ -162,7 +172,7 @@ def _submit_job(self, node, updatehash=False):
162172
run_node, (node, updatehash, self._taskid),
163173
callback=self._async_callback)
164174

165-
logger.debug('MultiProc submitted task %s (taskid=%d).',
175+
logger.debug('[MultiProc] Submitted task %s (taskid=%d).',
166176
node.fullname, self._taskid)
167177
return self._taskid
168178

@@ -219,9 +229,19 @@ def _send_procs_to_workers(self, updatehash=False, graph=None):
219229
stats = (len(self.pending_tasks), len(jobids), free_memory_gb,
220230
self.memory_gb, free_processors, self.processors)
221231
if self._stats != stats:
222-
logger.info('Currently running %d tasks, and %d jobs ready. Free '
223-
'memory (GB): %0.2f/%0.2f, Free processors: %d/%d',
224-
*stats)
232+
tasks_list_msg = ''
233+
234+
if logger.level <= INFO:
235+
running_tasks = [' * %s' % self.procs[jobid].fullname
236+
for _, jobid in self.pending_tasks]
237+
if running_tasks:
238+
tasks_list_msg = '\nCurrently running:\n'
239+
tasks_list_msg += '\n'.join(running_tasks)
240+
tasks_list_msg = indent(tasks_list_msg, ' ' * 21)
241+
logger.info('[MultiProc] Running %d tasks, and %d jobs ready. Free '
242+
'memory (GB): %0.2f/%0.2f, Free processors: %d/%d.%s',
243+
len(self.pending_tasks), len(jobids), free_memory_gb, self.memory_gb,
244+
free_processors, self.processors, tasks_list_msg)
225245
self._stats = stats
226246

227247
if free_memory_gb < 0.01 or free_processors == 0:

0 commit comments

Comments
 (0)