client.py
1216 lines
| 44.8 KiB
| text/x-python
|
PythonLexer
MinRK
|
r3539 | """A semi-synchronous Client for the ZMQ controller""" | ||
MinRK
|
r3551 | #----------------------------------------------------------------------------- | ||
# Copyright (C) 2010 The IPython Development Team | ||||
# | ||||
# Distributed under the terms of the BSD License. The full license is in | ||||
# the file COPYING, distributed as part of this software. | ||||
#----------------------------------------------------------------------------- | ||||
MinRK
|
r3539 | |||
MinRK
|
r3551 | #----------------------------------------------------------------------------- | ||
# Imports | ||||
#----------------------------------------------------------------------------- | ||||
MinRK
|
r3539 | |||
MinRK
|
r3575 | import os | ||
MinRK
|
r3551 | import time | ||
MinRK
|
r3581 | from getpass import getpass | ||
MinRK
|
r3540 | from pprint import pprint | ||
MinRK
|
r3598 | from datetime import datetime | ||
MinRK
|
r3540 | |||
MinRK
|
r3551 | import zmq | ||
from zmq.eventloop import ioloop, zmqstream | ||||
MinRK
|
r3539 | from IPython.external.decorator import decorator | ||
Min RK
|
r3572 | from IPython.zmq import tunnel | ||
MinRK
|
r3539 | |||
import streamsession as ss | ||||
MinRK
|
r3559 | # from remotenamespace import RemoteNamespace | ||
from view import DirectView, LoadBalancedView | ||||
MinRK
|
r3548 | from dependency import Dependency, depend, require | ||
MinRK
|
r3583 | import error | ||
MinRK
|
r3587 | import map as Map | ||
MinRK
|
r3589 | from asyncresult import AsyncResult, AsyncMapResult | ||
MinRK
|
r3588 | from remotefunction import remote,parallel,ParallelFunction,RemoteFunction | ||
MinRK
|
r3598 | from util import ReverseDict | ||
MinRK
|
r3539 | |||
MinRK
|
r3584 | #-------------------------------------------------------------------------- | ||
# helpers for implementing old MEC API via client.apply | ||||
#-------------------------------------------------------------------------- | ||||
MinRK
|
r3539 | def _push(ns): | ||
MinRK
|
r3584 | """helper method for implementing `client.push` via `client.apply`""" | ||
MinRK
|
r3539 | globals().update(ns) | ||
def _pull(keys): | ||||
MinRK
|
r3584 | """helper method for implementing `client.pull` via `client.apply`""" | ||
MinRK
|
r3539 | g = globals() | ||
MinRK
|
r3555 | if isinstance(keys, (list,tuple, set)): | ||
MinRK
|
r3559 | for key in keys: | ||
if not g.has_key(key): | ||||
raise NameError("name '%s' is not defined"%key) | ||||
MinRK
|
r3539 | return map(g.get, keys) | ||
else: | ||||
MinRK
|
r3559 | if not g.has_key(keys): | ||
raise NameError("name '%s' is not defined"%keys) | ||||
MinRK
|
r3539 | return g.get(keys) | ||
def _clear(): | ||||
MinRK
|
r3584 | """helper method for implementing `client.clear` via `client.apply`""" | ||
MinRK
|
r3539 | globals().clear() | ||
MinRK
|
r3590 | def _execute(code): | ||
MinRK
|
r3584 | """helper method for implementing `client.execute` via `client.apply`""" | ||
MinRK
|
r3539 | exec code in globals() | ||
MinRK
|
r3584 | |||
MinRK
|
r3539 | |||
MinRK
|
r3555 | #-------------------------------------------------------------------------- | ||
# Decorators for Client methods | ||||
#-------------------------------------------------------------------------- | ||||
MinRK
|
@decorator
def spinfirst(f, self, *args, **kwargs):
    """Call spin() to sync state prior to calling the method."""
    # `decorator` (IPython.external.decorator) passes the wrapped function
    # as the first argument; flush pending notifications/results so that
    # `f` sees up-to-date client state
    self.spin()
    return f(self, *args, **kwargs)
@decorator
def defaultblock(f, self, *args, **kwargs):
    """Default to self.block; preserve self.block."""
    # an explicit `block` kwarg wins; otherwise keep the client's default
    explicit = kwargs.get('block', None)
    saved = self.block
    self.block = saved if explicit is None else explicit
    try:
        result = f(self, *args, **kwargs)
    finally:
        # always restore the caller's original setting, even on error
        self.block = saved
    return result
MinRK
|
r3598 | |||
#-------------------------------------------------------------------------- | ||||
# Classes | ||||
#-------------------------------------------------------------------------- | ||||
MinRK
|
class Metadata(dict):
    """Subclass of dict for initializing metadata values.

    Attribute access works on keys.
    These objects have a strict set of keys - errors will raise if you try
    to add new keys.
    """
    def __init__(self, *args, **kwargs):
        dict.__init__(self)
        # the fixed set of allowed keys, with their default values
        md = {'msg_id' : None,
              'submitted' : None,
              'started' : None,
              'completed' : None,
              'received' : None,
              'engine_uuid' : None,
              'engine_id' : None,
              'follow' : None,
              'after' : None,
              'status' : None,
              'pyin' : None,
              'pyout' : None,
              'pyerr' : None,
              'stdout' : '',
              'stderr' : '',
            }
        self.update(md)
        # NOTE: dict.update does not route through __setitem__, so values
        # passed to the constructor bypass the strict-key check
        self.update(dict(*args, **kwargs))

    def __getattr__(self, key):
        """getattr aliased to getitem"""
        # `key in self` is O(1) and works on both Python 2 and 3
        # (the old `key in self.iterkeys()` was py2-only and O(n))
        if key in self:
            return self[key]
        else:
            raise AttributeError(key)

    def __setattr__(self, key, value):
        """setattr aliased to setitem, with strict"""
        if key in self:
            self[key] = value
        else:
            raise AttributeError(key)

    def __setitem__(self, key, value):
        """strict static key enforcement"""
        if key in self:
            dict.__setitem__(self, key, value)
        else:
            raise KeyError(key)
MinRK
|
r3602 | |||
MinRK
|
class Client(object):
    """A semi-synchronous client to the IPython ZMQ controller

    Parameters
    ----------

    addr : bytes; zmq url, e.g. 'tcp://127.0.0.1:10101'
        The address of the controller's registration socket.
        [Default: 'tcp://127.0.0.1:10101']
    context : zmq.Context
        Pass an existing zmq.Context instance, otherwise the client will create its own
    username : bytes
        set username to be passed to the Session object
    debug : bool
        flag for lots of message printing for debug purposes

    #-------------- ssh related args ----------------
    # These are args for configuring the ssh tunnel to be used
    # credentials are used to forward connections over ssh to the Controller
    # Note that the ip given in `addr` needs to be relative to sshserver
    # The most basic case is to leave addr as pointing to localhost (127.0.0.1),
    # and set sshserver as the same machine the Controller is on. However,
    # the only requirement is that sshserver is able to see the Controller
    # (i.e. is within the same trusted network).

    sshserver : str
        A string of the form passed to ssh, i.e. 'server.tld' or 'user@server.tld:port'
        If keyfile or password is specified, and this is not, it will default to
        the ip given in addr.
    sshkey : str; path to public ssh key file
        This specifies a key to be used in ssh login, default None.
        Regular default ssh keys will be used without specifying this argument.
    password : str;
        Your ssh password to sshserver. Note that if this is left None,
        you will be prompted for it if passwordless key based login is unavailable.

    #------- exec authentication args -------
    # If even localhost is untrusted, you can have some protection against
    # unauthorized execution by using a key. Messages are still sent
    # as cleartext, so if someone can snoop your loopback traffic this will
    # not help anything.

    exec_key : str
        an authentication key or file containing a key
        default: None

    Attributes
    ----------

    ids : set of int engine IDs
        requesting the ids attribute always synchronizes
        the registration state. To request ids without synchronization,
        use semi-private _ids attributes.

    history : list of msg_ids
        a list of msg_ids, keeping track of all the execution
        messages you have submitted in order.

    outstanding : set of msg_ids
        a set of msg_ids that have been submitted, but whose
        results have not yet been received.

    results : dict
        a dict of all our results, keyed by msg_id

    block : bool
        determines default behavior when block not specified
        in execution methods

    Methods
    -------

    spin : flushes incoming results and registration state changes
        control methods spin, and requesting `ids` also ensures up to date

    barrier : wait on one or more msg_ids

    execution methods: apply/apply_bound/apply_to
        legacy: execute, run

    query methods: queue_status, get_result, purge

    control methods: abort, kill

    """

    # --- connection / registration state ---------------------------------
    _connected=False
    _ssh=False
    _engines=None       # ReverseDict mapping engine id <-> uuid
    _addr='tcp://127.0.0.1:10101'
    # sockets to the controller, created in _connect()
    _registration_socket=None
    _query_socket=None
    _control_socket=None
    _iopub_socket=None
    _notification_socket=None
    _mux_socket=None
    _task_socket=None
    # --- execution state --------------------------------------------------
    block = False       # default blocking behavior for execution methods
    outstanding=None    # set of msg_ids awaiting replies
    results = None      # dict of results keyed by msg_id
    history = None      # ordered list of all submitted msg_ids
    debug = False
    targets = None
Min RK
|
    def __init__(self, addr='tcp://127.0.0.1:10101', context=None, username=None, debug=False,
            sshserver=None, sshkey=None, password=None, paramiko=None,
            exec_key=None,):
        """Create a Client and connect all its sockets to the controller.

        See the class docstring for the meaning of the arguments.
        """
        if context is None:
            context = zmq.Context()
        self.context = context
        self.targets = 'all'
        self._addr = addr
        # any ssh credential implies tunneling over ssh
        self._ssh = bool(sshserver or sshkey or password)
        if self._ssh and sshserver is None:
            # default to the same machine the controller address points at
            sshserver = addr.split('://')[1].split(':')[0]
        if self._ssh and password is None:
            if tunnel.try_passwordless_ssh(sshserver, sshkey, paramiko):
                # passwordless login works; no password needed
                password=False
            else:
                password = getpass("SSH Password for %s: "%sshserver)
        ssh_kwargs = dict(keyfile=sshkey, password=password, paramiko=paramiko)

        # exec_key may be a literal key or a path to a keyfile
        if exec_key is not None and os.path.isfile(exec_key):
            arg = 'keyfile'
        else:
            arg = 'key'
        key_arg = {arg:exec_key}
        if username is None:
            self.session = ss.StreamSession(**key_arg)
        else:
            self.session = ss.StreamSession(username, **key_arg)
        self._registration_socket = self.context.socket(zmq.XREQ)
        self._registration_socket.setsockopt(zmq.IDENTITY, self.session.session)
        if self._ssh:
            tunnel.tunnel_connection(self._registration_socket, addr, sshserver, **ssh_kwargs)
        else:
            self._registration_socket.connect(addr)

        # engine bookkeeping
        self._engines = ReverseDict()
        self._ids = set()
        self.outstanding=set()
        self.results = {}
        self.metadata = {}
        self.history = []
        self.debug = debug
        self.session.debug = debug

        # dispatch tables for incoming messages
        self._notification_handlers = {'registration_notification' : self._register_engine,
                                       'unregistration_notification' : self._unregister_engine,
                                       }
        self._queue_handlers = {'execute_reply' : self._handle_execute_reply,
                                'apply_reply' : self._handle_apply_reply}
        self._connect(sshserver, ssh_kwargs)
MinRK
|
r3539 | |||
    @property
    def ids(self):
        """Always up to date ids property."""
        # flush pending (un)registration notifications first, so that
        # _ids reflects the current set of engines
        self._flush_notifications()
        return self._ids
def _update_engines(self, engines): | ||||
MinRK
|
r3555 | """Update our engines dict and _ids from a dict of the form: {id:uuid}.""" | ||
MinRK
|
r3539 | for k,v in engines.iteritems(): | ||
eid = int(k) | ||||
MinRK
|
r3540 | self._engines[eid] = bytes(v) # force not unicode | ||
MinRK
|
r3539 | self._ids.add(eid) | ||
def _build_targets(self, targets): | ||||
MinRK
|
r3555 | """Turn valid target IDs or 'all' into two lists: | ||
(int_ids, uuids). | ||||
""" | ||||
MinRK
|
r3539 | if targets is None: | ||
targets = self._ids | ||||
elif isinstance(targets, str): | ||||
if targets.lower() == 'all': | ||||
targets = self._ids | ||||
else: | ||||
raise TypeError("%r not valid str target, must be 'all'"%(targets)) | ||||
elif isinstance(targets, int): | ||||
targets = [targets] | ||||
return [self._engines[t] for t in targets], list(targets) | ||||
Min RK
|
    def _connect(self, sshserver, ssh_kwargs):
        """setup all our socket connections to the controller. This is called from
        __init__."""
        # only ever connect once
        if self._connected:
            return
        self._connected=True

        def connect_socket(s, addr):
            # connect directly, or through the ssh tunnel if one is configured
            if self._ssh:
                return tunnel.tunnel_connection(s, addr, sshserver, **ssh_kwargs)
            else:
                return s.connect(addr)

        # ask the controller which endpoints (mux/task/query/...) to use
        self.session.send(self._registration_socket, 'connection_request')
        idents,msg = self.session.recv(self._registration_socket,mode=0)
        if self.debug:
            pprint(msg)
        msg = ss.Message(msg)
        content = msg.content
        if content.status == 'ok':
            # each endpoint is optional; connect only the ones advertised
            if content.mux:
                self._mux_socket = self.context.socket(zmq.PAIR)
                self._mux_socket.setsockopt(zmq.IDENTITY, self.session.session)
                connect_socket(self._mux_socket, content.mux)
            if content.task:
                self._task_socket = self.context.socket(zmq.PAIR)
                self._task_socket.setsockopt(zmq.IDENTITY, self.session.session)
                connect_socket(self._task_socket, content.task)
            if content.notification:
                self._notification_socket = self.context.socket(zmq.SUB)
                connect_socket(self._notification_socket, content.notification)
                # subscribe to all registration notifications
                self._notification_socket.setsockopt(zmq.SUBSCRIBE, "")
            if content.query:
                self._query_socket = self.context.socket(zmq.PAIR)
                self._query_socket.setsockopt(zmq.IDENTITY, self.session.session)
                connect_socket(self._query_socket, content.query)
            if content.control:
                self._control_socket = self.context.socket(zmq.PAIR)
                self._control_socket.setsockopt(zmq.IDENTITY, self.session.session)
                connect_socket(self._control_socket, content.control)
            if content.iopub:
                self._iopub_socket = self.context.socket(zmq.SUB)
                # subscribe to everything published on iopub
                self._iopub_socket.setsockopt(zmq.SUBSCRIBE, '')
                self._iopub_socket.setsockopt(zmq.IDENTITY, self.session.session)
                connect_socket(self._iopub_socket, content.iopub)
            # seed our engine table with the currently-registered engines
            self._update_engines(dict(content.engines))
        else:
            self._connected = False
            raise Exception("Failed to connect!")
MinRK
|
r3555 | #-------------------------------------------------------------------------- | ||
# handlers and callbacks for incoming messages | ||||
#-------------------------------------------------------------------------- | ||||
MinRK
|
r3539 | def _register_engine(self, msg): | ||
MinRK
|
r3555 | """Register a new engine, and update our connection info.""" | ||
MinRK
|
r3539 | content = msg['content'] | ||
eid = content['id'] | ||||
d = {eid : content['queue']} | ||||
self._update_engines(d) | ||||
self._ids.add(int(eid)) | ||||
def _unregister_engine(self, msg): | ||||
MinRK
|
r3555 | """Unregister an engine that has died.""" | ||
MinRK
|
r3539 | content = msg['content'] | ||
eid = int(content['id']) | ||||
if eid in self._ids: | ||||
self._ids.remove(eid) | ||||
self._engines.pop(eid) | ||||
MinRK
|
r3602 | |||
def _extract_metadata(self, header, parent, content): | ||||
MinRK
|
r3598 | md = {'msg_id' : parent['msg_id'], | ||
'received' : datetime.now(), | ||||
MinRK
|
r3607 | 'engine_uuid' : header.get('engine', None), | ||
MinRK
|
r3598 | 'follow' : parent['follow'], | ||
'after' : parent['after'], | ||||
MinRK
|
r3602 | 'status' : content['status'], | ||
} | ||||
MinRK
|
r3607 | |||
if md['engine_uuid'] is not None: | ||||
md['engine_id'] = self._engines.get(md['engine_uuid'], None) | ||||
if 'date' in parent: | ||||
md['submitted'] = datetime.strptime(parent['date'], ss.ISO8601) | ||||
if 'started' in header: | ||||
md['started'] = datetime.strptime(header['started'], ss.ISO8601) | ||||
if 'date' in header: | ||||
md['completed'] = datetime.strptime(header['date'], ss.ISO8601) | ||||
MinRK
|
r3598 | return md | ||
MinRK
|
r3539 | def _handle_execute_reply(self, msg): | ||
MinRK
|
r3598 | """Save the reply to an execute_request into our results. | ||
execute messages are never actually used. apply is used instead. | ||||
""" | ||||
MinRK
|
r3539 | parent = msg['parent_header'] | ||
msg_id = parent['msg_id'] | ||||
if msg_id not in self.outstanding: | ||||
MinRK
|
r3607 | if msg_id in self.history: | ||
print ("got stale result: %s"%msg_id) | ||||
else: | ||||
print ("got unknown result: %s"%msg_id) | ||||
MinRK
|
r3539 | else: | ||
self.outstanding.remove(msg_id) | ||||
self.results[msg_id] = ss.unwrap_exception(msg['content']) | ||||
    def _handle_apply_reply(self, msg):
        """Save the reply to an apply_request into our results."""
        parent = msg['parent_header']
        msg_id = parent['msg_id']
        if msg_id not in self.outstanding:
            # a reply for something we are not waiting on
            if msg_id in self.history:
                print ("got stale result: %s"%msg_id)
                print self.results[msg_id]
                print msg
            else:
                print ("got unknown result: %s"%msg_id)
        else:
            self.outstanding.remove(msg_id)
            content = msg['content']
            header = msg['header']

            # construct metadata:
            md = self.metadata.setdefault(msg_id, Metadata())
            md.update(self._extract_metadata(header, parent, content))
            self.metadata[msg_id] = md

            # construct result:
            if content['status'] == 'ok':
                # first element of the unserialized buffers is the result
                self.results[msg_id] = ss.unserialize_object(msg['buffers'])[0]
            elif content['status'] == 'aborted':
                self.results[msg_id] = error.AbortedTask(msg_id)
            elif content['status'] == 'resubmitted':
                # TODO: handle resubmission
                pass
            else:
                # remote error: store the unpacked exception as the result
                e = ss.unwrap_exception(content)
                if e.engine_info:
                    # translate the engine uuid back to its integer id
                    # (self._engines is a ReverseDict, so uuid lookup works)
                    e_uuid = e.engine_info['engineid']
                    eid = self._engines[e_uuid]
                    e.engine_info['engineid'] = eid
                self.results[msg_id] = e
MinRK
|
r3539 | |||
def _flush_notifications(self): | ||||
MinRK
|
r3555 | """Flush notifications of engine registrations waiting | ||
in ZMQ queue.""" | ||||
msg = self.session.recv(self._notification_socket, mode=zmq.NOBLOCK) | ||||
MinRK
|
r3539 | while msg is not None: | ||
MinRK
|
r3540 | if self.debug: | ||
pprint(msg) | ||||
MinRK
|
r3539 | msg = msg[-1] | ||
msg_type = msg['msg_type'] | ||||
handler = self._notification_handlers.get(msg_type, None) | ||||
if handler is None: | ||||
raise Exception("Unhandled message type: %s"%msg.msg_type) | ||||
else: | ||||
handler(msg) | ||||
MinRK
|
r3555 | msg = self.session.recv(self._notification_socket, mode=zmq.NOBLOCK) | ||
MinRK
|
r3539 | |||
def _flush_results(self, sock): | ||||
MinRK
|
r3555 | """Flush task or queue results waiting in ZMQ queue.""" | ||
MinRK
|
r3539 | msg = self.session.recv(sock, mode=zmq.NOBLOCK) | ||
while msg is not None: | ||||
MinRK
|
r3540 | if self.debug: | ||
pprint(msg) | ||||
MinRK
|
r3539 | msg = msg[-1] | ||
msg_type = msg['msg_type'] | ||||
handler = self._queue_handlers.get(msg_type, None) | ||||
if handler is None: | ||||
raise Exception("Unhandled message type: %s"%msg.msg_type) | ||||
else: | ||||
handler(msg) | ||||
msg = self.session.recv(sock, mode=zmq.NOBLOCK) | ||||
MinRK
|
r3540 | def _flush_control(self, sock): | ||
MinRK
|
r3555 | """Flush replies from the control channel waiting | ||
MinRK
|
r3559 | in the ZMQ queue. | ||
Currently: ignore them.""" | ||||
MinRK
|
r3540 | msg = self.session.recv(sock, mode=zmq.NOBLOCK) | ||
while msg is not None: | ||||
if self.debug: | ||||
pprint(msg) | ||||
msg = self.session.recv(sock, mode=zmq.NOBLOCK) | ||||
MinRK
|
r3602 | def _flush_iopub(self, sock): | ||
"""Flush replies from the iopub channel waiting | ||||
in the ZMQ queue. | ||||
""" | ||||
msg = self.session.recv(sock, mode=zmq.NOBLOCK) | ||||
while msg is not None: | ||||
if self.debug: | ||||
pprint(msg) | ||||
msg = msg[-1] | ||||
parent = msg['parent_header'] | ||||
msg_id = parent['msg_id'] | ||||
content = msg['content'] | ||||
header = msg['header'] | ||||
msg_type = msg['msg_type'] | ||||
# init metadata: | ||||
md = self.metadata.setdefault(msg_id, Metadata()) | ||||
if msg_type == 'stream': | ||||
name = content['name'] | ||||
s = md[name] or '' | ||||
md[name] = s + content['data'] | ||||
elif msg_type == 'pyerr': | ||||
md.update({'pyerr' : ss.unwrap_exception(content)}) | ||||
else: | ||||
md.update({msg_type : content['data']}) | ||||
self.metadata[msg_id] = md | ||||
msg = self.session.recv(sock, mode=zmq.NOBLOCK) | ||||
MinRK
|
r3555 | #-------------------------------------------------------------------------- | ||
# getitem | ||||
#-------------------------------------------------------------------------- | ||||
MinRK
|
r3539 | |||
def __getitem__(self, key): | ||||
MinRK
|
r3559 | """Dict access returns DirectView multiplexer objects or, | ||
if key is None, a LoadBalancedView.""" | ||||
if key is None: | ||||
return LoadBalancedView(self) | ||||
MinRK
|
r3539 | if isinstance(key, int): | ||
if key not in self.ids: | ||||
raise IndexError("No such engine: %i"%key) | ||||
return DirectView(self, key) | ||||
if isinstance(key, slice): | ||||
indices = range(len(self.ids))[key] | ||||
ids = sorted(self._ids) | ||||
key = [ ids[i] for i in indices ] | ||||
# newkeys = sorted(self._ids)[thekeys[k]] | ||||
if isinstance(key, (tuple, list, xrange)): | ||||
_,targets = self._build_targets(list(key)) | ||||
return DirectView(self, targets) | ||||
else: | ||||
raise TypeError("key by int/iterable of ints only, not %s"%(type(key))) | ||||
MinRK
|
r3555 | #-------------------------------------------------------------------------- | ||
# Begin public methods | ||||
#-------------------------------------------------------------------------- | ||||
MinRK
|
r3539 | |||
MinRK
|
    @property
    def remote(self):
        """property for convenient RemoteFunction generation.

        >>> @client.remote
        ... def f():
        ...     import os
        ...     print (os.getpid())
        """
        # `remote` here is the module-level factory from remotefunction
        return remote(self, block=self.block)
MinRK
|
r3539 | def spin(self): | ||
MinRK
|
r3555 | """Flush any registration notifications and execution results | ||
waiting in the ZMQ queue. | ||||
""" | ||||
if self._notification_socket: | ||||
MinRK
|
r3539 | self._flush_notifications() | ||
MinRK
|
r3555 | if self._mux_socket: | ||
self._flush_results(self._mux_socket) | ||||
if self._task_socket: | ||||
self._flush_results(self._task_socket) | ||||
if self._control_socket: | ||||
self._flush_control(self._control_socket) | ||||
MinRK
|
r3602 | if self._iopub_socket: | ||
self._flush_iopub(self._iopub_socket) | ||||
MinRK
|
r3539 | |||
MinRK
|
r3555 | def barrier(self, msg_ids=None, timeout=-1): | ||
"""waits on one or more `msg_ids`, for up to `timeout` seconds. | ||||
MinRK
|
r3539 | |||
Parameters | ||||
---------- | ||||
MinRK
|
r3590 | msg_ids : int, str, or list of ints and/or strs, or one or more AsyncResult objects | ||
MinRK
|
r3555 | ints are indices to self.history | ||
strs are msg_ids | ||||
default: wait on all outstanding messages | ||||
timeout : float | ||||
a time in seconds, after which to give up. | ||||
default is -1, which means no timeout | ||||
MinRK
|
r3539 | |||
MinRK
|
r3555 | Returns | ||
------- | ||||
True : when all msg_ids are done | ||||
False : timeout reached, some msg_ids still outstanding | ||||
""" | ||||
tic = time.time() | ||||
if msg_ids is None: | ||||
theids = self.outstanding | ||||
else: | ||||
MinRK
|
r3590 | if isinstance(msg_ids, (int, str, AsyncResult)): | ||
MinRK
|
r3555 | msg_ids = [msg_ids] | ||
theids = set() | ||||
for msg_id in msg_ids: | ||||
if isinstance(msg_id, int): | ||||
msg_id = self.history[msg_id] | ||||
MinRK
|
r3590 | elif isinstance(msg_id, AsyncResult): | ||
MinRK
|
r3592 | map(theids.add, msg_id.msg_ids) | ||
MinRK
|
r3590 | continue | ||
MinRK
|
r3555 | theids.add(msg_id) | ||
MinRK
|
r3590 | if not theids.intersection(self.outstanding): | ||
return True | ||||
MinRK
|
r3555 | self.spin() | ||
while theids.intersection(self.outstanding): | ||||
if timeout >= 0 and ( time.time()-tic ) > timeout: | ||||
break | ||||
time.sleep(1e-3) | ||||
self.spin() | ||||
return len(theids.intersection(self.outstanding)) == 0 | ||||
#-------------------------------------------------------------------------- | ||||
# Control methods | ||||
#-------------------------------------------------------------------------- | ||||
MinRK
|
r3539 | @spinfirst | ||
MinRK
|
r3540 | @defaultblock | ||
def clear(self, targets=None, block=None): | ||||
MinRK
|
r3555 | """Clear the namespace in target(s).""" | ||
MinRK
|
r3540 | targets = self._build_targets(targets)[0] | ||
for t in targets: | ||||
MinRK
|
r3559 | self.session.send(self._control_socket, 'clear_request', content={}, ident=t) | ||
MinRK
|
r3540 | error = False | ||
if self.block: | ||||
for i in range(len(targets)): | ||||
MinRK
|
r3555 | idents,msg = self.session.recv(self._control_socket,0) | ||
MinRK
|
r3540 | if self.debug: | ||
pprint(msg) | ||||
if msg['content']['status'] != 'ok': | ||||
MinRK
|
r3559 | error = ss.unwrap_exception(msg['content']) | ||
MinRK
|
r3540 | if error: | ||
return error | ||||
MinRK
|
r3539 | |||
@spinfirst | ||||
MinRK
|
r3540 | @defaultblock | ||
def abort(self, msg_ids = None, targets=None, block=None): | ||||
MinRK
|
r3555 | """Abort the execution queues of target(s).""" | ||
MinRK
|
r3540 | targets = self._build_targets(targets)[0] | ||
if isinstance(msg_ids, basestring): | ||||
msg_ids = [msg_ids] | ||||
content = dict(msg_ids=msg_ids) | ||||
for t in targets: | ||||
MinRK
|
r3555 | self.session.send(self._control_socket, 'abort_request', | ||
MinRK
|
r3540 | content=content, ident=t) | ||
error = False | ||||
if self.block: | ||||
for i in range(len(targets)): | ||||
MinRK
|
r3555 | idents,msg = self.session.recv(self._control_socket,0) | ||
MinRK
|
r3540 | if self.debug: | ||
pprint(msg) | ||||
if msg['content']['status'] != 'ok': | ||||
MinRK
|
r3559 | error = ss.unwrap_exception(msg['content']) | ||
MinRK
|
r3540 | if error: | ||
return error | ||||
MinRK
|
r3539 | |||
MinRK
|
r3540 | @spinfirst | ||
@defaultblock | ||||
MinRK
|
r3580 | def shutdown(self, targets=None, restart=False, controller=False, block=None): | ||
"""Terminates one or more engine processes, optionally including the controller.""" | ||||
if controller: | ||||
targets = 'all' | ||||
MinRK
|
r3540 | targets = self._build_targets(targets)[0] | ||
for t in targets: | ||||
MinRK
|
r3575 | self.session.send(self._control_socket, 'shutdown_request', | ||
content={'restart':restart},ident=t) | ||||
MinRK
|
r3540 | error = False | ||
MinRK
|
r3580 | if block or controller: | ||
MinRK
|
r3540 | for i in range(len(targets)): | ||
MinRK
|
r3555 | idents,msg = self.session.recv(self._control_socket,0) | ||
MinRK
|
r3540 | if self.debug: | ||
pprint(msg) | ||||
if msg['content']['status'] != 'ok': | ||||
MinRK
|
r3559 | error = ss.unwrap_exception(msg['content']) | ||
MinRK
|
r3580 | |||
if controller: | ||||
time.sleep(0.25) | ||||
self.session.send(self._query_socket, 'shutdown_request') | ||||
idents,msg = self.session.recv(self._query_socket, 0) | ||||
if self.debug: | ||||
pprint(msg) | ||||
if msg['content']['status'] != 'ok': | ||||
error = ss.unwrap_exception(msg['content']) | ||||
MinRK
|
r3540 | if error: | ||
MinRK
|
r3584 | raise error | ||
MinRK
|
r3555 | |||
#-------------------------------------------------------------------------- | ||||
# Execution methods | ||||
#-------------------------------------------------------------------------- | ||||
MinRK
|
r3539 | @defaultblock | ||
def execute(self, code, targets='all', block=None): | ||||
MinRK
|
r3555 | """Executes `code` on `targets` in blocking or nonblocking manner. | ||
MinRK
|
r3539 | |||
MinRK
|
r3584 | ``execute`` is always `bound` (affects engine namespace) | ||
MinRK
|
r3539 | Parameters | ||
---------- | ||||
code : str | ||||
the code string to be executed | ||||
targets : int/str/list of ints/strs | ||||
the engines on which to execute | ||||
default : all | ||||
block : bool | ||||
MinRK
|
r3555 | whether or not to wait until done to return | ||
default: self.block | ||||
MinRK
|
r3539 | """ | ||
MinRK
|
r3590 | result = self.apply(_execute, (code,), targets=targets, block=self.block, bound=True) | ||
MinRK
|
r3539 | return result | ||
def run(self, code, block=None): | ||||
MinRK
|
r3555 | """Runs `code` on an engine. | ||
MinRK
|
r3539 | |||
Calls to this are load-balanced. | ||||
MinRK
|
r3584 | ``run`` is never `bound` (no effect on engine namespace) | ||
MinRK
|
r3539 | Parameters | ||
---------- | ||||
code : str | ||||
the code string to be executed | ||||
block : bool | ||||
whether or not to wait until done | ||||
""" | ||||
MinRK
|
r3593 | result = self.apply(_execute, (code,), targets=None, block=block, bound=False) | ||
MinRK
|
r3539 | return result | ||
MinRK
|
r3583 | def _maybe_raise(self, result): | ||
"""wrapper for maybe raising an exception if apply failed.""" | ||||
if isinstance(result, error.RemoteError): | ||||
raise result | ||||
return result | ||||
MinRK
|
r3611 | |||
def _build_dependency(self, dep): | ||||
"""helper for building jsonable dependencies from various input forms""" | ||||
if isinstance(dep, Dependency): | ||||
return dep.as_dict() | ||||
elif isinstance(dep, AsyncResult): | ||||
return dep.msg_ids | ||||
elif dep is None: | ||||
return [] | ||||
elif isinstance(dep, set): | ||||
return list(dep) | ||||
elif isinstance(dep, (list,dict)): | ||||
return dep | ||||
elif isinstance(dep, str): | ||||
return [dep] | ||||
else: | ||||
raise TypeError("Dependency may be: set,list,dict,Dependency or AsyncResult, not %r"%type(dep)) | ||||
MinRK
|
r3555 | def apply(self, f, args=None, kwargs=None, bound=True, block=None, targets=None, | ||
MinRK
|
r3611 | after=None, follow=None, timeout=None): | ||
MinRK
|
r3555 | """Call `f(*args, **kwargs)` on a remote engine(s), returning the result. | ||
This is the central execution command for the client. | ||||
Parameters | ||||
---------- | ||||
f : function | ||||
The fuction to be called remotely | ||||
args : tuple/list | ||||
The positional arguments passed to `f` | ||||
kwargs : dict | ||||
The keyword arguments passed to `f` | ||||
bound : bool (default: True) | ||||
Whether to execute in the Engine(s) namespace, or in a clean | ||||
namespace not affecting the engine. | ||||
block : bool (default: self.block) | ||||
Whether to wait for the result, or return immediately. | ||||
False: | ||||
returns msg_id(s) | ||||
if multiple targets: | ||||
list of ids | ||||
True: | ||||
returns actual result(s) of f(*args, **kwargs) | ||||
if multiple targets: | ||||
dict of results, by engine ID | ||||
targets : int,list of ints, 'all', None | ||||
Specify the destination of the job. | ||||
if None: | ||||
Submit via Task queue for load-balancing. | ||||
if 'all': | ||||
Run on all active engines | ||||
if list: | ||||
Run on each specified engine | ||||
if int: | ||||
Run on single engine | ||||
after : Dependency or collection of msg_ids | ||||
Only for load-balanced execution (targets=None) | ||||
Specify a list of msg_ids as a time-based dependency. | ||||
This job will only be run *after* the dependencies | ||||
have been met. | ||||
follow : Dependency or collection of msg_ids | ||||
Only for load-balanced execution (targets=None) | ||||
Specify a list of msg_ids as a location-based dependency. | ||||
This job will only be run on an engine where this dependency | ||||
is met. | ||||
MinRK
|
r3611 | timeout : float or None | ||
Only for load-balanced execution (targets=None) | ||||
Specify an amount of time (in seconds) | ||||
MinRK
|
r3555 | Returns | ||
------- | ||||
if block is False: | ||||
if single target: | ||||
return msg_id | ||||
else: | ||||
return list of msg_ids | ||||
? (should this be dict like block=True) ? | ||||
else: | ||||
if single target: | ||||
return result of f(*args, **kwargs) | ||||
else: | ||||
return dict of results, keyed by engine | ||||
""" | ||||
# defaults: | ||||
block = block if block is not None else self.block | ||||
args = args if args is not None else [] | ||||
kwargs = kwargs if kwargs is not None else {} | ||||
# enforce types of f,args,kwrags | ||||
if not callable(f): | ||||
raise TypeError("f must be callable, not %s"%type(f)) | ||||
if not isinstance(args, (tuple, list)): | ||||
raise TypeError("args must be tuple or list, not %s"%type(args)) | ||||
if not isinstance(kwargs, dict): | ||||
raise TypeError("kwargs must be dict, not %s"%type(kwargs)) | ||||
MinRK
|
r3611 | |||
after = self._build_dependency(after) | ||||
follow = self._build_dependency(follow) | ||||
options = dict(bound=bound, block=block) | ||||
MinRK
|
r3555 | |||
if targets is None: | ||||
MinRK
|
r3611 | return self._apply_balanced(f, args, kwargs, timeout=timeout, | ||
after=after, follow=follow, **options) | ||||
MinRK
|
r3555 | else: | ||
return self._apply_direct(f, args, kwargs, targets=targets, **options) | ||||
MinRK
|
r3548 | def _apply_balanced(self, f, args, kwargs, bound=True, block=None, | ||
MinRK
|
r3611 | after=None, follow=None, timeout=None): | ||
MinRK
|
r3555 | """The underlying method for applying functions in a load balanced | ||
manner, via the task queue.""" | ||||
MinRK
|
r3611 | subheader = dict(after=after, follow=follow, timeout=timeout) | ||
MinRK
|
r3539 | bufs = ss.pack_apply_message(f,args,kwargs) | ||
content = dict(bound=bound) | ||||
MinRK
|
r3593 | |||
MinRK
|
r3555 | msg = self.session.send(self._task_socket, "apply_request", | ||
MinRK
|
r3551 | content=content, buffers=bufs, subheader=subheader) | ||
MinRK
|
r3539 | msg_id = msg['msg_id'] | ||
self.outstanding.add(msg_id) | ||||
self.history.append(msg_id) | ||||
MinRK
|
r3596 | ar = AsyncResult(self, [msg_id], fname=f.__name__) | ||
MinRK
|
r3539 | if block: | ||
MinRK
|
r3596 | return ar.get() | ||
MinRK
|
r3539 | else: | ||
MinRK
|
r3596 | return ar | ||
MinRK
|
r3539 | |||
MinRK
|
r3611 | def _apply_direct(self, f, args, kwargs, bound=True, block=None, targets=None): | ||
MinRK
|
r3555 | """Then underlying method for applying functions to specific engines | ||
via the MUX queue.""" | ||||
MinRK
|
r3548 | |||
MinRK
|
r3539 | queues,targets = self._build_targets(targets) | ||
MinRK
|
r3593 | |||
MinRK
|
r3548 | subheader = dict(after=after, follow=follow) | ||
MinRK
|
r3539 | content = dict(bound=bound) | ||
MinRK
|
r3593 | bufs = ss.pack_apply_message(f,args,kwargs) | ||
MinRK
|
r3539 | msg_ids = [] | ||
for queue in queues: | ||||
MinRK
|
r3555 | msg = self.session.send(self._mux_socket, "apply_request", | ||
MinRK
|
r3548 | content=content, buffers=bufs,ident=queue, subheader=subheader) | ||
MinRK
|
r3539 | msg_id = msg['msg_id'] | ||
self.outstanding.add(msg_id) | ||||
self.history.append(msg_id) | ||||
msg_ids.append(msg_id) | ||||
MinRK
|
r3596 | ar = AsyncResult(self, msg_ids, fname=f.__name__) | ||
MinRK
|
r3539 | if block: | ||
MinRK
|
r3596 | return ar.get() | ||
MinRK
|
r3539 | else: | ||
MinRK
|
r3596 | return ar | ||
MinRK
|
r3539 | |||
MinRK
|
r3589 | #-------------------------------------------------------------------------- | ||
# Map and decorators | ||||
#-------------------------------------------------------------------------- | ||||
MinRK
|
r3588 | def map(self, f, *sequences): | ||
"""Parallel version of builtin `map`, using all our engines.""" | ||||
pf = ParallelFunction(self, f, block=self.block, | ||||
bound=True, targets='all') | ||||
return pf.map(*sequences) | ||||
MinRK
|
r3587 | |||
MinRK
|
r3589 | def parallel(self, bound=True, targets='all', block=True): | ||
MinRK
|
r3590 | """Decorator for making a ParallelFunction.""" | ||
MinRK
|
r3589 | return parallel(self, bound=bound, targets=targets, block=block) | ||
def remote(self, bound=True, targets='all', block=True): | ||||
MinRK
|
r3590 | """Decorator for making a RemoteFunction.""" | ||
MinRK
|
r3589 | return remote(self, bound=bound, targets=targets, block=block) | ||
MinRK
|
r3555 | #-------------------------------------------------------------------------- | ||
# Data movement | ||||
#-------------------------------------------------------------------------- | ||||
MinRK
|
r3539 | |||
MinRK
|
r3555 | @defaultblock | ||
MinRK
|
r3587 | def push(self, ns, targets='all', block=None): | ||
MinRK
|
r3555 | """Push the contents of `ns` into the namespace on `target`""" | ||
MinRK
|
r3539 | if not isinstance(ns, dict): | ||
raise TypeError("Must be a dict, not %s"%type(ns)) | ||||
MinRK
|
r3555 | result = self.apply(_push, (ns,), targets=targets, block=block, bound=True) | ||
MinRK
|
r3539 | return result | ||
MinRK
|
r3555 | @defaultblock | ||
MinRK
|
r3590 | def pull(self, keys, targets='all', block=None): | ||
MinRK
|
r3555 | """Pull objects from `target`'s namespace by `keys`""" | ||
if isinstance(keys, str): | ||||
pass | ||||
MinRK
|
r3581 | elif isinstance(keys, (list,tuple,set)): | ||
MinRK
|
r3555 | for key in keys: | ||
if not isinstance(key, str): | ||||
raise TypeError | ||||
MinRK
|
r3539 | result = self.apply(_pull, (keys,), targets=targets, block=block, bound=True) | ||
return result | ||||
MinRK
|
r3587 | def scatter(self, key, seq, dist='b', flatten=False, targets='all', block=None): | ||
""" | ||||
Partition a Python sequence and send the partitions to a set of engines. | ||||
""" | ||||
MinRK
|
r3590 | block = block if block is not None else self.block | ||
MinRK
|
r3587 | targets = self._build_targets(targets)[-1] | ||
mapObject = Map.dists[dist]() | ||||
nparts = len(targets) | ||||
msg_ids = [] | ||||
for index, engineid in enumerate(targets): | ||||
partition = mapObject.getPartition(seq, index, nparts) | ||||
if flatten and len(partition) == 1: | ||||
MinRK
|
r3590 | r = self.push({key: partition[0]}, targets=engineid, block=False) | ||
MinRK
|
r3587 | else: | ||
MinRK
|
r3590 | r = self.push({key: partition}, targets=engineid, block=False) | ||
MinRK
|
r3592 | msg_ids.extend(r.msg_ids) | ||
MinRK
|
r3596 | r = AsyncResult(self, msg_ids, fname='scatter') | ||
MinRK
|
r3587 | if block: | ||
MinRK
|
r3590 | return r.get() | ||
MinRK
|
r3587 | else: | ||
return r | ||||
MinRK
|
r3590 | def gather(self, key, dist='b', targets='all', block=None): | ||
MinRK
|
r3587 | """ | ||
Gather a partitioned sequence on a set of engines as a single local seq. | ||||
""" | ||||
MinRK
|
r3590 | block = block if block is not None else self.block | ||
MinRK
|
r3587 | |||
targets = self._build_targets(targets)[-1] | ||||
mapObject = Map.dists[dist]() | ||||
msg_ids = [] | ||||
for index, engineid in enumerate(targets): | ||||
MinRK
|
r3592 | msg_ids.extend(self.pull(key, targets=engineid,block=False).msg_ids) | ||
MinRK
|
r3587 | |||
MinRK
|
r3596 | r = AsyncMapResult(self, msg_ids, mapObject, fname='gather') | ||
MinRK
|
r3587 | if block: | ||
MinRK
|
r3590 | return r.get() | ||
MinRK
|
r3587 | else: | ||
return r | ||||
MinRK
|
r3555 | #-------------------------------------------------------------------------- | ||
# Query methods | ||||
#-------------------------------------------------------------------------- | ||||
    @spinfirst
    def get_results(self, msg_ids, status_only=False):
        """Returns the result of the execute or task request with `msg_ids`.

        Parameters
        ----------
        msg_ids : list of ints or msg_ids
            if int:
                Passed as index to self.history for convenience.
        status_only : bool (default: False)
            if False:
                return the actual results

        Returns
        -------
        results : dict
            There will always be the keys 'pending' and 'completed', which will
            be lists of msg_ids.
        """
        # normalize to a list of str msg_ids (ints index into local history)
        if not isinstance(msg_ids, (list,tuple)):
            msg_ids = [msg_ids]
        theids = []
        for msg_id in msg_ids:
            if isinstance(msg_id, int):
                msg_id = self.history[msg_id]
            if not isinstance(msg_id, str):
                raise TypeError("msg_ids must be str, not %r"%msg_id)
            theids.append(msg_id)

        completed = []
        local_results = {}
        # temporarily disable local shortcut
        # for msg_id in list(theids):
        #     if msg_id in self.results:
        #         completed.append(msg_id)
        #         local_results[msg_id] = self.results[msg_id]
        #         theids.remove(msg_id)

        if theids: # some not locally cached
            # round-trip a result_request to the controller's query socket
            content = dict(msg_ids=theids, status_only=status_only)
            msg = self.session.send(self._query_socket, "result_request", content=content)
            # block until the query socket is readable, then receive
            zmq.select([self._query_socket], [], [])
            idents,msg = self.session.recv(self._query_socket, zmq.NOBLOCK)
            if self.debug:
                pprint(msg)
            content = msg['content']
            if content['status'] != 'ok':
                raise ss.unwrap_exception(content)
            # NOTE(review): `buffers` is only bound on this branch; the loop
            # below assumes theids was non-empty whenever it deserializes.
            buffers = msg['buffers']
        else:
            content = dict(completed=[],pending=[])

        # merge locally-known completions into the reply
        content['completed'].extend(completed)

        if status_only:
            return content

        failures = []
        # load cached results into result:
        content.update(local_results)
        # update cache with results:
        # NOTE(review): iteration is over sorted(theids) because the reply's
        # buffers are consumed sequentially by unserialize_object below —
        # presumably the controller serialized them in the same sorted order;
        # TODO confirm against the controller implementation.
        for msg_id in sorted(theids):
            if msg_id in content['completed']:
                rec = content[msg_id]
                parent = rec['header']
                header = rec['result_header']
                rcontent = rec['result_content']
                iodict = rec['io']
                if isinstance(rcontent, str):
                    rcontent = self.session.unpack(rcontent)

                # merge engine metadata and captured io into the local cache
                md = self.metadata.setdefault(msg_id, Metadata())
                md.update(self._extract_metadata(header, parent, rcontent))
                md.update(iodict)

                if rcontent['status'] == 'ok':
                    # consume this result's share of the buffer list
                    res,buffers = ss.unserialize_object(buffers)
                else:
                    res = ss.unwrap_exception(rcontent)
                    failures.append(res)

                self.results[msg_id] = res
                content[msg_id] = res

        # raise a (possibly composite) error if any results failed
        error.collect_exceptions(failures, "get_results")
        return content
@spinfirst | ||||
def queue_status(self, targets=None, verbose=False): | ||||
"""Fetch the status of engine queues. | ||||
Parameters | ||||
---------- | ||||
targets : int/str/list of ints/strs | ||||
the engines on which to execute | ||||
default : all | ||||
verbose : bool | ||||
MinRK
|
r3584 | Whether to return lengths only, or lists of ids for each element | ||
MinRK
|
r3555 | """ | ||
targets = self._build_targets(targets)[1] | ||||
content = dict(targets=targets, verbose=verbose) | ||||
self.session.send(self._query_socket, "queue_request", content=content) | ||||
idents,msg = self.session.recv(self._query_socket, 0) | ||||
MinRK
|
r3540 | if self.debug: | ||
pprint(msg) | ||||
MinRK
|
r3555 | content = msg['content'] | ||
status = content.pop('status') | ||||
if status != 'ok': | ||||
raise ss.unwrap_exception(content) | ||||
MinRK
|
r3598 | return ss.rekey(content) | ||
MinRK
|
r3555 | |||
@spinfirst | ||||
def purge_results(self, msg_ids=[], targets=[]): | ||||
"""Tell the controller to forget results. | ||||
MinRK
|
r3539 | |||
MinRK
|
r3555 | Individual results can be purged by msg_id, or the entire | ||
MinRK
|
r3584 | history of specific targets can be purged. | ||
MinRK
|
r3555 | |||
Parameters | ||||
---------- | ||||
MinRK
|
r3584 | msg_ids : str or list of strs | ||
the msg_ids whose results should be forgotten. | ||||
MinRK
|
r3555 | targets : int/str/list of ints/strs | ||
MinRK
|
r3584 | The targets, by uuid or int_id, whose entire history is to be purged. | ||
Use `targets='all'` to scrub everything from the controller's memory. | ||||
MinRK
|
r3555 | default : None | ||
""" | ||||
if not targets and not msg_ids: | ||||
raise ValueError | ||||
if targets: | ||||
targets = self._build_targets(targets)[1] | ||||
content = dict(targets=targets, msg_ids=msg_ids) | ||||
self.session.send(self._query_socket, "purge_request", content=content) | ||||
idents, msg = self.session.recv(self._query_socket, 0) | ||||
if self.debug: | ||||
pprint(msg) | ||||
content = msg['content'] | ||||
if content['status'] != 'ok': | ||||
raise ss.unwrap_exception(content) | ||||
MinRK
|
r3548 | |||
MinRK
|
r3590 | #---------------------------------------- | ||
# activate for %px,%autopx magics | ||||
#---------------------------------------- | ||||
def activate(self): | ||||
"""Make this `View` active for parallel magic commands. | ||||
IPython has a magic command syntax to work with `MultiEngineClient` objects. | ||||
In a given IPython session there is a single active one. While | ||||
there can be many `Views` created and used by the user, | ||||
there is only one active one. The active `View` is used whenever | ||||
the magic commands %px and %autopx are used. | ||||
The activate() method is called on a given `View` to make it | ||||
active. Once this has been done, the magic commands can be used. | ||||
""" | ||||
try: | ||||
# This is injected into __builtins__. | ||||
ip = get_ipython() | ||||
except NameError: | ||||
print "The IPython parallel magics (%result, %px, %autopx) only work within IPython." | ||||
else: | ||||
pmagic = ip.plugin_manager.get_plugin('parallelmagic') | ||||
if pmagic is not None: | ||||
pmagic.active_multiengine_client = self | ||||
else: | ||||
print "You must first load the parallelmagic extension " \ | ||||
"by doing '%load_ext parallelmagic'" | ||||
MinRK
|
class AsynClient(Client):
    """An Asynchronous client, using the Tornado Event Loop.
    !!!unfinished!!!"""

    io_loop = None
    _queue_stream = None
    _notifier_stream = None
    _task_stream = None
    _control_stream = None

    def __init__(self, addr, context=None, username=None, debug=False, io_loop=None):
        Client.__init__(self, addr, context, username, debug)
        if io_loop is None:
            io_loop = ioloop.IOLoop.instance()
        self.io_loop = io_loop

        # wrap the base class sockets in ZMQStreams on the event loop
        self._queue_stream = zmqstream.ZMQStream(self._mux_socket, io_loop)
        self._control_stream = zmqstream.ZMQStream(self._control_socket, io_loop)
        self._task_stream = zmqstream.ZMQStream(self._task_socket, io_loop)
        # BUG FIX: this was assigned to `self._notification_stream`, but the
        # class attribute (and spin) use `_notifier_stream`, leaving the
        # declared attribute forever None.
        self._notifier_stream = zmqstream.ZMQStream(self._notification_socket, io_loop)

    def spin(self):
        """Flush all streams, processing any pending messages."""
        # BUG FIX: spin() referenced non-existent attributes
        # (self.queue_stream etc.); the streams are stored with a
        # leading underscore.
        for stream in (self._queue_stream, self._notifier_stream,
                       self._task_stream, self._control_stream):
            stream.flush()
r3587 | |||
__all__ = [ 'Client', | ||||
'depend', | ||||
'require', | ||||
'remote', | ||||
'parallel', | ||||
'RemoteFunction', | ||||
'ParallelFunction', | ||||
'DirectView', | ||||
'LoadBalancedView', | ||||
MinRK
|
r3589 | 'AsyncResult', | ||
'AsyncMapResult' | ||||
MinRK
|
r3587 | ] | ||