diff --git a/nipype/pipeline/plugins/base.py b/nipype/pipeline/plugins/base.py index 5bb03ef3d9..e27733ab77 100644 --- a/nipype/pipeline/plugins/base.py +++ b/nipype/pipeline/plugins/base.py @@ -128,9 +128,11 @@ def run(self, graph, config, updatehash=False): old_progress_stats = None old_presub_stats = None while not np.all(self.proc_done) or np.any(self.proc_pending): - # Check to see if a job is available (jobs without dependencies not run) - # See https://github.com/nipy/nipype/pull/2200#discussion_r141605722 - jobs_ready = np.nonzero(~self.proc_done & (self.depidx.sum(0) == 0))[1] + loop_start = time() + # Check if a job is available (jobs with all dependencies run) + # https://github.com/nipy/nipype/pull/2200#discussion_r141605722 + jobs_ready = np.nonzero(~self.proc_done & + (self.depidx.sum(0) == 0))[1] progress_stats = (len(self.proc_done), np.sum(self.proc_done ^ self.proc_pending), @@ -164,7 +166,8 @@ def run(self, graph, config, updatehash=False): self._remove_node_dirs() self._clear_task(taskid) else: - assert self.proc_done[jobid] and self.proc_pending[jobid] + assert self.proc_done[jobid] and \ + self.proc_pending[jobid] toappend.insert(0, (taskid, jobid)) if toappend: @@ -183,7 +186,8 @@ def run(self, graph, config, updatehash=False): elif display_stats: logger.debug('Not submitting (max jobs reached)') - sleep(poll_sleep_secs) + sleep_til = loop_start + poll_sleep_secs + sleep(max(0, sleep_til - time())) self._remove_node_dirs() report_nodes_not_run(notrun) @@ -271,8 +275,8 @@ def _send_procs_to_workers(self, updatehash=False, graph=None): if (num_jobs >= self.max_jobs) or (slots == 0): break - # Check to see if a job is available (jobs without dependencies not run) - # See https://github.com/nipy/nipype/pull/2200#discussion_r141605722 + # Check if a job is available (jobs with all dependencies run) + # https://github.com/nipy/nipype/pull/2200#discussion_r141605722 jobids = np.nonzero(~self.proc_done & (self.depidx.sum(0) == 0))[1] if len(jobids) > 0: @@ -325,7 +329,8 @@ def _send_procs_to_workers(self, updatehash=False, graph=None): break def _local_hash_check(self, jobid, graph): - if not str2bool(self.procs[jobid].config['execution']['local_hash_check']): + if not str2bool(self.procs[jobid].config['execution'][ + 'local_hash_check']): return False logger.debug('Checking hash (%d) locally', jobid) @@ -397,8 +402,8 @@ def _remove_node_dirs(self): """Removes directories whose outputs have already been used up """ if str2bool(self._config['execution']['remove_node_directories']): - for idx in np.nonzero( - (self.refidx.sum(axis=1) == 0).__array__())[0]: + indices = np.nonzero((self.refidx.sum(axis=1) == 0).__array__())[0] + for idx in indices: if idx in self.mapnodesubids: continue if self.proc_done[idx] and (not self.proc_pending[idx]): @@ -513,7 +518,8 @@ class GraphPluginBase(PluginBase): def __init__(self, plugin_args=None): if plugin_args and plugin_args.get('status_callback'): - logger.warning('status_callback not supported for Graph submission plugins') + logger.warning('status_callback not supported for Graph submission' + ' plugins') super(GraphPluginBase, self).__init__(plugin_args=plugin_args) def run(self, graph, config, updatehash=False):