##// END OF EJS Templates
added optional taskids argument to TaskClient.clear()...
Fernando Perez -
Show More
@@ -1,1116 +1,1142 b''
1 # encoding: utf-8
1 # encoding: utf-8
2 # -*- test-case-name: IPython.kernel.tests.test_task -*-
2 # -*- test-case-name: IPython.kernel.tests.test_task -*-
3
3
4 """Task farming representation of the ControllerService."""
4 """Task farming representation of the ControllerService."""
5
5
6 __docformat__ = "restructuredtext en"
6 __docformat__ = "restructuredtext en"
7
7
8 #-----------------------------------------------------------------------------
8 #-----------------------------------------------------------------------------
9 # Copyright (C) 2008 The IPython Development Team
9 # Copyright (C) 2008 The IPython Development Team
10 #
10 #
11 # Distributed under the terms of the BSD License. The full license is in
11 # Distributed under the terms of the BSD License. The full license is in
12 # the file COPYING, distributed as part of this software.
12 # the file COPYING, distributed as part of this software.
13 #-----------------------------------------------------------------------------
13 #-----------------------------------------------------------------------------
14
14
15 #-----------------------------------------------------------------------------
15 #-----------------------------------------------------------------------------
16 # Imports
16 # Imports
17 #-----------------------------------------------------------------------------
17 #-----------------------------------------------------------------------------
18
18
19 # Tell nose to skip the testing of this module
19 # Tell nose to skip the testing of this module
20 __test__ = {}
20 __test__ = {}
21
21
22 import copy, time
22 import copy, time
23 from types import FunctionType
23 from types import FunctionType
24
24
25 import zope.interface as zi, string
25 import zope.interface as zi, string
26 from twisted.internet import defer, reactor
26 from twisted.internet import defer, reactor
27 from twisted.python import components, log, failure
27 from twisted.python import components, log, failure
28
28
29 from IPython.kernel.util import printer
29 from IPython.kernel.util import printer
30 from IPython.kernel import engineservice as es, error
30 from IPython.kernel import engineservice as es, error
31 from IPython.kernel import controllerservice as cs
31 from IPython.kernel import controllerservice as cs
32 from IPython.kernel.twistedutil import gatherBoth, DeferredList
32 from IPython.kernel.twistedutil import gatherBoth, DeferredList
33
33
34 from IPython.kernel.pickleutil import can, uncan, CannedFunction
34 from IPython.kernel.pickleutil import can, uncan, CannedFunction
35
35
36 #-----------------------------------------------------------------------------
36 #-----------------------------------------------------------------------------
37 # Definition of the Task objects
37 # Definition of the Task objects
38 #-----------------------------------------------------------------------------
38 #-----------------------------------------------------------------------------
39
39
40 time_format = '%Y/%m/%d %H:%M:%S'
40 time_format = '%Y/%m/%d %H:%M:%S'
41
41
class ITask(zi.Interface):
    """
    This interface provides a generic definition of what constitutes a task.

    There are two sides to a task.  First a task needs to take input from
    a user to determine what work is performed by the task.  Second, the
    task needs to have the logic that knows how to turn that information
    into specific calls to a worker, through the `IQueuedEngine` interface.

    Many methods in this class get two things passed to them: a Deferred
    and an IQueuedEngine implementer.  Such methods should register callbacks
    on the Deferred that use the IQueuedEngine to accomplish something.  See
    the existing task objects for examples.

    As is conventional for interface declarations, the method signatures
    below describe how the method is *called* and therefore do not list
    ``self``.
    """

    # Attributes have to be bound to a name in the interface body to be
    # declared; a bare ``zi.Attribute(...)`` expression is simply discarded.
    retries = zi.Attribute('retries', 'How many times to retry the task')
    recovery_task = zi.Attribute('recovery_task',
                                 'A task to try if the initial one fails')
    taskid = zi.Attribute('taskid', 'the id of the task')

    def start_time(result):
        """
        Do anything needed to start the timing of the task.

        Must simply return the result after starting the timers.
        """

    def stop_time(result):
        """
        Do anything needed to stop the timing of the task.

        Must simply return the result after stopping the timers.  This
        method will usually set attributes that are used by `process_result`
        in building the result of the task.
        """

    def pre_task(d, queued_engine):
        """Do something with the queued_engine before the task is run.

        This method should simply add callbacks to the input Deferred
        that do something with the `queued_engine` before the task is run.

        :Parameters:
            d : Deferred
                The deferred that actions should be attached to
            queued_engine : IQueuedEngine implementer
                The worker that has been allocated to perform the task
        """

    def post_task(d, queued_engine):
        """Do something with the queued_engine after the task is run.

        This method should simply add callbacks to the input Deferred
        that do something with the `queued_engine` after the task is run.

        :Parameters:
            d : Deferred
                The deferred that actions should be attached to
            queued_engine : IQueuedEngine implementer
                The worker that has been allocated to perform the task
        """

    def submit_task(d, queued_engine):
        """Submit a task using the `queued_engine` we have been allocated.

        When a task is ready to run, this method is called.  This method
        must take the internal information of the task and make suitable
        calls on the queued_engine to have the actual work done.

        This method should simply add callbacks to the input Deferred
        that use the `queued_engine` to perform the work of the task.

        :Parameters:
            d : Deferred
                The deferred that actions should be attached to
            queued_engine : IQueuedEngine implementer
                The worker that has been allocated to perform the task
        """

    def process_result(result, engine_id):
        """Take a raw task result.

        Objects that implement `ITask` can choose how the result of running
        the task is presented.  This method takes the raw result and
        does this logic.  Two examples are the `MapTask` which simply returns
        the raw result or a `Failure` object and the `StringTask` which
        returns a `TaskResult` object.

        :Parameters:
            result : object
                The raw task result that needs to be wrapped
            engine_id : int
                The id of the engine that did the task

        :Returns:
            The result, as a tuple of the form: (success, result).
            Here, success is a boolean indicating if the task
            succeeded or failed and result is the result.
        """

    def check_depend(properties):
        """Check properties to see if the task should be run.

        :Parameters:
            properties : dict
                A dictionary of properties that an engine has set

        :Returns:
            True if the task should be run, False otherwise
        """

    def can_task():
        """Serialize (can) any functions in the task for pickling.

        Subclasses must override this method and make sure that all
        functions in the task are canned by calling `can` on the
        function.
        """

    def uncan_task():
        """Unserialize (uncan) any canned function in the task."""
164
164
class BaseTask(object):
    """
    Common functionality for all objects implementing `ITask`.

    Provides the timing hooks, the optional clearing of the engine before
    and after the task, the default result processing and the dependency
    check.  Subclasses must implement `submit_task`.
    """

    zi.implements(ITask)

    def __init__(self, clear_before=False, clear_after=False, retries=0,
                 recovery_task=None, depend=None):
        """
        Make a generic task.

        :Parameters:
            clear_before : boolean
                Should the engine's namespace be cleared before the task
                is run
            clear_after : boolean
                Should the engine's namespace be cleared after the task
                is run
            retries : int
                The number of times a task should be retried upon failure
            recovery_task : any task object
                If a task fails and it has a recovery_task, that is run
                upon a retry
            depend : FunctionType
                A function that is called to test for properties.  This
                function must take one argument, the properties dict, and
                return a boolean
        """
        self.clear_before = clear_before
        self.clear_after = clear_after
        self.retries = retries
        self.recovery_task = recovery_task
        self.depend = depend
        self.taskid = None

    def start_time(self, result):
        """
        Start the basic timers.

        Records `start` (seconds since the epoch) and `start_struct` (the
        same instant as a local `struct_time`) and returns `result`
        unchanged so the method can be used as a Deferred callback.
        """
        self.start = time.time()
        # Derive the struct from the recorded timestamp so that both
        # attributes describe exactly the same instant.
        self.start_struct = time.localtime(self.start)
        return result

    def stop_time(self, result):
        """
        Stop the basic timers.

        Records `stop`, `stop_struct` and `duration`, and the formatted
        `submitted` and `completed` strings (see `time_format`).  Relies on
        `start_time` having been called first.  Returns `result` unchanged.
        """
        self.stop = time.time()
        self.stop_struct = time.localtime(self.stop)
        self.duration = self.stop - self.start
        self.submitted = time.strftime(time_format, self.start_struct)
        # Format the recorded stop time rather than taking yet another
        # clock reading, so `completed` always agrees with `stop`.
        self.completed = time.strftime(time_format, self.stop_struct)
        return result

    def pre_task(self, d, queued_engine):
        """
        Clear the engine before running the task if clear_before is set.

        :Parameters:
            d : Deferred
                The deferred that the reset callback is attached to
            queued_engine : IQueuedEngine implementer
                The worker whose `reset` method is called
        """
        if self.clear_before:
            # The callback returns whatever reset() returns; presumably that
            # is a Deferred, in which case the chain waits for it.
            d.addCallback(lambda r: queued_engine.reset())

    def post_task(self, d, queued_engine):
        """
        Clear the engine after running the task if clear_after is set.

        The reset is attached with `addBoth`, so it runs whether the task
        succeeded or failed, and the incoming result (or failure) is passed
        through untouched.

        :Parameters:
            d : Deferred
                The deferred that the reset callback is attached to
            queued_engine : IQueuedEngine implementer
                The worker whose `reset` method is called
        """
        def reseter(result):
            # The return value of reset() is deliberately not returned, so
            # the task result is what continues down the callback chain.
            queued_engine.reset()
            return result
        if self.clear_after:
            d.addBoth(reseter)

    def submit_task(self, d, queued_engine):
        """Must be overridden; the base class has no work to submit."""
        raise NotImplementedError('submit_task must be implemented in a subclass')

    def process_result(self, result, engine_id):
        """
        Process a task result.

        This is the default `process_result` that just returns the raw
        result or a `Failure`.

        :Parameters:
            result : object
                The raw task result, or a `Failure`
            engine_id : int
                The id of the engine that did the task (unused here)

        :Returns:
            A tuple (success, result) where success is False exactly when
            result is a `Failure`.
        """
        if isinstance(result, failure.Failure):
            return (False, result)
        else:
            return (True, result)

    def check_depend(self, properties):
        """
        Calls self.depend(properties) to see if a task should be run.

        :Parameters:
            properties : dict
                A dictionary of properties that an engine has set

        :Returns:
            The value returned by `self.depend(properties)`, or True when
            no `depend` function was given.
        """
        if self.depend is not None:
            return self.depend(properties)
        else:
            return True

    def can_task(self):
        """
        Can the `depend` function and, recursively, the recovery task.
        """
        self.depend = can(self.depend)
        if isinstance(self.recovery_task, BaseTask):
            self.recovery_task.can_task()

    def uncan_task(self):
        """
        Uncan the `depend` function and, recursively, the recovery task.
        """
        self.depend = uncan(self.depend)
        if isinstance(self.recovery_task, BaseTask):
            self.recovery_task.uncan_task()
268
268
class MapTask(BaseTask):
    """
    A task that consists of a function and arguments.
    """

    zi.implements(ITask)

    def __init__(self, function, args=None, kwargs=None, clear_before=False,
                 clear_after=False, retries=0, recovery_task=None, depend=None):
        """
        Create a task based on a function, args and kwargs.

        This is a simple type of task that consists of calling:
        function(*args, **kwargs) on an engine.

        The return value of the function, or a `Failure` wrapping an
        exception, is the task result for this type of task.

        :Parameters:
            function : FunctionType
                The function to call; anything else raises TypeError
            args : list or tuple
                Positional arguments, defaults to an empty tuple
            kwargs : dict
                Keyword arguments, defaults to an empty dict

        The remaining arguments are passed on to `BaseTask.__init__`.
        """
        BaseTask.__init__(self, clear_before, clear_after, retries,
                          recovery_task, depend)

        if not isinstance(function, FunctionType):
            raise TypeError('a task function must be a FunctionType')
        self.function = function

        if args is None:
            args = ()
        self.args = args
        if not isinstance(args, (list, tuple)):
            raise TypeError('a task args must be a list or tuple')

        if kwargs is None:
            kwargs = {}
        self.kwargs = kwargs
        if not isinstance(kwargs, dict):
            raise TypeError('a task kwargs must be a dict')

    def submit_task(self, d, queued_engine):
        """
        Chain the four engine calls that run the function onto `d`.

        The function and its arguments are pushed into the engine's
        namespace under `_ipython_task_*` names, the call is executed there
        and `_ipython_task_result` is pulled back as the last step.
        """
        def send_function(_):
            return queued_engine.push_function(
                {'_ipython_task_function': self.function})

        def send_arguments(_):
            return queued_engine.push(
                {'_ipython_task_args': self.args,
                 '_ipython_task_kwargs': self.kwargs})

        def call_function(_):
            return queued_engine.execute(
                '_ipython_task_result = _ipython_task_function(*_ipython_task_args,**_ipython_task_kwargs)')

        def fetch_result(_):
            return queued_engine.pull('_ipython_task_result')

        # Order matters: each step runs only after the previous one fired.
        for step in (send_function, send_arguments, call_function,
                     fetch_result):
            d.addCallback(step)

    def can_task(self):
        """Can the task function, then whatever `BaseTask` cans."""
        self.function = can(self.function)
        BaseTask.can_task(self)

    def uncan_task(self):
        """Uncan the task function, then whatever `BaseTask` uncans."""
        self.function = uncan(self.function)
        BaseTask.uncan_task(self)
324
324
325
325
class StringTask(BaseTask):
    """
    A task that consists of a string of Python code to run.
    """

    def __init__(self, expression, pull=None, push=None,
                 clear_before=False, clear_after=False, retries=0,
                 recovery_task=None, depend=None):
        """
        Create a task based on a Python expression and variables

        This type of task lets you push a set of variables to the engine's
        namespace, run a Python string in that namespace and then bring back
        a different set of Python variables as the result.

        Because this type of task can return many results (through the
        `pull` keyword argument) it returns a special `TaskResult` object
        that wraps the pulled variables, statistics about the run and
        any exceptions raised.

        :Parameters:
            expression : str
                The code to execute; anything else raises TypeError
            pull : str or list/tuple of str
                Name(s) of the variables to bring back.  A single string
                is stored as a one-element tuple, None as an empty tuple
            push : dict
                Variables to push before running, None means an empty dict

        The remaining arguments are passed on to `BaseTask.__init__`.
        """
        if not isinstance(expression, str):
            raise TypeError('a task expression must be a string')
        self.expression = expression

        # Identity tests: `== None` would call __eq__ on arbitrary user
        # objects, which may misbehave or not return a plain boolean.
        if pull is None:
            self.pull = ()
        elif isinstance(pull, str):
            self.pull = (pull,)
        elif isinstance(pull, (list, tuple)):
            self.pull = pull
        else:
            raise TypeError('pull must be str or a sequence of strs')

        if push is None:
            self.push = {}
        elif isinstance(push, dict):
            self.push = push
        else:
            raise TypeError('push must be a dict')

        BaseTask.__init__(self, clear_before, clear_after, retries,
                          recovery_task, depend)

    def submit_task(self, d, queued_engine):
        """
        Chain push, execute and pull calls on the engine onto `d`.

        The `None` checks only matter if `push` or `pull` were reset to
        None after construction; `__init__` never stores None.
        """
        if self.push is not None:
            d.addCallback(lambda r: queued_engine.push(self.push))

        d.addCallback(lambda r: queued_engine.execute(self.expression))

        if self.pull is not None:
            d.addCallback(lambda r: queued_engine.pull(self.pull))
        else:
            d.addCallback(lambda r: None)

    def process_result(self, result, engine_id):
        """
        Wrap the raw result (or `Failure`) in a `TaskResult`.

        On success the pulled values are paired with the names in
        `self.pull`; with exactly one name the raw result is taken to be
        the value itself rather than a sequence.  The timing attributes
        set by `stop_time` and the task id are copied onto the result.

        :Returns:
            A tuple (success, task_result) where success is False exactly
            when `result` is a `Failure`.
        """
        failed = isinstance(result, failure.Failure)
        if failed:
            tr = TaskResult(result, engine_id)
        else:
            if self.pull is None:
                pulled = {}
            elif len(self.pull) == 1:
                pulled = {self.pull[0]: result}
            else:
                pulled = dict(zip(self.pull, result))
            tr = TaskResult(pulled, engine_id)
        # Assign task attributes
        tr.submitted = self.submitted
        tr.completed = self.completed
        tr.duration = self.duration
        tr.taskid = getattr(self, 'taskid', None)
        return (not failed, tr)
403
403
class ResultNS(object):
    """
    A dict like object for holding the results of a task.

    The result namespace object for use in `TaskResult` objects as tr.ns.
    It builds an object from a dictionary, such that it has attributes
    according to the key,value pairs of the dictionary.

    This works by calling setattr on ALL key,value pairs in the dict.  If a
    user chooses to overwrite the `__repr__` or `__getattr__` attributes,
    they can.  This can be a bad idea, as it may corrupt standard behavior
    of the ns object.

    Item access is implemented with `getattr`, so looking up a name that
    was not stored raises AttributeError rather than KeyError.

    Examples
    --------

    >>> ns = ResultNS({'a': 17, 'foo': [0, 1, 2]})
    >>> print(ns)
    NS{'a': 17, 'foo': [0, 1, 2]}
    >>> ns.a
    17
    >>> ns['foo']
    [0, 1, 2]
    """
    def __init__(self, dikt):
        """
        Copy every key,value pair of `dikt` onto this object as attributes.

        :Parameters:
            dikt : dict
                Mapping of attribute names to values
        """
        # items() instead of the Python-2-only iteritems(): same result,
        # and it keeps the class usable wherever iteritems() is missing.
        for name, value in dikt.items():
            setattr(self, name, value)

    def __repr__(self):
        """Return 'NS' followed by the repr of a dict of the stored names."""
        shown = {}
        for name in dir(self):
            # do not print private objects (names that start or end
            # with a double underscore)
            if name[:2] != '__' and name[-2:] != '__':
                shown[name] = getattr(self, name)
        return "NS" + repr(shown)

    def __getitem__(self, key):
        """Return the attribute called `key` (ns['a'] is ns.a)."""
        return getattr(self, key)
443
443
444 class TaskResult(object):
444 class TaskResult(object):
445 """
445 """
446 An object for returning task results for certain types of tasks.
446 An object for returning task results for certain types of tasks.
447
447
448 This object encapsulates the results of a task. On task
448 This object encapsulates the results of a task. On task
449 success it will have a keys attribute that will have a list
449 success it will have a keys attribute that will have a list
450 of the variables that have been pulled back. These variables
450 of the variables that have been pulled back. These variables
451 are accessible as attributes of this class as well. On
451 are accessible as attributes of this class as well. On
452 success the failure attribute will be None.
452 success the failure attribute will be None.
453
453
454 In task failure, keys will be empty, but failure will contain
454 In task failure, keys will be empty, but failure will contain
455 the failure object that encapsulates the remote exception.
455 the failure object that encapsulates the remote exception.
456 One can also simply call the `raise_exception` method of
456 One can also simply call the `raise_exception` method of
457 this class to re-raise any remote exception in the local
457 this class to re-raise any remote exception in the local
458 session.
458 session.
459
459
460 The `TaskResult` has a `.ns` member, which is a property for access
460 The `TaskResult` has a `.ns` member, which is a property for access
461 to the results. If the Task had pull=['a', 'b'], then the
461 to the results. If the Task had pull=['a', 'b'], then the
462 Task Result will have attributes `tr.ns.a`, `tr.ns.b` for those values.
462 Task Result will have attributes `tr.ns.a`, `tr.ns.b` for those values.
463 Accessing `tr.ns` will raise the remote failure if the task failed.
463 Accessing `tr.ns` will raise the remote failure if the task failed.
464
464
465 The `engineid` attribute should have the `engineid` of the engine
465 The `engineid` attribute should have the `engineid` of the engine
466 that ran the task. But, because engines can come and go,
466 that ran the task. But, because engines can come and go,
467 the `engineid` may not continue to be
467 the `engineid` may not continue to be
468 valid or accurate.
468 valid or accurate.
469
469
470 The `taskid` attribute simply gives the `taskid` that the task
470 The `taskid` attribute simply gives the `taskid` that the task
471 is tracked under.
471 is tracked under.
472 """
472 """
473 taskid = None
473 taskid = None
474
474
475 def _getNS(self):
475 def _getNS(self):
476 if isinstance(self.failure, failure.Failure):
476 if isinstance(self.failure, failure.Failure):
477 return self.failure.raiseException()
477 return self.failure.raiseException()
478 else:
478 else:
479 return self._ns
479 return self._ns
480
480
481 def _setNS(self, v):
481 def _setNS(self, v):
482 raise Exception("the ns attribute cannot be changed")
482 raise Exception("the ns attribute cannot be changed")
483
483
484 ns = property(_getNS, _setNS)
484 ns = property(_getNS, _setNS)
485
485
def __init__(self, results, engineid):
    """Record the outcome of a task.

    :Parameters:
        results : a `failure.Failure`, or an object providing ``keys()``
            (presumably a dict of pulled values -- confirm with callers)
        engineid : the id of the engine that ran the task
    """
    self.engineid = engineid
    if not isinstance(results, failure.Failure):
        self.results, self.failure = results, None
    else:
        # A failed task has no values; an empty dict stands in for them.
        self.results, self.failure = {}, results

    self._ns = ResultNS(self.results)

    self.keys = self.results.keys()
498
498
499 def __repr__(self):
499 def __repr__(self):
500 if self.failure is not None:
500 if self.failure is not None:
501 contents = self.failure
501 contents = self.failure
502 else:
502 else:
503 contents = self.results
503 contents = self.results
504 return "TaskResult[ID:%r]:%r"%(self.taskid, contents)
504 return "TaskResult[ID:%r]:%r"%(self.taskid, contents)
505
505
506 def __getitem__(self, key):
506 def __getitem__(self, key):
507 if self.failure is not None:
507 if self.failure is not None:
508 self.raise_exception()
508 self.raise_exception()
509 return self.results[key]
509 return self.results[key]
510
510
def raise_exception(self):
    """Re-raise any remote exceptions in the local python session.

    Does nothing when no failure is stored.
    """
    remote = self.failure
    if remote is None:
        return
    remote.raiseException()
515
515
516
516
517 #-----------------------------------------------------------------------------
517 #-----------------------------------------------------------------------------
518 # The controller side of things
518 # The controller side of things
519 #-----------------------------------------------------------------------------
519 #-----------------------------------------------------------------------------
520
520
class IWorker(zi.Interface):
    """The Basic Worker Interface.

    A worker is a representation of an Engine that is ready to run tasks.
    """

    # The declaration must be bound to a name in the class body; a bare
    # ``zi.Attribute(...)`` expression is evaluated and thrown away, so the
    # interface would never actually declare `workerid`.
    workerid = zi.Attribute("workerid", "the id of the worker")

    def run(task):
        """Run task in worker's namespace.

        :Parameters:
            task : a `Task` object

        :Returns: `Deferred` to a tuple of (success, result) where
            success is a boolean that signifies success or failure
            and result is the task result.
        """
539
539
540
540
class WorkerFromQueuedEngine(object):
    """Adapter that presents a queued engine as an `IWorker`."""

    zi.implements(IWorker)

    def __init__(self, qe):
        # No id yet; it is assigned from outside after adaptation.
        self.workerid = None
        self.queuedEngine = qe

    def _get_properties(self):
        """Return the `properties` of the wrapped engine."""
        return self.queuedEngine.properties

    # Assignments to `properties` are silently ignored.
    properties = property(_get_properties, lambda self, _:None)

    def run(self, task):
        """Run task in worker's namespace.

        The task itself drives the wrapped engine: its `pre_task`,
        `submit_task` and `post_task` methods are each handed the deferred
        chain and the engine, in that order.  See the methods of `ITask`
        for more information about how these methods are called.

        :Parameters:
            task : a `Task` object

        :Returns: `Deferred` to a tuple of (success, result) where
            success is a boolean that signifies success or failure
            and result is the task result.
        """
        engine = self.queuedEngine
        chain = defer.succeed(None)
        chain.addCallback(task.start_time)
        task.pre_task(chain, engine)
        task.submit_task(chain, engine)
        task.post_task(chain, engine)
        # Stop the clock and build the result on success and on failure alike.
        chain.addBoth(task.stop_time)
        chain.addBoth(task.process_result, engine.id)
        # At this point, there will be (success, result) coming down the line
        return chain


# Let IWorker(engine) adapt any es.IEngineQueued provider via the class above.
components.registerAdapter(WorkerFromQueuedEngine, es.IEngineQueued, IWorker)
581
581
class IScheduler(zi.Interface):
    """The interface for a Scheduler.
    """
    # Each declaration is bound to its name; a bare ``zi.Attribute(...)``
    # expression would be discarded and declare nothing on the interface.
    nworkers = zi.Attribute("nworkers", "the number of unassigned workers")
    ntasks = zi.Attribute("ntasks", "the number of unscheduled tasks")
    workerids = zi.Attribute("workerids", "a list of the worker ids")
    taskids = zi.Attribute("taskids", "a list of the task ids")

    def add_task(task, **flags):
        """Add a task to the queue of the Scheduler.

        :Parameters:
            task : an `ITask` implementer
                The task to be queued.
            flags : dict
                General keywords for more sophisticated scheduling
        """

    def pop_task(id=None):
        """Pops a task object from the queue.

        This gets the next task to be run.  If no `id` is requested, the
        highest priority task is returned.

        :Parameters:
            id
                The id of the task to be popped.  The default (None) is to
                return the highest priority task.

        :Returns: an `ITask` implementer

        :Exceptions:
            IndexError : raised if no taskid in queue
        """

    def add_worker(worker, **flags):
        """Add a worker to the worker queue.

        :Parameters:
            worker : an `IWorker` implementer
            flags : dict
                General keywords for more sophisticated scheduling
        """

    def pop_worker(id=None):
        """Pops an IWorker object that is ready to do work.

        This gets the next IWorker that is ready to do work.

        :Parameters:
            id : if specified, will pop worker with workerid=id, else pops
                 highest priority worker.  Defaults to None.

        :Returns:
            an IWorker object

        :Exceptions:
            IndexError : raised if no workerid in queue
        """

    def ready():
        """Returns True if there is something to do, False otherwise"""

    def schedule():
        """Returns (worker,task) pair for the next task to be run."""
647
647
648
648
class FIFOScheduler(object):
    """
    A basic First-In-First-Out (Queue) Scheduler.

    This is the default Scheduler for the `TaskController`.
    See the docstrings for `IScheduler` for interface details.
    """

    zi.implements(IScheduler)

    def __init__(self):
        self.tasks = []    # queued tasks, front of the list is tried first
        self.workers = []  # available workers, front of the list is tried first

    def _ntasks(self):
        return len(self.tasks)

    def _nworkers(self):
        return len(self.workers)

    # Read-only counters: assignment is silently ignored.
    ntasks = property(_ntasks, lambda self, _:None)
    nworkers = property(_nworkers, lambda self, _:None)

    def _taskids(self):
        return [t.taskid for t in self.tasks]

    def _workerids(self):
        return [w.workerid for w in self.workers]

    # Read-only id lists: assignment is silently ignored.
    taskids = property(_taskids, lambda self,_:None)
    workerids = property(_workerids, lambda self,_:None)

    def add_task(self, task, **flags):
        """Append `task` to the end of the task queue; `flags` are ignored."""
        self.tasks.append(task)

    def pop_task(self, id=None):
        """Remove and return a queued task.

        With no `id` the task at the front of the queue is returned,
        otherwise the first task whose `taskid` equals `id`.

        :Exceptions:
            IndexError : the queue is empty, or no queued task has that id
        """
        if id is None:
            return self.tasks.pop(0)
        for i, task in enumerate(self.tasks):
            if id == task.taskid:
                return self.tasks.pop(i)
        # %s (not %i) so that a non-integer id still yields the documented
        # IndexError instead of a TypeError from the string formatting.
        raise IndexError("No task #%s" % (id,))

    def add_worker(self, worker, **flags):
        """Append `worker` to the end of the worker queue; `flags` are ignored."""
        self.workers.append(worker)

    def pop_worker(self, id=None):
        """Remove and return an available worker.

        With no `id` the worker at the front of the queue is returned,
        otherwise the first worker whose `workerid` equals `id`.

        :Exceptions:
            IndexError : the queue is empty, or no queued worker has that id
        """
        if id is None:
            return self.workers.pop(0)
        for i, worker in enumerate(self.workers):
            if id == worker.workerid:
                return self.workers.pop(i)
        # %s for the same reason as in pop_task.
        raise IndexError("No worker #%s" % (id,))

    def schedule(self):
        """Return the first (worker, task) pair whose dependency check passes.

        Tasks are tried in queue order and, for each task, workers in queue
        order.  The chosen worker and task are removed from their queues.
        Returns (None, None) when no pair can be formed.
        """
        for t in self.tasks:
            for w in self.workers:
                try:# do not allow exceptions to break this
                    # Allow the task to check itself using its
                    # check_depend method.
                    cando = t.check_depend(w.properties)
                except Exception:
                    # A failing dependency check counts as "cannot run here".
                    # KeyboardInterrupt/SystemExit are deliberately not caught.
                    cando = False
                if cando:
                    return self.pop_worker(w.workerid), self.pop_task(t.taskid)
        return None, None
719
719
720
720
721
721
class LIFOScheduler(FIFOScheduler):
    """
    A Last-In-First-Out (Stack) Scheduler.

    This scheduler should naively reward fast engines by giving
    them more jobs.  This risks starvation, but only in cases with
    low load, where starvation does not really matter.
    """

    def add_task(self, task, **flags):
        """Put `task` at the front of the task queue; `flags` are ignored."""
        self.tasks[:0] = [task]

    def add_worker(self, worker, **flags):
        """Put `worker` at the front of the worker queue; `flags` are ignored."""
        self.workers[:0] = [worker]
740
740
741
741
class ITaskController(cs.IControllerBase):
    """
    The Task based interface to a `ControllerService` object

    This adapts a `ControllerService` to the ITaskController interface.
    """

    def run(task):
        """
        Submit a task for execution.

        :Parameters:
            task : an IPython `Task` object

        :Returns: the integer ID of the task
        """

    def get_task_result(taskid, block=False):
        """
        Fetch the result of the task with the given ID.

        :Parameters:
            taskid : int
                the id of the task whose result is requested

        :Returns: `Deferred` to the task result if the task is done, and None
            if not.

        :Exceptions:
            actualResult will be an `IndexError` if no such task has been submitted
        """

    def abort(taskid):
        """Remove the task from the queue if it has not been submitted yet.

        If the task has already been submitted, wait for it to finish,
        discard its results and prevent resubmission.

        :Parameters:
            taskid : the id of the task to be aborted

        :Returns:
            `Deferred` to abort attempt completion.  Will be None on success.

        :Exceptions:
            deferred will fail with `IndexError` if no such task has been submitted
            or the task has already completed.
        """

    def barrier(taskids):
        """
        Block until all tasks in the list of taskids have completed.

        Returns None on success.
        """

    def spin():
        """
        Touch the scheduler, to resume scheduling without submitting a task.
        """

    def queue_status(verbose=False):
        """
        Get a dictionary with the current state of the task queue.

        If verbose is True, then return lists of taskids, otherwise,
        return the number of tasks with each status.
        """

    def clear(taskids=None):
        """
        Clear previously run tasks from the task controller.

        If no ids are specified, clear all.

        This is needed because the task controller keeps all task results
        in memory.  This can be a problem if there are many completed
        tasks.  Users should call this periodically to clean out these
        cached task results.
        """
820
822
821
823
822 class TaskController(cs.ControllerAdapterBase):
824 class TaskController(cs.ControllerAdapterBase):
823 """The Task based interface to a Controller object.
825 """The Task based interface to a Controller object.
824
826
825 If you want to use a different scheduler, just subclass this and set
827 If you want to use a different scheduler, just subclass this and set
826 the `SchedulerClass` member to the *class* of your chosen scheduler.
828 the `SchedulerClass` member to the *class* of your chosen scheduler.
827 """
829 """
828
830
829 zi.implements(ITaskController)
831 zi.implements(ITaskController)
830 SchedulerClass = FIFOScheduler
832 SchedulerClass = FIFOScheduler
831
833
832 timeout = 30
834 timeout = 30
833
835
def __init__(self, controller):
    """Attach to `controller` and create a worker for each existing engine.

    Registers `registerWorker` / `unregisterWorker` with the controller's
    engine (un)registration hooks, initialises the bookkeeping tables and
    instantiates `self.SchedulerClass` as the scheduler.

    :Parameters:
        controller : the controller object being adapted; its `engines`
            mapping is read to seed the worker table
    """
    self.controller = controller
    self.controller.on_register_engine_do(self.registerWorker, True)
    self.controller.on_unregister_engine_do(self.unregisterWorker, True)
    self.taskid = 0
    self.failurePenalty = 1 # the time in seconds to penalize
                            # a worker for failing a task
    self.pendingTasks = {} # dict of {workerid:task}
    self.deferredResults = {} # dict of {taskid:[deferred, ...]}
    self.finishedResults = {} # dict of {taskid:actualResult}
    self.workers = {} # dict of {workerid:worker}
    self.abortPending = [] # list of taskids aborted while pending
    self.idleLater = None # delayed call object for timeout
    self.scheduler = self.SchedulerClass()

    for id in self.controller.engines.keys():
        self.workers[id] = IWorker(self.controller.engines[id])
        self.workers[id].workerid = id
        # The attribute is `scheduler`; `self.schedule` does not exist and
        # raised AttributeError whenever engines were already registered.
        self.scheduler.add_worker(self.workers[id])
853
855
def registerWorker(self, id):
    """Called by controller.register_engine.

    Adapts the newly registered engine to an `IWorker`, queues it with the
    scheduler unless a task is still pending under that id, and then
    tries to hand out tasks.

    :Exceptions:
        ValueError : a worker is already stored under `id`
    """
    if self.workers.get(id):
        raise ValueError("worker with id %s already exists.  This should not happen." % id)
    self.workers[id] = IWorker(self.controller.engines[id])
    worker = self.workers[id]
    worker.workerid = id
    if id not in self.pendingTasks:# if not working
        self.scheduler.add_worker(worker)
    self.distributeTasks()
863
865
def unregisterWorker(self, id):
    """Called by controller.unregister_engine

    Drops the worker stored under `id`, if any, from both the scheduler's
    queue and the worker table.  Unknown ids are ignored.
    """
    if id not in self.workers:
        return
    try:
        self.scheduler.pop_worker(id)
    except IndexError:
        # The scheduler is not holding this worker; nothing to remove there.
        pass
    del self.workers[id]
873
875
874 def _pendingTaskIDs(self):
876 def _pendingTaskIDs(self):
875 return [t.taskid for t in self.pendingTasks.values()]
877 return [t.taskid for t in self.pendingTasks.values()]
876
878
877 #---------------------------------------------------------------------------
879 #---------------------------------------------------------------------------
878 # Interface methods
880 # Interface methods
879 #---------------------------------------------------------------------------
881 #---------------------------------------------------------------------------
880
882
def run(self, task):
    """
    Run a task and return `Deferred` to its taskid.

    The task is given the next task id and a `start` timestamp, queued
    with the scheduler, and a distribution pass is triggered.

    :Parameters:
        task : the task object to queue

    :Returns: an already-fired `Deferred` carrying the new task id
    """
    task.taskid = self.taskid
    task.start = time.localtime()
    self.taskid += 1
    self.scheduler.add_task(task)
    log.msg('Queuing task: %i' % task.taskid)

    # Deferreds of blocking get_task_result() callers are collected here.
    self.deferredResults[task.taskid] = []
    self.distributeTasks()
    return defer.succeed(task.taskid)
895
897
def get_task_result(self, taskid, block=False):
    """
    Returns a `Deferred` to the task result, or None.

    A finished task yields its stored result at once.  For a task that is
    still outstanding, `block=True` returns a `Deferred` that is recorded
    in `self.deferredResults`, while `block=False` yields None.  An unknown
    `taskid` gives a `Deferred` that fails with `IndexError`.
    """
    log.msg("Getting task result: %i" % taskid)
    if taskid in self.finishedResults:
        return defer.succeed(self.finishedResults[taskid])
    if taskid not in self.deferredResults:
        return defer.fail(IndexError("task ID not registered: %r" % taskid))
    if not block:
        return defer.succeed(None)
    waiter = defer.Deferred()
    self.deferredResults[taskid].append(waiter)
    return waiter
913
915
914 def abort(self, taskid):
916 def abort(self, taskid):
915 """
917 """
916 Remove a task from the queue if it has not been run already.
918 Remove a task from the queue if it has not been run already.
917 """
919 """
918 if not isinstance(taskid, int):
920 if not isinstance(taskid, int):
919 return defer.fail(failure.Failure(TypeError("an integer task id expected: %r" % taskid)))
921 return defer.fail(failure.Failure(TypeError("an integer task id expected: %r" % taskid)))
920 try:
922 try:
921 self.scheduler.pop_task(taskid)
923 self.scheduler.pop_task(taskid)
922 except IndexError, e:
924 except IndexError, e:
923 if taskid in self.finishedResults.keys():
925 if taskid in self.finishedResults.keys():
924 d = defer.fail(IndexError("Task Already Completed"))
926 d = defer.fail(IndexError("Task Already Completed"))
925 elif taskid in self.abortPending:
927 elif taskid in self.abortPending:
926 d = defer.fail(IndexError("Task Already Aborted"))
928 d = defer.fail(IndexError("Task Already Aborted"))
927 elif taskid in self._pendingTaskIDs():# task is pending
929 elif taskid in self._pendingTaskIDs():# task is pending
928 self.abortPending.append(taskid)
930 self.abortPending.append(taskid)
929 d = defer.succeed(None)
931 d = defer.succeed(None)
930 else:
932 else:
931 d = defer.fail(e)
933 d = defer.fail(e)
932 else:
934 else:
933 d = defer.execute(self._doAbort, taskid)
935 d = defer.execute(self._doAbort, taskid)
934
936
935 return d
937 return d
936
938
def barrier(self, taskids):
    """Return a `Deferred` that fires with None once all `taskids` have results.

    A single integer is accepted in place of a list of ids.
    """
    if isinstance(taskids, int):
        taskids = [taskids]
    waiting = [self.get_task_result(tid, block=True) for tid in taskids]
    combined = DeferredList(waiting, consumeErrors=1)
    # Callers get None instead of the list of individual results.
    combined.addCallbacks(lambda r: None)
    return combined
947
949
def spin(self):
    """Run a distribution pass and return a `Deferred` to its return value."""
    outcome = self.distributeTasks()
    return defer.succeed(outcome)
950
952
def queue_status(self, verbose=False):
    """Return a `Deferred` to a dict describing the task queue.

    The dict has the keys `pending`, `failed`, `succeeded` and
    `scheduled`; the values are lists of task ids when `verbose` is true
    and the lengths of those lists otherwise.
    """
    pending = self._pendingTaskIDs()
    failed = []
    succeeded = []
    # NOTE(review): indentation was lost in the copy this was rebuilt from;
    # the `else` is taken to pair with the innermost `failure is None` test,
    # so bare Failure entries are counted in neither list -- confirm.
    for tid, res in self.finishedResults.iteritems():
        if isinstance(res, failure.Failure):
            continue
        if not hasattr(res,'failure'):
            continue
        if res.failure is None:
            succeeded.append(tid)
        else:
            failed.append(tid)
    scheduled = self.scheduler.taskids
    if not verbose:
        result = dict(pending=len(pending),failed=len(failed),
            succeeded=len(succeeded),scheduled=len(scheduled))
    else:
        result = dict(pending=pending, failed=failed,
            succeeded=succeeded, scheduled=scheduled)
    return defer.succeed(result)
970
972
971 #---------------------------------------------------------------------------
973 #---------------------------------------------------------------------------
972 # Queue methods
974 # Queue methods
973 #---------------------------------------------------------------------------
975 #---------------------------------------------------------------------------
974
976
def _doAbort(self, taskid):
    """
    Finish `taskid` with a `TaskAborted` failure.

    Also clears the id from `self.abortPending` if it is listed there.
    """
    log.msg("Task aborted: %i" % taskid)
    aborted = failure.Failure(error.TaskAborted())
    self._finishTask(taskid, aborted)
    if taskid not in self.abortPending:
        return
    self.abortPending.remove(taskid)
984
986
985 def _finishTask(self, taskid, result):
987 def _finishTask(self, taskid, result):
986 dlist = self.deferredResults.pop(taskid)
988 dlist = self.deferredResults.pop(taskid)
987 # result.taskid = taskid # The TaskResult should save the taskid
989 # result.taskid = taskid # The TaskResult should save the taskid
988 self.finishedResults[taskid] = result
990 self.finishedResults[taskid] = result
989 for d in dlist:
991 for d in dlist:
990 d.callback(result)
992 d.callback(result)
991
993
def distributeTasks(self):
    """
    Distribute tasks while self.scheduler has things to do.

    Returns True if at least one task was handed to a worker and False if
    the scheduler had nothing to pair up.
    """
    log.msg("distributing Tasks")
    worker, task = self.scheduler.schedule()
    if not (worker or task):
        timer = self.idleLater
        if timer and timer.called:# we are inside failIdle
            self.idleLater = None
        else:
            self.checkIdle()
        return False
    # else something to do:
    while worker and task:
        wid = worker.workerid
        # remember what this worker is busy with
        self.pendingTasks[wid] = task
        # start the task and hook up the completion handler
        running = worker.run(task)
        log.msg("Running task %i on worker %i" %(task.taskid, wid))
        running.addBoth(self.taskCompleted, task.taskid, wid)
        worker, task = self.scheduler.schedule()
    # check for idle timeout:
    self.checkIdle()
    return True
1017
1019
def checkIdle(self):
    """Cancel any outstanding idle timer and re-arm it if still needed.

    A new timer calling `failIdle` after `self.timeout` seconds is set
    only when tasks are queued, workers exist, and every known worker is
    sitting in the scheduler's queue; otherwise `idleLater` is cleared.
    """
    timer = self.idleLater
    if timer and not timer.called:
        timer.cancel()
    stalled = (self.scheduler.ntasks and self.workers and
               self.scheduler.nworkers == len(self.workers))
    if not stalled:
        self.idleLater = None
        return
    self.idleLater = reactor.callLater(self.timeout, self.failIdle)
1026
1028
def failIdle(self):
    """Idle-timeout handler: fail every queued task that still cannot run.

    One more distribution pass is attempted first.  If nothing could be
    scheduled, each remaining task is finished with a `TaskTimeout`.
    """
    if self.distributeTasks():
        return
    while self.scheduler.ntasks:
        t = self.scheduler.pop_task()
        msg = "task %i failed to execute due to unmet dependencies"%t.taskid
        msg += " for %i seconds"%self.timeout
        log.msg("Task aborted by timeout: %i" % t.taskid)
        self._finishTask(t.taskid, failure.Failure(error.TaskTimeout(msg)))
    # NOTE(review): indentation was lost in the copy this was rebuilt from;
    # this reset is taken to run only when nothing was distributed -- confirm.
    self.idleLater = None
1037
1039
1038
1040
1039 def taskCompleted(self, success_and_result, taskid, workerid):
1041 def taskCompleted(self, success_and_result, taskid, workerid):
1040 """This is the err/callback for a completed task."""
1042 """This is the err/callback for a completed task."""
1041 success, result = success_and_result
1043 success, result = success_and_result
1042 try:
1044 try:
1043 task = self.pendingTasks.pop(workerid)
1045 task = self.pendingTasks.pop(workerid)
1044 except:
1046 except:
1045 # this should not happen
1047 # this should not happen
1046 log.msg("Tried to pop bad pending task %i from worker %i"%(taskid, workerid))
1048 log.msg("Tried to pop bad pending task %i from worker %i"%(taskid, workerid))
1047 log.msg("Result: %r"%result)
1049 log.msg("Result: %r"%result)
1048 log.msg("Pending tasks: %s"%self.pendingTasks)
1050 log.msg("Pending tasks: %s"%self.pendingTasks)
1049 return
1051 return
1050
1052
1051 # Check if aborted while pending
1053 # Check if aborted while pending
1052 aborted = False
1054 aborted = False
1053 if taskid in self.abortPending:
1055 if taskid in self.abortPending:
1054 self._doAbort(taskid)
1056 self._doAbort(taskid)
1055 aborted = True
1057 aborted = True
1056
1057 if not aborted:
1058 if not aborted:
1058 if not success:
1059 if not success:
1060 if isinstance(result,error.TaskRejectError):
1061 log.msg("Task %i dependencies unmet by worker %i"%(taskid, workerid))
1062
1063 return
1064
1059 log.msg("Task %i failed on worker %i"% (taskid, workerid))
1065 log.msg("Task %i failed on worker %i"% (taskid, workerid))
1060 if task.retries > 0: # resubmit
1066 if task.retries > 0: # resubmit
1061 task.retries -= 1
1067 task.retries -= 1
1062 self.scheduler.add_task(task)
1068 self.scheduler.add_task(task)
1063 s = "Resubmitting task %i, %i retries remaining" %(taskid, task.retries)
1069 s = "Resubmitting task %i, %i retries remaining" %(taskid, task.retries)
1064 log.msg(s)
1070 log.msg(s)
1065 self.distributeTasks()
1071 self.distributeTasks()
1066 elif isinstance(task.recovery_task, BaseTask) and \
1072 elif isinstance(task.recovery_task, BaseTask) and \
1067 task.recovery_task.retries > -1:
1073 task.recovery_task.retries > -1:
1068 # retries = -1 is to prevent infinite recovery_task loop
1074 # retries = -1 is to prevent infinite recovery_task loop
1069 task.retries = -1
1075 task.retries = -1
1070 task.recovery_task.taskid = taskid
1076 task.recovery_task.taskid = taskid
1071 task = task.recovery_task
1077 task = task.recovery_task
1072 self.scheduler.add_task(task)
1078 self.scheduler.add_task(task)
1073 s = "Recovering task %i, %i retries remaining" %(taskid, task.retries)
1079 s = "Recovering task %i, %i retries remaining" %(taskid, task.retries)
1074 log.msg(s)
1080 log.msg(s)
1075 self.distributeTasks()
1081 self.distributeTasks()
1076 else: # done trying
1082 else: # done trying
1077 self._finishTask(taskid, result)
1083 self._finishTask(taskid, result)
1078 # wait a second before readmitting a worker that failed
1084 # wait a second before readmitting a worker that failed
1079 # it may have died, and not yet been unregistered
1085 # it may have died, and not yet been unregistered
1080 reactor.callLater(self.failurePenalty, self.readmitWorker, workerid)
1086 reactor.callLater(self.failurePenalty, self.readmitWorker, workerid)
1081 else: # we succeeded
1087 else: # we succeeded
1082 log.msg("Task completed: %i"% taskid)
1088 log.msg("Task completed: %i"% taskid)
1083 self._finishTask(taskid, result)
1089 self._finishTask(taskid, result)
1084 self.readmitWorker(workerid)
1090 self.readmitWorker(workerid)
1085 else: # we aborted the task
1091 else: # we aborted the task
1086 if not success:
1092 if not success:
1087 reactor.callLater(self.failurePenalty, self.readmitWorker, workerid)
1093 reactor.callLater(self.failurePenalty, self.readmitWorker, workerid)
1088 else:
1094 else:
1089 self.readmitWorker(workerid)
1095 self.readmitWorker(workerid)
1090
1096
1091 def readmitWorker(self, workerid):
1097 def readmitWorker(self, workerid):
1092 """
1098 """
1093 Readmit a worker to the scheduler.
1099 Readmit a worker to the scheduler.
1094
1100
1095 This is outside `taskCompleted` because of the `failurePenalty` being
1101 This is outside `taskCompleted` because of the `failurePenalty` being
1096 implemented through `reactor.callLater`.
1102 implemented through `reactor.callLater`.
1097 """
1103 """
1098
1104
1099 if workerid in self.workers.keys() and workerid not in self.pendingTasks.keys():
1105 if workerid in self.workers.keys() and workerid not in self.pendingTasks.keys():
1100 self.scheduler.add_worker(self.workers[workerid])
1106 self.scheduler.add_worker(self.workers[workerid])
1101 self.distributeTasks()
1107 self.distributeTasks()
1102
1108
1103 def clear(self):
1109 def clear(self,taskids=None):
1104 """
1110 """
1105 Clear all previously run tasks from the task controller.
1111 Clear all previously run tasks from the task controller.
1106
1112
1107 This is needed because the task controller keep all task results
1113 This is needed because the task controller keep all task results
1108 in memory. This can be a problem is there are many completed
1114 in memory. This can be a problem is there are many completed
1109 tasks. Users should call this periodically to clean out these
1115 tasks. Users should call this periodically to clean out these
1110 cached task results.
1116 cached task results.
1111 """
1117 """
1112 self.finishedResults = {}
1118 before = len(self.finishedResults)
1113 return defer.succeed(None)
1119 failed = []
1120 if taskids is None:
1121 log.msg("Clearing all results")
1122 self.finishedResults = {}
1123 else:
1124 if isinstance(taskids, int):
1125 taskids = [taskids]
1126 if len(taskids) > 0:
1127 log.msg("Clearing results: %i et al."%(taskids[0]))
1128 for i in taskids:
1129 if self.finishedResults.has_key(i):
1130 self.finishedResults.pop(i)
1131 else:
1132 failed.append(i)
1133 after = len(self.finishedResults)
1134 log.msg("Cleared %i results"%(before-after))
1135 if failed:
1136 fails = ", ".join(map(str, failed))
1137 return defer.fail(KeyError("Cleared %i results, but no tasks found for ids: %s"%(before-after, fails)))
1138 else:
1139 return defer.succeed(before-after)
1114
1140
1115
1141
1116 components.registerAdapter(TaskController, cs.IControllerBase, ITaskController)
1142 components.registerAdapter(TaskController, cs.IControllerBase, ITaskController)
@@ -1,180 +1,180 b''
1 # encoding: utf-8
1 # encoding: utf-8
2 # -*- test-case-name: IPython.kernel.tests.test_taskcontrollerxmlrpc -*-
2 # -*- test-case-name: IPython.kernel.tests.test_taskcontrollerxmlrpc -*-
3
3
4 """
4 """
5 A blocking version of the task client.
5 A blocking version of the task client.
6 """
6 """
7
7
8 __docformat__ = "restructuredtext en"
8 __docformat__ = "restructuredtext en"
9
9
10 #-------------------------------------------------------------------------------
10 #-------------------------------------------------------------------------------
11 # Copyright (C) 2008 The IPython Development Team
11 # Copyright (C) 2008 The IPython Development Team
12 #
12 #
13 # Distributed under the terms of the BSD License. The full license is in
13 # Distributed under the terms of the BSD License. The full license is in
14 # the file COPYING, distributed as part of this software.
14 # the file COPYING, distributed as part of this software.
15 #-------------------------------------------------------------------------------
15 #-------------------------------------------------------------------------------
16
16
17 #-------------------------------------------------------------------------------
17 #-------------------------------------------------------------------------------
18 # Imports
18 # Imports
19 #-------------------------------------------------------------------------------
19 #-------------------------------------------------------------------------------
20
20
21 from zope.interface import Interface, implements
21 from zope.interface import Interface, implements
22 from twisted.python import components, log
22 from twisted.python import components, log
23
23
24 from IPython.kernel.twistedutil import blockingCallFromThread
24 from IPython.kernel.twistedutil import blockingCallFromThread
25 from IPython.kernel import task, error
25 from IPython.kernel import task, error
26 from IPython.kernel.mapper import (
26 from IPython.kernel.mapper import (
27 SynchronousTaskMapper,
27 SynchronousTaskMapper,
28 ITaskMapperFactory,
28 ITaskMapperFactory,
29 IMapper
29 IMapper
30 )
30 )
31 from IPython.kernel.parallelfunction import (
31 from IPython.kernel.parallelfunction import (
32 ParallelFunction,
32 ParallelFunction,
33 ITaskParallelDecorator
33 ITaskParallelDecorator
34 )
34 )
35
35
36 #-------------------------------------------------------------------------------
36 #-------------------------------------------------------------------------------
37 # The task client
37 # The task client
38 #-------------------------------------------------------------------------------
38 #-------------------------------------------------------------------------------
39
39
40 class IBlockingTaskClient(Interface):
40 class IBlockingTaskClient(Interface):
41 """
41 """
42 A vague interface of the blocking task client
42 A vague interface of the blocking task client
43 """
43 """
44 pass
44 pass
45
45
46 class BlockingTaskClient(object):
46 class BlockingTaskClient(object):
47 """
47 """
48 A blocking task client that adapts a non-blocking one.
48 A blocking task client that adapts a non-blocking one.
49 """
49 """
50
50
51 implements(
51 implements(
52 IBlockingTaskClient,
52 IBlockingTaskClient,
53 ITaskMapperFactory,
53 ITaskMapperFactory,
54 IMapper,
54 IMapper,
55 ITaskParallelDecorator
55 ITaskParallelDecorator
56 )
56 )
57
57
58 def __init__(self, task_controller):
58 def __init__(self, task_controller):
59 self.task_controller = task_controller
59 self.task_controller = task_controller
60 self.block = True
60 self.block = True
61
61
62 def run(self, task, block=False):
62 def run(self, task, block=False):
63 """Run a task on the `TaskController`.
63 """Run a task on the `TaskController`.
64
64
65 See the documentation of the `MapTask` and `StringTask` classes for
65 See the documentation of the `MapTask` and `StringTask` classes for
66 details on how to build a task of different types.
66 details on how to build a task of different types.
67
67
68 :Parameters:
68 :Parameters:
69 task : an `ITask` implementer
69 task : an `ITask` implementer
70
70
71 :Returns: The int taskid of the submitted task. Pass this to
71 :Returns: The int taskid of the submitted task. Pass this to
72 `get_task_result` to get the `TaskResult` object.
72 `get_task_result` to get the `TaskResult` object.
73 """
73 """
74 tid = blockingCallFromThread(self.task_controller.run, task)
74 tid = blockingCallFromThread(self.task_controller.run, task)
75 if block:
75 if block:
76 return self.get_task_result(tid, block=True)
76 return self.get_task_result(tid, block=True)
77 else:
77 else:
78 return tid
78 return tid
79
79
80 def get_task_result(self, taskid, block=False):
80 def get_task_result(self, taskid, block=False):
81 """
81 """
82 Get a task result by taskid.
82 Get a task result by taskid.
83
83
84 :Parameters:
84 :Parameters:
85 taskid : int
85 taskid : int
86 The taskid of the task to be retrieved.
86 The taskid of the task to be retrieved.
87 block : boolean
87 block : boolean
88 Should I block until the task is done?
88 Should I block until the task is done?
89
89
90 :Returns: A `TaskResult` object that encapsulates the task result.
90 :Returns: A `TaskResult` object that encapsulates the task result.
91 """
91 """
92 return blockingCallFromThread(self.task_controller.get_task_result,
92 return blockingCallFromThread(self.task_controller.get_task_result,
93 taskid, block)
93 taskid, block)
94
94
95 def abort(self, taskid):
95 def abort(self, taskid):
96 """
96 """
97 Abort a task by taskid.
97 Abort a task by taskid.
98
98
99 :Parameters:
99 :Parameters:
100 taskid : int
100 taskid : int
101 The taskid of the task to be aborted.
101 The taskid of the task to be aborted.
102 """
102 """
103 return blockingCallFromThread(self.task_controller.abort, taskid)
103 return blockingCallFromThread(self.task_controller.abort, taskid)
104
104
105 def barrier(self, taskids):
105 def barrier(self, taskids):
106 """Block until a set of tasks are completed.
106 """Block until a set of tasks are completed.
107
107
108 :Parameters:
108 :Parameters:
109 taskids : list, tuple
109 taskids : list, tuple
110 A sequence of taskids to block on.
110 A sequence of taskids to block on.
111 """
111 """
112 return blockingCallFromThread(self.task_controller.barrier, taskids)
112 return blockingCallFromThread(self.task_controller.barrier, taskids)
113
113
114 def spin(self):
114 def spin(self):
115 """
115 """
116 Touch the scheduler, to resume scheduling without submitting a task.
116 Touch the scheduler, to resume scheduling without submitting a task.
117
117
118 This method only needs to be called in unusual situations where the
118 This method only needs to be called in unusual situations where the
119 scheduler is idle for some reason.
119 scheduler is idle for some reason.
120 """
120 """
121 return blockingCallFromThread(self.task_controller.spin)
121 return blockingCallFromThread(self.task_controller.spin)
122
122
123 def queue_status(self, verbose=False):
123 def queue_status(self, verbose=False):
124 """
124 """
125 Get a dictionary with the current state of the task queue.
125 Get a dictionary with the current state of the task queue.
126
126
127 :Parameters:
127 :Parameters:
128 verbose : boolean
128 verbose : boolean
129 If True, return a list of taskids. If False, simply give
129 If True, return a list of taskids. If False, simply give
130 the number of tasks with each status.
130 the number of tasks with each status.
131
131
132 :Returns:
132 :Returns:
133 A dict with the queue status.
133 A dict with the queue status.
134 """
134 """
135 return blockingCallFromThread(self.task_controller.queue_status, verbose)
135 return blockingCallFromThread(self.task_controller.queue_status, verbose)
136
136
137 def clear(self):
137 def clear(self,taskids=None):
138 """
138 """
139 Clear all previously run tasks from the task controller.
139 Clear all previously run tasks from the task controller.
140
140
141 This is needed because the task controller keep all task results
141 This is needed because the task controller keep all task results
142 in memory. This can be a problem is there are many completed
142 in memory. This can be a problem is there are many completed
143 tasks. Users should call this periodically to clean out these
143 tasks. Users should call this periodically to clean out these
144 cached task results.
144 cached task results.
145 """
145 """
146 return blockingCallFromThread(self.task_controller.clear)
146 return blockingCallFromThread(self.task_controller.clear, taskids)
147
147
148 def map(self, func, *sequences):
148 def map(self, func, *sequences):
149 """
149 """
150 Apply func to *sequences elementwise. Like Python's builtin map.
150 Apply func to *sequences elementwise. Like Python's builtin map.
151
151
152 This version is load balanced.
152 This version is load balanced.
153 """
153 """
154 return self.mapper().map(func, *sequences)
154 return self.mapper().map(func, *sequences)
155
155
156 def mapper(self, clear_before=False, clear_after=False, retries=0,
156 def mapper(self, clear_before=False, clear_after=False, retries=0,
157 recovery_task=None, depend=None, block=True):
157 recovery_task=None, depend=None, block=True):
158 """
158 """
159 Create an `IMapper` implementer with a given set of arguments.
159 Create an `IMapper` implementer with a given set of arguments.
160
160
161 The `IMapper` created using a task controller is load balanced.
161 The `IMapper` created using a task controller is load balanced.
162
162
163 See the documentation for `IPython.kernel.task.BaseTask` for
163 See the documentation for `IPython.kernel.task.BaseTask` for
164 documentation on the arguments to this method.
164 documentation on the arguments to this method.
165 """
165 """
166 return SynchronousTaskMapper(self, clear_before=clear_before,
166 return SynchronousTaskMapper(self, clear_before=clear_before,
167 clear_after=clear_after, retries=retries,
167 clear_after=clear_after, retries=retries,
168 recovery_task=recovery_task, depend=depend, block=block)
168 recovery_task=recovery_task, depend=depend, block=block)
169
169
170 def parallel(self, clear_before=False, clear_after=False, retries=0,
170 def parallel(self, clear_before=False, clear_after=False, retries=0,
171 recovery_task=None, depend=None, block=True):
171 recovery_task=None, depend=None, block=True):
172 mapper = self.mapper(clear_before, clear_after, retries,
172 mapper = self.mapper(clear_before, clear_after, retries,
173 recovery_task, depend, block)
173 recovery_task, depend, block)
174 pf = ParallelFunction(mapper)
174 pf = ParallelFunction(mapper)
175 return pf
175 return pf
176
176
177 components.registerAdapter(BlockingTaskClient,
177 components.registerAdapter(BlockingTaskClient,
178 task.ITaskController, IBlockingTaskClient)
178 task.ITaskController, IBlockingTaskClient)
179
179
180
180
@@ -1,329 +1,340 b''
1 # encoding: utf-8
1 # encoding: utf-8
2 # -*- test-case-name: IPython.kernel.tests.test_taskxmlrpc -*-
2 # -*- test-case-name: IPython.kernel.tests.test_taskxmlrpc -*-
3 """A Foolscap interface to a TaskController.
3 """A Foolscap interface to a TaskController.
4
4
5 This class lets Foolscap clients talk to a TaskController.
5 This class lets Foolscap clients talk to a TaskController.
6 """
6 """
7
7
8 __docformat__ = "restructuredtext en"
8 __docformat__ = "restructuredtext en"
9
9
10 #-------------------------------------------------------------------------------
10 #-------------------------------------------------------------------------------
11 # Copyright (C) 2008 The IPython Development Team
11 # Copyright (C) 2008 The IPython Development Team
12 #
12 #
13 # Distributed under the terms of the BSD License. The full license is in
13 # Distributed under the terms of the BSD License. The full license is in
14 # the file COPYING, distributed as part of this software.
14 # the file COPYING, distributed as part of this software.
15 #-------------------------------------------------------------------------------
15 #-------------------------------------------------------------------------------
16
16
17 #-------------------------------------------------------------------------------
17 #-------------------------------------------------------------------------------
18 # Imports
18 # Imports
19 #-------------------------------------------------------------------------------
19 #-------------------------------------------------------------------------------
20
20
21 import cPickle as pickle
21 import cPickle as pickle
22 import xmlrpclib, copy
22 import xmlrpclib, copy
23
23
24 from zope.interface import Interface, implements
24 from zope.interface import Interface, implements
25 from twisted.internet import defer
25 from twisted.internet import defer
26 from twisted.python import components, failure
26 from twisted.python import components, failure
27
27
28 from foolscap import Referenceable
28 from foolscap import Referenceable
29
29
30 from IPython.kernel.twistedutil import blockingCallFromThread
30 from IPython.kernel.twistedutil import blockingCallFromThread
31 from IPython.kernel import error, task as taskmodule, taskclient
31 from IPython.kernel import error, task as taskmodule, taskclient
32 from IPython.kernel.pickleutil import can, uncan
32 from IPython.kernel.pickleutil import can, uncan
33 from IPython.kernel.clientinterfaces import (
33 from IPython.kernel.clientinterfaces import (
34 IFCClientInterfaceProvider,
34 IFCClientInterfaceProvider,
35 IBlockingClientAdaptor
35 IBlockingClientAdaptor
36 )
36 )
37 from IPython.kernel.mapper import (
37 from IPython.kernel.mapper import (
38 TaskMapper,
38 TaskMapper,
39 ITaskMapperFactory,
39 ITaskMapperFactory,
40 IMapper
40 IMapper
41 )
41 )
42 from IPython.kernel.parallelfunction import (
42 from IPython.kernel.parallelfunction import (
43 ParallelFunction,
43 ParallelFunction,
44 ITaskParallelDecorator
44 ITaskParallelDecorator
45 )
45 )
46
46
47 #-------------------------------------------------------------------------------
47 #-------------------------------------------------------------------------------
48 # The Controller side of things
48 # The Controller side of things
49 #-------------------------------------------------------------------------------
49 #-------------------------------------------------------------------------------
50
50
51
51
52 class IFCTaskController(Interface):
52 class IFCTaskController(Interface):
53 """Foolscap interface to task controller.
53 """Foolscap interface to task controller.
54
54
55 See the documentation of `ITaskController` for more information.
55 See the documentation of `ITaskController` for more information.
56 """
56 """
57 def remote_run(binTask):
57 def remote_run(binTask):
58 """"""
58 """"""
59
59
60 def remote_abort(taskid):
60 def remote_abort(taskid):
61 """"""
61 """"""
62
62
63 def remote_get_task_result(taskid, block=False):
63 def remote_get_task_result(taskid, block=False):
64 """"""
64 """"""
65
65
66 def remote_barrier(taskids):
66 def remote_barrier(taskids):
67 """"""
67 """"""
68
68
69 def remote_spin():
69 def remote_spin():
70 """"""
70 """"""
71
71
72 def remote_queue_status(verbose):
72 def remote_queue_status(verbose):
73 """"""
73 """"""
74
74
75 def remote_clear():
75 def remote_clear(taskids=None):
76 """"""
76 """"""
77
77
78
78
79 class FCTaskControllerFromTaskController(Referenceable):
79 class FCTaskControllerFromTaskController(Referenceable):
80 """
80 """
81 Adapt a `TaskController` to an `IFCTaskController`
81 Adapt a `TaskController` to an `IFCTaskController`
82
82
83 This class is used to expose a `TaskController` over the wire using
83 This class is used to expose a `TaskController` over the wire using
84 the Foolscap network protocol.
84 the Foolscap network protocol.
85 """
85 """
86
86
87 implements(IFCTaskController, IFCClientInterfaceProvider)
87 implements(IFCTaskController, IFCClientInterfaceProvider)
88
88
89 def __init__(self, taskController):
89 def __init__(self, taskController):
90 self.taskController = taskController
90 self.taskController = taskController
91
91
92 #---------------------------------------------------------------------------
92 #---------------------------------------------------------------------------
93 # Non interface methods
93 # Non interface methods
94 #---------------------------------------------------------------------------
94 #---------------------------------------------------------------------------
95
95
96 def packageFailure(self, f):
96 def packageFailure(self, f):
97 f.cleanFailure()
97 f.cleanFailure()
98 return self.packageSuccess(f)
98 return self.packageSuccess(f)
99
99
100 def packageSuccess(self, obj):
100 def packageSuccess(self, obj):
101 serial = pickle.dumps(obj, 2)
101 serial = pickle.dumps(obj, 2)
102 return serial
102 return serial
103
103
104 #---------------------------------------------------------------------------
104 #---------------------------------------------------------------------------
105 # ITaskController related methods
105 # ITaskController related methods
106 #---------------------------------------------------------------------------
106 #---------------------------------------------------------------------------
107
107
108 def remote_run(self, ptask):
108 def remote_run(self, ptask):
109 try:
109 try:
110 task = pickle.loads(ptask)
110 task = pickle.loads(ptask)
111 task.uncan_task()
111 task.uncan_task()
112 except:
112 except:
113 d = defer.fail(pickle.UnpickleableError("Could not unmarshal task"))
113 d = defer.fail(pickle.UnpickleableError("Could not unmarshal task"))
114 else:
114 else:
115 d = self.taskController.run(task)
115 d = self.taskController.run(task)
116 d.addCallback(self.packageSuccess)
116 d.addCallback(self.packageSuccess)
117 d.addErrback(self.packageFailure)
117 d.addErrback(self.packageFailure)
118 return d
118 return d
119
119
120 def remote_abort(self, taskid):
120 def remote_abort(self, taskid):
121 d = self.taskController.abort(taskid)
121 d = self.taskController.abort(taskid)
122 d.addCallback(self.packageSuccess)
122 d.addCallback(self.packageSuccess)
123 d.addErrback(self.packageFailure)
123 d.addErrback(self.packageFailure)
124 return d
124 return d
125
125
126 def remote_get_task_result(self, taskid, block=False):
126 def remote_get_task_result(self, taskid, block=False):
127 d = self.taskController.get_task_result(taskid, block)
127 d = self.taskController.get_task_result(taskid, block)
128 d.addCallback(self.packageSuccess)
128 d.addCallback(self.packageSuccess)
129 d.addErrback(self.packageFailure)
129 d.addErrback(self.packageFailure)
130 return d
130 return d
131
131
132 def remote_barrier(self, taskids):
132 def remote_barrier(self, taskids):
133 d = self.taskController.barrier(taskids)
133 d = self.taskController.barrier(taskids)
134 d.addCallback(self.packageSuccess)
134 d.addCallback(self.packageSuccess)
135 d.addErrback(self.packageFailure)
135 d.addErrback(self.packageFailure)
136 return d
136 return d
137
137
138 def remote_spin(self):
138 def remote_spin(self):
139 d = self.taskController.spin()
139 d = self.taskController.spin()
140 d.addCallback(self.packageSuccess)
140 d.addCallback(self.packageSuccess)
141 d.addErrback(self.packageFailure)
141 d.addErrback(self.packageFailure)
142 return d
142 return d
143
143
144 def remote_queue_status(self, verbose):
144 def remote_queue_status(self, verbose):
145 d = self.taskController.queue_status(verbose)
145 d = self.taskController.queue_status(verbose)
146 d.addCallback(self.packageSuccess)
146 d.addCallback(self.packageSuccess)
147 d.addErrback(self.packageFailure)
147 d.addErrback(self.packageFailure)
148 return d
148 return d
149
149
150 def remote_clear(self):
150 def remote_clear(self,taskids=None):
151 return self.taskController.clear()
151 d = self.taskController.clear(taskids)
152 d.addCallback(self.packageSuccess)
153 d.addErrback(self.packageFailure)
154 return d
152
155
153 def remote_get_client_name(self):
156 def remote_get_client_name(self):
154 return 'IPython.kernel.taskfc.FCTaskClient'
157 return 'IPython.kernel.taskfc.FCTaskClient'
155
158
156 components.registerAdapter(FCTaskControllerFromTaskController,
159 components.registerAdapter(FCTaskControllerFromTaskController,
157 taskmodule.ITaskController, IFCTaskController)
160 taskmodule.ITaskController, IFCTaskController)
158
161
159
162
160 #-------------------------------------------------------------------------------
163 #-------------------------------------------------------------------------------
161 # The Client side of things
164 # The Client side of things
162 #-------------------------------------------------------------------------------
165 #-------------------------------------------------------------------------------
163
166
164 class FCTaskClient(object):
167 class FCTaskClient(object):
165 """
168 """
166 Client class for Foolscap exposed `TaskController`.
169 Client class for Foolscap exposed `TaskController`.
167
170
168 This class is an adapter that makes a `RemoteReference` to a
171 This class is an adapter that makes a `RemoteReference` to a
169 `TaskController` look like an actual `ITaskController` on the client side.
172 `TaskController` look like an actual `ITaskController` on the client side.
170
173
171 This class also implements `IBlockingClientAdaptor` so that clients can
174 This class also implements `IBlockingClientAdaptor` so that clients can
172 automatically get a blocking version of this class.
175 automatically get a blocking version of this class.
173 """
176 """
174
177
175 implements(
178 implements(
176 taskmodule.ITaskController,
179 taskmodule.ITaskController,
177 IBlockingClientAdaptor,
180 IBlockingClientAdaptor,
178 ITaskMapperFactory,
181 ITaskMapperFactory,
179 IMapper,
182 IMapper,
180 ITaskParallelDecorator
183 ITaskParallelDecorator
181 )
184 )
182
185
183 def __init__(self, remote_reference):
186 def __init__(self, remote_reference):
184 self.remote_reference = remote_reference
187 self.remote_reference = remote_reference
185
188
186 #---------------------------------------------------------------------------
189 #---------------------------------------------------------------------------
187 # Non interface methods
190 # Non interface methods
188 #---------------------------------------------------------------------------
191 #---------------------------------------------------------------------------
189
192
190 def unpackage(self, r):
193 def unpackage(self, r):
191 return pickle.loads(r)
194 return pickle.loads(r)
192
195
193 #---------------------------------------------------------------------------
196 #---------------------------------------------------------------------------
194 # ITaskController related methods
197 # ITaskController related methods
195 #---------------------------------------------------------------------------
198 #---------------------------------------------------------------------------
196 def run(self, task):
199 def run(self, task):
197 """Run a task on the `TaskController`.
200 """Run a task on the `TaskController`.
198
201
199 See the documentation of the `MapTask` and `StringTask` classes for
202 See the documentation of the `MapTask` and `StringTask` classes for
200 details on how to build a task of different types.
203 details on how to build a task of different types.
201
204
202 :Parameters:
205 :Parameters:
203 task : an `ITask` implementer
206 task : an `ITask` implementer
204
207
205 :Returns: The int taskid of the submitted task. Pass this to
208 :Returns: The int taskid of the submitted task. Pass this to
206 `get_task_result` to get the `TaskResult` object.
209 `get_task_result` to get the `TaskResult` object.
207 """
210 """
208 assert isinstance(task, taskmodule.BaseTask), "task must be a Task object!"
211 assert isinstance(task, taskmodule.BaseTask), "task must be a Task object!"
209 task.can_task()
212 task.can_task()
210 ptask = pickle.dumps(task, 2)
213 ptask = pickle.dumps(task, 2)
211 task.uncan_task()
214 task.uncan_task()
212 d = self.remote_reference.callRemote('run', ptask)
215 d = self.remote_reference.callRemote('run', ptask)
213 d.addCallback(self.unpackage)
216 d.addCallback(self.unpackage)
214 return d
217 return d
215
218
216 def get_task_result(self, taskid, block=False):
219 def get_task_result(self, taskid, block=False):
217 """
220 """
218 Get a task result by taskid.
221 Get a task result by taskid.
219
222
220 :Parameters:
223 :Parameters:
221 taskid : int
224 taskid : int
222 The taskid of the task to be retrieved.
225 The taskid of the task to be retrieved.
223 block : boolean
226 block : boolean
224 Should I block until the task is done?
227 Should I block until the task is done?
225
228
226 :Returns: A `TaskResult` object that encapsulates the task result.
229 :Returns: A `TaskResult` object that encapsulates the task result.
227 """
230 """
228 d = self.remote_reference.callRemote('get_task_result', taskid, block)
231 d = self.remote_reference.callRemote('get_task_result', taskid, block)
229 d.addCallback(self.unpackage)
232 d.addCallback(self.unpackage)
230 return d
233 return d
231
234
232 def abort(self, taskid):
235 def abort(self, taskid):
233 """
236 """
234 Abort a task by taskid.
237 Abort a task by taskid.
235
238
236 :Parameters:
239 :Parameters:
237 taskid : int
240 taskid : int
238 The taskid of the task to be aborted.
241 The taskid of the task to be aborted.
239 """
242 """
240 d = self.remote_reference.callRemote('abort', taskid)
243 d = self.remote_reference.callRemote('abort', taskid)
241 d.addCallback(self.unpackage)
244 d.addCallback(self.unpackage)
242 return d
245 return d
243
246
244 def barrier(self, taskids):
247 def barrier(self, taskids):
245 """Block until a set of tasks are completed.
248 """Block until a set of tasks are completed.
246
249
247 :Parameters:
250 :Parameters:
248 taskids : list, tuple
251 taskids : list, tuple
249 A sequence of taskids to block on.
252 A sequence of taskids to block on.
250 """
253 """
251 d = self.remote_reference.callRemote('barrier', taskids)
254 d = self.remote_reference.callRemote('barrier', taskids)
252 d.addCallback(self.unpackage)
255 d.addCallback(self.unpackage)
253 return d
256 return d
254
257
255 def spin(self):
258 def spin(self):
256 """
259 """
257 Touch the scheduler, to resume scheduling without submitting a task.
260 Touch the scheduler, to resume scheduling without submitting a task.
258
261
259 This method only needs to be called in unusual situations where the
262 This method only needs to be called in unusual situations where the
260 scheduler is idle for some reason.
263 scheduler is idle for some reason.
261 """
264 """
262 d = self.remote_reference.callRemote('spin')
265 d = self.remote_reference.callRemote('spin')
263 d.addCallback(self.unpackage)
266 d.addCallback(self.unpackage)
264 return d
267 return d
265
268
266 def queue_status(self, verbose=False):
269 def queue_status(self, verbose=False):
267 """
270 """
268 Get a dictionary with the current state of the task queue.
271 Get a dictionary with the current state of the task queue.
269
272
270 :Parameters:
273 :Parameters:
271 verbose : boolean
274 verbose : boolean
272 If True, return a list of taskids. If False, simply give
275 If True, return a list of taskids. If False, simply give
273 the number of tasks with each status.
276 the number of tasks with each status.
274
277
275 :Returns:
278 :Returns:
276 A dict with the queue status.
279 A dict with the queue status.
277 """
280 """
278 d = self.remote_reference.callRemote('queue_status', verbose)
281 d = self.remote_reference.callRemote('queue_status', verbose)
279 d.addCallback(self.unpackage)
282 d.addCallback(self.unpackage)
280 return d
283 return d
281
284
282 def clear(self):
285 def clear(self,taskids=None):
283 """
286 """
284 Clear all previously run tasks from the task controller.
287 Clear previously run tasks from the task controller.
288 :Parameters:
289 taskids : list, tuple, None
290 A sequence of taskids whose results we should drop.
291 If None, all results are cleared.
292
293 :Returns:
294 An int, the number of tasks cleared
285
295
286 This is needed because the task controller keeps all task results
296 This is needed because the task controller keeps all task results
287 in memory. This can be a problem if there are many completed
297 in memory. This can be a problem if there are many completed
288 tasks. Users should call this periodically to clean out these
298 tasks. Users should call this periodically to clean out these
289 cached task results.
299 cached task results.
290 """
300 """
291 d = self.remote_reference.callRemote('clear')
301 d = self.remote_reference.callRemote('clear', taskids)
302 d.addCallback(self.unpackage)
292 return d
303 return d
293
304
294 def adapt_to_blocking_client(self):
305 def adapt_to_blocking_client(self):
295 """
306 """
296 Wrap self in a blocking version that implements `IBlockingTaskClient`.
307 Wrap self in a blocking version that implements `IBlockingTaskClient`.
297 """
308 """
298 from IPython.kernel.taskclient import IBlockingTaskClient
309 from IPython.kernel.taskclient import IBlockingTaskClient
299 return IBlockingTaskClient(self)
310 return IBlockingTaskClient(self)
300
311
301 def map(self, func, *sequences):
312 def map(self, func, *sequences):
302 """
313 """
303 Apply func to *sequences elementwise. Like Python's builtin map.
314 Apply func to *sequences elementwise. Like Python's builtin map.
304
315
305 This version is load balanced.
316 This version is load balanced.
306 """
317 """
307 return self.mapper().map(func, *sequences)
318 return self.mapper().map(func, *sequences)
308
319
309 def mapper(self, clear_before=False, clear_after=False, retries=0,
320 def mapper(self, clear_before=False, clear_after=False, retries=0,
310 recovery_task=None, depend=None, block=True):
321 recovery_task=None, depend=None, block=True):
311 """
322 """
312 Create an `IMapper` implementer with a given set of arguments.
323 Create an `IMapper` implementer with a given set of arguments.
313
324
314 The `IMapper` created using a task controller is load balanced.
325 The `IMapper` created using a task controller is load balanced.
315
326
316 See the documentation for `IPython.kernel.task.BaseTask` for
327 See the documentation for `IPython.kernel.task.BaseTask` for
317 documentation on the arguments to this method.
328 documentation on the arguments to this method.
318 """
329 """
319 return TaskMapper(self, clear_before=clear_before,
330 return TaskMapper(self, clear_before=clear_before,
320 clear_after=clear_after, retries=retries,
331 clear_after=clear_after, retries=retries,
321 recovery_task=recovery_task, depend=depend, block=block)
332 recovery_task=recovery_task, depend=depend, block=block)
322
333
323 def parallel(self, clear_before=False, clear_after=False, retries=0,
334 def parallel(self, clear_before=False, clear_after=False, retries=0,
324 recovery_task=None, depend=None, block=True):
335 recovery_task=None, depend=None, block=True):
325 mapper = self.mapper(clear_before, clear_after, retries,
336 mapper = self.mapper(clear_before, clear_after, retries,
326 recovery_task, depend, block)
337 recovery_task, depend, block)
327 pf = ParallelFunction(mapper)
338 pf = ParallelFunction(mapper)
328 return pf
339 return pf
329
340
General Comments 0
You need to be logged in to leave comments. Login now