ignore IOPub messages with no parent.
Author: MinRK
@@ -1,1444 +1,1448 @@
1 1 """A semi-synchronous Client for the ZMQ cluster
2 2
3 3 Authors:
4 4
5 5 * MinRK
6 6 """
7 7 #-----------------------------------------------------------------------------
8 8 # Copyright (C) 2010-2011 The IPython Development Team
9 9 #
10 10 # Distributed under the terms of the BSD License. The full license is in
11 11 # the file COPYING, distributed as part of this software.
12 12 #-----------------------------------------------------------------------------
13 13
14 14 #-----------------------------------------------------------------------------
15 15 # Imports
16 16 #-----------------------------------------------------------------------------
17 17
18 18 import os
19 19 import json
20 20 import sys
21 21 import time
22 22 import warnings
23 23 from datetime import datetime
24 24 from getpass import getpass
25 25 from pprint import pprint
26 26
27 27 pjoin = os.path.join
28 28
29 29 import zmq
30 30 # from zmq.eventloop import ioloop, zmqstream
31 31
32 32 from IPython.config.configurable import MultipleInstanceError
33 33 from IPython.core.application import BaseIPythonApplication
34 34
35 35 from IPython.utils.jsonutil import rekey
36 36 from IPython.utils.localinterfaces import LOCAL_IPS
37 37 from IPython.utils.path import get_ipython_dir
38 38 from IPython.utils.traitlets import (HasTraits, Integer, Instance, Unicode,
39 39 Dict, List, Bool, Set)
40 40 from IPython.external.decorator import decorator
41 41 from IPython.external.ssh import tunnel
42 42
43 43 from IPython.parallel import Reference
44 44 from IPython.parallel import error
45 45 from IPython.parallel import util
46 46
47 47 from IPython.zmq.session import Session, Message
48 48
49 49 from .asyncresult import AsyncResult, AsyncHubResult
50 50 from IPython.core.profiledir import ProfileDir, ProfileDirError
51 51 from .view import DirectView, LoadBalancedView
52 52
53 53 if sys.version_info[0] >= 3:
54 54 # xrange is used in a couple 'isinstance' tests in py2
55 55 # should be just 'range' in 3k
56 56 xrange = range
57 57
58 58 #--------------------------------------------------------------------------
59 59 # Decorators for Client methods
60 60 #--------------------------------------------------------------------------
61 61
62 62 @decorator
63 63 def spin_first(f, self, *args, **kwargs):
64 64 """Call spin() to sync state prior to calling the method."""
65 65 self.spin()
66 66 return f(self, *args, **kwargs)
67 67
68 68
69 69 #--------------------------------------------------------------------------
70 70 # Classes
71 71 #--------------------------------------------------------------------------
72 72
73 73 class Metadata(dict):
74 74 """Subclass of dict for initializing metadata values.
75 75
76 76 Attribute access works on keys.
77 77
78 78 These objects have a strict set of keys - errors will raise if you try
79 79 to add new keys.
80 80 """
81 81 def __init__(self, *args, **kwargs):
82 82 dict.__init__(self)
83 83 md = {'msg_id' : None,
84 84 'submitted' : None,
85 85 'started' : None,
86 86 'completed' : None,
87 87 'received' : None,
88 88 'engine_uuid' : None,
89 89 'engine_id' : None,
90 90 'follow' : None,
91 91 'after' : None,
92 92 'status' : None,
93 93
94 94 'pyin' : None,
95 95 'pyout' : None,
96 96 'pyerr' : None,
97 97 'stdout' : '',
98 98 'stderr' : '',
99 99 }
100 100 self.update(md)
101 101 self.update(dict(*args, **kwargs))
102 102
103 103 def __getattr__(self, key):
104 104 """getattr aliased to getitem"""
105 105 if key in self.iterkeys():
106 106 return self[key]
107 107 else:
108 108 raise AttributeError(key)
109 109
110 110 def __setattr__(self, key, value):
111 111 """setattr aliased to setitem, with strict"""
112 112 if key in self.iterkeys():
113 113 self[key] = value
114 114 else:
115 115 raise AttributeError(key)
116 116
117 117 def __setitem__(self, key, value):
118 118 """strict static key enforcement"""
119 119 if key in self.iterkeys():
120 120 dict.__setitem__(self, key, value)
121 121 else:
122 122 raise KeyError(key)
123 123
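# A quick behavioral sketch of Metadata (illustrative, not part of the module):
#
#     md = Metadata(status='ok')
#     md.status           # 'ok'  -- attribute access aliases item access
#     md['engine_id']     # None  -- unset keys keep the defaults above
#     md['bogus'] = 1     # KeyError: the key set is fixed
#     md.bogus            # AttributeError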
124 124
125 125 class Client(HasTraits):
126 126 """A semi-synchronous client to the IPython ZMQ cluster
127 127
128 128 Parameters
129 129 ----------
130 130
131 131 url_or_file : bytes or unicode; zmq url or path to ipcontroller-client.json
132 132 Connection information for the Hub's registration. If a json connector
133 133 file is given, then likely no further configuration is necessary.
134 134 [Default: use profile]
135 135 profile : bytes
136 136 The name of the Cluster profile to be used to find connector information.
137 137 If run from an IPython application, the default profile will be the same
138 138 as the running application, otherwise it will be 'default'.
139 139 context : zmq.Context
140 140 Pass an existing zmq.Context instance, otherwise the client will create its own.
141 141 debug : bool
142 142 flag for lots of message printing for debug purposes
143 143 timeout : int/float
144 144 time (in seconds) to wait for connection replies from the Hub
145 145 [Default: 10]
146 146
147 147 #-------------- session related args ----------------
148 148
149 149 config : Config object
150 150 If specified, this will be relayed to the Session for configuration
151 151 username : str
152 152 set username for the session object
153 153 packer : str (import_string) or callable
154 154 Can be either the simple keyword 'json' or 'pickle', or an import_string to a
155 155 function to serialize messages. Must support same input as
156 156 JSON, and output must be bytes.
157 157 You can pass a callable directly as `pack`
158 158 unpacker : str (import_string) or callable
159 159 The inverse of packer. Only necessary if packer is specified as *not* one
160 160 of 'json' or 'pickle'.
161 161
162 162 #-------------- ssh related args ----------------
163 163 # These are args for configuring the ssh tunnel to be used
164 164 # credentials are used to forward connections over ssh to the Controller
165 165 # Note that the ip given in `addr` needs to be relative to sshserver
166 166 # The most basic case is to leave addr as pointing to localhost (127.0.0.1),
167 167 # and set sshserver as the same machine the Controller is on. However,
168 168 # the only requirement is that sshserver is able to see the Controller
169 169 # (i.e. is within the same trusted network).
170 170
171 171 sshserver : str
172 172 A string of the form passed to ssh, i.e. 'server.tld' or 'user@server.tld:port'
173 173 If keyfile or password is specified, and this is not, it will default to
174 174 the ip given in addr.
175 175 sshkey : str; path to ssh private key file
176 176 This specifies a key to be used in ssh login, default None.
177 177 Regular default ssh keys will be used without specifying this argument.
178 178 password : str
179 179 Your ssh password to sshserver. Note that if this is left None,
180 180 you will be prompted for it if passwordless key based login is unavailable.
181 181 paramiko : bool
182 182 flag for whether to use paramiko instead of shell ssh for tunneling.
183 183 [default: True on win32, False otherwise]
184 184
185 185 #-------------- exec authentication args ----------------
186 186 If even localhost is untrusted, you can have some protection against
187 187 unauthorized execution by signing messages with HMAC digests.
188 188 Messages are still sent as cleartext, so if someone can snoop your
189 189 loopback traffic this will not protect your privacy, but will prevent
190 190 unauthorized execution.
191 191
192 192 exec_key : str
193 193 an authentication key or file containing a key
194 194 default: None
195 195
196 196
197 197 Attributes
198 198 ----------
199 199
200 200 ids : list of int engine IDs
201 201 requesting the ids attribute always synchronizes
202 202 the registration state. To request ids without synchronization,
203 203 use the semi-private _ids attribute.
204 204
205 205 history : list of msg_ids
206 206 a list of msg_ids, keeping track of all the execution
207 207 messages you have submitted in order.
208 208
209 209 outstanding : set of msg_ids
210 210 a set of msg_ids that have been submitted, but whose
211 211 results have not yet been received.
212 212
213 213 results : dict
214 214 a dict of all our results, keyed by msg_id
215 215
216 216 block : bool
217 217 determines default behavior when block not specified
218 218 in execution methods
219 219
220 220 Methods
221 221 -------
222 222
223 223 spin
224 224 flushes incoming results and registration state changes
225 225 control methods spin as well, and requesting `ids` also keeps the registration state up to date
226 226
227 227 wait
228 228 wait on one or more msg_ids
229 229
230 230 execution methods
231 231 apply
232 232 legacy: execute, run
233 233
234 234 data movement
235 235 push, pull, scatter, gather
236 236
237 237 query methods
238 238 queue_status, get_result, purge, result_status
239 239
240 240 control methods
241 241 abort, shutdown
242 242
243 243 """
244 244
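# A minimal usage sketch (assumes a running cluster, e.g. started with
# `ipcluster start`, and the default profile's connection file):
#
#     from IPython.parallel import Client
#     rc = Client()                    # reads ipcontroller-client.json
#     dview = rc[:]                    # DirectView on all current engines
#     ar = dview.apply_async(lambda x: x * 2, 21)
#     ar.get()                         # [42, 42, ...], one per engine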
245 245
246 246 block = Bool(False)
247 247 outstanding = Set()
248 248 results = Instance('collections.defaultdict', (dict,))
249 249 metadata = Instance('collections.defaultdict', (Metadata,))
250 250 history = List()
251 251 debug = Bool(False)
252 252
253 253 profile=Unicode()
254 254 def _profile_default(self):
255 255 if BaseIPythonApplication.initialized():
256 256 # an IPython app *might* be running, try to get its profile
257 257 try:
258 258 return BaseIPythonApplication.instance().profile
259 259 except (AttributeError, MultipleInstanceError):
260 260 # could be a *different* subclass of config.Application,
261 261 # which would raise one of these two errors.
262 262 return u'default'
263 263 else:
264 264 return u'default'
265 265
266 266
267 267 _outstanding_dict = Instance('collections.defaultdict', (set,))
268 268 _ids = List()
269 269 _connected=Bool(False)
270 270 _ssh=Bool(False)
271 271 _context = Instance('zmq.Context')
272 272 _config = Dict()
273 273 _engines=Instance(util.ReverseDict, (), {})
274 274 # _hub_socket=Instance('zmq.Socket')
275 275 _query_socket=Instance('zmq.Socket')
276 276 _control_socket=Instance('zmq.Socket')
277 277 _iopub_socket=Instance('zmq.Socket')
278 278 _notification_socket=Instance('zmq.Socket')
279 279 _mux_socket=Instance('zmq.Socket')
280 280 _task_socket=Instance('zmq.Socket')
281 281 _task_scheme=Unicode()
282 282 _closed = False
283 283 _ignored_control_replies=Integer(0)
284 284 _ignored_hub_replies=Integer(0)
285 285
286 286 def __new__(self, *args, **kw):
287 287 # don't raise on positional args
288 288 return HasTraits.__new__(self, **kw)
289 289
290 290 def __init__(self, url_or_file=None, profile=None, profile_dir=None, ipython_dir=None,
291 291 context=None, debug=False, exec_key=None,
292 292 sshserver=None, sshkey=None, password=None, paramiko=None,
293 293 timeout=10, **extra_args
294 294 ):
295 295 if profile:
296 296 super(Client, self).__init__(debug=debug, profile=profile)
297 297 else:
298 298 super(Client, self).__init__(debug=debug)
299 299 if context is None:
300 300 context = zmq.Context.instance()
301 301 self._context = context
302 302
303 303 self._setup_profile_dir(self.profile, profile_dir, ipython_dir)
304 304 if self._cd is not None:
305 305 if url_or_file is None:
306 306 url_or_file = pjoin(self._cd.security_dir, 'ipcontroller-client.json')
307 307 assert url_or_file is not None, "I can't find enough information to connect to a hub!"\
308 308 " Please specify at least one of url_or_file or profile."
309 309
310 310 if not util.is_url(url_or_file):
311 311 # it's not a url, try for a file
312 312 if not os.path.exists(url_or_file):
313 313 if self._cd:
314 314 url_or_file = os.path.join(self._cd.security_dir, url_or_file)
315 315 assert os.path.exists(url_or_file), "Not a valid connection file or url: %r"%url_or_file
316 316 with open(url_or_file) as f:
317 317 cfg = json.loads(f.read())
318 318 else:
319 319 cfg = {'url':url_or_file}
320 320
321 321 # sync defaults from args, json:
322 322 if sshserver:
323 323 cfg['ssh'] = sshserver
324 324 if exec_key:
325 325 cfg['exec_key'] = exec_key
326 326 exec_key = cfg['exec_key']
327 327 location = cfg.setdefault('location', None)
328 328 cfg['url'] = util.disambiguate_url(cfg['url'], location)
329 329 url = cfg['url']
330 330 proto,addr,port = util.split_url(url)
331 331 if location is not None and addr == '127.0.0.1':
332 332 # location specified, and connection is expected to be local
333 333 if location not in LOCAL_IPS and not sshserver:
334 334 # load ssh from JSON *only* if the controller is not on
335 335 # this machine
336 336 sshserver=cfg['ssh']
337 337 if location not in LOCAL_IPS and not sshserver:
338 338 # warn if no ssh specified, but SSH is probably needed
339 339 # This is only a warning, because the most likely cause
340 340 # is a local Controller on a laptop whose IP is dynamic
341 341 warnings.warn("""
342 342 Controller appears to be listening on localhost, but not on this machine.
343 343 If this is true, you should specify Client(...,sshserver='you@%s')
344 344 or instruct your controller to listen on an external IP."""%location,
345 345 RuntimeWarning)
346 346 elif not sshserver:
347 347 # otherwise sync with cfg
348 348 sshserver = cfg['ssh']
349 349
350 350 self._config = cfg
351 351
352 352 self._ssh = bool(sshserver or sshkey or password)
353 353 if self._ssh and sshserver is None:
354 354 # default to ssh via localhost
355 355 sshserver = url.split('://')[1].split(':')[0]
356 356 if self._ssh and password is None:
357 357 if tunnel.try_passwordless_ssh(sshserver, sshkey, paramiko):
358 358 password=False
359 359 else:
360 360 password = getpass("SSH Password for %s: "%sshserver)
361 361 ssh_kwargs = dict(keyfile=sshkey, password=password, paramiko=paramiko)
362 362
363 363 # configure and construct the session
364 364 if exec_key is not None:
365 365 if os.path.isfile(exec_key):
366 366 extra_args['keyfile'] = exec_key
367 367 else:
368 368 exec_key = util.asbytes(exec_key)
369 369 extra_args['key'] = exec_key
370 370 self.session = Session(**extra_args)
371 371
372 372 self._query_socket = self._context.socket(zmq.DEALER)
373 373 self._query_socket.setsockopt(zmq.IDENTITY, self.session.bsession)
374 374 if self._ssh:
375 375 tunnel.tunnel_connection(self._query_socket, url, sshserver, **ssh_kwargs)
376 376 else:
377 377 self._query_socket.connect(url)
378 378
379 379 self.session.debug = self.debug
380 380
381 381 self._notification_handlers = {'registration_notification' : self._register_engine,
382 382 'unregistration_notification' : self._unregister_engine,
383 383 'shutdown_notification' : lambda msg: self.close(),
384 384 }
385 385 self._queue_handlers = {'execute_reply' : self._handle_execute_reply,
386 386 'apply_reply' : self._handle_apply_reply}
387 387 self._connect(sshserver, ssh_kwargs, timeout)
388 388
389 389 def __del__(self):
390 390 """cleanup sockets, but _not_ context."""
391 391 self.close()
392 392
393 393 def _setup_profile_dir(self, profile, profile_dir, ipython_dir):
394 394 if ipython_dir is None:
395 395 ipython_dir = get_ipython_dir()
396 396 if profile_dir is not None:
397 397 try:
398 398 self._cd = ProfileDir.find_profile_dir(profile_dir)
399 399 return
400 400 except ProfileDirError:
401 401 pass
402 402 elif profile is not None:
403 403 try:
404 404 self._cd = ProfileDir.find_profile_dir_by_name(
405 405 ipython_dir, profile)
406 406 return
407 407 except ProfileDirError:
408 408 pass
409 409 self._cd = None
410 410
411 411 def _update_engines(self, engines):
412 412 """Update our engines dict and _ids from a dict of the form: {id:uuid}."""
413 413 for k,v in engines.iteritems():
414 414 eid = int(k)
415 415 self._engines[eid] = v
416 416 self._ids.append(eid)
417 417 self._ids = sorted(self._ids)
418 418 if sorted(self._engines.keys()) != range(len(self._engines)) and \
419 419 self._task_scheme == 'pure' and self._task_socket:
420 420 self._stop_scheduling_tasks()
421 421
422 422 def _stop_scheduling_tasks(self):
423 423 """Stop scheduling tasks because an engine has been unregistered
424 424 from a pure ZMQ scheduler.
425 425 """
426 426 self._task_socket.close()
427 427 self._task_socket = None
428 428 msg = "An engine has been unregistered, and we are using pure " +\
429 429 "ZMQ task scheduling. Task farming will be disabled."
430 430 if self.outstanding:
431 431 msg += " If you were running tasks when this happened, " +\
432 432 "some `outstanding` msg_ids may never resolve."
433 433 warnings.warn(msg, RuntimeWarning)
434 434
435 435 def _build_targets(self, targets):
436 436 """Turn valid target IDs or 'all' into two lists:
437 437 (int_ids, uuids).
438 438 """
439 439 if not self._ids:
440 440 # flush notification socket if no engines yet, just in case
441 441 if not self.ids:
442 442 raise error.NoEnginesRegistered("Can't build targets without any engines")
443 443
444 444 if targets is None:
445 445 targets = self._ids
446 446 elif isinstance(targets, basestring):
447 447 if targets.lower() == 'all':
448 448 targets = self._ids
449 449 else:
450 450 raise TypeError("%r not valid str target, must be 'all'"%(targets))
451 451 elif isinstance(targets, int):
452 452 if targets < 0:
453 453 targets = self.ids[targets]
454 454 if targets not in self._ids:
455 455 raise IndexError("No such engine: %i"%targets)
456 456 targets = [targets]
457 457
458 458 if isinstance(targets, slice):
459 459 indices = range(len(self._ids))[targets]
460 460 ids = self.ids
461 461 targets = [ ids[i] for i in indices ]
462 462
463 463 if not isinstance(targets, (tuple, list, xrange)):
464 464 raise TypeError("targets by int/slice/collection of ints only, not %s"%(type(targets)))
465 465
466 466 return [util.asbytes(self._engines[t]) for t in targets], list(targets)
467 467
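# Accepted target forms, sketched (ids are illustrative):
#
#     self._build_targets('all')           # every registered engine
#     self._build_targets(0)               # a single engine by int id
#     self._build_targets(-1)              # negative ints index self.ids
#     self._build_targets(slice(0, 4, 2))  # a slice over registered ids
#     self._build_targets([0, 2])          # an explicit list of int ids
#
# Each call returns ([uuid, ...], [int_id, ...]).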
468 468 def _connect(self, sshserver, ssh_kwargs, timeout):
469 469 """setup all our socket connections to the cluster. This is called from
470 470 __init__."""
471 471
472 472 # Maybe allow reconnecting?
473 473 if self._connected:
474 474 return
475 475 self._connected=True
476 476
477 477 def connect_socket(s, url):
478 478 url = util.disambiguate_url(url, self._config['location'])
479 479 if self._ssh:
480 480 return tunnel.tunnel_connection(s, url, sshserver, **ssh_kwargs)
481 481 else:
482 482 return s.connect(url)
483 483
484 484 self.session.send(self._query_socket, 'connection_request')
485 485 # use Poller because zmq.select has wrong units in pyzmq 2.1.7
486 486 poller = zmq.Poller()
487 487 poller.register(self._query_socket, zmq.POLLIN)
488 488 # poll expects milliseconds, timeout is seconds
489 489 evts = poller.poll(timeout*1000)
490 490 if not evts:
491 491 raise error.TimeoutError("Hub connection request timed out")
492 492 idents,msg = self.session.recv(self._query_socket,mode=0)
493 493 if self.debug:
494 494 pprint(msg)
495 495 msg = Message(msg)
496 496 content = msg.content
497 497 self._config['registration'] = dict(content)
498 498 if content.status == 'ok':
499 499 ident = self.session.bsession
500 500 if content.mux:
501 501 self._mux_socket = self._context.socket(zmq.DEALER)
502 502 self._mux_socket.setsockopt(zmq.IDENTITY, ident)
503 503 connect_socket(self._mux_socket, content.mux)
504 504 if content.task:
505 505 self._task_scheme, task_addr = content.task
506 506 self._task_socket = self._context.socket(zmq.DEALER)
507 507 self._task_socket.setsockopt(zmq.IDENTITY, ident)
508 508 connect_socket(self._task_socket, task_addr)
509 509 if content.notification:
510 510 self._notification_socket = self._context.socket(zmq.SUB)
511 511 connect_socket(self._notification_socket, content.notification)
512 512 self._notification_socket.setsockopt(zmq.SUBSCRIBE, b'')
513 513 # if content.query:
514 514 # self._query_socket = self._context.socket(zmq.DEALER)
515 515 # self._query_socket.setsockopt(zmq.IDENTITY, self.session.bsession)
516 516 # connect_socket(self._query_socket, content.query)
517 517 if content.control:
518 518 self._control_socket = self._context.socket(zmq.DEALER)
519 519 self._control_socket.setsockopt(zmq.IDENTITY, ident)
520 520 connect_socket(self._control_socket, content.control)
521 521 if content.iopub:
522 522 self._iopub_socket = self._context.socket(zmq.SUB)
523 523 self._iopub_socket.setsockopt(zmq.SUBSCRIBE, b'')
524 524 self._iopub_socket.setsockopt(zmq.IDENTITY, ident)
525 525 connect_socket(self._iopub_socket, content.iopub)
526 526 self._update_engines(dict(content.engines))
527 527 else:
528 528 self._connected = False
529 529 raise Exception("Failed to connect!")
530 530
531 531 #--------------------------------------------------------------------------
532 532 # handlers and callbacks for incoming messages
533 533 #--------------------------------------------------------------------------
534 534
535 535 def _unwrap_exception(self, content):
536 536 """unwrap exception, and remap engine_id to int."""
537 537 e = error.unwrap_exception(content)
538 538 # print e.traceback
539 539 if e.engine_info:
540 540 e_uuid = e.engine_info['engine_uuid']
541 541 eid = self._engines[e_uuid]
542 542 e.engine_info['engine_id'] = eid
543 543 return e
544 544
545 545 def _extract_metadata(self, header, parent, content):
546 546 md = {'msg_id' : parent['msg_id'],
547 547 'received' : datetime.now(),
548 548 'engine_uuid' : header.get('engine', None),
549 549 'follow' : parent.get('follow', []),
550 550 'after' : parent.get('after', []),
551 551 'status' : content['status'],
552 552 }
553 553
554 554 if md['engine_uuid'] is not None:
555 555 md['engine_id'] = self._engines.get(md['engine_uuid'], None)
556 556
557 557 if 'date' in parent:
558 558 md['submitted'] = parent['date']
559 559 if 'started' in header:
560 560 md['started'] = header['started']
561 561 if 'date' in header:
562 562 md['completed'] = header['date']
563 563 return md
564 564
565 565 def _register_engine(self, msg):
566 566 """Register a new engine, and update our connection info."""
567 567 content = msg['content']
568 568 eid = content['id']
569 569 d = {eid : content['queue']}
570 570 self._update_engines(d)
571 571
572 572 def _unregister_engine(self, msg):
573 573 """Unregister an engine that has died."""
574 574 content = msg['content']
575 575 eid = int(content['id'])
576 576 if eid in self._ids:
577 577 self._ids.remove(eid)
578 578 uuid = self._engines.pop(eid)
579 579
580 580 self._handle_stranded_msgs(eid, uuid)
581 581
582 582 if self._task_socket and self._task_scheme == 'pure':
583 583 self._stop_scheduling_tasks()
584 584
585 585 def _handle_stranded_msgs(self, eid, uuid):
586 586 """Handle messages known to be on an engine when the engine unregisters.
587 587
588 588 It is possible that this will fire prematurely - that is, an engine will
589 589 go down after completing a result, and the client will be notified
590 590 of the unregistration and later receive the successful result.
591 591 """
592 592
593 593 outstanding = self._outstanding_dict[uuid]
594 594
595 595 for msg_id in list(outstanding):
596 596 if msg_id in self.results:
597 597 # we already have this result
598 598 continue
599 599 try:
600 600 raise error.EngineError("Engine %r died while running task %r"%(eid, msg_id))
601 601 except:
602 602 content = error.wrap_exception()
603 603 # build a fake message:
604 604 parent = {}
605 605 header = {}
606 606 parent['msg_id'] = msg_id
607 607 header['engine'] = uuid
608 608 header['date'] = datetime.now()
609 609 msg = dict(parent_header=parent, header=header, content=content)
610 610 self._handle_apply_reply(msg)
611 611
612 612 def _handle_execute_reply(self, msg):
613 613 """Save the reply to an execute_request into our results.
614 614
615 615 execute messages are never actually used. apply is used instead.
616 616 """
617 617
618 618 parent = msg['parent_header']
619 619 msg_id = parent['msg_id']
620 620 if msg_id not in self.outstanding:
621 621 if msg_id in self.history:
622 622 print ("got stale result: %s"%msg_id)
623 623 else:
624 624 print ("got unknown result: %s"%msg_id)
625 625 else:
626 626 self.outstanding.remove(msg_id)
627 627 self.results[msg_id] = self._unwrap_exception(msg['content'])
628 628
629 629 def _handle_apply_reply(self, msg):
630 630 """Save the reply to an apply_request into our results."""
631 631 parent = msg['parent_header']
632 632 msg_id = parent['msg_id']
633 633 if msg_id not in self.outstanding:
634 634 if msg_id in self.history:
635 635 print ("got stale result: %s"%msg_id)
636 636 print self.results[msg_id]
637 637 print msg
638 638 else:
639 639 print ("got unknown result: %s"%msg_id)
640 640 else:
641 641 self.outstanding.remove(msg_id)
642 642 content = msg['content']
643 643 header = msg['header']
644 644
645 645 # construct metadata:
646 646 md = self.metadata[msg_id]
647 647 md.update(self._extract_metadata(header, parent, content))
648 648 # is this redundant?
649 649 self.metadata[msg_id] = md
650 650
651 651 e_outstanding = self._outstanding_dict[md['engine_uuid']]
652 652 if msg_id in e_outstanding:
653 653 e_outstanding.remove(msg_id)
654 654
655 655 # construct result:
656 656 if content['status'] == 'ok':
657 657 self.results[msg_id] = util.unserialize_object(msg['buffers'])[0]
658 658 elif content['status'] == 'aborted':
659 659 self.results[msg_id] = error.TaskAborted(msg_id)
660 660 elif content['status'] == 'resubmitted':
661 661 # TODO: handle resubmission
662 662 pass
663 663 else:
664 664 self.results[msg_id] = self._unwrap_exception(content)
665 665
666 666 def _flush_notifications(self):
667 667 """Flush notifications of engine registrations waiting
668 668 in ZMQ queue."""
669 669 idents,msg = self.session.recv(self._notification_socket, mode=zmq.NOBLOCK)
670 670 while msg is not None:
671 671 if self.debug:
672 672 pprint(msg)
673 673 msg_type = msg['header']['msg_type']
674 674 handler = self._notification_handlers.get(msg_type, None)
675 675 if handler is None:
676 676 raise Exception("Unhandled message type: %s"%msg.msg_type)
677 677 else:
678 678 handler(msg)
679 679 idents,msg = self.session.recv(self._notification_socket, mode=zmq.NOBLOCK)
680 680
681 681 def _flush_results(self, sock):
682 682 """Flush task or queue results waiting in ZMQ queue."""
683 683 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
684 684 while msg is not None:
685 685 if self.debug:
686 686 pprint(msg)
687 687 msg_type = msg['header']['msg_type']
688 688 handler = self._queue_handlers.get(msg_type, None)
689 689 if handler is None:
690 690 raise Exception("Unhandled message type: %s"%msg.msg_type)
691 691 else:
692 692 handler(msg)
693 693 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
694 694
695 695 def _flush_control(self, sock):
696 696 """Flush replies from the control channel waiting
697 697 in the ZMQ queue.
698 698
699 699 Currently: ignore them."""
700 700 if self._ignored_control_replies <= 0:
701 701 return
702 702 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
703 703 while msg is not None:
704 704 self._ignored_control_replies -= 1
705 705 if self.debug:
706 706 pprint(msg)
707 707 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
708 708
709 709 def _flush_ignored_control(self):
710 710 """flush ignored control replies"""
711 711 while self._ignored_control_replies > 0:
712 712 self.session.recv(self._control_socket)
713 713 self._ignored_control_replies -= 1
714 714
715 715 def _flush_ignored_hub_replies(self):
716 716 ident,msg = self.session.recv(self._query_socket, mode=zmq.NOBLOCK)
717 717 while msg is not None:
718 718 ident,msg = self.session.recv(self._query_socket, mode=zmq.NOBLOCK)
719 719
720 720 def _flush_iopub(self, sock):
721 721 """Flush replies from the iopub channel waiting
722 722 in the ZMQ queue.
723 723 """
724 724 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
725 725 while msg is not None:
726 726 if self.debug:
727 727 pprint(msg)
728 728 parent = msg['parent_header']
729 # ignore IOPub messages with no parent (e.g. prints/warnings from before the first execution)
730 if not parent:
731 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)  # advance to the next message, or this loop would never terminate
732 continue
729 733 msg_id = parent['msg_id']
730 734 content = msg['content']
731 735 header = msg['header']
732 736 msg_type = msg['header']['msg_type']
733 737
734 738 # init metadata:
735 739 md = self.metadata[msg_id]
736 740
737 741 if msg_type == 'stream':
738 742 name = content['name']
739 743 s = md[name] or ''
740 744 md[name] = s + content['data']
741 745 elif msg_type == 'pyerr':
742 746 md.update({'pyerr' : self._unwrap_exception(content)})
743 747 elif msg_type == 'pyin':
744 748 md.update({'pyin' : content['code']})
745 749 else:
746 750 md.update({msg_type : content.get('data', '')})
747 751
748 752 # redundant?
749 753 self.metadata[msg_id] = md
750 754
751 755 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
752 756
753 757 #--------------------------------------------------------------------------
754 758 # len, getitem
755 759 #--------------------------------------------------------------------------
756 760
757 761 def __len__(self):
758 762 """len(client) returns # of engines."""
759 763 return len(self.ids)
760 764
761 765 def __getitem__(self, key):
762 766 """index access returns DirectView multiplexer objects
763 767
764 768 Must be int, slice, or list/tuple/xrange of ints"""
765 769 if not isinstance(key, (int, slice, tuple, list, xrange)):
766 770 raise TypeError("key by int/slice/iterable of ints only, not %s"%(type(key)))
767 771 else:
768 772 return self.direct_view(key)
769 773
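# e.g. (sketch): rc[0] is a DirectView on engine 0; rc[::2] covers every
# other engine; rc[[0, 2]] an explicit subset.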
770 774 #--------------------------------------------------------------------------
771 775 # Begin public methods
772 776 #--------------------------------------------------------------------------
773 777
774 778 @property
775 779 def ids(self):
776 780 """Always up-to-date ids property."""
777 781 self._flush_notifications()
778 782 # always copy:
779 783 return list(self._ids)
780 784
781 785 def close(self):
782 786 if self._closed:
783 787 return
784 788 snames = filter(lambda n: n.endswith('socket'), dir(self))
785 789 for socket in map(lambda name: getattr(self, name), snames):
786 790 if isinstance(socket, zmq.Socket) and not socket.closed:
787 791 socket.close()
788 792 self._closed = True
789 793
790 794 def spin(self):
791 795 """Flush any registration notifications and execution results
792 796 waiting in the ZMQ queue.
793 797 """
794 798 if self._notification_socket:
795 799 self._flush_notifications()
796 800 if self._mux_socket:
797 801 self._flush_results(self._mux_socket)
798 802 if self._task_socket:
799 803 self._flush_results(self._task_socket)
800 804 if self._control_socket:
801 805 self._flush_control(self._control_socket)
802 806 if self._iopub_socket:
803 807 self._flush_iopub(self._iopub_socket)
804 808 if self._query_socket:
805 809 self._flush_ignored_hub_replies()
806 810
807 811 def wait(self, jobs=None, timeout=-1):
808 812 """waits on one or more `jobs`, for up to `timeout` seconds.
809 813
810 814 Parameters
811 815 ----------
812 816
813 817 jobs : int, str, or list of ints and/or strs, or one or more AsyncResult objects
814 818 ints are indices to self.history
815 819 strs are msg_ids
816 820 default: wait on all outstanding messages
817 821 timeout : float
818 822 a time in seconds, after which to give up.
819 823 default is -1, which means no timeout
820 824
821 825 Returns
822 826 -------
823 827
824 828 True : when all msg_ids are done
825 829 False : timeout reached, some msg_ids still outstanding
826 830 """
827 831 tic = time.time()
828 832 if jobs is None:
829 833 theids = self.outstanding
830 834 else:
831 835 if isinstance(jobs, (int, basestring, AsyncResult)):
832 836 jobs = [jobs]
833 837 theids = set()
834 838 for job in jobs:
835 839 if isinstance(job, int):
836 840 # index access
837 841 job = self.history[job]
838 842 elif isinstance(job, AsyncResult):
839 843 map(theids.add, job.msg_ids)
840 844 continue
841 845 theids.add(job)
842 846 if not theids.intersection(self.outstanding):
843 847 return True
844 848 self.spin()
845 849 while theids.intersection(self.outstanding):
846 850 if timeout >= 0 and ( time.time()-tic ) > timeout:
847 851 break
848 852 time.sleep(1e-3)
849 853 self.spin()
850 854 return len(theids.intersection(self.outstanding)) == 0
851 855
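# e.g. (sketch):
#
#     ar = dview.apply_async(time.sleep, 2)
#     rc.wait(ar, timeout=1)    # False: still outstanding after 1s
#     rc.wait(ar)               # True once the sleep finishes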
852 856 #--------------------------------------------------------------------------
853 857 # Control methods
854 858 #--------------------------------------------------------------------------
855 859
856 860 @spin_first
857 861 def clear(self, targets=None, block=None):
858 862 """Clear the namespace in target(s)."""
859 863 block = self.block if block is None else block
860 864 targets = self._build_targets(targets)[0]
861 865 for t in targets:
862 866 self.session.send(self._control_socket, 'clear_request', content={}, ident=t)
863 867 error = False
864 868 if block:
865 869 self._flush_ignored_control()
866 870 for i in range(len(targets)):
867 871 idents,msg = self.session.recv(self._control_socket,0)
868 872 if self.debug:
869 873 pprint(msg)
870 874 if msg['content']['status'] != 'ok':
871 875 error = self._unwrap_exception(msg['content'])
872 876 else:
873 877 self._ignored_control_replies += len(targets)
874 878 if error:
875 879 raise error
876 880
877 881
878 882 @spin_first
879 883 def abort(self, jobs=None, targets=None, block=None):
880 884 """Abort specific jobs from the execution queues of target(s).
881 885
882 886 This is a mechanism to prevent jobs that have already been submitted
883 887 from executing.
884 888
885 889 Parameters
886 890 ----------
887 891
888 892 jobs : msg_id, list of msg_ids, or AsyncResult
889 893 The jobs to be aborted
890 894
891 895 If unspecified/None: abort all outstanding jobs.
892 896
893 897 """
894 898 block = self.block if block is None else block
895 899 jobs = jobs if jobs is not None else list(self.outstanding)
896 900 targets = self._build_targets(targets)[0]
897 901
898 902 msg_ids = []
899 903 if isinstance(jobs, (basestring,AsyncResult)):
900 904 jobs = [jobs]
901 905 bad_ids = filter(lambda obj: not isinstance(obj, (basestring, AsyncResult)), jobs)
902 906 if bad_ids:
903 907 raise TypeError("Invalid msg_id type %r, expected str or AsyncResult"%bad_ids[0])
904 908 for j in jobs:
905 909 if isinstance(j, AsyncResult):
906 910 msg_ids.extend(j.msg_ids)
907 911 else:
908 912 msg_ids.append(j)
909 913 content = dict(msg_ids=msg_ids)
910 914 for t in targets:
911 915 self.session.send(self._control_socket, 'abort_request',
912 916 content=content, ident=t)
913 917 error = False
914 918 if block:
915 919 self._flush_ignored_control()
916 920 for i in range(len(targets)):
917 921 idents,msg = self.session.recv(self._control_socket,0)
918 922 if self.debug:
919 923 pprint(msg)
920 924 if msg['content']['status'] != 'ok':
921 925 error = self._unwrap_exception(msg['content'])
922 926 else:
923 927 self._ignored_control_replies += len(targets)
924 928 if error:
925 929 raise error
926 930
927 931 @spin_first
928 932 def shutdown(self, targets=None, restart=False, hub=False, block=None):
929 933 """Terminates one or more engine processes, optionally including the hub."""
930 934 block = self.block if block is None else block
931 935 if hub:
932 936 targets = 'all'
933 937 targets = self._build_targets(targets)[0]
934 938 for t in targets:
935 939 self.session.send(self._control_socket, 'shutdown_request',
936 940 content={'restart':restart},ident=t)
937 941 error = False
938 942 if block or hub:
939 943 self._flush_ignored_control()
940 944 for i in range(len(targets)):
941 945 idents,msg = self.session.recv(self._control_socket, 0)
942 946 if self.debug:
943 947 pprint(msg)
944 948 if msg['content']['status'] != 'ok':
945 949 error = self._unwrap_exception(msg['content'])
946 950 else:
947 951 self._ignored_control_replies += len(targets)
948 952
949 953 if hub:
950 954 time.sleep(0.25)
951 955 self.session.send(self._query_socket, 'shutdown_request')
952 956 idents,msg = self.session.recv(self._query_socket, 0)
953 957 if self.debug:
954 958 pprint(msg)
955 959 if msg['content']['status'] != 'ok':
956 960 error = self._unwrap_exception(msg['content'])
957 961
958 962 if error:
959 963 raise error
960 964
961 965 #--------------------------------------------------------------------------
962 966 # Execution related methods
963 967 #--------------------------------------------------------------------------
964 968
965 969 def _maybe_raise(self, result):
966 970 """wrapper for maybe raising an exception if apply failed."""
967 971 if isinstance(result, error.RemoteError):
968 972 raise result
969 973
970 974 return result
971 975
972 976 def send_apply_message(self, socket, f, args=None, kwargs=None, subheader=None, track=False,
973 977 ident=None):
974 978 """construct and send an apply message via a socket.
975 979
976 980 This is the principal method with which all engine execution is performed by views.
977 981 """
978 982
979 983 assert not self._closed, "cannot use me anymore, I'm closed!"
980 984 # defaults:
981 985 args = args if args is not None else []
982 986 kwargs = kwargs if kwargs is not None else {}
983 987 subheader = subheader if subheader is not None else {}
984 988
985 989 # validate arguments
986 990 if not callable(f) and not isinstance(f, Reference):
987 991 raise TypeError("f must be callable, not %s"%type(f))
988 992 if not isinstance(args, (tuple, list)):
989 993 raise TypeError("args must be tuple or list, not %s"%type(args))
990 994 if not isinstance(kwargs, dict):
991 995 raise TypeError("kwargs must be dict, not %s"%type(kwargs))
992 996 if not isinstance(subheader, dict):
993 997 raise TypeError("subheader must be dict, not %s"%type(subheader))
994 998
995 999 bufs = util.pack_apply_message(f,args,kwargs)
996 1000
997 1001 msg = self.session.send(socket, "apply_request", buffers=bufs, ident=ident,
998 1002 subheader=subheader, track=track)
999 1003
1000 1004 msg_id = msg['header']['msg_id']
1001 1005 self.outstanding.add(msg_id)
1002 1006 if ident:
1003 1007 # possibly routed to a specific engine
1004 1008 if isinstance(ident, list):
1005 1009 ident = ident[-1]
1006 1010 if ident in self._engines.values():
1007 1011 # save for later, in case of engine death
1008 1012 self._outstanding_dict[ident].add(msg_id)
1009 1013 self.history.append(msg_id)
1010 1014 self.metadata[msg_id]['submitted'] = datetime.now()
1011 1015
1012 1016 return msg
1013 1017
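# Roughly how a View drives this method (a sketch; `target_uuid` is
# illustrative, not the actual View internals):
#
#     msg = client.send_apply_message(client._mux_socket, f, args, kwargs,
#                                     ident=target_uuid)
#     msg_id = msg['header']['msg_id']   # now tracked in client.outstanding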
1014 1018 #--------------------------------------------------------------------------
1015 1019 # construct a View object
1016 1020 #--------------------------------------------------------------------------
1017 1021
1018 1022 def load_balanced_view(self, targets=None):
1019 1023 """construct a DirectView object.
1020 1024
1021 1025 If no arguments are specified, create a LoadBalancedView
1022 1026 using all engines.
1023 1027
1024 1028 Parameters
1025 1029 ----------
1026 1030
1027 1031 targets: list,slice,int,etc. [default: use all engines]
1028 1032 The subset of engines across which to load-balance
1029 1033 """
1030 1034 if targets == 'all':
1031 1035 targets = None
1032 1036 if targets is not None:
1033 1037 targets = self._build_targets(targets)[1]
1034 1038 return LoadBalancedView(client=self, socket=self._task_socket, targets=targets)
1035 1039
1036 1040 def direct_view(self, targets='all'):
1037 1041 """construct a DirectView object.
1038 1042
1039 1043 If no targets are specified, create a DirectView using all engines.
1040 1044
1041 1045 rc.direct_view('all') is distinguished from rc[:] in that 'all' will
1042 1046 evaluate the target engines at each execution, whereas rc[:] will connect to
1043 1047 all *current* engines, and that list will not change.
1044 1048
1045 1049 That is, 'all' will always use all engines, whereas rc[:] will not use
1046 1050 engines added after the DirectView is constructed.
1047 1051
1048 1052 Parameters
1049 1053 ----------
1050 1054
1051 1055 targets: list,slice,int,etc. [default: use all engines]
1052 1056 The engines to use for the View
1053 1057 """
1054 1058 single = isinstance(targets, int)
1055 1059 # allow 'all' to be lazily evaluated at each execution
1056 1060 if targets != 'all':
1057 1061 targets = self._build_targets(targets)[1]
1058 1062 if single:
1059 1063 targets = targets[0]
1060 1064 return DirectView(client=self, socket=self._mux_socket, targets=targets)
1061 1065
1062 1066 #--------------------------------------------------------------------------
1063 1067 # Query methods
1064 1068 #--------------------------------------------------------------------------
1065 1069
1066 1070 @spin_first
1067 1071 def get_result(self, indices_or_msg_ids=None, block=None):
1068 1072 """Retrieve a result by msg_id or history index, wrapped in an AsyncResult object.
1069 1073
1070 1074 If the client already has the results, no request to the Hub will be made.
1071 1075
1072 1076 This is a convenient way to construct AsyncResult objects, which are wrappers
1073 1077 that include metadata about execution, and allow for awaiting results that
1074 1078 were not submitted by this Client.
1075 1079
1076 1080 It can also be a convenient way to retrieve the metadata associated with
1077 1081 blocking execution, since it always retrieves the full result record.
1078 1082
1079 1083 Examples
1080 1084 --------
1081 1085 ::
1082 1086
1083 1087 In [10]: r = client.get_result(msg_id)  # msg_id from an earlier submission
1084 1088
1085 1089 Parameters
1086 1090 ----------
1087 1091
1088 1092 indices_or_msg_ids : integer history index, str msg_id, or list of either
1089 1093 The history indices or msg_ids of the results to be retrieved
1090 1094
1091 1095 block : bool
1092 1096 Whether to wait for the result to be done
1093 1097
1094 1098 Returns
1095 1099 -------
1096 1100
1097 1101 AsyncResult
1098 1102 A single AsyncResult object will always be returned.
1099 1103
1100 1104 AsyncHubResult
1101 1105 A subclass of AsyncResult that retrieves results from the Hub
1102 1106
1103 1107 """
1104 1108 block = self.block if block is None else block
1105 1109 if indices_or_msg_ids is None:
1106 1110 indices_or_msg_ids = -1
1107 1111
1108 1112 if not isinstance(indices_or_msg_ids, (list,tuple)):
1109 1113 indices_or_msg_ids = [indices_or_msg_ids]
1110 1114
1111 1115 theids = []
1112 1116 for id in indices_or_msg_ids:
1113 1117 if isinstance(id, int):
1114 1118 id = self.history[id]
1115 1119 if not isinstance(id, basestring):
1116 1120 raise TypeError("indices must be str or int, not %r"%id)
1117 1121 theids.append(id)
1118 1122
1119 1123 local_ids = filter(lambda msg_id: msg_id in self.history or msg_id in self.results, theids)
1120 1124 remote_ids = filter(lambda msg_id: msg_id not in local_ids, theids)
1121 1125
1122 1126 if remote_ids:
1123 1127 ar = AsyncHubResult(self, msg_ids=theids)
1124 1128 else:
1125 1129 ar = AsyncResult(self, msg_ids=theids)
1126 1130
1127 1131 if block:
1128 1132 ar.wait()
1129 1133
1130 1134 return ar
1131 1135
1132 1136 @spin_first
1133 1137 def resubmit(self, indices_or_msg_ids=None, subheader=None, block=None):
1134 1138 """Resubmit one or more tasks.
1135 1139
1136 1140 in-flight tasks may not be resubmitted.
1137 1141
1138 1142 Parameters
1139 1143 ----------
1140 1144
1141 1145 indices_or_msg_ids : integer history index, str msg_id, or list of either
1142 1146 The history indices or msg_ids of the results to be retrieved
1143 1147
1144 1148 block : bool
1145 1149 Whether to wait for the result to be done
1146 1150
1147 1151 Returns
1148 1152 -------
1149 1153
1150 1154 AsyncHubResult
1151 1155 A subclass of AsyncResult that retrieves results from the Hub
1152 1156
1153 1157 """
1154 1158 block = self.block if block is None else block
1155 1159 if indices_or_msg_ids is None:
1156 1160 indices_or_msg_ids = -1
1157 1161
1158 1162 if not isinstance(indices_or_msg_ids, (list,tuple)):
1159 1163 indices_or_msg_ids = [indices_or_msg_ids]
1160 1164
1161 1165 theids = []
1162 1166 for id in indices_or_msg_ids:
1163 1167 if isinstance(id, int):
1164 1168 id = self.history[id]
1165 1169 if not isinstance(id, basestring):
1166 1170 raise TypeError("indices must be str or int, not %r"%id)
1167 1171 theids.append(id)
1168 1172
1169 1173 for msg_id in theids:
1170 1174 self.outstanding.discard(msg_id)
1171 1175 if msg_id in self.history:
1172 1176 self.history.remove(msg_id)
1173 1177 self.results.pop(msg_id, None)
1174 1178 self.metadata.pop(msg_id, None)
1175 1179 content = dict(msg_ids = theids)
1176 1180
1177 1181 self.session.send(self._query_socket, 'resubmit_request', content)
1178 1182
1179 1183 zmq.select([self._query_socket], [], [])
1180 1184 idents,msg = self.session.recv(self._query_socket, zmq.NOBLOCK)
1181 1185 if self.debug:
1182 1186 pprint(msg)
1183 1187 content = msg['content']
1184 1188 if content['status'] != 'ok':
1185 1189 raise self._unwrap_exception(content)
1186 1190
1187 1191 ar = AsyncHubResult(self, msg_ids=theids)
1188 1192
1189 1193 if block:
1190 1194 ar.wait()
1191 1195
1192 1196 return ar
1193 1197
1194 1198 @spin_first
1195 1199 def result_status(self, msg_ids, status_only=True):
1196 1200 """Check on the status of the result(s) of the apply request with `msg_ids`.
1197 1201
1198 1202 If status_only is False, then the actual results will be retrieved, else
1199 1203 only the status of the results will be checked.
1200 1204
1201 1205 Parameters
1202 1206 ----------
1203 1207
1204 1208 msg_ids : list of msg_ids
1205 1209 if int:
1206 1210 Passed as index to self.history for convenience.
1207 1211 status_only : bool (default: True)
1208 1212 if False:
1209 1213 Retrieve the actual results of completed tasks.
1210 1214
1211 1215 Returns
1212 1216 -------
1213 1217
1214 1218 results : dict
1215 1219 There will always be the keys 'pending' and 'completed', which will
1216 1220 be lists of msg_ids that are incomplete or complete. If `status_only`
1217 1221 is False, then completed results will be keyed by their `msg_id`.
1218 1222 """
1219 1223 if not isinstance(msg_ids, (list,tuple)):
1220 1224 msg_ids = [msg_ids]
1221 1225
1222 1226 theids = []
1223 1227 for msg_id in msg_ids:
1224 1228 if isinstance(msg_id, int):
1225 1229 msg_id = self.history[msg_id]
1226 1230 if not isinstance(msg_id, basestring):
1227 1231 raise TypeError("msg_ids must be str, not %r"%msg_id)
1228 1232 theids.append(msg_id)
1229 1233
1230 1234 completed = []
1231 1235 local_results = {}
1232 1236
1233 1237 # comment this block out to temporarily disable local shortcut:
1234 1238 for msg_id in theids:
1235 1239 if msg_id in self.results:
1236 1240 completed.append(msg_id)
1237 1241 local_results[msg_id] = self.results[msg_id]
1238 1242 theids.remove(msg_id)
1239 1243
1240 1244 if theids: # some not locally cached
1241 1245 content = dict(msg_ids=theids, status_only=status_only)
1242 1246 msg = self.session.send(self._query_socket, "result_request", content=content)
1243 1247 zmq.select([self._query_socket], [], [])
1244 1248 idents,msg = self.session.recv(self._query_socket, zmq.NOBLOCK)
1245 1249 if self.debug:
1246 1250 pprint(msg)
1247 1251 content = msg['content']
1248 1252 if content['status'] != 'ok':
1249 1253 raise self._unwrap_exception(content)
1250 1254 buffers = msg['buffers']
1251 1255 else:
1252 1256 content = dict(completed=[],pending=[])
1253 1257
1254 1258 content['completed'].extend(completed)
1255 1259
1256 1260 if status_only:
1257 1261 return content
1258 1262
1259 1263 failures = []
1260 1264 # load cached results into result:
1261 1265 content.update(local_results)
1262 1266
1263 1267 # update cache with results:
1264 1268 for msg_id in sorted(theids):
1265 1269 if msg_id in content['completed']:
1266 1270 rec = content[msg_id]
1267 1271 parent = rec['header']
1268 1272 header = rec['result_header']
1269 1273 rcontent = rec['result_content']
1270 1274 iodict = rec['io']
1271 1275 if isinstance(rcontent, str):
1272 1276 rcontent = self.session.unpack(rcontent)
1273 1277
1274 1278 md = self.metadata[msg_id]
1275 1279 md.update(self._extract_metadata(header, parent, rcontent))
1276 1280 md.update(iodict)
1277 1281
1278 1282 if rcontent['status'] == 'ok':
1279 1283 res,buffers = util.unserialize_object(buffers)
1280 1284 else:
1281 1285 print rcontent
1282 1286 res = self._unwrap_exception(rcontent)
1283 1287 failures.append(res)
1284 1288
1285 1289 self.results[msg_id] = res
1286 1290 content[msg_id] = res
1287 1291
1288 1292 if len(theids) == 1 and failures:
1289 1293 raise failures[0]
1290 1294
1291 1295 error.collect_exceptions(failures, "result_status")
1292 1296 return content
1293 1297
1294 1298 @spin_first
1295 1299 def queue_status(self, targets='all', verbose=False):
1296 1300 """Fetch the status of engine queues.
1297 1301
1298 1302 Parameters
1299 1303 ----------
1300 1304
1301 1305 targets : int/str/list of ints/strs
1302 1306 the engines whose states are to be queried.
1303 1307 default : all
1304 1308 verbose : bool
1305 1309 Whether to return lengths only, or lists of ids for each element
1306 1310 """
1307 1311 engine_ids = self._build_targets(targets)[1]
1308 1312 content = dict(targets=engine_ids, verbose=verbose)
1309 1313 self.session.send(self._query_socket, "queue_request", content=content)
1310 1314 idents,msg = self.session.recv(self._query_socket, 0)
1311 1315 if self.debug:
1312 1316 pprint(msg)
1313 1317 content = msg['content']
1314 1318 status = content.pop('status')
1315 1319 if status != 'ok':
1316 1320 raise self._unwrap_exception(content)
1317 1321 content = rekey(content)
1318 1322 if isinstance(targets, int):
1319 1323 return content[targets]
1320 1324 else:
1321 1325 return content
1322 1326
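# e.g. (sketch; the counts are illustrative):
#
#     rc.queue_status()                 # {0: {'queue': 0, 'completed': 5, ...}, ...}
#     rc.queue_status(0, verbose=True)  # per-category lists of msg_ids for engine 0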
1323 1327 @spin_first
1324 1328 def purge_results(self, jobs=[], targets=[]):
1325 1329 """Tell the Hub to forget results.
1326 1330
1327 1331 Individual results can be purged by msg_id, or the entire
1328 1332 history of specific targets can be purged.
1329 1333
1330 1334 Use `purge_results('all')` to scrub everything from the Hub's db.
1331 1335
1332 1336 Parameters
1333 1337 ----------
1334 1338
1335 1339 jobs : str or list of str or AsyncResult objects
1336 1340 the msg_ids whose results should be forgotten.
1337 1341 targets : int/str/list of ints/strs
1338 1342 The targets, by int_id, whose entire history is to be purged.
1339 1343
1340 1344 default : None
1341 1345 """
1342 1346 if not targets and not jobs:
1343 1347 raise ValueError("Must specify at least one of `targets` and `jobs`")
1344 1348 if targets:
1345 1349 targets = self._build_targets(targets)[1]
1346 1350
1347 1351 # construct msg_ids from jobs
1348 1352 if jobs == 'all':
1349 1353 msg_ids = jobs
1350 1354 else:
1351 1355 msg_ids = []
1352 1356 if isinstance(jobs, (basestring,AsyncResult)):
1353 1357 jobs = [jobs]
1354 1358 bad_ids = filter(lambda obj: not isinstance(obj, (basestring, AsyncResult)), jobs)
1355 1359 if bad_ids:
1356 1360 raise TypeError("Invalid msg_id type %r, expected str or AsyncResult"%bad_ids[0])
1357 1361 for j in jobs:
1358 1362 if isinstance(j, AsyncResult):
1359 1363 msg_ids.extend(j.msg_ids)
1360 1364 else:
1361 1365 msg_ids.append(j)
1362 1366
1363 1367 content = dict(engine_ids=targets, msg_ids=msg_ids)
1364 1368 self.session.send(self._query_socket, "purge_request", content=content)
1365 1369 idents, msg = self.session.recv(self._query_socket, 0)
1366 1370 if self.debug:
1367 1371 pprint(msg)
1368 1372 content = msg['content']
1369 1373 if content['status'] != 'ok':
1370 1374 raise self._unwrap_exception(content)
1371 1375
1372 1376 @spin_first
1373 1377 def hub_history(self):
1374 1378 """Get the Hub's history
1375 1379
1376 1380 Just like the Client, the Hub has a history, which is a list of msg_ids.
1377 1381 This will contain the history of all clients, and, depending on configuration,
1378 1382 may contain history across multiple cluster sessions.
1379 1383
1380 1384 Any msg_id returned here is a valid argument to `get_result`.
1381 1385
1382 1386 Returns
1383 1387 -------
1384 1388
1385 1389 msg_ids : list of strs
1386 1390 list of all msg_ids, ordered by task submission time.
1387 1391 """
1388 1392
1389 1393 self.session.send(self._query_socket, "history_request", content={})
1390 1394 idents, msg = self.session.recv(self._query_socket, 0)
1391 1395
1392 1396 if self.debug:
1393 1397 pprint(msg)
1394 1398 content = msg['content']
1395 1399 if content['status'] != 'ok':
1396 1400 raise self._unwrap_exception(content)
1397 1401 else:
1398 1402 return content['history']
1399 1403
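# e.g. (sketch): fetch results for the ten most recent tasks the Hub has seen:
#
#     recent = rc.hub_history()[-10:]
#     ar = rc.get_result(recent)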
1400 1404 @spin_first
1401 1405 def db_query(self, query, keys=None):
1402 1406 """Query the Hub's TaskRecord database
1403 1407
1404 1408 This will return a list of task record dicts that match `query`
1405 1409
1406 1410 Parameters
1407 1411 ----------
1408 1412
1409 1413 query : mongodb query dict
1410 1414 The search dict. See mongodb query docs for details.
1411 1415 keys : list of strs [optional]
1412 1416 The subset of keys to be returned. The default is to fetch everything but buffers.
1413 1417 'msg_id' will *always* be included.
1414 1418 """
1415 1419 if isinstance(keys, basestring):
1416 1420 keys = [keys]
1417 1421 content = dict(query=query, keys=keys)
1418 1422 self.session.send(self._query_socket, "db_request", content=content)
1419 1423 idents, msg = self.session.recv(self._query_socket, 0)
1420 1424 if self.debug:
1421 1425 pprint(msg)
1422 1426 content = msg['content']
1423 1427 if content['status'] != 'ok':
1424 1428 raise self._unwrap_exception(content)
1425 1429
1426 1430 records = content['records']
1427 1431
1428 1432 buffer_lens = content['buffer_lens']
1429 1433 result_buffer_lens = content['result_buffer_lens']
1430 1434 buffers = msg['buffers']
1431 1435 has_bufs = buffer_lens is not None
1432 1436 has_rbufs = result_buffer_lens is not None
1433 1437 for i,rec in enumerate(records):
1434 1438 # relink buffers
1435 1439 if has_bufs:
1436 1440 blen = buffer_lens[i]
1437 1441 rec['buffers'], buffers = buffers[:blen],buffers[blen:]
1438 1442 if has_rbufs:
1439 1443 blen = result_buffer_lens[i]
1440 1444 rec['result_buffers'], buffers = buffers[:blen],buffers[blen:]
1441 1445
1442 1446 return records
1443 1447
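# e.g. (sketch of a MongoDB-style query):
#
#     rc.db_query({'msg_id': {'$in': ar.msg_ids}},
#                 keys=['started', 'completed'])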
1444 1448 __all__ = [ 'Client' ]