# HG changeset patch
# User Jan Alexander Steffens (heftig)
# Date 2020-02-04 21:07:36
# Node ID 12491abf93bd87b057cb6826e36606afa1cee88a
# Parent c443b9ba6f63e96d30606ee9fa4377cebf1d9f80
worker: manually buffer reads from pickle stream

My previous fix (D8051, cb52e619c99e, which added Python's built-in buffering
to the pickle stream) has the problem that the selector will ignore the buffer.
When multiple pickled objects are read from the pipe into the buffer at once,
only one object will be loaded.

This can repeat until the buffer is full and delays the processing of completed
items until the worker exits, at which point the pipe is always considered
readable and all remaining items are processed.

This changeset reverts D8051, removing the buffer again. Instead, on Python 3
only, we use a wrapper to modify the "read" provided to the Unpickler to behave
more like a buffered read. We never read more bytes from the pipe than the
Unpickler requests, so the selector behaves as expected.

Also add a test case for "pickle data was truncated" issue.

https://phab.mercurial-scm.org/D8051#119193

Differential Revision: https://phab.mercurial-scm.org/D8076

diff --git a/mercurial/worker.py b/mercurial/worker.py
--- a/mercurial/worker.py
+++ b/mercurial/worker.py
@@ -65,6 +65,41 @@ def _numworkers(ui):
     return min(max(countcpus(), 4), 32)
 
 
+if pycompat.ispy3:
+
+    class _blockingreader(object):
+        def __init__(self, wrapped):
+            self._wrapped = wrapped
+
+        def __getattr__(self, attr):
+            return getattr(self._wrapped, attr)
+
+        # issue multiple reads until size is fulfilled
+        def read(self, size=-1):
+            if size < 0:
+                return self._wrapped.readall()
+
+            buf = bytearray(size)
+            view = memoryview(buf)
+            pos = 0
+
+            while pos < size:
+                ret = self._wrapped.readinto(view[pos:])
+                if not ret:
+                    break
+                pos += ret
+
+            del view
+            del buf[pos:]
+            return buf
+
+
+else:
+
+    def _blockingreader(wrapped):
+        return wrapped
+
+
 if pycompat.isposix or pycompat.iswindows:
     _STARTUP_COST = 0.01
     # The Windows worker is thread based. If tasks are CPU bound, threads
@@ -226,7 +261,7 @@ def _posixworker(ui, func, staticargs, a
     selector = selectors.DefaultSelector()
     for rfd, wfd in pipes:
         os.close(wfd)
-        selector.register(os.fdopen(rfd, 'rb'), selectors.EVENT_READ)
+        selector.register(os.fdopen(rfd, 'rb', 0), selectors.EVENT_READ)
 
     def cleanup():
         signal.signal(signal.SIGINT, oldhandler)
@@ -240,7 +275,7 @@ def _posixworker(ui, func, staticargs, a
         while openpipes > 0:
             for key, events in selector.select():
                 try:
-                    res = util.pickle.load(key.fileobj)
+                    res = util.pickle.load(_blockingreader(key.fileobj))
                     if hasretval and res[0]:
                         retval.update(res[1])
                     else:
diff --git a/tests/test-worker.t b/tests/test-worker.t
--- a/tests/test-worker.t
+++ b/tests/test-worker.t
@@ -131,4 +131,35 @@ Workers should not do cleanups in all ca
   abort: known exception
   [255]
 
+Do not crash on partially read result
+
+  $ cat > $TESTTMP/detecttruncated.py << EOF
+  > from __future__ import absolute_import
+  > import os
+  > import sys
+  > import time
+  > sys.unraisablehook = lambda x: None
+  > oldwrite = os.write
+  > def splitwrite(fd, string):
+  >     ret = oldwrite(fd, string[:9])
+  >     if ret == 9:
+  >         time.sleep(0.1)
+  >         ret += oldwrite(fd, string[9:])
+  >     return ret
+  > os.write = splitwrite
+  > EOF
+
+  $ hg --config "extensions.t=$abspath" --config worker.numcpus=8 --config \
+  > "extensions.d=$TESTTMP/detecttruncated.py" test 100000.0
+  start
+  run
+  run
+  run
+  run
+  run
+  run
+  run
+  run
+  done
+
 #endif