##// END OF EJS Templates
workers: handling exceptions in windows workers...
Wojciech Lis -
r35428:71427ff1 default
parent child Browse files
Show More
@@ -214,18 +214,45 b' def _windowsworker(ui, func, staticargs,'
214 self._resultqueue = resultqueue
214 self._resultqueue = resultqueue
215 self._func = func
215 self._func = func
216 self._staticargs = staticargs
216 self._staticargs = staticargs
217 self._interrupted = False
218 self.exception = None
219
220 def interrupt(self):
221 self._interrupted = True
217
222
218 def run(self):
223 def run(self):
219 while not self._taskqueue.empty():
224 try:
220 try:
225 while not self._taskqueue.empty():
221 args = self._taskqueue.get_nowait()
226 try:
222 for res in self._func(*self._staticargs + (args,)):
227 args = self._taskqueue.get_nowait()
223 self._resultqueue.put(res)
228 for res in self._func(*self._staticargs + (args,)):
224 except util.empty:
229 self._resultqueue.put(res)
225 break
230 # threading doesn't provide a native way to
231 # interrupt execution. handle it manually at every
232 # iteration.
233 if self._interrupted:
234 return
235 except util.empty:
236 break
237 except Exception as e:
238 # store the exception such that the main thread can resurface
239 # it as if the func was running without workers.
240 self.exception = e
241 raise
242
243 threads = []
244 def killworkers():
245 for t in threads:
246 t.interrupt()
247 for t in threads:
248 # try to let the threads handle interruption, but don't wait
249 # indefintely. the thread could be in infinite loop, handling
250 # a very long task or in a deadlock situation
251 t.join(5)
252 if t.is_alive():
253 raise error.Abort(_('failed to join worker thread'))
226
254
227 workers = _numworkers(ui)
255 workers = _numworkers(ui)
228 threads = []
229 resultqueue = util.queue()
256 resultqueue = util.queue()
230 taskqueue = util.queue()
257 taskqueue = util.queue()
231 # partition work to more pieces than workers to minimize the chance
258 # partition work to more pieces than workers to minimize the chance
@@ -236,12 +263,24 b' def _windowsworker(ui, func, staticargs,'
236 t = Worker(taskqueue, resultqueue, func, staticargs)
263 t = Worker(taskqueue, resultqueue, func, staticargs)
237 threads.append(t)
264 threads.append(t)
238 t.start()
265 t.start()
239 while any(t.is_alive() for t in threads):
266
267 while len(threads) > 0:
240 while not resultqueue.empty():
268 while not resultqueue.empty():
241 yield resultqueue.get()
269 yield resultqueue.get()
242 t = threads[0]
270 threads[0].join(0.05)
243 t.join(0.05)
271 finishedthreads = [_t for _t in threads if not _t.is_alive()]
244 if not t.is_alive():
272 for t in finishedthreads:
273 if t.exception is not None:
274 try:
275 killworkers()
276 except Exception:
277 # pass over the workers joining failure. it is more
278 # important to surface the inital exception than the
279 # fact that one of workers may be processing a large
280 # task and does not get to handle the interruption.
281 ui.warn(_("failed to kill worker threads while handling "
282 "an exception"))
283 raise t.exception
245 threads.remove(t)
284 threads.remove(t)
246 while not resultqueue.empty():
285 while not resultqueue.empty():
247 yield resultqueue.get()
286 yield resultqueue.get()
General Comments 0
You need to be logged in to leave comments. Login now