##// END OF EJS Templates
Initial refactor of task dependency system....
Brian Granger -
Show More
@@ -0,0 +1,53 b''
1 #!/usr/bin/env python
2 # encoding: utf-8
3
4 """
5 A new example showing how to use `TaskRejectError` to handle dependencies
6 in the IPython task system.
7
8 To run this example, do::
9
10 $ ipcluster local -n 4
11
12 Then, in another terminal start up IPython and do::
13
14 In [0]: %run taskreject.py
15
16 In [1]: mec.execute('run=True', targets=[0,1])
17
18 After the first command, the scheduler will keep rescheduling the tasks, as
19 they will fail with `TaskRejectError`. But after the second command, there
20 are two engines that the tasks can run on. The tasks are quickly funneled
21 to these engines.
22
23 If you want to see how the controller is scheduling and retrying the tasks
24 do a `tail -f` on the controller's log file in ~/.ipython/log.
25 """
26
27 #-----------------------------------------------------------------------------
28 # Copyright (C) 2008-2009 The IPython Development Team
29 #
30 # Distributed under the terms of the BSD License. The full license is in
31 # the file COPYING, distributed as part of this software.
32 #-----------------------------------------------------------------------------
33
34 from IPython.kernel import client
35 from IPython.kernel import TaskRejectError
36
37 mec = client.MultiEngineClient()
38 tc = client.TaskClient()
39
40 mec.execute('from IPython.kernel import TaskRejectError')
41 mec.execute('run = False')
42
43 def map_task():
44 if not run:
45 raise TaskRejectError('task dependency not met')
46 return 3.0e8
47
48 task_ids = []
49
50 for i in range(10):
51 task = client.MapTask(map_task, retries=20)
52 task_ids.append(tc.run(task, block=False))
53
@@ -21,3 +21,5 b' __docformat__ = "restructuredtext en"'
21 # Distributed under the terms of the BSD License. The full license is in
21 # Distributed under the terms of the BSD License. The full license is in
22 # the file COPYING, distributed as part of this software.
22 # the file COPYING, distributed as part of this software.
23 #-----------------------------------------------------------------------------
23 #-----------------------------------------------------------------------------
24
25 from IPython.kernel.error import TaskRejectError No newline at end of file
@@ -107,6 +107,20 b' class SecurityError(KernelError):'
107 class FileTimeoutError(KernelError):
107 class FileTimeoutError(KernelError):
108 pass
108 pass
109
109
110 class TaskRejectError(KernelError):
111 """Exception to raise when a task should be rejected by an engine.
112
113 This exception can be used to allow a task running on an engine to test
114 if the engine (or the user's namespace on the engine) has the needed
115 task dependencies. If not, the task should raise this exception. For
116 the task to be retried on another engine, the task should be created
117 with the `retries` argument > 1.
118
119 The advantage of this approach over our older properties system is that
120 tasks have full access to the user's namespace on the engines and the
121 properties don't have to be managed or tested by the controller.
122 """
123
110 class CompositeError(KernelError):
124 class CompositeError(KernelError):
111 def __init__(self, message, elist):
125 def __init__(self, message, elist):
112 Exception.__init__(self, *(message, elist))
126 Exception.__init__(self, *(message, elist))
@@ -20,6 +20,7 b' import sys'
20 import cPickle as pickle
20 import cPickle as pickle
21 from types import FunctionType
21 from types import FunctionType
22 import linecache
22 import linecache
23 import warnings
23
24
24 from twisted.internet import reactor
25 from twisted.internet import reactor
25 from twisted.python import components, log
26 from twisted.python import components, log
@@ -389,6 +390,14 b' def strip_whitespace(source):'
389 #-------------------------------------------------------------------------------
390 #-------------------------------------------------------------------------------
390
391
391
392
393 _prop_warn = """\
394
395 We are currently refactoring the task dependency system. This might
396 involve the removal of this method and other methods related to engine
397 properties. Please see the docstrings for IPython.kernel.TaskRejectError
398 for more information."""
399
400
392 class IFullBlockingMultiEngineClient(Interface):
401 class IFullBlockingMultiEngineClient(Interface):
393 pass
402 pass
394
403
@@ -730,22 +739,27 b' class FullBlockingMultiEngineClient(InteractiveMultiEngineClient):'
730 return self._blockFromThread(self.smultiengine.queue_status, targets=targets, block=block)
739 return self._blockFromThread(self.smultiengine.queue_status, targets=targets, block=block)
731
740
732 def set_properties(self, properties, targets=None, block=None):
741 def set_properties(self, properties, targets=None, block=None):
742 warnings.warn(_prop_warn)
733 targets, block = self._findTargetsAndBlock(targets, block)
743 targets, block = self._findTargetsAndBlock(targets, block)
734 return self._blockFromThread(self.smultiengine.set_properties, properties, targets=targets, block=block)
744 return self._blockFromThread(self.smultiengine.set_properties, properties, targets=targets, block=block)
735
745
736 def get_properties(self, keys=None, targets=None, block=None):
746 def get_properties(self, keys=None, targets=None, block=None):
747 warnings.warn(_prop_warn)
737 targets, block = self._findTargetsAndBlock(targets, block)
748 targets, block = self._findTargetsAndBlock(targets, block)
738 return self._blockFromThread(self.smultiengine.get_properties, keys, targets=targets, block=block)
749 return self._blockFromThread(self.smultiengine.get_properties, keys, targets=targets, block=block)
739
750
740 def has_properties(self, keys, targets=None, block=None):
751 def has_properties(self, keys, targets=None, block=None):
752 warnings.warn(_prop_warn)
741 targets, block = self._findTargetsAndBlock(targets, block)
753 targets, block = self._findTargetsAndBlock(targets, block)
742 return self._blockFromThread(self.smultiengine.has_properties, keys, targets=targets, block=block)
754 return self._blockFromThread(self.smultiengine.has_properties, keys, targets=targets, block=block)
743
755
744 def del_properties(self, keys, targets=None, block=None):
756 def del_properties(self, keys, targets=None, block=None):
757 warnings.warn(_prop_warn)
745 targets, block = self._findTargetsAndBlock(targets, block)
758 targets, block = self._findTargetsAndBlock(targets, block)
746 return self._blockFromThread(self.smultiengine.del_properties, keys, targets=targets, block=block)
759 return self._blockFromThread(self.smultiengine.del_properties, keys, targets=targets, block=block)
747
760
748 def clear_properties(self, targets=None, block=None):
761 def clear_properties(self, targets=None, block=None):
762 warnings.warn(_prop_warn)
749 targets, block = self._findTargetsAndBlock(targets, block)
763 targets, block = self._findTargetsAndBlock(targets, block)
750 return self._blockFromThread(self.smultiengine.clear_properties, targets=targets, block=block)
764 return self._blockFromThread(self.smultiengine.clear_properties, targets=targets, block=block)
751
765
1 NO CONTENT: modified file
NO CONTENT: modified file
@@ -2,6 +2,10 b''
2 IPython/Vision Beam Pattern Demo
2 IPython/Vision Beam Pattern Demo
3 ==================================
3 ==================================
4
4
5 .. note::
6
7 This page has not been updated to reflect the recent work on ipcluster.
8 This work makes it much easier to use IPython on a cluster.
5
9
6 Installing and testing IPython at OSC systems
10 Installing and testing IPython at OSC systems
7 =============================================
11 =============================================
General Comments 0
You need to be logged in to leave comments. Login now