worker: avoid reading 1 byte at a time from the OS pipe...
Arseniy Alekseyev -
r50794:3eef8baf default
@@ -1,472 +1,444 b''
1 1 # worker.py - master-slave parallelism support
2 2 #
3 3 # Copyright 2013 Facebook, Inc.
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8
9 9 import os
10 10 import pickle
11 11 import selectors
12 12 import signal
13 13 import sys
14 14 import threading
15 15 import time
16 16
17 17 from .i18n import _
18 18 from . import (
19 19 encoding,
20 20 error,
21 21 pycompat,
22 22 scmutil,
23 23 )
24 24
25 25
26 26 def countcpus():
27 27 '''try to count the number of CPUs on the system'''
28 28
29 29 # posix
30 30 try:
31 31 n = int(os.sysconf('SC_NPROCESSORS_ONLN'))
32 32 if n > 0:
33 33 return n
34 34 except (AttributeError, ValueError):
35 35 pass
36 36
37 37 # windows
38 38 try:
39 39 n = int(encoding.environ[b'NUMBER_OF_PROCESSORS'])
40 40 if n > 0:
41 41 return n
42 42 except (KeyError, ValueError):
43 43 pass
44 44
45 45 return 1
46 46
47 47
48 48 def _numworkers(ui):
49 49 s = ui.config(b'worker', b'numcpus')
50 50 if s:
51 51 try:
52 52 n = int(s)
53 53 if n >= 1:
54 54 return n
55 55 except ValueError:
56 56 raise error.Abort(_(b'number of cpus must be an integer'))
57 57 return min(max(countcpus(), 4), 32)
58 58
59 59
60 60 def ismainthread():
61 61 return threading.current_thread() == threading.main_thread()
62 62
63 63
64 class _blockingreader:
65 """Wrap unbuffered stream such that pickle.load() works with it.
66
67 pickle.load() expects that calls to read() and readinto() read as many
68 bytes as requested. On EOF, it is fine to read fewer bytes. In this case,
69 pickle.load() raises an EOFError.
70 """
71
72 def __init__(self, wrapped):
73 self._wrapped = wrapped
74
75 def readline(self):
76 return self._wrapped.readline()
77
78 def readinto(self, buf):
79 pos = 0
80 size = len(buf)
81
82 with memoryview(buf) as view:
83 while pos < size:
84 with view[pos:] as subview:
85 ret = self._wrapped.readinto(subview)
86 if not ret:
87 break
88 pos += ret
89
90 return pos
91
92 # issue multiple reads until size is fulfilled (or EOF is encountered)
93 def read(self, size=-1):
94 if size < 0:
95 return self._wrapped.readall()
96
97 buf = bytearray(size)
98 n_read = self.readinto(buf)
99 del buf[n_read:]
100 return bytes(buf)
101
102
103 64 if pycompat.isposix or pycompat.iswindows:
104 65 _STARTUP_COST = 0.01
105 66 # The Windows worker is thread based. If tasks are CPU bound, threads
106 67 # in the presence of the GIL result in excessive context switching and
107 68 # this overhead can slow down execution.
108 69 _DISALLOW_THREAD_UNSAFE = pycompat.iswindows
109 70 else:
110 71 _STARTUP_COST = 1e30
111 72 _DISALLOW_THREAD_UNSAFE = False
112 73
113 74
114 75 def worthwhile(ui, costperop, nops, threadsafe=True):
115 76 """try to determine whether the benefit of multiple processes can
116 77 outweigh the cost of starting them"""
117 78
118 79 if not threadsafe and _DISALLOW_THREAD_UNSAFE:
119 80 return False
120 81
121 82 linear = costperop * nops
122 83 workers = _numworkers(ui)
123 84 benefit = linear - (_STARTUP_COST * workers + linear / workers)
124 85 return benefit >= 0.15
125 86
126 87
127 88 def worker(
128 89 ui,
129 90 costperarg,
130 91 func,
131 92 staticargs,
132 93 args,
133 94 hasretval=False,
134 95 threadsafe=True,
135 96 prefork=None,
136 97 ):
137 98 """run a function, possibly in parallel in multiple worker
138 99 processes.
139 100
140 101 returns a progress iterator
141 102
142 103 costperarg - cost of a single task
143 104
144 105 func - function to run. It is expected to return a progress iterator.
145 106
146 107 staticargs - arguments to pass to every invocation of the function
147 108
148 109 args - arguments to split into chunks, to pass to individual
149 110 workers
150 111
151 112 hasretval - when True, func and the current function return a progress
152 113 iterator then a dict (encoded as an iterator that yields many (False, ..)
153 114 then a (True, dict)). The dicts are joined in some arbitrary order, so
154 115 overlapping keys are a bad idea.
155 116
156 117 threadsafe - whether work items are thread safe and can be executed using
157 118 a thread-based worker. Should be disabled for CPU heavy tasks that don't
158 119 release the GIL.
159 120
160 121 prefork - a parameterless Callable that is invoked prior to forking the
161 122 process. fork() is only used on non-Windows platforms, but is also not
162 123 called on POSIX platforms if the work amount doesn't warrant a worker.
163 124 """
164 125 enabled = ui.configbool(b'worker', b'enabled')
165 126 if enabled and _platformworker is _posixworker and not ismainthread():
166 127 # The POSIX worker has to install a handler for SIGCHLD.
167 128 # Python up to 3.9 only allows this in the main thread.
168 129 enabled = False
169 130
170 131 if enabled and worthwhile(ui, costperarg, len(args), threadsafe=threadsafe):
171 132 return _platformworker(
172 133 ui, func, staticargs, args, hasretval, prefork=prefork
173 134 )
174 135 return func(*staticargs + (args,))
175 136
176 137
177 138 def _posixworker(ui, func, staticargs, args, hasretval, prefork=None):
178 139 workers = _numworkers(ui)
179 140 oldhandler = signal.getsignal(signal.SIGINT)
180 141 signal.signal(signal.SIGINT, signal.SIG_IGN)
181 142 pids, problem = set(), [0]
182 143
183 144 def killworkers():
184 145 # unregister SIGCHLD handler as all children will be killed. This
185 146 # function shouldn't be interrupted by another SIGCHLD; otherwise pids
186 147 # could be updated while iterating, which would cause inconsistency.
187 148 signal.signal(signal.SIGCHLD, oldchldhandler)
188 149 # if one worker bails, there's no good reason to wait for the rest
189 150 for p in pids:
190 151 try:
191 152 os.kill(p, signal.SIGTERM)
192 153 except ProcessLookupError:
193 154 pass
194 155
195 156 def waitforworkers(blocking=True):
196 157 for pid in pids.copy():
197 158 p = st = 0
198 159 try:
199 160 p, st = os.waitpid(pid, (0 if blocking else os.WNOHANG))
200 161 except ChildProcessError:
201 162 # child would already be reaped, but pids not yet
202 163 # updated (maybe interrupted just after waitpid)
203 164 pids.discard(pid)
204 165 if not p:
205 166 # skip subsequent steps, because the child process should
206 167 # still be running in this case
207 168 continue
208 169 pids.discard(p)
209 170 st = _exitstatus(st)
210 171 if st and not problem[0]:
211 172 problem[0] = st
212 173
213 174 def sigchldhandler(signum, frame):
214 175 waitforworkers(blocking=False)
215 176 if problem[0]:
216 177 killworkers()
217 178
218 179 oldchldhandler = signal.signal(signal.SIGCHLD, sigchldhandler)
219 180 ui.flush()
220 181 parentpid = os.getpid()
221 182 pipes = []
222 183 retval = {}
223 184
224 185 if prefork:
225 186 prefork()
226 187
227 188 for pargs in partition(args, min(workers, len(args))):
228 189 # Every worker gets its own pipe to send results on, so we don't have to
229 190 # implement atomic writes larger than PIPE_BUF. Each forked process has
230 191 # its own pipe's descriptors in the local variables, and the parent
231 192 # process has the full list of pipe descriptors (and it doesn't really
232 193 # care what order they're in).
233 194 rfd, wfd = os.pipe()
234 195 pipes.append((rfd, wfd))
235 196 # make sure we use os._exit in all worker code paths. otherwise the
236 197 # worker may do some clean-ups which could cause surprises like
237 198 # deadlock. see sshpeer.cleanup for example.
238 199 # override error handling *before* fork. this is necessary because
239 200 # exception (signal) may arrive after fork, before "pid =" assignment
240 201 # completes, and other exception handler (dispatch.py) can lead to
241 202 # unexpected code path without os._exit.
242 203 ret = -1
243 204 try:
244 205 pid = os.fork()
245 206 if pid == 0:
246 207 signal.signal(signal.SIGINT, oldhandler)
247 208 signal.signal(signal.SIGCHLD, oldchldhandler)
248 209
249 210 def workerfunc():
250 211 for r, w in pipes[:-1]:
251 212 os.close(r)
252 213 os.close(w)
253 214 os.close(rfd)
254 215 with os.fdopen(wfd, 'wb') as wf:
255 216 for result in func(*(staticargs + (pargs,))):
256 217 pickle.dump(result, wf)
257 218 wf.flush()
258 219 return 0
259 220
260 221 ret = scmutil.callcatch(ui, workerfunc)
261 222 except: # parent re-raises, child never returns
262 223 if os.getpid() == parentpid:
263 224 raise
264 225 exctype = sys.exc_info()[0]
265 226 force = not issubclass(exctype, KeyboardInterrupt)
266 227 ui.traceback(force=force)
267 228 finally:
268 229 if os.getpid() != parentpid:
269 230 try:
270 231 ui.flush()
271 232 except: # never returns, no re-raises
272 233 pass
273 234 finally:
274 235 os._exit(ret & 255)
275 236 pids.add(pid)
276 237 selector = selectors.DefaultSelector()
277 238 for rfd, wfd in pipes:
278 239 os.close(wfd)
279 # The stream has to be unbuffered. Otherwise, if all data is read from
280 # the raw file into the buffer, the selector thinks that the FD is not
281 # ready to read while pickle.load() could read from the buffer. This
282 # would delay the processing of readable items.
283 selector.register(os.fdopen(rfd, 'rb', 0), selectors.EVENT_READ)
240 # Buffering is needed for performance, but it also presents a problem:
241 # the selector doesn't take the buffered data into account,
242 # so we have to arrange for the buffers to be empty when select is called
243 # (see [peek_nonblock])
244 selector.register(os.fdopen(rfd, 'rb', 4096), selectors.EVENT_READ)
245
246 def peek_nonblock(f):
247 os.set_blocking(f.fileno(), False)
248 res = f.peek()
249 os.set_blocking(f.fileno(), True)
250 return res
251
252 def load_all(f):
253 # The pytype error likely goes away on a modern version of
254 # pytype having a modern typeshed snapshot.
255 # pytype: disable=wrong-arg-types
256 yield pickle.load(f)
257 while len(peek_nonblock(f)) > 0:
258 yield pickle.load(f)
259 # pytype: enable=wrong-arg-types
284 260
285 261 def cleanup():
286 262 signal.signal(signal.SIGINT, oldhandler)
287 263 waitforworkers()
288 264 signal.signal(signal.SIGCHLD, oldchldhandler)
289 265 selector.close()
290 266 return problem[0]
291 267
292 268 try:
293 269 openpipes = len(pipes)
294 270 while openpipes > 0:
295 271 for key, events in selector.select():
296 272 try:
297 # The pytype error likely goes away on a modern version of
298 # pytype having a modern typeshed snapshot.
299 # pytype: disable=wrong-arg-types
300 res = pickle.load(_blockingreader(key.fileobj))
301 # pytype: enable=wrong-arg-types
302 if hasretval and res[0]:
303 retval.update(res[1])
304 else:
305 yield res
273 for res in load_all(key.fileobj):
274 if hasretval and res[0]:
275 retval.update(res[1])
276 else:
277 yield res
306 278 except EOFError:
307 279 selector.unregister(key.fileobj)
308 280 # pytype: disable=attribute-error
309 281 key.fileobj.close()
310 282 # pytype: enable=attribute-error
311 283 openpipes -= 1
312 284 except: # re-raises
313 285 killworkers()
314 286 cleanup()
315 287 raise
316 288 status = cleanup()
317 289 if status:
318 290 if status < 0:
319 291 os.kill(os.getpid(), -status)
320 292 raise error.WorkerError(status)
321 293 if hasretval:
322 294 yield True, retval
323 295
324 296
325 297 def _posixexitstatus(code):
326 298 """convert a posix exit status into the same form returned by
327 299 os.spawnv
328 300
329 301 returns None if the process was stopped instead of exiting"""
330 302 if os.WIFEXITED(code):
331 303 return os.WEXITSTATUS(code)
332 304 elif os.WIFSIGNALED(code):
333 305 return -(os.WTERMSIG(code))
334 306
335 307
336 308 def _windowsworker(ui, func, staticargs, args, hasretval, prefork=None):
337 309 class Worker(threading.Thread):
338 310 def __init__(
339 311 self, taskqueue, resultqueue, func, staticargs, *args, **kwargs
340 312 ):
341 313 threading.Thread.__init__(self, *args, **kwargs)
342 314 self._taskqueue = taskqueue
343 315 self._resultqueue = resultqueue
344 316 self._func = func
345 317 self._staticargs = staticargs
346 318 self._interrupted = False
347 319 self.daemon = True
348 320 self.exception = None
349 321
350 322 def interrupt(self):
351 323 self._interrupted = True
352 324
353 325 def run(self):
354 326 try:
355 327 while not self._taskqueue.empty():
356 328 try:
357 329 args = self._taskqueue.get_nowait()
358 330 for res in self._func(*self._staticargs + (args,)):
359 331 self._resultqueue.put(res)
360 332 # threading doesn't provide a native way to
361 333 # interrupt execution. handle it manually at every
362 334 # iteration.
363 335 if self._interrupted:
364 336 return
365 337 except pycompat.queue.Empty:
366 338 break
367 339 except Exception as e:
368 340 # store the exception such that the main thread can resurface
369 341 # it as if the func was running without workers.
370 342 self.exception = e
371 343 raise
372 344
373 345 threads = []
374 346
375 347 def trykillworkers():
376 348 # Allow up to 1 second to clean worker threads nicely
377 349 cleanupend = time.time() + 1
378 350 for t in threads:
379 351 t.interrupt()
380 352 for t in threads:
381 353 remainingtime = cleanupend - time.time()
382 354 t.join(remainingtime)
383 355 if t.is_alive():
384 356 # pass over the failure to join worker threads. it is more
385 357 # important to surface the initial exception than the
386 358 # fact that one of the workers may be processing a large
387 359 # task and does not get to handle the interruption.
388 360 ui.warn(
389 361 _(
390 362 b"failed to kill worker threads while "
391 363 b"handling an exception\n"
392 364 )
393 365 )
394 366 return
395 367
396 368 workers = _numworkers(ui)
397 369 resultqueue = pycompat.queue.Queue()
398 370 taskqueue = pycompat.queue.Queue()
399 371 retval = {}
400 372 # partition work to more pieces than workers to minimize the chance
401 373 # of uneven distribution of large tasks between the workers
402 374 for pargs in partition(args, workers * 20):
403 375 taskqueue.put(pargs)
404 376 for _i in range(workers):
405 377 t = Worker(taskqueue, resultqueue, func, staticargs)
406 378 threads.append(t)
407 379 t.start()
408 380 try:
409 381 while len(threads) > 0:
410 382 while not resultqueue.empty():
411 383 res = resultqueue.get()
412 384 if hasretval and res[0]:
413 385 retval.update(res[1])
414 386 else:
415 387 yield res
416 388 threads[0].join(0.05)
417 389 finishedthreads = [_t for _t in threads if not _t.is_alive()]
418 390 for t in finishedthreads:
419 391 if t.exception is not None:
420 392 raise t.exception
421 393 threads.remove(t)
422 394 except (Exception, KeyboardInterrupt): # re-raises
423 395 trykillworkers()
424 396 raise
425 397 while not resultqueue.empty():
426 398 res = resultqueue.get()
427 399 if hasretval and res[0]:
428 400 retval.update(res[1])
429 401 else:
430 402 yield res
431 403 if hasretval:
432 404 yield True, retval
433 405
434 406
435 407 if pycompat.iswindows:
436 408 _platformworker = _windowsworker
437 409 else:
438 410 _platformworker = _posixworker
439 411 _exitstatus = _posixexitstatus
440 412
441 413
442 414 def partition(lst, nslices):
443 415 """partition a list into N slices of roughly equal size
444 416
445 417 The current strategy takes every Nth element from the input. If
446 418 we ever write workers that need to preserve grouping in input
447 419 we should consider allowing callers to specify a partition strategy.
448 420
449 421 olivia is not a fan of this partitioning strategy when files are involved.
450 422 In his words:
451 423
452 424 Single-threaded Mercurial makes a point of creating and visiting
453 425 files in a fixed order (alphabetical). When creating files in order,
454 426 a typical filesystem is likely to allocate them on nearby regions on
455 427 disk. Thus, when revisiting in the same order, locality is maximized
456 428 and various forms of OS and disk-level caching and read-ahead get a
457 429 chance to work.
458 430
459 431 This effect can be quite significant on spinning disks. I discovered it
460 432 circa Mercurial v0.4 when revlogs were named by hashes of filenames.
461 433 Tarring a repo and copying it to another disk effectively randomized
462 434 the revlog ordering on disk by sorting the revlogs by hash and suddenly
463 435 performance of my kernel checkout benchmark dropped by ~10x because the
464 436 "working set" of sectors visited no longer fit in the drive's cache and
465 437 the workload switched from streaming to random I/O.
466 438
467 439 What we should really be doing is have workers read filenames from an
468 440 ordered queue. This preserves locality and also keeps any worker from
469 441 getting more than one file out of balance.
470 442 """
471 443 for i in range(nslices):
472 444 yield lst[i::nslices]
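
To make the new read path above easier to follow outside the diff, here is a minimal standalone sketch (an illustrative assumption, not Mercurial API) of the same pattern: the parent registers a buffered reader for each worker pipe with a selector, and after every wakeup it drains both the pipe and the userspace buffer via a non-blocking peek(), so no pickled result is left sitting in a buffer while the selector reports the descriptor as idle. The helper names (_peek_nonblock, _load_all, parent_reads) are hypothetical.

# Hypothetical sketch of the buffered-read-plus-peek pattern used above.
import os
import pickle
import selectors


def _peek_nonblock(f):
    # Temporarily make the fd non-blocking so peek() only reports data that
    # is already available (in the buffer or the pipe) and never blocks.
    os.set_blocking(f.fileno(), False)
    try:
        return f.peek()
    finally:
        os.set_blocking(f.fileno(), True)


def _load_all(f):
    # One blocking load (the selector said the fd is readable), then keep
    # loading while data is still pending, so the buffer is empty by the
    # time we go back to select().
    yield pickle.load(f)
    while len(_peek_nonblock(f)) > 0:
        yield pickle.load(f)


def parent_reads(rfd):
    # Buffered reader (4096 bytes) instead of the old unbuffered stream.
    reader = os.fdopen(rfd, 'rb', 4096)
    selector = selectors.DefaultSelector()
    selector.register(reader, selectors.EVENT_READ)
    results = []
    open_readers = 1
    while open_readers > 0:
        for key, _events in selector.select():
            try:
                for res in _load_all(key.fileobj):
                    results.append(res)
            except EOFError:
                # The writer closed its end and everything has been read.
                selector.unregister(key.fileobj)
                key.fileobj.close()
                open_readers -= 1
    selector.close()
    return results


if __name__ == '__main__':
    rfd, wfd = os.pipe()
    pid = os.fork()  # POSIX only, mirroring the _posixworker code path
    if pid == 0:
        os.close(rfd)
        with os.fdopen(wfd, 'wb') as wf:
            for i in range(5):
                pickle.dump((False, i), wf)
                wf.flush()
        os._exit(0)
    os.close(wfd)
    print(parent_reads(rfd))
    os.waitpid(pid, 0)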