@@ -83,7 +83,8 @@ def worthwhile(ui, costperop, nops, thre
     benefit = linear - (_STARTUP_COST * workers + linear / workers)
     return benefit >= 0.15

-def worker(ui, costperarg, func, staticargs, args, threadsafe=True):
+def worker(ui, costperarg, func, staticargs, args, hasretval=False,
+           threadsafe=True):
     '''run a function, possibly in parallel in multiple worker
     processes.

@@ -91,23 +92,27 @@ def worker(ui, costperarg, func, statica

     costperarg - cost of a single task

-    func - function to run
+    func - function to run. It is expected to return a progress iterator.

     staticargs - arguments to pass to every invocation of the function

     args - arguments to split into chunks, to pass to individual
     workers

+    hasretval - when True, func and the current function return a progress
+    iterator then a list (encoded as an iterator that yields many (False, ..)
+    then a (True, list)). The resulting list is in the natural order.
+
     threadsafe - whether work items are thread safe and can be executed using
     a thread-based worker. Should be disabled for CPU heavy tasks that don't
     release the GIL.
     '''
     enabled = ui.configbool('worker', 'enabled')
     if enabled and worthwhile(ui, costperarg, len(args), threadsafe=threadsafe):
-        return _platformworker(ui, func, staticargs, args)
+        return _platformworker(ui, func, staticargs, args, hasretval)
     return func(*staticargs + (args,))

-def _posixworker(ui, func, staticargs, args):
+def _posixworker(ui, func, staticargs, args, hasretval):
     workers = _numworkers(ui)
     oldhandler = signal.getsignal(signal.SIGINT)
     signal.signal(signal.SIGINT, signal.SIG_IGN)
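
For context, a caller opting into the new protocol would look roughly like
this (an illustrative sketch only; `double`, `items`, and the cost value are
hypothetical, not part of this patch):

    def double(chunk):
        results = []
        for n in chunk:
            results.append(n * 2)  # stand-in for the real per-item work
            yield False, 1         # progress item, streamed as it happens
        yield True, results        # final item: this chunk's return value

    for final, value in worker(ui, 0.001, double, (), items, hasretval=True):
        if final:
            allresults = value     # one flat list of all chunks' results
        # otherwise `value` is a progress unit, e.g. for a progress bar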
@@ -157,7 +162,8 @@ def _posixworker(ui, func, staticargs, a
     ui.flush()
     parentpid = os.getpid()
     pipes = []
-    for pargs in partition(args, workers):
+    retvals = []
+    for i, pargs in enumerate(partition(args, workers)):
         # Every worker gets its own pipe to send results on, so we don't have to
         # implement atomic writes larger than PIPE_BUF. Each forked process has
         # its own pipe's descriptors in the local variables, and the parent
@@ -165,6 +171,7 @@ def _posixworker(ui, func, staticargs, a
         # care what order they're in).
         rfd, wfd = os.pipe()
         pipes.append((rfd, wfd))
+        retvals.append(None)
         # make sure we use os._exit in all worker code paths. otherwise the
         # worker may do some clean-ups which could cause surprises like
         # deadlock. see sshpeer.cleanup for example.
@@ -185,7 +192,7 @@ def _posixworker(ui, func, staticargs, a
                     os.close(w)
                 os.close(rfd)
                 for result in func(*(staticargs + (pargs,))):
-                    os.write(wfd, util.pickle.dumps(result))
+                    os.write(wfd, util.pickle.dumps((i, result)))
                 return 0

             ret = scmutil.callcatch(ui, workerfunc)
@@ -219,7 +226,11 @@ def _posixworker(ui, func, staticargs, a
         while openpipes > 0:
             for key, events in selector.select():
                 try:
-                    yield util.pickle.load(key.fileobj)
+                    i, res = util.pickle.load(key.fileobj)
+                    if hasretval and res[0]:
+                        retvals[i] = res[1]
+                    else:
+                        yield res
                 except EOFError:
                     selector.unregister(key.fileobj)
                     key.fileobj.close()
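
Each pickled message on a pipe is now a `(chunkindex, res)` pair, so the
parent can tell which chunk a final `(True, list)` belongs to while passing
`(False, ..)` progress items straight through. Restated as a standalone
sketch (names are illustrative, not from the patch):

    def demux(messages, hasretval, retvals):
        # messages arrive interleaved across workers, e.g.
        # (0, (False, 1)), (2, (False, 1)), (0, (True, ['a', 'b'])), ...
        for i, res in messages:
            if hasretval and res[0]:
                retvals[i] = res[1]  # chunk i is done: record its list
            else:
                yield res            # progress item: forward unchanged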
@@ -237,6 +248,8 @@ def _posixworker(ui, func, staticargs, a
         if status < 0:
             os.kill(os.getpid(), -status)
         sys.exit(status)
+    if hasretval:
+        yield True, sum(retvals, [])

 def _posixexitstatus(code):
     '''convert a posix exit status into the same form returned by
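
`sum(retvals, [])` concatenates the per-chunk lists while preserving chunk
order:

    >>> sum([[1, 2], [3], [4, 5]], [])
    [1, 2, 3, 4, 5]

Repeated list addition is quadratic in the number of chunks, but with only a
handful of chunks per worker the cost is negligible here.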
@@ -248,7 +261,7 @@ def _posixexitstatus(code):
     elif os.WIFSIGNALED(code):
         return -os.WTERMSIG(code)

-def _windowsworker(ui, func, staticargs, args):
+def _windowsworker(ui, func, staticargs, args, hasretval):
     class Worker(threading.Thread):
         def __init__(self, taskqueue, resultqueue, func, staticargs, *args,
                      **kwargs):
@@ -268,9 +281,9 @@ def _windowsworker(ui, func, staticargs,
             try:
                 while not self._taskqueue.empty():
                     try:
-                        args = self._taskqueue.get_nowait()
+                        i, args = self._taskqueue.get_nowait()
                         for res in self._func(*self._staticargs + (args,)):
-                            self._resultqueue.put(res)
+                            self._resultqueue.put((i, res))
                         # threading doesn't provide a native way to
                         # interrupt execution. handle it manually at every
                         # iteration.
@@ -305,9 +318,11 @@ def _windowsworker(ui, func, staticargs,
     workers = _numworkers(ui)
     resultqueue = pycompat.queue.Queue()
     taskqueue = pycompat.queue.Queue()
+    retvals = []
     # partition work to more pieces than workers to minimize the chance
     # of uneven distribution of large tasks between the workers
-    for pargs in partition(args, workers * 20):
+    for pargs in enumerate(partition(args, workers * 20)):
+        retvals.append(None)
         taskqueue.put(pargs)
     for _i in range(workers):
         t = Worker(taskqueue, resultqueue, func, staticargs)
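
With `enumerate`, every queued task becomes an `(index, chunk)` pair, which
matches the unpacking `i, args = self._taskqueue.get_nowait()` in
`Worker.run` above:

    >>> chunks = [['a', 'b'], ['c'], ['d', 'e']]  # whatever partition() yields
    >>> list(enumerate(chunks))
    [(0, ['a', 'b']), (1, ['c']), (2, ['d', 'e'])]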
@@ -316,7 +331,11 @@ def _windowsworker(ui, func, staticargs,
     try:
         while len(threads) > 0:
             while not resultqueue.empty():
-                yield resultqueue.get()
+                (i, res) = resultqueue.get()
+                if hasretval and res[0]:
+                    retvals[i] = res[1]
+                else:
+                    yield res
             threads[0].join(0.05)
         finishedthreads = [_t for _t in threads if not _t.is_alive()]
         for t in finishedthreads:
@@ -327,7 +346,13 @@ def _windowsworker(ui, func, staticargs,
         trykillworkers()
         raise
     while not resultqueue.empty():
-        yield resultqueue.get()
+        (i, res) = resultqueue.get()
+        if hasretval and res[0]:
+            retvals[i] = res[1]
+        else:
+            yield res
+    if hasretval:
+        yield True, sum(retvals, [])

 if pycompat.iswindows:
     _platformworker = _windowsworker
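
A quick end-to-end sanity check of the new flag might look like this (a
hypothetical snippet, not part of the patch; it assumes a Mercurial recent
enough to provide `ui.load()`, and sorts the output rather than relying on
any particular chunk ordering):

    from mercurial import ui as uimod
    from mercurial import worker

    def double(chunk):
        results = []
        for n in chunk:
            results.append(n * 2)
            yield False, 1
        yield True, results

    u = uimod.ui.load()
    out = [v for final, v in worker.worker(u, 0.01, double, (),
                                           list(range(100)), hasretval=True)
           if final]
    assert sorted(out[0]) == [n * 2 for n in range(100)]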