Skip to content

ENH: Improve loop timing in DistributedPluginBase #2317

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
2 commits merged on Dec 5, 2017
Merged
Changes from all commits
Commits
File filter

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
28 changes: 17 additions & 11 deletions nipype/pipeline/plugins/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,9 +128,11 @@ def run(self, graph, config, updatehash=False):
old_progress_stats = None
old_presub_stats = None
while not np.all(self.proc_done) or np.any(self.proc_pending):
# Check to see if a job is available (jobs without dependencies not run)
# See https://github.com/nipy/nipype/pull/2200#discussion_r141605722
jobs_ready = np.nonzero(~self.proc_done & (self.depidx.sum(0) == 0))[1]
loop_start = time()
# Check if a job is available (jobs with all dependencies run)
# https://github.com/nipy/nipype/pull/2200#discussion_r141605722
jobs_ready = np.nonzero(~self.proc_done &
(self.depidx.sum(0) == 0))[1]

progress_stats = (len(self.proc_done),
np.sum(self.proc_done ^ self.proc_pending),
Expand Down Expand Up @@ -164,7 +166,8 @@ def run(self, graph, config, updatehash=False):
self._remove_node_dirs()
self._clear_task(taskid)
else:
assert self.proc_done[jobid] and self.proc_pending[jobid]
assert self.proc_done[jobid] and \
self.proc_pending[jobid]
toappend.insert(0, (taskid, jobid))

if toappend:
Expand All @@ -183,7 +186,8 @@ def run(self, graph, config, updatehash=False):
elif display_stats:
logger.debug('Not submitting (max jobs reached)')

sleep(poll_sleep_secs)
sleep_til = loop_start + poll_sleep_secs
sleep(max(0, sleep_til - time()))

self._remove_node_dirs()
report_nodes_not_run(notrun)
Expand Down Expand Up @@ -271,8 +275,8 @@ def _send_procs_to_workers(self, updatehash=False, graph=None):
if (num_jobs >= self.max_jobs) or (slots == 0):
break

# Check to see if a job is available (jobs without dependencies not run)
# See https://github.com/nipy/nipype/pull/2200#discussion_r141605722
# Check if a job is available (jobs with all dependencies run)
# https://github.com/nipy/nipype/pull/2200#discussion_r141605722
jobids = np.nonzero(~self.proc_done & (self.depidx.sum(0) == 0))[1]

if len(jobids) > 0:
Expand Down Expand Up @@ -325,7 +329,8 @@ def _send_procs_to_workers(self, updatehash=False, graph=None):
break

def _local_hash_check(self, jobid, graph):
if not str2bool(self.procs[jobid].config['execution']['local_hash_check']):
if not str2bool(self.procs[jobid].config['execution'][
'local_hash_check']):
return False

logger.debug('Checking hash (%d) locally', jobid)
Expand Down Expand Up @@ -397,8 +402,8 @@ def _remove_node_dirs(self):
"""Removes directories whose outputs have already been used up
"""
if str2bool(self._config['execution']['remove_node_directories']):
for idx in np.nonzero(
(self.refidx.sum(axis=1) == 0).__array__())[0]:
indices = np.nonzero((self.refidx.sum(axis=1) == 0).__array__())[0]
for idx in indices:
if idx in self.mapnodesubids:
continue
if self.proc_done[idx] and (not self.proc_pending[idx]):
Expand Down Expand Up @@ -513,7 +518,8 @@ class GraphPluginBase(PluginBase):

def __init__(self, plugin_args=None):
if plugin_args and plugin_args.get('status_callback'):
logger.warning('status_callback not supported for Graph submission plugins')
logger.warning('status_callback not supported for Graph submission'
' plugins')
super(GraphPluginBase, self).__init__(plugin_args=plugin_args)

def run(self, graph, config, updatehash=False):
Expand Down