##// END OF EJS Templates
added dependency decorator
MinRK -
Show More
@@ -0,0 +1,66
1 """Dependency utilities"""
2
3 from IPython.external.decorator import decorator
4
5 # flags
6 ALL = 1 << 0
7 ANY = 1 << 1
8 HERE = 1 << 2
9 ANYWHERE = 1 << 3
10
11 class UnmetDependency(Exception):
12 pass
13
14 class depend2(object):
15 """dependency decorator"""
16 def __init__(self, f, *args, **kwargs):
17 print "Inside __init__()"
18 self.dependency = (f,args,kwargs)
19
20 def __call__(self, f, *args, **kwargs):
21 f._dependency = self.dependency
22 return decorator(_depend_wrapper, f)
23
24 class depend(object):
25 """Dependency decorator, for use with tasks."""
26 def __init__(self, f, *args, **kwargs):
27 print "Inside __init__()"
28 self.f = f
29 self.args = args
30 self.kwargs = kwargs
31
32 def __call__(self, f):
33 return dependent(f, self.f, *self.args, **self.kwargs)
34
35 class dependent(object):
36 """A function that depends on another function.
37 This is an object to prevent the closure used
38 in traditional decorators, which are not picklable.
39 """
40
41 def __init__(self, f, df, *dargs, **dkwargs):
42 self.f = f
43 self.func_name = self.f.func_name
44 self.df = df
45 self.dargs = dargs
46 self.dkwargs = dkwargs
47
48 def __call__(self, *args, **kwargs):
49 if self.df(*self.dargs, **self.dkwargs) is False:
50 raise UnmetDependency()
51 return self.f(*args, **kwargs)
52
53 def evaluate_dependency(deps):
54 """Evaluate wheter dependencies are met.
55
56 Parameters
57 ----------
58 deps : dict
59 """
60 pass
61
62 def _check_dependency(flag):
63 pass
64
65
66 __all__ = ['UnmetDependency', 'depend', 'evaluate_dependencies'] No newline at end of file
@@ -16,9 +16,11 from code import CommandCompiler
16 16 import zmq
17 17 from zmq.eventloop import ioloop, zmqstream
18 18
19 from IPython.zmq.completer import KernelCompleter
20
19 21 from streamsession import StreamSession, Message, extract_header, serialize_object,\
20 22 unpack_apply_message
21 from IPython.zmq.completer import KernelCompleter
23 from dependency import UnmetDependency
22 24
23 25 def printer(*args):
24 26 pprint(args)
@@ -155,7 +157,7 class Kernel(object):
155 157 for msg_type in ['execute_request', 'complete_request', 'apply_request']:
156 158 self.queue_handlers[msg_type] = getattr(self, msg_type)
157 159
158 for msg_type in ['kill_request', 'abort_request']:
160 for msg_type in ['kill_request', 'abort_request']+self.queue_handlers.keys():
159 161 self.control_handlers[msg_type] = getattr(self, msg_type)
160 162
161 163 #-------------------- control handlers -----------------------------
@@ -273,13 +275,6 class Kernel(object):
273 275 def check_aborted(self, msg_id):
274 276 return msg_id in self.aborted
275 277
276 def unmet_dependencies(self, stream, idents, msg):
277 reply_type = msg['msg_type'].split('_')[0] + '_reply'
278 content = dict(status='resubmitted', reason='unmet dependencies')
279 reply_msg = self.session.send(stream, reply_type,
280 content=content, parent=msg, ident=idents)
281 ### TODO: actually resubmit it ###
282
283 278 #-------------------- queue handlers -----------------------------
284 279
285 280 def execute_request(self, stream, ident, parent):
@@ -344,6 +339,7 class Kernel(object):
344 339 # pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent)
345 340 # self.pub_stream.send(pyin_msg)
346 341 # self.session.send(self.pub_stream, u'pyin', {u'code':code},parent=parent)
342 sub = {'dependencies_met' : True}
347 343 try:
348 344 # allow for not overriding displayhook
349 345 if hasattr(sys.displayhook, 'set_parent'):
@@ -393,15 +389,19 class Kernel(object):
393 389 self.session.send(self.pub_stream, u'pyerr', exc_content, parent=parent)
394 390 reply_content = exc_content
395 391 result_buf = []
392
393 if etype is UnmetDependency:
394 sub = {'dependencies_met' : False}
396 395 else:
397 396 reply_content = {'status' : 'ok'}
398 397 # reply_msg = self.session.msg(u'execute_reply', reply_content, parent)
399 398 # self.reply_socket.send(ident, zmq.SNDMORE)
400 399 # self.reply_socket.send_json(reply_msg)
401 reply_msg = self.session.send(stream, u'apply_reply', reply_content, parent=parent, ident=ident,buffers=result_buf)
400 reply_msg = self.session.send(stream, u'apply_reply', reply_content,
401 parent=parent, ident=ident,buffers=result_buf, subheader=sub)
402 402 # print>>sys.__stdout__, Message(reply_msg)
403 if reply_msg['content']['status'] == u'error':
404 self.abort_queues()
403 # if reply_msg['content']['status'] == u'error':
404 # self.abort_queues()
405 405
406 406 def dispatch_queue(self, stream, msg):
407 407 self.control_stream.flush()
@@ -410,7 +410,6 class Kernel(object):
410 410
411 411 header = msg['header']
412 412 msg_id = header['msg_id']
413 dependencies = header.get('dependencies', [])
414 413 if self.check_aborted(msg_id):
415 414 self.aborted.remove(msg_id)
416 415 # is it safe to assume a msg_id will not be resubmitted?
@@ -418,8 +417,6 class Kernel(object):
418 417 reply_msg = self.session.send(stream, reply_type,
419 418 content={'status' : 'aborted'}, parent=msg, ident=idents)
420 419 return
421 if not self.check_dependencies(dependencies):
422 return self.unmet_dependencies(stream, idents, msg)
423 420 handler = self.queue_handlers.get(msg['msg_type'], None)
424 421 if handler is None:
425 422 print >> sys.__stderr__, "UNKNOWN MESSAGE TYPE:", msg
@@ -230,7 +230,7 def unpack_apply_message(bufs, g=None, copy=True):
230 230 sargs = list(pickle.loads(bufs.pop(0)))
231 231 skwargs = dict(pickle.loads(bufs.pop(0)))
232 232 # print sargs, skwargs
233 f = cf.getFunction(g)
233 f = uncan(cf, g)
234 234 for sa in sargs:
235 235 if sa.data is None:
236 236 m = bufs.pop(0)
@@ -19,10 +19,25 from types import FunctionType
19 19
20 20 # contents of codeutil should either be in here, or codeutil belongs in IPython/util
21 21 from IPython.kernel import codeutil
22 from IPython.zmq.parallel.dependency import dependent
22 23
23 24 class CannedObject(object):
24 pass
25 def __init__(self, obj, keys=[]):
26 self.keys = keys
27 self.obj = obj
28 for key in keys:
29 setattr(obj, key, can(getattr(obj, key)))
30
25 31
32 def getObject(self, g=None):
33 if g is None:
34 g = globals()
35 for key in self.keys:
36 setattr(self.obj, key, uncan(getattr(self.obj, key), g))
37 return self.obj
38
39
40
26 41 class CannedFunction(CannedObject):
27 42
28 43 def __init__(self, f):
@@ -41,6 +56,9 class CannedFunction(CannedObject):
41 56 def can(obj):
42 57 if isinstance(obj, FunctionType):
43 58 return CannedFunction(obj)
59 elif isinstance(obj, dependent):
60 keys = ('f','df')
61 return CannedObject(obj, keys=keys)
44 62 elif isinstance(obj,dict):
45 63 return canDict(obj)
46 64 elif isinstance(obj, (list,tuple)):
@@ -67,6 +85,8 def canSequence(obj):
67 85 def uncan(obj, g=None):
68 86 if isinstance(obj, CannedFunction):
69 87 return obj.getFunction(g)
88 elif isinstance(obj, CannedObject):
89 return obj.getObject(g)
70 90 elif isinstance(obj,dict):
71 91 return uncanDict(obj)
72 92 elif isinstance(obj, (list,tuple)):
General Comments 0
You need to be logged in to leave comments. Login now