##// END OF EJS Templates
worker: handle interrupt on windows...
Wojciech Lis -
r35469:44fd4cfc @6 default
parent child Browse files
Show More
@@ -1,327 +1,327 b''
1 # worker.py - master-slave parallelism support
1 # worker.py - master-slave parallelism support
2 #
2 #
3 # Copyright 2013 Facebook, Inc.
3 # Copyright 2013 Facebook, Inc.
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7
7
8 from __future__ import absolute_import
8 from __future__ import absolute_import
9
9
10 import errno
10 import errno
11 import os
11 import os
12 import signal
12 import signal
13 import sys
13 import sys
14 import threading
14 import threading
15 import time
15 import time
16
16
17 from .i18n import _
17 from .i18n import _
18 from . import (
18 from . import (
19 encoding,
19 encoding,
20 error,
20 error,
21 pycompat,
21 pycompat,
22 scmutil,
22 scmutil,
23 util,
23 util,
24 )
24 )
25
25
def countcpus():
    '''try to count the number of CPUs on the system'''

    # posix: sysconf is the authoritative answer where available
    try:
        ncpus = int(os.sysconf(r'SC_NPROCESSORS_ONLN'))
    except (AttributeError, ValueError):
        ncpus = 0
    if ncpus > 0:
        return ncpus

    # windows: fall back to the environment variable
    try:
        ncpus = int(encoding.environ['NUMBER_OF_PROCESSORS'])
    except (KeyError, ValueError):
        ncpus = 0
    if ncpus > 0:
        return ncpus

    # could not detect anything: assume a single CPU
    return 1
46
46
47 def _numworkers(ui):
47 def _numworkers(ui):
48 s = ui.config('worker', 'numcpus')
48 s = ui.config('worker', 'numcpus')
49 if s:
49 if s:
50 try:
50 try:
51 n = int(s)
51 n = int(s)
52 if n >= 1:
52 if n >= 1:
53 return n
53 return n
54 except ValueError:
54 except ValueError:
55 raise error.Abort(_('number of cpus must be an integer'))
55 raise error.Abort(_('number of cpus must be an integer'))
56 return min(max(countcpus(), 4), 32)
56 return min(max(countcpus(), 4), 32)
57
57
# estimated cost (in seconds) of starting one worker; used by worthwhile()
# to decide whether parallelism pays off at all
if pycompat.isposix or pycompat.iswindows:
    _startupcost = 0.01
else:
    # effectively infinite: disables the parallel path on other platforms
    _startupcost = 1e30
62
62
def worthwhile(ui, costperop, nops):
    '''try to determine whether the benefit of multiple processes can
    outweigh the cost of starting them'''
    serialcost = costperop * nops
    nworkers = _numworkers(ui)
    # parallel cost: per-worker startup overhead plus the work split
    # evenly across the workers
    parallelcost = _startupcost * nworkers + serialcost / nworkers
    return serialcost - parallelcost >= 0.15
70
70
def worker(ui, costperarg, func, staticargs, args):
    '''run a function, possibly in parallel in multiple worker
    processes.

    returns a progress iterator

    costperarg - cost of a single task

    func - function to run

    staticargs - arguments to pass to every invocation of the function

    args - arguments to split into chunks, to pass to individual
    workers
    '''
    # fall back to a plain single-process call unless workers are both
    # enabled and estimated to pay off
    if not ui.configbool('worker', 'enabled'):
        return func(*staticargs + (args,))
    if not worthwhile(ui, costperarg, len(args)):
        return func(*staticargs + (args,))
    return _platformworker(ui, func, staticargs, args)
90
90
def _posixworker(ui, func, staticargs, args):
    '''fork-based implementation of worker() for POSIX platforms.

    Forks one child per partition of args; each child runs func and
    writes "<index> <item>" lines to a shared pipe.  The parent yields
    (int(index), item) tuples parsed from that pipe.  A nonzero child
    exit status is propagated via sys.exit() (or, for a signal death,
    by re-raising the signal on the parent itself in cleanup()).
    '''
    # one pipe shared by all children: children write to wfd, parent
    # reads rfd
    rfd, wfd = os.pipe()
    workers = _numworkers(ui)
    # ignore SIGINT while children run; the original handler is restored
    # in cleanup() and re-installed in each child after fork
    oldhandler = signal.getsignal(signal.SIGINT)
    signal.signal(signal.SIGINT, signal.SIG_IGN)
    # pids: live children; problem[0]: first nonzero child exit status
    # (a one-element list so nested functions can mutate it)
    pids, problem = set(), [0]
    def killworkers():
        # unregister SIGCHLD handler as all children will be killed. This
        # function shouldn't be interrupted by another SIGCHLD; otherwise pids
        # could be updated while iterating, which would cause inconsistency.
        signal.signal(signal.SIGCHLD, oldchldhandler)
        # if one worker bails, there's no good reason to wait for the rest
        for p in pids:
            try:
                os.kill(p, signal.SIGTERM)
            except OSError as err:
                # ESRCH: child already gone, which is fine here
                if err.errno != errno.ESRCH:
                    raise
    def waitforworkers(blocking=True):
        # iterate a copy: pids may be mutated by the SIGCHLD handler
        for pid in pids.copy():
            p = st = 0
            while True:
                try:
                    # WNOHANG in the non-blocking (signal handler) case
                    p, st = os.waitpid(pid, (0 if blocking else os.WNOHANG))
                    break
                except OSError as e:
                    if e.errno == errno.EINTR:
                        continue
                    elif e.errno == errno.ECHILD:
                        # child would already be reaped, but pids yet been
                        # updated (maybe interrupted just after waitpid)
                        pids.discard(pid)
                        break
                    else:
                        raise
            if not p:
                # skip subsequent steps, because child process should
                # be still running in this case
                continue
            pids.discard(p)
            st = _exitstatus(st)
            # remember only the first failure; killworkers() is triggered
            # from the SIGCHLD handler below
            if st and not problem[0]:
                problem[0] = st
    def sigchldhandler(signum, frame):
        waitforworkers(blocking=False)
        if problem[0]:
            killworkers()
    oldchldhandler = signal.signal(signal.SIGCHLD, sigchldhandler)
    # flush buffered output before forking so children don't re-emit it
    ui.flush()
    parentpid = os.getpid()
    for pargs in partition(args, workers):
        # make sure we use os._exit in all worker code paths. otherwise the
        # worker may do some clean-ups which could cause surprises like
        # deadlock. see sshpeer.cleanup for example.
        # override error handling *before* fork. this is necessary because
        # exception (signal) may arrive after fork, before "pid =" assignment
        # completes, and other exception handler (dispatch.py) can lead to
        # unexpected code path without os._exit.
        ret = -1
        try:
            pid = os.fork()
            if pid == 0:
                # child: restore original signal handlers and run func
                signal.signal(signal.SIGINT, oldhandler)
                signal.signal(signal.SIGCHLD, oldchldhandler)

                def workerfunc():
                    # child only writes; close the read end
                    os.close(rfd)
                    for i, item in func(*(staticargs + (pargs,))):
                        os.write(wfd, '%d %s\n' % (i, item))
                    return 0

                ret = scmutil.callcatch(ui, workerfunc)
        except: # parent re-raises, child never returns
            if os.getpid() == parentpid:
                raise
            exctype = sys.exc_info()[0]
            force = not issubclass(exctype, KeyboardInterrupt)
            ui.traceback(force=force)
        finally:
            if os.getpid() != parentpid:
                try:
                    ui.flush()
                except: # never returns, no re-raises
                    pass
                finally:
                    os._exit(ret & 255)
        pids.add(pid)
    # parent only reads; close the write end so EOF arrives when the
    # last child exits
    os.close(wfd)
    fp = os.fdopen(rfd, pycompat.sysstr('rb'), 0)
    def cleanup():
        signal.signal(signal.SIGINT, oldhandler)
        waitforworkers()
        signal.signal(signal.SIGCHLD, oldchldhandler)
        status = problem[0]
        if status:
            if status < 0:
                # child died from a signal: deliver the same signal to
                # ourselves so the caller sees the usual signal death
                os.kill(os.getpid(), -status)
            sys.exit(status)
    try:
        for line in util.iterfile(fp):
            l = line.split(' ', 1)
            # "<index> <item>\n" -> (int index, item without newline)
            yield int(l[0]), l[1][:-1]
    except: # re-raises
        killworkers()
        cleanup()
        raise
    cleanup()
198
198
199 def _posixexitstatus(code):
199 def _posixexitstatus(code):
200 '''convert a posix exit status into the same form returned by
200 '''convert a posix exit status into the same form returned by
201 os.spawnv
201 os.spawnv
202
202
203 returns None if the process was stopped instead of exiting'''
203 returns None if the process was stopped instead of exiting'''
204 if os.WIFEXITED(code):
204 if os.WIFEXITED(code):
205 return os.WEXITSTATUS(code)
205 return os.WEXITSTATUS(code)
206 elif os.WIFSIGNALED(code):
206 elif os.WIFSIGNALED(code):
207 return -os.WTERMSIG(code)
207 return -os.WTERMSIG(code)
208
208
def _windowsworker(ui, func, staticargs, args):
    '''thread-based implementation of worker() for platforms without
    os.fork.

    Spreads partitions of args over daemon worker threads that feed a
    shared result queue; results are yielded from that queue as they
    arrive.  A worker exception is re-raised in the caller's thread, and
    any exception (including KeyboardInterrupt) triggers a best-effort
    interruption of the remaining workers.
    '''
    class Worker(threading.Thread):
        def __init__(self, taskqueue, resultqueue, func, staticargs,
                     group=None, target=None, name=None, verbose=None):
            threading.Thread.__init__(self, group=group, target=target,
                                      name=name, verbose=verbose)
            self._taskqueue = taskqueue
            self._resultqueue = resultqueue
            self._func = func
            self._staticargs = staticargs
            # cooperative interruption flag, polled in run()
            self._interrupted = False
            # daemon threads don't block interpreter exit
            self.daemon = True
            # exception raised by func, surfaced by the main thread
            self.exception = None

        def interrupt(self):
            # ask the thread to stop at the next result boundary
            self._interrupted = True

        def run(self):
            try:
                while not self._taskqueue.empty():
                    try:
                        args = self._taskqueue.get_nowait()
                        for res in self._func(*self._staticargs + (args,)):
                            self._resultqueue.put(res)
                            # threading doesn't provide a native way to
                            # interrupt execution. handle it manually at every
                            # iteration.
                            if self._interrupted:
                                return
                    except util.empty:
                        break
            except Exception as e:
                # store the exception such that the main thread can resurface
                # it as if the func was running without workers.
                self.exception = e
                raise

    threads = []
    def trykillworkers():
        # Allow up to 1 second to clean worker threads nicely
        cleanupend = time.time() + 1
        for t in threads:
            t.interrupt()
        for t in threads:
            remainingtime = cleanupend - time.time()
            t.join(remainingtime)
            if t.is_alive():
                # pass over the workers joining failure. it is more
                # important to surface the initial exception than the
                # fact that one of workers may be processing a large
                # task and does not get to handle the interruption.
                ui.warn(_("failed to kill worker threads while "
                          "handling an exception\n"))
                return

    workers = _numworkers(ui)
    resultqueue = util.queue()
    taskqueue = util.queue()
    # partition work to more pieces than workers to minimize the chance
    # of uneven distribution of large tasks between the workers
    for pargs in partition(args, workers * 20):
        taskqueue.put(pargs)
    for _i in range(workers):
        t = Worker(taskqueue, resultqueue, func, staticargs)
        threads.append(t)
        t.start()
    try:
        while len(threads) > 0:
            # drain whatever is already available, then poll the threads
            while not resultqueue.empty():
                yield resultqueue.get()
            threads[0].join(0.05)
            finishedthreads = [_t for _t in threads if not _t.is_alive()]
            for t in finishedthreads:
                if t.exception is not None:
                    raise t.exception
                threads.remove(t)
    except (Exception, KeyboardInterrupt): # re-raises
        trykillworkers()
        raise
    # all threads finished: drain any remaining results
    while not resultqueue.empty():
        yield resultqueue.get()
290
290
# pick the platform implementation of worker(); _exitstatus is only
# needed by the fork-based posix implementation
if pycompat.iswindows:
    _platformworker = _windowsworker
else:
    _platformworker = _posixworker
    _exitstatus = _posixexitstatus
296
296
def partition(lst, nslices):
    '''partition a list into N slices of roughly equal size

    The current strategy takes every Nth element from the input. If
    we ever write workers that need to preserve grouping in input
    we should consider allowing callers to specify a partition strategy.

    mpm is not a fan of this partitioning strategy when files are involved.
    In his words:

        Single-threaded Mercurial makes a point of creating and visiting
        files in a fixed order (alphabetical). When creating files in order,
        a typical filesystem is likely to allocate them on nearby regions on
        disk. Thus, when revisiting in the same order, locality is maximized
        and various forms of OS and disk-level caching and read-ahead get a
        chance to work.

        This effect can be quite significant on spinning disks. I discovered it
        circa Mercurial v0.4 when revlogs were named by hashes of filenames.
        Tarring a repo and copying it to another disk effectively randomized
        the revlog ordering on disk by sorting the revlogs by hash and suddenly
        performance of my kernel checkout benchmark dropped by ~10x because the
        "working set" of sectors visited no longer fit in the drive's cache and
        the workload switched from streaming to random I/O.

        What we should really be doing is have workers read filenames from a
        ordered queue. This preserves locality and also keeps any worker from
        getting more than one file out of balance.
    '''
    # slice i gets elements i, i + nslices, i + 2 * nslices, ...
    return (lst[offset::nslices] for offset in range(nslices))
General Comments 0
You need to be logged in to leave comments. Login now