##// END OF EJS Templates
Initial refactor of task dependency system....
Brian Granger -
Show More
@@ -0,0 +1,53 b''
1 #!/usr/bin/env python
2 # encoding: utf-8
3
4 """
5 A new example showing how to use `TaskRejectError` to handle dependencies
6 in the IPython task system.
7
8 To run this example, do::
9
10 $ ipcluster local -n 4
11
12 Then, in another terminal start up IPython and do::
13
14 In [0]: %run taskreject.py
15
16 In [1]: mec.execute('run=True', targets=[0,1])
17
18 After the first command, the scheduler will keep rescheduling the tasks, as
19 they will fail with `TaskRejectError`. But after the second command, there
20 are two engines that the tasks can run on. The tasks are quickly funneled
21 to these engines.
22
23 If you want to see how the controller is scheduling and retrying the tasks
24 do a `tail -f` on the controller's log file in ~/.ipython/log.
25 """
26
27 #-----------------------------------------------------------------------------
28 # Copyright (C) 2008-2009 The IPython Development Team
29 #
30 # Distributed under the terms of the BSD License. The full license is in
31 # the file COPYING, distributed as part of this software.
32 #-----------------------------------------------------------------------------
33
34 from IPython.kernel import client
35 from IPython.kernel import TaskRejectError
36
37 mec = client.MultiEngineClient()
38 tc = client.TaskClient()
39
40 mec.execute('from IPython.kernel import TaskRejectError')
41 mec.execute('run = False')
42
43 def map_task():
44 if not run:
45 raise TaskRejectError('task dependency not met')
46 return 3.0e8
47
48 task_ids = []
49
50 for i in range(10):
51 task = client.MapTask(map_task, retries=20)
52 task_ids.append(tc.run(task, block=False))
53
@@ -21,3 +21,5 b' __docformat__ = "restructuredtext en"'
21 21 # Distributed under the terms of the BSD License. The full license is in
22 22 # the file COPYING, distributed as part of this software.
23 23 #-----------------------------------------------------------------------------
24
25 from IPython.kernel.error import TaskRejectError No newline at end of file
@@ -107,6 +107,20 b' class SecurityError(KernelError):'
107 107 class FileTimeoutError(KernelError):
108 108 pass
109 109
110 class TaskRejectError(KernelError):
111 """Exception to raise when a task should be rejected by an engine.
112
113 This exception can be used to allow a task running on an engine to test
114 if the engine (or the user's namespace on the engine) has the needed
115 task dependencies. If not, the task should raise this exception. For
116 the task to be retried on another engine, the task should be created
117 with the `retries` argument > 1.
118
119 The advantage of this approach over our older properties system is that
120 tasks have full access to the user's namespace on the engines and the
121 properties don't have to be managed or tested by the controller.
122 """
123
110 124 class CompositeError(KernelError):
111 125 def __init__(self, message, elist):
112 126 Exception.__init__(self, *(message, elist))
@@ -20,6 +20,7 b' import sys'
20 20 import cPickle as pickle
21 21 from types import FunctionType
22 22 import linecache
23 import warnings
23 24
24 25 from twisted.internet import reactor
25 26 from twisted.python import components, log
@@ -389,6 +390,14 b' def strip_whitespace(source):'
389 390 #-------------------------------------------------------------------------------
390 391
391 392
393 _prop_warn = """\
394
395 We are currently refactoring the task dependency system. This might
396 involve the removal of this method and other methods related to engine
397 properties. Please see the docstrings for IPython.kernel.TaskRejectError
398 for more information."""
399
400
392 401 class IFullBlockingMultiEngineClient(Interface):
393 402 pass
394 403
@@ -730,22 +739,27 b' class FullBlockingMultiEngineClient(InteractiveMultiEngineClient):'
730 739 return self._blockFromThread(self.smultiengine.queue_status, targets=targets, block=block)
731 740
732 741 def set_properties(self, properties, targets=None, block=None):
742 warnings.warn(_prop_warn)
733 743 targets, block = self._findTargetsAndBlock(targets, block)
734 744 return self._blockFromThread(self.smultiengine.set_properties, properties, targets=targets, block=block)
735 745
736 746 def get_properties(self, keys=None, targets=None, block=None):
747 warnings.warn(_prop_warn)
737 748 targets, block = self._findTargetsAndBlock(targets, block)
738 749 return self._blockFromThread(self.smultiengine.get_properties, keys, targets=targets, block=block)
739 750
740 751 def has_properties(self, keys, targets=None, block=None):
752 warnings.warn(_prop_warn)
741 753 targets, block = self._findTargetsAndBlock(targets, block)
742 754 return self._blockFromThread(self.smultiengine.has_properties, keys, targets=targets, block=block)
743 755
744 756 def del_properties(self, keys, targets=None, block=None):
757 warnings.warn(_prop_warn)
745 758 targets, block = self._findTargetsAndBlock(targets, block)
746 759 return self._blockFromThread(self.smultiengine.del_properties, keys, targets=targets, block=block)
747 760
748 761 def clear_properties(self, targets=None, block=None):
762 warnings.warn(_prop_warn)
749 763 targets, block = self._findTargetsAndBlock(targets, block)
750 764 return self._blockFromThread(self.smultiengine.clear_properties, targets=targets, block=block)
751 765
1 NO CONTENT: modified file
@@ -2,6 +2,10 b''
2 2 IPython/Vision Beam Pattern Demo
3 3 ==================================
4 4
5 .. note::
6
7 This page has not been updated to reflect the recent work on ipcluster.
8 This work makes it much easier to use IPython on a cluster.
5 9
6 10 Installing and testing IPython at OSC systems
7 11 =============================================
General Comments 0
You need to be logged in to leave comments. Login now