##// END OF EJS Templates
add db,resubmit/retries docs
MinRK -
Show More
@@ -0,0 +1,114
1 .. _parallel_db:
2
3 =======================
4 IPython's Task Database
5 =======================
6
7 The IPython Hub stores all task requests and results in a database. Currently supported backends
8 are: MongoDB, SQLite (the default), and an in-memory DictDB. The most common use case for
9 this is clients requesting results for tasks they did not submit, via:
10
11 .. sourcecode:: ipython
12
13 In [1]: rc.get_result(task_id)
14
15 However, since we have this DB backend, we provide a direct query method in the :class:`client`
16 for users who want deeper introspection into their task history. The :meth:`db_query` method of
17 the Client is modeled after MongoDB queries, so if you have used MongoDB it should look
18 familiar. In fact, when the MongoDB backend is in use, the query is relayed directly. However,
19 when using other backends, the interface is emulated and only a subset of queries is possible.
20
21 .. seealso::
22
23 MongoDB query docs: http://www.mongodb.org/display/DOCS/Querying
24
25 :meth:`Client.db_query` takes a dictionary query object, with keys from the TaskRecord key list,
26 and values of either exact values to test, or MongoDB queries, which are dicts of The form:
27 ``{'operator' : 'argument(s)'}``. There is also an optional `keys` argument, that specifies
28 which subset of keys should be retrieved. The default is to retrieve all keys excluding the
29 request and result buffers. :meth:`db_query` returns a list of TaskRecord dicts. Also like
30 MongoDB, the `msg_id` key will always be included, whether requested or not.
31
32 TaskRecord keys:
33
34 =============== =============== =============
35 Key Type Description
36 =============== =============== =============
37 msg_id uuid(bytes) The msg ID
38 header dict The request header
39 content dict The request content (likely empty)
40 buffers list(bytes) buffers containing serialized request objects
41 submitted datetime timestamp for time of submission (set by client)
42 client_uuid uuid(bytes) IDENT of client's socket
43 engine_uuid uuid(bytes) IDENT of engine's socket
44 started datetime time task began execution on engine
45 completed datetime time task finished execution (success or failure) on engine
46 resubmitted datetime time of resubmission (if applicable)
47 result_header dict header for result
48 result_content dict content for result
49 result_buffers list(bytes) buffers containing serialized request objects
50 queue bytes The name of the queue for the task ('mux' or 'task')
51 pyin <unused> Python input (unused)
52 pyout <unused> Python output (unused)
53 pyerr <unused> Python traceback (unused)
54 stdout str Stream of stdout data
55 stderr str Stream of stderr data
56
57 =============== =============== =============
58
59 MongoDB operators we emulate on all backends:
60
61 ========== =================
62 Operator Python equivalent
63 ========== =================
64 '$in' in
65 '$nin' not in
66 '$eq' ==
67 '$ne' !=
68 '$ge' >
69 '$gte' >=
70 '$le' <
71 '$lte' <=
72 ========== =================
73
74
75 The DB Query is useful for two primary cases:
76
77 1. deep polling of task status or metadata
78 2. selecting a subset of tasks, on which to perform a later operation (e.g. wait on result, purge records, resubmit,...)
79
80 Example Queries
81 ===============
82
83
84 To get all msg_ids that are not completed, only retrieving their ID and start time:
85
86 .. sourcecode:: ipython
87
88 In [1]: incomplete = rc.db_query({'complete' : None}, keys=['msg_id', 'started'])
89
90 All jobs started in the last hour by me:
91
92 .. sourcecode:: ipython
93
94 In [1]: from datetime import datetime, timedelta
95
96 In [2]: hourago = datetime.now() - timedelta(1./24)
97
98 In [3]: recent = rc.db_query({'started' : {'$gte' : hourago },
99 'client_uuid' : rc.session.session})
100
101 All jobs started more than an hour ago, by clients *other than me*:
102
103 .. sourcecode:: ipython
104
105 In [3]: recent = rc.db_query({'started' : {'$le' : hourago },
106 'client_uuid' : {'$ne' : rc.session.session}})
107
108 Result headers for all jobs on engine 3 or 4:
109
110 .. sourcecode:: ipython
111
112 In [1]: uuids = map(rc._engines.get, (3,4))
113
114 In [2]: hist34 = rc.db_query({'engine_uuid' : {'$in' : uuids }, keys='result_header')
@@ -1,1354 +1,1356
1 1 """A semi-synchronous Client for the ZMQ cluster"""
2 2 #-----------------------------------------------------------------------------
3 3 # Copyright (C) 2010 The IPython Development Team
4 4 #
5 5 # Distributed under the terms of the BSD License. The full license is in
6 6 # the file COPYING, distributed as part of this software.
7 7 #-----------------------------------------------------------------------------
8 8
9 9 #-----------------------------------------------------------------------------
10 10 # Imports
11 11 #-----------------------------------------------------------------------------
12 12
13 13 import os
14 14 import json
15 15 import time
16 16 import warnings
17 17 from datetime import datetime
18 18 from getpass import getpass
19 19 from pprint import pprint
20 20
21 21 pjoin = os.path.join
22 22
23 23 import zmq
24 24 # from zmq.eventloop import ioloop, zmqstream
25 25
26 26 from IPython.utils.path import get_ipython_dir
27 27 from IPython.utils.traitlets import (HasTraits, Int, Instance, CUnicode,
28 28 Dict, List, Bool, Str, Set)
29 29 from IPython.external.decorator import decorator
30 30 from IPython.external.ssh import tunnel
31 31
32 32 from IPython.parallel import error
33 33 from IPython.parallel import streamsession as ss
34 34 from IPython.parallel import util
35 35
36 36 from .asyncresult import AsyncResult, AsyncHubResult
37 37 from IPython.parallel.apps.clusterdir import ClusterDir, ClusterDirError
38 38 from .view import DirectView, LoadBalancedView
39 39
40 40 #--------------------------------------------------------------------------
41 41 # Decorators for Client methods
42 42 #--------------------------------------------------------------------------
43 43
44 44 @decorator
45 45 def spin_first(f, self, *args, **kwargs):
46 46 """Call spin() to sync state prior to calling the method."""
47 47 self.spin()
48 48 return f(self, *args, **kwargs)
49 49
50 50
51 51 #--------------------------------------------------------------------------
52 52 # Classes
53 53 #--------------------------------------------------------------------------
54 54
55 55 class Metadata(dict):
56 56 """Subclass of dict for initializing metadata values.
57 57
58 58 Attribute access works on keys.
59 59
60 60 These objects have a strict set of keys - errors will raise if you try
61 61 to add new keys.
62 62 """
63 63 def __init__(self, *args, **kwargs):
64 64 dict.__init__(self)
65 65 md = {'msg_id' : None,
66 66 'submitted' : None,
67 67 'started' : None,
68 68 'completed' : None,
69 69 'received' : None,
70 70 'engine_uuid' : None,
71 71 'engine_id' : None,
72 72 'follow' : None,
73 73 'after' : None,
74 74 'status' : None,
75 75
76 76 'pyin' : None,
77 77 'pyout' : None,
78 78 'pyerr' : None,
79 79 'stdout' : '',
80 80 'stderr' : '',
81 81 }
82 82 self.update(md)
83 83 self.update(dict(*args, **kwargs))
84 84
85 85 def __getattr__(self, key):
86 86 """getattr aliased to getitem"""
87 87 if key in self.iterkeys():
88 88 return self[key]
89 89 else:
90 90 raise AttributeError(key)
91 91
92 92 def __setattr__(self, key, value):
93 93 """setattr aliased to setitem, with strict"""
94 94 if key in self.iterkeys():
95 95 self[key] = value
96 96 else:
97 97 raise AttributeError(key)
98 98
99 99 def __setitem__(self, key, value):
100 100 """strict static key enforcement"""
101 101 if key in self.iterkeys():
102 102 dict.__setitem__(self, key, value)
103 103 else:
104 104 raise KeyError(key)
105 105
106 106
107 107 class Client(HasTraits):
108 108 """A semi-synchronous client to the IPython ZMQ cluster
109 109
110 110 Parameters
111 111 ----------
112 112
113 113 url_or_file : bytes; zmq url or path to ipcontroller-client.json
114 114 Connection information for the Hub's registration. If a json connector
115 115 file is given, then likely no further configuration is necessary.
116 116 [Default: use profile]
117 117 profile : bytes
118 118 The name of the Cluster profile to be used to find connector information.
119 119 [Default: 'default']
120 120 context : zmq.Context
121 121 Pass an existing zmq.Context instance, otherwise the client will create its own.
122 122 username : bytes
123 123 set username to be passed to the Session object
124 124 debug : bool
125 125 flag for lots of message printing for debug purposes
126 126
127 127 #-------------- ssh related args ----------------
128 128 # These are args for configuring the ssh tunnel to be used
129 129 # credentials are used to forward connections over ssh to the Controller
130 130 # Note that the ip given in `addr` needs to be relative to sshserver
131 131 # The most basic case is to leave addr as pointing to localhost (127.0.0.1),
132 132 # and set sshserver as the same machine the Controller is on. However,
133 133 # the only requirement is that sshserver is able to see the Controller
134 134 # (i.e. is within the same trusted network).
135 135
136 136 sshserver : str
137 137 A string of the form passed to ssh, i.e. 'server.tld' or 'user@server.tld:port'
138 138 If keyfile or password is specified, and this is not, it will default to
139 139 the ip given in addr.
140 140 sshkey : str; path to public ssh key file
141 141 This specifies a key to be used in ssh login, default None.
142 142 Regular default ssh keys will be used without specifying this argument.
143 143 password : str
144 144 Your ssh password to sshserver. Note that if this is left None,
145 145 you will be prompted for it if passwordless key based login is unavailable.
146 146 paramiko : bool
147 147 flag for whether to use paramiko instead of shell ssh for tunneling.
148 148 [default: True on win32, False else]
149 149
150 150 ------- exec authentication args -------
151 151 If even localhost is untrusted, you can have some protection against
152 152 unauthorized execution by using a key. Messages are still sent
153 153 as cleartext, so if someone can snoop your loopback traffic this will
154 154 not help against malicious attacks.
155 155
156 156 exec_key : str
157 157 an authentication key or file containing a key
158 158 default: None
159 159
160 160
161 161 Attributes
162 162 ----------
163 163
164 164 ids : list of int engine IDs
165 165 requesting the ids attribute always synchronizes
166 166 the registration state. To request ids without synchronization,
167 167 use semi-private _ids attributes.
168 168
169 169 history : list of msg_ids
170 170 a list of msg_ids, keeping track of all the execution
171 171 messages you have submitted in order.
172 172
173 173 outstanding : set of msg_ids
174 174 a set of msg_ids that have been submitted, but whose
175 175 results have not yet been received.
176 176
177 177 results : dict
178 178 a dict of all our results, keyed by msg_id
179 179
180 180 block : bool
181 181 determines default behavior when block not specified
182 182 in execution methods
183 183
184 184 Methods
185 185 -------
186 186
187 187 spin
188 188 flushes incoming results and registration state changes
189 189 control methods spin, and requesting `ids` also ensures up to date
190 190
191 191 wait
192 192 wait on one or more msg_ids
193 193
194 194 execution methods
195 195 apply
196 196 legacy: execute, run
197 197
198 198 data movement
199 199 push, pull, scatter, gather
200 200
201 201 query methods
202 202 queue_status, get_result, purge, result_status
203 203
204 204 control methods
205 205 abort, shutdown
206 206
207 207 """
208 208
209 209
210 210 block = Bool(False)
211 211 outstanding = Set()
212 212 results = Instance('collections.defaultdict', (dict,))
213 213 metadata = Instance('collections.defaultdict', (Metadata,))
214 214 history = List()
215 215 debug = Bool(False)
216 216 profile=CUnicode('default')
217 217
218 218 _outstanding_dict = Instance('collections.defaultdict', (set,))
219 219 _ids = List()
220 220 _connected=Bool(False)
221 221 _ssh=Bool(False)
222 222 _context = Instance('zmq.Context')
223 223 _config = Dict()
224 224 _engines=Instance(util.ReverseDict, (), {})
225 225 # _hub_socket=Instance('zmq.Socket')
226 226 _query_socket=Instance('zmq.Socket')
227 227 _control_socket=Instance('zmq.Socket')
228 228 _iopub_socket=Instance('zmq.Socket')
229 229 _notification_socket=Instance('zmq.Socket')
230 230 _mux_socket=Instance('zmq.Socket')
231 231 _task_socket=Instance('zmq.Socket')
232 232 _task_scheme=Str()
233 233 _closed = False
234 234 _ignored_control_replies=Int(0)
235 235 _ignored_hub_replies=Int(0)
236 236
237 237 def __init__(self, url_or_file=None, profile='default', cluster_dir=None, ipython_dir=None,
238 238 context=None, username=None, debug=False, exec_key=None,
239 239 sshserver=None, sshkey=None, password=None, paramiko=None,
240 240 timeout=10
241 241 ):
242 242 super(Client, self).__init__(debug=debug, profile=profile)
243 243 if context is None:
244 244 context = zmq.Context.instance()
245 245 self._context = context
246 246
247 247
248 248 self._setup_cluster_dir(profile, cluster_dir, ipython_dir)
249 249 if self._cd is not None:
250 250 if url_or_file is None:
251 251 url_or_file = pjoin(self._cd.security_dir, 'ipcontroller-client.json')
252 252 assert url_or_file is not None, "I can't find enough information to connect to a hub!"\
253 253 " Please specify at least one of url_or_file or profile."
254 254
255 255 try:
256 256 util.validate_url(url_or_file)
257 257 except AssertionError:
258 258 if not os.path.exists(url_or_file):
259 259 if self._cd:
260 260 url_or_file = os.path.join(self._cd.security_dir, url_or_file)
261 261 assert os.path.exists(url_or_file), "Not a valid connection file or url: %r"%url_or_file
262 262 with open(url_or_file) as f:
263 263 cfg = json.loads(f.read())
264 264 else:
265 265 cfg = {'url':url_or_file}
266 266
267 267 # sync defaults from args, json:
268 268 if sshserver:
269 269 cfg['ssh'] = sshserver
270 270 if exec_key:
271 271 cfg['exec_key'] = exec_key
272 272 exec_key = cfg['exec_key']
273 273 sshserver=cfg['ssh']
274 274 url = cfg['url']
275 275 location = cfg.setdefault('location', None)
276 276 cfg['url'] = util.disambiguate_url(cfg['url'], location)
277 277 url = cfg['url']
278 278
279 279 self._config = cfg
280 280
281 281 self._ssh = bool(sshserver or sshkey or password)
282 282 if self._ssh and sshserver is None:
283 283 # default to ssh via localhost
284 284 sshserver = url.split('://')[1].split(':')[0]
285 285 if self._ssh and password is None:
286 286 if tunnel.try_passwordless_ssh(sshserver, sshkey, paramiko):
287 287 password=False
288 288 else:
289 289 password = getpass("SSH Password for %s: "%sshserver)
290 290 ssh_kwargs = dict(keyfile=sshkey, password=password, paramiko=paramiko)
291 291 if exec_key is not None and os.path.isfile(exec_key):
292 292 arg = 'keyfile'
293 293 else:
294 294 arg = 'key'
295 295 key_arg = {arg:exec_key}
296 296 if username is None:
297 297 self.session = ss.StreamSession(**key_arg)
298 298 else:
299 299 self.session = ss.StreamSession(username, **key_arg)
300 300 self._query_socket = self._context.socket(zmq.XREQ)
301 301 self._query_socket.setsockopt(zmq.IDENTITY, self.session.session)
302 302 if self._ssh:
303 303 tunnel.tunnel_connection(self._query_socket, url, sshserver, **ssh_kwargs)
304 304 else:
305 305 self._query_socket.connect(url)
306 306
307 307 self.session.debug = self.debug
308 308
309 309 self._notification_handlers = {'registration_notification' : self._register_engine,
310 310 'unregistration_notification' : self._unregister_engine,
311 311 'shutdown_notification' : lambda msg: self.close(),
312 312 }
313 313 self._queue_handlers = {'execute_reply' : self._handle_execute_reply,
314 314 'apply_reply' : self._handle_apply_reply}
315 315 self._connect(sshserver, ssh_kwargs, timeout)
316 316
317 317 def __del__(self):
318 318 """cleanup sockets, but _not_ context."""
319 319 self.close()
320 320
321 321 def _setup_cluster_dir(self, profile, cluster_dir, ipython_dir):
322 322 if ipython_dir is None:
323 323 ipython_dir = get_ipython_dir()
324 324 if cluster_dir is not None:
325 325 try:
326 326 self._cd = ClusterDir.find_cluster_dir(cluster_dir)
327 327 return
328 328 except ClusterDirError:
329 329 pass
330 330 elif profile is not None:
331 331 try:
332 332 self._cd = ClusterDir.find_cluster_dir_by_profile(
333 333 ipython_dir, profile)
334 334 return
335 335 except ClusterDirError:
336 336 pass
337 337 self._cd = None
338 338
339 339 def _update_engines(self, engines):
340 340 """Update our engines dict and _ids from a dict of the form: {id:uuid}."""
341 341 for k,v in engines.iteritems():
342 342 eid = int(k)
343 343 self._engines[eid] = bytes(v) # force not unicode
344 344 self._ids.append(eid)
345 345 self._ids = sorted(self._ids)
346 346 if sorted(self._engines.keys()) != range(len(self._engines)) and \
347 347 self._task_scheme == 'pure' and self._task_socket:
348 348 self._stop_scheduling_tasks()
349 349
350 350 def _stop_scheduling_tasks(self):
351 351 """Stop scheduling tasks because an engine has been unregistered
352 352 from a pure ZMQ scheduler.
353 353 """
354 354 self._task_socket.close()
355 355 self._task_socket = None
356 356 msg = "An engine has been unregistered, and we are using pure " +\
357 357 "ZMQ task scheduling. Task farming will be disabled."
358 358 if self.outstanding:
359 359 msg += " If you were running tasks when this happened, " +\
360 360 "some `outstanding` msg_ids may never resolve."
361 361 warnings.warn(msg, RuntimeWarning)
362 362
363 363 def _build_targets(self, targets):
364 364 """Turn valid target IDs or 'all' into two lists:
365 365 (int_ids, uuids).
366 366 """
367 367 if not self._ids:
368 368 # flush notification socket if no engines yet, just in case
369 369 if not self.ids:
370 370 raise error.NoEnginesRegistered("Can't build targets without any engines")
371 371
372 372 if targets is None:
373 373 targets = self._ids
374 374 elif isinstance(targets, str):
375 375 if targets.lower() == 'all':
376 376 targets = self._ids
377 377 else:
378 378 raise TypeError("%r not valid str target, must be 'all'"%(targets))
379 379 elif isinstance(targets, int):
380 380 if targets < 0:
381 381 targets = self.ids[targets]
382 382 if targets not in self._ids:
383 383 raise IndexError("No such engine: %i"%targets)
384 384 targets = [targets]
385 385
386 386 if isinstance(targets, slice):
387 387 indices = range(len(self._ids))[targets]
388 388 ids = self.ids
389 389 targets = [ ids[i] for i in indices ]
390 390
391 391 if not isinstance(targets, (tuple, list, xrange)):
392 392 raise TypeError("targets by int/slice/collection of ints only, not %s"%(type(targets)))
393 393
394 394 return [self._engines[t] for t in targets], list(targets)
395 395
396 396 def _connect(self, sshserver, ssh_kwargs, timeout):
397 397 """setup all our socket connections to the cluster. This is called from
398 398 __init__."""
399 399
400 400 # Maybe allow reconnecting?
401 401 if self._connected:
402 402 return
403 403 self._connected=True
404 404
405 405 def connect_socket(s, url):
406 406 url = util.disambiguate_url(url, self._config['location'])
407 407 if self._ssh:
408 408 return tunnel.tunnel_connection(s, url, sshserver, **ssh_kwargs)
409 409 else:
410 410 return s.connect(url)
411 411
412 412 self.session.send(self._query_socket, 'connection_request')
413 413 r,w,x = zmq.select([self._query_socket],[],[], timeout)
414 414 if not r:
415 415 raise error.TimeoutError("Hub connection request timed out")
416 416 idents,msg = self.session.recv(self._query_socket,mode=0)
417 417 if self.debug:
418 418 pprint(msg)
419 419 msg = ss.Message(msg)
420 420 content = msg.content
421 421 self._config['registration'] = dict(content)
422 422 if content.status == 'ok':
423 423 if content.mux:
424 424 self._mux_socket = self._context.socket(zmq.XREQ)
425 425 self._mux_socket.setsockopt(zmq.IDENTITY, self.session.session)
426 426 connect_socket(self._mux_socket, content.mux)
427 427 if content.task:
428 428 self._task_scheme, task_addr = content.task
429 429 self._task_socket = self._context.socket(zmq.XREQ)
430 430 self._task_socket.setsockopt(zmq.IDENTITY, self.session.session)
431 431 connect_socket(self._task_socket, task_addr)
432 432 if content.notification:
433 433 self._notification_socket = self._context.socket(zmq.SUB)
434 434 connect_socket(self._notification_socket, content.notification)
435 435 self._notification_socket.setsockopt(zmq.SUBSCRIBE, b'')
436 436 # if content.query:
437 437 # self._query_socket = self._context.socket(zmq.XREQ)
438 438 # self._query_socket.setsockopt(zmq.IDENTITY, self.session.session)
439 439 # connect_socket(self._query_socket, content.query)
440 440 if content.control:
441 441 self._control_socket = self._context.socket(zmq.XREQ)
442 442 self._control_socket.setsockopt(zmq.IDENTITY, self.session.session)
443 443 connect_socket(self._control_socket, content.control)
444 444 if content.iopub:
445 445 self._iopub_socket = self._context.socket(zmq.SUB)
446 446 self._iopub_socket.setsockopt(zmq.SUBSCRIBE, b'')
447 447 self._iopub_socket.setsockopt(zmq.IDENTITY, self.session.session)
448 448 connect_socket(self._iopub_socket, content.iopub)
449 449 self._update_engines(dict(content.engines))
450 450 else:
451 451 self._connected = False
452 452 raise Exception("Failed to connect!")
453 453
454 454 #--------------------------------------------------------------------------
455 455 # handlers and callbacks for incoming messages
456 456 #--------------------------------------------------------------------------
457 457
458 458 def _unwrap_exception(self, content):
459 459 """unwrap exception, and remap engine_id to int."""
460 460 e = error.unwrap_exception(content)
461 461 # print e.traceback
462 462 if e.engine_info:
463 463 e_uuid = e.engine_info['engine_uuid']
464 464 eid = self._engines[e_uuid]
465 465 e.engine_info['engine_id'] = eid
466 466 return e
467 467
468 468 def _extract_metadata(self, header, parent, content):
469 469 md = {'msg_id' : parent['msg_id'],
470 470 'received' : datetime.now(),
471 471 'engine_uuid' : header.get('engine', None),
472 472 'follow' : parent.get('follow', []),
473 473 'after' : parent.get('after', []),
474 474 'status' : content['status'],
475 475 }
476 476
477 477 if md['engine_uuid'] is not None:
478 478 md['engine_id'] = self._engines.get(md['engine_uuid'], None)
479 479
480 480 if 'date' in parent:
481 481 md['submitted'] = datetime.strptime(parent['date'], util.ISO8601)
482 482 if 'started' in header:
483 483 md['started'] = datetime.strptime(header['started'], util.ISO8601)
484 484 if 'date' in header:
485 485 md['completed'] = datetime.strptime(header['date'], util.ISO8601)
486 486 return md
487 487
488 488 def _register_engine(self, msg):
489 489 """Register a new engine, and update our connection info."""
490 490 content = msg['content']
491 491 eid = content['id']
492 492 d = {eid : content['queue']}
493 493 self._update_engines(d)
494 494
495 495 def _unregister_engine(self, msg):
496 496 """Unregister an engine that has died."""
497 497 content = msg['content']
498 498 eid = int(content['id'])
499 499 if eid in self._ids:
500 500 self._ids.remove(eid)
501 501 uuid = self._engines.pop(eid)
502 502
503 503 self._handle_stranded_msgs(eid, uuid)
504 504
505 505 if self._task_socket and self._task_scheme == 'pure':
506 506 self._stop_scheduling_tasks()
507 507
508 508 def _handle_stranded_msgs(self, eid, uuid):
509 509 """Handle messages known to be on an engine when the engine unregisters.
510 510
511 511 It is possible that this will fire prematurely - that is, an engine will
512 512 go down after completing a result, and the client will be notified
513 513 of the unregistration and later receive the successful result.
514 514 """
515 515
516 516 outstanding = self._outstanding_dict[uuid]
517 517
518 518 for msg_id in list(outstanding):
519 519 if msg_id in self.results:
520 520 # we already
521 521 continue
522 522 try:
523 523 raise error.EngineError("Engine %r died while running task %r"%(eid, msg_id))
524 524 except:
525 525 content = error.wrap_exception()
526 526 # build a fake message:
527 527 parent = {}
528 528 header = {}
529 529 parent['msg_id'] = msg_id
530 530 header['engine'] = uuid
531 531 header['date'] = datetime.now().strftime(util.ISO8601)
532 532 msg = dict(parent_header=parent, header=header, content=content)
533 533 self._handle_apply_reply(msg)
534 534
535 535 def _handle_execute_reply(self, msg):
536 536 """Save the reply to an execute_request into our results.
537 537
538 538 execute messages are never actually used. apply is used instead.
539 539 """
540 540
541 541 parent = msg['parent_header']
542 542 msg_id = parent['msg_id']
543 543 if msg_id not in self.outstanding:
544 544 if msg_id in self.history:
545 545 print ("got stale result: %s"%msg_id)
546 546 else:
547 547 print ("got unknown result: %s"%msg_id)
548 548 else:
549 549 self.outstanding.remove(msg_id)
550 550 self.results[msg_id] = self._unwrap_exception(msg['content'])
551 551
552 552 def _handle_apply_reply(self, msg):
553 553 """Save the reply to an apply_request into our results."""
554 554 parent = msg['parent_header']
555 555 msg_id = parent['msg_id']
556 556 if msg_id not in self.outstanding:
557 557 if msg_id in self.history:
558 558 print ("got stale result: %s"%msg_id)
559 559 print self.results[msg_id]
560 560 print msg
561 561 else:
562 562 print ("got unknown result: %s"%msg_id)
563 563 else:
564 564 self.outstanding.remove(msg_id)
565 565 content = msg['content']
566 566 header = msg['header']
567 567
568 568 # construct metadata:
569 569 md = self.metadata[msg_id]
570 570 md.update(self._extract_metadata(header, parent, content))
571 571 # is this redundant?
572 572 self.metadata[msg_id] = md
573 573
574 574 e_outstanding = self._outstanding_dict[md['engine_uuid']]
575 575 if msg_id in e_outstanding:
576 576 e_outstanding.remove(msg_id)
577 577
578 578 # construct result:
579 579 if content['status'] == 'ok':
580 580 self.results[msg_id] = util.unserialize_object(msg['buffers'])[0]
581 581 elif content['status'] == 'aborted':
582 582 self.results[msg_id] = error.TaskAborted(msg_id)
583 583 elif content['status'] == 'resubmitted':
584 584 # TODO: handle resubmission
585 585 pass
586 586 else:
587 587 self.results[msg_id] = self._unwrap_exception(content)
588 588
589 589 def _flush_notifications(self):
590 590 """Flush notifications of engine registrations waiting
591 591 in ZMQ queue."""
592 592 msg = self.session.recv(self._notification_socket, mode=zmq.NOBLOCK)
593 593 while msg is not None:
594 594 if self.debug:
595 595 pprint(msg)
596 596 msg = msg[-1]
597 597 msg_type = msg['msg_type']
598 598 handler = self._notification_handlers.get(msg_type, None)
599 599 if handler is None:
600 600 raise Exception("Unhandled message type: %s"%msg.msg_type)
601 601 else:
602 602 handler(msg)
603 603 msg = self.session.recv(self._notification_socket, mode=zmq.NOBLOCK)
604 604
605 605 def _flush_results(self, sock):
606 606 """Flush task or queue results waiting in ZMQ queue."""
607 607 msg = self.session.recv(sock, mode=zmq.NOBLOCK)
608 608 while msg is not None:
609 609 if self.debug:
610 610 pprint(msg)
611 611 msg = msg[-1]
612 612 msg_type = msg['msg_type']
613 613 handler = self._queue_handlers.get(msg_type, None)
614 614 if handler is None:
615 615 raise Exception("Unhandled message type: %s"%msg.msg_type)
616 616 else:
617 617 handler(msg)
618 618 msg = self.session.recv(sock, mode=zmq.NOBLOCK)
619 619
620 620 def _flush_control(self, sock):
621 621 """Flush replies from the control channel waiting
622 622 in the ZMQ queue.
623 623
624 624 Currently: ignore them."""
625 625 if self._ignored_control_replies <= 0:
626 626 return
627 627 msg = self.session.recv(sock, mode=zmq.NOBLOCK)
628 628 while msg is not None:
629 629 self._ignored_control_replies -= 1
630 630 if self.debug:
631 631 pprint(msg)
632 632 msg = self.session.recv(sock, mode=zmq.NOBLOCK)
633 633
634 634 def _flush_ignored_control(self):
635 635 """flush ignored control replies"""
636 636 while self._ignored_control_replies > 0:
637 637 self.session.recv(self._control_socket)
638 638 self._ignored_control_replies -= 1
639 639
640 640 def _flush_ignored_hub_replies(self):
641 641 msg = self.session.recv(self._query_socket, mode=zmq.NOBLOCK)
642 642 while msg is not None:
643 643 msg = self.session.recv(self._query_socket, mode=zmq.NOBLOCK)
644 644
645 645 def _flush_iopub(self, sock):
646 646 """Flush replies from the iopub channel waiting
647 647 in the ZMQ queue.
648 648 """
649 649 msg = self.session.recv(sock, mode=zmq.NOBLOCK)
650 650 while msg is not None:
651 651 if self.debug:
652 652 pprint(msg)
653 653 msg = msg[-1]
654 654 parent = msg['parent_header']
655 655 msg_id = parent['msg_id']
656 656 content = msg['content']
657 657 header = msg['header']
658 658 msg_type = msg['msg_type']
659 659
660 660 # init metadata:
661 661 md = self.metadata[msg_id]
662 662
663 663 if msg_type == 'stream':
664 664 name = content['name']
665 665 s = md[name] or ''
666 666 md[name] = s + content['data']
667 667 elif msg_type == 'pyerr':
668 668 md.update({'pyerr' : self._unwrap_exception(content)})
669 669 elif msg_type == 'pyin':
670 670 md.update({'pyin' : content['code']})
671 671 else:
672 672 md.update({msg_type : content.get('data', '')})
673 673
674 674 # reduntant?
675 675 self.metadata[msg_id] = md
676 676
677 677 msg = self.session.recv(sock, mode=zmq.NOBLOCK)
678 678
679 679 #--------------------------------------------------------------------------
680 680 # len, getitem
681 681 #--------------------------------------------------------------------------
682 682
683 683 def __len__(self):
684 684 """len(client) returns # of engines."""
685 685 return len(self.ids)
686 686
687 687 def __getitem__(self, key):
688 688 """index access returns DirectView multiplexer objects
689 689
690 690 Must be int, slice, or list/tuple/xrange of ints"""
691 691 if not isinstance(key, (int, slice, tuple, list, xrange)):
692 692 raise TypeError("key by int/slice/iterable of ints only, not %s"%(type(key)))
693 693 else:
694 694 return self.direct_view(key)
695 695
696 696 #--------------------------------------------------------------------------
697 697 # Begin public methods
698 698 #--------------------------------------------------------------------------
699 699
700 700 @property
701 701 def ids(self):
702 702 """Always up-to-date ids property."""
703 703 self._flush_notifications()
704 704 # always copy:
705 705 return list(self._ids)
706 706
707 707 def close(self):
708 708 if self._closed:
709 709 return
710 710 snames = filter(lambda n: n.endswith('socket'), dir(self))
711 711 for socket in map(lambda name: getattr(self, name), snames):
712 712 if isinstance(socket, zmq.Socket) and not socket.closed:
713 713 socket.close()
714 714 self._closed = True
715 715
716 716 def spin(self):
717 717 """Flush any registration notifications and execution results
718 718 waiting in the ZMQ queue.
719 719 """
720 720 if self._notification_socket:
721 721 self._flush_notifications()
722 722 if self._mux_socket:
723 723 self._flush_results(self._mux_socket)
724 724 if self._task_socket:
725 725 self._flush_results(self._task_socket)
726 726 if self._control_socket:
727 727 self._flush_control(self._control_socket)
728 728 if self._iopub_socket:
729 729 self._flush_iopub(self._iopub_socket)
730 730 if self._query_socket:
731 731 self._flush_ignored_hub_replies()
732 732
733 733 def wait(self, jobs=None, timeout=-1):
734 734 """waits on one or more `jobs`, for up to `timeout` seconds.
735 735
736 736 Parameters
737 737 ----------
738 738
739 739 jobs : int, str, or list of ints and/or strs, or one or more AsyncResult objects
740 740 ints are indices to self.history
741 741 strs are msg_ids
742 742 default: wait on all outstanding messages
743 743 timeout : float
744 744 a time in seconds, after which to give up.
745 745 default is -1, which means no timeout
746 746
747 747 Returns
748 748 -------
749 749
750 750 True : when all msg_ids are done
751 751 False : timeout reached, some msg_ids still outstanding
752 752 """
753 753 tic = time.time()
754 754 if jobs is None:
755 755 theids = self.outstanding
756 756 else:
757 757 if isinstance(jobs, (int, str, AsyncResult)):
758 758 jobs = [jobs]
759 759 theids = set()
760 760 for job in jobs:
761 761 if isinstance(job, int):
762 762 # index access
763 763 job = self.history[job]
764 764 elif isinstance(job, AsyncResult):
765 765 map(theids.add, job.msg_ids)
766 766 continue
767 767 theids.add(job)
768 768 if not theids.intersection(self.outstanding):
769 769 return True
770 770 self.spin()
771 771 while theids.intersection(self.outstanding):
772 772 if timeout >= 0 and ( time.time()-tic ) > timeout:
773 773 break
774 774 time.sleep(1e-3)
775 775 self.spin()
776 776 return len(theids.intersection(self.outstanding)) == 0
777 777
778 778 #--------------------------------------------------------------------------
779 779 # Control methods
780 780 #--------------------------------------------------------------------------
781 781
782 782 @spin_first
783 783 def clear(self, targets=None, block=None):
784 784 """Clear the namespace in target(s)."""
785 785 block = self.block if block is None else block
786 786 targets = self._build_targets(targets)[0]
787 787 for t in targets:
788 788 self.session.send(self._control_socket, 'clear_request', content={}, ident=t)
789 789 error = False
790 790 if block:
791 791 self._flush_ignored_control()
792 792 for i in range(len(targets)):
793 793 idents,msg = self.session.recv(self._control_socket,0)
794 794 if self.debug:
795 795 pprint(msg)
796 796 if msg['content']['status'] != 'ok':
797 797 error = self._unwrap_exception(msg['content'])
798 798 else:
799 799 self._ignored_control_replies += len(targets)
800 800 if error:
801 801 raise error
802 802
803 803
804 804 @spin_first
805 805 def abort(self, jobs=None, targets=None, block=None):
806 806 """Abort specific jobs from the execution queues of target(s).
807 807
808 808 This is a mechanism to prevent jobs that have already been submitted
809 809 from executing.
810 810
811 811 Parameters
812 812 ----------
813 813
814 814 jobs : msg_id, list of msg_ids, or AsyncResult
815 815 The jobs to be aborted
816 816
817 817
818 818 """
819 819 block = self.block if block is None else block
820 820 targets = self._build_targets(targets)[0]
821 821 msg_ids = []
822 822 if isinstance(jobs, (basestring,AsyncResult)):
823 823 jobs = [jobs]
824 824 bad_ids = filter(lambda obj: not isinstance(obj, (basestring, AsyncResult)), jobs)
825 825 if bad_ids:
826 826 raise TypeError("Invalid msg_id type %r, expected str or AsyncResult"%bad_ids[0])
827 827 for j in jobs:
828 828 if isinstance(j, AsyncResult):
829 829 msg_ids.extend(j.msg_ids)
830 830 else:
831 831 msg_ids.append(j)
832 832 content = dict(msg_ids=msg_ids)
833 833 for t in targets:
834 834 self.session.send(self._control_socket, 'abort_request',
835 835 content=content, ident=t)
836 836 error = False
837 837 if block:
838 838 self._flush_ignored_control()
839 839 for i in range(len(targets)):
840 840 idents,msg = self.session.recv(self._control_socket,0)
841 841 if self.debug:
842 842 pprint(msg)
843 843 if msg['content']['status'] != 'ok':
844 844 error = self._unwrap_exception(msg['content'])
845 845 else:
846 846 self._ignored_control_replies += len(targets)
847 847 if error:
848 848 raise error
849 849
850 850 @spin_first
851 851 def shutdown(self, targets=None, restart=False, hub=False, block=None):
852 852 """Terminates one or more engine processes, optionally including the hub."""
853 853 block = self.block if block is None else block
854 854 if hub:
855 855 targets = 'all'
856 856 targets = self._build_targets(targets)[0]
857 857 for t in targets:
858 858 self.session.send(self._control_socket, 'shutdown_request',
859 859 content={'restart':restart},ident=t)
860 860 error = False
861 861 if block or hub:
862 862 self._flush_ignored_control()
863 863 for i in range(len(targets)):
864 864 idents,msg = self.session.recv(self._control_socket, 0)
865 865 if self.debug:
866 866 pprint(msg)
867 867 if msg['content']['status'] != 'ok':
868 868 error = self._unwrap_exception(msg['content'])
869 869 else:
870 870 self._ignored_control_replies += len(targets)
871 871
872 872 if hub:
873 873 time.sleep(0.25)
874 874 self.session.send(self._query_socket, 'shutdown_request')
875 875 idents,msg = self.session.recv(self._query_socket, 0)
876 876 if self.debug:
877 877 pprint(msg)
878 878 if msg['content']['status'] != 'ok':
879 879 error = self._unwrap_exception(msg['content'])
880 880
881 881 if error:
882 882 raise error
883 883
884 884 #--------------------------------------------------------------------------
885 885 # Execution related methods
886 886 #--------------------------------------------------------------------------
887 887
888 888 def _maybe_raise(self, result):
889 889 """wrapper for maybe raising an exception if apply failed."""
890 890 if isinstance(result, error.RemoteError):
891 891 raise result
892 892
893 893 return result
894 894
895 895 def send_apply_message(self, socket, f, args=None, kwargs=None, subheader=None, track=False,
896 896 ident=None):
897 897 """construct and send an apply message via a socket.
898 898
899 899 This is the principal method with which all engine execution is performed by views.
900 900 """
901 901
902 902 assert not self._closed, "cannot use me anymore, I'm closed!"
903 903 # defaults:
904 904 args = args if args is not None else []
905 905 kwargs = kwargs if kwargs is not None else {}
906 906 subheader = subheader if subheader is not None else {}
907 907
908 908 # validate arguments
909 909 if not callable(f):
910 910 raise TypeError("f must be callable, not %s"%type(f))
911 911 if not isinstance(args, (tuple, list)):
912 912 raise TypeError("args must be tuple or list, not %s"%type(args))
913 913 if not isinstance(kwargs, dict):
914 914 raise TypeError("kwargs must be dict, not %s"%type(kwargs))
915 915 if not isinstance(subheader, dict):
916 916 raise TypeError("subheader must be dict, not %s"%type(subheader))
917 917
918 918 bufs = util.pack_apply_message(f,args,kwargs)
919 919
920 920 msg = self.session.send(socket, "apply_request", buffers=bufs, ident=ident,
921 921 subheader=subheader, track=track)
922 922
923 923 msg_id = msg['msg_id']
924 924 self.outstanding.add(msg_id)
925 925 if ident:
926 926 # possibly routed to a specific engine
927 927 if isinstance(ident, list):
928 928 ident = ident[-1]
929 929 if ident in self._engines.values():
930 930 # save for later, in case of engine death
931 931 self._outstanding_dict[ident].add(msg_id)
932 932 self.history.append(msg_id)
933 933 self.metadata[msg_id]['submitted'] = datetime.now()
934 934
935 935 return msg
936 936
937 937 #--------------------------------------------------------------------------
938 938 # construct a View object
939 939 #--------------------------------------------------------------------------
940 940
941 941 def load_balanced_view(self, targets=None):
942 942 """construct a DirectView object.
943 943
944 944 If no arguments are specified, create a LoadBalancedView
945 945 using all engines.
946 946
947 947 Parameters
948 948 ----------
949 949
950 950 targets: list,slice,int,etc. [default: use all engines]
951 951 The subset of engines across which to load-balance
952 952 """
953 953 if targets is not None:
954 954 targets = self._build_targets(targets)[1]
955 955 return LoadBalancedView(client=self, socket=self._task_socket, targets=targets)
956 956
957 957 def direct_view(self, targets='all'):
958 958 """construct a DirectView object.
959 959
960 960 If no targets are specified, create a DirectView
961 961 using all engines.
962 962
963 963 Parameters
964 964 ----------
965 965
966 966 targets: list,slice,int,etc. [default: use all engines]
967 967 The engines to use for the View
968 968 """
969 969 single = isinstance(targets, int)
970 970 targets = self._build_targets(targets)[1]
971 971 if single:
972 972 targets = targets[0]
973 973 return DirectView(client=self, socket=self._mux_socket, targets=targets)
974 974
975 975 #--------------------------------------------------------------------------
976 976 # Query methods
977 977 #--------------------------------------------------------------------------
978 978
979 979 @spin_first
980 980 def get_result(self, indices_or_msg_ids=None, block=None):
981 981 """Retrieve a result by msg_id or history index, wrapped in an AsyncResult object.
982 982
983 983 If the client already has the results, no request to the Hub will be made.
984 984
985 985 This is a convenient way to construct AsyncResult objects, which are wrappers
986 986 that include metadata about execution, and allow for awaiting results that
987 987 were not submitted by this Client.
988 988
989 989 It can also be a convenient way to retrieve the metadata associated with
990 990 blocking execution, since it always retrieves
991 991
992 992 Examples
993 993 --------
994 994 ::
995 995
996 996 In [10]: r = client.apply()
997 997
998 998 Parameters
999 999 ----------
1000 1000
1001 1001 indices_or_msg_ids : integer history index, str msg_id, or list of either
1002 1002 The indices or msg_ids of indices to be retrieved
1003 1003
1004 1004 block : bool
1005 1005 Whether to wait for the result to be done
1006 1006
1007 1007 Returns
1008 1008 -------
1009 1009
1010 1010 AsyncResult
1011 1011 A single AsyncResult object will always be returned.
1012 1012
1013 1013 AsyncHubResult
1014 1014 A subclass of AsyncResult that retrieves results from the Hub
1015 1015
1016 1016 """
1017 1017 block = self.block if block is None else block
1018 1018 if indices_or_msg_ids is None:
1019 1019 indices_or_msg_ids = -1
1020 1020
1021 1021 if not isinstance(indices_or_msg_ids, (list,tuple)):
1022 1022 indices_or_msg_ids = [indices_or_msg_ids]
1023 1023
1024 1024 theids = []
1025 1025 for id in indices_or_msg_ids:
1026 1026 if isinstance(id, int):
1027 1027 id = self.history[id]
1028 1028 if not isinstance(id, str):
1029 1029 raise TypeError("indices must be str or int, not %r"%id)
1030 1030 theids.append(id)
1031 1031
1032 1032 local_ids = filter(lambda msg_id: msg_id in self.history or msg_id in self.results, theids)
1033 1033 remote_ids = filter(lambda msg_id: msg_id not in local_ids, theids)
1034 1034
1035 1035 if remote_ids:
1036 1036 ar = AsyncHubResult(self, msg_ids=theids)
1037 1037 else:
1038 1038 ar = AsyncResult(self, msg_ids=theids)
1039 1039
1040 1040 if block:
1041 1041 ar.wait()
1042 1042
1043 1043 return ar
1044 1044
1045 1045 @spin_first
1046 1046 def resubmit(self, indices_or_msg_ids=None, subheader=None, block=None):
1047 1047 """Resubmit one or more tasks.
1048 1048
1049 1049 in-flight tasks may not be resubmitted.
1050 1050
1051 1051 Parameters
1052 1052 ----------
1053 1053
1054 1054 indices_or_msg_ids : integer history index, str msg_id, or list of either
1055 1055 The indices or msg_ids of indices to be retrieved
1056 1056
1057 1057 block : bool
1058 1058 Whether to wait for the result to be done
1059 1059
1060 1060 Returns
1061 1061 -------
1062 1062
1063 1063 AsyncHubResult
1064 1064 A subclass of AsyncResult that retrieves results from the Hub
1065 1065
1066 1066 """
1067 1067 block = self.block if block is None else block
1068 1068 if indices_or_msg_ids is None:
1069 1069 indices_or_msg_ids = -1
1070 1070
1071 1071 if not isinstance(indices_or_msg_ids, (list,tuple)):
1072 1072 indices_or_msg_ids = [indices_or_msg_ids]
1073 1073
1074 1074 theids = []
1075 1075 for id in indices_or_msg_ids:
1076 1076 if isinstance(id, int):
1077 1077 id = self.history[id]
1078 1078 if not isinstance(id, str):
1079 1079 raise TypeError("indices must be str or int, not %r"%id)
1080 1080 theids.append(id)
1081 1081
1082 1082 for msg_id in theids:
1083 1083 self.outstanding.discard(msg_id)
1084 1084 if msg_id in self.history:
1085 1085 self.history.remove(msg_id)
1086 1086 self.results.pop(msg_id, None)
1087 1087 self.metadata.pop(msg_id, None)
1088 1088 content = dict(msg_ids = theids)
1089 1089
1090 1090 self.session.send(self._query_socket, 'resubmit_request', content)
1091 1091
1092 1092 zmq.select([self._query_socket], [], [])
1093 1093 idents,msg = self.session.recv(self._query_socket, zmq.NOBLOCK)
1094 1094 if self.debug:
1095 1095 pprint(msg)
1096 1096 content = msg['content']
1097 1097 if content['status'] != 'ok':
1098 1098 raise self._unwrap_exception(content)
1099 1099
1100 1100 ar = AsyncHubResult(self, msg_ids=theids)
1101 1101
1102 1102 if block:
1103 1103 ar.wait()
1104 1104
1105 1105 return ar
1106 1106
1107 1107 @spin_first
1108 1108 def result_status(self, msg_ids, status_only=True):
1109 1109 """Check on the status of the result(s) of the apply request with `msg_ids`.
1110 1110
1111 1111 If status_only is False, then the actual results will be retrieved, else
1112 1112 only the status of the results will be checked.
1113 1113
1114 1114 Parameters
1115 1115 ----------
1116 1116
1117 1117 msg_ids : list of msg_ids
1118 1118 if int:
1119 1119 Passed as index to self.history for convenience.
1120 1120 status_only : bool (default: True)
1121 1121 if False:
1122 1122 Retrieve the actual results of completed tasks.
1123 1123
1124 1124 Returns
1125 1125 -------
1126 1126
1127 1127 results : dict
1128 1128 There will always be the keys 'pending' and 'completed', which will
1129 1129 be lists of msg_ids that are incomplete or complete. If `status_only`
1130 1130 is False, then completed results will be keyed by their `msg_id`.
1131 1131 """
1132 1132 if not isinstance(msg_ids, (list,tuple)):
1133 1133 msg_ids = [msg_ids]
1134 1134
1135 1135 theids = []
1136 1136 for msg_id in msg_ids:
1137 1137 if isinstance(msg_id, int):
1138 1138 msg_id = self.history[msg_id]
1139 1139 if not isinstance(msg_id, basestring):
1140 1140 raise TypeError("msg_ids must be str, not %r"%msg_id)
1141 1141 theids.append(msg_id)
1142 1142
1143 1143 completed = []
1144 1144 local_results = {}
1145 1145
1146 1146 # comment this block out to temporarily disable local shortcut:
1147 1147 for msg_id in theids:
1148 1148 if msg_id in self.results:
1149 1149 completed.append(msg_id)
1150 1150 local_results[msg_id] = self.results[msg_id]
1151 1151 theids.remove(msg_id)
1152 1152
1153 1153 if theids: # some not locally cached
1154 1154 content = dict(msg_ids=theids, status_only=status_only)
1155 1155 msg = self.session.send(self._query_socket, "result_request", content=content)
1156 1156 zmq.select([self._query_socket], [], [])
1157 1157 idents,msg = self.session.recv(self._query_socket, zmq.NOBLOCK)
1158 1158 if self.debug:
1159 1159 pprint(msg)
1160 1160 content = msg['content']
1161 1161 if content['status'] != 'ok':
1162 1162 raise self._unwrap_exception(content)
1163 1163 buffers = msg['buffers']
1164 1164 else:
1165 1165 content = dict(completed=[],pending=[])
1166 1166
1167 1167 content['completed'].extend(completed)
1168 1168
1169 1169 if status_only:
1170 1170 return content
1171 1171
1172 1172 failures = []
1173 1173 # load cached results into result:
1174 1174 content.update(local_results)
1175 1175 # update cache with results:
1176 1176 for msg_id in sorted(theids):
1177 1177 if msg_id in content['completed']:
1178 1178 rec = content[msg_id]
1179 1179 parent = rec['header']
1180 1180 header = rec['result_header']
1181 1181 rcontent = rec['result_content']
1182 1182 iodict = rec['io']
1183 1183 if isinstance(rcontent, str):
1184 1184 rcontent = self.session.unpack(rcontent)
1185 1185
1186 1186 md = self.metadata[msg_id]
1187 1187 md.update(self._extract_metadata(header, parent, rcontent))
1188 1188 md.update(iodict)
1189 1189
1190 1190 if rcontent['status'] == 'ok':
1191 1191 res,buffers = util.unserialize_object(buffers)
1192 1192 else:
1193 1193 print rcontent
1194 1194 res = self._unwrap_exception(rcontent)
1195 1195 failures.append(res)
1196 1196
1197 1197 self.results[msg_id] = res
1198 1198 content[msg_id] = res
1199 1199
1200 1200 if len(theids) == 1 and failures:
1201 1201 raise failures[0]
1202 1202
1203 1203 error.collect_exceptions(failures, "result_status")
1204 1204 return content
1205 1205
1206 1206 @spin_first
1207 1207 def queue_status(self, targets='all', verbose=False):
1208 1208 """Fetch the status of engine queues.
1209 1209
1210 1210 Parameters
1211 1211 ----------
1212 1212
1213 1213 targets : int/str/list of ints/strs
1214 1214 the engines whose states are to be queried.
1215 1215 default : all
1216 1216 verbose : bool
1217 1217 Whether to return lengths only, or lists of ids for each element
1218 1218 """
1219 1219 engine_ids = self._build_targets(targets)[1]
1220 1220 content = dict(targets=engine_ids, verbose=verbose)
1221 1221 self.session.send(self._query_socket, "queue_request", content=content)
1222 1222 idents,msg = self.session.recv(self._query_socket, 0)
1223 1223 if self.debug:
1224 1224 pprint(msg)
1225 1225 content = msg['content']
1226 1226 status = content.pop('status')
1227 1227 if status != 'ok':
1228 1228 raise self._unwrap_exception(content)
1229 1229 content = util.rekey(content)
1230 1230 if isinstance(targets, int):
1231 1231 return content[targets]
1232 1232 else:
1233 1233 return content
1234 1234
1235 1235 @spin_first
1236 1236 def purge_results(self, jobs=[], targets=[]):
1237 1237 """Tell the Hub to forget results.
1238 1238
1239 1239 Individual results can be purged by msg_id, or the entire
1240 1240 history of specific targets can be purged.
1241 1241
1242 1242 Parameters
1243 1243 ----------
1244 1244
1245 1245 jobs : str or list of str or AsyncResult objects
1246 1246 the msg_ids whose results should be forgotten.
1247 1247 targets : int/str/list of ints/strs
1248 1248 The targets, by uuid or int_id, whose entire history is to be purged.
1249 1249 Use `targets='all'` to scrub everything from the Hub's memory.
1250 1250
1251 1251 default : None
1252 1252 """
1253 1253 if not targets and not jobs:
1254 1254 raise ValueError("Must specify at least one of `targets` and `jobs`")
1255 1255 if targets:
1256 1256 targets = self._build_targets(targets)[1]
1257 1257
1258 1258 # construct msg_ids from jobs
1259 1259 msg_ids = []
1260 1260 if isinstance(jobs, (basestring,AsyncResult)):
1261 1261 jobs = [jobs]
1262 1262 bad_ids = filter(lambda obj: not isinstance(obj, (basestring, AsyncResult)), jobs)
1263 1263 if bad_ids:
1264 1264 raise TypeError("Invalid msg_id type %r, expected str or AsyncResult"%bad_ids[0])
1265 1265 for j in jobs:
1266 1266 if isinstance(j, AsyncResult):
1267 1267 msg_ids.extend(j.msg_ids)
1268 1268 else:
1269 1269 msg_ids.append(j)
1270 1270
1271 1271 content = dict(targets=targets, msg_ids=msg_ids)
1272 1272 self.session.send(self._query_socket, "purge_request", content=content)
1273 1273 idents, msg = self.session.recv(self._query_socket, 0)
1274 1274 if self.debug:
1275 1275 pprint(msg)
1276 1276 content = msg['content']
1277 1277 if content['status'] != 'ok':
1278 1278 raise self._unwrap_exception(content)
1279 1279
1280 1280 @spin_first
1281 1281 def hub_history(self):
1282 1282 """Get the Hub's history
1283 1283
1284 1284 Just like the Client, the Hub has a history, which is a list of msg_ids.
1285 1285 This will contain the history of all clients, and, depending on configuration,
1286 1286 may contain history across multiple cluster sessions.
1287 1287
1288 1288 Any msg_id returned here is a valid argument to `get_result`.
1289 1289
1290 1290 Returns
1291 1291 -------
1292 1292
1293 1293 msg_ids : list of strs
1294 1294 list of all msg_ids, ordered by task submission time.
1295 1295 """
1296 1296
1297 1297 self.session.send(self._query_socket, "history_request", content={})
1298 1298 idents, msg = self.session.recv(self._query_socket, 0)
1299 1299
1300 1300 if self.debug:
1301 1301 pprint(msg)
1302 1302 content = msg['content']
1303 1303 if content['status'] != 'ok':
1304 1304 raise self._unwrap_exception(content)
1305 1305 else:
1306 1306 return content['history']
1307 1307
1308 1308 @spin_first
1309 1309 def db_query(self, query, keys=None):
1310 1310 """Query the Hub's TaskRecord database
1311 1311
1312 1312 This will return a list of task record dicts that match `query`
1313 1313
1314 1314 Parameters
1315 1315 ----------
1316 1316
1317 1317 query : mongodb query dict
1318 1318 The search dict. See mongodb query docs for details.
1319 1319 keys : list of strs [optional]
1320 THe subset of keys to be returned. The default is to fetch everything.
1320 The subset of keys to be returned. The default is to fetch everything but buffers.
1321 1321 'msg_id' will *always* be included.
1322 1322 """
1323 if isinstance(keys, basestring):
1324 keys = [keys]
1323 1325 content = dict(query=query, keys=keys)
1324 1326 self.session.send(self._query_socket, "db_request", content=content)
1325 1327 idents, msg = self.session.recv(self._query_socket, 0)
1326 1328 if self.debug:
1327 1329 pprint(msg)
1328 1330 content = msg['content']
1329 1331 if content['status'] != 'ok':
1330 1332 raise self._unwrap_exception(content)
1331 1333
1332 1334 records = content['records']
1333 1335 buffer_lens = content['buffer_lens']
1334 1336 result_buffer_lens = content['result_buffer_lens']
1335 1337 buffers = msg['buffers']
1336 1338 has_bufs = buffer_lens is not None
1337 1339 has_rbufs = result_buffer_lens is not None
1338 1340 for i,rec in enumerate(records):
1339 1341 # relink buffers
1340 1342 if has_bufs:
1341 1343 blen = buffer_lens[i]
1342 1344 rec['buffers'], buffers = buffers[:blen],buffers[blen:]
1343 1345 if has_rbufs:
1344 1346 blen = result_buffer_lens[i]
1345 1347 rec['result_buffers'], buffers = buffers[:blen],buffers[blen:]
1346 1348 # turn timestamps back into times
1347 1349 for key in 'submitted started completed resubmitted'.split():
1348 1350 maybedate = rec.get(key, None)
1349 1351 if maybedate and util.ISO8601_RE.match(maybedate):
1350 1352 rec[key] = datetime.strptime(maybedate, util.ISO8601)
1351 1353
1352 1354 return records
1353 1355
1354 1356 __all__ = [ 'Client' ]
@@ -1,22 +1,23
1 1 .. _parallel_index:
2 2
3 3 ====================================
4 4 Using IPython for parallel computing
5 5 ====================================
6 6
7 7 .. toctree::
8 8 :maxdepth: 2
9 9
10 10 parallel_intro.txt
11 11 parallel_process.txt
12 12 parallel_multiengine.txt
13 13 parallel_task.txt
14 14 parallel_mpi.txt
15 parallel_db.txt
15 16 parallel_security.txt
16 17 parallel_winhpc.txt
17 18 parallel_demos.txt
18 19 dag_dependencies.txt
19 20 parallel_details.txt
20 21 parallel_transition.txt
21 22
22 23
@@ -1,418 +1,442
1 1 .. _parallel_task:
2 2
3 3 ==========================
4 4 The IPython task interface
5 5 ==========================
6 6
7 7 The task interface to the cluster presents the engines as a fault tolerant,
8 8 dynamic load-balanced system of workers. Unlike the multiengine interface, in
9 9 the task interface the user have no direct access to individual engines. By
10 10 allowing the IPython scheduler to assign work, this interface is simultaneously
11 11 simpler and more powerful.
12 12
13 13 Best of all, the user can use both of these interfaces running at the same time
14 14 to take advantage of their respective strengths. When the user can break up
15 15 the user's work into segments that do not depend on previous execution, the
16 16 task interface is ideal. But it also has more power and flexibility, allowing
17 17 the user to guide the distribution of jobs, without having to assign tasks to
18 18 engines explicitly.
19 19
20 20 Starting the IPython controller and engines
21 21 ===========================================
22 22
23 23 To follow along with this tutorial, you will need to start the IPython
24 24 controller and four IPython engines. The simplest way of doing this is to use
25 25 the :command:`ipcluster` command::
26 26
27 27 $ ipcluster start -n 4
28 28
29 29 For more detailed information about starting the controller and engines, see
30 30 our :ref:`introduction <ip1par>` to using IPython for parallel computing.
31 31
32 32 Creating a ``Client`` instance
33 33 ==============================
34 34
35 35 The first step is to import the IPython :mod:`IPython.parallel`
36 36 module and then create a :class:`.Client` instance, and we will also be using
37 37 a :class:`LoadBalancedView`, here called `lview`:
38 38
39 39 .. sourcecode:: ipython
40 40
41 41 In [1]: from IPython.parallel import Client
42 42
43 43 In [2]: rc = Client()
44 44
45 45
46 46 This form assumes that the controller was started on localhost with default
47 47 configuration. If not, the location of the controller must be given as an
48 48 argument to the constructor:
49 49
50 50 .. sourcecode:: ipython
51 51
52 52 # for a visible LAN controller listening on an external port:
53 53 In [2]: rc = Client('tcp://192.168.1.16:10101')
54 54 # or to connect with a specific profile you have set up:
55 55 In [3]: rc = Client(profile='mpi')
56 56
57 57 For load-balanced execution, we will make use of a :class:`LoadBalancedView` object, which can
58 58 be constructed via the client's :meth:`load_balanced_view` method:
59 59
60 60 .. sourcecode:: ipython
61 61
62 62 In [4]: lview = rc.load_balanced_view() # default load-balanced view
63 63
64 64 .. seealso::
65 65
66 66 For more information, see the in-depth explanation of :ref:`Views <parallel_details>`.
67 67
68 68
69 69 Quick and easy parallelism
70 70 ==========================
71 71
72 72 In many cases, you simply want to apply a Python function to a sequence of
73 73 objects, but *in parallel*. Like the multiengine interface, these can be
74 74 implemented via the task interface. The exact same tools can perform these
75 75 actions in load-balanced ways as well as multiplexed ways: a parallel version
76 76 of :func:`map` and :func:`@parallel` function decorator. If one specifies the
77 77 argument `balanced=True`, then they are dynamically load balanced. Thus, if the
78 78 execution time per item varies significantly, you should use the versions in
79 79 the task interface.
80 80
81 81 Parallel map
82 82 ------------
83 83
84 84 To load-balance :meth:`map`,simply use a LoadBalancedView:
85 85
86 86 .. sourcecode:: ipython
87 87
88 88 In [62]: lview.block = True
89 89
90 90 In [63]: serial_result = map(lambda x:x**10, range(32))
91 91
92 92 In [64]: parallel_result = lview.map(lambda x:x**10, range(32))
93 93
94 94 In [65]: serial_result==parallel_result
95 95 Out[65]: True
96 96
97 97 Parallel function decorator
98 98 ---------------------------
99 99
100 100 Parallel functions are just like normal function, but they can be called on
101 101 sequences and *in parallel*. The multiengine interface provides a decorator
102 102 that turns any Python function into a parallel function:
103 103
104 104 .. sourcecode:: ipython
105 105
106 106 In [10]: @lview.parallel()
107 107 ....: def f(x):
108 108 ....: return 10.0*x**4
109 109 ....:
110 110
111 111 In [11]: f.map(range(32)) # this is done in parallel
112 112 Out[11]: [0.0,10.0,160.0,...]
113 113
114 114 .. _parallel_dependencies:
115 115
116 116 Dependencies
117 117 ============
118 118
119 119 Often, pure atomic load-balancing is too primitive for your work. In these cases, you
120 120 may want to associate some kind of `Dependency` that describes when, where, or whether
121 121 a task can be run. In IPython, we provide two types of dependencies:
122 122 `Functional Dependencies`_ and `Graph Dependencies`_
123 123
124 124 .. note::
125 125
126 126 It is important to note that the pure ZeroMQ scheduler does not support dependencies,
127 127 and you will see errors or warnings if you try to use dependencies with the pure
128 128 scheduler.
129 129
130 130 Functional Dependencies
131 131 -----------------------
132 132
133 133 Functional dependencies are used to determine whether a given engine is capable of running
134 134 a particular task. This is implemented via a special :class:`Exception` class,
135 135 :class:`UnmetDependency`, found in `IPython.parallel.error`. Its use is very simple:
136 136 if a task fails with an UnmetDependency exception, then the scheduler, instead of relaying
137 137 the error up to the client like any other error, catches the error, and submits the task
138 138 to a different engine. This will repeat indefinitely, and a task will never be submitted
139 139 to a given engine a second time.
140 140
141 141 You can manually raise the :class:`UnmetDependency` yourself, but IPython has provided
142 142 some decorators for facilitating this behavior.
143 143
144 144 There are two decorators and a class used for functional dependencies:
145 145
146 146 .. sourcecode:: ipython
147 147
148 148 In [9]: from IPython.parallel import depend, require, dependent
149 149
150 150 @require
151 151 ********
152 152
153 153 The simplest sort of dependency is requiring that a Python module is available. The
154 154 ``@require`` decorator lets you define a function that will only run on engines where names
155 155 you specify are importable:
156 156
157 157 .. sourcecode:: ipython
158 158
159 159 In [10]: @require('numpy', 'zmq')
160 160 ...: def myfunc():
161 161 ...: return dostuff()
162 162
163 163 Now, any time you apply :func:`myfunc`, the task will only run on a machine that has
164 164 numpy and pyzmq available, and when :func:`myfunc` is called, numpy and zmq will be imported.
165 165
166 166 @depend
167 167 *******
168 168
169 169 The ``@depend`` decorator lets you decorate any function with any *other* function to
170 170 evaluate the dependency. The dependency function will be called at the start of the task,
171 171 and if it returns ``False``, then the dependency will be considered unmet, and the task
172 172 will be assigned to another engine. If the dependency returns *anything other than
173 173 ``False``*, the rest of the task will continue.
174 174
175 175 .. sourcecode:: ipython
176 176
177 177 In [10]: def platform_specific(plat):
178 178 ...: import sys
179 179 ...: return sys.platform == plat
180 180
181 181 In [11]: @depend(platform_specific, 'darwin')
182 182 ...: def mactask():
183 183 ...: do_mac_stuff()
184 184
185 185 In [12]: @depend(platform_specific, 'nt')
186 186 ...: def wintask():
187 187 ...: do_windows_stuff()
188 188
189 189 In this case, any time you apply ``mytask``, it will only run on an OSX machine.
190 190 ``@depend`` is just like ``apply``, in that it has a ``@depend(f,*args,**kwargs)``
191 191 signature.
192 192
193 193 dependents
194 194 **********
195 195
196 196 You don't have to use the decorators on your tasks, if for instance you may want
197 197 to run tasks with a single function but varying dependencies, you can directly construct
198 198 the :class:`dependent` object that the decorators use:
199 199
200 200 .. sourcecode::ipython
201 201
202 202 In [13]: def mytask(*args):
203 203 ...: dostuff()
204 204
205 205 In [14]: mactask = dependent(mytask, platform_specific, 'darwin')
206 206 # this is the same as decorating the declaration of mytask with @depend
207 207 # but you can do it again:
208 208
209 209 In [15]: wintask = dependent(mytask, platform_specific, 'nt')
210 210
211 211 # in general:
212 212 In [16]: t = dependent(f, g, *dargs, **dkwargs)
213 213
214 214 # is equivalent to:
215 215 In [17]: @depend(g, *dargs, **dkwargs)
216 216 ...: def t(a,b,c):
217 217 ...: # contents of f
218 218
219 219 Graph Dependencies
220 220 ------------------
221 221
222 222 Sometimes you want to restrict the time and/or location to run a given task as a function
223 223 of the time and/or location of other tasks. This is implemented via a subclass of
224 224 :class:`set`, called a :class:`Dependency`. A Dependency is just a set of `msg_ids`
225 225 corresponding to tasks, and a few attributes to guide how to decide when the Dependency
226 226 has been met.
227 227
228 228 The switches we provide for interpreting whether a given dependency set has been met:
229 229
230 230 any|all
231 231 Whether the dependency is considered met if *any* of the dependencies are done, or
232 232 only after *all* of them have finished. This is set by a Dependency's :attr:`all`
233 233 boolean attribute, which defaults to ``True``.
234 234
235 235 success [default: True]
236 236 Whether to consider tasks that succeeded as fulfilling dependencies.
237 237
238 238 failure [default : False]
239 239 Whether to consider tasks that failed as fulfilling dependencies.
240 240 using `failure=True,success=False` is useful for setting up cleanup tasks, to be run
241 241 only when tasks have failed.
242 242
243 243 Sometimes you want to run a task after another, but only if that task succeeded. In this case,
244 244 ``success`` should be ``True`` and ``failure`` should be ``False``. However sometimes you may
245 245 not care whether the task succeeds, and always want the second task to run, in which case you
246 246 should use `success=failure=True`. The default behavior is to only use successes.
247 247
248 248 There are other switches for interpretation that are made at the *task* level. These are
249 249 specified via keyword arguments to the client's :meth:`apply` method.
250 250
251 251 after,follow
252 252 You may want to run a task *after* a given set of dependencies have been run and/or
253 253 run it *where* another set of dependencies are met. To support this, every task has an
254 254 `after` dependency to restrict time, and a `follow` dependency to restrict
255 255 destination.
256 256
257 257 timeout
258 258 You may also want to set a time-limit for how long the scheduler should wait before a
259 259 task's dependencies are met. This is done via a `timeout`, which defaults to 0, which
260 260 indicates that the task should never timeout. If the timeout is reached, and the
261 261 scheduler still hasn't been able to assign the task to an engine, the task will fail
262 262 with a :class:`DependencyTimeout`.
263 263
264 264 .. note::
265 265
266 266 Dependencies only work within the task scheduler. You cannot instruct a load-balanced
267 267 task to run after a job submitted via the MUX interface.
268 268
269 269 The simplest form of Dependencies is with `all=True,success=True,failure=False`. In these cases,
270 270 you can skip using Dependency objects, and just pass msg_ids or AsyncResult objects as the
271 271 `follow` and `after` keywords to :meth:`client.apply`:
272 272
273 273 .. sourcecode:: ipython
274 274
275 275 In [14]: client.block=False
276 276
277 277 In [15]: ar = lview.apply(f, args, kwargs)
278 278
279 279 In [16]: ar2 = lview.apply(f2)
280 280
281 281 In [17]: ar3 = lview.apply_with_flags(f3, after=[ar,ar2])
282 282
283 283 In [17]: ar4 = lview.apply_with_flags(f3, follow=[ar], timeout=2.5)
284 284
285 285
286 286 .. seealso::
287 287
288 288 Some parallel workloads can be described as a `Directed Acyclic Graph
289 289 <http://en.wikipedia.org/wiki/Directed_acyclic_graph>`_, or DAG. See :ref:`DAG
290 290 Dependencies <dag_dependencies>` for an example demonstrating how to use map a NetworkX DAG
291 291 onto task dependencies.
292 292
293 293
294 294
295
295 296 Impossible Dependencies
296 297 ***********************
297 298
298 299 The schedulers do perform some analysis on graph dependencies to determine whether they
299 300 are not possible to be met. If the scheduler does discover that a dependency cannot be
300 301 met, then the task will fail with an :class:`ImpossibleDependency` error. This way, if the
301 302 scheduler realized that a task can never be run, it won't sit indefinitely in the
302 303 scheduler clogging the pipeline.
303 304
304 305 The basic cases that are checked:
305 306
306 307 * depending on nonexistent messages
307 308 * `follow` dependencies were run on more than one machine and `all=True`
308 309 * any dependencies failed and `all=True,success=True,failures=False`
309 310 * all dependencies failed and `all=False,success=True,failure=False`
310 311
311 312 .. warning::
312 313
313 314 This analysis has not been proven to be rigorous, so it is likely possible for tasks
314 315 to become impossible to run in obscure situations, so a timeout may be a good choice.
315 316
317
318 Retries and Resubmit
319 ====================
320
321 Retries
322 -------
323
324 Another flag for tasks is `retries`. This is an integer, specifying how many times
325 a task should be resubmitted after failure. This is useful for tasks that should still run
326 if their engine was shutdown, or may have some statistical chance of failing. The default
327 is to not retry tasks.
328
329 Resubmit
330 --------
331
332 Sometimes you may want to re-run a task. This could be because it failed for some reason, and
333 you have fixed the error, or because you want to restore the cluster to an interrupted state.
334 For this, the :class:`Client` has a :meth:`rc.resubmit` method. This simply takes one or more
335 msg_ids, and returns an :class:`AsyncHubResult` for the result(s). You cannot resubmit
336 a task that is pending - only those that have finished, either successful or unsuccessful.
337
316 338 .. _parallel_schedulers:
317 339
318 340 Schedulers
319 341 ==========
320 342
321 343 There are a variety of valid ways to determine where jobs should be assigned in a
322 344 load-balancing situation. In IPython, we support several standard schemes, and
323 345 even make it easy to define your own. The scheme can be selected via the ``--scheme``
324 346 argument to :command:`ipcontroller`, or in the :attr:`HubFactory.scheme` attribute
325 347 of a controller config object.
326 348
327 349 The built-in routing schemes:
328 350
329 351 To select one of these schemes, simply do::
330 352
331 353 $ ipcontroller --scheme <schemename>
332 354 for instance:
333 355 $ ipcontroller --scheme lru
334 356
335 357 lru: Least Recently Used
336 358
337 359 Always assign work to the least-recently-used engine. A close relative of
338 360 round-robin, it will be fair with respect to the number of tasks, agnostic
339 361 with respect to runtime of each task.
340 362
341 363 plainrandom: Plain Random
342 364
343 365 Randomly picks an engine on which to run.
344 366
345 367 twobin: Two-Bin Random
346 368
347 369 **Requires numpy**
348 370
349 371 Pick two engines at random, and use the LRU of the two. This is known to be better
350 372 than plain random in many cases, but requires a small amount of computation.
351 373
352 374 leastload: Least Load
353 375
354 376 **This is the default scheme**
355 377
356 378 Always assign tasks to the engine with the fewest outstanding tasks (LRU breaks tie).
357 379
358 380 weighted: Weighted Two-Bin Random
359 381
360 382 **Requires numpy**
361 383
362 384 Pick two engines at random using the number of outstanding tasks as inverse weights,
363 385 and use the one with the lower load.
364 386
365 387
366 388 Pure ZMQ Scheduler
367 389 ------------------
368 390
369 391 For maximum throughput, the 'pure' scheme is not Python at all, but a C-level
370 392 :class:`MonitoredQueue` from PyZMQ, which uses a ZeroMQ ``XREQ`` socket to perform all
371 393 load-balancing. This scheduler does not support any of the advanced features of the Python
372 394 :class:`.Scheduler`.
373 395
374 396 Disabled features when using the ZMQ Scheduler:
375 397
376 398 * Engine unregistration
377 399 Task farming will be disabled if an engine unregisters.
378 400 Further, if an engine is unregistered during computation, the scheduler may not recover.
379 401 * Dependencies
380 402 Since there is no Python logic inside the Scheduler, routing decisions cannot be made
381 403 based on message content.
382 404 * Early destination notification
383 405 The Python schedulers know which engine gets which task, and notify the Hub. This
384 406 allows graceful handling of Engines coming and going. There is no way to know
385 407 where ZeroMQ messages have gone, so there is no way to know what tasks are on which
386 408 engine until they *finish*. This makes recovery from engine shutdown very difficult.
387 409
388 410
389 411 .. note::
390 412
391 413 TODO: performance comparisons
392 414
393 415
416
417
394 418 More details
395 419 ============
396 420
397 421 The :class:`LoadBalancedView` has many more powerful features that allow quite a bit
398 422 of flexibility in how tasks are defined and run. The next places to look are
399 423 in the following classes:
400 424
401 425 * :class:`~IPython.parallel.client.view.LoadBalancedView`
402 426 * :class:`~IPython.parallel.client.asyncresult.AsyncResult`
403 427 * :meth:`~IPython.parallel.client.view.LoadBalancedView.apply`
404 428 * :mod:`~IPython.parallel.controller.dependency`
405 429
406 430 The following is an overview of how to use these classes together:
407 431
408 432 1. Create a :class:`Client` and :class:`LoadBalancedView`
409 433 2. Define some functions to be run as tasks
410 434 3. Submit your tasks to using the :meth:`apply` method of your
411 435 :class:`LoadBalancedView` instance.
412 436 4. Use :meth:`Client.get_result` to get the results of the
413 437 tasks, or use the :meth:`AsyncResult.get` method of the results to wait
414 438 for and then receive the results.
415 439
416 440 .. seealso::
417 441
418 442 A demo of :ref:`DAG Dependencies <dag_dependencies>` with NetworkX and IPython.
General Comments 0
You need to be logged in to leave comments. Login now