worker: manually buffer reads from pickle stream...
Jan Alexander Steffens (heftig)
r44751:12491abf stable
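
What the change does (summarized from the diff below): on Python 3, each
worker's result pipe is now opened unbuffered (os.fdopen(rfd, 'rb', 0)) and
handed to pickle.load() through a new _blockingreader wrapper that keeps
calling readinto() until every read request is fully satisfied, so a
partially arrived pickle no longer crashes the master process.

A minimal, self-contained sketch of the same idea, not the changeset's
code: the ChoppyStream fake and the explicit readinto() override are
assumptions added for this demo (newer CPython unpicklers may call
readinto() directly rather than read()):

    import io
    import pickle


    class ChoppyStream(io.RawIOBase):
        """Fake raw stream whose readinto() returns at most 5 bytes,
        mimicking short reads from an unbuffered pipe."""

        def __init__(self, data):
            self._data = data
            self._pos = 0

        def readable(self):
            return True

        def readinto(self, b):
            chunk = self._data[self._pos : self._pos + min(len(b), 5)]
            b[: len(chunk)] = chunk
            self._pos += len(chunk)
            return len(chunk)


    class BlockingReader(object):
        """Keep re-reading until each request is satisfied or EOF is hit."""

        def __init__(self, wrapped):
            self._wrapped = wrapped

        def readinto(self, b):
            view = memoryview(b)
            pos = 0
            while pos < len(b):
                ret = self._wrapped.readinto(view[pos:])
                if not ret:  # EOF
                    break
                pos += ret
            return pos

        def read(self, size=-1):
            if size < 0:
                return self._wrapped.readall()
            buf = bytearray(size)
            n = self.readinto(buf)
            return bytes(buf[:n])

        def readline(self):
            return self._wrapped.readline()


    payload = pickle.dumps(list(range(100)))
    # pickle.load(ChoppyStream(payload)) can fail with "pickle data was
    # truncated" because each raw read returns at most 5 bytes; the
    # wrapper hides the short reads:
    assert pickle.load(BlockingReader(ChoppyStream(payload))) == list(range(100))
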
@@ -1,416 +1,451 b''
1 1 # worker.py - master-slave parallelism support
2 2 #
3 3 # Copyright 2013 Facebook, Inc.
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 from __future__ import absolute_import
9 9
10 10 import errno
11 11 import os
12 12 import signal
13 13 import sys
14 14 import threading
15 15 import time
16 16
17 17 try:
18 18 import selectors
19 19
20 20 selectors.BaseSelector
21 21 except ImportError:
22 22 from .thirdparty import selectors2 as selectors
23 23
24 24 from .i18n import _
25 25 from . import (
26 26 encoding,
27 27 error,
28 28 pycompat,
29 29 scmutil,
30 30 util,
31 31 )
32 32
33 33
34 34 def countcpus():
35 35 '''try to count the number of CPUs on the system'''
36 36
37 37 # posix
38 38 try:
39 39 n = int(os.sysconf('SC_NPROCESSORS_ONLN'))
40 40 if n > 0:
41 41 return n
42 42 except (AttributeError, ValueError):
43 43 pass
44 44
45 45 # windows
46 46 try:
47 47 n = int(encoding.environ[b'NUMBER_OF_PROCESSORS'])
48 48 if n > 0:
49 49 return n
50 50 except (KeyError, ValueError):
51 51 pass
52 52
53 53 return 1
54 54
55 55
56 56 def _numworkers(ui):
57 57 s = ui.config(b'worker', b'numcpus')
58 58 if s:
59 59 try:
60 60 n = int(s)
61 61 if n >= 1:
62 62 return n
63 63 except ValueError:
64 64 raise error.Abort(_(b'number of cpus must be an integer'))
65 65 return min(max(countcpus(), 4), 32)
66 66
67 67
68 if pycompat.ispy3:
69
70 class _blockingreader(object):
71 def __init__(self, wrapped):
72 self._wrapped = wrapped
73
74 def __getattr__(self, attr):
75 return getattr(self._wrapped, attr)
76
77 # issue multiple reads until size is fulfilled
78 def read(self, size=-1):
79 if size < 0:
80 return self._wrapped.readall()
81
82 buf = bytearray(size)
83 view = memoryview(buf)
84 pos = 0
85
86 while pos < size:
87 ret = self._wrapped.readinto(view[pos:])
88 if not ret:
89 break
90 pos += ret
91
92 del view
93 del buf[pos:]
94 return buf
95
96
97 else:
98
99 def _blockingreader(wrapped):
100 return wrapped
101
102
68 103 if pycompat.isposix or pycompat.iswindows:
69 104 _STARTUP_COST = 0.01
70 105 # The Windows worker is thread based. If tasks are CPU bound, threads
71 106 # in the presence of the GIL result in excessive context switching and
72 107 # this overhead can slow down execution.
73 108 _DISALLOW_THREAD_UNSAFE = pycompat.iswindows
74 109 else:
75 110 _STARTUP_COST = 1e30
76 111 _DISALLOW_THREAD_UNSAFE = False
77 112
78 113
79 114 def worthwhile(ui, costperop, nops, threadsafe=True):
80 115 '''try to determine whether the benefit of multiple processes can
81 116 outweigh the cost of starting them'''
82 117
83 118 if not threadsafe and _DISALLOW_THREAD_UNSAFE:
84 119 return False
85 120
86 121 linear = costperop * nops
87 122 workers = _numworkers(ui)
88 123 benefit = linear - (_STARTUP_COST * workers + linear / workers)
89 124 return benefit >= 0.15
90 125
91 126
92 127 def worker(
93 128 ui, costperarg, func, staticargs, args, hasretval=False, threadsafe=True
94 129 ):
95 130 '''run a function, possibly in parallel in multiple worker
96 131 processes.
97 132
98 133 returns a progress iterator
99 134
100 135 costperarg - cost of a single task
101 136
102 137 func - function to run. It is expected to return a progress iterator.
103 138
104 139 staticargs - arguments to pass to every invocation of the function
105 140
106 141 args - arguments to split into chunks, to pass to individual
107 142 workers
108 143
109 144     hasretval - when True, func and the current function return a progress
110 145     iterator then a dict (encoded as an iterator that yields many (False, ..)
111 146 then a (True, dict)). The dicts are joined in some arbitrary order, so
112 147 overlapping keys are a bad idea.
113 148
114 149 threadsafe - whether work items are thread safe and can be executed using
115 150 a thread-based worker. Should be disabled for CPU heavy tasks that don't
116 151 release the GIL.
117 152 '''
118 153 enabled = ui.configbool(b'worker', b'enabled')
119 154 if enabled and worthwhile(ui, costperarg, len(args), threadsafe=threadsafe):
120 155 return _platformworker(ui, func, staticargs, args, hasretval)
121 156 return func(*staticargs + (args,))
122 157
123 158
124 159 def _posixworker(ui, func, staticargs, args, hasretval):
125 160 workers = _numworkers(ui)
126 161 oldhandler = signal.getsignal(signal.SIGINT)
127 162 signal.signal(signal.SIGINT, signal.SIG_IGN)
128 163 pids, problem = set(), [0]
129 164
130 165 def killworkers():
131 166 # unregister SIGCHLD handler as all children will be killed. This
132 167 # function shouldn't be interrupted by another SIGCHLD; otherwise pids
133 168 # could be updated while iterating, which would cause inconsistency.
134 169 signal.signal(signal.SIGCHLD, oldchldhandler)
135 170 # if one worker bails, there's no good reason to wait for the rest
136 171 for p in pids:
137 172 try:
138 173 os.kill(p, signal.SIGTERM)
139 174 except OSError as err:
140 175 if err.errno != errno.ESRCH:
141 176 raise
142 177
143 178 def waitforworkers(blocking=True):
144 179 for pid in pids.copy():
145 180 p = st = 0
146 181 while True:
147 182 try:
148 183 p, st = os.waitpid(pid, (0 if blocking else os.WNOHANG))
149 184 break
150 185 except OSError as e:
151 186 if e.errno == errno.EINTR:
152 187 continue
153 188 elif e.errno == errno.ECHILD:
154 189                 # child may already have been reaped, but pids not yet
155 190                 # updated (maybe interrupted just after waitpid)
156 191 pids.discard(pid)
157 192 break
158 193 else:
159 194 raise
160 195 if not p:
161 196             # skip subsequent steps, because the child process should
162 197             # still be running in this case
163 198 continue
164 199 pids.discard(p)
165 200 st = _exitstatus(st)
166 201 if st and not problem[0]:
167 202 problem[0] = st
168 203
169 204 def sigchldhandler(signum, frame):
170 205 waitforworkers(blocking=False)
171 206 if problem[0]:
172 207 killworkers()
173 208
174 209 oldchldhandler = signal.signal(signal.SIGCHLD, sigchldhandler)
175 210 ui.flush()
176 211 parentpid = os.getpid()
177 212 pipes = []
178 213 retval = {}
179 214 for pargs in partition(args, workers):
180 215 # Every worker gets its own pipe to send results on, so we don't have to
181 216 # implement atomic writes larger than PIPE_BUF. Each forked process has
182 217 # its own pipe's descriptors in the local variables, and the parent
183 218 # process has the full list of pipe descriptors (and it doesn't really
184 219 # care what order they're in).
185 220 rfd, wfd = os.pipe()
186 221 pipes.append((rfd, wfd))
187 222 # make sure we use os._exit in all worker code paths. otherwise the
188 223 # worker may do some clean-ups which could cause surprises like
189 224 # deadlock. see sshpeer.cleanup for example.
190 225 # override error handling *before* fork. this is necessary because
191 226 # exception (signal) may arrive after fork, before "pid =" assignment
192 227 # completes, and other exception handler (dispatch.py) can lead to
193 228 # unexpected code path without os._exit.
194 229 ret = -1
195 230 try:
196 231 pid = os.fork()
197 232 if pid == 0:
198 233 signal.signal(signal.SIGINT, oldhandler)
199 234 signal.signal(signal.SIGCHLD, oldchldhandler)
200 235
201 236 def workerfunc():
202 237 for r, w in pipes[:-1]:
203 238 os.close(r)
204 239 os.close(w)
205 240 os.close(rfd)
206 241 for result in func(*(staticargs + (pargs,))):
207 242 os.write(wfd, util.pickle.dumps(result))
208 243 return 0
209 244
210 245 ret = scmutil.callcatch(ui, workerfunc)
211 246 except: # parent re-raises, child never returns
212 247 if os.getpid() == parentpid:
213 248 raise
214 249 exctype = sys.exc_info()[0]
215 250 force = not issubclass(exctype, KeyboardInterrupt)
216 251 ui.traceback(force=force)
217 252 finally:
218 253 if os.getpid() != parentpid:
219 254 try:
220 255 ui.flush()
221 256 except: # never returns, no re-raises
222 257 pass
223 258 finally:
224 259 os._exit(ret & 255)
225 260 pids.add(pid)
226 261 selector = selectors.DefaultSelector()
227 262 for rfd, wfd in pipes:
228 263 os.close(wfd)
229 selector.register(os.fdopen(rfd, 'rb'), selectors.EVENT_READ)
264 selector.register(os.fdopen(rfd, 'rb', 0), selectors.EVENT_READ)
230 265
231 266 def cleanup():
232 267 signal.signal(signal.SIGINT, oldhandler)
233 268 waitforworkers()
234 269 signal.signal(signal.SIGCHLD, oldchldhandler)
235 270 selector.close()
236 271 return problem[0]
237 272
238 273 try:
239 274 openpipes = len(pipes)
240 275 while openpipes > 0:
241 276 for key, events in selector.select():
242 277 try:
243 res = util.pickle.load(key.fileobj)
278 res = util.pickle.load(_blockingreader(key.fileobj))
244 279 if hasretval and res[0]:
245 280 retval.update(res[1])
246 281 else:
247 282 yield res
248 283 except EOFError:
249 284 selector.unregister(key.fileobj)
250 285 key.fileobj.close()
251 286 openpipes -= 1
252 287 except IOError as e:
253 288 if e.errno == errno.EINTR:
254 289 continue
255 290 raise
256 291 except: # re-raises
257 292 killworkers()
258 293 cleanup()
259 294 raise
260 295 status = cleanup()
261 296 if status:
262 297 if status < 0:
263 298 os.kill(os.getpid(), -status)
264 299 sys.exit(status)
265 300 if hasretval:
266 301 yield True, retval
267 302
268 303
269 304 def _posixexitstatus(code):
270 305 '''convert a posix exit status into the same form returned by
271 306 os.spawnv
272 307
273 308 returns None if the process was stopped instead of exiting'''
274 309 if os.WIFEXITED(code):
275 310 return os.WEXITSTATUS(code)
276 311 elif os.WIFSIGNALED(code):
277 312 return -(os.WTERMSIG(code))
278 313
279 314
280 315 def _windowsworker(ui, func, staticargs, args, hasretval):
281 316 class Worker(threading.Thread):
282 317 def __init__(
283 318 self, taskqueue, resultqueue, func, staticargs, *args, **kwargs
284 319 ):
285 320 threading.Thread.__init__(self, *args, **kwargs)
286 321 self._taskqueue = taskqueue
287 322 self._resultqueue = resultqueue
288 323 self._func = func
289 324 self._staticargs = staticargs
290 325 self._interrupted = False
291 326 self.daemon = True
292 327 self.exception = None
293 328
294 329 def interrupt(self):
295 330 self._interrupted = True
296 331
297 332 def run(self):
298 333 try:
299 334 while not self._taskqueue.empty():
300 335 try:
301 336 args = self._taskqueue.get_nowait()
302 337 for res in self._func(*self._staticargs + (args,)):
303 338 self._resultqueue.put(res)
304 339 # threading doesn't provide a native way to
305 340 # interrupt execution. handle it manually at every
306 341 # iteration.
307 342 if self._interrupted:
308 343 return
309 344 except pycompat.queue.Empty:
310 345 break
311 346 except Exception as e:
312 347 # store the exception such that the main thread can resurface
313 348 # it as if the func was running without workers.
314 349 self.exception = e
315 350 raise
316 351
317 352 threads = []
318 353
319 354 def trykillworkers():
320 355 # Allow up to 1 second to clean worker threads nicely
321 356 cleanupend = time.time() + 1
322 357 for t in threads:
323 358 t.interrupt()
324 359 for t in threads:
325 360 remainingtime = cleanupend - time.time()
326 361 t.join(remainingtime)
327 362 if t.is_alive():
328 363                 # pass over the failure to join worker threads. it is more
329 364                 # important to surface the initial exception than the
330 365                 # fact that one of the workers may be processing a large
331 366 # task and does not get to handle the interruption.
332 367 ui.warn(
333 368 _(
334 369 b"failed to kill worker threads while "
335 370 b"handling an exception\n"
336 371 )
337 372 )
338 373 return
339 374
340 375 workers = _numworkers(ui)
341 376 resultqueue = pycompat.queue.Queue()
342 377 taskqueue = pycompat.queue.Queue()
343 378 retval = {}
344 379     # partition work into more pieces than workers to minimize the chance
345 380     # of uneven distribution of large tasks among the workers
346 381 for pargs in partition(args, workers * 20):
347 382 taskqueue.put(pargs)
348 383 for _i in range(workers):
349 384 t = Worker(taskqueue, resultqueue, func, staticargs)
350 385 threads.append(t)
351 386 t.start()
352 387 try:
353 388 while len(threads) > 0:
354 389 while not resultqueue.empty():
355 390 res = resultqueue.get()
356 391 if hasretval and res[0]:
357 392 retval.update(res[1])
358 393 else:
359 394 yield res
360 395 threads[0].join(0.05)
361 396 finishedthreads = [_t for _t in threads if not _t.is_alive()]
362 397 for t in finishedthreads:
363 398 if t.exception is not None:
364 399 raise t.exception
365 400 threads.remove(t)
366 401 except (Exception, KeyboardInterrupt): # re-raises
367 402 trykillworkers()
368 403 raise
369 404 while not resultqueue.empty():
370 405 res = resultqueue.get()
371 406 if hasretval and res[0]:
372 407 retval.update(res[1])
373 408 else:
374 409 yield res
375 410 if hasretval:
376 411 yield True, retval
377 412
378 413
379 414 if pycompat.iswindows:
380 415 _platformworker = _windowsworker
381 416 else:
382 417 _platformworker = _posixworker
383 418 _exitstatus = _posixexitstatus
384 419
385 420
386 421 def partition(lst, nslices):
387 422 '''partition a list into N slices of roughly equal size
388 423
389 424 The current strategy takes every Nth element from the input. If
390 425 we ever write workers that need to preserve grouping in input
391 426 we should consider allowing callers to specify a partition strategy.
392 427
393 428 mpm is not a fan of this partitioning strategy when files are involved.
394 429 In his words:
395 430
396 431 Single-threaded Mercurial makes a point of creating and visiting
397 432 files in a fixed order (alphabetical). When creating files in order,
398 433 a typical filesystem is likely to allocate them on nearby regions on
399 434 disk. Thus, when revisiting in the same order, locality is maximized
400 435 and various forms of OS and disk-level caching and read-ahead get a
401 436 chance to work.
402 437
403 438 This effect can be quite significant on spinning disks. I discovered it
404 439 circa Mercurial v0.4 when revlogs were named by hashes of filenames.
405 440 Tarring a repo and copying it to another disk effectively randomized
406 441 the revlog ordering on disk by sorting the revlogs by hash and suddenly
407 442 performance of my kernel checkout benchmark dropped by ~10x because the
408 443 "working set" of sectors visited no longer fit in the drive's cache and
409 444 the workload switched from streaming to random I/O.
410 445
411 446     What we should really be doing is have workers read filenames from an
412 447 ordered queue. This preserves locality and also keeps any worker from
413 448 getting more than one file out of balance.
414 449 '''
415 450 for i in range(nslices):
416 451 yield lst[i::nslices]
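
Two quick illustrations of behavior defined above, in plain Python. The
concrete numbers (8 items, 8 workers) are example values, not anything the
code prescribes:

    # worthwhile(): parallelize only when the estimated saving beats the
    # startup cost (posix _STARTUP_COST is 0.01). These are the two costs
    # the test below exercises:
    def benefit(costperop, nops, workers, startup=0.01):
        linear = costperop * nops
        return linear - (startup * workers + linear / workers)

    print(benefit(100000.0, 8, 8) >= 0.15)   # True  -> workers are used
    print(benefit(0.0000001, 8, 8) >= 0.15)  # False -> runs serially

    # partition(): every Nth element lands in the same slice, so slices
    # come out roughly equal in size but interleaved:
    def partition(lst, nslices):
        for i in range(nslices):
            yield lst[i::nslices]

    print(list(partition(list(range(10)), 3)))
    # [[0, 3, 6, 9], [1, 4, 7], [2, 5, 8]]
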
@@ -1,134 +1,165 b''
1 1 Test UI worker interaction
2 2
3 3 $ cat > t.py <<EOF
4 4 > from __future__ import absolute_import, print_function
5 5 > import sys
6 6 > import time
7 7 > from mercurial import (
8 8 > error,
9 9 > registrar,
10 10 > ui as uimod,
11 11 > worker,
12 12 > )
13 13 > sys.unraisablehook = lambda x: None
14 14 > def abort(ui, args):
15 15 > if args[0] == 0:
16 16 > # by first worker for test stability
17 17 > raise error.Abort(b'known exception')
18 18 > return runme(ui, [])
19 19 > def exc(ui, args):
20 20 > if args[0] == 0:
21 21 > # by first worker for test stability
22 22 > raise Exception('unknown exception')
23 23 > return runme(ui, [])
24 24 > def runme(ui, args):
25 25 > for arg in args:
26 26 > ui.status(b'run\n')
27 27 > yield 1, arg
28 28 > time.sleep(0.1) # easier to trigger killworkers code path
29 29 > functable = {
30 30 > b'abort': abort,
31 31 > b'exc': exc,
32 32 > b'runme': runme,
33 33 > }
34 34 > cmdtable = {}
35 35 > command = registrar.command(cmdtable)
36 36 > @command(b'test', [], b'hg test [COST] [FUNC]')
37 37 > def t(ui, repo, cost=1.0, func=b'runme'):
38 38 > cost = float(cost)
39 39 > func = functable[func]
40 40 > ui.status(b'start\n')
41 41 > runs = worker.worker(ui, cost, func, (ui,), range(8))
42 42 > for n, i in runs:
43 43 > pass
44 44 > ui.status(b'done\n')
45 45 > EOF
46 46 $ abspath=`pwd`/t.py
47 47 $ hg init
48 48
49 49 Run tests with worker enabled by forcing a high cost
50 50
51 51 $ hg --config "extensions.t=$abspath" test 100000.0
52 52 start
53 53 run
54 54 run
55 55 run
56 56 run
57 57 run
58 58 run
59 59 run
60 60 run
61 61 done
62 62
63 63 Run tests without worker by forcing a low cost
64 64
65 65 $ hg --config "extensions.t=$abspath" test 0.0000001
66 66 start
67 67 run
68 68 run
69 69 run
70 70 run
71 71 run
72 72 run
73 73 run
74 74 run
75 75 done
76 76
77 77 #if no-windows
78 78
79 79 Known exception should be caught, but printed if --traceback is enabled
80 80
81 81 $ hg --config "extensions.t=$abspath" --config 'worker.numcpus=8' \
82 82 > test 100000.0 abort 2>&1
83 83 start
84 84 abort: known exception
85 85 [255]
86 86
87 87 $ hg --config "extensions.t=$abspath" --config 'worker.numcpus=8' \
88 88 > test 100000.0 abort --traceback 2>&1 | egrep '(SystemExit|Abort)'
89 89 raise error.Abort(b'known exception')
90 90 mercurial.error.Abort: known exception (py3 !)
91 91 Abort: known exception (no-py3 !)
92 92 SystemExit: 255
93 93
94 94 Traceback must be printed for unknown exceptions
95 95
96 96 $ hg --config "extensions.t=$abspath" --config 'worker.numcpus=8' \
97 97 > test 100000.0 exc 2>&1 | grep '^Exception'
98 98 Exception: unknown exception
99 99
100 100 Workers should never run cleanups (they exit via os._exit)
101 101
102 102 $ cat > $TESTTMP/detectcleanup.py <<EOF
103 103 > from __future__ import absolute_import
104 104 > import atexit
105 105 > import os
106 106 > import sys
107 107 > import time
108 108 > sys.unraisablehook = lambda x: None
109 109 > oldfork = os.fork
110 110 > count = 0
111 111 > parentpid = os.getpid()
112 112 > def delayedfork():
113 113 > global count
114 114 > count += 1
115 115 > pid = oldfork()
116 116 > # make it easier to test SIGTERM hitting other workers when they have
117 117 > # not set up error handling yet.
118 118 > if count > 1 and pid == 0:
119 119 > time.sleep(0.1)
120 120 > return pid
121 121 > os.fork = delayedfork
122 122 > def cleanup():
123 123 > if os.getpid() != parentpid:
124 124 >     os.write(1, b'should never happen\n')
125 125 > atexit.register(cleanup)
126 126 > EOF
127 127
128 128 $ hg --config "extensions.t=$abspath" --config worker.numcpus=8 --config \
129 129 > "extensions.d=$TESTTMP/detectcleanup.py" test 100000 abort
130 130 start
131 131 abort: known exception
132 132 [255]
133 133
134 Do not crash on a partially read result
135
136 $ cat > $TESTTMP/detecttruncated.py <<EOF
137 > from __future__ import absolute_import
138 > import os
139 > import sys
140 > import time
141 > sys.unraisablehook = lambda x: None
142 > oldwrite = os.write
143 > def splitwrite(fd, string):
144 > ret = oldwrite(fd, string[:9])
145 > if ret == 9:
146 > time.sleep(0.1)
147 > ret += oldwrite(fd, string[9:])
148 > return ret
149 > os.write = splitwrite
150 > EOF
151
152 $ hg --config "extensions.t=$abspath" --config worker.numcpus=8 --config \
153 > "extensions.d=$TESTTMP/detecttruncated.py" test 100000.0
154 start
155 run
156 run
157 run
158 run
159 run
160 run
161 run
162 run
163 done
164
134 165 #endif
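
The detecttruncated.py helper works because a pipe hands the reader only
what has been written so far: asking for more bytes than are available
yields a short read. A standalone demonstration of that fact (plain
Python, independent of Mercurial; the 0.1s sleep makes the split reliable
in practice, though it is timing-dependent in principle):

    import os
    import threading
    import time

    rfd, wfd = os.pipe()

    def splitwriter():
        os.write(wfd, b'123456789')  # first 9 bytes, like splitwrite()
        time.sleep(0.1)              # reader wakes up before the rest
        os.write(wfd, b'abcdef')
        os.close(wfd)

    threading.Thread(target=splitwriter).start()
    print(os.read(rfd, 1024))  # b'123456789' (short read, not all 15 bytes)
    print(os.read(rfd, 1024))  # b'abcdef'
    os.close(rfd)
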