worker.py
455 lines
| 15.0 KiB
| text/x-python
|
PythonLexer
/ mercurial / worker.py
Bryan O'Sullivan
|
r18635 | # worker.py - master-slave parallelism support | ||
# | ||||
# Copyright 2013 Facebook, Inc. | ||||
# | ||||
# This software may be used and distributed according to the terms of the | ||||
# GNU General Public License version 2 or any later version. | ||||
Gregory Szorc
|
r25992 | from __future__ import absolute_import | ||
import errno | ||||
import os | ||||
import signal | ||||
import sys | ||||
Wojciech Lis
|
r35427 | import threading | ||
Wojciech Lis
|
r35448 | import time | ||
Gregory Szorc
|
r25992 | |||
Danny Hooper
|
r38752 | try: | ||
import selectors | ||||
Augie Fackler
|
r43346 | |||
Danny Hooper
|
r38752 | selectors.BaseSelector | ||
except ImportError: | ||||
from .thirdparty import selectors2 as selectors | ||||
Gregory Szorc
|
r25992 | from .i18n import _ | ||
Jun Wu
|
r30396 | from . import ( | ||
Pulkit Goyal
|
r30635 | encoding, | ||
Jun Wu
|
r30396 | error, | ||
Pulkit Goyal
|
r30639 | pycompat, | ||
Jun Wu
|
r30521 | scmutil, | ||
Jun Wu
|
r30396 | util, | ||
) | ||||
Bryan O'Sullivan
|
r18635 | |||
Augie Fackler
|
r43346 | |||
Bryan O'Sullivan
|
def countcpus():
    '''try to count the number of CPUs on the system

    Tries the posix ``sysconf`` value first, then the Windows
    ``NUMBER_OF_PROCESSORS`` environment variable, and falls back to 1
    when neither yields a positive integer.
    '''

    # posix
    try:
        n = int(os.sysconf('SC_NPROCESSORS_ONLN'))
        if n > 0:
            return n
    except (AttributeError, ValueError, OSError):
        # AttributeError: os.sysconf does not exist on this platform
        # ValueError: the configuration name is unknown
        # OSError: the name is known but not supported by the host
        pass

    # windows
    try:
        n = int(encoding.environ[b'NUMBER_OF_PROCESSORS'])
        if n > 0:
            return n
    except (KeyError, ValueError):
        pass

    return 1
Bryan O'Sullivan
|
r18636 | |||
Augie Fackler
|
r43346 | |||
Bryan O'Sullivan
|
r18636 | def _numworkers(ui): | ||
Augie Fackler
|
r43347 | s = ui.config(b'worker', b'numcpus') | ||
Bryan O'Sullivan
|
r18636 | if s: | ||
try: | ||||
n = int(s) | ||||
if n >= 1: | ||||
return n | ||||
except ValueError: | ||||
Augie Fackler
|
r43347 | raise error.Abort(_(b'number of cpus must be an integer')) | ||
Bryan O'Sullivan
|
r18636 | return min(max(countcpus(), 4), 32) | ||
Augie Fackler
|
r43346 | |||
Jan Alexander Steffens (heftig)
|
if pycompat.ispy3:

    class _blockingreader(object):
        """wrap a file object so that read(size) keeps reading until
        ``size`` bytes were collected or the wrapped object stops
        returning data"""

        def __init__(self, wrapped):
            self._wrapped = wrapped

        # Do NOT implement readinto() by making it delegate to
        # _wrapped.readinto(), since that is unbuffered. The unpickler is fine
        # with just read() and readline(), so we don't need to implement it.

        def readline(self):
            return self._wrapped.readline()

        # issue multiple reads until size is fulfilled
        def read(self, size=-1):
            if size < 0:
                return self._wrapped.readall()

            buf = bytearray(size)
            pos = 0

            # Release every view explicitly: a bytearray cannot be resized
            # while a buffer export is outstanding, and dropping the last
            # reference only releases the export promptly on refcounting
            # interpreters.
            with memoryview(buf) as view:
                while pos < size:
                    with view[pos:] as subview:
                        ret = self._wrapped.readinto(subview)
                    if not ret:
                        # 0 or None: no more data available right now
                        break
                    pos += ret

            # drop the unused tail so only the bytes actually read remain
            del buf[pos:]
            return bytes(buf)


else:

    def _blockingreader(wrapped):
        # no wrapping is applied when not running on Python 3
        return wrapped
Wojciech Lis
|
# _STARTUP_COST is the per-worker start-up cost that worthwhile() charges
# against the gain from running in parallel.
if pycompat.isposix or pycompat.iswindows:
    _STARTUP_COST = 0.01
    # The Windows worker is thread based. If tasks are CPU bound, threads
    # in the presence of the GIL result in excessive context switching and
    # this overhead can slow down execution.
    _DISALLOW_THREAD_UNSAFE = pycompat.iswindows
else:
    # a start-up cost this large means worthwhile() never reports a benefit
    _STARTUP_COST = 1e30
    _DISALLOW_THREAD_UNSAFE = False
Bryan O'Sullivan
|
r18636 | |||
Augie Fackler
|
r43346 | |||
Gregory Szorc
|
def worthwhile(ui, costperop, nops, threadsafe=True):
    """try to determine whether the benefit of multiple processes can
    outweigh the cost of starting them

    costperop - estimated cost of a single operation
    nops - number of operations to perform
    threadsafe - whether the operations may run in a thread-based worker

    returns True when the estimated saving is at least 0.15
    """

    if not threadsafe and _DISALLOW_THREAD_UNSAFE:
        return False

    # cost of doing everything serially
    linear = costperop * nops
    workers = _numworkers(ui)
    # parallel cost: start every worker, then do an equal share of the work
    benefit = linear - (_STARTUP_COST * workers + linear / workers)
    return benefit >= 0.15
Bryan O'Sullivan
|
r18637 | |||
Augie Fackler
|
r43346 | |||
def worker(
    ui, costperarg, func, staticargs, args, hasretval=False, threadsafe=True
):
    """run a function, possibly in parallel in multiple worker
    processes.

    returns a progress iterator

    costperarg - cost of a single task

    func - function to run. It is expected to return a progress iterator.

    staticargs - arguments to pass to every invocation of the function

    args - arguments to split into chunks, to pass to individual
    workers

    hasretval - when True, func and the current function return a progress
    iterator then a dict (encoded as an iterator that yields many (False, ..)
    then a (True, dict)). The dicts are joined in some arbitrary order, so
    overlapping keys are a bad idea.

    threadsafe - whether work items are thread safe and can be executed using
    a thread-based worker. Should be disabled for CPU heavy tasks that don't
    release the GIL.
    """
    enabled = ui.configbool(b'worker', b'enabled')
    if enabled and worthwhile(ui, costperarg, len(args), threadsafe=threadsafe):
        return _platformworker(ui, func, staticargs, args, hasretval)
    # workers disabled or not worth it: run everything in this process
    return func(*staticargs + (args,))
Augie Fackler
|
r43346 | |||
Valentin Gatien-Baron
|
def _posixworker(ui, func, staticargs, args, hasretval):
    """fork-based implementation of worker()

    ``args`` is partitioned into at most ``_numworkers(ui)`` slices and one
    child process is forked per slice. Each child runs ``func`` on its slice
    and sends every result, pickled, to the parent over its own pipe. The
    parent yields results as they arrive.

    When ``hasretval`` is true, results whose first item is true are merged
    into a dict instead of being yielded, and ``(True, dict)`` is yielded
    once all workers are done.

    Raises error.WorkerError if a child finished with a non-zero status. If
    that status denotes death by signal, the same signal is first sent to
    the current process.
    """
    workers = _numworkers(ui)
    oldhandler = signal.getsignal(signal.SIGINT)
    signal.signal(signal.SIGINT, signal.SIG_IGN)
    # problem is a one-item list so nested functions can update it
    pids, problem = set(), [0]

    def killworkers():
        # unregister SIGCHLD handler as all children will be killed. This
        # function shouldn't be interrupted by another SIGCHLD; otherwise pids
        # could be updated while iterating, which would cause inconsistency.
        signal.signal(signal.SIGCHLD, oldchldhandler)
        # if one worker bails, there's no good reason to wait for the rest
        for p in pids:
            try:
                os.kill(p, signal.SIGTERM)
            except OSError as err:
                if err.errno != errno.ESRCH:
                    raise

    def waitforworkers(blocking=True):
        for pid in pids.copy():
            p = st = 0
            while True:
                try:
                    p, st = os.waitpid(pid, (0 if blocking else os.WNOHANG))
                    break
                except OSError as e:
                    if e.errno == errno.EINTR:
                        continue
                    elif e.errno == errno.ECHILD:
                        # child would already be reaped, but pids yet been
                        # updated (maybe interrupted just after waitpid)
                        pids.discard(pid)
                        break
                    else:
                        raise
            if not p:
                # skip subsequent steps, because child process should
                # be still running in this case
                continue
            pids.discard(p)
            st = _exitstatus(st)
            # remember only the first failure
            if st and not problem[0]:
                problem[0] = st

    def sigchldhandler(signum, frame):
        waitforworkers(blocking=False)
        if problem[0]:
            killworkers()

    oldchldhandler = signal.signal(signal.SIGCHLD, sigchldhandler)
    ui.flush()
    parentpid = os.getpid()
    pipes = []
    retval = {}
    for pargs in partition(args, min(workers, len(args))):
        # Every worker gets its own pipe to send results on, so we don't have to
        # implement atomic writes larger than PIPE_BUF. Each forked process has
        # its own pipe's descriptors in the local variables, and the parent
        # process has the full list of pipe descriptors (and it doesn't really
        # care what order they're in).
        rfd, wfd = os.pipe()
        pipes.append((rfd, wfd))
        # make sure we use os._exit in all worker code paths. otherwise the
        # worker may do some clean-ups which could cause surprises like
        # deadlock. see sshpeer.cleanup for example.
        # override error handling *before* fork. this is necessary because
        # exception (signal) may arrive after fork, before "pid =" assignment
        # completes, and other exception handler (dispatch.py) can lead to
        # unexpected code path without os._exit.
        ret = -1
        try:
            pid = os.fork()
            if pid == 0:
                signal.signal(signal.SIGINT, oldhandler)
                signal.signal(signal.SIGCHLD, oldchldhandler)

                def workerfunc():
                    # close the descriptors inherited for the other workers
                    for r, w in pipes[:-1]:
                        os.close(r)
                        os.close(w)
                    os.close(rfd)
                    # os.write() may write fewer bytes than requested, which
                    # would truncate a large pickle. A buffered file object
                    # keeps writing until everything has been sent.
                    with os.fdopen(wfd, 'wb') as wf:
                        for result in func(*(staticargs + (pargs,))):
                            util.pickle.dump(result, wf)
                            # flush per result so the parent sees progress
                            wf.flush()
                    return 0

                ret = scmutil.callcatch(ui, workerfunc)
        except:  # parent re-raises, child never returns
            if os.getpid() == parentpid:
                raise
            exctype = sys.exc_info()[0]
            force = not issubclass(exctype, KeyboardInterrupt)
            ui.traceback(force=force)
        finally:
            if os.getpid() != parentpid:
                try:
                    ui.flush()
                except:  # never returns, no re-raises
                    pass
                finally:
                    os._exit(ret & 255)
        pids.add(pid)
    selector = selectors.DefaultSelector()
    for rfd, wfd in pipes:
        # the parent only reads; closing its copy of the write end lets the
        # reader see EOF once the child is done
        os.close(wfd)
        selector.register(os.fdopen(rfd, 'rb', 0), selectors.EVENT_READ)

    def cleanup():
        signal.signal(signal.SIGINT, oldhandler)
        waitforworkers()
        signal.signal(signal.SIGCHLD, oldchldhandler)
        selector.close()
        return problem[0]

    try:
        openpipes = len(pipes)
        while openpipes > 0:
            for key, events in selector.select():
                try:
                    res = util.pickle.load(_blockingreader(key.fileobj))
                    if hasretval and res[0]:
                        retval.update(res[1])
                    else:
                        yield res
                except EOFError:
                    # this worker closed its pipe: nothing more to read
                    selector.unregister(key.fileobj)
                    key.fileobj.close()
                    openpipes -= 1
                except IOError as e:
                    if e.errno == errno.EINTR:
                        continue
                    raise
    except:  # re-raises
        killworkers()
        cleanup()
        raise
    status = cleanup()
    if status:
        if status < 0:
            # a worker died from a signal: die the same way
            os.kill(os.getpid(), -status)
        raise error.WorkerError(status)
    if hasretval:
        yield True, retval
Bryan O'Sullivan
|
r18638 | |||
Augie Fackler
|
r43346 | |||
Bryan O'Sullivan
|
r18707 | def _posixexitstatus(code): | ||
Augie Fackler
|
r46554 | """convert a posix exit status into the same form returned by | ||
Bryan O'Sullivan
|
r18707 | os.spawnv | ||
Augie Fackler
|
r46554 | returns None if the process was stopped instead of exiting""" | ||
Bryan O'Sullivan
|
r18707 | if os.WIFEXITED(code): | ||
return os.WEXITSTATUS(code) | ||||
elif os.WIFSIGNALED(code): | ||||
Augie Fackler
|
r43346 | return -(os.WTERMSIG(code)) | ||
Bryan O'Sullivan
|
r18707 | |||
Valentin Gatien-Baron
|
def _windowsworker(ui, func, staticargs, args, hasretval):
    """thread-based implementation of worker()

    ``args`` is partitioned into ``_numworkers(ui) * 20`` slices which are
    put on a task queue. ``_numworkers(ui)`` daemon threads take slices off
    that queue, run ``func`` on them and put each result on a result queue,
    which this generator drains and yields from.

    When ``hasretval`` is true, results whose first item is true are merged
    into a dict instead of being yielded, and ``(True, dict)`` is yielded
    once all threads are done.

    An exception raised by ``func`` in a thread is re-raised here.
    """

    class Worker(threading.Thread):
        def __init__(
            self, taskqueue, resultqueue, func, staticargs, *args, **kwargs
        ):
            threading.Thread.__init__(self, *args, **kwargs)
            self._taskqueue = taskqueue
            self._resultqueue = resultqueue
            self._func = func
            self._staticargs = staticargs
            self._interrupted = False
            self.daemon = True
            self.exception = None

        def interrupt(self):
            self._interrupted = True

        def run(self):
            try:
                while not self._taskqueue.empty():
                    try:
                        args = self._taskqueue.get_nowait()
                        for res in self._func(*self._staticargs + (args,)):
                            self._resultqueue.put(res)
                            # threading doesn't provide a native way to
                            # interrupt execution. handle it manually at every
                            # iteration.
                            if self._interrupted:
                                return
                    except pycompat.queue.Empty:
                        # another thread took the last task between the
                        # empty() check and get_nowait()
                        break
            except Exception as e:
                # store the exception such that the main thread can resurface
                # it as if the func was running without workers.
                self.exception = e
                raise

    threads = []

    def trykillworkers():
        # Allow up to 1 second to clean worker threads nicely
        cleanupend = time.time() + 1
        for t in threads:
            t.interrupt()
        for t in threads:
            remainingtime = cleanupend - time.time()
            t.join(remainingtime)
            if t.is_alive():
                # pass over the workers joining failure. it is more
                # important to surface the initial exception than the
                # fact that one of workers may be processing a large
                # task and does not get to handle the interruption.
                ui.warn(
                    _(
                        b"failed to kill worker threads while "
                        b"handling an exception\n"
                    )
                )
                return

    workers = _numworkers(ui)
    resultqueue = pycompat.queue.Queue()
    taskqueue = pycompat.queue.Queue()
    retval = {}
    # partition work to more pieces than workers to minimize the chance
    # of uneven distribution of large tasks between the workers
    for pargs in partition(args, workers * 20):
        taskqueue.put(pargs)
    for _i in range(workers):
        t = Worker(taskqueue, resultqueue, func, staticargs)
        threads.append(t)
        t.start()
    try:
        while len(threads) > 0:
            while not resultqueue.empty():
                res = resultqueue.get()
                if hasretval and res[0]:
                    retval.update(res[1])
                else:
                    yield res
            # wait briefly, then drop finished threads from the list
            threads[0].join(0.05)
            finishedthreads = [_t for _t in threads if not _t.is_alive()]
            for t in finishedthreads:
                if t.exception is not None:
                    raise t.exception
                threads.remove(t)
    except (Exception, KeyboardInterrupt):  # re-raises
        trykillworkers()
        raise
    # pick up results queued after the last drain inside the loop
    while not resultqueue.empty():
        res = resultqueue.get()
        if hasretval and res[0]:
            retval.update(res[1])
        else:
            yield res
    if hasretval:
        yield True, retval
Wojciech Lis
|
r35427 | |||
Augie Fackler
|
r43346 | |||
Wojciech Lis
|
# pick the implementation that worker() dispatches to
if pycompat.iswindows:
    _platformworker = _windowsworker
else:
    _platformworker = _posixworker
    # used by _posixworker to decode the status returned by os.waitpid()
    _exitstatus = _posixexitstatus
Bryan O'Sullivan
|
r18638 | |||
Augie Fackler
|
r43346 | |||
Bryan O'Sullivan
|
def partition(lst, nslices):
    """partition a list into N slices of roughly equal size

    This is a generator: slice ``i`` is ``lst[i::nslices]``, so slices may
    be empty when ``lst`` has fewer than ``nslices`` items, and nothing is
    yielded when ``nslices`` is 0.

    The current strategy takes every Nth element from the input. If
    we ever write workers that need to preserve grouping in input
    we should consider allowing callers to specify a partition strategy.

    mpm is not a fan of this partitioning strategy when files are involved.
    In his words:

        Single-threaded Mercurial makes a point of creating and visiting
        files in a fixed order (alphabetical). When creating files in order,
        a typical filesystem is likely to allocate them on nearby regions on
        disk. Thus, when revisiting in the same order, locality is maximized
        and various forms of OS and disk-level caching and read-ahead get a
        chance to work.

        This effect can be quite significant on spinning disks. I discovered it
        circa Mercurial v0.4 when revlogs were named by hashes of filenames.
        Tarring a repo and copying it to another disk effectively randomized
        the revlog ordering on disk by sorting the revlogs by hash and suddenly
        performance of my kernel checkout benchmark dropped by ~10x because the
        "working set" of sectors visited no longer fit in the drive's cache and
        the workload switched from streaming to random I/O.

        What we should really be doing is have workers read filenames from a
        ordered queue. This preserves locality and also keeps any worker from
        getting more than one file out of balance.
    """
    for i in range(nslices):
        yield lst[i::nslices]