##// END OF EJS Templates
Fix doctest.
Fernando Perez -
Show More
@@ -1,1113 +1,1113
1 # encoding: utf-8
1 # encoding: utf-8
2 # -*- test-case-name: IPython.kernel.tests.test_task -*-
2 # -*- test-case-name: IPython.kernel.tests.test_task -*-
3
3
4 """Task farming representation of the ControllerService."""
4 """Task farming representation of the ControllerService."""
5
5
6 __docformat__ = "restructuredtext en"
6 __docformat__ = "restructuredtext en"
7
7
8 #-----------------------------------------------------------------------------
8 #-----------------------------------------------------------------------------
9 # Copyright (C) 2008 The IPython Development Team
9 # Copyright (C) 2008 The IPython Development Team
10 #
10 #
11 # Distributed under the terms of the BSD License. The full license is in
11 # Distributed under the terms of the BSD License. The full license is in
12 # the file COPYING, distributed as part of this software.
12 # the file COPYING, distributed as part of this software.
13 #-----------------------------------------------------------------------------
13 #-----------------------------------------------------------------------------
14
14
15 #-----------------------------------------------------------------------------
15 #-----------------------------------------------------------------------------
16 # Imports
16 # Imports
17 #-----------------------------------------------------------------------------
17 #-----------------------------------------------------------------------------
18
18
19 import copy, time
19 import copy, time
20 from types import FunctionType
20 from types import FunctionType
21
21
22 import zope.interface as zi, string
22 import zope.interface as zi, string
23 from twisted.internet import defer, reactor
23 from twisted.internet import defer, reactor
24 from twisted.python import components, log, failure
24 from twisted.python import components, log, failure
25
25
26 from IPython.kernel.util import printer
26 from IPython.kernel.util import printer
27 from IPython.kernel import engineservice as es, error
27 from IPython.kernel import engineservice as es, error
28 from IPython.kernel import controllerservice as cs
28 from IPython.kernel import controllerservice as cs
29 from IPython.kernel.twistedutil import gatherBoth, DeferredList
29 from IPython.kernel.twistedutil import gatherBoth, DeferredList
30
30
31 from IPython.kernel.pickleutil import can, uncan, CannedFunction
31 from IPython.kernel.pickleutil import can, uncan, CannedFunction
32
32
33 #-----------------------------------------------------------------------------
33 #-----------------------------------------------------------------------------
34 # Definition of the Task objects
34 # Definition of the Task objects
35 #-----------------------------------------------------------------------------
35 #-----------------------------------------------------------------------------
36
36
37 time_format = '%Y/%m/%d %H:%M:%S'
37 time_format = '%Y/%m/%d %H:%M:%S'
38
38
class ITask(zi.Interface):
    """
    This interface provides a generic definition of what constitutes a task.

    There are two sides to a task.  First a task needs to take input from
    a user to determine what work is performed by the task.  Second, the
    task needs to have the logic that knows how to turn that information
    into specific calls to a worker, through the `IQueuedEngine` interface.

    Many methods in this class get two things passed to them: a Deferred
    and an IQueuedEngine implementer.  Such methods should register callbacks
    on the Deferred that use the IQueuedEngine to accomplish something.  See
    the existing task objects for examples.

    As usual for interface declarations, the method signatures below do not
    include ``self``.
    """

    # The attributes must be bound to names in the class body; a bare
    # ``zi.Attribute(...)`` expression is created and immediately discarded,
    # so it declares nothing on the interface.
    retries = zi.Attribute('retries','How many times to retry the task')
    recovery_task = zi.Attribute('recovery_task','A task to try if the initial one fails')
    taskid = zi.Attribute('taskid','the id of the task')

    def start_time(result):
        """
        Do anything needed to start the timing of the task.

        Must simply return the result after starting the timers.
        """

    def stop_time(result):
        """
        Do anything needed to stop the timing of the task.

        Must simply return the result after stopping the timers.  This
        method will usually set attributes that are used by `process_result`
        in building the result of the task.
        """

    def pre_task(d, queued_engine):
        """Do something with the queued_engine before the task is run.

        This method should simply add callbacks to the input Deferred
        that do something with the `queued_engine` before the task is run.

        :Parameters:
            d : Deferred
                The deferred that actions should be attached to
            queued_engine : IQueuedEngine implementer
                The worker that has been allocated to perform the task
        """

    def post_task(d, queued_engine):
        """Do something with the queued_engine after the task is run.

        This method should simply add callbacks to the input Deferred
        that do something with the `queued_engine` after the task is run.

        :Parameters:
            d : Deferred
                The deferred that actions should be attached to
            queued_engine : IQueuedEngine implementer
                The worker that has been allocated to perform the task
        """

    def submit_task(d, queued_engine):
        """Submit a task using the `queued_engine` we have been allocated.

        When a task is ready to run, this method is called.  This method
        must take the internal information of the task and make suitable
        calls on the queued_engine to have the actual work done.

        This method should simply add callbacks to the input Deferred
        that use the `queued_engine` to run the task.

        :Parameters:
            d : Deferred
                The deferred that actions should be attached to
            queued_engine : IQueuedEngine implementer
                The worker that has been allocated to perform the task
        """

    def process_result(result, engine_id):
        """Take a raw task result.

        Objects that implement `ITask` can choose how the result of running
        the task is presented.  This method takes the raw result and
        does this logic.  Two examples are the `MapTask` which simply returns
        the raw result or a `Failure` object and the `StringTask` which
        returns a `TaskResult` object.

        :Parameters:
            result : object
                The raw task result that needs to be wrapped
            engine_id : int
                The id of the engine that did the task

        :Returns:
            The result, as a tuple of the form: (success, result).
            Here, success is a boolean indicating if the task
            succeeded or failed and result is the result.
        """

    def check_depend(properties):
        """Check properties to see if the task should be run.

        :Parameters:
            properties : dict
                A dictionary of properties that an engine has set

        :Returns:
            True if the task should be run, False otherwise
        """

    def can_task():
        """Serialize (can) any functions in the task for pickling.

        Subclasses must override this method and make sure that all
        functions in the task are canned by calling `can` on the
        function.
        """

    def uncan_task():
        """Unserialize (uncan) any canned function in the task."""
161
161
class BaseTask(object):
    """
    Common functionality for all objects implementing `ITask`.

    Provides the timing hooks, the optional engine reset before/after a
    task, a default `process_result`, dependency checking and canning of
    the `depend` function.  Subclasses must implement `submit_task`.
    """

    zi.implements(ITask)

    def __init__(self, clear_before=False, clear_after=False, retries=0,
        recovery_task=None, depend=None):
        """
        Make a generic task.

        :Parameters:
            clear_before : boolean
                Should the engine's namespace be cleared before the task
                is run
            clear_after : boolean
                Should the engine's namespace be cleared after the task is run
            retries : int
                The number of times a task should be retried upon failure
            recovery_task : any task object
                If a task fails and it has a recovery_task, that is run
                upon a retry
            depend : FunctionType
                A function that is called to test for properties.  This function
                must take one argument, the properties dict and return a boolean
        """
        self.clear_before = clear_before
        self.clear_after = clear_after
        self.retries = retries
        self.recovery_task = recovery_task
        self.depend = depend
        self.taskid = None

    def start_time(self, result):
        """
        Start the basic timers.

        Records the start both as a float timestamp (`start`) and as a
        local-time struct (`start_struct`), then passes `result` through
        unchanged.
        """
        self.start = time.time()
        self.start_struct = time.localtime()
        return result

    def stop_time(self, result):
        """
        Stop the basic timers.

        Sets `stop`, `stop_struct`, `duration`, `submitted` and `completed`
        and passes `result` through unchanged.  `start_time` must have been
        called first, since the start attributes are read here.
        """
        self.stop = time.time()
        self.stop_struct = time.localtime()
        self.duration = self.stop - self.start
        self.submitted = time.strftime(time_format, self.start_struct)
        # Format the stop time that was just recorded instead of letting
        # strftime read the clock a second time, so that `completed` always
        # agrees with `stop_struct`.
        self.completed = time.strftime(time_format, self.stop_struct)
        return result

    def pre_task(self, d, queued_engine):
        """
        Clear the engine before running the task if clear_before is set.
        """
        if self.clear_before:
            d.addCallback(lambda r: queued_engine.reset())

    def post_task(self, d, queued_engine):
        """
        Clear the engine after running the task if clear_after is set.

        The reset is attached with `addBoth`, so it is requested whether the
        task succeeded or failed, and the task's own result (or failure) is
        passed on unchanged.
        """
        def reseter(result):
            # The return value of reset() is deliberately not chained; the
            # task result is what must continue down the callback chain.
            queued_engine.reset()
            return result
        if self.clear_after:
            d.addBoth(reseter)

    def submit_task(self, d, queued_engine):
        """
        Abstract: subclasses attach the callbacks that actually run the task.
        """
        raise NotImplementedError('submit_task must be implemented in a subclass')

    def process_result(self, result, engine_id):
        """
        Process a task result.

        This is the default `process_result` that just returns the raw
        result or a `Failure`, as a ``(success, result)`` tuple.
        `engine_id` is accepted for interface compatibility and not used.
        """
        if isinstance(result, failure.Failure):
            return (False, result)
        else:
            return (True, result)

    def check_depend(self, properties):
        """
        Calls self.depend(properties) to see if a task should be run.

        A task without a `depend` function can always be run.
        """
        if self.depend is not None:
            return self.depend(properties)
        else:
            return True

    def can_task(self):
        """
        Can the `depend` function and, recursively, the recovery task.
        """
        self.depend = can(self.depend)
        if isinstance(self.recovery_task, BaseTask):
            self.recovery_task.can_task()

    def uncan_task(self):
        """
        Uncan the `depend` function and, recursively, the recovery task.
        """
        self.depend = uncan(self.depend)
        if isinstance(self.recovery_task, BaseTask):
            self.recovery_task.uncan_task()
265
265
class MapTask(BaseTask):
    """
    A task that consists of a function and arguments.
    """

    zi.implements(ITask)

    def __init__(self, function, args=None, kwargs=None, clear_before=False,
        clear_after=False, retries=0, recovery_task=None, depend=None):
        """
        Create a task based on a function, args and kwargs.

        This is a simple type of task that consists of calling
        function(*args, **kwargs) on an engine.

        The return value of the function, or a `Failure` wrapping an
        exception, is the task result for this type of task.

        :Parameters:
            function : FunctionType
                The function to call; anything else raises TypeError
            args : list or tuple
                Positional arguments, defaults to an empty tuple
            kwargs : dict
                Keyword arguments, defaults to an empty dict

        The remaining arguments are passed on to `BaseTask.__init__`.
        """
        BaseTask.__init__(self, clear_before, clear_after, retries,
            recovery_task, depend)

        if not isinstance(function, FunctionType):
            raise TypeError('a task function must be a FunctionType')
        self.function = function

        # Normalise the optional positional arguments, then validate them.
        if args is None:
            args = ()
        self.args = args
        if not isinstance(args, (list, tuple)):
            raise TypeError('a task args must be a list or tuple')

        # Same treatment for the optional keyword arguments.
        if kwargs is None:
            kwargs = {}
        self.kwargs = kwargs
        if not isinstance(kwargs, dict):
            raise TypeError('a task kwargs must be a dict')

    def submit_task(self, d, queued_engine):
        """
        Chain the four steps that run the function on `queued_engine`.

        The function is pushed, then its arguments, then the call is
        executed in the engine's namespace and finally the result variable
        is pulled back.  Each step ignores the value produced by the
        previous one.
        """
        def send_function(_):
            return queued_engine.push_function(
                {'_ipython_task_function': self.function})

        def send_arguments(_):
            return queued_engine.push(
                {'_ipython_task_args': self.args,
                 '_ipython_task_kwargs': self.kwargs})

        def call_function(_):
            return queued_engine.execute(
                '_ipython_task_result = _ipython_task_function(*_ipython_task_args,**_ipython_task_kwargs)')

        def fetch_result(_):
            return queued_engine.pull('_ipython_task_result')

        for step in (send_function, send_arguments, call_function, fetch_result):
            d.addCallback(step)

    def can_task(self):
        """Can the task function, then everything `BaseTask` cans."""
        self.function = can(self.function)
        BaseTask.can_task(self)

    def uncan_task(self):
        """Uncan the task function, then everything `BaseTask` uncans."""
        self.function = uncan(self.function)
        BaseTask.uncan_task(self)
321
321
322
322
class StringTask(BaseTask):
    """
    A task that consists of a string of Python code to run.
    """

    def __init__(self, expression, pull=None, push=None,
        clear_before=False, clear_after=False, retries=0,
        recovery_task=None, depend=None):
        """
        Create a task based on a Python expression and variables

        This type of task lets you push a set of variables to the engines
        namespace, run a Python string in that namespace and then bring back
        a different set of Python variables as the result.

        Because this type of task can return many results (through the
        `pull` keyword argument) it returns a special `TaskResult` object
        that wraps the pulled variables, statistics about the run and
        any exceptions raised.

        :Parameters:
            expression : str
                The code to execute; anything else raises TypeError
            pull : str or list/tuple of str
                Name(s) of the variables to bring back.  A single string
                is stored as a one-element tuple, None as an empty tuple
            push : dict
                Variables to push before running; None means an empty dict

        The remaining arguments are passed on to `BaseTask.__init__`.
        """
        if not isinstance(expression, str):
            raise TypeError('a task expression must be a string')
        self.expression = expression

        # Identity, not equality, is the correct test for the None
        # singleton: `==` would invoke a custom __eq__ on the argument.
        if pull is None:
            self.pull = ()
        elif isinstance(pull, str):
            self.pull = (pull,)
        elif isinstance(pull, (list, tuple)):
            self.pull = pull
        else:
            raise TypeError('pull must be str or a sequence of strs')

        if push is None:
            self.push = {}
        elif isinstance(push, dict):
            self.push = push
        else:
            raise TypeError('push must be a dict')

        BaseTask.__init__(self, clear_before, clear_after, retries,
            recovery_task, depend)

    def submit_task(self, d, queued_engine):
        """
        Chain push, execute and pull on `queued_engine`.

        `__init__` never leaves `push` or `pull` as None, so the None
        guards only matter if a caller resets those attributes afterwards.
        """
        if self.push is not None:
            d.addCallback(lambda r: queued_engine.push(self.push))

        d.addCallback(lambda r: queued_engine.execute(self.expression))

        if self.pull is not None:
            d.addCallback(lambda r: queued_engine.pull(self.pull))
        else:
            d.addCallback(lambda r: None)

    def process_result(self, result, engine_id):
        """
        Wrap the raw result in a `TaskResult`.

        On success the pulled values are stored in a dict keyed by the
        names in `self.pull`.  When exactly one name was pulled, `result`
        is used as that single value rather than as a sequence.

        The timing attributes set by `stop_time` are copied onto the
        `TaskResult`, so `stop_time` must have run before this is called.

        :Returns:
            A tuple (success, task_result)
        """
        failed = isinstance(result, failure.Failure)
        if failed:
            tr = TaskResult(result, engine_id)
        else:
            if self.pull is None:
                resultDict = {}
            elif len(self.pull) == 1:
                resultDict = {self.pull[0]:result}
            else:
                resultDict = dict(zip(self.pull, result))
            tr = TaskResult(resultDict, engine_id)
        # Assign task attributes
        tr.submitted = self.submitted
        tr.completed = self.completed
        tr.duration = self.duration
        tr.taskid = getattr(self, 'taskid', None)
        return (not failed, tr)
400
400
class ResultNS(object):
    """
    A dict like object for holding the results of a task.

    The result namespace object for use in `TaskResult` objects as tr.ns.
    It builds an object from a dictionary, such that it has attributes
    according to the key,value pairs of the dictionary.

    This works by calling setattr on ALL key,value pairs in the dict.  If a user
    chooses to overwrite the `__repr__` or `__getattr__` attributes, they can.
    This can be a bad idea, as it may corrupt standard behavior of the
    ns object.

    Example
    --------

    >>> ns = ResultNS({'a':17,'foo':range(3)})
    >>> print ns
    NS{'a': 17, 'foo': [0, 1, 2]}
    >>> ns.a
    17
    >>> ns['foo']
    [0, 1, 2]
    """
    def __init__(self, dikt):
        """
        Copy every key,value pair of `dikt` onto this object as attributes.
        """
        # items() rather than iteritems(): same result here, and it does not
        # tie the class to the Python 2 only dict API.
        for k,v in dikt.items():
            setattr(self,k,v)

    def __repr__(self):
        """
        Return "NS" followed by the repr of a dict of the visible attributes.

        Any name that starts or ends with a double underscore is left out.
        """
        d = {}
        for k in dir(self):
            # do not print private objects
            if k.startswith('__') or k.endswith('__'):
                continue
            d[k] = getattr(self, k)
        return "NS"+repr(d)

    def __getitem__(self, key):
        """
        Item access is attribute access: ns['a'] is ns.a.

        A missing key therefore raises AttributeError, not KeyError.
        """
        return getattr(self, key)
440
440
441 class TaskResult(object):
441 class TaskResult(object):
442 """
442 """
443 An object for returning task results for certain types of tasks.
443 An object for returning task results for certain types of tasks.
444
444
445 This object encapsulates the results of a task. On task
445 This object encapsulates the results of a task. On task
446 success it will have a keys attribute that will have a list
446 success it will have a keys attribute that will have a list
447 of the variables that have been pulled back. These variables
447 of the variables that have been pulled back. These variables
448 are accessible as attributes of this class as well. On
448 are accessible as attributes of this class as well. On
449 success the failure attribute will be None.
449 success the failure attribute will be None.
450
450
451 In task failure, keys will be empty, but failure will contain
451 In task failure, keys will be empty, but failure will contain
452 the failure object that encapsulates the remote exception.
452 the failure object that encapsulates the remote exception.
453 One can also simply call the `raise_exception` method of
453 One can also simply call the `raise_exception` method of
454 this class to re-raise any remote exception in the local
454 this class to re-raise any remote exception in the local
455 session.
455 session.
456
456
457 The `TaskResult` has a `.ns` member, which is a property for access
457 The `TaskResult` has a `.ns` member, which is a property for access
458 to the results. If the Task had pull=['a', 'b'], then the
458 to the results. If the Task had pull=['a', 'b'], then the
459 Task Result will have attributes `tr.ns.a`, `tr.ns.b` for those values.
459 Task Result will have attributes `tr.ns.a`, `tr.ns.b` for those values.
460 Accessing `tr.ns` will raise the remote failure if the task failed.
460 Accessing `tr.ns` will raise the remote failure if the task failed.
461
461
462 The `engineid` attribute should have the `engineid` of the engine
462 The `engineid` attribute should have the `engineid` of the engine
463 that ran the task. But, because engines can come and go,
463 that ran the task. But, because engines can come and go,
464 the `engineid` may not continue to be
464 the `engineid` may not continue to be
465 valid or accurate.
465 valid or accurate.
466
466
467 The `taskid` attribute simply gives the `taskid` that the task
467 The `taskid` attribute simply gives the `taskid` that the task
468 is tracked under.
468 is tracked under.
469 """
469 """
470 taskid = None
470 taskid = None
471
471
472 def _getNS(self):
472 def _getNS(self):
473 if isinstance(self.failure, failure.Failure):
473 if isinstance(self.failure, failure.Failure):
474 return self.failure.raiseException()
474 return self.failure.raiseException()
475 else:
475 else:
476 return self._ns
476 return self._ns
477
477
478 def _setNS(self, v):
478 def _setNS(self, v):
479 raise Exception("the ns attribute cannot be changed")
479 raise Exception("the ns attribute cannot be changed")
480
480
481 ns = property(_getNS, _setNS)
481 ns = property(_getNS, _setNS)
482
482
483 def __init__(self, results, engineid):
483 def __init__(self, results, engineid):
484 self.engineid = engineid
484 self.engineid = engineid
485 if isinstance(results, failure.Failure):
485 if isinstance(results, failure.Failure):
486 self.failure = results
486 self.failure = results
487 self.results = {}
487 self.results = {}
488 else:
488 else:
489 self.results = results
489 self.results = results
490 self.failure = None
490 self.failure = None
491
491
492 self._ns = ResultNS(self.results)
492 self._ns = ResultNS(self.results)
493
493
494 self.keys = self.results.keys()
494 self.keys = self.results.keys()
495
495
def __repr__(self):
    """Render as ``TaskResult[ID:<taskid>]:<failure or results>``.

    The failure is shown when one is stored, the results otherwise.
    """
    if self.failure is None:
        shown = self.results
    else:
        shown = self.failure
    return "TaskResult[ID:%r]:%r" % (self.taskid, shown)
502
502
def __getitem__(self, key):
    """Return ``self.results[key]``.

    If a failure is stored, `raise_exception()` is called first so the
    remote error surfaces instead of a lookup on the empty results.
    """
    has_failure = self.failure is not None
    if has_failure:
        self.raise_exception()
    return self.results[key]
507
507
def raise_exception(self):
    """Re-raise any remote exceptions in the local python session.

    Does nothing when no failure is stored.
    """
    stored = self.failure
    if stored is None:
        return
    stored.raiseException()
512
512
513
513
514 #-----------------------------------------------------------------------------
514 #-----------------------------------------------------------------------------
515 # The controller side of things
515 # The controller side of things
516 #-----------------------------------------------------------------------------
516 #-----------------------------------------------------------------------------
517
517
class IWorker(zi.Interface):
    """The Basic Worker Interface.

    A worker is a representation of an Engine that is ready to run tasks.
    """

    # The Attribute must be bound to a name in the class body: a bare
    # `zi.Attribute(...)` expression is evaluated and thrown away, so the
    # attribute would never become part of the interface.
    workerid = zi.Attribute("workerid", "the id of the worker")

    def run(task):
        """Run task in worker's namespace.

        :Parameters:
            task : a `Task` object

        :Returns: `Deferred` to a tuple of (success, result) where
            success is a boolean that signifies success or failure
            and result is the task result.
        """
536
536
537
537
class WorkerFromQueuedEngine(object):
    """Adapter that presents an `IQueuedEngine` as an `IWorker`."""

    zi.implements(IWorker)

    def __init__(self, qe):
        # workerid starts out unset; nothing in this class assigns it.
        self.workerid = None
        self.queuedEngine = qe

    def _get_properties(self):
        engine = self.queuedEngine
        return engine.properties

    # Reads are forwarded to the wrapped engine; assignments are accepted
    # and silently dropped.
    properties = property(_get_properties, lambda self, _ignored: None)

    def run(self, task):
        """Run task in worker's namespace.

        Builds a `Deferred` chain and lets the task hook its own steps
        onto it; the task's methods are what actually make
        `self.queuedEngine` do the work. See the methods of `ITask` for
        more information about how these methods are called.

        :Parameters:
            task : a `Task` object

        :Returns: `Deferred` to a tuple of (success, result) where
            success is a boolean that signifies success or failure
            and result is the task result.
        """
        engine = self.queuedEngine
        chain = defer.succeed(None)
        chain.addCallback(task.start_time)
        # The task attaches its three phases to the chain, in this order.
        task.pre_task(chain, engine)
        task.submit_task(chain, engine)
        task.post_task(chain, engine)
        chain.addBoth(task.stop_time)
        chain.addBoth(task.process_result, engine.id)
        # At this point, there will be (success, result) coming down the line
        return chain


components.registerAdapter(WorkerFromQueuedEngine, es.IEngineQueued, IWorker)
578
578
class IScheduler(zi.Interface):
    """The interface for a Scheduler.
    """

    # Each Attribute has to be bound to a name in the class body; a bare
    # `zi.Attribute(...)` expression is discarded and declares nothing.
    nworkers = zi.Attribute("nworkers", "the number of unassigned workers")
    ntasks = zi.Attribute("ntasks", "the number of unscheduled tasks")
    workerids = zi.Attribute("workerids", "a list of the worker ids")
    taskids = zi.Attribute("taskids", "a list of the task ids")

    def add_task(task, **flags):
        """Add a task to the queue of the Scheduler.

        :Parameters:
            task : an `ITask` implementer
                The task to be queued.
            flags : dict
                General keywords for more sophisticated scheduling
        """

    def pop_task(id=None):
        """Pops a task object from the queue.

        This gets the next task to be run. If no `id` is requested, the
        highest priority task is returned.

        :Parameters:
            id
                The id of the task to be popped. The default (None) is to
                return the highest priority task.

        :Returns: an `ITask` implementer

        :Exceptions:
            IndexError : raised if no taskid in queue
        """

    def add_worker(worker, **flags):
        """Add a worker to the worker queue.

        :Parameters:
            worker : an `IWorker` implementer
            flags : dict
                General keywords for more sophisticated scheduling
        """

    def pop_worker(id=None):
        """Pops an IWorker object that is ready to do work.

        This gets the next IWorker that is ready to do work.

        :Parameters:
            id : if specified, will pop worker with workerid=id, else pops
                 highest priority worker. Defaults to None.

        :Returns:
            an IWorker object

        :Exceptions:
            IndexError : raised if no workerid in queue
        """

    def ready():
        """Returns True if there is something to do, False otherwise"""

    def schedule():
        """Returns (worker,task) pair for the next task to be run."""
644
644
645
645
class FIFOScheduler(object):
    """
    A basic First-In-First-Out (Queue) Scheduler.

    This is the default Scheduler for the `TaskController`.
    See the docstrings for `IScheduler` for interface details.
    """

    zi.implements(IScheduler)

    def __init__(self):
        self.tasks = []    # queued tasks, next to run first
        self.workers = []  # idle workers, next to be used first

    def _ntasks(self):
        return len(self.tasks)

    def _nworkers(self):
        return len(self.workers)

    # Read-only views; assignments are accepted and silently ignored.
    ntasks = property(_ntasks, lambda self, _:None)
    nworkers = property(_nworkers, lambda self, _:None)

    def _taskids(self):
        return [t.taskid for t in self.tasks]

    def _workerids(self):
        return [w.workerid for w in self.workers]

    taskids = property(_taskids, lambda self,_:None)
    workerids = property(_workerids, lambda self,_:None)

    def add_task(self, task, **flags):
        """Append `task` to the end of the task queue; `flags` are unused."""
        self.tasks.append(task)

    def pop_task(self, id=None):
        """Remove and return a queued task.

        With `id` None the task at the front of the queue is returned,
        otherwise the first task whose `taskid` equals `id`.

        :Exceptions:
            IndexError : the queue is empty, or no queued task has that id
        """
        if id is None:
            return self.tasks.pop(0)
        for index, task in enumerate(self.tasks):
            if id == task.taskid:
                return self.tasks.pop(index)
        # %s with a 1-tuple: a non-integer id must still yield the
        # documented IndexError rather than a formatting TypeError.
        raise IndexError("No task #%s" % (id,))

    def add_worker(self, worker, **flags):
        """Append `worker` to the end of the worker queue; `flags` are unused."""
        self.workers.append(worker)

    def pop_worker(self, id=None):
        """Remove and return a queued worker.

        With `id` None the worker at the front of the queue is returned,
        otherwise the first worker whose `workerid` equals `id`.

        :Exceptions:
            IndexError : the queue is empty, or no queued worker has that id
        """
        if id is None:
            return self.workers.pop(0)
        for index, worker in enumerate(self.workers):
            if id == worker.workerid:
                return self.workers.pop(index)
        raise IndexError("No worker #%s" % (id,))

    def schedule(self):
        """Return the first runnable (worker, task) pair, removing both.

        Tasks are tried in queue order and, for each task, workers in queue
        order. A pair is runnable when the task's `check_depend` accepts
        the worker's `properties`. Returns (None, None) when nothing can
        be paired.
        """
        for t in self.tasks:
            for w in self.workers:
                try:# do not allow exceptions to break this
                    # Allow the task to check itself using its
                    # check_depend method.
                    cando = t.check_depend(w.properties)
                except Exception:
                    # A dependency check that blows up counts as "cannot run".
                    cando = False
                if cando:
                    return self.pop_worker(w.workerid), self.pop_task(t.taskid)
        return None, None
716
716
717
717
718
718
class LIFOScheduler(FIFOScheduler):
    """
    A Last-In-First-Out (Stack) Scheduler.

    This scheduler should naively reward fast engines by giving
    them more jobs. This risks starvation, but only in cases with
    low load, where starvation does not really matter.
    """

    def add_task(self, task, **flags):
        """Put `task` at the front of the task queue; `flags` are unused."""
        self.tasks[:0] = [task]

    def add_worker(self, worker, **flags):
        """Put `worker` at the front of the worker queue; `flags` are unused."""
        self.workers[:0] = [worker]
737
737
738
738
class ITaskController(cs.IControllerBase):
    """
    The Task based interface to a `ControllerService` object

    This adapts a `ControllerService` to the ITaskController interface.
    """

    def run(task):
        """
        Run a task.

        :Parameters:
            task : an IPython `Task` object

        :Returns: the integer ID of the task
        """

    def get_task_result(taskid, block=False):
        """
        Get the result of a task by its ID.

        :Parameters:
            taskid : int
                the id of the task whose result is requested
            block : bool
                if True and the task is not done yet, the returned
                `Deferred` only fires once the task has finished

        :Returns: `Deferred` to the task result if the task is done, and None
            if not.

        :Exceptions:
            actualResult will be an `IndexError` if no such task has been submitted
        """

    def abort(taskid):
        """Remove task from queue if the task has not been submitted.

        If the task has already been submitted, wait for it to finish and discard
        results and prevent resubmission.

        :Parameters:
            taskid : the id of the task to be aborted

        :Returns:
            `Deferred` to abort attempt completion. Will be None on success.

        :Exceptions:
            deferred will fail with `IndexError` if no such task has been submitted
            or the task has already completed.
        """

    def barrier(taskids):
        """
        Block until the list of taskids are completed.

        Returns None on success.
        """

    def spin():
        """
        Touch the scheduler, to resume scheduling without submitting a task.
        """

    def queue_status(verbose=False):
        """
        Get a dictionary with the current state of the task queue.

        If verbose is True, then return lists of taskids, otherwise,
        return the number of tasks with each status.
        """

    def clear():
        """
        Clear all previously run tasks from the task controller.

        This is needed because the task controller keeps all task results
        in memory. This can be a problem if there are many completed
        tasks. Users should call this periodically to clean out these
        cached task results.
        """
817
817
818
818
819 class TaskController(cs.ControllerAdapterBase):
819 class TaskController(cs.ControllerAdapterBase):
820 """The Task based interface to a Controller object.
820 """The Task based interface to a Controller object.
821
821
822 If you want to use a different scheduler, just subclass this and set
822 If you want to use a different scheduler, just subclass this and set
823 the `SchedulerClass` member to the *class* of your chosen scheduler.
823 the `SchedulerClass` member to the *class* of your chosen scheduler.
824 """
824 """
825
825
826 zi.implements(ITaskController)
826 zi.implements(ITaskController)
827 SchedulerClass = FIFOScheduler
827 SchedulerClass = FIFOScheduler
828
828
829 timeout = 30
829 timeout = 30
830
830
def __init__(self, controller):
    """Attach the task controller to `controller`.

    Registers `registerWorker` / `unregisterWorker` with the controller's
    engine (un)registration hooks, creates the scheduler from
    `SchedulerClass`, and queues a worker for every engine the controller
    already has.

    :Parameters:
        controller
            The controller object being adapted; it must provide
            `on_register_engine_do`, `on_unregister_engine_do` and an
            `engines` mapping of id to engine.
    """
    self.controller = controller
    # NOTE(review): the meaning of the second argument (True) is defined
    # by the controller's hook API, which is not visible here.
    self.controller.on_register_engine_do(self.registerWorker, True)
    self.controller.on_unregister_engine_do(self.unregisterWorker, True)
    self.taskid = 0
    self.failurePenalty = 1 # the time in seconds to penalize
                            # a worker for failing a task
    self.pendingTasks = {} # dict of {workerid:task}
    self.deferredResults = {} # dict of {taskid:[deferred, ...]}
    self.finishedResults = {} # dict of {taskid:actualResult}
    self.workers = {} # dict of {workerid:worker}
    self.abortPending = [] # list of taskids to abort once they finish
    self.idleLater = None # delayed call object for timeout
    self.scheduler = self.SchedulerClass()

    for id in self.controller.engines.keys():
        self.workers[id] = IWorker(self.controller.engines[id])
        self.workers[id].workerid = id
        # The scheduler lives in `self.scheduler`; `self.schedule` does not
        # exist and raised AttributeError when engines were already present.
        self.scheduler.add_worker(self.workers[id])
850
850
def registerWorker(self, id):
    """Called by controller.register_engine.

    Adapts the newly registered engine to an `IWorker`, hands it to the
    scheduler unless a task is still recorded as pending for that id, and
    then tries to distribute queued tasks.

    :Exceptions:
        ValueError : a worker with this id is already registered
    """
    if self.workers.get(id):
        raise ValueError("worker with id %s already exists.  This should not happen." % id)
    self.workers[id] = IWorker(self.controller.engines[id])
    self.workers[id].workerid = id
    # `in` replaces dict.has_key(), which Python 3 no longer provides.
    if id not in self.pendingTasks:# if not working
        self.scheduler.add_worker(self.workers[id])
    self.distributeTasks()
860
860
def unregisterWorker(self, id):
    """Called by controller.unregister_engine.

    Drops the worker from the scheduler queue and from `self.workers`.
    Unknown ids are ignored.
    """
    # `in` replaces dict.has_key(), which Python 3 no longer provides.
    if id in self.workers:
        try:
            self.scheduler.pop_worker(id)
        except IndexError:
            # The worker is not in the scheduler's queue; nothing to remove.
            pass
        self.workers.pop(id)
870
870
def _pendingTaskIDs(self):
    """Return a list with the `taskid` of every task in `self.pendingTasks`."""
    ids = []
    for pending in self.pendingTasks.values():
        ids.append(pending.taskid)
    return ids
873
873
874 #---------------------------------------------------------------------------
874 #---------------------------------------------------------------------------
875 # Interface methods
875 # Interface methods
876 #---------------------------------------------------------------------------
876 #---------------------------------------------------------------------------
877
877
def run(self, task):
    """
    Queue a task and return a `Deferred` that has already fired with its
    taskid.

    The task is stamped with the next free `taskid` and the local start
    time, added to the scheduler, and given an empty list of waiting
    deferreds in `self.deferredResults` before distribution is attempted.
    """
    task.taskid = self.taskid
    task.start = time.localtime()
    self.taskid += 1
    self.scheduler.add_task(task)
    log.msg('Queuing task: %i' % task.taskid)

    # Deferreds of blocking get_task_result() callers are collected here.
    self.deferredResults[task.taskid] = []
    self.distributeTasks()
    return defer.succeed(task.taskid)
892
892
def get_task_result(self, taskid, block=False):
    """
    Returns a `Deferred` to the task result, or None.

    :Parameters:
        taskid : int
            the id of the task whose result is requested
        block : bool
            if True and the task has not finished, the returned
            `Deferred` fires only when the task finishes; if False it
            fires immediately with None

    :Returns: a `Deferred`; it fails with `IndexError` when `taskid` is
        neither finished nor waiting for a result.
    """
    log.msg("Getting task result: %i" % taskid)
    # `in` replaces dict.has_key(), which Python 3 no longer provides.
    if taskid in self.finishedResults:
        tr = self.finishedResults[taskid]
        return defer.succeed(tr)
    elif taskid in self.deferredResults:
        if block:
            # Parked here until _finishTask fires it with the result.
            d = defer.Deferred()
            self.deferredResults[taskid].append(d)
            return d
        else:
            return defer.succeed(None)
    else:
        return defer.fail(IndexError("task ID not registered: %r" % taskid))
910
910
def abort(self, taskid):
    """
    Remove a task from the queue if it has not been run already.

    A task that is still queued is finished at once with a `TaskAborted`
    failure. A task that is currently running is only marked in
    `self.abortPending`.

    :Parameters:
        taskid : int
            the id of the task to be aborted

    :Returns: a `Deferred` that fires with None on success. It fails with
        `TypeError` for a non-integer id and with `IndexError` when the
        task is unknown, already completed or already aborted.
    """
    if not isinstance(taskid, int):
        # 1-tuple: a tuple-valued taskid must not break the formatting and
        # raise synchronously instead of failing the deferred.
        return defer.fail(failure.Failure(TypeError("an integer task id expected: %r" % (taskid,))))
    try:
        self.scheduler.pop_task(taskid)
    except IndexError as e:
        # Not queued any more: work out why.
        if taskid in self.finishedResults:
            d = defer.fail(IndexError("Task Already Completed"))
        elif taskid in self.abortPending:
            d = defer.fail(IndexError("Task Already Aborted"))
        elif taskid in self._pendingTaskIDs():# task is pending
            self.abortPending.append(taskid)
            d = defer.succeed(None)
        else:
            d = defer.fail(e)
    else:
        d = defer.execute(self._doAbort, taskid)

    return d
933
933
def barrier(self, taskids):
    """Return a `Deferred` that fires with None once all `taskids` are done.

    A single integer is accepted in place of a list. Each id is waited on
    through a blocking `get_task_result` call.
    """
    if isinstance(taskids, int):
        taskids = [taskids]
    waiting = [self.get_task_result(tid, block=True) for tid in taskids]
    gathered = DeferredList(waiting, consumeErrors=1)
    # Callers only need to know when everything is done, not the results.
    gathered.addCallbacks(lambda r: None)
    return gathered
944
944
def spin(self):
    """Run one distribution pass; return its outcome in a fired `Deferred`."""
    distributed = self.distributeTasks()
    return defer.succeed(distributed)
947
947
def queue_status(self, verbose=False):
    """Return a fired `Deferred` holding a summary dict of the task queue.

    The dict has the keys `pending`, `failed`, `succeeded` and
    `scheduled`. With `verbose` true each value is a list of taskids,
    otherwise the length of that list.

    Finished entries that are bare `Failure` objects, or that have no
    `failure` attribute, are counted in neither `failed` nor `succeeded`.
    """
    pending = self._pendingTaskIDs()
    failed = []
    succeeded = []
    for tid, outcome in self.finishedResults.iteritems():
        if isinstance(outcome, failure.Failure) or not hasattr(outcome, 'failure'):
            continue
        if outcome.failure is None:
            succeeded.append(tid)
        else:
            failed.append(tid)
    scheduled = self.scheduler.taskids
    status = dict(pending=pending, failed=failed,
                  succeeded=succeeded, scheduled=scheduled)
    if not verbose:
        # Collapse every id list to its size.
        status = dict((name, len(ids)) for name, ids in status.items())
    return defer.succeed(status)
967
967
968 #---------------------------------------------------------------------------
968 #---------------------------------------------------------------------------
969 # Queue methods
969 # Queue methods
970 #---------------------------------------------------------------------------
970 #---------------------------------------------------------------------------
971
971
def _doAbort(self, taskid):
    """
    Finish `taskid` with a `TaskAborted` failure and clear its abort mark.
    """
    log.msg("Task aborted: %i" % taskid)
    aborted = failure.Failure(error.TaskAborted())
    self._finishTask(taskid, aborted)
    marks = self.abortPending
    if taskid in marks:
        marks.remove(taskid)
981
981
def _finishTask(self, taskid, result):
    """Record `result` for `taskid` and fire every deferred waiting on it.

    The list of waiting deferreds is removed from `self.deferredResults`
    and the result is stored in `self.finishedResults`.
    """
    waiters = self.deferredResults.pop(taskid)
    # The taskid is not stamped onto the result here; the TaskResult
    # should save the taskid itself.
    self.finishedResults[taskid] = result
    for waiter in waiters:
        waiter.callback(result)
988
988
def distributeTasks(self):
    """
    Distribute tasks while self.scheduler has things to do.

    Returns False when the scheduler had no (worker, task) pair to offer
    and True when at least one task was started.
    """
    log.msg("distributing Tasks")
    worker, task = self.scheduler.schedule()
    if not (worker or task):
        timer = self.idleLater
        if timer and timer.called:# we are inside failIdle
            self.idleLater = None
        else:
            self.checkIdle()
        return False
    # else something to do:
    while worker and task:
        # remember which task this worker is busy with
        self.pendingTasks[worker.workerid] = task
        # run/link callbacks
        running = worker.run(task)
        log.msg("Running task %i on worker %i" %(task.taskid, worker.workerid))
        running.addBoth(self.taskCompleted, task.taskid, worker.workerid)
        # ask for the next runnable pair
        worker, task = self.scheduler.schedule()
    # check for idle timeout:
    self.checkIdle()
    return True
1014
1014
def checkIdle(self):
    """Re-arm or clear the idle timeout.

    Any timer that has not fired yet is cancelled. A new `failIdle` call
    is scheduled `self.timeout` seconds ahead only when tasks are queued,
    workers exist, and every known worker is sitting in the scheduler's
    queue; otherwise `self.idleLater` is reset to None.
    """
    timer = self.idleLater
    if timer and not timer.called:
        timer.cancel()
    stalled = (self.scheduler.ntasks and self.workers and
               self.scheduler.nworkers == len(self.workers))
    if stalled:
        self.idleLater = reactor.callLater(self.timeout, self.failIdle)
    else:
        self.idleLater = None
1023
1023
def failIdle(self):
    """Idle-timeout handler.

    Tries one more distribution pass. If that starts nothing, every task
    still queued in the scheduler is finished with a `TaskTimeout`
    failure. The timer reference is cleared in either case.
    """
    if self.distributeTasks():
        self.idleLater = None
        return
    while self.scheduler.ntasks:
        stale = self.scheduler.pop_task()
        reason = ("task %i failed to execute due to unmet dependencies"%stale.taskid
                  + " for %i seconds"%self.timeout)
        log.msg("Task aborted by timeout: %i" % stale.taskid)
        timed_out = failure.Failure(error.TaskTimeout(reason))
        self._finishTask(stale.taskid, timed_out)
    self.idleLater = None
1034
1034
1035
1035
def taskCompleted(self, success_and_result, taskid, workerid):
    """This is the err/callback for a completed task.

    It is attached with `addBoth` to the deferred of a running task, so it
    handles successes and failures alike.

    :Parameters:
        success_and_result : 2-sequence
            `(success, result)`; `success` is truthy when the task ran
            without error, `result` is what gets passed to `_finishTask`.
        taskid : int
            Id of the task that completed.
        workerid : int
            Id of the worker that ran it; key into `self.pendingTasks`.

    Outcomes:

    * task was aborted while pending: `_doAbort` is called, the result is
      not forwarded to `_finishTask`, and the worker is readmitted;
    * success: the task is finished and the worker readmitted at once;
    * failure: the task is resubmitted while it has retries left, else
      replaced by its `recovery_task` (if any), else finished with the
      failing result.

    After any failure the worker is readmitted only after
    `self.failurePenalty` seconds.
    """
    success, result = success_and_result
    try:
        task = self.pendingTasks.pop(workerid)
    except KeyError:
        # this should not happen
        log.msg("Tried to pop bad pending task %i from worker %i"%(taskid, workerid))
        # Wrap in a 1-tuple: `result` may itself be a tuple, which would
        # otherwise be unpacked as the format arguments and raise TypeError.
        log.msg("Result: %r" % (result,))
        log.msg("Pending tasks: %s"%self.pendingTasks)
        return

    # Check if aborted while pending
    aborted = taskid in self.abortPending
    if aborted:
        self._doAbort(taskid)
        # we aborted the task: only the worker needs to be recycled
        if success:
            self.readmitWorker(workerid)
        else:
            reactor.callLater(self.failurePenalty, self.readmitWorker, workerid)
        return

    if success:
        log.msg("Task completed: %i"% taskid)
        self._finishTask(taskid, result)
        self.readmitWorker(workerid)
        return

    log.msg("Task %i failed on worker %i"% (taskid, workerid))
    if task.retries > 0: # resubmit
        task.retries -= 1
        self.scheduler.add_task(task)
        s = "Resubmitting task %i, %i retries remaining" %(taskid, task.retries)
        log.msg(s)
        self.distributeTasks()
    elif isinstance(task.recovery_task, BaseTask) and \
            task.recovery_task.retries > -1:
        # retries = -1 is to prevent infinite recovery_task loop
        task.retries = -1
        # The recovery task takes over the id of the task it replaces.
        task.recovery_task.taskid = taskid
        task = task.recovery_task
        self.scheduler.add_task(task)
        s = "Recovering task %i, %i retries remaining" %(taskid, task.retries)
        log.msg(s)
        self.distributeTasks()
    else: # done trying
        self._finishTask(taskid, result)
    # wait a second before readmitting a worker that failed
    # it may have died, and not yet been unregistered
    reactor.callLater(self.failurePenalty, self.readmitWorker, workerid)
1087
1087
def readmitWorker(self, workerid):
    """
    Readmit a worker to the scheduler.

    This is outside `taskCompleted` because of the `failurePenalty` being
    implemented through `reactor.callLater`.

    The worker is handed back to the scheduler only if it is still
    registered in `self.workers` and has no entry in `self.pendingTasks`;
    otherwise the call does nothing.
    """
    # Direct membership tests avoid materialising key lists on each call.
    if workerid in self.workers and workerid not in self.pendingTasks:
        self.scheduler.add_worker(self.workers[workerid])
        self.distributeTasks()
1099
1099
def clear(self):
    """
    Clear all previously run tasks from the task controller.

    The task controller keeps every task result in memory, which can
    become a problem if there are many completed tasks.  Users should
    call this periodically to discard the cached results.

    Returns a deferred that has already fired with None.
    """
    self.finishedResults = dict()
    already_done = defer.succeed(None)
    return already_done
1111
1111
1112
1112
# Register TaskController as the adapter that provides ITaskController
# for objects providing cs.IControllerBase.
components.registerAdapter(
    TaskController,
    cs.IControllerBase,
    ITaskController,
)
General Comments 0
You need to be logged in to leave comments. Login now