worker: support parallelization of functions with return values...
Valentin Gatien-Baron
r42655:5ca136bb default
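
To make the new interface concrete, here is a minimal sketch of how a caller
could drive worker() with hasretval=True, inferred from the docstring and
dispatch code in this diff. The names sizechunk and allsizes and the cost
value are illustrative assumptions, not part of the change:

    # Hypothetical caller of the new hasretval mode (not part of this commit).
    from mercurial import worker

    def sizechunk(chunk):
        # Yield progress items tagged False, then this chunk's partial
        # return value tagged True, as the hasretval protocol expects.
        sizes = []
        for item in chunk:
            yield False, item       # progress report for one work item
            sizes.append(len(item))
        yield True, sizes           # partial result for this chunk

    def allsizes(ui, items):
        retval = None
        prog = worker.worker(ui, 0.001, sizechunk, (), items, hasretval=True)
        for done, value in prog:
            if done:
                retval = value      # combined list, in chunk order
            # else: value is a progress item from some worker
        return retval

The same protocol holds on the serial fallback path: worker() simply calls
func once with all the args, so the final (True, list) comes straight from
the single invocation.
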
@@ -1,368 +1,393 @@
1 1 # worker.py - master-slave parallelism support
2 2 #
3 3 # Copyright 2013 Facebook, Inc.
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 from __future__ import absolute_import
9 9
10 10 import errno
11 11 import os
12 12 import signal
13 13 import sys
14 14 import threading
15 15 import time
16 16
17 17 try:
18 18 import selectors
19 19 selectors.BaseSelector
20 20 except ImportError:
21 21 from .thirdparty import selectors2 as selectors
22 22
23 23 from .i18n import _
24 24 from . import (
25 25 encoding,
26 26 error,
27 27 pycompat,
28 28 scmutil,
29 29 util,
30 30 )
31 31
32 32 def countcpus():
33 33 '''try to count the number of CPUs on the system'''
34 34
35 35 # posix
36 36 try:
37 37 n = int(os.sysconf(r'SC_NPROCESSORS_ONLN'))
38 38 if n > 0:
39 39 return n
40 40 except (AttributeError, ValueError):
41 41 pass
42 42
43 43 # windows
44 44 try:
45 45 n = int(encoding.environ['NUMBER_OF_PROCESSORS'])
46 46 if n > 0:
47 47 return n
48 48 except (KeyError, ValueError):
49 49 pass
50 50
51 51 return 1
52 52
53 53 def _numworkers(ui):
54 54 s = ui.config('worker', 'numcpus')
55 55 if s:
56 56 try:
57 57 n = int(s)
58 58 if n >= 1:
59 59 return n
60 60 except ValueError:
61 61 raise error.Abort(_('number of cpus must be an integer'))
62 62 return min(max(countcpus(), 4), 32)
63 63
64 64 if pycompat.isposix or pycompat.iswindows:
65 65 _STARTUP_COST = 0.01
66 66 # The Windows worker is thread based. If tasks are CPU bound, threads
67 67 # in the presence of the GIL result in excessive context switching and
68 68 # this overhead can slow down execution.
69 69 _DISALLOW_THREAD_UNSAFE = pycompat.iswindows
70 70 else:
71 71 _STARTUP_COST = 1e30
72 72 _DISALLOW_THREAD_UNSAFE = False
73 73
74 74 def worthwhile(ui, costperop, nops, threadsafe=True):
75 75 '''try to determine whether the benefit of multiple processes can
76 76 outweigh the cost of starting them'''
77 77
78 78 if not threadsafe and _DISALLOW_THREAD_UNSAFE:
79 79 return False
80 80
81 81 linear = costperop * nops
82 82 workers = _numworkers(ui)
83 83 benefit = linear - (_STARTUP_COST * workers + linear / workers)
84 84 return benefit >= 0.15
85 85
86 def worker(ui, costperarg, func, staticargs, args, threadsafe=True):
86 def worker(ui, costperarg, func, staticargs, args, hasretval=False,
87 threadsafe=True):
87 88 '''run a function, possibly in parallel in multiple worker
88 89 processes.
89 90
90 91 returns a progress iterator
91 92
92 93 costperarg - cost of a single task
93 94
94 func - function to run
95 func - function to run. It is expected to return a progress iterator.
95 96
96 97 staticargs - arguments to pass to every invocation of the function
97 98
98 99 args - arguments to split into chunks, to pass to individual
99 100 workers
100 101
102 hasretval - when True, func and the current function return a progress
103 iterator followed by a list (encoded as an iterator that yields many
104 (False, ..) items and then one (True, list)). The list is in natural order.
105
101 106 threadsafe - whether work items are thread safe and can be executed using
102 107 a thread-based worker. Should be disabled for CPU heavy tasks that don't
103 108 release the GIL.
104 109 '''
105 110 enabled = ui.configbool('worker', 'enabled')
106 111 if enabled and worthwhile(ui, costperarg, len(args), threadsafe=threadsafe):
107 return _platformworker(ui, func, staticargs, args)
112 return _platformworker(ui, func, staticargs, args, hasretval)
108 113 return func(*staticargs + (args,))
109 114
110 def _posixworker(ui, func, staticargs, args):
115 def _posixworker(ui, func, staticargs, args, hasretval):
111 116 workers = _numworkers(ui)
112 117 oldhandler = signal.getsignal(signal.SIGINT)
113 118 signal.signal(signal.SIGINT, signal.SIG_IGN)
114 119 pids, problem = set(), [0]
115 120 def killworkers():
116 121 # unregister SIGCHLD handler as all children will be killed. This
117 122 # function shouldn't be interrupted by another SIGCHLD; otherwise pids
118 123 # could be updated while iterating, which would cause inconsistency.
119 124 signal.signal(signal.SIGCHLD, oldchldhandler)
120 125 # if one worker bails, there's no good reason to wait for the rest
121 126 for p in pids:
122 127 try:
123 128 os.kill(p, signal.SIGTERM)
124 129 except OSError as err:
125 130 if err.errno != errno.ESRCH:
126 131 raise
127 132 def waitforworkers(blocking=True):
128 133 for pid in pids.copy():
129 134 p = st = 0
130 135 while True:
131 136 try:
132 137 p, st = os.waitpid(pid, (0 if blocking else os.WNOHANG))
133 138 break
134 139 except OSError as e:
135 140 if e.errno == errno.EINTR:
136 141 continue
137 142 elif e.errno == errno.ECHILD:
138 143 # child would already be reaped, but pids not yet
139 144 # been updated (maybe interrupted just after waitpid)
140 145 pids.discard(pid)
141 146 break
142 147 else:
143 148 raise
144 149 if not p:
145 150 # skip subsequent steps, because child process should
146 151 # be still running in this case
147 152 continue
148 153 pids.discard(p)
149 154 st = _exitstatus(st)
150 155 if st and not problem[0]:
151 156 problem[0] = st
152 157 def sigchldhandler(signum, frame):
153 158 waitforworkers(blocking=False)
154 159 if problem[0]:
155 160 killworkers()
156 161 oldchldhandler = signal.signal(signal.SIGCHLD, sigchldhandler)
157 162 ui.flush()
158 163 parentpid = os.getpid()
159 164 pipes = []
160 for pargs in partition(args, workers):
165 retvals = []
166 for i, pargs in enumerate(partition(args, workers)):
161 167 # Every worker gets its own pipe to send results on, so we don't have to
162 168 # implement atomic writes larger than PIPE_BUF. Each forked process has
163 169 # its own pipe's descriptors in the local variables, and the parent
164 170 # process has the full list of pipe descriptors (and it doesn't really
165 171 # care what order they're in).
166 172 rfd, wfd = os.pipe()
167 173 pipes.append((rfd, wfd))
174 retvals.append(None)
168 175 # make sure we use os._exit in all worker code paths. otherwise the
169 176 # worker may do some clean-ups which could cause surprises like
170 177 # deadlock. see sshpeer.cleanup for example.
171 178 # override error handling *before* fork. this is necessary because
172 179 # exception (signal) may arrive after fork, before "pid =" assignment
173 180 # completes, and other exception handler (dispatch.py) can lead to
174 181 # unexpected code path without os._exit.
175 182 ret = -1
176 183 try:
177 184 pid = os.fork()
178 185 if pid == 0:
179 186 signal.signal(signal.SIGINT, oldhandler)
180 187 signal.signal(signal.SIGCHLD, oldchldhandler)
181 188
182 189 def workerfunc():
183 190 for r, w in pipes[:-1]:
184 191 os.close(r)
185 192 os.close(w)
186 193 os.close(rfd)
187 194 for result in func(*(staticargs + (pargs,))):
188 os.write(wfd, util.pickle.dumps(result))
195 os.write(wfd, util.pickle.dumps((i, result)))
189 196 return 0
190 197
191 198 ret = scmutil.callcatch(ui, workerfunc)
192 199 except: # parent re-raises, child never returns
193 200 if os.getpid() == parentpid:
194 201 raise
195 202 exctype = sys.exc_info()[0]
196 203 force = not issubclass(exctype, KeyboardInterrupt)
197 204 ui.traceback(force=force)
198 205 finally:
199 206 if os.getpid() != parentpid:
200 207 try:
201 208 ui.flush()
202 209 except: # never returns, no re-raises
203 210 pass
204 211 finally:
205 212 os._exit(ret & 255)
206 213 pids.add(pid)
207 214 selector = selectors.DefaultSelector()
208 215 for rfd, wfd in pipes:
209 216 os.close(wfd)
210 217 selector.register(os.fdopen(rfd, r'rb', 0), selectors.EVENT_READ)
211 218 def cleanup():
212 219 signal.signal(signal.SIGINT, oldhandler)
213 220 waitforworkers()
214 221 signal.signal(signal.SIGCHLD, oldchldhandler)
215 222 selector.close()
216 223 return problem[0]
217 224 try:
218 225 openpipes = len(pipes)
219 226 while openpipes > 0:
220 227 for key, events in selector.select():
221 228 try:
222 yield util.pickle.load(key.fileobj)
229 i, res = util.pickle.load(key.fileobj)
230 if hasretval and res[0]:
231 retvals[i] = res[1]
232 else:
233 yield res
223 234 except EOFError:
224 235 selector.unregister(key.fileobj)
225 236 key.fileobj.close()
226 237 openpipes -= 1
227 238 except IOError as e:
228 239 if e.errno == errno.EINTR:
229 240 continue
230 241 raise
231 242 except: # re-raises
232 243 killworkers()
233 244 cleanup()
234 245 raise
235 246 status = cleanup()
236 247 if status:
237 248 if status < 0:
238 249 os.kill(os.getpid(), -status)
239 250 sys.exit(status)
251 if hasretval:
252 yield True, sum(retvals, [])
240 253
241 254 def _posixexitstatus(code):
242 255 '''convert a posix exit status into the same form returned by
243 256 os.spawnv
244 257
245 258 returns None if the process was stopped instead of exiting'''
246 259 if os.WIFEXITED(code):
247 260 return os.WEXITSTATUS(code)
248 261 elif os.WIFSIGNALED(code):
249 262 return -os.WTERMSIG(code)
250 263
251 def _windowsworker(ui, func, staticargs, args):
264 def _windowsworker(ui, func, staticargs, args, hasretval):
252 265 class Worker(threading.Thread):
253 266 def __init__(self, taskqueue, resultqueue, func, staticargs, *args,
254 267 **kwargs):
255 268 threading.Thread.__init__(self, *args, **kwargs)
256 269 self._taskqueue = taskqueue
257 270 self._resultqueue = resultqueue
258 271 self._func = func
259 272 self._staticargs = staticargs
260 273 self._interrupted = False
261 274 self.daemon = True
262 275 self.exception = None
263 276
264 277 def interrupt(self):
265 278 self._interrupted = True
266 279
267 280 def run(self):
268 281 try:
269 282 while not self._taskqueue.empty():
270 283 try:
271 args = self._taskqueue.get_nowait()
284 i, args = self._taskqueue.get_nowait()
272 285 for res in self._func(*self._staticargs + (args,)):
273 self._resultqueue.put(res)
286 self._resultqueue.put((i, res))
274 287 # threading doesn't provide a native way to
275 288 # interrupt execution. handle it manually at every
276 289 # iteration.
277 290 if self._interrupted:
278 291 return
279 292 except pycompat.queue.Empty:
280 293 break
281 294 except Exception as e:
282 295 # store the exception such that the main thread can resurface
283 296 # it as if the func was running without workers.
284 297 self.exception = e
285 298 raise
286 299
287 300 threads = []
288 301 def trykillworkers():
289 302 # Allow up to 1 second to clean worker threads nicely
290 303 cleanupend = time.time() + 1
291 304 for t in threads:
292 305 t.interrupt()
293 306 for t in threads:
294 307 remainingtime = cleanupend - time.time()
295 308 t.join(remainingtime)
296 309 if t.is_alive():
297 310 # pass over the workers' joining failure. it is more
298 311 # important to surface the initial exception than the
299 312 # fact that one of the workers may be processing a large
300 313 # task and does not get to handle the interruption.
301 314 ui.warn(_("failed to kill worker threads while "
302 315 "handling an exception\n"))
303 316 return
304 317
305 318 workers = _numworkers(ui)
306 319 resultqueue = pycompat.queue.Queue()
307 320 taskqueue = pycompat.queue.Queue()
321 retvals = []
308 322 # partition work to more pieces than workers to minimize the chance
309 323 # of uneven distribution of large tasks between the workers
310 for pargs in partition(args, workers * 20):
324 for pargs in enumerate(partition(args, workers * 20)):
325 retvals.append(None)
311 326 taskqueue.put(pargs)
312 327 for _i in range(workers):
313 328 t = Worker(taskqueue, resultqueue, func, staticargs)
314 329 threads.append(t)
315 330 t.start()
316 331 try:
317 332 while len(threads) > 0:
318 333 while not resultqueue.empty():
319 yield resultqueue.get()
334 (i, res) = resultqueue.get()
335 if hasretval and res[0]:
336 retvals[i] = res[1]
337 else:
338 yield res
320 339 threads[0].join(0.05)
321 340 finishedthreads = [_t for _t in threads if not _t.is_alive()]
322 341 for t in finishedthreads:
323 342 if t.exception is not None:
324 343 raise t.exception
325 344 threads.remove(t)
326 345 except (Exception, KeyboardInterrupt): # re-raises
327 346 trykillworkers()
328 347 raise
329 348 while not resultqueue.empty():
330 yield resultqueue.get()
349 (i, res) = resultqueue.get()
350 if hasretval and res[0]:
351 retvals[i] = res[1]
352 else:
353 yield res
354 if hasretval:
355 yield True, sum(retvals, [])
331 356
332 357 if pycompat.iswindows:
333 358 _platformworker = _windowsworker
334 359 else:
335 360 _platformworker = _posixworker
336 361 _exitstatus = _posixexitstatus
337 362
338 363 def partition(lst, nslices):
339 364 '''partition a list into N slices of roughly equal size
340 365
341 366 The current strategy takes every Nth element from the input. If
342 367 we ever write workers that need to preserve grouping in input
343 368 we should consider allowing callers to specify a partition strategy.
344 369
345 370 mpm is not a fan of this partitioning strategy when files are involved.
346 371 In his words:
347 372
348 373 Single-threaded Mercurial makes a point of creating and visiting
349 374 files in a fixed order (alphabetical). When creating files in order,
350 375 a typical filesystem is likely to allocate them on nearby regions on
351 376 disk. Thus, when revisiting in the same order, locality is maximized
352 377 and various forms of OS and disk-level caching and read-ahead get a
353 378 chance to work.
354 379
355 380 This effect can be quite significant on spinning disks. I discovered it
356 381 circa Mercurial v0.4 when revlogs were named by hashes of filenames.
357 382 Tarring a repo and copying it to another disk effectively randomized
358 383 the revlog ordering on disk by sorting the revlogs by hash and suddenly
359 384 performance of my kernel checkout benchmark dropped by ~10x because the
360 385 "working set" of sectors visited no longer fit in the drive's cache and
361 386 the workload switched from streaming to random I/O.
362 387
363 388 What we should really be doing is to have workers read filenames from
364 389 an ordered queue. This preserves locality and also keeps any worker from
365 390 getting more than one file out of balance.
366 391 '''
367 392 for i in range(nslices):
368 393 yield lst[i::nslices]
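
For reference, the every-Nth-element strategy described above interleaves the
input rather than cutting it into contiguous runs. An assumed interactive
session with the partition() defined in this file:

    >>> list(partition([1, 2, 3, 4, 5, 6, 7], 3))
    [[1, 4, 7], [2, 5], [3, 6]]

Chunk i holds elements i, i+nslices, i+2*nslices, and so on, which is why the
hasretval paths above reassemble results by chunk index (retvals[i]) instead
of relying on the order in which workers happen to finish.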