##// END OF EJS Templates
rework logging connections
MinRK -
Show More
@@ -480,12 +480,11 b' class ApplicationWithClusterDir(Application):'
480 480 open_log_file = None
481 481 else:
482 482 open_log_file = sys.stdout
483 logger = logging.getLogger()
484 level = self.log_level
485 self.log = logger
486 # since we've reconnected the logger, we need to reconnect the log-level
487 self.log_level = level
488 if open_log_file is not None and self._log_handler not in self.log.handlers:
483 if open_log_file is not None:
484 self.log.removeHandler(self._log_handler)
485 self._log_handler = logging.StreamHandler(open_log_file)
486 self._log_formatter = logging.Formatter("[%(name)s] %(message)s")
487 self._log_handler.setFormatter(self._log_formatter)
489 488 self.log.addHandler(self._log_handler)
490 489 # log.startLogging(open_log_file)
491 490
@@ -44,14 +44,12 b' class ControllerFactory(HubFactory):'
44 44 children = List()
45 45 mq_class = Str('zmq.devices.ProcessMonitoredQueue')
46 46
47 def _update_mq(self):
48 self.mq_class = 'zmq.devices.%sMonitoredQueue'%('Thread' if self.usethreads else 'Process')
47 def _usethreads_changed(self, name, old, new):
48 self.mq_class = 'zmq.devices.%sMonitoredQueue'%('Thread' if new else 'Process')
49 49
50 50 def __init__(self, **kwargs):
51 51 super(ControllerFactory, self).__init__(**kwargs)
52 52 self.subconstructors.append(self.construct_schedulers)
53 self._update_mq()
54 self.on_trait_change(self._update_mq, 'usethreads')
55 53
56 54 def start(self):
57 55 super(ControllerFactory, self).start()
@@ -91,7 +89,7 b' class ControllerFactory(HubFactory):'
91 89 children.append(q)
92 90 # Task Queue (in a Process)
93 91 if self.scheme == 'pure':
94 logging.warn("task::using pure XREQ Task scheduler")
92 self.log.warn("task::using pure XREQ Task scheduler")
95 93 q = mq(zmq.XREP, zmq.XREQ, zmq.PUB, 'intask', 'outtask')
96 94 q.bind_in(self.client_addrs['task'])
97 95 q.bind_out(self.engine_addrs['task'])
@@ -99,10 +97,10 b' class ControllerFactory(HubFactory):'
99 97 q.daemon=True
100 98 children.append(q)
101 99 elif self.scheme == 'none':
102 logging.warn("task::using no Task scheduler")
100 self.log.warn("task::using no Task scheduler")
103 101
104 102 else:
105 logging.warn("task::using Python %s Task scheduler"%self.scheme)
103 self.log.warn("task::using Python %s Task scheduler"%self.scheme)
106 104 sargs = (self.client_addrs['task'], self.engine_addrs['task'], self.monitor_url, self.client_addrs['notification'])
107 105 q = Process(target=launch_scheduler, args=sargs, kwargs = dict(scheme=self.scheme))
108 106 q.daemon=True
@@ -25,8 +25,8 b' from streamkernel import Kernel'
25 25 import heartmonitor
26 26
27 27 def printer(*msg):
28 # print (logging.handlers, file=sys.__stdout__)
29 logging.info(str(msg))
28 # print (self.log.handlers, file=sys.__stdout__)
29 self.log.info(str(msg))
30 30
31 31 class EngineFactory(RegistrationFactory):
32 32 """IPython engine"""
@@ -54,7 +54,7 b' class EngineFactory(RegistrationFactory):'
54 54 def register(self):
55 55 """send the registration_request"""
56 56
57 logging.info("registering")
57 self.log.info("registering")
58 58 content = dict(queue=self.ident, heartbeat=self.ident, control=self.ident)
59 59 self.registrar.on_recv(self.complete_registration)
60 60 # print (self.session.key)
@@ -112,10 +112,9 b' class EngineFactory(RegistrationFactory):'
112 112 sys.displayhook = self.display_hook_factory(self.session, iopub_stream)
113 113 sys.displayhook.topic = 'engine.%i.pyout'%self.id
114 114
115 self.kernel = Kernel(int_id=self.id, ident=self.ident, session=self.session,
116 control_stream=control_stream,
117 shell_streams=shell_streams, iopub_stream=iopub_stream, loop=loop,
118 user_ns = self.user_ns, config=self.config)
115 self.kernel = Kernel(config=self.config, int_id=self.id, ident=self.ident, session=self.session,
116 control_stream=control_stream, shell_streams=shell_streams, iopub_stream=iopub_stream,
117 loop=loop, user_ns = self.user_ns, logname=self.log.name)
119 118 self.kernel.start()
120 119
121 120 heart = heartmonitor.Heart(*map(str, hb_addrs), heart_id=identity)
@@ -124,10 +123,10 b' class EngineFactory(RegistrationFactory):'
124 123
125 124
126 125 else:
127 logging.error("Registration Failed: %s"%msg)
126 self.log.error("Registration Failed: %s"%msg)
128 127 raise Exception("Registration Failed: %s"%msg)
129 128
130 logging.info("Completed registration with id %i"%self.id)
129 self.log.info("Completed registration with id %i"%self.id)
131 130
132 131
133 132 def unregister(self):
@@ -79,8 +79,8 b' def integer_loglevel(loglevel):'
79 79 loglevel = getattr(logging, loglevel)
80 80 return loglevel
81 81
82 def connect_logger(context, iface, root="ip", loglevel=logging.DEBUG):
83 logger = logging.getLogger()
82 def connect_logger(logname, context, iface, root="ip", loglevel=logging.DEBUG):
83 logger = logging.getLogger(logname)
84 84 if any([isinstance(h, handlers.PUBHandler) for h in logger.handlers]):
85 85 # don't add a second PUBHandler
86 86 return
@@ -106,9 +106,9 b' def connect_engine_logger(context, iface, engine, loglevel=logging.DEBUG):'
106 106 logger.addHandler(handler)
107 107 logger.setLevel(loglevel)
108 108
109 def local_logger(loglevel=logging.DEBUG):
109 def local_logger(logname, loglevel=logging.DEBUG):
110 110 loglevel = integer_loglevel(loglevel)
111 logger = logging.getLogger()
111 logger = logging.getLogger(logname)
112 112 if any([isinstance(h, logging.StreamHandler) for h in logger.handlers]):
113 113 # don't add a second StreamHandler
114 114 return
@@ -29,9 +29,15 b' import IPython.zmq.parallel.streamsession as ss'
29 29 #-----------------------------------------------------------------------------
30 30 # Classes
31 31 #-----------------------------------------------------------------------------
32 class LoggingFactory(Configurable):
33 """A most basic class, that has a `log` (type:`Logger`) attribute, set via a `logname` Trait."""
34 log = Instance('logging.Logger', ('ZMQ', logging.WARN))
35 logname = CStr('ZMQ')
36 def _logname_changed(self, name, old, new):
37 self.log = logging.getLogger(new)
32 38
33 39
34 class SessionFactory(Configurable):
40 class SessionFactory(LoggingFactory):
35 41 """The Base factory from which every factory in IPython.zmq.parallel inherits"""
36 42
37 43 packer = Str('',config=True)
@@ -41,14 +47,14 b' class SessionFactory(Configurable):'
41 47 return str(uuid.uuid4())
42 48 username = Str(os.environ.get('USER','username'),config=True)
43 49 exec_key = CUnicode('',config=True)
44
45 50 # not configurable:
46 51 context = Instance('zmq.Context', (), {})
47 52 session = Instance('IPython.zmq.parallel.streamsession.StreamSession')
48 loop = Instance('zmq.eventloop.ioloop.IOLoop')
53 loop = Instance('zmq.eventloop.ioloop.IOLoop', allow_none=False)
49 54 def _loop_default(self):
50 55 return IOLoop.instance()
51 56
57
52 58 def __init__(self, **kwargs):
53 59 super(SessionFactory, self).__init__(**kwargs)
54 60
@@ -13,6 +13,9 b' import zmq'
13 13 from zmq.devices import ProcessDevice,ThreadDevice
14 14 from zmq.eventloop import ioloop, zmqstream
15 15
16 from IPython.utils.traitlets import Set, Instance, CFloat, Bool
17 from factory import LoggingFactory
18
16 19 class Heart(object):
17 20 """A basic heart object for responding to a HeartMonitor.
18 21 This is a simple wrapper with defaults for the most common
@@ -39,36 +42,35 b' class Heart(object):'
39 42 def start(self):
40 43 return self.device.start()
41 44
42 class HeartMonitor(object):
45 class HeartMonitor(LoggingFactory):
43 46 """A basic HeartMonitor class
44 47 pingstream: a PUB stream
45 48 pongstream: an XREP stream
46 49 period: the period of the heartbeat in milliseconds"""
47 loop=None
48 pingstream=None
49 pongstream=None
50 period=None
51 hearts=None
52 on_probation=None
53 last_ping=None
54 # debug=False
55
56 def __init__(self, loop, pingstream, pongstream, period=1000):
57 self.loop = loop
58 self.period = period
59
60 self.pingstream = pingstream
61 self.pongstream = pongstream
62 self.pongstream.on_recv(self.handle_pong)
63 50
64 self.hearts = set()
65 self.responses = set()
66 self.on_probation = set()
67 self.lifetime = 0
68 self.tic = time.time()
51 period=CFloat(1000, config=True) # in milliseconds
52
53 pingstream=Instance('zmq.eventloop.zmqstream.ZMQStream')
54 pongstream=Instance('zmq.eventloop.zmqstream.ZMQStream')
55 loop = Instance('zmq.eventloop.ioloop.IOLoop')
56 def _loop_default(self):
57 return ioloop.IOLoop.instance()
58 debug=Bool(False)
69 59
70 self._new_handlers = set()
71 self._failure_handlers = set()
60 # not settable:
61 hearts=Set()
62 responses=Set()
63 on_probation=Set()
64 last_ping=CFloat(0)
65 _new_handlers = Set()
66 _failure_handlers = Set()
67 lifetime = CFloat(0)
68 tic = CFloat(0)
69
70 def __init__(self, **kwargs):
71 super(HeartMonitor, self).__init__(**kwargs)
72
73 self.pongstream.on_recv(self.handle_pong)
72 74
73 75 def start(self):
74 76 self.caller = ioloop.PeriodicCallback(self.beat, self.period, self.loop)
@@ -76,12 +78,12 b' class HeartMonitor(object):'
76 78
77 79 def add_new_heart_handler(self, handler):
78 80 """add a new handler for new hearts"""
79 logging.debug("heartbeat::new_heart_handler: %s"%handler)
81 self.log.debug("heartbeat::new_heart_handler: %s"%handler)
80 82 self._new_handlers.add(handler)
81 83
82 84 def add_heart_failure_handler(self, handler):
83 85 """add a new handler for heart failure"""
84 logging.debug("heartbeat::new heart failure handler: %s"%handler)
86 self.log.debug("heartbeat::new heart failure handler: %s"%handler)
85 87 self._failure_handlers.add(handler)
86 88
87 89 def beat(self):
@@ -91,7 +93,7 b' class HeartMonitor(object):'
91 93 toc = time.time()
92 94 self.lifetime += toc-self.tic
93 95 self.tic = toc
94 # logging.debug("heartbeat::%s"%self.lifetime)
96 # self.log.debug("heartbeat::%s"%self.lifetime)
95 97 goodhearts = self.hearts.intersection(self.responses)
96 98 missed_beats = self.hearts.difference(goodhearts)
97 99 heartfailures = self.on_probation.intersection(missed_beats)
@@ -101,7 +103,7 b' class HeartMonitor(object):'
101 103 self.on_probation = missed_beats.intersection(self.hearts)
102 104 self.responses = set()
103 105 # print self.on_probation, self.hearts
104 # logging.debug("heartbeat::beat %.3f, %i beating hearts"%(self.lifetime, len(self.hearts)))
106 # self.log.debug("heartbeat::beat %.3f, %i beating hearts"%(self.lifetime, len(self.hearts)))
105 107 self.pingstream.send(str(self.lifetime))
106 108
107 109 def handle_new_heart(self, heart):
@@ -109,7 +111,7 b' class HeartMonitor(object):'
109 111 for handler in self._new_handlers:
110 112 handler(heart)
111 113 else:
112 logging.info("heartbeat::yay, got new heart %s!"%heart)
114 self.log.info("heartbeat::yay, got new heart %s!"%heart)
113 115 self.hearts.add(heart)
114 116
115 117 def handle_heart_failure(self, heart):
@@ -118,10 +120,10 b' class HeartMonitor(object):'
118 120 try:
119 121 handler(heart)
120 122 except Exception as e:
121 logging.error("heartbeat::Bad Handler! %s"%handler, exc_info=True)
123 self.log.error("heartbeat::Bad Handler! %s"%handler, exc_info=True)
122 124 pass
123 125 else:
124 logging.info("heartbeat::Heart %s failed :("%heart)
126 self.log.info("heartbeat::Heart %s failed :("%heart)
125 127 self.hearts.remove(heart)
126 128
127 129
@@ -129,14 +131,14 b' class HeartMonitor(object):'
129 131 "a heart just beat"
130 132 if msg[1] == str(self.lifetime):
131 133 delta = time.time()-self.tic
132 # logging.debug("heartbeat::heart %r took %.2f ms to respond"%(msg[0], 1000*delta))
134 # self.log.debug("heartbeat::heart %r took %.2f ms to respond"%(msg[0], 1000*delta))
133 135 self.responses.add(msg[0])
134 136 elif msg[1] == str(self.last_ping):
135 137 delta = time.time()-self.tic + (self.lifetime-self.last_ping)
136 logging.warn("heartbeat::heart %r missed a beat, and took %.2f ms to respond"%(msg[0], 1000*delta))
138 self.log.warn("heartbeat::heart %r missed a beat, and took %.2f ms to respond"%(msg[0], 1000*delta))
137 139 self.responses.add(msg[0])
138 140 else:
139 logging.warn("heartbeat::got bad heartbeat (possibly old?): %s (current=%.3f)"%
141 self.log.warn("heartbeat::got bad heartbeat (possibly old?): %s (current=%.3f)"%
140 142 (msg[1],self.lifetime))
141 143
142 144
@@ -30,7 +30,7 b' from IPython.utils.traitlets import HasTraits, Instance, Int, Str, Dict, Set, Li'
30 30 from IPython.utils.importstring import import_item
31 31
32 32 from entry_point import select_random_ports
33 from factory import RegistrationFactory
33 from factory import RegistrationFactory, LoggingFactory
34 34
35 35 from streamsession import Message, wrap_exception, ISO8601
36 36 from heartmonitor import HeartMonitor
@@ -96,10 +96,6 b' class EngineConnector(HasTraits):'
96 96 heartbeat=Str()
97 97 pending=Set()
98 98
99 def __init__(self, **kwargs):
100 super(EngineConnector, self).__init__(**kwargs)
101 logging.info("engine::Engine Connected: %i"%self.id)
102
103 99 class HubFactory(RegistrationFactory):
104 100 """The Configurable for setting up a Hub."""
105 101
@@ -193,7 +189,7 b' class HubFactory(RegistrationFactory):'
193 189 def start(self):
194 190 assert self._constructed, "must be constructed by self.construct() first!"
195 191 self.heartmonitor.start()
196 logging.info("Heartmonitor started")
192 self.log.info("Heartmonitor started")
197 193
198 194 def construct_hub(self):
199 195 """construct"""
@@ -206,10 +202,10 b' class HubFactory(RegistrationFactory):'
206 202 # Registrar socket
207 203 reg = ZMQStream(ctx.socket(zmq.XREP), loop)
208 204 reg.bind(client_iface % self.regport)
209 logging.info("Hub listening on %s for registration."%(client_iface%self.regport))
205 self.log.info("Hub listening on %s for registration."%(client_iface%self.regport))
210 206 if self.client_ip != self.engine_ip:
211 207 reg.bind(engine_iface % self.regport)
212 logging.info("Hub listening on %s for registration."%(engine_iface%self.regport))
208 self.log.info("Hub listening on %s for registration."%(engine_iface%self.regport))
213 209
214 210 ### Engine connections ###
215 211
@@ -218,8 +214,8 b' class HubFactory(RegistrationFactory):'
218 214 hpub.bind(engine_iface % self.hb[0])
219 215 hrep = ctx.socket(zmq.XREP)
220 216 hrep.bind(engine_iface % self.hb[1])
221
222 self.heartmonitor = HeartMonitor(loop, ZMQStream(hpub,loop), ZMQStream(hrep,loop), self.ping)
217 self.heartmonitor = HeartMonitor(loop=loop, pingstream=ZMQStream(hpub,loop), pongstream=ZMQStream(hrep,loop),
218 period=self.ping, logname=self.log.name)
223 219
224 220 ### Client connections ###
225 221 # Clientele socket
@@ -259,14 +255,15 b' class HubFactory(RegistrationFactory):'
259 255 'iopub' : client_iface%self.iopub[0],
260 256 'notification': client_iface%self.notifier_port
261 257 }
262 logging.debug("hub::Hub engine addrs: %s"%self.engine_addrs)
263 logging.debug("hub::Hub client addrs: %s"%self.client_addrs)
258 self.log.debug("hub::Hub engine addrs: %s"%self.engine_addrs)
259 self.log.debug("hub::Hub client addrs: %s"%self.client_addrs)
264 260 self.hub = Hub(loop=loop, session=self.session, monitor=sub, heartmonitor=self.heartmonitor,
265 261 registrar=reg, clientele=c, notifier=n, db=self.db,
266 engine_addrs=self.engine_addrs, client_addrs=self.client_addrs)
262 engine_addrs=self.engine_addrs, client_addrs=self.client_addrs,
263 logname=self.log.name)
267 264
268 265
269 class Hub(HasTraits):
266 class Hub(LoggingFactory):
270 267 """The IPython Controller Hub with 0MQ connections
271 268
272 269 Parameters
@@ -371,7 +368,7 b' class Hub(HasTraits):'
371 368 'connection_request': self.connection_request,
372 369 }
373 370
374 logging.info("hub::created hub")
371 self.log.info("hub::created hub")
375 372
376 373 @property
377 374 def _next_id(self):
@@ -422,7 +419,7 b' class Hub(HasTraits):'
422 419 try:
423 420 msg = self.session.unpack_message(msg[1:], content=True)
424 421 except:
425 logging.error("client::Invalid Message %s"%msg, exc_info=True)
422 self.log.error("client::Invalid Message %s"%msg, exc_info=True)
426 423 return False
427 424
428 425 msg_type = msg.get('msg_type', None)
@@ -439,15 +436,15 b' class Hub(HasTraits):'
439 436
440 437 def dispatch_register_request(self, msg):
441 438 """"""
442 logging.debug("registration::dispatch_register_request(%s)"%msg)
439 self.log.debug("registration::dispatch_register_request(%s)"%msg)
443 440 idents,msg = self.session.feed_identities(msg)
444 441 if not idents:
445 logging.error("Bad Queue Message: %s"%msg, exc_info=True)
442 self.log.error("Bad Queue Message: %s"%msg, exc_info=True)
446 443 return
447 444 try:
448 445 msg = self.session.unpack_message(msg,content=True)
449 446 except:
450 logging.error("registration::got bad registration message: %s"%msg, exc_info=True)
447 self.log.error("registration::got bad registration message: %s"%msg, exc_info=True)
451 448 return
452 449
453 450 msg_type = msg['msg_type']
@@ -455,38 +452,38 b' class Hub(HasTraits):'
455 452
456 453 handler = self.registrar_handlers.get(msg_type, None)
457 454 if handler is None:
458 logging.error("registration::got bad registration message: %s"%msg)
455 self.log.error("registration::got bad registration message: %s"%msg)
459 456 else:
460 457 handler(idents, msg)
461 458
462 459 def dispatch_monitor_traffic(self, msg):
463 460 """all ME and Task queue messages come through here, as well as
464 461 IOPub traffic."""
465 logging.debug("monitor traffic: %s"%msg[:2])
462 self.log.debug("monitor traffic: %s"%msg[:2])
466 463 switch = msg[0]
467 464 idents, msg = self.session.feed_identities(msg[1:])
468 465 if not idents:
469 logging.error("Bad Monitor Message: %s"%msg)
466 self.log.error("Bad Monitor Message: %s"%msg)
470 467 return
471 468 handler = self.monitor_handlers.get(switch, None)
472 469 if handler is not None:
473 470 handler(idents, msg)
474 471 else:
475 logging.error("Invalid monitor topic: %s"%switch)
472 self.log.error("Invalid monitor topic: %s"%switch)
476 473
477 474
478 475 def dispatch_client_msg(self, msg):
479 476 """Route messages from clients"""
480 477 idents, msg = self.session.feed_identities(msg)
481 478 if not idents:
482 logging.error("Bad Client Message: %s"%msg)
479 self.log.error("Bad Client Message: %s"%msg)
483 480 return
484 481 client_id = idents[0]
485 482 try:
486 483 msg = self.session.unpack_message(msg, content=True)
487 484 except:
488 485 content = wrap_exception()
489 logging.error("Bad Client Message: %s"%msg, exc_info=True)
486 self.log.error("Bad Client Message: %s"%msg, exc_info=True)
490 487 self.session.send(self.clientele, "hub_error", ident=client_id,
491 488 content=content)
492 489 return
@@ -494,13 +491,13 b' class Hub(HasTraits):'
494 491 # print client_id, header, parent, content
495 492 #switch on message type:
496 493 msg_type = msg['msg_type']
497 logging.info("client:: client %s requested %s"%(client_id, msg_type))
494 self.log.info("client:: client %s requested %s"%(client_id, msg_type))
498 495 handler = self.client_handlers.get(msg_type, None)
499 496 try:
500 497 assert handler is not None, "Bad Message Type: %s"%msg_type
501 498 except:
502 499 content = wrap_exception()
503 logging.error("Bad Message Type: %s"%msg_type, exc_info=True)
500 self.log.error("Bad Message Type: %s"%msg_type, exc_info=True)
504 501 self.session.send(self.clientele, "hub_error", ident=client_id,
505 502 content=content)
506 503 return
@@ -521,9 +518,9 b' class Hub(HasTraits):'
521 518 """handler to attach to heartbeater.
522 519 Called when a new heart starts to beat.
523 520 Triggers completion of registration."""
524 logging.debug("heartbeat::handle_new_heart(%r)"%heart)
521 self.log.debug("heartbeat::handle_new_heart(%r)"%heart)
525 522 if heart not in self.incoming_registrations:
526 logging.info("heartbeat::ignoring new heart: %r"%heart)
523 self.log.info("heartbeat::ignoring new heart: %r"%heart)
527 524 else:
528 525 self.finish_registration(heart)
529 526
@@ -532,11 +529,11 b' class Hub(HasTraits):'
532 529 """handler to attach to heartbeater.
533 530 called when a previously registered heart fails to respond to beat request.
534 531 triggers unregistration"""
535 logging.debug("heartbeat::handle_heart_failure(%r)"%heart)
532 self.log.debug("heartbeat::handle_heart_failure(%r)"%heart)
536 533 eid = self.hearts.get(heart, None)
537 534 queue = self.engines[eid].queue
538 535 if eid is None:
539 logging.info("heartbeat::ignoring heart failure %r"%heart)
536 self.log.info("heartbeat::ignoring heart failure %r"%heart)
540 537 else:
541 538 self.unregister_engine(heart, dict(content=dict(id=eid, queue=queue)))
542 539
@@ -544,19 +541,19 b' class Hub(HasTraits):'
544 541
545 542 def save_queue_request(self, idents, msg):
546 543 if len(idents) < 2:
547 logging.error("invalid identity prefix: %s"%idents)
544 self.log.error("invalid identity prefix: %s"%idents)
548 545 return
549 546 queue_id, client_id = idents[:2]
550 547 try:
551 548 msg = self.session.unpack_message(msg, content=False)
552 549 except:
553 logging.error("queue::client %r sent invalid message to %r: %s"%(client_id, queue_id, msg), exc_info=True)
550 self.log.error("queue::client %r sent invalid message to %r: %s"%(client_id, queue_id, msg), exc_info=True)
554 551 return
555 552
556 553 eid = self.by_ident.get(queue_id, None)
557 554 if eid is None:
558 logging.error("queue::target %r not registered"%queue_id)
559 logging.debug("queue:: valid are: %s"%(self.by_ident.keys()))
555 self.log.error("queue::target %r not registered"%queue_id)
556 self.log.debug("queue:: valid are: %s"%(self.by_ident.keys()))
560 557 return
561 558
562 559 header = msg['header']
@@ -573,21 +570,21 b' class Hub(HasTraits):'
573 570
574 571 def save_queue_result(self, idents, msg):
575 572 if len(idents) < 2:
576 logging.error("invalid identity prefix: %s"%idents)
573 self.log.error("invalid identity prefix: %s"%idents)
577 574 return
578 575
579 576 client_id, queue_id = idents[:2]
580 577 try:
581 578 msg = self.session.unpack_message(msg, content=False)
582 579 except:
583 logging.error("queue::engine %r sent invalid message to %r: %s"%(
580 self.log.error("queue::engine %r sent invalid message to %r: %s"%(
584 581 queue_id,client_id, msg), exc_info=True)
585 582 return
586 583
587 584 eid = self.by_ident.get(queue_id, None)
588 585 if eid is None:
589 logging.error("queue::unknown engine %r is sending a reply: "%queue_id)
590 logging.debug("queue:: %s"%msg[2:])
586 self.log.error("queue::unknown engine %r is sending a reply: "%queue_id)
587 self.log.debug("queue:: %s"%msg[2:])
591 588 return
592 589
593 590 parent = msg['parent_header']
@@ -616,7 +613,7 b' class Hub(HasTraits):'
616 613 result['result_buffers'] = msg['buffers']
617 614 self.db.update_record(msg_id, result)
618 615 else:
619 logging.debug("queue:: unknown msg finished %s"%msg_id)
616 self.log.debug("queue:: unknown msg finished %s"%msg_id)
620 617
621 618 #--------------------- Task Queue Traffic ------------------------------
622 619
@@ -627,7 +624,7 b' class Hub(HasTraits):'
627 624 try:
628 625 msg = self.session.unpack_message(msg, content=False)
629 626 except:
630 logging.error("task::client %r sent invalid task message: %s"%(
627 self.log.error("task::client %r sent invalid task message: %s"%(
631 628 client_id, msg), exc_info=True)
632 629 return
633 630 record = init_record(msg)
@@ -646,7 +643,7 b' class Hub(HasTraits):'
646 643 try:
647 644 msg = self.session.unpack_message(msg, content=False)
648 645 except:
649 logging.error("task::invalid task result message send to %r: %s"%(
646 self.log.error("task::invalid task result message send to %r: %s"%(
650 647 client_id, msg), exc_info=True)
651 648 raise
652 649 return
@@ -654,7 +651,7 b' class Hub(HasTraits):'
654 651 parent = msg['parent_header']
655 652 if not parent:
656 653 # print msg
657 logging.warn("Task %r had no parent!"%msg)
654 self.log.warn("Task %r had no parent!"%msg)
658 655 return
659 656 msg_id = parent['msg_id']
660 657
@@ -687,13 +684,13 b' class Hub(HasTraits):'
687 684 self.db.update_record(msg_id, result)
688 685
689 686 else:
690 logging.debug("task::unknown task %s finished"%msg_id)
687 self.log.debug("task::unknown task %s finished"%msg_id)
691 688
692 689 def save_task_destination(self, idents, msg):
693 690 try:
694 691 msg = self.session.unpack_message(msg, content=True)
695 692 except:
696 logging.error("task::invalid task tracking message", exc_info=True)
693 self.log.error("task::invalid task tracking message", exc_info=True)
697 694 return
698 695 content = msg['content']
699 696 print (content)
@@ -701,11 +698,11 b' class Hub(HasTraits):'
701 698 engine_uuid = content['engine_id']
702 699 eid = self.by_ident[engine_uuid]
703 700
704 logging.info("task::task %s arrived on %s"%(msg_id, eid))
701 self.log.info("task::task %s arrived on %s"%(msg_id, eid))
705 702 # if msg_id in self.mia:
706 703 # self.mia.remove(msg_id)
707 704 # else:
708 # logging.debug("task::task %s not listed as MIA?!"%(msg_id))
705 # self.log.debug("task::task %s not listed as MIA?!"%(msg_id))
709 706
710 707 self.tasks[eid].append(msg_id)
711 708 # self.pending[msg_id][1].update(received=datetime.now(),engine=(eid,engine_uuid))
@@ -726,12 +723,12 b' class Hub(HasTraits):'
726 723 try:
727 724 msg = self.session.unpack_message(msg, content=True)
728 725 except:
729 logging.error("iopub::invalid IOPub message", exc_info=True)
726 self.log.error("iopub::invalid IOPub message", exc_info=True)
730 727 return
731 728
732 729 parent = msg['parent_header']
733 730 if not parent:
734 logging.error("iopub::invalid IOPub message: %s"%msg)
731 self.log.error("iopub::invalid IOPub message: %s"%msg)
735 732 return
736 733 msg_id = parent['msg_id']
737 734 msg_type = msg['msg_type']
@@ -741,7 +738,7 b' class Hub(HasTraits):'
741 738 try:
742 739 rec = self.db.get_record(msg_id)
743 740 except:
744 logging.error("iopub::IOPub message has invalid parent", exc_info=True)
741 self.log.error("iopub::IOPub message has invalid parent", exc_info=True)
745 742 return
746 743 # stream
747 744 d = {}
@@ -765,7 +762,7 b' class Hub(HasTraits):'
765 762
766 763 def connection_request(self, client_id, msg):
767 764 """Reply with connection addresses for clients."""
768 logging.info("client::client %s connected"%client_id)
765 self.log.info("client::client %s connected"%client_id)
769 766 content = dict(status='ok')
770 767 content.update(self.client_addrs)
771 768 jsonable = {}
@@ -780,14 +777,14 b' class Hub(HasTraits):'
780 777 try:
781 778 queue = content['queue']
782 779 except KeyError:
783 logging.error("registration::queue not specified", exc_info=True)
780 self.log.error("registration::queue not specified", exc_info=True)
784 781 return
785 782 heart = content.get('heartbeat', None)
786 783 """register a new engine, and create the socket(s) necessary"""
787 784 eid = self._next_id
788 785 # print (eid, queue, reg, heart)
789 786
790 logging.debug("registration::register_engine(%i, %r, %r, %r)"%(eid, queue, reg, heart))
787 self.log.debug("registration::register_engine(%i, %r, %r, %r)"%(eid, queue, reg, heart))
791 788
792 789 content = dict(id=eid,status='ok')
793 790 content.update(self.engine_addrs)
@@ -797,12 +794,12 b' class Hub(HasTraits):'
797 794 raise KeyError("queue_id %r in use"%queue)
798 795 except:
799 796 content = wrap_exception()
800 logging.error("queue_id %r in use"%queue, exc_info=True)
797 self.log.error("queue_id %r in use"%queue, exc_info=True)
801 798 elif heart in self.hearts: # need to check unique hearts?
802 799 try:
803 800 raise KeyError("heart_id %r in use"%heart)
804 801 except:
805 logging.error("heart_id %r in use"%heart, exc_info=True)
802 self.log.error("heart_id %r in use"%heart, exc_info=True)
806 803 content = wrap_exception()
807 804 else:
808 805 for h, pack in self.incoming_registrations.iteritems():
@@ -810,14 +807,14 b' class Hub(HasTraits):'
810 807 try:
811 808 raise KeyError("heart_id %r in use"%heart)
812 809 except:
813 logging.error("heart_id %r in use"%heart, exc_info=True)
810 self.log.error("heart_id %r in use"%heart, exc_info=True)
814 811 content = wrap_exception()
815 812 break
816 813 elif queue == pack[1]:
817 814 try:
818 815 raise KeyError("queue_id %r in use"%queue)
819 816 except:
820 logging.error("queue_id %r in use"%queue, exc_info=True)
817 self.log.error("queue_id %r in use"%queue, exc_info=True)
821 818 content = wrap_exception()
822 819 break
823 820
@@ -836,7 +833,7 b' class Hub(HasTraits):'
836 833 dc.start()
837 834 self.incoming_registrations[heart] = (eid,queue,reg[0],dc)
838 835 else:
839 logging.error("registration::registration %i failed: %s"%(eid, content['evalue']))
836 self.log.error("registration::registration %i failed: %s"%(eid, content['evalue']))
840 837 return eid
841 838
842 839 def unregister_engine(self, ident, msg):
@@ -844,9 +841,9 b' class Hub(HasTraits):'
844 841 try:
845 842 eid = msg['content']['id']
846 843 except:
847 logging.error("registration::bad engine id for unregistration: %s"%ident, exc_info=True)
844 self.log.error("registration::bad engine id for unregistration: %s"%ident, exc_info=True)
848 845 return
849 logging.info("registration::unregister_engine(%s)"%eid)
846 self.log.info("registration::unregister_engine(%s)"%eid)
850 847 content=dict(id=eid, queue=self.engines[eid].queue)
851 848 self.ids.remove(eid)
852 849 self.keytable.pop(eid)
@@ -867,9 +864,9 b' class Hub(HasTraits):'
867 864 try:
868 865 (eid,queue,reg,purge) = self.incoming_registrations.pop(heart)
869 866 except KeyError:
870 logging.error("registration::tried to finish nonexistant registration", exc_info=True)
867 self.log.error("registration::tried to finish nonexistant registration", exc_info=True)
871 868 return
872 logging.info("registration::finished registering engine %i:%r"%(eid,queue))
869 self.log.info("registration::finished registering engine %i:%r"%(eid,queue))
873 870 if purge is not None:
874 871 purge.stop()
875 872 control = queue
@@ -885,11 +882,12 b' class Hub(HasTraits):'
885 882 content = dict(id=eid, queue=self.engines[eid].queue)
886 883 if self.notifier:
887 884 self.session.send(self.notifier, "registration_notification", content=content)
885 self.log.info("engine::Engine Connected: %i"%eid)
888 886
889 887 def _purge_stalled_registration(self, heart):
890 888 if heart in self.incoming_registrations:
891 889 eid = self.incoming_registrations.pop(heart)[0]
892 logging.info("registration::purging stalled registration: %i"%eid)
890 self.log.info("registration::purging stalled registration: %i"%eid)
893 891 else:
894 892 pass
895 893
@@ -910,7 +908,7 b' class Hub(HasTraits):'
910 908 dc.start()
911 909
912 910 def _shutdown(self):
913 logging.info("hub::hub shutting down.")
911 self.log.info("hub::hub shutting down.")
914 912 time.sleep(0.1)
915 913 sys.exit(0)
916 914
@@ -314,11 +314,12 b' class IPClusterApp(ApplicationWithClusterDir):'
314 314 # and engine will be launched.
315 315 el_class = import_item(config.Global.engine_launcher)
316 316 self.engine_launcher = el_class(
317 work_dir=self.cluster_dir, config=config
317 work_dir=self.cluster_dir, config=config, logname=self.log.name
318 318 )
319 319 cl_class = import_item(config.Global.controller_launcher)
320 320 self.controller_launcher = cl_class(
321 work_dir=self.cluster_dir, config=config
321 work_dir=self.cluster_dir, config=config,
322 logname=self.log.name
322 323 )
323 324
324 325 # Setup signals
@@ -348,11 +349,11 b' class IPClusterApp(ApplicationWithClusterDir):'
348 349 return d
349 350
350 351 def startup_message(self, r=None):
351 logging.info("IPython cluster: started")
352 self.log.info("IPython cluster: started")
352 353 return r
353 354
354 355 def start_controller(self, r=None):
355 # logging.info("In start_controller")
356 # self.log.info("In start_controller")
356 357 config = self.master_config
357 358 d = self.controller_launcher.start(
358 359 cluster_dir=config.Global.cluster_dir
@@ -360,7 +361,7 b' class IPClusterApp(ApplicationWithClusterDir):'
360 361 return d
361 362
362 363 def start_engines(self, r=None):
363 # logging.info("In start_engines")
364 # self.log.info("In start_engines")
364 365 config = self.master_config
365 366 d = self.engine_launcher.start(
366 367 config.Global.n,
@@ -369,12 +370,12 b' class IPClusterApp(ApplicationWithClusterDir):'
369 370 return d
370 371
371 372 def stop_controller(self, r=None):
372 # logging.info("In stop_controller")
373 # self.log.info("In stop_controller")
373 374 if self.controller_launcher.running:
374 375 return self.controller_launcher.stop()
375 376
376 377 def stop_engines(self, r=None):
377 # logging.info("In stop_engines")
378 # self.log.info("In stop_engines")
378 379 if self.engine_launcher.running:
379 380 d = self.engine_launcher.stop()
380 381 # d.addErrback(self.log_err)
@@ -383,16 +384,16 b' class IPClusterApp(ApplicationWithClusterDir):'
383 384 return None
384 385
385 386 def log_err(self, f):
386 logging.error(f.getTraceback())
387 self.log.error(f.getTraceback())
387 388 return None
388 389
389 390 def stop_launchers(self, r=None):
390 391 if not self._stopping:
391 392 self._stopping = True
392 393 # if isinstance(r, failure.Failure):
393 # logging.error('Unexpected error in ipcluster:')
394 # logging.info(r.getTraceback())
395 logging.error("IPython cluster: stopping")
394 # self.log.error('Unexpected error in ipcluster:')
395 # self.log.info(r.getTraceback())
396 self.log.error("IPython cluster: stopping")
396 397 # These return deferreds. We are not doing anything with them
397 398 # but we are holding refs to them as a reminder that they
398 399 # do return deferreds.
@@ -462,7 +463,7 b' class IPClusterApp(ApplicationWithClusterDir):'
462 463 try:
463 464 self.loop.start()
464 465 except:
465 logging.info("stopping...")
466 self.log.info("stopping...")
466 467 self.remove_pid_file()
467 468
468 469 def start_app_stop(self):
@@ -279,7 +279,7 b' class IPControllerApp(ApplicationWithClusterDir):'
279 279 c.SessionFactory.exec_key = ''
280 280
281 281 try:
282 self.factory = ControllerFactory(config=c)
282 self.factory = ControllerFactory(config=c, logname=self.log.name)
283 283 self.start_logging()
284 284 self.factory.construct()
285 285 except:
@@ -193,7 +193,7 b' class IPEngineApp(ApplicationWithClusterDir):'
193 193 # Create the underlying shell class and EngineService
194 194 # shell_class = import_item(self.master_config.Global.shell_class)
195 195 try:
196 self.engine = EngineFactory(config=config)
196 self.engine = EngineFactory(config=config, logname=self.log.name)
197 197 except:
198 198 self.log.error("Couldn't start the Engine", exc_info=True)
199 199 self.exit(1)
@@ -107,7 +107,7 b' class IPLoggerApp(ApplicationWithClusterDir):'
107 107 self.start_logging()
108 108
109 109 try:
110 self.watcher = LogWatcher(config=self.master_config)
110 self.watcher = LogWatcher(config=self.master_config, logname=self.log.name)
111 111 except:
112 112 self.log.error("Couldn't start the LogWatcher", exc_info=True)
113 113 self.exit(1)
@@ -115,6 +115,7 b' class IPLoggerApp(ApplicationWithClusterDir):'
115 115
116 116 def start_app(self):
117 117 try:
118 self.watcher.start()
118 119 self.watcher.loop.start()
119 120 except KeyboardInterrupt:
120 121 self.log.critical("Logging Interrupted, shutting down...\n")
@@ -30,11 +30,12 b' from subprocess import Popen, PIPE'
30 30
31 31 from zmq.eventloop import ioloop
32 32
33 from IPython.config.configurable import Configurable
33 # from IPython.config.configurable import Configurable
34 34 from IPython.utils.traitlets import Str, Int, List, Unicode, Instance
35 35 from IPython.utils.path import get_ipython_module_path
36 36 from IPython.utils.process import find_cmd, pycmd2argv, FindCmdError
37 37
38 from factory import LoggingFactory
38 39 # from IPython.kernel.winhpcjob import (
39 40 # IPControllerTask, IPEngineTask,
40 41 # IPControllerJob, IPEngineSetJob
@@ -75,7 +76,7 b' class UnknownStatus(LauncherError):'
75 76 pass
76 77
77 78
78 class BaseLauncher(Configurable):
79 class BaseLauncher(LoggingFactory):
79 80 """An asbtraction for starting, stopping and signaling a process."""
80 81
81 82 # In all of the launchers, the work_dir is where child processes will be
@@ -90,8 +91,8 b' class BaseLauncher(Configurable):'
90 91 def _loop_default(self):
91 92 return ioloop.IOLoop.instance()
92 93
93 def __init__(self, work_dir=u'.', config=None):
94 super(BaseLauncher, self).__init__(work_dir=work_dir, config=config)
94 def __init__(self, work_dir=u'.', config=None, **kwargs):
95 super(BaseLauncher, self).__init__(work_dir=work_dir, config=config, **kwargs)
95 96 self.state = 'before' # can be before, running, after
96 97 self.stop_callbacks = []
97 98 self.start_data = None
@@ -163,7 +164,7 b' class BaseLauncher(Configurable):'
163 164 a pass-through so it can be used as a callback.
164 165 """
165 166
166 logging.info('Process %r started: %r' % (self.args[0], data))
167 self.log.info('Process %r started: %r' % (self.args[0], data))
167 168 self.start_data = data
168 169 self.state = 'running'
169 170 return data
@@ -174,7 +175,7 b' class BaseLauncher(Configurable):'
174 175 This logs the process stopping and sets the state to 'after'. Call
175 176 this to trigger all the deferreds from :func:`observe_stop`."""
176 177
177 logging.info('Process %r stopped: %r' % (self.args[0], data))
178 self.log.info('Process %r stopped: %r' % (self.args[0], data))
178 179 self.stop_data = data
179 180 self.state = 'after'
180 181 for i in range(len(self.stop_callbacks)):
@@ -212,9 +213,9 b' class LocalProcessLauncher(BaseLauncher):'
212 213 cmd_and_args = List([])
213 214 poll_frequency = Int(100) # in ms
214 215
215 def __init__(self, work_dir=u'.', config=None):
216 def __init__(self, work_dir=u'.', config=None, **kwargs):
216 217 super(LocalProcessLauncher, self).__init__(
217 work_dir=work_dir, config=config
218 work_dir=work_dir, config=config, **kwargs
218 219 )
219 220 self.process = None
220 221 self.start_deferred = None
@@ -259,7 +260,7 b' class LocalProcessLauncher(BaseLauncher):'
259 260 line = self.process.stdout.readline()
260 261 # a stopped process will be readable but return empty strings
261 262 if line:
262 logging.info(line[:-1])
263 self.log.info(line[:-1])
263 264 else:
264 265 self.poll()
265 266
@@ -267,7 +268,7 b' class LocalProcessLauncher(BaseLauncher):'
267 268 line = self.process.stderr.readline()
268 269 # a stopped process will be readable but return empty strings
269 270 if line:
270 logging.error(line[:-1])
271 self.log.error(line[:-1])
271 272 else:
272 273 self.poll()
273 274
@@ -294,7 +295,7 b' class LocalControllerLauncher(LocalProcessLauncher):'
294 295 """Start the controller by cluster_dir."""
295 296 self.controller_args.extend(['--cluster-dir', cluster_dir])
296 297 self.cluster_dir = unicode(cluster_dir)
297 logging.info("Starting LocalControllerLauncher: %r" % self.args)
298 self.log.info("Starting LocalControllerLauncher: %r" % self.args)
298 299 return super(LocalControllerLauncher, self).start()
299 300
300 301
@@ -327,9 +328,9 b' class LocalEngineSetLauncher(BaseLauncher):'
327 328 # launcher class
328 329 launcher_class = LocalEngineLauncher
329 330
330 def __init__(self, work_dir=u'.', config=None):
331 def __init__(self, work_dir=u'.', config=None, **kwargs):
331 332 super(LocalEngineSetLauncher, self).__init__(
332 work_dir=work_dir, config=config
333 work_dir=work_dir, config=config, **kwargs
333 334 )
334 335 self.launchers = {}
335 336 self.stop_data = {}
@@ -339,14 +340,14 b' class LocalEngineSetLauncher(BaseLauncher):'
339 340 self.cluster_dir = unicode(cluster_dir)
340 341 dlist = []
341 342 for i in range(n):
342 el = self.launcher_class(work_dir=self.work_dir, config=self.config)
343 el = self.launcher_class(work_dir=self.work_dir, config=self.config, logname=self.log.name)
343 344 # Copy the engine args over to each engine launcher.
344 345 import copy
345 346 el.engine_args = copy.deepcopy(self.engine_args)
346 347 el.on_stop(self._notice_engine_stopped)
347 348 d = el.start(cluster_dir)
348 349 if i==0:
349 logging.info("Starting LocalEngineSetLauncher: %r" % el.args)
350 self.log.info("Starting LocalEngineSetLauncher: %r" % el.args)
350 351 self.launchers[i] = el
351 352 dlist.append(d)
352 353 self.notify_start(dlist)
@@ -431,7 +432,7 b' class MPIExecControllerLauncher(MPIExecLauncher):'
431 432 """Start the controller by cluster_dir."""
432 433 self.controller_args.extend(['--cluster-dir', cluster_dir])
433 434 self.cluster_dir = unicode(cluster_dir)
434 logging.info("Starting MPIExecControllerLauncher: %r" % self.args)
435 self.log.info("Starting MPIExecControllerLauncher: %r" % self.args)
435 436 return super(MPIExecControllerLauncher, self).start(1)
436 437
437 438 def find_args(self):
@@ -453,7 +454,7 b' class MPIExecEngineSetLauncher(MPIExecLauncher):'
453 454 self.engine_args.extend(['--cluster-dir', cluster_dir])
454 455 self.cluster_dir = unicode(cluster_dir)
455 456 self.n = n
456 logging.info('Starting MPIExecEngineSetLauncher: %r' % self.args)
457 self.log.info('Starting MPIExecEngineSetLauncher: %r' % self.args)
457 458 return super(MPIExecEngineSetLauncher, self).start(n)
458 459
459 460 def find_args(self):
@@ -572,7 +573,7 b' class SSHEngineSetLauncher(LocalEngineSetLauncher):'
572 573 # else:
573 574 # raise LauncherError("Job id couldn't be determined: %s" % output)
574 575 # self.job_id = job_id
575 # logging.info('Job started with job id: %r' % job_id)
576 # self.log.info('Job started with job id: %r' % job_id)
576 577 # return job_id
577 578 #
578 579 # @inlineCallbacks
@@ -584,7 +585,7 b' class SSHEngineSetLauncher(LocalEngineSetLauncher):'
584 585 # '/jobfile:%s' % self.job_file,
585 586 # '/scheduler:%s' % self.scheduler
586 587 # ]
587 # logging.info("Starting Win HPC Job: %s" % (self.job_cmd + ' ' + ' '.join(args),))
588 # self.log.info("Starting Win HPC Job: %s" % (self.job_cmd + ' ' + ' '.join(args),))
588 589 # # Twisted will raise DeprecationWarnings if we try to pass unicode to this
589 590 # output = yield getProcessOutput(str(self.job_cmd),
590 591 # [str(a) for a in args],
@@ -602,7 +603,7 b' class SSHEngineSetLauncher(LocalEngineSetLauncher):'
602 603 # self.job_id,
603 604 # '/scheduler:%s' % self.scheduler
604 605 # ]
605 # logging.info("Stopping Win HPC Job: %s" % (self.job_cmd + ' ' + ' '.join(args),))
606 # self.log.info("Stopping Win HPC Job: %s" % (self.job_cmd + ' ' + ' '.join(args),))
606 607 # try:
607 608 # # Twisted will raise DeprecationWarnings if we try to pass unicode to this
608 609 # output = yield getProcessOutput(str(self.job_cmd),
@@ -633,7 +634,7 b' class SSHEngineSetLauncher(LocalEngineSetLauncher):'
633 634 # t.controller_args.extend(self.extra_args)
634 635 # job.add_task(t)
635 636 #
636 # logging.info("Writing job description file: %s" % self.job_file)
637 # self.log.info("Writing job description file: %s" % self.job_file)
637 638 # job.write(self.job_file)
638 639 #
639 640 # @property
@@ -665,7 +666,7 b' class SSHEngineSetLauncher(LocalEngineSetLauncher):'
665 666 # t.engine_args.extend(self.extra_args)
666 667 # job.add_task(t)
667 668 #
668 # logging.info("Writing job description file: %s" % self.job_file)
669 # self.log.info("Writing job description file: %s" % self.job_file)
669 670 # job.write(self.job_file)
670 671 #
671 672 # @property
@@ -729,14 +730,14 b' class SSHEngineSetLauncher(LocalEngineSetLauncher):'
729 730 # else:
730 731 # raise LauncherError("Job id couldn't be determined: %s" % output)
731 732 # self.job_id = job_id
732 # logging.info('Job started with job id: %r' % job_id)
733 # self.log.info('Job started with job id: %r' % job_id)
733 734 # return job_id
734 735 #
735 736 # def write_batch_script(self, n):
736 737 # """Instantiate and write the batch script to the work_dir."""
737 738 # self.context['n'] = n
738 739 # script_as_string = Itpl.itplns(self.batch_template, self.context)
739 # logging.info('Writing instantiated batch script: %s' % self.batch_file)
740 # self.log.info('Writing instantiated batch script: %s' % self.batch_file)
740 741 # f = open(self.batch_file, 'w')
741 742 # f.write(script_as_string)
742 743 # f.close()
@@ -783,7 +784,7 b' class SSHEngineSetLauncher(LocalEngineSetLauncher):'
783 784 # # ${cluster_dir}
784 785 # self.context['cluster_dir'] = cluster_dir
785 786 # self.cluster_dir = unicode(cluster_dir)
786 # logging.info("Starting PBSControllerLauncher: %r" % self.args)
787 # self.log.info("Starting PBSControllerLauncher: %r" % self.args)
787 788 # return super(PBSControllerLauncher, self).start(1)
788 789 #
789 790 #
@@ -795,7 +796,7 b' class SSHEngineSetLauncher(LocalEngineSetLauncher):'
795 796 # """Start n engines by profile or cluster_dir."""
796 797 # self.program_args.extend(['--cluster-dir', cluster_dir])
797 798 # self.cluster_dir = unicode(cluster_dir)
798 # logging.info('Starting PBSEngineSetLauncher: %r' % self.args)
799 # self.log.info('Starting PBSEngineSetLauncher: %r' % self.args)
799 800 # return super(PBSEngineSetLauncher, self).start(n)
800 801
801 802
@@ -819,6 +820,6 b' class IPClusterLauncher(LocalProcessLauncher):'
819 820 ['-n', repr(self.ipcluster_n)] + self.ipcluster_args
820 821
821 822 def start(self):
822 logging.info("Starting ipcluster: %r" % self.args)
823 self.log.info("Starting ipcluster: %r" % self.args)
823 824 return super(IPClusterLauncher, self).start()
824 825
@@ -18,15 +18,16 b' import logging'
18 18
19 19 import zmq
20 20 from zmq.eventloop import ioloop, zmqstream
21 from IPython.config.configurable import Configurable
22 21 from IPython.utils.traitlets import Int, Str, Instance, List
23 22
23 from factory import LoggingFactory
24
24 25 #-----------------------------------------------------------------------------
25 26 # Classes
26 27 #-----------------------------------------------------------------------------
27 28
28 29
29 class LogWatcher(Configurable):
30 class LogWatcher(LoggingFactory):
30 31 """A simple class that receives messages on a SUB socket, as published
31 32 by subclasses of `zmq.log.handlers.PUBHandler`, and logs them itself.
32 33
@@ -43,21 +44,25 b' class LogWatcher(Configurable):'
43 44 def _loop_default(self):
44 45 return ioloop.IOLoop.instance()
45 46
46 def __init__(self, config=None):
47 super(LogWatcher, self).__init__(config=config)
47 def __init__(self, **kwargs):
48 super(LogWatcher, self).__init__(**kwargs)
48 49 s = self.context.socket(zmq.SUB)
49 50 s.bind(self.url)
50 51 self.stream = zmqstream.ZMQStream(s, self.loop)
51 52 self.subscribe()
52 53 self.on_trait_change(self.subscribe, 'topics')
53 54
55 def start(self):
54 56 self.stream.on_recv(self.log_message)
55 57
58 def stop(self):
59 self.stream.stop_on_recv()
60
56 61 def subscribe(self):
57 62 """Update our SUB socket's subscriptions."""
58 63 self.stream.setsockopt(zmq.UNSUBSCRIBE, '')
59 64 for topic in self.topics:
60 logging.debug("Subscribing to: %r"%topic)
65 self.log.debug("Subscribing to: %r"%topic)
61 66 self.stream.setsockopt(zmq.SUBSCRIBE, topic)
62 67
63 68 def _extract_level(self, topic_str):
@@ -79,7 +84,7 b' class LogWatcher(Configurable):'
79 84 def log_message(self, raw):
80 85 """receive and parse a message, then log it."""
81 86 if len(raw) != 2 or '.' not in raw[0]:
82 logging.error("Invalid log message: %s"%raw)
87 self.log.error("Invalid log message: %s"%raw)
83 88 return
84 89 else:
85 90 topic, msg = raw
@@ -10,8 +10,9 b' Python Scheduler exists.'
10 10 #----------------------------------------------------------------------
11 11
12 12 from __future__ import print_function
13 from random import randint,random
13 import sys
14 14 import logging
15 from random import randint,random
15 16 from types import FunctionType
16 17
17 18 try:
@@ -24,7 +25,7 b' from zmq.eventloop import ioloop, zmqstream'
24 25
25 26 # local imports
26 27 from IPython.external.decorator import decorator
27 from IPython.config.configurable import Configurable
28 # from IPython.config.configurable import Configurable
28 29 from IPython.utils.traitlets import Instance, Dict, List, Set
29 30
30 31 import error
@@ -32,12 +33,13 b' from client import Client'
32 33 from dependency import Dependency
33 34 import streamsession as ss
34 35 from entry_point import connect_logger, local_logger
36 from factory import LoggingFactory
35 37
36 38
37 39 @decorator
38 40 def logged(f,self,*args,**kwargs):
39 41 # print ("#--------------------")
40 logging.debug("scheduler::%s(*%s,**%s)"%(f.func_name, args, kwargs))
42 self.log.debug("scheduler::%s(*%s,**%s)"%(f.func_name, args, kwargs))
41 43 # print ("#--")
42 44 return f(self,*args, **kwargs)
43 45
@@ -108,7 +110,7 b' def leastload(loads):'
108 110 # store empty default dependency:
109 111 MET = Dependency([])
110 112
111 class TaskScheduler(Configurable):
113 class TaskScheduler(LoggingFactory):
112 114 """Python TaskScheduler object.
113 115
114 116 This is the simplest object that supports msg_id based
@@ -153,7 +155,7 b' class TaskScheduler(Configurable):'
153 155 unregistration_notification = self._unregister_engine
154 156 )
155 157 self.notifier_stream.on_recv(self.dispatch_notification)
156 logging.info("Scheduler started...%r"%self)
158 self.log.info("Scheduler started...%r"%self)
157 159
158 160 def resume_receiving(self):
159 161 """Resume accepting jobs."""
@@ -180,7 +182,7 b' class TaskScheduler(Configurable):'
180 182 try:
181 183 handler(str(msg['content']['queue']))
182 184 except KeyError:
183 logging.error("task::Invalid notification msg: %s"%msg)
185 self.log.error("task::Invalid notification msg: %s"%msg)
184 186
185 187 @logged
186 188 def _register_engine(self, uid):
@@ -236,7 +238,7 b' class TaskScheduler(Configurable):'
236 238 try:
237 239 idents, msg = self.session.feed_identities(raw_msg, copy=False)
238 240 except Exception as e:
239 logging.error("task::Invaid msg: %s"%msg)
241 self.log.error("task::Invaid msg: %s"%msg)
240 242 return
241 243
242 244 # send to monitor
@@ -277,7 +279,7 b' class TaskScheduler(Configurable):'
277 279 def fail_unreachable(self, msg_id):
278 280 """a message has become unreachable"""
279 281 if msg_id not in self.depending:
280 logging.error("msg %r already failed!"%msg_id)
282 self.log.error("msg %r already failed!"%msg_id)
281 283 return
282 284 raw_msg, after, follow = self.depending.pop(msg_id)
283 285 for mid in follow.union(after):
@@ -369,7 +371,7 b' class TaskScheduler(Configurable):'
369 371 try:
370 372 idents,msg = self.session.feed_identities(raw_msg, copy=False)
371 373 except Exception as e:
372 logging.error("task::Invaid result: %s"%msg)
374 self.log.error("task::Invaid result: %s"%msg)
373 375 return
374 376 msg = self.session.unpack_message(msg, content=False, copy=False)
375 377 header = msg['header']
@@ -470,7 +472,7 b' class TaskScheduler(Configurable):'
470 472
471 473
472 474
473 def launch_scheduler(in_addr, out_addr, mon_addr, not_addr, log_addr=None, loglevel=logging.DEBUG, scheme='weighted'):
475 def launch_scheduler(in_addr, out_addr, mon_addr, not_addr, logname='ZMQ', log_addr=None, loglevel=logging.DEBUG, scheme='weighted'):
474 476 from zmq.eventloop import ioloop
475 477 from zmq.eventloop.zmqstream import ZMQStream
476 478
@@ -490,13 +492,13 b' def launch_scheduler(in_addr, out_addr, mon_addr, not_addr, log_addr=None, logle'
490 492 scheme = globals().get(scheme, None)
491 493 # setup logging
492 494 if log_addr:
493 connect_logger(ctx, log_addr, root="scheduler", loglevel=loglevel)
495 connect_logger(logname, ctx, log_addr, root="scheduler", loglevel=loglevel)
494 496 else:
495 local_logger(loglevel)
497 local_logger(logname, loglevel)
496 498
497 499 scheduler = TaskScheduler(client_stream=ins, engine_stream=outs,
498 500 mon_stream=mons,notifier_stream=nots,
499 scheme=scheme,io_loop=loop)
501 scheme=scheme,io_loop=loop, logname=logname)
500 502
501 503 try:
502 504 loop.start()
@@ -110,7 +110,7 b' class Kernel(SessionFactory):'
110 110 s = _Passer()
111 111 content = dict(silent=True, user_variable=[],user_expressions=[])
112 112 for line in self.exec_lines:
113 logging.debug("executing initialization: %s"%line)
113 self.log.debug("executing initialization: %s"%line)
114 114 content.update({'code':line})
115 115 msg = self.session.msg('execute_request', content)
116 116 self.execute_request(s, [], msg)
@@ -139,8 +139,8 b' class Kernel(SessionFactory):'
139 139
140 140 # assert self.reply_socketly_socket.rcvmore(), "Unexpected missing message part."
141 141 # msg = self.reply_socket.recv_json()
142 logging.info("Aborting:")
143 logging.info(str(msg))
142 self.log.info("Aborting:")
143 self.log.info(str(msg))
144 144 msg_type = msg['msg_type']
145 145 reply_type = msg_type.split('_')[0] + '_reply'
146 146 # reply_msg = self.session.msg(reply_type, {'status' : 'aborted'}, msg)
@@ -148,7 +148,7 b' class Kernel(SessionFactory):'
148 148 # self.reply_socket.send_json(reply_msg)
149 149 reply_msg = self.session.send(stream, reply_type,
150 150 content={'status' : 'aborted'}, parent=msg, ident=idents)[0]
151 logging.debug(str(reply_msg))
151 self.log.debug(str(reply_msg))
152 152 # We need to wait a bit for requests to come in. This can probably
153 153 # be set shorter for true asynchronous clients.
154 154 time.sleep(0.05)
@@ -166,7 +166,7 b' class Kernel(SessionFactory):'
166 166 content = dict(status='ok')
167 167 reply_msg = self.session.send(stream, 'abort_reply', content=content,
168 168 parent=parent, ident=ident)[0]
169 logging.debug(str(reply_msg))
169 self.log.debug(str(reply_msg))
170 170
171 171 def shutdown_request(self, stream, ident, parent):
172 172 """kill ourself. This should really be handled in an external process"""
@@ -191,7 +191,7 b' class Kernel(SessionFactory):'
191 191 try:
192 192 msg = self.session.unpack_message(msg, content=True, copy=False)
193 193 except:
194 logging.error("Invalid Message", exc_info=True)
194 self.log.error("Invalid Message", exc_info=True)
195 195 return
196 196
197 197 header = msg['header']
@@ -199,7 +199,7 b' class Kernel(SessionFactory):'
199 199
200 200 handler = self.control_handlers.get(msg['msg_type'], None)
201 201 if handler is None:
202 logging.error("UNKNOWN CONTROL MESSAGE TYPE: %r"%msg['msg_type'])
202 self.log.error("UNKNOWN CONTROL MESSAGE TYPE: %r"%msg['msg_type'])
203 203 else:
204 204 handler(self.control_stream, idents, msg)
205 205
@@ -240,11 +240,11 b' class Kernel(SessionFactory):'
240 240 self._initial_exec_lines()
241 241
242 242 def execute_request(self, stream, ident, parent):
243 logging.debug('execute request %s'%parent)
243 self.log.debug('execute request %s'%parent)
244 244 try:
245 245 code = parent[u'content'][u'code']
246 246 except:
247 logging.error("Got bad msg: %s"%parent, exc_info=True)
247 self.log.error("Got bad msg: %s"%parent, exc_info=True)
248 248 return
249 249 self.session.send(self.iopub_stream, u'pyin', {u'code':code},parent=parent,
250 250 ident='%s.pyin'%self.prefix)
@@ -268,7 +268,7 b' class Kernel(SessionFactory):'
268 268
269 269 reply_msg = self.session.send(stream, u'execute_reply', reply_content, parent=parent,
270 270 ident=ident, subheader = dict(started=started))
271 logging.debug(str(reply_msg))
271 self.log.debug(str(reply_msg))
272 272 if reply_msg['content']['status'] == u'error':
273 273 self.abort_queues()
274 274
@@ -290,7 +290,7 b' class Kernel(SessionFactory):'
290 290 msg_id = parent['header']['msg_id']
291 291 bound = content.get('bound', False)
292 292 except:
293 logging.error("Got bad msg: %s"%parent, exc_info=True)
293 self.log.error("Got bad msg: %s"%parent, exc_info=True)
294 294 return
295 295 # pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent)
296 296 # self.iopub_stream.send(pyin_msg)
@@ -364,7 +364,7 b' class Kernel(SessionFactory):'
364 364 try:
365 365 msg = self.session.unpack_message(msg, content=True, copy=False)
366 366 except:
367 logging.error("Invalid Message", exc_info=True)
367 self.log.error("Invalid Message", exc_info=True)
368 368 return
369 369
370 370
@@ -379,7 +379,7 b' class Kernel(SessionFactory):'
379 379 return
380 380 handler = self.shell_handlers.get(msg['msg_type'], None)
381 381 if handler is None:
382 logging.error("UNKNOWN MESSAGE TYPE: %r"%msg['msg_type'])
382 self.log.error("UNKNOWN MESSAGE TYPE: %r"%msg['msg_type'])
383 383 else:
384 384 handler(stream, idents, msg)
385 385
@@ -158,7 +158,7 b" def openssh_tunnel(lport, rport, server, remoteip='127.0.0.1', keyfile=None, pas"
158 158 ssh="ssh "
159 159 if keyfile:
160 160 ssh += "-i " + keyfile
161 cmd = ssh + " -f -L 127.0.0.1:%i:127.0.0.1:%i %s sleep %i"%(lport, rport, server, timeout)
161 cmd = ssh + " -f -L 127.0.0.1:%i:%s:%i %s sleep %i"%(lport, remoteip, rport, server, timeout)
162 162 tunnel = pexpect.spawn(cmd)
163 163 failed = False
164 164 while True:
General Comments 0
You need to be logged in to leave comments. Login now