##// END OF EJS Templates
minor controller logging adjustments...
MinRK -
Show More
@@ -53,6 +53,7 b' class Heart(object):'
53 def start(self):
53 def start(self):
54 return self.device.start()
54 return self.device.start()
55
55
56
56 class HeartMonitor(LoggingConfigurable):
57 class HeartMonitor(LoggingConfigurable):
57 """A basic HeartMonitor class
58 """A basic HeartMonitor class
58 pingstream: a PUB stream
59 pingstream: a PUB stream
@@ -92,12 +93,12 b' class HeartMonitor(LoggingConfigurable):'
92
93
93 def add_new_heart_handler(self, handler):
94 def add_new_heart_handler(self, handler):
94 """add a new handler for new hearts"""
95 """add a new handler for new hearts"""
95 self.log.debug("heartbeat::new_heart_handler: %s"%handler)
96 self.log.debug("heartbeat::new_heart_handler: %s", handler)
96 self._new_handlers.add(handler)
97 self._new_handlers.add(handler)
97
98
98 def add_heart_failure_handler(self, handler):
99 def add_heart_failure_handler(self, handler):
99 """add a new handler for heart failure"""
100 """add a new handler for heart failure"""
100 self.log.debug("heartbeat::new heart failure handler: %s"%handler)
101 self.log.debug("heartbeat::new heart failure handler: %s", handler)
101 self._failure_handlers.add(handler)
102 self._failure_handlers.add(handler)
102
103
103 def beat(self):
104 def beat(self):
@@ -107,7 +108,7 b' class HeartMonitor(LoggingConfigurable):'
107 toc = time.time()
108 toc = time.time()
108 self.lifetime += toc-self.tic
109 self.lifetime += toc-self.tic
109 self.tic = toc
110 self.tic = toc
110 # self.log.debug("heartbeat::%s"%self.lifetime)
111 self.log.debug("heartbeat::sending %s", self.lifetime)
111 goodhearts = self.hearts.intersection(self.responses)
112 goodhearts = self.hearts.intersection(self.responses)
112 missed_beats = self.hearts.difference(goodhearts)
113 missed_beats = self.hearts.difference(goodhearts)
113 heartfailures = self.on_probation.intersection(missed_beats)
114 heartfailures = self.on_probation.intersection(missed_beats)
@@ -117,7 +118,7 b' class HeartMonitor(LoggingConfigurable):'
117 self.on_probation = missed_beats.intersection(self.hearts)
118 self.on_probation = missed_beats.intersection(self.hearts)
118 self.responses = set()
119 self.responses = set()
119 # print self.on_probation, self.hearts
120 # print self.on_probation, self.hearts
120 # self.log.debug("heartbeat::beat %.3f, %i beating hearts"%(self.lifetime, len(self.hearts)))
121 # self.log.debug("heartbeat::beat %.3f, %i beating hearts", self.lifetime, len(self.hearts))
121 self.pingstream.send(asbytes(str(self.lifetime)))
122 self.pingstream.send(asbytes(str(self.lifetime)))
122
123
123 def handle_new_heart(self, heart):
124 def handle_new_heart(self, heart):
@@ -125,7 +126,7 b' class HeartMonitor(LoggingConfigurable):'
125 for handler in self._new_handlers:
126 for handler in self._new_handlers:
126 handler(heart)
127 handler(heart)
127 else:
128 else:
128 self.log.info("heartbeat::yay, got new heart %s!"%heart)
129 self.log.info("heartbeat::yay, got new heart %s!", heart)
129 self.hearts.add(heart)
130 self.hearts.add(heart)
130
131
131 def handle_heart_failure(self, heart):
132 def handle_heart_failure(self, heart):
@@ -134,10 +135,10 b' class HeartMonitor(LoggingConfigurable):'
134 try:
135 try:
135 handler(heart)
136 handler(heart)
136 except Exception as e:
137 except Exception as e:
137 self.log.error("heartbeat::Bad Handler! %s"%handler, exc_info=True)
138 self.log.error("heartbeat::Bad Handler! %s", handler, exc_info=True)
138 pass
139 pass
139 else:
140 else:
140 self.log.info("heartbeat::Heart %s failed :("%heart)
141 self.log.info("heartbeat::Heart %s failed :(", heart)
141 self.hearts.remove(heart)
142 self.hearts.remove(heart)
142
143
143
144
@@ -151,11 +152,10 b' class HeartMonitor(LoggingConfigurable):'
151 self.responses.add(msg[0])
152 self.responses.add(msg[0])
152 elif msg[1] == last:
153 elif msg[1] == last:
153 delta = time.time()-self.tic + (self.lifetime-self.last_ping)
154 delta = time.time()-self.tic + (self.lifetime-self.last_ping)
154 self.log.warn("heartbeat::heart %r missed a beat, and took %.2f ms to respond"%(msg[0], 1000*delta))
155 self.log.warn("heartbeat::heart %r missed a beat, and took %.2f ms to respond", msg[0], 1000*delta)
155 self.responses.add(msg[0])
156 self.responses.add(msg[0])
156 else:
157 else:
157 self.log.warn("heartbeat::got bad heartbeat (possibly old?): %s (current=%.3f)"%
158 self.log.warn("heartbeat::got bad heartbeat (possibly old?): %s (current=%.3f)", msg[1], self.lifetime)
158 (msg[1],self.lifetime))
159
159
160
160
161 if __name__ == '__main__':
161 if __name__ == '__main__':
@@ -192,7 +192,7 b' class HubFactory(RegistrationFactory):'
192 self._update_monitor_url()
192 self._update_monitor_url()
193
193
194 def _update_monitor_url(self):
194 def _update_monitor_url(self):
195 self.monitor_url = "%s://%s:%i"%(self.monitor_transport, self.monitor_ip, self.mon_port)
195 self.monitor_url = "%s://%s:%i" % (self.monitor_transport, self.monitor_ip, self.mon_port)
196
196
197 def _transport_changed(self, name, old, new):
197 def _transport_changed(self, name, old, new):
198 self.engine_transport = new
198 self.engine_transport = new
@@ -214,8 +214,8 b' class HubFactory(RegistrationFactory):'
214
214
215 def init_hub(self):
215 def init_hub(self):
216 """construct"""
216 """construct"""
217 client_iface = "%s://%s:"%(self.client_transport, self.client_ip) + "%i"
217 client_iface = "%s://%s:" % (self.client_transport, self.client_ip) + "%i"
218 engine_iface = "%s://%s:"%(self.engine_transport, self.engine_ip) + "%i"
218 engine_iface = "%s://%s:" % (self.engine_transport, self.engine_ip) + "%i"
219
219
220 ctx = self.context
220 ctx = self.context
221 loop = self.loop
221 loop = self.loop
@@ -223,10 +223,10 b' class HubFactory(RegistrationFactory):'
223 # Registrar socket
223 # Registrar socket
224 q = ZMQStream(ctx.socket(zmq.ROUTER), loop)
224 q = ZMQStream(ctx.socket(zmq.ROUTER), loop)
225 q.bind(client_iface % self.regport)
225 q.bind(client_iface % self.regport)
226 self.log.info("Hub listening on %s for registration."%(client_iface%self.regport))
226 self.log.info("Hub listening on %s for registration.", client_iface % self.regport)
227 if self.client_ip != self.engine_ip:
227 if self.client_ip != self.engine_ip:
228 q.bind(engine_iface % self.regport)
228 q.bind(engine_iface % self.regport)
229 self.log.info("Hub listening on %s for registration."%(engine_iface%self.regport))
229 self.log.info("Hub listening on %s for registration.", engine_iface % self.regport)
230
230
231 ### Engine connections ###
231 ### Engine connections ###
232
232
@@ -282,8 +282,8 b' class HubFactory(RegistrationFactory):'
282 'iopub' : client_iface%self.iopub[0],
282 'iopub' : client_iface%self.iopub[0],
283 'notification': client_iface%self.notifier_port
283 'notification': client_iface%self.notifier_port
284 }
284 }
285 self.log.debug("Hub engine addrs: %s"%self.engine_info)
285 self.log.debug("Hub engine addrs: %s", self.engine_info)
286 self.log.debug("Hub client addrs: %s"%self.client_info)
286 self.log.debug("Hub client addrs: %s", self.client_info)
287
287
288 # resubmit stream
288 # resubmit stream
289 r = ZMQStream(ctx.socket(zmq.DEALER), loop)
289 r = ZMQStream(ctx.socket(zmq.DEALER), loop)
@@ -444,7 +444,7 b' class Hub(SessionFactory):'
444 targets = _targets
444 targets = _targets
445 bad_targets = [ t for t in targets if t not in self.ids ]
445 bad_targets = [ t for t in targets if t not in self.ids ]
446 if bad_targets:
446 if bad_targets:
447 raise IndexError("No Such Engine: %r"%bad_targets)
447 raise IndexError("No Such Engine: %r" % bad_targets)
448 if not targets:
448 if not targets:
449 raise IndexError("No Engines Registered")
449 raise IndexError("No Engines Registered")
450 return targets
450 return targets
@@ -457,20 +457,20 b' class Hub(SessionFactory):'
457 def dispatch_monitor_traffic(self, msg):
457 def dispatch_monitor_traffic(self, msg):
458 """all ME and Task queue messages come through here, as well as
458 """all ME and Task queue messages come through here, as well as
459 IOPub traffic."""
459 IOPub traffic."""
460 self.log.debug("monitor traffic: %r"%msg[:2])
460 self.log.debug("monitor traffic: %r", msg[:2])
461 switch = msg[0]
461 switch = msg[0]
462 try:
462 try:
463 idents, msg = self.session.feed_identities(msg[1:])
463 idents, msg = self.session.feed_identities(msg[1:])
464 except ValueError:
464 except ValueError:
465 idents=[]
465 idents=[]
466 if not idents:
466 if not idents:
467 self.log.error("Bad Monitor Message: %r"%msg)
467 self.log.error("Bad Monitor Message: %r", msg)
468 return
468 return
469 handler = self.monitor_handlers.get(switch, None)
469 handler = self.monitor_handlers.get(switch, None)
470 if handler is not None:
470 if handler is not None:
471 handler(idents, msg)
471 handler(idents, msg)
472 else:
472 else:
473 self.log.error("Invalid monitor topic: %r"%switch)
473 self.log.error("Invalid monitor topic: %r", switch)
474
474
475
475
476 def dispatch_query(self, msg):
476 def dispatch_query(self, msg):
@@ -480,27 +480,27 b' class Hub(SessionFactory):'
480 except ValueError:
480 except ValueError:
481 idents = []
481 idents = []
482 if not idents:
482 if not idents:
483 self.log.error("Bad Query Message: %r"%msg)
483 self.log.error("Bad Query Message: %r", msg)
484 return
484 return
485 client_id = idents[0]
485 client_id = idents[0]
486 try:
486 try:
487 msg = self.session.unserialize(msg, content=True)
487 msg = self.session.unserialize(msg, content=True)
488 except Exception:
488 except Exception:
489 content = error.wrap_exception()
489 content = error.wrap_exception()
490 self.log.error("Bad Query Message: %r"%msg, exc_info=True)
490 self.log.error("Bad Query Message: %r", msg, exc_info=True)
491 self.session.send(self.query, "hub_error", ident=client_id,
491 self.session.send(self.query, "hub_error", ident=client_id,
492 content=content)
492 content=content)
493 return
493 return
494 # print client_id, header, parent, content
494 # print client_id, header, parent, content
495 #switch on message type:
495 #switch on message type:
496 msg_type = msg['header']['msg_type']
496 msg_type = msg['header']['msg_type']
497 self.log.info("client::client %r requested %r"%(client_id, msg_type))
497 self.log.info("client::client %r requested %r", client_id, msg_type)
498 handler = self.query_handlers.get(msg_type, None)
498 handler = self.query_handlers.get(msg_type, None)
499 try:
499 try:
500 assert handler is not None, "Bad Message Type: %r"%msg_type
500 assert handler is not None, "Bad Message Type: %r" % msg_type
501 except:
501 except:
502 content = error.wrap_exception()
502 content = error.wrap_exception()
503 self.log.error("Bad Message Type: %r"%msg_type, exc_info=True)
503 self.log.error("Bad Message Type: %r", msg_type, exc_info=True)
504 self.session.send(self.query, "hub_error", ident=client_id,
504 self.session.send(self.query, "hub_error", ident=client_id,
505 content=content)
505 content=content)
506 return
506 return
@@ -522,9 +522,9 b' class Hub(SessionFactory):'
522 """handler to attach to heartbeater.
522 """handler to attach to heartbeater.
523 Called when a new heart starts to beat.
523 Called when a new heart starts to beat.
524 Triggers completion of registration."""
524 Triggers completion of registration."""
525 self.log.debug("heartbeat::handle_new_heart(%r)"%heart)
525 self.log.debug("heartbeat::handle_new_heart(%r)", heart)
526 if heart not in self.incoming_registrations:
526 if heart not in self.incoming_registrations:
527 self.log.info("heartbeat::ignoring new heart: %r"%heart)
527 self.log.info("heartbeat::ignoring new heart: %r", heart)
528 else:
528 else:
529 self.finish_registration(heart)
529 self.finish_registration(heart)
530
530
@@ -533,11 +533,11 b' class Hub(SessionFactory):'
533 """handler to attach to heartbeater.
533 """handler to attach to heartbeater.
534 called when a previously registered heart fails to respond to beat request.
534 called when a previously registered heart fails to respond to beat request.
535 triggers unregistration"""
535 triggers unregistration"""
536 self.log.debug("heartbeat::handle_heart_failure(%r)"%heart)
536 self.log.debug("heartbeat::handle_heart_failure(%r)", heart)
537 eid = self.hearts.get(heart, None)
537 eid = self.hearts.get(heart, None)
538 queue = self.engines[eid].queue
538 queue = self.engines[eid].queue
539 if eid is None:
539 if eid is None or self.keytable[eid] in self.dead_engines:
540 self.log.info("heartbeat::ignoring heart failure %r"%heart)
540 self.log.info("heartbeat::ignoring heart failure %r (not an engine or already dead)", heart)
541 else:
541 else:
542 self.unregister_engine(heart, dict(content=dict(id=eid, queue=queue)))
542 self.unregister_engine(heart, dict(content=dict(id=eid, queue=queue)))
543
543
@@ -545,22 +545,23 b' class Hub(SessionFactory):'
545
545
546 def save_queue_request(self, idents, msg):
546 def save_queue_request(self, idents, msg):
547 if len(idents) < 2:
547 if len(idents) < 2:
548 self.log.error("invalid identity prefix: %r"%idents)
548 self.log.error("invalid identity prefix: %r", idents)
549 return
549 return
550 queue_id, client_id = idents[:2]
550 queue_id, client_id = idents[:2]
551 try:
551 try:
552 msg = self.session.unserialize(msg)
552 msg = self.session.unserialize(msg)
553 except Exception:
553 except Exception:
554 self.log.error("queue::client %r sent invalid message to %r: %r"%(client_id, queue_id, msg), exc_info=True)
554 self.log.error("queue::client %r sent invalid message to %r: %r", client_id, queue_id, msg, exc_info=True)
555 return
555 return
556
556
557 eid = self.by_ident.get(queue_id, None)
557 eid = self.by_ident.get(queue_id, None)
558 if eid is None:
558 if eid is None:
559 self.log.error("queue::target %r not registered"%queue_id)
559 self.log.error("queue::target %r not registered", queue_id)
560 self.log.debug("queue:: valid are: %r"%(self.by_ident.keys()))
560 self.log.debug("queue:: valid are: %r", self.by_ident.keys())
561 return
561 return
562 record = init_record(msg)
562 record = init_record(msg)
563 msg_id = record['msg_id']
563 msg_id = record['msg_id']
564 self.log.info("queue::client %r submitted request %r to %s", client_id, msg_id, eid)
564 # Unicode in records
565 # Unicode in records
565 record['engine_uuid'] = queue_id.decode('ascii')
566 record['engine_uuid'] = queue_id.decode('ascii')
566 record['client_uuid'] = client_id.decode('ascii')
567 record['client_uuid'] = client_id.decode('ascii')
@@ -572,18 +573,18 b' class Hub(SessionFactory):'
572 for key,evalue in existing.iteritems():
573 for key,evalue in existing.iteritems():
573 rvalue = record.get(key, None)
574 rvalue = record.get(key, None)
574 if evalue and rvalue and evalue != rvalue:
575 if evalue and rvalue and evalue != rvalue:
575 self.log.warn("conflicting initial state for record: %r:%r <%r> %r"%(msg_id, rvalue, key, evalue))
576 self.log.warn("conflicting initial state for record: %r:%r <%r> %r", msg_id, rvalue, key, evalue)
576 elif evalue and not rvalue:
577 elif evalue and not rvalue:
577 record[key] = evalue
578 record[key] = evalue
578 try:
579 try:
579 self.db.update_record(msg_id, record)
580 self.db.update_record(msg_id, record)
580 except Exception:
581 except Exception:
581 self.log.error("DB Error updating record %r"%msg_id, exc_info=True)
582 self.log.error("DB Error updating record %r", msg_id, exc_info=True)
582 except KeyError:
583 except KeyError:
583 try:
584 try:
584 self.db.add_record(msg_id, record)
585 self.db.add_record(msg_id, record)
585 except Exception:
586 except Exception:
586 self.log.error("DB Error adding record %r"%msg_id, exc_info=True)
587 self.log.error("DB Error adding record %r", msg_id, exc_info=True)
587
588
588
589
589 self.pending.add(msg_id)
590 self.pending.add(msg_id)
@@ -591,20 +592,20 b' class Hub(SessionFactory):'
591
592
592 def save_queue_result(self, idents, msg):
593 def save_queue_result(self, idents, msg):
593 if len(idents) < 2:
594 if len(idents) < 2:
594 self.log.error("invalid identity prefix: %r"%idents)
595 self.log.error("invalid identity prefix: %r", idents)
595 return
596 return
596
597
597 client_id, queue_id = idents[:2]
598 client_id, queue_id = idents[:2]
598 try:
599 try:
599 msg = self.session.unserialize(msg)
600 msg = self.session.unserialize(msg)
600 except Exception:
601 except Exception:
601 self.log.error("queue::engine %r sent invalid message to %r: %r"%(
602 self.log.error("queue::engine %r sent invalid message to %r: %r",
602 queue_id,client_id, msg), exc_info=True)
603 queue_id, client_id, msg, exc_info=True)
603 return
604 return
604
605
605 eid = self.by_ident.get(queue_id, None)
606 eid = self.by_ident.get(queue_id, None)
606 if eid is None:
607 if eid is None:
607 self.log.error("queue::unknown engine %r is sending a reply: "%queue_id)
608 self.log.error("queue::unknown engine %r is sending a reply: ", queue_id)
608 return
609 return
609
610
610 parent = msg['parent_header']
611 parent = msg['parent_header']
@@ -616,10 +617,11 b' class Hub(SessionFactory):'
616 self.all_completed.add(msg_id)
617 self.all_completed.add(msg_id)
617 self.queues[eid].remove(msg_id)
618 self.queues[eid].remove(msg_id)
618 self.completed[eid].append(msg_id)
619 self.completed[eid].append(msg_id)
620 self.log.info("queue::request %r completed on %s", msg_id, eid)
619 elif msg_id not in self.all_completed:
621 elif msg_id not in self.all_completed:
620 # it could be a result from a dead engine that died before delivering the
622 # it could be a result from a dead engine that died before delivering the
621 # result
623 # result
622 self.log.warn("queue:: unknown msg finished %r"%msg_id)
624 self.log.warn("queue:: unknown msg finished %r", msg_id)
623 return
625 return
624 # update record anyway, because the unregistration could have been premature
626 # update record anyway, because the unregistration could have been premature
625 rheader = msg['header']
627 rheader = msg['header']
@@ -636,7 +638,7 b' class Hub(SessionFactory):'
636 try:
638 try:
637 self.db.update_record(msg_id, result)
639 self.db.update_record(msg_id, result)
638 except Exception:
640 except Exception:
639 self.log.error("DB Error updating record %r"%msg_id, exc_info=True)
641 self.log.error("DB Error updating record %r", msg_id, exc_info=True)
640
642
641
643
642 #--------------------- Task Queue Traffic ------------------------------
644 #--------------------- Task Queue Traffic ------------------------------
@@ -648,8 +650,8 b' class Hub(SessionFactory):'
648 try:
650 try:
649 msg = self.session.unserialize(msg)
651 msg = self.session.unserialize(msg)
650 except Exception:
652 except Exception:
651 self.log.error("task::client %r sent invalid task message: %r"%(
653 self.log.error("task::client %r sent invalid task message: %r",
652 client_id, msg), exc_info=True)
654 client_id, msg, exc_info=True)
653 return
655 return
654 record = init_record(msg)
656 record = init_record(msg)
655
657
@@ -677,20 +679,20 b' class Hub(SessionFactory):'
677 continue
679 continue
678 rvalue = record.get(key, None)
680 rvalue = record.get(key, None)
679 if evalue and rvalue and evalue != rvalue:
681 if evalue and rvalue and evalue != rvalue:
680 self.log.warn("conflicting initial state for record: %r:%r <%r> %r"%(msg_id, rvalue, key, evalue))
682 self.log.warn("conflicting initial state for record: %r:%r <%r> %r", msg_id, rvalue, key, evalue)
681 elif evalue and not rvalue:
683 elif evalue and not rvalue:
682 record[key] = evalue
684 record[key] = evalue
683 try:
685 try:
684 self.db.update_record(msg_id, record)
686 self.db.update_record(msg_id, record)
685 except Exception:
687 except Exception:
686 self.log.error("DB Error updating record %r"%msg_id, exc_info=True)
688 self.log.error("DB Error updating record %r", msg_id, exc_info=True)
687 except KeyError:
689 except KeyError:
688 try:
690 try:
689 self.db.add_record(msg_id, record)
691 self.db.add_record(msg_id, record)
690 except Exception:
692 except Exception:
691 self.log.error("DB Error adding record %r"%msg_id, exc_info=True)
693 self.log.error("DB Error adding record %r", msg_id, exc_info=True)
692 except Exception:
694 except Exception:
693 self.log.error("DB Error saving task request %r"%msg_id, exc_info=True)
695 self.log.error("DB Error saving task request %r", msg_id, exc_info=True)
694
696
695 def save_task_result(self, idents, msg):
697 def save_task_result(self, idents, msg):
696 """save the result of a completed task."""
698 """save the result of a completed task."""
@@ -698,14 +700,14 b' class Hub(SessionFactory):'
698 try:
700 try:
699 msg = self.session.unserialize(msg)
701 msg = self.session.unserialize(msg)
700 except Exception:
702 except Exception:
701 self.log.error("task::invalid task result message send to %r: %r"%(
703 self.log.error("task::invalid task result message send to %r: %r",
702 client_id, msg), exc_info=True)
704 client_id, msg, exc_info=True)
703 return
705 return
704
706
705 parent = msg['parent_header']
707 parent = msg['parent_header']
706 if not parent:
708 if not parent:
707 # print msg
709 # print msg
708 self.log.warn("Task %r had no parent!"%msg)
710 self.log.warn("Task %r had no parent!", msg)
709 return
711 return
710 msg_id = parent['msg_id']
712 msg_id = parent['msg_id']
711 if msg_id in self.unassigned:
713 if msg_id in self.unassigned:
@@ -716,6 +718,7 b' class Hub(SessionFactory):'
716 eid = self.by_ident.get(engine_uuid, None)
718 eid = self.by_ident.get(engine_uuid, None)
717
719
718 if msg_id in self.pending:
720 if msg_id in self.pending:
721 self.log.info("task::task %r finished on %s", msg_id, eid)
719 self.pending.remove(msg_id)
722 self.pending.remove(msg_id)
720 self.all_completed.add(msg_id)
723 self.all_completed.add(msg_id)
721 if eid is not None:
724 if eid is not None:
@@ -736,10 +739,10 b' class Hub(SessionFactory):'
736 try:
739 try:
737 self.db.update_record(msg_id, result)
740 self.db.update_record(msg_id, result)
738 except Exception:
741 except Exception:
739 self.log.error("DB Error saving task request %r"%msg_id, exc_info=True)
742 self.log.error("DB Error saving task request %r", msg_id, exc_info=True)
740
743
741 else:
744 else:
742 self.log.debug("task::unknown task %r finished"%msg_id)
745 self.log.debug("task::unknown task %r finished", msg_id)
743
746
744 def save_task_destination(self, idents, msg):
747 def save_task_destination(self, idents, msg):
745 try:
748 try:
@@ -753,7 +756,7 b' class Hub(SessionFactory):'
753 engine_uuid = content['engine_id']
756 engine_uuid = content['engine_id']
754 eid = self.by_ident[util.asbytes(engine_uuid)]
757 eid = self.by_ident[util.asbytes(engine_uuid)]
755
758
756 self.log.info("task::task %r arrived on %r"%(msg_id, eid))
759 self.log.info("task::task %r arrived on %r", msg_id, eid)
757 if msg_id in self.unassigned:
760 if msg_id in self.unassigned:
758 self.unassigned.remove(msg_id)
761 self.unassigned.remove(msg_id)
759 # else:
762 # else:
@@ -764,7 +767,7 b' class Hub(SessionFactory):'
764 try:
767 try:
765 self.db.update_record(msg_id, dict(engine_uuid=engine_uuid))
768 self.db.update_record(msg_id, dict(engine_uuid=engine_uuid))
766 except Exception:
769 except Exception:
767 self.log.error("DB Error saving task destination %r"%msg_id, exc_info=True)
770 self.log.error("DB Error saving task destination %r", msg_id, exc_info=True)
768
771
769
772
770 def mia_task_request(self, idents, msg):
773 def mia_task_request(self, idents, msg):
@@ -787,7 +790,7 b' class Hub(SessionFactory):'
787
790
788 parent = msg['parent_header']
791 parent = msg['parent_header']
789 if not parent:
792 if not parent:
790 self.log.error("iopub::invalid IOPub message: %r"%msg)
793 self.log.error("iopub::invalid IOPub message: %r", msg)
791 return
794 return
792 msg_id = parent['msg_id']
795 msg_id = parent['msg_id']
793 msg_type = msg['header']['msg_type']
796 msg_type = msg['header']['msg_type']
@@ -817,7 +820,7 b' class Hub(SessionFactory):'
817 try:
820 try:
818 self.db.update_record(msg_id, d)
821 self.db.update_record(msg_id, d)
819 except Exception:
822 except Exception:
820 self.log.error("DB Error saving iopub message %r"%msg_id, exc_info=True)
823 self.log.error("DB Error saving iopub message %r", msg_id, exc_info=True)
821
824
822
825
823
826
@@ -827,7 +830,7 b' class Hub(SessionFactory):'
827
830
828 def connection_request(self, client_id, msg):
831 def connection_request(self, client_id, msg):
829 """Reply with connection addresses for clients."""
832 """Reply with connection addresses for clients."""
830 self.log.info("client::client %r connected"%client_id)
833 self.log.info("client::client %r connected", client_id)
831 content = dict(status='ok')
834 content = dict(status='ok')
832 content.update(self.client_info)
835 content.update(self.client_info)
833 jsonable = {}
836 jsonable = {}
@@ -852,37 +855,37 b' class Hub(SessionFactory):'
852 eid = self._next_id
855 eid = self._next_id
853 # print (eid, queue, reg, heart)
856 # print (eid, queue, reg, heart)
854
857
855 self.log.debug("registration::register_engine(%i, %r, %r, %r)"%(eid, queue, reg, heart))
858 self.log.debug("registration::register_engine(%i, %r, %r, %r)", eid, queue, reg, heart)
856
859
857 content = dict(id=eid,status='ok')
860 content = dict(id=eid,status='ok')
858 content.update(self.engine_info)
861 content.update(self.engine_info)
859 # check if requesting available IDs:
862 # check if requesting available IDs:
860 if queue in self.by_ident:
863 if queue in self.by_ident:
861 try:
864 try:
862 raise KeyError("queue_id %r in use"%queue)
865 raise KeyError("queue_id %r in use" % queue)
863 except:
866 except:
864 content = error.wrap_exception()
867 content = error.wrap_exception()
865 self.log.error("queue_id %r in use"%queue, exc_info=True)
868 self.log.error("queue_id %r in use", queue, exc_info=True)
866 elif heart in self.hearts: # need to check unique hearts?
869 elif heart in self.hearts: # need to check unique hearts?
867 try:
870 try:
868 raise KeyError("heart_id %r in use"%heart)
871 raise KeyError("heart_id %r in use" % heart)
869 except:
872 except:
870 self.log.error("heart_id %r in use"%heart, exc_info=True)
873 self.log.error("heart_id %r in use", heart, exc_info=True)
871 content = error.wrap_exception()
874 content = error.wrap_exception()
872 else:
875 else:
873 for h, pack in self.incoming_registrations.iteritems():
876 for h, pack in self.incoming_registrations.iteritems():
874 if heart == h:
877 if heart == h:
875 try:
878 try:
876 raise KeyError("heart_id %r in use"%heart)
879 raise KeyError("heart_id %r in use" % heart)
877 except:
880 except:
878 self.log.error("heart_id %r in use"%heart, exc_info=True)
881 self.log.error("heart_id %r in use", heart, exc_info=True)
879 content = error.wrap_exception()
882 content = error.wrap_exception()
880 break
883 break
881 elif queue == pack[1]:
884 elif queue == pack[1]:
882 try:
885 try:
883 raise KeyError("queue_id %r in use"%queue)
886 raise KeyError("queue_id %r in use" % queue)
884 except:
887 except:
885 self.log.error("queue_id %r in use"%queue, exc_info=True)
888 self.log.error("queue_id %r in use", queue, exc_info=True)
886 content = error.wrap_exception()
889 content = error.wrap_exception()
887 break
890 break
888
891
@@ -901,7 +904,7 b' class Hub(SessionFactory):'
901 dc.start()
904 dc.start()
902 self.incoming_registrations[heart] = (eid,queue,reg[0],dc)
905 self.incoming_registrations[heart] = (eid,queue,reg[0],dc)
903 else:
906 else:
904 self.log.error("registration::registration %i failed: %r"%(eid, content['evalue']))
907 self.log.error("registration::registration %i failed: %r", eid, content['evalue'])
905 return eid
908 return eid
906
909
907 def unregister_engine(self, ident, msg):
910 def unregister_engine(self, ident, msg):
@@ -909,9 +912,9 b' class Hub(SessionFactory):'
909 try:
912 try:
910 eid = msg['content']['id']
913 eid = msg['content']['id']
911 except:
914 except:
912 self.log.error("registration::bad engine id for unregistration: %r"%ident, exc_info=True)
915 self.log.error("registration::bad engine id for unregistration: %r", ident, exc_info=True)
913 return
916 return
914 self.log.info("registration::unregister_engine(%r)"%eid)
917 self.log.info("registration::unregister_engine(%r)", eid)
915 # print (eid)
918 # print (eid)
916 uuid = self.keytable[eid]
919 uuid = self.keytable[eid]
917 content=dict(id=eid, queue=uuid.decode('ascii'))
920 content=dict(id=eid, queue=uuid.decode('ascii'))
@@ -945,7 +948,7 b' class Hub(SessionFactory):'
945 self.pending.remove(msg_id)
948 self.pending.remove(msg_id)
946 self.all_completed.add(msg_id)
949 self.all_completed.add(msg_id)
947 try:
950 try:
948 raise error.EngineError("Engine %r died while running task %r"%(eid, msg_id))
951 raise error.EngineError("Engine %r died while running task %r" % (eid, msg_id))
949 except:
952 except:
950 content = error.wrap_exception()
953 content = error.wrap_exception()
951 # build a fake header:
954 # build a fake header:
@@ -958,7 +961,7 b' class Hub(SessionFactory):'
958 try:
961 try:
959 self.db.update_record(msg_id, rec)
962 self.db.update_record(msg_id, rec)
960 except Exception:
963 except Exception:
961 self.log.error("DB Error handling stranded msg %r"%msg_id, exc_info=True)
964 self.log.error("DB Error handling stranded msg %r", msg_id, exc_info=True)
962
965
963
966
964 def finish_registration(self, heart):
967 def finish_registration(self, heart):
@@ -969,7 +972,7 b' class Hub(SessionFactory):'
969 except KeyError:
972 except KeyError:
970 self.log.error("registration::tried to finish nonexistant registration", exc_info=True)
973 self.log.error("registration::tried to finish nonexistant registration", exc_info=True)
971 return
974 return
972 self.log.info("registration::finished registering engine %i:%r"%(eid,queue))
975 self.log.info("registration::finished registering engine %i:%r", eid, queue)
973 if purge is not None:
976 if purge is not None:
974 purge.stop()
977 purge.stop()
975 control = queue
978 control = queue
@@ -985,12 +988,12 b' class Hub(SessionFactory):'
985 content = dict(id=eid, queue=self.engines[eid].queue.decode('ascii'))
988 content = dict(id=eid, queue=self.engines[eid].queue.decode('ascii'))
986 if self.notifier:
989 if self.notifier:
987 self.session.send(self.notifier, "registration_notification", content=content)
990 self.session.send(self.notifier, "registration_notification", content=content)
988 self.log.info("engine::Engine Connected: %i"%eid)
991 self.log.info("engine::Engine Connected: %i", eid)
989
992
990 def _purge_stalled_registration(self, heart):
993 def _purge_stalled_registration(self, heart):
991 if heart in self.incoming_registrations:
994 if heart in self.incoming_registrations:
992 eid = self.incoming_registrations.pop(heart)[0]
995 eid = self.incoming_registrations.pop(heart)[0]
993 self.log.info("registration::purging stalled registration: %i"%eid)
996 self.log.info("registration::purging stalled registration: %i", eid)
994 else:
997 else:
995 pass
998 pass
996
999
@@ -1077,7 +1080,7 b' class Hub(SessionFactory):'
1077 pending = filter(lambda m: m in self.pending, msg_ids)
1080 pending = filter(lambda m: m in self.pending, msg_ids)
1078 if pending:
1081 if pending:
1079 try:
1082 try:
1080 raise IndexError("msg pending: %r"%pending[0])
1083 raise IndexError("msg pending: %r" % pending[0])
1081 except:
1084 except:
1082 reply = error.wrap_exception()
1085 reply = error.wrap_exception()
1083 else:
1086 else:
@@ -1091,7 +1094,7 b' class Hub(SessionFactory):'
1091 for eid in eids:
1094 for eid in eids:
1092 if eid not in self.engines:
1095 if eid not in self.engines:
1093 try:
1096 try:
1094 raise IndexError("No such engine: %i"%eid)
1097 raise IndexError("No such engine: %i" % eid)
1095 except:
1098 except:
1096 reply = error.wrap_exception()
1099 reply = error.wrap_exception()
1097 break
1100 break
@@ -1131,13 +1134,13 b' class Hub(SessionFactory):'
1131 elif len(records) < len(msg_ids):
1134 elif len(records) < len(msg_ids):
1132 missing = [ m for m in msg_ids if m not in found_ids ]
1135 missing = [ m for m in msg_ids if m not in found_ids ]
1133 try:
1136 try:
1134 raise KeyError("No such msg(s): %r"%missing)
1137 raise KeyError("No such msg(s): %r" % missing)
1135 except KeyError:
1138 except KeyError:
1136 return finish(error.wrap_exception())
1139 return finish(error.wrap_exception())
1137 elif invalid_ids:
1140 elif invalid_ids:
1138 msg_id = invalid_ids[0]
1141 msg_id = invalid_ids[0]
1139 try:
1142 try:
1140 raise ValueError("Task %r appears to be inflight"%(msg_id))
1143 raise ValueError("Task %r appears to be inflight" % msg_id)
1141 except Exception:
1144 except Exception:
1142 return finish(error.wrap_exception())
1145 return finish(error.wrap_exception())
1143
1146
@@ -225,7 +225,7 b' class TaskScheduler(SessionFactory):'
225 try:
225 try:
226 handler(asbytes(msg['content']['queue']))
226 handler(asbytes(msg['content']['queue']))
227 except Exception:
227 except Exception:
228 self.log.error("task::Invalid notification msg: %r",msg)
228 self.log.error("task::Invalid notification msg: %r", msg, exc_info=True)
229
229
230 def _register_engine(self, uid):
230 def _register_engine(self, uid):
231 """New engine with ident `uid` became available."""
231 """New engine with ident `uid` became available."""
General Comments 0
You need to be logged in to leave comments. Login now