worker: use one pipe per posix worker and select() in parent process...
Danny Hooper
r38752:9e6afe7f default
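The diff below replaces the single shared result pipe with one pipe per forked worker; the parent then multiplexes the read ends with the selectors module (falling back to the bundled selectors2 when the stdlib module is unavailable), so a worker's pickled results no longer have to be written atomically within PIPE_BUF. As a rough standalone sketch of that pattern only (not Mercurial's implementation; the run_workers name is made up, and signal handling, child reaping and error propagation are omitted):

    # Hypothetical sketch: one pipe per forked worker, results multiplexed
    # in the parent with selectors. Not the code from this commit.
    import os
    import pickle
    import selectors

    def run_workers(func, chunks):
        pipes = []
        for chunk in chunks:
            rfd, wfd = os.pipe()
            pipes.append((rfd, wfd))
            if os.fork() == 0:
                # child: close the other workers' descriptors and its own
                # read end, then stream pickled results over its own pipe
                for r, w in pipes[:-1]:
                    os.close(r)
                    os.close(w)
                os.close(rfd)
                with os.fdopen(wfd, 'wb', 0) as out:
                    for result in func(chunk):
                        pickle.dump(result, out)
                os._exit(0)
        # parent: close every write end and watch every read end
        selector = selectors.DefaultSelector()
        for rfd, wfd in pipes:
            os.close(wfd)
            selector.register(os.fdopen(rfd, 'rb', 0), selectors.EVENT_READ)
        openpipes = len(pipes)
        while openpipes:
            for key, _events in selector.select():
                try:
                    yield pickle.load(key.fileobj)
                except EOFError:
                    # this worker finished and closed its pipe
                    selector.unregister(key.fileobj)
                    key.fileobj.close()
                    openpipes -= 1

As in the real _posixworker generator, results are yielded in whatever order the pipes become readable.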
@@ -1,333 +1,355 @@
1 1 # worker.py - master-slave parallelism support
2 2 #
3 3 # Copyright 2013 Facebook, Inc.
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 from __future__ import absolute_import
9 9
10 10 import errno
11 11 import os
12 12 import signal
13 13 import sys
14 14 import threading
15 15 import time
16 16
17 try:
18 import selectors
19 selectors.BaseSelector
20 except ImportError:
21 from .thirdparty import selectors2 as selectors
22
17 23 from .i18n import _
18 24 from . import (
19 25 encoding,
20 26 error,
21 27 pycompat,
22 28 scmutil,
23 29 util,
24 30 )
25 31
26 32 def countcpus():
27 33 '''try to count the number of CPUs on the system'''
28 34
29 35 # posix
30 36 try:
31 37 n = int(os.sysconf(r'SC_NPROCESSORS_ONLN'))
32 38 if n > 0:
33 39 return n
34 40 except (AttributeError, ValueError):
35 41 pass
36 42
37 43 # windows
38 44 try:
39 45 n = int(encoding.environ['NUMBER_OF_PROCESSORS'])
40 46 if n > 0:
41 47 return n
42 48 except (KeyError, ValueError):
43 49 pass
44 50
45 51 return 1
46 52
47 53 def _numworkers(ui):
48 54 s = ui.config('worker', 'numcpus')
49 55 if s:
50 56 try:
51 57 n = int(s)
52 58 if n >= 1:
53 59 return n
54 60 except ValueError:
55 61 raise error.Abort(_('number of cpus must be an integer'))
56 62 return min(max(countcpus(), 4), 32)
57 63
58 64 if pycompat.isposix or pycompat.iswindows:
59 65 _startupcost = 0.01
60 66 else:
61 67 _startupcost = 1e30
62 68
63 69 def worthwhile(ui, costperop, nops):
64 70 '''try to determine whether the benefit of multiple processes can
65 71 outweigh the cost of starting them'''
66 72 linear = costperop * nops
67 73 workers = _numworkers(ui)
68 74 benefit = linear - (_startupcost * workers + linear / workers)
69 75 return benefit >= 0.15
70 76
71 77 def worker(ui, costperarg, func, staticargs, args):
72 78 '''run a function, possibly in parallel in multiple worker
73 79 processes.
74 80
75 81 returns a progress iterator
76 82
77 83 costperarg - cost of a single task
78 84
79 85 func - function to run
80 86
81 87 staticargs - arguments to pass to every invocation of the function
82 88
83 89 args - arguments to split into chunks, to pass to individual
84 90 workers
85 91 '''
86 92 enabled = ui.configbool('worker', 'enabled')
87 93 if enabled and worthwhile(ui, costperarg, len(args)):
88 94 return _platformworker(ui, func, staticargs, args)
89 95 return func(*staticargs + (args,))
90 96
91 97 def _posixworker(ui, func, staticargs, args):
92 rfd, wfd = os.pipe()
93 98 workers = _numworkers(ui)
94 99 oldhandler = signal.getsignal(signal.SIGINT)
95 100 signal.signal(signal.SIGINT, signal.SIG_IGN)
96 101 pids, problem = set(), [0]
97 102 def killworkers():
98 103 # unregister SIGCHLD handler as all children will be killed. This
99 104 # function shouldn't be interrupted by another SIGCHLD; otherwise pids
100 105 # could be updated while iterating, which would cause inconsistency.
101 106 signal.signal(signal.SIGCHLD, oldchldhandler)
102 107 # if one worker bails, there's no good reason to wait for the rest
103 108 for p in pids:
104 109 try:
105 110 os.kill(p, signal.SIGTERM)
106 111 except OSError as err:
107 112 if err.errno != errno.ESRCH:
108 113 raise
109 114 def waitforworkers(blocking=True):
110 115 for pid in pids.copy():
111 116 p = st = 0
112 117 while True:
113 118 try:
114 119 p, st = os.waitpid(pid, (0 if blocking else os.WNOHANG))
115 120 break
116 121 except OSError as e:
117 122 if e.errno == errno.EINTR:
118 123 continue
119 124 elif e.errno == errno.ECHILD:
120 125                         # child would already be reaped, but pids has not yet been
121 126 # updated (maybe interrupted just after waitpid)
122 127 pids.discard(pid)
123 128 break
124 129 else:
125 130 raise
126 131 if not p:
127 132                 # skip subsequent steps, because the child process should
128 133                 # still be running in this case
129 134 continue
130 135 pids.discard(p)
131 136 st = _exitstatus(st)
132 137 if st and not problem[0]:
133 138 problem[0] = st
134 139 def sigchldhandler(signum, frame):
135 140 waitforworkers(blocking=False)
136 141 if problem[0]:
137 142 killworkers()
138 143 oldchldhandler = signal.signal(signal.SIGCHLD, sigchldhandler)
139 144 ui.flush()
140 145 parentpid = os.getpid()
146 pipes = []
141 147 for pargs in partition(args, workers):
148 # Every worker gets its own pipe to send results on, so we don't have to
149 # implement atomic writes larger than PIPE_BUF. Each forked process has
150 # its own pipe's descriptors in the local variables, and the parent
151 # process has the full list of pipe descriptors (and it doesn't really
152 # care what order they're in).
153 rfd, wfd = os.pipe()
154 pipes.append((rfd, wfd))
142 155 # make sure we use os._exit in all worker code paths. otherwise the
143 156 # worker may do some clean-ups which could cause surprises like
144 157 # deadlock. see sshpeer.cleanup for example.
145 158 # override error handling *before* fork. this is necessary because
146 159 # exception (signal) may arrive after fork, before "pid =" assignment
147 160 # completes, and other exception handler (dispatch.py) can lead to
148 161 # unexpected code path without os._exit.
149 162 ret = -1
150 163 try:
151 164 pid = os.fork()
152 165 if pid == 0:
153 166 signal.signal(signal.SIGINT, oldhandler)
154 167 signal.signal(signal.SIGCHLD, oldchldhandler)
155 168
156 169 def workerfunc():
170 for r, w in pipes[:-1]:
171 os.close(r)
172 os.close(w)
157 173 os.close(rfd)
158 174 for result in func(*(staticargs + (pargs,))):
159 175 os.write(wfd, util.pickle.dumps(result))
160 176 return 0
161 177
162 178 ret = scmutil.callcatch(ui, workerfunc)
163 179 except: # parent re-raises, child never returns
164 180 if os.getpid() == parentpid:
165 181 raise
166 182 exctype = sys.exc_info()[0]
167 183 force = not issubclass(exctype, KeyboardInterrupt)
168 184 ui.traceback(force=force)
169 185 finally:
170 186 if os.getpid() != parentpid:
171 187 try:
172 188 ui.flush()
173 189 except: # never returns, no re-raises
174 190 pass
175 191 finally:
176 192 os._exit(ret & 255)
177 193 pids.add(pid)
178 os.close(wfd)
179 fp = os.fdopen(rfd, r'rb', 0)
194 selector = selectors.DefaultSelector()
195 for rfd, wfd in pipes:
196 os.close(wfd)
197 selector.register(os.fdopen(rfd, r'rb', 0), selectors.EVENT_READ)
180 198 def cleanup():
181 199 signal.signal(signal.SIGINT, oldhandler)
182 200 waitforworkers()
183 201 signal.signal(signal.SIGCHLD, oldchldhandler)
184 202 status = problem[0]
185 203 if status:
186 204 if status < 0:
187 205 os.kill(os.getpid(), -status)
188 206 sys.exit(status)
189 207 try:
190 while True:
191 try:
192 yield util.pickle.load(fp)
193 except EOFError:
194 break
195 except IOError as e:
196 if e.errno == errno.EINTR:
197 continue
198 raise
208 openpipes = len(pipes)
209 while openpipes > 0:
210 for key, events in selector.select():
211 try:
212 yield util.pickle.load(key.fileobj)
213 except EOFError:
214 selector.unregister(key.fileobj)
215 key.fileobj.close()
216 openpipes -= 1
217 except IOError as e:
218 if e.errno == errno.EINTR:
219 continue
220 raise
199 221 except: # re-raises
200 222 killworkers()
201 223 cleanup()
202 224 raise
203 225 cleanup()
204 226
205 227 def _posixexitstatus(code):
206 228 '''convert a posix exit status into the same form returned by
207 229 os.spawnv
208 230
209 231 returns None if the process was stopped instead of exiting'''
210 232 if os.WIFEXITED(code):
211 233 return os.WEXITSTATUS(code)
212 234 elif os.WIFSIGNALED(code):
213 235 return -os.WTERMSIG(code)
214 236
215 237 def _windowsworker(ui, func, staticargs, args):
216 238 class Worker(threading.Thread):
217 239 def __init__(self, taskqueue, resultqueue, func, staticargs,
218 240 group=None, target=None, name=None, verbose=None):
219 241 threading.Thread.__init__(self, group=group, target=target,
220 242 name=name, verbose=verbose)
221 243 self._taskqueue = taskqueue
222 244 self._resultqueue = resultqueue
223 245 self._func = func
224 246 self._staticargs = staticargs
225 247 self._interrupted = False
226 248 self.daemon = True
227 249 self.exception = None
228 250
229 251 def interrupt(self):
230 252 self._interrupted = True
231 253
232 254 def run(self):
233 255 try:
234 256 while not self._taskqueue.empty():
235 257 try:
236 258 args = self._taskqueue.get_nowait()
237 259 for res in self._func(*self._staticargs + (args,)):
238 260 self._resultqueue.put(res)
239 261 # threading doesn't provide a native way to
240 262 # interrupt execution. handle it manually at every
241 263 # iteration.
242 264 if self._interrupted:
243 265 return
244 266 except pycompat.queue.Empty:
245 267 break
246 268 except Exception as e:
247 269 # store the exception such that the main thread can resurface
248 270 # it as if the func was running without workers.
249 271 self.exception = e
250 272 raise
251 273
252 274 threads = []
253 275 def trykillworkers():
254 276 # Allow up to 1 second to clean worker threads nicely
255 277 cleanupend = time.time() + 1
256 278 for t in threads:
257 279 t.interrupt()
258 280 for t in threads:
259 281 remainingtime = cleanupend - time.time()
260 282 t.join(remainingtime)
261 283 if t.is_alive():
262 284 # pass over the workers joining failure. it is more
263 285                 # important to surface the initial exception than the
264 286                 # fact that one of the workers may be processing a large
265 287 # task and does not get to handle the interruption.
266 288 ui.warn(_("failed to kill worker threads while "
267 289 "handling an exception\n"))
268 290 return
269 291
270 292 workers = _numworkers(ui)
271 293 resultqueue = pycompat.queue.Queue()
272 294 taskqueue = pycompat.queue.Queue()
273 295 # partition work to more pieces than workers to minimize the chance
274 296 # of uneven distribution of large tasks between the workers
275 297 for pargs in partition(args, workers * 20):
276 298 taskqueue.put(pargs)
277 299 for _i in range(workers):
278 300 t = Worker(taskqueue, resultqueue, func, staticargs)
279 301 threads.append(t)
280 302 t.start()
281 303 try:
282 304 while len(threads) > 0:
283 305 while not resultqueue.empty():
284 306 yield resultqueue.get()
285 307 threads[0].join(0.05)
286 308 finishedthreads = [_t for _t in threads if not _t.is_alive()]
287 309 for t in finishedthreads:
288 310 if t.exception is not None:
289 311 raise t.exception
290 312 threads.remove(t)
291 313 except (Exception, KeyboardInterrupt): # re-raises
292 314 trykillworkers()
293 315 raise
294 316 while not resultqueue.empty():
295 317 yield resultqueue.get()
296 318
297 319 if pycompat.iswindows:
298 320 _platformworker = _windowsworker
299 321 else:
300 322 _platformworker = _posixworker
301 323 _exitstatus = _posixexitstatus
302 324
303 325 def partition(lst, nslices):
304 326 '''partition a list into N slices of roughly equal size
305 327
306 328 The current strategy takes every Nth element from the input. If
307 329 we ever write workers that need to preserve grouping in input
308 330 we should consider allowing callers to specify a partition strategy.
309 331
310 332 mpm is not a fan of this partitioning strategy when files are involved.
311 333 In his words:
312 334
313 335 Single-threaded Mercurial makes a point of creating and visiting
314 336 files in a fixed order (alphabetical). When creating files in order,
315 337 a typical filesystem is likely to allocate them on nearby regions on
316 338 disk. Thus, when revisiting in the same order, locality is maximized
317 339 and various forms of OS and disk-level caching and read-ahead get a
318 340 chance to work.
319 341
320 342 This effect can be quite significant on spinning disks. I discovered it
321 343 circa Mercurial v0.4 when revlogs were named by hashes of filenames.
322 344 Tarring a repo and copying it to another disk effectively randomized
323 345 the revlog ordering on disk by sorting the revlogs by hash and suddenly
324 346 performance of my kernel checkout benchmark dropped by ~10x because the
325 347 "working set" of sectors visited no longer fit in the drive's cache and
326 348 the workload switched from streaming to random I/O.
327 349
328 350     What we should really be doing is have workers read filenames from an
329 351 ordered queue. This preserves locality and also keeps any worker from
330 352 getting more than one file out of balance.
331 353 '''
332 354 for i in range(nslices):
333 355 yield lst[i::nslices]
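For illustration only (not part of the commit), the striding described in the docstring means partition() deals elements out round-robin:

    # hypothetical usage of the partition() generator defined above
    list(partition(list(range(10)), 3))
    # -> [[0, 3, 6, 9], [1, 4, 7], [2, 5, 8]]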