##// END OF EJS Templates
worker: change partition strategy to every Nth element...
worker: change partition strategy to every Nth element The only consumer of the worker pool code today is `hg update`. Previously, the algorithm to partition work to each worker process preserved input list ordering. We'd take the first N elements, then the next N elements, etc. Measurements on mozilla-central demonstrate this isn't an optimal partitioning strategy. I added debug code to print when workers were exiting. When performing a working copy update on a previously empty working copy of mozilla-central, I noticed that process lifetimes were all over the map. One worker would complete after 7s. Many would complete after 12s. And another worker would often take >16s. This behavior occurred for many worker process counts and was more pronounced on some than others. What I suspect is happening is some workers end up with lots of small files and others with large files. This is because the update code passes in actions according to sorted filenames. And, directories under tend to accumulate similar files. For example, test directories often consist of many small test files and media directories contain binary (often larger) media files. This patch changes the partitioning algorithm to select every Nth element from the input list. Each worker thus has a similar composition of files to operate on. The result of this change is that worker processes now all tend to exit around the same time. The possibility of a long pole due to being unlucky and receiving all the large files has been mitigated. Overall execution time seems to drop, but not by a statistically significant amount on mozilla-central. However, repositories with directories containing many large files will likely show a drop. There shouldn't be any regressions due to partial manifest decoding because the update code already iterates the manifest to determine what files to operate on, so the manifest should already be decoded.

File last commit:

r28181:f8efc8a3 default
r28181:f8efc8a3 default
Show More
worker.py
162 lines | 4.5 KiB | text/x-python | PythonLexer
# worker.py - master-slave parallelism support
#
# Copyright 2013 Facebook, Inc.
#
# This software may be used and distributed according to the terms of the
# GNU General Public License version 2 or any later version.
from __future__ import absolute_import
import errno
import os
import signal
import sys
import threading
from .i18n import _
from . import error
def countcpus():
'''try to count the number of CPUs on the system'''
# posix
try:
n = int(os.sysconf('SC_NPROCESSORS_ONLN'))
if n > 0:
return n
except (AttributeError, ValueError):
pass
# windows
try:
n = int(os.environ['NUMBER_OF_PROCESSORS'])
if n > 0:
return n
except (KeyError, ValueError):
pass
return 1
def _numworkers(ui):
s = ui.config('worker', 'numcpus')
if s:
try:
n = int(s)
if n >= 1:
return n
except ValueError:
raise error.Abort(_('number of cpus must be an integer'))
return min(max(countcpus(), 4), 32)
if os.name == 'posix':
_startupcost = 0.01
else:
_startupcost = 1e30
def worthwhile(ui, costperop, nops):
'''try to determine whether the benefit of multiple processes can
outweigh the cost of starting them'''
linear = costperop * nops
workers = _numworkers(ui)
benefit = linear - (_startupcost * workers + linear / workers)
return benefit >= 0.15
def worker(ui, costperarg, func, staticargs, args):
'''run a function, possibly in parallel in multiple worker
processes.
returns a progress iterator
costperarg - cost of a single task
func - function to run
staticargs - arguments to pass to every invocation of the function
args - arguments to split into chunks, to pass to individual
workers
'''
if worthwhile(ui, costperarg, len(args)):
return _platformworker(ui, func, staticargs, args)
return func(*staticargs + (args,))
def _posixworker(ui, func, staticargs, args):
rfd, wfd = os.pipe()
workers = _numworkers(ui)
oldhandler = signal.getsignal(signal.SIGINT)
signal.signal(signal.SIGINT, signal.SIG_IGN)
pids, problem = [], [0]
for pargs in partition(args, workers):
pid = os.fork()
if pid == 0:
signal.signal(signal.SIGINT, oldhandler)
try:
os.close(rfd)
for i, item in func(*(staticargs + (pargs,))):
os.write(wfd, '%d %s\n' % (i, item))
os._exit(0)
except KeyboardInterrupt:
os._exit(255)
# other exceptions are allowed to propagate, we rely
# on lock.py's pid checks to avoid release callbacks
pids.append(pid)
pids.reverse()
os.close(wfd)
fp = os.fdopen(rfd, 'rb', 0)
def killworkers():
# if one worker bails, there's no good reason to wait for the rest
for p in pids:
try:
os.kill(p, signal.SIGTERM)
except OSError as err:
if err.errno != errno.ESRCH:
raise
def waitforworkers():
for _pid in pids:
st = _exitstatus(os.wait()[1])
if st and not problem[0]:
problem[0] = st
killworkers()
t = threading.Thread(target=waitforworkers)
t.start()
def cleanup():
signal.signal(signal.SIGINT, oldhandler)
t.join()
status = problem[0]
if status:
if status < 0:
os.kill(os.getpid(), -status)
sys.exit(status)
try:
for line in fp:
l = line.split(' ', 1)
yield int(l[0]), l[1][:-1]
except: # re-raises
killworkers()
cleanup()
raise
cleanup()
def _posixexitstatus(code):
'''convert a posix exit status into the same form returned by
os.spawnv
returns None if the process was stopped instead of exiting'''
if os.WIFEXITED(code):
return os.WEXITSTATUS(code)
elif os.WIFSIGNALED(code):
return -os.WTERMSIG(code)
if os.name != 'nt':
_platformworker = _posixworker
_exitstatus = _posixexitstatus
def partition(lst, nslices):
'''partition a list into N slices of roughly equal size
The current strategy takes every Nth element from the input. If
we ever write workers that need to preserve grouping in input
we should consider allowing callers to specify a partition strategy.
'''
for i in range(nslices):
yield lst[i::nslices]