worker: add docstring to _blockingreader
Manuel Jacob
r50126:4d42a5fb default
@@ -1,478 +1,485 @@
1 1 # worker.py - master-slave parallelism support
2 2 #
3 3 # Copyright 2013 Facebook, Inc.
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8
9 9 import errno
10 10 import os
11 11 import pickle
12 12 import signal
13 13 import sys
14 14 import threading
15 15 import time
16 16
17 17 try:
18 18 import selectors
19 19
20 20 selectors.BaseSelector
21 21 except ImportError:
22 22 from .thirdparty import selectors2 as selectors
23 23
24 24 from .i18n import _
25 25 from . import (
26 26 encoding,
27 27 error,
28 28 pycompat,
29 29 scmutil,
30 30 )
31 31
32 32
33 33 def countcpus():
34 34 '''try to count the number of CPUs on the system'''
35 35
36 36 # posix
37 37 try:
38 38 n = int(os.sysconf('SC_NPROCESSORS_ONLN'))
39 39 if n > 0:
40 40 return n
41 41 except (AttributeError, ValueError):
42 42 pass
43 43
44 44 # windows
45 45 try:
46 46 n = int(encoding.environ[b'NUMBER_OF_PROCESSORS'])
47 47 if n > 0:
48 48 return n
49 49 except (KeyError, ValueError):
50 50 pass
51 51
52 52 return 1
53 53
54 54
55 55 def _numworkers(ui):
56 56 s = ui.config(b'worker', b'numcpus')
57 57 if s:
58 58 try:
59 59 n = int(s)
60 60 if n >= 1:
61 61 return n
62 62 except ValueError:
63 63 raise error.Abort(_(b'number of cpus must be an integer'))
64 64 return min(max(countcpus(), 4), 32)
65 65
66 66
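When worker.numcpus is not set, the fallback above clamps the detected CPU count into the range [4, 32]. A small illustrative run of that expression (the CPU counts are made up):

    # detected CPUs -> number of workers used when worker.numcpus is unset
    for cpus in (2, 16, 48):
        print(cpus, min(max(cpus, 4), 32))   # 2 -> 4, 16 -> 16, 48 -> 32
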
67 67 def ismainthread():
68 68 return threading.current_thread() == threading.main_thread()
69 69
70 70
71 71 class _blockingreader:
72 """Wrap unbuffered stream such that pickle.load() works with it.
73
74 pickle.load() expects that calls to read() and readinto() read as many
75 bytes as requested. On EOF, it is fine to read fewer bytes. In this case,
76 pickle.load() raises an EOFError.
77 """
78
72 79 def __init__(self, wrapped):
73 80 self._wrapped = wrapped
74 81
75 82 # Do NOT implement readinto() by making it delegate to
76 83 # _wrapped.readinto(), since that is unbuffered. The unpickler is fine
77 84 # with just read() and readline(), so we don't need to implement it.
78 85
79 86 if (3, 8, 0) <= sys.version_info[:3] < (3, 8, 2):
80 87
81 88 # This is required for python 3.8, prior to 3.8.2. See issue6444.
82 89 def readinto(self, b):
83 90 pos = 0
84 91 size = len(b)
85 92
86 93 while pos < size:
87 94 ret = self._wrapped.readinto(b[pos:])
88 95 if not ret:
89 96 break
90 97 pos += ret
91 98
92 99 return pos
93 100
94 101 def readline(self):
95 102 return self._wrapped.readline()
96 103
97 # issue multiple reads until size is fulfilled
104 # issue multiple reads until size is fulfilled (or EOF is encountered)
98 105 def read(self, size=-1):
99 106 if size < 0:
100 107 return self._wrapped.readall()
101 108
102 109 buf = bytearray(size)
103 110 view = memoryview(buf)
104 111 pos = 0
105 112
106 113 while pos < size:
107 114 ret = self._wrapped.readinto(view[pos:])
108 115 if not ret:
109 116 break
110 117 pos += ret
111 118
112 119 del view
113 120 del buf[pos:]
114 121 return bytes(buf)
115 122
116 123
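For context, here is a minimal, self-contained sketch (POSIX-only, independent of the real _posixworker plumbing, and assuming the mercurial package is importable) of how the wrapper is meant to be consumed: the read end of a pipe is opened unbuffered, and pickle.load() pulls complete objects through _blockingreader until EOF surfaces as EOFError.

    import os
    import pickle

    from mercurial.worker import _blockingreader

    r, w = os.pipe()
    if os.fork() == 0:
        # child: emit a couple of pickled results, then exit without cleanup
        os.close(r)
        for item in (b'result-1', b'result-2'):
            os.write(w, pickle.dumps(item))
        os._exit(0)
    os.close(w)
    reader = _blockingreader(os.fdopen(r, 'rb', 0))  # unbuffered, as _posixworker does below
    try:
        while True:
            print(pickle.load(reader))   # b'result-1', then b'result-2'
    except EOFError:
        pass  # the write end is closed and drained
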
117 124 if pycompat.isposix or pycompat.iswindows:
118 125 _STARTUP_COST = 0.01
119 126 # The Windows worker is thread based. If tasks are CPU bound, threads
120 127 # in the presence of the GIL result in excessive context switching and
121 128 # this overhead can slow down execution.
122 129 _DISALLOW_THREAD_UNSAFE = pycompat.iswindows
123 130 else:
124 131 _STARTUP_COST = 1e30
125 132 _DISALLOW_THREAD_UNSAFE = False
126 133
127 134
128 135 def worthwhile(ui, costperop, nops, threadsafe=True):
129 136 """try to determine whether the benefit of multiple processes can
130 137 outweigh the cost of starting them"""
131 138
132 139 if not threadsafe and _DISALLOW_THREAD_UNSAFE:
133 140 return False
134 141
135 142 linear = costperop * nops
136 143 workers = _numworkers(ui)
137 144 benefit = linear - (_STARTUP_COST * workers + linear / workers)
138 145 return benefit >= 0.15
139 146
140 147
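As a rough illustration of the heuristic above, using the POSIX/Windows _STARTUP_COST of 0.01 and invented task costs:

    # illustrative numbers only
    costperop, nops, workers = 0.001, 1000, 4
    linear = costperop * nops                        # 1.0s of estimated serial work
    benefit = linear - (0.01 * workers + linear / workers)
    # benefit = 1.0 - (0.04 + 0.25) = 0.71 >= 0.15, so worker() parallelizes.
    # With nops = 100 the benefit drops to 0.1 - (0.04 + 0.025) = 0.035, and
    # the work stays in a single process.
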
141 148 def worker(
142 149 ui, costperarg, func, staticargs, args, hasretval=False, threadsafe=True
143 150 ):
144 151 """run a function, possibly in parallel in multiple worker
145 152 processes.
146 153
147 154 returns a progress iterator
148 155
149 156 costperarg - cost of a single task
150 157
151 158 func - function to run. It is expected to return a progress iterator.
152 159
153 160 staticargs - arguments to pass to every invocation of the function
154 161
155 162 args - arguments to split into chunks, to pass to individual
156 163 workers
157 164
158 165     hasretval - when True, func and the current function return a progress
159 166     iterator then a dict (encoded as an iterator that yields many (False, ..)
160 167 then a (True, dict)). The dicts are joined in some arbitrary order, so
161 168 overlapping keys are a bad idea.
162 169
163 170 threadsafe - whether work items are thread safe and can be executed using
164 171 a thread-based worker. Should be disabled for CPU heavy tasks that don't
165 172 release the GIL.
166 173 """
167 174 enabled = ui.configbool(b'worker', b'enabled')
168 175 if enabled and _platformworker is _posixworker and not ismainthread():
169 176 # The POSIX worker has to install a handler for SIGCHLD.
170 177 # Python up to 3.9 only allows this in the main thread.
171 178 enabled = False
172 179
173 180 if enabled and worthwhile(ui, costperarg, len(args), threadsafe=threadsafe):
174 181 return _platformworker(ui, func, staticargs, args, hasretval)
175 182 return func(*staticargs + (args,))
176 183
177 184
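To make the hasretval contract concrete, a hypothetical func could look like the sketch below (all names are invented): each chunk of args produces (False, progress) items plus a final (True, dict), and worker() merges those dicts into the single dict it yields last.

    def sizefunc(ui, repo, files):            # hypothetical worker function
        sizes = {}
        for i, f in enumerate(files):
            sizes[f] = len(f)                 # stand-in for real per-file work
            yield False, (i, f)               # progress item, passed through as-is
        yield True, sizes                     # partial dict, merged across workers

    # for done, item in worker(ui, 0.001, sizefunc, (ui, repo), files,
    #                          hasretval=True):
    #     ...  # the last item has done == True and carries the merged dict
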
178 185 def _posixworker(ui, func, staticargs, args, hasretval):
179 186 workers = _numworkers(ui)
180 187 oldhandler = signal.getsignal(signal.SIGINT)
181 188 signal.signal(signal.SIGINT, signal.SIG_IGN)
182 189 pids, problem = set(), [0]
183 190
184 191 def killworkers():
185 192 # unregister SIGCHLD handler as all children will be killed. This
186 193 # function shouldn't be interrupted by another SIGCHLD; otherwise pids
187 194 # could be updated while iterating, which would cause inconsistency.
188 195 signal.signal(signal.SIGCHLD, oldchldhandler)
189 196 # if one worker bails, there's no good reason to wait for the rest
190 197 for p in pids:
191 198 try:
192 199 os.kill(p, signal.SIGTERM)
193 200 except OSError as err:
194 201 if err.errno != errno.ESRCH:
195 202 raise
196 203
197 204 def waitforworkers(blocking=True):
198 205 for pid in pids.copy():
199 206 p = st = 0
200 207 while True:
201 208 try:
202 209 p, st = os.waitpid(pid, (0 if blocking else os.WNOHANG))
203 210 break
204 211 except OSError as e:
205 212 if e.errno == errno.EINTR:
206 213 continue
207 214 elif e.errno == errno.ECHILD:
208 215 # child would already be reaped, but pids has not yet been
209 216 # updated (maybe interrupted just after waitpid)
210 217 pids.discard(pid)
211 218 break
212 219 else:
213 220 raise
214 221 if not p:
215 222 # skip subsequent steps, because the child process should
216 223 # still be running in this case
217 224 continue
218 225 pids.discard(p)
219 226 st = _exitstatus(st)
220 227 if st and not problem[0]:
221 228 problem[0] = st
222 229
223 230 def sigchldhandler(signum, frame):
224 231 waitforworkers(blocking=False)
225 232 if problem[0]:
226 233 killworkers()
227 234
228 235 oldchldhandler = signal.signal(signal.SIGCHLD, sigchldhandler)
229 236 ui.flush()
230 237 parentpid = os.getpid()
231 238 pipes = []
232 239 retval = {}
233 240 for pargs in partition(args, min(workers, len(args))):
234 241 # Every worker gets its own pipe to send results on, so we don't have to
235 242 # implement atomic writes larger than PIPE_BUF. Each forked process has
236 243 # its own pipe's descriptors in the local variables, and the parent
237 244 # process has the full list of pipe descriptors (and it doesn't really
238 245 # care what order they're in).
239 246 rfd, wfd = os.pipe()
240 247 pipes.append((rfd, wfd))
241 248 # make sure we use os._exit in all worker code paths. otherwise the
242 249 # worker may do some clean-ups which could cause surprises like
243 250 # deadlock. see sshpeer.cleanup for example.
244 251 # override error handling *before* fork. this is necessary because
245 252 # an exception (signal) may arrive after fork, before the "pid =" assignment
246 253 # completes, and another exception handler (dispatch.py) could lead to an
247 254 # unexpected code path without os._exit.
248 255 ret = -1
249 256 try:
250 257 pid = os.fork()
251 258 if pid == 0:
252 259 signal.signal(signal.SIGINT, oldhandler)
253 260 signal.signal(signal.SIGCHLD, oldchldhandler)
254 261
255 262 def workerfunc():
256 263 for r, w in pipes[:-1]:
257 264 os.close(r)
258 265 os.close(w)
259 266 os.close(rfd)
260 267 for result in func(*(staticargs + (pargs,))):
261 268 os.write(wfd, pickle.dumps(result))
262 269 return 0
263 270
264 271 ret = scmutil.callcatch(ui, workerfunc)
265 272 except: # parent re-raises, child never returns
266 273 if os.getpid() == parentpid:
267 274 raise
268 275 exctype = sys.exc_info()[0]
269 276 force = not issubclass(exctype, KeyboardInterrupt)
270 277 ui.traceback(force=force)
271 278 finally:
272 279 if os.getpid() != parentpid:
273 280 try:
274 281 ui.flush()
275 282 except: # never returns, no re-raises
276 283 pass
277 284 finally:
278 285 os._exit(ret & 255)
279 286 pids.add(pid)
280 287 selector = selectors.DefaultSelector()
281 288 for rfd, wfd in pipes:
282 289 os.close(wfd)
283 290 # The stream has to be unbuffered. Otherwise, if all data is read from
284 291 # the raw file into the buffer, the selector thinks that the FD is not
285 292 # ready to read while pickle.load() could read from the buffer. This
286 293 # would delay the processing of readable items.
287 294 selector.register(os.fdopen(rfd, 'rb', 0), selectors.EVENT_READ)
288 295
289 296 def cleanup():
290 297 signal.signal(signal.SIGINT, oldhandler)
291 298 waitforworkers()
292 299 signal.signal(signal.SIGCHLD, oldchldhandler)
293 300 selector.close()
294 301 return problem[0]
295 302
296 303 try:
297 304 openpipes = len(pipes)
298 305 while openpipes > 0:
299 306 for key, events in selector.select():
300 307 try:
301 308 # The pytype error likely goes away on a modern version of
302 309 # pytype having a modern typeshed snapshot.
303 310 # pytype: disable=wrong-arg-types
304 311 res = pickle.load(_blockingreader(key.fileobj))
305 312 # pytype: enable=wrong-arg-types
306 313 if hasretval and res[0]:
307 314 retval.update(res[1])
308 315 else:
309 316 yield res
310 317 except EOFError:
311 318 selector.unregister(key.fileobj)
312 319 key.fileobj.close()
313 320 openpipes -= 1
314 321 except IOError as e:
315 322 if e.errno == errno.EINTR:
316 323 continue
317 324 raise
318 325 except: # re-raises
319 326 killworkers()
320 327 cleanup()
321 328 raise
322 329 status = cleanup()
323 330 if status:
324 331 if status < 0:
325 332 os.kill(os.getpid(), -status)
326 333 raise error.WorkerError(status)
327 334 if hasretval:
328 335 yield True, retval
329 336
330 337
331 338 def _posixexitstatus(code):
332 339 """convert a posix exit status into the same form returned by
333 340 os.spawnv
334 341
335 342 returns None if the process was stopped instead of exiting"""
336 343 if os.WIFEXITED(code):
337 344 return os.WEXITSTATUS(code)
338 345 elif os.WIFSIGNALED(code):
339 346 return -(os.WTERMSIG(code))
340 347
341 348
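A small POSIX-only sketch of that convention (illustrative, run outside of Mercurial, and assuming the mercurial package is importable): a normal exit comes back as the positive exit code, while death by signal comes back negated, which is what the status handling at the end of _posixworker() relies on.

    import os
    import signal

    from mercurial.worker import _posixexitstatus

    pid = os.fork()
    if pid == 0:
        os._exit(3)
    wpid, st = os.waitpid(pid, 0)
    assert _posixexitstatus(st) == 3                  # normal exit -> exit code

    pid = os.fork()
    if pid == 0:
        signal.pause()                                # wait to be killed
    os.kill(pid, signal.SIGTERM)
    wpid, st = os.waitpid(pid, 0)
    assert _posixexitstatus(st) == -signal.SIGTERM    # signal -> negated signal number
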
342 349 def _windowsworker(ui, func, staticargs, args, hasretval):
343 350 class Worker(threading.Thread):
344 351 def __init__(
345 352 self, taskqueue, resultqueue, func, staticargs, *args, **kwargs
346 353 ):
347 354 threading.Thread.__init__(self, *args, **kwargs)
348 355 self._taskqueue = taskqueue
349 356 self._resultqueue = resultqueue
350 357 self._func = func
351 358 self._staticargs = staticargs
352 359 self._interrupted = False
353 360 self.daemon = True
354 361 self.exception = None
355 362
356 363 def interrupt(self):
357 364 self._interrupted = True
358 365
359 366 def run(self):
360 367 try:
361 368 while not self._taskqueue.empty():
362 369 try:
363 370 args = self._taskqueue.get_nowait()
364 371 for res in self._func(*self._staticargs + (args,)):
365 372 self._resultqueue.put(res)
366 373 # threading doesn't provide a native way to
367 374 # interrupt execution. handle it manually at every
368 375 # iteration.
369 376 if self._interrupted:
370 377 return
371 378 except pycompat.queue.Empty:
372 379 break
373 380 except Exception as e:
374 381 # store the exception such that the main thread can resurface
375 382 # it as if the func was running without workers.
376 383 self.exception = e
377 384 raise
378 385
379 386 threads = []
380 387
381 388 def trykillworkers():
382 389 # Allow up to 1 second to clean worker threads nicely
383 390 cleanupend = time.time() + 1
384 391 for t in threads:
385 392 t.interrupt()
386 393 for t in threads:
387 394 remainingtime = cleanupend - time.time()
388 395 t.join(remainingtime)
389 396 if t.is_alive():
390 397 # pass over the failure to join the workers. it is more
391 398 # important to surface the initial exception than the
392 399 # fact that one of the workers may be processing a large
393 400 # task and does not get to handle the interruption.
394 401 ui.warn(
395 402 _(
396 403 b"failed to kill worker threads while "
397 404 b"handling an exception\n"
398 405 )
399 406 )
400 407 return
401 408
402 409 workers = _numworkers(ui)
403 410 resultqueue = pycompat.queue.Queue()
404 411 taskqueue = pycompat.queue.Queue()
405 412 retval = {}
406 413 # partition work into more pieces than workers to minimize the chance
407 414 # of uneven distribution of large tasks between the workers
408 415 for pargs in partition(args, workers * 20):
409 416 taskqueue.put(pargs)
410 417 for _i in range(workers):
411 418 t = Worker(taskqueue, resultqueue, func, staticargs)
412 419 threads.append(t)
413 420 t.start()
414 421 try:
415 422 while len(threads) > 0:
416 423 while not resultqueue.empty():
417 424 res = resultqueue.get()
418 425 if hasretval and res[0]:
419 426 retval.update(res[1])
420 427 else:
421 428 yield res
422 429 threads[0].join(0.05)
423 430 finishedthreads = [_t for _t in threads if not _t.is_alive()]
424 431 for t in finishedthreads:
425 432 if t.exception is not None:
426 433 raise t.exception
427 434 threads.remove(t)
428 435 except (Exception, KeyboardInterrupt): # re-raises
429 436 trykillworkers()
430 437 raise
431 438 while not resultqueue.empty():
432 439 res = resultqueue.get()
433 440 if hasretval and res[0]:
434 441 retval.update(res[1])
435 442 else:
436 443 yield res
437 444 if hasretval:
438 445 yield True, retval
439 446
440 447
441 448 if pycompat.iswindows:
442 449 _platformworker = _windowsworker
443 450 else:
444 451 _platformworker = _posixworker
445 452 _exitstatus = _posixexitstatus
446 453
447 454
448 455 def partition(lst, nslices):
449 456 """partition a list into N slices of roughly equal size
450 457
451 458 The current strategy takes every Nth element from the input. If
452 459 we ever write workers that need to preserve grouping in the input,
453 460 we should consider allowing callers to specify a partition strategy.
454 461
455 462 olivia is not a fan of this partitioning strategy when files are involved.
456 463 In her words:
457 464
458 465 Single-threaded Mercurial makes a point of creating and visiting
459 466 files in a fixed order (alphabetical). When creating files in order,
460 467 a typical filesystem is likely to allocate them on nearby regions on
461 468 disk. Thus, when revisiting in the same order, locality is maximized
462 469 and various forms of OS and disk-level caching and read-ahead get a
463 470 chance to work.
464 471
465 472 This effect can be quite significant on spinning disks. I discovered it
466 473 circa Mercurial v0.4 when revlogs were named by hashes of filenames.
467 474 Tarring a repo and copying it to another disk effectively randomized
468 475 the revlog ordering on disk by sorting the revlogs by hash and suddenly
469 476 performance of my kernel checkout benchmark dropped by ~10x because the
470 477 "working set" of sectors visited no longer fit in the drive's cache and
471 478 the workload switched from streaming to random I/O.
472 479
473 480 What we should really be doing is to have workers read filenames from an
474 481 ordered queue. This preserves locality and also keeps any worker from
475 482 getting more than one file out of balance.
476 483 """
477 484 for i in range(nslices):
478 485 yield lst[i::nslices]
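For instance, the round-robin slicing described above behaves like this:

    >>> list(partition(list(range(10)), 3))
    [[0, 3, 6, 9], [1, 4, 7], [2, 5, 8]]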