diff --git a/IPython/zmq/parallel/client.py b/IPython/zmq/parallel/client.py index fced919..1bac888 100644 --- a/IPython/zmq/parallel/client.py +++ b/IPython/zmq/parallel/client.py @@ -21,8 +21,8 @@ from zmq.eventloop import ioloop, zmqstream from IPython.external.decorator import decorator import streamsession as ss -from remotenamespace import RemoteNamespace -from view import DirectView +# from remotenamespace import RemoteNamespace +from view import DirectView, LoadBalancedView from dependency import Dependency, depend, require def _push(ns): @@ -31,8 +31,13 @@ def _push(ns): def _pull(keys): g = globals() if isinstance(keys, (list,tuple, set)): + for key in keys: + if not g.has_key(key): + raise NameError("name '%s' is not defined"%key) return map(g.get, keys) else: + if not g.has_key(keys): + raise NameError("name '%s' is not defined"%keys) return g.get(keys) def _clear(): @@ -62,10 +67,35 @@ def defaultblock(f, self, *args, **kwargs): self.block = saveblock return ret +def remote(client, block=None, targets=None): + """Turn a function into a remote function. + + This method can be used for map: + + >>> @remote(client,block=True) + def func(a) + """ + def remote_function(f): + return RemoteFunction(client, f, block, targets) + return remote_function + #-------------------------------------------------------------------------- # Classes #-------------------------------------------------------------------------- +class RemoteFunction(object): + """Turn an existing function into a remote function""" + + def __init__(self, client, f, block=None, targets=None): + self.client = client + self.func = f + self.block=block + self.targets=targets + + def __call__(self, *args, **kwargs): + return self.client.apply(self.func, args=args, kwargs=kwargs, + block=self.block, targets=self.targets) + class AbortedTask(object): """A basic wrapper object describing an aborted task.""" @@ -84,7 +114,7 @@ class Client(object): Parameters ---------- - addr : bytes; zmq url, e.g. 'tcp://127.0.0.1:10101 + addr : bytes; zmq url, e.g. 'tcp://127.0.0.1:10101' The address of the controller's registration socket. @@ -281,7 +311,8 @@ class Client(object): elif content['status'] == 'aborted': self.results[msg_id] = AbortedTask(msg_id) elif content['status'] == 'resubmitted': - pass # handle resubmission + # TODO: handle resubmission + pass else: self.results[msg_id] = ss.unwrap_exception(content) @@ -318,7 +349,9 @@ class Client(object): def _flush_control(self, sock): """Flush replies from the control channel waiting - in the ZMQ queue.""" + in the ZMQ queue. + + Currently: ignore them.""" msg = self.session.recv(sock, mode=zmq.NOBLOCK) while msg is not None: if self.debug: @@ -330,7 +363,10 @@ class Client(object): #-------------------------------------------------------------------------- def __getitem__(self, key): - """Dict access returns DirectView multiplexer objects.""" + """Dict access returns DirectView multiplexer objects or, + if key is None, a LoadBalancedView.""" + if key is None: + return LoadBalancedView(self) if isinstance(key, int): if key not in self.ids: raise IndexError("No such engine: %i"%key) @@ -412,7 +448,7 @@ class Client(object): """Clear the namespace in target(s).""" targets = self._build_targets(targets)[0] for t in targets: - self.session.send(self._control_socket, 'clear_request', content={},ident=t) + self.session.send(self._control_socket, 'clear_request', content={}, ident=t) error = False if self.block: for i in range(len(targets)): @@ -420,7 +456,7 @@ class Client(object): if self.debug: pprint(msg) if msg['content']['status'] != 'ok': - error = msg['content'] + error = ss.unwrap_exception(msg['content']) if error: return error @@ -443,7 +479,7 @@ class Client(object): if self.debug: pprint(msg) if msg['content']['status'] != 'ok': - error = msg['content'] + error = ss.unwrap_exception(msg['content']) if error: return error @@ -461,7 +497,7 @@ class Client(object): if self.debug: pprint(msg) if msg['content']['status'] != 'ok': - error = msg['content'] + error = ss.unwrap_exception(msg['content']) if error: return error @@ -719,7 +755,7 @@ class Client(object): local_results[msg_id] = self.results[msg_id] theids.remove(msg_id) - if msg_ids: # some not locally cached + if theids: # some not locally cached content = dict(msg_ids=theids, status_only=status_only) msg = self.session.send(self._query_socket, "result_request", content=content) zmq.select([self._query_socket], [], []) diff --git a/IPython/zmq/parallel/view.py b/IPython/zmq/parallel/view.py index 1ba90d9..222f2c5 100644 --- a/IPython/zmq/parallel/view.py +++ b/IPython/zmq/parallel/view.py @@ -42,13 +42,15 @@ class View(object): _targets = None _ntargets = None block=None + bound=None history=None - def __init__(self, client, targets): + def __init__(self, client, targets=None): self.client = client self._targets = targets - self._ntargets = 1 if isinstance(targets, int) else len(targets) + self._ntargets = 1 if isinstance(targets, (int,type(None))) else len(targets) self.block = client.block + self.bound=True self.history = [] self.outstanding = set() self.results = {} @@ -84,7 +86,7 @@ class View(object): else: returns actual result of f(*args, **kwargs) """ - return self.client.apply(f, args, kwargs, block=self.block, targets=self.targets, bound=False) + return self.client.apply(f, args, kwargs, block=self.block, targets=self.targets, bound=self.bound) @save_ids def apply_async(self, f, *args, **kwargs): @@ -147,6 +149,29 @@ class View(object): """ return self.client.apply(f, args, kwargs, block=True, targets=self.targets, bound=True) + + def abort(self, msg_ids=None, block=None): + """Abort jobs on my engines. + + Parameters + ---------- + + msg_ids : None, str, list of strs, optional + if None: abort all jobs. + else: abort specific msg_id(s). + """ + block = block if block is not None else self.block + return self.client.abort(msg_ids=msg_ids, targets=self.targets, block=block) + + def queue_status(self, verbose=False): + """Fetch the Queue status of my engines""" + return self.client.queue_status(targets=self.targets, verbose=verbose) + + def purge_results(self, msg_ids=[],targets=[]): + """Instruct the controller to forget specific results.""" + if targets is None or targets == 'all': + targets = self.targets + return self.client.purge_results(msg_ids=msg_ids, targets=targets) class DirectView(View): @@ -156,45 +181,40 @@ class DirectView(View): """update remote namespace with dict `ns`""" return self.client.push(ns, targets=self.targets, block=self.block) + push = update + def get(self, key_s): """get object(s) by `key_s` from remote namespace will return one object if it is a key. It also takes a list of keys, and will return a list of objects.""" # block = block if block is not None else self.block - return self.client.pull(key_s, block=self.block, targets=self.targets) + return self.client.pull(key_s, block=True, targets=self.targets) - push = update - pull = get + def pull(self, key_s, block=True): + """get object(s) by `key_s` from remote namespace + will return one object if it is a key. + It also takes a list of keys, and will return a list of objects.""" + block = block if block is not None else self.block + return self.client.pull(key_s, block=block, targets=self.targets) def __getitem__(self, key): return self.get(key) - def __setitem__(self,key,value): + def __setitem__(self,key, value): self.update({key:value}) def clear(self, block=False): """Clear the remote namespaces on my engines.""" block = block if block is not None else self.block - return self.client.clear(targets=self.targets,block=block) + return self.client.clear(targets=self.targets, block=block) def kill(self, block=True): """Kill my engines.""" block = block if block is not None else self.block - return self.client.kill(targets=self.targets,block=block) + return self.client.kill(targets=self.targets, block=block) - def abort(self, msg_ids=None, block=None): - """Abort jobs on my engines. - - Parameters - ---------- - - msg_ids : None, str, list of strs, optional - if None: abort all jobs. - else: abort specific msg_id(s). - """ - block = block if block is not None else self.block - return self.client.abort(msg_ids=msg_ids, targets=self.targets, block=block) - class LoadBalancedView(View): - _targets=None + def __repr__(self): + return "<%s %s>"%(self.__class__.__name__, self.client._addr) + \ No newline at end of file