##// END OF EJS Templates
Adding documentation to the new benchmark method of MultiEngineClient.
Brian Granger -
r1879:77f8166c merge
parent child Browse files
Show More
@@ -1,896 +1,951 b''
1 # encoding: utf-8
1 # encoding: utf-8
2 # -*- test-case-name: IPython.kernel.test.test_multiengineclient -*-
2 # -*- test-case-name: IPython.kernel.test.test_multiengineclient -*-
3
3
4 """General Classes for IMultiEngine clients."""
4 """General Classes for IMultiEngine clients."""
5
5
6 __docformat__ = "restructuredtext en"
6 __docformat__ = "restructuredtext en"
7
7
8 #-------------------------------------------------------------------------------
8 #-------------------------------------------------------------------------------
9 # Copyright (C) 2008 The IPython Development Team
9 # Copyright (C) 2008 The IPython Development Team
10 #
10 #
11 # Distributed under the terms of the BSD License. The full license is in
11 # Distributed under the terms of the BSD License. The full license is in
12 # the file COPYING, distributed as part of this software.
12 # the file COPYING, distributed as part of this software.
13 #-------------------------------------------------------------------------------
13 #-------------------------------------------------------------------------------
14
14
15 #-------------------------------------------------------------------------------
15 #-------------------------------------------------------------------------------
16 # Imports
16 # Imports
17 #-------------------------------------------------------------------------------
17 #-------------------------------------------------------------------------------
18
18
19 import sys
19 import sys
20 import cPickle as pickle
20 import cPickle as pickle
21 from types import FunctionType
21 from types import FunctionType
22 import linecache
22 import linecache
23
23
24 from twisted.internet import reactor
24 from twisted.internet import reactor
25 from twisted.python import components, log
25 from twisted.python import components, log
26 from twisted.python.failure import Failure
26 from twisted.python.failure import Failure
27 from zope.interface import Interface, implements, Attribute
27 from zope.interface import Interface, implements, Attribute
28
28
29 from IPython.ColorANSI import TermColors
29 from IPython.ColorANSI import TermColors
30
30
31 from IPython.kernel.twistedutil import blockingCallFromThread
31 from IPython.kernel.twistedutil import blockingCallFromThread
32 from IPython.kernel import error
32 from IPython.kernel import error
33 from IPython.kernel.parallelfunction import ParallelFunction
33 from IPython.kernel.parallelfunction import ParallelFunction
34 from IPython.kernel.mapper import (
34 from IPython.kernel.mapper import (
35 MultiEngineMapper,
35 MultiEngineMapper,
36 IMultiEngineMapperFactory,
36 IMultiEngineMapperFactory,
37 IMapper
37 IMapper
38 )
38 )
39 from IPython.kernel import map as Map
39 from IPython.kernel import map as Map
40 from IPython.kernel import multiengine as me
40 from IPython.kernel import multiengine as me
41 from IPython.kernel.multiengine import (IFullMultiEngine,
41 from IPython.kernel.multiengine import (IFullMultiEngine,
42 IFullSynchronousMultiEngine)
42 IFullSynchronousMultiEngine)
43
43
44
44
45 #-------------------------------------------------------------------------------
45 #-------------------------------------------------------------------------------
46 # Pending Result things
46 # Pending Result things
47 #-------------------------------------------------------------------------------
47 #-------------------------------------------------------------------------------
48
48
49 class IPendingResult(Interface):
49 class IPendingResult(Interface):
50 """A representation of a result that is pending.
50 """A representation of a result that is pending.
51
51
52 This class is similar to Twisted's `Deferred` object, but is designed to be
52 This class is similar to Twisted's `Deferred` object, but is designed to be
53 used in a synchronous context.
53 used in a synchronous context.
54 """
54 """
55
55
56 result_id=Attribute("ID of the deferred on the other side")
56 result_id=Attribute("ID of the deferred on the other side")
57 client=Attribute("A client that I came from")
57 client=Attribute("A client that I came from")
58 r=Attribute("An attribute that is a property that calls and returns get_result")
58 r=Attribute("An attribute that is a property that calls and returns get_result")
59
59
60 def get_result(default=None, block=True):
60 def get_result(default=None, block=True):
61 """
61 """
62 Get a result that is pending.
62 Get a result that is pending.
63
63
64 :Parameters:
64 :Parameters:
65 default
65 default
66 The value to return if the result is not ready.
66 The value to return if the result is not ready.
67 block : boolean
67 block : boolean
68 Should I block for the result.
68 Should I block for the result.
69
69
70 :Returns: The actual result or the default value.
70 :Returns: The actual result or the default value.
71 """
71 """
72
72
73 def add_callback(f, *args, **kwargs):
73 def add_callback(f, *args, **kwargs):
74 """
74 """
75 Add a callback that is called with the result.
75 Add a callback that is called with the result.
76
76
77 If the original result is foo, adding a callback will cause
77 If the original result is foo, adding a callback will cause
78 f(foo, *args, **kwargs) to be returned instead. If multiple
78 f(foo, *args, **kwargs) to be returned instead. If multiple
79 callbacks are registered, they are chained together: the result of
79 callbacks are registered, they are chained together: the result of
80 one is passed to the next and so on.
80 one is passed to the next and so on.
81
81
82 Unlike Twisted's Deferred object, there is no errback chain. Thus
82 Unlike Twisted's Deferred object, there is no errback chain. Thus
83 any exception raised will not be caught and handled. User must
83 any exception raised will not be caught and handled. User must
84 catch these by hand when calling `get_result`.
84 catch these by hand when calling `get_result`.
85 """
85 """
86
86
87
87
88 class PendingResult(object):
88 class PendingResult(object):
89 """A representation of a result that is not yet ready.
89 """A representation of a result that is not yet ready.
90
90
91 A user should not create a `PendingResult` instance by hand.
91 A user should not create a `PendingResult` instance by hand.
92
92
93 Methods
93 Methods
94 =======
94 =======
95
95
96 * `get_result`
96 * `get_result`
97 * `add_callback`
97 * `add_callback`
98
98
99 Properties
99 Properties
100 ==========
100 ==========
101 * `r`
101 * `r`
102 """
102 """
103
103
104 def __init__(self, client, result_id):
104 def __init__(self, client, result_id):
105 """Create a PendingResult with a result_id and a client instance.
105 """Create a PendingResult with a result_id and a client instance.
106
106
107 The client should implement `_getPendingResult(result_id, block)`.
107 The client should implement `_getPendingResult(result_id, block)`.
108 """
108 """
109 self.client = client
109 self.client = client
110 self.result_id = result_id
110 self.result_id = result_id
111 self.called = False
111 self.called = False
112 self.raised = False
112 self.raised = False
113 self.callbacks = []
113 self.callbacks = []
114
114
115 def get_result(self, default=None, block=True):
115 def get_result(self, default=None, block=True):
116 """Get a result that is pending.
116 """Get a result that is pending.
117
117
118 This method will connect to an IMultiEngine adapted controller
118 This method will connect to an IMultiEngine adapted controller
119 and see if the result is ready. If the action triggers an exception
119 and see if the result is ready. If the action triggers an exception
120 raise it and record it. This method records the result/exception once it is
120 raise it and record it. This method records the result/exception once it is
121 retrieved. Calling `get_result` again will get this cached result or will
121 retrieved. Calling `get_result` again will get this cached result or will
122 re-raise the exception. The .r attribute is a property that calls
122 re-raise the exception. The .r attribute is a property that calls
123 `get_result` with block=True.
123 `get_result` with block=True.
124
124
125 :Parameters:
125 :Parameters:
126 default
126 default
127 The value to return if the result is not ready.
127 The value to return if the result is not ready.
128 block : boolean
128 block : boolean
129 Should I block for the result.
129 Should I block for the result.
130
130
131 :Returns: The actual result or the default value.
131 :Returns: The actual result or the default value.
132 """
132 """
133
133
134 if self.called:
134 if self.called:
135 if self.raised:
135 if self.raised:
136 raise self.result[0], self.result[1], self.result[2]
136 raise self.result[0], self.result[1], self.result[2]
137 else:
137 else:
138 return self.result
138 return self.result
139 try:
139 try:
140 result = self.client.get_pending_deferred(self.result_id, block)
140 result = self.client.get_pending_deferred(self.result_id, block)
141 except error.ResultNotCompleted:
141 except error.ResultNotCompleted:
142 return default
142 return default
143 except:
143 except:
144 # Reraise other error, but first record them so they can be reraised
144 # Reraise other error, but first record them so they can be reraised
145 # later if .r or get_result is called again.
145 # later if .r or get_result is called again.
146 self.result = sys.exc_info()
146 self.result = sys.exc_info()
147 self.called = True
147 self.called = True
148 self.raised = True
148 self.raised = True
149 raise
149 raise
150 else:
150 else:
151 for cb in self.callbacks:
151 for cb in self.callbacks:
152 result = cb[0](result, *cb[1], **cb[2])
152 result = cb[0](result, *cb[1], **cb[2])
153 self.result = result
153 self.result = result
154 self.called = True
154 self.called = True
155 return result
155 return result
156
156
157 def add_callback(self, f, *args, **kwargs):
157 def add_callback(self, f, *args, **kwargs):
158 """Add a callback that is called with the result.
158 """Add a callback that is called with the result.
159
159
160 If the original result is result, adding a callback will cause
160 If the original result is result, adding a callback will cause
161 f(result, *args, **kwargs) to be returned instead. If multiple
161 f(result, *args, **kwargs) to be returned instead. If multiple
162 callbacks are registered, they are chained together: the result of
162 callbacks are registered, they are chained together: the result of
163 one is passed to the next and so on.
163 one is passed to the next and so on.
164
164
165 Unlike Twisted's Deferred object, there is no errback chain. Thus
165 Unlike Twisted's Deferred object, there is no errback chain. Thus
166 any exception raised will not be caught and handled. User must
166 any exception raised will not be caught and handled. User must
167 catch these by hand when calling `get_result`.
167 catch these by hand when calling `get_result`.
168 """
168 """
169 assert callable(f)
169 assert callable(f)
170 self.callbacks.append((f, args, kwargs))
170 self.callbacks.append((f, args, kwargs))
171
171
172 def __cmp__(self, other):
172 def __cmp__(self, other):
173 if self.result_id < other.result_id:
173 if self.result_id < other.result_id:
174 return -1
174 return -1
175 else:
175 else:
176 return 1
176 return 1
177
177
178 def _get_r(self):
178 def _get_r(self):
179 return self.get_result(block=True)
179 return self.get_result(block=True)
180
180
181 r = property(_get_r)
181 r = property(_get_r)
182 """This property is a shortcut to a `get_result(block=True)`."""
182 """This property is a shortcut to a `get_result(block=True)`."""
183
183
184
184
185 #-------------------------------------------------------------------------------
185 #-------------------------------------------------------------------------------
186 # Pretty printing wrappers for certain lists
186 # Pretty printing wrappers for certain lists
187 #-------------------------------------------------------------------------------
187 #-------------------------------------------------------------------------------
188
188
189 class ResultList(list):
189 class ResultList(list):
190 """A subclass of list that pretty prints the output of `execute`/`get_result`."""
190 """A subclass of list that pretty prints the output of `execute`/`get_result`."""
191
191
192 def __repr__(self):
192 def __repr__(self):
193 output = []
193 output = []
194 # These colored prompts were not working on Windows
194 # These colored prompts were not working on Windows
195 if sys.platform == 'win32':
195 if sys.platform == 'win32':
196 blue = normal = red = green = ''
196 blue = normal = red = green = ''
197 else:
197 else:
198 blue = TermColors.Blue
198 blue = TermColors.Blue
199 normal = TermColors.Normal
199 normal = TermColors.Normal
200 red = TermColors.Red
200 red = TermColors.Red
201 green = TermColors.Green
201 green = TermColors.Green
202 output.append("<Results List>\n")
202 output.append("<Results List>\n")
203 for cmd in self:
203 for cmd in self:
204 if isinstance(cmd, Failure):
204 if isinstance(cmd, Failure):
205 output.append(cmd)
205 output.append(cmd)
206 else:
206 else:
207 target = cmd.get('id',None)
207 target = cmd.get('id',None)
208 cmd_num = cmd.get('number',None)
208 cmd_num = cmd.get('number',None)
209 cmd_stdin = cmd.get('input',{}).get('translated','No Input')
209 cmd_stdin = cmd.get('input',{}).get('translated','No Input')
210 cmd_stdout = cmd.get('stdout', None)
210 cmd_stdout = cmd.get('stdout', None)
211 cmd_stderr = cmd.get('stderr', None)
211 cmd_stderr = cmd.get('stderr', None)
212 output.append("%s[%i]%s In [%i]:%s %s\n" % \
212 output.append("%s[%i]%s In [%i]:%s %s\n" % \
213 (green, target,
213 (green, target,
214 blue, cmd_num, normal, cmd_stdin))
214 blue, cmd_num, normal, cmd_stdin))
215 if cmd_stdout:
215 if cmd_stdout:
216 output.append("%s[%i]%s Out[%i]:%s %s\n" % \
216 output.append("%s[%i]%s Out[%i]:%s %s\n" % \
217 (green, target,
217 (green, target,
218 red, cmd_num, normal, cmd_stdout))
218 red, cmd_num, normal, cmd_stdout))
219 if cmd_stderr:
219 if cmd_stderr:
220 output.append("%s[%i]%s Err[%i]:\n%s %s" % \
220 output.append("%s[%i]%s Err[%i]:\n%s %s" % \
221 (green, target,
221 (green, target,
222 red, cmd_num, normal, cmd_stderr))
222 red, cmd_num, normal, cmd_stderr))
223 return ''.join(output)
223 return ''.join(output)
224
224
225
225
226 def wrapResultList(result):
226 def wrapResultList(result):
227 """A function that wraps the output of `execute`/`get_result` -> `ResultList`."""
227 """A function that wraps the output of `execute`/`get_result` -> `ResultList`."""
228 if len(result) == 0:
228 if len(result) == 0:
229 result = [result]
229 result = [result]
230 return ResultList(result)
230 return ResultList(result)
231
231
232
232
233 class QueueStatusList(list):
233 class QueueStatusList(list):
234 """A subclass of list that pretty prints the output of `queue_status`."""
234 """A subclass of list that pretty prints the output of `queue_status`."""
235
235
236 def __repr__(self):
236 def __repr__(self):
237 output = []
237 output = []
238 output.append("<Queue Status List>\n")
238 output.append("<Queue Status List>\n")
239 for e in self:
239 for e in self:
240 output.append("Engine: %s\n" % repr(e[0]))
240 output.append("Engine: %s\n" % repr(e[0]))
241 output.append(" Pending: %s\n" % repr(e[1]['pending']))
241 output.append(" Pending: %s\n" % repr(e[1]['pending']))
242 for q in e[1]['queue']:
242 for q in e[1]['queue']:
243 output.append(" Command: %s\n" % repr(q))
243 output.append(" Command: %s\n" % repr(q))
244 return ''.join(output)
244 return ''.join(output)
245
245
246
246
247 #-------------------------------------------------------------------------------
247 #-------------------------------------------------------------------------------
248 # InteractiveMultiEngineClient
248 # InteractiveMultiEngineClient
249 #-------------------------------------------------------------------------------
249 #-------------------------------------------------------------------------------
250
250
251 class InteractiveMultiEngineClient(object):
251 class InteractiveMultiEngineClient(object):
252 """A mixin class that add a few methods to a multiengine client.
252 """A mixin class that add a few methods to a multiengine client.
253
253
254 The methods in this mixin class are designed for interactive usage.
254 The methods in this mixin class are designed for interactive usage.
255 """
255 """
256
256
257 def activate(self):
257 def activate(self):
258 """Make this `MultiEngineClient` active for parallel magic commands.
258 """Make this `MultiEngineClient` active for parallel magic commands.
259
259
260 IPython has a magic command syntax to work with `MultiEngineClient` objects.
260 IPython has a magic command syntax to work with `MultiEngineClient` objects.
261 In a given IPython session there is a single active one. While
261 In a given IPython session there is a single active one. While
262 there can be many `MultiEngineClient` created and used by the user,
262 there can be many `MultiEngineClient` created and used by the user,
263 there is only one active one. The active `MultiEngineClient` is used whenever
263 there is only one active one. The active `MultiEngineClient` is used whenever
264 the magic commands %px and %autopx are used.
264 the magic commands %px and %autopx are used.
265
265
266 The activate() method is called on a given `MultiEngineClient` to make it
266 The activate() method is called on a given `MultiEngineClient` to make it
267 active. Once this has been done, the magic commands can be used.
267 active. Once this has been done, the magic commands can be used.
268 """
268 """
269
269
270 try:
270 try:
271 __IPYTHON__.activeController = self
271 __IPYTHON__.activeController = self
272 except NameError:
272 except NameError:
273 print "The IPython Controller magics only work within IPython."
273 print "The IPython Controller magics only work within IPython."
274
274
275 def __setitem__(self, key, value):
275 def __setitem__(self, key, value):
276 """Add a dictionary interface for pushing/pulling.
276 """Add a dictionary interface for pushing/pulling.
277
277
278 This functions as a shorthand for `push`.
278 This functions as a shorthand for `push`.
279
279
280 :Parameters:
280 :Parameters:
281 key : str
281 key : str
282 What to call the remote object.
282 What to call the remote object.
283 value : object
283 value : object
284 The local Python object to push.
284 The local Python object to push.
285 """
285 """
286 targets, block = self._findTargetsAndBlock()
286 targets, block = self._findTargetsAndBlock()
287 return self.push({key:value}, targets=targets, block=block)
287 return self.push({key:value}, targets=targets, block=block)
288
288
289 def __getitem__(self, key):
289 def __getitem__(self, key):
290 """Add a dictionary interface for pushing/pulling.
290 """Add a dictionary interface for pushing/pulling.
291
291
292 This functions as a shorthand to `pull`.
292 This functions as a shorthand to `pull`.
293
293
294 :Parameters:
294 :Parameters:
295 - `key`: A string representing the key.
295 - `key`: A string representing the key.
296 """
296 """
297 if isinstance(key, str):
297 if isinstance(key, str):
298 targets, block = self._findTargetsAndBlock()
298 targets, block = self._findTargetsAndBlock()
299 return self.pull(key, targets=targets, block=block)
299 return self.pull(key, targets=targets, block=block)
300 else:
300 else:
301 raise TypeError("__getitem__ only takes strs")
301 raise TypeError("__getitem__ only takes strs")
302
302
303 def __len__(self):
303 def __len__(self):
304 """Return the number of available engines."""
304 """Return the number of available engines."""
305 return len(self.get_ids())
305 return len(self.get_ids())
306
306
307 #---------------------------------------------------------------------------
307 #---------------------------------------------------------------------------
308 # Make this a context manager for with
308 # Make this a context manager for with
309 #---------------------------------------------------------------------------
309 #---------------------------------------------------------------------------
310
310
311 def findsource_file(self,f):
311 def findsource_file(self,f):
312 linecache.checkcache()
312 linecache.checkcache()
313 s = findsource(f.f_code)
313 s = findsource(f.f_code)
314 lnum = f.f_lineno
314 lnum = f.f_lineno
315 wsource = s[0][f.f_lineno:]
315 wsource = s[0][f.f_lineno:]
316 return strip_whitespace(wsource)
316 return strip_whitespace(wsource)
317
317
318 def findsource_ipython(self,f):
318 def findsource_ipython(self,f):
319 from IPython import ipapi
319 from IPython import ipapi
320 self.ip = ipapi.get()
320 self.ip = ipapi.get()
321 wsource = [l+'\n' for l in
321 wsource = [l+'\n' for l in
322 self.ip.IP.input_hist_raw[-1].splitlines()[1:]]
322 self.ip.IP.input_hist_raw[-1].splitlines()[1:]]
323 return strip_whitespace(wsource)
323 return strip_whitespace(wsource)
324
324
325 def __enter__(self):
325 def __enter__(self):
326 f = sys._getframe(1)
326 f = sys._getframe(1)
327 local_ns = f.f_locals
327 local_ns = f.f_locals
328 global_ns = f.f_globals
328 global_ns = f.f_globals
329 if f.f_code.co_filename == '<ipython console>':
329 if f.f_code.co_filename == '<ipython console>':
330 s = self.findsource_ipython(f)
330 s = self.findsource_ipython(f)
331 else:
331 else:
332 s = self.findsource_file(f)
332 s = self.findsource_file(f)
333
333
334 self._with_context_result = self.execute(s)
334 self._with_context_result = self.execute(s)
335
335
336 def __exit__ (self, etype, value, tb):
336 def __exit__ (self, etype, value, tb):
337 if issubclass(etype,error.StopLocalExecution):
337 if issubclass(etype,error.StopLocalExecution):
338 return True
338 return True
339
339
340
340
341 def remote():
341 def remote():
342 m = 'Special exception to stop local execution of parallel code.'
342 m = 'Special exception to stop local execution of parallel code.'
343 raise error.StopLocalExecution(m)
343 raise error.StopLocalExecution(m)
344
344
345 def strip_whitespace(source):
345 def strip_whitespace(source):
346 # Expand tabs to avoid any confusion.
346 # Expand tabs to avoid any confusion.
347 wsource = [l.expandtabs(4) for l in source]
347 wsource = [l.expandtabs(4) for l in source]
348 # Detect the indentation level
348 # Detect the indentation level
349 done = False
349 done = False
350 for line in wsource:
350 for line in wsource:
351 if line.isspace():
351 if line.isspace():
352 continue
352 continue
353 for col,char in enumerate(line):
353 for col,char in enumerate(line):
354 if char != ' ':
354 if char != ' ':
355 done = True
355 done = True
356 break
356 break
357 if done:
357 if done:
358 break
358 break
359 # Now we know how much leading space there is in the code. Next, we
359 # Now we know how much leading space there is in the code. Next, we
360 # extract up to the first line that has less indentation.
360 # extract up to the first line that has less indentation.
361 # WARNINGS: we skip comments that may be misindented, but we do NOT yet
361 # WARNINGS: we skip comments that may be misindented, but we do NOT yet
362 # detect triple quoted strings that may have flush left text.
362 # detect triple quoted strings that may have flush left text.
363 for lno,line in enumerate(wsource):
363 for lno,line in enumerate(wsource):
364 lead = line[:col]
364 lead = line[:col]
365 if lead.isspace():
365 if lead.isspace():
366 continue
366 continue
367 else:
367 else:
368 if not lead.lstrip().startswith('#'):
368 if not lead.lstrip().startswith('#'):
369 break
369 break
370 # The real 'with' source is up to lno
370 # The real 'with' source is up to lno
371 src_lines = [l[col:] for l in wsource[:lno+1]]
371 src_lines = [l[col:] for l in wsource[:lno+1]]
372
372
373 # Finally, check that the source's first non-comment line begins with the
373 # Finally, check that the source's first non-comment line begins with the
374 # special call 'remote()'
374 # special call 'remote()'
375 for nline,line in enumerate(src_lines):
375 for nline,line in enumerate(src_lines):
376 if line.isspace() or line.startswith('#'):
376 if line.isspace() or line.startswith('#'):
377 continue
377 continue
378 if 'remote()' in line:
378 if 'remote()' in line:
379 break
379 break
380 else:
380 else:
381 raise ValueError('remote() call missing at the start of code')
381 raise ValueError('remote() call missing at the start of code')
382 src = ''.join(src_lines[nline+1:])
382 src = ''.join(src_lines[nline+1:])
383 #print 'SRC:\n<<<<<<<>>>>>>>\n%s<<<<<>>>>>>' % src # dbg
383 #print 'SRC:\n<<<<<<<>>>>>>>\n%s<<<<<>>>>>>' % src # dbg
384 return src
384 return src
385
385
386
386
387 #-------------------------------------------------------------------------------
387 #-------------------------------------------------------------------------------
388 # The top-level MultiEngine client adaptor
388 # The top-level MultiEngine client adaptor
389 #-------------------------------------------------------------------------------
389 #-------------------------------------------------------------------------------
390
390
391
391
392 class IFullBlockingMultiEngineClient(Interface):
392 class IFullBlockingMultiEngineClient(Interface):
393 pass
393 pass
394
394
395
395
396 class FullBlockingMultiEngineClient(InteractiveMultiEngineClient):
396 class FullBlockingMultiEngineClient(InteractiveMultiEngineClient):
397 """
397 """
398 A blocking client to the `IMultiEngine` controller interface.
398 A blocking client to the `IMultiEngine` controller interface.
399
399
400 This class allows users to use a set of engines for a parallel
400 This class allows users to use a set of engines for a parallel
401 computation through the `IMultiEngine` interface. In this interface,
401 computation through the `IMultiEngine` interface. In this interface,
402 each engine has a specific id (an int) that is used to refer to the
402 each engine has a specific id (an int) that is used to refer to the
403 engine, run code on it, etc.
403 engine, run code on it, etc.
404 """
404 """
405
405
406 implements(
406 implements(
407 IFullBlockingMultiEngineClient,
407 IFullBlockingMultiEngineClient,
408 IMultiEngineMapperFactory,
408 IMultiEngineMapperFactory,
409 IMapper
409 IMapper
410 )
410 )
411
411
412 def __init__(self, smultiengine):
412 def __init__(self, smultiengine):
413 self.smultiengine = smultiengine
413 self.smultiengine = smultiengine
414 self.block = True
414 self.block = True
415 self.targets = 'all'
415 self.targets = 'all'
416
416
417 def _findBlock(self, block=None):
417 def _findBlock(self, block=None):
418 if block is None:
418 if block is None:
419 return self.block
419 return self.block
420 else:
420 else:
421 if block in (True, False):
421 if block in (True, False):
422 return block
422 return block
423 else:
423 else:
424 raise ValueError("block must be True or False")
424 raise ValueError("block must be True or False")
425
425
426 def _findTargets(self, targets=None):
426 def _findTargets(self, targets=None):
427 if targets is None:
427 if targets is None:
428 return self.targets
428 return self.targets
429 else:
429 else:
430 if not isinstance(targets, (str,list,tuple,int)):
430 if not isinstance(targets, (str,list,tuple,int)):
431 raise ValueError("targets must be a str, list, tuple or int")
431 raise ValueError("targets must be a str, list, tuple or int")
432 return targets
432 return targets
433
433
434 def _findTargetsAndBlock(self, targets=None, block=None):
434 def _findTargetsAndBlock(self, targets=None, block=None):
435 return self._findTargets(targets), self._findBlock(block)
435 return self._findTargets(targets), self._findBlock(block)
436
436
437 def _blockFromThread(self, function, *args, **kwargs):
437 def _blockFromThread(self, function, *args, **kwargs):
438 block = kwargs.get('block', None)
438 block = kwargs.get('block', None)
439 if block is None:
439 if block is None:
440 raise error.MissingBlockArgument("'block' keyword argument is missing")
440 raise error.MissingBlockArgument("'block' keyword argument is missing")
441 result = blockingCallFromThread(function, *args, **kwargs)
441 result = blockingCallFromThread(function, *args, **kwargs)
442 if not block:
442 if not block:
443 result = PendingResult(self, result)
443 result = PendingResult(self, result)
444 return result
444 return result
445
445
446 def get_pending_deferred(self, deferredID, block):
446 def get_pending_deferred(self, deferredID, block):
447 return blockingCallFromThread(self.smultiengine.get_pending_deferred, deferredID, block)
447 return blockingCallFromThread(self.smultiengine.get_pending_deferred, deferredID, block)
448
448
449 def barrier(self, pendingResults):
449 def barrier(self, pendingResults):
450 """Synchronize a set of `PendingResults`.
450 """Synchronize a set of `PendingResults`.
451
451
452 This method is a synchronization primitive that waits for a set of
452 This method is a synchronization primitive that waits for a set of
453 `PendingResult` objects to complete. More specifically, barier does
453 `PendingResult` objects to complete. More specifically, barier does
454 the following.
454 the following.
455
455
456 * The `PendingResult`s are sorted by result_id.
456 * The `PendingResult`s are sorted by result_id.
457 * The `get_result` method is called for each `PendingResult` sequentially
457 * The `get_result` method is called for each `PendingResult` sequentially
458 with block=True.
458 with block=True.
459 * If a `PendingResult` gets a result that is an exception, it is
459 * If a `PendingResult` gets a result that is an exception, it is
460 trapped and can be re-raised later by calling `get_result` again.
460 trapped and can be re-raised later by calling `get_result` again.
461 * The `PendingResult`s are flushed from the controller.
461 * The `PendingResult`s are flushed from the controller.
462
462
463 After barrier has been called on a `PendingResult`, its results can
463 After barrier has been called on a `PendingResult`, its results can
464 be retrieved by calling `get_result` again or accesing the `r` attribute
464 be retrieved by calling `get_result` again or accesing the `r` attribute
465 of the instance.
465 of the instance.
466 """
466 """
467
467
468 # Convert to list for sorting and check class type
468 # Convert to list for sorting and check class type
469 prList = list(pendingResults)
469 prList = list(pendingResults)
470 for pr in prList:
470 for pr in prList:
471 if not isinstance(pr, PendingResult):
471 if not isinstance(pr, PendingResult):
472 raise error.NotAPendingResult("Objects passed to barrier must be PendingResult instances")
472 raise error.NotAPendingResult("Objects passed to barrier must be PendingResult instances")
473
473
474 # Sort the PendingResults so they are in order
474 # Sort the PendingResults so they are in order
475 prList.sort()
475 prList.sort()
476 # Block on each PendingResult object
476 # Block on each PendingResult object
477 for pr in prList:
477 for pr in prList:
478 try:
478 try:
479 result = pr.get_result(block=True)
479 result = pr.get_result(block=True)
480 except Exception:
480 except Exception:
481 pass
481 pass
482
482
483 def flush(self):
483 def flush(self):
484 """
484 """
485 Clear all pending deferreds/results from the controller.
485 Clear all pending deferreds/results from the controller.
486
486
487 For each `PendingResult` that is created by this client, the controller
487 For each `PendingResult` that is created by this client, the controller
488 holds on to the result for that `PendingResult`. This can be a problem
488 holds on to the result for that `PendingResult`. This can be a problem
489 if there are a large number of `PendingResult` objects that are created.
489 if there are a large number of `PendingResult` objects that are created.
490
490
491 Once the result of the `PendingResult` has been retrieved, the result
491 Once the result of the `PendingResult` has been retrieved, the result
492 is removed from the controller, but if a user doesn't get a result (
492 is removed from the controller, but if a user doesn't get a result (
493 they just ignore the `PendingResult`) the result is kept forever on the
493 they just ignore the `PendingResult`) the result is kept forever on the
494 controller. This method allows the user to clear out all un-retrieved
494 controller. This method allows the user to clear out all un-retrieved
495 results on the controller.
495 results on the controller.
496 """
496 """
497 r = blockingCallFromThread(self.smultiengine.clear_pending_deferreds)
497 r = blockingCallFromThread(self.smultiengine.clear_pending_deferreds)
498 return r
498 return r
499
499
500 clear_pending_results = flush
500 clear_pending_results = flush
501
501
502 #---------------------------------------------------------------------------
502 #---------------------------------------------------------------------------
503 # IEngineMultiplexer related methods
503 # IEngineMultiplexer related methods
504 #---------------------------------------------------------------------------
504 #---------------------------------------------------------------------------
505
505
506 def execute(self, lines, targets=None, block=None):
506 def execute(self, lines, targets=None, block=None):
507 """
507 """
508 Execute code on a set of engines.
508 Execute code on a set of engines.
509
509
510 :Parameters:
510 :Parameters:
511 lines : str
511 lines : str
512 The Python code to execute as a string
512 The Python code to execute as a string
513 targets : id or list of ids
513 targets : id or list of ids
514 The engine to use for the execution
514 The engine to use for the execution
515 block : boolean
515 block : boolean
516 If False, this method will return the actual result. If False,
516 If False, this method will return the actual result. If False,
517 a `PendingResult` is returned which can be used to get the result
517 a `PendingResult` is returned which can be used to get the result
518 at a later time.
518 at a later time.
519 """
519 """
520 targets, block = self._findTargetsAndBlock(targets, block)
520 targets, block = self._findTargetsAndBlock(targets, block)
521 result = blockingCallFromThread(self.smultiengine.execute, lines,
521 result = blockingCallFromThread(self.smultiengine.execute, lines,
522 targets=targets, block=block)
522 targets=targets, block=block)
523 if block:
523 if block:
524 result = ResultList(result)
524 result = ResultList(result)
525 else:
525 else:
526 result = PendingResult(self, result)
526 result = PendingResult(self, result)
527 result.add_callback(wrapResultList)
527 result.add_callback(wrapResultList)
528 return result
528 return result
529
529
530 def push(self, namespace, targets=None, block=None):
530 def push(self, namespace, targets=None, block=None):
531 """
531 """
532 Push a dictionary of keys and values to engines namespace.
532 Push a dictionary of keys and values to engines namespace.
533
533
534 Each engine has a persistent namespace. This method is used to push
534 Each engine has a persistent namespace. This method is used to push
535 Python objects into that namespace.
535 Python objects into that namespace.
536
536
537 The objects in the namespace must be pickleable.
537 The objects in the namespace must be pickleable.
538
538
539 :Parameters:
539 :Parameters:
540 namespace : dict
540 namespace : dict
541 A dict that contains Python objects to be injected into
541 A dict that contains Python objects to be injected into
542 the engine persistent namespace.
542 the engine persistent namespace.
543 targets : id or list of ids
543 targets : id or list of ids
544 The engine to use for the execution
544 The engine to use for the execution
545 block : boolean
545 block : boolean
546 If False, this method will return the actual result. If False,
546 If False, this method will return the actual result. If False,
547 a `PendingResult` is returned which can be used to get the result
547 a `PendingResult` is returned which can be used to get the result
548 at a later time.
548 at a later time.
549 """
549 """
550 targets, block = self._findTargetsAndBlock(targets, block)
550 targets, block = self._findTargetsAndBlock(targets, block)
551 return self._blockFromThread(self.smultiengine.push, namespace,
551 return self._blockFromThread(self.smultiengine.push, namespace,
552 targets=targets, block=block)
552 targets=targets, block=block)
553
553
554 def pull(self, keys, targets=None, block=None):
554 def pull(self, keys, targets=None, block=None):
555 """
555 """
556 Pull Python objects by key out of engines namespaces.
556 Pull Python objects by key out of engines namespaces.
557
557
558 :Parameters:
558 :Parameters:
559 keys : str or list of str
559 keys : str or list of str
560 The names of the variables to be pulled
560 The names of the variables to be pulled
561 targets : id or list of ids
561 targets : id or list of ids
562 The engine to use for the execution
562 The engine to use for the execution
563 block : boolean
563 block : boolean
564 If False, this method will return the actual result. If False,
564 If False, this method will return the actual result. If False,
565 a `PendingResult` is returned which can be used to get the result
565 a `PendingResult` is returned which can be used to get the result
566 at a later time.
566 at a later time.
567 """
567 """
568 targets, block = self._findTargetsAndBlock(targets, block)
568 targets, block = self._findTargetsAndBlock(targets, block)
569 return self._blockFromThread(self.smultiengine.pull, keys, targets=targets, block=block)
569 return self._blockFromThread(self.smultiengine.pull, keys, targets=targets, block=block)
570
570
571 def push_function(self, namespace, targets=None, block=None):
571 def push_function(self, namespace, targets=None, block=None):
572 """
572 """
573 Push a Python function to an engine.
573 Push a Python function to an engine.
574
574
575 This method is used to push a Python function to an engine. This
575 This method is used to push a Python function to an engine. This
576 method can then be used in code on the engines. Closures are not supported.
576 method can then be used in code on the engines. Closures are not supported.
577
577
578 :Parameters:
578 :Parameters:
579 namespace : dict
579 namespace : dict
580 A dict whose values are the functions to be pushed. The keys give
580 A dict whose values are the functions to be pushed. The keys give
581 that names that the function will appear as in the engines
581 that names that the function will appear as in the engines
582 namespace.
582 namespace.
583 targets : id or list of ids
583 targets : id or list of ids
584 The engine to use for the execution
584 The engine to use for the execution
585 block : boolean
585 block : boolean
586 If False, this method will return the actual result. If False,
586 If False, this method will return the actual result. If False,
587 a `PendingResult` is returned which can be used to get the result
587 a `PendingResult` is returned which can be used to get the result
588 at a later time.
588 at a later time.
589 """
589 """
590 targets, block = self._findTargetsAndBlock(targets, block)
590 targets, block = self._findTargetsAndBlock(targets, block)
591 return self._blockFromThread(self.smultiengine.push_function, namespace, targets=targets, block=block)
591 return self._blockFromThread(self.smultiengine.push_function, namespace, targets=targets, block=block)
592
592
593 def pull_function(self, keys, targets=None, block=None):
593 def pull_function(self, keys, targets=None, block=None):
594 """
594 """
595 Pull a Python function from an engine.
595 Pull a Python function from an engine.
596
596
597 This method is used to pull a Python function from an engine.
597 This method is used to pull a Python function from an engine.
598 Closures are not supported.
598 Closures are not supported.
599
599
600 :Parameters:
600 :Parameters:
601 keys : str or list of str
601 keys : str or list of str
602 The names of the functions to be pulled
602 The names of the functions to be pulled
603 targets : id or list of ids
603 targets : id or list of ids
604 The engine to use for the execution
604 The engine to use for the execution
605 block : boolean
605 block : boolean
606 If False, this method will return the actual result. If False,
606 If False, this method will return the actual result. If False,
607 a `PendingResult` is returned which can be used to get the result
607 a `PendingResult` is returned which can be used to get the result
608 at a later time.
608 at a later time.
609 """
609 """
610 targets, block = self._findTargetsAndBlock(targets, block)
610 targets, block = self._findTargetsAndBlock(targets, block)
611 return self._blockFromThread(self.smultiengine.pull_function, keys, targets=targets, block=block)
611 return self._blockFromThread(self.smultiengine.pull_function, keys, targets=targets, block=block)
612
612
613 def push_serialized(self, namespace, targets=None, block=None):
613 def push_serialized(self, namespace, targets=None, block=None):
614 targets, block = self._findTargetsAndBlock(targets, block)
614 targets, block = self._findTargetsAndBlock(targets, block)
615 return self._blockFromThread(self.smultiengine.push_serialized, namespace, targets=targets, block=block)
615 return self._blockFromThread(self.smultiengine.push_serialized, namespace, targets=targets, block=block)
616
616
617 def pull_serialized(self, keys, targets=None, block=None):
617 def pull_serialized(self, keys, targets=None, block=None):
618 targets, block = self._findTargetsAndBlock(targets, block)
618 targets, block = self._findTargetsAndBlock(targets, block)
619 return self._blockFromThread(self.smultiengine.pull_serialized, keys, targets=targets, block=block)
619 return self._blockFromThread(self.smultiengine.pull_serialized, keys, targets=targets, block=block)
620
620
621 def get_result(self, i=None, targets=None, block=None):
621 def get_result(self, i=None, targets=None, block=None):
622 """
622 """
623 Get a previous result.
623 Get a previous result.
624
624
625 When code is executed in an engine, a dict is created and returned. This
625 When code is executed in an engine, a dict is created and returned. This
626 method retrieves that dict for previous commands.
626 method retrieves that dict for previous commands.
627
627
628 :Parameters:
628 :Parameters:
629 i : int
629 i : int
630 The number of the result to get
630 The number of the result to get
631 targets : id or list of ids
631 targets : id or list of ids
632 The engine to use for the execution
632 The engine to use for the execution
633 block : boolean
633 block : boolean
634 If False, this method will return the actual result. If False,
634 If False, this method will return the actual result. If False,
635 a `PendingResult` is returned which can be used to get the result
635 a `PendingResult` is returned which can be used to get the result
636 at a later time.
636 at a later time.
637 """
637 """
638 targets, block = self._findTargetsAndBlock(targets, block)
638 targets, block = self._findTargetsAndBlock(targets, block)
639 result = blockingCallFromThread(self.smultiengine.get_result, i, targets=targets, block=block)
639 result = blockingCallFromThread(self.smultiengine.get_result, i, targets=targets, block=block)
640 if block:
640 if block:
641 result = ResultList(result)
641 result = ResultList(result)
642 else:
642 else:
643 result = PendingResult(self, result)
643 result = PendingResult(self, result)
644 result.add_callback(wrapResultList)
644 result.add_callback(wrapResultList)
645 return result
645 return result
646
646
647 def reset(self, targets=None, block=None):
647 def reset(self, targets=None, block=None):
648 """
648 """
649 Reset an engine.
649 Reset an engine.
650
650
651 This method clears out the namespace of an engine.
651 This method clears out the namespace of an engine.
652
652
653 :Parameters:
653 :Parameters:
654 targets : id or list of ids
654 targets : id or list of ids
655 The engine to use for the execution
655 The engine to use for the execution
656 block : boolean
656 block : boolean
657 If False, this method will return the actual result. If False,
657 If False, this method will return the actual result. If False,
658 a `PendingResult` is returned which can be used to get the result
658 a `PendingResult` is returned which can be used to get the result
659 at a later time.
659 at a later time.
660 """
660 """
661 targets, block = self._findTargetsAndBlock(targets, block)
661 targets, block = self._findTargetsAndBlock(targets, block)
662 return self._blockFromThread(self.smultiengine.reset, targets=targets, block=block)
662 return self._blockFromThread(self.smultiengine.reset, targets=targets, block=block)
663
663
664 def keys(self, targets=None, block=None):
664 def keys(self, targets=None, block=None):
665 """
665 """
666 Get a list of all the variables in an engine's namespace.
666 Get a list of all the variables in an engine's namespace.
667
667
668 :Parameters:
668 :Parameters:
669 targets : id or list of ids
669 targets : id or list of ids
670 The engine to use for the execution
670 The engine to use for the execution
671 block : boolean
671 block : boolean
672 If False, this method will return the actual result. If False,
672 If False, this method will return the actual result. If False,
673 a `PendingResult` is returned which can be used to get the result
673 a `PendingResult` is returned which can be used to get the result
674 at a later time.
674 at a later time.
675 """
675 """
676 targets, block = self._findTargetsAndBlock(targets, block)
676 targets, block = self._findTargetsAndBlock(targets, block)
677 return self._blockFromThread(self.smultiengine.keys, targets=targets, block=block)
677 return self._blockFromThread(self.smultiengine.keys, targets=targets, block=block)
678
678
679 def kill(self, controller=False, targets=None, block=None):
679 def kill(self, controller=False, targets=None, block=None):
680 """
680 """
681 Kill the engines and controller.
681 Kill the engines and controller.
682
682
683 This method is used to stop the engine and controller by calling
683 This method is used to stop the engine and controller by calling
684 `reactor.stop`.
684 `reactor.stop`.
685
685
686 :Parameters:
686 :Parameters:
687 controller : boolean
687 controller : boolean
688 If True, kill the engines and controller. If False, just the
688 If True, kill the engines and controller. If False, just the
689 engines
689 engines
690 targets : id or list of ids
690 targets : id or list of ids
691 The engine to use for the execution
691 The engine to use for the execution
692 block : boolean
692 block : boolean
693 If False, this method will return the actual result. If False,
693 If False, this method will return the actual result. If False,
694 a `PendingResult` is returned which can be used to get the result
694 a `PendingResult` is returned which can be used to get the result
695 at a later time.
695 at a later time.
696 """
696 """
697 targets, block = self._findTargetsAndBlock(targets, block)
697 targets, block = self._findTargetsAndBlock(targets, block)
698 return self._blockFromThread(self.smultiengine.kill, controller, targets=targets, block=block)
698 return self._blockFromThread(self.smultiengine.kill, controller, targets=targets, block=block)
699
699
700 def clear_queue(self, targets=None, block=None):
700 def clear_queue(self, targets=None, block=None):
701 """
701 """
702 Clear out the controller's queue for an engine.
702 Clear out the controller's queue for an engine.
703
703
704 The controller maintains a queue for each engine. This clear it out.
704 The controller maintains a queue for each engine. This clear it out.
705
705
706 :Parameters:
706 :Parameters:
707 targets : id or list of ids
707 targets : id or list of ids
708 The engine to use for the execution
708 The engine to use for the execution
709 block : boolean
709 block : boolean
710 If False, this method will return the actual result. If False,
710 If False, this method will return the actual result. If False,
711 a `PendingResult` is returned which can be used to get the result
711 a `PendingResult` is returned which can be used to get the result
712 at a later time.
712 at a later time.
713 """
713 """
714 targets, block = self._findTargetsAndBlock(targets, block)
714 targets, block = self._findTargetsAndBlock(targets, block)
715 return self._blockFromThread(self.smultiengine.clear_queue, targets=targets, block=block)
715 return self._blockFromThread(self.smultiengine.clear_queue, targets=targets, block=block)
716
716
717 def queue_status(self, targets=None, block=None):
717 def queue_status(self, targets=None, block=None):
718 """
718 """
719 Get the status of an engines queue.
719 Get the status of an engines queue.
720
720
721 :Parameters:
721 :Parameters:
722 targets : id or list of ids
722 targets : id or list of ids
723 The engine to use for the execution
723 The engine to use for the execution
724 block : boolean
724 block : boolean
725 If False, this method will return the actual result. If False,
725 If False, this method will return the actual result. If False,
726 a `PendingResult` is returned which can be used to get the result
726 a `PendingResult` is returned which can be used to get the result
727 at a later time.
727 at a later time.
728 """
728 """
729 targets, block = self._findTargetsAndBlock(targets, block)
729 targets, block = self._findTargetsAndBlock(targets, block)
730 return self._blockFromThread(self.smultiengine.queue_status, targets=targets, block=block)
730 return self._blockFromThread(self.smultiengine.queue_status, targets=targets, block=block)
731
731
732 def set_properties(self, properties, targets=None, block=None):
732 def set_properties(self, properties, targets=None, block=None):
733 targets, block = self._findTargetsAndBlock(targets, block)
733 targets, block = self._findTargetsAndBlock(targets, block)
734 return self._blockFromThread(self.smultiengine.set_properties, properties, targets=targets, block=block)
734 return self._blockFromThread(self.smultiengine.set_properties, properties, targets=targets, block=block)
735
735
736 def get_properties(self, keys=None, targets=None, block=None):
736 def get_properties(self, keys=None, targets=None, block=None):
737 targets, block = self._findTargetsAndBlock(targets, block)
737 targets, block = self._findTargetsAndBlock(targets, block)
738 return self._blockFromThread(self.smultiengine.get_properties, keys, targets=targets, block=block)
738 return self._blockFromThread(self.smultiengine.get_properties, keys, targets=targets, block=block)
739
739
740 def has_properties(self, keys, targets=None, block=None):
740 def has_properties(self, keys, targets=None, block=None):
741 targets, block = self._findTargetsAndBlock(targets, block)
741 targets, block = self._findTargetsAndBlock(targets, block)
742 return self._blockFromThread(self.smultiengine.has_properties, keys, targets=targets, block=block)
742 return self._blockFromThread(self.smultiengine.has_properties, keys, targets=targets, block=block)
743
743
744 def del_properties(self, keys, targets=None, block=None):
744 def del_properties(self, keys, targets=None, block=None):
745 targets, block = self._findTargetsAndBlock(targets, block)
745 targets, block = self._findTargetsAndBlock(targets, block)
746 return self._blockFromThread(self.smultiengine.del_properties, keys, targets=targets, block=block)
746 return self._blockFromThread(self.smultiengine.del_properties, keys, targets=targets, block=block)
747
747
748 def clear_properties(self, targets=None, block=None):
748 def clear_properties(self, targets=None, block=None):
749 targets, block = self._findTargetsAndBlock(targets, block)
749 targets, block = self._findTargetsAndBlock(targets, block)
750 return self._blockFromThread(self.smultiengine.clear_properties, targets=targets, block=block)
750 return self._blockFromThread(self.smultiengine.clear_properties, targets=targets, block=block)
751
751
752 #---------------------------------------------------------------------------
752 #---------------------------------------------------------------------------
753 # IMultiEngine related methods
753 # IMultiEngine related methods
754 #---------------------------------------------------------------------------
754 #---------------------------------------------------------------------------
755
755
756 def get_ids(self):
756 def get_ids(self):
757 """
757 """
758 Returns the ids of currently registered engines.
758 Returns the ids of currently registered engines.
759 """
759 """
760 result = blockingCallFromThread(self.smultiengine.get_ids)
760 result = blockingCallFromThread(self.smultiengine.get_ids)
761 return result
761 return result
762
762
763 #---------------------------------------------------------------------------
763 #---------------------------------------------------------------------------
764 # IMultiEngineCoordinator
764 # IMultiEngineCoordinator
765 #---------------------------------------------------------------------------
765 #---------------------------------------------------------------------------
766
766
767 def scatter(self, key, seq, dist='b', flatten=False, targets=None, block=None):
767 def scatter(self, key, seq, dist='b', flatten=False, targets=None, block=None):
768 """
768 """
769 Partition a Python sequence and send the partitions to a set of engines.
769 Partition a Python sequence and send the partitions to a set of engines.
770 """
770 """
771 targets, block = self._findTargetsAndBlock(targets, block)
771 targets, block = self._findTargetsAndBlock(targets, block)
772 return self._blockFromThread(self.smultiengine.scatter, key, seq,
772 return self._blockFromThread(self.smultiengine.scatter, key, seq,
773 dist, flatten, targets=targets, block=block)
773 dist, flatten, targets=targets, block=block)
774
774
775 def gather(self, key, dist='b', targets=None, block=None):
775 def gather(self, key, dist='b', targets=None, block=None):
776 """
776 """
777 Gather a partitioned sequence on a set of engines as a single local seq.
777 Gather a partitioned sequence on a set of engines as a single local seq.
778 """
778 """
779 targets, block = self._findTargetsAndBlock(targets, block)
779 targets, block = self._findTargetsAndBlock(targets, block)
780 return self._blockFromThread(self.smultiengine.gather, key, dist,
780 return self._blockFromThread(self.smultiengine.gather, key, dist,
781 targets=targets, block=block)
781 targets=targets, block=block)
782
782
783 def raw_map(self, func, seq, dist='b', targets=None, block=None):
783 def raw_map(self, func, seq, dist='b', targets=None, block=None):
784 """
784 """
785 A parallelized version of Python's builtin map.
785 A parallelized version of Python's builtin map.
786
786
787 This has a slightly different syntax than the builtin `map`.
787 This has a slightly different syntax than the builtin `map`.
788 This is needed because we need to have keyword arguments and thus
788 This is needed because we need to have keyword arguments and thus
789 can't use *args to capture all the sequences. Instead, they must
789 can't use *args to capture all the sequences. Instead, they must
790 be passed in a list or tuple.
790 be passed in a list or tuple.
791
791
792 raw_map(func, seqs) -> map(func, seqs[0], seqs[1], ...)
792 raw_map(func, seqs) -> map(func, seqs[0], seqs[1], ...)
793
793
794 Most users will want to use parallel functions or the `mapper`
794 Most users will want to use parallel functions or the `mapper`
795 and `map` methods for an API that follows that of the builtin
795 and `map` methods for an API that follows that of the builtin
796 `map`.
796 `map`.
797 """
797 """
798 targets, block = self._findTargetsAndBlock(targets, block)
798 targets, block = self._findTargetsAndBlock(targets, block)
799 return self._blockFromThread(self.smultiengine.raw_map, func, seq,
799 return self._blockFromThread(self.smultiengine.raw_map, func, seq,
800 dist, targets=targets, block=block)
800 dist, targets=targets, block=block)
801
801
802 def map(self, func, *sequences):
802 def map(self, func, *sequences):
803 """
803 """
804 A parallel version of Python's builtin `map` function.
804 A parallel version of Python's builtin `map` function.
805
805
806 This method applies a function to sequences of arguments. It
806 This method applies a function to sequences of arguments. It
807 follows the same syntax as the builtin `map`.
807 follows the same syntax as the builtin `map`.
808
808
809 This method creates a mapper objects by calling `self.mapper` with
809 This method creates a mapper objects by calling `self.mapper` with
810 no arguments and then uses that mapper to do the mapping. See
810 no arguments and then uses that mapper to do the mapping. See
811 the documentation of `mapper` for more details.
811 the documentation of `mapper` for more details.
812 """
812 """
813 return self.mapper().map(func, *sequences)
813 return self.mapper().map(func, *sequences)
814
814
815 def mapper(self, dist='b', targets='all', block=None):
815 def mapper(self, dist='b', targets='all', block=None):
816 """
816 """
817 Create a mapper object that has a `map` method.
817 Create a mapper object that has a `map` method.
818
818
819 This method returns an object that implements the `IMapper`
819 This method returns an object that implements the `IMapper`
820 interface. This method is a factory that is used to control how
820 interface. This method is a factory that is used to control how
821 the map happens.
821 the map happens.
822
822
823 :Parameters:
823 :Parameters:
824 dist : str
824 dist : str
825 What decomposition to use, 'b' is the only one supported
825 What decomposition to use, 'b' is the only one supported
826 currently
826 currently
827 targets : str, int, sequence of ints
827 targets : str, int, sequence of ints
828 Which engines to use for the map
828 Which engines to use for the map
829 block : boolean
829 block : boolean
830 Should calls to `map` block or not
830 Should calls to `map` block or not
831 """
831 """
832 return MultiEngineMapper(self, dist, targets, block)
832 return MultiEngineMapper(self, dist, targets, block)
833
833
834 def parallel(self, dist='b', targets=None, block=None):
834 def parallel(self, dist='b', targets=None, block=None):
835 """
835 """
836 A decorator that turns a function into a parallel function.
836 A decorator that turns a function into a parallel function.
837
837
838 This can be used as:
838 This can be used as:
839
839
840 @parallel()
840 @parallel()
841 def f(x, y)
841 def f(x, y)
842 ...
842 ...
843
843
844 f(range(10), range(10))
844 f(range(10), range(10))
845
845
846 This causes f(0,0), f(1,1), ... to be called in parallel.
846 This causes f(0,0), f(1,1), ... to be called in parallel.
847
847
848 :Parameters:
848 :Parameters:
849 dist : str
849 dist : str
850 What decomposition to use, 'b' is the only one supported
850 What decomposition to use, 'b' is the only one supported
851 currently
851 currently
852 targets : str, int, sequence of ints
852 targets : str, int, sequence of ints
853 Which engines to use for the map
853 Which engines to use for the map
854 block : boolean
854 block : boolean
855 Should calls to `map` block or not
855 Should calls to `map` block or not
856 """
856 """
857 targets, block = self._findTargetsAndBlock(targets, block)
857 targets, block = self._findTargetsAndBlock(targets, block)
858 mapper = self.mapper(dist, targets, block)
858 mapper = self.mapper(dist, targets, block)
859 pf = ParallelFunction(mapper)
859 pf = ParallelFunction(mapper)
860 return pf
860 return pf
861
861
862 #---------------------------------------------------------------------------
862 #---------------------------------------------------------------------------
863 # IMultiEngineExtras
863 # IMultiEngineExtras
864 #---------------------------------------------------------------------------
864 #---------------------------------------------------------------------------
865
865
866 def zip_pull(self, keys, targets=None, block=None):
866 def zip_pull(self, keys, targets=None, block=None):
867 targets, block = self._findTargetsAndBlock(targets, block)
867 targets, block = self._findTargetsAndBlock(targets, block)
868 return self._blockFromThread(self.smultiengine.zip_pull, keys,
868 return self._blockFromThread(self.smultiengine.zip_pull, keys,
869 targets=targets, block=block)
869 targets=targets, block=block)
870
870
871 def run(self, filename, targets=None, block=None):
871 def run(self, filename, targets=None, block=None):
872 """
872 """
873 Run a Python code in a file on the engines.
873 Run a Python code in a file on the engines.
874
874
875 :Parameters:
875 :Parameters:
876 filename : str
876 filename : str
877 The name of the local file to run
877 The name of the local file to run
878 targets : id or list of ids
878 targets : id or list of ids
879 The engine to use for the execution
879 The engine to use for the execution
880 block : boolean
880 block : boolean
881 If False, this method will return the actual result. If False,
881 If False, this method will return the actual result. If False,
882 a `PendingResult` is returned which can be used to get the result
882 a `PendingResult` is returned which can be used to get the result
883 at a later time.
883 at a later time.
884 """
884 """
885 targets, block = self._findTargetsAndBlock(targets, block)
885 targets, block = self._findTargetsAndBlock(targets, block)
886 return self._blockFromThread(self.smultiengine.run, filename,
886 return self._blockFromThread(self.smultiengine.run, filename,
887 targets=targets, block=block)
887 targets=targets, block=block)
888
888
889 def benchmark(self, push_size=10000):
890 """
891 Run performance benchmarks for the current IPython cluster.
892
893 This method tests both the latency of sending command and data to the
894 engines as well as the throughput of sending large objects to the
895 engines using push. The latency is measured by having one or more
896 engines execute the command 'pass'. The throughput is measure by
897 sending an NumPy array of size `push_size` to one or more engines.
898
899 These benchmarks will vary widely on different hardware and networks
900 and thus can be used to get an idea of the performance characteristics
901 of a particular configuration of an IPython controller and engines.
902
903 This function is not testable within our current testing framework.
904 """
905 import timeit, __builtin__
906 __builtin__._mec_self = self
907 benchmarks = {}
908 repeat = 3
909 count = 10
910
911 timer = timeit.Timer('_mec_self.execute("pass",0)')
912 result = 1000*min(timer.repeat(repeat,count))/count
913 benchmarks['single_engine_latency'] = (result,'msec')
914
915 timer = timeit.Timer('_mec_self.execute("pass")')
916 result = 1000*min(timer.repeat(repeat,count))/count
917 benchmarks['all_engine_latency'] = (result,'msec')
918
919 try:
920 import numpy as np
921 except:
922 pass
923 else:
924 timer = timeit.Timer(
925 "_mec_self.push(d)",
926 "import numpy as np; d = dict(a=np.zeros(%r,dtype='float64'))" % push_size
927 )
928 result = min(timer.repeat(repeat,count))/count
929 benchmarks['all_engine_push'] = (1e-6*push_size*8/result, 'MB/sec')
930
931 try:
932 import numpy as np
933 except:
934 pass
935 else:
936 timer = timeit.Timer(
937 "_mec_self.push(d,0)",
938 "import numpy as np; d = dict(a=np.zeros(%r,dtype='float64'))" % push_size
939 )
940 result = min(timer.repeat(repeat,count))/count
941 benchmarks['single_engine_push'] = (1e-6*push_size*8/result, 'MB/sec')
942
943 return benchmarks
889
944
890
945
891 components.registerAdapter(FullBlockingMultiEngineClient,
946 components.registerAdapter(FullBlockingMultiEngineClient,
892 IFullSynchronousMultiEngine, IFullBlockingMultiEngineClient)
947 IFullSynchronousMultiEngine, IFullBlockingMultiEngineClient)
893
948
894
949
895
950
896
951
General Comments 0
You need to be logged in to leave comments. Login now