fix: improve version checking for nodes of workflows #3152

Merged: 9 commits, Jan 16, 2020
10 changes: 4 additions & 6 deletions nipype/__init__.py
@@ -14,11 +14,7 @@
import os
from distutils.version import LooseVersion

-from .info import (
-    URL as __url__,
-    STATUS as __status__,
-    __version__,
-)
+from .info import URL as __url__, STATUS as __status__, __version__
from .utils.config import NipypeConfig
from .utils.logger import Logging
from .refs import due
@@ -105,6 +101,8 @@ def check_latest_version(raise_exception=False):
packname="nipype", version=__version__, latest=latest["version"]
)
)
+            else:
+                logger.info("No new version available.")
if latest["bad_versions"] and any(
[
LooseVersion(__version__) == LooseVersion(ver)
@@ -126,7 +124,7 @@ def check_latest_version(raise_exception=False):
if config.getboolean("execution", "check_version"):
import __main__

-    if not hasattr(__main__, "__file__"):
+    if not hasattr(__main__, "__file__") and "NIPYPE_NO_ET" not in os.environ:
from .interfaces.base import BaseInterface

if BaseInterface._etelemetry_version_data is None:
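
The guard added at the bottom of nipype/__init__.py means the import-time etelemetry lookup is skipped whenever the NIPYPE_NO_ET opt-out variable is present. A minimal, self-contained sketch of that logic, not nipype's actual implementation (maybe_check_latest_version and fetch_latest are made-up stand-ins for the real check):

import os

def maybe_check_latest_version(check_version=True,
                               fetch_latest=lambda: {"version": "unknown"}):
    # Skip the network lookup if the config flag is off or the opt-out is set.
    if not check_version or "NIPYPE_NO_ET" in os.environ:
        return None
    return fetch_latest()

os.environ["NIPYPE_NO_ET"] = "1"
assert maybe_check_latest_version() is None  # opted out: no lookup happens
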
5 changes: 4 additions & 1 deletion nipype/interfaces/base/core.py
@@ -168,7 +168,10 @@ class BaseInterface(Interface):
def __init__(
self, from_file=None, resource_monitor=None, ignore_exception=False, **inputs
):
-        if config.getboolean("execution", "check_version"):
+        if (
+            config.getboolean("execution", "check_version")
+            and "NIPYPE_NO_ET" not in os.environ
+        ):
from ... import check_latest_version

if BaseInterface._etelemetry_version_data is None:
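
BaseInterface construction applies the same guard and additionally caches the result on the class, so the lookup runs at most once per process. A toy illustration of that pattern (VersionCheckedInterface is a made-up name, not nipype's BaseInterface):

import os

class VersionCheckedInterface:
    _etelemetry_version_data = None  # shared by all instances in this process

    def __init__(self, check_version=True):
        # Both the config flag and the absence of NIPYPE_NO_ET are required.
        if check_version and "NIPYPE_NO_ET" not in os.environ:
            if VersionCheckedInterface._etelemetry_version_data is None:
                # hypothetical stand-in for nipype's check_latest_version()
                VersionCheckedInterface._etelemetry_version_data = {"version": "0.0.0"}
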
8 changes: 7 additions & 1 deletion nipype/pipeline/plugins/legacymultiproc.py
@@ -153,6 +153,12 @@ class NonDaemonPool(pool.Pool):
Process = NonDaemonProcess


+def process_initializer(cwd):
+    """Initializes the environment of the child process"""
+    os.chdir(cwd)
+    os.environ["NIPYPE_NO_ET"] = "1"


class LegacyMultiProcPlugin(DistributedPluginBase):
"""
Execute workflow with multiprocessing, not sending more jobs at once
@@ -223,7 +229,7 @@ def __init__(self, plugin_args=None):
self.pool = NipypePool(
processes=self.processors,
maxtasksperchild=maxtasks,
-                initializer=os.chdir,
+                initializer=process_initializer,
initargs=(self._cwd,),
)
except TypeError:
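
Passing process_initializer instead of os.chdir means each pool worker both changes to the working directory and inherits the opt-out before running any node. A standalone demonstration of the multiprocessing.Pool initializer mechanism (worker_state is a made-up helper, not part of the plugin):

import multiprocessing as mp
import os

def process_initializer(cwd):
    """Runs once in each worker process before it executes any task."""
    os.chdir(cwd)
    os.environ["NIPYPE_NO_ET"] = "1"

def worker_state(_):
    # Report what the worker sees after initialization.
    return os.getcwd(), os.environ.get("NIPYPE_NO_ET")

if __name__ == "__main__":
    with mp.Pool(processes=2, initializer=process_initializer,
                 initargs=(os.getcwd(),)) as pool:
        print(pool.map(worker_state, range(2)))
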
14 changes: 11 additions & 3 deletions nipype/pipeline/plugins/multiproc.py
@@ -10,7 +10,7 @@
# Import packages
import os
import multiprocessing as mp
-from concurrent.futures import ProcessPoolExecutor
+from concurrent.futures import ProcessPoolExecutor, wait
from traceback import format_exception
import sys
from logging import INFO
@@ -73,6 +73,12 @@ def run_node(node, updatehash, taskid):
return result


+def process_initializer(cwd):
+    """Initializes the environment of the child process"""
+    os.chdir(cwd)
+    os.environ["NIPYPE_NO_ET"] = "1"


class MultiProcPlugin(DistributedPluginBase):
"""
Execute workflow with multiprocessing, not sending more jobs at once
@@ -134,16 +140,18 @@ def __init__(self, plugin_args=None):
)

try:
-            mp_context = mp.context.get_context(self.plugin_args.get("mp_context"))
+            mp_context = mp.get_context(self.plugin_args.get("mp_context"))
self.pool = ProcessPoolExecutor(
max_workers=self.processors,
-                initializer=os.chdir,
+                initializer=process_initializer,
initargs=(self._cwd,),
mp_context=mp_context,
)
except (AttributeError, TypeError):
# Python < 3.7 does not support initialization or contexts
self.pool = ProcessPoolExecutor(max_workers=self.processors)
+            result_future = self.pool.submit(process_initializer, self._cwd)
+            wait([result_future], timeout=5)

self._stats = None

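
ProcessPoolExecutor only gained the initializer, initargs, and mp_context arguments in Python 3.7, hence the except branch: on older interpreters the initializer is submitted as an ordinary task, which only reaches whichever worker happens to pick it up. A sketch of the same pattern under those assumptions (worker_env is a made-up helper):

import os
from concurrent.futures import ProcessPoolExecutor, wait

def process_initializer(cwd):
    os.chdir(cwd)
    os.environ["NIPYPE_NO_ET"] = "1"

def worker_env(_):
    return os.environ.get("NIPYPE_NO_ET")

if __name__ == "__main__":
    cwd = os.getcwd()
    try:
        # Python >= 3.7: every worker is initialized as it starts.
        pool = ProcessPoolExecutor(max_workers=1,
                                   initializer=process_initializer,
                                   initargs=(cwd,))
    except TypeError:
        # Older interpreters: run the initializer as a task and wait briefly
        # for it before real work is submitted.
        pool = ProcessPoolExecutor(max_workers=1)
        wait([pool.submit(process_initializer, cwd)], timeout=5)
    print(list(pool.map(worker_env, range(1))))
    pool.shutdown()
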
6 changes: 6 additions & 0 deletions nipype/pipeline/plugins/tools.py
@@ -125,7 +125,13 @@ def create_pyscript(node, updatehash=False, store_exception=True):
can_import_matplotlib = False
pass

+import os
+value = os.environ.get('NIPYPE_NO_ET', None)
+if value is None:
+    # disable ET for any submitted job
+    os.environ['NIPYPE_NO_ET'] = "1"
from nipype import config, logging

from nipype.utils.filemanip import loadpkl, savepkl
from socket import gethostname
from traceback import format_exception
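
The added block is written into the job script that create_pyscript() generates, so nodes submitted through batch plugins set the opt-out before they import nipype, which is when the version check could otherwise fire. An equivalent, more compact form of the same guard (the commented import stands in for the template's real import):

import os

# Must be in place before nipype is imported in the generated script.
os.environ.setdefault("NIPYPE_NO_ET", "1")

# from nipype import config, logging   # what the generated script does next
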
87 changes: 87 additions & 0 deletions nipype/tests/test_nipype.py
@@ -19,3 +19,90 @@ def test_nipype_info():
def test_git_hash():
# removing the first "g" from gitversion
get_nipype_gitversion()[1:] == get_info()["commit_hash"]


def _check_no_et():
import os
from unittest.mock import patch

et = os.getenv("NIPYPE_NO_ET") is None

with patch.dict("os.environ", {"NIPYPE_NO_ET": "1"}):
from nipype.interfaces.base import BaseInterface

ver_data = BaseInterface._etelemetry_version_data

if et and ver_data is None:
raise ValueError(
"etelemetry enabled and version data missing - double hits likely"
)

return et


def test_no_et(tmp_path):
from unittest.mock import patch
from nipype.pipeline import engine as pe
from nipype.interfaces import utility as niu
from nipype.interfaces.base import BaseInterface

# Pytest doesn't trigger this, so let's pretend it's there
with patch.object(BaseInterface, "_etelemetry_version_data", {}):

# Direct function call - environment not set
f = niu.Function(function=_check_no_et)
res = f.run()
assert res.outputs.out is True

# Basic node - environment not set
n = pe.Node(
niu.Function(function=_check_no_et), name="n", base_dir=str(tmp_path)
)
res = n.run()
assert res.outputs.out is True

# Linear run - environment not set
wf1 = pe.Workflow(name="wf1", base_dir=str(tmp_path))
wf1.add_nodes([pe.Node(niu.Function(function=_check_no_et), name="n")])
res = wf1.run()
assert next(iter(res.nodes)).result.outputs.out is True

# MultiProc run - environment initialized with NIPYPE_NO_ET
wf2 = pe.Workflow(name="wf2", base_dir=str(tmp_path))
wf2.add_nodes([pe.Node(niu.Function(function=_check_no_et), name="n")])
res = wf2.run(plugin="MultiProc", plugin_args={"n_procs": 1})
assert next(iter(res.nodes)).result.outputs.out is False

# LegacyMultiProc run - environment initialized with NIPYPE_NO_ET
wf3 = pe.Workflow(name="wf3", base_dir=str(tmp_path))
wf3.add_nodes([pe.Node(niu.Function(function=_check_no_et), name="n")])
res = wf3.run(plugin="LegacyMultiProc", plugin_args={"n_procs": 1})
assert next(iter(res.nodes)).result.outputs.out is False

# run_without_submitting - environment not set
wf4 = pe.Workflow(name="wf4", base_dir=str(tmp_path))
wf4.add_nodes(
[
pe.Node(
niu.Function(function=_check_no_et),
run_without_submitting=True,
name="n",
)
]
)
res = wf4.run(plugin="MultiProc", plugin_args={"n_procs": 1})
assert next(iter(res.nodes)).result.outputs.out is True

# LegacyMultiProc run - environment initialized with NIPYPE_NO_ET
wf5 = pe.Workflow(name="wf5", base_dir=str(tmp_path))
wf5.add_nodes(
[
pe.Node(
niu.Function(function=_check_no_et),
run_without_submitting=True,
name="n",
)
]
)
res = wf5.run(plugin="LegacyMultiProc", plugin_args={"n_procs": 1})
assert next(iter(res.nodes)).result.outputs.out is True
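
_check_no_et() relies on unittest.mock.patch.dict to toggle the environment without leaking state between sub-cases; a standalone illustration of that behaviour (not part of the PR):

import os
from unittest.mock import patch

before = os.environ.get("NIPYPE_NO_ET")
with patch.dict("os.environ", {"NIPYPE_NO_ET": "1"}):
    # Inside the context the opt-out is visible, as _check_no_et() expects.
    assert os.environ["NIPYPE_NO_ET"] == "1"
# On exit, patch.dict restores the previous environment exactly.
assert os.environ.get("NIPYPE_NO_ET") == before

The new test can be run on its own with, for example, pytest nipype/tests/test_nipype.py -k test_no_et.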