client.py
1501 lines
| 55.0 KiB
| text/x-python
|
PythonLexer
MinRK
|
r3539 | """A semi-synchronous Client for the ZMQ controller""" | ||
MinRK
|
r3551 | #----------------------------------------------------------------------------- | ||
# Copyright (C) 2010 The IPython Development Team | ||||
# | ||||
# Distributed under the terms of the BSD License. The full license is in | ||||
# the file COPYING, distributed as part of this software. | ||||
#----------------------------------------------------------------------------- | ||||
MinRK
|
r3539 | |||
MinRK
|
r3551 | #----------------------------------------------------------------------------- | ||
# Imports | ||||
#----------------------------------------------------------------------------- | ||||
MinRK
|
r3539 | |||
MinRK
|
r3575 | import os | ||
MinRK
|
r3631 | import json | ||
MinRK
|
r3551 | import time | ||
MinRK
|
r3631 | import warnings | ||
from datetime import datetime | ||||
MinRK
|
r3581 | from getpass import getpass | ||
MinRK
|
r3540 | from pprint import pprint | ||
MinRK
|
r3631 | |||
MinRK
|
r3614 | pjoin = os.path.join | ||
MinRK
|
r3540 | |||
MinRK
|
r3551 | import zmq | ||
MinRK
|
r3635 | # from zmq.eventloop import ioloop, zmqstream | ||
MinRK
|
r3551 | |||
MinRK
|
r3614 | from IPython.utils.path import get_ipython_dir | ||
MinRK
|
r3643 | from IPython.utils.pickleutil import Reference | ||
MinRK
|
r3636 | from IPython.utils.traitlets import (HasTraits, Int, Instance, CUnicode, | ||
Dict, List, Bool, Str, Set) | ||||
MinRK
|
r3539 | from IPython.external.decorator import decorator | ||
MinRK
|
r3619 | from IPython.external.ssh import tunnel | ||
MinRK
|
r3539 | |||
MinRK
|
r3642 | from . import error | ||
from . import map as Map | ||||
MinRK
|
r3644 | from . import util | ||
MinRK
|
r3642 | from . import streamsession as ss | ||
from .asyncresult import AsyncResult, AsyncMapResult, AsyncHubResult | ||||
from .clusterdir import ClusterDir, ClusterDirError | ||||
from .dependency import Dependency, depend, require, dependent | ||||
MinRK
|
r3644 | from .remotefunction import remote, parallel, ParallelFunction, RemoteFunction | ||
from .util import ReverseDict, validate_url, disambiguate_url | ||||
MinRK
|
r3642 | from .view import DirectView, LoadBalancedView | ||
MinRK
|
r3539 | |||
MinRK
|
r3584 | #-------------------------------------------------------------------------- | ||
# helpers for implementing old MEC API via client.apply | ||||
#-------------------------------------------------------------------------- | ||||
MinRK
|
def _push(ns):
    """Merge `ns` into this module's global namespace.

    Helper for implementing `client.push` via `client.apply`.

    Parameters
    ----------
    ns : mapping (or anything accepted by ``dict.update``)
        names and values to install as module globals
    """
    module_namespace = globals()
    module_namespace.update(ns)
def _pull(keys):
    """Look up one or several names in this module's global namespace.

    Helper for implementing `client.pull` via `client.apply`.

    Parameters
    ----------
    keys : str, or list/tuple/set of str
        a single name, or a collection of names

    Returns
    -------
    The value bound to `keys` when a single name is given, otherwise a list
    of values in the iteration order of `keys`.

    Raises
    ------
    NameError
        if any requested name is not defined; every name is checked before
        any value is returned.
    """
    namespace = globals()
    if isinstance(keys, (list, tuple, set)):
        # validate the whole request first so a missing name never yields
        # a partial result
        for key in keys:
            if key not in namespace:
                raise NameError("name '%s' is not defined"%key)
        # a real list (not a lazy map object) on every Python version
        return [namespace[key] for key in keys]
    if keys not in namespace:
        raise NameError("name '%s' is not defined"%keys)
    return namespace[keys]
def _clear():
    """Remove every name from this module's global namespace.

    Helper for implementing `client.clear` via `client.apply`.
    """
    module_namespace = globals()
    module_namespace.clear()
MinRK
|
def _execute(code):
    """Execute `code` in this module's global namespace.

    Helper for implementing `client.execute` via `client.apply`.

    Parameters
    ----------
    code : str or code object
        source to run; names it binds become module globals
    """
    # NOTE(review): this runs arbitrary code by design; nothing visible here
    # validates `code`, so callers must only pass trusted input.
    # The call/tuple form of exec is accepted by both Python 2 and Python 3,
    # unlike the Python-2-only `exec code in globals()` statement.
    exec(code, globals())
MinRK
|
r3584 | |||
MinRK
|
r3539 | |||
MinRK
|
r3555 | #-------------------------------------------------------------------------- | ||
# Decorators for Client methods | ||||
#-------------------------------------------------------------------------- | ||||
MinRK
|
@decorator
def spinfirst(f, self, *args, **kwargs):
    """Run ``self.spin()`` to sync state, then call the wrapped method.

    Returns whatever the wrapped method `f` returns.
    """
    self.spin()
    outcome = f(self, *args, **kwargs)
    return outcome
@decorator
def defaultblock(f, self, *args, **kwargs):
    """Run the wrapped method with ``self.block`` temporarily overridden.

    If a non-None ``block`` keyword argument is present it is installed as
    ``self.block`` for the duration of the call; otherwise ``self.block`` is
    left at its current value.  The previous value is always restored, even
    when the wrapped method raises.
    """
    requested = kwargs.get('block',None)
    previous = self.block
    self.block = previous if requested is None else requested
    try:
        return f(self, *args, **kwargs)
    finally:
        # restore the caller's setting no matter how f exits
        self.block = previous
MinRK
|
r3598 | |||
#-------------------------------------------------------------------------- | ||||
# Classes | ||||
#-------------------------------------------------------------------------- | ||||
MinRK
|
class Metadata(dict):
    """Subclass of dict for initializing metadata values.

    Attribute access works on keys.

    These objects have a strict set of keys - errors will raise if you try
    to add new keys through item or attribute assignment.  Bulk ``update()``
    is how the initial keys are installed; ``dict.update`` does not go
    through the overridden ``__setitem__`` in CPython, so it is not subject
    to the strict-key check.
    """
    def __init__(self, *args, **kwargs):
        """Populate the default keys, then apply any dict-style arguments."""
        dict.__init__(self)
        md = {'msg_id' : None,
              'submitted' : None,
              'started' : None,
              'completed' : None,
              'received' : None,
              'engine_uuid' : None,
              'engine_id' : None,
              'follow' : None,
              'after' : None,
              'status' : None,

              'pyin' : None,
              'pyout' : None,
              'pyerr' : None,
              'stdout' : '',
              'stderr' : '',
            }
        self.update(md)
        self.update(dict(*args, **kwargs))

    def __getattr__(self, key):
        """getattr aliased to getitem"""
        # direct membership test: O(1), and works on Python 2 and 3
        if key in self:
            return self[key]
        raise AttributeError(key)

    def __setattr__(self, key, value):
        """setattr aliased to setitem, with strict"""
        if key in self:
            self[key] = value
        else:
            raise AttributeError(key)

    def __setitem__(self, key, value):
        """strict static key enforcement"""
        if key in self:
            dict.__setitem__(self, key, value)
        else:
            raise KeyError(key)
MinRK
|
r3602 | |||
MinRK
|
r3636 | class Client(HasTraits): | ||
MinRK
|
r3539 | """A semi-synchronous client to the IPython ZMQ controller | ||
MinRK
|
r3555 | Parameters | ||
---------- | ||||
MinRK
|
r3619 | url_or_file : bytes; zmq url or path to ipcontroller-client.json | ||
Connection information for the Hub's registration. If a json connector | ||||
file is given, then likely no further configuration is necessary. | ||||
MinRK
|
r3620 | [Default: use profile] | ||
profile : bytes | ||||
The name of the Cluster profile to be used to find connector information. | ||||
[Default: 'default'] | ||||
Min RK
|
r3572 | context : zmq.Context | ||
MinRK
|
r3620 | Pass an existing zmq.Context instance, otherwise the client will create its own. | ||
Min RK
|
r3572 | username : bytes | ||
set username to be passed to the Session object | ||||
debug : bool | ||||
flag for lots of message printing for debug purposes | ||||
#-------------- ssh related args ---------------- | ||||
# These are args for configuring the ssh tunnel to be used | ||||
# credentials are used to forward connections over ssh to the Controller | ||||
# Note that the ip given in `addr` needs to be relative to sshserver | ||||
# The most basic case is to leave addr as pointing to localhost (127.0.0.1), | ||||
# and set sshserver as the same machine the Controller is on. However, | ||||
# the only requirement is that sshserver is able to see the Controller | ||||
# (i.e. is within the same trusted network). | ||||
sshserver : str | ||||
A string of the form passed to ssh, i.e. 'server.tld' or 'user@server.tld:port' | ||||
If keyfile or password is specified, and this is not, it will default to | ||||
the ip given in addr. | ||||
MinRK
|
r3575 | sshkey : str; path to public ssh key file | ||
Min RK
|
r3572 | This specifies a key to be used in ssh login, default None. | ||
Regular default ssh keys will be used without specifying this argument. | ||||
MinRK
|
r3619 | password : str | ||
Min RK
|
r3572 | Your ssh password to sshserver. Note that if this is left None, | ||
you will be prompted for it if passwordless key based login is unavailable. | ||||
MinRK
|
r3619 | paramiko : bool | ||
flag for whether to use paramiko instead of shell ssh for tunneling. | ||||
[default: True on win32, False else] | ||||
MinRK
|
r3555 | |||
MinRK
|
r3575 | #------- exec authentication args ------- | ||
# If even localhost is untrusted, you can have some protection against | ||||
# unauthorized execution by using a key. Messages are still sent | ||||
# as cleartext, so if someone can snoop your loopback traffic this will | ||||
MinRK
|
r3619 | # not help against malicious attacks. | ||
MinRK
|
r3575 | |||
exec_key : str | ||||
an authentication key or file containing a key | ||||
default: None | ||||
MinRK
|
r3539 | Attributes | ||
---------- | ||||
MinRK
|
r3635 | |||
MinRK
|
r3555 | ids : set of int engine IDs | ||
MinRK
|
r3539 | requesting the ids attribute always synchronizes | ||
the registration state. To request ids without synchronization, | ||||
MinRK
|
r3575 | use semi-private _ids attributes. | ||
MinRK
|
r3539 | |||
history : list of msg_ids | ||||
a list of msg_ids, keeping track of all the execution | ||||
MinRK
|
r3555 | messages you have submitted in order. | ||
MinRK
|
r3539 | |||
outstanding : set of msg_ids | ||||
a set of msg_ids that have been submitted, but whose | ||||
MinRK
|
r3555 | results have not yet been received. | ||
MinRK
|
r3539 | |||
results : dict | ||||
a dict of all our results, keyed by msg_id | ||||
block : bool | ||||
determines default behavior when block not specified | ||||
in execution methods | ||||
Methods | ||||
------- | ||||
MinRK
|
r3635 | spin | ||
flushes incoming results and registration state changes | ||||
control methods spin, and requesting `ids` also ensures up to date | ||||
barrier | ||||
wait on one or more msg_ids | ||||
execution methods | ||||
apply | ||||
MinRK
|
r3539 | legacy: execute, run | ||
MinRK
|
r3635 | query methods | ||
queue_status, get_result, purge | ||||
MinRK
|
r3540 | |||
MinRK
|
r3635 | control methods | ||
abort, shutdown | ||||
MinRK
|
r3540 | |||
MinRK
|
r3539 | """ | ||
MinRK
|
r3636 | block = Bool(False) | ||
outstanding=Set() | ||||
results = Dict() | ||||
metadata = Dict() | ||||
history = List() | ||||
debug = Bool(False) | ||||
profile=CUnicode('default') | ||||
_ids = List() | ||||
_connected=Bool(False) | ||||
_ssh=Bool(False) | ||||
_context = Instance('zmq.Context') | ||||
_config = Dict() | ||||
_engines=Instance(ReverseDict, (), {}) | ||||
_registration_socket=Instance('zmq.Socket') | ||||
_query_socket=Instance('zmq.Socket') | ||||
_control_socket=Instance('zmq.Socket') | ||||
_iopub_socket=Instance('zmq.Socket') | ||||
_notification_socket=Instance('zmq.Socket') | ||||
_mux_socket=Instance('zmq.Socket') | ||||
_task_socket=Instance('zmq.Socket') | ||||
_task_scheme=Str() | ||||
_balanced_views=Dict() | ||||
_direct_views=Dict() | ||||
_closed = False | ||||
MinRK
|
r3539 | |||
MinRK
|
def __init__(self, url_or_file=None, profile='default', cluster_dir=None, ipython_dir=None,
        context=None, username=None, debug=False, exec_key=None,
        sshserver=None, sshkey=None, password=None, paramiko=None,
        ):
    """Resolve connection info, open the registration socket and connect.

    The connection info comes either from `url_or_file` taken as a URL, or
    from a JSON connection file (an explicit path, a name inside the
    cluster's security dir, or the default 'ipcontroller-client.json' there).
    Explicit `sshserver` / `exec_key` arguments override values from the
    JSON file.  See the class docstring for the meaning of each parameter.

    Raises
    ------
    AssertionError
        if no url/file can be determined, or the named connection file does
        not exist.
    """
    super(Client, self).__init__(debug=debug, profile=profile)
    if context is None:
        context = zmq.Context()
    self._context = context

    self._setup_cluster_dir(profile, cluster_dir, ipython_dir)
    if self._cd is not None:
        if url_or_file is None:
            url_or_file = pjoin(self._cd.security_dir, 'ipcontroller-client.json')
    assert url_or_file is not None, "I can't find enough information to connect to a controller!"\
        " Please specify at least one of url_or_file or profile."

    try:
        validate_url(url_or_file)
    except AssertionError:
        # not a url: treat it as a connection file, looking in the
        # cluster's security dir when the bare path does not exist
        if not os.path.exists(url_or_file):
            if self._cd:
                url_or_file = os.path.join(self._cd.security_dir, url_or_file)
            assert os.path.exists(url_or_file), "Not a valid connection file or url: %r"%url_or_file
        with open(url_or_file) as f:
            cfg = json.loads(f.read())
    else:
        cfg = {'url':url_or_file}

    # sync defaults from args, json:
    if sshserver:
        cfg['ssh'] = sshserver
    if exec_key:
        cfg['exec_key'] = exec_key
    # a bare url (or a sparse json file) carries neither key; fall back to
    # None instead of raising KeyError
    exec_key = cfg.get('exec_key')
    sshserver = cfg.get('ssh')
    location = cfg.setdefault('location', None)
    cfg['url'] = disambiguate_url(cfg['url'], location)
    url = cfg['url']

    self._config = cfg

    self._ssh = bool(sshserver or sshkey or password)
    if self._ssh and sshserver is None:
        # default to ssh via localhost
        sshserver = url.split('://')[1].split(':')[0]
    if self._ssh and password is None:
        if tunnel.try_passwordless_ssh(sshserver, sshkey, paramiko):
            password=False
        else:
            password = getpass("SSH Password for %s: "%sshserver)
    ssh_kwargs = dict(keyfile=sshkey, password=password, paramiko=paramiko)

    # exec_key may be the key itself or the path of a file holding it
    if exec_key is not None and os.path.isfile(exec_key):
        arg = 'keyfile'
    else:
        arg = 'key'
    key_arg = {arg:exec_key}
    if username is None:
        self.session = ss.StreamSession(**key_arg)
    else:
        self.session = ss.StreamSession(username, **key_arg)

    self._registration_socket = self._context.socket(zmq.XREQ)
    self._registration_socket.setsockopt(zmq.IDENTITY, self.session.session)
    if self._ssh:
        tunnel.tunnel_connection(self._registration_socket, url, sshserver, **ssh_kwargs)
    else:
        self._registration_socket.connect(url)

    self.session.debug = self.debug

    self._notification_handlers = {'registration_notification' : self._register_engine,
                                'unregistration_notification' : self._unregister_engine,
                                }
    self._queue_handlers = {'execute_reply' : self._handle_execute_reply,
                            'apply_reply' : self._handle_apply_reply}
    self._connect(sshserver, ssh_kwargs)
MinRK
|
r3539 | |||
MinRK
|
def _setup_cluster_dir(self, profile, cluster_dir, ipython_dir):
    """Locate the cluster directory and store it in ``self._cd``.

    An explicit `cluster_dir` is looked up when given; only when it is None
    is `profile` looked up under `ipython_dir` (which defaults to
    ``get_ipython_dir()``).  ``self._cd`` is None when nothing was requested
    or the lookup raised ClusterDirError.
    """
    if ipython_dir is None:
        ipython_dir = get_ipython_dir()
    found = None
    try:
        if cluster_dir is not None:
            found = ClusterDir.find_cluster_dir(cluster_dir)
        elif profile is not None:
            found = ClusterDir.find_cluster_dir_by_profile(
                ipython_dir, profile)
    except ClusterDirError:
        found = None
    self._cd = found
MinRK
|
r3614 | |||
MinRK
|
@property
def ids(self):
    """Engine ids, refreshed from pending notifications on every access."""
    self._flush_notifications()
    current_ids = self._ids
    return current_ids
MinRK
|
r3636 | |||
def close(self):
    """Close every socket held by this client; safe to call repeatedly.

    Every attribute whose name ends in 'socket' is closed.  Socket
    attributes that are still None (never connected, or already dropped)
    are skipped instead of raising AttributeError.
    """
    if self._closed:
        return
    for name in dir(self):
        if not name.endswith('socket'):
            continue
        sock = getattr(self, name)
        if sock is not None:
            sock.close()
    self._closed = True
MinRK
|
r3539 | |||
def _update_engines(self, engines):
    """Update our engines dict and _ids from a dict of the form: {id:uuid}.

    Ids are coerced to int and uuids to bytes.  `_ids` is kept sorted and
    free of duplicates, so an engine reported more than once is only listed
    once.  If the engine ids are no longer exactly 0..n-1 while a 'pure'
    task scheduler is in use, task scheduling is stopped.
    """
    for key, uuid in engines.items():
        eid = int(key)
        self._engines[eid] = bytes(uuid) # force not unicode
        self._ids.append(eid)
    self._ids = sorted(set(self._ids))
    # compare against a real list so the test is also correct where range()
    # is lazy
    contiguous = sorted(self._engines.keys()) == list(range(len(self._engines)))
    if not contiguous and self._task_scheme == 'pure' and self._task_socket:
        self._stop_scheduling_tasks()
def _stop_scheduling_tasks(self):
    """Close and drop the task socket, then warn that task farming is off.

    Called when an engine has been unregistered from a pure ZMQ scheduler.
    The RuntimeWarning gains an extra sentence when messages are still
    outstanding.
    """
    self._task_socket.close()
    self._task_socket = None
    parts = ["An engine has been unregistered, and we are using pure ",
             "ZMQ task scheduling. Task farming will be disabled."]
    if self.outstanding:
        parts.append(" If you were running tasks when this happened, ")
        parts.append("some `outstanding` msg_ids may never resolve.")
    warnings.warn("".join(parts), RuntimeWarning)
MinRK
|
r3539 | |||
def _build_targets(self, targets):
    """Turn valid target IDs or 'all' into two lists: (uuids, int_ids).

    `targets` may be None or the string 'all' (every known engine), a single
    int, or an iterable of ints.  The first returned list holds the
    ``self._engines`` entry for each target, the second the targets
    themselves.

    Raises TypeError for any string other than 'all' (case-insensitive).
    """
    if targets is None:
        targets = self._ids
    elif isinstance(targets, str):
        if targets.lower() != 'all':
            raise TypeError("%r not valid str target, must be 'all'"%(targets))
        targets = self._ids
    elif isinstance(targets, int):
        targets = [targets]
    uuids = [self._engines[t] for t in targets]
    return uuids, list(targets)
Min RK
|
def _connect(self, sshserver, ssh_kwargs):
    """Set up all our socket connections to the controller.

    Called from __init__.  Sends a 'connection_request' over the
    registration socket and, for each channel address present in the reply
    (mux, task, notification, query, control, iopub), creates and connects
    the matching socket, tunnelling over ssh when ``self._ssh`` is set.

    Raises Exception("Failed to connect!") when the reply status is not 'ok'.
    """
    # Maybe allow reconnecting?
    if self._connected:
        return
    self._connected=True

    def attach(sock, address):
        """Connect `sock` to `address`, directly or through the ssh tunnel."""
        address = disambiguate_url(address, self._config['location'])
        if not self._ssh:
            return sock.connect(address)
        return tunnel.tunnel_connection(sock, address, sshserver, **ssh_kwargs)

    self.session.send(self._registration_socket, 'connection_request')
    idents,reply = self.session.recv(self._registration_socket,mode=0)
    if self.debug:
        pprint(reply)
    content = ss.Message(reply).content
    self._config['registration'] = dict(content)
    if content.status != 'ok':
        self._connected = False
        raise Exception("Failed to connect!")

    # per socket, the order of setsockopt/connect calls is kept as-is
    if content.mux:
        self._mux_socket = self._context.socket(zmq.PAIR)
        self._mux_socket.setsockopt(zmq.IDENTITY, self.session.session)
        attach(self._mux_socket, content.mux)
    if content.task:
        self._task_scheme, task_addr = content.task
        self._task_socket = self._context.socket(zmq.PAIR)
        self._task_socket.setsockopt(zmq.IDENTITY, self.session.session)
        attach(self._task_socket, task_addr)
    if content.notification:
        self._notification_socket = self._context.socket(zmq.SUB)
        attach(self._notification_socket, content.notification)
        self._notification_socket.setsockopt(zmq.SUBSCRIBE, "")
    if content.query:
        self._query_socket = self._context.socket(zmq.PAIR)
        self._query_socket.setsockopt(zmq.IDENTITY, self.session.session)
        attach(self._query_socket, content.query)
    if content.control:
        self._control_socket = self._context.socket(zmq.PAIR)
        self._control_socket.setsockopt(zmq.IDENTITY, self.session.session)
        attach(self._control_socket, content.control)
    if content.iopub:
        self._iopub_socket = self._context.socket(zmq.SUB)
        self._iopub_socket.setsockopt(zmq.SUBSCRIBE, '')
        self._iopub_socket.setsockopt(zmq.IDENTITY, self.session.session)
        attach(self._iopub_socket, content.iopub)
    self._update_engines(dict(content.engines))
MinRK
|
r3555 | #-------------------------------------------------------------------------- | ||
# handlers and callbacks for incoming messages | ||||
#-------------------------------------------------------------------------- | ||||
MinRK
|
def _unwrap_exception(self, content):
    """unwrap exception, and remap engineid to int.

    Returns the exception object built by ``error.unwrap_exception``.  When
    it carries ``engine_info``, its 'engine_id' entry is set from our engine
    table, or to None when the engine uuid is not known to this client.
    """
    e = error.unwrap_exception(content)
    if e.engine_info:
        e_uuid = e.engine_info['engine_uuid']
        # tolerate unknown uuids (same lookup style as _extract_metadata) so
        # a KeyError here never hides the remote error being unwrapped
        e.engine_info['engine_id'] = self._engines.get(e_uuid, None)
    return e
MinRK
|
def _register_engine(self, msg):
    """Handle a registration notification by recording the new engine.

    Reads 'id' and 'queue' from ``msg['content']`` and forwards them to
    ``_update_engines`` as a one-entry {id: queue} dict.
    """
    content = msg['content']
    engine_id = content['id']
    queue = content['queue']
    self._update_engines({engine_id : queue})
def _unregister_engine(self, msg):
    """Handle an unregistration notification for an engine that has died.

    Removes the engine id from ``_ids`` and ``_engines`` when it is known,
    and stops task scheduling when a 'pure' task scheduler is in use.
    """
    eid = int(msg['content']['id'])
    if eid in self._ids:
        self._ids.remove(eid)
        self._engines.pop(eid)
    uses_pure_tasks = self._task_socket and self._task_scheme == 'pure'
    if uses_pure_tasks:
        self._stop_scheduling_tasks()
MinRK
|
r3602 | |||
def _extract_metadata(self, header, parent, content):
    """Build a metadata dict for a reply from its header, parent and content.

    Always present: 'msg_id', 'received' (the local time of this call),
    'engine_uuid', 'follow', 'after' and 'status'.  'engine_id' is added
    when the header names an engine; 'submitted', 'started' and 'completed'
    are added when the corresponding timestamps are present, parsed with
    ``util.ISO8601``.
    """
    md = dict(
        msg_id=parent['msg_id'],
        received=datetime.now(),
        engine_uuid=header.get('engine', None),
        follow=parent.get('follow', []),
        after=parent.get('after', []),
        status=content['status'],
    )

    engine_uuid = md['engine_uuid']
    if engine_uuid is not None:
        md['engine_id'] = self._engines.get(engine_uuid, None)

    # (metadata key, source dict, timestamp field in that dict)
    timestamps = (
        ('submitted', parent, 'date'),
        ('started', header, 'started'),
        ('completed', header, 'date'),
    )
    for md_key, source, field in timestamps:
        if field in source:
            md[md_key] = datetime.strptime(source[field], util.ISO8601)
    return md
MinRK
|
def _handle_execute_reply(self, msg):
    """Store the reply to an execute_request in ``self.results``.

    execute messages are never actually used. apply is used instead.

    The msg_id is removed from ``outstanding`` when present; otherwise a
    'stale' or 'unknown' notice is printed.  In every case the unwrapped
    content is stored under the msg_id.
    """
    msg_id = msg['parent_header']['msg_id']
    if msg_id in self.outstanding:
        self.outstanding.remove(msg_id)
    elif msg_id in self.history:
        print ("got stale result: %s"%msg_id)
    else:
        print ("got unknown result: %s"%msg_id)
    self.results[msg_id] = self._unwrap_exception(msg['content'])
MinRK
|
r3539 | |||
def _handle_apply_reply(self, msg):
    """Save the reply to an apply_request into our results.

    Updates ``outstanding``, merges the reply's metadata into
    ``self.metadata[msg_id]`` and stores the result according to the reply
    status: the unserialized object for 'ok', an ``error.AbortedTask`` for
    'aborted', nothing for 'resubmitted', and the unwrapped remote
    exception for anything else.
    """
    parent = msg['parent_header']
    msg_id = parent['msg_id']
    if msg_id in self.outstanding:
        self.outstanding.remove(msg_id)
    elif msg_id in self.history:
        # single-argument print() calls behave the same on Python 2 and 3
        print("got stale result: %s"%msg_id)
        # .get: this diagnostic must not raise when the old result is
        # no longer stored
        print(self.results.get(msg_id))
        print(msg)
    else:
        print("got unknown result: %s"%msg_id)
    content = msg['content']
    header = msg['header']

    # construct metadata:
    md = self.metadata.setdefault(msg_id, Metadata())
    md.update(self._extract_metadata(header, parent, content))
    self.metadata[msg_id] = md

    # construct result:
    status = content['status']
    if status == 'ok':
        self.results[msg_id] = util.unserialize_object(msg['buffers'])[0]
    elif status == 'aborted':
        self.results[msg_id] = error.AbortedTask(msg_id)
    elif status == 'resubmitted':
        # TODO: handle resubmission
        pass
    else:
        self.results[msg_id] = self._unwrap_exception(content)
MinRK
|
r3539 | |||
def _flush_notifications(self):
    """Flush notifications of engine registrations waiting
    in ZMQ queue.

    Each pending message is dispatched to the handler registered for its
    'msg_type' in ``self._notification_handlers``.

    Raises Exception for a message type without a handler.
    """
    msg = self.session.recv(self._notification_socket, mode=zmq.NOBLOCK)
    while msg is not None:
        if self.debug:
            pprint(msg)
        msg = msg[-1]
        msg_type = msg['msg_type']
        handler = self._notification_handlers.get(msg_type, None)
        if handler is None:
            # use the value already read by key; attribute access on the
            # message could itself fail and hide this error
            raise Exception("Unhandled message type: %s"%msg_type)
        handler(msg)
        msg = self.session.recv(self._notification_socket, mode=zmq.NOBLOCK)
MinRK
|
r3539 | |||
def _flush_results(self, sock):
    """Flush task or queue results waiting in ZMQ queue.

    Each pending message on `sock` is dispatched to the handler registered
    for its 'msg_type' in ``self._queue_handlers``.

    Raises Exception for a message type without a handler.
    """
    msg = self.session.recv(sock, mode=zmq.NOBLOCK)
    while msg is not None:
        if self.debug:
            pprint(msg)
        msg = msg[-1]
        msg_type = msg['msg_type']
        handler = self._queue_handlers.get(msg_type, None)
        if handler is None:
            # use the value already read by key; attribute access on the
            # message could itself fail and hide this error
            raise Exception("Unhandled message type: %s"%msg_type)
        handler(msg)
        msg = self.session.recv(sock, mode=zmq.NOBLOCK)
MinRK
|
def _flush_control(self, sock):
    """Drain replies waiting on the control channel.

    Currently the replies are ignored (only pretty-printed in debug mode).
    """
    while True:
        msg = self.session.recv(sock, mode=zmq.NOBLOCK)
        if msg is None:
            break
        if self.debug:
            pprint(msg)
MinRK
|
def _flush_iopub(self, sock):
    """Drain the iopub channel and fold each message into the metadata.

    'stream' messages are appended to the metadata entry named by the
    content (e.g. stdout/stderr), 'pyerr' stores the unwrapped exception,
    and any other message type stores ``content['data']`` under the
    message type.
    """
    while True:
        received = self.session.recv(sock, mode=zmq.NOBLOCK)
        if received is None:
            return
        if self.debug:
            pprint(received)
        msg = received[-1]
        parent = msg['parent_header']
        msg_id = parent['msg_id']
        content = msg['content']
        header = msg['header']  # unused, but still required to be present
        msg_type = msg['msg_type']

        # init metadata:
        md = self.metadata.setdefault(msg_id, Metadata())

        if msg_type == 'stream':
            name = content['name']
            md[name] = (md[name] or '') + content['data']
        elif msg_type == 'pyerr':
            md.update({'pyerr' : self._unwrap_exception(content)})
        else:
            md.update({msg_type : content['data']})

        self.metadata[msg_id] = md
MinRK
|
r3555 | #-------------------------------------------------------------------------- | ||
MinRK
|
r3636 | # len, getitem | ||
MinRK
|
r3555 | #-------------------------------------------------------------------------- | ||
MinRK
|
r3539 | |||
MinRK
|
def __len__(self):
    """Number of engines, as given by the ``ids`` attribute."""
    engine_ids = self.ids
    return len(engine_ids)
MinRK
|
def __getitem__(self, key):
    """Index access: return ``self.view(key, balanced=False)``.

    `key` must be an int, slice, or list/tuple/xrange of ints; anything
    else raises TypeError.
    """
    if isinstance(key, (int, slice, tuple, list, xrange)):
        return self.view(key, balanced=False)
    raise TypeError("key by int/slice/iterable of ints only, not %s"%(type(key)))
MinRK
|
r3539 | |||
MinRK
|
r3555 | #-------------------------------------------------------------------------- | ||
# Begin public methods | ||||
#-------------------------------------------------------------------------- | ||||
MinRK
|
r3539 | |||
def spin(self):
    """Flush whatever is waiting on each of our connected sockets.

    Notifications are flushed first, then mux and task results, then the
    control and iopub channels.  Sockets that are unset are skipped.
    """
    if self._notification_socket:
        self._flush_notifications()
    channels = (
        ('_mux_socket', self._flush_results),
        ('_task_socket', self._flush_results),
        ('_control_socket', self._flush_control),
        ('_iopub_socket', self._flush_iopub),
    )
    for attr, flush in channels:
        # read the attribute only now: an earlier flush may have dropped
        # a socket
        sock = getattr(self, attr)
        if sock:
            flush(sock)
MinRK
|
r3539 | |||
MinRK
|
def barrier(self, jobs=None, timeout=-1):
    """waits on one or more `jobs`, for up to `timeout` seconds.

    Parameters
    ----------

    jobs : int, str, or list of ints and/or strs, or one or more AsyncResult objects
        ints are indices to self.history
        strs are msg_ids
        default: wait on all outstanding messages
    timeout : float
        a time in seconds, after which to give up.
        default is -1, which means no timeout

    Returns
    -------

    True : when all msg_ids are done
    False : timeout reached, some msg_ids still outstanding
    """
    tic = time.time()
    if jobs is None:
        theids = self.outstanding
    else:
        if isinstance(jobs, (int, str, AsyncResult)):
            jobs = [jobs]
        theids = set()
        for job in jobs:
            if isinstance(job, int):
                # index access
                job = self.history[job]
            elif isinstance(job, AsyncResult):
                # set.update instead of map() for side effects, which does
                # nothing where map() is lazy
                theids.update(job.msg_ids)
                continue
            theids.add(job)
    if not theids.intersection(self.outstanding):
        return True
    self.spin()
    while theids.intersection(self.outstanding):
        if timeout >= 0 and ( time.time()-tic ) > timeout:
            break
        time.sleep(1e-3)
        self.spin()
    return len(theids.intersection(self.outstanding)) == 0
#-------------------------------------------------------------------------- | ||||
# Control methods | ||||
#-------------------------------------------------------------------------- | ||||
MinRK
|
@spinfirst
@defaultblock
def clear(self, targets=None, block=None):
    """Send a 'clear_request' to the target engine(s).

    When blocking, one reply per target is read from the control socket;
    if any reply status is not 'ok', the unwrapped exception of the last
    such reply is returned.  Otherwise returns None.
    """
    idents = self._build_targets(targets)[0]
    for ident in idents:
        self.session.send(self._control_socket, 'clear_request', content={}, ident=ident)
    failure = False
    if self.block:
        # expect exactly one reply per request sent
        for _ in idents:
            reply_idents,msg = self.session.recv(self._control_socket,0)
            if self.debug:
                pprint(msg)
            if msg['content']['status'] != 'ok':
                failure = self._unwrap_exception(msg['content'])
    if failure:
        return failure
MinRK
|
r3539 | |||
@spinfirst
@defaultblock
def abort(self, jobs=None, targets=None, block=None):
    """Abort specific jobs from the execution queues of target(s).

    This is a mechanism to prevent jobs that have already been submitted
    from executing.

    Parameters
    ----------

    jobs : msg_id, list of msg_ids, or AsyncResult
        The jobs to be aborted.
        If None (the default), all currently outstanding msg_ids are used.

    Returns
    -------
    When blocking, the unwrapped exception of the last reply whose status is
    not 'ok', if there was one; otherwise None.

    Raises
    ------
    TypeError
        if an element of `jobs` is neither a string nor an AsyncResult.
    """
    targets = self._build_targets(targets)[0]
    if jobs is None:
        # the default used to crash on iterating None; fall back to
        # everything we are still waiting for
        jobs = list(self.outstanding)
    elif isinstance(jobs, (basestring,AsyncResult)):
        jobs = [jobs]
    msg_ids = []
    for j in jobs:
        if isinstance(j, AsyncResult):
            msg_ids.extend(j.msg_ids)
        elif isinstance(j, basestring):
            msg_ids.append(j)
        else:
            # nothing has been sent yet, so rejecting here leaves no
            # partial state; (j,) keeps tuple-valued ids from breaking %r
            raise TypeError("Invalid msg_id type %r, expected str or AsyncResult"%(j,))
    content = dict(msg_ids=msg_ids)
    for t in targets:
        self.session.send(self._control_socket, 'abort_request',
                content=content, ident=t)
    failure = False
    if self.block:
        # expect exactly one reply per request sent
        for _ in targets:
            idents,msg = self.session.recv(self._control_socket,0)
            if self.debug:
                pprint(msg)
            if msg['content']['status'] != 'ok':
                failure = self._unwrap_exception(msg['content'])
    if failure:
        return failure
MinRK
|
r3539 | |||
MinRK
|
@spinfirst
@defaultblock
def shutdown(self, targets=None, restart=False, controller=False, block=None):
    """Terminates one or more engine processes, optionally including the controller.

    Parameters
    ----------

    targets : passed to `self._build_targets`
        The engines to shut down.  Replaced by 'all' when `controller` is set.
    restart : bool
        Forwarded to each engine as the 'restart' field of the request.
    controller : bool
        If set, a 'shutdown_request' is also sent on the query socket
        after the engine replies have been collected.
    block : bool
        Whether to read one reply per engine.  Replies are always read
        when `controller` is set.

    Raises
    ------

    The unwrapped exception of the last reply whose status was not 'ok'.
    """
    if controller:
        # a controller shutdown always addresses every engine
        targets = 'all'
    engine_idents = self._build_targets(targets)[0]
    for engine_ident in engine_idents:
        self.session.send(self._control_socket, 'shutdown_request',
                          content={'restart': restart}, ident=engine_ident)

    failure = False
    wait_for_engines = block or controller
    if wait_for_engines:
        for _ in engine_idents:
            _idents, reply = self.session.recv(self._control_socket, 0)
            if self.debug:
                pprint(reply)
            if reply['content']['status'] != 'ok':
                failure = self._unwrap_exception(reply['content'])

    if controller:
        # brief pause before the controller itself is asked to shut down
        time.sleep(0.25)
        self.session.send(self._query_socket, 'shutdown_request')
        _idents, reply = self.session.recv(self._query_socket, 0)
        if self.debug:
            pprint(reply)
        if reply['content']['status'] != 'ok':
            failure = self._unwrap_exception(reply['content'])

    if failure:
        raise failure
MinRK
|
r3555 | |||
#-------------------------------------------------------------------------- | ||||
# Execution methods | ||||
#-------------------------------------------------------------------------- | ||||
MinRK
|
@defaultblock
def execute(self, code, targets='all', block=None):
    """Executes `code` on `targets` in blocking or nonblocking manner.

    ``execute`` is always `bound` (affects engine namespace)

    Parameters
    ----------

    code : str
        the code string to be executed
    targets : int/str/list of ints/strs
        the engines on which to execute
        default : all
    block : bool
        whether or not to wait until done to return
        default: self.block

    Returns
    -------

    Whatever `self.apply` returned when `block` is false or None;
    None when `block` is true.
    """
    outcome = self.apply(_execute, (code,), targets=targets, block=block,
                         bound=True, balanced=False)
    if block:
        # blocking callers get no return value
        return None
    return outcome
MinRK
|
r3539 | |||
MinRK
|
def run(self, filename, targets='all', block=None):
    """Execute contents of `filename` on engine(s).

    The file is read in binary mode and its contents are handed to
    `self.execute` unchanged.

    Parameters
    ----------

    filename : str
        The path to the file
    targets : int/str/list of ints/strs
        the engines on which to execute
        default : all
    block : bool
        whether or not to wait until done
        default: self.block

    Returns
    -------

    The return value of `self.execute`.
    """
    source_file = open(filename, 'rb')
    try:
        code = source_file.read()
    finally:
        source_file.close()
    return self.execute(code, targets=targets, block=block)
MinRK
|
r3539 | |||
MinRK
|
def _maybe_raise(self, result):
    """Return `result` unchanged, unless it is a RemoteError, which is raised."""
    if not isinstance(result, error.RemoteError):
        return result
    raise result
MinRK
|
r3611 | |||
def _build_dependency(self, dep):
    """Convert the accepted dependency forms into something jsonable.

    * None               -> empty list
    * Dependency         -> its `as_dict()` form
    * AsyncResult        -> its `msg_ids`
    * anything else      -> handed to the Dependency constructor, as a list
    """
    if dep is None:
        return []
    if isinstance(dep, Dependency):
        return dep.as_dict()
    if isinstance(dep, AsyncResult):
        return dep.msg_ids
    # pass to Dependency constructor
    return list(Dependency(dep))
MinRK
|
r3611 | |||
MinRK
|
@defaultblock
def apply(self, f, args=None, kwargs=None, bound=True, block=None,
          targets=None, balanced=None,
          after=None, follow=None, timeout=None):
    """Call `f(*args, **kwargs)` on a remote engine(s), returning the result.

    This is the central execution command for the client.

    Parameters
    ----------

    f : function
        The function to be called remotely
    args : tuple/list
        The positional arguments passed to `f`
    kwargs : dict
        The keyword arguments passed to `f`
    bound : bool (default: True)
        Whether to execute in the Engine(s) namespace, or in a clean
        namespace not affecting the engine.
    block : bool (default: self.block)
        Whether to wait for the result, or return immediately.
        False:
            returns AsyncResult
        True:
            returns actual result(s) of f(*args, **kwargs)
            if multiple targets:
                list of results, matching `targets`
    targets : int,list of ints, 'all', None
        Specify the destination of the job.
        if None:
            Submit via Task queue for load-balancing.
        if 'all':
            Run on all active engines
        if list:
            Run on each specified engine
        if int:
            Run on single engine
    balanced : bool, default None
        whether to load-balance.  This will default to True
        if targets is unspecified, or False if targets is specified.

    The following arguments are only used when balanced is True:

    after : Dependency or collection of msg_ids
        Only for load-balanced execution (targets=None)
        Specify a list of msg_ids as a time-based dependency.
        This job will only be run *after* the dependencies
        have been met.
    follow : Dependency or collection of msg_ids
        Only for load-balanced execution (targets=None)
        Specify a list of msg_ids as a location-based dependency.
        This job will only be run on an engine where this dependency
        is met.
    timeout : float/int or None
        Only for load-balanced execution (targets=None)
        Specify an amount of time (in seconds) for the scheduler to
        wait for dependencies to be met before failing with a
        DependencyTimeout.

    after,follow,timeout only used if `balanced=True`.

    Returns
    -------

    if block is False:
        return AsyncResult wrapping msg_ids
        output of AsyncResult.get() is identical to that of `apply(...block=True)`
    else:
        if single target:
            return result of `f(*args, **kwargs)`
        else:
            return list of results, matching `targets`

    Raises
    ------

    TypeError
        If `f` is not callable, `args` is not a tuple/list, or `kwargs`
        is not a dict.
    ValueError
        If `after`, `follow` or `timeout` is given for a non-balanced call.
    """
    assert not self._closed, "cannot use me anymore, I'm closed!"

    # fill in defaults
    if block is None:
        block = self.block
    if args is None:
        args = []
    if kwargs is None:
        kwargs = {}
    if balanced is None:
        # balanced when no targets were given, multiplexed otherwise
        balanced = targets is None
    if targets is None and balanced is False:
        # an explicitly unbalanced call without targets goes to everyone
        targets = 'all'

    # enforce types of f, args, kwargs
    if not callable(f):
        raise TypeError("f must be callable, not %s"%type(f))
    if not isinstance(args, (tuple, list)):
        raise TypeError("args must be tuple or list, not %s"%type(args))
    if not isinstance(kwargs, dict):
        raise TypeError("kwargs must be dict, not %s"%type(kwargs))

    options = dict(bound=bound, block=block, targets=targets)

    if balanced:
        return self._apply_balanced(f, args, kwargs, timeout=timeout,
                                    after=after, follow=follow, **options)
    if follow or after or timeout:
        msg = ("follow, after, and timeout args are only used for"
               " load-balanced execution.")
        raise ValueError(msg)
    return self._apply_direct(f, args, kwargs, **options)
MinRK
|
r3555 | |||
MinRK
|
def _apply_balanced(self, f, args, kwargs, bound=None, block=None, targets=None,
                    after=None, follow=None, timeout=None):
    """call f(*args, **kwargs) remotely in a load-balanced manner.

    This is a private method, see `apply` for details.
    Not to be called directly!

    The request is sent on the task socket as a single 'apply_request';
    its msg_id is recorded in `self.outstanding` and `self.history`.

    Returns
    -------

    `AsyncResult.get()` when `block` is true (or the AsyncResult itself if
    the wait is interrupted by KeyboardInterrupt), else the AsyncResult.

    Raises
    ------

    RuntimeError
        If there is no task socket, or if `after`/`follow` are given while
        the task scheme is 'pure'.
    """
    loc = locals()
    for name in ('bound', 'block'):
        assert loc[name] is not None, "kwarg %r must be specified!"%name

    if self._task_socket is None:
        msg = "Task farming is disabled"
        if self._task_scheme == 'pure':
            msg += " because the pure ZMQ scheduler cannot handle"
            msg += " disappearing engines."
        raise RuntimeError(msg)

    if self._task_scheme == 'pure':
        # pure zmq scheme doesn't support dependencies
        msg = "Pure ZMQ scheduler doesn't support dependencies"
        if (follow or after):
            # hard fail on DAG dependencies
            raise RuntimeError(msg)
        if isinstance(f, dependent):
            # soft warn on functional dependencies
            warnings.warn(msg, RuntimeWarning)

    # defaults:
    args = args if args is not None else []
    kwargs = kwargs if kwargs is not None else {}

    if targets:
        idents, _ = self._build_targets(targets)
    else:
        idents = []

    after = self._build_dependency(after)
    follow = self._build_dependency(follow)
    subheader = dict(after=after, follow=follow, timeout=timeout, targets=idents)
    bufs = util.pack_apply_message(f, args, kwargs)
    content = dict(bound=bound)

    # Resolve the display name *before* sending: not every callable has a
    # __name__ (functools.partial objects and callable instances do not),
    # and failing after the send would lose track of a submitted job.
    fname = getattr(f, '__name__', type(f).__name__)

    msg = self.session.send(self._task_socket, "apply_request",
                            content=content, buffers=bufs, subheader=subheader)
    msg_id = msg['msg_id']
    self.outstanding.add(msg_id)
    self.history.append(msg_id)
    ar = AsyncResult(self, [msg_id], fname=fname)
    if block:
        try:
            return ar.get()
        except KeyboardInterrupt:
            # hand back the handle so the caller can keep waiting later
            return ar
    else:
        return ar
MinRK
|
r3539 | |||
MinRK
|
def _apply_direct(self, f, args, kwargs, bound=None, block=None, targets=None):
    """The underlying method for applying functions to specific engines
    via the MUX queue.

    This is a private method, see `apply` for details.
    Not to be called directly!

    One 'apply_request' is sent on the mux socket per resolved engine
    ident; each msg_id is recorded in `self.outstanding` and
    `self.history`.

    Returns
    -------

    `AsyncResult.get()` when `block` is true (or the AsyncResult itself if
    the wait is interrupted by KeyboardInterrupt), else the AsyncResult.
    """
    loc = locals()
    for name in ('bound', 'block', 'targets'):
        assert loc[name] is not None, "kwarg %r must be specified!"%name

    idents, targets = self._build_targets(targets)

    subheader = {}
    content = dict(bound=bound)
    bufs = util.pack_apply_message(f, args, kwargs)

    # Resolve the display name *before* sending: not every callable has a
    # __name__ (functools.partial objects and callable instances do not),
    # and failing after the sends would lose track of submitted jobs.
    fname = getattr(f, '__name__', type(f).__name__)

    msg_ids = []
    for ident in idents:
        msg = self.session.send(self._mux_socket, "apply_request",
                                content=content, buffers=bufs, ident=ident,
                                subheader=subheader)
        msg_id = msg['msg_id']
        self.outstanding.add(msg_id)
        self.history.append(msg_id)
        msg_ids.append(msg_id)

    ar = AsyncResult(self, msg_ids, fname=fname)
    if block:
        try:
            return ar.get()
        except KeyboardInterrupt:
            # hand back the handle so the caller can keep waiting later
            return ar
    else:
        return ar
MinRK
|
r3539 | |||
MinRK
|
r3589 | #-------------------------------------------------------------------------- | ||
MinRK
|
r3636 | # construct a View object | ||
MinRK
|
r3589 | #-------------------------------------------------------------------------- | ||
MinRK
|
@defaultblock
def remote(self, bound=True, block=None, targets=None, balanced=None):
    """Decorator for making a RemoteFunction

    Forwards this client and the given options to the module-level `remote`.
    """
    options = dict(bound=bound, targets=targets, block=block, balanced=balanced)
    return remote(self, **options)
MinRK
|
r3589 | |||
MinRK
|
@defaultblock
def parallel(self, dist='b', bound=True, block=None, targets=None, balanced=None):
    """Decorator for making a ParallelFunction

    Forwards this client and the given options to the module-level
    `parallel`.

    Parameters
    ----------

    dist : str (default: 'b')
        Forwarded as `dist`; previously accepted here but silently dropped.
        NOTE(review): assumes the module-level `parallel` accepts a `dist`
        keyword -- confirm against its definition.
    bound, block, targets, balanced
        Forwarded unchanged.
    """
    return parallel(self, dist=dist, bound=bound, targets=targets,
                    block=block, balanced=balanced)
def _cache_view(self, targets, balanced):
    """Return the View for (`targets`, `balanced`), creating it on first use.

    Load-balanced and direct views are kept in separate caches
    (`self._balanced_views` / `self._direct_views`).
    """
    if balanced:
        factory = LoadBalancedView
        cache = self._balanced_views
    else:
        factory = DirectView
        cache = self._direct_views

    # targets is often a list, so its string form serves as the cache key
    cache_key = str(targets)
    if cache_key in cache:
        return cache[cache_key]
    cache[cache_key] = factory(client=self, targets=targets)
    return cache[cache_key]
MinRK
|
r3589 | |||
MinRK
|
def view(self, targets=None, balanced=None):
    """Method for constructing View objects.

    If no arguments are specified, create a LoadBalancedView
    using all engines.  If only `targets` specified, it will
    be a DirectView.  This method is the underlying implementation
    of ``client.__getitem__``.

    Parameters
    ----------

    targets: list,slice,int,etc. [default: use all engines]
        The engines to use for the View
    balanced : bool [default: False if targets specified, True else]
        whether to build a LoadBalancedView or a DirectView

    Raises
    ------

    IndexError
        If an int `targets` is not one of `self.ids`.
    TypeError
        If `targets` is not an int, slice, or collection.
    """
    if balanced is None:
        balanced = targets is None

    if targets is None:
        if balanced:
            return self._cache_view(None, True)
        # unbalanced with no targets: every engine
        targets = slice(None)

    if isinstance(targets, int):
        if targets < 0:
            # negative ints index into the list of ids
            targets = self.ids[targets]
        if targets not in self.ids:
            raise IndexError("No such engine: %i"%targets)
        return self._cache_view(targets, balanced)

    if isinstance(targets, slice):
        positions = range(len(self.ids))[targets]
        ordered_ids = sorted(self._ids)
        targets = [ordered_ids[pos] for pos in positions]

    if not isinstance(targets, (tuple, list, xrange)):
        raise TypeError("targets by int/slice/collection of ints only, not %s"%(type(targets)))

    _, targets = self._build_targets(list(targets))
    return self._cache_view(targets, balanced)
MinRK
|
r3625 | |||
MinRK
|
r3555 | #-------------------------------------------------------------------------- | ||
# Data movement | ||||
#-------------------------------------------------------------------------- | ||||
MinRK
|
r3539 | |||
MinRK
|
@defaultblock
def push(self, ns, targets='all', block=None):
    """Push the contents of `ns` into the namespace on `target`

    `ns` must be a dict, otherwise TypeError is raised.  The result of
    `self.apply` is returned only when `block` is false or None.
    """
    if not isinstance(ns, dict):
        raise TypeError("Must be a dict, not %s"%type(ns))
    outcome = self.apply(_push, (ns,), targets=targets, block=block,
                         bound=True, balanced=False)
    if block:
        # blocking callers get no return value
        return None
    return outcome
MinRK
|
r3539 | |||
MinRK
|
@defaultblock
def pull(self, keys, targets='all', block=None):
    """Pull objects from `target`'s namespace by `keys`

    Parameters
    ----------

    keys : str, or list/tuple/set of strs
        The name(s) to pull.
    targets : int/str/list of ints/strs
        Forwarded to `self.apply`.
    block : bool
        Forwarded to `self.apply`.

    Returns
    -------

    The return value of `self.apply`.

    Raises
    ------

    TypeError
        If `keys` is not a string or a list/tuple/set of strings.
    """
    # basestring (rather than str) matches the msg_id checks elsewhere in
    # this file and lets unicode names through validation.
    if isinstance(keys, basestring):
        pass
    elif isinstance(keys, (list, tuple, set)):
        for key in keys:
            if not isinstance(key, basestring):
                raise TypeError("keys must be str, not type %r" % (type(key),))
    else:
        # previously any other type slipped through unvalidated
        raise TypeError("keys must be str or a list/tuple/set of strs, not %r" % (keys,))
    return self.apply(_pull, (keys,), targets=targets, block=block,
                      bound=True, balanced=False)
MinRK
|
@defaultblock
def scatter(self, key, seq, dist='b', flatten=False, targets='all', block=None):
    """
    Partition a Python sequence and send the partitions to a set of engines.

    One partition is pushed to each target under the name `key`.  With
    `flatten` set, a partition of length one is pushed as its single
    element instead of as a sequence.

    Returns an AsyncResult covering all the pushes when `block` is false
    or None; otherwise waits on it and returns None.
    """
    engine_ids = self._build_targets(targets)[-1]
    partitioner = Map.dists[dist]()
    nparts = len(engine_ids)

    msg_ids = []
    for index, engineid in enumerate(engine_ids):
        chunk = partitioner.getPartition(seq, index, nparts)
        payload = chunk[0] if (flatten and len(chunk) == 1) else chunk
        pushed = self.push({key: payload}, targets=engineid, block=False)
        msg_ids.extend(pushed.msg_ids)

    combined = AsyncResult(self, msg_ids, fname='scatter')
    if not block:
        return combined
    combined.get()
MinRK
|
@defaultblock
def gather(self, key, dist='b', targets='all', block=None):
    """
    Gather a partitioned sequence on a set of engines as a single local seq.

    `key` is pulled from each target individually; the pulls are wrapped
    in one AsyncMapResult built with the `dist` map object.

    Returns the AsyncMapResult's `get()` when `block` is true, else the
    AsyncMapResult itself.
    """
    engine_ids = self._build_targets(targets)[-1]
    partitioner = Map.dists[dist]()

    msg_ids = []
    for engineid in engine_ids:
        pulled = self.pull(key, targets=engineid, block=False)
        msg_ids.extend(pulled.msg_ids)

    combined = AsyncMapResult(self, msg_ids, partitioner, fname='gather')
    if not block:
        return combined
    return combined.get()
MinRK
|
r3555 | #-------------------------------------------------------------------------- | ||
# Query methods | ||||
#-------------------------------------------------------------------------- | ||||
@spinfirst
@defaultblock
def get_result(self, indices_or_msg_ids=None, block=None):
    """Retrieve a result by msg_id or history index, wrapped in an AsyncResult object.

    If the client already has the results, no request to the Hub will be made.

    This is a convenient way to construct AsyncResult objects, which are wrappers
    that include metadata about execution, and allow for awaiting results that
    were not submitted by this Client.

    It can also be a convenient way to retrieve the metadata associated with
    blocking execution, since an AsyncResult is always returned.

    Examples
    --------
    ::

        In [10]: ar = client.get_result(-1)

    Parameters
    ----------

    indices_or_msg_ids : integer history index, str msg_id, or list of either
        The indices or msg_ids of indices to be retrieved.
        Defaults to -1, i.e. the last entry of `self.history`.
    block : bool
        Whether to wait for the result to be done

    Returns
    -------

    AsyncResult
        A single AsyncResult object will always be returned.
    AsyncHubResult
        A subclass of AsyncResult that retrieves results from the Hub

    Raises
    ------

    TypeError
        If an entry is neither an int nor a string.
    """
    if indices_or_msg_ids is None:
        indices_or_msg_ids = -1
    if not isinstance(indices_or_msg_ids, (list, tuple)):
        indices_or_msg_ids = [indices_or_msg_ids]

    theids = []
    for entry in indices_or_msg_ids:
        if isinstance(entry, int):
            # ints are positions in the submission history
            entry = self.history[entry]
        # basestring, like the msg_id checks in the sibling query methods
        if not isinstance(entry, basestring):
            raise TypeError("indices must be str or int, not %r" % (entry,))
        theids.append(entry)

    def known_locally(msg_id):
        return msg_id in self.history or msg_id in self.results

    if all(known_locally(msg_id) for msg_id in theids):
        ar = AsyncResult(self, msg_ids=theids)
    else:
        # at least one id is unknown to this client
        ar = AsyncHubResult(self, msg_ids=theids)

    if block:
        ar.wait()
    return ar
@spinfirst
def result_status(self, msg_ids, status_only=True):
    """Check on the status of the result(s) of the apply request with `msg_ids`.

    If status_only is False, then the actual results will be retrieved, else
    only the status of the results will be checked.

    Parameters
    ----------

    msg_ids : msg_id or list of msg_ids
        if int:
            Passed as index to self.history for convenience.
    status_only : bool (default: True)
        if False:
            Retrieve the actual results of completed tasks.

    Returns
    -------

    results : dict
        There will always be the keys 'pending' and 'completed', which will
        be lists of msg_ids that are incomplete or complete. If `status_only`
        is False, then completed results will be keyed by their `msg_id`.

    Raises
    ------

    TypeError
        If an entry of `msg_ids` is neither an int nor a string.
    """
    if not isinstance(msg_ids, (list, tuple)):
        # wrap a single id; this used to assign to an unrelated name, so a
        # lone msg_id string was iterated character by character
        msg_ids = [msg_ids]

    theids = []
    for msg_id in msg_ids:
        if isinstance(msg_id, int):
            msg_id = self.history[msg_id]
        if not isinstance(msg_id, basestring):
            raise TypeError("msg_ids must be str, not %r" % (msg_id,))
        theids.append(msg_id)

    completed = []
    local_results = {}

    # Split into locally cached results and ids that must be requested.
    # A separate list is built instead of removing from `theids` while
    # iterating over it, which skipped the element after each removal.
    # To temporarily disable the local shortcut, use `remote_ids = theids`.
    remote_ids = []
    for msg_id in theids:
        if msg_id in self.results:
            completed.append(msg_id)
            local_results[msg_id] = self.results[msg_id]
        else:
            remote_ids.append(msg_id)

    if remote_ids: # some not locally cached
        content = dict(msg_ids=remote_ids, status_only=status_only)
        self.session.send(self._query_socket, "result_request", content=content)
        # wait until the query socket is readable, then read without blocking
        zmq.select([self._query_socket], [], [])
        idents, msg = self.session.recv(self._query_socket, zmq.NOBLOCK)
        if self.debug:
            pprint(msg)
        content = msg['content']
        if content['status'] != 'ok':
            raise self._unwrap_exception(content)
        buffers = msg['buffers']
    else:
        content = dict(completed=[], pending=[])

    content['completed'].extend(completed)

    if status_only:
        return content

    failures = []
    # load cached results into result:
    content.update(local_results)
    # update cache with results:
    for msg_id in sorted(remote_ids):
        if msg_id in content['completed']:
            rec = content[msg_id]
            parent = rec['header']
            header = rec['result_header']
            rcontent = rec['result_content']
            iodict = rec['io']
            if isinstance(rcontent, str):
                rcontent = self.session.unpack(rcontent)

            md = self.metadata.setdefault(msg_id, Metadata())
            md.update(self._extract_metadata(header, parent, rcontent))
            md.update(iodict)

            if rcontent['status'] == 'ok':
                # each successful result consumes its share of the buffers
                res, buffers = util.unserialize_object(buffers)
            else:
                # was an unconditional debugging print; now follows the
                # same debug switch as the other replies
                if self.debug:
                    pprint(rcontent)
                res = self._unwrap_exception(rcontent)
                failures.append(res)

            self.results[msg_id] = res
            content[msg_id] = res

    if len(remote_ids) == 1 and failures:
        raise failures[0]

    error.collect_exceptions(failures, "result_status")
    return content
@spinfirst
def queue_status(self, targets='all', verbose=False):
    """Fetch the status of engine queues.

    Parameters
    ----------

    targets : int/str/list of ints/strs
        the engines whose states are to be queried.
        default : all
    verbose : bool
        Whether to return lengths only, or lists of ids for each element

    Returns
    -------

    The reply content, with its 'status' entry removed, passed through
    `util.rekey`.

    Raises
    ------

    The unwrapped exception if the reply status is not 'ok'.
    """
    engine_ids = self._build_targets(targets)[1]
    request = dict(targets=engine_ids, verbose=verbose)
    self.session.send(self._query_socket, "queue_request", content=request)
    _idents, reply = self.session.recv(self._query_socket, 0)
    if self.debug:
        pprint(reply)

    reply_content = reply['content']
    if reply_content.pop('status') != 'ok':
        raise self._unwrap_exception(reply_content)
    return util.rekey(reply_content)
MinRK
|
r3555 | |||
@spinfirst
def purge_results(self, jobs=[], targets=[]):
    """Tell the controller to forget results.

    Individual results can be purged by msg_id, or the entire
    history of specific targets can be purged.

    Parameters
    ----------

    jobs : str or list of strs or AsyncResult objects
        the msg_ids whose results should be forgotten.
    targets : int/str/list of ints/strs
        The targets, by uuid or int_id, whose entire history is to be purged.
        Use `targets='all'` to scrub everything from the controller's memory.

    Raises
    ------

    ValueError
        If neither `jobs` nor `targets` is given.
    TypeError
        If an element of `jobs` is neither a string nor an AsyncResult.
    """
    # The list defaults are only ever rebound below, never mutated in place.
    if not (targets or jobs):
        raise ValueError("Must specify at least one of `targets` and `jobs`")
    if targets:
        targets = self._build_targets(targets)[1]

    # construct msg_ids from jobs
    accepted = (basestring, AsyncResult)
    if isinstance(jobs, accepted):
        jobs = [jobs]
    rejected = [job for job in jobs if not isinstance(job, accepted)]
    if rejected:
        raise TypeError("Invalid msg_id type %r, expected str or AsyncResult"%rejected[0])

    msg_ids = []
    for job in jobs:
        if isinstance(job, AsyncResult):
            msg_ids.extend(job.msg_ids)
        else:
            msg_ids.append(job)

    request = dict(targets=targets, msg_ids=msg_ids)
    self.session.send(self._query_socket, "purge_request", content=request)
    _idents, reply = self.session.recv(self._query_socket, 0)
    if self.debug:
        pprint(reply)
    reply_content = reply['content']
    if reply_content['status'] != 'ok':
        raise self._unwrap_exception(reply_content)
MinRK
|
r3548 | |||
MinRK
|
r3587 | |||
# Names exported by `from <this module> import *`.
__all__ = [
    'Client',
    'depend',
    'require',
    'remote',
    'parallel',
    'RemoteFunction',
    'ParallelFunction',
    'DirectView',
    'LoadBalancedView',
    'AsyncResult',
    'AsyncMapResult',
    'Reference',
]