worker: rename variable to reflect constant...
Gregory Szorc
r38753:69ed2cff default
@@ -1,355 +1,355 b''
1 1 # worker.py - master-slave parallelism support
2 2 #
3 3 # Copyright 2013 Facebook, Inc.
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 from __future__ import absolute_import
9 9
10 10 import errno
11 11 import os
12 12 import signal
13 13 import sys
14 14 import threading
15 15 import time
16 16
17 17 try:
18 18 import selectors
19 19 selectors.BaseSelector
20 20 except ImportError:
21 21 from .thirdparty import selectors2 as selectors
22 22
23 23 from .i18n import _
24 24 from . import (
25 25 encoding,
26 26 error,
27 27 pycompat,
28 28 scmutil,
29 29 util,
30 30 )
31 31
32 32 def countcpus():
33 33 '''try to count the number of CPUs on the system'''
34 34
35 35 # posix
36 36 try:
37 37 n = int(os.sysconf(r'SC_NPROCESSORS_ONLN'))
38 38 if n > 0:
39 39 return n
40 40 except (AttributeError, ValueError):
41 41 pass
42 42
43 43 # windows
44 44 try:
45 45 n = int(encoding.environ['NUMBER_OF_PROCESSORS'])
46 46 if n > 0:
47 47 return n
48 48 except (KeyError, ValueError):
49 49 pass
50 50
51 51 return 1
52 52
53 53 def _numworkers(ui):
54 54 s = ui.config('worker', 'numcpus')
55 55 if s:
56 56 try:
57 57 n = int(s)
58 58 if n >= 1:
59 59 return n
60 60 except ValueError:
61 61 raise error.Abort(_('number of cpus must be an integer'))
62 62 return min(max(countcpus(), 4), 32)
63 63
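
The fallback on the last line of _numworkers clamps the detected CPU count into the range [4, 32]: a machine reporting 2 cores still gets 4 workers, and a 64-core machine is capped at 32. A minimal illustration of that clamp (standalone Python, not part of the changeset):

    def clampworkers(ncpus, lo=4, hi=32):
        # same expression as min(max(countcpus(), 4), 32) above
        return min(max(ncpus, lo), hi)

    assert clampworkers(2) == 4    # small machines still get a few workers
    assert clampworkers(16) == 16  # mid-range counts pass through unchanged
    assert clampworkers(64) == 32  # very wide machines are capped
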
64 64 if pycompat.isposix or pycompat.iswindows:
65 - _startupcost = 0.01
65 + _STARTUP_COST = 0.01
66 66 else:
67 - _startupcost = 1e30
67 + _STARTUP_COST = 1e30
68 68
69 69 def worthwhile(ui, costperop, nops):
70 70 '''try to determine whether the benefit of multiple processes can
71 71 outweigh the cost of starting them'''
72 72 linear = costperop * nops
73 73 workers = _numworkers(ui)
74 - benefit = linear - (_startupcost * workers + linear / workers)
74 + benefit = linear - (_STARTUP_COST * workers + linear / workers)
75 75 return benefit >= 0.15
76 76
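
worthwhile() estimates the serial cost as costperop * nops and compares it with the parallel cost, which is _STARTUP_COST per worker plus the serial cost spread across the workers; parallelism is used only when the saving is at least 0.15. A rough worked example with the POSIX/Windows startup cost of 0.01 and an assumed 4 workers (illustrative numbers, not taken from the changeset):

    costperop, nops, workers = 0.0005, 2000, 4
    linear = costperop * nops                              # 1.0 if run serially
    benefit = linear - (0.01 * workers + linear / workers)
    # 1.0 - (0.04 + 0.25) = 0.71, which is >= 0.15, so the work is parallelized
    assert benefit >= 0.15
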
77 77 def worker(ui, costperarg, func, staticargs, args):
78 78 '''run a function, possibly in parallel in multiple worker
79 79 processes.
80 80
81 81 returns a progress iterator
82 82
83 83 costperarg - cost of a single task
84 84
85 85 func - function to run
86 86
87 87 staticargs - arguments to pass to every invocation of the function
88 88
89 89 args - arguments to split into chunks, to pass to individual
90 90 workers
91 91 '''
92 92 enabled = ui.configbool('worker', 'enabled')
93 93 if enabled and worthwhile(ui, costperarg, len(args)):
94 94 return _platformworker(ui, func, staticargs, args)
95 95 return func(*staticargs + (args,))
96 96
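
As the docstring above describes, worker() hands each worker a chunk of args and yields the results back to the caller, whether they come from a single in-process call or from forked/threaded workers. A hypothetical caller might look like the following; checksize, repo, revs, and ui are stand-ins for illustration, not part of the changeset:

    # func must be a generator: it receives the static args plus one chunk of
    # args and yields one result per element it processes
    def checksize(repo, chunk):
        for rev in chunk:
            yield (rev, len(repo[rev].description()))

    # results arrive in arbitrary order because chunks complete independently
    for rev, size in worker(ui, 0.001, checksize, (repo,), revs):
        ui.write("%d: %d\n" % (rev, size))
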
97 97 def _posixworker(ui, func, staticargs, args):
98 98 workers = _numworkers(ui)
99 99 oldhandler = signal.getsignal(signal.SIGINT)
100 100 signal.signal(signal.SIGINT, signal.SIG_IGN)
101 101 pids, problem = set(), [0]
102 102 def killworkers():
103 103 # unregister SIGCHLD handler as all children will be killed. This
104 104 # function shouldn't be interrupted by another SIGCHLD; otherwise pids
105 105 # could be updated while iterating, which would cause inconsistency.
106 106 signal.signal(signal.SIGCHLD, oldchldhandler)
107 107 # if one worker bails, there's no good reason to wait for the rest
108 108 for p in pids:
109 109 try:
110 110 os.kill(p, signal.SIGTERM)
111 111 except OSError as err:
112 112 if err.errno != errno.ESRCH:
113 113 raise
114 114 def waitforworkers(blocking=True):
115 115 for pid in pids.copy():
116 116 p = st = 0
117 117 while True:
118 118 try:
119 119 p, st = os.waitpid(pid, (0 if blocking else os.WNOHANG))
120 120 break
121 121 except OSError as e:
122 122 if e.errno == errno.EINTR:
123 123 continue
124 124 elif e.errno == errno.ECHILD:
125 125 # child was already reaped, but pids not yet updated
126 126 # (maybe interrupted just after waitpid)
127 127 pids.discard(pid)
128 128 break
129 129 else:
130 130 raise
131 131 if not p:
132 132 # skip subsequent steps, because the child process should
133 133 # still be running in this case
134 134 continue
135 135 pids.discard(p)
136 136 st = _exitstatus(st)
137 137 if st and not problem[0]:
138 138 problem[0] = st
139 139 def sigchldhandler(signum, frame):
140 140 waitforworkers(blocking=False)
141 141 if problem[0]:
142 142 killworkers()
143 143 oldchldhandler = signal.signal(signal.SIGCHLD, sigchldhandler)
144 144 ui.flush()
145 145 parentpid = os.getpid()
146 146 pipes = []
147 147 for pargs in partition(args, workers):
148 148 # Every worker gets its own pipe to send results on, so we don't have to
149 149 # implement atomic writes larger than PIPE_BUF. Each forked process has
150 150 # its own pipe's descriptors in the local variables, and the parent
151 151 # process has the full list of pipe descriptors (and it doesn't really
152 152 # care what order they're in).
153 153 rfd, wfd = os.pipe()
154 154 pipes.append((rfd, wfd))
155 155 # make sure we use os._exit in all worker code paths. otherwise the
156 156 # worker may do some clean-ups which could cause surprises like
157 157 # deadlock. see sshpeer.cleanup for example.
158 158 # override error handling *before* fork. this is necessary because
159 159 # an exception (signal) may arrive after fork, before the "pid =" assignment
160 160 # completes, and another exception handler (dispatch.py) can lead to an
161 161 # unexpected code path without os._exit.
162 162 ret = -1
163 163 try:
164 164 pid = os.fork()
165 165 if pid == 0:
166 166 signal.signal(signal.SIGINT, oldhandler)
167 167 signal.signal(signal.SIGCHLD, oldchldhandler)
168 168
169 169 def workerfunc():
170 170 for r, w in pipes[:-1]:
171 171 os.close(r)
172 172 os.close(w)
173 173 os.close(rfd)
174 174 for result in func(*(staticargs + (pargs,))):
175 175 os.write(wfd, util.pickle.dumps(result))
176 176 return 0
177 177
178 178 ret = scmutil.callcatch(ui, workerfunc)
179 179 except: # parent re-raises, child never returns
180 180 if os.getpid() == parentpid:
181 181 raise
182 182 exctype = sys.exc_info()[0]
183 183 force = not issubclass(exctype, KeyboardInterrupt)
184 184 ui.traceback(force=force)
185 185 finally:
186 186 if os.getpid() != parentpid:
187 187 try:
188 188 ui.flush()
189 189 except: # never returns, no re-raises
190 190 pass
191 191 finally:
192 192 os._exit(ret & 255)
193 193 pids.add(pid)
194 194 selector = selectors.DefaultSelector()
195 195 for rfd, wfd in pipes:
196 196 os.close(wfd)
197 197 selector.register(os.fdopen(rfd, r'rb', 0), selectors.EVENT_READ)
198 198 def cleanup():
199 199 signal.signal(signal.SIGINT, oldhandler)
200 200 waitforworkers()
201 201 signal.signal(signal.SIGCHLD, oldchldhandler)
202 202 status = problem[0]
203 203 if status:
204 204 if status < 0:
205 205 os.kill(os.getpid(), -status)
206 206 sys.exit(status)
207 207 try:
208 208 openpipes = len(pipes)
209 209 while openpipes > 0:
210 210 for key, events in selector.select():
211 211 try:
212 212 yield util.pickle.load(key.fileobj)
213 213 except EOFError:
214 214 selector.unregister(key.fileobj)
215 215 key.fileobj.close()
216 216 openpipes -= 1
217 217 except IOError as e:
218 218 if e.errno == errno.EINTR:
219 219 continue
220 220 raise
221 221 except: # re-raises
222 222 killworkers()
223 223 cleanup()
224 224 raise
225 225 cleanup()
226 226
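
The parent/child plumbing in _posixworker boils down to a familiar POSIX pattern: fork a child per chunk, have the child pickle its results onto a dedicated pipe, and have the parent unpickle objects until EOFError signals that the write end has closed. A stripped-down sketch of that pattern outside Mercurial (standalone, single child, no signal handling; illustrative only):

    import os
    import pickle

    rfd, wfd = os.pipe()
    pid = os.fork()
    if pid == 0:
        # child: write pickled results to its pipe, then exit via os._exit
        os.close(rfd)
        with os.fdopen(wfd, 'wb', 0) as w:
            for item in range(3):
                w.write(pickle.dumps(item))
        os._exit(0)
    os.close(wfd)
    # parent: read results until the closed pipe raises EOFError
    with os.fdopen(rfd, 'rb', 0) as r:
        try:
            while True:
                print(pickle.load(r))
        except EOFError:
            pass
    os.waitpid(pid, 0)
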
227 227 def _posixexitstatus(code):
228 228 '''convert a posix exit status into the same form returned by
229 229 os.spawnv
230 230
231 231 returns None if the process was stopped instead of exiting'''
232 232 if os.WIFEXITED(code):
233 233 return os.WEXITSTATUS(code)
234 234 elif os.WIFSIGNALED(code):
235 235 return -os.WTERMSIG(code)
236 236
237 237 def _windowsworker(ui, func, staticargs, args):
238 238 class Worker(threading.Thread):
239 239 def __init__(self, taskqueue, resultqueue, func, staticargs,
240 240 group=None, target=None, name=None, verbose=None):
241 241 threading.Thread.__init__(self, group=group, target=target,
242 242 name=name, verbose=verbose)
243 243 self._taskqueue = taskqueue
244 244 self._resultqueue = resultqueue
245 245 self._func = func
246 246 self._staticargs = staticargs
247 247 self._interrupted = False
248 248 self.daemon = True
249 249 self.exception = None
250 250
251 251 def interrupt(self):
252 252 self._interrupted = True
253 253
254 254 def run(self):
255 255 try:
256 256 while not self._taskqueue.empty():
257 257 try:
258 258 args = self._taskqueue.get_nowait()
259 259 for res in self._func(*self._staticargs + (args,)):
260 260 self._resultqueue.put(res)
261 261 # threading doesn't provide a native way to
262 262 # interrupt execution. handle it manually at every
263 263 # iteration.
264 264 if self._interrupted:
265 265 return
266 266 except pycompat.queue.Empty:
267 267 break
268 268 except Exception as e:
269 269 # store the exception such that the main thread can resurface
270 270 # it as if the func was running without workers.
271 271 self.exception = e
272 272 raise
273 273
274 274 threads = []
275 275 def trykillworkers():
276 276 # Allow up to 1 second to clean worker threads nicely
277 277 cleanupend = time.time() + 1
278 278 for t in threads:
279 279 t.interrupt()
280 280 for t in threads:
281 281 remainingtime = cleanupend - time.time()
282 282 t.join(remainingtime)
283 283 if t.is_alive():
284 284 # ignore the failure to join the worker threads. it is more
285 285 # important to surface the initial exception than the
286 286 # fact that one of the workers may be processing a large
287 287 # task and does not get to handle the interruption.
288 288 ui.warn(_("failed to kill worker threads while "
289 289 "handling an exception\n"))
290 290 return
291 291
292 292 workers = _numworkers(ui)
293 293 resultqueue = pycompat.queue.Queue()
294 294 taskqueue = pycompat.queue.Queue()
295 295 # partition work into more pieces than workers to minimize the chance
296 296 # of an uneven distribution of large tasks among the workers
297 297 for pargs in partition(args, workers * 20):
298 298 taskqueue.put(pargs)
299 299 for _i in range(workers):
300 300 t = Worker(taskqueue, resultqueue, func, staticargs)
301 301 threads.append(t)
302 302 t.start()
303 303 try:
304 304 while len(threads) > 0:
305 305 while not resultqueue.empty():
306 306 yield resultqueue.get()
307 307 threads[0].join(0.05)
308 308 finishedthreads = [_t for _t in threads if not _t.is_alive()]
309 309 for t in finishedthreads:
310 310 if t.exception is not None:
311 311 raise t.exception
312 312 threads.remove(t)
313 313 except (Exception, KeyboardInterrupt): # re-raises
314 314 trykillworkers()
315 315 raise
316 316 while not resultqueue.empty():
317 317 yield resultqueue.get()
318 318
319 319 if pycompat.iswindows:
320 320 _platformworker = _windowsworker
321 321 else:
322 322 _platformworker = _posixworker
323 323 _exitstatus = _posixexitstatus
324 324
325 325 def partition(lst, nslices):
326 326 '''partition a list into N slices of roughly equal size
327 327
328 328 The current strategy takes every Nth element from the input. If
329 329 we ever write workers that need to preserve grouping in input
330 330 we should consider allowing callers to specify a partition strategy.
331 331
332 332 mpm is not a fan of this partitioning strategy when files are involved.
333 333 In his words:
334 334
335 335 Single-threaded Mercurial makes a point of creating and visiting
336 336 files in a fixed order (alphabetical). When creating files in order,
337 337 a typical filesystem is likely to allocate them on nearby regions on
338 338 disk. Thus, when revisiting in the same order, locality is maximized
339 339 and various forms of OS and disk-level caching and read-ahead get a
340 340 chance to work.
341 341
342 342 This effect can be quite significant on spinning disks. I discovered it
343 343 circa Mercurial v0.4 when revlogs were named by hashes of filenames.
344 344 Tarring a repo and copying it to another disk effectively randomized
345 345 the revlog ordering on disk by sorting the revlogs by hash and suddenly
346 346 performance of my kernel checkout benchmark dropped by ~10x because the
347 347 "working set" of sectors visited no longer fit in the drive's cache and
348 348 the workload switched from streaming to random I/O.
349 349
350 350 What we should really be doing is to have workers read filenames from an
351 351 ordered queue. This preserves locality and also keeps any worker from
352 352 getting more than one file out of balance.
353 353 '''
354 354 for i in range(nslices):
355 355 yield lst[i::nslices]
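
partition() deals elements out round-robin, so slices interleave rather than preserve contiguous runs. A quick check of the behaviour the docstring describes (plain Python, illustrative only):

    slices = list(partition(list(range(10)), 3))
    # every 3rd element lands in the same slice
    assert slices == [[0, 3, 6, 9], [1, 4, 7], [2, 5, 8]]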