##// END OF EJS Templates
worker: make windows workers daemons...
Wojciech Lis -
r35448:86b8cc1f default
parent child Browse files
Show More
@@ -1,325 +1,327 b''
1 # worker.py - master-slave parallelism support
1 # worker.py - master-slave parallelism support
2 #
2 #
3 # Copyright 2013 Facebook, Inc.
3 # Copyright 2013 Facebook, Inc.
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7
7
8 from __future__ import absolute_import
8 from __future__ import absolute_import
9
9
10 import errno
10 import errno
11 import os
11 import os
12 import signal
12 import signal
13 import sys
13 import sys
14 import threading
14 import threading
15 import time
15
16
16 from .i18n import _
17 from .i18n import _
17 from . import (
18 from . import (
18 encoding,
19 encoding,
19 error,
20 error,
20 pycompat,
21 pycompat,
21 scmutil,
22 scmutil,
22 util,
23 util,
23 )
24 )
24
25
def countcpus():
    '''try to count the number of CPUs on the system'''

    # posix: sysconf is the authoritative source where available
    try:
        ncpus = int(os.sysconf(r'SC_NPROCESSORS_ONLN'))
    except (AttributeError, ValueError):
        pass
    else:
        if ncpus > 0:
            return ncpus

    # windows: fall back to the environment set up by the OS
    try:
        ncpus = int(encoding.environ['NUMBER_OF_PROCESSORS'])
    except (KeyError, ValueError):
        pass
    else:
        if ncpus > 0:
            return ncpus

    # conservative default when neither probe works
    return 1
45
46
46 def _numworkers(ui):
47 def _numworkers(ui):
47 s = ui.config('worker', 'numcpus')
48 s = ui.config('worker', 'numcpus')
48 if s:
49 if s:
49 try:
50 try:
50 n = int(s)
51 n = int(s)
51 if n >= 1:
52 if n >= 1:
52 return n
53 return n
53 except ValueError:
54 except ValueError:
54 raise error.Abort(_('number of cpus must be an integer'))
55 raise error.Abort(_('number of cpus must be an integer'))
55 return min(max(countcpus(), 4), 32)
56 return min(max(countcpus(), 4), 32)
56
57
# Estimated cost of starting one worker. On platforms with a real parallel
# implementation (posix fork, windows threads) the cost is small; elsewhere
# an absurdly large value makes worthwhile() always choose the serial path.
if pycompat.isposix or pycompat.iswindows:
    _startupcost = 0.01
else:
    _startupcost = 1e30
61
62
def worthwhile(ui, costperop, nops):
    '''try to determine whether the benefit of multiple processes can
    outweigh the cost of starting them'''
    workers = _numworkers(ui)
    linear = costperop * nops
    # estimated parallel runtime: per-worker startup plus an even share
    # of the serial work
    parallel = _startupcost * workers + linear / workers
    benefit = linear - parallel
    return benefit >= 0.15
69
70
def worker(ui, costperarg, func, staticargs, args):
    '''run a function, possibly in parallel in multiple worker
    processes.

    returns a progress iterator

    costperarg - cost of a single task

    func - function to run

    staticargs - arguments to pass to every invocation of the function

    args - arguments to split into chunks, to pass to individual
    workers
    '''
    # fall back to a plain serial call unless workers are enabled and
    # the estimated payoff justifies the startup cost
    if not ui.configbool('worker', 'enabled'):
        return func(*staticargs + (args,))
    if not worthwhile(ui, costperarg, len(args)):
        return func(*staticargs + (args,))
    return _platformworker(ui, func, staticargs, args)
89
90
def _posixworker(ui, func, staticargs, args):
    '''posix (fork-based) implementation of worker()

    Forks one child per partition of args; children stream
    "<index> <item>\n" results over a pipe which the parent re-yields.
    If any child fails, the remaining children are killed and the
    parent exits with the first problem status.
    '''
    rfd, wfd = os.pipe()
    workers = _numworkers(ui)
    oldhandler = signal.getsignal(signal.SIGINT)
    # the parent ignores SIGINT while children run; cleanup() restores it
    signal.signal(signal.SIGINT, signal.SIG_IGN)
    # problem is a one-element list so the nested closures can mutate it
    pids, problem = set(), [0]
    def killworkers():
        # unregister SIGCHLD handler as all children will be killed. This
        # function shouldn't be interrupted by another SIGCHLD; otherwise pids
        # could be updated while iterating, which would cause inconsistency.
        signal.signal(signal.SIGCHLD, oldchldhandler)
        # if one worker bails, there's no good reason to wait for the rest
        for p in pids:
            try:
                os.kill(p, signal.SIGTERM)
            except OSError as err:
                # ESRCH: child already gone, nothing to kill
                if err.errno != errno.ESRCH:
                    raise
    def waitforworkers(blocking=True):
        # reap children; records the first non-zero exit status in problem[0]
        for pid in pids.copy():
            p = st = 0
            while True:
                try:
                    p, st = os.waitpid(pid, (0 if blocking else os.WNOHANG))
                    break
                except OSError as e:
                    if e.errno == errno.EINTR:
                        continue
                    elif e.errno == errno.ECHILD:
                        # child would already be reaped, but pids yet been
                        # updated (maybe interrupted just after waitpid)
                        pids.discard(pid)
                        break
                    else:
                        raise
            if not p:
                # skip subsequent steps, because child process should
                # be still running in this case
                continue
            pids.discard(p)
            st = _exitstatus(st)
            if st and not problem[0]:
                problem[0] = st
    def sigchldhandler(signum, frame):
        waitforworkers(blocking=False)
        if problem[0]:
            killworkers()
    oldchldhandler = signal.signal(signal.SIGCHLD, sigchldhandler)
    # flush buffered output before forking so children don't duplicate it
    ui.flush()
    parentpid = os.getpid()
    for pargs in partition(args, workers):
        # make sure we use os._exit in all worker code paths. otherwise the
        # worker may do some clean-ups which could cause surprises like
        # deadlock. see sshpeer.cleanup for example.
        # override error handling *before* fork. this is necessary because
        # exception (signal) may arrive after fork, before "pid =" assignment
        # completes, and other exception handler (dispatch.py) can lead to
        # unexpected code path without os._exit.
        ret = -1
        try:
            pid = os.fork()
            if pid == 0:
                # child: restore signal handlers and run the task
                signal.signal(signal.SIGINT, oldhandler)
                signal.signal(signal.SIGCHLD, oldchldhandler)

                def workerfunc():
                    os.close(rfd)
                    for i, item in func(*(staticargs + (pargs,))):
                        os.write(wfd, '%d %s\n' % (i, item))
                    return 0

                ret = scmutil.callcatch(ui, workerfunc)
        except: # parent re-raises, child never returns
            if os.getpid() == parentpid:
                raise
            exctype = sys.exc_info()[0]
            force = not issubclass(exctype, KeyboardInterrupt)
            ui.traceback(force=force)
        finally:
            if os.getpid() != parentpid:
                try:
                    ui.flush()
                except: # never returns, no re-raises
                    pass
                finally:
                    os._exit(ret & 255)
        pids.add(pid)
    os.close(wfd)
    fp = os.fdopen(rfd, pycompat.sysstr('rb'), 0)
    def cleanup():
        # restore handlers, reap remaining children and propagate failure
        signal.signal(signal.SIGINT, oldhandler)
        waitforworkers()
        signal.signal(signal.SIGCHLD, oldchldhandler)
        status = problem[0]
        if status:
            if status < 0:
                # child died from a signal: re-raise it in the parent
                os.kill(os.getpid(), -status)
            sys.exit(status)
    try:
        for line in util.iterfile(fp):
            l = line.split(' ', 1)
            yield int(l[0]), l[1][:-1]
    except: # re-raises
        killworkers()
        cleanup()
        raise
    cleanup()
197
198
198 def _posixexitstatus(code):
199 def _posixexitstatus(code):
199 '''convert a posix exit status into the same form returned by
200 '''convert a posix exit status into the same form returned by
200 os.spawnv
201 os.spawnv
201
202
202 returns None if the process was stopped instead of exiting'''
203 returns None if the process was stopped instead of exiting'''
203 if os.WIFEXITED(code):
204 if os.WIFEXITED(code):
204 return os.WEXITSTATUS(code)
205 return os.WEXITSTATUS(code)
205 elif os.WIFSIGNALED(code):
206 elif os.WIFSIGNALED(code):
206 return -os.WTERMSIG(code)
207 return -os.WTERMSIG(code)
207
208
def _windowsworker(ui, func, staticargs, args):
    '''windows (thread-based) implementation of worker()

    Windows has no fork, so tasks are distributed to daemon threads via
    a queue and results are re-yielded from a result queue. The first
    exception raised by any worker thread is resurfaced in the caller.
    '''
    class Worker(threading.Thread):
        def __init__(self, taskqueue, resultqueue, func, staticargs,
                     group=None, target=None, name=None, verbose=None):
            # NOTE: 'verbose' is a Python 2 threading.Thread parameter
            threading.Thread.__init__(self, group=group, target=target,
                                      name=name, verbose=verbose)
            self._taskqueue = taskqueue
            self._resultqueue = resultqueue
            self._func = func
            self._staticargs = staticargs
            # set by interrupt(); checked between tasks in run()
            self._interrupted = False
            # daemon threads don't keep the process alive if the main
            # thread bails out without joining them
            self.daemon = True
            # first exception raised in run(), for the main thread to re-raise
            self.exception = None

        def interrupt(self):
            self._interrupted = True

        def run(self):
            try:
                while not self._taskqueue.empty():
                    try:
                        args = self._taskqueue.get_nowait()
                        for res in self._func(*self._staticargs + (args,)):
                            self._resultqueue.put(res)
                        # threading doesn't provide a native way to
                        # interrupt execution. handle it manually at every
                        # iteration.
                        if self._interrupted:
                            return
                    except util.empty:
                        break
            except Exception as e:
                # store the exception such that the main thread can resurface
                # it as if the func was running without workers.
                self.exception = e
                raise

    threads = []
    def trykillworkers():
        # Allow up to 1 second to clean worker threads nicely
        cleanupend = time.time() + 1
        for t in threads:
            t.interrupt()
        for t in threads:
            remainingtime = cleanupend - time.time()
            t.join(remainingtime)
            if t.is_alive():
                # pass over the workers joining failure. it is more
                # important to surface the initial exception than the
                # fact that one of workers may be processing a large
                # task and does not get to handle the interruption.
                ui.warn(_("failed to kill worker threads while "
                          "handling an exception\n"))
                return

    workers = _numworkers(ui)
    resultqueue = util.queue()
    taskqueue = util.queue()
    # partition work to more pieces than workers to minimize the chance
    # of uneven distribution of large tasks between the workers
    for pargs in partition(args, workers * 20):
        taskqueue.put(pargs)
    for _i in range(workers):
        t = Worker(taskqueue, resultqueue, func, staticargs)
        threads.append(t)
        t.start()
    try:
        while len(threads) > 0:
            # drain available results, then briefly wait on the oldest
            # thread so finished workers can be retired
            while not resultqueue.empty():
                yield resultqueue.get()
            threads[0].join(0.05)
            finishedthreads = [_t for _t in threads if not _t.is_alive()]
            for t in finishedthreads:
                if t.exception is not None:
                    raise t.exception
                threads.remove(t)
    except Exception: # re-raises
        trykillworkers()
        raise
    while not resultqueue.empty():
        yield resultqueue.get()
288
290
# select the platform implementation: threads on windows (no fork),
# fork-based processes everywhere else
if pycompat.iswindows:
    _platformworker = _windowsworker
else:
    _platformworker = _posixworker
    _exitstatus = _posixexitstatus
294
296
def partition(lst, nslices):
    '''partition a list into N slices of roughly equal size

    The current strategy takes every Nth element from the input (a
    strided slice), so grouping of adjacent elements is not preserved.
    If we ever write workers that need to preserve grouping in input
    we should consider allowing callers to specify a partition strategy.

    Caveat (per mpm): single-threaded Mercurial deliberately visits
    files in a fixed alphabetical order, which maximizes filesystem
    locality and lets OS/disk caching and read-ahead work; a strided
    partition destroys that ordering. On spinning disks this can cost
    as much as ~10x (observed circa v0.4 when hash-named revlogs
    randomized on-disk order). A better design would have workers pull
    filenames from one ordered queue, preserving locality and keeping
    any worker from getting more than one file out of balance.
    '''
    return (lst[offset::nslices] for offset in range(nslices))
327 yield lst[i::nslices]
General Comments 0
You need to be logged in to leave comments. Login now