##// END OF EJS Templates
The refactoring of the Task system is nearly complete. Now there are...
Brian E Granger -
Show More
@@ -0,0 +1,18 b''
1 from IPython.kernel import client
2
3 mec = client.MultiEngineClient()
4
5 result = mec.map(lambda x: 2*x, range(10))
6 print "Simple, default map: ", result
7
8 m = mec.mapper(block=False)
9 pr = m.map(lambda x: 2*x, range(10))
10 print "Submitted map, got PendingResult: ", pr
11 result = pr.r
12 print "Using a mapper: ", result
13
14 @mec.parallel()
15 def f(x): return 2*x
16
17 result = f(range(10))
18 print "Using a parallel function: ", result No newline at end of file
@@ -0,0 +1,19 b''
1 from IPython.kernel import client
2
3 tc = client.TaskClient()
4
5 result = tc.map(lambda x: 2*x, range(10))
6 print "Simple, default map: ", result
7
8 m = tc.mapper(block=False, clear_after=True, clear_before=True)
9 tids = m.map(lambda x: 2*x, range(10))
10 print "Submitted tasks, got ids: ", tids
11 tc.barrier(tids)
12 result = [tc.get_task_result(tid) for tid in tids]
13 print "Using a mapper: ", result
14
15 @tc.parallel()
16 def f(x): return 2*x
17
18 result = f(range(10))
19 print "Using a parallel function: ", result No newline at end of file
@@ -1,41 +1,41 b''
1 1 # encoding: utf-8
2 2
3 3 """Asynchronous clients for the IPython controller.
4 4
5 5 This module has clients for using the various interfaces of the controller
6 6 in a fully asynchronous manner. This means that you will need to run the
7 7 Twisted reactor yourself and that all methods of the client classes return
8 8 deferreds to the result.
9 9
10 10 The main methods are `get_*_client` and `get_client`.
11 11 """
12 12
13 13 __docformat__ = "restructuredtext en"
14 14
15 15 #-------------------------------------------------------------------------------
16 16 # Copyright (C) 2008 The IPython Development Team
17 17 #
18 18 # Distributed under the terms of the BSD License. The full license is in
19 19 # the file COPYING, distributed as part of this software.
20 20 #-------------------------------------------------------------------------------
21 21
22 22 #-------------------------------------------------------------------------------
23 23 # Imports
24 24 #-------------------------------------------------------------------------------
25 25
26 26 from IPython.kernel import codeutil
27 27 from IPython.kernel.clientconnector import ClientConnector
28 28
29 29 # Other things that the user will need
30 from IPython.kernel.task import Task
30 from IPython.kernel.task import MapTask, StringTask
31 31 from IPython.kernel.error import CompositeError
32 32
33 33 #-------------------------------------------------------------------------------
34 34 # Code
35 35 #-------------------------------------------------------------------------------
36 36
37 37 _client_tub = ClientConnector()
38 38 get_multiengine_client = _client_tub.get_multiengine_client
39 39 get_task_client = _client_tub.get_task_client
40 40 get_client = _client_tub.get_client
41 41
@@ -1,96 +1,96 b''
1 1 # encoding: utf-8
2 2
3 3 """This module contains blocking clients for the controller interfaces.
4 4
5 5 Unlike the clients in `asyncclient.py`, the clients in this module are fully
6 6 blocking. This means that methods on the clients return the actual results
7 7 rather than a deferred to the result. Also, we manage the Twisted reactor
8 8 for you. This is done by running the reactor in a thread.
9 9
10 10 The main classes in this module are:
11 11
12 12 * MultiEngineClient
13 13 * TaskClient
14 14 * Task
15 15 * CompositeError
16 16 """
17 17
18 18 __docformat__ = "restructuredtext en"
19 19
20 20 #-------------------------------------------------------------------------------
21 21 # Copyright (C) 2008 The IPython Development Team
22 22 #
23 23 # Distributed under the terms of the BSD License. The full license is in
24 24 # the file COPYING, distributed as part of this software.
25 25 #-------------------------------------------------------------------------------
26 26
27 27 #-------------------------------------------------------------------------------
28 28 # Imports
29 29 #-------------------------------------------------------------------------------
30 30
31 31 import sys
32 32
33 33 # from IPython.tools import growl
34 34 # growl.start("IPython1 Client")
35 35
36 36
37 37 from twisted.internet import reactor
38 38 from IPython.kernel.clientconnector import ClientConnector
39 39 from IPython.kernel.twistedutil import ReactorInThread
40 40 from IPython.kernel.twistedutil import blockingCallFromThread
41 41
42 42 # These enable various things
43 43 from IPython.kernel import codeutil
44 44 import IPython.kernel.magic
45 45
46 46 # Other things that the user will need
47 from IPython.kernel.task import Task
47 from IPython.kernel.task import MapTask, StringTask
48 48 from IPython.kernel.error import CompositeError
49 49
50 50 #-------------------------------------------------------------------------------
51 51 # Code
52 52 #-------------------------------------------------------------------------------
53 53
54 54 _client_tub = ClientConnector()
55 55
56 56
57 57 def get_multiengine_client(furl_or_file=''):
58 58 """Get the blocking MultiEngine client.
59 59
60 60 :Parameters:
61 61 furl_or_file : str
62 62 A furl or a filename containing a furl. If empty, the
63 63 default furl_file will be used
64 64
65 65 :Returns:
66 66 The connected MultiEngineClient instance
67 67 """
68 68 client = blockingCallFromThread(_client_tub.get_multiengine_client,
69 69 furl_or_file)
70 70 return client.adapt_to_blocking_client()
71 71
72 72 def get_task_client(furl_or_file=''):
73 73 """Get the blocking Task client.
74 74
75 75 :Parameters:
76 76 furl_or_file : str
77 77 A furl or a filename containing a furl. If empty, the
78 78 default furl_file will be used
79 79
80 80 :Returns:
81 81 The connected TaskClient instance
82 82 """
83 83 client = blockingCallFromThread(_client_tub.get_task_client,
84 84 furl_or_file)
85 85 return client.adapt_to_blocking_client()
86 86
87 87
88 88 MultiEngineClient = get_multiengine_client
89 89 TaskClient = get_task_client
90 90
91 91
92 92
93 93 # Now we start the reactor in a thread
94 94 rit = ReactorInThread()
95 95 rit.setDaemon(True)
96 96 rit.start() No newline at end of file
@@ -1,171 +1,171 b''
1 1 # encoding: utf-8
2 2
3 3 """Magic command interface for interactive parallel work."""
4 4
5 5 __docformat__ = "restructuredtext en"
6 6
7 7 #-------------------------------------------------------------------------------
8 8 # Copyright (C) 2008 The IPython Development Team
9 9 #
10 10 # Distributed under the terms of the BSD License. The full license is in
11 11 # the file COPYING, distributed as part of this software.
12 12 #-------------------------------------------------------------------------------
13 13
14 14 #-------------------------------------------------------------------------------
15 15 # Imports
16 16 #-------------------------------------------------------------------------------
17 17
18 18 import new
19 19
20 20 from IPython.iplib import InteractiveShell
21 21 from IPython.Shell import MTInteractiveShell
22 22
23 23 from twisted.internet.defer import Deferred
24 24
25 25
26 26 #-------------------------------------------------------------------------------
27 27 # Definitions of magic functions for use with IPython
28 28 #-------------------------------------------------------------------------------
29 29
30 30 NO_ACTIVE_CONTROLLER = """
31 31 Error: No Controller is activated
32 32 Use activate() on a RemoteController object to activate it for magics.
33 33 """
34 34
35 35 def magic_result(self,parameter_s=''):
36 36 """Print the result of command i on all engines of the active controller.
37 37
38 38 To activate a controller in IPython, first create it and then call
39 39 the activate() method.
40 40
41 41 Then you can do the following:
42 42
43 43 >>> result # Print the latest result
44 44 Printing result...
45 45 [127.0.0.1:0] In [1]: b = 10
46 46 [127.0.0.1:1] In [1]: b = 10
47 47
48 48 >>> result 0 # Print result 0
49 49 In [14]: result 0
50 50 Printing result...
51 51 [127.0.0.1:0] In [0]: a = 5
52 52 [127.0.0.1:1] In [0]: a = 5
53 53 """
54 54 try:
55 55 activeController = __IPYTHON__.activeController
56 56 except AttributeError:
57 57 print NO_ACTIVE_CONTROLLER
58 58 else:
59 59 try:
60 60 index = int(parameter_s)
61 61 except:
62 62 index = None
63 63 result = activeController.get_result(index)
64 64 return result
65 65
66 66 def magic_px(self,parameter_s=''):
67 67 """Executes the given python command on the active IPython Controller.
68 68
69 69 To activate a Controller in IPython, first create it and then call
70 70 the activate() method.
71 71
72 72 Then you can do the following:
73 73
74 74 >>> %px a = 5 # Runs a = 5 on all nodes
75 75 """
76 76
77 77 try:
78 78 activeController = __IPYTHON__.activeController
79 79 except AttributeError:
80 80 print NO_ACTIVE_CONTROLLER
81 81 else:
82 print "Executing command on Controller"
82 print "Parallel execution on engines: %s" % activeController.targets
83 83 result = activeController.execute(parameter_s)
84 84 return result
85 85
86 86 def pxrunsource(self, source, filename="<input>", symbol="single"):
87 87
88 88 try:
89 89 code = self.compile(source, filename, symbol)
90 90 except (OverflowError, SyntaxError, ValueError):
91 91 # Case 1
92 92 self.showsyntaxerror(filename)
93 93 return None
94 94
95 95 if code is None:
96 96 # Case 2
97 97 return True
98 98
99 99 # Case 3
100 100 # Because autopx is enabled, we now call executeAll or disable autopx if
101 101 # %autopx or autopx has been called
102 102 if '_ip.magic("%autopx' in source or '_ip.magic("autopx' in source:
103 103 _disable_autopx(self)
104 104 return False
105 105 else:
106 106 try:
107 107 result = self.activeController.execute(source)
108 108 except:
109 109 self.showtraceback()
110 110 else:
111 111 print result.__repr__()
112 112 return False
113 113
114 114 def magic_autopx(self, parameter_s=''):
115 115 """Toggles auto parallel mode for the active IPython Controller.
116 116
117 117 To activate a Controller in IPython, first create it and then call
118 118 the activate() method.
119 119
120 120 Then you can do the following:
121 121
122 122 >>> %autopx # Now all commands are executed in parallel
123 123 Auto Parallel Enabled
124 124 Type %autopx to disable
125 125 ...
126 126 >>> %autopx # Now all commands are locally executed
127 127 Auto Parallel Disabled
128 128 """
129 129
130 130 if hasattr(self, 'autopx'):
131 131 if self.autopx == True:
132 132 _disable_autopx(self)
133 133 else:
134 134 _enable_autopx(self)
135 135 else:
136 136 _enable_autopx(self)
137 137
138 138 def _enable_autopx(self):
139 139 """Enable %autopx mode by saving the original runsource and installing
140 140 pxrunsource.
141 141 """
142 142 try:
143 143 activeController = __IPYTHON__.activeController
144 144 except AttributeError:
145 145 print "No active RemoteController found, use RemoteController.activate()."
146 146 else:
147 147 self._original_runsource = self.runsource
148 148 self.runsource = new.instancemethod(pxrunsource, self, self.__class__)
149 149 self.autopx = True
150 150 print "Auto Parallel Enabled\nType %autopx to disable"
151 151
152 152 def _disable_autopx(self):
153 153 """Disable %autopx by restoring the original runsource."""
154 154 if hasattr(self, 'autopx'):
155 155 if self.autopx == True:
156 156 self.runsource = self._original_runsource
157 157 self.autopx = False
158 158 print "Auto Parallel Disabled"
159 159
160 160 # Add the new magic function to the class dict:
161 161
162 162 InteractiveShell.magic_result = magic_result
163 163 InteractiveShell.magic_px = magic_px
164 164 InteractiveShell.magic_autopx = magic_autopx
165 165
166 166 # And remove the global name to keep global namespace clean. Don't worry, the
167 167 # copy bound to IPython stays, we're just removing the global name.
168 168 del magic_result
169 169 del magic_px
170 170 del magic_autopx
171 171
@@ -1,42 +1,233 b''
1 1 # encoding: utf-8
2 2
3 3 """A parallelized version of Python's builtin map."""
4 4
5 5 __docformat__ = "restructuredtext en"
6 6
7 #-------------------------------------------------------------------------------
7 #----------------------------------------------------------------------------
8 8 # Copyright (C) 2008 The IPython Development Team
9 9 #
10 10 # Distributed under the terms of the BSD License. The full license is in
11 11 # the file COPYING, distributed as part of this software.
12 #-------------------------------------------------------------------------------
12 #----------------------------------------------------------------------------
13 13
14 #-------------------------------------------------------------------------------
14 #----------------------------------------------------------------------------
15 15 # Imports
16 #-------------------------------------------------------------------------------
16 #----------------------------------------------------------------------------
17 17
18 18 from types import FunctionType
19 19 from zope.interface import Interface, implements
20 from IPython.kernel.task import MapTask
21 from IPython.kernel.twistedutil import DeferredList, gatherBoth
22 from IPython.kernel.util import printer
23 from IPython.kernel.error import collect_exceptions
24
25 #----------------------------------------------------------------------------
26 # Code
27 #----------------------------------------------------------------------------
20 28
21 29 class IMapper(Interface):
30 """The basic interface for a Mapper.
31
32 This defines a generic interface for mapping. The idea of this is
33 similar to that of Python's builtin `map` function, which applies a function
34 elementwise to a sequence.
35 """
36
37 def map(func, *seqs):
38 """Do map in parallel.
39
40 Equivalent to map(func, *seqs) or:
41
42 [func(seqs[0][0], seqs[1][0],...), func(seqs[0][1], seqs[1][1],...),...]
43
44 :Parameters:
45 func : FunctionType
46 The function to apply to the sequence
47 sequences : tuple of iterables
48 A sequence of iterables that are used for successive function
49 arguments. This works just like map
50 """
51
52 class IMultiEngineMapperFactory(Interface):
53 """
54 An interface for something that creates `IMapper` instances.
55 """
56
57 def mapper(dist='b', targets='all', block=True):
58 """
59 Create an `IMapper` implementer with a given set of arguments.
60
61 The `IMapper` created using a multiengine controller is
62 not load balanced.
63 """
64
65 class ITaskMapperFactory(Interface):
66 """
67 An interface for something that creates `IMapper` instances.
68 """
22 69
23 def __call__(func, *sequences):
24 """Do map in parallel."""
70 def mapper(clear_before=False, clear_after=False, retries=0,
71 recovery_task=None, depend=None, block=True):
72 """
73 Create an `IMapper` implementer with a given set of arguments.
74
75 The `IMapper` created using a task controller is load balanced.
76
77 See the documentation for `IPython.kernel.task.BaseTask` for
78 documentation on the arguments to this method.
79 """
25 80
26 class Mapper(object):
81
82 class MultiEngineMapper(object):
83 """
84 A Mapper for `IMultiEngine` implementers.
85 """
27 86
28 87 implements(IMapper)
29 88
30 89 def __init__(self, multiengine, dist='b', targets='all', block=True):
90 """
91 Create a Mapper for a multiengine.
92
93 The value of all arguments are used for all calls to `map`. This
94 class allows these arguments to be set for a series of map calls.
95
96 :Parameters:
97 multiengine : `IMultiEngine` implementer
98 The multiengine to use for running the map commands
99 dist : str
100 The type of decomposition to use. Only block ('b') is
101 supported currently
102 targets : (str, int, tuple of ints)
103 The engines to use in the map
104 block : boolean
105 Whether to block when the map is applied
106 """
31 107 self.multiengine = multiengine
32 108 self.dist = dist
33 109 self.targets = targets
34 110 self.block = block
35
36 def __call__(self, func, *sequences):
37 return self.map(func, *sequences)
38 111
39 112 def map(self, func, *sequences):
113 """
114 Apply func to *sequences elementwise. Like Python's builtin map.
115
116 This version is not load balanced.
117 """
118 max_len = max(len(s) for s in sequences)
119 for s in sequences:
120 if len(s)!=max_len:
121 raise ValueError('all sequences must have equal length')
40 122 assert isinstance(func, (str, FunctionType)), "func must be a fuction or str"
41 return self.multiengine._map(func, sequences, dist=self.dist,
42 targets=self.targets, block=self.block) No newline at end of file
123 return self.multiengine.raw_map(func, sequences, dist=self.dist,
124 targets=self.targets, block=self.block)
125
126 class TaskMapper(object):
127 """
128 Make an `ITaskController` look like an `IMapper`.
129
130 This class provides a load balanced version of `map`.
131 """
132
133 def __init__(self, task_controller, clear_before=False, clear_after=False, retries=0,
134 recovery_task=None, depend=None, block=True):
135 """
136 Create a `IMapper` given a `TaskController` and arguments.
137
138 The additional arguments are those that are common to all types of
139 tasks and are described in the documentation for
140 `IPython.kernel.task.BaseTask`.
141
142 :Parameters:
143 task_controller : an `IBlockingTaskClient` implementer
144 The `TaskController` to use for calls to `map`
145 """
146 self.task_controller = task_controller
147 self.clear_before = clear_before
148 self.clear_after = clear_after
149 self.retries = retries
150 self.recovery_task = recovery_task
151 self.depend = depend
152 self.block = block
153
154 def map(self, func, *sequences):
155 """
156 Apply func to *sequences elementwise. Like Python's builtin map.
157
158 This version is load balanced.
159 """
160 max_len = max(len(s) for s in sequences)
161 for s in sequences:
162 if len(s)!=max_len:
163 raise ValueError('all sequences must have equal length')
164 task_args = zip(*sequences)
165 task_ids = []
166 dlist = []
167 for ta in task_args:
168 task = MapTask(func, ta, clear_before=self.clear_before,
169 clear_after=self.clear_after, retries=self.retries,
170 recovery_task=self.recovery_task, depend=self.depend)
171 dlist.append(self.task_controller.run(task))
172 dlist = gatherBoth(dlist, consumeErrors=1)
173 dlist.addCallback(collect_exceptions,'map')
174 if self.block:
175 def get_results(task_ids):
176 d = self.task_controller.barrier(task_ids)
177 d.addCallback(lambda _: gatherBoth([self.task_controller.get_task_result(tid) for tid in task_ids], consumeErrors=1))
178 d.addCallback(collect_exceptions, 'map')
179 return d
180 dlist.addCallback(get_results)
181 return dlist
182
183 class SynchronousTaskMapper(object):
184 """
185 Make an `IBlockingTaskClient` look like an `IMapper`.
186
187 This class provides a load balanced version of `map`.
188 """
189
190 def __init__(self, task_controller, clear_before=False, clear_after=False, retries=0,
191 recovery_task=None, depend=None, block=True):
192 """
193 Create a `IMapper` given a `IBlockingTaskClient` and arguments.
194
195 The additional arguments are those that are common to all types of
196 tasks and are described in the documentation for
197 `IPython.kernel.task.BaseTask`.
198
199 :Parameters:
200 task_controller : an `IBlockingTaskClient` implementer
201 The `TaskController` to use for calls to `map`
202 """
203 self.task_controller = task_controller
204 self.clear_before = clear_before
205 self.clear_after = clear_after
206 self.retries = retries
207 self.recovery_task = recovery_task
208 self.depend = depend
209 self.block = block
210
211 def map(self, func, *sequences):
212 """
213 Apply func to *sequences elementwise. Like Python's builtin map.
214
215 This version is load balanced.
216 """
217 max_len = max(len(s) for s in sequences)
218 for s in sequences:
219 if len(s)!=max_len:
220 raise ValueError('all sequences must have equal length')
221 task_args = zip(*sequences)
222 task_ids = []
223 for ta in task_args:
224 task = MapTask(func, ta, clear_before=self.clear_before,
225 clear_after=self.clear_after, retries=self.retries,
226 recovery_task=self.recovery_task, depend=self.depend)
227 task_ids.append(self.task_controller.run(task))
228 if self.block:
229 self.task_controller.barrier(task_ids)
230 task_results = [self.task_controller.get_task_result(tid) for tid in task_ids]
231 return task_results
232 else:
233 return task_ids No newline at end of file
@@ -1,743 +1,753 b''
1 1 # encoding: utf-8
2 2 # -*- test-case-name: IPython.kernel.test.test_multiengine -*-
3 3
4 4 """Adapt the IPython ControllerServer to IMultiEngine.
5 5
6 6 This module provides classes that adapt a ControllerService to the
7 7 IMultiEngine interface. This interface is a basic interactive interface
8 8 for working with a set of engines where it is desired to have explicit
9 9 access to each registered engine.
10 10
11 11 The classes here are exposed to the network in files like:
12 12
13 13 * multienginevanilla.py
14 14 * multienginepb.py
15 15 """
16 16
17 17 __docformat__ = "restructuredtext en"
18 18
19 19 #-------------------------------------------------------------------------------
20 20 # Copyright (C) 2008 The IPython Development Team
21 21 #
22 22 # Distributed under the terms of the BSD License. The full license is in
23 23 # the file COPYING, distributed as part of this software.
24 24 #-------------------------------------------------------------------------------
25 25
26 26 #-------------------------------------------------------------------------------
27 27 # Imports
28 28 #-------------------------------------------------------------------------------
29 29
30 30 from new import instancemethod
31 31 from types import FunctionType
32 32
33 33 from twisted.application import service
34 34 from twisted.internet import defer, reactor
35 35 from twisted.python import log, components, failure
36 36 from zope.interface import Interface, implements, Attribute
37 37
38 38 from IPython.tools import growl
39 39 from IPython.kernel.util import printer
40 40 from IPython.kernel.twistedutil import gatherBoth
41 41 from IPython.kernel import map as Map
42 42 from IPython.kernel import error
43 43 from IPython.kernel.pendingdeferred import PendingDeferredManager, two_phase
44 44 from IPython.kernel.controllerservice import \
45 45 ControllerAdapterBase, \
46 46 ControllerService, \
47 47 IControllerBase
48 48
49 49
50 50 #-------------------------------------------------------------------------------
51 51 # Interfaces for the MultiEngine representation of a controller
52 52 #-------------------------------------------------------------------------------
53 53
54 54 class IEngineMultiplexer(Interface):
55 55 """Interface to multiple engines implementing IEngineCore/Serialized/Queued.
56 56
57 57 This class simply acts as a multiplexer of methods that are in the
58 58 various IEngines* interfaces. Thus the methods here are just like those
59 59 in the IEngine* interfaces, but with an extra first argument, targets.
60 60 The targets argument can have the following forms:
61 61
62 62 * targets = 10 # Engines are indexed by ints
63 63 * targets = [0,1,2,3] # A list of ints
64 64 * targets = 'all' # A string to indicate all targets
65 65
66 66 If targets is bad in any way, an InvalidEngineID will be raised. This
67 67 includes engines not being registered.
68 68
69 69 All IEngineMultiplexer multiplexer methods must return a Deferred to a list
70 70 with length equal to the number of targets. The elements of the list will
71 71 correspond to the return of the corresponding IEngine method.
72 72
73 73 Failures are aggressive, meaning that if an action fails for any target,
74 74 the overall action will fail immediately with that Failure.
75 75
76 76 :Parameters:
77 77 targets : int, list of ints, or 'all'
78 78 Engine ids the action will apply to.
79 79
80 80 :Returns: Deferred to a list of results for each engine.
81 81
82 82 :Exception:
83 83 InvalidEngineID
84 84 If the targets argument is bad or engines aren't registered.
85 85 NoEnginesRegistered
86 86 If there are no engines registered and targets='all'
87 87 """
88 88
89 89 #---------------------------------------------------------------------------
90 90 # Mutiplexed methods
91 91 #---------------------------------------------------------------------------
92 92
93 93 def execute(lines, targets='all'):
94 94 """Execute lines of Python code on targets.
95 95
96 96 See the class docstring for information about targets and possible
97 97 exceptions this method can raise.
98 98
99 99 :Parameters:
100 100 lines : str
101 101 String of python code to be executed on targets.
102 102 """
103 103
104 104 def push(namespace, targets='all'):
105 105 """Push dict namespace into the user's namespace on targets.
106 106
107 107 See the class docstring for information about targets and possible
108 108 exceptions this method can raise.
109 109
110 110 :Parameters:
111 111 namespace : dict
112 112 Dict of key value pairs to be put into the users namespace.
113 113 """
114 114
115 115 def pull(keys, targets='all'):
116 116 """Pull values out of the user's namespace on targets by keys.
117 117
118 118 See the class docstring for information about targets and possible
119 119 exceptions this method can raise.
120 120
121 121 :Parameters:
122 122 keys : tuple of strings
123 123 Sequence of keys to be pulled from user's namespace.
124 124 """
125 125
126 126 def push_function(namespace, targets='all'):
127 127 """"""
128 128
129 129 def pull_function(keys, targets='all'):
130 130 """"""
131 131
132 132 def get_result(i=None, targets='all'):
133 133 """Get the result for command i from targets.
134 134
135 135 See the class docstring for information about targets and possible
136 136 exceptions this method can raise.
137 137
138 138 :Parameters:
139 139 i : int or None
140 140 Command index or None to indicate most recent command.
141 141 """
142 142
143 143 def reset(targets='all'):
144 144 """Reset targets.
145 145
146 146 This clears the users namespace of the Engines, but won't cause
147 147 modules to be reloaded.
148 148 """
149 149
150 150 def keys(targets='all'):
151 151 """Get variable names defined in user's namespace on targets."""
152 152
153 153 def kill(controller=False, targets='all'):
154 154 """Kill the targets Engines and possibly the controller.
155 155
156 156 :Parameters:
157 157 controller : boolean
158 158 Should the controller be killed as well. If so all the
159 159 engines will be killed first no matter what targets is.
160 160 """
161 161
162 162 def push_serialized(namespace, targets='all'):
163 163 """Push a namespace of Serialized objects to targets.
164 164
165 165 :Parameters:
166 166 namespace : dict
167 167 A dict whose keys are the variable names and whose values
168 168 are serialized version of the objects.
169 169 """
170 170
171 171 def pull_serialized(keys, targets='all'):
172 172 """Pull Serialized objects by keys from targets.
173 173
174 174 :Parameters:
175 175 keys : tuple of strings
176 176 Sequence of variable names to pull as serialized objects.
177 177 """
178 178
179 179 def clear_queue(targets='all'):
180 180 """Clear the queue of pending command for targets."""
181 181
182 182 def queue_status(targets='all'):
183 183 """Get the status of the queue on the targets."""
184 184
185 185 def set_properties(properties, targets='all'):
186 186 """set properties by key and value"""
187 187
188 188 def get_properties(keys=None, targets='all'):
189 189 """get a list of properties by `keys`, if no keys specified, get all"""
190 190
191 191 def del_properties(keys, targets='all'):
192 192 """delete properties by `keys`"""
193 193
194 194 def has_properties(keys, targets='all'):
195 195 """get a list of bool values for whether `properties` has `keys`"""
196 196
197 197 def clear_properties(targets='all'):
198 198 """clear the properties dict"""
199 199
200 200
201 201 class IMultiEngine(IEngineMultiplexer):
202 202 """A controller that exposes an explicit interface to all of its engines.
203 203
204 204 This is the primary interface for interactive usage.
205 205 """
206 206
207 207 def get_ids():
208 208 """Return list of currently registered ids.
209 209
210 210 :Returns: A Deferred to a list of registered engine ids.
211 211 """
212 212
213 213
214 214
215 215 #-------------------------------------------------------------------------------
216 216 # Implementation of the core MultiEngine classes
217 217 #-------------------------------------------------------------------------------
218 218
219 219 class MultiEngine(ControllerAdapterBase):
220 220 """The representation of a ControllerService as a IMultiEngine.
221 221
222 222 Although it is not implemented currently, this class would be where a
223 223 client/notification API is implemented. It could inherit from something
224 224 like results.NotifierParent and then use the notify method to send
225 225 notifications.
226 226 """
227 227
228 228 implements(IMultiEngine)
229 229
230 230 def __init(self, controller):
231 231 ControllerAdapterBase.__init__(self, controller)
232 232
233 233 #---------------------------------------------------------------------------
234 234 # Helper methods
235 235 #---------------------------------------------------------------------------
236 236
237 237 def engineList(self, targets):
238 238 """Parse the targets argument into a list of valid engine objects.
239 239
240 240 :Parameters:
241 241 targets : int, list of ints or 'all'
242 242 The targets argument to be parsed.
243 243
244 244 :Returns: List of engine objects.
245 245
246 246 :Exception:
247 247 InvalidEngineID
248 248 If targets is not valid or if an engine is not registered.
249 249 """
250 250 if isinstance(targets, int):
251 251 if targets not in self.engines.keys():
252 252 log.msg("Engine with id %i is not registered" % targets)
253 253 raise error.InvalidEngineID("Engine with id %i is not registered" % targets)
254 254 else:
255 255 return [self.engines[targets]]
256 256 elif isinstance(targets, (list, tuple)):
257 257 for id in targets:
258 258 if id not in self.engines.keys():
259 259 log.msg("Engine with id %r is not registered" % id)
260 260 raise error.InvalidEngineID("Engine with id %r is not registered" % id)
261 261 return map(self.engines.get, targets)
262 262 elif targets == 'all':
263 263 eList = self.engines.values()
264 264 if len(eList) == 0:
265 265 msg = """There are no engines registered.
266 266 Check the logs in ~/.ipython/log if you think there should have been."""
267 267 raise error.NoEnginesRegistered(msg)
268 268 else:
269 269 return eList
270 270 else:
271 271 raise error.InvalidEngineID("targets argument is not an int, list of ints or 'all': %r"%targets)
272 272
273 273 def _performOnEngines(self, methodName, *args, **kwargs):
274 274 """Calls a method on engines and returns deferred to list of results.
275 275
276 276 :Parameters:
277 277 methodName : str
278 278 Name of the method to be called.
279 279 targets : int, list of ints, 'all'
280 280 The targets argument to be parsed into a list of engine objects.
281 281 args
282 282 The positional keyword arguments to be passed to the engines.
283 283 kwargs
284 284 The keyword arguments passed to the method
285 285
286 286 :Returns: List of deferreds to the results on each engine
287 287
288 288 :Exception:
289 289 InvalidEngineID
290 290 If the targets argument is bad in any way.
291 291 AttributeError
292 292 If the method doesn't exist on one of the engines.
293 293 """
294 294 targets = kwargs.pop('targets')
295 295 log.msg("Performing %s on %r" % (methodName, targets))
296 296 # log.msg("Performing %s(%r, %r) on %r" % (methodName, args, kwargs, targets))
297 297 # This will and should raise if targets is not valid!
298 298 engines = self.engineList(targets)
299 299 dList = []
300 300 for e in engines:
301 301 meth = getattr(e, methodName, None)
302 302 if meth is not None:
303 303 dList.append(meth(*args, **kwargs))
304 304 else:
305 305 raise AttributeError("Engine %i does not have method %s" % (e.id, methodName))
306 306 return dList
307 307
308 308 def _performOnEnginesAndGatherBoth(self, methodName, *args, **kwargs):
309 309 """Called _performOnEngines and wraps result/exception into deferred."""
310 310 try:
311 311 dList = self._performOnEngines(methodName, *args, **kwargs)
312 312 except (error.InvalidEngineID, AttributeError, KeyError, error.NoEnginesRegistered):
313 313 return defer.fail(failure.Failure())
314 314 else:
315 315 # Having fireOnOneErrback is causing problems with the determinacy
316 316 # of the system. Basically, once a single engine has errbacked, this
317 317 # method returns. In some cases, this will cause client to submit
318 318 # another command. Because the previous command is still running
319 319 # on some engines, this command will be queued. When those commands
320 320 # then errback, the second command will raise QueueCleared. Ahhh!
321 321 d = gatherBoth(dList,
322 322 fireOnOneErrback=0,
323 323 consumeErrors=1,
324 324 logErrors=0)
325 325 d.addCallback(error.collect_exceptions, methodName)
326 326 return d
327 327
328 328 #---------------------------------------------------------------------------
329 329 # General IMultiEngine methods
330 330 #---------------------------------------------------------------------------
331 331
332 332 def get_ids(self):
333 333 return defer.succeed(self.engines.keys())
334 334
335 335 #---------------------------------------------------------------------------
336 336 # IEngineMultiplexer methods
337 337 #---------------------------------------------------------------------------
338 338
339 339 def execute(self, lines, targets='all'):
340 340 return self._performOnEnginesAndGatherBoth('execute', lines, targets=targets)
341 341
342 342 def push(self, ns, targets='all'):
343 343 return self._performOnEnginesAndGatherBoth('push', ns, targets=targets)
344 344
345 345 def pull(self, keys, targets='all'):
346 346 return self._performOnEnginesAndGatherBoth('pull', keys, targets=targets)
347 347
348 348 def push_function(self, ns, targets='all'):
349 349 return self._performOnEnginesAndGatherBoth('push_function', ns, targets=targets)
350 350
351 351 def pull_function(self, keys, targets='all'):
352 352 return self._performOnEnginesAndGatherBoth('pull_function', keys, targets=targets)
353 353
354 354 def get_result(self, i=None, targets='all'):
355 355 return self._performOnEnginesAndGatherBoth('get_result', i, targets=targets)
356 356
357 357 def reset(self, targets='all'):
358 358 return self._performOnEnginesAndGatherBoth('reset', targets=targets)
359 359
360 360 def keys(self, targets='all'):
361 361 return self._performOnEnginesAndGatherBoth('keys', targets=targets)
362 362
363 363 def kill(self, controller=False, targets='all'):
364 364 if controller:
365 365 targets = 'all'
366 366 d = self._performOnEnginesAndGatherBoth('kill', targets=targets)
367 367 if controller:
368 368 log.msg("Killing controller")
369 369 d.addCallback(lambda _: reactor.callLater(2.0, reactor.stop))
370 370 # Consume any weird stuff coming back
371 371 d.addBoth(lambda _: None)
372 372 return d
373 373
374 374 def push_serialized(self, namespace, targets='all'):
375 375 for k, v in namespace.iteritems():
376 376 log.msg("Pushed object %s is %f MB" % (k, v.getDataSize()))
377 377 d = self._performOnEnginesAndGatherBoth('push_serialized', namespace, targets=targets)
378 378 return d
379 379
380 380 def pull_serialized(self, keys, targets='all'):
381 381 try:
382 382 dList = self._performOnEngines('pull_serialized', keys, targets=targets)
383 383 except (error.InvalidEngineID, AttributeError, error.NoEnginesRegistered):
384 384 return defer.fail(failure.Failure())
385 385 else:
386 386 for d in dList:
387 387 d.addCallback(self._logSizes)
388 388 d = gatherBoth(dList,
389 389 fireOnOneErrback=0,
390 390 consumeErrors=1,
391 391 logErrors=0)
392 392 d.addCallback(error.collect_exceptions, 'pull_serialized')
393 393 return d
394 394
395 395 def _logSizes(self, listOfSerialized):
396 396 if isinstance(listOfSerialized, (list, tuple)):
397 397 for s in listOfSerialized:
398 398 log.msg("Pulled object is %f MB" % s.getDataSize())
399 399 else:
400 400 log.msg("Pulled object is %f MB" % listOfSerialized.getDataSize())
401 401 return listOfSerialized
402 402
403 403 def clear_queue(self, targets='all'):
404 404 return self._performOnEnginesAndGatherBoth('clear_queue', targets=targets)
405 405
406 406 def queue_status(self, targets='all'):
407 407 log.msg("Getting queue status on %r" % targets)
408 408 try:
409 409 engines = self.engineList(targets)
410 410 except (error.InvalidEngineID, AttributeError, error.NoEnginesRegistered):
411 411 return defer.fail(failure.Failure())
412 412 else:
413 413 dList = []
414 414 for e in engines:
415 415 dList.append(e.queue_status().addCallback(lambda s:(e.id, s)))
416 416 d = gatherBoth(dList,
417 417 fireOnOneErrback=0,
418 418 consumeErrors=1,
419 419 logErrors=0)
420 420 d.addCallback(error.collect_exceptions, 'queue_status')
421 421 return d
422 422
423 423 def get_properties(self, keys=None, targets='all'):
424 424 log.msg("Getting properties on %r" % targets)
425 425 try:
426 426 engines = self.engineList(targets)
427 427 except (error.InvalidEngineID, AttributeError, error.NoEnginesRegistered):
428 428 return defer.fail(failure.Failure())
429 429 else:
430 430 dList = [e.get_properties(keys) for e in engines]
431 431 d = gatherBoth(dList,
432 432 fireOnOneErrback=0,
433 433 consumeErrors=1,
434 434 logErrors=0)
435 435 d.addCallback(error.collect_exceptions, 'get_properties')
436 436 return d
437 437
438 438 def set_properties(self, properties, targets='all'):
439 439 log.msg("Setting properties on %r" % targets)
440 440 try:
441 441 engines = self.engineList(targets)
442 442 except (error.InvalidEngineID, AttributeError, error.NoEnginesRegistered):
443 443 return defer.fail(failure.Failure())
444 444 else:
445 445 dList = [e.set_properties(properties) for e in engines]
446 446 d = gatherBoth(dList,
447 447 fireOnOneErrback=0,
448 448 consumeErrors=1,
449 449 logErrors=0)
450 450 d.addCallback(error.collect_exceptions, 'set_properties')
451 451 return d
452 452
453 453 def has_properties(self, keys, targets='all'):
454 454 log.msg("Checking properties on %r" % targets)
455 455 try:
456 456 engines = self.engineList(targets)
457 457 except (error.InvalidEngineID, AttributeError, error.NoEnginesRegistered):
458 458 return defer.fail(failure.Failure())
459 459 else:
460 460 dList = [e.has_properties(keys) for e in engines]
461 461 d = gatherBoth(dList,
462 462 fireOnOneErrback=0,
463 463 consumeErrors=1,
464 464 logErrors=0)
465 465 d.addCallback(error.collect_exceptions, 'has_properties')
466 466 return d
467 467
468 468 def del_properties(self, keys, targets='all'):
469 469 log.msg("Deleting properties on %r" % targets)
470 470 try:
471 471 engines = self.engineList(targets)
472 472 except (error.InvalidEngineID, AttributeError, error.NoEnginesRegistered):
473 473 return defer.fail(failure.Failure())
474 474 else:
475 475 dList = [e.del_properties(keys) for e in engines]
476 476 d = gatherBoth(dList,
477 477 fireOnOneErrback=0,
478 478 consumeErrors=1,
479 479 logErrors=0)
480 480 d.addCallback(error.collect_exceptions, 'del_properties')
481 481 return d
482 482
483 483 def clear_properties(self, targets='all'):
484 484 log.msg("Clearing properties on %r" % targets)
485 485 try:
486 486 engines = self.engineList(targets)
487 487 except (error.InvalidEngineID, AttributeError, error.NoEnginesRegistered):
488 488 return defer.fail(failure.Failure())
489 489 else:
490 490 dList = [e.clear_properties() for e in engines]
491 491 d = gatherBoth(dList,
492 492 fireOnOneErrback=0,
493 493 consumeErrors=1,
494 494 logErrors=0)
495 495 d.addCallback(error.collect_exceptions, 'clear_properties')
496 496 return d
497 497
498 498
499 499 components.registerAdapter(MultiEngine,
500 500 IControllerBase,
501 501 IMultiEngine)
502 502
503 503
504 504 #-------------------------------------------------------------------------------
505 505 # Interfaces for the Synchronous MultiEngine
506 506 #-------------------------------------------------------------------------------
507 507
508 508 class ISynchronousEngineMultiplexer(Interface):
509 509 pass
510 510
511 511
512 512 class ISynchronousMultiEngine(ISynchronousEngineMultiplexer):
513 513 """Synchronous, two-phase version of IMultiEngine.
514 514
515 515 Methods in this interface are identical to those of IMultiEngine, but they
516 516 take one additional argument:
517 517
518 518 execute(lines, targets='all') -> execute(lines, targets='all', block=True)
519 519
520 520 :Parameters:
521 521 block : boolean
522 522 Should the method return a deferred to a deferredID or the
523 523 actual result. If block=False a deferred to a deferredID is
524 524 returned and the user must call `get_pending_deferred` at a later
525 525 point. If block=True, a deferred to the actual result comes back.
526 526 """
527 527 def get_pending_deferred(deferredID, block=True):
528 528 """"""
529 529
530 530 def clear_pending_deferreds():
531 531 """"""
532 532
533 533
534 534 #-------------------------------------------------------------------------------
535 535 # Implementation of the Synchronous MultiEngine
536 536 #-------------------------------------------------------------------------------
537 537
538 538 class SynchronousMultiEngine(PendingDeferredManager):
539 539 """Adapt an `IMultiEngine` -> `ISynchronousMultiEngine`
540 540
541 541 Warning, this class uses a decorator that currently uses **kwargs.
542 542 Because of this block must be passed as a kwarg, not positionally.
543 543 """
544 544
545 545 implements(ISynchronousMultiEngine)
546 546
547 547 def __init__(self, multiengine):
548 548 self.multiengine = multiengine
549 549 PendingDeferredManager.__init__(self)
550 550
551 551 #---------------------------------------------------------------------------
552 552 # Decorated pending deferred methods
553 553 #---------------------------------------------------------------------------
554 554
555 555 @two_phase
556 556 def execute(self, lines, targets='all'):
557 557 d = self.multiengine.execute(lines, targets)
558 558 return d
559 559
560 560 @two_phase
561 561 def push(self, namespace, targets='all'):
562 562 return self.multiengine.push(namespace, targets)
563 563
564 564 @two_phase
565 565 def pull(self, keys, targets='all'):
566 566 d = self.multiengine.pull(keys, targets)
567 567 return d
568 568
569 569 @two_phase
570 570 def push_function(self, namespace, targets='all'):
571 571 return self.multiengine.push_function(namespace, targets)
572 572
573 573 @two_phase
574 574 def pull_function(self, keys, targets='all'):
575 575 d = self.multiengine.pull_function(keys, targets)
576 576 return d
577 577
578 578 @two_phase
579 579 def get_result(self, i=None, targets='all'):
580 580 return self.multiengine.get_result(i, targets='all')
581 581
582 582 @two_phase
583 583 def reset(self, targets='all'):
584 584 return self.multiengine.reset(targets)
585 585
586 586 @two_phase
587 587 def keys(self, targets='all'):
588 588 return self.multiengine.keys(targets)
589 589
590 590 @two_phase
591 591 def kill(self, controller=False, targets='all'):
592 592 return self.multiengine.kill(controller, targets)
593 593
594 594 @two_phase
595 595 def push_serialized(self, namespace, targets='all'):
596 596 return self.multiengine.push_serialized(namespace, targets)
597 597
598 598 @two_phase
599 599 def pull_serialized(self, keys, targets='all'):
600 600 return self.multiengine.pull_serialized(keys, targets)
601 601
602 602 @two_phase
603 603 def clear_queue(self, targets='all'):
604 604 return self.multiengine.clear_queue(targets)
605 605
606 606 @two_phase
607 607 def queue_status(self, targets='all'):
608 608 return self.multiengine.queue_status(targets)
609 609
610 610 @two_phase
611 611 def set_properties(self, properties, targets='all'):
612 612 return self.multiengine.set_properties(properties, targets)
613 613
614 614 @two_phase
615 615 def get_properties(self, keys=None, targets='all'):
616 616 return self.multiengine.get_properties(keys, targets)
617 617
618 618 @two_phase
619 619 def has_properties(self, keys, targets='all'):
620 620 return self.multiengine.has_properties(keys, targets)
621 621
622 622 @two_phase
623 623 def del_properties(self, keys, targets='all'):
624 624 return self.multiengine.del_properties(keys, targets)
625 625
626 626 @two_phase
627 627 def clear_properties(self, targets='all'):
628 628 return self.multiengine.clear_properties(targets)
629 629
630 630 #---------------------------------------------------------------------------
631 631 # IMultiEngine methods
632 632 #---------------------------------------------------------------------------
633 633
634 634 def get_ids(self):
635 635 """Return a list of registered engine ids.
636 636
637 637 Never use the two phase block/non-block stuff for this.
638 638 """
639 639 return self.multiengine.get_ids()
640 640
641 641
642 642 components.registerAdapter(SynchronousMultiEngine, IMultiEngine, ISynchronousMultiEngine)
643 643
644 644
645 645 #-------------------------------------------------------------------------------
646 646 # Various high-level interfaces that can be used as MultiEngine mix-ins
647 647 #-------------------------------------------------------------------------------
648 648
649 649 #-------------------------------------------------------------------------------
650 650 # IMultiEngineCoordinator
651 651 #-------------------------------------------------------------------------------
652 652
653 653 class IMultiEngineCoordinator(Interface):
654 654 """Methods that work on multiple engines explicitly."""
655 655
656 656 def scatter(key, seq, dist='b', flatten=False, targets='all'):
657 657 """Partition and distribute a sequence to targets."""
658 658
659 659 def gather(key, dist='b', targets='all'):
660 660 """Gather object key from targets."""
661 661
662 def _map(func, seq, dist='b', targets='all'):
663 """A parallelized version of Python's builtin map."""
664
665 def map(func, *sequences):
666 """Do a basic map with default for dist and targets."""
667
668 def mapper(dist='b', targets='all'):
669 """Create a mapper with dist and targets."""
670
671 def parallel(dist='b', targets='all'):
672 """A decorator that build a parallel function."""
662 def raw_map(func, seqs, dist='b', targets='all'):
663 """
664 A parallelized version of Python's builtin `map` function.
665
666 This has a slightly different syntax than the builtin `map`.
667 This is needed because we need to have keyword arguments and thus
668 can't use *args to capture all the sequences. Instead, they must
669 be passed in a list or tuple.
670
671 The equivalence is:
672
673 raw_map(func, seqs) -> map(func, seqs[0], seqs[1], ...)
674
675 Most users will want to use parallel functions or the `mapper`
676 and `map` methods for an API that follows that of the builtin
677 `map`.
678 """
673 679
674 680
675 681 class ISynchronousMultiEngineCoordinator(IMultiEngineCoordinator):
676 682 """Methods that work on multiple engines explicitly."""
677 683
678 684 def scatter(key, seq, dist='b', flatten=False, targets='all', block=True):
679 685 """Partition and distribute a sequence to targets."""
680 686
681 687 def gather(key, dist='b', targets='all', block=True):
682 688 """Gather object key from targets"""
683 689
684 def _map(func, sequences, dist='b', targets='all', block=True):
685 """Perform an actual map."""
686
687 def map(func, *sequences):
688 """Do a basic map with default for dist and targets."""
689
690 def mapper(dist='b', targets='all', block=True):
691 """Create a mapper with dist, targets and block."""
692
693 def parallel(dist='b', targets='all', block=True):
694 """A decorator that build a parallel function."""
690 def raw_map(func, seqs, dist='b', targets='all', block=True):
691 """
692 A parallelized version of Python's builtin map.
693
694 This has a slightly different syntax than the builtin `map`.
695 This is needed because we need to have keyword arguments and thus
696 can't use *args to capture all the sequences. Instead, they must
697 be passed in a list or tuple.
698
699 raw_map(func, seqs) -> map(func, seqs[0], seqs[1], ...)
700
701 Most users will want to use parallel functions or the `mapper`
702 and `map` methods for an API that follows that of the builtin
703 `map`.
704 """
695 705
696 706
697 707 #-------------------------------------------------------------------------------
698 708 # IMultiEngineExtras
699 709 #-------------------------------------------------------------------------------
700 710
701 711 class IMultiEngineExtras(Interface):
702 712
703 713 def zip_pull(targets, keys):
704 714 """
705 715 Pull, but return results in a different format from `pull`.
706 716
707 717 This method basically returns zip(pull(targets, *keys)), with a few
708 718 edge cases handled differently. Users of chainsaw will find this format
709 719 familiar.
710 720 """
711 721
712 722 def run(targets, fname):
713 723 """Run a .py file on targets."""
714 724
715 725
716 726 class ISynchronousMultiEngineExtras(IMultiEngineExtras):
717 727 def zip_pull(targets, keys, block=True):
718 728 """
719 729 Pull, but return results in a different format from `pull`.
720 730
721 731 This method basically returns zip(pull(targets, *keys)), with a few
722 732 edge cases handled differently. Users of chainsaw will find this format
723 733 familiar.
724 734 """
725 735
726 736 def run(targets, fname, block=True):
727 737 """Run a .py file on targets."""
728 738
729 739 #-------------------------------------------------------------------------------
730 740 # The full MultiEngine interface
731 741 #-------------------------------------------------------------------------------
732 742
733 743 class IFullMultiEngine(IMultiEngine,
734 744 IMultiEngineCoordinator,
735 745 IMultiEngineExtras):
736 746 pass
737 747
738 748
739 749 class IFullSynchronousMultiEngine(ISynchronousMultiEngine,
740 750 ISynchronousMultiEngineCoordinator,
741 751 ISynchronousMultiEngineExtras):
742 752 pass
743 753
@@ -1,852 +1,896 b''
1 1 # encoding: utf-8
2 2 # -*- test-case-name: IPython.kernel.test.test_multiengineclient -*-
3 3
4 4 """General Classes for IMultiEngine clients."""
5 5
6 6 __docformat__ = "restructuredtext en"
7 7
8 8 #-------------------------------------------------------------------------------
9 9 # Copyright (C) 2008 The IPython Development Team
10 10 #
11 11 # Distributed under the terms of the BSD License. The full license is in
12 12 # the file COPYING, distributed as part of this software.
13 13 #-------------------------------------------------------------------------------
14 14
15 15 #-------------------------------------------------------------------------------
16 16 # Imports
17 17 #-------------------------------------------------------------------------------
18 18
19 19 import sys
20 20 import cPickle as pickle
21 21 from types import FunctionType
22 22 import linecache
23 23
24 24 from twisted.internet import reactor
25 25 from twisted.python import components, log
26 26 from twisted.python.failure import Failure
27 27 from zope.interface import Interface, implements, Attribute
28 28
29 29 from IPython.ColorANSI import TermColors
30 30
31 31 from IPython.kernel.twistedutil import blockingCallFromThread
32 32 from IPython.kernel import error
33 33 from IPython.kernel.parallelfunction import ParallelFunction
34 from IPython.kernel.mapper import Mapper
34 from IPython.kernel.mapper import (
35 MultiEngineMapper,
36 IMultiEngineMapperFactory,
37 IMapper
38 )
35 39 from IPython.kernel import map as Map
36 40 from IPython.kernel import multiengine as me
37 41 from IPython.kernel.multiengine import (IFullMultiEngine,
38 42 IFullSynchronousMultiEngine)
39 43
40 44
41 45 #-------------------------------------------------------------------------------
42 46 # Pending Result things
43 47 #-------------------------------------------------------------------------------
44 48
45 49 class IPendingResult(Interface):
46 50 """A representation of a result that is pending.
47 51
48 52 This class is similar to Twisted's `Deferred` object, but is designed to be
49 53 used in a synchronous context.
50 54 """
51 55
52 56 result_id=Attribute("ID of the deferred on the other side")
53 57 client=Attribute("A client that I came from")
54 58 r=Attribute("An attribute that is a property that calls and returns get_result")
55 59
56 60 def get_result(default=None, block=True):
57 61 """
58 62 Get a result that is pending.
59 63
60 64 :Parameters:
61 65 default
62 66 The value to return if the result is not ready.
63 67 block : boolean
64 68 Should I block for the result.
65 69
66 70 :Returns: The actual result or the default value.
67 71 """
68 72
69 73 def add_callback(f, *args, **kwargs):
70 74 """
71 75 Add a callback that is called with the result.
72 76
73 77 If the original result is foo, adding a callback will cause
74 78 f(foo, *args, **kwargs) to be returned instead. If multiple
75 79 callbacks are registered, they are chained together: the result of
76 80 one is passed to the next and so on.
77 81
78 82 Unlike Twisted's Deferred object, there is no errback chain. Thus
79 83 any exception raised will not be caught and handled. User must
80 84 catch these by hand when calling `get_result`.
81 85 """
82 86
83 87
84 88 class PendingResult(object):
85 89 """A representation of a result that is not yet ready.
86 90
87 91 A user should not create a `PendingResult` instance by hand.
88 92
89 93 Methods
90 94 =======
91 95
92 96 * `get_result`
93 97 * `add_callback`
94 98
95 99 Properties
96 100 ==========
97 101 * `r`
98 102 """
99 103
100 104 def __init__(self, client, result_id):
101 105 """Create a PendingResult with a result_id and a client instance.
102 106
103 107 The client should implement `_getPendingResult(result_id, block)`.
104 108 """
105 109 self.client = client
106 110 self.result_id = result_id
107 111 self.called = False
108 112 self.raised = False
109 113 self.callbacks = []
110 114
111 115 def get_result(self, default=None, block=True):
112 116 """Get a result that is pending.
113 117
114 118 This method will connect to an IMultiEngine adapted controller
115 119 and see if the result is ready. If the action triggers an exception
116 120 raise it and record it. This method records the result/exception once it is
117 121 retrieved. Calling `get_result` again will get this cached result or will
118 122 re-raise the exception. The .r attribute is a property that calls
119 123 `get_result` with block=True.
120 124
121 125 :Parameters:
122 126 default
123 127 The value to return if the result is not ready.
124 128 block : boolean
125 129 Should I block for the result.
126 130
127 131 :Returns: The actual result or the default value.
128 132 """
129 133
130 134 if self.called:
131 135 if self.raised:
132 136 raise self.result[0], self.result[1], self.result[2]
133 137 else:
134 138 return self.result
135 139 try:
136 140 result = self.client.get_pending_deferred(self.result_id, block)
137 141 except error.ResultNotCompleted:
138 142 return default
139 143 except:
140 144 # Reraise other error, but first record them so they can be reraised
141 145 # later if .r or get_result is called again.
142 146 self.result = sys.exc_info()
143 147 self.called = True
144 148 self.raised = True
145 149 raise
146 150 else:
147 151 for cb in self.callbacks:
148 152 result = cb[0](result, *cb[1], **cb[2])
149 153 self.result = result
150 154 self.called = True
151 155 return result
152 156
153 157 def add_callback(self, f, *args, **kwargs):
154 158 """Add a callback that is called with the result.
155 159
156 160 If the original result is result, adding a callback will cause
157 161 f(result, *args, **kwargs) to be returned instead. If multiple
158 162 callbacks are registered, they are chained together: the result of
159 163 one is passed to the next and so on.
160 164
161 165 Unlike Twisted's Deferred object, there is no errback chain. Thus
162 166 any exception raised will not be caught and handled. User must
163 167 catch these by hand when calling `get_result`.
164 168 """
165 169 assert callable(f)
166 170 self.callbacks.append((f, args, kwargs))
167 171
168 172 def __cmp__(self, other):
169 173 if self.result_id < other.result_id:
170 174 return -1
171 175 else:
172 176 return 1
173 177
174 178 def _get_r(self):
175 179 return self.get_result(block=True)
176 180
177 181 r = property(_get_r)
178 182 """This property is a shortcut to a `get_result(block=True)`."""
179 183
180 184
181 185 #-------------------------------------------------------------------------------
182 186 # Pretty printing wrappers for certain lists
183 187 #-------------------------------------------------------------------------------
184 188
185 189 class ResultList(list):
186 190 """A subclass of list that pretty prints the output of `execute`/`get_result`."""
187 191
188 192 def __repr__(self):
189 193 output = []
190 blue = TermColors.Blue
191 normal = TermColors.Normal
192 red = TermColors.Red
193 green = TermColors.Green
194 # These colored prompts were not working on Windows
195 if sys.platform == 'win32':
196 blue = normal = red = green = ''
197 else:
198 blue = TermColors.Blue
199 normal = TermColors.Normal
200 red = TermColors.Red
201 green = TermColors.Green
194 202 output.append("<Results List>\n")
195 203 for cmd in self:
196 204 if isinstance(cmd, Failure):
197 205 output.append(cmd)
198 206 else:
199 207 target = cmd.get('id',None)
200 208 cmd_num = cmd.get('number',None)
201 209 cmd_stdin = cmd.get('input',{}).get('translated','No Input')
202 210 cmd_stdout = cmd.get('stdout', None)
203 211 cmd_stderr = cmd.get('stderr', None)
204 212 output.append("%s[%i]%s In [%i]:%s %s\n" % \
205 213 (green, target,
206 214 blue, cmd_num, normal, cmd_stdin))
207 215 if cmd_stdout:
208 216 output.append("%s[%i]%s Out[%i]:%s %s\n" % \
209 217 (green, target,
210 218 red, cmd_num, normal, cmd_stdout))
211 219 if cmd_stderr:
212 220 output.append("%s[%i]%s Err[%i]:\n%s %s" % \
213 221 (green, target,
214 222 red, cmd_num, normal, cmd_stderr))
215 223 return ''.join(output)
216 224
217 225
218 226 def wrapResultList(result):
219 227 """A function that wraps the output of `execute`/`get_result` -> `ResultList`."""
220 228 if len(result) == 0:
221 229 result = [result]
222 230 return ResultList(result)
223 231
224 232
225 233 class QueueStatusList(list):
226 234 """A subclass of list that pretty prints the output of `queue_status`."""
227 235
228 236 def __repr__(self):
229 237 output = []
230 238 output.append("<Queue Status List>\n")
231 239 for e in self:
232 240 output.append("Engine: %s\n" % repr(e[0]))
233 241 output.append(" Pending: %s\n" % repr(e[1]['pending']))
234 242 for q in e[1]['queue']:
235 243 output.append(" Command: %s\n" % repr(q))
236 244 return ''.join(output)
237 245
238 246
239 247 #-------------------------------------------------------------------------------
240 248 # InteractiveMultiEngineClient
241 249 #-------------------------------------------------------------------------------
242 250
243 251 class InteractiveMultiEngineClient(object):
244 252 """A mixin class that add a few methods to a multiengine client.
245 253
246 254 The methods in this mixin class are designed for interactive usage.
247 255 """
248 256
249 257 def activate(self):
250 258 """Make this `MultiEngineClient` active for parallel magic commands.
251 259
252 260 IPython has a magic command syntax to work with `MultiEngineClient` objects.
253 261 In a given IPython session there is a single active one. While
254 262 there can be many `MultiEngineClient` created and used by the user,
255 263 there is only one active one. The active `MultiEngineClient` is used whenever
256 264 the magic commands %px and %autopx are used.
257 265
258 266 The activate() method is called on a given `MultiEngineClient` to make it
259 267 active. Once this has been done, the magic commands can be used.
260 268 """
261 269
262 270 try:
263 271 __IPYTHON__.activeController = self
264 272 except NameError:
265 273 print "The IPython Controller magics only work within IPython."
266 274
267 275 def __setitem__(self, key, value):
268 276 """Add a dictionary interface for pushing/pulling.
269 277
270 278 This functions as a shorthand for `push`.
271 279
272 280 :Parameters:
273 281 key : str
274 282 What to call the remote object.
275 283 value : object
276 284 The local Python object to push.
277 285 """
278 286 targets, block = self._findTargetsAndBlock()
279 287 return self.push({key:value}, targets=targets, block=block)
280 288
281 289 def __getitem__(self, key):
282 290 """Add a dictionary interface for pushing/pulling.
283 291
284 292 This functions as a shorthand to `pull`.
285 293
286 294 :Parameters:
287 295 - `key`: A string representing the key.
288 296 """
289 297 if isinstance(key, str):
290 298 targets, block = self._findTargetsAndBlock()
291 299 return self.pull(key, targets=targets, block=block)
292 300 else:
293 301 raise TypeError("__getitem__ only takes strs")
294 302
295 303 def __len__(self):
296 304 """Return the number of available engines."""
297 305 return len(self.get_ids())
298
299 def parallelize(self, func, targets=None, block=None):
300 """Build a `ParallelFunction` object for functionName on engines.
301
302 The returned object will implement a parallel version of functionName
303 that takes a local sequence as its only argument and calls (in
304 parallel) functionName on each element of that sequence. The
305 `ParallelFunction` object has a `targets` attribute that controls
306 which engines the function is run on.
307
308 :Parameters:
309 targets : int, list or 'all'
310 The engine ids the action will apply to. Call `get_ids` to see
311 a list of currently available engines.
312 functionName : str
313 A Python string that names a callable defined on the engines.
314
315 :Returns: A `ParallelFunction` object.
316
317 Examples
318 ========
319
320 >>> psin = rc.parallelize('all','lambda x:sin(x)')
321 >>> psin(range(10000))
322 [0,2,4,9,25,36,...]
323 """
324 targets, block = self._findTargetsAndBlock(targets, block)
325 return ParallelFunction(func, self, targets, block)
326
306
327 307 #---------------------------------------------------------------------------
328 308 # Make this a context manager for with
329 309 #---------------------------------------------------------------------------
330 310
331 311 def findsource_file(self,f):
332 312 linecache.checkcache()
333 313 s = findsource(f.f_code)
334 314 lnum = f.f_lineno
335 315 wsource = s[0][f.f_lineno:]
336 316 return strip_whitespace(wsource)
337 317
338 318 def findsource_ipython(self,f):
339 319 from IPython import ipapi
340 320 self.ip = ipapi.get()
341 321 wsource = [l+'\n' for l in
342 322 self.ip.IP.input_hist_raw[-1].splitlines()[1:]]
343 323 return strip_whitespace(wsource)
344 324
345 325 def __enter__(self):
346 326 f = sys._getframe(1)
347 327 local_ns = f.f_locals
348 328 global_ns = f.f_globals
349 329 if f.f_code.co_filename == '<ipython console>':
350 330 s = self.findsource_ipython(f)
351 331 else:
352 332 s = self.findsource_file(f)
353 333
354 334 self._with_context_result = self.execute(s)
355 335
356 336 def __exit__ (self, etype, value, tb):
357 337 if issubclass(etype,error.StopLocalExecution):
358 338 return True
359 339
360 340
361 341 def remote():
362 342 m = 'Special exception to stop local execution of parallel code.'
363 343 raise error.StopLocalExecution(m)
364 344
365 345 def strip_whitespace(source):
366 346 # Expand tabs to avoid any confusion.
367 347 wsource = [l.expandtabs(4) for l in source]
368 348 # Detect the indentation level
369 349 done = False
370 350 for line in wsource:
371 351 if line.isspace():
372 352 continue
373 353 for col,char in enumerate(line):
374 354 if char != ' ':
375 355 done = True
376 356 break
377 357 if done:
378 358 break
379 359 # Now we know how much leading space there is in the code. Next, we
380 360 # extract up to the first line that has less indentation.
381 361 # WARNINGS: we skip comments that may be misindented, but we do NOT yet
382 362 # detect triple quoted strings that may have flush left text.
383 363 for lno,line in enumerate(wsource):
384 364 lead = line[:col]
385 365 if lead.isspace():
386 366 continue
387 367 else:
388 368 if not lead.lstrip().startswith('#'):
389 369 break
390 370 # The real 'with' source is up to lno
391 371 src_lines = [l[col:] for l in wsource[:lno+1]]
392 372
393 373 # Finally, check that the source's first non-comment line begins with the
394 374 # special call 'remote()'
395 375 for nline,line in enumerate(src_lines):
396 376 if line.isspace() or line.startswith('#'):
397 377 continue
398 378 if 'remote()' in line:
399 379 break
400 380 else:
401 381 raise ValueError('remote() call missing at the start of code')
402 382 src = ''.join(src_lines[nline+1:])
403 383 #print 'SRC:\n<<<<<<<>>>>>>>\n%s<<<<<>>>>>>' % src # dbg
404 384 return src
405 385
406 386
407 387 #-------------------------------------------------------------------------------
408 388 # The top-level MultiEngine client adaptor
409 389 #-------------------------------------------------------------------------------
410 390
411 391
412 392 class IFullBlockingMultiEngineClient(Interface):
413 393 pass
414 394
415 395
416 396 class FullBlockingMultiEngineClient(InteractiveMultiEngineClient):
417 397 """
418 398 A blocking client to the `IMultiEngine` controller interface.
419 399
420 400 This class allows users to use a set of engines for a parallel
421 401 computation through the `IMultiEngine` interface. In this interface,
422 402 each engine has a specific id (an int) that is used to refer to the
423 403 engine, run code on it, etc.
424 404 """
425 405
426 implements(IFullBlockingMultiEngineClient)
406 implements(
407 IFullBlockingMultiEngineClient,
408 IMultiEngineMapperFactory,
409 IMapper
410 )
427 411
428 412 def __init__(self, smultiengine):
429 413 self.smultiengine = smultiengine
430 414 self.block = True
431 415 self.targets = 'all'
432 416
433 417 def _findBlock(self, block=None):
434 418 if block is None:
435 419 return self.block
436 420 else:
437 421 if block in (True, False):
438 422 return block
439 423 else:
440 424 raise ValueError("block must be True or False")
441 425
442 426 def _findTargets(self, targets=None):
443 427 if targets is None:
444 428 return self.targets
445 429 else:
446 430 if not isinstance(targets, (str,list,tuple,int)):
447 431 raise ValueError("targets must be a str, list, tuple or int")
448 432 return targets
449 433
450 434 def _findTargetsAndBlock(self, targets=None, block=None):
451 435 return self._findTargets(targets), self._findBlock(block)
452 436
453 437 def _blockFromThread(self, function, *args, **kwargs):
454 438 block = kwargs.get('block', None)
455 439 if block is None:
456 440 raise error.MissingBlockArgument("'block' keyword argument is missing")
457 441 result = blockingCallFromThread(function, *args, **kwargs)
458 442 if not block:
459 443 result = PendingResult(self, result)
460 444 return result
461 445
462 446 def get_pending_deferred(self, deferredID, block):
463 447 return blockingCallFromThread(self.smultiengine.get_pending_deferred, deferredID, block)
464 448
465 449 def barrier(self, pendingResults):
466 450 """Synchronize a set of `PendingResults`.
467 451
468 452 This method is a synchronization primitive that waits for a set of
469 453 `PendingResult` objects to complete. More specifically, barier does
470 454 the following.
471 455
472 456 * The `PendingResult`s are sorted by result_id.
473 457 * The `get_result` method is called for each `PendingResult` sequentially
474 458 with block=True.
475 459 * If a `PendingResult` gets a result that is an exception, it is
476 460 trapped and can be re-raised later by calling `get_result` again.
477 461 * The `PendingResult`s are flushed from the controller.
478 462
479 463 After barrier has been called on a `PendingResult`, its results can
480 464 be retrieved by calling `get_result` again or accesing the `r` attribute
481 465 of the instance.
482 466 """
483 467
484 468 # Convert to list for sorting and check class type
485 469 prList = list(pendingResults)
486 470 for pr in prList:
487 471 if not isinstance(pr, PendingResult):
488 472 raise error.NotAPendingResult("Objects passed to barrier must be PendingResult instances")
489 473
490 474 # Sort the PendingResults so they are in order
491 475 prList.sort()
492 476 # Block on each PendingResult object
493 477 for pr in prList:
494 478 try:
495 479 result = pr.get_result(block=True)
496 480 except Exception:
497 481 pass
498 482
499 483 def flush(self):
500 484 """
501 485 Clear all pending deferreds/results from the controller.
502 486
503 487 For each `PendingResult` that is created by this client, the controller
504 488 holds on to the result for that `PendingResult`. This can be a problem
505 489 if there are a large number of `PendingResult` objects that are created.
506 490
507 491 Once the result of the `PendingResult` has been retrieved, the result
508 492 is removed from the controller, but if a user doesn't get a result (
509 493 they just ignore the `PendingResult`) the result is kept forever on the
510 494 controller. This method allows the user to clear out all un-retrieved
511 495 results on the controller.
512 496 """
513 497 r = blockingCallFromThread(self.smultiengine.clear_pending_deferreds)
514 498 return r
515 499
516 500 clear_pending_results = flush
517 501
518 502 #---------------------------------------------------------------------------
519 503 # IEngineMultiplexer related methods
520 504 #---------------------------------------------------------------------------
521 505
522 506 def execute(self, lines, targets=None, block=None):
523 507 """
524 508 Execute code on a set of engines.
525 509
526 510 :Parameters:
527 511 lines : str
528 512 The Python code to execute as a string
529 513 targets : id or list of ids
530 514 The engine to use for the execution
531 515 block : boolean
532 516 If False, this method will return the actual result. If False,
533 517 a `PendingResult` is returned which can be used to get the result
534 518 at a later time.
535 519 """
536 520 targets, block = self._findTargetsAndBlock(targets, block)
537 521 result = blockingCallFromThread(self.smultiengine.execute, lines,
538 522 targets=targets, block=block)
539 523 if block:
540 524 result = ResultList(result)
541 525 else:
542 526 result = PendingResult(self, result)
543 527 result.add_callback(wrapResultList)
544 528 return result
545 529
546 530 def push(self, namespace, targets=None, block=None):
547 531 """
548 532 Push a dictionary of keys and values to engines namespace.
549 533
550 534 Each engine has a persistent namespace. This method is used to push
551 535 Python objects into that namespace.
552 536
553 537 The objects in the namespace must be pickleable.
554 538
555 539 :Parameters:
556 540 namespace : dict
557 541 A dict that contains Python objects to be injected into
558 542 the engine persistent namespace.
559 543 targets : id or list of ids
560 544 The engine to use for the execution
561 545 block : boolean
562 546 If False, this method will return the actual result. If False,
563 547 a `PendingResult` is returned which can be used to get the result
564 548 at a later time.
565 549 """
566 550 targets, block = self._findTargetsAndBlock(targets, block)
567 551 return self._blockFromThread(self.smultiengine.push, namespace,
568 552 targets=targets, block=block)
569 553
570 554 def pull(self, keys, targets=None, block=None):
571 555 """
572 556 Pull Python objects by key out of engines namespaces.
573 557
574 558 :Parameters:
575 559 keys : str or list of str
576 560 The names of the variables to be pulled
577 561 targets : id or list of ids
578 562 The engine to use for the execution
579 563 block : boolean
580 564 If False, this method will return the actual result. If False,
581 565 a `PendingResult` is returned which can be used to get the result
582 566 at a later time.
583 567 """
584 568 targets, block = self._findTargetsAndBlock(targets, block)
585 569 return self._blockFromThread(self.smultiengine.pull, keys, targets=targets, block=block)
586 570
587 571 def push_function(self, namespace, targets=None, block=None):
588 572 """
589 573 Push a Python function to an engine.
590 574
591 575 This method is used to push a Python function to an engine. This
592 576 method can then be used in code on the engines. Closures are not supported.
593 577
594 578 :Parameters:
595 579 namespace : dict
596 580 A dict whose values are the functions to be pushed. The keys give
597 581 that names that the function will appear as in the engines
598 582 namespace.
599 583 targets : id or list of ids
600 584 The engine to use for the execution
601 585 block : boolean
602 586 If False, this method will return the actual result. If False,
603 587 a `PendingResult` is returned which can be used to get the result
604 588 at a later time.
605 589 """
606 590 targets, block = self._findTargetsAndBlock(targets, block)
607 591 return self._blockFromThread(self.smultiengine.push_function, namespace, targets=targets, block=block)
608 592
609 593 def pull_function(self, keys, targets=None, block=None):
610 594 """
611 595 Pull a Python function from an engine.
612 596
613 597 This method is used to pull a Python function from an engine.
614 598 Closures are not supported.
615 599
616 600 :Parameters:
617 601 keys : str or list of str
618 602 The names of the functions to be pulled
619 603 targets : id or list of ids
620 604 The engine to use for the execution
621 605 block : boolean
622 606 If False, this method will return the actual result. If False,
623 607 a `PendingResult` is returned which can be used to get the result
624 608 at a later time.
625 609 """
626 610 targets, block = self._findTargetsAndBlock(targets, block)
627 611 return self._blockFromThread(self.smultiengine.pull_function, keys, targets=targets, block=block)
628 612
629 613 def push_serialized(self, namespace, targets=None, block=None):
630 614 targets, block = self._findTargetsAndBlock(targets, block)
631 615 return self._blockFromThread(self.smultiengine.push_serialized, namespace, targets=targets, block=block)
632 616
633 617 def pull_serialized(self, keys, targets=None, block=None):
634 618 targets, block = self._findTargetsAndBlock(targets, block)
635 619 return self._blockFromThread(self.smultiengine.pull_serialized, keys, targets=targets, block=block)
636 620
637 621 def get_result(self, i=None, targets=None, block=None):
638 622 """
639 623 Get a previous result.
640 624
641 625 When code is executed in an engine, a dict is created and returned. This
642 626 method retrieves that dict for previous commands.
643 627
644 628 :Parameters:
645 629 i : int
646 630 The number of the result to get
647 631 targets : id or list of ids
648 632 The engine to use for the execution
649 633 block : boolean
650 634 If False, this method will return the actual result. If False,
651 635 a `PendingResult` is returned which can be used to get the result
652 636 at a later time.
653 637 """
654 638 targets, block = self._findTargetsAndBlock(targets, block)
655 639 result = blockingCallFromThread(self.smultiengine.get_result, i, targets=targets, block=block)
656 640 if block:
657 641 result = ResultList(result)
658 642 else:
659 643 result = PendingResult(self, result)
660 644 result.add_callback(wrapResultList)
661 645 return result
662 646
663 647 def reset(self, targets=None, block=None):
664 648 """
665 649 Reset an engine.
666 650
667 651 This method clears out the namespace of an engine.
668 652
669 653 :Parameters:
670 654 targets : id or list of ids
671 655 The engine to use for the execution
672 656 block : boolean
673 657 If False, this method will return the actual result. If False,
674 658 a `PendingResult` is returned which can be used to get the result
675 659 at a later time.
676 660 """
677 661 targets, block = self._findTargetsAndBlock(targets, block)
678 662 return self._blockFromThread(self.smultiengine.reset, targets=targets, block=block)
679 663
680 664 def keys(self, targets=None, block=None):
681 665 """
682 666 Get a list of all the variables in an engine's namespace.
683 667
684 668 :Parameters:
685 669 targets : id or list of ids
686 670 The engine to use for the execution
687 671 block : boolean
688 672 If False, this method will return the actual result. If False,
689 673 a `PendingResult` is returned which can be used to get the result
690 674 at a later time.
691 675 """
692 676 targets, block = self._findTargetsAndBlock(targets, block)
693 677 return self._blockFromThread(self.smultiengine.keys, targets=targets, block=block)
694 678
695 679 def kill(self, controller=False, targets=None, block=None):
696 680 """
697 681 Kill the engines and controller.
698 682
699 683 This method is used to stop the engine and controller by calling
700 684 `reactor.stop`.
701 685
702 686 :Parameters:
703 687 controller : boolean
704 688 If True, kill the engines and controller. If False, just the
705 689 engines
706 690 targets : id or list of ids
707 691 The engine to use for the execution
708 692 block : boolean
709 693 If False, this method will return the actual result. If False,
710 694 a `PendingResult` is returned which can be used to get the result
711 695 at a later time.
712 696 """
713 697 targets, block = self._findTargetsAndBlock(targets, block)
714 698 return self._blockFromThread(self.smultiengine.kill, controller, targets=targets, block=block)
715 699
716 700 def clear_queue(self, targets=None, block=None):
717 701 """
718 702 Clear out the controller's queue for an engine.
719 703
720 704 The controller maintains a queue for each engine. This clear it out.
721 705
722 706 :Parameters:
723 707 targets : id or list of ids
724 708 The engine to use for the execution
725 709 block : boolean
726 710 If False, this method will return the actual result. If False,
727 711 a `PendingResult` is returned which can be used to get the result
728 712 at a later time.
729 713 """
730 714 targets, block = self._findTargetsAndBlock(targets, block)
731 715 return self._blockFromThread(self.smultiengine.clear_queue, targets=targets, block=block)
732 716
733 717 def queue_status(self, targets=None, block=None):
734 718 """
735 719 Get the status of an engines queue.
736 720
737 721 :Parameters:
738 722 targets : id or list of ids
739 723 The engine to use for the execution
740 724 block : boolean
741 725 If False, this method will return the actual result. If False,
742 726 a `PendingResult` is returned which can be used to get the result
743 727 at a later time.
744 728 """
745 729 targets, block = self._findTargetsAndBlock(targets, block)
746 730 return self._blockFromThread(self.smultiengine.queue_status, targets=targets, block=block)
747 731
748 732 def set_properties(self, properties, targets=None, block=None):
749 733 targets, block = self._findTargetsAndBlock(targets, block)
750 734 return self._blockFromThread(self.smultiengine.set_properties, properties, targets=targets, block=block)
751 735
752 736 def get_properties(self, keys=None, targets=None, block=None):
753 737 targets, block = self._findTargetsAndBlock(targets, block)
754 738 return self._blockFromThread(self.smultiengine.get_properties, keys, targets=targets, block=block)
755 739
756 740 def has_properties(self, keys, targets=None, block=None):
757 741 targets, block = self._findTargetsAndBlock(targets, block)
758 742 return self._blockFromThread(self.smultiengine.has_properties, keys, targets=targets, block=block)
759 743
760 744 def del_properties(self, keys, targets=None, block=None):
761 745 targets, block = self._findTargetsAndBlock(targets, block)
762 746 return self._blockFromThread(self.smultiengine.del_properties, keys, targets=targets, block=block)
763 747
764 748 def clear_properties(self, targets=None, block=None):
765 749 targets, block = self._findTargetsAndBlock(targets, block)
766 750 return self._blockFromThread(self.smultiengine.clear_properties, targets=targets, block=block)
767 751
768 752 #---------------------------------------------------------------------------
769 753 # IMultiEngine related methods
770 754 #---------------------------------------------------------------------------
771 755
772 756 def get_ids(self):
773 757 """
774 758 Returns the ids of currently registered engines.
775 759 """
776 760 result = blockingCallFromThread(self.smultiengine.get_ids)
777 761 return result
778 762
779 763 #---------------------------------------------------------------------------
780 764 # IMultiEngineCoordinator
781 765 #---------------------------------------------------------------------------
782 766
783 767 def scatter(self, key, seq, dist='b', flatten=False, targets=None, block=None):
784 768 """
785 769 Partition a Python sequence and send the partitions to a set of engines.
786 770 """
787 771 targets, block = self._findTargetsAndBlock(targets, block)
788 772 return self._blockFromThread(self.smultiengine.scatter, key, seq,
789 773 dist, flatten, targets=targets, block=block)
790 774
791 775 def gather(self, key, dist='b', targets=None, block=None):
792 776 """
793 777 Gather a partitioned sequence on a set of engines as a single local seq.
794 778 """
795 779 targets, block = self._findTargetsAndBlock(targets, block)
796 780 return self._blockFromThread(self.smultiengine.gather, key, dist,
797 781 targets=targets, block=block)
798 782
799 def _map(self, func, seq, dist='b', targets=None, block=None):
783 def raw_map(self, func, seq, dist='b', targets=None, block=None):
800 784 """
801 A parallelized version of Python's builtin map
785 A parallelized version of Python's builtin map.
786
787 This has a slightly different syntax than the builtin `map`.
788 This is needed because we need to have keyword arguments and thus
789 can't use *args to capture all the sequences. Instead, they must
790 be passed in a list or tuple.
791
792 raw_map(func, seqs) -> map(func, seqs[0], seqs[1], ...)
793
794 Most users will want to use parallel functions or the `mapper`
795 and `map` methods for an API that follows that of the builtin
796 `map`.
802 797 """
803 798 targets, block = self._findTargetsAndBlock(targets, block)
804 return self._blockFromThread(self.smultiengine._map, func, seq,
799 return self._blockFromThread(self.smultiengine.raw_map, func, seq,
805 800 dist, targets=targets, block=block)
806 801
807 802 def map(self, func, *sequences):
808 return self.mapper()(func, *sequences)
803 """
804 A parallel version of Python's builtin `map` function.
805
806 This method applies a function to sequences of arguments. It
807 follows the same syntax as the builtin `map`.
808
809 This method creates a mapper objects by calling `self.mapper` with
810 no arguments and then uses that mapper to do the mapping. See
811 the documentation of `mapper` for more details.
812 """
813 return self.mapper().map(func, *sequences)
809 814
810 815 def mapper(self, dist='b', targets='all', block=None):
811 return Mapper(self, dist, targets, block)
816 """
817 Create a mapper object that has a `map` method.
818
819 This method returns an object that implements the `IMapper`
820 interface. This method is a factory that is used to control how
821 the map happens.
822
823 :Parameters:
824 dist : str
825 What decomposition to use, 'b' is the only one supported
826 currently
827 targets : str, int, sequence of ints
828 Which engines to use for the map
829 block : boolean
830 Should calls to `map` block or not
831 """
832 return MultiEngineMapper(self, dist, targets, block)
812 833
813 834 def parallel(self, dist='b', targets=None, block=None):
835 """
836 A decorator that turns a function into a parallel function.
837
838 This can be used as:
839
840 @parallel()
841 def f(x, y)
842 ...
843
844 f(range(10), range(10))
845
846 This causes f(0,0), f(1,1), ... to be called in parallel.
847
848 :Parameters:
849 dist : str
850 What decomposition to use, 'b' is the only one supported
851 currently
852 targets : str, int, sequence of ints
853 Which engines to use for the map
854 block : boolean
855 Should calls to `map` block or not
856 """
814 857 targets, block = self._findTargetsAndBlock(targets, block)
815 pf = ParallelFunction(self, dist=dist, targets=targets, block=block)
858 mapper = self.mapper(dist, targets, block)
859 pf = ParallelFunction(mapper)
816 860 return pf
817 861
818 862 #---------------------------------------------------------------------------
819 863 # IMultiEngineExtras
820 864 #---------------------------------------------------------------------------
821 865
822 866 def zip_pull(self, keys, targets=None, block=None):
823 867 targets, block = self._findTargetsAndBlock(targets, block)
824 868 return self._blockFromThread(self.smultiengine.zip_pull, keys,
825 869 targets=targets, block=block)
826 870
827 871 def run(self, filename, targets=None, block=None):
828 872 """
829 873 Run a Python code in a file on the engines.
830 874
831 875 :Parameters:
832 876 filename : str
833 877 The name of the local file to run
834 878 targets : id or list of ids
835 879 The engine to use for the execution
836 880 block : boolean
837 881 If False, this method will return the actual result. If False,
838 882 a `PendingResult` is returned which can be used to get the result
839 883 at a later time.
840 884 """
841 885 targets, block = self._findTargetsAndBlock(targets, block)
842 886 return self._blockFromThread(self.smultiengine.run, filename,
843 887 targets=targets, block=block)
844 888
845 889
846 890
847 891 components.registerAdapter(FullBlockingMultiEngineClient,
848 892 IFullSynchronousMultiEngine, IFullBlockingMultiEngineClient)
849 893
850 894
851 895
852 896
@@ -1,688 +1,757 b''
1 1 # encoding: utf-8
2 2
3 3 """
4 4 Expose the multiengine controller over the Foolscap network protocol.
5 5 """
6 6
7 7 __docformat__ = "restructuredtext en"
8 8
9 9 #-------------------------------------------------------------------------------
10 10 # Copyright (C) 2008 The IPython Development Team
11 11 #
12 12 # Distributed under the terms of the BSD License. The full license is in
13 13 # the file COPYING, distributed as part of this software.
14 14 #-------------------------------------------------------------------------------
15 15
16 16 #-------------------------------------------------------------------------------
17 17 # Imports
18 18 #-------------------------------------------------------------------------------
19 19
20 20 import cPickle as pickle
21 21 from types import FunctionType
22 22
23 23 from zope.interface import Interface, implements
24 24 from twisted.internet import defer
25 25 from twisted.python import components, failure, log
26 26
27 27 from foolscap import Referenceable
28 28
29 29 from IPython.kernel import error
30 30 from IPython.kernel.util import printer
31 31 from IPython.kernel import map as Map
32 32 from IPython.kernel.parallelfunction import ParallelFunction
33 from IPython.kernel.mapper import Mapper
33 from IPython.kernel.mapper import (
34 MultiEngineMapper,
35 IMultiEngineMapperFactory,
36 IMapper
37 )
34 38 from IPython.kernel.twistedutil import gatherBoth
35 39 from IPython.kernel.multiengine import (MultiEngine,
36 40 IMultiEngine,
37 41 IFullSynchronousMultiEngine,
38 42 ISynchronousMultiEngine)
39 43 from IPython.kernel.multiengineclient import wrapResultList
40 44 from IPython.kernel.pendingdeferred import PendingDeferredManager
41 45 from IPython.kernel.pickleutil import (can, canDict,
42 46 canSequence, uncan, uncanDict, uncanSequence)
43 47
44 48 from IPython.kernel.clientinterfaces import (
45 49 IFCClientInterfaceProvider,
46 50 IBlockingClientAdaptor
47 51 )
48 52
49 53 # Needed to access the true globals from __main__.__dict__
50 54 import __main__
51 55
52 56 #-------------------------------------------------------------------------------
53 57 # The Controller side of things
54 58 #-------------------------------------------------------------------------------
55 59
56 60 def packageResult(wrappedMethod):
57 61
58 62 def wrappedPackageResult(self, *args, **kwargs):
59 63 d = wrappedMethod(self, *args, **kwargs)
60 64 d.addCallback(self.packageSuccess)
61 65 d.addErrback(self.packageFailure)
62 66 return d
63 67 return wrappedPackageResult
64 68
65 69
66 70 class IFCSynchronousMultiEngine(Interface):
67 71 """Foolscap interface to `ISynchronousMultiEngine`.
68 72
69 73 The methods in this interface are similar to those of
70 74 `ISynchronousMultiEngine`, but their arguments and return values are pickled
71 75 if they are not already simple Python types that can be send over XML-RPC.
72 76
73 77 See the documentation of `ISynchronousMultiEngine` and `IMultiEngine` for
74 78 documentation about the methods.
75 79
76 80 Most methods in this interface act like the `ISynchronousMultiEngine`
77 81 versions and can be called in blocking or non-blocking mode.
78 82 """
79 83 pass
80 84
81 85
82 86 class FCSynchronousMultiEngineFromMultiEngine(Referenceable):
83 87 """Adapt `IMultiEngine` -> `ISynchronousMultiEngine` -> `IFCSynchronousMultiEngine`.
84 88 """
85 89
86 90 implements(IFCSynchronousMultiEngine, IFCClientInterfaceProvider)
87 91
88 92 addSlash = True
89 93
90 94 def __init__(self, multiengine):
91 95 # Adapt the raw multiengine to `ISynchronousMultiEngine` before saving
92 96 # it. This allow this class to do two adaptation steps.
93 97 self.smultiengine = ISynchronousMultiEngine(multiengine)
94 98 self._deferredIDCallbacks = {}
95 99
96 100 #---------------------------------------------------------------------------
97 101 # Non interface methods
98 102 #---------------------------------------------------------------------------
99 103
100 104 def packageFailure(self, f):
101 105 f.cleanFailure()
102 106 return self.packageSuccess(f)
103 107
104 108 def packageSuccess(self, obj):
105 109 serial = pickle.dumps(obj, 2)
106 110 return serial
107 111
108 112 #---------------------------------------------------------------------------
109 113 # Things related to PendingDeferredManager
110 114 #---------------------------------------------------------------------------
111 115
112 116 @packageResult
113 117 def remote_get_pending_deferred(self, deferredID, block):
114 118 d = self.smultiengine.get_pending_deferred(deferredID, block)
115 119 try:
116 120 callback = self._deferredIDCallbacks.pop(deferredID)
117 121 except KeyError:
118 122 callback = None
119 123 if callback is not None:
120 124 d.addCallback(callback[0], *callback[1], **callback[2])
121 125 return d
122 126
123 127 @packageResult
124 128 def remote_clear_pending_deferreds(self):
125 129 return defer.maybeDeferred(self.smultiengine.clear_pending_deferreds)
126 130
127 131 def _addDeferredIDCallback(self, did, callback, *args, **kwargs):
128 132 self._deferredIDCallbacks[did] = (callback, args, kwargs)
129 133 return did
130 134
131 135 #---------------------------------------------------------------------------
132 136 # IEngineMultiplexer related methods
133 137 #---------------------------------------------------------------------------
134 138
135 139 @packageResult
136 140 def remote_execute(self, lines, targets, block):
137 141 return self.smultiengine.execute(lines, targets=targets, block=block)
138 142
139 143 @packageResult
140 144 def remote_push(self, binaryNS, targets, block):
141 145 try:
142 146 namespace = pickle.loads(binaryNS)
143 147 except:
144 148 d = defer.fail(failure.Failure())
145 149 else:
146 150 d = self.smultiengine.push(namespace, targets=targets, block=block)
147 151 return d
148 152
149 153 @packageResult
150 154 def remote_pull(self, keys, targets, block):
151 155 d = self.smultiengine.pull(keys, targets=targets, block=block)
152 156 return d
153 157
154 158 @packageResult
155 159 def remote_push_function(self, binaryNS, targets, block):
156 160 try:
157 161 namespace = pickle.loads(binaryNS)
158 162 except:
159 163 d = defer.fail(failure.Failure())
160 164 else:
161 165 namespace = uncanDict(namespace)
162 166 d = self.smultiengine.push_function(namespace, targets=targets, block=block)
163 167 return d
164 168
165 169 def _canMultipleKeys(self, result):
166 170 return [canSequence(r) for r in result]
167 171
168 172 @packageResult
169 173 def remote_pull_function(self, keys, targets, block):
170 174 def can_functions(r, keys):
171 175 if len(keys)==1 or isinstance(keys, str):
172 176 result = canSequence(r)
173 177 elif len(keys)>1:
174 178 result = [canSequence(s) for s in r]
175 179 return result
176 180 d = self.smultiengine.pull_function(keys, targets=targets, block=block)
177 181 if block:
178 182 d.addCallback(can_functions, keys)
179 183 else:
180 184 d.addCallback(lambda did: self._addDeferredIDCallback(did, can_functions, keys))
181 185 return d
182 186
183 187 @packageResult
184 188 def remote_push_serialized(self, binaryNS, targets, block):
185 189 try:
186 190 namespace = pickle.loads(binaryNS)
187 191 except:
188 192 d = defer.fail(failure.Failure())
189 193 else:
190 194 d = self.smultiengine.push_serialized(namespace, targets=targets, block=block)
191 195 return d
192 196
193 197 @packageResult
194 198 def remote_pull_serialized(self, keys, targets, block):
195 199 d = self.smultiengine.pull_serialized(keys, targets=targets, block=block)
196 200 return d
197 201
198 202 @packageResult
199 203 def remote_get_result(self, i, targets, block):
200 204 if i == 'None':
201 205 i = None
202 206 return self.smultiengine.get_result(i, targets=targets, block=block)
203 207
204 208 @packageResult
205 209 def remote_reset(self, targets, block):
206 210 return self.smultiengine.reset(targets=targets, block=block)
207 211
208 212 @packageResult
209 213 def remote_keys(self, targets, block):
210 214 return self.smultiengine.keys(targets=targets, block=block)
211 215
212 216 @packageResult
213 217 def remote_kill(self, controller, targets, block):
214 218 return self.smultiengine.kill(controller, targets=targets, block=block)
215 219
216 220 @packageResult
217 221 def remote_clear_queue(self, targets, block):
218 222 return self.smultiengine.clear_queue(targets=targets, block=block)
219 223
220 224 @packageResult
221 225 def remote_queue_status(self, targets, block):
222 226 return self.smultiengine.queue_status(targets=targets, block=block)
223 227
224 228 @packageResult
225 229 def remote_set_properties(self, binaryNS, targets, block):
226 230 try:
227 231 ns = pickle.loads(binaryNS)
228 232 except:
229 233 d = defer.fail(failure.Failure())
230 234 else:
231 235 d = self.smultiengine.set_properties(ns, targets=targets, block=block)
232 236 return d
233 237
234 238 @packageResult
235 239 def remote_get_properties(self, keys, targets, block):
236 240 if keys=='None':
237 241 keys=None
238 242 return self.smultiengine.get_properties(keys, targets=targets, block=block)
239 243
240 244 @packageResult
241 245 def remote_has_properties(self, keys, targets, block):
242 246 return self.smultiengine.has_properties(keys, targets=targets, block=block)
243 247
244 248 @packageResult
245 249 def remote_del_properties(self, keys, targets, block):
246 250 return self.smultiengine.del_properties(keys, targets=targets, block=block)
247 251
248 252 @packageResult
249 253 def remote_clear_properties(self, targets, block):
250 254 return self.smultiengine.clear_properties(targets=targets, block=block)
251 255
252 256 #---------------------------------------------------------------------------
253 257 # IMultiEngine related methods
254 258 #---------------------------------------------------------------------------
255 259
256 260 def remote_get_ids(self):
257 261 """Get the ids of the registered engines.
258 262
259 263 This method always blocks.
260 264 """
261 265 return self.smultiengine.get_ids()
262 266
263 267 #---------------------------------------------------------------------------
264 268 # IFCClientInterfaceProvider related methods
265 269 #---------------------------------------------------------------------------
266 270
267 271 def remote_get_client_name(self):
268 272 return 'IPython.kernel.multienginefc.FCFullSynchronousMultiEngineClient'
269 273
270 274
271 275 # The __init__ method of `FCMultiEngineFromMultiEngine` first adapts the
272 276 # `IMultiEngine` to `ISynchronousMultiEngine` so this is actually doing a
273 277 # two phase adaptation.
274 278 components.registerAdapter(FCSynchronousMultiEngineFromMultiEngine,
275 279 IMultiEngine, IFCSynchronousMultiEngine)
276 280
277 281
278 282 #-------------------------------------------------------------------------------
279 283 # The Client side of things
280 284 #-------------------------------------------------------------------------------
281 285
282 286
283 287 class FCFullSynchronousMultiEngineClient(object):
284 288
285 implements(IFullSynchronousMultiEngine, IBlockingClientAdaptor)
289 implements(
290 IFullSynchronousMultiEngine,
291 IBlockingClientAdaptor,
292 IMultiEngineMapperFactory,
293 IMapper
294 )
286 295
287 296 def __init__(self, remote_reference):
288 297 self.remote_reference = remote_reference
289 298 self._deferredIDCallbacks = {}
290 299 # This class manages some pending deferreds through this instance. This
291 300 # is required for methods like gather/scatter as it enables us to
292 301 # create our own pending deferreds for composite operations.
293 302 self.pdm = PendingDeferredManager()
294 303
295 304 #---------------------------------------------------------------------------
296 305 # Non interface methods
297 306 #---------------------------------------------------------------------------
298 307
299 308 def unpackage(self, r):
300 309 return pickle.loads(r)
301 310
302 311 #---------------------------------------------------------------------------
303 312 # Things related to PendingDeferredManager
304 313 #---------------------------------------------------------------------------
305 314
306 315 def get_pending_deferred(self, deferredID, block=True):
307 316
308 317 # Because we are managing some pending deferreds locally (through
309 318 # self.pdm) and some remotely (on the controller), we first try the
310 319 # local one and then the remote one.
311 320 if self.pdm.quick_has_id(deferredID):
312 321 d = self.pdm.get_pending_deferred(deferredID, block)
313 322 return d
314 323 else:
315 324 d = self.remote_reference.callRemote('get_pending_deferred', deferredID, block)
316 325 d.addCallback(self.unpackage)
317 326 try:
318 327 callback = self._deferredIDCallbacks.pop(deferredID)
319 328 except KeyError:
320 329 callback = None
321 330 if callback is not None:
322 331 d.addCallback(callback[0], *callback[1], **callback[2])
323 332 return d
324 333
325 334 def clear_pending_deferreds(self):
326 335
327 336 # This clear both the local (self.pdm) and remote pending deferreds
328 337 self.pdm.clear_pending_deferreds()
329 338 d2 = self.remote_reference.callRemote('clear_pending_deferreds')
330 339 d2.addCallback(self.unpackage)
331 340 return d2
332 341
333 342 def _addDeferredIDCallback(self, did, callback, *args, **kwargs):
334 343 self._deferredIDCallbacks[did] = (callback, args, kwargs)
335 344 return did
336 345
337 346 #---------------------------------------------------------------------------
338 347 # IEngineMultiplexer related methods
339 348 #---------------------------------------------------------------------------
340 349
341 350 def execute(self, lines, targets='all', block=True):
342 351 d = self.remote_reference.callRemote('execute', lines, targets, block)
343 352 d.addCallback(self.unpackage)
344 353 return d
345 354
346 355 def push(self, namespace, targets='all', block=True):
347 356 serial = pickle.dumps(namespace, 2)
348 357 d = self.remote_reference.callRemote('push', serial, targets, block)
349 358 d.addCallback(self.unpackage)
350 359 return d
351 360
352 361 def pull(self, keys, targets='all', block=True):
353 362 d = self.remote_reference.callRemote('pull', keys, targets, block)
354 363 d.addCallback(self.unpackage)
355 364 return d
356 365
357 366 def push_function(self, namespace, targets='all', block=True):
358 367 cannedNamespace = canDict(namespace)
359 368 serial = pickle.dumps(cannedNamespace, 2)
360 369 d = self.remote_reference.callRemote('push_function', serial, targets, block)
361 370 d.addCallback(self.unpackage)
362 371 return d
363 372
364 373 def pull_function(self, keys, targets='all', block=True):
365 374 def uncan_functions(r, keys):
366 375 if len(keys)==1 or isinstance(keys, str):
367 376 return uncanSequence(r)
368 377 elif len(keys)>1:
369 378 return [uncanSequence(s) for s in r]
370 379 d = self.remote_reference.callRemote('pull_function', keys, targets, block)
371 380 if block:
372 381 d.addCallback(self.unpackage)
373 382 d.addCallback(uncan_functions, keys)
374 383 else:
375 384 d.addCallback(self.unpackage)
376 385 d.addCallback(lambda did: self._addDeferredIDCallback(did, uncan_functions, keys))
377 386 return d
378 387
379 388 def push_serialized(self, namespace, targets='all', block=True):
380 389 cannedNamespace = canDict(namespace)
381 390 serial = pickle.dumps(cannedNamespace, 2)
382 391 d = self.remote_reference.callRemote('push_serialized', serial, targets, block)
383 392 d.addCallback(self.unpackage)
384 393 return d
385 394
386 395 def pull_serialized(self, keys, targets='all', block=True):
387 396 d = self.remote_reference.callRemote('pull_serialized', keys, targets, block)
388 397 d.addCallback(self.unpackage)
389 398 return d
390 399
391 400 def get_result(self, i=None, targets='all', block=True):
392 401 if i is None: # This is because None cannot be marshalled by xml-rpc
393 402 i = 'None'
394 403 d = self.remote_reference.callRemote('get_result', i, targets, block)
395 404 d.addCallback(self.unpackage)
396 405 return d
397 406
398 407 def reset(self, targets='all', block=True):
399 408 d = self.remote_reference.callRemote('reset', targets, block)
400 409 d.addCallback(self.unpackage)
401 410 return d
402 411
403 412 def keys(self, targets='all', block=True):
404 413 d = self.remote_reference.callRemote('keys', targets, block)
405 414 d.addCallback(self.unpackage)
406 415 return d
407 416
408 417 def kill(self, controller=False, targets='all', block=True):
409 418 d = self.remote_reference.callRemote('kill', controller, targets, block)
410 419 d.addCallback(self.unpackage)
411 420 return d
412 421
413 422 def clear_queue(self, targets='all', block=True):
414 423 d = self.remote_reference.callRemote('clear_queue', targets, block)
415 424 d.addCallback(self.unpackage)
416 425 return d
417 426
418 427 def queue_status(self, targets='all', block=True):
419 428 d = self.remote_reference.callRemote('queue_status', targets, block)
420 429 d.addCallback(self.unpackage)
421 430 return d
422 431
423 432 def set_properties(self, properties, targets='all', block=True):
424 433 serial = pickle.dumps(properties, 2)
425 434 d = self.remote_reference.callRemote('set_properties', serial, targets, block)
426 435 d.addCallback(self.unpackage)
427 436 return d
428 437
429 438 def get_properties(self, keys=None, targets='all', block=True):
430 439 if keys==None:
431 440 keys='None'
432 441 d = self.remote_reference.callRemote('get_properties', keys, targets, block)
433 442 d.addCallback(self.unpackage)
434 443 return d
435 444
436 445 def has_properties(self, keys, targets='all', block=True):
437 446 d = self.remote_reference.callRemote('has_properties', keys, targets, block)
438 447 d.addCallback(self.unpackage)
439 448 return d
440 449
441 450 def del_properties(self, keys, targets='all', block=True):
442 451 d = self.remote_reference.callRemote('del_properties', keys, targets, block)
443 452 d.addCallback(self.unpackage)
444 453 return d
445 454
446 455 def clear_properties(self, targets='all', block=True):
447 456 d = self.remote_reference.callRemote('clear_properties', targets, block)
448 457 d.addCallback(self.unpackage)
449 458 return d
450 459
451 460 #---------------------------------------------------------------------------
452 461 # IMultiEngine related methods
453 462 #---------------------------------------------------------------------------
454 463
455 464 def get_ids(self):
456 465 d = self.remote_reference.callRemote('get_ids')
457 466 return d
458 467
459 468 #---------------------------------------------------------------------------
460 469 # ISynchronousMultiEngineCoordinator related methods
461 470 #---------------------------------------------------------------------------
462 471
463 472 def _process_targets(self, targets):
464 473 def create_targets(ids):
465 474 if isinstance(targets, int):
466 475 engines = [targets]
467 476 elif targets=='all':
468 477 engines = ids
469 478 elif isinstance(targets, (list, tuple)):
470 479 engines = targets
471 480 for t in engines:
472 481 if not t in ids:
473 482 raise error.InvalidEngineID("engine with id %r does not exist"%t)
474 483 return engines
475 484
476 485 d = self.get_ids()
477 486 d.addCallback(create_targets)
478 487 return d
479 488
480 489 def scatter(self, key, seq, dist='b', flatten=False, targets='all', block=True):
481 490
482 491 # Note: scatter and gather handle pending deferreds locally through self.pdm.
483 492 # This enables us to collect a bunch fo deferred ids and make a secondary
484 493 # deferred id that corresponds to the entire group. This logic is extremely
485 494 # difficult to get right though.
486 495 def do_scatter(engines):
487 496 nEngines = len(engines)
488 497 mapClass = Map.dists[dist]
489 498 mapObject = mapClass()
490 499 d_list = []
491 500 # Loop through and push to each engine in non-blocking mode.
492 501 # This returns a set of deferreds to deferred_ids
493 502 for index, engineid in enumerate(engines):
494 503 partition = mapObject.getPartition(seq, index, nEngines)
495 504 if flatten and len(partition) == 1:
496 505 d = self.push({key: partition[0]}, targets=engineid, block=False)
497 506 else:
498 507 d = self.push({key: partition}, targets=engineid, block=False)
499 508 d_list.append(d)
500 509 # Collect the deferred to deferred_ids
501 510 d = gatherBoth(d_list,
502 511 fireOnOneErrback=0,
503 512 consumeErrors=1,
504 513 logErrors=0)
505 514 # Now d has a list of deferred_ids or Failures coming
506 515 d.addCallback(error.collect_exceptions, 'scatter')
507 516 def process_did_list(did_list):
508 517 """Turn a list of deferred_ids into a final result or failure."""
509 518 new_d_list = [self.get_pending_deferred(did, True) for did in did_list]
510 519 final_d = gatherBoth(new_d_list,
511 520 fireOnOneErrback=0,
512 521 consumeErrors=1,
513 522 logErrors=0)
514 523 final_d.addCallback(error.collect_exceptions, 'scatter')
515 524 final_d.addCallback(lambda lop: [i[0] for i in lop])
516 525 return final_d
517 526 # Now, depending on block, we need to handle the list deferred_ids
518 527 # coming down the pipe diferently.
519 528 if block:
520 529 # If we are blocking register a callback that will transform the
521 530 # list of deferred_ids into the final result.
522 531 d.addCallback(process_did_list)
523 532 return d
524 533 else:
525 534 # Here we are going to use a _local_ PendingDeferredManager.
526 535 deferred_id = self.pdm.get_deferred_id()
527 536 # This is the deferred we will return to the user that will fire
528 537 # with the local deferred_id AFTER we have received the list of
529 538 # primary deferred_ids
530 539 d_to_return = defer.Deferred()
531 540 def do_it(did_list):
532 541 """Produce a deferred to the final result, but first fire the
533 542 deferred we will return to the user that has the local
534 543 deferred id."""
535 544 d_to_return.callback(deferred_id)
536 545 return process_did_list(did_list)
537 546 d.addCallback(do_it)
538 547 # Now save the deferred to the final result
539 548 self.pdm.save_pending_deferred(d, deferred_id)
540 549 return d_to_return
541 550
542 551 d = self._process_targets(targets)
543 552 d.addCallback(do_scatter)
544 553 return d
545 554
546 555 def gather(self, key, dist='b', targets='all', block=True):
547 556
548 557 # Note: scatter and gather handle pending deferreds locally through self.pdm.
549 558 # This enables us to collect a bunch fo deferred ids and make a secondary
550 559 # deferred id that corresponds to the entire group. This logic is extremely
551 560 # difficult to get right though.
552 561 def do_gather(engines):
553 562 nEngines = len(engines)
554 563 mapClass = Map.dists[dist]
555 564 mapObject = mapClass()
556 565 d_list = []
557 566 # Loop through and push to each engine in non-blocking mode.
558 567 # This returns a set of deferreds to deferred_ids
559 568 for index, engineid in enumerate(engines):
560 569 d = self.pull(key, targets=engineid, block=False)
561 570 d_list.append(d)
562 571 # Collect the deferred to deferred_ids
563 572 d = gatherBoth(d_list,
564 573 fireOnOneErrback=0,
565 574 consumeErrors=1,
566 575 logErrors=0)
567 576 # Now d has a list of deferred_ids or Failures coming
568 577 d.addCallback(error.collect_exceptions, 'scatter')
569 578 def process_did_list(did_list):
570 579 """Turn a list of deferred_ids into a final result or failure."""
571 580 new_d_list = [self.get_pending_deferred(did, True) for did in did_list]
572 581 final_d = gatherBoth(new_d_list,
573 582 fireOnOneErrback=0,
574 583 consumeErrors=1,
575 584 logErrors=0)
576 585 final_d.addCallback(error.collect_exceptions, 'gather')
577 586 final_d.addCallback(lambda lop: [i[0] for i in lop])
578 587 final_d.addCallback(mapObject.joinPartitions)
579 588 return final_d
580 589 # Now, depending on block, we need to handle the list deferred_ids
581 590 # coming down the pipe diferently.
582 591 if block:
583 592 # If we are blocking register a callback that will transform the
584 593 # list of deferred_ids into the final result.
585 594 d.addCallback(process_did_list)
586 595 return d
587 596 else:
588 597 # Here we are going to use a _local_ PendingDeferredManager.
589 598 deferred_id = self.pdm.get_deferred_id()
590 599 # This is the deferred we will return to the user that will fire
591 600 # with the local deferred_id AFTER we have received the list of
592 601 # primary deferred_ids
593 602 d_to_return = defer.Deferred()
594 603 def do_it(did_list):
595 604 """Produce a deferred to the final result, but first fire the
596 605 deferred we will return to the user that has the local
597 606 deferred id."""
598 607 d_to_return.callback(deferred_id)
599 608 return process_did_list(did_list)
600 609 d.addCallback(do_it)
601 610 # Now save the deferred to the final result
602 611 self.pdm.save_pending_deferred(d, deferred_id)
603 612 return d_to_return
604 613
605 614 d = self._process_targets(targets)
606 615 d.addCallback(do_gather)
607 616 return d
608 617
609 def _map(self, func, sequences, dist='b', targets='all', block=True):
618 def raw_map(self, func, sequences, dist='b', targets='all', block=True):
610 619 """
611 Call a callable on elements of a sequence.
620 A parallelized version of Python's builtin map.
621
622 This has a slightly different syntax than the builtin `map`.
623 This is needed because we need to have keyword arguments and thus
624 can't use *args to capture all the sequences. Instead, they must
625 be passed in a list or tuple.
626
627 raw_map(func, seqs) -> map(func, seqs[0], seqs[1], ...)
628
629 Most users will want to use parallel functions or the `mapper`
630 and `map` methods for an API that follows that of the builtin
631 `map`.
612 632 """
613 633 if not isinstance(sequences, (list, tuple)):
614 634 raise TypeError('sequences must be a list or tuple')
615 635 max_len = max(len(s) for s in sequences)
616 636 for s in sequences:
617 637 if len(s)!=max_len:
618 638 raise ValueError('all sequences must have equal length')
619 639 if isinstance(func, FunctionType):
620 640 d = self.push_function(dict(_ipython_map_func=func), targets=targets, block=False)
621 641 d.addCallback(lambda did: self.get_pending_deferred(did, True))
622 642 sourceToRun = '_ipython_map_seq_result = map(_ipython_map_func, *zip(*_ipython_map_seq))'
623 643 elif isinstance(func, str):
624 644 d = defer.succeed(None)
625 645 sourceToRun = \
626 646 '_ipython_map_seq_result = map(%s, *zip(*_ipython_map_seq))' % func
627 647 else:
628 648 raise TypeError("func must be a function or str")
629 649
630 650 d.addCallback(lambda _: self.scatter('_ipython_map_seq', zip(*sequences), dist, targets=targets))
631 651 d.addCallback(lambda _: self.execute(sourceToRun, targets=targets, block=False))
632 652 d.addCallback(lambda did: self.get_pending_deferred(did, True))
633 653 d.addCallback(lambda _: self.gather('_ipython_map_seq_result', dist, targets=targets, block=block))
634 654 return d
635 655
636 656 def map(self, func, *sequences):
637 return self.mapper()(func, *sequences)
657 """
658 A parallel version of Python's builtin `map` function.
659
660 This method applies a function to sequences of arguments. It
661 follows the same syntax as the builtin `map`.
662
663 This method creates a mapper objects by calling `self.mapper` with
664 no arguments and then uses that mapper to do the mapping. See
665 the documentation of `mapper` for more details.
666 """
667 return self.mapper().map(func, *sequences)
638 668
639 669 def mapper(self, dist='b', targets='all', block=True):
640 return Mapper(self, dist, targets, block)
670 """
671 Create a mapper object that has a `map` method.
672
673 This method returns an object that implements the `IMapper`
674 interface. This method is a factory that is used to control how
675 the map happens.
676
677 :Parameters:
678 dist : str
679 What decomposition to use, 'b' is the only one supported
680 currently
681 targets : str, int, sequence of ints
682 Which engines to use for the map
683 block : boolean
684 Should calls to `map` block or not
685 """
686 return MultiEngineMapper(self, dist, targets, block)
641 687
642 688 def parallel(self, dist='b', targets='all', block=True):
643 pf = ParallelFunction(self, dist=dist, targets=targets, block=True)
689 """
690 A decorator that turns a function into a parallel function.
691
692 This can be used as:
693
694 @parallel()
695 def f(x, y)
696 ...
697
698 f(range(10), range(10))
699
700 This causes f(0,0), f(1,1), ... to be called in parallel.
701
702 :Parameters:
703 dist : str
704 What decomposition to use, 'b' is the only one supported
705 currently
706 targets : str, int, sequence of ints
707 Which engines to use for the map
708 block : boolean
709 Should calls to `map` block or not
710 """
711 mapper = self.mapper(dist, targets, block)
712 pf = ParallelFunction(mapper)
644 713 return pf
645 714
646 715 #---------------------------------------------------------------------------
647 716 # ISynchronousMultiEngineExtras related methods
648 717 #---------------------------------------------------------------------------
649 718
650 719 def _transformPullResult(self, pushResult, multitargets, lenKeys):
651 720 if not multitargets:
652 721 result = pushResult[0]
653 722 elif lenKeys > 1:
654 723 result = zip(*pushResult)
655 724 elif lenKeys is 1:
656 725 result = list(pushResult)
657 726 return result
658 727
659 728 def zip_pull(self, keys, targets='all', block=True):
660 729 multitargets = not isinstance(targets, int) and len(targets) > 1
661 730 lenKeys = len(keys)
662 731 d = self.pull(keys, targets=targets, block=block)
663 732 if block:
664 733 d.addCallback(self._transformPullResult, multitargets, lenKeys)
665 734 else:
666 735 d.addCallback(lambda did: self._addDeferredIDCallback(did, self._transformPullResult, multitargets, lenKeys))
667 736 return d
668 737
669 738 def run(self, fname, targets='all', block=True):
670 739 fileobj = open(fname,'r')
671 740 source = fileobj.read()
672 741 fileobj.close()
673 742 # if the compilation blows, we get a local error right away
674 743 try:
675 744 code = compile(source,fname,'exec')
676 745 except:
677 746 return defer.fail(failure.Failure())
678 747 # Now run the code
679 748 d = self.execute(source, targets=targets, block=block)
680 749 return d
681 750
682 751 #---------------------------------------------------------------------------
683 752 # IBlockingClientAdaptor related methods
684 753 #---------------------------------------------------------------------------
685 754
686 755 def adapt_to_blocking_client(self):
687 756 from IPython.kernel.multiengineclient import IFullBlockingMultiEngineClient
688 757 return IFullBlockingMultiEngineClient(self)
@@ -1,47 +1,107 b''
1 1 # encoding: utf-8
2 2
3 3 """A parallelized function that does scatter/execute/gather."""
4 4
5 5 __docformat__ = "restructuredtext en"
6 6
7 7 #-------------------------------------------------------------------------------
8 8 # Copyright (C) 2008 The IPython Development Team
9 9 #
10 10 # Distributed under the terms of the BSD License. The full license is in
11 11 # the file COPYING, distributed as part of this software.
12 12 #-------------------------------------------------------------------------------
13 13
14 14 #-------------------------------------------------------------------------------
15 15 # Imports
16 16 #-------------------------------------------------------------------------------
17 17
18 18 from types import FunctionType
19 19 from zope.interface import Interface, implements
20 20
21 21
22 class IMultiEngineParallelDecorator(Interface):
23 """A decorator that creates a parallel function."""
24
25 def parallel(dist='b', targets=None, block=None):
26 """
27 A decorator that turns a function into a parallel function.
28
29 This can be used as:
30
31 @parallel()
32 def f(x, y)
33 ...
34
35 f(range(10), range(10))
36
37 This causes f(0,0), f(1,1), ... to be called in parallel.
38
39 :Parameters:
40 dist : str
41 What decomposition to use, 'b' is the only one supported
42 currently
43 targets : str, int, sequence of ints
44 Which engines to use for the map
45 block : boolean
46 Should calls to `map` block or not
47 """
48
49 class ITaskParallelDecorator(Interface):
50 """A decorator that creates a parallel function."""
51
52 def parallel(clear_before=False, clear_after=False, retries=0,
53 recovery_task=None, depend=None, block=True):
54 """
55 A decorator that turns a function into a parallel function.
56
57 This can be used as:
58
59 @parallel()
60 def f(x, y)
61 ...
62
63 f(range(10), range(10))
64
65 This causes f(0,0), f(1,1), ... to be called in parallel.
66
67 See the documentation for `IPython.kernel.task.BaseTask` for
68 documentation on the arguments to this method.
69 """
70
71 class IParallelFunction(Interface):
72 pass
73
22 74 class ParallelFunction(object):
23 75 """
24 A decorator for building parallel functions.
76 The implementation of a parallel function.
77
78 A parallel function is similar to Python's map function:
79
80 map(func, *sequences) -> pfunc(*sequences)
81
82 Parallel functions should be created by using the @parallel decorator.
25 83 """
26 84
27 def __init__(self, multiengine, dist='b', targets='all', block=True):
85 implements(IParallelFunction)
86
87 def __init__(self, mapper):
28 88 """
29 Create a `ParallelFunction decorator`.
89 Create a parallel function from an `IMapper`.
90
91 :Parameters:
92 mapper : an `IMapper` implementer.
93 The mapper to use for the parallel function
30 94 """
31 self.multiengine = multiengine
32 self.dist = dist
33 self.targets = targets
34 self.block = block
95 self.mapper = mapper
35 96
36 97 def __call__(self, func):
37 98 """
38 Decorate the function to make it run in parallel.
99 Decorate a function to make it run in parallel.
39 100 """
40 101 assert isinstance(func, (str, FunctionType)), "func must be a fuction or str"
41 102 self.func = func
42 103 def call_function(*sequences):
43 return self.multiengine._map(self.func, sequences, dist=self.dist,
44 targets=self.targets, block=self.block)
104 return self.mapper.map(self.func, *sequences)
45 105 return call_function
46 106
47 107 No newline at end of file
This diff has been collapsed as it changes many lines, (693 lines changed) Show them Hide them
@@ -1,830 +1,1113 b''
1 1 # encoding: utf-8
2 2 # -*- test-case-name: IPython.kernel.tests.test_task -*-
3 3
4 4 """Task farming representation of the ControllerService."""
5 5
6 6 __docformat__ = "restructuredtext en"
7 7
8 #-------------------------------------------------------------------------------
8 #-----------------------------------------------------------------------------
9 9 # Copyright (C) 2008 The IPython Development Team
10 10 #
11 11 # Distributed under the terms of the BSD License. The full license is in
12 12 # the file COPYING, distributed as part of this software.
13 #-------------------------------------------------------------------------------
13 #-----------------------------------------------------------------------------
14 14
15 #-------------------------------------------------------------------------------
15 #-----------------------------------------------------------------------------
16 16 # Imports
17 #-------------------------------------------------------------------------------
17 #-----------------------------------------------------------------------------
18 18
19 19 import copy, time
20 20 from types import FunctionType
21 21
22 22 import zope.interface as zi, string
23 23 from twisted.internet import defer, reactor
24 24 from twisted.python import components, log, failure
25 25
26 # from IPython.genutils import time
27
26 from IPython.kernel.util import printer
28 27 from IPython.kernel import engineservice as es, error
29 28 from IPython.kernel import controllerservice as cs
30 29 from IPython.kernel.twistedutil import gatherBoth, DeferredList
31 30
32 from IPython.kernel.pickleutil import can,uncan, CannedFunction
33
34 def can_task(task):
35 t = copy.copy(task)
36 t.depend = can(t.depend)
37 t.expression = can(t.expression)
38 if t.recovery_task:
39 t.recovery_task = can_task(t.recovery_task)
40 return t
31 from IPython.kernel.pickleutil import can, uncan, CannedFunction
41 32
42 def uncan_task(task):
43 t = copy.copy(task)
44 t.depend = uncan(t.depend)
45 t.expression = uncan(t.expression)
46 if t.recovery_task and t.recovery_task is not task:
47 t.recovery_task = uncan_task(t.recovery_task)
48 return t
33 #-----------------------------------------------------------------------------
34 # Definition of the Task objects
35 #-----------------------------------------------------------------------------
49 36
50 37 time_format = '%Y/%m/%d %H:%M:%S'
51 38
52 class Task(object):
53 """Our representation of a task for the `TaskController` interface.
54
55 The user should create instances of this class to represent a task that
56 needs to be done.
57
58 :Parameters:
59 expression : str
60 A str that is valid python code that is the task.
61 pull : str or list of str
62 The names of objects to be pulled as results. If not specified,
63 will return {'result', None}
64 push : dict
65 A dict of objects to be pushed into the engines namespace before
66 execution of the expression.
67 clear_before : boolean
68 Should the engine's namespace be cleared before the task is run.
69 Default=False.
70 clear_after : boolean
71 Should the engine's namespace be cleared after the task is run.
72 Default=False.
73 retries : int
74 The number of times to resumbit the task if it fails. Default=0.
75 recovery_task : Task
76 This is the Task to be run when the task has exhausted its retries
77 Default=None.
78 depend : bool function(properties)
79 This is the dependency function for the Task, which determines
80 whether a task can be run on a Worker. `depend` is called with
81 one argument, the worker's properties dict, and should return
82 True if the worker meets the dependencies or False if it does
83 not.
84 Default=None - run on any worker
85 options : dict
86 Any other keyword options for more elaborate uses of tasks
87
88 Examples
89 --------
39 class ITask(zi.Interface):
40 """
41 This interface provides a generic definition of what constitutes a task.
42
43 There are two sides to a task. First a task needs to take input from
44 a user to determine what work is performed by the task. Second, the
45 task needs to have the logic that knows how to turn that information
46 info specific calls to a worker, through the `IQueuedEngine` interface.
90 47
91 >>> t = Task('dostuff(args)')
92 >>> t = Task('a=5', pull='a')
93 >>> t = Task('a=5\nb=4', pull=['a','b'])
94 >>> t = Task('os.kill(os.getpid(),9)', retries=100) # this is a bad idea
95 # A dependency case:
96 >>> def hasMPI(props):
97 ... return props.get('mpi') is not None
98 >>> t = Task('mpi.send(blah,blah)', depend = hasMPI)
48 Many method in this class get two things passed to them: a Deferred
49 and an IQueuedEngine implementer. Such methods should register callbacks
50 on the Deferred that use the IQueuedEngine to accomplish something. See
51 the existing task objects for examples.
99 52 """
100 53
101 def __init__(self, expression, args=None, kwargs=None, pull=None, push=None,
102 clear_before=False, clear_after=False, retries=0,
103 recovery_task=None, depend=None, **options):
104 self.expression = expression
54 zi.Attribute('retries','How many times to retry the task')
55 zi.Attribute('recovery_task','A task to try if the initial one fails')
56 zi.Attribute('taskid','the id of the task')
57
58 def start_time(result):
59 """
60 Do anything needed to start the timing of the task.
61
62 Must simply return the result after starting the timers.
63 """
64
65 def stop_time(result):
66 """
67 Do anything needed to stop the timing of the task.
68
69 Must simply return the result after stopping the timers. This
70 method will usually set attributes that are used by `process_result`
71 in building result of the task.
72 """
73
74 def pre_task(d, queued_engine):
75 """Do something with the queued_engine before the task is run.
76
77 This method should simply add callbacks to the input Deferred
78 that do something with the `queued_engine` before the task is run.
79
80 :Parameters:
81 d : Deferred
82 The deferred that actions should be attached to
83 queued_engine : IQueuedEngine implementer
84 The worker that has been allocated to perform the task
85 """
86
87 def post_task(d, queued_engine):
88 """Do something with the queued_engine after the task is run.
89
90 This method should simply add callbacks to the input Deferred
91 that do something with the `queued_engine` before the task is run.
92
93 :Parameters:
94 d : Deferred
95 The deferred that actions should be attached to
96 queued_engine : IQueuedEngine implementer
97 The worker that has been allocated to perform the task
98 """
99
100 def submit_task(d, queued_engine):
101 """Submit a task using the `queued_engine` we have been allocated.
102
103 When a task is ready to run, this method is called. This method
104 must take the internal information of the task and make suitable
105 calls on the queued_engine to have the actual work done.
106
107 This method should simply add callbacks to the input Deferred
108 that do something with the `queued_engine` before the task is run.
109
110 :Parameters:
111 d : Deferred
112 The deferred that actions should be attached to
113 queued_engine : IQueuedEngine implementer
114 The worker that has been allocated to perform the task
115 """
116
117 def process_result(d, result, engine_id):
118 """Take a raw task result.
119
120 Objects that implement `ITask` can choose how the result of running
121 the task is presented. This method takes the raw result and
122 does this logic. Two example are the `MapTask` which simply returns
123 the raw result or a `Failure` object and the `StringTask` which
124 returns a `TaskResult` object.
125
126 :Parameters:
127 d : Deferred
128 The deferred that actions should be attached to
129 result : object
130 The raw task result that needs to be wrapped
131 engine_id : int
132 The id of the engine that did the task
133
134 :Returns:
135 The result, as a tuple of the form: (success, result).
136 Here, success is a boolean indicating if the task
137 succeeded or failed and result is the result.
138 """
139
    def check_depend(properties):
        """Check properties to see if the task should be run.

        Called by the scheduler with an engine's properties dict before
        assigning the task to that engine.

        :Parameters:
            properties : dict
                A dictionary of properties that an engine has set

        :Returns:
            True if the task should be run, False otherwise
        """
150
151 def can_task(self):
152 """Serialize (can) any functions in the task for pickling.
153
154 Subclasses must override this method and make sure that all
155 functions in the task are canned by calling `can` on the
156 function.
157 """
158
159 def uncan_task(self):
160 """Unserialize (uncan) any canned function in the task."""
161
class BaseTask(object):
    """
    Common functionality for all objects implementing `ITask`.
    """

    zi.implements(ITask)

    def __init__(self, clear_before=False, clear_after=False, retries=0,
            recovery_task=None, depend=None):
        """
        Make a generic task.

        :Parameters:
            clear_before : boolean
                Should the engines namespace be cleared before the task
                is run
            clear_after : boolean
                Should the engines namespace be cleared after the task is run
            retries : int
                The number of times a task should be retried upon failure
            recovery_task : any task object
                If a task fails and it has a recovery_task, that is run
                upon a retry
            depend : FunctionType
                A function that is called to test for properties. This function
                must take one argument, the properties dict and return a boolean
        """
        self.clear_before = clear_before
        self.clear_after = clear_after
        self.retries = retries
        self.recovery_task = recovery_task
        self.depend = depend
        self.taskid = None

    def start_time(self, result):
        """
        Start the basic timers.

        Used as a Deferred callback, so it passes `result` through.
        """
        self.start = time.time()
        self.start_struct = time.localtime()
        return result

    def stop_time(self, result):
        """
        Stop the basic timers and compute the formatted time stamps.

        Used as a Deferred callback/errback, so it passes `result` through.
        """
        self.stop = time.time()
        self.stop_struct = time.localtime()
        self.duration = self.stop - self.start
        self.submitted = time.strftime(time_format, self.start_struct)
        # BUG FIX: previously this called time.strftime(time_format) with no
        # time struct, implicitly re-reading the clock; use the recorded
        # stop_struct so `completed` matches `stop` exactly.
        self.completed = time.strftime(time_format, self.stop_struct)
        return result

    def pre_task(self, d, queued_engine):
        """
        Clear the engine before running the task if clear_before is set.
        """
        if self.clear_before:
            d.addCallback(lambda r: queued_engine.reset())

    def post_task(self, d, queued_engine):
        """
        Clear the engine after running the task if clear_after is set.
        """
        def reseter(result):
            queued_engine.reset()
            return result
        if self.clear_after:
            # addBoth: reset on both success and failure paths.
            d.addBoth(reseter)

    def submit_task(self, d, queued_engine):
        raise NotImplementedError('submit_task must be implemented in a subclass')

    def process_result(self, result, engine_id):
        """
        Process a task result.

        This is the default `process_result` that just returns the raw
        result or a `Failure`.

        :Returns: a (success, result) tuple.
        """
        if isinstance(result, failure.Failure):
            return (False, result)
        else:
            return (True, result)

    def check_depend(self, properties):
        """
        Calls self.depend(properties) to see if a task should be run.

        With no dependency function set, every engine is acceptable.
        """
        if self.depend is not None:
            return self.depend(properties)
        else:
            return True

    def can_task(self):
        # Can (serialize) the dependency function and recurse into any
        # recovery task so the whole chain can be pickled.
        self.depend = can(self.depend)
        if isinstance(self.recovery_task, BaseTask):
            self.recovery_task.can_task()

    def uncan_task(self):
        self.depend = uncan(self.depend)
        if isinstance(self.recovery_task, BaseTask):
            self.recovery_task.uncan_task()
265
class MapTask(BaseTask):
    """
    A task that consists of a function and arguments.
    """

    zi.implements(ITask)

    def __init__(self, function, args=None, kwargs=None, clear_before=False,
            clear_after=False, retries=0, recovery_task=None, depend=None):
        """
        Create a task based on a function, args and kwargs.

        This is a simple type of task that consists of calling:
        function(*args, **kwargs) and wrapping the result in a `TaskResult`.

        The return value of the function, or a `Failure` wrapping an
        exception is the task result for this type of task.
        """
        BaseTask.__init__(self, clear_before, clear_after, retries,
            recovery_task, depend)
        if not isinstance(function, FunctionType):
            raise TypeError('a task function must be a FunctionType')
        self.function = function
        # Normalize and validate the positional arguments.
        if args is None:
            self.args = ()
        else:
            self.args = args
        if not isinstance(self.args, (list, tuple)):
            raise TypeError('a task args must be a list or tuple')
        # Normalize and validate the keyword arguments.
        if kwargs is None:
            self.kwargs = {}
        else:
            self.kwargs = kwargs
        if not isinstance(self.kwargs, dict):
            raise TypeError('a task kwargs must be a dict')

    def submit_task(self, d, queued_engine):
        """Chain the callbacks that make `queued_engine` perform the call."""
        # Ship the function, then its arguments, run the call remotely,
        # and finally pull the result back.
        ship_function = lambda _: queued_engine.push_function(
            dict(_ipython_task_function=self.function))
        ship_arguments = lambda _: queued_engine.push(
            dict(_ipython_task_args=self.args,_ipython_task_kwargs=self.kwargs))
        run_call = lambda _: queued_engine.execute(
            '_ipython_task_result = _ipython_task_function(*_ipython_task_args,**_ipython_task_kwargs)')
        fetch_result = lambda _: queued_engine.pull('_ipython_task_result')
        for step in (ship_function, ship_arguments, run_call, fetch_result):
            d.addCallback(step)

    def can_task(self):
        # Can (serialize) the wrapped function, then defer to the base class.
        self.function = can(self.function)
        BaseTask.can_task(self)

    def uncan_task(self):
        self.function = uncan(self.function)
        BaseTask.uncan_task(self)
321
322
class StringTask(BaseTask):
    """
    A task that consists of a string of Python code to run.
    """

    def __init__(self, expression, pull=None, push=None,
            clear_before=False, clear_after=False, retries=0,
            recovery_task=None, depend=None):
        """
        Create a task based on a Python expression and variables

        This type of task lets you push a set of variables to the engines
        namespace, run a Python string in that namespace and then bring back
        a different set of Python variables as the result.

        Because this type of task can return many results (through the
        `pull` keyword argument) it returns a special `TaskResult` object
        that wraps the pulled variables, statistics about the run and
        any exceptions raised.

        :Parameters:
            expression : str
                The code to execute on the engine
            pull : str or sequence of str
                Names to pull back from the engine namespace as the result
            push : dict
                Variables to push to the engine namespace before executing
        """
        if not isinstance(expression, str):
            raise TypeError('a task expression must be a string')
        self.expression = expression

        # Normalize `pull` to a tuple of names.
        # FIX: use `is None` (identity), not `== None`, to test the default.
        if pull is None:
            self.pull = ()
        elif isinstance(pull, str):
            self.pull = (pull,)
        elif isinstance(pull, (list, tuple)):
            self.pull = pull
        else:
            raise TypeError('pull must be str or a sequence of strs')

        if push is None:
            self.push = {}
        elif isinstance(push, dict):
            self.push = push
        else:
            raise TypeError('push must be a dict')

        BaseTask.__init__(self, clear_before, clear_after, retries,
            recovery_task, depend)

    def submit_task(self, d, queued_engine):
        # Push input variables, execute the expression, then pull the
        # requested names back as the raw result.
        if self.push is not None:
            d.addCallback(lambda r: queued_engine.push(self.push))

        d.addCallback(lambda r: queued_engine.execute(self.expression))

        if self.pull is not None:
            d.addCallback(lambda r: queued_engine.pull(self.pull))
        else:
            d.addCallback(lambda r: None)

    def process_result(self, result, engine_id):
        """
        Wrap the raw pulled values (or a Failure) in a `TaskResult`.

        :Returns: a (success, TaskResult) tuple.
        """
        if isinstance(result, failure.Failure):
            tr = TaskResult(result, engine_id)
        else:
            if self.pull is None:
                resultDict = {}
            elif len(self.pull) == 1:
                # A single pulled name maps directly to the raw result.
                resultDict = {self.pull[0]:result}
            else:
                resultDict = dict(zip(self.pull, result))
            tr = TaskResult(resultDict, engine_id)
        # Assign task attributes
        tr.submitted = self.submitted
        tr.completed = self.completed
        tr.duration = self.duration
        if hasattr(self,'taskid'):
            tr.taskid = self.taskid
        else:
            tr.taskid = None
        if isinstance(result, failure.Failure):
            return (False, tr)
        else:
            return (True, tr)
125 400
class ResultNS(object):
    """
    A dict like object for holding the results of a task.

    The result namespace object for use in `TaskResult` objects as tr.ns.
    It builds an object from a dictionary, such that it has attributes
    according to the key,value pairs of the dictionary.

    This works by calling setattr on ALL key,value pairs in the dict. If a user
    chooses to overwrite the `__repr__` or `__getattr__` attributes, they can.
    This can be a bad idea, as it may corrupt standard behavior of the
    ns object.

    Example
    --------

    >>> ns = ResultNS({'a':17,'foo':range(3)})
    >>> print ns
    NS{'a': 17, 'foo': [0, 1, 2]}
    >>> ns.a
    17
    >>> ns['foo']
    [0, 1, 2]
    """
    def __init__(self, dikt):
        # Every key becomes an attribute of this instance.
        for name, value in dikt.items():
            setattr(self, name, value)

    def __repr__(self):
        # Show only the user data: skip names both starting and
        # ending with double underscores.
        shown = dict((name, getattr(self, name)) for name in dir(self)
                     if not (name[:2] == '__' or name[-2:] == '__'))
        return "NS" + repr(shown)

    def __getitem__(self, key):
        # Dict-style access is an alias for attribute access.
        return getattr(self, key)
162 440
class TaskResult(object):
    """
    An object for returning task results for certain types of tasks.

    This object encapsulates the results of a task. On task
    success it will have a keys attribute that will have a list
    of the variables that have been pulled back. These variables
    are accessible as attributes of this class as well. On
    success the failure attribute will be None.

    In task failure, keys will be empty, but failure will contain
    the failure object that encapsulates the remote exception.
    One can also simply call the `raise_exception` method of
    this class to re-raise any remote exception in the local
    session.

    The `TaskResult` has a `.ns` member, which is a property for access
    to the results. If the Task had pull=['a', 'b'], then the
    Task Result will have attributes `tr.ns.a`, `tr.ns.b` for those values.
    Accessing `tr.ns` will raise the remote failure if the task failed.

    The `engineid` attribute should have the `engineid` of the engine
    that ran the task. But, because engines can come and go,
    the `engineid` may not continue to be
    valid or accurate.

    The `taskid` attribute simply gives the `taskid` that the task
    is tracked under.
    """
    taskid = None

    def _getNS(self):
        # Reading .ns on a failed task re-raises the remote exception.
        if isinstance(self.failure, failure.Failure):
            return self.failure.raiseException()
        return self._ns

    def _setNS(self, v):
        # .ns is read-only by design.
        raise Exception("I am protected!")

    ns = property(_getNS, _setNS)

    def __init__(self, results, engineid):
        self.engineid = engineid
        if isinstance(results, failure.Failure):
            # Failed task: keep the Failure, expose an empty result dict.
            self.failure = results
            self.results = {}
        else:
            self.failure = None
            self.results = results

        self._ns = ResultNS(self.results)
        self.keys = self.results.keys()

    def __repr__(self):
        if self.failure is None:
            contents = self.results
        else:
            contents = self.failure
        return "TaskResult[ID:%r]:%r"%(self.taskid, contents)

    def __getitem__(self, key):
        # Indexing a failed result raises the remote exception locally.
        if self.failure is not None:
            self.raise_exception()
        return self.results[key]

    def raise_exception(self):
        """Re-raise any remote exceptions in the local python session."""
        if self.failure is not None:
            self.failure.raiseException()
234 512
235 513
514 #-----------------------------------------------------------------------------
515 # The controller side of things
516 #-----------------------------------------------------------------------------
517
class IWorker(zi.Interface):
    """The Basic Worker Interface.

    A worker is a representation of an Engine that is ready to run tasks.
    """

    zi.Attribute("workerid", "the id of the worker")

    def run(task):
        """Run task in worker's namespace.

        :Parameters:
            task : a `Task` object

        :Returns: `Deferred` to a tuple of (success, result) where
            success is a boolean that signifies success or failure
            and result is the task result.
        """
253 537
class WorkerFromQueuedEngine(object):
    """Adapt an `IQueuedEngine` to an `IWorker` object"""

    zi.implements(IWorker)

    def __init__(self, qe):
        self.queuedEngine = qe
        # Assigned by the TaskController when the worker is registered.
        self.workerid = None

    def _get_properties(self):
        return self.queuedEngine.properties

    # Read-only view of the underlying engine's properties dict.
    properties = property(_get_properties, lambda self, _:None)

    def run(self, task):
        """Run task in worker's namespace.

        This takes a task and calls methods on the task that actually
        cause `self.queuedEngine` to do the task. See the methods of
        `ITask` for more information about how these methods are called.

        :Parameters:
            task : a `Task` object

        :Returns: `Deferred` to a tuple of (success, result) where
            success is a boolean that signifies success or failure
            and result is the task result.
        """
        # Build the callback chain in the order the ITask protocol
        # defines: timers around pre/submit/post, then result processing.
        d = defer.succeed(None)
        d.addCallback(task.start_time)
        task.pre_task(d, self.queuedEngine)
        task.submit_task(d, self.queuedEngine)
        task.post_task(d, self.queuedEngine)
        # addBoth: the timers and result processing run on failure too.
        d.addBoth(task.stop_time)
        d.addBoth(task.process_result, self.queuedEngine.id)
        # At this point, there will be (success, result) coming down the line
        return d
575
338 576
# Let Twisted's adapter machinery produce an IWorker from any queued engine.
components.registerAdapter(WorkerFromQueuedEngine, es.IEngineQueued, IWorker)
340 578
class IScheduler(zi.Interface):
    """The interface for a Scheduler.
    """
    zi.Attribute("nworkers", "the number of unassigned workers")
    zi.Attribute("ntasks", "the number of unscheduled tasks")
    zi.Attribute("workerids", "a list of the worker ids")
    zi.Attribute("taskids", "a list of the task ids")

    def add_task(task, **flags):
        """Add a task to the queue of the Scheduler.

        :Parameters:
            task : an `ITask` implementer
                The task to be queued.
            flags : dict
                General keywords for more sophisticated scheduling
        """

    def pop_task(id=None):
        """Pops a task object from the queue.

        This gets the next task to be run. If no `id` is requested, the
        highest priority task is returned.

        :Parameters:
            id
                The id of the task to be popped. The default (None) is to return
                the highest priority task.

        :Returns: an `ITask` implementer

        :Exceptions:
            IndexError : raised if no taskid in queue
        """

    def add_worker(worker, **flags):
        """Add a worker to the worker queue.

        :Parameters:
            worker : an `IWorker` implementer
            flags : dict
                General keywords for more sophisticated scheduling
        """

    def pop_worker(id=None):
        """Pops an IWorker object that is ready to do work.

        This gets the next IWorker that is ready to do work.

        :Parameters:
            id : if specified, will pop worker with workerid=id, else pops
                 highest priority worker. Defaults to None.

        :Returns:
            an IWorker object

        :Exceptions:
            IndexError : raised if no workerid in queue
        """

    def ready():
        """Returns True if there is something to do, False otherwise"""

    def schedule():
        """Returns (worker,task) pair for the next task to be run."""
407 644
408 645
class FIFOScheduler(object):
    """
    A basic First-In-First-Out (Queue) Scheduler.

    This is the default Scheduler for the `TaskController`.
    See the docstrings for `IScheduler` for interface details.
    """

    zi.implements(IScheduler)

    def __init__(self):
        # Both queues are plain lists ordered oldest-first.
        self.tasks = []
        self.workers = []

    def _ntasks(self):
        return len(self.tasks)

    def _nworkers(self):
        return len(self.workers)

    # Read-only counts of queued tasks and unassigned workers.
    ntasks = property(_ntasks, lambda self, _:None)
    nworkers = property(_nworkers, lambda self, _:None)

    def _taskids(self):
        return [t.taskid for t in self.tasks]

    def _workerids(self):
        return [w.workerid for w in self.workers]

    # Read-only id listings, in queue order.
    taskids = property(_taskids, lambda self,_:None)
    workerids = property(_workerids, lambda self,_:None)

    def add_task(self, task, **flags):
        # FIFO: new tasks join the back of the queue.
        self.tasks.append(task)

    def pop_task(self, id=None):
        """Remove and return a queued task; FIFO order when no id given."""
        if id is None:
            return self.tasks.pop(0)
        for position, queued in enumerate(self.tasks):
            if queued.taskid == id:
                return self.tasks.pop(position)
        raise IndexError("No task #%i"%id)

    def add_worker(self, worker, **flags):
        self.workers.append(worker)

    def pop_worker(self, id=None):
        """Remove and return a ready worker; FIFO order when no id given."""
        if id is None:
            return self.workers.pop(0)
        for position, candidate in enumerate(self.workers):
            if candidate.workerid == id:
                return self.workers.pop(position)
        raise IndexError("No worker #%i"%id)

    def schedule(self):
        """Return the first (worker, task) pair whose dependency check passes."""
        for queued_task in self.tasks:
            for candidate in self.workers:
                try:# do not allow exceptions to break this
                    # Allow the task to check itself using its
                    # check_depend method.
                    runnable = queued_task.check_depend(candidate.properties)
                except:
                    runnable = False
                if runnable:
                    return (self.pop_worker(candidate.workerid),
                            self.pop_task(queued_task.taskid))
        return None, None
475 716
476 717
477 718
class LIFOScheduler(FIFOScheduler):
    """
    A Last-In-First-Out (Stack) Scheduler.

    This scheduler should naively reward fast engines by giving
    them more jobs. This risks starvation, but only in cases with
    low load, where starvation does not really matter.
    """

    def add_task(self, task, **flags):
        # Stack semantics: the newest task goes to the front of the queue.
        self.tasks.insert(0, task)

    def add_worker(self, worker, **flags):
        # Most recently freed worker is handed out first, which favors
        # fast engines.
        self.workers.insert(0, worker)
492 736 # self.workers.reverse()
493 737
494 738
class ITaskController(cs.IControllerBase):
    """
    The Task based interface to a `ControllerService` object

    This adapts a `ControllerService` to the ITaskController interface.
    """

    def run(task):
        """
        Run a task.

        :Parameters:
            task : an IPython `Task` object

        :Returns: the integer ID of the task
        """

    def get_task_result(taskid, block=False):
        """
        Get the result of a task by its ID.

        :Parameters:
            taskid : int
                the id of the task whose result is requested

        :Returns: `Deferred` to the task result if the task is done, and None
            if not.

        :Exceptions:
            actualResult will be an `IndexError` if no such task has been submitted
        """

    def abort(taskid):
        """Remove task from queue if it has not been submitted.

        If the task has already been submitted, wait for it to finish and discard
        results and prevent resubmission.

        :Parameters:
            taskid : the id of the task to be aborted

        :Returns:
            `Deferred` to abort attempt completion. Will be None on success.

        :Exceptions:
            deferred will fail with `IndexError` if no such task has been submitted
            or the task has already completed.
        """

    def barrier(taskids):
        """
        Block until the list of taskids are completed.

        Returns None on success.
        """

    def spin():
        """
        Touch the scheduler, to resume scheduling without submitting a task.
        """

    def queue_status(verbose=False):
        """
        Get a dictionary with the current state of the task queue.

        If verbose is True, then return lists of taskids, otherwise,
        return the number of tasks with each status.
        """

    def clear():
        """
        Clear all previously run tasks from the task controller.

        This is needed because the task controller keeps all task results
        in memory. This can be a problem if there are many completed
        tasks. Users should call this periodically to clean out these
        cached task results.
        """
559 818
class TaskController(cs.ControllerAdapterBase):
    """The Task based interface to a Controller object.

    If you want to use a different scheduler, just subclass this and set
    the `SchedulerClass` member to the *class* of your chosen scheduler.
    """

    zi.implements(ITaskController)
    # The scheduler class to instantiate; override in subclasses.
    SchedulerClass = FIFOScheduler

    # Seconds of idleness (queued tasks, all workers unassigned) before
    # checkIdle arms the failIdle delayed call — presumably failing the
    # queued tasks; confirm against failIdle's implementation.
    timeout = 30
571 830
572 831 def __init__(self, controller):
573 832 self.controller = controller
574 833 self.controller.on_register_engine_do(self.registerWorker, True)
575 834 self.controller.on_unregister_engine_do(self.unregisterWorker, True)
576 835 self.taskid = 0
577 836 self.failurePenalty = 1 # the time in seconds to penalize
578 837 # a worker for failing a task
579 838 self.pendingTasks = {} # dict of {workerid:(taskid, task)}
580 839 self.deferredResults = {} # dict of {taskid:deferred}
581 840 self.finishedResults = {} # dict of {taskid:actualResult}
582 841 self.workers = {} # dict of {workerid:worker}
583 842 self.abortPending = [] # dict of {taskid:abortDeferred}
584 843 self.idleLater = None # delayed call object for timeout
585 844 self.scheduler = self.SchedulerClass()
586 845
587 846 for id in self.controller.engines.keys():
588 847 self.workers[id] = IWorker(self.controller.engines[id])
589 848 self.workers[id].workerid = id
590 849 self.schedule.add_worker(self.workers[id])
591 850
592 851 def registerWorker(self, id):
593 852 """Called by controller.register_engine."""
594 853 if self.workers.get(id):
595 raise "We already have one! This should not happen."
854 raise ValueError("worker with id %s already exists. This should not happen." % id)
596 855 self.workers[id] = IWorker(self.controller.engines[id])
597 856 self.workers[id].workerid = id
598 857 if not self.pendingTasks.has_key(id):# if not working
599 858 self.scheduler.add_worker(self.workers[id])
600 859 self.distributeTasks()
601 860
602 861 def unregisterWorker(self, id):
603 862 """Called by controller.unregister_engine"""
604 863
605 864 if self.workers.has_key(id):
606 865 try:
607 866 self.scheduler.pop_worker(id)
608 867 except IndexError:
609 868 pass
610 869 self.workers.pop(id)
611 870
612 871 def _pendingTaskIDs(self):
613 872 return [t.taskid for t in self.pendingTasks.values()]
614 873
615 874 #---------------------------------------------------------------------------
616 875 # Interface methods
617 876 #---------------------------------------------------------------------------
618 877
619 878 def run(self, task):
620 """Run a task and return `Deferred` to its taskid."""
879 """
880 Run a task and return `Deferred` to its taskid.
881 """
621 882 task.taskid = self.taskid
622 883 task.start = time.localtime()
623 884 self.taskid += 1
624 885 d = defer.Deferred()
625 886 self.scheduler.add_task(task)
626 # log.msg('Queuing task: %i' % task.taskid)
887 log.msg('Queuing task: %i' % task.taskid)
627 888
628 889 self.deferredResults[task.taskid] = []
629 890 self.distributeTasks()
630 891 return defer.succeed(task.taskid)
631 892
632 893 def get_task_result(self, taskid, block=False):
633 """Returns a `Deferred` to a TaskResult tuple or None."""
634 # log.msg("Getting task result: %i" % taskid)
894 """
895 Returns a `Deferred` to the task result, or None.
896 """
897 log.msg("Getting task result: %i" % taskid)
635 898 if self.finishedResults.has_key(taskid):
636 899 tr = self.finishedResults[taskid]
637 900 return defer.succeed(tr)
638 901 elif self.deferredResults.has_key(taskid):
639 902 if block:
640 903 d = defer.Deferred()
641 904 self.deferredResults[taskid].append(d)
642 905 return d
643 906 else:
644 907 return defer.succeed(None)
645 908 else:
646 909 return defer.fail(IndexError("task ID not registered: %r" % taskid))
647 910
    def abort(self, taskid):
        """
        Remove a task from the queue if it has not been run already.

        :Parameters:
            taskid : int
                The id of the task to abort.

        :Returns: `Deferred` to the abort attempt; None on success, an
            `IndexError` failure otherwise.
        """
        if not isinstance(taskid, int):
            return defer.fail(failure.Failure(TypeError("an integer task id expected: %r" % taskid)))
        try:
            # Best case: the task is still queued and can simply be dropped.
            self.scheduler.pop_task(taskid)
        except IndexError, e:
            if taskid in self.finishedResults.keys():
                d = defer.fail(IndexError("Task Already Completed"))
            elif taskid in self.abortPending:
                d = defer.fail(IndexError("Task Already Aborted"))
            elif taskid in self._pendingTaskIDs():# task is pending
                # Already running: record the abort so the result is
                # discarded when the task completes (see _doAbort).
                self.abortPending.append(taskid)
                d = defer.succeed(None)
            else:
                d = defer.fail(e)
        else:
            # Popped from the queue: finish it immediately as aborted.
            d = defer.execute(self._doAbort, taskid)

        return d
668 933
669 934 def barrier(self, taskids):
670 935 dList = []
671 936 if isinstance(taskids, int):
672 937 taskids = [taskids]
673 938 for id in taskids:
674 939 d = self.get_task_result(id, block=True)
675 940 dList.append(d)
676 941 d = DeferredList(dList, consumeErrors=1)
677 942 d.addCallbacks(lambda r: None)
678 943 return d
679 944
    def spin(self):
        """
        Touch the scheduler, to resume scheduling without submitting a task.

        Returns a `Deferred` to the distributeTasks result (True if any
        task was dispatched, False otherwise).
        """
        return defer.succeed(self.distributeTasks())
682 947
683 948 def queue_status(self, verbose=False):
684 949 pending = self._pendingTaskIDs()
685 950 failed = []
686 951 succeeded = []
687 952 for k,v in self.finishedResults.iteritems():
688 953 if not isinstance(v, failure.Failure):
689 954 if hasattr(v,'failure'):
690 955 if v.failure is None:
691 956 succeeded.append(k)
692 957 else:
693 958 failed.append(k)
694 959 scheduled = self.scheduler.taskids
695 960 if verbose:
696 961 result = dict(pending=pending, failed=failed,
697 962 succeeded=succeeded, scheduled=scheduled)
698 963 else:
699 964 result = dict(pending=len(pending),failed=len(failed),
700 965 succeeded=len(succeeded),scheduled=len(scheduled))
701 966 return defer.succeed(result)
702 967
703 968 #---------------------------------------------------------------------------
704 969 # Queue methods
705 970 #---------------------------------------------------------------------------
706 971
707 972 def _doAbort(self, taskid):
708 """Helper function for aborting a pending task."""
709 # log.msg("Task aborted: %i" % taskid)
973 """
974 Helper function for aborting a pending task.
975 """
976 log.msg("Task aborted: %i" % taskid)
710 977 result = failure.Failure(error.TaskAborted())
711 978 self._finishTask(taskid, result)
712 979 if taskid in self.abortPending:
713 980 self.abortPending.remove(taskid)
714 981
715 982 def _finishTask(self, taskid, result):
716 983 dlist = self.deferredResults.pop(taskid)
717 result.taskid = taskid # The TaskResult should save the taskid
984 # result.taskid = taskid # The TaskResult should save the taskid
718 985 self.finishedResults[taskid] = result
719 986 for d in dlist:
720 987 d.callback(result)
721 988
722 989 def distributeTasks(self):
723 """Distribute tasks while self.scheduler has things to do."""
724 # log.msg("distributing Tasks")
990 """
991 Distribute tasks while self.scheduler has things to do.
992 """
993 log.msg("distributing Tasks")
725 994 worker, task = self.scheduler.schedule()
726 995 if not worker and not task:
727 996 if self.idleLater and self.idleLater.called:# we are inside failIdle
728 997 self.idleLater = None
729 998 else:
730 999 self.checkIdle()
731 1000 return False
732 1001 # else something to do:
733 1002 while worker and task:
734 1003 # get worker and task
735 1004 # add to pending
736 1005 self.pendingTasks[worker.workerid] = task
737 1006 # run/link callbacks
738 1007 d = worker.run(task)
739 # log.msg("Running task %i on worker %i" %(task.taskid, worker.workerid))
1008 log.msg("Running task %i on worker %i" %(task.taskid, worker.workerid))
740 1009 d.addBoth(self.taskCompleted, task.taskid, worker.workerid)
741 1010 worker, task = self.scheduler.schedule()
742 1011 # check for idle timeout:
743 1012 self.checkIdle()
744 1013 return True
745 1014
746 1015 def checkIdle(self):
747 1016 if self.idleLater and not self.idleLater.called:
748 1017 self.idleLater.cancel()
749 1018 if self.scheduler.ntasks and self.workers and \
750 1019 self.scheduler.nworkers == len(self.workers):
751 1020 self.idleLater = reactor.callLater(self.timeout, self.failIdle)
752 1021 else:
753 1022 self.idleLater = None
754 1023
755 1024 def failIdle(self):
756 1025 if not self.distributeTasks():
757 1026 while self.scheduler.ntasks:
758 1027 t = self.scheduler.pop_task()
759 1028 msg = "task %i failed to execute due to unmet dependencies"%t.taskid
760 1029 msg += " for %i seconds"%self.timeout
761 # log.msg("Task aborted by timeout: %i" % t.taskid)
1030 log.msg("Task aborted by timeout: %i" % t.taskid)
762 1031 f = failure.Failure(error.TaskTimeout(msg))
763 1032 self._finishTask(t.taskid, f)
764 1033 self.idleLater = None
765 1034
766 1035
767 def taskCompleted(self, result, taskid, workerid):
1036 def taskCompleted(self, success_and_result, taskid, workerid):
768 1037 """This is the err/callback for a completed task."""
1038 success, result = success_and_result
769 1039 try:
770 1040 task = self.pendingTasks.pop(workerid)
771 1041 except:
772 1042 # this should not happen
773 1043 log.msg("Tried to pop bad pending task %i from worker %i"%(taskid, workerid))
774 1044 log.msg("Result: %r"%result)
775 1045 log.msg("Pending tasks: %s"%self.pendingTasks)
776 1046 return
777 1047
778 1048 # Check if aborted while pending
779 1049 aborted = False
780 1050 if taskid in self.abortPending:
781 1051 self._doAbort(taskid)
782 1052 aborted = True
783 1053
784 1054 if not aborted:
785 if result.failure is not None and isinstance(result.failure, failure.Failure): # we failed
1055 if not success:
786 1056 log.msg("Task %i failed on worker %i"% (taskid, workerid))
787 1057 if task.retries > 0: # resubmit
788 1058 task.retries -= 1
789 1059 self.scheduler.add_task(task)
790 1060 s = "Resubmitting task %i, %i retries remaining" %(taskid, task.retries)
791 1061 log.msg(s)
792 1062 self.distributeTasks()
793 elif isinstance(task.recovery_task, Task) and \
1063 elif isinstance(task.recovery_task, BaseTask) and \
794 1064 task.recovery_task.retries > -1:
795 1065 # retries = -1 is to prevent infinite recovery_task loop
796 1066 task.retries = -1
797 1067 task.recovery_task.taskid = taskid
798 1068 task = task.recovery_task
799 1069 self.scheduler.add_task(task)
800 1070 s = "Recovering task %i, %i retries remaining" %(taskid, task.retries)
801 1071 log.msg(s)
802 1072 self.distributeTasks()
803 1073 else: # done trying
804 1074 self._finishTask(taskid, result)
805 1075 # wait a second before readmitting a worker that failed
806 1076 # it may have died, and not yet been unregistered
807 1077 reactor.callLater(self.failurePenalty, self.readmitWorker, workerid)
808 1078 else: # we succeeded
809 # log.msg("Task completed: %i"% taskid)
1079 log.msg("Task completed: %i"% taskid)
810 1080 self._finishTask(taskid, result)
811 1081 self.readmitWorker(workerid)
812 else:# we aborted the task
813 if result.failure is not None and isinstance(result.failure, failure.Failure): # it failed, penalize worker
1082 else: # we aborted the task
1083 if not success:
814 1084 reactor.callLater(self.failurePenalty, self.readmitWorker, workerid)
815 1085 else:
816 1086 self.readmitWorker(workerid)
817 1087
818 1088 def readmitWorker(self, workerid):
819 """Readmit a worker to the scheduler.
1089 """
1090 Readmit a worker to the scheduler.
820 1091
821 1092 This is outside `taskCompleted` because of the `failurePenalty` being
822 1093 implemented through `reactor.callLater`.
823 1094 """
824 1095
825 1096 if workerid in self.workers.keys() and workerid not in self.pendingTasks.keys():
826 1097 self.scheduler.add_worker(self.workers[workerid])
827 1098 self.distributeTasks()
1099
1100 def clear(self):
1101 """
1102 Clear all previously run tasks from the task controller.
1103
1104 This is needed because the task controller keep all task results
1105 in memory. This can be a problem is there are many completed
1106 tasks. Users should call this periodically to clean out these
1107 cached task results.
1108 """
1109 self.finishedResults = {}
1110 return defer.succeed(None)
828 1111
829 1112
830 1113 components.registerAdapter(TaskController, cs.IControllerBase, ITaskController)
@@ -1,161 +1,180 b''
1 1 # encoding: utf-8
2 2 # -*- test-case-name: IPython.kernel.tests.test_taskcontrollerxmlrpc -*-
3 3
4 """The Generic Task Client object.
5
6 This must be subclassed based on your connection method.
4 """
5 A blocking version of the task client.
7 6 """
8 7
9 8 __docformat__ = "restructuredtext en"
10 9
11 10 #-------------------------------------------------------------------------------
12 11 # Copyright (C) 2008 The IPython Development Team
13 12 #
14 13 # Distributed under the terms of the BSD License. The full license is in
15 14 # the file COPYING, distributed as part of this software.
16 15 #-------------------------------------------------------------------------------
17 16
18 17 #-------------------------------------------------------------------------------
19 18 # Imports
20 19 #-------------------------------------------------------------------------------
21 20
22 21 from zope.interface import Interface, implements
23 22 from twisted.python import components, log
24 23
25 24 from IPython.kernel.twistedutil import blockingCallFromThread
26 25 from IPython.kernel import task, error
26 from IPython.kernel.mapper import (
27 SynchronousTaskMapper,
28 ITaskMapperFactory,
29 IMapper
30 )
31 from IPython.kernel.parallelfunction import (
32 ParallelFunction,
33 ITaskParallelDecorator
34 )
27 35
28 36 #-------------------------------------------------------------------------------
29 # Connecting Task Client
37 # The task client
30 38 #-------------------------------------------------------------------------------
31 39
32 class InteractiveTaskClient(object):
33
34 def irun(self, *args, **kwargs):
35 """Run a task on the `TaskController`.
36
37 This method is a shorthand for run(task) and its arguments are simply
38 passed onto a `Task` object:
39
40 irun(*args, **kwargs) -> run(Task(*args, **kwargs))
41
42 :Parameters:
43 expression : str
44 A str that is valid python code that is the task.
45 pull : str or list of str
46 The names of objects to be pulled as results.
47 push : dict
48 A dict of objects to be pushed into the engines namespace before
49 execution of the expression.
50 clear_before : boolean
51 Should the engine's namespace be cleared before the task is run.
52 Default=False.
53 clear_after : boolean
54 Should the engine's namespace be cleared after the task is run.
55 Default=False.
56 retries : int
57 The number of times to resumbit the task if it fails. Default=0.
58 options : dict
59 Any other keyword options for more elaborate uses of tasks
60
61 :Returns: A `TaskResult` object.
62 """
63 block = kwargs.pop('block', False)
64 if len(args) == 1 and isinstance(args[0], task.Task):
65 t = args[0]
66 else:
67 t = task.Task(*args, **kwargs)
68 taskid = self.run(t)
69 print "TaskID = %i"%taskid
70 if block:
71 return self.get_task_result(taskid, block)
72 else:
73 return taskid
74
75 40 class IBlockingTaskClient(Interface):
76 41 """
77 An interface for blocking task clients.
42 A vague interface of the blocking task client
78 43 """
79 44 pass
80 45
81
82 class BlockingTaskClient(InteractiveTaskClient):
46 class BlockingTaskClient(object):
83 47 """
84 This class provides a blocking task client.
48 A blocking task client that adapts a non-blocking one.
85 49 """
86 50
87 implements(IBlockingTaskClient)
51 implements(
52 IBlockingTaskClient,
53 ITaskMapperFactory,
54 IMapper,
55 ITaskParallelDecorator
56 )
88 57
89 58 def __init__(self, task_controller):
90 59 self.task_controller = task_controller
91 60 self.block = True
92 61
93 def run(self, task):
94 """
95 Run a task and return a task id that can be used to get the task result.
62 def run(self, task, block=False):
63 """Run a task on the `TaskController`.
64
65 See the documentation of the `MapTask` and `StringTask` classes for
66 details on how to build a task of different types.
96 67
97 68 :Parameters:
98 task : `Task`
99 The `Task` object to run
69 task : an `ITask` implementer
70
71 :Returns: The int taskid of the submitted task. Pass this to
72 `get_task_result` to get the `TaskResult` object.
100 73 """
101 return blockingCallFromThread(self.task_controller.run, task)
74 tid = blockingCallFromThread(self.task_controller.run, task)
75 if block:
76 return self.get_task_result(tid, block=True)
77 else:
78 return tid
102 79
103 80 def get_task_result(self, taskid, block=False):
104 81 """
105 Get or poll for a task result.
82 Get a task result by taskid.
106 83
107 84 :Parameters:
108 85 taskid : int
109 The id of the task whose result to get
86 The taskid of the task to be retrieved.
110 87 block : boolean
111 If True, wait until the task is done and then result the
112 `TaskResult` object. If False, just poll for the result and
113 return None if the task is not done.
88 Should I block until the task is done?
89
90 :Returns: A `TaskResult` object that encapsulates the task result.
114 91 """
115 92 return blockingCallFromThread(self.task_controller.get_task_result,
116 93 taskid, block)
117 94
118 95 def abort(self, taskid):
119 96 """
120 Abort a task by task id if it has not been started.
97 Abort a task by taskid.
98
99 :Parameters:
100 taskid : int
101 The taskid of the task to be aborted.
121 102 """
122 103 return blockingCallFromThread(self.task_controller.abort, taskid)
123 104
124 105 def barrier(self, taskids):
125 """
126 Wait for a set of tasks to finish.
106 """Block until a set of tasks are completed.
127 107
128 108 :Parameters:
129 taskids : list of ints
130 A list of task ids to wait for.
109 taskids : list, tuple
110 A sequence of taskids to block on.
131 111 """
132 112 return blockingCallFromThread(self.task_controller.barrier, taskids)
133 113
134 114 def spin(self):
135 115 """
136 Cause the scheduler to schedule tasks.
116 Touch the scheduler, to resume scheduling without submitting a task.
137 117
138 118 This method only needs to be called in unusual situations where the
139 scheduler is idle for some reason.
119 scheduler is idle for some reason.
140 120 """
141 121 return blockingCallFromThread(self.task_controller.spin)
142 122
143 123 def queue_status(self, verbose=False):
144 124 """
145 125 Get a dictionary with the current state of the task queue.
146 126
147 127 :Parameters:
148 128 verbose : boolean
149 129 If True, return a list of taskids. If False, simply give
150 130 the number of tasks with each status.
151 131
152 132 :Returns:
153 133 A dict with the queue status.
154 134 """
155 135 return blockingCallFromThread(self.task_controller.queue_status, verbose)
136
137 def clear(self):
138 """
139 Clear all previously run tasks from the task controller.
140
141 This is needed because the task controller keep all task results
142 in memory. This can be a problem is there are many completed
143 tasks. Users should call this periodically to clean out these
144 cached task results.
145 """
146 return blockingCallFromThread(self.task_controller.clear)
147
148 def map(self, func, *sequences):
149 """
150 Apply func to *sequences elementwise. Like Python's builtin map.
151
152 This version is load balanced.
153 """
154 return self.mapper().map(func, *sequences)
156 155
156 def mapper(self, clear_before=False, clear_after=False, retries=0,
157 recovery_task=None, depend=None, block=True):
158 """
159 Create an `IMapper` implementer with a given set of arguments.
160
161 The `IMapper` created using a task controller is load balanced.
162
163 See the documentation for `IPython.kernel.task.BaseTask` for
164 documentation on the arguments to this method.
165 """
166 return SynchronousTaskMapper(self, clear_before=clear_before,
167 clear_after=clear_after, retries=retries,
168 recovery_task=recovery_task, depend=depend, block=block)
169
170 def parallel(self, clear_before=False, clear_after=False, retries=0,
171 recovery_task=None, depend=None, block=True):
172 mapper = self.mapper(clear_before, clear_after, retries,
173 recovery_task, depend, block)
174 pf = ParallelFunction(mapper)
175 return pf
157 176
158 177 components.registerAdapter(BlockingTaskClient,
159 178 task.ITaskController, IBlockingTaskClient)
160 179
161 180
@@ -1,267 +1,329 b''
1 1 # encoding: utf-8
2 2 # -*- test-case-name: IPython.kernel.tests.test_taskxmlrpc -*-
3 3 """A Foolscap interface to a TaskController.
4 4
5 5 This class lets Foolscap clients talk to a TaskController.
6 6 """
7 7
8 8 __docformat__ = "restructuredtext en"
9 9
10 10 #-------------------------------------------------------------------------------
11 11 # Copyright (C) 2008 The IPython Development Team
12 12 #
13 13 # Distributed under the terms of the BSD License. The full license is in
14 14 # the file COPYING, distributed as part of this software.
15 15 #-------------------------------------------------------------------------------
16 16
17 17 #-------------------------------------------------------------------------------
18 18 # Imports
19 19 #-------------------------------------------------------------------------------
20 20
21 21 import cPickle as pickle
22 22 import xmlrpclib, copy
23 23
24 24 from zope.interface import Interface, implements
25 25 from twisted.internet import defer
26 26 from twisted.python import components, failure
27 27
28 28 from foolscap import Referenceable
29 29
30 30 from IPython.kernel.twistedutil import blockingCallFromThread
31 31 from IPython.kernel import error, task as taskmodule, taskclient
32 32 from IPython.kernel.pickleutil import can, uncan
33 33 from IPython.kernel.clientinterfaces import (
34 34 IFCClientInterfaceProvider,
35 35 IBlockingClientAdaptor
36 36 )
37 from IPython.kernel.mapper import (
38 TaskMapper,
39 ITaskMapperFactory,
40 IMapper
41 )
42 from IPython.kernel.parallelfunction import (
43 ParallelFunction,
44 ITaskParallelDecorator
45 )
37 46
38 47 #-------------------------------------------------------------------------------
39 48 # The Controller side of things
40 49 #-------------------------------------------------------------------------------
41 50
42 51
43 52 class IFCTaskController(Interface):
44 53 """Foolscap interface to task controller.
45 54
46 See the documentation of ITaskController for documentation about the methods.
55 See the documentation of `ITaskController` for more information.
47 56 """
48 def remote_run(request, binTask):
57 def remote_run(binTask):
49 58 """"""
50 59
51 def remote_abort(request, taskid):
60 def remote_abort(taskid):
52 61 """"""
53 62
54 def remote_get_task_result(request, taskid, block=False):
63 def remote_get_task_result(taskid, block=False):
55 64 """"""
56 65
57 def remote_barrier(request, taskids):
66 def remote_barrier(taskids):
67 """"""
68
69 def remote_spin():
58 70 """"""
59 71
60 def remote_spin(request):
72 def remote_queue_status(verbose):
61 73 """"""
62 74
63 def remote_queue_status(request, verbose):
75 def remote_clear():
64 76 """"""
65 77
66 78
67 79 class FCTaskControllerFromTaskController(Referenceable):
68 """XML-RPC attachmeot for controller.
69
70 See IXMLRPCTaskController and ITaskController (and its children) for documentation.
71 80 """
81 Adapt a `TaskController` to an `IFCTaskController`
82
83 This class is used to expose a `TaskController` over the wire using
84 the Foolscap network protocol.
85 """
86
72 87 implements(IFCTaskController, IFCClientInterfaceProvider)
73 88
74 89 def __init__(self, taskController):
75 90 self.taskController = taskController
76 91
77 92 #---------------------------------------------------------------------------
78 93 # Non interface methods
79 94 #---------------------------------------------------------------------------
80 95
81 96 def packageFailure(self, f):
82 97 f.cleanFailure()
83 98 return self.packageSuccess(f)
84 99
85 100 def packageSuccess(self, obj):
86 101 serial = pickle.dumps(obj, 2)
87 102 return serial
88 103
89 104 #---------------------------------------------------------------------------
90 105 # ITaskController related methods
91 106 #---------------------------------------------------------------------------
92 107
93 108 def remote_run(self, ptask):
94 109 try:
95 ctask = pickle.loads(ptask)
96 task = taskmodule.uncan_task(ctask)
110 task = pickle.loads(ptask)
111 task.uncan_task()
97 112 except:
98 113 d = defer.fail(pickle.UnpickleableError("Could not unmarshal task"))
99 114 else:
100 115 d = self.taskController.run(task)
101 116 d.addCallback(self.packageSuccess)
102 117 d.addErrback(self.packageFailure)
103 118 return d
104 119
105 120 def remote_abort(self, taskid):
106 121 d = self.taskController.abort(taskid)
107 122 d.addCallback(self.packageSuccess)
108 123 d.addErrback(self.packageFailure)
109 124 return d
110 125
111 126 def remote_get_task_result(self, taskid, block=False):
112 127 d = self.taskController.get_task_result(taskid, block)
113 128 d.addCallback(self.packageSuccess)
114 129 d.addErrback(self.packageFailure)
115 130 return d
116 131
117 132 def remote_barrier(self, taskids):
118 133 d = self.taskController.barrier(taskids)
119 134 d.addCallback(self.packageSuccess)
120 135 d.addErrback(self.packageFailure)
121 136 return d
122 137
123 138 def remote_spin(self):
124 139 d = self.taskController.spin()
125 140 d.addCallback(self.packageSuccess)
126 141 d.addErrback(self.packageFailure)
127 142 return d
128 143
129 144 def remote_queue_status(self, verbose):
130 145 d = self.taskController.queue_status(verbose)
131 146 d.addCallback(self.packageSuccess)
132 147 d.addErrback(self.packageFailure)
133 148 return d
134 149
150 def remote_clear(self):
151 return self.taskController.clear()
152
135 153 def remote_get_client_name(self):
136 154 return 'IPython.kernel.taskfc.FCTaskClient'
137 155
138 156 components.registerAdapter(FCTaskControllerFromTaskController,
139 157 taskmodule.ITaskController, IFCTaskController)
140 158
141 159
142 160 #-------------------------------------------------------------------------------
143 161 # The Client side of things
144 162 #-------------------------------------------------------------------------------
145 163
146 164 class FCTaskClient(object):
147 """XML-RPC based TaskController client that implements ITaskController.
148
149 :Parameters:
150 addr : (ip, port)
151 The ip (str) and port (int) tuple of the `TaskController`.
152 165 """
153 implements(taskmodule.ITaskController, IBlockingClientAdaptor)
166 Client class for Foolscap exposed `TaskController`.
167
168 This class is an adapter that makes a `RemoteReference` to a
169 `TaskController` look like an actual `ITaskController` on the client side.
170
171 This class also implements `IBlockingClientAdaptor` so that clients can
172 automatically get a blocking version of this class.
173 """
174
175 implements(
176 taskmodule.ITaskController,
177 IBlockingClientAdaptor,
178 ITaskMapperFactory,
179 IMapper,
180 ITaskParallelDecorator
181 )
154 182
155 183 def __init__(self, remote_reference):
156 184 self.remote_reference = remote_reference
157 185
158 186 #---------------------------------------------------------------------------
159 187 # Non interface methods
160 188 #---------------------------------------------------------------------------
161 189
162 190 def unpackage(self, r):
163 191 return pickle.loads(r)
164 192
165 193 #---------------------------------------------------------------------------
166 194 # ITaskController related methods
167 195 #---------------------------------------------------------------------------
168 196 def run(self, task):
169 197 """Run a task on the `TaskController`.
170 198
171 :Parameters:
172 task : a `Task` object
173
174 The Task object is created using the following signature:
175
176 Task(expression, pull=None, push={}, clear_before=False,
177 clear_after=False, retries=0, **options):)
199 See the documentation of the `MapTask` and `StringTask` classes for
200 details on how to build a task of different types.
178 201
179 The meaning of the arguments is as follows:
202 :Parameters:
203 task : an `ITask` implementer
180 204
181 :Task Parameters:
182 expression : str
183 A str that is valid python code that is the task.
184 pull : str or list of str
185 The names of objects to be pulled as results.
186 push : dict
187 A dict of objects to be pushed into the engines namespace before
188 execution of the expression.
189 clear_before : boolean
190 Should the engine's namespace be cleared before the task is run.
191 Default=False.
192 clear_after : boolean
193 Should the engine's namespace be cleared after the task is run.
194 Default=False.
195 retries : int
196 The number of times to resumbit the task if it fails. Default=0.
197 options : dict
198 Any other keyword options for more elaborate uses of tasks
199
200 205 :Returns: The int taskid of the submitted task. Pass this to
201 206 `get_task_result` to get the `TaskResult` object.
202 207 """
203 assert isinstance(task, taskmodule.Task), "task must be a Task object!"
204 ctask = taskmodule.can_task(task) # handles arbitrary function in .depend
205 # as well as arbitrary recovery_task chains
206 ptask = pickle.dumps(ctask, 2)
208 assert isinstance(task, taskmodule.BaseTask), "task must be a Task object!"
209 task.can_task()
210 ptask = pickle.dumps(task, 2)
211 task.uncan_task()
207 212 d = self.remote_reference.callRemote('run', ptask)
208 213 d.addCallback(self.unpackage)
209 214 return d
210 215
211 216 def get_task_result(self, taskid, block=False):
212 """The task result by taskid.
217 """
218 Get a task result by taskid.
213 219
214 220 :Parameters:
215 221 taskid : int
216 222 The taskid of the task to be retrieved.
217 223 block : boolean
218 224 Should I block until the task is done?
219 225
220 226 :Returns: A `TaskResult` object that encapsulates the task result.
221 227 """
222 228 d = self.remote_reference.callRemote('get_task_result', taskid, block)
223 229 d.addCallback(self.unpackage)
224 230 return d
225 231
226 232 def abort(self, taskid):
227 """Abort a task by taskid.
233 """
234 Abort a task by taskid.
228 235
229 236 :Parameters:
230 237 taskid : int
231 238 The taskid of the task to be aborted.
232 block : boolean
233 Should I block until the task is aborted.
234 239 """
235 240 d = self.remote_reference.callRemote('abort', taskid)
236 241 d.addCallback(self.unpackage)
237 242 return d
238 243
239 244 def barrier(self, taskids):
240 """Block until all tasks are completed.
245 """Block until a set of tasks are completed.
241 246
242 247 :Parameters:
243 248 taskids : list, tuple
244 249 A sequence of taskids to block on.
245 250 """
246 251 d = self.remote_reference.callRemote('barrier', taskids)
247 252 d.addCallback(self.unpackage)
248 253 return d
249 254
250 255 def spin(self):
251 """touch the scheduler, to resume scheduling without submitting
252 a task.
256 """
257 Touch the scheduler, to resume scheduling without submitting a task.
258
259 This method only needs to be called in unusual situations where the
260 scheduler is idle for some reason.
253 261 """
254 262 d = self.remote_reference.callRemote('spin')
255 263 d.addCallback(self.unpackage)
256 264 return d
257 265
258 266 def queue_status(self, verbose=False):
259 """Return a dict with the status of the task queue."""
267 """
268 Get a dictionary with the current state of the task queue.
269
270 :Parameters:
271 verbose : boolean
272 If True, return a list of taskids. If False, simply give
273 the number of tasks with each status.
274
275 :Returns:
276 A dict with the queue status.
277 """
260 278 d = self.remote_reference.callRemote('queue_status', verbose)
261 279 d.addCallback(self.unpackage)
262 280 return d
263 281
282 def clear(self):
283 """
284 Clear all previously run tasks from the task controller.
285
286 This is needed because the task controller keep all task results
287 in memory. This can be a problem is there are many completed
288 tasks. Users should call this periodically to clean out these
289 cached task results.
290 """
291 d = self.remote_reference.callRemote('clear')
292 return d
293
264 294 def adapt_to_blocking_client(self):
295 """
296 Wrap self in a blocking version that implements `IBlockingTaskClient.
297 """
265 298 from IPython.kernel.taskclient import IBlockingTaskClient
266 299 return IBlockingTaskClient(self)
300
301 def map(self, func, *sequences):
302 """
303 Apply func to *sequences elementwise. Like Python's builtin map.
304
305 This version is load balanced.
306 """
307 return self.mapper().map(func, *sequences)
308
309 def mapper(self, clear_before=False, clear_after=False, retries=0,
310 recovery_task=None, depend=None, block=True):
311 """
312 Create an `IMapper` implementer with a given set of arguments.
313
314 The `IMapper` created using a task controller is load balanced.
315
316 See the documentation for `IPython.kernel.task.BaseTask` for
317 documentation on the arguments to this method.
318 """
319 return TaskMapper(self, clear_before=clear_before,
320 clear_after=clear_after, retries=retries,
321 recovery_task=recovery_task, depend=depend, block=block)
322
323 def parallel(self, clear_before=False, clear_after=False, retries=0,
324 recovery_task=None, depend=None, block=True):
325 mapper = self.mapper(clear_before, clear_after, retries,
326 recovery_task, depend, block)
327 pf = ParallelFunction(mapper)
328 return pf
267 329
@@ -1,158 +1,187 b''
1 1 #!/usr/bin/env python
2 2 # encoding: utf-8
3 3
4 4 __docformat__ = "restructuredtext en"
5 5
6 6 #-------------------------------------------------------------------------------
7 7 # Copyright (C) 2008 The IPython Development Team
8 8 #
9 9 # Distributed under the terms of the BSD License. The full license is in
10 10 # the file COPYING, distributed as part of this software.
11 11 #-------------------------------------------------------------------------------
12 12
13 13 #-------------------------------------------------------------------------------
14 14 # Imports
15 15 #-------------------------------------------------------------------------------
16 16
17 17 import time
18 18
19 19 from IPython.kernel import task, engineservice as es
20 20 from IPython.kernel.util import printer
21 21 from IPython.kernel import error
22 22
23 23 #-------------------------------------------------------------------------------
24 24 # Tests
25 25 #-------------------------------------------------------------------------------
26 26
27 27 def _raise_it(f):
28 28 try:
29 29 f.raiseException()
30 30 except CompositeError, e:
31 31 e.raise_exception()
32 32
33 33 class TaskTestBase(object):
34 34
35 35 def addEngine(self, n=1):
36 36 for i in range(n):
37 37 e = es.EngineService()
38 38 e.startService()
39 39 regDict = self.controller.register_engine(es.QueuedEngine(e), None)
40 40 e.id = regDict['id']
41 41 self.engines.append(e)
42 42
43 43
44 44 class ITaskControllerTestCase(TaskTestBase):
45 45
46 def testTaskIDs(self):
46 def test_task_ids(self):
47 47 self.addEngine(1)
48 d = self.tc.run(task.Task('a=5'))
48 d = self.tc.run(task.StringTask('a=5'))
49 49 d.addCallback(lambda r: self.assertEquals(r, 0))
50 d.addCallback(lambda r: self.tc.run(task.Task('a=5')))
50 d.addCallback(lambda r: self.tc.run(task.StringTask('a=5')))
51 51 d.addCallback(lambda r: self.assertEquals(r, 1))
52 d.addCallback(lambda r: self.tc.run(task.Task('a=5')))
52 d.addCallback(lambda r: self.tc.run(task.StringTask('a=5')))
53 53 d.addCallback(lambda r: self.assertEquals(r, 2))
54 d.addCallback(lambda r: self.tc.run(task.Task('a=5')))
54 d.addCallback(lambda r: self.tc.run(task.StringTask('a=5')))
55 55 d.addCallback(lambda r: self.assertEquals(r, 3))
56 56 return d
57 57
58 def testAbort(self):
58 def test_abort(self):
59 59 """Cannot do a proper abort test, because blocking execution prevents
60 60 abort from being called before task completes"""
61 61 self.addEngine(1)
62 t = task.Task('a=5')
62 t = task.StringTask('a=5')
63 63 d = self.tc.abort(0)
64 64 d.addErrback(lambda f: self.assertRaises(IndexError, f.raiseException))
65 65 d.addCallback(lambda _:self.tc.run(t))
66 66 d.addCallback(self.tc.abort)
67 67 d.addErrback(lambda f: self.assertRaises(IndexError, f.raiseException))
68 68 return d
69 69
70 def testAbortType(self):
70 def test_abort_type(self):
71 71 self.addEngine(1)
72 72 d = self.tc.abort('asdfadsf')
73 73 d.addErrback(lambda f: self.assertRaises(TypeError, f.raiseException))
74 74 return d
75 75
76 def testClears(self):
76 def test_clear_before_and_after(self):
77 77 self.addEngine(1)
78 t = task.Task('a=1', clear_before=True, pull='b', clear_after=True)
78 t = task.StringTask('a=1', clear_before=True, pull='b', clear_after=True)
79 79 d = self.multiengine.execute('b=1', targets=0)
80 80 d.addCallback(lambda _: self.tc.run(t))
81 81 d.addCallback(lambda tid: self.tc.get_task_result(tid,block=True))
82 82 d.addCallback(lambda tr: tr.failure)
83 83 d.addErrback(lambda f: self.assertRaises(NameError, f.raiseException))
84 84 d.addCallback(lambda _:self.multiengine.pull('a', targets=0))
85 85 d.addErrback(lambda f: self.assertRaises(NameError, _raise_it, f))
86 86 return d
87 87
88 def testSimpleRetries(self):
88 def test_simple_retries(self):
89 89 self.addEngine(1)
90 t = task.Task("i += 1\nassert i == 16", pull='i',retries=10)
91 t2 = task.Task("i += 1\nassert i == 16", pull='i',retries=10)
90 t = task.StringTask("i += 1\nassert i == 16", pull='i',retries=10)
91 t2 = task.StringTask("i += 1\nassert i == 16", pull='i',retries=10)
92 92 d = self.multiengine.execute('i=0', targets=0)
93 93 d.addCallback(lambda r: self.tc.run(t))
94 94 d.addCallback(self.tc.get_task_result, block=True)
95 95 d.addCallback(lambda tr: tr.ns.i)
96 96 d.addErrback(lambda f: self.assertRaises(AssertionError, f.raiseException))
97 97
98 98 d.addCallback(lambda r: self.tc.run(t2))
99 99 d.addCallback(self.tc.get_task_result, block=True)
100 100 d.addCallback(lambda tr: tr.ns.i)
101 101 d.addCallback(lambda r: self.assertEquals(r, 16))
102 102 return d
103 103
104 def testRecoveryTasks(self):
104 def test_recovery_tasks(self):
105 105 self.addEngine(1)
106 t = task.Task("i=16", pull='i')
107 t2 = task.Task("raise Exception", recovery_task=t, retries = 2)
106 t = task.StringTask("i=16", pull='i')
107 t2 = task.StringTask("raise Exception", recovery_task=t, retries = 2)
108 108
109 109 d = self.tc.run(t2)
110 110 d.addCallback(self.tc.get_task_result, block=True)
111 111 d.addCallback(lambda tr: tr.ns.i)
112 112 d.addCallback(lambda r: self.assertEquals(r, 16))
113 113 return d
114 114
115 # def testInfiniteRecoveryLoop(self):
116 # self.addEngine(1)
117 # t = task.Task("raise Exception", retries = 5)
118 # t2 = task.Task("assert True", retries = 2, recovery_task = t)
119 # t.recovery_task = t2
120 #
121 # d = self.tc.run(t)
122 # d.addCallback(self.tc.get_task_result, block=True)
123 # d.addCallback(lambda tr: tr.ns.i)
124 # d.addBoth(printer)
125 # d.addErrback(lambda f: self.assertRaises(AssertionError, f.raiseException))
126 # return d
127 #
128 def testSetupNS(self):
115 def test_setup_ns(self):
129 116 self.addEngine(1)
130 117 d = self.multiengine.execute('a=0', targets=0)
131 118 ns = dict(a=1, b=0)
132 t = task.Task("", push=ns, pull=['a','b'])
119 t = task.StringTask("", push=ns, pull=['a','b'])
133 120 d.addCallback(lambda r: self.tc.run(t))
134 121 d.addCallback(self.tc.get_task_result, block=True)
135 122 d.addCallback(lambda tr: {'a':tr.ns.a, 'b':tr['b']})
136 123 d.addCallback(lambda r: self.assertEquals(r, ns))
137 124 return d
138 125
139 def testTaskResults(self):
126 def test_string_task_results(self):
140 127 self.addEngine(1)
141 t1 = task.Task('a=5', pull='a')
128 t1 = task.StringTask('a=5', pull='a')
142 129 d = self.tc.run(t1)
143 130 d.addCallback(self.tc.get_task_result, block=True)
144 d.addCallback(lambda tr: (tr.ns.a,tr['a'],tr.failure, tr.raiseException()))
131 d.addCallback(lambda tr: (tr.ns.a,tr['a'],tr.failure, tr.raise_exception()))
145 132 d.addCallback(lambda r: self.assertEquals(r, (5,5,None,None)))
146 133
147 t2 = task.Task('7=5')
134 t2 = task.StringTask('7=5')
148 135 d.addCallback(lambda r: self.tc.run(t2))
149 136 d.addCallback(self.tc.get_task_result, block=True)
150 137 d.addCallback(lambda tr: tr.ns)
151 138 d.addErrback(lambda f: self.assertRaises(SyntaxError, f.raiseException))
152 139
153 t3 = task.Task('', pull='b')
140 t3 = task.StringTask('', pull='b')
154 141 d.addCallback(lambda r: self.tc.run(t3))
155 142 d.addCallback(self.tc.get_task_result, block=True)
156 143 d.addCallback(lambda tr: tr.ns)
157 144 d.addErrback(lambda f: self.assertRaises(NameError, f.raiseException))
158 145 return d
146
147 def test_map_task(self):
148 self.addEngine(1)
149 t1 = task.MapTask(lambda x: 2*x,(10,))
150 d = self.tc.run(t1)
151 d.addCallback(self.tc.get_task_result, block=True)
152 d.addCallback(lambda r: self.assertEquals(r,20))
153
154 t2 = task.MapTask(lambda : 20)
155 d.addCallback(lambda _: self.tc.run(t2))
156 d.addCallback(self.tc.get_task_result, block=True)
157 d.addCallback(lambda r: self.assertEquals(r,20))
158
159 t3 = task.MapTask(lambda x: x,(),{'x':20})
160 d.addCallback(lambda _: self.tc.run(t3))
161 d.addCallback(self.tc.get_task_result, block=True)
162 d.addCallback(lambda r: self.assertEquals(r,20))
163 return d
164
165 def test_map_task_failure(self):
166 self.addEngine(1)
167 t1 = task.MapTask(lambda x: 1/0,(10,))
168 d = self.tc.run(t1)
169 d.addCallback(self.tc.get_task_result, block=True)
170 d.addErrback(lambda f: self.assertRaises(ZeroDivisionError, f.raiseException))
171 return d
172
173 def test_map_task_args(self):
174 self.assertRaises(TypeError, task.MapTask, 'asdfasdf')
175 self.assertRaises(TypeError, task.MapTask, lambda x: x, 10)
176 self.assertRaises(TypeError, task.MapTask, lambda x: x, (10,),30)
177
178 def test_clear(self):
179 self.addEngine(1)
180 t1 = task.MapTask(lambda x: 2*x,(10,))
181 d = self.tc.run(t1)
182 d.addCallback(lambda _: self.tc.get_task_result(0, block=True))
183 d.addCallback(lambda r: self.assertEquals(r,20))
184 d.addCallback(lambda _: self.tc.clear())
185 d.addCallback(lambda _: self.tc.get_task_result(0, block=True))
186 d.addErrback(lambda f: self.assertRaises(IndexError, f.raiseException))
187 return d
@@ -1,90 +1,161 b''
1 1 #!/usr/bin/env python
2 2 # encoding: utf-8
3 3
4 4 __docformat__ = "restructuredtext en"
5 5
6 6 #-------------------------------------------------------------------------------
7 7 # Copyright (C) 2008 The IPython Development Team
8 8 #
9 9 # Distributed under the terms of the BSD License. The full license is in
10 10 # the file COPYING, distributed as part of this software.
11 11 #-------------------------------------------------------------------------------
12 12
13 13 #-------------------------------------------------------------------------------
14 14 # Imports
15 15 #-------------------------------------------------------------------------------
16 16
17 17 try:
18 18 import time
19 19
20 20 from twisted.internet import defer, reactor
21 21
22 22 from IPython.kernel.fcutil import Tub, UnauthenticatedTub
23 23
24 24 from IPython.kernel import task as taskmodule
25 25 from IPython.kernel import controllerservice as cs
26 26 import IPython.kernel.multiengine as me
27 27 from IPython.testing.util import DeferredTestCase
28 28 from IPython.kernel.multienginefc import IFCSynchronousMultiEngine
29 29 from IPython.kernel.taskfc import IFCTaskController
30 30 from IPython.kernel.util import printer
31 31 from IPython.kernel.tests.tasktest import ITaskControllerTestCase
32 32 from IPython.kernel.clientconnector import ClientConnector
33 from IPython.kernel.error import CompositeError
34 from IPython.kernel.parallelfunction import ParallelFunction
33 35 except ImportError:
34 36 pass
35 37 else:
36 38
37 39 #-------------------------------------------------------------------------------
38 40 # Tests
39 41 #-------------------------------------------------------------------------------
40 42
43 def _raise_it(f):
44 try:
45 f.raiseException()
46 except CompositeError, e:
47 e.raise_exception()
48
41 49 class TaskTest(DeferredTestCase, ITaskControllerTestCase):
42 50
43 51 def setUp(self):
44 52
45 53 self.engines = []
46 54
47 55 self.controller = cs.ControllerService()
48 56 self.controller.startService()
49 57 self.imultiengine = me.IMultiEngine(self.controller)
50 58 self.itc = taskmodule.ITaskController(self.controller)
51 59 self.itc.failurePenalty = 0
52 60
53 61 self.mec_referenceable = IFCSynchronousMultiEngine(self.imultiengine)
54 62 self.tc_referenceable = IFCTaskController(self.itc)
55 63
56 64 self.controller_tub = Tub()
57 65 self.controller_tub.listenOn('tcp:10105:interface=127.0.0.1')
58 66 self.controller_tub.setLocation('127.0.0.1:10105')
59 67
60 68 mec_furl = self.controller_tub.registerReference(self.mec_referenceable)
61 69 tc_furl = self.controller_tub.registerReference(self.tc_referenceable)
62 70 self.controller_tub.startService()
63 71
64 72 self.client_tub = ClientConnector()
65 73 d = self.client_tub.get_multiengine_client(mec_furl)
66 74 d.addCallback(self.handle_mec_client)
67 75 d.addCallback(lambda _: self.client_tub.get_task_client(tc_furl))
68 76 d.addCallback(self.handle_tc_client)
69 77 return d
70 78
71 79 def handle_mec_client(self, client):
72 80 self.multiengine = client
73 81
74 82 def handle_tc_client(self, client):
75 83 self.tc = client
76 84
77 85 def tearDown(self):
78 86 dlist = []
79 87 # Shut down the multiengine client
80 88 d = self.client_tub.tub.stopService()
81 89 dlist.append(d)
82 90 # Shut down the engines
83 91 for e in self.engines:
84 92 e.stopService()
85 93 # Shut down the controller
86 94 d = self.controller_tub.stopService()
87 95 d.addBoth(lambda _: self.controller.stopService())
88 96 dlist.append(d)
89 97 return defer.DeferredList(dlist)
90
98
99 def test_mapper(self):
100 self.addEngine(1)
101 m = self.tc.mapper()
102 self.assertEquals(m.task_controller,self.tc)
103 self.assertEquals(m.clear_before,False)
104 self.assertEquals(m.clear_after,False)
105 self.assertEquals(m.retries,0)
106 self.assertEquals(m.recovery_task,None)
107 self.assertEquals(m.depend,None)
108 self.assertEquals(m.block,True)
109
110 def test_map_default(self):
111 self.addEngine(1)
112 m = self.tc.mapper()
113 d = m.map(lambda x: 2*x, range(10))
114 d.addCallback(lambda r: self.assertEquals(r,[2*x for x in range(10)]))
115 d.addCallback(lambda _: self.tc.map(lambda x: 2*x, range(10)))
116 d.addCallback(lambda r: self.assertEquals(r,[2*x for x in range(10)]))
117 return d
118
119 def test_map_noblock(self):
120 self.addEngine(1)
121 m = self.tc.mapper(block=False)
122 d = m.map(lambda x: 2*x, range(10))
123 d.addCallback(lambda r: self.assertEquals(r,[x for x in range(10)]))
124 return d
125
126 def test_mapper_fail(self):
127 self.addEngine(1)
128 m = self.tc.mapper()
129 d = m.map(lambda x: 1/0, range(10))
130 d.addBoth(lambda f: self.assertRaises(ZeroDivisionError, _raise_it, f))
131 return d
132
133 def test_parallel(self):
134 self.addEngine(1)
135 p = self.tc.parallel()
136 self.assert_(isinstance(p, ParallelFunction))
137 @p
138 def f(x): return 2*x
139 d = f(range(10))
140 d.addCallback(lambda r: self.assertEquals(r,[2*x for x in range(10)]))
141 return d
142
143 def test_parallel_noblock(self):
144 self.addEngine(1)
145 p = self.tc.parallel(block=False)
146 self.assert_(isinstance(p, ParallelFunction))
147 @p
148 def f(x): return 2*x
149 d = f(range(10))
150 d.addCallback(lambda r: self.assertEquals(r,[x for x in range(10)]))
151 return d
152
153 def test_parallel_fail(self):
154 self.addEngine(1)
155 p = self.tc.parallel()
156 self.assert_(isinstance(p, ParallelFunction))
157 @p
158 def f(x): return 1/0
159 d = f(range(10))
160 d.addBoth(lambda f: self.assertRaises(ZeroDivisionError, _raise_it, f))
161 return d No newline at end of file
@@ -1,71 +1,71 b''
1 1 #!/usr/bin/env python
2 2 # encoding: utf-8
3 3 """Run a Monte-Carlo options pricer in parallel."""
4 4
5 5 from IPython.kernel import client
6 6 import numpy as N
7 7 from mcpricer import MCOptionPricer
8 8
9 9
10 10 tc = client.TaskClient()
11 11 rc = client.MultiEngineClient()
12 12
13 13 # Initialize the common code on the engines
14 14 rc.run('mcpricer.py')
15 15
16 16 # Push the variables that won't change
17 17 #(stock print, interest rate, days and MC paths)
18 18 rc.push(dict(S=100.0, r=0.05, days=260, paths=10000))
19 19
20 20 task_string = """\
21 21 op = MCOptionPricer(S,K,sigma,r,days,paths)
22 22 op.run()
23 23 vp, ap, vc, ac = op.vanilla_put, op.asian_put, op.vanilla_call, op.asian_call
24 24 """
25 25
26 26 # Create arrays of strike prices and volatilities
27 27 K_vals = N.linspace(90.0,100.0,5)
28 28 sigma_vals = N.linspace(0.0, 0.2,5)
29 29
30 30 # Submit tasks
31 31 taskids = []
32 32 for K in K_vals:
33 33 for sigma in sigma_vals:
34 t = client.Task(task_string,
34 t = client.StringTask(task_string,
35 35 push=dict(sigma=sigma,K=K),
36 36 pull=('vp','ap','vc','ac','sigma','K'))
37 37 taskids.append(tc.run(t))
38 38
39 39 print "Submitted tasks: ", taskids
40 40
41 41 # Block until tasks are completed
42 42 tc.barrier(taskids)
43 43
44 44 # Get the results
45 45 results = [tc.get_task_result(tid) for tid in taskids]
46 46
47 47 # Assemble the result
48 48 vc = N.empty(K_vals.shape[0]*sigma_vals.shape[0],dtype='float64')
49 49 vp = N.empty(K_vals.shape[0]*sigma_vals.shape[0],dtype='float64')
50 50 ac = N.empty(K_vals.shape[0]*sigma_vals.shape[0],dtype='float64')
51 51 ap = N.empty(K_vals.shape[0]*sigma_vals.shape[0],dtype='float64')
52 52 for i, tr in enumerate(results):
53 53 ns = tr.ns
54 54 vc[i] = ns.vc
55 55 vp[i] = ns.vp
56 56 ac[i] = ns.ac
57 57 ap[i] = ns.ap
58 58 vc.shape = (K_vals.shape[0],sigma_vals.shape[0])
59 59 vp.shape = (K_vals.shape[0],sigma_vals.shape[0])
60 60 ac.shape = (K_vals.shape[0],sigma_vals.shape[0])
61 61 ap.shape = (K_vals.shape[0],sigma_vals.shape[0])
62 62
63 63
64 64 def plot_options(K_vals, sigma_vals, prices):
65 65 """Make a contour plot of the option prices."""
66 66 import pylab
67 67 pylab.contourf(sigma_vals, K_vals, prices)
68 68 pylab.colorbar()
69 69 pylab.title("Option Price")
70 70 pylab.xlabel("Volatility")
71 71 pylab.ylabel("Strike Price")
@@ -1,18 +1,18 b''
1 1 from IPython.kernel import client
2 2
3 3 tc = client.TaskClient()
4 4 rc = client.MultiEngineClient()
5 5
6 6 rc.push(dict(d=30))
7 7
8 8 cmd1 = """\
9 9 a = 5
10 10 b = 10*d
11 11 c = a*b*d
12 12 """
13 13
14 t1 = client.Task(cmd1, clear_before=False, clear_after=True, pull=['a','b','c'])
14 t1 = client.StringTask(cmd1, clear_before=False, clear_after=True, pull=['a','b','c'])
15 15 tid1 = tc.run(t1)
16 16 tr1 = tc.get_task_result(tid1,block=True)
17 17 tr1.raiseException()
18 18 print "a, b: ", tr1.ns.a, tr1.ns.b No newline at end of file
@@ -1,44 +1,44 b''
1 1 #!/usr/bin/env python
2 2 # encoding: utf-8
3 3
4 4 from IPython.kernel import client
5 5 import time
6 6
7 7 tc = client.TaskClient()
8 8 mec = client.MultiEngineClient()
9 9
10 10 mec.execute('import time')
11 11
12 12 for i in range(24):
13 tc.irun('time.sleep(1)')
13 tc.run(client.StringTask('time.sleep(1)'))
14 14
15 15 for i in range(6):
16 16 time.sleep(1.0)
17 17 print "Queue status (vebose=False)"
18 18 print tc.queue_status()
19 19
20 20 for i in range(24):
21 tc.irun('time.sleep(1)')
21 tc.run(client.StringTask('time.sleep(1)'))
22 22
23 23 for i in range(6):
24 24 time.sleep(1.0)
25 25 print "Queue status (vebose=True)"
26 26 print tc.queue_status(True)
27 27
28 28 for i in range(12):
29 tc.irun('time.sleep(2)')
29 tc.run(client.StringTask('time.sleep(2)'))
30 30
31 31 print "Queue status (vebose=True)"
32 32 print tc.queue_status(True)
33 33
34 34 qs = tc.queue_status(True)
35 35 sched = qs['scheduled']
36 36
37 37 for tid in sched[-4:]:
38 38 tc.abort(tid)
39 39
40 40 for i in range(6):
41 41 time.sleep(1.0)
42 42 print "Queue status (vebose=True)"
43 43 print tc.queue_status(True)
44 44
@@ -1,77 +1,77 b''
1 1 #!/usr/bin/env python
2 2 """Test the performance of the task farming system.
3 3
4 4 This script submits a set of tasks to the TaskClient. The tasks
5 5 are basically just a time.sleep(t), where t is a random number between
6 6 two limits that can be configured at the command line. To run
7 7 the script there must first be an IPython controller and engines running::
8 8
9 9 ipcluster -n 16
10 10
11 11 A good test to run with 16 engines is::
12 12
13 13 python task_profiler.py -n 128 -t 0.01 -T 1.0
14 14
15 15 This should show a speedup of 13-14x. The limitation here is that the
16 16 overhead of a single task is about 0.001-0.01 seconds.
17 17 """
18 18 import random, sys
19 19 from optparse import OptionParser
20 20
21 21 from IPython.genutils import time
22 22 from IPython.kernel import client
23 23
24 24 def main():
25 25 parser = OptionParser()
26 26 parser.set_defaults(n=100)
27 27 parser.set_defaults(tmin=1)
28 28 parser.set_defaults(tmax=60)
29 29 parser.set_defaults(controller='localhost')
30 30 parser.set_defaults(meport=10105)
31 31 parser.set_defaults(tport=10113)
32 32
33 33 parser.add_option("-n", type='int', dest='n',
34 34 help='the number of tasks to run')
35 35 parser.add_option("-t", type='float', dest='tmin',
36 36 help='the minimum task length in seconds')
37 37 parser.add_option("-T", type='float', dest='tmax',
38 38 help='the maximum task length in seconds')
39 39 parser.add_option("-c", type='string', dest='controller',
40 40 help='the address of the controller')
41 41 parser.add_option("-p", type='int', dest='meport',
42 42 help="the port on which the controller listens for the MultiEngine/RemoteController client")
43 43 parser.add_option("-P", type='int', dest='tport',
44 44 help="the port on which the controller listens for the TaskClient client")
45 45
46 46 (opts, args) = parser.parse_args()
47 47 assert opts.tmax >= opts.tmin, "tmax must not be smaller than tmin"
48 48
49 49 rc = client.MultiEngineClient()
50 50 tc = client.TaskClient()
51 51 print tc.task_controller
52 52 rc.block=True
53 53 nengines = len(rc.get_ids())
54 54 rc.execute('from IPython.genutils import time')
55 55
56 56 # the jobs should take a random time within a range
57 57 times = [random.random()*(opts.tmax-opts.tmin)+opts.tmin for i in range(opts.n)]
58 tasks = [client.Task("time.sleep(%f)"%t) for t in times]
58 tasks = [client.StringTask("time.sleep(%f)"%t) for t in times]
59 59 stime = sum(times)
60 60
61 61 print "executing %i tasks, totalling %.1f secs on %i engines"%(opts.n, stime, nengines)
62 62 time.sleep(1)
63 63 start = time.time()
64 64 taskids = [tc.run(t) for t in tasks]
65 65 tc.barrier(taskids)
66 66 stop = time.time()
67 67
68 68 ptime = stop-start
69 69 scale = stime/ptime
70 70
71 71 print "executed %.1f secs in %.1f secs"%(stime, ptime)
72 72 print "%.3fx parallel performance on %i engines"%(scale, nengines)
73 73 print "%.1f%% of theoretical max"%(100*scale/nengines)
74 74
75 75
76 76 if __name__ == '__main__':
77 77 main()
General Comments 0
You need to be logged in to leave comments. Login now