From 2e02a8a414a7355aa0b25dcf5d7cc80f332f593e Mon Sep 17 00:00:00 2001 From: Satrajit Ghosh Date: Sat, 11 Jan 2020 16:59:05 -0500 Subject: [PATCH 1/9] fix: improve version checking for nodes of workflows --- nipype/__init__.py | 8 +++----- nipype/pipeline/plugins/multiproc.py | 12 ++++++++++-- nipype/pipeline/plugins/tools.py | 3 +++ 3 files changed, 16 insertions(+), 7 deletions(-) diff --git a/nipype/__init__.py b/nipype/__init__.py index 18449c5f81..d0b6fc60fa 100644 --- a/nipype/__init__.py +++ b/nipype/__init__.py @@ -14,11 +14,7 @@ import os from distutils.version import LooseVersion -from .info import ( - URL as __url__, - STATUS as __status__, - __version__, -) +from .info import URL as __url__, STATUS as __status__, __version__ from .utils.config import NipypeConfig from .utils.logger import Logging from .refs import due @@ -105,6 +101,8 @@ def check_latest_version(raise_exception=False): packname="nipype", version=__version__, latest=latest["version"] ) ) + else: + logger.info("No new version available.") if latest["bad_versions"] and any( [ LooseVersion(__version__) == LooseVersion(ver) diff --git a/nipype/pipeline/plugins/multiproc.py b/nipype/pipeline/plugins/multiproc.py index dc950385b1..87db326c0a 100644 --- a/nipype/pipeline/plugins/multiproc.py +++ b/nipype/pipeline/plugins/multiproc.py @@ -10,7 +10,7 @@ # Import packages import os import multiprocessing as mp -from concurrent.futures import ProcessPoolExecutor +from concurrent.futures import ProcessPoolExecutor, wait from traceback import format_exception import sys from logging import INFO @@ -73,6 +73,12 @@ def run_node(node, updatehash, taskid): return result +def process_initializer(cwd): + """Initializes the environment of the child process""" + os.chdir(cwd) + os.environ["NO_ET"] = "1" + + class MultiProcPlugin(DistributedPluginBase): """ Execute workflow with multiprocessing, not sending more jobs at once @@ -137,13 +143,15 @@ def __init__(self, plugin_args=None): mp_context = mp.context.get_context(self.plugin_args.get("mp_context")) self.pool = ProcessPoolExecutor( max_workers=self.processors, - initializer=os.chdir, + initializer=process_initializer, initargs=(self._cwd,), mp_context=mp_context, ) except (AttributeError, TypeError): # Python < 3.7 does not support initialization or contexts self.pool = ProcessPoolExecutor(max_workers=self.processors) + result_future = self.pool.submit(process_initializer, self._cwd) + wait(result_future, timeout=5) self._stats = None diff --git a/nipype/pipeline/plugins/tools.py b/nipype/pipeline/plugins/tools.py index 2bb31de564..a1f1659f66 100644 --- a/nipype/pipeline/plugins/tools.py +++ b/nipype/pipeline/plugins/tools.py @@ -125,6 +125,9 @@ def create_pyscript(node, updatehash=False, store_exception=True): can_import_matplotlib = False pass +import os +os.environ['NO_ET'] = "1" + from nipype import config, logging from nipype.utils.filemanip import loadpkl, savepkl from socket import gethostname From 23a17f33a6e158aba30411aa4222558a2abe2cbf Mon Sep 17 00:00:00 2001 From: Satrajit Ghosh Date: Mon, 13 Jan 2020 22:24:24 -0500 Subject: [PATCH 2/9] enh: use new environment variable to suppress ET for nipype --- nipype/__init__.py | 2 +- nipype/interfaces/base/core.py | 5 ++++- nipype/pipeline/plugins/tools.py | 8 ++++++-- 3 files changed, 11 insertions(+), 4 deletions(-) diff --git a/nipype/__init__.py b/nipype/__init__.py index d0b6fc60fa..64249929cd 100644 --- a/nipype/__init__.py +++ b/nipype/__init__.py @@ -124,7 +124,7 @@ def check_latest_version(raise_exception=False): if config.getboolean("execution", "check_version"): import __main__ - if not hasattr(__main__, "__file__"): + if not hasattr(__main__, "__file__") and "NO_NIPYPE_ET" in os.environ: from .interfaces.base import BaseInterface if BaseInterface._etelemetry_version_data is None: diff --git a/nipype/interfaces/base/core.py b/nipype/interfaces/base/core.py index 6c11084032..75798fb553 100644 --- a/nipype/interfaces/base/core.py +++ b/nipype/interfaces/base/core.py @@ -168,7 +168,10 @@ class BaseInterface(Interface): def __init__( self, from_file=None, resource_monitor=None, ignore_exception=False, **inputs ): - if config.getboolean("execution", "check_version"): + if ( + config.getboolean("execution", "check_version") + and "NO_NIPYPE_ET" in os.environ + ): from ... import check_latest_version if BaseInterface._etelemetry_version_data is None: diff --git a/nipype/pipeline/plugins/tools.py b/nipype/pipeline/plugins/tools.py index a1f1659f66..8f4af80d8f 100644 --- a/nipype/pipeline/plugins/tools.py +++ b/nipype/pipeline/plugins/tools.py @@ -126,9 +126,13 @@ def create_pyscript(node, updatehash=False, store_exception=True): pass import os -os.environ['NO_ET'] = "1" - +value = os.environ.get('NO_NIPYPE_ET', None) +if value is None: + os.environ['NO_NIPYPE_ET'] = "1" from nipype import config, logging +if value is None: + del os.environ['NO_NIPYPE_ET'] + from nipype.utils.filemanip import loadpkl, savepkl from socket import gethostname from traceback import format_exception From 2ba619d960b2d4174495f3885a429e2f9f557dfd Mon Sep 17 00:00:00 2001 From: Satrajit Ghosh Date: Tue, 14 Jan 2020 07:32:06 -0500 Subject: [PATCH 3/9] fix: wait syntax --- nipype/pipeline/plugins/multiproc.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/nipype/pipeline/plugins/multiproc.py b/nipype/pipeline/plugins/multiproc.py index 87db326c0a..b6e3718a37 100644 --- a/nipype/pipeline/plugins/multiproc.py +++ b/nipype/pipeline/plugins/multiproc.py @@ -151,7 +151,7 @@ def __init__(self, plugin_args=None): # Python < 3.7 does not support initialization or contexts self.pool = ProcessPoolExecutor(max_workers=self.processors) result_future = self.pool.submit(process_initializer, self._cwd) - wait(result_future, timeout=5) + wait([result_future], timeout=5) self._stats = None From 591fc63a48d056a68f336c7035a93007aaf4d11c Mon Sep 17 00:00:00 2001 From: Satrajit Ghosh Date: Tue, 14 Jan 2020 17:04:20 -0500 Subject: [PATCH 4/9] Apply suggestions from code review Co-Authored-By: Chris Markiewicz --- nipype/__init__.py | 2 +- nipype/interfaces/base/core.py | 2 +- nipype/pipeline/plugins/multiproc.py | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/nipype/__init__.py b/nipype/__init__.py index 64249929cd..a1a1f8095c 100644 --- a/nipype/__init__.py +++ b/nipype/__init__.py @@ -124,7 +124,7 @@ def check_latest_version(raise_exception=False): if config.getboolean("execution", "check_version"): import __main__ - if not hasattr(__main__, "__file__") and "NO_NIPYPE_ET" in os.environ: + if not hasattr(__main__, "__file__") and "NO_NIPYPE_ET" not in os.environ: from .interfaces.base import BaseInterface if BaseInterface._etelemetry_version_data is None: diff --git a/nipype/interfaces/base/core.py b/nipype/interfaces/base/core.py index 75798fb553..45d14af8da 100644 --- a/nipype/interfaces/base/core.py +++ b/nipype/interfaces/base/core.py @@ -170,7 +170,7 @@ def __init__( ): if ( config.getboolean("execution", "check_version") - and "NO_NIPYPE_ET" in os.environ + and "NO_NIPYPE_ET" not in os.environ ): from ... import check_latest_version diff --git a/nipype/pipeline/plugins/multiproc.py b/nipype/pipeline/plugins/multiproc.py index b6e3718a37..5c3ad0926e 100644 --- a/nipype/pipeline/plugins/multiproc.py +++ b/nipype/pipeline/plugins/multiproc.py @@ -76,7 +76,7 @@ def run_node(node, updatehash, taskid): def process_initializer(cwd): """Initializes the environment of the child process""" os.chdir(cwd) - os.environ["NO_ET"] = "1" + os.environ["NO_NIPYPE_ET"] = "1" class MultiProcPlugin(DistributedPluginBase): From 39a2cbd2a2098eed394dfcabc6d0600584696d16 Mon Sep 17 00:00:00 2001 From: Satrajit Ghosh Date: Tue, 14 Jan 2020 17:07:03 -0500 Subject: [PATCH 5/9] fix: disable for all jobs --- nipype/pipeline/plugins/tools.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/nipype/pipeline/plugins/tools.py b/nipype/pipeline/plugins/tools.py index 8f4af80d8f..c34072c0c3 100644 --- a/nipype/pipeline/plugins/tools.py +++ b/nipype/pipeline/plugins/tools.py @@ -128,10 +128,9 @@ def create_pyscript(node, updatehash=False, store_exception=True): import os value = os.environ.get('NO_NIPYPE_ET', None) if value is None: + # disable ET for any submitted job os.environ['NO_NIPYPE_ET'] = "1" from nipype import config, logging -if value is None: - del os.environ['NO_NIPYPE_ET'] from nipype.utils.filemanip import loadpkl, savepkl from socket import gethostname From 3a9ac481ef4f7bd78a7fd6e23f48ee91e68c164b Mon Sep 17 00:00:00 2001 From: "Christopher J. Markiewicz" Date: Tue, 14 Jan 2020 21:55:00 -0500 Subject: [PATCH 6/9] ENH: Add process_initializer for LegacyMultiProc --- nipype/pipeline/plugins/legacymultiproc.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/nipype/pipeline/plugins/legacymultiproc.py b/nipype/pipeline/plugins/legacymultiproc.py index 620aadb422..12bd09657f 100644 --- a/nipype/pipeline/plugins/legacymultiproc.py +++ b/nipype/pipeline/plugins/legacymultiproc.py @@ -153,6 +153,12 @@ class NonDaemonPool(pool.Pool): Process = NonDaemonProcess +def process_initializer(cwd): + """Initializes the environment of the child process""" + os.chdir(cwd) + os.environ["NO_NIPYPE_ET"] = "1" + + class LegacyMultiProcPlugin(DistributedPluginBase): """ Execute workflow with multiprocessing, not sending more jobs at once @@ -223,7 +229,7 @@ def __init__(self, plugin_args=None): self.pool = NipypePool( processes=self.processors, maxtasksperchild=maxtasks, - initializer=os.chdir, + initializer=process_initializer, initargs=(self._cwd,), ) except TypeError: From 906062da3544ca4de5113cf9043388ea3feec324 Mon Sep 17 00:00:00 2001 From: "Christopher J. Markiewicz" Date: Tue, 14 Jan 2020 21:55:24 -0500 Subject: [PATCH 7/9] FIX: Call get_context from multiprocessing --- nipype/pipeline/plugins/multiproc.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/nipype/pipeline/plugins/multiproc.py b/nipype/pipeline/plugins/multiproc.py index 5c3ad0926e..42edb3999b 100644 --- a/nipype/pipeline/plugins/multiproc.py +++ b/nipype/pipeline/plugins/multiproc.py @@ -140,7 +140,7 @@ def __init__(self, plugin_args=None): ) try: - mp_context = mp.context.get_context(self.plugin_args.get("mp_context")) + mp_context = mp.get_context(self.plugin_args.get("mp_context")) self.pool = ProcessPoolExecutor( max_workers=self.processors, initializer=process_initializer, From bb51c57eef0e0d10fa56eba4f99f1573512cb5dd Mon Sep 17 00:00:00 2001 From: "Christopher J. Markiewicz" Date: Tue, 14 Jan 2020 21:56:15 -0500 Subject: [PATCH 8/9] TEST: Validate etelemetry should be skipped in workflow runs --- nipype/tests/test_nipype.py | 87 +++++++++++++++++++++++++++++++++++++ 1 file changed, 87 insertions(+) diff --git a/nipype/tests/test_nipype.py b/nipype/tests/test_nipype.py index ab3499c8db..b411df1104 100644 --- a/nipype/tests/test_nipype.py +++ b/nipype/tests/test_nipype.py @@ -19,3 +19,90 @@ def test_nipype_info(): def test_git_hash(): # removing the first "g" from gitversion get_nipype_gitversion()[1:] == get_info()["commit_hash"] + + +def _check_no_et(): + import os + from unittest.mock import patch + + et = os.getenv("NO_NIPYPE_ET") is None + + with patch.dict("os.environ", {"NO_NIPYPE_ET": "1"}): + from nipype.interfaces.base import BaseInterface + + ver_data = BaseInterface._etelemetry_version_data + + if et and ver_data is None: + raise ValueError( + "etelemetry enabled and version data missing - double hits likely" + ) + + return et + + +def test_no_et(tmp_path): + from unittest.mock import patch + from nipype.pipeline import engine as pe + from nipype.interfaces import utility as niu + from nipype.interfaces.base import BaseInterface + + # Pytest doesn't trigger this, so let's pretend it's there + with patch.object(BaseInterface, "_etelemetry_version_data", {}): + + # Direct function call - environment not set + f = niu.Function(function=_check_no_et) + res = f.run() + assert res.outputs.out is True + + # Basic node - environment not set + n = pe.Node( + niu.Function(function=_check_no_et), name="n", base_dir=str(tmp_path) + ) + res = n.run() + assert res.outputs.out is True + + # Linear run - environment not set + wf1 = pe.Workflow(name="wf1", base_dir=str(tmp_path)) + wf1.add_nodes([pe.Node(niu.Function(function=_check_no_et), name="n")]) + res = wf1.run() + assert next(iter(res.nodes)).result.outputs.out is True + + # MultiProc run - environment initialized with NO_NIPYPE_ET + wf2 = pe.Workflow(name="wf2", base_dir=str(tmp_path)) + wf2.add_nodes([pe.Node(niu.Function(function=_check_no_et), name="n")]) + res = wf2.run(plugin="MultiProc", plugin_args={"n_procs": 1}) + assert next(iter(res.nodes)).result.outputs.out is False + + # LegacyMultiProc run - environment initialized with NO_NIPYPE_ET + wf3 = pe.Workflow(name="wf3", base_dir=str(tmp_path)) + wf3.add_nodes([pe.Node(niu.Function(function=_check_no_et), name="n")]) + res = wf3.run(plugin="LegacyMultiProc", plugin_args={"n_procs": 1}) + assert next(iter(res.nodes)).result.outputs.out is False + + # run_without_submitting - environment not set + wf4 = pe.Workflow(name="wf4", base_dir=str(tmp_path)) + wf4.add_nodes( + [ + pe.Node( + niu.Function(function=_check_no_et), + run_without_submitting=True, + name="n", + ) + ] + ) + res = wf4.run(plugin="MultiProc", plugin_args={"n_procs": 1}) + assert next(iter(res.nodes)).result.outputs.out is True + + # LegacyMultiProc run - environment initialized with NO_NIPYPE_ET + wf5 = pe.Workflow(name="wf5", base_dir=str(tmp_path)) + wf5.add_nodes( + [ + pe.Node( + niu.Function(function=_check_no_et), + run_without_submitting=True, + name="n", + ) + ] + ) + res = wf5.run(plugin="LegacyMultiProc", plugin_args={"n_procs": 1}) + assert next(iter(res.nodes)).result.outputs.out is True From e2db1927dfdbc406aafaadb56ff2c4b44e6fa9ba Mon Sep 17 00:00:00 2001 From: Satrajit Ghosh Date: Wed, 15 Jan 2020 00:43:04 -0500 Subject: [PATCH 9/9] enh: change variable name to be consistent --- nipype/__init__.py | 2 +- nipype/interfaces/base/core.py | 2 +- nipype/pipeline/plugins/legacymultiproc.py | 2 +- nipype/pipeline/plugins/multiproc.py | 2 +- nipype/pipeline/plugins/tools.py | 4 ++-- nipype/tests/test_nipype.py | 10 +++++----- 6 files changed, 11 insertions(+), 11 deletions(-) diff --git a/nipype/__init__.py b/nipype/__init__.py index a1a1f8095c..43e9011175 100644 --- a/nipype/__init__.py +++ b/nipype/__init__.py @@ -124,7 +124,7 @@ def check_latest_version(raise_exception=False): if config.getboolean("execution", "check_version"): import __main__ - if not hasattr(__main__, "__file__") and "NO_NIPYPE_ET" not in os.environ: + if not hasattr(__main__, "__file__") and "NIPYPE_NO_ET" not in os.environ: from .interfaces.base import BaseInterface if BaseInterface._etelemetry_version_data is None: diff --git a/nipype/interfaces/base/core.py b/nipype/interfaces/base/core.py index 45d14af8da..82da393a84 100644 --- a/nipype/interfaces/base/core.py +++ b/nipype/interfaces/base/core.py @@ -170,7 +170,7 @@ def __init__( ): if ( config.getboolean("execution", "check_version") - and "NO_NIPYPE_ET" not in os.environ + and "NIPYPE_NO_ET" not in os.environ ): from ... import check_latest_version diff --git a/nipype/pipeline/plugins/legacymultiproc.py b/nipype/pipeline/plugins/legacymultiproc.py index 12bd09657f..528184472d 100644 --- a/nipype/pipeline/plugins/legacymultiproc.py +++ b/nipype/pipeline/plugins/legacymultiproc.py @@ -156,7 +156,7 @@ class NonDaemonPool(pool.Pool): def process_initializer(cwd): """Initializes the environment of the child process""" os.chdir(cwd) - os.environ["NO_NIPYPE_ET"] = "1" + os.environ["NIPYPE_NO_ET"] = "1" class LegacyMultiProcPlugin(DistributedPluginBase): diff --git a/nipype/pipeline/plugins/multiproc.py b/nipype/pipeline/plugins/multiproc.py index 42edb3999b..eac662533c 100644 --- a/nipype/pipeline/plugins/multiproc.py +++ b/nipype/pipeline/plugins/multiproc.py @@ -76,7 +76,7 @@ def run_node(node, updatehash, taskid): def process_initializer(cwd): """Initializes the environment of the child process""" os.chdir(cwd) - os.environ["NO_NIPYPE_ET"] = "1" + os.environ["NIPYPE_NO_ET"] = "1" class MultiProcPlugin(DistributedPluginBase): diff --git a/nipype/pipeline/plugins/tools.py b/nipype/pipeline/plugins/tools.py index c34072c0c3..ef213be36d 100644 --- a/nipype/pipeline/plugins/tools.py +++ b/nipype/pipeline/plugins/tools.py @@ -126,10 +126,10 @@ def create_pyscript(node, updatehash=False, store_exception=True): pass import os -value = os.environ.get('NO_NIPYPE_ET', None) +value = os.environ.get('NIPYPE_NO_ET', None) if value is None: # disable ET for any submitted job - os.environ['NO_NIPYPE_ET'] = "1" + os.environ['NIPYPE_NO_ET'] = "1" from nipype import config, logging from nipype.utils.filemanip import loadpkl, savepkl diff --git a/nipype/tests/test_nipype.py b/nipype/tests/test_nipype.py index b411df1104..60fa92d141 100644 --- a/nipype/tests/test_nipype.py +++ b/nipype/tests/test_nipype.py @@ -25,9 +25,9 @@ def _check_no_et(): import os from unittest.mock import patch - et = os.getenv("NO_NIPYPE_ET") is None + et = os.getenv("NIPYPE_NO_ET") is None - with patch.dict("os.environ", {"NO_NIPYPE_ET": "1"}): + with patch.dict("os.environ", {"NIPYPE_NO_ET": "1"}): from nipype.interfaces.base import BaseInterface ver_data = BaseInterface._etelemetry_version_data @@ -67,13 +67,13 @@ def test_no_et(tmp_path): res = wf1.run() assert next(iter(res.nodes)).result.outputs.out is True - # MultiProc run - environment initialized with NO_NIPYPE_ET + # MultiProc run - environment initialized with NIPYPE_NO_ET wf2 = pe.Workflow(name="wf2", base_dir=str(tmp_path)) wf2.add_nodes([pe.Node(niu.Function(function=_check_no_et), name="n")]) res = wf2.run(plugin="MultiProc", plugin_args={"n_procs": 1}) assert next(iter(res.nodes)).result.outputs.out is False - # LegacyMultiProc run - environment initialized with NO_NIPYPE_ET + # LegacyMultiProc run - environment initialized with NIPYPE_NO_ET wf3 = pe.Workflow(name="wf3", base_dir=str(tmp_path)) wf3.add_nodes([pe.Node(niu.Function(function=_check_no_et), name="n")]) res = wf3.run(plugin="LegacyMultiProc", plugin_args={"n_procs": 1}) @@ -93,7 +93,7 @@ def test_no_et(tmp_path): res = wf4.run(plugin="MultiProc", plugin_args={"n_procs": 1}) assert next(iter(res.nodes)).result.outputs.out is True - # LegacyMultiProc run - environment initialized with NO_NIPYPE_ET + # LegacyMultiProc run - environment initialized with NIPYPE_NO_ET wf5 = pe.Workflow(name="wf5", base_dir=str(tmp_path)) wf5.add_nodes( [