From 056af08312e87dba421d3984ddbe996124f35995 Mon Sep 17 00:00:00 2001 From: oesteban Date: Wed, 17 Jan 2018 18:39:44 -0800 Subject: [PATCH 1/6] [MAINT] Improving hashing of nodes --- nipype/pipeline/engine/nodes.py | 49 +++++++++++++++++++++------------ 1 file changed, 32 insertions(+), 17 deletions(-) diff --git a/nipype/pipeline/engine/nodes.py b/nipype/pipeline/engine/nodes.py index 7f99810f68..fda2178ec9 100644 --- a/nipype/pipeline/engine/nodes.py +++ b/nipype/pipeline/engine/nodes.py @@ -185,10 +185,11 @@ def __init__(self, 'num_threads') and self._n_procs is not None: self._interface.inputs.num_threads = self._n_procs - # Initialize needed_outputs - self.needed_outputs = [] - if needed_outputs: - self.needed_outputs = sorted(needed_outputs) + # Initialize needed_outputs and hashes + self._hashvalue = None + self._hashed_inputs = None + self._needed_outputs = [] + self.needed_outputs = sorted(needed_outputs) @property def interface(self): @@ -210,6 +211,20 @@ def outputs(self): """Return the output fields of the underlying interface""" return self._interface._outputs() + @property + def needed_outputs(self): + return self._needed_outputs + + @needed_outputs.setter + def needed_outputs(self, new_outputs): + """Needed outputs changes the hash, refresh if changed""" + new_outputs = sorted(new_outputs or []) + if new_outputs != self._needed_outputs: + # Reset hash + self._hashvalue = None + self._hashed_inputs = None + self._needed_outputs = new_outputs + @property def mem_gb(self): """Get estimated memory (GB)""" @@ -387,8 +402,8 @@ def run(self, updatehash=False): logger.info('[Node] Setting-up "%s" in "%s".', self.fullname, outdir) hash_info = self.hash_exists(updatehash=updatehash) hash_exists, hashvalue, hashfile, hashed_inputs = hash_info - force_run = self.overwrite or (self.overwrite is None - and self._interface.always_run) + force_run = self.overwrite or (self.overwrite is None and + self._interface.always_run) # If the node is cached, check on pklz files and finish if hash_exists and (updatehash or not force_run): @@ -479,17 +494,17 @@ def run(self, updatehash=False): def _get_hashval(self): """Return a hash of the input state""" self._get_inputs() - hashed_inputs, hashvalue = self.inputs.get_hashval( - hash_method=self.config['execution']['hash_method']) - rm_extra = self.config['execution']['remove_unnecessary_outputs'] - if str2bool(rm_extra) and self.needed_outputs: - hashobject = md5() - hashobject.update(hashvalue.encode()) - sorted_outputs = sorted(self.needed_outputs) - hashobject.update(str(sorted_outputs).encode()) - hashvalue = hashobject.hexdigest() - hashed_inputs.append(('needed_outputs', sorted_outputs)) - return hashed_inputs, hashvalue + if self._hashvalue is None and self._hashed_inputs is None: + self._hashed_inputs, self._hashvalue = self.inputs.get_hashval( + hash_method=self.config['execution']['hash_method']) + rm_extra = self.config['execution']['remove_unnecessary_outputs'] + if str2bool(rm_extra) and self.needed_outputs: + hashobject = md5() + hashobject.update(self._hashvalue.encode()) + hashobject.update(str(self.needed_outputs).encode()) + self._hashvalue = hashobject.hexdigest() + self._hashed_inputs.append(('needed_outputs', self.needed_outputs)) + return self._hashed_inputs, self._hashvalue def _get_inputs(self): """Retrieve inputs from pointers to results file From 9a99b2fe4027fcf3dbbcab94703e848e56e18989 Mon Sep 17 00:00:00 2001 From: oesteban Date: Wed, 17 Jan 2018 23:13:04 -0800 Subject: [PATCH 2/6] revise handling of hashes --- 
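Patch 1/6 turns `needed_outputs` into a property whose setter invalidates the memoized hash, so `_get_hashval()` only recomputes when the inputs or requested outputs actually change. Below is a minimal, self-contained sketch of that invalidate-on-set pattern; the class, the plain-dict `inputs`, and `get_hashval()` are illustrative stand-ins, not nipype's real `Node` API.

```python
from hashlib import md5


class HashedNode:
    """Illustrative stand-in for a Node that memoizes its hash value."""

    def __init__(self, inputs, needed_outputs=None):
        self.inputs = inputs                 # assumed to be a plain dict here
        self._hashvalue = None
        self._hashed_inputs = None
        self._needed_outputs = []
        self.needed_outputs = needed_outputs  # goes through the setter below

    @property
    def needed_outputs(self):
        return self._needed_outputs

    @needed_outputs.setter
    def needed_outputs(self, new_outputs):
        """Changing needed_outputs changes the hash, so reset the memo."""
        new_outputs = sorted(new_outputs or [])
        if new_outputs != self._needed_outputs:
            self._hashvalue = None
            self._hashed_inputs = None
            self._needed_outputs = new_outputs

    def get_hashval(self):
        """Recompute the hash only if the memoized value was invalidated."""
        if self._hashvalue is None:
            self._hashed_inputs = sorted(self.inputs.items())
            hashobject = md5(str(self._hashed_inputs).encode())
            if self._needed_outputs:
                hashobject.update(str(self._needed_outputs).encode())
            self._hashvalue = hashobject.hexdigest()
        return self._hashed_inputs, self._hashvalue


node = HashedNode({'a': 1}, needed_outputs=['out_file'])
first = node.get_hashval()[1]
node.needed_outputs = ['out_file', 'out_report']  # memo is dropped
assert node.get_hashval()[1] != first
```

The design point is that the hash becomes stale only through a handful of well-defined mutations, so the expensive recomputation happens at most once per change instead of once per query.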
nipype/pipeline/engine/nodes.py | 228 ++++++++++++--------- nipype/pipeline/engine/tests/test_nodes.py | 26 +-- nipype/pipeline/plugins/base.py | 10 +- 3 files changed, 147 insertions(+), 117 deletions(-) diff --git a/nipype/pipeline/engine/nodes.py b/nipype/pipeline/engine/nodes.py index fda2178ec9..9bf16081a5 100644 --- a/nipype/pipeline/engine/nodes.py +++ b/nipype/pipeline/engine/nodes.py @@ -189,7 +189,7 @@ def __init__(self, self._hashvalue = None self._hashed_inputs = None self._needed_outputs = [] - self.needed_outputs = sorted(needed_outputs) + self.needed_outputs = needed_outputs @property def interface(self): @@ -297,83 +297,105 @@ def help(self): """Print interface help""" self._interface.help() - def hash_exists(self, updatehash=False): + def is_cached(self, rm_outdated=False): """ Check if the interface has been run previously, and whether - cached results are viable for reuse + cached results are up-to-date. """ + outdir = self.output_dir() - # Get a dictionary with hashed filenames and a hashvalue - # of the dictionary itself. + # Update hash hashed_inputs, hashvalue = self._get_hashval() - outdir = self.output_dir() + + # The output folder does not exist: not cached + if not op.exists(outdir): + logger.debug('[Node] Directory not found "%s".', outdir) + return False, False + hashfile = op.join(outdir, '_0x%s.json' % hashvalue) - hash_exists = op.exists(hashfile) - - logger.debug('[Node] hash value=%s, exists=%s', hashvalue, hash_exists) - - if op.exists(outdir): - # Find previous hashfiles - globhashes = glob(op.join(outdir, '_0x*.json')) - unfinished = [ - path for path in globhashes - if path.endswith('_unfinished.json') - ] - hashfiles = list(set(globhashes) - set(unfinished)) - if len(hashfiles) > 1: - for rmfile in hashfiles: - os.remove(rmfile) - - raise RuntimeError( - '[Node] Cache ERROR - Found %d previous hashfiles indicating ' - 'that the ``base_dir`` for this node went stale. Please re-run the ' - 'workflow.' % len(hashfiles)) - - # This should not happen, but clean up and break if so. - if unfinished and updatehash: - for rmfile in unfinished: - os.remove(rmfile) - - raise RuntimeError( - '[Node] Cache ERROR - Found unfinished hashfiles (%d) indicating ' - 'that the ``base_dir`` for this node went stale. Please re-run the ' - 'workflow.' % len(unfinished)) - - # Remove outdated hashfile - if hashfiles and hashfiles[0] != hashfile: - logger.info( - '[Node] Outdated hashfile found for "%s", removing and forcing node ' - 'to rerun.', self.fullname) - - # If logging is more verbose than INFO (20), print diff between hashes - loglevel = logger.getEffectiveLevel() - if loglevel < 20: # Lazy logging: only < INFO - split_out = split_filename(hashfiles[0]) - exp_hash_file_base = split_out[1] - exp_hash = exp_hash_file_base[len('_0x'):] - logger.log(loglevel, "[Node] Old/new hashes = %s/%s", - exp_hash, hashvalue) - try: - prev_inputs = load_json(hashfiles[0]) - except Exception: - pass - else: - logger.log(loglevel, - dict_diff(prev_inputs, hashed_inputs, 10)) + cached = op.exists(hashfile) + + # Check if updated + globhashes = glob(op.join(outdir, '_0x*.json')) + unfinished = [ + path for path in globhashes + if path.endswith('_unfinished.json') + ] + hashfiles = list(set(globhashes) - set(unfinished)) + logger.debug('[Node] Hashes: %s, %s, %s, %s', + hashed_inputs, hashvalue, hashfile, hashfiles) + + # No previous hashfiles found, we're all set. 
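The first half of the new `is_cached()` shown above boils down to: no output directory means not cached; otherwise collect the finished `_0x<hash>.json` files and compare them against the hash just computed. A hedged, standalone sketch of that two-flag check follows; the function name and return convention are mine, while the `_0x<hash>.json` / `_unfinished.json` naming follows the diff.

```python
import os.path as op
from glob import glob


def cache_state(outdir, hashvalue):
    """Return (cached, updated) for a node working directory, assuming the
    '_0x<hash>.json' hashfile convention used in the diff above."""
    if not op.exists(outdir):
        return False, False            # node never ran here

    hashfile = op.join(outdir, '_0x%s.json' % hashvalue)
    finished = [
        path for path in glob(op.join(outdir, '_0x*.json'))
        if not path.endswith('_unfinished.json')
    ]
    if not finished:
        return False, False            # directory exists, but no finished run

    cached = True                      # some previous run completed here
    updated = hashfile in finished     # ...and it matches the current inputs
    return cached, updated
```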
+ if cached and len(hashfiles) == 1: + assert(hashfile == hashfiles[0]) + logger.debug('[Node] Up-to-date cache found for "%s".', self.fullname) + return True, True # Cached and updated + + if len(hashfiles) > 1: + if cached: + hashfiles.remove(hashfile) # Do not clean up the node, if cached + logger.warning('[Node] Found %d previous hashfiles indicating that the working ' + 'directory of node "%s" is stale, deleting old hashfiles.', + len(hashfiles), self.fullname) + for rmfile in hashfiles: + os.remove(rmfile) + + hashfiles = [hashfile] if cached else [] + + # At this point only one hashfile is in the folder + # and we directly check whether it is updated + if not hashfiles: + logger.debug('[Node] No hashfiles found in "%s".', outdir) + assert(not cached) + return False, False + + updated = hashfile == hashfiles[0] + if not updated: # Report differences depending on log verbosity + cached = True + logger.info('[Node] Outdated cache found for "%s".', self.fullname) + # If logging is more verbose than INFO (20), print diff between hashes + loglevel = logger.getEffectiveLevel() + if loglevel < 40: # Lazy logging: only < INFO + exp_hash_file_base = split_filename(hashfiles[0])[1] + exp_hash = exp_hash_file_base[len('_0x'):] + logger.log(loglevel, "[Node] Old/new hashes = %s/%s", + exp_hash, hashvalue) + try: + prev_inputs = load_json(hashfiles[0]) + except Exception: + pass + else: + logger.log(loglevel, + dict_diff(prev_inputs, hashed_inputs, 10)) + if rm_outdated: os.remove(hashfiles[0]) + assert(cached) # At this point, node is cached (may not be up-to-date) + return cached, updated + + def hash_exists(self, updatehash=False): + """ + Decorate the new `is_cached` method with hash updating + to maintain backwards compatibility. + """ + + # Get a dictionary with hashed filenames and a hashvalue + # of the dictionary itself. + cached, updated = self.is_cached(rm_outdated=True) + + outdir = self.output_dir() + hashfile = op.join(outdir, '_0x%s.json' % self._hashvalue) + + if updated: + return True, self._hashvalue, hashfile, self._hashed_inputs + # Update only possible if it exists - if hash_exists and updatehash: - logger.debug("[Node] Updating hash: %s", hashvalue) - _save_hashfile(hashfile, hashed_inputs) + if cached and updatehash: + logger.debug("[Node] Updating hash: %s", self._hashvalue) + _save_hashfile(hashfile, self._hashed_inputs) - logger.debug( - 'updatehash=%s, overwrite=%s, always_run=%s, hash_exists=%s, ' - 'hash_method=%s', updatehash, self.overwrite, - self._interface.always_run, hash_exists, - self.config['execution']['hash_method'].lower()) - return hash_exists, hashvalue, hashfile, hashed_inputs + return cached, self._hashvalue, hashfile, self._hashed_inputs def run(self, updatehash=False): """Execute the node in its directory. 
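`hash_exists()` survives only as a compatibility shim: it calls the new `is_cached()` and repackages the answer into the old four-tuple. A sketch of that adapter pattern is below; `node` is assumed to expose `is_cached()`, `output_dir()`, `_hashvalue` and `_hashed_inputs` as in the diff, and plain JSON stands in for nipype's `_save_hashfile` helper.

```python
import json
import os.path as op


def hash_exists_compat(node, updatehash=False):
    """Translate the new (cached, updated) answer into the legacy
    (exists, hashvalue, hashfile, hashed_inputs) tuple older callers expect."""
    cached, updated = node.is_cached(rm_outdated=True)
    hashfile = op.join(node.output_dir(), '_0x%s.json' % node._hashvalue)

    if updated:
        return True, node._hashvalue, hashfile, node._hashed_inputs

    if cached and updatehash:
        # Refresh the (outdated) hashfile in place; JSON dump is a stand-in
        # for nipype's _save_hashfile helper.
        with open(hashfile, 'w') as fhandle:
            json.dump(dict(node._hashed_inputs), fhandle, indent=4)

    return cached, node._hashvalue, hashfile, node._hashed_inputs
```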
@@ -390,23 +412,17 @@ def run(self, updatehash=False): if self.config is None: self.config = {} self.config = merge_dict(deepcopy(config._sections), self.config) - self._get_inputs() - # Check if output directory exists outdir = self.output_dir() - if op.exists(outdir): - logger.debug('Output directory (%s) exists and is %sempty,', - outdir, 'not ' * bool(os.listdir(outdir))) + force_run = self.overwrite or (self.overwrite is None and + self._interface.always_run) # Check hash, check whether run should be enforced logger.info('[Node] Setting-up "%s" in "%s".', self.fullname, outdir) - hash_info = self.hash_exists(updatehash=updatehash) - hash_exists, hashvalue, hashfile, hashed_inputs = hash_info - force_run = self.overwrite or (self.overwrite is None and - self._interface.always_run) + cached, updated = self.is_cached() # If the node is cached, check on pklz files and finish - if hash_exists and (updatehash or not force_run): + if not force_run and (updated or (not updated and updatehash)): logger.debug("Only updating node hashes or skipping execution") inputs_file = op.join(outdir, '_inputs.pklz') if not op.exists(inputs_file): @@ -418,46 +434,48 @@ def run(self, updatehash=False): logger.debug('Creating node file %s', node_file) savepkl(node_file, self) - result = self._run_interface(execute=False, updatehash=updatehash) + result = self._run_interface(execute=False, + updatehash=updatehash and not updated) logger.info('[Node] "%s" found cached%s.', self.fullname, - ' (and hash updated)' * updatehash) + ' (and hash updated)' * (updatehash and not updated)) return result - # by rerunning we mean only nodes that did finish to run previously - if hash_exists and not isinstance(self, MapNode): - logger.debug('[Node] Rerunning "%s"', self.fullname) + if cached and updated and not isinstance(self, MapNode): + logger.debug('[Node] Rerunning cached, up-to-date node "%s"', self.fullname) if not force_run and str2bool( self.config['execution']['stop_on_first_rerun']): raise Exception( 'Cannot rerun when "stop_on_first_rerun" is set to True') - # Remove hashfile if it exists at this point (re-running) - if op.exists(hashfile): - os.remove(hashfile) + # Remove any hashfile that exists at this point (re)running. + if cached: + for outdatedhash in glob(op.join(self.output_dir(), '_0x*.json')): + os.remove(outdatedhash) + # Hashfile while running - hashfile_unfinished = op.join(outdir, - '_0x%s_unfinished.json' % hashvalue) + hashfile_unfinished = op.join( + outdir, '_0x%s_unfinished.json' % self._hashvalue) # Delete directory contents if this is not a MapNode or can't resume - rm_outdir = not isinstance(self, MapNode) and not ( - self._interface.can_resume and op.isfile(hashfile_unfinished)) - if rm_outdir: + can_resume = not (self._interface.can_resume and op.isfile(hashfile_unfinished)) + if can_resume and not isinstance(self, MapNode): emptydirs(outdir, noexist_ok=True) else: logger.debug('[%sNode] Resume - hashfile=%s', 'Map' * int(isinstance(self, MapNode)), hashfile_unfinished) - if isinstance(self, MapNode): - # remove old json files - for filename in glob(op.join(outdir, '_0x*.json')): - os.remove(filename) + + if isinstance(self, MapNode): + # remove old json files + for filename in glob(op.join(outdir, '_0x*.json')): + os.remove(filename) # Make sure outdir is created makedirs(outdir, exist_ok=True) # Store runtime-hashfile, pre-execution report, the node and the inputs set. 
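The reworked `run()` above first derives `force_run` from `overwrite`/`always_run`, then asks `is_cached()` and only short-circuits to cached results when execution is not forced and either the hash is current or the caller merely wants the hash refreshed. Condensed into a sketch (note that `updated or (not updated and updatehash)` simplifies to `updated or updatehash`); the function name is illustrative.

```python
def collect_from_cache(updated, updatehash, overwrite, always_run):
    """True when run() should collect cached results instead of executing:
    execution is not forced, and either the cache is up to date or the
    caller only wants the stored hash refreshed."""
    force_run = overwrite or (overwrite is None and always_run)
    return bool(not force_run and (updated or updatehash))
```

With `overwrite=None` and `always_run=False`, an up-to-date cache returns `True`, which is the common fast path when resuming a workflow.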
- _save_hashfile(hashfile_unfinished, hashed_inputs) + _save_hashfile(hashfile_unfinished, self._hashed_inputs) write_report( self, report_type='preexec', is_mapnode=isinstance(self, MapNode)) savepkl(op.join(outdir, '_node.pklz'), self) @@ -485,7 +503,8 @@ def run(self, updatehash=False): os.chdir(cwd) # Tear-up after success - shutil.move(hashfile_unfinished, hashfile) + shutil.move(hashfile_unfinished, + hashfile_unfinished.replace('_unfinished', '')) write_report( self, report_type='postexec', is_mapnode=isinstance(self, MapNode)) logger.info('[Node] Finished "%s".', self.fullname) @@ -551,8 +570,14 @@ def _get_inputs(self): # Successfully set inputs self._got_inputs = True + def _update_hash(self): + for outdatedhash in glob(op.join(self.output_dir(), '_0x*.json')): + os.remove(outdatedhash) + _save_hashfile(self._hashvalue, self._hashed_inputs) + def _run_interface(self, execute=True, updatehash=False): if updatehash: + self._update_hash() return self._load_results() return self._run_command(execute) @@ -586,7 +611,6 @@ def _load_results(self): return result def _run_command(self, execute, copyfiles=True): - if not execute: try: result = self._load_results() @@ -597,7 +621,8 @@ def _run_command(self, execute, copyfiles=True): copyfiles = False # OE: this was like this before, execute = True # I'll keep them for safety else: - logger.info("[Node] Cached - collecting precomputed outputs") + logger.info('[Node] Cached "%s" - collecting precomputed outputs', + self.fullname) return result # Run command: either execute is true or load_results failed. @@ -1037,6 +1062,10 @@ def _set_mapnode_input(self, name, newvalue): def _get_hashval(self): """Compute hash including iterfield lists.""" self._get_inputs() + + if self._hashvalue is not None and self._hashed_inputs is not None: + return self._hashed_inputs, self._hashvalue + self._check_iterfield() hashinputs = deepcopy(self._interface.inputs) for name in self.iterfield: @@ -1061,7 +1090,8 @@ def _get_hashval(self): hashobject.update(str(sorted_outputs).encode()) hashvalue = hashobject.hexdigest() hashed_inputs.append(('needed_outputs', sorted_outputs)) - return hashed_inputs, hashvalue + self._hashed_inputs, self._hashvalue = hashed_inputs, hashvalue + return self._hashed_inputs, self._hashvalue @property def inputs(self): diff --git a/nipype/pipeline/engine/tests/test_nodes.py b/nipype/pipeline/engine/tests/test_nodes.py index 29d75aa22b..cf6a91ef9b 100644 --- a/nipype/pipeline/engine/tests/test_nodes.py +++ b/nipype/pipeline/engine/tests/test_nodes.py @@ -187,6 +187,10 @@ def test_node_hash(tmpdir): from nipype.interfaces.utility import Function tmpdir.chdir() + config.set_default_config() + config.set('execution', 'stop_on_first_crash', True) + config.set('execution', 'crashdump_dir', os.getcwd()) + def func1(): return 1 @@ -216,31 +220,27 @@ class EngineTestException(Exception): class RaiseError(DistributedPluginBase): def _submit_job(self, node, updatehash=False): - raise EngineTestException('Submit called') + raise EngineTestException( + 'Submit called - cached=%s, updated=%s' % node.is_cached()) # check if a proper exception is raised with pytest.raises(EngineTestException) as excinfo: w1.run(plugin=RaiseError()) - assert 'Submit called' == str(excinfo.value) + assert str(excinfo.value).startswith('Submit called') # generate outputs w1.run(plugin='Linear') # ensure plugin is being called - w1.config['execution'] = { - 'stop_on_first_crash': 'true', - 'local_hash_check': 'false', - 'crashdump_dir': os.getcwd() - } + 
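While a node executes, its hash lives in a `_0x<hash>_unfinished.json` placeholder; only after success is it renamed to the final `_0x<hash>.json` (the `shutil.move` above), so interrupted runs never masquerade as valid cache entries. A minimal sketch of that lifecycle, with a hypothetical JSON writer in place of `_save_hashfile`:

```python
import json
import os.path as op
import shutil


def start_run(outdir, hashvalue, hashed_inputs):
    """Write the placeholder hashfile before execution starts."""
    unfinished = op.join(outdir, '_0x%s_unfinished.json' % hashvalue)
    with open(unfinished, 'w') as fhandle:
        json.dump(dict(hashed_inputs), fhandle, indent=4)
    return unfinished


def finish_run(unfinished):
    """Promote the placeholder on success; later runs then see a valid cache."""
    final = unfinished.replace('_unfinished', '')
    shutil.move(unfinished, final)
    return final
```

The placeholder also lets interfaces that declare `can_resume` pick up where an interrupted run left off, since its presence distinguishes "crashed mid-run" from "never ran".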
config.set('execution', 'local_hash_check', False) # rerun to ensure we have outputs w1.run(plugin='Linear') - # set local check - w1.config['execution'] = { - 'stop_on_first_crash': 'true', - 'local_hash_check': 'true', - 'crashdump_dir': os.getcwd() - } + # set local check + config.set('execution', 'local_hash_check', True) + w1 = pe.Workflow(name='test') + w1.connect(n1, ('a', modify), n2, 'a') + w1.base_dir = os.getcwd() w1.run(plugin=RaiseError()) diff --git a/nipype/pipeline/plugins/base.py b/nipype/pipeline/plugins/base.py index c757c3859c..144bbdb470 100644 --- a/nipype/pipeline/plugins/base.py +++ b/nipype/pipeline/plugins/base.py @@ -337,14 +337,14 @@ def _local_hash_check(self, jobid, graph): self.procs[jobid].config['execution']['local_hash_check']): return False - logger.debug('Checking hash (%d) locally', jobid) - - hash_exists, _, _, _ = self.procs[jobid].hash_exists() + cached, updated = self.procs[jobid].is_cached() + logger.debug('Checking hash "%s" locally: cached=%s, updated=%s.', + self.procs[jobid].fullname, cached, updated) overwrite = self.procs[jobid].overwrite always_run = self.procs[jobid]._interface.always_run - if hash_exists and (overwrite is False - or overwrite is None and not always_run): + if cached and updated and (overwrite is False or + overwrite is None and not always_run): logger.debug('Skipping cached node %s with ID %s.', self.procs[jobid]._id, jobid) try: From 64a9b7bf1007e06cca85ef1407a9aa6c3d822285 Mon Sep 17 00:00:00 2001 From: oesteban Date: Wed, 17 Jan 2018 23:22:08 -0800 Subject: [PATCH 3/6] do not submit updatehash --- nipype/pipeline/plugins/multiproc.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/nipype/pipeline/plugins/multiproc.py b/nipype/pipeline/plugins/multiproc.py index bc66a026e5..f086b0d6f5 100644 --- a/nipype/pipeline/plugins/multiproc.py +++ b/nipype/pipeline/plugins/multiproc.py @@ -309,11 +309,12 @@ def _send_procs_to_workers(self, updatehash=False, graph=None): self.proc_done[jobid] = True self.proc_pending[jobid] = True - # If cached just retrieve it, don't run + # If cached and up-to-date just retrieve it, don't run if self._local_hash_check(jobid, graph): continue - if self.procs[jobid].run_without_submitting: + # updatehash and run_without_submitting are also run locally + if updatehash or self.procs[jobid].run_without_submitting: logger.debug('Running node %s on master thread', self.procs[jobid]) try: From c08d84e42ada1178a0524cb49160283a6cc88821 Mon Sep 17 00:00:00 2001 From: oesteban Date: Fri, 19 Jan 2018 09:53:58 -0800 Subject: [PATCH 4/6] fix magic number --- nipype/pipeline/engine/nodes.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/nipype/pipeline/engine/nodes.py b/nipype/pipeline/engine/nodes.py index a994ff8eec..60fba40265 100644 --- a/nipype/pipeline/engine/nodes.py +++ b/nipype/pipeline/engine/nodes.py @@ -25,6 +25,7 @@ import socket from copy import deepcopy from glob import glob +from logging import INFO from tempfile import mkdtemp from future import standard_library @@ -356,7 +357,7 @@ def is_cached(self, rm_outdated=False): logger.info('[Node] Outdated cache found for "%s".', self.fullname) # If logging is more verbose than INFO (20), print diff between hashes loglevel = logger.getEffectiveLevel() - if loglevel < 40: # Lazy logging: only < INFO + if loglevel < INFO: # Lazy logging: only < INFO exp_hash_file_base = split_filename(hashfiles[0])[1] exp_hash = exp_hash_file_base[len('_0x'):] logger.log(loglevel, "[Node] Old/new hashes = %s/%s", From 
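On the plugin side, `_local_hash_check()` now skips a job only when it is cached *and* up to date, and patch 3/6 keeps `updatehash` jobs (like `run_without_submitting` nodes) on the master thread instead of submitting them to workers. The routing logic reduces to roughly the sketch below; the function and the returned labels are illustrative, and `overwrite` is assumed to be `True`, `False`, or `None` as in nipype.

```python
def dispatch(cached, updated, overwrite, always_run,
             updatehash=False, run_without_submitting=False):
    """Decide what the scheduler does with a job: 'skip' (reuse the cache),
    'run-locally' (master thread), or 'submit' (send to a worker)."""
    force_rerun = overwrite is True or (overwrite is None and always_run)
    if cached and updated and not force_rerun:
        return 'skip'
    if updatehash or run_without_submitting:
        return 'run-locally'   # cheap bookkeeping, not worth a worker slot
    return 'submit'
```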
f273d18bde4f280de43a54f630350406cbad7315 Mon Sep 17 00:00:00 2001 From: oesteban Date: Fri, 19 Jan 2018 09:55:28 -0800 Subject: [PATCH 5/6] [skip ci] comment at the right spot --- nipype/pipeline/engine/nodes.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/nipype/pipeline/engine/nodes.py b/nipype/pipeline/engine/nodes.py index 60fba40265..cb7663778a 100644 --- a/nipype/pipeline/engine/nodes.py +++ b/nipype/pipeline/engine/nodes.py @@ -344,13 +344,13 @@ def is_cached(self, rm_outdated=False): hashfiles = [hashfile] if cached else [] - # At this point only one hashfile is in the folder - # and we directly check whether it is updated if not hashfiles: logger.debug('[Node] No hashfiles found in "%s".', outdir) assert(not cached) return False, False + # At this point only one hashfile is in the folder + # and we directly check whether it is updated updated = hashfile == hashfiles[0] if not updated: # Report differences depending on log verbosity cached = True From f5e5f1324d12d3b9bdeeae8eb2e9ef86ab108df3 Mon Sep 17 00:00:00 2001 From: oesteban Date: Fri, 19 Jan 2018 17:46:51 -0800 Subject: [PATCH 6/6] do not allow duplication of needed_outputs --- nipype/pipeline/engine/nodes.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/nipype/pipeline/engine/nodes.py b/nipype/pipeline/engine/nodes.py index cb7663778a..b855d7629b 100644 --- a/nipype/pipeline/engine/nodes.py +++ b/nipype/pipeline/engine/nodes.py @@ -219,7 +219,7 @@ def needed_outputs(self): @needed_outputs.setter def needed_outputs(self, new_outputs): """Needed outputs changes the hash, refresh if changed""" - new_outputs = sorted(new_outputs or []) + new_outputs = sorted(list(set(new_outputs or []))) if new_outputs != self._needed_outputs: # Reset hash self._hashvalue = None
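Patch 6/6 deduplicates `needed_outputs` with `set()` before sorting, so listing the same output twice cannot perturb the hash. A tiny demonstration of why canonicalizing (dedup plus sort) matters for hash stability; the helper name is illustrative:

```python
from hashlib import md5


def needed_outputs_digest(needed_outputs):
    """Hash the canonical (deduplicated, sorted) form of needed_outputs."""
    canonical = sorted(set(needed_outputs or []))
    return md5(str(canonical).encode()).hexdigest()


# Duplicates and ordering no longer matter:
assert needed_outputs_digest(['b', 'a', 'a']) == needed_outputs_digest(['a', 'b'])
```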