##// END OF EJS Templates
lfs: avoid closing connections when the worker doesn't fork...
Matt Harbison -
r50439:3556f039 stable
parent child Browse files
Show More
@@ -597,7 +597,9 b' class _gitlfsremote:'
597 597 continue
598 598 raise
599 599
600 # Until https multiplexing gets sorted out
600 # Until https multiplexing gets sorted out. It's not clear if
601 # ConnectionManager.set_ready() is externally synchronized for thread
602 # safety with Windows workers.
601 603 if self.ui.configbool(b'experimental', b'lfs.worker-enable'):
602 604 # The POSIX workers are forks of this process, so before spinning
603 605 # them up, close all pooled connections. Otherwise, there's no way
@@ -608,7 +610,7 b' class _gitlfsremote:'
608 610 # ready connections as in use, and roll that back after the fork?
609 611 # That would allow the existing pool of connections in this process
610 612 # to be preserved.
611 if not pycompat.iswindows:
613 def prefork():
612 614 for h in self.urlopener.handlers:
613 615 getattr(h, "close_all", lambda: None)()
614 616
@@ -618,6 +620,7 b' class _gitlfsremote:'
618 620 transfer,
619 621 (),
620 622 sorted(objects, key=lambda o: o.get(b'oid')),
623 prefork=prefork,
621 624 )
622 625 else:
623 626 oids = transfer(sorted(objects, key=lambda o: o.get(b'oid')))
@@ -125,7 +125,14 b' def worthwhile(ui, costperop, nops, thre'
125 125
126 126
127 127 def worker(
128 ui, costperarg, func, staticargs, args, hasretval=False, threadsafe=True
128 ui,
129 costperarg,
130 func,
131 staticargs,
132 args,
133 hasretval=False,
134 threadsafe=True,
135 prefork=None,
129 136 ):
130 137 """run a function, possibly in parallel in multiple worker
131 138 processes.
@@ -149,6 +156,10 b' def worker('
149 156 threadsafe - whether work items are thread safe and can be executed using
150 157 a thread-based worker. Should be disabled for CPU heavy tasks that don't
151 158 release the GIL.
159
160 prefork - a parameterless Callable that is invoked prior to forking the
161 process. fork() is only used on non-Windows platforms, but is also not
162 called on POSIX platforms if the work amount doesn't warrant a worker.
152 163 """
153 164 enabled = ui.configbool(b'worker', b'enabled')
154 165 if enabled and _platformworker is _posixworker and not ismainthread():
@@ -157,11 +168,13 b' def worker('
157 168 enabled = False
158 169
159 170 if enabled and worthwhile(ui, costperarg, len(args), threadsafe=threadsafe):
160 return _platformworker(ui, func, staticargs, args, hasretval)
171 return _platformworker(
172 ui, func, staticargs, args, hasretval, prefork=prefork
173 )
161 174 return func(*staticargs + (args,))
162 175
163 176
164 def _posixworker(ui, func, staticargs, args, hasretval):
177 def _posixworker(ui, func, staticargs, args, hasretval, prefork=None):
165 178 workers = _numworkers(ui)
166 179 oldhandler = signal.getsignal(signal.SIGINT)
167 180 signal.signal(signal.SIGINT, signal.SIG_IGN)
@@ -207,6 +220,10 b' def _posixworker(ui, func, staticargs, a'
207 220 parentpid = os.getpid()
208 221 pipes = []
209 222 retval = {}
223
224 if prefork:
225 prefork()
226
210 227 for pargs in partition(args, min(workers, len(args))):
211 228 # Every worker gets its own pipe to send results on, so we don't have to
212 229 # implement atomic writes larger than PIPE_BUF. Each forked process has
@@ -316,7 +333,7 b' def _posixexitstatus(code):'
316 333 return -(os.WTERMSIG(code))
317 334
318 335
319 def _windowsworker(ui, func, staticargs, args, hasretval):
336 def _windowsworker(ui, func, staticargs, args, hasretval, prefork=None):
320 337 class Worker(threading.Thread):
321 338 def __init__(
322 339 self, taskqueue, resultqueue, func, staticargs, *args, **kwargs
General Comments 0
You need to be logged in to leave comments. Login now