##// END OF EJS Templates
worker: explain why pickle reading stream has to be unbuffered
Manuel Jacob -
r50125:5d28246b default
parent child Browse files
Show More
@@ -1,474 +1,478 b''
1 1 # worker.py - master-slave parallelism support
2 2 #
3 3 # Copyright 2013 Facebook, Inc.
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8
9 9 import errno
10 10 import os
11 11 import pickle
12 12 import signal
13 13 import sys
14 14 import threading
15 15 import time
16 16
17 17 try:
18 18 import selectors
19 19
20 20 selectors.BaseSelector
21 21 except ImportError:
22 22 from .thirdparty import selectors2 as selectors
23 23
24 24 from .i18n import _
25 25 from . import (
26 26 encoding,
27 27 error,
28 28 pycompat,
29 29 scmutil,
30 30 )
31 31
32 32
def countcpus():
    '''try to count the number of CPUs on the system'''

    # posix: sysconf advertises the number of online processors
    try:
        ncpus = int(os.sysconf('SC_NPROCESSORS_ONLN'))
        if ncpus > 0:
            return ncpus
    except (AttributeError, ValueError):
        # sysconf missing, name unknown, or a non-numeric answer
        pass

    # windows: the processor count lives in the environment
    try:
        ncpus = int(encoding.environ[b'NUMBER_OF_PROCESSORS'])
        if ncpus > 0:
            return ncpus
    except (KeyError, ValueError):
        pass

    # detection failed on both paths; be conservative
    return 1
53 53
54 54
def _numworkers(ui):
    """Return how many workers to use, honoring the worker.numcpus config."""
    configured = ui.config(b'worker', b'numcpus')
    if configured:
        try:
            requested = int(configured)
            if requested >= 1:
                return requested
        except ValueError:
            raise error.Abort(_(b'number of cpus must be an integer'))
    # no (usable) config value: detect, but stay within [4, 32]
    return min(max(countcpus(), 4), 32)
65 65
66 66
def ismainthread():
    """Report whether the caller is running on the interpreter's main thread."""
    return threading.main_thread() == threading.current_thread()
69 69
70 70
class _blockingreader:
    """Adapt an unbuffered stream so the unpickler always sees full reads.

    A raw pipe may return fewer bytes than requested; pickle treats a short
    read as EOF. This wrapper keeps issuing reads until the requested size
    is satisfied or the underlying stream is truly exhausted.
    """

    def __init__(self, wrapped):
        self._wrapped = wrapped

    # Do NOT implement readinto() by making it delegate to
    # _wrapped.readinto(), since that is unbuffered. The unpickler is fine
    # with just read() and readline(), so we don't need to implement it.

    if (3, 8, 0) <= sys.version_info[:3] < (3, 8, 2):

        # This is required for python 3.8, prior to 3.8.2. See issue6444.
        def readinto(self, b):
            total = len(b)
            filled = 0
            while filled < total:
                count = self._wrapped.readinto(b[filled:])
                if not count:
                    break
                filled += count
            return filled

    def readline(self):
        return self._wrapped.readline()

    # issue multiple reads until size is fulfilled
    def read(self, size=-1):
        if size < 0:
            # negative size means "everything that is left"
            return self._wrapped.readall()

        data = bytearray(size)
        window = memoryview(data)
        filled = 0

        while filled < size:
            count = self._wrapped.readinto(window[filled:])
            if not count:
                break
            filled += count

        # release the buffer export before resizing the bytearray
        del window
        del data[filled:]
        return bytes(data)
115 115
116 116
# Platform tuning knobs consumed by worthwhile() below.
if pycompat.isposix or pycompat.iswindows:
    _STARTUP_COST = 0.01
    # The Windows worker is thread based. If tasks are CPU bound, threads
    # in the presence of the GIL result in excessive context switching and
    # this overhead can slow down execution.
    _DISALLOW_THREAD_UNSAFE = pycompat.iswindows
else:
    # Unsupported platform: an absurdly high startup cost makes
    # worthwhile() always decline to spawn workers.
    _STARTUP_COST = 1e30
    _DISALLOW_THREAD_UNSAFE = False
126 126
127 127
def worthwhile(ui, costperop, nops, threadsafe=True):
    """try to determine whether the benefit of multiple processes can
    outweigh the cost of starting them"""

    if not threadsafe and _DISALLOW_THREAD_UNSAFE:
        return False

    # estimated serial runtime vs. (startup overhead + parallel runtime)
    serialcost = costperop * nops
    nworkers = _numworkers(ui)
    gain = serialcost - (_STARTUP_COST * nworkers + serialcost / nworkers)
    return gain >= 0.15
139 139
140 140
def worker(
    ui, costperarg, func, staticargs, args, hasretval=False, threadsafe=True
):
    """run a function, possibly in parallel in multiple worker
    processes.

    returns a progress iterator

    costperarg - cost of a single task

    func - function to run. It is expected to return a progress iterator.

    staticargs - arguments to pass to every invocation of the function

    args - arguments to split into chunks, to pass to individual
    workers

    hasretval - when True, func and the current function return a progress
    iterator then a dict (encoded as an iterator that yields many (False, ..)
    then a (True, dict)). The dicts are joined in some arbitrary order, so
    overlapping keys are a bad idea.

    threadsafe - whether work items are thread safe and can be executed using
    a thread-based worker. Should be disabled for CPU heavy tasks that don't
    release the GIL.
    """
    enabled = ui.configbool(b'worker', b'enabled')
    if enabled and _platformworker is _posixworker and not ismainthread():
        # The POSIX worker has to install a handler for SIGCHLD.
        # Python up to 3.9 only allows this in the main thread.
        enabled = False

    if enabled and worthwhile(ui, costperarg, len(args), threadsafe=threadsafe):
        return _platformworker(ui, func, staticargs, args, hasretval)
    # parallelism disabled or not worth the startup cost: run inline
    return func(*staticargs + (args,))
176 176
177 177
def _posixworker(ui, func, staticargs, args, hasretval):
    """Fork-based worker implementation for POSIX systems.

    Forks one child per partition of ``args`` (capped at _numworkers(ui));
    each child runs ``func`` and pickles every produced result onto its own
    pipe. The parent multiplexes the pipes with a selector and re-yields the
    unpickled results. If ``hasretval`` is set, (True, dict) items from the
    children are merged and yielded once, last, as ``(True, retval)``.
    Raises error.WorkerError if any child exits with a non-zero status.
    """
    workers = _numworkers(ui)
    oldhandler = signal.getsignal(signal.SIGINT)
    # ignore SIGINT while forking; children restore the original handler
    signal.signal(signal.SIGINT, signal.SIG_IGN)
    # problem is a 1-element list so nested functions can mutate it
    pids, problem = set(), [0]

    def killworkers():
        # unregister SIGCHLD handler as all children will be killed. This
        # function shouldn't be interrupted by another SIGCHLD; otherwise pids
        # could be updated while iterating, which would cause inconsistency.
        signal.signal(signal.SIGCHLD, oldchldhandler)
        # if one worker bails, there's no good reason to wait for the rest
        for p in pids:
            try:
                os.kill(p, signal.SIGTERM)
            except OSError as err:
                # ESRCH: already gone, which is fine here
                if err.errno != errno.ESRCH:
                    raise

    def waitforworkers(blocking=True):
        # reap children, recording the first failing exit status in problem[0]
        for pid in pids.copy():
            p = st = 0
            while True:
                try:
                    p, st = os.waitpid(pid, (0 if blocking else os.WNOHANG))
                    break
                except OSError as e:
                    if e.errno == errno.EINTR:
                        continue
                    elif e.errno == errno.ECHILD:
                        # child would already be reaped, but pids yet been
                        # updated (maybe interrupted just after waitpid)
                        pids.discard(pid)
                        break
                    else:
                        raise
            if not p:
                # skip subsequent steps, because child process should
                # be still running in this case
                continue
            pids.discard(p)
            st = _exitstatus(st)
            if st and not problem[0]:
                problem[0] = st

    def sigchldhandler(signum, frame):
        # non-blocking reap; if a child failed, take the rest down with it
        waitforworkers(blocking=False)
        if problem[0]:
            killworkers()

    oldchldhandler = signal.signal(signal.SIGCHLD, sigchldhandler)
    ui.flush()
    parentpid = os.getpid()
    pipes = []
    retval = {}
    for pargs in partition(args, min(workers, len(args))):
        # Every worker gets its own pipe to send results on, so we don't have to
        # implement atomic writes larger than PIPE_BUF. Each forked process has
        # its own pipe's descriptors in the local variables, and the parent
        # process has the full list of pipe descriptors (and it doesn't really
        # care what order they're in).
        rfd, wfd = os.pipe()
        pipes.append((rfd, wfd))
        # make sure we use os._exit in all worker code paths. otherwise the
        # worker may do some clean-ups which could cause surprises like
        # deadlock. see sshpeer.cleanup for example.
        # override error handling *before* fork. this is necessary because
        # exception (signal) may arrive after fork, before "pid =" assignment
        # completes, and other exception handler (dispatch.py) can lead to
        # unexpected code path without os._exit.
        ret = -1
        try:
            pid = os.fork()
            if pid == 0:
                # child: restore signal handling, run the work, exit via
                # os._exit in the finally below
                signal.signal(signal.SIGINT, oldhandler)
                signal.signal(signal.SIGCHLD, oldchldhandler)

                def workerfunc():
                    # close pipe ends belonging to the other workers and our
                    # own read end; we only write results
                    for r, w in pipes[:-1]:
                        os.close(r)
                        os.close(w)
                    os.close(rfd)
                    for result in func(*(staticargs + (pargs,))):
                        os.write(wfd, pickle.dumps(result))
                    return 0

                ret = scmutil.callcatch(ui, workerfunc)
        except:  # parent re-raises, child never returns
            if os.getpid() == parentpid:
                raise
            exctype = sys.exc_info()[0]
            force = not issubclass(exctype, KeyboardInterrupt)
            ui.traceback(force=force)
        finally:
            if os.getpid() != parentpid:
                try:
                    ui.flush()
                except:  # never returns, no re-raises
                    pass
                finally:
                    os._exit(ret & 255)
        pids.add(pid)
    selector = selectors.DefaultSelector()
    for rfd, wfd in pipes:
        # parent only reads; close the write ends so EOF propagates
        os.close(wfd)
        # The stream has to be unbuffered. Otherwise, if all data is read from
        # the raw file into the buffer, the selector thinks that the FD is not
        # ready to read while pickle.load() could read from the buffer. This
        # would delay the processing of readable items.
        selector.register(os.fdopen(rfd, 'rb', 0), selectors.EVENT_READ)

    def cleanup():
        # restore signal handlers, reap all children, report first failure
        signal.signal(signal.SIGINT, oldhandler)
        waitforworkers()
        signal.signal(signal.SIGCHLD, oldchldhandler)
        selector.close()
        return problem[0]

    try:
        openpipes = len(pipes)
        while openpipes > 0:
            for key, events in selector.select():
                try:
                    # The pytype error likely goes away on a modern version of
                    # pytype having a modern typeshed snapshot.
                    # pytype: disable=wrong-arg-types
                    res = pickle.load(_blockingreader(key.fileobj))
                    # pytype: enable=wrong-arg-types
                    if hasretval and res[0]:
                        retval.update(res[1])
                    else:
                        yield res
                except EOFError:
                    # this worker closed its pipe (finished or died)
                    selector.unregister(key.fileobj)
                    key.fileobj.close()
                    openpipes -= 1
                except IOError as e:
                    if e.errno == errno.EINTR:
                        continue
                    raise
    except:  # re-raises
        killworkers()
        cleanup()
        raise
    status = cleanup()
    if status:
        if status < 0:
            # negative status encodes a signal; re-deliver it to ourselves
            os.kill(os.getpid(), -status)
        raise error.WorkerError(status)
    if hasretval:
        yield True, retval
325 329
326 330
def _posixexitstatus(code):
    """convert a posix exit status into the same form returned by
    os.spawnv

    returns None if the process was stopped instead of exiting"""
    # WIFSIGNALED and WIFEXITED are mutually exclusive, so the order of
    # these checks does not matter; stopped processes match neither.
    if os.WIFSIGNALED(code):
        return -(os.WTERMSIG(code))
    elif os.WIFEXITED(code):
        return os.WEXITSTATUS(code)
336 340
337 341
def _windowsworker(ui, func, staticargs, args, hasretval):
    """Thread-based worker implementation (used on Windows).

    Spawns _numworkers(ui) daemon threads that pull argument chunks from a
    shared task queue, run ``func`` on each chunk, and push results onto a
    shared result queue. The main thread drains the result queue and
    re-yields results; worker exceptions are re-raised here as if ``func``
    had run inline. If ``hasretval`` is set, (True, dict) items are merged
    and yielded once, last, as ``(True, retval)``.
    """

    class Worker(threading.Thread):
        def __init__(
            self, taskqueue, resultqueue, func, staticargs, *args, **kwargs
        ):
            threading.Thread.__init__(self, *args, **kwargs)
            self._taskqueue = taskqueue
            self._resultqueue = resultqueue
            self._func = func
            self._staticargs = staticargs
            # cooperative cancellation flag checked between results
            self._interrupted = False
            self.daemon = True
            self.exception = None

        def interrupt(self):
            # request that run() return at its next check point
            self._interrupted = True

        def run(self):
            try:
                while not self._taskqueue.empty():
                    try:
                        args = self._taskqueue.get_nowait()
                        for res in self._func(*self._staticargs + (args,)):
                            self._resultqueue.put(res)
                            # threading doesn't provide a native way to
                            # interrupt execution. handle it manually at every
                            # iteration.
                            if self._interrupted:
                                return
                    except pycompat.queue.Empty:
                        # another thread drained the queue between our empty()
                        # check and get_nowait(); we are done
                        break
            except Exception as e:
                # store the exception such that the main thread can resurface
                # it as if the func was running without workers.
                self.exception = e
                raise

    threads = []

    def trykillworkers():
        # Allow up to 1 second to clean worker threads nicely
        cleanupend = time.time() + 1
        for t in threads:
            t.interrupt()
        for t in threads:
            remainingtime = cleanupend - time.time()
            t.join(remainingtime)
            if t.is_alive():
                # pass over the workers joining failure. it is more
                # important to surface the initial exception than the
                # fact that one of workers may be processing a large
                # task and does not get to handle the interruption.
                ui.warn(
                    _(
                        b"failed to kill worker threads while "
                        b"handling an exception\n"
                    )
                )
                return

    workers = _numworkers(ui)
    resultqueue = pycompat.queue.Queue()
    taskqueue = pycompat.queue.Queue()
    retval = {}
    # partition work to more pieces than workers to minimize the chance
    # of uneven distribution of large tasks between the workers
    for pargs in partition(args, workers * 20):
        taskqueue.put(pargs)
    for _i in range(workers):
        t = Worker(taskqueue, resultqueue, func, staticargs)
        threads.append(t)
        t.start()
    try:
        while len(threads) > 0:
            # drain whatever is available, then briefly wait on one thread so
            # this loop doesn't spin
            while not resultqueue.empty():
                res = resultqueue.get()
                if hasretval and res[0]:
                    retval.update(res[1])
                else:
                    yield res
            threads[0].join(0.05)
            finishedthreads = [_t for _t in threads if not _t.is_alive()]
            for t in finishedthreads:
                if t.exception is not None:
                    raise t.exception
                threads.remove(t)
    except (Exception, KeyboardInterrupt):  # re-raises
        trykillworkers()
        raise
    # all threads are done; drain any remaining results
    while not resultqueue.empty():
        res = resultqueue.get()
        if hasretval and res[0]:
            retval.update(res[1])
        else:
            yield res
    if hasretval:
        yield True, retval
435 439
436 440
# Select the platform-appropriate worker implementation used by worker().
if pycompat.iswindows:
    _platformworker = _windowsworker
else:
    _platformworker = _posixworker
    # only the fork-based worker needs to decode waitpid() statuses
    _exitstatus = _posixexitstatus
442 446
443 447
def partition(lst, nslices):
    """partition a list into N slices of roughly equal size

    The current strategy takes every Nth element from the input. If
    we ever write workers that need to preserve grouping in input
    we should consider allowing callers to specify a partition strategy.

    olivia is not a fan of this partitioning strategy when files are involved.
    In his words:

        Single-threaded Mercurial makes a point of creating and visiting
        files in a fixed order (alphabetical). When creating files in order,
        a typical filesystem is likely to allocate them on nearby regions on
        disk. Thus, when revisiting in the same order, locality is maximized
        and various forms of OS and disk-level caching and read-ahead get a
        chance to work.

        This effect can be quite significant on spinning disks. I discovered it
        circa Mercurial v0.4 when revlogs were named by hashes of filenames.
        Tarring a repo and copying it to another disk effectively randomized
        the revlog ordering on disk by sorting the revlogs by hash and suddenly
        performance of my kernel checkout benchmark dropped by ~10x because the
        "working set" of sectors visited no longer fit in the drive's cache and
        the workload switched from streaming to random I/O.

        What we should really be doing is have workers read filenames from a
        ordered queue. This preserves locality and also keeps any worker from
        getting more than one file out of balance.
    """
    # slice i gets elements i, i+N, i+2N, ... (round-robin distribution)
    offset = 0
    while offset < nslices:
        yield lst[offset::nslices]
        offset += 1
General Comments 0
You need to be logged in to leave comments. Login now