##// END OF EJS Templates
lfs: avoid closing connections when the worker doesn't fork...
Matt Harbison -
r50439:3556f039 stable
parent child Browse files
Show More
@@ -597,7 +597,9 b' class _gitlfsremote:'
597 continue
597 continue
598 raise
598 raise
599
599
600 # Until https multiplexing gets sorted out
600 # Until https multiplexing gets sorted out. It's not clear if
601 # ConnectionManager.set_ready() is externally synchronized for thread
602 # safety with Windows workers.
601 if self.ui.configbool(b'experimental', b'lfs.worker-enable'):
603 if self.ui.configbool(b'experimental', b'lfs.worker-enable'):
602 # The POSIX workers are forks of this process, so before spinning
604 # The POSIX workers are forks of this process, so before spinning
603 # them up, close all pooled connections. Otherwise, there's no way
605 # them up, close all pooled connections. Otherwise, there's no way
@@ -608,7 +610,7 b' class _gitlfsremote:'
608 # ready connections as in use, and roll that back after the fork?
610 # ready connections as in use, and roll that back after the fork?
609 # That would allow the existing pool of connections in this process
611 # That would allow the existing pool of connections in this process
610 # to be preserved.
612 # to be preserved.
611 if not pycompat.iswindows:
613 def prefork():
612 for h in self.urlopener.handlers:
614 for h in self.urlopener.handlers:
613 getattr(h, "close_all", lambda: None)()
615 getattr(h, "close_all", lambda: None)()
614
616
@@ -618,6 +620,7 b' class _gitlfsremote:'
618 transfer,
620 transfer,
619 (),
621 (),
620 sorted(objects, key=lambda o: o.get(b'oid')),
622 sorted(objects, key=lambda o: o.get(b'oid')),
623 prefork=prefork,
621 )
624 )
622 else:
625 else:
623 oids = transfer(sorted(objects, key=lambda o: o.get(b'oid')))
626 oids = transfer(sorted(objects, key=lambda o: o.get(b'oid')))
@@ -125,7 +125,14 b' def worthwhile(ui, costperop, nops, thre'
125
125
126
126
127 def worker(
127 def worker(
128 ui, costperarg, func, staticargs, args, hasretval=False, threadsafe=True
128 ui,
129 costperarg,
130 func,
131 staticargs,
132 args,
133 hasretval=False,
134 threadsafe=True,
135 prefork=None,
129 ):
136 ):
130 """run a function, possibly in parallel in multiple worker
137 """run a function, possibly in parallel in multiple worker
131 processes.
138 processes.
@@ -149,6 +156,10 b' def worker('
149 threadsafe - whether work items are thread safe and can be executed using
156 threadsafe - whether work items are thread safe and can be executed using
150 a thread-based worker. Should be disabled for CPU heavy tasks that don't
157 a thread-based worker. Should be disabled for CPU heavy tasks that don't
151 release the GIL.
158 release the GIL.
159
160 prefork - a parameterless Callable that is invoked prior to forking the
161 process. fork() is only used on non-Windows platforms, but is also not
162 called on POSIX platforms if the work amount doesn't warrant a worker.
152 """
163 """
153 enabled = ui.configbool(b'worker', b'enabled')
164 enabled = ui.configbool(b'worker', b'enabled')
154 if enabled and _platformworker is _posixworker and not ismainthread():
165 if enabled and _platformworker is _posixworker and not ismainthread():
@@ -157,11 +168,13 b' def worker('
157 enabled = False
168 enabled = False
158
169
159 if enabled and worthwhile(ui, costperarg, len(args), threadsafe=threadsafe):
170 if enabled and worthwhile(ui, costperarg, len(args), threadsafe=threadsafe):
160 return _platformworker(ui, func, staticargs, args, hasretval)
171 return _platformworker(
172 ui, func, staticargs, args, hasretval, prefork=prefork
173 )
161 return func(*staticargs + (args,))
174 return func(*staticargs + (args,))
162
175
163
176
164 def _posixworker(ui, func, staticargs, args, hasretval):
177 def _posixworker(ui, func, staticargs, args, hasretval, prefork=None):
165 workers = _numworkers(ui)
178 workers = _numworkers(ui)
166 oldhandler = signal.getsignal(signal.SIGINT)
179 oldhandler = signal.getsignal(signal.SIGINT)
167 signal.signal(signal.SIGINT, signal.SIG_IGN)
180 signal.signal(signal.SIGINT, signal.SIG_IGN)
@@ -207,6 +220,10 b' def _posixworker(ui, func, staticargs, a'
207 parentpid = os.getpid()
220 parentpid = os.getpid()
208 pipes = []
221 pipes = []
209 retval = {}
222 retval = {}
223
224 if prefork:
225 prefork()
226
210 for pargs in partition(args, min(workers, len(args))):
227 for pargs in partition(args, min(workers, len(args))):
211 # Every worker gets its own pipe to send results on, so we don't have to
228 # Every worker gets its own pipe to send results on, so we don't have to
212 # implement atomic writes larger than PIPE_BUF. Each forked process has
229 # implement atomic writes larger than PIPE_BUF. Each forked process has
@@ -316,7 +333,7 b' def _posixexitstatus(code):'
316 return -(os.WTERMSIG(code))
333 return -(os.WTERMSIG(code))
317
334
318
335
319 def _windowsworker(ui, func, staticargs, args, hasretval):
336 def _windowsworker(ui, func, staticargs, args, hasretval, prefork=None):
320 class Worker(threading.Thread):
337 class Worker(threading.Thread):
321 def __init__(
338 def __init__(
322 self, taskqueue, resultqueue, func, staticargs, *args, **kwargs
339 self, taskqueue, resultqueue, func, staticargs, *args, **kwargs
General Comments 0
You need to be logged in to leave comments. Login now