worker: POSIX only supports workers from main thread (issue6460)...
Joerg Sonnenberger
r46857:a42502e9 default
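
The change guards the fork-based POSIX worker behind a main-thread check: _posixworker() must install a SIGCHLD handler to reap its children, and CPython only permits signal.signal() from the main thread. A minimal sketch of the failure mode the new ismainthread() guard avoids (hypothetical demo code, POSIX-only since SIGCHLD does not exist on Windows):

    import signal
    import threading

    def try_install_handler():
        try:
            # CPython raises ValueError when a handler is installed
            # from any thread other than the main thread.
            signal.signal(signal.SIGCHLD, lambda signum, frame: None)
            print('handler installed')
        except ValueError as exc:
            print('cannot install handler:', exc)

    try_install_handler()                 # main thread: succeeds

    t = threading.Thread(target=try_install_handler)
    t.start()                             # worker thread: prints the error
    t.join()

With the guard in place, worker() simply falls back to running func() serially when it is invoked off the main thread.
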
@@ -1,455 +1,466 @@
1 1 # worker.py - master-slave parallelism support
2 2 #
3 3 # Copyright 2013 Facebook, Inc.
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 from __future__ import absolute_import
9 9
10 10 import errno
11 11 import os
12 12 import signal
13 13 import sys
14 14 import threading
15 15 import time
16 16
17 17 try:
18 18 import selectors
19 19
20 20 selectors.BaseSelector
21 21 except ImportError:
22 22 from .thirdparty import selectors2 as selectors
23 23
24 24 from .i18n import _
25 25 from . import (
26 26 encoding,
27 27 error,
28 28 pycompat,
29 29 scmutil,
30 30 util,
31 31 )
32 32
33 33
34 34 def countcpus():
35 35 '''try to count the number of CPUs on the system'''
36 36
37 37 # posix
38 38 try:
39 39 n = int(os.sysconf('SC_NPROCESSORS_ONLN'))
40 40 if n > 0:
41 41 return n
42 42 except (AttributeError, ValueError):
43 43 pass
44 44
45 45 # windows
46 46 try:
47 47 n = int(encoding.environ[b'NUMBER_OF_PROCESSORS'])
48 48 if n > 0:
49 49 return n
50 50 except (KeyError, ValueError):
51 51 pass
52 52
53 53 return 1
54 54
55 55
56 56 def _numworkers(ui):
57 57 s = ui.config(b'worker', b'numcpus')
58 58 if s:
59 59 try:
60 60 n = int(s)
61 61 if n >= 1:
62 62 return n
63 63 except ValueError:
64 64 raise error.Abort(_(b'number of cpus must be an integer'))
65 65 return min(max(countcpus(), 4), 32)
66 66
67 67
68 68 if pycompat.ispy3:
69 69
70 def ismainthread():
71 return threading.current_thread() == threading.main_thread()
72
70 73 class _blockingreader(object):
71 74 def __init__(self, wrapped):
72 75 self._wrapped = wrapped
73 76
74 77 # Do NOT implement readinto() by making it delegate to
75 78 # _wrapped.readinto(), since that is unbuffered. The unpickler is fine
76 79 # with just read() and readline(), so we don't need to implement it.
77 80
78 81 def readline(self):
79 82 return self._wrapped.readline()
80 83
81 84 # issue multiple reads until size is fulfilled
82 85 def read(self, size=-1):
83 86 if size < 0:
84 87 return self._wrapped.readall()
85 88
86 89 buf = bytearray(size)
87 90 view = memoryview(buf)
88 91 pos = 0
89 92
90 93 while pos < size:
91 94 ret = self._wrapped.readinto(view[pos:])
92 95 if not ret:
93 96 break
94 97 pos += ret
95 98
96 99 del view
97 100 del buf[pos:]
98 101 return bytes(buf)
99 102
100 103
101 104 else:
102 105
106 def ismainthread():
107 return isinstance(threading.current_thread(), threading._MainThread)
108
103 109 def _blockingreader(wrapped):
104 110 return wrapped
105 111
106 112
107 113 if pycompat.isposix or pycompat.iswindows:
108 114 _STARTUP_COST = 0.01
109 115 # The Windows worker is thread based. If tasks are CPU bound, threads
110 116 # in the presence of the GIL result in excessive context switching and
111 117 # this overhead can slow down execution.
112 118 _DISALLOW_THREAD_UNSAFE = pycompat.iswindows
113 119 else:
114 120 _STARTUP_COST = 1e30
115 121 _DISALLOW_THREAD_UNSAFE = False
116 122
117 123
118 124 def worthwhile(ui, costperop, nops, threadsafe=True):
119 125 """try to determine whether the benefit of multiple processes can
120 126 outweigh the cost of starting them"""
121 127
122 128 if not threadsafe and _DISALLOW_THREAD_UNSAFE:
123 129 return False
124 130
125 131 linear = costperop * nops
126 132 workers = _numworkers(ui)
127 133 benefit = linear - (_STARTUP_COST * workers + linear / workers)
128 134 return benefit >= 0.15
129 135
130 136
131 137 def worker(
132 138 ui, costperarg, func, staticargs, args, hasretval=False, threadsafe=True
133 139 ):
134 140 """run a function, possibly in parallel in multiple worker
135 141 processes.
136 142
137 143 returns a progress iterator
138 144
139 145 costperarg - cost of a single task
140 146
141 147 func - function to run. It is expected to return a progress iterator.
142 148
143 149 staticargs - arguments to pass to every invocation of the function
144 150
145 151 args - arguments to split into chunks, to pass to individual
146 152 workers
147 153
148 154 hasretval - when True, func and the current function return a progress
149 155 iterator then a dict (encoded as an iterator that yields many (False, ..)
150 156 then a (True, dict)). The dicts are joined in some arbitrary order, so
151 157 overlapping keys are a bad idea.
152 158
153 159 threadsafe - whether work items are thread safe and can be executed using
154 160 a thread-based worker. Should be disabled for CPU heavy tasks that don't
155 161 release the GIL.
156 162 """
157 163 enabled = ui.configbool(b'worker', b'enabled')
164 if enabled and _platformworker is _posixworker and not ismainthread():
165 # The POSIX worker has to install a handler for SIGCHLD.
166 # Python up to 3.9 only allows this in the main thread.
167 enabled = False
168
158 169 if enabled and worthwhile(ui, costperarg, len(args), threadsafe=threadsafe):
159 170 return _platformworker(ui, func, staticargs, args, hasretval)
160 171 return func(*staticargs + (args,))
161 172
162 173
163 174 def _posixworker(ui, func, staticargs, args, hasretval):
164 175 workers = _numworkers(ui)
165 176 oldhandler = signal.getsignal(signal.SIGINT)
166 177 signal.signal(signal.SIGINT, signal.SIG_IGN)
167 178 pids, problem = set(), [0]
168 179
169 180 def killworkers():
170 181 # unregister SIGCHLD handler as all children will be killed. This
171 182 # function shouldn't be interrupted by another SIGCHLD; otherwise pids
172 183 # could be updated while iterating, which would cause inconsistency.
173 184 signal.signal(signal.SIGCHLD, oldchldhandler)
174 185 # if one worker bails, there's no good reason to wait for the rest
175 186 for p in pids:
176 187 try:
177 188 os.kill(p, signal.SIGTERM)
178 189 except OSError as err:
179 190 if err.errno != errno.ESRCH:
180 191 raise
181 192
182 193 def waitforworkers(blocking=True):
183 194 for pid in pids.copy():
184 195 p = st = 0
185 196 while True:
186 197 try:
187 198 p, st = os.waitpid(pid, (0 if blocking else os.WNOHANG))
188 199 break
189 200 except OSError as e:
190 201 if e.errno == errno.EINTR:
191 202 continue
192 203 elif e.errno == errno.ECHILD:
193 204 # child would already be reaped, but pids not yet
194 205 # updated (maybe interrupted just after waitpid)
195 206 pids.discard(pid)
196 207 break
197 208 else:
198 209 raise
199 210 if not p:
200 211 # skip subsequent steps, because the child process
201 212 # should still be running in this case
202 213 continue
203 214 pids.discard(p)
204 215 st = _exitstatus(st)
205 216 if st and not problem[0]:
206 217 problem[0] = st
207 218
208 219 def sigchldhandler(signum, frame):
209 220 waitforworkers(blocking=False)
210 221 if problem[0]:
211 222 killworkers()
212 223
213 224 oldchldhandler = signal.signal(signal.SIGCHLD, sigchldhandler)
214 225 ui.flush()
215 226 parentpid = os.getpid()
216 227 pipes = []
217 228 retval = {}
218 229 for pargs in partition(args, min(workers, len(args))):
219 230 # Every worker gets its own pipe to send results on, so we don't have to
220 231 # implement atomic writes larger than PIPE_BUF. Each forked process has
221 232 # its own pipe's descriptors in the local variables, and the parent
222 233 # process has the full list of pipe descriptors (and it doesn't really
223 234 # care what order they're in).
224 235 rfd, wfd = os.pipe()
225 236 pipes.append((rfd, wfd))
226 237 # make sure we use os._exit in all worker code paths. otherwise the
227 238 # worker may do some clean-ups which could cause surprises like
228 239 # deadlock. see sshpeer.cleanup for example.
229 240 # override error handling *before* fork. this is necessary because
230 241 # exception (signal) may arrive after fork, before "pid =" assignment
231 242 # completes, and another exception handler (dispatch.py) can lead to
232 243 # an unexpected code path without os._exit.
233 244 ret = -1
234 245 try:
235 246 pid = os.fork()
236 247 if pid == 0:
237 248 signal.signal(signal.SIGINT, oldhandler)
238 249 signal.signal(signal.SIGCHLD, oldchldhandler)
239 250
240 251 def workerfunc():
241 252 for r, w in pipes[:-1]:
242 253 os.close(r)
243 254 os.close(w)
244 255 os.close(rfd)
245 256 for result in func(*(staticargs + (pargs,))):
246 257 os.write(wfd, util.pickle.dumps(result))
247 258 return 0
248 259
249 260 ret = scmutil.callcatch(ui, workerfunc)
250 261 except: # parent re-raises, child never returns
251 262 if os.getpid() == parentpid:
252 263 raise
253 264 exctype = sys.exc_info()[0]
254 265 force = not issubclass(exctype, KeyboardInterrupt)
255 266 ui.traceback(force=force)
256 267 finally:
257 268 if os.getpid() != parentpid:
258 269 try:
259 270 ui.flush()
260 271 except: # never returns, no re-raises
261 272 pass
262 273 finally:
263 274 os._exit(ret & 255)
264 275 pids.add(pid)
265 276 selector = selectors.DefaultSelector()
266 277 for rfd, wfd in pipes:
267 278 os.close(wfd)
268 279 selector.register(os.fdopen(rfd, 'rb', 0), selectors.EVENT_READ)
269 280
270 281 def cleanup():
271 282 signal.signal(signal.SIGINT, oldhandler)
272 283 waitforworkers()
273 284 signal.signal(signal.SIGCHLD, oldchldhandler)
274 285 selector.close()
275 286 return problem[0]
276 287
277 288 try:
278 289 openpipes = len(pipes)
279 290 while openpipes > 0:
280 291 for key, events in selector.select():
281 292 try:
282 293 res = util.pickle.load(_blockingreader(key.fileobj))
283 294 if hasretval and res[0]:
284 295 retval.update(res[1])
285 296 else:
286 297 yield res
287 298 except EOFError:
288 299 selector.unregister(key.fileobj)
289 300 key.fileobj.close()
290 301 openpipes -= 1
291 302 except IOError as e:
292 303 if e.errno == errno.EINTR:
293 304 continue
294 305 raise
295 306 except: # re-raises
296 307 killworkers()
297 308 cleanup()
298 309 raise
299 310 status = cleanup()
300 311 if status:
301 312 if status < 0:
302 313 os.kill(os.getpid(), -status)
303 314 raise error.WorkerError(status)
304 315 if hasretval:
305 316 yield True, retval
306 317
307 318
308 319 def _posixexitstatus(code):
309 320 """convert a posix exit status into the same form returned by
310 321 os.spawnv
311 322
312 323 returns None if the process was stopped instead of exiting"""
313 324 if os.WIFEXITED(code):
314 325 return os.WEXITSTATUS(code)
315 326 elif os.WIFSIGNALED(code):
316 327 return -(os.WTERMSIG(code))
317 328
318 329
319 330 def _windowsworker(ui, func, staticargs, args, hasretval):
320 331 class Worker(threading.Thread):
321 332 def __init__(
322 333 self, taskqueue, resultqueue, func, staticargs, *args, **kwargs
323 334 ):
324 335 threading.Thread.__init__(self, *args, **kwargs)
325 336 self._taskqueue = taskqueue
326 337 self._resultqueue = resultqueue
327 338 self._func = func
328 339 self._staticargs = staticargs
329 340 self._interrupted = False
330 341 self.daemon = True
331 342 self.exception = None
332 343
333 344 def interrupt(self):
334 345 self._interrupted = True
335 346
336 347 def run(self):
337 348 try:
338 349 while not self._taskqueue.empty():
339 350 try:
340 351 args = self._taskqueue.get_nowait()
341 352 for res in self._func(*self._staticargs + (args,)):
342 353 self._resultqueue.put(res)
343 354 # threading doesn't provide a native way to
344 355 # interrupt execution. handle it manually at every
345 356 # iteration.
346 357 if self._interrupted:
347 358 return
348 359 except pycompat.queue.Empty:
349 360 break
350 361 except Exception as e:
351 362 # store the exception such that the main thread can resurface
352 363 # it as if the func was running without workers.
353 364 self.exception = e
354 365 raise
355 366
356 367 threads = []
357 368
358 369 def trykillworkers():
359 370 # Allow up to 1 second to clean worker threads nicely
360 371 cleanupend = time.time() + 1
361 372 for t in threads:
362 373 t.interrupt()
363 374 for t in threads:
364 375 remainingtime = cleanupend - time.time()
365 376 t.join(remainingtime)
366 377 if t.is_alive():
367 378 # pass over the failure to join the workers. it is more
368 379 # important to surface the initial exception than the
369 380 # fact that one of the workers may be processing a large
370 381 # task and does not get to handle the interruption.
371 382 ui.warn(
372 383 _(
373 384 b"failed to kill worker threads while "
374 385 b"handling an exception\n"
375 386 )
376 387 )
377 388 return
378 389
379 390 workers = _numworkers(ui)
380 391 resultqueue = pycompat.queue.Queue()
381 392 taskqueue = pycompat.queue.Queue()
382 393 retval = {}
383 394 # partition work to more pieces than workers to minimize the chance
384 395 # of uneven distribution of large tasks between the workers
385 396 for pargs in partition(args, workers * 20):
386 397 taskqueue.put(pargs)
387 398 for _i in range(workers):
388 399 t = Worker(taskqueue, resultqueue, func, staticargs)
389 400 threads.append(t)
390 401 t.start()
391 402 try:
392 403 while len(threads) > 0:
393 404 while not resultqueue.empty():
394 405 res = resultqueue.get()
395 406 if hasretval and res[0]:
396 407 retval.update(res[1])
397 408 else:
398 409 yield res
399 410 threads[0].join(0.05)
400 411 finishedthreads = [_t for _t in threads if not _t.is_alive()]
401 412 for t in finishedthreads:
402 413 if t.exception is not None:
403 414 raise t.exception
404 415 threads.remove(t)
405 416 except (Exception, KeyboardInterrupt): # re-raises
406 417 trykillworkers()
407 418 raise
408 419 while not resultqueue.empty():
409 420 res = resultqueue.get()
410 421 if hasretval and res[0]:
411 422 retval.update(res[1])
412 423 else:
413 424 yield res
414 425 if hasretval:
415 426 yield True, retval
416 427
417 428
418 429 if pycompat.iswindows:
419 430 _platformworker = _windowsworker
420 431 else:
421 432 _platformworker = _posixworker
422 433 _exitstatus = _posixexitstatus
423 434
424 435
425 436 def partition(lst, nslices):
426 437 """partition a list into N slices of roughly equal size
427 438
428 439 The current strategy takes every Nth element from the input. If
429 440 we ever write workers that need to preserve grouping in input
430 441 we should consider allowing callers to specify a partition strategy.
431 442
432 443 mpm is not a fan of this partitioning strategy when files are involved.
433 444 In his words:
434 445
435 446 Single-threaded Mercurial makes a point of creating and visiting
436 447 files in a fixed order (alphabetical). When creating files in order,
437 448 a typical filesystem is likely to allocate them on nearby regions on
438 449 disk. Thus, when revisiting in the same order, locality is maximized
439 450 and various forms of OS and disk-level caching and read-ahead get a
440 451 chance to work.
441 452
442 453 This effect can be quite significant on spinning disks. I discovered it
443 454 circa Mercurial v0.4 when revlogs were named by hashes of filenames.
444 455 Tarring a repo and copying it to another disk effectively randomized
445 456 the revlog ordering on disk by sorting the revlogs by hash and suddenly
446 457 performance of my kernel checkout benchmark dropped by ~10x because the
447 458 "working set" of sectors visited no longer fit in the drive's cache and
448 459 the workload switched from streaming to random I/O.
449 460
450 461 What we should really be doing is to have workers read filenames from an
451 462 ordered queue. This preserves locality and also keeps any worker from
452 463 getting more than one file out of balance.
453 464 """
454 465 for i in range(nslices):
455 466 yield lst[i::nslices]
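
As a quick illustration of the stride-based strategy in partition() (a usage sketch, not part of the change):

    >>> list(partition(list(range(10)), 3))
    [[0, 3, 6, 9], [1, 4, 7], [2, 5, 8]]

Element i of the input lands in slice i % nslices, so the slices differ in length by at most one.
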
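The cost heuristic in worthwhile() can likewise be checked by hand with assumed numbers (a hypothetical task profile, using the POSIX _STARTUP_COST of 0.01):

    costperop, nops, workers = 0.01, 100, 4   # hypothetical task profile
    linear = costperop * nops                 # 1.0s of single-threaded work
    benefit = linear - (0.01 * workers + linear / workers)
    # benefit = 1.0 - (0.04 + 0.25) = 0.71 >= 0.15,
    # so worker() would fan this job out to worker processes
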