##// END OF EJS Templates
fill out client.shutdown docstring...
MinRK -
Show More
@@ -1,1694 +1,1712
1 1 """A semi-synchronous Client for the ZMQ cluster
2 2
3 3 Authors:
4 4
5 5 * MinRK
6 6 """
7 7 #-----------------------------------------------------------------------------
8 8 # Copyright (C) 2010-2011 The IPython Development Team
9 9 #
10 10 # Distributed under the terms of the BSD License. The full license is in
11 11 # the file COPYING, distributed as part of this software.
12 12 #-----------------------------------------------------------------------------
13 13
14 14 #-----------------------------------------------------------------------------
15 15 # Imports
16 16 #-----------------------------------------------------------------------------
17 17
18 18 import os
19 19 import json
20 20 import sys
21 21 from threading import Thread, Event
22 22 import time
23 23 import warnings
24 24 from datetime import datetime
25 25 from getpass import getpass
26 26 from pprint import pprint
27 27
28 28 pjoin = os.path.join
29 29
30 30 import zmq
31 31 # from zmq.eventloop import ioloop, zmqstream
32 32
33 33 from IPython.config.configurable import MultipleInstanceError
34 34 from IPython.core.application import BaseIPythonApplication
35 35 from IPython.core.profiledir import ProfileDir, ProfileDirError
36 36
37 37 from IPython.utils.coloransi import TermColors
38 38 from IPython.utils.jsonutil import rekey
39 39 from IPython.utils.localinterfaces import LOCAL_IPS
40 40 from IPython.utils.path import get_ipython_dir
41 41 from IPython.utils.py3compat import cast_bytes
42 42 from IPython.utils.traitlets import (HasTraits, Integer, Instance, Unicode,
43 43 Dict, List, Bool, Set, Any)
44 44 from IPython.external.decorator import decorator
45 45 from IPython.external.ssh import tunnel
46 46
47 47 from IPython.parallel import Reference
48 48 from IPython.parallel import error
49 49 from IPython.parallel import util
50 50
51 51 from IPython.zmq.session import Session, Message
52 52
53 53 from .asyncresult import AsyncResult, AsyncHubResult
54 54 from .view import DirectView, LoadBalancedView
55 55
56 56 if sys.version_info[0] >= 3:
57 57 # xrange is used in a couple 'isinstance' tests in py2
58 58 # should be just 'range' in 3k
59 59 xrange = range
60 60
61 61 #--------------------------------------------------------------------------
62 62 # Decorators for Client methods
63 63 #--------------------------------------------------------------------------
64 64
@decorator
def spin_first(f, self, *args, **kwargs):
    """Call spin() to sync state prior to calling the method.

    Decorator for Client methods: flushes incoming results and
    registration-state messages so `f` sees up-to-date client state.
    """
    self.spin()
    return f(self, *args, **kwargs)
70 70
71 71
72 72 #--------------------------------------------------------------------------
73 73 # Classes
74 74 #--------------------------------------------------------------------------
75 75
76 76
class ExecuteReply(object):
    """wrapper for finished Execute results

    Provides dict-style and attribute access to the reply metadata, plus
    the IPython ``_repr_*_`` display hooks so an engine's output can be
    rendered by a frontend.

    Parameters
    ----------
    msg_id : str
        msg_id of the execute request this is the reply to
    content : dict
        content of the execute_reply message; must contain 'execution_count'
    metadata : dict
        metadata dict accumulated for this msg_id (must contain 'pyout')
    """
    def __init__(self, msg_id, content, metadata):
        self.msg_id = msg_id
        self._content = content
        self.execution_count = content['execution_count']
        self.metadata = metadata

    def _pyout_data(self):
        """Return the mime-bundle dict of the pyout message ({} if no pyout)."""
        pyout = self.metadata['pyout'] or {'data':{}}
        return pyout['data']

    def __getitem__(self, key):
        return self.metadata[key]

    def __getattr__(self, key):
        # attribute access falls back to metadata keys
        if key not in self.metadata:
            raise AttributeError(key)
        return self.metadata[key]

    def __repr__(self):
        text_out = self._pyout_data().get('text/plain', '')
        # truncate long reprs so the summary stays on one short line
        if len(text_out) > 32:
            text_out = text_out[:29] + '...'

        return "<ExecuteReply[%i]: %s>" % (self.execution_count, text_out)

    def _repr_pretty_(self, p, cycle):
        text_out = self._pyout_data().get('text/plain', '')

        if not text_out:
            return

        # colorize only when running inside IPython with colors enabled
        try:
            ip = get_ipython()
        except NameError:
            colors = "NoColor"
        else:
            colors = ip.colors

        if colors == "NoColor":
            out = normal = ""
        else:
            out = TermColors.Red
            normal = TermColors.Normal

        if '\n' in text_out and not text_out.startswith('\n'):
            # add newline for multiline reprs
            text_out = '\n' + text_out

        p.text(
            out + u'Out[%i:%i]: ' % (
                self.metadata['engine_id'], self.execution_count
            ) + normal + text_out
        )

    def _repr_html_(self):
        return self._pyout_data().get("text/html")

    def _repr_latex_(self):
        return self._pyout_data().get("text/latex")

    def _repr_json_(self):
        return self._pyout_data().get("application/json")

    def _repr_javascript_(self):
        return self._pyout_data().get("application/javascript")

    def _repr_png_(self):
        return self._pyout_data().get("image/png")

    def _repr_jpeg_(self):
        return self._pyout_data().get("image/jpeg")

    def _repr_svg_(self):
        return self._pyout_data().get("image/svg+xml")
158 158
159 159
class Metadata(dict):
    """Subclass of dict for initializing metadata values.

    Attribute access works on keys.

    These objects have a strict set of keys - errors will raise if you try
    to add new keys.
    """
    def __init__(self, *args, **kwargs):
        dict.__init__(self)
        # the fixed key set every Metadata instance carries
        defaults = {
            'msg_id' : None,
            'submitted' : None,
            'started' : None,
            'completed' : None,
            'received' : None,
            'engine_uuid' : None,
            'engine_id' : None,
            'follow' : None,
            'after' : None,
            'status' : None,

            'pyin' : None,
            'pyout' : None,
            'pyerr' : None,
            'stdout' : '',
            'stderr' : '',
            'outputs' : [],
        }
        self.update(defaults)
        # note: dict.update bypasses __setitem__, so the caller's
        # initial values are not subject to strict key checking here
        self.update(dict(*args, **kwargs))

    def __getattr__(self, key):
        """getattr aliased to getitem"""
        if key not in self.iterkeys():
            raise AttributeError(key)
        return self[key]

    def __setattr__(self, key, value):
        """setattr aliased to setitem, with strict"""
        if key not in self.iterkeys():
            raise AttributeError(key)
        self[key] = value

    def __setitem__(self, key, value):
        """strict static key enforcement"""
        if key not in self.iterkeys():
            raise KeyError(key)
        dict.__setitem__(self, key, value)
211 211
212 212
213 213 class Client(HasTraits):
214 214 """A semi-synchronous client to the IPython ZMQ cluster
215 215
216 216 Parameters
217 217 ----------
218 218
219 219 url_or_file : bytes or unicode; zmq url or path to ipcontroller-client.json
220 220 Connection information for the Hub's registration. If a json connector
221 221 file is given, then likely no further configuration is necessary.
222 222 [Default: use profile]
223 223 profile : bytes
224 224 The name of the Cluster profile to be used to find connector information.
225 225 If run from an IPython application, the default profile will be the same
226 226 as the running application, otherwise it will be 'default'.
227 227 context : zmq.Context
228 228 Pass an existing zmq.Context instance, otherwise the client will create its own.
229 229 debug : bool
230 230 flag for lots of message printing for debug purposes
231 231 timeout : int/float
232 232 time (in seconds) to wait for connection replies from the Hub
233 233 [Default: 10]
234 234
235 235 #-------------- session related args ----------------
236 236
237 237 config : Config object
238 238 If specified, this will be relayed to the Session for configuration
239 239 username : str
240 240 set username for the session object
241 241 packer : str (import_string) or callable
242 242 Can be either the simple keyword 'json' or 'pickle', or an import_string to a
243 243 function to serialize messages. Must support same input as
244 244 JSON, and output must be bytes.
245 245 You can pass a callable directly as `pack`
246 246 unpacker : str (import_string) or callable
247 247 The inverse of packer. Only necessary if packer is specified as *not* one
248 248 of 'json' or 'pickle'.
249 249
250 250 #-------------- ssh related args ----------------
251 251 # These are args for configuring the ssh tunnel to be used
252 252 # credentials are used to forward connections over ssh to the Controller
253 253 # Note that the ip given in `addr` needs to be relative to sshserver
254 254 # The most basic case is to leave addr as pointing to localhost (127.0.0.1),
255 255 # and set sshserver as the same machine the Controller is on. However,
256 256 # the only requirement is that sshserver is able to see the Controller
257 257 # (i.e. is within the same trusted network).
258 258
259 259 sshserver : str
260 260 A string of the form passed to ssh, i.e. 'server.tld' or 'user@server.tld:port'
261 261 If keyfile or password is specified, and this is not, it will default to
262 262 the ip given in addr.
263 263 sshkey : str; path to ssh private key file
264 264 This specifies a key to be used in ssh login, default None.
265 265 Regular default ssh keys will be used without specifying this argument.
266 266 password : str
267 267 Your ssh password to sshserver. Note that if this is left None,
268 268 you will be prompted for it if passwordless key based login is unavailable.
269 269 paramiko : bool
270 270 flag for whether to use paramiko instead of shell ssh for tunneling.
271 271 [default: True on win32, False else]
272 272
273 273 ------- exec authentication args -------
274 274 If even localhost is untrusted, you can have some protection against
275 275 unauthorized execution by signing messages with HMAC digests.
276 276 Messages are still sent as cleartext, so if someone can snoop your
277 277 loopback traffic this will not protect your privacy, but will prevent
278 278 unauthorized execution.
279 279
280 280 exec_key : str
281 281 an authentication key or file containing a key
282 282 default: None
283 283
284 284
285 285 Attributes
286 286 ----------
287 287
288 288 ids : list of int engine IDs
289 289 requesting the ids attribute always synchronizes
290 290 the registration state. To request ids without synchronization,
291 291 use semi-private _ids attributes.
292 292
293 293 history : list of msg_ids
294 294 a list of msg_ids, keeping track of all the execution
295 295 messages you have submitted in order.
296 296
297 297 outstanding : set of msg_ids
298 298 a set of msg_ids that have been submitted, but whose
299 299 results have not yet been received.
300 300
301 301 results : dict
302 302 a dict of all our results, keyed by msg_id
303 303
304 304 block : bool
305 305 determines default behavior when block not specified
306 306 in execution methods
307 307
308 308 Methods
309 309 -------
310 310
311 311 spin
312 312 flushes incoming results and registration state changes
313 313 control methods spin, and requesting `ids` also ensures up to date
314 314
315 315 wait
316 316 wait on one or more msg_ids
317 317
318 318 execution methods
319 319 apply
320 320 legacy: execute, run
321 321
322 322 data movement
323 323 push, pull, scatter, gather
324 324
325 325 query methods
326 326 queue_status, get_result, purge, result_status
327 327
328 328 control methods
329 329 abort, shutdown
330 330
331 331 """
332 332
333 333
334 334 block = Bool(False)
335 335 outstanding = Set()
336 336 results = Instance('collections.defaultdict', (dict,))
337 337 metadata = Instance('collections.defaultdict', (Metadata,))
338 338 history = List()
339 339 debug = Bool(False)
340 340 _spin_thread = Any()
341 341 _stop_spinning = Any()
342 342
343 343 profile=Unicode()
344 344 def _profile_default(self):
345 345 if BaseIPythonApplication.initialized():
346 346 # an IPython app *might* be running, try to get its profile
347 347 try:
348 348 return BaseIPythonApplication.instance().profile
349 349 except (AttributeError, MultipleInstanceError):
350 350 # could be a *different* subclass of config.Application,
351 351 # which would raise one of these two errors.
352 352 return u'default'
353 353 else:
354 354 return u'default'
355 355
356 356
357 357 _outstanding_dict = Instance('collections.defaultdict', (set,))
358 358 _ids = List()
359 359 _connected=Bool(False)
360 360 _ssh=Bool(False)
361 361 _context = Instance('zmq.Context')
362 362 _config = Dict()
363 363 _engines=Instance(util.ReverseDict, (), {})
364 364 # _hub_socket=Instance('zmq.Socket')
365 365 _query_socket=Instance('zmq.Socket')
366 366 _control_socket=Instance('zmq.Socket')
367 367 _iopub_socket=Instance('zmq.Socket')
368 368 _notification_socket=Instance('zmq.Socket')
369 369 _mux_socket=Instance('zmq.Socket')
370 370 _task_socket=Instance('zmq.Socket')
371 371 _task_scheme=Unicode()
372 372 _closed = False
373 373 _ignored_control_replies=Integer(0)
374 374 _ignored_hub_replies=Integer(0)
375 375
    def __new__(self, *args, **kw):
        # don't raise on positional args
        # NOTE(review): presumably HasTraits.__new__ rejects positional
        # arguments (e.g. url_or_file), so only kwargs are forwarded;
        # __init__ still receives everything — confirm against traitlets.
        return HasTraits.__new__(self, **kw)
379 379
    def __init__(self, url_or_file=None, profile=None, profile_dir=None, ipython_dir=None,
            context=None, debug=False, exec_key=None,
            sshserver=None, sshkey=None, password=None, paramiko=None,
            timeout=10, **extra_args
            ):
        """Create a Client and connect it to the Hub.

        See the class docstring for the meaning of the arguments.  Any
        remaining `extra_args` are forwarded to the Session constructor.
        """
        # only pass `profile` to the superclass when given, so the
        # _profile_default trait logic can run otherwise
        if profile:
            super(Client, self).__init__(debug=debug, profile=profile)
        else:
            super(Client, self).__init__(debug=debug)
        if context is None:
            context = zmq.Context.instance()
        self._context = context
        self._stop_spinning = Event()

        # resolve connection info: an explicit url_or_file, or the
        # profile's default connector file
        self._setup_profile_dir(self.profile, profile_dir, ipython_dir)
        if self._cd is not None:
            if url_or_file is None:
                url_or_file = pjoin(self._cd.security_dir, 'ipcontroller-client.json')
        if url_or_file is None:
            raise ValueError(
                "I can't find enough information to connect to a hub!"
                " Please specify at least one of url_or_file or profile."
            )

        if not util.is_url(url_or_file):
            # it's not a url, try for a file
            if not os.path.exists(url_or_file):
                # also look for the file relative to the profile's security dir
                if self._cd:
                    url_or_file = os.path.join(self._cd.security_dir, url_or_file)
                if not os.path.exists(url_or_file):
                    raise IOError("Connection file not found: %r" % url_or_file)
            with open(url_or_file) as f:
                cfg = json.loads(f.read())
        else:
            cfg = {'url':url_or_file}

        # sync defaults from args, json:
        if sshserver:
            cfg['ssh'] = sshserver
        if exec_key:
            cfg['exec_key'] = exec_key
        exec_key = cfg['exec_key']
        location = cfg.setdefault('location', None)
        cfg['url'] = util.disambiguate_url(cfg['url'], location)
        url = cfg['url']
        proto,addr,port = util.split_url(url)
        if location is not None and addr == '127.0.0.1':
            # location specified, and connection is expected to be local
            if location not in LOCAL_IPS and not sshserver:
                # load ssh from JSON *only* if the controller is not on
                # this machine
                sshserver=cfg['ssh']
            if location not in LOCAL_IPS and not sshserver:
                # warn if no ssh specified, but SSH is probably needed
                # This is only a warning, because the most likely cause
                # is a local Controller on a laptop whose IP is dynamic
                warnings.warn("""
            Controller appears to be listening on localhost, but not on this machine.
            If this is true, you should specify Client(...,sshserver='you@%s')
            or instruct your controller to listen on an external IP."""%location,
                RuntimeWarning)
        elif not sshserver:
            # otherwise sync with cfg
            sshserver = cfg['ssh']

        self._config = cfg

        self._ssh = bool(sshserver or sshkey or password)
        if self._ssh and sshserver is None:
            # default to ssh via localhost
            sshserver = url.split('://')[1].split(':')[0]
        if self._ssh and password is None:
            # only prompt for a password if key-based login won't work
            if tunnel.try_passwordless_ssh(sshserver, sshkey, paramiko):
                password=False
            else:
                password = getpass("SSH Password for %s: "%sshserver)
        ssh_kwargs = dict(keyfile=sshkey, password=password, paramiko=paramiko)

        # configure and construct the session
        if exec_key is not None:
            if os.path.isfile(exec_key):
                # a path: let the Session load the key from the file
                extra_args['keyfile'] = exec_key
            else:
                # a literal key: the Session wants bytes
                exec_key = cast_bytes(exec_key)
                extra_args['key'] = exec_key
        self.session = Session(**extra_args)

        # open the query (registration) channel to the Hub
        self._query_socket = self._context.socket(zmq.DEALER)
        self._query_socket.setsockopt(zmq.IDENTITY, self.session.bsession)
        if self._ssh:
            tunnel.tunnel_connection(self._query_socket, url, sshserver, **ssh_kwargs)
        else:
            self._query_socket.connect(url)

        self.session.debug = self.debug

        # dispatch tables for incoming messages
        self._notification_handlers = {'registration_notification' : self._register_engine,
                                    'unregistration_notification' : self._unregister_engine,
                                    'shutdown_notification' : lambda msg: self.close(),
                                    }
        self._queue_handlers = {'execute_reply' : self._handle_execute_reply,
                                'apply_reply' : self._handle_apply_reply}
        self._connect(sshserver, ssh_kwargs, timeout)

        # last step: setup magics, if we are in IPython:

        try:
            ip = get_ipython()
        except NameError:
            return
        else:
            if 'px' not in ip.magics_manager.magics:
                # in IPython but we are the first Client.
                # activate a default view for parallel magics.
                self.activate()
495 495
    def __del__(self):
        """cleanup sockets, but _not_ context."""
        # the zmq context may be shared with other users (it defaults to
        # zmq.Context.instance() in __init__), so only our sockets are closed
        self.close()
499 499
500 500 def _setup_profile_dir(self, profile, profile_dir, ipython_dir):
501 501 if ipython_dir is None:
502 502 ipython_dir = get_ipython_dir()
503 503 if profile_dir is not None:
504 504 try:
505 505 self._cd = ProfileDir.find_profile_dir(profile_dir)
506 506 return
507 507 except ProfileDirError:
508 508 pass
509 509 elif profile is not None:
510 510 try:
511 511 self._cd = ProfileDir.find_profile_dir_by_name(
512 512 ipython_dir, profile)
513 513 return
514 514 except ProfileDirError:
515 515 pass
516 516 self._cd = None
517 517
    def _update_engines(self, engines):
        """Update our engines dict and _ids from a dict of the form: {id:uuid}."""
        for k,v in engines.iteritems():
            eid = int(k)  # ids arrive as strings (JSON keys)
            self._engines[eid] = v
            self._ids.append(eid)
        self._ids = sorted(self._ids)
        # a pure ZMQ scheduler cannot tolerate gaps in the engine ID
        # sequence, so task scheduling must stop once IDs are not 0..n-1
        if sorted(self._engines.keys()) != range(len(self._engines)) and \
                        self._task_scheme == 'pure' and self._task_socket:
            self._stop_scheduling_tasks()
528 528
529 529 def _stop_scheduling_tasks(self):
530 530 """Stop scheduling tasks because an engine has been unregistered
531 531 from a pure ZMQ scheduler.
532 532 """
533 533 self._task_socket.close()
534 534 self._task_socket = None
535 535 msg = "An engine has been unregistered, and we are using pure " +\
536 536 "ZMQ task scheduling. Task farming will be disabled."
537 537 if self.outstanding:
538 538 msg += " If you were running tasks when this happened, " +\
539 539 "some `outstanding` msg_ids may never resolve."
540 540 warnings.warn(msg, RuntimeWarning)
541 541
542 542 def _build_targets(self, targets):
543 543 """Turn valid target IDs or 'all' into two lists:
544 544 (int_ids, uuids).
545 545 """
546 546 if not self._ids:
547 547 # flush notification socket if no engines yet, just in case
548 548 if not self.ids:
549 549 raise error.NoEnginesRegistered("Can't build targets without any engines")
550 550
551 551 if targets is None:
552 552 targets = self._ids
553 553 elif isinstance(targets, basestring):
554 554 if targets.lower() == 'all':
555 555 targets = self._ids
556 556 else:
557 557 raise TypeError("%r not valid str target, must be 'all'"%(targets))
558 558 elif isinstance(targets, int):
559 559 if targets < 0:
560 560 targets = self.ids[targets]
561 561 if targets not in self._ids:
562 562 raise IndexError("No such engine: %i"%targets)
563 563 targets = [targets]
564 564
565 565 if isinstance(targets, slice):
566 566 indices = range(len(self._ids))[targets]
567 567 ids = self.ids
568 568 targets = [ ids[i] for i in indices ]
569 569
570 570 if not isinstance(targets, (tuple, list, xrange)):
571 571 raise TypeError("targets by int/slice/collection of ints only, not %s"%(type(targets)))
572 572
573 573 return [cast_bytes(self._engines[t]) for t in targets], list(targets)
574 574
    def _connect(self, sshserver, ssh_kwargs, timeout):
        """setup all our socket connections to the cluster. This is called from
        __init__."""

        # Maybe allow reconnecting?
        if self._connected:
            return
        self._connected=True

        def connect_socket(s, url):
            # resolve the address relative to the controller's location,
            # tunneling over ssh when configured
            url = util.disambiguate_url(url, self._config['location'])
            if self._ssh:
                return tunnel.tunnel_connection(s, url, sshserver, **ssh_kwargs)
            else:
                return s.connect(url)

        self.session.send(self._query_socket, 'connection_request')
        # use Poller because zmq.select has wrong units in pyzmq 2.1.7
        poller = zmq.Poller()
        poller.register(self._query_socket, zmq.POLLIN)
        # poll expects milliseconds, timeout is seconds
        evts = poller.poll(timeout*1000)
        if not evts:
            raise error.TimeoutError("Hub connection request timed out")
        idents,msg = self.session.recv(self._query_socket,mode=0)
        if self.debug:
            pprint(msg)
        msg = Message(msg)
        content = msg.content
        self._config['registration'] = dict(content)
        if content.status == 'ok':
            # the reply carries the address of each cluster channel;
            # connect a socket for every one that is present
            ident = self.session.bsession
            if content.mux:
                self._mux_socket = self._context.socket(zmq.DEALER)
                self._mux_socket.setsockopt(zmq.IDENTITY, ident)
                connect_socket(self._mux_socket, content.mux)
            if content.task:
                self._task_scheme, task_addr = content.task
                self._task_socket = self._context.socket(zmq.DEALER)
                self._task_socket.setsockopt(zmq.IDENTITY, ident)
                connect_socket(self._task_socket, task_addr)
            if content.notification:
                self._notification_socket = self._context.socket(zmq.SUB)
                connect_socket(self._notification_socket, content.notification)
                # subscribe to all notification topics
                self._notification_socket.setsockopt(zmq.SUBSCRIBE, b'')
            # if content.query:
            #     self._query_socket = self._context.socket(zmq.DEALER)
            #     self._query_socket.setsockopt(zmq.IDENTITY, self.session.bsession)
            #     connect_socket(self._query_socket, content.query)
            if content.control:
                self._control_socket = self._context.socket(zmq.DEALER)
                self._control_socket.setsockopt(zmq.IDENTITY, ident)
                connect_socket(self._control_socket, content.control)
            if content.iopub:
                self._iopub_socket = self._context.socket(zmq.SUB)
                self._iopub_socket.setsockopt(zmq.SUBSCRIBE, b'')
                self._iopub_socket.setsockopt(zmq.IDENTITY, ident)
                connect_socket(self._iopub_socket, content.iopub)
            self._update_engines(dict(content.engines))
        else:
            self._connected = False
            raise Exception("Failed to connect!")
637 637
638 638 #--------------------------------------------------------------------------
639 639 # handlers and callbacks for incoming messages
640 640 #--------------------------------------------------------------------------
641 641
642 642 def _unwrap_exception(self, content):
643 643 """unwrap exception, and remap engine_id to int."""
644 644 e = error.unwrap_exception(content)
645 645 # print e.traceback
646 646 if e.engine_info:
647 647 e_uuid = e.engine_info['engine_uuid']
648 648 eid = self._engines[e_uuid]
649 649 e.engine_info['engine_id'] = eid
650 650 return e
651 651
652 652 def _extract_metadata(self, header, parent, content):
653 653 md = {'msg_id' : parent['msg_id'],
654 654 'received' : datetime.now(),
655 655 'engine_uuid' : header.get('engine', None),
656 656 'follow' : parent.get('follow', []),
657 657 'after' : parent.get('after', []),
658 658 'status' : content['status'],
659 659 }
660 660
661 661 if md['engine_uuid'] is not None:
662 662 md['engine_id'] = self._engines.get(md['engine_uuid'], None)
663 663
664 664 if 'date' in parent:
665 665 md['submitted'] = parent['date']
666 666 if 'started' in header:
667 667 md['started'] = header['started']
668 668 if 'date' in header:
669 669 md['completed'] = header['date']
670 670 return md
671 671
672 672 def _register_engine(self, msg):
673 673 """Register a new engine, and update our connection info."""
674 674 content = msg['content']
675 675 eid = content['id']
676 676 d = {eid : content['queue']}
677 677 self._update_engines(d)
678 678
679 679 def _unregister_engine(self, msg):
680 680 """Unregister an engine that has died."""
681 681 content = msg['content']
682 682 eid = int(content['id'])
683 683 if eid in self._ids:
684 684 self._ids.remove(eid)
685 685 uuid = self._engines.pop(eid)
686 686
687 687 self._handle_stranded_msgs(eid, uuid)
688 688
689 689 if self._task_socket and self._task_scheme == 'pure':
690 690 self._stop_scheduling_tasks()
691 691
692 692 def _handle_stranded_msgs(self, eid, uuid):
693 693 """Handle messages known to be on an engine when the engine unregisters.
694 694
695 695 It is possible that this will fire prematurely - that is, an engine will
696 696 go down after completing a result, and the client will be notified
697 697 of the unregistration and later receive the successful result.
698 698 """
699 699
700 700 outstanding = self._outstanding_dict[uuid]
701 701
702 702 for msg_id in list(outstanding):
703 703 if msg_id in self.results:
704 704 # we already
705 705 continue
706 706 try:
707 707 raise error.EngineError("Engine %r died while running task %r"%(eid, msg_id))
708 708 except:
709 709 content = error.wrap_exception()
710 710 # build a fake message:
711 711 parent = {}
712 712 header = {}
713 713 parent['msg_id'] = msg_id
714 714 header['engine'] = uuid
715 715 header['date'] = datetime.now()
716 716 msg = dict(parent_header=parent, header=header, content=content)
717 717 self._handle_apply_reply(msg)
718 718
    def _handle_execute_reply(self, msg):
        """Save the reply to an execute_request into our results.

        execute messages are never actually used. apply is used instead.
        """

        parent = msg['parent_header']
        msg_id = parent['msg_id']
        # results for unknown/stale msg_ids are still recorded below,
        # they just can't be removed from `outstanding`
        if msg_id not in self.outstanding:
            if msg_id in self.history:
                print ("got stale result: %s"%msg_id)
            else:
                print ("got unknown result: %s"%msg_id)
        else:
            self.outstanding.remove(msg_id)

        content = msg['content']
        header = msg['header']

        # construct metadata:
        md = self.metadata[msg_id]
        md.update(self._extract_metadata(header, parent, content))
        # is this redundant?
        self.metadata[msg_id] = md

        # clear the per-engine outstanding record as well
        e_outstanding = self._outstanding_dict[md['engine_uuid']]
        if msg_id in e_outstanding:
            e_outstanding.remove(msg_id)

        # construct result:
        if content['status'] == 'ok':
            self.results[msg_id] = ExecuteReply(msg_id, content, md)
        elif content['status'] == 'aborted':
            self.results[msg_id] = error.TaskAborted(msg_id)
        elif content['status'] == 'resubmitted':
            # TODO: handle resubmission
            pass
        else:
            # error reply: store the remote exception as the result
            self.results[msg_id] = self._unwrap_exception(content)
758 758
    def _handle_apply_reply(self, msg):
        """Save the reply to an apply_request into our results."""
        parent = msg['parent_header']
        msg_id = parent['msg_id']
        # results for unknown/stale msg_ids are still recorded below,
        # they just can't be removed from `outstanding`
        if msg_id not in self.outstanding:
            if msg_id in self.history:
                print ("got stale result: %s"%msg_id)
                print self.results[msg_id]
                print msg
            else:
                print ("got unknown result: %s"%msg_id)
        else:
            self.outstanding.remove(msg_id)
        content = msg['content']
        header = msg['header']

        # construct metadata:
        md = self.metadata[msg_id]
        md.update(self._extract_metadata(header, parent, content))
        # is this redundant?
        self.metadata[msg_id] = md

        # clear the per-engine outstanding record as well
        e_outstanding = self._outstanding_dict[md['engine_uuid']]
        if msg_id in e_outstanding:
            e_outstanding.remove(msg_id)

        # construct result:
        if content['status'] == 'ok':
            # the actual return value travels in the message buffers
            self.results[msg_id] = util.unserialize_object(msg['buffers'])[0]
        elif content['status'] == 'aborted':
            self.results[msg_id] = error.TaskAborted(msg_id)
        elif content['status'] == 'resubmitted':
            # TODO: handle resubmission
            pass
        else:
            # error reply: store the remote exception as the result
            self.results[msg_id] = self._unwrap_exception(content)
795 795
796 796 def _flush_notifications(self):
797 797 """Flush notifications of engine registrations waiting
798 798 in ZMQ queue."""
799 799 idents,msg = self.session.recv(self._notification_socket, mode=zmq.NOBLOCK)
800 800 while msg is not None:
801 801 if self.debug:
802 802 pprint(msg)
803 803 msg_type = msg['header']['msg_type']
804 804 handler = self._notification_handlers.get(msg_type, None)
805 805 if handler is None:
806 806 raise Exception("Unhandled message type: %s"%msg.msg_type)
807 807 else:
808 808 handler(msg)
809 809 idents,msg = self.session.recv(self._notification_socket, mode=zmq.NOBLOCK)
810 810
811 811 def _flush_results(self, sock):
812 812 """Flush task or queue results waiting in ZMQ queue."""
813 813 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
814 814 while msg is not None:
815 815 if self.debug:
816 816 pprint(msg)
817 817 msg_type = msg['header']['msg_type']
818 818 handler = self._queue_handlers.get(msg_type, None)
819 819 if handler is None:
820 820 raise Exception("Unhandled message type: %s"%msg.msg_type)
821 821 else:
822 822 handler(msg)
823 823 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
824 824
825 825 def _flush_control(self, sock):
826 826 """Flush replies from the control channel waiting
827 827 in the ZMQ queue.
828 828
829 829 Currently: ignore them."""
830 830 if self._ignored_control_replies <= 0:
831 831 return
832 832 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
833 833 while msg is not None:
834 834 self._ignored_control_replies -= 1
835 835 if self.debug:
836 836 pprint(msg)
837 837 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
838 838
    def _flush_ignored_control(self):
        """flush ignored control replies"""
        # blocking recv: each counted reply should already be in flight
        # (presumably the counter is incremented wherever control requests
        # are sent without waiting — TODO confirm against the senders)
        while self._ignored_control_replies > 0:
            self.session.recv(self._control_socket)
            self._ignored_control_replies -= 1
844 844
845 845 def _flush_ignored_hub_replies(self):
846 846 ident,msg = self.session.recv(self._query_socket, mode=zmq.NOBLOCK)
847 847 while msg is not None:
848 848 ident,msg = self.session.recv(self._query_socket, mode=zmq.NOBLOCK)
849 849
850 850 def _flush_iopub(self, sock):
851 851 """Flush replies from the iopub channel waiting
852 852 in the ZMQ queue.
853 853 """
854 854 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
855 855 while msg is not None:
856 856 if self.debug:
857 857 pprint(msg)
858 858 parent = msg['parent_header']
859 859 # ignore IOPub messages with no parent.
860 860 # Caused by print statements or warnings from before the first execution.
861 861 if not parent:
862 862 continue
863 863 msg_id = parent['msg_id']
864 864 content = msg['content']
865 865 header = msg['header']
866 866 msg_type = msg['header']['msg_type']
867 867
868 868 # init metadata:
869 869 md = self.metadata[msg_id]
870 870
871 871 if msg_type == 'stream':
872 872 name = content['name']
873 873 s = md[name] or ''
874 874 md[name] = s + content['data']
875 875 elif msg_type == 'pyerr':
876 876 md.update({'pyerr' : self._unwrap_exception(content)})
877 877 elif msg_type == 'pyin':
878 878 md.update({'pyin' : content['code']})
879 879 elif msg_type == 'display_data':
880 880 md['outputs'].append(content)
881 881 elif msg_type == 'pyout':
882 882 md['pyout'] = content
883 883 else:
884 884 # unhandled msg_type (status, etc.)
885 885 pass
886 886
887 887 # reduntant?
888 888 self.metadata[msg_id] = md
889 889
890 890 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
891 891
892 892 #--------------------------------------------------------------------------
893 893 # len, getitem
894 894 #--------------------------------------------------------------------------
895 895
    def __len__(self):
        """len(client) returns # of engines.

        Delegates to the `ids` property, so pending registration
        notifications are flushed first.
        """
        return len(self.ids)
899 899
900 900 def __getitem__(self, key):
901 901 """index access returns DirectView multiplexer objects
902 902
903 903 Must be int, slice, or list/tuple/xrange of ints"""
904 904 if not isinstance(key, (int, slice, tuple, list, xrange)):
905 905 raise TypeError("key by int/slice/iterable of ints only, not %s"%(type(key)))
906 906 else:
907 907 return self.direct_view(key)
908 908
909 909 #--------------------------------------------------------------------------
910 910 # Begin public methods
911 911 #--------------------------------------------------------------------------
912 912
    @property
    def ids(self):
        """Always up-to-date ids property.

        Flushes pending registration notifications before returning, so the
        list reflects the latest known set of engines.
        """
        self._flush_notifications()
        # always copy, so callers cannot mutate the internal list:
        return list(self._ids)
919 919
920 920 def activate(self, targets='all', suffix=''):
921 921 """Create a DirectView and register it with IPython magics
922 922
923 923 Defines the magics `%px, %autopx, %pxresult, %%px`
924 924
925 925 Parameters
926 926 ----------
927 927
928 928 targets: int, list of ints, or 'all'
929 929 The engines on which the view's magics will run
930 930 suffix: str [default: '']
931 931 The suffix, if any, for the magics. This allows you to have
932 932 multiple views associated with parallel magics at the same time.
933 933
934 934 e.g. ``rc.activate(targets=0, suffix='0')`` will give you
935 935 the magics ``%px0``, ``%pxresult0``, etc. for running magics just
936 936 on engine 0.
937 937 """
938 938 view = self.direct_view(targets)
939 939 view.block = True
940 940 view.activate(suffix)
941 941 return view
942 942
943 943 def close(self):
944 944 if self._closed:
945 945 return
946 946 self.stop_spin_thread()
947 947 snames = filter(lambda n: n.endswith('socket'), dir(self))
948 948 for socket in map(lambda name: getattr(self, name), snames):
949 949 if isinstance(socket, zmq.Socket) and not socket.closed:
950 950 socket.close()
951 951 self._closed = True
952 952
953 953 def _spin_every(self, interval=1):
954 954 """target func for use in spin_thread"""
955 955 while True:
956 956 if self._stop_spinning.is_set():
957 957 return
958 958 time.sleep(interval)
959 959 self.spin()
960 960
961 961 def spin_thread(self, interval=1):
962 962 """call Client.spin() in a background thread on some regular interval
963 963
964 964 This helps ensure that messages don't pile up too much in the zmq queue
965 965 while you are working on other things, or just leaving an idle terminal.
966 966
967 967 It also helps limit potential padding of the `received` timestamp
968 968 on AsyncResult objects, used for timings.
969 969
970 970 Parameters
971 971 ----------
972 972
973 973 interval : float, optional
974 974 The interval on which to spin the client in the background thread
975 975 (simply passed to time.sleep).
976 976
977 977 Notes
978 978 -----
979 979
980 980 For precision timing, you may want to use this method to put a bound
981 981 on the jitter (in seconds) in `received` timestamps used
982 982 in AsyncResult.wall_time.
983 983
984 984 """
985 985 if self._spin_thread is not None:
986 986 self.stop_spin_thread()
987 987 self._stop_spinning.clear()
988 988 self._spin_thread = Thread(target=self._spin_every, args=(interval,))
989 989 self._spin_thread.daemon = True
990 990 self._spin_thread.start()
991 991
992 992 def stop_spin_thread(self):
993 993 """stop background spin_thread, if any"""
994 994 if self._spin_thread is not None:
995 995 self._stop_spinning.set()
996 996 self._spin_thread.join()
997 997 self._spin_thread = None
998 998
999 999 def spin(self):
1000 1000 """Flush any registration notifications and execution results
1001 1001 waiting in the ZMQ queue.
1002 1002 """
1003 1003 if self._notification_socket:
1004 1004 self._flush_notifications()
1005 1005 if self._iopub_socket:
1006 1006 self._flush_iopub(self._iopub_socket)
1007 1007 if self._mux_socket:
1008 1008 self._flush_results(self._mux_socket)
1009 1009 if self._task_socket:
1010 1010 self._flush_results(self._task_socket)
1011 1011 if self._control_socket:
1012 1012 self._flush_control(self._control_socket)
1013 1013 if self._query_socket:
1014 1014 self._flush_ignored_hub_replies()
1015 1015
1016 1016 def wait(self, jobs=None, timeout=-1):
1017 1017 """waits on one or more `jobs`, for up to `timeout` seconds.
1018 1018
1019 1019 Parameters
1020 1020 ----------
1021 1021
1022 1022 jobs : int, str, or list of ints and/or strs, or one or more AsyncResult objects
1023 1023 ints are indices to self.history
1024 1024 strs are msg_ids
1025 1025 default: wait on all outstanding messages
1026 1026 timeout : float
1027 1027 a time in seconds, after which to give up.
1028 1028 default is -1, which means no timeout
1029 1029
1030 1030 Returns
1031 1031 -------
1032 1032
1033 1033 True : when all msg_ids are done
1034 1034 False : timeout reached, some msg_ids still outstanding
1035 1035 """
1036 1036 tic = time.time()
1037 1037 if jobs is None:
1038 1038 theids = self.outstanding
1039 1039 else:
1040 1040 if isinstance(jobs, (int, basestring, AsyncResult)):
1041 1041 jobs = [jobs]
1042 1042 theids = set()
1043 1043 for job in jobs:
1044 1044 if isinstance(job, int):
1045 1045 # index access
1046 1046 job = self.history[job]
1047 1047 elif isinstance(job, AsyncResult):
1048 1048 map(theids.add, job.msg_ids)
1049 1049 continue
1050 1050 theids.add(job)
1051 1051 if not theids.intersection(self.outstanding):
1052 1052 return True
1053 1053 self.spin()
1054 1054 while theids.intersection(self.outstanding):
1055 1055 if timeout >= 0 and ( time.time()-tic ) > timeout:
1056 1056 break
1057 1057 time.sleep(1e-3)
1058 1058 self.spin()
1059 1059 return len(theids.intersection(self.outstanding)) == 0
1060 1060
1061 1061 #--------------------------------------------------------------------------
1062 1062 # Control methods
1063 1063 #--------------------------------------------------------------------------
1064 1064
1065 1065 @spin_first
1066 1066 def clear(self, targets=None, block=None):
1067 1067 """Clear the namespace in target(s)."""
1068 1068 block = self.block if block is None else block
1069 1069 targets = self._build_targets(targets)[0]
1070 1070 for t in targets:
1071 1071 self.session.send(self._control_socket, 'clear_request', content={}, ident=t)
1072 1072 error = False
1073 1073 if block:
1074 1074 self._flush_ignored_control()
1075 1075 for i in range(len(targets)):
1076 1076 idents,msg = self.session.recv(self._control_socket,0)
1077 1077 if self.debug:
1078 1078 pprint(msg)
1079 1079 if msg['content']['status'] != 'ok':
1080 1080 error = self._unwrap_exception(msg['content'])
1081 1081 else:
1082 1082 self._ignored_control_replies += len(targets)
1083 1083 if error:
1084 1084 raise error
1085 1085
1086 1086
1087 1087 @spin_first
1088 1088 def abort(self, jobs=None, targets=None, block=None):
1089 1089 """Abort specific jobs from the execution queues of target(s).
1090 1090
1091 1091 This is a mechanism to prevent jobs that have already been submitted
1092 1092 from executing.
1093 1093
1094 1094 Parameters
1095 1095 ----------
1096 1096
1097 1097 jobs : msg_id, list of msg_ids, or AsyncResult
1098 1098 The jobs to be aborted
1099 1099
1100 1100 If unspecified/None: abort all outstanding jobs.
1101 1101
1102 1102 """
1103 1103 block = self.block if block is None else block
1104 1104 jobs = jobs if jobs is not None else list(self.outstanding)
1105 1105 targets = self._build_targets(targets)[0]
1106 1106
1107 1107 msg_ids = []
1108 1108 if isinstance(jobs, (basestring,AsyncResult)):
1109 1109 jobs = [jobs]
1110 1110 bad_ids = filter(lambda obj: not isinstance(obj, (basestring, AsyncResult)), jobs)
1111 1111 if bad_ids:
1112 1112 raise TypeError("Invalid msg_id type %r, expected str or AsyncResult"%bad_ids[0])
1113 1113 for j in jobs:
1114 1114 if isinstance(j, AsyncResult):
1115 1115 msg_ids.extend(j.msg_ids)
1116 1116 else:
1117 1117 msg_ids.append(j)
1118 1118 content = dict(msg_ids=msg_ids)
1119 1119 for t in targets:
1120 1120 self.session.send(self._control_socket, 'abort_request',
1121 1121 content=content, ident=t)
1122 1122 error = False
1123 1123 if block:
1124 1124 self._flush_ignored_control()
1125 1125 for i in range(len(targets)):
1126 1126 idents,msg = self.session.recv(self._control_socket,0)
1127 1127 if self.debug:
1128 1128 pprint(msg)
1129 1129 if msg['content']['status'] != 'ok':
1130 1130 error = self._unwrap_exception(msg['content'])
1131 1131 else:
1132 1132 self._ignored_control_replies += len(targets)
1133 1133 if error:
1134 1134 raise error
1135 1135
1136 1136 @spin_first
1137 def shutdown(self, targets=None, restart=False, hub=False, block=None):
1138 """Terminates one or more engine processes, optionally including the hub."""
1137 def shutdown(self, targets='all', restart=False, hub=False, block=None):
1138 """Terminates one or more engine processes, optionally including the hub.
1139
1140 Parameters
1141 ----------
1142
1143 targets: list of ints or 'all' [default: all]
1144 Which engines to shutdown.
1145 hub: bool [default: False]
1146 Whether to include the Hub. hub=True implies targets='all'.
1147 block: bool [default: self.block]
1148 Whether to wait for clean shutdown replies or not.
1149 restart: bool [default: False]
1150 NOT IMPLEMENTED
1151 whether to restart engines after shutting them down.
1152 """
1153
1154 if restart:
1155 raise NotImplementedError("Engine restart is not yet implemented")
1156
1139 1157 block = self.block if block is None else block
1140 1158 if hub:
1141 1159 targets = 'all'
1142 1160 targets = self._build_targets(targets)[0]
1143 1161 for t in targets:
1144 1162 self.session.send(self._control_socket, 'shutdown_request',
1145 1163 content={'restart':restart},ident=t)
1146 1164 error = False
1147 1165 if block or hub:
1148 1166 self._flush_ignored_control()
1149 1167 for i in range(len(targets)):
1150 1168 idents,msg = self.session.recv(self._control_socket, 0)
1151 1169 if self.debug:
1152 1170 pprint(msg)
1153 1171 if msg['content']['status'] != 'ok':
1154 1172 error = self._unwrap_exception(msg['content'])
1155 1173 else:
1156 1174 self._ignored_control_replies += len(targets)
1157 1175
1158 1176 if hub:
1159 1177 time.sleep(0.25)
1160 1178 self.session.send(self._query_socket, 'shutdown_request')
1161 1179 idents,msg = self.session.recv(self._query_socket, 0)
1162 1180 if self.debug:
1163 1181 pprint(msg)
1164 1182 if msg['content']['status'] != 'ok':
1165 1183 error = self._unwrap_exception(msg['content'])
1166 1184
1167 1185 if error:
1168 1186 raise error
1169 1187
1170 1188 #--------------------------------------------------------------------------
1171 1189 # Execution related methods
1172 1190 #--------------------------------------------------------------------------
1173 1191
1174 1192 def _maybe_raise(self, result):
1175 1193 """wrapper for maybe raising an exception if apply failed."""
1176 1194 if isinstance(result, error.RemoteError):
1177 1195 raise result
1178 1196
1179 1197 return result
1180 1198
    def send_apply_request(self, socket, f, args=None, kwargs=None, subheader=None, track=False,
                            ident=None):
        """construct and send an apply message via a socket.

        This is the principal method with which all engine execution is performed by views.

        Parameters
        ----------
        socket : zmq.Socket
            The socket on which to send the request (e.g. mux or task socket).
        f : callable or Reference
            The function to be applied remotely.
        args : tuple or list, optional
            Positional arguments for `f` (default: empty list).
        kwargs : dict, optional
            Keyword arguments for `f` (default: empty dict).
        subheader : dict, optional
            Extra metadata to be merged into the message header.
        track : bool [default: False]
            Whether zmq should track delivery of the message
            (passed through to session.send).
        ident : bytes or list of bytes, optional
            zmq routing identity.  If it names a registered engine, the
            msg_id is also recorded per-engine for engine-death bookkeeping.

        Returns
        -------
        msg : dict
            The full sent message, as returned by session.send; its msg_id is
            added to `self.outstanding` and `self.history`.

        Raises
        ------
        RuntimeError
            If the client's sockets have been closed.
        TypeError
            If `f`, `args`, `kwargs`, or `subheader` is of the wrong type.
        """

        if self._closed:
            raise RuntimeError("Client cannot be used after its sockets have been closed")

        # defaults:
        args = args if args is not None else []
        kwargs = kwargs if kwargs is not None else {}
        subheader = subheader if subheader is not None else {}

        # validate arguments
        if not callable(f) and not isinstance(f, Reference):
            raise TypeError("f must be callable, not %s"%type(f))
        if not isinstance(args, (tuple, list)):
            raise TypeError("args must be tuple or list, not %s"%type(args))
        if not isinstance(kwargs, dict):
            raise TypeError("kwargs must be dict, not %s"%type(kwargs))
        if not isinstance(subheader, dict):
            raise TypeError("subheader must be dict, not %s"%type(subheader))

        bufs = util.pack_apply_message(f,args,kwargs)

        msg = self.session.send(socket, "apply_request", buffers=bufs, ident=ident,
                            subheader=subheader, track=track)

        msg_id = msg['header']['msg_id']
        self.outstanding.add(msg_id)
        if ident:
            # possibly routed to a specific engine
            if isinstance(ident, list):
                ident = ident[-1]
            if ident in self._engines.values():
                # save for later, in case of engine death
                self._outstanding_dict[ident].add(msg_id)
        self.history.append(msg_id)
        self.metadata[msg_id]['submitted'] = datetime.now()

        return msg
1224 1242
1225 1243 def send_execute_request(self, socket, code, silent=True, subheader=None, ident=None):
1226 1244 """construct and send an execute request via a socket.
1227 1245
1228 1246 """
1229 1247
1230 1248 if self._closed:
1231 1249 raise RuntimeError("Client cannot be used after its sockets have been closed")
1232 1250
1233 1251 # defaults:
1234 1252 subheader = subheader if subheader is not None else {}
1235 1253
1236 1254 # validate arguments
1237 1255 if not isinstance(code, basestring):
1238 1256 raise TypeError("code must be text, not %s" % type(code))
1239 1257 if not isinstance(subheader, dict):
1240 1258 raise TypeError("subheader must be dict, not %s" % type(subheader))
1241 1259
1242 1260 content = dict(code=code, silent=bool(silent), user_variables=[], user_expressions={})
1243 1261
1244 1262
1245 1263 msg = self.session.send(socket, "execute_request", content=content, ident=ident,
1246 1264 subheader=subheader)
1247 1265
1248 1266 msg_id = msg['header']['msg_id']
1249 1267 self.outstanding.add(msg_id)
1250 1268 if ident:
1251 1269 # possibly routed to a specific engine
1252 1270 if isinstance(ident, list):
1253 1271 ident = ident[-1]
1254 1272 if ident in self._engines.values():
1255 1273 # save for later, in case of engine death
1256 1274 self._outstanding_dict[ident].add(msg_id)
1257 1275 self.history.append(msg_id)
1258 1276 self.metadata[msg_id]['submitted'] = datetime.now()
1259 1277
1260 1278 return msg
1261 1279
1262 1280 #--------------------------------------------------------------------------
1263 1281 # construct a View object
1264 1282 #--------------------------------------------------------------------------
1265 1283
    def load_balanced_view(self, targets=None):
        """construct a LoadBalancedView object.

        If no arguments are specified, create a LoadBalancedView
        using all engines.

        Parameters
        ----------

        targets: list,slice,int,etc. [default: use all engines]
            The subset of engines across which to load-balance
        """
        # 'all' means no restriction, same as None
        if targets == 'all':
            targets = None
        if targets is not None:
            targets = self._build_targets(targets)[1]
        return LoadBalancedView(client=self, socket=self._task_socket, targets=targets)
1283 1301
1284 1302 def direct_view(self, targets='all'):
1285 1303 """construct a DirectView object.
1286 1304
1287 1305 If no targets are specified, create a DirectView using all engines.
1288 1306
1289 1307 rc.direct_view('all') is distinguished from rc[:] in that 'all' will
1290 1308 evaluate the target engines at each execution, whereas rc[:] will connect to
1291 1309 all *current* engines, and that list will not change.
1292 1310
1293 1311 That is, 'all' will always use all engines, whereas rc[:] will not use
1294 1312 engines added after the DirectView is constructed.
1295 1313
1296 1314 Parameters
1297 1315 ----------
1298 1316
1299 1317 targets: list,slice,int,etc. [default: use all engines]
1300 1318 The engines to use for the View
1301 1319 """
1302 1320 single = isinstance(targets, int)
1303 1321 # allow 'all' to be lazily evaluated at each execution
1304 1322 if targets != 'all':
1305 1323 targets = self._build_targets(targets)[1]
1306 1324 if single:
1307 1325 targets = targets[0]
1308 1326 return DirectView(client=self, socket=self._mux_socket, targets=targets)
1309 1327
1310 1328 #--------------------------------------------------------------------------
1311 1329 # Query methods
1312 1330 #--------------------------------------------------------------------------
1313 1331
    @spin_first
    def get_result(self, indices_or_msg_ids=None, block=None):
        """Retrieve a result by msg_id or history index, wrapped in an AsyncResult object.

        If the client already has the results, no request to the Hub will be made.

        This is a convenient way to construct AsyncResult objects, which are wrappers
        that include metadata about execution, and allow for awaiting results that
        were not submitted by this Client.

        It can also be a convenient way to retrieve the metadata associated with
        blocking execution, since it always retrieves the full record for the
        requested msg_ids, whether from the local cache or from the Hub.

        Examples
        --------
        ::

            In [10]: r = client.apply()

        Parameters
        ----------

        indices_or_msg_ids : integer history index, str msg_id, or list of either
            The indices or msg_ids of indices to be retrieved
            (default: -1, the most recent submission)

        block : bool
            Whether to wait for the result to be done

        Returns
        -------

        AsyncResult
            A single AsyncResult object will always be returned.

        AsyncHubResult
            A subclass of AsyncResult that retrieves results from the Hub

        """
        block = self.block if block is None else block
        if indices_or_msg_ids is None:
            indices_or_msg_ids = -1

        if not isinstance(indices_or_msg_ids, (list,tuple)):
            indices_or_msg_ids = [indices_or_msg_ids]

        # normalize inputs: ints are history indices, everything else must be a str msg_id
        theids = []
        for id in indices_or_msg_ids:
            if isinstance(id, int):
                id = self.history[id]
            if not isinstance(id, basestring):
                raise TypeError("indices must be str or int, not %r"%id)
            theids.append(id)

        # any id not known locally must be fetched from the Hub
        local_ids = filter(lambda msg_id: msg_id in self.history or msg_id in self.results, theids)
        remote_ids = filter(lambda msg_id: msg_id not in local_ids, theids)

        if remote_ids:
            ar = AsyncHubResult(self, msg_ids=theids)
        else:
            ar = AsyncResult(self, msg_ids=theids)

        if block:
            ar.wait()

        return ar
1379 1397
1380 1398 @spin_first
1381 1399 def resubmit(self, indices_or_msg_ids=None, subheader=None, block=None):
1382 1400 """Resubmit one or more tasks.
1383 1401
1384 1402 in-flight tasks may not be resubmitted.
1385 1403
1386 1404 Parameters
1387 1405 ----------
1388 1406
1389 1407 indices_or_msg_ids : integer history index, str msg_id, or list of either
1390 1408 The indices or msg_ids of indices to be retrieved
1391 1409
1392 1410 block : bool
1393 1411 Whether to wait for the result to be done
1394 1412
1395 1413 Returns
1396 1414 -------
1397 1415
1398 1416 AsyncHubResult
1399 1417 A subclass of AsyncResult that retrieves results from the Hub
1400 1418
1401 1419 """
1402 1420 block = self.block if block is None else block
1403 1421 if indices_or_msg_ids is None:
1404 1422 indices_or_msg_ids = -1
1405 1423
1406 1424 if not isinstance(indices_or_msg_ids, (list,tuple)):
1407 1425 indices_or_msg_ids = [indices_or_msg_ids]
1408 1426
1409 1427 theids = []
1410 1428 for id in indices_or_msg_ids:
1411 1429 if isinstance(id, int):
1412 1430 id = self.history[id]
1413 1431 if not isinstance(id, basestring):
1414 1432 raise TypeError("indices must be str or int, not %r"%id)
1415 1433 theids.append(id)
1416 1434
1417 1435 content = dict(msg_ids = theids)
1418 1436
1419 1437 self.session.send(self._query_socket, 'resubmit_request', content)
1420 1438
1421 1439 zmq.select([self._query_socket], [], [])
1422 1440 idents,msg = self.session.recv(self._query_socket, zmq.NOBLOCK)
1423 1441 if self.debug:
1424 1442 pprint(msg)
1425 1443 content = msg['content']
1426 1444 if content['status'] != 'ok':
1427 1445 raise self._unwrap_exception(content)
1428 1446 mapping = content['resubmitted']
1429 1447 new_ids = [ mapping[msg_id] for msg_id in theids ]
1430 1448
1431 1449 ar = AsyncHubResult(self, msg_ids=new_ids)
1432 1450
1433 1451 if block:
1434 1452 ar.wait()
1435 1453
1436 1454 return ar
1437 1455
1438 1456 @spin_first
1439 1457 def result_status(self, msg_ids, status_only=True):
1440 1458 """Check on the status of the result(s) of the apply request with `msg_ids`.
1441 1459
1442 1460 If status_only is False, then the actual results will be retrieved, else
1443 1461 only the status of the results will be checked.
1444 1462
1445 1463 Parameters
1446 1464 ----------
1447 1465
1448 1466 msg_ids : list of msg_ids
1449 1467 if int:
1450 1468 Passed as index to self.history for convenience.
1451 1469 status_only : bool (default: True)
1452 1470 if False:
1453 1471 Retrieve the actual results of completed tasks.
1454 1472
1455 1473 Returns
1456 1474 -------
1457 1475
1458 1476 results : dict
1459 1477 There will always be the keys 'pending' and 'completed', which will
1460 1478 be lists of msg_ids that are incomplete or complete. If `status_only`
1461 1479 is False, then completed results will be keyed by their `msg_id`.
1462 1480 """
1463 1481 if not isinstance(msg_ids, (list,tuple)):
1464 1482 msg_ids = [msg_ids]
1465 1483
1466 1484 theids = []
1467 1485 for msg_id in msg_ids:
1468 1486 if isinstance(msg_id, int):
1469 1487 msg_id = self.history[msg_id]
1470 1488 if not isinstance(msg_id, basestring):
1471 1489 raise TypeError("msg_ids must be str, not %r"%msg_id)
1472 1490 theids.append(msg_id)
1473 1491
1474 1492 completed = []
1475 1493 local_results = {}
1476 1494
1477 1495 # comment this block out to temporarily disable local shortcut:
1478 1496 for msg_id in theids:
1479 1497 if msg_id in self.results:
1480 1498 completed.append(msg_id)
1481 1499 local_results[msg_id] = self.results[msg_id]
1482 1500 theids.remove(msg_id)
1483 1501
1484 1502 if theids: # some not locally cached
1485 1503 content = dict(msg_ids=theids, status_only=status_only)
1486 1504 msg = self.session.send(self._query_socket, "result_request", content=content)
1487 1505 zmq.select([self._query_socket], [], [])
1488 1506 idents,msg = self.session.recv(self._query_socket, zmq.NOBLOCK)
1489 1507 if self.debug:
1490 1508 pprint(msg)
1491 1509 content = msg['content']
1492 1510 if content['status'] != 'ok':
1493 1511 raise self._unwrap_exception(content)
1494 1512 buffers = msg['buffers']
1495 1513 else:
1496 1514 content = dict(completed=[],pending=[])
1497 1515
1498 1516 content['completed'].extend(completed)
1499 1517
1500 1518 if status_only:
1501 1519 return content
1502 1520
1503 1521 failures = []
1504 1522 # load cached results into result:
1505 1523 content.update(local_results)
1506 1524
1507 1525 # update cache with results:
1508 1526 for msg_id in sorted(theids):
1509 1527 if msg_id in content['completed']:
1510 1528 rec = content[msg_id]
1511 1529 parent = rec['header']
1512 1530 header = rec['result_header']
1513 1531 rcontent = rec['result_content']
1514 1532 iodict = rec['io']
1515 1533 if isinstance(rcontent, str):
1516 1534 rcontent = self.session.unpack(rcontent)
1517 1535
1518 1536 md = self.metadata[msg_id]
1519 1537 md.update(self._extract_metadata(header, parent, rcontent))
1520 1538 if rec.get('received'):
1521 1539 md['received'] = rec['received']
1522 1540 md.update(iodict)
1523 1541
1524 1542 if rcontent['status'] == 'ok':
1525 1543 res,buffers = util.unserialize_object(buffers)
1526 1544 else:
1527 1545 print rcontent
1528 1546 res = self._unwrap_exception(rcontent)
1529 1547 failures.append(res)
1530 1548
1531 1549 self.results[msg_id] = res
1532 1550 content[msg_id] = res
1533 1551
1534 1552 if len(theids) == 1 and failures:
1535 1553 raise failures[0]
1536 1554
1537 1555 error.collect_exceptions(failures, "result_status")
1538 1556 return content
1539 1557
1540 1558 @spin_first
1541 1559 def queue_status(self, targets='all', verbose=False):
1542 1560 """Fetch the status of engine queues.
1543 1561
1544 1562 Parameters
1545 1563 ----------
1546 1564
1547 1565 targets : int/str/list of ints/strs
1548 1566 the engines whose states are to be queried.
1549 1567 default : all
1550 1568 verbose : bool
1551 1569 Whether to return lengths only, or lists of ids for each element
1552 1570 """
1553 1571 if targets == 'all':
1554 1572 # allow 'all' to be evaluated on the engine
1555 1573 engine_ids = None
1556 1574 else:
1557 1575 engine_ids = self._build_targets(targets)[1]
1558 1576 content = dict(targets=engine_ids, verbose=verbose)
1559 1577 self.session.send(self._query_socket, "queue_request", content=content)
1560 1578 idents,msg = self.session.recv(self._query_socket, 0)
1561 1579 if self.debug:
1562 1580 pprint(msg)
1563 1581 content = msg['content']
1564 1582 status = content.pop('status')
1565 1583 if status != 'ok':
1566 1584 raise self._unwrap_exception(content)
1567 1585 content = rekey(content)
1568 1586 if isinstance(targets, int):
1569 1587 return content[targets]
1570 1588 else:
1571 1589 return content
1572 1590
1573 1591 @spin_first
1574 1592 def purge_results(self, jobs=[], targets=[]):
1575 1593 """Tell the Hub to forget results.
1576 1594
1577 1595 Individual results can be purged by msg_id, or the entire
1578 1596 history of specific targets can be purged.
1579 1597
1580 1598 Use `purge_results('all')` to scrub everything from the Hub's db.
1581 1599
1582 1600 Parameters
1583 1601 ----------
1584 1602
1585 1603 jobs : str or list of str or AsyncResult objects
1586 1604 the msg_ids whose results should be forgotten.
1587 1605 targets : int/str/list of ints/strs
1588 1606 The targets, by int_id, whose entire history is to be purged.
1589 1607
1590 1608 default : None
1591 1609 """
1592 1610 if not targets and not jobs:
1593 1611 raise ValueError("Must specify at least one of `targets` and `jobs`")
1594 1612 if targets:
1595 1613 targets = self._build_targets(targets)[1]
1596 1614
1597 1615 # construct msg_ids from jobs
1598 1616 if jobs == 'all':
1599 1617 msg_ids = jobs
1600 1618 else:
1601 1619 msg_ids = []
1602 1620 if isinstance(jobs, (basestring,AsyncResult)):
1603 1621 jobs = [jobs]
1604 1622 bad_ids = filter(lambda obj: not isinstance(obj, (basestring, AsyncResult)), jobs)
1605 1623 if bad_ids:
1606 1624 raise TypeError("Invalid msg_id type %r, expected str or AsyncResult"%bad_ids[0])
1607 1625 for j in jobs:
1608 1626 if isinstance(j, AsyncResult):
1609 1627 msg_ids.extend(j.msg_ids)
1610 1628 else:
1611 1629 msg_ids.append(j)
1612 1630
1613 1631 content = dict(engine_ids=targets, msg_ids=msg_ids)
1614 1632 self.session.send(self._query_socket, "purge_request", content=content)
1615 1633 idents, msg = self.session.recv(self._query_socket, 0)
1616 1634 if self.debug:
1617 1635 pprint(msg)
1618 1636 content = msg['content']
1619 1637 if content['status'] != 'ok':
1620 1638 raise self._unwrap_exception(content)
1621 1639
1622 1640 @spin_first
1623 1641 def hub_history(self):
1624 1642 """Get the Hub's history
1625 1643
1626 1644 Just like the Client, the Hub has a history, which is a list of msg_ids.
1627 1645 This will contain the history of all clients, and, depending on configuration,
1628 1646 may contain history across multiple cluster sessions.
1629 1647
1630 1648 Any msg_id returned here is a valid argument to `get_result`.
1631 1649
1632 1650 Returns
1633 1651 -------
1634 1652
1635 1653 msg_ids : list of strs
1636 1654 list of all msg_ids, ordered by task submission time.
1637 1655 """
1638 1656
1639 1657 self.session.send(self._query_socket, "history_request", content={})
1640 1658 idents, msg = self.session.recv(self._query_socket, 0)
1641 1659
1642 1660 if self.debug:
1643 1661 pprint(msg)
1644 1662 content = msg['content']
1645 1663 if content['status'] != 'ok':
1646 1664 raise self._unwrap_exception(content)
1647 1665 else:
1648 1666 return content['history']
1649 1667
1650 1668 @spin_first
1651 1669 def db_query(self, query, keys=None):
1652 1670 """Query the Hub's TaskRecord database
1653 1671
1654 1672 This will return a list of task record dicts that match `query`
1655 1673
1656 1674 Parameters
1657 1675 ----------
1658 1676
1659 1677 query : mongodb query dict
1660 1678 The search dict. See mongodb query docs for details.
1661 1679 keys : list of strs [optional]
1662 1680 The subset of keys to be returned. The default is to fetch everything but buffers.
1663 1681 'msg_id' will *always* be included.
1664 1682 """
1665 1683 if isinstance(keys, basestring):
1666 1684 keys = [keys]
1667 1685 content = dict(query=query, keys=keys)
1668 1686 self.session.send(self._query_socket, "db_request", content=content)
1669 1687 idents, msg = self.session.recv(self._query_socket, 0)
1670 1688 if self.debug:
1671 1689 pprint(msg)
1672 1690 content = msg['content']
1673 1691 if content['status'] != 'ok':
1674 1692 raise self._unwrap_exception(content)
1675 1693
1676 1694 records = content['records']
1677 1695
1678 1696 buffer_lens = content['buffer_lens']
1679 1697 result_buffer_lens = content['result_buffer_lens']
1680 1698 buffers = msg['buffers']
1681 1699 has_bufs = buffer_lens is not None
1682 1700 has_rbufs = result_buffer_lens is not None
1683 1701 for i,rec in enumerate(records):
1684 1702 # relink buffers
1685 1703 if has_bufs:
1686 1704 blen = buffer_lens[i]
1687 1705 rec['buffers'], buffers = buffers[:blen],buffers[blen:]
1688 1706 if has_rbufs:
1689 1707 blen = result_buffer_lens[i]
1690 1708 rec['result_buffers'], buffers = buffers[:blen],buffers[blen:]
1691 1709
1692 1710 return records
1693 1711
1694 1712 __all__ = [ 'Client' ]
General Comments 0
You need to be logged in to leave comments. Login now