From fe4b48c0abe4d21be34076dc81f9984271821dbf 2011-04-08 00:38:09
From: MinRK <benjaminrk@gmail.com>
Date: 2011-04-08 00:38:09
Subject: [PATCH] basic LoadBalancedView, RemoteFunction

---

diff --git a/IPython/zmq/parallel/client.py b/IPython/zmq/parallel/client.py
index fced919..1bac888 100644
--- a/IPython/zmq/parallel/client.py
+++ b/IPython/zmq/parallel/client.py
@@ -21,8 +21,8 @@ from zmq.eventloop import ioloop, zmqstream
 from IPython.external.decorator import decorator
 
 import streamsession as ss
-from remotenamespace import RemoteNamespace
-from view import DirectView
+# from remotenamespace import RemoteNamespace
+from view import DirectView, LoadBalancedView
 from dependency import Dependency, depend, require
 
 def _push(ns):
@@ -31,8 +31,13 @@ def _push(ns):
 def _pull(keys):
     g = globals()
     if isinstance(keys, (list,tuple, set)):
+        for key in keys:
+            if not g.has_key(key):
+                raise NameError("name '%s' is not defined"%key)
         return map(g.get, keys)
     else:
+        if not g.has_key(keys):
+            raise NameError("name '%s' is not defined"%keys)
         return g.get(keys)
 
 def _clear():
@@ -62,10 +67,35 @@ def defaultblock(f, self, *args, **kwargs):
     self.block = saveblock
     return ret
 
+def remote(client, block=None, targets=None):
+    """Turn a function into a remote function.
+    
+    This method can be used for map:
+    
+    >>> @remote(client,block=True)
+        def func(a)
+    """
+    def remote_function(f):
+        return RemoteFunction(client, f, block, targets)
+    return remote_function
+
 #--------------------------------------------------------------------------
 # Classes
 #--------------------------------------------------------------------------
 
+class RemoteFunction(object):
+    """Turn an existing function into a remote function"""
+    
+    def __init__(self, client, f, block=None, targets=None):
+        self.client = client
+        self.func = f
+        self.block=block
+        self.targets=targets
+    
+    def __call__(self, *args, **kwargs):
+        return self.client.apply(self.func, args=args, kwargs=kwargs,
+                block=self.block, targets=self.targets)
+    
 
 class AbortedTask(object):
     """A basic wrapper object describing an aborted task."""
@@ -84,7 +114,7 @@ class Client(object):
     Parameters
     ----------
     
-    addr : bytes; zmq url, e.g. 'tcp://127.0.0.1:10101
+    addr : bytes; zmq url, e.g. 'tcp://127.0.0.1:10101'
         The address of the controller's registration socket.
     
     
@@ -281,7 +311,8 @@ class Client(object):
         elif content['status'] == 'aborted':
             self.results[msg_id] = AbortedTask(msg_id)
         elif content['status'] == 'resubmitted':
-            pass # handle resubmission
+            # TODO: handle resubmission
+            pass
         else:
             self.results[msg_id] = ss.unwrap_exception(content)
     
@@ -318,7 +349,9 @@ class Client(object):
     
     def _flush_control(self, sock):
         """Flush replies from the control channel waiting
-        in the ZMQ queue."""
+        in the ZMQ queue.
+        
+        Currently: ignore them."""
         msg = self.session.recv(sock, mode=zmq.NOBLOCK)
         while msg is not None:
             if self.debug:
@@ -330,7 +363,10 @@ class Client(object):
     #--------------------------------------------------------------------------
     
     def __getitem__(self, key):
-        """Dict access returns DirectView multiplexer objects."""
+        """Dict access returns DirectView multiplexer objects or,
+        if key is None, a LoadBalancedView."""
+        if key is None:
+            return LoadBalancedView(self)
         if isinstance(key, int):
             if key not in self.ids:
                 raise IndexError("No such engine: %i"%key)
@@ -412,7 +448,7 @@ class Client(object):
         """Clear the namespace in target(s)."""
         targets = self._build_targets(targets)[0]
         for t in targets:
-            self.session.send(self._control_socket, 'clear_request', content={},ident=t)
+            self.session.send(self._control_socket, 'clear_request', content={}, ident=t)
         error = False
         if self.block:
             for i in range(len(targets)):
@@ -420,7 +456,7 @@ class Client(object):
                 if self.debug:
                     pprint(msg)
                 if msg['content']['status'] != 'ok':
-                    error = msg['content']
+                    error = ss.unwrap_exception(msg['content'])
         if error:
             return error
         
@@ -443,7 +479,7 @@ class Client(object):
                 if self.debug:
                     pprint(msg)
                 if msg['content']['status'] != 'ok':
-                    error = msg['content']
+                    error = ss.unwrap_exception(msg['content'])
         if error:
             return error
     
@@ -461,7 +497,7 @@ class Client(object):
                 if self.debug:
                     pprint(msg)
                 if msg['content']['status'] != 'ok':
-                    error = msg['content']
+                    error = ss.unwrap_exception(msg['content'])
         if error:
             return error
     
@@ -719,7 +755,7 @@ class Client(object):
                 local_results[msg_id] = self.results[msg_id]
                 theids.remove(msg_id)
         
-        if msg_ids: # some not locally cached
+        if theids: # some not locally cached
             content = dict(msg_ids=theids, status_only=status_only)
             msg = self.session.send(self._query_socket, "result_request", content=content)
             zmq.select([self._query_socket], [], [])
diff --git a/IPython/zmq/parallel/view.py b/IPython/zmq/parallel/view.py
index 1ba90d9..222f2c5 100644
--- a/IPython/zmq/parallel/view.py
+++ b/IPython/zmq/parallel/view.py
@@ -42,13 +42,15 @@ class View(object):
     _targets = None
     _ntargets = None
     block=None
+    bound=None
     history=None
     
-    def __init__(self, client, targets):
+    def __init__(self, client, targets=None):
         self.client = client
         self._targets = targets
-        self._ntargets = 1 if isinstance(targets, int) else len(targets)
+        self._ntargets = 1 if isinstance(targets, (int,type(None))) else len(targets)
         self.block = client.block
+        self.bound=True
         self.history = []
         self.outstanding = set()
         self.results = {}
@@ -84,7 +86,7 @@ class View(object):
         else:
             returns actual result of f(*args, **kwargs)
         """
-        return self.client.apply(f, args, kwargs, block=self.block, targets=self.targets, bound=False)
+        return self.client.apply(f, args, kwargs, block=self.block, targets=self.targets, bound=self.bound)
 
     @save_ids
     def apply_async(self, f, *args, **kwargs):
@@ -147,6 +149,29 @@ class View(object):
         
         """
         return self.client.apply(f, args, kwargs, block=True, targets=self.targets, bound=True)
+    
+    def abort(self, msg_ids=None, block=None):
+        """Abort jobs on my engines.
+        
+        Parameters
+        ----------
+        
+        msg_ids : None, str, list of strs, optional
+            if None: abort all jobs.
+            else: abort specific msg_id(s).
+        """
+        block = block if block is not None else self.block
+        return self.client.abort(msg_ids=msg_ids, targets=self.targets, block=block)
+
+    def queue_status(self, verbose=False):
+        """Fetch the Queue status of my engines"""
+        return self.client.queue_status(targets=self.targets, verbose=verbose)
+    
+    def purge_results(self, msg_ids=[],targets=[]):
+        """Instruct the controller to forget specific results."""
+        if targets is None or targets == 'all':
+            targets = self.targets
+        return self.client.purge_results(msg_ids=msg_ids, targets=targets)
 
 
 class DirectView(View):
@@ -156,45 +181,40 @@ class DirectView(View):
         """update remote namespace with dict `ns`"""
         return self.client.push(ns, targets=self.targets, block=self.block)
     
+    push = update
+    
     def get(self, key_s):
         """get object(s) by `key_s` from remote namespace
         will return one object if it is a key.
         It also takes a list of keys, and will return a list of objects."""
         # block = block if block is not None else self.block
-        return self.client.pull(key_s, block=self.block, targets=self.targets)
+        return self.client.pull(key_s, block=True, targets=self.targets)
     
-    push = update
-    pull = get
+    def pull(self, key_s, block=True):
+        """get object(s) by `key_s` from remote namespace
+        will return one object if it is a key.
+        It also takes a list of keys, and will return a list of objects."""
+        block = block if block is not None else self.block
+        return self.client.pull(key_s, block=block, targets=self.targets)
     
     def __getitem__(self, key):
         return self.get(key)
     
-    def __setitem__(self,key,value):
+    def __setitem__(self,key, value):
         self.update({key:value})
     
     def clear(self, block=False):
         """Clear the remote namespaces on my engines."""
         block = block if block is not None else self.block
-        return self.client.clear(targets=self.targets,block=block)
+        return self.client.clear(targets=self.targets, block=block)
     
     def kill(self, block=True):
         """Kill my engines."""
         block = block if block is not None else self.block
-        return self.client.kill(targets=self.targets,block=block)
+        return self.client.kill(targets=self.targets, block=block)
     
-    def abort(self, msg_ids=None, block=None):
-        """Abort jobs on my engines.
-        
-        Parameters
-        ----------
-        
-        msg_ids : None, str, list of strs, optional
-            if None: abort all jobs.
-            else: abort specific msg_id(s).
-        """
-        block = block if block is not None else self.block
-        return self.client.abort(msg_ids=msg_ids, targets=self.targets, block=block)
-
 class LoadBalancedView(View):
-    _targets=None
+    def __repr__(self):
+        return "<%s %s>"%(self.__class__.__name__, self.client._addr)
+    
     
\ No newline at end of file