cleanup Client.close...
MinRK
@@ -1,1845 +1,1853 @@
1 1 """A semi-synchronous Client for the ZMQ cluster
2 2
3 3 Authors:
4 4
5 5 * MinRK
6 6 """
7 7 #-----------------------------------------------------------------------------
8 8 # Copyright (C) 2010-2011 The IPython Development Team
9 9 #
10 10 # Distributed under the terms of the BSD License. The full license is in
11 11 # the file COPYING, distributed as part of this software.
12 12 #-----------------------------------------------------------------------------
13 13
14 14 #-----------------------------------------------------------------------------
15 15 # Imports
16 16 #-----------------------------------------------------------------------------
17 17
18 18 import os
19 19 import json
20 20 import sys
21 21 from threading import Thread, Event
22 22 import time
23 23 import warnings
24 24 from datetime import datetime
25 25 from getpass import getpass
26 26 from pprint import pprint
27 27
28 28 pjoin = os.path.join
29 29
30 30 import zmq
31 31 # from zmq.eventloop import ioloop, zmqstream
32 32
33 33 from IPython.config.configurable import MultipleInstanceError
34 34 from IPython.core.application import BaseIPythonApplication
35 35 from IPython.core.profiledir import ProfileDir, ProfileDirError
36 36
37 37 from IPython.utils.capture import RichOutput
38 38 from IPython.utils.coloransi import TermColors
39 39 from IPython.utils.jsonutil import rekey
40 40 from IPython.utils.localinterfaces import LOCALHOST, LOCAL_IPS
41 41 from IPython.utils.path import get_ipython_dir
42 42 from IPython.utils.py3compat import cast_bytes
43 43 from IPython.utils.traitlets import (HasTraits, Integer, Instance, Unicode,
44 44 Dict, List, Bool, Set, Any)
45 45 from IPython.external.decorator import decorator
46 46 from IPython.external.ssh import tunnel
47 47
48 48 from IPython.parallel import Reference
49 49 from IPython.parallel import error
50 50 from IPython.parallel import util
51 51
52 52 from IPython.kernel.zmq.session import Session, Message
53 53 from IPython.kernel.zmq import serialize
54 54
55 55 from .asyncresult import AsyncResult, AsyncHubResult
56 56 from .view import DirectView, LoadBalancedView
57 57
58 58 if sys.version_info[0] >= 3:
59 59 # xrange is used in a couple 'isinstance' tests in py2
60 60 # should be just 'range' in 3k
61 61 xrange = range
62 62
63 63 #--------------------------------------------------------------------------
64 64 # Decorators for Client methods
65 65 #--------------------------------------------------------------------------
66 66
67 67 @decorator
68 68 def spin_first(f, self, *args, **kwargs):
69 69 """Call spin() to sync state prior to calling the method."""
70 70 self.spin()
71 71 return f(self, *args, **kwargs)
72 72
73 73
74 74 #--------------------------------------------------------------------------
75 75 # Classes
76 76 #--------------------------------------------------------------------------
77 77
78 78
79 79 class ExecuteReply(RichOutput):
80 80 """wrapper for finished Execute results"""
81 81 def __init__(self, msg_id, content, metadata):
82 82 self.msg_id = msg_id
83 83 self._content = content
84 84 self.execution_count = content['execution_count']
85 85 self.metadata = metadata
86 86
87 87 # RichOutput overrides
88 88
89 89 @property
90 90 def source(self):
91 91 pyout = self.metadata['pyout']
92 92 if pyout:
93 93 return pyout.get('source', '')
94 94
95 95 @property
96 96 def data(self):
97 97 pyout = self.metadata['pyout']
98 98 if pyout:
99 99 return pyout.get('data', {})
100 100
101 101 @property
102 102 def _metadata(self):
103 103 pyout = self.metadata['pyout']
104 104 if pyout:
105 105 return pyout.get('metadata', {})
106 106
107 107 def display(self):
108 108 from IPython.display import publish_display_data
109 109 publish_display_data(self.source, self.data, self._metadata)  # per-mimetype display metadata, not the task Metadata dict
110 110
111 111 def _repr_mime_(self, mime):
112 112 if mime not in self.data:
113 113 return
114 114 data = self.data[mime]
115 115 if mime in self._metadata:
116 116 return data, self._metadata[mime]
117 117 else:
118 118 return data
119 119
120 120 def __getitem__(self, key):
121 121 return self.metadata[key]
122 122
123 123 def __getattr__(self, key):
124 124 if key not in self.metadata:
125 125 raise AttributeError(key)
126 126 return self.metadata[key]
127 127
128 128 def __repr__(self):
129 129 pyout = self.metadata['pyout'] or {'data':{}}
130 130 text_out = pyout['data'].get('text/plain', '')
131 131 if len(text_out) > 32:
132 132 text_out = text_out[:29] + '...'
133 133
134 134 return "<ExecuteReply[%i]: %s>" % (self.execution_count, text_out)
135 135
136 136 def _repr_pretty_(self, p, cycle):
137 137 pyout = self.metadata['pyout'] or {'data':{}}
138 138 text_out = pyout['data'].get('text/plain', '')
139 139
140 140 if not text_out:
141 141 return
142 142
143 143 try:
144 144 ip = get_ipython()
145 145 except NameError:
146 146 colors = "NoColor"
147 147 else:
148 148 colors = ip.colors
149 149
150 150 if colors == "NoColor":
151 151 out = normal = ""
152 152 else:
153 153 out = TermColors.Red
154 154 normal = TermColors.Normal
155 155
156 156 if '\n' in text_out and not text_out.startswith('\n'):
157 157 # add newline for multiline reprs
158 158 text_out = '\n' + text_out
159 159
160 160 p.text(
161 161 out + u'Out[%i:%i]: ' % (
162 162 self.metadata['engine_id'], self.execution_count
163 163 ) + normal + text_out
164 164 )
165 165
166 166
167 167 class Metadata(dict):
168 168 """Subclass of dict for initializing metadata values.
169 169
170 170 Attribute access works on keys.
171 171
172 172 These objects have a strict set of keys - errors will raise if you try
173 173 to add new keys.
174 174 """
175 175 def __init__(self, *args, **kwargs):
176 176 dict.__init__(self)
177 177 md = {'msg_id' : None,
178 178 'submitted' : None,
179 179 'started' : None,
180 180 'completed' : None,
181 181 'received' : None,
182 182 'engine_uuid' : None,
183 183 'engine_id' : None,
184 184 'follow' : None,
185 185 'after' : None,
186 186 'status' : None,
187 187
188 188 'pyin' : None,
189 189 'pyout' : None,
190 190 'pyerr' : None,
191 191 'stdout' : '',
192 192 'stderr' : '',
193 193 'outputs' : [],
194 194 'data': {},
195 195 'outputs_ready' : False,
196 196 }
197 197 self.update(md)
198 198 self.update(dict(*args, **kwargs))
199 199
200 200 def __getattr__(self, key):
201 201 """getattr aliased to getitem"""
202 202 if key in self.iterkeys():
203 203 return self[key]
204 204 else:
205 205 raise AttributeError(key)
206 206
207 207 def __setattr__(self, key, value):
208 208 """setattr aliased to setitem, with strict"""
209 209 if key in self.iterkeys():
210 210 self[key] = value
211 211 else:
212 212 raise AttributeError(key)
213 213
214 214 def __setitem__(self, key, value):
215 215 """strict static key enforcement"""
216 216 if key in self.iterkeys():
217 217 dict.__setitem__(self, key, value)
218 218 else:
219 219 raise KeyError(key)
220 220
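For illustration, a minimal sketch of the strict-key behavior described in the docstring above (the import path assumes this module's location, `IPython/parallel/client/client.py`):

```python
from IPython.parallel.client.client import Metadata

md = Metadata()
md.stdout = 'hello\n'        # known key: attribute access is aliased to item access
print(md['stdout'])          # -> 'hello\n'

try:
    md.bogus = 1             # unknown key: __setattr__ raises AttributeError
except AttributeError as e:
    print('rejected attr: %s' % e)

try:
    md['bogus'] = 1          # unknown key: __setitem__ raises KeyError
except KeyError as e:
    print('rejected key: %s' % e)
```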
221 221
222 222 class Client(HasTraits):
223 223 """A semi-synchronous client to the IPython ZMQ cluster
224 224
225 225 Parameters
226 226 ----------
227 227
228 228 url_file : str/unicode; path to ipcontroller-client.json
229 229 This JSON file should contain all the information needed to connect to a cluster,
230 230 and is likely the only argument needed.
231 231 Connection information for the Hub's registration. If a json connector
232 232 file is given, then likely no further configuration is necessary.
233 233 [Default: use profile]
234 234 profile : bytes
235 235 The name of the Cluster profile to be used to find connector information.
236 236 If run from an IPython application, the default profile will be the same
237 237 as the running application, otherwise it will be 'default'.
238 238 cluster_id : str
239 239 String id to be added to runtime files, to prevent name collisions when using
240 240 multiple clusters with a single profile simultaneously.
241 241 When set, will look for files named like: 'ipcontroller-<cluster_id>-client.json'
242 242 Since this is text inserted into filenames, typical recommendations apply:
243 243 Simple character strings are ideal, and spaces are not recommended (but
244 244 should generally work)
245 245 context : zmq.Context
246 246 Pass an existing zmq.Context instance, otherwise the client will create its own.
247 247 debug : bool
248 248 flag for lots of message printing for debug purposes
249 249 timeout : int/float
250 250 time (in seconds) to wait for connection replies from the Hub
251 251 [Default: 10]
252 252
253 253 #-------------- session related args ----------------
254 254
255 255 config : Config object
256 256 If specified, this will be relayed to the Session for configuration
257 257 username : str
258 258 set username for the session object
259 259
260 260 #-------------- ssh related args ----------------
261 261 # These are args for configuring the ssh tunnel to be used
262 262 # credentials are used to forward connections over ssh to the Controller
263 263 # Note that the ip given in `addr` needs to be relative to sshserver
264 264 # The most basic case is to leave addr as pointing to localhost (127.0.0.1),
265 265 # and set sshserver as the same machine the Controller is on. However,
266 266 # the only requirement is that sshserver is able to see the Controller
267 267 # (i.e. is within the same trusted network).
268 268
269 269 sshserver : str
270 270 A string of the form passed to ssh, i.e. 'server.tld' or 'user@server.tld:port'
271 271 If keyfile or password is specified, and this is not, it will default to
272 272 the ip given in addr.
273 273 sshkey : str; path to ssh private key file
274 274 This specifies a key to be used in ssh login, default None.
275 275 Regular default ssh keys will be used without specifying this argument.
276 276 password : str
277 277 Your ssh password to sshserver. Note that if this is left None,
278 278 you will be prompted for it if passwordless key based login is unavailable.
279 279 paramiko : bool
280 280 flag for whether to use paramiko instead of shell ssh for tunneling.
281 281 [default: True on win32, False otherwise]
282 282
283 283
284 284 Attributes
285 285 ----------
286 286
287 287 ids : list of int engine IDs
288 288 requesting the ids attribute always synchronizes
289 289 the registration state. To request ids without synchronization,
290 290 use the semi-private _ids attribute.
291 291
292 292 history : list of msg_ids
293 293 a list of msg_ids, keeping track of all the execution
294 294 messages you have submitted in order.
295 295
296 296 outstanding : set of msg_ids
297 297 a set of msg_ids that have been submitted, but whose
298 298 results have not yet been received.
299 299
300 300 results : dict
301 301 a dict of all our results, keyed by msg_id
302 302
303 303 block : bool
304 304 determines default behavior when block not specified
305 305 in execution methods
306 306
307 307 Methods
308 308 -------
309 309
310 310 spin
311 311 flushes incoming results and registration state changes
312 312 control methods spin, and requesting `ids` also ensures the state is up to date
313 313
314 314 wait
315 315 wait on one or more msg_ids
316 316
317 317 execution methods
318 318 apply
319 319 legacy: execute, run
320 320
321 321 data movement
322 322 push, pull, scatter, gather
323 323
324 324 query methods
325 325 queue_status, get_result, purge, result_status
326 326
327 327 control methods
328 328 abort, shutdown
329 329
330 330 """
331 331
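For orientation, a minimal connection sketch based on the parameters above (it assumes a cluster is already running, e.g. via `ipcluster start`; the ssh host is hypothetical):

```python
from IPython.parallel import Client

rc = Client()   # load ipcontroller-client.json from the default profile
# equivalently, point at a connection file or a profile explicitly:
# rc = Client(url_file='/path/to/security/ipcontroller-client.json')
# rc = Client(profile='mpi', sshserver='user@gateway.example.com')  # hypothetical host

print(rc.ids)   # engine IDs currently registered with the Hub
```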
332 332
333 333 block = Bool(False)
334 334 outstanding = Set()
335 335 results = Instance('collections.defaultdict', (dict,))
336 336 metadata = Instance('collections.defaultdict', (Metadata,))
337 337 history = List()
338 338 debug = Bool(False)
339 339 _spin_thread = Any()
340 340 _stop_spinning = Any()
341 341
342 342 profile=Unicode()
343 343 def _profile_default(self):
344 344 if BaseIPythonApplication.initialized():
345 345 # an IPython app *might* be running, try to get its profile
346 346 try:
347 347 return BaseIPythonApplication.instance().profile
348 348 except (AttributeError, MultipleInstanceError):
349 349 # could be a *different* subclass of config.Application,
350 350 # which would raise one of these two errors.
351 351 return u'default'
352 352 else:
353 353 return u'default'
354 354
355 355
356 356 _outstanding_dict = Instance('collections.defaultdict', (set,))
357 357 _ids = List()
358 358 _connected=Bool(False)
359 359 _ssh=Bool(False)
360 360 _context = Instance('zmq.Context')
361 361 _config = Dict()
362 362 _engines=Instance(util.ReverseDict, (), {})
363 363 # _hub_socket=Instance('zmq.Socket')
364 364 _query_socket=Instance('zmq.Socket')
365 365 _control_socket=Instance('zmq.Socket')
366 366 _iopub_socket=Instance('zmq.Socket')
367 367 _notification_socket=Instance('zmq.Socket')
368 368 _mux_socket=Instance('zmq.Socket')
369 369 _task_socket=Instance('zmq.Socket')
370 370 _task_scheme=Unicode()
371 371 _closed = False
372 372 _ignored_control_replies=Integer(0)
373 373 _ignored_hub_replies=Integer(0)
374 374
375 375 def __new__(self, *args, **kw):
376 376 # don't raise on positional args
377 377 return HasTraits.__new__(self, **kw)
378 378
379 379 def __init__(self, url_file=None, profile=None, profile_dir=None, ipython_dir=None,
380 380 context=None, debug=False,
381 381 sshserver=None, sshkey=None, password=None, paramiko=None,
382 382 timeout=10, cluster_id=None, **extra_args
383 383 ):
384 384 if profile:
385 385 super(Client, self).__init__(debug=debug, profile=profile)
386 386 else:
387 387 super(Client, self).__init__(debug=debug)
388 388 if context is None:
389 389 context = zmq.Context.instance()
390 390 self._context = context
391 391 self._stop_spinning = Event()
392 392
393 393 if 'url_or_file' in extra_args:
394 394 url_file = extra_args['url_or_file']
395 395 warnings.warn("url_or_file arg no longer supported, use url_file", DeprecationWarning)
396 396
397 397 if url_file and util.is_url(url_file):
398 398 raise ValueError("single urls cannot be specified, url-files must be used.")
399 399
400 400 self._setup_profile_dir(self.profile, profile_dir, ipython_dir)
401 401
402 402 if self._cd is not None:
403 403 if url_file is None:
404 404 if not cluster_id:
405 405 client_json = 'ipcontroller-client.json'
406 406 else:
407 407 client_json = 'ipcontroller-%s-client.json' % cluster_id
408 408 url_file = pjoin(self._cd.security_dir, client_json)
409 409 if url_file is None:
410 410 raise ValueError(
411 411 "I can't find enough information to connect to a hub!"
412 412 " Please specify at least one of url_file or profile."
413 413 )
414 414
415 415 with open(url_file) as f:
416 416 cfg = json.load(f)
417 417
418 418 self._task_scheme = cfg['task_scheme']
419 419
420 420 # sync defaults from args, json:
421 421 if sshserver:
422 422 cfg['ssh'] = sshserver
423 423
424 424 location = cfg.setdefault('location', None)
425 425
426 426 proto,addr = cfg['interface'].split('://')
427 427 addr = util.disambiguate_ip_address(addr, location)
428 428 cfg['interface'] = "%s://%s" % (proto, addr)
429 429
430 430 # turn interface,port into full urls:
431 431 for key in ('control', 'task', 'mux', 'iopub', 'notification', 'registration'):
432 432 cfg[key] = cfg['interface'] + ':%i' % cfg[key]
433 433
434 434 url = cfg['registration']
435 435
436 436 if location is not None and addr == LOCALHOST:
437 437 # location specified, and connection is expected to be local
438 438 if location not in LOCAL_IPS and not sshserver:
439 439 # load ssh from JSON *only* if the controller is not on
440 440 # this machine
441 441 sshserver=cfg['ssh']
442 442 if location not in LOCAL_IPS and not sshserver:
443 443 # warn if no ssh specified, but SSH is probably needed
444 444 # This is only a warning, because the most likely cause
445 445 # is a local Controller on a laptop whose IP is dynamic
446 446 warnings.warn("""
447 447 Controller appears to be listening on localhost, but not on this machine.
448 448 If this is true, you should specify Client(...,sshserver='you@%s')
449 449 or instruct your controller to listen on an external IP."""%location,
450 450 RuntimeWarning)
451 451 elif not sshserver:
452 452 # otherwise sync with cfg
453 453 sshserver = cfg['ssh']
454 454
455 455 self._config = cfg
456 456
457 457 self._ssh = bool(sshserver or sshkey or password)
458 458 if self._ssh and sshserver is None:
459 459 # default to ssh via localhost
460 460 sshserver = addr
461 461 if self._ssh and password is None:
462 462 if tunnel.try_passwordless_ssh(sshserver, sshkey, paramiko):
463 463 password=False
464 464 else:
465 465 password = getpass("SSH Password for %s: "%sshserver)
466 466 ssh_kwargs = dict(keyfile=sshkey, password=password, paramiko=paramiko)
467 467
468 468 # configure and construct the session
469 469 try:
470 470 extra_args['packer'] = cfg['pack']
471 471 extra_args['unpacker'] = cfg['unpack']
472 472 extra_args['key'] = cast_bytes(cfg['key'])
473 473 extra_args['signature_scheme'] = cfg['signature_scheme']
474 474 except KeyError as exc:
475 475 msg = '\n'.join([
476 476 "Connection file is invalid (missing '{}'), possibly from an old version of IPython.",
477 477 "If you are reusing connection files, remove them and start ipcontroller again."
478 478 ])
479 479 raise ValueError(msg.format(exc.message))
480 480
481 481 self.session = Session(**extra_args)
482 482
483 483 self._query_socket = self._context.socket(zmq.DEALER)
484 484
485 485 if self._ssh:
486 486 tunnel.tunnel_connection(self._query_socket, cfg['registration'], sshserver, **ssh_kwargs)
487 487 else:
488 488 self._query_socket.connect(cfg['registration'])
489 489
490 490 self.session.debug = self.debug
491 491
492 492 self._notification_handlers = {'registration_notification' : self._register_engine,
493 493 'unregistration_notification' : self._unregister_engine,
494 494 'shutdown_notification' : lambda msg: self.close(),
495 495 }
496 496 self._queue_handlers = {'execute_reply' : self._handle_execute_reply,
497 497 'apply_reply' : self._handle_apply_reply}
498 498 self._connect(sshserver, ssh_kwargs, timeout)
499 499
500 500 # last step: setup magics, if we are in IPython:
501 501
502 502 try:
503 503 ip = get_ipython()
504 504 except NameError:
505 505 return
506 506 else:
507 507 if 'px' not in ip.magics_manager.magics:
508 508 # in IPython but we are the first Client.
509 509 # activate a default view for parallel magics.
510 510 self.activate()
511 511
512 512 def __del__(self):
513 513 """cleanup sockets, but _not_ context."""
514 514 self.close()
515 515
516 516 def _setup_profile_dir(self, profile, profile_dir, ipython_dir):
517 517 if ipython_dir is None:
518 518 ipython_dir = get_ipython_dir()
519 519 if profile_dir is not None:
520 520 try:
521 521 self._cd = ProfileDir.find_profile_dir(profile_dir)
522 522 return
523 523 except ProfileDirError:
524 524 pass
525 525 elif profile is not None:
526 526 try:
527 527 self._cd = ProfileDir.find_profile_dir_by_name(
528 528 ipython_dir, profile)
529 529 return
530 530 except ProfileDirError:
531 531 pass
532 532 self._cd = None
533 533
534 534 def _update_engines(self, engines):
535 535 """Update our engines dict and _ids from a dict of the form: {id:uuid}."""
536 536 for k,v in engines.iteritems():
537 537 eid = int(k)
538 538 if eid not in self._engines:
539 539 self._ids.append(eid)
540 540 self._engines[eid] = v
541 541 self._ids = sorted(self._ids)
542 542 if sorted(self._engines.keys()) != range(len(self._engines)) and \
543 543 self._task_scheme == 'pure' and self._task_socket:
544 544 self._stop_scheduling_tasks()
545 545
546 546 def _stop_scheduling_tasks(self):
547 547 """Stop scheduling tasks because an engine has been unregistered
548 548 from a pure ZMQ scheduler.
549 549 """
550 550 self._task_socket.close()
551 551 self._task_socket = None
552 552 msg = "An engine has been unregistered, and we are using pure " +\
553 553 "ZMQ task scheduling. Task farming will be disabled."
554 554 if self.outstanding:
555 555 msg += " If you were running tasks when this happened, " +\
556 556 "some `outstanding` msg_ids may never resolve."
557 557 warnings.warn(msg, RuntimeWarning)
558 558
559 559 def _build_targets(self, targets):
560 560 """Turn valid target IDs or 'all' into two lists:
561 561 (int_ids, uuids).
562 562 """
563 563 if not self._ids:
564 564 # flush notification socket if no engines yet, just in case
565 565 if not self.ids:
566 566 raise error.NoEnginesRegistered("Can't build targets without any engines")
567 567
568 568 if targets is None:
569 569 targets = self._ids
570 570 elif isinstance(targets, basestring):
571 571 if targets.lower() == 'all':
572 572 targets = self._ids
573 573 else:
574 574 raise TypeError("%r not valid str target, must be 'all'"%(targets))
575 575 elif isinstance(targets, int):
576 576 if targets < 0:
577 577 targets = self.ids[targets]
578 578 if targets not in self._ids:
579 579 raise IndexError("No such engine: %i"%targets)
580 580 targets = [targets]
581 581
582 582 if isinstance(targets, slice):
583 583 indices = range(len(self._ids))[targets]
584 584 ids = self.ids
585 585 targets = [ ids[i] for i in indices ]
586 586
587 587 if not isinstance(targets, (tuple, list, xrange)):
588 588 raise TypeError("targets by int/slice/collection of ints only, not %s"%(type(targets)))
589 589
590 590 return [cast_bytes(self._engines[t]) for t in targets], list(targets)
591 591
592 592 def _connect(self, sshserver, ssh_kwargs, timeout):
593 593 """setup all our socket connections to the cluster. This is called from
594 594 __init__."""
595 595
596 596 # Maybe allow reconnecting?
597 597 if self._connected:
598 598 return
599 599 self._connected=True
600 600
601 601 def connect_socket(s, url):
602 # url = util.disambiguate_url(url, self._config['location'])
603 602 if self._ssh:
604 603 return tunnel.tunnel_connection(s, url, sshserver, **ssh_kwargs)
605 604 else:
606 605 return s.connect(url)
607 606
608 607 self.session.send(self._query_socket, 'connection_request')
609 608 # use Poller because zmq.select has wrong units in pyzmq 2.1.7
610 609 poller = zmq.Poller()
611 610 poller.register(self._query_socket, zmq.POLLIN)
612 611 # poll expects milliseconds, timeout is seconds
613 612 evts = poller.poll(timeout*1000)
614 613 if not evts:
615 614 raise error.TimeoutError("Hub connection request timed out")
616 615 idents,msg = self.session.recv(self._query_socket,mode=0)
617 616 if self.debug:
618 617 pprint(msg)
619 618 content = msg['content']
620 619 # self._config['registration'] = dict(content)
621 620 cfg = self._config
622 621 if content['status'] == 'ok':
623 622 self._mux_socket = self._context.socket(zmq.DEALER)
624 623 connect_socket(self._mux_socket, cfg['mux'])
625 624
626 625 self._task_socket = self._context.socket(zmq.DEALER)
627 626 connect_socket(self._task_socket, cfg['task'])
628 627
629 628 self._notification_socket = self._context.socket(zmq.SUB)
630 629 self._notification_socket.setsockopt(zmq.SUBSCRIBE, b'')
631 630 connect_socket(self._notification_socket, cfg['notification'])
632 631
633 632 self._control_socket = self._context.socket(zmq.DEALER)
634 633 connect_socket(self._control_socket, cfg['control'])
635 634
636 635 self._iopub_socket = self._context.socket(zmq.SUB)
637 636 self._iopub_socket.setsockopt(zmq.SUBSCRIBE, b'')
638 637 connect_socket(self._iopub_socket, cfg['iopub'])
639 638
640 639 self._update_engines(dict(content['engines']))
641 640 else:
642 641 self._connected = False
643 642 raise Exception("Failed to connect!")
644 643
645 644 #--------------------------------------------------------------------------
646 645 # handlers and callbacks for incoming messages
647 646 #--------------------------------------------------------------------------
648 647
649 648 def _unwrap_exception(self, content):
650 649 """unwrap exception, and remap engine_id to int."""
651 650 e = error.unwrap_exception(content)
652 651 # print e.traceback
653 652 if e.engine_info:
654 653 e_uuid = e.engine_info['engine_uuid']
655 654 eid = self._engines[e_uuid]
656 655 e.engine_info['engine_id'] = eid
657 656 return e
658 657
659 658 def _extract_metadata(self, msg):
660 659 header = msg['header']
661 660 parent = msg['parent_header']
662 661 msg_meta = msg['metadata']
663 662 content = msg['content']
664 663 md = {'msg_id' : parent['msg_id'],
665 664 'received' : datetime.now(),
666 665 'engine_uuid' : msg_meta.get('engine', None),
667 666 'follow' : msg_meta.get('follow', []),
668 667 'after' : msg_meta.get('after', []),
669 668 'status' : content['status'],
670 669 }
671 670
672 671 if md['engine_uuid'] is not None:
673 672 md['engine_id'] = self._engines.get(md['engine_uuid'], None)
674 673
675 674 if 'date' in parent:
676 675 md['submitted'] = parent['date']
677 676 if 'started' in msg_meta:
678 677 md['started'] = msg_meta['started']
679 678 if 'date' in header:
680 679 md['completed'] = header['date']
681 680 return md
682 681
683 682 def _register_engine(self, msg):
684 683 """Register a new engine, and update our connection info."""
685 684 content = msg['content']
686 685 eid = content['id']
687 686 d = {eid : content['uuid']}
688 687 self._update_engines(d)
689 688
690 689 def _unregister_engine(self, msg):
691 690 """Unregister an engine that has died."""
692 691 content = msg['content']
693 692 eid = int(content['id'])
694 693 if eid in self._ids:
695 694 self._ids.remove(eid)
696 695 uuid = self._engines.pop(eid)
697 696
698 697 self._handle_stranded_msgs(eid, uuid)
699 698
700 699 if self._task_socket and self._task_scheme == 'pure':
701 700 self._stop_scheduling_tasks()
702 701
703 702 def _handle_stranded_msgs(self, eid, uuid):
704 703 """Handle messages known to be on an engine when the engine unregisters.
705 704
706 705 It is possible that this will fire prematurely - that is, an engine will
707 706 go down after completing a result, and the client will be notified
708 707 of the unregistration and later receive the successful result.
709 708 """
710 709
711 710 outstanding = self._outstanding_dict[uuid]
712 711
713 712 for msg_id in list(outstanding):
714 713 if msg_id in self.results:
715 714 # we already have this result; no need to synthesize a failure
716 715 continue
717 716 try:
718 717 raise error.EngineError("Engine %r died while running task %r"%(eid, msg_id))
719 718 except:
720 719 content = error.wrap_exception()
721 720 # build a fake message:
722 721 msg = self.session.msg('apply_reply', content=content)
723 722 msg['parent_header']['msg_id'] = msg_id
724 723 msg['metadata']['engine'] = uuid
725 724 self._handle_apply_reply(msg)
726 725
727 726 def _handle_execute_reply(self, msg):
728 727 """Save the reply to an execute_request into our results.
729 728
730 729 execute messages come from send_execute_request (e.g. via view.execute); apply is used for most execution.
731 730 """
732 731
733 732 parent = msg['parent_header']
734 733 msg_id = parent['msg_id']
735 734 if msg_id not in self.outstanding:
736 735 if msg_id in self.history:
737 736 print ("got stale result: %s"%msg_id)
738 737 else:
739 738 print ("got unknown result: %s"%msg_id)
740 739 else:
741 740 self.outstanding.remove(msg_id)
742 741
743 742 content = msg['content']
744 743 header = msg['header']
745 744
746 745 # construct metadata:
747 746 md = self.metadata[msg_id]
748 747 md.update(self._extract_metadata(msg))
749 748 # is this redundant?
750 749 self.metadata[msg_id] = md
751 750
752 751 e_outstanding = self._outstanding_dict[md['engine_uuid']]
753 752 if msg_id in e_outstanding:
754 753 e_outstanding.remove(msg_id)
755 754
756 755 # construct result:
757 756 if content['status'] == 'ok':
758 757 self.results[msg_id] = ExecuteReply(msg_id, content, md)
759 758 elif content['status'] == 'aborted':
760 759 self.results[msg_id] = error.TaskAborted(msg_id)
761 760 elif content['status'] == 'resubmitted':
762 761 # TODO: handle resubmission
763 762 pass
764 763 else:
765 764 self.results[msg_id] = self._unwrap_exception(content)
766 765
767 766 def _handle_apply_reply(self, msg):
768 767 """Save the reply to an apply_request into our results."""
769 768 parent = msg['parent_header']
770 769 msg_id = parent['msg_id']
771 770 if msg_id not in self.outstanding:
772 771 if msg_id in self.history:
773 772 print ("got stale result: %s"%msg_id)
774 773 print self.results[msg_id]
775 774 print msg
776 775 else:
777 776 print ("got unknown result: %s"%msg_id)
778 777 else:
779 778 self.outstanding.remove(msg_id)
780 779 content = msg['content']
781 780 header = msg['header']
782 781
783 782 # construct metadata:
784 783 md = self.metadata[msg_id]
785 784 md.update(self._extract_metadata(msg))
786 785 # is this redundant?
787 786 self.metadata[msg_id] = md
788 787
789 788 e_outstanding = self._outstanding_dict[md['engine_uuid']]
790 789 if msg_id in e_outstanding:
791 790 e_outstanding.remove(msg_id)
792 791
793 792 # construct result:
794 793 if content['status'] == 'ok':
795 794 self.results[msg_id] = serialize.unserialize_object(msg['buffers'])[0]
796 795 elif content['status'] == 'aborted':
797 796 self.results[msg_id] = error.TaskAborted(msg_id)
798 797 elif content['status'] == 'resubmitted':
799 798 # TODO: handle resubmission
800 799 pass
801 800 else:
802 801 self.results[msg_id] = self._unwrap_exception(content)
803 802
804 803 def _flush_notifications(self):
805 804 """Flush notifications of engine registrations waiting
806 805 in ZMQ queue."""
807 806 idents,msg = self.session.recv(self._notification_socket, mode=zmq.NOBLOCK)
808 807 while msg is not None:
809 808 if self.debug:
810 809 pprint(msg)
811 810 msg_type = msg['header']['msg_type']
812 811 handler = self._notification_handlers.get(msg_type, None)
813 812 if handler is None:
814 813 raise Exception("Unhandled message type: %s" % msg_type)
815 814 else:
816 815 handler(msg)
817 816 idents,msg = self.session.recv(self._notification_socket, mode=zmq.NOBLOCK)
818 817
819 818 def _flush_results(self, sock):
820 819 """Flush task or queue results waiting in ZMQ queue."""
821 820 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
822 821 while msg is not None:
823 822 if self.debug:
824 823 pprint(msg)
825 824 msg_type = msg['header']['msg_type']
826 825 handler = self._queue_handlers.get(msg_type, None)
827 826 if handler is None:
828 827 raise Exception("Unhandled message type: %s" % msg_type)
829 828 else:
830 829 handler(msg)
831 830 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
832 831
833 832 def _flush_control(self, sock):
834 833 """Flush replies from the control channel waiting
835 834 in the ZMQ queue.
836 835
837 836 Currently: ignore them."""
838 837 if self._ignored_control_replies <= 0:
839 838 return
840 839 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
841 840 while msg is not None:
842 841 self._ignored_control_replies -= 1
843 842 if self.debug:
844 843 pprint(msg)
845 844 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
846 845
847 846 def _flush_ignored_control(self):
848 847 """flush ignored control replies"""
849 848 while self._ignored_control_replies > 0:
850 849 self.session.recv(self._control_socket)
851 850 self._ignored_control_replies -= 1
852 851
853 852 def _flush_ignored_hub_replies(self):
854 853 ident,msg = self.session.recv(self._query_socket, mode=zmq.NOBLOCK)
855 854 while msg is not None:
856 855 ident,msg = self.session.recv(self._query_socket, mode=zmq.NOBLOCK)
857 856
858 857 def _flush_iopub(self, sock):
859 858 """Flush replies from the iopub channel waiting
860 859 in the ZMQ queue.
861 860 """
862 861 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
863 862 while msg is not None:
864 863 if self.debug:
865 864 pprint(msg)
866 865 parent = msg['parent_header']
867 866 # ignore IOPub messages with no parent.
868 867 # Caused by print statements or warnings from before the first execution.
869 868 if not parent:
870 869 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
871 870 continue
872 871 msg_id = parent['msg_id']
873 872 content = msg['content']
874 873 header = msg['header']
875 874 msg_type = msg['header']['msg_type']
876 875
877 876 # init metadata:
878 877 md = self.metadata[msg_id]
879 878
880 879 if msg_type == 'stream':
881 880 name = content['name']
882 881 s = md[name] or ''
883 882 md[name] = s + content['data']
884 883 elif msg_type == 'pyerr':
885 884 md.update({'pyerr' : self._unwrap_exception(content)})
886 885 elif msg_type == 'pyin':
887 886 md.update({'pyin' : content['code']})
888 887 elif msg_type == 'display_data':
889 888 md['outputs'].append(content)
890 889 elif msg_type == 'pyout':
891 890 md['pyout'] = content
892 891 elif msg_type == 'data_message':
893 892 data, remainder = serialize.unserialize_object(msg['buffers'])
894 893 md['data'].update(data)
895 894 elif msg_type == 'status':
896 895 # idle message comes after all outputs
897 896 if content['execution_state'] == 'idle':
898 897 md['outputs_ready'] = True
899 898 else:
900 899 # unhandled msg_type
901 900 pass
902 901
903 902 # redundant?
904 903 self.metadata[msg_id] = md
905 904
906 905 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
907 906
908 907 #--------------------------------------------------------------------------
909 908 # len, getitem
910 909 #--------------------------------------------------------------------------
911 910
912 911 def __len__(self):
913 912 """len(client) returns # of engines."""
914 913 return len(self.ids)
915 914
916 915 def __getitem__(self, key):
917 916 """index access returns DirectView multiplexer objects
918 917
919 918 Must be int, slice, or list/tuple/xrange of ints"""
920 919 if not isinstance(key, (int, slice, tuple, list, xrange)):
921 920 raise TypeError("key by int/slice/iterable of ints only, not %s"%(type(key)))
922 921 else:
923 922 return self.direct_view(key)
924 923
925 924 #--------------------------------------------------------------------------
926 925 # Begin public methods
927 926 #--------------------------------------------------------------------------
928 927
929 928 @property
930 929 def ids(self):
931 930 """Always up-to-date ids property."""
932 931 self._flush_notifications()
933 932 # always copy:
934 933 return list(self._ids)
935 934
936 935 def activate(self, targets='all', suffix=''):
937 936 """Create a DirectView and register it with IPython magics
938 937
939 938 Defines the magics `%px, %autopx, %pxresult, %%px`
940 939
941 940 Parameters
942 941 ----------
943 942
944 943 targets: int, list of ints, or 'all'
945 944 The engines on which the view's magics will run
946 945 suffix: str [default: '']
947 946 The suffix, if any, for the magics. This allows you to have
948 947 multiple views associated with parallel magics at the same time.
949 948
950 949 e.g. ``rc.activate(targets=0, suffix='0')`` will give you
951 950 the magics ``%px0``, ``%pxresult0``, etc. for running magics just
952 951 on engine 0.
953 952 """
954 953 view = self.direct_view(targets)
955 954 view.block = True
956 955 view.activate(suffix)
957 956 return view
958 957
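A short usage sketch for `activate`, following the docstring example (assumes an interactive IPython session with a running cluster):

```python
from IPython.parallel import Client

rc = Client()
rc.activate()                        # %px, %autopx, %pxresult now target all engines
rc.activate(targets=0, suffix='0')   # %px0 etc. run only on engine 0

# then, at the IPython prompt:
#   %px import os; print(os.getpid())
#   %px0 print("only engine 0")
```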
959 def close(self):
958 def close(self, linger=None):
959 """Close my zmq Sockets
960
961 If `linger` is specified, set the zmq LINGER socket option,
962 which allows discarding of pending messages.
963 """
960 964 if self._closed:
961 965 return
962 966 self.stop_spin_thread()
963 snames = filter(lambda n: n.endswith('socket'), dir(self))
964 for socket in map(lambda name: getattr(self, name), snames):
965 if isinstance(socket, zmq.Socket) and not socket.closed:
966 socket.close()
967 snames = [ trait for trait in self.trait_names() if trait.endswith("socket") ]
968 for name in snames:
969 socket = getattr(self, name)
970 if socket is not None and not socket.closed:
971 if linger is not None:
972 socket.close(linger=linger)
973 else:
974 socket.close()
967 975 self._closed = True
968 976
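A sketch of what the new `linger` argument buys you (assuming a connected client):

```python
from IPython.parallel import Client

rc = Client()
rc[:].apply_async(lambda: 1)   # leave a request in flight

rc.close(linger=0)   # LINGER=0: discard queued messages and close immediately
# rc.close()         # default: each socket keeps its existing LINGER behavior
```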
969 977 def _spin_every(self, interval=1):
970 978 """target func for use in spin_thread"""
971 979 while True:
972 980 if self._stop_spinning.is_set():
973 981 return
974 982 time.sleep(interval)
975 983 self.spin()
976 984
977 985 def spin_thread(self, interval=1):
978 986 """call Client.spin() in a background thread on some regular interval
979 987
980 988 This helps ensure that messages don't pile up too much in the zmq queue
981 989 while you are working on other things, or just leaving an idle terminal.
982 990
983 991 It also helps limit potential padding of the `received` timestamp
984 992 on AsyncResult objects, used for timings.
985 993
986 994 Parameters
987 995 ----------
988 996
989 997 interval : float, optional
990 998 The interval on which to spin the client in the background thread
991 999 (simply passed to time.sleep).
992 1000
993 1001 Notes
994 1002 -----
995 1003
996 1004 For precision timing, you may want to use this method to put a bound
997 1005 on the jitter (in seconds) in `received` timestamps used
998 1006 in AsyncResult.wall_time.
999 1007
1000 1008 """
1001 1009 if self._spin_thread is not None:
1002 1010 self.stop_spin_thread()
1003 1011 self._stop_spinning.clear()
1004 1012 self._spin_thread = Thread(target=self._spin_every, args=(interval,))
1005 1013 self._spin_thread.daemon = True
1006 1014 self._spin_thread.start()
1007 1015
1008 1016 def stop_spin_thread(self):
1009 1017 """stop background spin_thread, if any"""
1010 1018 if self._spin_thread is not None:
1011 1019 self._stop_spinning.set()
1012 1020 self._spin_thread.join()
1013 1021 self._spin_thread = None
1014 1022
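A usage sketch for the background spin thread (assuming a connected client):

```python
from IPython.parallel import Client

rc = Client()
rc.spin_thread(interval=0.1)   # flush incoming messages every 100 ms in the background

# ... submit work and do other things; `received` timestamps now have
# at most ~0.1 s of jitter ...

rc.stop_spin_thread()          # the thread is signalled, joined, and discarded
```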
1015 1023 def spin(self):
1016 1024 """Flush any registration notifications and execution results
1017 1025 waiting in the ZMQ queue.
1018 1026 """
1019 1027 if self._notification_socket:
1020 1028 self._flush_notifications()
1021 1029 if self._iopub_socket:
1022 1030 self._flush_iopub(self._iopub_socket)
1023 1031 if self._mux_socket:
1024 1032 self._flush_results(self._mux_socket)
1025 1033 if self._task_socket:
1026 1034 self._flush_results(self._task_socket)
1027 1035 if self._control_socket:
1028 1036 self._flush_control(self._control_socket)
1029 1037 if self._query_socket:
1030 1038 self._flush_ignored_hub_replies()
1031 1039
1032 1040 def wait(self, jobs=None, timeout=-1):
1033 1041 """waits on one or more `jobs`, for up to `timeout` seconds.
1034 1042
1035 1043 Parameters
1036 1044 ----------
1037 1045
1038 1046 jobs : int, str, or list of ints and/or strs, or one or more AsyncResult objects
1039 1047 ints are indices to self.history
1040 1048 strs are msg_ids
1041 1049 default: wait on all outstanding messages
1042 1050 timeout : float
1043 1051 a time in seconds, after which to give up.
1044 1052 default is -1, which means no timeout
1045 1053
1046 1054 Returns
1047 1055 -------
1048 1056
1049 1057 True : when all msg_ids are done
1050 1058 False : timeout reached, some msg_ids still outstanding
1051 1059 """
1052 1060 tic = time.time()
1053 1061 if jobs is None:
1054 1062 theids = self.outstanding
1055 1063 else:
1056 1064 if isinstance(jobs, (int, basestring, AsyncResult)):
1057 1065 jobs = [jobs]
1058 1066 theids = set()
1059 1067 for job in jobs:
1060 1068 if isinstance(job, int):
1061 1069 # index access
1062 1070 job = self.history[job]
1063 1071 elif isinstance(job, AsyncResult):
1064 1072 map(theids.add, job.msg_ids)
1065 1073 continue
1066 1074 theids.add(job)
1067 1075 if not theids.intersection(self.outstanding):
1068 1076 return True
1069 1077 self.spin()
1070 1078 while theids.intersection(self.outstanding):
1071 1079 if timeout >= 0 and ( time.time()-tic ) > timeout:
1072 1080 break
1073 1081 time.sleep(1e-3)
1074 1082 self.spin()
1075 1083 return len(theids.intersection(self.outstanding)) == 0
1076 1084
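A sketch of `wait` with a timeout (assuming a connected client with engines):

```python
from IPython.parallel import Client

rc = Client()
view = rc.load_balanced_view()
ars = [view.apply_async(pow, i, 2) for i in range(4)]

if rc.wait(ars, timeout=5):    # block up to 5 seconds for just these jobs
    print([ar.get() for ar in ars])
else:
    print('%i message(s) still outstanding' % len(rc.outstanding))
```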
1077 1085 #--------------------------------------------------------------------------
1078 1086 # Control methods
1079 1087 #--------------------------------------------------------------------------
1080 1088
1081 1089 @spin_first
1082 1090 def clear(self, targets=None, block=None):
1083 1091 """Clear the namespace in target(s)."""
1084 1092 block = self.block if block is None else block
1085 1093 targets = self._build_targets(targets)[0]
1086 1094 for t in targets:
1087 1095 self.session.send(self._control_socket, 'clear_request', content={}, ident=t)
1088 1096 error = False
1089 1097 if block:
1090 1098 self._flush_ignored_control()
1091 1099 for i in range(len(targets)):
1092 1100 idents,msg = self.session.recv(self._control_socket,0)
1093 1101 if self.debug:
1094 1102 pprint(msg)
1095 1103 if msg['content']['status'] != 'ok':
1096 1104 error = self._unwrap_exception(msg['content'])
1097 1105 else:
1098 1106 self._ignored_control_replies += len(targets)
1099 1107 if error:
1100 1108 raise error
1101 1109
1102 1110
1103 1111 @spin_first
1104 1112 def abort(self, jobs=None, targets=None, block=None):
1105 1113 """Abort specific jobs from the execution queues of target(s).
1106 1114
1107 1115 This is a mechanism to prevent jobs that have already been submitted
1108 1116 from executing.
1109 1117
1110 1118 Parameters
1111 1119 ----------
1112 1120
1113 1121 jobs : msg_id, list of msg_ids, or AsyncResult
1114 1122 The jobs to be aborted
1115 1123
1116 1124 If unspecified/None: abort all outstanding jobs.
1117 1125
1118 1126 """
1119 1127 block = self.block if block is None else block
1120 1128 jobs = jobs if jobs is not None else list(self.outstanding)
1121 1129 targets = self._build_targets(targets)[0]
1122 1130
1123 1131 msg_ids = []
1124 1132 if isinstance(jobs, (basestring,AsyncResult)):
1125 1133 jobs = [jobs]
1126 1134 bad_ids = filter(lambda obj: not isinstance(obj, (basestring, AsyncResult)), jobs)
1127 1135 if bad_ids:
1128 1136 raise TypeError("Invalid msg_id type %r, expected str or AsyncResult"%bad_ids[0])
1129 1137 for j in jobs:
1130 1138 if isinstance(j, AsyncResult):
1131 1139 msg_ids.extend(j.msg_ids)
1132 1140 else:
1133 1141 msg_ids.append(j)
1134 1142 content = dict(msg_ids=msg_ids)
1135 1143 for t in targets:
1136 1144 self.session.send(self._control_socket, 'abort_request',
1137 1145 content=content, ident=t)
1138 1146 error = False
1139 1147 if block:
1140 1148 self._flush_ignored_control()
1141 1149 for i in range(len(targets)):
1142 1150 idents,msg = self.session.recv(self._control_socket,0)
1143 1151 if self.debug:
1144 1152 pprint(msg)
1145 1153 if msg['content']['status'] != 'ok':
1146 1154 error = self._unwrap_exception(msg['content'])
1147 1155 else:
1148 1156 self._ignored_control_replies += len(targets)
1149 1157 if error:
1150 1158 raise error
1151 1159
1152 1160 @spin_first
1153 1161 def shutdown(self, targets='all', restart=False, hub=False, block=None):
1154 1162 """Terminates one or more engine processes, optionally including the hub.
1155 1163
1156 1164 Parameters
1157 1165 ----------
1158 1166
1159 1167 targets: list of ints or 'all' [default: all]
1160 1168 Which engines to shutdown.
1161 1169 hub: bool [default: False]
1162 1170 Whether to include the Hub. hub=True implies targets='all'.
1163 1171 block: bool [default: self.block]
1164 1172 Whether to wait for clean shutdown replies or not.
1165 1173 restart: bool [default: False]
1166 1174 NOT IMPLEMENTED
1167 1175 whether to restart engines after shutting them down.
1168 1176 """
1169 1177 from IPython.parallel.error import NoEnginesRegistered
1170 1178 if restart:
1171 1179 raise NotImplementedError("Engine restart is not yet implemented")
1172 1180
1173 1181 block = self.block if block is None else block
1174 1182 if hub:
1175 1183 targets = 'all'
1176 1184 try:
1177 1185 targets = self._build_targets(targets)[0]
1178 1186 except NoEnginesRegistered:
1179 1187 targets = []
1180 1188 for t in targets:
1181 1189 self.session.send(self._control_socket, 'shutdown_request',
1182 1190 content={'restart':restart},ident=t)
1183 1191 error = False
1184 1192 if block or hub:
1185 1193 self._flush_ignored_control()
1186 1194 for i in range(len(targets)):
1187 1195 idents,msg = self.session.recv(self._control_socket, 0)
1188 1196 if self.debug:
1189 1197 pprint(msg)
1190 1198 if msg['content']['status'] != 'ok':
1191 1199 error = self._unwrap_exception(msg['content'])
1192 1200 else:
1193 1201 self._ignored_control_replies += len(targets)
1194 1202
1195 1203 if hub:
1196 1204 time.sleep(0.25)
1197 1205 self.session.send(self._query_socket, 'shutdown_request')
1198 1206 idents,msg = self.session.recv(self._query_socket, 0)
1199 1207 if self.debug:
1200 1208 pprint(msg)
1201 1209 if msg['content']['status'] != 'ok':
1202 1210 error = self._unwrap_exception(msg['content'])
1203 1211
1204 1212 if error:
1205 1213 raise error
1206 1214
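Typical calls, per the parameters above (a hedged sketch; assumes a running cluster):

```python
from IPython.parallel import Client

rc = Client()
rc.shutdown(targets=[0, 1], block=True)   # stop engines 0 and 1, wait for replies
# rc.shutdown(hub=True)                   # stop all engines and the Hub itself
```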
1207 1215 #--------------------------------------------------------------------------
1208 1216 # Execution related methods
1209 1217 #--------------------------------------------------------------------------
1210 1218
1211 1219 def _maybe_raise(self, result):
1212 1220 """wrapper for maybe raising an exception if apply failed."""
1213 1221 if isinstance(result, error.RemoteError):
1214 1222 raise result
1215 1223
1216 1224 return result
1217 1225
1218 1226 def send_apply_request(self, socket, f, args=None, kwargs=None, metadata=None, track=False,
1219 1227 ident=None):
1220 1228 """construct and send an apply message via a socket.
1221 1229
1222 1230 This is the principal method with which all engine execution is performed by views.
1223 1231 """
1224 1232
1225 1233 if self._closed:
1226 1234 raise RuntimeError("Client cannot be used after its sockets have been closed")
1227 1235
1228 1236 # defaults:
1229 1237 args = args if args is not None else []
1230 1238 kwargs = kwargs if kwargs is not None else {}
1231 1239 metadata = metadata if metadata is not None else {}
1232 1240
1233 1241 # validate arguments
1234 1242 if not callable(f) and not isinstance(f, Reference):
1235 1243 raise TypeError("f must be callable, not %s"%type(f))
1236 1244 if not isinstance(args, (tuple, list)):
1237 1245 raise TypeError("args must be tuple or list, not %s"%type(args))
1238 1246 if not isinstance(kwargs, dict):
1239 1247 raise TypeError("kwargs must be dict, not %s"%type(kwargs))
1240 1248 if not isinstance(metadata, dict):
1241 1249 raise TypeError("metadata must be dict, not %s"%type(metadata))
1242 1250
1243 1251 bufs = serialize.pack_apply_message(f, args, kwargs,
1244 1252 buffer_threshold=self.session.buffer_threshold,
1245 1253 item_threshold=self.session.item_threshold,
1246 1254 )
1247 1255
1248 1256 msg = self.session.send(socket, "apply_request", buffers=bufs, ident=ident,
1249 1257 metadata=metadata, track=track)
1250 1258
1251 1259 msg_id = msg['header']['msg_id']
1252 1260 self.outstanding.add(msg_id)
1253 1261 if ident:
1254 1262 # possibly routed to a specific engine
1255 1263 if isinstance(ident, list):
1256 1264 ident = ident[-1]
1257 1265 if ident in self._engines.values():
1258 1266 # save for later, in case of engine death
1259 1267 self._outstanding_dict[ident].add(msg_id)
1260 1268 self.history.append(msg_id)
1261 1269 self.metadata[msg_id]['submitted'] = datetime.now()
1262 1270
1263 1271 return msg
1264 1272
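Views are the supported entry point; each high-level call like the one below ends up in `send_apply_request` with the view's socket and target idents. A hedged sketch:

```python
from IPython.parallel import Client

rc = Client()
dv = rc[:]   # DirectView over all currently-registered engines

# internally this becomes one send_apply_request per target,
# on the client's mux socket, with ident set to each engine's uuid
ar = dv.apply_async(lambda x: x * 2, 21)
print(ar.get())   # one result per engine, e.g. [42, 42, ...]
```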
1265 1273 def send_execute_request(self, socket, code, silent=True, metadata=None, ident=None):
1266 1274 """construct and send an execute request via a socket.
1267 1275
1268 1276 """
1269 1277
1270 1278 if self._closed:
1271 1279 raise RuntimeError("Client cannot be used after its sockets have been closed")
1272 1280
1273 1281 # defaults:
1274 1282 metadata = metadata if metadata is not None else {}
1275 1283
1276 1284 # validate arguments
1277 1285 if not isinstance(code, basestring):
1278 1286 raise TypeError("code must be text, not %s" % type(code))
1279 1287 if not isinstance(metadata, dict):
1280 1288 raise TypeError("metadata must be dict, not %s" % type(metadata))
1281 1289
1282 1290 content = dict(code=code, silent=bool(silent), user_variables=[], user_expressions={})
1283 1291
1284 1292
1285 1293 msg = self.session.send(socket, "execute_request", content=content, ident=ident,
1286 1294 metadata=metadata)
1287 1295
1288 1296 msg_id = msg['header']['msg_id']
1289 1297 self.outstanding.add(msg_id)
1290 1298 if ident:
1291 1299 # possibly routed to a specific engine
1292 1300 if isinstance(ident, list):
1293 1301 ident = ident[-1]
1294 1302 if ident in self._engines.values():
1295 1303 # save for later, in case of engine death
1296 1304 self._outstanding_dict[ident].add(msg_id)
1297 1305 self.history.append(msg_id)
1298 1306 self.metadata[msg_id]['submitted'] = datetime.now()
1299 1307
1300 1308 return msg
1301 1309
1302 1310 #--------------------------------------------------------------------------
1303 1311 # construct a View object
1304 1312 #--------------------------------------------------------------------------
1305 1313
1306 1314 def load_balanced_view(self, targets=None):
1307 1315 """construct a DirectView object.
1308 1316
1309 1317 If no arguments are specified, create a LoadBalancedView
1310 1318 using all engines.
1311 1319
1312 1320 Parameters
1313 1321 ----------
1314 1322
1315 1323 targets: list,slice,int,etc. [default: use all engines]
1316 1324 The subset of engines across which to load-balance
1317 1325 """
1318 1326 if targets == 'all':
1319 1327 targets = None
1320 1328 if targets is not None:
1321 1329 targets = self._build_targets(targets)[1]
1322 1330 return LoadBalancedView(client=self, socket=self._task_socket, targets=targets)
1323 1331
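A usage sketch (assuming a connected client; task assignment is left to the scheduler):

```python
from IPython.parallel import Client

rc = Client()
lview = rc.load_balanced_view()           # balance over all engines
# lview = rc.load_balanced_view([0, 2])   # or restrict to a subset

ar = lview.map_async(lambda x: x ** 2, range(8))
print(ar.get())   # [0, 1, 4, 9, 16, 25, 36, 49]
```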
1324 1332 def direct_view(self, targets='all'):
1325 1333 """construct a DirectView object.
1326 1334
1327 1335 If no targets are specified, create a DirectView using all engines.
1328 1336
1329 1337 rc.direct_view('all') is distinguished from rc[:] in that 'all' will
1330 1338 evaluate the target engines at each execution, whereas rc[:] will connect to
1331 1339 all *current* engines, and that list will not change.
1332 1340
1333 1341 That is, 'all' will always use all engines, whereas rc[:] will not use
1334 1342 engines added after the DirectView is constructed.
1335 1343
1336 1344 Parameters
1337 1345 ----------
1338 1346
1339 1347 targets: list,slice,int,etc. [default: use all engines]
1340 1348 The engines to use for the View
1341 1349 """
1342 1350 single = isinstance(targets, int)
1343 1351 # allow 'all' to be lazily evaluated at each execution
1344 1352 if targets != 'all':
1345 1353 targets = self._build_targets(targets)[1]
1346 1354 if single:
1347 1355 targets = targets[0]
1348 1356 return DirectView(client=self, socket=self._mux_socket, targets=targets)
1349 1357
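The 'all'-vs-snapshot distinction from the docstring, as a sketch:

```python
from IPython.parallel import Client

rc = Client()
dv_all = rc.direct_view('all')   # re-evaluates 'all' at each execution
dv_now = rc[:]                   # fixed to the engines registered right now

dv_all['a'] = 1                  # dict-style push to every *current* engine
print(dv_now.pull('a', block=True))
# engines registered later are seen by dv_all, but never by dv_now
```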
1350 1358 #--------------------------------------------------------------------------
1351 1359 # Query methods
1352 1360 #--------------------------------------------------------------------------
1353 1361
1354 1362 @spin_first
1355 1363 def get_result(self, indices_or_msg_ids=None, block=None):
1356 1364 """Retrieve a result by msg_id or history index, wrapped in an AsyncResult object.
1357 1365
1358 1366 If the client already has the results, no request to the Hub will be made.
1359 1367
1360 1368 This is a convenient way to construct AsyncResult objects, which are wrappers
1361 1369 that include metadata about execution, and allow for awaiting results that
1362 1370 were not submitted by this Client.
1363 1371
1364 1372 It can also be a convenient way to retrieve the metadata associated with
1365 1373 blocking execution, since it always retrieves
1366 1374
1367 1375 Examples
1368 1376 --------
1369 1377 ::
1370 1378
1371 1379 In [10]: ar = client.get_result(-1)  # AsyncResult for the most recent task
1372 1380
1373 1381 Parameters
1374 1382 ----------
1375 1383
1376 1384 indices_or_msg_ids : integer history index, str msg_id, or list of either
1377 1385 The indices or msg_ids of the results to be retrieved
1378 1386
1379 1387 block : bool
1380 1388 Whether to wait for the result to be done
1381 1389
1382 1390 Returns
1383 1391 -------
1384 1392
1385 1393 AsyncResult
1386 1394 A single AsyncResult object will always be returned.
1387 1395
1388 1396 AsyncHubResult
1389 1397 A subclass of AsyncResult that retrieves results from the Hub
1390 1398
1391 1399 """
1392 1400 block = self.block if block is None else block
1393 1401 if indices_or_msg_ids is None:
1394 1402 indices_or_msg_ids = -1
1395 1403
1396 1404 single_result = False
1397 1405 if not isinstance(indices_or_msg_ids, (list,tuple)):
1398 1406 indices_or_msg_ids = [indices_or_msg_ids]
1399 1407 single_result = True
1400 1408
1401 1409 theids = []
1402 1410 for id in indices_or_msg_ids:
1403 1411 if isinstance(id, int):
1404 1412 id = self.history[id]
1405 1413 if not isinstance(id, basestring):
1406 1414 raise TypeError("indices must be str or int, not %r"%id)
1407 1415 theids.append(id)
1408 1416
1409 1417 local_ids = filter(lambda msg_id: msg_id in self.outstanding or msg_id in self.results, theids)
1410 1418 remote_ids = filter(lambda msg_id: msg_id not in local_ids, theids)
1411 1419
1412 1420 # given a single msg_id initially, get_result should return the result itself,
1413 1421 # not a length-one list
1414 1422 if single_result:
1415 1423 theids = theids[0]
1416 1424
1417 1425 if remote_ids:
1418 1426 ar = AsyncHubResult(self, msg_ids=theids)
1419 1427 else:
1420 1428 ar = AsyncResult(self, msg_ids=theids)
1421 1429
1422 1430 if block:
1423 1431 ar.wait()
1424 1432
1425 1433 return ar
1426 1434
1427 1435 @spin_first
1428 1436 def resubmit(self, indices_or_msg_ids=None, metadata=None, block=None):
1429 1437 """Resubmit one or more tasks.
1430 1438
1431 1439 in-flight tasks may not be resubmitted.
1432 1440
1433 1441 Parameters
1434 1442 ----------
1435 1443
1436 1444 indices_or_msg_ids : integer history index, str msg_id, or list of either
1437 1445 The indices or msg_ids of the tasks to be resubmitted
1438 1446
1439 1447 block : bool
1440 1448 Whether to wait for the result to be done
1441 1449
1442 1450 Returns
1443 1451 -------
1444 1452
1445 1453 AsyncHubResult
1446 1454 A subclass of AsyncResult that retrieves results from the Hub
1447 1455
1448 1456 """
1449 1457 block = self.block if block is None else block
1450 1458 if indices_or_msg_ids is None:
1451 1459 indices_or_msg_ids = -1
1452 1460
1453 1461 if not isinstance(indices_or_msg_ids, (list,tuple)):
1454 1462 indices_or_msg_ids = [indices_or_msg_ids]
1455 1463
1456 1464 theids = []
1457 1465 for id in indices_or_msg_ids:
1458 1466 if isinstance(id, int):
1459 1467 id = self.history[id]
1460 1468 if not isinstance(id, basestring):
1461 1469 raise TypeError("indices must be str or int, not %r"%id)
1462 1470 theids.append(id)
1463 1471
1464 1472 content = dict(msg_ids = theids)
1465 1473
1466 1474 self.session.send(self._query_socket, 'resubmit_request', content)
1467 1475
1468 1476 zmq.select([self._query_socket], [], [])
1469 1477 idents,msg = self.session.recv(self._query_socket, zmq.NOBLOCK)
1470 1478 if self.debug:
1471 1479 pprint(msg)
1472 1480 content = msg['content']
1473 1481 if content['status'] != 'ok':
1474 1482 raise self._unwrap_exception(content)
1475 1483 mapping = content['resubmitted']
1476 1484 new_ids = [ mapping[msg_id] for msg_id in theids ]
1477 1485
1478 1486 ar = AsyncHubResult(self, msg_ids=new_ids)
1479 1487
1480 1488 if block:
1481 1489 ar.wait()
1482 1490
1483 1491 return ar
1484 1492
1485 1493 @spin_first
1486 1494 def result_status(self, msg_ids, status_only=True):
1487 1495 """Check on the status of the result(s) of the apply request with `msg_ids`.
1488 1496
1489 1497 If status_only is False, then the actual results will be retrieved, else
1490 1498 only the status of the results will be checked.
1491 1499
1492 1500 Parameters
1493 1501 ----------
1494 1502
1495 1503 msg_ids : list of msg_ids
1496 1504 if int:
1497 1505 Passed as index to self.history for convenience.
1498 1506 status_only : bool (default: True)
1499 1507 if False:
1500 1508 Retrieve the actual results of completed tasks.
1501 1509
1502 1510 Returns
1503 1511 -------
1504 1512
1505 1513 results : dict
1506 1514 There will always be the keys 'pending' and 'completed', which will
1507 1515 be lists of msg_ids that are incomplete or complete. If `status_only`
1508 1516 is False, then completed results will be keyed by their `msg_id`.
1509 1517 """
1510 1518 if not isinstance(msg_ids, (list,tuple)):
1511 1519 msg_ids = [msg_ids]
1512 1520
1513 1521 theids = []
1514 1522 for msg_id in msg_ids:
1515 1523 if isinstance(msg_id, int):
1516 1524 msg_id = self.history[msg_id]
1517 1525 if not isinstance(msg_id, basestring):
1518 1526 raise TypeError("msg_ids must be str, not %r"%msg_id)
1519 1527 theids.append(msg_id)
1520 1528
1521 1529 completed = []
1522 1530 local_results = {}
1523 1531
1524 1532 # comment this block out to temporarily disable local shortcut:
1525 1533 for msg_id in list(theids):  # iterate a copy; theids is mutated below
1526 1534 if msg_id in self.results:
1527 1535 completed.append(msg_id)
1528 1536 local_results[msg_id] = self.results[msg_id]
1529 1537 theids.remove(msg_id)
1530 1538
        if theids: # some not locally cached
            content = dict(msg_ids=theids, status_only=status_only)
            msg = self.session.send(self._query_socket, "result_request", content=content)
            zmq.select([self._query_socket], [], [])
            idents,msg = self.session.recv(self._query_socket, zmq.NOBLOCK)
            if self.debug:
                pprint(msg)
            content = msg['content']
            if content['status'] != 'ok':
                raise self._unwrap_exception(content)
            buffers = msg['buffers']
        else:
            content = dict(completed=[],pending=[])

        content['completed'].extend(completed)

        if status_only:
            return content

        failures = []
        # load cached results into result:
        content.update(local_results)

        # update cache with results:
        for msg_id in sorted(theids):
            if msg_id in content['completed']:
                rec = content[msg_id]
                parent = rec['header']
                header = rec['result_header']
                rcontent = rec['result_content']
                iodict = rec['io']
                if isinstance(rcontent, str):
                    rcontent = self.session.unpack(rcontent)

                md = self.metadata[msg_id]
                md_msg = dict(
                    content=rcontent,
                    parent_header=parent,
                    header=header,
                    metadata=rec['result_metadata'],
                )
                md.update(self._extract_metadata(md_msg))
                if rec.get('received'):
                    md['received'] = rec['received']
                md.update(iodict)

                if rcontent['status'] == 'ok':
                    if header['msg_type'] == 'apply_reply':
                        res,buffers = serialize.unserialize_object(buffers)
                    elif header['msg_type'] == 'execute_reply':
                        res = ExecuteReply(msg_id, rcontent, md)
                    else:
                        raise KeyError("unhandled msg type: %r" % header['msg_type'])
                else:
                    res = self._unwrap_exception(rcontent)
                    failures.append(res)

                self.results[msg_id] = res
                content[msg_id] = res

        if len(theids) == 1 and failures:
            raise failures[0]

        error.collect_exceptions(failures, "result_status")
        return content

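    # Usage sketch for `result_status` (hypothetical session; `msg_ids` are
    # ids of previously submitted tasks known to a connected Client `rc`):
    #
    #     status = rc.result_status(msg_ids)               # status only
    #     print(status['pending'], status['completed'])
    #     results = rc.result_status(msg_ids, status_only=False)
    #     first = results[status['completed'][0]]          # results keyed by msg_id
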
    @spin_first
    def queue_status(self, targets='all', verbose=False):
        """Fetch the status of engine queues.

        Parameters
        ----------

        targets : int/str/list of ints/strs
            the engines whose states are to be queried.
            default : all
        verbose : bool
            Whether to return lengths only, or lists of ids for each element
        """
        if targets == 'all':
            # allow 'all' to be evaluated on the engine
            engine_ids = None
        else:
            engine_ids = self._build_targets(targets)[1]
        content = dict(targets=engine_ids, verbose=verbose)
        self.session.send(self._query_socket, "queue_request", content=content)
        idents,msg = self.session.recv(self._query_socket, 0)
        if self.debug:
            pprint(msg)
        content = msg['content']
        status = content.pop('status')
        if status != 'ok':
            raise self._unwrap_exception(content)
        content = rekey(content)
        if isinstance(targets, int):
            return content[targets]
        else:
            return content

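    # Usage sketch for `queue_status` (hypothetical session; assumes a
    # connected Client `rc`):
    #
    #     rc.queue_status()              # per-engine status dict, keyed by engine id
    #     rc.queue_status(0)             # status for engine 0 only
    #     rc.queue_status(verbose=True)  # lists of msg_ids rather than queue lengths
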
    def _build_msgids_from_target(self, targets=None):
        """Build a list of msg_ids from the list of engine targets"""
        if not targets: # needed as _build_targets otherwise uses all engines
            return []
        target_ids = self._build_targets(targets)[0]
        return filter(lambda md_id: self.metadata[md_id]["engine_uuid"] in target_ids, self.metadata)

    def _build_msgids_from_jobs(self, jobs=None):
        """Build a list of msg_ids from "jobs" """
        if not jobs:
            return []
        msg_ids = []
        if isinstance(jobs, (basestring,AsyncResult)):
            jobs = [jobs]
        bad_ids = filter(lambda obj: not isinstance(obj, (basestring, AsyncResult)), jobs)
        if bad_ids:
            raise TypeError("Invalid msg_id type %r, expected str or AsyncResult"%bad_ids[0])
        for j in jobs:
            if isinstance(j, AsyncResult):
                msg_ids.extend(j.msg_ids)
            else:
                msg_ids.append(j)
        return msg_ids

    def purge_local_results(self, jobs=[], targets=[]):
        """Clears the client caches of results and frees the memory they occupy.

        Individual results can be purged by msg_id, or the entire
        history of specific targets can be purged.

        Use `purge_local_results('all')` to scrub everything from the Client's db.

        The client must have no outstanding tasks before purging the caches.
        Raises `AssertionError` if there are still outstanding tasks.

        After this call all `AsyncResults` are invalid and should be discarded.

        If you must "reget" the results, you can still do so by using
        `client.get_result(msg_id)` or `client.get_result(asyncresult)`. This will
        redownload the results from the hub if they are still available
        (i.e. `client.purge_hub_results(...)` has not been called).

        Parameters
        ----------

        jobs : str or list of str or AsyncResult objects
            the msg_ids whose results should be purged.
        targets : int/str/list of ints/strs
            The targets, by int_id, whose entire results are to be purged.

            default : None
        """
        assert not self.outstanding, "Can't purge a client with outstanding tasks!"

        if not targets and not jobs:
            raise ValueError("Must specify at least one of `targets` and `jobs`")

        if jobs == 'all':
            self.results.clear()
            self.metadata.clear()
            return
        else:
            msg_ids = []
            msg_ids.extend(self._build_msgids_from_target(targets))
            msg_ids.extend(self._build_msgids_from_jobs(jobs))
            map(self.results.pop, msg_ids)
            map(self.metadata.pop, msg_ids)


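    # Usage sketch for `purge_local_results` (hypothetical session; `ar` is an
    # AsyncResult from an earlier apply/execute and no tasks are outstanding):
    #
    #     rc.purge_local_results(ar)            # drop one result from the local cache
    #     rc.purge_local_results(targets=[0])   # drop everything that ran on engine 0
    #     rc.purge_local_results('all')         # clear the local caches entirely
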
    @spin_first
    def purge_hub_results(self, jobs=[], targets=[]):
        """Tell the Hub to forget results.

        Individual results can be purged by msg_id, or the entire
        history of specific targets can be purged.

        Use `purge_results('all')` to scrub everything from the Hub's db.

        Parameters
        ----------

        jobs : str or list of str or AsyncResult objects
            the msg_ids whose results should be forgotten.
        targets : int/str/list of ints/strs
            The targets, by int_id, whose entire history is to be purged.

            default : None
        """
        if not targets and not jobs:
            raise ValueError("Must specify at least one of `targets` and `jobs`")
        if targets:
            targets = self._build_targets(targets)[1]

        # construct msg_ids from jobs
        if jobs == 'all':
            msg_ids = jobs
        else:
            msg_ids = self._build_msgids_from_jobs(jobs)

        content = dict(engine_ids=targets, msg_ids=msg_ids)
        self.session.send(self._query_socket, "purge_request", content=content)
        idents, msg = self.session.recv(self._query_socket, 0)
        if self.debug:
            pprint(msg)
        content = msg['content']
        if content['status'] != 'ok':
            raise self._unwrap_exception(content)

    def purge_results(self, jobs=[], targets=[]):
        """Clears the cached results from both the hub and the local client

        Individual results can be purged by msg_id, or the entire
        history of specific targets can be purged.

        Use `purge_results('all')` to scrub every cached result from both the Hub's and
        the Client's db.

        Equivalent to calling both `purge_hub_results()` and `purge_local_results()` with
        the same arguments.

        Parameters
        ----------

        jobs : str or list of str or AsyncResult objects
            the msg_ids whose results should be forgotten.
        targets : int/str/list of ints/strs
            The targets, by int_id, whose entire history is to be purged.

            default : None
        """
        self.purge_local_results(jobs=jobs, targets=targets)
        self.purge_hub_results(jobs=jobs, targets=targets)

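    # Usage sketch for `purge_results` (hypothetical session): one call scrubs a
    # result from both caches, so neither the local client nor the Hub can
    # return it afterwards:
    #
    #     rc.purge_results(jobs=ar)   # forget `ar` locally and on the Hub
    #     rc.purge_results('all')     # scrub both databases completely
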
    def purge_everything(self):
        """Clears all content from previous Tasks from both the hub and the local client

        In addition to calling `purge_results("all")` it also deletes the history and
        other bookkeeping lists.
        """
        self.purge_results("all")
        self.history = []
        self.session.digest_history.clear()

    @spin_first
    def hub_history(self):
        """Get the Hub's history

        Just like the Client, the Hub has a history, which is a list of msg_ids.
        This will contain the history of all clients, and, depending on configuration,
        may contain history across multiple cluster sessions.

        Any msg_id returned here is a valid argument to `get_result`.

        Returns
        -------

        msg_ids : list of strs
            list of all msg_ids, ordered by task submission time.
        """

        self.session.send(self._query_socket, "history_request", content={})
        idents, msg = self.session.recv(self._query_socket, 0)

        if self.debug:
            pprint(msg)
        content = msg['content']
        if content['status'] != 'ok':
            raise self._unwrap_exception(content)
        else:
            return content['history']

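    # Usage sketch pairing `hub_history` with `get_result` (hypothetical
    # session; assumes the Hub still has records of earlier tasks):
    #
    #     msg_ids = rc.hub_history()        # every msg_id the Hub knows about
    #     ar = rc.get_result(msg_ids[-1])   # re-fetch the most recent result
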
    @spin_first
    def db_query(self, query, keys=None):
        """Query the Hub's TaskRecord database

        This will return a list of task record dicts that match `query`

        Parameters
        ----------

        query : mongodb query dict
            The search dict. See mongodb query docs for details.
        keys : list of strs [optional]
            The subset of keys to be returned. The default is to fetch everything but buffers.
            'msg_id' will *always* be included.
        """
        if isinstance(keys, basestring):
            keys = [keys]
        content = dict(query=query, keys=keys)
        self.session.send(self._query_socket, "db_request", content=content)
        idents, msg = self.session.recv(self._query_socket, 0)
        if self.debug:
            pprint(msg)
        content = msg['content']
        if content['status'] != 'ok':
            raise self._unwrap_exception(content)

        records = content['records']

        buffer_lens = content['buffer_lens']
        result_buffer_lens = content['result_buffer_lens']
        buffers = msg['buffers']
        has_bufs = buffer_lens is not None
        has_rbufs = result_buffer_lens is not None
        for i,rec in enumerate(records):
            # relink buffers
            if has_bufs:
                blen = buffer_lens[i]
                rec['buffers'], buffers = buffers[:blen],buffers[blen:]
            if has_rbufs:
                blen = result_buffer_lens[i]
                rec['result_buffers'], buffers = buffers[:blen],buffers[blen:]

        return records

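# Usage sketch for `db_query` (hypothetical session; the field names follow the
# Hub's TaskRecord schema, and queries use mongodb-style operators regardless
# of the configured db backend):
#
#     records = rc.db_query({'msg_id': {'$in': msg_ids}}, keys=['completed'])
#     done = [r['msg_id'] for r in records if r['completed'] is not None]
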
__all__ = [ 'Client' ]