##// END OF EJS Templates
worker: do not swallow an exception that occurred in the main process...
Yuya Nishihara -
r41024:03f7d082 stable
parent child Browse files
Show More
@@ -1,369 +1,369
1 1 # worker.py - master-slave parallelism support
2 2 #
3 3 # Copyright 2013 Facebook, Inc.
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 from __future__ import absolute_import
9 9
10 10 import errno
11 11 import os
12 12 import signal
13 13 import sys
14 14 import threading
15 15 import time
16 16
17 17 try:
18 18 import selectors
19 19 selectors.BaseSelector
20 20 except ImportError:
21 21 from .thirdparty import selectors2 as selectors
22 22
23 23 from .i18n import _
24 24 from . import (
25 25 encoding,
26 26 error,
27 27 pycompat,
28 28 scmutil,
29 29 util,
30 30 )
31 31
def countcpus():
    '''try to count the number of CPUs on the system'''

    # posix: ask the OS for the number of online processors
    try:
        ncpus = int(os.sysconf(r'SC_NPROCESSORS_ONLN'))
    except (AttributeError, ValueError):
        pass
    else:
        if ncpus > 0:
            return ncpus

    # windows: fall back to the NUMBER_OF_PROCESSORS environment variable
    try:
        ncpus = int(encoding.environ['NUMBER_OF_PROCESSORS'])
    except (KeyError, ValueError):
        pass
    else:
        if ncpus > 0:
            return ncpus

    # conservative default when detection fails
    return 1
52 52
def _numworkers(ui):
    '''return how many workers to use

    Honors the 'worker.numcpus' config knob when set to a positive integer;
    otherwise uses the detected CPU count clamped to the range [4, 32].
    '''
    configured = ui.config('worker', 'numcpus')
    if configured:
        try:
            count = int(configured)
        except ValueError:
            raise error.Abort(_('number of cpus must be an integer'))
        if count >= 1:
            return count
    return min(max(countcpus(), 4), 32)
63 63
# Tuning knobs for the cost model in worthwhile() below.  _STARTUP_COST is
# the estimated per-worker startup cost; the huge value on platforms with no
# worker implementation effectively disables parallelism there.
64 64 if pycompat.isposix or pycompat.iswindows:
65 65 _STARTUP_COST = 0.01
66 66 # The Windows worker is thread based. If tasks are CPU bound, threads
67 67 # in the presence of the GIL result in excessive context switching and
68 68 # this overhead can slow down execution.
69 69 _DISALLOW_THREAD_UNSAFE = pycompat.iswindows
70 70 else:
71 71 _STARTUP_COST = 1e30
72 72 _DISALLOW_THREAD_UNSAFE = False
73 73
def worthwhile(ui, costperop, nops, threadsafe=True):
    '''try to determine whether the benefit of multiple processes can
    outweigh the cost of starting them'''

    # thread-unsafe work cannot run on the thread-based (Windows) worker
    if not threadsafe and _DISALLOW_THREAD_UNSAFE:
        return False

    # compare the serial cost against the parallel cost plus startup overhead
    serialcost = costperop * nops
    nworkers = _numworkers(ui)
    parallelcost = _STARTUP_COST * nworkers + serialcost / nworkers
    return serialcost - parallelcost >= 0.15
85 85
def worker(ui, costperarg, func, staticargs, args, threadsafe=True):
    '''run a function, possibly in parallel in multiple worker
    processes.

    returns a progress iterator

    costperarg - cost of a single task

    func - function to run

    staticargs - arguments to pass to every invocation of the function

    args - arguments to split into chunks, to pass to individual
    workers

    threadsafe - whether work items are thread safe and can be executed using
    a thread-based worker. Should be disabled for CPU heavy tasks that don't
    release the GIL.
    '''
    # fall back to a plain serial call unless workers are enabled by config
    # AND the cost model says parallelism pays off
    usable = ui.configbool('worker', 'enabled')
    if usable and worthwhile(ui, costperarg, len(args),
                             threadsafe=threadsafe):
        return _platformworker(ui, func, staticargs, args)
    return func(*staticargs + (args,))
109 109
# NOTE(review): this is a scraped diff view -- every line keeps its old/new
# line-number prefix, and both the removed and the added versions of the
# exit-status hunk appear below.
110 110 def _posixworker(ui, func, staticargs, args):
111 111 workers = _numworkers(ui)
# remember the SIGINT handler so it can be restored later; the parent
# ignores SIGINT while children are running
112 112 oldhandler = signal.getsignal(signal.SIGINT)
113 113 signal.signal(signal.SIGINT, signal.SIG_IGN)
# problem[0] holds the first nonzero child exit status; a one-element list
# so the nested closures can mutate it
114 114 pids, problem = set(), [0]
115 115 def killworkers():
116 116 # unregister SIGCHLD handler as all children will be killed. This
117 117 # function shouldn't be interrupted by another SIGCHLD; otherwise pids
118 118 # could be updated while iterating, which would cause inconsistency.
119 119 signal.signal(signal.SIGCHLD, oldchldhandler)
120 120 # if one worker bails, there's no good reason to wait for the rest
121 121 for p in pids:
122 122 try:
123 123 os.kill(p, signal.SIGTERM)
124 124 except OSError as err:
125 125 if err.errno != errno.ESRCH:
126 126 raise
# reap finished children and record the first failure into problem[0]
127 127 def waitforworkers(blocking=True):
128 128 for pid in pids.copy():
129 129 p = st = 0
130 130 while True:
131 131 try:
132 132 p, st = os.waitpid(pid, (0 if blocking else os.WNOHANG))
133 133 break
134 134 except OSError as e:
135 135 if e.errno == errno.EINTR:
136 136 continue
137 137 elif e.errno == errno.ECHILD:
138 138 # child has already been reaped, but pids hasn't been
139 139 # updated yet (maybe interrupted just after waitpid)
140 140 pids.discard(pid)
141 141 break
142 142 else:
143 143 raise
144 144 if not p:
145 145 # skip subsequent steps, because child process should
146 146 # be still running in this case
147 147 continue
148 148 pids.discard(p)
149 149 st = _exitstatus(st)
150 150 if st and not problem[0]:
151 151 problem[0] = st
152 152 def sigchldhandler(signum, frame):
153 153 waitforworkers(blocking=False)
154 154 if problem[0]:
155 155 killworkers()
156 156 oldchldhandler = signal.signal(signal.SIGCHLD, sigchldhandler)
157 157 ui.flush()
158 158 parentpid = os.getpid()
159 159 pipes = []
160 160 for pargs in partition(args, workers):
161 161 # Every worker gets its own pipe to send results on, so we don't have to
162 162 # implement atomic writes larger than PIPE_BUF. Each forked process has
163 163 # its own pipe's descriptors in the local variables, and the parent
164 164 # process has the full list of pipe descriptors (and it doesn't really
165 165 # care what order they're in).
166 166 rfd, wfd = os.pipe()
167 167 pipes.append((rfd, wfd))
168 168 # make sure we use os._exit in all worker code paths. otherwise the
169 169 # worker may do some clean-ups which could cause surprises like
170 170 # deadlock. see sshpeer.cleanup for example.
171 171 # override error handling *before* fork. this is necessary because
172 172 # exception (signal) may arrive after fork, before "pid =" assignment
173 173 # completes, and other exception handler (dispatch.py) can lead to
174 174 # unexpected code path without os._exit.
175 175 ret = -1
176 176 try:
177 177 pid = os.fork()
178 178 if pid == 0:
179 179 signal.signal(signal.SIGINT, oldhandler)
180 180 signal.signal(signal.SIGCHLD, oldchldhandler)
181 181
182 182 def workerfunc():
# close all pipe fds that belong to other workers; this child only
# writes to its own wfd
183 183 for r, w in pipes[:-1]:
184 184 os.close(r)
185 185 os.close(w)
186 186 os.close(rfd)
187 187 for result in func(*(staticargs + (pargs,))):
188 188 os.write(wfd, util.pickle.dumps(result))
189 189 return 0
190 190
191 191 ret = scmutil.callcatch(ui, workerfunc)
192 192 except: # parent re-raises, child never returns
193 193 if os.getpid() == parentpid:
194 194 raise
195 195 exctype = sys.exc_info()[0]
196 196 force = not issubclass(exctype, KeyboardInterrupt)
197 197 ui.traceback(force=force)
198 198 finally:
199 199 if os.getpid() != parentpid:
200 200 try:
201 201 ui.flush()
202 202 except: # never returns, no re-raises
203 203 pass
204 204 finally:
205 205 os._exit(ret & 255)
206 206 pids.add(pid)
207 207 selector = selectors.DefaultSelector()
# parent: close the write ends and watch every read end for results
208 208 for rfd, wfd in pipes:
209 209 os.close(wfd)
210 210 selector.register(os.fdopen(rfd, r'rb', 0), selectors.EVENT_READ)
211 211 def cleanup():
212 212 signal.signal(signal.SIGINT, oldhandler)
213 213 waitforworkers()
214 214 signal.signal(signal.SIGCHLD, oldchldhandler)
215 215 selector.close()
# OLD (removed) code: cleanup() itself checked problem[0] and called
# sys.exit(), which could mask an exception propagating out of the main
# process (the very bug this changeset fixes).
216 status = problem[0]
217 if status:
218 if status < 0:
219 os.kill(os.getpid(), -status)
220 sys.exit(status)
# NEW (added) code: cleanup() only reports the status; the caller decides
# what to do with it after the generator loop below.
216 return problem[0]
221 217 try:
222 218 openpipes = len(pipes)
223 219 while openpipes > 0:
224 220 for key, events in selector.select():
225 221 try:
226 222 yield util.pickle.load(key.fileobj)
227 223 except EOFError:
228 224 selector.unregister(key.fileobj)
229 225 key.fileobj.close()
230 226 openpipes -= 1
231 227 except IOError as e:
232 228 if e.errno == errno.EINTR:
233 229 continue
234 230 raise
235 231 except: # re-raises
236 232 killworkers()
237 233 cleanup()
238 234 raise
# OLD (removed): plain cleanup() call with no status handling here.
239 cleanup()
# NEW (added): act on the first failing child only on the normal exit path,
# re-raising its signal or exiting with its status, AFTER cleanup.
235 status = cleanup()
236 if status:
237 if status < 0:
238 os.kill(os.getpid(), -status)
239 sys.exit(status)
240 240
241 241 def _posixexitstatus(code):
242 242 '''convert a posix exit status into the same form returned by
243 243 os.spawnv
244 244
245 245 returns None if the process was stopped instead of exiting'''
246 246 if os.WIFEXITED(code):
247 247 return os.WEXITSTATUS(code)
248 248 elif os.WIFSIGNALED(code):
249 249 return -os.WTERMSIG(code)
250 250
# Thread-based worker used on Windows: tasks are queued, N daemon threads
# drain the queue, and results are yielded from a shared result queue.
251 251 def _windowsworker(ui, func, staticargs, args):
252 252 class Worker(threading.Thread):
253 253 def __init__(self, taskqueue, resultqueue, func, staticargs,
# NOTE(review): the verbose= keyword exists only on Python 2's
# threading.Thread; confirm before running this under Python 3
254 254 group=None, target=None, name=None, verbose=None):
255 255 threading.Thread.__init__(self, group=group, target=target,
256 256 name=name, verbose=verbose)
257 257 self._taskqueue = taskqueue
258 258 self._resultqueue = resultqueue
259 259 self._func = func
260 260 self._staticargs = staticargs
261 261 self._interrupted = False
262 262 self.daemon = True
263 263 self.exception = None
264 264
# cooperative interruption flag, polled in run() after each result
265 265 def interrupt(self):
266 266 self._interrupted = True
267 267
268 268 def run(self):
269 269 try:
270 270 while not self._taskqueue.empty():
271 271 try:
272 272 args = self._taskqueue.get_nowait()
273 273 for res in self._func(*self._staticargs + (args,)):
274 274 self._resultqueue.put(res)
275 275 # threading doesn't provide a native way to
276 276 # interrupt execution. handle it manually at every
277 277 # iteration.
278 278 if self._interrupted:
279 279 return
280 280 except pycompat.queue.Empty:
281 281 break
282 282 except Exception as e:
283 283 # store the exception such that the main thread can resurface
284 284 # it as if the func was running without workers.
285 285 self.exception = e
286 286 raise
287 287
288 288 threads = []
289 289 def trykillworkers():
290 290 # Allow up to 1 second to clean worker threads nicely
291 291 cleanupend = time.time() + 1
292 292 for t in threads:
293 293 t.interrupt()
294 294 for t in threads:
295 295 remainingtime = cleanupend - time.time()
296 296 t.join(remainingtime)
297 297 if t.is_alive():
298 298 # pass over the workers joining failure. it is more
299 299 # important to surface the initial exception than the
300 300 # fact that one of the workers may be processing a large
301 301 # task and does not get to handle the interruption.
302 302 ui.warn(_("failed to kill worker threads while "
303 303 "handling an exception\n"))
304 304 return
305 305
306 306 workers = _numworkers(ui)
307 307 resultqueue = pycompat.queue.Queue()
308 308 taskqueue = pycompat.queue.Queue()
309 309 # partition work to more pieces than workers to minimize the chance
310 310 # of uneven distribution of large tasks between the workers
311 311 for pargs in partition(args, workers * 20):
312 312 taskqueue.put(pargs)
313 313 for _i in range(workers):
314 314 t = Worker(taskqueue, resultqueue, func, staticargs)
315 315 threads.append(t)
316 316 t.start()
317 317 try:
# main loop: drain available results, then poll thread liveness; a dead
# thread with a stored exception re-raises it in the main thread
318 318 while len(threads) > 0:
319 319 while not resultqueue.empty():
320 320 yield resultqueue.get()
321 321 threads[0].join(0.05)
322 322 finishedthreads = [_t for _t in threads if not _t.is_alive()]
323 323 for t in finishedthreads:
324 324 if t.exception is not None:
325 325 raise t.exception
326 326 threads.remove(t)
327 327 except (Exception, KeyboardInterrupt): # re-raises
328 328 trykillworkers()
329 329 raise
# all threads done: flush any results still in the queue
330 330 while not resultqueue.empty():
331 331 yield resultqueue.get()
332 332
# Select the platform-appropriate implementation; _exitstatus is only
# needed by the posix (fork-based) worker.
333 333 if pycompat.iswindows:
334 334 _platformworker = _windowsworker
335 335 else:
336 336 _platformworker = _posixworker
337 337 _exitstatus = _posixexitstatus
338 338
def partition(lst, nslices):
    '''partition a list into N slices of roughly equal size

    The current strategy takes every Nth element from the input. If
    we ever write workers that need to preserve grouping in input
    we should consider allowing callers to specify a partition strategy.

    mpm is not a fan of this partitioning strategy when files are involved.
    In his words:

        Single-threaded Mercurial makes a point of creating and visiting
        files in a fixed order (alphabetical). When creating files in order,
        a typical filesystem is likely to allocate them on nearby regions on
        disk. Thus, when revisiting in the same order, locality is maximized
        and various forms of OS and disk-level caching and read-ahead get a
        chance to work.

        This effect can be quite significant on spinning disks. I discovered it
        circa Mercurial v0.4 when revlogs were named by hashes of filenames.
        Tarring a repo and copying it to another disk effectively randomized
        the revlog ordering on disk by sorting the revlogs by hash and suddenly
        performance of my kernel checkout benchmark dropped by ~10x because the
        "working set" of sectors visited no longer fit in the drive's cache and
        the workload switched from streaming to random I/O.

        What we should really be doing is have workers read filenames from a
        ordered queue. This preserves locality and also keeps any worker from
        getting more than one file out of balance.
    '''
    # slice i holds elements i, i+nslices, i+2*nslices, ...
    return (lst[offset::nslices] for offset in range(nslices))
General Comments 0
You need to be logged in to leave comments. Login now