##// END OF EJS Templates
typing: disable a module-attr warning in the worker module's py2 code...
Matt Harbison -
r47525:f6480620 stable
parent child Browse files
Show More
@@ -1,466 +1,468 b''
1 1 # worker.py - master-slave parallelism support
2 2 #
3 3 # Copyright 2013 Facebook, Inc.
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 from __future__ import absolute_import
9 9
10 10 import errno
11 11 import os
12 12 import signal
13 13 import sys
14 14 import threading
15 15 import time
16 16
17 17 try:
18 18 import selectors
19 19
20 20 selectors.BaseSelector
21 21 except ImportError:
22 22 from .thirdparty import selectors2 as selectors
23 23
24 24 from .i18n import _
25 25 from . import (
26 26 encoding,
27 27 error,
28 28 pycompat,
29 29 scmutil,
30 30 util,
31 31 )
32 32
33 33
def countcpus():
    '''try to count the number of CPUs on the system'''

    # POSIX: ask sysconf for the number of processors currently online.
    try:
        ncpus = int(os.sysconf('SC_NPROCESSORS_ONLN'))
    except (AttributeError, ValueError):
        pass
    else:
        if ncpus > 0:
            return ncpus

    # Windows: the shell environment advertises the processor count.
    try:
        ncpus = int(encoding.environ[b'NUMBER_OF_PROCESSORS'])
    except (KeyError, ValueError):
        pass
    else:
        if ncpus > 0:
            return ncpus

    # Could not determine the count; fall back to a single CPU.
    return 1
54 54
55 55
56 56 def _numworkers(ui):
57 57 s = ui.config(b'worker', b'numcpus')
58 58 if s:
59 59 try:
60 60 n = int(s)
61 61 if n >= 1:
62 62 return n
63 63 except ValueError:
64 64 raise error.Abort(_(b'number of cpus must be an integer'))
65 65 return min(max(countcpus(), 4), 32)
66 66
67 67
if pycompat.ispy3:

    def ismainthread():
        return threading.current_thread() == threading.main_thread()

    class _blockingreader(object):
        """Wrap an unbuffered stream and retry short reads.

        The unpickler is satisfied with read() and readline() alone, so
        readinto() is deliberately NOT forwarded: the wrapped object's
        readinto() is unbuffered and would hand back short reads.
        """

        def __init__(self, wrapped):
            self._wrapped = wrapped

        def readline(self):
            return self._wrapped.readline()

        def read(self, size=-1):
            # keep issuing reads until the requested size is satisfied
            if size < 0:
                return self._wrapped.readall()

            data = bytearray(size)
            window = memoryview(data)
            filled = 0

            while filled < size:
                count = self._wrapped.readinto(window[filled:])
                if not count:
                    break
                filled += count

            # release the memoryview before resizing the bytearray
            del window
            del data[filled:]
            return bytes(data)


else:

    def ismainthread():
        # pytype: disable=module-attr
        return isinstance(threading.current_thread(), threading._MainThread)
        # pytype: enable=module-attr

    def _blockingreader(wrapped):
        return wrapped
111 113
112 114
# Cost model constants consumed by worthwhile() below.
if pycompat.isposix or pycompat.iswindows:
    _STARTUP_COST = 0.01
    # The Windows worker is thread based. If tasks are CPU bound, threads
    # in the presence of the GIL result in excessive context switching and
    # this overhead can slow down execution.
    _DISALLOW_THREAD_UNSAFE = pycompat.iswindows
else:
    # Unsupported platform: make startup look prohibitively expensive so
    # that worthwhile() never elects to spawn workers.
    _STARTUP_COST = 1e30
    _DISALLOW_THREAD_UNSAFE = False
122 124
123 125
def worthwhile(ui, costperop, nops, threadsafe=True):
    """Estimate whether spawning workers beats running serially.

    costperop is the estimated cost of one task and nops the number of
    tasks; threadsafe=False vetoes parallelism on platforms whose worker
    is thread based (see _DISALLOW_THREAD_UNSAFE)."""

    if not threadsafe and _DISALLOW_THREAD_UNSAFE:
        return False

    serialcost = costperop * nops
    nworkers = _numworkers(ui)
    parallelcost = _STARTUP_COST * nworkers + serialcost / nworkers
    return serialcost - parallelcost >= 0.15
135 137
136 138
def worker(
    ui, costperarg, func, staticargs, args, hasretval=False, threadsafe=True
):
    """Run a function, possibly in parallel in multiple worker
    processes.

    Returns a progress iterator.

    costperarg - cost of a single task

    func - function to run; it is expected to return a progress iterator

    staticargs - arguments passed to every invocation of func

    args - arguments split into chunks and handed to individual workers

    hasretval - when True, func (and this function) yield many (False, ..)
    progress items followed by one (True, dict); the per-worker dicts are
    merged in some arbitrary order, so overlapping keys are a bad idea

    threadsafe - whether work items are thread safe and can be executed
    using a thread-based worker; should be disabled for CPU-heavy tasks
    that don't release the GIL
    """
    useworkers = ui.configbool(b'worker', b'enabled')
    if useworkers and _platformworker is _posixworker and not ismainthread():
        # The POSIX worker must install a SIGCHLD handler, and Python up
        # to 3.9 only allows installing signal handlers in the main thread.
        useworkers = False

    if useworkers and worthwhile(
        ui, costperarg, len(args), threadsafe=threadsafe
    ):
        return _platformworker(ui, func, staticargs, args, hasretval)
    return func(*staticargs + (args,))
172 174
173 175
def _posixworker(ui, func, staticargs, args, hasretval):
    """Run func over chunks of args in forked child processes.

    Results are pickled by each child onto its own pipe and yielded by the
    parent as they arrive.  If hasretval is true, (True, dict) items are
    merged into a single dict yielded last as (True, retval).  Raises
    error.WorkerError if a child exits with a nonzero status.
    """
    workers = _numworkers(ui)
    oldhandler = signal.getsignal(signal.SIGINT)
    # the parent ignores SIGINT while workers run; children restore the
    # original handler right after fork
    signal.signal(signal.SIGINT, signal.SIG_IGN)
    pids, problem = set(), [0]

    def killworkers():
        # unregister SIGCHLD handler as all children will be killed. This
        # function shouldn't be interrupted by another SIGCHLD; otherwise pids
        # could be updated while iterating, which would cause inconsistency.
        signal.signal(signal.SIGCHLD, oldchldhandler)
        # if one worker bails, there's no good reason to wait for the rest
        for p in pids:
            try:
                os.kill(p, signal.SIGTERM)
            except OSError as err:
                # ESRCH: the child is already gone
                if err.errno != errno.ESRCH:
                    raise

    def waitforworkers(blocking=True):
        # reap finished children, recording the first nonzero exit status
        # in problem[0]
        for pid in pids.copy():
            p = st = 0
            while True:
                try:
                    p, st = os.waitpid(pid, (0 if blocking else os.WNOHANG))
                    break
                except OSError as e:
                    if e.errno == errno.EINTR:
                        continue
                    elif e.errno == errno.ECHILD:
                        # the child was already reaped, but pids has not
                        # yet been updated (maybe interrupted just after
                        # waitpid)
                        pids.discard(pid)
                        break
                    else:
                        raise
            if not p:
                # skip subsequent steps, because child process should
                # be still running in this case
                continue
            pids.discard(p)
            st = _exitstatus(st)
            if st and not problem[0]:
                problem[0] = st

    def sigchldhandler(signum, frame):
        waitforworkers(blocking=False)
        if problem[0]:
            killworkers()

    oldchldhandler = signal.signal(signal.SIGCHLD, sigchldhandler)
    # flush buffered output before forking so children don't duplicate it
    ui.flush()
    parentpid = os.getpid()
    pipes = []
    retval = {}
    for pargs in partition(args, min(workers, len(args))):
        # Every worker gets its own pipe to send results on, so we don't have to
        # implement atomic writes larger than PIPE_BUF. Each forked process has
        # its own pipe's descriptors in the local variables, and the parent
        # process has the full list of pipe descriptors (and it doesn't really
        # care what order they're in).
        rfd, wfd = os.pipe()
        pipes.append((rfd, wfd))
        # make sure we use os._exit in all worker code paths. otherwise the
        # worker may do some clean-ups which could cause surprises like
        # deadlock. see sshpeer.cleanup for example.
        # override error handling *before* fork. this is necessary because
        # exception (signal) may arrive after fork, before "pid =" assignment
        # completes, and other exception handler (dispatch.py) can lead to
        # unexpected code path without os._exit.
        ret = -1
        try:
            pid = os.fork()
            if pid == 0:
                signal.signal(signal.SIGINT, oldhandler)
                signal.signal(signal.SIGCHLD, oldchldhandler)

                def workerfunc():
                    # close every pipe end this child doesn't own
                    for r, w in pipes[:-1]:
                        os.close(r)
                        os.close(w)
                    os.close(rfd)
                    for result in func(*(staticargs + (pargs,))):
                        os.write(wfd, util.pickle.dumps(result))
                    return 0

                ret = scmutil.callcatch(ui, workerfunc)
        except:  # parent re-raises, child never returns
            if os.getpid() == parentpid:
                raise
            exctype = sys.exc_info()[0]
            force = not issubclass(exctype, KeyboardInterrupt)
            ui.traceback(force=force)
        finally:
            if os.getpid() != parentpid:
                try:
                    ui.flush()
                except:  # never returns, no re-raises
                    pass
                finally:
                    os._exit(ret & 255)
        pids.add(pid)
    selector = selectors.DefaultSelector()
    for rfd, wfd in pipes:
        os.close(wfd)
        selector.register(os.fdopen(rfd, 'rb', 0), selectors.EVENT_READ)

    def cleanup():
        # restore signal handlers, reap remaining children, and report the
        # first problematic exit status (0 if none)
        signal.signal(signal.SIGINT, oldhandler)
        waitforworkers()
        signal.signal(signal.SIGCHLD, oldchldhandler)
        selector.close()
        return problem[0]

    try:
        openpipes = len(pipes)
        while openpipes > 0:
            for key, events in selector.select():
                try:
                    res = util.pickle.load(_blockingreader(key.fileobj))
                    if hasretval and res[0]:
                        retval.update(res[1])
                    else:
                        yield res
                except EOFError:
                    # the worker closed its pipe: it is done
                    selector.unregister(key.fileobj)
                    key.fileobj.close()
                    openpipes -= 1
                except IOError as e:
                    # interrupted system call: move on and reselect
                    if e.errno == errno.EINTR:
                        continue
                    raise
    except:  # re-raises
        killworkers()
        cleanup()
        raise
    status = cleanup()
    if status:
        if status < 0:
            # a child died of a signal; propagate it to ourselves
            os.kill(os.getpid(), -status)
        raise error.WorkerError(status)
    if hasretval:
        yield True, retval
317 319
318 320
319 321 def _posixexitstatus(code):
320 322 """convert a posix exit status into the same form returned by
321 323 os.spawnv
322 324
323 325 returns None if the process was stopped instead of exiting"""
324 326 if os.WIFEXITED(code):
325 327 return os.WEXITSTATUS(code)
326 328 elif os.WIFSIGNALED(code):
327 329 return -(os.WTERMSIG(code))
328 330
329 331
def _windowsworker(ui, func, staticargs, args, hasretval):
    """Run func over chunks of args using a pool of daemon threads.

    Results are passed back through a queue and yielded as they arrive.
    If hasretval is true, (True, dict) items are merged into a single
    dict yielded last as (True, retval).  An exception raised by any
    worker thread is re-raised in the calling thread.
    """
    class Worker(threading.Thread):
        def __init__(
            self, taskqueue, resultqueue, func, staticargs, *args, **kwargs
        ):
            threading.Thread.__init__(self, *args, **kwargs)
            self._taskqueue = taskqueue
            self._resultqueue = resultqueue
            self._func = func
            self._staticargs = staticargs
            # cooperative-interruption flag checked between results
            self._interrupted = False
            self.daemon = True
            # set if this worker died with an exception; inspected by the
            # main thread
            self.exception = None

        def interrupt(self):
            self._interrupted = True

        def run(self):
            try:
                while not self._taskqueue.empty():
                    try:
                        args = self._taskqueue.get_nowait()
                        for res in self._func(*self._staticargs + (args,)):
                            self._resultqueue.put(res)
                            # threading doesn't provide a native way to
                            # interrupt execution. handle it manually at every
                            # iteration.
                            if self._interrupted:
                                return
                    except pycompat.queue.Empty:
                        break
            except Exception as e:
                # store the exception such that the main thread can resurface
                # it as if the func was running without workers.
                self.exception = e
                raise

    threads = []

    def trykillworkers():
        # Allow up to 1 second to clean worker threads nicely
        cleanupend = time.time() + 1
        for t in threads:
            t.interrupt()
        for t in threads:
            remainingtime = cleanupend - time.time()
            t.join(remainingtime)
            if t.is_alive():
                # pass over the workers joining failure. it is more
                # important to surface the initial exception than the
                # fact that one of workers may be processing a large
                # task and does not get to handle the interruption.
                ui.warn(
                    _(
                        b"failed to kill worker threads while "
                        b"handling an exception\n"
                    )
                )
                return

    workers = _numworkers(ui)
    resultqueue = pycompat.queue.Queue()
    taskqueue = pycompat.queue.Queue()
    retval = {}
    # partition work to more pieces than workers to minimize the chance
    # of uneven distribution of large tasks between the workers
    for pargs in partition(args, workers * 20):
        taskqueue.put(pargs)
    for _i in range(workers):
        t = Worker(taskqueue, resultqueue, func, staticargs)
        threads.append(t)
        t.start()
    try:
        while len(threads) > 0:
            # drain available results before checking on the threads
            while not resultqueue.empty():
                res = resultqueue.get()
                if hasretval and res[0]:
                    retval.update(res[1])
                else:
                    yield res
            threads[0].join(0.05)
            finishedthreads = [_t for _t in threads if not _t.is_alive()]
            for t in finishedthreads:
                if t.exception is not None:
                    raise t.exception
                threads.remove(t)
    except (Exception, KeyboardInterrupt):  # re-raises
        trykillworkers()
        raise
    # all threads finished; drain whatever results remain queued
    while not resultqueue.empty():
        res = resultqueue.get()
        if hasretval and res[0]:
            retval.update(res[1])
        else:
            yield res
    if hasretval:
        yield True, retval
427 429
428 430
# Select the platform-specific worker implementation: threads on Windows,
# forked processes everywhere else (where exit statuses also need decoding).
if pycompat.iswindows:
    _platformworker = _windowsworker
else:
    _platformworker = _posixworker
    _exitstatus = _posixexitstatus
434 436
435 437
def partition(lst, nslices):
    """partition a list into N slices of roughly equal size

    The current strategy takes every Nth element from the input. If
    we ever write workers that need to preserve grouping in input
    we should consider allowing callers to specify a partition strategy.

    mpm is not a fan of this partitioning strategy when files are involved.
    In his words:

        Single-threaded Mercurial makes a point of creating and visiting
        files in a fixed order (alphabetical). When creating files in order,
        a typical filesystem is likely to allocate them on nearby regions on
        disk. Thus, when revisiting in the same order, locality is maximized
        and various forms of OS and disk-level caching and read-ahead get a
        chance to work.

        This effect can be quite significant on spinning disks. I discovered it
        circa Mercurial v0.4 when revlogs were named by hashes of filenames.
        Tarring a repo and copying it to another disk effectively randomized
        the revlog ordering on disk by sorting the revlogs by hash and suddenly
        performance of my kernel checkout benchmark dropped by ~10x because the
        "working set" of sectors visited no longer fit in the drive's cache and
        the workload switched from streaming to random I/O.

    What we should really be doing is have workers read filenames from a
    ordered queue. This preserves locality and also keeps any worker from
    getting more than one file out of balance.
    """
    for offset in range(nslices):
        # slice number `offset` gets elements offset, offset+nslices, ...
        yield lst[offset::nslices]
General Comments 0
You need to be logged in to leave comments. Login now