From 5297768283fa59abed435bcf60a138ce355c807c 2011-04-08 00:38:07 From: MinRK <benjaminrk@gmail.com> Date: 2011-04-08 00:38:07 Subject: [PATCH] added dependency decorator --- diff --git a/IPython/zmq/parallel/dependency.py b/IPython/zmq/parallel/dependency.py new file mode 100644 index 0000000..4d24111 --- /dev/null +++ b/IPython/zmq/parallel/dependency.py @@ -0,0 +1,66 @@ +"""Dependency utilities""" + +from IPython.external.decorator import decorator + +# flags +ALL = 1 << 0 +ANY = 1 << 1 +HERE = 1 << 2 +ANYWHERE = 1 << 3 + +class UnmetDependency(Exception): + pass + +class depend2(object): + """dependency decorator""" + def __init__(self, f, *args, **kwargs): + print "Inside __init__()" + self.dependency = (f,args,kwargs) + + def __call__(self, f, *args, **kwargs): + f._dependency = self.dependency + return decorator(_depend_wrapper, f) + +class depend(object): + """Dependency decorator, for use with tasks.""" + def __init__(self, f, *args, **kwargs): + print "Inside __init__()" + self.f = f + self.args = args + self.kwargs = kwargs + + def __call__(self, f): + return dependent(f, self.f, *self.args, **self.kwargs) + +class dependent(object): + """A function that depends on another function. + This is an object to prevent the closure used + in traditional decorators, which are not picklable. + """ + + def __init__(self, f, df, *dargs, **dkwargs): + self.f = f + self.func_name = self.f.func_name + self.df = df + self.dargs = dargs + self.dkwargs = dkwargs + + def __call__(self, *args, **kwargs): + if self.df(*self.dargs, **self.dkwargs) is False: + raise UnmetDependency() + return self.f(*args, **kwargs) + +def evaluate_dependency(deps): + """Evaluate whether dependencies are met. 
+ + Parameters + ---------- + deps : dict + """ + pass + +def _check_dependency(flag): + pass + + +__all__ = ['UnmetDependency', 'depend', 'evaluate_dependencies'] \ No newline at end of file diff --git a/IPython/zmq/parallel/streamkernel.py b/IPython/zmq/parallel/streamkernel.py index 57adf9b..e9fa5ba 100755 --- a/IPython/zmq/parallel/streamkernel.py +++ b/IPython/zmq/parallel/streamkernel.py @@ -16,9 +16,11 @@ from code import CommandCompiler import zmq from zmq.eventloop import ioloop, zmqstream +from IPython.zmq.completer import KernelCompleter + from streamsession import StreamSession, Message, extract_header, serialize_object,\ unpack_apply_message -from IPython.zmq.completer import KernelCompleter +from dependency import UnmetDependency def printer(*args): pprint(args) @@ -155,7 +157,7 @@ class Kernel(object): for msg_type in ['execute_request', 'complete_request', 'apply_request']: self.queue_handlers[msg_type] = getattr(self, msg_type) - for msg_type in ['kill_request', 'abort_request']: + for msg_type in ['kill_request', 'abort_request']+self.queue_handlers.keys(): self.control_handlers[msg_type] = getattr(self, msg_type) #-------------------- control handlers ----------------------------- @@ -273,13 +275,6 @@ class Kernel(object): def check_aborted(self, msg_id): return msg_id in self.aborted - def unmet_dependencies(self, stream, idents, msg): - reply_type = msg['msg_type'].split('_')[0] + '_reply' - content = dict(status='resubmitted', reason='unmet dependencies') - reply_msg = self.session.send(stream, reply_type, - content=content, parent=msg, ident=idents) - ### TODO: actually resubmit it ### - #-------------------- queue handlers ----------------------------- def execute_request(self, stream, ident, parent): @@ -344,6 +339,7 @@ class Kernel(object): # pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent) # self.pub_stream.send(pyin_msg) # self.session.send(self.pub_stream, u'pyin', {u'code':code},parent=parent) + sub = 
{'dependencies_met' : True} try: # allow for not overriding displayhook if hasattr(sys.displayhook, 'set_parent'): @@ -393,15 +389,19 @@ class Kernel(object): self.session.send(self.pub_stream, u'pyerr', exc_content, parent=parent) reply_content = exc_content result_buf = [] + + if etype is UnmetDependency: + sub = {'dependencies_met' : False} else: reply_content = {'status' : 'ok'} # reply_msg = self.session.msg(u'execute_reply', reply_content, parent) # self.reply_socket.send(ident, zmq.SNDMORE) # self.reply_socket.send_json(reply_msg) - reply_msg = self.session.send(stream, u'apply_reply', reply_content, parent=parent, ident=ident,buffers=result_buf) + reply_msg = self.session.send(stream, u'apply_reply', reply_content, + parent=parent, ident=ident,buffers=result_buf, subheader=sub) # print>>sys.__stdout__, Message(reply_msg) - if reply_msg['content']['status'] == u'error': - self.abort_queues() + # if reply_msg['content']['status'] == u'error': + # self.abort_queues() def dispatch_queue(self, stream, msg): self.control_stream.flush() @@ -410,7 +410,6 @@ class Kernel(object): header = msg['header'] msg_id = header['msg_id'] - dependencies = header.get('dependencies', []) if self.check_aborted(msg_id): self.aborted.remove(msg_id) # is it safe to assume a msg_id will not be resubmitted? 
@@ -418,8 +417,6 @@ class Kernel(object): reply_msg = self.session.send(stream, reply_type, content={'status' : 'aborted'}, parent=msg, ident=idents) return - if not self.check_dependencies(dependencies): - return self.unmet_dependencies(stream, idents, msg) handler = self.queue_handlers.get(msg['msg_type'], None) if handler is None: print >> sys.__stderr__, "UNKNOWN MESSAGE TYPE:", msg diff --git a/IPython/zmq/parallel/streamsession.py b/IPython/zmq/parallel/streamsession.py index d98b5fe..ff02b12 100644 --- a/IPython/zmq/parallel/streamsession.py +++ b/IPython/zmq/parallel/streamsession.py @@ -230,7 +230,7 @@ def unpack_apply_message(bufs, g=None, copy=True): sargs = list(pickle.loads(bufs.pop(0))) skwargs = dict(pickle.loads(bufs.pop(0))) # print sargs, skwargs - f = cf.getFunction(g) + f = uncan(cf, g) for sa in sargs: if sa.data is None: m = bufs.pop(0) diff --git a/IPython/zmq/pickleutil.py b/IPython/zmq/pickleutil.py index 0191056..a52fc69 100644 --- a/IPython/zmq/pickleutil.py +++ b/IPython/zmq/pickleutil.py @@ -19,10 +19,25 @@ from types import FunctionType # contents of codeutil should either be in here, or codeutil belongs in IPython/util from IPython.kernel import codeutil +from IPython.zmq.parallel.dependency import dependent class CannedObject(object): - pass + def __init__(self, obj, keys=[]): + self.keys = keys + self.obj = obj + for key in keys: + setattr(obj, key, can(getattr(obj, key))) + + def getObject(self, g=None): + if g is None: + g = globals() + for key in self.keys: + setattr(self.obj, key, uncan(getattr(self.obj, key), g)) + return self.obj + + + class CannedFunction(CannedObject): def __init__(self, f): @@ -41,6 +56,9 @@ class CannedFunction(CannedObject): def can(obj): if isinstance(obj, FunctionType): return CannedFunction(obj) + elif isinstance(obj, dependent): + keys = ('f','df') + return CannedObject(obj, keys=keys) elif isinstance(obj,dict): return canDict(obj) elif isinstance(obj, (list,tuple)): @@ -67,6 +85,8 @@ def 
canSequence(obj): def uncan(obj, g=None): if isinstance(obj, CannedFunction): return obj.getFunction(g) + elif isinstance(obj, CannedObject): + return obj.getObject(g) elif isinstance(obj,dict): return uncanDict(obj) elif isinstance(obj, (list,tuple)):