client.py
817 lines
| 29.4 KiB
| text/x-python
|
PythonLexer
MinRK
|
r3539 | """A semi-synchronous Client for the ZMQ controller""" | |
MinRK
|
r3551 | #----------------------------------------------------------------------------- | |
# Copyright (C) 2010 The IPython Development Team | |||
# | |||
# Distributed under the terms of the BSD License. The full license is in | |||
# the file COPYING, distributed as part of this software. | |||
#----------------------------------------------------------------------------- | |||
MinRK
|
r3539 | ||
MinRK
|
r3551 | #----------------------------------------------------------------------------- | |
# Imports | |||
#----------------------------------------------------------------------------- | |||
MinRK
|
r3539 | ||
MinRK
|
r3553 | from __future__ import print_function | |
MinRK
|
r3551 | import time | |
MinRK
|
r3540 | from pprint import pprint | |
MinRK
|
r3551 | import zmq | |
from zmq.eventloop import ioloop, zmqstream | |||
MinRK
|
r3539 | from IPython.external.decorator import decorator | |
import streamsession as ss | |||
from remotenamespace import RemoteNamespace | |||
from view import DirectView | |||
MinRK
|
r3548 | from dependency import Dependency, depend, require | |
MinRK
|
r3539 | ||
def _push(ns): | |||
globals().update(ns) | |||
def _pull(keys): | |||
g = globals() | |||
MinRK
|
r3555 | if isinstance(keys, (list,tuple, set)): | |
MinRK
|
r3539 | return map(g.get, keys) | |
else: | |||
return g.get(keys) | |||
def _clear(): | |||
globals().clear() | |||
def execute(code): | |||
exec code in globals() | |||
MinRK
|
r3555 | #-------------------------------------------------------------------------- | |
# Decorators for Client methods | |||
#-------------------------------------------------------------------------- | |||
MinRK
|
r3539 | @decorator | |
MinRK
|
r3555 | def spinfirst(f, self, *args, **kwargs): | |
"""Call spin() to sync state prior to calling the method.""" | |||
MinRK
|
r3539 | self.spin() | |
return f(self, *args, **kwargs) | |||
@decorator | |||
def defaultblock(f, self, *args, **kwargs): | |||
MinRK
|
r3555 | """Default to self.block; preserve self.block.""" | |
MinRK
|
r3539 | block = kwargs.get('block',None) | |
block = self.block if block is None else block | |||
saveblock = self.block | |||
self.block = block | |||
ret = f(self, *args, **kwargs) | |||
self.block = saveblock | |||
return ret | |||
MinRK
|
r3555 | #-------------------------------------------------------------------------- | |
# Classes | |||
#-------------------------------------------------------------------------- | |||
MinRK
|
r3540 | class AbortedTask(object): | |
MinRK
|
r3555 | """A basic wrapper object describing an aborted task.""" | |
MinRK
|
r3540 | def __init__(self, msg_id): | |
self.msg_id = msg_id | |||
MinRK
|
r3539 | ||
MinRK
|
r3555 | class ControllerError(Exception): | |
def __init__(self, etype, evalue, tb): | |||
self.etype = etype | |||
self.evalue = evalue | |||
self.traceback=tb | |||
MinRK
|
r3539 | class Client(object): | |
"""A semi-synchronous client to the IPython ZMQ controller | |||
MinRK
|
r3555 | Parameters | |
---------- | |||
addr : bytes; zmq url, e.g. 'tcp://127.0.0.1:10101 | |||
The address of the controller's registration socket. | |||
MinRK
|
r3539 | Attributes | |
---------- | |||
MinRK
|
r3555 | ids : set of int engine IDs | |
MinRK
|
r3539 | requesting the ids attribute always synchronizes | |
the registration state. To request ids without synchronization, | |||
MinRK
|
r3555 | use semi-private _ids. | |
MinRK
|
r3539 | ||
history : list of msg_ids | |||
a list of msg_ids, keeping track of all the execution | |||
MinRK
|
r3555 | messages you have submitted in order. | |
MinRK
|
r3539 | ||
outstanding : set of msg_ids | |||
a set of msg_ids that have been submitted, but whose | |||
MinRK
|
r3555 | results have not yet been received. | |
MinRK
|
r3539 | ||
results : dict | |||
a dict of all our results, keyed by msg_id | |||
block : bool | |||
determines default behavior when block not specified | |||
in execution methods | |||
Methods | |||
------- | |||
spin : flushes incoming results and registration state changes | |||
control methods spin, and requesting `ids` also ensures up to date | |||
barrier : wait on one or more msg_ids | |||
MinRK
|
r3555 | execution methods: apply/apply_bound/apply_to/applu_bount | |
MinRK
|
r3539 | legacy: execute, run | |
MinRK
|
r3555 | query methods: queue_status, get_result, purge | |
MinRK
|
r3540 | ||
control methods: abort, kill | |||
MinRK
|
r3539 | """ | |
_connected=False | |||
_engines=None | |||
MinRK
|
r3555 | _addr='tcp://127.0.0.1:10101' | |
_registration_socket=None | |||
_query_socket=None | |||
_control_socket=None | |||
_notification_socket=None | |||
_mux_socket=None | |||
_task_socket=None | |||
MinRK
|
r3539 | block = False | |
outstanding=None | |||
results = None | |||
history = None | |||
MinRK
|
r3540 | debug = False | |
MinRK
|
r3539 | ||
MinRK
|
r3555 | def __init__(self, addr='tcp://127.0.0.1:10101', context=None, username=None, debug=False): | |
MinRK
|
r3539 | if context is None: | |
context = zmq.Context() | |||
self.context = context | |||
MinRK
|
r3555 | self._addr = addr | |
MinRK
|
r3539 | if username is None: | |
self.session = ss.StreamSession() | |||
else: | |||
self.session = ss.StreamSession(username) | |||
MinRK
|
r3555 | self._registration_socket = self.context.socket(zmq.PAIR) | |
self._registration_socket.setsockopt(zmq.IDENTITY, self.session.session) | |||
self._registration_socket.connect(addr) | |||
MinRK
|
r3539 | self._engines = {} | |
self._ids = set() | |||
self.outstanding=set() | |||
self.results = {} | |||
self.history = [] | |||
MinRK
|
r3540 | self.debug = debug | |
self.session.debug = debug | |||
MinRK
|
r3539 | ||
self._notification_handlers = {'registration_notification' : self._register_engine, | |||
'unregistration_notification' : self._unregister_engine, | |||
} | |||
self._queue_handlers = {'execute_reply' : self._handle_execute_reply, | |||
'apply_reply' : self._handle_apply_reply} | |||
MinRK
|
r3548 | self._connect() | |
MinRK
|
r3539 | ||
@property | |||
def ids(self): | |||
MinRK
|
r3555 | """Always up to date ids property.""" | |
MinRK
|
r3539 | self._flush_notifications() | |
return self._ids | |||
def _update_engines(self, engines): | |||
MinRK
|
r3555 | """Update our engines dict and _ids from a dict of the form: {id:uuid}.""" | |
MinRK
|
r3539 | for k,v in engines.iteritems(): | |
eid = int(k) | |||
MinRK
|
r3540 | self._engines[eid] = bytes(v) # force not unicode | |
MinRK
|
r3539 | self._ids.add(eid) | |
def _build_targets(self, targets): | |||
MinRK
|
r3555 | """Turn valid target IDs or 'all' into two lists: | |
(int_ids, uuids). | |||
""" | |||
MinRK
|
r3539 | if targets is None: | |
targets = self._ids | |||
elif isinstance(targets, str): | |||
if targets.lower() == 'all': | |||
targets = self._ids | |||
else: | |||
raise TypeError("%r not valid str target, must be 'all'"%(targets)) | |||
elif isinstance(targets, int): | |||
targets = [targets] | |||
return [self._engines[t] for t in targets], list(targets) | |||
def _connect(self): | |||
MinRK
|
r3555 | """setup all our socket connections to the controller. This is called from | |
__init__.""" | |||
MinRK
|
r3539 | if self._connected: | |
return | |||
self._connected=True | |||
MinRK
|
r3555 | self.session.send(self._registration_socket, 'connection_request') | |
idents,msg = self.session.recv(self._registration_socket,mode=0) | |||
MinRK
|
r3540 | if self.debug: | |
pprint(msg) | |||
MinRK
|
r3539 | msg = ss.Message(msg) | |
content = msg.content | |||
if content.status == 'ok': | |||
if content.queue: | |||
MinRK
|
r3555 | self._mux_socket = self.context.socket(zmq.PAIR) | |
self._mux_socket.setsockopt(zmq.IDENTITY, self.session.session) | |||
self._mux_socket.connect(content.queue) | |||
MinRK
|
r3539 | if content.task: | |
MinRK
|
r3555 | self._task_socket = self.context.socket(zmq.PAIR) | |
self._task_socket.setsockopt(zmq.IDENTITY, self.session.session) | |||
self._task_socket.connect(content.task) | |||
MinRK
|
r3539 | if content.notification: | |
MinRK
|
r3555 | self._notification_socket = self.context.socket(zmq.SUB) | |
self._notification_socket.connect(content.notification) | |||
self._notification_socket.setsockopt(zmq.SUBSCRIBE, "") | |||
MinRK
|
r3540 | if content.query: | |
MinRK
|
r3555 | self._query_socket = self.context.socket(zmq.PAIR) | |
self._query_socket.setsockopt(zmq.IDENTITY, self.session.session) | |||
self._query_socket.connect(content.query) | |||
MinRK
|
r3540 | if content.control: | |
MinRK
|
r3555 | self._control_socket = self.context.socket(zmq.PAIR) | |
self._control_socket.setsockopt(zmq.IDENTITY, self.session.session) | |||
self._control_socket.connect(content.control) | |||
MinRK
|
r3539 | self._update_engines(dict(content.engines)) | |
else: | |||
self._connected = False | |||
raise Exception("Failed to connect!") | |||
MinRK
|
r3555 | #-------------------------------------------------------------------------- | |
# handlers and callbacks for incoming messages | |||
#-------------------------------------------------------------------------- | |||
MinRK
|
r3539 | def _register_engine(self, msg): | |
MinRK
|
r3555 | """Register a new engine, and update our connection info.""" | |
MinRK
|
r3539 | content = msg['content'] | |
eid = content['id'] | |||
d = {eid : content['queue']} | |||
self._update_engines(d) | |||
self._ids.add(int(eid)) | |||
def _unregister_engine(self, msg): | |||
MinRK
|
r3555 | """Unregister an engine that has died.""" | |
MinRK
|
r3539 | content = msg['content'] | |
eid = int(content['id']) | |||
if eid in self._ids: | |||
self._ids.remove(eid) | |||
self._engines.pop(eid) | |||
def _handle_execute_reply(self, msg): | |||
MinRK
|
r3555 | """Save the reply to an execute_request into our results.""" | |
MinRK
|
r3539 | parent = msg['parent_header'] | |
msg_id = parent['msg_id'] | |||
if msg_id not in self.outstanding: | |||
MinRK
|
r3553 | print("got unknown result: %s"%msg_id) | |
MinRK
|
r3539 | else: | |
self.outstanding.remove(msg_id) | |||
self.results[msg_id] = ss.unwrap_exception(msg['content']) | |||
def _handle_apply_reply(self, msg): | |||
MinRK
|
r3555 | """Save the reply to an apply_request into our results.""" | |
MinRK
|
r3539 | parent = msg['parent_header'] | |
msg_id = parent['msg_id'] | |||
if msg_id not in self.outstanding: | |||
MinRK
|
r3553 | print ("got unknown result: %s"%msg_id) | |
MinRK
|
r3539 | else: | |
self.outstanding.remove(msg_id) | |||
content = msg['content'] | |||
if content['status'] == 'ok': | |||
self.results[msg_id] = ss.unserialize_object(msg['buffers']) | |||
MinRK
|
r3540 | elif content['status'] == 'aborted': | |
self.results[msg_id] = AbortedTask(msg_id) | |||
elif content['status'] == 'resubmitted': | |||
pass # handle resubmission | |||
MinRK
|
r3539 | else: | |
self.results[msg_id] = ss.unwrap_exception(content) | |||
def _flush_notifications(self): | |||
MinRK
|
r3555 | """Flush notifications of engine registrations waiting | |
in ZMQ queue.""" | |||
msg = self.session.recv(self._notification_socket, mode=zmq.NOBLOCK) | |||
MinRK
|
r3539 | while msg is not None: | |
MinRK
|
r3540 | if self.debug: | |
pprint(msg) | |||
MinRK
|
r3539 | msg = msg[-1] | |
msg_type = msg['msg_type'] | |||
handler = self._notification_handlers.get(msg_type, None) | |||
if handler is None: | |||
raise Exception("Unhandled message type: %s"%msg.msg_type) | |||
else: | |||
handler(msg) | |||
MinRK
|
r3555 | msg = self.session.recv(self._notification_socket, mode=zmq.NOBLOCK) | |
MinRK
|
r3539 | ||
def _flush_results(self, sock): | |||
MinRK
|
r3555 | """Flush task or queue results waiting in ZMQ queue.""" | |
MinRK
|
r3539 | msg = self.session.recv(sock, mode=zmq.NOBLOCK) | |
while msg is not None: | |||
MinRK
|
r3540 | if self.debug: | |
pprint(msg) | |||
MinRK
|
r3539 | msg = msg[-1] | |
msg_type = msg['msg_type'] | |||
handler = self._queue_handlers.get(msg_type, None) | |||
if handler is None: | |||
raise Exception("Unhandled message type: %s"%msg.msg_type) | |||
else: | |||
handler(msg) | |||
msg = self.session.recv(sock, mode=zmq.NOBLOCK) | |||
MinRK
|
r3540 | def _flush_control(self, sock): | |
MinRK
|
r3555 | """Flush replies from the control channel waiting | |
in the ZMQ queue.""" | |||
MinRK
|
r3540 | msg = self.session.recv(sock, mode=zmq.NOBLOCK) | |
while msg is not None: | |||
if self.debug: | |||
pprint(msg) | |||
msg = self.session.recv(sock, mode=zmq.NOBLOCK) | |||
MinRK
|
r3555 | #-------------------------------------------------------------------------- | |
# getitem | |||
#-------------------------------------------------------------------------- | |||
MinRK
|
r3539 | ||
def __getitem__(self, key): | |||
MinRK
|
r3555 | """Dict access returns DirectView multiplexer objects.""" | |
MinRK
|
r3539 | if isinstance(key, int): | |
if key not in self.ids: | |||
raise IndexError("No such engine: %i"%key) | |||
return DirectView(self, key) | |||
if isinstance(key, slice): | |||
indices = range(len(self.ids))[key] | |||
ids = sorted(self._ids) | |||
key = [ ids[i] for i in indices ] | |||
# newkeys = sorted(self._ids)[thekeys[k]] | |||
if isinstance(key, (tuple, list, xrange)): | |||
_,targets = self._build_targets(list(key)) | |||
return DirectView(self, targets) | |||
else: | |||
raise TypeError("key by int/iterable of ints only, not %s"%(type(key))) | |||
MinRK
|
r3555 | #-------------------------------------------------------------------------- | |
# Begin public methods | |||
#-------------------------------------------------------------------------- | |||
MinRK
|
r3539 | ||
def spin(self): | |||
MinRK
|
r3555 | """Flush any registration notifications and execution results | |
waiting in the ZMQ queue. | |||
""" | |||
if self._notification_socket: | |||
MinRK
|
r3539 | self._flush_notifications() | |
MinRK
|
r3555 | if self._mux_socket: | |
self._flush_results(self._mux_socket) | |||
if self._task_socket: | |||
self._flush_results(self._task_socket) | |||
if self._control_socket: | |||
self._flush_control(self._control_socket) | |||
MinRK
|
r3539 | ||
MinRK
|
r3555 | def barrier(self, msg_ids=None, timeout=-1): | |
"""waits on one or more `msg_ids`, for up to `timeout` seconds. | |||
MinRK
|
r3539 | ||
Parameters | |||
---------- | |||
MinRK
|
r3555 | msg_ids : int, str, or list of ints and/or strs | |
ints are indices to self.history | |||
strs are msg_ids | |||
default: wait on all outstanding messages | |||
timeout : float | |||
a time in seconds, after which to give up. | |||
default is -1, which means no timeout | |||
MinRK
|
r3539 | ||
MinRK
|
r3555 | Returns | |
------- | |||
True : when all msg_ids are done | |||
False : timeout reached, some msg_ids still outstanding | |||
""" | |||
tic = time.time() | |||
if msg_ids is None: | |||
theids = self.outstanding | |||
else: | |||
if isinstance(msg_ids, (int, str)): | |||
msg_ids = [msg_ids] | |||
theids = set() | |||
for msg_id in msg_ids: | |||
if isinstance(msg_id, int): | |||
msg_id = self.history[msg_id] | |||
theids.add(msg_id) | |||
self.spin() | |||
while theids.intersection(self.outstanding): | |||
if timeout >= 0 and ( time.time()-tic ) > timeout: | |||
break | |||
time.sleep(1e-3) | |||
self.spin() | |||
return len(theids.intersection(self.outstanding)) == 0 | |||
#-------------------------------------------------------------------------- | |||
# Control methods | |||
#-------------------------------------------------------------------------- | |||
MinRK
|
r3539 | @spinfirst | |
MinRK
|
r3540 | @defaultblock | |
def clear(self, targets=None, block=None): | |||
MinRK
|
r3555 | """Clear the namespace in target(s).""" | |
MinRK
|
r3540 | targets = self._build_targets(targets)[0] | |
for t in targets: | |||
MinRK
|
r3555 | self.session.send(self._control_socket, 'clear_request', content={},ident=t) | |
MinRK
|
r3540 | error = False | |
if self.block: | |||
for i in range(len(targets)): | |||
MinRK
|
r3555 | idents,msg = self.session.recv(self._control_socket,0) | |
MinRK
|
r3540 | if self.debug: | |
pprint(msg) | |||
if msg['content']['status'] != 'ok': | |||
error = msg['content'] | |||
if error: | |||
return error | |||
MinRK
|
r3539 | ||
@spinfirst | |||
MinRK
|
r3540 | @defaultblock | |
def abort(self, msg_ids = None, targets=None, block=None): | |||
MinRK
|
r3555 | """Abort the execution queues of target(s).""" | |
MinRK
|
r3540 | targets = self._build_targets(targets)[0] | |
if isinstance(msg_ids, basestring): | |||
msg_ids = [msg_ids] | |||
content = dict(msg_ids=msg_ids) | |||
for t in targets: | |||
MinRK
|
r3555 | self.session.send(self._control_socket, 'abort_request', | |
MinRK
|
r3540 | content=content, ident=t) | |
error = False | |||
if self.block: | |||
for i in range(len(targets)): | |||
MinRK
|
r3555 | idents,msg = self.session.recv(self._control_socket,0) | |
MinRK
|
r3540 | if self.debug: | |
pprint(msg) | |||
if msg['content']['status'] != 'ok': | |||
error = msg['content'] | |||
if error: | |||
return error | |||
MinRK
|
r3539 | ||
MinRK
|
r3540 | @spinfirst | |
@defaultblock | |||
def kill(self, targets=None, block=None): | |||
"""Terminates one or more engine processes.""" | |||
targets = self._build_targets(targets)[0] | |||
for t in targets: | |||
MinRK
|
r3555 | self.session.send(self._control_socket, 'kill_request', content={},ident=t) | |
MinRK
|
r3540 | error = False | |
if self.block: | |||
for i in range(len(targets)): | |||
MinRK
|
r3555 | idents,msg = self.session.recv(self._control_socket,0) | |
MinRK
|
r3540 | if self.debug: | |
pprint(msg) | |||
if msg['content']['status'] != 'ok': | |||
error = msg['content'] | |||
if error: | |||
return error | |||
MinRK
|
r3555 | ||
#-------------------------------------------------------------------------- | |||
# Execution methods | |||
#-------------------------------------------------------------------------- | |||
MinRK
|
r3539 | @defaultblock | |
def execute(self, code, targets='all', block=None): | |||
MinRK
|
r3555 | """Executes `code` on `targets` in blocking or nonblocking manner. | |
MinRK
|
r3539 | ||
Parameters | |||
---------- | |||
code : str | |||
the code string to be executed | |||
targets : int/str/list of ints/strs | |||
the engines on which to execute | |||
default : all | |||
block : bool | |||
MinRK
|
r3555 | whether or not to wait until done to return | |
default: self.block | |||
MinRK
|
r3539 | """ | |
# block = self.block if block is None else block | |||
# saveblock = self.block | |||
# self.block = block | |||
result = self.apply(execute, (code,), targets=targets, block=block, bound=True) | |||
# self.block = saveblock | |||
return result | |||
def run(self, code, block=None): | |||
MinRK
|
r3555 | """Runs `code` on an engine. | |
MinRK
|
r3539 | ||
Calls to this are load-balanced. | |||
Parameters | |||
---------- | |||
code : str | |||
the code string to be executed | |||
block : bool | |||
whether or not to wait until done | |||
""" | |||
result = self.apply(execute, (code,), targets=None, block=block, bound=False) | |||
return result | |||
MinRK
|
r3555 | def apply(self, f, args=None, kwargs=None, bound=True, block=None, targets=None, | |
after=None, follow=None): | |||
"""Call `f(*args, **kwargs)` on a remote engine(s), returning the result. | |||
This is the central execution command for the client. | |||
Parameters | |||
---------- | |||
f : function | |||
The fuction to be called remotely | |||
args : tuple/list | |||
The positional arguments passed to `f` | |||
kwargs : dict | |||
The keyword arguments passed to `f` | |||
bound : bool (default: True) | |||
Whether to execute in the Engine(s) namespace, or in a clean | |||
namespace not affecting the engine. | |||
block : bool (default: self.block) | |||
Whether to wait for the result, or return immediately. | |||
False: | |||
returns msg_id(s) | |||
if multiple targets: | |||
list of ids | |||
True: | |||
returns actual result(s) of f(*args, **kwargs) | |||
if multiple targets: | |||
dict of results, by engine ID | |||
targets : int,list of ints, 'all', None | |||
Specify the destination of the job. | |||
if None: | |||
Submit via Task queue for load-balancing. | |||
if 'all': | |||
Run on all active engines | |||
if list: | |||
Run on each specified engine | |||
if int: | |||
Run on single engine | |||
after : Dependency or collection of msg_ids | |||
Only for load-balanced execution (targets=None) | |||
Specify a list of msg_ids as a time-based dependency. | |||
This job will only be run *after* the dependencies | |||
have been met. | |||
follow : Dependency or collection of msg_ids | |||
Only for load-balanced execution (targets=None) | |||
Specify a list of msg_ids as a location-based dependency. | |||
This job will only be run on an engine where this dependency | |||
is met. | |||
Returns | |||
------- | |||
if block is False: | |||
if single target: | |||
return msg_id | |||
else: | |||
return list of msg_ids | |||
? (should this be dict like block=True) ? | |||
else: | |||
if single target: | |||
return result of f(*args, **kwargs) | |||
else: | |||
return dict of results, keyed by engine | |||
""" | |||
# defaults: | |||
block = block if block is not None else self.block | |||
args = args if args is not None else [] | |||
kwargs = kwargs if kwargs is not None else {} | |||
# enforce types of f,args,kwrags | |||
if not callable(f): | |||
raise TypeError("f must be callable, not %s"%type(f)) | |||
if not isinstance(args, (tuple, list)): | |||
raise TypeError("args must be tuple or list, not %s"%type(args)) | |||
if not isinstance(kwargs, dict): | |||
raise TypeError("kwargs must be dict, not %s"%type(kwargs)) | |||
options = dict(bound=bound, block=block, after=after, follow=follow) | |||
if targets is None: | |||
return self._apply_balanced(f, args, kwargs, **options) | |||
else: | |||
return self._apply_direct(f, args, kwargs, targets=targets, **options) | |||
MinRK
|
r3548 | def _apply_balanced(self, f, args, kwargs, bound=True, block=None, | |
after=None, follow=None): | |||
MinRK
|
r3555 | """The underlying method for applying functions in a load balanced | |
manner, via the task queue.""" | |||
MinRK
|
r3551 | if isinstance(after, Dependency): | |
after = after.as_dict() | |||
elif after is None: | |||
after = [] | |||
if isinstance(follow, Dependency): | |||
follow = follow.as_dict() | |||
elif follow is None: | |||
follow = [] | |||
subheader = dict(after=after, follow=follow) | |||
MinRK
|
r3539 | ||
bufs = ss.pack_apply_message(f,args,kwargs) | |||
content = dict(bound=bound) | |||
MinRK
|
r3555 | msg = self.session.send(self._task_socket, "apply_request", | |
MinRK
|
r3551 | content=content, buffers=bufs, subheader=subheader) | |
MinRK
|
r3539 | msg_id = msg['msg_id'] | |
self.outstanding.add(msg_id) | |||
self.history.append(msg_id) | |||
if block: | |||
self.barrier(msg_id) | |||
return self.results[msg_id] | |||
else: | |||
return msg_id | |||
MinRK
|
r3548 | def _apply_direct(self, f, args, kwargs, bound=True, block=None, targets=None, | |
after=None, follow=None): | |||
MinRK
|
r3555 | """Then underlying method for applying functions to specific engines | |
via the MUX queue.""" | |||
MinRK
|
r3548 | ||
MinRK
|
r3539 | queues,targets = self._build_targets(targets) | |
bufs = ss.pack_apply_message(f,args,kwargs) | |||
MinRK
|
r3548 | if isinstance(after, Dependency): | |
after = after.as_dict() | |||
elif after is None: | |||
after = [] | |||
if isinstance(follow, Dependency): | |||
follow = follow.as_dict() | |||
elif follow is None: | |||
follow = [] | |||
subheader = dict(after=after, follow=follow) | |||
MinRK
|
r3539 | content = dict(bound=bound) | |
msg_ids = [] | |||
for queue in queues: | |||
MinRK
|
r3555 | msg = self.session.send(self._mux_socket, "apply_request", | |
MinRK
|
r3548 | content=content, buffers=bufs,ident=queue, subheader=subheader) | |
MinRK
|
r3539 | msg_id = msg['msg_id'] | |
self.outstanding.add(msg_id) | |||
self.history.append(msg_id) | |||
msg_ids.append(msg_id) | |||
if block: | |||
self.barrier(msg_ids) | |||
else: | |||
if len(msg_ids) == 1: | |||
return msg_ids[0] | |||
else: | |||
return msg_ids | |||
if len(msg_ids) == 1: | |||
return self.results[msg_ids[0]] | |||
else: | |||
result = {} | |||
for target,mid in zip(targets, msg_ids): | |||
result[target] = self.results[mid] | |||
return result | |||
MinRK
|
r3555 | #-------------------------------------------------------------------------- | |
# Data movement | |||
#-------------------------------------------------------------------------- | |||
MinRK
|
r3539 | ||
MinRK
|
r3555 | @defaultblock | |
MinRK
|
r3539 | def push(self, ns, targets=None, block=None): | |
MinRK
|
r3555 | """Push the contents of `ns` into the namespace on `target`""" | |
MinRK
|
r3539 | if not isinstance(ns, dict): | |
raise TypeError("Must be a dict, not %s"%type(ns)) | |||
MinRK
|
r3555 | result = self.apply(_push, (ns,), targets=targets, block=block, bound=True) | |
MinRK
|
r3539 | return result | |
MinRK
|
r3555 | @defaultblock | |
MinRK
|
r3539 | def pull(self, keys, targets=None, block=True): | |
MinRK
|
r3555 | """Pull objects from `target`'s namespace by `keys`""" | |
if isinstance(keys, str): | |||
pass | |||
elif isistance(keys, (list,tuple,set)): | |||
for key in keys: | |||
if not isinstance(key, str): | |||
raise TypeError | |||
MinRK
|
r3539 | result = self.apply(_pull, (keys,), targets=targets, block=block, bound=True) | |
return result | |||
MinRK
|
r3555 | #-------------------------------------------------------------------------- | |
# Query methods | |||
#-------------------------------------------------------------------------- | |||
@spinfirst | |||
def get_results(self, msg_ids, status_only=False): | |||
"""Returns the result of the execute or task request with `msg_ids`. | |||
MinRK
|
r3539 | ||
Parameters | |||
---------- | |||
MinRK
|
r3555 | msg_ids : list of ints or msg_ids | |
if int: | |||
Passed as index to self.history for convenience. | |||
status_only : bool (default: False) | |||
if False: | |||
return the actual results | |||
MinRK
|
r3539 | """ | |
if not isinstance(msg_ids, (list,tuple)): | |||
msg_ids = [msg_ids] | |||
theids = [] | |||
for msg_id in msg_ids: | |||
if isinstance(msg_id, int): | |||
msg_id = self.history[msg_id] | |||
MinRK
|
r3555 | if not isinstance(msg_id, str): | |
raise TypeError("msg_ids must be str, not %r"%msg_id) | |||
MinRK
|
r3539 | theids.append(msg_id) | |
MinRK
|
r3555 | completed = [] | |
local_results = {} | |||
for msg_id in list(theids): | |||
if msg_id in self.results: | |||
completed.append(msg_id) | |||
local_results[msg_id] = self.results[msg_id] | |||
theids.remove(msg_id) | |||
if msg_ids: # some not locally cached | |||
content = dict(msg_ids=theids, status_only=status_only) | |||
msg = self.session.send(self._query_socket, "result_request", content=content) | |||
zmq.select([self._query_socket], [], []) | |||
idents,msg = self.session.recv(self._query_socket, zmq.NOBLOCK) | |||
if self.debug: | |||
pprint(msg) | |||
content = msg['content'] | |||
if content['status'] != 'ok': | |||
raise ss.unwrap_exception(content) | |||
else: | |||
content = dict(completed=[],pending=[]) | |||
if not status_only: | |||
# load cached results into result: | |||
content['completed'].extend(completed) | |||
content.update(local_results) | |||
# update cache with results: | |||
for msg_id in msg_ids: | |||
if msg_id in content['completed']: | |||
self.results[msg_id] = content[msg_id] | |||
return content | |||
@spinfirst | |||
def queue_status(self, targets=None, verbose=False): | |||
"""Fetch the status of engine queues. | |||
Parameters | |||
---------- | |||
targets : int/str/list of ints/strs | |||
the engines on which to execute | |||
default : all | |||
verbose : bool | |||
whether to return lengths only, or lists of ids for each element | |||
""" | |||
targets = self._build_targets(targets)[1] | |||
content = dict(targets=targets, verbose=verbose) | |||
self.session.send(self._query_socket, "queue_request", content=content) | |||
idents,msg = self.session.recv(self._query_socket, 0) | |||
MinRK
|
r3540 | if self.debug: | |
pprint(msg) | |||
MinRK
|
r3555 | content = msg['content'] | |
status = content.pop('status') | |||
if status != 'ok': | |||
raise ss.unwrap_exception(content) | |||
return content | |||
@spinfirst | |||
def purge_results(self, msg_ids=[], targets=[]): | |||
"""Tell the controller to forget results. | |||
MinRK
|
r3539 | ||
MinRK
|
r3555 | Individual results can be purged by msg_id, or the entire | |
history of specific targets can | |||
Parameters | |||
---------- | |||
targets : int/str/list of ints/strs | |||
the targets | |||
default : None | |||
""" | |||
if not targets and not msg_ids: | |||
raise ValueError | |||
if targets: | |||
targets = self._build_targets(targets)[1] | |||
content = dict(targets=targets, msg_ids=msg_ids) | |||
self.session.send(self._query_socket, "purge_request", content=content) | |||
idents, msg = self.session.recv(self._query_socket, 0) | |||
if self.debug: | |||
pprint(msg) | |||
content = msg['content'] | |||
if content['status'] != 'ok': | |||
raise ss.unwrap_exception(content) | |||
MinRK
|
r3548 | ||
class AsynClient(Client): | |||
MinRK
|
r3555 | """An Asynchronous client, using the Tornado Event Loop. | |
!!!unfinished!!!""" | |||
MinRK
|
r3548 | io_loop = None | |
MinRK
|
r3555 | _queue_stream = None | |
_notifier_stream = None | |||
_task_stream = None | |||
_control_stream = None | |||
MinRK
|
r3548 | ||
def __init__(self, addr, context=None, username=None, debug=False, io_loop=None): | |||
Client.__init__(self, addr, context, username, debug) | |||
if io_loop is None: | |||
io_loop = ioloop.IOLoop.instance() | |||
self.io_loop = io_loop | |||
MinRK
|
r3555 | self._queue_stream = zmqstream.ZMQStream(self._mux_socket, io_loop) | |
self._control_stream = zmqstream.ZMQStream(self._control_socket, io_loop) | |||
self._task_stream = zmqstream.ZMQStream(self._task_socket, io_loop) | |||
self._notification_stream = zmqstream.ZMQStream(self._notification_socket, io_loop) | |||
MinRK
|
r3539 | ||
MinRK
|
r3548 | def spin(self): | |
for stream in (self.queue_stream, self.notifier_stream, | |||
self.task_stream, self.control_stream): | |||
stream.flush() | |||
MinRK
|
r3539 |