##// END OF EJS Templates
Parallel magics (%result, %px, %autopx) are fixed....
Brian Granger -
Show More
@@ -0,0 +1,205 b''
1 #!/usr/bin/env python
2 # encoding: utf-8
3
4 """Magic command interface for interactive parallel work."""
5
6 #-----------------------------------------------------------------------------
7 # Copyright (C) 2008-2009 The IPython Development Team
8 #
9 # Distributed under the terms of the BSD License. The full license is in
10 # the file COPYING, distributed as part of this software.
11 #-----------------------------------------------------------------------------
12
13 #-----------------------------------------------------------------------------
14 # Imports
15 #-----------------------------------------------------------------------------
16
17 import new
18
19 from IPython.core.component import Component
20 from IPython.utils.traitlets import Bool, Any
21 from IPython.utils.autoattr import auto_attr
22
23 #-----------------------------------------------------------------------------
24 # Definitions of magic functions for use with IPython
25 #-----------------------------------------------------------------------------
26
27
28 NO_ACTIVE_MULTIENGINE_CLIENT = """
29 Use activate() on a MultiEngineClient object to activate it for magics.
30 """
31
32
33 class ParalleMagicComponent(Component):
34 """A component to manage the %result, %px and %autopx magics."""
35
36 active_multiengine_client = Any()
37 verbose = Bool(False, config=True)
38
39 def __init__(self, parent, name=None, config=None):
40 super(ParalleMagicComponent, self).__init__(parent, name=name, config=config)
41 self._define_magics()
42 # A flag showing if autopx is activated or not
43 self.autopx = False
44
45 # Access other components like this rather than by a regular attribute.
46 # This won't lookup the InteractiveShell object until it is used and
47 # then it is cached. This is both efficient and couples this class
48 # more loosely to InteractiveShell.
49 @auto_attr
50 def shell(self):
51 return Component.get_instances(
52 root=self.root,
53 klass='IPython.core.iplib.InteractiveShell')[0]
54
55 def _define_magics(self):
56 """Define the magic functions."""
57 self.shell.define_magic('result', self.magic_result)
58 self.shell.define_magic('px', self.magic_px)
59 self.shell.define_magic('autopx', self.magic_autopx)
60
61 def magic_result(self, ipself, parameter_s=''):
62 """Print the result of command i on all engines..
63
64 To use this a :class:`MultiEngineClient` instance must be created
65 and then activated by calling its :meth:`activate` method.
66
67 Then you can do the following::
68
69 In [23]: %result
70 Out[23]:
71 <Results List>
72 [0] In [6]: a = 10
73 [1] In [6]: a = 10
74
75 In [22]: %result 6
76 Out[22]:
77 <Results List>
78 [0] In [6]: a = 10
79 [1] In [6]: a = 10
80 """
81 if self.active_multiengine_client is None:
82 print NO_ACTIVE_MULTIENGINE_CLIENT
83 return
84
85 try:
86 index = int(parameter_s)
87 except:
88 index = None
89 result = self.active_multiengine_client.get_result(index)
90 return result
91
92 def magic_px(self, ipself, parameter_s=''):
93 """Executes the given python command in parallel.
94
95 To use this a :class:`MultiEngineClient` instance must be created
96 and then activated by calling its :meth:`activate` method.
97
98 Then you can do the following::
99
100 In [24]: %px a = 5
101 Parallel execution on engines: all
102 Out[24]:
103 <Results List>
104 [0] In [7]: a = 5
105 [1] In [7]: a = 5
106 """
107
108 if self.active_multiengine_client is None:
109 print NO_ACTIVE_MULTIENGINE_CLIENT
110 return
111 print "Parallel execution on engines: %s" % self.active_multiengine_client.targets
112 result = self.active_multiengine_client.execute(parameter_s)
113 return result
114
115 def magic_autopx(self, ipself, parameter_s=''):
116 """Toggles auto parallel mode.
117
118 To use this a :class:`MultiEngineClient` instance must be created
119 and then activated by calling its :meth:`activate` method. Once this
120 is called, all commands typed at the command line are send to
121 the engines to be executed in parallel. To control which engine
122 are used, set the ``targets`` attributed of the multiengine client
123 before entering ``%autopx`` mode.
124
125 Then you can do the following::
126
127 In [25]: %autopx
128 %autopx to enabled
129
130 In [26]: a = 10
131 <Results List>
132 [0] In [8]: a = 10
133 [1] In [8]: a = 10
134
135
136 In [27]: %autopx
137 %autopx disabled
138 """
139 if self.autopx:
140 self._disable_autopx()
141 else:
142 self._enable_autopx()
143
144 def _enable_autopx(self):
145 """Enable %autopx mode by saving the original runsource and installing
146 pxrunsource.
147 """
148 if self.active_multiengine_client is None:
149 print NO_ACTIVE_MULTIENGINE_CLIENT
150 return
151
152 self._original_runsource = self.shell.runsource
153 self.shell.runsource = new.instancemethod(
154 self.pxrunsource, self.shell, self.shell.__class__
155 )
156 self.autopx = True
157 print "%autopx enabled"
158
159 def _disable_autopx(self):
160 """Disable %autopx by restoring the original InteractiveShell.runsource."""
161 if self.autopx:
162 self.shell.runsource = self._original_runsource
163 self.autopx = False
164 print "%autopx disabled"
165
166 def pxrunsource(self, ipself, source, filename="<input>", symbol="single"):
167 """A parallel replacement for InteractiveShell.runsource."""
168
169 try:
170 code = ipself.compile(source, filename, symbol)
171 except (OverflowError, SyntaxError, ValueError):
172 # Case 1
173 ipself.showsyntaxerror(filename)
174 return None
175
176 if code is None:
177 # Case 2
178 return True
179
180 # Case 3
181 # Because autopx is enabled, we now call executeAll or disable autopx if
182 # %autopx or autopx has been called
183 if 'get_ipython().magic("%autopx' in source or 'get_ipython().magic("autopx' in source:
184 self._disable_autopx()
185 return False
186 else:
187 try:
188 result = self.active_multiengine_client.execute(source)
189 except:
190 ipself.showtraceback()
191 else:
192 print result.__repr__()
193 return False
194
195
196 _loaded = False
197
198
199 def load_ipython_extension(ip):
200 """Load the extension in IPython."""
201 global _loaded
202 if not _loaded:
203 prd = ParalleMagicComponent(ip, name='parallel_magic')
204 _loaded = True
205
@@ -1,17 +1,24 b''
1 c = get_config()
1 c = get_config()
2
2
3 # This can be used at any point in a config file to load a sub config
3 # This can be used at any point in a config file to load a sub config
4 # and merge it into the current one.
4 # and merge it into the current one.
5 load_subconfig('ipython_config.py')
5 load_subconfig('ipython_config.py')
6
6
7 lines = """
7 lines = """
8 from IPython.kernel.client import *
8 from IPython.kernel.client import *
9 """
9 """
10
10
11 # You have to make sure that attributes that are containers already
11 # You have to make sure that attributes that are containers already
12 # exist before using them. Simple assigning a new list will override
12 # exist before using them. Simple assigning a new list will override
13 # all previous values.
13 # all previous values.
14 if hasattr(c.Global, 'exec_lines'):
14 if hasattr(c.Global, 'exec_lines'):
15 c.Global.exec_lines.append(lines)
15 c.Global.exec_lines.append(lines)
16 else:
16 else:
17 c.Global.exec_lines = [lines] No newline at end of file
17 c.Global.exec_lines = [lines]
18
19 # Load the parallelmagic extension to enable %result, %px, %autopx magics.
20 if hasattr(c.Global, 'extensions'):
21 c.Global.extensions.append('parallelmagic')
22 else:
23 c.Global.extensions = ['parallelmagic']
24
@@ -1,117 +1,118 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 # encoding: utf-8
2 # encoding: utf-8
3 """
3 """
4 A context manager for managing things injected into :mod:`__builtin__`.
4 A context manager for managing things injected into :mod:`__builtin__`.
5
5
6 Authors:
6 Authors:
7
7
8 * Brian Granger
8 * Brian Granger
9 """
9 """
10
10
11 #-----------------------------------------------------------------------------
11 #-----------------------------------------------------------------------------
12 # Copyright (C) 2008-2009 The IPython Development Team
12 # Copyright (C) 2008-2009 The IPython Development Team
13 #
13 #
14 # Distributed under the terms of the BSD License. The full license is in
14 # Distributed under the terms of the BSD License. The full license is in
15 # the file COPYING, distributed as part of this software.
15 # the file COPYING, distributed as part of this software.
16 #-----------------------------------------------------------------------------
16 #-----------------------------------------------------------------------------
17
17
18 #-----------------------------------------------------------------------------
18 #-----------------------------------------------------------------------------
19 # Imports
19 # Imports
20 #-----------------------------------------------------------------------------
20 #-----------------------------------------------------------------------------
21
21
22 import __builtin__
22 import __builtin__
23
23
24 from IPython.core.component import Component
24 from IPython.core.component import Component
25 from IPython.core.quitter import Quitter
25 from IPython.core.quitter import Quitter
26
26
27 from IPython.utils.autoattr import auto_attr
27 from IPython.utils.autoattr import auto_attr
28
28
29 #-----------------------------------------------------------------------------
29 #-----------------------------------------------------------------------------
30 # Classes and functions
30 # Classes and functions
31 #-----------------------------------------------------------------------------
31 #-----------------------------------------------------------------------------
32
32
33
33
34 class BuiltinUndefined(object): pass
34 class BuiltinUndefined(object): pass
35 BuiltinUndefined = BuiltinUndefined()
35 BuiltinUndefined = BuiltinUndefined()
36
36
37
37
38 class BuiltinTrap(Component):
38 class BuiltinTrap(Component):
39
39
40 def __init__(self, parent):
40 def __init__(self, parent):
41 super(BuiltinTrap, self).__init__(parent, None, None)
41 super(BuiltinTrap, self).__init__(parent, None, None)
42 self._orig_builtins = {}
42 self._orig_builtins = {}
43 # We define this to track if a single BuiltinTrap is nested.
43 # We define this to track if a single BuiltinTrap is nested.
44 # Only turn off the trap when the outermost call to __exit__ is made.
44 # Only turn off the trap when the outermost call to __exit__ is made.
45 self._nested_level = 0
45 self._nested_level = 0
46
46
47 @auto_attr
47 @auto_attr
48 def shell(self):
48 def shell(self):
49 return Component.get_instances(
49 return Component.get_instances(
50 root=self.root,
50 root=self.root,
51 klass='IPython.core.iplib.InteractiveShell')[0]
51 klass='IPython.core.iplib.InteractiveShell')[0]
52
52
53 def __enter__(self):
53 def __enter__(self):
54 if self._nested_level == 0:
54 if self._nested_level == 0:
55 self.set()
55 self.set()
56 self._nested_level += 1
56 self._nested_level += 1
57 # I return self, so callers can use add_builtin in a with clause.
57 # I return self, so callers can use add_builtin in a with clause.
58 return self
58 return self
59
59
60 def __exit__(self, type, value, traceback):
60 def __exit__(self, type, value, traceback):
61 if self._nested_level == 1:
61 if self._nested_level == 1:
62 self.unset()
62 self.unset()
63 self._nested_level -= 1
63 self._nested_level -= 1
64 # Returning False will cause exceptions to propagate
64 # Returning False will cause exceptions to propagate
65 return False
65 return False
66
66
67 def add_builtin(self, key, value):
67 def add_builtin(self, key, value):
68 """Add a builtin and save the original."""
68 """Add a builtin and save the original."""
69 orig = __builtin__.__dict__.get(key, BuiltinUndefined)
69 orig = __builtin__.__dict__.get(key, BuiltinUndefined)
70 self._orig_builtins[key] = orig
70 self._orig_builtins[key] = orig
71 __builtin__.__dict__[key] = value
71 __builtin__.__dict__[key] = value
72
72
73 def remove_builtin(self, key):
73 def remove_builtin(self, key):
74 """Remove an added builtin and re-set the original."""
74 """Remove an added builtin and re-set the original."""
75 try:
75 try:
76 orig = self._orig_builtins.pop(key)
76 orig = self._orig_builtins.pop(key)
77 except KeyError:
77 except KeyError:
78 pass
78 pass
79 else:
79 else:
80 if orig is BuiltinUndefined:
80 if orig is BuiltinUndefined:
81 del __builtin__.__dict__[key]
81 del __builtin__.__dict__[key]
82 else:
82 else:
83 __builtin__.__dict__[key] = orig
83 __builtin__.__dict__[key] = orig
84
84
85 def set(self):
85 def set(self):
86 """Store ipython references in the __builtin__ namespace."""
86 """Store ipython references in the __builtin__ namespace."""
87 self.add_builtin('exit', Quitter(self.shell, 'exit'))
87 self.add_builtin('exit', Quitter(self.shell, 'exit'))
88 self.add_builtin('quit', Quitter(self.shell, 'quit'))
88 self.add_builtin('quit', Quitter(self.shell, 'quit'))
89 self.add_builtin('get_ipython', self.shell.get_ipython)
89
90
90 # Recursive reload function
91 # Recursive reload function
91 try:
92 try:
92 from IPython.lib import deepreload
93 from IPython.lib import deepreload
93 if self.shell.deep_reload:
94 if self.shell.deep_reload:
94 self.add_builtin('reload', deepreload.reload)
95 self.add_builtin('reload', deepreload.reload)
95 else:
96 else:
96 self.add_builtin('dreload', deepreload.reload)
97 self.add_builtin('dreload', deepreload.reload)
97 del deepreload
98 del deepreload
98 except ImportError:
99 except ImportError:
99 pass
100 pass
100
101
101 # Keep in the builtins a flag for when IPython is active. We set it
102 # Keep in the builtins a flag for when IPython is active. We set it
102 # with setdefault so that multiple nested IPythons don't clobber one
103 # with setdefault so that multiple nested IPythons don't clobber one
103 # another. Each will increase its value by one upon being activated,
104 # another. Each will increase its value by one upon being activated,
104 # which also gives us a way to determine the nesting level.
105 # which also gives us a way to determine the nesting level.
105 __builtin__.__dict__.setdefault('__IPYTHON__active',0)
106 __builtin__.__dict__.setdefault('__IPYTHON__active',0)
106
107
107 def unset(self):
108 def unset(self):
108 """Remove any builtins which might have been added by add_builtins, or
109 """Remove any builtins which might have been added by add_builtins, or
109 restore overwritten ones to their previous values."""
110 restore overwritten ones to their previous values."""
110 for key in self._orig_builtins.keys():
111 for key in self._orig_builtins.keys():
111 self.remove_builtin(key)
112 self.remove_builtin(key)
112 self._orig_builtins.clear()
113 self._orig_builtins.clear()
113 self._builtins_added = False
114 self._builtins_added = False
114 try:
115 try:
115 del __builtin__.__dict__['__IPYTHON__active']
116 del __builtin__.__dict__['__IPYTHON__active']
116 except KeyError:
117 except KeyError:
117 pass
118 pass
@@ -1,959 +1,963 b''
1 # encoding: utf-8
1 # encoding: utf-8
2 # -*- test-case-name: IPython.kernel.test.test_multiengineclient -*-
2 # -*- test-case-name: IPython.kernel.test.test_multiengineclient -*-
3
3
4 """General Classes for IMultiEngine clients."""
4 """General Classes for IMultiEngine clients."""
5
5
6 __docformat__ = "restructuredtext en"
6 __docformat__ = "restructuredtext en"
7
7
8 #-------------------------------------------------------------------------------
8 #-------------------------------------------------------------------------------
9 # Copyright (C) 2008 The IPython Development Team
9 # Copyright (C) 2008 The IPython Development Team
10 #
10 #
11 # Distributed under the terms of the BSD License. The full license is in
11 # Distributed under the terms of the BSD License. The full license is in
12 # the file COPYING, distributed as part of this software.
12 # the file COPYING, distributed as part of this software.
13 #-------------------------------------------------------------------------------
13 #-------------------------------------------------------------------------------
14
14
15 #-------------------------------------------------------------------------------
15 #-------------------------------------------------------------------------------
16 # Imports
16 # Imports
17 #-------------------------------------------------------------------------------
17 #-------------------------------------------------------------------------------
18
18
19 import sys
19 import sys
20 import linecache
20 import linecache
21 import warnings
21 import warnings
22
22
23 from twisted.python import components
23 from twisted.python import components
24 from twisted.python.failure import Failure
24 from twisted.python.failure import Failure
25 from zope.interface import Interface, implements, Attribute
25 from zope.interface import Interface, implements, Attribute
26
26
27 from IPython.utils.coloransi import TermColors
27 from IPython.utils.coloransi import TermColors
28
28
29 from IPython.kernel.twistedutil import blockingCallFromThread
29 from IPython.kernel.twistedutil import blockingCallFromThread
30 from IPython.kernel import error
30 from IPython.kernel import error
31 from IPython.kernel.parallelfunction import ParallelFunction
31 from IPython.kernel.parallelfunction import ParallelFunction
32 from IPython.kernel.mapper import (
32 from IPython.kernel.mapper import (
33 MultiEngineMapper,
33 MultiEngineMapper,
34 IMultiEngineMapperFactory,
34 IMultiEngineMapperFactory,
35 IMapper
35 IMapper
36 )
36 )
37
37
38 from IPython.kernel.multiengine import IFullSynchronousMultiEngine
38 from IPython.kernel.multiengine import IFullSynchronousMultiEngine
39
39
40
40
41 #-------------------------------------------------------------------------------
41 #-------------------------------------------------------------------------------
42 # Pending Result things
42 # Pending Result things
43 #-------------------------------------------------------------------------------
43 #-------------------------------------------------------------------------------
44
44
45 class IPendingResult(Interface):
45 class IPendingResult(Interface):
46 """A representation of a result that is pending.
46 """A representation of a result that is pending.
47
47
48 This class is similar to Twisted's `Deferred` object, but is designed to be
48 This class is similar to Twisted's `Deferred` object, but is designed to be
49 used in a synchronous context.
49 used in a synchronous context.
50 """
50 """
51
51
52 result_id=Attribute("ID of the deferred on the other side")
52 result_id=Attribute("ID of the deferred on the other side")
53 client=Attribute("A client that I came from")
53 client=Attribute("A client that I came from")
54 r=Attribute("An attribute that is a property that calls and returns get_result")
54 r=Attribute("An attribute that is a property that calls and returns get_result")
55
55
56 def get_result(default=None, block=True):
56 def get_result(default=None, block=True):
57 """
57 """
58 Get a result that is pending.
58 Get a result that is pending.
59
59
60 :Parameters:
60 :Parameters:
61 default
61 default
62 The value to return if the result is not ready.
62 The value to return if the result is not ready.
63 block : boolean
63 block : boolean
64 Should I block for the result.
64 Should I block for the result.
65
65
66 :Returns: The actual result or the default value.
66 :Returns: The actual result or the default value.
67 """
67 """
68
68
69 def add_callback(f, *args, **kwargs):
69 def add_callback(f, *args, **kwargs):
70 """
70 """
71 Add a callback that is called with the result.
71 Add a callback that is called with the result.
72
72
73 If the original result is foo, adding a callback will cause
73 If the original result is foo, adding a callback will cause
74 f(foo, *args, **kwargs) to be returned instead. If multiple
74 f(foo, *args, **kwargs) to be returned instead. If multiple
75 callbacks are registered, they are chained together: the result of
75 callbacks are registered, they are chained together: the result of
76 one is passed to the next and so on.
76 one is passed to the next and so on.
77
77
78 Unlike Twisted's Deferred object, there is no errback chain. Thus
78 Unlike Twisted's Deferred object, there is no errback chain. Thus
79 any exception raised will not be caught and handled. User must
79 any exception raised will not be caught and handled. User must
80 catch these by hand when calling `get_result`.
80 catch these by hand when calling `get_result`.
81 """
81 """
82
82
83
83
84 class PendingResult(object):
84 class PendingResult(object):
85 """A representation of a result that is not yet ready.
85 """A representation of a result that is not yet ready.
86
86
87 A user should not create a `PendingResult` instance by hand.
87 A user should not create a `PendingResult` instance by hand.
88
88
89 Methods:
89 Methods:
90
90
91 * `get_result`
91 * `get_result`
92 * `add_callback`
92 * `add_callback`
93
93
94 Properties:
94 Properties:
95
95
96 * `r`
96 * `r`
97 """
97 """
98
98
99 def __init__(self, client, result_id):
99 def __init__(self, client, result_id):
100 """Create a PendingResult with a result_id and a client instance.
100 """Create a PendingResult with a result_id and a client instance.
101
101
102 The client should implement `_getPendingResult(result_id, block)`.
102 The client should implement `_getPendingResult(result_id, block)`.
103 """
103 """
104 self.client = client
104 self.client = client
105 self.result_id = result_id
105 self.result_id = result_id
106 self.called = False
106 self.called = False
107 self.raised = False
107 self.raised = False
108 self.callbacks = []
108 self.callbacks = []
109
109
110 def get_result(self, default=None, block=True):
110 def get_result(self, default=None, block=True):
111 """Get a result that is pending.
111 """Get a result that is pending.
112
112
113 This method will connect to an IMultiEngine adapted controller
113 This method will connect to an IMultiEngine adapted controller
114 and see if the result is ready. If the action triggers an exception
114 and see if the result is ready. If the action triggers an exception
115 raise it and record it. This method records the result/exception once it is
115 raise it and record it. This method records the result/exception once it is
116 retrieved. Calling `get_result` again will get this cached result or will
116 retrieved. Calling `get_result` again will get this cached result or will
117 re-raise the exception. The .r attribute is a property that calls
117 re-raise the exception. The .r attribute is a property that calls
118 `get_result` with block=True.
118 `get_result` with block=True.
119
119
120 :Parameters:
120 :Parameters:
121 default
121 default
122 The value to return if the result is not ready.
122 The value to return if the result is not ready.
123 block : boolean
123 block : boolean
124 Should I block for the result.
124 Should I block for the result.
125
125
126 :Returns: The actual result or the default value.
126 :Returns: The actual result or the default value.
127 """
127 """
128
128
129 if self.called:
129 if self.called:
130 if self.raised:
130 if self.raised:
131 raise self.result[0], self.result[1], self.result[2]
131 raise self.result[0], self.result[1], self.result[2]
132 else:
132 else:
133 return self.result
133 return self.result
134 try:
134 try:
135 result = self.client.get_pending_deferred(self.result_id, block)
135 result = self.client.get_pending_deferred(self.result_id, block)
136 except error.ResultNotCompleted:
136 except error.ResultNotCompleted:
137 return default
137 return default
138 except:
138 except:
139 # Reraise other error, but first record them so they can be reraised
139 # Reraise other error, but first record them so they can be reraised
140 # later if .r or get_result is called again.
140 # later if .r or get_result is called again.
141 self.result = sys.exc_info()
141 self.result = sys.exc_info()
142 self.called = True
142 self.called = True
143 self.raised = True
143 self.raised = True
144 raise
144 raise
145 else:
145 else:
146 for cb in self.callbacks:
146 for cb in self.callbacks:
147 result = cb[0](result, *cb[1], **cb[2])
147 result = cb[0](result, *cb[1], **cb[2])
148 self.result = result
148 self.result = result
149 self.called = True
149 self.called = True
150 return result
150 return result
151
151
152 def add_callback(self, f, *args, **kwargs):
152 def add_callback(self, f, *args, **kwargs):
153 """Add a callback that is called with the result.
153 """Add a callback that is called with the result.
154
154
155 If the original result is result, adding a callback will cause
155 If the original result is result, adding a callback will cause
156 f(result, *args, **kwargs) to be returned instead. If multiple
156 f(result, *args, **kwargs) to be returned instead. If multiple
157 callbacks are registered, they are chained together: the result of
157 callbacks are registered, they are chained together: the result of
158 one is passed to the next and so on.
158 one is passed to the next and so on.
159
159
160 Unlike Twisted's Deferred object, there is no errback chain. Thus
160 Unlike Twisted's Deferred object, there is no errback chain. Thus
161 any exception raised will not be caught and handled. User must
161 any exception raised will not be caught and handled. User must
162 catch these by hand when calling `get_result`.
162 catch these by hand when calling `get_result`.
163 """
163 """
164 assert callable(f)
164 assert callable(f)
165 self.callbacks.append((f, args, kwargs))
165 self.callbacks.append((f, args, kwargs))
166
166
167 def __cmp__(self, other):
167 def __cmp__(self, other):
168 if self.result_id < other.result_id:
168 if self.result_id < other.result_id:
169 return -1
169 return -1
170 else:
170 else:
171 return 1
171 return 1
172
172
173 def _get_r(self):
173 def _get_r(self):
174 return self.get_result(block=True)
174 return self.get_result(block=True)
175
175
176 r = property(_get_r)
176 r = property(_get_r)
177 """This property is a shortcut to a `get_result(block=True)`."""
177 """This property is a shortcut to a `get_result(block=True)`."""
178
178
179
179
180 #-------------------------------------------------------------------------------
180 #-------------------------------------------------------------------------------
181 # Pretty printing wrappers for certain lists
181 # Pretty printing wrappers for certain lists
182 #-------------------------------------------------------------------------------
182 #-------------------------------------------------------------------------------
183
183
184 class ResultList(list):
184 class ResultList(list):
185 """A subclass of list that pretty prints the output of `execute`/`get_result`."""
185 """A subclass of list that pretty prints the output of `execute`/`get_result`."""
186
186
187 def __repr__(self):
187 def __repr__(self):
188 output = []
188 output = []
189 # These colored prompts were not working on Windows
189 # These colored prompts were not working on Windows
190 if sys.platform == 'win32':
190 if sys.platform == 'win32':
191 blue = normal = red = green = ''
191 blue = normal = red = green = ''
192 else:
192 else:
193 blue = TermColors.Blue
193 blue = TermColors.Blue
194 normal = TermColors.Normal
194 normal = TermColors.Normal
195 red = TermColors.Red
195 red = TermColors.Red
196 green = TermColors.Green
196 green = TermColors.Green
197 output.append("<Results List>\n")
197 output.append("<Results List>\n")
198 for cmd in self:
198 for cmd in self:
199 if isinstance(cmd, Failure):
199 if isinstance(cmd, Failure):
200 output.append(cmd)
200 output.append(cmd)
201 else:
201 else:
202 target = cmd.get('id',None)
202 target = cmd.get('id',None)
203 cmd_num = cmd.get('number',None)
203 cmd_num = cmd.get('number',None)
204 cmd_stdin = cmd.get('input',{}).get('translated','No Input')
204 cmd_stdin = cmd.get('input',{}).get('translated','No Input')
205 cmd_stdout = cmd.get('stdout', None)
205 cmd_stdout = cmd.get('stdout', None)
206 cmd_stderr = cmd.get('stderr', None)
206 cmd_stderr = cmd.get('stderr', None)
207 output.append("%s[%i]%s In [%i]:%s %s\n" % \
207 output.append("%s[%i]%s In [%i]:%s %s\n" % \
208 (green, target,
208 (green, target,
209 blue, cmd_num, normal, cmd_stdin))
209 blue, cmd_num, normal, cmd_stdin))
210 if cmd_stdout:
210 if cmd_stdout:
211 output.append("%s[%i]%s Out[%i]:%s %s\n" % \
211 output.append("%s[%i]%s Out[%i]:%s %s\n" % \
212 (green, target,
212 (green, target,
213 red, cmd_num, normal, cmd_stdout))
213 red, cmd_num, normal, cmd_stdout))
214 if cmd_stderr:
214 if cmd_stderr:
215 output.append("%s[%i]%s Err[%i]:\n%s %s" % \
215 output.append("%s[%i]%s Err[%i]:\n%s %s" % \
216 (green, target,
216 (green, target,
217 red, cmd_num, normal, cmd_stderr))
217 red, cmd_num, normal, cmd_stderr))
218 return ''.join(output)
218 return ''.join(output)
219
219
220
220
221 def wrapResultList(result):
221 def wrapResultList(result):
222 """A function that wraps the output of `execute`/`get_result` -> `ResultList`."""
222 """A function that wraps the output of `execute`/`get_result` -> `ResultList`."""
223 if len(result) == 0:
223 if len(result) == 0:
224 result = [result]
224 result = [result]
225 return ResultList(result)
225 return ResultList(result)
226
226
227
227
228 class QueueStatusList(list):
228 class QueueStatusList(list):
229 """A subclass of list that pretty prints the output of `queue_status`."""
229 """A subclass of list that pretty prints the output of `queue_status`."""
230
230
231 def __repr__(self):
231 def __repr__(self):
232 output = []
232 output = []
233 output.append("<Queue Status List>\n")
233 output.append("<Queue Status List>\n")
234 for e in self:
234 for e in self:
235 output.append("Engine: %s\n" % repr(e[0]))
235 output.append("Engine: %s\n" % repr(e[0]))
236 output.append(" Pending: %s\n" % repr(e[1]['pending']))
236 output.append(" Pending: %s\n" % repr(e[1]['pending']))
237 for q in e[1]['queue']:
237 for q in e[1]['queue']:
238 output.append(" Command: %s\n" % repr(q))
238 output.append(" Command: %s\n" % repr(q))
239 return ''.join(output)
239 return ''.join(output)
240
240
241
241
242 #-------------------------------------------------------------------------------
242 #-------------------------------------------------------------------------------
243 # InteractiveMultiEngineClient
243 # InteractiveMultiEngineClient
244 #-------------------------------------------------------------------------------
244 #-------------------------------------------------------------------------------
245
245
246 class InteractiveMultiEngineClient(object):
246 class InteractiveMultiEngineClient(object):
247 """A mixin class that add a few methods to a multiengine client.
247 """A mixin class that add a few methods to a multiengine client.
248
248
249 The methods in this mixin class are designed for interactive usage.
249 The methods in this mixin class are designed for interactive usage.
250 """
250 """
251
251
252 def activate(self):
252 def activate(self):
253 """Make this `MultiEngineClient` active for parallel magic commands.
253 """Make this `MultiEngineClient` active for parallel magic commands.
254
254
255 IPython has a magic command syntax to work with `MultiEngineClient` objects.
255 IPython has a magic command syntax to work with `MultiEngineClient` objects.
256 In a given IPython session there is a single active one. While
256 In a given IPython session there is a single active one. While
257 there can be many `MultiEngineClient` created and used by the user,
257 there can be many `MultiEngineClient` created and used by the user,
258 there is only one active one. The active `MultiEngineClient` is used whenever
258 there is only one active one. The active `MultiEngineClient` is used whenever
259 the magic commands %px and %autopx are used.
259 the magic commands %px and %autopx are used.
260
260
261 The activate() method is called on a given `MultiEngineClient` to make it
261 The activate() method is called on a given `MultiEngineClient` to make it
262 active. Once this has been done, the magic commands can be used.
262 active. Once this has been done, the magic commands can be used.
263 """
263 """
264
264
265 try:
265 try:
266 __IPYTHON__.activeController = self
266 # This is injected into __builtins__.
267 ip = get_ipython()
267 except NameError:
268 except NameError:
268 print "The IPython Controller magics only work within IPython."
269 print "The IPython parallel magics (%result, %px, %autopx) only work within IPython."
270 else:
271 pmagic = ip.get_component('parallel_magic')
272 pmagic.active_multiengine_client = self
269
273
270 def __setitem__(self, key, value):
274 def __setitem__(self, key, value):
271 """Add a dictionary interface for pushing/pulling.
275 """Add a dictionary interface for pushing/pulling.
272
276
273 This functions as a shorthand for `push`.
277 This functions as a shorthand for `push`.
274
278
275 :Parameters:
279 :Parameters:
276 key : str
280 key : str
277 What to call the remote object.
281 What to call the remote object.
278 value : object
282 value : object
279 The local Python object to push.
283 The local Python object to push.
280 """
284 """
281 targets, block = self._findTargetsAndBlock()
285 targets, block = self._findTargetsAndBlock()
282 return self.push({key:value}, targets=targets, block=block)
286 return self.push({key:value}, targets=targets, block=block)
283
287
284 def __getitem__(self, key):
288 def __getitem__(self, key):
285 """Add a dictionary interface for pushing/pulling.
289 """Add a dictionary interface for pushing/pulling.
286
290
287 This functions as a shorthand to `pull`.
291 This functions as a shorthand to `pull`.
288
292
289 :Parameters:
293 :Parameters:
290 - `key`: A string representing the key.
294 - `key`: A string representing the key.
291 """
295 """
292 if isinstance(key, str):
296 if isinstance(key, str):
293 targets, block = self._findTargetsAndBlock()
297 targets, block = self._findTargetsAndBlock()
294 return self.pull(key, targets=targets, block=block)
298 return self.pull(key, targets=targets, block=block)
295 else:
299 else:
296 raise TypeError("__getitem__ only takes strs")
300 raise TypeError("__getitem__ only takes strs")
297
301
298 def __len__(self):
302 def __len__(self):
299 """Return the number of available engines."""
303 """Return the number of available engines."""
300 return len(self.get_ids())
304 return len(self.get_ids())
301
305
302 #---------------------------------------------------------------------------
306 #---------------------------------------------------------------------------
303 # Make this a context manager for with
307 # Make this a context manager for with
304 #---------------------------------------------------------------------------
308 #---------------------------------------------------------------------------
305
309
306 def findsource_file(self,f):
310 def findsource_file(self,f):
307 linecache.checkcache()
311 linecache.checkcache()
308 s = findsource(f.f_code) # findsource is not defined!
312 s = findsource(f.f_code) # findsource is not defined!
309 lnum = f.f_lineno
313 lnum = f.f_lineno
310 wsource = s[0][f.f_lineno:]
314 wsource = s[0][f.f_lineno:]
311 return strip_whitespace(wsource)
315 return strip_whitespace(wsource)
312
316
313 def findsource_ipython(self,f):
317 def findsource_ipython(self,f):
314 from IPython.core import ipapi
318 from IPython.core import ipapi
315 self.ip = ipapi.get()
319 self.ip = ipapi.get()
316 wsource = [l+'\n' for l in
320 wsource = [l+'\n' for l in
317 self.ip.input_hist_raw[-1].splitlines()[1:]]
321 self.ip.input_hist_raw[-1].splitlines()[1:]]
318 return strip_whitespace(wsource)
322 return strip_whitespace(wsource)
319
323
320 def __enter__(self):
324 def __enter__(self):
321 f = sys._getframe(1)
325 f = sys._getframe(1)
322 local_ns = f.f_locals
326 local_ns = f.f_locals
323 global_ns = f.f_globals
327 global_ns = f.f_globals
324 if f.f_code.co_filename == '<ipython console>':
328 if f.f_code.co_filename == '<ipython console>':
325 s = self.findsource_ipython(f)
329 s = self.findsource_ipython(f)
326 else:
330 else:
327 s = self.findsource_file(f)
331 s = self.findsource_file(f)
328
332
329 self._with_context_result = self.execute(s)
333 self._with_context_result = self.execute(s)
330
334
331 def __exit__ (self, etype, value, tb):
335 def __exit__ (self, etype, value, tb):
332 if issubclass(etype,error.StopLocalExecution):
336 if issubclass(etype,error.StopLocalExecution):
333 return True
337 return True
334
338
335
339
336 def remote():
340 def remote():
337 m = 'Special exception to stop local execution of parallel code.'
341 m = 'Special exception to stop local execution of parallel code.'
338 raise error.StopLocalExecution(m)
342 raise error.StopLocalExecution(m)
339
343
340 def strip_whitespace(source):
344 def strip_whitespace(source):
341 # Expand tabs to avoid any confusion.
345 # Expand tabs to avoid any confusion.
342 wsource = [l.expandtabs(4) for l in source]
346 wsource = [l.expandtabs(4) for l in source]
343 # Detect the indentation level
347 # Detect the indentation level
344 done = False
348 done = False
345 for line in wsource:
349 for line in wsource:
346 if line.isspace():
350 if line.isspace():
347 continue
351 continue
348 for col,char in enumerate(line):
352 for col,char in enumerate(line):
349 if char != ' ':
353 if char != ' ':
350 done = True
354 done = True
351 break
355 break
352 if done:
356 if done:
353 break
357 break
354 # Now we know how much leading space there is in the code. Next, we
358 # Now we know how much leading space there is in the code. Next, we
355 # extract up to the first line that has less indentation.
359 # extract up to the first line that has less indentation.
356 # WARNINGS: we skip comments that may be misindented, but we do NOT yet
360 # WARNINGS: we skip comments that may be misindented, but we do NOT yet
357 # detect triple quoted strings that may have flush left text.
361 # detect triple quoted strings that may have flush left text.
358 for lno,line in enumerate(wsource):
362 for lno,line in enumerate(wsource):
359 lead = line[:col]
363 lead = line[:col]
360 if lead.isspace():
364 if lead.isspace():
361 continue
365 continue
362 else:
366 else:
363 if not lead.lstrip().startswith('#'):
367 if not lead.lstrip().startswith('#'):
364 break
368 break
365 # The real 'with' source is up to lno
369 # The real 'with' source is up to lno
366 src_lines = [l[col:] for l in wsource[:lno+1]]
370 src_lines = [l[col:] for l in wsource[:lno+1]]
367
371
368 # Finally, check that the source's first non-comment line begins with the
372 # Finally, check that the source's first non-comment line begins with the
369 # special call 'remote()'
373 # special call 'remote()'
370 for nline,line in enumerate(src_lines):
374 for nline,line in enumerate(src_lines):
371 if line.isspace() or line.startswith('#'):
375 if line.isspace() or line.startswith('#'):
372 continue
376 continue
373 if 'remote()' in line:
377 if 'remote()' in line:
374 break
378 break
375 else:
379 else:
376 raise ValueError('remote() call missing at the start of code')
380 raise ValueError('remote() call missing at the start of code')
377 src = ''.join(src_lines[nline+1:])
381 src = ''.join(src_lines[nline+1:])
378 #print 'SRC:\n<<<<<<<>>>>>>>\n%s<<<<<>>>>>>' % src # dbg
382 #print 'SRC:\n<<<<<<<>>>>>>>\n%s<<<<<>>>>>>' % src # dbg
379 return src
383 return src
380
384
381
385
382 #-------------------------------------------------------------------------------
386 #-------------------------------------------------------------------------------
383 # The top-level MultiEngine client adaptor
387 # The top-level MultiEngine client adaptor
384 #-------------------------------------------------------------------------------
388 #-------------------------------------------------------------------------------
385
389
386
390
387 _prop_warn = """\
391 _prop_warn = """\
388
392
389 We are currently refactoring the task dependency system. This might
393 We are currently refactoring the task dependency system. This might
390 involve the removal of this method and other methods related to engine
394 involve the removal of this method and other methods related to engine
391 properties. Please see the docstrings for IPython.kernel.TaskRejectError
395 properties. Please see the docstrings for IPython.kernel.TaskRejectError
392 for more information."""
396 for more information."""
393
397
394
398
395 class IFullBlockingMultiEngineClient(Interface):
399 class IFullBlockingMultiEngineClient(Interface):
396 pass
400 pass
397
401
398
402
399 class FullBlockingMultiEngineClient(InteractiveMultiEngineClient):
403 class FullBlockingMultiEngineClient(InteractiveMultiEngineClient):
400 """
404 """
401 A blocking client to the `IMultiEngine` controller interface.
405 A blocking client to the `IMultiEngine` controller interface.
402
406
403 This class allows users to use a set of engines for a parallel
407 This class allows users to use a set of engines for a parallel
404 computation through the `IMultiEngine` interface. In this interface,
408 computation through the `IMultiEngine` interface. In this interface,
405 each engine has a specific id (an int) that is used to refer to the
409 each engine has a specific id (an int) that is used to refer to the
406 engine, run code on it, etc.
410 engine, run code on it, etc.
407 """
411 """
408
412
409 implements(
413 implements(
410 IFullBlockingMultiEngineClient,
414 IFullBlockingMultiEngineClient,
411 IMultiEngineMapperFactory,
415 IMultiEngineMapperFactory,
412 IMapper
416 IMapper
413 )
417 )
414
418
415 def __init__(self, smultiengine):
419 def __init__(self, smultiengine):
416 self.smultiengine = smultiengine
420 self.smultiengine = smultiengine
417 self.block = True
421 self.block = True
418 self.targets = 'all'
422 self.targets = 'all'
419
423
420 def _findBlock(self, block=None):
424 def _findBlock(self, block=None):
421 if block is None:
425 if block is None:
422 return self.block
426 return self.block
423 else:
427 else:
424 if block in (True, False):
428 if block in (True, False):
425 return block
429 return block
426 else:
430 else:
427 raise ValueError("block must be True or False")
431 raise ValueError("block must be True or False")
428
432
429 def _findTargets(self, targets=None):
433 def _findTargets(self, targets=None):
430 if targets is None:
434 if targets is None:
431 return self.targets
435 return self.targets
432 else:
436 else:
433 if not isinstance(targets, (str,list,tuple,int)):
437 if not isinstance(targets, (str,list,tuple,int)):
434 raise ValueError("targets must be a str, list, tuple or int")
438 raise ValueError("targets must be a str, list, tuple or int")
435 return targets
439 return targets
436
440
437 def _findTargetsAndBlock(self, targets=None, block=None):
441 def _findTargetsAndBlock(self, targets=None, block=None):
438 return self._findTargets(targets), self._findBlock(block)
442 return self._findTargets(targets), self._findBlock(block)
439
443
440 def _blockFromThread(self, function, *args, **kwargs):
444 def _blockFromThread(self, function, *args, **kwargs):
441 block = kwargs.get('block', None)
445 block = kwargs.get('block', None)
442 if block is None:
446 if block is None:
443 raise error.MissingBlockArgument("'block' keyword argument is missing")
447 raise error.MissingBlockArgument("'block' keyword argument is missing")
444 result = blockingCallFromThread(function, *args, **kwargs)
448 result = blockingCallFromThread(function, *args, **kwargs)
445 if not block:
449 if not block:
446 result = PendingResult(self, result)
450 result = PendingResult(self, result)
447 return result
451 return result
448
452
449 def get_pending_deferred(self, deferredID, block):
453 def get_pending_deferred(self, deferredID, block):
450 return blockingCallFromThread(self.smultiengine.get_pending_deferred, deferredID, block)
454 return blockingCallFromThread(self.smultiengine.get_pending_deferred, deferredID, block)
451
455
452 def barrier(self, pendingResults):
456 def barrier(self, pendingResults):
453 """Synchronize a set of `PendingResults`.
457 """Synchronize a set of `PendingResults`.
454
458
455 This method is a synchronization primitive that waits for a set of
459 This method is a synchronization primitive that waits for a set of
456 `PendingResult` objects to complete. More specifically, barier does
460 `PendingResult` objects to complete. More specifically, barier does
457 the following.
461 the following.
458
462
459 * The `PendingResult`s are sorted by result_id.
463 * The `PendingResult`s are sorted by result_id.
460 * The `get_result` method is called for each `PendingResult` sequentially
464 * The `get_result` method is called for each `PendingResult` sequentially
461 with block=True.
465 with block=True.
462 * If a `PendingResult` gets a result that is an exception, it is
466 * If a `PendingResult` gets a result that is an exception, it is
463 trapped and can be re-raised later by calling `get_result` again.
467 trapped and can be re-raised later by calling `get_result` again.
464 * The `PendingResult`s are flushed from the controller.
468 * The `PendingResult`s are flushed from the controller.
465
469
466 After barrier has been called on a `PendingResult`, its results can
470 After barrier has been called on a `PendingResult`, its results can
467 be retrieved by calling `get_result` again or accesing the `r` attribute
471 be retrieved by calling `get_result` again or accesing the `r` attribute
468 of the instance.
472 of the instance.
469 """
473 """
470
474
471 # Convert to list for sorting and check class type
475 # Convert to list for sorting and check class type
472 prList = list(pendingResults)
476 prList = list(pendingResults)
473 for pr in prList:
477 for pr in prList:
474 if not isinstance(pr, PendingResult):
478 if not isinstance(pr, PendingResult):
475 raise error.NotAPendingResult("Objects passed to barrier must be PendingResult instances")
479 raise error.NotAPendingResult("Objects passed to barrier must be PendingResult instances")
476
480
477 # Sort the PendingResults so they are in order
481 # Sort the PendingResults so they are in order
478 prList.sort()
482 prList.sort()
479 # Block on each PendingResult object
483 # Block on each PendingResult object
480 for pr in prList:
484 for pr in prList:
481 try:
485 try:
482 result = pr.get_result(block=True)
486 result = pr.get_result(block=True)
483 except Exception:
487 except Exception:
484 pass
488 pass
485
489
486 def flush(self):
490 def flush(self):
487 """
491 """
488 Clear all pending deferreds/results from the controller.
492 Clear all pending deferreds/results from the controller.
489
493
490 For each `PendingResult` that is created by this client, the controller
494 For each `PendingResult` that is created by this client, the controller
491 holds on to the result for that `PendingResult`. This can be a problem
495 holds on to the result for that `PendingResult`. This can be a problem
492 if there are a large number of `PendingResult` objects that are created.
496 if there are a large number of `PendingResult` objects that are created.
493
497
494 Once the result of the `PendingResult` has been retrieved, the result
498 Once the result of the `PendingResult` has been retrieved, the result
495 is removed from the controller, but if a user doesn't get a result (
499 is removed from the controller, but if a user doesn't get a result (
496 they just ignore the `PendingResult`) the result is kept forever on the
500 they just ignore the `PendingResult`) the result is kept forever on the
497 controller. This method allows the user to clear out all un-retrieved
501 controller. This method allows the user to clear out all un-retrieved
498 results on the controller.
502 results on the controller.
499 """
503 """
500 r = blockingCallFromThread(self.smultiengine.clear_pending_deferreds)
504 r = blockingCallFromThread(self.smultiengine.clear_pending_deferreds)
501 return r
505 return r
502
506
503 clear_pending_results = flush
507 clear_pending_results = flush
504
508
505 #---------------------------------------------------------------------------
509 #---------------------------------------------------------------------------
506 # IEngineMultiplexer related methods
510 # IEngineMultiplexer related methods
507 #---------------------------------------------------------------------------
511 #---------------------------------------------------------------------------
508
512
509 def execute(self, lines, targets=None, block=None):
513 def execute(self, lines, targets=None, block=None):
510 """
514 """
511 Execute code on a set of engines.
515 Execute code on a set of engines.
512
516
513 :Parameters:
517 :Parameters:
514 lines : str
518 lines : str
515 The Python code to execute as a string
519 The Python code to execute as a string
516 targets : id or list of ids
520 targets : id or list of ids
517 The engine to use for the execution
521 The engine to use for the execution
518 block : boolean
522 block : boolean
519 If False, this method will return the actual result. If False,
523 If False, this method will return the actual result. If False,
520 a `PendingResult` is returned which can be used to get the result
524 a `PendingResult` is returned which can be used to get the result
521 at a later time.
525 at a later time.
522 """
526 """
523 targets, block = self._findTargetsAndBlock(targets, block)
527 targets, block = self._findTargetsAndBlock(targets, block)
524 result = blockingCallFromThread(self.smultiengine.execute, lines,
528 result = blockingCallFromThread(self.smultiengine.execute, lines,
525 targets=targets, block=block)
529 targets=targets, block=block)
526 if block:
530 if block:
527 result = ResultList(result)
531 result = ResultList(result)
528 else:
532 else:
529 result = PendingResult(self, result)
533 result = PendingResult(self, result)
530 result.add_callback(wrapResultList)
534 result.add_callback(wrapResultList)
531 return result
535 return result
532
536
533 def push(self, namespace, targets=None, block=None):
537 def push(self, namespace, targets=None, block=None):
534 """
538 """
535 Push a dictionary of keys and values to engines namespace.
539 Push a dictionary of keys and values to engines namespace.
536
540
537 Each engine has a persistent namespace. This method is used to push
541 Each engine has a persistent namespace. This method is used to push
538 Python objects into that namespace.
542 Python objects into that namespace.
539
543
540 The objects in the namespace must be pickleable.
544 The objects in the namespace must be pickleable.
541
545
542 :Parameters:
546 :Parameters:
543 namespace : dict
547 namespace : dict
544 A dict that contains Python objects to be injected into
548 A dict that contains Python objects to be injected into
545 the engine persistent namespace.
549 the engine persistent namespace.
546 targets : id or list of ids
550 targets : id or list of ids
547 The engine to use for the execution
551 The engine to use for the execution
548 block : boolean
552 block : boolean
549 If False, this method will return the actual result. If False,
553 If False, this method will return the actual result. If False,
550 a `PendingResult` is returned which can be used to get the result
554 a `PendingResult` is returned which can be used to get the result
551 at a later time.
555 at a later time.
552 """
556 """
553 targets, block = self._findTargetsAndBlock(targets, block)
557 targets, block = self._findTargetsAndBlock(targets, block)
554 return self._blockFromThread(self.smultiengine.push, namespace,
558 return self._blockFromThread(self.smultiengine.push, namespace,
555 targets=targets, block=block)
559 targets=targets, block=block)
556
560
557 def pull(self, keys, targets=None, block=None):
561 def pull(self, keys, targets=None, block=None):
558 """
562 """
559 Pull Python objects by key out of engines namespaces.
563 Pull Python objects by key out of engines namespaces.
560
564
561 :Parameters:
565 :Parameters:
562 keys : str or list of str
566 keys : str or list of str
563 The names of the variables to be pulled
567 The names of the variables to be pulled
564 targets : id or list of ids
568 targets : id or list of ids
565 The engine to use for the execution
569 The engine to use for the execution
566 block : boolean
570 block : boolean
567 If False, this method will return the actual result. If False,
571 If False, this method will return the actual result. If False,
568 a `PendingResult` is returned which can be used to get the result
572 a `PendingResult` is returned which can be used to get the result
569 at a later time.
573 at a later time.
570 """
574 """
571 targets, block = self._findTargetsAndBlock(targets, block)
575 targets, block = self._findTargetsAndBlock(targets, block)
572 return self._blockFromThread(self.smultiengine.pull, keys, targets=targets, block=block)
576 return self._blockFromThread(self.smultiengine.pull, keys, targets=targets, block=block)
573
577
574 def push_function(self, namespace, targets=None, block=None):
578 def push_function(self, namespace, targets=None, block=None):
575 """
579 """
576 Push a Python function to an engine.
580 Push a Python function to an engine.
577
581
578 This method is used to push a Python function to an engine. This
582 This method is used to push a Python function to an engine. This
579 method can then be used in code on the engines. Closures are not supported.
583 method can then be used in code on the engines. Closures are not supported.
580
584
581 :Parameters:
585 :Parameters:
582 namespace : dict
586 namespace : dict
583 A dict whose values are the functions to be pushed. The keys give
587 A dict whose values are the functions to be pushed. The keys give
584 that names that the function will appear as in the engines
588 that names that the function will appear as in the engines
585 namespace.
589 namespace.
586 targets : id or list of ids
590 targets : id or list of ids
587 The engine to use for the execution
591 The engine to use for the execution
588 block : boolean
592 block : boolean
589 If False, this method will return the actual result. If False,
593 If False, this method will return the actual result. If False,
590 a `PendingResult` is returned which can be used to get the result
594 a `PendingResult` is returned which can be used to get the result
591 at a later time.
595 at a later time.
592 """
596 """
593 targets, block = self._findTargetsAndBlock(targets, block)
597 targets, block = self._findTargetsAndBlock(targets, block)
594 return self._blockFromThread(self.smultiengine.push_function, namespace, targets=targets, block=block)
598 return self._blockFromThread(self.smultiengine.push_function, namespace, targets=targets, block=block)
595
599
596 def pull_function(self, keys, targets=None, block=None):
600 def pull_function(self, keys, targets=None, block=None):
597 """
601 """
598 Pull a Python function from an engine.
602 Pull a Python function from an engine.
599
603
600 This method is used to pull a Python function from an engine.
604 This method is used to pull a Python function from an engine.
601 Closures are not supported.
605 Closures are not supported.
602
606
603 :Parameters:
607 :Parameters:
604 keys : str or list of str
608 keys : str or list of str
605 The names of the functions to be pulled
609 The names of the functions to be pulled
606 targets : id or list of ids
610 targets : id or list of ids
607 The engine to use for the execution
611 The engine to use for the execution
608 block : boolean
612 block : boolean
609 If False, this method will return the actual result. If False,
613 If False, this method will return the actual result. If False,
610 a `PendingResult` is returned which can be used to get the result
614 a `PendingResult` is returned which can be used to get the result
611 at a later time.
615 at a later time.
612 """
616 """
613 targets, block = self._findTargetsAndBlock(targets, block)
617 targets, block = self._findTargetsAndBlock(targets, block)
614 return self._blockFromThread(self.smultiengine.pull_function, keys, targets=targets, block=block)
618 return self._blockFromThread(self.smultiengine.pull_function, keys, targets=targets, block=block)
615
619
616 def push_serialized(self, namespace, targets=None, block=None):
620 def push_serialized(self, namespace, targets=None, block=None):
617 targets, block = self._findTargetsAndBlock(targets, block)
621 targets, block = self._findTargetsAndBlock(targets, block)
618 return self._blockFromThread(self.smultiengine.push_serialized, namespace, targets=targets, block=block)
622 return self._blockFromThread(self.smultiengine.push_serialized, namespace, targets=targets, block=block)
619
623
620 def pull_serialized(self, keys, targets=None, block=None):
624 def pull_serialized(self, keys, targets=None, block=None):
621 targets, block = self._findTargetsAndBlock(targets, block)
625 targets, block = self._findTargetsAndBlock(targets, block)
622 return self._blockFromThread(self.smultiengine.pull_serialized, keys, targets=targets, block=block)
626 return self._blockFromThread(self.smultiengine.pull_serialized, keys, targets=targets, block=block)
623
627
624 def get_result(self, i=None, targets=None, block=None):
628 def get_result(self, i=None, targets=None, block=None):
625 """
629 """
626 Get a previous result.
630 Get a previous result.
627
631
628 When code is executed in an engine, a dict is created and returned. This
632 When code is executed in an engine, a dict is created and returned. This
629 method retrieves that dict for previous commands.
633 method retrieves that dict for previous commands.
630
634
631 :Parameters:
635 :Parameters:
632 i : int
636 i : int
633 The number of the result to get
637 The number of the result to get
634 targets : id or list of ids
638 targets : id or list of ids
635 The engine to use for the execution
639 The engine to use for the execution
636 block : boolean
640 block : boolean
637 If False, this method will return the actual result. If False,
641 If False, this method will return the actual result. If False,
638 a `PendingResult` is returned which can be used to get the result
642 a `PendingResult` is returned which can be used to get the result
639 at a later time.
643 at a later time.
640 """
644 """
641 targets, block = self._findTargetsAndBlock(targets, block)
645 targets, block = self._findTargetsAndBlock(targets, block)
642 result = blockingCallFromThread(self.smultiengine.get_result, i, targets=targets, block=block)
646 result = blockingCallFromThread(self.smultiengine.get_result, i, targets=targets, block=block)
643 if block:
647 if block:
644 result = ResultList(result)
648 result = ResultList(result)
645 else:
649 else:
646 result = PendingResult(self, result)
650 result = PendingResult(self, result)
647 result.add_callback(wrapResultList)
651 result.add_callback(wrapResultList)
648 return result
652 return result
649
653
650 def reset(self, targets=None, block=None):
654 def reset(self, targets=None, block=None):
651 """
655 """
652 Reset an engine.
656 Reset an engine.
653
657
654 This method clears out the namespace of an engine.
658 This method clears out the namespace of an engine.
655
659
656 :Parameters:
660 :Parameters:
657 targets : id or list of ids
661 targets : id or list of ids
658 The engine to use for the execution
662 The engine to use for the execution
659 block : boolean
663 block : boolean
660 If False, this method will return the actual result. If False,
664 If False, this method will return the actual result. If False,
661 a `PendingResult` is returned which can be used to get the result
665 a `PendingResult` is returned which can be used to get the result
662 at a later time.
666 at a later time.
663 """
667 """
664 targets, block = self._findTargetsAndBlock(targets, block)
668 targets, block = self._findTargetsAndBlock(targets, block)
665 return self._blockFromThread(self.smultiengine.reset, targets=targets, block=block)
669 return self._blockFromThread(self.smultiengine.reset, targets=targets, block=block)
666
670
667 def keys(self, targets=None, block=None):
671 def keys(self, targets=None, block=None):
668 """
672 """
669 Get a list of all the variables in an engine's namespace.
673 Get a list of all the variables in an engine's namespace.
670
674
671 :Parameters:
675 :Parameters:
672 targets : id or list of ids
676 targets : id or list of ids
673 The engine to use for the execution
677 The engine to use for the execution
674 block : boolean
678 block : boolean
675 If False, this method will return the actual result. If False,
679 If False, this method will return the actual result. If False,
676 a `PendingResult` is returned which can be used to get the result
680 a `PendingResult` is returned which can be used to get the result
677 at a later time.
681 at a later time.
678 """
682 """
679 targets, block = self._findTargetsAndBlock(targets, block)
683 targets, block = self._findTargetsAndBlock(targets, block)
680 return self._blockFromThread(self.smultiengine.keys, targets=targets, block=block)
684 return self._blockFromThread(self.smultiengine.keys, targets=targets, block=block)
681
685
682 def kill(self, controller=False, targets=None, block=None):
686 def kill(self, controller=False, targets=None, block=None):
683 """
687 """
684 Kill the engines and controller.
688 Kill the engines and controller.
685
689
686 This method is used to stop the engine and controller by calling
690 This method is used to stop the engine and controller by calling
687 `reactor.stop`.
691 `reactor.stop`.
688
692
689 :Parameters:
693 :Parameters:
690 controller : boolean
694 controller : boolean
691 If True, kill the engines and controller. If False, just the
695 If True, kill the engines and controller. If False, just the
692 engines
696 engines
693 targets : id or list of ids
697 targets : id or list of ids
694 The engine to use for the execution
698 The engine to use for the execution
695 block : boolean
699 block : boolean
696 If False, this method will return the actual result. If False,
700 If False, this method will return the actual result. If False,
697 a `PendingResult` is returned which can be used to get the result
701 a `PendingResult` is returned which can be used to get the result
698 at a later time.
702 at a later time.
699 """
703 """
700 targets, block = self._findTargetsAndBlock(targets, block)
704 targets, block = self._findTargetsAndBlock(targets, block)
701 return self._blockFromThread(self.smultiengine.kill, controller, targets=targets, block=block)
705 return self._blockFromThread(self.smultiengine.kill, controller, targets=targets, block=block)
702
706
703 def clear_queue(self, targets=None, block=None):
707 def clear_queue(self, targets=None, block=None):
704 """
708 """
705 Clear out the controller's queue for an engine.
709 Clear out the controller's queue for an engine.
706
710
707 The controller maintains a queue for each engine. This clear it out.
711 The controller maintains a queue for each engine. This clear it out.
708
712
709 :Parameters:
713 :Parameters:
710 targets : id or list of ids
714 targets : id or list of ids
711 The engine to use for the execution
715 The engine to use for the execution
712 block : boolean
716 block : boolean
713 If False, this method will return the actual result. If False,
717 If False, this method will return the actual result. If False,
714 a `PendingResult` is returned which can be used to get the result
718 a `PendingResult` is returned which can be used to get the result
715 at a later time.
719 at a later time.
716 """
720 """
717 targets, block = self._findTargetsAndBlock(targets, block)
721 targets, block = self._findTargetsAndBlock(targets, block)
718 return self._blockFromThread(self.smultiengine.clear_queue, targets=targets, block=block)
722 return self._blockFromThread(self.smultiengine.clear_queue, targets=targets, block=block)
719
723
720 def queue_status(self, targets=None, block=None):
724 def queue_status(self, targets=None, block=None):
721 """
725 """
722 Get the status of an engines queue.
726 Get the status of an engines queue.
723
727
724 :Parameters:
728 :Parameters:
725 targets : id or list of ids
729 targets : id or list of ids
726 The engine to use for the execution
730 The engine to use for the execution
727 block : boolean
731 block : boolean
728 If False, this method will return the actual result. If False,
732 If False, this method will return the actual result. If False,
729 a `PendingResult` is returned which can be used to get the result
733 a `PendingResult` is returned which can be used to get the result
730 at a later time.
734 at a later time.
731 """
735 """
732 targets, block = self._findTargetsAndBlock(targets, block)
736 targets, block = self._findTargetsAndBlock(targets, block)
733 return self._blockFromThread(self.smultiengine.queue_status, targets=targets, block=block)
737 return self._blockFromThread(self.smultiengine.queue_status, targets=targets, block=block)
734
738
735 def set_properties(self, properties, targets=None, block=None):
739 def set_properties(self, properties, targets=None, block=None):
736 warnings.warn(_prop_warn)
740 warnings.warn(_prop_warn)
737 targets, block = self._findTargetsAndBlock(targets, block)
741 targets, block = self._findTargetsAndBlock(targets, block)
738 return self._blockFromThread(self.smultiengine.set_properties, properties, targets=targets, block=block)
742 return self._blockFromThread(self.smultiengine.set_properties, properties, targets=targets, block=block)
739
743
740 def get_properties(self, keys=None, targets=None, block=None):
744 def get_properties(self, keys=None, targets=None, block=None):
741 warnings.warn(_prop_warn)
745 warnings.warn(_prop_warn)
742 targets, block = self._findTargetsAndBlock(targets, block)
746 targets, block = self._findTargetsAndBlock(targets, block)
743 return self._blockFromThread(self.smultiengine.get_properties, keys, targets=targets, block=block)
747 return self._blockFromThread(self.smultiengine.get_properties, keys, targets=targets, block=block)
744
748
745 def has_properties(self, keys, targets=None, block=None):
749 def has_properties(self, keys, targets=None, block=None):
746 warnings.warn(_prop_warn)
750 warnings.warn(_prop_warn)
747 targets, block = self._findTargetsAndBlock(targets, block)
751 targets, block = self._findTargetsAndBlock(targets, block)
748 return self._blockFromThread(self.smultiengine.has_properties, keys, targets=targets, block=block)
752 return self._blockFromThread(self.smultiengine.has_properties, keys, targets=targets, block=block)
749
753
750 def del_properties(self, keys, targets=None, block=None):
754 def del_properties(self, keys, targets=None, block=None):
751 warnings.warn(_prop_warn)
755 warnings.warn(_prop_warn)
752 targets, block = self._findTargetsAndBlock(targets, block)
756 targets, block = self._findTargetsAndBlock(targets, block)
753 return self._blockFromThread(self.smultiengine.del_properties, keys, targets=targets, block=block)
757 return self._blockFromThread(self.smultiengine.del_properties, keys, targets=targets, block=block)
754
758
755 def clear_properties(self, targets=None, block=None):
759 def clear_properties(self, targets=None, block=None):
756 warnings.warn(_prop_warn)
760 warnings.warn(_prop_warn)
757 targets, block = self._findTargetsAndBlock(targets, block)
761 targets, block = self._findTargetsAndBlock(targets, block)
758 return self._blockFromThread(self.smultiengine.clear_properties, targets=targets, block=block)
762 return self._blockFromThread(self.smultiengine.clear_properties, targets=targets, block=block)
759
763
760 #---------------------------------------------------------------------------
764 #---------------------------------------------------------------------------
761 # IMultiEngine related methods
765 # IMultiEngine related methods
762 #---------------------------------------------------------------------------
766 #---------------------------------------------------------------------------
763
767
764 def get_ids(self):
768 def get_ids(self):
765 """
769 """
766 Returns the ids of currently registered engines.
770 Returns the ids of currently registered engines.
767 """
771 """
768 result = blockingCallFromThread(self.smultiengine.get_ids)
772 result = blockingCallFromThread(self.smultiengine.get_ids)
769 return result
773 return result
770
774
771 #---------------------------------------------------------------------------
775 #---------------------------------------------------------------------------
772 # IMultiEngineCoordinator
776 # IMultiEngineCoordinator
773 #---------------------------------------------------------------------------
777 #---------------------------------------------------------------------------
774
778
775 def scatter(self, key, seq, dist='b', flatten=False, targets=None, block=None):
779 def scatter(self, key, seq, dist='b', flatten=False, targets=None, block=None):
776 """
780 """
777 Partition a Python sequence and send the partitions to a set of engines.
781 Partition a Python sequence and send the partitions to a set of engines.
778 """
782 """
779 targets, block = self._findTargetsAndBlock(targets, block)
783 targets, block = self._findTargetsAndBlock(targets, block)
780 return self._blockFromThread(self.smultiengine.scatter, key, seq,
784 return self._blockFromThread(self.smultiengine.scatter, key, seq,
781 dist, flatten, targets=targets, block=block)
785 dist, flatten, targets=targets, block=block)
782
786
783 def gather(self, key, dist='b', targets=None, block=None):
787 def gather(self, key, dist='b', targets=None, block=None):
784 """
788 """
785 Gather a partitioned sequence on a set of engines as a single local seq.
789 Gather a partitioned sequence on a set of engines as a single local seq.
786 """
790 """
787 targets, block = self._findTargetsAndBlock(targets, block)
791 targets, block = self._findTargetsAndBlock(targets, block)
788 return self._blockFromThread(self.smultiengine.gather, key, dist,
792 return self._blockFromThread(self.smultiengine.gather, key, dist,
789 targets=targets, block=block)
793 targets=targets, block=block)
790
794
791 def raw_map(self, func, seq, dist='b', targets=None, block=None):
795 def raw_map(self, func, seq, dist='b', targets=None, block=None):
792 """
796 """
793 A parallelized version of Python's builtin map.
797 A parallelized version of Python's builtin map.
794
798
795 This has a slightly different syntax than the builtin `map`.
799 This has a slightly different syntax than the builtin `map`.
796 This is needed because we need to have keyword arguments and thus
800 This is needed because we need to have keyword arguments and thus
797 can't use *args to capture all the sequences. Instead, they must
801 can't use *args to capture all the sequences. Instead, they must
798 be passed in a list or tuple.
802 be passed in a list or tuple.
799
803
800 raw_map(func, seqs) -> map(func, seqs[0], seqs[1], ...)
804 raw_map(func, seqs) -> map(func, seqs[0], seqs[1], ...)
801
805
802 Most users will want to use parallel functions or the `mapper`
806 Most users will want to use parallel functions or the `mapper`
803 and `map` methods for an API that follows that of the builtin
807 and `map` methods for an API that follows that of the builtin
804 `map`.
808 `map`.
805 """
809 """
806 targets, block = self._findTargetsAndBlock(targets, block)
810 targets, block = self._findTargetsAndBlock(targets, block)
807 return self._blockFromThread(self.smultiengine.raw_map, func, seq,
811 return self._blockFromThread(self.smultiengine.raw_map, func, seq,
808 dist, targets=targets, block=block)
812 dist, targets=targets, block=block)
809
813
810 def map(self, func, *sequences):
814 def map(self, func, *sequences):
811 """
815 """
812 A parallel version of Python's builtin `map` function.
816 A parallel version of Python's builtin `map` function.
813
817
814 This method applies a function to sequences of arguments. It
818 This method applies a function to sequences of arguments. It
815 follows the same syntax as the builtin `map`.
819 follows the same syntax as the builtin `map`.
816
820
817 This method creates a mapper objects by calling `self.mapper` with
821 This method creates a mapper objects by calling `self.mapper` with
818 no arguments and then uses that mapper to do the mapping. See
822 no arguments and then uses that mapper to do the mapping. See
819 the documentation of `mapper` for more details.
823 the documentation of `mapper` for more details.
820 """
824 """
821 return self.mapper().map(func, *sequences)
825 return self.mapper().map(func, *sequences)
822
826
823 def mapper(self, dist='b', targets='all', block=None):
827 def mapper(self, dist='b', targets='all', block=None):
824 """
828 """
825 Create a mapper object that has a `map` method.
829 Create a mapper object that has a `map` method.
826
830
827 This method returns an object that implements the `IMapper`
831 This method returns an object that implements the `IMapper`
828 interface. This method is a factory that is used to control how
832 interface. This method is a factory that is used to control how
829 the map happens.
833 the map happens.
830
834
831 :Parameters:
835 :Parameters:
832 dist : str
836 dist : str
833 What decomposition to use, 'b' is the only one supported
837 What decomposition to use, 'b' is the only one supported
834 currently
838 currently
835 targets : str, int, sequence of ints
839 targets : str, int, sequence of ints
836 Which engines to use for the map
840 Which engines to use for the map
837 block : boolean
841 block : boolean
838 Should calls to `map` block or not
842 Should calls to `map` block or not
839 """
843 """
840 return MultiEngineMapper(self, dist, targets, block)
844 return MultiEngineMapper(self, dist, targets, block)
841
845
842 def parallel(self, dist='b', targets=None, block=None):
846 def parallel(self, dist='b', targets=None, block=None):
843 """
847 """
844 A decorator that turns a function into a parallel function.
848 A decorator that turns a function into a parallel function.
845
849
846 This can be used as:
850 This can be used as:
847
851
848 @parallel()
852 @parallel()
849 def f(x, y)
853 def f(x, y)
850 ...
854 ...
851
855
852 f(range(10), range(10))
856 f(range(10), range(10))
853
857
854 This causes f(0,0), f(1,1), ... to be called in parallel.
858 This causes f(0,0), f(1,1), ... to be called in parallel.
855
859
856 :Parameters:
860 :Parameters:
857 dist : str
861 dist : str
858 What decomposition to use, 'b' is the only one supported
862 What decomposition to use, 'b' is the only one supported
859 currently
863 currently
860 targets : str, int, sequence of ints
864 targets : str, int, sequence of ints
861 Which engines to use for the map
865 Which engines to use for the map
862 block : boolean
866 block : boolean
863 Should calls to `map` block or not
867 Should calls to `map` block or not
864 """
868 """
865 targets, block = self._findTargetsAndBlock(targets, block)
869 targets, block = self._findTargetsAndBlock(targets, block)
866 mapper = self.mapper(dist, targets, block)
870 mapper = self.mapper(dist, targets, block)
867 pf = ParallelFunction(mapper)
871 pf = ParallelFunction(mapper)
868 return pf
872 return pf
869
873
870 #---------------------------------------------------------------------------
874 #---------------------------------------------------------------------------
871 # IMultiEngineExtras
875 # IMultiEngineExtras
872 #---------------------------------------------------------------------------
876 #---------------------------------------------------------------------------
873
877
874 def zip_pull(self, keys, targets=None, block=None):
878 def zip_pull(self, keys, targets=None, block=None):
875 targets, block = self._findTargetsAndBlock(targets, block)
879 targets, block = self._findTargetsAndBlock(targets, block)
876 return self._blockFromThread(self.smultiengine.zip_pull, keys,
880 return self._blockFromThread(self.smultiengine.zip_pull, keys,
877 targets=targets, block=block)
881 targets=targets, block=block)
878
882
879 def run(self, filename, targets=None, block=None):
883 def run(self, filename, targets=None, block=None):
880 """
884 """
881 Run a Python code in a file on the engines.
885 Run a Python code in a file on the engines.
882
886
883 :Parameters:
887 :Parameters:
884 filename : str
888 filename : str
885 The name of the local file to run
889 The name of the local file to run
886 targets : id or list of ids
890 targets : id or list of ids
887 The engine to use for the execution
891 The engine to use for the execution
888 block : boolean
892 block : boolean
889 If False, this method will return the actual result. If False,
893 If False, this method will return the actual result. If False,
890 a `PendingResult` is returned which can be used to get the result
894 a `PendingResult` is returned which can be used to get the result
891 at a later time.
895 at a later time.
892 """
896 """
893 targets, block = self._findTargetsAndBlock(targets, block)
897 targets, block = self._findTargetsAndBlock(targets, block)
894 return self._blockFromThread(self.smultiengine.run, filename,
898 return self._blockFromThread(self.smultiengine.run, filename,
895 targets=targets, block=block)
899 targets=targets, block=block)
896
900
897 def benchmark(self, push_size=10000):
901 def benchmark(self, push_size=10000):
898 """
902 """
899 Run performance benchmarks for the current IPython cluster.
903 Run performance benchmarks for the current IPython cluster.
900
904
901 This method tests both the latency of sending command and data to the
905 This method tests both the latency of sending command and data to the
902 engines as well as the throughput of sending large objects to the
906 engines as well as the throughput of sending large objects to the
903 engines using push. The latency is measured by having one or more
907 engines using push. The latency is measured by having one or more
904 engines execute the command 'pass'. The throughput is measure by
908 engines execute the command 'pass'. The throughput is measure by
905 sending an NumPy array of size `push_size` to one or more engines.
909 sending an NumPy array of size `push_size` to one or more engines.
906
910
907 These benchmarks will vary widely on different hardware and networks
911 These benchmarks will vary widely on different hardware and networks
908 and thus can be used to get an idea of the performance characteristics
912 and thus can be used to get an idea of the performance characteristics
909 of a particular configuration of an IPython controller and engines.
913 of a particular configuration of an IPython controller and engines.
910
914
911 This function is not testable within our current testing framework.
915 This function is not testable within our current testing framework.
912 """
916 """
913 import timeit, __builtin__
917 import timeit, __builtin__
914 __builtin__._mec_self = self
918 __builtin__._mec_self = self
915 benchmarks = {}
919 benchmarks = {}
916 repeat = 3
920 repeat = 3
917 count = 10
921 count = 10
918
922
919 timer = timeit.Timer('_mec_self.execute("pass",0)')
923 timer = timeit.Timer('_mec_self.execute("pass",0)')
920 result = 1000*min(timer.repeat(repeat,count))/count
924 result = 1000*min(timer.repeat(repeat,count))/count
921 benchmarks['single_engine_latency'] = (result,'msec')
925 benchmarks['single_engine_latency'] = (result,'msec')
922
926
923 timer = timeit.Timer('_mec_self.execute("pass")')
927 timer = timeit.Timer('_mec_self.execute("pass")')
924 result = 1000*min(timer.repeat(repeat,count))/count
928 result = 1000*min(timer.repeat(repeat,count))/count
925 benchmarks['all_engine_latency'] = (result,'msec')
929 benchmarks['all_engine_latency'] = (result,'msec')
926
930
927 try:
931 try:
928 import numpy as np
932 import numpy as np
929 except:
933 except:
930 pass
934 pass
931 else:
935 else:
932 timer = timeit.Timer(
936 timer = timeit.Timer(
933 "_mec_self.push(d)",
937 "_mec_self.push(d)",
934 "import numpy as np; d = dict(a=np.zeros(%r,dtype='float64'))" % push_size
938 "import numpy as np; d = dict(a=np.zeros(%r,dtype='float64'))" % push_size
935 )
939 )
936 result = min(timer.repeat(repeat,count))/count
940 result = min(timer.repeat(repeat,count))/count
937 benchmarks['all_engine_push'] = (1e-6*push_size*8/result, 'MB/sec')
941 benchmarks['all_engine_push'] = (1e-6*push_size*8/result, 'MB/sec')
938
942
939 try:
943 try:
940 import numpy as np
944 import numpy as np
941 except:
945 except:
942 pass
946 pass
943 else:
947 else:
944 timer = timeit.Timer(
948 timer = timeit.Timer(
945 "_mec_self.push(d,0)",
949 "_mec_self.push(d,0)",
946 "import numpy as np; d = dict(a=np.zeros(%r,dtype='float64'))" % push_size
950 "import numpy as np; d = dict(a=np.zeros(%r,dtype='float64'))" % push_size
947 )
951 )
948 result = min(timer.repeat(repeat,count))/count
952 result = min(timer.repeat(repeat,count))/count
949 benchmarks['single_engine_push'] = (1e-6*push_size*8/result, 'MB/sec')
953 benchmarks['single_engine_push'] = (1e-6*push_size*8/result, 'MB/sec')
950
954
951 return benchmarks
955 return benchmarks
952
956
953
957
954 components.registerAdapter(FullBlockingMultiEngineClient,
958 components.registerAdapter(FullBlockingMultiEngineClient,
955 IFullSynchronousMultiEngine, IFullBlockingMultiEngineClient)
959 IFullSynchronousMultiEngine, IFullBlockingMultiEngineClient)
956
960
957
961
958
962
959
963
1 NO CONTENT: file was removed
NO CONTENT: file was removed
General Comments 0
You need to be logged in to leave comments. Login now