##// END OF EJS Templates
worker: make windows workers daemons...
Wojciech Lis -
r35448:86b8cc1f default
parent child Browse files
Show More
@@ -12,6 +12,7 b' import os'
12 import signal
12 import signal
13 import sys
13 import sys
14 import threading
14 import threading
15 import time
15
16
16 from .i18n import _
17 from .i18n import _
17 from . import (
18 from . import (
@@ -216,6 +217,7 b' def _windowsworker(ui, func, staticargs,'
216 self._func = func
217 self._func = func
217 self._staticargs = staticargs
218 self._staticargs = staticargs
218 self._interrupted = False
219 self._interrupted = False
220 self.daemon = True
219 self.exception = None
221 self.exception = None
220
222
221 def interrupt(self):
223 def interrupt(self):
@@ -242,16 +244,22 b' def _windowsworker(ui, func, staticargs,'
242 raise
244 raise
243
245
244 threads = []
246 threads = []
245 def killworkers():
247 def trykillworkers():
248 # Allow up to 1 second to clean worker threads nicely
249 cleanupend = time.time() + 1
246 for t in threads:
250 for t in threads:
247 t.interrupt()
251 t.interrupt()
248 for t in threads:
252 for t in threads:
249 # try to let the threads handle interruption, but don't wait
253 remainingtime = cleanupend - time.time()
250 # indefintely. the thread could be in infinite loop, handling
254 t.join(remainingtime)
251 # a very long task or in a deadlock situation
252 t.join(5)
253 if t.is_alive():
255 if t.is_alive():
254 raise error.Abort(_('failed to join worker thread'))
256 # pass over the workers joining failure. it is more
257 # important to surface the inital exception than the
258 # fact that one of workers may be processing a large
259 # task and does not get to handle the interruption.
260 ui.warn(_("failed to kill worker threads while "
261 "handling an exception\n"))
262 return
255
263
256 workers = _numworkers(ui)
264 workers = _numworkers(ui)
257 resultqueue = util.queue()
265 resultqueue = util.queue()
@@ -264,25 +272,19 b' def _windowsworker(ui, func, staticargs,'
264 t = Worker(taskqueue, resultqueue, func, staticargs)
272 t = Worker(taskqueue, resultqueue, func, staticargs)
265 threads.append(t)
273 threads.append(t)
266 t.start()
274 t.start()
267
275 try:
268 while len(threads) > 0:
276 while len(threads) > 0:
269 while not resultqueue.empty():
277 while not resultqueue.empty():
270 yield resultqueue.get()
278 yield resultqueue.get()
271 threads[0].join(0.05)
279 threads[0].join(0.05)
272 finishedthreads = [_t for _t in threads if not _t.is_alive()]
280 finishedthreads = [_t for _t in threads if not _t.is_alive()]
273 for t in finishedthreads:
281 for t in finishedthreads:
274 if t.exception is not None:
282 if t.exception is not None:
275 try:
283 raise t.exception
276 killworkers()
284 threads.remove(t)
277 except Exception:
285 except Exception: # re-raises
278 # pass over the workers joining failure. it is more
286 trykillworkers()
279 # important to surface the inital exception than the
287 raise
280 # fact that one of workers may be processing a large
281 # task and does not get to handle the interruption.
282 ui.warn(_("failed to kill worker threads while handling "
283 "an exception"))
284 raise t.exception
285 threads.remove(t)
286 while not resultqueue.empty():
288 while not resultqueue.empty():
287 yield resultqueue.get()
289 yield resultqueue.get()
288
290
General Comments 0
You need to be logged in to leave comments. Login now