worker: avoid potential partial write of pickled data...
Manuel Jacob
r50164:395f2806 default
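For context, the core of this change is visible around lines 253-256 of the new file: the child process used to emit each result with os.write(wfd, pickle.dumps(result)), discarding the return value, but os.write() on a pipe may write fewer bytes than requested (notably for payloads larger than PIPE_BUF), silently truncating the pickle stream. Writing through a buffered file object lets the io layer retry until the whole payload is accepted. A minimal sketch of the two patterns (names are illustrative, not part of the patch):

    import os
    import pickle

    def send_unsafe(wfd, result):
        # old pattern: the return value (bytes actually written) is
        # ignored, so a short write corrupts the stream for the reader
        os.write(wfd, pickle.dumps(result))

    def send_safe(wfd, result):
        # new pattern: BufferedWriter.write() loops over partial writes,
        # and flush() pushes any buffered bytes out before the fd closes
        with os.fdopen(wfd, 'wb') as wf:
            pickle.dump(result, wf)
            wf.flush()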
@@ -1,471 +1,473 b''
1 1 # worker.py - master-slave parallelism support
2 2 #
3 3 # Copyright 2013 Facebook, Inc.
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8
9 9 import errno
10 10 import os
11 11 import pickle
12 12 import signal
13 13 import sys
14 14 import threading
15 15 import time
16 16
17 17 try:
18 18 import selectors
19 19
20 20 selectors.BaseSelector
21 21 except ImportError:
22 22 from .thirdparty import selectors2 as selectors
23 23
24 24 from .i18n import _
25 25 from . import (
26 26 encoding,
27 27 error,
28 28 pycompat,
29 29 scmutil,
30 30 )
31 31
32 32
33 33 def countcpus():
34 34 '''try to count the number of CPUs on the system'''
35 35
36 36 # posix
37 37 try:
38 38 n = int(os.sysconf('SC_NPROCESSORS_ONLN'))
39 39 if n > 0:
40 40 return n
41 41 except (AttributeError, ValueError):
42 42 pass
43 43
44 44 # windows
45 45 try:
46 46 n = int(encoding.environ[b'NUMBER_OF_PROCESSORS'])
47 47 if n > 0:
48 48 return n
49 49 except (KeyError, ValueError):
50 50 pass
51 51
52 52 return 1
53 53
54 54
55 55 def _numworkers(ui):
56 56 s = ui.config(b'worker', b'numcpus')
57 57 if s:
58 58 try:
59 59 n = int(s)
60 60 if n >= 1:
61 61 return n
62 62 except ValueError:
63 63 raise error.Abort(_(b'number of cpus must be an integer'))
64 64 return min(max(countcpus(), 4), 32)
65 65
66 66
67 67 def ismainthread():
68 68 return threading.current_thread() == threading.main_thread()
69 69
70 70
71 71 class _blockingreader:
72 72 """Wrap unbuffered stream such that pickle.load() works with it.
73 73
74 74 pickle.load() expects that calls to read() and readinto() read as many
75 75 bytes as requested. On EOF, it is fine to read fewer bytes. In this case,
76 76 pickle.load() raises an EOFError.
77 77 """
78 78
79 79 def __init__(self, wrapped):
80 80 self._wrapped = wrapped
81 81
82 82 def readline(self):
83 83 return self._wrapped.readline()
84 84
85 85 def readinto(self, buf):
86 86 pos = 0
87 87 size = len(buf)
88 88
89 89 with memoryview(buf) as view:
90 90 while pos < size:
91 91 with view[pos:] as subview:
92 92 ret = self._wrapped.readinto(subview)
93 93 if not ret:
94 94 break
95 95 pos += ret
96 96
97 97 return pos
98 98
99 99 # issue multiple reads until size is fulfilled (or EOF is encountered)
100 100 def read(self, size=-1):
101 101 if size < 0:
102 102 return self._wrapped.readall()
103 103
104 104 buf = bytearray(size)
105 105 n_read = self.readinto(buf)
106 106 del buf[n_read:]
107 107 return bytes(buf)
108 108
109 109
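A minimal usage sketch for _blockingreader (hypothetical, for illustration): wrap the unbuffered read end of a pipe so pickle.load() gets complete reads, with EOFError signalling that the writer has closed its end:

    import os
    import pickle

    rfd, wfd = os.pipe()
    if os.fork() == 0:  # child: pickle one result into the pipe
        os.close(rfd)
        with os.fdopen(wfd, 'wb') as wf:
            pickle.dump((False, b'item'), wf)
        os._exit(0)
    os.close(wfd)
    # the stream must stay unbuffered (buffering=0), matching how
    # _posixworker registers its pipes with the selector below
    reader = _blockingreader(os.fdopen(rfd, 'rb', 0))
    try:
        while True:
            print(pickle.load(reader))  # -> (False, b'item')
    except EOFError:  # writer gone; all results consumed
        pass
    os.wait()  # reap the child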
110 110 if pycompat.isposix or pycompat.iswindows:
111 111 _STARTUP_COST = 0.01
112 112 # The Windows worker is thread based. If tasks are CPU bound, threads
113 113 # in the presence of the GIL result in excessive context switching and
114 114 # this overhead can slow down execution.
115 115 _DISALLOW_THREAD_UNSAFE = pycompat.iswindows
116 116 else:
117 117 _STARTUP_COST = 1e30
118 118 _DISALLOW_THREAD_UNSAFE = False
119 119
120 120
121 121 def worthwhile(ui, costperop, nops, threadsafe=True):
122 122 """try to determine whether the benefit of multiple processes can
123 123 outweigh the cost of starting them"""
124 124
125 125 if not threadsafe and _DISALLOW_THREAD_UNSAFE:
126 126 return False
127 127
128 128 linear = costperop * nops
129 129 workers = _numworkers(ui)
130 130 benefit = linear - (_STARTUP_COST * workers + linear / workers)
131 131 return benefit >= 0.15
132 132
133 133
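A worked example of this heuristic (illustrative numbers): on a forking platform _STARTUP_COST is 0.01, so with costperop = 0.01, nops = 1000 and 4 workers the estimated serial cost is 10.0 and the benefit is 10.0 - (0.01 * 4 + 10.0 / 4) = 7.46, well above the 0.15 cutoff, so workers are used. Where _STARTUP_COST is 1e30, the benefit is always negative and work stays single-process:

    costperop, nops, workers = 0.01, 1000, 4   # illustrative numbers
    linear = costperop * nops                  # 10.0: estimated serial cost
    benefit = linear - (0.01 * workers + linear / workers)
    assert round(benefit, 2) == 7.46           # >= 0.15, so parallelize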
134 134 def worker(
135 135 ui, costperarg, func, staticargs, args, hasretval=False, threadsafe=True
136 136 ):
137 137 """run a function, possibly in parallel in multiple worker
138 138 processes.
139 139
140 140 returns a progress iterator
141 141
142 142 costperarg - cost of a single task
143 143
144 144 func - function to run. It is expected to return a progress iterator.
145 145
146 146 staticargs - arguments to pass to every invocation of the function
147 147
148 148 args - arguments to split into chunks, to pass to individual
149 149 workers
150 150
151 151 hasretval - when True, func and the current function return a progress
152 152 iterator followed by a dict (encoded as an iterator that yields many
153 153 (False, ..) pairs and then a single (True, dict) pair). The dicts are
154 154 joined in some arbitrary order, so overlapping keys are a bad idea.
155 155
156 156 threadsafe - whether work items are thread safe and can be executed using
157 157 a thread-based worker. Should be disabled for CPU heavy tasks that don't
158 158 release the GIL.
159 159 """
160 160 enabled = ui.configbool(b'worker', b'enabled')
161 161 if enabled and _platformworker is _posixworker and not ismainthread():
162 162 # The POSIX worker has to install a handler for SIGCHLD.
163 163 # Python up to 3.9 only allows this in the main thread.
164 164 enabled = False
165 165
166 166 if enabled and worthwhile(ui, costperarg, len(args), threadsafe=threadsafe):
167 167 return _platformworker(ui, func, staticargs, args, hasretval)
168 168 return func(*staticargs + (args,))
169 169
170 170
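A hypothetical call site for worker(), showing the contract described in the docstring (process(), items, and the surrounding ui object are invented for illustration):

    def process(ui, prefix, items):
        # func: yield (False, progress) pairs and, because hasretval
        # is used below, a final (True, dict) pair
        results = {}
        for i, item in enumerate(items):
            results[item] = prefix + item
            yield False, i
        yield True, results

    items = [b'a', b'b', b'c']
    # staticargs is prepended to each chunk of args; with hasretval=True
    # the per-worker dicts are merged and yielded last as (True, dict)
    for done, value in worker(ui, 0.01, process, (ui, b'x-'), items, hasretval=True):
        if done:
            merged = value  # the joined dict from all chunks
        else:
            pass            # value is a progress item from some worker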
171 171 def _posixworker(ui, func, staticargs, args, hasretval):
172 172 workers = _numworkers(ui)
173 173 oldhandler = signal.getsignal(signal.SIGINT)
174 174 signal.signal(signal.SIGINT, signal.SIG_IGN)
175 175 pids, problem = set(), [0]
176 176
177 177 def killworkers():
178 178 # unregister SIGCHLD handler as all children will be killed. This
179 179 # function shouldn't be interrupted by another SIGCHLD; otherwise pids
180 180 # could be updated while iterating, which would cause inconsistency.
181 181 signal.signal(signal.SIGCHLD, oldchldhandler)
182 182 # if one worker bails, there's no good reason to wait for the rest
183 183 for p in pids:
184 184 try:
185 185 os.kill(p, signal.SIGTERM)
186 186 except OSError as err:
187 187 if err.errno != errno.ESRCH:
188 188 raise
189 189
190 190 def waitforworkers(blocking=True):
191 191 for pid in pids.copy():
192 192 p = st = 0
193 193 while True:
194 194 try:
195 195 p, st = os.waitpid(pid, (0 if blocking else os.WNOHANG))
196 196 break
197 197 except OSError as e:
198 198 if e.errno == errno.EINTR:
199 199 continue
200 200 elif e.errno == errno.ECHILD:
201 201 # child may already have been reaped, but pids not yet
202 202 # updated (maybe interrupted just after waitpid)
203 203 pids.discard(pid)
204 204 break
205 205 else:
206 206 raise
207 207 if not p:
208 208 # skip subsequent steps, because the child process
209 209 # should still be running in this case
210 210 continue
211 211 pids.discard(p)
212 212 st = _exitstatus(st)
213 213 if st and not problem[0]:
214 214 problem[0] = st
215 215
216 216 def sigchldhandler(signum, frame):
217 217 waitforworkers(blocking=False)
218 218 if problem[0]:
219 219 killworkers()
220 220
221 221 oldchldhandler = signal.signal(signal.SIGCHLD, sigchldhandler)
222 222 ui.flush()
223 223 parentpid = os.getpid()
224 224 pipes = []
225 225 retval = {}
226 226 for pargs in partition(args, min(workers, len(args))):
227 227 # Every worker gets its own pipe to send results on, so we don't have to
228 228 # implement atomic writes larger than PIPE_BUF. Each forked process has
229 229 # its own pipe's descriptors in the local variables, and the parent
230 230 # process has the full list of pipe descriptors (and it doesn't really
231 231 # care what order they're in).
232 232 rfd, wfd = os.pipe()
233 233 pipes.append((rfd, wfd))
234 234 # make sure we use os._exit in all worker code paths. otherwise the
235 235 # worker may do some clean-ups which could cause surprises like
236 236 # deadlock. see sshpeer.cleanup for example.
237 237 # override error handling *before* fork. this is necessary because
238 238 # an exception (signal) may arrive after fork, before the "pid ="
239 239 # assignment completes, and another exception handler (dispatch.py)
240 240 # could lead to an unexpected code path without os._exit.
241 241 ret = -1
242 242 try:
243 243 pid = os.fork()
244 244 if pid == 0:
245 245 signal.signal(signal.SIGINT, oldhandler)
246 246 signal.signal(signal.SIGCHLD, oldchldhandler)
247 247
248 248 def workerfunc():
249 249 for r, w in pipes[:-1]:
250 250 os.close(r)
251 251 os.close(w)
252 252 os.close(rfd)
253 for result in func(*(staticargs + (pargs,))):
254 os.write(wfd, pickle.dumps(result))
253 with os.fdopen(wfd, 'wb') as wf:
254 for result in func(*(staticargs + (pargs,))):
255 pickle.dump(result, wf)
256 wf.flush()
255 257 return 0
256 258
257 259 ret = scmutil.callcatch(ui, workerfunc)
258 260 except: # parent re-raises, child never returns
259 261 if os.getpid() == parentpid:
260 262 raise
261 263 exctype = sys.exc_info()[0]
262 264 force = not issubclass(exctype, KeyboardInterrupt)
263 265 ui.traceback(force=force)
264 266 finally:
265 267 if os.getpid() != parentpid:
266 268 try:
267 269 ui.flush()
268 270 except: # never returns, no re-raises
269 271 pass
270 272 finally:
271 273 os._exit(ret & 255)
272 274 pids.add(pid)
273 275 selector = selectors.DefaultSelector()
274 276 for rfd, wfd in pipes:
275 277 os.close(wfd)
276 278 # The stream has to be unbuffered. Otherwise, if all data is read from
277 279 # the raw file into the buffer, the selector thinks that the FD is not
278 280 # ready to read while pickle.load() could read from the buffer. This
279 281 # would delay the processing of readable items.
280 282 selector.register(os.fdopen(rfd, 'rb', 0), selectors.EVENT_READ)
281 283
282 284 def cleanup():
283 285 signal.signal(signal.SIGINT, oldhandler)
284 286 waitforworkers()
285 287 signal.signal(signal.SIGCHLD, oldchldhandler)
286 288 selector.close()
287 289 return problem[0]
288 290
289 291 try:
290 292 openpipes = len(pipes)
291 293 while openpipes > 0:
292 294 for key, events in selector.select():
293 295 try:
294 296 # The pytype error likely goes away on a modern version of
295 297 # pytype having a modern typeshed snapshot.
296 298 # pytype: disable=wrong-arg-types
297 299 res = pickle.load(_blockingreader(key.fileobj))
298 300 # pytype: enable=wrong-arg-types
299 301 if hasretval and res[0]:
300 302 retval.update(res[1])
301 303 else:
302 304 yield res
303 305 except EOFError:
304 306 selector.unregister(key.fileobj)
305 307 key.fileobj.close()
306 308 openpipes -= 1
307 309 except IOError as e:
308 310 if e.errno == errno.EINTR:
309 311 continue
310 312 raise
311 313 except: # re-raises
312 314 killworkers()
313 315 cleanup()
314 316 raise
315 317 status = cleanup()
316 318 if status:
317 319 if status < 0:
318 320 os.kill(os.getpid(), -status)
319 321 raise error.WorkerError(status)
320 322 if hasretval:
321 323 yield True, retval
322 324
323 325
324 326 def _posixexitstatus(code):
325 327 """convert a posix exit status into the same form returned by
326 328 os.spawnv
327 329
328 330 returns None if the process was stopped instead of exiting"""
329 331 if os.WIFEXITED(code):
330 332 return os.WEXITSTATUS(code)
331 333 elif os.WIFSIGNALED(code):
332 334 return -(os.WTERMSIG(code))
333 335
334 336
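For illustration, assuming the conventional Linux wait-status encoding (exit code in the high byte, terminating signal in the low seven bits):

    assert _posixexitstatus(0x0200) == 2    # WIFEXITED, exit code 2
    assert _posixexitstatus(0x000F) == -15  # WIFSIGNALED by SIGTERM (15)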
335 337 def _windowsworker(ui, func, staticargs, args, hasretval):
336 338 class Worker(threading.Thread):
337 339 def __init__(
338 340 self, taskqueue, resultqueue, func, staticargs, *args, **kwargs
339 341 ):
340 342 threading.Thread.__init__(self, *args, **kwargs)
341 343 self._taskqueue = taskqueue
342 344 self._resultqueue = resultqueue
343 345 self._func = func
344 346 self._staticargs = staticargs
345 347 self._interrupted = False
346 348 self.daemon = True
347 349 self.exception = None
348 350
349 351 def interrupt(self):
350 352 self._interrupted = True
351 353
352 354 def run(self):
353 355 try:
354 356 while not self._taskqueue.empty():
355 357 try:
356 358 args = self._taskqueue.get_nowait()
357 359 for res in self._func(*self._staticargs + (args,)):
358 360 self._resultqueue.put(res)
359 361 # threading doesn't provide a native way to
360 362 # interrupt execution. handle it manually at every
361 363 # iteration.
362 364 if self._interrupted:
363 365 return
364 366 except pycompat.queue.Empty:
365 367 break
366 368 except Exception as e:
367 369 # store the exception such that the main thread can resurface
368 370 # it as if the func was running without workers.
369 371 self.exception = e
370 372 raise
371 373
372 374 threads = []
373 375
374 376 def trykillworkers():
375 377 # Allow up to 1 second to clean worker threads nicely
376 378 cleanupend = time.time() + 1
377 379 for t in threads:
378 380 t.interrupt()
379 381 for t in threads:
380 382 remainingtime = cleanupend - time.time()
381 383 t.join(remainingtime)
382 384 if t.is_alive():
383 385 # pass over the failure to join the workers. it is more
384 386 # important to surface the initial exception than the
385 387 # fact that one of the workers may be processing a large
386 388 # task and does not get to handle the interruption.
387 389 ui.warn(
388 390 _(
389 391 b"failed to kill worker threads while "
390 392 b"handling an exception\n"
391 393 )
392 394 )
393 395 return
394 396
395 397 workers = _numworkers(ui)
396 398 resultqueue = pycompat.queue.Queue()
397 399 taskqueue = pycompat.queue.Queue()
398 400 retval = {}
399 401 # partition work into more pieces than workers to minimize the
400 402 # chance of uneven distribution of large tasks between the workers
401 403 for pargs in partition(args, workers * 20):
402 404 taskqueue.put(pargs)
403 405 for _i in range(workers):
404 406 t = Worker(taskqueue, resultqueue, func, staticargs)
405 407 threads.append(t)
406 408 t.start()
407 409 try:
408 410 while len(threads) > 0:
409 411 while not resultqueue.empty():
410 412 res = resultqueue.get()
411 413 if hasretval and res[0]:
412 414 retval.update(res[1])
413 415 else:
414 416 yield res
415 417 threads[0].join(0.05)
416 418 finishedthreads = [_t for _t in threads if not _t.is_alive()]
417 419 for t in finishedthreads:
418 420 if t.exception is not None:
419 421 raise t.exception
420 422 threads.remove(t)
421 423 except (Exception, KeyboardInterrupt): # re-raises
422 424 trykillworkers()
423 425 raise
424 426 while not resultqueue.empty():
425 427 res = resultqueue.get()
426 428 if hasretval and res[0]:
427 429 retval.update(res[1])
428 430 else:
429 431 yield res
430 432 if hasretval:
431 433 yield True, retval
432 434
433 435
434 436 if pycompat.iswindows:
435 437 _platformworker = _windowsworker
436 438 else:
437 439 _platformworker = _posixworker
438 440 _exitstatus = _posixexitstatus
439 441
440 442
441 443 def partition(lst, nslices):
442 444 """partition a list into N slices of roughly equal size
443 445
444 446 The current strategy takes every Nth element from the input. If
445 447 we ever write workers that need to preserve grouping in input
446 448 we should consider allowing callers to specify a partition strategy.
447 449
448 450 olivia is not a fan of this partitioning strategy when files are involved.
449 451 In her words:
450 452
451 453 Single-threaded Mercurial makes a point of creating and visiting
452 454 files in a fixed order (alphabetical). When creating files in order,
453 455 a typical filesystem is likely to allocate them on nearby regions on
454 456 disk. Thus, when revisiting in the same order, locality is maximized
455 457 and various forms of OS and disk-level caching and read-ahead get a
456 458 chance to work.
457 459
458 460 This effect can be quite significant on spinning disks. I discovered it
459 461 circa Mercurial v0.4 when revlogs were named by hashes of filenames.
460 462 Tarring a repo and copying it to another disk effectively randomized
461 463 the revlog ordering on disk by sorting the revlogs by hash and suddenly
462 464 performance of my kernel checkout benchmark dropped by ~10x because the
463 465 "working set" of sectors visited no longer fit in the drive's cache and
464 466 the workload switched from streaming to random I/O.
465 467
466 468 What we should really be doing is to have workers read filenames from an
467 469 ordered queue. This preserves locality and also keeps any worker from
468 470 getting more than one file out of balance.
469 471 """
470 472 for i in range(nslices):
471 473 yield lst[i::nslices]
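For example, the round-robin slicing gives:

    >>> list(partition(list(range(10)), 3))
    [[0, 3, 6, 9], [1, 4, 7], [2, 5, 8]]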