Show More
worker.py
327 lines
| 11.4 KiB
| text/x-python
|
PythonLexer
/ mercurial / worker.py
Bryan O'Sullivan
|
r18635 | # worker.py - master-slave parallelism support | ||
# | ||||
# Copyright 2013 Facebook, Inc. | ||||
# | ||||
# This software may be used and distributed according to the terms of the | ||||
# GNU General Public License version 2 or any later version. | ||||
Gregory Szorc
|
r25992 | from __future__ import absolute_import | ||
import errno | ||||
import os | ||||
import signal | ||||
import sys | ||||
Wojciech Lis
|
r35427 | import threading | ||
Wojciech Lis
|
r35448 | import time | ||
Gregory Szorc
|
r25992 | |||
from .i18n import _ | ||||
Jun Wu
|
r30396 | from . import ( | ||
Pulkit Goyal
|
r30635 | encoding, | ||
Jun Wu
|
r30396 | error, | ||
Pulkit Goyal
|
r30639 | pycompat, | ||
Jun Wu
|
r30521 | scmutil, | ||
Jun Wu
|
r30396 | util, | ||
) | ||||
Bryan O'Sullivan
|
r18635 | |||
def countcpus(): | ||||
'''try to count the number of CPUs on the system''' | ||||
Gregory Szorc
|
r26568 | |||
# posix | ||||
Bryan O'Sullivan
|
r18635 | try: | ||
Pulkit Goyal
|
r32611 | n = int(os.sysconf(r'SC_NPROCESSORS_ONLN')) | ||
Gregory Szorc
|
r26568 | if n > 0: | ||
return n | ||||
except (AttributeError, ValueError): | ||||
pass | ||||
# windows | ||||
try: | ||||
Pulkit Goyal
|
r30635 | n = int(encoding.environ['NUMBER_OF_PROCESSORS']) | ||
Gregory Szorc
|
r26568 | if n > 0: | ||
return n | ||||
except (KeyError, ValueError): | ||||
pass | ||||
return 1 | ||||
Bryan O'Sullivan
|
r18636 | |||
def _numworkers(ui): | ||||
s = ui.config('worker', 'numcpus') | ||||
if s: | ||||
try: | ||||
n = int(s) | ||||
if n >= 1: | ||||
return n | ||||
except ValueError: | ||||
Pierre-Yves David
|
r26587 | raise error.Abort(_('number of cpus must be an integer')) | ||
Bryan O'Sullivan
|
r18636 | return min(max(countcpus(), 4), 32) | ||
Wojciech Lis
|
r35427 | if pycompat.isposix or pycompat.iswindows: | ||
Bryan O'Sullivan
|
r18636 | _startupcost = 0.01 | ||
else: | ||||
_startupcost = 1e30 | ||||
def worthwhile(ui, costperop, nops): | ||||
'''try to determine whether the benefit of multiple processes can | ||||
outweigh the cost of starting them''' | ||||
linear = costperop * nops | ||||
workers = _numworkers(ui) | ||||
benefit = linear - (_startupcost * workers + linear / workers) | ||||
return benefit >= 0.15 | ||||
Bryan O'Sullivan
|
r18637 | |||
Bryan O'Sullivan
|
r18638 | def worker(ui, costperarg, func, staticargs, args): | ||
'''run a function, possibly in parallel in multiple worker | ||||
processes. | ||||
returns a progress iterator | ||||
costperarg - cost of a single task | ||||
func - function to run | ||||
staticargs - arguments to pass to every invocation of the function | ||||
args - arguments to split into chunks, to pass to individual | ||||
workers | ||||
''' | ||||
Wojciech Lis
|
r35447 | enabled = ui.configbool('worker', 'enabled') | ||
if enabled and worthwhile(ui, costperarg, len(args)): | ||||
Bryan O'Sullivan
|
r18638 | return _platformworker(ui, func, staticargs, args) | ||
return func(*staticargs + (args,)) | ||||
def _posixworker(ui, func, staticargs, args): | ||||
rfd, wfd = os.pipe() | ||||
workers = _numworkers(ui) | ||||
Bryan O'Sullivan
|
r18708 | oldhandler = signal.getsignal(signal.SIGINT) | ||
signal.signal(signal.SIGINT, signal.SIG_IGN) | ||||
Jun Wu
|
r30413 | pids, problem = set(), [0] | ||
Jun Wu
|
r30410 | def killworkers(): | ||
Yuya Nishihara
|
r30423 | # unregister SIGCHLD handler as all children will be killed. This | ||
# function shouldn't be interrupted by another SIGCHLD; otherwise pids | ||||
# could be updated while iterating, which would cause inconsistency. | ||||
signal.signal(signal.SIGCHLD, oldchldhandler) | ||||
Jun Wu
|
r30410 | # if one worker bails, there's no good reason to wait for the rest | ||
for p in pids: | ||||
try: | ||||
os.kill(p, signal.SIGTERM) | ||||
except OSError as err: | ||||
if err.errno != errno.ESRCH: | ||||
raise | ||||
Jun Wu
|
r30412 | def waitforworkers(blocking=True): | ||
Jun Wu
|
r30414 | for pid in pids.copy(): | ||
p = st = 0 | ||||
while True: | ||||
try: | ||||
p, st = os.waitpid(pid, (0 if blocking else os.WNOHANG)) | ||||
Yuya Nishihara
|
r30422 | break | ||
Jun Wu
|
r30414 | except OSError as e: | ||
if e.errno == errno.EINTR: | ||||
continue | ||||
elif e.errno == errno.ECHILD: | ||||
Yuya Nishihara
|
r30425 | # child would already be reaped, but pids yet been | ||
# updated (maybe interrupted just after waitpid) | ||||
pids.discard(pid) | ||||
break | ||||
Jun Wu
|
r30414 | else: | ||
raise | ||||
FUJIWARA Katsunori
|
r31063 | if not p: | ||
# skip subsequent steps, because child process should | ||||
# be still running in this case | ||||
continue | ||||
pids.discard(p) | ||||
st = _exitstatus(st) | ||||
Jun Wu
|
r30410 | if st and not problem[0]: | ||
problem[0] = st | ||||
Jun Wu
|
r30415 | def sigchldhandler(signum, frame): | ||
waitforworkers(blocking=False) | ||||
Yuya Nishihara
|
r30424 | if problem[0]: | ||
killworkers() | ||||
Jun Wu
|
r30415 | oldchldhandler = signal.signal(signal.SIGCHLD, sigchldhandler) | ||
David Soria Parra
|
r31696 | ui.flush() | ||
Jun Wu
|
r32112 | parentpid = os.getpid() | ||
Bryan O'Sullivan
|
r18638 | for pargs in partition(args, workers): | ||
Jun Wu
|
r32112 | # make sure we use os._exit in all worker code paths. otherwise the | ||
# worker may do some clean-ups which could cause surprises like | ||||
# deadlock. see sshpeer.cleanup for example. | ||||
# override error handling *before* fork. this is necessary because | ||||
# exception (signal) may arrive after fork, before "pid =" assignment | ||||
# completes, and other exception handler (dispatch.py) can lead to | ||||
# unexpected code path without os._exit. | ||||
ret = -1 | ||||
try: | ||||
pid = os.fork() | ||||
if pid == 0: | ||||
signal.signal(signal.SIGINT, oldhandler) | ||||
signal.signal(signal.SIGCHLD, oldchldhandler) | ||||
Jun Wu
|
r30521 | |||
Jun Wu
|
r32112 | def workerfunc(): | ||
os.close(rfd) | ||||
for i, item in func(*(staticargs + (pargs,))): | ||||
os.write(wfd, '%d %s\n' % (i, item)) | ||||
return 0 | ||||
ret = scmutil.callcatch(ui, workerfunc) | ||||
except: # parent re-raises, child never returns | ||||
if os.getpid() == parentpid: | ||||
raise | ||||
exctype = sys.exc_info()[0] | ||||
force = not issubclass(exctype, KeyboardInterrupt) | ||||
ui.traceback(force=force) | ||||
finally: | ||||
if os.getpid() != parentpid: | ||||
Yuya Nishihara
|
r31118 | try: | ||
ui.flush() | ||||
Jun Wu
|
r32112 | except: # never returns, no re-raises | ||
pass | ||||
Jun Wu
|
r30521 | finally: | ||
Jun Wu
|
r32112 | os._exit(ret & 255) | ||
Jun Wu
|
r30413 | pids.add(pid) | ||
Bryan O'Sullivan
|
r18638 | os.close(wfd) | ||
Pulkit Goyal
|
r30924 | fp = os.fdopen(rfd, pycompat.sysstr('rb'), 0) | ||
Bryan O'Sullivan
|
r18638 | def cleanup(): | ||
signal.signal(signal.SIGINT, oldhandler) | ||||
Jun Wu
|
r30416 | waitforworkers() | ||
Jun Wu
|
r30415 | signal.signal(signal.SIGCHLD, oldchldhandler) | ||
Bryan O'Sullivan
|
r18709 | status = problem[0] | ||
if status: | ||||
if status < 0: | ||||
os.kill(os.getpid(), -status) | ||||
sys.exit(status) | ||||
Bryan O'Sullivan
|
r18638 | try: | ||
Jun Wu
|
r30396 | for line in util.iterfile(fp): | ||
Bryan O'Sullivan
|
r18638 | l = line.split(' ', 1) | ||
yield int(l[0]), l[1][:-1] | ||||
except: # re-raises | ||||
Bryan O'Sullivan
|
r18709 | killworkers() | ||
Bryan O'Sullivan
|
r18638 | cleanup() | ||
raise | ||||
cleanup() | ||||
Bryan O'Sullivan
|
r18707 | def _posixexitstatus(code): | ||
'''convert a posix exit status into the same form returned by | ||||
os.spawnv | ||||
returns None if the process was stopped instead of exiting''' | ||||
if os.WIFEXITED(code): | ||||
return os.WEXITSTATUS(code) | ||||
elif os.WIFSIGNALED(code): | ||||
return -os.WTERMSIG(code) | ||||
Wojciech Lis
|
r35427 | def _windowsworker(ui, func, staticargs, args): | ||
class Worker(threading.Thread): | ||||
def __init__(self, taskqueue, resultqueue, func, staticargs, | ||||
group=None, target=None, name=None, verbose=None): | ||||
threading.Thread.__init__(self, group=group, target=target, | ||||
name=name, verbose=verbose) | ||||
self._taskqueue = taskqueue | ||||
self._resultqueue = resultqueue | ||||
self._func = func | ||||
self._staticargs = staticargs | ||||
Wojciech Lis
|
r35428 | self._interrupted = False | ||
Wojciech Lis
|
r35448 | self.daemon = True | ||
Wojciech Lis
|
r35428 | self.exception = None | ||
def interrupt(self): | ||||
self._interrupted = True | ||||
Wojciech Lis
|
r35427 | |||
def run(self): | ||||
Wojciech Lis
|
r35428 | try: | ||
while not self._taskqueue.empty(): | ||||
try: | ||||
args = self._taskqueue.get_nowait() | ||||
for res in self._func(*self._staticargs + (args,)): | ||||
self._resultqueue.put(res) | ||||
# threading doesn't provide a native way to | ||||
# interrupt execution. handle it manually at every | ||||
# iteration. | ||||
if self._interrupted: | ||||
return | ||||
except util.empty: | ||||
break | ||||
except Exception as e: | ||||
# store the exception such that the main thread can resurface | ||||
# it as if the func was running without workers. | ||||
self.exception = e | ||||
raise | ||||
threads = [] | ||||
Wojciech Lis
|
r35448 | def trykillworkers(): | ||
# Allow up to 1 second to clean worker threads nicely | ||||
cleanupend = time.time() + 1 | ||||
Wojciech Lis
|
r35428 | for t in threads: | ||
t.interrupt() | ||||
for t in threads: | ||||
Wojciech Lis
|
r35448 | remainingtime = cleanupend - time.time() | ||
t.join(remainingtime) | ||||
Wojciech Lis
|
r35428 | if t.is_alive(): | ||
Wojciech Lis
|
r35448 | # pass over the workers joining failure. it is more | ||
# important to surface the inital exception than the | ||||
# fact that one of workers may be processing a large | ||||
# task and does not get to handle the interruption. | ||||
ui.warn(_("failed to kill worker threads while " | ||||
"handling an exception\n")) | ||||
return | ||||
Wojciech Lis
|
r35427 | |||
workers = _numworkers(ui) | ||||
resultqueue = util.queue() | ||||
taskqueue = util.queue() | ||||
# partition work to more pieces than workers to minimize the chance | ||||
# of uneven distribution of large tasks between the workers | ||||
for pargs in partition(args, workers * 20): | ||||
taskqueue.put(pargs) | ||||
for _i in range(workers): | ||||
t = Worker(taskqueue, resultqueue, func, staticargs) | ||||
threads.append(t) | ||||
t.start() | ||||
Wojciech Lis
|
r35448 | try: | ||
while len(threads) > 0: | ||||
while not resultqueue.empty(): | ||||
yield resultqueue.get() | ||||
threads[0].join(0.05) | ||||
finishedthreads = [_t for _t in threads if not _t.is_alive()] | ||||
for t in finishedthreads: | ||||
if t.exception is not None: | ||||
raise t.exception | ||||
threads.remove(t) | ||||
Wojciech Lis
|
r35469 | except (Exception, KeyboardInterrupt): # re-raises | ||
Wojciech Lis
|
r35448 | trykillworkers() | ||
raise | ||||
Wojciech Lis
|
r35427 | while not resultqueue.empty(): | ||
yield resultqueue.get() | ||||
if pycompat.iswindows: | ||||
_platformworker = _windowsworker | ||||
else: | ||||
Bryan O'Sullivan
|
r18638 | _platformworker = _posixworker | ||
Bryan O'Sullivan
|
r18707 | _exitstatus = _posixexitstatus | ||
Bryan O'Sullivan
|
r18638 | |||
Bryan O'Sullivan
|
r18637 | def partition(lst, nslices): | ||
Gregory Szorc
|
r28181 | '''partition a list into N slices of roughly equal size | ||
The current strategy takes every Nth element from the input. If | ||||
we ever write workers that need to preserve grouping in input | ||||
we should consider allowing callers to specify a partition strategy. | ||||
Gregory Szorc
|
r28292 | |||
mpm is not a fan of this partitioning strategy when files are involved. | ||||
In his words: | ||||
Single-threaded Mercurial makes a point of creating and visiting | ||||
files in a fixed order (alphabetical). When creating files in order, | ||||
a typical filesystem is likely to allocate them on nearby regions on | ||||
disk. Thus, when revisiting in the same order, locality is maximized | ||||
and various forms of OS and disk-level caching and read-ahead get a | ||||
chance to work. | ||||
This effect can be quite significant on spinning disks. I discovered it | ||||
circa Mercurial v0.4 when revlogs were named by hashes of filenames. | ||||
Tarring a repo and copying it to another disk effectively randomized | ||||
the revlog ordering on disk by sorting the revlogs by hash and suddenly | ||||
performance of my kernel checkout benchmark dropped by ~10x because the | ||||
"working set" of sectors visited no longer fit in the drive's cache and | ||||
the workload switched from streaming to random I/O. | ||||
What we should really be doing is have workers read filenames from a | ||||
ordered queue. This preserves locality and also keeps any worker from | ||||
getting more than one file out of balance. | ||||
Gregory Szorc
|
r28181 | ''' | ||
for i in range(nslices): | ||||
yield lst[i::nslices] | ||||