##// END OF EJS Templates
worker: adapt _blockingreader to work around a python3.8.[0-1] bug (issue6444)...
Matt Harbison -
r50103:2fe4efaa stable
parent child Browse files
Show More
@@ -1,468 +1,483 b''
1 1 # worker.py - master-slave parallelism support
2 2 #
3 3 # Copyright 2013 Facebook, Inc.
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 from __future__ import absolute_import
9 9
10 10 import errno
11 11 import os
12 12 import signal
13 13 import sys
14 14 import threading
15 15 import time
16 16
17 17 try:
18 18 import selectors
19 19
20 20 selectors.BaseSelector
21 21 except ImportError:
22 22 from .thirdparty import selectors2 as selectors
23 23
24 24 from .i18n import _
25 25 from . import (
26 26 encoding,
27 27 error,
28 28 pycompat,
29 29 scmutil,
30 30 util,
31 31 )
32 32
33 33
def countcpus():
    '''try to count the number of CPUs on the system'''

    # POSIX: sysconf may be absent (AttributeError) or yield a non-numeric
    # value (ValueError); only trust a strictly positive count.
    try:
        ncpus = int(os.sysconf('SC_NPROCESSORS_ONLN'))
        if ncpus > 0:
            return ncpus
    except (AttributeError, ValueError):
        pass

    # Windows: the processor count is advertised via the environment.
    try:
        ncpus = int(encoding.environ[b'NUMBER_OF_PROCESSORS'])
        if ncpus > 0:
            return ncpus
    except (KeyError, ValueError):
        pass

    # Conservative fallback when neither probe worked.
    return 1
54 54
55 55
56 56 def _numworkers(ui):
57 57 s = ui.config(b'worker', b'numcpus')
58 58 if s:
59 59 try:
60 60 n = int(s)
61 61 if n >= 1:
62 62 return n
63 63 except ValueError:
64 64 raise error.Abort(_(b'number of cpus must be an integer'))
65 65 return min(max(countcpus(), 4), 32)
66 66
67 67
if pycompat.ispy3:

    def ismainthread():
        # Needed because the POSIX worker installs a SIGCHLD handler, which
        # Python only permits from the main thread (see worker()).
        return threading.current_thread() == threading.main_thread()

    class _blockingreader(object):
        """Wrap an unbuffered pipe file object for use by the unpickler.

        A raw pipe read may return fewer bytes than requested; read() below
        loops until the full requested size is delivered (or EOF), which is
        what pickle.load() needs.
        """

        def __init__(self, wrapped):
            # wrapped: an unbuffered binary file object (a pipe end).
            self._wrapped = wrapped

        # Do NOT implement readinto() by making it delegate to
        # _wrapped.readinto(), since that is unbuffered. The unpickler is fine
        # with just read() and readline(), so we don't need to implement it.

        if (3, 8, 0) <= sys.version_info[:3] < (3, 8, 2):

            # This is required for python 3.8, prior to 3.8.2. See issue6444.
            def readinto(self, b):
                # Fill the buffer b as fully as possible, looping over short
                # reads; returns the number of bytes actually stored.
                pos = 0
                size = len(b)

                while pos < size:
                    ret = self._wrapped.readinto(b[pos:])
                    if not ret:
                        # EOF (or zero-length read): stop early.
                        break
                    pos += ret

                return pos

        def readline(self):
            return self._wrapped.readline()

        # issue multiple reads until size is fulfilled
        def read(self, size=-1):
            if size < 0:
                # Negative size means "read everything".
                return self._wrapped.readall()

            buf = bytearray(size)
            view = memoryview(buf)
            pos = 0

            while pos < size:
                ret = self._wrapped.readinto(view[pos:])
                if not ret:
                    # EOF before size bytes arrived; return what we have.
                    break
                pos += ret

            # Drop the view before resizing buf (a live memoryview would
            # forbid the resize), then trim any unfilled tail.
            del view
            del buf[pos:]
            return bytes(buf)


else:

    def ismainthread():
        # Python 2 has no threading.main_thread(); detect via the private
        # _MainThread class instead.
        # pytype: disable=module-attr
        return isinstance(threading.current_thread(), threading._MainThread)
        # pytype: enable=module-attr

    def _blockingreader(wrapped):
        # Python 2 pickle.load copes with short pipe reads; no wrapper needed.
        return wrapped
114 129
if pycompat.isposix or pycompat.iswindows:
    # Estimated cost of starting one worker; weighed against the per-op
    # cost in worthwhile() below.
    _STARTUP_COST = 0.01
    # The Windows worker is thread based. If tasks are CPU bound, threads
    # in the presence of the GIL result in excessive context switching and
    # this overhead can slow down execution.
    _DISALLOW_THREAD_UNSAFE = pycompat.iswindows
else:
    # Unsupported platform: an absurdly high startup cost makes
    # worthwhile() always say no, effectively disabling workers.
    _STARTUP_COST = 1e30
    _DISALLOW_THREAD_UNSAFE = False
125 140
def worthwhile(ui, costperop, nops, threadsafe=True):
    """Estimate whether spreading the work over multiple workers pays off.

    Compares the serial cost of all ``nops`` operations against the worker
    startup overhead plus the per-worker share of the work, and requires a
    minimum expected saving before parallelizing.
    """
    if _DISALLOW_THREAD_UNSAFE and not threadsafe:
        return False

    nworkers = _numworkers(ui)
    serialcost = costperop * nops
    parallelcost = _STARTUP_COST * nworkers + serialcost / nworkers
    return (serialcost - parallelcost) >= 0.15
137 152
138 153
def worker(
    ui, costperarg, func, staticargs, args, hasretval=False, threadsafe=True
):
    """run a function, possibly in parallel in multiple worker
    processes.

    returns a progress iterator

    costperarg - cost of a single task

    func - function to run. It is expected to return a progress iterator.

    staticargs - arguments to pass to every invocation of the function

    args - arguments to split into chunks, to pass to individual
    workers

    hasretval - when True, func and the current function return a progress
    iterator then a dict (encoded as an iterator that yield many (False, ..)
    then a (True, dict)). The dicts are joined in some arbitrary order, so
    overlapping keys are a bad idea.

    threadsafe - whether work items are thread safe and can be executed using
    a thread-based worker. Should be disabled for CPU heavy tasks that don't
    release the GIL.
    """
    enabled = ui.configbool(b'worker', b'enabled')
    if enabled and _platformworker is _posixworker and not ismainthread():
        # The POSIX worker has to install a handler for SIGCHLD.
        # Python up to 3.9 only allows this in the main thread.
        enabled = False

    # Parallelize only when configured on AND the estimated benefit beats
    # the worker startup cost; otherwise fall back to a serial in-process
    # call with the same calling convention.
    if enabled and worthwhile(ui, costperarg, len(args), threadsafe=threadsafe):
        return _platformworker(ui, func, staticargs, args, hasretval)
    return func(*staticargs + (args,))
174 189
175 190
def _posixworker(ui, func, staticargs, args, hasretval):
    """Fork-based worker: run func over partitioned args in child processes.

    Each child writes pickled results to its own pipe; the parent multiplexes
    the pipes with a selector and yields results as they arrive. A SIGCHLD
    handler reaps children as they exit so a failing worker can abort the
    whole run promptly. Yields (and, when hasretval, finally yields the
    merged (True, dict)) like the serial path.
    """
    workers = _numworkers(ui)
    # Ignore SIGINT in the parent while workers run; children restore the
    # original handler after fork.
    oldhandler = signal.getsignal(signal.SIGINT)
    signal.signal(signal.SIGINT, signal.SIG_IGN)
    # problem is a one-element list so the nested closures can mutate it;
    # it holds the first nonzero child exit status seen.
    pids, problem = set(), [0]

    def killworkers():
        # unregister SIGCHLD handler as all children will be killed. This
        # function shouldn't be interrupted by another SIGCHLD; otherwise pids
        # could be updated while iterating, which would cause inconsistency.
        signal.signal(signal.SIGCHLD, oldchldhandler)
        # if one worker bails, there's no good reason to wait for the rest
        for p in pids:
            try:
                os.kill(p, signal.SIGTERM)
            except OSError as err:
                if err.errno != errno.ESRCH:
                    raise

    def waitforworkers(blocking=True):
        # Reap exited children; iterate over a copy because pids may be
        # mutated concurrently by the SIGCHLD handler.
        for pid in pids.copy():
            p = st = 0
            while True:
                try:
                    p, st = os.waitpid(pid, (0 if blocking else os.WNOHANG))
                    break
                except OSError as e:
                    if e.errno == errno.EINTR:
                        continue
                    elif e.errno == errno.ECHILD:
                        # child would already be reaped, but pids yet been
                        # updated (maybe interrupted just after waitpid)
                        pids.discard(pid)
                        break
                    else:
                        raise
            if not p:
                # skip subsequent steps, because child process should
                # be still running in this case
                continue
            pids.discard(p)
            st = _exitstatus(st)
            if st and not problem[0]:
                # Record only the FIRST failure status.
                problem[0] = st

    def sigchldhandler(signum, frame):
        waitforworkers(blocking=False)
        if problem[0]:
            # A worker failed: terminate the rest immediately.
            killworkers()

    oldchldhandler = signal.signal(signal.SIGCHLD, sigchldhandler)
    ui.flush()
    parentpid = os.getpid()
    pipes = []
    retval = {}
    for pargs in partition(args, min(workers, len(args))):
        # Every worker gets its own pipe to send results on, so we don't have to
        # implement atomic writes larger than PIPE_BUF. Each forked process has
        # its own pipe's descriptors in the local variables, and the parent
        # process has the full list of pipe descriptors (and it doesn't really
        # care what order they're in).
        rfd, wfd = os.pipe()
        pipes.append((rfd, wfd))
        # make sure we use os._exit in all worker code paths. otherwise the
        # worker may do some clean-ups which could cause surprises like
        # deadlock. see sshpeer.cleanup for example.
        # override error handling *before* fork. this is necessary because
        # exception (signal) may arrive after fork, before "pid =" assignment
        # completes, and other exception handler (dispatch.py) can lead to
        # unexpected code path without os._exit.
        ret = -1
        try:
            pid = os.fork()
            if pid == 0:
                # Child: restore original signal dispositions, then run the
                # task and pickle each result onto our write end.
                signal.signal(signal.SIGINT, oldhandler)
                signal.signal(signal.SIGCHLD, oldchldhandler)

                def workerfunc():
                    # Close the pipe ends belonging to sibling workers, and
                    # our own read end; we only write.
                    for r, w in pipes[:-1]:
                        os.close(r)
                        os.close(w)
                    os.close(rfd)
                    for result in func(*(staticargs + (pargs,))):
                        os.write(wfd, util.pickle.dumps(result))
                    return 0

                ret = scmutil.callcatch(ui, workerfunc)
        except:  # parent re-raises, child never returns
            if os.getpid() == parentpid:
                raise
            exctype = sys.exc_info()[0]
            force = not issubclass(exctype, KeyboardInterrupt)
            ui.traceback(force=force)
        finally:
            # Children always leave via os._exit to skip interpreter
            # cleanup (see the comment above the fork).
            if os.getpid() != parentpid:
                try:
                    ui.flush()
                except:  # never returns, no re-raises
                    pass
                finally:
                    os._exit(ret & 255)
        pids.add(pid)
    # Parent: close all write ends and watch the read ends for results.
    selector = selectors.DefaultSelector()
    for rfd, wfd in pipes:
        os.close(wfd)
        selector.register(os.fdopen(rfd, 'rb', 0), selectors.EVENT_READ)

    def cleanup():
        # Restore signal handlers, reap all children, and report the first
        # failing exit status (0 when everything succeeded).
        signal.signal(signal.SIGINT, oldhandler)
        waitforworkers()
        signal.signal(signal.SIGCHLD, oldchldhandler)
        selector.close()
        return problem[0]

    try:
        openpipes = len(pipes)
        while openpipes > 0:
            for key, events in selector.select():
                try:
                    res = util.pickle.load(_blockingreader(key.fileobj))
                    if hasretval and res[0]:
                        retval.update(res[1])
                    else:
                        yield res
                except EOFError:
                    # This worker's pipe is exhausted: it has exited.
                    selector.unregister(key.fileobj)
                    key.fileobj.close()
                    openpipes -= 1
                except IOError as e:
                    if e.errno == errno.EINTR:
                        continue
                    raise
    except:  # re-raises
        killworkers()
        cleanup()
        raise
    status = cleanup()
    if status:
        if status < 0:
            # Negative status encodes a signal number (see _posixexitstatus);
            # re-deliver the same signal to ourselves.
            os.kill(os.getpid(), -status)
        raise error.WorkerError(status)
    if hasretval:
        yield True, retval
320 335
321 336 def _posixexitstatus(code):
322 337 """convert a posix exit status into the same form returned by
323 338 os.spawnv
324 339
325 340 returns None if the process was stopped instead of exiting"""
326 341 if os.WIFEXITED(code):
327 342 return os.WEXITSTATUS(code)
328 343 elif os.WIFSIGNALED(code):
329 344 return -(os.WTERMSIG(code))
330 345
331 346
def _windowsworker(ui, func, staticargs, args, hasretval):
    """Thread-based worker: run func over partitioned args in daemon threads.

    Tasks go into a shared queue; each thread pulls tasks and pushes results
    onto a result queue, which the main thread drains and yields from. An
    exception in a worker is stored and re-raised in the main thread as if
    the work had run serially.
    """
    class Worker(threading.Thread):
        def __init__(
            self, taskqueue, resultqueue, func, staticargs, *args, **kwargs
        ):
            threading.Thread.__init__(self, *args, **kwargs)
            self._taskqueue = taskqueue
            self._resultqueue = resultqueue
            self._func = func
            self._staticargs = staticargs
            # Set by interrupt(); polled cooperatively in run().
            self._interrupted = False
            self.daemon = True
            # Holds a worker-side exception for the main thread to re-raise.
            self.exception = None

        def interrupt(self):
            # Request a cooperative stop; checked once per produced result.
            self._interrupted = True

        def run(self):
            try:
                while not self._taskqueue.empty():
                    try:
                        args = self._taskqueue.get_nowait()
                        for res in self._func(*self._staticargs + (args,)):
                            self._resultqueue.put(res)
                            # threading doesn't provide a native way to
                            # interrupt execution. handle it manually at every
                            # iteration.
                            if self._interrupted:
                                return
                    except pycompat.queue.Empty:
                        # Another thread drained the queue between our
                        # empty() check and get_nowait(); we're done.
                        break
            except Exception as e:
                # store the exception such that the main thread can resurface
                # it as if the func was running without workers.
                self.exception = e
                raise

    threads = []

    def trykillworkers():
        # Allow up to 1 second to clean worker threads nicely
        cleanupend = time.time() + 1
        for t in threads:
            t.interrupt()
        for t in threads:
            remainingtime = cleanupend - time.time()
            t.join(remainingtime)
            if t.is_alive():
                # pass over the workers joining failure. it is more
                # important to surface the initial exception than the
                # fact that one of workers may be processing a large
                # task and does not get to handle the interruption.
                ui.warn(
                    _(
                        b"failed to kill worker threads while "
                        b"handling an exception\n"
                    )
                )
                return

    workers = _numworkers(ui)
    resultqueue = pycompat.queue.Queue()
    taskqueue = pycompat.queue.Queue()
    retval = {}
    # partition work to more pieces than workers to minimize the chance
    # of uneven distribution of large tasks between the workers
    for pargs in partition(args, workers * 20):
        taskqueue.put(pargs)
    for _i in range(workers):
        t = Worker(taskqueue, resultqueue, func, staticargs)
        threads.append(t)
        t.start()
    try:
        while len(threads) > 0:
            # Drain available results, then briefly join the first thread so
            # we keep yielding results while workers are still running.
            while not resultqueue.empty():
                res = resultqueue.get()
                if hasretval and res[0]:
                    retval.update(res[1])
                else:
                    yield res
            threads[0].join(0.05)
            finishedthreads = [_t for _t in threads if not _t.is_alive()]
            for t in finishedthreads:
                if t.exception is not None:
                    # Surface the worker's exception in the main thread.
                    raise t.exception
                threads.remove(t)
    except (Exception, KeyboardInterrupt):  # re-raises
        trykillworkers()
        raise
    # All threads finished: drain any results still queued.
    while not resultqueue.empty():
        res = resultqueue.get()
        if hasretval and res[0]:
            retval.update(res[1])
        else:
            yield res
    if hasretval:
        yield True, retval
430 445
# Pick the platform worker: Windows has no os.fork(), so it uses the
# thread-based implementation; _exitstatus is only needed by the
# fork-based POSIX worker.
if pycompat.iswindows:
    _platformworker = _windowsworker
else:
    _platformworker = _posixworker
    _exitstatus = _posixexitstatus
436 451
437 452
def partition(lst, nslices):
    """partition a list into N slices of roughly equal size

    The current strategy takes every Nth element from the input. If
    we ever write workers that need to preserve grouping in input
    we should consider allowing callers to specify a partition strategy.

    olivia is not a fan of this partitioning strategy when files are involved.
    In his words:

        Single-threaded Mercurial makes a point of creating and visiting
        files in a fixed order (alphabetical). When creating files in order,
        a typical filesystem is likely to allocate them on nearby regions on
        disk. Thus, when revisiting in the same order, locality is maximized
        and various forms of OS and disk-level caching and read-ahead get a
        chance to work.

        This effect can be quite significant on spinning disks. I discovered it
        circa Mercurial v0.4 when revlogs were named by hashes of filenames.
        Tarring a repo and copying it to another disk effectively randomized
        the revlog ordering on disk by sorting the revlogs by hash and suddenly
        performance of my kernel checkout benchmark dropped by ~10x because the
        "working set" of sectors visited no longer fit in the drive's cache and
        the workload switched from streaming to random I/O.

    What we should really be doing is have workers read filenames from a
    ordered queue. This preserves locality and also keeps any worker from
    getting more than one file out of balance.
    """
    # Slice ``offset`` receives elements offset, offset+nslices, ... — i.e.
    # a round-robin deal of the input across the slices.
    for offset in range(nslices):
        yield lst[offset::nslices]
General Comments 0
You need to be logged in to leave comments. Login now