worker: Use buffered input from the pickle stream...
Jan Alexander Steffens (heftig)
r44718:cb52e619 stable
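
The one-line change below drops the 0 (unbuffered) argument from os.fdopen() on the workers' result pipes, so the parent now reads each pickle stream through Python's default io.BufferedReader instead of a raw io.FileIO. On a raw file, every read() the unpickler issues hits the pipe as a separate syscall; a buffered reader amortizes those reads over 8 KiB chunks and keeps any excess bytes queued for the next pickle.load(). A minimal standalone sketch of the buffered pattern (the payload dicts are made up for illustration):

    import os
    import pickle

    r, w = os.pipe()

    # a worker writes several pickled results back-to-back on its pipe
    for result in ({b'rev': 0}, {b'rev': 1}, {b'rev': 2}):
        os.write(w, pickle.dumps(result))
    os.close(w)

    # default buffering wraps the read end in io.BufferedReader; passing
    # buffering=0 instead would return a raw io.FileIO, where each of the
    # unpickler's small reads becomes its own syscall
    with os.fdopen(r, 'rb') as rf:
        try:
            while True:
                print(pickle.load(rf))  # one object per call, rest stays buffered
        except EOFError:  # raised once the stream is exhausted
            pass

Consecutive pickle.load() calls are safe on a buffered stream because the unpickler consumes exactly one object's worth of bytes per call, leaving the remainder in the buffer for the next call.
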
@@ -1,416 +1,416 @@
1 1 # worker.py - master-slave parallelism support
2 2 #
3 3 # Copyright 2013 Facebook, Inc.
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 from __future__ import absolute_import
9 9
10 10 import errno
11 11 import os
12 12 import signal
13 13 import sys
14 14 import threading
15 15 import time
16 16
17 17 try:
18 18 import selectors
19 19
20 20 selectors.BaseSelector
21 21 except ImportError:
22 22 from .thirdparty import selectors2 as selectors
23 23
24 24 from .i18n import _
25 25 from . import (
26 26 encoding,
27 27 error,
28 28 pycompat,
29 29 scmutil,
30 30 util,
31 31 )
32 32
33 33
34 34 def countcpus():
35 35 '''try to count the number of CPUs on the system'''
36 36
37 37 # posix
38 38 try:
39 39 n = int(os.sysconf('SC_NPROCESSORS_ONLN'))
40 40 if n > 0:
41 41 return n
42 42 except (AttributeError, ValueError):
43 43 pass
44 44
45 45 # windows
46 46 try:
47 47 n = int(encoding.environ[b'NUMBER_OF_PROCESSORS'])
48 48 if n > 0:
49 49 return n
50 50 except (KeyError, ValueError):
51 51 pass
52 52
53 53 return 1
54 54
55 55
56 56 def _numworkers(ui):
57 57 s = ui.config(b'worker', b'numcpus')
58 58 if s:
59 59 try:
60 60 n = int(s)
61 61 if n >= 1:
62 62 return n
63 63 except ValueError:
64 64 raise error.Abort(_(b'number of cpus must be an integer'))
65 65 return min(max(countcpus(), 4), 32)
66 66
67 67
68 68 if pycompat.isposix or pycompat.iswindows:
69 69 _STARTUP_COST = 0.01
70 70 # The Windows worker is thread based. If tasks are CPU bound, threads
71 71 # in the presence of the GIL result in excessive context switching and
72 72 # this overhead can slow down execution.
73 73 _DISALLOW_THREAD_UNSAFE = pycompat.iswindows
74 74 else:
75 75 _STARTUP_COST = 1e30
76 76 _DISALLOW_THREAD_UNSAFE = False
77 77
78 78
79 79 def worthwhile(ui, costperop, nops, threadsafe=True):
80 80 '''try to determine whether the benefit of multiple processes can
81 81 outweigh the cost of starting them'''
82 82
83 83 if not threadsafe and _DISALLOW_THREAD_UNSAFE:
84 84 return False
85 85
86 86 linear = costperop * nops
87 87 workers = _numworkers(ui)
88 88 benefit = linear - (_STARTUP_COST * workers + linear / workers)
89 89 return benefit >= 0.15
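# A worked example of this cost model, assuming the posix constants above:
# with costperop = 0.01, nops = 1000 and 4 workers,
#     linear  = 0.01 * 1000                  = 10.0
#     benefit = 10.0 - (0.01 * 4 + 10.0 / 4) = 7.46
# which clears the 0.15 threshold, so the parallel path is taken.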
90 90
91 91
92 92 def worker(
93 93 ui, costperarg, func, staticargs, args, hasretval=False, threadsafe=True
94 94 ):
95 95 '''run a function, possibly in parallel in multiple worker
96 96 processes.
97 97
98 98 returns a progress iterator
99 99
100 100 costperarg - cost of a single task
101 101
102 102 func - function to run. It is expected to return a progress iterator.
103 103
104 104 staticargs - arguments to pass to every invocation of the function
105 105
106 106 args - arguments to split into chunks, to pass to individual
107 107 workers
108 108
109 109 hasretval - when True, func and the current function return a progress
110 110 iterator then a dict (encoded as an iterator that yields many (False, ..)
111 111 then a (True, dict)). The dicts are joined in some arbitrary order, so
112 112 overlapping keys are a bad idea.
113 113
114 114 threadsafe - whether work items are thread safe and can be executed using
115 115 a thread-based worker. Should be disabled for CPU heavy tasks that don't
116 116 release the GIL.
117 117 '''
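    # A hypothetical usage sketch (names made up, not from this file): the
    # callable yields (unitsdone, item) tuples, and the caller drains the
    # merged progress iterator:
    #
    #     def prefetch(repo, opts, chunk):
    #         for f in chunk:
    #             yield 1, f
    #
    #     for done, f in worker(ui, 0.05, prefetch, (repo, opts), allfiles):
    #         progress.increment(step=done)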
118 118 enabled = ui.configbool(b'worker', b'enabled')
119 119 if enabled and worthwhile(ui, costperarg, len(args), threadsafe=threadsafe):
120 120 return _platformworker(ui, func, staticargs, args, hasretval)
121 121 return func(*staticargs + (args,))
122 122
123 123
124 124 def _posixworker(ui, func, staticargs, args, hasretval):
125 125 workers = _numworkers(ui)
126 126 oldhandler = signal.getsignal(signal.SIGINT)
127 127 signal.signal(signal.SIGINT, signal.SIG_IGN)
128 128 pids, problem = set(), [0]
129 129
130 130 def killworkers():
131 131 # unregister SIGCHLD handler as all children will be killed. This
132 132 # function shouldn't be interrupted by another SIGCHLD; otherwise pids
133 133 # could be updated while iterating, which would cause inconsistency.
134 134 signal.signal(signal.SIGCHLD, oldchldhandler)
135 135 # if one worker bails, there's no good reason to wait for the rest
136 136 for p in pids:
137 137 try:
138 138 os.kill(p, signal.SIGTERM)
139 139 except OSError as err:
140 140 if err.errno != errno.ESRCH:
141 141 raise
142 142
143 143 def waitforworkers(blocking=True):
144 144 for pid in pids.copy():
145 145 p = st = 0
146 146 while True:
147 147 try:
148 148 p, st = os.waitpid(pid, (0 if blocking else os.WNOHANG))
149 149 break
150 150 except OSError as e:
151 151 if e.errno == errno.EINTR:
152 152 continue
153 153 elif e.errno == errno.ECHILD:
154 154 # the child may already be reaped, but pids not yet
155 155 # updated (maybe interrupted just after waitpid)
156 156 pids.discard(pid)
157 157 break
158 158 else:
159 159 raise
160 160 if not p:
161 161 # skip subsequent steps, because the child process
162 162 # should still be running in this case
163 163 continue
164 164 pids.discard(p)
165 165 st = _exitstatus(st)
166 166 if st and not problem[0]:
167 167 problem[0] = st
168 168
169 169 def sigchldhandler(signum, frame):
170 170 waitforworkers(blocking=False)
171 171 if problem[0]:
172 172 killworkers()
173 173
174 174 oldchldhandler = signal.signal(signal.SIGCHLD, sigchldhandler)
175 175 ui.flush()
176 176 parentpid = os.getpid()
177 177 pipes = []
178 178 retval = {}
179 179 for pargs in partition(args, workers):
180 180 # Every worker gets its own pipe to send results on, so we don't have to
181 181 # implement atomic writes larger than PIPE_BUF. Each forked process has
182 182 # its own pipe's descriptors in the local variables, and the parent
183 183 # process has the full list of pipe descriptors (and it doesn't really
184 184 # care what order they're in).
185 185 rfd, wfd = os.pipe()
186 186 pipes.append((rfd, wfd))
187 187 # make sure we use os._exit in all worker code paths. otherwise the
188 188 # worker may do some clean-ups which could cause surprises like
189 189 # deadlock. see sshpeer.cleanup for example.
190 190 # override error handling *before* fork. this is necessary because
191 191 # an exception (signal) may arrive after fork, before the "pid ="
192 192 # assignment completes, and another exception handler (dispatch.py)
193 193 # can lead to an unexpected code path without os._exit.
194 194 ret = -1
195 195 try:
196 196 pid = os.fork()
197 197 if pid == 0:
198 198 signal.signal(signal.SIGINT, oldhandler)
199 199 signal.signal(signal.SIGCHLD, oldchldhandler)
200 200
201 201 def workerfunc():
202 202 for r, w in pipes[:-1]:
203 203 os.close(r)
204 204 os.close(w)
205 205 os.close(rfd)
206 206 for result in func(*(staticargs + (pargs,))):
207 207 os.write(wfd, util.pickle.dumps(result))
208 208 return 0
209 209
210 210 ret = scmutil.callcatch(ui, workerfunc)
211 211 except: # parent re-raises, child never returns
212 212 if os.getpid() == parentpid:
213 213 raise
214 214 exctype = sys.exc_info()[0]
215 215 force = not issubclass(exctype, KeyboardInterrupt)
216 216 ui.traceback(force=force)
217 217 finally:
218 218 if os.getpid() != parentpid:
219 219 try:
220 220 ui.flush()
221 221 except: # never returns, no re-raises
222 222 pass
223 223 finally:
224 224 os._exit(ret & 255)
225 225 pids.add(pid)
226 226 selector = selectors.DefaultSelector()
227 227 for rfd, wfd in pipes:
228 228 os.close(wfd)
229     - selector.register(os.fdopen(rfd, 'rb', 0), selectors.EVENT_READ)
    229 + selector.register(os.fdopen(rfd, 'rb'), selectors.EVENT_READ)
230 230
231 231 def cleanup():
232 232 signal.signal(signal.SIGINT, oldhandler)
233 233 waitforworkers()
234 234 signal.signal(signal.SIGCHLD, oldchldhandler)
235 235 selector.close()
236 236 return problem[0]
237 237
238 238 try:
239 239 openpipes = len(pipes)
240 240 while openpipes > 0:
241 241 for key, events in selector.select():
242 242 try:
243 243 res = util.pickle.load(key.fileobj)
244 244 if hasretval and res[0]:
245 245 retval.update(res[1])
246 246 else:
247 247 yield res
248 248 except EOFError:
249 249 selector.unregister(key.fileobj)
250 250 key.fileobj.close()
251 251 openpipes -= 1
252 252 except IOError as e:
253 253 if e.errno == errno.EINTR:
254 254 continue
255 255 raise
256 256 except: # re-raises
257 257 killworkers()
258 258 cleanup()
259 259 raise
260 260 status = cleanup()
261 261 if status:
262 262 if status < 0:
263 263 os.kill(os.getpid(), -status)
264 264 sys.exit(status)
265 265 if hasretval:
266 266 yield True, retval
267 267
268 268
269 269 def _posixexitstatus(code):
270 270 '''convert a posix exit status into the same form returned by
271 271 os.spawnv
272 272
273 273 returns None if the process was stopped instead of exiting'''
274 274 if os.WIFEXITED(code):
275 275 return os.WEXITSTATUS(code)
276 276 elif os.WIFSIGNALED(code):
277 277 return -(os.WTERMSIG(code))
278 278
279 279
280 280 def _windowsworker(ui, func, staticargs, args, hasretval):
281 281 class Worker(threading.Thread):
282 282 def __init__(
283 283 self, taskqueue, resultqueue, func, staticargs, *args, **kwargs
284 284 ):
285 285 threading.Thread.__init__(self, *args, **kwargs)
286 286 self._taskqueue = taskqueue
287 287 self._resultqueue = resultqueue
288 288 self._func = func
289 289 self._staticargs = staticargs
290 290 self._interrupted = False
291 291 self.daemon = True
292 292 self.exception = None
293 293
294 294 def interrupt(self):
295 295 self._interrupted = True
296 296
297 297 def run(self):
298 298 try:
299 299 while not self._taskqueue.empty():
300 300 try:
301 301 args = self._taskqueue.get_nowait()
302 302 for res in self._func(*self._staticargs + (args,)):
303 303 self._resultqueue.put(res)
304 304 # threading doesn't provide a native way to
305 305 # interrupt execution. handle it manually at every
306 306 # iteration.
307 307 if self._interrupted:
308 308 return
309 309 except pycompat.queue.Empty:
310 310 break
311 311 except Exception as e:
312 312 # store the exception such that the main thread can resurface
313 313 # it as if the func was running without workers.
314 314 self.exception = e
315 315 raise
316 316
317 317 threads = []
318 318
319 319 def trykillworkers():
320 320 # Allow up to 1 second to clean worker threads nicely
321 321 cleanupend = time.time() + 1
322 322 for t in threads:
323 323 t.interrupt()
324 324 for t in threads:
325 325 remainingtime = cleanupend - time.time()
326 326 t.join(remainingtime)
327 327 if t.is_alive():
328 328 # pass over the failure to join the workers. it is more
329 329 # important to surface the initial exception than the
330 330 # fact that one of the workers may be processing a large
331 331 # task and does not get to handle the interruption.
332 332 ui.warn(
333 333 _(
334 334 b"failed to kill worker threads while "
335 335 b"handling an exception\n"
336 336 )
337 337 )
338 338 return
339 339
340 340 workers = _numworkers(ui)
341 341 resultqueue = pycompat.queue.Queue()
342 342 taskqueue = pycompat.queue.Queue()
343 343 retval = {}
344 344 # partition work to more pieces than workers to minimize the chance
345 345 # of uneven distribution of large tasks between the workers
346 346 for pargs in partition(args, workers * 20):
347 347 taskqueue.put(pargs)
348 348 for _i in range(workers):
349 349 t = Worker(taskqueue, resultqueue, func, staticargs)
350 350 threads.append(t)
351 351 t.start()
352 352 try:
353 353 while len(threads) > 0:
354 354 while not resultqueue.empty():
355 355 res = resultqueue.get()
356 356 if hasretval and res[0]:
357 357 retval.update(res[1])
358 358 else:
359 359 yield res
360 360 threads[0].join(0.05)
361 361 finishedthreads = [_t for _t in threads if not _t.is_alive()]
362 362 for t in finishedthreads:
363 363 if t.exception is not None:
364 364 raise t.exception
365 365 threads.remove(t)
366 366 except (Exception, KeyboardInterrupt): # re-raises
367 367 trykillworkers()
368 368 raise
369 369 while not resultqueue.empty():
370 370 res = resultqueue.get()
371 371 if hasretval and res[0]:
372 372 retval.update(res[1])
373 373 else:
374 374 yield res
375 375 if hasretval:
376 376 yield True, retval
377 377
378 378
379 379 if pycompat.iswindows:
380 380 _platformworker = _windowsworker
381 381 else:
382 382 _platformworker = _posixworker
383 383 _exitstatus = _posixexitstatus
384 384
385 385
386 386 def partition(lst, nslices):
387 387 '''partition a list into N slices of roughly equal size
388 388
389 389 The current strategy takes every Nth element from the input. If
390 390 we ever write workers that need to preserve grouping in input
391 391 we should consider allowing callers to specify a partition strategy.
392 392
393 393 mpm is not a fan of this partitioning strategy when files are involved.
394 394 In his words:
395 395
396 396 Single-threaded Mercurial makes a point of creating and visiting
397 397 files in a fixed order (alphabetical). When creating files in order,
398 398 a typical filesystem is likely to allocate them on nearby regions on
399 399 disk. Thus, when revisiting in the same order, locality is maximized
400 400 and various forms of OS and disk-level caching and read-ahead get a
401 401 chance to work.
402 402
403 403 This effect can be quite significant on spinning disks. I discovered it
404 404 circa Mercurial v0.4 when revlogs were named by hashes of filenames.
405 405 Tarring a repo and copying it to another disk effectively randomized
406 406 the revlog ordering on disk by sorting the revlogs by hash and suddenly
407 407 performance of my kernel checkout benchmark dropped by ~10x because the
408 408 "working set" of sectors visited no longer fit in the drive's cache and
409 409 the workload switched from streaming to random I/O.
410 410
411 411 What we should really be doing is have workers read filenames from an
412 412 ordered queue. This preserves locality and also keeps any worker from
413 413 getting more than one file out of balance.
414 414 '''
415 415 for i in range(nslices):
416 416 yield lst[i::nslices]
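
As a quick illustration of the striding (an interpreter session, not part of the module):

    >>> def partition(lst, nslices):
    ...     for i in range(nslices):
    ...         yield lst[i::nslices]
    >>> list(partition(list(range(10)), 3))
    [[0, 3, 6, 9], [1, 4, 7], [2, 5, 8]]

Each slice ends up within one element of the same length, but consecutive items land in different slices, which is exactly the locality loss the quoted comment worries about.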