##// END OF EJS Templates
Updating IPython.kernel to fix minor bugs....
Brian Granger -
Show More
@@ -1,202 +1,214 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 # encoding: utf-8
2 # encoding: utf-8
3
3
4 """Magic command interface for interactive parallel work."""
4 """Magic command interface for interactive parallel work."""
5
5
6 #-----------------------------------------------------------------------------
6 #-----------------------------------------------------------------------------
7 # Copyright (C) 2008-2009 The IPython Development Team
7 # Copyright (C) 2008-2009 The IPython Development Team
8 #
8 #
9 # Distributed under the terms of the BSD License. The full license is in
9 # Distributed under the terms of the BSD License. The full license is in
10 # the file COPYING, distributed as part of this software.
10 # the file COPYING, distributed as part of this software.
11 #-----------------------------------------------------------------------------
11 #-----------------------------------------------------------------------------
12
12
13 #-----------------------------------------------------------------------------
13 #-----------------------------------------------------------------------------
14 # Imports
14 # Imports
15 #-----------------------------------------------------------------------------
15 #-----------------------------------------------------------------------------
16
16
17 import new
17 import new
18
18
19 from IPython.core.plugin import Plugin
19 from IPython.core.plugin import Plugin
20 from IPython.utils.traitlets import Bool, Any, Instance
20 from IPython.utils.traitlets import Bool, Any, Instance
21 from IPython.utils.autoattr import auto_attr
21 from IPython.utils.autoattr import auto_attr
22 from IPython.testing import decorators as testdec
22 from IPython.testing import decorators as testdec
23
23
24 #-----------------------------------------------------------------------------
24 #-----------------------------------------------------------------------------
25 # Definitions of magic functions for use with IPython
25 # Definitions of magic functions for use with IPython
26 #-----------------------------------------------------------------------------
26 #-----------------------------------------------------------------------------
27
27
28
28
29 NO_ACTIVE_MULTIENGINE_CLIENT = """
29 NO_ACTIVE_MULTIENGINE_CLIENT = """
30 Use activate() on a MultiEngineClient object to activate it for magics.
30 Use activate() on a MultiEngineClient object to activate it for magics.
31 """
31 """
32
32
33
33
34 class ParalleMagic(Plugin):
34 class ParalleMagic(Plugin):
35 """A component to manage the %result, %px and %autopx magics."""
35 """A component to manage the %result, %px and %autopx magics."""
36
36
37 active_multiengine_client = Any()
37 active_multiengine_client = Any()
38 verbose = Bool(False, config=True)
38 verbose = Bool(False, config=True)
39 shell = Instance('IPython.core.interactiveshell.InteractiveShellABC')
39 shell = Instance('IPython.core.interactiveshell.InteractiveShellABC')
40
40
41 def __init__(self, shell=None, config=None):
41 def __init__(self, shell=None, config=None):
42 super(ParalleMagic, self).__init__(shell=shell, config=config)
42 super(ParalleMagic, self).__init__(shell=shell, config=config)
43 self._define_magics()
43 self._define_magics()
44 # A flag showing if autopx is activated or not
44 # A flag showing if autopx is activated or not
45 self.autopx = False
45 self.autopx = False
46
46
47 def _define_magics(self):
47 def _define_magics(self):
48 """Define the magic functions."""
48 """Define the magic functions."""
49 self.shell.define_magic('result', self.magic_result)
49 self.shell.define_magic('result', self.magic_result)
50 self.shell.define_magic('px', self.magic_px)
50 self.shell.define_magic('px', self.magic_px)
51 self.shell.define_magic('autopx', self.magic_autopx)
51 self.shell.define_magic('autopx', self.magic_autopx)
52
52
53 @testdec.skip_doctest
53 @testdec.skip_doctest
54 def magic_result(self, ipself, parameter_s=''):
54 def magic_result(self, ipself, parameter_s=''):
55 """Print the result of command i on all engines..
55 """Print the result of command i on all engines..
56
56
57 To use this a :class:`MultiEngineClient` instance must be created
57 To use this a :class:`MultiEngineClient` instance must be created
58 and then activated by calling its :meth:`activate` method.
58 and then activated by calling its :meth:`activate` method.
59
59
60 Then you can do the following::
60 Then you can do the following::
61
61
62 In [23]: %result
62 In [23]: %result
63 Out[23]:
63 Out[23]:
64 <Results List>
64 <Results List>
65 [0] In [6]: a = 10
65 [0] In [6]: a = 10
66 [1] In [6]: a = 10
66 [1] In [6]: a = 10
67
67
68 In [22]: %result 6
68 In [22]: %result 6
69 Out[22]:
69 Out[22]:
70 <Results List>
70 <Results List>
71 [0] In [6]: a = 10
71 [0] In [6]: a = 10
72 [1] In [6]: a = 10
72 [1] In [6]: a = 10
73 """
73 """
74 if self.active_multiengine_client is None:
74 if self.active_multiengine_client is None:
75 print NO_ACTIVE_MULTIENGINE_CLIENT
75 print NO_ACTIVE_MULTIENGINE_CLIENT
76 return
76 return
77
77
78 try:
78 try:
79 index = int(parameter_s)
79 index = int(parameter_s)
80 except:
80 except:
81 index = None
81 index = None
82 result = self.active_multiengine_client.get_result(index)
82 result = self.active_multiengine_client.get_result(index)
83 return result
83 return result
84
84
85 @testdec.skip_doctest
85 @testdec.skip_doctest
86 def magic_px(self, ipself, parameter_s=''):
86 def magic_px(self, ipself, parameter_s=''):
87 """Executes the given python command in parallel.
87 """Executes the given python command in parallel.
88
88
89 To use this a :class:`MultiEngineClient` instance must be created
89 To use this a :class:`MultiEngineClient` instance must be created
90 and then activated by calling its :meth:`activate` method.
90 and then activated by calling its :meth:`activate` method.
91
91
92 Then you can do the following::
92 Then you can do the following::
93
93
94 In [24]: %px a = 5
94 In [24]: %px a = 5
95 Parallel execution on engines: all
95 Parallel execution on engines: all
96 Out[24]:
96 Out[24]:
97 <Results List>
97 <Results List>
98 [0] In [7]: a = 5
98 [0] In [7]: a = 5
99 [1] In [7]: a = 5
99 [1] In [7]: a = 5
100 """
100 """
101
101
102 if self.active_multiengine_client is None:
102 if self.active_multiengine_client is None:
103 print NO_ACTIVE_MULTIENGINE_CLIENT
103 print NO_ACTIVE_MULTIENGINE_CLIENT
104 return
104 return
105 print "Parallel execution on engines: %s" % self.active_multiengine_client.targets
105 print "Parallel execution on engines: %s" % self.active_multiengine_client.targets
106 result = self.active_multiengine_client.execute(parameter_s)
106 result = self.active_multiengine_client.execute(parameter_s)
107 return result
107 return result
108
108
109 @testdec.skip_doctest
109 @testdec.skip_doctest
110 def magic_autopx(self, ipself, parameter_s=''):
110 def magic_autopx(self, ipself, parameter_s=''):
111 """Toggles auto parallel mode.
111 """Toggles auto parallel mode.
112
112
113 To use this a :class:`MultiEngineClient` instance must be created
113 To use this a :class:`MultiEngineClient` instance must be created
114 and then activated by calling its :meth:`activate` method. Once this
114 and then activated by calling its :meth:`activate` method. Once this
115 is called, all commands typed at the command line are send to
115 is called, all commands typed at the command line are send to
116 the engines to be executed in parallel. To control which engine
116 the engines to be executed in parallel. To control which engine
117 are used, set the ``targets`` attributed of the multiengine client
117 are used, set the ``targets`` attributed of the multiengine client
118 before entering ``%autopx`` mode.
118 before entering ``%autopx`` mode.
119
119
120 Then you can do the following::
120 Then you can do the following::
121
121
122 In [25]: %autopx
122 In [25]: %autopx
123 %autopx to enabled
123 %autopx to enabled
124
124
125 In [26]: a = 10
125 In [26]: a = 10
126 <Results List>
126 <Results List>
127 [0] In [8]: a = 10
127 [0] In [8]: a = 10
128 [1] In [8]: a = 10
128 [1] In [8]: a = 10
129
129
130
130
131 In [27]: %autopx
131 In [27]: %autopx
132 %autopx disabled
132 %autopx disabled
133 """
133 """
134 if self.autopx:
134 if self.autopx:
135 self._disable_autopx()
135 self._disable_autopx()
136 else:
136 else:
137 self._enable_autopx()
137 self._enable_autopx()
138
138
139 def _enable_autopx(self):
139 def _enable_autopx(self):
140 """Enable %autopx mode by saving the original run_source and installing
140 """Enable %autopx mode by saving the original run_source and installing
141 pxrun_source.
141 pxrun_source.
142 """
142 """
143 if self.active_multiengine_client is None:
143 if self.active_multiengine_client is None:
144 print NO_ACTIVE_MULTIENGINE_CLIENT
144 print NO_ACTIVE_MULTIENGINE_CLIENT
145 return
145 return
146
146
147 self._original_run_source = self.shell.run_source
147 self._original_run_source = self.shell.run_source
148 self.shell.run_source = new.instancemethod(
148 self.shell.run_source = new.instancemethod(
149 self.pxrun_source, self.shell, self.shell.__class__
149 self.pxrun_source, self.shell, self.shell.__class__
150 )
150 )
151 self.autopx = True
151 self.autopx = True
152 print "%autopx enabled"
152 print "%autopx enabled"
153
153
154 def _disable_autopx(self):
154 def _disable_autopx(self):
155 """Disable %autopx by restoring the original InteractiveShell.run_source.
155 """Disable %autopx by restoring the original InteractiveShell.run_source.
156 """
156 """
157 if self.autopx:
157 if self.autopx:
158 self.shell.run_source = self._original_run_source
158 self.shell.run_source = self._original_run_source
159 self.autopx = False
159 self.autopx = False
160 print "%autopx disabled"
160 print "%autopx disabled"
161
161
162 def pxrun_source(self, ipself, source, filename="<input>", symbol="single"):
162 def pxrun_source(self, ipself, source, filename=None,
163 """A parallel replacement for InteractiveShell.run_source."""
163 symbol='single', post_execute=True):
164
164
165 # We need to ensure that the source is unicode from here on.
166 if type(source)==str:
167 usource = source.decode(ipself.stdin_encoding)
168 else:
169 usource = source
170
171 if 0: # dbg
172 print 'Source:', repr(source) # dbg
173 print 'USource:', repr(usource) # dbg
174 print 'type:', type(source) # dbg
175 print 'encoding', ipself.stdin_encoding # dbg
176
165 try:
177 try:
166 code = ipself.compile(source, filename, symbol)
178 code = ipself.compile(usource, symbol, ipself.execution_count)
167 except (OverflowError, SyntaxError, ValueError):
179 except (OverflowError, SyntaxError, ValueError, TypeError, MemoryError):
168 # Case 1
180 # Case 1
169 ipself.showsyntaxerror(filename)
181 ipself.showsyntaxerror(filename)
170 return None
182 return None
171
183
172 if code is None:
184 if code is None:
173 # Case 2
185 # Case 2
174 return True
186 return True
175
187
176 # Case 3
188 # Case 3
177 # Because autopx is enabled, we now call executeAll or disable autopx if
189 # Because autopx is enabled, we now call executeAll or disable autopx if
178 # %autopx or autopx has been called
190 # %autopx or autopx has been called
179 if 'get_ipython().magic("%autopx' in source or 'get_ipython().magic("autopx' in source:
191 if 'get_ipython().magic("%autopx' in source or 'get_ipython().magic("autopx' in source:
180 self._disable_autopx()
192 self._disable_autopx()
181 return False
193 return False
182 else:
194 else:
183 try:
195 try:
184 result = self.active_multiengine_client.execute(source)
196 result = self.active_multiengine_client.execute(source)
185 except:
197 except:
186 ipself.showtraceback()
198 ipself.showtraceback()
187 else:
199 else:
188 print result.__repr__()
200 print result.__repr__()
189 return False
201 return False
190
202
191
203
192 _loaded = False
204 _loaded = False
193
205
194
206
195 def load_ipython_extension(ip):
207 def load_ipython_extension(ip):
196 """Load the extension in IPython."""
208 """Load the extension in IPython."""
197 global _loaded
209 global _loaded
198 if not _loaded:
210 if not _loaded:
199 plugin = ParalleMagic(shell=ip, config=ip.config)
211 plugin = ParalleMagic(shell=ip, config=ip.config)
200 ip.plugin_manager.register_plugin('parallel_magic', plugin)
212 ip.plugin_manager.register_plugin('parallelmagic', plugin)
201 _loaded = True
213 _loaded = True
202
214
@@ -1,95 +1,110 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 # encoding: utf-8
2 # encoding: utf-8
3
3
4 """This module contains blocking clients for the controller interfaces.
4 """This module contains blocking clients for the controller interfaces.
5
5
6 Unlike the clients in `asyncclient.py`, the clients in this module are fully
6 Unlike the clients in `asyncclient.py`, the clients in this module are fully
7 blocking. This means that methods on the clients return the actual results
7 blocking. This means that methods on the clients return the actual results
8 rather than a deferred to the result. Also, we manage the Twisted reactor
8 rather than a deferred to the result. Also, we manage the Twisted reactor
9 for you. This is done by running the reactor in a thread.
9 for you. This is done by running the reactor in a thread.
10
10
11 The main classes in this module are:
11 The main classes in this module are:
12
12
13 * MultiEngineClient
13 * MultiEngineClient
14 * TaskClient
14 * TaskClient
15 * Task
15 * Task
16 * CompositeError
16 * CompositeError
17 """
17 """
18
18
19 #-----------------------------------------------------------------------------
19 #-----------------------------------------------------------------------------
20 # Copyright (C) 2008-2009 The IPython Development Team
20 # Copyright (C) 2008-2009 The IPython Development Team
21 #
21 #
22 # Distributed under the terms of the BSD License. The full license is in
22 # Distributed under the terms of the BSD License. The full license is in
23 # the file COPYING, distributed as part of this software.
23 # the file COPYING, distributed as part of this software.
24 #-----------------------------------------------------------------------------
24 #-----------------------------------------------------------------------------
25
25
26 #-----------------------------------------------------------------------------
26 #-----------------------------------------------------------------------------
27 # Warnings control
27 # Warnings control
28 #-----------------------------------------------------------------------------
28 #-----------------------------------------------------------------------------
29
29
30 import warnings
30 import warnings
31
31
32 # Twisted generates annoying warnings with Python 2.6, as will do other code
32 # Twisted generates annoying warnings with Python 2.6, as will do other code
33 # that imports 'sets' as of today
33 # that imports 'sets' as of today
34 warnings.filterwarnings('ignore', 'the sets module is deprecated',
34 warnings.filterwarnings('ignore', 'the sets module is deprecated',
35 DeprecationWarning )
35 DeprecationWarning )
36
36
37 # This one also comes from Twisted
37 # This one also comes from Twisted
38 warnings.filterwarnings('ignore', 'the sha module is deprecated',
38 warnings.filterwarnings('ignore', 'the sha module is deprecated',
39 DeprecationWarning)
39 DeprecationWarning)
40
40
41 #-----------------------------------------------------------------------------
41 #-----------------------------------------------------------------------------
42 # Imports
42 # Imports
43 #-----------------------------------------------------------------------------
43 #-----------------------------------------------------------------------------
44
44
45 import sys
45 import sys
46
46
47 import twisted
47 from twisted.internet import reactor
48 from twisted.internet import reactor
48 from twisted.internet.error import PotentialZombieWarning
49 from twisted.python import log
49 from twisted.python import log
50
50
51 from IPython.kernel.clientconnector import ClientConnector, Cluster
51 from IPython.kernel.clientconnector import ClientConnector, Cluster
52 from IPython.kernel.twistedutil import ReactorInThread
52 from IPython.kernel.twistedutil import ReactorInThread
53 from IPython.kernel.twistedutil import blockingCallFromThread
53 from IPython.kernel.twistedutil import blockingCallFromThread
54
54
55 # These enable various things
55 # These enable various things
56 from IPython.kernel import codeutil
56 from IPython.kernel import codeutil
57 # import IPython.kernel.magic
58
57
59 # Other things that the user will need
58 # Other things that the user will need
60 from IPython.kernel.task import MapTask, StringTask
59 from IPython.kernel.task import MapTask, StringTask
61 from IPython.kernel.error import CompositeError
60 from IPython.kernel.error import CompositeError
62
61
63 #-------------------------------------------------------------------------------
62 #-------------------------------------------------------------------------------
64 # Code
63 # Code
65 #-------------------------------------------------------------------------------
64 #-------------------------------------------------------------------------------
66
65
67 warnings.simplefilter('ignore', PotentialZombieWarning)
66 # PotentialZombieWarning is deprecated from Twisted 10.0.0 and above and
67 # using the filter on > 10.0.0 creates a warning itself.
68 if twisted.version.major < 10:
69 from twisted.internet.error import PotentialZombieWarning
70 warnings.simplefilter('ignore', PotentialZombieWarning)
68
71
69 _client_tub = ClientConnector()
72 _client_tub = ClientConnector()
70
73
71 get_multiengine_client = _client_tub.get_multiengine_client
74 get_multiengine_client = _client_tub.get_multiengine_client
72 get_task_client = _client_tub.get_task_client
75 get_task_client = _client_tub.get_task_client
73 MultiEngineClient = get_multiengine_client
76 MultiEngineClient = get_multiengine_client
74 TaskClient = get_task_client
77 TaskClient = get_task_client
75
78
76 # This isn't great. I should probably set this up in the ReactorInThread
79 # This isn't great. I should probably set this up in the ReactorInThread
77 # class below. But, it does work for now.
80 # class below. But, it does work for now.
78 log.startLogging(sys.stdout, setStdout=0)
81 log.startLogging(sys.stdout, setStdout=0)
79
82
83 def _result_list_printer(obj, p, cycle):
84 if cycle:
85 return p.text('ResultList(...)')
86 return p.text(repr(obj))
87
88 # ResultList is a list subclass and will use the default pretty printer.
89 # This overrides that to use the __repr__ of ResultList.
90 ip = get_ipython()
91 ip.displayhook.default_formatter.for_type_by_name(
92 'IPython.kernel.multiengineclient', 'ResultList', _result_list_printer
93 )
94
80 # Now we start the reactor in a thread
95 # Now we start the reactor in a thread
81 rit = ReactorInThread()
96 rit = ReactorInThread()
82 rit.setDaemon(True)
97 rit.setDaemon(True)
83 rit.start()
98 rit.start()
84
99
85
100
86 __all__ = [
101 __all__ = [
87 'MapTask',
102 'MapTask',
88 'StringTask',
103 'StringTask',
89 'MultiEngineClient',
104 'MultiEngineClient',
90 'TaskClient',
105 'TaskClient',
91 'CompositeError',
106 'CompositeError',
92 'get_task_client',
107 'get_task_client',
93 'get_multiengine_client',
108 'get_multiengine_client',
94 'Cluster'
109 'Cluster'
95 ]
110 ]
@@ -1,905 +1,905 b''
1 # encoding: utf-8
1 # encoding: utf-8
2 # -*- test-case-name: IPython.kernel.test.test_multiengineclient -*-
2 # -*- test-case-name: IPython.kernel.test.test_multiengineclient -*-
3
3
4 """General Classes for IMultiEngine clients."""
4 """General Classes for IMultiEngine clients."""
5
5
6 __docformat__ = "restructuredtext en"
6 __docformat__ = "restructuredtext en"
7
7
8 #-------------------------------------------------------------------------------
8 #-------------------------------------------------------------------------------
9 # Copyright (C) 2008 The IPython Development Team
9 # Copyright (C) 2008 The IPython Development Team
10 #
10 #
11 # Distributed under the terms of the BSD License. The full license is in
11 # Distributed under the terms of the BSD License. The full license is in
12 # the file COPYING, distributed as part of this software.
12 # the file COPYING, distributed as part of this software.
13 #-------------------------------------------------------------------------------
13 #-------------------------------------------------------------------------------
14
14
15 #-------------------------------------------------------------------------------
15 #-------------------------------------------------------------------------------
16 # Imports
16 # Imports
17 #-------------------------------------------------------------------------------
17 #-------------------------------------------------------------------------------
18
18
19 import sys
19 import sys
20 import warnings
20 import warnings
21
21
22 from twisted.python import components
22 from twisted.python import components
23 from twisted.python.failure import Failure
23 from twisted.python.failure import Failure
24 from zope.interface import Interface, implements, Attribute
24 from zope.interface import Interface, implements, Attribute
25
25
26 try:
26 try:
27 from foolscap.api import DeadReferenceError
27 from foolscap.api import DeadReferenceError
28 except ImportError:
28 except ImportError:
29 from foolscap import DeadReferenceError
29 from foolscap import DeadReferenceError
30
30
31 from IPython.utils.coloransi import TermColors
31 from IPython.utils.coloransi import TermColors
32
32
33 from IPython.kernel.twistedutil import blockingCallFromThread
33 from IPython.kernel.twistedutil import blockingCallFromThread
34 from IPython.kernel import error
34 from IPython.kernel import error
35 from IPython.kernel.parallelfunction import ParallelFunction
35 from IPython.kernel.parallelfunction import ParallelFunction
36 from IPython.kernel.mapper import (
36 from IPython.kernel.mapper import (
37 MultiEngineMapper,
37 MultiEngineMapper,
38 IMultiEngineMapperFactory,
38 IMultiEngineMapperFactory,
39 IMapper
39 IMapper
40 )
40 )
41
41
42 from IPython.kernel.multiengine import IFullSynchronousMultiEngine
42 from IPython.kernel.multiengine import IFullSynchronousMultiEngine
43
43
44
44
45 #-------------------------------------------------------------------------------
45 #-------------------------------------------------------------------------------
46 # Pending Result things
46 # Pending Result things
47 #-------------------------------------------------------------------------------
47 #-------------------------------------------------------------------------------
48
48
49 class IPendingResult(Interface):
49 class IPendingResult(Interface):
50 """A representation of a result that is pending.
50 """A representation of a result that is pending.
51
51
52 This class is similar to Twisted's `Deferred` object, but is designed to be
52 This class is similar to Twisted's `Deferred` object, but is designed to be
53 used in a synchronous context.
53 used in a synchronous context.
54 """
54 """
55
55
56 result_id=Attribute("ID of the deferred on the other side")
56 result_id=Attribute("ID of the deferred on the other side")
57 client=Attribute("A client that I came from")
57 client=Attribute("A client that I came from")
58 r=Attribute("An attribute that is a property that calls and returns get_result")
58 r=Attribute("An attribute that is a property that calls and returns get_result")
59
59
60 def get_result(default=None, block=True):
60 def get_result(default=None, block=True):
61 """
61 """
62 Get a result that is pending.
62 Get a result that is pending.
63
63
64 :Parameters:
64 :Parameters:
65 default
65 default
66 The value to return if the result is not ready.
66 The value to return if the result is not ready.
67 block : boolean
67 block : boolean
68 Should I block for the result.
68 Should I block for the result.
69
69
70 :Returns: The actual result or the default value.
70 :Returns: The actual result or the default value.
71 """
71 """
72
72
73 def add_callback(f, *args, **kwargs):
73 def add_callback(f, *args, **kwargs):
74 """
74 """
75 Add a callback that is called with the result.
75 Add a callback that is called with the result.
76
76
77 If the original result is foo, adding a callback will cause
77 If the original result is foo, adding a callback will cause
78 f(foo, *args, **kwargs) to be returned instead. If multiple
78 f(foo, *args, **kwargs) to be returned instead. If multiple
79 callbacks are registered, they are chained together: the result of
79 callbacks are registered, they are chained together: the result of
80 one is passed to the next and so on.
80 one is passed to the next and so on.
81
81
82 Unlike Twisted's Deferred object, there is no errback chain. Thus
82 Unlike Twisted's Deferred object, there is no errback chain. Thus
83 any exception raised will not be caught and handled. User must
83 any exception raised will not be caught and handled. User must
84 catch these by hand when calling `get_result`.
84 catch these by hand when calling `get_result`.
85 """
85 """
86
86
87
87
88 class PendingResult(object):
88 class PendingResult(object):
89 """A representation of a result that is not yet ready.
89 """A representation of a result that is not yet ready.
90
90
91 A user should not create a `PendingResult` instance by hand.
91 A user should not create a `PendingResult` instance by hand.
92
92
93 Methods:
93 Methods:
94
94
95 * `get_result`
95 * `get_result`
96 * `add_callback`
96 * `add_callback`
97
97
98 Properties:
98 Properties:
99
99
100 * `r`
100 * `r`
101 """
101 """
102
102
103 def __init__(self, client, result_id):
103 def __init__(self, client, result_id):
104 """Create a PendingResult with a result_id and a client instance.
104 """Create a PendingResult with a result_id and a client instance.
105
105
106 The client should implement `_getPendingResult(result_id, block)`.
106 The client should implement `_getPendingResult(result_id, block)`.
107 """
107 """
108 self.client = client
108 self.client = client
109 self.result_id = result_id
109 self.result_id = result_id
110 self.called = False
110 self.called = False
111 self.raised = False
111 self.raised = False
112 self.callbacks = []
112 self.callbacks = []
113
113
114 def get_result(self, default=None, block=True):
114 def get_result(self, default=None, block=True):
115 """Get a result that is pending.
115 """Get a result that is pending.
116
116
117 This method will connect to an IMultiEngine adapted controller
117 This method will connect to an IMultiEngine adapted controller
118 and see if the result is ready. If the action triggers an exception
118 and see if the result is ready. If the action triggers an exception
119 raise it and record it. This method records the result/exception once it is
119 raise it and record it. This method records the result/exception once it is
120 retrieved. Calling `get_result` again will get this cached result or will
120 retrieved. Calling `get_result` again will get this cached result or will
121 re-raise the exception. The .r attribute is a property that calls
121 re-raise the exception. The .r attribute is a property that calls
122 `get_result` with block=True.
122 `get_result` with block=True.
123
123
124 :Parameters:
124 :Parameters:
125 default
125 default
126 The value to return if the result is not ready.
126 The value to return if the result is not ready.
127 block : boolean
127 block : boolean
128 Should I block for the result.
128 Should I block for the result.
129
129
130 :Returns: The actual result or the default value.
130 :Returns: The actual result or the default value.
131 """
131 """
132
132
133 if self.called:
133 if self.called:
134 if self.raised:
134 if self.raised:
135 raise self.result[0], self.result[1], self.result[2]
135 raise self.result[0], self.result[1], self.result[2]
136 else:
136 else:
137 return self.result
137 return self.result
138 try:
138 try:
139 result = self.client.get_pending_deferred(self.result_id, block)
139 result = self.client.get_pending_deferred(self.result_id, block)
140 except error.ResultNotCompleted:
140 except error.ResultNotCompleted:
141 return default
141 return default
142 except:
142 except:
143 # Reraise other error, but first record them so they can be reraised
143 # Reraise other error, but first record them so they can be reraised
144 # later if .r or get_result is called again.
144 # later if .r or get_result is called again.
145 self.result = sys.exc_info()
145 self.result = sys.exc_info()
146 self.called = True
146 self.called = True
147 self.raised = True
147 self.raised = True
148 raise
148 raise
149 else:
149 else:
150 for cb in self.callbacks:
150 for cb in self.callbacks:
151 result = cb[0](result, *cb[1], **cb[2])
151 result = cb[0](result, *cb[1], **cb[2])
152 self.result = result
152 self.result = result
153 self.called = True
153 self.called = True
154 return result
154 return result
155
155
156 def add_callback(self, f, *args, **kwargs):
156 def add_callback(self, f, *args, **kwargs):
157 """Add a callback that is called with the result.
157 """Add a callback that is called with the result.
158
158
159 If the original result is result, adding a callback will cause
159 If the original result is result, adding a callback will cause
160 f(result, *args, **kwargs) to be returned instead. If multiple
160 f(result, *args, **kwargs) to be returned instead. If multiple
161 callbacks are registered, they are chained together: the result of
161 callbacks are registered, they are chained together: the result of
162 one is passed to the next and so on.
162 one is passed to the next and so on.
163
163
164 Unlike Twisted's Deferred object, there is no errback chain. Thus
164 Unlike Twisted's Deferred object, there is no errback chain. Thus
165 any exception raised will not be caught and handled. User must
165 any exception raised will not be caught and handled. User must
166 catch these by hand when calling `get_result`.
166 catch these by hand when calling `get_result`.
167 """
167 """
168 assert callable(f)
168 assert callable(f)
169 self.callbacks.append((f, args, kwargs))
169 self.callbacks.append((f, args, kwargs))
170
170
171 def __cmp__(self, other):
171 def __cmp__(self, other):
172 if self.result_id < other.result_id:
172 if self.result_id < other.result_id:
173 return -1
173 return -1
174 else:
174 else:
175 return 1
175 return 1
176
176
177 def _get_r(self):
177 def _get_r(self):
178 return self.get_result(block=True)
178 return self.get_result(block=True)
179
179
180 r = property(_get_r)
180 r = property(_get_r)
181 """This property is a shortcut to a `get_result(block=True)`."""
181 """This property is a shortcut to a `get_result(block=True)`."""
182
182
183
183
184 #-------------------------------------------------------------------------------
184 #-------------------------------------------------------------------------------
185 # Pretty printing wrappers for certain lists
185 # Pretty printing wrappers for certain lists
186 #-------------------------------------------------------------------------------
186 #-------------------------------------------------------------------------------
187
187
188 class ResultList(list):
188 class ResultList(list):
189 """A subclass of list that pretty prints the output of `execute`/`get_result`."""
189 """A subclass of list that pretty prints the output of `execute`/`get_result`."""
190
190
191 def __repr__(self):
191 def __repr__(self):
192 output = []
192 output = []
193 # These colored prompts were not working on Windows
193 # These colored prompts were not working on Windows
194 if sys.platform == 'win32':
194 if sys.platform == 'win32':
195 blue = normal = red = green = ''
195 blue = normal = red = green = ''
196 else:
196 else:
197 blue = TermColors.Blue
197 blue = TermColors.Blue
198 normal = TermColors.Normal
198 normal = TermColors.Normal
199 red = TermColors.Red
199 red = TermColors.Red
200 green = TermColors.Green
200 green = TermColors.Green
201 output.append("<Results List>\n")
201 output.append("<Results List>\n")
202 for cmd in self:
202 for cmd in self:
203 if isinstance(cmd, Failure):
203 if isinstance(cmd, Failure):
204 output.append(cmd)
204 output.append(cmd)
205 else:
205 else:
206 target = cmd.get('id',None)
206 target = cmd.get('id',None)
207 cmd_num = cmd.get('number',None)
207 cmd_num = cmd.get('number',None)
208 cmd_stdin = cmd.get('input',{}).get('translated','No Input')
208 cmd_stdin = cmd.get('input',{}).get('translated','No Input')
209 cmd_stdout = cmd.get('stdout', None)
209 cmd_stdout = cmd.get('stdout', None)
210 cmd_stderr = cmd.get('stderr', None)
210 cmd_stderr = cmd.get('stderr', None)
211 output.append("%s[%i]%s In [%i]:%s %s\n" % \
211 output.append("%s[%i]%s In [%i]:%s %s\n" % \
212 (green, target,
212 (green, target,
213 blue, cmd_num, normal, cmd_stdin))
213 blue, cmd_num, normal, cmd_stdin))
214 if cmd_stdout:
214 if cmd_stdout:
215 output.append("%s[%i]%s Out[%i]:%s %s\n" % \
215 output.append("%s[%i]%s Out[%i]:%s %s\n" % \
216 (green, target,
216 (green, target,
217 red, cmd_num, normal, cmd_stdout))
217 red, cmd_num, normal, cmd_stdout))
218 if cmd_stderr:
218 if cmd_stderr:
219 output.append("%s[%i]%s Err[%i]:\n%s %s" % \
219 output.append("%s[%i]%s Err[%i]:\n%s %s" % \
220 (green, target,
220 (green, target,
221 red, cmd_num, normal, cmd_stderr))
221 red, cmd_num, normal, cmd_stderr))
222 return ''.join(output)
222 return ''.join(output)
223
223
224
224
225 def wrapResultList(result):
225 def wrapResultList(result):
226 """A function that wraps the output of `execute`/`get_result` -> `ResultList`."""
226 """A function that wraps the output of `execute`/`get_result` -> `ResultList`."""
227 if len(result) == 0:
227 if len(result) == 0:
228 result = [result]
228 result = [result]
229 return ResultList(result)
229 return ResultList(result)
230
230
231
231
232 class QueueStatusList(list):
232 class QueueStatusList(list):
233 """A subclass of list that pretty prints the output of `queue_status`."""
233 """A subclass of list that pretty prints the output of `queue_status`."""
234
234
235 def __repr__(self):
235 def __repr__(self):
236 output = []
236 output = []
237 output.append("<Queue Status List>\n")
237 output.append("<Queue Status List>\n")
238 for e in self:
238 for e in self:
239 output.append("Engine: %s\n" % repr(e[0]))
239 output.append("Engine: %s\n" % repr(e[0]))
240 output.append(" Pending: %s\n" % repr(e[1]['pending']))
240 output.append(" Pending: %s\n" % repr(e[1]['pending']))
241 for q in e[1]['queue']:
241 for q in e[1]['queue']:
242 output.append(" Command: %s\n" % repr(q))
242 output.append(" Command: %s\n" % repr(q))
243 return ''.join(output)
243 return ''.join(output)
244
244
245
245
246 #-------------------------------------------------------------------------------
246 #-------------------------------------------------------------------------------
247 # InteractiveMultiEngineClient
247 # InteractiveMultiEngineClient
248 #-------------------------------------------------------------------------------
248 #-------------------------------------------------------------------------------
249
249
250 class InteractiveMultiEngineClient(object):
250 class InteractiveMultiEngineClient(object):
251 """A mixin class that add a few methods to a multiengine client.
251 """A mixin class that add a few methods to a multiengine client.
252
252
253 The methods in this mixin class are designed for interactive usage.
253 The methods in this mixin class are designed for interactive usage.
254 """
254 """
255
255
256 def activate(self):
256 def activate(self):
257 """Make this `MultiEngineClient` active for parallel magic commands.
257 """Make this `MultiEngineClient` active for parallel magic commands.
258
258
259 IPython has a magic command syntax to work with `MultiEngineClient` objects.
259 IPython has a magic command syntax to work with `MultiEngineClient` objects.
260 In a given IPython session there is a single active one. While
260 In a given IPython session there is a single active one. While
261 there can be many `MultiEngineClient` created and used by the user,
261 there can be many `MultiEngineClient` created and used by the user,
262 there is only one active one. The active `MultiEngineClient` is used whenever
262 there is only one active one. The active `MultiEngineClient` is used whenever
263 the magic commands %px and %autopx are used.
263 the magic commands %px and %autopx are used.
264
264
265 The activate() method is called on a given `MultiEngineClient` to make it
265 The activate() method is called on a given `MultiEngineClient` to make it
266 active. Once this has been done, the magic commands can be used.
266 active. Once this has been done, the magic commands can be used.
267 """
267 """
268
268
269 try:
269 try:
270 # This is injected into __builtins__.
270 # This is injected into __builtins__.
271 ip = get_ipython()
271 ip = get_ipython()
272 except NameError:
272 except NameError:
273 print "The IPython parallel magics (%result, %px, %autopx) only work within IPython."
273 print "The IPython parallel magics (%result, %px, %autopx) only work within IPython."
274 else:
274 else:
275 pmagic = ip.plugin_manager.get_plugin('parallel_magic')
275 pmagic = ip.plugin_manager.get_plugin('parallelmagic')
276 if pmagic is not None:
276 if pmagic is not None:
277 pmagic.active_multiengine_client = self
277 pmagic.active_multiengine_client = self
278 else:
278 else:
279 print "You must first load the parallelmagic extension " \
279 print "You must first load the parallelmagic extension " \
280 "by doing '%load_ext parallelmagic'"
280 "by doing '%load_ext parallelmagic'"
281
281
282 def __setitem__(self, key, value):
282 def __setitem__(self, key, value):
283 """Add a dictionary interface for pushing/pulling.
283 """Add a dictionary interface for pushing/pulling.
284
284
285 This functions as a shorthand for `push`.
285 This functions as a shorthand for `push`.
286
286
287 :Parameters:
287 :Parameters:
288 key : str
288 key : str
289 What to call the remote object.
289 What to call the remote object.
290 value : object
290 value : object
291 The local Python object to push.
291 The local Python object to push.
292 """
292 """
293 targets, block = self._findTargetsAndBlock()
293 targets, block = self._findTargetsAndBlock()
294 return self.push({key:value}, targets=targets, block=block)
294 return self.push({key:value}, targets=targets, block=block)
295
295
296 def __getitem__(self, key):
296 def __getitem__(self, key):
297 """Add a dictionary interface for pushing/pulling.
297 """Add a dictionary interface for pushing/pulling.
298
298
299 This functions as a shorthand to `pull`.
299 This functions as a shorthand to `pull`.
300
300
301 :Parameters:
301 :Parameters:
302 - `key`: A string representing the key.
302 - `key`: A string representing the key.
303 """
303 """
304 if isinstance(key, str):
304 if isinstance(key, str):
305 targets, block = self._findTargetsAndBlock()
305 targets, block = self._findTargetsAndBlock()
306 return self.pull(key, targets=targets, block=block)
306 return self.pull(key, targets=targets, block=block)
307 else:
307 else:
308 raise TypeError("__getitem__ only takes strs")
308 raise TypeError("__getitem__ only takes strs")
309
309
310 def __len__(self):
310 def __len__(self):
311 """Return the number of available engines."""
311 """Return the number of available engines."""
312 return len(self.get_ids())
312 return len(self.get_ids())
313
313
314
314
315 #-------------------------------------------------------------------------------
315 #-------------------------------------------------------------------------------
316 # The top-level MultiEngine client adaptor
316 # The top-level MultiEngine client adaptor
317 #-------------------------------------------------------------------------------
317 #-------------------------------------------------------------------------------
318
318
319
319
320 _prop_warn = """\
320 _prop_warn = """\
321
321
322 We are currently refactoring the task dependency system. This might
322 We are currently refactoring the task dependency system. This might
323 involve the removal of this method and other methods related to engine
323 involve the removal of this method and other methods related to engine
324 properties. Please see the docstrings for IPython.kernel.TaskRejectError
324 properties. Please see the docstrings for IPython.kernel.TaskRejectError
325 for more information."""
325 for more information."""
326
326
327
327
328 class IFullBlockingMultiEngineClient(Interface):
328 class IFullBlockingMultiEngineClient(Interface):
329 pass
329 pass
330
330
331
331
332 class FullBlockingMultiEngineClient(InteractiveMultiEngineClient):
332 class FullBlockingMultiEngineClient(InteractiveMultiEngineClient):
333 """
333 """
334 A blocking client to the `IMultiEngine` controller interface.
334 A blocking client to the `IMultiEngine` controller interface.
335
335
336 This class allows users to use a set of engines for a parallel
336 This class allows users to use a set of engines for a parallel
337 computation through the `IMultiEngine` interface. In this interface,
337 computation through the `IMultiEngine` interface. In this interface,
338 each engine has a specific id (an int) that is used to refer to the
338 each engine has a specific id (an int) that is used to refer to the
339 engine, run code on it, etc.
339 engine, run code on it, etc.
340 """
340 """
341
341
342 implements(
342 implements(
343 IFullBlockingMultiEngineClient,
343 IFullBlockingMultiEngineClient,
344 IMultiEngineMapperFactory,
344 IMultiEngineMapperFactory,
345 IMapper
345 IMapper
346 )
346 )
347
347
348 def __init__(self, smultiengine):
348 def __init__(self, smultiengine):
349 self.smultiengine = smultiengine
349 self.smultiengine = smultiengine
350 self.block = True
350 self.block = True
351 self.targets = 'all'
351 self.targets = 'all'
352
352
353 def _findBlock(self, block=None):
353 def _findBlock(self, block=None):
354 if block is None:
354 if block is None:
355 return self.block
355 return self.block
356 else:
356 else:
357 if block in (True, False):
357 if block in (True, False):
358 return block
358 return block
359 else:
359 else:
360 raise ValueError("block must be True or False")
360 raise ValueError("block must be True or False")
361
361
362 def _findTargets(self, targets=None):
362 def _findTargets(self, targets=None):
363 if targets is None:
363 if targets is None:
364 return self.targets
364 return self.targets
365 else:
365 else:
366 if not isinstance(targets, (str,list,tuple,int)):
366 if not isinstance(targets, (str,list,tuple,int)):
367 raise ValueError("targets must be a str, list, tuple or int")
367 raise ValueError("targets must be a str, list, tuple or int")
368 return targets
368 return targets
369
369
370 def _findTargetsAndBlock(self, targets=None, block=None):
370 def _findTargetsAndBlock(self, targets=None, block=None):
371 return self._findTargets(targets), self._findBlock(block)
371 return self._findTargets(targets), self._findBlock(block)
372
372
373 def _bcft(self, *args, **kwargs):
373 def _bcft(self, *args, **kwargs):
374 try:
374 try:
375 result = blockingCallFromThread(*args, **kwargs)
375 result = blockingCallFromThread(*args, **kwargs)
376 except DeadReferenceError:
376 except DeadReferenceError:
377 raise error.ConnectionError(
377 raise error.ConnectionError(
378 """A connection error has occurred in trying to connect to the
378 """A connection error has occurred in trying to connect to the
379 controller. This is usually caused by the controller dying or
379 controller. This is usually caused by the controller dying or
380 being restarted. To resolve this issue try recreating the
380 being restarted. To resolve this issue try recreating the
381 multiengine client."""
381 multiengine client."""
382 )
382 )
383 else:
383 else:
384 return result
384 return result
385
385
386 def _blockFromThread(self, function, *args, **kwargs):
386 def _blockFromThread(self, function, *args, **kwargs):
387 block = kwargs.get('block', None)
387 block = kwargs.get('block', None)
388 if block is None:
388 if block is None:
389 raise error.MissingBlockArgument("'block' keyword argument is missing")
389 raise error.MissingBlockArgument("'block' keyword argument is missing")
390 result = self._bcft(function, *args, **kwargs)
390 result = self._bcft(function, *args, **kwargs)
391 if not block:
391 if not block:
392 result = PendingResult(self, result)
392 result = PendingResult(self, result)
393 return result
393 return result
394
394
395 def get_pending_deferred(self, deferredID, block):
395 def get_pending_deferred(self, deferredID, block):
396 return self._bcft(self.smultiengine.get_pending_deferred, deferredID, block)
396 return self._bcft(self.smultiengine.get_pending_deferred, deferredID, block)
397
397
398 def barrier(self, pendingResults):
398 def barrier(self, pendingResults):
399 """Synchronize a set of `PendingResults`.
399 """Synchronize a set of `PendingResults`.
400
400
401 This method is a synchronization primitive that waits for a set of
401 This method is a synchronization primitive that waits for a set of
402 `PendingResult` objects to complete. More specifically, barier does
402 `PendingResult` objects to complete. More specifically, barier does
403 the following.
403 the following.
404
404
405 * The `PendingResult`s are sorted by result_id.
405 * The `PendingResult`s are sorted by result_id.
406 * The `get_result` method is called for each `PendingResult` sequentially
406 * The `get_result` method is called for each `PendingResult` sequentially
407 with block=True.
407 with block=True.
408 * If a `PendingResult` gets a result that is an exception, it is
408 * If a `PendingResult` gets a result that is an exception, it is
409 trapped and can be re-raised later by calling `get_result` again.
409 trapped and can be re-raised later by calling `get_result` again.
410 * The `PendingResult`s are flushed from the controller.
410 * The `PendingResult`s are flushed from the controller.
411
411
412 After barrier has been called on a `PendingResult`, its results can
412 After barrier has been called on a `PendingResult`, its results can
413 be retrieved by calling `get_result` again or accesing the `r` attribute
413 be retrieved by calling `get_result` again or accesing the `r` attribute
414 of the instance.
414 of the instance.
415 """
415 """
416
416
417 # Convert to list for sorting and check class type
417 # Convert to list for sorting and check class type
418 prList = list(pendingResults)
418 prList = list(pendingResults)
419 for pr in prList:
419 for pr in prList:
420 if not isinstance(pr, PendingResult):
420 if not isinstance(pr, PendingResult):
421 raise error.NotAPendingResult("Objects passed to barrier must be PendingResult instances")
421 raise error.NotAPendingResult("Objects passed to barrier must be PendingResult instances")
422
422
423 # Sort the PendingResults so they are in order
423 # Sort the PendingResults so they are in order
424 prList.sort()
424 prList.sort()
425 # Block on each PendingResult object
425 # Block on each PendingResult object
426 for pr in prList:
426 for pr in prList:
427 try:
427 try:
428 result = pr.get_result(block=True)
428 result = pr.get_result(block=True)
429 except Exception:
429 except Exception:
430 pass
430 pass
431
431
432 def flush(self):
432 def flush(self):
433 """
433 """
434 Clear all pending deferreds/results from the controller.
434 Clear all pending deferreds/results from the controller.
435
435
436 For each `PendingResult` that is created by this client, the controller
436 For each `PendingResult` that is created by this client, the controller
437 holds on to the result for that `PendingResult`. This can be a problem
437 holds on to the result for that `PendingResult`. This can be a problem
438 if there are a large number of `PendingResult` objects that are created.
438 if there are a large number of `PendingResult` objects that are created.
439
439
440 Once the result of the `PendingResult` has been retrieved, the result
440 Once the result of the `PendingResult` has been retrieved, the result
441 is removed from the controller, but if a user doesn't get a result (
441 is removed from the controller, but if a user doesn't get a result (
442 they just ignore the `PendingResult`) the result is kept forever on the
442 they just ignore the `PendingResult`) the result is kept forever on the
443 controller. This method allows the user to clear out all un-retrieved
443 controller. This method allows the user to clear out all un-retrieved
444 results on the controller.
444 results on the controller.
445 """
445 """
446 r = self._bcft(self.smultiengine.clear_pending_deferreds)
446 r = self._bcft(self.smultiengine.clear_pending_deferreds)
447 return r
447 return r
448
448
449 clear_pending_results = flush
449 clear_pending_results = flush
450
450
451 #---------------------------------------------------------------------------
451 #---------------------------------------------------------------------------
452 # IEngineMultiplexer related methods
452 # IEngineMultiplexer related methods
453 #---------------------------------------------------------------------------
453 #---------------------------------------------------------------------------
454
454
455 def execute(self, lines, targets=None, block=None):
455 def execute(self, lines, targets=None, block=None):
456 """
456 """
457 Execute code on a set of engines.
457 Execute code on a set of engines.
458
458
459 :Parameters:
459 :Parameters:
460 lines : str
460 lines : str
461 The Python code to execute as a string
461 The Python code to execute as a string
462 targets : id or list of ids
462 targets : id or list of ids
463 The engine to use for the execution
463 The engine to use for the execution
464 block : boolean
464 block : boolean
465 If False, this method will return the actual result. If False,
465 If False, this method will return the actual result. If False,
466 a `PendingResult` is returned which can be used to get the result
466 a `PendingResult` is returned which can be used to get the result
467 at a later time.
467 at a later time.
468 """
468 """
469 targets, block = self._findTargetsAndBlock(targets, block)
469 targets, block = self._findTargetsAndBlock(targets, block)
470 result = self._bcft(self.smultiengine.execute, lines,
470 result = self._bcft(self.smultiengine.execute, lines,
471 targets=targets, block=block)
471 targets=targets, block=block)
472 if block:
472 if block:
473 result = ResultList(result)
473 result = ResultList(result)
474 else:
474 else:
475 result = PendingResult(self, result)
475 result = PendingResult(self, result)
476 result.add_callback(wrapResultList)
476 result.add_callback(wrapResultList)
477 return result
477 return result
478
478
479 def push(self, namespace, targets=None, block=None):
479 def push(self, namespace, targets=None, block=None):
480 """
480 """
481 Push a dictionary of keys and values to engines namespace.
481 Push a dictionary of keys and values to engines namespace.
482
482
483 Each engine has a persistent namespace. This method is used to push
483 Each engine has a persistent namespace. This method is used to push
484 Python objects into that namespace.
484 Python objects into that namespace.
485
485
486 The objects in the namespace must be pickleable.
486 The objects in the namespace must be pickleable.
487
487
488 :Parameters:
488 :Parameters:
489 namespace : dict
489 namespace : dict
490 A dict that contains Python objects to be injected into
490 A dict that contains Python objects to be injected into
491 the engine persistent namespace.
491 the engine persistent namespace.
492 targets : id or list of ids
492 targets : id or list of ids
493 The engine to use for the execution
493 The engine to use for the execution
494 block : boolean
494 block : boolean
495 If False, this method will return the actual result. If False,
495 If False, this method will return the actual result. If False,
496 a `PendingResult` is returned which can be used to get the result
496 a `PendingResult` is returned which can be used to get the result
497 at a later time.
497 at a later time.
498 """
498 """
499 targets, block = self._findTargetsAndBlock(targets, block)
499 targets, block = self._findTargetsAndBlock(targets, block)
500 return self._blockFromThread(self.smultiengine.push, namespace,
500 return self._blockFromThread(self.smultiengine.push, namespace,
501 targets=targets, block=block)
501 targets=targets, block=block)
502
502
503 def pull(self, keys, targets=None, block=None):
503 def pull(self, keys, targets=None, block=None):
504 """
504 """
505 Pull Python objects by key out of engines namespaces.
505 Pull Python objects by key out of engines namespaces.
506
506
507 :Parameters:
507 :Parameters:
508 keys : str or list of str
508 keys : str or list of str
509 The names of the variables to be pulled
509 The names of the variables to be pulled
510 targets : id or list of ids
510 targets : id or list of ids
511 The engine to use for the execution
511 The engine to use for the execution
512 block : boolean
512 block : boolean
513 If False, this method will return the actual result. If False,
513 If False, this method will return the actual result. If False,
514 a `PendingResult` is returned which can be used to get the result
514 a `PendingResult` is returned which can be used to get the result
515 at a later time.
515 at a later time.
516 """
516 """
517 targets, block = self._findTargetsAndBlock(targets, block)
517 targets, block = self._findTargetsAndBlock(targets, block)
518 return self._blockFromThread(self.smultiengine.pull, keys, targets=targets, block=block)
518 return self._blockFromThread(self.smultiengine.pull, keys, targets=targets, block=block)
519
519
520 def push_function(self, namespace, targets=None, block=None):
520 def push_function(self, namespace, targets=None, block=None):
521 """
521 """
522 Push a Python function to an engine.
522 Push a Python function to an engine.
523
523
524 This method is used to push a Python function to an engine. This
524 This method is used to push a Python function to an engine. This
525 method can then be used in code on the engines. Closures are not supported.
525 method can then be used in code on the engines. Closures are not supported.
526
526
527 :Parameters:
527 :Parameters:
528 namespace : dict
528 namespace : dict
529 A dict whose values are the functions to be pushed. The keys give
529 A dict whose values are the functions to be pushed. The keys give
530 that names that the function will appear as in the engines
530 that names that the function will appear as in the engines
531 namespace.
531 namespace.
532 targets : id or list of ids
532 targets : id or list of ids
533 The engine to use for the execution
533 The engine to use for the execution
534 block : boolean
534 block : boolean
535 If False, this method will return the actual result. If False,
535 If False, this method will return the actual result. If False,
536 a `PendingResult` is returned which can be used to get the result
536 a `PendingResult` is returned which can be used to get the result
537 at a later time.
537 at a later time.
538 """
538 """
539 targets, block = self._findTargetsAndBlock(targets, block)
539 targets, block = self._findTargetsAndBlock(targets, block)
540 return self._blockFromThread(self.smultiengine.push_function, namespace, targets=targets, block=block)
540 return self._blockFromThread(self.smultiengine.push_function, namespace, targets=targets, block=block)
541
541
542 def pull_function(self, keys, targets=None, block=None):
542 def pull_function(self, keys, targets=None, block=None):
543 """
543 """
544 Pull a Python function from an engine.
544 Pull a Python function from an engine.
545
545
546 This method is used to pull a Python function from an engine.
546 This method is used to pull a Python function from an engine.
547 Closures are not supported.
547 Closures are not supported.
548
548
549 :Parameters:
549 :Parameters:
550 keys : str or list of str
550 keys : str or list of str
551 The names of the functions to be pulled
551 The names of the functions to be pulled
552 targets : id or list of ids
552 targets : id or list of ids
553 The engine to use for the execution
553 The engine to use for the execution
554 block : boolean
554 block : boolean
555 If False, this method will return the actual result. If False,
555 If False, this method will return the actual result. If False,
556 a `PendingResult` is returned which can be used to get the result
556 a `PendingResult` is returned which can be used to get the result
557 at a later time.
557 at a later time.
558 """
558 """
559 targets, block = self._findTargetsAndBlock(targets, block)
559 targets, block = self._findTargetsAndBlock(targets, block)
560 return self._blockFromThread(self.smultiengine.pull_function, keys, targets=targets, block=block)
560 return self._blockFromThread(self.smultiengine.pull_function, keys, targets=targets, block=block)
561
561
562 def push_serialized(self, namespace, targets=None, block=None):
562 def push_serialized(self, namespace, targets=None, block=None):
563 targets, block = self._findTargetsAndBlock(targets, block)
563 targets, block = self._findTargetsAndBlock(targets, block)
564 return self._blockFromThread(self.smultiengine.push_serialized, namespace, targets=targets, block=block)
564 return self._blockFromThread(self.smultiengine.push_serialized, namespace, targets=targets, block=block)
565
565
566 def pull_serialized(self, keys, targets=None, block=None):
566 def pull_serialized(self, keys, targets=None, block=None):
567 targets, block = self._findTargetsAndBlock(targets, block)
567 targets, block = self._findTargetsAndBlock(targets, block)
568 return self._blockFromThread(self.smultiengine.pull_serialized, keys, targets=targets, block=block)
568 return self._blockFromThread(self.smultiengine.pull_serialized, keys, targets=targets, block=block)
569
569
570 def get_result(self, i=None, targets=None, block=None):
570 def get_result(self, i=None, targets=None, block=None):
571 """
571 """
572 Get a previous result.
572 Get a previous result.
573
573
574 When code is executed in an engine, a dict is created and returned. This
574 When code is executed in an engine, a dict is created and returned. This
575 method retrieves that dict for previous commands.
575 method retrieves that dict for previous commands.
576
576
577 :Parameters:
577 :Parameters:
578 i : int
578 i : int
579 The number of the result to get
579 The number of the result to get
580 targets : id or list of ids
580 targets : id or list of ids
581 The engine to use for the execution
581 The engine to use for the execution
582 block : boolean
582 block : boolean
583 If False, this method will return the actual result. If False,
583 If False, this method will return the actual result. If False,
584 a `PendingResult` is returned which can be used to get the result
584 a `PendingResult` is returned which can be used to get the result
585 at a later time.
585 at a later time.
586 """
586 """
587 targets, block = self._findTargetsAndBlock(targets, block)
587 targets, block = self._findTargetsAndBlock(targets, block)
588 result = self._bcft(self.smultiengine.get_result, i, targets=targets, block=block)
588 result = self._bcft(self.smultiengine.get_result, i, targets=targets, block=block)
589 if block:
589 if block:
590 result = ResultList(result)
590 result = ResultList(result)
591 else:
591 else:
592 result = PendingResult(self, result)
592 result = PendingResult(self, result)
593 result.add_callback(wrapResultList)
593 result.add_callback(wrapResultList)
594 return result
594 return result
595
595
596 def reset(self, targets=None, block=None):
596 def reset(self, targets=None, block=None):
597 """
597 """
598 Reset an engine.
598 Reset an engine.
599
599
600 This method clears out the namespace of an engine.
600 This method clears out the namespace of an engine.
601
601
602 :Parameters:
602 :Parameters:
603 targets : id or list of ids
603 targets : id or list of ids
604 The engine to use for the execution
604 The engine to use for the execution
605 block : boolean
605 block : boolean
606 If False, this method will return the actual result. If False,
606 If False, this method will return the actual result. If False,
607 a `PendingResult` is returned which can be used to get the result
607 a `PendingResult` is returned which can be used to get the result
608 at a later time.
608 at a later time.
609 """
609 """
610 targets, block = self._findTargetsAndBlock(targets, block)
610 targets, block = self._findTargetsAndBlock(targets, block)
611 return self._blockFromThread(self.smultiengine.reset, targets=targets, block=block)
611 return self._blockFromThread(self.smultiengine.reset, targets=targets, block=block)
612
612
613 def keys(self, targets=None, block=None):
613 def keys(self, targets=None, block=None):
614 """
614 """
615 Get a list of all the variables in an engine's namespace.
615 Get a list of all the variables in an engine's namespace.
616
616
617 :Parameters:
617 :Parameters:
618 targets : id or list of ids
618 targets : id or list of ids
619 The engine to use for the execution
619 The engine to use for the execution
620 block : boolean
620 block : boolean
621 If False, this method will return the actual result. If False,
621 If False, this method will return the actual result. If False,
622 a `PendingResult` is returned which can be used to get the result
622 a `PendingResult` is returned which can be used to get the result
623 at a later time.
623 at a later time.
624 """
624 """
625 targets, block = self._findTargetsAndBlock(targets, block)
625 targets, block = self._findTargetsAndBlock(targets, block)
626 return self._blockFromThread(self.smultiengine.keys, targets=targets, block=block)
626 return self._blockFromThread(self.smultiengine.keys, targets=targets, block=block)
627
627
628 def kill(self, controller=False, targets=None, block=None):
628 def kill(self, controller=False, targets=None, block=None):
629 """
629 """
630 Kill the engines and controller.
630 Kill the engines and controller.
631
631
632 This method is used to stop the engine and controller by calling
632 This method is used to stop the engine and controller by calling
633 `reactor.stop`.
633 `reactor.stop`.
634
634
635 :Parameters:
635 :Parameters:
636 controller : boolean
636 controller : boolean
637 If True, kill the engines and controller. If False, just the
637 If True, kill the engines and controller. If False, just the
638 engines
638 engines
639 targets : id or list of ids
639 targets : id or list of ids
640 The engine to use for the execution
640 The engine to use for the execution
641 block : boolean
641 block : boolean
642 If False, this method will return the actual result. If False,
642 If False, this method will return the actual result. If False,
643 a `PendingResult` is returned which can be used to get the result
643 a `PendingResult` is returned which can be used to get the result
644 at a later time.
644 at a later time.
645 """
645 """
646 targets, block = self._findTargetsAndBlock(targets, block)
646 targets, block = self._findTargetsAndBlock(targets, block)
647 return self._blockFromThread(self.smultiengine.kill, controller, targets=targets, block=block)
647 return self._blockFromThread(self.smultiengine.kill, controller, targets=targets, block=block)
648
648
649 def clear_queue(self, targets=None, block=None):
649 def clear_queue(self, targets=None, block=None):
650 """
650 """
651 Clear out the controller's queue for an engine.
651 Clear out the controller's queue for an engine.
652
652
653 The controller maintains a queue for each engine. This clear it out.
653 The controller maintains a queue for each engine. This clear it out.
654
654
655 :Parameters:
655 :Parameters:
656 targets : id or list of ids
656 targets : id or list of ids
657 The engine to use for the execution
657 The engine to use for the execution
658 block : boolean
658 block : boolean
659 If False, this method will return the actual result. If False,
659 If False, this method will return the actual result. If False,
660 a `PendingResult` is returned which can be used to get the result
660 a `PendingResult` is returned which can be used to get the result
661 at a later time.
661 at a later time.
662 """
662 """
663 targets, block = self._findTargetsAndBlock(targets, block)
663 targets, block = self._findTargetsAndBlock(targets, block)
664 return self._blockFromThread(self.smultiengine.clear_queue, targets=targets, block=block)
664 return self._blockFromThread(self.smultiengine.clear_queue, targets=targets, block=block)
665
665
666 def queue_status(self, targets=None, block=None):
666 def queue_status(self, targets=None, block=None):
667 """
667 """
668 Get the status of an engines queue.
668 Get the status of an engines queue.
669
669
670 :Parameters:
670 :Parameters:
671 targets : id or list of ids
671 targets : id or list of ids
672 The engine to use for the execution
672 The engine to use for the execution
673 block : boolean
673 block : boolean
674 If False, this method will return the actual result. If False,
674 If False, this method will return the actual result. If False,
675 a `PendingResult` is returned which can be used to get the result
675 a `PendingResult` is returned which can be used to get the result
676 at a later time.
676 at a later time.
677 """
677 """
678 targets, block = self._findTargetsAndBlock(targets, block)
678 targets, block = self._findTargetsAndBlock(targets, block)
679 return self._blockFromThread(self.smultiengine.queue_status, targets=targets, block=block)
679 return self._blockFromThread(self.smultiengine.queue_status, targets=targets, block=block)
680
680
681 def set_properties(self, properties, targets=None, block=None):
681 def set_properties(self, properties, targets=None, block=None):
682 warnings.warn(_prop_warn)
682 warnings.warn(_prop_warn)
683 targets, block = self._findTargetsAndBlock(targets, block)
683 targets, block = self._findTargetsAndBlock(targets, block)
684 return self._blockFromThread(self.smultiengine.set_properties, properties, targets=targets, block=block)
684 return self._blockFromThread(self.smultiengine.set_properties, properties, targets=targets, block=block)
685
685
686 def get_properties(self, keys=None, targets=None, block=None):
686 def get_properties(self, keys=None, targets=None, block=None):
687 warnings.warn(_prop_warn)
687 warnings.warn(_prop_warn)
688 targets, block = self._findTargetsAndBlock(targets, block)
688 targets, block = self._findTargetsAndBlock(targets, block)
689 return self._blockFromThread(self.smultiengine.get_properties, keys, targets=targets, block=block)
689 return self._blockFromThread(self.smultiengine.get_properties, keys, targets=targets, block=block)
690
690
691 def has_properties(self, keys, targets=None, block=None):
691 def has_properties(self, keys, targets=None, block=None):
692 warnings.warn(_prop_warn)
692 warnings.warn(_prop_warn)
693 targets, block = self._findTargetsAndBlock(targets, block)
693 targets, block = self._findTargetsAndBlock(targets, block)
694 return self._blockFromThread(self.smultiengine.has_properties, keys, targets=targets, block=block)
694 return self._blockFromThread(self.smultiengine.has_properties, keys, targets=targets, block=block)
695
695
696 def del_properties(self, keys, targets=None, block=None):
696 def del_properties(self, keys, targets=None, block=None):
697 warnings.warn(_prop_warn)
697 warnings.warn(_prop_warn)
698 targets, block = self._findTargetsAndBlock(targets, block)
698 targets, block = self._findTargetsAndBlock(targets, block)
699 return self._blockFromThread(self.smultiengine.del_properties, keys, targets=targets, block=block)
699 return self._blockFromThread(self.smultiengine.del_properties, keys, targets=targets, block=block)
700
700
701 def clear_properties(self, targets=None, block=None):
701 def clear_properties(self, targets=None, block=None):
702 warnings.warn(_prop_warn)
702 warnings.warn(_prop_warn)
703 targets, block = self._findTargetsAndBlock(targets, block)
703 targets, block = self._findTargetsAndBlock(targets, block)
704 return self._blockFromThread(self.smultiengine.clear_properties, targets=targets, block=block)
704 return self._blockFromThread(self.smultiengine.clear_properties, targets=targets, block=block)
705
705
706 #---------------------------------------------------------------------------
706 #---------------------------------------------------------------------------
707 # IMultiEngine related methods
707 # IMultiEngine related methods
708 #---------------------------------------------------------------------------
708 #---------------------------------------------------------------------------
709
709
710 def get_ids(self):
710 def get_ids(self):
711 """
711 """
712 Returns the ids of currently registered engines.
712 Returns the ids of currently registered engines.
713 """
713 """
714 result = self._bcft(self.smultiengine.get_ids)
714 result = self._bcft(self.smultiengine.get_ids)
715 return result
715 return result
716
716
717 #---------------------------------------------------------------------------
717 #---------------------------------------------------------------------------
718 # IMultiEngineCoordinator
718 # IMultiEngineCoordinator
719 #---------------------------------------------------------------------------
719 #---------------------------------------------------------------------------
720
720
721 def scatter(self, key, seq, dist='b', flatten=False, targets=None, block=None):
721 def scatter(self, key, seq, dist='b', flatten=False, targets=None, block=None):
722 """
722 """
723 Partition a Python sequence and send the partitions to a set of engines.
723 Partition a Python sequence and send the partitions to a set of engines.
724 """
724 """
725 targets, block = self._findTargetsAndBlock(targets, block)
725 targets, block = self._findTargetsAndBlock(targets, block)
726 return self._blockFromThread(self.smultiengine.scatter, key, seq,
726 return self._blockFromThread(self.smultiengine.scatter, key, seq,
727 dist, flatten, targets=targets, block=block)
727 dist, flatten, targets=targets, block=block)
728
728
729 def gather(self, key, dist='b', targets=None, block=None):
729 def gather(self, key, dist='b', targets=None, block=None):
730 """
730 """
731 Gather a partitioned sequence on a set of engines as a single local seq.
731 Gather a partitioned sequence on a set of engines as a single local seq.
732 """
732 """
733 targets, block = self._findTargetsAndBlock(targets, block)
733 targets, block = self._findTargetsAndBlock(targets, block)
734 return self._blockFromThread(self.smultiengine.gather, key, dist,
734 return self._blockFromThread(self.smultiengine.gather, key, dist,
735 targets=targets, block=block)
735 targets=targets, block=block)
736
736
737 def raw_map(self, func, seq, dist='b', targets=None, block=None):
737 def raw_map(self, func, seq, dist='b', targets=None, block=None):
738 """
738 """
739 A parallelized version of Python's builtin map.
739 A parallelized version of Python's builtin map.
740
740
741 This has a slightly different syntax than the builtin `map`.
741 This has a slightly different syntax than the builtin `map`.
742 This is needed because we need to have keyword arguments and thus
742 This is needed because we need to have keyword arguments and thus
743 can't use *args to capture all the sequences. Instead, they must
743 can't use *args to capture all the sequences. Instead, they must
744 be passed in a list or tuple.
744 be passed in a list or tuple.
745
745
746 raw_map(func, seqs) -> map(func, seqs[0], seqs[1], ...)
746 raw_map(func, seqs) -> map(func, seqs[0], seqs[1], ...)
747
747
748 Most users will want to use parallel functions or the `mapper`
748 Most users will want to use parallel functions or the `mapper`
749 and `map` methods for an API that follows that of the builtin
749 and `map` methods for an API that follows that of the builtin
750 `map`.
750 `map`.
751 """
751 """
752 targets, block = self._findTargetsAndBlock(targets, block)
752 targets, block = self._findTargetsAndBlock(targets, block)
753 return self._blockFromThread(self.smultiengine.raw_map, func, seq,
753 return self._blockFromThread(self.smultiengine.raw_map, func, seq,
754 dist, targets=targets, block=block)
754 dist, targets=targets, block=block)
755
755
756 def map(self, func, *sequences):
756 def map(self, func, *sequences):
757 """
757 """
758 A parallel version of Python's builtin `map` function.
758 A parallel version of Python's builtin `map` function.
759
759
760 This method applies a function to sequences of arguments. It
760 This method applies a function to sequences of arguments. It
761 follows the same syntax as the builtin `map`.
761 follows the same syntax as the builtin `map`.
762
762
763 This method creates a mapper objects by calling `self.mapper` with
763 This method creates a mapper objects by calling `self.mapper` with
764 no arguments and then uses that mapper to do the mapping. See
764 no arguments and then uses that mapper to do the mapping. See
765 the documentation of `mapper` for more details.
765 the documentation of `mapper` for more details.
766 """
766 """
767 return self.mapper().map(func, *sequences)
767 return self.mapper().map(func, *sequences)
768
768
769 def mapper(self, dist='b', targets='all', block=None):
769 def mapper(self, dist='b', targets='all', block=None):
770 """
770 """
771 Create a mapper object that has a `map` method.
771 Create a mapper object that has a `map` method.
772
772
773 This method returns an object that implements the `IMapper`
773 This method returns an object that implements the `IMapper`
774 interface. This method is a factory that is used to control how
774 interface. This method is a factory that is used to control how
775 the map happens.
775 the map happens.
776
776
777 :Parameters:
777 :Parameters:
778 dist : str
778 dist : str
779 What decomposition to use, 'b' is the only one supported
779 What decomposition to use, 'b' is the only one supported
780 currently
780 currently
781 targets : str, int, sequence of ints
781 targets : str, int, sequence of ints
782 Which engines to use for the map
782 Which engines to use for the map
783 block : boolean
783 block : boolean
784 Should calls to `map` block or not
784 Should calls to `map` block or not
785 """
785 """
786 return MultiEngineMapper(self, dist, targets, block)
786 return MultiEngineMapper(self, dist, targets, block)
787
787
788 def parallel(self, dist='b', targets=None, block=None):
788 def parallel(self, dist='b', targets=None, block=None):
789 """
789 """
790 A decorator that turns a function into a parallel function.
790 A decorator that turns a function into a parallel function.
791
791
792 This can be used as:
792 This can be used as:
793
793
794 @parallel()
794 @parallel()
795 def f(x, y)
795 def f(x, y)
796 ...
796 ...
797
797
798 f(range(10), range(10))
798 f(range(10), range(10))
799
799
800 This causes f(0,0), f(1,1), ... to be called in parallel.
800 This causes f(0,0), f(1,1), ... to be called in parallel.
801
801
802 :Parameters:
802 :Parameters:
803 dist : str
803 dist : str
804 What decomposition to use, 'b' is the only one supported
804 What decomposition to use, 'b' is the only one supported
805 currently
805 currently
806 targets : str, int, sequence of ints
806 targets : str, int, sequence of ints
807 Which engines to use for the map
807 Which engines to use for the map
808 block : boolean
808 block : boolean
809 Should calls to `map` block or not
809 Should calls to `map` block or not
810 """
810 """
811 targets, block = self._findTargetsAndBlock(targets, block)
811 targets, block = self._findTargetsAndBlock(targets, block)
812 mapper = self.mapper(dist, targets, block)
812 mapper = self.mapper(dist, targets, block)
813 pf = ParallelFunction(mapper)
813 pf = ParallelFunction(mapper)
814 return pf
814 return pf
815
815
816 #---------------------------------------------------------------------------
816 #---------------------------------------------------------------------------
817 # IMultiEngineExtras
817 # IMultiEngineExtras
818 #---------------------------------------------------------------------------
818 #---------------------------------------------------------------------------
819
819
820 def zip_pull(self, keys, targets=None, block=None):
820 def zip_pull(self, keys, targets=None, block=None):
821 targets, block = self._findTargetsAndBlock(targets, block)
821 targets, block = self._findTargetsAndBlock(targets, block)
822 return self._blockFromThread(self.smultiengine.zip_pull, keys,
822 return self._blockFromThread(self.smultiengine.zip_pull, keys,
823 targets=targets, block=block)
823 targets=targets, block=block)
824
824
825 def run(self, filename, targets=None, block=None):
825 def run(self, filename, targets=None, block=None):
826 """
826 """
827 Run a Python code in a file on the engines.
827 Run a Python code in a file on the engines.
828
828
829 :Parameters:
829 :Parameters:
830 filename : str
830 filename : str
831 The name of the local file to run
831 The name of the local file to run
832 targets : id or list of ids
832 targets : id or list of ids
833 The engine to use for the execution
833 The engine to use for the execution
834 block : boolean
834 block : boolean
835 If False, this method will return the actual result. If False,
835 If False, this method will return the actual result. If False,
836 a `PendingResult` is returned which can be used to get the result
836 a `PendingResult` is returned which can be used to get the result
837 at a later time.
837 at a later time.
838 """
838 """
839 targets, block = self._findTargetsAndBlock(targets, block)
839 targets, block = self._findTargetsAndBlock(targets, block)
840 return self._blockFromThread(self.smultiengine.run, filename,
840 return self._blockFromThread(self.smultiengine.run, filename,
841 targets=targets, block=block)
841 targets=targets, block=block)
842
842
843 def benchmark(self, push_size=10000):
843 def benchmark(self, push_size=10000):
844 """
844 """
845 Run performance benchmarks for the current IPython cluster.
845 Run performance benchmarks for the current IPython cluster.
846
846
847 This method tests both the latency of sending command and data to the
847 This method tests both the latency of sending command and data to the
848 engines as well as the throughput of sending large objects to the
848 engines as well as the throughput of sending large objects to the
849 engines using push. The latency is measured by having one or more
849 engines using push. The latency is measured by having one or more
850 engines execute the command 'pass'. The throughput is measure by
850 engines execute the command 'pass'. The throughput is measure by
851 sending an NumPy array of size `push_size` to one or more engines.
851 sending an NumPy array of size `push_size` to one or more engines.
852
852
853 These benchmarks will vary widely on different hardware and networks
853 These benchmarks will vary widely on different hardware and networks
854 and thus can be used to get an idea of the performance characteristics
854 and thus can be used to get an idea of the performance characteristics
855 of a particular configuration of an IPython controller and engines.
855 of a particular configuration of an IPython controller and engines.
856
856
857 This function is not testable within our current testing framework.
857 This function is not testable within our current testing framework.
858 """
858 """
859 import timeit, __builtin__
859 import timeit, __builtin__
860 __builtin__._mec_self = self
860 __builtin__._mec_self = self
861 benchmarks = {}
861 benchmarks = {}
862 repeat = 3
862 repeat = 3
863 count = 10
863 count = 10
864
864
865 timer = timeit.Timer('_mec_self.execute("pass",0)')
865 timer = timeit.Timer('_mec_self.execute("pass",0)')
866 result = 1000*min(timer.repeat(repeat,count))/count
866 result = 1000*min(timer.repeat(repeat,count))/count
867 benchmarks['single_engine_latency'] = (result,'msec')
867 benchmarks['single_engine_latency'] = (result,'msec')
868
868
869 timer = timeit.Timer('_mec_self.execute("pass")')
869 timer = timeit.Timer('_mec_self.execute("pass")')
870 result = 1000*min(timer.repeat(repeat,count))/count
870 result = 1000*min(timer.repeat(repeat,count))/count
871 benchmarks['all_engine_latency'] = (result,'msec')
871 benchmarks['all_engine_latency'] = (result,'msec')
872
872
873 try:
873 try:
874 import numpy as np
874 import numpy as np
875 except:
875 except:
876 pass
876 pass
877 else:
877 else:
878 timer = timeit.Timer(
878 timer = timeit.Timer(
879 "_mec_self.push(d)",
879 "_mec_self.push(d)",
880 "import numpy as np; d = dict(a=np.zeros(%r,dtype='float64'))" % push_size
880 "import numpy as np; d = dict(a=np.zeros(%r,dtype='float64'))" % push_size
881 )
881 )
882 result = min(timer.repeat(repeat,count))/count
882 result = min(timer.repeat(repeat,count))/count
883 benchmarks['all_engine_push'] = (1e-6*push_size*8/result, 'MB/sec')
883 benchmarks['all_engine_push'] = (1e-6*push_size*8/result, 'MB/sec')
884
884
885 try:
885 try:
886 import numpy as np
886 import numpy as np
887 except:
887 except:
888 pass
888 pass
889 else:
889 else:
890 timer = timeit.Timer(
890 timer = timeit.Timer(
891 "_mec_self.push(d,0)",
891 "_mec_self.push(d,0)",
892 "import numpy as np; d = dict(a=np.zeros(%r,dtype='float64'))" % push_size
892 "import numpy as np; d = dict(a=np.zeros(%r,dtype='float64'))" % push_size
893 )
893 )
894 result = min(timer.repeat(repeat,count))/count
894 result = min(timer.repeat(repeat,count))/count
895 benchmarks['single_engine_push'] = (1e-6*push_size*8/result, 'MB/sec')
895 benchmarks['single_engine_push'] = (1e-6*push_size*8/result, 'MB/sec')
896
896
897 return benchmarks
897 return benchmarks
898
898
899
899
900 components.registerAdapter(FullBlockingMultiEngineClient,
900 components.registerAdapter(FullBlockingMultiEngineClient,
901 IFullSynchronousMultiEngine, IFullBlockingMultiEngineClient)
901 IFullSynchronousMultiEngine, IFullBlockingMultiEngineClient)
902
902
903
903
904
904
905
905
General Comments 0
You need to be logged in to leave comments. Login now