Skip to content

Commit 1b6761b

Browse files
committed
address multiprocessing spawning errors when on macOS and nbdev runs tests
1 parent ea9bbc7 commit 1b6761b

File tree

2 files changed

+14
-67
lines changed

2 files changed

+14
-67
lines changed

fastcore/parallel.py

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,11 @@ def g(_obj_td, *args, **kwargs):
3030
_obj_td.result = res
3131
@wraps(f)
3232
def _f(*args, **kwargs):
33-
res = (Thread,Process)[process](target=g, args=args, kwargs=kwargs)
33+
if process:
34+
Proc = get_context('fork').Process if sys.platform == 'darwin' else Process
35+
else:
36+
Proc = Thread
37+
res = Proc(target=g, args=args, kwargs=kwargs)
3438
res._args = (res,)+res._args
3539
res.start()
3640
return res
@@ -123,7 +127,9 @@ def parallel(f, items, *args, n_workers=defaults.cpus, total=None, progress=None
123127
kwpool = {}
124128
if threadpool: pool = ThreadPoolExecutor
125129
else:
126-
if not method and sys.platform == 'darwin': method='fork'
130+
if not method and sys.platform == 'darwin':
131+
# Use fork only if function is defined in __main__ (notebooks/REPL), otherwise use spawn
132+
method = 'fork' if getattr(f, '__module__', None) == '__main__' else 'spawn'
127133
if method: kwpool['mp_context'] = get_context(method)
128134
pool = ProcessPoolExecutor
129135
with pool(n_workers, pause=pause, **kwpool) as ex:
@@ -158,7 +164,8 @@ async def limited_task(item):
158164
# %% ../nbs/03a_parallel.ipynb
159165
def run_procs(f, f_done, args):
160166
"Call `f` for each item in `args` in parallel, yielding `f_done`"
161-
processes = L(args).map(Process, args=arg0, target=f)
167+
Proc = get_context('fork').Process if sys.platform == 'darwin' else Process
168+
processes = L(args).map(Proc, args=arg0, target=f)
162169
for o in processes: o.start()
163170
yield from f_done()
164171
processes.map(Self.join())

nbs/03a_parallel.ipynb

Lines changed: 4 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -14,22 +14,7 @@
1414
"execution_count": null,
1515
"metadata": {},
1616
"outputs": [],
17-
"source": [
18-
"#| export\n",
19-
"from fastcore.imports import *\n",
20-
"from fastcore.basics import *\n",
21-
"from fastcore.foundation import *\n",
22-
"from fastcore.meta import *\n",
23-
"from fastcore.xtras import *\n",
24-
"from functools import wraps\n",
25-
"\n",
26-
"import concurrent.futures,time\n",
27-
"from multiprocessing import Process,Queue,Manager,set_start_method,get_all_start_methods,get_context\n",
28-
"from threading import Thread\n",
29-
"try:\n",
30-
" if sys.platform == 'darwin' and IN_NOTEBOOK: set_start_method(\"fork\")\n",
31-
"except: pass"
32-
]
17+
"source": "#| export\nfrom fastcore.imports import *\nfrom fastcore.basics import *\nfrom fastcore.foundation import *\nfrom fastcore.meta import *\nfrom fastcore.xtras import *\nfrom functools import wraps\n\nimport concurrent.futures,time\nfrom multiprocessing import Process,Queue,Manager,set_start_method,get_all_start_methods,get_context\nfrom threading import Thread\ntry:\n if sys.platform == 'darwin' and IN_NOTEBOOK: set_start_method(\"fork\")\nexcept: pass"
3318
},
3419
{
3520
"cell_type": "code",
@@ -56,27 +41,7 @@
5641
"execution_count": null,
5742
"metadata": {},
5843
"outputs": [],
59-
"source": [
60-
"#| export\n",
61-
"def threaded(process=False):\n",
62-
" \"Run `f` in a `Thread` (or `Process` if `process=True`), and returns it\"\n",
63-
" def _r(f):\n",
64-
" def g(_obj_td, *args, **kwargs):\n",
65-
" res = f(*args, **kwargs)\n",
66-
" _obj_td.result = res\n",
67-
" @wraps(f)\n",
68-
" def _f(*args, **kwargs):\n",
69-
" res = (Thread,Process)[process](target=g, args=args, kwargs=kwargs)\n",
70-
" res._args = (res,)+res._args\n",
71-
" res.start()\n",
72-
" return res\n",
73-
" return _f\n",
74-
" if callable(process):\n",
75-
" o = process\n",
76-
" process = False\n",
77-
" return _r(o)\n",
78-
" return _r"
79-
]
44+
"source": "#| export\ndef threaded(process=False):\n \"Run `f` in a `Thread` (or `Process` if `process=True`), and returns it\"\n def _r(f):\n def g(_obj_td, *args, **kwargs):\n res = f(*args, **kwargs)\n _obj_td.result = res\n @wraps(f)\n def _f(*args, **kwargs):\n if process:\n Proc = get_context('fork').Process if sys.platform == 'darwin' else Process\n else:\n Proc = Thread\n res = Proc(target=g, args=args, kwargs=kwargs)\n res._args = (res,)+res._args\n res.start()\n return res\n return _f\n if callable(process):\n o = process\n process = False\n return _r(o)\n return _r"
8045
},
8146
{
8247
"cell_type": "code",
@@ -406,24 +371,7 @@
406371
"execution_count": null,
407372
"metadata": {},
408373
"outputs": [],
409-
"source": [
410-
"#| export\n",
411-
"def parallel(f, items, *args, n_workers=defaults.cpus, total=None, progress=None, pause=0,\n",
412-
" method=None, threadpool=False, timeout=None, chunksize=1, **kwargs):\n",
413-
" \"Applies `func` in parallel to `items`, using `n_workers`\"\n",
414-
" kwpool = {}\n",
415-
" if threadpool: pool = ThreadPoolExecutor\n",
416-
" else:\n",
417-
" if not method and sys.platform == 'darwin': method='fork'\n",
418-
" if method: kwpool['mp_context'] = get_context(method)\n",
419-
" pool = ProcessPoolExecutor\n",
420-
" with pool(n_workers, pause=pause, **kwpool) as ex:\n",
421-
" r = ex.map(f,items, *args, timeout=timeout, chunksize=chunksize, **kwargs)\n",
422-
" if progress and progress_bar:\n",
423-
" if total is None: total = len(items)\n",
424-
" r = progress_bar(r, total=total, leave=False)\n",
425-
" return L(r)"
426-
]
374+
"source": "#| export\ndef parallel(f, items, *args, n_workers=defaults.cpus, total=None, progress=None, pause=0,\n method=None, threadpool=False, timeout=None, chunksize=1, **kwargs):\n \"Applies `func` in parallel to `items`, using `n_workers`\"\n kwpool = {}\n if threadpool: pool = ThreadPoolExecutor\n else:\n if not method and sys.platform == 'darwin':\n # Use fork only if function is defined in __main__ (notebooks/REPL), otherwise use spawn\n method = 'fork' if getattr(f, '__module__', None) == '__main__' else 'spawn'\n if method: kwpool['mp_context'] = get_context(method)\n pool = ProcessPoolExecutor\n with pool(n_workers, pause=pause, **kwpool) as ex:\n r = ex.map(f,items, *args, timeout=timeout, chunksize=chunksize, **kwargs)\n if progress and progress_bar:\n if total is None: total = len(items)\n r = progress_bar(r, total=total, leave=False)\n return L(r)"
427375
},
428376
{
429377
"cell_type": "code",
@@ -583,15 +531,7 @@
583531
"execution_count": null,
584532
"metadata": {},
585533
"outputs": [],
586-
"source": [
587-
"#| export\n",
588-
"def run_procs(f, f_done, args):\n",
589-
" \"Call `f` for each item in `args` in parallel, yielding `f_done`\"\n",
590-
" processes = L(args).map(Process, args=arg0, target=f)\n",
591-
" for o in processes: o.start()\n",
592-
" yield from f_done()\n",
593-
" processes.map(Self.join())"
594-
]
534+
"source": "#| export\ndef run_procs(f, f_done, args):\n \"Call `f` for each item in `args` in parallel, yielding `f_done`\"\n Proc = get_context('fork').Process if sys.platform == 'darwin' else Process\n processes = L(args).map(Proc, args=arg0, target=f)\n for o in processes: o.start()\n yield from f_done()\n processes.map(Self.join())"
595535
},
596536
{
597537
"cell_type": "code",

0 commit comments

Comments
 (0)