From f1b3eb2310cf565e5ee871081d7bd9e260bafa97 2011-04-08 00:38:09 From: MinRK Date: 2011-04-08 00:38:09 Subject: [PATCH] quiet down scheduler printing, fix dep_id check in update_dependencies --- diff --git a/IPython/zmq/parallel/scheduler.py b/IPython/zmq/parallel/scheduler.py index 0b17552..7adb023 100644 --- a/IPython/zmq/parallel/scheduler.py +++ b/IPython/zmq/parallel/scheduler.py @@ -30,9 +30,9 @@ from IPython.external.decorator import decorator @decorator def logged(f,self,*args,**kwargs): - print ("#--------------------") - print ("%s(*%s,**%s)"%(f.func_name, args, kwargs)) - print ("#--") + # print ("#--------------------") + # print ("%s(*%s,**%s)"%(f.func_name, args, kwargs)) + # print ("#--") return f(self,*args, **kwargs) #---------------------------------------------------------------------- @@ -234,6 +234,9 @@ class TaskScheduler(object): logger.error("task::Invaid msg: %s"%msg) return + # send to monitor + self.mon_stream.send_multipart(['intask']+raw_msg, copy=False) + msg = self.session.unpack_message(msg, content=False, copy=False) header = msg['header'] msg_id = header['msg_id'] @@ -256,8 +259,6 @@ class TaskScheduler(object): self.save_unmet(msg_id, raw_msg, after, follow) else: self.save_unmet(msg_id, raw_msg, after, follow) - # send to monitor - self.mon_stream.send_multipart(['intask']+raw_msg, copy=False) @logged def maybe_run(self, msg_id, raw_msg, follow=None): @@ -284,7 +285,6 @@ class TaskScheduler(object): self.depending[msg_id] = (msg_id,msg,after,follow) # track the ids in both follow/after, but not those already completed for dep_id in after.union(follow).difference(self.all_done): - print (dep_id) if dep_id not in self.dependencies: self.dependencies[dep_id] = set() self.dependencies[dep_id].add(msg_id) @@ -300,7 +300,7 @@ class TaskScheduler(object): if indices: idx = indices[idx] target = self.targets[idx] - print (target, map(str, msg[:3])) + # print (target, map(str, msg[:3])) self.engine_stream.send(target, flags=zmq.SNDMORE, copy=False) self.engine_stream.send_multipart(msg, copy=False) self.add_job(idx) @@ -334,7 +334,7 @@ class TaskScheduler(object): client = idents[1] # swap_ids for XREP-XREP mirror raw_msg[:2] = [client,engine] - print (map(str, raw_msg[:4])) + # print (map(str, raw_msg[:4])) self.client_stream.send_multipart(raw_msg, copy=False) # now, update our data structures msg_id = parent['msg_id'] @@ -359,14 +359,15 @@ class TaskScheduler(object): def update_dependencies(self, dep_id): """dep_id just finished. Update our dependency table and submit any jobs that just became runable.""" + if dep_id not in self.dependencies: return jobs = self.dependencies.pop(dep_id) for job in jobs: msg_id, raw_msg, after, follow = self.depending[job] - if msg_id in after: - after.remove(msg_id) - if not after: # time deps met + if dep_id in after: + after.remove(dep_id) + if not after: # time deps met, maybe run if self.maybe_run(msg_id, raw_msg, follow): self.depending.pop(job) for mid in follow: