client.py
1628 lines
| 58.3 KiB
| text/x-python
|
PythonLexer
MinRK
|
r4018 | """A semi-synchronous Client for the ZMQ cluster | ||
Authors: | ||||
* MinRK | ||||
""" | ||||
MinRK
|
r3551 | #----------------------------------------------------------------------------- | ||
MinRK
|
r4018 | # Copyright (C) 2010-2011 The IPython Development Team | ||
MinRK
|
r3551 | # | ||
# Distributed under the terms of the BSD License. The full license is in | ||||
# the file COPYING, distributed as part of this software. | ||||
#----------------------------------------------------------------------------- | ||||
MinRK
|
r3539 | |||
MinRK
|
r3551 | #----------------------------------------------------------------------------- | ||
# Imports | ||||
#----------------------------------------------------------------------------- | ||||
MinRK
|
r3539 | |||
MinRK
|
r3575 | import os | ||
MinRK
|
r3631 | import json | ||
MinRK
|
r4155 | import sys | ||
MinRK
|
r6484 | from threading import Thread, Event | ||
MinRK
|
r3551 | import time | ||
MinRK
|
r3631 | import warnings | ||
from datetime import datetime | ||||
MinRK
|
r3581 | from getpass import getpass | ||
MinRK
|
r3540 | from pprint import pprint | ||
MinRK
|
r3631 | |||
MinRK
|
r3614 | pjoin = os.path.join | ||
MinRK
|
r3540 | |||
MinRK
|
r3551 | import zmq | ||
MinRK
|
r3635 | # from zmq.eventloop import ioloop, zmqstream | ||
MinRK
|
r3551 | |||
MinRK
|
r4071 | from IPython.config.configurable import MultipleInstanceError | ||
from IPython.core.application import BaseIPythonApplication | ||||
MinRK
|
r4036 | from IPython.utils.jsonutil import rekey | ||
MinRK
|
r4108 | from IPython.utils.localinterfaces import LOCAL_IPS | ||
MinRK
|
r3614 | from IPython.utils.path import get_ipython_dir | ||
MinRK
|
r6813 | from IPython.utils.py3compat import cast_bytes | ||
MinRK
|
r5344 | from IPython.utils.traitlets import (HasTraits, Integer, Instance, Unicode, | ||
MinRK
|
r6484 | Dict, List, Bool, Set, Any) | ||
MinRK
|
r3539 | from IPython.external.decorator import decorator | ||
MinRK
|
r3619 | from IPython.external.ssh import tunnel | ||
MinRK
|
r3539 | |||
MinRK
|
r5821 | from IPython.parallel import Reference | ||
MinRK
|
r3673 | from IPython.parallel import error | ||
from IPython.parallel import util | ||||
MinRK
|
r4006 | from IPython.zmq.session import Session, Message | ||
MinRK
|
r3666 | from .asyncresult import AsyncResult, AsyncHubResult | ||
MinRK
|
r4024 | from IPython.core.profiledir import ProfileDir, ProfileDirError | ||
MinRK
|
r3642 | from .view import DirectView, LoadBalancedView | ||
MinRK
|
r3539 | |||
MinRK
|
r4155 | if sys.version_info[0] >= 3: | ||
MinRK
|
r4161 | # xrange is used in a couple 'isinstance' tests in py2 | ||
MinRK
|
r4155 | # should be just 'range' in 3k | ||
xrange = range | ||||
MinRK
|
r3584 | #-------------------------------------------------------------------------- | ||
MinRK
|
r3555 | # Decorators for Client methods | ||
#-------------------------------------------------------------------------- | ||||
MinRK
|
@decorator
def spin_first(f, self, *args, **kwargs):
    """Synchronize client state, then run the wrapped method.

    ``self.spin()`` is invoked before ``f`` so that the wrapped method
    sees whatever state ``spin`` has refreshed.  The wrapped method's
    return value is passed through unchanged.
    """
    self.spin()
    result = f(self, *args, **kwargs)
    return result
MinRK
|
r3598 | |||
#-------------------------------------------------------------------------- | ||||
# Classes | ||||
#-------------------------------------------------------------------------- | ||||
MinRK
|
r6819 | |||
class ExecuteReply(object):
    """Wrapper for finished Execute results.

    Parameters
    ----------
    msg_id : the id of the message this reply belongs to
    content : dict
        reply content; must contain the key ``'execution_count'``
    metadata : dict-like
        metadata of the reply; its keys are exposed through both item
        access (``reply['status']``) and attribute access
        (``reply.status``).  The ``'pyout'`` entry (``None`` or a dict with
        a ``'data'`` mimebundle) feeds ``repr()`` and the ``_repr_*_``
        rich-display hooks.
    """

    def __init__(self, msg_id, content, metadata):
        self.msg_id = msg_id
        self._content = content
        self.execution_count = content['execution_count']
        self.metadata = metadata

    def __getitem__(self, key):
        return self.metadata[key]

    def __getattr__(self, key):
        # Go through __dict__ rather than ``self.metadata``: if ``metadata``
        # has not been assigned yet (bare instance, copy, unpickling),
        # ``self.metadata`` would re-enter __getattr__ and recurse forever.
        metadata = self.__dict__.get('metadata')
        if metadata is None or key not in metadata:
            raise AttributeError(key)
        return metadata[key]

    def __repr__(self):
        pyout = self.metadata['pyout'] or {}
        text_out = pyout.get('data', {}).get('text/plain', '')
        if len(text_out) > 32:
            # keep the repr short: 29 characters plus an ellipsis
            text_out = text_out[:29] + '...'
        return "<ExecuteReply[%i]: %s>" % (self.execution_count, text_out)

    def _pyout_data(self, mimetype):
        """Return the pyout representation for `mimetype`, or None."""
        pyout = self.metadata['pyout'] or {'data':{}}
        return pyout['data'].get(mimetype)

    def _repr_html_(self):
        return self._pyout_data("text/html")

    def _repr_latex_(self):
        return self._pyout_data("text/latex")

    def _repr_json_(self):
        return self._pyout_data("application/json")

    def _repr_javascript_(self):
        return self._pyout_data("application/javascript")

    def _repr_png_(self):
        return self._pyout_data("image/png")

    def _repr_jpeg_(self):
        return self._pyout_data("image/jpeg")

    def _repr_svg_(self):
        return self._pyout_data("image/svg+xml")
MinRK
|
r6819 | |||
MinRK
|
class Metadata(dict):
    """Subclass of dict for initializing metadata values.

    Attribute access works on keys.

    These objects have a strict set of keys - errors will raise if you try
    to add new keys through item or attribute assignment.  Values passed to
    the constructor are merged with ``dict.update`` and are therefore not
    subject to that check.
    """

    def __init__(self, *args, **kwargs):
        dict.__init__(self)
        # a fresh dict per instance, so the mutable 'outputs' list is
        # never shared between Metadata objects
        md = {'msg_id' : None,
              'submitted' : None,
              'started' : None,
              'completed' : None,
              'received' : None,
              'engine_uuid' : None,
              'engine_id' : None,
              'follow' : None,
              'after' : None,
              'status' : None,
              'pyin' : None,
              'pyout' : None,
              'pyerr' : None,
              'stdout' : '',
              'stderr' : '',
              'outputs' : [],
              }
        self.update(md)
        self.update(dict(*args, **kwargs))

    def __getattr__(self, key):
        """getattr aliased to getitem"""
        # direct lookup: no O(n) key scan, and no dependence on the
        # Python 2-only ``iterkeys`` (which would re-enter __getattr__)
        try:
            return self[key]
        except KeyError:
            raise AttributeError(key)

    def __setattr__(self, key, value):
        """setattr aliased to setitem, with strict"""
        if key in self:
            self[key] = value
        else:
            raise AttributeError(key)

    def __setitem__(self, key, value):
        """strict static key enforcement"""
        if key in self:
            dict.__setitem__(self, key, value)
        else:
            raise KeyError(key)
Bernardo B. Marques
|
r4872 | |||
MinRK
|
r3602 | |||
MinRK
|
r3636 | class Client(HasTraits): | ||
MinRK
|
r3664 | """A semi-synchronous client to the IPython ZMQ cluster | ||
Bernardo B. Marques
|
r4872 | |||
MinRK
|
r3555 | Parameters | ||
---------- | ||||
Bernardo B. Marques
|
r4872 | |||
MinRK
|
r4071 | url_or_file : bytes or unicode; zmq url or path to ipcontroller-client.json | ||
MinRK
|
r3619 | Connection information for the Hub's registration. If a json connector | ||
file is given, then likely no further configuration is necessary. | ||||
MinRK
|
r3620 | [Default: use profile] | ||
profile : bytes | ||||
The name of the Cluster profile to be used to find connector information. | ||||
MinRK
|
r4071 | If run from an IPython application, the default profile will be the same | ||
as the running application, otherwise it will be 'default'. | ||||
Min RK
|
r3572 | context : zmq.Context | ||
MinRK
|
r3620 | Pass an existing zmq.Context instance, otherwise the client will create its own. | ||
Min RK
|
r3572 | debug : bool | ||
flag for lots of message printing for debug purposes | ||||
Bernardo B. Marques
|
r4872 | timeout : int/float | ||
MinRK
|
r4013 | time (in seconds) to wait for connection replies from the Hub | ||
[Default: 10] | ||||
Bernardo B. Marques
|
r4872 | |||
MinRK
|
r4013 | #-------------- session related args ---------------- | ||
Bernardo B. Marques
|
r4872 | |||
MinRK
|
r4013 | config : Config object | ||
If specified, this will be relayed to the Session for configuration | ||||
username : str | ||||
set username for the session object | ||||
packer : str (import_string) or callable | ||||
Can be either the simple keyword 'json' or 'pickle', or an import_string to a | ||||
function to serialize messages. Must support same input as | ||||
JSON, and output must be bytes. | ||||
You can pass a callable directly as `pack` | ||||
unpacker : str (import_string) or callable | ||||
The inverse of packer. Only necessary if packer is specified as *not* one | ||||
of 'json' or 'pickle'. | ||||
Bernardo B. Marques
|
r4872 | |||
Min RK
|
r3572 | #-------------- ssh related args ---------------- | ||
# These are args for configuring the ssh tunnel to be used | ||||
# credentials are used to forward connections over ssh to the Controller | ||||
# Note that the ip given in `addr` needs to be relative to sshserver | ||||
# The most basic case is to leave addr as pointing to localhost (127.0.0.1), | ||||
Bernardo B. Marques
|
r4872 | # and set sshserver as the same machine the Controller is on. However, | ||
Min RK
|
r3572 | # the only requirement is that sshserver is able to see the Controller | ||
# (i.e. is within the same trusted network). | ||||
Bernardo B. Marques
|
r4872 | |||
Min RK
|
r3572 | sshserver : str | ||
A string of the form passed to ssh, i.e. 'server.tld' or 'user@server.tld:port' | ||||
If keyfile or password is specified, and this is not, it will default to | ||||
the ip given in addr. | ||||
MinRK
|
r4589 | sshkey : str; path to ssh private key file | ||
Min RK
|
r3572 | This specifies a key to be used in ssh login, default None. | ||
Regular default ssh keys will be used without specifying this argument. | ||||
Bernardo B. Marques
|
r4872 | password : str | ||
Min RK
|
r3572 | Your ssh password to sshserver. Note that if this is left None, | ||
you will be prompted for it if passwordless key based login is unavailable. | ||||
MinRK
|
r3619 | paramiko : bool | ||
flag for whether to use paramiko instead of shell ssh for tunneling. | ||||
[default: True on win32, False else] | ||||
Bernardo B. Marques
|
r4872 | |||
MinRK
|
r3664 | ------- exec authentication args ------- | ||
If even localhost is untrusted, you can have some protection against | ||||
Bernardo B. Marques
|
r4872 | unauthorized execution by signing messages with HMAC digests. | ||
Messages are still sent as cleartext, so if someone can snoop your | ||||
MinRK
|
r4013 | loopback traffic this will not protect your privacy, but will prevent | ||
unauthorized execution. | ||||
Bernardo B. Marques
|
r4872 | |||
MinRK
|
r3575 | exec_key : str | ||
an authentication key or file containing a key | ||||
default: None | ||||
Bernardo B. Marques
|
r4872 | |||
MinRK
|
r3539 | Attributes | ||
---------- | ||||
Bernardo B. Marques
|
r4872 | |||
MinRK
|
r3664 | ids : list of int engine IDs | ||
MinRK
|
r3539 | requesting the ids attribute always synchronizes | ||
the registration state. To request ids without synchronization, | ||||
MinRK
|
r3575 | use semi-private _ids attributes. | ||
Bernardo B. Marques
|
r4872 | |||
MinRK
|
r3539 | history : list of msg_ids | ||
a list of msg_ids, keeping track of all the execution | ||||
MinRK
|
r3555 | messages you have submitted in order. | ||
Bernardo B. Marques
|
r4872 | |||
MinRK
|
r3539 | outstanding : set of msg_ids | ||
a set of msg_ids that have been submitted, but whose | ||||
MinRK
|
r3555 | results have not yet been received. | ||
Bernardo B. Marques
|
r4872 | |||
MinRK
|
r3539 | results : dict | ||
a dict of all our results, keyed by msg_id | ||||
Bernardo B. Marques
|
r4872 | |||
MinRK
|
r3539 | block : bool | ||
determines default behavior when block not specified | ||||
in execution methods | ||||
Bernardo B. Marques
|
r4872 | |||
MinRK
|
r3539 | Methods | ||
------- | ||||
Bernardo B. Marques
|
r4872 | |||
MinRK
|
r3635 | spin | ||
flushes incoming results and registration state changes | ||||
control methods spin, and requesting `ids` also ensures up to date | ||||
Bernardo B. Marques
|
r4872 | |||
MinRK
|
r3664 | wait | ||
MinRK
|
r3635 | wait on one or more msg_ids | ||
Bernardo B. Marques
|
r4872 | |||
MinRK
|
r3635 | execution methods | ||
apply | ||||
MinRK
|
r3539 | legacy: execute, run | ||
Bernardo B. Marques
|
r4872 | |||
MinRK
|
r3664 | data movement | ||
push, pull, scatter, gather | ||||
Bernardo B. Marques
|
r4872 | |||
MinRK
|
r3635 | query methods | ||
MinRK
|
r3664 | queue_status, get_result, purge, result_status | ||
Bernardo B. Marques
|
r4872 | |||
MinRK
|
r3635 | control methods | ||
abort, shutdown | ||||
Bernardo B. Marques
|
r4872 | |||
MinRK
|
r3539 | """ | ||
Bernardo B. Marques
|
r4872 | |||
MinRK
|
# default blocking behavior when `block` is not given to execution methods
block = Bool(False)
# msg_ids that have been submitted but whose results are not yet received
outstanding = Set()
# all results, keyed by msg_id
results = Instance('collections.defaultdict', (dict,))
# defaultdict whose missing entries are created as Metadata instances
metadata = Instance('collections.defaultdict', (Metadata,))
# msg_ids of the submitted execution messages, in order
history = List()
# when set, messages are printed for debugging
debug = Bool(False)
# NOTE(review): presumably the background spin thread and the Event used
# to stop it (__init__ assigns an Event to _stop_spinning) - confirm
_spin_thread = Any()
_stop_spinning = Any()

# profile name; its dynamic default comes from _profile_default()
profile = Unicode()
def _profile_default(self):
    """Return the default value for the `profile` trait.

    When an IPython application has been initialized its profile is
    reused; in every other case the answer is ``u'default'``.
    """
    if not BaseIPythonApplication.initialized():
        return u'default'
    # an IPython app *might* be running, try to get its profile
    try:
        return BaseIPythonApplication.instance().profile
    except (AttributeError, MultipleInstanceError):
        # could be a *different* subclass of config.Application,
        # which would raise one of these two errors.
        return u'default'
Bernardo B. Marques
|
r4872 | |||
MinRK
|
# outstanding msg_ids grouped per engine; indexed by engine uuid
_outstanding_dict = Instance('collections.defaultdict', (set,))
# sorted list of the engine ids known to this client
_ids = List()
# set by _connect() once the hub connection has been started
_connected = Bool(False)
# True when sshserver, sshkey or password was supplied or discovered
_ssh = Bool(False)
_context = Instance('zmq.Context')
# connection info (from the JSON connection file or the url)
_config = Dict()
# engine id -> uuid mapping; also queried by uuid elsewhere in the class
_engines = Instance(util.ReverseDict, (), {})
# _hub_socket=Instance('zmq.Socket')
_query_socket = Instance('zmq.Socket')
_control_socket = Instance('zmq.Socket')
_iopub_socket = Instance('zmq.Socket')
_notification_socket = Instance('zmq.Socket')
_mux_socket = Instance('zmq.Socket')
_task_socket = Instance('zmq.Socket')
# task scheduler scheme reported by the hub (compared against 'pure')
_task_scheme = Unicode()
_closed = False
_ignored_control_replies = Integer(0)
_ignored_hub_replies = Integer(0)
Bernardo B. Marques
|
r4872 | |||
MinRK
|
def __new__(cls, *args, **kw):
    """Create the instance, forwarding only keyword arguments.

    Positional arguments are accepted here and dropped so that
    ``HasTraits.__new__`` does not raise on them; ``__init__`` still
    receives them.
    """
    # don't raise on positional args
    return HasTraits.__new__(cls, **kw)
Bernardo B. Marques
|
r4872 | |||
MinRK
|
def __init__(self, url_or_file=None, profile=None, profile_dir=None, ipython_dir=None,
        context=None, debug=False, exec_key=None,
        sshserver=None, sshkey=None, password=None, paramiko=None,
        timeout=10, **extra_args
        ):
    """Locate the connection info, build the Session and connect to the Hub.

    The arguments are described in the class docstring.  `url_or_file`
    may be a zmq url or the path of a JSON connection file; when it is
    omitted, ``ipcontroller-client.json`` in the profile's security dir is
    used.  Extra keyword arguments are passed on to ``Session``.

    Raises
    ------
    ValueError : neither `url_or_file` nor a usable profile dir is available
    IOError : the connection file cannot be found
    """
    if profile:
        super(Client, self).__init__(debug=debug, profile=profile)
    else:
        super(Client, self).__init__(debug=debug)
    if context is None:
        context = zmq.Context.instance()
    self._context = context
    self._stop_spinning = Event()

    self._setup_profile_dir(self.profile, profile_dir, ipython_dir)
    if self._cd is not None:
        if url_or_file is None:
            url_or_file = pjoin(self._cd.security_dir, 'ipcontroller-client.json')
    if url_or_file is None:
        raise ValueError(
            "I can't find enough information to connect to a hub!"
            " Please specify at least one of url_or_file or profile."
        )

    if not util.is_url(url_or_file):
        # it's not a url, try for a file
        if not os.path.exists(url_or_file):
            if self._cd:
                # retry relative to the profile's security dir
                url_or_file = os.path.join(self._cd.security_dir, url_or_file)
            if not os.path.exists(url_or_file):
                raise IOError("Connection file not found: %r" % url_or_file)
        with open(url_or_file) as f:
            cfg = json.load(f)
    else:
        cfg = {'url':url_or_file}

    # sync defaults from args, json:
    if sshserver:
        cfg['ssh'] = sshserver
    if exec_key:
        cfg['exec_key'] = exec_key
    # a bare url gives a cfg with no 'exec_key'/'ssh' entries, so these
    # lookups must tolerate missing keys
    exec_key = cfg.get('exec_key')
    location = cfg.setdefault('location', None)
    cfg['url'] = util.disambiguate_url(cfg['url'], location)
    url = cfg['url']
    proto,addr,port = util.split_url(url)
    if location is not None and addr == '127.0.0.1':
        # location specified, and connection is expected to be local
        if location not in LOCAL_IPS and not sshserver:
            # load ssh from JSON *only* if the controller is not on
            # this machine
            sshserver = cfg.get('ssh')
        if location not in LOCAL_IPS and not sshserver:
            # warn if no ssh specified, but SSH is probably needed
            # This is only a warning, because the most likely cause
            # is a local Controller on a laptop whose IP is dynamic
            warnings.warn("""
Controller appears to be listening on localhost, but not on this machine.
If this is true, you should specify Client(...,sshserver='you@%s')
or instruct your controller to listen on an external IP."""%location,
            RuntimeWarning)
    elif not sshserver:
        # otherwise sync with cfg
        sshserver = cfg.get('ssh')

    self._config = cfg

    self._ssh = bool(sshserver or sshkey or password)
    if self._ssh and not sshserver:
        # key/password given without a server: default to ssh via the
        # host named in the url
        sshserver = url.split('://')[1].split(':')[0]
    if self._ssh and password is None:
        if tunnel.try_passwordless_ssh(sshserver, sshkey, paramiko):
            password=False
        else:
            password = getpass("SSH Password for %s: "%sshserver)
    ssh_kwargs = dict(keyfile=sshkey, password=password, paramiko=paramiko)

    # configure and construct the session
    if exec_key is not None:
        if os.path.isfile(exec_key):
            extra_args['keyfile'] = exec_key
        else:
            exec_key = cast_bytes(exec_key)
            extra_args['key'] = exec_key
    self.session = Session(**extra_args)

    self._query_socket = self._context.socket(zmq.DEALER)
    self._query_socket.setsockopt(zmq.IDENTITY, self.session.bsession)
    if self._ssh:
        tunnel.tunnel_connection(self._query_socket, url, sshserver, **ssh_kwargs)
    else:
        self._query_socket.connect(url)

    self.session.debug = self.debug

    self._notification_handlers = {'registration_notification' : self._register_engine,
                                'unregistration_notification' : self._unregister_engine,
                                'shutdown_notification' : lambda msg: self.close(),
                                }
    self._queue_handlers = {'execute_reply' : self._handle_execute_reply,
                            'apply_reply' : self._handle_apply_reply}
    self._connect(sshserver, ssh_kwargs, timeout)
Bernardo B. Marques
|
r4872 | |||
MinRK
|
def __del__(self):
    """Close the client when it is garbage collected.

    Per the original note this cleans up the sockets but deliberately
    leaves the zmq context alone.
    """
    self.close()
Bernardo B. Marques
|
r4872 | |||
MinRK
|
r3992 | def _setup_profile_dir(self, profile, profile_dir, ipython_dir): | ||
MinRK
|
r3614 | if ipython_dir is None: | ||
ipython_dir = get_ipython_dir() | ||||
MinRK
|
r3992 | if profile_dir is not None: | ||
MinRK
|
r3614 | try: | ||
MinRK
|
r3992 | self._cd = ProfileDir.find_profile_dir(profile_dir) | ||
MinRK
|
r3641 | return | ||
MinRK
|
r3992 | except ProfileDirError: | ||
MinRK
|
r3614 | pass | ||
elif profile is not None: | ||||
try: | ||||
MinRK
|
r3992 | self._cd = ProfileDir.find_profile_dir_by_name( | ||
MinRK
|
r3614 | ipython_dir, profile) | ||
MinRK
|
r3641 | return | ||
MinRK
|
r3992 | except ProfileDirError: | ||
MinRK
|
r3614 | pass | ||
MinRK
|
r3641 | self._cd = None | ||
Bernardo B. Marques
|
r4872 | |||
MinRK
|
r3539 | def _update_engines(self, engines): | ||
MinRK
|
r3555 | """Update our engines dict and _ids from a dict of the form: {id:uuid}.""" | ||
MinRK
|
r3539 | for k,v in engines.iteritems(): | ||
eid = int(k) | ||||
MinRK
|
r4155 | self._engines[eid] = v | ||
MinRK
|
r3625 | self._ids.append(eid) | ||
self._ids = sorted(self._ids) | ||||
MinRK
|
r3622 | if sorted(self._engines.keys()) != range(len(self._engines)) and \ | ||
MinRK
|
r3664 | self._task_scheme == 'pure' and self._task_socket: | ||
MinRK
|
r3622 | self._stop_scheduling_tasks() | ||
Bernardo B. Marques
|
r4872 | |||
MinRK
|
r3622 | def _stop_scheduling_tasks(self): | ||
"""Stop scheduling tasks because an engine has been unregistered | ||||
from a pure ZMQ scheduler. | ||||
""" | ||||
MinRK
|
r3664 | self._task_socket.close() | ||
self._task_socket = None | ||||
MinRK
|
r3622 | msg = "An engine has been unregistered, and we are using pure " +\ | ||
"ZMQ task scheduling. Task farming will be disabled." | ||||
if self.outstanding: | ||||
msg += " If you were running tasks when this happened, " +\ | ||||
"some `outstanding` msg_ids may never resolve." | ||||
warnings.warn(msg, RuntimeWarning) | ||||
Bernardo B. Marques
|
r4872 | |||
MinRK
|
def _build_targets(self, targets):
    """Turn valid target IDs or 'all' into two lists:
    (uuids, int_ids).

    `targets` may be None or the string 'all' (every known engine), a
    single int (negative values index into ``self.ids``), a slice over
    the known ids, or a tuple/list/xrange of ints.

    Raises
    ------
    NoEnginesRegistered : no engines are known, even after a sync
    TypeError : a string other than 'all', or an unsupported type
    IndexError : an int that is not a registered engine id
    """
    if not self._ids:
        # flush notification socket if no engines yet, just in case
        if not self.ids:
            raise error.NoEnginesRegistered("Can't build targets without any engines")

    if targets is None:
        targets = self._ids
    elif isinstance(targets, basestring):
        if targets.lower() == 'all':
            targets = self._ids
        else:
            raise TypeError("%r not valid str target, must be 'all'"%(targets))
    elif isinstance(targets, int):
        if targets < 0:
            # negative ints count from the end of the id list
            targets = self.ids[targets]
        if targets not in self._ids:
            raise IndexError("No such engine: %i"%targets)
        targets = [targets]

    if isinstance(targets, slice):
        # a slice selects by position in the id list, not by id value
        indices = range(len(self._ids))[targets]
        ids = self.ids
        targets = [ ids[i] for i in indices ]

    if not isinstance(targets, (tuple, list, xrange)):
        raise TypeError("targets by int/slice/collection of ints only, not %s"%(type(targets)))

    return [cast_bytes(self._engines[t]) for t in targets], list(targets)
Bernardo B. Marques
|
r4872 | |||
MinRK
|
def _connect(self, sshserver, ssh_kwargs, timeout):
    """setup all our socket connections to the cluster. This is called from
    __init__.

    A 'connection_request' is sent over the query socket; the reply names
    the addresses of the mux, task, notification, control and iopub
    channels, and a socket is created and connected for each one present.
    The engines listed in the reply are registered last.

    Parameters
    ----------
    sshserver : ssh server used when tunneling (only used if ``self._ssh``)
    ssh_kwargs : dict of keyword arguments for ``tunnel.tunnel_connection``
    timeout : seconds to wait for the Hub's reply

    Raises
    ------
    error.TimeoutError : the Hub did not reply within `timeout` seconds
    Exception : the Hub replied with a status other than 'ok'
    """

    # Maybe allow reconnecting?
    if self._connected:
        return
    self._connected=True

    def connect_socket(s, url):
        url = util.disambiguate_url(url, self._config['location'])
        if self._ssh:
            return tunnel.tunnel_connection(s, url, sshserver, **ssh_kwargs)
        else:
            return s.connect(url)

    self.session.send(self._query_socket, 'connection_request')
    # use Poller because zmq.select has wrong units in pyzmq 2.1.7
    poller = zmq.Poller()
    poller.register(self._query_socket, zmq.POLLIN)
    # poll expects milliseconds, timeout is seconds
    evts = poller.poll(timeout*1000)
    if not evts:
        # nothing was connected: clear the flag, as the failure branch
        # below does, so a later attempt is not silently skipped
        self._connected = False
        raise error.TimeoutError("Hub connection request timed out")
    idents,msg = self.session.recv(self._query_socket,mode=0)
    if self.debug:
        pprint(msg)
    msg = Message(msg)
    content = msg.content
    self._config['registration'] = dict(content)
    if content.status != 'ok':
        self._connected = False
        raise Exception("Failed to connect!")

    ident = self.session.bsession
    if content.mux:
        self._mux_socket = self._context.socket(zmq.DEALER)
        self._mux_socket.setsockopt(zmq.IDENTITY, ident)
        connect_socket(self._mux_socket, content.mux)
    if content.task:
        # the task entry is a (scheme, address) pair
        self._task_scheme, task_addr = content.task
        self._task_socket = self._context.socket(zmq.DEALER)
        self._task_socket.setsockopt(zmq.IDENTITY, ident)
        connect_socket(self._task_socket, task_addr)
    if content.notification:
        self._notification_socket = self._context.socket(zmq.SUB)
        connect_socket(self._notification_socket, content.notification)
        self._notification_socket.setsockopt(zmq.SUBSCRIBE, b'')
    if content.control:
        self._control_socket = self._context.socket(zmq.DEALER)
        self._control_socket.setsockopt(zmq.IDENTITY, ident)
        connect_socket(self._control_socket, content.control)
    if content.iopub:
        self._iopub_socket = self._context.socket(zmq.SUB)
        self._iopub_socket.setsockopt(zmq.SUBSCRIBE, b'')
        self._iopub_socket.setsockopt(zmq.IDENTITY, ident)
        connect_socket(self._iopub_socket, content.iopub)
    self._update_engines(dict(content.engines))
Bernardo B. Marques
|
r4872 | |||
MinRK
|
r3555 | #-------------------------------------------------------------------------- | ||
# handlers and callbacks for incoming messages | ||||
#-------------------------------------------------------------------------- | ||||
Bernardo B. Marques
|
r4872 | |||
MinRK
|
def _unwrap_exception(self, content):
    """Rebuild the exception described by `content` and return it.

    When the exception carries engine info, the integer engine id that
    corresponds to its ``'engine_uuid'`` is stored under ``'engine_id'``.
    """
    exc = error.unwrap_exception(content)
    if exc.engine_info:
        engine_uuid = exc.engine_info['engine_uuid']
        # _engines is also queried by uuid here to obtain the int id
        exc.engine_info['engine_id'] = self._engines[engine_uuid]
    return exc
Bernardo B. Marques
|
r4872 | |||
MinRK
|
r3602 | def _extract_metadata(self, header, parent, content): | ||
MinRK
|
r3598 | md = {'msg_id' : parent['msg_id'], | ||
'received' : datetime.now(), | ||||
MinRK
|
r3607 | 'engine_uuid' : header.get('engine', None), | ||
MinRK
|
r3618 | 'follow' : parent.get('follow', []), | ||
'after' : parent.get('after', []), | ||||
MinRK
|
r3602 | 'status' : content['status'], | ||
} | ||||
Bernardo B. Marques
|
r4872 | |||
MinRK
|
r3607 | if md['engine_uuid'] is not None: | ||
md['engine_id'] = self._engines.get(md['engine_uuid'], None) | ||||
Bernardo B. Marques
|
r4872 | |||
MinRK
|
r3607 | if 'date' in parent: | ||
MinRK
|
r4006 | md['submitted'] = parent['date'] | ||
MinRK
|
r3607 | if 'started' in header: | ||
MinRK
|
r4006 | md['started'] = header['started'] | ||
MinRK
|
r3607 | if 'date' in header: | ||
MinRK
|
r4006 | md['completed'] = header['date'] | ||
MinRK
|
r3598 | return md | ||
Bernardo B. Marques
|
r4872 | |||
MinRK
|
r3651 | def _register_engine(self, msg): | ||
"""Register a new engine, and update our connection info.""" | ||||
content = msg['content'] | ||||
eid = content['id'] | ||||
d = {eid : content['queue']} | ||||
self._update_engines(d) | ||||
def _unregister_engine(self, msg):
    """Handle an unregistration notification for an engine that has died.

    The engine id is dropped from ``self._ids`` and ``self._engines``, and
    requests still pending on it are passed to ``_handle_stranded_msgs``.
    With a task socket and the 'pure' task scheme, task scheduling is
    stopped as well.
    """
    eid = int(msg['content']['id'])
    if eid in self._ids:
        self._ids.remove(eid)
        uuid = self._engines.pop(eid)
        self._handle_stranded_msgs(eid, uuid)

    pure_scheduler = self._task_socket and self._task_scheme == 'pure'
    if pure_scheduler:
        self._stop_scheduling_tasks()
Bernardo B. Marques
|
r4872 | |||
MinRK
|
def _handle_stranded_msgs(self, eid, uuid):
    """Handle messages known to be on an engine when the engine unregisters.

    Every msg_id still recorded in ``self._outstanding_dict[uuid]`` that has
    no entry in ``self.results`` gets a synthetic reply wrapping an
    ``error.EngineError``, which is fed to ``_handle_apply_reply``.

    It is possible that this will fire prematurely - that is, an engine will
    go down after completing a result, and the client will be notified
    of the unregistration and later receive the successful result.
    """
    # iterate over a copy: _handle_apply_reply removes ids from this set
    for msg_id in list(self._outstanding_dict[uuid]):
        if msg_id in self.results:
            # we already have a result for this one; it is not stranded
            continue
        try:
            raise error.EngineError("Engine %r died while running task %r"%(eid, msg_id))
        except error.EngineError:
            # raised only so wrap_exception() has an active exception to wrap
            content = error.wrap_exception()
        # build a fake message:
        parent = {'msg_id': msg_id}
        header = {'engine': uuid, 'date': datetime.now()}
        msg = dict(parent_header=parent, header=header, content=content)
        self._handle_apply_reply(msg)
Bernardo B. Marques
|
r4872 | |||
MinRK
|
def _handle_execute_reply(self, msg):
    """Save the reply to an execute_request into our results.

    Updates ``self.outstanding``, ``self.metadata`` and the per-engine
    outstanding set, then stores in ``self.results`` an ExecuteReply
    (status 'ok'), a TaskAborted ('aborted') or the unwrapped remote
    exception (any other status except 'resubmitted', which stores nothing).
    """
    parent = msg['parent_header']
    msg_id = parent['msg_id']
    if msg_id in self.outstanding:
        self.outstanding.remove(msg_id)
    elif msg_id in self.history:
        print ("got stale result: %s"%msg_id)
    else:
        print ("got unknown result: %s"%msg_id)

    content = msg['content']
    header = msg['header']

    # construct metadata:
    md = self.metadata[msg_id]
    md.update(self._extract_metadata(header, parent, content))
    # is this redundant?
    self.metadata[msg_id] = md

    pending_on_engine = self._outstanding_dict[md['engine_uuid']]
    if msg_id in pending_on_engine:
        pending_on_engine.remove(msg_id)

    # construct result:
    status = content['status']
    if status == 'resubmitted':
        # TODO: handle resubmission
        return
    if status == 'ok':
        result = ExecuteReply(msg_id, content, md)
    elif status == 'aborted':
        result = error.TaskAborted(msg_id)
    else:
        result = self._unwrap_exception(content)
    self.results[msg_id] = result
Bernardo B. Marques
|
r4872 | |||
MinRK
|
def _handle_apply_reply(self, msg):
    """Save the reply to an apply_request into our results.

    Updates ``self.outstanding``, ``self.metadata`` and the per-engine
    outstanding set, then stores in ``self.results`` the unserialized
    return value (status 'ok'), a TaskAborted ('aborted') or the unwrapped
    remote exception (any other status except 'resubmitted', which stores
    nothing).
    """
    parent = msg['parent_header']
    msg_id = parent['msg_id']
    if msg_id not in self.outstanding:
        if msg_id in self.history:
            print ("got stale result: %s"%msg_id)
            # call-style prints, like the ones around them; .get because a
            # stale id is not guaranteed to still have a stored result
            print (self.results.get(msg_id))
            print (msg)
        else:
            print ("got unknown result: %s"%msg_id)
    else:
        self.outstanding.remove(msg_id)
    content = msg['content']
    header = msg['header']

    # construct metadata:
    md = self.metadata[msg_id]
    md.update(self._extract_metadata(header, parent, content))
    # is this redundant?
    self.metadata[msg_id] = md

    e_outstanding = self._outstanding_dict[md['engine_uuid']]
    if msg_id in e_outstanding:
        e_outstanding.remove(msg_id)

    # construct result:
    if content['status'] == 'ok':
        self.results[msg_id] = util.unserialize_object(msg['buffers'])[0]
    elif content['status'] == 'aborted':
        self.results[msg_id] = error.TaskAborted(msg_id)
    elif content['status'] == 'resubmitted':
        # TODO: handle resubmission
        pass
    else:
        self.results[msg_id] = self._unwrap_exception(content)
Bernardo B. Marques
|
r4872 | |||
MinRK
|
def _flush_notifications(self):
    """Flush notifications of engine registrations waiting
    in ZMQ queue.

    Each message read from ``self._notification_socket`` is dispatched to
    the handler registered for its msg_type in
    ``self._notification_handlers``.

    Raises
    ------
    Exception
        If a message has a msg_type with no registered handler.
    """
    idents,msg = self.session.recv(self._notification_socket, mode=zmq.NOBLOCK)
    while msg is not None:
        if self.debug:
            pprint(msg)
        msg_type = msg['header']['msg_type']
        handler = self._notification_handlers.get(msg_type, None)
        if handler is None:
            # msg is indexed like a dict above, so report the msg_type we
            # already extracted rather than a msg.msg_type attribute
            raise Exception("Unhandled message type: %s"%msg_type)
        handler(msg)
        idents,msg = self.session.recv(self._notification_socket, mode=zmq.NOBLOCK)
Bernardo B. Marques
|
r4872 | |||
MinRK
|
def _flush_results(self, sock):
    """Flush task or queue results waiting in ZMQ queue.

    Each message read from `sock` is dispatched to the handler registered
    for its msg_type in ``self._queue_handlers``.

    Raises
    ------
    Exception
        If a message has a msg_type with no registered handler.
    """
    idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
    while msg is not None:
        if self.debug:
            pprint(msg)
        msg_type = msg['header']['msg_type']
        handler = self._queue_handlers.get(msg_type, None)
        if handler is None:
            # msg is indexed like a dict above, so report the msg_type we
            # already extracted rather than a msg.msg_type attribute
            raise Exception("Unhandled message type: %s"%msg_type)
        handler(msg)
        idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
Bernardo B. Marques
|
r4872 | |||
MinRK
|
def _flush_control(self, sock):
    """Flush replies from the control channel waiting
    in the ZMQ queue.

    Currently: ignore them.  Nothing is read unless
    ``self._ignored_control_replies`` is positive; that counter is
    decremented once per message drained from `sock`.
    """
    if self._ignored_control_replies <= 0:
        return
    while True:
        idents, msg = self.session.recv(sock, mode=zmq.NOBLOCK)
        if msg is None:
            break
        self._ignored_control_replies -= 1
        if self.debug:
            pprint(msg)
Bernardo B. Marques
|
r4872 | |||
MinRK
|
def _flush_ignored_control(self):
    """flush ignored control replies

    Receives from ``self._control_socket`` once per reply counted in
    ``self._ignored_control_replies``, until that counter is back to zero.
    """
    while True:
        if self._ignored_control_replies <= 0:
            break
        self.session.recv(self._control_socket)
        self._ignored_control_replies -= 1
Bernardo B. Marques
|
r4872 | |||
MinRK
|
def _flush_ignored_hub_replies(self):
    """Drain and discard every message waiting on ``self._query_socket``."""
    while True:
        ident, msg = self.session.recv(self._query_socket, mode=zmq.NOBLOCK)
        if msg is None:
            break
Bernardo B. Marques
|
r4872 | |||
MinRK
|
def _flush_iopub(self, sock):
    """Flush replies from the iopub channel waiting
    in the ZMQ queue.

    Each message with a parent is folded into ``self.metadata`` for the
    parent's msg_id: 'stream' data is appended to the named stream,
    'pyerr' / 'pyin' / 'pyout' are stored under those keys, and
    'display_data' content is appended to 'outputs'.  Other msg_types
    (status, etc.) are ignored.
    """
    idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
    while msg is not None:
        if self.debug:
            pprint(msg)
        parent = msg['parent_header']
        # ignore IOPub messages with no parent.
        # Caused by print statements or warnings from before the first execution.
        if not parent:
            # fetch the next message before continuing; otherwise the loop
            # would re-examine this same parentless message forever
            idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
            continue
        msg_id = parent['msg_id']
        content = msg['content']
        msg_type = msg['header']['msg_type']

        # init metadata:
        md = self.metadata[msg_id]

        if msg_type == 'stream':
            name = content['name']
            s = md[name] or ''
            md[name] = s + content['data']
        elif msg_type == 'pyerr':
            md.update({'pyerr' : self._unwrap_exception(content)})
        elif msg_type == 'pyin':
            md.update({'pyin' : content['code']})
        elif msg_type == 'display_data':
            md['outputs'].append(content)
        elif msg_type == 'pyout':
            md['pyout'] = content
        else:
            # unhandled msg_type (status, etc.)
            pass

        # redundant?
        self.metadata[msg_id] = md

        idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
Bernardo B. Marques
|
r4872 | |||
MinRK
|
r3555 | #-------------------------------------------------------------------------- | ||
MinRK
|
r3636 | # len, getitem | ||
MinRK
|
r3555 | #-------------------------------------------------------------------------- | ||
Bernardo B. Marques
|
r4872 | |||
MinRK
|
def __len__(self):
    """len(client) returns # of engines, i.e. the length of ``self.ids``."""
    engine_ids = self.ids
    return len(engine_ids)
Bernardo B. Marques
|
r4872 | |||
MinRK
|
def __getitem__(self, key):
    """index access returns DirectView multiplexer objects

    Must be int, slice, or list/tuple/xrange of ints; any other key type
    raises TypeError.  Accepted keys are passed to ``self.direct_view``.
    """
    accepted = (int, slice, tuple, list, xrange)
    if isinstance(key, accepted):
        return self.direct_view(key)
    raise TypeError("key by int/slice/iterable of ints only, not %s"%(type(key)))
Bernardo B. Marques
|
r4872 | |||
MinRK
|
r3555 | #-------------------------------------------------------------------------- | ||
# Begin public methods | ||||
#-------------------------------------------------------------------------- | ||||
Bernardo B. Marques
|
r4872 | |||
MinRK
|
@property
def ids(self):
    """Always up-to-date ids property.

    Pending notifications are flushed first; the result is a fresh list,
    so callers never hold a reference to the internal ``_ids`` container.
    """
    self._flush_notifications()
    snapshot = list(self._ids)
    return snapshot
Bernardo B. Marques
|
r4872 | |||
MinRK
|
def close(self):
    """Stop the spin thread and close this client's open zmq sockets.

    Every attribute whose name ends in 'socket' and that is an unclosed
    ``zmq.Socket`` is closed.  Calling close() again is a no-op.
    """
    if self._closed:
        return
    self.stop_spin_thread()
    socket_names = [name for name in dir(self) if name.endswith('socket')]
    candidates = [getattr(self, name) for name in socket_names]
    for candidate in candidates:
        if isinstance(candidate, zmq.Socket) and not candidate.closed:
            candidate.close()
    self._closed = True
Bernardo B. Marques
|
r4872 | |||
MinRK
|
def _spin_every(self, interval=1):
    """target func for use in spin_thread

    Calls ``self.spin()`` every `interval` seconds until
    ``self._stop_spinning`` is set.

    Parameters
    ----------
    interval : float, optional
        Seconds to wait between calls to spin().
    """
    stop = self._stop_spinning
    while not stop.is_set():
        # Wait on the event rather than time.sleep(), so that
        # stop_spin_thread() (which sets the event and then joins this
        # thread) returns promptly instead of after up to `interval` seconds.
        stop.wait(interval)
        if stop.is_set():
            return
        self.spin()
def spin_thread(self, interval=1):
    """call Client.spin() in a background thread on some regular interval

    This helps ensure that messages don't pile up too much in the zmq queue
    while you are working on other things, or just leaving an idle terminal.

    It also helps limit potential padding of the `received` timestamp
    on AsyncResult objects, used for timings.

    Parameters
    ----------

    interval : float, optional
        The interval on which to spin the client in the background thread
        (passed on to the thread's target, `_spin_every`).

    Notes
    -----

    For precision timing, you may want to use this method to put a bound
    on the jitter (in seconds) in `received` timestamps used
    in AsyncResult.wall_time.

    Any spin thread that is already running is stopped first.
    """
    already_running = self._spin_thread is not None
    if already_running:
        self.stop_spin_thread()
    self._stop_spinning.clear()
    worker = Thread(target=self._spin_every, args=(interval,))
    self._spin_thread = worker
    worker.daemon = True
    worker.start()
def stop_spin_thread(self):
    """stop background spin_thread, if any

    Sets the stop event, joins the thread and clears ``self._spin_thread``.
    """
    worker = self._spin_thread
    if worker is None:
        return
    self._stop_spinning.set()
    worker.join()
    self._spin_thread = None
MinRK
|
def spin(self):
    """Flush any registration notifications and execution results
    waiting in the ZMQ queue.

    Channels are visited in a fixed order - notification, iopub, mux,
    task, control, query - and a channel whose socket attribute is falsy
    is skipped.
    """
    if self._notification_socket:
        self._flush_notifications()

    # (socket attribute, flush method name), in flush order
    channels = (
        ('_iopub_socket', '_flush_iopub'),
        ('_mux_socket', '_flush_results'),
        ('_task_socket', '_flush_results'),
        ('_control_socket', '_flush_control'),
    )
    for sock_attr, flush_name in channels:
        if getattr(self, sock_attr):
            getattr(self, flush_name)(getattr(self, sock_attr))

    if self._query_socket:
        self._flush_ignored_hub_replies()
Bernardo B. Marques
|
r4872 | |||
MinRK
|
def wait(self, jobs=None, timeout=-1):
    """waits on one or more `jobs`, for up to `timeout` seconds.

    Parameters
    ----------

    jobs : int, str, or list of ints and/or strs, or one or more AsyncResult objects
            ints are indices to self.history
            strs are msg_ids
            default: wait on all outstanding messages
    timeout : float
            a time in seconds, after which to give up.
            default is -1, which means no timeout

    Returns
    -------

    True : when all msg_ids are done
    False : timeout reached, some msg_ids still outstanding
    """
    tic = time.time()
    if jobs is None:
        theids = self.outstanding
    else:
        if isinstance(jobs, (int, basestring, AsyncResult)):
            jobs = [jobs]
        theids = set()
        for job in jobs:
            if isinstance(job, int):
                # index access
                job = self.history[job]
            elif isinstance(job, AsyncResult):
                # set.update rather than map(theids.add, ...): map used for
                # its side effects does nothing where map is lazy
                theids.update(job.msg_ids)
                continue
            theids.add(job)
    if not theids.intersection(self.outstanding):
        return True
    self.spin()
    while theids.intersection(self.outstanding):
        if timeout >= 0 and ( time.time()-tic ) > timeout:
            break
        time.sleep(1e-3)
        self.spin()
    return len(theids.intersection(self.outstanding)) == 0
Bernardo B. Marques
|
r4872 | |||
MinRK
|
r3555 | #-------------------------------------------------------------------------- | ||
# Control methods | ||||
#-------------------------------------------------------------------------- | ||||
Bernardo B. Marques
|
r4872 | |||
MinRK
|
@spin_first
def clear(self, targets=None, block=None):
    """Clear the namespace in target(s).

    Parameters
    ----------
    targets : passed to ``self._build_targets``
        The engines to send a 'clear_request' to.
    block : bool, optional
        Whether to wait for the replies; defaults to ``self.block``.
        When not blocking, the replies are counted in
        ``self._ignored_control_replies`` to be discarded later.

    Raises
    ------
    The unwrapped remote exception of the last non-'ok' reply received
    (blocking mode only).
    """
    if block is None:
        block = self.block
    targets = self._build_targets(targets)[0]
    for ident in targets:
        self.session.send(self._control_socket, 'clear_request', content={}, ident=ident)

    failure = False
    if not block:
        self._ignored_control_replies += len(targets)
    else:
        self._flush_ignored_control()
        # one reply is expected per target
        for _ in range(len(targets)):
            idents, msg = self.session.recv(self._control_socket,0)
            if self.debug:
                pprint(msg)
            if msg['content']['status'] != 'ok':
                failure = self._unwrap_exception(msg['content'])
    if failure:
        raise failure
Bernardo B. Marques
|
r4872 | |||
MinRK
|
@spin_first
def abort(self, jobs=None, targets=None, block=None):
    """Abort specific jobs from the execution queues of target(s).

    This is a mechanism to prevent jobs that have already been submitted
    from executing.

    Parameters
    ----------

    jobs : msg_id, list of msg_ids, or AsyncResult
        The jobs to be aborted

        If unspecified/None: abort all outstanding jobs.
    targets : passed to ``self._build_targets``
        The engines to send the 'abort_request' to.
    block : bool, optional
        Whether to wait for the replies; defaults to ``self.block``.

    Raises
    ------
    TypeError
        If any job is neither a str msg_id nor an AsyncResult.
    The unwrapped remote exception of the last non-'ok' reply received
    (blocking mode only).
    """
    block = self.block if block is None else block
    jobs = jobs if jobs is not None else list(self.outstanding)
    targets = self._build_targets(targets)[0]

    msg_ids = []
    if isinstance(jobs, (basestring,AsyncResult)):
        jobs = [jobs]
    # a real list (not a filter() result), since it is truth-tested and indexed
    bad_ids = [obj for obj in jobs if not isinstance(obj, (basestring, AsyncResult))]
    if bad_ids:
        raise TypeError("Invalid msg_id type %r, expected str or AsyncResult"%bad_ids[0])
    for j in jobs:
        if isinstance(j, AsyncResult):
            msg_ids.extend(j.msg_ids)
        else:
            msg_ids.append(j)
    content = dict(msg_ids=msg_ids)
    for t in targets:
        self.session.send(self._control_socket, 'abort_request',
                content=content, ident=t)
    # named `failure` so the module-level `error` is not shadowed
    failure = False
    if block:
        self._flush_ignored_control()
        # one reply is expected per target
        for _ in range(len(targets)):
            idents,msg = self.session.recv(self._control_socket,0)
            if self.debug:
                pprint(msg)
            if msg['content']['status'] != 'ok':
                failure = self._unwrap_exception(msg['content'])
    else:
        # replies will arrive later; count them so they can be discarded
        self._ignored_control_replies += len(targets)
    if failure:
        raise failure
Bernardo B. Marques
|
r4872 | |||
MinRK
|
@spin_first
def shutdown(self, targets=None, restart=False, hub=False, block=None):
    """Terminates one or more engine processes, optionally including the hub.

    Parameters
    ----------
    targets : passed to ``self._build_targets``
        The engines to shut down; forced to 'all' when `hub` is true.
    restart : bool
        Sent along as the 'restart' field of each shutdown_request.
    hub : bool
        Also send a shutdown_request to the hub, after the engine replies
        have been collected.
    block : bool, optional
        Whether to wait for engine replies; defaults to ``self.block``.
        Replies are always awaited when `hub` is true.

    Raises
    ------
    The unwrapped remote exception of the last non-'ok' reply received.
    """
    if block is None:
        block = self.block
    if hub:
        targets = 'all'
    targets = self._build_targets(targets)[0]
    for ident in targets:
        self.session.send(self._control_socket, 'shutdown_request',
                    content={'restart':restart},ident=ident)

    failure = False
    if not (block or hub):
        self._ignored_control_replies += len(targets)
    else:
        self._flush_ignored_control()
        # one reply is expected per target
        for _ in range(len(targets)):
            idents, msg = self.session.recv(self._control_socket, 0)
            if self.debug:
                pprint(msg)
            if msg['content']['status'] != 'ok':
                failure = self._unwrap_exception(msg['content'])

    if hub:
        time.sleep(0.25)
        self.session.send(self._query_socket, 'shutdown_request')
        idents, msg = self.session.recv(self._query_socket, 0)
        if self.debug:
            pprint(msg)
        if msg['content']['status'] != 'ok':
            failure = self._unwrap_exception(msg['content'])

    if failure:
        raise failure
Bernardo B. Marques
|
r4872 | |||
MinRK
|
r3555 | #-------------------------------------------------------------------------- | ||
MinRK
|
r3774 | # Execution related methods | ||
MinRK
|
r3555 | #-------------------------------------------------------------------------- | ||
Bernardo B. Marques
|
r4872 | |||
MinRK
|
def _maybe_raise(self, result):
    """wrapper for maybe raising an exception if apply failed.

    Raises `result` when it is an ``error.RemoteError``; any other
    value is returned unchanged.
    """
    if not isinstance(result, error.RemoteError):
        return result
    raise result
Bernardo B. Marques
|
r4872 | |||
MinRK
|
def send_apply_request(self, socket, f, args=None, kwargs=None, subheader=None, track=False,
                        ident=None):
    """construct and send an apply message via a socket.

    This is the principal method with which all engine execution is performed by views.

    Parameters
    ----------
    socket : the socket handed to ``self.session.send``
    f : callable or Reference
    args : tuple or list, optional
    kwargs : dict, optional
    subheader : dict, optional
    track : bool
        Passed through to ``self.session.send``.
    ident : optional
        Routing identity; when it (or the last element, if a list) is a
        known engine, the msg_id is also recorded against that engine.

    Returns
    -------
    The message returned by ``self.session.send``.

    Raises
    ------
    RuntimeError
        If the client has been closed.
    TypeError
        If f, args, kwargs or subheader has the wrong type.
    """
    if self._closed:
        raise RuntimeError("Client cannot be used after its sockets have been closed")

    # defaults:
    if args is None:
        args = []
    if kwargs is None:
        kwargs = {}
    if subheader is None:
        subheader = {}

    # validate arguments
    if not (callable(f) or isinstance(f, Reference)):
        raise TypeError("f must be callable, not %s"%type(f))
    if not isinstance(args, (tuple, list)):
        raise TypeError("args must be tuple or list, not %s"%type(args))
    if not isinstance(kwargs, dict):
        raise TypeError("kwargs must be dict, not %s"%type(kwargs))
    if not isinstance(subheader, dict):
        raise TypeError("subheader must be dict, not %s"%type(subheader))

    bufs = util.pack_apply_message(f,args,kwargs)
    msg = self.session.send(socket, "apply_request", buffers=bufs, ident=ident,
                        subheader=subheader, track=track)

    msg_id = msg['header']['msg_id']
    self.outstanding.add(msg_id)
    if ident:
        # possibly routed to a specific engine
        engine_ident = ident[-1] if isinstance(ident, list) else ident
        if engine_ident in self._engines.values():
            # save for later, in case of engine death
            self._outstanding_dict[engine_ident].add(msg_id)
    self.history.append(msg_id)
    self.metadata[msg_id]['submitted'] = datetime.now()
    return msg
MinRK
|
def send_execute_request(self, socket, code, silent=True, subheader=None, ident=None):
    """construct and send an execute request via a socket.

    Parameters
    ----------
    socket : the socket handed to ``self.session.send``
    code : str
        Sent as the 'code' field of the request.
    silent : bool
        Sent (coerced to bool) as the 'silent' field.
    subheader : dict, optional
    ident : optional
        Routing identity; when it (or the last element, if a list) is a
        known engine, the msg_id is also recorded against that engine.

    Returns
    -------
    The message returned by ``self.session.send``.

    Raises
    ------
    RuntimeError
        If the client has been closed.
    TypeError
        If code or subheader has the wrong type.
    """
    if self._closed:
        raise RuntimeError("Client cannot be used after its sockets have been closed")

    # defaults:
    if subheader is None:
        subheader = {}

    # validate arguments
    if not isinstance(code, basestring):
        raise TypeError("code must be text, not %s" % type(code))
    if not isinstance(subheader, dict):
        raise TypeError("subheader must be dict, not %s" % type(subheader))

    request = dict(code=code, silent=bool(silent), user_variables=[], user_expressions={})
    msg = self.session.send(socket, "execute_request", content=request, ident=ident,
                        subheader=subheader)

    msg_id = msg['header']['msg_id']
    self.outstanding.add(msg_id)
    if ident:
        # possibly routed to a specific engine
        engine_ident = ident[-1] if isinstance(ident, list) else ident
        if engine_ident in self._engines.values():
            # save for later, in case of engine death
            self._outstanding_dict[engine_ident].add(msg_id)
    self.history.append(msg_id)
    self.metadata[msg_id]['submitted'] = datetime.now()
    return msg
MinRK
|
r3589 | #-------------------------------------------------------------------------- | ||
MinRK
|
r3636 | # construct a View object | ||
MinRK
|
r3589 | #-------------------------------------------------------------------------- | ||
Bernardo B. Marques
|
r4872 | |||
MinRK
|
def load_balanced_view(self, targets=None):
    """construct a LoadBalancedView object.

    If no arguments are specified, create a LoadBalancedView
    using all engines.

    Parameters
    ----------

    targets: list,slice,int,etc. [default: use all engines]
        The subset of engines across which to load-balance;
        'all' is treated the same as None.
    """
    use_all = targets is None or targets == 'all'
    resolved = None if use_all else self._build_targets(targets)[1]
    return LoadBalancedView(client=self, socket=self._task_socket, targets=resolved)
Bernardo B. Marques
|
r4872 | |||
MinRK
|
def direct_view(self, targets='all'):
    """construct a DirectView object.

    If no targets are specified, create a DirectView using all engines.

    rc.direct_view('all') is distinguished from rc[:] in that 'all' will
    evaluate the target engines at each execution, whereas rc[:] will connect to
    all *current* engines, and that list will not change.

    That is, 'all' will always use all engines, whereas rc[:] will not use
    engines added after the DirectView is constructed.

    Parameters
    ----------

    targets: list,slice,int,etc. [default: use all engines]
        The engines to use for the View.  A single int yields a view on
        that one engine rather than on a one-element list.
    """
    wants_single = isinstance(targets, int)
    if targets == 'all':
        # left as-is so that 'all' is evaluated lazily at each execution
        resolved = targets
    else:
        resolved = self._build_targets(targets)[1]
    if wants_single:
        resolved = resolved[0]
    return DirectView(client=self, socket=self._mux_socket, targets=resolved)
Bernardo B. Marques
|
r4872 | |||
MinRK
|
r3555 | #-------------------------------------------------------------------------- | ||
# Query methods | ||||
#-------------------------------------------------------------------------- | ||||
Bernardo B. Marques
|
r4872 | |||
MinRK
|
@spin_first
def get_result(self, indices_or_msg_ids=None, block=None):
    """Retrieve a result by msg_id or history index, wrapped in an AsyncResult object.

    If the client already has the results, no request to the Hub will be made.

    This is a convenient way to construct AsyncResult objects, which are wrappers
    that include metadata about execution, and allow for awaiting results that
    were not submitted by this Client.

    It can also be a convenient way to retrieve the metadata associated with
    blocking execution.

    Parameters
    ----------

    indices_or_msg_ids : integer history index, str msg_id, or list of either
        The indices or msg_ids of indices to be retrieved
        (default: -1, the last entry of self.history)

    block : bool
        Whether to wait for the result to be done

    Returns
    -------

    AsyncResult
        A single AsyncResult object will always be returned, when every
        requested msg_id is in self.history or self.results.

    AsyncHubResult
        A subclass of AsyncResult that retrieves results from the Hub,
        returned when at least one msg_id is not known locally.

    Raises
    ------
    TypeError
        If an entry is neither an int index nor a str msg_id.
    """
    block = self.block if block is None else block
    if indices_or_msg_ids is None:
        indices_or_msg_ids = -1

    if not isinstance(indices_or_msg_ids, (list,tuple)):
        indices_or_msg_ids = [indices_or_msg_ids]

    theids = []
    for entry in indices_or_msg_ids:
        if isinstance(entry, int):
            entry = self.history[entry]
        if not isinstance(entry, basestring):
            raise TypeError("indices must be str or int, not %r"%entry)
        theids.append(entry)

    # single pass, stopping at the first id this client knows nothing about
    needs_hub = any(
        msg_id not in self.history and msg_id not in self.results
        for msg_id in theids
    )

    if needs_hub:
        ar = AsyncHubResult(self, msg_ids=theids)
    else:
        ar = AsyncResult(self, msg_ids=theids)

    if block:
        ar.wait()

    return ar
MinRK
|
r3874 | |||
@spin_first
def resubmit(self, indices_or_msg_ids=None, subheader=None, block=None):
    """Resubmit one or more tasks.

    in-flight tasks may not be resubmitted.

    Parameters
    ----------

    indices_or_msg_ids : integer history index, str msg_id, or list of either
        The indices or msg_ids of the tasks to resubmit
        (default: -1, the last entry of self.history)

    subheader : accepted but not used by this method

    block : bool
        Whether to wait for the result to be done

    Returns
    -------

    AsyncHubResult
        A subclass of AsyncResult that retrieves results from the Hub,
        built on the new msg_ids reported in the reply's 'resubmitted' map.

    Raises
    ------
    TypeError
        If an entry is neither an int index nor a str msg_id.
    The unwrapped remote exception, if the reply status is not 'ok'.
    """
    if block is None:
        block = self.block
    if indices_or_msg_ids is None:
        indices_or_msg_ids = -1

    if not isinstance(indices_or_msg_ids, (list,tuple)):
        indices_or_msg_ids = [indices_or_msg_ids]

    theids = []
    for entry in indices_or_msg_ids:
        resolved = self.history[entry] if isinstance(entry, int) else entry
        if not isinstance(resolved, basestring):
            raise TypeError("indices must be str or int, not %r"%resolved)
        theids.append(resolved)

    request = dict(msg_ids = theids)
    self.session.send(self._query_socket, 'resubmit_request', request)

    # select on the query socket first, then read the reply without blocking
    zmq.select([self._query_socket], [], [])
    idents, msg = self.session.recv(self._query_socket, zmq.NOBLOCK)
    if self.debug:
        pprint(msg)
    reply = msg['content']
    if reply['status'] != 'ok':
        raise self._unwrap_exception(reply)

    mapping = reply['resubmitted']
    new_ids = [ mapping[old_id] for old_id in theids ]
    ar = AsyncHubResult(self, msg_ids=new_ids)

    if block:
        ar.wait()

    return ar
Bernardo B. Marques
|
r4872 | |||
MinRK
|
@spin_first
def result_status(self, msg_ids, status_only=True):
    """Check on the status of the result(s) of the apply request with `msg_ids`.

    If status_only is False, then the actual results will be retrieved, else
    only the status of the results will be checked.

    Results already present in `self.results` are answered locally; only
    the remaining msg_ids are sent to the Hub.

    Parameters
    ----------

    msg_ids : list of msg_ids
        if int:
            Passed as index to self.history for convenience.
    status_only : bool (default: True)
        if False:
            Retrieve the actual results of completed tasks.

    Returns
    -------

    results : dict
        There will always be the keys 'pending' and 'completed', which will
        be lists of msg_ids that are incomplete or complete. If `status_only`
        is False, then completed results will be keyed by their `msg_id`.

    Raises
    ------

    TypeError
        If an entry of `msg_ids` is neither an int nor a string.
    """
    if not isinstance(msg_ids, (list,tuple)):
        msg_ids = [msg_ids]

    requested = []
    for msg_id in msg_ids:
        if isinstance(msg_id, int):
            msg_id = self.history[msg_id]
        if not isinstance(msg_id, basestring):
            raise TypeError("msg_ids must be str, not %r"%msg_id)
        requested.append(msg_id)

    # Split the request into locally cached results and ids that must be
    # fetched from the Hub.  This builds a new list instead of removing
    # from the list being iterated, which skipped the element following
    # every removed one and re-requested results that were already cached.
    completed = []
    local_results = {}
    theids = []
    for msg_id in requested:
        if msg_id in self.results:
            completed.append(msg_id)
            local_results[msg_id] = self.results[msg_id]
        else:
            theids.append(msg_id)

    if theids: # some not locally cached
        content = dict(msg_ids=theids, status_only=status_only)
        self.session.send(self._query_socket, "result_request", content=content)
        # wait until the query socket is readable, then fetch the reply
        zmq.select([self._query_socket], [], [])
        idents,msg = self.session.recv(self._query_socket, zmq.NOBLOCK)
        if self.debug:
            pprint(msg)
        content = msg['content']
        if content['status'] != 'ok':
            raise self._unwrap_exception(content)
        buffers = msg['buffers']
    else:
        content = dict(completed=[],pending=[])

    content['completed'].extend(completed)

    if status_only:
        return content

    failures = []
    # load cached results into result:
    content.update(local_results)

    # update cache with results:
    # (`buffers` is only bound when something was requested from the Hub,
    #  which is also the only case in which this loop has iterations)
    for msg_id in sorted(theids):
        if msg_id in content['completed']:
            rec = content[msg_id]
            parent = rec['header']
            header = rec['result_header']
            rcontent = rec['result_content']
            iodict = rec['io']
            if isinstance(rcontent, str):
                rcontent = self.session.unpack(rcontent)

            md = self.metadata[msg_id]
            md.update(self._extract_metadata(header, parent, rcontent))
            if rec.get('received'):
                md['received'] = rec['received']
            md.update(iodict)

            if rcontent['status'] == 'ok':
                # consumes this record's share of the reply buffers and
                # hands back the rest for the next record
                res,buffers = util.unserialize_object(buffers)
            else:
                # only dump the failed reply when debugging, like the
                # other message dumps in this class
                if self.debug:
                    pprint(rcontent)
                res = self._unwrap_exception(rcontent)
                failures.append(res)

            self.results[msg_id] = res
            content[msg_id] = res

    # a single failed request is raised as-is rather than wrapped
    if len(theids) == 1 and failures:
        raise failures[0]

    error.collect_exceptions(failures, "result_status")
    return content
MinRK
|
@spin_first
def queue_status(self, targets='all', verbose=False):
    """Fetch the status of engine queues.

    Parameters
    ----------

    targets : int/str/list of ints/strs
        the engines whose states are to be queried.
        default : all
    verbose : bool
        Whether to return lengths only, or lists of ids for each element

    Returns
    -------

    The Hub's reply content with its 'status' entry removed.  When
    `targets` is a single int, only the entry for that key is returned.
    """
    # None lets the Hub evaluate 'all' itself instead of resolving it here
    engine_ids = None if targets == 'all' else self._build_targets(targets)[1]

    request = dict(targets=engine_ids, verbose=verbose)
    self.session.send(self._query_socket, "queue_request", content=request)
    idents, reply = self.session.recv(self._query_socket, 0)
    if self.debug:
        pprint(reply)

    reply_content = reply['content']
    # 'status' is popped so that it is not part of the returned dict
    if reply_content.pop('status') != 'ok':
        raise self._unwrap_exception(reply_content)

    # NOTE(review): rekey presumably restores int keys that were turned into
    # strings in transit; the int lookup below relies on it -- confirm.
    statuses = rekey(reply_content)
    return statuses[targets] if isinstance(targets, int) else statuses
Bernardo B. Marques
|
r4872 | |||
MinRK
|
@spin_first
def purge_results(self, jobs=[], targets=[]):
    """Tell the Hub to forget results.

    Individual results can be purged by msg_id, or the entire
    history of specific targets can be purged.

    Use `purge_results('all')` to scrub everything from the Hub's db.

    Parameters
    ----------

    jobs : str or list of str or AsyncResult objects
        the msg_ids whose results should be forgotten.
        default : empty list
    targets : int/str/list of ints/strs
        The targets, by int_id, whose entire history is to be purged.
        default : empty list

    Raises
    ------

    ValueError
        If neither `jobs` nor `targets` is given.
    TypeError
        If an entry of `jobs` is neither a string nor an AsyncResult.
    """
    # NOTE: the list defaults are never mutated here, only rebound.
    if not (targets or jobs):
        raise ValueError("Must specify at least one of `targets` and `jobs`")
    if targets:
        targets = self._build_targets(targets)[1]

    # construct msg_ids from jobs
    if jobs == 'all':
        msg_ids = jobs
    else:
        if isinstance(jobs, (basestring,AsyncResult)):
            jobs = [jobs]
        # validate every entry before collecting any ids
        for job in jobs:
            if not isinstance(job, (basestring, AsyncResult)):
                raise TypeError("Invalid msg_id type %r, expected str or AsyncResult"%job)
        msg_ids = []
        for job in jobs:
            if isinstance(job, AsyncResult):
                # an AsyncResult contributes all of the msg_ids it tracks
                msg_ids.extend(job.msg_ids)
            else:
                msg_ids.append(job)

    request = dict(engine_ids=targets, msg_ids=msg_ids)
    self.session.send(self._query_socket, "purge_request", content=request)
    idents, reply = self.session.recv(self._query_socket, 0)
    if self.debug:
        pprint(reply)
    reply_content = reply['content']
    if reply_content['status'] != 'ok':
        raise self._unwrap_exception(reply_content)
MinRK
|
r3548 | |||
MinRK
|
@spin_first
def hub_history(self):
    """Get the Hub's history

    Just like the Client, the Hub has a history, which is a list of msg_ids.
    This will contain the history of all clients, and, depending on configuration,
    may contain history across multiple cluster sessions.

    Any msg_id returned here is a valid argument to `get_result`.

    Returns
    -------

    msg_ids : list of strs
        list of all msg_ids, ordered by task submission time.
    """
    self.session.send(self._query_socket, "history_request", content={})
    idents, reply = self.session.recv(self._query_socket, 0)
    if self.debug:
        pprint(reply)

    reply_content = reply['content']
    if reply_content['status'] == 'ok':
        return reply_content['history']
    # anything other than 'ok' carries a wrapped remote exception
    raise self._unwrap_exception(reply_content)
@spin_first
def db_query(self, query, keys=None):
    """Query the Hub's TaskRecord database

    This will return a list of task record dicts that match `query`

    Parameters
    ----------

    query : mongodb query dict
        The search dict. See mongodb query docs for details.
    keys : list of strs [optional]
        The subset of keys to be returned.  The default is to fetch everything but buffers.
        'msg_id' will *always* be included.
        A single string is treated as a one-element list.

    Returns
    -------

    records : list of dicts
        The matching records.  Where the Hub reports buffer lengths, each
        record gets its 'buffers' and/or 'result_buffers' re-attached from
        the message's buffer list.
    """
    if isinstance(keys, basestring):
        keys = [keys]

    request = dict(query=query, keys=keys)
    self.session.send(self._query_socket, "db_request", content=request)
    idents, reply = self.session.recv(self._query_socket, 0)
    if self.debug:
        pprint(reply)
    reply_content = reply['content']
    if reply_content['status'] != 'ok':
        raise self._unwrap_exception(reply_content)

    records = reply_content['records']
    buffer_lens = reply_content['buffer_lens']
    result_buffer_lens = reply_content['result_buffer_lens']
    remaining = reply['buffers']
    relink_buffers = buffer_lens is not None
    relink_result_buffers = result_buffer_lens is not None

    # The reply's buffers are one flat list: per record, its 'buffers'
    # come first, followed by its 'result_buffers'.  Peel them off in order.
    for idx, record in enumerate(records):
        if relink_buffers:
            count = buffer_lens[idx]
            record['buffers'] = remaining[:count]
            remaining = remaining[count:]
        if relink_result_buffers:
            count = result_buffer_lens[idx]
            record['result_buffers'] = remaining[:count]
            remaining = remaining[count:]

    return records
MinRK
|
r3587 | |||
MinRK
|
# Public API of this module: only `Client` is exported by a star-import.
__all__ = [ 'Client' ]