Skip to content

MAINT: Simplify interface execution and better error handling of Node #3349

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 10 commits into from
Sep 29, 2021
4 changes: 1 addition & 3 deletions nipype/interfaces/base/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -723,6 +723,7 @@ def _run_interface(self, runtime, correct_return_codes=(0,)):
runtime.stderr = None
runtime.cmdline = self.cmdline
runtime.environ.update(out_environ)
runtime.success_codes = correct_return_codes

# which $cmd
executable_name = shlex.split(self._cmd_prefix + self.cmd)[0]
Expand All @@ -742,9 +743,6 @@ def _run_interface(self, runtime, correct_return_codes=(0,)):
else "<skipped>"
)
runtime = run_command(runtime, output=self.terminal_output)
if runtime.returncode is None or runtime.returncode not in correct_return_codes:
self.raise_exception(runtime)

return runtime

def _format_arg(self, name, trait_spec, value):
Expand Down
5 changes: 5 additions & 0 deletions nipype/interfaces/base/support.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,11 @@ def __exit__(self, exc_type, exc_value, exc_tb):
if self._ignore_exc:
return True

if hasattr(self._runtime, "cmdline"):
retcode = self._runtime.returncode
if retcode not in self._runtime.success_codes:
self._runtime.traceback = f"RuntimeError: subprocess exited with code {retcode}."

@property
def runtime(self):
return self._runtime
Expand Down
24 changes: 5 additions & 19 deletions nipype/interfaces/matlab.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,25 +17,11 @@


def get_matlab_command():
if "NIPYPE_NO_MATLAB" in os.environ:
return None

try:
matlab_cmd = os.environ["MATLABCMD"]
except:
matlab_cmd = "matlab"

try:
res = CommandLine(
command="which",
args=matlab_cmd,
resource_monitor=False,
terminal_output="allatonce",
).run()
matlab_path = res.runtime.stdout.strip()
except Exception:
return None
return matlab_cmd
"""Determine whether Matlab is installed and can be executed."""
if "NIPYPE_NO_MATLAB" not in os.environ:
from nipype.utils.filemanip import which

return which(os.getenv("MATLABCMD", "matlab"))


no_matlab = get_matlab_command() is None
Expand Down
2 changes: 1 addition & 1 deletion nipype/interfaces/tests/test_matlab.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ def test_run_interface(tmpdir):
# bypasses ubuntu dash issue
mc = mlab.MatlabCommand(script="foo;", paths=[tmpdir.strpath], mfile=True)
assert not os.path.exists(default_script_file), "scriptfile should not exist 4."
with pytest.raises(RuntimeError):
with pytest.raises(OSError):
mc.run()
assert os.path.exists(default_script_file), "scriptfile should exist 4."
if os.path.exists(default_script_file): # cleanup
Expand Down
2 changes: 1 addition & 1 deletion nipype/interfaces/utility/tests/test_wrappers.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ def should_fail(tmp):


def test_should_fail(tmpdir):
with pytest.raises(NameError):
with pytest.raises(pe.nodes.NodeExecutionError):
should_fail(tmpdir)


Expand Down
107 changes: 44 additions & 63 deletions nipype/pipeline/engine/nodes.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

import os
import os.path as op
from pathlib import Path
import shutil
import socket
from copy import deepcopy
Expand All @@ -30,7 +31,6 @@
load_json,
emptydirs,
savepkl,
indirectory,
silentrm,
)

Expand Down Expand Up @@ -64,6 +64,10 @@
logger = logging.getLogger("nipype.workflow")


class NodeExecutionError(RuntimeError):
"""A nipype-specific name for exceptions when executing a Node."""


class Node(EngineBase):
"""
Wraps interface objects for use in pipeline
Expand Down Expand Up @@ -98,7 +102,7 @@ def __init__(
run_without_submitting=False,
n_procs=None,
mem_gb=0.20,
**kwargs
**kwargs,
):
"""
Parameters
Expand Down Expand Up @@ -439,7 +443,8 @@ def run(self, updatehash=False):
)

# Check hash, check whether run should be enforced
logger.info('[Node] Setting-up "%s" in "%s".', self.fullname, outdir)
if not isinstance(self, MapNode):
logger.info(f'[Node] Setting-up "{self.fullname}" in "{outdir}".')
cached, updated = self.is_cached()

# If the node is cached, check on pklz files and finish
Expand Down Expand Up @@ -530,7 +535,6 @@ def run(self, updatehash=False):
# Tear-up after success
shutil.move(hashfile_unfinished, hashfile_unfinished.replace("_unfinished", ""))
write_node_report(self, result=result, is_mapnode=isinstance(self, MapNode))
logger.info('[Node] Finished "%s".', self.fullname)
return result

def _get_hashval(self):
Expand Down Expand Up @@ -582,7 +586,7 @@ def _get_inputs(self):
logger.critical("%s", e)

if outputs is None:
raise RuntimeError(
raise NodeExecutionError(
"""\
Error populating the inputs of node "%s": the results file of the source node \
(%s) does not contain any outputs."""
Expand Down Expand Up @@ -697,79 +701,56 @@ def _run_command(self, execute, copyfiles=True):
)
return result

outdir = self.output_dir()
# Run command: either execute is true or load_results failed.
result = InterfaceResult(
interface=self._interface.__class__,
runtime=Bunch(
cwd=outdir,
returncode=1,
environ=dict(os.environ),
hostname=socket.gethostname(),
),
inputs=self._interface.inputs.get_traitsfree(),
)

outdir = Path(self.output_dir())
if copyfiles:
self._originputs = deepcopy(self._interface.inputs)
self._copyfiles_to_wd(execute=execute)

message = '[Node] Running "{}" ("{}.{}")'.format(
self.name, self._interface.__module__, self._interface.__class__.__name__
# Run command: either execute is true or load_results failed.
logger.info(
f'[Node] Executing "{self.name}" <{self._interface.__module__}'
f".{self._interface.__class__.__name__}>"
)
# Invoke core run method of the interface ignoring exceptions
result = self._interface.run(cwd=outdir, ignore_exception=True)
logger.info(
f'[Node] Finished "{self.name}", elapsed time {result.runtime.duration}s.'
)

if issubclass(self._interface.__class__, CommandLine):
try:
with indirectory(outdir):
cmd = self._interface.cmdline
except Exception as msg:
result.runtime.stderr = "{}\n\n{}".format(
getattr(result.runtime, "stderr", ""), msg
)
_save_resultfile(
result,
outdir,
self.name,
rebase=str2bool(self.config["execution"]["use_relative_paths"]),
)
raise
cmdfile = op.join(outdir, "command.txt")
with open(cmdfile, "wt") as fd:
print(cmd + "\n", file=fd)
message += ", a CommandLine Interface with command:\n{}".format(cmd)
logger.info(message)
try:
result = self._interface.run(cwd=outdir)
except Exception as msg:
result.runtime.stderr = "%s\n\n%s".format(
getattr(result.runtime, "stderr", ""), msg
)
_save_resultfile(
result,
# Write out command line as it happened
Path.write_text(outdir / "command.txt", f"{result.runtime.cmdline}\n")

exc_tb = getattr(result.runtime, "traceback", None)

if not exc_tb:
# Clean working directory if no errors
dirs2keep = None
if isinstance(self, MapNode):
dirs2keep = [op.join(outdir, "mapflow")]

result.outputs = clean_working_directory(
result.outputs,
outdir,
self.name,
rebase=str2bool(self.config["execution"]["use_relative_paths"]),
self._interface.inputs,
self.needed_outputs,
self.config,
dirs2keep=dirs2keep,
)
raise

dirs2keep = None
if isinstance(self, MapNode):
dirs2keep = [op.join(outdir, "mapflow")]

result.outputs = clean_working_directory(
result.outputs,
outdir,
self._interface.inputs,
self.needed_outputs,
self.config,
dirs2keep=dirs2keep,
)
# Store results file under all circumstances
_save_resultfile(
result,
outdir,
self.name,
rebase=str2bool(self.config["execution"]["use_relative_paths"]),
)

if exc_tb:
raise NodeExecutionError(
f"Exception raised while executing Node {self.name}.\n\n{result.runtime.traceback}"
)

return result

def _copyfiles_to_wd(self, execute=True, linksonly=False):
Expand Down Expand Up @@ -1290,7 +1271,7 @@ def _collate_results(self, nodes):
if code is not None:
msg += ["Subnode %d failed" % i]
msg += ["Error: %s" % str(code)]
raise Exception(
raise NodeExecutionError(
"Subnodes of node: %s failed:\n%s" % (self.name, "\n".join(msg))
)

Expand Down
2 changes: 1 addition & 1 deletion nipype/pipeline/engine/tests/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ def test_mapnode_crash(tmpdir):
node.config = deepcopy(config._sections)
node.config["execution"]["stop_on_first_crash"] = True
node.base_dir = tmpdir.strpath
with pytest.raises(TypeError):
with pytest.raises(pe.nodes.NodeExecutionError):
node.run()
os.chdir(cwd)

Expand Down
16 changes: 15 additions & 1 deletion nipype/pipeline/plugins/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ def run(self, graph, config, updatehash=False):
self.mapnodesubids = {}
# setup polling - TODO: change to threaded model
notrun = []
errors = []

old_progress_stats = None
old_presub_stats = None
Expand Down Expand Up @@ -155,14 +156,16 @@ def run(self, graph, config, updatehash=False):
taskid, jobid = self.pending_tasks.pop()
try:
result = self._get_result(taskid)
except Exception:
except Exception as exc:
notrun.append(self._clean_queue(jobid, graph))
errors.append(exc)
else:
if result:
if result["traceback"]:
notrun.append(
self._clean_queue(jobid, graph, result=result)
)
errors.append("".join(result["traceback"]))
else:
self._task_finished_cb(jobid)
self._remove_node_dirs()
Expand Down Expand Up @@ -194,6 +197,17 @@ def run(self, graph, config, updatehash=False):
# close any open resources
self._postrun_check()

if errors:
# If one or more nodes failed, re-rise first of them
error, cause = errors[0], None
if isinstance(error, str):
error = RuntimeError(error)

if len(errors) > 1:
error, cause = RuntimeError(f"{len(errors)} raised. Re-raising first."), error

raise error from cause

def _get_result(self, taskid):
raise NotImplementedError

Expand Down
17 changes: 10 additions & 7 deletions nipype/pipeline/plugins/linear.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ def run(self, graph, config, updatehash=False):
old_wd = os.getcwd()
notrun = []
donotrun = []
stop_on_first_crash = str2bool(config["execution"]["stop_on_first_crash"])
errors = []
nodes, _ = topological_sort(graph)
for node in nodes:
endstatus = "end"
Expand All @@ -43,27 +45,28 @@ def run(self, graph, config, updatehash=False):
if self._status_callback:
self._status_callback(node, "start")
node.run(updatehash=updatehash)
except:
except Exception as exc:
endstatus = "exception"
# bare except, but i really don't know where a
# node might fail
crashfile = report_crash(node)
if str2bool(config["execution"]["stop_on_first_crash"]):
raise
# remove dependencies from queue
subnodes = [s for s in dfs_preorder(graph, node)]
notrun.append(
{"node": node, "dependents": subnodes, "crashfile": crashfile}
)
donotrun.extend(subnodes)
# Delay raising the crash until we cleaned the house
if str2bool(config["execution"]["stop_on_first_crash"]):
os.chdir(old_wd) # Return wherever we were before
report_nodes_not_run(notrun) # report before raising
raise
errors.append(exc)

if stop_on_first_crash:
break
finally:
if self._status_callback:
self._status_callback(node, endstatus)

os.chdir(old_wd) # Return wherever we were before
report_nodes_not_run(notrun)
if errors:
# Re-raise exception of first failed node
raise errors[0]
7 changes: 4 additions & 3 deletions nipype/pipeline/plugins/tests/test_sgelike.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ def test_crashfile_creation(tmp_path):
sgelike_plugin = SGELikeBatchManagerBase("")
with pytest.raises(RuntimeError) as e:
assert pipe.run(plugin=sgelike_plugin)
assert str(e.value) == "Workflow did not execute cleanly. Check log for details"

crashfiles = tmp_path.glob("crash*crasher*.pklz")
assert len(list(crashfiles)) == 1
crashfiles = list(tmp_path.glob("crash*crasher*.pklz")) + list(
tmp_path.glob("crash*crasher*.txt")
)
assert len(crashfiles) == 1
3 changes: 0 additions & 3 deletions nipype/pipeline/plugins/tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,9 +93,6 @@ def report_nodes_not_run(notrun):
for subnode in info["dependents"]:
logger.debug(subnode._id)
logger.info("***********************************")
raise RuntimeError(
("Workflow did not execute cleanly. " "Check log for details")
)


def create_pyscript(node, updatehash=False, store_exception=True):
Expand Down