##// END OF EJS Templates
split get_results into get_result/result_status, add AsyncHubResult
MinRK -
Show More
@@ -1,232 +1,294 b''
1 """AsyncResult objects for the client"""
1 """AsyncResult objects for the client"""
2 #-----------------------------------------------------------------------------
2 #-----------------------------------------------------------------------------
3 # Copyright (C) 2010 The IPython Development Team
3 # Copyright (C) 2010 The IPython Development Team
4 #
4 #
5 # Distributed under the terms of the BSD License. The full license is in
5 # Distributed under the terms of the BSD License. The full license is in
6 # the file COPYING, distributed as part of this software.
6 # the file COPYING, distributed as part of this software.
7 #-----------------------------------------------------------------------------
7 #-----------------------------------------------------------------------------
8
8
9 #-----------------------------------------------------------------------------
9 #-----------------------------------------------------------------------------
10 # Imports
10 # Imports
11 #-----------------------------------------------------------------------------
11 #-----------------------------------------------------------------------------
12
12
13 import time
14
13 from IPython.external.decorator import decorator
15 from IPython.external.decorator import decorator
14 import error
16 import error
15
17
16 #-----------------------------------------------------------------------------
18 #-----------------------------------------------------------------------------
17 # Classes
19 # Classes
18 #-----------------------------------------------------------------------------
20 #-----------------------------------------------------------------------------
19
21
@decorator
def check_ready(f, self, *args, **kwargs):
    """Call spin() to sync state prior to calling the method."""
    # non-blocking poll: refresh self._ready without waiting
    self.wait(0)
    if not self._ready:
        # gate access: accessor methods must not block, so raise instead
        raise error.TimeoutError("result not ready")
    return f(self, *args, **kwargs)
27
29
class AsyncResult(object):
    """Class for representing results of non-blocking calls.

    Provides the same interface as :py:class:`multiprocessing.AsyncResult`.
    """

    # msg_ids whose results this object gathers (class-level default)
    msg_ids = None

    def __init__(self, client, msg_ids, fname=''):
        # client: the Client that submitted the request(s)
        # msg_ids: a single msg_id (str) or a list of msg_ids
        # fname: remote function name, used only for repr and error reporting
        self._client = client
        if isinstance(msg_ids, basestring):
            # normalize a single msg_id to a one-element list
            msg_ids = [msg_ids]
        self.msg_ids = msg_ids
        self._fname=fname
        self._ready = False
        self._success = None
        # single results are unwrapped from their one-element list
        self._single_result = len(msg_ids) == 1

    def __repr__(self):
        if self._ready:
            return "<%s: finished>"%(self.__class__.__name__)
        else:
            return "<%s: %s>"%(self.__class__.__name__,self._fname)


    def _reconstruct_result(self, res):
        """
        Override me in subclasses for turning a list of results
        into the expected form.
        """
        if self._single_result:
            return res[0]
        else:
            return res

    def get(self, timeout=-1):
        """Return the result when it arrives.

        If `timeout` is not ``None`` and the result does not arrive within
        `timeout` seconds then ``TimeoutError`` is raised. If the
        remote call raised an exception then that exception will be reraised
        by get().
        """
        if not self.ready():
            self.wait(timeout)

        if self._ready:
            if self._success:
                return self._result
            else:
                # the remote call failed; re-raise the collected exception
                raise self._exception
        else:
            raise error.TimeoutError("Result not ready.")

    def ready(self):
        """Return whether the call has completed."""
        if not self._ready:
            # non-blocking poll to pick up results that arrived since last check
            self.wait(0)
        return self._ready

    def wait(self, timeout=-1):
        """Wait until the result is available or until `timeout` seconds pass.
        """
        if self._ready:
            return
        self._ready = self._client.barrier(self.msg_ids, timeout)
        if self._ready:
            try:
                results = map(self._client.results.get, self.msg_ids)
                self._result = results
                if self._single_result:
                    r = results[0]
                    if isinstance(r, Exception):
                        # single failed result: raise so it is stored for get()
                        raise r
                else:
                    # multiple results: wrap any failures for collective raising
                    results = error.collect_exceptions(results, self._fname)
                self._result = self._reconstruct_result(results)
            except Exception, e:
                self._exception = e
                self._success = False
            else:
                self._success = True
            finally:
                # metadata is always captured, even when the call failed
                self._metadata = map(self._client.metadata.get, self.msg_ids)


    def successful(self):
        """Return whether the call completed without raising an exception.

        Will raise ``AssertionError`` if the result is not ready.
        """
        assert self._ready
        return self._success

    #----------------------------------------------------------------
    # Extra methods not in mp.pool.AsyncResult
    #----------------------------------------------------------------

    def get_dict(self, timeout=-1):
        """Get the results as a dict, keyed by engine_id."""
        results = self.get(timeout)
        engine_ids = [ md['engine_id'] for md in self._metadata ]
        bycount = sorted(engine_ids, key=lambda k: engine_ids.count(k))
        maxcount = bycount.count(bycount[-1])
        if maxcount > 1:
            # an engine executed more than one of these jobs: keys would collide
            raise ValueError("Cannot build dict, %i jobs ran on engine #%i"%(
                    maxcount, bycount[-1]))

        return dict(zip(engine_ids,results))

    @property
    @check_ready
    def result(self):
        """result property."""
        return self._result

    # abbreviated alias:
    r = result

    @property
    @check_ready
    def metadata(self):
        """metadata property."""
        if self._single_result:
            return self._metadata[0]
        else:
            return self._metadata

    @property
    def result_dict(self):
        """result property as a dict."""
        return self.get_dict(0)

    def __dict__(self):
        # NOTE(review): this shadows the implicit instance __dict__ attribute
        # with a method — presumably intended to support dict(ar); confirm
        # callers before relying on normal __dict__ semantics here.
        return self.get_dict(0)

    #-------------------------------------
    # dict-access
    #-------------------------------------

    @check_ready
    def __getitem__(self, key):
        """getitem returns result value(s) if keyed by int/slice, or metadata if key is str.
        """
        if isinstance(key, int):
            return error.collect_exceptions([self._result[key]], self._fname)[0]
        elif isinstance(key, slice):
            return error.collect_exceptions(self._result[key], self._fname)
        elif isinstance(key, basestring):
            # string keys index into per-message metadata dicts
            values = [ md[key] for md in self._metadata ]
            if self._single_result:
                return values[0]
            else:
                return values
        else:
            raise TypeError("Invalid key type %r, must be 'int','slice', or 'str'"%type(key))

    @check_ready
    def __getattr__(self, key):
        """getattr maps to getitem for convenient access to metadata."""
        if key not in self._metadata[0].keys():
            raise AttributeError("%r object has no attribute %r"%(
                    self.__class__.__name__, key))
        return self.__getitem__(key)

    # asynchronous iterator:
    def __iter__(self):
        if self._single_result:
            raise TypeError("AsyncResults with a single result are not iterable.")
        try:
            # fast path: everything has already arrived
            rlist = self.get(0)
        except error.TimeoutError:
            # wait for each result individually
            for msg_id in self.msg_ids:
                ar = AsyncResult(self._client, msg_id, self._fname)
                yield ar.get()
        else:
            # already done
            for r in rlist:
                yield r
210
192
211
193
212
class AsyncMapResult(AsyncResult):
    """Class for representing results of non-blocking gathers.

    This will properly reconstruct the gather.
    """

    def __init__(self, client, msg_ids, mapObject, fname=''):
        AsyncResult.__init__(self, client, msg_ids, fname=fname)
        # mapObject knows how the input sequence was partitioned across engines
        self._mapObject = mapObject
        # a map result is never unwrapped, even when it ran on a single engine
        self._single_result = False

    def _reconstruct_result(self, res):
        """Perform the gather on the actual results."""
        return self._mapObject.joinPartitions(res)

    # asynchronous iterator:
    def __iter__(self):
        try:
            # fast path: all partitions have already arrived
            rlist = self.get(0)
        except error.TimeoutError:
            # wait for each result individually
            for msg_id in self.msg_ids:
                ar = AsyncResult(self._client, msg_id, self._fname)
                rlist = ar.get()
                try:
                    for r in rlist:
                        yield r
                except TypeError:
                    # flattened, not a list
                    # this could get broken by flattened data that returns iterables
                    # but most calls to map do not expose the `flatten` argument
                    yield rlist
        else:
            # already done
            for r in rlist:
                yield r
249
250
class AsyncHubResult(AsyncResult):
    """Class to wrap pending results that must be requested from the Hub.

    Some (or all) of the msg_ids may belong to requests submitted by other
    clients, so results that are not pending locally are fetched from the
    Hub via ``Client.result_status``.
    """

    def wait(self, timeout=-1):
        """Wait for the result to complete, polling the Hub as needed.

        A negative `timeout` (the default) means wait indefinitely.
        """
        start = time.time()
        if self._ready:
            return
        # first, wait on whatever is still outstanding in *this* client
        local_ids = filter(lambda msg_id: msg_id in self._client.outstanding, self.msg_ids)
        local_ready = self._client.barrier(local_ids, timeout)
        if local_ready:
            # anything missing from the local cache must be asked of the Hub
            remote_ids = filter(lambda msg_id: msg_id not in self._client.results, self.msg_ids)
            if not remote_ids:
                self._ready = True
            else:
                rdict = self._client.result_status(remote_ids, status_only=False)
                pending = rdict['pending']
                # BUG FIX: with the default timeout=-1, the old condition
                # `time.time() < start+timeout` was immediately False, so the
                # loop never waited at all; a negative timeout now means
                # "poll forever", matching barrier() semantics.
                while pending and (timeout < 0 or time.time() < start+timeout):
                    rdict = self._client.result_status(remote_ids, status_only=False)
                    pending = rdict['pending']
                    if pending:
                        time.sleep(0.1)
                if not pending:
                    self._ready = True
        if self._ready:
            try:
                results = map(self._client.results.get, self.msg_ids)
                self._result = results
                if self._single_result:
                    r = results[0]
                    if isinstance(r, Exception):
                        # single failed result: raise so it is stored for get()
                        raise r
                else:
                    # multiple results: wrap any failures for collective raising
                    results = error.collect_exceptions(results, self._fname)
                self._result = self._reconstruct_result(results)
            except Exception as e:
                self._exception = e
                self._success = False
            else:
                self._success = True
            finally:
                # metadata is recorded whether or not the call succeeded
                self._metadata = map(self._client.metadata.get, self.msg_ids)
231
293
# explicit public API of this module
__all__ = ['AsyncResult', 'AsyncMapResult', 'AsyncHubResult']
@@ -1,1378 +1,1497 b''
1 """A semi-synchronous Client for the ZMQ controller"""
1 """A semi-synchronous Client for the ZMQ controller"""
2 #-----------------------------------------------------------------------------
2 #-----------------------------------------------------------------------------
3 # Copyright (C) 2010 The IPython Development Team
3 # Copyright (C) 2010 The IPython Development Team
4 #
4 #
5 # Distributed under the terms of the BSD License. The full license is in
5 # Distributed under the terms of the BSD License. The full license is in
6 # the file COPYING, distributed as part of this software.
6 # the file COPYING, distributed as part of this software.
7 #-----------------------------------------------------------------------------
7 #-----------------------------------------------------------------------------
8
8
9 #-----------------------------------------------------------------------------
9 #-----------------------------------------------------------------------------
10 # Imports
10 # Imports
11 #-----------------------------------------------------------------------------
11 #-----------------------------------------------------------------------------
12
12
13 import os
13 import os
14 import json
14 import json
15 import time
15 import time
16 import warnings
16 import warnings
17 from datetime import datetime
17 from datetime import datetime
18 from getpass import getpass
18 from getpass import getpass
19 from pprint import pprint
19 from pprint import pprint
20
20
21 pjoin = os.path.join
21 pjoin = os.path.join
22
22
23 import zmq
23 import zmq
24 # from zmq.eventloop import ioloop, zmqstream
24 # from zmq.eventloop import ioloop, zmqstream
25
25
26 from IPython.utils.path import get_ipython_dir
26 from IPython.utils.path import get_ipython_dir
27 from IPython.utils.traitlets import (HasTraits, Int, Instance, CUnicode,
27 from IPython.utils.traitlets import (HasTraits, Int, Instance, CUnicode,
28 Dict, List, Bool, Str, Set)
28 Dict, List, Bool, Str, Set)
29 from IPython.external.decorator import decorator
29 from IPython.external.decorator import decorator
30 from IPython.external.ssh import tunnel
30 from IPython.external.ssh import tunnel
31
31
32 import error
32 import error
33 import map as Map
33 import map as Map
34 import streamsession as ss
34 import streamsession as ss
35 from asyncresult import AsyncResult, AsyncMapResult
35 from asyncresult import AsyncResult, AsyncMapResult, AsyncHubResult
36 from clusterdir import ClusterDir, ClusterDirError
36 from clusterdir import ClusterDir, ClusterDirError
37 from dependency import Dependency, depend, require, dependent
37 from dependency import Dependency, depend, require, dependent
38 from remotefunction import remote,parallel,ParallelFunction,RemoteFunction
38 from remotefunction import remote,parallel,ParallelFunction,RemoteFunction
39 from util import ReverseDict, disambiguate_url, validate_url
39 from util import ReverseDict, disambiguate_url, validate_url
40 from view import DirectView, LoadBalancedView
40 from view import DirectView, LoadBalancedView
41
41
42 #--------------------------------------------------------------------------
42 #--------------------------------------------------------------------------
43 # helpers for implementing old MEC API via client.apply
43 # helpers for implementing old MEC API via client.apply
44 #--------------------------------------------------------------------------
44 #--------------------------------------------------------------------------
45
45
46 def _push(ns):
46 def _push(ns):
47 """helper method for implementing `client.push` via `client.apply`"""
47 """helper method for implementing `client.push` via `client.apply`"""
48 globals().update(ns)
48 globals().update(ns)
49
49
50 def _pull(keys):
50 def _pull(keys):
51 """helper method for implementing `client.pull` via `client.apply`"""
51 """helper method for implementing `client.pull` via `client.apply`"""
52 g = globals()
52 g = globals()
53 if isinstance(keys, (list,tuple, set)):
53 if isinstance(keys, (list,tuple, set)):
54 for key in keys:
54 for key in keys:
55 if not g.has_key(key):
55 if not g.has_key(key):
56 raise NameError("name '%s' is not defined"%key)
56 raise NameError("name '%s' is not defined"%key)
57 return map(g.get, keys)
57 return map(g.get, keys)
58 else:
58 else:
59 if not g.has_key(keys):
59 if not g.has_key(keys):
60 raise NameError("name '%s' is not defined"%keys)
60 raise NameError("name '%s' is not defined"%keys)
61 return g.get(keys)
61 return g.get(keys)
62
62
63 def _clear():
63 def _clear():
64 """helper method for implementing `client.clear` via `client.apply`"""
64 """helper method for implementing `client.clear` via `client.apply`"""
65 globals().clear()
65 globals().clear()
66
66
67 def _execute(code):
67 def _execute(code):
68 """helper method for implementing `client.execute` via `client.apply`"""
68 """helper method for implementing `client.execute` via `client.apply`"""
69 exec code in globals()
69 exec code in globals()
70
70
71
71
72 #--------------------------------------------------------------------------
72 #--------------------------------------------------------------------------
73 # Decorators for Client methods
73 # Decorators for Client methods
74 #--------------------------------------------------------------------------
74 #--------------------------------------------------------------------------
75
75
@decorator
def spinfirst(f, self, *args, **kwargs):
    """Call spin() to sync state prior to calling the method."""
    # flush incoming results/registrations so the method sees fresh state
    self.spin()
    return f(self, *args, **kwargs)
81
81
@decorator
def defaultblock(f, self, *args, **kwargs):
    """Default to self.block; preserve self.block."""
    # honor an explicit block=... kwarg; otherwise keep the client default
    requested = kwargs.get('block', None)
    previous = self.block
    self.block = previous if requested is None else requested
    try:
        return f(self, *args, **kwargs)
    finally:
        # always restore the client's original blocking mode
        self.block = previous
94
94
95
95
96 #--------------------------------------------------------------------------
96 #--------------------------------------------------------------------------
97 # Classes
97 # Classes
98 #--------------------------------------------------------------------------
98 #--------------------------------------------------------------------------
99
99
class Metadata(dict):
    """Subclass of dict for initializing metadata values.

    Attribute access works on keys.

    These objects have a strict set of keys - errors will raise if you try
    to add new keys.
    """
    def __init__(self, *args, **kwargs):
        dict.__init__(self)
        # the fixed schema: every Metadata starts with exactly these keys
        md = {'msg_id' : None,
              'submitted' : None,
              'started' : None,
              'completed' : None,
              'received' : None,
              'engine_uuid' : None,
              'engine_id' : None,
              'follow' : None,
              'after' : None,
              'status' : None,

              'pyin' : None,
              'pyout' : None,
              'pyerr' : None,
              'stdout' : '',
              'stderr' : '',
            }
        self.update(md)
        # NOTE: dict.update bypasses the strict __setitem__, so initializer
        # arguments are not key-checked; strictness applies to later writes.
        self.update(dict(*args, **kwargs))

    def __getattr__(self, key):
        """getattr aliased to getitem"""
        # `key in self` is O(1) and works on Python 3, unlike the old
        # `key in self.iterkeys()` which linearly scanned an iterator
        if key in self:
            return self[key]
        else:
            raise AttributeError(key)

    def __setattr__(self, key, value):
        """setattr aliased to setitem, with strict"""
        if key in self:
            self[key] = value
        else:
            raise AttributeError(key)

    def __setitem__(self, key, value):
        """strict static key enforcement"""
        if key in self:
            dict.__setitem__(self, key, value)
        else:
            raise KeyError(key)
150
150
151
151
152 class Client(HasTraits):
152 class Client(HasTraits):
153 """A semi-synchronous client to the IPython ZMQ controller
153 """A semi-synchronous client to the IPython ZMQ controller
154
154
155 Parameters
155 Parameters
156 ----------
156 ----------
157
157
158 url_or_file : bytes; zmq url or path to ipcontroller-client.json
158 url_or_file : bytes; zmq url or path to ipcontroller-client.json
159 Connection information for the Hub's registration. If a json connector
159 Connection information for the Hub's registration. If a json connector
160 file is given, then likely no further configuration is necessary.
160 file is given, then likely no further configuration is necessary.
161 [Default: use profile]
161 [Default: use profile]
162 profile : bytes
162 profile : bytes
163 The name of the Cluster profile to be used to find connector information.
163 The name of the Cluster profile to be used to find connector information.
164 [Default: 'default']
164 [Default: 'default']
165 context : zmq.Context
165 context : zmq.Context
166 Pass an existing zmq.Context instance, otherwise the client will create its own.
166 Pass an existing zmq.Context instance, otherwise the client will create its own.
167 username : bytes
167 username : bytes
168 set username to be passed to the Session object
168 set username to be passed to the Session object
169 debug : bool
169 debug : bool
170 flag for lots of message printing for debug purposes
170 flag for lots of message printing for debug purposes
171
171
172 #-------------- ssh related args ----------------
172 #-------------- ssh related args ----------------
173 # These are args for configuring the ssh tunnel to be used
173 # These are args for configuring the ssh tunnel to be used
174 # credentials are used to forward connections over ssh to the Controller
174 # credentials are used to forward connections over ssh to the Controller
175 # Note that the ip given in `addr` needs to be relative to sshserver
175 # Note that the ip given in `addr` needs to be relative to sshserver
176 # The most basic case is to leave addr as pointing to localhost (127.0.0.1),
176 # The most basic case is to leave addr as pointing to localhost (127.0.0.1),
177 # and set sshserver as the same machine the Controller is on. However,
177 # and set sshserver as the same machine the Controller is on. However,
178 # the only requirement is that sshserver is able to see the Controller
178 # the only requirement is that sshserver is able to see the Controller
179 # (i.e. is within the same trusted network).
179 # (i.e. is within the same trusted network).
180
180
181 sshserver : str
181 sshserver : str
182 A string of the form passed to ssh, i.e. 'server.tld' or 'user@server.tld:port'
182 A string of the form passed to ssh, i.e. 'server.tld' or 'user@server.tld:port'
183 If keyfile or password is specified, and this is not, it will default to
183 If keyfile or password is specified, and this is not, it will default to
184 the ip given in addr.
184 the ip given in addr.
185 sshkey : str; path to public ssh key file
185 sshkey : str; path to public ssh key file
186 This specifies a key to be used in ssh login, default None.
186 This specifies a key to be used in ssh login, default None.
187 Regular default ssh keys will be used without specifying this argument.
187 Regular default ssh keys will be used without specifying this argument.
188 password : str
188 password : str
189 Your ssh password to sshserver. Note that if this is left None,
189 Your ssh password to sshserver. Note that if this is left None,
190 you will be prompted for it if passwordless key based login is unavailable.
190 you will be prompted for it if passwordless key based login is unavailable.
191 paramiko : bool
191 paramiko : bool
192 flag for whether to use paramiko instead of shell ssh for tunneling.
192 flag for whether to use paramiko instead of shell ssh for tunneling.
193 [default: True on win32, False else]
193 [default: True on win32, False else]
194
194
195 #------- exec authentication args -------
195 #------- exec authentication args -------
196 # If even localhost is untrusted, you can have some protection against
196 # If even localhost is untrusted, you can have some protection against
197 # unauthorized execution by using a key. Messages are still sent
197 # unauthorized execution by using a key. Messages are still sent
198 # as cleartext, so if someone can snoop your loopback traffic this will
198 # as cleartext, so if someone can snoop your loopback traffic this will
199 # not help against malicious attacks.
199 # not help against malicious attacks.
200
200
201 exec_key : str
201 exec_key : str
202 an authentication key or file containing a key
202 an authentication key or file containing a key
203 default: None
203 default: None
204
204
205
205
206 Attributes
206 Attributes
207 ----------
207 ----------
208
208
209 ids : set of int engine IDs
209 ids : set of int engine IDs
210 requesting the ids attribute always synchronizes
210 requesting the ids attribute always synchronizes
211 the registration state. To request ids without synchronization,
211 the registration state. To request ids without synchronization,
212 use semi-private _ids attributes.
212 use semi-private _ids attributes.
213
213
214 history : list of msg_ids
214 history : list of msg_ids
215 a list of msg_ids, keeping track of all the execution
215 a list of msg_ids, keeping track of all the execution
216 messages you have submitted in order.
216 messages you have submitted in order.
217
217
218 outstanding : set of msg_ids
218 outstanding : set of msg_ids
219 a set of msg_ids that have been submitted, but whose
219 a set of msg_ids that have been submitted, but whose
220 results have not yet been received.
220 results have not yet been received.
221
221
222 results : dict
222 results : dict
223 a dict of all our results, keyed by msg_id
223 a dict of all our results, keyed by msg_id
224
224
225 block : bool
225 block : bool
226 determines default behavior when block not specified
226 determines default behavior when block not specified
227 in execution methods
227 in execution methods
228
228
229 Methods
229 Methods
230 -------
230 -------
231
231
232 spin
232 spin
233 flushes incoming results and registration state changes
233 flushes incoming results and registration state changes
234 control methods spin, and requesting `ids` also ensures up to date
234 control methods spin, and requesting `ids` also ensures up to date
235
235
236 barrier
236 barrier
237 wait on one or more msg_ids
237 wait on one or more msg_ids
238
238
239 execution methods
239 execution methods
240 apply
240 apply
241 legacy: execute, run
241 legacy: execute, run
242
242
243 query methods
243 query methods
244 queue_status, get_result, purge
244 queue_status, get_result, purge
245
245
246 control methods
246 control methods
247 abort, shutdown
247 abort, shutdown
248
248
249 """
249 """
250
250
251
251
# Default execution behavior: when True, execution methods wait for results.
block = Bool(False)
outstanding=Set()   # msg_ids submitted but whose results have not arrived
results = Dict()    # completed results, keyed by msg_id
metadata = Dict()   # per-msg_id execution metadata, keyed by msg_id
history = List()    # every msg_id ever submitted, in order
debug = Bool(False) # when True, pprint every message sent/received
profile=CUnicode('default')  # cluster profile name used to locate connection info

# Private traits: connection state and the per-channel ZMQ sockets.
_ids = List()            # registered engine ids; use the `ids` property to sync first
_connected=Bool(False)   # guard so _connect() only runs once
_ssh=Bool(False)         # whether connections are tunnelled over ssh
_context = Instance('zmq.Context')
_config = Dict()         # connection config from args / JSON connection file
_engines=Instance(ReverseDict, (), {})  # engine id <-> uuid bidirectional map
_registration_socket=Instance('zmq.Socket')
_query_socket=Instance('zmq.Socket')
_control_socket=Instance('zmq.Socket')
_iopub_socket=Instance('zmq.Socket')
_notification_socket=Instance('zmq.Socket')
_mux_socket=Instance('zmq.Socket')
_task_socket=Instance('zmq.Socket')
_task_scheme=Str()       # task scheduler scheme; 'pure' is disabled on engine loss
_balanced_views=Dict()   # presumably cached load-balanced views -- populated elsewhere; confirm
_direct_views=Dict()     # presumably cached direct views -- populated elsewhere; confirm
_closed = False          # set by close(); prevents double-close
277
277
def __init__(self, url_or_file=None, profile='default', cluster_dir=None, ipython_dir=None,
        context=None, username=None, debug=False, exec_key=None,
        sshserver=None, sshkey=None, password=None, paramiko=None,
        ):
    """Set up a client connection to a controller.

    Connection info comes from `url_or_file` (a registration URL or a
    JSON connection file), or is located via `profile`/`cluster_dir`.
    The ssh*/paramiko arguments configure optional ssh tunneling;
    `exec_key`/`username` configure the message-signing session.
    """
    super(Client, self).__init__(debug=debug, profile=profile)
    if context is None:
        context = zmq.Context()
    self._context = context
    
    
    self._setup_cluster_dir(profile, cluster_dir, ipython_dir)
    if self._cd is not None:
        if url_or_file is None:
            # default to the connection file written by the controller
            url_or_file = pjoin(self._cd.security_dir, 'ipcontroller-client.json')
    assert url_or_file is not None, "I can't find enough information to connect to a controller!"\
        " Please specify at least one of url_or_file or profile."
    
    try:
        validate_url(url_or_file)
    except AssertionError:
        # not a url: treat it as a JSON connection file, possibly
        # relative to the cluster dir's security directory
        if not os.path.exists(url_or_file):
            if self._cd:
                url_or_file = os.path.join(self._cd.security_dir, url_or_file)
            assert os.path.exists(url_or_file), "Not a valid connection file or url: %r"%url_or_file
        with open(url_or_file) as f:
            cfg = json.loads(f.read())
    else:
        cfg = {'url':url_or_file}
    
    # sync defaults from args, json:
    if sshserver:
        cfg['ssh'] = sshserver
    if exec_key:
        cfg['exec_key'] = exec_key
    # NOTE(review): these lookups assume 'exec_key' and 'ssh' keys exist in
    # cfg (true for controller-written JSON files); a bare url with neither
    # argument given looks like it would raise KeyError here -- confirm
    exec_key = cfg['exec_key']
    sshserver=cfg['ssh']
    url = cfg['url']
    location = cfg.setdefault('location', None)
    cfg['url'] = disambiguate_url(cfg['url'], location)
    url = cfg['url']
    
    self._config = cfg
    
    self._ssh = bool(sshserver or sshkey or password)
    if self._ssh and sshserver is None:
        # default to ssh via localhost
        sshserver = url.split('://')[1].split(':')[0]
    if self._ssh and password is None:
        if tunnel.try_passwordless_ssh(sshserver, sshkey, paramiko):
            password=False
        else:
            password = getpass("SSH Password for %s: "%sshserver)
    ssh_kwargs = dict(keyfile=sshkey, password=password, paramiko=paramiko)
    # exec_key may be the key itself or a path to a key file
    if exec_key is not None and os.path.isfile(exec_key):
        arg = 'keyfile'
    else:
        arg = 'key'
    key_arg = {arg:exec_key}
    if username is None:
        self.session = ss.StreamSession(**key_arg)
    else:
        self.session = ss.StreamSession(username, **key_arg)
    # the registration (XREQ) socket is connected first; the remaining
    # channels are wired up in _connect() below
    self._registration_socket = self._context.socket(zmq.XREQ)
    self._registration_socket.setsockopt(zmq.IDENTITY, self.session.session)
    if self._ssh:
        tunnel.tunnel_connection(self._registration_socket, url, sshserver, **ssh_kwargs)
    else:
        self._registration_socket.connect(url)
    
    self.session.debug = self.debug
    
    # dispatch tables for incoming messages, used by the _flush_* pumps
    self._notification_handlers = {'registration_notification' : self._register_engine,
                                'unregistration_notification' : self._unregister_engine,
                                }
    self._queue_handlers = {'execute_reply' : self._handle_execute_reply,
                            'apply_reply' : self._handle_apply_reply}
    self._connect(sshserver, ssh_kwargs)
355
355
356
356
def _setup_cluster_dir(self, profile, cluster_dir, ipython_dir):
    """Locate the ClusterDir holding our connection info; sets self._cd.

    NOTE(review): on ClusterDirError the lookup silently falls through,
    leaving self._cd at its prior/default value; it is only explicitly
    set to None when neither cluster_dir nor profile is given.
    """
    if ipython_dir is None:
        ipython_dir = get_ipython_dir()
    if cluster_dir is not None:
        # an explicit cluster dir wins over the profile
        try:
            self._cd = ClusterDir.find_cluster_dir(cluster_dir)
        except ClusterDirError:
            pass
    elif profile is not None:
        try:
            self._cd = ClusterDir.find_cluster_dir_by_profile(
                ipython_dir, profile)
        except ClusterDirError:
            pass
    else:
        self._cd = None
373
373
@property
def ids(self):
    """Always up-to-date ids property."""
    # flush registration/unregistration notifications first, so the
    # returned list reflects the controller's current engine set
    self._flush_notifications()
    return self._ids
379
379
def close(self):
    """Close all of our zmq sockets and mark this client unusable.

    Idempotent: later calls are no-ops. Every attribute whose name ends
    in 'socket' is closed; unset (None) sockets -- e.g. a task socket
    torn down by _stop_scheduling_tasks -- are skipped instead of
    raising AttributeError as the old code did.
    """
    if self._closed:
        return
    snames = filter(lambda n: n.endswith('socket'), dir(self))
    for socket in map(lambda name: getattr(self, name), snames):
        # a channel may have been disabled (set to None); skip it
        if socket is not None:
            socket.close()
    self._closed = True
387
387
388 def _update_engines(self, engines):
388 def _update_engines(self, engines):
389 """Update our engines dict and _ids from a dict of the form: {id:uuid}."""
389 """Update our engines dict and _ids from a dict of the form: {id:uuid}."""
390 for k,v in engines.iteritems():
390 for k,v in engines.iteritems():
391 eid = int(k)
391 eid = int(k)
392 self._engines[eid] = bytes(v) # force not unicode
392 self._engines[eid] = bytes(v) # force not unicode
393 self._ids.append(eid)
393 self._ids.append(eid)
394 self._ids = sorted(self._ids)
394 self._ids = sorted(self._ids)
395 if sorted(self._engines.keys()) != range(len(self._engines)) and \
395 if sorted(self._engines.keys()) != range(len(self._engines)) and \
396 self._task_scheme == 'pure' and self._task_socket:
396 self._task_scheme == 'pure' and self._task_socket:
397 self._stop_scheduling_tasks()
397 self._stop_scheduling_tasks()
398
398
399 def _stop_scheduling_tasks(self):
399 def _stop_scheduling_tasks(self):
400 """Stop scheduling tasks because an engine has been unregistered
400 """Stop scheduling tasks because an engine has been unregistered
401 from a pure ZMQ scheduler.
401 from a pure ZMQ scheduler.
402 """
402 """
403
403
404 self._task_socket.close()
404 self._task_socket.close()
405 self._task_socket = None
405 self._task_socket = None
406 msg = "An engine has been unregistered, and we are using pure " +\
406 msg = "An engine has been unregistered, and we are using pure " +\
407 "ZMQ task scheduling. Task farming will be disabled."
407 "ZMQ task scheduling. Task farming will be disabled."
408 if self.outstanding:
408 if self.outstanding:
409 msg += " If you were running tasks when this happened, " +\
409 msg += " If you were running tasks when this happened, " +\
410 "some `outstanding` msg_ids may never resolve."
410 "some `outstanding` msg_ids may never resolve."
411 warnings.warn(msg, RuntimeWarning)
411 warnings.warn(msg, RuntimeWarning)
412
412
413 def _build_targets(self, targets):
413 def _build_targets(self, targets):
414 """Turn valid target IDs or 'all' into two lists:
414 """Turn valid target IDs or 'all' into two lists:
415 (int_ids, uuids).
415 (int_ids, uuids).
416 """
416 """
417 if targets is None:
417 if targets is None:
418 targets = self._ids
418 targets = self._ids
419 elif isinstance(targets, str):
419 elif isinstance(targets, str):
420 if targets.lower() == 'all':
420 if targets.lower() == 'all':
421 targets = self._ids
421 targets = self._ids
422 else:
422 else:
423 raise TypeError("%r not valid str target, must be 'all'"%(targets))
423 raise TypeError("%r not valid str target, must be 'all'"%(targets))
424 elif isinstance(targets, int):
424 elif isinstance(targets, int):
425 targets = [targets]
425 targets = [targets]
426 return [self._engines[t] for t in targets], list(targets)
426 return [self._engines[t] for t in targets], list(targets)
427
427
def _connect(self, sshserver, ssh_kwargs):
    """setup all our socket connections to the controller. This is called from
    __init__.

    Sends a connection_request over the registration socket, then
    connects one socket per channel the controller advertises in its
    reply (mux, task, notification, query, control, iopub).
    """
    
    # Maybe allow reconnecting?
    if self._connected:
        return
    self._connected=True

    def connect_socket(s, url):
        # resolve the url relative to the controller's location,
        # tunneling over ssh when configured
        url = disambiguate_url(url, self._config['location'])
        if self._ssh:
            return tunnel.tunnel_connection(s, url, sshserver, **ssh_kwargs)
        else:
            return s.connect(url)
    
    # ask the controller which channels are available (blocking recv)
    self.session.send(self._registration_socket, 'connection_request')
    idents,msg = self.session.recv(self._registration_socket,mode=0)
    if self.debug:
        pprint(msg)
    msg = ss.Message(msg)
    content = msg.content
    self._config['registration'] = dict(content)
    if content.status == 'ok':
        if content.mux:
            self._mux_socket = self._context.socket(zmq.PAIR)
            self._mux_socket.setsockopt(zmq.IDENTITY, self.session.session)
            connect_socket(self._mux_socket, content.mux)
        if content.task:
            # content.task carries (scheme, address)
            self._task_scheme, task_addr = content.task
            self._task_socket = self._context.socket(zmq.PAIR)
            self._task_socket.setsockopt(zmq.IDENTITY, self.session.session)
            connect_socket(self._task_socket, task_addr)
        if content.notification:
            # SUB socket, subscribed to everything
            self._notification_socket = self._context.socket(zmq.SUB)
            connect_socket(self._notification_socket, content.notification)
            self._notification_socket.setsockopt(zmq.SUBSCRIBE, "")
        if content.query:
            self._query_socket = self._context.socket(zmq.PAIR)
            self._query_socket.setsockopt(zmq.IDENTITY, self.session.session)
            connect_socket(self._query_socket, content.query)
        if content.control:
            self._control_socket = self._context.socket(zmq.PAIR)
            self._control_socket.setsockopt(zmq.IDENTITY, self.session.session)
            connect_socket(self._control_socket, content.control)
        if content.iopub:
            self._iopub_socket = self._context.socket(zmq.SUB)
            self._iopub_socket.setsockopt(zmq.SUBSCRIBE, '')
            self._iopub_socket.setsockopt(zmq.IDENTITY, self.session.session)
            connect_socket(self._iopub_socket, content.iopub)
        # seed our engine table from the controller's current registry
        self._update_engines(dict(content.engines))
    else:
        self._connected = False
        raise Exception("Failed to connect!")
483
483
484 #--------------------------------------------------------------------------
484 #--------------------------------------------------------------------------
485 # handlers and callbacks for incoming messages
485 # handlers and callbacks for incoming messages
486 #--------------------------------------------------------------------------
486 #--------------------------------------------------------------------------
487
487
def _unwrap_exception(self, content):
    """unwrap exception, and remap engineid to int."""
    remote_err = ss.unwrap_exception(content)
    engine_info = remote_err.engine_info
    if engine_info:
        # the wire format identifies engines by uuid; translate to the
        # integer id users see
        engine_info['engineid'] = self._engines[engine_info['engineid']]
    return remote_err
496
488 def _register_engine(self, msg):
497 def _register_engine(self, msg):
489 """Register a new engine, and update our connection info."""
498 """Register a new engine, and update our connection info."""
490 content = msg['content']
499 content = msg['content']
491 eid = content['id']
500 eid = content['id']
492 d = {eid : content['queue']}
501 d = {eid : content['queue']}
493 self._update_engines(d)
502 self._update_engines(d)
494
503
495 def _unregister_engine(self, msg):
504 def _unregister_engine(self, msg):
496 """Unregister an engine that has died."""
505 """Unregister an engine that has died."""
497 content = msg['content']
506 content = msg['content']
498 eid = int(content['id'])
507 eid = int(content['id'])
499 if eid in self._ids:
508 if eid in self._ids:
500 self._ids.remove(eid)
509 self._ids.remove(eid)
501 self._engines.pop(eid)
510 self._engines.pop(eid)
502 if self._task_socket and self._task_scheme == 'pure':
511 if self._task_socket and self._task_scheme == 'pure':
503 self._stop_scheduling_tasks()
512 self._stop_scheduling_tasks()
504
513
505 def _extract_metadata(self, header, parent, content):
514 def _extract_metadata(self, header, parent, content):
506 md = {'msg_id' : parent['msg_id'],
515 md = {'msg_id' : parent['msg_id'],
507 'received' : datetime.now(),
516 'received' : datetime.now(),
508 'engine_uuid' : header.get('engine', None),
517 'engine_uuid' : header.get('engine', None),
509 'follow' : parent.get('follow', []),
518 'follow' : parent.get('follow', []),
510 'after' : parent.get('after', []),
519 'after' : parent.get('after', []),
511 'status' : content['status'],
520 'status' : content['status'],
512 }
521 }
513
522
514 if md['engine_uuid'] is not None:
523 if md['engine_uuid'] is not None:
515 md['engine_id'] = self._engines.get(md['engine_uuid'], None)
524 md['engine_id'] = self._engines.get(md['engine_uuid'], None)
516
525
517 if 'date' in parent:
526 if 'date' in parent:
518 md['submitted'] = datetime.strptime(parent['date'], ss.ISO8601)
527 md['submitted'] = datetime.strptime(parent['date'], ss.ISO8601)
519 if 'started' in header:
528 if 'started' in header:
520 md['started'] = datetime.strptime(header['started'], ss.ISO8601)
529 md['started'] = datetime.strptime(header['started'], ss.ISO8601)
521 if 'date' in header:
530 if 'date' in header:
522 md['completed'] = datetime.strptime(header['date'], ss.ISO8601)
531 md['completed'] = datetime.strptime(header['date'], ss.ISO8601)
523 return md
532 return md
524
533
def _handle_execute_reply(self, msg):
    """Save the reply to an execute_request into our results.

    execute messages are never actually used. apply is used instead.
    """
    parent = msg['parent_header']
    msg_id = parent['msg_id']
    if msg_id not in self.outstanding:
        # a reply we were not waiting for: report it and drop it
        if msg_id in self.history:
            print ("got stale result: %s"%msg_id)
        else:
            print ("got unknown result: %s"%msg_id)
    else:
        self.outstanding.remove(msg_id)
        # NOTE(review): the content is unwrapped as an exception regardless
        # of status; acceptable since execute replies are unused -- confirm
        self.results[msg_id] = self._unwrap_exception(msg['content'])
541
550
def _handle_apply_reply(self, msg):
    """Save the reply to an apply_request into our results."""
    parent = msg['parent_header']
    msg_id = parent['msg_id']
    if msg_id not in self.outstanding:
        # a reply we were not waiting for: report it and drop it
        if msg_id in self.history:
            print ("got stale result: %s"%msg_id)
            print self.results[msg_id]
            print msg
        else:
            print ("got unknown result: %s"%msg_id)
    else:
        self.outstanding.remove(msg_id)
        content = msg['content']
        header = msg['header']
        
        # construct metadata:
        md = self.metadata.setdefault(msg_id, Metadata())
        md.update(self._extract_metadata(header, parent, content))
        self.metadata[msg_id] = md
        
        # construct result:
        if content['status'] == 'ok':
            # the actual return value travels in the message buffers
            self.results[msg_id] = ss.unserialize_object(msg['buffers'])[0]
        elif content['status'] == 'aborted':
            self.results[msg_id] = error.AbortedTask(msg_id)
        elif content['status'] == 'resubmitted':
            # TODO: handle resubmission
            pass
        else:
            # remote raised: store the remapped exception as the result
            self.results[msg_id] = self._unwrap_exception(content)
578
582
def _flush_notifications(self):
    """Flush notifications of engine registrations waiting
    in ZMQ queue, dispatching each to its handler.
    """
    # non-blocking drain: recv returns None once the queue is empty
    msg = self.session.recv(self._notification_socket, mode=zmq.NOBLOCK)
    while msg is not None:
        if self.debug:
            pprint(msg)
        msg = msg[-1]
        msg_type = msg['msg_type']
        handler = self._notification_handlers.get(msg_type, None)
        if handler is None:
            # msg is a plain dict here; the old `msg.msg_type` attribute
            # access would itself raise AttributeError, masking this error
            raise Exception("Unhandled message type: %s"%msg_type)
        else:
            handler(msg)
        msg = self.session.recv(self._notification_socket, mode=zmq.NOBLOCK)
594
598
def _flush_results(self, sock):
    """Flush task or queue results waiting in ZMQ queue,
    dispatching each reply to its handler.
    """
    # non-blocking drain: recv returns None once the queue is empty
    msg = self.session.recv(sock, mode=zmq.NOBLOCK)
    while msg is not None:
        if self.debug:
            pprint(msg)
        msg = msg[-1]
        msg_type = msg['msg_type']
        handler = self._queue_handlers.get(msg_type, None)
        if handler is None:
            # msg is a plain dict here; the old `msg.msg_type` attribute
            # access would itself raise AttributeError, masking this error
            raise Exception("Unhandled message type: %s"%msg_type)
        else:
            handler(msg)
        msg = self.session.recv(sock, mode=zmq.NOBLOCK)
609
613
def _flush_control(self, sock):
    """Flush replies from the control channel waiting
    in the ZMQ queue.

    Currently: ignore them."""
    # drain and discard; callers that need control replies read them
    # synchronously themselves
    while True:
        reply = self.session.recv(sock, mode=zmq.NOBLOCK)
        if reply is None:
            break
        if self.debug:
            pprint(reply)
620
624
def _flush_iopub(self, sock):
    """Flush replies from the iopub channel waiting
    in the ZMQ queue, folding them into per-msg_id metadata.
    """
    # non-blocking drain: recv returns None once the queue is empty
    msg = self.session.recv(sock, mode=zmq.NOBLOCK)
    while msg is not None:
        if self.debug:
            pprint(msg)
        msg = msg[-1]
        parent = msg['parent_header']
        msg_id = parent['msg_id']
        content = msg['content']
        header = msg['header']
        msg_type = msg['msg_type']
        
        # init metadata:
        md = self.metadata.setdefault(msg_id, Metadata())
        
        if msg_type == 'stream':
            # append stream output (e.g. stdout/stderr) to any prior text
            name = content['name']
            s = md[name] or ''
            md[name] = s + content['data']
        elif msg_type == 'pyerr':
            md.update({'pyerr' : self._unwrap_exception(content)})
        else:
            # e.g. pyin/pyout: store the payload under the message type
            md.update({msg_type : content['data']})
        
        self.metadata[msg_id] = md
        
        msg = self.session.recv(sock, mode=zmq.NOBLOCK)
651
655
652 #--------------------------------------------------------------------------
656 #--------------------------------------------------------------------------
653 # len, getitem
657 # len, getitem
654 #--------------------------------------------------------------------------
658 #--------------------------------------------------------------------------
655
659
656 def __len__(self):
660 def __len__(self):
657 """len(client) returns # of engines."""
661 """len(client) returns # of engines."""
658 return len(self.ids)
662 return len(self.ids)
659
663
def __getitem__(self, key):
    """index access returns DirectView multiplexer objects

    Must be int, slice, or list/tuple/xrange of ints"""
    if isinstance(key, (int, slice, tuple, list, xrange)):
        return self.view(key, balanced=False)
    raise TypeError("key by int/slice/iterable of ints only, not %s"%(type(key)))
668
672
669 #--------------------------------------------------------------------------
673 #--------------------------------------------------------------------------
670 # Begin public methods
674 # Begin public methods
671 #--------------------------------------------------------------------------
675 #--------------------------------------------------------------------------
672
676
def spin(self):
    """Flush any registration notifications and execution results
    waiting in the ZMQ queue.
    """
    if self._notification_socket:
        self._flush_notifications()
    # both result channels are pumped through the same handler
    for results_sock in (self._mux_socket, self._task_socket):
        if results_sock:
            self._flush_results(results_sock)
    if self._control_socket:
        self._flush_control(self._control_socket)
    if self._iopub_socket:
        self._flush_iopub(self._iopub_socket)
687
691
def barrier(self, jobs=None, timeout=-1):
    """waits on one or more `jobs`, for up to `timeout` seconds.

    Parameters
    ----------

    jobs : int, str, or list of ints and/or strs, or one or more AsyncResult objects
        ints are indices to self.history
        strs are msg_ids
        default: wait on all outstanding messages
    timeout : float
        a time in seconds, after which to give up.
        default is -1, which means no timeout

    Returns
    -------

    True : when all msg_ids are done
    False : timeout reached, some msg_ids still outstanding
    """
    start = time.time()
    if jobs is None:
        theids = self.outstanding
    else:
        if isinstance(jobs, (int, str, AsyncResult)):
            jobs = [jobs]
        theids = set()
        for job in jobs:
            if isinstance(job, int):
                # index access
                job = self.history[job]
            elif isinstance(job, AsyncResult):
                map(theids.add, job.msg_ids)
                continue
            theids.add(job)
    # fast path: nothing we are waiting on is still outstanding
    if not theids.intersection(self.outstanding):
        return True
    self.spin()
    while theids.intersection(self.outstanding):
        if timeout >= 0 and (time.time() - start) > timeout:
            break
        time.sleep(1e-3)
        self.spin()
    return len(theids.intersection(self.outstanding)) == 0
731
736
732 #--------------------------------------------------------------------------
737 #--------------------------------------------------------------------------
733 # Control methods
738 # Control methods
734 #--------------------------------------------------------------------------
739 #--------------------------------------------------------------------------
735
740
736 @spinfirst
741 @spinfirst
737 @defaultblock
742 @defaultblock
738 def clear(self, targets=None, block=None):
743 def clear(self, targets=None, block=None):
739 """Clear the namespace in target(s)."""
744 """Clear the namespace in target(s)."""
740 targets = self._build_targets(targets)[0]
745 targets = self._build_targets(targets)[0]
741 for t in targets:
746 for t in targets:
742 self.session.send(self._control_socket, 'clear_request', content={}, ident=t)
747 self.session.send(self._control_socket, 'clear_request', content={}, ident=t)
743 error = False
748 error = False
744 if self.block:
749 if self.block:
745 for i in range(len(targets)):
750 for i in range(len(targets)):
746 idents,msg = self.session.recv(self._control_socket,0)
751 idents,msg = self.session.recv(self._control_socket,0)
747 if self.debug:
752 if self.debug:
748 pprint(msg)
753 pprint(msg)
749 if msg['content']['status'] != 'ok':
754 if msg['content']['status'] != 'ok':
750 error = ss.unwrap_exception(msg['content'])
755 error = self._unwrap_exception(msg['content'])
751 if error:
756 if error:
752 return error
757 return error
753
758
754
759
755 @spinfirst
760 @spinfirst
756 @defaultblock
761 @defaultblock
757 def abort(self, msg_ids = None, targets=None, block=None):
762 def abort(self, jobs=None, targets=None, block=None):
758 """Abort the execution queues of target(s)."""
763 """Abort specific jobs from the execution queues of target(s).
764
765 This is a mechanism to prevent jobs that have already been submitted
766 from executing.
767
768 Parameters
769 ----------
770
771 jobs : msg_id, list of msg_ids, or AsyncResult
772 The jobs to be aborted
773
774
775 """
759 targets = self._build_targets(targets)[0]
776 targets = self._build_targets(targets)[0]
760 if isinstance(msg_ids, basestring):
777 msg_ids = []
761 msg_ids = [msg_ids]
778 if isinstance(jobs, (basestring,AsyncResult)):
779 jobs = [jobs]
780 bad_ids = filter(lambda obj: not isinstance(obj, (basestring, AsyncResult)), jobs)
781 if bad_ids:
782 raise TypeError("Invalid msg_id type %r, expected str or AsyncResult"%bad_ids[0])
783 for j in jobs:
784 if isinstance(j, AsyncResult):
785 msg_ids.extend(j.msg_ids)
786 else:
787 msg_ids.append(j)
762 content = dict(msg_ids=msg_ids)
788 content = dict(msg_ids=msg_ids)
763 for t in targets:
789 for t in targets:
764 self.session.send(self._control_socket, 'abort_request',
790 self.session.send(self._control_socket, 'abort_request',
765 content=content, ident=t)
791 content=content, ident=t)
766 error = False
792 error = False
767 if self.block:
793 if self.block:
768 for i in range(len(targets)):
794 for i in range(len(targets)):
769 idents,msg = self.session.recv(self._control_socket,0)
795 idents,msg = self.session.recv(self._control_socket,0)
770 if self.debug:
796 if self.debug:
771 pprint(msg)
797 pprint(msg)
772 if msg['content']['status'] != 'ok':
798 if msg['content']['status'] != 'ok':
773 error = ss.unwrap_exception(msg['content'])
799 error = self._unwrap_exception(msg['content'])
774 if error:
800 if error:
775 return error
801 return error
776
802
777 @spinfirst
803 @spinfirst
778 @defaultblock
804 @defaultblock
779 def shutdown(self, targets=None, restart=False, controller=False, block=None):
805 def shutdown(self, targets=None, restart=False, controller=False, block=None):
780 """Terminates one or more engine processes, optionally including the controller."""
806 """Terminates one or more engine processes, optionally including the controller."""
781 if controller:
807 if controller:
782 targets = 'all'
808 targets = 'all'
783 targets = self._build_targets(targets)[0]
809 targets = self._build_targets(targets)[0]
784 for t in targets:
810 for t in targets:
785 self.session.send(self._control_socket, 'shutdown_request',
811 self.session.send(self._control_socket, 'shutdown_request',
786 content={'restart':restart},ident=t)
812 content={'restart':restart},ident=t)
787 error = False
813 error = False
788 if block or controller:
814 if block or controller:
789 for i in range(len(targets)):
815 for i in range(len(targets)):
790 idents,msg = self.session.recv(self._control_socket,0)
816 idents,msg = self.session.recv(self._control_socket,0)
791 if self.debug:
817 if self.debug:
792 pprint(msg)
818 pprint(msg)
793 if msg['content']['status'] != 'ok':
819 if msg['content']['status'] != 'ok':
794 error = ss.unwrap_exception(msg['content'])
820 error = self._unwrap_exception(msg['content'])
795
821
796 if controller:
822 if controller:
797 time.sleep(0.25)
823 time.sleep(0.25)
798 self.session.send(self._query_socket, 'shutdown_request')
824 self.session.send(self._query_socket, 'shutdown_request')
799 idents,msg = self.session.recv(self._query_socket, 0)
825 idents,msg = self.session.recv(self._query_socket, 0)
800 if self.debug:
826 if self.debug:
801 pprint(msg)
827 pprint(msg)
802 if msg['content']['status'] != 'ok':
828 if msg['content']['status'] != 'ok':
803 error = ss.unwrap_exception(msg['content'])
829 error = self._unwrap_exception(msg['content'])
804
830
805 if error:
831 if error:
806 raise error
832 raise error
807
833
808 #--------------------------------------------------------------------------
834 #--------------------------------------------------------------------------
809 # Execution methods
835 # Execution methods
810 #--------------------------------------------------------------------------
836 #--------------------------------------------------------------------------
811
837
812 @defaultblock
838 @defaultblock
813 def execute(self, code, targets='all', block=None):
839 def execute(self, code, targets='all', block=None):
814 """Executes `code` on `targets` in blocking or nonblocking manner.
840 """Executes `code` on `targets` in blocking or nonblocking manner.
815
841
816 ``execute`` is always `bound` (affects engine namespace)
842 ``execute`` is always `bound` (affects engine namespace)
817
843
818 Parameters
844 Parameters
819 ----------
845 ----------
820
846
821 code : str
847 code : str
822 the code string to be executed
848 the code string to be executed
823 targets : int/str/list of ints/strs
849 targets : int/str/list of ints/strs
824 the engines on which to execute
850 the engines on which to execute
825 default : all
851 default : all
826 block : bool
852 block : bool
827 whether or not to wait until done to return
853 whether or not to wait until done to return
828 default: self.block
854 default: self.block
829 """
855 """
830 result = self.apply(_execute, (code,), targets=targets, block=self.block, bound=True, balanced=False)
856 result = self.apply(_execute, (code,), targets=targets, block=block, bound=True, balanced=False)
831 return result
857 if not block:
858 return result
832
859
833 def run(self, filename, targets='all', block=None):
860 def run(self, filename, targets='all', block=None):
834 """Execute contents of `filename` on engine(s).
861 """Execute contents of `filename` on engine(s).
835
862
836 This simply reads the contents of the file and calls `execute`.
863 This simply reads the contents of the file and calls `execute`.
837
864
838 Parameters
865 Parameters
839 ----------
866 ----------
840
867
841 filename : str
868 filename : str
842 The path to the file
869 The path to the file
843 targets : int/str/list of ints/strs
870 targets : int/str/list of ints/strs
844 the engines on which to execute
871 the engines on which to execute
845 default : all
872 default : all
846 block : bool
873 block : bool
847 whether or not to wait until done
874 whether or not to wait until done
848 default: self.block
875 default: self.block
849
876
850 """
877 """
851 with open(filename, 'rb') as f:
878 with open(filename, 'rb') as f:
852 code = f.read()
879 code = f.read()
853 return self.execute(code, targets=targets, block=block)
880 return self.execute(code, targets=targets, block=block)
854
881
855 def _maybe_raise(self, result):
882 def _maybe_raise(self, result):
856 """wrapper for maybe raising an exception if apply failed."""
883 """wrapper for maybe raising an exception if apply failed."""
857 if isinstance(result, error.RemoteError):
884 if isinstance(result, error.RemoteError):
858 raise result
885 raise result
859
886
860 return result
887 return result
861
888
862 def _build_dependency(self, dep):
889 def _build_dependency(self, dep):
863 """helper for building jsonable dependencies from various input forms"""
890 """helper for building jsonable dependencies from various input forms"""
864 if isinstance(dep, Dependency):
891 if isinstance(dep, Dependency):
865 return dep.as_dict()
892 return dep.as_dict()
866 elif isinstance(dep, AsyncResult):
893 elif isinstance(dep, AsyncResult):
867 return dep.msg_ids
894 return dep.msg_ids
868 elif dep is None:
895 elif dep is None:
869 return []
896 return []
870 else:
897 else:
871 # pass to Dependency constructor
898 # pass to Dependency constructor
872 return list(Dependency(dep))
899 return list(Dependency(dep))
873
900
874 @defaultblock
901 @defaultblock
875 def apply(self, f, args=None, kwargs=None, bound=True, block=None,
902 def apply(self, f, args=None, kwargs=None, bound=True, block=None,
876 targets=None, balanced=None,
903 targets=None, balanced=None,
877 after=None, follow=None, timeout=None):
904 after=None, follow=None, timeout=None):
878 """Call `f(*args, **kwargs)` on a remote engine(s), returning the result.
905 """Call `f(*args, **kwargs)` on a remote engine(s), returning the result.
879
906
880 This is the central execution command for the client.
907 This is the central execution command for the client.
881
908
882 Parameters
909 Parameters
883 ----------
910 ----------
884
911
885 f : function
912 f : function
886 The fuction to be called remotely
913 The fuction to be called remotely
887 args : tuple/list
914 args : tuple/list
888 The positional arguments passed to `f`
915 The positional arguments passed to `f`
889 kwargs : dict
916 kwargs : dict
890 The keyword arguments passed to `f`
917 The keyword arguments passed to `f`
891 bound : bool (default: True)
918 bound : bool (default: True)
892 Whether to execute in the Engine(s) namespace, or in a clean
919 Whether to execute in the Engine(s) namespace, or in a clean
893 namespace not affecting the engine.
920 namespace not affecting the engine.
894 block : bool (default: self.block)
921 block : bool (default: self.block)
895 Whether to wait for the result, or return immediately.
922 Whether to wait for the result, or return immediately.
896 False:
923 False:
897 returns AsyncResult
924 returns AsyncResult
898 True:
925 True:
899 returns actual result(s) of f(*args, **kwargs)
926 returns actual result(s) of f(*args, **kwargs)
900 if multiple targets:
927 if multiple targets:
901 list of results, matching `targets`
928 list of results, matching `targets`
902 targets : int,list of ints, 'all', None
929 targets : int,list of ints, 'all', None
903 Specify the destination of the job.
930 Specify the destination of the job.
904 if None:
931 if None:
905 Submit via Task queue for load-balancing.
932 Submit via Task queue for load-balancing.
906 if 'all':
933 if 'all':
907 Run on all active engines
934 Run on all active engines
908 if list:
935 if list:
909 Run on each specified engine
936 Run on each specified engine
910 if int:
937 if int:
911 Run on single engine
938 Run on single engine
912
939
913 balanced : bool, default None
940 balanced : bool, default None
914 whether to load-balance. This will default to True
941 whether to load-balance. This will default to True
915 if targets is unspecified, or False if targets is specified.
942 if targets is unspecified, or False if targets is specified.
916
943
917 The following arguments are only used when balanced is True:
944 The following arguments are only used when balanced is True:
918 after : Dependency or collection of msg_ids
945 after : Dependency or collection of msg_ids
919 Only for load-balanced execution (targets=None)
946 Only for load-balanced execution (targets=None)
920 Specify a list of msg_ids as a time-based dependency.
947 Specify a list of msg_ids as a time-based dependency.
921 This job will only be run *after* the dependencies
948 This job will only be run *after* the dependencies
922 have been met.
949 have been met.
923
950
924 follow : Dependency or collection of msg_ids
951 follow : Dependency or collection of msg_ids
925 Only for load-balanced execution (targets=None)
952 Only for load-balanced execution (targets=None)
926 Specify a list of msg_ids as a location-based dependency.
953 Specify a list of msg_ids as a location-based dependency.
927 This job will only be run on an engine where this dependency
954 This job will only be run on an engine where this dependency
928 is met.
955 is met.
929
956
930 timeout : float/int or None
957 timeout : float/int or None
931 Only for load-balanced execution (targets=None)
958 Only for load-balanced execution (targets=None)
932 Specify an amount of time (in seconds) for the scheduler to
959 Specify an amount of time (in seconds) for the scheduler to
933 wait for dependencies to be met before failing with a
960 wait for dependencies to be met before failing with a
934 DependencyTimeout.
961 DependencyTimeout.
935
962
936 after,follow,timeout only used if `balanced=True`.
963 after,follow,timeout only used if `balanced=True`.
937
964
938 Returns
965 Returns
939 -------
966 -------
940
967
941 if block is False:
968 if block is False:
942 return AsyncResult wrapping msg_ids
969 return AsyncResult wrapping msg_ids
943 output of AsyncResult.get() is identical to that of `apply(...block=True)`
970 output of AsyncResult.get() is identical to that of `apply(...block=True)`
944 else:
971 else:
945 if single target:
972 if single target:
946 return result of `f(*args, **kwargs)`
973 return result of `f(*args, **kwargs)`
947 else:
974 else:
948 return list of results, matching `targets`
975 return list of results, matching `targets`
949 """
976 """
950 assert not self._closed, "cannot use me anymore, I'm closed!"
977 assert not self._closed, "cannot use me anymore, I'm closed!"
951 # defaults:
978 # defaults:
952 block = block if block is not None else self.block
979 block = block if block is not None else self.block
953 args = args if args is not None else []
980 args = args if args is not None else []
954 kwargs = kwargs if kwargs is not None else {}
981 kwargs = kwargs if kwargs is not None else {}
955
982
956 if balanced is None:
983 if balanced is None:
957 if targets is None:
984 if targets is None:
958 # default to balanced if targets unspecified
985 # default to balanced if targets unspecified
959 balanced = True
986 balanced = True
960 else:
987 else:
961 # otherwise default to multiplexing
988 # otherwise default to multiplexing
962 balanced = False
989 balanced = False
963
990
964 if targets is None and balanced is False:
991 if targets is None and balanced is False:
965 # default to all if *not* balanced, and targets is unspecified
992 # default to all if *not* balanced, and targets is unspecified
966 targets = 'all'
993 targets = 'all'
967
994
968 # enforce types of f,args,kwrags
995 # enforce types of f,args,kwrags
969 if not callable(f):
996 if not callable(f):
970 raise TypeError("f must be callable, not %s"%type(f))
997 raise TypeError("f must be callable, not %s"%type(f))
971 if not isinstance(args, (tuple, list)):
998 if not isinstance(args, (tuple, list)):
972 raise TypeError("args must be tuple or list, not %s"%type(args))
999 raise TypeError("args must be tuple or list, not %s"%type(args))
973 if not isinstance(kwargs, dict):
1000 if not isinstance(kwargs, dict):
974 raise TypeError("kwargs must be dict, not %s"%type(kwargs))
1001 raise TypeError("kwargs must be dict, not %s"%type(kwargs))
975
1002
976 options = dict(bound=bound, block=block, targets=targets)
1003 options = dict(bound=bound, block=block, targets=targets)
977
1004
978 if balanced:
1005 if balanced:
979 return self._apply_balanced(f, args, kwargs, timeout=timeout,
1006 return self._apply_balanced(f, args, kwargs, timeout=timeout,
980 after=after, follow=follow, **options)
1007 after=after, follow=follow, **options)
981 elif follow or after or timeout:
1008 elif follow or after or timeout:
982 msg = "follow, after, and timeout args are only used for"
1009 msg = "follow, after, and timeout args are only used for"
983 msg += " load-balanced execution."
1010 msg += " load-balanced execution."
984 raise ValueError(msg)
1011 raise ValueError(msg)
985 else:
1012 else:
986 return self._apply_direct(f, args, kwargs, **options)
1013 return self._apply_direct(f, args, kwargs, **options)
987
1014
988 def _apply_balanced(self, f, args, kwargs, bound=None, block=None, targets=None,
1015 def _apply_balanced(self, f, args, kwargs, bound=None, block=None, targets=None,
989 after=None, follow=None, timeout=None):
1016 after=None, follow=None, timeout=None):
990 """call f(*args, **kwargs) remotely in a load-balanced manner.
1017 """call f(*args, **kwargs) remotely in a load-balanced manner.
991
1018
992 This is a private method, see `apply` for details.
1019 This is a private method, see `apply` for details.
993 Not to be called directly!
1020 Not to be called directly!
994 """
1021 """
995
1022
996 loc = locals()
1023 loc = locals()
997 for name in ('bound', 'block'):
1024 for name in ('bound', 'block'):
998 assert loc[name] is not None, "kwarg %r must be specified!"%name
1025 assert loc[name] is not None, "kwarg %r must be specified!"%name
999
1026
1000 if self._task_socket is None:
1027 if self._task_socket is None:
1001 msg = "Task farming is disabled"
1028 msg = "Task farming is disabled"
1002 if self._task_scheme == 'pure':
1029 if self._task_scheme == 'pure':
1003 msg += " because the pure ZMQ scheduler cannot handle"
1030 msg += " because the pure ZMQ scheduler cannot handle"
1004 msg += " disappearing engines."
1031 msg += " disappearing engines."
1005 raise RuntimeError(msg)
1032 raise RuntimeError(msg)
1006
1033
1007 if self._task_scheme == 'pure':
1034 if self._task_scheme == 'pure':
1008 # pure zmq scheme doesn't support dependencies
1035 # pure zmq scheme doesn't support dependencies
1009 msg = "Pure ZMQ scheduler doesn't support dependencies"
1036 msg = "Pure ZMQ scheduler doesn't support dependencies"
1010 if (follow or after):
1037 if (follow or after):
1011 # hard fail on DAG dependencies
1038 # hard fail on DAG dependencies
1012 raise RuntimeError(msg)
1039 raise RuntimeError(msg)
1013 if isinstance(f, dependent):
1040 if isinstance(f, dependent):
1014 # soft warn on functional dependencies
1041 # soft warn on functional dependencies
1015 warnings.warn(msg, RuntimeWarning)
1042 warnings.warn(msg, RuntimeWarning)
1016
1043
1017 # defaults:
1044 # defaults:
1018 args = args if args is not None else []
1045 args = args if args is not None else []
1019 kwargs = kwargs if kwargs is not None else {}
1046 kwargs = kwargs if kwargs is not None else {}
1020
1047
1021 if targets:
1048 if targets:
1022 idents,_ = self._build_targets(targets)
1049 idents,_ = self._build_targets(targets)
1023 else:
1050 else:
1024 idents = []
1051 idents = []
1025
1052
1026 after = self._build_dependency(after)
1053 after = self._build_dependency(after)
1027 follow = self._build_dependency(follow)
1054 follow = self._build_dependency(follow)
1028 subheader = dict(after=after, follow=follow, timeout=timeout, targets=idents)
1055 subheader = dict(after=after, follow=follow, timeout=timeout, targets=idents)
1029 bufs = ss.pack_apply_message(f,args,kwargs)
1056 bufs = ss.pack_apply_message(f,args,kwargs)
1030 content = dict(bound=bound)
1057 content = dict(bound=bound)
1031
1058
1032 msg = self.session.send(self._task_socket, "apply_request",
1059 msg = self.session.send(self._task_socket, "apply_request",
1033 content=content, buffers=bufs, subheader=subheader)
1060 content=content, buffers=bufs, subheader=subheader)
1034 msg_id = msg['msg_id']
1061 msg_id = msg['msg_id']
1035 self.outstanding.add(msg_id)
1062 self.outstanding.add(msg_id)
1036 self.history.append(msg_id)
1063 self.history.append(msg_id)
1037 ar = AsyncResult(self, [msg_id], fname=f.__name__)
1064 ar = AsyncResult(self, [msg_id], fname=f.__name__)
1038 if block:
1065 if block:
1039 try:
1066 try:
1040 return ar.get()
1067 return ar.get()
1041 except KeyboardInterrupt:
1068 except KeyboardInterrupt:
1042 return ar
1069 return ar
1043 else:
1070 else:
1044 return ar
1071 return ar
1045
1072
1046 def _apply_direct(self, f, args, kwargs, bound=None, block=None, targets=None):
1073 def _apply_direct(self, f, args, kwargs, bound=None, block=None, targets=None):
1047 """Then underlying method for applying functions to specific engines
1074 """Then underlying method for applying functions to specific engines
1048 via the MUX queue.
1075 via the MUX queue.
1049
1076
1050 This is a private method, see `apply` for details.
1077 This is a private method, see `apply` for details.
1051 Not to be called directly!
1078 Not to be called directly!
1052 """
1079 """
1053 loc = locals()
1080 loc = locals()
1054 for name in ('bound', 'block', 'targets'):
1081 for name in ('bound', 'block', 'targets'):
1055 assert loc[name] is not None, "kwarg %r must be specified!"%name
1082 assert loc[name] is not None, "kwarg %r must be specified!"%name
1056
1083
1057 idents,targets = self._build_targets(targets)
1084 idents,targets = self._build_targets(targets)
1058
1085
1059 subheader = {}
1086 subheader = {}
1060 content = dict(bound=bound)
1087 content = dict(bound=bound)
1061 bufs = ss.pack_apply_message(f,args,kwargs)
1088 bufs = ss.pack_apply_message(f,args,kwargs)
1062
1089
1063 msg_ids = []
1090 msg_ids = []
1064 for ident in idents:
1091 for ident in idents:
1065 msg = self.session.send(self._mux_socket, "apply_request",
1092 msg = self.session.send(self._mux_socket, "apply_request",
1066 content=content, buffers=bufs, ident=ident, subheader=subheader)
1093 content=content, buffers=bufs, ident=ident, subheader=subheader)
1067 msg_id = msg['msg_id']
1094 msg_id = msg['msg_id']
1068 self.outstanding.add(msg_id)
1095 self.outstanding.add(msg_id)
1069 self.history.append(msg_id)
1096 self.history.append(msg_id)
1070 msg_ids.append(msg_id)
1097 msg_ids.append(msg_id)
1071 ar = AsyncResult(self, msg_ids, fname=f.__name__)
1098 ar = AsyncResult(self, msg_ids, fname=f.__name__)
1072 if block:
1099 if block:
1073 try:
1100 try:
1074 return ar.get()
1101 return ar.get()
1075 except KeyboardInterrupt:
1102 except KeyboardInterrupt:
1076 return ar
1103 return ar
1077 else:
1104 else:
1078 return ar
1105 return ar
1079
1106
1080 #--------------------------------------------------------------------------
1107 #--------------------------------------------------------------------------
1081 # construct a View object
1108 # construct a View object
1082 #--------------------------------------------------------------------------
1109 #--------------------------------------------------------------------------
1083
1110
1084 @defaultblock
1111 @defaultblock
1085 def remote(self, bound=True, block=None, targets=None, balanced=None):
1112 def remote(self, bound=True, block=None, targets=None, balanced=None):
1086 """Decorator for making a RemoteFunction"""
1113 """Decorator for making a RemoteFunction"""
1087 return remote(self, bound=bound, targets=targets, block=block, balanced=balanced)
1114 return remote(self, bound=bound, targets=targets, block=block, balanced=balanced)
1088
1115
1089 @defaultblock
1116 @defaultblock
1090 def parallel(self, dist='b', bound=True, block=None, targets=None, balanced=None):
1117 def parallel(self, dist='b', bound=True, block=None, targets=None, balanced=None):
1091 """Decorator for making a ParallelFunction"""
1118 """Decorator for making a ParallelFunction"""
1092 return parallel(self, bound=bound, targets=targets, block=block, balanced=balanced)
1119 return parallel(self, bound=bound, targets=targets, block=block, balanced=balanced)
1093
1120
1094 def _cache_view(self, targets, balanced):
1121 def _cache_view(self, targets, balanced):
1095 """save views, so subsequent requests don't create new objects."""
1122 """save views, so subsequent requests don't create new objects."""
1096 if balanced:
1123 if balanced:
1097 view_class = LoadBalancedView
1124 view_class = LoadBalancedView
1098 view_cache = self._balanced_views
1125 view_cache = self._balanced_views
1099 else:
1126 else:
1100 view_class = DirectView
1127 view_class = DirectView
1101 view_cache = self._direct_views
1128 view_cache = self._direct_views
1102
1129
1103 # use str, since often targets will be a list
1130 # use str, since often targets will be a list
1104 key = str(targets)
1131 key = str(targets)
1105 if key not in view_cache:
1132 if key not in view_cache:
1106 view_cache[key] = view_class(client=self, targets=targets)
1133 view_cache[key] = view_class(client=self, targets=targets)
1107
1134
1108 return view_cache[key]
1135 return view_cache[key]
1109
1136
1110 def view(self, targets=None, balanced=None):
1137 def view(self, targets=None, balanced=None):
1111 """Method for constructing View objects.
1138 """Method for constructing View objects.
1112
1139
1113 If no arguments are specified, create a LoadBalancedView
1140 If no arguments are specified, create a LoadBalancedView
1114 using all engines. If only `targets` specified, it will
1141 using all engines. If only `targets` specified, it will
1115 be a DirectView. This method is the underlying implementation
1142 be a DirectView. This method is the underlying implementation
1116 of ``client.__getitem__``.
1143 of ``client.__getitem__``.
1117
1144
1118 Parameters
1145 Parameters
1119 ----------
1146 ----------
1120
1147
1121 targets: list,slice,int,etc. [default: use all engines]
1148 targets: list,slice,int,etc. [default: use all engines]
1122 The engines to use for the View
1149 The engines to use for the View
1123 balanced : bool [default: False if targets specified, True else]
1150 balanced : bool [default: False if targets specified, True else]
1124 whether to build a LoadBalancedView or a DirectView
1151 whether to build a LoadBalancedView or a DirectView
1125
1152
1126 """
1153 """
1127
1154
1128 balanced = (targets is None) if balanced is None else balanced
1155 balanced = (targets is None) if balanced is None else balanced
1129
1156
1130 if targets is None:
1157 if targets is None:
1131 if balanced:
1158 if balanced:
1132 return self._cache_view(None,True)
1159 return self._cache_view(None,True)
1133 else:
1160 else:
1134 targets = slice(None)
1161 targets = slice(None)
1135
1162
1136 if isinstance(targets, int):
1163 if isinstance(targets, int):
1164 if targets < 0:
1165 targets = self.ids[targets]
1137 if targets not in self.ids:
1166 if targets not in self.ids:
1138 raise IndexError("No such engine: %i"%targets)
1167 raise IndexError("No such engine: %i"%targets)
1139 return self._cache_view(targets, balanced)
1168 return self._cache_view(targets, balanced)
1140
1169
1141 if isinstance(targets, slice):
1170 if isinstance(targets, slice):
1142 indices = range(len(self.ids))[targets]
1171 indices = range(len(self.ids))[targets]
1143 ids = sorted(self._ids)
1172 ids = sorted(self._ids)
1144 targets = [ ids[i] for i in indices ]
1173 targets = [ ids[i] for i in indices ]
1145
1174
1146 if isinstance(targets, (tuple, list, xrange)):
1175 if isinstance(targets, (tuple, list, xrange)):
1147 _,targets = self._build_targets(list(targets))
1176 _,targets = self._build_targets(list(targets))
1148 return self._cache_view(targets, balanced)
1177 return self._cache_view(targets, balanced)
1149 else:
1178 else:
1150 raise TypeError("targets by int/slice/collection of ints only, not %s"%(type(targets)))
1179 raise TypeError("targets by int/slice/collection of ints only, not %s"%(type(targets)))
1151
1180
1152 #--------------------------------------------------------------------------
1181 #--------------------------------------------------------------------------
1153 # Data movement
1182 # Data movement
1154 #--------------------------------------------------------------------------
1183 #--------------------------------------------------------------------------
1155
1184
1156 @defaultblock
1185 @defaultblock
1157 def push(self, ns, targets='all', block=None):
1186 def push(self, ns, targets='all', block=None):
1158 """Push the contents of `ns` into the namespace on `target`"""
1187 """Push the contents of `ns` into the namespace on `target`"""
1159 if not isinstance(ns, dict):
1188 if not isinstance(ns, dict):
1160 raise TypeError("Must be a dict, not %s"%type(ns))
1189 raise TypeError("Must be a dict, not %s"%type(ns))
1161 result = self.apply(_push, (ns,), targets=targets, block=block, bound=True, balanced=False)
1190 result = self.apply(_push, (ns,), targets=targets, block=block, bound=True, balanced=False)
1162 return result
1191 if not block:
1192 return result
1163
1193
1164 @defaultblock
1194 @defaultblock
1165 def pull(self, keys, targets='all', block=None):
1195 def pull(self, keys, targets='all', block=None):
1166 """Pull objects from `target`'s namespace by `keys`"""
1196 """Pull objects from `target`'s namespace by `keys`"""
1167 if isinstance(keys, str):
1197 if isinstance(keys, str):
1168 pass
1198 pass
1169 elif isinstance(keys, (list,tuple,set)):
1199 elif isinstance(keys, (list,tuple,set)):
1170 for key in keys:
1200 for key in keys:
1171 if not isinstance(key, str):
1201 if not isinstance(key, str):
1172 raise TypeError
1202 raise TypeError
1173 result = self.apply(_pull, (keys,), targets=targets, block=block, bound=True, balanced=False)
1203 result = self.apply(_pull, (keys,), targets=targets, block=block, bound=True, balanced=False)
1174 return result
1204 return result
1175
1205
1176 @defaultblock
1206 @defaultblock
1177 def scatter(self, key, seq, dist='b', flatten=False, targets='all', block=None):
1207 def scatter(self, key, seq, dist='b', flatten=False, targets='all', block=None):
1178 """
1208 """
1179 Partition a Python sequence and send the partitions to a set of engines.
1209 Partition a Python sequence and send the partitions to a set of engines.
1180 """
1210 """
1181 targets = self._build_targets(targets)[-1]
1211 targets = self._build_targets(targets)[-1]
1182 mapObject = Map.dists[dist]()
1212 mapObject = Map.dists[dist]()
1183 nparts = len(targets)
1213 nparts = len(targets)
1184 msg_ids = []
1214 msg_ids = []
1185 for index, engineid in enumerate(targets):
1215 for index, engineid in enumerate(targets):
1186 partition = mapObject.getPartition(seq, index, nparts)
1216 partition = mapObject.getPartition(seq, index, nparts)
1187 if flatten and len(partition) == 1:
1217 if flatten and len(partition) == 1:
1188 r = self.push({key: partition[0]}, targets=engineid, block=False)
1218 r = self.push({key: partition[0]}, targets=engineid, block=False)
1189 else:
1219 else:
1190 r = self.push({key: partition}, targets=engineid, block=False)
1220 r = self.push({key: partition}, targets=engineid, block=False)
1191 msg_ids.extend(r.msg_ids)
1221 msg_ids.extend(r.msg_ids)
1192 r = AsyncResult(self, msg_ids, fname='scatter')
1222 r = AsyncResult(self, msg_ids, fname='scatter')
1193 if block:
1223 if block:
1194 return r.get()
1224 r.get()
1195 else:
1225 else:
1196 return r
1226 return r
1197
1227
1198 @defaultblock
1228 @defaultblock
1199 def gather(self, key, dist='b', targets='all', block=None):
1229 def gather(self, key, dist='b', targets='all', block=None):
1200 """
1230 """
1201 Gather a partitioned sequence on a set of engines as a single local seq.
1231 Gather a partitioned sequence on a set of engines as a single local seq.
1202 """
1232 """
1203
1233
1204 targets = self._build_targets(targets)[-1]
1234 targets = self._build_targets(targets)[-1]
1205 mapObject = Map.dists[dist]()
1235 mapObject = Map.dists[dist]()
1206 msg_ids = []
1236 msg_ids = []
1207 for index, engineid in enumerate(targets):
1237 for index, engineid in enumerate(targets):
1208 msg_ids.extend(self.pull(key, targets=engineid,block=False).msg_ids)
1238 msg_ids.extend(self.pull(key, targets=engineid,block=False).msg_ids)
1209
1239
1210 r = AsyncMapResult(self, msg_ids, mapObject, fname='gather')
1240 r = AsyncMapResult(self, msg_ids, mapObject, fname='gather')
1211 if block:
1241 if block:
1212 return r.get()
1242 return r.get()
1213 else:
1243 else:
1214 return r
1244 return r
1215
1245
1216 #--------------------------------------------------------------------------
1246 #--------------------------------------------------------------------------
1217 # Query methods
1247 # Query methods
1218 #--------------------------------------------------------------------------
1248 #--------------------------------------------------------------------------
1219
1249
1220 @spinfirst
1250 @spinfirst
1221 def get_results(self, msg_ids, status_only=False):
1251 @defaultblock
1222 """Returns the result of the execute or task request with `msg_ids`.
1252 def get_result(self, indices_or_msg_ids=None, block=None):
1253 """Retrieve a result by msg_id or history index, wrapped in an AsyncResult object.
1254
1255 If the client already has the results, no request to the Hub will be made.
1256
1257 This is a convenient way to construct AsyncResult objects, which are wrappers
1258 that include metadata about execution, and allow for awaiting results that
1259 were not submitted by this Client.
1260
1261 It can also be a convenient way to retrieve the metadata associated with
1262 blocking execution, since it always retrieves
1263
1264 Examples
1265 --------
1266 ::
1267
1268 In [10]: r = client.apply()
1223
1269
1224 Parameters
1270 Parameters
1225 ----------
1271 ----------
1226
1272
1227 msg_ids : list of ints or msg_ids
1273 indices_or_msg_ids : integer history index, str msg_id, or list of either
1274 The indices or msg_ids of indices to be retrieved
1275
1276 block : bool
1277 Whether to wait for the result to be done
1278
1279 Returns
1280 -------
1281
1282 AsyncResult
1283 A single AsyncResult object will always be returned.
1284
1285 AsyncHubResult
1286 A subclass of AsyncResult that retrieves results from the Hub
1287
1288 """
1289 if indices_or_msg_ids is None:
1290 indices_or_msg_ids = -1
1291
1292 if not isinstance(indices_or_msg_ids, (list,tuple)):
1293 indices_or_msg_ids = [indices_or_msg_ids]
1294
1295 theids = []
1296 for id in indices_or_msg_ids:
1297 if isinstance(id, int):
1298 id = self.history[id]
1299 if not isinstance(id, str):
1300 raise TypeError("indices must be str or int, not %r"%id)
1301 theids.append(id)
1302
1303 local_ids = filter(lambda msg_id: msg_id in self.history or msg_id in self.results, theids)
1304 remote_ids = filter(lambda msg_id: msg_id not in local_ids, theids)
1305
1306 if remote_ids:
1307 ar = AsyncHubResult(self, msg_ids=theids)
1308 else:
1309 ar = AsyncResult(self, msg_ids=theids)
1310
1311 if block:
1312 ar.wait()
1313
1314 return ar
1315
1316 @spinfirst
1317 def result_status(self, msg_ids, status_only=True):
1318 """Check on the status of the result(s) of the apply request with `msg_ids`.
1319
1320 If status_only is False, then the actual results will be retrieved, else
1321 only the status of the results will be checked.
1322
1323 Parameters
1324 ----------
1325
1326 msg_ids : list of msg_ids
1228 if int:
1327 if int:
1229 Passed as index to self.history for convenience.
1328 Passed as index to self.history for convenience.
1230 status_only : bool (default: False)
1329 status_only : bool (default: True)
1231 if False:
1330 if False:
1232 return the actual results
1331 Retrieve the actual results of completed tasks.
1233
1332
1234 Returns
1333 Returns
1235 -------
1334 -------
1236
1335
1237 results : dict
1336 results : dict
1238 There will always be the keys 'pending' and 'completed', which will
1337 There will always be the keys 'pending' and 'completed', which will
1239 be lists of msg_ids.
1338 be lists of msg_ids that are incomplete or complete. If `status_only`
1339 is False, then completed results will be keyed by their `msg_id`.
1240 """
1340 """
1241 if not isinstance(msg_ids, (list,tuple)):
1341 if not isinstance(indices_or_msg_ids, (list,tuple)):
1242 msg_ids = [msg_ids]
1342 indices_or_msg_ids = [indices_or_msg_ids]
1343
1243 theids = []
1344 theids = []
1244 for msg_id in msg_ids:
1345 for msg_id in indices_or_msg_ids:
1245 if isinstance(msg_id, int):
1346 if isinstance(msg_id, int):
1246 msg_id = self.history[msg_id]
1347 msg_id = self.history[msg_id]
1247 if not isinstance(msg_id, str):
1348 if not isinstance(msg_id, basestring):
1248 raise TypeError("msg_ids must be str, not %r"%msg_id)
1349 raise TypeError("msg_ids must be str, not %r"%msg_id)
1249 theids.append(msg_id)
1350 theids.append(msg_id)
1250
1351
1251 completed = []
1352 completed = []
1252 local_results = {}
1353 local_results = {}
1253
1354
1254 # comment this block out to temporarily disable local shortcut:
1355 # comment this block out to temporarily disable local shortcut:
1255 for msg_id in list(theids):
1356 for msg_id in theids:
1256 if msg_id in self.results:
1357 if msg_id in self.results:
1257 completed.append(msg_id)
1358 completed.append(msg_id)
1258 local_results[msg_id] = self.results[msg_id]
1359 local_results[msg_id] = self.results[msg_id]
1259 theids.remove(msg_id)
1360 theids.remove(msg_id)
1260
1361
1261 if theids: # some not locally cached
1362 if theids: # some not locally cached
1262 content = dict(msg_ids=theids, status_only=status_only)
1363 content = dict(msg_ids=theids, status_only=status_only)
1263 msg = self.session.send(self._query_socket, "result_request", content=content)
1364 msg = self.session.send(self._query_socket, "result_request", content=content)
1264 zmq.select([self._query_socket], [], [])
1365 zmq.select([self._query_socket], [], [])
1265 idents,msg = self.session.recv(self._query_socket, zmq.NOBLOCK)
1366 idents,msg = self.session.recv(self._query_socket, zmq.NOBLOCK)
1266 if self.debug:
1367 if self.debug:
1267 pprint(msg)
1368 pprint(msg)
1268 content = msg['content']
1369 content = msg['content']
1269 if content['status'] != 'ok':
1370 if content['status'] != 'ok':
1270 raise ss.unwrap_exception(content)
1371 raise self._unwrap_exception(content)
1271 buffers = msg['buffers']
1372 buffers = msg['buffers']
1272 else:
1373 else:
1273 content = dict(completed=[],pending=[])
1374 content = dict(completed=[],pending=[])
1274
1375
1275 content['completed'].extend(completed)
1376 content['completed'].extend(completed)
1276
1377
1277 if status_only:
1378 if status_only:
1278 return content
1379 return content
1279
1380
1280 failures = []
1381 failures = []
1281 # load cached results into result:
1382 # load cached results into result:
1282 content.update(local_results)
1383 content.update(local_results)
1283 # update cache with results:
1384 # update cache with results:
1284 for msg_id in sorted(theids):
1385 for msg_id in sorted(theids):
1285 if msg_id in content['completed']:
1386 if msg_id in content['completed']:
1286 rec = content[msg_id]
1387 rec = content[msg_id]
1287 parent = rec['header']
1388 parent = rec['header']
1288 header = rec['result_header']
1389 header = rec['result_header']
1289 rcontent = rec['result_content']
1390 rcontent = rec['result_content']
1290 iodict = rec['io']
1391 iodict = rec['io']
1291 if isinstance(rcontent, str):
1392 if isinstance(rcontent, str):
1292 rcontent = self.session.unpack(rcontent)
1393 rcontent = self.session.unpack(rcontent)
1293
1394
1294 md = self.metadata.setdefault(msg_id, Metadata())
1395 md = self.metadata.setdefault(msg_id, Metadata())
1295 md.update(self._extract_metadata(header, parent, rcontent))
1396 md.update(self._extract_metadata(header, parent, rcontent))
1296 md.update(iodict)
1397 md.update(iodict)
1297
1398
1298 if rcontent['status'] == 'ok':
1399 if rcontent['status'] == 'ok':
1299 res,buffers = ss.unserialize_object(buffers)
1400 res,buffers = ss.unserialize_object(buffers)
1300 else:
1401 else:
1301 res = ss.unwrap_exception(rcontent)
1402 print rcontent
1403 res = self._unwrap_exception(rcontent)
1302 failures.append(res)
1404 failures.append(res)
1303
1405
1304 self.results[msg_id] = res
1406 self.results[msg_id] = res
1305 content[msg_id] = res
1407 content[msg_id] = res
1306
1408
1307 error.collect_exceptions(failures, "get_results")
1409 if len(theids) == 1 and failures:
1410 raise failures[0]
1411
1412 error.collect_exceptions(failures, "result_status")
1308 return content
1413 return content
1309
1414
1310 @spinfirst
1415 @spinfirst
1311 def queue_status(self, targets='all', verbose=False):
1416 def queue_status(self, targets='all', verbose=False):
1312 """Fetch the status of engine queues.
1417 """Fetch the status of engine queues.
1313
1418
1314 Parameters
1419 Parameters
1315 ----------
1420 ----------
1316
1421
1317 targets : int/str/list of ints/strs
1422 targets : int/str/list of ints/strs
1318 the engines whose states are to be queried.
1423 the engines whose states are to be queried.
1319 default : all
1424 default : all
1320 verbose : bool
1425 verbose : bool
1321 Whether to return lengths only, or lists of ids for each element
1426 Whether to return lengths only, or lists of ids for each element
1322 """
1427 """
1323 targets = self._build_targets(targets)[1]
1428 targets = self._build_targets(targets)[1]
1324 content = dict(targets=targets, verbose=verbose)
1429 content = dict(targets=targets, verbose=verbose)
1325 self.session.send(self._query_socket, "queue_request", content=content)
1430 self.session.send(self._query_socket, "queue_request", content=content)
1326 idents,msg = self.session.recv(self._query_socket, 0)
1431 idents,msg = self.session.recv(self._query_socket, 0)
1327 if self.debug:
1432 if self.debug:
1328 pprint(msg)
1433 pprint(msg)
1329 content = msg['content']
1434 content = msg['content']
1330 status = content.pop('status')
1435 status = content.pop('status')
1331 if status != 'ok':
1436 if status != 'ok':
1332 raise ss.unwrap_exception(content)
1437 raise self._unwrap_exception(content)
1333 return ss.rekey(content)
1438 return ss.rekey(content)
1334
1439
1335 @spinfirst
1440 @spinfirst
1336 def purge_results(self, msg_ids=[], targets=[]):
1441 def purge_results(self, jobs=[], targets=[]):
1337 """Tell the controller to forget results.
1442 """Tell the controller to forget results.
1338
1443
1339 Individual results can be purged by msg_id, or the entire
1444 Individual results can be purged by msg_id, or the entire
1340 history of specific targets can be purged.
1445 history of specific targets can be purged.
1341
1446
1342 Parameters
1447 Parameters
1343 ----------
1448 ----------
1344
1449
1345 msg_ids : str or list of strs
1450 jobs : str or list of strs or AsyncResult objects
1346 the msg_ids whose results should be forgotten.
1451 the msg_ids whose results should be forgotten.
1347 targets : int/str/list of ints/strs
1452 targets : int/str/list of ints/strs
1348 The targets, by uuid or int_id, whose entire history is to be purged.
1453 The targets, by uuid or int_id, whose entire history is to be purged.
1349 Use `targets='all'` to scrub everything from the controller's memory.
1454 Use `targets='all'` to scrub everything from the controller's memory.
1350
1455
1351 default : None
1456 default : None
1352 """
1457 """
1353 if not targets and not msg_ids:
1458 if not targets and not jobs:
1354 raise ValueError
1459 raise ValueError("Must specify at least one of `targets` and `jobs`")
1355 if targets:
1460 if targets:
1356 targets = self._build_targets(targets)[1]
1461 targets = self._build_targets(targets)[1]
1462
1463 # construct msg_ids from jobs
1464 msg_ids = []
1465 if isinstance(jobs, (basestring,AsyncResult)):
1466 jobs = [jobs]
1467 bad_ids = filter(lambda obj: not isinstance(obj, (basestring, AsyncResult)), jobs)
1468 if bad_ids:
1469 raise TypeError("Invalid msg_id type %r, expected str or AsyncResult"%bad_ids[0])
1470 for j in jobs:
1471 if isinstance(j, AsyncResult):
1472 msg_ids.extend(j.msg_ids)
1473 else:
1474 msg_ids.append(j)
1475
1357 content = dict(targets=targets, msg_ids=msg_ids)
1476 content = dict(targets=targets, msg_ids=msg_ids)
1358 self.session.send(self._query_socket, "purge_request", content=content)
1477 self.session.send(self._query_socket, "purge_request", content=content)
1359 idents, msg = self.session.recv(self._query_socket, 0)
1478 idents, msg = self.session.recv(self._query_socket, 0)
1360 if self.debug:
1479 if self.debug:
1361 pprint(msg)
1480 pprint(msg)
1362 content = msg['content']
1481 content = msg['content']
1363 if content['status'] != 'ok':
1482 if content['status'] != 'ok':
1364 raise ss.unwrap_exception(content)
1483 raise self._unwrap_exception(content)
1365
1484
1366
1485
1367 __all__ = [ 'Client',
1486 __all__ = [ 'Client',
1368 'depend',
1487 'depend',
1369 'require',
1488 'require',
1370 'remote',
1489 'remote',
1371 'parallel',
1490 'parallel',
1372 'RemoteFunction',
1491 'RemoteFunction',
1373 'ParallelFunction',
1492 'ParallelFunction',
1374 'DirectView',
1493 'DirectView',
1375 'LoadBalancedView',
1494 'LoadBalancedView',
1376 'AsyncResult',
1495 'AsyncResult',
1377 'AsyncMapResult'
1496 'AsyncMapResult'
1378 ]
1497 ]
@@ -1,166 +1,166 b''
1 """Remote Functions and decorators for the client."""
1 """Remote Functions and decorators for the client."""
2 #-----------------------------------------------------------------------------
2 #-----------------------------------------------------------------------------
3 # Copyright (C) 2010 The IPython Development Team
3 # Copyright (C) 2010 The IPython Development Team
4 #
4 #
5 # Distributed under the terms of the BSD License. The full license is in
5 # Distributed under the terms of the BSD License. The full license is in
6 # the file COPYING, distributed as part of this software.
6 # the file COPYING, distributed as part of this software.
7 #-----------------------------------------------------------------------------
7 #-----------------------------------------------------------------------------
8
8
9 #-----------------------------------------------------------------------------
9 #-----------------------------------------------------------------------------
10 # Imports
10 # Imports
11 #-----------------------------------------------------------------------------
11 #-----------------------------------------------------------------------------
12
12
13 import warnings
13 import warnings
14
14
15 import map as Map
15 import map as Map
16 from asyncresult import AsyncMapResult
16 from asyncresult import AsyncMapResult
17
17
18 #-----------------------------------------------------------------------------
18 #-----------------------------------------------------------------------------
19 # Decorators
19 # Decorators
20 #-----------------------------------------------------------------------------
20 #-----------------------------------------------------------------------------
21
21
22 def remote(client, bound=True, block=None, targets=None, balanced=None):
22 def remote(client, bound=True, block=None, targets=None, balanced=None):
23 """Turn a function into a remote function.
23 """Turn a function into a remote function.
24
24
25 This method can be used for map:
25 This method can be used for map:
26
26
27 >>> @remote(client,block=True)
27 >>> @remote(client,block=True)
28 def func(a)
28 def func(a)
29 """
29 """
30 def remote_function(f):
30 def remote_function(f):
31 return RemoteFunction(client, f, bound, block, targets, balanced)
31 return RemoteFunction(client, f, bound, block, targets, balanced)
32 return remote_function
32 return remote_function
33
33
34 def parallel(client, dist='b', bound=True, block=None, targets='all', balanced=None):
34 def parallel(client, dist='b', bound=True, block=None, targets='all', balanced=None):
35 """Turn a function into a parallel remote function.
35 """Turn a function into a parallel remote function.
36
36
37 This method can be used for map:
37 This method can be used for map:
38
38
39 >>> @parallel(client,block=True)
39 >>> @parallel(client,block=True)
40 def func(a)
40 def func(a)
41 """
41 """
42 def parallel_function(f):
42 def parallel_function(f):
43 return ParallelFunction(client, f, dist, bound, block, targets, balanced)
43 return ParallelFunction(client, f, dist, bound, block, targets, balanced)
44 return parallel_function
44 return parallel_function
45
45
46 #--------------------------------------------------------------------------
46 #--------------------------------------------------------------------------
47 # Classes
47 # Classes
48 #--------------------------------------------------------------------------
48 #--------------------------------------------------------------------------
49
49
50 class RemoteFunction(object):
50 class RemoteFunction(object):
51 """Turn an existing function into a remote function.
51 """Turn an existing function into a remote function.
52
52
53 Parameters
53 Parameters
54 ----------
54 ----------
55
55
56 client : Client instance
56 client : Client instance
57 The client to be used to connect to engines
57 The client to be used to connect to engines
58 f : callable
58 f : callable
59 The function to be wrapped into a remote function
59 The function to be wrapped into a remote function
60 bound : bool [default: False]
60 bound : bool [default: False]
61 Whether the affect the remote namespace when called
61 Whether the affect the remote namespace when called
62 block : bool [default: None]
62 block : bool [default: None]
63 Whether to wait for results or not. The default behavior is
63 Whether to wait for results or not. The default behavior is
64 to use the current `block` attribute of `client`
64 to use the current `block` attribute of `client`
65 targets : valid target list [default: all]
65 targets : valid target list [default: all]
66 The targets on which to execute.
66 The targets on which to execute.
67 balanced : bool
67 balanced : bool
68 Whether to load-balance with the Task scheduler or not
68 Whether to load-balance with the Task scheduler or not
69 """
69 """
70
70
71 client = None # the remote connection
71 client = None # the remote connection
72 func = None # the wrapped function
72 func = None # the wrapped function
73 block = None # whether to block
73 block = None # whether to block
74 bound = None # whether to affect the namespace
74 bound = None # whether to affect the namespace
75 targets = None # where to execute
75 targets = None # where to execute
76 balanced = None # whether to load-balance
76 balanced = None # whether to load-balance
77
77
78 def __init__(self, client, f, bound=False, block=None, targets=None, balanced=None):
78 def __init__(self, client, f, bound=False, block=None, targets=None, balanced=None):
79 self.client = client
79 self.client = client
80 self.func = f
80 self.func = f
81 self.block=block
81 self.block=block
82 self.bound=bound
82 self.bound=bound
83 self.targets=targets
83 self.targets=targets
84 if balanced is None:
84 if balanced is None:
85 if targets is None:
85 if targets is None:
86 balanced = True
86 balanced = True
87 else:
87 else:
88 balanced = False
88 balanced = False
89 self.balanced = balanced
89 self.balanced = balanced
90
90
91 def __call__(self, *args, **kwargs):
91 def __call__(self, *args, **kwargs):
92 return self.client.apply(self.func, args=args, kwargs=kwargs,
92 return self.client.apply(self.func, args=args, kwargs=kwargs,
93 block=self.block, targets=self.targets, bound=self.bound, balanced=self.balanced)
93 block=self.block, targets=self.targets, bound=self.bound, balanced=self.balanced)
94
94
95
95
96 class ParallelFunction(RemoteFunction):
96 class ParallelFunction(RemoteFunction):
97 """Class for mapping a function to sequences."""
97 """Class for mapping a function to sequences."""
98 def __init__(self, client, f, dist='b', bound=False, block=None, targets='all', balanced=None, chunk_size=None):
98 def __init__(self, client, f, dist='b', bound=False, block=None, targets='all', balanced=None, chunk_size=None):
99 super(ParallelFunction, self).__init__(client,f,bound,block,targets,balanced)
99 super(ParallelFunction, self).__init__(client,f,bound,block,targets,balanced)
100 self.chunk_size = chunk_size
100 self.chunk_size = chunk_size
101
101
102 mapClass = Map.dists[dist]
102 mapClass = Map.dists[dist]
103 self.mapObject = mapClass()
103 self.mapObject = mapClass()
104
104
105 def __call__(self, *sequences):
105 def __call__(self, *sequences):
106 len_0 = len(sequences[0])
106 len_0 = len(sequences[0])
107 for s in sequences:
107 for s in sequences:
108 if len(s)!=len_0:
108 if len(s)!=len_0:
109 msg = 'all sequences must have equal length, but %i!=%i'%(len_0,len(s))
109 msg = 'all sequences must have equal length, but %i!=%i'%(len_0,len(s))
110 raise ValueError(msg)
110 raise ValueError(msg)
111
111
112 if self.balanced:
112 if self.balanced:
113 if self.chunk_size:
113 if self.chunk_size:
114 nparts = len_0/self.chunk_size + int(len_0%self.chunk_size > 0)
114 nparts = len_0/self.chunk_size + int(len_0%self.chunk_size > 0)
115 else:
115 else:
116 nparts = len_0
116 nparts = len_0
117 targets = [self.targets]*nparts
117 targets = [self.targets]*nparts
118 else:
118 else:
119 if self.chunk_size:
119 if self.chunk_size:
120 warnings.warn("`chunk_size` is ignored when `balanced=False", UserWarning)
120 warnings.warn("`chunk_size` is ignored when `balanced=False", UserWarning)
121 # multiplexed:
121 # multiplexed:
122 targets = self.client._build_targets(self.targets)[-1]
122 targets = self.client._build_targets(self.targets)[-1]
123 nparts = len(targets)
123 nparts = len(targets)
124
124
125 msg_ids = []
125 msg_ids = []
126 # my_f = lambda *a: map(self.func, *a)
126 # my_f = lambda *a: map(self.func, *a)
127 for index, t in enumerate(targets):
127 for index, t in enumerate(targets):
128 args = []
128 args = []
129 for seq in sequences:
129 for seq in sequences:
130 part = self.mapObject.getPartition(seq, index, nparts)
130 part = self.mapObject.getPartition(seq, index, nparts)
131 if not part:
131 if len(part) == 0:
132 continue
132 continue
133 else:
133 else:
134 args.append(part)
134 args.append(part)
135 if not args:
135 if not args:
136 continue
136 continue
137
137
138 # print (args)
138 # print (args)
139 if hasattr(self, '_map'):
139 if hasattr(self, '_map'):
140 f = map
140 f = map
141 args = [self.func]+args
141 args = [self.func]+args
142 else:
142 else:
143 f=self.func
143 f=self.func
144 ar = self.client.apply(f, args=args, block=False, bound=self.bound,
144 ar = self.client.apply(f, args=args, block=False, bound=self.bound,
145 targets=t, balanced=self.balanced)
145 targets=t, balanced=self.balanced)
146
146
147 msg_ids.append(ar.msg_ids[0])
147 msg_ids.append(ar.msg_ids[0])
148
148
149 r = AsyncMapResult(self.client, msg_ids, self.mapObject, fname=self.func.__name__)
149 r = AsyncMapResult(self.client, msg_ids, self.mapObject, fname=self.func.__name__)
150 if self.block:
150 if self.block:
151 try:
151 try:
152 return r.get()
152 return r.get()
153 except KeyboardInterrupt:
153 except KeyboardInterrupt:
154 return r
154 return r
155 else:
155 else:
156 return r
156 return r
157
157
158 def map(self, *sequences):
158 def map(self, *sequences):
159 """call a function on each element of a sequence remotely."""
159 """call a function on each element of a sequence remotely."""
160 self._map = True
160 self._map = True
161 try:
161 try:
162 ret = self.__call__(*sequences)
162 ret = self.__call__(*sequences)
163 finally:
163 finally:
164 del self._map
164 del self._map
165 return ret
165 return ret
166
166
@@ -1,592 +1,639 b''
1 """Views of remote engines"""
1 """Views of remote engines"""
2 #-----------------------------------------------------------------------------
2 #-----------------------------------------------------------------------------
3 # Copyright (C) 2010 The IPython Development Team
3 # Copyright (C) 2010 The IPython Development Team
4 #
4 #
5 # Distributed under the terms of the BSD License. The full license is in
5 # Distributed under the terms of the BSD License. The full license is in
6 # the file COPYING, distributed as part of this software.
6 # the file COPYING, distributed as part of this software.
7 #-----------------------------------------------------------------------------
7 #-----------------------------------------------------------------------------
8
8
9 #-----------------------------------------------------------------------------
9 #-----------------------------------------------------------------------------
10 # Imports
10 # Imports
11 #-----------------------------------------------------------------------------
11 #-----------------------------------------------------------------------------
12
12
13 from IPython.utils.traitlets import HasTraits, Bool, List, Dict, Set, Int, Instance
13 from IPython.utils.traitlets import HasTraits, Bool, List, Dict, Set, Int, Instance
14
14
15 from IPython.external.decorator import decorator
15 from IPython.external.decorator import decorator
16 from IPython.zmq.parallel.asyncresult import AsyncResult
16 from IPython.zmq.parallel.asyncresult import AsyncResult
17 from IPython.zmq.parallel.dependency import Dependency
17 from IPython.zmq.parallel.dependency import Dependency
18 from IPython.zmq.parallel.remotefunction import ParallelFunction, parallel
18 from IPython.zmq.parallel.remotefunction import ParallelFunction, parallel, remote
19
19
20 #-----------------------------------------------------------------------------
20 #-----------------------------------------------------------------------------
21 # Decorators
21 # Decorators
22 #-----------------------------------------------------------------------------
22 #-----------------------------------------------------------------------------
23
23
24 @decorator
24 @decorator
25 def myblock(f, self, *args, **kwargs):
25 def myblock(f, self, *args, **kwargs):
26 """override client.block with self.block during a call"""
26 """override client.block with self.block during a call"""
27 block = self.client.block
27 block = self.client.block
28 self.client.block = self.block
28 self.client.block = self.block
29 try:
29 try:
30 ret = f(self, *args, **kwargs)
30 ret = f(self, *args, **kwargs)
31 finally:
31 finally:
32 self.client.block = block
32 self.client.block = block
33 return ret
33 return ret
34
34
35 @decorator
35 @decorator
36 def save_ids(f, self, *args, **kwargs):
36 def save_ids(f, self, *args, **kwargs):
37 """Keep our history and outstanding attributes up to date after a method call."""
37 """Keep our history and outstanding attributes up to date after a method call."""
38 n_previous = len(self.client.history)
38 n_previous = len(self.client.history)
39 ret = f(self, *args, **kwargs)
39 ret = f(self, *args, **kwargs)
40 nmsgs = len(self.client.history) - n_previous
40 nmsgs = len(self.client.history) - n_previous
41 msg_ids = self.client.history[-nmsgs:]
41 msg_ids = self.client.history[-nmsgs:]
42 self.history.extend(msg_ids)
42 self.history.extend(msg_ids)
43 map(self.outstanding.add, msg_ids)
43 map(self.outstanding.add, msg_ids)
44 return ret
44 return ret
45
45
46 @decorator
46 @decorator
47 def sync_results(f, self, *args, **kwargs):
47 def sync_results(f, self, *args, **kwargs):
48 """sync relevant results from self.client to our results attribute."""
48 """sync relevant results from self.client to our results attribute."""
49 ret = f(self, *args, **kwargs)
49 ret = f(self, *args, **kwargs)
50 delta = self.outstanding.difference(self.client.outstanding)
50 delta = self.outstanding.difference(self.client.outstanding)
51 completed = self.outstanding.intersection(delta)
51 completed = self.outstanding.intersection(delta)
52 self.outstanding = self.outstanding.difference(completed)
52 self.outstanding = self.outstanding.difference(completed)
53 for msg_id in completed:
53 for msg_id in completed:
54 self.results[msg_id] = self.client.results[msg_id]
54 self.results[msg_id] = self.client.results[msg_id]
55 return ret
55 return ret
56
56
57 @decorator
57 @decorator
58 def spin_after(f, self, *args, **kwargs):
58 def spin_after(f, self, *args, **kwargs):
59 """call spin after the method."""
59 """call spin after the method."""
60 ret = f(self, *args, **kwargs)
60 ret = f(self, *args, **kwargs)
61 self.spin()
61 self.spin()
62 return ret
62 return ret
63
63
64 #-----------------------------------------------------------------------------
64 #-----------------------------------------------------------------------------
65 # Classes
65 # Classes
66 #-----------------------------------------------------------------------------
66 #-----------------------------------------------------------------------------
67
67
68 class View(HasTraits):
68 class View(HasTraits):
69 """Base View class for more convenint apply(f,*args,**kwargs) syntax via attributes.
69 """Base View class for more convenint apply(f,*args,**kwargs) syntax via attributes.
70
70
71 Don't use this class, use subclasses.
71 Don't use this class, use subclasses.
72 """
72 """
73 block=Bool(False)
73 block=Bool(False)
74 bound=Bool(False)
74 bound=Bool(False)
75 history=List()
75 history=List()
76 outstanding = Set()
76 outstanding = Set()
77 results = Dict()
77 results = Dict()
78 client = Instance('IPython.zmq.parallel.client.Client')
78 client = Instance('IPython.zmq.parallel.client.Client')
79
79
80 _ntargets = Int(1)
80 _ntargets = Int(1)
81 _balanced = Bool(False)
81 _balanced = Bool(False)
82 _default_names = List(['block', 'bound'])
82 _default_names = List(['block', 'bound'])
83 _targets = None
83 _targets = None
84
84
85 def __init__(self, client=None, targets=None):
85 def __init__(self, client=None, targets=None):
86 super(View, self).__init__(client=client)
86 super(View, self).__init__(client=client)
87 self._targets = targets
87 self._targets = targets
88 self._ntargets = 1 if isinstance(targets, (int,type(None))) else len(targets)
88 self._ntargets = 1 if isinstance(targets, (int,type(None))) else len(targets)
89 self.block = client.block
89 self.block = client.block
90
90
91 for name in self._default_names:
91 for name in self._default_names:
92 setattr(self, name, getattr(self, name, None))
92 setattr(self, name, getattr(self, name, None))
93
93
94 assert not self.__class__ is View, "Don't use base View objects, use subclasses"
95
94
96
95 def __repr__(self):
97 def __repr__(self):
96 strtargets = str(self._targets)
98 strtargets = str(self._targets)
97 if len(strtargets) > 16:
99 if len(strtargets) > 16:
98 strtargets = strtargets[:12]+'...]'
100 strtargets = strtargets[:12]+'...]'
99 return "<%s %s>"%(self.__class__.__name__, strtargets)
101 return "<%s %s>"%(self.__class__.__name__, strtargets)
100
102
101 @property
103 @property
102 def targets(self):
104 def targets(self):
103 return self._targets
105 return self._targets
104
106
105 @targets.setter
107 @targets.setter
106 def targets(self, value):
108 def targets(self, value):
107 raise AttributeError("Cannot set View `targets` after construction!")
109 raise AttributeError("Cannot set View `targets` after construction!")
108
110
111 @property
112 def balanced(self):
113 return self._balanced
114
115 @balanced.setter
116 def balanced(self, value):
117 raise AttributeError("Cannot set View `balanced` after construction!")
118
109 def _defaults(self, *excludes):
119 def _defaults(self, *excludes):
110 """return dict of our default attributes, excluding names given."""
120 """return dict of our default attributes, excluding names given."""
111 d = dict(balanced=self._balanced, targets=self.targets)
121 d = dict(balanced=self._balanced, targets=self._targets)
112 for name in self._default_names:
122 for name in self._default_names:
113 if name not in excludes:
123 if name not in excludes:
114 d[name] = getattr(self, name)
124 d[name] = getattr(self, name)
115 return d
125 return d
116
126
117 def set_flags(self, **kwargs):
127 def set_flags(self, **kwargs):
118 """set my attribute flags by keyword.
128 """set my attribute flags by keyword.
119
129
120 A View is a wrapper for the Client's apply method, but
130 A View is a wrapper for the Client's apply method, but
121 with attributes that specify keyword arguments, those attributes
131 with attributes that specify keyword arguments, those attributes
122 can be set by keyword argument with this method.
132 can be set by keyword argument with this method.
123
133
124 Parameters
134 Parameters
125 ----------
135 ----------
126
136
127 block : bool
137 block : bool
128 whether to wait for results
138 whether to wait for results
129 bound : bool
139 bound : bool
130 whether to use the client's namespace
140 whether to use the client's namespace
131 """
141 """
132 for key in kwargs:
142 for key in kwargs:
133 if key not in self._default_names:
143 if key not in self._default_names:
134 raise KeyError("Invalid name: %r"%key)
144 raise KeyError("Invalid name: %r"%key)
135 for name in ('block', 'bound'):
145 for name in ('block', 'bound'):
136 if name in kwargs:
146 if name in kwargs:
137 setattr(self, name, kwargs[name])
147 setattr(self, name, kwargs[name])
138
148
139 #----------------------------------------------------------------
149 #----------------------------------------------------------------
140 # wrappers for client methods:
150 # wrappers for client methods:
141 #----------------------------------------------------------------
151 #----------------------------------------------------------------
142 @sync_results
152 @sync_results
143 def spin(self):
153 def spin(self):
144 """spin the client, and sync"""
154 """spin the client, and sync"""
145 self.client.spin()
155 self.client.spin()
146
156
147 @sync_results
157 @sync_results
148 @save_ids
158 @save_ids
149 def apply(self, f, *args, **kwargs):
159 def apply(self, f, *args, **kwargs):
150 """calls f(*args, **kwargs) on remote engines, returning the result.
160 """calls f(*args, **kwargs) on remote engines, returning the result.
151
161
152 This method does not involve the engine's namespace.
162 This method does not involve the engine's namespace.
153
163
154 if self.block is False:
164 if self.block is False:
155 returns msg_id
165 returns msg_id
156 else:
166 else:
157 returns actual result of f(*args, **kwargs)
167 returns actual result of f(*args, **kwargs)
158 """
168 """
159 return self.client.apply(f, args, kwargs, **self._defaults())
169 return self.client.apply(f, args, kwargs, **self._defaults())
160
170
161 @save_ids
171 @save_ids
162 def apply_async(self, f, *args, **kwargs):
172 def apply_async(self, f, *args, **kwargs):
163 """calls f(*args, **kwargs) on remote engines in a nonblocking manner.
173 """calls f(*args, **kwargs) on remote engines in a nonblocking manner.
164
174
165 This method does not involve the engine's namespace.
175 This method does not involve the engine's namespace.
166
176
167 returns msg_id
177 returns msg_id
168 """
178 """
169 d = self._defaults('block', 'bound')
179 d = self._defaults('block', 'bound')
170 return self.client.apply(f,args,kwargs, block=False, bound=False, **d)
180 return self.client.apply(f,args,kwargs, block=False, bound=False, **d)
171
181
172 @spin_after
182 @spin_after
173 @save_ids
183 @save_ids
174 def apply_sync(self, f, *args, **kwargs):
184 def apply_sync(self, f, *args, **kwargs):
175 """calls f(*args, **kwargs) on remote engines in a blocking manner,
185 """calls f(*args, **kwargs) on remote engines in a blocking manner,
176 returning the result.
186 returning the result.
177
187
178 This method does not involve the engine's namespace.
188 This method does not involve the engine's namespace.
179
189
180 returns: actual result of f(*args, **kwargs)
190 returns: actual result of f(*args, **kwargs)
181 """
191 """
182 d = self._defaults('block', 'bound')
192 d = self._defaults('block', 'bound')
183 return self.client.apply(f,args,kwargs, block=True, bound=False, **d)
193 return self.client.apply(f,args,kwargs, block=True, bound=False, **d)
184
194
185 @sync_results
195 # @sync_results
186 @save_ids
196 # @save_ids
187 def apply_bound(self, f, *args, **kwargs):
197 # def apply_bound(self, f, *args, **kwargs):
188 """calls f(*args, **kwargs) bound to engine namespace(s).
198 # """calls f(*args, **kwargs) bound to engine namespace(s).
189
199 #
190 if self.block is False:
200 # if self.block is False:
191 returns msg_id
201 # returns msg_id
192 else:
202 # else:
193 returns actual result of f(*args, **kwargs)
203 # returns actual result of f(*args, **kwargs)
194
204 #
195 This method has access to the targets' globals
205 # This method has access to the targets' namespace via globals()
196
206 #
197 """
207 # """
198 d = self._defaults('bound')
208 # d = self._defaults('bound')
199 return self.client.apply(f, args, kwargs, bound=True, **d)
209 # return self.client.apply(f, args, kwargs, bound=True, **d)
200
210 #
201 @sync_results
211 @sync_results
202 @save_ids
212 @save_ids
203 def apply_async_bound(self, f, *args, **kwargs):
213 def apply_async_bound(self, f, *args, **kwargs):
204 """calls f(*args, **kwargs) bound to engine namespace(s)
214 """calls f(*args, **kwargs) bound to engine namespace(s)
205 in a nonblocking manner.
215 in a nonblocking manner.
206
216
207 returns: msg_id
217 returns: msg_id
208
218
209 This method has access to the targets' globals
219 This method has access to the targets' namespace via globals()
210
220
211 """
221 """
212 d = self._defaults('block', 'bound')
222 d = self._defaults('block', 'bound')
213 return self.client.apply(f, args, kwargs, block=False, bound=True, **d)
223 return self.client.apply(f, args, kwargs, block=False, bound=True, **d)
214
224
215 @spin_after
225 @spin_after
216 @save_ids
226 @save_ids
217 def apply_sync_bound(self, f, *args, **kwargs):
227 def apply_sync_bound(self, f, *args, **kwargs):
218 """calls f(*args, **kwargs) bound to engine namespace(s), waiting for the result.
228 """calls f(*args, **kwargs) bound to engine namespace(s), waiting for the result.
219
229
220 returns: actual result of f(*args, **kwargs)
230 returns: actual result of f(*args, **kwargs)
221
231
222 This method has access to the targets' globals
232 This method has access to the targets' namespace via globals()
223
233
224 """
234 """
225 d = self._defaults('block', 'bound')
235 d = self._defaults('block', 'bound')
226 return self.client.apply(f, args, kwargs, block=True, bound=True, **d)
236 return self.client.apply(f, args, kwargs, block=True, bound=True, **d)
227
237
228 def abort(self, msg_ids=None, block=None):
238 def abort(self, jobs=None, block=None):
229 """Abort jobs on my engines.
239 """Abort jobs on my engines.
230
240
231 Parameters
241 Parameters
232 ----------
242 ----------
233
243
234 msg_ids : None, str, list of strs, optional
244 jobs : None, str, list of strs, optional
235 if None: abort all jobs.
245 if None: abort all jobs.
236 else: abort specific msg_id(s).
246 else: abort specific msg_id(s).
237 """
247 """
238 block = block if block is not None else self.block
248 block = block if block is not None else self.block
239 return self.client.abort(msg_ids=msg_ids, targets=self.targets, block=block)
249 return self.client.abort(jobs=jobs, targets=self._targets, block=block)
240
250
241 def queue_status(self, verbose=False):
251 def queue_status(self, verbose=False):
242 """Fetch the Queue status of my engines"""
252 """Fetch the Queue status of my engines"""
243 return self.client.queue_status(targets=self.targets, verbose=verbose)
253 return self.client.queue_status(targets=self._targets, verbose=verbose)
244
254
245 def purge_results(self, msg_ids=[], targets=[]):
255 def purge_results(self, jobs=[], targets=[]):
246 """Instruct the controller to forget specific results."""
256 """Instruct the controller to forget specific results."""
247 if targets is None or targets == 'all':
257 if targets is None or targets == 'all':
248 targets = self.targets
258 targets = self._targets
249 return self.client.purge_results(msg_ids=msg_ids, targets=targets)
259 return self.client.purge_results(jobs=jobs, targets=targets)
260
261 @spin_after
262 def get_result(self, indices_or_msg_ids=None):
263 """return one or more results, specified by history index or msg_id.
250
264
265 See client.get_result for details.
266
267 """
268
269 if indices_or_msg_ids is None:
270 indices_or_msg_ids = -1
271 if isinstance(indices_or_msg_ids, int):
272 indices_or_msg_ids = self.history[indices_or_msg_ids]
273 elif isinstance(indices_or_msg_ids, (list,tuple,set)):
274 indices_or_msg_ids = list(indices_or_msg_ids)
275 for i,index in enumerate(indices_or_msg_ids):
276 if isinstance(index, int):
277 indices_or_msg_ids[i] = self.history[index]
278 return self.client.get_result(indices_or_msg_ids)
279
251 #-------------------------------------------------------------------
280 #-------------------------------------------------------------------
252 # Map
281 # Map
253 #-------------------------------------------------------------------
282 #-------------------------------------------------------------------
254
283
255 def map(self, f, *sequences, **kwargs):
284 def map(self, f, *sequences, **kwargs):
256 """override in subclasses"""
285 """override in subclasses"""
257 raise NotImplementedError
286 raise NotImplementedError
258
287
259 def map_async(self, f, *sequences, **kwargs):
288 def map_async(self, f, *sequences, **kwargs):
260 """Parallel version of builtin `map`, using this view's engines.
289 """Parallel version of builtin `map`, using this view's engines.
261
290
262 This is equivalent to map(...block=False)
291 This is equivalent to map(...block=False)
263
292
264 See `map` for details.
293 See `self.map` for details.
265 """
294 """
266 if 'block' in kwargs:
295 if 'block' in kwargs:
267 raise TypeError("map_async doesn't take a `block` keyword argument.")
296 raise TypeError("map_async doesn't take a `block` keyword argument.")
268 kwargs['block'] = False
297 kwargs['block'] = False
269 return self.map(f,*sequences,**kwargs)
298 return self.map(f,*sequences,**kwargs)
270
299
271 def map_sync(self, f, *sequences, **kwargs):
300 def map_sync(self, f, *sequences, **kwargs):
272 """Parallel version of builtin `map`, using this view's engines.
301 """Parallel version of builtin `map`, using this view's engines.
273
302
274 This is equivalent to map(...block=True)
303 This is equivalent to map(...block=True)
275
304
276 See `map` for details.
305 See `self.map` for details.
277 """
306 """
278 if 'block' in kwargs:
307 if 'block' in kwargs:
279 raise TypeError("map_sync doesn't take a `block` keyword argument.")
308 raise TypeError("map_sync doesn't take a `block` keyword argument.")
280 kwargs['block'] = True
309 kwargs['block'] = True
281 return self.map(f,*sequences,**kwargs)
310 return self.map(f,*sequences,**kwargs)
282
311
312 def imap(self, f, *sequences, **kwargs):
313 """Parallel version of `itertools.imap`.
314
315 See `self.map` for details.
316 """
317
318 return iter(self.map_async(f,*sequences, **kwargs))
319
283 #-------------------------------------------------------------------
320 #-------------------------------------------------------------------
284 # Decorators
321 # Decorators
285 #-------------------------------------------------------------------
322 #-------------------------------------------------------------------
286
323
287 def remote(self, bound=True, block=True):
324 def remote(self, bound=True, block=True):
288 """Decorator for making a RemoteFunction"""
325 """Decorator for making a RemoteFunction"""
289 return remote(self.client, bound=bound, targets=self.targets, block=block, balanced=self._balanced)
326 return remote(self.client, bound=bound, targets=self._targets, block=block, balanced=self._balanced)
290
327
291 def parallel(self, dist='b', bound=True, block=None):
328 def parallel(self, dist='b', bound=True, block=None):
292 """Decorator for making a ParallelFunction"""
329 """Decorator for making a ParallelFunction"""
293 block = self.block if block is None else block
330 block = self.block if block is None else block
294 return parallel(self.client, bound=bound, targets=self.targets, block=block, balanced=self._balanced)
331 return parallel(self.client, bound=bound, targets=self._targets, block=block, balanced=self._balanced)
295
332
296
333
297 class DirectView(View):
334 class DirectView(View):
298 """Direct Multiplexer View of one or more engines.
335 """Direct Multiplexer View of one or more engines.
299
336
300 These are created via indexed access to a client:
337 These are created via indexed access to a client:
301
338
302 >>> dv_1 = client[1]
339 >>> dv_1 = client[1]
303 >>> dv_all = client[:]
340 >>> dv_all = client[:]
304 >>> dv_even = client[::2]
341 >>> dv_even = client[::2]
305 >>> dv_some = client[1:3]
342 >>> dv_some = client[1:3]
306
343
307 This object provides dictionary access to engine namespaces:
344 This object provides dictionary access to engine namespaces:
308
345
309 # push a=5:
346 # push a=5:
310 >>> dv['a'] = 5
347 >>> dv['a'] = 5
311 # pull 'foo':
348 # pull 'foo':
312 >>> db['foo']
349 >>> db['foo']
313
350
314 """
351 """
315
352
316 def __init__(self, client=None, targets=None):
353 def __init__(self, client=None, targets=None):
317 super(DirectView, self).__init__(client=client, targets=targets)
354 super(DirectView, self).__init__(client=client, targets=targets)
318 self._balanced = False
355 self._balanced = False
319
356
320 @spin_after
357 @spin_after
321 @save_ids
358 @save_ids
322 def map(self, f, *sequences, **kwargs):
359 def map(self, f, *sequences, **kwargs):
323 """Parallel version of builtin `map`, using this View's `targets`.
360 """view.map(f, *sequences, block=self.block, bound=self.bound) => list|AsyncMapResult
361
362 Parallel version of builtin `map`, using this View's `targets`.
324
363
325 There will be one task per target, so work will be chunked
364 There will be one task per target, so work will be chunked
326 if the sequences are longer than `targets`.
365 if the sequences are longer than `targets`.
327
366
328 Results can be iterated as they are ready, but will become available in chunks.
367 Results can be iterated as they are ready, but will become available in chunks.
329
368
330 Parameters
369 Parameters
331 ----------
370 ----------
332
371
333 f : callable
372 f : callable
334 function to be mapped
373 function to be mapped
335 *sequences: one or more sequences of matching length
374 *sequences: one or more sequences of matching length
336 the sequences to be distributed and passed to `f`
375 the sequences to be distributed and passed to `f`
337 block : bool
376 block : bool
338 whether to wait for the result or not [default self.block]
377 whether to wait for the result or not [default self.block]
339 bound : bool
378 bound : bool
340 whether to wait for the result or not [default self.bound]
379 whether to have access to the engines' namespaces [default self.bound]
341
380
342 Returns
381 Returns
343 -------
382 -------
344
383
345 if block=False:
384 if block=False:
346 AsyncMapResult
385 AsyncMapResult
347 An object like AsyncResult, but which reassembles the sequence of results
386 An object like AsyncResult, but which reassembles the sequence of results
348 into a single list. AsyncMapResults can be iterated through before all
387 into a single list. AsyncMapResults can be iterated through before all
349 results are complete.
388 results are complete.
350 else:
389 else:
390 list
351 the result of map(f,*sequences)
391 the result of map(f,*sequences)
352 """
392 """
353
393
354 block = kwargs.get('block', self.block)
394 block = kwargs.get('block', self.block)
355 bound = kwargs.get('bound', self.bound)
395 bound = kwargs.get('bound', self.bound)
356 for k in kwargs.keys():
396 for k in kwargs.keys():
357 if k not in ['block', 'bound']:
397 if k not in ['block', 'bound']:
358 raise TypeError("invalid keyword arg, %r"%k)
398 raise TypeError("invalid keyword arg, %r"%k)
359
399
360 assert len(sequences) > 0, "must have some sequences to map onto!"
400 assert len(sequences) > 0, "must have some sequences to map onto!"
361 pf = ParallelFunction(self.client, f, block=block, bound=bound,
401 pf = ParallelFunction(self.client, f, block=block, bound=bound,
362 targets=self.targets, balanced=False)
402 targets=self._targets, balanced=False)
363 return pf.map(*sequences)
403 return pf.map(*sequences)
364
404
365 @sync_results
405 @sync_results
366 @save_ids
406 @save_ids
367 def execute(self, code, block=True):
407 def execute(self, code, block=True):
368 """execute some code on my targets."""
408 """execute some code on my targets."""
369 return self.client.execute(code, block=block, targets=self.targets)
409 return self.client.execute(code, block=block, targets=self._targets)
370
410
371 def update(self, ns):
411 def update(self, ns):
372 """update remote namespace with dict `ns`"""
412 """update remote namespace with dict `ns`"""
373 return self.client.push(ns, targets=self.targets, block=self.block)
413 return self.client.push(ns, targets=self._targets, block=self.block)
374
414
375 push = update
415 push = update
376
416
377 def get(self, key_s):
417 def get(self, key_s):
378 """get object(s) by `key_s` from remote namespace
418 """get object(s) by `key_s` from remote namespace
379 will return one object if it is a key.
419 will return one object if it is a key.
380 It also takes a list of keys, and will return a list of objects."""
420 It also takes a list of keys, and will return a list of objects."""
381 # block = block if block is not None else self.block
421 # block = block if block is not None else self.block
382 return self.client.pull(key_s, block=True, targets=self.targets)
422 return self.client.pull(key_s, block=True, targets=self._targets)
383
423
384 @sync_results
424 @sync_results
385 @save_ids
425 @save_ids
386 def pull(self, key_s, block=True):
426 def pull(self, key_s, block=True):
387 """get object(s) by `key_s` from remote namespace
427 """get object(s) by `key_s` from remote namespace
388 will return one object if it is a key.
428 will return one object if it is a key.
389 It also takes a list of keys, and will return a list of objects."""
429 It also takes a list of keys, and will return a list of objects."""
390 block = block if block is not None else self.block
430 block = block if block is not None else self.block
391 return self.client.pull(key_s, block=block, targets=self.targets)
431 return self.client.pull(key_s, block=block, targets=self._targets)
392
432
393 def scatter(self, key, seq, dist='b', flatten=False, targets=None, block=None):
433 def scatter(self, key, seq, dist='b', flatten=False, targets=None, block=None):
394 """
434 """
395 Partition a Python sequence and send the partitions to a set of engines.
435 Partition a Python sequence and send the partitions to a set of engines.
396 """
436 """
397 block = block if block is not None else self.block
437 block = block if block is not None else self.block
398 targets = targets if targets is not None else self.targets
438 targets = targets if targets is not None else self._targets
399
439
400 return self.client.scatter(key, seq, dist=dist, flatten=flatten,
440 return self.client.scatter(key, seq, dist=dist, flatten=flatten,
401 targets=targets, block=block)
441 targets=targets, block=block)
402
442
403 @sync_results
443 @sync_results
404 @save_ids
444 @save_ids
405 def gather(self, key, dist='b', targets=None, block=None):
445 def gather(self, key, dist='b', targets=None, block=None):
406 """
446 """
407 Gather a partitioned sequence on a set of engines as a single local seq.
447 Gather a partitioned sequence on a set of engines as a single local seq.
408 """
448 """
409 block = block if block is not None else self.block
449 block = block if block is not None else self.block
410 targets = targets if targets is not None else self.targets
450 targets = targets if targets is not None else self._targets
411
451
412 return self.client.gather(key, dist=dist, targets=targets, block=block)
452 return self.client.gather(key, dist=dist, targets=targets, block=block)
413
453
414 def __getitem__(self, key):
454 def __getitem__(self, key):
415 return self.get(key)
455 return self.get(key)
416
456
417 def __setitem__(self,key, value):
457 def __setitem__(self,key, value):
418 self.update({key:value})
458 self.update({key:value})
419
459
420 def clear(self, block=False):
460 def clear(self, block=False):
421 """Clear the remote namespaces on my engines."""
461 """Clear the remote namespaces on my engines."""
422 block = block if block is not None else self.block
462 block = block if block is not None else self.block
423 return self.client.clear(targets=self.targets, block=block)
463 return self.client.clear(targets=self._targets, block=block)
424
464
425 def kill(self, block=True):
465 def kill(self, block=True):
426 """Kill my engines."""
466 """Kill my engines."""
427 block = block if block is not None else self.block
467 block = block if block is not None else self.block
428 return self.client.kill(targets=self.targets, block=block)
468 return self.client.kill(targets=self._targets, block=block)
429
469
430 #----------------------------------------
470 #----------------------------------------
431 # activate for %px,%autopx magics
471 # activate for %px,%autopx magics
432 #----------------------------------------
472 #----------------------------------------
433 def activate(self):
473 def activate(self):
434 """Make this `View` active for parallel magic commands.
474 """Make this `View` active for parallel magic commands.
435
475
436 IPython has a magic command syntax to work with `MultiEngineClient` objects.
476 IPython has a magic command syntax to work with `MultiEngineClient` objects.
437 In a given IPython session there is a single active one. While
477 In a given IPython session there is a single active one. While
438 there can be many `Views` created and used by the user,
478 there can be many `Views` created and used by the user,
439 there is only one active one. The active `View` is used whenever
479 there is only one active one. The active `View` is used whenever
440 the magic commands %px and %autopx are used.
480 the magic commands %px and %autopx are used.
441
481
442 The activate() method is called on a given `View` to make it
482 The activate() method is called on a given `View` to make it
443 active. Once this has been done, the magic commands can be used.
483 active. Once this has been done, the magic commands can be used.
444 """
484 """
445
485
446 try:
486 try:
447 # This is injected into __builtins__.
487 # This is injected into __builtins__.
448 ip = get_ipython()
488 ip = get_ipython()
449 except NameError:
489 except NameError:
450 print "The IPython parallel magics (%result, %px, %autopx) only work within IPython."
490 print "The IPython parallel magics (%result, %px, %autopx) only work within IPython."
451 else:
491 else:
452 pmagic = ip.plugin_manager.get_plugin('parallelmagic')
492 pmagic = ip.plugin_manager.get_plugin('parallelmagic')
453 if pmagic is not None:
493 if pmagic is not None:
454 pmagic.active_multiengine_client = self
494 pmagic.active_multiengine_client = self
455 else:
495 else:
456 print "You must first load the parallelmagic extension " \
496 print "You must first load the parallelmagic extension " \
457 "by doing '%load_ext parallelmagic'"
497 "by doing '%load_ext parallelmagic'"
458
498
459
499
460 class LoadBalancedView(View):
500 class LoadBalancedView(View):
461 """An load-balancing View that only executes via the Task scheduler.
501 """An load-balancing View that only executes via the Task scheduler.
462
502
463 Load-balanced views can be created with the client's `view` method:
503 Load-balanced views can be created with the client's `view` method:
464
504
465 >>> v = client.view(balanced=True)
505 >>> v = client.view(balanced=True)
466
506
467 or targets can be specified, to restrict the potential destinations:
507 or targets can be specified, to restrict the potential destinations:
468
508
469 >>> v = client.view([1,3],balanced=True)
509 >>> v = client.view([1,3],balanced=True)
470
510
471 which would restrict loadbalancing to between engines 1 and 3.
511 which would restrict loadbalancing to between engines 1 and 3.
472
512
473 """
513 """
474
514
475 _default_names = ['block', 'bound', 'follow', 'after', 'timeout']
515 _default_names = ['block', 'bound', 'follow', 'after', 'timeout']
476
516
477 def __init__(self, client=None, targets=None):
517 def __init__(self, client=None, targets=None):
478 super(LoadBalancedView, self).__init__(client=client, targets=targets)
518 super(LoadBalancedView, self).__init__(client=client, targets=targets)
479 self._ntargets = 1
519 self._ntargets = 1
480 self._balanced = True
520 self._balanced = True
481
521
482 def _validate_dependency(self, dep):
522 def _validate_dependency(self, dep):
483 """validate a dependency.
523 """validate a dependency.
484
524
485 For use in `set_flags`.
525 For use in `set_flags`.
486 """
526 """
487 if dep is None or isinstance(dep, (str, AsyncResult, Dependency)):
527 if dep is None or isinstance(dep, (str, AsyncResult, Dependency)):
488 return True
528 return True
489 elif isinstance(dep, (list,set, tuple)):
529 elif isinstance(dep, (list,set, tuple)):
490 for d in dep:
530 for d in dep:
491 if not isinstance(d, str, AsyncResult):
531 if not isinstance(d, str, AsyncResult):
492 return False
532 return False
493 elif isinstance(dep, dict):
533 elif isinstance(dep, dict):
494 if set(dep.keys()) != set(Dependency().as_dict().keys()):
534 if set(dep.keys()) != set(Dependency().as_dict().keys()):
495 return False
535 return False
496 if not isinstance(dep['msg_ids'], list):
536 if not isinstance(dep['msg_ids'], list):
497 return False
537 return False
498 for d in dep['msg_ids']:
538 for d in dep['msg_ids']:
499 if not isinstance(d, str):
539 if not isinstance(d, str):
500 return False
540 return False
501 else:
541 else:
502 return False
542 return False
503
543
504 def set_flags(self, **kwargs):
544 def set_flags(self, **kwargs):
505 """set my attribute flags by keyword.
545 """set my attribute flags by keyword.
506
546
507 A View is a wrapper for the Client's apply method, but
547 A View is a wrapper for the Client's apply method, but with attributes
508 with attributes that specify keyword arguments, those attributes
548 that specify keyword arguments, those attributes can be set by keyword
509 can be set by keyword argument with this method.
549 argument with this method.
510
550
511 Parameters
551 Parameters
512 ----------
552 ----------
513
553
514 block : bool
554 block : bool
515 whether to wait for results
555 whether to wait for results
516 bound : bool
556 bound : bool
517 whether to use the engine's namespace
557 whether to use the engine's namespace
518 follow : Dependency, list, msg_id, AsyncResult
558 follow : Dependency, list, msg_id, AsyncResult
519 the location dependencies of tasks
559 the location dependencies of tasks
520 after : Dependency, list, msg_id, AsyncResult
560 after : Dependency, list, msg_id, AsyncResult
521 the time dependencies of tasks
561 the time dependencies of tasks
522 timeout : int,None
562 timeout : int,None
523 the timeout to be used for tasks
563 the timeout to be used for tasks
524 """
564 """
525
565
526 super(LoadBalancedView, self).set_flags(**kwargs)
566 super(LoadBalancedView, self).set_flags(**kwargs)
527 for name in ('follow', 'after'):
567 for name in ('follow', 'after'):
528 if name in kwargs:
568 if name in kwargs:
529 value = kwargs[name]
569 value = kwargs[name]
530 if self._validate_dependency(value):
570 if self._validate_dependency(value):
531 setattr(self, name, value)
571 setattr(self, name, value)
532 else:
572 else:
533 raise ValueError("Invalid dependency: %r"%value)
573 raise ValueError("Invalid dependency: %r"%value)
534 if 'timeout' in kwargs:
574 if 'timeout' in kwargs:
535 t = kwargs['timeout']
575 t = kwargs['timeout']
536 if not isinstance(t, (int, long, float, None)):
576 if not isinstance(t, (int, long, float, None)):
537 raise TypeError("Invalid type for timeout: %r"%type(t))
577 raise TypeError("Invalid type for timeout: %r"%type(t))
538 if t is not None:
578 if t is not None:
539 if t < 0:
579 if t < 0:
540 raise ValueError("Invalid timeout: %s"%t)
580 raise ValueError("Invalid timeout: %s"%t)
541 self.timeout = t
581 self.timeout = t
542
582
543 @spin_after
583 @spin_after
544 @save_ids
584 @save_ids
545 def map(self, f, *sequences, **kwargs):
585 def map(self, f, *sequences, **kwargs):
546 """Parallel version of builtin `map`, load-balanced by this View.
586 """view.map(f, *sequences, block=self.block, bound=self.bound, chunk_size=1) => list|AsyncMapResult
587
588 Parallel version of builtin `map`, load-balanced by this View.
547
589
548 Each element will be a separate task, and will be load-balanced. This
590 `block`, `bound`, and `chunk_size` can be specified by keyword only.
549 lets individual elements be available for iteration as soon as they arrive.
591
592 Each `chunk_size` elements will be a separate task, and will be
593 load-balanced. This lets individual elements be available for iteration
594 as soon as they arrive.
550
595
551 Parameters
596 Parameters
552 ----------
597 ----------
553
598
554 f : callable
599 f : callable
555 function to be mapped
600 function to be mapped
556 *sequences: one or more sequences of matching length
601 *sequences: one or more sequences of matching length
557 the sequences to be distributed and passed to `f`
602 the sequences to be distributed and passed to `f`
558 block : bool
603 block : bool
559 whether to wait for the result or not [default self.block]
604 whether to wait for the result or not [default self.block]
560 bound : bool
605 bound : bool
561 whether to use the engine's namespace
606 whether to use the engine's namespace [default self.bound]
607 chunk_size : int
608 how many elements should be in each task [default 1]
562
609
563 Returns
610 Returns
564 -------
611 -------
565
612
566 if block=False:
613 if block=False:
567 AsyncMapResult
614 AsyncMapResult
568 An object like AsyncResult, but which reassembles the sequence of results
615 An object like AsyncResult, but which reassembles the sequence of results
569 into a single list. AsyncMapResults can be iterated through before all
616 into a single list. AsyncMapResults can be iterated through before all
570 results are complete.
617 results are complete.
571 else:
618 else:
572 the result of map(f,*sequences)
619 the result of map(f,*sequences)
573
620
574 """
621 """
575
622
576 # default
623 # default
577 block = kwargs.get('block', self.block)
624 block = kwargs.get('block', self.block)
578 bound = kwargs.get('bound', self.bound)
625 bound = kwargs.get('bound', self.bound)
579 chunk_size = kwargs.get('chunk_size', 1)
626 chunk_size = kwargs.get('chunk_size', 1)
580
627
581 keyset = set(kwargs.keys())
628 keyset = set(kwargs.keys())
582 extra_keys = keyset.difference_update(set(['block', 'bound', 'chunk_size']))
629 extra_keys = keyset.difference_update(set(['block', 'bound', 'chunk_size']))
583 if extra_keys:
630 if extra_keys:
584 raise TypeError("Invalid kwargs: %s"%list(extra_keys))
631 raise TypeError("Invalid kwargs: %s"%list(extra_keys))
585
632
586 assert len(sequences) > 0, "must have some sequences to map onto!"
633 assert len(sequences) > 0, "must have some sequences to map onto!"
587
634
588 pf = ParallelFunction(self.client, f, block=block, bound=bound,
635 pf = ParallelFunction(self.client, f, block=block, bound=bound,
589 targets=self.targets, balanced=True,
636 targets=self._targets, balanced=True,
590 chunk_size=chunk_size)
637 chunk_size=chunk_size)
591 return pf.map(*sequences)
638 return pf.map(*sequences)
592
639
@@ -1,796 +1,814 b''
1 .. _parallelmultiengine:
1 .. _parallelmultiengine:
2
2
3 ==========================
3 ==========================
4 IPython's Direct interface
4 IPython's Direct interface
5 ==========================
5 ==========================
6
6
7 The direct, or multiengine, interface represents one possible way of working with a set of
7 The direct, or multiengine, interface represents one possible way of working with a set of
8 IPython engines. The basic idea behind the multiengine interface is that the
8 IPython engines. The basic idea behind the multiengine interface is that the
9 capabilities of each engine are directly and explicitly exposed to the user.
9 capabilities of each engine are directly and explicitly exposed to the user.
10 Thus, in the multiengine interface, each engine is given an id that is used to
10 Thus, in the multiengine interface, each engine is given an id that is used to
11 identify the engine and give it work to do. This interface is very intuitive
11 identify the engine and give it work to do. This interface is very intuitive
12 and is designed with interactive usage in mind, and is thus the best place for
12 and is designed with interactive usage in mind, and is thus the best place for
13 new users of IPython to begin.
13 new users of IPython to begin.
14
14
15 Starting the IPython controller and engines
15 Starting the IPython controller and engines
16 ===========================================
16 ===========================================
17
17
18 To follow along with this tutorial, you will need to start the IPython
18 To follow along with this tutorial, you will need to start the IPython
19 controller and four IPython engines. The simplest way of doing this is to use
19 controller and four IPython engines. The simplest way of doing this is to use
20 the :command:`ipclusterz` command::
20 the :command:`ipclusterz` command::
21
21
22 $ ipclusterz start -n 4
22 $ ipclusterz start -n 4
23
23
24 For more detailed information about starting the controller and engines, see
24 For more detailed information about starting the controller and engines, see
25 our :ref:`introduction <ip1par>` to using IPython for parallel computing.
25 our :ref:`introduction <ip1par>` to using IPython for parallel computing.
26
26
27 Creating a ``Client`` instance
27 Creating a ``Client`` instance
28 ==============================
28 ==============================
29
29
30 The first step is to import the IPython :mod:`IPython.zmq.parallel.client`
30 The first step is to import the IPython :mod:`IPython.zmq.parallel.client`
31 module and then create a :class:`.Client` instance:
31 module and then create a :class:`.Client` instance:
32
32
33 .. sourcecode:: ipython
33 .. sourcecode:: ipython
34
34
35 In [1]: from IPython.zmq.parallel import client
35 In [1]: from IPython.zmq.parallel import client
36
36
37 In [2]: rc = client.Client()
37 In [2]: rc = client.Client()
38
38
39 This form assumes that the default connection information (stored in
39 This form assumes that the default connection information (stored in
40 :file:`ipcontroller-client.json` found in :file:`IPYTHON_DIR/clusterz_default/security`) is
40 :file:`ipcontroller-client.json` found in :file:`IPYTHON_DIR/clusterz_default/security`) is
41 accurate. If the controller was started on a remote machine, you must copy that connection
41 accurate. If the controller was started on a remote machine, you must copy that connection
42 file to the client machine, or enter its contents as arguments to the Client constructor:
42 file to the client machine, or enter its contents as arguments to the Client constructor:
43
43
44 .. sourcecode:: ipython
44 .. sourcecode:: ipython
45
45
46 # If you have copied the json connector file from the controller:
46 # If you have copied the json connector file from the controller:
47 In [2]: rc = client.Client('/path/to/ipcontroller-client.json')
47 In [2]: rc = client.Client('/path/to/ipcontroller-client.json')
48 # or for a remote controller at 10.0.1.5, visible from my.server.com:
48 # or for a remote controller at 10.0.1.5, visible from my.server.com:
49 In [3]: rc = client.Client('tcp://10.0.1.5:12345', sshserver='my.server.com')
49 In [3]: rc = client.Client('tcp://10.0.1.5:12345', sshserver='my.server.com')
50
50
51
51
52 To make sure there are engines connected to the controller, users can get a list
52 To make sure there are engines connected to the controller, users can get a list
53 of engine ids:
53 of engine ids:
54
54
55 .. sourcecode:: ipython
55 .. sourcecode:: ipython
56
56
57 In [3]: rc.ids
57 In [3]: rc.ids
58 Out[3]: [0, 1, 2, 3]
58 Out[3]: [0, 1, 2, 3]
59
59
60 Here we see that there are four engines ready to do work for us.
60 Here we see that there are four engines ready to do work for us.
61
61
62 For direct execution, we will make use of a :class:`DirectView` object, which can be
63 constructed via list-access to the client:
64
65 .. sourcecode::
66
67 In [4]: dview = rc[:] # use all engines
68
69 .. seealso::
70
71 For more information, see the in-depth explanation of :ref:`Views <parallel_view>`.
72
73
62 Quick and easy parallelism
74 Quick and easy parallelism
63 ==========================
75 ==========================
64
76
65 In many cases, you simply want to apply a Python function to a sequence of
77 In many cases, you simply want to apply a Python function to a sequence of
66 objects, but *in parallel*. The client interface provides a simple way
78 objects, but *in parallel*. The client interface provides a simple way
67 of accomplishing this: using the builtin :func:`map` and the ``@remote``
79 of accomplishing this: using the DirectView's :meth:`~DirectView.map` method.
68 function decorator, or the client's :meth:`map` method.
69
80
70 Parallel map
81 Parallel map
71 ------------
82 ------------
72
83
73 Python's builtin :func:`map` functions allows a function to be applied to a
84 Python's builtin :func:`map` functions allows a function to be applied to a
74 sequence element-by-element. This type of code is typically trivial to
85 sequence element-by-element. This type of code is typically trivial to
75 parallelize. In fact, since IPython's interface is all about functions anyway,
86 parallelize. In fact, since IPython's interface is all about functions anyway,
76 you can just use the builtin :func:`map` with a :class:`RemoteFunction`, or a
87 you can just use the builtin :func:`map` with a :class:`RemoteFunction`, or a
77 DirectView's :meth:`map` method:
88 DirectView's :meth:`map` method:
78
89
79 .. sourcecode:: ipython
90 .. sourcecode:: ipython
80
91
81 In [62]: serial_result = map(lambda x:x**10, range(32))
92 In [62]: serial_result = map(lambda x:x**10, range(32))
93
94 In [63]: dview.block = True
95
96 In [66]: parallel_result = dview.map(lambda x: x**10, range(32))
82
97
83 In [66]: parallel_result = rc[:].map(lambda x: x**10, range(32))
98 In [67]: serial_result==parallel_result
84
85 In [67]: serial_result==parallel_result.get()
86 Out[67]: True
99 Out[67]: True
87
100
88
101
89 .. note::
102 .. note::
90
103
91 The :class:`DirectView`'s version of :meth:`map` does
104 The :class:`DirectView`'s version of :meth:`map` does
92 not do any load balancing. For a load balanced version, use a
105 not do dynamic load balancing. For a load balanced version, use a
93 :class:`LoadBalancedView`, or a :class:`ParallelFunction` with
106 :class:`LoadBalancedView`, or a :class:`ParallelFunction` with
94 `balanced=True`.
107 `balanced=True`.
95
108
96 .. seealso::
109 .. seealso::
97
110
98 :meth:`map` is implemented via :class:`.ParallelFunction`.
111 :meth:`map` is implemented via :class:`ParallelFunction`.
99
112
100 Remote function decorator
113 Remote function decorators
101 -------------------------
114 --------------------------
102
115
103 Remote functions are just like normal functions, but when they are called,
116 Remote functions are just like normal functions, but when they are called,
104 they execute on one or more engines, rather than locally. IPython provides
117 they execute on one or more engines, rather than locally. IPython provides
105 some decorators:
118 two decorators:
106
119
107 .. sourcecode:: ipython
120 .. sourcecode:: ipython
108
121
109 In [10]: @rc.remote(block=True, targets=0)
122 In [10]: @rc.remote(block=True, targets='all')
110 ....: def f(x):
123 ...: def getpid():
111 ....: return 10.0*x**4
124 ...: import os
112 ....:
125 ...: return os.getpid()
126 ...:
127
128 In [11]: getpid()
129 Out[11]: [12345, 12346, 12347, 12348]
113
130
114 In [11]: map(f, range(32)) # this is done on engine 0
131 A ``@parallel`` decorator creates parallel functions, that break up an element-wise
115 Out[11]: [0.0,10.0,160.0,...]
132 operations and distribute them, reconstructing the result.
133
134 .. sourcecode:: ipython
135
136 In [12]: import numpy as np
137
138 In [13]: A = np.random.random((64,48))
139
140 In [14]: @rc.parallel(block=True, targets='all')
141 ...: def pmul(A,B):
142 ...: return A*B
143
144 In [15]: C_local = A*A
145
146 In [16]: C_remote_partial = pmul(A,A)
147
148 In [17]: (C_local == C_remote).all()
149 Out[17]: True
116
150
117 .. seealso::
151 .. seealso::
118
152
119 See the docstring for the :func:`parallel` and :func:`remote` decorators for
153 See the docstrings for the :func:`parallel` and :func:`remote` decorators for
120 options.
154 options.
121
155
122 Calling Python functions
156 Calling Python functions
123 ========================
157 ========================
124
158
125 The most basic type of operation that can be performed on the engines is to
159 The most basic type of operation that can be performed on the engines is to
126 execute Python code or call Python functions. Executing Python code can be
160 execute Python code or call Python functions. Executing Python code can be
127 done in blocking or non-blocking mode (non-blocking is default) using the
161 done in blocking or non-blocking mode (non-blocking is default) using the
128 :meth:`execute` method, and calling functions can be done via the
162 :meth:`execute` method, and calling functions can be done via the
129 :meth:`.View.apply` method.
163 :meth:`.View.apply` method.
130
164
131 apply
165 apply
132 -----
166 -----
133
167
134 The main method for doing remote execution (in fact, all methods that
168 The main method for doing remote execution (in fact, all methods that
135 communicate with the engines are built on top of it), is :meth:`Client.apply`.
169 communicate with the engines are built on top of it), is :meth:`Client.apply`.
136 Ideally, :meth:`apply` would have the signature ``apply(f,*args,**kwargs)``,
170 Ideally, :meth:`apply` would have the signature ``apply(f,*args,**kwargs)``,
137 which would call ``f(*args,**kwargs)`` remotely. However, since :class:`Clients`
171 which would call ``f(*args,**kwargs)`` remotely. However, since :class:`Clients`
138 require some more options, they cannot easily provide this interface.
172 require some more options, they cannot easily provide this interface.
139 Instead, they provide the signature::
173 Instead, they provide the signature::
140
174
141 c.apply(f, args=None, kwargs=None, bound=True, block=None, targets=None,
175 c.apply(f, args=None, kwargs=None, bound=True, block=None, targets=None,
142 after=None, follow=None, timeout=None)
176 after=None, follow=None, timeout=None)
143
177
144 In order to provide the nicer interface, we have :class:`View` classes, which wrap
178 In order to provide the nicer interface, we have :class:`View` classes, which wrap
145 :meth:`Client.apply` by using attributes and extra :meth:`apply_x` methods to determine
179 :meth:`Client.apply` by using attributes and extra :meth:`apply_x` methods to determine
146 the extra arguments. For instance, performing index-access on a client creates a
180 the extra arguments. For instance, performing index-access on a client creates a
147 :class:`.DirectView`.
181 :class:`.DirectView`.
148
182
149 .. sourcecode:: ipython
183 .. sourcecode:: ipython
150
184
151 In [4]: view = rc[1:3]
185 In [4]: view = rc[1:3]
152 Out[4]: <DirectView [1, 2]>
186 Out[4]: <DirectView [1, 2]>
153
187
154 In [5]: view.apply<tab>
188 In [5]: view.apply<tab>
155 view.apply view.apply_async view.apply_async_bound view.apply_bound view.apply_sync view.apply_sync_bound
189 view.apply view.apply_async view.apply_async_bound view.apply_sync view.apply_sync_bound
156
190
157 A :class:`DirectView` always uses its `targets` attribute, and it will use its `bound`
191 A :class:`DirectView` always uses its `targets` attribute, and it will use its `bound`
158 and `block` attributes in its :meth:`apply` method, but the suffixed :meth:`apply_x`
192 and `block` attributes in its :meth:`apply` method, but the suffixed :meth:`apply_x`
159 methods allow specifying `bound` and `block` via the different methods.
193 methods allow specifying `bound` and `block` via the different methods.
160
194
161 ================== ========== ==========
195 ================== ========== ==========
162 method block bound
196 method block bound
163 ================== ========== ==========
197 ================== ========== ==========
164 apply self.block self.bound
198 apply self.block self.bound
165 apply_sync True False
199 apply_sync True False
166 apply_async False False
200 apply_async False False
167 apply_sync_bound True True
201 apply_sync_bound True True
168 apply_async_bound False True
202 apply_async_bound False True
169 ================== ========== ==========
203 ================== ========== ==========
170
204
171 For explanation of these values, read on.
205 For explanation of these values, read on.
172
206
173 Blocking execution
207 Blocking execution
174 ------------------
208 ------------------
175
209
176 In blocking mode, the :class:`.DirectView` object (called ``dview`` in
210 In blocking mode, the :class:`.DirectView` object (called ``dview`` in
177 these examples) submits the command to the controller, which places the
211 these examples) submits the command to the controller, which places the
178 command in the engines' queues for execution. The :meth:`apply` call then
212 command in the engines' queues for execution. The :meth:`apply` call then
179 blocks until the engines are done executing the command:
213 blocks until the engines are done executing the command:
180
214
181 .. sourcecode:: ipython
215 .. sourcecode:: ipython
182
216
183 In [2]: rc.block=True
217 In [2]: dview = rc[:] # A DirectView of all engines
184 In [3]: dview = rc[:] # A DirectView of all engines
218 In [3]: dview.block=True
185 In [4]: dview['a'] = 5
219 In [4]: dview['a'] = 5
186
220
187 In [5]: dview['b'] = 10
221 In [5]: dview['b'] = 10
188
222
189 In [6]: dview.apply_bound(lambda x: a+b+x, 27)
223 In [6]: dview.apply_bound(lambda x: a+b+x, 27)
190 Out[6]: [42, 42, 42, 42]
224 Out[6]: [42, 42, 42, 42]
191
225
192 Python commands can be executed on specific engines by calling execute using
226 Python commands can be executed on specific engines by calling execute using the ``targets``
193 the ``targets`` keyword argument, or creating a :class:`DirectView` instance
227 keyword argument in :meth:`client.execute`, or creating a :class:`DirectView` instance by
194 by index-access to the client:
228 index-access to the client:
195
229
196 .. sourcecode:: ipython
230 .. sourcecode:: ipython
197
231
198 In [6]: rc[::2].execute('c=a+b') # shorthand for rc.execute('c=a+b',targets=[0,2])
232 In [6]: rc.execute('c=a+b', targets=[0,2])
199
233
200 In [7]: rc[1::2].execute('c=a-b') # shorthand for rc.execute('c=a-b',targets=[1,3])
234 In [7]: rc[1::2].execute('c=a-b') # shorthand for rc.execute('c=a-b',targets=[1,3])
201
235
202 In [8]: rc[:]['c'] # shorthand for rc.pull('c',targets='all')
236 In [8]: rc[:]['c'] # shorthand for rc.pull('c',targets='all')
203 Out[8]: [15, -5, 15, -5]
237 Out[8]: [15, -5, 15, -5]
204
238
205 .. note::
239 .. note::
206
240
207 Note that every call to ``rc.<meth>(...,targets=x)`` can be made via
241 Note that every call to ``rc.<meth>(...,targets=x)`` can be made via
208 ``rc[<x>].<meth>(...)``, which constructs a View object. The only place
242 ``rc[<x>].<meth>(...)``, which constructs a View object. The only place
209 where this differs in in :meth:`apply`. The :class:`Client` takes many
243 where this differs in in :meth:`apply`. The :class:`Client` takes many
210 arguments to apply, so it requires `args` and `kwargs` to be passed as
244 arguments to apply, so it requires `args` and `kwargs` to be passed as
211 individual arguments. Extended options such as `bound`,`targets`, and
245 individual arguments. Extended options such as `bound`,`targets`, and
212 `block` are controlled by the attributes of the :class:`View` objects, so
246 `block` are controlled by the attributes of the :class:`View` objects, so
213 they can provide the much more convenient
247 they can provide the much more convenient
214 :meth:`View.apply(f,*args,**kwargs)`, which simply calls
248 :meth:`View.apply(f,*args,**kwargs)`, which simply calls
215 ``f(*args,**kwargs)`` remotely.
249 ``f(*args,**kwargs)`` remotely.
216
250
217 This example also shows one of the most important things about the IPython
251 Bound and unbound execution
252 ---------------------------
253
254 The previous example also shows one of the most important things about the IPython
218 engines: they have a persistent user namespaces. The :meth:`apply` method can
255 engines: they have a persistent user namespaces. The :meth:`apply` method can
219 be run in either a bound or unbound way. The default for a View is to be
256 be run in either a bound or unbound manner:
220 unbound, unless called by the :meth:`apply_bound` method:
221
257
222 .. sourcecode:: ipython
258 .. sourcecode:: ipython
223
259
224 In [9]: dview['b'] = 5 # assign b to 5 everywhere
260 In [9]: dview['b'] = 5 # assign b to 5 everywhere
225
261
226 In [10]: v0 = rc[0]
262 In [10]: v0 = rc[0]
227
263
228 In [12]: v0.apply_bound(lambda : b)
264 In [12]: v0.apply_sync_bound(lambda : b)
229 Out[12]: 5
265 Out[12]: 5
230
266
231 In [13]: v0.apply(lambda : b)
267 In [13]: v0.apply_sync(lambda : b)
232 ---------------------------------------------------------------------------
268 ---------------------------------------------------------------------------
233 RemoteError Traceback (most recent call last)
269 RemoteError Traceback (most recent call last)
234 /home/you/<ipython-input-34-21a468eb10f0> in <module>()
270 /home/you/<ipython-input-34-21a468eb10f0> in <module>()
235 ----> 1 v0.apply(lambda : b)
271 ----> 1 v0.apply(lambda : b)
236 ...
272 ...
237 RemoteError: NameError(global name 'b' is not defined)
273 RemoteError: NameError(global name 'b' is not defined)
238 Traceback (most recent call last):
274 Traceback (most recent call last):
239 File "/Users/minrk/dev/ip/mine/IPython/zmq/parallel/streamkernel.py", line 294, in apply_request
275 File "/Users/minrk/dev/ip/mine/IPython/zmq/parallel/streamkernel.py", line 294, in apply_request
240 exec code in working, working
276 exec code in working, working
241 File "<string>", line 1, in <module>
277 File "<string>", line 1, in <module>
242 File "<ipython-input-34-21a468eb10f0>", line 1, in <lambda>
278 File "<ipython-input-34-21a468eb10f0>", line 1, in <lambda>
243 NameError: global name 'b' is not defined
279 NameError: global name 'b' is not defined
244
280
245
281
246 Specifically, `bound=True` specifies that the engine's namespace is to be used
282 Specifically, `bound=True` specifies that the engine's namespace is to be used
247 for execution, and `bound=False` specifies that the engine's namespace is not
283 as the `globals` when the function is called, and `bound=False` specifies that
248 to be used (hence, 'b' is undefined during unbound execution, since the
284 the engine's namespace is not to be used (hence, 'b' is undefined during unbound
249 function is called in an empty namespace). Unbound execution is often useful
285 execution, since the function is called in an empty namespace). Unbound execution is
250 for large numbers of atomic tasks, which prevents bloating the engine's
286 often useful for large numbers of atomic tasks, which prevents bloating the engine's
251 memory, while bound execution lets you build on your previous work.
287 memory, while bound execution lets you build on your previous work.
252
288
253
289
254 Non-blocking execution
290 Non-blocking execution
255 ----------------------
291 ----------------------
256
292
257 In non-blocking mode, :meth:`apply` submits the command to be executed and
293 In non-blocking mode, :meth:`apply` submits the command to be executed and
258 then returns a :class:`AsyncResult` object immediately. The
294 then returns a :class:`AsyncResult` object immediately. The
259 :class:`AsyncResult` object gives you a way of getting a result at a later
295 :class:`AsyncResult` object gives you a way of getting a result at a later
260 time through its :meth:`get` method.
296 time through its :meth:`get` method.
261
297
262 .. Note::
298 .. Note::
263
299
264 The :class:`AsyncResult` object provides a superset of the interface in
300 The :class:`AsyncResult` object provides a superset of the interface in
265 :py:class:`multiprocessing.pool.AsyncResult`. See the
301 :py:class:`multiprocessing.pool.AsyncResult`. See the
266 `official Python documentation <http://docs.python.org/library/multiprocessing#multiprocessing.pool.AsyncResult>`_
302 `official Python documentation <http://docs.python.org/library/multiprocessing#multiprocessing.pool.AsyncResult>`_
267 for more.
303 for more.
268
304
269
305
270 This allows you to quickly submit long running commands without blocking your
306 This allows you to quickly submit long running commands without blocking your
271 local Python/IPython session:
307 local Python/IPython session:
272
308
273 .. sourcecode:: ipython
309 .. sourcecode:: ipython
274
310
275 # define our function
311 # define our function
276 In [6]: def wait(t):
312 In [6]: def wait(t):
277 ...: import time
313 ...: import time
278 ...: tic = time.time()
314 ...: tic = time.time()
279 ...: time.sleep(t)
315 ...: time.sleep(t)
280 ...: return time.time()-tic
316 ...: return time.time()-tic
281
317
282 # In non-blocking mode
318 # In non-blocking mode
283 In [7]: pr = dview.apply_async(wait, 2)
319 In [7]: ar = dview.apply_async(wait, 2)
284
320
285 # Now block for the result
321 # Now block for the result
286 In [8]: pr.get()
322 In [8]: ar.get()
287 Out[8]: [2.0006198883056641, 1.9997570514678955, 1.9996809959411621, 2.0003249645233154]
323 Out[8]: [2.0006198883056641, 1.9997570514678955, 1.9996809959411621, 2.0003249645233154]
288
324
289 # Again in non-blocking mode
325 # Again in non-blocking mode
290 In [9]: pr = dview.apply_async(wait, 10)
326 In [9]: ar = dview.apply_async(wait, 10)
291
327
292 # Poll to see if the result is ready
328 # Poll to see if the result is ready
293 In [10]: pr.ready()
329 In [10]: ar.ready()
294 Out[10]: False
330 Out[10]: False
295
331
296 # ask for the result, but wait a maximum of 1 second:
332 # ask for the result, but wait a maximum of 1 second:
297 In [45]: pr.get(1)
333 In [45]: ar.get(1)
298 ---------------------------------------------------------------------------
334 ---------------------------------------------------------------------------
299 TimeoutError Traceback (most recent call last)
335 TimeoutError Traceback (most recent call last)
300 /home/you/<ipython-input-45-7cd858bbb8e0> in <module>()
336 /home/you/<ipython-input-45-7cd858bbb8e0> in <module>()
301 ----> 1 pr.get(1)
337 ----> 1 ar.get(1)
302
338
303 /path/to/site-packages/IPython/zmq/parallel/asyncresult.pyc in get(self, timeout)
339 /path/to/site-packages/IPython/zmq/parallel/asyncresult.pyc in get(self, timeout)
304 62 raise self._exception
340 62 raise self._exception
305 63 else:
341 63 else:
306 ---> 64 raise error.TimeoutError("Result not ready.")
342 ---> 64 raise error.TimeoutError("Result not ready.")
307 65
343 65
308 66 def ready(self):
344 66 def ready(self):
309
345
310 TimeoutError: Result not ready.
346 TimeoutError: Result not ready.
311
347
312 .. Note::
348 .. Note::
313
349
314 Note the import inside the function. This is a common model, to ensure
350 Note the import inside the function. This is a common model, to ensure
315 that the appropriate modules are imported where the task is run.
351 that the appropriate modules are imported where the task is run.
316
352
317 Often, it is desirable to wait until a set of :class:`AsyncResult` objects
353 Often, it is desirable to wait until a set of :class:`AsyncResult` objects
318 are done. For this, there is a the method :meth:`barrier`. This method takes a
354 are done. For this, there is a the method :meth:`barrier`. This method takes a
319 tuple of :class:`AsyncResult` objects (or `msg_ids`) and blocks until all of the
355 tuple of :class:`AsyncResult` objects (or `msg_ids` or indices to the client's History),
320 associated results are ready:
356 and blocks until all of the associated results are ready:
321
357
322 .. sourcecode:: ipython
358 .. sourcecode:: ipython
323
359
324 In [72]: rc.block=False
360 In [72]: rc.block=False
325
361
326 # A trivial list of AsyncResults objects
362 # A trivial list of AsyncResults objects
327 In [73]: pr_list = [dview.apply_async(wait, 3) for i in range(10)]
363 In [73]: pr_list = [dview.apply_async(wait, 3) for i in range(10)]
328
364
329 # Wait until all of them are done
365 # Wait until all of them are done
330 In [74]: rc.barrier(pr_list)
366 In [74]: rc.barrier(pr_list)
331
367
332 # Then, their results are ready using get() or the `.r` attribute
368 # Then, their results are ready using get() or the `.r` attribute
333 In [75]: pr_list[0].get()
369 In [75]: pr_list[0].get()
334 Out[75]: [2.9982571601867676, 2.9982588291168213, 2.9987530708312988, 2.9990990161895752]
370 Out[75]: [2.9982571601867676, 2.9982588291168213, 2.9987530708312988, 2.9990990161895752]
335
371
336
372
337
373
338 The ``block`` keyword argument and attributes
374 The ``block`` keyword argument and attributes
339 ---------------------------------------------
375 ---------------------------------------------
340
376
341 Most methods(like :meth:`apply`) accept
377 Most client methods(like :meth:`apply`) accept
342 ``block`` as a keyword argument. As we have seen above, these
378 ``block`` as a keyword argument. As we have seen above, these
343 keyword arguments control the blocking mode . The :class:`Client` class also has
379 keyword arguments control the blocking mode. The :class:`Client` class also has
344 a :attr:`block` attribute that controls the default behavior when the keyword
380 a :attr:`block` attribute that controls the default behavior when the keyword
345 argument is not provided. Thus the following logic is used for :attr:`block`:
381 argument is not provided. Thus the following logic is used for :attr:`block`:
346
382
347 * If no keyword argument is provided, the instance attributes are used.
383 * If no keyword argument is provided, the instance attributes are used.
348 * Keyword argument, if provided override the instance attributes for
384 * Keyword argument, if provided override the instance attributes for
349 the duration of a single call.
385 the duration of a single call.
386
387 DirectView objects also have a ``bound`` attribute, which is used in the same way.
350
388
351 The following examples demonstrate how to use the instance attributes:
389 The following examples demonstrate how to use the instance attributes:
352
390
353 .. sourcecode:: ipython
391 .. sourcecode:: ipython
354
392
355 In [17]: rc.block = False
393 In [17]: rc.block = False
356
394
357 In [18]: ar = rc.apply(lambda : 10, targets=[0,2])
395 In [18]: ar = rc.apply(lambda : 10, targets=[0,2])
358
396
359 In [19]: ar.get()
397 In [19]: ar.get()
360 Out[19]: [10,10]
398 Out[19]: [10,10]
361
399
362 In [21]: rc.block = True
400 In [21]: rc.block = True
363
401
364 # Note targets='all' means all engines
402 # Note targets='all' means all engines
365 In [22]: rc.apply(lambda : 42, targets='all')
403 In [22]: rc.apply(lambda : 42, targets='all')
366 Out[22]: [42, 42, 42, 42]
404 Out[22]: [42, 42, 42, 42]
367
405
368 The :attr:`block` and :attr:`targets` instance attributes of the
406 The :attr:`block`, :attr:`bound`, and :attr:`targets` instance attributes of the
369 :class:`.DirectView` also determine the behavior of the parallel magic commands.
407 :class:`.DirectView` also determine the behavior of the parallel magic commands.
370
408
371
409
372 Parallel magic commands
410 Parallel magic commands
373 -----------------------
411 -----------------------
374
412
375 .. warning::
413 .. warning::
376
414
377 The magics have not been changed to work with the zeromq system. ``%px``
415 The magics have not been changed to work with the zeromq system. ``%px``
378 and ``%autopx`` do work, but ``%result`` does not. %px and %autopx *do
416 and ``%autopx`` do work, but ``%result`` does not. %px and %autopx *do
379 not* print stdin/out.
417 not* print stdin/out.
380
418
381 We provide a few IPython magic commands (``%px``, ``%autopx`` and ``%result``)
419 We provide a few IPython magic commands (``%px``, ``%autopx`` and ``%result``)
382 that make it more pleasant to execute Python commands on the engines
420 that make it more pleasant to execute Python commands on the engines
383 interactively. These are simply shortcuts to :meth:`execute` and
421 interactively. These are simply shortcuts to :meth:`execute` and
384 :meth:`get_result`. The ``%px`` magic executes a single Python command on the
422 :meth:`get_result` of the :class:`DirectView`. The ``%px`` magic executes a single
385 engines specified by the :attr:`targets` attribute of the
423 Python command on the engines specified by the :attr:`targets` attribute of the
386 :class:`MultiEngineClient` instance (by default this is ``'all'``):
424 :class:`DirectView` instance:
387
425
388 .. sourcecode:: ipython
426 .. sourcecode:: ipython
389
427
390 # Create a DirectView for all targets
428 # Create a DirectView for all targets
391 In [22]: dv = rc[:]
429 In [22]: dv = rc[:]
392
430
393 # Make this DirectView active for parallel magic commands
431 # Make this DirectView active for parallel magic commands
394 In [23]: dv.activate()
432 In [23]: dv.activate()
395
433
396 In [24]: dv.block=True
434 In [24]: dv.block=True
397
435
398 In [25]: import numpy
436 In [25]: import numpy
399
437
400 In [26]: %px import numpy
438 In [26]: %px import numpy
401 Parallel execution on engines: [0, 1, 2, 3]
439 Parallel execution on engines: [0, 1, 2, 3]
402 Out[26]:[None,None,None,None]
403
440
404 In [27]: %px a = numpy.random.rand(2,2)
441 In [27]: %px a = numpy.random.rand(2,2)
405 Parallel execution on engines: [0, 1, 2, 3]
442 Parallel execution on engines: [0, 1, 2, 3]
406
443
407 In [28]: %px ev = numpy.linalg.eigvals(a)
444 In [28]: %px ev = numpy.linalg.eigvals(a)
408 Parallel execution on engines: [0, 1, 2, 3]
445 Parallel execution on engines: [0, 1, 2, 3]
409
446
410 In [28]: dv['ev']
447 In [28]: dv['ev']
411 Out[44]: [ array([ 1.09522024, -0.09645227]),
448 Out[28]: [ array([ 1.09522024, -0.09645227]),
412 array([ 1.21435496, -0.35546712]),
449 array([ 1.21435496, -0.35546712]),
413 array([ 0.72180653, 0.07133042]),
450 array([ 0.72180653, 0.07133042]),
414 array([ 1.46384341e+00, 1.04353244e-04])
451 array([ 1.46384341e+00, 1.04353244e-04])
415 ]
452 ]
416
453
417 .. Note::
454 The ``%result`` magic gets the most recent result, or takes an argument
418
455 specifying the index of the result to be requested. It is simply a shortcut to the
419 ``%result`` doesn't work
420
421 The ``%result`` magic gets and prints the stdin/stdout/stderr of the last
422 command executed on each engine. It is simply a shortcut to the
423 :meth:`get_result` method:
456 :meth:`get_result` method:
424
457
425 .. sourcecode:: ipython
458 .. sourcecode:: ipython
426
459
427 In [29]: %result
460 In [29]: dv.apply_async_bound(lambda : ev)
428 Out[29]:
461
429 <Results List>
462 In [30]: %result
430 [0] In [10]: print numpy.linalg.eigvals(a)
463 Out[30]: [ [ 1.28167017 0.14197338],
431 [0] Out[10]: [ 1.28167017 0.14197338]
464 [-0.14093616 1.27877273],
432
465 [-0.37023573 1.06779409],
433 [1] In [9]: print numpy.linalg.eigvals(a)
466 [ 0.83664764 -0.25602658] ]
434 [1] Out[9]: [-0.14093616 1.27877273]
435
436 [2] In [10]: print numpy.linalg.eigvals(a)
437 [2] Out[10]: [-0.37023573 1.06779409]
438
439 [3] In [9]: print numpy.linalg.eigvals(a)
440 [3] Out[9]: [ 0.83664764 -0.25602658]
441
467
442 The ``%autopx`` magic switches to a mode where everything you type is executed
468 The ``%autopx`` magic switches to a mode where everything you type is executed
443 on the engines given by the :attr:`targets` attribute:
469 on the engines given by the :attr:`targets` attribute:
444
470
445 .. sourcecode:: ipython
471 .. sourcecode:: ipython
446
472
447 In [30]: dv.block=False
473 In [30]: dv.block=False
448
474
449 In [31]: %autopx
475 In [31]: %autopx
450 Auto Parallel Enabled
476 Auto Parallel Enabled
451 Type %autopx to disable
477 Type %autopx to disable
452
478
453 In [32]: max_evals = []
479 In [32]: max_evals = []
454 <IPython.zmq.parallel.asyncresult.AsyncResult object at 0x17b8a70>
480 <IPython.zmq.parallel.asyncresult.AsyncResult object at 0x17b8a70>
455
481
456 In [33]: for i in range(100):
482 In [33]: for i in range(100):
457 ....: a = numpy.random.rand(10,10)
483 ....: a = numpy.random.rand(10,10)
458 ....: a = a+a.transpose()
484 ....: a = a+a.transpose()
459 ....: evals = numpy.linalg.eigvals(a)
485 ....: evals = numpy.linalg.eigvals(a)
460 ....: max_evals.append(evals[0].real)
486 ....: max_evals.append(evals[0].real)
461 ....:
487 ....:
462 ....:
488 ....:
463 <IPython.zmq.parallel.asyncresult.AsyncResult object at 0x17af8f0>
489 <IPython.zmq.parallel.asyncresult.AsyncResult object at 0x17af8f0>
464
490
465 In [34]: %autopx
491 In [34]: %autopx
466 Auto Parallel Disabled
492 Auto Parallel Disabled
467
493
468 In [35]: dv.block=True
494 In [35]: dv.block=True
469
495
470 In [36]: px ans= "Average max eigenvalue is: %f"%(sum(max_evals)/len(max_evals))
496 In [36]: px ans= "Average max eigenvalue is: %f"%(sum(max_evals)/len(max_evals))
471 Parallel execution on engines: [0, 1, 2, 3]
497 Parallel execution on engines: [0, 1, 2, 3]
472
498
473 In [37]: dv['ans']
499 In [37]: dv['ans']
474 Out[37]: [ 'Average max eigenvalue is: 10.1387247332',
500 Out[37]: [ 'Average max eigenvalue is: 10.1387247332',
475 'Average max eigenvalue is: 10.2076902286',
501 'Average max eigenvalue is: 10.2076902286',
476 'Average max eigenvalue is: 10.1891484655',
502 'Average max eigenvalue is: 10.1891484655',
477 'Average max eigenvalue is: 10.1158837784',]
503 'Average max eigenvalue is: 10.1158837784',]
478
504
479
505
480 .. Note::
481
482 Multiline ``%autopx`` gets fouled up by NameErrors, because IPython
483 currently introspects too much.
484
485
486 Moving Python objects around
506 Moving Python objects around
487 ============================
507 ============================
488
508
489 In addition to calling functions and executing code on engines, you can
509 In addition to calling functions and executing code on engines, you can
490 transfer Python objects to and from your IPython session and the engines. In
510 transfer Python objects to and from your IPython session and the engines. In
491 IPython, these operations are called :meth:`push` (sending an object to the
511 IPython, these operations are called :meth:`push` (sending an object to the
492 engines) and :meth:`pull` (getting an object from the engines).
512 engines) and :meth:`pull` (getting an object from the engines).
493
513
494 Basic push and pull
514 Basic push and pull
495 -------------------
515 -------------------
496
516
497 Here are some examples of how you use :meth:`push` and :meth:`pull`:
517 Here are some examples of how you use :meth:`push` and :meth:`pull`:
498
518
499 .. sourcecode:: ipython
519 .. sourcecode:: ipython
500
520
501 In [38]: rc.push(dict(a=1.03234,b=3453))
521 In [38]: rc.push(dict(a=1.03234,b=3453))
502 Out[38]: [None,None,None,None]
522 Out[38]: [None,None,None,None]
503
523
504 In [39]: rc.pull('a')
524 In [39]: rc.pull('a')
505 Out[39]: [ 1.03234, 1.03234, 1.03234, 1.03234]
525 Out[39]: [ 1.03234, 1.03234, 1.03234, 1.03234]
506
526
507 In [40]: rc.pull('b',targets=0)
527 In [40]: rc.pull('b',targets=0)
508 Out[40]: 3453
528 Out[40]: 3453
509
529
510 In [41]: rc.pull(('a','b'))
530 In [41]: rc.pull(('a','b'))
511 Out[41]: [ [1.03234, 3453], [1.03234, 3453], [1.03234, 3453], [1.03234, 3453] ]
531 Out[41]: [ [1.03234, 3453], [1.03234, 3453], [1.03234, 3453], [1.03234, 3453] ]
512
532
513 # zmq client does not have zip_pull
533 # zmq client does not have zip_pull
514 In [42]: rc.zip_pull(('a','b'))
534 In [42]: rc.zip_pull(('a','b'))
515 Out[42]: [(1.03234, 1.03234, 1.03234, 1.03234), (3453, 3453, 3453, 3453)]
535 Out[42]: [(1.03234, 1.03234, 1.03234, 1.03234), (3453, 3453, 3453, 3453)]
516
536
517 In [43]: rc.push(dict(c='speed'))
537 In [43]: rc.push(dict(c='speed'))
518 Out[43]: [None,None,None,None]
538 Out[43]: [None,None,None,None]
519
539
520 In non-blocking mode :meth:`push` and :meth:`pull` also return
540 In non-blocking mode :meth:`push` and :meth:`pull` also return
521 :class:`AsyncResult` objects:
541 :class:`AsyncResult` objects:
522
542
523 .. sourcecode:: ipython
543 .. sourcecode:: ipython
524
544
525 In [47]: rc.block=False
545 In [47]: rc.block=False
526
546
527 In [48]: pr = rc.pull('a')
547 In [48]: ar = rc.pull('a')
528
548
529 In [49]: pr.get()
549 In [49]: ar.get()
530 Out[49]: [1.03234, 1.03234, 1.03234, 1.03234]
550 Out[49]: [1.03234, 1.03234, 1.03234, 1.03234]
531
551
532
552
533
534
535 Dictionary interface
553 Dictionary interface
536 --------------------
554 --------------------
537
555
538 Since a namespace is just a :class:`dict`, :class:`DirectView` objects provide
556 Since a namespace is just a :class:`dict`, :class:`DirectView` objects provide
539 dictionary-style access by key and methods such as :meth:`get` and
557 dictionary-style access by key and methods such as :meth:`get` and
540 :meth:`update` for convenience. This makes the remote namespaces of the engines
558 :meth:`update` for convenience. This makes the remote namespaces of the engines
541 appear as a local dictionary. Underneath, this uses :meth:`push` and
559 appear as a local dictionary. Underneath, this uses :meth:`push` and
542 :meth:`pull`:
560 :meth:`pull`:
543
561
544 .. sourcecode:: ipython
562 .. sourcecode:: ipython
545
563
546 In [50]: rc.block=True
564 In [50]: rc.block=True
547
565
548 In [51]: dview['a']=['foo','bar']
566 In [51]: dview['a']=['foo','bar']
549
567
550 In [52]: dview['a']
568 In [52]: dview['a']
551 Out[52]: [ ['foo', 'bar'], ['foo', 'bar'], ['foo', 'bar'], ['foo', 'bar'] ]
569 Out[52]: [ ['foo', 'bar'], ['foo', 'bar'], ['foo', 'bar'], ['foo', 'bar'] ]
552
570
553 Scatter and gather
571 Scatter and gather
554 ------------------
572 ------------------
555
573
556 Sometimes it is useful to partition a sequence and push the partitions to
574 Sometimes it is useful to partition a sequence and push the partitions to
557 different engines. In MPI language, this is known as scatter/gather and we
575 different engines. In MPI language, this is known as scatter/gather and we
558 follow that terminology. However, it is important to remember that in
576 follow that terminology. However, it is important to remember that in
559 IPython's :class:`Client` class, :meth:`scatter` is from the
577 IPython's :class:`Client` class, :meth:`scatter` is from the
560 interactive IPython session to the engines and :meth:`gather` is from the
578 interactive IPython session to the engines and :meth:`gather` is from the
561 engines back to the interactive IPython session. For scatter/gather operations
579 engines back to the interactive IPython session. For scatter/gather operations
562 between engines, MPI should be used:
580 between engines, MPI should be used:
563
581
564 .. sourcecode:: ipython
582 .. sourcecode:: ipython
565
583
566 In [58]: dview.scatter('a',range(16))
584 In [58]: dview.scatter('a',range(16))
567 Out[58]: [None,None,None,None]
585 Out[58]: [None,None,None,None]
568
586
569 In [59]: dview['a']
587 In [59]: dview['a']
570 Out[59]: [ [0, 1, 2, 3], [4, 5, 6, 7], [8, 9, 10, 11], [12, 13, 14, 15] ]
588 Out[59]: [ [0, 1, 2, 3], [4, 5, 6, 7], [8, 9, 10, 11], [12, 13, 14, 15] ]
571
589
572 In [60]: dview.gather('a')
590 In [60]: dview.gather('a')
573 Out[60]: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15]
591 Out[60]: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15]
574
592
575 Other things to look at
593 Other things to look at
576 =======================
594 =======================
577
595
578 How to do parallel list comprehensions
596 How to do parallel list comprehensions
579 --------------------------------------
597 --------------------------------------
580
598
581 In many cases list comprehensions are nicer than using the map function. While
599 In many cases list comprehensions are nicer than using the map function. While
582 we don't have fully parallel list comprehensions, it is simple to get the
600 we don't have fully parallel list comprehensions, it is simple to get the
583 basic effect using :meth:`scatter` and :meth:`gather`:
601 basic effect using :meth:`scatter` and :meth:`gather`:
584
602
585 .. sourcecode:: ipython
603 .. sourcecode:: ipython
586
604
587 In [66]: dview.scatter('x',range(64))
605 In [66]: dview.scatter('x',range(64))
588 Out[66]: [None,None,None,None]
606 Out[66]: [None,None,None,None]
589
607
590 In [67]: px y = [i**10 for i in x]
608 In [67]: px y = [i**10 for i in x]
591 Parallel execution on engines: [0, 1, 2, 3]
609 Parallel execution on engines: [0, 1, 2, 3]
592 Out[67]:
610 Out[67]:
593
611
594 In [68]: y = dview.gather('y')
612 In [68]: y = dview.gather('y')
595
613
596 In [69]: print y
614 In [69]: print y
597 [0, 1, 1024, 59049, 1048576, 9765625, 60466176, 282475249, 1073741824,...]
615 [0, 1, 1024, 59049, 1048576, 9765625, 60466176, 282475249, 1073741824,...]
598
616
599 Parallel exceptions
617 Parallel exceptions
600 -------------------
618 -------------------
601
619
602 In the multiengine interface, parallel commands can raise Python exceptions,
620 In the multiengine interface, parallel commands can raise Python exceptions,
603 just like serial commands. But, it is a little subtle, because a single
621 just like serial commands. But, it is a little subtle, because a single
604 parallel command can actually raise multiple exceptions (one for each engine
622 parallel command can actually raise multiple exceptions (one for each engine
605 the command was run on). To express this idea, the MultiEngine interface has a
623 the command was run on). To express this idea, the MultiEngine interface has a
606 :exc:`CompositeError` exception class that will be raised in most cases. The
624 :exc:`CompositeError` exception class that will be raised in most cases. The
607 :exc:`CompositeError` class is a special type of exception that wraps one or
625 :exc:`CompositeError` class is a special type of exception that wraps one or
608 more other types of exceptions. Here is how it works:
626 more other types of exceptions. Here is how it works:
609
627
610 .. sourcecode:: ipython
628 .. sourcecode:: ipython
611
629
612 In [76]: rc.block=True
630 In [76]: rc.block=True
613
631
614 In [77]: rc.execute('1/0')
632 In [77]: rc.execute('1/0')
615 ---------------------------------------------------------------------------
633 ---------------------------------------------------------------------------
616 CompositeError Traceback (most recent call last)
634 CompositeError Traceback (most recent call last)
617
635
618 /ipython1-client-r3021/docs/examples/<ipython console> in <module>()
636 /ipython1-client-r3021/docs/examples/<ipython console> in <module>()
619
637
620 /ipython1-client-r3021/ipython1/kernel/multiengineclient.pyc in execute(self, lines, targets, block)
638 /ipython1-client-r3021/ipython1/kernel/multiengineclient.pyc in execute(self, lines, targets, block)
621 432 targets, block = self._findTargetsAndBlock(targets, block)
639 432 targets, block = self._findTargetsAndBlock(targets, block)
622 433 result = blockingCallFromThread(self.smultiengine.execute, lines,
640 433 result = blockingCallFromThread(self.smultiengine.execute, lines,
623 --> 434 targets=targets, block=block)
641 --> 434 targets=targets, block=block)
624 435 if block:
642 435 if block:
625 436 result = ResultList(result)
643 436 result = ResultList(result)
626
644
627 /ipython1-client-r3021/ipython1/kernel/twistedutil.pyc in blockingCallFromThread(f, *a, **kw)
645 /ipython1-client-r3021/ipython1/kernel/twistedutil.pyc in blockingCallFromThread(f, *a, **kw)
628 72 result.raiseException()
646 72 result.raiseException()
629 73 except Exception, e:
647 73 except Exception, e:
630 ---> 74 raise e
648 ---> 74 raise e
631 75 return result
649 75 return result
632 76
650 76
633
651
634 CompositeError: one or more exceptions from call to method: execute
652 CompositeError: one or more exceptions from call to method: execute
635 [0:execute]: ZeroDivisionError: integer division or modulo by zero
653 [0:execute]: ZeroDivisionError: integer division or modulo by zero
636 [1:execute]: ZeroDivisionError: integer division or modulo by zero
654 [1:execute]: ZeroDivisionError: integer division or modulo by zero
637 [2:execute]: ZeroDivisionError: integer division or modulo by zero
655 [2:execute]: ZeroDivisionError: integer division or modulo by zero
638 [3:execute]: ZeroDivisionError: integer division or modulo by zero
656 [3:execute]: ZeroDivisionError: integer division or modulo by zero
639
657
640 Notice how the error message printed when :exc:`CompositeError` is raised has
658 Notice how the error message printed when :exc:`CompositeError` is raised has
641 information about the individual exceptions that were raised on each engine.
659 information about the individual exceptions that were raised on each engine.
642 If you want, you can even raise one of these original exceptions:
660 If you want, you can even raise one of these original exceptions:
643
661
644 .. sourcecode:: ipython
662 .. sourcecode:: ipython
645
663
646 In [80]: try:
664 In [80]: try:
647 ....: rc.execute('1/0')
665 ....: rc.execute('1/0')
648 ....: except client.CompositeError, e:
666 ....: except client.CompositeError, e:
649 ....: e.raise_exception()
667 ....: e.raise_exception()
650 ....:
668 ....:
651 ....:
669 ....:
652 ---------------------------------------------------------------------------
670 ---------------------------------------------------------------------------
653 ZeroDivisionError Traceback (most recent call last)
671 ZeroDivisionError Traceback (most recent call last)
654
672
655 /ipython1-client-r3021/docs/examples/<ipython console> in <module>()
673 /ipython1-client-r3021/docs/examples/<ipython console> in <module>()
656
674
657 /ipython1-client-r3021/ipython1/kernel/error.pyc in raise_exception(self, excid)
675 /ipython1-client-r3021/ipython1/kernel/error.pyc in raise_exception(self, excid)
658 156 raise IndexError("an exception with index %i does not exist"%excid)
676 156 raise IndexError("an exception with index %i does not exist"%excid)
659 157 else:
677 157 else:
660 --> 158 raise et, ev, etb
678 --> 158 raise et, ev, etb
661 159
679 159
662 160 def collect_exceptions(rlist, method):
680 160 def collect_exceptions(rlist, method):
663
681
664 ZeroDivisionError: integer division or modulo by zero
682 ZeroDivisionError: integer division or modulo by zero
665
683
666 If you are working in IPython, you can simply type ``%debug`` after one of
684 If you are working in IPython, you can simply type ``%debug`` after one of
667 these :exc:`CompositeError` exceptions is raised, and inspect the exception
685 these :exc:`CompositeError` exceptions is raised, and inspect the exception
668 instance:
686 instance:
669
687
670 .. sourcecode:: ipython
688 .. sourcecode:: ipython
671
689
672 In [81]: rc.execute('1/0')
690 In [81]: rc.execute('1/0')
673 ---------------------------------------------------------------------------
691 ---------------------------------------------------------------------------
674 CompositeError Traceback (most recent call last)
692 CompositeError Traceback (most recent call last)
675
693
676 /ipython1-client-r3021/docs/examples/<ipython console> in <module>()
694 /ipython1-client-r3021/docs/examples/<ipython console> in <module>()
677
695
678 /ipython1-client-r3021/ipython1/kernel/multiengineclient.pyc in execute(self, lines, targets, block)
696 /ipython1-client-r3021/ipython1/kernel/multiengineclient.pyc in execute(self, lines, targets, block)
679 432 targets, block = self._findTargetsAndBlock(targets, block)
697 432 targets, block = self._findTargetsAndBlock(targets, block)
680 433 result = blockingCallFromThread(self.smultiengine.execute, lines,
698 433 result = blockingCallFromThread(self.smultiengine.execute, lines,
681 --> 434 targets=targets, block=block)
699 --> 434 targets=targets, block=block)
682 435 if block:
700 435 if block:
683 436 result = ResultList(result)
701 436 result = ResultList(result)
684
702
685 /ipython1-client-r3021/ipython1/kernel/twistedutil.pyc in blockingCallFromThread(f, *a, **kw)
703 /ipython1-client-r3021/ipython1/kernel/twistedutil.pyc in blockingCallFromThread(f, *a, **kw)
686 72 result.raiseException()
704 72 result.raiseException()
687 73 except Exception, e:
705 73 except Exception, e:
688 ---> 74 raise e
706 ---> 74 raise e
689 75 return result
707 75 return result
690 76
708 76
691
709
692 CompositeError: one or more exceptions from call to method: execute
710 CompositeError: one or more exceptions from call to method: execute
693 [0:execute]: ZeroDivisionError: integer division or modulo by zero
711 [0:execute]: ZeroDivisionError: integer division or modulo by zero
694 [1:execute]: ZeroDivisionError: integer division or modulo by zero
712 [1:execute]: ZeroDivisionError: integer division or modulo by zero
695 [2:execute]: ZeroDivisionError: integer division or modulo by zero
713 [2:execute]: ZeroDivisionError: integer division or modulo by zero
696 [3:execute]: ZeroDivisionError: integer division or modulo by zero
714 [3:execute]: ZeroDivisionError: integer division or modulo by zero
697
715
698 In [82]: %debug
716 In [82]: %debug
699 >
717 >
700
718
701 /ipython1-client-r3021/ipython1/kernel/twistedutil.py(74)blockingCallFromThread()
719 /ipython1-client-r3021/ipython1/kernel/twistedutil.py(74)blockingCallFromThread()
702 73 except Exception, e:
720 73 except Exception, e:
703 ---> 74 raise e
721 ---> 74 raise e
704 75 return result
722 75 return result
705
723
706 # With the debugger running, e is the exceptions instance. We can tab complete
724 # With the debugger running, e is the exceptions instance. We can tab complete
707 # on it and see the extra methods that are available.
725 # on it and see the extra methods that are available.
708 ipdb> e.
726 ipdb> e.
709 e.__class__ e.__getitem__ e.__new__ e.__setstate__ e.args
727 e.__class__ e.__getitem__ e.__new__ e.__setstate__ e.args
710 e.__delattr__ e.__getslice__ e.__reduce__ e.__str__ e.elist
728 e.__delattr__ e.__getslice__ e.__reduce__ e.__str__ e.elist
711 e.__dict__ e.__hash__ e.__reduce_ex__ e.__weakref__ e.message
729 e.__dict__ e.__hash__ e.__reduce_ex__ e.__weakref__ e.message
712 e.__doc__ e.__init__ e.__repr__ e._get_engine_str e.print_tracebacks
730 e.__doc__ e.__init__ e.__repr__ e._get_engine_str e.print_tracebacks
713 e.__getattribute__ e.__module__ e.__setattr__ e._get_traceback e.raise_exception
731 e.__getattribute__ e.__module__ e.__setattr__ e._get_traceback e.raise_exception
714 ipdb> e.print_tracebacks()
732 ipdb> e.print_tracebacks()
715 [0:execute]:
733 [0:execute]:
716 ---------------------------------------------------------------------------
734 ---------------------------------------------------------------------------
717 ZeroDivisionError Traceback (most recent call last)
735 ZeroDivisionError Traceback (most recent call last)
718
736
719 /ipython1-client-r3021/docs/examples/<string> in <module>()
737 /ipython1-client-r3021/docs/examples/<string> in <module>()
720
738
721 ZeroDivisionError: integer division or modulo by zero
739 ZeroDivisionError: integer division or modulo by zero
722
740
723 [1:execute]:
741 [1:execute]:
724 ---------------------------------------------------------------------------
742 ---------------------------------------------------------------------------
725 ZeroDivisionError Traceback (most recent call last)
743 ZeroDivisionError Traceback (most recent call last)
726
744
727 /ipython1-client-r3021/docs/examples/<string> in <module>()
745 /ipython1-client-r3021/docs/examples/<string> in <module>()
728
746
729 ZeroDivisionError: integer division or modulo by zero
747 ZeroDivisionError: integer division or modulo by zero
730
748
731 [2:execute]:
749 [2:execute]:
732 ---------------------------------------------------------------------------
750 ---------------------------------------------------------------------------
733 ZeroDivisionError Traceback (most recent call last)
751 ZeroDivisionError Traceback (most recent call last)
734
752
735 /ipython1-client-r3021/docs/examples/<string> in <module>()
753 /ipython1-client-r3021/docs/examples/<string> in <module>()
736
754
737 ZeroDivisionError: integer division or modulo by zero
755 ZeroDivisionError: integer division or modulo by zero
738
756
739 [3:execute]:
757 [3:execute]:
740 ---------------------------------------------------------------------------
758 ---------------------------------------------------------------------------
741 ZeroDivisionError Traceback (most recent call last)
759 ZeroDivisionError Traceback (most recent call last)
742
760
743 /ipython1-client-r3021/docs/examples/<string> in <module>()
761 /ipython1-client-r3021/docs/examples/<string> in <module>()
744
762
745 ZeroDivisionError: integer division or modulo by zero
763 ZeroDivisionError: integer division or modulo by zero
746
764
747
765
748 All of this same error handling magic even works in non-blocking mode:
766 All of this same error handling magic even works in non-blocking mode:
749
767
750 .. sourcecode:: ipython
768 .. sourcecode:: ipython
751
769
752 In [83]: rc.block=False
770 In [83]: rc.block=False
753
771
754 In [84]: pr = rc.execute('1/0')
772 In [84]: ar = rc.execute('1/0')
755
773
756 In [85]: pr.get()
774 In [85]: ar.get()
757 ---------------------------------------------------------------------------
775 ---------------------------------------------------------------------------
758 CompositeError Traceback (most recent call last)
776 CompositeError Traceback (most recent call last)
759
777
760 /ipython1-client-r3021/docs/examples/<ipython console> in <module>()
778 /ipython1-client-r3021/docs/examples/<ipython console> in <module>()
761
779
762 /ipython1-client-r3021/ipython1/kernel/multiengineclient.pyc in _get_r(self)
780 /ipython1-client-r3021/ipython1/kernel/multiengineclient.pyc in _get_r(self)
763 170
781 170
764 171 def _get_r(self):
782 171 def _get_r(self):
765 --> 172 return self.get_result(block=True)
783 --> 172 return self.get_result(block=True)
766 173
784 173
767 174 r = property(_get_r)
785 174 r = property(_get_r)
768
786
769 /ipython1-client-r3021/ipython1/kernel/multiengineclient.pyc in get_result(self, default, block)
787 /ipython1-client-r3021/ipython1/kernel/multiengineclient.pyc in get_result(self, default, block)
770 131 return self.result
788 131 return self.result
771 132 try:
789 132 try:
772 --> 133 result = self.client.get_pending_deferred(self.result_id, block)
790 --> 133 result = self.client.get_pending_deferred(self.result_id, block)
773 134 except error.ResultNotCompleted:
791 134 except error.ResultNotCompleted:
774 135 return default
792 135 return default
775
793
776 /ipython1-client-r3021/ipython1/kernel/multiengineclient.pyc in get_pending_deferred(self, deferredID, block)
794 /ipython1-client-r3021/ipython1/kernel/multiengineclient.pyc in get_pending_deferred(self, deferredID, block)
777 385
795 385
778 386 def get_pending_deferred(self, deferredID, block):
796 386 def get_pending_deferred(self, deferredID, block):
779 --> 387 return blockingCallFromThread(self.smultiengine.get_pending_deferred, deferredID, block)
797 --> 387 return blockingCallFromThread(self.smultiengine.get_pending_deferred, deferredID, block)
780 388
798 388
781 389 def barrier(self, pendingResults):
799 389 def barrier(self, pendingResults):
782
800
783 /ipython1-client-r3021/ipython1/kernel/twistedutil.pyc in blockingCallFromThread(f, *a, **kw)
801 /ipython1-client-r3021/ipython1/kernel/twistedutil.pyc in blockingCallFromThread(f, *a, **kw)
784 72 result.raiseException()
802 72 result.raiseException()
785 73 except Exception, e:
803 73 except Exception, e:
786 ---> 74 raise e
804 ---> 74 raise e
787 75 return result
805 75 return result
788 76
806 76
789
807
790 CompositeError: one or more exceptions from call to method: execute
808 CompositeError: one or more exceptions from call to method: execute
791 [0:execute]: ZeroDivisionError: integer division or modulo by zero
809 [0:execute]: ZeroDivisionError: integer division or modulo by zero
792 [1:execute]: ZeroDivisionError: integer division or modulo by zero
810 [1:execute]: ZeroDivisionError: integer division or modulo by zero
793 [2:execute]: ZeroDivisionError: integer division or modulo by zero
811 [2:execute]: ZeroDivisionError: integer division or modulo by zero
794 [3:execute]: ZeroDivisionError: integer division or modulo by zero
812 [3:execute]: ZeroDivisionError: integer division or modulo by zero
795
813
796
814
@@ -1,395 +1,396 b''
1 .. _paralleltask:
1 .. _paralleltask:
2
2
3 ==========================
3 ==========================
4 The IPython task interface
4 The IPython task interface
5 ==========================
5 ==========================
6
6
7 The task interface to the cluster presents the engines as a fault tolerant,
7 The task interface to the cluster presents the engines as a fault tolerant,
8 dynamic load-balanced system of workers. Unlike the multiengine interface, in
8 dynamic load-balanced system of workers. Unlike the multiengine interface, in
9 the task interface the user has no direct access to individual engines. By
9 the task interface the user has no direct access to individual engines. By
10 allowing the IPython scheduler to assign work, this interface is simultaneously
10 allowing the IPython scheduler to assign work, this interface is simultaneously
11 simpler and more powerful.
11 simpler and more powerful.
12
12
13 Best of all, the user can use both of these interfaces running at the same time
13 Best of all, the user can use both of these interfaces running at the same time
14 to take advantage of their respective strengths. When the user can break up
14 to take advantage of their respective strengths. When the user can break up
15 the user's work into segments that do not depend on previous execution, the
15 the user's work into segments that do not depend on previous execution, the
16 task interface is ideal. But it also has more power and flexibility, allowing
16 task interface is ideal. But it also has more power and flexibility, allowing
17 the user to guide the distribution of jobs, without having to assign tasks to
17 the user to guide the distribution of jobs, without having to assign tasks to
18 engines explicitly.
18 engines explicitly.
19
19
20 Starting the IPython controller and engines
20 Starting the IPython controller and engines
21 ===========================================
21 ===========================================
22
22
23 To follow along with this tutorial, you will need to start the IPython
23 To follow along with this tutorial, you will need to start the IPython
24 controller and four IPython engines. The simplest way of doing this is to use
24 controller and four IPython engines. The simplest way of doing this is to use
25 the :command:`ipclusterz` command::
25 the :command:`ipclusterz` command::
26
26
27 $ ipclusterz start -n 4
27 $ ipclusterz start -n 4
28
28
29 For more detailed information about starting the controller and engines, see
29 For more detailed information about starting the controller and engines, see
30 our :ref:`introduction <ip1par>` to using IPython for parallel computing.
30 our :ref:`introduction <ip1par>` to using IPython for parallel computing.
31
31
32 Creating a ``Client`` instance
32 Creating a ``Client`` instance
33 ==============================
33 ==============================
34
34
35 The first step is to import the IPython :mod:`IPython.zmq.parallel.client`
35 The first step is to import the IPython :mod:`IPython.zmq.parallel.client`
36 module and then create a :class:`.Client` instance:
36 module and then create a :class:`.Client` instance, and we will also be using
37 a :class:`LoadBalancedView`, here called `lview`:
37
38
38 .. sourcecode:: ipython
39 .. sourcecode:: ipython
39
40
40 In [1]: from IPython.zmq.parallel import client
41 In [1]: from IPython.zmq.parallel import client
41
42
42 In [2]: rc = client.Client()
43 In [2]: rc = client.Client()
43
44
44 In [3]: lview = rc.view(balanced=True)
45 In [3]: lview = rc.view()
45 Out[3]: <LoadBalancedView None>
46
46
47
47
48 This form assumes that the controller was started on localhost with default
48 This form assumes that the controller was started on localhost with default
49 configuration. If not, the location of the controller must be given as an
49 configuration. If not, the location of the controller must be given as an
50 argument to the constructor:
50 argument to the constructor:
51
51
52 .. sourcecode:: ipython
52 .. sourcecode:: ipython
53
53
54 # for a visible LAN controller listening on an external port:
54 # for a visible LAN controller listening on an external port:
55 In [2]: rc = client.Client('tcp://192.168.1.16:10101')
55 In [2]: rc = client.Client('tcp://192.168.1.16:10101')
56 # for a remote controller at my.server.com listening on localhost:
56 # for a remote controller at my.server.com listening on localhost:
57 In [3]: rc = client.Client(sshserver='my.server.com')
57 In [3]: rc = client.Client(sshserver='my.server.com')
58
58
59
59
60
60
61 Quick and easy parallelism
61 Quick and easy parallelism
62 ==========================
62 ==========================
63
63
64 In many cases, you simply want to apply a Python function to a sequence of
64 In many cases, you simply want to apply a Python function to a sequence of
65 objects, but *in parallel*. Like the multiengine interface, these can be
65 objects, but *in parallel*. Like the multiengine interface, these can be
66 implemented via the task interface. The exact same tools can perform these
66 implemented via the task interface. The exact same tools can perform these
67 actions in load-balanced ways as well as multiplexed ways: a parallel version
67 actions in load-balanced ways as well as multiplexed ways: a parallel version
68 of :func:`map` and :func:`@parallel` function decorator. If one specifies the
68 of :func:`map` and :func:`@parallel` function decorator. If one specifies the
69 argument `balanced=True`, then they are dynamically load balanced. Thus, if the
69 argument `balanced=True`, then they are dynamically load balanced. Thus, if the
70 execution time per item varies significantly, you should use the versions in
70 execution time per item varies significantly, you should use the versions in
71 the task interface.
71 the task interface.
72
72
73 Parallel map
73 Parallel map
74 ------------
74 ------------
75
75
76 To load-balance :meth:`map`, simply use a LoadBalancedView, created by asking
76 To load-balance :meth:`map`, simply use a LoadBalancedView:
77 for the ``None`` element:
78
77
79 .. sourcecode:: ipython
78 .. sourcecode:: ipython
80
79
80 In [62]: lview.block = True
81
81 In [63]: serial_result = map(lambda x:x**10, range(32))
82 In [63]: serial_result = map(lambda x:x**10, range(32))
82
83
83 In [64]: parallel_result = lview.map(lambda x:x**10, range(32), block=True)
84 In [64]: parallel_result = lview.map(lambda x:x**10, range(32))
84
85
85 In [65]: serial_result==parallel_result
86 In [65]: serial_result==parallel_result
86 Out[65]: True
87 Out[65]: True
87
88
88 Parallel function decorator
89 Parallel function decorator
89 ---------------------------
90 ---------------------------
90
91
91 Parallel functions are just like normal function, but they can be called on
92 Parallel functions are just like normal function, but they can be called on
92 sequences and *in parallel*. The multiengine interface provides a decorator
93 sequences and *in parallel*. The multiengine interface provides a decorator
93 that turns any Python function into a parallel function:
94 that turns any Python function into a parallel function:
94
95
95 .. sourcecode:: ipython
96 .. sourcecode:: ipython
96
97
97 In [10]: @lview.parallel()
98 In [10]: @lview.parallel()
98 ....: def f(x):
99 ....: def f(x):
99 ....: return 10.0*x**4
100 ....: return 10.0*x**4
100 ....:
101 ....:
101
102
102 In [11]: f.map(range(32)) # this is done in parallel
103 In [11]: f.map(range(32)) # this is done in parallel
103 Out[11]: [0.0,10.0,160.0,...]
104 Out[11]: [0.0,10.0,160.0,...]
104
105
105 Dependencies
106 Dependencies
106 ============
107 ============
107
108
108 Often, pure atomic load-balancing is too primitive for your work. In these cases, you
109 Often, pure atomic load-balancing is too primitive for your work. In these cases, you
109 may want to associate some kind of `Dependency` that describes when, where, or whether
110 may want to associate some kind of `Dependency` that describes when, where, or whether
110 a task can be run. In IPython, we provide two types of dependencies:
111 a task can be run. In IPython, we provide two types of dependencies:
111 `Functional Dependencies`_ and `Graph Dependencies`_
112 `Functional Dependencies`_ and `Graph Dependencies`_
112
113
113 .. note::
114 .. note::
114
115
115 It is important to note that the pure ZeroMQ scheduler does not support dependencies,
116 It is important to note that the pure ZeroMQ scheduler does not support dependencies,
116 and you will see errors or warnings if you try to use dependencies with the pure
117 and you will see errors or warnings if you try to use dependencies with the pure
117 scheduler.
118 scheduler.
118
119
119 Functional Dependencies
120 Functional Dependencies
120 -----------------------
121 -----------------------
121
122
122 Functional dependencies are used to determine whether a given engine is capable of running
123 Functional dependencies are used to determine whether a given engine is capable of running
123 a particular task. This is implemented via a special :class:`Exception` class,
124 a particular task. This is implemented via a special :class:`Exception` class,
124 :class:`UnmetDependency`, found in `IPython.zmq.parallel.error`. Its use is very simple:
125 :class:`UnmetDependency`, found in `IPython.zmq.parallel.error`. Its use is very simple:
125 if a task fails with an UnmetDependency exception, then the scheduler, instead of relaying
126 if a task fails with an UnmetDependency exception, then the scheduler, instead of relaying
126 the error up to the client like any other error, catches the error, and submits the task
127 the error up to the client like any other error, catches the error, and submits the task
127 to a different engine. This will repeat indefinitely, and a task will never be submitted
128 to a different engine. This will repeat indefinitely, and a task will never be submitted
128 to a given engine a second time.
129 to a given engine a second time.
129
130
130 You can manually raise the :class:`UnmetDependency` yourself, but IPython has provided
131 You can manually raise the :class:`UnmetDependency` yourself, but IPython has provided
131 some decorators for facilitating this behavior.
132 some decorators for facilitating this behavior.
132
133
133 There are two decorators and a class used for functional dependencies:
134 There are two decorators and a class used for functional dependencies:
134
135
135 .. sourcecode:: ipython
136 .. sourcecode:: ipython
136
137
137 In [9]: from IPython.zmq.parallel.dependency import depend, require, dependent
138 In [9]: from IPython.zmq.parallel.dependency import depend, require, dependent
138
139
139 @require
140 @require
140 ********
141 ********
141
142
142 The simplest sort of dependency is requiring that a Python module is available. The
143 The simplest sort of dependency is requiring that a Python module is available. The
143 ``@require`` decorator lets you define a function that will only run on engines where names
144 ``@require`` decorator lets you define a function that will only run on engines where names
144 you specify are importable:
145 you specify are importable:
145
146
146 .. sourcecode:: ipython
147 .. sourcecode:: ipython
147
148
148 In [10]: @require('numpy', 'zmq')
149 In [10]: @require('numpy', 'zmq')
149 ...: def myfunc():
150 ...: def myfunc():
150 ...: import numpy,zmq
151 ...: import numpy,zmq
151 ...: return dostuff()
152 ...: return dostuff()
152
153
153 Now, any time you apply :func:`myfunc`, the task will only run on a machine that has
154 Now, any time you apply :func:`myfunc`, the task will only run on a machine that has
154 numpy and pyzmq available.
155 numpy and pyzmq available.
155
156
156 @depend
157 @depend
157 *******
158 *******
158
159
159 The ``@depend`` decorator lets you decorate any function with any *other* function to
160 The ``@depend`` decorator lets you decorate any function with any *other* function to
160 evaluate the dependency. The dependency function will be called at the start of the task,
161 evaluate the dependency. The dependency function will be called at the start of the task,
161 and if it returns ``False``, then the dependency will be considered unmet, and the task
162 and if it returns ``False``, then the dependency will be considered unmet, and the task
162 will be assigned to another engine. If the dependency returns *anything other than
163 will be assigned to another engine. If the dependency returns *anything other than
163 ``False``*, the rest of the task will continue.
164 ``False``*, the rest of the task will continue.
164
165
165 .. sourcecode:: ipython
166 .. sourcecode:: ipython
166
167
167 In [10]: def platform_specific(plat):
168 In [10]: def platform_specific(plat):
168 ...: import sys
169 ...: import sys
169 ...: return sys.platform == plat
170 ...: return sys.platform == plat
170
171
171 In [11]: @depend(platform_specific, 'darwin')
172 In [11]: @depend(platform_specific, 'darwin')
172 ...: def mactask():
173 ...: def mactask():
173 ...: do_mac_stuff()
174 ...: do_mac_stuff()
174
175
175 In [12]: @depend(platform_specific, 'nt')
176 In [12]: @depend(platform_specific, 'nt')
176 ...: def wintask():
177 ...: def wintask():
177 ...: do_windows_stuff()
178 ...: do_windows_stuff()
178
179
179 In this case, any time you apply ``mactask``, it will only run on an OSX machine.
180 In this case, any time you apply ``mactask``, it will only run on an OSX machine.
180 ``@depend`` is just like ``apply``, in that it has a ``@depend(f,*args,**kwargs)``
181 ``@depend`` is just like ``apply``, in that it has a ``@depend(f,*args,**kwargs)``
181 signature.
182 signature.
182
183
183 dependents
184 dependents
184 **********
185 **********
185
186
186 You don't have to use the decorators on your tasks, if for instance you may want
187 You don't have to use the decorators on your tasks, if for instance you may want
187 to run tasks with a single function but varying dependencies, you can directly construct
188 to run tasks with a single function but varying dependencies, you can directly construct
188 the :class:`dependent` object that the decorators use:
189 the :class:`dependent` object that the decorators use:
189
190
190 .. sourcecode:: ipython
191 .. sourcecode:: ipython
191
192
192 In [13]: def mytask(*args):
193 In [13]: def mytask(*args):
193 ...: dostuff()
194 ...: dostuff()
194
195
195 In [14]: mactask = dependent(mytask, platform_specific, 'darwin')
196 In [14]: mactask = dependent(mytask, platform_specific, 'darwin')
196 # this is the same as decorating the declaration of mytask with @depend
197 # this is the same as decorating the declaration of mytask with @depend
197 # but you can do it again:
198 # but you can do it again:
198
199
199 In [15]: wintask = dependent(mytask, platform_specific, 'nt')
200 In [15]: wintask = dependent(mytask, platform_specific, 'nt')
200
201
201 # in general:
202 # in general:
202 In [16]: t = dependent(f, g, *dargs, **dkwargs)
203 In [16]: t = dependent(f, g, *dargs, **dkwargs)
203
204
204 # is equivalent to:
205 # is equivalent to:
205 In [17]: @depend(g, *dargs, **dkwargs)
206 In [17]: @depend(g, *dargs, **dkwargs)
206 ...: def t(a,b,c):
207 ...: def t(a,b,c):
207 ...: # contents of f
208 ...: # contents of f
208
209
209 Graph Dependencies
210 Graph Dependencies
210 ------------------
211 ------------------
211
212
212 Sometimes you want to restrict the time and/or location to run a given task as a function
213 Sometimes you want to restrict the time and/or location to run a given task as a function
213 of the time and/or location of other tasks. This is implemented via a subclass of
214 of the time and/or location of other tasks. This is implemented via a subclass of
214 :class:`set`, called a :class:`Dependency`. A Dependency is just a set of `msg_ids`
215 :class:`set`, called a :class:`Dependency`. A Dependency is just a set of `msg_ids`
215 corresponding to tasks, and a few attributes to guide how to decide when the Dependency
216 corresponding to tasks, and a few attributes to guide how to decide when the Dependency
216 has been met.
217 has been met.
217
218
218 The switches we provide for interpreting whether a given dependency set has been met:
219 The switches we provide for interpreting whether a given dependency set has been met:
219
220
220 any|all
221 any|all
221 Whether the dependency is considered met if *any* of the dependencies are done, or
222 Whether the dependency is considered met if *any* of the dependencies are done, or
222 only after *all* of them have finished. This is set by a Dependency's :attr:`all`
223 only after *all* of them have finished. This is set by a Dependency's :attr:`all`
223 boolean attribute, which defaults to ``True``.
224 boolean attribute, which defaults to ``True``.
224
225
225 success_only
226 success_only
226 Whether to consider only tasks that did not raise an error as being fulfilled.
227 Whether to consider only tasks that did not raise an error as being fulfilled.
227 Sometimes you want to run a task after another, but only if that task succeeded. In
228 Sometimes you want to run a task after another, but only if that task succeeded. In
228 this case, ``success_only`` should be ``True``. However sometimes you may not care
229 this case, ``success_only`` should be ``True``. However sometimes you may not care
229 whether the task succeeds, and always want the second task to run, in which case
230 whether the task succeeds, and always want the second task to run, in which case
230 you should use `success_only=False`. The default behavior is to only use successes.
231 you should use `success_only=False`. The default behavior is to only use successes.
231
232
232 There are other switches for interpretation that are made at the *task* level. These are
233 There are other switches for interpretation that are made at the *task* level. These are
233 specified via keyword arguments to the client's :meth:`apply` method.
234 specified via keyword arguments to the client's :meth:`apply` method.
234
235
235 after,follow
236 after,follow
236 You may want to run a task *after* a given set of dependencies have been run and/or
237 You may want to run a task *after* a given set of dependencies have been run and/or
237 run it *where* another set of dependencies are met. To support this, every task has an
238 run it *where* another set of dependencies are met. To support this, every task has an
238 `after` dependency to restrict time, and a `follow` dependency to restrict
239 `after` dependency to restrict time, and a `follow` dependency to restrict
239 destination.
240 destination.
240
241
241 timeout
242 timeout
242 You may also want to set a time-limit for how long the scheduler should wait before a
243 You may also want to set a time-limit for how long the scheduler should wait before a
243 task's dependencies are met. This is done via a `timeout`, which defaults to 0, which
244 task's dependencies are met. This is done via a `timeout`, which defaults to 0, which
244 indicates that the task should never timeout. If the timeout is reached, and the
245 indicates that the task should never timeout. If the timeout is reached, and the
245 scheduler still hasn't been able to assign the task to an engine, the task will fail
246 scheduler still hasn't been able to assign the task to an engine, the task will fail
246 with a :class:`DependencyTimeout`.
247 with a :class:`DependencyTimeout`.
247
248
248 .. note::
249 .. note::
249
250
250 Dependencies only work within the task scheduler. You cannot instruct a load-balanced
251 Dependencies only work within the task scheduler. You cannot instruct a load-balanced
251 task to run after a job submitted via the MUX interface.
252 task to run after a job submitted via the MUX interface.
252
253
253 The simplest form of Dependencies is with `all=True,success_only=True`. In these cases,
254 The simplest form of Dependencies is with `all=True,success_only=True`. In these cases,
254 you can skip using Dependency objects, and just pass msg_ids or AsyncResult objects as the
255 you can skip using Dependency objects, and just pass msg_ids or AsyncResult objects as the
255 `follow` and `after` keywords to :meth:`client.apply`:
256 `follow` and `after` keywords to :meth:`client.apply`:
256
257
257 .. sourcecode:: ipython
258 .. sourcecode:: ipython
258
259
259 In [14]: client.block=False
260 In [14]: client.block=False
260
261
261 In [15]: ar = client.apply(f, args, kwargs, balanced=True)
262 In [15]: ar = client.apply(f, args, kwargs, balanced=True)
262
263
263 In [16]: ar2 = client.apply(f2, balanced=True)
264 In [16]: ar2 = client.apply(f2, balanced=True)
264
265
265 In [17]: ar3 = client.apply(f3, after=[ar,ar2], balanced=True)
266 In [17]: ar3 = client.apply(f3, after=[ar,ar2], balanced=True)
266
267
267 In [17]: ar4 = client.apply(f3, follow=[ar], timeout=2.5, balanced=True)
268 In [17]: ar4 = client.apply(f3, follow=[ar], timeout=2.5, balanced=True)
268
269
269
270
270 .. seealso::
271 .. seealso::
271
272
272 Some parallel workloads can be described as a `Directed Acyclic Graph
273 Some parallel workloads can be described as a `Directed Acyclic Graph
273 <http://en.wikipedia.org/wiki/Directed_acyclic_graph>`_, or DAG. See :ref:`DAG
274 <http://en.wikipedia.org/wiki/Directed_acyclic_graph>`_, or DAG. See :ref:`DAG
274 Dependencies <dag_dependencies>` for an example demonstrating how to use map a NetworkX DAG
274 Dependencies <dag_dependencies>` for an example demonstrating how to map a NetworkX DAG
275 Dependencies <dag_dependencies>` for an example demonstrating how to map a NetworkX DAG
276 onto task dependencies.
276
277
277
278
278
279
279 Impossible Dependencies
280 Impossible Dependencies
280 ***********************
281 ***********************
281
282
282 The schedulers do perform some analysis on graph dependencies to determine whether they
283 The schedulers do perform some analysis on graph dependencies to determine whether they
283 are not possible to be met. If the scheduler does discover that a dependency cannot be
284 are not possible to be met. If the scheduler does discover that a dependency cannot be
284 met, then the task will fail with an :class:`ImpossibleDependency` error. This way, if the
285 met, then the task will fail with an :class:`ImpossibleDependency` error. This way, if the
285 scheduler realized that a task can never be run, it won't sit indefinitely in the
286 scheduler realized that a task can never be run, it won't sit indefinitely in the
286 scheduler clogging the pipeline.
287 scheduler clogging the pipeline.
287
288
288 The basic cases that are checked:
289 The basic cases that are checked:
289
290
290 * depending on nonexistent messages
291 * depending on nonexistent messages
291 * `follow` dependencies were run on more than one machine and `all=True`
292 * `follow` dependencies were run on more than one machine and `all=True`
292 * any dependencies failed and `all=True,success_only=True`
293 * any dependencies failed and `all=True,success_only=True`
293 * all dependencies failed and `all=False,success_only=True`
294 * all dependencies failed and `all=False,success_only=True`
294
295
295 .. warning::
296 .. warning::
296
297
297 This analysis has not been proven to be rigorous, so it is likely possible for tasks
298 This analysis has not been proven to be rigorous, so it is likely possible for tasks
298 to become impossible to run in obscure situations, so a timeout may be a good choice.
299 to become impossible to run in obscure situations, so a timeout may be a good choice.
299
300
300 Schedulers
301 Schedulers
301 ==========
302 ==========
302
303
303 There are a variety of valid ways to determine where jobs should be assigned in a
304 There are a variety of valid ways to determine where jobs should be assigned in a
304 load-balancing situation. In IPython, we support several standard schemes, and
305 load-balancing situation. In IPython, we support several standard schemes, and
305 even make it easy to define your own. The scheme can be selected via the ``--scheme``
306 even make it easy to define your own. The scheme can be selected via the ``--scheme``
306 argument to :command:`ipcontrollerz`, or in the :attr:`HubFactory.scheme` attribute
307 argument to :command:`ipcontrollerz`, or in the :attr:`HubFactory.scheme` attribute
307 of a controller config object.
308 of a controller config object.
308
309
309 The built-in routing schemes:
310 The built-in routing schemes:
310
311
311 lru: Least Recently Used
312 lru: Least Recently Used
312
313
313 Always assign work to the least-recently-used engine. A close relative of
314 Always assign work to the least-recently-used engine. A close relative of
314 round-robin, it will be fair with respect to the number of tasks, agnostic
315 round-robin, it will be fair with respect to the number of tasks, agnostic
315 with respect to runtime of each task.
316 with respect to runtime of each task.
316
317
317 plainrandom: Plain Random
318 plainrandom: Plain Random
318 Randomly picks an engine on which to run.
319 Randomly picks an engine on which to run.
319
320
320 twobin: Two-Bin Random
321 twobin: Two-Bin Random
321
322
322 **Depends on numpy**
323 **Depends on numpy**
323
324
324 Pick two engines at random, and use the LRU of the two. This is known to be better
325 Pick two engines at random, and use the LRU of the two. This is known to be better
325 than plain random in many cases, but requires a small amount of computation.
326 than plain random in many cases, but requires a small amount of computation.
326
327
327 leastload: Least Load
328 leastload: Least Load
328
329
329 **This is the default scheme**
330 **This is the default scheme**
330
331
331 Always assign tasks to the engine with the fewest outstanding tasks (LRU breaks tie).
332 Always assign tasks to the engine with the fewest outstanding tasks (LRU breaks tie).
332
333
333 weighted: Weighted Two-Bin Random
334 weighted: Weighted Two-Bin Random
334
335
335 **Depends on numpy**
336 **Depends on numpy**
336
337
337 Pick two engines at random using the number of outstanding tasks as inverse weights,
338 Pick two engines at random using the number of outstanding tasks as inverse weights,
338 and use the one with the lower load.
339 and use the one with the lower load.
339
340
340
341
341 Pure ZMQ Scheduler
342 Pure ZMQ Scheduler
342 ------------------
343 ------------------
343
344
344 For maximum throughput, the 'pure' scheme is not Python at all, but a C-level
345 For maximum throughput, the 'pure' scheme is not Python at all, but a C-level
345 :class:`MonitoredQueue` from PyZMQ, which uses a ZeroMQ ``XREQ`` socket to perform all
346 :class:`MonitoredQueue` from PyZMQ, which uses a ZeroMQ ``XREQ`` socket to perform all
346 load-balancing. This scheduler does not support any of the advanced features of the Python
347 load-balancing. This scheduler does not support any of the advanced features of the Python
347 :class:`.Scheduler`.
348 :class:`.Scheduler`.
348
349
349 Disabled features when using the ZMQ Scheduler:
350 Disabled features when using the ZMQ Scheduler:
350
351
351 * Engine unregistration
352 * Engine unregistration
352 Task farming will be disabled if an engine unregisters.
353 Task farming will be disabled if an engine unregisters.
353 Further, if an engine is unregistered during computation, the scheduler may not recover.
354 Further, if an engine is unregistered during computation, the scheduler may not recover.
354 * Dependencies
355 * Dependencies
355 Since there is no Python logic inside the Scheduler, routing decisions cannot be made
356 Since there is no Python logic inside the Scheduler, routing decisions cannot be made
356 based on message content.
357 based on message content.
357 * Early destination notification
358 * Early destination notification
358 The Python schedulers know which engine gets which task, and notify the Hub. This
359 The Python schedulers know which engine gets which task, and notify the Hub. This
359 allows graceful handling of Engines coming and going. There is no way to know
360 allows graceful handling of Engines coming and going. There is no way to know
360 where ZeroMQ messages have gone, so there is no way to know what tasks are on which
361 where ZeroMQ messages have gone, so there is no way to know what tasks are on which
361 engine until they *finish*. This makes recovery from engine shutdown very difficult.
362 engine until they *finish*. This makes recovery from engine shutdown very difficult.
362
363
363
364
364 .. note::
365 .. note::
365
366
366 TODO: performance comparisons
367 TODO: performance comparisons
367
368
368
369
369 More details
370 More details
370 ============
371 ============
371
372
372 The :class:`Client` has many more powerful features that allow quite a bit
373 The :class:`Client` has many more powerful features that allow quite a bit
373 of flexibility in how tasks are defined and run. The next places to look are
374 of flexibility in how tasks are defined and run. The next places to look are
374 in the following classes:
375 in the following classes:
375
376
376 * :class:`IPython.zmq.parallel.client.Client`
377 * :class:`IPython.zmq.parallel.client.Client`
377 * :class:`IPython.zmq.parallel.client.AsyncResult`
378 * :class:`IPython.zmq.parallel.client.AsyncResult`
378 * :meth:`IPython.zmq.parallel.client.Client.apply`
379 * :meth:`IPython.zmq.parallel.client.Client.apply`
379 * :mod:`IPython.zmq.parallel.dependency`
380 * :mod:`IPython.zmq.parallel.dependency`
380
381
381 The following is an overview of how to use these classes together:
382 The following is an overview of how to use these classes together:
382
383
383 1. Create a :class:`Client`.
384 1. Create a :class:`Client`.
384 2. Define some functions to be run as tasks
385 2. Define some functions to be run as tasks
385 3. Submit your tasks using the :meth:`apply` method of your
386 3. Submit your tasks using the :meth:`apply` method of your
386 :class:`Client` instance, specifying `balanced=True`. This signals
387 :class:`Client` instance, specifying `balanced=True`. This signals
387 the :class:`Client` to entrust the Scheduler with assigning tasks to engines.
388 the :class:`Client` to entrust the Scheduler with assigning tasks to engines.
388 4. Use :meth:`Client.get_results` to get the results of the
389 4. Use :meth:`Client.get_results` to get the results of the
389 tasks, or use the :meth:`AsyncResult.get` method of the results to wait
390 tasks, or use the :meth:`AsyncResult.get` method of the results to wait
390 for and then receive the results.
391 for and then receive the results.
391
392
392
393
393 .. seealso::
394 .. seealso::
394
395
395 A demo of :ref:`DAG Dependencies <dag_dependencies>` with NetworkX and IPython.
396 A demo of :ref:`DAG Dependencies <dag_dependencies>` with NetworkX and IPython.
General Comments 0
You need to be logged in to leave comments. Login now