##// END OF EJS Templates
worker: support parallelization of functions with return values...
Valentin Gatien-Baron -
r42655:5ca136bb default
parent child Browse files
Show More
@@ -83,7 +83,8 b' def worthwhile(ui, costperop, nops, thre'
83 benefit = linear - (_STARTUP_COST * workers + linear / workers)
83 benefit = linear - (_STARTUP_COST * workers + linear / workers)
84 return benefit >= 0.15
84 return benefit >= 0.15
85
85
86 def worker(ui, costperarg, func, staticargs, args, threadsafe=True):
86 def worker(ui, costperarg, func, staticargs, args, hasretval=False,
87 threadsafe=True):
87 '''run a function, possibly in parallel in multiple worker
88 '''run a function, possibly in parallel in multiple worker
88 processes.
89 processes.
89
90
@@ -91,23 +92,27 b' def worker(ui, costperarg, func, statica'
91
92
92 costperarg - cost of a single task
93 costperarg - cost of a single task
93
94
94 func - function to run
95 func - function to run. It is expected to return a progress iterator.
95
96
96 staticargs - arguments to pass to every invocation of the function
97 staticargs - arguments to pass to every invocation of the function
97
98
98 args - arguments to split into chunks, to pass to individual
99 args - arguments to split into chunks, to pass to individual
99 workers
100 workers
100
101
102 hasretval - when True, func and the current function return an progress
103 iterator then a list (encoded as an iterator that yield many (False, ..)
104 then a (True, list)). The resulting list is in the natural order.
105
101 threadsafe - whether work items are thread safe and can be executed using
106 threadsafe - whether work items are thread safe and can be executed using
102 a thread-based worker. Should be disabled for CPU heavy tasks that don't
107 a thread-based worker. Should be disabled for CPU heavy tasks that don't
103 release the GIL.
108 release the GIL.
104 '''
109 '''
105 enabled = ui.configbool('worker', 'enabled')
110 enabled = ui.configbool('worker', 'enabled')
106 if enabled and worthwhile(ui, costperarg, len(args), threadsafe=threadsafe):
111 if enabled and worthwhile(ui, costperarg, len(args), threadsafe=threadsafe):
107 return _platformworker(ui, func, staticargs, args)
112 return _platformworker(ui, func, staticargs, args, hasretval)
108 return func(*staticargs + (args,))
113 return func(*staticargs + (args,))
109
114
110 def _posixworker(ui, func, staticargs, args):
115 def _posixworker(ui, func, staticargs, args, hasretval):
111 workers = _numworkers(ui)
116 workers = _numworkers(ui)
112 oldhandler = signal.getsignal(signal.SIGINT)
117 oldhandler = signal.getsignal(signal.SIGINT)
113 signal.signal(signal.SIGINT, signal.SIG_IGN)
118 signal.signal(signal.SIGINT, signal.SIG_IGN)
@@ -157,7 +162,8 b' def _posixworker(ui, func, staticargs, a'
157 ui.flush()
162 ui.flush()
158 parentpid = os.getpid()
163 parentpid = os.getpid()
159 pipes = []
164 pipes = []
160 for pargs in partition(args, workers):
165 retvals = []
166 for i, pargs in enumerate(partition(args, workers)):
161 # Every worker gets its own pipe to send results on, so we don't have to
167 # Every worker gets its own pipe to send results on, so we don't have to
162 # implement atomic writes larger than PIPE_BUF. Each forked process has
168 # implement atomic writes larger than PIPE_BUF. Each forked process has
163 # its own pipe's descriptors in the local variables, and the parent
169 # its own pipe's descriptors in the local variables, and the parent
@@ -165,6 +171,7 b' def _posixworker(ui, func, staticargs, a'
165 # care what order they're in).
171 # care what order they're in).
166 rfd, wfd = os.pipe()
172 rfd, wfd = os.pipe()
167 pipes.append((rfd, wfd))
173 pipes.append((rfd, wfd))
174 retvals.append(None)
168 # make sure we use os._exit in all worker code paths. otherwise the
175 # make sure we use os._exit in all worker code paths. otherwise the
169 # worker may do some clean-ups which could cause surprises like
176 # worker may do some clean-ups which could cause surprises like
170 # deadlock. see sshpeer.cleanup for example.
177 # deadlock. see sshpeer.cleanup for example.
@@ -185,7 +192,7 b' def _posixworker(ui, func, staticargs, a'
185 os.close(w)
192 os.close(w)
186 os.close(rfd)
193 os.close(rfd)
187 for result in func(*(staticargs + (pargs,))):
194 for result in func(*(staticargs + (pargs,))):
188 os.write(wfd, util.pickle.dumps(result))
195 os.write(wfd, util.pickle.dumps((i, result)))
189 return 0
196 return 0
190
197
191 ret = scmutil.callcatch(ui, workerfunc)
198 ret = scmutil.callcatch(ui, workerfunc)
@@ -219,7 +226,11 b' def _posixworker(ui, func, staticargs, a'
219 while openpipes > 0:
226 while openpipes > 0:
220 for key, events in selector.select():
227 for key, events in selector.select():
221 try:
228 try:
222 yield util.pickle.load(key.fileobj)
229 i, res = util.pickle.load(key.fileobj)
230 if hasretval and res[0]:
231 retvals[i] = res[1]
232 else:
233 yield res
223 except EOFError:
234 except EOFError:
224 selector.unregister(key.fileobj)
235 selector.unregister(key.fileobj)
225 key.fileobj.close()
236 key.fileobj.close()
@@ -237,6 +248,8 b' def _posixworker(ui, func, staticargs, a'
237 if status < 0:
248 if status < 0:
238 os.kill(os.getpid(), -status)
249 os.kill(os.getpid(), -status)
239 sys.exit(status)
250 sys.exit(status)
251 if hasretval:
252 yield True, sum(retvals, [])
240
253
241 def _posixexitstatus(code):
254 def _posixexitstatus(code):
242 '''convert a posix exit status into the same form returned by
255 '''convert a posix exit status into the same form returned by
@@ -248,7 +261,7 b' def _posixexitstatus(code):'
248 elif os.WIFSIGNALED(code):
261 elif os.WIFSIGNALED(code):
249 return -os.WTERMSIG(code)
262 return -os.WTERMSIG(code)
250
263
251 def _windowsworker(ui, func, staticargs, args):
264 def _windowsworker(ui, func, staticargs, args, hasretval):
252 class Worker(threading.Thread):
265 class Worker(threading.Thread):
253 def __init__(self, taskqueue, resultqueue, func, staticargs, *args,
266 def __init__(self, taskqueue, resultqueue, func, staticargs, *args,
254 **kwargs):
267 **kwargs):
@@ -268,9 +281,9 b' def _windowsworker(ui, func, staticargs,'
268 try:
281 try:
269 while not self._taskqueue.empty():
282 while not self._taskqueue.empty():
270 try:
283 try:
271 args = self._taskqueue.get_nowait()
284 i, args = self._taskqueue.get_nowait()
272 for res in self._func(*self._staticargs + (args,)):
285 for res in self._func(*self._staticargs + (args,)):
273 self._resultqueue.put(res)
286 self._resultqueue.put((i, res))
274 # threading doesn't provide a native way to
287 # threading doesn't provide a native way to
275 # interrupt execution. handle it manually at every
288 # interrupt execution. handle it manually at every
276 # iteration.
289 # iteration.
@@ -305,9 +318,11 b' def _windowsworker(ui, func, staticargs,'
305 workers = _numworkers(ui)
318 workers = _numworkers(ui)
306 resultqueue = pycompat.queue.Queue()
319 resultqueue = pycompat.queue.Queue()
307 taskqueue = pycompat.queue.Queue()
320 taskqueue = pycompat.queue.Queue()
321 retvals = []
308 # partition work to more pieces than workers to minimize the chance
322 # partition work to more pieces than workers to minimize the chance
309 # of uneven distribution of large tasks between the workers
323 # of uneven distribution of large tasks between the workers
310 for pargs in partition(args, workers * 20):
324 for pargs in enumerate(partition(args, workers * 20)):
325 retvals.append(None)
311 taskqueue.put(pargs)
326 taskqueue.put(pargs)
312 for _i in range(workers):
327 for _i in range(workers):
313 t = Worker(taskqueue, resultqueue, func, staticargs)
328 t = Worker(taskqueue, resultqueue, func, staticargs)
@@ -316,7 +331,11 b' def _windowsworker(ui, func, staticargs,'
316 try:
331 try:
317 while len(threads) > 0:
332 while len(threads) > 0:
318 while not resultqueue.empty():
333 while not resultqueue.empty():
319 yield resultqueue.get()
334 (i, res) = resultqueue.get()
335 if hasretval and res[0]:
336 retvals[i] = res[1]
337 else:
338 yield res
320 threads[0].join(0.05)
339 threads[0].join(0.05)
321 finishedthreads = [_t for _t in threads if not _t.is_alive()]
340 finishedthreads = [_t for _t in threads if not _t.is_alive()]
322 for t in finishedthreads:
341 for t in finishedthreads:
@@ -327,7 +346,13 b' def _windowsworker(ui, func, staticargs,'
327 trykillworkers()
346 trykillworkers()
328 raise
347 raise
329 while not resultqueue.empty():
348 while not resultqueue.empty():
330 yield resultqueue.get()
349 (i, res) = resultqueue.get()
350 if hasretval and res[0]:
351 retvals[i] = res[1]
352 else:
353 yield res
354 if hasretval:
355 yield True, sum(retvals, [])
331
356
332 if pycompat.iswindows:
357 if pycompat.iswindows:
333 _platformworker = _windowsworker
358 _platformworker = _windowsworker
General Comments 0
You need to be logged in to leave comments. Login now