Skip to content

Commit 16f0190

Browse files
committed
ENH: Add mp_context plugin arg for MultiProc
1 parent e475331 commit 16f0190

File tree

1 file changed

+14
-6
lines changed

1 file changed

+14
-6
lines changed

nipype/pipeline/plugins/multiproc.py

+14-6
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111

1212
# Import packages
1313
import os
14-
from multiprocessing import cpu_count
14+
import multiprocessing as mp
1515
from concurrent.futures import ProcessPoolExecutor
1616
from traceback import format_exception
1717
import sys
@@ -95,7 +95,7 @@ class MultiProcPlugin(DistributedPluginBase):
9595
9696
Currently supported options are:
9797
98-
- non_daemon : boolean flag to execute as non-daemon processes
98+
- non_daemon: boolean flag to execute as non-daemon processes
9999
- n_procs: maximum number of threads to be executed in parallel
100100
- memory_gb: maximum memory (in GB) that can be used at once.
101101
- raise_insufficient: raise error if the requested resources for
@@ -104,8 +104,7 @@ class MultiProcPlugin(DistributedPluginBase):
104104
- scheduler: sort jobs topologically (``'tsort'``, default value)
105105
or prioritize jobs by, first, memory consumption and, second,
106106
number of threads (``'mem_thread'`` option).
107-
- maxtasksperchild: number of nodes to run on each process before
108-
refreshing the worker (default: 10).
107+
- mp_context: name of multiprocessing context to use
109108
110109
"""
111110

@@ -121,7 +120,7 @@ def __init__(self, plugin_args=None):
121120
self._cwd = os.getcwd()
122121

123122
# Read in options or set defaults.
124-
self.processors = self.plugin_args.get('n_procs', cpu_count())
123+
self.processors = self.plugin_args.get('n_procs', mp.cpu_count())
125124
self.memory_gb = self.plugin_args.get(
126125
'memory_gb', # Allocate 90% of system memory
127126
get_system_total_memory_gb() * 0.9)
@@ -133,7 +132,16 @@ def __init__(self, plugin_args=None):
133132
'mem_gb=%0.2f, cwd=%s)',
134133
self.processors, self.memory_gb, self._cwd)
135134

136-
self.pool = ProcessPoolExecutor(max_workers=self.processors)
135+
try:
136+
mp_context = mp.context.get_context(
137+
self.plugin_args.get('mp_context'))
138+
self.pool = ProcessPoolExecutor(max_workers=self.processors,
139+
initializer=os.chdir,
140+
initargs=(self._cwd,),
141+
mp_context=mp_context)
142+
except AttributeError, TypeError:
143+
# Python < 3.7 does not support initialization or contexts
144+
self.pool = ProcessPoolExecutor(max_workers=self.processors)
137145

138146
self._stats = None
139147

0 commit comments

Comments
 (0)