##// END OF EJS Templates
worker: remove Python 2 support code...
Gregory Szorc -
r49756:cc0e059d default
parent child Browse files
Show More
@@ -1,467 +1,455 b''
1 1 # worker.py - master-slave parallelism support
2 2 #
3 3 # Copyright 2013 Facebook, Inc.
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8
9 9 import errno
10 10 import os
11 11 import pickle
12 12 import signal
13 13 import sys
14 14 import threading
15 15 import time
16 16
17 17 try:
18 18 import selectors
19 19
20 20 selectors.BaseSelector
21 21 except ImportError:
22 22 from .thirdparty import selectors2 as selectors
23 23
24 24 from .i18n import _
25 25 from . import (
26 26 encoding,
27 27 error,
28 28 pycompat,
29 29 scmutil,
30 30 )
31 31
32 32
def countcpus():
    """Best-effort count of the CPUs on this system; 1 when undeterminable."""

    # POSIX: sysconf may be missing entirely (AttributeError) or hand back
    # something int() rejects (ValueError).
    try:
        ncpus = int(os.sysconf('SC_NPROCESSORS_ONLN'))
    except (AttributeError, ValueError):
        ncpus = 0
    if ncpus > 0:
        return ncpus

    # Windows: the processor count is advertised via the environment.
    try:
        ncpus = int(encoding.environ[b'NUMBER_OF_PROCESSORS'])
    except (KeyError, ValueError):
        ncpus = 0
    if ncpus > 0:
        return ncpus

    return 1
53 53
54 54
def _numworkers(ui):
    """Number of workers to use: the config override when valid, otherwise
    the CPU count clamped to the range [4, 32]."""
    configured = ui.config(b'worker', b'numcpus')
    if configured:
        try:
            n = int(configured)
        except ValueError:
            raise error.Abort(_(b'number of cpus must be an integer'))
        if n >= 1:
            return n
    return min(max(countcpus(), 4), 32)
65 65
66 66
def ismainthread():
    """Report whether the caller is running on the interpreter's main thread."""
    return threading.main_thread() == threading.current_thread()
71 class _blockingreader(object):
72 def __init__(self, wrapped):
73 self._wrapped = wrapped
74
75 # Do NOT implement readinto() by making it delegate to
76 # _wrapped.readinto(), since that is unbuffered. The unpickler is fine
77 # with just read() and readline(), so we don't need to implement it.
78
79 def readline(self):
80 return self._wrapped.readline()
104 81
105 def ismainthread():
106 # pytype: disable=module-attr
107 return isinstance(threading.current_thread(), threading._MainThread)
108 # pytype: enable=module-attr
82 # issue multiple reads until size is fulfilled
83 def read(self, size=-1):
84 if size < 0:
85 return self._wrapped.readall()
86
87 buf = bytearray(size)
88 view = memoryview(buf)
89 pos = 0
109 90
110 def _blockingreader(wrapped):
111 return wrapped
91 while pos < size:
92 ret = self._wrapped.readinto(view[pos:])
93 if not ret:
94 break
95 pos += ret
96
97 del view
98 del buf[pos:]
99 return bytes(buf)
112 100
113 101
if pycompat.iswindows or pycompat.isposix:
    # Spawning a worker is cheap on these platforms, so parallelism can
    # pay off quickly.
    _STARTUP_COST = 0.01
    # The Windows worker is thread based. If tasks are CPU bound, threads
    # in the presence of the GIL result in excessive context switching and
    # this overhead can slow down execution.
    _DISALLOW_THREAD_UNSAFE = pycompat.iswindows
else:
    # Unknown platform: make workers look prohibitively expensive so the
    # serial path is always chosen.
    _STARTUP_COST = 1e30
    _DISALLOW_THREAD_UNSAFE = False
123 111
124 112
def worthwhile(ui, costperop, nops, threadsafe=True):
    """try to determine whether the benefit of multiple processes can
    outweigh the cost of starting them

    costperop - estimated cost of a single operation
    nops - number of operations
    threadsafe - pass False for work that must not run on the thread-based
    worker; such work is never considered worthwhile there
    """

    if _DISALLOW_THREAD_UNSAFE and not threadsafe:
        return False

    serialcost = costperop * nops
    workers = _numworkers(ui)
    parallelcost = _STARTUP_COST * workers + serialcost / workers
    return serialcost - parallelcost >= 0.15
136 124
137 125
def worker(
    ui, costperarg, func, staticargs, args, hasretval=False, threadsafe=True
):
    """run a function, possibly in parallel in multiple worker
    processes.

    returns a progress iterator

    costperarg - cost of a single task

    func - function to run. It is expected to return a progress iterator.

    staticargs - arguments to pass to every invocation of the function

    args - arguments to split into chunks, to pass to individual
    workers

    hasretval - when True, func and the current function return an progress
    iterator then a dict (encoded as an iterator that yield many (False, ..)
    then a (True, dict)). The dicts are joined in some arbitrary order, so
    overlapping keys are a bad idea.

    threadsafe - whether work items are thread safe and can be executed using
    a thread-based worker. Should be disabled for CPU heavy tasks that don't
    release the GIL.
    """
    usable = ui.configbool(b'worker', b'enabled')
    if usable and not ismainthread() and _platformworker is _posixworker:
        # The POSIX worker has to install a handler for SIGCHLD.
        # Python up to 3.9 only allows this in the main thread.
        usable = False

    if usable and worthwhile(ui, costperarg, len(args), threadsafe=threadsafe):
        return _platformworker(ui, func, staticargs, args, hasretval)
    return func(*staticargs + (args,))
173 161
174 162
def _posixworker(ui, func, staticargs, args, hasretval):
    """POSIX (fork-based) implementation of worker().

    Forks one child per chunk of ``args``; each child writes pickled
    results to its own pipe and the parent multiplexes the pipes with a
    selector, yielding results as they arrive. If any child exits with a
    non-zero status, the remaining children are killed and a WorkerError
    is raised (or the parent re-raises a signal death).
    """
    workers = _numworkers(ui)
    oldhandler = signal.getsignal(signal.SIGINT)
    signal.signal(signal.SIGINT, signal.SIG_IGN)
    pids, problem = set(), [0]

    def killworkers():
        # unregister SIGCHLD handler as all children will be killed. This
        # function shouldn't be interrupted by another SIGCHLD; otherwise pids
        # could be updated while iterating, which would cause inconsistency.
        signal.signal(signal.SIGCHLD, oldchldhandler)
        # if one worker bails, there's no good reason to wait for the rest
        for p in pids:
            try:
                os.kill(p, signal.SIGTERM)
            except OSError as err:
                # ESRCH: the child is already gone; anything else is real
                if err.errno != errno.ESRCH:
                    raise

    def waitforworkers(blocking=True):
        # reap children, recording the first non-zero exit status seen in
        # problem[0]; non-blocking mode is used from the SIGCHLD handler
        for pid in pids.copy():
            p = st = 0
            while True:
                try:
                    p, st = os.waitpid(pid, (0 if blocking else os.WNOHANG))
                    break
                except OSError as e:
                    if e.errno == errno.EINTR:
                        continue
                    elif e.errno == errno.ECHILD:
                        # child would already be reaped, but pids yet been
                        # updated (maybe interrupted just after waitpid)
                        pids.discard(pid)
                        break
                    else:
                        raise
            if not p:
                # skip subsequent steps, because child process should
                # be still running in this case
                continue
            pids.discard(p)
            st = _exitstatus(st)
            if st and not problem[0]:
                problem[0] = st

    def sigchldhandler(signum, frame):
        # reap asynchronously; if a child failed, take the rest down too
        waitforworkers(blocking=False)
        if problem[0]:
            killworkers()

    oldchldhandler = signal.signal(signal.SIGCHLD, sigchldhandler)
    ui.flush()
    parentpid = os.getpid()
    pipes = []
    retval = {}
    for pargs in partition(args, min(workers, len(args))):
        # Every worker gets its own pipe to send results on, so we don't have to
        # implement atomic writes larger than PIPE_BUF. Each forked process has
        # its own pipe's descriptors in the local variables, and the parent
        # process has the full list of pipe descriptors (and it doesn't really
        # care what order they're in).
        rfd, wfd = os.pipe()
        pipes.append((rfd, wfd))
        # make sure we use os._exit in all worker code paths. otherwise the
        # worker may do some clean-ups which could cause surprises like
        # deadlock. see sshpeer.cleanup for example.
        # override error handling *before* fork. this is necessary because
        # exception (signal) may arrive after fork, before "pid =" assignment
        # completes, and other exception handler (dispatch.py) can lead to
        # unexpected code path without os._exit.
        ret = -1
        try:
            pid = os.fork()
            if pid == 0:
                signal.signal(signal.SIGINT, oldhandler)
                signal.signal(signal.SIGCHLD, oldchldhandler)

                def workerfunc():
                    # close every pipe except our own write end
                    for r, w in pipes[:-1]:
                        os.close(r)
                        os.close(w)
                    os.close(rfd)
                    for result in func(*(staticargs + (pargs,))):
                        os.write(wfd, pickle.dumps(result))
                    return 0

                ret = scmutil.callcatch(ui, workerfunc)
        except:  # parent re-raises, child never returns
            if os.getpid() == parentpid:
                raise
            exctype = sys.exc_info()[0]
            force = not issubclass(exctype, KeyboardInterrupt)
            ui.traceback(force=force)
        finally:
            if os.getpid() != parentpid:
                try:
                    ui.flush()
                except:  # never returns, no re-raises
                    pass
                finally:
                    os._exit(ret & 255)
        pids.add(pid)
    selector = selectors.DefaultSelector()
    for rfd, wfd in pipes:
        os.close(wfd)
        selector.register(os.fdopen(rfd, 'rb', 0), selectors.EVENT_READ)

    def cleanup():
        # restore signal handlers, reap all children, close the selector;
        # returns the first failing exit status (0 when all succeeded)
        signal.signal(signal.SIGINT, oldhandler)
        waitforworkers()
        signal.signal(signal.SIGCHLD, oldchldhandler)
        selector.close()
        return problem[0]

    try:
        openpipes = len(pipes)
        while openpipes > 0:
            for key, events in selector.select():
                try:
                    res = pickle.load(_blockingreader(key.fileobj))
                    if hasretval and res[0]:
                        retval.update(res[1])
                    else:
                        yield res
                except EOFError:
                    selector.unregister(key.fileobj)
                    key.fileobj.close()
                    openpipes -= 1
                except IOError as e:
                    if e.errno == errno.EINTR:
                        continue
                    raise
    except:  # re-raises
        killworkers()
        cleanup()
        raise
    status = cleanup()
    if status:
        if status < 0:
            # a child died from a signal; mirror it in the parent
            os.kill(os.getpid(), -status)
        raise error.WorkerError(status)
    if hasretval:
        yield True, retval
318 306
319 307
320 308 def _posixexitstatus(code):
321 309 """convert a posix exit status into the same form returned by
322 310 os.spawnv
323 311
324 312 returns None if the process was stopped instead of exiting"""
325 313 if os.WIFEXITED(code):
326 314 return os.WEXITSTATUS(code)
327 315 elif os.WIFSIGNALED(code):
328 316 return -(os.WTERMSIG(code))
329 317
330 318
def _windowsworker(ui, func, staticargs, args, hasretval):
    """Thread-based implementation of worker() (used on Windows).

    Chunks of ``args`` are placed on a task queue consumed by daemon
    worker threads; results funnel through a result queue and are yielded
    from here. The first exception raised in any thread is re-raised in
    the caller after attempting to interrupt and join the other threads.
    """

    class Worker(threading.Thread):
        def __init__(
            self, taskqueue, resultqueue, func, staticargs, *args, **kwargs
        ):
            threading.Thread.__init__(self, *args, **kwargs)
            self._taskqueue = taskqueue
            self._resultqueue = resultqueue
            self._func = func
            self._staticargs = staticargs
            self._interrupted = False
            self.daemon = True
            self.exception = None

        def interrupt(self):
            # cooperative interruption flag, checked once per result below
            self._interrupted = True

        def run(self):
            try:
                while not self._taskqueue.empty():
                    try:
                        args = self._taskqueue.get_nowait()
                        for res in self._func(*self._staticargs + (args,)):
                            self._resultqueue.put(res)
                            # threading doesn't provide a native way to
                            # interrupt execution. handle it manually at every
                            # iteration.
                            if self._interrupted:
                                return
                    except pycompat.queue.Empty:
                        break
            except Exception as e:
                # store the exception such that the main thread can resurface
                # it as if the func was running without workers.
                self.exception = e
                raise

    threads = []

    def trykillworkers():
        # Allow up to 1 second to clean worker threads nicely
        cleanupend = time.time() + 1
        for t in threads:
            t.interrupt()
        for t in threads:
            remainingtime = cleanupend - time.time()
            t.join(remainingtime)
            if t.is_alive():
                # pass over the workers joining failure. it is more
                # important to surface the initial exception than the
                # fact that one of workers may be processing a large
                # task and does not get to handle the interruption.
                ui.warn(
                    _(
                        b"failed to kill worker threads while "
                        b"handling an exception\n"
                    )
                )
                return

    workers = _numworkers(ui)
    resultqueue = pycompat.queue.Queue()
    taskqueue = pycompat.queue.Queue()
    retval = {}
    # partition work to more pieces than workers to minimize the chance
    # of uneven distribution of large tasks between the workers
    for pargs in partition(args, workers * 20):
        taskqueue.put(pargs)
    for _i in range(workers):
        t = Worker(taskqueue, resultqueue, func, staticargs)
        threads.append(t)
        t.start()
    try:
        while len(threads) > 0:
            # drain whatever results are ready, then poll thread liveness
            while not resultqueue.empty():
                res = resultqueue.get()
                if hasretval and res[0]:
                    retval.update(res[1])
                else:
                    yield res
            threads[0].join(0.05)
            finishedthreads = [_t for _t in threads if not _t.is_alive()]
            for t in finishedthreads:
                if t.exception is not None:
                    raise t.exception
                threads.remove(t)
    except (Exception, KeyboardInterrupt):  # re-raises
        trykillworkers()
        raise
    # all threads are done; drain any results still in the queue
    while not resultqueue.empty():
        res = resultqueue.get()
        if hasretval and res[0]:
            retval.update(res[1])
        else:
            yield res
    if hasretval:
        yield True, retval
428 416
429 417
# Select the per-platform worker implementation: thread-based on Windows,
# fork-based everywhere else.
if pycompat.iswindows:
    _platformworker = _windowsworker
else:
    _platformworker = _posixworker
    # only the fork-based worker needs to decode os.waitpid() status values
    _exitstatus = _posixexitstatus
435 423
436 424
def partition(lst, nslices):
    """partition a list into N slices of roughly equal size

    The current strategy takes every Nth element from the input, i.e.
    slice i is lst[i::nslices]. If we ever write workers that need to
    preserve grouping in input we should consider allowing callers to
    specify a partition strategy.

    olivia is not a fan of this partitioning strategy when files are
    involved. In his words:

    Single-threaded Mercurial makes a point of creating and visiting
    files in a fixed order (alphabetical). When creating files in order,
    a typical filesystem is likely to allocate them on nearby regions on
    disk. Thus, when revisiting in the same order, locality is maximized
    and various forms of OS and disk-level caching and read-ahead get a
    chance to work.

    This effect can be quite significant on spinning disks. I discovered it
    circa Mercurial v0.4 when revlogs were named by hashes of filenames.
    Tarring a repo and copying it to another disk effectively randomized
    the revlog ordering on disk by sorting the revlogs by hash and suddenly
    performance of my kernel checkout benchmark dropped by ~10x because the
    "working set" of sectors visited no longer fit in the drive's cache and
    the workload switched from streaming to random I/O.

    What we should really be doing is have workers read filenames from a
    ordered queue. This preserves locality and also keeps any worker from
    getting more than one file out of balance.
    """
    for offset in range(nslices):
        yield lst[offset::nslices]
General Comments 0
You need to be logged in to leave comments. Login now