# HG changeset patch # User Arseniy Alekseyev # Date 2023-01-06 15:17:14 # Node ID 3eef8baf6b924be4a39c9cdf5332848129716699 # Parent 024e0580b8531421db48746aa67e972eb3ba861a worker: avoid reading 1 byte at a time from the OS pipe Apparently `pickle.load` does a lot of small reads, many of them literally 1-byte, so it benefits greatly from buffering. This change enables the buffering, at the cost of more complicated interaction with the `selector` API. On one repository with ~400k files this reduces the time by about 30s, from ~60s to ~30s. The difference is so large because the actual updating work is parallelized, while these small reads are bottlenecking the central hg process. diff --git a/mercurial/worker.py b/mercurial/worker.py --- a/mercurial/worker.py +++ b/mercurial/worker.py @@ -61,45 +61,6 @@ def ismainthread(): return threading.current_thread() == threading.main_thread() -class _blockingreader: - """Wrap unbuffered stream such that pickle.load() works with it. - - pickle.load() expects that calls to read() and readinto() read as many - bytes as requested. On EOF, it is fine to read fewer bytes. In this case, - pickle.load() raises an EOFError. - """ - - def __init__(self, wrapped): - self._wrapped = wrapped - - def readline(self): - return self._wrapped.readline() - - def readinto(self, buf): - pos = 0 - size = len(buf) - - with memoryview(buf) as view: - while pos < size: - with view[pos:] as subview: - ret = self._wrapped.readinto(subview) - if not ret: - break - pos += ret - - return pos - - # issue multiple reads until size is fulfilled (or EOF is encountered) - def read(self, size=-1): - if size < 0: - return self._wrapped.readall() - - buf = bytearray(size) - n_read = self.readinto(buf) - del buf[n_read:] - return bytes(buf) - - if pycompat.isposix or pycompat.iswindows: _STARTUP_COST = 0.01 # The Windows worker is thread based. 
If tasks are CPU bound, threads @@ -276,11 +237,26 @@ def _posixworker(ui, func, staticargs, a selector = selectors.DefaultSelector() for rfd, wfd in pipes: os.close(wfd) - # The stream has to be unbuffered. Otherwise, if all data is read from - # the raw file into the buffer, the selector thinks that the FD is not - # ready to read while pickle.load() could read from the buffer. This - # would delay the processing of readable items. - selector.register(os.fdopen(rfd, 'rb', 0), selectors.EVENT_READ) + # Buffering is needed for performance, but it also presents a problem: + # selector doesn't take the buffered data into account, + # so we have to arrange it so that the buffers are empty when select is called + # (see [peek_nonblock]) + selector.register(os.fdopen(rfd, 'rb', 4096), selectors.EVENT_READ) + + def peek_nonblock(f): + os.set_blocking(f.fileno(), False) + res = f.peek() + os.set_blocking(f.fileno(), True) + return res + + def load_all(f): + # The pytype error likely goes away on a modern version of + # pytype having a modern typeshed snapshot. + # pytype: disable=wrong-arg-types + yield pickle.load(f) + while len(peek_nonblock(f)) > 0: + yield pickle.load(f) + # pytype: enable=wrong-arg-types def cleanup(): signal.signal(signal.SIGINT, oldhandler) @@ -294,15 +270,11 @@ def _posixworker(ui, func, staticargs, a while openpipes > 0: for key, events in selector.select(): try: - # The pytype error likely goes away on a modern version of - # pytype having a modern typeshed snapshot. - # pytype: disable=wrong-arg-types - res = pickle.load(_blockingreader(key.fileobj)) - # pytype: enable=wrong-arg-types - if hasretval and res[0]: - retval.update(res[1]) - else: - yield res + for res in load_all(key.fileobj): + if hasretval and res[0]: + retval.update(res[1]) + else: + yield res except EOFError: selector.unregister(key.fileobj) # pytype: disable=attribute-error