Show More
@@ -0,0 +1,66 | |||||
|
1 | """Dependency utilities""" | |||
|
2 | ||||
|
3 | from IPython.external.decorator import decorator | |||
|
4 | ||||
|
5 | # flags | |||
|
6 | ALL = 1 << 0 | |||
|
7 | ANY = 1 << 1 | |||
|
8 | HERE = 1 << 2 | |||
|
9 | ANYWHERE = 1 << 3 | |||
|
10 | ||||
|
11 | class UnmetDependency(Exception): | |||
|
12 | pass | |||
|
13 | ||||
|
14 | class depend2(object): | |||
|
15 | """dependency decorator""" | |||
|
16 | def __init__(self, f, *args, **kwargs): | |||
|
17 | print "Inside __init__()" | |||
|
18 | self.dependency = (f,args,kwargs) | |||
|
19 | ||||
|
20 | def __call__(self, f, *args, **kwargs): | |||
|
21 | f._dependency = self.dependency | |||
|
22 | return decorator(_depend_wrapper, f) | |||
|
23 | ||||
|
24 | class depend(object): | |||
|
25 | """Dependency decorator, for use with tasks.""" | |||
|
26 | def __init__(self, f, *args, **kwargs): | |||
|
27 | print "Inside __init__()" | |||
|
28 | self.f = f | |||
|
29 | self.args = args | |||
|
30 | self.kwargs = kwargs | |||
|
31 | ||||
|
32 | def __call__(self, f): | |||
|
33 | return dependent(f, self.f, *self.args, **self.kwargs) | |||
|
34 | ||||
|
35 | class dependent(object): | |||
|
36 | """A function that depends on another function. | |||
|
37 | This is an object to prevent the closure used | |||
|
38 | in traditional decorators, which are not picklable. | |||
|
39 | """ | |||
|
40 | ||||
|
41 | def __init__(self, f, df, *dargs, **dkwargs): | |||
|
42 | self.f = f | |||
|
43 | self.func_name = self.f.func_name | |||
|
44 | self.df = df | |||
|
45 | self.dargs = dargs | |||
|
46 | self.dkwargs = dkwargs | |||
|
47 | ||||
|
48 | def __call__(self, *args, **kwargs): | |||
|
49 | if self.df(*self.dargs, **self.dkwargs) is False: | |||
|
50 | raise UnmetDependency() | |||
|
51 | return self.f(*args, **kwargs) | |||
|
52 | ||||
|
53 | def evaluate_dependency(deps): | |||
|
54 | """Evaluate wheter dependencies are met. | |||
|
55 | ||||
|
56 | Parameters | |||
|
57 | ---------- | |||
|
58 | deps : dict | |||
|
59 | """ | |||
|
60 | pass | |||
|
61 | ||||
|
62 | def _check_dependency(flag): | |||
|
63 | pass | |||
|
64 | ||||
|
65 | ||||
|
66 | __all__ = ['UnmetDependency', 'depend', 'evaluate_dependencies'] No newline at end of file |
@@ -16,9 +16,11 from code import CommandCompiler | |||||
16 | import zmq |
|
16 | import zmq | |
17 | from zmq.eventloop import ioloop, zmqstream |
|
17 | from zmq.eventloop import ioloop, zmqstream | |
18 |
|
18 | |||
|
19 | from IPython.zmq.completer import KernelCompleter | |||
|
20 | ||||
19 | from streamsession import StreamSession, Message, extract_header, serialize_object,\ |
|
21 | from streamsession import StreamSession, Message, extract_header, serialize_object,\ | |
20 | unpack_apply_message |
|
22 | unpack_apply_message | |
21 | from IPython.zmq.completer import KernelCompleter |
|
23 | from dependency import UnmetDependency | |
22 |
|
24 | |||
23 | def printer(*args): |
|
25 | def printer(*args): | |
24 | pprint(args) |
|
26 | pprint(args) | |
@@ -155,7 +157,7 class Kernel(object): | |||||
155 | for msg_type in ['execute_request', 'complete_request', 'apply_request']: |
|
157 | for msg_type in ['execute_request', 'complete_request', 'apply_request']: | |
156 | self.queue_handlers[msg_type] = getattr(self, msg_type) |
|
158 | self.queue_handlers[msg_type] = getattr(self, msg_type) | |
157 |
|
159 | |||
158 | for msg_type in ['kill_request', 'abort_request']: |
|
160 | for msg_type in ['kill_request', 'abort_request']+self.queue_handlers.keys(): | |
159 | self.control_handlers[msg_type] = getattr(self, msg_type) |
|
161 | self.control_handlers[msg_type] = getattr(self, msg_type) | |
160 |
|
162 | |||
161 | #-------------------- control handlers ----------------------------- |
|
163 | #-------------------- control handlers ----------------------------- | |
@@ -273,13 +275,6 class Kernel(object): | |||||
273 | def check_aborted(self, msg_id): |
|
275 | def check_aborted(self, msg_id): | |
274 | return msg_id in self.aborted |
|
276 | return msg_id in self.aborted | |
275 |
|
277 | |||
276 | def unmet_dependencies(self, stream, idents, msg): |
|
|||
277 | reply_type = msg['msg_type'].split('_')[0] + '_reply' |
|
|||
278 | content = dict(status='resubmitted', reason='unmet dependencies') |
|
|||
279 | reply_msg = self.session.send(stream, reply_type, |
|
|||
280 | content=content, parent=msg, ident=idents) |
|
|||
281 | ### TODO: actually resubmit it ### |
|
|||
282 |
|
||||
283 | #-------------------- queue handlers ----------------------------- |
|
278 | #-------------------- queue handlers ----------------------------- | |
284 |
|
279 | |||
285 | def execute_request(self, stream, ident, parent): |
|
280 | def execute_request(self, stream, ident, parent): | |
@@ -344,6 +339,7 class Kernel(object): | |||||
344 | # pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent) |
|
339 | # pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent) | |
345 | # self.pub_stream.send(pyin_msg) |
|
340 | # self.pub_stream.send(pyin_msg) | |
346 | # self.session.send(self.pub_stream, u'pyin', {u'code':code},parent=parent) |
|
341 | # self.session.send(self.pub_stream, u'pyin', {u'code':code},parent=parent) | |
|
342 | sub = {'dependencies_met' : True} | |||
347 | try: |
|
343 | try: | |
348 | # allow for not overriding displayhook |
|
344 | # allow for not overriding displayhook | |
349 | if hasattr(sys.displayhook, 'set_parent'): |
|
345 | if hasattr(sys.displayhook, 'set_parent'): | |
@@ -393,15 +389,19 class Kernel(object): | |||||
393 | self.session.send(self.pub_stream, u'pyerr', exc_content, parent=parent) |
|
389 | self.session.send(self.pub_stream, u'pyerr', exc_content, parent=parent) | |
394 | reply_content = exc_content |
|
390 | reply_content = exc_content | |
395 | result_buf = [] |
|
391 | result_buf = [] | |
|
392 | ||||
|
393 | if etype is UnmetDependency: | |||
|
394 | sub = {'dependencies_met' : False} | |||
396 | else: |
|
395 | else: | |
397 | reply_content = {'status' : 'ok'} |
|
396 | reply_content = {'status' : 'ok'} | |
398 | # reply_msg = self.session.msg(u'execute_reply', reply_content, parent) |
|
397 | # reply_msg = self.session.msg(u'execute_reply', reply_content, parent) | |
399 | # self.reply_socket.send(ident, zmq.SNDMORE) |
|
398 | # self.reply_socket.send(ident, zmq.SNDMORE) | |
400 | # self.reply_socket.send_json(reply_msg) |
|
399 | # self.reply_socket.send_json(reply_msg) | |
401 |
reply_msg = self.session.send(stream, u'apply_reply', reply_content, |
|
400 | reply_msg = self.session.send(stream, u'apply_reply', reply_content, | |
|
401 | parent=parent, ident=ident,buffers=result_buf, subheader=sub) | |||
402 | # print>>sys.__stdout__, Message(reply_msg) |
|
402 | # print>>sys.__stdout__, Message(reply_msg) | |
403 | if reply_msg['content']['status'] == u'error': |
|
403 | # if reply_msg['content']['status'] == u'error': | |
404 | self.abort_queues() |
|
404 | # self.abort_queues() | |
405 |
|
405 | |||
406 | def dispatch_queue(self, stream, msg): |
|
406 | def dispatch_queue(self, stream, msg): | |
407 | self.control_stream.flush() |
|
407 | self.control_stream.flush() | |
@@ -410,7 +410,6 class Kernel(object): | |||||
410 |
|
410 | |||
411 | header = msg['header'] |
|
411 | header = msg['header'] | |
412 | msg_id = header['msg_id'] |
|
412 | msg_id = header['msg_id'] | |
413 | dependencies = header.get('dependencies', []) |
|
|||
414 | if self.check_aborted(msg_id): |
|
413 | if self.check_aborted(msg_id): | |
415 | self.aborted.remove(msg_id) |
|
414 | self.aborted.remove(msg_id) | |
416 | # is it safe to assume a msg_id will not be resubmitted? |
|
415 | # is it safe to assume a msg_id will not be resubmitted? | |
@@ -418,8 +417,6 class Kernel(object): | |||||
418 | reply_msg = self.session.send(stream, reply_type, |
|
417 | reply_msg = self.session.send(stream, reply_type, | |
419 | content={'status' : 'aborted'}, parent=msg, ident=idents) |
|
418 | content={'status' : 'aborted'}, parent=msg, ident=idents) | |
420 | return |
|
419 | return | |
421 | if not self.check_dependencies(dependencies): |
|
|||
422 | return self.unmet_dependencies(stream, idents, msg) |
|
|||
423 | handler = self.queue_handlers.get(msg['msg_type'], None) |
|
420 | handler = self.queue_handlers.get(msg['msg_type'], None) | |
424 | if handler is None: |
|
421 | if handler is None: | |
425 | print >> sys.__stderr__, "UNKNOWN MESSAGE TYPE:", msg |
|
422 | print >> sys.__stderr__, "UNKNOWN MESSAGE TYPE:", msg |
@@ -230,7 +230,7 def unpack_apply_message(bufs, g=None, copy=True): | |||||
230 | sargs = list(pickle.loads(bufs.pop(0))) |
|
230 | sargs = list(pickle.loads(bufs.pop(0))) | |
231 | skwargs = dict(pickle.loads(bufs.pop(0))) |
|
231 | skwargs = dict(pickle.loads(bufs.pop(0))) | |
232 | # print sargs, skwargs |
|
232 | # print sargs, skwargs | |
233 |
f = cf |
|
233 | f = uncan(cf, g) | |
234 | for sa in sargs: |
|
234 | for sa in sargs: | |
235 | if sa.data is None: |
|
235 | if sa.data is None: | |
236 | m = bufs.pop(0) |
|
236 | m = bufs.pop(0) |
@@ -19,10 +19,25 from types import FunctionType | |||||
19 |
|
19 | |||
20 | # contents of codeutil should either be in here, or codeutil belongs in IPython/util |
|
20 | # contents of codeutil should either be in here, or codeutil belongs in IPython/util | |
21 | from IPython.kernel import codeutil |
|
21 | from IPython.kernel import codeutil | |
|
22 | from IPython.zmq.parallel.dependency import dependent | |||
22 |
|
23 | |||
23 | class CannedObject(object): |
|
24 | class CannedObject(object): | |
24 | pass |
|
25 | def __init__(self, obj, keys=[]): | |
|
26 | self.keys = keys | |||
|
27 | self.obj = obj | |||
|
28 | for key in keys: | |||
|
29 | setattr(obj, key, can(getattr(obj, key))) | |||
|
30 | ||||
25 |
|
31 | |||
|
32 | def getObject(self, g=None): | |||
|
33 | if g is None: | |||
|
34 | g = globals() | |||
|
35 | for key in self.keys: | |||
|
36 | setattr(self.obj, key, uncan(getattr(self.obj, key), g)) | |||
|
37 | return self.obj | |||
|
38 | ||||
|
39 | ||||
|
40 | ||||
26 | class CannedFunction(CannedObject): |
|
41 | class CannedFunction(CannedObject): | |
27 |
|
42 | |||
28 | def __init__(self, f): |
|
43 | def __init__(self, f): | |
@@ -41,6 +56,9 class CannedFunction(CannedObject): | |||||
41 | def can(obj): |
|
56 | def can(obj): | |
42 | if isinstance(obj, FunctionType): |
|
57 | if isinstance(obj, FunctionType): | |
43 | return CannedFunction(obj) |
|
58 | return CannedFunction(obj) | |
|
59 | elif isinstance(obj, dependent): | |||
|
60 | keys = ('f','df') | |||
|
61 | return CannedObject(obj, keys=keys) | |||
44 | elif isinstance(obj,dict): |
|
62 | elif isinstance(obj,dict): | |
45 | return canDict(obj) |
|
63 | return canDict(obj) | |
46 | elif isinstance(obj, (list,tuple)): |
|
64 | elif isinstance(obj, (list,tuple)): | |
@@ -67,6 +85,8 def canSequence(obj): | |||||
67 | def uncan(obj, g=None): |
|
85 | def uncan(obj, g=None): | |
68 | if isinstance(obj, CannedFunction): |
|
86 | if isinstance(obj, CannedFunction): | |
69 | return obj.getFunction(g) |
|
87 | return obj.getFunction(g) | |
|
88 | elif isinstance(obj, CannedObject): | |||
|
89 | return obj.getObject(g) | |||
70 | elif isinstance(obj,dict): |
|
90 | elif isinstance(obj,dict): | |
71 | return uncanDict(obj) |
|
91 | return uncanDict(obj) | |
72 | elif isinstance(obj, (list,tuple)): |
|
92 | elif isinstance(obj, (list,tuple)): |
General Comments 0
You need to be logged in to leave comments.
Login now