From 5297768283fa59abed435bcf60a138ce355c807c 2011-04-08 00:38:07
From: MinRK <benjaminrk@gmail.com>
Date: 2011-04-08 00:38:07
Subject: [PATCH] added dependency decorator

---

diff --git a/IPython/zmq/parallel/dependency.py b/IPython/zmq/parallel/dependency.py
new file mode 100644
index 0000000..4d24111
--- /dev/null
+++ b/IPython/zmq/parallel/dependency.py
@@ -0,0 +1,66 @@
+"""Dependency utilities"""
+
+from IPython.external.decorator import decorator
+
+# flags
+ALL = 1 << 0
+ANY = 1 << 1
+HERE = 1 << 2
+ANYWHERE = 1 << 3
+
+class UnmetDependency(Exception):
+    pass
+
+class depend2(object):
+    """Prototype dependency decorator; NOTE: `_depend_wrapper` is not defined in this module."""
+    def __init__(self, f, *args, **kwargs):
+        # (f, args, kwargs) is stored and attached to the decorated function in __call__
+        self.dependency = (f,args,kwargs)
+    
+    def __call__(self, f, *args, **kwargs):
+        f._dependency = self.dependency
+        return decorator(_depend_wrapper, f)
+    
+class depend(object):
+    """Dependency decorator, for use with tasks: @depend(check, *args, **kwargs)."""
+    def __init__(self, f, *args, **kwargs):
+        # f is the dependency check; args/kwargs are handed to `dependent` along with it
+        self.f = f
+        self.args = args
+        self.kwargs = kwargs
+    
+    def __call__(self, f):
+        return dependent(f, self.f, *self.args, **self.kwargs)
+
+class dependent(object):
+    """Callable wrapper that runs `f` only after the check `df` passes.
+    Each call evaluates df(*dargs, **dkwargs); a result that `is False`
+    raises UnmetDependency. This is an object, not a closure, because the
+    closures used in traditional decorators are not picklable."""
+
+    def __init__(self, f, df, *dargs, **dkwargs):
+        self.f = f
+        self.func_name = self.f.func_name
+        self.df = df
+        self.dargs = dargs
+        self.dkwargs = dkwargs
+
+    def __call__(self, *args, **kwargs):
+        if self.df(*self.dargs, **self.dkwargs) is False:
+            raise UnmetDependency()
+        return self.f(*args, **kwargs)
+
+def evaluate_dependency(deps):
+    """Evaluate whether dependencies are met.
+    
+    Parameters
+    ----------
+    deps : dict
+    """
+    pass  # stub: nothing is evaluated yet, so this always returns None
+
+def _check_dependency(flag):
+    pass  # stub: `flag` is ignored and None is returned
+
+
+__all__ = ['UnmetDependency', 'depend', 'evaluate_dependency']
\ No newline at end of file
diff --git a/IPython/zmq/parallel/streamkernel.py b/IPython/zmq/parallel/streamkernel.py
index 57adf9b..e9fa5ba 100755
--- a/IPython/zmq/parallel/streamkernel.py
+++ b/IPython/zmq/parallel/streamkernel.py
@@ -16,9 +16,11 @@ from code import CommandCompiler
 import zmq
 from zmq.eventloop import ioloop, zmqstream
 
+from IPython.zmq.completer import KernelCompleter
+
 from streamsession import StreamSession, Message, extract_header, serialize_object,\
                 unpack_apply_message
-from IPython.zmq.completer import KernelCompleter
+from dependency import UnmetDependency
 
 def printer(*args):
     pprint(args)
@@ -155,7 +157,7 @@ class Kernel(object):
         for msg_type in ['execute_request', 'complete_request', 'apply_request']:
             self.queue_handlers[msg_type] = getattr(self, msg_type)
         
-        for msg_type in ['kill_request', 'abort_request']:
+        for msg_type in ['kill_request', 'abort_request']+self.queue_handlers.keys():
             self.control_handlers[msg_type] = getattr(self, msg_type)
 
     #-------------------- control handlers -----------------------------
@@ -273,13 +275,6 @@ class Kernel(object):
     def check_aborted(self, msg_id):
         return msg_id in self.aborted
     
-    def unmet_dependencies(self, stream, idents, msg):
-        reply_type = msg['msg_type'].split('_')[0] + '_reply'
-        content = dict(status='resubmitted', reason='unmet dependencies')
-        reply_msg = self.session.send(stream, reply_type, 
-                    content=content, parent=msg, ident=idents)
-        ### TODO: actually resubmit it ###
-    
     #-------------------- queue handlers -----------------------------
     
     def execute_request(self, stream, ident, parent):
@@ -344,6 +339,7 @@ class Kernel(object):
         # pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent)
         # self.pub_stream.send(pyin_msg)
         # self.session.send(self.pub_stream, u'pyin', {u'code':code},parent=parent)
+        sub = {'dependencies_met' : True}
         try:
             # allow for not overriding displayhook
             if hasattr(sys.displayhook, 'set_parent'):
@@ -393,15 +389,19 @@ class Kernel(object):
             self.session.send(self.pub_stream, u'pyerr', exc_content, parent=parent)
             reply_content = exc_content
             result_buf = []
+            
+            if etype is UnmetDependency:
+                sub = {'dependencies_met' : False}
         else:
             reply_content = {'status' : 'ok'}
         # reply_msg = self.session.msg(u'execute_reply', reply_content, parent)
         # self.reply_socket.send(ident, zmq.SNDMORE)
         # self.reply_socket.send_json(reply_msg)
-        reply_msg = self.session.send(stream, u'apply_reply', reply_content, parent=parent, ident=ident,buffers=result_buf)
+        reply_msg = self.session.send(stream, u'apply_reply', reply_content, 
+                    parent=parent, ident=ident,buffers=result_buf, subheader=sub)
         # print>>sys.__stdout__, Message(reply_msg)
-        if reply_msg['content']['status'] == u'error':
-            self.abort_queues()
+        # if reply_msg['content']['status'] == u'error':
+        #     self.abort_queues()
     
     def dispatch_queue(self, stream, msg):
         self.control_stream.flush()
@@ -410,7 +410,6 @@ class Kernel(object):
         
         header = msg['header']
         msg_id = header['msg_id']
-        dependencies = header.get('dependencies', [])
         if self.check_aborted(msg_id):
             self.aborted.remove(msg_id)
             # is it safe to assume a msg_id will not be resubmitted?
@@ -418,8 +417,6 @@ class Kernel(object):
             reply_msg = self.session.send(stream, reply_type, 
                         content={'status' : 'aborted'}, parent=msg, ident=idents)
             return
-        if not self.check_dependencies(dependencies):
-            return self.unmet_dependencies(stream, idents, msg)
         handler = self.queue_handlers.get(msg['msg_type'], None)
         if handler is None:
             print >> sys.__stderr__, "UNKNOWN MESSAGE TYPE:", msg
diff --git a/IPython/zmq/parallel/streamsession.py b/IPython/zmq/parallel/streamsession.py
index d98b5fe..ff02b12 100644
--- a/IPython/zmq/parallel/streamsession.py
+++ b/IPython/zmq/parallel/streamsession.py
@@ -230,7 +230,7 @@ def unpack_apply_message(bufs, g=None, copy=True):
     sargs = list(pickle.loads(bufs.pop(0)))
     skwargs = dict(pickle.loads(bufs.pop(0)))
     # print sargs, skwargs
-    f = cf.getFunction(g)
+    f = uncan(cf, g)
     for sa in sargs:
         if sa.data is None:
             m = bufs.pop(0)
diff --git a/IPython/zmq/pickleutil.py b/IPython/zmq/pickleutil.py
index 0191056..a52fc69 100644
--- a/IPython/zmq/pickleutil.py
+++ b/IPython/zmq/pickleutil.py
@@ -19,10 +19,25 @@ from types import FunctionType
 
 # contents of codeutil should either be in here, or codeutil belongs in IPython/util
 from IPython.kernel import codeutil
+from IPython.zmq.parallel.dependency import dependent
 
 class CannedObject(object):
-    pass
+    def __init__(self, obj, keys=()):
+        import copy  # local import; shallow-copy so canning never mutates the caller's obj
+        self.keys = tuple(keys)
+        self.obj = copy.copy(obj)
+        for key in self.keys:
+            setattr(self.obj, key, can(getattr(obj, key)))
     
+    def getObject(self, g=None):
+        if g is None:
+            g = globals()
+        for key in self.keys:
+            setattr(self.obj, key, uncan(getattr(self.obj, key), g))
+        return self.obj
+
+        
+
 class CannedFunction(CannedObject):
     
     def __init__(self, f):
@@ -41,6 +56,9 @@ class CannedFunction(CannedObject):
 def can(obj):
     if isinstance(obj, FunctionType):
         return CannedFunction(obj)
+    elif isinstance(obj, dependent):
+        keys = ('f','df')
+        return CannedObject(obj, keys=keys)
     elif isinstance(obj,dict):
         return canDict(obj)
     elif isinstance(obj, (list,tuple)):
@@ -67,6 +85,8 @@ def canSequence(obj):
 def uncan(obj, g=None):
     if isinstance(obj, CannedFunction):
         return obj.getFunction(g)
+    elif isinstance(obj, CannedObject):
+        return obj.getObject(g)
     elif isinstance(obj,dict):
         return uncanDict(obj)
     elif isinstance(obj, (list,tuple)):