worker: call selector.close() to release polling resources
Yuya Nishihara
r38763:c08ea1e2 stable
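On Linux, selectors.DefaultSelector() is typically backed by an epoll object, which is itself an open file descriptor; unregistering the monitored pipes does not release it, only selector.close() does. The changeset below adds exactly such a close() call to the cleanup() helper in _posixworker(). As a minimal standalone sketch of the pattern (the socketpair here merely stands in for the worker pipes and is not part of the changeset):

    import selectors
    import socket

    # DefaultSelector may hold a kernel polling object (epoll/kqueue),
    # which is an open file descriptor of its own.
    selector = selectors.DefaultSelector()
    r, w = socket.socketpair()
    selector.register(r, selectors.EVENT_READ)
    try:
        w.sendall(b'ping')
        for key, events in selector.select(timeout=1):
            print(key.fileobj.recv(4))  # b'ping'
    finally:
        selector.unregister(r)
        selector.close()  # releases the polling fd; unregister() alone does not
        r.close()
        w.close()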
@@ -1,368 +1,369 @@
 # worker.py - master-slave parallelism support
 #
 # Copyright 2013 Facebook, Inc.
 #
 # This software may be used and distributed according to the terms of the
 # GNU General Public License version 2 or any later version.
 
 from __future__ import absolute_import
 
 import errno
 import os
 import signal
 import sys
 import threading
 import time
 
 try:
     import selectors
     selectors.BaseSelector
 except ImportError:
     from .thirdparty import selectors2 as selectors
 
 from .i18n import _
 from . import (
     encoding,
     error,
     pycompat,
     scmutil,
     util,
 )
 
 def countcpus():
     '''try to count the number of CPUs on the system'''
 
     # posix
     try:
         n = int(os.sysconf(r'SC_NPROCESSORS_ONLN'))
         if n > 0:
             return n
     except (AttributeError, ValueError):
         pass
 
     # windows
     try:
         n = int(encoding.environ['NUMBER_OF_PROCESSORS'])
         if n > 0:
             return n
     except (KeyError, ValueError):
         pass
 
     return 1
 
 def _numworkers(ui):
     s = ui.config('worker', 'numcpus')
     if s:
         try:
             n = int(s)
             if n >= 1:
                 return n
         except ValueError:
             raise error.Abort(_('number of cpus must be an integer'))
     return min(max(countcpus(), 4), 32)
 
 if pycompat.isposix or pycompat.iswindows:
     _STARTUP_COST = 0.01
     # The Windows worker is thread based. If tasks are CPU bound, threads
     # in the presence of the GIL result in excessive context switching and
     # this overhead can slow down execution.
     _DISALLOW_THREAD_UNSAFE = pycompat.iswindows
 else:
     _STARTUP_COST = 1e30
     _DISALLOW_THREAD_UNSAFE = False
 
 def worthwhile(ui, costperop, nops, threadsafe=True):
     '''try to determine whether the benefit of multiple processes can
     outweigh the cost of starting them'''
 
     if not threadsafe and _DISALLOW_THREAD_UNSAFE:
         return False
 
     linear = costperop * nops
     workers = _numworkers(ui)
     benefit = linear - (_STARTUP_COST * workers + linear / workers)
     return benefit >= 0.15
 
 def worker(ui, costperarg, func, staticargs, args, threadsafe=True):
     '''run a function, possibly in parallel in multiple worker
     processes.
 
     returns a progress iterator
 
     costperarg - cost of a single task
 
     func - function to run
 
     staticargs - arguments to pass to every invocation of the function
 
     args - arguments to split into chunks, to pass to individual
     workers
 
     threadsafe - whether work items are thread safe and can be executed using
     a thread-based worker. Should be disabled for CPU heavy tasks that don't
     release the GIL.
     '''
     enabled = ui.configbool('worker', 'enabled')
     if enabled and worthwhile(ui, costperarg, len(args), threadsafe=threadsafe):
         return _platformworker(ui, func, staticargs, args)
     return func(*staticargs + (args,))
 
 def _posixworker(ui, func, staticargs, args):
     workers = _numworkers(ui)
     oldhandler = signal.getsignal(signal.SIGINT)
     signal.signal(signal.SIGINT, signal.SIG_IGN)
     pids, problem = set(), [0]
     def killworkers():
         # unregister SIGCHLD handler as all children will be killed. This
         # function shouldn't be interrupted by another SIGCHLD; otherwise pids
         # could be updated while iterating, which would cause inconsistency.
         signal.signal(signal.SIGCHLD, oldchldhandler)
         # if one worker bails, there's no good reason to wait for the rest
         for p in pids:
             try:
                 os.kill(p, signal.SIGTERM)
             except OSError as err:
                 if err.errno != errno.ESRCH:
                     raise
     def waitforworkers(blocking=True):
         for pid in pids.copy():
             p = st = 0
             while True:
                 try:
                     p, st = os.waitpid(pid, (0 if blocking else os.WNOHANG))
                     break
                 except OSError as e:
                     if e.errno == errno.EINTR:
                         continue
                     elif e.errno == errno.ECHILD:
                         # the child may already have been reaped, but pids not
                         # yet updated (maybe interrupted just after waitpid)
                         pids.discard(pid)
                         break
                     else:
                         raise
             if not p:
                 # skip subsequent steps, because the child process should
                 # still be running in this case
                 continue
             pids.discard(p)
             st = _exitstatus(st)
             if st and not problem[0]:
                 problem[0] = st
     def sigchldhandler(signum, frame):
         waitforworkers(blocking=False)
         if problem[0]:
             killworkers()
     oldchldhandler = signal.signal(signal.SIGCHLD, sigchldhandler)
     ui.flush()
     parentpid = os.getpid()
     pipes = []
     for pargs in partition(args, workers):
         # Every worker gets its own pipe to send results on, so we don't have to
         # implement atomic writes larger than PIPE_BUF. Each forked process has
         # its own pipe's descriptors in the local variables, and the parent
         # process has the full list of pipe descriptors (and it doesn't really
         # care what order they're in).
         rfd, wfd = os.pipe()
         pipes.append((rfd, wfd))
         # make sure we use os._exit in all worker code paths. otherwise the
         # worker may do some clean-ups which could cause surprises like
         # deadlock. see sshpeer.cleanup for example.
         # override error handling *before* fork. this is necessary because
         # an exception (signal) may arrive after fork, before the "pid ="
         # assignment completes, and another exception handler (dispatch.py)
         # can lead to an unexpected code path without os._exit.
         ret = -1
         try:
             pid = os.fork()
             if pid == 0:
                 signal.signal(signal.SIGINT, oldhandler)
                 signal.signal(signal.SIGCHLD, oldchldhandler)
 
                 def workerfunc():
                     for r, w in pipes[:-1]:
                         os.close(r)
                         os.close(w)
                     os.close(rfd)
                     for result in func(*(staticargs + (pargs,))):
                         os.write(wfd, util.pickle.dumps(result))
                     return 0
 
                 ret = scmutil.callcatch(ui, workerfunc)
         except: # parent re-raises, child never returns
             if os.getpid() == parentpid:
                 raise
             exctype = sys.exc_info()[0]
             force = not issubclass(exctype, KeyboardInterrupt)
             ui.traceback(force=force)
         finally:
             if os.getpid() != parentpid:
                 try:
                     ui.flush()
                 except: # never returns, no re-raises
                     pass
                 finally:
                     os._exit(ret & 255)
         pids.add(pid)
     selector = selectors.DefaultSelector()
     for rfd, wfd in pipes:
         os.close(wfd)
         selector.register(os.fdopen(rfd, r'rb', 0), selectors.EVENT_READ)
     def cleanup():
         signal.signal(signal.SIGINT, oldhandler)
         waitforworkers()
         signal.signal(signal.SIGCHLD, oldchldhandler)
+        selector.close()
         status = problem[0]
         if status:
             if status < 0:
                 os.kill(os.getpid(), -status)
             sys.exit(status)
     try:
         openpipes = len(pipes)
         while openpipes > 0:
             for key, events in selector.select():
                 try:
                     yield util.pickle.load(key.fileobj)
                 except EOFError:
                     selector.unregister(key.fileobj)
                     key.fileobj.close()
                     openpipes -= 1
                 except IOError as e:
                     if e.errno == errno.EINTR:
                         continue
                     raise
     except: # re-raises
         killworkers()
         cleanup()
         raise
     cleanup()
 
 def _posixexitstatus(code):
     '''convert a posix exit status into the same form returned by
     os.spawnv
 
     returns None if the process was stopped instead of exiting'''
     if os.WIFEXITED(code):
         return os.WEXITSTATUS(code)
     elif os.WIFSIGNALED(code):
         return -os.WTERMSIG(code)
 
 def _windowsworker(ui, func, staticargs, args):
     class Worker(threading.Thread):
         def __init__(self, taskqueue, resultqueue, func, staticargs,
                      group=None, target=None, name=None, verbose=None):
             threading.Thread.__init__(self, group=group, target=target,
                                       name=name, verbose=verbose)
             self._taskqueue = taskqueue
             self._resultqueue = resultqueue
             self._func = func
             self._staticargs = staticargs
             self._interrupted = False
             self.daemon = True
             self.exception = None
 
         def interrupt(self):
             self._interrupted = True
 
         def run(self):
             try:
                 while not self._taskqueue.empty():
                     try:
                         args = self._taskqueue.get_nowait()
                         for res in self._func(*self._staticargs + (args,)):
                             self._resultqueue.put(res)
                             # threading doesn't provide a native way to
                             # interrupt execution. handle it manually at every
                             # iteration.
                             if self._interrupted:
                                 return
                     except pycompat.queue.Empty:
                         break
             except Exception as e:
                 # store the exception such that the main thread can resurface
                 # it as if the func was running without workers.
                 self.exception = e
                 raise
 
     threads = []
     def trykillworkers():
         # Allow up to 1 second to clean worker threads nicely
         cleanupend = time.time() + 1
         for t in threads:
             t.interrupt()
         for t in threads:
             remainingtime = cleanupend - time.time()
             t.join(remainingtime)
             if t.is_alive():
                 # pass over the failure to join the workers. it is more
                 # important to surface the initial exception than the
                 # fact that one of the workers may be processing a large
                 # task and does not get to handle the interruption.
                 ui.warn(_("failed to kill worker threads while "
                           "handling an exception\n"))
                 return
 
     workers = _numworkers(ui)
     resultqueue = pycompat.queue.Queue()
     taskqueue = pycompat.queue.Queue()
     # partition work into more pieces than workers to minimize the chance
     # of uneven distribution of large tasks between the workers
     for pargs in partition(args, workers * 20):
         taskqueue.put(pargs)
     for _i in range(workers):
         t = Worker(taskqueue, resultqueue, func, staticargs)
         threads.append(t)
         t.start()
     try:
         while len(threads) > 0:
             while not resultqueue.empty():
                 yield resultqueue.get()
             threads[0].join(0.05)
             finishedthreads = [_t for _t in threads if not _t.is_alive()]
             for t in finishedthreads:
                 if t.exception is not None:
                     raise t.exception
                 threads.remove(t)
     except (Exception, KeyboardInterrupt): # re-raises
         trykillworkers()
         raise
     while not resultqueue.empty():
         yield resultqueue.get()
 
 if pycompat.iswindows:
     _platformworker = _windowsworker
 else:
     _platformworker = _posixworker
     _exitstatus = _posixexitstatus
 
 def partition(lst, nslices):
     '''partition a list into N slices of roughly equal size
 
     The current strategy takes every Nth element from the input. If
     we ever write workers that need to preserve grouping in input
     we should consider allowing callers to specify a partition strategy.
 
     mpm is not a fan of this partitioning strategy when files are involved.
     In his words:
 
         Single-threaded Mercurial makes a point of creating and visiting
         files in a fixed order (alphabetical). When creating files in order,
         a typical filesystem is likely to allocate them on nearby regions on
         disk. Thus, when revisiting in the same order, locality is maximized
         and various forms of OS and disk-level caching and read-ahead get a
         chance to work.
 
         This effect can be quite significant on spinning disks. I discovered it
         circa Mercurial v0.4 when revlogs were named by hashes of filenames.
         Tarring a repo and copying it to another disk effectively randomized
         the revlog ordering on disk by sorting the revlogs by hash and suddenly
         performance of my kernel checkout benchmark dropped by ~10x because the
         "working set" of sectors visited no longer fit in the drive's cache and
         the workload switched from streaming to random I/O.
 
         What we should really be doing is have workers read filenames from an
         ordered queue. This preserves locality and also keeps any worker from
         getting more than one file out of balance.
     '''
     for i in range(nslices):
         yield lst[i::nslices]
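The every-Nth-element strategy that the partition() docstring describes is easy to verify in isolation; this self-contained snippet mirrors the slicing logic above:

    def partition(lst, nslices):
        # same stride-based slicing as worker.partition()
        for i in range(nslices):
            yield lst[i::nslices]

    print(list(partition(list(range(10)), 3)))
    # [[0, 3, 6, 9], [1, 4, 7], [2, 5, 8]]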
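Likewise, the worthwhile() heuristic near the top of the file is plain arithmetic; a worked instance using the POSIX/Windows _STARTUP_COST of 0.01 shows where its 0.15 threshold kicks in (the task counts below are made-up inputs, not values from the changeset):

    _STARTUP_COST = 0.01  # posix/windows value from the module above

    def worthwhile(costperop, nops, workers):
        # same arithmetic as worker.worthwhile(), minus the ui plumbing
        linear = costperop * nops
        benefit = linear - (_STARTUP_COST * workers + linear / workers)
        return benefit >= 0.15

    # 1000 tasks at 1ms each, 4 workers: 1.0 - (0.04 + 0.25) = 0.71 -> parallelize
    print(worthwhile(0.001, 1000, 4))  # True
    # 100 tasks at 1ms each: 0.1 - (0.04 + 0.025) = 0.035 -> not worth forking
    print(worthwhile(0.001, 100, 4))   # False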