From 2042bf61702031bdec64e35dac963a5218bee122 Mon Sep 17 00:00:00 2001 From: "Christopher J. Markiewicz" Date: Wed, 14 Nov 2018 12:08:47 -0500 Subject: [PATCH] ENH: Add mp_context plugin arg for MultiProc --- nipype/pipeline/plugins/multiproc.py | 20 ++++++++++++++------ 1 file changed, 14 insertions(+), 6 deletions(-) diff --git a/nipype/pipeline/plugins/multiproc.py b/nipype/pipeline/plugins/multiproc.py index c89a6af8e8..d2ef363a34 100644 --- a/nipype/pipeline/plugins/multiproc.py +++ b/nipype/pipeline/plugins/multiproc.py @@ -11,7 +11,7 @@ # Import packages import os -from multiprocessing import cpu_count +import multiprocessing as mp from concurrent.futures import ProcessPoolExecutor from traceback import format_exception import sys @@ -95,7 +95,7 @@ class MultiProcPlugin(DistributedPluginBase): Currently supported options are: - - non_daemon : boolean flag to execute as non-daemon processes + - non_daemon: boolean flag to execute as non-daemon processes - n_procs: maximum number of threads to be executed in parallel - memory_gb: maximum memory (in GB) that can be used at once. - raise_insufficient: raise error if the requested resources for @@ -104,8 +104,7 @@ class MultiProcPlugin(DistributedPluginBase): - scheduler: sort jobs topologically (``'tsort'``, default value) or prioritize jobs by, first, memory consumption and, second, number of threads (``'mem_thread'`` option). - - maxtasksperchild: number of nodes to run on each process before - refreshing the worker (default: 10). + - mp_context: name of multiprocessing context to use """ @@ -121,7 +120,7 @@ def __init__(self, plugin_args=None): self._cwd = os.getcwd() # Read in options or set defaults. - self.processors = self.plugin_args.get('n_procs', cpu_count()) + self.processors = self.plugin_args.get('n_procs', mp.cpu_count()) self.memory_gb = self.plugin_args.get( 'memory_gb', # Allocate 90% of system memory get_system_total_memory_gb() * 0.9) @@ -133,7 +132,16 @@ def __init__(self, plugin_args=None): 'mem_gb=%0.2f, cwd=%s)', self.processors, self.memory_gb, self._cwd) - self.pool = ProcessPoolExecutor(max_workers=self.processors) + try: + mp_context = mp.context.get_context( + self.plugin_args.get('mp_context')) + self.pool = ProcessPoolExecutor(max_workers=self.processors, + initializer=os.chdir, + initargs=(self._cwd,), + mp_context=mp_context) + except (AttributeError, TypeError): + # Python < 3.7 does not support initialization or contexts + self.pool = ProcessPoolExecutor(max_workers=self.processors) self._stats = None