##// END OF EJS Templates
thirdparty: vendor futures 3.2.0...
Gregory Szorc -
r37641:eb687c28 default
parent child Browse files
Show More
@@ -0,0 +1,48 b''
1 PYTHON SOFTWARE FOUNDATION LICENSE VERSION 2
2 --------------------------------------------
3
4 1. This LICENSE AGREEMENT is between the Python Software Foundation
5 ("PSF"), and the Individual or Organization ("Licensee") accessing and
6 otherwise using this software ("Python") in source or binary form and
7 its associated documentation.
8
9 2. Subject to the terms and conditions of this License Agreement, PSF
10 hereby grants Licensee a nonexclusive, royalty-free, world-wide
11 license to reproduce, analyze, test, perform and/or display publicly,
12 prepare derivative works, distribute, and otherwise use Python
13 alone or in any derivative version, provided, however, that PSF's
14 License Agreement and PSF's notice of copyright, i.e., "Copyright (c)
15 2001, 2002, 2003, 2004, 2005, 2006 Python Software Foundation; All Rights
16 Reserved" are retained in Python alone or in any derivative version
17 prepared by Licensee.
18
19 3. In the event Licensee prepares a derivative work that is based on
20 or incorporates Python or any part thereof, and wants to make
21 the derivative work available to others as provided herein, then
22 Licensee hereby agrees to include in any such work a brief summary of
23 the changes made to Python.
24
25 4. PSF is making Python available to Licensee on an "AS IS"
26 basis. PSF MAKES NO REPRESENTATIONS OR WARRANTIES, EXPRESS OR
27 IMPLIED. BY WAY OF EXAMPLE, BUT NOT LIMITATION, PSF MAKES NO AND
28 DISCLAIMS ANY REPRESENTATION OR WARRANTY OF MERCHANTABILITY OR FITNESS
29 FOR ANY PARTICULAR PURPOSE OR THAT THE USE OF PYTHON WILL NOT
30 INFRINGE ANY THIRD PARTY RIGHTS.
31
32 5. PSF SHALL NOT BE LIABLE TO LICENSEE OR ANY OTHER USERS OF PYTHON
33 FOR ANY INCIDENTAL, SPECIAL, OR CONSEQUENTIAL DAMAGES OR LOSS AS
34 A RESULT OF MODIFYING, DISTRIBUTING, OR OTHERWISE USING PYTHON,
35 OR ANY DERIVATIVE THEREOF, EVEN IF ADVISED OF THE POSSIBILITY THEREOF.
36
37 6. This License Agreement will automatically terminate upon a material
38 breach of its terms and conditions.
39
40 7. Nothing in this License Agreement shall be deemed to create any
41 relationship of agency, partnership, or joint venture between PSF and
42 Licensee. This License Agreement does not grant permission to use PSF
43 trademarks or trade name in a trademark sense to endorse or promote
44 products or services of Licensee, or any third party.
45
46 8. By copying, installing or otherwise using Python, Licensee
47 agrees to be bound by the terms and conditions of this License
48 Agreement.
@@ -0,0 +1,3 b''
from pkgutil import extend_path

# Make 'concurrent' a namespace-style package: extend_path() is documented to
# merge same-named packages found elsewhere on sys.path, so this vendored
# 'concurrent.futures' can coexist with another 'concurrent' distribution.
__path__ = extend_path(__path__, __name__)
@@ -0,0 +1,23 b''
1 # Copyright 2009 Brian Quinlan. All Rights Reserved.
2 # Licensed to PSF under a Contributor Agreement.
3
4 """Execute computations asynchronously using threads or processes."""
5
6 __author__ = 'Brian Quinlan (brian@sweetapp.com)'
7
8 from concurrent.futures._base import (FIRST_COMPLETED,
9 FIRST_EXCEPTION,
10 ALL_COMPLETED,
11 CancelledError,
12 TimeoutError,
13 Future,
14 Executor,
15 wait,
16 as_completed)
17 from concurrent.futures.thread import ThreadPoolExecutor
18
try:
    from concurrent.futures.process import ProcessPoolExecutor
except ImportError:
    # some platforms don't have multiprocessing; on those only the
    # thread-based executor imported above is exported from this package
    pass
This diff has been collapsed as it changes many lines, (667 lines changed) Show them Hide them
@@ -0,0 +1,667 b''
1 # Copyright 2009 Brian Quinlan. All Rights Reserved.
2 # Licensed to PSF under a Contributor Agreement.
3
4 import collections
5 import logging
6 import threading
7 import itertools
8 import time
9 import types
10
__author__ = 'Brian Quinlan (brian@sweetapp.com)'

# Conditions accepted by wait(); _AS_COMPLETED is used internally by
# as_completed() only.
FIRST_COMPLETED = 'FIRST_COMPLETED'
FIRST_EXCEPTION = 'FIRST_EXCEPTION'
ALL_COMPLETED = 'ALL_COMPLETED'
_AS_COMPLETED = '_AS_COMPLETED'

# Possible future states (for internal use by the futures package).
PENDING = 'PENDING'
RUNNING = 'RUNNING'
# CANCELLED: the future was cancelled by the user; it becomes
# CANCELLED_AND_NOTIFIED once a worker has called _Waiter.add_cancelled().
CANCELLED = 'CANCELLED'
CANCELLED_AND_NOTIFIED = 'CANCELLED_AND_NOTIFIED'
FINISHED = 'FINISHED'

_FUTURE_STATES = [
    PENDING,
    RUNNING,
    CANCELLED,
    CANCELLED_AND_NOTIFIED,
    FINISHED,
]

# Human-readable state names used by Future.__repr__(); both cancelled
# states intentionally render as "cancelled".
_STATE_TO_DESCRIPTION_MAP = {
    PENDING: "pending",
    RUNNING: "running",
    CANCELLED: "cancelled",
    CANCELLED_AND_NOTIFIED: "cancelled",
    FINISHED: "finished",
}

# Logger for internal use by the futures package.
LOGGER = logging.getLogger("concurrent.futures")
45
class Error(Exception):
    """Root of the exception hierarchy used by the futures package."""


class CancelledError(Error):
    """Raised when a result is requested from a cancelled Future."""


class TimeoutError(Error):
    """Raised when a blocking operation does not finish within its deadline."""
57
class _Waiter(object):
    """Provides the event that wait() and as_completed() block on.

    Workers append completed futures to *finished_futures*; the base class
    never sets *event* itself — each subclass decides when waiters wake up.
    """
    def __init__(self):
        self.event = threading.Event()
        self.finished_futures = []

    def add_result(self, future):
        # *future* finished by producing a result.
        self.finished_futures.append(future)

    def add_exception(self, future):
        # *future* finished by raising.
        self.finished_futures.append(future)

    def add_cancelled(self, future):
        # A worker observed that *future* was cancelled.
        self.finished_futures.append(future)
72
class _AsCompletedWaiter(_Waiter):
    """Used by as_completed().

    Every completion sets the event.  The lock is required because
    as_completed() swaps finished_futures out and clears the event under
    the same lock, while workers may be appending concurrently.
    """

    def __init__(self):
        super(_AsCompletedWaiter, self).__init__()
        self.lock = threading.Lock()

    def add_result(self, future):
        with self.lock:
            super(_AsCompletedWaiter, self).add_result(future)
            self.event.set()

    def add_exception(self, future):
        with self.lock:
            super(_AsCompletedWaiter, self).add_exception(future)
            self.event.set()

    def add_cancelled(self, future):
        with self.lock:
            super(_AsCompletedWaiter, self).add_cancelled(future)
            self.event.set()
94
class _FirstCompletedWaiter(_Waiter):
    """Used by wait(return_when=FIRST_COMPLETED).

    Any single completion — result, exception, or cancellation — sets the
    event and releases the waiter.  No lock is needed: Event.set() and the
    list append are each individually safe for this access pattern.
    """

    def add_result(self, future):
        super(_FirstCompletedWaiter, self).add_result(future)
        self.event.set()

    def add_exception(self, future):
        super(_FirstCompletedWaiter, self).add_exception(future)
        self.event.set()

    def add_cancelled(self, future):
        super(_FirstCompletedWaiter, self).add_cancelled(future)
        self.event.set()
109
class _AllCompletedWaiter(_Waiter):
    """Used by wait(return_when=FIRST_EXCEPTION and ALL_COMPLETED).

    Counts down from *num_pending_calls*; the event is set when the count
    reaches zero, or immediately on an exception when *stop_on_exception*
    is true (the FIRST_EXCEPTION mode).
    """

    def __init__(self, num_pending_calls, stop_on_exception):
        self.num_pending_calls = num_pending_calls
        self.stop_on_exception = stop_on_exception
        # Protects the num_pending_calls counter against concurrent
        # decrements from multiple worker threads.
        self.lock = threading.Lock()
        super(_AllCompletedWaiter, self).__init__()

    def _decrement_pending_calls(self):
        with self.lock:
            self.num_pending_calls -= 1
            if not self.num_pending_calls:
                self.event.set()

    def add_result(self, future):
        super(_AllCompletedWaiter, self).add_result(future)
        self._decrement_pending_calls()

    def add_exception(self, future):
        super(_AllCompletedWaiter, self).add_exception(future)
        if self.stop_on_exception:
            # FIRST_EXCEPTION: wake immediately without draining the count.
            self.event.set()
        else:
            self._decrement_pending_calls()

    def add_cancelled(self, future):
        super(_AllCompletedWaiter, self).add_cancelled(future)
        self._decrement_pending_calls()
139
class _AcquireFutures(object):
    """Context manager that acquires the conditions of many Futures at once.

    Conditions are always taken in ascending id() order, so two overlapping
    acquisitions lock the shared futures in the same order and cannot
    deadlock each other.
    """

    def __init__(self, futures):
        self.futures = sorted(futures, key=id)

    def __enter__(self):
        for fut in self.futures:
            fut._condition.acquire()

    def __exit__(self, *args):
        for fut in self.futures:
            fut._condition.release()
153
def _create_and_install_waiters(fs, return_when):
    """Build the _Waiter matching *return_when* and register it on every
    future in *fs*.  Returns the installed waiter.

    Raises ValueError for an unknown *return_when* value.
    """
    if return_when == _AS_COMPLETED:
        waiter = _AsCompletedWaiter()
    elif return_when == FIRST_COMPLETED:
        waiter = _FirstCompletedWaiter()
    elif return_when in (FIRST_EXCEPTION, ALL_COMPLETED):
        # Only futures that are not already done count towards the number
        # of completions the waiter still has to see.
        pending_count = sum(
            f._state not in [CANCELLED_AND_NOTIFIED, FINISHED] for f in fs)
        waiter = _AllCompletedWaiter(
            pending_count,
            stop_on_exception=(return_when == FIRST_EXCEPTION))
    else:
        raise ValueError("Invalid return condition: %r" % return_when)

    for f in fs:
        f._waiters.append(waiter)

    return waiter
174
175
def _yield_finished_futures(fs, waiter, ref_collect):
    """
    Iterate on the list *fs*, yielding finished futures one by one in
    reverse order.
    Before yielding a future, *waiter* is removed from its waiters
    and the future is removed from each set in the collection of sets
    *ref_collect*.

    The aim of this function is to avoid keeping stale references after
    the future is yielded and before the iterator resumes.
    """
    while fs:
        f = fs[-1]
        for futures_set in ref_collect:
            futures_set.remove(f)
        with f._condition:
            f._waiters.remove(waiter)
        # Drop our local name before yielding so this suspended generator
        # frame does not keep the future alive while the caller holds it.
        del f
        # Careful not to keep a reference to the popped value
        yield fs.pop()
196
197
def as_completed(fs, timeout=None):
    """An iterator over the given futures that yields each as it completes.

    Args:
        fs: The sequence of Futures (possibly created by different Executors) to
            iterate over.
        timeout: The maximum number of seconds to wait. If None, then there
            is no limit on the wait time.

    Returns:
        An iterator that yields the given Futures as they complete (finished or
        cancelled). If any given Futures are duplicated, they will be returned
        once.

    Raises:
        TimeoutError: If the entire result iterator could not be generated
            before the given timeout.
    """
    if timeout is not None:
        end_time = timeout + time.time()

    # De-duplicate: each distinct future is yielded at most once.
    fs = set(fs)
    total_futures = len(fs)
    # All conditions are held while the initial done/pending split is taken
    # and the waiter installed, so no completion can be missed in between.
    with _AcquireFutures(fs):
        finished = set(
                f for f in fs
                if f._state in [CANCELLED_AND_NOTIFIED, FINISHED])
        pending = fs - finished
        waiter = _create_and_install_waiters(fs, _AS_COMPLETED)
    finished = list(finished)
    try:
        for f in _yield_finished_futures(finished, waiter,
                                         ref_collect=(fs,)):
            # Wrap-and-pop so this frame holds no reference to the future
            # while suspended at the yield (see _yield_finished_futures).
            f = [f]
            yield f.pop()

        while pending:
            if timeout is None:
                wait_timeout = None
            else:
                wait_timeout = end_time - time.time()
                if wait_timeout < 0:
                    raise TimeoutError(
                            '%d (of %d) futures unfinished' % (
                            len(pending), total_futures))

            waiter.event.wait(wait_timeout)

            # Swap the finished list out under the waiter's lock so workers
            # appending concurrently cannot be lost with the event clear.
            with waiter.lock:
                finished = waiter.finished_futures
                waiter.finished_futures = []
                waiter.event.clear()

            # reverse to keep finishing order
            finished.reverse()
            for f in _yield_finished_futures(finished, waiter,
                                             ref_collect=(fs, pending)):
                f = [f]
                yield f.pop()

    finally:
        # Remove waiter from unfinished futures
        for f in fs:
            with f._condition:
                f._waiters.remove(waiter)
263
# Return type of wait(): two sets of futures, completed and not.
DoneAndNotDoneFutures = collections.namedtuple(
        'DoneAndNotDoneFutures', 'done not_done')
def wait(fs, timeout=None, return_when=ALL_COMPLETED):
    """Wait for the futures in the given sequence to complete.

    Args:
        fs: The sequence of Futures (possibly created by different Executors) to
            wait upon.
        timeout: The maximum number of seconds to wait. If None, then there
            is no limit on the wait time.
        return_when: Indicates when this function should return. The options
            are:

            FIRST_COMPLETED - Return when any future finishes or is
                              cancelled.
            FIRST_EXCEPTION - Return when any future finishes by raising an
                              exception. If no future raises an exception
                              then it is equivalent to ALL_COMPLETED.
            ALL_COMPLETED -   Return when all futures finish or are cancelled.

    Returns:
        A named 2-tuple of sets. The first set, named 'done', contains the
        futures that completed (is finished or cancelled) before the wait
        completed. The second set, named 'not_done', contains uncompleted
        futures.
    """
    with _AcquireFutures(fs):
        done = set(f for f in fs
                   if f._state in [CANCELLED_AND_NOTIFIED, FINISHED])
        not_done = set(fs) - done

        # Fast paths: the requested condition may already hold, so no
        # waiter needs to be installed at all.
        if (return_when == FIRST_COMPLETED) and done:
            return DoneAndNotDoneFutures(done, not_done)
        elif (return_when == FIRST_EXCEPTION) and done:
            if any(f for f in done
                   if not f.cancelled() and f.exception() is not None):
                return DoneAndNotDoneFutures(done, not_done)

        # NOTE(review): if *fs* contains duplicate futures, len(done) (a set)
        # can never reach len(fs), so this early exit is missed and the call
        # blocks until timeout — confirm against upstream before changing
        # vendored code.
        if len(done) == len(fs):
            return DoneAndNotDoneFutures(done, not_done)

        waiter = _create_and_install_waiters(fs, return_when)

    waiter.event.wait(timeout)
    for f in fs:
        with f._condition:
            f._waiters.remove(waiter)

    done.update(waiter.finished_futures)
    return DoneAndNotDoneFutures(done, set(fs) - done)
314
class Future(object):
    """Represents the result of an asynchronous computation."""

    def __init__(self):
        """Initializes the future. Should not be called by clients."""
        self._condition = threading.Condition()  # guards every field below
        self._state = PENDING
        self._result = None
        self._exception = None
        # Python 2 carries the traceback separately from the exception object
        # so __get_result() can re-raise with the original stack.
        self._traceback = None
        # _Waiter objects installed by wait()/as_completed().
        self._waiters = []
        self._done_callbacks = []

    def _invoke_callbacks(self):
        # Run callbacks in registration order; a failing callback is logged
        # and does not prevent the remaining callbacks from running.
        for callback in self._done_callbacks:
            try:
                callback(self)
            except Exception:
                LOGGER.exception('exception calling callback for %r', self)
            except BaseException:
                # Explicitly let all other new-style exceptions through so
                # that we can catch all old-style exceptions with a simple
                # "except:" clause below.
                #
                # All old-style exception objects are instances of
                # types.InstanceType, but "except types.InstanceType:" does
                # not catch old-style exceptions for some reason. Thus, the
                # only way to catch all old-style exceptions without catching
                # any new-style exceptions is to filter out the new-style
                # exceptions, which all derive from BaseException.
                raise
            except:
                # Because of the BaseException clause above, this handler only
                # executes for old-style exception objects.
                LOGGER.exception('exception calling callback for %r', self)

    def __repr__(self):
        with self._condition:
            if self._state == FINISHED:
                if self._exception:
                    return '<%s at %#x state=%s raised %s>' % (
                        self.__class__.__name__,
                        id(self),
                        _STATE_TO_DESCRIPTION_MAP[self._state],
                        self._exception.__class__.__name__)
                else:
                    return '<%s at %#x state=%s returned %s>' % (
                        self.__class__.__name__,
                        id(self),
                        _STATE_TO_DESCRIPTION_MAP[self._state],
                        self._result.__class__.__name__)
            return '<%s at %#x state=%s>' % (
                    self.__class__.__name__,
                    id(self),
                   _STATE_TO_DESCRIPTION_MAP[self._state])

    def cancel(self):
        """Cancel the future if possible.

        Returns True if the future was cancelled, False otherwise. A future
        cannot be cancelled if it is running or has already completed.
        """
        with self._condition:
            if self._state in [RUNNING, FINISHED]:
                return False

            # Cancelling an already-cancelled future is a no-op success.
            if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
                return True

            self._state = CANCELLED
            self._condition.notify_all()

        # Callbacks run outside the condition to avoid deadlocks if a
        # callback touches this future again.
        self._invoke_callbacks()
        return True

    def cancelled(self):
        """Return True if the future was cancelled."""
        with self._condition:
            return self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]

    def running(self):
        """Return True if the future is currently executing."""
        with self._condition:
            return self._state == RUNNING

    def done(self):
        """Return True of the future was cancelled or finished executing."""
        with self._condition:
            return self._state in [CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED]

    def __get_result(self):
        # Caller must hold self._condition and have verified state FINISHED.
        if self._exception:
            if isinstance(self._exception, types.InstanceType):
                # The exception is an instance of an old-style class, which
                # means type(self._exception) returns types.ClassType instead
                # of the exception's actual class type.
                exception_type = self._exception.__class__
            else:
                exception_type = type(self._exception)
            # Python 2 three-argument raise: re-raise with the captured
            # traceback so the original stack is preserved.
            raise exception_type, self._exception, self._traceback
        else:
            return self._result

    def add_done_callback(self, fn):
        """Attaches a callable that will be called when the future finishes.

        Args:
            fn: A callable that will be called with this future as its only
                argument when the future completes or is cancelled. The callable
                will always be called by a thread in the same process in which
                it was added. If the future has already completed or been
                cancelled then the callable will be called immediately. These
                callables are called in the order that they were added.
        """
        with self._condition:
            if self._state not in [CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED]:
                self._done_callbacks.append(fn)
                return
        # Already done: invoke immediately, outside the condition.
        fn(self)

    def result(self, timeout=None):
        """Return the result of the call that the future represents.

        Args:
            timeout: The number of seconds to wait for the result if the future
                isn't done. If None, then there is no limit on the wait time.

        Returns:
            The result of the call that the future represents.

        Raises:
            CancelledError: If the future was cancelled.
            TimeoutError: If the future didn't finish executing before the given
                timeout.
            Exception: If the call raised then that exception will be raised.
        """
        with self._condition:
            if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
                raise CancelledError()
            elif self._state == FINISHED:
                return self.__get_result()

            self._condition.wait(timeout)

            # Re-check after the wait: it may have completed, been
            # cancelled, or simply timed out.
            if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
                raise CancelledError()
            elif self._state == FINISHED:
                return self.__get_result()
            else:
                raise TimeoutError()

    def exception_info(self, timeout=None):
        """Return a tuple of (exception, traceback) raised by the call that the
        future represents.

        Args:
            timeout: The number of seconds to wait for the exception if the
                future isn't done. If None, then there is no limit on the wait
                time.

        Returns:
            The exception raised by the call that the future represents or None
            if the call completed without raising.

        Raises:
            CancelledError: If the future was cancelled.
            TimeoutError: If the future didn't finish executing before the given
                timeout.
        """
        with self._condition:
            if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
                raise CancelledError()
            elif self._state == FINISHED:
                return self._exception, self._traceback

            self._condition.wait(timeout)

            if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
                raise CancelledError()
            elif self._state == FINISHED:
                return self._exception, self._traceback
            else:
                raise TimeoutError()

    def exception(self, timeout=None):
        """Return the exception raised by the call that the future represents.

        Args:
            timeout: The number of seconds to wait for the exception if the
                future isn't done. If None, then there is no limit on the wait
                time.

        Returns:
            The exception raised by the call that the future represents or None
            if the call completed without raising.

        Raises:
            CancelledError: If the future was cancelled.
            TimeoutError: If the future didn't finish executing before the given
                timeout.
        """
        return self.exception_info(timeout)[0]

    # The following methods should only be used by Executors and in tests.
    def set_running_or_notify_cancel(self):
        """Mark the future as running or process any cancel notifications.

        Should only be used by Executor implementations and unit tests.

        If the future has been cancelled (cancel() was called and returned
        True) then any threads waiting on the future completing (though calls
        to as_completed() or wait()) are notified and False is returned.

        If the future was not cancelled then it is put in the running state
        (future calls to running() will return True) and True is returned.

        This method should be called by Executor implementations before
        executing the work associated with this future. If this method returns
        False then the work should not be executed.

        Returns:
            False if the Future was cancelled, True otherwise.

        Raises:
            RuntimeError: if this method was already called or if set_result()
                or set_exception() was called.
        """
        with self._condition:
            if self._state == CANCELLED:
                self._state = CANCELLED_AND_NOTIFIED
                for waiter in self._waiters:
                    waiter.add_cancelled(self)
                # self._condition.notify_all() is not necessary because
                # self.cancel() triggers a notification.
                return False
            elif self._state == PENDING:
                self._state = RUNNING
                return True
            else:
                LOGGER.critical('Future %s in unexpected state: %s',
                                id(self),
                                self._state)
                raise RuntimeError('Future in unexpected state')

    def set_result(self, result):
        """Sets the return value of work associated with the future.

        Should only be used by Executor implementations and unit tests.
        """
        with self._condition:
            self._result = result
            self._state = FINISHED
            for waiter in self._waiters:
                waiter.add_result(self)
            self._condition.notify_all()
        self._invoke_callbacks()

    def set_exception_info(self, exception, traceback):
        """Sets the result of the future as being the given exception
        and traceback.

        Should only be used by Executor implementations and unit tests.
        """
        with self._condition:
            self._exception = exception
            self._traceback = traceback
            self._state = FINISHED
            for waiter in self._waiters:
                waiter.add_exception(self)
            self._condition.notify_all()
        self._invoke_callbacks()

    def set_exception(self, exception):
        """Sets the result of the future as being the given exception.

        Should only be used by Executor implementations and unit tests.
        """
        # No traceback is available here; exception_info() will return
        # (exception, None) in that case.
        self.set_exception_info(exception, None)
593
class Executor(object):
    """This is an abstract base class for concrete asynchronous executors."""

    def submit(self, fn, *args, **kwargs):
        """Submits a callable to be executed with the given arguments.

        Schedules the callable to be executed as fn(*args, **kwargs) and returns
        a Future instance representing the execution of the callable.

        Returns:
            A Future representing the given call.
        """
        raise NotImplementedError()

    def map(self, fn, *iterables, **kwargs):
        """Returns an iterator equivalent to map(fn, iter).

        Args:
            fn: A callable that will take as many arguments as there are
                passed iterables.
            timeout: The maximum number of seconds to wait. If None, then there
                is no limit on the wait time.

        Returns:
            An iterator equivalent to: map(func, *iterables) but the calls may
            be evaluated out-of-order.

        Raises:
            TimeoutError: If the entire result iterator could not be generated
                before the given timeout.
            Exception: If fn(*args) raises for any values.
        """
        # Python 2 has no keyword-only arguments, so 'timeout' is pulled
        # out of **kwargs by hand.
        timeout = kwargs.get('timeout')
        if timeout is not None:
            end_time = timeout + time.time()

        # itertools.izip: this module targets Python 2.
        fs = [self.submit(fn, *args) for args in itertools.izip(*iterables)]

        # Yield must be hidden in closure so that the futures are submitted
        # before the first iterator value is required.
        def result_iterator():
            try:
                # reverse to keep finishing order
                fs.reverse()
                while fs:
                    # Careful not to keep a reference to the popped future
                    if timeout is None:
                        yield fs.pop().result()
                    else:
                        # Each result shares the single overall deadline.
                        yield fs.pop().result(end_time - time.time())
            finally:
                # On early exit (timeout or caller abandoning the iterator),
                # try to cancel whatever has not started yet.
                for future in fs:
                    future.cancel()
        return result_iterator()

    def shutdown(self, wait=True):
        """Clean-up the resources associated with the Executor.

        It is safe to call this method several times. Otherwise, no other
        methods can be called after this one.

        Args:
            wait: If True then shutdown will not return until all running
                futures have finished executing and the resources used by the
                executor have been reclaimed.
        """
        pass

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.shutdown(wait=True)
        # Returning False propagates any exception raised in the with-block.
        return False
@@ -0,0 +1,363 b''
1 # Copyright 2009 Brian Quinlan. All Rights Reserved.
2 # Licensed to PSF under a Contributor Agreement.
3
4 """Implements ProcessPoolExecutor.
5
6 The follow diagram and text describe the data-flow through the system:
7
8 |======================= In-process =====================|== Out-of-process ==|
9
10 +----------+ +----------+ +--------+ +-----------+ +---------+
11 | | => | Work Ids | => | | => | Call Q | => | |
12 | | +----------+ | | +-----------+ | |
13 | | | ... | | | | ... | | |
14 | | | 6 | | | | 5, call() | | |
15 | | | 7 | | | | ... | | |
16 | Process | | ... | | Local | +-----------+ | Process |
17 | Pool | +----------+ | Worker | | #1..n |
18 | Executor | | Thread | | |
19 | | +----------- + | | +-----------+ | |
20 | | <=> | Work Items | <=> | | <= | Result Q | <= | |
21 | | +------------+ | | +-----------+ | |
22 | | | 6: call() | | | | ... | | |
23 | | | future | | | | 4, result | | |
24 | | | ... | | | | 3, except | | |
25 +----------+ +------------+ +--------+ +-----------+ +---------+
26
27 Executor.submit() called:
28 - creates a uniquely numbered _WorkItem and adds it to the "Work Items" dict
29 - adds the id of the _WorkItem to the "Work Ids" queue
30
31 Local worker thread:
32 - reads work ids from the "Work Ids" queue and looks up the corresponding
33 WorkItem from the "Work Items" dict: if the work item has been cancelled then
34 it is simply removed from the dict, otherwise it is repackaged as a
35 _CallItem and put in the "Call Q". New _CallItems are put in the "Call Q"
36 until "Call Q" is full. NOTE: the size of the "Call Q" is kept small because
37 calls placed in the "Call Q" can no longer be cancelled with Future.cancel().
38 - reads _ResultItems from "Result Q", updates the future stored in the
39 "Work Items" dict and deletes the dict entry
40
41 Process #1..n:
42 - reads _CallItems from "Call Q", executes the calls, and puts the resulting
43 _ResultItems in "Request Q"
44 """
45
46 import atexit
47 from concurrent.futures import _base
48 import Queue as queue
49 import multiprocessing
50 import threading
51 import weakref
52 import sys
53
54 __author__ = 'Brian Quinlan (brian@sweetapp.com)'
55
56 # Workers are created as daemon threads and processes. This is done to allow the
57 # interpreter to exit when there are still idle processes in a
58 # ProcessPoolExecutor's process pool (i.e. shutdown() was not called). However,
59 # allowing workers to die with the interpreter has two undesirable properties:
60 # - The workers would still be running during interpretor shutdown,
61 # meaning that they would fail in unpredictable ways.
62 # - The workers could be killed while evaluating a work item, which could
63 # be bad if the callable being evaluated has external side-effects e.g.
64 # writing to a file.
65 #
66 # To work around this problem, an exit handler is installed which tells the
67 # workers to exit when their work queues are empty and then waits until the
68 # threads/processes finish.
69
# Maps queue-management threads to their call queues; weak keys let dead
# executors (and their threads) drop out automatically.
_threads_queues = weakref.WeakKeyDictionary()
_shutdown = False

def _python_exit():
    # Interpreter-exit handler (see the module comment above about the
    # installed exit handler): wake every queue-management thread with a
    # None sentinel, then wait for each thread to finish.
    global _shutdown
    _shutdown = True
    items = list(_threads_queues.items()) if _threads_queues else ()
    for t, q in items:
        q.put(None)
    for t, q in items:
        t.join(sys.maxint)

# Controls how many more calls than processes will be queued in the call queue.
# A smaller number will mean that processes spend more time idle waiting for
# work while a larger number will make Future.cancel() succeed less frequently
# (Futures in the call queue cannot be cancelled).
EXTRA_QUEUED_CALLS = 1
87
class _WorkItem(object):
    """Pairs a Future with the callable (and its arguments) that will
    produce the future's result; stored in the executor's "Work Items"
    dict (see the module diagram above)."""
    def __init__(self, future, fn, args, kwargs):
        self.future = future
        self.fn = fn
        self.args = args
        self.kwargs = kwargs
94
class _ResultItem(object):
    """Outcome of one call, sent from a worker process back over the
    result queue; exactly one of *exception*/*result* is meaningful."""
    def __init__(self, work_id, exception=None, result=None):
        self.work_id = work_id
        self.exception = exception
        self.result = result
100
class _CallItem(object):
    """A _WorkItem repackaged (without its Future, which cannot cross
    process boundaries) for transport over the call queue to a worker."""
    def __init__(self, work_id, fn, args, kwargs):
        self.work_id = work_id
        self.fn = fn
        self.args = args
        self.kwargs = kwargs
107
def _process_worker(call_queue, result_queue):
    """Evaluates calls from call_queue and places the results in result_queue.

    This worker is run in a separate process.

    A None received on call_queue is the shutdown sentinel: the worker
    echoes None onto result_queue and exits.

    Args:
        call_queue: A multiprocessing.Queue of _CallItems that will be read and
            evaluated by the worker.
        result_queue: A multiprocessing.Queue of _ResultItems that will written
            to by the worker.
    """
    while True:
        call_item = call_queue.get(block=True)
        if call_item is None:
            # Wake up queue management thread
            result_queue.put(None)
            return
        try:
            r = call_item.fn(*call_item.args, **call_item.kwargs)
        except:
            # Bare except (Python 2): also catches old-style exceptions;
            # the exception object travels back in the _ResultItem.
            e = sys.exc_info()[1]
            result_queue.put(_ResultItem(call_item.work_id,
                                         exception=e))
        else:
            result_queue.put(_ResultItem(call_item.work_id,
                                         result=r))
136
def _add_call_item_to_queue(pending_work_items,
                            work_ids,
                            call_queue):
    """Fills call_queue with _WorkItems from pending_work_items.

    This function never blocks.

    Args:
        pending_work_items: A dict mapping work ids to _WorkItems e.g.
            {5: <_WorkItem...>, 6: <_WorkItem...>, ...}
        work_ids: A queue.Queue of work ids e.g. Queue([5, 6, ...]). Work ids
            are consumed and the corresponding _WorkItems from
            pending_work_items are transformed into _CallItems and put in
            call_queue.
        call_queue: A multiprocessing.Queue that will be filled with _CallItems
            derived from _WorkItems.
    """
    while True:
        # Stop while the queue is full; once a call is in the queue it can
        # no longer be cancelled (see module comment), so it is kept small.
        if call_queue.full():
            return
        try:
            work_id = work_ids.get(block=False)
        except queue.Empty:
            return
        else:
            work_item = pending_work_items[work_id]

            if work_item.future.set_running_or_notify_cancel():
                call_queue.put(_CallItem(work_id,
                                         work_item.fn,
                                         work_item.args,
                                         work_item.kwargs),
                               block=True)
            else:
                # Future was cancelled before it started: just drop it.
                del pending_work_items[work_id]
                continue
173
def _queue_management_worker(executor_reference,
                             processes,
                             pending_work_items,
                             work_ids_queue,
                             call_queue,
                             result_queue):
    """Manages the communication between this process and the worker processes.

    This function is run in a local thread.

    Args:
        executor_reference: A weakref.ref to the ProcessPoolExecutor that owns
            this thread. Used to determine if the ProcessPoolExecutor has been
            garbage collected and that this function can exit.
        processes: A list of the multiprocessing.Process instances used as
            workers.
        pending_work_items: A dict mapping work ids to _WorkItems e.g.
            {5: <_WorkItem...>, 6: <_WorkItem...>, ...}
        work_ids_queue: A queue.Queue of work ids e.g. Queue([5, 6, ...]).
        call_queue: A multiprocessing.Queue that will be filled with _CallItems
            derived from _WorkItems for processing by the process workers.
        result_queue: A multiprocessing.Queue of _ResultItems generated by the
            process workers.
    """
    # Mutable one-element list so the closure below can rebind the count
    # (Python 2 has no 'nonlocal').
    nb_shutdown_processes = [0]
    def shutdown_one_process():
        """Tell a worker to terminate, which will in turn wake us again"""
        call_queue.put(None)
        nb_shutdown_processes[0] += 1
    while True:
        _add_call_item_to_queue(pending_work_items,
                                work_ids_queue,
                                call_queue)

        # None here is a worker's echo of a shutdown sentinel, used only to
        # wake this thread so the shutdown checks below run again.
        result_item = result_queue.get(block=True)
        if result_item is not None:
            work_item = pending_work_items[result_item.work_id]
            del pending_work_items[result_item.work_id]

            if result_item.exception:
                work_item.future.set_exception(result_item.exception)
            else:
                work_item.future.set_result(result_item.result)
            # Delete references to object. See issue16284
            del work_item
        # Check whether we should start shutting down.
        executor = executor_reference()
        # No more work items can be added if:
        #   - The interpreter is shutting down OR
        #   - The executor that owns this worker has been collected OR
        #   - The executor that owns this worker has been shutdown.
        if _shutdown or executor is None or executor._shutdown_thread:
            # Since no new work items can be added, it is safe to shutdown
            # this thread if there are no pending work items.
            if not pending_work_items:
                while nb_shutdown_processes[0] < len(processes):
                    shutdown_one_process()
                # If .join() is not called on the created processes then
                # some multiprocessing.Queue methods may deadlock on Mac OS
                # X.
                for p in processes:
                    p.join()
                call_queue.close()
                return
        # Drop the strong reference taken from the weakref so the executor
        # can still be garbage collected while we block on the next get().
        del executor
239
240 _system_limits_checked = False
241 _system_limited = None
242 def _check_system_limits():
243 global _system_limits_checked, _system_limited
244 if _system_limits_checked:
245 if _system_limited:
246 raise NotImplementedError(_system_limited)
247 _system_limits_checked = True
248 try:
249 import os
250 nsems_max = os.sysconf("SC_SEM_NSEMS_MAX")
251 except (AttributeError, ValueError):
252 # sysconf not available or setting not available
253 return
254 if nsems_max == -1:
255 # indetermine limit, assume that limit is determined
256 # by available memory only
257 return
258 if nsems_max >= 256:
259 # minimum number of semaphores available
260 # according to POSIX
261 return
262 _system_limited = "system provides too few semaphores (%d available, 256 necessary)" % nsems_max
263 raise NotImplementedError(_system_limited)
264
265
class ProcessPoolExecutor(_base.Executor):
    """Executor that runs each submitted call in a pool of worker processes."""

    def __init__(self, max_workers=None):
        """Initializes a new ProcessPoolExecutor instance.

        Args:
            max_workers: The maximum number of processes that can be used to
                execute the given calls. If None or not given then as many
                worker processes will be created as the machine has processors.

        Raises:
            NotImplementedError: if the platform provides too few semaphores.
            ValueError: if max_workers is not positive.
        """
        # Fail early on platforms that cannot support multiprocessing queues.
        _check_system_limits()

        if max_workers is None:
            self._max_workers = multiprocessing.cpu_count()
        else:
            if max_workers <= 0:
                raise ValueError("max_workers must be greater than 0")

            self._max_workers = max_workers

        # Make the call queue slightly larger than the number of processes to
        # prevent the worker processes from idling. But don't make it too big
        # because futures in the call queue cannot be cancelled.
        self._call_queue = multiprocessing.Queue(self._max_workers +
                                                 EXTRA_QUEUED_CALLS)
        self._result_queue = multiprocessing.Queue()
        # FIFO of work ids, consumed by the queue management thread.
        self._work_ids = queue.Queue()
        # Created lazily on first submit(); see _start_queue_management_thread.
        self._queue_management_thread = None
        # The multiprocessing.Process workers; grown by _adjust_process_count.
        self._processes = set()

        # Shutdown is a two-step process.
        self._shutdown_thread = False
        self._shutdown_lock = threading.Lock()
        # Next work id to assign; ids key into _pending_work_items.
        self._queue_count = 0
        self._pending_work_items = {}

    def _start_queue_management_thread(self):
        # Start the single daemon management thread, once.
        # When the executor gets lost, the weakref callback will wake up
        # the queue management thread.
        def weakref_cb(_, q=self._result_queue):
            q.put(None)
        if self._queue_management_thread is None:
            self._queue_management_thread = threading.Thread(
                    target=_queue_management_worker,
                    args=(weakref.ref(self, weakref_cb),
                          self._processes,
                          self._pending_work_items,
                          self._work_ids,
                          self._call_queue,
                          self._result_queue))
            # Daemon so a forgotten executor cannot keep the interpreter
            # alive; orderly shutdown happens via the atexit hook registered
            # below through _threads_queues.
            self._queue_management_thread.daemon = True
            self._queue_management_thread.start()
            _threads_queues[self._queue_management_thread] = self._result_queue

    def _adjust_process_count(self):
        # Spawn worker processes until there are self._max_workers of them.
        for _ in range(len(self._processes), self._max_workers):
            p = multiprocessing.Process(
                    target=_process_worker,
                    args=(self._call_queue,
                          self._result_queue))
            p.start()
            self._processes.add(p)

    def submit(self, fn, *args, **kwargs):
        with self._shutdown_lock:
            if self._shutdown_thread:
                raise RuntimeError('cannot schedule new futures after shutdown')

            f = _base.Future()
            w = _WorkItem(f, fn, args, kwargs)

            self._pending_work_items[self._queue_count] = w
            self._work_ids.put(self._queue_count)
            self._queue_count += 1
            # Wake up queue management thread
            self._result_queue.put(None)

            self._start_queue_management_thread()
            self._adjust_process_count()
            return f
    submit.__doc__ = _base.Executor.submit.__doc__

    def shutdown(self, wait=True):
        with self._shutdown_lock:
            self._shutdown_thread = True
        if self._queue_management_thread:
            # Wake up queue management thread
            self._result_queue.put(None)
            if wait:
                # sys.maxint (Python 2) is effectively "wait forever".
                self._queue_management_thread.join(sys.maxint)
        # To reduce the risk of opening too many files, remove references to
        # objects that use file descriptors.
        self._queue_management_thread = None
        self._call_queue = None
        self._result_queue = None
        self._processes = None
    shutdown.__doc__ = _base.Executor.shutdown.__doc__
362
# Register the module's interpreter-exit hook (defined earlier in this file).
atexit.register(_python_exit)
@@ -0,0 +1,160 b''
1 # Copyright 2009 Brian Quinlan. All Rights Reserved.
2 # Licensed to PSF under a Contributor Agreement.
3
4 """Implements ThreadPoolExecutor."""
5
6 import atexit
7 from concurrent.futures import _base
8 import itertools
9 import Queue as queue
10 import threading
11 import weakref
12 import sys
13
try:
    from multiprocessing import cpu_count
except ImportError:
    # some platforms don't have multiprocessing
    def cpu_count():
        # None means "unknown"; ThreadPoolExecutor.__init__ treats it as 1
        # via `(cpu_count() or 1)`.
        return None
20
21 __author__ = 'Brian Quinlan (brian@sweetapp.com)'
22
23 # Workers are created as daemon threads. This is done to allow the interpreter
24 # to exit when there are still idle threads in a ThreadPoolExecutor's thread
25 # pool (i.e. shutdown() was not called). However, allowing workers to die with
26 # the interpreter has two undesirable properties:
27 # - The workers would still be running during interpretor shutdown,
28 # meaning that they would fail in unpredictable ways.
29 # - The workers could be killed while evaluating a work item, which could
30 # be bad if the callable being evaluated has external side-effects e.g.
31 # writing to a file.
32 #
33 # To work around this problem, an exit handler is installed which tells the
34 # workers to exit when their work queues are empty and then waits until the
35 # threads finish.
36
37 _threads_queues = weakref.WeakKeyDictionary()
38 _shutdown = False
39
40 def _python_exit():
41 global _shutdown
42 _shutdown = True
43 items = list(_threads_queues.items()) if _threads_queues else ()
44 for t, q in items:
45 q.put(None)
46 for t, q in items:
47 t.join(sys.maxint)
48
49 atexit.register(_python_exit)
50
51 class _WorkItem(object):
52 def __init__(self, future, fn, args, kwargs):
53 self.future = future
54 self.fn = fn
55 self.args = args
56 self.kwargs = kwargs
57
58 def run(self):
59 if not self.future.set_running_or_notify_cancel():
60 return
61
62 try:
63 result = self.fn(*self.args, **self.kwargs)
64 except:
65 e, tb = sys.exc_info()[1:]
66 self.future.set_exception_info(e, tb)
67 else:
68 self.future.set_result(result)
69
def _worker(executor_reference, work_queue):
    """Thread main loop: run work items from work_queue until told to stop."""
    try:
        while True:
            item = work_queue.get(block=True)
            if item is None:
                # Woken by a sentinel: decide whether this thread should exit.
                owner = executor_reference()
                # Exit if:
                #   - The interpreter is shutting down OR
                #   - The executor that owns the worker has been collected OR
                #   - The executor that owns the worker has been shutdown.
                if _shutdown or owner is None or owner._shutdown:
                    # Re-post the sentinel so sibling workers also wake up.
                    work_queue.put(None)
                    return
                # Drop the strong reference before blocking again, so the
                # executor remains garbage-collectable.
                del owner
            else:
                item.run()
                # Delete references to object. See issue16284
                del item
    except:
        _base.LOGGER.critical('Exception in worker', exc_info=True)
91
92
class ThreadPoolExecutor(_base.Executor):
    """Executor that runs each submitted call in a pool of worker threads."""

    # Used to assign unique thread names when thread_name_prefix is not supplied.
    # `.next` is the Python 2 iterator method; each call yields 0, 1, 2, ...
    _counter = itertools.count().next

    def __init__(self, max_workers=None, thread_name_prefix=''):
        """Initializes a new ThreadPoolExecutor instance.

        Args:
            max_workers: The maximum number of threads that can be used to
                execute the given calls.
            thread_name_prefix: An optional name prefix to give our threads.

        Raises:
            ValueError: if max_workers is not positive.
        """
        if max_workers is None:
            # Use this number because ThreadPoolExecutor is often
            # used to overlap I/O instead of CPU work.
            max_workers = (cpu_count() or 1) * 5
        if max_workers <= 0:
            raise ValueError("max_workers must be greater than 0")

        self._max_workers = max_workers
        self._work_queue = queue.Queue()
        # Worker threads, created lazily by _adjust_thread_count().
        self._threads = set()
        self._shutdown = False
        self._shutdown_lock = threading.Lock()
        self._thread_name_prefix = (thread_name_prefix or
                                    ("ThreadPoolExecutor-%d" % self._counter()))

    def submit(self, fn, *args, **kwargs):
        with self._shutdown_lock:
            if self._shutdown:
                raise RuntimeError('cannot schedule new futures after shutdown')

            f = _base.Future()
            w = _WorkItem(f, fn, args, kwargs)

            self._work_queue.put(w)
            self._adjust_thread_count()
            return f
    submit.__doc__ = _base.Executor.submit.__doc__

    def _adjust_thread_count(self):
        # Grow the pool by at most one thread per call (submit() calls this
        # once per submitted item).
        # When the executor gets lost, the weakref callback will wake up
        # the worker threads.
        def weakref_cb(_, q=self._work_queue):
            q.put(None)
        # TODO(bquinlan): Should avoid creating new threads if there are more
        # idle threads than items in the work queue.
        num_threads = len(self._threads)
        if num_threads < self._max_workers:
            thread_name = '%s_%d' % (self._thread_name_prefix or self,
                                     num_threads)
            t = threading.Thread(name=thread_name, target=_worker,
                                 args=(weakref.ref(self, weakref_cb),
                                       self._work_queue))
            # Daemon threads so a forgotten executor cannot block interpreter
            # exit; _python_exit() handles orderly shutdown instead.
            t.daemon = True
            t.start()
            self._threads.add(t)
            _threads_queues[t] = self._work_queue

    def shutdown(self, wait=True):
        with self._shutdown_lock:
            self._shutdown = True
            # A single None sentinel suffices: each exiting worker re-posts
            # it for the next one (see _worker).
            self._work_queue.put(None)
        if wait:
            # sys.maxint (Python 2) is effectively "wait forever".
            for t in self._threads:
                t.join(sys.maxint)
    shutdown.__doc__ = _base.Executor.shutdown.__doc__
General Comments 0
You need to be logged in to leave comments. Login now