##// END OF EJS Templates
worker: avoid reading 1 byte at a time from the OS pipe...
Arseniy Alekseyev -
r50794:3eef8baf default
parent child Browse files
Show More
@@ -61,45 +61,6 b' def ismainthread():'
61 return threading.current_thread() == threading.main_thread()
61 return threading.current_thread() == threading.main_thread()
62
62
63
63
64 class _blockingreader:
65 """Wrap unbuffered stream such that pickle.load() works with it.
66
67 pickle.load() expects that calls to read() and readinto() read as many
68 bytes as requested. On EOF, it is fine to read fewer bytes. In this case,
69 pickle.load() raises an EOFError.
70 """
71
72 def __init__(self, wrapped):
73 self._wrapped = wrapped
74
75 def readline(self):
76 return self._wrapped.readline()
77
78 def readinto(self, buf):
79 pos = 0
80 size = len(buf)
81
82 with memoryview(buf) as view:
83 while pos < size:
84 with view[pos:] as subview:
85 ret = self._wrapped.readinto(subview)
86 if not ret:
87 break
88 pos += ret
89
90 return pos
91
92 # issue multiple reads until size is fulfilled (or EOF is encountered)
93 def read(self, size=-1):
94 if size < 0:
95 return self._wrapped.readall()
96
97 buf = bytearray(size)
98 n_read = self.readinto(buf)
99 del buf[n_read:]
100 return bytes(buf)
101
102
103 if pycompat.isposix or pycompat.iswindows:
64 if pycompat.isposix or pycompat.iswindows:
104 _STARTUP_COST = 0.01
65 _STARTUP_COST = 0.01
105 # The Windows worker is thread based. If tasks are CPU bound, threads
66 # The Windows worker is thread based. If tasks are CPU bound, threads
@@ -276,11 +237,26 b' def _posixworker(ui, func, staticargs, a'
276 selector = selectors.DefaultSelector()
237 selector = selectors.DefaultSelector()
277 for rfd, wfd in pipes:
238 for rfd, wfd in pipes:
278 os.close(wfd)
239 os.close(wfd)
279 # The stream has to be unbuffered. Otherwise, if all data is read from
240 # Buffering is needed for performance, but it also presents a problem:
280 # the raw file into the buffer, the selector thinks that the FD is not
241 # selector doesn't take the buffered data into account,
281 # ready to read while pickle.load() could read from the buffer. This
242 # so we have to arrange it so that the buffers are empty when select is called
282 # would delay the processing of readable items.
243 # (see [peek_nonblock])
283 selector.register(os.fdopen(rfd, 'rb', 0), selectors.EVENT_READ)
244 selector.register(os.fdopen(rfd, 'rb', 4096), selectors.EVENT_READ)
245
246 def peek_nonblock(f):
247 os.set_blocking(f.fileno(), False)
248 res = f.peek()
249 os.set_blocking(f.fileno(), True)
250 return res
251
252 def load_all(f):
253 # The pytype error likely goes away on a modern version of
254 # pytype having a modern typeshed snapshot.
255 # pytype: disable=wrong-arg-types
256 yield pickle.load(f)
257 while len(peek_nonblock(f)) > 0:
258 yield pickle.load(f)
259 # pytype: enable=wrong-arg-types
284
260
285 def cleanup():
261 def cleanup():
286 signal.signal(signal.SIGINT, oldhandler)
262 signal.signal(signal.SIGINT, oldhandler)
@@ -294,15 +270,11 b' def _posixworker(ui, func, staticargs, a'
294 while openpipes > 0:
270 while openpipes > 0:
295 for key, events in selector.select():
271 for key, events in selector.select():
296 try:
272 try:
297 # The pytype error likely goes away on a modern version of
273 for res in load_all(key.fileobj):
298 # pytype having a modern typeshed snapshot.
274 if hasretval and res[0]:
299 # pytype: disable=wrong-arg-types
275 retval.update(res[1])
300 res = pickle.load(_blockingreader(key.fileobj))
276 else:
301 # pytype: enable=wrong-arg-types
277 yield res
302 if hasretval and res[0]:
303 retval.update(res[1])
304 else:
305 yield res
306 except EOFError:
278 except EOFError:
307 selector.unregister(key.fileobj)
279 selector.unregister(key.fileobj)
308 # pytype: disable=attribute-error
280 # pytype: disable=attribute-error
General Comments 0
You need to be logged in to leave comments. Login now