worker: do not swallow exception occurred in main process...
Yuya Nishihara
r41024:03f7d082 stable
@@ -1,369 +1,369 @@
# worker.py - master-slave parallelism support
#
# Copyright 2013 Facebook, Inc.
#
# This software may be used and distributed according to the terms of the
# GNU General Public License version 2 or any later version.

from __future__ import absolute_import

import errno
import os
import signal
import sys
import threading
import time

try:
    import selectors
    selectors.BaseSelector
except ImportError:
    from .thirdparty import selectors2 as selectors

from .i18n import _
from . import (
    encoding,
    error,
    pycompat,
    scmutil,
    util,
)

def countcpus():
    '''try to count the number of CPUs on the system'''

    # posix
    try:
        n = int(os.sysconf(r'SC_NPROCESSORS_ONLN'))
        if n > 0:
            return n
    except (AttributeError, ValueError):
        pass

    # windows
    try:
        n = int(encoding.environ['NUMBER_OF_PROCESSORS'])
        if n > 0:
            return n
    except (KeyError, ValueError):
        pass

    return 1

def _numworkers(ui):
    s = ui.config('worker', 'numcpus')
    if s:
        try:
            n = int(s)
            if n >= 1:
                return n
        except ValueError:
            raise error.Abort(_('number of cpus must be an integer'))
    return min(max(countcpus(), 4), 32)

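# Editor's illustration (not part of the original file): the fallback on the
# last line above clamps the autodetected CPU count into [4, 32]:
#
#   min(max(2, 4), 32)   == 4    # a 2-CPU box still gets 4 workers
#   min(max(16, 4), 32)  == 16   # typical case: one worker per CPU
#   min(max(64, 4), 32)  == 32   # very large boxes are capped at 32
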
if pycompat.isposix or pycompat.iswindows:
    _STARTUP_COST = 0.01
    # The Windows worker is thread based. If tasks are CPU bound, threads
    # in the presence of the GIL result in excessive context switching and
    # this overhead can slow down execution.
    _DISALLOW_THREAD_UNSAFE = pycompat.iswindows
else:
    _STARTUP_COST = 1e30
    _DISALLOW_THREAD_UNSAFE = False

def worthwhile(ui, costperop, nops, threadsafe=True):
    '''try to determine whether the benefit of multiple processes can
    outweigh the cost of starting them'''

    if not threadsafe and _DISALLOW_THREAD_UNSAFE:
        return False

    linear = costperop * nops
    workers = _numworkers(ui)
    benefit = linear - (_STARTUP_COST * workers + linear / workers)
    return benefit >= 0.15

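# Editor's illustration (not part of the original file): plugging sample
# numbers into the benefit formula above, with costperop=0.01, nops=1000,
# workers=4 and the posix/windows _STARTUP_COST of 0.01:
#
#   linear  = 0.01 * 1000                    # 10.0
#   benefit = 10.0 - (0.01 * 4 + 10.0 / 4)   # 7.46 >= 0.15 -> parallelize
#
# On other platforms _STARTUP_COST is 1e30, so benefit is always far below
# the 0.15 threshold and worthwhile() returns False.
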
def worker(ui, costperarg, func, staticargs, args, threadsafe=True):
    '''run a function, possibly in parallel in multiple worker
    processes.

    returns a progress iterator

    costperarg - cost of a single task

    func - function to run

    staticargs - arguments to pass to every invocation of the function

    args - arguments to split into chunks, to pass to individual
    workers

    threadsafe - whether work items are thread safe and can be executed using
    a thread-based worker. Should be disabled for CPU heavy tasks that don't
    release the GIL.
    '''
    enabled = ui.configbool('worker', 'enabled')
    if enabled and worthwhile(ui, costperarg, len(args), threadsafe=threadsafe):
        return _platformworker(ui, func, staticargs, args)
    return func(*staticargs + (args,))

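# Editor's sketch of a caller (not part of the original file; 'process' and
# 'files' are hypothetical stand-ins):
#
#   def apply(dryrun, items):
#       # runs once per chunk, possibly in a forked child; yielded values
#       # are pickled back to the parent on posix
#       for i, item in enumerate(items):
#           yield i, process(item, dryrun=dryrun)
#
#   # staticargs is prepended to every call; args is split into chunks
#   for i, res in worker(ui, 0.001, apply, (True,), files):
#       pass  # consume the progress iterator as results arrive
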
def _posixworker(ui, func, staticargs, args):
    workers = _numworkers(ui)
    oldhandler = signal.getsignal(signal.SIGINT)
    signal.signal(signal.SIGINT, signal.SIG_IGN)
    pids, problem = set(), [0]
    def killworkers():
        # unregister SIGCHLD handler as all children will be killed. This
        # function shouldn't be interrupted by another SIGCHLD; otherwise pids
        # could be updated while iterating, which would cause inconsistency.
        signal.signal(signal.SIGCHLD, oldchldhandler)
        # if one worker bails, there's no good reason to wait for the rest
        for p in pids:
            try:
                os.kill(p, signal.SIGTERM)
            except OSError as err:
                if err.errno != errno.ESRCH:
                    raise
    def waitforworkers(blocking=True):
        for pid in pids.copy():
            p = st = 0
            while True:
                try:
                    p, st = os.waitpid(pid, (0 if blocking else os.WNOHANG))
                    break
                except OSError as e:
                    if e.errno == errno.EINTR:
                        continue
                    elif e.errno == errno.ECHILD:
                        # child would already be reaped, but pids not yet
                        # updated (maybe interrupted just after waitpid)
                        pids.discard(pid)
                        break
                    else:
                        raise
            if not p:
                # skip subsequent steps, because the child process should
                # still be running in this case
                continue
            pids.discard(p)
            st = _exitstatus(st)
            if st and not problem[0]:
                problem[0] = st
    def sigchldhandler(signum, frame):
        waitforworkers(blocking=False)
        if problem[0]:
            killworkers()
    oldchldhandler = signal.signal(signal.SIGCHLD, sigchldhandler)
    ui.flush()
    parentpid = os.getpid()
    pipes = []
    for pargs in partition(args, workers):
        # Every worker gets its own pipe to send results on, so we don't
        # have to implement atomic writes larger than PIPE_BUF. Each forked
        # process has its own pipe's descriptors in the local variables, and
        # the parent process has the full list of pipe descriptors (and it
        # doesn't really care what order they're in).
        rfd, wfd = os.pipe()
        pipes.append((rfd, wfd))
        # make sure we use os._exit in all worker code paths. otherwise the
        # worker may do some clean-ups which could cause surprises like
        # deadlock. see sshpeer.cleanup for example.
        # override error handling *before* fork. this is necessary because
        # an exception (signal) may arrive after fork, before the "pid ="
        # assignment completes, and another exception handler (dispatch.py)
        # can lead to an unexpected code path without os._exit.
        ret = -1
        try:
            pid = os.fork()
            if pid == 0:
                signal.signal(signal.SIGINT, oldhandler)
                signal.signal(signal.SIGCHLD, oldchldhandler)

                def workerfunc():
                    for r, w in pipes[:-1]:
                        os.close(r)
                        os.close(w)
                    os.close(rfd)
                    for result in func(*(staticargs + (pargs,))):
                        os.write(wfd, util.pickle.dumps(result))
                    return 0

                ret = scmutil.callcatch(ui, workerfunc)
        except: # parent re-raises, child never returns
            if os.getpid() == parentpid:
                raise
            exctype = sys.exc_info()[0]
            force = not issubclass(exctype, KeyboardInterrupt)
            ui.traceback(force=force)
        finally:
            if os.getpid() != parentpid:
                try:
                    ui.flush()
                except: # never returns, no re-raises
                    pass
                finally:
                    os._exit(ret & 255)
        pids.add(pid)
    selector = selectors.DefaultSelector()
    for rfd, wfd in pipes:
        os.close(wfd)
        selector.register(os.fdopen(rfd, r'rb', 0), selectors.EVENT_READ)
    def cleanup():
        signal.signal(signal.SIGINT, oldhandler)
        waitforworkers()
        signal.signal(signal.SIGCHLD, oldchldhandler)
        selector.close()
-        status = problem[0]
-        if status:
-            if status < 0:
-                os.kill(os.getpid(), -status)
-            sys.exit(status)
+        return problem[0]
    try:
        openpipes = len(pipes)
        while openpipes > 0:
            for key, events in selector.select():
                try:
                    yield util.pickle.load(key.fileobj)
                except EOFError:
                    selector.unregister(key.fileobj)
                    key.fileobj.close()
                    openpipes -= 1
                except IOError as e:
                    if e.errno == errno.EINTR:
                        continue
                    raise
    except: # re-raises
        killworkers()
        cleanup()
        raise
-    cleanup()
+    status = cleanup()
+    if status:
+        if status < 0:
+            os.kill(os.getpid(), -status)
+        sys.exit(status)

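# Editor's note on the change above (not part of the original file):
# cleanup() previously called sys.exit() itself. In the "except:
# killworkers(); cleanup(); raise" path, the SystemExit raised inside
# cleanup() replaced the in-flight exception, so the bare "raise" re-raised
# SystemExit instead of the original error from the main process. Returning
# problem[0] and exiting at the normal call site avoids that, as this
# minimal sketch shows:
#
#   try:
#       raise RuntimeError('original error')
#   except:
#       sys.exit(1)  # raises SystemExit; the RuntimeError is swallowed
#       raise        # never reached
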
def _posixexitstatus(code):
    '''convert a posix exit status into the same form returned by
    os.spawnv

    returns None if the process was stopped instead of exiting'''
    if os.WIFEXITED(code):
        return os.WEXITSTATUS(code)
    elif os.WIFSIGNALED(code):
        return -os.WTERMSIG(code)

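# Editor's illustration (not part of the original file): if a child is
# killed by SIGTERM, os.WIFSIGNALED(code) is true and os.WTERMSIG(code) is
# 15, so _posixexitstatus() returns -15. _posixworker() records that in
# problem[0] and later propagates it via os.kill(os.getpid(), -status).
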
def _windowsworker(ui, func, staticargs, args):
    class Worker(threading.Thread):
        def __init__(self, taskqueue, resultqueue, func, staticargs,
                     group=None, target=None, name=None, verbose=None):
            threading.Thread.__init__(self, group=group, target=target,
                                      name=name, verbose=verbose)
            self._taskqueue = taskqueue
            self._resultqueue = resultqueue
            self._func = func
            self._staticargs = staticargs
            self._interrupted = False
            self.daemon = True
            self.exception = None

        def interrupt(self):
            self._interrupted = True

        def run(self):
            try:
                while not self._taskqueue.empty():
                    try:
                        args = self._taskqueue.get_nowait()
                        for res in self._func(*self._staticargs + (args,)):
                            self._resultqueue.put(res)
                            # threading doesn't provide a native way to
                            # interrupt execution. handle it manually at
                            # every iteration.
                            if self._interrupted:
                                return
                    except pycompat.queue.Empty:
                        break
            except Exception as e:
                # store the exception such that the main thread can
                # resurface it as if the func was running without workers.
                self.exception = e
                raise

    threads = []
    def trykillworkers():
        # Allow up to 1 second to clean worker threads nicely
        cleanupend = time.time() + 1
        for t in threads:
            t.interrupt()
        for t in threads:
            remainingtime = cleanupend - time.time()
            t.join(remainingtime)
            if t.is_alive():
                # pass over the workers' joining failure. it is more
                # important to surface the initial exception than the
                # fact that one of the workers may be processing a large
                # task and does not get to handle the interruption.
                ui.warn(_("failed to kill worker threads while "
                          "handling an exception\n"))
                return

    workers = _numworkers(ui)
    resultqueue = pycompat.queue.Queue()
    taskqueue = pycompat.queue.Queue()
    # partition work to more pieces than workers to minimize the chance
    # of uneven distribution of large tasks between the workers
    for pargs in partition(args, workers * 20):
        taskqueue.put(pargs)
    for _i in range(workers):
        t = Worker(taskqueue, resultqueue, func, staticargs)
        threads.append(t)
        t.start()
    try:
        while len(threads) > 0:
            while not resultqueue.empty():
                yield resultqueue.get()
            threads[0].join(0.05)
            finishedthreads = [_t for _t in threads if not _t.is_alive()]
            for t in finishedthreads:
                if t.exception is not None:
                    raise t.exception
                threads.remove(t)
    except (Exception, KeyboardInterrupt): # re-raises
        trykillworkers()
        raise
    while not resultqueue.empty():
        yield resultqueue.get()

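# Editor's note (not part of the original file): the loop above uses
# threads[0].join(0.05) as a bounded wait so the main thread stays
# responsive, then drains finished threads; re-raising t.exception in the
# main thread makes a worker failure look like a failure of a plain func()
# call. The taskqueue is deliberately overpartitioned (workers * 20 chunks)
# so one slow chunk cannot leave a single thread holding a disproportionate
# share of the work.
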
if pycompat.iswindows:
    _platformworker = _windowsworker
else:
    _platformworker = _posixworker
    _exitstatus = _posixexitstatus

def partition(lst, nslices):
    '''partition a list into N slices of roughly equal size

    The current strategy takes every Nth element from the input. If
    we ever write workers that need to preserve grouping in input
    we should consider allowing callers to specify a partition strategy.

    mpm is not a fan of this partitioning strategy when files are involved.
    In his words:

        Single-threaded Mercurial makes a point of creating and visiting
        files in a fixed order (alphabetical). When creating files in order,
        a typical filesystem is likely to allocate them on nearby regions on
        disk. Thus, when revisiting in the same order, locality is maximized
        and various forms of OS and disk-level caching and read-ahead get a
        chance to work.

        This effect can be quite significant on spinning disks. I discovered
        it circa Mercurial v0.4 when revlogs were named by hashes of
        filenames. Tarring a repo and copying it to another disk effectively
        randomized the revlog ordering on disk by sorting the revlogs by hash
        and suddenly performance of my kernel checkout benchmark dropped by
        ~10x because the "working set" of sectors visited no longer fit in
        the drive's cache and the workload switched from streaming to random
        I/O.

        What we should really be doing is have workers read filenames from
        an ordered queue. This preserves locality and also keeps any worker
        from getting more than one file out of balance.
    '''
    for i in range(nslices):
        yield lst[i::nslices]
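
# Editor's illustration (not part of the original file): the interleaved
# slicing in action:
#
#   >>> list(partition(list(range(10)), 3))
#   [[0, 3, 6, 9], [1, 4, 7], [2, 5, 8]]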