r4018 | """A semi-synchronous Client for the ZMQ cluster | ||
Authors: | ||||
* MinRK | ||||
""" | ||||
MinRK
|
r3551 | #----------------------------------------------------------------------------- | ||
MinRK
|
r4018 | # Copyright (C) 2010-2011 The IPython Development Team | ||
MinRK
|
r3551 | # | ||
# Distributed under the terms of the BSD License. The full license is in | ||||
# the file COPYING, distributed as part of this software. | ||||
#----------------------------------------------------------------------------- | ||||
MinRK
|
r3539 | |||
MinRK
|
r3551 | #----------------------------------------------------------------------------- | ||
# Imports | ||||
#----------------------------------------------------------------------------- | ||||
MinRK
|
r3539 | |||
MinRK
|
r3575 | import os | ||
MinRK
|
r3631 | import json | ||
MinRK
|
r4155 | import sys | ||
MinRK
|
r6484 | from threading import Thread, Event | ||
MinRK
|
r3551 | import time | ||
MinRK
|
r3631 | import warnings | ||
from datetime import datetime | ||||
MinRK
|
r3581 | from getpass import getpass | ||
MinRK
|
r3540 | from pprint import pprint | ||
MinRK
|
r3631 | |||
MinRK
|
r3614 | pjoin = os.path.join | ||
MinRK
|
r3540 | |||
MinRK
|
r3551 | import zmq | ||
MinRK
|
r3635 | # from zmq.eventloop import ioloop, zmqstream | ||
MinRK
|
r3551 | |||
MinRK
|
r4071 | from IPython.config.configurable import MultipleInstanceError | ||
from IPython.core.application import BaseIPythonApplication | ||||
MinRK
|
r7476 | from IPython.core.profiledir import ProfileDir, ProfileDirError | ||
MinRK
|
r4071 | |||
MinRK
|
r7026 | from IPython.utils.coloransi import TermColors | ||
MinRK
|
r4036 | from IPython.utils.jsonutil import rekey | ||
W. Trevor King
|
r9254 | from IPython.utils.localinterfaces import LOCALHOST, LOCAL_IPS | ||
MinRK
|
r3614 | from IPython.utils.path import get_ipython_dir | ||
MinRK
|
r6813 | from IPython.utils.py3compat import cast_bytes | ||
MinRK
|
r5344 | from IPython.utils.traitlets import (HasTraits, Integer, Instance, Unicode, | ||
MinRK
|
r6484 | Dict, List, Bool, Set, Any) | ||
MinRK
|
r3539 | from IPython.external.decorator import decorator | ||
MinRK
|
r3619 | from IPython.external.ssh import tunnel | ||
MinRK
|
r3539 | |||
MinRK
|
r5821 | from IPython.parallel import Reference | ||
MinRK
|
r3673 | from IPython.parallel import error | ||
from IPython.parallel import util | ||||
MinRK
|
r9372 | from IPython.kernel.zmq.session import Session, Message | ||
from IPython.kernel.zmq import serialize | ||||
MinRK
|
r4006 | |||
MinRK
|
r3666 | from .asyncresult import AsyncResult, AsyncHubResult | ||
MinRK
|
r3642 | from .view import DirectView, LoadBalancedView | ||
MinRK
|
r3539 | |||
MinRK
|
r4155 | if sys.version_info[0] >= 3: | ||
MinRK
|
r4161 | # xrange is used in a couple 'isinstance' tests in py2 | ||
MinRK
|
r4155 | # should be just 'range' in 3k | ||
xrange = range | ||||
MinRK
|
r3584 | #-------------------------------------------------------------------------- | ||
MinRK
|
r3555 | # Decorators for Client methods | ||
#-------------------------------------------------------------------------- | ||||
MinRK
|
r3539 | @decorator | ||
MinRK
|
r3664 | def spin_first(f, self, *args, **kwargs): | ||
MinRK
|
r3555 | """Call spin() to sync state prior to calling the method.""" | ||
MinRK
|
r3539 | self.spin() | ||
return f(self, *args, **kwargs) | ||||
MinRK
|
r3598 | |||
#-------------------------------------------------------------------------- | ||||
# Classes | ||||
#-------------------------------------------------------------------------- | ||||


class ExecuteReply(object):
    """wrapper for finished Execute results"""
    def __init__(self, msg_id, content, metadata):
        self.msg_id = msg_id
        self._content = content
        self.execution_count = content['execution_count']
        self.metadata = metadata

    def __getitem__(self, key):
        return self.metadata[key]

    def __getattr__(self, key):
        if key not in self.metadata:
            raise AttributeError(key)
        return self.metadata[key]

    def __repr__(self):
        pyout = self.metadata['pyout'] or {'data':{}}
        text_out = pyout['data'].get('text/plain', '')
        if len(text_out) > 32:
            text_out = text_out[:29] + '...'
        return "<ExecuteReply[%i]: %s>" % (self.execution_count, text_out)

    def _repr_pretty_(self, p, cycle):
        pyout = self.metadata['pyout'] or {'data':{}}
        text_out = pyout['data'].get('text/plain', '')

        if not text_out:
            return

        try:
            ip = get_ipython()
        except NameError:
            colors = "NoColor"
        else:
            colors = ip.colors

        if colors == "NoColor":
            out = normal = ""
        else:
            out = TermColors.Red
            normal = TermColors.Normal

        if '\n' in text_out and not text_out.startswith('\n'):
            # add newline for multiline reprs
            text_out = '\n' + text_out

        p.text(
            out + u'Out[%i:%i]: ' % (
                self.metadata['engine_id'], self.execution_count
            ) + normal + text_out
        )

    def _repr_html_(self):
        pyout = self.metadata['pyout'] or {'data':{}}
        return pyout['data'].get("text/html")

    def _repr_latex_(self):
        pyout = self.metadata['pyout'] or {'data':{}}
        return pyout['data'].get("text/latex")

    def _repr_json_(self):
        pyout = self.metadata['pyout'] or {'data':{}}
        return pyout['data'].get("application/json")

    def _repr_javascript_(self):
        pyout = self.metadata['pyout'] or {'data':{}}
        return pyout['data'].get("application/javascript")

    def _repr_png_(self):
        pyout = self.metadata['pyout'] or {'data':{}}
        return pyout['data'].get("image/png")

    def _repr_jpeg_(self):
        pyout = self.metadata['pyout'] or {'data':{}}
        return pyout['data'].get("image/jpeg")

    def _repr_svg_(self):
        pyout = self.metadata['pyout'] or {'data':{}}
        return pyout['data'].get("image/svg+xml")


class Metadata(dict):
    """Subclass of dict for initializing metadata values.

    Attribute access works on keys.

    These objects have a strict set of keys - errors will raise if you try
    to add new keys.
    """
    def __init__(self, *args, **kwargs):
        dict.__init__(self)
        md = {'msg_id' : None,
              'submitted' : None,
              'started' : None,
              'completed' : None,
              'received' : None,
              'engine_uuid' : None,
              'engine_id' : None,
              'follow' : None,
              'after' : None,
              'status' : None,
              'pyin' : None,
              'pyout' : None,
              'pyerr' : None,
              'stdout' : '',
              'stderr' : '',
              'outputs' : [],
              'data': {},
              'outputs_ready' : False,
            }
        self.update(md)
        self.update(dict(*args, **kwargs))

    def __getattr__(self, key):
        """getattr aliased to getitem"""
        if key in self.iterkeys():
            return self[key]
        else:
            raise AttributeError(key)

    def __setattr__(self, key, value):
        """setattr aliased to setitem, with strict"""
        if key in self.iterkeys():
            self[key] = value
        else:
            raise AttributeError(key)

    def __setitem__(self, key, value):
        """strict static key enforcement"""
        if key in self.iterkeys():
            dict.__setitem__(self, key, value)
        else:
            raise KeyError(key)
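
# Minimal illustration (not part of the original source) of the contract described in
# the Metadata docstring above: attribute access mirrors item access, and the key set
# is fixed, so unknown keys are rejected.
#
#   md = Metadata(status='ok')
#   md.status              # 'ok' -- attribute access is aliased to item access
#   md['stdout'] += 'hi'   # existing keys can be read and updated normally
#   md['bogus'] = 1        # raises KeyError: new keys cannot be added
#   md.bogus = 1           # raises AttributeError for the same reason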


class Client(HasTraits):
    """A semi-synchronous client to the IPython ZMQ cluster

    Parameters
    ----------

    url_file : str/unicode; path to ipcontroller-client.json
        This JSON file should contain all the information needed to connect to a cluster,
        and is likely the only argument needed.
        Connection information for the Hub's registration.  If a json connector
        file is given, then likely no further configuration is necessary.
        [Default: use profile]
    profile : bytes
        The name of the Cluster profile to be used to find connector information.
        If run from an IPython application, the default profile will be the same
        as the running application, otherwise it will be 'default'.
    cluster_id : str
        String id added to runtime files, to prevent name collisions when using
        multiple clusters with a single profile simultaneously.
        When set, will look for files named like: 'ipcontroller-<cluster_id>-client.json'
        Since this is text inserted into filenames, typical recommendations apply:
        Simple character strings are ideal, and spaces are not recommended (but
        should generally work).
    context : zmq.Context
        Pass an existing zmq.Context instance, otherwise the client will create its own.
    debug : bool
        flag for lots of message printing for debug purposes
    timeout : int/float
        time (in seconds) to wait for connection replies from the Hub
        [Default: 10]

    #-------------- session related args ----------------

    config : Config object
        If specified, this will be relayed to the Session for configuration
    username : str
        set username for the session object

    #-------------- ssh related args ----------------
    # These are args for configuring the ssh tunnel to be used
    # credentials are used to forward connections over ssh to the Controller
    # Note that the ip given in `addr` needs to be relative to sshserver
    # The most basic case is to leave addr as pointing to localhost (127.0.0.1),
    # and set sshserver as the same machine the Controller is on.  However,
    # the only requirement is that sshserver is able to see the Controller
    # (i.e. is within the same trusted network).

    sshserver : str
        A string of the form passed to ssh, i.e. 'server.tld' or 'user@server.tld:port'
        If keyfile or password is specified, and this is not, it will default to
        the ip given in addr.
    sshkey : str; path to ssh private key file
        This specifies a key to be used in ssh login, default None.
        Regular default ssh keys will be used without specifying this argument.
    password : str
        Your ssh password to sshserver.  Note that if this is left None,
        you will be prompted for it if passwordless key based login is unavailable.
    paramiko : bool
        flag for whether to use paramiko instead of shell ssh for tunneling.
        [default: True on win32, False else]


    Attributes
    ----------

    ids : list of int engine IDs
        requesting the ids attribute always synchronizes
        the registration state.  To request ids without synchronization,
        use the semi-private _ids attribute.

    history : list of msg_ids
        a list of msg_ids, keeping track of all the execution
        messages you have submitted in order.

    outstanding : set of msg_ids
        a set of msg_ids that have been submitted, but whose
        results have not yet been received.

    results : dict
        a dict of all our results, keyed by msg_id

    block : bool
        determines default behavior when block not specified
        in execution methods

    Methods
    -------

    spin
        flushes incoming results and registration state changes
        control methods spin, and requesting `ids` also ensures up to date

    wait
        wait on one or more msg_ids

    execution methods
        apply
        legacy: execute, run

    data movement
        push, pull, scatter, gather

    query methods
        queue_status, get_result, purge, result_status

    control methods
        abort, shutdown

    """

    block = Bool(False)
    outstanding = Set()
    results = Instance('collections.defaultdict', (dict,))
    metadata = Instance('collections.defaultdict', (Metadata,))
    history = List()
    debug = Bool(False)
    _spin_thread = Any()
    _stop_spinning = Any()

    profile = Unicode()
    def _profile_default(self):
        if BaseIPythonApplication.initialized():
            # an IPython app *might* be running, try to get its profile
            try:
                return BaseIPythonApplication.instance().profile
            except (AttributeError, MultipleInstanceError):
                # could be a *different* subclass of config.Application,
                # which would raise one of these two errors.
                return u'default'
        else:
            return u'default'

    _outstanding_dict = Instance('collections.defaultdict', (set,))
    _ids = List()
    _connected = Bool(False)
    _ssh = Bool(False)
    _context = Instance('zmq.Context')
    _config = Dict()
    _engines = Instance(util.ReverseDict, (), {})
    # _hub_socket = Instance('zmq.Socket')
    _query_socket = Instance('zmq.Socket')
    _control_socket = Instance('zmq.Socket')
    _iopub_socket = Instance('zmq.Socket')
    _notification_socket = Instance('zmq.Socket')
    _mux_socket = Instance('zmq.Socket')
    _task_socket = Instance('zmq.Socket')
    _task_scheme = Unicode()
    _closed = False
    _ignored_control_replies = Integer(0)
    _ignored_hub_replies = Integer(0)

    def __new__(self, *args, **kw):
        # don't raise on positional args
        return HasTraits.__new__(self, **kw)

    def __init__(self, url_file=None, profile=None, profile_dir=None, ipython_dir=None,
            context=None, debug=False,
            sshserver=None, sshkey=None, password=None, paramiko=None,
            timeout=10, cluster_id=None, **extra_args
            ):
        if profile:
            super(Client, self).__init__(debug=debug, profile=profile)
        else:
            super(Client, self).__init__(debug=debug)
        if context is None:
            context = zmq.Context.instance()
        self._context = context
        self._stop_spinning = Event()

        if 'url_or_file' in extra_args:
            url_file = extra_args['url_or_file']
            warnings.warn("url_or_file arg no longer supported, use url_file", DeprecationWarning)

        if url_file and util.is_url(url_file):
            raise ValueError("single urls cannot be specified, url-files must be used.")

        self._setup_profile_dir(self.profile, profile_dir, ipython_dir)

        if self._cd is not None:
            if url_file is None:
                if not cluster_id:
                    client_json = 'ipcontroller-client.json'
                else:
                    client_json = 'ipcontroller-%s-client.json' % cluster_id
                url_file = pjoin(self._cd.security_dir, client_json)
        if url_file is None:
            raise ValueError(
                "I can't find enough information to connect to a hub!"
                " Please specify at least one of url_file or profile."
            )

        with open(url_file) as f:
            cfg = json.load(f)

        self._task_scheme = cfg['task_scheme']

        # sync defaults from args, json:
        if sshserver:
            cfg['ssh'] = sshserver

        location = cfg.setdefault('location', None)

        proto, addr = cfg['interface'].split('://')
        addr = util.disambiguate_ip_address(addr, location)
        cfg['interface'] = "%s://%s" % (proto, addr)

        # turn interface,port into full urls:
        for key in ('control', 'task', 'mux', 'iopub', 'notification', 'registration'):
            cfg[key] = cfg['interface'] + ':%i' % cfg[key]

        url = cfg['registration']

        if location is not None and addr == LOCALHOST:
            # location specified, and connection is expected to be local
            if location not in LOCAL_IPS and not sshserver:
                # load ssh from JSON *only* if the controller is not on
                # this machine
                sshserver = cfg['ssh']
            if location not in LOCAL_IPS and not sshserver:
                # warn if no ssh specified, but SSH is probably needed
                # This is only a warning, because the most likely cause
                # is a local Controller on a laptop whose IP is dynamic
                warnings.warn("""
            Controller appears to be listening on localhost, but not on this machine.
            If this is true, you should specify Client(...,sshserver='you@%s')
            or instruct your controller to listen on an external IP.""" % location,
                RuntimeWarning)
        elif not sshserver:
            # otherwise sync with cfg
            sshserver = cfg['ssh']

        self._config = cfg

        self._ssh = bool(sshserver or sshkey or password)
        if self._ssh and sshserver is None:
            # default to ssh via localhost
            sshserver = addr
        if self._ssh and password is None:
            if tunnel.try_passwordless_ssh(sshserver, sshkey, paramiko):
                password = False
            else:
                password = getpass("SSH Password for %s: " % sshserver)
        ssh_kwargs = dict(keyfile=sshkey, password=password, paramiko=paramiko)

        # configure and construct the session
        try:
            extra_args['packer'] = cfg['pack']
            extra_args['unpacker'] = cfg['unpack']
            extra_args['key'] = cast_bytes(cfg['key'])
            extra_args['signature_scheme'] = cfg['signature_scheme']
        except KeyError as exc:
            msg = '\n'.join([
                "Connection file is invalid (missing '{}'), possibly from an old version of IPython.",
                "If you are reusing connection files, remove them and start ipcontroller again."
            ])
            raise ValueError(msg.format(exc.message))

        self.session = Session(**extra_args)

        self._query_socket = self._context.socket(zmq.DEALER)

        if self._ssh:
            tunnel.tunnel_connection(self._query_socket, cfg['registration'], sshserver, **ssh_kwargs)
        else:
            self._query_socket.connect(cfg['registration'])

        self.session.debug = self.debug

        self._notification_handlers = {'registration_notification' : self._register_engine,
                                       'unregistration_notification' : self._unregister_engine,
                                       'shutdown_notification' : lambda msg: self.close(),
                                       }
        self._queue_handlers = {'execute_reply' : self._handle_execute_reply,
                                'apply_reply' : self._handle_apply_reply}
        self._connect(sshserver, ssh_kwargs, timeout)

        # last step: setup magics, if we are in IPython:
        try:
            ip = get_ipython()
        except NameError:
            return
        else:
            if 'px' not in ip.magics_manager.magics:
                # in IPython but we are the first Client.
                # activate a default view for parallel magics.
                self.activate()

    def __del__(self):
        """cleanup sockets, but _not_ context."""
        self.close()

    def _setup_profile_dir(self, profile, profile_dir, ipython_dir):
        if ipython_dir is None:
            ipython_dir = get_ipython_dir()
        if profile_dir is not None:
            try:
                self._cd = ProfileDir.find_profile_dir(profile_dir)
                return
            except ProfileDirError:
                pass
        elif profile is not None:
            try:
                self._cd = ProfileDir.find_profile_dir_by_name(
                    ipython_dir, profile)
                return
            except ProfileDirError:
                pass
        self._cd = None

    def _update_engines(self, engines):
        """Update our engines dict and _ids from a dict of the form: {id:uuid}."""
        for k, v in engines.iteritems():
            eid = int(k)
            if eid not in self._engines:
                self._ids.append(eid)
            self._engines[eid] = v
        self._ids = sorted(self._ids)
        if sorted(self._engines.keys()) != range(len(self._engines)) and \
                        self._task_scheme == 'pure' and self._task_socket:
            self._stop_scheduling_tasks()

    def _stop_scheduling_tasks(self):
        """Stop scheduling tasks because an engine has been unregistered
        from a pure ZMQ scheduler.
        """
        self._task_socket.close()
        self._task_socket = None
        msg = "An engine has been unregistered, and we are using pure " +\
              "ZMQ task scheduling.  Task farming will be disabled."
        if self.outstanding:
            msg += " If you were running tasks when this happened, " +\
                   "some `outstanding` msg_ids may never resolve."
        warnings.warn(msg, RuntimeWarning)

    def _build_targets(self, targets):
        """Turn valid target IDs or 'all' into two lists:
        (int_ids, uuids).
        """
        if not self._ids:
            # flush notification socket if no engines yet, just in case
            if not self.ids:
                raise error.NoEnginesRegistered("Can't build targets without any engines")

        if targets is None:
            targets = self._ids
        elif isinstance(targets, basestring):
            if targets.lower() == 'all':
                targets = self._ids
            else:
                raise TypeError("%r not valid str target, must be 'all'" % (targets))
        elif isinstance(targets, int):
            if targets < 0:
                targets = self.ids[targets]
            if targets not in self._ids:
                raise IndexError("No such engine: %i" % targets)
            targets = [targets]

        if isinstance(targets, slice):
            indices = range(len(self._ids))[targets]
            ids = self.ids
            targets = [ ids[i] for i in indices ]

        if not isinstance(targets, (tuple, list, xrange)):
            raise TypeError("targets by int/slice/collection of ints only, not %s" % (type(targets)))

        return [cast_bytes(self._engines[t]) for t in targets], list(targets)
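
    # Hypothetical illustration (not in the original source) of the target forms this
    # helper accepts, assuming four registered engines with ids [0, 1, 2, 3]:
    #
    #   self._build_targets('all')        # ([uuid0, ..., uuid3], [0, 1, 2, 3])
    #   self._build_targets(-1)           # ([uuid3], [3]) -- negative ints index self.ids
    #   self._build_targets(slice(0, 2))  # ([uuid0, uuid1], [0, 1])
    #   self._build_targets([0, 2])       # ([uuid0, uuid2], [0, 2])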

    def _connect(self, sshserver, ssh_kwargs, timeout):
        """setup all our socket connections to the cluster. This is called from
        __init__."""

        # Maybe allow reconnecting?
        if self._connected:
            return
        self._connected = True

        def connect_socket(s, url):
            # url = util.disambiguate_url(url, self._config['location'])
            if self._ssh:
                return tunnel.tunnel_connection(s, url, sshserver, **ssh_kwargs)
            else:
                return s.connect(url)

        self.session.send(self._query_socket, 'connection_request')
        # use Poller because zmq.select has wrong units in pyzmq 2.1.7
        poller = zmq.Poller()
        poller.register(self._query_socket, zmq.POLLIN)
        # poll expects milliseconds, timeout is seconds
        evts = poller.poll(timeout*1000)
        if not evts:
            raise error.TimeoutError("Hub connection request timed out")
        idents, msg = self.session.recv(self._query_socket, mode=0)
        if self.debug:
            pprint(msg)
        content = msg['content']
        # self._config['registration'] = dict(content)
        cfg = self._config
        if content['status'] == 'ok':
            self._mux_socket = self._context.socket(zmq.DEALER)
            connect_socket(self._mux_socket, cfg['mux'])
            self._task_socket = self._context.socket(zmq.DEALER)
            connect_socket(self._task_socket, cfg['task'])
            self._notification_socket = self._context.socket(zmq.SUB)
            self._notification_socket.setsockopt(zmq.SUBSCRIBE, b'')
            connect_socket(self._notification_socket, cfg['notification'])
            self._control_socket = self._context.socket(zmq.DEALER)
            connect_socket(self._control_socket, cfg['control'])
            self._iopub_socket = self._context.socket(zmq.SUB)
            self._iopub_socket.setsockopt(zmq.SUBSCRIBE, b'')
            connect_socket(self._iopub_socket, cfg['iopub'])
            self._update_engines(dict(content['engines']))
        else:
            self._connected = False
            raise Exception("Failed to connect!")

    #--------------------------------------------------------------------------
    # handlers and callbacks for incoming messages
    #--------------------------------------------------------------------------

    def _unwrap_exception(self, content):
        """unwrap exception, and remap engine_id to int."""
        e = error.unwrap_exception(content)
        # print e.traceback
        if e.engine_info:
            e_uuid = e.engine_info['engine_uuid']
            eid = self._engines[e_uuid]
            e.engine_info['engine_id'] = eid
        return e

    def _extract_metadata(self, msg):
        header = msg['header']
        parent = msg['parent_header']
        msg_meta = msg['metadata']
        content = msg['content']
        md = {'msg_id' : parent['msg_id'],
              'received' : datetime.now(),
              'engine_uuid' : msg_meta.get('engine', None),
              'follow' : msg_meta.get('follow', []),
              'after' : msg_meta.get('after', []),
              'status' : content['status'],
            }

        if md['engine_uuid'] is not None:
            md['engine_id'] = self._engines.get(md['engine_uuid'], None)

        if 'date' in parent:
            md['submitted'] = parent['date']
        if 'started' in msg_meta:
            md['started'] = msg_meta['started']
        if 'date' in header:
            md['completed'] = header['date']
        return md

    def _register_engine(self, msg):
        """Register a new engine, and update our connection info."""
        content = msg['content']
        eid = content['id']
        d = {eid : content['uuid']}
        self._update_engines(d)

    def _unregister_engine(self, msg):
        """Unregister an engine that has died."""
        content = msg['content']
        eid = int(content['id'])
        if eid in self._ids:
            self._ids.remove(eid)
            uuid = self._engines.pop(eid)

            self._handle_stranded_msgs(eid, uuid)

        if self._task_socket and self._task_scheme == 'pure':
            self._stop_scheduling_tasks()

    def _handle_stranded_msgs(self, eid, uuid):
        """Handle messages known to be on an engine when the engine unregisters.

        It is possible that this will fire prematurely - that is, an engine will
        go down after completing a result, and the client will be notified
        of the unregistration and later receive the successful result.
        """

        outstanding = self._outstanding_dict[uuid]

        for msg_id in list(outstanding):
            if msg_id in self.results:
                # we already have the result
                continue
            try:
                raise error.EngineError("Engine %r died while running task %r" % (eid, msg_id))
            except:
                content = error.wrap_exception()
            # build a fake message:
            msg = self.session.msg('apply_reply', content=content)
            msg['parent_header']['msg_id'] = msg_id
            msg['metadata']['engine'] = uuid
            self._handle_apply_reply(msg)

    def _handle_execute_reply(self, msg):
        """Save the reply to an execute_request into our results.

        execute messages are never actually used. apply is used instead.
        """

        parent = msg['parent_header']
        msg_id = parent['msg_id']
        if msg_id not in self.outstanding:
            if msg_id in self.history:
                print ("got stale result: %s" % msg_id)
            else:
                print ("got unknown result: %s" % msg_id)
        else:
            self.outstanding.remove(msg_id)

        content = msg['content']
        header = msg['header']

        # construct metadata:
        md = self.metadata[msg_id]
        md.update(self._extract_metadata(msg))
        # is this redundant?
        self.metadata[msg_id] = md

        e_outstanding = self._outstanding_dict[md['engine_uuid']]
        if msg_id in e_outstanding:
            e_outstanding.remove(msg_id)

        # construct result:
        if content['status'] == 'ok':
            self.results[msg_id] = ExecuteReply(msg_id, content, md)
        elif content['status'] == 'aborted':
            self.results[msg_id] = error.TaskAborted(msg_id)
        elif content['status'] == 'resubmitted':
            # TODO: handle resubmission
            pass
        else:
            self.results[msg_id] = self._unwrap_exception(content)

    def _handle_apply_reply(self, msg):
        """Save the reply to an apply_request into our results."""
        parent = msg['parent_header']
        msg_id = parent['msg_id']
        if msg_id not in self.outstanding:
            if msg_id in self.history:
                print ("got stale result: %s" % msg_id)
                print self.results[msg_id]
                print msg
            else:
                print ("got unknown result: %s" % msg_id)
        else:
            self.outstanding.remove(msg_id)
        content = msg['content']
        header = msg['header']

        # construct metadata:
        md = self.metadata[msg_id]
        md.update(self._extract_metadata(msg))
        # is this redundant?
        self.metadata[msg_id] = md

        e_outstanding = self._outstanding_dict[md['engine_uuid']]
        if msg_id in e_outstanding:
            e_outstanding.remove(msg_id)

        # construct result:
        if content['status'] == 'ok':
            self.results[msg_id] = serialize.unserialize_object(msg['buffers'])[0]
        elif content['status'] == 'aborted':
            self.results[msg_id] = error.TaskAborted(msg_id)
        elif content['status'] == 'resubmitted':
            # TODO: handle resubmission
            pass
        else:
            self.results[msg_id] = self._unwrap_exception(content)

    def _flush_notifications(self):
        """Flush notifications of engine registrations waiting
        in ZMQ queue."""
        idents, msg = self.session.recv(self._notification_socket, mode=zmq.NOBLOCK)
        while msg is not None:
            if self.debug:
                pprint(msg)
            msg_type = msg['header']['msg_type']
            handler = self._notification_handlers.get(msg_type, None)
            if handler is None:
                raise Exception("Unhandled message type: %s" % msg_type)
            else:
                handler(msg)
            idents, msg = self.session.recv(self._notification_socket, mode=zmq.NOBLOCK)

    def _flush_results(self, sock):
        """Flush task or queue results waiting in ZMQ queue."""
        idents, msg = self.session.recv(sock, mode=zmq.NOBLOCK)
        while msg is not None:
            if self.debug:
                pprint(msg)
            msg_type = msg['header']['msg_type']
            handler = self._queue_handlers.get(msg_type, None)
            if handler is None:
                raise Exception("Unhandled message type: %s" % msg_type)
            else:
                handler(msg)
            idents, msg = self.session.recv(sock, mode=zmq.NOBLOCK)

    def _flush_control(self, sock):
        """Flush replies from the control channel waiting
        in the ZMQ queue.

        Currently: ignore them."""
        if self._ignored_control_replies <= 0:
            return
        idents, msg = self.session.recv(sock, mode=zmq.NOBLOCK)
        while msg is not None:
            self._ignored_control_replies -= 1
            if self.debug:
                pprint(msg)
            idents, msg = self.session.recv(sock, mode=zmq.NOBLOCK)

    def _flush_ignored_control(self):
        """flush ignored control replies"""
        while self._ignored_control_replies > 0:
            self.session.recv(self._control_socket)
            self._ignored_control_replies -= 1

    def _flush_ignored_hub_replies(self):
        ident, msg = self.session.recv(self._query_socket, mode=zmq.NOBLOCK)
        while msg is not None:
            ident, msg = self.session.recv(self._query_socket, mode=zmq.NOBLOCK)

    def _flush_iopub(self, sock):
        """Flush replies from the iopub channel waiting
        in the ZMQ queue.
        """
        idents, msg = self.session.recv(sock, mode=zmq.NOBLOCK)
        while msg is not None:
            if self.debug:
                pprint(msg)
            parent = msg['parent_header']
            # ignore IOPub messages with no parent.
            # Caused by print statements or warnings from before the first execution.
            if not parent:
                idents, msg = self.session.recv(sock, mode=zmq.NOBLOCK)
                continue
            msg_id = parent['msg_id']
            content = msg['content']
            header = msg['header']
            msg_type = msg['header']['msg_type']

            # init metadata:
            md = self.metadata[msg_id]

            if msg_type == 'stream':
                name = content['name']
                s = md[name] or ''
                md[name] = s + content['data']
            elif msg_type == 'pyerr':
                md.update({'pyerr' : self._unwrap_exception(content)})
            elif msg_type == 'pyin':
                md.update({'pyin' : content['code']})
            elif msg_type == 'display_data':
                md['outputs'].append(content)
            elif msg_type == 'pyout':
                md['pyout'] = content
            elif msg_type == 'data_message':
                data, remainder = serialize.unserialize_object(msg['buffers'])
                md['data'].update(data)
            elif msg_type == 'status':
                # idle message comes after all outputs
                if content['execution_state'] == 'idle':
                    md['outputs_ready'] = True
            else:
                # unhandled msg_type (status, etc.)
                pass

            # redundant?
            self.metadata[msg_id] = md

            idents, msg = self.session.recv(sock, mode=zmq.NOBLOCK)

    #--------------------------------------------------------------------------
    # len, getitem
    #--------------------------------------------------------------------------

    def __len__(self):
        """len(client) returns # of engines."""
        return len(self.ids)

    def __getitem__(self, key):
        """index access returns DirectView multiplexer objects

        Must be int, slice, or list/tuple/xrange of ints"""
        if not isinstance(key, (int, slice, tuple, list, xrange)):
            raise TypeError("key by int/slice/iterable of ints only, not %s" % (type(key)))
        else:
            return self.direct_view(key)

    #--------------------------------------------------------------------------
    # Begin public methods
    #--------------------------------------------------------------------------

    @property
    def ids(self):
        """Always up-to-date ids property."""
        self._flush_notifications()
        # always copy:
        return list(self._ids)

    def activate(self, targets='all', suffix=''):
        """Create a DirectView and register it with IPython magics

        Defines the magics `%px, %autopx, %pxresult, %%px`

        Parameters
        ----------

        targets: int, list of ints, or 'all'
            The engines on which the view's magics will run
        suffix: str [default: '']
            The suffix, if any, for the magics.  This allows you to have
            multiple views associated with parallel magics at the same time.

            e.g. ``rc.activate(targets=0, suffix='0')`` will give you
            the magics ``%px0``, ``%pxresult0``, etc. for running magics just
            on engine 0.
        """
        view = self.direct_view(targets)
        view.block = True
        view.activate(suffix)
        return view
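
    # Hedged example (not part of the original source), mirroring the docstring above;
    # engine ids are illustrative:
    #
    #   rc = Client()
    #   rc.activate()                        # %px, %%px, %autopx, %pxresult on all engines
    #   rc.activate(targets=0, suffix='0')   # %px0, %pxresult0, ... run only on engine 0
    #
    # In an IPython session you could then run, e.g.:
    #   %px import numpy
    #   %px0 a = 'only on engine 0'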

    def close(self):
        if self._closed:
            return
        self.stop_spin_thread()
        snames = filter(lambda n: n.endswith('socket'), dir(self))
        for socket in map(lambda name: getattr(self, name), snames):
            if isinstance(socket, zmq.Socket) and not socket.closed:
                socket.close()
        self._closed = True

    def _spin_every(self, interval=1):
        """target func for use in spin_thread"""
        while True:
            if self._stop_spinning.is_set():
                return
            time.sleep(interval)
            self.spin()

    def spin_thread(self, interval=1):
        """call Client.spin() in a background thread on some regular interval

        This helps ensure that messages don't pile up too much in the zmq queue
        while you are working on other things, or just leaving an idle terminal.

        It also helps limit potential padding of the `received` timestamp
        on AsyncResult objects, used for timings.

        Parameters
        ----------

        interval : float, optional
            The interval on which to spin the client in the background thread
            (simply passed to time.sleep).

        Notes
        -----

        For precision timing, you may want to use this method to put a bound
        on the jitter (in seconds) in `received` timestamps used
        in AsyncResult.wall_time.

        """
        if self._spin_thread is not None:
            self.stop_spin_thread()
        self._stop_spinning.clear()
        self._spin_thread = Thread(target=self._spin_every, args=(interval,))
        self._spin_thread.daemon = True
        self._spin_thread.start()

    def stop_spin_thread(self):
        """stop background spin_thread, if any"""
        if self._spin_thread is not None:
            self._stop_spinning.set()
            self._spin_thread.join()
            self._spin_thread = None
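
    # Hedged usage sketch (not in the original file): keep `received` timestamps tight
    # while long-running work is outstanding, then stop the background thread.  The
    # function `slow_task` is hypothetical.
    #
    #   rc = Client()
    #   rc.spin_thread(interval=0.1)     # flush incoming messages every 100 ms
    #   ar = rc[:].apply_async(slow_task)
    #   ar.wait()
    #   rc.stop_spin_thread()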

    def spin(self):
        """Flush any registration notifications and execution results
        waiting in the ZMQ queue.
        """
        if self._notification_socket:
            self._flush_notifications()
        if self._iopub_socket:
            self._flush_iopub(self._iopub_socket)
        if self._mux_socket:
            self._flush_results(self._mux_socket)
        if self._task_socket:
            self._flush_results(self._task_socket)
        if self._control_socket:
            self._flush_control(self._control_socket)
        if self._query_socket:
            self._flush_ignored_hub_replies()

    def wait(self, jobs=None, timeout=-1):
        """waits on one or more `jobs`, for up to `timeout` seconds.

        Parameters
        ----------

        jobs : int, str, or list of ints and/or strs, or one or more AsyncResult objects
            ints are indices to self.history
            strs are msg_ids
            default: wait on all outstanding messages
        timeout : float
            a time in seconds, after which to give up.
            default is -1, which means no timeout

        Returns
        -------

        True : when all msg_ids are done
        False : timeout reached, some msg_ids still outstanding
        """
        tic = time.time()
        if jobs is None:
            theids = self.outstanding
        else:
            if isinstance(jobs, (int, basestring, AsyncResult)):
                jobs = [jobs]
            theids = set()
            for job in jobs:
                if isinstance(job, int):
                    # index access
                    job = self.history[job]
                elif isinstance(job, AsyncResult):
                    map(theids.add, job.msg_ids)
                    continue
                theids.add(job)
        if not theids.intersection(self.outstanding):
            return True
        self.spin()
        while theids.intersection(self.outstanding):
            if timeout >= 0 and ( time.time()-tic ) > timeout:
                break
            time.sleep(1e-3)
            self.spin()
        return len(theids.intersection(self.outstanding)) == 0
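
    # Illustrative call patterns (not from the original source); `ar` is any AsyncResult
    # and the timeout value is an example:
    #
    #   rc.wait()                # block until every outstanding message is done
    #   rc.wait(ar, timeout=5)   # wait up to 5s for one AsyncResult; False if still outstanding
    #   rc.wait([-1, -2])        # ints index rc.history, so this waits on the last two submissions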

    #--------------------------------------------------------------------------
    # Control methods
    #--------------------------------------------------------------------------

    @spin_first
    def clear(self, targets=None, block=None):
        """Clear the namespace in target(s)."""
        block = self.block if block is None else block
        targets = self._build_targets(targets)[0]
        for t in targets:
            self.session.send(self._control_socket, 'clear_request', content={}, ident=t)
        error = False
        if block:
            self._flush_ignored_control()
            for i in range(len(targets)):
                idents, msg = self.session.recv(self._control_socket, 0)
                if self.debug:
                    pprint(msg)
                if msg['content']['status'] != 'ok':
                    error = self._unwrap_exception(msg['content'])
        else:
            self._ignored_control_replies += len(targets)
        if error:
            raise error

    @spin_first
    def abort(self, jobs=None, targets=None, block=None):
        """Abort specific jobs from the execution queues of target(s).

        This is a mechanism to prevent jobs that have already been submitted
        from executing.

        Parameters
        ----------

        jobs : msg_id, list of msg_ids, or AsyncResult
            The jobs to be aborted

            If unspecified/None: abort all outstanding jobs.

        """
        block = self.block if block is None else block
        jobs = jobs if jobs is not None else list(self.outstanding)
        targets = self._build_targets(targets)[0]

        msg_ids = []
        if isinstance(jobs, (basestring, AsyncResult)):
            jobs = [jobs]
        bad_ids = filter(lambda obj: not isinstance(obj, (basestring, AsyncResult)), jobs)
        if bad_ids:
            raise TypeError("Invalid msg_id type %r, expected str or AsyncResult" % bad_ids[0])
        for j in jobs:
            if isinstance(j, AsyncResult):
                msg_ids.extend(j.msg_ids)
            else:
                msg_ids.append(j)
        content = dict(msg_ids=msg_ids)
        for t in targets:
            self.session.send(self._control_socket, 'abort_request',
                    content=content, ident=t)
        error = False
        if block:
            self._flush_ignored_control()
            for i in range(len(targets)):
                idents, msg = self.session.recv(self._control_socket, 0)
                if self.debug:
                    pprint(msg)
                if msg['content']['status'] != 'ok':
                    error = self._unwrap_exception(msg['content'])
        else:
            self._ignored_control_replies += len(targets)
        if error:
            raise error

    @spin_first
    def shutdown(self, targets='all', restart=False, hub=False, block=None):
        """Terminates one or more engine processes, optionally including the hub.

        Parameters
        ----------

        targets: list of ints or 'all' [default: all]
            Which engines to shutdown.
        hub: bool [default: False]
            Whether to include the Hub.  hub=True implies targets='all'.
        block: bool [default: self.block]
            Whether to wait for clean shutdown replies or not.
        restart: bool [default: False]
            NOT IMPLEMENTED
            whether to restart engines after shutting them down.
        """
        from IPython.parallel.error import NoEnginesRegistered
        if restart:
            raise NotImplementedError("Engine restart is not yet implemented")

        block = self.block if block is None else block
        if hub:
            targets = 'all'
        try:
            targets = self._build_targets(targets)[0]
        except NoEnginesRegistered:
            targets = []
        for t in targets:
            self.session.send(self._control_socket, 'shutdown_request',
                        content={'restart':restart}, ident=t)
        error = False
        if block or hub:
            self._flush_ignored_control()
            for i in range(len(targets)):
                idents, msg = self.session.recv(self._control_socket, 0)
                if self.debug:
                    pprint(msg)
                if msg['content']['status'] != 'ok':
                    error = self._unwrap_exception(msg['content'])
        else:
            self._ignored_control_replies += len(targets)

        if hub:
            time.sleep(0.25)
            self.session.send(self._query_socket, 'shutdown_request')
            idents, msg = self.session.recv(self._query_socket, 0)
            if self.debug:
                pprint(msg)
            if msg['content']['status'] != 'ok':
                error = self._unwrap_exception(msg['content'])

        if error:
            raise error
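
    # Illustrative shutdown patterns (not part of the original source); engine ids are examples:
    #
    #   rc.shutdown(targets=[0, 1])   # stop two specific engines
    #   rc.shutdown(hub=True)         # stop all engines and the Hub itself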

    #--------------------------------------------------------------------------
    # Execution related methods
    #--------------------------------------------------------------------------

    def _maybe_raise(self, result):
        """wrapper for maybe raising an exception if apply failed."""
        if isinstance(result, error.RemoteError):
            raise result

        return result

    def send_apply_request(self, socket, f, args=None, kwargs=None, metadata=None, track=False,
                            ident=None):
        """construct and send an apply message via a socket.

        This is the principal method with which all engine execution is performed by views.
        """

        if self._closed:
            raise RuntimeError("Client cannot be used after its sockets have been closed")

        # defaults:
        args = args if args is not None else []
        kwargs = kwargs if kwargs is not None else {}
        metadata = metadata if metadata is not None else {}

        # validate arguments
        if not callable(f) and not isinstance(f, Reference):
            raise TypeError("f must be callable, not %s" % type(f))
        if not isinstance(args, (tuple, list)):
            raise TypeError("args must be tuple or list, not %s" % type(args))
        if not isinstance(kwargs, dict):
            raise TypeError("kwargs must be dict, not %s" % type(kwargs))
        if not isinstance(metadata, dict):
            raise TypeError("metadata must be dict, not %s" % type(metadata))

        bufs = serialize.pack_apply_message(f, args, kwargs,
            buffer_threshold=self.session.buffer_threshold,
            item_threshold=self.session.item_threshold,
        )

        msg = self.session.send(socket, "apply_request", buffers=bufs, ident=ident,
                            metadata=metadata, track=track)

        msg_id = msg['header']['msg_id']
        self.outstanding.add(msg_id)
        if ident:
            # possibly routed to a specific engine
            if isinstance(ident, list):
                ident = ident[-1]
            if ident in self._engines.values():
                # save for later, in case of engine death
                self._outstanding_dict[ident].add(msg_id)
        self.history.append(msg_id)
        self.metadata[msg_id]['submitted'] = datetime.now()

        return msg

    def send_execute_request(self, socket, code, silent=True, metadata=None, ident=None):
        """construct and send an execute request via a socket.
        """

        if self._closed:
            raise RuntimeError("Client cannot be used after its sockets have been closed")

        # defaults:
        metadata = metadata if metadata is not None else {}

        # validate arguments
        if not isinstance(code, basestring):
            raise TypeError("code must be text, not %s" % type(code))
        if not isinstance(metadata, dict):
            raise TypeError("metadata must be dict, not %s" % type(metadata))

        content = dict(code=code, silent=bool(silent), user_variables=[], user_expressions={})

        msg = self.session.send(socket, "execute_request", content=content, ident=ident,
                            metadata=metadata)

        msg_id = msg['header']['msg_id']
        self.outstanding.add(msg_id)
        if ident:
            # possibly routed to a specific engine
            if isinstance(ident, list):
                ident = ident[-1]
            if ident in self._engines.values():
                # save for later, in case of engine death
                self._outstanding_dict[ident].add(msg_id)
        self.history.append(msg_id)
        self.metadata[msg_id]['submitted'] = datetime.now()

        return msg
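
    # Hedged sketch (not in the original source) of how view-level calls relate to the
    # two low-level senders above; the function `f` and its arguments are hypothetical:
    #
    #   dv = rc[:]                     # DirectView routed over self._mux_socket
    #   ar = dv.apply_async(f, 1, 2)   # ultimately funnels into send_apply_request
    #   dv.execute("y = f(1, 2)")      # ultimately funnels into send_execute_request
    #   ar.get()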

    #--------------------------------------------------------------------------
    # construct a View object
    #--------------------------------------------------------------------------

    def load_balanced_view(self, targets=None):
        """construct a LoadBalancedView object.

        If no arguments are specified, create a LoadBalancedView
        using all engines.

        Parameters
        ----------

        targets: list,slice,int,etc. [default: use all engines]
            The subset of engines across which to load-balance
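
        Examples
        --------
        Illustrative only (assumes a running cluster with at least one engine)::

            In [1]: rc = Client()

            In [2]: lview = rc.load_balanced_view()

            In [3]: import os

            In [4]: ar = lview.apply_async(os.getpid)

            In [5]: ar.get()    # pid of whichever engine ran the task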
""" | ||||
MinRK
|
r4492 | if targets == 'all': | ||
targets = None | ||||
MinRK
|
r3666 | if targets is not None: | ||
MinRK
|
r3665 | targets = self._build_targets(targets)[1] | ||
return LoadBalancedView(client=self, socket=self._task_socket, targets=targets) | ||||

    def direct_view(self, targets='all'):
        """construct a DirectView object.

        If no targets are specified, create a DirectView using all engines.

        rc.direct_view('all') is distinguished from rc[:] in that 'all' will
        evaluate the target engines at each execution, whereas rc[:] will connect to
        all *current* engines, and that list will not change.

        That is, 'all' will always use all engines, whereas rc[:] will not use
        engines added after the DirectView is constructed.

        Parameters
        ----------

        targets: list,slice,int,etc. [default: use all engines]
            The engines to use for the View
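
        Examples
        --------
        Illustrative only (assumes a connected Client ``rc``)::

            In [1]: dview = rc.direct_view('all')   # tracks engines added later

            In [2]: fixed = rc[:]                   # snapshot of the engines available now

            In [3]: dview['a'] = 5                  # push `a` to every engine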
""" | ||||
MinRK
|
r3665 | single = isinstance(targets, int) | ||
MinRK
|
r4492 | # allow 'all' to be lazily evaluated at each execution | ||
if targets != 'all': | ||||
targets = self._build_targets(targets)[1] | ||||
MinRK
|
r3665 | if single: | ||
targets = targets[0] | ||||
return DirectView(client=self, socket=self._mux_socket, targets=targets) | ||||

    #--------------------------------------------------------------------------
    # Query methods
    #--------------------------------------------------------------------------

    @spin_first
    def get_result(self, indices_or_msg_ids=None, block=None):
        """Retrieve a result by msg_id or history index, wrapped in an AsyncResult object.

        If the client already has the results, no request to the Hub will be made.

        This is a convenient way to construct AsyncResult objects, which are wrappers
        that include metadata about execution, and allow for awaiting results that
        were not submitted by this Client.

        It can also be a convenient way to retrieve the metadata associated with
        blocking execution, since it always returns an AsyncResult, through which
        the metadata of a completed task remains accessible.

        Examples
        --------
        Illustrative only (assumes a connected Client ``client`` that has already
        submitted at least one task)::

            In [10]: ar = client.get_result(-1)      # most recent task, by history index
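
            In [11]: ar.metadata                     # execution metadata (timestamps, engine, stdout, ...)

            In [12]: client.get_result(ar.msg_ids, block=True)   # re-fetch by msg_id and wait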

        Parameters
        ----------

        indices_or_msg_ids : integer history index, str msg_id, or list of either
            The indices or msg_ids of the results to be retrieved

        block : bool
            Whether to wait for the result to be done

        Returns
        -------

        AsyncResult
            A single AsyncResult object will always be returned.

        AsyncHubResult
            A subclass of AsyncResult that retrieves results from the Hub

        """
        block = self.block if block is None else block
        if indices_or_msg_ids is None:
            indices_or_msg_ids = -1

        single_result = False
        if not isinstance(indices_or_msg_ids, (list,tuple)):
            indices_or_msg_ids = [indices_or_msg_ids]
            single_result = True

        theids = []
        for id in indices_or_msg_ids:
            if isinstance(id, int):
                id = self.history[id]
            if not isinstance(id, basestring):
                raise TypeError("indices must be str or int, not %r"%id)
            theids.append(id)

        local_ids = filter(lambda msg_id: msg_id in self.outstanding or msg_id in self.results, theids)
        remote_ids = filter(lambda msg_id: msg_id not in local_ids, theids)

        # given a single msg_id initially, get_result should return the result itself,
        # not a length-one list
        if single_result:
            theids = theids[0]

        if remote_ids:
            ar = AsyncHubResult(self, msg_ids=theids)
        else:
            ar = AsyncResult(self, msg_ids=theids)

        if block:
            ar.wait()

        return ar

    @spin_first
    def resubmit(self, indices_or_msg_ids=None, metadata=None, block=None):
        """Resubmit one or more tasks.

        In-flight tasks may not be resubmitted.

        Parameters
        ----------

        indices_or_msg_ids : integer history index, str msg_id, or list of either
            The indices or msg_ids of the tasks to be resubmitted

        block : bool
            Whether to wait for the result to be done

        Returns
        -------

        AsyncHubResult
            A subclass of AsyncResult that retrieves results from the Hub
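
        Examples
        --------
        Illustrative only (assumes a connected Client ``rc`` and a finished
        AsyncResult ``ar``)::

            In [1]: ar2 = rc.resubmit(ar.msg_ids)

            In [2]: ar2.get()    # results of the re-run tasks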
""" | ||||
block = self.block if block is None else block | ||||
if indices_or_msg_ids is None: | ||||
indices_or_msg_ids = -1 | ||||
if not isinstance(indices_or_msg_ids, (list,tuple)): | ||||
indices_or_msg_ids = [indices_or_msg_ids] | ||||
theids = [] | ||||
for id in indices_or_msg_ids: | ||||
if isinstance(id, int): | ||||
id = self.history[id] | ||||
MinRK
|
r4172 | if not isinstance(id, basestring): | ||
MinRK
|
r3874 | raise TypeError("indices must be str or int, not %r"%id) | ||
theids.append(id) | ||||
content = dict(msg_ids = theids) | ||||
self.session.send(self._query_socket, 'resubmit_request', content) | ||||
zmq.select([self._query_socket], [], []) | ||||
idents,msg = self.session.recv(self._query_socket, zmq.NOBLOCK) | ||||
if self.debug: | ||||
pprint(msg) | ||||
content = msg['content'] | ||||
if content['status'] != 'ok': | ||||
raise self._unwrap_exception(content) | ||||
MinRK
|
r6817 | mapping = content['resubmitted'] | ||
new_ids = [ mapping[msg_id] for msg_id in theids ] | ||||
MinRK
|
r3874 | |||
MinRK
|
r6817 | ar = AsyncHubResult(self, msg_ids=new_ids) | ||
MinRK
|
r3874 | |||
if block: | ||||
ar.wait() | ||||
return ar | ||||

    @spin_first
    def result_status(self, msg_ids, status_only=True):
        """Check on the status of the result(s) of the apply request with `msg_ids`.

        If status_only is False, then the actual results will be retrieved, else
        only the status of the results will be checked.

        Parameters
        ----------

        msg_ids : list of msg_ids
            if int:
                Passed as index to self.history for convenience.
        status_only : bool (default: True)
            if False:
                Retrieve the actual results of completed tasks.

        Returns
        -------

        results : dict
            There will always be the keys 'pending' and 'completed', which will
            be lists of msg_ids that are incomplete or complete. If `status_only`
            is False, then completed results will be keyed by their `msg_id`.
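
        Examples
        --------
        Illustrative only (assumes some tasks have been submitted via ``rc``)::

            In [1]: status = rc.result_status(rc.history[-5:])

            In [2]: status['completed'], status['pending']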
        """
        if not isinstance(msg_ids, (list,tuple)):
            msg_ids = [msg_ids]

        theids = []
        for msg_id in msg_ids:
            if isinstance(msg_id, int):
                msg_id = self.history[msg_id]
            if not isinstance(msg_id, basestring):
                raise TypeError("msg_ids must be str, not %r"%msg_id)
            theids.append(msg_id)

        completed = []
        local_results = {}

        # comment this block out to temporarily disable local shortcut:
        # (iterate over a copy, since cached ids are removed from theids)
        for msg_id in theids[:]:
            if msg_id in self.results:
                completed.append(msg_id)
                local_results[msg_id] = self.results[msg_id]
                theids.remove(msg_id)

        if theids: # some not locally cached
            content = dict(msg_ids=theids, status_only=status_only)
            msg = self.session.send(self._query_socket, "result_request", content=content)
            zmq.select([self._query_socket], [], [])
            idents,msg = self.session.recv(self._query_socket, zmq.NOBLOCK)
            if self.debug:
                pprint(msg)
            content = msg['content']
            if content['status'] != 'ok':
                raise self._unwrap_exception(content)
            buffers = msg['buffers']
        else:
            content = dict(completed=[],pending=[])

        content['completed'].extend(completed)

        if status_only:
            return content

        failures = []
        # load cached results into result:
        content.update(local_results)

        # update cache with results:
        for msg_id in sorted(theids):
            if msg_id in content['completed']:
                rec = content[msg_id]
                parent = rec['header']
                header = rec['result_header']
                rcontent = rec['result_content']
                iodict = rec['io']
                if isinstance(rcontent, str):
                    rcontent = self.session.unpack(rcontent)

                md = self.metadata[msg_id]
                md_msg = dict(
                    content=rcontent,
                    parent_header=parent,
                    header=header,
                    metadata=rec['result_metadata'],
                )
                md.update(self._extract_metadata(md_msg))
                if rec.get('received'):
                    md['received'] = rec['received']
                md.update(iodict)

                if rcontent['status'] == 'ok':
                    if header['msg_type'] == 'apply_reply':
                        res,buffers = serialize.unserialize_object(buffers)
                    elif header['msg_type'] == 'execute_reply':
                        res = ExecuteReply(msg_id, rcontent, md)
                    else:
                        raise KeyError("unhandled msg type: %r" % header['msg_type'])
                else:
                    res = self._unwrap_exception(rcontent)
                    failures.append(res)

                self.results[msg_id] = res
                content[msg_id] = res

        if len(theids) == 1 and failures:
            raise failures[0]

        error.collect_exceptions(failures, "result_status")
        return content

    @spin_first
    def queue_status(self, targets='all', verbose=False):
        """Fetch the status of engine queues.

        Parameters
        ----------

        targets : int/str/list of ints/strs
            the engines whose states are to be queried.
            default : all
        verbose : bool
            Whether to return lengths only, or lists of ids for each element
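
        Examples
        --------
        Illustrative only (assumes a connected Client ``rc``)::

            In [1]: rc.queue_status()                  # status for every engine

            In [2]: rc.queue_status(0, verbose=True)   # msg_ids queued on engine 0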
        """
        if targets == 'all':
            # allow 'all' to be lazily evaluated by the Hub
            engine_ids = None
        else:
            engine_ids = self._build_targets(targets)[1]
        content = dict(targets=engine_ids, verbose=verbose)
        self.session.send(self._query_socket, "queue_request", content=content)
        idents,msg = self.session.recv(self._query_socket, 0)
        if self.debug:
            pprint(msg)
        content = msg['content']
        status = content.pop('status')
        if status != 'ok':
            raise self._unwrap_exception(content)
        content = rekey(content)
        if isinstance(targets, int):
            return content[targets]
        else:
            return content

    def _build_msgids_from_target(self, targets=None):
        """Build a list of msg_ids from the list of engine targets"""
        if not targets: # needed as _build_targets otherwise uses all engines
            return []
        target_ids = self._build_targets(targets)[0]
        return filter(lambda md_id: self.metadata[md_id]["engine_uuid"] in target_ids, self.metadata)

    def _build_msgids_from_jobs(self, jobs=None):
        """Build a list of msg_ids from "jobs" """
        if not jobs:
            return []
        msg_ids = []
        if isinstance(jobs, (basestring,AsyncResult)):
            jobs = [jobs]
        bad_ids = filter(lambda obj: not isinstance(obj, (basestring, AsyncResult)), jobs)
        if bad_ids:
            raise TypeError("Invalid msg_id type %r, expected str or AsyncResult"%bad_ids[0])
        for j in jobs:
            if isinstance(j, AsyncResult):
                msg_ids.extend(j.msg_ids)
            else:
                msg_ids.append(j)
        return msg_ids

    def purge_local_results(self, jobs=[], targets=[]):
        """Clears the client caches of results and frees up the corresponding memory.

        Individual results can be purged by msg_id, or the entire
        history of specific targets can be purged.

        Use `purge_local_results('all')` to scrub everything from the Client's db.

        The client must have no outstanding tasks before purging the caches.
        Raises `AssertionError` if there are still outstanding tasks.

        After this call all `AsyncResults` are invalid and should be discarded.

        If you must "reget" the results, you can still do so by using
        `client.get_result(msg_id)` or `client.get_result(asyncresult)`. This will
        redownload the results from the hub if they are still available
        (i.e. `client.purge_hub_results(...)` has not been called).

        Parameters
        ----------

        jobs : str or list of str or AsyncResult objects
            the msg_ids whose results should be purged.
        targets : int/str/list of ints/strs
            The targets, by int_id, whose entire results are to be purged.

        default : None
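
        Examples
        --------
        Illustrative only (assumes a connected Client ``rc`` with no outstanding
        tasks, and a finished AsyncResult ``ar``)::

            In [1]: rc.purge_local_results(jobs=ar)   # drop just these results

            In [2]: rc.purge_local_results('all')     # empty the local caches entirely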
""" | ||||
assert not self.outstanding, "Can't purge a client with outstanding tasks!" | ||||
if not targets and not jobs: | ||||
raise ValueError("Must specify at least one of `targets` and `jobs`") | ||||
if jobs == 'all': | ||||
self.results.clear() | ||||
self.metadata.clear() | ||||
return | ||||
else: | ||||
msg_ids = [] | ||||
msg_ids.extend(self._build_msgids_from_target(targets)) | ||||
msg_ids.extend(self._build_msgids_from_jobs(jobs)) | ||||
map(self.results.pop, msg_ids) | ||||
map(self.metadata.pop, msg_ids) | ||||

    @spin_first
    def purge_hub_results(self, jobs=[], targets=[]):
        """Tell the Hub to forget results.

        Individual results can be purged by msg_id, or the entire
        history of specific targets can be purged.

        Use `purge_hub_results('all')` to scrub everything from the Hub's db.

        Parameters
        ----------

        jobs : str or list of str or AsyncResult objects
            the msg_ids whose results should be forgotten.
        targets : int/str/list of ints/strs
            The targets, by int_id, whose entire history is to be purged.

        default : None
""" | ||||
MinRK
|
r3639 | if not targets and not jobs: | ||
raise ValueError("Must specify at least one of `targets` and `jobs`") | ||||
MinRK
|
r3555 | if targets: | ||
targets = self._build_targets(targets)[1] | ||||
Bernardo B. Marques
|
r4872 | |||
MinRK
|
r3639 | # construct msg_ids from jobs | ||
MinRK
|
r4146 | if jobs == 'all': | ||
msg_ids = jobs | ||||
else: | ||||
Jan Schulz
|
r8409 | msg_ids = self._build_msgids_from_jobs(jobs) | ||
MinRK
|
r4146 | |||
content = dict(engine_ids=targets, msg_ids=msg_ids) | ||||
MinRK
|
r3555 | self.session.send(self._query_socket, "purge_request", content=content) | ||
idents, msg = self.session.recv(self._query_socket, 0) | ||||
if self.debug: | ||||
pprint(msg) | ||||
content = msg['content'] | ||||
if content['status'] != 'ok': | ||||
MinRK
|
r3639 | raise self._unwrap_exception(content) | ||
MinRK
|
r3548 | |||

    def purge_results(self, jobs=[], targets=[]):
        """Clears the cached results from both the hub and the local client

        Individual results can be purged by msg_id, or the entire
        history of specific targets can be purged.

        Use `purge_results('all')` to scrub every cached result from both the Hub's and
        the Client's db.

        Equivalent to calling both `purge_hub_results()` and `purge_local_results()` with
        the same arguments.

        Parameters
        ----------

        jobs : str or list of str or AsyncResult objects
            the msg_ids whose results should be forgotten.
        targets : int/str/list of ints/strs
            The targets, by int_id, whose entire history is to be purged.

        default : None
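
        Examples
        --------
        Illustrative only (assumes a connected Client ``rc`` with no outstanding tasks)::

            In [1]: rc.purge_results('all')           # forget everything, locally and on the Hub

            In [2]: rc.purge_results(targets=[0, 1])  # forget the history of engines 0 and 1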
""" | ||||
self.purge_local_results(jobs=jobs, targets=targets) | ||||
self.purge_hub_results(jobs=jobs, targets=targets) | ||||
def purge_everything(self): | ||||
"""Clears all content from previous Tasks from both the hub and the local client | ||||
In addition to calling `purge_results("all")` it also deletes the history and | ||||
other bookkeeping lists. | ||||
""" | ||||
self.purge_results("all") | ||||
self.history = [] | ||||
self.session.digest_history.clear() | ||||

    @spin_first
    def hub_history(self):
        """Get the Hub's history

        Just like the Client, the Hub has a history, which is a list of msg_ids.
        This will contain the history of all clients, and, depending on configuration,
        may contain history across multiple cluster sessions.

        Any msg_id returned here is a valid argument to `get_result`.

        Returns
        -------

        msg_ids : list of strs
            list of all msg_ids, ordered by task submission time.
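
        Examples
        --------
        Illustrative only (assumes a connected Client ``rc`` and that the Hub still
        holds the corresponding records)::

            In [1]: msg_ids = rc.hub_history()

            In [2]: ar = rc.get_result(msg_ids[-1])   # fetch the most recent task from the Hub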
""" | ||||
Bernardo B. Marques
|
r4872 | |||
MinRK
|
r3780 | self.session.send(self._query_socket, "history_request", content={}) | ||
idents, msg = self.session.recv(self._query_socket, 0) | ||||
Bernardo B. Marques
|
r4872 | |||
MinRK
|
r3780 | if self.debug: | ||
pprint(msg) | ||||
content = msg['content'] | ||||
if content['status'] != 'ok': | ||||
raise self._unwrap_exception(content) | ||||
else: | ||||
return content['history'] | ||||

    @spin_first
    def db_query(self, query, keys=None):
        """Query the Hub's TaskRecord database

        This will return a list of task record dicts that match `query`

        Parameters
        ----------

        query : mongodb query dict
            The search dict. See mongodb query docs for details.
        keys : list of strs [optional]
            The subset of keys to be returned. The default is to fetch everything but buffers.
            'msg_id' will *always* be included.
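
        Examples
        --------
        Illustrative only; the exact record keys and supported operators depend on
        the Hub's db backend (assumes a connected Client ``rc``)::

            In [1]: recs = rc.db_query({'completed': {'$ne': None}},
               ...:                    keys=['msg_id', 'submitted', 'completed'])

            In [2]: [r['msg_id'] for r in recs]   # ids of all finished tasks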
""" | ||||
MinRK
|
r3876 | if isinstance(keys, basestring): | ||
keys = [keys] | ||||
MinRK
|
r3780 | content = dict(query=query, keys=keys) | ||
self.session.send(self._query_socket, "db_request", content=content) | ||||
idents, msg = self.session.recv(self._query_socket, 0) | ||||
if self.debug: | ||||
pprint(msg) | ||||
content = msg['content'] | ||||
if content['status'] != 'ok': | ||||
raise self._unwrap_exception(content) | ||||
Bernardo B. Marques
|
r4872 | |||
MinRK
|
r3780 | records = content['records'] | ||
Bernardo B. Marques
|
r4872 | |||
MinRK
|
r3780 | buffer_lens = content['buffer_lens'] | ||
result_buffer_lens = content['result_buffer_lens'] | ||||
buffers = msg['buffers'] | ||||
has_bufs = buffer_lens is not None | ||||
has_rbufs = result_buffer_lens is not None | ||||
for i,rec in enumerate(records): | ||||
# relink buffers | ||||
if has_bufs: | ||||
blen = buffer_lens[i] | ||||
rec['buffers'], buffers = buffers[:blen],buffers[blen:] | ||||
if has_rbufs: | ||||
blen = result_buffer_lens[i] | ||||
rec['result_buffers'], buffers = buffers[:blen],buffers[blen:] | ||||
Bernardo B. Marques
|
r4872 | |||
MinRK
|
r3780 | return records | ||
MinRK
|
r3587 | |||
MinRK
|
r3666 | __all__ = [ 'Client' ] | ||