worker.py
162 lines
| 4.5 KiB
| text/x-python
|
PythonLexer
/ mercurial / worker.py
Bryan O'Sullivan
|
# worker.py - master-slave parallelism support
#
# Copyright 2013 Facebook, Inc.
#
# This software may be used and distributed according to the terms of the
# GNU General Public License version 2 or any later version.
Gregory Szorc
|
r25992 | from __future__ import absolute_import | ||
import errno | ||||
import os | ||||
import signal | ||||
import sys | ||||
import threading | ||||
from .i18n import _ | ||||
Pierre-Yves David
|
r26587 | from . import error | ||
Bryan O'Sullivan
|
r18635 | |||
def countcpus():
    '''try to count the number of CPUs on the system

    Two sources are consulted in order: the POSIX ``sysconf`` value
    ``SC_NPROCESSORS_ONLN`` and the ``NUMBER_OF_PROCESSORS`` environment
    variable.  The first one that yields a positive integer wins.

    returns a positive integer; 1 if neither source is usable
    '''

    # posix
    try:
        n = int(os.sysconf('SC_NPROCESSORS_ONLN'))
        if n > 0:
            return n
    except (AttributeError, ValueError, OSError):
        # AttributeError: os.sysconf does not exist on this platform
        # ValueError: the configuration name is not known
        # OSError: the name is known but not supported by the host
        pass

    # windows
    try:
        n = int(os.environ['NUMBER_OF_PROCESSORS'])
        if n > 0:
            return n
    except (KeyError, ValueError):
        # variable missing or not an integer
        pass

    return 1
Bryan O'Sullivan
|
r18636 | |||
def _numworkers(ui):
    '''return the number of worker processes to use

    ui - object whose config('worker', 'numcpus') value is consulted

    If the setting is present and parses as an integer >= 1, it is
    returned unchanged.  A setting that parses as an integer below 1 is
    ignored and the automatic value is used instead.

    The automatic value is countcpus() clamped to the range 4..32.

    raises error.Abort if the setting is not an integer
    '''
    s = ui.config('worker', 'numcpus')
    if s:
        try:
            n = int(s)
            if n >= 1:
                return n
        except ValueError:
            raise error.Abort(_('number of cpus must be an integer'))
    return min(max(countcpus(), 4), 32)
# Estimated cost of starting one worker process, in the same units as the
# per-operation cost passed to worthwhile().  Off posix the value is so
# large that worthwhile() can never report a positive benefit.
if os.name == 'posix':
    _startupcost = 0.01
else:
    _startupcost = 1e30
def worthwhile(ui, costperop, nops):
    '''try to determine whether the benefit of multiple processes can
    outweigh the cost of starting them

    ui - passed to _numworkers() to find the worker count

    costperop - estimated cost of a single operation

    nops - number of operations to perform

    returns True if the estimated saving is at least 0.15
    '''
    # cost of doing everything serially in this process
    linear = costperop * nops
    workers = _numworkers(ui)
    # saving = serial cost - (process startup cost + evenly split work)
    benefit = linear - (_startupcost * workers + linear / workers)
    return benefit >= 0.15
Bryan O'Sullivan
|
r18637 | |||
Bryan O'Sullivan
|
def worker(ui, costperarg, func, staticargs, args):
    '''run a function, possibly in parallel in multiple worker
    processes.

    returns a progress iterator

    costperarg - cost of a single task

    func - function to run

    staticargs - arguments to pass to every invocation of the function

    args - arguments to split into chunks, to pass to individual
    workers
    '''
    if worthwhile(ui, costperarg, len(args)):
        return _platformworker(ui, func, staticargs, args)
    # not worth the startup cost: run everything in this process, passing
    # the whole of args as the final positional argument
    return func(*staticargs + (args,))
def _posixworker(ui, func, staticargs, args):
    '''run func over args in forked child processes (generator)

    args is split with partition() into _numworkers(ui) slices and one
    child is forked per slice.  Each child calls
    func(*(staticargs + (slice,))) and writes every (i, item) pair it
    produces to a shared pipe as the line '%d %s\\n'.  The parent reads
    those lines back and yields (int, str) pairs.

    If a child exits with a non-zero status the remaining children are
    sent SIGTERM and, once the pipe is drained, this process exits with
    that status (or re-delivers the signal that killed the child).
    '''
    rfd, wfd = os.pipe()
    workers = _numworkers(ui)
    # ignore SIGINT in the parent while children are being set up; each
    # child puts the previous handler back for itself
    oldhandler = signal.getsignal(signal.SIGINT)
    signal.signal(signal.SIGINT, signal.SIG_IGN)
    # problem is a one-element list so the nested functions can update it
    pids, problem = [], [0]
    for pargs in partition(args, workers):
        pid = os.fork()
        if pid == 0:
            # child process
            signal.signal(signal.SIGINT, oldhandler)
            try:
                os.close(rfd)
                for i, item in func(*(staticargs + (pargs,))):
                    os.write(wfd, '%d %s\n' % (i, item))
                os._exit(0)
            except KeyboardInterrupt:
                os._exit(255)
                # other exceptions are allowed to propagate, we rely
                # on lock.py's pid checks to avoid release callbacks
        pids.append(pid)
    pids.reverse()
    # the parent only reads; closing its copy of the write end lets the
    # read loop below see EOF once every child has closed theirs
    os.close(wfd)
    fp = os.fdopen(rfd, 'rb', 0)
    def killworkers():
        # if one worker bails, there's no good reason to wait for the rest
        for p in pids:
            try:
                os.kill(p, signal.SIGTERM)
            except OSError as err:
                # ESRCH: that process is already gone
                if err.errno != errno.ESRCH:
                    raise
    def waitforworkers():
        # reap one child per forked pid; the first non-zero status is
        # recorded and triggers termination of the remaining workers
        for _pid in pids:
            st = _exitstatus(os.wait()[1])
            if st and not problem[0]:
                problem[0] = st
                killworkers()
    t = threading.Thread(target=waitforworkers)
    t.start()
    def cleanup():
        signal.signal(signal.SIGINT, oldhandler)
        t.join()
        status = problem[0]
        if status:
            if status < 0:
                # negative status: the child died from signal -status, so
                # deliver the same signal to this process
                os.kill(os.getpid(), -status)
            sys.exit(status)
    try:
        for line in fp:
            # each line is '<int> <item>\n'; split on the first space only
            l = line.split(' ', 1)
            yield int(l[0]), l[1][:-1]
    except: # re-raises
        killworkers()
        cleanup()
        raise
    cleanup()
Bryan O'Sullivan
|
def _posixexitstatus(code):
    '''convert a posix exit status into the same form returned by
    os.spawnv

    code - wait status, as found in the second element of os.wait()

    returns the exit code (>= 0) if the process exited normally, the
    negated signal number if it was killed by a signal

    returns None if the process was stopped instead of exiting'''
    if os.WIFEXITED(code):
        return os.WEXITSTATUS(code)
    elif os.WIFSIGNALED(code):
        return -os.WTERMSIG(code)
    # neither exited nor signalled
    return None
Bryan O'Sullivan
|
# Bind the platform-specific implementations.  Nothing is bound on 'nt',
# so _platformworker and _exitstatus do not exist there.
if os.name != 'nt':
    _platformworker = _posixworker
    _exitstatus = _posixexitstatus
Bryan O'Sullivan
|
r18638 | |||
Bryan O'Sullivan
|
def partition(lst, nslices):
    '''partition a list into N slices of roughly equal size

    The current strategy takes every Nth element from the input. If
    we ever write workers that need to preserve grouping in input
    we should consider allowing callers to specify a partition strategy.

    lst - sequence supporting extended slicing

    nslices - number of slices to produce

    yields exactly nslices slices; slice i holds the elements at indexes
    i, i + nslices, i + 2 * nslices, ... and may be empty
    '''
    for i in range(nslices):
        yield lst[i::nslices]