diff --git a/hgext/lfs/blobstore.py b/hgext/lfs/blobstore.py --- a/hgext/lfs/blobstore.py +++ b/hgext/lfs/blobstore.py @@ -597,7 +597,9 @@ class _gitlfsremote: continue raise - # Until https multiplexing gets sorted out + # Until https multiplexing gets sorted out. It's not clear if + # ConnectionManager.set_ready() is externally synchronized for thread + # safety with Windows workers. if self.ui.configbool(b'experimental', b'lfs.worker-enable'): # The POSIX workers are forks of this process, so before spinning # them up, close all pooled connections. Otherwise, there's no way @@ -608,7 +610,7 @@ class _gitlfsremote: # ready connections as in use, and roll that back after the fork? # That would allow the existing pool of connections in this process # to be preserved. - if not pycompat.iswindows: + def prefork(): for h in self.urlopener.handlers: getattr(h, "close_all", lambda: None)() @@ -618,6 +620,7 @@ class _gitlfsremote: transfer, (), sorted(objects, key=lambda o: o.get(b'oid')), + prefork=prefork, ) else: oids = transfer(sorted(objects, key=lambda o: o.get(b'oid'))) diff --git a/mercurial/worker.py b/mercurial/worker.py --- a/mercurial/worker.py +++ b/mercurial/worker.py @@ -125,7 +125,14 @@ def worthwhile(ui, costperop, nops, thre def worker( - ui, costperarg, func, staticargs, args, hasretval=False, threadsafe=True + ui, + costperarg, + func, + staticargs, + args, + hasretval=False, + threadsafe=True, + prefork=None, ): """run a function, possibly in parallel in multiple worker processes. @@ -149,6 +156,10 @@ def worker( threadsafe - whether work items are thread safe and can be executed using a thread-based worker. Should be disabled for CPU heavy tasks that don't release the GIL. + + prefork - a parameterless Callable that is invoked prior to forking the + process. fork() is only used on non-Windows platforms, but is also not + called on POSIX platforms if the work amount doesn't warrant a worker. """ enabled = ui.configbool(b'worker', b'enabled') if enabled and _platformworker is _posixworker and not ismainthread(): @@ -157,11 +168,13 @@ def worker( enabled = False if enabled and worthwhile(ui, costperarg, len(args), threadsafe=threadsafe): - return _platformworker(ui, func, staticargs, args, hasretval) + return _platformworker( + ui, func, staticargs, args, hasretval, prefork=prefork + ) return func(*staticargs + (args,)) -def _posixworker(ui, func, staticargs, args, hasretval): +def _posixworker(ui, func, staticargs, args, hasretval, prefork=None): workers = _numworkers(ui) oldhandler = signal.getsignal(signal.SIGINT) signal.signal(signal.SIGINT, signal.SIG_IGN) @@ -207,6 +220,10 @@ def _posixworker(ui, func, staticargs, a parentpid = os.getpid() pipes = [] retval = {} + + if prefork: + prefork() + for pargs in partition(args, min(workers, len(args))): # Every worker gets its own pipe to send results on, so we don't have to # implement atomic writes larger than PIPE_BUF. Each forked process has @@ -316,7 +333,7 @@ def _posixexitstatus(code): return -(os.WTERMSIG(code)) -def _windowsworker(ui, func, staticargs, args, hasretval): +def _windowsworker(ui, func, staticargs, args, hasretval, prefork=None): class Worker(threading.Thread): def __init__( self, taskqueue, resultqueue, func, staticargs, *args, **kwargs