Show More
@@ -597,7 +597,9 b' class _gitlfsremote:' | |||||
597 | continue |
|
597 | continue | |
598 | raise |
|
598 | raise | |
599 |
|
599 | |||
600 | # Until https multiplexing gets sorted out |
|
600 | # Until https multiplexing gets sorted out. It's not clear if | |
|
601 | # ConnectionManager.set_ready() is externally synchronized for thread | |||
|
602 | # safety with Windows workers. | |||
601 | if self.ui.configbool(b'experimental', b'lfs.worker-enable'): |
|
603 | if self.ui.configbool(b'experimental', b'lfs.worker-enable'): | |
602 | # The POSIX workers are forks of this process, so before spinning |
|
604 | # The POSIX workers are forks of this process, so before spinning | |
603 | # them up, close all pooled connections. Otherwise, there's no way |
|
605 | # them up, close all pooled connections. Otherwise, there's no way | |
@@ -608,7 +610,7 b' class _gitlfsremote:' | |||||
608 | # ready connections as in use, and roll that back after the fork? |
|
610 | # ready connections as in use, and roll that back after the fork? | |
609 | # That would allow the existing pool of connections in this process |
|
611 | # That would allow the existing pool of connections in this process | |
610 | # to be preserved. |
|
612 | # to be preserved. | |
611 | if not pycompat.iswindows: |
|
613 | def prefork(): | |
612 | for h in self.urlopener.handlers: |
|
614 | for h in self.urlopener.handlers: | |
613 | getattr(h, "close_all", lambda: None)() |
|
615 | getattr(h, "close_all", lambda: None)() | |
614 |
|
616 | |||
@@ -618,6 +620,7 b' class _gitlfsremote:' | |||||
618 | transfer, |
|
620 | transfer, | |
619 | (), |
|
621 | (), | |
620 | sorted(objects, key=lambda o: o.get(b'oid')), |
|
622 | sorted(objects, key=lambda o: o.get(b'oid')), | |
|
623 | prefork=prefork, | |||
621 | ) |
|
624 | ) | |
622 | else: |
|
625 | else: | |
623 | oids = transfer(sorted(objects, key=lambda o: o.get(b'oid'))) |
|
626 | oids = transfer(sorted(objects, key=lambda o: o.get(b'oid'))) |
@@ -125,7 +125,14 b' def worthwhile(ui, costperop, nops, thre' | |||||
125 |
|
125 | |||
126 |
|
126 | |||
127 | def worker( |
|
127 | def worker( | |
128 | ui, costperarg, func, staticargs, args, hasretval=False, threadsafe=True |
|
128 | ui, | |
|
129 | costperarg, | |||
|
130 | func, | |||
|
131 | staticargs, | |||
|
132 | args, | |||
|
133 | hasretval=False, | |||
|
134 | threadsafe=True, | |||
|
135 | prefork=None, | |||
129 | ): |
|
136 | ): | |
130 | """run a function, possibly in parallel in multiple worker |
|
137 | """run a function, possibly in parallel in multiple worker | |
131 | processes. |
|
138 | processes. | |
@@ -149,6 +156,10 b' def worker(' | |||||
149 | threadsafe - whether work items are thread safe and can be executed using |
|
156 | threadsafe - whether work items are thread safe and can be executed using | |
150 | a thread-based worker. Should be disabled for CPU heavy tasks that don't |
|
157 | a thread-based worker. Should be disabled for CPU heavy tasks that don't | |
151 | release the GIL. |
|
158 | release the GIL. | |
|
159 | ||||
|
160 | prefork - a parameterless Callable that is invoked prior to forking the | |||
|
161 | process. fork() is only used on non-Windows platforms, but is also not | |||
|
162 | called on POSIX platforms if the work amount doesn't warrant a worker. | |||
152 | """ |
|
163 | """ | |
153 | enabled = ui.configbool(b'worker', b'enabled') |
|
164 | enabled = ui.configbool(b'worker', b'enabled') | |
154 | if enabled and _platformworker is _posixworker and not ismainthread(): |
|
165 | if enabled and _platformworker is _posixworker and not ismainthread(): | |
@@ -157,11 +168,13 b' def worker(' | |||||
157 | enabled = False |
|
168 | enabled = False | |
158 |
|
169 | |||
159 | if enabled and worthwhile(ui, costperarg, len(args), threadsafe=threadsafe): |
|
170 | if enabled and worthwhile(ui, costperarg, len(args), threadsafe=threadsafe): | |
160 |
return _platformworker( |
|
171 | return _platformworker( | |
|
172 | ui, func, staticargs, args, hasretval, prefork=prefork | |||
|
173 | ) | |||
161 | return func(*staticargs + (args,)) |
|
174 | return func(*staticargs + (args,)) | |
162 |
|
175 | |||
163 |
|
176 | |||
164 | def _posixworker(ui, func, staticargs, args, hasretval): |
|
177 | def _posixworker(ui, func, staticargs, args, hasretval, prefork=None): | |
165 | workers = _numworkers(ui) |
|
178 | workers = _numworkers(ui) | |
166 | oldhandler = signal.getsignal(signal.SIGINT) |
|
179 | oldhandler = signal.getsignal(signal.SIGINT) | |
167 | signal.signal(signal.SIGINT, signal.SIG_IGN) |
|
180 | signal.signal(signal.SIGINT, signal.SIG_IGN) | |
@@ -207,6 +220,10 b' def _posixworker(ui, func, staticargs, a' | |||||
207 | parentpid = os.getpid() |
|
220 | parentpid = os.getpid() | |
208 | pipes = [] |
|
221 | pipes = [] | |
209 | retval = {} |
|
222 | retval = {} | |
|
223 | ||||
|
224 | if prefork: | |||
|
225 | prefork() | |||
|
226 | ||||
210 | for pargs in partition(args, min(workers, len(args))): |
|
227 | for pargs in partition(args, min(workers, len(args))): | |
211 | # Every worker gets its own pipe to send results on, so we don't have to |
|
228 | # Every worker gets its own pipe to send results on, so we don't have to | |
212 | # implement atomic writes larger than PIPE_BUF. Each forked process has |
|
229 | # implement atomic writes larger than PIPE_BUF. Each forked process has | |
@@ -316,7 +333,7 b' def _posixexitstatus(code):' | |||||
316 | return -(os.WTERMSIG(code)) |
|
333 | return -(os.WTERMSIG(code)) | |
317 |
|
334 | |||
318 |
|
335 | |||
319 | def _windowsworker(ui, func, staticargs, args, hasretval): |
|
336 | def _windowsworker(ui, func, staticargs, args, hasretval, prefork=None): | |
320 | class Worker(threading.Thread): |
|
337 | class Worker(threading.Thread): | |
321 | def __init__( |
|
338 | def __init__( | |
322 | self, taskqueue, resultqueue, func, staticargs, *args, **kwargs |
|
339 | self, taskqueue, resultqueue, func, staticargs, *args, **kwargs |
General Comments 0
You need to be logged in to leave comments.
Login now