# HG changeset patch # User Wojciech Lis # Date 2017-11-20 18:25:29 # Node ID 02b36e860e0b893928d5f565417d55b5dd6495fc # Parent 60f2a215faa79ec0043792a223f09673d2c8baef workers: implemented worker on windows This change implements thread based worker on windows. The handling of exception from within threads will happen in separate diff. The worker is for now used in mercurial/merge.py and in lfs extension After multiple tests and milions of files materiealized, thousands lfs fetched it seems that neither merge.py nor lfs/blobstore.py is thread unsafe. I also looked through the code and besides the backgroundfilecloser (handled in base of this) things look good. The performance boost of this on windows is ~50% for sparse --enable-profile * Speedup of hg up/rebase - not exactly measured Test Plan: Ran 10s of hg sparse --enable-profile and --disable-profile operations on large profiles and verified that workers are running. Used sysinternals suite to see that all threads are spawned and run as they should Run various other operations on the repo including update and rebase Ran tests on CentOS and all tests that pass on @ pass here Differential Revision: https://phab.mercurial-scm.org/D1458 diff --git a/mercurial/worker.py b/mercurial/worker.py --- a/mercurial/worker.py +++ b/mercurial/worker.py @@ -11,6 +11,7 @@ import errno import os import signal import sys +import threading from .i18n import _ from . import ( @@ -53,7 +54,7 @@ def _numworkers(ui): raise error.Abort(_('number of cpus must be an integer')) return min(max(countcpus(), 4), 32) -if pycompat.isposix: +if pycompat.isposix or pycompat.iswindows: _startupcost = 0.01 else: _startupcost = 1e30 @@ -203,7 +204,51 @@ def _posixexitstatus(code): elif os.WIFSIGNALED(code): return -os.WTERMSIG(code) -if not pycompat.iswindows: +def _windowsworker(ui, func, staticargs, args): + class Worker(threading.Thread): + def __init__(self, taskqueue, resultqueue, func, staticargs, + group=None, target=None, name=None, verbose=None): + threading.Thread.__init__(self, group=group, target=target, + name=name, verbose=verbose) + self._taskqueue = taskqueue + self._resultqueue = resultqueue + self._func = func + self._staticargs = staticargs + + def run(self): + while not self._taskqueue.empty(): + try: + args = self._taskqueue.get_nowait() + for res in self._func(*self._staticargs + (args,)): + self._resultqueue.put(res) + except util.empty: + break + + workers = _numworkers(ui) + threads = [] + resultqueue = util.queue() + taskqueue = util.queue() + # partition work to more pieces than workers to minimize the chance + # of uneven distribution of large tasks between the workers + for pargs in partition(args, workers * 20): + taskqueue.put(pargs) + for _i in range(workers): + t = Worker(taskqueue, resultqueue, func, staticargs) + threads.append(t) + t.start() + while any(t.is_alive() for t in threads): + while not resultqueue.empty(): + yield resultqueue.get() + t = threads[0] + t.join(0.05) + if not t.is_alive(): + threads.remove(t) + while not resultqueue.empty(): + yield resultqueue.get() + +if pycompat.iswindows: + _platformworker = _windowsworker +else: _platformworker = _posixworker _exitstatus = _posixexitstatus