##// END OF EJS Templates
quiet down scheduler printing, fix dep_id check in update_dependencies
MinRK -
Show More
@@ -30,9 +30,9 b' from IPython.external.decorator import decorator'
30 30
31 31 @decorator
32 32 def logged(f,self,*args,**kwargs):
33 print ("#--------------------")
34 print ("%s(*%s,**%s)"%(f.func_name, args, kwargs))
35 print ("#--")
33 # print ("#--------------------")
34 # print ("%s(*%s,**%s)"%(f.func_name, args, kwargs))
35 # print ("#--")
36 36 return f(self,*args, **kwargs)
37 37
38 38 #----------------------------------------------------------------------
@@ -234,6 +234,9 b' class TaskScheduler(object):'
234 234 logger.error("task::Invaid msg: %s"%msg)
235 235 return
236 236
237 # send to monitor
238 self.mon_stream.send_multipart(['intask']+raw_msg, copy=False)
239
237 240 msg = self.session.unpack_message(msg, content=False, copy=False)
238 241 header = msg['header']
239 242 msg_id = header['msg_id']
@@ -256,8 +259,6 b' class TaskScheduler(object):'
256 259 self.save_unmet(msg_id, raw_msg, after, follow)
257 260 else:
258 261 self.save_unmet(msg_id, raw_msg, after, follow)
259 # send to monitor
260 self.mon_stream.send_multipart(['intask']+raw_msg, copy=False)
261 262
262 263 @logged
263 264 def maybe_run(self, msg_id, raw_msg, follow=None):
@@ -284,7 +285,6 b' class TaskScheduler(object):'
284 285 self.depending[msg_id] = (msg_id,msg,after,follow)
285 286 # track the ids in both follow/after, but not those already completed
286 287 for dep_id in after.union(follow).difference(self.all_done):
287 print (dep_id)
288 288 if dep_id not in self.dependencies:
289 289 self.dependencies[dep_id] = set()
290 290 self.dependencies[dep_id].add(msg_id)
@@ -300,7 +300,7 b' class TaskScheduler(object):'
300 300 if indices:
301 301 idx = indices[idx]
302 302 target = self.targets[idx]
303 print (target, map(str, msg[:3]))
303 # print (target, map(str, msg[:3]))
304 304 self.engine_stream.send(target, flags=zmq.SNDMORE, copy=False)
305 305 self.engine_stream.send_multipart(msg, copy=False)
306 306 self.add_job(idx)
@@ -334,7 +334,7 b' class TaskScheduler(object):'
334 334 client = idents[1]
335 335 # swap_ids for XREP-XREP mirror
336 336 raw_msg[:2] = [client,engine]
337 print (map(str, raw_msg[:4]))
337 # print (map(str, raw_msg[:4]))
338 338 self.client_stream.send_multipart(raw_msg, copy=False)
339 339 # now, update our data structures
340 340 msg_id = parent['msg_id']
@@ -359,14 +359,15 b' class TaskScheduler(object):'
359 359 def update_dependencies(self, dep_id):
360 360 """dep_id just finished. Update our dependency
361 361 table and submit any jobs that just became runable."""
362
362 363 if dep_id not in self.dependencies:
363 364 return
364 365 jobs = self.dependencies.pop(dep_id)
365 366 for job in jobs:
366 367 msg_id, raw_msg, after, follow = self.depending[job]
367 if msg_id in after:
368 after.remove(msg_id)
369 if not after: # time deps met
368 if dep_id in after:
369 after.remove(dep_id)
370 if not after: # time deps met, maybe run
370 371 if self.maybe_run(msg_id, raw_msg, follow):
371 372 self.depending.pop(job)
372 373 for mid in follow:
General Comments 0
You need to be logged in to leave comments. Login now