|
@@
-1,959
+1,963
b''
|
|
1
|
# encoding: utf-8
|
|
1
|
# encoding: utf-8
|
|
2
|
# -*- test-case-name: IPython.kernel.test.test_multiengineclient -*-
|
|
2
|
# -*- test-case-name: IPython.kernel.test.test_multiengineclient -*-
|
|
3
|
|
|
3
|
|
|
4
|
"""General Classes for IMultiEngine clients."""
|
|
4
|
"""General Classes for IMultiEngine clients."""
|
|
5
|
|
|
5
|
|
|
6
|
__docformat__ = "restructuredtext en"
|
|
6
|
__docformat__ = "restructuredtext en"
|
|
7
|
|
|
7
|
|
|
8
|
#-------------------------------------------------------------------------------
|
|
8
|
#-------------------------------------------------------------------------------
|
|
9
|
# Copyright (C) 2008 The IPython Development Team
|
|
9
|
# Copyright (C) 2008 The IPython Development Team
|
|
10
|
#
|
|
10
|
#
|
|
11
|
# Distributed under the terms of the BSD License. The full license is in
|
|
11
|
# Distributed under the terms of the BSD License. The full license is in
|
|
12
|
# the file COPYING, distributed as part of this software.
|
|
12
|
# the file COPYING, distributed as part of this software.
|
|
13
|
#-------------------------------------------------------------------------------
|
|
13
|
#-------------------------------------------------------------------------------
|
|
14
|
|
|
14
|
|
|
15
|
#-------------------------------------------------------------------------------
|
|
15
|
#-------------------------------------------------------------------------------
|
|
16
|
# Imports
|
|
16
|
# Imports
|
|
17
|
#-------------------------------------------------------------------------------
|
|
17
|
#-------------------------------------------------------------------------------
|
|
18
|
|
|
18
|
|
|
19
|
import sys
|
|
19
|
import sys
|
|
20
|
import linecache
|
|
20
|
import linecache
|
|
21
|
import warnings
|
|
21
|
import warnings
|
|
22
|
|
|
22
|
|
|
23
|
from twisted.python import components
|
|
23
|
from twisted.python import components
|
|
24
|
from twisted.python.failure import Failure
|
|
24
|
from twisted.python.failure import Failure
|
|
25
|
from zope.interface import Interface, implements, Attribute
|
|
25
|
from zope.interface import Interface, implements, Attribute
|
|
26
|
|
|
26
|
|
|
27
|
from IPython.utils.coloransi import TermColors
|
|
27
|
from IPython.utils.coloransi import TermColors
|
|
28
|
|
|
28
|
|
|
29
|
from IPython.kernel.twistedutil import blockingCallFromThread
|
|
29
|
from IPython.kernel.twistedutil import blockingCallFromThread
|
|
30
|
from IPython.kernel import error
|
|
30
|
from IPython.kernel import error
|
|
31
|
from IPython.kernel.parallelfunction import ParallelFunction
|
|
31
|
from IPython.kernel.parallelfunction import ParallelFunction
|
|
32
|
from IPython.kernel.mapper import (
|
|
32
|
from IPython.kernel.mapper import (
|
|
33
|
MultiEngineMapper,
|
|
33
|
MultiEngineMapper,
|
|
34
|
IMultiEngineMapperFactory,
|
|
34
|
IMultiEngineMapperFactory,
|
|
35
|
IMapper
|
|
35
|
IMapper
|
|
36
|
)
|
|
36
|
)
|
|
37
|
|
|
37
|
|
|
38
|
from IPython.kernel.multiengine import IFullSynchronousMultiEngine
|
|
38
|
from IPython.kernel.multiengine import IFullSynchronousMultiEngine
|
|
39
|
|
|
39
|
|
|
40
|
|
|
40
|
|
|
41
|
#-------------------------------------------------------------------------------
|
|
41
|
#-------------------------------------------------------------------------------
|
|
42
|
# Pending Result things
|
|
42
|
# Pending Result things
|
|
43
|
#-------------------------------------------------------------------------------
|
|
43
|
#-------------------------------------------------------------------------------
|
|
44
|
|
|
44
|
|
|
45
|
class IPendingResult(Interface):
|
|
45
|
class IPendingResult(Interface):
|
|
46
|
"""A representation of a result that is pending.
|
|
46
|
"""A representation of a result that is pending.
|
|
47
|
|
|
47
|
|
|
48
|
This class is similar to Twisted's `Deferred` object, but is designed to be
|
|
48
|
This class is similar to Twisted's `Deferred` object, but is designed to be
|
|
49
|
used in a synchronous context.
|
|
49
|
used in a synchronous context.
|
|
50
|
"""
|
|
50
|
"""
|
|
51
|
|
|
51
|
|
|
52
|
result_id=Attribute("ID of the deferred on the other side")
|
|
52
|
result_id=Attribute("ID of the deferred on the other side")
|
|
53
|
client=Attribute("A client that I came from")
|
|
53
|
client=Attribute("A client that I came from")
|
|
54
|
r=Attribute("An attribute that is a property that calls and returns get_result")
|
|
54
|
r=Attribute("An attribute that is a property that calls and returns get_result")
|
|
55
|
|
|
55
|
|
|
56
|
def get_result(default=None, block=True):
|
|
56
|
def get_result(default=None, block=True):
|
|
57
|
"""
|
|
57
|
"""
|
|
58
|
Get a result that is pending.
|
|
58
|
Get a result that is pending.
|
|
59
|
|
|
59
|
|
|
60
|
:Parameters:
|
|
60
|
:Parameters:
|
|
61
|
default
|
|
61
|
default
|
|
62
|
The value to return if the result is not ready.
|
|
62
|
The value to return if the result is not ready.
|
|
63
|
block : boolean
|
|
63
|
block : boolean
|
|
64
|
Should I block for the result.
|
|
64
|
Should I block for the result.
|
|
65
|
|
|
65
|
|
|
66
|
:Returns: The actual result or the default value.
|
|
66
|
:Returns: The actual result or the default value.
|
|
67
|
"""
|
|
67
|
"""
|
|
68
|
|
|
68
|
|
|
69
|
def add_callback(f, *args, **kwargs):
|
|
69
|
def add_callback(f, *args, **kwargs):
|
|
70
|
"""
|
|
70
|
"""
|
|
71
|
Add a callback that is called with the result.
|
|
71
|
Add a callback that is called with the result.
|
|
72
|
|
|
72
|
|
|
73
|
If the original result is foo, adding a callback will cause
|
|
73
|
If the original result is foo, adding a callback will cause
|
|
74
|
f(foo, *args, **kwargs) to be returned instead. If multiple
|
|
74
|
f(foo, *args, **kwargs) to be returned instead. If multiple
|
|
75
|
callbacks are registered, they are chained together: the result of
|
|
75
|
callbacks are registered, they are chained together: the result of
|
|
76
|
one is passed to the next and so on.
|
|
76
|
one is passed to the next and so on.
|
|
77
|
|
|
77
|
|
|
78
|
Unlike Twisted's Deferred object, there is no errback chain. Thus
|
|
78
|
Unlike Twisted's Deferred object, there is no errback chain. Thus
|
|
79
|
any exception raised will not be caught and handled. User must
|
|
79
|
any exception raised will not be caught and handled. User must
|
|
80
|
catch these by hand when calling `get_result`.
|
|
80
|
catch these by hand when calling `get_result`.
|
|
81
|
"""
|
|
81
|
"""
|
|
82
|
|
|
82
|
|
|
83
|
|
|
83
|
|
|
84
|
class PendingResult(object):
|
|
84
|
class PendingResult(object):
|
|
85
|
"""A representation of a result that is not yet ready.
|
|
85
|
"""A representation of a result that is not yet ready.
|
|
86
|
|
|
86
|
|
|
87
|
A user should not create a `PendingResult` instance by hand.
|
|
87
|
A user should not create a `PendingResult` instance by hand.
|
|
88
|
|
|
88
|
|
|
89
|
Methods:
|
|
89
|
Methods:
|
|
90
|
|
|
90
|
|
|
91
|
* `get_result`
|
|
91
|
* `get_result`
|
|
92
|
* `add_callback`
|
|
92
|
* `add_callback`
|
|
93
|
|
|
93
|
|
|
94
|
Properties:
|
|
94
|
Properties:
|
|
95
|
|
|
95
|
|
|
96
|
* `r`
|
|
96
|
* `r`
|
|
97
|
"""
|
|
97
|
"""
|
|
98
|
|
|
98
|
|
|
99
|
def __init__(self, client, result_id):
|
|
99
|
def __init__(self, client, result_id):
|
|
100
|
"""Create a PendingResult with a result_id and a client instance.
|
|
100
|
"""Create a PendingResult with a result_id and a client instance.
|
|
101
|
|
|
101
|
|
|
102
|
The client should implement `_getPendingResult(result_id, block)`.
|
|
102
|
The client should implement `_getPendingResult(result_id, block)`.
|
|
103
|
"""
|
|
103
|
"""
|
|
104
|
self.client = client
|
|
104
|
self.client = client
|
|
105
|
self.result_id = result_id
|
|
105
|
self.result_id = result_id
|
|
106
|
self.called = False
|
|
106
|
self.called = False
|
|
107
|
self.raised = False
|
|
107
|
self.raised = False
|
|
108
|
self.callbacks = []
|
|
108
|
self.callbacks = []
|
|
109
|
|
|
109
|
|
|
110
|
def get_result(self, default=None, block=True):
|
|
110
|
def get_result(self, default=None, block=True):
|
|
111
|
"""Get a result that is pending.
|
|
111
|
"""Get a result that is pending.
|
|
112
|
|
|
112
|
|
|
113
|
This method will connect to an IMultiEngine adapted controller
|
|
113
|
This method will connect to an IMultiEngine adapted controller
|
|
114
|
and see if the result is ready. If the action triggers an exception
|
|
114
|
and see if the result is ready. If the action triggers an exception
|
|
115
|
raise it and record it. This method records the result/exception once it is
|
|
115
|
raise it and record it. This method records the result/exception once it is
|
|
116
|
retrieved. Calling `get_result` again will get this cached result or will
|
|
116
|
retrieved. Calling `get_result` again will get this cached result or will
|
|
117
|
re-raise the exception. The .r attribute is a property that calls
|
|
117
|
re-raise the exception. The .r attribute is a property that calls
|
|
118
|
`get_result` with block=True.
|
|
118
|
`get_result` with block=True.
|
|
119
|
|
|
119
|
|
|
120
|
:Parameters:
|
|
120
|
:Parameters:
|
|
121
|
default
|
|
121
|
default
|
|
122
|
The value to return if the result is not ready.
|
|
122
|
The value to return if the result is not ready.
|
|
123
|
block : boolean
|
|
123
|
block : boolean
|
|
124
|
Should I block for the result.
|
|
124
|
Should I block for the result.
|
|
125
|
|
|
125
|
|
|
126
|
:Returns: The actual result or the default value.
|
|
126
|
:Returns: The actual result or the default value.
|
|
127
|
"""
|
|
127
|
"""
|
|
128
|
|
|
128
|
|
|
129
|
if self.called:
|
|
129
|
if self.called:
|
|
130
|
if self.raised:
|
|
130
|
if self.raised:
|
|
131
|
raise self.result[0], self.result[1], self.result[2]
|
|
131
|
raise self.result[0], self.result[1], self.result[2]
|
|
132
|
else:
|
|
132
|
else:
|
|
133
|
return self.result
|
|
133
|
return self.result
|
|
134
|
try:
|
|
134
|
try:
|
|
135
|
result = self.client.get_pending_deferred(self.result_id, block)
|
|
135
|
result = self.client.get_pending_deferred(self.result_id, block)
|
|
136
|
except error.ResultNotCompleted:
|
|
136
|
except error.ResultNotCompleted:
|
|
137
|
return default
|
|
137
|
return default
|
|
138
|
except:
|
|
138
|
except:
|
|
139
|
# Reraise other error, but first record them so they can be reraised
|
|
139
|
# Reraise other error, but first record them so they can be reraised
|
|
140
|
# later if .r or get_result is called again.
|
|
140
|
# later if .r or get_result is called again.
|
|
141
|
self.result = sys.exc_info()
|
|
141
|
self.result = sys.exc_info()
|
|
142
|
self.called = True
|
|
142
|
self.called = True
|
|
143
|
self.raised = True
|
|
143
|
self.raised = True
|
|
144
|
raise
|
|
144
|
raise
|
|
145
|
else:
|
|
145
|
else:
|
|
146
|
for cb in self.callbacks:
|
|
146
|
for cb in self.callbacks:
|
|
147
|
result = cb[0](result, *cb[1], **cb[2])
|
|
147
|
result = cb[0](result, *cb[1], **cb[2])
|
|
148
|
self.result = result
|
|
148
|
self.result = result
|
|
149
|
self.called = True
|
|
149
|
self.called = True
|
|
150
|
return result
|
|
150
|
return result
|
|
151
|
|
|
151
|
|
|
152
|
def add_callback(self, f, *args, **kwargs):
|
|
152
|
def add_callback(self, f, *args, **kwargs):
|
|
153
|
"""Add a callback that is called with the result.
|
|
153
|
"""Add a callback that is called with the result.
|
|
154
|
|
|
154
|
|
|
155
|
If the original result is result, adding a callback will cause
|
|
155
|
If the original result is result, adding a callback will cause
|
|
156
|
f(result, *args, **kwargs) to be returned instead. If multiple
|
|
156
|
f(result, *args, **kwargs) to be returned instead. If multiple
|
|
157
|
callbacks are registered, they are chained together: the result of
|
|
157
|
callbacks are registered, they are chained together: the result of
|
|
158
|
one is passed to the next and so on.
|
|
158
|
one is passed to the next and so on.
|
|
159
|
|
|
159
|
|
|
160
|
Unlike Twisted's Deferred object, there is no errback chain. Thus
|
|
160
|
Unlike Twisted's Deferred object, there is no errback chain. Thus
|
|
161
|
any exception raised will not be caught and handled. User must
|
|
161
|
any exception raised will not be caught and handled. User must
|
|
162
|
catch these by hand when calling `get_result`.
|
|
162
|
catch these by hand when calling `get_result`.
|
|
163
|
"""
|
|
163
|
"""
|
|
164
|
assert callable(f)
|
|
164
|
assert callable(f)
|
|
165
|
self.callbacks.append((f, args, kwargs))
|
|
165
|
self.callbacks.append((f, args, kwargs))
|
|
166
|
|
|
166
|
|
|
167
|
def __cmp__(self, other):
|
|
167
|
def __cmp__(self, other):
|
|
168
|
if self.result_id < other.result_id:
|
|
168
|
if self.result_id < other.result_id:
|
|
169
|
return -1
|
|
169
|
return -1
|
|
170
|
else:
|
|
170
|
else:
|
|
171
|
return 1
|
|
171
|
return 1
|
|
172
|
|
|
172
|
|
|
173
|
def _get_r(self):
|
|
173
|
def _get_r(self):
|
|
174
|
return self.get_result(block=True)
|
|
174
|
return self.get_result(block=True)
|
|
175
|
|
|
175
|
|
|
176
|
r = property(_get_r)
|
|
176
|
r = property(_get_r)
|
|
177
|
"""This property is a shortcut to a `get_result(block=True)`."""
|
|
177
|
"""This property is a shortcut to a `get_result(block=True)`."""
|
|
178
|
|
|
178
|
|
|
179
|
|
|
179
|
|
|
180
|
#-------------------------------------------------------------------------------
|
|
180
|
#-------------------------------------------------------------------------------
|
|
181
|
# Pretty printing wrappers for certain lists
|
|
181
|
# Pretty printing wrappers for certain lists
|
|
182
|
#-------------------------------------------------------------------------------
|
|
182
|
#-------------------------------------------------------------------------------
|
|
183
|
|
|
183
|
|
|
184
|
class ResultList(list):
|
|
184
|
class ResultList(list):
|
|
185
|
"""A subclass of list that pretty prints the output of `execute`/`get_result`."""
|
|
185
|
"""A subclass of list that pretty prints the output of `execute`/`get_result`."""
|
|
186
|
|
|
186
|
|
|
187
|
def __repr__(self):
|
|
187
|
def __repr__(self):
|
|
188
|
output = []
|
|
188
|
output = []
|
|
189
|
# These colored prompts were not working on Windows
|
|
189
|
# These colored prompts were not working on Windows
|
|
190
|
if sys.platform == 'win32':
|
|
190
|
if sys.platform == 'win32':
|
|
191
|
blue = normal = red = green = ''
|
|
191
|
blue = normal = red = green = ''
|
|
192
|
else:
|
|
192
|
else:
|
|
193
|
blue = TermColors.Blue
|
|
193
|
blue = TermColors.Blue
|
|
194
|
normal = TermColors.Normal
|
|
194
|
normal = TermColors.Normal
|
|
195
|
red = TermColors.Red
|
|
195
|
red = TermColors.Red
|
|
196
|
green = TermColors.Green
|
|
196
|
green = TermColors.Green
|
|
197
|
output.append("<Results List>\n")
|
|
197
|
output.append("<Results List>\n")
|
|
198
|
for cmd in self:
|
|
198
|
for cmd in self:
|
|
199
|
if isinstance(cmd, Failure):
|
|
199
|
if isinstance(cmd, Failure):
|
|
200
|
output.append(cmd)
|
|
200
|
output.append(cmd)
|
|
201
|
else:
|
|
201
|
else:
|
|
202
|
target = cmd.get('id',None)
|
|
202
|
target = cmd.get('id',None)
|
|
203
|
cmd_num = cmd.get('number',None)
|
|
203
|
cmd_num = cmd.get('number',None)
|
|
204
|
cmd_stdin = cmd.get('input',{}).get('translated','No Input')
|
|
204
|
cmd_stdin = cmd.get('input',{}).get('translated','No Input')
|
|
205
|
cmd_stdout = cmd.get('stdout', None)
|
|
205
|
cmd_stdout = cmd.get('stdout', None)
|
|
206
|
cmd_stderr = cmd.get('stderr', None)
|
|
206
|
cmd_stderr = cmd.get('stderr', None)
|
|
207
|
output.append("%s[%i]%s In [%i]:%s %s\n" % \
|
|
207
|
output.append("%s[%i]%s In [%i]:%s %s\n" % \
|
|
208
|
(green, target,
|
|
208
|
(green, target,
|
|
209
|
blue, cmd_num, normal, cmd_stdin))
|
|
209
|
blue, cmd_num, normal, cmd_stdin))
|
|
210
|
if cmd_stdout:
|
|
210
|
if cmd_stdout:
|
|
211
|
output.append("%s[%i]%s Out[%i]:%s %s\n" % \
|
|
211
|
output.append("%s[%i]%s Out[%i]:%s %s\n" % \
|
|
212
|
(green, target,
|
|
212
|
(green, target,
|
|
213
|
red, cmd_num, normal, cmd_stdout))
|
|
213
|
red, cmd_num, normal, cmd_stdout))
|
|
214
|
if cmd_stderr:
|
|
214
|
if cmd_stderr:
|
|
215
|
output.append("%s[%i]%s Err[%i]:\n%s %s" % \
|
|
215
|
output.append("%s[%i]%s Err[%i]:\n%s %s" % \
|
|
216
|
(green, target,
|
|
216
|
(green, target,
|
|
217
|
red, cmd_num, normal, cmd_stderr))
|
|
217
|
red, cmd_num, normal, cmd_stderr))
|
|
218
|
return ''.join(output)
|
|
218
|
return ''.join(output)
|
|
219
|
|
|
219
|
|
|
220
|
|
|
220
|
|
|
221
|
def wrapResultList(result):
|
|
221
|
def wrapResultList(result):
|
|
222
|
"""A function that wraps the output of `execute`/`get_result` -> `ResultList`."""
|
|
222
|
"""A function that wraps the output of `execute`/`get_result` -> `ResultList`."""
|
|
223
|
if len(result) == 0:
|
|
223
|
if len(result) == 0:
|
|
224
|
result = [result]
|
|
224
|
result = [result]
|
|
225
|
return ResultList(result)
|
|
225
|
return ResultList(result)
|
|
226
|
|
|
226
|
|
|
227
|
|
|
227
|
|
|
228
|
class QueueStatusList(list):
|
|
228
|
class QueueStatusList(list):
|
|
229
|
"""A subclass of list that pretty prints the output of `queue_status`."""
|
|
229
|
"""A subclass of list that pretty prints the output of `queue_status`."""
|
|
230
|
|
|
230
|
|
|
231
|
def __repr__(self):
|
|
231
|
def __repr__(self):
|
|
232
|
output = []
|
|
232
|
output = []
|
|
233
|
output.append("<Queue Status List>\n")
|
|
233
|
output.append("<Queue Status List>\n")
|
|
234
|
for e in self:
|
|
234
|
for e in self:
|
|
235
|
output.append("Engine: %s\n" % repr(e[0]))
|
|
235
|
output.append("Engine: %s\n" % repr(e[0]))
|
|
236
|
output.append(" Pending: %s\n" % repr(e[1]['pending']))
|
|
236
|
output.append(" Pending: %s\n" % repr(e[1]['pending']))
|
|
237
|
for q in e[1]['queue']:
|
|
237
|
for q in e[1]['queue']:
|
|
238
|
output.append(" Command: %s\n" % repr(q))
|
|
238
|
output.append(" Command: %s\n" % repr(q))
|
|
239
|
return ''.join(output)
|
|
239
|
return ''.join(output)
|
|
240
|
|
|
240
|
|
|
241
|
|
|
241
|
|
|
242
|
#-------------------------------------------------------------------------------
|
|
242
|
#-------------------------------------------------------------------------------
|
|
243
|
# InteractiveMultiEngineClient
|
|
243
|
# InteractiveMultiEngineClient
|
|
244
|
#-------------------------------------------------------------------------------
|
|
244
|
#-------------------------------------------------------------------------------
|
|
245
|
|
|
245
|
|
|
246
|
class InteractiveMultiEngineClient(object):
|
|
246
|
class InteractiveMultiEngineClient(object):
|
|
247
|
"""A mixin class that add a few methods to a multiengine client.
|
|
247
|
"""A mixin class that add a few methods to a multiengine client.
|
|
248
|
|
|
248
|
|
|
249
|
The methods in this mixin class are designed for interactive usage.
|
|
249
|
The methods in this mixin class are designed for interactive usage.
|
|
250
|
"""
|
|
250
|
"""
|
|
251
|
|
|
251
|
|
|
252
|
def activate(self):
|
|
252
|
def activate(self):
|
|
253
|
"""Make this `MultiEngineClient` active for parallel magic commands.
|
|
253
|
"""Make this `MultiEngineClient` active for parallel magic commands.
|
|
254
|
|
|
254
|
|
|
255
|
IPython has a magic command syntax to work with `MultiEngineClient` objects.
|
|
255
|
IPython has a magic command syntax to work with `MultiEngineClient` objects.
|
|
256
|
In a given IPython session there is a single active one. While
|
|
256
|
In a given IPython session there is a single active one. While
|
|
257
|
there can be many `MultiEngineClient` created and used by the user,
|
|
257
|
there can be many `MultiEngineClient` created and used by the user,
|
|
258
|
there is only one active one. The active `MultiEngineClient` is used whenever
|
|
258
|
there is only one active one. The active `MultiEngineClient` is used whenever
|
|
259
|
the magic commands %px and %autopx are used.
|
|
259
|
the magic commands %px and %autopx are used.
|
|
260
|
|
|
260
|
|
|
261
|
The activate() method is called on a given `MultiEngineClient` to make it
|
|
261
|
The activate() method is called on a given `MultiEngineClient` to make it
|
|
262
|
active. Once this has been done, the magic commands can be used.
|
|
262
|
active. Once this has been done, the magic commands can be used.
|
|
263
|
"""
|
|
263
|
"""
|
|
264
|
|
|
264
|
|
|
265
|
try:
|
|
265
|
try:
|
|
266
|
__IPYTHON__.activeController = self
|
|
266
|
# This is injected into __builtins__.
|
|
|
|
|
267
|
ip = get_ipython()
|
|
267
|
except NameError:
|
|
268
|
except NameError:
|
|
268
|
print "The IPython Controller magics only work within IPython."
|
|
269
|
print "The IPython parallel magics (%result, %px, %autopx) only work within IPython."
|
|
|
|
|
270
|
else:
|
|
|
|
|
271
|
pmagic = ip.get_component('parallel_magic')
|
|
|
|
|
272
|
pmagic.active_multiengine_client = self
|
|
269
|
|
|
273
|
|
|
270
|
def __setitem__(self, key, value):
|
|
274
|
def __setitem__(self, key, value):
|
|
271
|
"""Add a dictionary interface for pushing/pulling.
|
|
275
|
"""Add a dictionary interface for pushing/pulling.
|
|
272
|
|
|
276
|
|
|
273
|
This functions as a shorthand for `push`.
|
|
277
|
This functions as a shorthand for `push`.
|
|
274
|
|
|
278
|
|
|
275
|
:Parameters:
|
|
279
|
:Parameters:
|
|
276
|
key : str
|
|
280
|
key : str
|
|
277
|
What to call the remote object.
|
|
281
|
What to call the remote object.
|
|
278
|
value : object
|
|
282
|
value : object
|
|
279
|
The local Python object to push.
|
|
283
|
The local Python object to push.
|
|
280
|
"""
|
|
284
|
"""
|
|
281
|
targets, block = self._findTargetsAndBlock()
|
|
285
|
targets, block = self._findTargetsAndBlock()
|
|
282
|
return self.push({key:value}, targets=targets, block=block)
|
|
286
|
return self.push({key:value}, targets=targets, block=block)
|
|
283
|
|
|
287
|
|
|
284
|
def __getitem__(self, key):
|
|
288
|
def __getitem__(self, key):
|
|
285
|
"""Add a dictionary interface for pushing/pulling.
|
|
289
|
"""Add a dictionary interface for pushing/pulling.
|
|
286
|
|
|
290
|
|
|
287
|
This functions as a shorthand to `pull`.
|
|
291
|
This functions as a shorthand to `pull`.
|
|
288
|
|
|
292
|
|
|
289
|
:Parameters:
|
|
293
|
:Parameters:
|
|
290
|
- `key`: A string representing the key.
|
|
294
|
- `key`: A string representing the key.
|
|
291
|
"""
|
|
295
|
"""
|
|
292
|
if isinstance(key, str):
|
|
296
|
if isinstance(key, str):
|
|
293
|
targets, block = self._findTargetsAndBlock()
|
|
297
|
targets, block = self._findTargetsAndBlock()
|
|
294
|
return self.pull(key, targets=targets, block=block)
|
|
298
|
return self.pull(key, targets=targets, block=block)
|
|
295
|
else:
|
|
299
|
else:
|
|
296
|
raise TypeError("__getitem__ only takes strs")
|
|
300
|
raise TypeError("__getitem__ only takes strs")
|
|
297
|
|
|
301
|
|
|
298
|
def __len__(self):
|
|
302
|
def __len__(self):
|
|
299
|
"""Return the number of available engines."""
|
|
303
|
"""Return the number of available engines."""
|
|
300
|
return len(self.get_ids())
|
|
304
|
return len(self.get_ids())
|
|
301
|
|
|
305
|
|
|
302
|
#---------------------------------------------------------------------------
|
|
306
|
#---------------------------------------------------------------------------
|
|
303
|
# Make this a context manager for with
|
|
307
|
# Make this a context manager for with
|
|
304
|
#---------------------------------------------------------------------------
|
|
308
|
#---------------------------------------------------------------------------
|
|
305
|
|
|
309
|
|
|
306
|
def findsource_file(self,f):
|
|
310
|
def findsource_file(self,f):
|
|
307
|
linecache.checkcache()
|
|
311
|
linecache.checkcache()
|
|
308
|
s = findsource(f.f_code) # findsource is not defined!
|
|
312
|
s = findsource(f.f_code) # findsource is not defined!
|
|
309
|
lnum = f.f_lineno
|
|
313
|
lnum = f.f_lineno
|
|
310
|
wsource = s[0][f.f_lineno:]
|
|
314
|
wsource = s[0][f.f_lineno:]
|
|
311
|
return strip_whitespace(wsource)
|
|
315
|
return strip_whitespace(wsource)
|
|
312
|
|
|
316
|
|
|
313
|
def findsource_ipython(self,f):
|
|
317
|
def findsource_ipython(self,f):
|
|
314
|
from IPython.core import ipapi
|
|
318
|
from IPython.core import ipapi
|
|
315
|
self.ip = ipapi.get()
|
|
319
|
self.ip = ipapi.get()
|
|
316
|
wsource = [l+'\n' for l in
|
|
320
|
wsource = [l+'\n' for l in
|
|
317
|
self.ip.input_hist_raw[-1].splitlines()[1:]]
|
|
321
|
self.ip.input_hist_raw[-1].splitlines()[1:]]
|
|
318
|
return strip_whitespace(wsource)
|
|
322
|
return strip_whitespace(wsource)
|
|
319
|
|
|
323
|
|
|
320
|
def __enter__(self):
|
|
324
|
def __enter__(self):
|
|
321
|
f = sys._getframe(1)
|
|
325
|
f = sys._getframe(1)
|
|
322
|
local_ns = f.f_locals
|
|
326
|
local_ns = f.f_locals
|
|
323
|
global_ns = f.f_globals
|
|
327
|
global_ns = f.f_globals
|
|
324
|
if f.f_code.co_filename == '<ipython console>':
|
|
328
|
if f.f_code.co_filename == '<ipython console>':
|
|
325
|
s = self.findsource_ipython(f)
|
|
329
|
s = self.findsource_ipython(f)
|
|
326
|
else:
|
|
330
|
else:
|
|
327
|
s = self.findsource_file(f)
|
|
331
|
s = self.findsource_file(f)
|
|
328
|
|
|
332
|
|
|
329
|
self._with_context_result = self.execute(s)
|
|
333
|
self._with_context_result = self.execute(s)
|
|
330
|
|
|
334
|
|
|
331
|
def __exit__ (self, etype, value, tb):
|
|
335
|
def __exit__ (self, etype, value, tb):
|
|
332
|
if issubclass(etype,error.StopLocalExecution):
|
|
336
|
if issubclass(etype,error.StopLocalExecution):
|
|
333
|
return True
|
|
337
|
return True
|
|
334
|
|
|
338
|
|
|
335
|
|
|
339
|
|
|
336
|
def remote():
|
|
340
|
def remote():
|
|
337
|
m = 'Special exception to stop local execution of parallel code.'
|
|
341
|
m = 'Special exception to stop local execution of parallel code.'
|
|
338
|
raise error.StopLocalExecution(m)
|
|
342
|
raise error.StopLocalExecution(m)
|
|
339
|
|
|
343
|
|
|
340
|
def strip_whitespace(source):
|
|
344
|
def strip_whitespace(source):
|
|
341
|
# Expand tabs to avoid any confusion.
|
|
345
|
# Expand tabs to avoid any confusion.
|
|
342
|
wsource = [l.expandtabs(4) for l in source]
|
|
346
|
wsource = [l.expandtabs(4) for l in source]
|
|
343
|
# Detect the indentation level
|
|
347
|
# Detect the indentation level
|
|
344
|
done = False
|
|
348
|
done = False
|
|
345
|
for line in wsource:
|
|
349
|
for line in wsource:
|
|
346
|
if line.isspace():
|
|
350
|
if line.isspace():
|
|
347
|
continue
|
|
351
|
continue
|
|
348
|
for col,char in enumerate(line):
|
|
352
|
for col,char in enumerate(line):
|
|
349
|
if char != ' ':
|
|
353
|
if char != ' ':
|
|
350
|
done = True
|
|
354
|
done = True
|
|
351
|
break
|
|
355
|
break
|
|
352
|
if done:
|
|
356
|
if done:
|
|
353
|
break
|
|
357
|
break
|
|
354
|
# Now we know how much leading space there is in the code. Next, we
|
|
358
|
# Now we know how much leading space there is in the code. Next, we
|
|
355
|
# extract up to the first line that has less indentation.
|
|
359
|
# extract up to the first line that has less indentation.
|
|
356
|
# WARNINGS: we skip comments that may be misindented, but we do NOT yet
|
|
360
|
# WARNINGS: we skip comments that may be misindented, but we do NOT yet
|
|
357
|
# detect triple quoted strings that may have flush left text.
|
|
361
|
# detect triple quoted strings that may have flush left text.
|
|
358
|
for lno,line in enumerate(wsource):
|
|
362
|
for lno,line in enumerate(wsource):
|
|
359
|
lead = line[:col]
|
|
363
|
lead = line[:col]
|
|
360
|
if lead.isspace():
|
|
364
|
if lead.isspace():
|
|
361
|
continue
|
|
365
|
continue
|
|
362
|
else:
|
|
366
|
else:
|
|
363
|
if not lead.lstrip().startswith('#'):
|
|
367
|
if not lead.lstrip().startswith('#'):
|
|
364
|
break
|
|
368
|
break
|
|
365
|
# The real 'with' source is up to lno
|
|
369
|
# The real 'with' source is up to lno
|
|
366
|
src_lines = [l[col:] for l in wsource[:lno+1]]
|
|
370
|
src_lines = [l[col:] for l in wsource[:lno+1]]
|
|
367
|
|
|
371
|
|
|
368
|
# Finally, check that the source's first non-comment line begins with the
|
|
372
|
# Finally, check that the source's first non-comment line begins with the
|
|
369
|
# special call 'remote()'
|
|
373
|
# special call 'remote()'
|
|
370
|
for nline,line in enumerate(src_lines):
|
|
374
|
for nline,line in enumerate(src_lines):
|
|
371
|
if line.isspace() or line.startswith('#'):
|
|
375
|
if line.isspace() or line.startswith('#'):
|
|
372
|
continue
|
|
376
|
continue
|
|
373
|
if 'remote()' in line:
|
|
377
|
if 'remote()' in line:
|
|
374
|
break
|
|
378
|
break
|
|
375
|
else:
|
|
379
|
else:
|
|
376
|
raise ValueError('remote() call missing at the start of code')
|
|
380
|
raise ValueError('remote() call missing at the start of code')
|
|
377
|
src = ''.join(src_lines[nline+1:])
|
|
381
|
src = ''.join(src_lines[nline+1:])
|
|
378
|
#print 'SRC:\n<<<<<<<>>>>>>>\n%s<<<<<>>>>>>' % src # dbg
|
|
382
|
#print 'SRC:\n<<<<<<<>>>>>>>\n%s<<<<<>>>>>>' % src # dbg
|
|
379
|
return src
|
|
383
|
return src
|
|
380
|
|
|
384
|
|
|
381
|
|
|
385
|
|
|
382
|
#-------------------------------------------------------------------------------
|
|
386
|
#-------------------------------------------------------------------------------
|
|
383
|
# The top-level MultiEngine client adaptor
|
|
387
|
# The top-level MultiEngine client adaptor
|
|
384
|
#-------------------------------------------------------------------------------
|
|
388
|
#-------------------------------------------------------------------------------
|
|
385
|
|
|
389
|
|
|
386
|
|
|
390
|
|
|
387
|
_prop_warn = """\
|
|
391
|
_prop_warn = """\
|
|
388
|
|
|
392
|
|
|
389
|
We are currently refactoring the task dependency system. This might
|
|
393
|
We are currently refactoring the task dependency system. This might
|
|
390
|
involve the removal of this method and other methods related to engine
|
|
394
|
involve the removal of this method and other methods related to engine
|
|
391
|
properties. Please see the docstrings for IPython.kernel.TaskRejectError
|
|
395
|
properties. Please see the docstrings for IPython.kernel.TaskRejectError
|
|
392
|
for more information."""
|
|
396
|
for more information."""
|
|
393
|
|
|
397
|
|
|
394
|
|
|
398
|
|
|
395
|
class IFullBlockingMultiEngineClient(Interface):
|
|
399
|
class IFullBlockingMultiEngineClient(Interface):
|
|
396
|
pass
|
|
400
|
pass
|
|
397
|
|
|
401
|
|
|
398
|
|
|
402
|
|
|
399
|
class FullBlockingMultiEngineClient(InteractiveMultiEngineClient):
|
|
403
|
class FullBlockingMultiEngineClient(InteractiveMultiEngineClient):
|
|
400
|
"""
|
|
404
|
"""
|
|
401
|
A blocking client to the `IMultiEngine` controller interface.
|
|
405
|
A blocking client to the `IMultiEngine` controller interface.
|
|
402
|
|
|
406
|
|
|
403
|
This class allows users to use a set of engines for a parallel
|
|
407
|
This class allows users to use a set of engines for a parallel
|
|
404
|
computation through the `IMultiEngine` interface. In this interface,
|
|
408
|
computation through the `IMultiEngine` interface. In this interface,
|
|
405
|
each engine has a specific id (an int) that is used to refer to the
|
|
409
|
each engine has a specific id (an int) that is used to refer to the
|
|
406
|
engine, run code on it, etc.
|
|
410
|
engine, run code on it, etc.
|
|
407
|
"""
|
|
411
|
"""
|
|
408
|
|
|
412
|
|
|
409
|
implements(
|
|
413
|
implements(
|
|
410
|
IFullBlockingMultiEngineClient,
|
|
414
|
IFullBlockingMultiEngineClient,
|
|
411
|
IMultiEngineMapperFactory,
|
|
415
|
IMultiEngineMapperFactory,
|
|
412
|
IMapper
|
|
416
|
IMapper
|
|
413
|
)
|
|
417
|
)
|
|
414
|
|
|
418
|
|
|
415
|
def __init__(self, smultiengine):
|
|
419
|
def __init__(self, smultiengine):
|
|
416
|
self.smultiengine = smultiengine
|
|
420
|
self.smultiengine = smultiengine
|
|
417
|
self.block = True
|
|
421
|
self.block = True
|
|
418
|
self.targets = 'all'
|
|
422
|
self.targets = 'all'
|
|
419
|
|
|
423
|
|
|
420
|
def _findBlock(self, block=None):
|
|
424
|
def _findBlock(self, block=None):
|
|
421
|
if block is None:
|
|
425
|
if block is None:
|
|
422
|
return self.block
|
|
426
|
return self.block
|
|
423
|
else:
|
|
427
|
else:
|
|
424
|
if block in (True, False):
|
|
428
|
if block in (True, False):
|
|
425
|
return block
|
|
429
|
return block
|
|
426
|
else:
|
|
430
|
else:
|
|
427
|
raise ValueError("block must be True or False")
|
|
431
|
raise ValueError("block must be True or False")
|
|
428
|
|
|
432
|
|
|
429
|
def _findTargets(self, targets=None):
|
|
433
|
def _findTargets(self, targets=None):
|
|
430
|
if targets is None:
|
|
434
|
if targets is None:
|
|
431
|
return self.targets
|
|
435
|
return self.targets
|
|
432
|
else:
|
|
436
|
else:
|
|
433
|
if not isinstance(targets, (str,list,tuple,int)):
|
|
437
|
if not isinstance(targets, (str,list,tuple,int)):
|
|
434
|
raise ValueError("targets must be a str, list, tuple or int")
|
|
438
|
raise ValueError("targets must be a str, list, tuple or int")
|
|
435
|
return targets
|
|
439
|
return targets
|
|
436
|
|
|
440
|
|
|
437
|
def _findTargetsAndBlock(self, targets=None, block=None):
|
|
441
|
def _findTargetsAndBlock(self, targets=None, block=None):
|
|
438
|
return self._findTargets(targets), self._findBlock(block)
|
|
442
|
return self._findTargets(targets), self._findBlock(block)
|
|
439
|
|
|
443
|
|
|
440
|
def _blockFromThread(self, function, *args, **kwargs):
|
|
444
|
def _blockFromThread(self, function, *args, **kwargs):
|
|
441
|
block = kwargs.get('block', None)
|
|
445
|
block = kwargs.get('block', None)
|
|
442
|
if block is None:
|
|
446
|
if block is None:
|
|
443
|
raise error.MissingBlockArgument("'block' keyword argument is missing")
|
|
447
|
raise error.MissingBlockArgument("'block' keyword argument is missing")
|
|
444
|
result = blockingCallFromThread(function, *args, **kwargs)
|
|
448
|
result = blockingCallFromThread(function, *args, **kwargs)
|
|
445
|
if not block:
|
|
449
|
if not block:
|
|
446
|
result = PendingResult(self, result)
|
|
450
|
result = PendingResult(self, result)
|
|
447
|
return result
|
|
451
|
return result
|
|
448
|
|
|
452
|
|
|
449
|
def get_pending_deferred(self, deferredID, block):
|
|
453
|
def get_pending_deferred(self, deferredID, block):
|
|
450
|
return blockingCallFromThread(self.smultiengine.get_pending_deferred, deferredID, block)
|
|
454
|
return blockingCallFromThread(self.smultiengine.get_pending_deferred, deferredID, block)
|
|
451
|
|
|
455
|
|
|
452
|
def barrier(self, pendingResults):
|
|
456
|
def barrier(self, pendingResults):
|
|
453
|
"""Synchronize a set of `PendingResults`.
|
|
457
|
"""Synchronize a set of `PendingResults`.
|
|
454
|
|
|
458
|
|
|
455
|
This method is a synchronization primitive that waits for a set of
|
|
459
|
This method is a synchronization primitive that waits for a set of
|
|
456
|
`PendingResult` objects to complete. More specifically, barier does
|
|
460
|
`PendingResult` objects to complete. More specifically, barier does
|
|
457
|
the following.
|
|
461
|
the following.
|
|
458
|
|
|
462
|
|
|
459
|
* The `PendingResult`s are sorted by result_id.
|
|
463
|
* The `PendingResult`s are sorted by result_id.
|
|
460
|
* The `get_result` method is called for each `PendingResult` sequentially
|
|
464
|
* The `get_result` method is called for each `PendingResult` sequentially
|
|
461
|
with block=True.
|
|
465
|
with block=True.
|
|
462
|
* If a `PendingResult` gets a result that is an exception, it is
|
|
466
|
* If a `PendingResult` gets a result that is an exception, it is
|
|
463
|
trapped and can be re-raised later by calling `get_result` again.
|
|
467
|
trapped and can be re-raised later by calling `get_result` again.
|
|
464
|
* The `PendingResult`s are flushed from the controller.
|
|
468
|
* The `PendingResult`s are flushed from the controller.
|
|
465
|
|
|
469
|
|
|
466
|
After barrier has been called on a `PendingResult`, its results can
|
|
470
|
After barrier has been called on a `PendingResult`, its results can
|
|
467
|
be retrieved by calling `get_result` again or accesing the `r` attribute
|
|
471
|
be retrieved by calling `get_result` again or accesing the `r` attribute
|
|
468
|
of the instance.
|
|
472
|
of the instance.
|
|
469
|
"""
|
|
473
|
"""
|
|
470
|
|
|
474
|
|
|
471
|
# Convert to list for sorting and check class type
|
|
475
|
# Convert to list for sorting and check class type
|
|
472
|
prList = list(pendingResults)
|
|
476
|
prList = list(pendingResults)
|
|
473
|
for pr in prList:
|
|
477
|
for pr in prList:
|
|
474
|
if not isinstance(pr, PendingResult):
|
|
478
|
if not isinstance(pr, PendingResult):
|
|
475
|
raise error.NotAPendingResult("Objects passed to barrier must be PendingResult instances")
|
|
479
|
raise error.NotAPendingResult("Objects passed to barrier must be PendingResult instances")
|
|
476
|
|
|
480
|
|
|
477
|
# Sort the PendingResults so they are in order
|
|
481
|
# Sort the PendingResults so they are in order
|
|
478
|
prList.sort()
|
|
482
|
prList.sort()
|
|
479
|
# Block on each PendingResult object
|
|
483
|
# Block on each PendingResult object
|
|
480
|
for pr in prList:
|
|
484
|
for pr in prList:
|
|
481
|
try:
|
|
485
|
try:
|
|
482
|
result = pr.get_result(block=True)
|
|
486
|
result = pr.get_result(block=True)
|
|
483
|
except Exception:
|
|
487
|
except Exception:
|
|
484
|
pass
|
|
488
|
pass
|
|
485
|
|
|
489
|
|
|
486
|
def flush(self):
|
|
490
|
def flush(self):
|
|
487
|
"""
|
|
491
|
"""
|
|
488
|
Clear all pending deferreds/results from the controller.
|
|
492
|
Clear all pending deferreds/results from the controller.
|
|
489
|
|
|
493
|
|
|
490
|
For each `PendingResult` that is created by this client, the controller
|
|
494
|
For each `PendingResult` that is created by this client, the controller
|
|
491
|
holds on to the result for that `PendingResult`. This can be a problem
|
|
495
|
holds on to the result for that `PendingResult`. This can be a problem
|
|
492
|
if there are a large number of `PendingResult` objects that are created.
|
|
496
|
if there are a large number of `PendingResult` objects that are created.
|
|
493
|
|
|
497
|
|
|
494
|
Once the result of the `PendingResult` has been retrieved, the result
|
|
498
|
Once the result of the `PendingResult` has been retrieved, the result
|
|
495
|
is removed from the controller, but if a user doesn't get a result (
|
|
499
|
is removed from the controller, but if a user doesn't get a result (
|
|
496
|
they just ignore the `PendingResult`) the result is kept forever on the
|
|
500
|
they just ignore the `PendingResult`) the result is kept forever on the
|
|
497
|
controller. This method allows the user to clear out all un-retrieved
|
|
501
|
controller. This method allows the user to clear out all un-retrieved
|
|
498
|
results on the controller.
|
|
502
|
results on the controller.
|
|
499
|
"""
|
|
503
|
"""
|
|
500
|
r = blockingCallFromThread(self.smultiengine.clear_pending_deferreds)
|
|
504
|
r = blockingCallFromThread(self.smultiengine.clear_pending_deferreds)
|
|
501
|
return r
|
|
505
|
return r
|
|
502
|
|
|
506
|
|
|
503
|
clear_pending_results = flush
|
|
507
|
clear_pending_results = flush
|
|
504
|
|
|
508
|
|
|
505
|
#---------------------------------------------------------------------------
|
|
509
|
#---------------------------------------------------------------------------
|
|
506
|
# IEngineMultiplexer related methods
|
|
510
|
# IEngineMultiplexer related methods
|
|
507
|
#---------------------------------------------------------------------------
|
|
511
|
#---------------------------------------------------------------------------
|
|
508
|
|
|
512
|
|
|
509
|
def execute(self, lines, targets=None, block=None):
|
|
513
|
def execute(self, lines, targets=None, block=None):
|
|
510
|
"""
|
|
514
|
"""
|
|
511
|
Execute code on a set of engines.
|
|
515
|
Execute code on a set of engines.
|
|
512
|
|
|
516
|
|
|
513
|
:Parameters:
|
|
517
|
:Parameters:
|
|
514
|
lines : str
|
|
518
|
lines : str
|
|
515
|
The Python code to execute as a string
|
|
519
|
The Python code to execute as a string
|
|
516
|
targets : id or list of ids
|
|
520
|
targets : id or list of ids
|
|
517
|
The engine to use for the execution
|
|
521
|
The engine to use for the execution
|
|
518
|
block : boolean
|
|
522
|
block : boolean
|
|
519
|
If False, this method will return the actual result. If False,
|
|
523
|
If False, this method will return the actual result. If False,
|
|
520
|
a `PendingResult` is returned which can be used to get the result
|
|
524
|
a `PendingResult` is returned which can be used to get the result
|
|
521
|
at a later time.
|
|
525
|
at a later time.
|
|
522
|
"""
|
|
526
|
"""
|
|
523
|
targets, block = self._findTargetsAndBlock(targets, block)
|
|
527
|
targets, block = self._findTargetsAndBlock(targets, block)
|
|
524
|
result = blockingCallFromThread(self.smultiengine.execute, lines,
|
|
528
|
result = blockingCallFromThread(self.smultiengine.execute, lines,
|
|
525
|
targets=targets, block=block)
|
|
529
|
targets=targets, block=block)
|
|
526
|
if block:
|
|
530
|
if block:
|
|
527
|
result = ResultList(result)
|
|
531
|
result = ResultList(result)
|
|
528
|
else:
|
|
532
|
else:
|
|
529
|
result = PendingResult(self, result)
|
|
533
|
result = PendingResult(self, result)
|
|
530
|
result.add_callback(wrapResultList)
|
|
534
|
result.add_callback(wrapResultList)
|
|
531
|
return result
|
|
535
|
return result
|
|
532
|
|
|
536
|
|
|
533
|
def push(self, namespace, targets=None, block=None):
|
|
537
|
def push(self, namespace, targets=None, block=None):
|
|
534
|
"""
|
|
538
|
"""
|
|
535
|
Push a dictionary of keys and values to engines namespace.
|
|
539
|
Push a dictionary of keys and values to engines namespace.
|
|
536
|
|
|
540
|
|
|
537
|
Each engine has a persistent namespace. This method is used to push
|
|
541
|
Each engine has a persistent namespace. This method is used to push
|
|
538
|
Python objects into that namespace.
|
|
542
|
Python objects into that namespace.
|
|
539
|
|
|
543
|
|
|
540
|
The objects in the namespace must be pickleable.
|
|
544
|
The objects in the namespace must be pickleable.
|
|
541
|
|
|
545
|
|
|
542
|
:Parameters:
|
|
546
|
:Parameters:
|
|
543
|
namespace : dict
|
|
547
|
namespace : dict
|
|
544
|
A dict that contains Python objects to be injected into
|
|
548
|
A dict that contains Python objects to be injected into
|
|
545
|
the engine persistent namespace.
|
|
549
|
the engine persistent namespace.
|
|
546
|
targets : id or list of ids
|
|
550
|
targets : id or list of ids
|
|
547
|
The engine to use for the execution
|
|
551
|
The engine to use for the execution
|
|
548
|
block : boolean
|
|
552
|
block : boolean
|
|
549
|
If False, this method will return the actual result. If False,
|
|
553
|
If False, this method will return the actual result. If False,
|
|
550
|
a `PendingResult` is returned which can be used to get the result
|
|
554
|
a `PendingResult` is returned which can be used to get the result
|
|
551
|
at a later time.
|
|
555
|
at a later time.
|
|
552
|
"""
|
|
556
|
"""
|
|
553
|
targets, block = self._findTargetsAndBlock(targets, block)
|
|
557
|
targets, block = self._findTargetsAndBlock(targets, block)
|
|
554
|
return self._blockFromThread(self.smultiengine.push, namespace,
|
|
558
|
return self._blockFromThread(self.smultiengine.push, namespace,
|
|
555
|
targets=targets, block=block)
|
|
559
|
targets=targets, block=block)
|
|
556
|
|
|
560
|
|
|
557
|
def pull(self, keys, targets=None, block=None):
|
|
561
|
def pull(self, keys, targets=None, block=None):
|
|
558
|
"""
|
|
562
|
"""
|
|
559
|
Pull Python objects by key out of engines namespaces.
|
|
563
|
Pull Python objects by key out of engines namespaces.
|
|
560
|
|
|
564
|
|
|
561
|
:Parameters:
|
|
565
|
:Parameters:
|
|
562
|
keys : str or list of str
|
|
566
|
keys : str or list of str
|
|
563
|
The names of the variables to be pulled
|
|
567
|
The names of the variables to be pulled
|
|
564
|
targets : id or list of ids
|
|
568
|
targets : id or list of ids
|
|
565
|
The engine to use for the execution
|
|
569
|
The engine to use for the execution
|
|
566
|
block : boolean
|
|
570
|
block : boolean
|
|
567
|
If False, this method will return the actual result. If False,
|
|
571
|
If False, this method will return the actual result. If False,
|
|
568
|
a `PendingResult` is returned which can be used to get the result
|
|
572
|
a `PendingResult` is returned which can be used to get the result
|
|
569
|
at a later time.
|
|
573
|
at a later time.
|
|
570
|
"""
|
|
574
|
"""
|
|
571
|
targets, block = self._findTargetsAndBlock(targets, block)
|
|
575
|
targets, block = self._findTargetsAndBlock(targets, block)
|
|
572
|
return self._blockFromThread(self.smultiengine.pull, keys, targets=targets, block=block)
|
|
576
|
return self._blockFromThread(self.smultiengine.pull, keys, targets=targets, block=block)
|
|
573
|
|
|
577
|
|
|
574
|
def push_function(self, namespace, targets=None, block=None):
|
|
578
|
def push_function(self, namespace, targets=None, block=None):
|
|
575
|
"""
|
|
579
|
"""
|
|
576
|
Push a Python function to an engine.
|
|
580
|
Push a Python function to an engine.
|
|
577
|
|
|
581
|
|
|
578
|
This method is used to push a Python function to an engine. This
|
|
582
|
This method is used to push a Python function to an engine. This
|
|
579
|
method can then be used in code on the engines. Closures are not supported.
|
|
583
|
method can then be used in code on the engines. Closures are not supported.
|
|
580
|
|
|
584
|
|
|
581
|
:Parameters:
|
|
585
|
:Parameters:
|
|
582
|
namespace : dict
|
|
586
|
namespace : dict
|
|
583
|
A dict whose values are the functions to be pushed. The keys give
|
|
587
|
A dict whose values are the functions to be pushed. The keys give
|
|
584
|
that names that the function will appear as in the engines
|
|
588
|
that names that the function will appear as in the engines
|
|
585
|
namespace.
|
|
589
|
namespace.
|
|
586
|
targets : id or list of ids
|
|
590
|
targets : id or list of ids
|
|
587
|
The engine to use for the execution
|
|
591
|
The engine to use for the execution
|
|
588
|
block : boolean
|
|
592
|
block : boolean
|
|
589
|
If False, this method will return the actual result. If False,
|
|
593
|
If False, this method will return the actual result. If False,
|
|
590
|
a `PendingResult` is returned which can be used to get the result
|
|
594
|
a `PendingResult` is returned which can be used to get the result
|
|
591
|
at a later time.
|
|
595
|
at a later time.
|
|
592
|
"""
|
|
596
|
"""
|
|
593
|
targets, block = self._findTargetsAndBlock(targets, block)
|
|
597
|
targets, block = self._findTargetsAndBlock(targets, block)
|
|
594
|
return self._blockFromThread(self.smultiengine.push_function, namespace, targets=targets, block=block)
|
|
598
|
return self._blockFromThread(self.smultiengine.push_function, namespace, targets=targets, block=block)
|
|
595
|
|
|
599
|
|
|
596
|
def pull_function(self, keys, targets=None, block=None):
|
|
600
|
def pull_function(self, keys, targets=None, block=None):
|
|
597
|
"""
|
|
601
|
"""
|
|
598
|
Pull a Python function from an engine.
|
|
602
|
Pull a Python function from an engine.
|
|
599
|
|
|
603
|
|
|
600
|
This method is used to pull a Python function from an engine.
|
|
604
|
This method is used to pull a Python function from an engine.
|
|
601
|
Closures are not supported.
|
|
605
|
Closures are not supported.
|
|
602
|
|
|
606
|
|
|
603
|
:Parameters:
|
|
607
|
:Parameters:
|
|
604
|
keys : str or list of str
|
|
608
|
keys : str or list of str
|
|
605
|
The names of the functions to be pulled
|
|
609
|
The names of the functions to be pulled
|
|
606
|
targets : id or list of ids
|
|
610
|
targets : id or list of ids
|
|
607
|
The engine to use for the execution
|
|
611
|
The engine to use for the execution
|
|
608
|
block : boolean
|
|
612
|
block : boolean
|
|
609
|
If False, this method will return the actual result. If False,
|
|
613
|
If False, this method will return the actual result. If False,
|
|
610
|
a `PendingResult` is returned which can be used to get the result
|
|
614
|
a `PendingResult` is returned which can be used to get the result
|
|
611
|
at a later time.
|
|
615
|
at a later time.
|
|
612
|
"""
|
|
616
|
"""
|
|
613
|
targets, block = self._findTargetsAndBlock(targets, block)
|
|
617
|
targets, block = self._findTargetsAndBlock(targets, block)
|
|
614
|
return self._blockFromThread(self.smultiengine.pull_function, keys, targets=targets, block=block)
|
|
618
|
return self._blockFromThread(self.smultiengine.pull_function, keys, targets=targets, block=block)
|
|
615
|
|
|
619
|
|
|
616
|
def push_serialized(self, namespace, targets=None, block=None):
|
|
620
|
def push_serialized(self, namespace, targets=None, block=None):
|
|
617
|
targets, block = self._findTargetsAndBlock(targets, block)
|
|
621
|
targets, block = self._findTargetsAndBlock(targets, block)
|
|
618
|
return self._blockFromThread(self.smultiengine.push_serialized, namespace, targets=targets, block=block)
|
|
622
|
return self._blockFromThread(self.smultiengine.push_serialized, namespace, targets=targets, block=block)
|
|
619
|
|
|
623
|
|
|
620
|
def pull_serialized(self, keys, targets=None, block=None):
|
|
624
|
def pull_serialized(self, keys, targets=None, block=None):
|
|
621
|
targets, block = self._findTargetsAndBlock(targets, block)
|
|
625
|
targets, block = self._findTargetsAndBlock(targets, block)
|
|
622
|
return self._blockFromThread(self.smultiengine.pull_serialized, keys, targets=targets, block=block)
|
|
626
|
return self._blockFromThread(self.smultiengine.pull_serialized, keys, targets=targets, block=block)
|
|
623
|
|
|
627
|
|
|
624
|
def get_result(self, i=None, targets=None, block=None):
|
|
628
|
def get_result(self, i=None, targets=None, block=None):
|
|
625
|
"""
|
|
629
|
"""
|
|
626
|
Get a previous result.
|
|
630
|
Get a previous result.
|
|
627
|
|
|
631
|
|
|
628
|
When code is executed in an engine, a dict is created and returned. This
|
|
632
|
When code is executed in an engine, a dict is created and returned. This
|
|
629
|
method retrieves that dict for previous commands.
|
|
633
|
method retrieves that dict for previous commands.
|
|
630
|
|
|
634
|
|
|
631
|
:Parameters:
|
|
635
|
:Parameters:
|
|
632
|
i : int
|
|
636
|
i : int
|
|
633
|
The number of the result to get
|
|
637
|
The number of the result to get
|
|
634
|
targets : id or list of ids
|
|
638
|
targets : id or list of ids
|
|
635
|
The engine to use for the execution
|
|
639
|
The engine to use for the execution
|
|
636
|
block : boolean
|
|
640
|
block : boolean
|
|
637
|
If False, this method will return the actual result. If False,
|
|
641
|
If False, this method will return the actual result. If False,
|
|
638
|
a `PendingResult` is returned which can be used to get the result
|
|
642
|
a `PendingResult` is returned which can be used to get the result
|
|
639
|
at a later time.
|
|
643
|
at a later time.
|
|
640
|
"""
|
|
644
|
"""
|
|
641
|
targets, block = self._findTargetsAndBlock(targets, block)
|
|
645
|
targets, block = self._findTargetsAndBlock(targets, block)
|
|
642
|
result = blockingCallFromThread(self.smultiengine.get_result, i, targets=targets, block=block)
|
|
646
|
result = blockingCallFromThread(self.smultiengine.get_result, i, targets=targets, block=block)
|
|
643
|
if block:
|
|
647
|
if block:
|
|
644
|
result = ResultList(result)
|
|
648
|
result = ResultList(result)
|
|
645
|
else:
|
|
649
|
else:
|
|
646
|
result = PendingResult(self, result)
|
|
650
|
result = PendingResult(self, result)
|
|
647
|
result.add_callback(wrapResultList)
|
|
651
|
result.add_callback(wrapResultList)
|
|
648
|
return result
|
|
652
|
return result
|
|
649
|
|
|
653
|
|
|
650
|
def reset(self, targets=None, block=None):
|
|
654
|
def reset(self, targets=None, block=None):
|
|
651
|
"""
|
|
655
|
"""
|
|
652
|
Reset an engine.
|
|
656
|
Reset an engine.
|
|
653
|
|
|
657
|
|
|
654
|
This method clears out the namespace of an engine.
|
|
658
|
This method clears out the namespace of an engine.
|
|
655
|
|
|
659
|
|
|
656
|
:Parameters:
|
|
660
|
:Parameters:
|
|
657
|
targets : id or list of ids
|
|
661
|
targets : id or list of ids
|
|
658
|
The engine to use for the execution
|
|
662
|
The engine to use for the execution
|
|
659
|
block : boolean
|
|
663
|
block : boolean
|
|
660
|
If False, this method will return the actual result. If False,
|
|
664
|
If False, this method will return the actual result. If False,
|
|
661
|
a `PendingResult` is returned which can be used to get the result
|
|
665
|
a `PendingResult` is returned which can be used to get the result
|
|
662
|
at a later time.
|
|
666
|
at a later time.
|
|
663
|
"""
|
|
667
|
"""
|
|
664
|
targets, block = self._findTargetsAndBlock(targets, block)
|
|
668
|
targets, block = self._findTargetsAndBlock(targets, block)
|
|
665
|
return self._blockFromThread(self.smultiengine.reset, targets=targets, block=block)
|
|
669
|
return self._blockFromThread(self.smultiengine.reset, targets=targets, block=block)
|
|
666
|
|
|
670
|
|
|
667
|
def keys(self, targets=None, block=None):
|
|
671
|
def keys(self, targets=None, block=None):
|
|
668
|
"""
|
|
672
|
"""
|
|
669
|
Get a list of all the variables in an engine's namespace.
|
|
673
|
Get a list of all the variables in an engine's namespace.
|
|
670
|
|
|
674
|
|
|
671
|
:Parameters:
|
|
675
|
:Parameters:
|
|
672
|
targets : id or list of ids
|
|
676
|
targets : id or list of ids
|
|
673
|
The engine to use for the execution
|
|
677
|
The engine to use for the execution
|
|
674
|
block : boolean
|
|
678
|
block : boolean
|
|
675
|
If False, this method will return the actual result. If False,
|
|
679
|
If False, this method will return the actual result. If False,
|
|
676
|
a `PendingResult` is returned which can be used to get the result
|
|
680
|
a `PendingResult` is returned which can be used to get the result
|
|
677
|
at a later time.
|
|
681
|
at a later time.
|
|
678
|
"""
|
|
682
|
"""
|
|
679
|
targets, block = self._findTargetsAndBlock(targets, block)
|
|
683
|
targets, block = self._findTargetsAndBlock(targets, block)
|
|
680
|
return self._blockFromThread(self.smultiengine.keys, targets=targets, block=block)
|
|
684
|
return self._blockFromThread(self.smultiengine.keys, targets=targets, block=block)
|
|
681
|
|
|
685
|
|
|
682
|
def kill(self, controller=False, targets=None, block=None):
|
|
686
|
def kill(self, controller=False, targets=None, block=None):
|
|
683
|
"""
|
|
687
|
"""
|
|
684
|
Kill the engines and controller.
|
|
688
|
Kill the engines and controller.
|
|
685
|
|
|
689
|
|
|
686
|
This method is used to stop the engine and controller by calling
|
|
690
|
This method is used to stop the engine and controller by calling
|
|
687
|
`reactor.stop`.
|
|
691
|
`reactor.stop`.
|
|
688
|
|
|
692
|
|
|
689
|
:Parameters:
|
|
693
|
:Parameters:
|
|
690
|
controller : boolean
|
|
694
|
controller : boolean
|
|
691
|
If True, kill the engines and controller. If False, just the
|
|
695
|
If True, kill the engines and controller. If False, just the
|
|
692
|
engines
|
|
696
|
engines
|
|
693
|
targets : id or list of ids
|
|
697
|
targets : id or list of ids
|
|
694
|
The engine to use for the execution
|
|
698
|
The engine to use for the execution
|
|
695
|
block : boolean
|
|
699
|
block : boolean
|
|
696
|
If False, this method will return the actual result. If False,
|
|
700
|
If False, this method will return the actual result. If False,
|
|
697
|
a `PendingResult` is returned which can be used to get the result
|
|
701
|
a `PendingResult` is returned which can be used to get the result
|
|
698
|
at a later time.
|
|
702
|
at a later time.
|
|
699
|
"""
|
|
703
|
"""
|
|
700
|
targets, block = self._findTargetsAndBlock(targets, block)
|
|
704
|
targets, block = self._findTargetsAndBlock(targets, block)
|
|
701
|
return self._blockFromThread(self.smultiengine.kill, controller, targets=targets, block=block)
|
|
705
|
return self._blockFromThread(self.smultiengine.kill, controller, targets=targets, block=block)
|
|
702
|
|
|
706
|
|
|
703
|
def clear_queue(self, targets=None, block=None):
|
|
707
|
def clear_queue(self, targets=None, block=None):
|
|
704
|
"""
|
|
708
|
"""
|
|
705
|
Clear out the controller's queue for an engine.
|
|
709
|
Clear out the controller's queue for an engine.
|
|
706
|
|
|
710
|
|
|
707
|
The controller maintains a queue for each engine. This clear it out.
|
|
711
|
The controller maintains a queue for each engine. This clear it out.
|
|
708
|
|
|
712
|
|
|
709
|
:Parameters:
|
|
713
|
:Parameters:
|
|
710
|
targets : id or list of ids
|
|
714
|
targets : id or list of ids
|
|
711
|
The engine to use for the execution
|
|
715
|
The engine to use for the execution
|
|
712
|
block : boolean
|
|
716
|
block : boolean
|
|
713
|
If False, this method will return the actual result. If False,
|
|
717
|
If False, this method will return the actual result. If False,
|
|
714
|
a `PendingResult` is returned which can be used to get the result
|
|
718
|
a `PendingResult` is returned which can be used to get the result
|
|
715
|
at a later time.
|
|
719
|
at a later time.
|
|
716
|
"""
|
|
720
|
"""
|
|
717
|
targets, block = self._findTargetsAndBlock(targets, block)
|
|
721
|
targets, block = self._findTargetsAndBlock(targets, block)
|
|
718
|
return self._blockFromThread(self.smultiengine.clear_queue, targets=targets, block=block)
|
|
722
|
return self._blockFromThread(self.smultiengine.clear_queue, targets=targets, block=block)
|
|
719
|
|
|
723
|
|
|
720
|
def queue_status(self, targets=None, block=None):
|
|
724
|
def queue_status(self, targets=None, block=None):
|
|
721
|
"""
|
|
725
|
"""
|
|
722
|
Get the status of an engines queue.
|
|
726
|
Get the status of an engines queue.
|
|
723
|
|
|
727
|
|
|
724
|
:Parameters:
|
|
728
|
:Parameters:
|
|
725
|
targets : id or list of ids
|
|
729
|
targets : id or list of ids
|
|
726
|
The engine to use for the execution
|
|
730
|
The engine to use for the execution
|
|
727
|
block : boolean
|
|
731
|
block : boolean
|
|
728
|
If False, this method will return the actual result. If False,
|
|
732
|
If False, this method will return the actual result. If False,
|
|
729
|
a `PendingResult` is returned which can be used to get the result
|
|
733
|
a `PendingResult` is returned which can be used to get the result
|
|
730
|
at a later time.
|
|
734
|
at a later time.
|
|
731
|
"""
|
|
735
|
"""
|
|
732
|
targets, block = self._findTargetsAndBlock(targets, block)
|
|
736
|
targets, block = self._findTargetsAndBlock(targets, block)
|
|
733
|
return self._blockFromThread(self.smultiengine.queue_status, targets=targets, block=block)
|
|
737
|
return self._blockFromThread(self.smultiengine.queue_status, targets=targets, block=block)
|
|
734
|
|
|
738
|
|
|
735
|
def set_properties(self, properties, targets=None, block=None):
|
|
739
|
def set_properties(self, properties, targets=None, block=None):
|
|
736
|
warnings.warn(_prop_warn)
|
|
740
|
warnings.warn(_prop_warn)
|
|
737
|
targets, block = self._findTargetsAndBlock(targets, block)
|
|
741
|
targets, block = self._findTargetsAndBlock(targets, block)
|
|
738
|
return self._blockFromThread(self.smultiengine.set_properties, properties, targets=targets, block=block)
|
|
742
|
return self._blockFromThread(self.smultiengine.set_properties, properties, targets=targets, block=block)
|
|
739
|
|
|
743
|
|
|
740
|
def get_properties(self, keys=None, targets=None, block=None):
|
|
744
|
def get_properties(self, keys=None, targets=None, block=None):
|
|
741
|
warnings.warn(_prop_warn)
|
|
745
|
warnings.warn(_prop_warn)
|
|
742
|
targets, block = self._findTargetsAndBlock(targets, block)
|
|
746
|
targets, block = self._findTargetsAndBlock(targets, block)
|
|
743
|
return self._blockFromThread(self.smultiengine.get_properties, keys, targets=targets, block=block)
|
|
747
|
return self._blockFromThread(self.smultiengine.get_properties, keys, targets=targets, block=block)
|
|
744
|
|
|
748
|
|
|
745
|
def has_properties(self, keys, targets=None, block=None):
|
|
749
|
def has_properties(self, keys, targets=None, block=None):
|
|
746
|
warnings.warn(_prop_warn)
|
|
750
|
warnings.warn(_prop_warn)
|
|
747
|
targets, block = self._findTargetsAndBlock(targets, block)
|
|
751
|
targets, block = self._findTargetsAndBlock(targets, block)
|
|
748
|
return self._blockFromThread(self.smultiengine.has_properties, keys, targets=targets, block=block)
|
|
752
|
return self._blockFromThread(self.smultiengine.has_properties, keys, targets=targets, block=block)
|
|
749
|
|
|
753
|
|
|
750
|
def del_properties(self, keys, targets=None, block=None):
|
|
754
|
def del_properties(self, keys, targets=None, block=None):
|
|
751
|
warnings.warn(_prop_warn)
|
|
755
|
warnings.warn(_prop_warn)
|
|
752
|
targets, block = self._findTargetsAndBlock(targets, block)
|
|
756
|
targets, block = self._findTargetsAndBlock(targets, block)
|
|
753
|
return self._blockFromThread(self.smultiengine.del_properties, keys, targets=targets, block=block)
|
|
757
|
return self._blockFromThread(self.smultiengine.del_properties, keys, targets=targets, block=block)
|
|
754
|
|
|
758
|
|
|
755
|
def clear_properties(self, targets=None, block=None):
|
|
759
|
def clear_properties(self, targets=None, block=None):
|
|
756
|
warnings.warn(_prop_warn)
|
|
760
|
warnings.warn(_prop_warn)
|
|
757
|
targets, block = self._findTargetsAndBlock(targets, block)
|
|
761
|
targets, block = self._findTargetsAndBlock(targets, block)
|
|
758
|
return self._blockFromThread(self.smultiengine.clear_properties, targets=targets, block=block)
|
|
762
|
return self._blockFromThread(self.smultiengine.clear_properties, targets=targets, block=block)
|
|
759
|
|
|
763
|
|
|
760
|
#---------------------------------------------------------------------------
|
|
764
|
#---------------------------------------------------------------------------
|
|
761
|
# IMultiEngine related methods
|
|
765
|
# IMultiEngine related methods
|
|
762
|
#---------------------------------------------------------------------------
|
|
766
|
#---------------------------------------------------------------------------
|
|
763
|
|
|
767
|
|
|
764
|
def get_ids(self):
|
|
768
|
def get_ids(self):
|
|
765
|
"""
|
|
769
|
"""
|
|
766
|
Returns the ids of currently registered engines.
|
|
770
|
Returns the ids of currently registered engines.
|
|
767
|
"""
|
|
771
|
"""
|
|
768
|
result = blockingCallFromThread(self.smultiengine.get_ids)
|
|
772
|
result = blockingCallFromThread(self.smultiengine.get_ids)
|
|
769
|
return result
|
|
773
|
return result
|
|
770
|
|
|
774
|
|
|
771
|
#---------------------------------------------------------------------------
|
|
775
|
#---------------------------------------------------------------------------
|
|
772
|
# IMultiEngineCoordinator
|
|
776
|
# IMultiEngineCoordinator
|
|
773
|
#---------------------------------------------------------------------------
|
|
777
|
#---------------------------------------------------------------------------
|
|
774
|
|
|
778
|
|
|
775
|
def scatter(self, key, seq, dist='b', flatten=False, targets=None, block=None):
|
|
779
|
def scatter(self, key, seq, dist='b', flatten=False, targets=None, block=None):
|
|
776
|
"""
|
|
780
|
"""
|
|
777
|
Partition a Python sequence and send the partitions to a set of engines.
|
|
781
|
Partition a Python sequence and send the partitions to a set of engines.
|
|
778
|
"""
|
|
782
|
"""
|
|
779
|
targets, block = self._findTargetsAndBlock(targets, block)
|
|
783
|
targets, block = self._findTargetsAndBlock(targets, block)
|
|
780
|
return self._blockFromThread(self.smultiengine.scatter, key, seq,
|
|
784
|
return self._blockFromThread(self.smultiengine.scatter, key, seq,
|
|
781
|
dist, flatten, targets=targets, block=block)
|
|
785
|
dist, flatten, targets=targets, block=block)
|
|
782
|
|
|
786
|
|
|
783
|
def gather(self, key, dist='b', targets=None, block=None):
|
|
787
|
def gather(self, key, dist='b', targets=None, block=None):
|
|
784
|
"""
|
|
788
|
"""
|
|
785
|
Gather a partitioned sequence on a set of engines as a single local seq.
|
|
789
|
Gather a partitioned sequence on a set of engines as a single local seq.
|
|
786
|
"""
|
|
790
|
"""
|
|
787
|
targets, block = self._findTargetsAndBlock(targets, block)
|
|
791
|
targets, block = self._findTargetsAndBlock(targets, block)
|
|
788
|
return self._blockFromThread(self.smultiengine.gather, key, dist,
|
|
792
|
return self._blockFromThread(self.smultiengine.gather, key, dist,
|
|
789
|
targets=targets, block=block)
|
|
793
|
targets=targets, block=block)
|
|
790
|
|
|
794
|
|
|
791
|
def raw_map(self, func, seq, dist='b', targets=None, block=None):
|
|
795
|
def raw_map(self, func, seq, dist='b', targets=None, block=None):
|
|
792
|
"""
|
|
796
|
"""
|
|
793
|
A parallelized version of Python's builtin map.
|
|
797
|
A parallelized version of Python's builtin map.
|
|
794
|
|
|
798
|
|
|
795
|
This has a slightly different syntax than the builtin `map`.
|
|
799
|
This has a slightly different syntax than the builtin `map`.
|
|
796
|
This is needed because we need to have keyword arguments and thus
|
|
800
|
This is needed because we need to have keyword arguments and thus
|
|
797
|
can't use *args to capture all the sequences. Instead, they must
|
|
801
|
can't use *args to capture all the sequences. Instead, they must
|
|
798
|
be passed in a list or tuple.
|
|
802
|
be passed in a list or tuple.
|
|
799
|
|
|
803
|
|
|
800
|
raw_map(func, seqs) -> map(func, seqs[0], seqs[1], ...)
|
|
804
|
raw_map(func, seqs) -> map(func, seqs[0], seqs[1], ...)
|
|
801
|
|
|
805
|
|
|
802
|
Most users will want to use parallel functions or the `mapper`
|
|
806
|
Most users will want to use parallel functions or the `mapper`
|
|
803
|
and `map` methods for an API that follows that of the builtin
|
|
807
|
and `map` methods for an API that follows that of the builtin
|
|
804
|
`map`.
|
|
808
|
`map`.
|
|
805
|
"""
|
|
809
|
"""
|
|
806
|
targets, block = self._findTargetsAndBlock(targets, block)
|
|
810
|
targets, block = self._findTargetsAndBlock(targets, block)
|
|
807
|
return self._blockFromThread(self.smultiengine.raw_map, func, seq,
|
|
811
|
return self._blockFromThread(self.smultiengine.raw_map, func, seq,
|
|
808
|
dist, targets=targets, block=block)
|
|
812
|
dist, targets=targets, block=block)
|
|
809
|
|
|
813
|
|
|
810
|
def map(self, func, *sequences):
|
|
814
|
def map(self, func, *sequences):
|
|
811
|
"""
|
|
815
|
"""
|
|
812
|
A parallel version of Python's builtin `map` function.
|
|
816
|
A parallel version of Python's builtin `map` function.
|
|
813
|
|
|
817
|
|
|
814
|
This method applies a function to sequences of arguments. It
|
|
818
|
This method applies a function to sequences of arguments. It
|
|
815
|
follows the same syntax as the builtin `map`.
|
|
819
|
follows the same syntax as the builtin `map`.
|
|
816
|
|
|
820
|
|
|
817
|
This method creates a mapper objects by calling `self.mapper` with
|
|
821
|
This method creates a mapper objects by calling `self.mapper` with
|
|
818
|
no arguments and then uses that mapper to do the mapping. See
|
|
822
|
no arguments and then uses that mapper to do the mapping. See
|
|
819
|
the documentation of `mapper` for more details.
|
|
823
|
the documentation of `mapper` for more details.
|
|
820
|
"""
|
|
824
|
"""
|
|
821
|
return self.mapper().map(func, *sequences)
|
|
825
|
return self.mapper().map(func, *sequences)
|
|
822
|
|
|
826
|
|
|
823
|
def mapper(self, dist='b', targets='all', block=None):
|
|
827
|
def mapper(self, dist='b', targets='all', block=None):
|
|
824
|
"""
|
|
828
|
"""
|
|
825
|
Create a mapper object that has a `map` method.
|
|
829
|
Create a mapper object that has a `map` method.
|
|
826
|
|
|
830
|
|
|
827
|
This method returns an object that implements the `IMapper`
|
|
831
|
This method returns an object that implements the `IMapper`
|
|
828
|
interface. This method is a factory that is used to control how
|
|
832
|
interface. This method is a factory that is used to control how
|
|
829
|
the map happens.
|
|
833
|
the map happens.
|
|
830
|
|
|
834
|
|
|
831
|
:Parameters:
|
|
835
|
:Parameters:
|
|
832
|
dist : str
|
|
836
|
dist : str
|
|
833
|
What decomposition to use, 'b' is the only one supported
|
|
837
|
What decomposition to use, 'b' is the only one supported
|
|
834
|
currently
|
|
838
|
currently
|
|
835
|
targets : str, int, sequence of ints
|
|
839
|
targets : str, int, sequence of ints
|
|
836
|
Which engines to use for the map
|
|
840
|
Which engines to use for the map
|
|
837
|
block : boolean
|
|
841
|
block : boolean
|
|
838
|
Should calls to `map` block or not
|
|
842
|
Should calls to `map` block or not
|
|
839
|
"""
|
|
843
|
"""
|
|
840
|
return MultiEngineMapper(self, dist, targets, block)
|
|
844
|
return MultiEngineMapper(self, dist, targets, block)
|
|
841
|
|
|
845
|
|
|
842
|
def parallel(self, dist='b', targets=None, block=None):
|
|
846
|
def parallel(self, dist='b', targets=None, block=None):
|
|
843
|
"""
|
|
847
|
"""
|
|
844
|
A decorator that turns a function into a parallel function.
|
|
848
|
A decorator that turns a function into a parallel function.
|
|
845
|
|
|
849
|
|
|
846
|
This can be used as:
|
|
850
|
This can be used as:
|
|
847
|
|
|
851
|
|
|
848
|
@parallel()
|
|
852
|
@parallel()
|
|
849
|
def f(x, y)
|
|
853
|
def f(x, y)
|
|
850
|
...
|
|
854
|
...
|
|
851
|
|
|
855
|
|
|
852
|
f(range(10), range(10))
|
|
856
|
f(range(10), range(10))
|
|
853
|
|
|
857
|
|
|
854
|
This causes f(0,0), f(1,1), ... to be called in parallel.
|
|
858
|
This causes f(0,0), f(1,1), ... to be called in parallel.
|
|
855
|
|
|
859
|
|
|
856
|
:Parameters:
|
|
860
|
:Parameters:
|
|
857
|
dist : str
|
|
861
|
dist : str
|
|
858
|
What decomposition to use, 'b' is the only one supported
|
|
862
|
What decomposition to use, 'b' is the only one supported
|
|
859
|
currently
|
|
863
|
currently
|
|
860
|
targets : str, int, sequence of ints
|
|
864
|
targets : str, int, sequence of ints
|
|
861
|
Which engines to use for the map
|
|
865
|
Which engines to use for the map
|
|
862
|
block : boolean
|
|
866
|
block : boolean
|
|
863
|
Should calls to `map` block or not
|
|
867
|
Should calls to `map` block or not
|
|
864
|
"""
|
|
868
|
"""
|
|
865
|
targets, block = self._findTargetsAndBlock(targets, block)
|
|
869
|
targets, block = self._findTargetsAndBlock(targets, block)
|
|
866
|
mapper = self.mapper(dist, targets, block)
|
|
870
|
mapper = self.mapper(dist, targets, block)
|
|
867
|
pf = ParallelFunction(mapper)
|
|
871
|
pf = ParallelFunction(mapper)
|
|
868
|
return pf
|
|
872
|
return pf
|
|
869
|
|
|
873
|
|
|
870
|
#---------------------------------------------------------------------------
|
|
874
|
#---------------------------------------------------------------------------
|
|
871
|
# IMultiEngineExtras
|
|
875
|
# IMultiEngineExtras
|
|
872
|
#---------------------------------------------------------------------------
|
|
876
|
#---------------------------------------------------------------------------
|
|
873
|
|
|
877
|
|
|
874
|
def zip_pull(self, keys, targets=None, block=None):
|
|
878
|
def zip_pull(self, keys, targets=None, block=None):
|
|
875
|
targets, block = self._findTargetsAndBlock(targets, block)
|
|
879
|
targets, block = self._findTargetsAndBlock(targets, block)
|
|
876
|
return self._blockFromThread(self.smultiengine.zip_pull, keys,
|
|
880
|
return self._blockFromThread(self.smultiengine.zip_pull, keys,
|
|
877
|
targets=targets, block=block)
|
|
881
|
targets=targets, block=block)
|
|
878
|
|
|
882
|
|
|
879
|
def run(self, filename, targets=None, block=None):
|
|
883
|
def run(self, filename, targets=None, block=None):
|
|
880
|
"""
|
|
884
|
"""
|
|
881
|
Run a Python code in a file on the engines.
|
|
885
|
Run a Python code in a file on the engines.
|
|
882
|
|
|
886
|
|
|
883
|
:Parameters:
|
|
887
|
:Parameters:
|
|
884
|
filename : str
|
|
888
|
filename : str
|
|
885
|
The name of the local file to run
|
|
889
|
The name of the local file to run
|
|
886
|
targets : id or list of ids
|
|
890
|
targets : id or list of ids
|
|
887
|
The engine to use for the execution
|
|
891
|
The engine to use for the execution
|
|
888
|
block : boolean
|
|
892
|
block : boolean
|
|
889
|
If False, this method will return the actual result. If False,
|
|
893
|
If False, this method will return the actual result. If False,
|
|
890
|
a `PendingResult` is returned which can be used to get the result
|
|
894
|
a `PendingResult` is returned which can be used to get the result
|
|
891
|
at a later time.
|
|
895
|
at a later time.
|
|
892
|
"""
|
|
896
|
"""
|
|
893
|
targets, block = self._findTargetsAndBlock(targets, block)
|
|
897
|
targets, block = self._findTargetsAndBlock(targets, block)
|
|
894
|
return self._blockFromThread(self.smultiengine.run, filename,
|
|
898
|
return self._blockFromThread(self.smultiengine.run, filename,
|
|
895
|
targets=targets, block=block)
|
|
899
|
targets=targets, block=block)
|
|
896
|
|
|
900
|
|
|
897
|
def benchmark(self, push_size=10000):
|
|
901
|
def benchmark(self, push_size=10000):
|
|
898
|
"""
|
|
902
|
"""
|
|
899
|
Run performance benchmarks for the current IPython cluster.
|
|
903
|
Run performance benchmarks for the current IPython cluster.
|
|
900
|
|
|
904
|
|
|
901
|
This method tests both the latency of sending command and data to the
|
|
905
|
This method tests both the latency of sending command and data to the
|
|
902
|
engines as well as the throughput of sending large objects to the
|
|
906
|
engines as well as the throughput of sending large objects to the
|
|
903
|
engines using push. The latency is measured by having one or more
|
|
907
|
engines using push. The latency is measured by having one or more
|
|
904
|
engines execute the command 'pass'. The throughput is measure by
|
|
908
|
engines execute the command 'pass'. The throughput is measure by
|
|
905
|
sending an NumPy array of size `push_size` to one or more engines.
|
|
909
|
sending an NumPy array of size `push_size` to one or more engines.
|
|
906
|
|
|
910
|
|
|
907
|
These benchmarks will vary widely on different hardware and networks
|
|
911
|
These benchmarks will vary widely on different hardware and networks
|
|
908
|
and thus can be used to get an idea of the performance characteristics
|
|
912
|
and thus can be used to get an idea of the performance characteristics
|
|
909
|
of a particular configuration of an IPython controller and engines.
|
|
913
|
of a particular configuration of an IPython controller and engines.
|
|
910
|
|
|
914
|
|
|
911
|
This function is not testable within our current testing framework.
|
|
915
|
This function is not testable within our current testing framework.
|
|
912
|
"""
|
|
916
|
"""
|
|
913
|
import timeit, __builtin__
|
|
917
|
import timeit, __builtin__
|
|
914
|
__builtin__._mec_self = self
|
|
918
|
__builtin__._mec_self = self
|
|
915
|
benchmarks = {}
|
|
919
|
benchmarks = {}
|
|
916
|
repeat = 3
|
|
920
|
repeat = 3
|
|
917
|
count = 10
|
|
921
|
count = 10
|
|
918
|
|
|
922
|
|
|
919
|
timer = timeit.Timer('_mec_self.execute("pass",0)')
|
|
923
|
timer = timeit.Timer('_mec_self.execute("pass",0)')
|
|
920
|
result = 1000*min(timer.repeat(repeat,count))/count
|
|
924
|
result = 1000*min(timer.repeat(repeat,count))/count
|
|
921
|
benchmarks['single_engine_latency'] = (result,'msec')
|
|
925
|
benchmarks['single_engine_latency'] = (result,'msec')
|
|
922
|
|
|
926
|
|
|
923
|
timer = timeit.Timer('_mec_self.execute("pass")')
|
|
927
|
timer = timeit.Timer('_mec_self.execute("pass")')
|
|
924
|
result = 1000*min(timer.repeat(repeat,count))/count
|
|
928
|
result = 1000*min(timer.repeat(repeat,count))/count
|
|
925
|
benchmarks['all_engine_latency'] = (result,'msec')
|
|
929
|
benchmarks['all_engine_latency'] = (result,'msec')
|
|
926
|
|
|
930
|
|
|
927
|
try:
|
|
931
|
try:
|
|
928
|
import numpy as np
|
|
932
|
import numpy as np
|
|
929
|
except:
|
|
933
|
except:
|
|
930
|
pass
|
|
934
|
pass
|
|
931
|
else:
|
|
935
|
else:
|
|
932
|
timer = timeit.Timer(
|
|
936
|
timer = timeit.Timer(
|
|
933
|
"_mec_self.push(d)",
|
|
937
|
"_mec_self.push(d)",
|
|
934
|
"import numpy as np; d = dict(a=np.zeros(%r,dtype='float64'))" % push_size
|
|
938
|
"import numpy as np; d = dict(a=np.zeros(%r,dtype='float64'))" % push_size
|
|
935
|
)
|
|
939
|
)
|
|
936
|
result = min(timer.repeat(repeat,count))/count
|
|
940
|
result = min(timer.repeat(repeat,count))/count
|
|
937
|
benchmarks['all_engine_push'] = (1e-6*push_size*8/result, 'MB/sec')
|
|
941
|
benchmarks['all_engine_push'] = (1e-6*push_size*8/result, 'MB/sec')
|
|
938
|
|
|
942
|
|
|
939
|
try:
|
|
943
|
try:
|
|
940
|
import numpy as np
|
|
944
|
import numpy as np
|
|
941
|
except:
|
|
945
|
except:
|
|
942
|
pass
|
|
946
|
pass
|
|
943
|
else:
|
|
947
|
else:
|
|
944
|
timer = timeit.Timer(
|
|
948
|
timer = timeit.Timer(
|
|
945
|
"_mec_self.push(d,0)",
|
|
949
|
"_mec_self.push(d,0)",
|
|
946
|
"import numpy as np; d = dict(a=np.zeros(%r,dtype='float64'))" % push_size
|
|
950
|
"import numpy as np; d = dict(a=np.zeros(%r,dtype='float64'))" % push_size
|
|
947
|
)
|
|
951
|
)
|
|
948
|
result = min(timer.repeat(repeat,count))/count
|
|
952
|
result = min(timer.repeat(repeat,count))/count
|
|
949
|
benchmarks['single_engine_push'] = (1e-6*push_size*8/result, 'MB/sec')
|
|
953
|
benchmarks['single_engine_push'] = (1e-6*push_size*8/result, 'MB/sec')
|
|
950
|
|
|
954
|
|
|
951
|
return benchmarks
|
|
955
|
return benchmarks
|
|
952
|
|
|
956
|
|
|
953
|
|
|
957
|
|
|
954
|
components.registerAdapter(FullBlockingMultiEngineClient,
|
|
958
|
components.registerAdapter(FullBlockingMultiEngineClient,
|
|
955
|
IFullSynchronousMultiEngine, IFullBlockingMultiEngineClient)
|
|
959
|
IFullSynchronousMultiEngine, IFullBlockingMultiEngineClient)
|
|
956
|
|
|
960
|
|
|
957
|
|
|
961
|
|
|
958
|
|
|
962
|
|
|
959
|
|
|
963
|
|