worker.py
368 lines
| 13.0 KiB
| text/x-python
|
PythonLexer
/ mercurial / worker.py
Bryan O'Sullivan
|
r18635 | # worker.py - master-slave parallelism support | ||
# | ||||
# Copyright 2013 Facebook, Inc. | ||||
# | ||||
# This software may be used and distributed according to the terms of the | ||||
# GNU General Public License version 2 or any later version. | ||||
Gregory Szorc
|
r25992 | from __future__ import absolute_import | ||
import errno | ||||
import os | ||||
import signal | ||||
import sys | ||||
Wojciech Lis
|
r35427 | import threading | ||
Wojciech Lis
|
r35448 | import time | ||
Gregory Szorc
|
r25992 | |||
Danny Hooper
|
r38752 | try: | ||
import selectors | ||||
selectors.BaseSelector | ||||
except ImportError: | ||||
from .thirdparty import selectors2 as selectors | ||||
Gregory Szorc
|
r25992 | from .i18n import _ | ||
Jun Wu
|
r30396 | from . import ( | ||
Pulkit Goyal
|
r30635 | encoding, | ||
Jun Wu
|
r30396 | error, | ||
Pulkit Goyal
|
r30639 | pycompat, | ||
Jun Wu
|
r30521 | scmutil, | ||
Jun Wu
|
r30396 | util, | ||
) | ||||
Bryan O'Sullivan
|
r18635 | |||
def countcpus(): | ||||
'''try to count the number of CPUs on the system''' | ||||
Gregory Szorc
|
r26568 | |||
# posix | ||||
Bryan O'Sullivan
|
r18635 | try: | ||
Pulkit Goyal
|
r32611 | n = int(os.sysconf(r'SC_NPROCESSORS_ONLN')) | ||
Gregory Szorc
|
r26568 | if n > 0: | ||
return n | ||||
except (AttributeError, ValueError): | ||||
pass | ||||
# windows | ||||
try: | ||||
Pulkit Goyal
|
r30635 | n = int(encoding.environ['NUMBER_OF_PROCESSORS']) | ||
Gregory Szorc
|
r26568 | if n > 0: | ||
return n | ||||
except (KeyError, ValueError): | ||||
pass | ||||
return 1 | ||||
Bryan O'Sullivan
|
r18636 | |||
def _numworkers(ui): | ||||
s = ui.config('worker', 'numcpus') | ||||
if s: | ||||
try: | ||||
n = int(s) | ||||
if n >= 1: | ||||
return n | ||||
except ValueError: | ||||
Pierre-Yves David
|
r26587 | raise error.Abort(_('number of cpus must be an integer')) | ||
Bryan O'Sullivan
|
r18636 | return min(max(countcpus(), 4), 32) | ||
Wojciech Lis
|
r35427 | if pycompat.isposix or pycompat.iswindows: | ||
Gregory Szorc
|
r38753 | _STARTUP_COST = 0.01 | ||
Gregory Szorc
|
r38754 | # The Windows worker is thread based. If tasks are CPU bound, threads | ||
# in the presence of the GIL result in excessive context switching and | ||||
# this overhead can slow down execution. | ||||
_DISALLOW_THREAD_UNSAFE = pycompat.iswindows | ||||
Bryan O'Sullivan
|
r18636 | else: | ||
Gregory Szorc
|
r38753 | _STARTUP_COST = 1e30 | ||
Gregory Szorc
|
r38754 | _DISALLOW_THREAD_UNSAFE = False | ||
Bryan O'Sullivan
|
r18636 | |||
Gregory Szorc
|
r38754 | def worthwhile(ui, costperop, nops, threadsafe=True): | ||
Bryan O'Sullivan
|
r18636 | '''try to determine whether the benefit of multiple processes can | ||
outweigh the cost of starting them''' | ||||
Gregory Szorc
|
r38754 | |||
if not threadsafe and _DISALLOW_THREAD_UNSAFE: | ||||
return False | ||||
Bryan O'Sullivan
|
r18636 | linear = costperop * nops | ||
workers = _numworkers(ui) | ||||
Gregory Szorc
|
r38753 | benefit = linear - (_STARTUP_COST * workers + linear / workers) | ||
Bryan O'Sullivan
|
r18636 | return benefit >= 0.15 | ||
Bryan O'Sullivan
|
r18637 | |||
Gregory Szorc
|
r38754 | def worker(ui, costperarg, func, staticargs, args, threadsafe=True): | ||
Bryan O'Sullivan
|
r18638 | '''run a function, possibly in parallel in multiple worker | ||
processes. | ||||
returns a progress iterator | ||||
costperarg - cost of a single task | ||||
func - function to run | ||||
staticargs - arguments to pass to every invocation of the function | ||||
args - arguments to split into chunks, to pass to individual | ||||
workers | ||||
Gregory Szorc
|
r38754 | |||
threadsafe - whether work items are thread safe and can be executed using | ||||
a thread-based worker. Should be disabled for CPU heavy tasks that don't | ||||
release the GIL. | ||||
Bryan O'Sullivan
|
r18638 | ''' | ||
Wojciech Lis
|
r35447 | enabled = ui.configbool('worker', 'enabled') | ||
Gregory Szorc
|
r38754 | if enabled and worthwhile(ui, costperarg, len(args), threadsafe=threadsafe): | ||
Bryan O'Sullivan
|
r18638 | return _platformworker(ui, func, staticargs, args) | ||
return func(*staticargs + (args,)) | ||||
def _posixworker(ui, func, staticargs, args): | ||||
workers = _numworkers(ui) | ||||
Bryan O'Sullivan
|
r18708 | oldhandler = signal.getsignal(signal.SIGINT) | ||
signal.signal(signal.SIGINT, signal.SIG_IGN) | ||||
Jun Wu
|
r30413 | pids, problem = set(), [0] | ||
Jun Wu
|
r30410 | def killworkers(): | ||
Yuya Nishihara
|
r30423 | # unregister SIGCHLD handler as all children will be killed. This | ||
# function shouldn't be interrupted by another SIGCHLD; otherwise pids | ||||
# could be updated while iterating, which would cause inconsistency. | ||||
signal.signal(signal.SIGCHLD, oldchldhandler) | ||||
Jun Wu
|
r30410 | # if one worker bails, there's no good reason to wait for the rest | ||
for p in pids: | ||||
try: | ||||
os.kill(p, signal.SIGTERM) | ||||
except OSError as err: | ||||
if err.errno != errno.ESRCH: | ||||
raise | ||||
Jun Wu
|
r30412 | def waitforworkers(blocking=True): | ||
Jun Wu
|
r30414 | for pid in pids.copy(): | ||
p = st = 0 | ||||
while True: | ||||
try: | ||||
p, st = os.waitpid(pid, (0 if blocking else os.WNOHANG)) | ||||
Yuya Nishihara
|
r30422 | break | ||
Jun Wu
|
r30414 | except OSError as e: | ||
if e.errno == errno.EINTR: | ||||
continue | ||||
elif e.errno == errno.ECHILD: | ||||
Yuya Nishihara
|
r30425 | # child would already be reaped, but pids yet been | ||
# updated (maybe interrupted just after waitpid) | ||||
pids.discard(pid) | ||||
break | ||||
Jun Wu
|
r30414 | else: | ||
raise | ||||
FUJIWARA Katsunori
|
r31063 | if not p: | ||
# skip subsequent steps, because child process should | ||||
# be still running in this case | ||||
continue | ||||
pids.discard(p) | ||||
st = _exitstatus(st) | ||||
Jun Wu
|
r30410 | if st and not problem[0]: | ||
problem[0] = st | ||||
Jun Wu
|
r30415 | def sigchldhandler(signum, frame): | ||
waitforworkers(blocking=False) | ||||
Yuya Nishihara
|
r30424 | if problem[0]: | ||
killworkers() | ||||
Jun Wu
|
r30415 | oldchldhandler = signal.signal(signal.SIGCHLD, sigchldhandler) | ||
David Soria Parra
|
r31696 | ui.flush() | ||
Jun Wu
|
r32112 | parentpid = os.getpid() | ||
Danny Hooper
|
r38752 | pipes = [] | ||
Bryan O'Sullivan
|
r18638 | for pargs in partition(args, workers): | ||
Danny Hooper
|
r38752 | # Every worker gets its own pipe to send results on, so we don't have to | ||
# implement atomic writes larger than PIPE_BUF. Each forked process has | ||||
# its own pipe's descriptors in the local variables, and the parent | ||||
# process has the full list of pipe descriptors (and it doesn't really | ||||
# care what order they're in). | ||||
rfd, wfd = os.pipe() | ||||
pipes.append((rfd, wfd)) | ||||
Jun Wu
|
r32112 | # make sure we use os._exit in all worker code paths. otherwise the | ||
# worker may do some clean-ups which could cause surprises like | ||||
# deadlock. see sshpeer.cleanup for example. | ||||
# override error handling *before* fork. this is necessary because | ||||
# exception (signal) may arrive after fork, before "pid =" assignment | ||||
# completes, and other exception handler (dispatch.py) can lead to | ||||
# unexpected code path without os._exit. | ||||
ret = -1 | ||||
try: | ||||
pid = os.fork() | ||||
if pid == 0: | ||||
signal.signal(signal.SIGINT, oldhandler) | ||||
signal.signal(signal.SIGCHLD, oldchldhandler) | ||||
Jun Wu
|
r30521 | |||
Jun Wu
|
r32112 | def workerfunc(): | ||
Danny Hooper
|
r38752 | for r, w in pipes[:-1]: | ||
os.close(r) | ||||
os.close(w) | ||||
Jun Wu
|
r32112 | os.close(rfd) | ||
Danny Hooper
|
r38553 | for result in func(*(staticargs + (pargs,))): | ||
os.write(wfd, util.pickle.dumps(result)) | ||||
Jun Wu
|
r32112 | return 0 | ||
ret = scmutil.callcatch(ui, workerfunc) | ||||
except: # parent re-raises, child never returns | ||||
if os.getpid() == parentpid: | ||||
raise | ||||
exctype = sys.exc_info()[0] | ||||
force = not issubclass(exctype, KeyboardInterrupt) | ||||
ui.traceback(force=force) | ||||
finally: | ||||
if os.getpid() != parentpid: | ||||
Yuya Nishihara
|
r31118 | try: | ||
ui.flush() | ||||
Jun Wu
|
r32112 | except: # never returns, no re-raises | ||
pass | ||||
Jun Wu
|
r30521 | finally: | ||
Jun Wu
|
r32112 | os._exit(ret & 255) | ||
Jun Wu
|
r30413 | pids.add(pid) | ||
Danny Hooper
|
r38752 | selector = selectors.DefaultSelector() | ||
for rfd, wfd in pipes: | ||||
os.close(wfd) | ||||
selector.register(os.fdopen(rfd, r'rb', 0), selectors.EVENT_READ) | ||||
Bryan O'Sullivan
|
r18638 | def cleanup(): | ||
signal.signal(signal.SIGINT, oldhandler) | ||||
Jun Wu
|
r30416 | waitforworkers() | ||
Jun Wu
|
r30415 | signal.signal(signal.SIGCHLD, oldchldhandler) | ||
Yuya Nishihara
|
r38763 | selector.close() | ||
Bryan O'Sullivan
|
r18709 | status = problem[0] | ||
if status: | ||||
if status < 0: | ||||
os.kill(os.getpid(), -status) | ||||
sys.exit(status) | ||||
Bryan O'Sullivan
|
r18638 | try: | ||
Danny Hooper
|
r38752 | openpipes = len(pipes) | ||
while openpipes > 0: | ||||
for key, events in selector.select(): | ||||
try: | ||||
yield util.pickle.load(key.fileobj) | ||||
except EOFError: | ||||
selector.unregister(key.fileobj) | ||||
key.fileobj.close() | ||||
openpipes -= 1 | ||||
except IOError as e: | ||||
if e.errno == errno.EINTR: | ||||
continue | ||||
raise | ||||
Bryan O'Sullivan
|
r18638 | except: # re-raises | ||
Bryan O'Sullivan
|
r18709 | killworkers() | ||
Bryan O'Sullivan
|
r18638 | cleanup() | ||
raise | ||||
cleanup() | ||||
Bryan O'Sullivan
|
r18707 | def _posixexitstatus(code): | ||
'''convert a posix exit status into the same form returned by | ||||
os.spawnv | ||||
returns None if the process was stopped instead of exiting''' | ||||
if os.WIFEXITED(code): | ||||
return os.WEXITSTATUS(code) | ||||
elif os.WIFSIGNALED(code): | ||||
return -os.WTERMSIG(code) | ||||
Wojciech Lis
|
r35427 | def _windowsworker(ui, func, staticargs, args): | ||
class Worker(threading.Thread): | ||||
Matt Harbison
|
r40475 | def __init__(self, taskqueue, resultqueue, func, staticargs, *args, | ||
**kwargs): | ||||
threading.Thread.__init__(self, *args, **kwargs) | ||||
Wojciech Lis
|
r35427 | self._taskqueue = taskqueue | ||
self._resultqueue = resultqueue | ||||
self._func = func | ||||
self._staticargs = staticargs | ||||
Wojciech Lis
|
r35428 | self._interrupted = False | ||
Wojciech Lis
|
r35448 | self.daemon = True | ||
Wojciech Lis
|
r35428 | self.exception = None | ||
def interrupt(self): | ||||
self._interrupted = True | ||||
Wojciech Lis
|
r35427 | |||
def run(self): | ||||
Wojciech Lis
|
r35428 | try: | ||
while not self._taskqueue.empty(): | ||||
try: | ||||
args = self._taskqueue.get_nowait() | ||||
for res in self._func(*self._staticargs + (args,)): | ||||
self._resultqueue.put(res) | ||||
# threading doesn't provide a native way to | ||||
# interrupt execution. handle it manually at every | ||||
# iteration. | ||||
if self._interrupted: | ||||
return | ||||
Gregory Szorc
|
r37863 | except pycompat.queue.Empty: | ||
Wojciech Lis
|
r35428 | break | ||
except Exception as e: | ||||
# store the exception such that the main thread can resurface | ||||
# it as if the func was running without workers. | ||||
self.exception = e | ||||
raise | ||||
threads = [] | ||||
Wojciech Lis
|
r35448 | def trykillworkers(): | ||
# Allow up to 1 second to clean worker threads nicely | ||||
cleanupend = time.time() + 1 | ||||
Wojciech Lis
|
r35428 | for t in threads: | ||
t.interrupt() | ||||
for t in threads: | ||||
Wojciech Lis
|
r35448 | remainingtime = cleanupend - time.time() | ||
t.join(remainingtime) | ||||
Wojciech Lis
|
r35428 | if t.is_alive(): | ||
Wojciech Lis
|
r35448 | # pass over the workers joining failure. it is more | ||
# important to surface the inital exception than the | ||||
# fact that one of workers may be processing a large | ||||
# task and does not get to handle the interruption. | ||||
ui.warn(_("failed to kill worker threads while " | ||||
"handling an exception\n")) | ||||
return | ||||
Wojciech Lis
|
r35427 | |||
workers = _numworkers(ui) | ||||
Gregory Szorc
|
r37863 | resultqueue = pycompat.queue.Queue() | ||
taskqueue = pycompat.queue.Queue() | ||||
Wojciech Lis
|
r35427 | # partition work to more pieces than workers to minimize the chance | ||
# of uneven distribution of large tasks between the workers | ||||
for pargs in partition(args, workers * 20): | ||||
taskqueue.put(pargs) | ||||
for _i in range(workers): | ||||
t = Worker(taskqueue, resultqueue, func, staticargs) | ||||
threads.append(t) | ||||
t.start() | ||||
Wojciech Lis
|
r35448 | try: | ||
while len(threads) > 0: | ||||
while not resultqueue.empty(): | ||||
yield resultqueue.get() | ||||
threads[0].join(0.05) | ||||
finishedthreads = [_t for _t in threads if not _t.is_alive()] | ||||
for t in finishedthreads: | ||||
if t.exception is not None: | ||||
raise t.exception | ||||
threads.remove(t) | ||||
Wojciech Lis
|
r35469 | except (Exception, KeyboardInterrupt): # re-raises | ||
Wojciech Lis
|
r35448 | trykillworkers() | ||
raise | ||||
Wojciech Lis
|
r35427 | while not resultqueue.empty(): | ||
yield resultqueue.get() | ||||
if pycompat.iswindows: | ||||
_platformworker = _windowsworker | ||||
else: | ||||
Bryan O'Sullivan
|
r18638 | _platformworker = _posixworker | ||
Bryan O'Sullivan
|
r18707 | _exitstatus = _posixexitstatus | ||
Bryan O'Sullivan
|
r18638 | |||
Bryan O'Sullivan
|
r18637 | def partition(lst, nslices): | ||
Gregory Szorc
|
r28181 | '''partition a list into N slices of roughly equal size | ||
The current strategy takes every Nth element from the input. If | ||||
we ever write workers that need to preserve grouping in input | ||||
we should consider allowing callers to specify a partition strategy. | ||||
Gregory Szorc
|
r28292 | |||
mpm is not a fan of this partitioning strategy when files are involved. | ||||
In his words: | ||||
Single-threaded Mercurial makes a point of creating and visiting | ||||
files in a fixed order (alphabetical). When creating files in order, | ||||
a typical filesystem is likely to allocate them on nearby regions on | ||||
disk. Thus, when revisiting in the same order, locality is maximized | ||||
and various forms of OS and disk-level caching and read-ahead get a | ||||
chance to work. | ||||
This effect can be quite significant on spinning disks. I discovered it | ||||
circa Mercurial v0.4 when revlogs were named by hashes of filenames. | ||||
Tarring a repo and copying it to another disk effectively randomized | ||||
the revlog ordering on disk by sorting the revlogs by hash and suddenly | ||||
performance of my kernel checkout benchmark dropped by ~10x because the | ||||
"working set" of sectors visited no longer fit in the drive's cache and | ||||
the workload switched from streaming to random I/O. | ||||
What we should really be doing is have workers read filenames from a | ||||
ordered queue. This preserves locality and also keeps any worker from | ||||
getting more than one file out of balance. | ||||
Gregory Szorc
|
r28181 | ''' | ||
for i in range(nslices): | ||||
yield lst[i::nslices] | ||||