##// END OF EJS Templates
add db,resubmit/retries docs
MinRK -
Show More
@@ -0,0 +1,114
1 .. _parallel_db:
2
3 =======================
4 IPython's Task Database
5 =======================
6
7 The IPython Hub stores all task requests and results in a database. Currently supported backends
8 are: MongoDB, SQLite (the default), and an in-memory DictDB. The most common use case for
9 this is clients requesting results for tasks they did not submit, via:
10
11 .. sourcecode:: ipython
12
13 In [1]: rc.get_result(task_id)
14
15 However, since we have this DB backend, we provide a direct query method in the :class:`client`
16 for users who want deeper introspection into their task history. The :meth:`db_query` method of
17 the Client is modeled after MongoDB queries, so if you have used MongoDB it should look
18 familiar. In fact, when the MongoDB backend is in use, the query is relayed directly. However,
19 when using other backends, the interface is emulated and only a subset of queries is possible.
20
21 .. seealso::
22
23 MongoDB query docs: http://www.mongodb.org/display/DOCS/Querying
24
25 :meth:`Client.db_query` takes a dictionary query object, with keys from the TaskRecord key list,
26 and values of either exact values to test, or MongoDB queries, which are dicts of The form:
27 ``{'operator' : 'argument(s)'}``. There is also an optional `keys` argument, that specifies
28 which subset of keys should be retrieved. The default is to retrieve all keys excluding the
29 request and result buffers. :meth:`db_query` returns a list of TaskRecord dicts. Also like
30 MongoDB, the `msg_id` key will always be included, whether requested or not.
31
32 TaskRecord keys:
33
34 =============== =============== =============
35 Key Type Description
36 =============== =============== =============
37 msg_id uuid(bytes) The msg ID
38 header dict The request header
39 content dict The request content (likely empty)
40 buffers list(bytes) buffers containing serialized request objects
41 submitted datetime timestamp for time of submission (set by client)
42 client_uuid uuid(bytes) IDENT of client's socket
43 engine_uuid uuid(bytes) IDENT of engine's socket
44 started datetime time task began execution on engine
45 completed datetime time task finished execution (success or failure) on engine
46 resubmitted datetime time of resubmission (if applicable)
47 result_header dict header for result
48 result_content dict content for result
49 result_buffers list(bytes) buffers containing serialized request objects
50 queue bytes The name of the queue for the task ('mux' or 'task')
51 pyin <unused> Python input (unused)
52 pyout <unused> Python output (unused)
53 pyerr <unused> Python traceback (unused)
54 stdout str Stream of stdout data
55 stderr str Stream of stderr data
56
57 =============== =============== =============
58
59 MongoDB operators we emulate on all backends:
60
61 ========== =================
62 Operator Python equivalent
63 ========== =================
64 '$in' in
65 '$nin' not in
66 '$eq' ==
67 '$ne' !=
68 '$ge' >
69 '$gte' >=
70 '$le' <
71 '$lte' <=
72 ========== =================
73
74
75 The DB Query is useful for two primary cases:
76
77 1. deep polling of task status or metadata
78 2. selecting a subset of tasks, on which to perform a later operation (e.g. wait on result, purge records, resubmit,...)
79
80 Example Queries
81 ===============
82
83
84 To get all msg_ids that are not completed, only retrieving their ID and start time:
85
86 .. sourcecode:: ipython
87
88 In [1]: incomplete = rc.db_query({'complete' : None}, keys=['msg_id', 'started'])
89
90 All jobs started in the last hour by me:
91
92 .. sourcecode:: ipython
93
94 In [1]: from datetime import datetime, timedelta
95
96 In [2]: hourago = datetime.now() - timedelta(1./24)
97
98 In [3]: recent = rc.db_query({'started' : {'$gte' : hourago },
99 'client_uuid' : rc.session.session})
100
101 All jobs started more than an hour ago, by clients *other than me*:
102
103 .. sourcecode:: ipython
104
105 In [3]: recent = rc.db_query({'started' : {'$le' : hourago },
106 'client_uuid' : {'$ne' : rc.session.session}})
107
108 Result headers for all jobs on engine 3 or 4:
109
110 .. sourcecode:: ipython
111
112 In [1]: uuids = map(rc._engines.get, (3,4))
113
114 In [2]: hist34 = rc.db_query({'engine_uuid' : {'$in' : uuids }, keys='result_header')
@@ -1,1354 +1,1356
1 """A semi-synchronous Client for the ZMQ cluster"""
1 """A semi-synchronous Client for the ZMQ cluster"""
2 #-----------------------------------------------------------------------------
2 #-----------------------------------------------------------------------------
3 # Copyright (C) 2010 The IPython Development Team
3 # Copyright (C) 2010 The IPython Development Team
4 #
4 #
5 # Distributed under the terms of the BSD License. The full license is in
5 # Distributed under the terms of the BSD License. The full license is in
6 # the file COPYING, distributed as part of this software.
6 # the file COPYING, distributed as part of this software.
7 #-----------------------------------------------------------------------------
7 #-----------------------------------------------------------------------------
8
8
9 #-----------------------------------------------------------------------------
9 #-----------------------------------------------------------------------------
10 # Imports
10 # Imports
11 #-----------------------------------------------------------------------------
11 #-----------------------------------------------------------------------------
12
12
13 import os
13 import os
14 import json
14 import json
15 import time
15 import time
16 import warnings
16 import warnings
17 from datetime import datetime
17 from datetime import datetime
18 from getpass import getpass
18 from getpass import getpass
19 from pprint import pprint
19 from pprint import pprint
20
20
21 pjoin = os.path.join
21 pjoin = os.path.join
22
22
23 import zmq
23 import zmq
24 # from zmq.eventloop import ioloop, zmqstream
24 # from zmq.eventloop import ioloop, zmqstream
25
25
26 from IPython.utils.path import get_ipython_dir
26 from IPython.utils.path import get_ipython_dir
27 from IPython.utils.traitlets import (HasTraits, Int, Instance, CUnicode,
27 from IPython.utils.traitlets import (HasTraits, Int, Instance, CUnicode,
28 Dict, List, Bool, Str, Set)
28 Dict, List, Bool, Str, Set)
29 from IPython.external.decorator import decorator
29 from IPython.external.decorator import decorator
30 from IPython.external.ssh import tunnel
30 from IPython.external.ssh import tunnel
31
31
32 from IPython.parallel import error
32 from IPython.parallel import error
33 from IPython.parallel import streamsession as ss
33 from IPython.parallel import streamsession as ss
34 from IPython.parallel import util
34 from IPython.parallel import util
35
35
36 from .asyncresult import AsyncResult, AsyncHubResult
36 from .asyncresult import AsyncResult, AsyncHubResult
37 from IPython.parallel.apps.clusterdir import ClusterDir, ClusterDirError
37 from IPython.parallel.apps.clusterdir import ClusterDir, ClusterDirError
38 from .view import DirectView, LoadBalancedView
38 from .view import DirectView, LoadBalancedView
39
39
40 #--------------------------------------------------------------------------
40 #--------------------------------------------------------------------------
41 # Decorators for Client methods
41 # Decorators for Client methods
42 #--------------------------------------------------------------------------
42 #--------------------------------------------------------------------------
43
43
44 @decorator
44 @decorator
45 def spin_first(f, self, *args, **kwargs):
45 def spin_first(f, self, *args, **kwargs):
46 """Call spin() to sync state prior to calling the method."""
46 """Call spin() to sync state prior to calling the method."""
47 self.spin()
47 self.spin()
48 return f(self, *args, **kwargs)
48 return f(self, *args, **kwargs)
49
49
50
50
51 #--------------------------------------------------------------------------
51 #--------------------------------------------------------------------------
52 # Classes
52 # Classes
53 #--------------------------------------------------------------------------
53 #--------------------------------------------------------------------------
54
54
55 class Metadata(dict):
55 class Metadata(dict):
56 """Subclass of dict for initializing metadata values.
56 """Subclass of dict for initializing metadata values.
57
57
58 Attribute access works on keys.
58 Attribute access works on keys.
59
59
60 These objects have a strict set of keys - errors will raise if you try
60 These objects have a strict set of keys - errors will raise if you try
61 to add new keys.
61 to add new keys.
62 """
62 """
63 def __init__(self, *args, **kwargs):
63 def __init__(self, *args, **kwargs):
64 dict.__init__(self)
64 dict.__init__(self)
65 md = {'msg_id' : None,
65 md = {'msg_id' : None,
66 'submitted' : None,
66 'submitted' : None,
67 'started' : None,
67 'started' : None,
68 'completed' : None,
68 'completed' : None,
69 'received' : None,
69 'received' : None,
70 'engine_uuid' : None,
70 'engine_uuid' : None,
71 'engine_id' : None,
71 'engine_id' : None,
72 'follow' : None,
72 'follow' : None,
73 'after' : None,
73 'after' : None,
74 'status' : None,
74 'status' : None,
75
75
76 'pyin' : None,
76 'pyin' : None,
77 'pyout' : None,
77 'pyout' : None,
78 'pyerr' : None,
78 'pyerr' : None,
79 'stdout' : '',
79 'stdout' : '',
80 'stderr' : '',
80 'stderr' : '',
81 }
81 }
82 self.update(md)
82 self.update(md)
83 self.update(dict(*args, **kwargs))
83 self.update(dict(*args, **kwargs))
84
84
85 def __getattr__(self, key):
85 def __getattr__(self, key):
86 """getattr aliased to getitem"""
86 """getattr aliased to getitem"""
87 if key in self.iterkeys():
87 if key in self.iterkeys():
88 return self[key]
88 return self[key]
89 else:
89 else:
90 raise AttributeError(key)
90 raise AttributeError(key)
91
91
92 def __setattr__(self, key, value):
92 def __setattr__(self, key, value):
93 """setattr aliased to setitem, with strict"""
93 """setattr aliased to setitem, with strict"""
94 if key in self.iterkeys():
94 if key in self.iterkeys():
95 self[key] = value
95 self[key] = value
96 else:
96 else:
97 raise AttributeError(key)
97 raise AttributeError(key)
98
98
99 def __setitem__(self, key, value):
99 def __setitem__(self, key, value):
100 """strict static key enforcement"""
100 """strict static key enforcement"""
101 if key in self.iterkeys():
101 if key in self.iterkeys():
102 dict.__setitem__(self, key, value)
102 dict.__setitem__(self, key, value)
103 else:
103 else:
104 raise KeyError(key)
104 raise KeyError(key)
105
105
106
106
107 class Client(HasTraits):
107 class Client(HasTraits):
108 """A semi-synchronous client to the IPython ZMQ cluster
108 """A semi-synchronous client to the IPython ZMQ cluster
109
109
110 Parameters
110 Parameters
111 ----------
111 ----------
112
112
113 url_or_file : bytes; zmq url or path to ipcontroller-client.json
113 url_or_file : bytes; zmq url or path to ipcontroller-client.json
114 Connection information for the Hub's registration. If a json connector
114 Connection information for the Hub's registration. If a json connector
115 file is given, then likely no further configuration is necessary.
115 file is given, then likely no further configuration is necessary.
116 [Default: use profile]
116 [Default: use profile]
117 profile : bytes
117 profile : bytes
118 The name of the Cluster profile to be used to find connector information.
118 The name of the Cluster profile to be used to find connector information.
119 [Default: 'default']
119 [Default: 'default']
120 context : zmq.Context
120 context : zmq.Context
121 Pass an existing zmq.Context instance, otherwise the client will create its own.
121 Pass an existing zmq.Context instance, otherwise the client will create its own.
122 username : bytes
122 username : bytes
123 set username to be passed to the Session object
123 set username to be passed to the Session object
124 debug : bool
124 debug : bool
125 flag for lots of message printing for debug purposes
125 flag for lots of message printing for debug purposes
126
126
127 #-------------- ssh related args ----------------
127 #-------------- ssh related args ----------------
128 # These are args for configuring the ssh tunnel to be used
128 # These are args for configuring the ssh tunnel to be used
129 # credentials are used to forward connections over ssh to the Controller
129 # credentials are used to forward connections over ssh to the Controller
130 # Note that the ip given in `addr` needs to be relative to sshserver
130 # Note that the ip given in `addr` needs to be relative to sshserver
131 # The most basic case is to leave addr as pointing to localhost (127.0.0.1),
131 # The most basic case is to leave addr as pointing to localhost (127.0.0.1),
132 # and set sshserver as the same machine the Controller is on. However,
132 # and set sshserver as the same machine the Controller is on. However,
133 # the only requirement is that sshserver is able to see the Controller
133 # the only requirement is that sshserver is able to see the Controller
134 # (i.e. is within the same trusted network).
134 # (i.e. is within the same trusted network).
135
135
136 sshserver : str
136 sshserver : str
137 A string of the form passed to ssh, i.e. 'server.tld' or 'user@server.tld:port'
137 A string of the form passed to ssh, i.e. 'server.tld' or 'user@server.tld:port'
138 If keyfile or password is specified, and this is not, it will default to
138 If keyfile or password is specified, and this is not, it will default to
139 the ip given in addr.
139 the ip given in addr.
140 sshkey : str; path to public ssh key file
140 sshkey : str; path to public ssh key file
141 This specifies a key to be used in ssh login, default None.
141 This specifies a key to be used in ssh login, default None.
142 Regular default ssh keys will be used without specifying this argument.
142 Regular default ssh keys will be used without specifying this argument.
143 password : str
143 password : str
144 Your ssh password to sshserver. Note that if this is left None,
144 Your ssh password to sshserver. Note that if this is left None,
145 you will be prompted for it if passwordless key based login is unavailable.
145 you will be prompted for it if passwordless key based login is unavailable.
146 paramiko : bool
146 paramiko : bool
147 flag for whether to use paramiko instead of shell ssh for tunneling.
147 flag for whether to use paramiko instead of shell ssh for tunneling.
148 [default: True on win32, False else]
148 [default: True on win32, False else]
149
149
150 ------- exec authentication args -------
150 ------- exec authentication args -------
151 If even localhost is untrusted, you can have some protection against
151 If even localhost is untrusted, you can have some protection against
152 unauthorized execution by using a key. Messages are still sent
152 unauthorized execution by using a key. Messages are still sent
153 as cleartext, so if someone can snoop your loopback traffic this will
153 as cleartext, so if someone can snoop your loopback traffic this will
154 not help against malicious attacks.
154 not help against malicious attacks.
155
155
156 exec_key : str
156 exec_key : str
157 an authentication key or file containing a key
157 an authentication key or file containing a key
158 default: None
158 default: None
159
159
160
160
161 Attributes
161 Attributes
162 ----------
162 ----------
163
163
164 ids : list of int engine IDs
164 ids : list of int engine IDs
165 requesting the ids attribute always synchronizes
165 requesting the ids attribute always synchronizes
166 the registration state. To request ids without synchronization,
166 the registration state. To request ids without synchronization,
167 use semi-private _ids attributes.
167 use semi-private _ids attributes.
168
168
169 history : list of msg_ids
169 history : list of msg_ids
170 a list of msg_ids, keeping track of all the execution
170 a list of msg_ids, keeping track of all the execution
171 messages you have submitted in order.
171 messages you have submitted in order.
172
172
173 outstanding : set of msg_ids
173 outstanding : set of msg_ids
174 a set of msg_ids that have been submitted, but whose
174 a set of msg_ids that have been submitted, but whose
175 results have not yet been received.
175 results have not yet been received.
176
176
177 results : dict
177 results : dict
178 a dict of all our results, keyed by msg_id
178 a dict of all our results, keyed by msg_id
179
179
180 block : bool
180 block : bool
181 determines default behavior when block not specified
181 determines default behavior when block not specified
182 in execution methods
182 in execution methods
183
183
184 Methods
184 Methods
185 -------
185 -------
186
186
187 spin
187 spin
188 flushes incoming results and registration state changes
188 flushes incoming results and registration state changes
189 control methods spin, and requesting `ids` also ensures up to date
189 control methods spin, and requesting `ids` also ensures up to date
190
190
191 wait
191 wait
192 wait on one or more msg_ids
192 wait on one or more msg_ids
193
193
194 execution methods
194 execution methods
195 apply
195 apply
196 legacy: execute, run
196 legacy: execute, run
197
197
198 data movement
198 data movement
199 push, pull, scatter, gather
199 push, pull, scatter, gather
200
200
201 query methods
201 query methods
202 queue_status, get_result, purge, result_status
202 queue_status, get_result, purge, result_status
203
203
204 control methods
204 control methods
205 abort, shutdown
205 abort, shutdown
206
206
207 """
207 """
208
208
209
209
210 block = Bool(False)
210 block = Bool(False)
211 outstanding = Set()
211 outstanding = Set()
212 results = Instance('collections.defaultdict', (dict,))
212 results = Instance('collections.defaultdict', (dict,))
213 metadata = Instance('collections.defaultdict', (Metadata,))
213 metadata = Instance('collections.defaultdict', (Metadata,))
214 history = List()
214 history = List()
215 debug = Bool(False)
215 debug = Bool(False)
216 profile=CUnicode('default')
216 profile=CUnicode('default')
217
217
218 _outstanding_dict = Instance('collections.defaultdict', (set,))
218 _outstanding_dict = Instance('collections.defaultdict', (set,))
219 _ids = List()
219 _ids = List()
220 _connected=Bool(False)
220 _connected=Bool(False)
221 _ssh=Bool(False)
221 _ssh=Bool(False)
222 _context = Instance('zmq.Context')
222 _context = Instance('zmq.Context')
223 _config = Dict()
223 _config = Dict()
224 _engines=Instance(util.ReverseDict, (), {})
224 _engines=Instance(util.ReverseDict, (), {})
225 # _hub_socket=Instance('zmq.Socket')
225 # _hub_socket=Instance('zmq.Socket')
226 _query_socket=Instance('zmq.Socket')
226 _query_socket=Instance('zmq.Socket')
227 _control_socket=Instance('zmq.Socket')
227 _control_socket=Instance('zmq.Socket')
228 _iopub_socket=Instance('zmq.Socket')
228 _iopub_socket=Instance('zmq.Socket')
229 _notification_socket=Instance('zmq.Socket')
229 _notification_socket=Instance('zmq.Socket')
230 _mux_socket=Instance('zmq.Socket')
230 _mux_socket=Instance('zmq.Socket')
231 _task_socket=Instance('zmq.Socket')
231 _task_socket=Instance('zmq.Socket')
232 _task_scheme=Str()
232 _task_scheme=Str()
233 _closed = False
233 _closed = False
234 _ignored_control_replies=Int(0)
234 _ignored_control_replies=Int(0)
235 _ignored_hub_replies=Int(0)
235 _ignored_hub_replies=Int(0)
236
236
237 def __init__(self, url_or_file=None, profile='default', cluster_dir=None, ipython_dir=None,
237 def __init__(self, url_or_file=None, profile='default', cluster_dir=None, ipython_dir=None,
238 context=None, username=None, debug=False, exec_key=None,
238 context=None, username=None, debug=False, exec_key=None,
239 sshserver=None, sshkey=None, password=None, paramiko=None,
239 sshserver=None, sshkey=None, password=None, paramiko=None,
240 timeout=10
240 timeout=10
241 ):
241 ):
242 super(Client, self).__init__(debug=debug, profile=profile)
242 super(Client, self).__init__(debug=debug, profile=profile)
243 if context is None:
243 if context is None:
244 context = zmq.Context.instance()
244 context = zmq.Context.instance()
245 self._context = context
245 self._context = context
246
246
247
247
248 self._setup_cluster_dir(profile, cluster_dir, ipython_dir)
248 self._setup_cluster_dir(profile, cluster_dir, ipython_dir)
249 if self._cd is not None:
249 if self._cd is not None:
250 if url_or_file is None:
250 if url_or_file is None:
251 url_or_file = pjoin(self._cd.security_dir, 'ipcontroller-client.json')
251 url_or_file = pjoin(self._cd.security_dir, 'ipcontroller-client.json')
252 assert url_or_file is not None, "I can't find enough information to connect to a hub!"\
252 assert url_or_file is not None, "I can't find enough information to connect to a hub!"\
253 " Please specify at least one of url_or_file or profile."
253 " Please specify at least one of url_or_file or profile."
254
254
255 try:
255 try:
256 util.validate_url(url_or_file)
256 util.validate_url(url_or_file)
257 except AssertionError:
257 except AssertionError:
258 if not os.path.exists(url_or_file):
258 if not os.path.exists(url_or_file):
259 if self._cd:
259 if self._cd:
260 url_or_file = os.path.join(self._cd.security_dir, url_or_file)
260 url_or_file = os.path.join(self._cd.security_dir, url_or_file)
261 assert os.path.exists(url_or_file), "Not a valid connection file or url: %r"%url_or_file
261 assert os.path.exists(url_or_file), "Not a valid connection file or url: %r"%url_or_file
262 with open(url_or_file) as f:
262 with open(url_or_file) as f:
263 cfg = json.loads(f.read())
263 cfg = json.loads(f.read())
264 else:
264 else:
265 cfg = {'url':url_or_file}
265 cfg = {'url':url_or_file}
266
266
267 # sync defaults from args, json:
267 # sync defaults from args, json:
268 if sshserver:
268 if sshserver:
269 cfg['ssh'] = sshserver
269 cfg['ssh'] = sshserver
270 if exec_key:
270 if exec_key:
271 cfg['exec_key'] = exec_key
271 cfg['exec_key'] = exec_key
272 exec_key = cfg['exec_key']
272 exec_key = cfg['exec_key']
273 sshserver=cfg['ssh']
273 sshserver=cfg['ssh']
274 url = cfg['url']
274 url = cfg['url']
275 location = cfg.setdefault('location', None)
275 location = cfg.setdefault('location', None)
276 cfg['url'] = util.disambiguate_url(cfg['url'], location)
276 cfg['url'] = util.disambiguate_url(cfg['url'], location)
277 url = cfg['url']
277 url = cfg['url']
278
278
279 self._config = cfg
279 self._config = cfg
280
280
281 self._ssh = bool(sshserver or sshkey or password)
281 self._ssh = bool(sshserver or sshkey or password)
282 if self._ssh and sshserver is None:
282 if self._ssh and sshserver is None:
283 # default to ssh via localhost
283 # default to ssh via localhost
284 sshserver = url.split('://')[1].split(':')[0]
284 sshserver = url.split('://')[1].split(':')[0]
285 if self._ssh and password is None:
285 if self._ssh and password is None:
286 if tunnel.try_passwordless_ssh(sshserver, sshkey, paramiko):
286 if tunnel.try_passwordless_ssh(sshserver, sshkey, paramiko):
287 password=False
287 password=False
288 else:
288 else:
289 password = getpass("SSH Password for %s: "%sshserver)
289 password = getpass("SSH Password for %s: "%sshserver)
290 ssh_kwargs = dict(keyfile=sshkey, password=password, paramiko=paramiko)
290 ssh_kwargs = dict(keyfile=sshkey, password=password, paramiko=paramiko)
291 if exec_key is not None and os.path.isfile(exec_key):
291 if exec_key is not None and os.path.isfile(exec_key):
292 arg = 'keyfile'
292 arg = 'keyfile'
293 else:
293 else:
294 arg = 'key'
294 arg = 'key'
295 key_arg = {arg:exec_key}
295 key_arg = {arg:exec_key}
296 if username is None:
296 if username is None:
297 self.session = ss.StreamSession(**key_arg)
297 self.session = ss.StreamSession(**key_arg)
298 else:
298 else:
299 self.session = ss.StreamSession(username, **key_arg)
299 self.session = ss.StreamSession(username, **key_arg)
300 self._query_socket = self._context.socket(zmq.XREQ)
300 self._query_socket = self._context.socket(zmq.XREQ)
301 self._query_socket.setsockopt(zmq.IDENTITY, self.session.session)
301 self._query_socket.setsockopt(zmq.IDENTITY, self.session.session)
302 if self._ssh:
302 if self._ssh:
303 tunnel.tunnel_connection(self._query_socket, url, sshserver, **ssh_kwargs)
303 tunnel.tunnel_connection(self._query_socket, url, sshserver, **ssh_kwargs)
304 else:
304 else:
305 self._query_socket.connect(url)
305 self._query_socket.connect(url)
306
306
307 self.session.debug = self.debug
307 self.session.debug = self.debug
308
308
309 self._notification_handlers = {'registration_notification' : self._register_engine,
309 self._notification_handlers = {'registration_notification' : self._register_engine,
310 'unregistration_notification' : self._unregister_engine,
310 'unregistration_notification' : self._unregister_engine,
311 'shutdown_notification' : lambda msg: self.close(),
311 'shutdown_notification' : lambda msg: self.close(),
312 }
312 }
313 self._queue_handlers = {'execute_reply' : self._handle_execute_reply,
313 self._queue_handlers = {'execute_reply' : self._handle_execute_reply,
314 'apply_reply' : self._handle_apply_reply}
314 'apply_reply' : self._handle_apply_reply}
315 self._connect(sshserver, ssh_kwargs, timeout)
315 self._connect(sshserver, ssh_kwargs, timeout)
316
316
317 def __del__(self):
317 def __del__(self):
318 """cleanup sockets, but _not_ context."""
318 """cleanup sockets, but _not_ context."""
319 self.close()
319 self.close()
320
320
321 def _setup_cluster_dir(self, profile, cluster_dir, ipython_dir):
321 def _setup_cluster_dir(self, profile, cluster_dir, ipython_dir):
322 if ipython_dir is None:
322 if ipython_dir is None:
323 ipython_dir = get_ipython_dir()
323 ipython_dir = get_ipython_dir()
324 if cluster_dir is not None:
324 if cluster_dir is not None:
325 try:
325 try:
326 self._cd = ClusterDir.find_cluster_dir(cluster_dir)
326 self._cd = ClusterDir.find_cluster_dir(cluster_dir)
327 return
327 return
328 except ClusterDirError:
328 except ClusterDirError:
329 pass
329 pass
330 elif profile is not None:
330 elif profile is not None:
331 try:
331 try:
332 self._cd = ClusterDir.find_cluster_dir_by_profile(
332 self._cd = ClusterDir.find_cluster_dir_by_profile(
333 ipython_dir, profile)
333 ipython_dir, profile)
334 return
334 return
335 except ClusterDirError:
335 except ClusterDirError:
336 pass
336 pass
337 self._cd = None
337 self._cd = None
338
338
339 def _update_engines(self, engines):
339 def _update_engines(self, engines):
340 """Update our engines dict and _ids from a dict of the form: {id:uuid}."""
340 """Update our engines dict and _ids from a dict of the form: {id:uuid}."""
341 for k,v in engines.iteritems():
341 for k,v in engines.iteritems():
342 eid = int(k)
342 eid = int(k)
343 self._engines[eid] = bytes(v) # force not unicode
343 self._engines[eid] = bytes(v) # force not unicode
344 self._ids.append(eid)
344 self._ids.append(eid)
345 self._ids = sorted(self._ids)
345 self._ids = sorted(self._ids)
346 if sorted(self._engines.keys()) != range(len(self._engines)) and \
346 if sorted(self._engines.keys()) != range(len(self._engines)) and \
347 self._task_scheme == 'pure' and self._task_socket:
347 self._task_scheme == 'pure' and self._task_socket:
348 self._stop_scheduling_tasks()
348 self._stop_scheduling_tasks()
349
349
350 def _stop_scheduling_tasks(self):
350 def _stop_scheduling_tasks(self):
351 """Stop scheduling tasks because an engine has been unregistered
351 """Stop scheduling tasks because an engine has been unregistered
352 from a pure ZMQ scheduler.
352 from a pure ZMQ scheduler.
353 """
353 """
354 self._task_socket.close()
354 self._task_socket.close()
355 self._task_socket = None
355 self._task_socket = None
356 msg = "An engine has been unregistered, and we are using pure " +\
356 msg = "An engine has been unregistered, and we are using pure " +\
357 "ZMQ task scheduling. Task farming will be disabled."
357 "ZMQ task scheduling. Task farming will be disabled."
358 if self.outstanding:
358 if self.outstanding:
359 msg += " If you were running tasks when this happened, " +\
359 msg += " If you were running tasks when this happened, " +\
360 "some `outstanding` msg_ids may never resolve."
360 "some `outstanding` msg_ids may never resolve."
361 warnings.warn(msg, RuntimeWarning)
361 warnings.warn(msg, RuntimeWarning)
362
362
363 def _build_targets(self, targets):
363 def _build_targets(self, targets):
364 """Turn valid target IDs or 'all' into two lists:
364 """Turn valid target IDs or 'all' into two lists:
365 (int_ids, uuids).
365 (int_ids, uuids).
366 """
366 """
367 if not self._ids:
367 if not self._ids:
368 # flush notification socket if no engines yet, just in case
368 # flush notification socket if no engines yet, just in case
369 if not self.ids:
369 if not self.ids:
370 raise error.NoEnginesRegistered("Can't build targets without any engines")
370 raise error.NoEnginesRegistered("Can't build targets without any engines")
371
371
372 if targets is None:
372 if targets is None:
373 targets = self._ids
373 targets = self._ids
374 elif isinstance(targets, str):
374 elif isinstance(targets, str):
375 if targets.lower() == 'all':
375 if targets.lower() == 'all':
376 targets = self._ids
376 targets = self._ids
377 else:
377 else:
378 raise TypeError("%r not valid str target, must be 'all'"%(targets))
378 raise TypeError("%r not valid str target, must be 'all'"%(targets))
379 elif isinstance(targets, int):
379 elif isinstance(targets, int):
380 if targets < 0:
380 if targets < 0:
381 targets = self.ids[targets]
381 targets = self.ids[targets]
382 if targets not in self._ids:
382 if targets not in self._ids:
383 raise IndexError("No such engine: %i"%targets)
383 raise IndexError("No such engine: %i"%targets)
384 targets = [targets]
384 targets = [targets]
385
385
386 if isinstance(targets, slice):
386 if isinstance(targets, slice):
387 indices = range(len(self._ids))[targets]
387 indices = range(len(self._ids))[targets]
388 ids = self.ids
388 ids = self.ids
389 targets = [ ids[i] for i in indices ]
389 targets = [ ids[i] for i in indices ]
390
390
391 if not isinstance(targets, (tuple, list, xrange)):
391 if not isinstance(targets, (tuple, list, xrange)):
392 raise TypeError("targets by int/slice/collection of ints only, not %s"%(type(targets)))
392 raise TypeError("targets by int/slice/collection of ints only, not %s"%(type(targets)))
393
393
394 return [self._engines[t] for t in targets], list(targets)
394 return [self._engines[t] for t in targets], list(targets)
395
395
396 def _connect(self, sshserver, ssh_kwargs, timeout):
396 def _connect(self, sshserver, ssh_kwargs, timeout):
397 """setup all our socket connections to the cluster. This is called from
397 """setup all our socket connections to the cluster. This is called from
398 __init__."""
398 __init__."""
399
399
400 # Maybe allow reconnecting?
400 # Maybe allow reconnecting?
401 if self._connected:
401 if self._connected:
402 return
402 return
403 self._connected=True
403 self._connected=True
404
404
405 def connect_socket(s, url):
405 def connect_socket(s, url):
406 url = util.disambiguate_url(url, self._config['location'])
406 url = util.disambiguate_url(url, self._config['location'])
407 if self._ssh:
407 if self._ssh:
408 return tunnel.tunnel_connection(s, url, sshserver, **ssh_kwargs)
408 return tunnel.tunnel_connection(s, url, sshserver, **ssh_kwargs)
409 else:
409 else:
410 return s.connect(url)
410 return s.connect(url)
411
411
412 self.session.send(self._query_socket, 'connection_request')
412 self.session.send(self._query_socket, 'connection_request')
413 r,w,x = zmq.select([self._query_socket],[],[], timeout)
413 r,w,x = zmq.select([self._query_socket],[],[], timeout)
414 if not r:
414 if not r:
415 raise error.TimeoutError("Hub connection request timed out")
415 raise error.TimeoutError("Hub connection request timed out")
416 idents,msg = self.session.recv(self._query_socket,mode=0)
416 idents,msg = self.session.recv(self._query_socket,mode=0)
417 if self.debug:
417 if self.debug:
418 pprint(msg)
418 pprint(msg)
419 msg = ss.Message(msg)
419 msg = ss.Message(msg)
420 content = msg.content
420 content = msg.content
421 self._config['registration'] = dict(content)
421 self._config['registration'] = dict(content)
422 if content.status == 'ok':
422 if content.status == 'ok':
423 if content.mux:
423 if content.mux:
424 self._mux_socket = self._context.socket(zmq.XREQ)
424 self._mux_socket = self._context.socket(zmq.XREQ)
425 self._mux_socket.setsockopt(zmq.IDENTITY, self.session.session)
425 self._mux_socket.setsockopt(zmq.IDENTITY, self.session.session)
426 connect_socket(self._mux_socket, content.mux)
426 connect_socket(self._mux_socket, content.mux)
427 if content.task:
427 if content.task:
428 self._task_scheme, task_addr = content.task
428 self._task_scheme, task_addr = content.task
429 self._task_socket = self._context.socket(zmq.XREQ)
429 self._task_socket = self._context.socket(zmq.XREQ)
430 self._task_socket.setsockopt(zmq.IDENTITY, self.session.session)
430 self._task_socket.setsockopt(zmq.IDENTITY, self.session.session)
431 connect_socket(self._task_socket, task_addr)
431 connect_socket(self._task_socket, task_addr)
432 if content.notification:
432 if content.notification:
433 self._notification_socket = self._context.socket(zmq.SUB)
433 self._notification_socket = self._context.socket(zmq.SUB)
434 connect_socket(self._notification_socket, content.notification)
434 connect_socket(self._notification_socket, content.notification)
435 self._notification_socket.setsockopt(zmq.SUBSCRIBE, b'')
435 self._notification_socket.setsockopt(zmq.SUBSCRIBE, b'')
436 # if content.query:
436 # if content.query:
437 # self._query_socket = self._context.socket(zmq.XREQ)
437 # self._query_socket = self._context.socket(zmq.XREQ)
438 # self._query_socket.setsockopt(zmq.IDENTITY, self.session.session)
438 # self._query_socket.setsockopt(zmq.IDENTITY, self.session.session)
439 # connect_socket(self._query_socket, content.query)
439 # connect_socket(self._query_socket, content.query)
440 if content.control:
440 if content.control:
441 self._control_socket = self._context.socket(zmq.XREQ)
441 self._control_socket = self._context.socket(zmq.XREQ)
442 self._control_socket.setsockopt(zmq.IDENTITY, self.session.session)
442 self._control_socket.setsockopt(zmq.IDENTITY, self.session.session)
443 connect_socket(self._control_socket, content.control)
443 connect_socket(self._control_socket, content.control)
444 if content.iopub:
444 if content.iopub:
445 self._iopub_socket = self._context.socket(zmq.SUB)
445 self._iopub_socket = self._context.socket(zmq.SUB)
446 self._iopub_socket.setsockopt(zmq.SUBSCRIBE, b'')
446 self._iopub_socket.setsockopt(zmq.SUBSCRIBE, b'')
447 self._iopub_socket.setsockopt(zmq.IDENTITY, self.session.session)
447 self._iopub_socket.setsockopt(zmq.IDENTITY, self.session.session)
448 connect_socket(self._iopub_socket, content.iopub)
448 connect_socket(self._iopub_socket, content.iopub)
449 self._update_engines(dict(content.engines))
449 self._update_engines(dict(content.engines))
450 else:
450 else:
451 self._connected = False
451 self._connected = False
452 raise Exception("Failed to connect!")
452 raise Exception("Failed to connect!")
453
453
454 #--------------------------------------------------------------------------
454 #--------------------------------------------------------------------------
455 # handlers and callbacks for incoming messages
455 # handlers and callbacks for incoming messages
456 #--------------------------------------------------------------------------
456 #--------------------------------------------------------------------------
457
457
458 def _unwrap_exception(self, content):
458 def _unwrap_exception(self, content):
459 """unwrap exception, and remap engine_id to int."""
459 """unwrap exception, and remap engine_id to int."""
460 e = error.unwrap_exception(content)
460 e = error.unwrap_exception(content)
461 # print e.traceback
461 # print e.traceback
462 if e.engine_info:
462 if e.engine_info:
463 e_uuid = e.engine_info['engine_uuid']
463 e_uuid = e.engine_info['engine_uuid']
464 eid = self._engines[e_uuid]
464 eid = self._engines[e_uuid]
465 e.engine_info['engine_id'] = eid
465 e.engine_info['engine_id'] = eid
466 return e
466 return e
467
467
468 def _extract_metadata(self, header, parent, content):
468 def _extract_metadata(self, header, parent, content):
469 md = {'msg_id' : parent['msg_id'],
469 md = {'msg_id' : parent['msg_id'],
470 'received' : datetime.now(),
470 'received' : datetime.now(),
471 'engine_uuid' : header.get('engine', None),
471 'engine_uuid' : header.get('engine', None),
472 'follow' : parent.get('follow', []),
472 'follow' : parent.get('follow', []),
473 'after' : parent.get('after', []),
473 'after' : parent.get('after', []),
474 'status' : content['status'],
474 'status' : content['status'],
475 }
475 }
476
476
477 if md['engine_uuid'] is not None:
477 if md['engine_uuid'] is not None:
478 md['engine_id'] = self._engines.get(md['engine_uuid'], None)
478 md['engine_id'] = self._engines.get(md['engine_uuid'], None)
479
479
480 if 'date' in parent:
480 if 'date' in parent:
481 md['submitted'] = datetime.strptime(parent['date'], util.ISO8601)
481 md['submitted'] = datetime.strptime(parent['date'], util.ISO8601)
482 if 'started' in header:
482 if 'started' in header:
483 md['started'] = datetime.strptime(header['started'], util.ISO8601)
483 md['started'] = datetime.strptime(header['started'], util.ISO8601)
484 if 'date' in header:
484 if 'date' in header:
485 md['completed'] = datetime.strptime(header['date'], util.ISO8601)
485 md['completed'] = datetime.strptime(header['date'], util.ISO8601)
486 return md
486 return md
487
487
488 def _register_engine(self, msg):
488 def _register_engine(self, msg):
489 """Register a new engine, and update our connection info."""
489 """Register a new engine, and update our connection info."""
490 content = msg['content']
490 content = msg['content']
491 eid = content['id']
491 eid = content['id']
492 d = {eid : content['queue']}
492 d = {eid : content['queue']}
493 self._update_engines(d)
493 self._update_engines(d)
494
494
495 def _unregister_engine(self, msg):
495 def _unregister_engine(self, msg):
496 """Unregister an engine that has died."""
496 """Unregister an engine that has died."""
497 content = msg['content']
497 content = msg['content']
498 eid = int(content['id'])
498 eid = int(content['id'])
499 if eid in self._ids:
499 if eid in self._ids:
500 self._ids.remove(eid)
500 self._ids.remove(eid)
501 uuid = self._engines.pop(eid)
501 uuid = self._engines.pop(eid)
502
502
503 self._handle_stranded_msgs(eid, uuid)
503 self._handle_stranded_msgs(eid, uuid)
504
504
505 if self._task_socket and self._task_scheme == 'pure':
505 if self._task_socket and self._task_scheme == 'pure':
506 self._stop_scheduling_tasks()
506 self._stop_scheduling_tasks()
507
507
508 def _handle_stranded_msgs(self, eid, uuid):
508 def _handle_stranded_msgs(self, eid, uuid):
509 """Handle messages known to be on an engine when the engine unregisters.
509 """Handle messages known to be on an engine when the engine unregisters.
510
510
511 It is possible that this will fire prematurely - that is, an engine will
511 It is possible that this will fire prematurely - that is, an engine will
512 go down after completing a result, and the client will be notified
512 go down after completing a result, and the client will be notified
513 of the unregistration and later receive the successful result.
513 of the unregistration and later receive the successful result.
514 """
514 """
515
515
516 outstanding = self._outstanding_dict[uuid]
516 outstanding = self._outstanding_dict[uuid]
517
517
518 for msg_id in list(outstanding):
518 for msg_id in list(outstanding):
519 if msg_id in self.results:
519 if msg_id in self.results:
520 # we already
520 # we already
521 continue
521 continue
522 try:
522 try:
523 raise error.EngineError("Engine %r died while running task %r"%(eid, msg_id))
523 raise error.EngineError("Engine %r died while running task %r"%(eid, msg_id))
524 except:
524 except:
525 content = error.wrap_exception()
525 content = error.wrap_exception()
526 # build a fake message:
526 # build a fake message:
527 parent = {}
527 parent = {}
528 header = {}
528 header = {}
529 parent['msg_id'] = msg_id
529 parent['msg_id'] = msg_id
530 header['engine'] = uuid
530 header['engine'] = uuid
531 header['date'] = datetime.now().strftime(util.ISO8601)
531 header['date'] = datetime.now().strftime(util.ISO8601)
532 msg = dict(parent_header=parent, header=header, content=content)
532 msg = dict(parent_header=parent, header=header, content=content)
533 self._handle_apply_reply(msg)
533 self._handle_apply_reply(msg)
534
534
535 def _handle_execute_reply(self, msg):
535 def _handle_execute_reply(self, msg):
536 """Save the reply to an execute_request into our results.
536 """Save the reply to an execute_request into our results.
537
537
538 execute messages are never actually used. apply is used instead.
538 execute messages are never actually used. apply is used instead.
539 """
539 """
540
540
541 parent = msg['parent_header']
541 parent = msg['parent_header']
542 msg_id = parent['msg_id']
542 msg_id = parent['msg_id']
543 if msg_id not in self.outstanding:
543 if msg_id not in self.outstanding:
544 if msg_id in self.history:
544 if msg_id in self.history:
545 print ("got stale result: %s"%msg_id)
545 print ("got stale result: %s"%msg_id)
546 else:
546 else:
547 print ("got unknown result: %s"%msg_id)
547 print ("got unknown result: %s"%msg_id)
548 else:
548 else:
549 self.outstanding.remove(msg_id)
549 self.outstanding.remove(msg_id)
550 self.results[msg_id] = self._unwrap_exception(msg['content'])
550 self.results[msg_id] = self._unwrap_exception(msg['content'])
551
551
552 def _handle_apply_reply(self, msg):
552 def _handle_apply_reply(self, msg):
553 """Save the reply to an apply_request into our results."""
553 """Save the reply to an apply_request into our results."""
554 parent = msg['parent_header']
554 parent = msg['parent_header']
555 msg_id = parent['msg_id']
555 msg_id = parent['msg_id']
556 if msg_id not in self.outstanding:
556 if msg_id not in self.outstanding:
557 if msg_id in self.history:
557 if msg_id in self.history:
558 print ("got stale result: %s"%msg_id)
558 print ("got stale result: %s"%msg_id)
559 print self.results[msg_id]
559 print self.results[msg_id]
560 print msg
560 print msg
561 else:
561 else:
562 print ("got unknown result: %s"%msg_id)
562 print ("got unknown result: %s"%msg_id)
563 else:
563 else:
564 self.outstanding.remove(msg_id)
564 self.outstanding.remove(msg_id)
565 content = msg['content']
565 content = msg['content']
566 header = msg['header']
566 header = msg['header']
567
567
568 # construct metadata:
568 # construct metadata:
569 md = self.metadata[msg_id]
569 md = self.metadata[msg_id]
570 md.update(self._extract_metadata(header, parent, content))
570 md.update(self._extract_metadata(header, parent, content))
571 # is this redundant?
571 # is this redundant?
572 self.metadata[msg_id] = md
572 self.metadata[msg_id] = md
573
573
574 e_outstanding = self._outstanding_dict[md['engine_uuid']]
574 e_outstanding = self._outstanding_dict[md['engine_uuid']]
575 if msg_id in e_outstanding:
575 if msg_id in e_outstanding:
576 e_outstanding.remove(msg_id)
576 e_outstanding.remove(msg_id)
577
577
578 # construct result:
578 # construct result:
579 if content['status'] == 'ok':
579 if content['status'] == 'ok':
580 self.results[msg_id] = util.unserialize_object(msg['buffers'])[0]
580 self.results[msg_id] = util.unserialize_object(msg['buffers'])[0]
581 elif content['status'] == 'aborted':
581 elif content['status'] == 'aborted':
582 self.results[msg_id] = error.TaskAborted(msg_id)
582 self.results[msg_id] = error.TaskAborted(msg_id)
583 elif content['status'] == 'resubmitted':
583 elif content['status'] == 'resubmitted':
584 # TODO: handle resubmission
584 # TODO: handle resubmission
585 pass
585 pass
586 else:
586 else:
587 self.results[msg_id] = self._unwrap_exception(content)
587 self.results[msg_id] = self._unwrap_exception(content)
588
588
589 def _flush_notifications(self):
589 def _flush_notifications(self):
590 """Flush notifications of engine registrations waiting
590 """Flush notifications of engine registrations waiting
591 in ZMQ queue."""
591 in ZMQ queue."""
592 msg = self.session.recv(self._notification_socket, mode=zmq.NOBLOCK)
592 msg = self.session.recv(self._notification_socket, mode=zmq.NOBLOCK)
593 while msg is not None:
593 while msg is not None:
594 if self.debug:
594 if self.debug:
595 pprint(msg)
595 pprint(msg)
596 msg = msg[-1]
596 msg = msg[-1]
597 msg_type = msg['msg_type']
597 msg_type = msg['msg_type']
598 handler = self._notification_handlers.get(msg_type, None)
598 handler = self._notification_handlers.get(msg_type, None)
599 if handler is None:
599 if handler is None:
600 raise Exception("Unhandled message type: %s"%msg.msg_type)
600 raise Exception("Unhandled message type: %s"%msg.msg_type)
601 else:
601 else:
602 handler(msg)
602 handler(msg)
603 msg = self.session.recv(self._notification_socket, mode=zmq.NOBLOCK)
603 msg = self.session.recv(self._notification_socket, mode=zmq.NOBLOCK)
604
604
605 def _flush_results(self, sock):
605 def _flush_results(self, sock):
606 """Flush task or queue results waiting in ZMQ queue."""
606 """Flush task or queue results waiting in ZMQ queue."""
607 msg = self.session.recv(sock, mode=zmq.NOBLOCK)
607 msg = self.session.recv(sock, mode=zmq.NOBLOCK)
608 while msg is not None:
608 while msg is not None:
609 if self.debug:
609 if self.debug:
610 pprint(msg)
610 pprint(msg)
611 msg = msg[-1]
611 msg = msg[-1]
612 msg_type = msg['msg_type']
612 msg_type = msg['msg_type']
613 handler = self._queue_handlers.get(msg_type, None)
613 handler = self._queue_handlers.get(msg_type, None)
614 if handler is None:
614 if handler is None:
615 raise Exception("Unhandled message type: %s"%msg.msg_type)
615 raise Exception("Unhandled message type: %s"%msg.msg_type)
616 else:
616 else:
617 handler(msg)
617 handler(msg)
618 msg = self.session.recv(sock, mode=zmq.NOBLOCK)
618 msg = self.session.recv(sock, mode=zmq.NOBLOCK)
619
619
620 def _flush_control(self, sock):
620 def _flush_control(self, sock):
621 """Flush replies from the control channel waiting
621 """Flush replies from the control channel waiting
622 in the ZMQ queue.
622 in the ZMQ queue.
623
623
624 Currently: ignore them."""
624 Currently: ignore them."""
625 if self._ignored_control_replies <= 0:
625 if self._ignored_control_replies <= 0:
626 return
626 return
627 msg = self.session.recv(sock, mode=zmq.NOBLOCK)
627 msg = self.session.recv(sock, mode=zmq.NOBLOCK)
628 while msg is not None:
628 while msg is not None:
629 self._ignored_control_replies -= 1
629 self._ignored_control_replies -= 1
630 if self.debug:
630 if self.debug:
631 pprint(msg)
631 pprint(msg)
632 msg = self.session.recv(sock, mode=zmq.NOBLOCK)
632 msg = self.session.recv(sock, mode=zmq.NOBLOCK)
633
633
634 def _flush_ignored_control(self):
634 def _flush_ignored_control(self):
635 """flush ignored control replies"""
635 """flush ignored control replies"""
636 while self._ignored_control_replies > 0:
636 while self._ignored_control_replies > 0:
637 self.session.recv(self._control_socket)
637 self.session.recv(self._control_socket)
638 self._ignored_control_replies -= 1
638 self._ignored_control_replies -= 1
639
639
640 def _flush_ignored_hub_replies(self):
640 def _flush_ignored_hub_replies(self):
641 msg = self.session.recv(self._query_socket, mode=zmq.NOBLOCK)
641 msg = self.session.recv(self._query_socket, mode=zmq.NOBLOCK)
642 while msg is not None:
642 while msg is not None:
643 msg = self.session.recv(self._query_socket, mode=zmq.NOBLOCK)
643 msg = self.session.recv(self._query_socket, mode=zmq.NOBLOCK)
644
644
645 def _flush_iopub(self, sock):
645 def _flush_iopub(self, sock):
646 """Flush replies from the iopub channel waiting
646 """Flush replies from the iopub channel waiting
647 in the ZMQ queue.
647 in the ZMQ queue.
648 """
648 """
649 msg = self.session.recv(sock, mode=zmq.NOBLOCK)
649 msg = self.session.recv(sock, mode=zmq.NOBLOCK)
650 while msg is not None:
650 while msg is not None:
651 if self.debug:
651 if self.debug:
652 pprint(msg)
652 pprint(msg)
653 msg = msg[-1]
653 msg = msg[-1]
654 parent = msg['parent_header']
654 parent = msg['parent_header']
655 msg_id = parent['msg_id']
655 msg_id = parent['msg_id']
656 content = msg['content']
656 content = msg['content']
657 header = msg['header']
657 header = msg['header']
658 msg_type = msg['msg_type']
658 msg_type = msg['msg_type']
659
659
660 # init metadata:
660 # init metadata:
661 md = self.metadata[msg_id]
661 md = self.metadata[msg_id]
662
662
663 if msg_type == 'stream':
663 if msg_type == 'stream':
664 name = content['name']
664 name = content['name']
665 s = md[name] or ''
665 s = md[name] or ''
666 md[name] = s + content['data']
666 md[name] = s + content['data']
667 elif msg_type == 'pyerr':
667 elif msg_type == 'pyerr':
668 md.update({'pyerr' : self._unwrap_exception(content)})
668 md.update({'pyerr' : self._unwrap_exception(content)})
669 elif msg_type == 'pyin':
669 elif msg_type == 'pyin':
670 md.update({'pyin' : content['code']})
670 md.update({'pyin' : content['code']})
671 else:
671 else:
672 md.update({msg_type : content.get('data', '')})
672 md.update({msg_type : content.get('data', '')})
673
673
674 # reduntant?
674 # reduntant?
675 self.metadata[msg_id] = md
675 self.metadata[msg_id] = md
676
676
677 msg = self.session.recv(sock, mode=zmq.NOBLOCK)
677 msg = self.session.recv(sock, mode=zmq.NOBLOCK)
678
678
679 #--------------------------------------------------------------------------
679 #--------------------------------------------------------------------------
680 # len, getitem
680 # len, getitem
681 #--------------------------------------------------------------------------
681 #--------------------------------------------------------------------------
682
682
683 def __len__(self):
683 def __len__(self):
684 """len(client) returns # of engines."""
684 """len(client) returns # of engines."""
685 return len(self.ids)
685 return len(self.ids)
686
686
687 def __getitem__(self, key):
687 def __getitem__(self, key):
688 """index access returns DirectView multiplexer objects
688 """index access returns DirectView multiplexer objects
689
689
690 Must be int, slice, or list/tuple/xrange of ints"""
690 Must be int, slice, or list/tuple/xrange of ints"""
691 if not isinstance(key, (int, slice, tuple, list, xrange)):
691 if not isinstance(key, (int, slice, tuple, list, xrange)):
692 raise TypeError("key by int/slice/iterable of ints only, not %s"%(type(key)))
692 raise TypeError("key by int/slice/iterable of ints only, not %s"%(type(key)))
693 else:
693 else:
694 return self.direct_view(key)
694 return self.direct_view(key)
695
695
696 #--------------------------------------------------------------------------
696 #--------------------------------------------------------------------------
697 # Begin public methods
697 # Begin public methods
698 #--------------------------------------------------------------------------
698 #--------------------------------------------------------------------------
699
699
700 @property
700 @property
701 def ids(self):
701 def ids(self):
702 """Always up-to-date ids property."""
702 """Always up-to-date ids property."""
703 self._flush_notifications()
703 self._flush_notifications()
704 # always copy:
704 # always copy:
705 return list(self._ids)
705 return list(self._ids)
706
706
707 def close(self):
707 def close(self):
708 if self._closed:
708 if self._closed:
709 return
709 return
710 snames = filter(lambda n: n.endswith('socket'), dir(self))
710 snames = filter(lambda n: n.endswith('socket'), dir(self))
711 for socket in map(lambda name: getattr(self, name), snames):
711 for socket in map(lambda name: getattr(self, name), snames):
712 if isinstance(socket, zmq.Socket) and not socket.closed:
712 if isinstance(socket, zmq.Socket) and not socket.closed:
713 socket.close()
713 socket.close()
714 self._closed = True
714 self._closed = True
715
715
716 def spin(self):
716 def spin(self):
717 """Flush any registration notifications and execution results
717 """Flush any registration notifications and execution results
718 waiting in the ZMQ queue.
718 waiting in the ZMQ queue.
719 """
719 """
720 if self._notification_socket:
720 if self._notification_socket:
721 self._flush_notifications()
721 self._flush_notifications()
722 if self._mux_socket:
722 if self._mux_socket:
723 self._flush_results(self._mux_socket)
723 self._flush_results(self._mux_socket)
724 if self._task_socket:
724 if self._task_socket:
725 self._flush_results(self._task_socket)
725 self._flush_results(self._task_socket)
726 if self._control_socket:
726 if self._control_socket:
727 self._flush_control(self._control_socket)
727 self._flush_control(self._control_socket)
728 if self._iopub_socket:
728 if self._iopub_socket:
729 self._flush_iopub(self._iopub_socket)
729 self._flush_iopub(self._iopub_socket)
730 if self._query_socket:
730 if self._query_socket:
731 self._flush_ignored_hub_replies()
731 self._flush_ignored_hub_replies()
732
732
733 def wait(self, jobs=None, timeout=-1):
733 def wait(self, jobs=None, timeout=-1):
734 """waits on one or more `jobs`, for up to `timeout` seconds.
734 """waits on one or more `jobs`, for up to `timeout` seconds.
735
735
736 Parameters
736 Parameters
737 ----------
737 ----------
738
738
739 jobs : int, str, or list of ints and/or strs, or one or more AsyncResult objects
739 jobs : int, str, or list of ints and/or strs, or one or more AsyncResult objects
740 ints are indices to self.history
740 ints are indices to self.history
741 strs are msg_ids
741 strs are msg_ids
742 default: wait on all outstanding messages
742 default: wait on all outstanding messages
743 timeout : float
743 timeout : float
744 a time in seconds, after which to give up.
744 a time in seconds, after which to give up.
745 default is -1, which means no timeout
745 default is -1, which means no timeout
746
746
747 Returns
747 Returns
748 -------
748 -------
749
749
750 True : when all msg_ids are done
750 True : when all msg_ids are done
751 False : timeout reached, some msg_ids still outstanding
751 False : timeout reached, some msg_ids still outstanding
752 """
752 """
753 tic = time.time()
753 tic = time.time()
754 if jobs is None:
754 if jobs is None:
755 theids = self.outstanding
755 theids = self.outstanding
756 else:
756 else:
757 if isinstance(jobs, (int, str, AsyncResult)):
757 if isinstance(jobs, (int, str, AsyncResult)):
758 jobs = [jobs]
758 jobs = [jobs]
759 theids = set()
759 theids = set()
760 for job in jobs:
760 for job in jobs:
761 if isinstance(job, int):
761 if isinstance(job, int):
762 # index access
762 # index access
763 job = self.history[job]
763 job = self.history[job]
764 elif isinstance(job, AsyncResult):
764 elif isinstance(job, AsyncResult):
765 map(theids.add, job.msg_ids)
765 map(theids.add, job.msg_ids)
766 continue
766 continue
767 theids.add(job)
767 theids.add(job)
768 if not theids.intersection(self.outstanding):
768 if not theids.intersection(self.outstanding):
769 return True
769 return True
770 self.spin()
770 self.spin()
771 while theids.intersection(self.outstanding):
771 while theids.intersection(self.outstanding):
772 if timeout >= 0 and ( time.time()-tic ) > timeout:
772 if timeout >= 0 and ( time.time()-tic ) > timeout:
773 break
773 break
774 time.sleep(1e-3)
774 time.sleep(1e-3)
775 self.spin()
775 self.spin()
776 return len(theids.intersection(self.outstanding)) == 0
776 return len(theids.intersection(self.outstanding)) == 0
777
777
778 #--------------------------------------------------------------------------
778 #--------------------------------------------------------------------------
779 # Control methods
779 # Control methods
780 #--------------------------------------------------------------------------
780 #--------------------------------------------------------------------------
781
781
782 @spin_first
782 @spin_first
783 def clear(self, targets=None, block=None):
783 def clear(self, targets=None, block=None):
784 """Clear the namespace in target(s)."""
784 """Clear the namespace in target(s)."""
785 block = self.block if block is None else block
785 block = self.block if block is None else block
786 targets = self._build_targets(targets)[0]
786 targets = self._build_targets(targets)[0]
787 for t in targets:
787 for t in targets:
788 self.session.send(self._control_socket, 'clear_request', content={}, ident=t)
788 self.session.send(self._control_socket, 'clear_request', content={}, ident=t)
789 error = False
789 error = False
790 if block:
790 if block:
791 self._flush_ignored_control()
791 self._flush_ignored_control()
792 for i in range(len(targets)):
792 for i in range(len(targets)):
793 idents,msg = self.session.recv(self._control_socket,0)
793 idents,msg = self.session.recv(self._control_socket,0)
794 if self.debug:
794 if self.debug:
795 pprint(msg)
795 pprint(msg)
796 if msg['content']['status'] != 'ok':
796 if msg['content']['status'] != 'ok':
797 error = self._unwrap_exception(msg['content'])
797 error = self._unwrap_exception(msg['content'])
798 else:
798 else:
799 self._ignored_control_replies += len(targets)
799 self._ignored_control_replies += len(targets)
800 if error:
800 if error:
801 raise error
801 raise error
802
802
803
803
804 @spin_first
804 @spin_first
805 def abort(self, jobs=None, targets=None, block=None):
805 def abort(self, jobs=None, targets=None, block=None):
806 """Abort specific jobs from the execution queues of target(s).
806 """Abort specific jobs from the execution queues of target(s).
807
807
808 This is a mechanism to prevent jobs that have already been submitted
808 This is a mechanism to prevent jobs that have already been submitted
809 from executing.
809 from executing.
810
810
811 Parameters
811 Parameters
812 ----------
812 ----------
813
813
814 jobs : msg_id, list of msg_ids, or AsyncResult
814 jobs : msg_id, list of msg_ids, or AsyncResult
815 The jobs to be aborted
815 The jobs to be aborted
816
816
817
817
818 """
818 """
819 block = self.block if block is None else block
819 block = self.block if block is None else block
820 targets = self._build_targets(targets)[0]
820 targets = self._build_targets(targets)[0]
821 msg_ids = []
821 msg_ids = []
822 if isinstance(jobs, (basestring,AsyncResult)):
822 if isinstance(jobs, (basestring,AsyncResult)):
823 jobs = [jobs]
823 jobs = [jobs]
824 bad_ids = filter(lambda obj: not isinstance(obj, (basestring, AsyncResult)), jobs)
824 bad_ids = filter(lambda obj: not isinstance(obj, (basestring, AsyncResult)), jobs)
825 if bad_ids:
825 if bad_ids:
826 raise TypeError("Invalid msg_id type %r, expected str or AsyncResult"%bad_ids[0])
826 raise TypeError("Invalid msg_id type %r, expected str or AsyncResult"%bad_ids[0])
827 for j in jobs:
827 for j in jobs:
828 if isinstance(j, AsyncResult):
828 if isinstance(j, AsyncResult):
829 msg_ids.extend(j.msg_ids)
829 msg_ids.extend(j.msg_ids)
830 else:
830 else:
831 msg_ids.append(j)
831 msg_ids.append(j)
832 content = dict(msg_ids=msg_ids)
832 content = dict(msg_ids=msg_ids)
833 for t in targets:
833 for t in targets:
834 self.session.send(self._control_socket, 'abort_request',
834 self.session.send(self._control_socket, 'abort_request',
835 content=content, ident=t)
835 content=content, ident=t)
836 error = False
836 error = False
837 if block:
837 if block:
838 self._flush_ignored_control()
838 self._flush_ignored_control()
839 for i in range(len(targets)):
839 for i in range(len(targets)):
840 idents,msg = self.session.recv(self._control_socket,0)
840 idents,msg = self.session.recv(self._control_socket,0)
841 if self.debug:
841 if self.debug:
842 pprint(msg)
842 pprint(msg)
843 if msg['content']['status'] != 'ok':
843 if msg['content']['status'] != 'ok':
844 error = self._unwrap_exception(msg['content'])
844 error = self._unwrap_exception(msg['content'])
845 else:
845 else:
846 self._ignored_control_replies += len(targets)
846 self._ignored_control_replies += len(targets)
847 if error:
847 if error:
848 raise error
848 raise error
849
849
850 @spin_first
850 @spin_first
851 def shutdown(self, targets=None, restart=False, hub=False, block=None):
851 def shutdown(self, targets=None, restart=False, hub=False, block=None):
852 """Terminates one or more engine processes, optionally including the hub."""
852 """Terminates one or more engine processes, optionally including the hub."""
853 block = self.block if block is None else block
853 block = self.block if block is None else block
854 if hub:
854 if hub:
855 targets = 'all'
855 targets = 'all'
856 targets = self._build_targets(targets)[0]
856 targets = self._build_targets(targets)[0]
857 for t in targets:
857 for t in targets:
858 self.session.send(self._control_socket, 'shutdown_request',
858 self.session.send(self._control_socket, 'shutdown_request',
859 content={'restart':restart},ident=t)
859 content={'restart':restart},ident=t)
860 error = False
860 error = False
861 if block or hub:
861 if block or hub:
862 self._flush_ignored_control()
862 self._flush_ignored_control()
863 for i in range(len(targets)):
863 for i in range(len(targets)):
864 idents,msg = self.session.recv(self._control_socket, 0)
864 idents,msg = self.session.recv(self._control_socket, 0)
865 if self.debug:
865 if self.debug:
866 pprint(msg)
866 pprint(msg)
867 if msg['content']['status'] != 'ok':
867 if msg['content']['status'] != 'ok':
868 error = self._unwrap_exception(msg['content'])
868 error = self._unwrap_exception(msg['content'])
869 else:
869 else:
870 self._ignored_control_replies += len(targets)
870 self._ignored_control_replies += len(targets)
871
871
872 if hub:
872 if hub:
873 time.sleep(0.25)
873 time.sleep(0.25)
874 self.session.send(self._query_socket, 'shutdown_request')
874 self.session.send(self._query_socket, 'shutdown_request')
875 idents,msg = self.session.recv(self._query_socket, 0)
875 idents,msg = self.session.recv(self._query_socket, 0)
876 if self.debug:
876 if self.debug:
877 pprint(msg)
877 pprint(msg)
878 if msg['content']['status'] != 'ok':
878 if msg['content']['status'] != 'ok':
879 error = self._unwrap_exception(msg['content'])
879 error = self._unwrap_exception(msg['content'])
880
880
881 if error:
881 if error:
882 raise error
882 raise error
883
883
884 #--------------------------------------------------------------------------
884 #--------------------------------------------------------------------------
885 # Execution related methods
885 # Execution related methods
886 #--------------------------------------------------------------------------
886 #--------------------------------------------------------------------------
887
887
888 def _maybe_raise(self, result):
888 def _maybe_raise(self, result):
889 """wrapper for maybe raising an exception if apply failed."""
889 """wrapper for maybe raising an exception if apply failed."""
890 if isinstance(result, error.RemoteError):
890 if isinstance(result, error.RemoteError):
891 raise result
891 raise result
892
892
893 return result
893 return result
894
894
895 def send_apply_message(self, socket, f, args=None, kwargs=None, subheader=None, track=False,
895 def send_apply_message(self, socket, f, args=None, kwargs=None, subheader=None, track=False,
896 ident=None):
896 ident=None):
897 """construct and send an apply message via a socket.
897 """construct and send an apply message via a socket.
898
898
899 This is the principal method with which all engine execution is performed by views.
899 This is the principal method with which all engine execution is performed by views.
900 """
900 """
901
901
902 assert not self._closed, "cannot use me anymore, I'm closed!"
902 assert not self._closed, "cannot use me anymore, I'm closed!"
903 # defaults:
903 # defaults:
904 args = args if args is not None else []
904 args = args if args is not None else []
905 kwargs = kwargs if kwargs is not None else {}
905 kwargs = kwargs if kwargs is not None else {}
906 subheader = subheader if subheader is not None else {}
906 subheader = subheader if subheader is not None else {}
907
907
908 # validate arguments
908 # validate arguments
909 if not callable(f):
909 if not callable(f):
910 raise TypeError("f must be callable, not %s"%type(f))
910 raise TypeError("f must be callable, not %s"%type(f))
911 if not isinstance(args, (tuple, list)):
911 if not isinstance(args, (tuple, list)):
912 raise TypeError("args must be tuple or list, not %s"%type(args))
912 raise TypeError("args must be tuple or list, not %s"%type(args))
913 if not isinstance(kwargs, dict):
913 if not isinstance(kwargs, dict):
914 raise TypeError("kwargs must be dict, not %s"%type(kwargs))
914 raise TypeError("kwargs must be dict, not %s"%type(kwargs))
915 if not isinstance(subheader, dict):
915 if not isinstance(subheader, dict):
916 raise TypeError("subheader must be dict, not %s"%type(subheader))
916 raise TypeError("subheader must be dict, not %s"%type(subheader))
917
917
918 bufs = util.pack_apply_message(f,args,kwargs)
918 bufs = util.pack_apply_message(f,args,kwargs)
919
919
920 msg = self.session.send(socket, "apply_request", buffers=bufs, ident=ident,
920 msg = self.session.send(socket, "apply_request", buffers=bufs, ident=ident,
921 subheader=subheader, track=track)
921 subheader=subheader, track=track)
922
922
923 msg_id = msg['msg_id']
923 msg_id = msg['msg_id']
924 self.outstanding.add(msg_id)
924 self.outstanding.add(msg_id)
925 if ident:
925 if ident:
926 # possibly routed to a specific engine
926 # possibly routed to a specific engine
927 if isinstance(ident, list):
927 if isinstance(ident, list):
928 ident = ident[-1]
928 ident = ident[-1]
929 if ident in self._engines.values():
929 if ident in self._engines.values():
930 # save for later, in case of engine death
930 # save for later, in case of engine death
931 self._outstanding_dict[ident].add(msg_id)
931 self._outstanding_dict[ident].add(msg_id)
932 self.history.append(msg_id)
932 self.history.append(msg_id)
933 self.metadata[msg_id]['submitted'] = datetime.now()
933 self.metadata[msg_id]['submitted'] = datetime.now()
934
934
935 return msg
935 return msg
936
936
937 #--------------------------------------------------------------------------
937 #--------------------------------------------------------------------------
938 # construct a View object
938 # construct a View object
939 #--------------------------------------------------------------------------
939 #--------------------------------------------------------------------------
940
940
941 def load_balanced_view(self, targets=None):
941 def load_balanced_view(self, targets=None):
942 """construct a DirectView object.
942 """construct a DirectView object.
943
943
944 If no arguments are specified, create a LoadBalancedView
944 If no arguments are specified, create a LoadBalancedView
945 using all engines.
945 using all engines.
946
946
947 Parameters
947 Parameters
948 ----------
948 ----------
949
949
950 targets: list,slice,int,etc. [default: use all engines]
950 targets: list,slice,int,etc. [default: use all engines]
951 The subset of engines across which to load-balance
951 The subset of engines across which to load-balance
952 """
952 """
953 if targets is not None:
953 if targets is not None:
954 targets = self._build_targets(targets)[1]
954 targets = self._build_targets(targets)[1]
955 return LoadBalancedView(client=self, socket=self._task_socket, targets=targets)
955 return LoadBalancedView(client=self, socket=self._task_socket, targets=targets)
956
956
957 def direct_view(self, targets='all'):
957 def direct_view(self, targets='all'):
958 """construct a DirectView object.
958 """construct a DirectView object.
959
959
960 If no targets are specified, create a DirectView
960 If no targets are specified, create a DirectView
961 using all engines.
961 using all engines.
962
962
963 Parameters
963 Parameters
964 ----------
964 ----------
965
965
966 targets: list,slice,int,etc. [default: use all engines]
966 targets: list,slice,int,etc. [default: use all engines]
967 The engines to use for the View
967 The engines to use for the View
968 """
968 """
969 single = isinstance(targets, int)
969 single = isinstance(targets, int)
970 targets = self._build_targets(targets)[1]
970 targets = self._build_targets(targets)[1]
971 if single:
971 if single:
972 targets = targets[0]
972 targets = targets[0]
973 return DirectView(client=self, socket=self._mux_socket, targets=targets)
973 return DirectView(client=self, socket=self._mux_socket, targets=targets)
974
974
975 #--------------------------------------------------------------------------
975 #--------------------------------------------------------------------------
976 # Query methods
976 # Query methods
977 #--------------------------------------------------------------------------
977 #--------------------------------------------------------------------------
978
978
979 @spin_first
979 @spin_first
980 def get_result(self, indices_or_msg_ids=None, block=None):
980 def get_result(self, indices_or_msg_ids=None, block=None):
981 """Retrieve a result by msg_id or history index, wrapped in an AsyncResult object.
981 """Retrieve a result by msg_id or history index, wrapped in an AsyncResult object.
982
982
983 If the client already has the results, no request to the Hub will be made.
983 If the client already has the results, no request to the Hub will be made.
984
984
985 This is a convenient way to construct AsyncResult objects, which are wrappers
985 This is a convenient way to construct AsyncResult objects, which are wrappers
986 that include metadata about execution, and allow for awaiting results that
986 that include metadata about execution, and allow for awaiting results that
987 were not submitted by this Client.
987 were not submitted by this Client.
988
988
989 It can also be a convenient way to retrieve the metadata associated with
989 It can also be a convenient way to retrieve the metadata associated with
990 blocking execution, since it always retrieves
990 blocking execution, since it always retrieves
991
991
992 Examples
992 Examples
993 --------
993 --------
994 ::
994 ::
995
995
996 In [10]: r = client.apply()
996 In [10]: r = client.apply()
997
997
998 Parameters
998 Parameters
999 ----------
999 ----------
1000
1000
1001 indices_or_msg_ids : integer history index, str msg_id, or list of either
1001 indices_or_msg_ids : integer history index, str msg_id, or list of either
1002 The indices or msg_ids of indices to be retrieved
1002 The indices or msg_ids of indices to be retrieved
1003
1003
1004 block : bool
1004 block : bool
1005 Whether to wait for the result to be done
1005 Whether to wait for the result to be done
1006
1006
1007 Returns
1007 Returns
1008 -------
1008 -------
1009
1009
1010 AsyncResult
1010 AsyncResult
1011 A single AsyncResult object will always be returned.
1011 A single AsyncResult object will always be returned.
1012
1012
1013 AsyncHubResult
1013 AsyncHubResult
1014 A subclass of AsyncResult that retrieves results from the Hub
1014 A subclass of AsyncResult that retrieves results from the Hub
1015
1015
1016 """
1016 """
1017 block = self.block if block is None else block
1017 block = self.block if block is None else block
1018 if indices_or_msg_ids is None:
1018 if indices_or_msg_ids is None:
1019 indices_or_msg_ids = -1
1019 indices_or_msg_ids = -1
1020
1020
1021 if not isinstance(indices_or_msg_ids, (list,tuple)):
1021 if not isinstance(indices_or_msg_ids, (list,tuple)):
1022 indices_or_msg_ids = [indices_or_msg_ids]
1022 indices_or_msg_ids = [indices_or_msg_ids]
1023
1023
1024 theids = []
1024 theids = []
1025 for id in indices_or_msg_ids:
1025 for id in indices_or_msg_ids:
1026 if isinstance(id, int):
1026 if isinstance(id, int):
1027 id = self.history[id]
1027 id = self.history[id]
1028 if not isinstance(id, str):
1028 if not isinstance(id, str):
1029 raise TypeError("indices must be str or int, not %r"%id)
1029 raise TypeError("indices must be str or int, not %r"%id)
1030 theids.append(id)
1030 theids.append(id)
1031
1031
1032 local_ids = filter(lambda msg_id: msg_id in self.history or msg_id in self.results, theids)
1032 local_ids = filter(lambda msg_id: msg_id in self.history or msg_id in self.results, theids)
1033 remote_ids = filter(lambda msg_id: msg_id not in local_ids, theids)
1033 remote_ids = filter(lambda msg_id: msg_id not in local_ids, theids)
1034
1034
1035 if remote_ids:
1035 if remote_ids:
1036 ar = AsyncHubResult(self, msg_ids=theids)
1036 ar = AsyncHubResult(self, msg_ids=theids)
1037 else:
1037 else:
1038 ar = AsyncResult(self, msg_ids=theids)
1038 ar = AsyncResult(self, msg_ids=theids)
1039
1039
1040 if block:
1040 if block:
1041 ar.wait()
1041 ar.wait()
1042
1042
1043 return ar
1043 return ar
1044
1044
1045 @spin_first
1045 @spin_first
1046 def resubmit(self, indices_or_msg_ids=None, subheader=None, block=None):
1046 def resubmit(self, indices_or_msg_ids=None, subheader=None, block=None):
1047 """Resubmit one or more tasks.
1047 """Resubmit one or more tasks.
1048
1048
1049 in-flight tasks may not be resubmitted.
1049 in-flight tasks may not be resubmitted.
1050
1050
1051 Parameters
1051 Parameters
1052 ----------
1052 ----------
1053
1053
1054 indices_or_msg_ids : integer history index, str msg_id, or list of either
1054 indices_or_msg_ids : integer history index, str msg_id, or list of either
1055 The indices or msg_ids of indices to be retrieved
1055 The indices or msg_ids of indices to be retrieved
1056
1056
1057 block : bool
1057 block : bool
1058 Whether to wait for the result to be done
1058 Whether to wait for the result to be done
1059
1059
1060 Returns
1060 Returns
1061 -------
1061 -------
1062
1062
1063 AsyncHubResult
1063 AsyncHubResult
1064 A subclass of AsyncResult that retrieves results from the Hub
1064 A subclass of AsyncResult that retrieves results from the Hub
1065
1065
1066 """
1066 """
1067 block = self.block if block is None else block
1067 block = self.block if block is None else block
1068 if indices_or_msg_ids is None:
1068 if indices_or_msg_ids is None:
1069 indices_or_msg_ids = -1
1069 indices_or_msg_ids = -1
1070
1070
1071 if not isinstance(indices_or_msg_ids, (list,tuple)):
1071 if not isinstance(indices_or_msg_ids, (list,tuple)):
1072 indices_or_msg_ids = [indices_or_msg_ids]
1072 indices_or_msg_ids = [indices_or_msg_ids]
1073
1073
1074 theids = []
1074 theids = []
1075 for id in indices_or_msg_ids:
1075 for id in indices_or_msg_ids:
1076 if isinstance(id, int):
1076 if isinstance(id, int):
1077 id = self.history[id]
1077 id = self.history[id]
1078 if not isinstance(id, str):
1078 if not isinstance(id, str):
1079 raise TypeError("indices must be str or int, not %r"%id)
1079 raise TypeError("indices must be str or int, not %r"%id)
1080 theids.append(id)
1080 theids.append(id)
1081
1081
1082 for msg_id in theids:
1082 for msg_id in theids:
1083 self.outstanding.discard(msg_id)
1083 self.outstanding.discard(msg_id)
1084 if msg_id in self.history:
1084 if msg_id in self.history:
1085 self.history.remove(msg_id)
1085 self.history.remove(msg_id)
1086 self.results.pop(msg_id, None)
1086 self.results.pop(msg_id, None)
1087 self.metadata.pop(msg_id, None)
1087 self.metadata.pop(msg_id, None)
1088 content = dict(msg_ids = theids)
1088 content = dict(msg_ids = theids)
1089
1089
1090 self.session.send(self._query_socket, 'resubmit_request', content)
1090 self.session.send(self._query_socket, 'resubmit_request', content)
1091
1091
1092 zmq.select([self._query_socket], [], [])
1092 zmq.select([self._query_socket], [], [])
1093 idents,msg = self.session.recv(self._query_socket, zmq.NOBLOCK)
1093 idents,msg = self.session.recv(self._query_socket, zmq.NOBLOCK)
1094 if self.debug:
1094 if self.debug:
1095 pprint(msg)
1095 pprint(msg)
1096 content = msg['content']
1096 content = msg['content']
1097 if content['status'] != 'ok':
1097 if content['status'] != 'ok':
1098 raise self._unwrap_exception(content)
1098 raise self._unwrap_exception(content)
1099
1099
1100 ar = AsyncHubResult(self, msg_ids=theids)
1100 ar = AsyncHubResult(self, msg_ids=theids)
1101
1101
1102 if block:
1102 if block:
1103 ar.wait()
1103 ar.wait()
1104
1104
1105 return ar
1105 return ar
1106
1106
1107 @spin_first
1107 @spin_first
1108 def result_status(self, msg_ids, status_only=True):
1108 def result_status(self, msg_ids, status_only=True):
1109 """Check on the status of the result(s) of the apply request with `msg_ids`.
1109 """Check on the status of the result(s) of the apply request with `msg_ids`.
1110
1110
1111 If status_only is False, then the actual results will be retrieved, else
1111 If status_only is False, then the actual results will be retrieved, else
1112 only the status of the results will be checked.
1112 only the status of the results will be checked.
1113
1113
1114 Parameters
1114 Parameters
1115 ----------
1115 ----------
1116
1116
1117 msg_ids : list of msg_ids
1117 msg_ids : list of msg_ids
1118 if int:
1118 if int:
1119 Passed as index to self.history for convenience.
1119 Passed as index to self.history for convenience.
1120 status_only : bool (default: True)
1120 status_only : bool (default: True)
1121 if False:
1121 if False:
1122 Retrieve the actual results of completed tasks.
1122 Retrieve the actual results of completed tasks.
1123
1123
1124 Returns
1124 Returns
1125 -------
1125 -------
1126
1126
1127 results : dict
1127 results : dict
1128 There will always be the keys 'pending' and 'completed', which will
1128 There will always be the keys 'pending' and 'completed', which will
1129 be lists of msg_ids that are incomplete or complete. If `status_only`
1129 be lists of msg_ids that are incomplete or complete. If `status_only`
1130 is False, then completed results will be keyed by their `msg_id`.
1130 is False, then completed results will be keyed by their `msg_id`.
1131 """
1131 """
1132 if not isinstance(msg_ids, (list,tuple)):
1132 if not isinstance(msg_ids, (list,tuple)):
1133 msg_ids = [msg_ids]
1133 msg_ids = [msg_ids]
1134
1134
1135 theids = []
1135 theids = []
1136 for msg_id in msg_ids:
1136 for msg_id in msg_ids:
1137 if isinstance(msg_id, int):
1137 if isinstance(msg_id, int):
1138 msg_id = self.history[msg_id]
1138 msg_id = self.history[msg_id]
1139 if not isinstance(msg_id, basestring):
1139 if not isinstance(msg_id, basestring):
1140 raise TypeError("msg_ids must be str, not %r"%msg_id)
1140 raise TypeError("msg_ids must be str, not %r"%msg_id)
1141 theids.append(msg_id)
1141 theids.append(msg_id)
1142
1142
1143 completed = []
1143 completed = []
1144 local_results = {}
1144 local_results = {}
1145
1145
1146 # comment this block out to temporarily disable local shortcut:
1146 # comment this block out to temporarily disable local shortcut:
1147 for msg_id in theids:
1147 for msg_id in theids:
1148 if msg_id in self.results:
1148 if msg_id in self.results:
1149 completed.append(msg_id)
1149 completed.append(msg_id)
1150 local_results[msg_id] = self.results[msg_id]
1150 local_results[msg_id] = self.results[msg_id]
1151 theids.remove(msg_id)
1151 theids.remove(msg_id)
1152
1152
1153 if theids: # some not locally cached
1153 if theids: # some not locally cached
1154 content = dict(msg_ids=theids, status_only=status_only)
1154 content = dict(msg_ids=theids, status_only=status_only)
1155 msg = self.session.send(self._query_socket, "result_request", content=content)
1155 msg = self.session.send(self._query_socket, "result_request", content=content)
1156 zmq.select([self._query_socket], [], [])
1156 zmq.select([self._query_socket], [], [])
1157 idents,msg = self.session.recv(self._query_socket, zmq.NOBLOCK)
1157 idents,msg = self.session.recv(self._query_socket, zmq.NOBLOCK)
1158 if self.debug:
1158 if self.debug:
1159 pprint(msg)
1159 pprint(msg)
1160 content = msg['content']
1160 content = msg['content']
1161 if content['status'] != 'ok':
1161 if content['status'] != 'ok':
1162 raise self._unwrap_exception(content)
1162 raise self._unwrap_exception(content)
1163 buffers = msg['buffers']
1163 buffers = msg['buffers']
1164 else:
1164 else:
1165 content = dict(completed=[],pending=[])
1165 content = dict(completed=[],pending=[])
1166
1166
1167 content['completed'].extend(completed)
1167 content['completed'].extend(completed)
1168
1168
1169 if status_only:
1169 if status_only:
1170 return content
1170 return content
1171
1171
1172 failures = []
1172 failures = []
1173 # load cached results into result:
1173 # load cached results into result:
1174 content.update(local_results)
1174 content.update(local_results)
1175 # update cache with results:
1175 # update cache with results:
1176 for msg_id in sorted(theids):
1176 for msg_id in sorted(theids):
1177 if msg_id in content['completed']:
1177 if msg_id in content['completed']:
1178 rec = content[msg_id]
1178 rec = content[msg_id]
1179 parent = rec['header']
1179 parent = rec['header']
1180 header = rec['result_header']
1180 header = rec['result_header']
1181 rcontent = rec['result_content']
1181 rcontent = rec['result_content']
1182 iodict = rec['io']
1182 iodict = rec['io']
1183 if isinstance(rcontent, str):
1183 if isinstance(rcontent, str):
1184 rcontent = self.session.unpack(rcontent)
1184 rcontent = self.session.unpack(rcontent)
1185
1185
1186 md = self.metadata[msg_id]
1186 md = self.metadata[msg_id]
1187 md.update(self._extract_metadata(header, parent, rcontent))
1187 md.update(self._extract_metadata(header, parent, rcontent))
1188 md.update(iodict)
1188 md.update(iodict)
1189
1189
1190 if rcontent['status'] == 'ok':
1190 if rcontent['status'] == 'ok':
1191 res,buffers = util.unserialize_object(buffers)
1191 res,buffers = util.unserialize_object(buffers)
1192 else:
1192 else:
1193 print rcontent
1193 print rcontent
1194 res = self._unwrap_exception(rcontent)
1194 res = self._unwrap_exception(rcontent)
1195 failures.append(res)
1195 failures.append(res)
1196
1196
1197 self.results[msg_id] = res
1197 self.results[msg_id] = res
1198 content[msg_id] = res
1198 content[msg_id] = res
1199
1199
1200 if len(theids) == 1 and failures:
1200 if len(theids) == 1 and failures:
1201 raise failures[0]
1201 raise failures[0]
1202
1202
1203 error.collect_exceptions(failures, "result_status")
1203 error.collect_exceptions(failures, "result_status")
1204 return content
1204 return content
1205
1205
1206 @spin_first
1206 @spin_first
1207 def queue_status(self, targets='all', verbose=False):
1207 def queue_status(self, targets='all', verbose=False):
1208 """Fetch the status of engine queues.
1208 """Fetch the status of engine queues.
1209
1209
1210 Parameters
1210 Parameters
1211 ----------
1211 ----------
1212
1212
1213 targets : int/str/list of ints/strs
1213 targets : int/str/list of ints/strs
1214 the engines whose states are to be queried.
1214 the engines whose states are to be queried.
1215 default : all
1215 default : all
1216 verbose : bool
1216 verbose : bool
1217 Whether to return lengths only, or lists of ids for each element
1217 Whether to return lengths only, or lists of ids for each element
1218 """
1218 """
1219 engine_ids = self._build_targets(targets)[1]
1219 engine_ids = self._build_targets(targets)[1]
1220 content = dict(targets=engine_ids, verbose=verbose)
1220 content = dict(targets=engine_ids, verbose=verbose)
1221 self.session.send(self._query_socket, "queue_request", content=content)
1221 self.session.send(self._query_socket, "queue_request", content=content)
1222 idents,msg = self.session.recv(self._query_socket, 0)
1222 idents,msg = self.session.recv(self._query_socket, 0)
1223 if self.debug:
1223 if self.debug:
1224 pprint(msg)
1224 pprint(msg)
1225 content = msg['content']
1225 content = msg['content']
1226 status = content.pop('status')
1226 status = content.pop('status')
1227 if status != 'ok':
1227 if status != 'ok':
1228 raise self._unwrap_exception(content)
1228 raise self._unwrap_exception(content)
1229 content = util.rekey(content)
1229 content = util.rekey(content)
1230 if isinstance(targets, int):
1230 if isinstance(targets, int):
1231 return content[targets]
1231 return content[targets]
1232 else:
1232 else:
1233 return content
1233 return content
1234
1234
1235 @spin_first
1235 @spin_first
1236 def purge_results(self, jobs=[], targets=[]):
1236 def purge_results(self, jobs=[], targets=[]):
1237 """Tell the Hub to forget results.
1237 """Tell the Hub to forget results.
1238
1238
1239 Individual results can be purged by msg_id, or the entire
1239 Individual results can be purged by msg_id, or the entire
1240 history of specific targets can be purged.
1240 history of specific targets can be purged.
1241
1241
1242 Parameters
1242 Parameters
1243 ----------
1243 ----------
1244
1244
1245 jobs : str or list of str or AsyncResult objects
1245 jobs : str or list of str or AsyncResult objects
1246 the msg_ids whose results should be forgotten.
1246 the msg_ids whose results should be forgotten.
1247 targets : int/str/list of ints/strs
1247 targets : int/str/list of ints/strs
1248 The targets, by uuid or int_id, whose entire history is to be purged.
1248 The targets, by uuid or int_id, whose entire history is to be purged.
1249 Use `targets='all'` to scrub everything from the Hub's memory.
1249 Use `targets='all'` to scrub everything from the Hub's memory.
1250
1250
1251 default : None
1251 default : None
1252 """
1252 """
1253 if not targets and not jobs:
1253 if not targets and not jobs:
1254 raise ValueError("Must specify at least one of `targets` and `jobs`")
1254 raise ValueError("Must specify at least one of `targets` and `jobs`")
1255 if targets:
1255 if targets:
1256 targets = self._build_targets(targets)[1]
1256 targets = self._build_targets(targets)[1]
1257
1257
1258 # construct msg_ids from jobs
1258 # construct msg_ids from jobs
1259 msg_ids = []
1259 msg_ids = []
1260 if isinstance(jobs, (basestring,AsyncResult)):
1260 if isinstance(jobs, (basestring,AsyncResult)):
1261 jobs = [jobs]
1261 jobs = [jobs]
1262 bad_ids = filter(lambda obj: not isinstance(obj, (basestring, AsyncResult)), jobs)
1262 bad_ids = filter(lambda obj: not isinstance(obj, (basestring, AsyncResult)), jobs)
1263 if bad_ids:
1263 if bad_ids:
1264 raise TypeError("Invalid msg_id type %r, expected str or AsyncResult"%bad_ids[0])
1264 raise TypeError("Invalid msg_id type %r, expected str or AsyncResult"%bad_ids[0])
1265 for j in jobs:
1265 for j in jobs:
1266 if isinstance(j, AsyncResult):
1266 if isinstance(j, AsyncResult):
1267 msg_ids.extend(j.msg_ids)
1267 msg_ids.extend(j.msg_ids)
1268 else:
1268 else:
1269 msg_ids.append(j)
1269 msg_ids.append(j)
1270
1270
1271 content = dict(targets=targets, msg_ids=msg_ids)
1271 content = dict(targets=targets, msg_ids=msg_ids)
1272 self.session.send(self._query_socket, "purge_request", content=content)
1272 self.session.send(self._query_socket, "purge_request", content=content)
1273 idents, msg = self.session.recv(self._query_socket, 0)
1273 idents, msg = self.session.recv(self._query_socket, 0)
1274 if self.debug:
1274 if self.debug:
1275 pprint(msg)
1275 pprint(msg)
1276 content = msg['content']
1276 content = msg['content']
1277 if content['status'] != 'ok':
1277 if content['status'] != 'ok':
1278 raise self._unwrap_exception(content)
1278 raise self._unwrap_exception(content)
1279
1279
1280 @spin_first
1280 @spin_first
1281 def hub_history(self):
1281 def hub_history(self):
1282 """Get the Hub's history
1282 """Get the Hub's history
1283
1283
1284 Just like the Client, the Hub has a history, which is a list of msg_ids.
1284 Just like the Client, the Hub has a history, which is a list of msg_ids.
1285 This will contain the history of all clients, and, depending on configuration,
1285 This will contain the history of all clients, and, depending on configuration,
1286 may contain history across multiple cluster sessions.
1286 may contain history across multiple cluster sessions.
1287
1287
1288 Any msg_id returned here is a valid argument to `get_result`.
1288 Any msg_id returned here is a valid argument to `get_result`.
1289
1289
1290 Returns
1290 Returns
1291 -------
1291 -------
1292
1292
1293 msg_ids : list of strs
1293 msg_ids : list of strs
1294 list of all msg_ids, ordered by task submission time.
1294 list of all msg_ids, ordered by task submission time.
1295 """
1295 """
1296
1296
1297 self.session.send(self._query_socket, "history_request", content={})
1297 self.session.send(self._query_socket, "history_request", content={})
1298 idents, msg = self.session.recv(self._query_socket, 0)
1298 idents, msg = self.session.recv(self._query_socket, 0)
1299
1299
1300 if self.debug:
1300 if self.debug:
1301 pprint(msg)
1301 pprint(msg)
1302 content = msg['content']
1302 content = msg['content']
1303 if content['status'] != 'ok':
1303 if content['status'] != 'ok':
1304 raise self._unwrap_exception(content)
1304 raise self._unwrap_exception(content)
1305 else:
1305 else:
1306 return content['history']
1306 return content['history']
1307
1307
1308 @spin_first
1308 @spin_first
1309 def db_query(self, query, keys=None):
1309 def db_query(self, query, keys=None):
1310 """Query the Hub's TaskRecord database
1310 """Query the Hub's TaskRecord database
1311
1311
1312 This will return a list of task record dicts that match `query`
1312 This will return a list of task record dicts that match `query`
1313
1313
1314 Parameters
1314 Parameters
1315 ----------
1315 ----------
1316
1316
1317 query : mongodb query dict
1317 query : mongodb query dict
1318 The search dict. See mongodb query docs for details.
1318 The search dict. See mongodb query docs for details.
1319 keys : list of strs [optional]
1319 keys : list of strs [optional]
1320 THe subset of keys to be returned. The default is to fetch everything.
1320 The subset of keys to be returned. The default is to fetch everything but buffers.
1321 'msg_id' will *always* be included.
1321 'msg_id' will *always* be included.
1322 """
1322 """
1323 if isinstance(keys, basestring):
1324 keys = [keys]
1323 content = dict(query=query, keys=keys)
1325 content = dict(query=query, keys=keys)
1324 self.session.send(self._query_socket, "db_request", content=content)
1326 self.session.send(self._query_socket, "db_request", content=content)
1325 idents, msg = self.session.recv(self._query_socket, 0)
1327 idents, msg = self.session.recv(self._query_socket, 0)
1326 if self.debug:
1328 if self.debug:
1327 pprint(msg)
1329 pprint(msg)
1328 content = msg['content']
1330 content = msg['content']
1329 if content['status'] != 'ok':
1331 if content['status'] != 'ok':
1330 raise self._unwrap_exception(content)
1332 raise self._unwrap_exception(content)
1331
1333
1332 records = content['records']
1334 records = content['records']
1333 buffer_lens = content['buffer_lens']
1335 buffer_lens = content['buffer_lens']
1334 result_buffer_lens = content['result_buffer_lens']
1336 result_buffer_lens = content['result_buffer_lens']
1335 buffers = msg['buffers']
1337 buffers = msg['buffers']
1336 has_bufs = buffer_lens is not None
1338 has_bufs = buffer_lens is not None
1337 has_rbufs = result_buffer_lens is not None
1339 has_rbufs = result_buffer_lens is not None
1338 for i,rec in enumerate(records):
1340 for i,rec in enumerate(records):
1339 # relink buffers
1341 # relink buffers
1340 if has_bufs:
1342 if has_bufs:
1341 blen = buffer_lens[i]
1343 blen = buffer_lens[i]
1342 rec['buffers'], buffers = buffers[:blen],buffers[blen:]
1344 rec['buffers'], buffers = buffers[:blen],buffers[blen:]
1343 if has_rbufs:
1345 if has_rbufs:
1344 blen = result_buffer_lens[i]
1346 blen = result_buffer_lens[i]
1345 rec['result_buffers'], buffers = buffers[:blen],buffers[blen:]
1347 rec['result_buffers'], buffers = buffers[:blen],buffers[blen:]
1346 # turn timestamps back into times
1348 # turn timestamps back into times
1347 for key in 'submitted started completed resubmitted'.split():
1349 for key in 'submitted started completed resubmitted'.split():
1348 maybedate = rec.get(key, None)
1350 maybedate = rec.get(key, None)
1349 if maybedate and util.ISO8601_RE.match(maybedate):
1351 if maybedate and util.ISO8601_RE.match(maybedate):
1350 rec[key] = datetime.strptime(maybedate, util.ISO8601)
1352 rec[key] = datetime.strptime(maybedate, util.ISO8601)
1351
1353
1352 return records
1354 return records
1353
1355
1354 __all__ = [ 'Client' ]
1356 __all__ = [ 'Client' ]
@@ -1,22 +1,23
1 .. _parallel_index:
1 .. _parallel_index:
2
2
3 ====================================
3 ====================================
4 Using IPython for parallel computing
4 Using IPython for parallel computing
5 ====================================
5 ====================================
6
6
7 .. toctree::
7 .. toctree::
8 :maxdepth: 2
8 :maxdepth: 2
9
9
10 parallel_intro.txt
10 parallel_intro.txt
11 parallel_process.txt
11 parallel_process.txt
12 parallel_multiengine.txt
12 parallel_multiengine.txt
13 parallel_task.txt
13 parallel_task.txt
14 parallel_mpi.txt
14 parallel_mpi.txt
15 parallel_db.txt
15 parallel_security.txt
16 parallel_security.txt
16 parallel_winhpc.txt
17 parallel_winhpc.txt
17 parallel_demos.txt
18 parallel_demos.txt
18 dag_dependencies.txt
19 dag_dependencies.txt
19 parallel_details.txt
20 parallel_details.txt
20 parallel_transition.txt
21 parallel_transition.txt
21
22
22
23
@@ -1,418 +1,442
1 .. _parallel_task:
1 .. _parallel_task:
2
2
3 ==========================
3 ==========================
4 The IPython task interface
4 The IPython task interface
5 ==========================
5 ==========================
6
6
7 The task interface to the cluster presents the engines as a fault tolerant,
7 The task interface to the cluster presents the engines as a fault tolerant,
8 dynamic load-balanced system of workers. Unlike the multiengine interface, in
8 dynamic load-balanced system of workers. Unlike the multiengine interface, in
9 the task interface the user have no direct access to individual engines. By
9 the task interface the user have no direct access to individual engines. By
10 allowing the IPython scheduler to assign work, this interface is simultaneously
10 allowing the IPython scheduler to assign work, this interface is simultaneously
11 simpler and more powerful.
11 simpler and more powerful.
12
12
13 Best of all, the user can use both of these interfaces running at the same time
13 Best of all, the user can use both of these interfaces running at the same time
14 to take advantage of their respective strengths. When the user can break up
14 to take advantage of their respective strengths. When the user can break up
15 the user's work into segments that do not depend on previous execution, the
15 the user's work into segments that do not depend on previous execution, the
16 task interface is ideal. But it also has more power and flexibility, allowing
16 task interface is ideal. But it also has more power and flexibility, allowing
17 the user to guide the distribution of jobs, without having to assign tasks to
17 the user to guide the distribution of jobs, without having to assign tasks to
18 engines explicitly.
18 engines explicitly.
19
19
20 Starting the IPython controller and engines
20 Starting the IPython controller and engines
21 ===========================================
21 ===========================================
22
22
23 To follow along with this tutorial, you will need to start the IPython
23 To follow along with this tutorial, you will need to start the IPython
24 controller and four IPython engines. The simplest way of doing this is to use
24 controller and four IPython engines. The simplest way of doing this is to use
25 the :command:`ipcluster` command::
25 the :command:`ipcluster` command::
26
26
27 $ ipcluster start -n 4
27 $ ipcluster start -n 4
28
28
29 For more detailed information about starting the controller and engines, see
29 For more detailed information about starting the controller and engines, see
30 our :ref:`introduction <ip1par>` to using IPython for parallel computing.
30 our :ref:`introduction <ip1par>` to using IPython for parallel computing.
31
31
32 Creating a ``Client`` instance
32 Creating a ``Client`` instance
33 ==============================
33 ==============================
34
34
35 The first step is to import the IPython :mod:`IPython.parallel`
35 The first step is to import the IPython :mod:`IPython.parallel`
36 module and then create a :class:`.Client` instance, and we will also be using
36 module and then create a :class:`.Client` instance, and we will also be using
37 a :class:`LoadBalancedView`, here called `lview`:
37 a :class:`LoadBalancedView`, here called `lview`:
38
38
39 .. sourcecode:: ipython
39 .. sourcecode:: ipython
40
40
41 In [1]: from IPython.parallel import Client
41 In [1]: from IPython.parallel import Client
42
42
43 In [2]: rc = Client()
43 In [2]: rc = Client()
44
44
45
45
46 This form assumes that the controller was started on localhost with default
46 This form assumes that the controller was started on localhost with default
47 configuration. If not, the location of the controller must be given as an
47 configuration. If not, the location of the controller must be given as an
48 argument to the constructor:
48 argument to the constructor:
49
49
50 .. sourcecode:: ipython
50 .. sourcecode:: ipython
51
51
52 # for a visible LAN controller listening on an external port:
52 # for a visible LAN controller listening on an external port:
53 In [2]: rc = Client('tcp://192.168.1.16:10101')
53 In [2]: rc = Client('tcp://192.168.1.16:10101')
54 # or to connect with a specific profile you have set up:
54 # or to connect with a specific profile you have set up:
55 In [3]: rc = Client(profile='mpi')
55 In [3]: rc = Client(profile='mpi')
56
56
57 For load-balanced execution, we will make use of a :class:`LoadBalancedView` object, which can
57 For load-balanced execution, we will make use of a :class:`LoadBalancedView` object, which can
58 be constructed via the client's :meth:`load_balanced_view` method:
58 be constructed via the client's :meth:`load_balanced_view` method:
59
59
60 .. sourcecode:: ipython
60 .. sourcecode:: ipython
61
61
62 In [4]: lview = rc.load_balanced_view() # default load-balanced view
62 In [4]: lview = rc.load_balanced_view() # default load-balanced view
63
63
64 .. seealso::
64 .. seealso::
65
65
66 For more information, see the in-depth explanation of :ref:`Views <parallel_details>`.
66 For more information, see the in-depth explanation of :ref:`Views <parallel_details>`.
67
67
68
68
69 Quick and easy parallelism
69 Quick and easy parallelism
70 ==========================
70 ==========================
71
71
72 In many cases, you simply want to apply a Python function to a sequence of
72 In many cases, you simply want to apply a Python function to a sequence of
73 objects, but *in parallel*. Like the multiengine interface, these can be
73 objects, but *in parallel*. Like the multiengine interface, these can be
74 implemented via the task interface. The exact same tools can perform these
74 implemented via the task interface. The exact same tools can perform these
75 actions in load-balanced ways as well as multiplexed ways: a parallel version
75 actions in load-balanced ways as well as multiplexed ways: a parallel version
76 of :func:`map` and :func:`@parallel` function decorator. If one specifies the
76 of :func:`map` and :func:`@parallel` function decorator. If one specifies the
77 argument `balanced=True`, then they are dynamically load balanced. Thus, if the
77 argument `balanced=True`, then they are dynamically load balanced. Thus, if the
78 execution time per item varies significantly, you should use the versions in
78 execution time per item varies significantly, you should use the versions in
79 the task interface.
79 the task interface.
80
80
81 Parallel map
81 Parallel map
82 ------------
82 ------------
83
83
84 To load-balance :meth:`map`,simply use a LoadBalancedView:
84 To load-balance :meth:`map`,simply use a LoadBalancedView:
85
85
86 .. sourcecode:: ipython
86 .. sourcecode:: ipython
87
87
88 In [62]: lview.block = True
88 In [62]: lview.block = True
89
89
90 In [63]: serial_result = map(lambda x:x**10, range(32))
90 In [63]: serial_result = map(lambda x:x**10, range(32))
91
91
92 In [64]: parallel_result = lview.map(lambda x:x**10, range(32))
92 In [64]: parallel_result = lview.map(lambda x:x**10, range(32))
93
93
94 In [65]: serial_result==parallel_result
94 In [65]: serial_result==parallel_result
95 Out[65]: True
95 Out[65]: True
96
96
97 Parallel function decorator
97 Parallel function decorator
98 ---------------------------
98 ---------------------------
99
99
100 Parallel functions are just like normal function, but they can be called on
100 Parallel functions are just like normal function, but they can be called on
101 sequences and *in parallel*. The multiengine interface provides a decorator
101 sequences and *in parallel*. The multiengine interface provides a decorator
102 that turns any Python function into a parallel function:
102 that turns any Python function into a parallel function:
103
103
104 .. sourcecode:: ipython
104 .. sourcecode:: ipython
105
105
106 In [10]: @lview.parallel()
106 In [10]: @lview.parallel()
107 ....: def f(x):
107 ....: def f(x):
108 ....: return 10.0*x**4
108 ....: return 10.0*x**4
109 ....:
109 ....:
110
110
111 In [11]: f.map(range(32)) # this is done in parallel
111 In [11]: f.map(range(32)) # this is done in parallel
112 Out[11]: [0.0,10.0,160.0,...]
112 Out[11]: [0.0,10.0,160.0,...]
113
113
114 .. _parallel_dependencies:
114 .. _parallel_dependencies:
115
115
116 Dependencies
116 Dependencies
117 ============
117 ============
118
118
119 Often, pure atomic load-balancing is too primitive for your work. In these cases, you
119 Often, pure atomic load-balancing is too primitive for your work. In these cases, you
120 may want to associate some kind of `Dependency` that describes when, where, or whether
120 may want to associate some kind of `Dependency` that describes when, where, or whether
121 a task can be run. In IPython, we provide two types of dependencies:
121 a task can be run. In IPython, we provide two types of dependencies:
122 `Functional Dependencies`_ and `Graph Dependencies`_
122 `Functional Dependencies`_ and `Graph Dependencies`_
123
123
124 .. note::
124 .. note::
125
125
126 It is important to note that the pure ZeroMQ scheduler does not support dependencies,
126 It is important to note that the pure ZeroMQ scheduler does not support dependencies,
127 and you will see errors or warnings if you try to use dependencies with the pure
127 and you will see errors or warnings if you try to use dependencies with the pure
128 scheduler.
128 scheduler.
129
129
130 Functional Dependencies
130 Functional Dependencies
131 -----------------------
131 -----------------------
132
132
133 Functional dependencies are used to determine whether a given engine is capable of running
133 Functional dependencies are used to determine whether a given engine is capable of running
134 a particular task. This is implemented via a special :class:`Exception` class,
134 a particular task. This is implemented via a special :class:`Exception` class,
135 :class:`UnmetDependency`, found in `IPython.parallel.error`. Its use is very simple:
135 :class:`UnmetDependency`, found in `IPython.parallel.error`. Its use is very simple:
136 if a task fails with an UnmetDependency exception, then the scheduler, instead of relaying
136 if a task fails with an UnmetDependency exception, then the scheduler, instead of relaying
137 the error up to the client like any other error, catches the error, and submits the task
137 the error up to the client like any other error, catches the error, and submits the task
138 to a different engine. This will repeat indefinitely, and a task will never be submitted
138 to a different engine. This will repeat indefinitely, and a task will never be submitted
139 to a given engine a second time.
139 to a given engine a second time.
140
140
141 You can manually raise the :class:`UnmetDependency` yourself, but IPython has provided
141 You can manually raise the :class:`UnmetDependency` yourself, but IPython has provided
142 some decorators for facilitating this behavior.
142 some decorators for facilitating this behavior.
143
143
144 There are two decorators and a class used for functional dependencies:
144 There are two decorators and a class used for functional dependencies:
145
145
146 .. sourcecode:: ipython
146 .. sourcecode:: ipython
147
147
148 In [9]: from IPython.parallel import depend, require, dependent
148 In [9]: from IPython.parallel import depend, require, dependent
149
149
150 @require
150 @require
151 ********
151 ********
152
152
153 The simplest sort of dependency is requiring that a Python module is available. The
153 The simplest sort of dependency is requiring that a Python module is available. The
154 ``@require`` decorator lets you define a function that will only run on engines where names
154 ``@require`` decorator lets you define a function that will only run on engines where names
155 you specify are importable:
155 you specify are importable:
156
156
157 .. sourcecode:: ipython
157 .. sourcecode:: ipython
158
158
159 In [10]: @require('numpy', 'zmq')
159 In [10]: @require('numpy', 'zmq')
160 ...: def myfunc():
160 ...: def myfunc():
161 ...: return dostuff()
161 ...: return dostuff()
162
162
163 Now, any time you apply :func:`myfunc`, the task will only run on a machine that has
163 Now, any time you apply :func:`myfunc`, the task will only run on a machine that has
164 numpy and pyzmq available, and when :func:`myfunc` is called, numpy and zmq will be imported.
164 numpy and pyzmq available, and when :func:`myfunc` is called, numpy and zmq will be imported.
165
165
166 @depend
166 @depend
167 *******
167 *******
168
168
169 The ``@depend`` decorator lets you decorate any function with any *other* function to
169 The ``@depend`` decorator lets you decorate any function with any *other* function to
170 evaluate the dependency. The dependency function will be called at the start of the task,
170 evaluate the dependency. The dependency function will be called at the start of the task,
171 and if it returns ``False``, then the dependency will be considered unmet, and the task
171 and if it returns ``False``, then the dependency will be considered unmet, and the task
172 will be assigned to another engine. If the dependency returns *anything other than
172 will be assigned to another engine. If the dependency returns *anything other than
173 ``False``*, the rest of the task will continue.
173 ``False``*, the rest of the task will continue.
174
174
175 .. sourcecode:: ipython
175 .. sourcecode:: ipython
176
176
177 In [10]: def platform_specific(plat):
177 In [10]: def platform_specific(plat):
178 ...: import sys
178 ...: import sys
179 ...: return sys.platform == plat
179 ...: return sys.platform == plat
180
180
181 In [11]: @depend(platform_specific, 'darwin')
181 In [11]: @depend(platform_specific, 'darwin')
182 ...: def mactask():
182 ...: def mactask():
183 ...: do_mac_stuff()
183 ...: do_mac_stuff()
184
184
185 In [12]: @depend(platform_specific, 'nt')
185 In [12]: @depend(platform_specific, 'nt')
186 ...: def wintask():
186 ...: def wintask():
187 ...: do_windows_stuff()
187 ...: do_windows_stuff()
188
188
189 In this case, any time you apply ``mytask``, it will only run on an OSX machine.
189 In this case, any time you apply ``mytask``, it will only run on an OSX machine.
190 ``@depend`` is just like ``apply``, in that it has a ``@depend(f,*args,**kwargs)``
190 ``@depend`` is just like ``apply``, in that it has a ``@depend(f,*args,**kwargs)``
191 signature.
191 signature.
192
192
193 dependents
193 dependents
194 **********
194 **********
195
195
196 You don't have to use the decorators on your tasks, if for instance you may want
196 You don't have to use the decorators on your tasks, if for instance you may want
197 to run tasks with a single function but varying dependencies, you can directly construct
197 to run tasks with a single function but varying dependencies, you can directly construct
198 the :class:`dependent` object that the decorators use:
198 the :class:`dependent` object that the decorators use:
199
199
200 .. sourcecode::ipython
200 .. sourcecode::ipython
201
201
202 In [13]: def mytask(*args):
202 In [13]: def mytask(*args):
203 ...: dostuff()
203 ...: dostuff()
204
204
205 In [14]: mactask = dependent(mytask, platform_specific, 'darwin')
205 In [14]: mactask = dependent(mytask, platform_specific, 'darwin')
206 # this is the same as decorating the declaration of mytask with @depend
206 # this is the same as decorating the declaration of mytask with @depend
207 # but you can do it again:
207 # but you can do it again:
208
208
209 In [15]: wintask = dependent(mytask, platform_specific, 'nt')
209 In [15]: wintask = dependent(mytask, platform_specific, 'nt')
210
210
211 # in general:
211 # in general:
212 In [16]: t = dependent(f, g, *dargs, **dkwargs)
212 In [16]: t = dependent(f, g, *dargs, **dkwargs)
213
213
214 # is equivalent to:
214 # is equivalent to:
215 In [17]: @depend(g, *dargs, **dkwargs)
215 In [17]: @depend(g, *dargs, **dkwargs)
216 ...: def t(a,b,c):
216 ...: def t(a,b,c):
217 ...: # contents of f
217 ...: # contents of f
218
218
219 Graph Dependencies
219 Graph Dependencies
220 ------------------
220 ------------------
221
221
222 Sometimes you want to restrict the time and/or location to run a given task as a function
222 Sometimes you want to restrict the time and/or location to run a given task as a function
223 of the time and/or location of other tasks. This is implemented via a subclass of
223 of the time and/or location of other tasks. This is implemented via a subclass of
224 :class:`set`, called a :class:`Dependency`. A Dependency is just a set of `msg_ids`
224 :class:`set`, called a :class:`Dependency`. A Dependency is just a set of `msg_ids`
225 corresponding to tasks, and a few attributes to guide how to decide when the Dependency
225 corresponding to tasks, and a few attributes to guide how to decide when the Dependency
226 has been met.
226 has been met.
227
227
228 The switches we provide for interpreting whether a given dependency set has been met:
228 The switches we provide for interpreting whether a given dependency set has been met:
229
229
230 any|all
230 any|all
231 Whether the dependency is considered met if *any* of the dependencies are done, or
231 Whether the dependency is considered met if *any* of the dependencies are done, or
232 only after *all* of them have finished. This is set by a Dependency's :attr:`all`
232 only after *all* of them have finished. This is set by a Dependency's :attr:`all`
233 boolean attribute, which defaults to ``True``.
233 boolean attribute, which defaults to ``True``.
234
234
235 success [default: True]
235 success [default: True]
236 Whether to consider tasks that succeeded as fulfilling dependencies.
236 Whether to consider tasks that succeeded as fulfilling dependencies.
237
237
238 failure [default : False]
238 failure [default : False]
239 Whether to consider tasks that failed as fulfilling dependencies.
239 Whether to consider tasks that failed as fulfilling dependencies.
240 using `failure=True,success=False` is useful for setting up cleanup tasks, to be run
240 using `failure=True,success=False` is useful for setting up cleanup tasks, to be run
241 only when tasks have failed.
241 only when tasks have failed.
242
242
243 Sometimes you want to run a task after another, but only if that task succeeded. In this case,
243 Sometimes you want to run a task after another, but only if that task succeeded. In this case,
244 ``success`` should be ``True`` and ``failure`` should be ``False``. However sometimes you may
244 ``success`` should be ``True`` and ``failure`` should be ``False``. However sometimes you may
245 not care whether the task succeeds, and always want the second task to run, in which case you
245 not care whether the task succeeds, and always want the second task to run, in which case you
246 should use `success=failure=True`. The default behavior is to only use successes.
246 should use `success=failure=True`. The default behavior is to only use successes.
247
247
248 There are other switches for interpretation that are made at the *task* level. These are
248 There are other switches for interpretation that are made at the *task* level. These are
249 specified via keyword arguments to the client's :meth:`apply` method.
249 specified via keyword arguments to the client's :meth:`apply` method.
250
250
251 after,follow
251 after,follow
252 You may want to run a task *after* a given set of dependencies have been run and/or
252 You may want to run a task *after* a given set of dependencies have been run and/or
253 run it *where* another set of dependencies are met. To support this, every task has an
253 run it *where* another set of dependencies are met. To support this, every task has an
254 `after` dependency to restrict time, and a `follow` dependency to restrict
254 `after` dependency to restrict time, and a `follow` dependency to restrict
255 destination.
255 destination.
256
256
257 timeout
257 timeout
258 You may also want to set a time-limit for how long the scheduler should wait before a
258 You may also want to set a time-limit for how long the scheduler should wait before a
259 task's dependencies are met. This is done via a `timeout`, which defaults to 0, which
259 task's dependencies are met. This is done via a `timeout`, which defaults to 0, which
260 indicates that the task should never timeout. If the timeout is reached, and the
260 indicates that the task should never timeout. If the timeout is reached, and the
261 scheduler still hasn't been able to assign the task to an engine, the task will fail
261 scheduler still hasn't been able to assign the task to an engine, the task will fail
262 with a :class:`DependencyTimeout`.
262 with a :class:`DependencyTimeout`.
263
263
264 .. note::
264 .. note::
265
265
266 Dependencies only work within the task scheduler. You cannot instruct a load-balanced
266 Dependencies only work within the task scheduler. You cannot instruct a load-balanced
267 task to run after a job submitted via the MUX interface.
267 task to run after a job submitted via the MUX interface.
268
268
269 The simplest form of Dependencies is with `all=True,success=True,failure=False`. In these cases,
269 The simplest form of Dependencies is with `all=True,success=True,failure=False`. In these cases,
270 you can skip using Dependency objects, and just pass msg_ids or AsyncResult objects as the
270 you can skip using Dependency objects, and just pass msg_ids or AsyncResult objects as the
271 `follow` and `after` keywords to :meth:`client.apply`:
271 `follow` and `after` keywords to :meth:`client.apply`:
272
272
273 .. sourcecode:: ipython
273 .. sourcecode:: ipython
274
274
275 In [14]: client.block=False
275 In [14]: client.block=False
276
276
277 In [15]: ar = lview.apply(f, args, kwargs)
277 In [15]: ar = lview.apply(f, args, kwargs)
278
278
279 In [16]: ar2 = lview.apply(f2)
279 In [16]: ar2 = lview.apply(f2)
280
280
281 In [17]: ar3 = lview.apply_with_flags(f3, after=[ar,ar2])
281 In [17]: ar3 = lview.apply_with_flags(f3, after=[ar,ar2])
282
282
283 In [17]: ar4 = lview.apply_with_flags(f3, follow=[ar], timeout=2.5)
283 In [17]: ar4 = lview.apply_with_flags(f3, follow=[ar], timeout=2.5)
284
284
285
285
286 .. seealso::
286 .. seealso::
287
287
288 Some parallel workloads can be described as a `Directed Acyclic Graph
288 Some parallel workloads can be described as a `Directed Acyclic Graph
289 <http://en.wikipedia.org/wiki/Directed_acyclic_graph>`_, or DAG. See :ref:`DAG
289 <http://en.wikipedia.org/wiki/Directed_acyclic_graph>`_, or DAG. See :ref:`DAG
290 Dependencies <dag_dependencies>` for an example demonstrating how to use map a NetworkX DAG
290 Dependencies <dag_dependencies>` for an example demonstrating how to use map a NetworkX DAG
291 onto task dependencies.
291 onto task dependencies.
292
292
293
293
294
294
295
295 Impossible Dependencies
296 Impossible Dependencies
296 ***********************
297 ***********************
297
298
298 The schedulers do perform some analysis on graph dependencies to determine whether they
299 The schedulers do perform some analysis on graph dependencies to determine whether they
299 are not possible to be met. If the scheduler does discover that a dependency cannot be
300 are not possible to be met. If the scheduler does discover that a dependency cannot be
300 met, then the task will fail with an :class:`ImpossibleDependency` error. This way, if the
301 met, then the task will fail with an :class:`ImpossibleDependency` error. This way, if the
301 scheduler realized that a task can never be run, it won't sit indefinitely in the
302 scheduler realized that a task can never be run, it won't sit indefinitely in the
302 scheduler clogging the pipeline.
303 scheduler clogging the pipeline.
303
304
304 The basic cases that are checked:
305 The basic cases that are checked:
305
306
306 * depending on nonexistent messages
307 * depending on nonexistent messages
307 * `follow` dependencies were run on more than one machine and `all=True`
308 * `follow` dependencies were run on more than one machine and `all=True`
308 * any dependencies failed and `all=True,success=True,failures=False`
309 * any dependencies failed and `all=True,success=True,failures=False`
309 * all dependencies failed and `all=False,success=True,failure=False`
310 * all dependencies failed and `all=False,success=True,failure=False`
310
311
311 .. warning::
312 .. warning::
312
313
313 This analysis has not been proven to be rigorous, so it is likely possible for tasks
314 This analysis has not been proven to be rigorous, so it is likely possible for tasks
314 to become impossible to run in obscure situations, so a timeout may be a good choice.
315 to become impossible to run in obscure situations, so a timeout may be a good choice.
315
316
317
318 Retries and Resubmit
319 ====================
320
321 Retries
322 -------
323
324 Another flag for tasks is `retries`. This is an integer, specifying how many times
325 a task should be resubmitted after failure. This is useful for tasks that should still run
326 if their engine was shutdown, or may have some statistical chance of failing. The default
327 is to not retry tasks.
328
329 Resubmit
330 --------
331
332 Sometimes you may want to re-run a task. This could be because it failed for some reason, and
333 you have fixed the error, or because you want to restore the cluster to an interrupted state.
334 For this, the :class:`Client` has a :meth:`rc.resubmit` method. This simply takes one or more
335 msg_ids, and returns an :class:`AsyncHubResult` for the result(s). You cannot resubmit
336 a task that is pending - only those that have finished, either successful or unsuccessful.
337
316 .. _parallel_schedulers:
338 .. _parallel_schedulers:
317
339
318 Schedulers
340 Schedulers
319 ==========
341 ==========
320
342
321 There are a variety of valid ways to determine where jobs should be assigned in a
343 There are a variety of valid ways to determine where jobs should be assigned in a
322 load-balancing situation. In IPython, we support several standard schemes, and
344 load-balancing situation. In IPython, we support several standard schemes, and
323 even make it easy to define your own. The scheme can be selected via the ``--scheme``
345 even make it easy to define your own. The scheme can be selected via the ``--scheme``
324 argument to :command:`ipcontroller`, or in the :attr:`HubFactory.scheme` attribute
346 argument to :command:`ipcontroller`, or in the :attr:`HubFactory.scheme` attribute
325 of a controller config object.
347 of a controller config object.
326
348
327 The built-in routing schemes:
349 The built-in routing schemes:
328
350
329 To select one of these schemes, simply do::
351 To select one of these schemes, simply do::
330
352
331 $ ipcontroller --scheme <schemename>
353 $ ipcontroller --scheme <schemename>
332 for instance:
354 for instance:
333 $ ipcontroller --scheme lru
355 $ ipcontroller --scheme lru
334
356
335 lru: Least Recently Used
357 lru: Least Recently Used
336
358
337 Always assign work to the least-recently-used engine. A close relative of
359 Always assign work to the least-recently-used engine. A close relative of
338 round-robin, it will be fair with respect to the number of tasks, agnostic
360 round-robin, it will be fair with respect to the number of tasks, agnostic
339 with respect to runtime of each task.
361 with respect to runtime of each task.
340
362
341 plainrandom: Plain Random
363 plainrandom: Plain Random
342
364
343 Randomly picks an engine on which to run.
365 Randomly picks an engine on which to run.
344
366
345 twobin: Two-Bin Random
367 twobin: Two-Bin Random
346
368
347 **Requires numpy**
369 **Requires numpy**
348
370
349 Pick two engines at random, and use the LRU of the two. This is known to be better
371 Pick two engines at random, and use the LRU of the two. This is known to be better
350 than plain random in many cases, but requires a small amount of computation.
372 than plain random in many cases, but requires a small amount of computation.
351
373
352 leastload: Least Load
374 leastload: Least Load
353
375
354 **This is the default scheme**
376 **This is the default scheme**
355
377
356 Always assign tasks to the engine with the fewest outstanding tasks (LRU breaks tie).
378 Always assign tasks to the engine with the fewest outstanding tasks (LRU breaks tie).
357
379
358 weighted: Weighted Two-Bin Random
380 weighted: Weighted Two-Bin Random
359
381
360 **Requires numpy**
382 **Requires numpy**
361
383
362 Pick two engines at random using the number of outstanding tasks as inverse weights,
384 Pick two engines at random using the number of outstanding tasks as inverse weights,
363 and use the one with the lower load.
385 and use the one with the lower load.
364
386
365
387
366 Pure ZMQ Scheduler
388 Pure ZMQ Scheduler
367 ------------------
389 ------------------
368
390
369 For maximum throughput, the 'pure' scheme is not Python at all, but a C-level
391 For maximum throughput, the 'pure' scheme is not Python at all, but a C-level
370 :class:`MonitoredQueue` from PyZMQ, which uses a ZeroMQ ``XREQ`` socket to perform all
392 :class:`MonitoredQueue` from PyZMQ, which uses a ZeroMQ ``XREQ`` socket to perform all
371 load-balancing. This scheduler does not support any of the advanced features of the Python
393 load-balancing. This scheduler does not support any of the advanced features of the Python
372 :class:`.Scheduler`.
394 :class:`.Scheduler`.
373
395
374 Disabled features when using the ZMQ Scheduler:
396 Disabled features when using the ZMQ Scheduler:
375
397
376 * Engine unregistration
398 * Engine unregistration
377 Task farming will be disabled if an engine unregisters.
399 Task farming will be disabled if an engine unregisters.
378 Further, if an engine is unregistered during computation, the scheduler may not recover.
400 Further, if an engine is unregistered during computation, the scheduler may not recover.
379 * Dependencies
401 * Dependencies
380 Since there is no Python logic inside the Scheduler, routing decisions cannot be made
402 Since there is no Python logic inside the Scheduler, routing decisions cannot be made
381 based on message content.
403 based on message content.
382 * Early destination notification
404 * Early destination notification
383 The Python schedulers know which engine gets which task, and notify the Hub. This
405 The Python schedulers know which engine gets which task, and notify the Hub. This
384 allows graceful handling of Engines coming and going. There is no way to know
406 allows graceful handling of Engines coming and going. There is no way to know
385 where ZeroMQ messages have gone, so there is no way to know what tasks are on which
407 where ZeroMQ messages have gone, so there is no way to know what tasks are on which
386 engine until they *finish*. This makes recovery from engine shutdown very difficult.
408 engine until they *finish*. This makes recovery from engine shutdown very difficult.
387
409
388
410
389 .. note::
411 .. note::
390
412
391 TODO: performance comparisons
413 TODO: performance comparisons
392
414
393
415
416
417
394 More details
418 More details
395 ============
419 ============
396
420
397 The :class:`LoadBalancedView` has many more powerful features that allow quite a bit
421 The :class:`LoadBalancedView` has many more powerful features that allow quite a bit
398 of flexibility in how tasks are defined and run. The next places to look are
422 of flexibility in how tasks are defined and run. The next places to look are
399 in the following classes:
423 in the following classes:
400
424
401 * :class:`~IPython.parallel.client.view.LoadBalancedView`
425 * :class:`~IPython.parallel.client.view.LoadBalancedView`
402 * :class:`~IPython.parallel.client.asyncresult.AsyncResult`
426 * :class:`~IPython.parallel.client.asyncresult.AsyncResult`
403 * :meth:`~IPython.parallel.client.view.LoadBalancedView.apply`
427 * :meth:`~IPython.parallel.client.view.LoadBalancedView.apply`
404 * :mod:`~IPython.parallel.controller.dependency`
428 * :mod:`~IPython.parallel.controller.dependency`
405
429
406 The following is an overview of how to use these classes together:
430 The following is an overview of how to use these classes together:
407
431
408 1. Create a :class:`Client` and :class:`LoadBalancedView`
432 1. Create a :class:`Client` and :class:`LoadBalancedView`
409 2. Define some functions to be run as tasks
433 2. Define some functions to be run as tasks
410 3. Submit your tasks to using the :meth:`apply` method of your
434 3. Submit your tasks to using the :meth:`apply` method of your
411 :class:`LoadBalancedView` instance.
435 :class:`LoadBalancedView` instance.
412 4. Use :meth:`Client.get_result` to get the results of the
436 4. Use :meth:`Client.get_result` to get the results of the
413 tasks, or use the :meth:`AsyncResult.get` method of the results to wait
437 tasks, or use the :meth:`AsyncResult.get` method of the results to wait
414 for and then receive the results.
438 for and then receive the results.
415
439
416 .. seealso::
440 .. seealso::
417
441
418 A demo of :ref:`DAG Dependencies <dag_dependencies>` with NetworkX and IPython.
442 A demo of :ref:`DAG Dependencies <dag_dependencies>` with NetworkX and IPython.
General Comments 0
You need to be logged in to leave comments. Login now