##// END OF EJS Templates
Parallel magics (%result, %px, %autopx) are fixed....
Brian Granger -
Show More
@@ -0,0 +1,205 b''
1 #!/usr/bin/env python
2 # encoding: utf-8
3
4 """Magic command interface for interactive parallel work."""
5
6 #-----------------------------------------------------------------------------
7 # Copyright (C) 2008-2009 The IPython Development Team
8 #
9 # Distributed under the terms of the BSD License. The full license is in
10 # the file COPYING, distributed as part of this software.
11 #-----------------------------------------------------------------------------
12
13 #-----------------------------------------------------------------------------
14 # Imports
15 #-----------------------------------------------------------------------------
16
17 import new
18
19 from IPython.core.component import Component
20 from IPython.utils.traitlets import Bool, Any
21 from IPython.utils.autoattr import auto_attr
22
23 #-----------------------------------------------------------------------------
24 # Definitions of magic functions for use with IPython
25 #-----------------------------------------------------------------------------
26
27
28 NO_ACTIVE_MULTIENGINE_CLIENT = """
29 Use activate() on a MultiEngineClient object to activate it for magics.
30 """
31
32
33 class ParalleMagicComponent(Component):
34 """A component to manage the %result, %px and %autopx magics."""
35
36 active_multiengine_client = Any()
37 verbose = Bool(False, config=True)
38
39 def __init__(self, parent, name=None, config=None):
40 super(ParalleMagicComponent, self).__init__(parent, name=name, config=config)
41 self._define_magics()
42 # A flag showing if autopx is activated or not
43 self.autopx = False
44
45 # Access other components like this rather than by a regular attribute.
46 # This won't lookup the InteractiveShell object until it is used and
47 # then it is cached. This is both efficient and couples this class
48 # more loosely to InteractiveShell.
49 @auto_attr
50 def shell(self):
51 return Component.get_instances(
52 root=self.root,
53 klass='IPython.core.iplib.InteractiveShell')[0]
54
55 def _define_magics(self):
56 """Define the magic functions."""
57 self.shell.define_magic('result', self.magic_result)
58 self.shell.define_magic('px', self.magic_px)
59 self.shell.define_magic('autopx', self.magic_autopx)
60
61 def magic_result(self, ipself, parameter_s=''):
62 """Print the result of command i on all engines..
63
64 To use this a :class:`MultiEngineClient` instance must be created
65 and then activated by calling its :meth:`activate` method.
66
67 Then you can do the following::
68
69 In [23]: %result
70 Out[23]:
71 <Results List>
72 [0] In [6]: a = 10
73 [1] In [6]: a = 10
74
75 In [22]: %result 6
76 Out[22]:
77 <Results List>
78 [0] In [6]: a = 10
79 [1] In [6]: a = 10
80 """
81 if self.active_multiengine_client is None:
82 print NO_ACTIVE_MULTIENGINE_CLIENT
83 return
84
85 try:
86 index = int(parameter_s)
87 except:
88 index = None
89 result = self.active_multiengine_client.get_result(index)
90 return result
91
92 def magic_px(self, ipself, parameter_s=''):
93 """Executes the given python command in parallel.
94
95 To use this a :class:`MultiEngineClient` instance must be created
96 and then activated by calling its :meth:`activate` method.
97
98 Then you can do the following::
99
100 In [24]: %px a = 5
101 Parallel execution on engines: all
102 Out[24]:
103 <Results List>
104 [0] In [7]: a = 5
105 [1] In [7]: a = 5
106 """
107
108 if self.active_multiengine_client is None:
109 print NO_ACTIVE_MULTIENGINE_CLIENT
110 return
111 print "Parallel execution on engines: %s" % self.active_multiengine_client.targets
112 result = self.active_multiengine_client.execute(parameter_s)
113 return result
114
115 def magic_autopx(self, ipself, parameter_s=''):
116 """Toggles auto parallel mode.
117
118 To use this a :class:`MultiEngineClient` instance must be created
119 and then activated by calling its :meth:`activate` method. Once this
120 is called, all commands typed at the command line are send to
121 the engines to be executed in parallel. To control which engine
122 are used, set the ``targets`` attributed of the multiengine client
123 before entering ``%autopx`` mode.
124
125 Then you can do the following::
126
127 In [25]: %autopx
128 %autopx to enabled
129
130 In [26]: a = 10
131 <Results List>
132 [0] In [8]: a = 10
133 [1] In [8]: a = 10
134
135
136 In [27]: %autopx
137 %autopx disabled
138 """
139 if self.autopx:
140 self._disable_autopx()
141 else:
142 self._enable_autopx()
143
144 def _enable_autopx(self):
145 """Enable %autopx mode by saving the original runsource and installing
146 pxrunsource.
147 """
148 if self.active_multiengine_client is None:
149 print NO_ACTIVE_MULTIENGINE_CLIENT
150 return
151
152 self._original_runsource = self.shell.runsource
153 self.shell.runsource = new.instancemethod(
154 self.pxrunsource, self.shell, self.shell.__class__
155 )
156 self.autopx = True
157 print "%autopx enabled"
158
159 def _disable_autopx(self):
160 """Disable %autopx by restoring the original InteractiveShell.runsource."""
161 if self.autopx:
162 self.shell.runsource = self._original_runsource
163 self.autopx = False
164 print "%autopx disabled"
165
166 def pxrunsource(self, ipself, source, filename="<input>", symbol="single"):
167 """A parallel replacement for InteractiveShell.runsource."""
168
169 try:
170 code = ipself.compile(source, filename, symbol)
171 except (OverflowError, SyntaxError, ValueError):
172 # Case 1
173 ipself.showsyntaxerror(filename)
174 return None
175
176 if code is None:
177 # Case 2
178 return True
179
180 # Case 3
181 # Because autopx is enabled, we now call executeAll or disable autopx if
182 # %autopx or autopx has been called
183 if 'get_ipython().magic("%autopx' in source or 'get_ipython().magic("autopx' in source:
184 self._disable_autopx()
185 return False
186 else:
187 try:
188 result = self.active_multiengine_client.execute(source)
189 except:
190 ipself.showtraceback()
191 else:
192 print result.__repr__()
193 return False
194
195
196 _loaded = False
197
198
199 def load_ipython_extension(ip):
200 """Load the extension in IPython."""
201 global _loaded
202 if not _loaded:
203 prd = ParalleMagicComponent(ip, name='parallel_magic')
204 _loaded = True
205
@@ -1,17 +1,24 b''
1 1 c = get_config()
2 2
3 3 # This can be used at any point in a config file to load a sub config
4 4 # and merge it into the current one.
5 5 load_subconfig('ipython_config.py')
6 6
7 7 lines = """
8 8 from IPython.kernel.client import *
9 9 """
10 10
11 11 # You have to make sure that attributes that are containers already
12 12 # exist before using them. Simple assigning a new list will override
13 13 # all previous values.
14 14 if hasattr(c.Global, 'exec_lines'):
15 15 c.Global.exec_lines.append(lines)
16 16 else:
17 c.Global.exec_lines = [lines] No newline at end of file
17 c.Global.exec_lines = [lines]
18
19 # Load the parallelmagic extension to enable %result, %px, %autopx magics.
20 if hasattr(c.Global, 'extensions'):
21 c.Global.extensions.append('parallelmagic')
22 else:
23 c.Global.extensions = ['parallelmagic']
24
@@ -1,117 +1,118 b''
1 1 #!/usr/bin/env python
2 2 # encoding: utf-8
3 3 """
4 4 A context manager for managing things injected into :mod:`__builtin__`.
5 5
6 6 Authors:
7 7
8 8 * Brian Granger
9 9 """
10 10
11 11 #-----------------------------------------------------------------------------
12 12 # Copyright (C) 2008-2009 The IPython Development Team
13 13 #
14 14 # Distributed under the terms of the BSD License. The full license is in
15 15 # the file COPYING, distributed as part of this software.
16 16 #-----------------------------------------------------------------------------
17 17
18 18 #-----------------------------------------------------------------------------
19 19 # Imports
20 20 #-----------------------------------------------------------------------------
21 21
22 22 import __builtin__
23 23
24 24 from IPython.core.component import Component
25 25 from IPython.core.quitter import Quitter
26 26
27 27 from IPython.utils.autoattr import auto_attr
28 28
29 29 #-----------------------------------------------------------------------------
30 30 # Classes and functions
31 31 #-----------------------------------------------------------------------------
32 32
33 33
34 34 class BuiltinUndefined(object): pass
35 35 BuiltinUndefined = BuiltinUndefined()
36 36
37 37
38 38 class BuiltinTrap(Component):
39 39
40 40 def __init__(self, parent):
41 41 super(BuiltinTrap, self).__init__(parent, None, None)
42 42 self._orig_builtins = {}
43 43 # We define this to track if a single BuiltinTrap is nested.
44 44 # Only turn off the trap when the outermost call to __exit__ is made.
45 45 self._nested_level = 0
46 46
47 47 @auto_attr
48 48 def shell(self):
49 49 return Component.get_instances(
50 50 root=self.root,
51 51 klass='IPython.core.iplib.InteractiveShell')[0]
52 52
53 53 def __enter__(self):
54 54 if self._nested_level == 0:
55 55 self.set()
56 56 self._nested_level += 1
57 57 # I return self, so callers can use add_builtin in a with clause.
58 58 return self
59 59
60 60 def __exit__(self, type, value, traceback):
61 61 if self._nested_level == 1:
62 62 self.unset()
63 63 self._nested_level -= 1
64 64 # Returning False will cause exceptions to propagate
65 65 return False
66 66
67 67 def add_builtin(self, key, value):
68 68 """Add a builtin and save the original."""
69 69 orig = __builtin__.__dict__.get(key, BuiltinUndefined)
70 70 self._orig_builtins[key] = orig
71 71 __builtin__.__dict__[key] = value
72 72
73 73 def remove_builtin(self, key):
74 74 """Remove an added builtin and re-set the original."""
75 75 try:
76 76 orig = self._orig_builtins.pop(key)
77 77 except KeyError:
78 78 pass
79 79 else:
80 80 if orig is BuiltinUndefined:
81 81 del __builtin__.__dict__[key]
82 82 else:
83 83 __builtin__.__dict__[key] = orig
84 84
85 85 def set(self):
86 86 """Store ipython references in the __builtin__ namespace."""
87 87 self.add_builtin('exit', Quitter(self.shell, 'exit'))
88 88 self.add_builtin('quit', Quitter(self.shell, 'quit'))
89 self.add_builtin('get_ipython', self.shell.get_ipython)
89 90
90 91 # Recursive reload function
91 92 try:
92 93 from IPython.lib import deepreload
93 94 if self.shell.deep_reload:
94 95 self.add_builtin('reload', deepreload.reload)
95 96 else:
96 97 self.add_builtin('dreload', deepreload.reload)
97 98 del deepreload
98 99 except ImportError:
99 100 pass
100 101
101 102 # Keep in the builtins a flag for when IPython is active. We set it
102 103 # with setdefault so that multiple nested IPythons don't clobber one
103 104 # another. Each will increase its value by one upon being activated,
104 105 # which also gives us a way to determine the nesting level.
105 106 __builtin__.__dict__.setdefault('__IPYTHON__active',0)
106 107
107 108 def unset(self):
108 109 """Remove any builtins which might have been added by add_builtins, or
109 110 restore overwritten ones to their previous values."""
110 111 for key in self._orig_builtins.keys():
111 112 self.remove_builtin(key)
112 113 self._orig_builtins.clear()
113 114 self._builtins_added = False
114 115 try:
115 116 del __builtin__.__dict__['__IPYTHON__active']
116 117 except KeyError:
117 118 pass
@@ -1,959 +1,963 b''
1 1 # encoding: utf-8
2 2 # -*- test-case-name: IPython.kernel.test.test_multiengineclient -*-
3 3
4 4 """General Classes for IMultiEngine clients."""
5 5
6 6 __docformat__ = "restructuredtext en"
7 7
8 8 #-------------------------------------------------------------------------------
9 9 # Copyright (C) 2008 The IPython Development Team
10 10 #
11 11 # Distributed under the terms of the BSD License. The full license is in
12 12 # the file COPYING, distributed as part of this software.
13 13 #-------------------------------------------------------------------------------
14 14
15 15 #-------------------------------------------------------------------------------
16 16 # Imports
17 17 #-------------------------------------------------------------------------------
18 18
19 19 import sys
20 20 import linecache
21 21 import warnings
22 22
23 23 from twisted.python import components
24 24 from twisted.python.failure import Failure
25 25 from zope.interface import Interface, implements, Attribute
26 26
27 27 from IPython.utils.coloransi import TermColors
28 28
29 29 from IPython.kernel.twistedutil import blockingCallFromThread
30 30 from IPython.kernel import error
31 31 from IPython.kernel.parallelfunction import ParallelFunction
32 32 from IPython.kernel.mapper import (
33 33 MultiEngineMapper,
34 34 IMultiEngineMapperFactory,
35 35 IMapper
36 36 )
37 37
38 38 from IPython.kernel.multiengine import IFullSynchronousMultiEngine
39 39
40 40
41 41 #-------------------------------------------------------------------------------
42 42 # Pending Result things
43 43 #-------------------------------------------------------------------------------
44 44
45 45 class IPendingResult(Interface):
46 46 """A representation of a result that is pending.
47 47
48 48 This class is similar to Twisted's `Deferred` object, but is designed to be
49 49 used in a synchronous context.
50 50 """
51 51
52 52 result_id=Attribute("ID of the deferred on the other side")
53 53 client=Attribute("A client that I came from")
54 54 r=Attribute("An attribute that is a property that calls and returns get_result")
55 55
56 56 def get_result(default=None, block=True):
57 57 """
58 58 Get a result that is pending.
59 59
60 60 :Parameters:
61 61 default
62 62 The value to return if the result is not ready.
63 63 block : boolean
64 64 Should I block for the result.
65 65
66 66 :Returns: The actual result or the default value.
67 67 """
68 68
69 69 def add_callback(f, *args, **kwargs):
70 70 """
71 71 Add a callback that is called with the result.
72 72
73 73 If the original result is foo, adding a callback will cause
74 74 f(foo, *args, **kwargs) to be returned instead. If multiple
75 75 callbacks are registered, they are chained together: the result of
76 76 one is passed to the next and so on.
77 77
78 78 Unlike Twisted's Deferred object, there is no errback chain. Thus
79 79 any exception raised will not be caught and handled. User must
80 80 catch these by hand when calling `get_result`.
81 81 """
82 82
83 83
84 84 class PendingResult(object):
85 85 """A representation of a result that is not yet ready.
86 86
87 87 A user should not create a `PendingResult` instance by hand.
88 88
89 89 Methods:
90 90
91 91 * `get_result`
92 92 * `add_callback`
93 93
94 94 Properties:
95 95
96 96 * `r`
97 97 """
98 98
99 99 def __init__(self, client, result_id):
100 100 """Create a PendingResult with a result_id and a client instance.
101 101
102 102 The client should implement `_getPendingResult(result_id, block)`.
103 103 """
104 104 self.client = client
105 105 self.result_id = result_id
106 106 self.called = False
107 107 self.raised = False
108 108 self.callbacks = []
109 109
110 110 def get_result(self, default=None, block=True):
111 111 """Get a result that is pending.
112 112
113 113 This method will connect to an IMultiEngine adapted controller
114 114 and see if the result is ready. If the action triggers an exception
115 115 raise it and record it. This method records the result/exception once it is
116 116 retrieved. Calling `get_result` again will get this cached result or will
117 117 re-raise the exception. The .r attribute is a property that calls
118 118 `get_result` with block=True.
119 119
120 120 :Parameters:
121 121 default
122 122 The value to return if the result is not ready.
123 123 block : boolean
124 124 Should I block for the result.
125 125
126 126 :Returns: The actual result or the default value.
127 127 """
128 128
129 129 if self.called:
130 130 if self.raised:
131 131 raise self.result[0], self.result[1], self.result[2]
132 132 else:
133 133 return self.result
134 134 try:
135 135 result = self.client.get_pending_deferred(self.result_id, block)
136 136 except error.ResultNotCompleted:
137 137 return default
138 138 except:
139 139 # Reraise other error, but first record them so they can be reraised
140 140 # later if .r or get_result is called again.
141 141 self.result = sys.exc_info()
142 142 self.called = True
143 143 self.raised = True
144 144 raise
145 145 else:
146 146 for cb in self.callbacks:
147 147 result = cb[0](result, *cb[1], **cb[2])
148 148 self.result = result
149 149 self.called = True
150 150 return result
151 151
152 152 def add_callback(self, f, *args, **kwargs):
153 153 """Add a callback that is called with the result.
154 154
155 155 If the original result is result, adding a callback will cause
156 156 f(result, *args, **kwargs) to be returned instead. If multiple
157 157 callbacks are registered, they are chained together: the result of
158 158 one is passed to the next and so on.
159 159
160 160 Unlike Twisted's Deferred object, there is no errback chain. Thus
161 161 any exception raised will not be caught and handled. User must
162 162 catch these by hand when calling `get_result`.
163 163 """
164 164 assert callable(f)
165 165 self.callbacks.append((f, args, kwargs))
166 166
167 167 def __cmp__(self, other):
168 168 if self.result_id < other.result_id:
169 169 return -1
170 170 else:
171 171 return 1
172 172
173 173 def _get_r(self):
174 174 return self.get_result(block=True)
175 175
176 176 r = property(_get_r)
177 177 """This property is a shortcut to a `get_result(block=True)`."""
178 178
179 179
180 180 #-------------------------------------------------------------------------------
181 181 # Pretty printing wrappers for certain lists
182 182 #-------------------------------------------------------------------------------
183 183
184 184 class ResultList(list):
185 185 """A subclass of list that pretty prints the output of `execute`/`get_result`."""
186 186
187 187 def __repr__(self):
188 188 output = []
189 189 # These colored prompts were not working on Windows
190 190 if sys.platform == 'win32':
191 191 blue = normal = red = green = ''
192 192 else:
193 193 blue = TermColors.Blue
194 194 normal = TermColors.Normal
195 195 red = TermColors.Red
196 196 green = TermColors.Green
197 197 output.append("<Results List>\n")
198 198 for cmd in self:
199 199 if isinstance(cmd, Failure):
200 200 output.append(cmd)
201 201 else:
202 202 target = cmd.get('id',None)
203 203 cmd_num = cmd.get('number',None)
204 204 cmd_stdin = cmd.get('input',{}).get('translated','No Input')
205 205 cmd_stdout = cmd.get('stdout', None)
206 206 cmd_stderr = cmd.get('stderr', None)
207 207 output.append("%s[%i]%s In [%i]:%s %s\n" % \
208 208 (green, target,
209 209 blue, cmd_num, normal, cmd_stdin))
210 210 if cmd_stdout:
211 211 output.append("%s[%i]%s Out[%i]:%s %s\n" % \
212 212 (green, target,
213 213 red, cmd_num, normal, cmd_stdout))
214 214 if cmd_stderr:
215 215 output.append("%s[%i]%s Err[%i]:\n%s %s" % \
216 216 (green, target,
217 217 red, cmd_num, normal, cmd_stderr))
218 218 return ''.join(output)
219 219
220 220
221 221 def wrapResultList(result):
222 222 """A function that wraps the output of `execute`/`get_result` -> `ResultList`."""
223 223 if len(result) == 0:
224 224 result = [result]
225 225 return ResultList(result)
226 226
227 227
228 228 class QueueStatusList(list):
229 229 """A subclass of list that pretty prints the output of `queue_status`."""
230 230
231 231 def __repr__(self):
232 232 output = []
233 233 output.append("<Queue Status List>\n")
234 234 for e in self:
235 235 output.append("Engine: %s\n" % repr(e[0]))
236 236 output.append(" Pending: %s\n" % repr(e[1]['pending']))
237 237 for q in e[1]['queue']:
238 238 output.append(" Command: %s\n" % repr(q))
239 239 return ''.join(output)
240 240
241 241
242 242 #-------------------------------------------------------------------------------
243 243 # InteractiveMultiEngineClient
244 244 #-------------------------------------------------------------------------------
245 245
246 246 class InteractiveMultiEngineClient(object):
247 247 """A mixin class that add a few methods to a multiengine client.
248 248
249 249 The methods in this mixin class are designed for interactive usage.
250 250 """
251 251
252 252 def activate(self):
253 253 """Make this `MultiEngineClient` active for parallel magic commands.
254 254
255 255 IPython has a magic command syntax to work with `MultiEngineClient` objects.
256 256 In a given IPython session there is a single active one. While
257 257 there can be many `MultiEngineClient` created and used by the user,
258 258 there is only one active one. The active `MultiEngineClient` is used whenever
259 259 the magic commands %px and %autopx are used.
260 260
261 261 The activate() method is called on a given `MultiEngineClient` to make it
262 262 active. Once this has been done, the magic commands can be used.
263 263 """
264 264
265 265 try:
266 __IPYTHON__.activeController = self
266 # This is injected into __builtins__.
267 ip = get_ipython()
267 268 except NameError:
268 print "The IPython Controller magics only work within IPython."
269 print "The IPython parallel magics (%result, %px, %autopx) only work within IPython."
270 else:
271 pmagic = ip.get_component('parallel_magic')
272 pmagic.active_multiengine_client = self
269 273
270 274 def __setitem__(self, key, value):
271 275 """Add a dictionary interface for pushing/pulling.
272 276
273 277 This functions as a shorthand for `push`.
274 278
275 279 :Parameters:
276 280 key : str
277 281 What to call the remote object.
278 282 value : object
279 283 The local Python object to push.
280 284 """
281 285 targets, block = self._findTargetsAndBlock()
282 286 return self.push({key:value}, targets=targets, block=block)
283 287
284 288 def __getitem__(self, key):
285 289 """Add a dictionary interface for pushing/pulling.
286 290
287 291 This functions as a shorthand to `pull`.
288 292
289 293 :Parameters:
290 294 - `key`: A string representing the key.
291 295 """
292 296 if isinstance(key, str):
293 297 targets, block = self._findTargetsAndBlock()
294 298 return self.pull(key, targets=targets, block=block)
295 299 else:
296 300 raise TypeError("__getitem__ only takes strs")
297 301
298 302 def __len__(self):
299 303 """Return the number of available engines."""
300 304 return len(self.get_ids())
301 305
302 306 #---------------------------------------------------------------------------
303 307 # Make this a context manager for with
304 308 #---------------------------------------------------------------------------
305 309
306 310 def findsource_file(self,f):
307 311 linecache.checkcache()
308 312 s = findsource(f.f_code) # findsource is not defined!
309 313 lnum = f.f_lineno
310 314 wsource = s[0][f.f_lineno:]
311 315 return strip_whitespace(wsource)
312 316
313 317 def findsource_ipython(self,f):
314 318 from IPython.core import ipapi
315 319 self.ip = ipapi.get()
316 320 wsource = [l+'\n' for l in
317 321 self.ip.input_hist_raw[-1].splitlines()[1:]]
318 322 return strip_whitespace(wsource)
319 323
320 324 def __enter__(self):
321 325 f = sys._getframe(1)
322 326 local_ns = f.f_locals
323 327 global_ns = f.f_globals
324 328 if f.f_code.co_filename == '<ipython console>':
325 329 s = self.findsource_ipython(f)
326 330 else:
327 331 s = self.findsource_file(f)
328 332
329 333 self._with_context_result = self.execute(s)
330 334
331 335 def __exit__ (self, etype, value, tb):
332 336 if issubclass(etype,error.StopLocalExecution):
333 337 return True
334 338
335 339
336 340 def remote():
337 341 m = 'Special exception to stop local execution of parallel code.'
338 342 raise error.StopLocalExecution(m)
339 343
340 344 def strip_whitespace(source):
341 345 # Expand tabs to avoid any confusion.
342 346 wsource = [l.expandtabs(4) for l in source]
343 347 # Detect the indentation level
344 348 done = False
345 349 for line in wsource:
346 350 if line.isspace():
347 351 continue
348 352 for col,char in enumerate(line):
349 353 if char != ' ':
350 354 done = True
351 355 break
352 356 if done:
353 357 break
354 358 # Now we know how much leading space there is in the code. Next, we
355 359 # extract up to the first line that has less indentation.
356 360 # WARNINGS: we skip comments that may be misindented, but we do NOT yet
357 361 # detect triple quoted strings that may have flush left text.
358 362 for lno,line in enumerate(wsource):
359 363 lead = line[:col]
360 364 if lead.isspace():
361 365 continue
362 366 else:
363 367 if not lead.lstrip().startswith('#'):
364 368 break
365 369 # The real 'with' source is up to lno
366 370 src_lines = [l[col:] for l in wsource[:lno+1]]
367 371
368 372 # Finally, check that the source's first non-comment line begins with the
369 373 # special call 'remote()'
370 374 for nline,line in enumerate(src_lines):
371 375 if line.isspace() or line.startswith('#'):
372 376 continue
373 377 if 'remote()' in line:
374 378 break
375 379 else:
376 380 raise ValueError('remote() call missing at the start of code')
377 381 src = ''.join(src_lines[nline+1:])
378 382 #print 'SRC:\n<<<<<<<>>>>>>>\n%s<<<<<>>>>>>' % src # dbg
379 383 return src
380 384
381 385
382 386 #-------------------------------------------------------------------------------
383 387 # The top-level MultiEngine client adaptor
384 388 #-------------------------------------------------------------------------------
385 389
386 390
387 391 _prop_warn = """\
388 392
389 393 We are currently refactoring the task dependency system. This might
390 394 involve the removal of this method and other methods related to engine
391 395 properties. Please see the docstrings for IPython.kernel.TaskRejectError
392 396 for more information."""
393 397
394 398
395 399 class IFullBlockingMultiEngineClient(Interface):
396 400 pass
397 401
398 402
399 403 class FullBlockingMultiEngineClient(InteractiveMultiEngineClient):
400 404 """
401 405 A blocking client to the `IMultiEngine` controller interface.
402 406
403 407 This class allows users to use a set of engines for a parallel
404 408 computation through the `IMultiEngine` interface. In this interface,
405 409 each engine has a specific id (an int) that is used to refer to the
406 410 engine, run code on it, etc.
407 411 """
408 412
409 413 implements(
410 414 IFullBlockingMultiEngineClient,
411 415 IMultiEngineMapperFactory,
412 416 IMapper
413 417 )
414 418
415 419 def __init__(self, smultiengine):
416 420 self.smultiengine = smultiengine
417 421 self.block = True
418 422 self.targets = 'all'
419 423
420 424 def _findBlock(self, block=None):
421 425 if block is None:
422 426 return self.block
423 427 else:
424 428 if block in (True, False):
425 429 return block
426 430 else:
427 431 raise ValueError("block must be True or False")
428 432
429 433 def _findTargets(self, targets=None):
430 434 if targets is None:
431 435 return self.targets
432 436 else:
433 437 if not isinstance(targets, (str,list,tuple,int)):
434 438 raise ValueError("targets must be a str, list, tuple or int")
435 439 return targets
436 440
437 441 def _findTargetsAndBlock(self, targets=None, block=None):
438 442 return self._findTargets(targets), self._findBlock(block)
439 443
440 444 def _blockFromThread(self, function, *args, **kwargs):
441 445 block = kwargs.get('block', None)
442 446 if block is None:
443 447 raise error.MissingBlockArgument("'block' keyword argument is missing")
444 448 result = blockingCallFromThread(function, *args, **kwargs)
445 449 if not block:
446 450 result = PendingResult(self, result)
447 451 return result
448 452
449 453 def get_pending_deferred(self, deferredID, block):
450 454 return blockingCallFromThread(self.smultiengine.get_pending_deferred, deferredID, block)
451 455
452 456 def barrier(self, pendingResults):
453 457 """Synchronize a set of `PendingResults`.
454 458
455 459 This method is a synchronization primitive that waits for a set of
456 460 `PendingResult` objects to complete. More specifically, barier does
457 461 the following.
458 462
459 463 * The `PendingResult`s are sorted by result_id.
460 464 * The `get_result` method is called for each `PendingResult` sequentially
461 465 with block=True.
462 466 * If a `PendingResult` gets a result that is an exception, it is
463 467 trapped and can be re-raised later by calling `get_result` again.
464 468 * The `PendingResult`s are flushed from the controller.
465 469
466 470 After barrier has been called on a `PendingResult`, its results can
467 471 be retrieved by calling `get_result` again or accesing the `r` attribute
468 472 of the instance.
469 473 """
470 474
471 475 # Convert to list for sorting and check class type
472 476 prList = list(pendingResults)
473 477 for pr in prList:
474 478 if not isinstance(pr, PendingResult):
475 479 raise error.NotAPendingResult("Objects passed to barrier must be PendingResult instances")
476 480
477 481 # Sort the PendingResults so they are in order
478 482 prList.sort()
479 483 # Block on each PendingResult object
480 484 for pr in prList:
481 485 try:
482 486 result = pr.get_result(block=True)
483 487 except Exception:
484 488 pass
485 489
486 490 def flush(self):
487 491 """
488 492 Clear all pending deferreds/results from the controller.
489 493
490 494 For each `PendingResult` that is created by this client, the controller
491 495 holds on to the result for that `PendingResult`. This can be a problem
492 496 if there are a large number of `PendingResult` objects that are created.
493 497
494 498 Once the result of the `PendingResult` has been retrieved, the result
495 499 is removed from the controller, but if a user doesn't get a result (
496 500 they just ignore the `PendingResult`) the result is kept forever on the
497 501 controller. This method allows the user to clear out all un-retrieved
498 502 results on the controller.
499 503 """
500 504 r = blockingCallFromThread(self.smultiengine.clear_pending_deferreds)
501 505 return r
502 506
503 507 clear_pending_results = flush
504 508
505 509 #---------------------------------------------------------------------------
506 510 # IEngineMultiplexer related methods
507 511 #---------------------------------------------------------------------------
508 512
509 513 def execute(self, lines, targets=None, block=None):
510 514 """
511 515 Execute code on a set of engines.
512 516
513 517 :Parameters:
514 518 lines : str
515 519 The Python code to execute as a string
516 520 targets : id or list of ids
517 521 The engine to use for the execution
518 522 block : boolean
519 523 If False, this method will return the actual result. If False,
520 524 a `PendingResult` is returned which can be used to get the result
521 525 at a later time.
522 526 """
523 527 targets, block = self._findTargetsAndBlock(targets, block)
524 528 result = blockingCallFromThread(self.smultiengine.execute, lines,
525 529 targets=targets, block=block)
526 530 if block:
527 531 result = ResultList(result)
528 532 else:
529 533 result = PendingResult(self, result)
530 534 result.add_callback(wrapResultList)
531 535 return result
532 536
533 537 def push(self, namespace, targets=None, block=None):
534 538 """
535 539 Push a dictionary of keys and values to engines namespace.
536 540
537 541 Each engine has a persistent namespace. This method is used to push
538 542 Python objects into that namespace.
539 543
540 544 The objects in the namespace must be pickleable.
541 545
542 546 :Parameters:
543 547 namespace : dict
544 548 A dict that contains Python objects to be injected into
545 549 the engine persistent namespace.
546 550 targets : id or list of ids
547 551 The engine to use for the execution
548 552 block : boolean
549 553 If False, this method will return the actual result. If False,
550 554 a `PendingResult` is returned which can be used to get the result
551 555 at a later time.
552 556 """
553 557 targets, block = self._findTargetsAndBlock(targets, block)
554 558 return self._blockFromThread(self.smultiengine.push, namespace,
555 559 targets=targets, block=block)
556 560
557 561 def pull(self, keys, targets=None, block=None):
558 562 """
559 563 Pull Python objects by key out of engines namespaces.
560 564
561 565 :Parameters:
562 566 keys : str or list of str
563 567 The names of the variables to be pulled
564 568 targets : id or list of ids
565 569 The engine to use for the execution
566 570 block : boolean
567 571 If False, this method will return the actual result. If False,
568 572 a `PendingResult` is returned which can be used to get the result
569 573 at a later time.
570 574 """
571 575 targets, block = self._findTargetsAndBlock(targets, block)
572 576 return self._blockFromThread(self.smultiengine.pull, keys, targets=targets, block=block)
573 577
574 578 def push_function(self, namespace, targets=None, block=None):
575 579 """
576 580 Push a Python function to an engine.
577 581
578 582 This method is used to push a Python function to an engine. This
579 583 method can then be used in code on the engines. Closures are not supported.
580 584
581 585 :Parameters:
582 586 namespace : dict
583 587 A dict whose values are the functions to be pushed. The keys give
584 588 that names that the function will appear as in the engines
585 589 namespace.
586 590 targets : id or list of ids
587 591 The engine to use for the execution
588 592 block : boolean
589 593 If False, this method will return the actual result. If False,
590 594 a `PendingResult` is returned which can be used to get the result
591 595 at a later time.
592 596 """
593 597 targets, block = self._findTargetsAndBlock(targets, block)
594 598 return self._blockFromThread(self.smultiengine.push_function, namespace, targets=targets, block=block)
595 599
596 600 def pull_function(self, keys, targets=None, block=None):
597 601 """
598 602 Pull a Python function from an engine.
599 603
600 604 This method is used to pull a Python function from an engine.
601 605 Closures are not supported.
602 606
603 607 :Parameters:
604 608 keys : str or list of str
605 609 The names of the functions to be pulled
606 610 targets : id or list of ids
607 611 The engine to use for the execution
608 612 block : boolean
609 613 If False, this method will return the actual result. If False,
610 614 a `PendingResult` is returned which can be used to get the result
611 615 at a later time.
612 616 """
613 617 targets, block = self._findTargetsAndBlock(targets, block)
614 618 return self._blockFromThread(self.smultiengine.pull_function, keys, targets=targets, block=block)
615 619
616 620 def push_serialized(self, namespace, targets=None, block=None):
617 621 targets, block = self._findTargetsAndBlock(targets, block)
618 622 return self._blockFromThread(self.smultiengine.push_serialized, namespace, targets=targets, block=block)
619 623
620 624 def pull_serialized(self, keys, targets=None, block=None):
621 625 targets, block = self._findTargetsAndBlock(targets, block)
622 626 return self._blockFromThread(self.smultiengine.pull_serialized, keys, targets=targets, block=block)
623 627
624 628 def get_result(self, i=None, targets=None, block=None):
625 629 """
626 630 Get a previous result.
627 631
628 632 When code is executed in an engine, a dict is created and returned. This
629 633 method retrieves that dict for previous commands.
630 634
631 635 :Parameters:
632 636 i : int
633 637 The number of the result to get
634 638 targets : id or list of ids
635 639 The engine to use for the execution
636 640 block : boolean
637 641 If False, this method will return the actual result. If False,
638 642 a `PendingResult` is returned which can be used to get the result
639 643 at a later time.
640 644 """
641 645 targets, block = self._findTargetsAndBlock(targets, block)
642 646 result = blockingCallFromThread(self.smultiengine.get_result, i, targets=targets, block=block)
643 647 if block:
644 648 result = ResultList(result)
645 649 else:
646 650 result = PendingResult(self, result)
647 651 result.add_callback(wrapResultList)
648 652 return result
649 653
650 654 def reset(self, targets=None, block=None):
651 655 """
652 656 Reset an engine.
653 657
654 658 This method clears out the namespace of an engine.
655 659
656 660 :Parameters:
657 661 targets : id or list of ids
658 662 The engine to use for the execution
659 663 block : boolean
660 664 If False, this method will return the actual result. If False,
661 665 a `PendingResult` is returned which can be used to get the result
662 666 at a later time.
663 667 """
664 668 targets, block = self._findTargetsAndBlock(targets, block)
665 669 return self._blockFromThread(self.smultiengine.reset, targets=targets, block=block)
666 670
667 671 def keys(self, targets=None, block=None):
668 672 """
669 673 Get a list of all the variables in an engine's namespace.
670 674
671 675 :Parameters:
672 676 targets : id or list of ids
673 677 The engine to use for the execution
674 678 block : boolean
675 679 If False, this method will return the actual result. If False,
676 680 a `PendingResult` is returned which can be used to get the result
677 681 at a later time.
678 682 """
679 683 targets, block = self._findTargetsAndBlock(targets, block)
680 684 return self._blockFromThread(self.smultiengine.keys, targets=targets, block=block)
681 685
682 686 def kill(self, controller=False, targets=None, block=None):
683 687 """
684 688 Kill the engines and controller.
685 689
686 690 This method is used to stop the engine and controller by calling
687 691 `reactor.stop`.
688 692
689 693 :Parameters:
690 694 controller : boolean
691 695 If True, kill the engines and controller. If False, just the
692 696 engines
693 697 targets : id or list of ids
694 698 The engine to use for the execution
695 699 block : boolean
696 700 If False, this method will return the actual result. If False,
697 701 a `PendingResult` is returned which can be used to get the result
698 702 at a later time.
699 703 """
700 704 targets, block = self._findTargetsAndBlock(targets, block)
701 705 return self._blockFromThread(self.smultiengine.kill, controller, targets=targets, block=block)
702 706
703 707 def clear_queue(self, targets=None, block=None):
704 708 """
705 709 Clear out the controller's queue for an engine.
706 710
707 711 The controller maintains a queue for each engine. This clear it out.
708 712
709 713 :Parameters:
710 714 targets : id or list of ids
711 715 The engine to use for the execution
712 716 block : boolean
713 717 If False, this method will return the actual result. If False,
714 718 a `PendingResult` is returned which can be used to get the result
715 719 at a later time.
716 720 """
717 721 targets, block = self._findTargetsAndBlock(targets, block)
718 722 return self._blockFromThread(self.smultiengine.clear_queue, targets=targets, block=block)
719 723
720 724 def queue_status(self, targets=None, block=None):
721 725 """
722 726 Get the status of an engines queue.
723 727
724 728 :Parameters:
725 729 targets : id or list of ids
726 730 The engine to use for the execution
727 731 block : boolean
728 732 If False, this method will return the actual result. If False,
729 733 a `PendingResult` is returned which can be used to get the result
730 734 at a later time.
731 735 """
732 736 targets, block = self._findTargetsAndBlock(targets, block)
733 737 return self._blockFromThread(self.smultiengine.queue_status, targets=targets, block=block)
734 738
735 739 def set_properties(self, properties, targets=None, block=None):
736 740 warnings.warn(_prop_warn)
737 741 targets, block = self._findTargetsAndBlock(targets, block)
738 742 return self._blockFromThread(self.smultiengine.set_properties, properties, targets=targets, block=block)
739 743
740 744 def get_properties(self, keys=None, targets=None, block=None):
741 745 warnings.warn(_prop_warn)
742 746 targets, block = self._findTargetsAndBlock(targets, block)
743 747 return self._blockFromThread(self.smultiengine.get_properties, keys, targets=targets, block=block)
744 748
745 749 def has_properties(self, keys, targets=None, block=None):
746 750 warnings.warn(_prop_warn)
747 751 targets, block = self._findTargetsAndBlock(targets, block)
748 752 return self._blockFromThread(self.smultiengine.has_properties, keys, targets=targets, block=block)
749 753
750 754 def del_properties(self, keys, targets=None, block=None):
751 755 warnings.warn(_prop_warn)
752 756 targets, block = self._findTargetsAndBlock(targets, block)
753 757 return self._blockFromThread(self.smultiengine.del_properties, keys, targets=targets, block=block)
754 758
755 759 def clear_properties(self, targets=None, block=None):
756 760 warnings.warn(_prop_warn)
757 761 targets, block = self._findTargetsAndBlock(targets, block)
758 762 return self._blockFromThread(self.smultiengine.clear_properties, targets=targets, block=block)
759 763
760 764 #---------------------------------------------------------------------------
761 765 # IMultiEngine related methods
762 766 #---------------------------------------------------------------------------
763 767
764 768 def get_ids(self):
765 769 """
766 770 Returns the ids of currently registered engines.
767 771 """
768 772 result = blockingCallFromThread(self.smultiengine.get_ids)
769 773 return result
770 774
771 775 #---------------------------------------------------------------------------
772 776 # IMultiEngineCoordinator
773 777 #---------------------------------------------------------------------------
774 778
775 779 def scatter(self, key, seq, dist='b', flatten=False, targets=None, block=None):
776 780 """
777 781 Partition a Python sequence and send the partitions to a set of engines.
778 782 """
779 783 targets, block = self._findTargetsAndBlock(targets, block)
780 784 return self._blockFromThread(self.smultiengine.scatter, key, seq,
781 785 dist, flatten, targets=targets, block=block)
782 786
783 787 def gather(self, key, dist='b', targets=None, block=None):
784 788 """
785 789 Gather a partitioned sequence on a set of engines as a single local seq.
786 790 """
787 791 targets, block = self._findTargetsAndBlock(targets, block)
788 792 return self._blockFromThread(self.smultiengine.gather, key, dist,
789 793 targets=targets, block=block)
790 794
791 795 def raw_map(self, func, seq, dist='b', targets=None, block=None):
792 796 """
793 797 A parallelized version of Python's builtin map.
794 798
795 799 This has a slightly different syntax than the builtin `map`.
796 800 This is needed because we need to have keyword arguments and thus
797 801 can't use *args to capture all the sequences. Instead, they must
798 802 be passed in a list or tuple.
799 803
800 804 raw_map(func, seqs) -> map(func, seqs[0], seqs[1], ...)
801 805
802 806 Most users will want to use parallel functions or the `mapper`
803 807 and `map` methods for an API that follows that of the builtin
804 808 `map`.
805 809 """
806 810 targets, block = self._findTargetsAndBlock(targets, block)
807 811 return self._blockFromThread(self.smultiengine.raw_map, func, seq,
808 812 dist, targets=targets, block=block)
809 813
810 814 def map(self, func, *sequences):
811 815 """
812 816 A parallel version of Python's builtin `map` function.
813 817
814 818 This method applies a function to sequences of arguments. It
815 819 follows the same syntax as the builtin `map`.
816 820
817 821 This method creates a mapper objects by calling `self.mapper` with
818 822 no arguments and then uses that mapper to do the mapping. See
819 823 the documentation of `mapper` for more details.
820 824 """
821 825 return self.mapper().map(func, *sequences)
822 826
823 827 def mapper(self, dist='b', targets='all', block=None):
824 828 """
825 829 Create a mapper object that has a `map` method.
826 830
827 831 This method returns an object that implements the `IMapper`
828 832 interface. This method is a factory that is used to control how
829 833 the map happens.
830 834
831 835 :Parameters:
832 836 dist : str
833 837 What decomposition to use, 'b' is the only one supported
834 838 currently
835 839 targets : str, int, sequence of ints
836 840 Which engines to use for the map
837 841 block : boolean
838 842 Should calls to `map` block or not
839 843 """
840 844 return MultiEngineMapper(self, dist, targets, block)
841 845
842 846 def parallel(self, dist='b', targets=None, block=None):
843 847 """
844 848 A decorator that turns a function into a parallel function.
845 849
846 850 This can be used as:
847 851
848 852 @parallel()
849 853 def f(x, y)
850 854 ...
851 855
852 856 f(range(10), range(10))
853 857
854 858 This causes f(0,0), f(1,1), ... to be called in parallel.
855 859
856 860 :Parameters:
857 861 dist : str
858 862 What decomposition to use, 'b' is the only one supported
859 863 currently
860 864 targets : str, int, sequence of ints
861 865 Which engines to use for the map
862 866 block : boolean
863 867 Should calls to `map` block or not
864 868 """
865 869 targets, block = self._findTargetsAndBlock(targets, block)
866 870 mapper = self.mapper(dist, targets, block)
867 871 pf = ParallelFunction(mapper)
868 872 return pf
869 873
870 874 #---------------------------------------------------------------------------
871 875 # IMultiEngineExtras
872 876 #---------------------------------------------------------------------------
873 877
874 878 def zip_pull(self, keys, targets=None, block=None):
875 879 targets, block = self._findTargetsAndBlock(targets, block)
876 880 return self._blockFromThread(self.smultiengine.zip_pull, keys,
877 881 targets=targets, block=block)
878 882
879 883 def run(self, filename, targets=None, block=None):
880 884 """
881 885 Run a Python code in a file on the engines.
882 886
883 887 :Parameters:
884 888 filename : str
885 889 The name of the local file to run
886 890 targets : id or list of ids
887 891 The engine to use for the execution
888 892 block : boolean
889 893 If False, this method will return the actual result. If False,
890 894 a `PendingResult` is returned which can be used to get the result
891 895 at a later time.
892 896 """
893 897 targets, block = self._findTargetsAndBlock(targets, block)
894 898 return self._blockFromThread(self.smultiengine.run, filename,
895 899 targets=targets, block=block)
896 900
897 901 def benchmark(self, push_size=10000):
898 902 """
899 903 Run performance benchmarks for the current IPython cluster.
900 904
901 905 This method tests both the latency of sending command and data to the
902 906 engines as well as the throughput of sending large objects to the
903 907 engines using push. The latency is measured by having one or more
904 908 engines execute the command 'pass'. The throughput is measure by
905 909 sending an NumPy array of size `push_size` to one or more engines.
906 910
907 911 These benchmarks will vary widely on different hardware and networks
908 912 and thus can be used to get an idea of the performance characteristics
909 913 of a particular configuration of an IPython controller and engines.
910 914
911 915 This function is not testable within our current testing framework.
912 916 """
913 917 import timeit, __builtin__
914 918 __builtin__._mec_self = self
915 919 benchmarks = {}
916 920 repeat = 3
917 921 count = 10
918 922
919 923 timer = timeit.Timer('_mec_self.execute("pass",0)')
920 924 result = 1000*min(timer.repeat(repeat,count))/count
921 925 benchmarks['single_engine_latency'] = (result,'msec')
922 926
923 927 timer = timeit.Timer('_mec_self.execute("pass")')
924 928 result = 1000*min(timer.repeat(repeat,count))/count
925 929 benchmarks['all_engine_latency'] = (result,'msec')
926 930
927 931 try:
928 932 import numpy as np
929 933 except:
930 934 pass
931 935 else:
932 936 timer = timeit.Timer(
933 937 "_mec_self.push(d)",
934 938 "import numpy as np; d = dict(a=np.zeros(%r,dtype='float64'))" % push_size
935 939 )
936 940 result = min(timer.repeat(repeat,count))/count
937 941 benchmarks['all_engine_push'] = (1e-6*push_size*8/result, 'MB/sec')
938 942
939 943 try:
940 944 import numpy as np
941 945 except:
942 946 pass
943 947 else:
944 948 timer = timeit.Timer(
945 949 "_mec_self.push(d,0)",
946 950 "import numpy as np; d = dict(a=np.zeros(%r,dtype='float64'))" % push_size
947 951 )
948 952 result = min(timer.repeat(repeat,count))/count
949 953 benchmarks['single_engine_push'] = (1e-6*push_size*8/result, 'MB/sec')
950 954
951 955 return benchmarks
952 956
953 957
954 958 components.registerAdapter(FullBlockingMultiEngineClient,
955 959 IFullSynchronousMultiEngine, IFullBlockingMultiEngineClient)
956 960
957 961
958 962
959 963
1 NO CONTENT: file was removed
General Comments 0
You need to be logged in to leave comments. Login now