Show More
@@ -30,9 +30,9 b' from IPython.external.decorator import decorator' | |||
|
30 | 30 | |
|
31 | 31 | @decorator |
|
32 | 32 | def logged(f,self,*args,**kwargs): |
|
33 | print ("#--------------------") | |
|
34 | print ("%s(*%s,**%s)"%(f.func_name, args, kwargs)) | |
|
35 | print ("#--") | |
|
33 | # print ("#--------------------") | |
|
34 | # print ("%s(*%s,**%s)"%(f.func_name, args, kwargs)) | |
|
35 | # print ("#--") | |
|
36 | 36 | return f(self,*args, **kwargs) |
|
37 | 37 | |
|
38 | 38 | #---------------------------------------------------------------------- |
@@ -234,6 +234,9 b' class TaskScheduler(object):' | |||
|
234 | 234 | logger.error("task::Invaid msg: %s"%msg) |
|
235 | 235 | return |
|
236 | 236 | |
|
237 | # send to monitor | |
|
238 | self.mon_stream.send_multipart(['intask']+raw_msg, copy=False) | |
|
239 | ||
|
237 | 240 | msg = self.session.unpack_message(msg, content=False, copy=False) |
|
238 | 241 | header = msg['header'] |
|
239 | 242 | msg_id = header['msg_id'] |
@@ -256,8 +259,6 b' class TaskScheduler(object):' | |||
|
256 | 259 | self.save_unmet(msg_id, raw_msg, after, follow) |
|
257 | 260 | else: |
|
258 | 261 | self.save_unmet(msg_id, raw_msg, after, follow) |
|
259 | # send to monitor | |
|
260 | self.mon_stream.send_multipart(['intask']+raw_msg, copy=False) | |
|
261 | 262 | |
|
262 | 263 | @logged |
|
263 | 264 | def maybe_run(self, msg_id, raw_msg, follow=None): |
@@ -284,7 +285,6 b' class TaskScheduler(object):' | |||
|
284 | 285 | self.depending[msg_id] = (msg_id,msg,after,follow) |
|
285 | 286 | # track the ids in both follow/after, but not those already completed |
|
286 | 287 | for dep_id in after.union(follow).difference(self.all_done): |
|
287 | print (dep_id) | |
|
288 | 288 | if dep_id not in self.dependencies: |
|
289 | 289 | self.dependencies[dep_id] = set() |
|
290 | 290 | self.dependencies[dep_id].add(msg_id) |
@@ -300,7 +300,7 b' class TaskScheduler(object):' | |||
|
300 | 300 | if indices: |
|
301 | 301 | idx = indices[idx] |
|
302 | 302 | target = self.targets[idx] |
|
303 | print (target, map(str, msg[:3])) | |
|
303 | # print (target, map(str, msg[:3])) | |
|
304 | 304 | self.engine_stream.send(target, flags=zmq.SNDMORE, copy=False) |
|
305 | 305 | self.engine_stream.send_multipart(msg, copy=False) |
|
306 | 306 | self.add_job(idx) |
@@ -334,7 +334,7 b' class TaskScheduler(object):' | |||
|
334 | 334 | client = idents[1] |
|
335 | 335 | # swap_ids for XREP-XREP mirror |
|
336 | 336 | raw_msg[:2] = [client,engine] |
|
337 | print (map(str, raw_msg[:4])) | |
|
337 | # print (map(str, raw_msg[:4])) | |
|
338 | 338 | self.client_stream.send_multipart(raw_msg, copy=False) |
|
339 | 339 | # now, update our data structures |
|
340 | 340 | msg_id = parent['msg_id'] |
@@ -359,14 +359,15 b' class TaskScheduler(object):' | |||
|
359 | 359 | def update_dependencies(self, dep_id): |
|
360 | 360 | """dep_id just finished. Update our dependency |
|
361 | 361 | table and submit any jobs that just became runable.""" |
|
362 | ||
|
362 | 363 | if dep_id not in self.dependencies: |
|
363 | 364 | return |
|
364 | 365 | jobs = self.dependencies.pop(dep_id) |
|
365 | 366 | for job in jobs: |
|
366 | 367 | msg_id, raw_msg, after, follow = self.depending[job] |
|
367 |
if |
|
|
368 |
after.remove( |
|
|
369 | if not after: # time deps met | |
|
368 | if dep_id in after: | |
|
369 | after.remove(dep_id) | |
|
370 | if not after: # time deps met, maybe run | |
|
370 | 371 | if self.maybe_run(msg_id, raw_msg, follow): |
|
371 | 372 | self.depending.pop(job) |
|
372 | 373 | for mid in follow: |
General Comments 0
You need to be logged in to leave comments.
Login now