futures: switch to absolute and relative imports...
Gregory Szorc
r37644:0a9c0d34 default
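
The change itself is mechanical: each vendored module gains a "from __future__ import absolute_import" statement, and imports that previously went through the installed concurrent.futures package path are rewritten as package-relative imports. A minimal sketch of the before/after style (illustrative only; relative imports like these resolve only when the module lives inside a package):

    # Before: absolute imports into the installed package path.
    # from concurrent.futures._base import Future
    # from concurrent.futures.thread import ThreadPoolExecutor

    # After: explicit relative imports within the vendored package.
    from __future__ import absolute_import

    from ._base import Future
    from .thread import ThreadPoolExecutor
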
@@ -1,23 +1,27 @@
 # Copyright 2009 Brian Quinlan. All Rights Reserved.
 # Licensed to PSF under a Contributor Agreement.
 
 """Execute computations asynchronously using threads or processes."""
 
+from __future__ import absolute_import
+
 __author__ = 'Brian Quinlan (brian@sweetapp.com)'
 
-from concurrent.futures._base import (FIRST_COMPLETED,
-                                      FIRST_EXCEPTION,
-                                      ALL_COMPLETED,
-                                      CancelledError,
-                                      TimeoutError,
-                                      Future,
-                                      Executor,
-                                      wait,
-                                      as_completed)
-from concurrent.futures.thread import ThreadPoolExecutor
+from ._base import (
+    FIRST_COMPLETED,
+    FIRST_EXCEPTION,
+    ALL_COMPLETED,
+    CancelledError,
+    TimeoutError,
+    Future,
+    Executor,
+    wait,
+    as_completed,
+)
+from .thread import ThreadPoolExecutor
 
 try:
-    from concurrent.futures.process import ProcessPoolExecutor
+    from .process import ProcessPoolExecutor
 except ImportError:
     # some platforms don't have multiprocessing
     pass
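
The package docstring above ("Execute computations asynchronously using threads or processes") is easiest to see with the thread-based executor it re-exports. A minimal usage sketch, assuming the vendored package is importable as concurrent.futures and using the standard max_workers constructor argument (the constructor itself is not shown in this hunk):

    from concurrent.futures import ThreadPoolExecutor, as_completed

    def square(n):
        return n * n

    executor = ThreadPoolExecutor(max_workers=2)
    futures = [executor.submit(square, n) for n in range(5)]
    for future in as_completed(futures):
        # Futures are yielded as they finish, not in submission order.
        print(future.result())
    executor.shutdown(wait=True)
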
@@ -1,667 +1,669 @@
 # Copyright 2009 Brian Quinlan. All Rights Reserved.
 # Licensed to PSF under a Contributor Agreement.
 
+from __future__ import absolute_import
+
 import collections
 import logging
 import threading
 import itertools
 import time
 import types
 
 __author__ = 'Brian Quinlan (brian@sweetapp.com)'
 
 FIRST_COMPLETED = 'FIRST_COMPLETED'
 FIRST_EXCEPTION = 'FIRST_EXCEPTION'
 ALL_COMPLETED = 'ALL_COMPLETED'
 _AS_COMPLETED = '_AS_COMPLETED'
 
 # Possible future states (for internal use by the futures package).
 PENDING = 'PENDING'
 RUNNING = 'RUNNING'
 # The future was cancelled by the user...
 CANCELLED = 'CANCELLED'
 # ...and _Waiter.add_cancelled() was called by a worker.
 CANCELLED_AND_NOTIFIED = 'CANCELLED_AND_NOTIFIED'
 FINISHED = 'FINISHED'
 
 _FUTURE_STATES = [
     PENDING,
     RUNNING,
     CANCELLED,
     CANCELLED_AND_NOTIFIED,
     FINISHED
 ]
 
 _STATE_TO_DESCRIPTION_MAP = {
     PENDING: "pending",
     RUNNING: "running",
     CANCELLED: "cancelled",
     CANCELLED_AND_NOTIFIED: "cancelled",
     FINISHED: "finished"
 }
 
 # Logger for internal use by the futures package.
 LOGGER = logging.getLogger("concurrent.futures")
 
 class Error(Exception):
     """Base class for all future-related exceptions."""
     pass
 
 class CancelledError(Error):
     """The Future was cancelled."""
     pass
 
 class TimeoutError(Error):
     """The operation exceeded the given deadline."""
     pass
 
 class _Waiter(object):
     """Provides the event that wait() and as_completed() block on."""
     def __init__(self):
         self.event = threading.Event()
         self.finished_futures = []
 
     def add_result(self, future):
         self.finished_futures.append(future)
 
     def add_exception(self, future):
         self.finished_futures.append(future)
 
     def add_cancelled(self, future):
         self.finished_futures.append(future)
 
 class _AsCompletedWaiter(_Waiter):
     """Used by as_completed()."""
 
     def __init__(self):
         super(_AsCompletedWaiter, self).__init__()
         self.lock = threading.Lock()
 
     def add_result(self, future):
         with self.lock:
             super(_AsCompletedWaiter, self).add_result(future)
             self.event.set()
 
     def add_exception(self, future):
         with self.lock:
             super(_AsCompletedWaiter, self).add_exception(future)
             self.event.set()
 
     def add_cancelled(self, future):
         with self.lock:
             super(_AsCompletedWaiter, self).add_cancelled(future)
             self.event.set()
 
 class _FirstCompletedWaiter(_Waiter):
     """Used by wait(return_when=FIRST_COMPLETED)."""
 
     def add_result(self, future):
         super(_FirstCompletedWaiter, self).add_result(future)
         self.event.set()
 
     def add_exception(self, future):
         super(_FirstCompletedWaiter, self).add_exception(future)
         self.event.set()
 
     def add_cancelled(self, future):
         super(_FirstCompletedWaiter, self).add_cancelled(future)
         self.event.set()
 
 class _AllCompletedWaiter(_Waiter):
     """Used by wait(return_when=FIRST_EXCEPTION and ALL_COMPLETED)."""
 
     def __init__(self, num_pending_calls, stop_on_exception):
         self.num_pending_calls = num_pending_calls
         self.stop_on_exception = stop_on_exception
         self.lock = threading.Lock()
         super(_AllCompletedWaiter, self).__init__()
 
     def _decrement_pending_calls(self):
         with self.lock:
             self.num_pending_calls -= 1
             if not self.num_pending_calls:
                 self.event.set()
 
     def add_result(self, future):
         super(_AllCompletedWaiter, self).add_result(future)
         self._decrement_pending_calls()
 
     def add_exception(self, future):
         super(_AllCompletedWaiter, self).add_exception(future)
         if self.stop_on_exception:
             self.event.set()
         else:
             self._decrement_pending_calls()
 
     def add_cancelled(self, future):
         super(_AllCompletedWaiter, self).add_cancelled(future)
         self._decrement_pending_calls()
 
 class _AcquireFutures(object):
     """A context manager that does an ordered acquire of Future conditions."""
 
     def __init__(self, futures):
         self.futures = sorted(futures, key=id)
 
     def __enter__(self):
         for future in self.futures:
             future._condition.acquire()
 
     def __exit__(self, *args):
         for future in self.futures:
             future._condition.release()
 
 def _create_and_install_waiters(fs, return_when):
     if return_when == _AS_COMPLETED:
         waiter = _AsCompletedWaiter()
     elif return_when == FIRST_COMPLETED:
         waiter = _FirstCompletedWaiter()
     else:
         pending_count = sum(
                 f._state not in [CANCELLED_AND_NOTIFIED, FINISHED] for f in fs)
 
         if return_when == FIRST_EXCEPTION:
             waiter = _AllCompletedWaiter(pending_count, stop_on_exception=True)
         elif return_when == ALL_COMPLETED:
             waiter = _AllCompletedWaiter(pending_count, stop_on_exception=False)
         else:
             raise ValueError("Invalid return condition: %r" % return_when)
 
     for f in fs:
         f._waiters.append(waiter)
 
     return waiter
 
 
 def _yield_finished_futures(fs, waiter, ref_collect):
     """
     Iterate on the list *fs*, yielding finished futures one by one in
     reverse order.
     Before yielding a future, *waiter* is removed from its waiters
     and the future is removed from each set in the collection of sets
     *ref_collect*.
 
     The aim of this function is to avoid keeping stale references after
     the future is yielded and before the iterator resumes.
     """
     while fs:
         f = fs[-1]
         for futures_set in ref_collect:
             futures_set.remove(f)
         with f._condition:
             f._waiters.remove(waiter)
         del f
         # Careful not to keep a reference to the popped value
         yield fs.pop()
 
 
 def as_completed(fs, timeout=None):
     """An iterator over the given futures that yields each as it completes.
 
     Args:
         fs: The sequence of Futures (possibly created by different Executors) to
             iterate over.
         timeout: The maximum number of seconds to wait. If None, then there
             is no limit on the wait time.
 
     Returns:
         An iterator that yields the given Futures as they complete (finished or
         cancelled). If any given Futures are duplicated, they will be returned
         once.
 
     Raises:
         TimeoutError: If the entire result iterator could not be generated
             before the given timeout.
     """
     if timeout is not None:
         end_time = timeout + time.time()
 
     fs = set(fs)
     total_futures = len(fs)
     with _AcquireFutures(fs):
         finished = set(
                 f for f in fs
                 if f._state in [CANCELLED_AND_NOTIFIED, FINISHED])
         pending = fs - finished
         waiter = _create_and_install_waiters(fs, _AS_COMPLETED)
     finished = list(finished)
     try:
         for f in _yield_finished_futures(finished, waiter,
                                          ref_collect=(fs,)):
             f = [f]
             yield f.pop()
 
         while pending:
             if timeout is None:
                 wait_timeout = None
             else:
                 wait_timeout = end_time - time.time()
                 if wait_timeout < 0:
                     raise TimeoutError(
                             '%d (of %d) futures unfinished' % (
                             len(pending), total_futures))
 
             waiter.event.wait(wait_timeout)
 
             with waiter.lock:
                 finished = waiter.finished_futures
                 waiter.finished_futures = []
                 waiter.event.clear()
 
             # reverse to keep finishing order
             finished.reverse()
             for f in _yield_finished_futures(finished, waiter,
                                              ref_collect=(fs, pending)):
                 f = [f]
                 yield f.pop()
 
     finally:
         # Remove waiter from unfinished futures
         for f in fs:
             with f._condition:
                 f._waiters.remove(waiter)
 
 DoneAndNotDoneFutures = collections.namedtuple(
         'DoneAndNotDoneFutures', 'done not_done')
 def wait(fs, timeout=None, return_when=ALL_COMPLETED):
     """Wait for the futures in the given sequence to complete.
 
     Args:
         fs: The sequence of Futures (possibly created by different Executors) to
             wait upon.
         timeout: The maximum number of seconds to wait. If None, then there
             is no limit on the wait time.
         return_when: Indicates when this function should return. The options
             are:
 
             FIRST_COMPLETED - Return when any future finishes or is
                               cancelled.
             FIRST_EXCEPTION - Return when any future finishes by raising an
                               exception. If no future raises an exception
                               then it is equivalent to ALL_COMPLETED.
             ALL_COMPLETED - Return when all futures finish or are cancelled.
 
     Returns:
         A named 2-tuple of sets. The first set, named 'done', contains the
         futures that completed (is finished or cancelled) before the wait
         completed. The second set, named 'not_done', contains uncompleted
         futures.
     """
     with _AcquireFutures(fs):
         done = set(f for f in fs
                    if f._state in [CANCELLED_AND_NOTIFIED, FINISHED])
         not_done = set(fs) - done
 
         if (return_when == FIRST_COMPLETED) and done:
             return DoneAndNotDoneFutures(done, not_done)
         elif (return_when == FIRST_EXCEPTION) and done:
             if any(f for f in done
                    if not f.cancelled() and f.exception() is not None):
                 return DoneAndNotDoneFutures(done, not_done)
 
         if len(done) == len(fs):
             return DoneAndNotDoneFutures(done, not_done)
 
         waiter = _create_and_install_waiters(fs, return_when)
 
     waiter.event.wait(timeout)
     for f in fs:
         with f._condition:
             f._waiters.remove(waiter)
 
     done.update(waiter.finished_futures)
     return DoneAndNotDoneFutures(done, set(fs) - done)
 
 class Future(object):
     """Represents the result of an asynchronous computation."""
 
     def __init__(self):
         """Initializes the future. Should not be called by clients."""
         self._condition = threading.Condition()
         self._state = PENDING
         self._result = None
         self._exception = None
         self._traceback = None
         self._waiters = []
         self._done_callbacks = []
 
     def _invoke_callbacks(self):
         for callback in self._done_callbacks:
             try:
                 callback(self)
             except Exception:
                 LOGGER.exception('exception calling callback for %r', self)
             except BaseException:
                 # Explicitly let all other new-style exceptions through so
                 # that we can catch all old-style exceptions with a simple
                 # "except:" clause below.
                 #
                 # All old-style exception objects are instances of
                 # types.InstanceType, but "except types.InstanceType:" does
                 # not catch old-style exceptions for some reason. Thus, the
                 # only way to catch all old-style exceptions without catching
                 # any new-style exceptions is to filter out the new-style
                 # exceptions, which all derive from BaseException.
                 raise
             except:
                 # Because of the BaseException clause above, this handler only
                 # executes for old-style exception objects.
                 LOGGER.exception('exception calling callback for %r', self)
 
     def __repr__(self):
         with self._condition:
             if self._state == FINISHED:
                 if self._exception:
                     return '<%s at %#x state=%s raised %s>' % (
                         self.__class__.__name__,
                         id(self),
                         _STATE_TO_DESCRIPTION_MAP[self._state],
                         self._exception.__class__.__name__)
                 else:
                     return '<%s at %#x state=%s returned %s>' % (
                         self.__class__.__name__,
                         id(self),
                         _STATE_TO_DESCRIPTION_MAP[self._state],
                         self._result.__class__.__name__)
             return '<%s at %#x state=%s>' % (
                     self.__class__.__name__,
                     id(self),
                     _STATE_TO_DESCRIPTION_MAP[self._state])
 
     def cancel(self):
         """Cancel the future if possible.
 
         Returns True if the future was cancelled, False otherwise. A future
         cannot be cancelled if it is running or has already completed.
         """
         with self._condition:
             if self._state in [RUNNING, FINISHED]:
                 return False
 
             if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
                 return True
 
             self._state = CANCELLED
             self._condition.notify_all()
 
         self._invoke_callbacks()
         return True
 
     def cancelled(self):
         """Return True if the future was cancelled."""
         with self._condition:
             return self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]
 
     def running(self):
         """Return True if the future is currently executing."""
         with self._condition:
             return self._state == RUNNING
 
     def done(self):
         """Return True of the future was cancelled or finished executing."""
         with self._condition:
             return self._state in [CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED]
 
     def __get_result(self):
         if self._exception:
             if isinstance(self._exception, types.InstanceType):
                 # The exception is an instance of an old-style class, which
                 # means type(self._exception) returns types.ClassType instead
                 # of the exception's actual class type.
                 exception_type = self._exception.__class__
             else:
                 exception_type = type(self._exception)
             raise exception_type, self._exception, self._traceback
         else:
             return self._result
 
     def add_done_callback(self, fn):
         """Attaches a callable that will be called when the future finishes.
 
         Args:
             fn: A callable that will be called with this future as its only
                 argument when the future completes or is cancelled. The callable
                 will always be called by a thread in the same process in which
                 it was added. If the future has already completed or been
                 cancelled then the callable will be called immediately. These
                 callables are called in the order that they were added.
         """
         with self._condition:
             if self._state not in [CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED]:
                 self._done_callbacks.append(fn)
                 return
         fn(self)
 
     def result(self, timeout=None):
         """Return the result of the call that the future represents.
 
         Args:
             timeout: The number of seconds to wait for the result if the future
                 isn't done. If None, then there is no limit on the wait time.
 
         Returns:
             The result of the call that the future represents.
 
         Raises:
             CancelledError: If the future was cancelled.
             TimeoutError: If the future didn't finish executing before the given
                 timeout.
             Exception: If the call raised then that exception will be raised.
         """
         with self._condition:
             if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
                 raise CancelledError()
             elif self._state == FINISHED:
                 return self.__get_result()
 
             self._condition.wait(timeout)
 
             if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
                 raise CancelledError()
             elif self._state == FINISHED:
                 return self.__get_result()
             else:
                 raise TimeoutError()
 
     def exception_info(self, timeout=None):
         """Return a tuple of (exception, traceback) raised by the call that the
         future represents.
 
         Args:
             timeout: The number of seconds to wait for the exception if the
                 future isn't done. If None, then there is no limit on the wait
                 time.
 
         Returns:
             The exception raised by the call that the future represents or None
             if the call completed without raising.
 
         Raises:
             CancelledError: If the future was cancelled.
             TimeoutError: If the future didn't finish executing before the given
                 timeout.
         """
         with self._condition:
             if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
                 raise CancelledError()
             elif self._state == FINISHED:
                 return self._exception, self._traceback
 
             self._condition.wait(timeout)
 
             if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
                 raise CancelledError()
             elif self._state == FINISHED:
                 return self._exception, self._traceback
             else:
                 raise TimeoutError()
 
     def exception(self, timeout=None):
         """Return the exception raised by the call that the future represents.
 
         Args:
             timeout: The number of seconds to wait for the exception if the
                 future isn't done. If None, then there is no limit on the wait
                 time.
 
         Returns:
             The exception raised by the call that the future represents or None
             if the call completed without raising.
 
         Raises:
             CancelledError: If the future was cancelled.
             TimeoutError: If the future didn't finish executing before the given
                 timeout.
         """
         return self.exception_info(timeout)[0]
 
     # The following methods should only be used by Executors and in tests.
     def set_running_or_notify_cancel(self):
         """Mark the future as running or process any cancel notifications.
 
         Should only be used by Executor implementations and unit tests.
 
         If the future has been cancelled (cancel() was called and returned
         True) then any threads waiting on the future completing (though calls
         to as_completed() or wait()) are notified and False is returned.
 
         If the future was not cancelled then it is put in the running state
         (future calls to running() will return True) and True is returned.
 
         This method should be called by Executor implementations before
         executing the work associated with this future. If this method returns
         False then the work should not be executed.
 
         Returns:
             False if the Future was cancelled, True otherwise.
 
         Raises:
             RuntimeError: if this method was already called or if set_result()
                 or set_exception() was called.
         """
         with self._condition:
             if self._state == CANCELLED:
                 self._state = CANCELLED_AND_NOTIFIED
                 for waiter in self._waiters:
                     waiter.add_cancelled(self)
                 # self._condition.notify_all() is not necessary because
                 # self.cancel() triggers a notification.
                 return False
             elif self._state == PENDING:
                 self._state = RUNNING
                 return True
             else:
                 LOGGER.critical('Future %s in unexpected state: %s',
                                 id(self),
                                 self._state)
                 raise RuntimeError('Future in unexpected state')
 
     def set_result(self, result):
         """Sets the return value of work associated with the future.
 
         Should only be used by Executor implementations and unit tests.
         """
         with self._condition:
             self._result = result
             self._state = FINISHED
             for waiter in self._waiters:
                 waiter.add_result(self)
             self._condition.notify_all()
         self._invoke_callbacks()
 
     def set_exception_info(self, exception, traceback):
         """Sets the result of the future as being the given exception
         and traceback.
 
         Should only be used by Executor implementations and unit tests.
         """
         with self._condition:
             self._exception = exception
             self._traceback = traceback
             self._state = FINISHED
             for waiter in self._waiters:
                 waiter.add_exception(self)
             self._condition.notify_all()
         self._invoke_callbacks()
 
     def set_exception(self, exception):
         """Sets the result of the future as being the given exception.
 
         Should only be used by Executor implementations and unit tests.
         """
         self.set_exception_info(exception, None)
 
 class Executor(object):
     """This is an abstract base class for concrete asynchronous executors."""
 
     def submit(self, fn, *args, **kwargs):
         """Submits a callable to be executed with the given arguments.
 
         Schedules the callable to be executed as fn(*args, **kwargs) and returns
         a Future instance representing the execution of the callable.
 
         Returns:
             A Future representing the given call.
         """
         raise NotImplementedError()
 
     def map(self, fn, *iterables, **kwargs):
         """Returns an iterator equivalent to map(fn, iter).
 
         Args:
             fn: A callable that will take as many arguments as there are
                 passed iterables.
             timeout: The maximum number of seconds to wait. If None, then there
                 is no limit on the wait time.
 
         Returns:
             An iterator equivalent to: map(func, *iterables) but the calls may
             be evaluated out-of-order.
 
         Raises:
             TimeoutError: If the entire result iterator could not be generated
                 before the given timeout.
             Exception: If fn(*args) raises for any values.
         """
         timeout = kwargs.get('timeout')
         if timeout is not None:
             end_time = timeout + time.time()
 
         fs = [self.submit(fn, *args) for args in itertools.izip(*iterables)]
 
         # Yield must be hidden in closure so that the futures are submitted
         # before the first iterator value is required.
         def result_iterator():
             try:
                 # reverse to keep finishing order
                 fs.reverse()
                 while fs:
                     # Careful not to keep a reference to the popped future
                     if timeout is None:
                         yield fs.pop().result()
                     else:
                         yield fs.pop().result(end_time - time.time())
             finally:
                 for future in fs:
                     future.cancel()
         return result_iterator()
 
     def shutdown(self, wait=True):
         """Clean-up the resources associated with the Executor.
 
         It is safe to call this method several times. Otherwise, no other
         methods can be called after this one.
 
         Args:
             wait: If True then shutdown will not return until all running
                 futures have finished executing and the resources used by the
                 executor have been reclaimed.
         """
         pass
 
     def __enter__(self):
         return self
 
     def __exit__(self, exc_type, exc_val, exc_tb):
         self.shutdown(wait=True)
         return False
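
wait() above returns the DoneAndNotDoneFutures named 2-tuple, and as_completed() yields futures in finishing order. A short sketch of the waiting API, again assuming the package is importable as concurrent.futures:

    import time
    from concurrent.futures import ThreadPoolExecutor, wait, FIRST_COMPLETED

    def slow(n):
        time.sleep(n)
        return n

    executor = ThreadPoolExecutor(max_workers=2)
    futures = [executor.submit(slow, n) for n in (1, 2, 3)]
    # Blocks until at least one future finishes or is cancelled.
    done, not_done = wait(futures, timeout=None, return_when=FIRST_COMPLETED)
    print(len(done), len(not_done))
    executor.shutdown(wait=True)
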
@@ -1,363 +1,365
1 # Copyright 2009 Brian Quinlan. All Rights Reserved.
1 # Copyright 2009 Brian Quinlan. All Rights Reserved.
2 # Licensed to PSF under a Contributor Agreement.
2 # Licensed to PSF under a Contributor Agreement.
3
3
4 """Implements ProcessPoolExecutor.
4 """Implements ProcessPoolExecutor.
5
5
6 The follow diagram and text describe the data-flow through the system:
6 The follow diagram and text describe the data-flow through the system:
7
7
8 |======================= In-process =====================|== Out-of-process ==|
8 |======================= In-process =====================|== Out-of-process ==|
9
9
10 +----------+ +----------+ +--------+ +-----------+ +---------+
10 +----------+ +----------+ +--------+ +-----------+ +---------+
11 | | => | Work Ids | => | | => | Call Q | => | |
11 | | => | Work Ids | => | | => | Call Q | => | |
12 | | +----------+ | | +-----------+ | |
12 | | +----------+ | | +-----------+ | |
13 | | | ... | | | | ... | | |
13 | | | ... | | | | ... | | |
14 | | | 6 | | | | 5, call() | | |
14 | | | 6 | | | | 5, call() | | |
15 | | | 7 | | | | ... | | |
15 | | | 7 | | | | ... | | |
16 | Process | | ... | | Local | +-----------+ | Process |
16 | Process | | ... | | Local | +-----------+ | Process |
17 | Pool | +----------+ | Worker | | #1..n |
17 | Pool | +----------+ | Worker | | #1..n |
18 | Executor | | Thread | | |
18 | Executor | | Thread | | |
19 | | +----------- + | | +-----------+ | |
19 | | +----------- + | | +-----------+ | |
20 | | <=> | Work Items | <=> | | <= | Result Q | <= | |
20 | | <=> | Work Items | <=> | | <= | Result Q | <= | |
21 | | +------------+ | | +-----------+ | |
21 | | +------------+ | | +-----------+ | |
22 | | | 6: call() | | | | ... | | |
22 | | | 6: call() | | | | ... | | |
23 | | | future | | | | 4, result | | |
23 | | | future | | | | 4, result | | |
24 | | | ... | | | | 3, except | | |
24 | | | ... | | | | 3, except | | |
25 +----------+ +------------+ +--------+ +-----------+ +---------+
25 +----------+ +------------+ +--------+ +-----------+ +---------+
26
26
27 Executor.submit() called:
27 Executor.submit() called:
28 - creates a uniquely numbered _WorkItem and adds it to the "Work Items" dict
28 - creates a uniquely numbered _WorkItem and adds it to the "Work Items" dict
29 - adds the id of the _WorkItem to the "Work Ids" queue
29 - adds the id of the _WorkItem to the "Work Ids" queue
30
30
31 Local worker thread:
31 Local worker thread:
32 - reads work ids from the "Work Ids" queue and looks up the corresponding
32 - reads work ids from the "Work Ids" queue and looks up the corresponding
33 WorkItem from the "Work Items" dict: if the work item has been cancelled then
33 WorkItem from the "Work Items" dict: if the work item has been cancelled then
34 it is simply removed from the dict, otherwise it is repackaged as a
34 it is simply removed from the dict, otherwise it is repackaged as a
35 _CallItem and put in the "Call Q". New _CallItems are put in the "Call Q"
35 _CallItem and put in the "Call Q". New _CallItems are put in the "Call Q"
36 until "Call Q" is full. NOTE: the size of the "Call Q" is kept small because
36 until "Call Q" is full. NOTE: the size of the "Call Q" is kept small because
37 calls placed in the "Call Q" can no longer be cancelled with Future.cancel().
37 calls placed in the "Call Q" can no longer be cancelled with Future.cancel().
38 - reads _ResultItems from "Result Q", updates the future stored in the
38 - reads _ResultItems from "Result Q", updates the future stored in the
39 "Work Items" dict and deletes the dict entry
39 "Work Items" dict and deletes the dict entry
40
40
41 Process #1..n:
41 Process #1..n:
42 - reads _CallItems from "Call Q", executes the calls, and puts the resulting
42 - reads _CallItems from "Call Q", executes the calls, and puts the resulting
43 _ResultItems in "Request Q"
43 _ResultItems in "Request Q"
44 """
44 """
45
45
46 from __future__ import absolute_import
47
46 import atexit
48 import atexit
47 from concurrent.futures import _base
49 from . import _base
48 import Queue as queue
50 import Queue as queue
49 import multiprocessing
51 import multiprocessing
50 import threading
52 import threading
51 import weakref
53 import weakref
52 import sys
54 import sys
53
55
54 __author__ = 'Brian Quinlan (brian@sweetapp.com)'
56 __author__ = 'Brian Quinlan (brian@sweetapp.com)'
55
57
56 # Workers are created as daemon threads and processes. This is done to allow the
58 # Workers are created as daemon threads and processes. This is done to allow the
57 # interpreter to exit when there are still idle processes in a
59 # interpreter to exit when there are still idle processes in a
58 # ProcessPoolExecutor's process pool (i.e. shutdown() was not called). However,
60 # ProcessPoolExecutor's process pool (i.e. shutdown() was not called). However,
59 # allowing workers to die with the interpreter has two undesirable properties:
61 # allowing workers to die with the interpreter has two undesirable properties:
60 # - The workers would still be running during interpretor shutdown,
62 # - The workers would still be running during interpretor shutdown,
61 # meaning that they would fail in unpredictable ways.
63 # meaning that they would fail in unpredictable ways.
62 # - The workers could be killed while evaluating a work item, which could
64 # - The workers could be killed while evaluating a work item, which could
63 # be bad if the callable being evaluated has external side-effects e.g.
65 # be bad if the callable being evaluated has external side-effects e.g.
64 # writing to a file.
66 # writing to a file.
65 #
67 #
66 # To work around this problem, an exit handler is installed which tells the
68 # To work around this problem, an exit handler is installed which tells the
67 # workers to exit when their work queues are empty and then waits until the
69 # workers to exit when their work queues are empty and then waits until the
68 # threads/processes finish.
70 # threads/processes finish.
69
71
70 _threads_queues = weakref.WeakKeyDictionary()
72 _threads_queues = weakref.WeakKeyDictionary()
71 _shutdown = False
73 _shutdown = False
72
74
73 def _python_exit():
75 def _python_exit():
74 global _shutdown
76 global _shutdown
75 _shutdown = True
77 _shutdown = True
76 items = list(_threads_queues.items()) if _threads_queues else ()
78 items = list(_threads_queues.items()) if _threads_queues else ()
77 for t, q in items:
79 for t, q in items:
78 q.put(None)
80 q.put(None)
79 for t, q in items:
81 for t, q in items:
80 t.join(sys.maxint)
82 t.join(sys.maxint)
81
83
82 # Controls how many more calls than processes will be queued in the call queue.
84 # Controls how many more calls than processes will be queued in the call queue.
83 # A smaller number will mean that processes spend more time idle waiting for
85 # A smaller number will mean that processes spend more time idle waiting for
84 # work while a larger number will make Future.cancel() succeed less frequently
86 # work while a larger number will make Future.cancel() succeed less frequently
85 # (Futures in the call queue cannot be cancelled).
87 # (Futures in the call queue cannot be cancelled).
86 EXTRA_QUEUED_CALLS = 1
88 EXTRA_QUEUED_CALLS = 1
87
89
88 class _WorkItem(object):
90 class _WorkItem(object):
89 def __init__(self, future, fn, args, kwargs):
91 def __init__(self, future, fn, args, kwargs):
90 self.future = future
92 self.future = future
91 self.fn = fn
93 self.fn = fn
92 self.args = args
94 self.args = args
93 self.kwargs = kwargs
95 self.kwargs = kwargs
94
96
95 class _ResultItem(object):
97 class _ResultItem(object):
96 def __init__(self, work_id, exception=None, result=None):
98 def __init__(self, work_id, exception=None, result=None):
97 self.work_id = work_id
99 self.work_id = work_id
98 self.exception = exception
100 self.exception = exception
99 self.result = result
101 self.result = result
100
102
101 class _CallItem(object):
103 class _CallItem(object):
102 def __init__(self, work_id, fn, args, kwargs):
104 def __init__(self, work_id, fn, args, kwargs):
103 self.work_id = work_id
105 self.work_id = work_id
104 self.fn = fn
106 self.fn = fn
105 self.args = args
107 self.args = args
106 self.kwargs = kwargs
108 self.kwargs = kwargs
107
109
108 def _process_worker(call_queue, result_queue):
110 def _process_worker(call_queue, result_queue):
109 """Evaluates calls from call_queue and places the results in result_queue.
111 """Evaluates calls from call_queue and places the results in result_queue.
110
112
111 This worker is run in a separate process.
113 This worker is run in a separate process.
112
114
113 Args:
115 Args:
114 call_queue: A multiprocessing.Queue of _CallItems that will be read and
116 call_queue: A multiprocessing.Queue of _CallItems that will be read and
115 evaluated by the worker.
117 evaluated by the worker.
116 result_queue: A multiprocessing.Queue of _ResultItems that will written
118 result_queue: A multiprocessing.Queue of _ResultItems that will written
117 to by the worker.
119 to by the worker.
118 shutdown: A multiprocessing.Event that will be set as a signal to the
120 shutdown: A multiprocessing.Event that will be set as a signal to the
119 worker that it should exit when call_queue is empty.
121 worker that it should exit when call_queue is empty.
120 """
122 """
121 while True:
123 while True:
122 call_item = call_queue.get(block=True)
124 call_item = call_queue.get(block=True)
123 if call_item is None:
125 if call_item is None:
124 # Wake up queue management thread
126 # Wake up queue management thread
125 result_queue.put(None)
127 result_queue.put(None)
126 return
128 return
127 try:
129 try:
128 r = call_item.fn(*call_item.args, **call_item.kwargs)
130 r = call_item.fn(*call_item.args, **call_item.kwargs)
129 except:
131 except:
130 e = sys.exc_info()[1]
132 e = sys.exc_info()[1]
131 result_queue.put(_ResultItem(call_item.work_id,
133 result_queue.put(_ResultItem(call_item.work_id,
132 exception=e))
134 exception=e))
133 else:
135 else:
134 result_queue.put(_ResultItem(call_item.work_id,
136 result_queue.put(_ResultItem(call_item.work_id,
135 result=r))
137 result=r))
136
138
137 def _add_call_item_to_queue(pending_work_items,
139 def _add_call_item_to_queue(pending_work_items,
138 work_ids,
140 work_ids,
139 call_queue):
141 call_queue):
140 """Fills call_queue with _WorkItems from pending_work_items.
142 """Fills call_queue with _WorkItems from pending_work_items.
141
143
142 This function never blocks.
144 This function never blocks.
143
145
144 Args:
146 Args:
145 pending_work_items: A dict mapping work ids to _WorkItems e.g.
147 pending_work_items: A dict mapping work ids to _WorkItems e.g.
146 {5: <_WorkItem...>, 6: <_WorkItem...>, ...}
148 {5: <_WorkItem...>, 6: <_WorkItem...>, ...}
147 work_ids: A queue.Queue of work ids e.g. Queue([5, 6, ...]). Work ids
149 work_ids: A queue.Queue of work ids e.g. Queue([5, 6, ...]). Work ids
148 are consumed and the corresponding _WorkItems from
150 are consumed and the corresponding _WorkItems from
149 pending_work_items are transformed into _CallItems and put in
151 pending_work_items are transformed into _CallItems and put in
150 call_queue.
152 call_queue.
151 call_queue: A multiprocessing.Queue that will be filled with _CallItems
153 call_queue: A multiprocessing.Queue that will be filled with _CallItems
152 derived from _WorkItems.
154 derived from _WorkItems.
153 """
155 """
154 while True:
156 while True:
155 if call_queue.full():
157 if call_queue.full():
156 return
158 return
157 try:
159 try:
158 work_id = work_ids.get(block=False)
160 work_id = work_ids.get(block=False)
159 except queue.Empty:
161 except queue.Empty:
160 return
162 return
161 else:
163 else:
162 work_item = pending_work_items[work_id]
164 work_item = pending_work_items[work_id]
163
165
164 if work_item.future.set_running_or_notify_cancel():
166 if work_item.future.set_running_or_notify_cancel():
165 call_queue.put(_CallItem(work_id,
167 call_queue.put(_CallItem(work_id,
166 work_item.fn,
168 work_item.fn,
167 work_item.args,
169 work_item.args,
168 work_item.kwargs),
170 work_item.kwargs),
169 block=True)
171 block=True)
170 else:
172 else:
171 del pending_work_items[work_id]
173 del pending_work_items[work_id]
172 continue
174 continue
173
175
def _queue_management_worker(executor_reference,
                             processes,
                             pending_work_items,
                             work_ids_queue,
                             call_queue,
                             result_queue):
    """Manages the communication between this process and the worker processes.

    This function is run in a local thread.

    Args:
        executor_reference: A weakref.ref to the ProcessPoolExecutor that owns
            this thread. Used to determine if the ProcessPoolExecutor has been
            garbage collected and that this function can exit.
        processes: A list of the multiprocessing.Process instances used as
            workers.
        pending_work_items: A dict mapping work ids to _WorkItems e.g.
            {5: <_WorkItem...>, 6: <_WorkItem...>, ...}
        work_ids_queue: A queue.Queue of work ids e.g. Queue([5, 6, ...]).
        call_queue: A multiprocessing.Queue that will be filled with _CallItems
            derived from _WorkItems for processing by the process workers.
        result_queue: A multiprocessing.Queue of _ResultItems generated by the
            process workers.
    """
    nb_shutdown_processes = [0]
    def shutdown_one_process():
        """Tell a worker to terminate, which will in turn wake us again"""
        call_queue.put(None)
        nb_shutdown_processes[0] += 1
    while True:
        _add_call_item_to_queue(pending_work_items,
                                work_ids_queue,
                                call_queue)

        result_item = result_queue.get(block=True)
        if result_item is not None:
            work_item = pending_work_items[result_item.work_id]
            del pending_work_items[result_item.work_id]

            if result_item.exception:
                work_item.future.set_exception(result_item.exception)
            else:
                work_item.future.set_result(result_item.result)
            # Delete references to object. See issue16284
            del work_item
        # Check whether we should start shutting down.
        executor = executor_reference()
        # No more work items can be added if:
        #   - The interpreter is shutting down OR
        #   - The executor that owns this worker has been collected OR
        #   - The executor that owns this worker has been shutdown.
        if _shutdown or executor is None or executor._shutdown_thread:
            # Since no new work items can be added, it is safe to shutdown
            # this thread if there are no pending work items.
            if not pending_work_items:
                while nb_shutdown_processes[0] < len(processes):
                    shutdown_one_process()
                # If .join() is not called on the created processes then some
                # multiprocessing.Queue methods may deadlock on Mac OS X.
                for p in processes:
                    p.join()
                call_queue.close()
                return
        del executor

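shutdown_one_process() relies on the usual sentinel idiom: each worker process exits when it reads None from the call queue, and one sentinel is sent per worker. A toy sketch of that idiom with multiprocessing (hypothetical worker, not this module's _process_worker):

import multiprocessing

def worker(q):
    while True:
        item = q.get()
        if item is None:        # sentinel: time to exit
            return
        # ... do real work with item here ...

if __name__ == '__main__':
    q = multiprocessing.Queue()
    procs = [multiprocessing.Process(target=worker, args=(q,)) for _ in range(2)]
    for p in procs:
        p.start()
    for _ in procs:             # one sentinel per worker
        q.put(None)
    for p in procs:             # join to avoid queue deadlocks on some platforms
        p.join()
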
_system_limits_checked = False
_system_limited = None
def _check_system_limits():
    global _system_limits_checked, _system_limited
    if _system_limits_checked:
        if _system_limited:
            raise NotImplementedError(_system_limited)
    _system_limits_checked = True
    try:
        import os
        nsems_max = os.sysconf("SC_SEM_NSEMS_MAX")
    except (AttributeError, ValueError):
        # sysconf not available or setting not available
        return
    if nsems_max == -1:
        # indeterminate limit, assume that limit is determined
        # by available memory only
        return
    if nsems_max >= 256:
        # minimum number of semaphores available
        # according to POSIX
        return
    _system_limited = "system provides too few semaphores (%d available, 256 necessary)" % nsems_max
    raise NotImplementedError(_system_limited)


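ProcessPoolExecutor.__init__ (below) calls _check_system_limits() and surfaces the cached message as NotImplementedError on semaphore-starved platforms. A hedged usage sketch of a caller that falls back to threads, assuming the package is importable as concurrent.futures (as with the standalone futures backport):

from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor

try:
    executor = ProcessPoolExecutor(max_workers=4)
except NotImplementedError:
    # e.g. "system provides too few semaphores (...)": fall back to threads.
    executor = ThreadPoolExecutor(max_workers=4)
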
class ProcessPoolExecutor(_base.Executor):
    def __init__(self, max_workers=None):
        """Initializes a new ProcessPoolExecutor instance.

        Args:
            max_workers: The maximum number of processes that can be used to
                execute the given calls. If None or not given then as many
                worker processes will be created as the machine has processors.
        """
        _check_system_limits()

        if max_workers is None:
            self._max_workers = multiprocessing.cpu_count()
        else:
            if max_workers <= 0:
                raise ValueError("max_workers must be greater than 0")

            self._max_workers = max_workers

        # Make the call queue slightly larger than the number of processes to
        # prevent the worker processes from idling. But don't make it too big
        # because futures in the call queue cannot be cancelled.
        self._call_queue = multiprocessing.Queue(self._max_workers +
                                                 EXTRA_QUEUED_CALLS)
        self._result_queue = multiprocessing.Queue()
        self._work_ids = queue.Queue()
        self._queue_management_thread = None
        self._processes = set()

        # Shutdown is a two-step process.
        self._shutdown_thread = False
        self._shutdown_lock = threading.Lock()
        self._queue_count = 0
        self._pending_work_items = {}

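The call queue is bounded at max_workers + EXTRA_QUEUED_CALLS (a small constant defined earlier in this module) so that workers rarely sit idle while the number of already-queued, uncancellable futures stays small. A short sketch of the resulting sizing, treating the constant's value as an assumption:

from concurrent.futures import ProcessPoolExecutor   # backport installed as concurrent.futures

executor = ProcessPoolExecutor(max_workers=4)
# Assuming EXTRA_QUEUED_CALLS is 1 (the value used upstream), the bounded
# call queue holds at most 4 + 1 = 5 _CallItems; everything beyond that waits
# in _pending_work_items, where the corresponding futures can still be
# cancelled.
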
    def _start_queue_management_thread(self):
        # When the executor gets lost, the weakref callback will wake up
        # the queue management thread.
        def weakref_cb(_, q=self._result_queue):
            q.put(None)
        if self._queue_management_thread is None:
            self._queue_management_thread = threading.Thread(
                    target=_queue_management_worker,
                    args=(weakref.ref(self, weakref_cb),
                          self._processes,
                          self._pending_work_items,
                          self._work_ids,
                          self._call_queue,
                          self._result_queue))
            self._queue_management_thread.daemon = True
            self._queue_management_thread.start()
            _threads_queues[self._queue_management_thread] = self._result_queue

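weakref.ref(self, weakref_cb) is the wake-up trick used by both executors: when the executor object is garbage collected, the callback pushes a None onto the queue the management thread is blocked on, so that thread can notice the disappearance and exit. A standalone sketch of the pattern (hypothetical names, not part of this changeset):

import weakref
import Queue as queue

class Owner(object):
    pass

q = queue.Queue()

def wake(ref, q=q):
    # Called by the interpreter when the Owner instance is collected.
    q.put(None)

owner = Owner()
ref = weakref.ref(owner, wake)   # keep ref alive so the callback fires
del owner                        # collection triggers wake(), unblocking q.get()
assert q.get() is None
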
    def _adjust_process_count(self):
        for _ in range(len(self._processes), self._max_workers):
            p = multiprocessing.Process(
                    target=_process_worker,
                    args=(self._call_queue,
                          self._result_queue))
            p.start()
            self._processes.add(p)

    def submit(self, fn, *args, **kwargs):
        with self._shutdown_lock:
            if self._shutdown_thread:
                raise RuntimeError('cannot schedule new futures after shutdown')

            f = _base.Future()
            w = _WorkItem(f, fn, args, kwargs)

            self._pending_work_items[self._queue_count] = w
            self._work_ids.put(self._queue_count)
            self._queue_count += 1
            # Wake up queue management thread
            self._result_queue.put(None)

            self._start_queue_management_thread()
            self._adjust_process_count()
            return f
    submit.__doc__ = _base.Executor.submit.__doc__

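submit() stores the _WorkItem, wakes the management thread with a None on the result queue, and returns the Future immediately; the caller blocks only when it asks for the result. A minimal hedged usage sketch, assuming the package is importable as concurrent.futures and using a stand-in function:

from concurrent.futures import ProcessPoolExecutor

def square(x):
    return x * x

if __name__ == '__main__':                 # needed for multiprocessing on some platforms
    executor = ProcessPoolExecutor(max_workers=2)
    future = executor.submit(square, 7)    # returns immediately
    print(future.result())                 # blocks until a worker finishes: 49
    executor.shutdown(wait=True)
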
    def shutdown(self, wait=True):
        with self._shutdown_lock:
            self._shutdown_thread = True
        if self._queue_management_thread:
            # Wake up queue management thread
            self._result_queue.put(None)
            if wait:
                self._queue_management_thread.join(sys.maxint)
        # To reduce the risk of opening too many files, remove references to
        # objects that use file descriptors.
        self._queue_management_thread = None
        self._call_queue = None
        self._result_queue = None
        self._processes = None
    shutdown.__doc__ = _base.Executor.shutdown.__doc__

atexit.register(_python_exit)

@@ -1,160 +1,162
# Copyright 2009 Brian Quinlan. All Rights Reserved.
# Licensed to PSF under a Contributor Agreement.

"""Implements ThreadPoolExecutor."""

+from __future__ import absolute_import
+
import atexit
-from concurrent.futures import _base
+from . import _base
import itertools
import Queue as queue
import threading
import weakref
import sys

try:
    from multiprocessing import cpu_count
except ImportError:
    # some platforms don't have multiprocessing
    def cpu_count():
        return None

__author__ = 'Brian Quinlan (brian@sweetapp.com)'

# Workers are created as daemon threads. This is done to allow the interpreter
# to exit when there are still idle threads in a ThreadPoolExecutor's thread
# pool (i.e. shutdown() was not called). However, allowing workers to die with
# the interpreter has two undesirable properties:
#   - The workers would still be running during interpreter shutdown,
#     meaning that they would fail in unpredictable ways.
#   - The workers could be killed while evaluating a work item, which could
#     be bad if the callable being evaluated has external side-effects e.g.
#     writing to a file.
#
# To work around this problem, an exit handler is installed which tells the
# workers to exit when their work queues are empty and then waits until the
# threads finish.

_threads_queues = weakref.WeakKeyDictionary()
_shutdown = False

def _python_exit():
    global _shutdown
    _shutdown = True
    items = list(_threads_queues.items()) if _threads_queues else ()
    for t, q in items:
        q.put(None)
    for t, q in items:
        t.join(sys.maxint)

atexit.register(_python_exit)

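The module-level comment above explains why _python_exit() exists: workers are daemon threads, so an atexit hook sets the _shutdown flag, wakes every worker with a None sentinel, and joins them before the interpreter tears itself down. The same pattern in a standalone sketch (hypothetical worker, not part of this changeset):

import atexit
import threading
import Queue as queue

work = queue.Queue()

def worker():
    while True:
        item = work.get()
        if item is None:      # sentinel from the exit hook
            return
        # ... process item ...

t = threading.Thread(target=worker)
t.daemon = True
t.start()

def _exit_handler():
    work.put(None)            # wake the (possibly idle) worker
    t.join()                  # let it finish cleanly before interpreter shutdown

atexit.register(_exit_handler)
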
class _WorkItem(object):
    def __init__(self, future, fn, args, kwargs):
        self.future = future
        self.fn = fn
        self.args = args
        self.kwargs = kwargs

    def run(self):
        if not self.future.set_running_or_notify_cancel():
            return

        try:
            result = self.fn(*self.args, **self.kwargs)
        except:
            e, tb = sys.exc_info()[1:]
            self.future.set_exception_info(e, tb)
        else:
            self.future.set_result(result)

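run() drives the future through its state machine by hand: set_running_or_notify_cancel() returns False for a future that was cancelled while still queued, otherwise the result or exception info is attached. A hedged sketch exercising that flow directly on a Future, which is not how user code normally touches futures:

from concurrent.futures import Future

f = Future()
f.cancel()                                          # still pending, so this succeeds
assert f.set_running_or_notify_cancel() is False    # a worker would skip this item

g = Future()
assert g.set_running_or_notify_cancel() is True     # now running
g.set_result(42)
assert g.result() == 42
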
def _worker(executor_reference, work_queue):
    try:
        while True:
            work_item = work_queue.get(block=True)
            if work_item is not None:
                work_item.run()
                # Delete references to object. See issue16284
                del work_item
                continue
            executor = executor_reference()
            # Exit if:
            #   - The interpreter is shutting down OR
            #   - The executor that owns the worker has been collected OR
            #   - The executor that owns the worker has been shutdown.
            if _shutdown or executor is None or executor._shutdown:
                # Notify other workers
                work_queue.put(None)
                return
            del executor
    except:
        _base.LOGGER.critical('Exception in worker', exc_info=True)


class ThreadPoolExecutor(_base.Executor):

    # Used to assign unique thread names when thread_name_prefix is not supplied.
    _counter = itertools.count().next

    def __init__(self, max_workers=None, thread_name_prefix=''):
        """Initializes a new ThreadPoolExecutor instance.

        Args:
            max_workers: The maximum number of threads that can be used to
                execute the given calls.
            thread_name_prefix: An optional name prefix to give our threads.
        """
        if max_workers is None:
            # Use this number because ThreadPoolExecutor is often
            # used to overlap I/O instead of CPU work.
            max_workers = (cpu_count() or 1) * 5
        if max_workers <= 0:
            raise ValueError("max_workers must be greater than 0")

        self._max_workers = max_workers
        self._work_queue = queue.Queue()
        self._threads = set()
        self._shutdown = False
        self._shutdown_lock = threading.Lock()
        self._thread_name_prefix = (thread_name_prefix or
                                    ("ThreadPoolExecutor-%d" % self._counter()))

    def submit(self, fn, *args, **kwargs):
        with self._shutdown_lock:
            if self._shutdown:
                raise RuntimeError('cannot schedule new futures after shutdown')

            f = _base.Future()
            w = _WorkItem(f, fn, args, kwargs)

            self._work_queue.put(w)
            self._adjust_thread_count()
            return f
    submit.__doc__ = _base.Executor.submit.__doc__

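A minimal hedged usage sketch for the class above, assuming the package is importable as concurrent.futures; fetch is a stand-in for an I/O-bound call:

from concurrent.futures import ThreadPoolExecutor, as_completed

def fetch(n):
    return n * 2

executor = ThreadPoolExecutor(max_workers=4, thread_name_prefix='demo')
futures = [executor.submit(fetch, n) for n in range(10)]
for f in as_completed(futures):     # yields futures as they finish
    print(f.result())
executor.shutdown(wait=True)
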
    def _adjust_thread_count(self):
        # When the executor gets lost, the weakref callback will wake up
        # the worker threads.
        def weakref_cb(_, q=self._work_queue):
            q.put(None)
        # TODO(bquinlan): Should avoid creating new threads if there are more
        # idle threads than items in the work queue.
        num_threads = len(self._threads)
        if num_threads < self._max_workers:
            thread_name = '%s_%d' % (self._thread_name_prefix or self,
                                     num_threads)
            t = threading.Thread(name=thread_name, target=_worker,
                                 args=(weakref.ref(self, weakref_cb),
                                       self._work_queue))
            t.daemon = True
            t.start()
            self._threads.add(t)
            _threads_queues[t] = self._work_queue

    def shutdown(self, wait=True):
        with self._shutdown_lock:
            self._shutdown = True
            self._work_queue.put(None)
        if wait:
            for t in self._threads:
                t.join(sys.maxint)
    shutdown.__doc__ = _base.Executor.shutdown.__doc__
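
shutdown(wait=True) flips the flag under the lock and posts a single sentinel; each exiting worker re-posts it so the remaining workers wake in turn. The Executor base class in _base also provides __enter__/__exit__, so the usual calling pattern is a with-block, which invokes shutdown(wait=True) on exit; a short sketch:

from concurrent.futures import ThreadPoolExecutor

with ThreadPoolExecutor(max_workers=2) as executor:   # __exit__ -> shutdown(wait=True)
    future = executor.submit(pow, 2, 10)
    print(future.result())                            # 1024
# At this point all worker threads have been joined.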