Fix doctest.
Fernando Perez -
@@ -1,1113 +1,1113
1 1 # encoding: utf-8
2 2 # -*- test-case-name: IPython.kernel.tests.test_task -*-
3 3
4 4 """Task farming representation of the ControllerService."""
5 5
6 6 __docformat__ = "restructuredtext en"
7 7
8 8 #-----------------------------------------------------------------------------
9 9 # Copyright (C) 2008 The IPython Development Team
10 10 #
11 11 # Distributed under the terms of the BSD License. The full license is in
12 12 # the file COPYING, distributed as part of this software.
13 13 #-----------------------------------------------------------------------------
14 14
15 15 #-----------------------------------------------------------------------------
16 16 # Imports
17 17 #-----------------------------------------------------------------------------
18 18
19 19 import copy, time
20 20 from types import FunctionType
21 21
22 22 import zope.interface as zi, string
23 23 from twisted.internet import defer, reactor
24 24 from twisted.python import components, log, failure
25 25
26 26 from IPython.kernel.util import printer
27 27 from IPython.kernel import engineservice as es, error
28 28 from IPython.kernel import controllerservice as cs
29 29 from IPython.kernel.twistedutil import gatherBoth, DeferredList
30 30
31 31 from IPython.kernel.pickleutil import can, uncan, CannedFunction
32 32
33 33 #-----------------------------------------------------------------------------
34 34 # Definition of the Task objects
35 35 #-----------------------------------------------------------------------------
36 36
37 37 time_format = '%Y/%m/%d %H:%M:%S'
38 38
39 39 class ITask(zi.Interface):
40 40 """
41 41 This interface provides a generic definition of what constitutes a task.
42 42
43 43 There are two sides to a task. First a task needs to take input from
44 44 a user to determine what work is performed by the task. Second, the
45 45 task needs to have the logic that knows how to turn that information
46 46 into specific calls to a worker, through the `IQueuedEngine` interface.
47 47
48 48 Many methods in this class get two things passed to them: a Deferred
49 49 and an IQueuedEngine implementer. Such methods should register callbacks
50 50 on the Deferred that use the IQueuedEngine to accomplish something. See
51 51 the existing task objects for examples.
52 52 """
53 53
54 54 zi.Attribute('retries','How many times to retry the task')
55 55 zi.Attribute('recovery_task','A task to try if the initial one fails')
56 56 zi.Attribute('taskid','the id of the task')
57 57
58 58 def start_time(result):
59 59 """
60 60 Do anything needed to start the timing of the task.
61 61
62 62 Must simply return the result after starting the timers.
63 63 """
64 64
65 65 def stop_time(result):
66 66 """
67 67 Do anything needed to stop the timing of the task.
68 68
69 69 Must simply return the result after stopping the timers. This
70 70 method will usually set attributes that are used by `process_result`
71 71 in building the result of the task.
72 72 """
73 73
74 74 def pre_task(d, queued_engine):
75 75 """Do something with the queued_engine before the task is run.
76 76
77 77 This method should simply add callbacks to the input Deferred
78 78 that do something with the `queued_engine` before the task is run.
79 79
80 80 :Parameters:
81 81 d : Deferred
82 82 The deferred that actions should be attached to
83 83 queued_engine : IQueuedEngine implementer
84 84 The worker that has been allocated to perform the task
85 85 """
86 86
87 87 def post_task(d, queued_engine):
88 88 """Do something with the queued_engine after the task is run.
89 89
90 90 This method should simply add callbacks to the input Deferred
91 91 that do something with the `queued_engine` after the task is run.
92 92
93 93 :Parameters:
94 94 d : Deferred
95 95 The deferred that actions should be attached to
96 96 queued_engine : IQueuedEngine implementer
97 97 The worker that has been allocated to perform the task
98 98 """
99 99
100 100 def submit_task(d, queued_engine):
101 101 """Submit a task using the `queued_engine` we have been allocated.
102 102
103 103 When a task is ready to run, this method is called. This method
104 104 must take the internal information of the task and make suitable
105 105 calls on the queued_engine to have the actual work done.
106 106
107 107 This method should simply add callbacks to the input Deferred
108 108 that use the `queued_engine` to perform the actual work of the task.
109 109
110 110 :Parameters:
111 111 d : Deferred
112 112 The deferred that actions should be attached to
113 113 queued_engine : IQueuedEngine implementer
114 114 The worker that has been allocated to perform the task
115 115 """
116 116
117 117 def process_result(d, result, engine_id):
118 118 """Take a raw task result.
119 119
120 120 Objects that implement `ITask` can choose how the result of running
121 121 the task is presented. This method takes the raw result and
122 122 does this logic. Two examples are the `MapTask`, which simply returns
123 123 the raw result or a `Failure` object, and the `StringTask`, which
124 124 returns a `TaskResult` object.
125 125
126 126 :Parameters:
127 127 d : Deferred
128 128 The deferred that actions should be attached to
129 129 result : object
130 130 The raw task result that needs to be wrapped
131 131 engine_id : int
132 132 The id of the engine that did the task
133 133
134 134 :Returns:
135 135 The result, as a tuple of the form: (success, result).
136 136 Here, success is a boolean indicating if the task
137 137 succeeded or failed and result is the result.
138 138 """
139 139
140 140 def check_depend(properties):
141 141 """Check properties to see if the task should be run.
142 142
143 143 :Parameters:
144 144 properties : dict
145 145 A dictionary of properties that an engine has set
146 146
147 147 :Returns:
148 148 True if the task should be run, False otherwise
149 149 """
150 150
151 151 def can_task(self):
152 152 """Serialize (can) any functions in the task for pickling.
153 153
154 154 Subclasses must override this method and make sure that all
155 155 functions in the task are canned by calling `can` on the
156 156 function.
157 157 """
158 158
159 159 def uncan_task(self):
160 160 """Unserialize (uncan) any canned function in the task."""
161 161
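# Editor's note (illustrative, not part of the original file): every ITask
# hook receives a Deferred `d` plus an IQueuedEngine and is expected to
# extend the callback chain rather than call the engine directly, e.g.
#
#     def pre_task(self, d, queued_engine):
#         d.addCallback(lambda r: queued_engine.reset())
#
# BaseTask below implements exactly this pattern for all of the hooks.
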
162 162 class BaseTask(object):
163 163 """
164 164 Common functionality for all objects implementing `ITask`.
165 165 """
166 166
167 167 zi.implements(ITask)
168 168
169 169 def __init__(self, clear_before=False, clear_after=False, retries=0,
170 170 recovery_task=None, depend=None):
171 171 """
172 172 Make a generic task.
173 173
174 174 :Parameters:
175 175 clear_before : boolean
176 176 Should the engine's namespace be cleared before the task
177 177 is run
178 178 clear_after : boolean
179 179 Should the engine's namespace be cleared after the task is run
180 180 retries : int
181 181 The number of times a task should be retried upon failure
182 182 recovery_task : any task object
183 183 If a task fails and it has a recovery_task, that is run
184 184 upon a retry
185 185 depend : FunctionType
186 186 A function that is called to test for properties. This function
187 187 must take one argument, the properties dict, and return a boolean
188 188 """
189 189 self.clear_before = clear_before
190 190 self.clear_after = clear_after
191 191 self.retries = retries
192 192 self.recovery_task = recovery_task
193 193 self.depend = depend
194 194 self.taskid = None
195 195
196 196 def start_time(self, result):
197 197 """
198 198 Start the basic timers.
199 199 """
200 200 self.start = time.time()
201 201 self.start_struct = time.localtime()
202 202 return result
203 203
204 204 def stop_time(self, result):
205 205 """
206 206 Stop the basic timers.
207 207 """
208 208 self.stop = time.time()
209 209 self.stop_struct = time.localtime()
210 210 self.duration = self.stop - self.start
211 211 self.submitted = time.strftime(time_format, self.start_struct)
212 212 self.completed = time.strftime(time_format, self.stop_struct)
213 213 return result
214 214
215 215 def pre_task(self, d, queued_engine):
216 216 """
217 217 Clear the engine before running the task if clear_before is set.
218 218 """
219 219 if self.clear_before:
220 220 d.addCallback(lambda r: queued_engine.reset())
221 221
222 222 def post_task(self, d, queued_engine):
223 223 """
224 224 Clear the engine after running the task if clear_after is set.
225 225 """
226 226 def reseter(result):
227 227 queued_engine.reset()
228 228 return result
229 229 if self.clear_after:
230 230 d.addBoth(reseter)
231 231
232 232 def submit_task(self, d, queued_engine):
233 233 raise NotImplementedError('submit_task must be implemented in a subclass')
234 234
235 235 def process_result(self, result, engine_id):
236 236 """
237 237 Process a task result.
238 238
239 239 This is the default `process_result` that just returns the raw
240 240 result or a `Failure`.
241 241 """
242 242 if isinstance(result, failure.Failure):
243 243 return (False, result)
244 244 else:
245 245 return (True, result)
246 246
247 247 def check_depend(self, properties):
248 248 """
249 249 Calls self.depend(properties) to see if a task should be run.
250 250 """
251 251 if self.depend is not None:
252 252 return self.depend(properties)
253 253 else:
254 254 return True
255 255
256 256 def can_task(self):
257 257 self.depend = can(self.depend)
258 258 if isinstance(self.recovery_task, BaseTask):
259 259 self.recovery_task.can_task()
260 260
261 261 def uncan_task(self):
262 262 self.depend = uncan(self.depend)
263 263 if isinstance(self.recovery_task, BaseTask):
264 264 self.recovery_task.uncan_task()
265 265
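# Editor's sketch of a `depend` function: it receives the properties dict an
# engine has published and returns a boolean, and a task constructed with
# depend=<function> is only matched to engines for which it returns True
# (see FIFOScheduler.schedule below). The 'has_numpy' key and `some_function`
# are placeholders, not names defined in this module.
#
#     def numpy_only(properties):
#         return bool(properties.get('has_numpy', False))
#
#     task = MapTask(some_function, depend=numpy_only)
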
266 266 class MapTask(BaseTask):
267 267 """
268 268 A task that consists of a function and arguments.
269 269 """
270 270
271 271 zi.implements(ITask)
272 272
273 273 def __init__(self, function, args=None, kwargs=None, clear_before=False,
274 274 clear_after=False, retries=0, recovery_task=None, depend=None):
275 275 """
276 276 Create a task based on a function, args and kwargs.
277 277
278 278 This is a simple type of task that consists of calling:
279 279 function(*args, **kwargs) and wrapping the result in a `TaskResult`.
280 280
281 281 The return value of the function, or a `Failure` wrapping an
282 282 exception is the task result for this type of task.
283 283 """
284 284 BaseTask.__init__(self, clear_before, clear_after, retries,
285 285 recovery_task, depend)
286 286 if not isinstance(function, FunctionType):
287 287 raise TypeError('a task function must be a FunctionType')
288 288 self.function = function
289 289 if args is None:
290 290 self.args = ()
291 291 else:
292 292 self.args = args
293 293 if not isinstance(self.args, (list, tuple)):
294 294 raise TypeError('a task args must be a list or tuple')
295 295 if kwargs is None:
296 296 self.kwargs = {}
297 297 else:
298 298 self.kwargs = kwargs
299 299 if not isinstance(self.kwargs, dict):
300 300 raise TypeError('a task kwargs must be a dict')
301 301
302 302 def submit_task(self, d, queued_engine):
303 303 d.addCallback(lambda r: queued_engine.push_function(
304 304 dict(_ipython_task_function=self.function))
305 305 )
306 306 d.addCallback(lambda r: queued_engine.push(
307 307 dict(_ipython_task_args=self.args,_ipython_task_kwargs=self.kwargs))
308 308 )
309 309 d.addCallback(lambda r: queued_engine.execute(
310 310 '_ipython_task_result = _ipython_task_function(*_ipython_task_args,**_ipython_task_kwargs)')
311 311 )
312 312 d.addCallback(lambda r: queued_engine.pull('_ipython_task_result'))
313 313
314 314 def can_task(self):
315 315 self.function = can(self.function)
316 316 BaseTask.can_task(self)
317 317
318 318 def uncan_task(self):
319 319 self.function = uncan(self.function)
320 320 BaseTask.uncan_task(self)
321 321
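# Editor's sketch of building a MapTask (construction only; actually running
# it requires the controller/engine machinery defined further below and in
# other modules):
#
#     def add(a, b):
#         return a + b
#
#     task = MapTask(add, args=(2, 3), retries=1)
#
# When the task runs, submit_task above pushes `add` and the args/kwargs to
# the engine, executes the call remotely, and pulls back the return value.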
322 322
323 323 class StringTask(BaseTask):
324 324 """
325 325 A task that consists of a string of Python code to run.
326 326 """
327 327
328 328 def __init__(self, expression, pull=None, push=None,
329 329 clear_before=False, clear_after=False, retries=0,
330 330 recovery_task=None, depend=None):
331 331 """
332 332 Create a task based on a Python expression and variables
333 333
334 334 This type of task lets you push a set of variables to the engines
335 335 namespace, run a Python string in that namespace and then bring back
336 336 a different set of Python variables as the result.
337 337
338 338 Because this type of task can return many results (through the
339 339 `pull` keyword argument) it returns a special `TaskResult` object
340 340 that wraps the pulled variables, statistics about the run and
341 341 any exceptions raised.
342 342 """
343 343 if not isinstance(expression, str):
344 344 raise TypeError('a task expression must be a string')
345 345 self.expression = expression
346 346
347 347 if pull is None:
348 348 self.pull = ()
349 349 elif isinstance(pull, str):
350 350 self.pull = (pull,)
351 351 elif isinstance(pull, (list, tuple)):
352 352 self.pull = pull
353 353 else:
354 354 raise TypeError('pull must be str or a sequence of strs')
355 355
356 356 if push is None:
357 357 self.push = {}
358 358 elif isinstance(push, dict):
359 359 self.push = push
360 360 else:
361 361 raise TypeError('push must be a dict')
362 362
363 363 BaseTask.__init__(self, clear_before, clear_after, retries,
364 364 recovery_task, depend)
365 365
366 366 def submit_task(self, d, queued_engine):
367 367 if self.push is not None:
368 368 d.addCallback(lambda r: queued_engine.push(self.push))
369 369
370 370 d.addCallback(lambda r: queued_engine.execute(self.expression))
371 371
372 372 if self.pull is not None:
373 373 d.addCallback(lambda r: queued_engine.pull(self.pull))
374 374 else:
375 375 d.addCallback(lambda r: None)
376 376
377 377 def process_result(self, result, engine_id):
378 378 if isinstance(result, failure.Failure):
379 379 tr = TaskResult(result, engine_id)
380 380 else:
381 381 if self.pull is None:
382 382 resultDict = {}
383 383 elif len(self.pull) == 1:
384 384 resultDict = {self.pull[0]:result}
385 385 else:
386 386 resultDict = dict(zip(self.pull, result))
387 387 tr = TaskResult(resultDict, engine_id)
388 388 # Assign task attributes
389 389 tr.submitted = self.submitted
390 390 tr.completed = self.completed
391 391 tr.duration = self.duration
392 392 if hasattr(self,'taskid'):
393 393 tr.taskid = self.taskid
394 394 else:
395 395 tr.taskid = None
396 396 if isinstance(result, failure.Failure):
397 397 return (False, tr)
398 398 else:
399 399 return (True, tr)
400 400
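# Editor's sketch of a StringTask: push `a` and `b`, run an expression in the
# engine's namespace, and pull `c` back; the eventual result is a TaskResult
# whose `ns` attribute exposes the pulled names (see process_result above).
#
#     task = StringTask('c = a + b', push=dict(a=5, b=10), pull='c')
#     # on success, the TaskResult `tr` for this task should satisfy
#     # tr.ns.c == 15
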
401 401 class ResultNS(object):
402 402 """
403 403 A dict like object for holding the results of a task.
404 404
405 405 The result namespace object for use in `TaskResult` objects as tr.ns.
406 406 It builds an object from a dictionary, such that it has attributes
407 407 according to the key,value pairs of the dictionary.
408 408
409 409 This works by calling setattr on ALL key,value pairs in the dict. If a user
410 410 chooses to overwrite the `__repr__` or `__getattr__` attributes, they can.
411 411 This can be a bad idea, as it may corrupt standard behavior of the
412 412 ns object.
413 413
414 414 Example
415 415 --------
416 416
417 417 >>> ns = ResultNS({'a':17,'foo':range(3)})
418 418 >>> print ns
419 NS{'a':17,'foo':range(3)}
419 NS{'a': 17, 'foo': [0, 1, 2]}
420 420 >>> ns.a
421 421 17
422 422 >>> ns['foo']
423 423 [0, 1, 2]
424 424 """
425 425 def __init__(self, dikt):
426 426 for k,v in dikt.iteritems():
427 427 setattr(self,k,v)
428 428
429 429 def __repr__(self):
430 430 l = dir(self)
431 431 d = {}
432 432 for k in l:
433 433 # do not print private objects
434 434 if k[:2] != '__' and k[-2:] != '__':
435 435 d[k] = getattr(self, k)
436 436 return "NS"+repr(d)
437 437
438 438 def __getitem__(self, key):
439 439 return getattr(self, key)
440 440
441 441 class TaskResult(object):
442 442 """
443 443 An object for returning task results for certain types of tasks.
444 444
445 445 This object encapsulates the results of a task. On task
446 446 success it will have a keys attribute that will have a list
447 447 of the variables that have been pulled back. These variables
448 448 are accessible as attributes of this class as well. On
449 449 success the failure attribute will be None.
450 450
451 451 On task failure, keys will be empty, but failure will contain
452 452 the failure object that encapsulates the remote exception.
453 453 One can also simply call the `raise_exception` method of
454 454 this class to re-raise any remote exception in the local
455 455 session.
456 456
457 457 The `TaskResult` has a `.ns` member, which is a property for access
458 458 to the results. If the Task had pull=['a', 'b'], then the
459 459 Task Result will have attributes `tr.ns.a`, `tr.ns.b` for those values.
460 460 Accessing `tr.ns` will raise the remote failure if the task failed.
461 461
462 462 The `engineid` attribute should have the `engineid` of the engine
463 463 that ran the task. But, because engines can come and go,
464 464 the `engineid` may not continue to be
465 465 valid or accurate.
466 466
467 467 The `taskid` attribute simply gives the `taskid` that the task
468 468 is tracked under.
469 469 """
470 470 taskid = None
471 471
472 472 def _getNS(self):
473 473 if isinstance(self.failure, failure.Failure):
474 474 return self.failure.raiseException()
475 475 else:
476 476 return self._ns
477 477
478 478 def _setNS(self, v):
479 479 raise Exception("the ns attribute cannot be changed")
480 480
481 481 ns = property(_getNS, _setNS)
482 482
483 483 def __init__(self, results, engineid):
484 484 self.engineid = engineid
485 485 if isinstance(results, failure.Failure):
486 486 self.failure = results
487 487 self.results = {}
488 488 else:
489 489 self.results = results
490 490 self.failure = None
491 491
492 492 self._ns = ResultNS(self.results)
493 493
494 494 self.keys = self.results.keys()
495 495
496 496 def __repr__(self):
497 497 if self.failure is not None:
498 498 contents = self.failure
499 499 else:
500 500 contents = self.results
501 501 return "TaskResult[ID:%r]:%r"%(self.taskid, contents)
502 502
503 503 def __getitem__(self, key):
504 504 if self.failure is not None:
505 505 self.raise_exception()
506 506 return self.results[key]
507 507
508 508 def raise_exception(self):
509 509 """Re-raise any remote exceptions in the local python session."""
510 510 if self.failure is not None:
511 511 self.failure.raiseException()
512 512
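# Editor's sketch of consuming a TaskResult `tr` (e.g. one obtained from
# get_task_result on the client side); `duration` is set for StringTask
# results by process_result above:
#
#     if tr.failure is None:          # the task succeeded
#         print tr.ns.c               # pulled variables live on tr.ns
#         print tr.engineid, tr.duration
#     else:
#         tr.raise_exception()        # re-raise the remote error locally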
513 513
514 514 #-----------------------------------------------------------------------------
515 515 # The controller side of things
516 516 #-----------------------------------------------------------------------------
517 517
518 518 class IWorker(zi.Interface):
519 519 """The Basic Worker Interface.
520 520
521 521 A worker is a representation of an Engine that is ready to run tasks.
522 522 """
523 523
524 524 zi.Attribute("workerid", "the id of the worker")
525 525
526 526 def run(task):
527 527 """Run task in worker's namespace.
528 528
529 529 :Parameters:
530 530 task : a `Task` object
531 531
532 532 :Returns: `Deferred` to a tuple of (success, result) where
533 533 success is a boolean that signifies success or failure
534 534 and result is the task result.
535 535 """
536 536
537 537
538 538 class WorkerFromQueuedEngine(object):
539 539 """Adapt an `IQueuedEngine` to an `IWorker` object"""
540 540
541 541 zi.implements(IWorker)
542 542
543 543 def __init__(self, qe):
544 544 self.queuedEngine = qe
545 545 self.workerid = None
546 546
547 547 def _get_properties(self):
548 548 return self.queuedEngine.properties
549 549
550 550 properties = property(_get_properties, lambda self, _:None)
551 551
552 552 def run(self, task):
553 553 """Run task in worker's namespace.
554 554
555 555 This takes a task and calls methods on the task that actually
556 556 cause `self.queuedEngine` to do the task. See the methods of
557 557 `ITask` for more information about how these methods are called.
558 558
559 559 :Parameters:
560 560 task : a `Task` object
561 561
562 562 :Returns: `Deferred` to a tuple of (success, result) where
563 563 success is a boolean that signifies success or failure
564 564 and result is the task result.
565 565 """
566 566 d = defer.succeed(None)
567 567 d.addCallback(task.start_time)
568 568 task.pre_task(d, self.queuedEngine)
569 569 task.submit_task(d, self.queuedEngine)
570 570 task.post_task(d, self.queuedEngine)
571 571 d.addBoth(task.stop_time)
572 572 d.addBoth(task.process_result, self.queuedEngine.id)
573 573 # At this point, there will be (success, result) coming down the line
574 574 return d
575 575
576 576
577 577 components.registerAdapter(WorkerFromQueuedEngine, es.IEngineQueued, IWorker)
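# Editor's note: with this registration, calling IWorker(queued_engine)
# adapts an IEngineQueued provider into a WorkerFromQueuedEngine; the
# TaskController below relies on this when it wraps each registered engine
# with IWorker(self.controller.engines[id]).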
578 578
579 579 class IScheduler(zi.Interface):
580 580 """The interface for a Scheduler.
581 581 """
582 582 zi.Attribute("nworkers", "the number of unassigned workers")
583 583 zi.Attribute("ntasks", "the number of unscheduled tasks")
584 584 zi.Attribute("workerids", "a list of the worker ids")
585 585 zi.Attribute("taskids", "a list of the task ids")
586 586
587 587 def add_task(task, **flags):
588 588 """Add a task to the queue of the Scheduler.
589 589
590 590 :Parameters:
591 591 task : an `ITask` implementer
592 592 The task to be queued.
593 593 flags : dict
594 594 General keywords for more sophisticated scheduling
595 595 """
596 596
597 597 def pop_task(id=None):
598 598 """Pops a task object from the queue.
599 599
600 600 This gets the next task to be run. If no `id` is requested, the highest priority
601 601 task is returned.
602 602
603 603 :Parameters:
604 604 id
605 605 The id of the task to be popped. The default (None) is to return
606 606 the highest priority task.
607 607
608 608 :Returns: an `ITask` implementer
609 609
610 610 :Exceptions:
611 611 IndexError : raised if no taskid in queue
612 612 """
613 613
614 614 def add_worker(worker, **flags):
615 615 """Add a worker to the worker queue.
616 616
617 617 :Parameters:
618 618 worker : an `IWorker` implementer
619 619 flags : dict
620 620 General keywords for more sophisticated scheduling
621 621 """
622 622
623 623 def pop_worker(id=None):
624 624 """Pops an IWorker object that is ready to do work.
625 625
626 626 This gets the next IWorker that is ready to do work.
627 627
628 628 :Parameters:
629 629 id : if specified, will pop worker with workerid=id, else pops
630 630 highest priority worker. Defaults to None.
631 631
632 632 :Returns:
633 633 an IWorker object
634 634
635 635 :Exceptions:
636 636 IndexError : raised if no workerid in queue
637 637 """
638 638
639 639 def ready():
640 640 """Returns True if there is something to do, False otherwise"""
641 641
642 642 def schedule():
643 643 """Returns (worker,task) pair for the next task to be run."""
644 644
645 645
646 646 class FIFOScheduler(object):
647 647 """
648 648 A basic First-In-First-Out (Queue) Scheduler.
649 649
650 650 This is the default Scheduler for the `TaskController`.
651 651 See the docstrings for `IScheduler` for interface details.
652 652 """
653 653
654 654 zi.implements(IScheduler)
655 655
656 656 def __init__(self):
657 657 self.tasks = []
658 658 self.workers = []
659 659
660 660 def _ntasks(self):
661 661 return len(self.tasks)
662 662
663 663 def _nworkers(self):
664 664 return len(self.workers)
665 665
666 666 ntasks = property(_ntasks, lambda self, _:None)
667 667 nworkers = property(_nworkers, lambda self, _:None)
668 668
669 669 def _taskids(self):
670 670 return [t.taskid for t in self.tasks]
671 671
672 672 def _workerids(self):
673 673 return [w.workerid for w in self.workers]
674 674
675 675 taskids = property(_taskids, lambda self,_:None)
676 676 workerids = property(_workerids, lambda self,_:None)
677 677
678 678 def add_task(self, task, **flags):
679 679 self.tasks.append(task)
680 680
681 681 def pop_task(self, id=None):
682 682 if id is None:
683 683 return self.tasks.pop(0)
684 684 else:
685 685 for i in range(len(self.tasks)):
686 686 taskid = self.tasks[i].taskid
687 687 if id == taskid:
688 688 return self.tasks.pop(i)
689 689 raise IndexError("No task #%i"%id)
690 690
691 691 def add_worker(self, worker, **flags):
692 692 self.workers.append(worker)
693 693
694 694 def pop_worker(self, id=None):
695 695 if id is None:
696 696 return self.workers.pop(0)
697 697 else:
698 698 for i in range(len(self.workers)):
699 699 workerid = self.workers[i].workerid
700 700 if id == workerid:
701 701 return self.workers.pop(i)
702 702 raise IndexError("No worker #%i"%id)
703 703
704 704 def schedule(self):
705 705 for t in self.tasks:
706 706 for w in self.workers:
707 707 try:# do not allow exceptions to break this
708 708 # Allow the task to check itself using its
709 709 # check_depend method.
710 710 cando = t.check_depend(w.properties)
711 711 except:
712 712 cando = False
713 713 if cando:
714 714 return self.pop_worker(w.workerid), self.pop_task(t.taskid)
715 715 return None, None
716 716
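# Editor's sketch of the scheduler contract in isolation (no engines needed):
#
#     s = FIFOScheduler()
#     s.add_task(StringTask('pass'))
#     s.add_task(StringTask('pass'))
#     s.ntasks            # -> 2
#     t = s.pop_task()    # FIFO: the first task added comes back first
#
# schedule() only yields a (worker, task) pair once a worker whose properties
# satisfy the task's check_depend has been added as well.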
717 717
718 718
719 719 class LIFOScheduler(FIFOScheduler):
720 720 """
721 721 A Last-In-First-Out (Stack) Scheduler.
722 722
723 723 This scheduler should naively reward fast engines by giving
724 724 them more jobs. This risks starvation, but only in cases with
725 725 low load, where starvation does not really matter.
726 726 """
727 727
728 728 def add_task(self, task, **flags):
729 729 # self.tasks.reverse()
730 730 self.tasks.insert(0, task)
731 731 # self.tasks.reverse()
732 732
733 733 def add_worker(self, worker, **flags):
734 734 # self.workers.reverse()
735 735 self.workers.insert(0, worker)
736 736 # self.workers.reverse()
737 737
738 738
739 739 class ITaskController(cs.IControllerBase):
740 740 """
741 741 The Task based interface to a `ControllerService` object
742 742
743 743 This adapts a `ControllerService` to the ITaskController interface.
744 744 """
745 745
746 746 def run(task):
747 747 """
748 748 Run a task.
749 749
750 750 :Parameters:
751 751 task : an IPython `Task` object
752 752
753 753 :Returns: `Deferred` to the integer ID of the task
754 754 """
755 755
756 756 def get_task_result(taskid, block=False):
757 757 """
758 758 Get the result of a task by its ID.
759 759
760 760 :Parameters:
761 761 taskid : int
762 762 the id of the task whose result is requested
763 763
764 764 :Returns: `Deferred` to the task result if the task is done, and None
765 765 if not.
766 766
767 767 :Exceptions:
768 768 actualResult will be an `IndexError` if no such task has been submitted
769 769 """
770 770
771 771 def abort(taskid):
772 772 """Remove task from queue if task is has not been submitted.
773 773
774 774 If the task has already been submitted, wait for it to finish, discard
775 775 its results, and prevent resubmission.
776 776
777 777 :Parameters:
778 778 taskid : the id of the task to be aborted
779 779
780 780 :Returns:
781 781 `Deferred` to abort attempt completion. Will be None on success.
782 782
783 783 :Exceptions:
784 784 deferred will fail with `IndexError` if no such task has been submitted
785 785 or the task has already completed.
786 786 """
787 787
788 788 def barrier(taskids):
789 789 """
790 790 Block until all tasks in the list of taskids are completed.
791 791
792 792 Returns None on success.
793 793 """
794 794
795 795 def spin():
796 796 """
797 797 Touch the scheduler, to resume scheduling without submitting a task.
798 798 """
799 799
800 800 def queue_status(verbose=False):
801 801 """
802 802 Get a dictionary with the current state of the task queue.
803 803
804 804 If verbose is True, then return lists of taskids, otherwise,
805 805 return the number of tasks with each status.
806 806 """
807 807
808 808 def clear():
809 809 """
810 810 Clear all previously run tasks from the task controller.
811 811
812 812 This is needed because the task controller keeps all task results
813 813 in memory. This can be a problem if there are many completed
814 814 tasks. Users should call this periodically to clean out these
815 815 cached task results.
816 816 """
817 817
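# Editor's sketch of driving an ITaskController implementer such as the
# TaskController defined below; `tc` is assumed to be an already adapted
# controller, e.g. ITaskController(controller_service):
#
#     d = tc.run(StringTask('c = a + b', push=dict(a=1, b=2), pull='c'))
#     d.addCallback(lambda taskid: tc.get_task_result(taskid, block=True))
#     d.addCallback(lambda tr: tr.ns.c)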
818 818
819 819 class TaskController(cs.ControllerAdapterBase):
820 820 """The Task based interface to a Controller object.
821 821
822 822 If you want to use a different scheduler, just subclass this and set
823 823 the `SchedulerClass` member to the *class* of your chosen scheduler.
824 824 """
825 825
826 826 zi.implements(ITaskController)
827 827 SchedulerClass = FIFOScheduler
828 828
829 829 timeout = 30
830 830
831 831 def __init__(self, controller):
832 832 self.controller = controller
833 833 self.controller.on_register_engine_do(self.registerWorker, True)
834 834 self.controller.on_unregister_engine_do(self.unregisterWorker, True)
835 835 self.taskid = 0
836 836 self.failurePenalty = 1 # the time in seconds to penalize
837 837 # a worker for failing a task
838 838 self.pendingTasks = {} # dict of {workerid:task}
839 839 self.deferredResults = {} # dict of {taskid:deferred}
840 840 self.finishedResults = {} # dict of {taskid:actualResult}
841 841 self.workers = {} # dict of {workerid:worker}
842 842 self.abortPending = [] # list of taskids with pending aborts
843 843 self.idleLater = None # delayed call object for timeout
844 844 self.scheduler = self.SchedulerClass()
845 845
846 846 for id in self.controller.engines.keys():
847 847 self.workers[id] = IWorker(self.controller.engines[id])
848 848 self.workers[id].workerid = id
849 849 self.scheduler.add_worker(self.workers[id])
850 850
851 851 def registerWorker(self, id):
852 852 """Called by controller.register_engine."""
853 853 if self.workers.get(id):
854 854 raise ValueError("worker with id %s already exists. This should not happen." % id)
855 855 self.workers[id] = IWorker(self.controller.engines[id])
856 856 self.workers[id].workerid = id
857 857 if not self.pendingTasks.has_key(id):# if not working
858 858 self.scheduler.add_worker(self.workers[id])
859 859 self.distributeTasks()
860 860
861 861 def unregisterWorker(self, id):
862 862 """Called by controller.unregister_engine"""
863 863
864 864 if self.workers.has_key(id):
865 865 try:
866 866 self.scheduler.pop_worker(id)
867 867 except IndexError:
868 868 pass
869 869 self.workers.pop(id)
870 870
871 871 def _pendingTaskIDs(self):
872 872 return [t.taskid for t in self.pendingTasks.values()]
873 873
874 874 #---------------------------------------------------------------------------
875 875 # Interface methods
876 876 #---------------------------------------------------------------------------
877 877
878 878 def run(self, task):
879 879 """
880 880 Run a task and return `Deferred` to its taskid.
881 881 """
882 882 task.taskid = self.taskid
883 883 task.start = time.localtime()
884 884 self.taskid += 1
885 885 d = defer.Deferred()
886 886 self.scheduler.add_task(task)
887 887 log.msg('Queuing task: %i' % task.taskid)
888 888
889 889 self.deferredResults[task.taskid] = []
890 890 self.distributeTasks()
891 891 return defer.succeed(task.taskid)
892 892
893 893 def get_task_result(self, taskid, block=False):
894 894 """
895 895 Returns a `Deferred` to the task result, or None.
896 896 """
897 897 log.msg("Getting task result: %i" % taskid)
898 898 if self.finishedResults.has_key(taskid):
899 899 tr = self.finishedResults[taskid]
900 900 return defer.succeed(tr)
901 901 elif self.deferredResults.has_key(taskid):
902 902 if block:
903 903 d = defer.Deferred()
904 904 self.deferredResults[taskid].append(d)
905 905 return d
906 906 else:
907 907 return defer.succeed(None)
908 908 else:
909 909 return defer.fail(IndexError("task ID not registered: %r" % taskid))
910 910
911 911 def abort(self, taskid):
912 912 """
913 913 Remove a task from the queue if it has not been run already.
914 914 """
915 915 if not isinstance(taskid, int):
916 916 return defer.fail(failure.Failure(TypeError("an integer task id expected: %r" % taskid)))
917 917 try:
918 918 self.scheduler.pop_task(taskid)
919 919 except IndexError, e:
920 920 if taskid in self.finishedResults.keys():
921 921 d = defer.fail(IndexError("Task Already Completed"))
922 922 elif taskid in self.abortPending:
923 923 d = defer.fail(IndexError("Task Already Aborted"))
924 924 elif taskid in self._pendingTaskIDs():# task is pending
925 925 self.abortPending.append(taskid)
926 926 d = defer.succeed(None)
927 927 else:
928 928 d = defer.fail(e)
929 929 else:
930 930 d = defer.execute(self._doAbort, taskid)
931 931
932 932 return d
933 933
934 934 def barrier(self, taskids):
935 935 dList = []
936 936 if isinstance(taskids, int):
937 937 taskids = [taskids]
938 938 for id in taskids:
939 939 d = self.get_task_result(id, block=True)
940 940 dList.append(d)
941 941 d = DeferredList(dList, consumeErrors=1)
942 942 d.addCallbacks(lambda r: None)
943 943 return d
944 944
945 945 def spin(self):
946 946 return defer.succeed(self.distributeTasks())
947 947
948 948 def queue_status(self, verbose=False):
949 949 pending = self._pendingTaskIDs()
950 950 failed = []
951 951 succeeded = []
952 952 for k,v in self.finishedResults.iteritems():
953 953 if not isinstance(v, failure.Failure):
954 954 if hasattr(v,'failure'):
955 955 if v.failure is None:
956 956 succeeded.append(k)
957 957 else:
958 958 failed.append(k)
959 959 scheduled = self.scheduler.taskids
960 960 if verbose:
961 961 result = dict(pending=pending, failed=failed,
962 962 succeeded=succeeded, scheduled=scheduled)
963 963 else:
964 964 result = dict(pending=len(pending),failed=len(failed),
965 965 succeeded=len(succeeded),scheduled=len(scheduled))
966 966 return defer.succeed(result)
967 967
968 968 #---------------------------------------------------------------------------
969 969 # Queue methods
970 970 #---------------------------------------------------------------------------
971 971
972 972 def _doAbort(self, taskid):
973 973 """
974 974 Helper function for aborting a pending task.
975 975 """
976 976 log.msg("Task aborted: %i" % taskid)
977 977 result = failure.Failure(error.TaskAborted())
978 978 self._finishTask(taskid, result)
979 979 if taskid in self.abortPending:
980 980 self.abortPending.remove(taskid)
981 981
982 982 def _finishTask(self, taskid, result):
983 983 dlist = self.deferredResults.pop(taskid)
984 984 # result.taskid = taskid # The TaskResult should save the taskid
985 985 self.finishedResults[taskid] = result
986 986 for d in dlist:
987 987 d.callback(result)
988 988
989 989 def distributeTasks(self):
990 990 """
991 991 Distribute tasks while self.scheduler has things to do.
992 992 """
993 993 log.msg("distributing Tasks")
994 994 worker, task = self.scheduler.schedule()
995 995 if not worker and not task:
996 996 if self.idleLater and self.idleLater.called:# we are inside failIdle
997 997 self.idleLater = None
998 998 else:
999 999 self.checkIdle()
1000 1000 return False
1001 1001 # else something to do:
1002 1002 while worker and task:
1003 1003 # get worker and task
1004 1004 # add to pending
1005 1005 self.pendingTasks[worker.workerid] = task
1006 1006 # run/link callbacks
1007 1007 d = worker.run(task)
1008 1008 log.msg("Running task %i on worker %i" %(task.taskid, worker.workerid))
1009 1009 d.addBoth(self.taskCompleted, task.taskid, worker.workerid)
1010 1010 worker, task = self.scheduler.schedule()
1011 1011 # check for idle timeout:
1012 1012 self.checkIdle()
1013 1013 return True
1014 1014
1015 1015 def checkIdle(self):
1016 1016 if self.idleLater and not self.idleLater.called:
1017 1017 self.idleLater.cancel()
1018 1018 if self.scheduler.ntasks and self.workers and \
1019 1019 self.scheduler.nworkers == len(self.workers):
1020 1020 self.idleLater = reactor.callLater(self.timeout, self.failIdle)
1021 1021 else:
1022 1022 self.idleLater = None
1023 1023
1024 1024 def failIdle(self):
1025 1025 if not self.distributeTasks():
1026 1026 while self.scheduler.ntasks:
1027 1027 t = self.scheduler.pop_task()
1028 1028 msg = "task %i failed to execute due to unmet dependencies"%t.taskid
1029 1029 msg += " for %i seconds"%self.timeout
1030 1030 log.msg("Task aborted by timeout: %i" % t.taskid)
1031 1031 f = failure.Failure(error.TaskTimeout(msg))
1032 1032 self._finishTask(t.taskid, f)
1033 1033 self.idleLater = None
1034 1034
1035 1035
1036 1036 def taskCompleted(self, success_and_result, taskid, workerid):
1037 1037 """This is the err/callback for a completed task."""
1038 1038 success, result = success_and_result
1039 1039 try:
1040 1040 task = self.pendingTasks.pop(workerid)
1041 1041 except:
1042 1042 # this should not happen
1043 1043 log.msg("Tried to pop bad pending task %i from worker %i"%(taskid, workerid))
1044 1044 log.msg("Result: %r"%result)
1045 1045 log.msg("Pending tasks: %s"%self.pendingTasks)
1046 1046 return
1047 1047
1048 1048 # Check if aborted while pending
1049 1049 aborted = False
1050 1050 if taskid in self.abortPending:
1051 1051 self._doAbort(taskid)
1052 1052 aborted = True
1053 1053
1054 1054 if not aborted:
1055 1055 if not success:
1056 1056 log.msg("Task %i failed on worker %i"% (taskid, workerid))
1057 1057 if task.retries > 0: # resubmit
1058 1058 task.retries -= 1
1059 1059 self.scheduler.add_task(task)
1060 1060 s = "Resubmitting task %i, %i retries remaining" %(taskid, task.retries)
1061 1061 log.msg(s)
1062 1062 self.distributeTasks()
1063 1063 elif isinstance(task.recovery_task, BaseTask) and \
1064 1064 task.recovery_task.retries > -1:
1065 1065 # retries = -1 is to prevent infinite recovery_task loop
1066 1066 task.retries = -1
1067 1067 task.recovery_task.taskid = taskid
1068 1068 task = task.recovery_task
1069 1069 self.scheduler.add_task(task)
1070 1070 s = "Recovering task %i, %i retries remaining" %(taskid, task.retries)
1071 1071 log.msg(s)
1072 1072 self.distributeTasks()
1073 1073 else: # done trying
1074 1074 self._finishTask(taskid, result)
1075 1075 # wait a second before readmitting a worker that failed
1076 1076 # it may have died, and not yet been unregistered
1077 1077 reactor.callLater(self.failurePenalty, self.readmitWorker, workerid)
1078 1078 else: # we succeeded
1079 1079 log.msg("Task completed: %i"% taskid)
1080 1080 self._finishTask(taskid, result)
1081 1081 self.readmitWorker(workerid)
1082 1082 else: # we aborted the task
1083 1083 if not success:
1084 1084 reactor.callLater(self.failurePenalty, self.readmitWorker, workerid)
1085 1085 else:
1086 1086 self.readmitWorker(workerid)
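# Editor's note on the failure path above: a failed task is retried while
# task.retries > 0; once retries are exhausted, a recovery_task (if set and
# its retries > -1) is swapped in under the same taskid, and retries is set
# to -1 on the failed task to prevent an infinite recovery loop.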
1087 1087
1088 1088 def readmitWorker(self, workerid):
1089 1089 """
1090 1090 Readmit a worker to the scheduler.
1091 1091
1092 1092 This is outside `taskCompleted` because of the `failurePenalty` being
1093 1093 implemented through `reactor.callLater`.
1094 1094 """
1095 1095
1096 1096 if workerid in self.workers.keys() and workerid not in self.pendingTasks.keys():
1097 1097 self.scheduler.add_worker(self.workers[workerid])
1098 1098 self.distributeTasks()
1099 1099
1100 1100 def clear(self):
1101 1101 """
1102 1102 Clear all previously run tasks from the task controller.
1103 1103
1104 1104 This is needed because the task controller keeps all task results
1105 1105 in memory. This can be a problem if there are many completed
1106 1106 tasks. Users should call this periodically to clean out these
1107 1107 cached task results.
1108 1108 """
1109 1109 self.finishedResults = {}
1110 1110 return defer.succeed(None)
1111 1111
1112 1112
1113 1113 components.registerAdapter(TaskController, cs.IControllerBase, ITaskController)
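# Editor's note: mirroring the IWorker adapter above, this registration lets
# ITaskController(controller) adapt a ControllerService (an IControllerBase
# provider) into the TaskController defined in this module.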