Show More
@@ -214,18 +214,45 b' def _windowsworker(ui, func, staticargs,' | |||||
214 | self._resultqueue = resultqueue |
|
214 | self._resultqueue = resultqueue | |
215 | self._func = func |
|
215 | self._func = func | |
216 | self._staticargs = staticargs |
|
216 | self._staticargs = staticargs | |
|
217 | self._interrupted = False | |||
|
218 | self.exception = None | |||
|
219 | ||||
|
220 | def interrupt(self): | |||
|
221 | self._interrupted = True | |||
217 |
|
222 | |||
218 | def run(self): |
|
223 | def run(self): | |
219 | while not self._taskqueue.empty(): |
|
224 | try: | |
220 | try: |
|
225 | while not self._taskqueue.empty(): | |
221 | args = self._taskqueue.get_nowait() |
|
226 | try: | |
222 | for res in self._func(*self._staticargs + (args,)): |
|
227 | args = self._taskqueue.get_nowait() | |
223 | self._resultqueue.put(res) |
|
228 | for res in self._func(*self._staticargs + (args,)): | |
224 | except util.empty: |
|
229 | self._resultqueue.put(res) | |
225 | break |
|
230 | # threading doesn't provide a native way to | |
|
231 | # interrupt execution. handle it manually at every | |||
|
232 | # iteration. | |||
|
233 | if self._interrupted: | |||
|
234 | return | |||
|
235 | except util.empty: | |||
|
236 | break | |||
|
237 | except Exception as e: | |||
|
238 | # store the exception such that the main thread can resurface | |||
|
239 | # it as if the func was running without workers. | |||
|
240 | self.exception = e | |||
|
241 | raise | |||
|
242 | ||||
|
243 | threads = [] | |||
|
244 | def killworkers(): | |||
|
245 | for t in threads: | |||
|
246 | t.interrupt() | |||
|
247 | for t in threads: | |||
|
248 | # try to let the threads handle interruption, but don't wait | |||
|
249 | # indefintely. the thread could be in infinite loop, handling | |||
|
250 | # a very long task or in a deadlock situation | |||
|
251 | t.join(5) | |||
|
252 | if t.is_alive(): | |||
|
253 | raise error.Abort(_('failed to join worker thread')) | |||
226 |
|
254 | |||
227 | workers = _numworkers(ui) |
|
255 | workers = _numworkers(ui) | |
228 | threads = [] |
|
|||
229 | resultqueue = util.queue() |
|
256 | resultqueue = util.queue() | |
230 | taskqueue = util.queue() |
|
257 | taskqueue = util.queue() | |
231 | # partition work to more pieces than workers to minimize the chance |
|
258 | # partition work to more pieces than workers to minimize the chance | |
@@ -236,12 +263,24 b' def _windowsworker(ui, func, staticargs,' | |||||
236 | t = Worker(taskqueue, resultqueue, func, staticargs) |
|
263 | t = Worker(taskqueue, resultqueue, func, staticargs) | |
237 | threads.append(t) |
|
264 | threads.append(t) | |
238 | t.start() |
|
265 | t.start() | |
239 | while any(t.is_alive() for t in threads): |
|
266 | ||
|
267 | while len(threads) > 0: | |||
240 | while not resultqueue.empty(): |
|
268 | while not resultqueue.empty(): | |
241 | yield resultqueue.get() |
|
269 | yield resultqueue.get() | |
242 |
|
|
270 | threads[0].join(0.05) | |
243 | t.join(0.05) |
|
271 | finishedthreads = [_t for _t in threads if not _t.is_alive()] | |
244 | if not t.is_alive(): |
|
272 | for t in finishedthreads: | |
|
273 | if t.exception is not None: | |||
|
274 | try: | |||
|
275 | killworkers() | |||
|
276 | except Exception: | |||
|
277 | # pass over the workers joining failure. it is more | |||
|
278 | # important to surface the inital exception than the | |||
|
279 | # fact that one of workers may be processing a large | |||
|
280 | # task and does not get to handle the interruption. | |||
|
281 | ui.warn(_("failed to kill worker threads while handling " | |||
|
282 | "an exception")) | |||
|
283 | raise t.exception | |||
245 | threads.remove(t) |
|
284 | threads.remove(t) | |
246 | while not resultqueue.empty(): |
|
285 | while not resultqueue.empty(): | |
247 | yield resultqueue.get() |
|
286 | yield resultqueue.get() |
General Comments 0
You need to be logged in to leave comments.
Login now