Merge pull request #3281 from minrk/noparent...
Brian E. Granger
r10602:720ee58d merge
@@ -1,1823 +1,1824
1 1 """A semi-synchronous Client for the ZMQ cluster
2 2
3 3 Authors:
4 4
5 5 * MinRK
6 6 """
7 7 #-----------------------------------------------------------------------------
8 8 # Copyright (C) 2010-2011 The IPython Development Team
9 9 #
10 10 # Distributed under the terms of the BSD License. The full license is in
11 11 # the file COPYING, distributed as part of this software.
12 12 #-----------------------------------------------------------------------------
13 13
14 14 #-----------------------------------------------------------------------------
15 15 # Imports
16 16 #-----------------------------------------------------------------------------
17 17
18 18 import os
19 19 import json
20 20 import sys
21 21 from threading import Thread, Event
22 22 import time
23 23 import warnings
24 24 from datetime import datetime
25 25 from getpass import getpass
26 26 from pprint import pprint
27 27
28 28 pjoin = os.path.join
29 29
30 30 import zmq
31 31 # from zmq.eventloop import ioloop, zmqstream
32 32
33 33 from IPython.config.configurable import MultipleInstanceError
34 34 from IPython.core.application import BaseIPythonApplication
35 35 from IPython.core.profiledir import ProfileDir, ProfileDirError
36 36
37 37 from IPython.utils.coloransi import TermColors
38 38 from IPython.utils.jsonutil import rekey
39 39 from IPython.utils.localinterfaces import LOCALHOST, LOCAL_IPS
40 40 from IPython.utils.path import get_ipython_dir
41 41 from IPython.utils.py3compat import cast_bytes
42 42 from IPython.utils.traitlets import (HasTraits, Integer, Instance, Unicode,
43 43 Dict, List, Bool, Set, Any)
44 44 from IPython.external.decorator import decorator
45 45 from IPython.external.ssh import tunnel
46 46
47 47 from IPython.parallel import Reference
48 48 from IPython.parallel import error
49 49 from IPython.parallel import util
50 50
51 51 from IPython.kernel.zmq.session import Session, Message
52 52 from IPython.kernel.zmq import serialize
53 53
54 54 from .asyncresult import AsyncResult, AsyncHubResult
55 55 from .view import DirectView, LoadBalancedView
56 56
57 57 if sys.version_info[0] >= 3:
58 58 # xrange is used in a couple 'isinstance' tests in py2
59 59 # should be just 'range' in 3k
60 60 xrange = range
61 61
62 62 #--------------------------------------------------------------------------
63 63 # Decorators for Client methods
64 64 #--------------------------------------------------------------------------
65 65
66 66 @decorator
67 67 def spin_first(f, self, *args, **kwargs):
68 68 """Call spin() to sync state prior to calling the method."""
69 69 self.spin()
70 70 return f(self, *args, **kwargs)
71 71
72 72
73 73 #--------------------------------------------------------------------------
74 74 # Classes
75 75 #--------------------------------------------------------------------------
76 76
77 77
78 78 class ExecuteReply(object):
79 79 """wrapper for finished Execute results"""
80 80 def __init__(self, msg_id, content, metadata):
81 81 self.msg_id = msg_id
82 82 self._content = content
83 83 self.execution_count = content['execution_count']
84 84 self.metadata = metadata
85 85
86 86 def __getitem__(self, key):
87 87 return self.metadata[key]
88 88
89 89 def __getattr__(self, key):
90 90 if key not in self.metadata:
91 91 raise AttributeError(key)
92 92 return self.metadata[key]
93 93
94 94 def __repr__(self):
95 95 pyout = self.metadata['pyout'] or {'data':{}}
96 96 text_out = pyout['data'].get('text/plain', '')
97 97 if len(text_out) > 32:
98 98 text_out = text_out[:29] + '...'
99 99
100 100 return "<ExecuteReply[%i]: %s>" % (self.execution_count, text_out)
101 101
102 102 def _repr_pretty_(self, p, cycle):
103 103 pyout = self.metadata['pyout'] or {'data':{}}
104 104 text_out = pyout['data'].get('text/plain', '')
105 105
106 106 if not text_out:
107 107 return
108 108
109 109 try:
110 110 ip = get_ipython()
111 111 except NameError:
112 112 colors = "NoColor"
113 113 else:
114 114 colors = ip.colors
115 115
116 116 if colors == "NoColor":
117 117 out = normal = ""
118 118 else:
119 119 out = TermColors.Red
120 120 normal = TermColors.Normal
121 121
122 122 if '\n' in text_out and not text_out.startswith('\n'):
123 123 # add newline for multiline reprs
124 124 text_out = '\n' + text_out
125 125
126 126 p.text(
127 127 out + u'Out[%i:%i]: ' % (
128 128 self.metadata['engine_id'], self.execution_count
129 129 ) + normal + text_out
130 130 )
131 131
132 132 def _repr_html_(self):
133 133 pyout = self.metadata['pyout'] or {'data':{}}
134 134 return pyout['data'].get("text/html")
135 135
136 136 def _repr_latex_(self):
137 137 pyout = self.metadata['pyout'] or {'data':{}}
138 138 return pyout['data'].get("text/latex")
139 139
140 140 def _repr_json_(self):
141 141 pyout = self.metadata['pyout'] or {'data':{}}
142 142 return pyout['data'].get("application/json")
143 143
144 144 def _repr_javascript_(self):
145 145 pyout = self.metadata['pyout'] or {'data':{}}
146 146 return pyout['data'].get("application/javascript")
147 147
148 148 def _repr_png_(self):
149 149 pyout = self.metadata['pyout'] or {'data':{}}
150 150 return pyout['data'].get("image/png")
151 151
152 152 def _repr_jpeg_(self):
153 153 pyout = self.metadata['pyout'] or {'data':{}}
154 154 return pyout['data'].get("image/jpeg")
155 155
156 156 def _repr_svg_(self):
157 157 pyout = self.metadata['pyout'] or {'data':{}}
158 158 return pyout['data'].get("image/svg+xml")
159 159
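# A usage sketch (illustrative, not part of this file): `rc` is assumed to be a
# connected Client, and the request to have completed successfully.
#
#     ar = rc[0].execute('x = 1 + 1')   # AsyncResult for an execute request
#     reply = ar.get()                  # -> ExecuteReply on success
#     reply.execution_count             # attribute access falls through to metadata
#     reply['engine_id']                # item access reads the same metadata dict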
160 160
161 161 class Metadata(dict):
162 162 """Subclass of dict for initializing metadata values.
163 163
164 164 Attribute access works on keys.
165 165
166 166 These objects have a strict set of keys - errors will raise if you try
167 167 to add new keys.
168 168 """
169 169 def __init__(self, *args, **kwargs):
170 170 dict.__init__(self)
171 171 md = {'msg_id' : None,
172 172 'submitted' : None,
173 173 'started' : None,
174 174 'completed' : None,
175 175 'received' : None,
176 176 'engine_uuid' : None,
177 177 'engine_id' : None,
178 178 'follow' : None,
179 179 'after' : None,
180 180 'status' : None,
181 181
182 182 'pyin' : None,
183 183 'pyout' : None,
184 184 'pyerr' : None,
185 185 'stdout' : '',
186 186 'stderr' : '',
187 187 'outputs' : [],
188 188 'data': {},
189 189 'outputs_ready' : False,
190 190 }
191 191 self.update(md)
192 192 self.update(dict(*args, **kwargs))
193 193
194 194 def __getattr__(self, key):
195 195 """getattr aliased to getitem"""
196 196 if key in self.iterkeys():
197 197 return self[key]
198 198 else:
199 199 raise AttributeError(key)
200 200
201 201 def __setattr__(self, key, value):
202 202 """setattr aliased to setitem, with strict"""
203 203 if key in self.iterkeys():
204 204 self[key] = value
205 205 else:
206 206 raise AttributeError(key)
207 207
208 208 def __setitem__(self, key, value):
209 209 """strict static key enforcement"""
210 210 if key in self.iterkeys():
211 211 dict.__setitem__(self, key, value)
212 212 else:
213 213 raise KeyError(key)
214 214
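# A quick sketch of the strict-key behavior (illustrative):
#
#     md = Metadata()
#     md.status = 'ok'       # fine: 'status' is a predefined key
#     md['stdout'] += 'hi'   # fine: item access on a known key
#     md.bogus = 1           # AttributeError: unknown keys are rejected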
215 215
216 216 class Client(HasTraits):
217 217 """A semi-synchronous client to the IPython ZMQ cluster
218 218
219 219 Parameters
220 220 ----------
221 221
222 222 url_file : str/unicode; path to ipcontroller-client.json
223 223 This JSON file should contain all the information needed to connect to a cluster,
224 224 and is likely the only argument needed. It holds the connection information
225 225 for the Hub's registration; if a JSON connector file is given, no further
226 226 configuration is usually necessary.
227 227 [Default: use profile]
228 228 profile : bytes
229 229 The name of the Cluster profile to be used to find connector information.
230 230 If run from an IPython application, the default profile will be the same
231 231 as the running application, otherwise it will be 'default'.
232 232 cluster_id : str
233 233 String id to be added to runtime files, to prevent name collisions when using
234 234 multiple clusters with a single profile simultaneously.
235 235 When set, will look for files named like: 'ipcontroller-<cluster_id>-client.json'
236 236 Since this is text inserted into filenames, typical recommendations apply:
237 237 Simple character strings are ideal, and spaces are not recommended (but
238 238 should generally work)
239 239 context : zmq.Context
240 240 Pass an existing zmq.Context instance, otherwise the client will create its own.
241 241 debug : bool
242 242 flag for lots of message printing for debug purposes
243 243 timeout : int/float
244 244 time (in seconds) to wait for connection replies from the Hub
245 245 [Default: 10]
246 246
247 247 #-------------- session related args ----------------
248 248
249 249 config : Config object
250 250 If specified, this will be relayed to the Session for configuration
251 251 username : str
252 252 set username for the session object
253 253
254 254 #-------------- ssh related args ----------------
255 255 # These are args for configuring the ssh tunnel to be used
256 256 # credentials are used to forward connections over ssh to the Controller
257 257 # Note that the ip given in `addr` needs to be relative to sshserver
258 258 # The most basic case is to leave addr as pointing to localhost (127.0.0.1),
259 259 # and set sshserver as the same machine the Controller is on. However,
260 260 # the only requirement is that sshserver is able to see the Controller
261 261 # (i.e. is within the same trusted network).
262 262
263 263 sshserver : str
264 264 A string of the form passed to ssh, i.e. 'server.tld' or 'user@server.tld:port'
265 265 If keyfile or password is specified, and this is not, it will default to
266 266 the ip given in addr.
267 267 sshkey : str; path to ssh private key file
268 268 This specifies a key to be used in ssh login, default None.
269 269 Regular default ssh keys will be used without specifying this argument.
270 270 password : str
271 271 Your ssh password to sshserver. Note that if this is left None,
272 272 you will be prompted for it if passwordless key based login is unavailable.
273 273 paramiko : bool
274 274 flag for whether to use paramiko instead of shell ssh for tunneling.
275 275 [default: True on win32, False otherwise]
276 276
277 277
278 278 Attributes
279 279 ----------
280 280
281 281 ids : list of int engine IDs
282 282 requesting the ids attribute always synchronizes
283 283 the registration state. To request ids without synchronization,
284 284 use the semi-private _ids attribute.
285 285
286 286 history : list of msg_ids
287 287 a list of msg_ids, keeping track of all the execution
288 288 messages you have submitted in order.
289 289
290 290 outstanding : set of msg_ids
291 291 a set of msg_ids that have been submitted, but whose
292 292 results have not yet been received.
293 293
294 294 results : dict
295 295 a dict of all our results, keyed by msg_id
296 296
297 297 block : bool
298 298 determines default behavior when block not specified
299 299 in execution methods
300 300
301 301 Methods
302 302 -------
303 303
304 304 spin
305 305 flushes incoming results and registration state changes
306 306 control methods spin, and requesting `ids` also ensures the state is up to date
307 307
308 308 wait
309 309 wait on one or more msg_ids
310 310
311 311 execution methods
312 312 apply
313 313 legacy: execute, run
314 314
315 315 data movement
316 316 push, pull, scatter, gather
317 317
318 318 query methods
319 319 queue_status, get_result, purge, result_status
320 320
321 321 control methods
322 322 abort, shutdown
323 323
324 324 """
325 325
326 326
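# A typical connection sketch (assumes a cluster was started, e.g. with
# `ipcluster start`, so the profile's connection file exists):
#
#     from IPython.parallel import Client
#     rc = Client()                    # locate ipcontroller-client.json via profile
#     rc = Client(profile='mpi')       # or use a named profile
#     rc = Client('/path/to/ipcontroller-client.json')  # or an explicit url_file
#     rc.ids                           # e.g. [0, 1, 2, 3]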
327 327 block = Bool(False)
328 328 outstanding = Set()
329 329 results = Instance('collections.defaultdict', (dict,))
330 330 metadata = Instance('collections.defaultdict', (Metadata,))
331 331 history = List()
332 332 debug = Bool(False)
333 333 _spin_thread = Any()
334 334 _stop_spinning = Any()
335 335
336 336 profile=Unicode()
337 337 def _profile_default(self):
338 338 if BaseIPythonApplication.initialized():
339 339 # an IPython app *might* be running, try to get its profile
340 340 try:
341 341 return BaseIPythonApplication.instance().profile
342 342 except (AttributeError, MultipleInstanceError):
343 343 # could be a *different* subclass of config.Application,
344 344 # which would raise one of these two errors.
345 345 return u'default'
346 346 else:
347 347 return u'default'
348 348
349 349
350 350 _outstanding_dict = Instance('collections.defaultdict', (set,))
351 351 _ids = List()
352 352 _connected=Bool(False)
353 353 _ssh=Bool(False)
354 354 _context = Instance('zmq.Context')
355 355 _config = Dict()
356 356 _engines=Instance(util.ReverseDict, (), {})
357 357 # _hub_socket=Instance('zmq.Socket')
358 358 _query_socket=Instance('zmq.Socket')
359 359 _control_socket=Instance('zmq.Socket')
360 360 _iopub_socket=Instance('zmq.Socket')
361 361 _notification_socket=Instance('zmq.Socket')
362 362 _mux_socket=Instance('zmq.Socket')
363 363 _task_socket=Instance('zmq.Socket')
364 364 _task_scheme=Unicode()
365 365 _closed = False
366 366 _ignored_control_replies=Integer(0)
367 367 _ignored_hub_replies=Integer(0)
368 368
369 369 def __new__(self, *args, **kw):
370 370 # don't raise on positional args
371 371 return HasTraits.__new__(self, **kw)
372 372
373 373 def __init__(self, url_file=None, profile=None, profile_dir=None, ipython_dir=None,
374 374 context=None, debug=False,
375 375 sshserver=None, sshkey=None, password=None, paramiko=None,
376 376 timeout=10, cluster_id=None, **extra_args
377 377 ):
378 378 if profile:
379 379 super(Client, self).__init__(debug=debug, profile=profile)
380 380 else:
381 381 super(Client, self).__init__(debug=debug)
382 382 if context is None:
383 383 context = zmq.Context.instance()
384 384 self._context = context
385 385 self._stop_spinning = Event()
386 386
387 387 if 'url_or_file' in extra_args:
388 388 url_file = extra_args['url_or_file']
389 389 warnings.warn("url_or_file arg no longer supported, use url_file", DeprecationWarning)
390 390
391 391 if url_file and util.is_url(url_file):
392 392 raise ValueError("single urls cannot be specified, url-files must be used.")
393 393
394 394 self._setup_profile_dir(self.profile, profile_dir, ipython_dir)
395 395
396 396 if self._cd is not None:
397 397 if url_file is None:
398 398 if not cluster_id:
399 399 client_json = 'ipcontroller-client.json'
400 400 else:
401 401 client_json = 'ipcontroller-%s-client.json' % cluster_id
402 402 url_file = pjoin(self._cd.security_dir, client_json)
403 403 if url_file is None:
404 404 raise ValueError(
405 405 "I can't find enough information to connect to a hub!"
406 406 " Please specify at least one of url_file or profile."
407 407 )
408 408
409 409 with open(url_file) as f:
410 410 cfg = json.load(f)
411 411
412 412 self._task_scheme = cfg['task_scheme']
413 413
414 414 # sync defaults from args, json:
415 415 if sshserver:
416 416 cfg['ssh'] = sshserver
417 417
418 418 location = cfg.setdefault('location', None)
419 419
420 420 proto,addr = cfg['interface'].split('://')
421 421 addr = util.disambiguate_ip_address(addr, location)
422 422 cfg['interface'] = "%s://%s" % (proto, addr)
423 423
424 424 # turn interface,port into full urls:
425 425 for key in ('control', 'task', 'mux', 'iopub', 'notification', 'registration'):
426 426 cfg[key] = cfg['interface'] + ':%i' % cfg[key]
427 427
428 428 url = cfg['registration']
429 429
430 430 if location is not None and addr == LOCALHOST:
431 431 # location specified, and connection is expected to be local
432 432 if location not in LOCAL_IPS and not sshserver:
433 433 # load ssh from JSON *only* if the controller is not on
434 434 # this machine
435 435 sshserver=cfg['ssh']
436 436 if location not in LOCAL_IPS and not sshserver:
437 437 # warn if no ssh specified, but SSH is probably needed
438 438 # This is only a warning, because the most likely cause
439 439 # is a local Controller on a laptop whose IP is dynamic
440 440 warnings.warn("""
441 441 Controller appears to be listening on localhost, but not on this machine.
442 442 If this is true, you should specify Client(...,sshserver='you@%s')
443 443 or instruct your controller to listen on an external IP."""%location,
444 444 RuntimeWarning)
445 445 elif not sshserver:
446 446 # otherwise sync with cfg
447 447 sshserver = cfg['ssh']
448 448
449 449 self._config = cfg
450 450
451 451 self._ssh = bool(sshserver or sshkey or password)
452 452 if self._ssh and sshserver is None:
453 453 # default to ssh via localhost
454 454 sshserver = addr
455 455 if self._ssh and password is None:
456 456 if tunnel.try_passwordless_ssh(sshserver, sshkey, paramiko):
457 457 password=False
458 458 else:
459 459 password = getpass("SSH Password for %s: "%sshserver)
460 460 ssh_kwargs = dict(keyfile=sshkey, password=password, paramiko=paramiko)
461 461
462 462 # configure and construct the session
463 463 extra_args['packer'] = cfg['pack']
464 464 extra_args['unpacker'] = cfg['unpack']
465 465 extra_args['key'] = cast_bytes(cfg['exec_key'])
466 466
467 467 self.session = Session(**extra_args)
468 468
469 469 self._query_socket = self._context.socket(zmq.DEALER)
470 470
471 471 if self._ssh:
472 472 tunnel.tunnel_connection(self._query_socket, cfg['registration'], sshserver, **ssh_kwargs)
473 473 else:
474 474 self._query_socket.connect(cfg['registration'])
475 475
476 476 self.session.debug = self.debug
477 477
478 478 self._notification_handlers = {'registration_notification' : self._register_engine,
479 479 'unregistration_notification' : self._unregister_engine,
480 480 'shutdown_notification' : lambda msg: self.close(),
481 481 }
482 482 self._queue_handlers = {'execute_reply' : self._handle_execute_reply,
483 483 'apply_reply' : self._handle_apply_reply}
484 484 self._connect(sshserver, ssh_kwargs, timeout)
485 485
486 486 # last step: setup magics, if we are in IPython:
487 487
488 488 try:
489 489 ip = get_ipython()
490 490 except NameError:
491 491 return
492 492 else:
493 493 if 'px' not in ip.magics_manager.magics:
494 494 # in IPython but we are the first Client.
495 495 # activate a default view for parallel magics.
496 496 self.activate()
497 497
498 498 def __del__(self):
499 499 """cleanup sockets, but _not_ context."""
500 500 self.close()
501 501
502 502 def _setup_profile_dir(self, profile, profile_dir, ipython_dir):
503 503 if ipython_dir is None:
504 504 ipython_dir = get_ipython_dir()
505 505 if profile_dir is not None:
506 506 try:
507 507 self._cd = ProfileDir.find_profile_dir(profile_dir)
508 508 return
509 509 except ProfileDirError:
510 510 pass
511 511 elif profile is not None:
512 512 try:
513 513 self._cd = ProfileDir.find_profile_dir_by_name(
514 514 ipython_dir, profile)
515 515 return
516 516 except ProfileDirError:
517 517 pass
518 518 self._cd = None
519 519
520 520 def _update_engines(self, engines):
521 521 """Update our engines dict and _ids from a dict of the form: {id:uuid}."""
522 522 for k,v in engines.iteritems():
523 523 eid = int(k)
524 524 if eid not in self._engines:
525 525 self._ids.append(eid)
526 526 self._engines[eid] = v
527 527 self._ids = sorted(self._ids)
528 528 if sorted(self._engines.keys()) != range(len(self._engines)) and \
529 529 self._task_scheme == 'pure' and self._task_socket:
530 530 self._stop_scheduling_tasks()
531 531
532 532 def _stop_scheduling_tasks(self):
533 533 """Stop scheduling tasks because an engine has been unregistered
534 534 from a pure ZMQ scheduler.
535 535 """
536 536 self._task_socket.close()
537 537 self._task_socket = None
538 538 msg = "An engine has been unregistered, and we are using pure " +\
539 539 "ZMQ task scheduling. Task farming will be disabled."
540 540 if self.outstanding:
541 541 msg += " If you were running tasks when this happened, " +\
542 542 "some `outstanding` msg_ids may never resolve."
543 543 warnings.warn(msg, RuntimeWarning)
544 544
545 545 def _build_targets(self, targets):
546 546 """Turn valid target IDs or 'all' into two lists:
547 547 (int_ids, uuids).
548 548 """
549 549 if not self._ids:
550 550 # flush notification socket if no engines yet, just in case
551 551 if not self.ids:
552 552 raise error.NoEnginesRegistered("Can't build targets without any engines")
553 553
554 554 if targets is None:
555 555 targets = self._ids
556 556 elif isinstance(targets, basestring):
557 557 if targets.lower() == 'all':
558 558 targets = self._ids
559 559 else:
560 560 raise TypeError("%r not valid str target, must be 'all'"%(targets))
561 561 elif isinstance(targets, int):
562 562 if targets < 0:
563 563 targets = self.ids[targets]
564 564 if targets not in self._ids:
565 565 raise IndexError("No such engine: %i"%targets)
566 566 targets = [targets]
567 567
568 568 if isinstance(targets, slice):
569 569 indices = range(len(self._ids))[targets]
570 570 ids = self.ids
571 571 targets = [ ids[i] for i in indices ]
572 572
573 573 if not isinstance(targets, (tuple, list, xrange)):
574 574 raise TypeError("targets by int/slice/collection of ints only, not %s"%(type(targets)))
575 575
576 576 return [cast_bytes(self._engines[t]) for t in targets], list(targets)
577 577
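# What _build_targets accepts (illustrative; ids depend on the running cluster):
#
#     self._build_targets('all')           # every registered engine
#     self._build_targets(0)               # one id -> ([uuid_bytes], [0])
#     self._build_targets([0, 2])          # an explicit list of ids
#     self._build_targets(slice(0, 4, 2))  # slices index the sorted id list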
578 578 def _connect(self, sshserver, ssh_kwargs, timeout):
579 579 """setup all our socket connections to the cluster. This is called from
580 580 __init__."""
581 581
582 582 # Maybe allow reconnecting?
583 583 if self._connected:
584 584 return
585 585 self._connected=True
586 586
587 587 def connect_socket(s, url):
588 588 # url = util.disambiguate_url(url, self._config['location'])
589 589 if self._ssh:
590 590 return tunnel.tunnel_connection(s, url, sshserver, **ssh_kwargs)
591 591 else:
592 592 return s.connect(url)
593 593
594 594 self.session.send(self._query_socket, 'connection_request')
595 595 # use Poller because zmq.select has wrong units in pyzmq 2.1.7
596 596 poller = zmq.Poller()
597 597 poller.register(self._query_socket, zmq.POLLIN)
598 598 # poll expects milliseconds, timeout is seconds
599 599 evts = poller.poll(timeout*1000)
600 600 if not evts:
601 601 raise error.TimeoutError("Hub connection request timed out")
602 602 idents,msg = self.session.recv(self._query_socket,mode=0)
603 603 if self.debug:
604 604 pprint(msg)
605 605 content = msg['content']
606 606 # self._config['registration'] = dict(content)
607 607 cfg = self._config
608 608 if content['status'] == 'ok':
609 609 self._mux_socket = self._context.socket(zmq.DEALER)
610 610 connect_socket(self._mux_socket, cfg['mux'])
611 611
612 612 self._task_socket = self._context.socket(zmq.DEALER)
613 613 connect_socket(self._task_socket, cfg['task'])
614 614
615 615 self._notification_socket = self._context.socket(zmq.SUB)
616 616 self._notification_socket.setsockopt(zmq.SUBSCRIBE, b'')
617 617 connect_socket(self._notification_socket, cfg['notification'])
618 618
619 619 self._control_socket = self._context.socket(zmq.DEALER)
620 620 connect_socket(self._control_socket, cfg['control'])
621 621
622 622 self._iopub_socket = self._context.socket(zmq.SUB)
623 623 self._iopub_socket.setsockopt(zmq.SUBSCRIBE, b'')
624 624 connect_socket(self._iopub_socket, cfg['iopub'])
625 625
626 626 self._update_engines(dict(content['engines']))
627 627 else:
628 628 self._connected = False
629 629 raise Exception("Failed to connect!")
630 630
631 631 #--------------------------------------------------------------------------
632 632 # handlers and callbacks for incoming messages
633 633 #--------------------------------------------------------------------------
634 634
635 635 def _unwrap_exception(self, content):
636 636 """unwrap exception, and remap engine_id to int."""
637 637 e = error.unwrap_exception(content)
638 638 # print e.traceback
639 639 if e.engine_info:
640 640 e_uuid = e.engine_info['engine_uuid']
641 641 eid = self._engines[e_uuid]
642 642 e.engine_info['engine_id'] = eid
643 643 return e
644 644
645 645 def _extract_metadata(self, msg):
646 646 header = msg['header']
647 647 parent = msg['parent_header']
648 648 msg_meta = msg['metadata']
649 649 content = msg['content']
650 650 md = {'msg_id' : parent['msg_id'],
651 651 'received' : datetime.now(),
652 652 'engine_uuid' : msg_meta.get('engine', None),
653 653 'follow' : msg_meta.get('follow', []),
654 654 'after' : msg_meta.get('after', []),
655 655 'status' : content['status'],
656 656 }
657 657
658 658 if md['engine_uuid'] is not None:
659 659 md['engine_id'] = self._engines.get(md['engine_uuid'], None)
660 660
661 661 if 'date' in parent:
662 662 md['submitted'] = parent['date']
663 663 if 'started' in msg_meta:
664 664 md['started'] = msg_meta['started']
665 665 if 'date' in header:
666 666 md['completed'] = header['date']
667 667 return md
668 668
669 669 def _register_engine(self, msg):
670 670 """Register a new engine, and update our connection info."""
671 671 content = msg['content']
672 672 eid = content['id']
673 673 d = {eid : content['uuid']}
674 674 self._update_engines(d)
675 675
676 676 def _unregister_engine(self, msg):
677 677 """Unregister an engine that has died."""
678 678 content = msg['content']
679 679 eid = int(content['id'])
680 680 if eid in self._ids:
681 681 self._ids.remove(eid)
682 682 uuid = self._engines.pop(eid)
683 683
684 684 self._handle_stranded_msgs(eid, uuid)
685 685
686 686 if self._task_socket and self._task_scheme == 'pure':
687 687 self._stop_scheduling_tasks()
688 688
689 689 def _handle_stranded_msgs(self, eid, uuid):
690 690 """Handle messages known to be on an engine when the engine unregisters.
691 691
692 692 It is possible that this will fire prematurely - that is, an engine will
693 693 go down after completing a result, and the client will be notified
694 694 of the unregistration and later receive the successful result.
695 695 """
696 696
697 697 outstanding = self._outstanding_dict[uuid]
698 698
699 699 for msg_id in list(outstanding):
700 700 if msg_id in self.results:
701 701 # we already have the result
702 702 continue
703 703 try:
704 704 raise error.EngineError("Engine %r died while running task %r"%(eid, msg_id))
705 705 except:
706 706 content = error.wrap_exception()
707 707 # build a fake message:
708 708 msg = self.session.msg('apply_reply', content=content)
709 709 msg['parent_header']['msg_id'] = msg_id
710 710 msg['metadata']['engine'] = uuid
711 711 self._handle_apply_reply(msg)
712 712
713 713 def _handle_execute_reply(self, msg):
714 714 """Save the reply to an execute_request into our results.
715 715
716 716 Unlike apply replies, the result is wrapped in an ExecuteReply object.
717 717 """
718 718
719 719 parent = msg['parent_header']
720 720 msg_id = parent['msg_id']
721 721 if msg_id not in self.outstanding:
722 722 if msg_id in self.history:
723 723 print ("got stale result: %s"%msg_id)
724 724 else:
725 725 print ("got unknown result: %s"%msg_id)
726 726 else:
727 727 self.outstanding.remove(msg_id)
728 728
729 729 content = msg['content']
730 730 header = msg['header']
731 731
732 732 # construct metadata:
733 733 md = self.metadata[msg_id]
734 734 md.update(self._extract_metadata(msg))
735 735 # is this redundant?
736 736 self.metadata[msg_id] = md
737 737
738 738 e_outstanding = self._outstanding_dict[md['engine_uuid']]
739 739 if msg_id in e_outstanding:
740 740 e_outstanding.remove(msg_id)
741 741
742 742 # construct result:
743 743 if content['status'] == 'ok':
744 744 self.results[msg_id] = ExecuteReply(msg_id, content, md)
745 745 elif content['status'] == 'aborted':
746 746 self.results[msg_id] = error.TaskAborted(msg_id)
747 747 elif content['status'] == 'resubmitted':
748 748 # TODO: handle resubmission
749 749 pass
750 750 else:
751 751 self.results[msg_id] = self._unwrap_exception(content)
752 752
753 753 def _handle_apply_reply(self, msg):
754 754 """Save the reply to an apply_request into our results."""
755 755 parent = msg['parent_header']
756 756 msg_id = parent['msg_id']
757 757 if msg_id not in self.outstanding:
758 758 if msg_id in self.history:
759 759 print ("got stale result: %s"%msg_id)
760 760 print self.results[msg_id]
761 761 print msg
762 762 else:
763 763 print ("got unknown result: %s"%msg_id)
764 764 else:
765 765 self.outstanding.remove(msg_id)
766 766 content = msg['content']
767 767 header = msg['header']
768 768
769 769 # construct metadata:
770 770 md = self.metadata[msg_id]
771 771 md.update(self._extract_metadata(msg))
772 772 # is this redundant?
773 773 self.metadata[msg_id] = md
774 774
775 775 e_outstanding = self._outstanding_dict[md['engine_uuid']]
776 776 if msg_id in e_outstanding:
777 777 e_outstanding.remove(msg_id)
778 778
779 779 # construct result:
780 780 if content['status'] == 'ok':
781 781 self.results[msg_id] = serialize.unserialize_object(msg['buffers'])[0]
782 782 elif content['status'] == 'aborted':
783 783 self.results[msg_id] = error.TaskAborted(msg_id)
784 784 elif content['status'] == 'resubmitted':
785 785 # TODO: handle resubmission
786 786 pass
787 787 else:
788 788 self.results[msg_id] = self._unwrap_exception(content)
789 789
790 790 def _flush_notifications(self):
791 791 """Flush notifications of engine registrations waiting
792 792 in ZMQ queue."""
793 793 idents,msg = self.session.recv(self._notification_socket, mode=zmq.NOBLOCK)
794 794 while msg is not None:
795 795 if self.debug:
796 796 pprint(msg)
797 797 msg_type = msg['header']['msg_type']
798 798 handler = self._notification_handlers.get(msg_type, None)
799 799 if handler is None:
800 800 raise Exception("Unhandled message type: %s" % msg_type)
801 801 else:
802 802 handler(msg)
803 803 idents,msg = self.session.recv(self._notification_socket, mode=zmq.NOBLOCK)
804 804
805 805 def _flush_results(self, sock):
806 806 """Flush task or queue results waiting in ZMQ queue."""
807 807 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
808 808 while msg is not None:
809 809 if self.debug:
810 810 pprint(msg)
811 811 msg_type = msg['header']['msg_type']
812 812 handler = self._queue_handlers.get(msg_type, None)
813 813 if handler is None:
814 814 raise Exception("Unhandled message type: %s" % msg_type)
815 815 else:
816 816 handler(msg)
817 817 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
818 818
819 819 def _flush_control(self, sock):
820 820 """Flush replies from the control channel waiting
821 821 in the ZMQ queue.
822 822
823 823 Currently: ignore them."""
824 824 if self._ignored_control_replies <= 0:
825 825 return
826 826 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
827 827 while msg is not None:
828 828 self._ignored_control_replies -= 1
829 829 if self.debug:
830 830 pprint(msg)
831 831 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
832 832
833 833 def _flush_ignored_control(self):
834 834 """flush ignored control replies"""
835 835 while self._ignored_control_replies > 0:
836 836 self.session.recv(self._control_socket)
837 837 self._ignored_control_replies -= 1
838 838
839 839 def _flush_ignored_hub_replies(self):
840 840 ident,msg = self.session.recv(self._query_socket, mode=zmq.NOBLOCK)
841 841 while msg is not None:
842 842 ident,msg = self.session.recv(self._query_socket, mode=zmq.NOBLOCK)
843 843
844 844 def _flush_iopub(self, sock):
845 845 """Flush replies from the iopub channel waiting
846 846 in the ZMQ queue.
847 847 """
848 848 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
849 849 while msg is not None:
850 850 if self.debug:
851 851 pprint(msg)
852 852 parent = msg['parent_header']
853 853 # ignore IOPub messages with no parent.
854 854 # Caused by print statements or warnings from before the first execution.
855 855 if not parent:
856 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
856 857 continue
857 858 msg_id = parent['msg_id']
858 859 content = msg['content']
859 860 header = msg['header']
860 861 msg_type = msg['header']['msg_type']
861 862
862 863 # init metadata:
863 864 md = self.metadata[msg_id]
864 865
865 866 if msg_type == 'stream':
866 867 name = content['name']
867 868 s = md[name] or ''
868 869 md[name] = s + content['data']
869 870 elif msg_type == 'pyerr':
870 871 md.update({'pyerr' : self._unwrap_exception(content)})
871 872 elif msg_type == 'pyin':
872 873 md.update({'pyin' : content['code']})
873 874 elif msg_type == 'display_data':
874 875 md['outputs'].append(content)
875 876 elif msg_type == 'pyout':
876 877 md['pyout'] = content
877 878 elif msg_type == 'data_message':
878 879 data, remainder = serialize.unserialize_object(msg['buffers'])
879 880 md['data'].update(data)
880 881 elif msg_type == 'status':
881 882 # idle message comes after all outputs
882 883 if content['execution_state'] == 'idle':
883 884 md['outputs_ready'] = True
884 885 else:
885 886 # unhandled msg_type
886 887 pass
887 888
888 889 # redundant?
889 890 self.metadata[msg_id] = md
890 891
891 892 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
892 893
893 894 #--------------------------------------------------------------------------
894 895 # len, getitem
895 896 #--------------------------------------------------------------------------
896 897
897 898 def __len__(self):
898 899 """len(client) returns # of engines."""
899 900 return len(self.ids)
900 901
901 902 def __getitem__(self, key):
902 903 """index access returns DirectView multiplexer objects
903 904
904 905 Must be int, slice, or list/tuple/xrange of ints"""
905 906 if not isinstance(key, (int, slice, tuple, list, xrange)):
906 907 raise TypeError("key by int/slice/iterable of ints only, not %s"%(type(key)))
907 908 else:
908 909 return self.direct_view(key)
909 910
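# Index-access sketches (illustrative):
#
#     rc[0]       # DirectView on engine 0
#     rc[::2]     # DirectView on every other engine
#     rc[[0, 2]]  # DirectView on an explicit list of engines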
910 911 #--------------------------------------------------------------------------
911 912 # Begin public methods
912 913 #--------------------------------------------------------------------------
913 914
914 915 @property
915 916 def ids(self):
916 917 """Always up-to-date ids property."""
917 918 self._flush_notifications()
918 919 # always copy:
919 920 return list(self._ids)
920 921
921 922 def activate(self, targets='all', suffix=''):
922 923 """Create a DirectView and register it with IPython magics
923 924
924 925 Defines the magics `%px, %autopx, %pxresult, %%px`
925 926
926 927 Parameters
927 928 ----------
928 929
929 930 targets: int, list of ints, or 'all'
930 931 The engines on which the view's magics will run
931 932 suffix: str [default: '']
932 933 The suffix, if any, for the magics. This allows you to have
933 934 multiple views associated with parallel magics at the same time.
934 935
935 936 e.g. ``rc.activate(targets=0, suffix='0')`` will give you
936 937 the magics ``%px0``, ``%pxresult0``, etc. for running magics just
937 938 on engine 0.
938 939 """
939 940 view = self.direct_view(targets)
940 941 view.block = True
941 942 view.activate(suffix)
942 943 return view
943 944
944 945 def close(self):
945 946 if self._closed:
946 947 return
947 948 self.stop_spin_thread()
948 949 snames = filter(lambda n: n.endswith('socket'), dir(self))
949 950 for socket in map(lambda name: getattr(self, name), snames):
950 951 if isinstance(socket, zmq.Socket) and not socket.closed:
951 952 socket.close()
952 953 self._closed = True
953 954
954 955 def _spin_every(self, interval=1):
955 956 """target func for use in spin_thread"""
956 957 while True:
957 958 if self._stop_spinning.is_set():
958 959 return
959 960 time.sleep(interval)
960 961 self.spin()
961 962
962 963 def spin_thread(self, interval=1):
963 964 """call Client.spin() in a background thread on some regular interval
964 965
965 966 This helps ensure that messages don't pile up too much in the zmq queue
966 967 while you are working on other things, or just leaving an idle terminal.
967 968
968 969 It also helps limit potential padding of the `received` timestamp
969 970 on AsyncResult objects, used for timings.
970 971
971 972 Parameters
972 973 ----------
973 974
974 975 interval : float, optional
975 976 The interval on which to spin the client in the background thread
976 977 (simply passed to time.sleep).
977 978
978 979 Notes
979 980 -----
980 981
981 982 For precision timing, you may want to use this method to put a bound
982 983 on the jitter (in seconds) in `received` timestamps used
983 984 in AsyncResult.wall_time.
984 985
985 986 """
986 987 if self._spin_thread is not None:
987 988 self.stop_spin_thread()
988 989 self._stop_spinning.clear()
989 990 self._spin_thread = Thread(target=self._spin_every, args=(interval,))
990 991 self._spin_thread.daemon = True
991 992 self._spin_thread.start()
992 993
993 994 def stop_spin_thread(self):
994 995 """stop background spin_thread, if any"""
995 996 if self._spin_thread is not None:
996 997 self._stop_spinning.set()
997 998 self._spin_thread.join()
998 999 self._spin_thread = None
999 1000
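# Background-spin sketch (illustrative): keeps results draining while idle,
# which tightens the `received` timestamps used by AsyncResult.wall_time.
#
#     rc.spin_thread(interval=0.1)
#     # ... submit work, do other things ...
#     rc.stop_spin_thread()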
1000 1001 def spin(self):
1001 1002 """Flush any registration notifications and execution results
1002 1003 waiting in the ZMQ queue.
1003 1004 """
1004 1005 if self._notification_socket:
1005 1006 self._flush_notifications()
1006 1007 if self._iopub_socket:
1007 1008 self._flush_iopub(self._iopub_socket)
1008 1009 if self._mux_socket:
1009 1010 self._flush_results(self._mux_socket)
1010 1011 if self._task_socket:
1011 1012 self._flush_results(self._task_socket)
1012 1013 if self._control_socket:
1013 1014 self._flush_control(self._control_socket)
1014 1015 if self._query_socket:
1015 1016 self._flush_ignored_hub_replies()
1016 1017
1017 1018 def wait(self, jobs=None, timeout=-1):
1018 1019 """waits on one or more `jobs`, for up to `timeout` seconds.
1019 1020
1020 1021 Parameters
1021 1022 ----------
1022 1023
1023 1024 jobs : int, str, or list of ints and/or strs, or one or more AsyncResult objects
1024 1025 ints are indices to self.history
1025 1026 strs are msg_ids
1026 1027 default: wait on all outstanding messages
1027 1028 timeout : float
1028 1029 a time in seconds, after which to give up.
1029 1030 default is -1, which means no timeout
1030 1031
1031 1032 Returns
1032 1033 -------
1033 1034
1034 1035 True : when all msg_ids are done
1035 1036 False : timeout reached, some msg_ids still outstanding
1036 1037 """
1037 1038 tic = time.time()
1038 1039 if jobs is None:
1039 1040 theids = self.outstanding
1040 1041 else:
1041 1042 if isinstance(jobs, (int, basestring, AsyncResult)):
1042 1043 jobs = [jobs]
1043 1044 theids = set()
1044 1045 for job in jobs:
1045 1046 if isinstance(job, int):
1046 1047 # index access
1047 1048 job = self.history[job]
1048 1049 elif isinstance(job, AsyncResult):
1049 1050 map(theids.add, job.msg_ids)
1050 1051 continue
1051 1052 theids.add(job)
1052 1053 if not theids.intersection(self.outstanding):
1053 1054 return True
1054 1055 self.spin()
1055 1056 while theids.intersection(self.outstanding):
1056 1057 if timeout >= 0 and ( time.time()-tic ) > timeout:
1057 1058 break
1058 1059 time.sleep(1e-3)
1059 1060 self.spin()
1060 1061 return len(theids.intersection(self.outstanding)) == 0
1061 1062
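# wait() sketch (illustrative):
#
#     ar = rc[:].apply_async(time.sleep, 1)
#     rc.wait([ar], timeout=5)   # True if the sleeps finish within 5 seconds
#     rc.wait()                  # block until *all* outstanding work is done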
1062 1063 #--------------------------------------------------------------------------
1063 1064 # Control methods
1064 1065 #--------------------------------------------------------------------------
1065 1066
1066 1067 @spin_first
1067 1068 def clear(self, targets=None, block=None):
1068 1069 """Clear the namespace in target(s)."""
1069 1070 block = self.block if block is None else block
1070 1071 targets = self._build_targets(targets)[0]
1071 1072 for t in targets:
1072 1073 self.session.send(self._control_socket, 'clear_request', content={}, ident=t)
1073 1074 error = False
1074 1075 if block:
1075 1076 self._flush_ignored_control()
1076 1077 for i in range(len(targets)):
1077 1078 idents,msg = self.session.recv(self._control_socket,0)
1078 1079 if self.debug:
1079 1080 pprint(msg)
1080 1081 if msg['content']['status'] != 'ok':
1081 1082 error = self._unwrap_exception(msg['content'])
1082 1083 else:
1083 1084 self._ignored_control_replies += len(targets)
1084 1085 if error:
1085 1086 raise error
1086 1087
1087 1088
1088 1089 @spin_first
1089 1090 def abort(self, jobs=None, targets=None, block=None):
1090 1091 """Abort specific jobs from the execution queues of target(s).
1091 1092
1092 1093 This is a mechanism to prevent jobs that have already been submitted
1093 1094 from executing.
1094 1095
1095 1096 Parameters
1096 1097 ----------
1097 1098
1098 1099 jobs : msg_id, list of msg_ids, or AsyncResult
1099 1100 The jobs to be aborted
1100 1101
1101 1102 If unspecified/None: abort all outstanding jobs.
1102 1103
1103 1104 """
1104 1105 block = self.block if block is None else block
1105 1106 jobs = jobs if jobs is not None else list(self.outstanding)
1106 1107 targets = self._build_targets(targets)[0]
1107 1108
1108 1109 msg_ids = []
1109 1110 if isinstance(jobs, (basestring,AsyncResult)):
1110 1111 jobs = [jobs]
1111 1112 bad_ids = filter(lambda obj: not isinstance(obj, (basestring, AsyncResult)), jobs)
1112 1113 if bad_ids:
1113 1114 raise TypeError("Invalid msg_id type %r, expected str or AsyncResult"%bad_ids[0])
1114 1115 for j in jobs:
1115 1116 if isinstance(j, AsyncResult):
1116 1117 msg_ids.extend(j.msg_ids)
1117 1118 else:
1118 1119 msg_ids.append(j)
1119 1120 content = dict(msg_ids=msg_ids)
1120 1121 for t in targets:
1121 1122 self.session.send(self._control_socket, 'abort_request',
1122 1123 content=content, ident=t)
1123 1124 error = False
1124 1125 if block:
1125 1126 self._flush_ignored_control()
1126 1127 for i in range(len(targets)):
1127 1128 idents,msg = self.session.recv(self._control_socket,0)
1128 1129 if self.debug:
1129 1130 pprint(msg)
1130 1131 if msg['content']['status'] != 'ok':
1131 1132 error = self._unwrap_exception(msg['content'])
1132 1133 else:
1133 1134 self._ignored_control_replies += len(targets)
1134 1135 if error:
1135 1136 raise error
1136 1137
1137 1138 @spin_first
1138 1139 def shutdown(self, targets='all', restart=False, hub=False, block=None):
1139 1140 """Terminates one or more engine processes, optionally including the hub.
1140 1141
1141 1142 Parameters
1142 1143 ----------
1143 1144
1144 1145 targets: list of ints or 'all' [default: all]
1145 1146 Which engines to shutdown.
1146 1147 hub: bool [default: False]
1147 1148 Whether to include the Hub. hub=True implies targets='all'.
1148 1149 block: bool [default: self.block]
1149 1150 Whether to wait for clean shutdown replies or not.
1150 1151 restart: bool [default: False]
1151 1152 NOT IMPLEMENTED
1152 1153 whether to restart engines after shutting them down.
1153 1154 """
1154 1155 from IPython.parallel.error import NoEnginesRegistered
1155 1156 if restart:
1156 1157 raise NotImplementedError("Engine restart is not yet implemented")
1157 1158
1158 1159 block = self.block if block is None else block
1159 1160 if hub:
1160 1161 targets = 'all'
1161 1162 try:
1162 1163 targets = self._build_targets(targets)[0]
1163 1164 except NoEnginesRegistered:
1164 1165 targets = []
1165 1166 for t in targets:
1166 1167 self.session.send(self._control_socket, 'shutdown_request',
1167 1168 content={'restart':restart},ident=t)
1168 1169 error = False
1169 1170 if block or hub:
1170 1171 self._flush_ignored_control()
1171 1172 for i in range(len(targets)):
1172 1173 idents,msg = self.session.recv(self._control_socket, 0)
1173 1174 if self.debug:
1174 1175 pprint(msg)
1175 1176 if msg['content']['status'] != 'ok':
1176 1177 error = self._unwrap_exception(msg['content'])
1177 1178 else:
1178 1179 self._ignored_control_replies += len(targets)
1179 1180
1180 1181 if hub:
1181 1182 time.sleep(0.25)
1182 1183 self.session.send(self._query_socket, 'shutdown_request')
1183 1184 idents,msg = self.session.recv(self._query_socket, 0)
1184 1185 if self.debug:
1185 1186 pprint(msg)
1186 1187 if msg['content']['status'] != 'ok':
1187 1188 error = self._unwrap_exception(msg['content'])
1188 1189
1189 1190 if error:
1190 1191 raise error
1191 1192
1192 1193 #--------------------------------------------------------------------------
1193 1194 # Execution related methods
1194 1195 #--------------------------------------------------------------------------
1195 1196
1196 1197 def _maybe_raise(self, result):
1197 1198 """wrapper for maybe raising an exception if apply failed."""
1198 1199 if isinstance(result, error.RemoteError):
1199 1200 raise result
1200 1201
1201 1202 return result
1202 1203
1203 1204 def send_apply_request(self, socket, f, args=None, kwargs=None, metadata=None, track=False,
1204 1205 ident=None):
1205 1206 """construct and send an apply message via a socket.
1206 1207
1207 1208 This is the principal method with which all engine execution is performed by views.
1208 1209 """
1209 1210
1210 1211 if self._closed:
1211 1212 raise RuntimeError("Client cannot be used after its sockets have been closed")
1212 1213
1213 1214 # defaults:
1214 1215 args = args if args is not None else []
1215 1216 kwargs = kwargs if kwargs is not None else {}
1216 1217 metadata = metadata if metadata is not None else {}
1217 1218
1218 1219 # validate arguments
1219 1220 if not callable(f) and not isinstance(f, Reference):
1220 1221 raise TypeError("f must be callable, not %s"%type(f))
1221 1222 if not isinstance(args, (tuple, list)):
1222 1223 raise TypeError("args must be tuple or list, not %s"%type(args))
1223 1224 if not isinstance(kwargs, dict):
1224 1225 raise TypeError("kwargs must be dict, not %s"%type(kwargs))
1225 1226 if not isinstance(metadata, dict):
1226 1227 raise TypeError("metadata must be dict, not %s"%type(metadata))
1227 1228
1228 1229 bufs = serialize.pack_apply_message(f, args, kwargs,
1229 1230 buffer_threshold=self.session.buffer_threshold,
1230 1231 item_threshold=self.session.item_threshold,
1231 1232 )
1232 1233
1233 1234 msg = self.session.send(socket, "apply_request", buffers=bufs, ident=ident,
1234 1235 metadata=metadata, track=track)
1235 1236
1236 1237 msg_id = msg['header']['msg_id']
1237 1238 self.outstanding.add(msg_id)
1238 1239 if ident:
1239 1240 # possibly routed to a specific engine
1240 1241 if isinstance(ident, list):
1241 1242 ident = ident[-1]
1242 1243 if ident in self._engines.values():
1243 1244 # save for later, in case of engine death
1244 1245 self._outstanding_dict[ident].add(msg_id)
1245 1246 self.history.append(msg_id)
1246 1247 self.metadata[msg_id]['submitted'] = datetime.now()
1247 1248
1248 1249 return msg
1249 1250
1250 1251 def send_execute_request(self, socket, code, silent=True, metadata=None, ident=None):
1251 1252 """construct and send an execute request via a socket.
1252 1253
1253 1254 """
1254 1255
1255 1256 if self._closed:
1256 1257 raise RuntimeError("Client cannot be used after its sockets have been closed")
1257 1258
1258 1259 # defaults:
1259 1260 metadata = metadata if metadata is not None else {}
1260 1261
1261 1262 # validate arguments
1262 1263 if not isinstance(code, basestring):
1263 1264 raise TypeError("code must be text, not %s" % type(code))
1264 1265 if not isinstance(metadata, dict):
1265 1266 raise TypeError("metadata must be dict, not %s" % type(metadata))
1266 1267
1267 1268 content = dict(code=code, silent=bool(silent), user_variables=[], user_expressions={})
1268 1269
1269 1270
1270 1271 msg = self.session.send(socket, "execute_request", content=content, ident=ident,
1271 1272 metadata=metadata)
1272 1273
1273 1274 msg_id = msg['header']['msg_id']
1274 1275 self.outstanding.add(msg_id)
1275 1276 if ident:
1276 1277 # possibly routed to a specific engine
1277 1278 if isinstance(ident, list):
1278 1279 ident = ident[-1]
1279 1280 if ident in self._engines.values():
1280 1281 # save for later, in case of engine death
1281 1282 self._outstanding_dict[ident].add(msg_id)
1282 1283 self.history.append(msg_id)
1283 1284 self.metadata[msg_id]['submitted'] = datetime.now()
1284 1285
1285 1286 return msg
1286 1287
1287 1288 #--------------------------------------------------------------------------
1288 1289 # construct a View object
1289 1290 #--------------------------------------------------------------------------
1290 1291
1291 1292 def load_balanced_view(self, targets=None):
1292 1293 """construct a LoadBalancedView object.
1293 1294
1294 1295 If no arguments are specified, create a LoadBalancedView
1295 1296 using all engines.
1296 1297
1297 1298 Parameters
1298 1299 ----------
1299 1300
1300 1301 targets: list,slice,int,etc. [default: use all engines]
1301 1302 The subset of engines across which to load-balance
1302 1303 """
1303 1304 if targets == 'all':
1304 1305 targets = None
1305 1306 if targets is not None:
1306 1307 targets = self._build_targets(targets)[1]
1307 1308 return LoadBalancedView(client=self, socket=self._task_socket, targets=targets)
1308 1309
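# Load-balanced sketch (illustrative):
#
#     lview = rc.load_balanced_view()        # balance across all engines
#     lview = rc.load_balanced_view([0, 1])  # or across a subset
#     ar = lview.apply_async(pow, 2, 10)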
1309 1310 def direct_view(self, targets='all'):
1310 1311 """construct a DirectView object.
1311 1312
1312 1313 If no targets are specified, create a DirectView using all engines.
1313 1314
1314 1315 rc.direct_view('all') is distinguished from rc[:] in that 'all' will
1315 1316 evaluate the target engines at each execution, whereas rc[:] will connect to
1316 1317 all *current* engines, and that list will not change.
1317 1318
1318 1319 That is, 'all' will always use all engines, whereas rc[:] will not use
1319 1320 engines added after the DirectView is constructed.
1320 1321
1321 1322 Parameters
1322 1323 ----------
1323 1324
1324 1325 targets: list,slice,int,etc. [default: use all engines]
1325 1326 The engines to use for the View
1326 1327 """
1327 1328 single = isinstance(targets, int)
1328 1329 # allow 'all' to be lazily evaluated at each execution
1329 1330 if targets != 'all':
1330 1331 targets = self._build_targets(targets)[1]
1331 1332 if single:
1332 1333 targets = targets[0]
1333 1334 return DirectView(client=self, socket=self._mux_socket, targets=targets)
1334 1335
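# The 'all' vs rc[:] distinction in practice (illustrative):
#
#     dv_all = rc.direct_view('all')  # targets re-evaluated at each execution
#     dv_now = rc[:]                  # pinned to the engines registered right now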
1335 1336 #--------------------------------------------------------------------------
1336 1337 # Query methods
1337 1338 #--------------------------------------------------------------------------
1338 1339
1339 1340 @spin_first
1340 1341 def get_result(self, indices_or_msg_ids=None, block=None):
1341 1342 """Retrieve a result by msg_id or history index, wrapped in an AsyncResult object.
1342 1343
1343 1344 If the client already has the results, no request to the Hub will be made.
1344 1345
1345 1346 This is a convenient way to construct AsyncResult objects, which are wrappers
1346 1347 that include metadata about execution, and allow for awaiting results that
1347 1348 were not submitted by this Client.
1348 1349
1349 1350 It can also be a convenient way to retrieve the metadata associated with
1350 1351 blocking execution, since it always retrieves the metadata as well.
1351 1352
1352 1353 Examples
1353 1354 --------
1354 1355 ::
1355 1356
1356 1357 In [10]: ar = client.get_result(msg_id)
1357 1358
1358 1359 Parameters
1359 1360 ----------
1360 1361
1361 1362 indices_or_msg_ids : integer history index, str msg_id, or list of either
1362 1363 The history indices or msg_ids of the results to be retrieved
1363 1364
1364 1365 block : bool
1365 1366 Whether to wait for the result to be done
1366 1367
1367 1368 Returns
1368 1369 -------
1369 1370
1370 1371 AsyncResult
1371 1372 A single AsyncResult object will always be returned.
1372 1373
1373 1374 AsyncHubResult
1374 1375 A subclass of AsyncResult that retrieves results from the Hub
1375 1376
1376 1377 """
1377 1378 block = self.block if block is None else block
1378 1379 if indices_or_msg_ids is None:
1379 1380 indices_or_msg_ids = -1
1380 1381
1381 1382 if not isinstance(indices_or_msg_ids, (list,tuple)):
1382 1383 indices_or_msg_ids = [indices_or_msg_ids]
1383 1384
1384 1385 theids = []
1385 1386 for id in indices_or_msg_ids:
1386 1387 if isinstance(id, int):
1387 1388 id = self.history[id]
1388 1389 if not isinstance(id, basestring):
1389 1390 raise TypeError("indices must be str or int, not %r"%id)
1390 1391 theids.append(id)
1391 1392
1392 1393 local_ids = filter(lambda msg_id: msg_id in self.outstanding or msg_id in self.results, theids)
1393 1394 remote_ids = filter(lambda msg_id: msg_id not in local_ids, theids)
1394 1395
1395 1396 if remote_ids:
1396 1397 ar = AsyncHubResult(self, msg_ids=theids)
1397 1398 else:
1398 1399 ar = AsyncResult(self, msg_ids=theids)
1399 1400
1400 1401 if block:
1401 1402 ar.wait()
1402 1403
1403 1404 return ar
1404 1405
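# get_result sketch (illustrative): rebuild an AsyncResult after the fact.
#
#     ar = rc.get_result(-1)       # most recent submission, by history index
#     ar = rc.get_result(msg_id)   # or by msg_id, even if another client sent it
#     ar.wait()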
1405 1406 @spin_first
1406 1407 def resubmit(self, indices_or_msg_ids=None, metadata=None, block=None):
1407 1408 """Resubmit one or more tasks.
1408 1409
1409 1410 in-flight tasks may not be resubmitted.
1410 1411
1411 1412 Parameters
1412 1413 ----------
1413 1414
1414 1415 indices_or_msg_ids : integer history index, str msg_id, or list of either
1415 1416 The history indices or msg_ids of the results to be retrieved
1416 1417
1417 1418 block : bool
1418 1419 Whether to wait for the result to be done
1419 1420
1420 1421 Returns
1421 1422 -------
1422 1423
1423 1424 AsyncHubResult
1424 1425 A subclass of AsyncResult that retrieves results from the Hub
1425 1426
1426 1427 """
1427 1428 block = self.block if block is None else block
1428 1429 if indices_or_msg_ids is None:
1429 1430 indices_or_msg_ids = -1
1430 1431
1431 1432 if not isinstance(indices_or_msg_ids, (list,tuple)):
1432 1433 indices_or_msg_ids = [indices_or_msg_ids]
1433 1434
1434 1435 theids = []
1435 1436 for id in indices_or_msg_ids:
1436 1437 if isinstance(id, int):
1437 1438 id = self.history[id]
1438 1439 if not isinstance(id, basestring):
1439 1440 raise TypeError("indices must be str or int, not %r"%id)
1440 1441 theids.append(id)
1441 1442
1442 1443 content = dict(msg_ids = theids)
1443 1444
1444 1445 self.session.send(self._query_socket, 'resubmit_request', content)
1445 1446
1446 1447 zmq.select([self._query_socket], [], [])
1447 1448 idents,msg = self.session.recv(self._query_socket, zmq.NOBLOCK)
1448 1449 if self.debug:
1449 1450 pprint(msg)
1450 1451 content = msg['content']
1451 1452 if content['status'] != 'ok':
1452 1453 raise self._unwrap_exception(content)
1453 1454 mapping = content['resubmitted']
1454 1455 new_ids = [ mapping[msg_id] for msg_id in theids ]
1455 1456
1456 1457 ar = AsyncHubResult(self, msg_ids=new_ids)
1457 1458
1458 1459 if block:
1459 1460 ar.wait()
1460 1461
1461 1462 return ar
1462 1463
1463 1464 @spin_first
1464 1465 def result_status(self, msg_ids, status_only=True):
1465 1466 """Check on the status of the result(s) of the apply request with `msg_ids`.
1466 1467
1467 1468 If status_only is False, then the actual results will be retrieved, else
1468 1469 only the status of the results will be checked.
1469 1470
1470 1471 Parameters
1471 1472 ----------
1472 1473
1473 1474 msg_ids : list of msg_ids
1474 1475 if int:
1475 1476 Passed as index to self.history for convenience.
1476 1477 status_only : bool (default: True)
1477 1478 if False:
1478 1479 Retrieve the actual results of completed tasks.
1479 1480
1480 1481 Returns
1481 1482 -------
1482 1483
1483 1484 results : dict
1484 1485 There will always be the keys 'pending' and 'completed', which will
1485 1486 be lists of msg_ids that are incomplete or complete. If `status_only`
1486 1487 is False, then completed results will be keyed by their `msg_id`.
1487 1488 """
1488 1489 if not isinstance(msg_ids, (list,tuple)):
1489 1490 msg_ids = [msg_ids]
1490 1491
1491 1492 theids = []
1492 1493 for msg_id in msg_ids:
1493 1494 if isinstance(msg_id, int):
1494 1495 msg_id = self.history[msg_id]
1495 1496 if not isinstance(msg_id, basestring):
1496 1497 raise TypeError("msg_ids must be str, not %r"%msg_id)
1497 1498 theids.append(msg_id)
1498 1499
1499 1500 completed = []
1500 1501 local_results = {}
1501 1502
1502 1503 # comment this block out to temporarily disable local shortcut:
1503 1504 for msg_id in theids:
1504 1505 if msg_id in self.results:
1505 1506 completed.append(msg_id)
1506 1507 local_results[msg_id] = self.results[msg_id]
1507 1508 theids.remove(msg_id)

        if theids: # some not locally cached
            content = dict(msg_ids=theids, status_only=status_only)
            msg = self.session.send(self._query_socket, "result_request", content=content)
            zmq.select([self._query_socket], [], [])
            idents,msg = self.session.recv(self._query_socket, zmq.NOBLOCK)
            if self.debug:
                pprint(msg)
            content = msg['content']
            if content['status'] != 'ok':
                raise self._unwrap_exception(content)
            buffers = msg['buffers']
        else:
            content = dict(completed=[],pending=[])

        content['completed'].extend(completed)

        if status_only:
            return content

        failures = []
        # load cached results into result:
        content.update(local_results)

        # update cache with results:
        for msg_id in sorted(theids):
            if msg_id in content['completed']:
                rec = content[msg_id]
                parent = rec['header']
                header = rec['result_header']
                rcontent = rec['result_content']
                iodict = rec['io']
                if isinstance(rcontent, str):
                    rcontent = self.session.unpack(rcontent)

                md = self.metadata[msg_id]
                md_msg = dict(
                    content=rcontent,
                    parent_header=parent,
                    header=header,
                    metadata=rec['result_metadata'],
                )
                md.update(self._extract_metadata(md_msg))
                if rec.get('received'):
                    md['received'] = rec['received']
                md.update(iodict)

                if rcontent['status'] == 'ok':
                    if header['msg_type'] == 'apply_reply':
                        res,buffers = serialize.unserialize_object(buffers)
                    elif header['msg_type'] == 'execute_reply':
                        res = ExecuteReply(msg_id, rcontent, md)
                    else:
                        raise KeyError("unhandled msg type: %r" % header['msg_type'])
                else:
                    res = self._unwrap_exception(rcontent)
                    failures.append(res)

                self.results[msg_id] = res
                content[msg_id] = res

        if len(theids) == 1 and failures:
            raise failures[0]

        error.collect_exceptions(failures, "result_status")
        return content

    @spin_first
    def queue_status(self, targets='all', verbose=False):
        """Fetch the status of engine queues.

        Parameters
        ----------

        targets : int/str/list of ints/strs
            the engines whose states are to be queried.
            default : all
        verbose : bool
            Whether to return lengths only, or lists of ids for each element
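
        Examples
        --------

        A sketch of typical usage (illustrative; assumes a connected `Client`
        instance ``rc``; the per-engine keys shown are those typically
        reported by the Hub)::

            q = rc.queue_status()    # dict keyed by engine id, plus 'unassigned'
            q0 = rc.queue_status(targets=0)
            print(q0)                # e.g. {'queue': 0, 'completed': 10, 'tasks': 0}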
        """
        if targets == 'all':
            # allow 'all' to be evaluated on the engine
            engine_ids = None
        else:
            engine_ids = self._build_targets(targets)[1]
        content = dict(targets=engine_ids, verbose=verbose)
        self.session.send(self._query_socket, "queue_request", content=content)
        idents,msg = self.session.recv(self._query_socket, 0)
        if self.debug:
            pprint(msg)
        content = msg['content']
        status = content.pop('status')
        if status != 'ok':
            raise self._unwrap_exception(content)
        content = rekey(content)
        if isinstance(targets, int):
            return content[targets]
        else:
            return content

    def _build_msgids_from_target(self, targets=None):
        """Build a list of msg_ids from the list of engine targets"""
        if not targets: # needed as _build_targets otherwise uses all engines
            return []
        target_ids = self._build_targets(targets)[0]
        return filter(lambda md_id: self.metadata[md_id]["engine_uuid"] in target_ids, self.metadata)

    def _build_msgids_from_jobs(self, jobs=None):
        """Build a list of msg_ids from "jobs" """
        if not jobs:
            return []
        msg_ids = []
        if isinstance(jobs, (basestring,AsyncResult)):
            jobs = [jobs]
        bad_ids = filter(lambda obj: not isinstance(obj, (basestring, AsyncResult)), jobs)
        if bad_ids:
            raise TypeError("Invalid msg_id type %r, expected str or AsyncResult"%bad_ids[0])
        for j in jobs:
            if isinstance(j, AsyncResult):
                msg_ids.extend(j.msg_ids)
            else:
                msg_ids.append(j)
        return msg_ids

    def purge_local_results(self, jobs=[], targets=[]):
        """Clears the client caches of results, freeing up the memory they occupy.

        Individual results can be purged by msg_id, or the entire
        history of specific targets can be purged.

        Use `purge_local_results('all')` to scrub everything from the Client's cache.

        The client must have no outstanding tasks before purging the caches.
        Raises `AssertionError` if there are still outstanding tasks.

        After this call all `AsyncResults` are invalid and should be discarded.

        If you must "reget" the results, you can still do so by using
        `client.get_result(msg_id)` or `client.get_result(asyncresult)`. This will
        redownload the results from the hub if they are still available
        (i.e. `client.purge_hub_results(...)` has not been called).

        Parameters
        ----------

        jobs : str or list of str or AsyncResult objects
            the msg_ids whose results should be purged.
        targets : int/str/list of ints/strs
            The targets, by int_id, whose entire results are to be purged.

            default : None
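
        Examples
        --------

        A sketch of typical usage (illustrative; assumes a connected `Client`
        instance ``rc`` with no outstanding tasks, and an `AsyncResult` ``ar``
        from an earlier submission)::

            rc.purge_local_results(jobs=ar)      # drop one job's cached results
            rc.purge_local_results(targets=[1])  # drop everything run on engine 1
            rc.purge_local_results('all')        # scrub the entire local cache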
        """
        assert not self.outstanding, "Can't purge a client with outstanding tasks!"

        if not targets and not jobs:
            raise ValueError("Must specify at least one of `targets` and `jobs`")

        if jobs == 'all':
            self.results.clear()
            self.metadata.clear()
            return
        else:
            msg_ids = []
            msg_ids.extend(self._build_msgids_from_target(targets))
            msg_ids.extend(self._build_msgids_from_jobs(jobs))
            # drop each purged entry from both local caches
            for msg_id in msg_ids:
                self.results.pop(msg_id)
                self.metadata.pop(msg_id)


    @spin_first
    def purge_hub_results(self, jobs=[], targets=[]):
        """Tell the Hub to forget results.

        Individual results can be purged by msg_id, or the entire
        history of specific targets can be purged.

        Use `purge_hub_results('all')` to scrub everything from the Hub's db.

        Parameters
        ----------

        jobs : str or list of str or AsyncResult objects
            the msg_ids whose results should be forgotten.
        targets : int/str/list of ints/strs
            The targets, by int_id, whose entire history is to be purged.

            default : None
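
        Examples
        --------

        A sketch of typical usage (illustrative; assumes a connected `Client`
        instance ``rc`` and an `AsyncResult` ``ar``)::

            rc.purge_hub_results(jobs=ar.msg_ids)  # forget specific msg_ids
            rc.purge_hub_results(targets=[0, 1])   # forget engines 0 and 1 entirely
            rc.purge_hub_results('all')            # scrub the Hub's db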
        """
        if not targets and not jobs:
            raise ValueError("Must specify at least one of `targets` and `jobs`")
        if targets:
            targets = self._build_targets(targets)[1]

        # construct msg_ids from jobs
        if jobs == 'all':
            msg_ids = jobs
        else:
            msg_ids = self._build_msgids_from_jobs(jobs)

        content = dict(engine_ids=targets, msg_ids=msg_ids)
        self.session.send(self._query_socket, "purge_request", content=content)
        idents, msg = self.session.recv(self._query_socket, 0)
        if self.debug:
            pprint(msg)
        content = msg['content']
        if content['status'] != 'ok':
            raise self._unwrap_exception(content)

    def purge_results(self, jobs=[], targets=[]):
        """Clears the cached results from both the hub and the local client

        Individual results can be purged by msg_id, or the entire
        history of specific targets can be purged.

        Use `purge_results('all')` to scrub every cached result from both the Hub's and
        the Client's db.

        Equivalent to calling both `purge_hub_results()` and `purge_local_results()` with
        the same arguments.

        Parameters
        ----------

        jobs : str or list of str or AsyncResult objects
            the msg_ids whose results should be forgotten.
        targets : int/str/list of ints/strs
            The targets, by int_id, whose entire history is to be purged.

            default : None
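
        Examples
        --------

        A sketch of typical usage (illustrative; assumes a connected `Client`
        instance ``rc`` with no outstanding tasks)::

            rc.purge_results(jobs=rc.history[-1])  # drop the most recent job everywhere
            rc.purge_results('all')                # scrub both the Hub's db and the local cache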
        """
        self.purge_local_results(jobs=jobs, targets=targets)
        self.purge_hub_results(jobs=jobs, targets=targets)

    def purge_everything(self):
        """Clears all content from previous Tasks from both the hub and the local client

        In addition to calling `purge_results("all")` it also deletes the history and
        other bookkeeping lists.
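
        Examples
        --------

        A sketch of typical usage (illustrative; assumes a connected `Client`
        instance ``rc`` with no outstanding tasks)::

            rc.purge_everything()   # results, metadata, history, and digest state are gone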
        """
        self.purge_results("all")
        self.history = []
        self.session.digest_history.clear()

    @spin_first
    def hub_history(self):
        """Get the Hub's history

        Just like the Client, the Hub has a history, which is a list of msg_ids.
        This will contain the history of all clients, and, depending on configuration,
        may contain history across multiple cluster sessions.

        Any msg_id returned here is a valid argument to `get_result`.

        Returns
        -------

        msg_ids : list of strs
            list of all msg_ids, ordered by task submission time.
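
        Examples
        --------

        A sketch of typical usage (illustrative; assumes a connected `Client`
        instance ``rc`` on a cluster that has already run some tasks)::

            hist = rc.hub_history()
            ar = rc.get_result(hist[-1])  # fetch the most recently submitted task
            print(ar.get())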
        """

        self.session.send(self._query_socket, "history_request", content={})
        idents, msg = self.session.recv(self._query_socket, 0)

        if self.debug:
            pprint(msg)
        content = msg['content']
        if content['status'] != 'ok':
            raise self._unwrap_exception(content)
        else:
            return content['history']

    @spin_first
    def db_query(self, query, keys=None):
        """Query the Hub's TaskRecord database

        This will return a list of task record dicts that match `query`

        Parameters
        ----------

        query : mongodb query dict
            The search dict. See mongodb query docs for details.
        keys : list of strs [optional]
            The subset of keys to be returned. The default is to fetch everything but buffers.
            'msg_id' will *always* be included.
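
        Examples
        --------

        A sketch of typical usage (illustrative; assumes a connected `Client`
        instance ``rc``, and that `submitted` and `completed` are available
        TaskRecord keys)::

            from datetime import datetime, timedelta

            # records of tasks submitted in the last hour, trimmed to two keys
            hour_ago = datetime.now() - timedelta(hours=1)
            records = rc.db_query({'submitted': {'$gte': hour_ago}},
                                  keys=['msg_id', 'completed'])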
        """
        if isinstance(keys, basestring):
            keys = [keys]
        content = dict(query=query, keys=keys)
        self.session.send(self._query_socket, "db_request", content=content)
        idents, msg = self.session.recv(self._query_socket, 0)
        if self.debug:
            pprint(msg)
        content = msg['content']
        if content['status'] != 'ok':
            raise self._unwrap_exception(content)

        records = content['records']

        buffer_lens = content['buffer_lens']
        result_buffer_lens = content['result_buffer_lens']
        buffers = msg['buffers']
        has_bufs = buffer_lens is not None
        has_rbufs = result_buffer_lens is not None
        for i,rec in enumerate(records):
            # relink buffers
            if has_bufs:
                blen = buffer_lens[i]
                rec['buffers'], buffers = buffers[:blen],buffers[blen:]
            if has_rbufs:
                blen = result_buffer_lens[i]
                rec['result_buffers'], buffers = buffers[:blen],buffers[blen:]

        return records


__all__ = [ 'Client' ]