##// END OF EJS Templates
adapt kernel/error.py to zmq, improve error propagation.
adapt kernel/error.py to zmq, improve error propagation.

File last commit:

r3559:fe4b48c0
r3583:b0d94c76
Show More
view.py
219 lines | 6.9 KiB | text/x-python | PythonLexer
#!/usr/bin/env python
"""Views"""
from IPython.external.decorator import decorator
@decorator
def myblock(f, self, *args, **kwargs):
block = self.client.block
self.client.block = self.block
ret = f(self, *args, **kwargs)
self.client.block = block
return ret
@decorator
def save_ids(f, self, *args, **kwargs):
ret = f(self, *args, **kwargs)
msg_ids = self.client.history[-self._ntargets:]
self.history.extend(msg_ids)
map(self.outstanding.add, msg_ids)
return ret
@decorator
def sync_results(f, self, *args, **kwargs):
ret = f(self, *args, **kwargs)
delta = self.outstanding.difference(self.client.outstanding)
completed = self.outstanding.intersection(delta)
self.outstanding = self.outstanding.difference(completed)
for msg_id in completed:
self.results[msg_id] = self.client.results[msg_id]
return ret
@decorator
def spin_after(f, self, *args, **kwargs):
ret = f(self, *args, **kwargs)
self.spin()
return ret
class View(object):
"""Base View class"""
_targets = None
_ntargets = None
block=None
bound=None
history=None
def __init__(self, client, targets=None):
self.client = client
self._targets = targets
self._ntargets = 1 if isinstance(targets, (int,type(None))) else len(targets)
self.block = client.block
self.bound=True
self.history = []
self.outstanding = set()
self.results = {}
def __repr__(self):
strtargets = str(self._targets)
if len(strtargets) > 16:
strtargets = strtargets[:12]+'...]'
return "<%s %s>"%(self.__class__.__name__, strtargets)
@property
def targets(self):
return self._targets
@targets.setter
def targets(self, value):
raise TypeError("Cannot set my targets argument after construction!")
@sync_results
def spin(self):
"""spin the client, and sync"""
self.client.spin()
@sync_results
@save_ids
def apply(self, f, *args, **kwargs):
"""calls f(*args, **kwargs) on remote engines, returning the result.
This method does not involve the engine's namespace.
if self.block is False:
returns msg_id
else:
returns actual result of f(*args, **kwargs)
"""
return self.client.apply(f, args, kwargs, block=self.block, targets=self.targets, bound=self.bound)
@save_ids
def apply_async(self, f, *args, **kwargs):
"""calls f(*args, **kwargs) on remote engines in a nonblocking manner.
This method does not involve the engine's namespace.
returns msg_id
"""
return self.client.apply(f,args,kwargs, block=False, targets=self.targets, bound=False)
@spin_after
@save_ids
def apply_sync(self, f, *args, **kwargs):
"""calls f(*args, **kwargs) on remote engines in a blocking manner,
returning the result.
This method does not involve the engine's namespace.
returns: actual result of f(*args, **kwargs)
"""
return self.client.apply(f,args,kwargs, block=True, targets=self.targets, bound=False)
@sync_results
@save_ids
def apply_bound(self, f, *args, **kwargs):
"""calls f(*args, **kwargs) bound to engine namespace(s).
if self.block is False:
returns msg_id
else:
returns actual result of f(*args, **kwargs)
This method has access to the targets' globals
"""
return self.client.apply(f, args, kwargs, block=self.block, targets=self.targets, bound=True)
@sync_results
@save_ids
def apply_async_bound(self, f, *args, **kwargs):
"""calls f(*args, **kwargs) bound to engine namespace(s)
in a nonblocking manner.
returns: msg_id
This method has access to the targets' globals
"""
return self.client.apply(f, args, kwargs, block=False, targets=self.targets, bound=True)
@spin_after
@save_ids
def apply_sync_bound(self, f, *args, **kwargs):
"""calls f(*args, **kwargs) bound to engine namespace(s), waiting for the result.
returns: actual result of f(*args, **kwargs)
This method has access to the targets' globals
"""
return self.client.apply(f, args, kwargs, block=True, targets=self.targets, bound=True)
def abort(self, msg_ids=None, block=None):
"""Abort jobs on my engines.
Parameters
----------
msg_ids : None, str, list of strs, optional
if None: abort all jobs.
else: abort specific msg_id(s).
"""
block = block if block is not None else self.block
return self.client.abort(msg_ids=msg_ids, targets=self.targets, block=block)
def queue_status(self, verbose=False):
"""Fetch the Queue status of my engines"""
return self.client.queue_status(targets=self.targets, verbose=verbose)
def purge_results(self, msg_ids=[],targets=[]):
"""Instruct the controller to forget specific results."""
if targets is None or targets == 'all':
targets = self.targets
return self.client.purge_results(msg_ids=msg_ids, targets=targets)
class DirectView(View):
"""Direct Multiplexer View"""
def update(self, ns):
"""update remote namespace with dict `ns`"""
return self.client.push(ns, targets=self.targets, block=self.block)
push = update
def get(self, key_s):
"""get object(s) by `key_s` from remote namespace
will return one object if it is a key.
It also takes a list of keys, and will return a list of objects."""
# block = block if block is not None else self.block
return self.client.pull(key_s, block=True, targets=self.targets)
def pull(self, key_s, block=True):
"""get object(s) by `key_s` from remote namespace
will return one object if it is a key.
It also takes a list of keys, and will return a list of objects."""
block = block if block is not None else self.block
return self.client.pull(key_s, block=block, targets=self.targets)
def __getitem__(self, key):
return self.get(key)
def __setitem__(self,key, value):
self.update({key:value})
def clear(self, block=False):
"""Clear the remote namespaces on my engines."""
block = block if block is not None else self.block
return self.client.clear(targets=self.targets, block=block)
def kill(self, block=True):
"""Kill my engines."""
block = block if block is not None else self.block
return self.client.kill(targets=self.targets, block=block)
class LoadBalancedView(View):
def __repr__(self):
return "<%s %s>"%(self.__class__.__name__, self.client._addr)