posixworker: avoid creating workers that end up getting no work...
Martin von Zweigbergk
r45936:26eb62bd default
@@ -1,451 +1,451 @@
1 1 # worker.py - master-slave parallelism support
2 2 #
3 3 # Copyright 2013 Facebook, Inc.
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 from __future__ import absolute_import
9 9
10 10 import errno
11 11 import os
12 12 import signal
13 13 import sys
14 14 import threading
15 15 import time
16 16
17 17 try:
18 18 import selectors
19 19
20 20 selectors.BaseSelector
21 21 except ImportError:
22 22 from .thirdparty import selectors2 as selectors
23 23
24 24 from .i18n import _
25 25 from . import (
26 26 encoding,
27 27 error,
28 28 pycompat,
29 29 scmutil,
30 30 util,
31 31 )
32 32
33 33
34 34 def countcpus():
35 35 '''try to count the number of CPUs on the system'''
36 36
37 37 # posix
38 38 try:
39 39 n = int(os.sysconf('SC_NPROCESSORS_ONLN'))
40 40 if n > 0:
41 41 return n
42 42 except (AttributeError, ValueError):
43 43 pass
44 44
45 45 # windows
46 46 try:
47 47 n = int(encoding.environ[b'NUMBER_OF_PROCESSORS'])
48 48 if n > 0:
49 49 return n
50 50 except (KeyError, ValueError):
51 51 pass
52 52
53 53 return 1
54 54
55 55
56 56 def _numworkers(ui):
57 57 s = ui.config(b'worker', b'numcpus')
58 58 if s:
59 59 try:
60 60 n = int(s)
61 61 if n >= 1:
62 62 return n
63 63 except ValueError:
64 64 raise error.Abort(_(b'number of cpus must be an integer'))
65 65 return min(max(countcpus(), 4), 32)
66 66
67 67
68 68 if pycompat.ispy3:
69 69
70 70 class _blockingreader(object):
71 71 def __init__(self, wrapped):
72 72 self._wrapped = wrapped
73 73
74 74 def __getattr__(self, attr):
75 75 return getattr(self._wrapped, attr)
76 76
77 77 # issue multiple reads until size is fulfilled
78 78 def read(self, size=-1):
79 79 if size < 0:
80 80 return self._wrapped.readall()
81 81
82 82 buf = bytearray(size)
83 83 view = memoryview(buf)
84 84 pos = 0
85 85
86 86 while pos < size:
87 87 ret = self._wrapped.readinto(view[pos:])
88 88 if not ret:
89 89 break
90 90 pos += ret
91 91
92 92 del view
93 93 del buf[pos:]
94 94 return buf
95 95
96 96
97 97 else:
98 98
99 99 def _blockingreader(wrapped):
100 100 return wrapped
101 101
102 102
103 103 if pycompat.isposix or pycompat.iswindows:
104 104 _STARTUP_COST = 0.01
105 105 # The Windows worker is thread based. If tasks are CPU bound, threads
106 106 # in the presence of the GIL result in excessive context switching and
107 107 # this overhead can slow down execution.
108 108 _DISALLOW_THREAD_UNSAFE = pycompat.iswindows
109 109 else:
110 110 _STARTUP_COST = 1e30
111 111 _DISALLOW_THREAD_UNSAFE = False
112 112
113 113
114 114 def worthwhile(ui, costperop, nops, threadsafe=True):
115 115 '''try to determine whether the benefit of multiple processes can
116 116 outweigh the cost of starting them'''
117 117
118 118 if not threadsafe and _DISALLOW_THREAD_UNSAFE:
119 119 return False
120 120
121 121 linear = costperop * nops
122 122 workers = _numworkers(ui)
123 123 benefit = linear - (_STARTUP_COST * workers + linear / workers)
124 124 return benefit >= 0.15
125 125
126 126
127 127 def worker(
128 128 ui, costperarg, func, staticargs, args, hasretval=False, threadsafe=True
129 129 ):
130 130 '''run a function, possibly in parallel in multiple worker
131 131 processes.
132 132
133 133 returns a progress iterator
134 134
135 135 costperarg - cost of a single task
136 136
137 137 func - function to run. It is expected to return a progress iterator.
138 138
139 139 staticargs - arguments to pass to every invocation of the function
140 140
141 141 args - arguments to split into chunks, to pass to individual
142 142 workers
143 143
144 144     hasretval - when True, func and the current function return a progress
145 145     iterator followed by a dict (encoded as an iterator that yields many
146 146     (False, ..) items and then a single (True, dict)). The dicts are joined
147 147     in some arbitrary order, so overlapping keys are a bad idea.
148 148
149 149 threadsafe - whether work items are thread safe and can be executed using
150 150 a thread-based worker. Should be disabled for CPU heavy tasks that don't
151 151 release the GIL.
152 152 '''
153 153 enabled = ui.configbool(b'worker', b'enabled')
154 154 if enabled and worthwhile(ui, costperarg, len(args), threadsafe=threadsafe):
155 155 return _platformworker(ui, func, staticargs, args, hasretval)
156 156 return func(*staticargs + (args,))
157 157
158 158
159 159 def _posixworker(ui, func, staticargs, args, hasretval):
160 160 workers = _numworkers(ui)
161 161 oldhandler = signal.getsignal(signal.SIGINT)
162 162 signal.signal(signal.SIGINT, signal.SIG_IGN)
163 163 pids, problem = set(), [0]
164 164
165 165 def killworkers():
166 166 # unregister SIGCHLD handler as all children will be killed. This
167 167 # function shouldn't be interrupted by another SIGCHLD; otherwise pids
168 168 # could be updated while iterating, which would cause inconsistency.
169 169 signal.signal(signal.SIGCHLD, oldchldhandler)
170 170 # if one worker bails, there's no good reason to wait for the rest
171 171 for p in pids:
172 172 try:
173 173 os.kill(p, signal.SIGTERM)
174 174 except OSError as err:
175 175 if err.errno != errno.ESRCH:
176 176 raise
177 177
178 178 def waitforworkers(blocking=True):
179 179 for pid in pids.copy():
180 180 p = st = 0
181 181 while True:
182 182 try:
183 183 p, st = os.waitpid(pid, (0 if blocking else os.WNOHANG))
184 184 break
185 185 except OSError as e:
186 186 if e.errno == errno.EINTR:
187 187 continue
188 188 elif e.errno == errno.ECHILD:
189 189                         # child already reaped, but pids not yet updated
190 190                         # (maybe interrupted just after waitpid)
191 191 pids.discard(pid)
192 192 break
193 193 else:
194 194 raise
195 195 if not p:
196 196                 # skip subsequent steps, because the child process
197 197                 # should still be running in this case
198 198 continue
199 199 pids.discard(p)
200 200 st = _exitstatus(st)
201 201 if st and not problem[0]:
202 202 problem[0] = st
203 203
204 204 def sigchldhandler(signum, frame):
205 205 waitforworkers(blocking=False)
206 206 if problem[0]:
207 207 killworkers()
208 208
209 209 oldchldhandler = signal.signal(signal.SIGCHLD, sigchldhandler)
210 210 ui.flush()
211 211 parentpid = os.getpid()
212 212 pipes = []
213 213 retval = {}
214 for pargs in partition(args, workers):
214 for pargs in partition(args, min(workers, len(args))):
215 215 # Every worker gets its own pipe to send results on, so we don't have to
216 216 # implement atomic writes larger than PIPE_BUF. Each forked process has
217 217 # its own pipe's descriptors in the local variables, and the parent
218 218 # process has the full list of pipe descriptors (and it doesn't really
219 219 # care what order they're in).
220 220 rfd, wfd = os.pipe()
221 221 pipes.append((rfd, wfd))
222 222 # make sure we use os._exit in all worker code paths. otherwise the
223 223 # worker may do some clean-ups which could cause surprises like
224 224 # deadlock. see sshpeer.cleanup for example.
225 225 # override error handling *before* fork. this is necessary because
226 226 # exception (signal) may arrive after fork, before "pid =" assignment
227 227 # completes, and other exception handler (dispatch.py) can lead to
228 228 # unexpected code path without os._exit.
229 229 ret = -1
230 230 try:
231 231 pid = os.fork()
232 232 if pid == 0:
233 233 signal.signal(signal.SIGINT, oldhandler)
234 234 signal.signal(signal.SIGCHLD, oldchldhandler)
235 235
236 236 def workerfunc():
237 237 for r, w in pipes[:-1]:
238 238 os.close(r)
239 239 os.close(w)
240 240 os.close(rfd)
241 241 for result in func(*(staticargs + (pargs,))):
242 242 os.write(wfd, util.pickle.dumps(result))
243 243 return 0
244 244
245 245 ret = scmutil.callcatch(ui, workerfunc)
246 246 except: # parent re-raises, child never returns
247 247 if os.getpid() == parentpid:
248 248 raise
249 249 exctype = sys.exc_info()[0]
250 250 force = not issubclass(exctype, KeyboardInterrupt)
251 251 ui.traceback(force=force)
252 252 finally:
253 253 if os.getpid() != parentpid:
254 254 try:
255 255 ui.flush()
256 256 except: # never returns, no re-raises
257 257 pass
258 258 finally:
259 259 os._exit(ret & 255)
260 260 pids.add(pid)
261 261 selector = selectors.DefaultSelector()
262 262 for rfd, wfd in pipes:
263 263 os.close(wfd)
264 264 selector.register(os.fdopen(rfd, 'rb', 0), selectors.EVENT_READ)
265 265
266 266 def cleanup():
267 267 signal.signal(signal.SIGINT, oldhandler)
268 268 waitforworkers()
269 269 signal.signal(signal.SIGCHLD, oldchldhandler)
270 270 selector.close()
271 271 return problem[0]
272 272
273 273 try:
274 274 openpipes = len(pipes)
275 275 while openpipes > 0:
276 276 for key, events in selector.select():
277 277 try:
278 278 res = util.pickle.load(_blockingreader(key.fileobj))
279 279 if hasretval and res[0]:
280 280 retval.update(res[1])
281 281 else:
282 282 yield res
283 283 except EOFError:
284 284 selector.unregister(key.fileobj)
285 285 key.fileobj.close()
286 286 openpipes -= 1
287 287 except IOError as e:
288 288 if e.errno == errno.EINTR:
289 289 continue
290 290 raise
291 291 except: # re-raises
292 292 killworkers()
293 293 cleanup()
294 294 raise
295 295 status = cleanup()
296 296 if status:
297 297 if status < 0:
298 298 os.kill(os.getpid(), -status)
299 299 sys.exit(status)
300 300 if hasretval:
301 301 yield True, retval
302 302
303 303
304 304 def _posixexitstatus(code):
305 305 '''convert a posix exit status into the same form returned by
306 306 os.spawnv
307 307
308 308 returns None if the process was stopped instead of exiting'''
309 309 if os.WIFEXITED(code):
310 310 return os.WEXITSTATUS(code)
311 311 elif os.WIFSIGNALED(code):
312 312 return -(os.WTERMSIG(code))
313 313
314 314
315 315 def _windowsworker(ui, func, staticargs, args, hasretval):
316 316 class Worker(threading.Thread):
317 317 def __init__(
318 318 self, taskqueue, resultqueue, func, staticargs, *args, **kwargs
319 319 ):
320 320 threading.Thread.__init__(self, *args, **kwargs)
321 321 self._taskqueue = taskqueue
322 322 self._resultqueue = resultqueue
323 323 self._func = func
324 324 self._staticargs = staticargs
325 325 self._interrupted = False
326 326 self.daemon = True
327 327 self.exception = None
328 328
329 329 def interrupt(self):
330 330 self._interrupted = True
331 331
332 332 def run(self):
333 333 try:
334 334 while not self._taskqueue.empty():
335 335 try:
336 336 args = self._taskqueue.get_nowait()
337 337 for res in self._func(*self._staticargs + (args,)):
338 338 self._resultqueue.put(res)
339 339 # threading doesn't provide a native way to
340 340 # interrupt execution. handle it manually at every
341 341 # iteration.
342 342 if self._interrupted:
343 343 return
344 344 except pycompat.queue.Empty:
345 345 break
346 346 except Exception as e:
347 347 # store the exception such that the main thread can resurface
348 348 # it as if the func was running without workers.
349 349 self.exception = e
350 350 raise
351 351
352 352 threads = []
353 353
354 354 def trykillworkers():
355 355 # Allow up to 1 second to clean worker threads nicely
356 356 cleanupend = time.time() + 1
357 357 for t in threads:
358 358 t.interrupt()
359 359 for t in threads:
360 360 remainingtime = cleanupend - time.time()
361 361 t.join(remainingtime)
362 362 if t.is_alive():
363 363                 # pass over the failure to join the workers. it is more
364 364                 # important to surface the initial exception than the
365 365                 # fact that one of the workers may be processing a large
366 366                 # task and does not get to handle the interruption.
367 367 ui.warn(
368 368 _(
369 369 b"failed to kill worker threads while "
370 370 b"handling an exception\n"
371 371 )
372 372 )
373 373 return
374 374
375 375 workers = _numworkers(ui)
376 376 resultqueue = pycompat.queue.Queue()
377 377 taskqueue = pycompat.queue.Queue()
378 378 retval = {}
379 379 # partition work to more pieces than workers to minimize the chance
380 380 # of uneven distribution of large tasks between the workers
381 381 for pargs in partition(args, workers * 20):
382 382 taskqueue.put(pargs)
383 383 for _i in range(workers):
384 384 t = Worker(taskqueue, resultqueue, func, staticargs)
385 385 threads.append(t)
386 386 t.start()
387 387 try:
388 388 while len(threads) > 0:
389 389 while not resultqueue.empty():
390 390 res = resultqueue.get()
391 391 if hasretval and res[0]:
392 392 retval.update(res[1])
393 393 else:
394 394 yield res
395 395 threads[0].join(0.05)
396 396 finishedthreads = [_t for _t in threads if not _t.is_alive()]
397 397 for t in finishedthreads:
398 398 if t.exception is not None:
399 399 raise t.exception
400 400 threads.remove(t)
401 401 except (Exception, KeyboardInterrupt): # re-raises
402 402 trykillworkers()
403 403 raise
404 404 while not resultqueue.empty():
405 405 res = resultqueue.get()
406 406 if hasretval and res[0]:
407 407 retval.update(res[1])
408 408 else:
409 409 yield res
410 410 if hasretval:
411 411 yield True, retval
412 412
413 413
414 414 if pycompat.iswindows:
415 415 _platformworker = _windowsworker
416 416 else:
417 417 _platformworker = _posixworker
418 418 _exitstatus = _posixexitstatus
419 419
420 420
421 421 def partition(lst, nslices):
422 422 '''partition a list into N slices of roughly equal size
423 423
424 424 The current strategy takes every Nth element from the input. If
425 425 we ever write workers that need to preserve grouping in input
426 426 we should consider allowing callers to specify a partition strategy.
427 427
428 428 mpm is not a fan of this partitioning strategy when files are involved.
429 429 In his words:
430 430
431 431 Single-threaded Mercurial makes a point of creating and visiting
432 432 files in a fixed order (alphabetical). When creating files in order,
433 433 a typical filesystem is likely to allocate them on nearby regions on
434 434 disk. Thus, when revisiting in the same order, locality is maximized
435 435 and various forms of OS and disk-level caching and read-ahead get a
436 436 chance to work.
437 437
438 438 This effect can be quite significant on spinning disks. I discovered it
439 439 circa Mercurial v0.4 when revlogs were named by hashes of filenames.
440 440 Tarring a repo and copying it to another disk effectively randomized
441 441 the revlog ordering on disk by sorting the revlogs by hash and suddenly
442 442 performance of my kernel checkout benchmark dropped by ~10x because the
443 443 "working set" of sectors visited no longer fit in the drive's cache and
444 444 the workload switched from streaming to random I/O.
445 445
446 446     What we should really be doing is to have workers read filenames from an
447 447 ordered queue. This preserves locality and also keeps any worker from
448 448 getting more than one file out of balance.
449 449 '''
450 450 for i in range(nslices):
451 451 yield lst[i::nslices]
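
The one-line change in this revision (line 214) bounds the number of slices by the number of work items. A minimal standalone sketch of why that matters, using made-up sample values (the sketch is not part of the changeset): with fewer args than workers, the old partition(args, workers) call yields empty slices, i.e. forked children that get no work.

def partition(lst, nslices):
    # same strategy as worker.partition above: every Nth element
    for i in range(nslices):
        yield lst[i::nslices]

args = [b'a', b'b', b'c']   # only three work items
workers = 8                 # hypothetical worker count

print(list(partition(args, workers)))
# [[b'a'], [b'b'], [b'c'], [], [], [], [], []]  -> five children forked for nothing
print(list(partition(args, min(workers, len(args)))))
# [[b'a'], [b'b'], [b'c']]                      -> every forked child has work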
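
The hasretval contract described in the worker() docstring can be exercised without the mercurial API at all. A hedged sketch with invented names and data (func, prefix, chunk are illustrative only) of how the (False, progress) / (True, dict) items are consumed and the per-chunk dicts merged:

def func(prefix, chunk):
    # yield progress items first, then a single (True, dict) at the end
    for item in chunk:
        yield False, b'%s: processed %s\n' % (prefix, item)
    yield True, {item: len(item) for item in chunk}

retval = {}
for chunk in ([b'a', b'bb'], [b'ccc']):      # what partition() would hand out
    for done, value in func(b'demo', chunk):
        if done:
            retval.update(value)             # dicts are joined in arbitrary order
        else:
            pass                             # progress item; worker() would yield it
print(retval)                                # {b'a': 1, b'bb': 2, b'ccc': 3}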
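
For reference, the worthwhile() heuristic above compares the estimated serial cost against the startup overhead plus the parallel share of the work. Re-running the arithmetic standalone with example numbers (the 0.01 startup cost and 0.15 threshold are the POSIX values from the module; the per-op costs below are invented):

_STARTUP_COST = 0.01   # POSIX/Windows value from the module above

def worthwhile(costperop, nops, workers):
    linear = costperop * nops
    benefit = linear - (_STARTUP_COST * workers + linear / workers)
    return benefit >= 0.15

print(worthwhile(0.001, 100, 4))    # linear=0.1, benefit=0.035 -> False, stay serial
print(worthwhile(0.001, 1000, 4))   # linear=1.0, benefit=0.71  -> True, use workers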