Show More
@@ -0,0 +1,66 | |||
|
1 | """Dependency utilities""" | |
|
2 | ||
|
3 | from IPython.external.decorator import decorator | |
|
4 | ||
|
5 | # flags | |
|
6 | ALL = 1 << 0 | |
|
7 | ANY = 1 << 1 | |
|
8 | HERE = 1 << 2 | |
|
9 | ANYWHERE = 1 << 3 | |
|
10 | ||
|
11 | class UnmetDependency(Exception): | |
|
12 | pass | |
|
13 | ||
|
14 | class depend2(object): | |
|
15 | """dependency decorator""" | |
|
16 | def __init__(self, f, *args, **kwargs): | |
|
17 | print "Inside __init__()" | |
|
18 | self.dependency = (f,args,kwargs) | |
|
19 | ||
|
20 | def __call__(self, f, *args, **kwargs): | |
|
21 | f._dependency = self.dependency | |
|
22 | return decorator(_depend_wrapper, f) | |
|
23 | ||
|
24 | class depend(object): | |
|
25 | """Dependency decorator, for use with tasks.""" | |
|
26 | def __init__(self, f, *args, **kwargs): | |
|
27 | print "Inside __init__()" | |
|
28 | self.f = f | |
|
29 | self.args = args | |
|
30 | self.kwargs = kwargs | |
|
31 | ||
|
32 | def __call__(self, f): | |
|
33 | return dependent(f, self.f, *self.args, **self.kwargs) | |
|
34 | ||
|
35 | class dependent(object): | |
|
36 | """A function that depends on another function. | |
|
37 | This is an object to prevent the closure used | |
|
38 | in traditional decorators, which are not picklable. | |
|
39 | """ | |
|
40 | ||
|
41 | def __init__(self, f, df, *dargs, **dkwargs): | |
|
42 | self.f = f | |
|
43 | self.func_name = self.f.func_name | |
|
44 | self.df = df | |
|
45 | self.dargs = dargs | |
|
46 | self.dkwargs = dkwargs | |
|
47 | ||
|
48 | def __call__(self, *args, **kwargs): | |
|
49 | if self.df(*self.dargs, **self.dkwargs) is False: | |
|
50 | raise UnmetDependency() | |
|
51 | return self.f(*args, **kwargs) | |
|
52 | ||
|
53 | def evaluate_dependency(deps): | |
|
54 | """Evaluate wheter dependencies are met. | |
|
55 | ||
|
56 | Parameters | |
|
57 | ---------- | |
|
58 | deps : dict | |
|
59 | """ | |
|
60 | pass | |
|
61 | ||
|
62 | def _check_dependency(flag): | |
|
63 | pass | |
|
64 | ||
|
65 | ||
|
66 | __all__ = ['UnmetDependency', 'depend', 'evaluate_dependencies'] No newline at end of file |
@@ -16,9 +16,11 from code import CommandCompiler | |||
|
16 | 16 | import zmq |
|
17 | 17 | from zmq.eventloop import ioloop, zmqstream |
|
18 | 18 | |
|
19 | from IPython.zmq.completer import KernelCompleter | |
|
20 | ||
|
19 | 21 | from streamsession import StreamSession, Message, extract_header, serialize_object,\ |
|
20 | 22 | unpack_apply_message |
|
21 | from IPython.zmq.completer import KernelCompleter | |
|
23 | from dependency import UnmetDependency | |
|
22 | 24 | |
|
23 | 25 | def printer(*args): |
|
24 | 26 | pprint(args) |
@@ -155,7 +157,7 class Kernel(object): | |||
|
155 | 157 | for msg_type in ['execute_request', 'complete_request', 'apply_request']: |
|
156 | 158 | self.queue_handlers[msg_type] = getattr(self, msg_type) |
|
157 | 159 | |
|
158 | for msg_type in ['kill_request', 'abort_request']: | |
|
160 | for msg_type in ['kill_request', 'abort_request']+self.queue_handlers.keys(): | |
|
159 | 161 | self.control_handlers[msg_type] = getattr(self, msg_type) |
|
160 | 162 | |
|
161 | 163 | #-------------------- control handlers ----------------------------- |
@@ -273,13 +275,6 class Kernel(object): | |||
|
273 | 275 | def check_aborted(self, msg_id): |
|
274 | 276 | return msg_id in self.aborted |
|
275 | 277 | |
|
276 | def unmet_dependencies(self, stream, idents, msg): | |
|
277 | reply_type = msg['msg_type'].split('_')[0] + '_reply' | |
|
278 | content = dict(status='resubmitted', reason='unmet dependencies') | |
|
279 | reply_msg = self.session.send(stream, reply_type, | |
|
280 | content=content, parent=msg, ident=idents) | |
|
281 | ### TODO: actually resubmit it ### | |
|
282 | ||
|
283 | 278 | #-------------------- queue handlers ----------------------------- |
|
284 | 279 | |
|
285 | 280 | def execute_request(self, stream, ident, parent): |
@@ -344,6 +339,7 class Kernel(object): | |||
|
344 | 339 | # pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent) |
|
345 | 340 | # self.pub_stream.send(pyin_msg) |
|
346 | 341 | # self.session.send(self.pub_stream, u'pyin', {u'code':code},parent=parent) |
|
342 | sub = {'dependencies_met' : True} | |
|
347 | 343 | try: |
|
348 | 344 | # allow for not overriding displayhook |
|
349 | 345 | if hasattr(sys.displayhook, 'set_parent'): |
@@ -393,15 +389,19 class Kernel(object): | |||
|
393 | 389 | self.session.send(self.pub_stream, u'pyerr', exc_content, parent=parent) |
|
394 | 390 | reply_content = exc_content |
|
395 | 391 | result_buf = [] |
|
392 | ||
|
393 | if etype is UnmetDependency: | |
|
394 | sub = {'dependencies_met' : False} | |
|
396 | 395 | else: |
|
397 | 396 | reply_content = {'status' : 'ok'} |
|
398 | 397 | # reply_msg = self.session.msg(u'execute_reply', reply_content, parent) |
|
399 | 398 | # self.reply_socket.send(ident, zmq.SNDMORE) |
|
400 | 399 | # self.reply_socket.send_json(reply_msg) |
|
401 |
reply_msg = self.session.send(stream, u'apply_reply', reply_content, |
|
|
400 | reply_msg = self.session.send(stream, u'apply_reply', reply_content, | |
|
401 | parent=parent, ident=ident,buffers=result_buf, subheader=sub) | |
|
402 | 402 | # print>>sys.__stdout__, Message(reply_msg) |
|
403 | if reply_msg['content']['status'] == u'error': | |
|
404 | self.abort_queues() | |
|
403 | # if reply_msg['content']['status'] == u'error': | |
|
404 | # self.abort_queues() | |
|
405 | 405 | |
|
406 | 406 | def dispatch_queue(self, stream, msg): |
|
407 | 407 | self.control_stream.flush() |
@@ -410,7 +410,6 class Kernel(object): | |||
|
410 | 410 | |
|
411 | 411 | header = msg['header'] |
|
412 | 412 | msg_id = header['msg_id'] |
|
413 | dependencies = header.get('dependencies', []) | |
|
414 | 413 | if self.check_aborted(msg_id): |
|
415 | 414 | self.aborted.remove(msg_id) |
|
416 | 415 | # is it safe to assume a msg_id will not be resubmitted? |
@@ -418,8 +417,6 class Kernel(object): | |||
|
418 | 417 | reply_msg = self.session.send(stream, reply_type, |
|
419 | 418 | content={'status' : 'aborted'}, parent=msg, ident=idents) |
|
420 | 419 | return |
|
421 | if not self.check_dependencies(dependencies): | |
|
422 | return self.unmet_dependencies(stream, idents, msg) | |
|
423 | 420 | handler = self.queue_handlers.get(msg['msg_type'], None) |
|
424 | 421 | if handler is None: |
|
425 | 422 | print >> sys.__stderr__, "UNKNOWN MESSAGE TYPE:", msg |
@@ -230,7 +230,7 def unpack_apply_message(bufs, g=None, copy=True): | |||
|
230 | 230 | sargs = list(pickle.loads(bufs.pop(0))) |
|
231 | 231 | skwargs = dict(pickle.loads(bufs.pop(0))) |
|
232 | 232 | # print sargs, skwargs |
|
233 |
f = cf |
|
|
233 | f = uncan(cf, g) | |
|
234 | 234 | for sa in sargs: |
|
235 | 235 | if sa.data is None: |
|
236 | 236 | m = bufs.pop(0) |
@@ -19,10 +19,25 from types import FunctionType | |||
|
19 | 19 | |
|
20 | 20 | # contents of codeutil should either be in here, or codeutil belongs in IPython/util |
|
21 | 21 | from IPython.kernel import codeutil |
|
22 | from IPython.zmq.parallel.dependency import dependent | |
|
22 | 23 | |
|
23 | 24 | class CannedObject(object): |
|
24 | pass | |
|
25 | def __init__(self, obj, keys=[]): | |
|
26 | self.keys = keys | |
|
27 | self.obj = obj | |
|
28 | for key in keys: | |
|
29 | setattr(obj, key, can(getattr(obj, key))) | |
|
30 | ||
|
25 | 31 | |
|
32 | def getObject(self, g=None): | |
|
33 | if g is None: | |
|
34 | g = globals() | |
|
35 | for key in self.keys: | |
|
36 | setattr(self.obj, key, uncan(getattr(self.obj, key), g)) | |
|
37 | return self.obj | |
|
38 | ||
|
39 | ||
|
40 | ||
|
26 | 41 | class CannedFunction(CannedObject): |
|
27 | 42 | |
|
28 | 43 | def __init__(self, f): |
@@ -41,6 +56,9 class CannedFunction(CannedObject): | |||
|
41 | 56 | def can(obj): |
|
42 | 57 | if isinstance(obj, FunctionType): |
|
43 | 58 | return CannedFunction(obj) |
|
59 | elif isinstance(obj, dependent): | |
|
60 | keys = ('f','df') | |
|
61 | return CannedObject(obj, keys=keys) | |
|
44 | 62 | elif isinstance(obj,dict): |
|
45 | 63 | return canDict(obj) |
|
46 | 64 | elif isinstance(obj, (list,tuple)): |
@@ -67,6 +85,8 def canSequence(obj): | |||
|
67 | 85 | def uncan(obj, g=None): |
|
68 | 86 | if isinstance(obj, CannedFunction): |
|
69 | 87 | return obj.getFunction(g) |
|
88 | elif isinstance(obj, CannedObject): | |
|
89 | return obj.getObject(g) | |
|
70 | 90 | elif isinstance(obj,dict): |
|
71 | 91 | return uncanDict(obj) |
|
72 | 92 | elif isinstance(obj, (list,tuple)): |
General Comments 0
You need to be logged in to leave comments.
Login now