even better IOError for no connection file
MinRK
@@ -1,1875 +1,1889 @@
1 1 """A semi-synchronous Client for IPython parallel"""
2 2
3 3 # Copyright (c) IPython Development Team.
4 4 # Distributed under the terms of the Modified BSD License.
5 5
6 6 from __future__ import print_function
7 7
8 8 import os
9 9 import json
10 10 import sys
11 11 from threading import Thread, Event
12 12 import time
13 13 import warnings
14 14 from datetime import datetime
15 15 from getpass import getpass
16 16 from pprint import pprint
17 17
18 18 pjoin = os.path.join
19 19
20 20 import zmq
21 21
22 22 from IPython.config.configurable import MultipleInstanceError
23 23 from IPython.core.application import BaseIPythonApplication
24 24 from IPython.core.profiledir import ProfileDir, ProfileDirError
25 25
26 26 from IPython.utils.capture import RichOutput
27 27 from IPython.utils.coloransi import TermColors
28 28 from IPython.utils.jsonutil import rekey, extract_dates, parse_date
29 29 from IPython.utils.localinterfaces import localhost, is_local_ip
30 30 from IPython.utils.path import get_ipython_dir, compress_user
31 31 from IPython.utils.py3compat import cast_bytes, string_types, xrange, iteritems
32 32 from IPython.utils.traitlets import (HasTraits, Integer, Instance, Unicode,
33 33 Dict, List, Bool, Set, Any)
34 34 from IPython.external.decorator import decorator
35 35
36 36 from IPython.parallel import Reference
37 37 from IPython.parallel import error
38 38 from IPython.parallel import util
39 39
40 40 from IPython.kernel.zmq.session import Session, Message
41 41 from IPython.kernel.zmq import serialize
42 42
43 43 from .asyncresult import AsyncResult, AsyncHubResult
44 44 from .view import DirectView, LoadBalancedView
45 45
46 46 #--------------------------------------------------------------------------
47 47 # Decorators for Client methods
48 48 #--------------------------------------------------------------------------
49 49
50
50 51 @decorator
51 52 def spin_first(f, self, *args, **kwargs):
52 53 """Call spin() to sync state prior to calling the method."""
53 54 self.spin()
54 55 return f(self, *args, **kwargs)
55 56
56 57
57 58 #--------------------------------------------------------------------------
58 59 # Classes
59 60 #--------------------------------------------------------------------------
60 61
62 _no_connection_file_msg = """
63 Failed to connect because no Controller could be found.
64 Please double-check your profile and ensure that a cluster is running.
65 """
61 66
62 67 class ExecuteReply(RichOutput):
63 68 """wrapper for finished Execute results"""
64 69 def __init__(self, msg_id, content, metadata):
65 70 self.msg_id = msg_id
66 71 self._content = content
67 72 self.execution_count = content['execution_count']
68 73 self.metadata = metadata
69 74
70 75 # RichOutput overrides
71 76
72 77 @property
73 78 def source(self):
74 79 execute_result = self.metadata['execute_result']
75 80 if execute_result:
76 81 return execute_result.get('source', '')
77 82
78 83 @property
79 84 def data(self):
80 85 execute_result = self.metadata['execute_result']
81 86 if execute_result:
82 87 return execute_result.get('data', {})
83 88
84 89 @property
85 90 def _metadata(self):
86 91 execute_result = self.metadata['execute_result']
87 92 if execute_result:
88 93 return execute_result.get('metadata', {})
89 94
90 95 def display(self):
91 96 from IPython.display import publish_display_data
92 97 publish_display_data(self.data, self.metadata)
93 98
94 99 def _repr_mime_(self, mime):
95 100 if mime not in self.data:
96 101 return
97 102 data = self.data[mime]
98 103 if mime in self._metadata:
99 104 return data, self._metadata[mime]
100 105 else:
101 106 return data
102 107
103 108 def __getitem__(self, key):
104 109 return self.metadata[key]
105 110
106 111 def __getattr__(self, key):
107 112 if key not in self.metadata:
108 113 raise AttributeError(key)
109 114 return self.metadata[key]
110 115
111 116 def __repr__(self):
112 117 execute_result = self.metadata['execute_result'] or {'data':{}}
113 118 text_out = execute_result['data'].get('text/plain', '')
114 119 if len(text_out) > 32:
115 120 text_out = text_out[:29] + '...'
116 121
117 122 return "<ExecuteReply[%i]: %s>" % (self.execution_count, text_out)
118 123
119 124 def _repr_pretty_(self, p, cycle):
120 125 execute_result = self.metadata['execute_result'] or {'data':{}}
121 126 text_out = execute_result['data'].get('text/plain', '')
122 127
123 128 if not text_out:
124 129 return
125 130
126 131 try:
127 132 ip = get_ipython()
128 133 except NameError:
129 134 colors = "NoColor"
130 135 else:
131 136 colors = ip.colors
132 137
133 138 if colors == "NoColor":
134 139 out = normal = ""
135 140 else:
136 141 out = TermColors.Red
137 142 normal = TermColors.Normal
138 143
139 144 if '\n' in text_out and not text_out.startswith('\n'):
140 145 # add newline for multiline reprs
141 146 text_out = '\n' + text_out
142 147
143 148 p.text(
144 149 out + u'Out[%i:%i]: ' % (
145 150 self.metadata['engine_id'], self.execution_count
146 151 ) + normal + text_out
147 152 )
148 153
149 154
150 155 class Metadata(dict):
151 156 """Subclass of dict for initializing metadata values.
152 157
153 158 Attribute access works on keys.
154 159
155 160 These objects have a strict set of keys - errors will raise if you try
156 161 to add new keys.
157 162 """
158 163 def __init__(self, *args, **kwargs):
159 164 dict.__init__(self)
160 165 md = {'msg_id' : None,
161 166 'submitted' : None,
162 167 'started' : None,
163 168 'completed' : None,
164 169 'received' : None,
165 170 'engine_uuid' : None,
166 171 'engine_id' : None,
167 172 'follow' : None,
168 173 'after' : None,
169 174 'status' : None,
170 175
171 176 'execute_input' : None,
172 177 'execute_result' : None,
173 178 'error' : None,
174 179 'stdout' : '',
175 180 'stderr' : '',
176 181 'outputs' : [],
177 182 'data': {},
178 183 'outputs_ready' : False,
179 184 }
180 185 self.update(md)
181 186 self.update(dict(*args, **kwargs))
182 187
183 188 def __getattr__(self, key):
184 189 """getattr aliased to getitem"""
185 190 if key in self:
186 191 return self[key]
187 192 else:
188 193 raise AttributeError(key)
189 194
190 195 def __setattr__(self, key, value):
191 196 """setattr aliased to setitem, with strict"""
192 197 if key in self:
193 198 self[key] = value
194 199 else:
195 200 raise AttributeError(key)
196 201
197 202 def __setitem__(self, key, value):
198 203 """strict static key enforcement"""
199 204 if key in self:
200 205 dict.__setitem__(self, key, value)
201 206 else:
202 207 raise KeyError(key)
203 208
204 209
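# Usage sketch: Metadata is a dict with a fixed key set; attribute
# access aliases item access, and unknown keys raise rather than
# silently appearing. A minimal illustration, assuming this module:
md = Metadata()
md.stdout += 'hello\n'      # same as md['stdout'] += 'hello\n'
md['engine_id'] = 0         # known key: allowed
try:
    md.not_a_key = 1        # unknown key (hypothetical name)
except AttributeError:
    pass                    # the strict key set rejects new attributes
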
205 210 class Client(HasTraits):
206 211 """A semi-synchronous client to the IPython ZMQ cluster
207 212
208 213 Parameters
209 214 ----------
210 215
211 216 url_file : str/unicode; path to ipcontroller-client.json
212 217 This JSON file should contain all the information needed to connect to a cluster,
213 218 and is likely the only argument needed.
214 219 Connection information for the Hub's registration. If a json connector
215 220 file is given, then likely no further configuration is necessary.
216 221 [Default: use profile]
217 222 profile : bytes
218 223 The name of the Cluster profile to be used to find connector information.
219 224 If run from an IPython application, the default profile will be the same
220 225 as the running application, otherwise it will be 'default'.
221 226 cluster_id : str
 222 227         String id to add to runtime files, to prevent name collisions when using
223 228 multiple clusters with a single profile simultaneously.
224 229 When set, will look for files named like: 'ipcontroller-<cluster_id>-client.json'
225 230 Since this is text inserted into filenames, typical recommendations apply:
226 231 Simple character strings are ideal, and spaces are not recommended (but
227 232 should generally work)
228 233 context : zmq.Context
229 234 Pass an existing zmq.Context instance, otherwise the client will create its own.
230 235 debug : bool
231 236 flag for lots of message printing for debug purposes
232 237 timeout : int/float
233 238 time (in seconds) to wait for connection replies from the Hub
234 239 [Default: 10]
235 240
236 241 #-------------- session related args ----------------
237 242
238 243 config : Config object
239 244 If specified, this will be relayed to the Session for configuration
240 245 username : str
241 246 set username for the session object
242 247
243 248 #-------------- ssh related args ----------------
244 249 # These are args for configuring the ssh tunnel to be used
245 250 # credentials are used to forward connections over ssh to the Controller
246 251 # Note that the ip given in `addr` needs to be relative to sshserver
247 252 # The most basic case is to leave addr as pointing to localhost (127.0.0.1),
248 253 # and set sshserver as the same machine the Controller is on. However,
249 254 # the only requirement is that sshserver is able to see the Controller
250 255 # (i.e. is within the same trusted network).
251 256
252 257 sshserver : str
253 258 A string of the form passed to ssh, i.e. 'server.tld' or 'user@server.tld:port'
254 259 If keyfile or password is specified, and this is not, it will default to
255 260 the ip given in addr.
256 261 sshkey : str; path to ssh private key file
257 262 This specifies a key to be used in ssh login, default None.
258 263 Regular default ssh keys will be used without specifying this argument.
259 264 password : str
260 265 Your ssh password to sshserver. Note that if this is left None,
261 266 you will be prompted for it if passwordless key based login is unavailable.
262 267 paramiko : bool
263 268 flag for whether to use paramiko instead of shell ssh for tunneling.
 264 269         [default: True on win32, False otherwise]
265 270
266 271
267 272 Attributes
268 273 ----------
269 274
270 275 ids : list of int engine IDs
271 276 requesting the ids attribute always synchronizes
272 277 the registration state. To request ids without synchronization,
 273 278         use the semi-private _ids attribute.
274 279
275 280 history : list of msg_ids
276 281 a list of msg_ids, keeping track of all the execution
277 282 messages you have submitted in order.
278 283
279 284 outstanding : set of msg_ids
280 285 a set of msg_ids that have been submitted, but whose
281 286 results have not yet been received.
282 287
283 288 results : dict
284 289 a dict of all our results, keyed by msg_id
285 290
286 291 block : bool
287 292 determines default behavior when block not specified
288 293 in execution methods
289 294
290 295 Methods
291 296 -------
292 297
293 298 spin
294 299 flushes incoming results and registration state changes
 295 300         control methods spin, and requesting `ids` also ensures the state is up to date
296 301
297 302 wait
298 303 wait on one or more msg_ids
299 304
300 305 execution methods
301 306 apply
302 307 legacy: execute, run
303 308
304 309 data movement
305 310 push, pull, scatter, gather
306 311
307 312 query methods
308 313 queue_status, get_result, purge, result_status
309 314
310 315 control methods
311 316 abort, shutdown
312 317
313 318 """
314 319
315 320
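# Connection sketch: the common ways to construct a Client, assuming a
# cluster is already running (e.g. via `ipcluster start`). The profile
# name and file path below are hypothetical placeholders.
from IPython.parallel import Client

rc = Client()                                       # default profile
rc = Client(profile='mycluster')                    # a named profile
rc = Client(url_file='./ipcontroller-client.json')  # explicit file
print(rc.ids)                                       # registered engine IDs
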
316 321 block = Bool(False)
317 322 outstanding = Set()
318 323 results = Instance('collections.defaultdict', (dict,))
319 324 metadata = Instance('collections.defaultdict', (Metadata,))
320 325 history = List()
321 326 debug = Bool(False)
322 327 _spin_thread = Any()
323 328 _stop_spinning = Any()
324 329
325 330 profile=Unicode()
326 331 def _profile_default(self):
327 332 if BaseIPythonApplication.initialized():
328 333 # an IPython app *might* be running, try to get its profile
329 334 try:
330 335 return BaseIPythonApplication.instance().profile
331 336 except (AttributeError, MultipleInstanceError):
332 337 # could be a *different* subclass of config.Application,
333 338 # which would raise one of these two errors.
334 339 return u'default'
335 340 else:
336 341 return u'default'
337 342
338 343
339 344 _outstanding_dict = Instance('collections.defaultdict', (set,))
340 345 _ids = List()
341 346 _connected=Bool(False)
342 347 _ssh=Bool(False)
343 348 _context = Instance('zmq.Context')
344 349 _config = Dict()
345 350 _engines=Instance(util.ReverseDict, (), {})
346 351 # _hub_socket=Instance('zmq.Socket')
347 352 _query_socket=Instance('zmq.Socket')
348 353 _control_socket=Instance('zmq.Socket')
349 354 _iopub_socket=Instance('zmq.Socket')
350 355 _notification_socket=Instance('zmq.Socket')
351 356 _mux_socket=Instance('zmq.Socket')
352 357 _task_socket=Instance('zmq.Socket')
353 358 _task_scheme=Unicode()
354 359 _closed = False
355 360 _ignored_control_replies=Integer(0)
356 361 _ignored_hub_replies=Integer(0)
357 362
358 363 def __new__(self, *args, **kw):
359 364 # don't raise on positional args
360 365 return HasTraits.__new__(self, **kw)
361 366
362 367 def __init__(self, url_file=None, profile=None, profile_dir=None, ipython_dir=None,
363 368 context=None, debug=False,
364 369 sshserver=None, sshkey=None, password=None, paramiko=None,
365 370 timeout=10, cluster_id=None, **extra_args
366 371 ):
367 372 if profile:
368 373 super(Client, self).__init__(debug=debug, profile=profile)
369 374 else:
370 375 super(Client, self).__init__(debug=debug)
371 376 if context is None:
372 377 context = zmq.Context.instance()
373 378 self._context = context
374 379 self._stop_spinning = Event()
375 380
376 381 if 'url_or_file' in extra_args:
377 382 url_file = extra_args['url_or_file']
378 383 warnings.warn("url_or_file arg no longer supported, use url_file", DeprecationWarning)
379 384
380 385 if url_file and util.is_url(url_file):
381 386 raise ValueError("single urls cannot be specified, url-files must be used.")
382 387
383 388 self._setup_profile_dir(self.profile, profile_dir, ipython_dir)
384 389
390 no_file_msg = '\n'.join([
391 "You have attempted to connect to an IPython Cluster but no Controller could be found.",
392 "Please double-check your configuration and ensure that a cluster is running.",
393 ])
394
385 395 if self._cd is not None:
386 396 if url_file is None:
387 397 if not cluster_id:
388 398 client_json = 'ipcontroller-client.json'
389 399 else:
390 400 client_json = 'ipcontroller-%s-client.json' % cluster_id
391 401 url_file = pjoin(self._cd.security_dir, client_json)
402 if not os.path.exists(url_file):
403 msg = '\n'.join([
404 "Connection file %r not found." % compress_user(url_file),
405 no_file_msg,
406 ])
407 raise IOError(msg)
392 408 if url_file is None:
393 raise ValueError(
394 "I can't find enough information to connect to a hub!"
395 " Please specify at least one of url_file or profile."
396 )
409 raise IOError(no_file_msg)
397 410
398 411 if not os.path.exists(url_file):
412 # Connection file explicitly specified, but not found
399 413 raise IOError("Connection file %r not found. Is a controller running?" % \
400 414 compress_user(url_file)
401 415 )
402 416
403 417 with open(url_file) as f:
404 418 cfg = json.load(f)
405 419
406 420 self._task_scheme = cfg['task_scheme']
407 421
408 422 # sync defaults from args, json:
409 423 if sshserver:
410 424 cfg['ssh'] = sshserver
411 425
412 426 location = cfg.setdefault('location', None)
413 427
414 428 proto,addr = cfg['interface'].split('://')
415 429 addr = util.disambiguate_ip_address(addr, location)
416 430 cfg['interface'] = "%s://%s" % (proto, addr)
417 431
418 432 # turn interface,port into full urls:
419 433 for key in ('control', 'task', 'mux', 'iopub', 'notification', 'registration'):
420 434 cfg[key] = cfg['interface'] + ':%i' % cfg[key]
421 435
422 436 url = cfg['registration']
423 437
424 438 if location is not None and addr == localhost():
425 439 # location specified, and connection is expected to be local
426 440 if not is_local_ip(location) and not sshserver:
427 441 # load ssh from JSON *only* if the controller is not on
428 442 # this machine
429 443 sshserver=cfg['ssh']
430 444 if not is_local_ip(location) and not sshserver:
431 445 # warn if no ssh specified, but SSH is probably needed
432 446 # This is only a warning, because the most likely cause
433 447 # is a local Controller on a laptop whose IP is dynamic
434 448 warnings.warn("""
435 449 Controller appears to be listening on localhost, but not on this machine.
436 450 If this is true, you should specify Client(...,sshserver='you@%s')
437 451 or instruct your controller to listen on an external IP."""%location,
438 452 RuntimeWarning)
439 453 elif not sshserver:
440 454 # otherwise sync with cfg
441 455 sshserver = cfg['ssh']
442 456
443 457 self._config = cfg
444 458
445 459 self._ssh = bool(sshserver or sshkey or password)
446 460 if self._ssh and sshserver is None:
447 461 # default to ssh via localhost
448 462 sshserver = addr
449 463 if self._ssh and password is None:
450 464 from zmq.ssh import tunnel
451 465 if tunnel.try_passwordless_ssh(sshserver, sshkey, paramiko):
452 466 password=False
453 467 else:
454 468 password = getpass("SSH Password for %s: "%sshserver)
455 469 ssh_kwargs = dict(keyfile=sshkey, password=password, paramiko=paramiko)
456 470
457 471 # configure and construct the session
458 472 try:
459 473 extra_args['packer'] = cfg['pack']
460 474 extra_args['unpacker'] = cfg['unpack']
461 475 extra_args['key'] = cast_bytes(cfg['key'])
462 476 extra_args['signature_scheme'] = cfg['signature_scheme']
463 477 except KeyError as exc:
464 478 msg = '\n'.join([
465 479 "Connection file is invalid (missing '{}'), possibly from an old version of IPython.",
466 480 "If you are reusing connection files, remove them and start ipcontroller again."
467 481 ])
468 482 raise ValueError(msg.format(exc.message))
469 483
470 484 self.session = Session(**extra_args)
471 485
472 486 self._query_socket = self._context.socket(zmq.DEALER)
473 487
474 488 if self._ssh:
475 489 from zmq.ssh import tunnel
476 490 tunnel.tunnel_connection(self._query_socket, cfg['registration'], sshserver, **ssh_kwargs)
477 491 else:
478 492 self._query_socket.connect(cfg['registration'])
479 493
480 494 self.session.debug = self.debug
481 495
482 496 self._notification_handlers = {'registration_notification' : self._register_engine,
483 497 'unregistration_notification' : self._unregister_engine,
484 498 'shutdown_notification' : lambda msg: self.close(),
485 499 }
486 500 self._queue_handlers = {'execute_reply' : self._handle_execute_reply,
487 501 'apply_reply' : self._handle_apply_reply}
488 502
489 503 try:
490 504 self._connect(sshserver, ssh_kwargs, timeout)
491 505 except:
492 506 self.close(linger=0)
493 507 raise
494 508
495 509 # last step: setup magics, if we are in IPython:
496 510
497 511 try:
498 512 ip = get_ipython()
499 513 except NameError:
500 514 return
501 515 else:
502 516 if 'px' not in ip.magics_manager.magics:
503 517 # in IPython but we are the first Client.
504 518 # activate a default view for parallel magics.
505 519 self.activate()
506 520
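# SSH sketch: when the controller runs on a remote machine, the client
# can tunnel its connections over ssh. The host and key path below are
# hypothetical; omit sshkey to use your default ssh keys.
from IPython.parallel import Client

rc = Client('./ipcontroller-client.json',
            sshserver='user@server.example.com',
            sshkey='~/.ssh/id_rsa')
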
507 521 def __del__(self):
508 522 """cleanup sockets, but _not_ context."""
509 523 self.close()
510 524
511 525 def _setup_profile_dir(self, profile, profile_dir, ipython_dir):
512 526 if ipython_dir is None:
513 527 ipython_dir = get_ipython_dir()
514 528 if profile_dir is not None:
515 529 try:
516 530 self._cd = ProfileDir.find_profile_dir(profile_dir)
517 531 return
518 532 except ProfileDirError:
519 533 pass
520 534 elif profile is not None:
521 535 try:
522 536 self._cd = ProfileDir.find_profile_dir_by_name(
523 537 ipython_dir, profile)
524 538 return
525 539 except ProfileDirError:
526 540 pass
527 541 self._cd = None
528 542
529 543 def _update_engines(self, engines):
530 544 """Update our engines dict and _ids from a dict of the form: {id:uuid}."""
531 545 for k,v in iteritems(engines):
532 546 eid = int(k)
533 547 if eid not in self._engines:
534 548 self._ids.append(eid)
535 549 self._engines[eid] = v
536 550 self._ids = sorted(self._ids)
537 551 if sorted(self._engines.keys()) != list(range(len(self._engines))) and \
538 552 self._task_scheme == 'pure' and self._task_socket:
539 553 self._stop_scheduling_tasks()
540 554
541 555 def _stop_scheduling_tasks(self):
542 556 """Stop scheduling tasks because an engine has been unregistered
543 557 from a pure ZMQ scheduler.
544 558 """
545 559 self._task_socket.close()
546 560 self._task_socket = None
547 561 msg = "An engine has been unregistered, and we are using pure " +\
548 562 "ZMQ task scheduling. Task farming will be disabled."
549 563 if self.outstanding:
550 564 msg += " If you were running tasks when this happened, " +\
551 565 "some `outstanding` msg_ids may never resolve."
552 566 warnings.warn(msg, RuntimeWarning)
553 567
554 568 def _build_targets(self, targets):
555 569 """Turn valid target IDs or 'all' into two lists:
556 570 (int_ids, uuids).
557 571 """
558 572 if not self._ids:
559 573 # flush notification socket if no engines yet, just in case
560 574 if not self.ids:
561 575 raise error.NoEnginesRegistered("Can't build targets without any engines")
562 576
563 577 if targets is None:
564 578 targets = self._ids
565 579 elif isinstance(targets, string_types):
566 580 if targets.lower() == 'all':
567 581 targets = self._ids
568 582 else:
569 583 raise TypeError("%r not valid str target, must be 'all'"%(targets))
570 584 elif isinstance(targets, int):
571 585 if targets < 0:
572 586 targets = self.ids[targets]
573 587 if targets not in self._ids:
574 588 raise IndexError("No such engine: %i"%targets)
575 589 targets = [targets]
576 590
577 591 if isinstance(targets, slice):
578 592 indices = list(range(len(self._ids))[targets])
579 593 ids = self.ids
580 594 targets = [ ids[i] for i in indices ]
581 595
582 596 if not isinstance(targets, (tuple, list, xrange)):
583 597 raise TypeError("targets by int/slice/collection of ints only, not %s"%(type(targets)))
584 598
585 599 return [cast_bytes(self._engines[t]) for t in targets], list(targets)
586 600
587 601 def _connect(self, sshserver, ssh_kwargs, timeout):
588 602 """setup all our socket connections to the cluster. This is called from
589 603 __init__."""
590 604
591 605 # Maybe allow reconnecting?
592 606 if self._connected:
593 607 return
594 608 self._connected=True
595 609
596 610 def connect_socket(s, url):
597 611 if self._ssh:
598 612 from zmq.ssh import tunnel
599 613 return tunnel.tunnel_connection(s, url, sshserver, **ssh_kwargs)
600 614 else:
601 615 return s.connect(url)
602 616
603 617 self.session.send(self._query_socket, 'connection_request')
604 618 # use Poller because zmq.select has wrong units in pyzmq 2.1.7
605 619 poller = zmq.Poller()
606 620 poller.register(self._query_socket, zmq.POLLIN)
607 621 # poll expects milliseconds, timeout is seconds
608 622 evts = poller.poll(timeout*1000)
609 623 if not evts:
610 624 raise error.TimeoutError("Hub connection request timed out")
611 625 idents,msg = self.session.recv(self._query_socket,mode=0)
612 626 if self.debug:
613 627 pprint(msg)
614 628 content = msg['content']
615 629 # self._config['registration'] = dict(content)
616 630 cfg = self._config
617 631 if content['status'] == 'ok':
618 632 self._mux_socket = self._context.socket(zmq.DEALER)
619 633 connect_socket(self._mux_socket, cfg['mux'])
620 634
621 635 self._task_socket = self._context.socket(zmq.DEALER)
622 636 connect_socket(self._task_socket, cfg['task'])
623 637
624 638 self._notification_socket = self._context.socket(zmq.SUB)
625 639 self._notification_socket.setsockopt(zmq.SUBSCRIBE, b'')
626 640 connect_socket(self._notification_socket, cfg['notification'])
627 641
628 642 self._control_socket = self._context.socket(zmq.DEALER)
629 643 connect_socket(self._control_socket, cfg['control'])
630 644
631 645 self._iopub_socket = self._context.socket(zmq.SUB)
632 646 self._iopub_socket.setsockopt(zmq.SUBSCRIBE, b'')
633 647 connect_socket(self._iopub_socket, cfg['iopub'])
634 648
635 649 self._update_engines(dict(content['engines']))
636 650 else:
637 651 self._connected = False
638 652 raise Exception("Failed to connect!")
639 653
640 654 #--------------------------------------------------------------------------
641 655 # handlers and callbacks for incoming messages
642 656 #--------------------------------------------------------------------------
643 657
644 658 def _unwrap_exception(self, content):
645 659 """unwrap exception, and remap engine_id to int."""
646 660 e = error.unwrap_exception(content)
647 661 # print e.traceback
648 662 if e.engine_info:
649 663 e_uuid = e.engine_info['engine_uuid']
650 664 eid = self._engines[e_uuid]
651 665 e.engine_info['engine_id'] = eid
652 666 return e
653 667
654 668 def _extract_metadata(self, msg):
655 669 header = msg['header']
656 670 parent = msg['parent_header']
657 671 msg_meta = msg['metadata']
658 672 content = msg['content']
659 673 md = {'msg_id' : parent['msg_id'],
660 674 'received' : datetime.now(),
661 675 'engine_uuid' : msg_meta.get('engine', None),
662 676 'follow' : msg_meta.get('follow', []),
663 677 'after' : msg_meta.get('after', []),
664 678 'status' : content['status'],
665 679 }
666 680
667 681 if md['engine_uuid'] is not None:
668 682 md['engine_id'] = self._engines.get(md['engine_uuid'], None)
669 683
670 684 if 'date' in parent:
671 685 md['submitted'] = parent['date']
672 686 if 'started' in msg_meta:
673 687 md['started'] = parse_date(msg_meta['started'])
674 688 if 'date' in header:
675 689 md['completed'] = header['date']
676 690 return md
677 691
678 692 def _register_engine(self, msg):
679 693 """Register a new engine, and update our connection info."""
680 694 content = msg['content']
681 695 eid = content['id']
682 696 d = {eid : content['uuid']}
683 697 self._update_engines(d)
684 698
685 699 def _unregister_engine(self, msg):
686 700 """Unregister an engine that has died."""
687 701 content = msg['content']
688 702 eid = int(content['id'])
689 703 if eid in self._ids:
690 704 self._ids.remove(eid)
691 705 uuid = self._engines.pop(eid)
692 706
693 707 self._handle_stranded_msgs(eid, uuid)
694 708
695 709 if self._task_socket and self._task_scheme == 'pure':
696 710 self._stop_scheduling_tasks()
697 711
698 712 def _handle_stranded_msgs(self, eid, uuid):
699 713 """Handle messages known to be on an engine when the engine unregisters.
700 714
701 715 It is possible that this will fire prematurely - that is, an engine will
702 716 go down after completing a result, and the client will be notified
703 717 of the unregistration and later receive the successful result.
704 718 """
705 719
706 720 outstanding = self._outstanding_dict[uuid]
707 721
708 722 for msg_id in list(outstanding):
709 723 if msg_id in self.results:
 710 724             # we already have this result
711 725 continue
712 726 try:
713 727 raise error.EngineError("Engine %r died while running task %r"%(eid, msg_id))
714 728 except:
715 729 content = error.wrap_exception()
716 730 # build a fake message:
717 731 msg = self.session.msg('apply_reply', content=content)
718 732 msg['parent_header']['msg_id'] = msg_id
719 733 msg['metadata']['engine'] = uuid
720 734 self._handle_apply_reply(msg)
721 735
722 736 def _handle_execute_reply(self, msg):
723 737 """Save the reply to an execute_request into our results.
724 738
725 739 execute messages are never actually used. apply is used instead.
726 740 """
727 741
728 742 parent = msg['parent_header']
729 743 msg_id = parent['msg_id']
730 744 if msg_id not in self.outstanding:
731 745 if msg_id in self.history:
732 746 print("got stale result: %s"%msg_id)
733 747 else:
734 748 print("got unknown result: %s"%msg_id)
735 749 else:
736 750 self.outstanding.remove(msg_id)
737 751
738 752 content = msg['content']
739 753 header = msg['header']
740 754
741 755 # construct metadata:
742 756 md = self.metadata[msg_id]
743 757 md.update(self._extract_metadata(msg))
744 758 # is this redundant?
745 759 self.metadata[msg_id] = md
746 760
747 761 e_outstanding = self._outstanding_dict[md['engine_uuid']]
748 762 if msg_id in e_outstanding:
749 763 e_outstanding.remove(msg_id)
750 764
751 765 # construct result:
752 766 if content['status'] == 'ok':
753 767 self.results[msg_id] = ExecuteReply(msg_id, content, md)
754 768 elif content['status'] == 'aborted':
755 769 self.results[msg_id] = error.TaskAborted(msg_id)
756 770 elif content['status'] == 'resubmitted':
757 771 # TODO: handle resubmission
758 772 pass
759 773 else:
760 774 self.results[msg_id] = self._unwrap_exception(content)
761 775
762 776 def _handle_apply_reply(self, msg):
763 777 """Save the reply to an apply_request into our results."""
764 778 parent = msg['parent_header']
765 779 msg_id = parent['msg_id']
766 780 if msg_id not in self.outstanding:
767 781 if msg_id in self.history:
768 782 print("got stale result: %s"%msg_id)
769 783 print(self.results[msg_id])
770 784 print(msg)
771 785 else:
772 786 print("got unknown result: %s"%msg_id)
773 787 else:
774 788 self.outstanding.remove(msg_id)
775 789 content = msg['content']
776 790 header = msg['header']
777 791
778 792 # construct metadata:
779 793 md = self.metadata[msg_id]
780 794 md.update(self._extract_metadata(msg))
781 795 # is this redundant?
782 796 self.metadata[msg_id] = md
783 797
784 798 e_outstanding = self._outstanding_dict[md['engine_uuid']]
785 799 if msg_id in e_outstanding:
786 800 e_outstanding.remove(msg_id)
787 801
788 802 # construct result:
789 803 if content['status'] == 'ok':
790 804 self.results[msg_id] = serialize.unserialize_object(msg['buffers'])[0]
791 805 elif content['status'] == 'aborted':
792 806 self.results[msg_id] = error.TaskAborted(msg_id)
793 807 elif content['status'] == 'resubmitted':
794 808 # TODO: handle resubmission
795 809 pass
796 810 else:
797 811 self.results[msg_id] = self._unwrap_exception(content)
798 812
799 813 def _flush_notifications(self):
800 814 """Flush notifications of engine registrations waiting
801 815 in ZMQ queue."""
802 816 idents,msg = self.session.recv(self._notification_socket, mode=zmq.NOBLOCK)
803 817 while msg is not None:
804 818 if self.debug:
805 819 pprint(msg)
806 820 msg_type = msg['header']['msg_type']
807 821 handler = self._notification_handlers.get(msg_type, None)
808 822 if handler is None:
809 823 raise Exception("Unhandled message type: %s" % msg_type)
810 824 else:
811 825 handler(msg)
812 826 idents,msg = self.session.recv(self._notification_socket, mode=zmq.NOBLOCK)
813 827
814 828 def _flush_results(self, sock):
815 829 """Flush task or queue results waiting in ZMQ queue."""
816 830 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
817 831 while msg is not None:
818 832 if self.debug:
819 833 pprint(msg)
820 834 msg_type = msg['header']['msg_type']
821 835 handler = self._queue_handlers.get(msg_type, None)
822 836 if handler is None:
823 837 raise Exception("Unhandled message type: %s" % msg_type)
824 838 else:
825 839 handler(msg)
826 840 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
827 841
828 842 def _flush_control(self, sock):
829 843 """Flush replies from the control channel waiting
830 844 in the ZMQ queue.
831 845
832 846 Currently: ignore them."""
833 847 if self._ignored_control_replies <= 0:
834 848 return
835 849 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
836 850 while msg is not None:
837 851 self._ignored_control_replies -= 1
838 852 if self.debug:
839 853 pprint(msg)
840 854 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
841 855
842 856 def _flush_ignored_control(self):
843 857 """flush ignored control replies"""
844 858 while self._ignored_control_replies > 0:
845 859 self.session.recv(self._control_socket)
846 860 self._ignored_control_replies -= 1
847 861
848 862 def _flush_ignored_hub_replies(self):
849 863 ident,msg = self.session.recv(self._query_socket, mode=zmq.NOBLOCK)
850 864 while msg is not None:
851 865 ident,msg = self.session.recv(self._query_socket, mode=zmq.NOBLOCK)
852 866
853 867 def _flush_iopub(self, sock):
854 868 """Flush replies from the iopub channel waiting
855 869 in the ZMQ queue.
856 870 """
857 871 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
858 872 while msg is not None:
859 873 if self.debug:
860 874 pprint(msg)
861 875 parent = msg['parent_header']
862 876 # ignore IOPub messages with no parent.
863 877 # Caused by print statements or warnings from before the first execution.
864 878 if not parent:
865 879 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
866 880 continue
867 881 msg_id = parent['msg_id']
868 882 content = msg['content']
869 883 header = msg['header']
870 884 msg_type = msg['header']['msg_type']
871 885
872 886 # init metadata:
873 887 md = self.metadata[msg_id]
874 888
875 889 if msg_type == 'stream':
876 890 name = content['name']
877 891 s = md[name] or ''
878 892 md[name] = s + content['data']
879 893 elif msg_type == 'error':
880 894 md.update({'error' : self._unwrap_exception(content)})
881 895 elif msg_type == 'execute_input':
882 896 md.update({'execute_input' : content['code']})
883 897 elif msg_type == 'display_data':
884 898 md['outputs'].append(content)
885 899 elif msg_type == 'execute_result':
886 900 md['execute_result'] = content
887 901 elif msg_type == 'data_message':
888 902 data, remainder = serialize.unserialize_object(msg['buffers'])
889 903 md['data'].update(data)
890 904 elif msg_type == 'status':
891 905 # idle message comes after all outputs
892 906 if content['execution_state'] == 'idle':
893 907 md['outputs_ready'] = True
894 908 else:
 895 909                 # unhandled msg_type
896 910 pass
897 911
 898 912         # redundant?
899 913 self.metadata[msg_id] = md
900 914
901 915 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
902 916
903 917 #--------------------------------------------------------------------------
904 918 # len, getitem
905 919 #--------------------------------------------------------------------------
906 920
907 921 def __len__(self):
908 922 """len(client) returns # of engines."""
909 923 return len(self.ids)
910 924
911 925 def __getitem__(self, key):
912 926 """index access returns DirectView multiplexer objects
913 927
914 928 Must be int, slice, or list/tuple/xrange of ints"""
915 929 if not isinstance(key, (int, slice, tuple, list, xrange)):
916 930 raise TypeError("key by int/slice/iterable of ints only, not %s"%(type(key)))
917 931 else:
918 932 return self.direct_view(key)
919 933
920 934 def __iter__(self):
921 935 """Since we define getitem, Client is iterable
922 936
 923 937         but without also defining __iter__, it won't work correctly unless engine IDs
 924 938         start at zero and are contiguous.
925 939 """
926 940 for eid in self.ids:
927 941 yield self.direct_view(eid)
928 942
929 943 #--------------------------------------------------------------------------
930 944 # Begin public methods
931 945 #--------------------------------------------------------------------------
932 946
933 947 @property
934 948 def ids(self):
935 949 """Always up-to-date ids property."""
936 950 self._flush_notifications()
937 951 # always copy:
938 952 return list(self._ids)
939 953
940 954 def activate(self, targets='all', suffix=''):
941 955 """Create a DirectView and register it with IPython magics
942 956
943 957 Defines the magics `%px, %autopx, %pxresult, %%px`
944 958
945 959 Parameters
946 960 ----------
947 961
948 962 targets: int, list of ints, or 'all'
949 963 The engines on which the view's magics will run
950 964 suffix: str [default: '']
951 965 The suffix, if any, for the magics. This allows you to have
952 966 multiple views associated with parallel magics at the same time.
953 967
954 968 e.g. ``rc.activate(targets=0, suffix='0')`` will give you
955 969 the magics ``%px0``, ``%pxresult0``, etc. for running magics just
956 970 on engine 0.
957 971 """
958 972 view = self.direct_view(targets)
959 973 view.block = True
960 974 view.activate(suffix)
961 975 return view
962 976
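# Magics sketch: inside an IPython session, activate() wires a view up
# to the %px family of magics; `rc` is assumed to be a connected Client.
view = rc.activate()                     # %px now targets all engines
v0 = rc.activate(targets=0, suffix='0')  # %px0 runs only on engine 0
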
963 977 def close(self, linger=None):
964 978 """Close my zmq Sockets
965 979
966 980 If `linger`, set the zmq LINGER socket option,
967 981 which allows discarding of messages.
968 982 """
969 983 if self._closed:
970 984 return
971 985 self.stop_spin_thread()
972 986 snames = [ trait for trait in self.trait_names() if trait.endswith("socket") ]
973 987 for name in snames:
974 988 socket = getattr(self, name)
975 989 if socket is not None and not socket.closed:
976 990 if linger is not None:
977 991 socket.close(linger=linger)
978 992 else:
979 993 socket.close()
980 994 self._closed = True
981 995
982 996 def _spin_every(self, interval=1):
983 997 """target func for use in spin_thread"""
984 998 while True:
985 999 if self._stop_spinning.is_set():
986 1000 return
987 1001 time.sleep(interval)
988 1002 self.spin()
989 1003
990 1004 def spin_thread(self, interval=1):
991 1005 """call Client.spin() in a background thread on some regular interval
992 1006
993 1007 This helps ensure that messages don't pile up too much in the zmq queue
994 1008 while you are working on other things, or just leaving an idle terminal.
995 1009
996 1010 It also helps limit potential padding of the `received` timestamp
997 1011 on AsyncResult objects, used for timings.
998 1012
999 1013 Parameters
1000 1014 ----------
1001 1015
1002 1016 interval : float, optional
1003 1017 The interval on which to spin the client in the background thread
1004 1018 (simply passed to time.sleep).
1005 1019
1006 1020 Notes
1007 1021 -----
1008 1022
1009 1023 For precision timing, you may want to use this method to put a bound
1010 1024 on the jitter (in seconds) in `received` timestamps used
1011 1025 in AsyncResult.wall_time.
1012 1026
1013 1027 """
1014 1028 if self._spin_thread is not None:
1015 1029 self.stop_spin_thread()
1016 1030 self._stop_spinning.clear()
1017 1031 self._spin_thread = Thread(target=self._spin_every, args=(interval,))
1018 1032 self._spin_thread.daemon = True
1019 1033 self._spin_thread.start()
1020 1034
1021 1035 def stop_spin_thread(self):
1022 1036 """stop background spin_thread, if any"""
1023 1037 if self._spin_thread is not None:
1024 1038 self._stop_spinning.set()
1025 1039 self._spin_thread.join()
1026 1040 self._spin_thread = None
1027 1041
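# Spin-thread sketch: flush incoming messages in the background so
# `received` timestamps stay accurate while the main thread is busy;
# `rc` is assumed to be a connected Client.
rc.spin_thread(interval=0.1)    # spin every 100 ms
try:
    pass                        # hypothetical long-running local work
finally:
    rc.stop_spin_thread()
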
1028 1042 def spin(self):
1029 1043 """Flush any registration notifications and execution results
1030 1044 waiting in the ZMQ queue.
1031 1045 """
1032 1046 if self._notification_socket:
1033 1047 self._flush_notifications()
1034 1048 if self._iopub_socket:
1035 1049 self._flush_iopub(self._iopub_socket)
1036 1050 if self._mux_socket:
1037 1051 self._flush_results(self._mux_socket)
1038 1052 if self._task_socket:
1039 1053 self._flush_results(self._task_socket)
1040 1054 if self._control_socket:
1041 1055 self._flush_control(self._control_socket)
1042 1056 if self._query_socket:
1043 1057 self._flush_ignored_hub_replies()
1044 1058
1045 1059 def wait(self, jobs=None, timeout=-1):
1046 1060 """waits on one or more `jobs`, for up to `timeout` seconds.
1047 1061
1048 1062 Parameters
1049 1063 ----------
1050 1064
1051 1065 jobs : int, str, or list of ints and/or strs, or one or more AsyncResult objects
1052 1066 ints are indices to self.history
1053 1067 strs are msg_ids
1054 1068 default: wait on all outstanding messages
1055 1069 timeout : float
1056 1070 a time in seconds, after which to give up.
1057 1071 default is -1, which means no timeout
1058 1072
1059 1073 Returns
1060 1074 -------
1061 1075
1062 1076 True : when all msg_ids are done
1063 1077 False : timeout reached, some msg_ids still outstanding
1064 1078 """
1065 1079 tic = time.time()
1066 1080 if jobs is None:
1067 1081 theids = self.outstanding
1068 1082 else:
1069 1083 if isinstance(jobs, string_types + (int, AsyncResult)):
1070 1084 jobs = [jobs]
1071 1085 theids = set()
1072 1086 for job in jobs:
1073 1087 if isinstance(job, int):
1074 1088 # index access
1075 1089 job = self.history[job]
1076 1090 elif isinstance(job, AsyncResult):
1077 1091 theids.update(job.msg_ids)
1078 1092 continue
1079 1093 theids.add(job)
1080 1094 if not theids.intersection(self.outstanding):
1081 1095 return True
1082 1096 self.spin()
1083 1097 while theids.intersection(self.outstanding):
1084 1098 if timeout >= 0 and ( time.time()-tic ) > timeout:
1085 1099 break
1086 1100 time.sleep(1e-3)
1087 1101 self.spin()
1088 1102 return len(theids.intersection(self.outstanding)) == 0
1089 1103
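# wait() sketch: block until specific jobs finish; `rc` is assumed to
# be a connected Client with at least one engine.
ar = rc[:].apply_async(lambda x: x * 2, 21)
if rc.wait(ar, timeout=5):      # False if still outstanding after 5s
    print(ar.get())             # one result per engine, e.g. [42, ...]
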
1090 1104 #--------------------------------------------------------------------------
1091 1105 # Control methods
1092 1106 #--------------------------------------------------------------------------
1093 1107
1094 1108 @spin_first
1095 1109 def clear(self, targets=None, block=None):
1096 1110 """Clear the namespace in target(s)."""
1097 1111 block = self.block if block is None else block
1098 1112 targets = self._build_targets(targets)[0]
1099 1113 for t in targets:
1100 1114 self.session.send(self._control_socket, 'clear_request', content={}, ident=t)
1101 1115 error = False
1102 1116 if block:
1103 1117 self._flush_ignored_control()
1104 1118 for i in range(len(targets)):
1105 1119 idents,msg = self.session.recv(self._control_socket,0)
1106 1120 if self.debug:
1107 1121 pprint(msg)
1108 1122 if msg['content']['status'] != 'ok':
1109 1123 error = self._unwrap_exception(msg['content'])
1110 1124 else:
1111 1125 self._ignored_control_replies += len(targets)
1112 1126 if error:
1113 1127 raise error
1114 1128
1115 1129
1116 1130 @spin_first
1117 1131 def abort(self, jobs=None, targets=None, block=None):
1118 1132 """Abort specific jobs from the execution queues of target(s).
1119 1133
1120 1134 This is a mechanism to prevent jobs that have already been submitted
1121 1135 from executing.
1122 1136
1123 1137 Parameters
1124 1138 ----------
1125 1139
1126 1140 jobs : msg_id, list of msg_ids, or AsyncResult
1127 1141 The jobs to be aborted
1128 1142
1129 1143 If unspecified/None: abort all outstanding jobs.
1130 1144
1131 1145 """
1132 1146 block = self.block if block is None else block
1133 1147 jobs = jobs if jobs is not None else list(self.outstanding)
1134 1148 targets = self._build_targets(targets)[0]
1135 1149
1136 1150 msg_ids = []
1137 1151 if isinstance(jobs, string_types + (AsyncResult,)):
1138 1152 jobs = [jobs]
1139 1153 bad_ids = [obj for obj in jobs if not isinstance(obj, string_types + (AsyncResult,))]
1140 1154 if bad_ids:
1141 1155 raise TypeError("Invalid msg_id type %r, expected str or AsyncResult"%bad_ids[0])
1142 1156 for j in jobs:
1143 1157 if isinstance(j, AsyncResult):
1144 1158 msg_ids.extend(j.msg_ids)
1145 1159 else:
1146 1160 msg_ids.append(j)
1147 1161 content = dict(msg_ids=msg_ids)
1148 1162 for t in targets:
1149 1163 self.session.send(self._control_socket, 'abort_request',
1150 1164 content=content, ident=t)
1151 1165 error = False
1152 1166 if block:
1153 1167 self._flush_ignored_control()
1154 1168 for i in range(len(targets)):
1155 1169 idents,msg = self.session.recv(self._control_socket,0)
1156 1170 if self.debug:
1157 1171 pprint(msg)
1158 1172 if msg['content']['status'] != 'ok':
1159 1173 error = self._unwrap_exception(msg['content'])
1160 1174 else:
1161 1175 self._ignored_control_replies += len(targets)
1162 1176 if error:
1163 1177 raise error
1164 1178
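# abort() sketch: stop queued-but-unstarted jobs from running; `rc` is
# assumed connected, with a load-balanced view for task submission.
import time
lview = rc.load_balanced_view()
ar = lview.apply_async(time.sleep, 60)
rc.abort(ar)                    # abort just this job
rc.abort()                      # or everything still outstanding
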
1165 1179 @spin_first
1166 1180 def shutdown(self, targets='all', restart=False, hub=False, block=None):
1167 1181 """Terminates one or more engine processes, optionally including the hub.
1168 1182
1169 1183 Parameters
1170 1184 ----------
1171 1185
1172 1186 targets: list of ints or 'all' [default: all]
1173 1187 Which engines to shutdown.
1174 1188 hub: bool [default: False]
1175 1189 Whether to include the Hub. hub=True implies targets='all'.
1176 1190 block: bool [default: self.block]
1177 1191 Whether to wait for clean shutdown replies or not.
1178 1192 restart: bool [default: False]
1179 1193 NOT IMPLEMENTED
1180 1194 whether to restart engines after shutting them down.
1181 1195 """
1182 1196 from IPython.parallel.error import NoEnginesRegistered
1183 1197 if restart:
1184 1198 raise NotImplementedError("Engine restart is not yet implemented")
1185 1199
1186 1200 block = self.block if block is None else block
1187 1201 if hub:
1188 1202 targets = 'all'
1189 1203 try:
1190 1204 targets = self._build_targets(targets)[0]
1191 1205 except NoEnginesRegistered:
1192 1206 targets = []
1193 1207 for t in targets:
1194 1208 self.session.send(self._control_socket, 'shutdown_request',
1195 1209 content={'restart':restart},ident=t)
1196 1210 error = False
1197 1211 if block or hub:
1198 1212 self._flush_ignored_control()
1199 1213 for i in range(len(targets)):
1200 1214 idents,msg = self.session.recv(self._control_socket, 0)
1201 1215 if self.debug:
1202 1216 pprint(msg)
1203 1217 if msg['content']['status'] != 'ok':
1204 1218 error = self._unwrap_exception(msg['content'])
1205 1219 else:
1206 1220 self._ignored_control_replies += len(targets)
1207 1221
1208 1222 if hub:
1209 1223 time.sleep(0.25)
1210 1224 self.session.send(self._query_socket, 'shutdown_request')
1211 1225 idents,msg = self.session.recv(self._query_socket, 0)
1212 1226 if self.debug:
1213 1227 pprint(msg)
1214 1228 if msg['content']['status'] != 'ok':
1215 1229 error = self._unwrap_exception(msg['content'])
1216 1230
1217 1231 if error:
1218 1232 raise error
1219 1233
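# shutdown() sketch: stop engines, and optionally the hub as well;
# `rc` is assumed to be a connected Client.
rc.shutdown(targets=[0, 1])     # stop a subset of engines
rc.shutdown()                   # stop all engines
rc.shutdown(hub=True)           # stop everything, hub included
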
1220 1234 #--------------------------------------------------------------------------
1221 1235 # Execution related methods
1222 1236 #--------------------------------------------------------------------------
1223 1237
1224 1238 def _maybe_raise(self, result):
1225 1239 """wrapper for maybe raising an exception if apply failed."""
1226 1240 if isinstance(result, error.RemoteError):
1227 1241 raise result
1228 1242
1229 1243 return result
1230 1244
1231 1245 def send_apply_request(self, socket, f, args=None, kwargs=None, metadata=None, track=False,
1232 1246 ident=None):
1233 1247 """construct and send an apply message via a socket.
1234 1248
1235 1249 This is the principal method with which all engine execution is performed by views.
1236 1250 """
1237 1251
1238 1252 if self._closed:
1239 1253 raise RuntimeError("Client cannot be used after its sockets have been closed")
1240 1254
1241 1255 # defaults:
1242 1256 args = args if args is not None else []
1243 1257 kwargs = kwargs if kwargs is not None else {}
1244 1258 metadata = metadata if metadata is not None else {}
1245 1259
1246 1260 # validate arguments
1247 1261 if not callable(f) and not isinstance(f, Reference):
1248 1262 raise TypeError("f must be callable, not %s"%type(f))
1249 1263 if not isinstance(args, (tuple, list)):
1250 1264 raise TypeError("args must be tuple or list, not %s"%type(args))
1251 1265 if not isinstance(kwargs, dict):
1252 1266 raise TypeError("kwargs must be dict, not %s"%type(kwargs))
1253 1267 if not isinstance(metadata, dict):
1254 1268 raise TypeError("metadata must be dict, not %s"%type(metadata))
1255 1269
1256 1270 bufs = serialize.pack_apply_message(f, args, kwargs,
1257 1271 buffer_threshold=self.session.buffer_threshold,
1258 1272 item_threshold=self.session.item_threshold,
1259 1273 )
1260 1274
1261 1275 msg = self.session.send(socket, "apply_request", buffers=bufs, ident=ident,
1262 1276 metadata=metadata, track=track)
1263 1277
1264 1278 msg_id = msg['header']['msg_id']
1265 1279 self.outstanding.add(msg_id)
1266 1280 if ident:
1267 1281 # possibly routed to a specific engine
1268 1282 if isinstance(ident, list):
1269 1283 ident = ident[-1]
1270 1284 if ident in self._engines.values():
1271 1285 # save for later, in case of engine death
1272 1286 self._outstanding_dict[ident].add(msg_id)
1273 1287 self.history.append(msg_id)
1274 1288 self.metadata[msg_id]['submitted'] = datetime.now()
1275 1289
1276 1290 return msg
1277 1291
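# apply sketch: user code normally reaches send_apply_request through a
# view rather than calling it directly; `rc` is assumed connected.
dview = rc[:]
ar = dview.apply_async(lambda a, b: a + b, 2, 3)
print(ar.get())                 # [5, ...] one result per engine
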
1278 1292 def send_execute_request(self, socket, code, silent=True, metadata=None, ident=None):
1279 1293 """construct and send an execute request via a socket.
1280 1294
1281 1295 """
1282 1296
1283 1297 if self._closed:
1284 1298 raise RuntimeError("Client cannot be used after its sockets have been closed")
1285 1299
1286 1300 # defaults:
1287 1301 metadata = metadata if metadata is not None else {}
1288 1302
1289 1303 # validate arguments
1290 1304 if not isinstance(code, string_types):
1291 1305 raise TypeError("code must be text, not %s" % type(code))
1292 1306 if not isinstance(metadata, dict):
1293 1307 raise TypeError("metadata must be dict, not %s" % type(metadata))
1294 1308
1295 1309 content = dict(code=code, silent=bool(silent), user_expressions={})
1296 1310
1297 1311
1298 1312 msg = self.session.send(socket, "execute_request", content=content, ident=ident,
1299 1313 metadata=metadata)
1300 1314
1301 1315 msg_id = msg['header']['msg_id']
1302 1316 self.outstanding.add(msg_id)
1303 1317 if ident:
1304 1318 # possibly routed to a specific engine
1305 1319 if isinstance(ident, list):
1306 1320 ident = ident[-1]
1307 1321 if ident in self._engines.values():
1308 1322 # save for later, in case of engine death
1309 1323 self._outstanding_dict[ident].add(msg_id)
1310 1324 self.history.append(msg_id)
1311 1325 self.metadata[msg_id]['submitted'] = datetime.now()
1312 1326
1313 1327 return msg
1314 1328
1315 1329 #--------------------------------------------------------------------------
1316 1330 # construct a View object
1317 1331 #--------------------------------------------------------------------------
1318 1332
1319 1333 def load_balanced_view(self, targets=None):
 1320 1334         """construct a LoadBalancedView object.
1321 1335
1322 1336 If no arguments are specified, create a LoadBalancedView
1323 1337 using all engines.
1324 1338
1325 1339 Parameters
1326 1340 ----------
1327 1341
1328 1342 targets: list,slice,int,etc. [default: use all engines]
1329 1343 The subset of engines across which to load-balance
1330 1344 """
1331 1345 if targets == 'all':
1332 1346 targets = None
1333 1347 if targets is not None:
1334 1348 targets = self._build_targets(targets)[1]
1335 1349 return LoadBalancedView(client=self, socket=self._task_socket, targets=targets)
1336 1350
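# Load-balancing sketch: tasks go to whichever engine is free; `rc` is
# assumed to be a connected Client.
lview = rc.load_balanced_view()                # all engines
sub = rc.load_balanced_view(targets=[0, 2])    # or a subset
ar = lview.map_async(lambda x: x ** 2, range(32))
print(ar.get())
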
1337 1351 def direct_view(self, targets='all'):
1338 1352 """construct a DirectView object.
1339 1353
1340 1354 If no targets are specified, create a DirectView using all engines.
1341 1355
1342 1356 rc.direct_view('all') is distinguished from rc[:] in that 'all' will
1343 1357 evaluate the target engines at each execution, whereas rc[:] will connect to
1344 1358 all *current* engines, and that list will not change.
1345 1359
1346 1360 That is, 'all' will always use all engines, whereas rc[:] will not use
1347 1361 engines added after the DirectView is constructed.
1348 1362
1349 1363 Parameters
1350 1364 ----------
1351 1365
1352 1366 targets: list,slice,int,etc. [default: use all engines]
1353 1367 The engines to use for the View
1354 1368 """
1355 1369 single = isinstance(targets, int)
1356 1370 # allow 'all' to be lazily evaluated at each execution
1357 1371 if targets != 'all':
1358 1372 targets = self._build_targets(targets)[1]
1359 1373 if single:
1360 1374 targets = targets[0]
1361 1375 return DirectView(client=self, socket=self._mux_socket, targets=targets)
1362 1376
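# DirectView sketch: 'all' re-evaluates the engine list at each
# execution, while rc[:] is pinned to the engines present now; `rc` is
# assumed to be a connected Client.
live = rc.direct_view('all')
pinned = rc[:]
live.block = True
live.push(dict(a=10))
print(live.pull('a'))           # [10, ...] one value per engine
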
1363 1377 #--------------------------------------------------------------------------
1364 1378 # Query methods
1365 1379 #--------------------------------------------------------------------------
1366 1380
1367 1381 @spin_first
1368 1382 def get_result(self, indices_or_msg_ids=None, block=None, owner=True):
1369 1383 """Retrieve a result by msg_id or history index, wrapped in an AsyncResult object.
1370 1384
1371 1385 If the client already has the results, no request to the Hub will be made.
1372 1386
1373 1387 This is a convenient way to construct AsyncResult objects, which are wrappers
1374 1388 that include metadata about execution, and allow for awaiting results that
1375 1389 were not submitted by this Client.
1376 1390
1377 1391 It can also be a convenient way to retrieve the metadata associated with
1378 1392 blocking execution, since it always retrieves
1379 1393
1380 1394 Examples
1381 1395 --------
1382 1396 ::
1383 1397
1384 1398 In [10]: r = client.apply()
1385 1399
1386 1400 Parameters
1387 1401 ----------
1388 1402
1389 1403 indices_or_msg_ids : integer history index, str msg_id, or list of either
1390 1404 The indices or msg_ids of indices to be retrieved
1391 1405
1392 1406 block : bool
1393 1407 Whether to wait for the result to be done
1394 1408 owner : bool [default: True]
1395 1409 Whether this AsyncResult should own the result.
1396 1410 If so, calling `ar.get()` will remove data from the
1397 1411 client's result and metadata cache.
1398 1412 There should only be one owner of any given msg_id.
1399 1413
1400 1414 Returns
1401 1415 -------
1402 1416
1403 1417 AsyncResult
1404 1418 A single AsyncResult object will always be returned.
1405 1419
1406 1420 AsyncHubResult
1407 1421 A subclass of AsyncResult that retrieves results from the Hub
1408 1422
1409 1423 """
1410 1424 block = self.block if block is None else block
1411 1425 if indices_or_msg_ids is None:
1412 1426 indices_or_msg_ids = -1
1413 1427
1414 1428 single_result = False
1415 1429 if not isinstance(indices_or_msg_ids, (list,tuple)):
1416 1430 indices_or_msg_ids = [indices_or_msg_ids]
1417 1431 single_result = True
1418 1432
1419 1433 theids = []
1420 1434 for id in indices_or_msg_ids:
1421 1435 if isinstance(id, int):
1422 1436 id = self.history[id]
1423 1437 if not isinstance(id, string_types):
1424 1438 raise TypeError("indices must be str or int, not %r"%id)
1425 1439 theids.append(id)
1426 1440
1427 1441 local_ids = [msg_id for msg_id in theids if (msg_id in self.outstanding or msg_id in self.results)]
1428 1442 remote_ids = [msg_id for msg_id in theids if msg_id not in local_ids]
1429 1443
 1430 1444         # given a single msg_id initially, get_result should get the result itself,
1431 1445 # not a length-one list
1432 1446 if single_result:
1433 1447 theids = theids[0]
1434 1448
1435 1449 if remote_ids:
1436 1450 ar = AsyncHubResult(self, msg_ids=theids, owner=owner)
1437 1451 else:
1438 1452 ar = AsyncResult(self, msg_ids=theids, owner=owner)
1439 1453
1440 1454 if block:
1441 1455 ar.wait()
1442 1456
1443 1457 return ar
1444 1458
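# get_result() sketch: rebuild an AsyncResult for earlier submissions;
# `rc` is assumed connected and to have some execution history.
ar = rc.get_result(-1)          # most recent submission
ar.wait()
print(ar.get(), ar.wall_time)   # results plus timing metadata
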
1445 1459 @spin_first
1446 1460 def resubmit(self, indices_or_msg_ids=None, metadata=None, block=None):
1447 1461 """Resubmit one or more tasks.
1448 1462
1449 1463 in-flight tasks may not be resubmitted.
1450 1464
1451 1465 Parameters
1452 1466 ----------
1453 1467
1454 1468 indices_or_msg_ids : integer history index, str msg_id, or list of either
 1455 1469             The history indices or msg_ids of the results to be retrieved
1456 1470
1457 1471 block : bool
1458 1472 Whether to wait for the result to be done
1459 1473
1460 1474 Returns
1461 1475 -------
1462 1476
1463 1477 AsyncHubResult
1464 1478 A subclass of AsyncResult that retrieves results from the Hub
1465 1479
1466 1480 """
1467 1481 block = self.block if block is None else block
1468 1482 if indices_or_msg_ids is None:
1469 1483 indices_or_msg_ids = -1
1470 1484
1471 1485 if not isinstance(indices_or_msg_ids, (list,tuple)):
1472 1486 indices_or_msg_ids = [indices_or_msg_ids]
1473 1487
1474 1488 theids = []
1475 1489 for id in indices_or_msg_ids:
1476 1490 if isinstance(id, int):
1477 1491 id = self.history[id]
1478 1492 if not isinstance(id, string_types):
1479 1493 raise TypeError("indices must be str or int, not %r"%id)
1480 1494 theids.append(id)
1481 1495
1482 1496 content = dict(msg_ids = theids)
1483 1497
1484 1498 self.session.send(self._query_socket, 'resubmit_request', content)
1485 1499
1486 1500 zmq.select([self._query_socket], [], [])
1487 1501 idents,msg = self.session.recv(self._query_socket, zmq.NOBLOCK)
1488 1502 if self.debug:
1489 1503 pprint(msg)
1490 1504 content = msg['content']
1491 1505 if content['status'] != 'ok':
1492 1506 raise self._unwrap_exception(content)
1493 1507 mapping = content['resubmitted']
1494 1508 new_ids = [ mapping[msg_id] for msg_id in theids ]
1495 1509
1496 1510 ar = AsyncHubResult(self, msg_ids=new_ids)
1497 1511
1498 1512 if block:
1499 1513 ar.wait()
1500 1514
1501 1515 return ar
1502 1516
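# resubmit() sketch: re-run a finished (not in-flight) task with its
# original code and arguments; `rc` is assumed connected.
ar = rc[:].apply_async(lambda: 4)
ar.get()                        # ensure it is no longer in flight
ar2 = rc.resubmit(ar.msg_ids)   # returns an AsyncHubResult
print(ar2.get())
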
1503 1517 @spin_first
1504 1518 def result_status(self, msg_ids, status_only=True):
1505 1519 """Check on the status of the result(s) of the apply request with `msg_ids`.
1506 1520
1507 1521 If status_only is False, then the actual results will be retrieved, else
1508 1522 only the status of the results will be checked.
1509 1523
1510 1524 Parameters
1511 1525 ----------
1512 1526
1513 1527 msg_ids : list of msg_ids
1514 1528 if int:
1515 1529 Passed as index to self.history for convenience.
1516 1530 status_only : bool (default: True)
1517 1531 if False:
1518 1532 Retrieve the actual results of completed tasks.
1519 1533
1520 1534 Returns
1521 1535 -------
1522 1536
1523 1537 results : dict
1524 1538 There will always be the keys 'pending' and 'completed', which will
1525 1539 be lists of msg_ids that are incomplete or complete. If `status_only`
1526 1540 is False, then completed results will be keyed by their `msg_id`.
1527 1541 """
1528 1542 if not isinstance(msg_ids, (list,tuple)):
1529 1543 msg_ids = [msg_ids]
1530 1544
1531 1545 theids = []
1532 1546 for msg_id in msg_ids:
1533 1547 if isinstance(msg_id, int):
1534 1548 msg_id = self.history[msg_id]
1535 1549 if not isinstance(msg_id, string_types):
1536 1550 raise TypeError("msg_ids must be str, not %r"%msg_id)
1537 1551 theids.append(msg_id)
1538 1552
1539 1553 completed = []
1540 1554 local_results = {}
1541 1555
1542 1556 # comment this block out to temporarily disable local shortcut:
1543 1557 for msg_id in list(theids):  # iterate over a copy; theids is mutated below
1544 1558 if msg_id in self.results:
1545 1559 completed.append(msg_id)
1546 1560 local_results[msg_id] = self.results[msg_id]
1547 1561 theids.remove(msg_id)
1548 1562
1549 1563 if theids: # some not locally cached
1550 1564 content = dict(msg_ids=theids, status_only=status_only)
1551 1565 msg = self.session.send(self._query_socket, "result_request", content=content)
1552 1566 zmq.select([self._query_socket], [], [])
1553 1567 idents,msg = self.session.recv(self._query_socket, zmq.NOBLOCK)
1554 1568 if self.debug:
1555 1569 pprint(msg)
1556 1570 content = msg['content']
1557 1571 if content['status'] != 'ok':
1558 1572 raise self._unwrap_exception(content)
1559 1573 buffers = msg['buffers']
1560 1574 else:
1561 1575 content = dict(completed=[],pending=[])
1562 1576
1563 1577 content['completed'].extend(completed)
1564 1578
1565 1579 if status_only:
1566 1580 return content
1567 1581
1568 1582 failures = []
1569 1583 # load cached results into the returned content dict:
1570 1584 content.update(local_results)
1571 1585
1572 1586 # update cache with results:
1573 1587 for msg_id in sorted(theids):
1574 1588 if msg_id in content['completed']:
1575 1589 rec = content[msg_id]
1576 1590 parent = extract_dates(rec['header'])
1577 1591 header = extract_dates(rec['result_header'])
1578 1592 rcontent = rec['result_content']
1579 1593 iodict = rec['io']
1580 1594 if isinstance(rcontent, str):
1581 1595 rcontent = self.session.unpack(rcontent)
1582 1596
1583 1597 md = self.metadata[msg_id]
1584 1598 md_msg = dict(
1585 1599 content=rcontent,
1586 1600 parent_header=parent,
1587 1601 header=header,
1588 1602 metadata=rec['result_metadata'],
1589 1603 )
1590 1604 md.update(self._extract_metadata(md_msg))
1591 1605 if rec.get('received'):
1592 1606 md['received'] = parse_date(rec['received'])
1593 1607 md.update(iodict)
1594 1608
1595 1609 if rcontent['status'] == 'ok':
1596 1610 if header['msg_type'] == 'apply_reply':
1597 1611 res,buffers = serialize.unserialize_object(buffers)
1598 1612 elif header['msg_type'] == 'execute_reply':
1599 1613 res = ExecuteReply(msg_id, rcontent, md)
1600 1614 else:
1601 1615 raise KeyError("unhandled msg type: %r" % header['msg_type'])
1602 1616 else:
1603 1617 res = self._unwrap_exception(rcontent)
1604 1618 failures.append(res)
1605 1619
1606 1620 self.results[msg_id] = res
1607 1621 content[msg_id] = res
1608 1622
1609 1623 if len(theids) == 1 and failures:
1610 1624 raise failures[0]
1611 1625
1612 1626 error.collect_exceptions(failures, "result_status")
1613 1627 return content
1614 1628
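    # Usage sketch for `result_status` above (an illustrative addition).
    # Assumes a connected Client `rc` and an AsyncResult `ar` for a batch:
    #
    #     status = rc.result_status(ar.msg_ids, status_only=True)
    #     print(len(status['pending']), 'pending,',
    #           len(status['completed']), 'completed')
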
1615 1629 @spin_first
1616 1630 def queue_status(self, targets='all', verbose=False):
1617 1631 """Fetch the status of engine queues.
1618 1632
1619 1633 Parameters
1620 1634 ----------
1621 1635
1622 1636 targets : int/str/list of ints/strs
1623 1637 the engines whose states are to be queried.
1624 1638 default : all
1625 1639 verbose : bool
1626 1640 Whether to return lengths only, or lists of ids for each element
1627 1641 """
1628 1642 if targets == 'all':
1629 1643 # allow 'all' to be evaluated on the engine
1630 1644 engine_ids = None
1631 1645 else:
1632 1646 engine_ids = self._build_targets(targets)[1]
1633 1647 content = dict(targets=engine_ids, verbose=verbose)
1634 1648 self.session.send(self._query_socket, "queue_request", content=content)
1635 1649 idents,msg = self.session.recv(self._query_socket, 0)
1636 1650 if self.debug:
1637 1651 pprint(msg)
1638 1652 content = msg['content']
1639 1653 status = content.pop('status')
1640 1654 if status != 'ok':
1641 1655 raise self._unwrap_exception(content)
1642 1656 content = rekey(content)
1643 1657 if isinstance(targets, int):
1644 1658 return content[targets]
1645 1659 else:
1646 1660 return content
1647 1661
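    # Usage sketch for `queue_status` above (an illustrative addition).
    # Assumes a connected Client `rc` with at least one engine:
    #
    #     rc.queue_status()              # e.g. {0: {'queue': 0, 'completed': 5, 'tasks': 0}, ...}
    #     rc.queue_status(0)             # a single int target -> just that engine's dict
    #     rc.queue_status(verbose=True)  # lists of msg_ids instead of counts
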
1648 1662 def _build_msgids_from_target(self, targets=None):
1649 1663 """Build a list of msg_ids from the list of engine targets"""
1650 1664 if not targets: # needed as _build_targets otherwise uses all engines
1651 1665 return []
1652 1666 target_ids = self._build_targets(targets)[0]
1653 1667 return [md_id for md_id in self.metadata if self.metadata[md_id]["engine_uuid"] in target_ids]
1654 1668
1655 1669 def _build_msgids_from_jobs(self, jobs=None):
1656 1670 """Build a list of msg_ids from "jobs" """
1657 1671 if not jobs:
1658 1672 return []
1659 1673 msg_ids = []
1660 1674 if isinstance(jobs, string_types + (AsyncResult,)):
1661 1675 jobs = [jobs]
1662 1676 bad_ids = [obj for obj in jobs if not isinstance(obj, string_types + (AsyncResult,))]
1663 1677 if bad_ids:
1664 1678 raise TypeError("Invalid msg_id type %r, expected str or AsyncResult"%bad_ids[0])
1665 1679 for j in jobs:
1666 1680 if isinstance(j, AsyncResult):
1667 1681 msg_ids.extend(j.msg_ids)
1668 1682 else:
1669 1683 msg_ids.append(j)
1670 1684 return msg_ids
1671 1685
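    # For illustration (an addition): the helper above normalizes the flexible
    # `jobs` argument accepted by the purge methods. With `ar` an AsyncResult
    # and 'a1b2' standing in for a real msg_id string:
    #
    #     self._build_msgids_from_jobs('a1b2')        # -> ['a1b2']
    #     self._build_msgids_from_jobs([ar, 'a1b2'])  # -> ar.msg_ids + ['a1b2']
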
1672 1686 def purge_local_results(self, jobs=[], targets=[]):
1673 1687 """Clears the client caches of results and their metadata.
1674 1688
1675 1689 Individual results can be purged by msg_id, or the entire
1676 1690 history of specific targets can be purged.
1677 1691
1678 1692 Use `purge_local_results('all')` to scrub everything from the client's
1679 1693 results and metadata caches.
1680 1694
1681 1695 After this call all `AsyncResults` are invalid and should be discarded.
1682 1696
1683 1697 If you must "reget" the results, you can still do so by using
1684 1698 `client.get_result(msg_id)` or `client.get_result(asyncresult)`. This will
1685 1699 redownload the results from the hub if they are still available
1686 1700 (i.e. `client.purge_hub_results(...)` has not been called).
1687 1701
1688 1702 Parameters
1689 1703 ----------
1690 1704
1691 1705 jobs : str or list of str or AsyncResult objects
1692 1706 the msg_ids whose results should be purged.
1693 1707 targets : int/list of ints
1694 1708 The engines, by integer ID, whose entire result histories are to be purged.
1695 1709
1696 1710 Raises
1697 1711 ------
1698 1712
1699 1713 RuntimeError : if any of the tasks to be purged are still outstanding.
1700 1714
1701 1715 """
1702 1716 if not targets and not jobs:
1703 1717 raise ValueError("Must specify at least one of `targets` and `jobs`")
1704 1718
1705 1719 if jobs == 'all':
1706 1720 if self.outstanding:
1707 1721 raise RuntimeError("Can't purge outstanding tasks: %s" % self.outstanding)
1708 1722 self.results.clear()
1709 1723 self.metadata.clear()
1710 1724 else:
1711 1725 msg_ids = set()
1712 1726 msg_ids.update(self._build_msgids_from_target(targets))
1713 1727 msg_ids.update(self._build_msgids_from_jobs(jobs))
1714 1728 still_outstanding = self.outstanding.intersection(msg_ids)
1715 1729 if still_outstanding:
1716 1730 raise RuntimeError("Can't purge outstanding tasks: %s" % still_outstanding)
1717 1731 for mid in msg_ids:
1718 1732 self.results.pop(mid, None)
1719 1733 self.metadata.pop(mid, None)
1720 1734
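    # Usage sketch for `purge_local_results` above (an illustrative addition).
    # Drops one finished task from the local caches only; the Hub's copy
    # survives, so `get_result` can re-fetch it. Assumes a connected Client
    # `rc` and a completed AsyncResult `ar`:
    #
    #     rc.purge_local_results(jobs=ar)
    #     ar2 = rc.get_result(ar.msg_ids[0])    # re-downloaded from the Hub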
1721 1735
1722 1736 @spin_first
1723 1737 def purge_hub_results(self, jobs=[], targets=[]):
1724 1738 """Tell the Hub to forget results.
1725 1739
1726 1740 Individual results can be purged by msg_id, or the entire
1727 1741 history of specific targets can be purged.
1728 1742
1729 1743 Use `purge_hub_results('all')` to scrub everything from the Hub's db.
1730 1744
1731 1745 Parameters
1732 1746 ----------
1733 1747
1734 1748 jobs : str or list of str or AsyncResult objects
1735 1749 the msg_ids whose results should be forgotten.
1736 1750 targets : int/str/list of ints/strs
1737 1751 The targets, by int_id, whose entire history is to be purged.
1738 1752
1739 1753 default : None
1740 1754 """
1741 1755 if not targets and not jobs:
1742 1756 raise ValueError("Must specify at least one of `targets` and `jobs`")
1743 1757 if targets:
1744 1758 targets = self._build_targets(targets)[1]
1745 1759
1746 1760 # construct msg_ids from jobs
1747 1761 if jobs == 'all':
1748 1762 msg_ids = jobs
1749 1763 else:
1750 1764 msg_ids = self._build_msgids_from_jobs(jobs)
1751 1765
1752 1766 content = dict(engine_ids=targets, msg_ids=msg_ids)
1753 1767 self.session.send(self._query_socket, "purge_request", content=content)
1754 1768 idents, msg = self.session.recv(self._query_socket, 0)
1755 1769 if self.debug:
1756 1770 pprint(msg)
1757 1771 content = msg['content']
1758 1772 if content['status'] != 'ok':
1759 1773 raise self._unwrap_exception(content)
1760 1774
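    # Usage sketch for `purge_hub_results` above (an illustrative addition).
    # Forget everything engine 0 has ever run from the Hub's database,
    # assuming a connected Client `rc`:
    #
    #     rc.purge_hub_results(targets=[0])
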
1761 1775 def purge_results(self, jobs=[], targets=[]):
1762 1776 """Clears the cached results from both the hub and the local client
1763 1777
1764 1778 Individual results can be purged by msg_id, or the entire
1765 1779 history of specific targets can be purged.
1766 1780
1767 1781 Use `purge_results('all')` to scrub every cached result from both the Hub's and
1768 1782 the Client's db.
1769 1783
1770 1784 Equivalent to calling both `purge_hub_results()` and `purge_local_results()` with
1771 1785 the same arguments.
1772 1786
1773 1787 Parameters
1774 1788 ----------
1775 1789
1776 1790 jobs : str or list of str or AsyncResult objects
1777 1791 the msg_ids whose results should be forgotten.
1778 1792 targets : int/str/list of ints/strs
1779 1793 The targets, by int_id, whose entire history is to be purged.
1780 1794
1781 1795 default : None
1782 1796 """
1783 1797 self.purge_local_results(jobs=jobs, targets=targets)
1784 1798 self.purge_hub_results(jobs=jobs, targets=targets)
1785 1799
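    # Usage sketch for `purge_results` above (an illustrative addition).
    # Scrub every cached result, local and Hub-side, once nothing is still
    # in flight. Assumes a connected Client `rc`:
    #
    #     rc.wait()                # ensure no tasks are outstanding
    #     rc.purge_results('all')
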
1786 1800 def purge_everything(self):
1787 1801 """Clears all content from previous Tasks from both the hub and the local client
1788 1802
1789 1803 In addition to calling `purge_results("all")` it also deletes the history and
1790 1804 other bookkeeping lists.
1791 1805 """
1792 1806 self.purge_results("all")
1793 1807 self.history = []
1794 1808 self.session.digest_history.clear()
1795 1809
1796 1810 @spin_first
1797 1811 def hub_history(self):
1798 1812 """Get the Hub's history
1799 1813
1800 1814 Just like the Client, the Hub has a history, which is a list of msg_ids.
1801 1815 This will contain the history of all clients, and, depending on configuration,
1802 1816 may contain history across multiple cluster sessions.
1803 1817
1804 1818 Any msg_id returned here is a valid argument to `get_result`.
1805 1819
1806 1820 Returns
1807 1821 -------
1808 1822
1809 1823 msg_ids : list of strs
1810 1824 list of all msg_ids, ordered by task submission time.
1811 1825 """
1812 1826
1813 1827 self.session.send(self._query_socket, "history_request", content={})
1814 1828 idents, msg = self.session.recv(self._query_socket, 0)
1815 1829
1816 1830 if self.debug:
1817 1831 pprint(msg)
1818 1832 content = msg['content']
1819 1833 if content['status'] != 'ok':
1820 1834 raise self._unwrap_exception(content)
1821 1835 else:
1822 1836 return content['history']
1823 1837
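    # Usage sketch for `hub_history` above (an illustrative addition). Fetch
    # the most recent msg_id known to the Hub (possibly submitted by another
    # client) and load its result. Assumes a connected Client `rc` and a
    # non-empty Hub history:
    #
    #     msg_ids = rc.hub_history()
    #     ar = rc.get_result(msg_ids[-1], block=True)
    #     ar.get()
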
1824 1838 @spin_first
1825 1839 def db_query(self, query, keys=None):
1826 1840 """Query the Hub's TaskRecord database
1827 1841
1828 1842 This will return a list of task record dicts that match `query`
1829 1843
1830 1844 Parameters
1831 1845 ----------
1832 1846
1833 1847 query : mongodb query dict
1834 1848 The search dict. See mongodb query docs for details.
1835 1849 keys : list of strs [optional]
1836 1850 The subset of keys to be returned. The default is to fetch everything but buffers.
1837 1851 'msg_id' will *always* be included.
1838 1852 """
1839 1853 if isinstance(keys, string_types):
1840 1854 keys = [keys]
1841 1855 content = dict(query=query, keys=keys)
1842 1856 self.session.send(self._query_socket, "db_request", content=content)
1843 1857 idents, msg = self.session.recv(self._query_socket, 0)
1844 1858 if self.debug:
1845 1859 pprint(msg)
1846 1860 content = msg['content']
1847 1861 if content['status'] != 'ok':
1848 1862 raise self._unwrap_exception(content)
1849 1863
1850 1864 records = content['records']
1851 1865
1852 1866 buffer_lens = content['buffer_lens']
1853 1867 result_buffer_lens = content['result_buffer_lens']
1854 1868 buffers = msg['buffers']
1855 1869 has_bufs = buffer_lens is not None
1856 1870 has_rbufs = result_buffer_lens is not None
1857 1871 for i,rec in enumerate(records):
1858 1872 # unpack datetime objects
1859 1873 for hkey in ('header', 'result_header'):
1860 1874 if hkey in rec:
1861 1875 rec[hkey] = extract_dates(rec[hkey])
1862 1876 for dtkey in ('submitted', 'started', 'completed', 'received'):
1863 1877 if dtkey in rec:
1864 1878 rec[dtkey] = parse_date(rec[dtkey])
1865 1879 # relink buffers
1866 1880 if has_bufs:
1867 1881 blen = buffer_lens[i]
1868 1882 rec['buffers'], buffers = buffers[:blen],buffers[blen:]
1869 1883 if has_rbufs:
1870 1884 blen = result_buffer_lens[i]
1871 1885 rec['result_buffers'], buffers = buffers[:blen],buffers[blen:]
1872 1886
1873 1887 return records
1874 1888
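    # Usage sketch for `db_query` above (an illustrative addition). Find
    # completed tasks that ran in under a second, fetching only timing keys
    # ('msg_id' is always included); the query syntax follows MongoDB
    # conventions. Assumes a connected Client `rc`:
    #
    #     from datetime import timedelta
    #     recs = rc.db_query({'completed': {'$ne': None}},
    #                        keys=['started', 'completed'])
    #     fast = [r['msg_id'] for r in recs
    #             if r['completed'] - r['started'] < timedelta(seconds=1)]
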
1875 1889 __all__ = [ 'Client' ]