client.py
1435 lines
| 52.8 KiB
| text/x-python
|
PythonLexer
MinRK
|
r4018 | """A semi-synchronous Client for the ZMQ cluster | ||
Authors: | ||||
* MinRK | ||||
""" | ||||
MinRK
|
r3551 | #----------------------------------------------------------------------------- | ||
MinRK
|
r4018 | # Copyright (C) 2010-2011 The IPython Development Team | ||
MinRK
|
r3551 | # | ||
# Distributed under the terms of the BSD License. The full license is in | ||||
# the file COPYING, distributed as part of this software. | ||||
#----------------------------------------------------------------------------- | ||||
MinRK
|
r3539 | |||
MinRK
|
r3551 | #----------------------------------------------------------------------------- | ||
# Imports | ||||
#----------------------------------------------------------------------------- | ||||
MinRK
|
r3539 | |||
MinRK
|
r3575 | import os | ||
MinRK
|
r3631 | import json | ||
MinRK
|
r4155 | import sys | ||
MinRK
|
r3551 | import time | ||
MinRK
|
r3631 | import warnings | ||
from datetime import datetime | ||||
MinRK
|
r3581 | from getpass import getpass | ||
MinRK
|
r3540 | from pprint import pprint | ||
MinRK
|
r3631 | |||
MinRK
|
r3614 | pjoin = os.path.join | ||
MinRK
|
r3540 | |||
MinRK
|
r3551 | import zmq | ||
MinRK
|
r3635 | # from zmq.eventloop import ioloop, zmqstream | ||
MinRK
|
r3551 | |||
MinRK
|
r4071 | from IPython.config.configurable import MultipleInstanceError | ||
from IPython.core.application import BaseIPythonApplication | ||||
MinRK
|
r4036 | from IPython.utils.jsonutil import rekey | ||
MinRK
|
r4108 | from IPython.utils.localinterfaces import LOCAL_IPS | ||
MinRK
|
r3614 | from IPython.utils.path import get_ipython_dir | ||
MinRK
|
r3988 | from IPython.utils.traitlets import (HasTraits, Int, Instance, Unicode, | ||
Dict, List, Bool, Set) | ||||
MinRK
|
r3539 | from IPython.external.decorator import decorator | ||
MinRK
|
r3619 | from IPython.external.ssh import tunnel | ||
MinRK
|
r3539 | |||
MinRK
|
r3673 | from IPython.parallel import error | ||
from IPython.parallel import util | ||||
MinRK
|
r4006 | from IPython.zmq.session import Session, Message | ||
MinRK
|
r3666 | from .asyncresult import AsyncResult, AsyncHubResult | ||
MinRK
|
r4024 | from IPython.core.profiledir import ProfileDir, ProfileDirError | ||
MinRK
|
r3642 | from .view import DirectView, LoadBalancedView | ||
MinRK
|
r3539 | |||
MinRK
|
r4155 | if sys.version_info[0] >= 3: | ||
MinRK
|
r4161 | # xrange is used in a couple 'isinstance' tests in py2 | ||
MinRK
|
r4155 | # should be just 'range' in 3k | ||
xrange = range | ||||
MinRK
|
r3584 | #-------------------------------------------------------------------------- | ||
MinRK
|
r3555 | # Decorators for Client methods | ||
#-------------------------------------------------------------------------- | ||||
MinRK
|
r3539 | @decorator | ||
MinRK
|
r3664 | def spin_first(f, self, *args, **kwargs): | ||
MinRK
|
r3555 | """Call spin() to sync state prior to calling the method.""" | ||
MinRK
|
r3539 | self.spin() | ||
return f(self, *args, **kwargs) | ||||
MinRK
|
r3598 | |||
#-------------------------------------------------------------------------- | ||||
# Classes | ||||
#-------------------------------------------------------------------------- | ||||
MinRK
|
r3602 | class Metadata(dict): | ||
MinRK
|
r3607 | """Subclass of dict for initializing metadata values. | ||
Attribute access works on keys. | ||||
These objects have a strict set of keys - errors will raise if you try | ||||
to add new keys. | ||||
""" | ||||
MinRK
|
r3602 | def __init__(self, *args, **kwargs): | ||
dict.__init__(self) | ||||
md = {'msg_id' : None, | ||||
'submitted' : None, | ||||
'started' : None, | ||||
'completed' : None, | ||||
'received' : None, | ||||
'engine_uuid' : None, | ||||
'engine_id' : None, | ||||
'follow' : None, | ||||
'after' : None, | ||||
'status' : None, | ||||
'pyin' : None, | ||||
'pyout' : None, | ||||
'pyerr' : None, | ||||
'stdout' : '', | ||||
'stderr' : '', | ||||
} | ||||
self.update(md) | ||||
self.update(dict(*args, **kwargs)) | ||||
MinRK
|
r3607 | |||
def __getattr__(self, key): | ||||
"""getattr aliased to getitem""" | ||||
if key in self.iterkeys(): | ||||
return self[key] | ||||
else: | ||||
raise AttributeError(key) | ||||
MinRK
|
r3602 | |||
MinRK
|
r3607 | def __setattr__(self, key, value): | ||
"""setattr aliased to setitem, with strict""" | ||||
if key in self.iterkeys(): | ||||
self[key] = value | ||||
else: | ||||
raise AttributeError(key) | ||||
def __setitem__(self, key, value): | ||||
"""strict static key enforcement""" | ||||
if key in self.iterkeys(): | ||||
dict.__setitem__(self, key, value) | ||||
else: | ||||
raise KeyError(key) | ||||
MinRK
|
r3602 | |||
MinRK
|
r3636 | class Client(HasTraits): | ||
MinRK
|
r3664 | """A semi-synchronous client to the IPython ZMQ cluster | ||
MinRK
|
r3539 | |||
MinRK
|
r3555 | Parameters | ||
---------- | ||||
MinRK
|
r4071 | url_or_file : bytes or unicode; zmq url or path to ipcontroller-client.json | ||
MinRK
|
r3619 | Connection information for the Hub's registration. If a json connector | ||
file is given, then likely no further configuration is necessary. | ||||
MinRK
|
r3620 | [Default: use profile] | ||
profile : bytes | ||||
The name of the Cluster profile to be used to find connector information. | ||||
MinRK
|
r4071 | If run from an IPython application, the default profile will be the same | ||
as the running application, otherwise it will be 'default'. | ||||
Min RK
|
r3572 | context : zmq.Context | ||
MinRK
|
r3620 | Pass an existing zmq.Context instance, otherwise the client will create its own. | ||
Min RK
|
r3572 | debug : bool | ||
flag for lots of message printing for debug purposes | ||||
MinRK
|
r4013 | timeout : int/float | ||
time (in seconds) to wait for connection replies from the Hub | ||||
[Default: 10] | ||||
Min RK
|
r3572 | |||
MinRK
|
r4013 | #-------------- session related args ---------------- | ||
config : Config object | ||||
If specified, this will be relayed to the Session for configuration | ||||
username : str | ||||
set username for the session object | ||||
packer : str (import_string) or callable | ||||
Can be either the simple keyword 'json' or 'pickle', or an import_string to a | ||||
function to serialize messages. Must support same input as | ||||
JSON, and output must be bytes. | ||||
You can pass a callable directly as `pack` | ||||
unpacker : str (import_string) or callable | ||||
The inverse of packer. Only necessary if packer is specified as *not* one | ||||
of 'json' or 'pickle'. | ||||
Min RK
|
r3572 | #-------------- ssh related args ---------------- | ||
# These are args for configuring the ssh tunnel to be used | ||||
# credentials are used to forward connections over ssh to the Controller | ||||
# Note that the ip given in `addr` needs to be relative to sshserver | ||||
# The most basic case is to leave addr as pointing to localhost (127.0.0.1), | ||||
# and set sshserver as the same machine the Controller is on. However, | ||||
# the only requirement is that sshserver is able to see the Controller | ||||
# (i.e. is within the same trusted network). | ||||
sshserver : str | ||||
A string of the form passed to ssh, i.e. 'server.tld' or 'user@server.tld:port' | ||||
If keyfile or password is specified, and this is not, it will default to | ||||
the ip given in addr. | ||||
MinRK
|
r3575 | sshkey : str; path to public ssh key file | ||
Min RK
|
r3572 | This specifies a key to be used in ssh login, default None. | ||
Regular default ssh keys will be used without specifying this argument. | ||||
MinRK
|
r3619 | password : str | ||
Min RK
|
r3572 | Your ssh password to sshserver. Note that if this is left None, | ||
you will be prompted for it if passwordless key based login is unavailable. | ||||
MinRK
|
r3619 | paramiko : bool | ||
flag for whether to use paramiko instead of shell ssh for tunneling. | ||||
[default: True on win32, False else] | ||||
MinRK
|
r3555 | |||
MinRK
|
r3664 | ------- exec authentication args ------- | ||
If even localhost is untrusted, you can have some protection against | ||||
MinRK
|
r4013 | unauthorized execution by signing messages with HMAC digests. | ||
Messages are still sent as cleartext, so if someone can snoop your | ||||
loopback traffic this will not protect your privacy, but will prevent | ||||
unauthorized execution. | ||||
MinRK
|
r3575 | |||
exec_key : str | ||||
an authentication key or file containing a key | ||||
default: None | ||||
MinRK
|
r3539 | Attributes | ||
---------- | ||||
MinRK
|
r3635 | |||
MinRK
|
r3664 | ids : list of int engine IDs | ||
MinRK
|
r3539 | requesting the ids attribute always synchronizes | ||
the registration state. To request ids without synchronization, | ||||
MinRK
|
r3575 | use semi-private _ids attributes. | ||
MinRK
|
r3539 | |||
history : list of msg_ids | ||||
a list of msg_ids, keeping track of all the execution | ||||
MinRK
|
r3555 | messages you have submitted in order. | ||
MinRK
|
r3539 | |||
outstanding : set of msg_ids | ||||
a set of msg_ids that have been submitted, but whose | ||||
MinRK
|
r3555 | results have not yet been received. | ||
MinRK
|
r3539 | |||
results : dict | ||||
a dict of all our results, keyed by msg_id | ||||
block : bool | ||||
determines default behavior when block not specified | ||||
in execution methods | ||||
Methods | ||||
------- | ||||
MinRK
|
r3635 | spin | ||
flushes incoming results and registration state changes | ||||
control methods spin, and requesting `ids` also ensures up to date | ||||
MinRK
|
r3664 | wait | ||
MinRK
|
r3635 | wait on one or more msg_ids | ||
execution methods | ||||
apply | ||||
MinRK
|
r3539 | legacy: execute, run | ||
MinRK
|
r3664 | data movement | ||
push, pull, scatter, gather | ||||
MinRK
|
r3635 | query methods | ||
MinRK
|
r3664 | queue_status, get_result, purge, result_status | ||
MinRK
|
r3540 | |||
MinRK
|
r3635 | control methods | ||
abort, shutdown | ||||
MinRK
|
r3540 | |||
MinRK
|
r3539 | """ | ||
MinRK
|
r3636 | block = Bool(False) | ||
MinRK
|
r3651 | outstanding = Set() | ||
results = Instance('collections.defaultdict', (dict,)) | ||||
metadata = Instance('collections.defaultdict', (Metadata,)) | ||||
MinRK
|
r3636 | history = List() | ||
debug = Bool(False) | ||||
MinRK
|
r4071 | |||
profile=Unicode() | ||||
def _profile_default(self): | ||||
if BaseIPythonApplication.initialized(): | ||||
# an IPython app *might* be running, try to get its profile | ||||
try: | ||||
return BaseIPythonApplication.instance().profile | ||||
except (AttributeError, MultipleInstanceError): | ||||
# could be a *different* subclass of config.Application, | ||||
# which would raise one of these two errors. | ||||
return u'default' | ||||
else: | ||||
return u'default' | ||||
MinRK
|
r3636 | |||
MinRK
|
r3651 | _outstanding_dict = Instance('collections.defaultdict', (set,)) | ||
MinRK
|
r3636 | _ids = List() | ||
_connected=Bool(False) | ||||
_ssh=Bool(False) | ||||
_context = Instance('zmq.Context') | ||||
_config = Dict() | ||||
MinRK
|
r3664 | _engines=Instance(util.ReverseDict, (), {}) | ||
MinRK
|
r3657 | # _hub_socket=Instance('zmq.Socket') | ||
MinRK
|
r3636 | _query_socket=Instance('zmq.Socket') | ||
_control_socket=Instance('zmq.Socket') | ||||
_iopub_socket=Instance('zmq.Socket') | ||||
_notification_socket=Instance('zmq.Socket') | ||||
MinRK
|
r3664 | _mux_socket=Instance('zmq.Socket') | ||
_task_socket=Instance('zmq.Socket') | ||||
MinRK
|
r3988 | _task_scheme=Unicode() | ||
MinRK
|
r3636 | _closed = False | ||
MinRK
|
r3664 | _ignored_control_replies=Int(0) | ||
_ignored_hub_replies=Int(0) | ||||
MinRK
|
r3539 | |||
MinRK
|
r4071 | def __new__(self, *args, **kw): | ||
# don't raise on positional args | ||||
return HasTraits.__new__(self, **kw) | ||||
def __init__(self, url_or_file=None, profile=None, profile_dir=None, ipython_dir=None, | ||||
MinRK
|
r4013 | context=None, debug=False, exec_key=None, | ||
MinRK
|
r3575 | sshserver=None, sshkey=None, password=None, paramiko=None, | ||
MinRK
|
r4013 | timeout=10, **extra_args | ||
MinRK
|
r3614 | ): | ||
MinRK
|
r4071 | if profile: | ||
super(Client, self).__init__(debug=debug, profile=profile) | ||||
else: | ||||
super(Client, self).__init__(debug=debug) | ||||
MinRK
|
r3539 | if context is None: | ||
MinRK
|
r3661 | context = zmq.Context.instance() | ||
MinRK
|
r3636 | self._context = context | ||
MinRK
|
r3614 | |||
MinRK
|
r4071 | self._setup_profile_dir(self.profile, profile_dir, ipython_dir) | ||
MinRK
|
r3614 | if self._cd is not None: | ||
if url_or_file is None: | ||||
url_or_file = pjoin(self._cd.security_dir, 'ipcontroller-client.json') | ||||
MinRK
|
r3664 | assert url_or_file is not None, "I can't find enough information to connect to a hub!"\ | ||
MinRK
|
r3614 | " Please specify at least one of url_or_file or profile." | ||
try: | ||||
MinRK
|
r3664 | util.validate_url(url_or_file) | ||
MinRK
|
r3614 | except AssertionError: | ||
if not os.path.exists(url_or_file): | ||||
if self._cd: | ||||
url_or_file = os.path.join(self._cd.security_dir, url_or_file) | ||||
assert os.path.exists(url_or_file), "Not a valid connection file or url: %r"%url_or_file | ||||
with open(url_or_file) as f: | ||||
cfg = json.loads(f.read()) | ||||
else: | ||||
cfg = {'url':url_or_file} | ||||
# sync defaults from args, json: | ||||
if sshserver: | ||||
cfg['ssh'] = sshserver | ||||
if exec_key: | ||||
cfg['exec_key'] = exec_key | ||||
exec_key = cfg['exec_key'] | ||||
location = cfg.setdefault('location', None) | ||||
MinRK
|
r3664 | cfg['url'] = util.disambiguate_url(cfg['url'], location) | ||
MinRK
|
r3614 | url = cfg['url'] | ||
MinRK
|
r4119 | proto,addr,port = util.split_url(url) | ||
if location is not None and addr == '127.0.0.1': | ||||
# location specified, and connection is expected to be local | ||||
if location not in LOCAL_IPS and not sshserver: | ||||
# load ssh from JSON *only* if the controller is not on | ||||
# this machine | ||||
sshserver=cfg['ssh'] | ||||
if location not in LOCAL_IPS and not sshserver: | ||||
# warn if no ssh specified, but SSH is probably needed | ||||
# This is only a warning, because the most likely cause | ||||
# is a local Controller on a laptop whose IP is dynamic | ||||
MinRK
|
r4117 | warnings.warn(""" | ||
Controller appears to be listening on localhost, but not on this machine. | ||||
If this is true, you should specify Client(...,sshserver='you@%s') | ||||
or instruct your controller to listen on an external IP."""%location, | ||||
RuntimeWarning) | ||||
MinRK
|
r4207 | elif not sshserver: | ||
# otherwise sync with cfg | ||||
sshserver = cfg['ssh'] | ||||
MinRK
|
r3614 | |||
self._config = cfg | ||||
MinRK
|
r3575 | self._ssh = bool(sshserver or sshkey or password) | ||
Min RK
|
r3572 | if self._ssh and sshserver is None: | ||
MinRK
|
r3614 | # default to ssh via localhost | ||
sshserver = url.split('://')[1].split(':')[0] | ||||
Min RK
|
r3572 | if self._ssh and password is None: | ||
MinRK
|
r3575 | if tunnel.try_passwordless_ssh(sshserver, sshkey, paramiko): | ||
Min RK
|
r3572 | password=False | ||
else: | ||||
password = getpass("SSH Password for %s: "%sshserver) | ||||
MinRK
|
r3575 | ssh_kwargs = dict(keyfile=sshkey, password=password, paramiko=paramiko) | ||
MinRK
|
r4013 | |||
# configure and construct the session | ||||
if exec_key is not None: | ||||
if os.path.isfile(exec_key): | ||||
extra_args['keyfile'] = exec_key | ||||
else: | ||||
MinRK
|
r4161 | exec_key = util.asbytes(exec_key) | ||
MinRK
|
r4013 | extra_args['key'] = exec_key | ||
self.session = Session(**extra_args) | ||||
MinRK
|
r3657 | self._query_socket = self._context.socket(zmq.XREQ) | ||
MinRK
|
r4161 | self._query_socket.setsockopt(zmq.IDENTITY, util.asbytes(self.session.session)) | ||
Min RK
|
r3572 | if self._ssh: | ||
MinRK
|
r3657 | tunnel.tunnel_connection(self._query_socket, url, sshserver, **ssh_kwargs) | ||
Min RK
|
r3572 | else: | ||
MinRK
|
r3657 | self._query_socket.connect(url) | ||
MinRK
|
r3636 | |||
self.session.debug = self.debug | ||||
MinRK
|
r3539 | |||
self._notification_handlers = {'registration_notification' : self._register_engine, | ||||
'unregistration_notification' : self._unregister_engine, | ||||
MinRK
|
r3664 | 'shutdown_notification' : lambda msg: self.close(), | ||
MinRK
|
r3539 | } | ||
self._queue_handlers = {'execute_reply' : self._handle_execute_reply, | ||||
'apply_reply' : self._handle_apply_reply} | ||||
MinRK
|
r3664 | self._connect(sshserver, ssh_kwargs, timeout) | ||
MinRK
|
r3539 | |||
MinRK
|
r3654 | def __del__(self): | ||
"""cleanup sockets, but _not_ context.""" | ||||
self.close() | ||||
MinRK
|
r3539 | |||
MinRK
|
r3992 | def _setup_profile_dir(self, profile, profile_dir, ipython_dir): | ||
MinRK
|
r3614 | if ipython_dir is None: | ||
ipython_dir = get_ipython_dir() | ||||
MinRK
|
r3992 | if profile_dir is not None: | ||
MinRK
|
r3614 | try: | ||
MinRK
|
r3992 | self._cd = ProfileDir.find_profile_dir(profile_dir) | ||
MinRK
|
r3641 | return | ||
MinRK
|
r3992 | except ProfileDirError: | ||
MinRK
|
r3614 | pass | ||
elif profile is not None: | ||||
try: | ||||
MinRK
|
r3992 | self._cd = ProfileDir.find_profile_dir_by_name( | ||
MinRK
|
r3614 | ipython_dir, profile) | ||
MinRK
|
r3641 | return | ||
MinRK
|
r3992 | except ProfileDirError: | ||
MinRK
|
r3614 | pass | ||
MinRK
|
r3641 | self._cd = None | ||
MinRK
|
r3614 | |||
MinRK
|
r3539 | def _update_engines(self, engines): | ||
MinRK
|
r3555 | """Update our engines dict and _ids from a dict of the form: {id:uuid}.""" | ||
MinRK
|
r3539 | for k,v in engines.iteritems(): | ||
eid = int(k) | ||||
MinRK
|
r4155 | self._engines[eid] = v | ||
MinRK
|
r3625 | self._ids.append(eid) | ||
self._ids = sorted(self._ids) | ||||
MinRK
|
r3622 | if sorted(self._engines.keys()) != range(len(self._engines)) and \ | ||
MinRK
|
r3664 | self._task_scheme == 'pure' and self._task_socket: | ||
MinRK
|
r3622 | self._stop_scheduling_tasks() | ||
def _stop_scheduling_tasks(self): | ||||
"""Stop scheduling tasks because an engine has been unregistered | ||||
from a pure ZMQ scheduler. | ||||
""" | ||||
MinRK
|
r3664 | self._task_socket.close() | ||
self._task_socket = None | ||||
MinRK
|
r3622 | msg = "An engine has been unregistered, and we are using pure " +\ | ||
"ZMQ task scheduling. Task farming will be disabled." | ||||
if self.outstanding: | ||||
msg += " If you were running tasks when this happened, " +\ | ||||
"some `outstanding` msg_ids may never resolve." | ||||
warnings.warn(msg, RuntimeWarning) | ||||
MinRK
|
r3539 | |||
def _build_targets(self, targets): | ||||
MinRK
|
r3555 | """Turn valid target IDs or 'all' into two lists: | ||
(int_ids, uuids). | ||||
""" | ||||
MinRK
|
r3777 | if not self._ids: | ||
# flush notification socket if no engines yet, just in case | ||||
if not self.ids: | ||||
raise error.NoEnginesRegistered("Can't build targets without any engines") | ||||
MinRK
|
r3539 | if targets is None: | ||
targets = self._ids | ||||
MinRK
|
r4172 | elif isinstance(targets, basestring): | ||
MinRK
|
r3539 | if targets.lower() == 'all': | ||
targets = self._ids | ||||
else: | ||||
raise TypeError("%r not valid str target, must be 'all'"%(targets)) | ||||
elif isinstance(targets, int): | ||||
MinRK
|
r3665 | if targets < 0: | ||
targets = self.ids[targets] | ||||
MinRK
|
r3777 | if targets not in self._ids: | ||
MinRK
|
r3665 | raise IndexError("No such engine: %i"%targets) | ||
MinRK
|
r3539 | targets = [targets] | ||
MinRK
|
r3665 | |||
if isinstance(targets, slice): | ||||
indices = range(len(self._ids))[targets] | ||||
ids = self.ids | ||||
targets = [ ids[i] for i in indices ] | ||||
if not isinstance(targets, (tuple, list, xrange)): | ||||
raise TypeError("targets by int/slice/collection of ints only, not %s"%(type(targets))) | ||||
MinRK
|
r4161 | return [util.asbytes(self._engines[t]) for t in targets], list(targets) | ||
MinRK
|
r3539 | |||
MinRK
|
r3664 | def _connect(self, sshserver, ssh_kwargs, timeout): | ||
"""setup all our socket connections to the cluster. This is called from | ||||
MinRK
|
r3555 | __init__.""" | ||
MinRK
|
r3622 | |||
# Maybe allow reconnecting? | ||||
MinRK
|
r3539 | if self._connected: | ||
return | ||||
self._connected=True | ||||
Min RK
|
r3572 | |||
MinRK
|
r3614 | def connect_socket(s, url): | ||
MinRK
|
r3664 | url = util.disambiguate_url(url, self._config['location']) | ||
Min RK
|
r3572 | if self._ssh: | ||
MinRK
|
r3614 | return tunnel.tunnel_connection(s, url, sshserver, **ssh_kwargs) | ||
Min RK
|
r3572 | else: | ||
MinRK
|
r3614 | return s.connect(url) | ||
Min RK
|
r3572 | |||
MinRK
|
r3657 | self.session.send(self._query_socket, 'connection_request') | ||
MinRK
|
r4072 | # use Poller because zmq.select has wrong units in pyzmq 2.1.7 | ||
poller = zmq.Poller() | ||||
poller.register(self._query_socket, zmq.POLLIN) | ||||
# poll expects milliseconds, timeout is seconds | ||||
evts = poller.poll(timeout*1000) | ||||
if not evts: | ||||
MinRK
|
r3664 | raise error.TimeoutError("Hub connection request timed out") | ||
MinRK
|
r3657 | idents,msg = self.session.recv(self._query_socket,mode=0) | ||
MinRK
|
r3540 | if self.debug: | ||
pprint(msg) | ||||
MinRK
|
r4006 | msg = Message(msg) | ||
MinRK
|
r3539 | content = msg.content | ||
MinRK
|
r3622 | self._config['registration'] = dict(content) | ||
MinRK
|
r3539 | if content.status == 'ok': | ||
MinRK
|
r4161 | ident = util.asbytes(self.session.session) | ||
MinRK
|
r3603 | if content.mux: | ||
MinRK
|
r3664 | self._mux_socket = self._context.socket(zmq.XREQ) | ||
MinRK
|
r4155 | self._mux_socket.setsockopt(zmq.IDENTITY, ident) | ||
MinRK
|
r3664 | connect_socket(self._mux_socket, content.mux) | ||
MinRK
|
r3539 | if content.task: | ||
MinRK
|
r3622 | self._task_scheme, task_addr = content.task | ||
MinRK
|
r3664 | self._task_socket = self._context.socket(zmq.XREQ) | ||
MinRK
|
r4155 | self._task_socket.setsockopt(zmq.IDENTITY, ident) | ||
MinRK
|
r3664 | connect_socket(self._task_socket, task_addr) | ||
MinRK
|
r3539 | if content.notification: | ||
MinRK
|
r3636 | self._notification_socket = self._context.socket(zmq.SUB) | ||
Min RK
|
r3572 | connect_socket(self._notification_socket, content.notification) | ||
MinRK
|
r3657 | self._notification_socket.setsockopt(zmq.SUBSCRIBE, b'') | ||
# if content.query: | ||||
# self._query_socket = self._context.socket(zmq.XREQ) | ||||
# self._query_socket.setsockopt(zmq.IDENTITY, self.session.session) | ||||
# connect_socket(self._query_socket, content.query) | ||||
MinRK
|
r3540 | if content.control: | ||
MinRK
|
r3657 | self._control_socket = self._context.socket(zmq.XREQ) | ||
MinRK
|
r4155 | self._control_socket.setsockopt(zmq.IDENTITY, ident) | ||
Min RK
|
r3572 | connect_socket(self._control_socket, content.control) | ||
MinRK
|
r3602 | if content.iopub: | ||
MinRK
|
r3636 | self._iopub_socket = self._context.socket(zmq.SUB) | ||
MinRK
|
r3657 | self._iopub_socket.setsockopt(zmq.SUBSCRIBE, b'') | ||
MinRK
|
r4155 | self._iopub_socket.setsockopt(zmq.IDENTITY, ident) | ||
MinRK
|
r3602 | connect_socket(self._iopub_socket, content.iopub) | ||
MinRK
|
r3539 | self._update_engines(dict(content.engines)) | ||
else: | ||||
self._connected = False | ||||
raise Exception("Failed to connect!") | ||||
MinRK
|
r3555 | #-------------------------------------------------------------------------- | ||
# handlers and callbacks for incoming messages | ||||
#-------------------------------------------------------------------------- | ||||
MinRK
|
r3639 | def _unwrap_exception(self, content): | ||
MinRK
|
r3664 | """unwrap exception, and remap engine_id to int.""" | ||
MinRK
|
r3644 | e = error.unwrap_exception(content) | ||
MinRK
|
r3658 | # print e.traceback | ||
MinRK
|
r3639 | if e.engine_info: | ||
MinRK
|
r3641 | e_uuid = e.engine_info['engine_uuid'] | ||
MinRK
|
r3639 | eid = self._engines[e_uuid] | ||
MinRK
|
r3641 | e.engine_info['engine_id'] = eid | ||
MinRK
|
r3639 | return e | ||
MinRK
|
r3602 | def _extract_metadata(self, header, parent, content): | ||
MinRK
|
r3598 | md = {'msg_id' : parent['msg_id'], | ||
'received' : datetime.now(), | ||||
MinRK
|
r3607 | 'engine_uuid' : header.get('engine', None), | ||
MinRK
|
r3618 | 'follow' : parent.get('follow', []), | ||
'after' : parent.get('after', []), | ||||
MinRK
|
r3602 | 'status' : content['status'], | ||
} | ||||
MinRK
|
r3607 | |||
if md['engine_uuid'] is not None: | ||||
md['engine_id'] = self._engines.get(md['engine_uuid'], None) | ||||
if 'date' in parent: | ||||
MinRK
|
r4006 | md['submitted'] = parent['date'] | ||
MinRK
|
r3607 | if 'started' in header: | ||
MinRK
|
r4006 | md['started'] = header['started'] | ||
MinRK
|
r3607 | if 'date' in header: | ||
MinRK
|
r4006 | md['completed'] = header['date'] | ||
MinRK
|
r3598 | return md | ||
MinRK
|
r3651 | def _register_engine(self, msg): | ||
"""Register a new engine, and update our connection info.""" | ||||
content = msg['content'] | ||||
eid = content['id'] | ||||
d = {eid : content['queue']} | ||||
self._update_engines(d) | ||||
def _unregister_engine(self, msg): | ||||
"""Unregister an engine that has died.""" | ||||
content = msg['content'] | ||||
eid = int(content['id']) | ||||
if eid in self._ids: | ||||
self._ids.remove(eid) | ||||
uuid = self._engines.pop(eid) | ||||
self._handle_stranded_msgs(eid, uuid) | ||||
MinRK
|
r3664 | if self._task_socket and self._task_scheme == 'pure': | ||
MinRK
|
r3651 | self._stop_scheduling_tasks() | ||
def _handle_stranded_msgs(self, eid, uuid): | ||||
"""Handle messages known to be on an engine when the engine unregisters. | ||||
It is possible that this will fire prematurely - that is, an engine will | ||||
go down after completing a result, and the client will be notified | ||||
of the unregistration and later receive the successful result. | ||||
""" | ||||
outstanding = self._outstanding_dict[uuid] | ||||
for msg_id in list(outstanding): | ||||
if msg_id in self.results: | ||||
# we already | ||||
continue | ||||
try: | ||||
raise error.EngineError("Engine %r died while running task %r"%(eid, msg_id)) | ||||
except: | ||||
content = error.wrap_exception() | ||||
# build a fake message: | ||||
parent = {} | ||||
header = {} | ||||
parent['msg_id'] = msg_id | ||||
header['engine'] = uuid | ||||
MinRK
|
r4006 | header['date'] = datetime.now() | ||
MinRK
|
r3651 | msg = dict(parent_header=parent, header=header, content=content) | ||
self._handle_apply_reply(msg) | ||||
MinRK
|
r3539 | def _handle_execute_reply(self, msg): | ||
MinRK
|
r3598 | """Save the reply to an execute_request into our results. | ||
execute messages are never actually used. apply is used instead. | ||||
""" | ||||
MinRK
|
r3539 | parent = msg['parent_header'] | ||
msg_id = parent['msg_id'] | ||||
if msg_id not in self.outstanding: | ||||
MinRK
|
r3607 | if msg_id in self.history: | ||
print ("got stale result: %s"%msg_id) | ||||
else: | ||||
print ("got unknown result: %s"%msg_id) | ||||
MinRK
|
r3539 | else: | ||
self.outstanding.remove(msg_id) | ||||
MinRK
|
r3639 | self.results[msg_id] = self._unwrap_exception(msg['content']) | ||
MinRK
|
r3539 | |||
def _handle_apply_reply(self, msg): | ||||
MinRK
|
r3555 | """Save the reply to an apply_request into our results.""" | ||
MinRK
|
r4008 | parent = msg['parent_header'] | ||
MinRK
|
r3539 | msg_id = parent['msg_id'] | ||
if msg_id not in self.outstanding: | ||||
MinRK
|
r3607 | if msg_id in self.history: | ||
print ("got stale result: %s"%msg_id) | ||||
print self.results[msg_id] | ||||
print msg | ||||
else: | ||||
print ("got unknown result: %s"%msg_id) | ||||
MinRK
|
r3539 | else: | ||
self.outstanding.remove(msg_id) | ||||
content = msg['content'] | ||||
MinRK
|
r4008 | header = msg['header'] | ||
MinRK
|
r3598 | |||
MinRK
|
r3602 | # construct metadata: | ||
MinRK
|
r3651 | md = self.metadata[msg_id] | ||
MinRK
|
r3602 | md.update(self._extract_metadata(header, parent, content)) | ||
MinRK
|
r3651 | # is this redundant? | ||
MinRK
|
r3602 | self.metadata[msg_id] = md | ||
MinRK
|
r3598 | |||
MinRK
|
r3651 | e_outstanding = self._outstanding_dict[md['engine_uuid']] | ||
if msg_id in e_outstanding: | ||||
e_outstanding.remove(msg_id) | ||||
MinRK
|
r3602 | # construct result: | ||
MinRK
|
r3539 | if content['status'] == 'ok': | ||
MinRK
|
r3644 | self.results[msg_id] = util.unserialize_object(msg['buffers'])[0] | ||
MinRK
|
r3540 | elif content['status'] == 'aborted': | ||
MinRK
|
r3664 | self.results[msg_id] = error.TaskAborted(msg_id) | ||
MinRK
|
r3540 | elif content['status'] == 'resubmitted': | ||
MinRK
|
r3559 | # TODO: handle resubmission | ||
pass | ||||
MinRK
|
r3539 | else: | ||
MinRK
|
r3639 | self.results[msg_id] = self._unwrap_exception(content) | ||
MinRK
|
r3539 | |||
def _flush_notifications(self): | ||||
MinRK
|
r3555 | """Flush notifications of engine registrations waiting | ||
in ZMQ queue.""" | ||||
MinRK
|
r4006 | idents,msg = self.session.recv(self._notification_socket, mode=zmq.NOBLOCK) | ||
MinRK
|
r3539 | while msg is not None: | ||
MinRK
|
r3540 | if self.debug: | ||
pprint(msg) | ||||
Brian E. Granger
|
r4230 | msg_type = msg['header']['msg_type'] | ||
MinRK
|
r3539 | handler = self._notification_handlers.get(msg_type, None) | ||
if handler is None: | ||||
raise Exception("Unhandled message type: %s"%msg.msg_type) | ||||
else: | ||||
handler(msg) | ||||
MinRK
|
r4006 | idents,msg = self.session.recv(self._notification_socket, mode=zmq.NOBLOCK) | ||
MinRK
|
r3539 | |||
def _flush_results(self, sock): | ||||
MinRK
|
r3555 | """Flush task or queue results waiting in ZMQ queue.""" | ||
MinRK
|
r4006 | idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK) | ||
MinRK
|
r3539 | while msg is not None: | ||
MinRK
|
r3540 | if self.debug: | ||
pprint(msg) | ||||
Brian E. Granger
|
r4230 | msg_type = msg['header']['msg_type'] | ||
MinRK
|
r3539 | handler = self._queue_handlers.get(msg_type, None) | ||
if handler is None: | ||||
raise Exception("Unhandled message type: %s"%msg.msg_type) | ||||
else: | ||||
handler(msg) | ||||
MinRK
|
r4006 | idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK) | ||
MinRK
|
r3539 | |||
MinRK
|
r3540 | def _flush_control(self, sock): | ||
MinRK
|
r3555 | """Flush replies from the control channel waiting | ||
MinRK
|
r3559 | in the ZMQ queue. | ||
Currently: ignore them.""" | ||||
MinRK
|
r3664 | if self._ignored_control_replies <= 0: | ||
return | ||||
MinRK
|
r4006 | idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK) | ||
MinRK
|
r3540 | while msg is not None: | ||
MinRK
|
r3664 | self._ignored_control_replies -= 1 | ||
MinRK
|
r3540 | if self.debug: | ||
pprint(msg) | ||||
MinRK
|
r4006 | idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK) | ||
MinRK
|
r3540 | |||
MinRK
|
r3664 | def _flush_ignored_control(self): | ||
"""flush ignored control replies""" | ||||
while self._ignored_control_replies > 0: | ||||
self.session.recv(self._control_socket) | ||||
self._ignored_control_replies -= 1 | ||||
def _flush_ignored_hub_replies(self): | ||||
MinRK
|
r4006 | ident,msg = self.session.recv(self._query_socket, mode=zmq.NOBLOCK) | ||
MinRK
|
r3664 | while msg is not None: | ||
MinRK
|
r4006 | ident,msg = self.session.recv(self._query_socket, mode=zmq.NOBLOCK) | ||
MinRK
|
r3664 | |||
MinRK
|
r3602 | def _flush_iopub(self, sock): | ||
"""Flush replies from the iopub channel waiting | ||||
in the ZMQ queue. | ||||
""" | ||||
MinRK
|
r4006 | idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK) | ||
MinRK
|
r3602 | while msg is not None: | ||
if self.debug: | ||||
pprint(msg) | ||||
parent = msg['parent_header'] | ||||
msg_id = parent['msg_id'] | ||||
content = msg['content'] | ||||
header = msg['header'] | ||||
Brian E. Granger
|
r4230 | msg_type = msg['header']['msg_type'] | ||
MinRK
|
r3602 | |||
# init metadata: | ||||
MinRK
|
r3651 | md = self.metadata[msg_id] | ||
MinRK
|
r3602 | |||
if msg_type == 'stream': | ||||
name = content['name'] | ||||
s = md[name] or '' | ||||
md[name] = s + content['data'] | ||||
elif msg_type == 'pyerr': | ||||
MinRK
|
r3639 | md.update({'pyerr' : self._unwrap_exception(content)}) | ||
MinRK
|
r3684 | elif msg_type == 'pyin': | ||
md.update({'pyin' : content['code']}) | ||||
MinRK
|
r3602 | else: | ||
MinRK
|
r3684 | md.update({msg_type : content.get('data', '')}) | ||
MinRK
|
r3602 | |||
MinRK
|
r3651 | # reduntant? | ||
MinRK
|
r3602 | self.metadata[msg_id] = md | ||
MinRK
|
r4006 | idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK) | ||
MinRK
|
r3602 | |||
MinRK
|
r3555 | #-------------------------------------------------------------------------- | ||
MinRK
|
r3636 | # len, getitem | ||
MinRK
|
r3555 | #-------------------------------------------------------------------------- | ||
MinRK
|
r3539 | |||
MinRK
|
r3636 | def __len__(self): | ||
"""len(client) returns # of engines.""" | ||||
return len(self.ids) | ||||
MinRK
|
r3539 | def __getitem__(self, key): | ||
MinRK
|
r3635 | """index access returns DirectView multiplexer objects | ||
MinRK
|
r3539 | |||
MinRK
|
r3635 | Must be int, slice, or list/tuple/xrange of ints""" | ||
if not isinstance(key, (int, slice, tuple, list, xrange)): | ||||
raise TypeError("key by int/slice/iterable of ints only, not %s"%(type(key))) | ||||
MinRK
|
r3539 | else: | ||
MinRK
|
r3665 | return self.direct_view(key) | ||
MinRK
|
r3539 | |||
MinRK
|
r3555 | #-------------------------------------------------------------------------- | ||
# Begin public methods | ||||
#-------------------------------------------------------------------------- | ||||
MinRK
|
r3539 | |||
MinRK
|
r3664 | @property | ||
def ids(self): | ||||
"""Always up-to-date ids property.""" | ||||
self._flush_notifications() | ||||
# always copy: | ||||
return list(self._ids) | ||||
def close(self): | ||||
if self._closed: | ||||
return | ||||
snames = filter(lambda n: n.endswith('socket'), dir(self)) | ||||
for socket in map(lambda name: getattr(self, name), snames): | ||||
if isinstance(socket, zmq.Socket) and not socket.closed: | ||||
socket.close() | ||||
self._closed = True | ||||
MinRK
|
r3539 | def spin(self): | ||
MinRK
|
r3555 | """Flush any registration notifications and execution results | ||
waiting in the ZMQ queue. | ||||
""" | ||||
if self._notification_socket: | ||||
MinRK
|
r3539 | self._flush_notifications() | ||
MinRK
|
r3664 | if self._mux_socket: | ||
self._flush_results(self._mux_socket) | ||||
if self._task_socket: | ||||
self._flush_results(self._task_socket) | ||||
MinRK
|
r3555 | if self._control_socket: | ||
self._flush_control(self._control_socket) | ||||
MinRK
|
r3602 | if self._iopub_socket: | ||
self._flush_iopub(self._iopub_socket) | ||||
MinRK
|
r3664 | if self._query_socket: | ||
self._flush_ignored_hub_replies() | ||||
MinRK
|
r3539 | |||
MinRK
|
r3664 | def wait(self, jobs=None, timeout=-1): | ||
MinRK
|
r3639 | """waits on one or more `jobs`, for up to `timeout` seconds. | ||
MinRK
|
r3539 | |||
Parameters | ||||
---------- | ||||
MinRK
|
r3635 | |||
MinRK
|
r3639 | jobs : int, str, or list of ints and/or strs, or one or more AsyncResult objects | ||
MinRK
|
r3555 | ints are indices to self.history | ||
strs are msg_ids | ||||
default: wait on all outstanding messages | ||||
timeout : float | ||||
a time in seconds, after which to give up. | ||||
default is -1, which means no timeout | ||||
MinRK
|
r3539 | |||
MinRK
|
r3555 | Returns | ||
------- | ||||
MinRK
|
r3635 | |||
MinRK
|
r3555 | True : when all msg_ids are done | ||
False : timeout reached, some msg_ids still outstanding | ||||
""" | ||||
tic = time.time() | ||||
MinRK
|
r3639 | if jobs is None: | ||
MinRK
|
r3555 | theids = self.outstanding | ||
else: | ||||
MinRK
|
r4172 | if isinstance(jobs, (int, basestring, AsyncResult)): | ||
MinRK
|
r3639 | jobs = [jobs] | ||
MinRK
|
r3555 | theids = set() | ||
MinRK
|
r3639 | for job in jobs: | ||
if isinstance(job, int): | ||||
# index access | ||||
job = self.history[job] | ||||
elif isinstance(job, AsyncResult): | ||||
map(theids.add, job.msg_ids) | ||||
MinRK
|
r3590 | continue | ||
MinRK
|
r3639 | theids.add(job) | ||
MinRK
|
r3590 | if not theids.intersection(self.outstanding): | ||
return True | ||||
MinRK
|
r3555 | self.spin() | ||
while theids.intersection(self.outstanding): | ||||
if timeout >= 0 and ( time.time()-tic ) > timeout: | ||||
break | ||||
time.sleep(1e-3) | ||||
self.spin() | ||||
return len(theids.intersection(self.outstanding)) == 0 | ||||
#-------------------------------------------------------------------------- | ||||
# Control methods | ||||
#-------------------------------------------------------------------------- | ||||
MinRK
|
r3664 | @spin_first | ||
MinRK
|
r3540 | def clear(self, targets=None, block=None): | ||
MinRK
|
r3555 | """Clear the namespace in target(s).""" | ||
MinRK
|
r3774 | block = self.block if block is None else block | ||
MinRK
|
r3540 | targets = self._build_targets(targets)[0] | ||
for t in targets: | ||||
MinRK
|
r3559 | self.session.send(self._control_socket, 'clear_request', content={}, ident=t) | ||
MinRK
|
r3540 | error = False | ||
MinRK
|
r3774 | if block: | ||
MinRK
|
r3664 | self._flush_ignored_control() | ||
MinRK
|
r3540 | for i in range(len(targets)): | ||
MinRK
|
r3555 | idents,msg = self.session.recv(self._control_socket,0) | ||
MinRK
|
r3540 | if self.debug: | ||
pprint(msg) | ||||
if msg['content']['status'] != 'ok': | ||||
MinRK
|
r3639 | error = self._unwrap_exception(msg['content']) | ||
MinRK
|
r3664 | else: | ||
self._ignored_control_replies += len(targets) | ||||
MinRK
|
r3540 | if error: | ||
MinRK
|
r3654 | raise error | ||
MinRK
|
r3540 | |||
MinRK
|
r3539 | |||
MinRK
|
r3664 | @spin_first | ||
MinRK
|
r3639 | def abort(self, jobs=None, targets=None, block=None): | ||
"""Abort specific jobs from the execution queues of target(s). | ||||
This is a mechanism to prevent jobs that have already been submitted | ||||
from executing. | ||||
Parameters | ||||
---------- | ||||
jobs : msg_id, list of msg_ids, or AsyncResult | ||||
The jobs to be aborted | ||||
""" | ||||
MinRK
|
r3774 | block = self.block if block is None else block | ||
MinRK
|
r3540 | targets = self._build_targets(targets)[0] | ||
MinRK
|
r3639 | msg_ids = [] | ||
if isinstance(jobs, (basestring,AsyncResult)): | ||||
jobs = [jobs] | ||||
bad_ids = filter(lambda obj: not isinstance(obj, (basestring, AsyncResult)), jobs) | ||||
if bad_ids: | ||||
raise TypeError("Invalid msg_id type %r, expected str or AsyncResult"%bad_ids[0]) | ||||
for j in jobs: | ||||
if isinstance(j, AsyncResult): | ||||
msg_ids.extend(j.msg_ids) | ||||
else: | ||||
msg_ids.append(j) | ||||
MinRK
|
r3540 | content = dict(msg_ids=msg_ids) | ||
for t in targets: | ||||
MinRK
|
r3555 | self.session.send(self._control_socket, 'abort_request', | ||
MinRK
|
r3540 | content=content, ident=t) | ||
error = False | ||||
MinRK
|
r3774 | if block: | ||
MinRK
|
r3664 | self._flush_ignored_control() | ||
MinRK
|
r3540 | for i in range(len(targets)): | ||
MinRK
|
r3555 | idents,msg = self.session.recv(self._control_socket,0) | ||
MinRK
|
r3540 | if self.debug: | ||
pprint(msg) | ||||
if msg['content']['status'] != 'ok': | ||||
MinRK
|
r3639 | error = self._unwrap_exception(msg['content']) | ||
MinRK
|
r3664 | else: | ||
self._ignored_control_replies += len(targets) | ||||
MinRK
|
r3540 | if error: | ||
MinRK
|
r3654 | raise error | ||
MinRK
|
r3539 | |||
MinRK
|
r3664 | @spin_first | ||
def shutdown(self, targets=None, restart=False, hub=False, block=None): | ||||
"""Terminates one or more engine processes, optionally including the hub.""" | ||||
MinRK
|
r3774 | block = self.block if block is None else block | ||
MinRK
|
r3664 | if hub: | ||
MinRK
|
r3580 | targets = 'all' | ||
MinRK
|
r3540 | targets = self._build_targets(targets)[0] | ||
for t in targets: | ||||
MinRK
|
r3575 | self.session.send(self._control_socket, 'shutdown_request', | ||
content={'restart':restart},ident=t) | ||||
MinRK
|
r3540 | error = False | ||
MinRK
|
r3664 | if block or hub: | ||
self._flush_ignored_control() | ||||
MinRK
|
r3540 | for i in range(len(targets)): | ||
MinRK
|
r3664 | idents,msg = self.session.recv(self._control_socket, 0) | ||
MinRK
|
r3540 | if self.debug: | ||
pprint(msg) | ||||
if msg['content']['status'] != 'ok': | ||||
MinRK
|
r3639 | error = self._unwrap_exception(msg['content']) | ||
MinRK
|
r3664 | else: | ||
self._ignored_control_replies += len(targets) | ||||
MinRK
|
r3580 | |||
MinRK
|
r3664 | if hub: | ||
MinRK
|
r3580 | time.sleep(0.25) | ||
self.session.send(self._query_socket, 'shutdown_request') | ||||
idents,msg = self.session.recv(self._query_socket, 0) | ||||
if self.debug: | ||||
pprint(msg) | ||||
if msg['content']['status'] != 'ok': | ||||
MinRK
|
r3639 | error = self._unwrap_exception(msg['content']) | ||
MinRK
|
r3580 | |||
MinRK
|
r3540 | if error: | ||
MinRK
|
r3584 | raise error | ||
MinRK
|
r3555 | |||
#-------------------------------------------------------------------------- | ||||
MinRK
|
r3774 | # Execution related methods | ||
MinRK
|
r3555 | #-------------------------------------------------------------------------- | ||
MinRK
|
r3583 | def _maybe_raise(self, result): | ||
"""wrapper for maybe raising an exception if apply failed.""" | ||||
if isinstance(result, error.RemoteError): | ||||
raise result | ||||
return result | ||||
MinRK
|
r3611 | |||
MinRK
|
r3664 | def send_apply_message(self, socket, f, args=None, kwargs=None, subheader=None, track=False, | ||
ident=None): | ||||
"""construct and send an apply message via a socket. | ||||
MinRK
|
r3555 | |||
MinRK
|
r3664 | This is the principal method with which all engine execution is performed by views. | ||
MinRK
|
r3555 | """ | ||
MinRK
|
r3664 | |||
MinRK
|
r3636 | assert not self._closed, "cannot use me anymore, I'm closed!" | ||
MinRK
|
r3555 | # defaults: | ||
args = args if args is not None else [] | ||||
kwargs = kwargs if kwargs is not None else {} | ||||
MinRK
|
r3664 | subheader = subheader if subheader is not None else {} | ||
MinRK
|
r3555 | |||
MinRK
|
r3664 | # validate arguments | ||
MinRK
|
r3555 | if not callable(f): | ||
raise TypeError("f must be callable, not %s"%type(f)) | ||||
if not isinstance(args, (tuple, list)): | ||||
raise TypeError("args must be tuple or list, not %s"%type(args)) | ||||
if not isinstance(kwargs, dict): | ||||
raise TypeError("kwargs must be dict, not %s"%type(kwargs)) | ||||
MinRK
|
r3664 | if not isinstance(subheader, dict): | ||
raise TypeError("subheader must be dict, not %s"%type(subheader)) | ||||
MinRK
|
r3611 | |||
MinRK
|
r3644 | bufs = util.pack_apply_message(f,args,kwargs) | ||
MinRK
|
r3593 | |||
MinRK
|
r3664 | msg = self.session.send(socket, "apply_request", buffers=bufs, ident=ident, | ||
subheader=subheader, track=track) | ||||
Brian E. Granger
|
r4230 | msg_id = msg['header']['msg_id'] | ||
MinRK
|
r3539 | self.outstanding.add(msg_id) | ||
MinRK
|
r3664 | if ident: | ||
# possibly routed to a specific engine | ||||
if isinstance(ident, list): | ||||
ident = ident[-1] | ||||
if ident in self._engines.values(): | ||||
# save for later, in case of engine death | ||||
self._outstanding_dict[ident].add(msg_id) | ||||
MinRK
|
r3539 | self.history.append(msg_id) | ||
MinRK
|
r3651 | self.metadata[msg_id]['submitted'] = datetime.now() | ||
MinRK
|
r3658 | |||
MinRK
|
r3664 | return msg | ||
MinRK
|
r3589 | #-------------------------------------------------------------------------- | ||
MinRK
|
r3636 | # construct a View object | ||
MinRK
|
r3589 | #-------------------------------------------------------------------------- | ||
MinRK
|
r3664 | def load_balanced_view(self, targets=None): | ||
"""construct a DirectView object. | ||||
If no arguments are specified, create a LoadBalancedView | ||||
using all engines. | ||||
Parameters | ||||
---------- | ||||
targets: list,slice,int,etc. [default: use all engines] | ||||
The subset of engines across which to load-balance | ||||
""" | ||||
MinRK
|
r4492 | if targets == 'all': | ||
targets = None | ||||
MinRK
|
r3666 | if targets is not None: | ||
MinRK
|
r3665 | targets = self._build_targets(targets)[1] | ||
return LoadBalancedView(client=self, socket=self._task_socket, targets=targets) | ||||
MinRK
|
r3664 | |||
def direct_view(self, targets='all'): | ||||
"""construct a DirectView object. | ||||
If no targets are specified, create a DirectView | ||||
using all engines. | ||||
Parameters | ||||
---------- | ||||
targets: list,slice,int,etc. [default: use all engines] | ||||
The engines to use for the View | ||||
""" | ||||
MinRK
|
r3665 | single = isinstance(targets, int) | ||
MinRK
|
r4492 | # allow 'all' to be lazily evaluated at each execution | ||
if targets != 'all': | ||||
targets = self._build_targets(targets)[1] | ||||
MinRK
|
r3665 | if single: | ||
targets = targets[0] | ||||
return DirectView(client=self, socket=self._mux_socket, targets=targets) | ||||
MinRK
|
r3625 | |||
MinRK
|
r3555 | #-------------------------------------------------------------------------- | ||
# Query methods | ||||
#-------------------------------------------------------------------------- | ||||
MinRK
|
r3664 | @spin_first | ||
MinRK
|
r3639 | def get_result(self, indices_or_msg_ids=None, block=None): | ||
"""Retrieve a result by msg_id or history index, wrapped in an AsyncResult object. | ||||
If the client already has the results, no request to the Hub will be made. | ||||
This is a convenient way to construct AsyncResult objects, which are wrappers | ||||
that include metadata about execution, and allow for awaiting results that | ||||
were not submitted by this Client. | ||||
It can also be a convenient way to retrieve the metadata associated with | ||||
blocking execution, since it always retrieves | ||||
Examples | ||||
-------- | ||||
:: | ||||
In [10]: r = client.apply() | ||||
MinRK
|
r3539 | |||
Parameters | ||||
---------- | ||||
MinRK
|
r3635 | |||
MinRK
|
r3639 | indices_or_msg_ids : integer history index, str msg_id, or list of either | ||
The indices or msg_ids of indices to be retrieved | ||||
block : bool | ||||
Whether to wait for the result to be done | ||||
Returns | ||||
------- | ||||
AsyncResult | ||||
A single AsyncResult object will always be returned. | ||||
AsyncHubResult | ||||
A subclass of AsyncResult that retrieves results from the Hub | ||||
""" | ||||
MinRK
|
r3774 | block = self.block if block is None else block | ||
MinRK
|
r3639 | if indices_or_msg_ids is None: | ||
indices_or_msg_ids = -1 | ||||
if not isinstance(indices_or_msg_ids, (list,tuple)): | ||||
indices_or_msg_ids = [indices_or_msg_ids] | ||||
theids = [] | ||||
for id in indices_or_msg_ids: | ||||
if isinstance(id, int): | ||||
id = self.history[id] | ||||
MinRK
|
r4172 | if not isinstance(id, basestring): | ||
MinRK
|
r3639 | raise TypeError("indices must be str or int, not %r"%id) | ||
theids.append(id) | ||||
local_ids = filter(lambda msg_id: msg_id in self.history or msg_id in self.results, theids) | ||||
remote_ids = filter(lambda msg_id: msg_id not in local_ids, theids) | ||||
if remote_ids: | ||||
ar = AsyncHubResult(self, msg_ids=theids) | ||||
else: | ||||
ar = AsyncResult(self, msg_ids=theids) | ||||
if block: | ||||
ar.wait() | ||||
return ar | ||||
MinRK
|
r3874 | |||
@spin_first | ||||
def resubmit(self, indices_or_msg_ids=None, subheader=None, block=None): | ||||
"""Resubmit one or more tasks. | ||||
in-flight tasks may not be resubmitted. | ||||
Parameters | ||||
---------- | ||||
indices_or_msg_ids : integer history index, str msg_id, or list of either | ||||
The indices or msg_ids of indices to be retrieved | ||||
block : bool | ||||
Whether to wait for the result to be done | ||||
Returns | ||||
------- | ||||
AsyncHubResult | ||||
A subclass of AsyncResult that retrieves results from the Hub | ||||
""" | ||||
block = self.block if block is None else block | ||||
if indices_or_msg_ids is None: | ||||
indices_or_msg_ids = -1 | ||||
if not isinstance(indices_or_msg_ids, (list,tuple)): | ||||
indices_or_msg_ids = [indices_or_msg_ids] | ||||
theids = [] | ||||
for id in indices_or_msg_ids: | ||||
if isinstance(id, int): | ||||
id = self.history[id] | ||||
MinRK
|
r4172 | if not isinstance(id, basestring): | ||
MinRK
|
r3874 | raise TypeError("indices must be str or int, not %r"%id) | ||
theids.append(id) | ||||
for msg_id in theids: | ||||
self.outstanding.discard(msg_id) | ||||
if msg_id in self.history: | ||||
self.history.remove(msg_id) | ||||
self.results.pop(msg_id, None) | ||||
self.metadata.pop(msg_id, None) | ||||
content = dict(msg_ids = theids) | ||||
self.session.send(self._query_socket, 'resubmit_request', content) | ||||
zmq.select([self._query_socket], [], []) | ||||
idents,msg = self.session.recv(self._query_socket, zmq.NOBLOCK) | ||||
if self.debug: | ||||
pprint(msg) | ||||
content = msg['content'] | ||||
if content['status'] != 'ok': | ||||
raise self._unwrap_exception(content) | ||||
ar = AsyncHubResult(self, msg_ids=theids) | ||||
if block: | ||||
ar.wait() | ||||
return ar | ||||
MinRK
|
r3639 | |||
MinRK
|
r3664 | @spin_first | ||
MinRK
|
r3639 | def result_status(self, msg_ids, status_only=True): | ||
"""Check on the status of the result(s) of the apply request with `msg_ids`. | ||||
If status_only is False, then the actual results will be retrieved, else | ||||
only the status of the results will be checked. | ||||
Parameters | ||||
---------- | ||||
msg_ids : list of msg_ids | ||||
MinRK
|
r3555 | if int: | ||
Passed as index to self.history for convenience. | ||||
MinRK
|
r3639 | status_only : bool (default: True) | ||
MinRK
|
r3555 | if False: | ||
MinRK
|
r3639 | Retrieve the actual results of completed tasks. | ||
MinRK
|
r3598 | |||
Returns | ||||
------- | ||||
results : dict | ||||
There will always be the keys 'pending' and 'completed', which will | ||||
MinRK
|
r3639 | be lists of msg_ids that are incomplete or complete. If `status_only` | ||
is False, then completed results will be keyed by their `msg_id`. | ||||
MinRK
|
r3539 | """ | ||
MinRK
|
r3641 | if not isinstance(msg_ids, (list,tuple)): | ||
MinRK
|
r3651 | msg_ids = [msg_ids] | ||
MinRK
|
r3639 | |||
MinRK
|
r3539 | theids = [] | ||
MinRK
|
r3641 | for msg_id in msg_ids: | ||
MinRK
|
r3539 | if isinstance(msg_id, int): | ||
msg_id = self.history[msg_id] | ||||
MinRK
|
r3639 | if not isinstance(msg_id, basestring): | ||
MinRK
|
r3555 | raise TypeError("msg_ids must be str, not %r"%msg_id) | ||
MinRK
|
r3539 | theids.append(msg_id) | ||
MinRK
|
r3555 | completed = [] | ||
local_results = {} | ||||
MinRK
|
r3622 | |||
# comment this block out to temporarily disable local shortcut: | ||||
MinRK
|
r3639 | for msg_id in theids: | ||
MinRK
|
r3622 | if msg_id in self.results: | ||
completed.append(msg_id) | ||||
local_results[msg_id] = self.results[msg_id] | ||||
theids.remove(msg_id) | ||||
MinRK
|
r3555 | |||
MinRK
|
r3559 | if theids: # some not locally cached | ||
MinRK
|
r3555 | content = dict(msg_ids=theids, status_only=status_only) | ||
msg = self.session.send(self._query_socket, "result_request", content=content) | ||||
zmq.select([self._query_socket], [], []) | ||||
idents,msg = self.session.recv(self._query_socket, zmq.NOBLOCK) | ||||
if self.debug: | ||||
pprint(msg) | ||||
content = msg['content'] | ||||
if content['status'] != 'ok': | ||||
MinRK
|
r3639 | raise self._unwrap_exception(content) | ||
MinRK
|
r3598 | buffers = msg['buffers'] | ||
MinRK
|
r3555 | else: | ||
content = dict(completed=[],pending=[]) | ||||
MinRK
|
r3598 | |||
content['completed'].extend(completed) | ||||
if status_only: | ||||
return content | ||||
failures = [] | ||||
# load cached results into result: | ||||
content.update(local_results) | ||||
MinRK
|
r4008 | |||
MinRK
|
r3598 | # update cache with results: | ||
for msg_id in sorted(theids): | ||||
if msg_id in content['completed']: | ||||
rec = content[msg_id] | ||||
parent = rec['header'] | ||||
header = rec['result_header'] | ||||
rcontent = rec['result_content'] | ||||
MinRK
|
r3602 | iodict = rec['io'] | ||
MinRK
|
r3598 | if isinstance(rcontent, str): | ||
rcontent = self.session.unpack(rcontent) | ||||
MinRK
|
r3651 | md = self.metadata[msg_id] | ||
MinRK
|
r3602 | md.update(self._extract_metadata(header, parent, rcontent)) | ||
md.update(iodict) | ||||
MinRK
|
r3598 | |||
if rcontent['status'] == 'ok': | ||||
MinRK
|
r3644 | res,buffers = util.unserialize_object(buffers) | ||
MinRK
|
r3598 | else: | ||
MinRK
|
r3639 | print rcontent | ||
res = self._unwrap_exception(rcontent) | ||||
MinRK
|
r3598 | failures.append(res) | ||
self.results[msg_id] = res | ||||
content[msg_id] = res | ||||
MinRK
|
r3639 | if len(theids) == 1 and failures: | ||
raise failures[0] | ||||
error.collect_exceptions(failures, "result_status") | ||||
MinRK
|
r3555 | return content | ||
MinRK
|
r3664 | @spin_first | ||
MinRK
|
r3635 | def queue_status(self, targets='all', verbose=False): | ||
MinRK
|
r3555 | """Fetch the status of engine queues. | ||
Parameters | ||||
---------- | ||||
MinRK
|
r3635 | |||
MinRK
|
r3555 | targets : int/str/list of ints/strs | ||
MinRK
|
r3635 | the engines whose states are to be queried. | ||
MinRK
|
r3555 | default : all | ||
verbose : bool | ||||
MinRK
|
r3584 | Whether to return lengths only, or lists of ids for each element | ||
MinRK
|
r3555 | """ | ||
MinRK
|
r3664 | engine_ids = self._build_targets(targets)[1] | ||
content = dict(targets=engine_ids, verbose=verbose) | ||||
MinRK
|
r3555 | self.session.send(self._query_socket, "queue_request", content=content) | ||
idents,msg = self.session.recv(self._query_socket, 0) | ||||
MinRK
|
r3540 | if self.debug: | ||
pprint(msg) | ||||
MinRK
|
r3555 | content = msg['content'] | ||
status = content.pop('status') | ||||
if status != 'ok': | ||||
MinRK
|
r3639 | raise self._unwrap_exception(content) | ||
MinRK
|
r4036 | content = rekey(content) | ||
MinRK
|
r3664 | if isinstance(targets, int): | ||
return content[targets] | ||||
else: | ||||
return content | ||||
MinRK
|
r3555 | |||
MinRK
|
r3664 | @spin_first | ||
MinRK
|
r3639 | def purge_results(self, jobs=[], targets=[]): | ||
MinRK
|
r3664 | """Tell the Hub to forget results. | ||
MinRK
|
r3539 | |||
MinRK
|
r3555 | Individual results can be purged by msg_id, or the entire | ||
MinRK
|
r3584 | history of specific targets can be purged. | ||
MinRK
|
r3555 | |||
MinRK
|
r4146 | Use `purge_results('all')` to scrub everything from the Hub's db. | ||
MinRK
|
r3555 | Parameters | ||
---------- | ||||
MinRK
|
r3635 | |||
MinRK
|
r3664 | jobs : str or list of str or AsyncResult objects | ||
MinRK
|
r3584 | the msg_ids whose results should be forgotten. | ||
MinRK
|
r3555 | targets : int/str/list of ints/strs | ||
MinRK
|
r4146 | The targets, by int_id, whose entire history is to be purged. | ||
MinRK
|
r3584 | |||
MinRK
|
r3555 | default : None | ||
""" | ||||
MinRK
|
r3639 | if not targets and not jobs: | ||
raise ValueError("Must specify at least one of `targets` and `jobs`") | ||||
MinRK
|
r3555 | if targets: | ||
targets = self._build_targets(targets)[1] | ||||
MinRK
|
r3639 | |||
# construct msg_ids from jobs | ||||
MinRK
|
r4146 | if jobs == 'all': | ||
msg_ids = jobs | ||||
else: | ||||
msg_ids = [] | ||||
if isinstance(jobs, (basestring,AsyncResult)): | ||||
jobs = [jobs] | ||||
bad_ids = filter(lambda obj: not isinstance(obj, (basestring, AsyncResult)), jobs) | ||||
if bad_ids: | ||||
raise TypeError("Invalid msg_id type %r, expected str or AsyncResult"%bad_ids[0]) | ||||
for j in jobs: | ||||
if isinstance(j, AsyncResult): | ||||
msg_ids.extend(j.msg_ids) | ||||
else: | ||||
msg_ids.append(j) | ||||
content = dict(engine_ids=targets, msg_ids=msg_ids) | ||||
MinRK
|
r3555 | self.session.send(self._query_socket, "purge_request", content=content) | ||
idents, msg = self.session.recv(self._query_socket, 0) | ||||
if self.debug: | ||||
pprint(msg) | ||||
content = msg['content'] | ||||
if content['status'] != 'ok': | ||||
MinRK
|
r3639 | raise self._unwrap_exception(content) | ||
MinRK
|
r3548 | |||
MinRK
|
r3780 | @spin_first | ||
def hub_history(self): | ||||
"""Get the Hub's history | ||||
Just like the Client, the Hub has a history, which is a list of msg_ids. | ||||
This will contain the history of all clients, and, depending on configuration, | ||||
may contain history across multiple cluster sessions. | ||||
Any msg_id returned here is a valid argument to `get_result`. | ||||
Returns | ||||
------- | ||||
msg_ids : list of strs | ||||
list of all msg_ids, ordered by task submission time. | ||||
""" | ||||
self.session.send(self._query_socket, "history_request", content={}) | ||||
idents, msg = self.session.recv(self._query_socket, 0) | ||||
if self.debug: | ||||
pprint(msg) | ||||
content = msg['content'] | ||||
if content['status'] != 'ok': | ||||
raise self._unwrap_exception(content) | ||||
else: | ||||
return content['history'] | ||||
@spin_first | ||||
def db_query(self, query, keys=None): | ||||
"""Query the Hub's TaskRecord database | ||||
This will return a list of task record dicts that match `query` | ||||
Parameters | ||||
---------- | ||||
query : mongodb query dict | ||||
The search dict. See mongodb query docs for details. | ||||
keys : list of strs [optional] | ||||
MinRK
|
r3876 | The subset of keys to be returned. The default is to fetch everything but buffers. | ||
MinRK
|
r3780 | 'msg_id' will *always* be included. | ||
""" | ||||
MinRK
|
r3876 | if isinstance(keys, basestring): | ||
keys = [keys] | ||||
MinRK
|
r3780 | content = dict(query=query, keys=keys) | ||
self.session.send(self._query_socket, "db_request", content=content) | ||||
idents, msg = self.session.recv(self._query_socket, 0) | ||||
if self.debug: | ||||
pprint(msg) | ||||
content = msg['content'] | ||||
if content['status'] != 'ok': | ||||
raise self._unwrap_exception(content) | ||||
records = content['records'] | ||||
MinRK
|
r4008 | |||
MinRK
|
r3780 | buffer_lens = content['buffer_lens'] | ||
result_buffer_lens = content['result_buffer_lens'] | ||||
buffers = msg['buffers'] | ||||
has_bufs = buffer_lens is not None | ||||
has_rbufs = result_buffer_lens is not None | ||||
for i,rec in enumerate(records): | ||||
# relink buffers | ||||
if has_bufs: | ||||
blen = buffer_lens[i] | ||||
rec['buffers'], buffers = buffers[:blen],buffers[blen:] | ||||
if has_rbufs: | ||||
blen = result_buffer_lens[i] | ||||
rec['result_buffers'], buffers = buffers[:blen],buffers[blen:] | ||||
return records | ||||
MinRK
|
r3587 | |||
MinRK
|
r3666 | __all__ = [ 'Client' ] | ||