Show More
@@ -30,9 +30,9 b' from IPython.external.decorator import decorator' | |||||
30 |
|
30 | |||
31 | @decorator |
|
31 | @decorator | |
32 | def logged(f,self,*args,**kwargs): |
|
32 | def logged(f,self,*args,**kwargs): | |
33 | print ("#--------------------") |
|
33 | # print ("#--------------------") | |
34 | print ("%s(*%s,**%s)"%(f.func_name, args, kwargs)) |
|
34 | # print ("%s(*%s,**%s)"%(f.func_name, args, kwargs)) | |
35 | print ("#--") |
|
35 | # print ("#--") | |
36 | return f(self,*args, **kwargs) |
|
36 | return f(self,*args, **kwargs) | |
37 |
|
37 | |||
38 | #---------------------------------------------------------------------- |
|
38 | #---------------------------------------------------------------------- | |
@@ -234,6 +234,9 b' class TaskScheduler(object):' | |||||
234 | logger.error("task::Invaid msg: %s"%msg) |
|
234 | logger.error("task::Invaid msg: %s"%msg) | |
235 | return |
|
235 | return | |
236 |
|
236 | |||
|
237 | # send to monitor | |||
|
238 | self.mon_stream.send_multipart(['intask']+raw_msg, copy=False) | |||
|
239 | ||||
237 | msg = self.session.unpack_message(msg, content=False, copy=False) |
|
240 | msg = self.session.unpack_message(msg, content=False, copy=False) | |
238 | header = msg['header'] |
|
241 | header = msg['header'] | |
239 | msg_id = header['msg_id'] |
|
242 | msg_id = header['msg_id'] | |
@@ -256,8 +259,6 b' class TaskScheduler(object):' | |||||
256 | self.save_unmet(msg_id, raw_msg, after, follow) |
|
259 | self.save_unmet(msg_id, raw_msg, after, follow) | |
257 | else: |
|
260 | else: | |
258 | self.save_unmet(msg_id, raw_msg, after, follow) |
|
261 | self.save_unmet(msg_id, raw_msg, after, follow) | |
259 | # send to monitor |
|
|||
260 | self.mon_stream.send_multipart(['intask']+raw_msg, copy=False) |
|
|||
261 |
|
262 | |||
262 | @logged |
|
263 | @logged | |
263 | def maybe_run(self, msg_id, raw_msg, follow=None): |
|
264 | def maybe_run(self, msg_id, raw_msg, follow=None): | |
@@ -284,7 +285,6 b' class TaskScheduler(object):' | |||||
284 | self.depending[msg_id] = (msg_id,msg,after,follow) |
|
285 | self.depending[msg_id] = (msg_id,msg,after,follow) | |
285 | # track the ids in both follow/after, but not those already completed |
|
286 | # track the ids in both follow/after, but not those already completed | |
286 | for dep_id in after.union(follow).difference(self.all_done): |
|
287 | for dep_id in after.union(follow).difference(self.all_done): | |
287 | print (dep_id) |
|
|||
288 | if dep_id not in self.dependencies: |
|
288 | if dep_id not in self.dependencies: | |
289 | self.dependencies[dep_id] = set() |
|
289 | self.dependencies[dep_id] = set() | |
290 | self.dependencies[dep_id].add(msg_id) |
|
290 | self.dependencies[dep_id].add(msg_id) | |
@@ -300,7 +300,7 b' class TaskScheduler(object):' | |||||
300 | if indices: |
|
300 | if indices: | |
301 | idx = indices[idx] |
|
301 | idx = indices[idx] | |
302 | target = self.targets[idx] |
|
302 | target = self.targets[idx] | |
303 | print (target, map(str, msg[:3])) |
|
303 | # print (target, map(str, msg[:3])) | |
304 | self.engine_stream.send(target, flags=zmq.SNDMORE, copy=False) |
|
304 | self.engine_stream.send(target, flags=zmq.SNDMORE, copy=False) | |
305 | self.engine_stream.send_multipart(msg, copy=False) |
|
305 | self.engine_stream.send_multipart(msg, copy=False) | |
306 | self.add_job(idx) |
|
306 | self.add_job(idx) | |
@@ -334,7 +334,7 b' class TaskScheduler(object):' | |||||
334 | client = idents[1] |
|
334 | client = idents[1] | |
335 | # swap_ids for XREP-XREP mirror |
|
335 | # swap_ids for XREP-XREP mirror | |
336 | raw_msg[:2] = [client,engine] |
|
336 | raw_msg[:2] = [client,engine] | |
337 | print (map(str, raw_msg[:4])) |
|
337 | # print (map(str, raw_msg[:4])) | |
338 | self.client_stream.send_multipart(raw_msg, copy=False) |
|
338 | self.client_stream.send_multipart(raw_msg, copy=False) | |
339 | # now, update our data structures |
|
339 | # now, update our data structures | |
340 | msg_id = parent['msg_id'] |
|
340 | msg_id = parent['msg_id'] | |
@@ -359,14 +359,15 b' class TaskScheduler(object):' | |||||
359 | def update_dependencies(self, dep_id): |
|
359 | def update_dependencies(self, dep_id): | |
360 | """dep_id just finished. Update our dependency |
|
360 | """dep_id just finished. Update our dependency | |
361 | table and submit any jobs that just became runable.""" |
|
361 | table and submit any jobs that just became runable.""" | |
|
362 | ||||
362 | if dep_id not in self.dependencies: |
|
363 | if dep_id not in self.dependencies: | |
363 | return |
|
364 | return | |
364 | jobs = self.dependencies.pop(dep_id) |
|
365 | jobs = self.dependencies.pop(dep_id) | |
365 | for job in jobs: |
|
366 | for job in jobs: | |
366 | msg_id, raw_msg, after, follow = self.depending[job] |
|
367 | msg_id, raw_msg, after, follow = self.depending[job] | |
367 |
if |
|
368 | if dep_id in after: | |
368 |
after.remove( |
|
369 | after.remove(dep_id) | |
369 | if not after: # time deps met |
|
370 | if not after: # time deps met, maybe run | |
370 | if self.maybe_run(msg_id, raw_msg, follow): |
|
371 | if self.maybe_run(msg_id, raw_msg, follow): | |
371 | self.depending.pop(job) |
|
372 | self.depending.pop(job) | |
372 | for mid in follow: |
|
373 | for mid in follow: |
General Comments 0
You need to be logged in to leave comments.
Login now