##// END OF EJS Templates
quiet down scheduler printing, fix dep_id check in update_dependencies
MinRK -
Show More
@@ -30,9 +30,9 b' from IPython.external.decorator import decorator'
30
30
31 @decorator
31 @decorator
32 def logged(f,self,*args,**kwargs):
32 def logged(f,self,*args,**kwargs):
33 print ("#--------------------")
33 # print ("#--------------------")
34 print ("%s(*%s,**%s)"%(f.func_name, args, kwargs))
34 # print ("%s(*%s,**%s)"%(f.func_name, args, kwargs))
35 print ("#--")
35 # print ("#--")
36 return f(self,*args, **kwargs)
36 return f(self,*args, **kwargs)
37
37
38 #----------------------------------------------------------------------
38 #----------------------------------------------------------------------
@@ -234,6 +234,9 b' class TaskScheduler(object):'
234 logger.error("task::Invaid msg: %s"%msg)
234 logger.error("task::Invaid msg: %s"%msg)
235 return
235 return
236
236
237 # send to monitor
238 self.mon_stream.send_multipart(['intask']+raw_msg, copy=False)
239
237 msg = self.session.unpack_message(msg, content=False, copy=False)
240 msg = self.session.unpack_message(msg, content=False, copy=False)
238 header = msg['header']
241 header = msg['header']
239 msg_id = header['msg_id']
242 msg_id = header['msg_id']
@@ -256,8 +259,6 b' class TaskScheduler(object):'
256 self.save_unmet(msg_id, raw_msg, after, follow)
259 self.save_unmet(msg_id, raw_msg, after, follow)
257 else:
260 else:
258 self.save_unmet(msg_id, raw_msg, after, follow)
261 self.save_unmet(msg_id, raw_msg, after, follow)
259 # send to monitor
260 self.mon_stream.send_multipart(['intask']+raw_msg, copy=False)
261
262
262 @logged
263 @logged
263 def maybe_run(self, msg_id, raw_msg, follow=None):
264 def maybe_run(self, msg_id, raw_msg, follow=None):
@@ -284,7 +285,6 b' class TaskScheduler(object):'
284 self.depending[msg_id] = (msg_id,msg,after,follow)
285 self.depending[msg_id] = (msg_id,msg,after,follow)
285 # track the ids in both follow/after, but not those already completed
286 # track the ids in both follow/after, but not those already completed
286 for dep_id in after.union(follow).difference(self.all_done):
287 for dep_id in after.union(follow).difference(self.all_done):
287 print (dep_id)
288 if dep_id not in self.dependencies:
288 if dep_id not in self.dependencies:
289 self.dependencies[dep_id] = set()
289 self.dependencies[dep_id] = set()
290 self.dependencies[dep_id].add(msg_id)
290 self.dependencies[dep_id].add(msg_id)
@@ -300,7 +300,7 b' class TaskScheduler(object):'
300 if indices:
300 if indices:
301 idx = indices[idx]
301 idx = indices[idx]
302 target = self.targets[idx]
302 target = self.targets[idx]
303 print (target, map(str, msg[:3]))
303 # print (target, map(str, msg[:3]))
304 self.engine_stream.send(target, flags=zmq.SNDMORE, copy=False)
304 self.engine_stream.send(target, flags=zmq.SNDMORE, copy=False)
305 self.engine_stream.send_multipart(msg, copy=False)
305 self.engine_stream.send_multipart(msg, copy=False)
306 self.add_job(idx)
306 self.add_job(idx)
@@ -334,7 +334,7 b' class TaskScheduler(object):'
334 client = idents[1]
334 client = idents[1]
335 # swap_ids for XREP-XREP mirror
335 # swap_ids for XREP-XREP mirror
336 raw_msg[:2] = [client,engine]
336 raw_msg[:2] = [client,engine]
337 print (map(str, raw_msg[:4]))
337 # print (map(str, raw_msg[:4]))
338 self.client_stream.send_multipart(raw_msg, copy=False)
338 self.client_stream.send_multipart(raw_msg, copy=False)
339 # now, update our data structures
339 # now, update our data structures
340 msg_id = parent['msg_id']
340 msg_id = parent['msg_id']
@@ -359,14 +359,15 b' class TaskScheduler(object):'
359 def update_dependencies(self, dep_id):
359 def update_dependencies(self, dep_id):
360 """dep_id just finished. Update our dependency
360 """dep_id just finished. Update our dependency
361 table and submit any jobs that just became runable."""
361 table and submit any jobs that just became runable."""
362
362 if dep_id not in self.dependencies:
363 if dep_id not in self.dependencies:
363 return
364 return
364 jobs = self.dependencies.pop(dep_id)
365 jobs = self.dependencies.pop(dep_id)
365 for job in jobs:
366 for job in jobs:
366 msg_id, raw_msg, after, follow = self.depending[job]
367 msg_id, raw_msg, after, follow = self.depending[job]
367 if msg_id in after:
368 if dep_id in after:
368 after.remove(msg_id)
369 after.remove(dep_id)
369 if not after: # time deps met
370 if not after: # time deps met, maybe run
370 if self.maybe_run(msg_id, raw_msg, follow):
371 if self.maybe_run(msg_id, raw_msg, follow):
371 self.depending.pop(job)
372 self.depending.pop(job)
372 for mid in follow:
373 for mid in follow:
General Comments 0
You need to be logged in to leave comments. Login now