# HG changeset patch
# User Bryan O'Sullivan
# Date 2013-02-09 23:51:32
# Node ID 047110c0e2a8371b54e55a051b5e4f6753eca90d
# Parent ac4dbceeb14a5d2d3d1db4b52839df65d5405641
worker: allow a function to be run in multiple worker processes

If we estimate that it will be worth the cost, we run the function in
multiple processes. Otherwise, we run it in-process.

Children report progress to the parent through a pipe.

Not yet implemented on Windows.

diff --git a/mercurial/worker.py b/mercurial/worker.py
--- a/mercurial/worker.py
+++ b/mercurial/worker.py
@@ -6,7 +6,7 @@
 # GNU General Public License version 2 or any later version.
 
 from i18n import _
-import os, util
+import os, signal, sys, traceback, util
 
 def countcpus():
     '''try to count the number of CPUs on the system'''
@@ -53,6 +53,95 @@ def worthwhile(ui, costperop, nops):
     benefit = linear - (_startupcost * workers + linear / workers)
     return benefit >= 0.15
 
+def worker(ui, costperarg, func, staticargs, args):
+    '''run a function, possibly in parallel in multiple worker
+    processes.
+
+    returns a progress iterator
+
+    costperarg - cost of a single task
+
+    func - function to run; called as func(*(staticargs + (chunk,)))
+    where chunk is a slice of args (all of args when run in-process),
+    and must return an iterable of (int, item) pairs
+
+    staticargs - tuple of arguments to pass to every invocation of
+    the function
+
+    args - arguments to split into chunks, to pass to individual
+    workers
+
+    When run in worker processes, each item is sent back to the
+    parent as a line of text, so it arrives as a string and must
+    not contain a newline. When run in-process, func's return value
+    is returned unchanged.
+    '''
+    if worthwhile(ui, costperarg, len(args)):
+        return _platformworker(ui, func, staticargs, args)
+    return func(*(staticargs + (args,)))
+
+def _posixworker(ui, func, staticargs, args):
+    '''run func over args in forked child processes
+
+    args is partitioned into one chunk per worker. Each child writes
+    one "<i> <item>" line per result to a shared pipe; the parent
+    parses those lines and yields them as (i, item) pairs. After the
+    pipe is drained, all children are reaped, and sys.exit(1) is
+    called if any of them failed.
+    '''
+    rfd, wfd = os.pipe()
+    workers = _numworkers(ui)
+    pids = []
+    for pargs in partition(args, workers):
+        pid = os.fork()
+        if pid == 0:
+            # a child must never unwind back into the caller, which
+            # would go on running the parent's code, so every path
+            # out of this block ends in os._exit()
+            ret = 255
+            try:
+                try:
+                    os.close(rfd)
+                    for i, item in func(*(staticargs + (pargs,))):
+                        os.write(wfd, '%d %s\n' % (i, item))
+                    ret = 0
+                except KeyboardInterrupt:
+                    pass
+                except: # report the failure; exit status stays 255
+                    traceback.print_exc()
+            finally:
+                os._exit(ret)
+        pids.append(pid)
+    os.close(wfd)
+    fp = os.fdopen(rfd, 'rb', 0)
+    oldhandler = signal.getsignal(signal.SIGINT)
+    signal.signal(signal.SIGINT, signal.SIG_IGN)
+    def cleanup():
+        # python 2.4 is too dumb for try/yield/finally
+        signal.signal(signal.SIGINT, oldhandler)
+        # close our end of the pipe first, so that a child blocked
+        # writing to a full pipe fails instead of deadlocking below
+        fp.close()
+        problems = 0
+        for pid in pids:
+            problems |= os.waitpid(pid, 0)[1]
+        if problems:
+            sys.exit(1)
+    try:
+        # "for line in fp" would use Python 2's hidden read-ahead
+        # buffer and delay progress; readline on this unbuffered
+        # file returns each line as soon as it is available
+        for line in iter(fp.readline, ''):
+            l = line.split(' ', 1)
+            yield int(l[0]), l[1][:-1]
+    except: # re-raises
+        cleanup()
+        raise
+    cleanup()
+
+if os.name != 'nt':
+    _platformworker = _posixworker
+
 def partition(lst, nslices):
     '''partition a list into N slices of equal size'''
     n = len(lst)