##// END OF EJS Templates
worker: use one pipe per posix worker and select() in parent process...
Danny Hooper -
r38752:9e6afe7f default
parent child Browse files
Show More
@@ -1,333 +1,355 b''
1 # worker.py - master-slave parallelism support
1 # worker.py - master-slave parallelism support
2 #
2 #
3 # Copyright 2013 Facebook, Inc.
3 # Copyright 2013 Facebook, Inc.
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7
7
8 from __future__ import absolute_import
8 from __future__ import absolute_import
9
9
10 import errno
10 import errno
11 import os
11 import os
12 import signal
12 import signal
13 import sys
13 import sys
14 import threading
14 import threading
15 import time
15 import time
16
16
17 try:
18 import selectors
19 selectors.BaseSelector
20 except ImportError:
21 from .thirdparty import selectors2 as selectors
22
17 from .i18n import _
23 from .i18n import _
18 from . import (
24 from . import (
19 encoding,
25 encoding,
20 error,
26 error,
21 pycompat,
27 pycompat,
22 scmutil,
28 scmutil,
23 util,
29 util,
24 )
30 )
25
31
26 def countcpus():
32 def countcpus():
27 '''try to count the number of CPUs on the system'''
33 '''try to count the number of CPUs on the system'''
28
34
29 # posix
35 # posix
30 try:
36 try:
31 n = int(os.sysconf(r'SC_NPROCESSORS_ONLN'))
37 n = int(os.sysconf(r'SC_NPROCESSORS_ONLN'))
32 if n > 0:
38 if n > 0:
33 return n
39 return n
34 except (AttributeError, ValueError):
40 except (AttributeError, ValueError):
35 pass
41 pass
36
42
37 # windows
43 # windows
38 try:
44 try:
39 n = int(encoding.environ['NUMBER_OF_PROCESSORS'])
45 n = int(encoding.environ['NUMBER_OF_PROCESSORS'])
40 if n > 0:
46 if n > 0:
41 return n
47 return n
42 except (KeyError, ValueError):
48 except (KeyError, ValueError):
43 pass
49 pass
44
50
45 return 1
51 return 1
46
52
47 def _numworkers(ui):
53 def _numworkers(ui):
48 s = ui.config('worker', 'numcpus')
54 s = ui.config('worker', 'numcpus')
49 if s:
55 if s:
50 try:
56 try:
51 n = int(s)
57 n = int(s)
52 if n >= 1:
58 if n >= 1:
53 return n
59 return n
54 except ValueError:
60 except ValueError:
55 raise error.Abort(_('number of cpus must be an integer'))
61 raise error.Abort(_('number of cpus must be an integer'))
56 return min(max(countcpus(), 4), 32)
62 return min(max(countcpus(), 4), 32)
57
63
58 if pycompat.isposix or pycompat.iswindows:
64 if pycompat.isposix or pycompat.iswindows:
59 _startupcost = 0.01
65 _startupcost = 0.01
60 else:
66 else:
61 _startupcost = 1e30
67 _startupcost = 1e30
62
68
63 def worthwhile(ui, costperop, nops):
69 def worthwhile(ui, costperop, nops):
64 '''try to determine whether the benefit of multiple processes can
70 '''try to determine whether the benefit of multiple processes can
65 outweigh the cost of starting them'''
71 outweigh the cost of starting them'''
66 linear = costperop * nops
72 linear = costperop * nops
67 workers = _numworkers(ui)
73 workers = _numworkers(ui)
68 benefit = linear - (_startupcost * workers + linear / workers)
74 benefit = linear - (_startupcost * workers + linear / workers)
69 return benefit >= 0.15
75 return benefit >= 0.15
70
76
71 def worker(ui, costperarg, func, staticargs, args):
77 def worker(ui, costperarg, func, staticargs, args):
72 '''run a function, possibly in parallel in multiple worker
78 '''run a function, possibly in parallel in multiple worker
73 processes.
79 processes.
74
80
75 returns a progress iterator
81 returns a progress iterator
76
82
77 costperarg - cost of a single task
83 costperarg - cost of a single task
78
84
79 func - function to run
85 func - function to run
80
86
81 staticargs - arguments to pass to every invocation of the function
87 staticargs - arguments to pass to every invocation of the function
82
88
83 args - arguments to split into chunks, to pass to individual
89 args - arguments to split into chunks, to pass to individual
84 workers
90 workers
85 '''
91 '''
86 enabled = ui.configbool('worker', 'enabled')
92 enabled = ui.configbool('worker', 'enabled')
87 if enabled and worthwhile(ui, costperarg, len(args)):
93 if enabled and worthwhile(ui, costperarg, len(args)):
88 return _platformworker(ui, func, staticargs, args)
94 return _platformworker(ui, func, staticargs, args)
89 return func(*staticargs + (args,))
95 return func(*staticargs + (args,))
90
96
91 def _posixworker(ui, func, staticargs, args):
97 def _posixworker(ui, func, staticargs, args):
92 rfd, wfd = os.pipe()
93 workers = _numworkers(ui)
98 workers = _numworkers(ui)
94 oldhandler = signal.getsignal(signal.SIGINT)
99 oldhandler = signal.getsignal(signal.SIGINT)
95 signal.signal(signal.SIGINT, signal.SIG_IGN)
100 signal.signal(signal.SIGINT, signal.SIG_IGN)
96 pids, problem = set(), [0]
101 pids, problem = set(), [0]
97 def killworkers():
102 def killworkers():
98 # unregister SIGCHLD handler as all children will be killed. This
103 # unregister SIGCHLD handler as all children will be killed. This
99 # function shouldn't be interrupted by another SIGCHLD; otherwise pids
104 # function shouldn't be interrupted by another SIGCHLD; otherwise pids
100 # could be updated while iterating, which would cause inconsistency.
105 # could be updated while iterating, which would cause inconsistency.
101 signal.signal(signal.SIGCHLD, oldchldhandler)
106 signal.signal(signal.SIGCHLD, oldchldhandler)
102 # if one worker bails, there's no good reason to wait for the rest
107 # if one worker bails, there's no good reason to wait for the rest
103 for p in pids:
108 for p in pids:
104 try:
109 try:
105 os.kill(p, signal.SIGTERM)
110 os.kill(p, signal.SIGTERM)
106 except OSError as err:
111 except OSError as err:
107 if err.errno != errno.ESRCH:
112 if err.errno != errno.ESRCH:
108 raise
113 raise
109 def waitforworkers(blocking=True):
114 def waitforworkers(blocking=True):
110 for pid in pids.copy():
115 for pid in pids.copy():
111 p = st = 0
116 p = st = 0
112 while True:
117 while True:
113 try:
118 try:
114 p, st = os.waitpid(pid, (0 if blocking else os.WNOHANG))
119 p, st = os.waitpid(pid, (0 if blocking else os.WNOHANG))
115 break
120 break
116 except OSError as e:
121 except OSError as e:
117 if e.errno == errno.EINTR:
122 if e.errno == errno.EINTR:
118 continue
123 continue
119 elif e.errno == errno.ECHILD:
124 elif e.errno == errno.ECHILD:
120 # child would already be reaped, but pids yet been
125 # child would already be reaped, but pids yet been
121 # updated (maybe interrupted just after waitpid)
126 # updated (maybe interrupted just after waitpid)
122 pids.discard(pid)
127 pids.discard(pid)
123 break
128 break
124 else:
129 else:
125 raise
130 raise
126 if not p:
131 if not p:
127 # skip subsequent steps, because child process should
132 # skip subsequent steps, because child process should
128 # be still running in this case
133 # be still running in this case
129 continue
134 continue
130 pids.discard(p)
135 pids.discard(p)
131 st = _exitstatus(st)
136 st = _exitstatus(st)
132 if st and not problem[0]:
137 if st and not problem[0]:
133 problem[0] = st
138 problem[0] = st
134 def sigchldhandler(signum, frame):
139 def sigchldhandler(signum, frame):
135 waitforworkers(blocking=False)
140 waitforworkers(blocking=False)
136 if problem[0]:
141 if problem[0]:
137 killworkers()
142 killworkers()
138 oldchldhandler = signal.signal(signal.SIGCHLD, sigchldhandler)
143 oldchldhandler = signal.signal(signal.SIGCHLD, sigchldhandler)
139 ui.flush()
144 ui.flush()
140 parentpid = os.getpid()
145 parentpid = os.getpid()
146 pipes = []
141 for pargs in partition(args, workers):
147 for pargs in partition(args, workers):
148 # Every worker gets its own pipe to send results on, so we don't have to
149 # implement atomic writes larger than PIPE_BUF. Each forked process has
150 # its own pipe's descriptors in the local variables, and the parent
151 # process has the full list of pipe descriptors (and it doesn't really
152 # care what order they're in).
153 rfd, wfd = os.pipe()
154 pipes.append((rfd, wfd))
142 # make sure we use os._exit in all worker code paths. otherwise the
155 # make sure we use os._exit in all worker code paths. otherwise the
143 # worker may do some clean-ups which could cause surprises like
156 # worker may do some clean-ups which could cause surprises like
144 # deadlock. see sshpeer.cleanup for example.
157 # deadlock. see sshpeer.cleanup for example.
145 # override error handling *before* fork. this is necessary because
158 # override error handling *before* fork. this is necessary because
146 # exception (signal) may arrive after fork, before "pid =" assignment
159 # exception (signal) may arrive after fork, before "pid =" assignment
147 # completes, and other exception handler (dispatch.py) can lead to
160 # completes, and other exception handler (dispatch.py) can lead to
148 # unexpected code path without os._exit.
161 # unexpected code path without os._exit.
149 ret = -1
162 ret = -1
150 try:
163 try:
151 pid = os.fork()
164 pid = os.fork()
152 if pid == 0:
165 if pid == 0:
153 signal.signal(signal.SIGINT, oldhandler)
166 signal.signal(signal.SIGINT, oldhandler)
154 signal.signal(signal.SIGCHLD, oldchldhandler)
167 signal.signal(signal.SIGCHLD, oldchldhandler)
155
168
156 def workerfunc():
169 def workerfunc():
170 for r, w in pipes[:-1]:
171 os.close(r)
172 os.close(w)
157 os.close(rfd)
173 os.close(rfd)
158 for result in func(*(staticargs + (pargs,))):
174 for result in func(*(staticargs + (pargs,))):
159 os.write(wfd, util.pickle.dumps(result))
175 os.write(wfd, util.pickle.dumps(result))
160 return 0
176 return 0
161
177
162 ret = scmutil.callcatch(ui, workerfunc)
178 ret = scmutil.callcatch(ui, workerfunc)
163 except: # parent re-raises, child never returns
179 except: # parent re-raises, child never returns
164 if os.getpid() == parentpid:
180 if os.getpid() == parentpid:
165 raise
181 raise
166 exctype = sys.exc_info()[0]
182 exctype = sys.exc_info()[0]
167 force = not issubclass(exctype, KeyboardInterrupt)
183 force = not issubclass(exctype, KeyboardInterrupt)
168 ui.traceback(force=force)
184 ui.traceback(force=force)
169 finally:
185 finally:
170 if os.getpid() != parentpid:
186 if os.getpid() != parentpid:
171 try:
187 try:
172 ui.flush()
188 ui.flush()
173 except: # never returns, no re-raises
189 except: # never returns, no re-raises
174 pass
190 pass
175 finally:
191 finally:
176 os._exit(ret & 255)
192 os._exit(ret & 255)
177 pids.add(pid)
193 pids.add(pid)
178 os.close(wfd)
194 selector = selectors.DefaultSelector()
179 fp = os.fdopen(rfd, r'rb', 0)
195 for rfd, wfd in pipes:
196 os.close(wfd)
197 selector.register(os.fdopen(rfd, r'rb', 0), selectors.EVENT_READ)
180 def cleanup():
198 def cleanup():
181 signal.signal(signal.SIGINT, oldhandler)
199 signal.signal(signal.SIGINT, oldhandler)
182 waitforworkers()
200 waitforworkers()
183 signal.signal(signal.SIGCHLD, oldchldhandler)
201 signal.signal(signal.SIGCHLD, oldchldhandler)
184 status = problem[0]
202 status = problem[0]
185 if status:
203 if status:
186 if status < 0:
204 if status < 0:
187 os.kill(os.getpid(), -status)
205 os.kill(os.getpid(), -status)
188 sys.exit(status)
206 sys.exit(status)
189 try:
207 try:
190 while True:
208 openpipes = len(pipes)
191 try:
209 while openpipes > 0:
192 yield util.pickle.load(fp)
210 for key, events in selector.select():
193 except EOFError:
211 try:
194 break
212 yield util.pickle.load(key.fileobj)
195 except IOError as e:
213 except EOFError:
196 if e.errno == errno.EINTR:
214 selector.unregister(key.fileobj)
197 continue
215 key.fileobj.close()
198 raise
216 openpipes -= 1
217 except IOError as e:
218 if e.errno == errno.EINTR:
219 continue
220 raise
199 except: # re-raises
221 except: # re-raises
200 killworkers()
222 killworkers()
201 cleanup()
223 cleanup()
202 raise
224 raise
203 cleanup()
225 cleanup()
204
226
205 def _posixexitstatus(code):
227 def _posixexitstatus(code):
206 '''convert a posix exit status into the same form returned by
228 '''convert a posix exit status into the same form returned by
207 os.spawnv
229 os.spawnv
208
230
209 returns None if the process was stopped instead of exiting'''
231 returns None if the process was stopped instead of exiting'''
210 if os.WIFEXITED(code):
232 if os.WIFEXITED(code):
211 return os.WEXITSTATUS(code)
233 return os.WEXITSTATUS(code)
212 elif os.WIFSIGNALED(code):
234 elif os.WIFSIGNALED(code):
213 return -os.WTERMSIG(code)
235 return -os.WTERMSIG(code)
214
236
215 def _windowsworker(ui, func, staticargs, args):
237 def _windowsworker(ui, func, staticargs, args):
216 class Worker(threading.Thread):
238 class Worker(threading.Thread):
217 def __init__(self, taskqueue, resultqueue, func, staticargs,
239 def __init__(self, taskqueue, resultqueue, func, staticargs,
218 group=None, target=None, name=None, verbose=None):
240 group=None, target=None, name=None, verbose=None):
219 threading.Thread.__init__(self, group=group, target=target,
241 threading.Thread.__init__(self, group=group, target=target,
220 name=name, verbose=verbose)
242 name=name, verbose=verbose)
221 self._taskqueue = taskqueue
243 self._taskqueue = taskqueue
222 self._resultqueue = resultqueue
244 self._resultqueue = resultqueue
223 self._func = func
245 self._func = func
224 self._staticargs = staticargs
246 self._staticargs = staticargs
225 self._interrupted = False
247 self._interrupted = False
226 self.daemon = True
248 self.daemon = True
227 self.exception = None
249 self.exception = None
228
250
229 def interrupt(self):
251 def interrupt(self):
230 self._interrupted = True
252 self._interrupted = True
231
253
232 def run(self):
254 def run(self):
233 try:
255 try:
234 while not self._taskqueue.empty():
256 while not self._taskqueue.empty():
235 try:
257 try:
236 args = self._taskqueue.get_nowait()
258 args = self._taskqueue.get_nowait()
237 for res in self._func(*self._staticargs + (args,)):
259 for res in self._func(*self._staticargs + (args,)):
238 self._resultqueue.put(res)
260 self._resultqueue.put(res)
239 # threading doesn't provide a native way to
261 # threading doesn't provide a native way to
240 # interrupt execution. handle it manually at every
262 # interrupt execution. handle it manually at every
241 # iteration.
263 # iteration.
242 if self._interrupted:
264 if self._interrupted:
243 return
265 return
244 except pycompat.queue.Empty:
266 except pycompat.queue.Empty:
245 break
267 break
246 except Exception as e:
268 except Exception as e:
247 # store the exception such that the main thread can resurface
269 # store the exception such that the main thread can resurface
248 # it as if the func was running without workers.
270 # it as if the func was running without workers.
249 self.exception = e
271 self.exception = e
250 raise
272 raise
251
273
252 threads = []
274 threads = []
253 def trykillworkers():
275 def trykillworkers():
254 # Allow up to 1 second to clean worker threads nicely
276 # Allow up to 1 second to clean worker threads nicely
255 cleanupend = time.time() + 1
277 cleanupend = time.time() + 1
256 for t in threads:
278 for t in threads:
257 t.interrupt()
279 t.interrupt()
258 for t in threads:
280 for t in threads:
259 remainingtime = cleanupend - time.time()
281 remainingtime = cleanupend - time.time()
260 t.join(remainingtime)
282 t.join(remainingtime)
261 if t.is_alive():
283 if t.is_alive():
262 # pass over the workers joining failure. it is more
284 # pass over the workers joining failure. it is more
263 # important to surface the inital exception than the
285 # important to surface the inital exception than the
264 # fact that one of workers may be processing a large
286 # fact that one of workers may be processing a large
265 # task and does not get to handle the interruption.
287 # task and does not get to handle the interruption.
266 ui.warn(_("failed to kill worker threads while "
288 ui.warn(_("failed to kill worker threads while "
267 "handling an exception\n"))
289 "handling an exception\n"))
268 return
290 return
269
291
270 workers = _numworkers(ui)
292 workers = _numworkers(ui)
271 resultqueue = pycompat.queue.Queue()
293 resultqueue = pycompat.queue.Queue()
272 taskqueue = pycompat.queue.Queue()
294 taskqueue = pycompat.queue.Queue()
273 # partition work to more pieces than workers to minimize the chance
295 # partition work to more pieces than workers to minimize the chance
274 # of uneven distribution of large tasks between the workers
296 # of uneven distribution of large tasks between the workers
275 for pargs in partition(args, workers * 20):
297 for pargs in partition(args, workers * 20):
276 taskqueue.put(pargs)
298 taskqueue.put(pargs)
277 for _i in range(workers):
299 for _i in range(workers):
278 t = Worker(taskqueue, resultqueue, func, staticargs)
300 t = Worker(taskqueue, resultqueue, func, staticargs)
279 threads.append(t)
301 threads.append(t)
280 t.start()
302 t.start()
281 try:
303 try:
282 while len(threads) > 0:
304 while len(threads) > 0:
283 while not resultqueue.empty():
305 while not resultqueue.empty():
284 yield resultqueue.get()
306 yield resultqueue.get()
285 threads[0].join(0.05)
307 threads[0].join(0.05)
286 finishedthreads = [_t for _t in threads if not _t.is_alive()]
308 finishedthreads = [_t for _t in threads if not _t.is_alive()]
287 for t in finishedthreads:
309 for t in finishedthreads:
288 if t.exception is not None:
310 if t.exception is not None:
289 raise t.exception
311 raise t.exception
290 threads.remove(t)
312 threads.remove(t)
291 except (Exception, KeyboardInterrupt): # re-raises
313 except (Exception, KeyboardInterrupt): # re-raises
292 trykillworkers()
314 trykillworkers()
293 raise
315 raise
294 while not resultqueue.empty():
316 while not resultqueue.empty():
295 yield resultqueue.get()
317 yield resultqueue.get()
296
318
297 if pycompat.iswindows:
319 if pycompat.iswindows:
298 _platformworker = _windowsworker
320 _platformworker = _windowsworker
299 else:
321 else:
300 _platformworker = _posixworker
322 _platformworker = _posixworker
301 _exitstatus = _posixexitstatus
323 _exitstatus = _posixexitstatus
302
324
303 def partition(lst, nslices):
325 def partition(lst, nslices):
304 '''partition a list into N slices of roughly equal size
326 '''partition a list into N slices of roughly equal size
305
327
306 The current strategy takes every Nth element from the input. If
328 The current strategy takes every Nth element from the input. If
307 we ever write workers that need to preserve grouping in input
329 we ever write workers that need to preserve grouping in input
308 we should consider allowing callers to specify a partition strategy.
330 we should consider allowing callers to specify a partition strategy.
309
331
310 mpm is not a fan of this partitioning strategy when files are involved.
332 mpm is not a fan of this partitioning strategy when files are involved.
311 In his words:
333 In his words:
312
334
313 Single-threaded Mercurial makes a point of creating and visiting
335 Single-threaded Mercurial makes a point of creating and visiting
314 files in a fixed order (alphabetical). When creating files in order,
336 files in a fixed order (alphabetical). When creating files in order,
315 a typical filesystem is likely to allocate them on nearby regions on
337 a typical filesystem is likely to allocate them on nearby regions on
316 disk. Thus, when revisiting in the same order, locality is maximized
338 disk. Thus, when revisiting in the same order, locality is maximized
317 and various forms of OS and disk-level caching and read-ahead get a
339 and various forms of OS and disk-level caching and read-ahead get a
318 chance to work.
340 chance to work.
319
341
320 This effect can be quite significant on spinning disks. I discovered it
342 This effect can be quite significant on spinning disks. I discovered it
321 circa Mercurial v0.4 when revlogs were named by hashes of filenames.
343 circa Mercurial v0.4 when revlogs were named by hashes of filenames.
322 Tarring a repo and copying it to another disk effectively randomized
344 Tarring a repo and copying it to another disk effectively randomized
323 the revlog ordering on disk by sorting the revlogs by hash and suddenly
345 the revlog ordering on disk by sorting the revlogs by hash and suddenly
324 performance of my kernel checkout benchmark dropped by ~10x because the
346 performance of my kernel checkout benchmark dropped by ~10x because the
325 "working set" of sectors visited no longer fit in the drive's cache and
347 "working set" of sectors visited no longer fit in the drive's cache and
326 the workload switched from streaming to random I/O.
348 the workload switched from streaming to random I/O.
327
349
328 What we should really be doing is have workers read filenames from a
350 What we should really be doing is have workers read filenames from a
329 ordered queue. This preserves locality and also keeps any worker from
351 ordered queue. This preserves locality and also keeps any worker from
330 getting more than one file out of balance.
352 getting more than one file out of balance.
331 '''
353 '''
332 for i in range(nslices):
354 for i in range(nslices):
333 yield lst[i::nslices]
355 yield lst[i::nslices]
General Comments 0
You need to be logged in to leave comments. Login now