@@ -83,7 +83,8 @@ def worthwhile(ui, costperop, nops, thre
     benefit = linear - (_STARTUP_COST * workers + linear / workers)
     return benefit >= 0.15
 
-def worker(ui, costperarg, func, staticargs, args, threadsafe=True):
+def worker(ui, costperarg, func, staticargs, args, hasretval=False,
+           threadsafe=True):
     '''run a function, possibly in parallel in multiple worker
     processes.
 
@@ -91,23 +92,27 @@ def worker(ui, costperarg, func, statica
 
     costperarg - cost of a single task
 
-    func - function to run
+    func - function to run. It is expected to return a progress iterator.
 
     staticargs - arguments to pass to every invocation of the function
 
     args - arguments to split into chunks, to pass to individual
       workers
 
+    hasretval - when True, func and the current function return an progress
+      iterator then a list (encoded as an iterator that yield many (False, ..)
+      then a (True, list)). The resulting list is in the natural order.
+
     threadsafe - whether work items are thread safe and can be executed using
       a thread-based worker. Should be disabled for CPU heavy tasks that don't
      release the GIL.
     '''
     enabled = ui.configbool('worker', 'enabled')
     if enabled and worthwhile(ui, costperarg, len(args), threadsafe=threadsafe):
-        return _platformworker(ui, func, staticargs, args)
+        return _platformworker(ui, func, staticargs, args, hasretval)
     return func(*staticargs + (args,))
 
-def _posixworker(ui, func, staticargs, args):
+def _posixworker(ui, func, staticargs, args, hasretval):
     workers = _numworkers(ui)
     oldhandler = signal.getsignal(signal.SIGINT)
     signal.signal(signal.SIGINT, signal.SIG_IGN)
@@ -157,7 +162,8 @@ def _posixworker(ui, func, staticargs, a
     ui.flush()
     parentpid = os.getpid()
     pipes = []
-    for pargs in partition(args, workers):
+    retvals = []
+    for i, pargs in enumerate(partition(args, workers)):
         # Every worker gets its own pipe to send results on, so we don't have to
         # implement atomic writes larger than PIPE_BUF. Each forked process has
         # its own pipe's descriptors in the local variables, and the parent
@@ -165,6 +171,7 @@ def _posixworker(ui, func, staticargs, a
         # care what order they're in).
         rfd, wfd = os.pipe()
         pipes.append((rfd, wfd))
+        retvals.append(None)
         # make sure we use os._exit in all worker code paths. otherwise the
         # worker may do some clean-ups which could cause surprises like
         # deadlock. see sshpeer.cleanup for example.
@@ -185,7 +192,7 @@ def _posixworker(ui, func, staticargs, a
                     os.close(w)
                 os.close(rfd)
                 for result in func(*(staticargs + (pargs,))):
-                    os.write(wfd, util.pickle.dumps(result))
+                    os.write(wfd, util.pickle.dumps((i, result)))
                 return 0
 
             ret = scmutil.callcatch(ui, workerfunc)
@@ -219,7 +226,11 @@ def _posixworker(ui, func, staticargs, a
         while openpipes > 0:
             for key, events in selector.select():
                 try:
-                    yield util.pickle.load(key.fileobj)
+                    i, res = util.pickle.load(key.fileobj)
+                    if hasretval and res[0]:
+                        retvals[i] = res[1]
+                    else:
+                        yield res
                 except EOFError:
                     selector.unregister(key.fileobj)
                     key.fileobj.close()
@@ -237,6 +248,8 @@ def _posixworker(ui, func, staticargs, a
         if status < 0:
             os.kill(os.getpid(), -status)
         sys.exit(status)
+    if hasretval:
+        yield True, sum(retvals, [])
 
 def _posixexitstatus(code):
     '''convert a posix exit status into the same form returned by
@@ -248,7 +261,7 @@ def _posixexitstatus(code):
     elif os.WIFSIGNALED(code):
         return -os.WTERMSIG(code)
 
-def _windowsworker(ui, func, staticargs, args):
+def _windowsworker(ui, func, staticargs, args, hasretval):
     class Worker(threading.Thread):
         def __init__(self, taskqueue, resultqueue, func, staticargs, *args,
                      **kwargs):
@@ -268,9 +281,9 @@ def _windowsworker(ui, func, staticargs,
             try:
                 while not self._taskqueue.empty():
                     try:
-                        args = self._taskqueue.get_nowait()
+                        i, args = self._taskqueue.get_nowait()
                         for res in self._func(*self._staticargs + (args,)):
-                            self._resultqueue.put(res)
+                            self._resultqueue.put((i, res))
                             # threading doesn't provide a native way to
                             # interrupt execution. handle it manually at every
                             # iteration.
@@ -305,9 +318,11 @@ def _windowsworker(ui, func, staticargs,
     workers = _numworkers(ui)
     resultqueue = pycompat.queue.Queue()
     taskqueue = pycompat.queue.Queue()
+    retvals = []
     # partition work to more pieces than workers to minimize the chance
     # of uneven distribution of large tasks between the workers
-    for pargs in partition(args, workers * 20):
+    for pargs in enumerate(partition(args, workers * 20)):
+        retvals.append(None)
         taskqueue.put(pargs)
     for _i in range(workers):
         t = Worker(taskqueue, resultqueue, func, staticargs)
@@ -316,7 +331,11 @@ def _windowsworker(ui, func, staticargs,
     try:
         while len(threads) > 0:
             while not resultqueue.empty():
-                yield resultqueue.get()
+                (i, res) = resultqueue.get()
+                if hasretval and res[0]:
+                    retvals[i] = res[1]
+                else:
+                    yield res
             threads[0].join(0.05)
         finishedthreads = [_t for _t in threads if not _t.is_alive()]
         for t in finishedthreads:
@@ -327,7 +346,13 @@ def _windowsworker(ui, func, staticargs,
         trykillworkers()
         raise
     while not resultqueue.empty():
-        yield resultqueue.get()
+        (i, res) = resultqueue.get()
+        if hasretval and res[0]:
+            retvals[i] = res[1]
+        else:
+            yield res
+    if hasretval:
+        yield True, sum(retvals, [])
 
 if pycompat.iswindows:
     _platformworker = _windowsworker
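
The hasretval protocol documented in the new worker() docstring is easiest to see from the caller's side. The sketch below is illustrative only and not part of this patch: examine, prefix, runexamine, and the cost constant are made-up names, and it assumes the mercurial.worker module as modified here plus a ui object supplied by the caller.

    from mercurial import worker

    def examine(ui, prefix, items):
        # Runs once per chunk; `items` is the chunk appended after staticargs.
        found = []
        for item in items:
            found.append(prefix + item)
            yield False, 1            # progress unit, re-yielded by worker()
        yield True, found             # final (True, list) for this chunk

    def runexamine(ui, items):
        results = None
        for done, value in worker.worker(ui, 0.001, examine, (ui, 'x-'),
                                         items, hasretval=True):
            if done:
                results = value       # all chunks' lists, in natural order
            # else: `value` is a progress unit (could drive a progress bar)
        return results

The same loop works whether or not the parallel path is taken: when the work is not deemed worthwhile, worker() returns examine's own iterator, which ends with the same (True, list) item.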
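
Both platform backends keep the combined result in its natural order even though chunks can finish in any order: each chunk is tagged with its index i, every message travels as an (i, res) pair, the final (True, ..) message of a chunk lands in retvals[i], and the per-chunk lists are flattened at the end with sum(retvals, []). The standalone snippet below replays that bookkeeping with invented data (plain Python, no Mercurial imports).

    # Invented data: three chunks whose workers "finish" out of order, as if
    # each worker upper-cased its chunk.
    chunks = [['a1', 'a2'], ['b1'], ['c1', 'c2', 'c3']]
    retvals = [None] * len(chunks)

    # Each worker reports (chunkindex, (True, per_chunk_list)) when done.
    messages = [(2, (True, ['C1', 'C2', 'C3'])),
                (0, (True, ['A1', 'A2'])),
                (1, (True, ['B1']))]

    for i, res in messages:
        if res[0]:                    # final (True, list) message for chunk i
            retvals[i] = res[1]

    print(sum(retvals, []))           # ['A1', 'A2', 'B1', 'C1', 'C2', 'C3']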