worker: silence type error when calling pickle...
Gregory Szorc
r49767:a0674e91 default
@@ -1,455 +1,459 @@
1 1 # worker.py - master-slave parallelism support
2 2 #
3 3 # Copyright 2013 Facebook, Inc.
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8
9 9 import errno
10 10 import os
11 11 import pickle
12 12 import signal
13 13 import sys
14 14 import threading
15 15 import time
16 16
17 17 try:
18 18 import selectors
19 19
20 20 selectors.BaseSelector
21 21 except ImportError:
22 22 from .thirdparty import selectors2 as selectors
23 23
24 24 from .i18n import _
25 25 from . import (
26 26 encoding,
27 27 error,
28 28 pycompat,
29 29 scmutil,
30 30 )
31 31
32 32
33 33 def countcpus():
34 34 '''try to count the number of CPUs on the system'''
35 35
36 36 # posix
37 37 try:
38 38 n = int(os.sysconf('SC_NPROCESSORS_ONLN'))
39 39 if n > 0:
40 40 return n
41 41 except (AttributeError, ValueError):
42 42 pass
43 43
44 44 # windows
45 45 try:
46 46 n = int(encoding.environ[b'NUMBER_OF_PROCESSORS'])
47 47 if n > 0:
48 48 return n
49 49 except (KeyError, ValueError):
50 50 pass
51 51
52 52 return 1
53 53
54 54
55 55 def _numworkers(ui):
56 56 s = ui.config(b'worker', b'numcpus')
57 57 if s:
58 58 try:
59 59 n = int(s)
60 60 if n >= 1:
61 61 return n
62 62 except ValueError:
63 63 raise error.Abort(_(b'number of cpus must be an integer'))
64 64 return min(max(countcpus(), 4), 32)
65 65
66 66
67 67 def ismainthread():
68 68 return threading.current_thread() == threading.main_thread()
69 69
70 70
71 71 class _blockingreader(object):
72 72 def __init__(self, wrapped):
73 73 self._wrapped = wrapped
74 74
75 75 # Do NOT implement readinto() by making it delegate to
76 76 # _wrapped.readinto(), since that is unbuffered. The unpickler is fine
77 77 # with just read() and readline(), so we don't need to implement it.
78 78
79 79 def readline(self):
80 80 return self._wrapped.readline()
81 81
82 82 # issue multiple reads until size is fulfilled
83 83 def read(self, size=-1):
84 84 if size < 0:
85 85 return self._wrapped.readall()
86 86
87 87 buf = bytearray(size)
88 88 view = memoryview(buf)
89 89 pos = 0
90 90
91 91 while pos < size:
92 92 ret = self._wrapped.readinto(view[pos:])
93 93 if not ret:
94 94 break
95 95 pos += ret
96 96
97 97 del view
98 98 del buf[pos:]
99 99 return bytes(buf)
100 100
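The read() loop above is needed because the wrapped pipe is opened unbuffered, so a single readinto() call may return fewer bytes than requested. A minimal standalone sketch (not part of worker.py; the pipe payload is made up) of how the wrapper lets pickle.load() consume a raw pipe, assuming _blockingreader is in scope:

    import os
    import pickle

    rfd, wfd = os.pipe()
    os.write(wfd, pickle.dumps((False, b'result')))  # assumed payload
    os.close(wfd)

    # wrap the unbuffered read end; pickle only needs read()/readline()
    reader = _blockingreader(os.fdopen(rfd, 'rb', 0))
    print(pickle.load(reader))  # (False, b'result')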
101 101
102 102 if pycompat.isposix or pycompat.iswindows:
103 103 _STARTUP_COST = 0.01
104 104 # The Windows worker is thread based. If tasks are CPU bound, threads
105 105 # in the presence of the GIL result in excessive context switching and
106 106 # this overhead can slow down execution.
107 107 _DISALLOW_THREAD_UNSAFE = pycompat.iswindows
108 108 else:
109 109 _STARTUP_COST = 1e30
110 110 _DISALLOW_THREAD_UNSAFE = False
111 111
112 112
113 113 def worthwhile(ui, costperop, nops, threadsafe=True):
114 114 """try to determine whether the benefit of multiple processes can
115 115 outweigh the cost of starting them"""
116 116
117 117 if not threadsafe and _DISALLOW_THREAD_UNSAFE:
118 118 return False
119 119
120 120 linear = costperop * nops
121 121 workers = _numworkers(ui)
122 122 benefit = linear - (_STARTUP_COST * workers + linear / workers)
123 123 return benefit >= 0.15
124 124
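To make the heuristic concrete, here is a worked example with assumed numbers (0.01 matches the POSIX/Windows _STARTUP_COST above):

    # assumed workload: 1000 items at 10ms each, 4 workers available
    costperop, nops, workers = 0.01, 1000, 4
    linear = costperop * nops                       # 10.0s when run serially
    benefit = linear - (0.01 * workers + linear / workers)
    print(round(benefit, 2))   # 7.46, well above the 0.15s threshold
    print(benefit >= 0.15)     # True: spawning workers is worthwhile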
125 125
126 126 def worker(
127 127 ui, costperarg, func, staticargs, args, hasretval=False, threadsafe=True
128 128 ):
129 129 """run a function, possibly in parallel in multiple worker
130 130 processes.
131 131
132 132 returns a progress iterator
133 133
134 134 costperarg - cost of a single task
135 135
136 136 func - function to run. It is expected to return a progress iterator.
137 137
138 138 staticargs - arguments to pass to every invocation of the function
139 139
140 140 args - arguments to split into chunks, to pass to individual
141 141 workers
142 142
143 143 hasretval - when True, func and the current function return a progress
144 144 iterator and then a dict (encoded as an iterator that yields many
145 145 (False, ..) tuples and then one (True, dict) tuple). The dicts are
146 146 joined in some arbitrary order, so overlapping keys are a bad idea.
147 147
148 148 threadsafe - whether work items are thread safe and can be executed using
149 149 a thread-based worker. Should be disabled for CPU heavy tasks that don't
150 150 release the GIL.
151 151 """
152 152 enabled = ui.configbool(b'worker', b'enabled')
153 153 if enabled and _platformworker is _posixworker and not ismainthread():
154 154 # The POSIX worker has to install a handler for SIGCHLD.
155 155 # Python up to 3.9 only allows this in the main thread.
156 156 enabled = False
157 157
158 158 if enabled and worthwhile(ui, costperarg, len(args), threadsafe=threadsafe):
159 159 return _platformworker(ui, func, staticargs, args, hasretval)
160 160 return func(*staticargs + (args,))
161 161
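A hedged usage sketch of this API; process_item, work, and the concrete numbers are assumptions for illustration, not names from this module, and the ui object is assumed to come from a default Mercurial config:

    from mercurial import ui as uimod, worker

    def process_item(ui, item):
        return item * 2  # stand-in per-item task, for illustration only

    def work(ui, chunk):
        # chunk is one interleaved slice produced by partition()
        for item in chunk:
            yield process_item(ui, item)

    ui = uimod.ui.load()  # assumes default config; worker.enabled is True
    # 1ms estimated cost per item; results stream back as they complete
    for res in worker.worker(ui, 0.001, work, (ui,), list(range(100))):
        pass  # consume progress results as they arrive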
162 162
163 163 def _posixworker(ui, func, staticargs, args, hasretval):
164 164 workers = _numworkers(ui)
165 165 oldhandler = signal.getsignal(signal.SIGINT)
166 166 signal.signal(signal.SIGINT, signal.SIG_IGN)
167 167 pids, problem = set(), [0]
168 168
169 169 def killworkers():
170 170 # unregister SIGCHLD handler as all children will be killed. This
171 171 # function shouldn't be interrupted by another SIGCHLD; otherwise pids
172 172 # could be updated while iterating, which would cause inconsistency.
173 173 signal.signal(signal.SIGCHLD, oldchldhandler)
174 174 # if one worker bails, there's no good reason to wait for the rest
175 175 for p in pids:
176 176 try:
177 177 os.kill(p, signal.SIGTERM)
178 178 except OSError as err:
179 179 if err.errno != errno.ESRCH:
180 180 raise
181 181
182 182 def waitforworkers(blocking=True):
183 183 for pid in pids.copy():
184 184 p = st = 0
185 185 while True:
186 186 try:
187 187 p, st = os.waitpid(pid, (0 if blocking else os.WNOHANG))
188 188 break
189 189 except OSError as e:
190 190 if e.errno == errno.EINTR:
191 191 continue
192 192 elif e.errno == errno.ECHILD:
193 193 # child has already been reaped, but pids has not
194 194 # yet been updated (maybe interrupted just after waitpid)
195 195 pids.discard(pid)
196 196 break
197 197 else:
198 198 raise
199 199 if not p:
200 200 # skip subsequent steps, because the child process
201 201 # should still be running in this case
202 202 continue
203 203 pids.discard(p)
204 204 st = _exitstatus(st)
205 205 if st and not problem[0]:
206 206 problem[0] = st
207 207
208 208 def sigchldhandler(signum, frame):
209 209 waitforworkers(blocking=False)
210 210 if problem[0]:
211 211 killworkers()
212 212
213 213 oldchldhandler = signal.signal(signal.SIGCHLD, sigchldhandler)
214 214 ui.flush()
215 215 parentpid = os.getpid()
216 216 pipes = []
217 217 retval = {}
218 218 for pargs in partition(args, min(workers, len(args))):
219 219 # Every worker gets its own pipe to send results on, so we don't have to
220 220 # implement atomic writes larger than PIPE_BUF. Each forked process has
221 221 # its own pipe's descriptors in the local variables, and the parent
222 222 # process has the full list of pipe descriptors (and it doesn't really
223 223 # care what order they're in).
224 224 rfd, wfd = os.pipe()
225 225 pipes.append((rfd, wfd))
226 226 # make sure we use os._exit in all worker code paths. otherwise the
227 227 # worker may do some clean-ups which could cause surprises like
228 228 # deadlock. see sshpeer.cleanup for example.
229 229 # override error handling *before* fork. this is necessary because an
230 230 # exception (signal) may arrive after fork, before the "pid =" assignment
231 231 # completes, and another exception handler (dispatch.py) can lead to an
232 232 # unexpected code path without os._exit.
233 233 ret = -1
234 234 try:
235 235 pid = os.fork()
236 236 if pid == 0:
237 237 signal.signal(signal.SIGINT, oldhandler)
238 238 signal.signal(signal.SIGCHLD, oldchldhandler)
239 239
240 240 def workerfunc():
241 241 for r, w in pipes[:-1]:
242 242 os.close(r)
243 243 os.close(w)
244 244 os.close(rfd)
245 245 for result in func(*(staticargs + (pargs,))):
246 246 os.write(wfd, pickle.dumps(result))
247 247 return 0
248 248
249 249 ret = scmutil.callcatch(ui, workerfunc)
250 250 except: # parent re-raises, child never returns
251 251 if os.getpid() == parentpid:
252 252 raise
253 253 exctype = sys.exc_info()[0]
254 254 force = not issubclass(exctype, KeyboardInterrupt)
255 255 ui.traceback(force=force)
256 256 finally:
257 257 if os.getpid() != parentpid:
258 258 try:
259 259 ui.flush()
260 260 except: # never returns, no re-raises
261 261 pass
262 262 finally:
263 263 os._exit(ret & 255)
264 264 pids.add(pid)
265 265 selector = selectors.DefaultSelector()
266 266 for rfd, wfd in pipes:
267 267 os.close(wfd)
268 268 selector.register(os.fdopen(rfd, 'rb', 0), selectors.EVENT_READ)
269 269
270 270 def cleanup():
271 271 signal.signal(signal.SIGINT, oldhandler)
272 272 waitforworkers()
273 273 signal.signal(signal.SIGCHLD, oldchldhandler)
274 274 selector.close()
275 275 return problem[0]
276 276
277 277 try:
278 278 openpipes = len(pipes)
279 279 while openpipes > 0:
280 280 for key, events in selector.select():
281 281 try:
282 # The pytype error likely goes away on a modern version of
283 # pytype with a modern typeshed snapshot.
284 # pytype: disable=wrong-arg-types
282 285 res = pickle.load(_blockingreader(key.fileobj))
286 # pytype: enable=wrong-arg-types
283 287 if hasretval and res[0]:
284 288 retval.update(res[1])
285 289 else:
286 290 yield res
287 291 except EOFError:
288 292 selector.unregister(key.fileobj)
289 293 key.fileobj.close()
290 294 openpipes -= 1
291 295 except IOError as e:
292 296 if e.errno == errno.EINTR:
293 297 continue
294 298 raise
295 299 except: # re-raises
296 300 killworkers()
297 301 cleanup()
298 302 raise
299 303 status = cleanup()
300 304 if status:
301 305 if status < 0:
302 306 os.kill(os.getpid(), -status)
303 307 raise error.WorkerError(status)
304 308 if hasretval:
305 309 yield True, retval
306 310
307 311
308 312 def _posixexitstatus(code):
309 313 """convert a posix exit status into the same form returned by
310 314 os.spawnv
311 315
312 316 returns None if the process was stopped instead of exiting"""
313 317 if os.WIFEXITED(code):
314 318 return os.WEXITSTATUS(code)
315 319 elif os.WIFSIGNALED(code):
316 320 return -(os.WTERMSIG(code))
317 321
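For reference, the raw status word from os.waitpid() packs the exit code into the high byte and the terminating signal into the low bits. An illustrative sketch (POSIX-only; the values are assumed for demonstration):

    print(_posixexitstatus(15 << 8))  # normal exit with code 15 -> 15
    print(_posixexitstatus(9))        # terminated by signal 9 (SIGKILL) -> -9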
318 322
319 323 def _windowsworker(ui, func, staticargs, args, hasretval):
320 324 class Worker(threading.Thread):
321 325 def __init__(
322 326 self, taskqueue, resultqueue, func, staticargs, *args, **kwargs
323 327 ):
324 328 threading.Thread.__init__(self, *args, **kwargs)
325 329 self._taskqueue = taskqueue
326 330 self._resultqueue = resultqueue
327 331 self._func = func
328 332 self._staticargs = staticargs
329 333 self._interrupted = False
330 334 self.daemon = True
331 335 self.exception = None
332 336
333 337 def interrupt(self):
334 338 self._interrupted = True
335 339
336 340 def run(self):
337 341 try:
338 342 while not self._taskqueue.empty():
339 343 try:
340 344 args = self._taskqueue.get_nowait()
341 345 for res in self._func(*self._staticargs + (args,)):
342 346 self._resultqueue.put(res)
343 347 # threading doesn't provide a native way to
344 348 # interrupt execution. handle it manually at every
345 349 # iteration.
346 350 if self._interrupted:
347 351 return
348 352 except pycompat.queue.Empty:
349 353 break
350 354 except Exception as e:
351 355 # store the exception so that the main thread can re-raise
352 356 # it as if func had been running without workers.
353 357 self.exception = e
354 358 raise
355 359
356 360 threads = []
357 361
358 362 def trykillworkers():
359 363 # Allow up to 1 second to clean worker threads nicely
360 364 cleanupend = time.time() + 1
361 365 for t in threads:
362 366 t.interrupt()
363 367 for t in threads:
364 368 remainingtime = cleanupend - time.time()
365 369 t.join(remainingtime)
366 370 if t.is_alive():
367 371 # pass over the worker-joining failure. it is more
368 372 # important to surface the initial exception than the
369 373 # fact that one of the workers may be processing a large
370 374 # task and does not get to handle the interruption.
371 375 ui.warn(
372 376 _(
373 377 b"failed to kill worker threads while "
374 378 b"handling an exception\n"
375 379 )
376 380 )
377 381 return
378 382
379 383 workers = _numworkers(ui)
380 384 resultqueue = pycompat.queue.Queue()
381 385 taskqueue = pycompat.queue.Queue()
382 386 retval = {}
383 387 # partition work into more pieces than workers to minimize the chance
384 388 # of uneven distribution of large tasks between the workers
385 389 for pargs in partition(args, workers * 20):
386 390 taskqueue.put(pargs)
387 391 for _i in range(workers):
388 392 t = Worker(taskqueue, resultqueue, func, staticargs)
389 393 threads.append(t)
390 394 t.start()
391 395 try:
392 396 while len(threads) > 0:
393 397 while not resultqueue.empty():
394 398 res = resultqueue.get()
395 399 if hasretval and res[0]:
396 400 retval.update(res[1])
397 401 else:
398 402 yield res
399 403 threads[0].join(0.05)
400 404 finishedthreads = [_t for _t in threads if not _t.is_alive()]
401 405 for t in finishedthreads:
402 406 if t.exception is not None:
403 407 raise t.exception
404 408 threads.remove(t)
405 409 except (Exception, KeyboardInterrupt): # re-raises
406 410 trykillworkers()
407 411 raise
408 412 while not resultqueue.empty():
409 413 res = resultqueue.get()
410 414 if hasretval and res[0]:
411 415 retval.update(res[1])
412 416 else:
413 417 yield res
414 418 if hasretval:
415 419 yield True, retval
416 420
417 421
418 422 if pycompat.iswindows:
419 423 _platformworker = _windowsworker
420 424 else:
421 425 _platformworker = _posixworker
422 426 _exitstatus = _posixexitstatus
423 427
424 428
425 429 def partition(lst, nslices):
426 430 """partition a list into N slices of roughly equal size
427 431
428 432 The current strategy takes every Nth element from the input. If
429 433 we ever write workers that need to preserve grouping in input
430 434 we should consider allowing callers to specify a partition strategy.
431 435
432 436 olivia is not a fan of this partitioning strategy when files are involved.
433 437 In her words:
434 438
435 439 Single-threaded Mercurial makes a point of creating and visiting
436 440 files in a fixed order (alphabetical). When creating files in order,
437 441 a typical filesystem is likely to allocate them on nearby regions on
438 442 disk. Thus, when revisiting in the same order, locality is maximized
439 443 and various forms of OS and disk-level caching and read-ahead get a
440 444 chance to work.
441 445
442 446 This effect can be quite significant on spinning disks. I discovered it
443 447 circa Mercurial v0.4 when revlogs were named by hashes of filenames.
444 448 Tarring a repo and copying it to another disk effectively randomized
445 449 the revlog ordering on disk by sorting the revlogs by hash and suddenly
446 450 performance of my kernel checkout benchmark dropped by ~10x because the
447 451 "working set" of sectors visited no longer fit in the drive's cache and
448 452 the workload switched from streaming to random I/O.
449 453
450 454 What we should really be doing is have workers read filenames from an
451 455 ordered queue. This preserves locality and also keeps any worker from
452 456 getting more than one file out of balance.
453 457 """
454 458 for i in range(nslices):
455 459 yield lst[i::nslices]
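For example, each slice takes every Nth element, preserving the original order within each slice:

    print(list(partition(list(range(10)), 3)))
    # -> [[0, 3, 6, 9], [1, 4, 7], [2, 5, 8]]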