|
|
#!/usr/bin/env python
|
|
|
"""Views"""
|
|
|
|
|
|
from IPython.external.decorator import decorator
|
|
|
|
|
|
|
|
|
@decorator
|
|
|
def myblock(f, self, *args, **kwargs):
|
|
|
block = self.client.block
|
|
|
self.client.block = self.block
|
|
|
ret = f(self, *args, **kwargs)
|
|
|
self.client.block = block
|
|
|
return ret
|
|
|
|
|
|
@decorator
|
|
|
def save_ids(f, self, *args, **kwargs):
|
|
|
ret = f(self, *args, **kwargs)
|
|
|
msg_ids = self.client.history[-self._ntargets:]
|
|
|
self.history.extend(msg_ids)
|
|
|
map(self.outstanding.add, msg_ids)
|
|
|
return ret
|
|
|
|
|
|
@decorator
|
|
|
def sync_results(f, self, *args, **kwargs):
|
|
|
ret = f(self, *args, **kwargs)
|
|
|
delta = self.outstanding.difference(self.client.outstanding)
|
|
|
completed = self.outstanding.intersection(delta)
|
|
|
self.outstanding = self.outstanding.difference(completed)
|
|
|
for msg_id in completed:
|
|
|
self.results[msg_id] = self.client.results[msg_id]
|
|
|
return ret
|
|
|
|
|
|
@decorator
|
|
|
def spin_after(f, self, *args, **kwargs):
|
|
|
ret = f(self, *args, **kwargs)
|
|
|
self.spin()
|
|
|
return ret
|
|
|
|
|
|
|
|
|
class View(object):
|
|
|
"""Base View class"""
|
|
|
_targets = None
|
|
|
_ntargets = None
|
|
|
block=None
|
|
|
history=None
|
|
|
|
|
|
def __init__(self, client, targets):
|
|
|
self.client = client
|
|
|
self._targets = targets
|
|
|
self._ntargets = 1 if isinstance(targets, int) else len(targets)
|
|
|
self.block = client.block
|
|
|
self.history = []
|
|
|
self.outstanding = set()
|
|
|
self.results = {}
|
|
|
|
|
|
def __repr__(self):
|
|
|
strtargets = str(self._targets)
|
|
|
if len(strtargets) > 16:
|
|
|
strtargets = strtargets[:12]+'...]'
|
|
|
return "<%s %s>"%(self.__class__.__name__, strtargets)
|
|
|
|
|
|
@property
|
|
|
def targets(self):
|
|
|
return self._targets
|
|
|
|
|
|
@targets.setter
|
|
|
def targets(self, value):
|
|
|
raise TypeError("Cannot set my targets argument after construction!")
|
|
|
|
|
|
@sync_results
|
|
|
def spin(self):
|
|
|
"""spin the client, and sync"""
|
|
|
self.client.spin()
|
|
|
|
|
|
@sync_results
|
|
|
@save_ids
|
|
|
def apply(self, f, *args, **kwargs):
|
|
|
"""calls f(*args, **kwargs) on remote engines, returning the result.
|
|
|
|
|
|
This method does not involve the engine's namespace.
|
|
|
|
|
|
if self.block is False:
|
|
|
returns msg_id
|
|
|
else:
|
|
|
returns actual result of f(*args, **kwargs)
|
|
|
"""
|
|
|
return self.client.apply(f, args, kwargs, block=self.block, targets=self.targets, bound=False)
|
|
|
|
|
|
@save_ids
|
|
|
def apply_async(self, f, *args, **kwargs):
|
|
|
"""calls f(*args, **kwargs) on remote engines in a nonblocking manner.
|
|
|
|
|
|
This method does not involve the engine's namespace.
|
|
|
|
|
|
returns msg_id
|
|
|
"""
|
|
|
return self.client.apply(f,args,kwargs, block=False, targets=self.targets, bound=False)
|
|
|
|
|
|
@spin_after
|
|
|
@save_ids
|
|
|
def apply_sync(self, f, *args, **kwargs):
|
|
|
"""calls f(*args, **kwargs) on remote engines in a blocking manner,
|
|
|
returning the result.
|
|
|
|
|
|
This method does not involve the engine's namespace.
|
|
|
|
|
|
returns: actual result of f(*args, **kwargs)
|
|
|
"""
|
|
|
return self.client.apply(f,args,kwargs, block=True, targets=self.targets, bound=False)
|
|
|
|
|
|
@sync_results
|
|
|
@save_ids
|
|
|
def apply_bound(self, f, *args, **kwargs):
|
|
|
"""calls f(*args, **kwargs) bound to engine namespace(s).
|
|
|
|
|
|
if self.block is False:
|
|
|
returns msg_id
|
|
|
else:
|
|
|
returns actual result of f(*args, **kwargs)
|
|
|
|
|
|
This method has access to the targets' globals
|
|
|
|
|
|
"""
|
|
|
return self.client.apply(f, args, kwargs, block=self.block, targets=self.targets, bound=True)
|
|
|
|
|
|
@sync_results
|
|
|
@save_ids
|
|
|
def apply_async_bound(self, f, *args, **kwargs):
|
|
|
"""calls f(*args, **kwargs) bound to engine namespace(s)
|
|
|
in a nonblocking manner.
|
|
|
|
|
|
returns: msg_id
|
|
|
|
|
|
This method has access to the targets' globals
|
|
|
|
|
|
"""
|
|
|
return self.client.apply(f, args, kwargs, block=False, targets=self.targets, bound=True)
|
|
|
|
|
|
@spin_after
|
|
|
@save_ids
|
|
|
def apply_sync_bound(self, f, *args, **kwargs):
|
|
|
"""calls f(*args, **kwargs) bound to engine namespace(s), waiting for the result.
|
|
|
|
|
|
returns: actual result of f(*args, **kwargs)
|
|
|
|
|
|
This method has access to the targets' globals
|
|
|
|
|
|
"""
|
|
|
return self.client.apply(f, args, kwargs, block=True, targets=self.targets, bound=True)
|
|
|
|
|
|
|
|
|
class DirectView(View):
|
|
|
"""Direct Multiplexer View"""
|
|
|
|
|
|
def update(self, ns):
|
|
|
"""update remote namespace with dict `ns`"""
|
|
|
return self.client.push(ns, targets=self.targets, block=self.block)
|
|
|
|
|
|
def get(self, key_s):
|
|
|
"""get object(s) by `key_s` from remote namespace
|
|
|
will return one object if it is a key.
|
|
|
It also takes a list of keys, and will return a list of objects."""
|
|
|
# block = block if block is not None else self.block
|
|
|
return self.client.pull(key_s, block=self.block, targets=self.targets)
|
|
|
|
|
|
push = update
|
|
|
pull = get
|
|
|
|
|
|
def __getitem__(self, key):
|
|
|
return self.get(key)
|
|
|
|
|
|
def __setitem__(self,key,value):
|
|
|
self.update({key:value})
|
|
|
|
|
|
def clear(self, block=False):
|
|
|
"""Clear the remote namespaces on my engines."""
|
|
|
block = block if block is not None else self.block
|
|
|
return self.client.clear(targets=self.targets,block=block)
|
|
|
|
|
|
def kill(self, block=True):
|
|
|
"""Kill my engines."""
|
|
|
block = block if block is not None else self.block
|
|
|
return self.client.kill(targets=self.targets,block=block)
|
|
|
|
|
|
def abort(self, msg_ids=None, block=None):
|
|
|
"""Abort jobs on my engines.
|
|
|
|
|
|
Parameters
|
|
|
----------
|
|
|
|
|
|
msg_ids : None, str, list of strs, optional
|
|
|
if None: abort all jobs.
|
|
|
else: abort specific msg_id(s).
|
|
|
"""
|
|
|
block = block if block is not None else self.block
|
|
|
return self.client.abort(msg_ids=msg_ids, targets=self.targets, block=block)
|
|
|
|
|
|
class LoadBalancedView(View):
|
|
|
_targets=None
|
|
|
|