##// END OF EJS Templates
futures: switch to absolute and relative imports...
Gregory Szorc -
r37644:0a9c0d34 default
parent child Browse files
Show More
@@ -1,23 +1,27 b''
1 1 # Copyright 2009 Brian Quinlan. All Rights Reserved.
2 2 # Licensed to PSF under a Contributor Agreement.
3 3
4 4 """Execute computations asynchronously using threads or processes."""
5 5
6 from __future__ import absolute_import
7
6 8 __author__ = 'Brian Quinlan (brian@sweetapp.com)'
7 9
8 from concurrent.futures._base import (FIRST_COMPLETED,
9 FIRST_EXCEPTION,
10 ALL_COMPLETED,
11 CancelledError,
12 TimeoutError,
13 Future,
14 Executor,
15 wait,
16 as_completed)
17 from concurrent.futures.thread import ThreadPoolExecutor
10 from ._base import (
11 FIRST_COMPLETED,
12 FIRST_EXCEPTION,
13 ALL_COMPLETED,
14 CancelledError,
15 TimeoutError,
16 Future,
17 Executor,
18 wait,
19 as_completed,
20 )
21 from .thread import ThreadPoolExecutor
18 22
19 23 try:
20 from concurrent.futures.process import ProcessPoolExecutor
24 from .process import ProcessPoolExecutor
21 25 except ImportError:
22 26 # some platforms don't have multiprocessing
23 27 pass
@@ -1,667 +1,669 b''
1 1 # Copyright 2009 Brian Quinlan. All Rights Reserved.
2 2 # Licensed to PSF under a Contributor Agreement.
3 3
4 from __future__ import absolute_import
5
4 6 import collections
5 7 import logging
6 8 import threading
7 9 import itertools
8 10 import time
9 11 import types
10 12
__author__ = 'Brian Quinlan (brian@sweetapp.com)'

# Return-condition sentinels accepted by wait(); _AS_COMPLETED is used
# only internally, by as_completed().
FIRST_COMPLETED = 'FIRST_COMPLETED'
FIRST_EXCEPTION = 'FIRST_EXCEPTION'
ALL_COMPLETED = 'ALL_COMPLETED'
_AS_COMPLETED = '_AS_COMPLETED'

# Possible future states (for internal use by the futures package).
PENDING = 'PENDING'
RUNNING = 'RUNNING'
# The future was cancelled by the user...
CANCELLED = 'CANCELLED'
# ...and _Waiter.add_cancelled() was called by a worker.
CANCELLED_AND_NOTIFIED = 'CANCELLED_AND_NOTIFIED'
FINISHED = 'FINISHED'

_FUTURE_STATES = [
    PENDING,
    RUNNING,
    CANCELLED,
    CANCELLED_AND_NOTIFIED,
    FINISHED
]

# Human-readable state names used by Future.__repr__(); note that both
# cancellation states intentionally display as plain "cancelled".
_STATE_TO_DESCRIPTION_MAP = {
    PENDING: "pending",
    RUNNING: "running",
    CANCELLED: "cancelled",
    CANCELLED_AND_NOTIFIED: "cancelled",
    FINISHED: "finished"
}

# Logger for internal use by the futures package.
LOGGER = logging.getLogger("concurrent.futures")
45 47
class Error(Exception):
    """Root of the exception hierarchy used by the futures package."""
49 51
class CancelledError(Error):
    """Raised when a result is requested from a Future that was cancelled."""
53 55
class TimeoutError(Error):
    """Raised when a blocking operation exceeds its given deadline."""
57 59
58 60 class _Waiter(object):
59 61 """Provides the event that wait() and as_completed() block on."""
60 62 def __init__(self):
61 63 self.event = threading.Event()
62 64 self.finished_futures = []
63 65
64 66 def add_result(self, future):
65 67 self.finished_futures.append(future)
66 68
67 69 def add_exception(self, future):
68 70 self.finished_futures.append(future)
69 71
70 72 def add_cancelled(self, future):
71 73 self.finished_futures.append(future)
72 74
class _AsCompletedWaiter(_Waiter):
    """Waiter used by as_completed(); wakes the caller on every completion."""

    def __init__(self):
        super(_AsCompletedWaiter, self).__init__()
        # Serializes appends to finished_futures against as_completed()
        # draining the list in the consumer thread.
        self.lock = threading.Lock()

    def _locked_record(self, record, future):
        # Record the completion and wake the caller, all under the lock.
        with self.lock:
            record(future)
            self.event.set()

    def add_result(self, future):
        self._locked_record(
            super(_AsCompletedWaiter, self).add_result, future)

    def add_exception(self, future):
        self._locked_record(
            super(_AsCompletedWaiter, self).add_exception, future)

    def add_cancelled(self, future):
        self._locked_record(
            super(_AsCompletedWaiter, self).add_cancelled, future)
94 96
class _FirstCompletedWaiter(_Waiter):
    """Waiter used by wait(return_when=FIRST_COMPLETED)."""

    def _record_and_wake(self, record, future):
        record(future)
        # A single completion of any kind is enough to release the waiter.
        self.event.set()

    def add_result(self, future):
        self._record_and_wake(
            super(_FirstCompletedWaiter, self).add_result, future)

    def add_exception(self, future):
        self._record_and_wake(
            super(_FirstCompletedWaiter, self).add_exception, future)

    def add_cancelled(self, future):
        self._record_and_wake(
            super(_FirstCompletedWaiter, self).add_cancelled, future)
109 111
class _AllCompletedWaiter(_Waiter):
    """Waiter used by wait() with FIRST_EXCEPTION or ALL_COMPLETED."""

    def __init__(self, num_pending_calls, stop_on_exception):
        # Number of futures that must still finish before the event is set.
        self.num_pending_calls = num_pending_calls
        # When True (FIRST_EXCEPTION), any raised exception wakes the
        # waiter immediately instead of counting down.
        self.stop_on_exception = stop_on_exception
        # Guards the countdown against concurrent decrements from workers.
        self.lock = threading.Lock()
        super(_AllCompletedWaiter, self).__init__()

    def _count_down(self):
        with self.lock:
            self.num_pending_calls -= 1
            if not self.num_pending_calls:
                self.event.set()

    def add_result(self, future):
        super(_AllCompletedWaiter, self).add_result(future)
        self._count_down()

    def add_exception(self, future):
        super(_AllCompletedWaiter, self).add_exception(future)
        if self.stop_on_exception:
            self.event.set()
        else:
            self._count_down()

    def add_cancelled(self, future):
        super(_AllCompletedWaiter, self).add_cancelled(future)
        self._count_down()
139 141
140 142 class _AcquireFutures(object):
141 143 """A context manager that does an ordered acquire of Future conditions."""
142 144
143 145 def __init__(self, futures):
144 146 self.futures = sorted(futures, key=id)
145 147
146 148 def __enter__(self):
147 149 for future in self.futures:
148 150 future._condition.acquire()
149 151
150 152 def __exit__(self, *args):
151 153 for future in self.futures:
152 154 future._condition.release()
153 155
def _create_and_install_waiters(fs, return_when):
    """Build the _Waiter matching *return_when* and register it on every
    future in *fs*.  Returns the installed waiter."""
    if return_when == _AS_COMPLETED:
        waiter = _AsCompletedWaiter()
    elif return_when == FIRST_COMPLETED:
        waiter = _FirstCompletedWaiter()
    elif return_when in (FIRST_EXCEPTION, ALL_COMPLETED):
        # Only futures that can still change state count as pending.
        pending_count = sum(
            f._state not in [CANCELLED_AND_NOTIFIED, FINISHED] for f in fs)
        waiter = _AllCompletedWaiter(
            pending_count,
            stop_on_exception=(return_when == FIRST_EXCEPTION))
    else:
        raise ValueError("Invalid return condition: %r" % return_when)

    for f in fs:
        f._waiters.append(waiter)

    return waiter
174 176
175 177
def _yield_finished_futures(fs, waiter, ref_collect):
    """
    Iterate on the list *fs*, yielding finished futures one by one in
    reverse order.
    Before yielding a future, *waiter* is removed from its waiters
    and the future is removed from each set in the collection of sets
    *ref_collect*.

    The aim of this function is to avoid keeping stale references after
    the future is yielded and before the iterator resumes.
    """
    while fs:
        # Peek at the last element; it is only popped at the yield below.
        f = fs[-1]
        for futures_set in ref_collect:
            futures_set.remove(f)
        with f._condition:
            f._waiters.remove(waiter)
        # Drop the local name so this frame holds no reference to the
        # future while the generator is suspended at the yield.
        del f
        # Careful not to keep a reference to the popped value
        yield fs.pop()
196 198
197 199
def as_completed(fs, timeout=None):
    """An iterator over the given futures that yields each as it completes.

    Args:
        fs: The sequence of Futures (possibly created by different Executors) to
            iterate over.
        timeout: The maximum number of seconds to wait. If None, then there
            is no limit on the wait time.

    Returns:
        An iterator that yields the given Futures as they complete (finished or
        cancelled). If any given Futures are duplicated, they will be returned
        once.

    Raises:
        TimeoutError: If the entire result iterator could not be generated
            before the given timeout.
    """
    if timeout is not None:
        end_time = timeout + time.time()

    # Deduplicate: each future is yielded at most once.
    fs = set(fs)
    total_futures = len(fs)
    # Hold every future's condition while snapshotting states and installing
    # the waiter, so no completion can slip between the two steps.
    with _AcquireFutures(fs):
        finished = set(
                f for f in fs
                if f._state in [CANCELLED_AND_NOTIFIED, FINISHED])
        pending = fs - finished
        waiter = _create_and_install_waiters(fs, _AS_COMPLETED)
    finished = list(finished)
    try:
        for f in _yield_finished_futures(finished, waiter,
                                         ref_collect=(fs,)):
            # Wrap in a single-item list and pop it at the yield so this
            # frame keeps no reference to the future while suspended.
            f = [f]
            yield f.pop()

        while pending:
            if timeout is None:
                wait_timeout = None
            else:
                wait_timeout = end_time - time.time()
                if wait_timeout < 0:
                    raise TimeoutError(
                            '%d (of %d) futures unfinished' % (
                            len(pending), total_futures))

            waiter.event.wait(wait_timeout)

            # Atomically drain the waiter's batch of newly finished futures.
            with waiter.lock:
                finished = waiter.finished_futures
                waiter.finished_futures = []
                waiter.event.clear()

            # reverse to keep finishing order
            finished.reverse()
            for f in _yield_finished_futures(finished, waiter,
                                             ref_collect=(fs, pending)):
                # Same no-stale-reference trick as above.
                f = [f]
                yield f.pop()

    finally:
        # Remove waiter from unfinished futures
        for f in fs:
            with f._condition:
                f._waiters.remove(waiter)
263 265
# Result type of wait(): a named pair of future sets (done, not_done).
DoneAndNotDoneFutures = collections.namedtuple(
    'DoneAndNotDoneFutures', 'done not_done')
def wait(fs, timeout=None, return_when=ALL_COMPLETED):
    """Wait for the futures in the given sequence to complete.

    Args:
        fs: The sequence of Futures (possibly created by different Executors) to
            wait upon.
        timeout: The maximum number of seconds to wait. If None, then there
            is no limit on the wait time.
        return_when: Indicates when this function should return. The options
            are:

            FIRST_COMPLETED - Return when any future finishes or is
                              cancelled.
            FIRST_EXCEPTION - Return when any future finishes by raising an
                              exception. If no future raises an exception
                              then it is equivalent to ALL_COMPLETED.
            ALL_COMPLETED -   Return when all futures finish or are cancelled.

    Returns:
        A named 2-tuple of sets. The first set, named 'done', contains the
        futures that completed (is finished or cancelled) before the wait
        completed. The second set, named 'not_done', contains uncompleted
        futures.
    """
    # Hold every future's condition so the state snapshot and the waiter
    # installation below are atomic with respect to completions.
    with _AcquireFutures(fs):
        done = set(f for f in fs
                   if f._state in [CANCELLED_AND_NOTIFIED, FINISHED])
        not_done = set(fs) - done

        # Fast paths: the requested condition may already hold.
        if (return_when == FIRST_COMPLETED) and done:
            return DoneAndNotDoneFutures(done, not_done)
        elif (return_when == FIRST_EXCEPTION) and done:
            if any(f for f in done
                   if not f.cancelled() and f.exception() is not None):
                return DoneAndNotDoneFutures(done, not_done)

        if len(done) == len(fs):
            return DoneAndNotDoneFutures(done, not_done)

        waiter = _create_and_install_waiters(fs, return_when)

    waiter.event.wait(timeout)
    # Uninstall the waiter whether or not the event fired (timeout case).
    for f in fs:
        with f._condition:
            f._waiters.remove(waiter)

    done.update(waiter.finished_futures)
    return DoneAndNotDoneFutures(done, set(fs) - done)
314 316
class Future(object):
    """Represents the result of an asynchronous computation."""

    def __init__(self):
        """Initializes the future. Should not be called by clients."""
        # Guards all mutable state below and is notified on completion.
        self._condition = threading.Condition()
        self._state = PENDING
        self._result = None
        self._exception = None
        self._traceback = None
        # _Waiter objects installed by wait()/as_completed().
        self._waiters = []
        # Callables registered via add_done_callback().
        self._done_callbacks = []

    def _invoke_callbacks(self):
        # Run every registered done-callback; a failing callback is logged,
        # never propagated to the code that completed the future.
        for callback in self._done_callbacks:
            try:
                callback(self)
            except Exception:
                LOGGER.exception('exception calling callback for %r', self)
            except BaseException:
                # Explicitly let all other new-style exceptions through so
                # that we can catch all old-style exceptions with a simple
                # "except:" clause below.
                #
                # All old-style exception objects are instances of
                # types.InstanceType, but "except types.InstanceType:" does
                # not catch old-style exceptions for some reason.  Thus, the
                # only way to catch all old-style exceptions without catching
                # any new-style exceptions is to filter out the new-style
                # exceptions, which all derive from BaseException.
                raise
            except:
                # Because of the BaseException clause above, this handler only
                # executes for old-style exception objects.
                LOGGER.exception('exception calling callback for %r', self)

    def __repr__(self):
        with self._condition:
            if self._state == FINISHED:
                if self._exception:
                    return '<%s at %#x state=%s raised %s>' % (
                        self.__class__.__name__,
                        id(self),
                        _STATE_TO_DESCRIPTION_MAP[self._state],
                        self._exception.__class__.__name__)
                else:
                    return '<%s at %#x state=%s returned %s>' % (
                        self.__class__.__name__,
                        id(self),
                        _STATE_TO_DESCRIPTION_MAP[self._state],
                        self._result.__class__.__name__)
            return '<%s at %#x state=%s>' % (
                self.__class__.__name__,
                id(self),
                _STATE_TO_DESCRIPTION_MAP[self._state])

    def cancel(self):
        """Cancel the future if possible.

        Returns True if the future was cancelled, False otherwise. A future
        cannot be cancelled if it is running or has already completed.
        """
        with self._condition:
            if self._state in [RUNNING, FINISHED]:
                return False

            if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
                # Cancelling twice is idempotent.
                return True

            self._state = CANCELLED
            self._condition.notify_all()

        # Callbacks run outside the condition to avoid deadlocks if a
        # callback touches this future again.
        self._invoke_callbacks()
        return True

    def cancelled(self):
        """Return True if the future was cancelled."""
        with self._condition:
            return self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]

    def running(self):
        """Return True if the future is currently executing."""
        with self._condition:
            return self._state == RUNNING

    def done(self):
        """Return True if the future was cancelled or finished executing."""
        with self._condition:
            return self._state in [CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED]

    def __get_result(self):
        # Internal: re-raise the stored exception (with its original
        # traceback, Python 2 three-argument raise) or return the result.
        if self._exception:
            if isinstance(self._exception, types.InstanceType):
                # The exception is an instance of an old-style class, which
                # means type(self._exception) returns types.ClassType instead
                # of the exception's actual class type.
                exception_type = self._exception.__class__
            else:
                exception_type = type(self._exception)
            raise exception_type, self._exception, self._traceback
        else:
            return self._result

    def add_done_callback(self, fn):
        """Attaches a callable that will be called when the future finishes.

        Args:
            fn: A callable that will be called with this future as its only
                argument when the future completes or is cancelled. The callable
                will always be called by a thread in the same process in which
                it was added. If the future has already completed or been
                cancelled then the callable will be called immediately. These
                callables are called in the order that they were added.
        """
        with self._condition:
            if self._state not in [CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED]:
                self._done_callbacks.append(fn)
                return
        # Already done: invoke immediately, outside the condition.
        fn(self)

    def result(self, timeout=None):
        """Return the result of the call that the future represents.

        Args:
            timeout: The number of seconds to wait for the result if the future
                isn't done. If None, then there is no limit on the wait time.

        Returns:
            The result of the call that the future represents.

        Raises:
            CancelledError: If the future was cancelled.
            TimeoutError: If the future didn't finish executing before the given
                timeout.
            Exception: If the call raised then that exception will be raised.
        """
        with self._condition:
            if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
                raise CancelledError()
            elif self._state == FINISHED:
                return self.__get_result()

            self._condition.wait(timeout)

            # Re-check the state after the wait: it either completed, was
            # cancelled, or the wait timed out.
            if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
                raise CancelledError()
            elif self._state == FINISHED:
                return self.__get_result()
            else:
                raise TimeoutError()

    def exception_info(self, timeout=None):
        """Return a tuple of (exception, traceback) raised by the call that the
        future represents.

        Args:
            timeout: The number of seconds to wait for the exception if the
                future isn't done. If None, then there is no limit on the wait
                time.

        Returns:
            A tuple (exception, traceback) for the call that the future
            represents, or (None, None) if the call completed without raising.

        Raises:
            CancelledError: If the future was cancelled.
            TimeoutError: If the future didn't finish executing before the given
                timeout.
        """
        with self._condition:
            if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
                raise CancelledError()
            elif self._state == FINISHED:
                return self._exception, self._traceback

            self._condition.wait(timeout)

            if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
                raise CancelledError()
            elif self._state == FINISHED:
                return self._exception, self._traceback
            else:
                raise TimeoutError()

    def exception(self, timeout=None):
        """Return the exception raised by the call that the future represents.

        Args:
            timeout: The number of seconds to wait for the exception if the
                future isn't done. If None, then there is no limit on the wait
                time.

        Returns:
            The exception raised by the call that the future represents or None
            if the call completed without raising.

        Raises:
            CancelledError: If the future was cancelled.
            TimeoutError: If the future didn't finish executing before the given
                timeout.
        """
        return self.exception_info(timeout)[0]

    # The following methods should only be used by Executors and in tests.
    def set_running_or_notify_cancel(self):
        """Mark the future as running or process any cancel notifications.

        Should only be used by Executor implementations and unit tests.

        If the future has been cancelled (cancel() was called and returned
        True) then any threads waiting on the future completing (through calls
        to as_completed() or wait()) are notified and False is returned.

        If the future was not cancelled then it is put in the running state
        (future calls to running() will return True) and True is returned.

        This method should be called by Executor implementations before
        executing the work associated with this future. If this method returns
        False then the work should not be executed.

        Returns:
            False if the Future was cancelled, True otherwise.

        Raises:
            RuntimeError: if this method was already called or if set_result()
                or set_exception() was called.
        """
        with self._condition:
            if self._state == CANCELLED:
                self._state = CANCELLED_AND_NOTIFIED
                for waiter in self._waiters:
                    waiter.add_cancelled(self)
                # self._condition.notify_all() is not necessary because
                # self.cancel() triggers a notification.
                return False
            elif self._state == PENDING:
                self._state = RUNNING
                return True
            else:
                LOGGER.critical('Future %s in unexpected state: %s',
                                id(self),
                                self._state)
                raise RuntimeError('Future in unexpected state')

    def set_result(self, result):
        """Sets the return value of work associated with the future.

        Should only be used by Executor implementations and unit tests.
        """
        with self._condition:
            self._result = result
            self._state = FINISHED
            for waiter in self._waiters:
                waiter.add_result(self)
            self._condition.notify_all()
        self._invoke_callbacks()

    def set_exception_info(self, exception, traceback):
        """Sets the result of the future as being the given exception
        and traceback.

        Should only be used by Executor implementations and unit tests.
        """
        with self._condition:
            self._exception = exception
            self._traceback = traceback
            self._state = FINISHED
            for waiter in self._waiters:
                waiter.add_exception(self)
            self._condition.notify_all()
        self._invoke_callbacks()

    def set_exception(self, exception):
        """Sets the result of the future as being the given exception.

        Should only be used by Executor implementations and unit tests.
        """
        self.set_exception_info(exception, None)
593 595
class Executor(object):
    """This is an abstract base class for concrete asynchronous executors."""

    def submit(self, fn, *args, **kwargs):
        """Submits a callable to be executed with the given arguments.

        Schedules the callable to be executed as fn(*args, **kwargs) and returns
        a Future instance representing the execution of the callable.

        Returns:
            A Future representing the given call.
        """
        raise NotImplementedError()

    def map(self, fn, *iterables, **kwargs):
        """Returns an iterator equivalent to map(fn, iter).

        Args:
            fn: A callable that will take as many arguments as there are
                passed iterables.
            timeout: The maximum number of seconds to wait. If None, then there
                is no limit on the wait time. (Keyword-only, passed via
                **kwargs because Python 2 has no keyword-only syntax.)

        Returns:
            An iterator equivalent to: map(func, *iterables) but the calls may
            be evaluated out-of-order.

        Raises:
            TimeoutError: If the entire result iterator could not be generated
                before the given timeout.
            Exception: If fn(*args) raises for any values.
        """
        timeout = kwargs.get('timeout')
        if timeout is not None:
            end_time = timeout + time.time()

        # All calls are submitted eagerly, before any result is consumed.
        fs = [self.submit(fn, *args) for args in itertools.izip(*iterables)]

        # Yield must be hidden in closure so that the futures are submitted
        # before the first iterator value is required.
        def result_iterator():
            try:
                # reverse to keep finishing order
                fs.reverse()
                while fs:
                    # Careful not to keep a reference to the popped future
                    if timeout is None:
                        yield fs.pop().result()
                    else:
                        # The deadline is shared across all results.
                        yield fs.pop().result(end_time - time.time())
            finally:
                # Abandoning the iterator early cancels outstanding calls.
                for future in fs:
                    future.cancel()
        return result_iterator()

    def shutdown(self, wait=True):
        """Clean-up the resources associated with the Executor.

        It is safe to call this method several times. Otherwise, no other
        methods can be called after this one.

        Args:
            wait: If True then shutdown will not return until all running
                futures have finished executing and the resources used by the
                executor have been reclaimed.
        """
        pass

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.shutdown(wait=True)
        # Never suppress exceptions raised inside the with-block.
        return False
@@ -1,363 +1,365 b''
1 1 # Copyright 2009 Brian Quinlan. All Rights Reserved.
2 2 # Licensed to PSF under a Contributor Agreement.
3 3
4 4 """Implements ProcessPoolExecutor.
5 5
6 6 The follow diagram and text describe the data-flow through the system:
7 7
8 8 |======================= In-process =====================|== Out-of-process ==|
9 9
10 10 +----------+ +----------+ +--------+ +-----------+ +---------+
11 11 | | => | Work Ids | => | | => | Call Q | => | |
12 12 | | +----------+ | | +-----------+ | |
13 13 | | | ... | | | | ... | | |
14 14 | | | 6 | | | | 5, call() | | |
15 15 | | | 7 | | | | ... | | |
16 16 | Process | | ... | | Local | +-----------+ | Process |
17 17 | Pool | +----------+ | Worker | | #1..n |
18 18 | Executor | | Thread | | |
19 19 | | +----------- + | | +-----------+ | |
20 20 | | <=> | Work Items | <=> | | <= | Result Q | <= | |
21 21 | | +------------+ | | +-----------+ | |
22 22 | | | 6: call() | | | | ... | | |
23 23 | | | future | | | | 4, result | | |
24 24 | | | ... | | | | 3, except | | |
25 25 +----------+ +------------+ +--------+ +-----------+ +---------+
26 26
27 27 Executor.submit() called:
28 28 - creates a uniquely numbered _WorkItem and adds it to the "Work Items" dict
29 29 - adds the id of the _WorkItem to the "Work Ids" queue
30 30
31 31 Local worker thread:
32 32 - reads work ids from the "Work Ids" queue and looks up the corresponding
33 33 WorkItem from the "Work Items" dict: if the work item has been cancelled then
34 34 it is simply removed from the dict, otherwise it is repackaged as a
35 35 _CallItem and put in the "Call Q". New _CallItems are put in the "Call Q"
36 36 until "Call Q" is full. NOTE: the size of the "Call Q" is kept small because
37 37 calls placed in the "Call Q" can no longer be cancelled with Future.cancel().
38 38 - reads _ResultItems from "Result Q", updates the future stored in the
39 39 "Work Items" dict and deletes the dict entry
40 40
41 41 Process #1..n:
42 42 - reads _CallItems from "Call Q", executes the calls, and puts the resulting
43 43 _ResultItems in "Request Q"
44 44 """
45 45
46 from __future__ import absolute_import
47
46 48 import atexit
47 from concurrent.futures import _base
49 from . import _base
48 50 import Queue as queue
49 51 import multiprocessing
50 52 import threading
51 53 import weakref
52 54 import sys
53 55
54 56 __author__ = 'Brian Quinlan (brian@sweetapp.com)'
55 57
56 58 # Workers are created as daemon threads and processes. This is done to allow the
57 59 # interpreter to exit when there are still idle processes in a
58 60 # ProcessPoolExecutor's process pool (i.e. shutdown() was not called). However,
59 61 # allowing workers to die with the interpreter has two undesirable properties:
60 62 # - The workers would still be running during interpreter shutdown,
61 63 # meaning that they would fail in unpredictable ways.
62 64 # - The workers could be killed while evaluating a work item, which could
63 65 # be bad if the callable being evaluated has external side-effects e.g.
64 66 # writing to a file.
65 67 #
66 68 # To work around this problem, an exit handler is installed which tells the
67 69 # workers to exit when their work queues are empty and then waits until the
68 70 # threads/processes finish.
69 71
# Maps each queue-management thread to its call queue so _python_exit()
# can wake the threads at interpreter shutdown; weak keys let dead
# executors' threads drop out automatically.
_threads_queues = weakref.WeakKeyDictionary()
# Set to True once the interpreter starts shutting down.
_shutdown = False
72 74
def _python_exit():
    """Interpreter-shutdown hook: wake every management thread, then wait
    for each of them to terminate."""
    global _shutdown
    _shutdown = True
    entries = list(_threads_queues.items()) if _threads_queues else ()
    # First post a wake-up sentinel to every thread's queue...
    for _thread, work_queue in entries:
        work_queue.put(None)
    # ...then join each thread so shutdown does not race the workers.
    for worker_thread, _queue in entries:
        worker_thread.join(sys.maxint)
81 83
# Controls how many more calls than processes will be queued in the call queue.
# A smaller number will mean that processes spend more time idle waiting for
# work while a larger number will make Future.cancel() succeed less frequently
# (Futures in the call queue cannot be cancelled). In other words, this
# constant trades scheduling latency against cancellability.
EXTRA_QUEUED_CALLS = 1
87 89
88 90 class _WorkItem(object):
89 91 def __init__(self, future, fn, args, kwargs):
90 92 self.future = future
91 93 self.fn = fn
92 94 self.args = args
93 95 self.kwargs = kwargs
94 96
95 97 class _ResultItem(object):
96 98 def __init__(self, work_id, exception=None, result=None):
97 99 self.work_id = work_id
98 100 self.exception = exception
99 101 self.result = result
100 102
101 103 class _CallItem(object):
102 104 def __init__(self, work_id, fn, args, kwargs):
103 105 self.work_id = work_id
104 106 self.fn = fn
105 107 self.args = args
106 108 self.kwargs = kwargs
107 109
def _process_worker(call_queue, result_queue):
    """Evaluates calls from call_queue and places the results in result_queue.

    This worker is run in a separate process.

    Args:
        call_queue: A multiprocessing.Queue of _CallItems that will be read and
            evaluated by the worker.
        result_queue: A multiprocessing.Queue of _ResultItems that will be
            written to by the worker.

    A None read from call_queue is the shutdown sentinel: the worker echoes
    None into result_queue (to wake the queue-management thread) and exits.
    """
    while True:
        call_item = call_queue.get(block=True)
        if call_item is None:
            # Wake up queue management thread
            result_queue.put(None)
            return
        try:
            r = call_item.fn(*call_item.args, **call_item.kwargs)
        except:
            # Bare except so old-style (pre-BaseException) Python 2
            # exceptions are captured too; sys.exc_info() retrieves it.
            e = sys.exc_info()[1]
            result_queue.put(_ResultItem(call_item.work_id,
                                         exception=e))
        else:
            result_queue.put(_ResultItem(call_item.work_id,
                                         result=r))
136 138
def _add_call_item_to_queue(pending_work_items,
                            work_ids,
                            call_queue):
    """Fills call_queue with _WorkItems from pending_work_items.

    This function never blocks.

    Args:
        pending_work_items: A dict mapping work ids to _WorkItems e.g.
            {5: <_WorkItem...>, 6: <_WorkItem...>, ...}
        work_ids: A queue.Queue of work ids e.g. Queue([5, 6, ...]). Work ids
            are consumed and the corresponding _WorkItems from
            pending_work_items are transformed into _CallItems and put in
            call_queue.
        call_queue: A multiprocessing.Queue that will be filled with _CallItems
            derived from _WorkItems.
    """
    while True:
        # Stop once the (deliberately small) call queue is full; pending
        # items stay cancellable until they are moved onto the call queue.
        if call_queue.full():
            return
        try:
            work_id = work_ids.get(block=False)
        except queue.Empty:
            return
        else:
            work_item = pending_work_items[work_id]

            if work_item.future.set_running_or_notify_cancel():
                # Future transitioned to RUNNING: ship the call to a worker.
                call_queue.put(_CallItem(work_id,
                                         work_item.fn,
                                         work_item.args,
                                         work_item.kwargs),
                               block=True)
            else:
                # The future was cancelled before it started; drop it.
                del pending_work_items[work_id]
                continue
173 175
def _queue_management_worker(executor_reference,
                             processes,
                             pending_work_items,
                             work_ids_queue,
                             call_queue,
                             result_queue):
    """Manages the communication between this process and the worker processes.

    This function is run in a local thread.

    Args:
        executor_reference: A weakref.ref to the ProcessPoolExecutor that owns
            this thread. Used to determine if the ProcessPoolExecutor has been
            garbage collected and that this function can exit.
        processes: A list of the multiprocessing.Process instances used as
            workers.
        pending_work_items: A dict mapping work ids to _WorkItems e.g.
            {5: <_WorkItem...>, 6: <_WorkItem...>, ...}
        work_ids_queue: A queue.Queue of work ids e.g. Queue([5, 6, ...]).
        call_queue: A multiprocessing.Queue that will be filled with _CallItems
            derived from _WorkItems for processing by the process workers.
        result_queue: A multiprocessing.Queue of _ResultItems generated by the
            process workers.
    """
    # One-element list so the nested helper can mutate the counter
    # (Python 2 has no 'nonlocal').
    nb_shutdown_processes = [0]
    def shutdown_one_process():
        """Tell a worker to terminate, which will in turn wake us again"""
        # A None call item is the sentinel that makes a worker process exit.
        call_queue.put(None)
        nb_shutdown_processes[0] += 1
    while True:
        _add_call_item_to_queue(pending_work_items,
                                work_ids_queue,
                                call_queue)

        # Blocks until a worker produces a result or a wakeup sentinel
        # (None) is posted on the result queue.
        result_item = result_queue.get(block=True)
        if result_item is not None:
            work_item = pending_work_items[result_item.work_id]
            del pending_work_items[result_item.work_id]

            if result_item.exception:
                work_item.future.set_exception(result_item.exception)
            else:
                work_item.future.set_result(result_item.result)
            # Delete references to object. See issue16284
            del work_item
        # Check whether we should start shutting down.
        executor = executor_reference()
        # No more work items can be added if:
        #   - The interpreter is shutting down OR
        #   - The executor that owns this worker has been collected OR
        #   - The executor that owns this worker has been shutdown.
        if _shutdown or executor is None or executor._shutdown_thread:
            # Since no new work items can be added, it is safe to shutdown
            # this thread if there are no pending work items.
            if not pending_work_items:
                while nb_shutdown_processes[0] < len(processes):
                    shutdown_one_process()
                # If .join() is not called on the created processes then
                # some multiprocessing.Queue methods may deadlock on Mac OS
                # X.
                for p in processes:
                    p.join()
                call_queue.close()
                return
        # Drop the strong reference so the executor can be garbage
        # collected while this thread blocks on result_queue.get().
        del executor
239 241
240 242 _system_limits_checked = False
241 243 _system_limited = None
242 244 def _check_system_limits():
243 245 global _system_limits_checked, _system_limited
244 246 if _system_limits_checked:
245 247 if _system_limited:
246 248 raise NotImplementedError(_system_limited)
247 249 _system_limits_checked = True
248 250 try:
249 251 import os
250 252 nsems_max = os.sysconf("SC_SEM_NSEMS_MAX")
251 253 except (AttributeError, ValueError):
252 254 # sysconf not available or setting not available
253 255 return
254 256 if nsems_max == -1:
255 257 # indetermine limit, assume that limit is determined
256 258 # by available memory only
257 259 return
258 260 if nsems_max >= 256:
259 261 # minimum number of semaphores available
260 262 # according to POSIX
261 263 return
262 264 _system_limited = "system provides too few semaphores (%d available, 256 necessary)" % nsems_max
263 265 raise NotImplementedError(_system_limited)
264 266
265 267
class ProcessPoolExecutor(_base.Executor):
    def __init__(self, max_workers=None):
        """Initializes a new ProcessPoolExecutor instance.

        Args:
            max_workers: The maximum number of processes that can be used to
                execute the given calls. If None or not given then as many
                worker processes will be created as the machine has processors.

        Raises:
            NotImplementedError: if the platform cannot provide the
                semaphores multiprocessing needs (see _check_system_limits).
            ValueError: if max_workers is given and is not positive.
        """
        _check_system_limits()

        if max_workers is None:
            self._max_workers = multiprocessing.cpu_count()
        else:
            if max_workers <= 0:
                raise ValueError("max_workers must be greater than 0")

            self._max_workers = max_workers

        # Make the call queue slightly larger than the number of processes to
        # prevent the worker processes from idling. But don't make it too big
        # because futures in the call queue cannot be cancelled.
        self._call_queue = multiprocessing.Queue(self._max_workers +
                                                 EXTRA_QUEUED_CALLS)
        self._result_queue = multiprocessing.Queue()
        # Work ids waiting to be moved onto the call queue by the
        # queue-management thread.
        self._work_ids = queue.Queue()
        # Created lazily by _start_queue_management_thread().
        self._queue_management_thread = None
        self._processes = set()

        # Shutdown is a two-step process.
        self._shutdown_thread = False
        self._shutdown_lock = threading.Lock()
        # Monotonically increasing id handed to each submitted work item.
        self._queue_count = 0
        # Maps work id -> _WorkItem for everything not yet completed.
        self._pending_work_items = {}

    def _start_queue_management_thread(self):
        """Start the single queue-management thread on first use."""
        # When the executor gets lost, the weakref callback will wake up
        # the queue management thread.
        def weakref_cb(_, q=self._result_queue):
            q.put(None)
        if self._queue_management_thread is None:
            self._queue_management_thread = threading.Thread(
                target=_queue_management_worker,
                args=(weakref.ref(self, weakref_cb),
                      self._processes,
                      self._pending_work_items,
                      self._work_ids,
                      self._call_queue,
                      self._result_queue))
            # Daemonized so a never-shutdown executor cannot block exit;
            # _python_exit() joins threads registered in _threads_queues.
            self._queue_management_thread.daemon = True
            self._queue_management_thread.start()
            _threads_queues[self._queue_management_thread] = self._result_queue

    def _adjust_process_count(self):
        """Spawn worker processes until _max_workers are running."""
        for _ in range(len(self._processes), self._max_workers):
            p = multiprocessing.Process(
                target=_process_worker,
                args=(self._call_queue,
                      self._result_queue))
            p.start()
            self._processes.add(p)

    def submit(self, fn, *args, **kwargs):
        with self._shutdown_lock:
            if self._shutdown_thread:
                raise RuntimeError('cannot schedule new futures after shutdown')

            f = _base.Future()
            w = _WorkItem(f, fn, args, kwargs)

            self._pending_work_items[self._queue_count] = w
            self._work_ids.put(self._queue_count)
            self._queue_count += 1
            # Wake up queue management thread
            self._result_queue.put(None)

            self._start_queue_management_thread()
            self._adjust_process_count()
            return f
    submit.__doc__ = _base.Executor.submit.__doc__

    def shutdown(self, wait=True):
        with self._shutdown_lock:
            self._shutdown_thread = True
        if self._queue_management_thread:
            # Wake up queue management thread
            self._result_queue.put(None)
            if wait:
                self._queue_management_thread.join(sys.maxint)
        # To reduce the risk of openning too many files, remove references to
        # objects that use file descriptors.
        self._queue_management_thread = None
        self._call_queue = None
        self._result_queue = None
        self._processes = None
    shutdown.__doc__ = _base.Executor.shutdown.__doc__
362 364
363 365 atexit.register(_python_exit)
@@ -1,160 +1,162 b''
1 1 # Copyright 2009 Brian Quinlan. All Rights Reserved.
2 2 # Licensed to PSF under a Contributor Agreement.
3 3
4 4 """Implements ThreadPoolExecutor."""
5 5
6 from __future__ import absolute_import
7
6 8 import atexit
7 from concurrent.futures import _base
9 from . import _base
8 10 import itertools
9 11 import Queue as queue
10 12 import threading
11 13 import weakref
12 14 import sys
13 15
try:
    from multiprocessing import cpu_count
except ImportError:
    # some platforms don't have multiprocessing
    def cpu_count():
        # None signals "unknown"; ThreadPoolExecutor.__init__ treats it
        # as 1 via `(cpu_count() or 1) * 5`.
        return None
20 22
21 23 __author__ = 'Brian Quinlan (brian@sweetapp.com)'
22 24
23 25 # Workers are created as daemon threads. This is done to allow the interpreter
24 26 # to exit when there are still idle threads in a ThreadPoolExecutor's thread
25 27 # pool (i.e. shutdown() was not called). However, allowing workers to die with
26 28 # the interpreter has two undesirable properties:
27 29 # - The workers would still be running during interpretor shutdown,
28 30 # meaning that they would fail in unpredictable ways.
29 31 # - The workers could be killed while evaluating a work item, which could
30 32 # be bad if the callable being evaluated has external side-effects e.g.
31 33 # writing to a file.
32 34 #
33 35 # To work around this problem, an exit handler is installed which tells the
34 36 # workers to exit when their work queues are empty and then waits until the
35 37 # threads finish.
36 38
37 39 _threads_queues = weakref.WeakKeyDictionary()
38 40 _shutdown = False
39 41
def _python_exit():
    """Atexit hook: flag shutdown, wake every registered worker thread
    with a None sentinel, then join them all."""
    global _shutdown
    _shutdown = True
    if _threads_queues:
        entries = list(_threads_queues.items())
    else:
        entries = ()
    # First wake every worker (None is the exit sentinel)...
    for _thread, work_queue in entries:
        work_queue.put(None)
    # ...then wait for each one so daemon threads don't die mid-task.
    for worker_thread, _queue in entries:
        worker_thread.join(sys.maxint)
48 50
49 51 atexit.register(_python_exit)
50 52
51 53 class _WorkItem(object):
52 54 def __init__(self, future, fn, args, kwargs):
53 55 self.future = future
54 56 self.fn = fn
55 57 self.args = args
56 58 self.kwargs = kwargs
57 59
58 60 def run(self):
59 61 if not self.future.set_running_or_notify_cancel():
60 62 return
61 63
62 64 try:
63 65 result = self.fn(*self.args, **self.kwargs)
64 66 except:
65 67 e, tb = sys.exc_info()[1:]
66 68 self.future.set_exception_info(e, tb)
67 69 else:
68 70 self.future.set_result(result)
69 71
def _worker(executor_reference, work_queue):
    """Thread target: run _WorkItems from *work_queue* until told to exit.

    Args:
        executor_reference: A weakref.ref to the owning ThreadPoolExecutor;
            dereferenced only to decide whether this thread should exit.
        work_queue: A queue.Queue of _WorkItems. A None entry is the wakeup
            or exit sentinel.
    """
    try:
        while True:
            work_item = work_queue.get(block=True)
            if work_item is not None:
                work_item.run()
                # Delete references to object. See issue16284
                del work_item
                continue
            # Got the None sentinel: decide whether to exit.
            executor = executor_reference()
            # Exit if:
            #   - The interpreter is shutting down OR
            #   - The executor that owns the worker has been collected OR
            #   - The executor that owns the worker has been shutdown.
            if _shutdown or executor is None or executor._shutdown:
                # Notice other workers
                work_queue.put(None)
                return
            # Drop the strong reference so the executor can be garbage
            # collected while this thread blocks on the queue.
            del executor
    except:
        _base.LOGGER.critical('Exception in worker', exc_info=True)
91 93
92 94
class ThreadPoolExecutor(_base.Executor):

    # Used to assign unique thread names when thread_name_prefix is not
    # supplied. (.next: this backport targets Python 2 iterators.)
    _counter = itertools.count().next

    def __init__(self, max_workers=None, thread_name_prefix=''):
        """Initializes a new ThreadPoolExecutor instance.

        Args:
            max_workers: The maximum number of threads that can be used to
                execute the given calls.
            thread_name_prefix: An optional name prefix to give our threads.

        Raises:
            ValueError: if the resulting max_workers is not positive.
        """
        if max_workers is None:
            # Use this number because ThreadPoolExecutor is often
            # used to overlap I/O instead of CPU work.
            max_workers = (cpu_count() or 1) * 5
        if max_workers <= 0:
            raise ValueError("max_workers must be greater than 0")

        self._max_workers = max_workers
        self._work_queue = queue.Queue()
        # Worker threads created so far; never exceeds _max_workers.
        self._threads = set()
        self._shutdown = False
        self._shutdown_lock = threading.Lock()
        self._thread_name_prefix = (thread_name_prefix or
                                    ("ThreadPoolExecutor-%d" % self._counter()))

    def submit(self, fn, *args, **kwargs):
        with self._shutdown_lock:
            if self._shutdown:
                raise RuntimeError('cannot schedule new futures after shutdown')

            f = _base.Future()
            w = _WorkItem(f, fn, args, kwargs)

            self._work_queue.put(w)
            self._adjust_thread_count()
            return f
    submit.__doc__ = _base.Executor.submit.__doc__

    def _adjust_thread_count(self):
        """Start one more worker thread if below the configured maximum."""
        # When the executor gets lost, the weakref callback will wake up
        # the worker threads.
        def weakref_cb(_, q=self._work_queue):
            q.put(None)
        # TODO(bquinlan): Should avoid creating new threads if there are more
        # idle threads than items in the work queue.
        num_threads = len(self._threads)
        if num_threads < self._max_workers:
            thread_name = '%s_%d' % (self._thread_name_prefix or self,
                                     num_threads)
            t = threading.Thread(name=thread_name, target=_worker,
                                 args=(weakref.ref(self, weakref_cb),
                                       self._work_queue))
            # Daemonized; _python_exit() wakes and joins registered threads.
            t.daemon = True
            t.start()
            self._threads.add(t)
            _threads_queues[t] = self._work_queue

    def shutdown(self, wait=True):
        with self._shutdown_lock:
            self._shutdown = True
            # One None sentinel suffices: _worker re-posts it for the others.
            self._work_queue.put(None)
        if wait:
            for t in self._threads:
                t.join(sys.maxint)
    shutdown.__doc__ = _base.Executor.shutdown.__doc__
General Comments 0
You need to be logged in to leave comments. Login now