Improvements to dependency handling...
MinRK
@@ -16,17 +16,23 b' __docformat__ = "restructuredtext en"'
16 #-------------------------------------------------------------------------------
16 #-------------------------------------------------------------------------------
17
17
18 from types import FunctionType
18 from types import FunctionType
19 import copy
19
20
20 # contents of codeutil should either be in here, or codeutil belongs in IPython/util
21 from IPython.zmq.parallel.dependency import dependent
21 from IPython.zmq.parallel.dependency import dependent
22
22 import codeutil
23 import codeutil
23
24
25 #-------------------------------------------------------------------------------
26 # Classes
27 #-------------------------------------------------------------------------------
28
29
24 class CannedObject(object):
30 class CannedObject(object):
25 def __init__(self, obj, keys=[]):
31 def __init__(self, obj, keys=[]):
26 self.keys = keys
32 self.keys = keys
27 self.obj = obj
33 self.obj = copy.copy(obj)
28 for key in keys:
34 for key in keys:
29 setattr(obj, key, can(getattr(obj, key)))
35 setattr(self.obj, key, can(getattr(obj, key)))
30
36
31
37
32 def getObject(self, g=None):
38 def getObject(self, g=None):
@@ -43,6 +49,7 b' class CannedFunction(CannedObject):'
43 def __init__(self, f):
49 def __init__(self, f):
44 self._checkType(f)
50 self._checkType(f)
45 self.code = f.func_code
51 self.code = f.func_code
52 self.__name__ = f.__name__
46
53
47 def _checkType(self, obj):
54 def _checkType(self, obj):
48 assert isinstance(obj, FunctionType), "Not a function type"
55 assert isinstance(obj, FunctionType), "Not a function type"
@@ -53,6 +60,11 b' class CannedFunction(CannedObject):'
53 newFunc = FunctionType(self.code, g)
60 newFunc = FunctionType(self.code, g)
54 return newFunc
61 return newFunc
55
62
63 #-------------------------------------------------------------------------------
64 # Functions
65 #-------------------------------------------------------------------------------
66
67
56 def can(obj):
68 def can(obj):
57 if isinstance(obj, FunctionType):
69 if isinstance(obj, FunctionType):
58 return CannedFunction(obj)
70 return CannedFunction(obj)
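The `copy.copy` and `setattr(self.obj, ...)` changes above fix a real bug: the old code canned attributes on the caller's own object, so submitting a task had the side effect of replacing those attributes with canned stand-ins. A minimal standalone sketch of the corrected pattern (toy `can`/`uncan` helpers for illustration, not the real IPython ones):

```python
import copy

def can(value):
    """Stand-in for the real can(): just tag the value so the effect is visible."""
    return ('canned', value)

def uncan(value):
    """Inverse of the stand-in can()."""
    if isinstance(value, tuple) and value and value[0] == 'canned':
        return value[1]
    return value

class CannedObject(object):
    """Can selected attributes on a *copy*, leaving the caller's object untouched."""
    def __init__(self, obj, keys=()):
        self.keys = list(keys)
        self.obj = copy.copy(obj)              # the fix: shallow-copy first
        for key in self.keys:
            setattr(self.obj, key, can(getattr(obj, key)))

    def get_object(self):
        for key in self.keys:
            setattr(self.obj, key, uncan(getattr(self.obj, key)))
        return self.obj

class Thing(object):
    pass

t = Thing()
t.payload = 42
c = CannedObject(t, keys=['payload'])
print(t.payload)               # 42: the original is not mutated
print(c.obj.payload)           # ('canned', 42)
print(c.get_object().payload)  # 42 again
```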
@@ -36,6 +36,7 b' class AsyncResult(object):'
36 self._fname=fname
36 self._fname=fname
37 self._ready = False
37 self._ready = False
38 self._success = None
38 self._success = None
39 self._flatten_result = len(msg_ids) == 1
39
40
40 def __repr__(self):
41 def __repr__(self):
41 if self._ready:
42 if self._ready:
@@ -49,7 +50,7 b' class AsyncResult(object):'
49 Override me in subclasses for turning a list of results
50 Override me in subclasses for turning a list of results
50 into the expected form.
51 into the expected form.
51 """
52 """
52 if len(self.msg_ids) == 1:
53 if self._flatten_result:
53 return res[0]
54 return res[0]
54 else:
55 else:
55 return res
56 return res
@@ -115,7 +116,7 b' class AsyncResult(object):'
115 def get_dict(self, timeout=-1):
116 def get_dict(self, timeout=-1):
116 """Get the results as a dict, keyed by engine_id."""
117 """Get the results as a dict, keyed by engine_id."""
117 results = self.get(timeout)
118 results = self.get(timeout)
118 engine_ids = [md['engine_id'] for md in self._metadata ]
119 engine_ids = [ md['engine_id'] for md in self._metadata ]
119 bycount = sorted(engine_ids, key=lambda k: engine_ids.count(k))
120 bycount = sorted(engine_ids, key=lambda k: engine_ids.count(k))
120 maxcount = bycount.count(bycount[-1])
121 maxcount = bycount.count(bycount[-1])
121 if maxcount > 1:
122 if maxcount > 1:
@@ -130,11 +131,17 b' class AsyncResult(object):'
130 """result property."""
131 """result property."""
131 return self._result
132 return self._result
132
133
134 # abbreviated alias:
135 r = result
136
133 @property
137 @property
134 @check_ready
138 @check_ready
135 def metadata(self):
139 def metadata(self):
136 """metadata property."""
140 """metadata property."""
137 return self._metadata
141 if self._flatten_result:
142 return self._metadata[0]
143 else:
144 return self._metadata
138
145
139 @property
146 @property
140 def result_dict(self):
147 def result_dict(self):
@@ -157,7 +164,11 b' class AsyncResult(object):'
157 elif isinstance(key, slice):
164 elif isinstance(key, slice):
158 return error.collect_exceptions(self._result[key], self._fname)
165 return error.collect_exceptions(self._result[key], self._fname)
159 elif isinstance(key, basestring):
166 elif isinstance(key, basestring):
160 return [ md[key] for md in self._metadata ]
167 values = [ md[key] for md in self._metadata ]
168 if self._flatten_result:
169 return values[0]
170 else:
171 return values
161 else:
172 else:
162 raise TypeError("Invalid key type %r, must be 'int','slice', or 'str'"%type(key))
173 raise TypeError("Invalid key type %r, must be 'int','slice', or 'str'"%type(key))
163
174
@@ -177,8 +188,9 b' class AsyncMapResult(AsyncResult):'
177 """
188 """
178
189
179 def __init__(self, client, msg_ids, mapObject, fname=''):
190 def __init__(self, client, msg_ids, mapObject, fname=''):
180 self._mapObject = mapObject
181 AsyncResult.__init__(self, client, msg_ids, fname=fname)
191 AsyncResult.__init__(self, client, msg_ids, fname=fname)
192 self._mapObject = mapObject
193 self._flatten_result = False
182
194
183 def _reconstruct_result(self, res):
195 def _reconstruct_result(self, res):
184 """Perform the gather on the actual results."""
196 """Perform the gather on the actual results."""
@@ -91,7 +91,13 b' def defaultblock(f, self, *args, **kwargs):'
91 #--------------------------------------------------------------------------
91 #--------------------------------------------------------------------------
92
92
93 class Metadata(dict):
93 class Metadata(dict):
94 """Subclass of dict for initializing metadata values."""
94 """Subclass of dict for initializing metadata values.
95
96 Attribute access works on keys.
97
98 These objects have a strict set of keys - errors will raise if you try
99 to add new keys.
100 """
95 def __init__(self, *args, **kwargs):
101 def __init__(self, *args, **kwargs):
96 dict.__init__(self)
102 dict.__init__(self)
97 md = {'msg_id' : None,
103 md = {'msg_id' : None,
@@ -113,7 +119,27 b' class Metadata(dict):'
113 }
119 }
114 self.update(md)
120 self.update(md)
115 self.update(dict(*args, **kwargs))
121 self.update(dict(*args, **kwargs))
122
123 def __getattr__(self, key):
124 """getattr aliased to getitem"""
125 if key in self.iterkeys():
126 return self[key]
127 else:
128 raise AttributeError(key)
116
129
130 def __setattr__(self, key, value):
131 """setattr aliased to setitem, with strict"""
132 if key in self.iterkeys():
133 self[key] = value
134 else:
135 raise AttributeError(key)
136
137 def __setitem__(self, key, value):
138 """strict static key enforcement"""
139 if key in self.iterkeys():
140 dict.__setitem__(self, key, value)
141 else:
142 raise KeyError(key)
117
143
118
144
119 class Client(object):
145 class Client(object):
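Taken together, the new `__getattr__`, `__setattr__`, and `__setitem__` turn `Metadata` into a dict with a fixed key set and attribute-style access, as the expanded docstring says. A self-contained approximation, with the default schema trimmed to three keys (the real class initializes the full metadata dict shown above):

```python
class Metadata(dict):
    """Dict with a fixed key set; attribute access maps to item access.

    Reduced sketch of the class in this diff, not the full schema.
    """
    def __init__(self, *args, **kwargs):
        dict.__init__(self)
        defaults = {'msg_id': None, 'engine_id': None, 'status': None}
        self.update(defaults)                 # dict.update bypasses the strict check
        self.update(dict(*args, **kwargs))

    def __getattr__(self, key):
        if key in self:
            return self[key]
        raise AttributeError(key)

    def __setattr__(self, key, value):
        if key in self:
            self[key] = value
        else:
            raise AttributeError(key)

    def __setitem__(self, key, value):
        if key not in self:
            raise KeyError(key)
        dict.__setitem__(self, key, value)

md = Metadata(status='ok')
print(md.status)          # 'ok': attribute access reads the key
md.engine_id = 3          # allowed: existing key
try:
    md['color'] = 'red'   # rejected: not part of the schema
except KeyError as e:
    print('rejected new key: %s' % e)
```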
@@ -372,16 +398,22 b' class Client(object):'
372
398
373 def _extract_metadata(self, header, parent, content):
399 def _extract_metadata(self, header, parent, content):
374 md = {'msg_id' : parent['msg_id'],
400 md = {'msg_id' : parent['msg_id'],
375 'submitted' : datetime.strptime(parent['date'], ss.ISO8601),
376 'started' : datetime.strptime(header['started'], ss.ISO8601),
377 'completed' : datetime.strptime(header['date'], ss.ISO8601),
378 'received' : datetime.now(),
401 'received' : datetime.now(),
379 'engine_uuid' : header['engine'],
402 'engine_uuid' : header.get('engine', None),
380 'engine_id' : self._engines.get(header['engine'], None),
381 'follow' : parent['follow'],
403 'follow' : parent['follow'],
382 'after' : parent['after'],
404 'after' : parent['after'],
383 'status' : content['status'],
405 'status' : content['status'],
384 }
406 }
407
408 if md['engine_uuid'] is not None:
409 md['engine_id'] = self._engines.get(md['engine_uuid'], None)
410
411 if 'date' in parent:
412 md['submitted'] = datetime.strptime(parent['date'], ss.ISO8601)
413 if 'started' in header:
414 md['started'] = datetime.strptime(header['started'], ss.ISO8601)
415 if 'date' in header:
416 md['completed'] = datetime.strptime(header['date'], ss.ISO8601)
385 return md
417 return md
386
418
387 def _handle_execute_reply(self, msg):
419 def _handle_execute_reply(self, msg):
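The rewritten `_extract_metadata` only parses the timing and engine fields that are actually present, instead of assuming every reply header carries them, so replies that omit `date`, `started`, or `engine` no longer raise `KeyError`. Roughly (a trimmed sketch: `follow`/`after` omitted, and the ISO8601 format string is an assumption, the real constant lives in streamsession):

```python
from datetime import datetime

ISO8601 = "%Y-%m-%dT%H:%M:%S.%f"   # assumed format string

def extract_metadata(header, parent, content, engines):
    """Sketch of the defensive extraction: optional fields stay unset if absent."""
    md = {
        'msg_id':      parent['msg_id'],
        'received':    datetime.now(),
        'engine_uuid': header.get('engine', None),
        'status':      content['status'],
    }
    if md['engine_uuid'] is not None:
        md['engine_id'] = engines.get(md['engine_uuid'], None)
    if 'date' in parent:
        md['submitted'] = datetime.strptime(parent['date'], ISO8601)
    if 'started' in header:
        md['started'] = datetime.strptime(header['started'], ISO8601)
    if 'date' in header:
        md['completed'] = datetime.strptime(header['date'], ISO8601)
    return md

# a reply that carries no timing headers is handled without error
print(extract_metadata({'engine': 'abc'}, {'msg_id': '1'}, {'status': 'ok'}, {'abc': 0}))
```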
@@ -393,7 +425,10 b' class Client(object):'
393 parent = msg['parent_header']
425 parent = msg['parent_header']
394 msg_id = parent['msg_id']
426 msg_id = parent['msg_id']
395 if msg_id not in self.outstanding:
427 if msg_id not in self.outstanding:
396 print("got unknown result: %s"%msg_id)
428 if msg_id in self.history:
429 print ("got stale result: %s"%msg_id)
430 else:
431 print ("got unknown result: %s"%msg_id)
397 else:
432 else:
398 self.outstanding.remove(msg_id)
433 self.outstanding.remove(msg_id)
399 self.results[msg_id] = ss.unwrap_exception(msg['content'])
434 self.results[msg_id] = ss.unwrap_exception(msg['content'])
@@ -403,7 +438,12 b' class Client(object):'
403 parent = msg['parent_header']
438 parent = msg['parent_header']
404 msg_id = parent['msg_id']
439 msg_id = parent['msg_id']
405 if msg_id not in self.outstanding:
440 if msg_id not in self.outstanding:
406 print ("got unknown result: %s"%msg_id)
441 if msg_id in self.history:
442 print ("got stale result: %s"%msg_id)
443 print self.results[msg_id]
444 print msg
445 else:
446 print ("got unknown result: %s"%msg_id)
407 else:
447 else:
408 self.outstanding.remove(msg_id)
448 self.outstanding.remove(msg_id)
409 content = msg['content']
449 content = msg['content']
@@ -424,9 +464,10 b' class Client(object):'
424 pass
464 pass
425 else:
465 else:
426 e = ss.unwrap_exception(content)
466 e = ss.unwrap_exception(content)
427 e_uuid = e.engine_info['engineid']
467 if e.engine_info:
428 eid = self._engines[e_uuid]
468 e_uuid = e.engine_info['engineid']
429 e.engine_info['engineid'] = eid
469 eid = self._engines[e_uuid]
470 e.engine_info['engineid'] = eid
430 self.results[msg_id] = e
471 self.results[msg_id] = e
431
472
432 def _flush_notifications(self):
473 def _flush_notifications(self):
@@ -811,6 +852,8 b' class Client(object):'
811 elif after is None:
852 elif after is None:
812 after = []
853 after = []
813 if isinstance(follow, Dependency):
854 if isinstance(follow, Dependency):
855 # if len(follow) > 1 and follow.mode == 'all':
856 # warn("complex follow-dependencies are not rigorously tested for reachability", UserWarning)
814 follow = follow.as_dict()
857 follow = follow.as_dict()
815 elif isinstance(follow, AsyncResult):
858 elif isinstance(follow, AsyncResult):
816 follow=follow.msg_ids
859 follow=follow.msg_ids
@@ -827,7 +870,6 b' class Client(object):'
827 after=None, follow=None):
870 after=None, follow=None):
828 """The underlying method for applying functions in a load balanced
871 """The underlying method for applying functions in a load balanced
829 manner, via the task queue."""
872 manner, via the task queue."""
830
831 subheader = dict(after=after, follow=follow)
873 subheader = dict(after=after, follow=follow)
832 bufs = ss.pack_apply_message(f,args,kwargs)
874 bufs = ss.pack_apply_message(f,args,kwargs)
833 content = dict(bound=bound)
875 content = dict(bound=bound)
@@ -1,6 +1,8 b''
1 """Dependency utilities"""
1 """Dependency utilities"""
2
2
3 from IPython.external.decorator import decorator
3 from IPython.external.decorator import decorator
4 from error import UnmetDependency
5
4
6
5 # flags
7 # flags
6 ALL = 1 << 0
8 ALL = 1 << 0
@@ -8,9 +10,6 b' ANY = 1 << 1'
8 HERE = 1 << 2
10 HERE = 1 << 2
9 ANYWHERE = 1 << 3
11 ANYWHERE = 1 << 3
10
12
11 class UnmetDependency(Exception):
12 pass
13
14
13
15 class depend(object):
14 class depend(object):
16 """Dependency decorator, for use with tasks."""
15 """Dependency decorator, for use with tasks."""
@@ -30,7 +29,7 b' class dependent(object):'
30
29
31 def __init__(self, f, df, *dargs, **dkwargs):
30 def __init__(self, f, df, *dargs, **dkwargs):
32 self.f = f
31 self.f = f
33 self.func_name = self.f.func_name
32 self.func_name = getattr(f, '__name__', 'f')
34 self.df = df
33 self.df = df
35 self.dargs = dargs
34 self.dargs = dargs
36 self.dkwargs = dkwargs
35 self.dkwargs = dkwargs
@@ -39,6 +38,10 b' class dependent(object):'
39 if self.df(*self.dargs, **self.dkwargs) is False:
38 if self.df(*self.dargs, **self.dkwargs) is False:
40 raise UnmetDependency()
39 raise UnmetDependency()
41 return self.f(*args, **kwargs)
40 return self.f(*args, **kwargs)
41
42 @property
43 def __name__(self):
44 return self.func_name
42
45
43 def _require(*names):
46 def _require(*names):
44 for name in names:
47 for name in names:
@@ -57,18 +60,23 b' class Dependency(set):'
57 Subclassed from set()."""
60 Subclassed from set()."""
58
61
59 mode='all'
62 mode='all'
63 success_only=True
60
64
61 def __init__(self, dependencies=[], mode='all'):
65 def __init__(self, dependencies=[], mode='all', success_only=True):
62 if isinstance(dependencies, dict):
66 if isinstance(dependencies, dict):
63 # load from dict
67 # load from dict
64 dependencies = dependencies.get('dependencies', [])
65 mode = dependencies.get('mode', mode)
68 mode = dependencies.get('mode', mode)
69 success_only = dependencies.get('success_only', success_only)
70 dependencies = dependencies.get('dependencies', [])
66 set.__init__(self, dependencies)
71 set.__init__(self, dependencies)
67 self.mode = mode.lower()
72 self.mode = mode.lower()
73 self.success_only=success_only
68 if self.mode not in ('any', 'all'):
74 if self.mode not in ('any', 'all'):
69 raise NotImplementedError("Only any|all supported, not %r"%mode)
75 raise NotImplementedError("Only any|all supported, not %r"%mode)
70
76
71 def check(self, completed):
77 def check(self, completed, failed=None):
78 if failed is not None and not self.success_only:
79 completed = completed.union(failed)
72 if len(self) == 0:
80 if len(self) == 0:
73 return True
81 return True
74 if self.mode == 'all':
82 if self.mode == 'all':
@@ -78,13 +86,26 b' class Dependency(set):'
78 else:
86 else:
79 raise NotImplementedError("Only any|all supported, not %r"%mode)
87 raise NotImplementedError("Only any|all supported, not %r"%mode)
80
88
89 def unreachable(self, failed):
90 if len(self) == 0 or len(failed) == 0 or not self.success_only:
91 return False
92 print self, self.success_only, self.mode, failed
93 if self.mode == 'all':
94 return not self.isdisjoint(failed)
95 elif self.mode == 'any':
96 return self.issubset(failed)
97 else:
98 raise NotImplementedError("Only any|all supported, not %r"%mode)
99
100
81 def as_dict(self):
101 def as_dict(self):
82 """Represent this dependency as a dict. For json compatibility."""
102 """Represent this dependency as a dict. For json compatibility."""
83 return dict(
103 return dict(
84 dependencies=list(self),
104 dependencies=list(self),
85 mode=self.mode
105 mode=self.mode,
106 success_only=self.success_only,
86 )
107 )
87
108
88
109
89 __all__ = ['UnmetDependency', 'depend', 'require', 'Dependency']
110 __all__ = ['depend', 'require', 'Dependency']
90
111
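The key additions to `Dependency` are the `success_only` flag and the `unreachable()` test: `check()` decides whether a dependency is currently met given what has finished, while `unreachable()` decides whether it can never be met given what has failed. A standalone sketch of those semantics, mirroring the logic above minus the dict round-tripping:

```python
class Dependency(set):
    """Standalone sketch of the Dependency semantics in this diff."""
    def __init__(self, dependencies=(), mode='all', success_only=True):
        set.__init__(self, dependencies)
        self.mode = mode
        self.success_only = success_only

    def check(self, completed, failed=None):
        """Is the dependency met? Optionally count failed jobs as finished."""
        if failed is not None and not self.success_only:
            completed = completed.union(failed)
        if len(self) == 0:
            return True
        if self.mode == 'all':
            return self.issubset(completed)
        return not self.isdisjoint(completed)   # 'any'

    def unreachable(self, failed):
        """Can this dependency never be met, given these failures?"""
        if len(self) == 0 or len(failed) == 0 or not self.success_only:
            return False
        if self.mode == 'all':
            return not self.isdisjoint(failed)  # a required job already failed
        return self.issubset(failed)            # 'any': every candidate failed

completed, failed = {'a'}, {'b'}
dep = Dependency(['a', 'b'], mode='all')
print(dep.check(completed, failed))        # False: 'b' did not succeed
print(dep.unreachable(failed))             # True: 'b' failed and success is required

relaxed = Dependency(['a', 'b'], mode='all', success_only=False)
print(relaxed.check(completed, failed))    # True: failure counts as finished
print(relaxed.unreachable(failed))         # False
```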
@@ -148,6 +148,12 b' class FileTimeoutError(KernelError):'
148 class TimeoutError(KernelError):
148 class TimeoutError(KernelError):
149 pass
149 pass
150
150
151 class UnmetDependency(KernelError):
152 pass
153
154 class ImpossibleDependency(UnmetDependency):
155 pass
156
151 class RemoteError(KernelError):
157 class RemoteError(KernelError):
152 """Error raised elsewhere"""
158 """Error raised elsewhere"""
153 ename=None
159 ename=None
@@ -27,6 +27,7 b' from IPython.external.decorator import decorator'
27 from IPython.config.configurable import Configurable
27 from IPython.config.configurable import Configurable
28 from IPython.utils.traitlets import Instance, Dict, List, Set
28 from IPython.utils.traitlets import Instance, Dict, List, Set
29
29
30 import error
30 from client import Client
31 from client import Client
31 from dependency import Dependency
32 from dependency import Dependency
32 import streamsession as ss
33 import streamsession as ss
@@ -104,6 +105,9 b' def leastload(loads):'
104 #---------------------------------------------------------------------
105 #---------------------------------------------------------------------
105 # Classes
106 # Classes
106 #---------------------------------------------------------------------
107 #---------------------------------------------------------------------
108 # store empty default dependency:
109 MET = Dependency([])
110
107 class TaskScheduler(Configurable):
111 class TaskScheduler(Configurable):
108 """Python TaskScheduler object.
112 """Python TaskScheduler object.
109
113
@@ -126,10 +130,14 b' class TaskScheduler(Configurable):'
126 depending = Dict() # dict by msg_id of (msg_id, raw_msg, after, follow)
130 depending = Dict() # dict by msg_id of (msg_id, raw_msg, after, follow)
127 pending = Dict() # dict by engine_uuid of submitted tasks
131 pending = Dict() # dict by engine_uuid of submitted tasks
128 completed = Dict() # dict by engine_uuid of completed tasks
132 completed = Dict() # dict by engine_uuid of completed tasks
133 failed = Dict() # dict by engine_uuid of failed tasks
134 destinations = Dict() # dict by msg_id of engine_uuids where jobs ran (reverse of completed+failed)
129 clients = Dict() # dict by msg_id for who submitted the task
135 clients = Dict() # dict by msg_id for who submitted the task
130 targets = List() # list of target IDENTs
136 targets = List() # list of target IDENTs
131 loads = List() # list of engine loads
137 loads = List() # list of engine loads
132 all_done = Set() # set of all completed tasks
138 all_completed = Set() # set of all completed tasks
139 all_failed = Set() # set of all failed tasks
140 all_done = Set() # set of all finished tasks=union(completed,failed)
133 blacklist = Dict() # dict by msg_id of locations where a job has encountered UnmetDependency
141 blacklist = Dict() # dict by msg_id of locations where a job has encountered UnmetDependency
134 session = Instance(ss.StreamSession)
142 session = Instance(ss.StreamSession)
135
143
@@ -182,6 +190,7 b' class TaskScheduler(Configurable):'
182 self.loads.insert(0,0)
190 self.loads.insert(0,0)
183 # initialize sets
191 # initialize sets
184 self.completed[uid] = set()
192 self.completed[uid] = set()
193 self.failed[uid] = set()
185 self.pending[uid] = {}
194 self.pending[uid] = {}
186 if len(self.targets) == 1:
195 if len(self.targets) == 1:
187 self.resume_receiving()
196 self.resume_receiving()
@@ -196,6 +205,11 b' class TaskScheduler(Configurable):'
196 self.engine_stream.flush()
205 self.engine_stream.flush()
197
206
198 self.completed.pop(uid)
207 self.completed.pop(uid)
208 self.failed.pop(uid)
209 # don't pop destinations, because it might be used later
210 # map(self.destinations.pop, self.completed.pop(uid))
211 # map(self.destinations.pop, self.failed.pop(uid))
212
199 lost = self.pending.pop(uid)
213 lost = self.pending.pop(uid)
200
214
201 idx = self.targets.index(uid)
215 idx = self.targets.index(uid)
@@ -235,15 +249,23 b' class TaskScheduler(Configurable):'
235 # time dependencies
249 # time dependencies
236 after = Dependency(header.get('after', []))
250 after = Dependency(header.get('after', []))
237 if after.mode == 'all':
251 if after.mode == 'all':
238 after.difference_update(self.all_done)
252 after.difference_update(self.all_completed)
239 if after.check(self.all_done):
253 if not after.success_only:
254 after.difference_update(self.all_failed)
255 if after.check(self.all_completed, self.all_failed):
240 # recast as empty set, if `after` already met,
256 # recast as empty set, if `after` already met,
241 # to prevent unnecessary set comparisons
257 # to prevent unnecessary set comparisons
242 after = Dependency([])
258 after = MET
243
259
244 # location dependencies
260 # location dependencies
245 follow = Dependency(header.get('follow', []))
261 follow = Dependency(header.get('follow', []))
246 if len(after) == 0:
262
263 # check if unreachable:
264 if after.unreachable(self.all_failed) or follow.unreachable(self.all_failed):
265 self.depending[msg_id] = [raw_msg,MET,MET]
266 return self.fail_unreachable(msg_id)
267
268 if after.check(self.all_completed, self.all_failed):
247 # time deps already met, try to run
269 # time deps already met, try to run
248 if not self.maybe_run(msg_id, raw_msg, follow):
270 if not self.maybe_run(msg_id, raw_msg, follow):
249 # can't run yet
271 # can't run yet
@@ -252,6 +274,35 b' class TaskScheduler(Configurable):'
252 self.save_unmet(msg_id, raw_msg, after, follow)
274 self.save_unmet(msg_id, raw_msg, after, follow)
253
275
254 @logged
276 @logged
277 def fail_unreachable(self, msg_id):
278 """a message has become unreachable"""
279 if msg_id not in self.depending:
280 logging.error("msg %r already failed!"%msg_id)
281 return
282 raw_msg, after, follow = self.depending.pop(msg_id)
283 for mid in follow.union(after):
284 if mid in self.dependencies:
285 self.dependencies[mid].remove(msg_id)
286
287 idents,msg = self.session.feed_identities(raw_msg, copy=False)
288 msg = self.session.unpack_message(msg, copy=False, content=False)
289 header = msg['header']
290
291 try:
292 raise error.ImpossibleDependency()
293 except:
294 content = ss.wrap_exception()
295
296 self.all_done.add(msg_id)
297 self.all_failed.add(msg_id)
298
299 msg = self.session.send(self.client_stream, 'apply_reply', content,
300 parent=header, ident=idents)
301 self.session.send(self.mon_stream, msg, ident=['outtask']+idents)
302
303 self.update_dependencies(msg_id, success=False)
304
305 @logged
255 def maybe_run(self, msg_id, raw_msg, follow=None):
306 def maybe_run(self, msg_id, raw_msg, follow=None):
256 """check location dependencies, and run if they are met."""
307 """check location dependencies, and run if they are met."""
257
308
@@ -259,10 +310,20 b' class TaskScheduler(Configurable):'
259 def can_run(idx):
310 def can_run(idx):
260 target = self.targets[idx]
311 target = self.targets[idx]
261 return target not in self.blacklist.get(msg_id, []) and\
312 return target not in self.blacklist.get(msg_id, []) and\
262 follow.check(self.completed[target])
313 follow.check(self.completed[target], self.failed[target])
263
314
264 indices = filter(can_run, range(len(self.targets)))
315 indices = filter(can_run, range(len(self.targets)))
265 if not indices:
316 if not indices:
317 # TODO evaluate unmeetable follow dependencies
318 if follow.mode == 'all':
319 dests = set()
320 relevant = self.all_completed if follow.success_only else self.all_done
321 for m in follow.intersection(relevant):
322 dests.add(self.destinations[m])
323 if len(dests) > 1:
324 self.fail_unreachable(msg_id)
325
326
266 return False
327 return False
267 else:
328 else:
268 indices = None
329 indices = None
@@ -271,10 +332,10 b' class TaskScheduler(Configurable):'
271 return True
332 return True
272
333
273 @logged
334 @logged
274 def save_unmet(self, msg_id, msg, after, follow):
335 def save_unmet(self, msg_id, raw_msg, after, follow):
275 """Save a message for later submission when its dependencies are met."""
336 """Save a message for later submission when its dependencies are met."""
276 self.depending[msg_id] = (msg_id,msg,after,follow)
337 self.depending[msg_id] = [raw_msg,after,follow]
277 # track the ids in both follow/after, but not those already completed
338 # track the ids in follow or after, but not those already finished
278 for dep_id in after.union(follow).difference(self.all_done):
339 for dep_id in after.union(follow).difference(self.all_done):
279 if dep_id not in self.dependencies:
340 if dep_id not in self.dependencies:
280 self.dependencies[dep_id] = set()
341 self.dependencies[dep_id] = set()
@@ -313,14 +374,15 b' class TaskScheduler(Configurable):'
313 msg = self.session.unpack_message(msg, content=False, copy=False)
374 msg = self.session.unpack_message(msg, content=False, copy=False)
314 header = msg['header']
375 header = msg['header']
315 if header.get('dependencies_met', True):
376 if header.get('dependencies_met', True):
316 self.handle_result_success(idents, msg['parent_header'], raw_msg)
377 success = (header['status'] == 'ok')
317 # send to monitor
378 self.handle_result(idents, msg['parent_header'], raw_msg, success)
379 # send to Hub monitor
318 self.mon_stream.send_multipart(['outtask']+raw_msg, copy=False)
380 self.mon_stream.send_multipart(['outtask']+raw_msg, copy=False)
319 else:
381 else:
320 self.handle_unmet_dependency(idents, msg['parent_header'])
382 self.handle_unmet_dependency(idents, msg['parent_header'])
321
383
322 @logged
384 @logged
323 def handle_result_success(self, idents, parent, raw_msg):
385 def handle_result(self, idents, parent, raw_msg, success=True):
324 # first, relay result to client
386 # first, relay result to client
325 engine = idents[0]
387 engine = idents[0]
326 client = idents[1]
388 client = idents[1]
@@ -331,10 +393,16 b' class TaskScheduler(Configurable):'
331 # now, update our data structures
393 # now, update our data structures
332 msg_id = parent['msg_id']
394 msg_id = parent['msg_id']
333 self.pending[engine].pop(msg_id)
395 self.pending[engine].pop(msg_id)
334 self.completed[engine].add(msg_id)
396 if success:
397 self.completed[engine].add(msg_id)
398 self.all_completed.add(msg_id)
399 else:
400 self.failed[engine].add(msg_id)
401 self.all_failed.add(msg_id)
335 self.all_done.add(msg_id)
402 self.all_done.add(msg_id)
403 self.destinations[msg_id] = engine
336
404
337 self.update_dependencies(msg_id)
405 self.update_dependencies(msg_id, success)
338
406
339 @logged
407 @logged
340 def handle_unmet_dependency(self, idents, parent):
408 def handle_unmet_dependency(self, idents, parent):
@@ -346,24 +414,39 b' class TaskScheduler(Configurable):'
346 raw_msg,follow = self.pending[engine].pop(msg_id)
414 raw_msg,follow = self.pending[engine].pop(msg_id)
347 if not self.maybe_run(msg_id, raw_msg, follow):
415 if not self.maybe_run(msg_id, raw_msg, follow):
348 # resubmit failed, put it back in our dependency tree
416 # resubmit failed, put it back in our dependency tree
349 self.save_unmet(msg_id, raw_msg, Dependency(), follow)
417 self.save_unmet(msg_id, raw_msg, MET, follow)
350 pass
418 pass
419
351 @logged
420 @logged
352 def update_dependencies(self, dep_id):
421 def update_dependencies(self, dep_id, success=True):
353 """dep_id just finished. Update our dependency
422 """dep_id just finished. Update our dependency
354 table and submit any jobs that just became runable."""
423 table and submit any jobs that just became runable."""
355
424 # print ("\n\n***********")
425 # pprint (dep_id)
426 # pprint (self.dependencies)
427 # pprint (self.depending)
428 # pprint (self.all_completed)
429 # pprint (self.all_failed)
430 # print ("\n\n***********\n\n")
356 if dep_id not in self.dependencies:
431 if dep_id not in self.dependencies:
357 return
432 return
358 jobs = self.dependencies.pop(dep_id)
433 jobs = self.dependencies.pop(dep_id)
359 for job in jobs:
434
360 msg_id, raw_msg, after, follow = self.depending[job]
435 for msg_id in jobs:
361 if dep_id in after:
436 raw_msg, after, follow = self.depending[msg_id]
362 after.remove(dep_id)
437 # if dep_id in after:
363 if not after: # time deps met, maybe run
438 # if after.mode == 'all' and (success or not after.success_only):
439 # after.remove(dep_id)
440
441 if after.unreachable(self.all_failed) or follow.unreachable(self.all_failed):
442 self.fail_unreachable(msg_id)
443
444 elif after.check(self.all_completed, self.all_failed): # time deps met, maybe run
445 self.depending[msg_id][1] = MET
364 if self.maybe_run(msg_id, raw_msg, follow):
446 if self.maybe_run(msg_id, raw_msg, follow):
365 self.depending.pop(job)
447
366 for mid in follow:
448 self.depending.pop(msg_id)
449 for mid in follow.union(after):
367 if mid in self.dependencies:
450 if mid in self.dependencies:
368 self.dependencies[mid].remove(msg_id)
451 self.dependencies[mid].remove(msg_id)
369
452
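With the new `all_completed`/`all_failed`/`all_done` bookkeeping, every finished task makes `update_dependencies` choose one of three outcomes for each job waiting on it: fail it (a dependency became unreachable), try to run it (time deps met, with location deps left to `maybe_run`), or keep it queued. A compressed sketch of that decision for `mode='all'` time dependencies, with plain sets standing in for the `Dependency` class:

```python
def resolve_waiting(job, all_completed, all_failed, success_only=True):
    """Return 'fail', 'run', or 'wait' for a job with 'all'-mode time deps.

    Stand-in for TaskScheduler.update_dependencies plus Dependency.check/
    unreachable, reduced to mode='all' for brevity.
    """
    after = set(job['after'])
    finished = all_completed if success_only else (all_completed | all_failed)
    if success_only and (after & all_failed):
        return 'fail'   # ImpossibleDependency: a required job failed
    if after <= finished:
        return 'run'    # time deps met; maybe_run then checks follow deps
    return 'wait'

all_completed, all_failed = {'t1'}, {'t2'}
print(resolve_waiting({'after': ['t1']},       all_completed, all_failed))  # run
print(resolve_waiting({'after': ['t1', 't2']}, all_completed, all_failed))  # fail
print(resolve_waiting({'after': ['t1', 't3']}, all_completed, all_failed))  # wait
print(resolve_waiting({'after': ['t1', 't2']}, all_completed, all_failed,
                      success_only=False))                                  # run
```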
@@ -34,7 +34,6 b' from IPython.zmq.displayhook import DisplayHook'
34 from factory import SessionFactory
34 from factory import SessionFactory
35 from streamsession import StreamSession, Message, extract_header, serialize_object,\
35 from streamsession import StreamSession, Message, extract_header, serialize_object,\
36 unpack_apply_message, ISO8601, wrap_exception
36 unpack_apply_message, ISO8601, wrap_exception
37 from dependency import UnmetDependency
38 import heartmonitor
37 import heartmonitor
39 from client import Client
38 from client import Client
40
39
@@ -266,9 +265,7 b' class Kernel(SessionFactory):'
266 reply_content = exc_content
265 reply_content = exc_content
267 else:
266 else:
268 reply_content = {'status' : 'ok'}
267 reply_content = {'status' : 'ok'}
269 # reply_msg = self.session.msg(u'execute_reply', reply_content, parent)
268
270 # self.reply_socket.send(ident, zmq.SNDMORE)
271 # self.reply_socket.send_json(reply_msg)
272 reply_msg = self.session.send(stream, u'execute_reply', reply_content, parent=parent,
269 reply_msg = self.session.send(stream, u'execute_reply', reply_content, parent=parent,
273 ident=ident, subheader = dict(started=started))
270 ident=ident, subheader = dict(started=started))
274 logging.debug(str(reply_msg))
271 logging.debug(str(reply_msg))
@@ -317,10 +314,7 b' class Kernel(SessionFactory):'
317 suffix = prefix = "_" # prevent keyword collisions with lambda
314 suffix = prefix = "_" # prevent keyword collisions with lambda
318 f,args,kwargs = unpack_apply_message(bufs, working, copy=False)
315 f,args,kwargs = unpack_apply_message(bufs, working, copy=False)
319 # if f.fun
316 # if f.fun
320 if hasattr(f, 'func_name'):
317 fname = getattr(f, '__name__', 'f')
321 fname = f.func_name
322 else:
323 fname = f.__name__
324
318
325 fname = prefix+fname.strip('<>')+suffix
319 fname = prefix+fname.strip('<>')+suffix
326 argname = prefix+"args"+suffix
320 argname = prefix+"args"+suffix
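The `getattr(f, '__name__', 'f')` one-liner covers plain functions, lambdas, and the `dependent` wrapper (which now exposes `__name__` as a property), and quietly falls back to `'f'` for callables with no name at all. For example (`resolve_fname` is just a wrapper for illustration):

```python
import functools

def resolve_fname(f):
    # mirrors the one-liner in this hunk
    return getattr(f, '__name__', 'f')

def work(x):
    return x * 2

print(resolve_fname(work))                        # 'work'
print(resolve_fname(lambda x: x))                 # '<lambda>'
print(resolve_fname(functools.partial(work, 2)))  # 'f': fallback for nameless callables
```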
@@ -350,16 +344,17 b' class Kernel(SessionFactory):'
350 reply_content = exc_content
344 reply_content = exc_content
351 result_buf = []
345 result_buf = []
352
346
353 if exc_content['ename'] == UnmetDependency.__name__:
347 if exc_content['ename'] == 'UnmetDependency':
354 sub['dependencies_met'] = False
348 sub['dependencies_met'] = False
355 else:
349 else:
356 reply_content = {'status' : 'ok'}
350 reply_content = {'status' : 'ok'}
357 # reply_msg = self.session.msg(u'execute_reply', reply_content, parent)
351
358 # self.reply_socket.send(ident, zmq.SNDMORE)
352 # put 'ok'/'error' status in header, for scheduler introspection:
359 # self.reply_socket.send_json(reply_msg)
353 sub['status'] = reply_content['status']
354
360 reply_msg = self.session.send(stream, u'apply_reply', reply_content,
355 reply_msg = self.session.send(stream, u'apply_reply', reply_content,
361 parent=parent, ident=ident,buffers=result_buf, subheader=sub)
356 parent=parent, ident=ident,buffers=result_buf, subheader=sub)
362 # print(Message(reply_msg), file=sys.__stdout__)
357
363 # if reply_msg['content']['status'] == u'error':
358 # if reply_msg['content']['status'] == u'error':
364 # self.abort_queues()
359 # self.abort_queues()
365
360
@@ -400,13 +395,11 b' class Kernel(SessionFactory):'
400 return dispatcher
395 return dispatcher
401
396
402 for s in self.shell_streams:
397 for s in self.shell_streams:
403 # s.on_recv(printer)
404 s.on_recv(make_dispatcher(s), copy=False)
398 s.on_recv(make_dispatcher(s), copy=False)
405 # s.on_err(printer)
399 s.on_err(printer)
406
400
407 if self.iopub_stream:
401 if self.iopub_stream:
408 self.iopub_stream.on_err(printer)
402 self.iopub_stream.on_err(printer)
409 # self.iopub_stream.on_send(printer)
410
403
411 #### while True mode:
404 #### while True mode:
412 # while True:
405 # while True:
@@ -399,12 +399,12 b' class StreamSession(object):'
399 stream.send(b, flag, copy=False)
399 stream.send(b, flag, copy=False)
400 if buffers:
400 if buffers:
401 stream.send(buffers[-1], copy=False)
401 stream.send(buffers[-1], copy=False)
402 omsg = Message(msg)
402 # omsg = Message(msg)
403 if self.debug:
403 if self.debug:
404 pprint.pprint(omsg)
404 pprint.pprint(msg)
405 pprint.pprint(to_send)
405 pprint.pprint(to_send)
406 pprint.pprint(buffers)
406 pprint.pprint(buffers)
407 return omsg
407 return msg
408
408
409 def send_raw(self, stream, msg, flags=0, copy=True, ident=None):
409 def send_raw(self, stream, msg, flags=0, copy=True, ident=None):
410 """Send a raw message via ident path.
410 """Send a raw message via ident path.