Skip to content

Commit 29100c8

Browse files
[3.13] gh-142206: multiprocessing.resource_tracker: Decode messages using older protocol (GH-142215) (#142287)
[3.14] gh-142206: multiprocessing.resource_tracker: Decode messages using older protocol (GH-142215) (GH-142285) (cherry picked from commit 4172644) Difference from the original commit: the default in 3.14 is to use the simpler original protocol (except for filenames with newlines). (cherry picked from commit f130b06) Co-authored-by: Petr Viktorin <encukou@gmail.com>
1 parent e68066e commit 29100c8

File tree

3 files changed

+76
-21
lines changed

3 files changed

+76
-21
lines changed

Lib/multiprocessing/resource_tracker.py

Lines changed: 45 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,13 @@ def __init__(self):
7272
self._exitcode = None
7373
self._reentrant_messages = deque()
7474

75+
# True to use colon-separated lines, rather than JSON lines,
76+
# for internal communication. (Mainly for testing).
77+
# Filenames not supported by the simple format will always be sent
78+
# using JSON.
79+
# The reader should understand all formats.
80+
self._use_simple_format = True
81+
7582
def _reentrant_call_error(self):
7683
# gh-109629: this happens if an explicit call to the ResourceTracker
7784
# gets interrupted by a garbage collection, invoking a finalizer (*)
@@ -204,7 +211,9 @@ def _launch(self):
204211
os.close(r)
205212

206213
def _make_probe_message(self):
207-
"""Return a JSON-encoded probe message."""
214+
"""Return a probe message."""
215+
if self._use_simple_format:
216+
return b'PROBE:0:noop\n'
208217
return (
209218
json.dumps(
210219
{"cmd": "PROBE", "rtype": "noop"},
@@ -271,6 +280,15 @@ def _write(self, msg):
271280
assert nbytes == len(msg), f"{nbytes=} != {len(msg)=}"
272281

273282
def _send(self, cmd, name, rtype):
283+
if self._use_simple_format and '\n' not in name:
284+
msg = f"{cmd}:{name}:{rtype}\n".encode("ascii")
285+
if len(msg) > 512:
286+
# posix guarantees that writes to a pipe of less than PIPE_BUF
287+
# bytes are atomic, and that PIPE_BUF >= 512
288+
raise ValueError('msg too long')
289+
self._ensure_running_and_write(msg)
290+
return
291+
274292
# POSIX guarantees that writes to a pipe of less than PIPE_BUF (at least 512)
275293
# bytes are atomic. Therefore, we want the message to be shorter than 512 bytes.
276294
# POSIX shm_open() and sem_open() require the name, including its leading slash,
@@ -290,6 +308,7 @@ def _send(self, cmd, name, rtype):
290308

291309
# The entire JSON message is guaranteed < PIPE_BUF (512 bytes) by construction.
292310
assert len(msg) <= 512, f"internal error: message too long ({len(msg)} bytes)"
311+
assert msg.startswith(b'{')
293312

294313
self._ensure_running_and_write(msg)
295314

@@ -300,6 +319,30 @@ def _send(self, cmd, name, rtype):
300319
getfd = _resource_tracker.getfd
301320

302321

322+
def _decode_message(line):
323+
if line.startswith(b'{'):
324+
try:
325+
obj = json.loads(line.decode('ascii'))
326+
except Exception as e:
327+
raise ValueError("malformed resource_tracker message: %r" % (line,)) from e
328+
329+
cmd = obj["cmd"]
330+
rtype = obj["rtype"]
331+
b64 = obj.get("base64_name", "")
332+
333+
if not isinstance(cmd, str) or not isinstance(rtype, str) or not isinstance(b64, str):
334+
raise ValueError("malformed resource_tracker fields: %r" % (obj,))
335+
336+
try:
337+
name = base64.urlsafe_b64decode(b64).decode('utf-8', 'surrogateescape')
338+
except ValueError as e:
339+
raise ValueError("malformed resource_tracker base64_name: %r" % (b64,)) from e
340+
else:
341+
cmd, rest = line.strip().decode('ascii').split(':', maxsplit=1)
342+
name, rtype = rest.rsplit(':', maxsplit=1)
343+
return cmd, rtype, name
344+
345+
303346
def main(fd):
304347
'''Run resource tracker.'''
305348
# protect the process from ^C and "killall python" etc
@@ -322,23 +365,7 @@ def main(fd):
322365
with open(fd, 'rb') as f:
323366
for line in f:
324367
try:
325-
try:
326-
obj = json.loads(line.decode('ascii'))
327-
except Exception as e:
328-
raise ValueError("malformed resource_tracker message: %r" % (line,)) from e
329-
330-
cmd = obj["cmd"]
331-
rtype = obj["rtype"]
332-
b64 = obj.get("base64_name", "")
333-
334-
if not isinstance(cmd, str) or not isinstance(rtype, str) or not isinstance(b64, str):
335-
raise ValueError("malformed resource_tracker fields: %r" % (obj,))
336-
337-
try:
338-
name = base64.urlsafe_b64decode(b64).decode('utf-8', 'surrogateescape')
339-
except ValueError as e:
340-
raise ValueError("malformed resource_tracker base64_name: %r" % (b64,)) from e
341-
368+
cmd, rtype, name = _decode_message(line)
342369
cleanup_func = _CLEANUP_FUNCS.get(rtype, None)
343370
if cleanup_func is None:
344371
raise ValueError(

Lib/test/_test_multiprocessing.py

Lines changed: 24 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@
3838
from test.support import socket_helper
3939
from test.support import threading_helper
4040
from test.support import warnings_helper
41-
41+
from test.support import subTests
4242

4343
# Skip tests if _multiprocessing wasn't built.
4444
_multiprocessing = import_helper.import_module('_multiprocessing')
@@ -4141,6 +4141,19 @@ def test_copy(self):
41414141
self.assertEqual(bar.z, 2 ** 33)
41424142

41434143

4144+
def resource_tracker_format_subtests(func):
    """Run given test using both resource tracker communication formats.

    The decorated test is called once per format, each time inside its own
    subTest and with the tracker's ``_use_simple_format`` attribute patched
    to the value under test.
    """
    # Imported locally so the decorator does not rely on a module-level import.
    import functools

    # Keep the test's own name and docstring on the wrapper.
    @functools.wraps(func)
    def _inner(self, *args, **kwargs):
        tracker = resource_tracker._resource_tracker
        for use_simple_format in False, True:
            with (
                self.subTest(use_simple_format=use_simple_format),
                unittest.mock.patch.object(
                    tracker, '_use_simple_format', use_simple_format)
            ):
                func(self, *args, **kwargs)
    return _inner
4156+
41444157
@unittest.skipUnless(HAS_SHMEM, "requires multiprocessing.shared_memory")
41454158
@hashlib_helper.requires_hashdigest('sha256')
41464159
class _TestSharedMemory(BaseTestCase):
@@ -4418,6 +4431,7 @@ def test_shared_memory_SharedMemoryServer_ignores_sigint(self):
44184431
smm.shutdown()
44194432

44204433
@unittest.skipIf(os.name != "posix", "resource_tracker is posix only")
4434+
@resource_tracker_format_subtests
44214435
def test_shared_memory_SharedMemoryManager_reuses_resource_tracker(self):
44224436
# bpo-36867: test that a SharedMemoryManager uses the
44234437
# same resource_tracker process as its parent.
@@ -4668,6 +4682,7 @@ def test_shared_memory_cleaned_after_process_termination(self):
46684682
"shared_memory objects to clean up at shutdown", err)
46694683

46704684
@unittest.skipIf(os.name != "posix", "resource_tracker is posix only")
4685+
@resource_tracker_format_subtests
46714686
def test_shared_memory_untracking(self):
46724687
# gh-82300: When a separate Python process accesses shared memory
46734688
# with track=False, it must not cause the memory to be deleted
@@ -4695,6 +4710,7 @@ def test_shared_memory_untracking(self):
46954710
mem.close()
46964711

46974712
@unittest.skipIf(os.name != "posix", "resource_tracker is posix only")
4713+
@resource_tracker_format_subtests
46984714
def test_shared_memory_tracking(self):
46994715
# gh-82300: When a separate Python process accesses shared memory
47004716
# with track=True, it must cause the memory to be deleted when
@@ -6794,13 +6810,18 @@ class SemLock(_multiprocessing.SemLock):
67946810

67956811
@unittest.skipUnless(HAS_SHMEM, "requires multiprocessing.shared_memory")
67966812
class TestSharedMemoryNames(unittest.TestCase):
6797-
def test_that_shared_memory_name_with_colons_has_no_resource_tracker_errors(self):
6813+
@subTests('use_simple_format', (True, False))
6814+
def test_that_shared_memory_name_with_colons_has_no_resource_tracker_errors(
6815+
self, use_simple_format):
67986816
# Test script that creates and cleans up shared memory with colon in name
67996817
test_script = textwrap.dedent("""
68006818
import sys
68016819
from multiprocessing import shared_memory
6820+
from multiprocessing import resource_tracker
68026821
import time
68036822
6823+
resource_tracker._resource_tracker._use_simple_format = %s
6824+
68046825
# Test various patterns of colons in names
68056826
test_names = [
68066827
"a:b",
@@ -6828,7 +6849,7 @@ def test_that_shared_memory_name_with_colons_has_no_resource_tracker_errors(self
68286849
sys.exit(1)
68296850
68306851
print("SUCCESS")
6831-
""")
6852+
""" % use_simple_format)
68326853

68336854
rc, out, err = script_helper.assert_python_ok("-c", test_script)
68346855
self.assertIn(b"SUCCESS", out)
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
The resource tracker in the :mod:`multiprocessing` module now uses the
2+
original communication protocol, as in Python 3.14.0 and below,
3+
by default.
4+
This avoids issues with upgrading Python while it is running.
5+
(Note that such 'in-place' upgrades are not tested.)
6+
The tracker remains compatible with subprocesses that use the new protocol
7+
(that is, subprocesses using Python 3.13.10, 3.14.1 and 3.15).

0 commit comments

Comments
 (0)