handle data_pub in parallel Client
MinRK
@@ -1,1713 +1,1718
1 1 """A semi-synchronous Client for the ZMQ cluster
2 2
3 3 Authors:
4 4
5 5 * MinRK
6 6 """
7 7 #-----------------------------------------------------------------------------
8 8 # Copyright (C) 2010-2011 The IPython Development Team
9 9 #
10 10 # Distributed under the terms of the BSD License. The full license is in
11 11 # the file COPYING, distributed as part of this software.
12 12 #-----------------------------------------------------------------------------
13 13
14 14 #-----------------------------------------------------------------------------
15 15 # Imports
16 16 #-----------------------------------------------------------------------------
17 17
18 18 import os
19 19 import json
20 20 import sys
21 21 from threading import Thread, Event
22 22 import time
23 23 import warnings
24 24 from datetime import datetime
25 25 from getpass import getpass
26 26 from pprint import pprint
27 27
28 28 pjoin = os.path.join
29 29
30 30 import zmq
31 31 # from zmq.eventloop import ioloop, zmqstream
32 32
33 33 from IPython.config.configurable import MultipleInstanceError
34 34 from IPython.core.application import BaseIPythonApplication
35 35 from IPython.core.profiledir import ProfileDir, ProfileDirError
36 36
37 37 from IPython.utils.coloransi import TermColors
38 38 from IPython.utils.jsonutil import rekey
39 39 from IPython.utils.localinterfaces import LOCAL_IPS
40 40 from IPython.utils.path import get_ipython_dir
41 41 from IPython.utils.py3compat import cast_bytes
42 42 from IPython.utils.traitlets import (HasTraits, Integer, Instance, Unicode,
43 43 Dict, List, Bool, Set, Any)
44 44 from IPython.external.decorator import decorator
45 45 from IPython.external.ssh import tunnel
46 46
47 47 from IPython.parallel import Reference
48 48 from IPython.parallel import error
49 49 from IPython.parallel import util
50 50
51 51 from IPython.zmq.session import Session, Message
52 from IPython.zmq import serialize
52 53
53 54 from .asyncresult import AsyncResult, AsyncHubResult
54 55 from .view import DirectView, LoadBalancedView
55 56
56 57 if sys.version_info[0] >= 3:
57 58 # xrange is used in a couple 'isinstance' tests in py2
58 59 # should be just 'range' in 3k
59 60 xrange = range
60 61
61 62 #--------------------------------------------------------------------------
62 63 # Decorators for Client methods
63 64 #--------------------------------------------------------------------------
64 65
65 66 @decorator
66 67 def spin_first(f, self, *args, **kwargs):
67 68 """Call spin() to sync state prior to calling the method."""
68 69 self.spin()
69 70 return f(self, *args, **kwargs)
70 71
71 72
72 73 #--------------------------------------------------------------------------
73 74 # Classes
74 75 #--------------------------------------------------------------------------
75 76
76 77
77 78 class ExecuteReply(object):
78 79 """wrapper for finished Execute results"""
79 80 def __init__(self, msg_id, content, metadata):
80 81 self.msg_id = msg_id
81 82 self._content = content
82 83 self.execution_count = content['execution_count']
83 84 self.metadata = metadata
84 85
85 86 def __getitem__(self, key):
86 87 return self.metadata[key]
87 88
88 89 def __getattr__(self, key):
89 90 if key not in self.metadata:
90 91 raise AttributeError(key)
91 92 return self.metadata[key]
92 93
93 94 def __repr__(self):
94 95 pyout = self.metadata['pyout'] or {'data':{}}
95 96 text_out = pyout['data'].get('text/plain', '')
96 97 if len(text_out) > 32:
97 98 text_out = text_out[:29] + '...'
98 99
99 100 return "<ExecuteReply[%i]: %s>" % (self.execution_count, text_out)
100 101
101 102 def _repr_pretty_(self, p, cycle):
102 103 pyout = self.metadata['pyout'] or {'data':{}}
103 104 text_out = pyout['data'].get('text/plain', '')
104 105
105 106 if not text_out:
106 107 return
107 108
108 109 try:
109 110 ip = get_ipython()
110 111 except NameError:
111 112 colors = "NoColor"
112 113 else:
113 114 colors = ip.colors
114 115
115 116 if colors == "NoColor":
116 117 out = normal = ""
117 118 else:
118 119 out = TermColors.Red
119 120 normal = TermColors.Normal
120 121
121 122 if '\n' in text_out and not text_out.startswith('\n'):
122 123 # add newline for multiline reprs
123 124 text_out = '\n' + text_out
124 125
125 126 p.text(
126 127 out + u'Out[%i:%i]: ' % (
127 128 self.metadata['engine_id'], self.execution_count
128 129 ) + normal + text_out
129 130 )
130 131
131 132 def _repr_html_(self):
132 133 pyout = self.metadata['pyout'] or {'data':{}}
133 134 return pyout['data'].get("text/html")
134 135
135 136 def _repr_latex_(self):
136 137 pyout = self.metadata['pyout'] or {'data':{}}
137 138 return pyout['data'].get("text/latex")
138 139
139 140 def _repr_json_(self):
140 141 pyout = self.metadata['pyout'] or {'data':{}}
141 142 return pyout['data'].get("application/json")
142 143
143 144 def _repr_javascript_(self):
144 145 pyout = self.metadata['pyout'] or {'data':{}}
145 146 return pyout['data'].get("application/javascript")
146 147
147 148 def _repr_png_(self):
148 149 pyout = self.metadata['pyout'] or {'data':{}}
149 150 return pyout['data'].get("image/png")
150 151
151 152 def _repr_jpeg_(self):
152 153 pyout = self.metadata['pyout'] or {'data':{}}
153 154 return pyout['data'].get("image/jpeg")
154 155
155 156 def _repr_svg_(self):
156 157 pyout = self.metadata['pyout'] or {'data':{}}
157 158 return pyout['data'].get("image/svg+xml")
158 159
159 160
160 161 class Metadata(dict):
161 162 """Subclass of dict for initializing metadata values.
162 163
163 164 Attribute access works on keys.
164 165
165 166     These objects have a strict set of keys - an error is raised if you try
166 167 to add new keys.
167 168 """
168 169 def __init__(self, *args, **kwargs):
169 170 dict.__init__(self)
170 171 md = {'msg_id' : None,
171 172 'submitted' : None,
172 173 'started' : None,
173 174 'completed' : None,
174 175 'received' : None,
175 176 'engine_uuid' : None,
176 177 'engine_id' : None,
177 178 'follow' : None,
178 179 'after' : None,
179 180 'status' : None,
180 181
181 182 'pyin' : None,
182 183 'pyout' : None,
183 184 'pyerr' : None,
184 185 'stdout' : '',
185 186 'stderr' : '',
186 187 'outputs' : [],
188 'data': {},
187 189 'outputs_ready' : False,
188 190 }
189 191 self.update(md)
190 192 self.update(dict(*args, **kwargs))
191 193
192 194 def __getattr__(self, key):
193 195 """getattr aliased to getitem"""
194 196 if key in self.iterkeys():
195 197 return self[key]
196 198 else:
197 199 raise AttributeError(key)
198 200
199 201 def __setattr__(self, key, value):
200 202 """setattr aliased to setitem, with strict"""
201 203 if key in self.iterkeys():
202 204 self[key] = value
203 205 else:
204 206 raise AttributeError(key)
205 207
206 208 def __setitem__(self, key, value):
207 209 """strict static key enforcement"""
208 210 if key in self.iterkeys():
209 211 dict.__setitem__(self, key, value)
210 212 else:
211 213 raise KeyError(key)
212 214
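# A quick sketch of the Metadata contract above: attribute access is aliased
# to item access, and the key set is fixed at construction, e.g.:
#
#   In [1]: md = Metadata()
#   In [2]: md.status is None      # existing key, via attribute access
#   Out[2]: True
#   In [3]: md['stdout'] = 'hi'    # existing key: accepted
#   In [4]: md['bogus'] = 1        # unknown key: raises KeyError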
213 215
214 216 class Client(HasTraits):
215 217 """A semi-synchronous client to the IPython ZMQ cluster
216 218
217 219 Parameters
218 220 ----------
219 221
220 222 url_file : str/unicode; path to ipcontroller-client.json
221 223         This JSON file should contain all the information needed to connect to a cluster,
222 224         and is likely the only argument needed. It provides the connection
223 225         information for the Hub's registration. If a JSON connector file is
224 226         given, then no further configuration is usually necessary.
225 227 [Default: use profile]
226 228 profile : bytes
227 229 The name of the Cluster profile to be used to find connector information.
228 230 If run from an IPython application, the default profile will be the same
229 231 as the running application, otherwise it will be 'default'.
230 232 context : zmq.Context
231 233 Pass an existing zmq.Context instance, otherwise the client will create its own.
232 234 debug : bool
233 235 flag for lots of message printing for debug purposes
234 236 timeout : int/float
235 237 time (in seconds) to wait for connection replies from the Hub
236 238 [Default: 10]
237 239
238 240 #-------------- session related args ----------------
239 241
240 242 config : Config object
241 243 If specified, this will be relayed to the Session for configuration
242 244 username : str
243 245 set username for the session object
244 246
245 247 #-------------- ssh related args ----------------
246 248 # These are args for configuring the ssh tunnel to be used
247 249 # credentials are used to forward connections over ssh to the Controller
248 250 # Note that the ip given in `addr` needs to be relative to sshserver
249 251 # The most basic case is to leave addr as pointing to localhost (127.0.0.1),
250 252 # and set sshserver as the same machine the Controller is on. However,
251 253 # the only requirement is that sshserver is able to see the Controller
252 254 # (i.e. is within the same trusted network).
253 255
254 256 sshserver : str
255 257 A string of the form passed to ssh, i.e. 'server.tld' or 'user@server.tld:port'
256 258 If keyfile or password is specified, and this is not, it will default to
257 259 the ip given in addr.
258 260 sshkey : str; path to ssh private key file
259 261 This specifies a key to be used in ssh login, default None.
260 262 Regular default ssh keys will be used without specifying this argument.
261 263 password : str
262 264 Your ssh password to sshserver. Note that if this is left None,
263 265 you will be prompted for it if passwordless key based login is unavailable.
264 266 paramiko : bool
265 267 flag for whether to use paramiko instead of shell ssh for tunneling.
266 268         [default: True on win32, False otherwise]
267 269
268 270
269 271 Attributes
270 272 ----------
271 273
272 274 ids : list of int engine IDs
273 275 requesting the ids attribute always synchronizes
274 276 the registration state. To request ids without synchronization,
275 277         use the semi-private _ids attribute.
276 278
277 279 history : list of msg_ids
278 280 a list of msg_ids, keeping track of all the execution
279 281 messages you have submitted in order.
280 282
281 283 outstanding : set of msg_ids
282 284 a set of msg_ids that have been submitted, but whose
283 285 results have not yet been received.
284 286
285 287 results : dict
286 288 a dict of all our results, keyed by msg_id
287 289
288 290 block : bool
289 291 determines default behavior when block not specified
290 292 in execution methods
291 293
292 294 Methods
293 295 -------
294 296
295 297 spin
296 298 flushes incoming results and registration state changes
297 299         control methods spin, and requesting `ids` also ensures the client is up to date
298 300
299 301 wait
300 302 wait on one or more msg_ids
301 303
302 304 execution methods
303 305 apply
304 306 legacy: execute, run
305 307
306 308 data movement
307 309 push, pull, scatter, gather
308 310
309 311 query methods
310 312 queue_status, get_result, purge, result_status
311 313
312 314 control methods
313 315 abort, shutdown
314 316
315 317 """
316 318
317 319
318 320 block = Bool(False)
319 321 outstanding = Set()
320 322 results = Instance('collections.defaultdict', (dict,))
321 323 metadata = Instance('collections.defaultdict', (Metadata,))
322 324 history = List()
323 325 debug = Bool(False)
324 326 _spin_thread = Any()
325 327 _stop_spinning = Any()
326 328
327 329 profile=Unicode()
328 330 def _profile_default(self):
329 331 if BaseIPythonApplication.initialized():
330 332 # an IPython app *might* be running, try to get its profile
331 333 try:
332 334 return BaseIPythonApplication.instance().profile
333 335 except (AttributeError, MultipleInstanceError):
334 336 # could be a *different* subclass of config.Application,
335 337 # which would raise one of these two errors.
336 338 return u'default'
337 339 else:
338 340 return u'default'
339 341
340 342
341 343 _outstanding_dict = Instance('collections.defaultdict', (set,))
342 344 _ids = List()
343 345 _connected=Bool(False)
344 346 _ssh=Bool(False)
345 347 _context = Instance('zmq.Context')
346 348 _config = Dict()
347 349 _engines=Instance(util.ReverseDict, (), {})
348 350 # _hub_socket=Instance('zmq.Socket')
349 351 _query_socket=Instance('zmq.Socket')
350 352 _control_socket=Instance('zmq.Socket')
351 353 _iopub_socket=Instance('zmq.Socket')
352 354 _notification_socket=Instance('zmq.Socket')
353 355 _mux_socket=Instance('zmq.Socket')
354 356 _task_socket=Instance('zmq.Socket')
355 357 _task_scheme=Unicode()
356 358 _closed = False
357 359 _ignored_control_replies=Integer(0)
358 360 _ignored_hub_replies=Integer(0)
359 361
360 362 def __new__(self, *args, **kw):
361 363 # don't raise on positional args
362 364 return HasTraits.__new__(self, **kw)
363 365
364 366 def __init__(self, url_file=None, profile=None, profile_dir=None, ipython_dir=None,
365 367 context=None, debug=False,
366 368 sshserver=None, sshkey=None, password=None, paramiko=None,
367 369 timeout=10, **extra_args
368 370 ):
369 371 if profile:
370 372 super(Client, self).__init__(debug=debug, profile=profile)
371 373 else:
372 374 super(Client, self).__init__(debug=debug)
373 375 if context is None:
374 376 context = zmq.Context.instance()
375 377 self._context = context
376 378 self._stop_spinning = Event()
377 379
378 380 if 'url_or_file' in extra_args:
379 381 url_file = extra_args['url_or_file']
380 382 warnings.warn("url_or_file arg no longer supported, use url_file", DeprecationWarning)
381 383
382 384 if url_file and util.is_url(url_file):
383 385 raise ValueError("single urls cannot be specified, url-files must be used.")
384 386
385 387 self._setup_profile_dir(self.profile, profile_dir, ipython_dir)
386 388
387 389 if self._cd is not None:
388 390 if url_file is None:
389 391 url_file = pjoin(self._cd.security_dir, 'ipcontroller-client.json')
390 392 if url_file is None:
391 393 raise ValueError(
392 394 "I can't find enough information to connect to a hub!"
393 395 " Please specify at least one of url_file or profile."
394 396 )
395 397
396 398 with open(url_file) as f:
397 399 cfg = json.load(f)
398 400
399 401 self._task_scheme = cfg['task_scheme']
400 402
401 403 # sync defaults from args, json:
402 404 if sshserver:
403 405 cfg['ssh'] = sshserver
404 406
405 407 location = cfg.setdefault('location', None)
406 408
407 409 proto,addr = cfg['interface'].split('://')
408 410 addr = util.disambiguate_ip_address(addr)
409 411 cfg['interface'] = "%s://%s" % (proto, addr)
410 412
411 413 # turn interface,port into full urls:
412 414 for key in ('control', 'task', 'mux', 'iopub', 'notification', 'registration'):
413 415 cfg[key] = cfg['interface'] + ':%i' % cfg[key]
414 416
415 417 url = cfg['registration']
416 418
417 419 if location is not None and addr == '127.0.0.1':
418 420 # location specified, and connection is expected to be local
419 421 if location not in LOCAL_IPS and not sshserver:
420 422 # load ssh from JSON *only* if the controller is not on
421 423 # this machine
422 424 sshserver=cfg['ssh']
423 425 if location not in LOCAL_IPS and not sshserver:
424 426 # warn if no ssh specified, but SSH is probably needed
425 427 # This is only a warning, because the most likely cause
426 428 # is a local Controller on a laptop whose IP is dynamic
427 429 warnings.warn("""
428 430 Controller appears to be listening on localhost, but not on this machine.
429 431 If this is true, you should specify Client(...,sshserver='you@%s')
430 432 or instruct your controller to listen on an external IP."""%location,
431 433 RuntimeWarning)
432 434 elif not sshserver:
433 435 # otherwise sync with cfg
434 436 sshserver = cfg['ssh']
435 437
436 438 self._config = cfg
437 439
438 440 self._ssh = bool(sshserver or sshkey or password)
439 441 if self._ssh and sshserver is None:
440 442 # default to ssh via localhost
441 443 sshserver = addr
442 444 if self._ssh and password is None:
443 445 if tunnel.try_passwordless_ssh(sshserver, sshkey, paramiko):
444 446 password=False
445 447 else:
446 448 password = getpass("SSH Password for %s: "%sshserver)
447 449 ssh_kwargs = dict(keyfile=sshkey, password=password, paramiko=paramiko)
448 450
449 451 # configure and construct the session
450 452 extra_args['packer'] = cfg['pack']
451 453 extra_args['unpacker'] = cfg['unpack']
452 454 extra_args['key'] = cast_bytes(cfg['exec_key'])
453 455
454 456 self.session = Session(**extra_args)
455 457
456 458 self._query_socket = self._context.socket(zmq.DEALER)
457 459
458 460 if self._ssh:
459 461 tunnel.tunnel_connection(self._query_socket, cfg['registration'], sshserver, **ssh_kwargs)
460 462 else:
461 463 self._query_socket.connect(cfg['registration'])
462 464
463 465 self.session.debug = self.debug
464 466
465 467 self._notification_handlers = {'registration_notification' : self._register_engine,
466 468 'unregistration_notification' : self._unregister_engine,
467 469 'shutdown_notification' : lambda msg: self.close(),
468 470 }
469 471 self._queue_handlers = {'execute_reply' : self._handle_execute_reply,
470 472 'apply_reply' : self._handle_apply_reply}
471 473 self._connect(sshserver, ssh_kwargs, timeout)
472 474
473 475 # last step: setup magics, if we are in IPython:
474 476
475 477 try:
476 478 ip = get_ipython()
477 479 except NameError:
478 480 return
479 481 else:
480 482 if 'px' not in ip.magics_manager.magics:
481 483 # in IPython but we are the first Client.
482 484 # activate a default view for parallel magics.
483 485 self.activate()
484 486
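    # Typical construction, as a sketch (assumes a cluster has been started,
    # e.g. with `ipcluster start`, so ipcontroller-client.json exists; the
    # profile name and ssh host below are hypothetical):
    #
    #   In [1]: from IPython.parallel import Client
    #   In [2]: rc = Client()                          # default profile
    #   In [3]: rc = Client(profile='mpi')             # a named profile
    #   In [4]: rc = Client(sshserver='me@gateway.example.com')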
485 487 def __del__(self):
486 488 """cleanup sockets, but _not_ context."""
487 489 self.close()
488 490
489 491 def _setup_profile_dir(self, profile, profile_dir, ipython_dir):
490 492 if ipython_dir is None:
491 493 ipython_dir = get_ipython_dir()
492 494 if profile_dir is not None:
493 495 try:
494 496 self._cd = ProfileDir.find_profile_dir(profile_dir)
495 497 return
496 498 except ProfileDirError:
497 499 pass
498 500 elif profile is not None:
499 501 try:
500 502 self._cd = ProfileDir.find_profile_dir_by_name(
501 503 ipython_dir, profile)
502 504 return
503 505 except ProfileDirError:
504 506 pass
505 507 self._cd = None
506 508
507 509 def _update_engines(self, engines):
508 510 """Update our engines dict and _ids from a dict of the form: {id:uuid}."""
509 511 for k,v in engines.iteritems():
510 512 eid = int(k)
511 513 if eid not in self._engines:
512 514 self._ids.append(eid)
513 515 self._engines[eid] = v
514 516 self._ids = sorted(self._ids)
515 517 if sorted(self._engines.keys()) != range(len(self._engines)) and \
516 518 self._task_scheme == 'pure' and self._task_socket:
517 519 self._stop_scheduling_tasks()
518 520
519 521 def _stop_scheduling_tasks(self):
520 522 """Stop scheduling tasks because an engine has been unregistered
521 523 from a pure ZMQ scheduler.
522 524 """
523 525 self._task_socket.close()
524 526 self._task_socket = None
525 527 msg = "An engine has been unregistered, and we are using pure " +\
526 528 "ZMQ task scheduling. Task farming will be disabled."
527 529 if self.outstanding:
528 530 msg += " If you were running tasks when this happened, " +\
529 531 "some `outstanding` msg_ids may never resolve."
530 532 warnings.warn(msg, RuntimeWarning)
531 533
532 534 def _build_targets(self, targets):
533 535 """Turn valid target IDs or 'all' into two lists:
534 536 (int_ids, uuids).
535 537 """
536 538 if not self._ids:
537 539 # flush notification socket if no engines yet, just in case
538 540 if not self.ids:
539 541 raise error.NoEnginesRegistered("Can't build targets without any engines")
540 542
541 543 if targets is None:
542 544 targets = self._ids
543 545 elif isinstance(targets, basestring):
544 546 if targets.lower() == 'all':
545 547 targets = self._ids
546 548 else:
547 549 raise TypeError("%r not valid str target, must be 'all'"%(targets))
548 550 elif isinstance(targets, int):
549 551 if targets < 0:
550 552 targets = self.ids[targets]
551 553 if targets not in self._ids:
552 554 raise IndexError("No such engine: %i"%targets)
553 555 targets = [targets]
554 556
555 557 if isinstance(targets, slice):
556 558 indices = range(len(self._ids))[targets]
557 559 ids = self.ids
558 560 targets = [ ids[i] for i in indices ]
559 561
560 562 if not isinstance(targets, (tuple, list, xrange)):
561 563 raise TypeError("targets by int/slice/collection of ints only, not %s"%(type(targets)))
562 564
563 565 return [cast_bytes(self._engines[t]) for t in targets], list(targets)
564 566
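    # _build_targets accepts several equivalent spellings; roughly (sketch,
    # assuming engines 0-3 are registered):
    #
    #   In [1]: rc._build_targets('all')[1]
    #   Out[1]: [0, 1, 2, 3]
    #   In [2]: rc._build_targets(slice(0, 2))[1]
    #   Out[2]: [0, 1]
    #   In [3]: rc._build_targets(-1)[1]        # negative ints index self.ids
    #   Out[3]: [3]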
565 567 def _connect(self, sshserver, ssh_kwargs, timeout):
566 568 """setup all our socket connections to the cluster. This is called from
567 569 __init__."""
568 570
569 571 # Maybe allow reconnecting?
570 572 if self._connected:
571 573 return
572 574 self._connected=True
573 575
574 576 def connect_socket(s, url):
575 577 # url = util.disambiguate_url(url, self._config['location'])
576 578 if self._ssh:
577 579 return tunnel.tunnel_connection(s, url, sshserver, **ssh_kwargs)
578 580 else:
579 581 return s.connect(url)
580 582
581 583 self.session.send(self._query_socket, 'connection_request')
582 584 # use Poller because zmq.select has wrong units in pyzmq 2.1.7
583 585 poller = zmq.Poller()
584 586 poller.register(self._query_socket, zmq.POLLIN)
585 587 # poll expects milliseconds, timeout is seconds
586 588 evts = poller.poll(timeout*1000)
587 589 if not evts:
588 590 raise error.TimeoutError("Hub connection request timed out")
589 591 idents,msg = self.session.recv(self._query_socket,mode=0)
590 592 if self.debug:
591 593 pprint(msg)
592 594 content = msg['content']
593 595 # self._config['registration'] = dict(content)
594 596 cfg = self._config
595 597 if content['status'] == 'ok':
596 598 self._mux_socket = self._context.socket(zmq.DEALER)
597 599 connect_socket(self._mux_socket, cfg['mux'])
598 600
599 601 self._task_socket = self._context.socket(zmq.DEALER)
600 602 connect_socket(self._task_socket, cfg['task'])
601 603
602 604 self._notification_socket = self._context.socket(zmq.SUB)
603 605 self._notification_socket.setsockopt(zmq.SUBSCRIBE, b'')
604 606 connect_socket(self._notification_socket, cfg['notification'])
605 607
606 608 self._control_socket = self._context.socket(zmq.DEALER)
607 609 connect_socket(self._control_socket, cfg['control'])
608 610
609 611 self._iopub_socket = self._context.socket(zmq.SUB)
610 612 self._iopub_socket.setsockopt(zmq.SUBSCRIBE, b'')
611 613 connect_socket(self._iopub_socket, cfg['iopub'])
612 614
613 615 self._update_engines(dict(content['engines']))
614 616 else:
615 617 self._connected = False
616 618 raise Exception("Failed to connect!")
617 619
618 620 #--------------------------------------------------------------------------
619 621 # handlers and callbacks for incoming messages
620 622 #--------------------------------------------------------------------------
621 623
622 624 def _unwrap_exception(self, content):
623 625 """unwrap exception, and remap engine_id to int."""
624 626 e = error.unwrap_exception(content)
625 627 # print e.traceback
626 628 if e.engine_info:
627 629 e_uuid = e.engine_info['engine_uuid']
628 630 eid = self._engines[e_uuid]
629 631 e.engine_info['engine_id'] = eid
630 632 return e
631 633
632 634 def _extract_metadata(self, msg):
633 635 header = msg['header']
634 636 parent = msg['parent_header']
635 637 msg_meta = msg['metadata']
636 638 content = msg['content']
637 639 md = {'msg_id' : parent['msg_id'],
638 640 'received' : datetime.now(),
639 641 'engine_uuid' : msg_meta.get('engine', None),
640 642 'follow' : msg_meta.get('follow', []),
641 643 'after' : msg_meta.get('after', []),
642 644 'status' : content['status'],
643 645 }
644 646
645 647 if md['engine_uuid'] is not None:
646 648 md['engine_id'] = self._engines.get(md['engine_uuid'], None)
647 649
648 650 if 'date' in parent:
649 651 md['submitted'] = parent['date']
650 652 if 'started' in msg_meta:
651 653 md['started'] = msg_meta['started']
652 654 if 'date' in header:
653 655 md['completed'] = header['date']
654 656 return md
655 657
656 658 def _register_engine(self, msg):
657 659 """Register a new engine, and update our connection info."""
658 660 content = msg['content']
659 661 eid = content['id']
660 662 d = {eid : content['uuid']}
661 663 self._update_engines(d)
662 664
663 665 def _unregister_engine(self, msg):
664 666 """Unregister an engine that has died."""
665 667 content = msg['content']
666 668 eid = int(content['id'])
667 669 if eid in self._ids:
668 670 self._ids.remove(eid)
669 671 uuid = self._engines.pop(eid)
670 672
671 673 self._handle_stranded_msgs(eid, uuid)
672 674
673 675 if self._task_socket and self._task_scheme == 'pure':
674 676 self._stop_scheduling_tasks()
675 677
676 678 def _handle_stranded_msgs(self, eid, uuid):
677 679 """Handle messages known to be on an engine when the engine unregisters.
678 680
679 681 It is possible that this will fire prematurely - that is, an engine will
680 682 go down after completing a result, and the client will be notified
681 683 of the unregistration and later receive the successful result.
682 684 """
683 685
684 686 outstanding = self._outstanding_dict[uuid]
685 687
686 688 for msg_id in list(outstanding):
687 689 if msg_id in self.results:
688 690                 # we already have the result
689 691 continue
690 692 try:
691 693 raise error.EngineError("Engine %r died while running task %r"%(eid, msg_id))
692 694 except:
693 695 content = error.wrap_exception()
694 696 # build a fake message:
695 697 parent = {}
696 698 header = {}
697 699 parent['msg_id'] = msg_id
698 700 header['engine'] = uuid
699 701 header['date'] = datetime.now()
700 702 msg = dict(parent_header=parent, header=header, content=content)
701 703 self._handle_apply_reply(msg)
702 704
703 705 def _handle_execute_reply(self, msg):
704 706 """Save the reply to an execute_request into our results.
705 707
706 708         execute requests are only sent via `View.execute` and the parallel magics;
                most execution uses apply instead.
707 709 """
708 710
709 711 parent = msg['parent_header']
710 712 msg_id = parent['msg_id']
711 713 if msg_id not in self.outstanding:
712 714 if msg_id in self.history:
713 715 print ("got stale result: %s"%msg_id)
714 716 else:
715 717 print ("got unknown result: %s"%msg_id)
716 718 else:
717 719 self.outstanding.remove(msg_id)
718 720
719 721 content = msg['content']
720 722 header = msg['header']
721 723
722 724 # construct metadata:
723 725 md = self.metadata[msg_id]
724 726 md.update(self._extract_metadata(msg))
725 727 # is this redundant?
726 728 self.metadata[msg_id] = md
727 729
728 730 e_outstanding = self._outstanding_dict[md['engine_uuid']]
729 731 if msg_id in e_outstanding:
730 732 e_outstanding.remove(msg_id)
731 733
732 734 # construct result:
733 735 if content['status'] == 'ok':
734 736 self.results[msg_id] = ExecuteReply(msg_id, content, md)
735 737 elif content['status'] == 'aborted':
736 738 self.results[msg_id] = error.TaskAborted(msg_id)
737 739 elif content['status'] == 'resubmitted':
738 740 # TODO: handle resubmission
739 741 pass
740 742 else:
741 743 self.results[msg_id] = self._unwrap_exception(content)
742 744
743 745 def _handle_apply_reply(self, msg):
744 746 """Save the reply to an apply_request into our results."""
745 747 parent = msg['parent_header']
746 748 msg_id = parent['msg_id']
747 749 if msg_id not in self.outstanding:
748 750 if msg_id in self.history:
749 751 print ("got stale result: %s"%msg_id)
750 752                 print(self.results[msg_id])
751 753                 print(msg)
752 754 else:
753 755 print ("got unknown result: %s"%msg_id)
754 756 else:
755 757 self.outstanding.remove(msg_id)
756 758 content = msg['content']
757 759 header = msg['header']
758 760
759 761 # construct metadata:
760 762 md = self.metadata[msg_id]
761 763 md.update(self._extract_metadata(msg))
762 764 # is this redundant?
763 765 self.metadata[msg_id] = md
764 766
765 767 e_outstanding = self._outstanding_dict[md['engine_uuid']]
766 768 if msg_id in e_outstanding:
767 769 e_outstanding.remove(msg_id)
768 770
769 771 # construct result:
770 772 if content['status'] == 'ok':
771 self.results[msg_id] = util.unserialize_object(msg['buffers'])[0]
773 self.results[msg_id] = serialize.unserialize_object(msg['buffers'])[0]
772 774 elif content['status'] == 'aborted':
773 775 self.results[msg_id] = error.TaskAborted(msg_id)
774 776 elif content['status'] == 'resubmitted':
775 777 # TODO: handle resubmission
776 778 pass
777 779 else:
778 780 self.results[msg_id] = self._unwrap_exception(content)
779 781
780 782 def _flush_notifications(self):
781 783 """Flush notifications of engine registrations waiting
782 784 in ZMQ queue."""
783 785 idents,msg = self.session.recv(self._notification_socket, mode=zmq.NOBLOCK)
784 786 while msg is not None:
785 787 if self.debug:
786 788 pprint(msg)
787 789 msg_type = msg['header']['msg_type']
788 790 handler = self._notification_handlers.get(msg_type, None)
789 791 if handler is None:
790 792                 raise Exception("Unhandled message type: %s"%msg_type)
791 793 else:
792 794 handler(msg)
793 795 idents,msg = self.session.recv(self._notification_socket, mode=zmq.NOBLOCK)
794 796
795 797 def _flush_results(self, sock):
796 798 """Flush task or queue results waiting in ZMQ queue."""
797 799 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
798 800 while msg is not None:
799 801 if self.debug:
800 802 pprint(msg)
801 803 msg_type = msg['header']['msg_type']
802 804 handler = self._queue_handlers.get(msg_type, None)
803 805 if handler is None:
804 806                 raise Exception("Unhandled message type: %s"%msg_type)
805 807 else:
806 808 handler(msg)
807 809 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
808 810
809 811 def _flush_control(self, sock):
810 812 """Flush replies from the control channel waiting
811 813 in the ZMQ queue.
812 814
813 815 Currently: ignore them."""
814 816 if self._ignored_control_replies <= 0:
815 817 return
816 818 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
817 819 while msg is not None:
818 820 self._ignored_control_replies -= 1
819 821 if self.debug:
820 822 pprint(msg)
821 823 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
822 824
823 825 def _flush_ignored_control(self):
824 826 """flush ignored control replies"""
825 827 while self._ignored_control_replies > 0:
826 828 self.session.recv(self._control_socket)
827 829 self._ignored_control_replies -= 1
828 830
829 831 def _flush_ignored_hub_replies(self):
830 832 ident,msg = self.session.recv(self._query_socket, mode=zmq.NOBLOCK)
831 833 while msg is not None:
832 834 ident,msg = self.session.recv(self._query_socket, mode=zmq.NOBLOCK)
833 835
834 836 def _flush_iopub(self, sock):
835 837 """Flush replies from the iopub channel waiting
836 838 in the ZMQ queue.
837 839 """
838 840 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
839 841 while msg is not None:
840 842 if self.debug:
841 843 pprint(msg)
842 844 parent = msg['parent_header']
843 845 # ignore IOPub messages with no parent.
844 846 # Caused by print statements or warnings from before the first execution.
845 847             if not parent:
846 848                 # recv the next message before skipping, or this loop never advances
                        idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
                        continue
847 849 msg_id = parent['msg_id']
848 850 content = msg['content']
849 851 header = msg['header']
850 852 msg_type = msg['header']['msg_type']
851 853
852 854 # init metadata:
853 855 md = self.metadata[msg_id]
854 856
855 857 if msg_type == 'stream':
856 858 name = content['name']
857 859 s = md[name] or ''
858 860 md[name] = s + content['data']
859 861 elif msg_type == 'pyerr':
860 862 md.update({'pyerr' : self._unwrap_exception(content)})
861 863 elif msg_type == 'pyin':
862 864 md.update({'pyin' : content['code']})
863 865 elif msg_type == 'display_data':
864 866 md['outputs'].append(content)
865 867 elif msg_type == 'pyout':
866 868 md['pyout'] = content
869 elif msg_type == 'data_message':
870 data, remainder = serialize.unserialize_object(msg['buffers'])
871 md['data'].update(data)
867 872 elif msg_type == 'status':
868 873 # idle message comes after all outputs
869 874 if content['execution_state'] == 'idle':
870 875 md['outputs_ready'] = True
871 876 else:
872 877                 # unhandled msg_type
873 878 pass
874 879
875 880             # redundant?
876 881 self.metadata[msg_id] = md
877 882
878 883 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
879 884
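    # The 'data_message' branch above is what surfaces engine-side
    # publish_data calls in this client. A minimal round trip might look like
    # this (sketch; publish_data lives in IPython.zmq.datapub at this point):
    #
    #   In [1]: def report():
    #      ...:     from IPython.zmq.datapub import publish_data
    #      ...:     publish_data({'progress': 0.5})
    #   In [2]: ar = rc[:].apply_async(report)
    #   In [3]: ar.wait()
    #   In [4]: [rc.metadata[m]['data'] for m in ar.msg_ids]
    #   Out[4]: [{'progress': 0.5}, {'progress': 0.5}]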
880 885 #--------------------------------------------------------------------------
881 886 # len, getitem
882 887 #--------------------------------------------------------------------------
883 888
884 889 def __len__(self):
885 890 """len(client) returns # of engines."""
886 891 return len(self.ids)
887 892
888 893 def __getitem__(self, key):
889 894 """index access returns DirectView multiplexer objects
890 895
891 896 Must be int, slice, or list/tuple/xrange of ints"""
892 897 if not isinstance(key, (int, slice, tuple, list, xrange)):
893 898 raise TypeError("key by int/slice/iterable of ints only, not %s"%(type(key)))
894 899 else:
895 900 return self.direct_view(key)
896 901
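    # Index access, as a sketch (four engines assumed):
    #
    #   In [1]: rc[0]      # DirectView on engine 0
    #   In [2]: rc[:2]     # DirectView on engines 0-1
    #   In [3]: rc[::2]    # DirectView on engines 0 and 2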
897 902 #--------------------------------------------------------------------------
898 903 # Begin public methods
899 904 #--------------------------------------------------------------------------
900 905
901 906 @property
902 907 def ids(self):
903 908 """Always up-to-date ids property."""
904 909 self._flush_notifications()
905 910 # always copy:
906 911 return list(self._ids)
907 912
908 913 def activate(self, targets='all', suffix=''):
909 914 """Create a DirectView and register it with IPython magics
910 915
911 916 Defines the magics `%px, %autopx, %pxresult, %%px`
912 917
913 918 Parameters
914 919 ----------
915 920
916 921 targets: int, list of ints, or 'all'
917 922 The engines on which the view's magics will run
918 923 suffix: str [default: '']
919 924 The suffix, if any, for the magics. This allows you to have
920 925 multiple views associated with parallel magics at the same time.
921 926
922 927 e.g. ``rc.activate(targets=0, suffix='0')`` will give you
923 928 the magics ``%px0``, ``%pxresult0``, etc. for running magics just
924 929 on engine 0.
925 930 """
926 931 view = self.direct_view(targets)
927 932 view.block = True
928 933 view.activate(suffix)
929 934 return view
930 935
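    # After activate(), the parallel magics target the returned view, e.g.
    # (sketch):
    #
    #   In [1]: rc.activate(targets=0, suffix='0')
    #   In [2]: %px0 a = 5        # runs only on engine 0
    #   In [3]: %pxresult0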
931 936 def close(self):
932 937 if self._closed:
933 938 return
934 939 self.stop_spin_thread()
935 940 snames = filter(lambda n: n.endswith('socket'), dir(self))
936 941 for socket in map(lambda name: getattr(self, name), snames):
937 942 if isinstance(socket, zmq.Socket) and not socket.closed:
938 943 socket.close()
939 944 self._closed = True
940 945
941 946 def _spin_every(self, interval=1):
942 947 """target func for use in spin_thread"""
943 948 while True:
944 949 if self._stop_spinning.is_set():
945 950 return
946 951 time.sleep(interval)
947 952 self.spin()
948 953
949 954 def spin_thread(self, interval=1):
950 955 """call Client.spin() in a background thread on some regular interval
951 956
952 957 This helps ensure that messages don't pile up too much in the zmq queue
953 958 while you are working on other things, or just leaving an idle terminal.
954 959
955 960 It also helps limit potential padding of the `received` timestamp
956 961 on AsyncResult objects, used for timings.
957 962
958 963 Parameters
959 964 ----------
960 965
961 966 interval : float, optional
962 967 The interval on which to spin the client in the background thread
963 968 (simply passed to time.sleep).
964 969
965 970 Notes
966 971 -----
967 972
968 973 For precision timing, you may want to use this method to put a bound
969 974 on the jitter (in seconds) in `received` timestamps used
970 975 in AsyncResult.wall_time.
971 976
972 977 """
973 978 if self._spin_thread is not None:
974 979 self.stop_spin_thread()
975 980 self._stop_spinning.clear()
976 981 self._spin_thread = Thread(target=self._spin_every, args=(interval,))
977 982 self._spin_thread.daemon = True
978 983 self._spin_thread.start()
979 984
980 985 def stop_spin_thread(self):
981 986 """stop background spin_thread, if any"""
982 987 if self._spin_thread is not None:
983 988 self._stop_spinning.set()
984 989 self._spin_thread.join()
985 990 self._spin_thread = None
986 991
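    # Sketch: bound the jitter on `received` timestamps while doing other work.
    #
    #   In [1]: import time
    #   In [2]: rc.spin_thread(interval=0.1)
    #   In [3]: ar = rc[:].apply_async(time.sleep, 1)
    #   In [4]: ar.wait(); ar.wall_time    # jitter bounded by ~0.1s
    #   In [5]: rc.stop_spin_thread()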
987 992 def spin(self):
988 993 """Flush any registration notifications and execution results
989 994 waiting in the ZMQ queue.
990 995 """
991 996 if self._notification_socket:
992 997 self._flush_notifications()
993 998 if self._iopub_socket:
994 999 self._flush_iopub(self._iopub_socket)
995 1000 if self._mux_socket:
996 1001 self._flush_results(self._mux_socket)
997 1002 if self._task_socket:
998 1003 self._flush_results(self._task_socket)
999 1004 if self._control_socket:
1000 1005 self._flush_control(self._control_socket)
1001 1006 if self._query_socket:
1002 1007 self._flush_ignored_hub_replies()
1003 1008
1004 1009 def wait(self, jobs=None, timeout=-1):
1005 1010 """waits on one or more `jobs`, for up to `timeout` seconds.
1006 1011
1007 1012 Parameters
1008 1013 ----------
1009 1014
1010 1015 jobs : int, str, or list of ints and/or strs, or one or more AsyncResult objects
1011 1016 ints are indices to self.history
1012 1017 strs are msg_ids
1013 1018 default: wait on all outstanding messages
1014 1019 timeout : float
1015 1020 a time in seconds, after which to give up.
1016 1021 default is -1, which means no timeout
1017 1022
1018 1023 Returns
1019 1024 -------
1020 1025
1021 1026 True : when all msg_ids are done
1022 1027 False : timeout reached, some msg_ids still outstanding
1023 1028 """
1024 1029 tic = time.time()
1025 1030 if jobs is None:
1026 1031 theids = self.outstanding
1027 1032 else:
1028 1033 if isinstance(jobs, (int, basestring, AsyncResult)):
1029 1034 jobs = [jobs]
1030 1035 theids = set()
1031 1036 for job in jobs:
1032 1037 if isinstance(job, int):
1033 1038 # index access
1034 1039 job = self.history[job]
1035 1040 elif isinstance(job, AsyncResult):
1036 1041 map(theids.add, job.msg_ids)
1037 1042 continue
1038 1043 theids.add(job)
1039 1044 if not theids.intersection(self.outstanding):
1040 1045 return True
1041 1046 self.spin()
1042 1047 while theids.intersection(self.outstanding):
1043 1048 if timeout >= 0 and ( time.time()-tic ) > timeout:
1044 1049 break
1045 1050 time.sleep(1e-3)
1046 1051 self.spin()
1047 1052 return len(theids.intersection(self.outstanding)) == 0
1048 1053
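    # wait() accepts history indices, msg_ids, or AsyncResults, e.g. (sketch):
    #
    #   In [1]: ar = rc[:].apply_async(time.sleep, 2)
    #   In [2]: rc.wait(ar, timeout=1)    # gives up after 1s
    #   Out[2]: False
    #   In [3]: rc.wait()                 # block on everything outstanding
    #   Out[3]: True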
1049 1054 #--------------------------------------------------------------------------
1050 1055 # Control methods
1051 1056 #--------------------------------------------------------------------------
1052 1057
1053 1058 @spin_first
1054 1059 def clear(self, targets=None, block=None):
1055 1060 """Clear the namespace in target(s)."""
1056 1061 block = self.block if block is None else block
1057 1062 targets = self._build_targets(targets)[0]
1058 1063 for t in targets:
1059 1064 self.session.send(self._control_socket, 'clear_request', content={}, ident=t)
1060 1065 error = False
1061 1066 if block:
1062 1067 self._flush_ignored_control()
1063 1068 for i in range(len(targets)):
1064 1069 idents,msg = self.session.recv(self._control_socket,0)
1065 1070 if self.debug:
1066 1071 pprint(msg)
1067 1072 if msg['content']['status'] != 'ok':
1068 1073 error = self._unwrap_exception(msg['content'])
1069 1074 else:
1070 1075 self._ignored_control_replies += len(targets)
1071 1076 if error:
1072 1077 raise error
1073 1078
1074 1079
1075 1080 @spin_first
1076 1081 def abort(self, jobs=None, targets=None, block=None):
1077 1082 """Abort specific jobs from the execution queues of target(s).
1078 1083
1079 1084 This is a mechanism to prevent jobs that have already been submitted
1080 1085 from executing.
1081 1086
1082 1087 Parameters
1083 1088 ----------
1084 1089
1085 1090 jobs : msg_id, list of msg_ids, or AsyncResult
1086 1091 The jobs to be aborted
1087 1092
1088 1093 If unspecified/None: abort all outstanding jobs.
1089 1094
1090 1095 """
1091 1096 block = self.block if block is None else block
1092 1097 jobs = jobs if jobs is not None else list(self.outstanding)
1093 1098 targets = self._build_targets(targets)[0]
1094 1099
1095 1100 msg_ids = []
1096 1101 if isinstance(jobs, (basestring,AsyncResult)):
1097 1102 jobs = [jobs]
1098 1103 bad_ids = filter(lambda obj: not isinstance(obj, (basestring, AsyncResult)), jobs)
1099 1104 if bad_ids:
1100 1105 raise TypeError("Invalid msg_id type %r, expected str or AsyncResult"%bad_ids[0])
1101 1106 for j in jobs:
1102 1107 if isinstance(j, AsyncResult):
1103 1108 msg_ids.extend(j.msg_ids)
1104 1109 else:
1105 1110 msg_ids.append(j)
1106 1111 content = dict(msg_ids=msg_ids)
1107 1112 for t in targets:
1108 1113 self.session.send(self._control_socket, 'abort_request',
1109 1114 content=content, ident=t)
1110 1115 error = False
1111 1116 if block:
1112 1117 self._flush_ignored_control()
1113 1118 for i in range(len(targets)):
1114 1119 idents,msg = self.session.recv(self._control_socket,0)
1115 1120 if self.debug:
1116 1121 pprint(msg)
1117 1122 if msg['content']['status'] != 'ok':
1118 1123 error = self._unwrap_exception(msg['content'])
1119 1124 else:
1120 1125 self._ignored_control_replies += len(targets)
1121 1126 if error:
1122 1127 raise error
1123 1128
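    # e.g. (sketch): rc.abort() aborts everything still outstanding, while
    # rc.abort(ar, targets=0) aborts one AsyncResult's jobs on engine 0 only.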
1124 1129 @spin_first
1125 1130 def shutdown(self, targets='all', restart=False, hub=False, block=None):
1126 1131 """Terminates one or more engine processes, optionally including the hub.
1127 1132
1128 1133 Parameters
1129 1134 ----------
1130 1135
1131 1136 targets: list of ints or 'all' [default: all]
1132 1137 Which engines to shutdown.
1133 1138 hub: bool [default: False]
1134 1139 Whether to include the Hub. hub=True implies targets='all'.
1135 1140 block: bool [default: self.block]
1136 1141 Whether to wait for clean shutdown replies or not.
1137 1142 restart: bool [default: False]
1138 1143 NOT IMPLEMENTED
1139 1144 whether to restart engines after shutting them down.
1140 1145 """
1141 1146
1142 1147 if restart:
1143 1148 raise NotImplementedError("Engine restart is not yet implemented")
1144 1149
1145 1150 block = self.block if block is None else block
1146 1151 if hub:
1147 1152 targets = 'all'
1148 1153 targets = self._build_targets(targets)[0]
1149 1154 for t in targets:
1150 1155 self.session.send(self._control_socket, 'shutdown_request',
1151 1156 content={'restart':restart},ident=t)
1152 1157 error = False
1153 1158 if block or hub:
1154 1159 self._flush_ignored_control()
1155 1160 for i in range(len(targets)):
1156 1161 idents,msg = self.session.recv(self._control_socket, 0)
1157 1162 if self.debug:
1158 1163 pprint(msg)
1159 1164 if msg['content']['status'] != 'ok':
1160 1165 error = self._unwrap_exception(msg['content'])
1161 1166 else:
1162 1167 self._ignored_control_replies += len(targets)
1163 1168
1164 1169 if hub:
1165 1170 time.sleep(0.25)
1166 1171 self.session.send(self._query_socket, 'shutdown_request')
1167 1172 idents,msg = self.session.recv(self._query_socket, 0)
1168 1173 if self.debug:
1169 1174 pprint(msg)
1170 1175 if msg['content']['status'] != 'ok':
1171 1176 error = self._unwrap_exception(msg['content'])
1172 1177
1173 1178 if error:
1174 1179 raise error
1175 1180
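    # e.g. (sketch): rc.shutdown(targets=[2, 3]) stops two engines, while
    # rc.shutdown(hub=True) tears down the engines and the Hub itself.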
1176 1181 #--------------------------------------------------------------------------
1177 1182 # Execution related methods
1178 1183 #--------------------------------------------------------------------------
1179 1184
1180 1185 def _maybe_raise(self, result):
1181 1186 """wrapper for maybe raising an exception if apply failed."""
1182 1187 if isinstance(result, error.RemoteError):
1183 1188 raise result
1184 1189
1185 1190 return result
1186 1191
1187 1192 def send_apply_request(self, socket, f, args=None, kwargs=None, metadata=None, track=False,
1188 1193 ident=None):
1189 1194 """construct and send an apply message via a socket.
1190 1195
1191 1196 This is the principal method with which all engine execution is performed by views.
1192 1197 """
1193 1198
1194 1199 if self._closed:
1195 1200 raise RuntimeError("Client cannot be used after its sockets have been closed")
1196 1201
1197 1202 # defaults:
1198 1203 args = args if args is not None else []
1199 1204 kwargs = kwargs if kwargs is not None else {}
1200 1205 metadata = metadata if metadata is not None else {}
1201 1206
1202 1207 # validate arguments
1203 1208 if not callable(f) and not isinstance(f, Reference):
1204 1209 raise TypeError("f must be callable, not %s"%type(f))
1205 1210 if not isinstance(args, (tuple, list)):
1206 1211 raise TypeError("args must be tuple or list, not %s"%type(args))
1207 1212 if not isinstance(kwargs, dict):
1208 1213 raise TypeError("kwargs must be dict, not %s"%type(kwargs))
1209 1214 if not isinstance(metadata, dict):
1210 1215 raise TypeError("metadata must be dict, not %s"%type(metadata))
1211 1216
1212 bufs = util.pack_apply_message(f, args, kwargs,
1217 bufs = serialize.pack_apply_message(f, args, kwargs,
1213 1218 buffer_threshold=self.session.buffer_threshold,
1214 1219 item_threshold=self.session.item_threshold,
1215 1220 )
1216 1221
1217 1222 msg = self.session.send(socket, "apply_request", buffers=bufs, ident=ident,
1218 1223 metadata=metadata, track=track)
1219 1224
1220 1225 msg_id = msg['header']['msg_id']
1221 1226 self.outstanding.add(msg_id)
1222 1227 if ident:
1223 1228 # possibly routed to a specific engine
1224 1229 if isinstance(ident, list):
1225 1230 ident = ident[-1]
1226 1231 if ident in self._engines.values():
1227 1232 # save for later, in case of engine death
1228 1233 self._outstanding_dict[ident].add(msg_id)
1229 1234 self.history.append(msg_id)
1230 1235 self.metadata[msg_id]['submitted'] = datetime.now()
1231 1236
1232 1237 return msg
1233 1238
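    # Views are the supported entry point for this method; the usual path is
    # (sketch):
    #
    #   In [1]: ar = rc[:].apply_async(pow, 2, 10)   # calls send_apply_request
    #   In [2]: ar.get()
    #   Out[2]: [1024, 1024, 1024, 1024]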
1234 1239 def send_execute_request(self, socket, code, silent=True, metadata=None, ident=None):
1235 1240 """construct and send an execute request via a socket.
1236 1241
1237 1242 """
1238 1243
1239 1244 if self._closed:
1240 1245 raise RuntimeError("Client cannot be used after its sockets have been closed")
1241 1246
1242 1247 # defaults:
1243 1248 metadata = metadata if metadata is not None else {}
1244 1249
1245 1250 # validate arguments
1246 1251 if not isinstance(code, basestring):
1247 1252 raise TypeError("code must be text, not %s" % type(code))
1248 1253 if not isinstance(metadata, dict):
1249 1254 raise TypeError("metadata must be dict, not %s" % type(metadata))
1250 1255
1251 1256 content = dict(code=code, silent=bool(silent), user_variables=[], user_expressions={})
1252 1257
1253 1258
1254 1259 msg = self.session.send(socket, "execute_request", content=content, ident=ident,
1255 1260 metadata=metadata)
1256 1261
1257 1262 msg_id = msg['header']['msg_id']
1258 1263 self.outstanding.add(msg_id)
1259 1264 if ident:
1260 1265 # possibly routed to a specific engine
1261 1266 if isinstance(ident, list):
1262 1267 ident = ident[-1]
1263 1268 if ident in self._engines.values():
1264 1269 # save for later, in case of engine death
1265 1270 self._outstanding_dict[ident].add(msg_id)
1266 1271 self.history.append(msg_id)
1267 1272 self.metadata[msg_id]['submitted'] = datetime.now()
1268 1273
1269 1274 return msg
1270 1275
1271 1276 #--------------------------------------------------------------------------
1272 1277 # construct a View object
1273 1278 #--------------------------------------------------------------------------
1274 1279
1275 1280 def load_balanced_view(self, targets=None):
1276 1281         """construct a LoadBalancedView object.
1277 1282
1278 1283 If no arguments are specified, create a LoadBalancedView
1279 1284 using all engines.
1280 1285
1281 1286 Parameters
1282 1287 ----------
1283 1288
1284 1289 targets: list,slice,int,etc. [default: use all engines]
1285 1290 The subset of engines across which to load-balance
1286 1291 """
1287 1292 if targets == 'all':
1288 1293 targets = None
1289 1294 if targets is not None:
1290 1295 targets = self._build_targets(targets)[1]
1291 1296 return LoadBalancedView(client=self, socket=self._task_socket, targets=targets)
1292 1297
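    # Load-balanced usage, as a sketch:
    #
    #   In [1]: lview = rc.load_balanced_view()
    #   In [2]: ar = lview.map_async(lambda x: x**2, range(8))
    #   In [3]: ar.get()
    #   Out[3]: [0, 1, 4, 9, 16, 25, 36, 49]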
1293 1298 def direct_view(self, targets='all'):
1294 1299 """construct a DirectView object.
1295 1300
1296 1301 If no targets are specified, create a DirectView using all engines.
1297 1302
1298 1303 rc.direct_view('all') is distinguished from rc[:] in that 'all' will
1299 1304 evaluate the target engines at each execution, whereas rc[:] will connect to
1300 1305 all *current* engines, and that list will not change.
1301 1306
1302 1307 That is, 'all' will always use all engines, whereas rc[:] will not use
1303 1308 engines added after the DirectView is constructed.
1304 1309
1305 1310 Parameters
1306 1311 ----------
1307 1312
1308 1313 targets: list,slice,int,etc. [default: use all engines]
1309 1314 The engines to use for the View
1310 1315 """
1311 1316 single = isinstance(targets, int)
1312 1317 # allow 'all' to be lazily evaluated at each execution
1313 1318 if targets != 'all':
1314 1319 targets = self._build_targets(targets)[1]
1315 1320 if single:
1316 1321 targets = targets[0]
1317 1322 return DirectView(client=self, socket=self._mux_socket, targets=targets)
1318 1323
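    # The distinction described above, as a sketch:
    #
    #   In [1]: dv_all = rc.direct_view('all')   # re-resolves engines each run
    #   In [2]: dv_now = rc[:]                   # pinned to the current engines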
1319 1324 #--------------------------------------------------------------------------
1320 1325 # Query methods
1321 1326 #--------------------------------------------------------------------------
1322 1327
1323 1328 @spin_first
1324 1329 def get_result(self, indices_or_msg_ids=None, block=None):
1325 1330 """Retrieve a result by msg_id or history index, wrapped in an AsyncResult object.
1326 1331
1327 1332 If the client already has the results, no request to the Hub will be made.
1328 1333
1329 1334 This is a convenient way to construct AsyncResult objects, which are wrappers
1330 1335 that include metadata about execution, and allow for awaiting results that
1331 1336 were not submitted by this Client.
1332 1337
1333 1338 It can also be a convenient way to retrieve the metadata associated with
1334 1339         blocking execution, since it always retrieves the metadata as well as the result.
1335 1340
1336 1341 Examples
1337 1342 --------
1338 1343 ::
1339 1344
1340 1345             In [10]: ar = client.get_result(msg_id)
1341 1346
1342 1347 Parameters
1343 1348 ----------
1344 1349
1345 1350 indices_or_msg_ids : integer history index, str msg_id, or list of either
1346 1351 The indices or msg_ids of indices to be retrieved
1347 1352
1348 1353 block : bool
1349 1354 Whether to wait for the result to be done
1350 1355
1351 1356 Returns
1352 1357 -------
1353 1358
1354 1359 AsyncResult
1355 1360 A single AsyncResult object will always be returned.
1356 1361
1357 1362 AsyncHubResult
1358 1363 A subclass of AsyncResult that retrieves results from the Hub
1359 1364
1360 1365 """
1361 1366 block = self.block if block is None else block
1362 1367 if indices_or_msg_ids is None:
1363 1368 indices_or_msg_ids = -1
1364 1369
1365 1370 if not isinstance(indices_or_msg_ids, (list,tuple)):
1366 1371 indices_or_msg_ids = [indices_or_msg_ids]
1367 1372
1368 1373 theids = []
1369 1374 for id in indices_or_msg_ids:
1370 1375 if isinstance(id, int):
1371 1376 id = self.history[id]
1372 1377 if not isinstance(id, basestring):
1373 1378 raise TypeError("indices must be str or int, not %r"%id)
1374 1379 theids.append(id)
1375 1380
1376 1381 local_ids = filter(lambda msg_id: msg_id in self.history or msg_id in self.results, theids)
1377 1382 remote_ids = filter(lambda msg_id: msg_id not in local_ids, theids)
1378 1383
1379 1384 if remote_ids:
1380 1385 ar = AsyncHubResult(self, msg_ids=theids)
1381 1386 else:
1382 1387 ar = AsyncResult(self, msg_ids=theids)
1383 1388
1384 1389 if block:
1385 1390 ar.wait()
1386 1391
1387 1392 return ar
1388 1393
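    # e.g. recovering timing metadata after blocking execution (sketch):
    #
    #   In [1]: rc[:].apply_sync(time.sleep, 0.1)
    #   In [2]: ar = rc.get_result(-1)    # last entry in self.history
    #   In [3]: ar.wall_time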
1389 1394 @spin_first
1390 1395 def resubmit(self, indices_or_msg_ids=None, metadata=None, block=None):
1391 1396 """Resubmit one or more tasks.
1392 1397
1393 1398 in-flight tasks may not be resubmitted.
1394 1399
1395 1400 Parameters
1396 1401 ----------
1397 1402
1398 1403 indices_or_msg_ids : integer history index, str msg_id, or list of either
1399 1404 The indices or msg_ids of indices to be retrieved
1400 1405
1401 1406 block : bool
1402 1407 Whether to wait for the result to be done
1403 1408
1404 1409 Returns
1405 1410 -------
1406 1411
1407 1412 AsyncHubResult
1408 1413 A subclass of AsyncResult that retrieves results from the Hub
1409 1414
1410 1415 """
1411 1416 block = self.block if block is None else block
1412 1417 if indices_or_msg_ids is None:
1413 1418 indices_or_msg_ids = -1
1414 1419
1415 1420 if not isinstance(indices_or_msg_ids, (list,tuple)):
1416 1421 indices_or_msg_ids = [indices_or_msg_ids]
1417 1422
1418 1423 theids = []
1419 1424 for id in indices_or_msg_ids:
1420 1425 if isinstance(id, int):
1421 1426 id = self.history[id]
1422 1427 if not isinstance(id, basestring):
1423 1428 raise TypeError("indices must be str or int, not %r"%id)
1424 1429 theids.append(id)
1425 1430
1426 1431 content = dict(msg_ids = theids)
1427 1432
1428 1433 self.session.send(self._query_socket, 'resubmit_request', content)
1429 1434
1430 1435 zmq.select([self._query_socket], [], [])
1431 1436 idents,msg = self.session.recv(self._query_socket, zmq.NOBLOCK)
1432 1437 if self.debug:
1433 1438 pprint(msg)
1434 1439 content = msg['content']
1435 1440 if content['status'] != 'ok':
1436 1441 raise self._unwrap_exception(content)
1437 1442 mapping = content['resubmitted']
1438 1443 new_ids = [ mapping[msg_id] for msg_id in theids ]
1439 1444
1440 1445 ar = AsyncHubResult(self, msg_ids=new_ids)
1441 1446
1442 1447 if block:
1443 1448 ar.wait()
1444 1449
1445 1450 return ar
1446 1451
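    # e.g. (sketch): rc.resubmit(ar.msg_ids) re-runs completed tasks and
    # returns an AsyncHubResult for the newly assigned msg_ids.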
1447 1452 @spin_first
1448 1453 def result_status(self, msg_ids, status_only=True):
1449 1454 """Check on the status of the result(s) of the apply request with `msg_ids`.
1450 1455
1451 1456 If status_only is False, then the actual results will be retrieved, else
1452 1457 only the status of the results will be checked.
1453 1458
1454 1459 Parameters
1455 1460 ----------
1456 1461
1457 1462 msg_ids : list of msg_ids
1458 1463 if int:
1459 1464 Passed as index to self.history for convenience.
1460 1465 status_only : bool (default: True)
1461 1466 if False:
1462 1467 Retrieve the actual results of completed tasks.
1463 1468
1464 1469 Returns
1465 1470 -------
1466 1471
1467 1472 results : dict
1468 1473 There will always be the keys 'pending' and 'completed', which will
1469 1474 be lists of msg_ids that are incomplete or complete. If `status_only`
1470 1475 is False, then completed results will be keyed by their `msg_id`.
1471 1476 """
1472 1477 if not isinstance(msg_ids, (list,tuple)):
1473 1478 msg_ids = [msg_ids]
1474 1479
1475 1480 theids = []
1476 1481 for msg_id in msg_ids:
1477 1482 if isinstance(msg_id, int):
1478 1483 msg_id = self.history[msg_id]
1479 1484 if not isinstance(msg_id, basestring):
1480 1485 raise TypeError("msg_ids must be str, not %r"%msg_id)
1481 1486 theids.append(msg_id)
1482 1487
1483 1488 completed = []
1484 1489 local_results = {}
1485 1490
1486 1491 # comment this block out to temporarily disable local shortcut:
1487 1492 for msg_id in theids:
1488 1493 if msg_id in self.results:
1489 1494 completed.append(msg_id)
1490 1495 local_results[msg_id] = self.results[msg_id]
1491 1496 theids.remove(msg_id)
1492 1497
1493 1498 if theids: # some not locally cached
1494 1499 content = dict(msg_ids=theids, status_only=status_only)
1495 1500 msg = self.session.send(self._query_socket, "result_request", content=content)
1496 1501 zmq.select([self._query_socket], [], [])
1497 1502 idents,msg = self.session.recv(self._query_socket, zmq.NOBLOCK)
1498 1503 if self.debug:
1499 1504 pprint(msg)
1500 1505 content = msg['content']
1501 1506 if content['status'] != 'ok':
1502 1507 raise self._unwrap_exception(content)
1503 1508 buffers = msg['buffers']
1504 1509 else:
1505 1510 content = dict(completed=[],pending=[])
1506 1511
1507 1512 content['completed'].extend(completed)
1508 1513
1509 1514 if status_only:
1510 1515 return content
1511 1516
1512 1517 failures = []
1513 1518 # load cached results into result:
1514 1519 content.update(local_results)
1515 1520
1516 1521 # update cache with results:
1517 1522 for msg_id in sorted(theids):
1518 1523 if msg_id in content['completed']:
1519 1524 rec = content[msg_id]
1520 1525 parent = rec['header']
1521 1526 header = rec['result_header']
1522 1527 rcontent = rec['result_content']
1523 1528 iodict = rec['io']
1524 1529 if isinstance(rcontent, str):
1525 1530 rcontent = self.session.unpack(rcontent)
1526 1531
1527 1532 md = self.metadata[msg_id]
1528 1533 md_msg = dict(
1529 1534 content=rcontent,
1530 1535 parent_header=parent,
1531 1536 header=header,
1532 1537 metadata=rec['result_metadata'],
1533 1538 )
1534 1539 md.update(self._extract_metadata(md_msg))
1535 1540 if rec.get('received'):
1536 1541 md['received'] = rec['received']
1537 1542 md.update(iodict)
1538 1543
1539 1544 if rcontent['status'] == 'ok':
1540 1545 if header['msg_type'] == 'apply_reply':
1541 res,buffers = util.unserialize_object(buffers)
1546 res,buffers = serialize.unserialize_object(buffers)
1542 1547 elif header['msg_type'] == 'execute_reply':
1543 1548 res = ExecuteReply(msg_id, rcontent, md)
1544 1549 else:
1545 1550                 raise KeyError("unhandled msg type: %r" % header['msg_type'])
1546 1551 else:
1547 1552 res = self._unwrap_exception(rcontent)
1548 1553 failures.append(res)
1549 1554
1550 1555 self.results[msg_id] = res
1551 1556 content[msg_id] = res
1552 1557
1553 1558 if len(theids) == 1 and failures:
1554 1559 raise failures[0]
1555 1560
1556 1561 error.collect_exceptions(failures, "result_status")
1557 1562 return content
1558 1563
1559 1564 @spin_first
1560 1565 def queue_status(self, targets='all', verbose=False):
1561 1566 """Fetch the status of engine queues.
1562 1567
1563 1568 Parameters
1564 1569 ----------
1565 1570
1566 1571 targets : int/str/list of ints/strs
1567 1572 the engines whose states are to be queried.
1568 1573 default : all
1569 1574 verbose : bool
1570 1575 Whether to return lengths only, or lists of ids for each element
1571 1576 """
1572 1577 if targets == 'all':
1573 1578 # allow 'all' to be evaluated on the engine
1574 1579 engine_ids = None
1575 1580 else:
1576 1581 engine_ids = self._build_targets(targets)[1]
1577 1582 content = dict(targets=engine_ids, verbose=verbose)
1578 1583 self.session.send(self._query_socket, "queue_request", content=content)
1579 1584 idents,msg = self.session.recv(self._query_socket, 0)
1580 1585 if self.debug:
1581 1586 pprint(msg)
1582 1587 content = msg['content']
1583 1588 status = content.pop('status')
1584 1589 if status != 'ok':
1585 1590 raise self._unwrap_exception(content)
1586 1591 content = rekey(content)
1587 1592 if isinstance(targets, int):
1588 1593 return content[targets]
1589 1594 else:
1590 1595 return content
1591 1596
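    # The reply is keyed by engine id, plus an 'unassigned' count, roughly
    # (sketch; exact keys assumed):
    #
    #   In [1]: rc.queue_status()
    #   Out[1]: {0: {'queue': 0, 'completed': 10, 'tasks': 0}, 'unassigned': 0}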
1592 1597 @spin_first
1593 1598 def purge_results(self, jobs=[], targets=[]):
1594 1599 """Tell the Hub to forget results.
1595 1600
1596 1601 Individual results can be purged by msg_id, or the entire
1597 1602 history of specific targets can be purged.
1598 1603
1599 1604 Use `purge_results('all')` to scrub everything from the Hub's db.
1600 1605
1601 1606 Parameters
1602 1607 ----------
1603 1608
1604 1609 jobs : str or list of str or AsyncResult objects
1605 1610 the msg_ids whose results should be forgotten.
1606 1611 targets : int/str/list of ints/strs
1607 1612 The targets, by int_id, whose entire history is to be purged.
1608 1613
1609 1614 default : None
1610 1615 """
1611 1616 if not targets and not jobs:
1612 1617 raise ValueError("Must specify at least one of `targets` and `jobs`")
1613 1618 if targets:
1614 1619 targets = self._build_targets(targets)[1]
1615 1620
1616 1621 # construct msg_ids from jobs
1617 1622 if jobs == 'all':
1618 1623 msg_ids = jobs
1619 1624 else:
1620 1625 msg_ids = []
1621 1626 if isinstance(jobs, (basestring,AsyncResult)):
1622 1627 jobs = [jobs]
1623 1628 bad_ids = filter(lambda obj: not isinstance(obj, (basestring, AsyncResult)), jobs)
1624 1629 if bad_ids:
1625 1630 raise TypeError("Invalid msg_id type %r, expected str or AsyncResult"%bad_ids[0])
1626 1631 for j in jobs:
1627 1632 if isinstance(j, AsyncResult):
1628 1633 msg_ids.extend(j.msg_ids)
1629 1634 else:
1630 1635 msg_ids.append(j)
1631 1636
1632 1637 content = dict(engine_ids=targets, msg_ids=msg_ids)
1633 1638 self.session.send(self._query_socket, "purge_request", content=content)
1634 1639 idents, msg = self.session.recv(self._query_socket, 0)
1635 1640 if self.debug:
1636 1641 pprint(msg)
1637 1642 content = msg['content']
1638 1643 if content['status'] != 'ok':
1639 1644 raise self._unwrap_exception(content)
1640 1645
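    # e.g. (sketch):
    #
    #   In [1]: rc.purge_results('all')            # scrub every stored result
    #   In [2]: rc.purge_results(jobs=ar)          # forget one AsyncResult
    #   In [3]: rc.purge_results(targets=[0, 1])   # forget two engines' history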
1641 1646 @spin_first
1642 1647 def hub_history(self):
1643 1648 """Get the Hub's history
1644 1649
1645 1650 Just like the Client, the Hub has a history, which is a list of msg_ids.
1646 1651 This will contain the history of all clients, and, depending on configuration,
1647 1652 may contain history across multiple cluster sessions.
1648 1653
1649 1654 Any msg_id returned here is a valid argument to `get_result`.
1650 1655
1651 1656 Returns
1652 1657 -------
1653 1658
1654 1659 msg_ids : list of strs
1655 1660 list of all msg_ids, ordered by task submission time.
1656 1661 """
1657 1662
1658 1663 self.session.send(self._query_socket, "history_request", content={})
1659 1664 idents, msg = self.session.recv(self._query_socket, 0)
1660 1665
1661 1666 if self.debug:
1662 1667 pprint(msg)
1663 1668 content = msg['content']
1664 1669 if content['status'] != 'ok':
1665 1670 raise self._unwrap_exception(content)
1666 1671 else:
1667 1672 return content['history']
1668 1673
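    # e.g. (sketch): rc.get_result(rc.hub_history()[-1]) fetches the most
    # recently submitted task, even if another client submitted it.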
1669 1674 @spin_first
1670 1675 def db_query(self, query, keys=None):
1671 1676 """Query the Hub's TaskRecord database
1672 1677
1673 1678 This will return a list of task record dicts that match `query`
1674 1679
1675 1680 Parameters
1676 1681 ----------
1677 1682
1678 1683 query : mongodb query dict
1679 1684 The search dict. See mongodb query docs for details.
1680 1685 keys : list of strs [optional]
1681 1686 The subset of keys to be returned. The default is to fetch everything but buffers.
1682 1687 'msg_id' will *always* be included.
1683 1688 """
1684 1689 if isinstance(keys, basestring):
1685 1690 keys = [keys]
1686 1691 content = dict(query=query, keys=keys)
1687 1692 self.session.send(self._query_socket, "db_request", content=content)
1688 1693 idents, msg = self.session.recv(self._query_socket, 0)
1689 1694 if self.debug:
1690 1695 pprint(msg)
1691 1696 content = msg['content']
1692 1697 if content['status'] != 'ok':
1693 1698 raise self._unwrap_exception(content)
1694 1699
1695 1700 records = content['records']
1696 1701
1697 1702 buffer_lens = content['buffer_lens']
1698 1703 result_buffer_lens = content['result_buffer_lens']
1699 1704 buffers = msg['buffers']
1700 1705 has_bufs = buffer_lens is not None
1701 1706 has_rbufs = result_buffer_lens is not None
1702 1707 for i,rec in enumerate(records):
1703 1708 # relink buffers
1704 1709 if has_bufs:
1705 1710 blen = buffer_lens[i]
1706 1711 rec['buffers'], buffers = buffers[:blen],buffers[blen:]
1707 1712 if has_rbufs:
1708 1713 blen = result_buffer_lens[i]
1709 1714 rec['result_buffers'], buffers = buffers[:blen],buffers[blen:]
1710 1715
1711 1716 return records
1712 1717
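# db_query takes MongoDB-style query dicts; a couple of sketches (the record
# keys used here are assumed from the Hub's TaskRecord schema):
#
#   In [1]: rc.db_query({'msg_id': {'$in': rc.history[-3:]}})
#   In [2]: rc.db_query({'completed': {'$ne': None}}, keys=['msg_id', 'engine_uuid'])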
1713 1718 __all__ = [ 'Client' ]