Skip to content

Commit d1176eb

Browse files
authored
Merge pull request #2317 from effigies/constant_loop
ENH: Improve loop timing in DistributedPluginBase
2 parents d42ae1a + 89bf38f commit d1176eb

File tree

1 file changed

+17
-11
lines changed

1 file changed

+17
-11
lines changed

nipype/pipeline/plugins/base.py

+17-11
Original file line numberDiff line numberDiff line change
@@ -128,9 +128,11 @@ def run(self, graph, config, updatehash=False):
128128
old_progress_stats = None
129129
old_presub_stats = None
130130
while not np.all(self.proc_done) or np.any(self.proc_pending):
131-
# Check to see if a job is available (jobs without dependencies not run)
132-
# See https://github.com/nipy/nipype/pull/2200#discussion_r141605722
133-
jobs_ready = np.nonzero(~self.proc_done & (self.depidx.sum(0) == 0))[1]
131+
loop_start = time()
132+
# Check if a job is available (jobs with all dependencies run)
133+
# https://github.com/nipy/nipype/pull/2200#discussion_r141605722
134+
jobs_ready = np.nonzero(~self.proc_done &
135+
(self.depidx.sum(0) == 0))[1]
134136

135137
progress_stats = (len(self.proc_done),
136138
np.sum(self.proc_done ^ self.proc_pending),
@@ -164,7 +166,8 @@ def run(self, graph, config, updatehash=False):
164166
self._remove_node_dirs()
165167
self._clear_task(taskid)
166168
else:
167-
assert self.proc_done[jobid] and self.proc_pending[jobid]
169+
assert self.proc_done[jobid] and \
170+
self.proc_pending[jobid]
168171
toappend.insert(0, (taskid, jobid))
169172

170173
if toappend:
@@ -183,7 +186,8 @@ def run(self, graph, config, updatehash=False):
183186
elif display_stats:
184187
logger.debug('Not submitting (max jobs reached)')
185188

186-
sleep(poll_sleep_secs)
189+
sleep_til = loop_start + poll_sleep_secs
190+
sleep(max(0, sleep_til - time()))
187191

188192
self._remove_node_dirs()
189193
report_nodes_not_run(notrun)
@@ -271,8 +275,8 @@ def _send_procs_to_workers(self, updatehash=False, graph=None):
271275
if (num_jobs >= self.max_jobs) or (slots == 0):
272276
break
273277

274-
# Check to see if a job is available (jobs without dependencies not run)
275-
# See https://github.com/nipy/nipype/pull/2200#discussion_r141605722
278+
# Check if a job is available (jobs with all dependencies run)
279+
# https://github.com/nipy/nipype/pull/2200#discussion_r141605722
276280
jobids = np.nonzero(~self.proc_done & (self.depidx.sum(0) == 0))[1]
277281

278282
if len(jobids) > 0:
@@ -325,7 +329,8 @@ def _send_procs_to_workers(self, updatehash=False, graph=None):
325329
break
326330

327331
def _local_hash_check(self, jobid, graph):
328-
if not str2bool(self.procs[jobid].config['execution']['local_hash_check']):
332+
if not str2bool(self.procs[jobid].config['execution'][
333+
'local_hash_check']):
329334
return False
330335

331336
logger.debug('Checking hash (%d) locally', jobid)
@@ -397,8 +402,8 @@ def _remove_node_dirs(self):
397402
"""Removes directories whose outputs have already been used up
398403
"""
399404
if str2bool(self._config['execution']['remove_node_directories']):
400-
for idx in np.nonzero(
401-
(self.refidx.sum(axis=1) == 0).__array__())[0]:
405+
indices = np.nonzero((self.refidx.sum(axis=1) == 0).__array__())[0]
406+
for idx in indices:
402407
if idx in self.mapnodesubids:
403408
continue
404409
if self.proc_done[idx] and (not self.proc_pending[idx]):
@@ -513,7 +518,8 @@ class GraphPluginBase(PluginBase):
513518

514519
def __init__(self, plugin_args=None):
515520
if plugin_args and plugin_args.get('status_callback'):
516-
logger.warning('status_callback not supported for Graph submission plugins')
521+
logger.warning('status_callback not supported for Graph submission'
522+
' plugins')
517523
super(GraphPluginBase, self).__init__(plugin_args=plugin_args)
518524

519525
def run(self, graph, config, updatehash=False):

0 commit comments

Comments
 (0)