##// END OF EJS Templates
added dependency decorator
MinRK -
Show More
@@ -0,0 +1,66 b''
1 """Dependency utilities"""
2
3 from IPython.external.decorator import decorator
4
5 # flags
6 ALL = 1 << 0
7 ANY = 1 << 1
8 HERE = 1 << 2
9 ANYWHERE = 1 << 3
10
11 class UnmetDependency(Exception):
12 pass
13
14 class depend2(object):
15 """dependency decorator"""
16 def __init__(self, f, *args, **kwargs):
17 print "Inside __init__()"
18 self.dependency = (f,args,kwargs)
19
20 def __call__(self, f, *args, **kwargs):
21 f._dependency = self.dependency
22 return decorator(_depend_wrapper, f)
23
24 class depend(object):
25 """Dependency decorator, for use with tasks."""
26 def __init__(self, f, *args, **kwargs):
27 print "Inside __init__()"
28 self.f = f
29 self.args = args
30 self.kwargs = kwargs
31
32 def __call__(self, f):
33 return dependent(f, self.f, *self.args, **self.kwargs)
34
35 class dependent(object):
36 """A function that depends on another function.
37 This is an object to prevent the closure used
38 in traditional decorators, which are not picklable.
39 """
40
41 def __init__(self, f, df, *dargs, **dkwargs):
42 self.f = f
43 self.func_name = self.f.func_name
44 self.df = df
45 self.dargs = dargs
46 self.dkwargs = dkwargs
47
48 def __call__(self, *args, **kwargs):
49 if self.df(*self.dargs, **self.dkwargs) is False:
50 raise UnmetDependency()
51 return self.f(*args, **kwargs)
52
53 def evaluate_dependency(deps):
54 """Evaluate wheter dependencies are met.
55
56 Parameters
57 ----------
58 deps : dict
59 """
60 pass
61
62 def _check_dependency(flag):
63 pass
64
65
66 __all__ = ['UnmetDependency', 'depend', 'evaluate_dependencies'] No newline at end of file
@@ -16,9 +16,11 b' from code import CommandCompiler'
16 import zmq
16 import zmq
17 from zmq.eventloop import ioloop, zmqstream
17 from zmq.eventloop import ioloop, zmqstream
18
18
19 from IPython.zmq.completer import KernelCompleter
20
19 from streamsession import StreamSession, Message, extract_header, serialize_object,\
21 from streamsession import StreamSession, Message, extract_header, serialize_object,\
20 unpack_apply_message
22 unpack_apply_message
21 from IPython.zmq.completer import KernelCompleter
23 from dependency import UnmetDependency
22
24
23 def printer(*args):
25 def printer(*args):
24 pprint(args)
26 pprint(args)
@@ -155,7 +157,7 b' class Kernel(object):'
155 for msg_type in ['execute_request', 'complete_request', 'apply_request']:
157 for msg_type in ['execute_request', 'complete_request', 'apply_request']:
156 self.queue_handlers[msg_type] = getattr(self, msg_type)
158 self.queue_handlers[msg_type] = getattr(self, msg_type)
157
159
158 for msg_type in ['kill_request', 'abort_request']:
160 for msg_type in ['kill_request', 'abort_request']+self.queue_handlers.keys():
159 self.control_handlers[msg_type] = getattr(self, msg_type)
161 self.control_handlers[msg_type] = getattr(self, msg_type)
160
162
161 #-------------------- control handlers -----------------------------
163 #-------------------- control handlers -----------------------------
@@ -273,13 +275,6 b' class Kernel(object):'
273 def check_aborted(self, msg_id):
275 def check_aborted(self, msg_id):
274 return msg_id in self.aborted
276 return msg_id in self.aborted
275
277
276 def unmet_dependencies(self, stream, idents, msg):
277 reply_type = msg['msg_type'].split('_')[0] + '_reply'
278 content = dict(status='resubmitted', reason='unmet dependencies')
279 reply_msg = self.session.send(stream, reply_type,
280 content=content, parent=msg, ident=idents)
281 ### TODO: actually resubmit it ###
282
283 #-------------------- queue handlers -----------------------------
278 #-------------------- queue handlers -----------------------------
284
279
285 def execute_request(self, stream, ident, parent):
280 def execute_request(self, stream, ident, parent):
@@ -344,6 +339,7 b' class Kernel(object):'
344 # pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent)
339 # pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent)
345 # self.pub_stream.send(pyin_msg)
340 # self.pub_stream.send(pyin_msg)
346 # self.session.send(self.pub_stream, u'pyin', {u'code':code},parent=parent)
341 # self.session.send(self.pub_stream, u'pyin', {u'code':code},parent=parent)
342 sub = {'dependencies_met' : True}
347 try:
343 try:
348 # allow for not overriding displayhook
344 # allow for not overriding displayhook
349 if hasattr(sys.displayhook, 'set_parent'):
345 if hasattr(sys.displayhook, 'set_parent'):
@@ -393,15 +389,19 b' class Kernel(object):'
393 self.session.send(self.pub_stream, u'pyerr', exc_content, parent=parent)
389 self.session.send(self.pub_stream, u'pyerr', exc_content, parent=parent)
394 reply_content = exc_content
390 reply_content = exc_content
395 result_buf = []
391 result_buf = []
392
393 if etype is UnmetDependency:
394 sub = {'dependencies_met' : False}
396 else:
395 else:
397 reply_content = {'status' : 'ok'}
396 reply_content = {'status' : 'ok'}
398 # reply_msg = self.session.msg(u'execute_reply', reply_content, parent)
397 # reply_msg = self.session.msg(u'execute_reply', reply_content, parent)
399 # self.reply_socket.send(ident, zmq.SNDMORE)
398 # self.reply_socket.send(ident, zmq.SNDMORE)
400 # self.reply_socket.send_json(reply_msg)
399 # self.reply_socket.send_json(reply_msg)
401 reply_msg = self.session.send(stream, u'apply_reply', reply_content, parent=parent, ident=ident,buffers=result_buf)
400 reply_msg = self.session.send(stream, u'apply_reply', reply_content,
401 parent=parent, ident=ident,buffers=result_buf, subheader=sub)
402 # print>>sys.__stdout__, Message(reply_msg)
402 # print>>sys.__stdout__, Message(reply_msg)
403 if reply_msg['content']['status'] == u'error':
403 # if reply_msg['content']['status'] == u'error':
404 self.abort_queues()
404 # self.abort_queues()
405
405
406 def dispatch_queue(self, stream, msg):
406 def dispatch_queue(self, stream, msg):
407 self.control_stream.flush()
407 self.control_stream.flush()
@@ -410,7 +410,6 b' class Kernel(object):'
410
410
411 header = msg['header']
411 header = msg['header']
412 msg_id = header['msg_id']
412 msg_id = header['msg_id']
413 dependencies = header.get('dependencies', [])
414 if self.check_aborted(msg_id):
413 if self.check_aborted(msg_id):
415 self.aborted.remove(msg_id)
414 self.aborted.remove(msg_id)
416 # is it safe to assume a msg_id will not be resubmitted?
415 # is it safe to assume a msg_id will not be resubmitted?
@@ -418,8 +417,6 b' class Kernel(object):'
418 reply_msg = self.session.send(stream, reply_type,
417 reply_msg = self.session.send(stream, reply_type,
419 content={'status' : 'aborted'}, parent=msg, ident=idents)
418 content={'status' : 'aborted'}, parent=msg, ident=idents)
420 return
419 return
421 if not self.check_dependencies(dependencies):
422 return self.unmet_dependencies(stream, idents, msg)
423 handler = self.queue_handlers.get(msg['msg_type'], None)
420 handler = self.queue_handlers.get(msg['msg_type'], None)
424 if handler is None:
421 if handler is None:
425 print >> sys.__stderr__, "UNKNOWN MESSAGE TYPE:", msg
422 print >> sys.__stderr__, "UNKNOWN MESSAGE TYPE:", msg
@@ -230,7 +230,7 b' def unpack_apply_message(bufs, g=None, copy=True):'
230 sargs = list(pickle.loads(bufs.pop(0)))
230 sargs = list(pickle.loads(bufs.pop(0)))
231 skwargs = dict(pickle.loads(bufs.pop(0)))
231 skwargs = dict(pickle.loads(bufs.pop(0)))
232 # print sargs, skwargs
232 # print sargs, skwargs
233 f = cf.getFunction(g)
233 f = uncan(cf, g)
234 for sa in sargs:
234 for sa in sargs:
235 if sa.data is None:
235 if sa.data is None:
236 m = bufs.pop(0)
236 m = bufs.pop(0)
@@ -19,10 +19,25 b' from types import FunctionType'
19
19
20 # contents of codeutil should either be in here, or codeutil belongs in IPython/util
20 # contents of codeutil should either be in here, or codeutil belongs in IPython/util
21 from IPython.kernel import codeutil
21 from IPython.kernel import codeutil
22 from IPython.zmq.parallel.dependency import dependent
22
23
23 class CannedObject(object):
24 class CannedObject(object):
24 pass
25 def __init__(self, obj, keys=[]):
26 self.keys = keys
27 self.obj = obj
28 for key in keys:
29 setattr(obj, key, can(getattr(obj, key)))
30
25
31
32 def getObject(self, g=None):
33 if g is None:
34 g = globals()
35 for key in self.keys:
36 setattr(self.obj, key, uncan(getattr(self.obj, key), g))
37 return self.obj
38
39
40
26 class CannedFunction(CannedObject):
41 class CannedFunction(CannedObject):
27
42
28 def __init__(self, f):
43 def __init__(self, f):
@@ -41,6 +56,9 b' class CannedFunction(CannedObject):'
41 def can(obj):
56 def can(obj):
42 if isinstance(obj, FunctionType):
57 if isinstance(obj, FunctionType):
43 return CannedFunction(obj)
58 return CannedFunction(obj)
59 elif isinstance(obj, dependent):
60 keys = ('f','df')
61 return CannedObject(obj, keys=keys)
44 elif isinstance(obj,dict):
62 elif isinstance(obj,dict):
45 return canDict(obj)
63 return canDict(obj)
46 elif isinstance(obj, (list,tuple)):
64 elif isinstance(obj, (list,tuple)):
@@ -67,6 +85,8 b' def canSequence(obj):'
67 def uncan(obj, g=None):
85 def uncan(obj, g=None):
68 if isinstance(obj, CannedFunction):
86 if isinstance(obj, CannedFunction):
69 return obj.getFunction(g)
87 return obj.getFunction(g)
88 elif isinstance(obj, CannedObject):
89 return obj.getObject(g)
70 elif isinstance(obj,dict):
90 elif isinstance(obj,dict):
71 return uncanDict(obj)
91 return uncanDict(obj)
72 elif isinstance(obj, (list,tuple)):
92 elif isinstance(obj, (list,tuple)):
General Comments 0
You need to be logged in to leave comments. Login now