rework logging connections
Author: MinRK
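The hunks below replace module-level logging.* calls with per-instance self.log loggers. A new LoggingFactory base class (added to factory.py, see that hunk further down) holds a `log` attribute keyed by a `logname` trait, and parent objects hand `logname=self.log.name` to every child they construct, so one controller, engine, or cluster app shares a single logger hierarchy. The following is a minimal, stdlib-only sketch of that pattern; the class and method names here are illustrative, not the actual IPython.zmq.parallel API.

    import logging

    class LoggingFactory(object):
        """Simplified sketch of the pattern this commit introduces (the real
        class is a traitlets Configurable): a component owns a named logger,
        exposed as self.log and selected by `logname`, instead of calling the
        module-level logging functions."""
        def __init__(self, logname='ZMQ', **kwargs):
            self.logname = logname
            self.log = logging.getLogger(logname)

    class HubLike(LoggingFactory):
        def start(self):
            # children in the diff are built with logname=self.log.name, so
            # every piece of one controller or engine logs under one name
            self.log.info("Heartmonitor started")

    if __name__ == '__main__':
        logging.basicConfig(level=logging.INFO, format="[%(name)s] %(message)s")
        HubLike(logname='ipcontroller').start()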
@@ -480,12 +480,11 b' class ApplicationWithClusterDir(Application):'
480 open_log_file = None
480 open_log_file = None
481 else:
481 else:
482 open_log_file = sys.stdout
482 open_log_file = sys.stdout
483 logger = logging.getLogger()
483 if open_log_file is not None:
484 level = self.log_level
484 self.log.removeHandler(self._log_handler)
485 self.log = logger
485 self._log_handler = logging.StreamHandler(open_log_file)
486 # since we've reconnected the logger, we need to reconnect the log-level
486 self._log_formatter = logging.Formatter("[%(name)s] %(message)s")
487 self.log_level = level
487 self._log_handler.setFormatter(self._log_formatter)
488 if open_log_file is not None and self._log_handler not in self.log.handlers:
489 self.log.addHandler(self._log_handler)
488 self.log.addHandler(self._log_handler)
490 # log.startLogging(open_log_file)
489 # log.startLogging(open_log_file)
491
490
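In the hunk above, ApplicationWithClusterDir no longer swaps self.log for the root logger; it detaches the previously installed handler and attaches a fresh StreamHandler, with the "[%(name)s] %(message)s" formatter, pointed at the chosen log file or sys.stdout. Below is a hedged, stand-alone sketch of that handler swap; the helper name is hypothetical, not a function in the commit.

    import logging
    import sys

    def reconnect_log(log, open_log_file, old_handler=None):
        # re-point `log` at `open_log_file`, dropping the handler installed on
        # a previous call; mirrors the handler swap in the hunk above
        if open_log_file is None:
            return old_handler
        if old_handler is not None:
            log.removeHandler(old_handler)
        handler = logging.StreamHandler(open_log_file)
        handler.setFormatter(logging.Formatter("[%(name)s] %(message)s"))
        log.addHandler(handler)
        return handler

    log = logging.getLogger('ipcluster')
    handler = reconnect_log(log, sys.stdout)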
@@ -44,14 +44,12 b' class ControllerFactory(HubFactory):'
44 children = List()
44 children = List()
45 mq_class = Str('zmq.devices.ProcessMonitoredQueue')
45 mq_class = Str('zmq.devices.ProcessMonitoredQueue')
46
46
47 def _update_mq(self):
47 def _usethreads_changed(self, name, old, new):
48 self.mq_class = 'zmq.devices.%sMonitoredQueue'%('Thread' if self.usethreads else 'Process')
48 self.mq_class = 'zmq.devices.%sMonitoredQueue'%('Thread' if new else 'Process')
49
49
50 def __init__(self, **kwargs):
50 def __init__(self, **kwargs):
51 super(ControllerFactory, self).__init__(**kwargs)
51 super(ControllerFactory, self).__init__(**kwargs)
52 self.subconstructors.append(self.construct_schedulers)
52 self.subconstructors.append(self.construct_schedulers)
53 self._update_mq()
54 self.on_trait_change(self._update_mq, 'usethreads')
55
53
56 def start(self):
54 def start(self):
57 super(ControllerFactory, self).start()
55 super(ControllerFactory, self).start()
@@ -91,7 +89,7 b' class ControllerFactory(HubFactory):'
91 children.append(q)
89 children.append(q)
92 # Task Queue (in a Process)
90 # Task Queue (in a Process)
93 if self.scheme == 'pure':
91 if self.scheme == 'pure':
94 logging.warn("task::using pure XREQ Task scheduler")
92 self.log.warn("task::using pure XREQ Task scheduler")
95 q = mq(zmq.XREP, zmq.XREQ, zmq.PUB, 'intask', 'outtask')
93 q = mq(zmq.XREP, zmq.XREQ, zmq.PUB, 'intask', 'outtask')
96 q.bind_in(self.client_addrs['task'])
94 q.bind_in(self.client_addrs['task'])
97 q.bind_out(self.engine_addrs['task'])
95 q.bind_out(self.engine_addrs['task'])
@@ -99,10 +97,10 b' class ControllerFactory(HubFactory):'
99 q.daemon=True
97 q.daemon=True
100 children.append(q)
98 children.append(q)
101 elif self.scheme == 'none':
99 elif self.scheme == 'none':
102 logging.warn("task::using no Task scheduler")
100 self.log.warn("task::using no Task scheduler")
103
101
104 else:
102 else:
105 logging.warn("task::using Python %s Task scheduler"%self.scheme)
103 self.log.warn("task::using Python %s Task scheduler"%self.scheme)
106 sargs = (self.client_addrs['task'], self.engine_addrs['task'], self.monitor_url, self.client_addrs['notification'])
104 sargs = (self.client_addrs['task'], self.engine_addrs['task'], self.monitor_url, self.client_addrs['notification'])
107 q = Process(target=launch_scheduler, args=sargs, kwargs = dict(scheme=self.scheme))
105 q = Process(target=launch_scheduler, args=sargs, kwargs = dict(scheme=self.scheme))
108 q.daemon=True
106 q.daemon=True
@@ -25,8 +25,8 b' from streamkernel import Kernel'
25 import heartmonitor
25 import heartmonitor
26
26
27 def printer(*msg):
27 def printer(*msg):
28 # print (logging.handlers, file=sys.__stdout__)
28 # print (self.log.handlers, file=sys.__stdout__)
29 logging.info(str(msg))
29 self.log.info(str(msg))
30
30
31 class EngineFactory(RegistrationFactory):
31 class EngineFactory(RegistrationFactory):
32 """IPython engine"""
32 """IPython engine"""
@@ -54,7 +54,7 b' class EngineFactory(RegistrationFactory):'
54 def register(self):
54 def register(self):
55 """send the registration_request"""
55 """send the registration_request"""
56
56
57 logging.info("registering")
57 self.log.info("registering")
58 content = dict(queue=self.ident, heartbeat=self.ident, control=self.ident)
58 content = dict(queue=self.ident, heartbeat=self.ident, control=self.ident)
59 self.registrar.on_recv(self.complete_registration)
59 self.registrar.on_recv(self.complete_registration)
60 # print (self.session.key)
60 # print (self.session.key)
@@ -112,10 +112,9 b' class EngineFactory(RegistrationFactory):'
112 sys.displayhook = self.display_hook_factory(self.session, iopub_stream)
112 sys.displayhook = self.display_hook_factory(self.session, iopub_stream)
113 sys.displayhook.topic = 'engine.%i.pyout'%self.id
113 sys.displayhook.topic = 'engine.%i.pyout'%self.id
114
114
115 self.kernel = Kernel(int_id=self.id, ident=self.ident, session=self.session,
115 self.kernel = Kernel(config=self.config, int_id=self.id, ident=self.ident, session=self.session,
116 control_stream=control_stream,
116 control_stream=control_stream, shell_streams=shell_streams, iopub_stream=iopub_stream,
117 shell_streams=shell_streams, iopub_stream=iopub_stream, loop=loop,
117 loop=loop, user_ns = self.user_ns, logname=self.log.name)
118 user_ns = self.user_ns, config=self.config)
119 self.kernel.start()
118 self.kernel.start()
120
119
121 heart = heartmonitor.Heart(*map(str, hb_addrs), heart_id=identity)
120 heart = heartmonitor.Heart(*map(str, hb_addrs), heart_id=identity)
@@ -124,10 +123,10 b' class EngineFactory(RegistrationFactory):'
124
123
125
124
126 else:
125 else:
127 logging.error("Registration Failed: %s"%msg)
126 self.log.error("Registration Failed: %s"%msg)
128 raise Exception("Registration Failed: %s"%msg)
127 raise Exception("Registration Failed: %s"%msg)
129
128
130 logging.info("Completed registration with id %i"%self.id)
129 self.log.info("Completed registration with id %i"%self.id)
131
130
132
131
133 def unregister(self):
132 def unregister(self):
@@ -79,8 +79,8 b' def integer_loglevel(loglevel):'
79 loglevel = getattr(logging, loglevel)
79 loglevel = getattr(logging, loglevel)
80 return loglevel
80 return loglevel
81
81
82 def connect_logger(context, iface, root="ip", loglevel=logging.DEBUG):
82 def connect_logger(logname, context, iface, root="ip", loglevel=logging.DEBUG):
83 logger = logging.getLogger()
83 logger = logging.getLogger(logname)
84 if any([isinstance(h, handlers.PUBHandler) for h in logger.handlers]):
84 if any([isinstance(h, handlers.PUBHandler) for h in logger.handlers]):
85 # don't add a second PUBHandler
85 # don't add a second PUBHandler
86 return
86 return
@@ -106,9 +106,9 b' def connect_engine_logger(context, iface, engine, loglevel=logging.DEBUG):'
106 logger.addHandler(handler)
106 logger.addHandler(handler)
107 logger.setLevel(loglevel)
107 logger.setLevel(loglevel)
108
108
109 def local_logger(loglevel=logging.DEBUG):
109 def local_logger(logname, loglevel=logging.DEBUG):
110 loglevel = integer_loglevel(loglevel)
110 loglevel = integer_loglevel(loglevel)
111 logger = logging.getLogger()
111 logger = logging.getLogger(logname)
112 if any([isinstance(h, logging.StreamHandler) for h in logger.handlers]):
112 if any([isinstance(h, logging.StreamHandler) for h in logger.handlers]):
113 # don't add a second StreamHandler
113 # don't add a second StreamHandler
114 return
114 return
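The log-connection helpers above now take the logger name as an explicit first argument and configure logging.getLogger(logname) rather than the root logger. The sketch below fills in the tail of local_logger, which the hunk cuts off, with an assumed StreamHandler setup; everything after the duplicate-handler guard is a guess, not the committed code, and the real function also normalizes string levels via integer_loglevel first.

    import logging

    def local_logger(logname, loglevel=logging.DEBUG):
        logger = logging.getLogger(logname)   # named logger, not the root
        if any(isinstance(h, logging.StreamHandler) for h in logger.handlers):
            # don't add a second StreamHandler
            return
        handler = logging.StreamHandler()     # assumed: defaults to stderr
        handler.setLevel(loglevel)
        logger.addHandler(handler)
        logger.setLevel(loglevel)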
@@ -29,9 +29,15 b' import IPython.zmq.parallel.streamsession as ss'
29 #-----------------------------------------------------------------------------
29 #-----------------------------------------------------------------------------
30 # Classes
30 # Classes
31 #-----------------------------------------------------------------------------
31 #-----------------------------------------------------------------------------
32 class LoggingFactory(Configurable):
33 """A most basic class, that has a `log` (type:`Logger`) attribute, set via a `logname` Trait."""
34 log = Instance('logging.Logger', ('ZMQ', logging.WARN))
35 logname = CStr('ZMQ')
36 def _logname_changed(self, name, old, new):
37 self.log = logging.getLogger(new)
38
32
39
33
40 class SessionFactory(LoggingFactory):
34 class SessionFactory(Configurable):
35 """The Base factory from which every factory in IPython.zmq.parallel inherits"""
41 """The Base factory from which every factory in IPython.zmq.parallel inherits"""
36
42
37 packer = Str('',config=True)
43 packer = Str('',config=True)
@@ -41,14 +47,14 b' class SessionFactory(Configurable):'
41 return str(uuid.uuid4())
47 return str(uuid.uuid4())
42 username = Str(os.environ.get('USER','username'),config=True)
48 username = Str(os.environ.get('USER','username'),config=True)
43 exec_key = CUnicode('',config=True)
49 exec_key = CUnicode('',config=True)
44
45 # not configurable:
50 # not configurable:
46 context = Instance('zmq.Context', (), {})
51 context = Instance('zmq.Context', (), {})
47 session = Instance('IPython.zmq.parallel.streamsession.StreamSession')
52 session = Instance('IPython.zmq.parallel.streamsession.StreamSession')
48 loop = Instance('zmq.eventloop.ioloop.IOLoop')
53 loop = Instance('zmq.eventloop.ioloop.IOLoop', allow_none=False)
49 def _loop_default(self):
54 def _loop_default(self):
50 return IOLoop.instance()
55 return IOLoop.instance()
51
56
57
52 def __init__(self, **kwargs):
58 def __init__(self, **kwargs):
53 super(SessionFactory, self).__init__(**kwargs)
59 super(SessionFactory, self).__init__(**kwargs)
54
60
@@ -13,6 +13,9 b' import zmq'
13 from zmq.devices import ProcessDevice,ThreadDevice
13 from zmq.devices import ProcessDevice,ThreadDevice
14 from zmq.eventloop import ioloop, zmqstream
14 from zmq.eventloop import ioloop, zmqstream
15
15
16 from IPython.utils.traitlets import Set, Instance, CFloat, Bool
17 from factory import LoggingFactory
18
16 class Heart(object):
19 class Heart(object):
17 """A basic heart object for responding to a HeartMonitor.
20 """A basic heart object for responding to a HeartMonitor.
18 This is a simple wrapper with defaults for the most common
21 This is a simple wrapper with defaults for the most common
@@ -39,36 +42,35 b' class Heart(object):'
39 def start(self):
42 def start(self):
40 return self.device.start()
43 return self.device.start()
41
44
42 class HeartMonitor(object):
45 class HeartMonitor(LoggingFactory):
43 """A basic HeartMonitor class
46 """A basic HeartMonitor class
44 pingstream: a PUB stream
47 pingstream: a PUB stream
45 pongstream: an XREP stream
48 pongstream: an XREP stream
46 period: the period of the heartbeat in milliseconds"""
49 period: the period of the heartbeat in milliseconds"""
47 loop=None
48 pingstream=None
49 pongstream=None
50 period=None
51 hearts=None
52 on_probation=None
53 last_ping=None
54 # debug=False
55
50
56 def __init__(self, loop, pingstream, pongstream, period=1000):
51 period=CFloat(1000, config=True) # in milliseconds
57 self.loop = loop
52
58 self.period = period
53 pingstream=Instance('zmq.eventloop.zmqstream.ZMQStream')
54 pongstream=Instance('zmq.eventloop.zmqstream.ZMQStream')
55 loop = Instance('zmq.eventloop.ioloop.IOLoop')
56 def _loop_default(self):
57 return ioloop.IOLoop.instance()
58 debug=Bool(False)
59
60 # not settable:
61 hearts=Set()
62 responses=Set()
63 on_probation=Set()
64 last_ping=CFloat(0)
65 _new_handlers = Set()
66 _failure_handlers = Set()
67 lifetime = CFloat(0)
68 tic = CFloat(0)
69
70 def __init__(self, **kwargs):
71 super(HeartMonitor, self).__init__(**kwargs)
59
72
60 self.pingstream = pingstream
61 self.pongstream = pongstream
62 self.pongstream.on_recv(self.handle_pong)
73 self.pongstream.on_recv(self.handle_pong)
63
64 self.hearts = set()
65 self.responses = set()
66 self.on_probation = set()
67 self.lifetime = 0
68 self.tic = time.time()
69
70 self._new_handlers = set()
71 self._failure_handlers = set()
72
74
73 def start(self):
75 def start(self):
74 self.caller = ioloop.PeriodicCallback(self.beat, self.period, self.loop)
76 self.caller = ioloop.PeriodicCallback(self.beat, self.period, self.loop)
@@ -76,12 +78,12 b' class HeartMonitor(object):'
76
78
77 def add_new_heart_handler(self, handler):
79 def add_new_heart_handler(self, handler):
78 """add a new handler for new hearts"""
80 """add a new handler for new hearts"""
79 logging.debug("heartbeat::new_heart_handler: %s"%handler)
81 self.log.debug("heartbeat::new_heart_handler: %s"%handler)
80 self._new_handlers.add(handler)
82 self._new_handlers.add(handler)
81
83
82 def add_heart_failure_handler(self, handler):
84 def add_heart_failure_handler(self, handler):
83 """add a new handler for heart failure"""
85 """add a new handler for heart failure"""
84 logging.debug("heartbeat::new heart failure handler: %s"%handler)
86 self.log.debug("heartbeat::new heart failure handler: %s"%handler)
85 self._failure_handlers.add(handler)
87 self._failure_handlers.add(handler)
86
88
87 def beat(self):
89 def beat(self):
@@ -91,7 +93,7 b' class HeartMonitor(object):'
91 toc = time.time()
93 toc = time.time()
92 self.lifetime += toc-self.tic
94 self.lifetime += toc-self.tic
93 self.tic = toc
95 self.tic = toc
94 # logging.debug("heartbeat::%s"%self.lifetime)
96 # self.log.debug("heartbeat::%s"%self.lifetime)
95 goodhearts = self.hearts.intersection(self.responses)
97 goodhearts = self.hearts.intersection(self.responses)
96 missed_beats = self.hearts.difference(goodhearts)
98 missed_beats = self.hearts.difference(goodhearts)
97 heartfailures = self.on_probation.intersection(missed_beats)
99 heartfailures = self.on_probation.intersection(missed_beats)
@@ -101,7 +103,7 b' class HeartMonitor(object):'
101 self.on_probation = missed_beats.intersection(self.hearts)
103 self.on_probation = missed_beats.intersection(self.hearts)
102 self.responses = set()
104 self.responses = set()
103 # print self.on_probation, self.hearts
105 # print self.on_probation, self.hearts
104 # logging.debug("heartbeat::beat %.3f, %i beating hearts"%(self.lifetime, len(self.hearts)))
106 # self.log.debug("heartbeat::beat %.3f, %i beating hearts"%(self.lifetime, len(self.hearts)))
105 self.pingstream.send(str(self.lifetime))
107 self.pingstream.send(str(self.lifetime))
106
108
107 def handle_new_heart(self, heart):
109 def handle_new_heart(self, heart):
@@ -109,7 +111,7 b' class HeartMonitor(object):'
109 for handler in self._new_handlers:
111 for handler in self._new_handlers:
110 handler(heart)
112 handler(heart)
111 else:
113 else:
112 logging.info("heartbeat::yay, got new heart %s!"%heart)
114 self.log.info("heartbeat::yay, got new heart %s!"%heart)
113 self.hearts.add(heart)
115 self.hearts.add(heart)
114
116
115 def handle_heart_failure(self, heart):
117 def handle_heart_failure(self, heart):
@@ -118,10 +120,10 b' class HeartMonitor(object):'
118 try:
120 try:
119 handler(heart)
121 handler(heart)
120 except Exception as e:
122 except Exception as e:
121 logging.error("heartbeat::Bad Handler! %s"%handler, exc_info=True)
123 self.log.error("heartbeat::Bad Handler! %s"%handler, exc_info=True)
122 pass
124 pass
123 else:
125 else:
124 logging.info("heartbeat::Heart %s failed :("%heart)
126 self.log.info("heartbeat::Heart %s failed :("%heart)
125 self.hearts.remove(heart)
127 self.hearts.remove(heart)
126
128
127
129
@@ -129,14 +131,14 b' class HeartMonitor(object):'
129 "a heart just beat"
131 "a heart just beat"
130 if msg[1] == str(self.lifetime):
132 if msg[1] == str(self.lifetime):
131 delta = time.time()-self.tic
133 delta = time.time()-self.tic
132 # logging.debug("heartbeat::heart %r took %.2f ms to respond"%(msg[0], 1000*delta))
134 # self.log.debug("heartbeat::heart %r took %.2f ms to respond"%(msg[0], 1000*delta))
133 self.responses.add(msg[0])
135 self.responses.add(msg[0])
134 elif msg[1] == str(self.last_ping):
136 elif msg[1] == str(self.last_ping):
135 delta = time.time()-self.tic + (self.lifetime-self.last_ping)
137 delta = time.time()-self.tic + (self.lifetime-self.last_ping)
136 logging.warn("heartbeat::heart %r missed a beat, and took %.2f ms to respond"%(msg[0], 1000*delta))
138 self.log.warn("heartbeat::heart %r missed a beat, and took %.2f ms to respond"%(msg[0], 1000*delta))
137 self.responses.add(msg[0])
139 self.responses.add(msg[0])
138 else:
140 else:
139 logging.warn("heartbeat::got bad heartbeat (possibly old?): %s (current=%.3f)"%
141 self.log.warn("heartbeat::got bad heartbeat (possibly old?): %s (current=%.3f)"%
140 (msg[1],self.lifetime))
142 (msg[1],self.lifetime))
141
143
142
144
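Across the heartmonitor hunks above, HeartMonitor changes from a plain class with positional __init__ arguments to a LoggingFactory subclass whose configuration (period, pingstream, pongstream, loop, debug) arrives as traits and keywords; that is why the Hub, in a later hunk, constructs it as HeartMonitor(loop=loop, pingstream=..., pongstream=..., period=self.ping, logname=self.log.name). A stdlib-only sketch of the same constructor shape, with an illustrative class name and no traitlets machinery:

    import logging
    import time

    class HeartMonitorSketch(object):
        def __init__(self, loop=None, pingstream=None, pongstream=None,
                     period=1000.0, logname='ZMQ', **kwargs):
            self.loop = loop
            self.pingstream = pingstream
            self.pongstream = pongstream
            self.period = period   # milliseconds (a CFloat trait in the real class)
            self.log = logging.getLogger(logname)
            self.hearts, self.responses, self.on_probation = set(), set(), set()
            self.lifetime, self.tic = 0.0, time.time()

        def add_new_heart_handler(self, handler):
            # output now goes through the named logger instead of the root
            self.log.debug("heartbeat::new_heart_handler: %s" % handler)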
@@ -30,7 +30,7 b' from IPython.utils.traitlets import HasTraits, Instance, Int, Str, Dict, Set, Li'
30 from IPython.utils.importstring import import_item
30 from IPython.utils.importstring import import_item
31
31
32 from entry_point import select_random_ports
32 from entry_point import select_random_ports
33 from factory import RegistrationFactory
33 from factory import RegistrationFactory, LoggingFactory
34
34
35 from streamsession import Message, wrap_exception, ISO8601
35 from streamsession import Message, wrap_exception, ISO8601
36 from heartmonitor import HeartMonitor
36 from heartmonitor import HeartMonitor
@@ -95,10 +95,6 b' class EngineConnector(HasTraits):'
95 registration=Str()
95 registration=Str()
96 heartbeat=Str()
96 heartbeat=Str()
97 pending=Set()
97 pending=Set()
98
99 def __init__(self, **kwargs):
100 super(EngineConnector, self).__init__(**kwargs)
101 logging.info("engine::Engine Connected: %i"%self.id)
102
98
103 class HubFactory(RegistrationFactory):
99 class HubFactory(RegistrationFactory):
104 """The Configurable for setting up a Hub."""
100 """The Configurable for setting up a Hub."""
@@ -193,7 +189,7 b' class HubFactory(RegistrationFactory):'
193 def start(self):
189 def start(self):
194 assert self._constructed, "must be constructed by self.construct() first!"
190 assert self._constructed, "must be constructed by self.construct() first!"
195 self.heartmonitor.start()
191 self.heartmonitor.start()
196 logging.info("Heartmonitor started")
192 self.log.info("Heartmonitor started")
197
193
198 def construct_hub(self):
194 def construct_hub(self):
199 """construct"""
195 """construct"""
@@ -206,10 +202,10 b' class HubFactory(RegistrationFactory):'
206 # Registrar socket
202 # Registrar socket
207 reg = ZMQStream(ctx.socket(zmq.XREP), loop)
203 reg = ZMQStream(ctx.socket(zmq.XREP), loop)
208 reg.bind(client_iface % self.regport)
204 reg.bind(client_iface % self.regport)
209 logging.info("Hub listening on %s for registration."%(client_iface%self.regport))
205 self.log.info("Hub listening on %s for registration."%(client_iface%self.regport))
210 if self.client_ip != self.engine_ip:
206 if self.client_ip != self.engine_ip:
211 reg.bind(engine_iface % self.regport)
207 reg.bind(engine_iface % self.regport)
212 logging.info("Hub listening on %s for registration."%(engine_iface%self.regport))
208 self.log.info("Hub listening on %s for registration."%(engine_iface%self.regport))
213
209
214 ### Engine connections ###
210 ### Engine connections ###
215
211
@@ -218,8 +214,8 b' class HubFactory(RegistrationFactory):'
218 hpub.bind(engine_iface % self.hb[0])
214 hpub.bind(engine_iface % self.hb[0])
219 hrep = ctx.socket(zmq.XREP)
215 hrep = ctx.socket(zmq.XREP)
220 hrep.bind(engine_iface % self.hb[1])
216 hrep.bind(engine_iface % self.hb[1])
221
217 self.heartmonitor = HeartMonitor(loop=loop, pingstream=ZMQStream(hpub,loop), pongstream=ZMQStream(hrep,loop),
222 self.heartmonitor = HeartMonitor(loop, ZMQStream(hpub,loop), ZMQStream(hrep,loop), self.ping)
218 period=self.ping, logname=self.log.name)
223
219
224 ### Client connections ###
220 ### Client connections ###
225 # Clientele socket
221 # Clientele socket
@@ -259,14 +255,15 b' class HubFactory(RegistrationFactory):'
259 'iopub' : client_iface%self.iopub[0],
255 'iopub' : client_iface%self.iopub[0],
260 'notification': client_iface%self.notifier_port
256 'notification': client_iface%self.notifier_port
261 }
257 }
262 logging.debug("hub::Hub engine addrs: %s"%self.engine_addrs)
258 self.log.debug("hub::Hub engine addrs: %s"%self.engine_addrs)
263 logging.debug("hub::Hub client addrs: %s"%self.client_addrs)
259 self.log.debug("hub::Hub client addrs: %s"%self.client_addrs)
264 self.hub = Hub(loop=loop, session=self.session, monitor=sub, heartmonitor=self.heartmonitor,
260 self.hub = Hub(loop=loop, session=self.session, monitor=sub, heartmonitor=self.heartmonitor,
265 registrar=reg, clientele=c, notifier=n, db=self.db,
261 registrar=reg, clientele=c, notifier=n, db=self.db,
266 engine_addrs=self.engine_addrs, client_addrs=self.client_addrs)
262 engine_addrs=self.engine_addrs, client_addrs=self.client_addrs,
263 logname=self.log.name)
267
264
268
265
269 class Hub(HasTraits):
266 class Hub(LoggingFactory):
270 """The IPython Controller Hub with 0MQ connections
267 """The IPython Controller Hub with 0MQ connections
271
268
272 Parameters
269 Parameters
@@ -371,7 +368,7 b' class Hub(HasTraits):'
371 'connection_request': self.connection_request,
368 'connection_request': self.connection_request,
372 }
369 }
373
370
374 logging.info("hub::created hub")
371 self.log.info("hub::created hub")
375
372
376 @property
373 @property
377 def _next_id(self):
374 def _next_id(self):
@@ -422,7 +419,7 b' class Hub(HasTraits):'
422 try:
419 try:
423 msg = self.session.unpack_message(msg[1:], content=True)
420 msg = self.session.unpack_message(msg[1:], content=True)
424 except:
421 except:
425 logging.error("client::Invalid Message %s"%msg, exc_info=True)
422 self.log.error("client::Invalid Message %s"%msg, exc_info=True)
426 return False
423 return False
427
424
428 msg_type = msg.get('msg_type', None)
425 msg_type = msg.get('msg_type', None)
@@ -439,15 +436,15 b' class Hub(HasTraits):'
439
436
440 def dispatch_register_request(self, msg):
437 def dispatch_register_request(self, msg):
441 """"""
438 """"""
442 logging.debug("registration::dispatch_register_request(%s)"%msg)
439 self.log.debug("registration::dispatch_register_request(%s)"%msg)
443 idents,msg = self.session.feed_identities(msg)
440 idents,msg = self.session.feed_identities(msg)
444 if not idents:
441 if not idents:
445 logging.error("Bad Queue Message: %s"%msg, exc_info=True)
442 self.log.error("Bad Queue Message: %s"%msg, exc_info=True)
446 return
443 return
447 try:
444 try:
448 msg = self.session.unpack_message(msg,content=True)
445 msg = self.session.unpack_message(msg,content=True)
449 except:
446 except:
450 logging.error("registration::got bad registration message: %s"%msg, exc_info=True)
447 self.log.error("registration::got bad registration message: %s"%msg, exc_info=True)
451 return
448 return
452
449
453 msg_type = msg['msg_type']
450 msg_type = msg['msg_type']
@@ -455,38 +452,38 b' class Hub(HasTraits):'
455
452
456 handler = self.registrar_handlers.get(msg_type, None)
453 handler = self.registrar_handlers.get(msg_type, None)
457 if handler is None:
454 if handler is None:
458 logging.error("registration::got bad registration message: %s"%msg)
455 self.log.error("registration::got bad registration message: %s"%msg)
459 else:
456 else:
460 handler(idents, msg)
457 handler(idents, msg)
461
458
462 def dispatch_monitor_traffic(self, msg):
459 def dispatch_monitor_traffic(self, msg):
463 """all ME and Task queue messages come through here, as well as
460 """all ME and Task queue messages come through here, as well as
464 IOPub traffic."""
461 IOPub traffic."""
465 logging.debug("monitor traffic: %s"%msg[:2])
462 self.log.debug("monitor traffic: %s"%msg[:2])
466 switch = msg[0]
463 switch = msg[0]
467 idents, msg = self.session.feed_identities(msg[1:])
464 idents, msg = self.session.feed_identities(msg[1:])
468 if not idents:
465 if not idents:
469 logging.error("Bad Monitor Message: %s"%msg)
466 self.log.error("Bad Monitor Message: %s"%msg)
470 return
467 return
471 handler = self.monitor_handlers.get(switch, None)
468 handler = self.monitor_handlers.get(switch, None)
472 if handler is not None:
469 if handler is not None:
473 handler(idents, msg)
470 handler(idents, msg)
474 else:
471 else:
475 logging.error("Invalid monitor topic: %s"%switch)
472 self.log.error("Invalid monitor topic: %s"%switch)
476
473
477
474
478 def dispatch_client_msg(self, msg):
475 def dispatch_client_msg(self, msg):
479 """Route messages from clients"""
476 """Route messages from clients"""
480 idents, msg = self.session.feed_identities(msg)
477 idents, msg = self.session.feed_identities(msg)
481 if not idents:
478 if not idents:
482 logging.error("Bad Client Message: %s"%msg)
479 self.log.error("Bad Client Message: %s"%msg)
483 return
480 return
484 client_id = idents[0]
481 client_id = idents[0]
485 try:
482 try:
486 msg = self.session.unpack_message(msg, content=True)
483 msg = self.session.unpack_message(msg, content=True)
487 except:
484 except:
488 content = wrap_exception()
485 content = wrap_exception()
489 logging.error("Bad Client Message: %s"%msg, exc_info=True)
486 self.log.error("Bad Client Message: %s"%msg, exc_info=True)
490 self.session.send(self.clientele, "hub_error", ident=client_id,
487 self.session.send(self.clientele, "hub_error", ident=client_id,
491 content=content)
488 content=content)
492 return
489 return
@@ -494,13 +491,13 b' class Hub(HasTraits):'
494 # print client_id, header, parent, content
491 # print client_id, header, parent, content
495 #switch on message type:
492 #switch on message type:
496 msg_type = msg['msg_type']
493 msg_type = msg['msg_type']
497 logging.info("client:: client %s requested %s"%(client_id, msg_type))
494 self.log.info("client:: client %s requested %s"%(client_id, msg_type))
498 handler = self.client_handlers.get(msg_type, None)
495 handler = self.client_handlers.get(msg_type, None)
499 try:
496 try:
500 assert handler is not None, "Bad Message Type: %s"%msg_type
497 assert handler is not None, "Bad Message Type: %s"%msg_type
501 except:
498 except:
502 content = wrap_exception()
499 content = wrap_exception()
503 logging.error("Bad Message Type: %s"%msg_type, exc_info=True)
500 self.log.error("Bad Message Type: %s"%msg_type, exc_info=True)
504 self.session.send(self.clientele, "hub_error", ident=client_id,
501 self.session.send(self.clientele, "hub_error", ident=client_id,
505 content=content)
502 content=content)
506 return
503 return
@@ -521,9 +518,9 b' class Hub(HasTraits):'
521 """handler to attach to heartbeater.
518 """handler to attach to heartbeater.
522 Called when a new heart starts to beat.
519 Called when a new heart starts to beat.
523 Triggers completion of registration."""
520 Triggers completion of registration."""
524 logging.debug("heartbeat::handle_new_heart(%r)"%heart)
521 self.log.debug("heartbeat::handle_new_heart(%r)"%heart)
525 if heart not in self.incoming_registrations:
522 if heart not in self.incoming_registrations:
526 logging.info("heartbeat::ignoring new heart: %r"%heart)
523 self.log.info("heartbeat::ignoring new heart: %r"%heart)
527 else:
524 else:
528 self.finish_registration(heart)
525 self.finish_registration(heart)
529
526
@@ -532,11 +529,11 b' class Hub(HasTraits):'
532 """handler to attach to heartbeater.
529 """handler to attach to heartbeater.
533 called when a previously registered heart fails to respond to beat request.
530 called when a previously registered heart fails to respond to beat request.
534 triggers unregistration"""
531 triggers unregistration"""
535 logging.debug("heartbeat::handle_heart_failure(%r)"%heart)
532 self.log.debug("heartbeat::handle_heart_failure(%r)"%heart)
536 eid = self.hearts.get(heart, None)
533 eid = self.hearts.get(heart, None)
537 queue = self.engines[eid].queue
534 queue = self.engines[eid].queue
538 if eid is None:
535 if eid is None:
539 logging.info("heartbeat::ignoring heart failure %r"%heart)
536 self.log.info("heartbeat::ignoring heart failure %r"%heart)
540 else:
537 else:
541 self.unregister_engine(heart, dict(content=dict(id=eid, queue=queue)))
538 self.unregister_engine(heart, dict(content=dict(id=eid, queue=queue)))
542
539
@@ -544,19 +541,19 b' class Hub(HasTraits):'
544
541
545 def save_queue_request(self, idents, msg):
542 def save_queue_request(self, idents, msg):
546 if len(idents) < 2:
543 if len(idents) < 2:
547 logging.error("invalid identity prefix: %s"%idents)
544 self.log.error("invalid identity prefix: %s"%idents)
548 return
545 return
549 queue_id, client_id = idents[:2]
546 queue_id, client_id = idents[:2]
550 try:
547 try:
551 msg = self.session.unpack_message(msg, content=False)
548 msg = self.session.unpack_message(msg, content=False)
552 except:
549 except:
553 logging.error("queue::client %r sent invalid message to %r: %s"%(client_id, queue_id, msg), exc_info=True)
550 self.log.error("queue::client %r sent invalid message to %r: %s"%(client_id, queue_id, msg), exc_info=True)
554 return
551 return
555
552
556 eid = self.by_ident.get(queue_id, None)
553 eid = self.by_ident.get(queue_id, None)
557 if eid is None:
554 if eid is None:
558 logging.error("queue::target %r not registered"%queue_id)
555 self.log.error("queue::target %r not registered"%queue_id)
559 logging.debug("queue:: valid are: %s"%(self.by_ident.keys()))
556 self.log.debug("queue:: valid are: %s"%(self.by_ident.keys()))
560 return
557 return
561
558
562 header = msg['header']
559 header = msg['header']
@@ -573,21 +570,21 b' class Hub(HasTraits):'
573
570
574 def save_queue_result(self, idents, msg):
571 def save_queue_result(self, idents, msg):
575 if len(idents) < 2:
572 if len(idents) < 2:
576 logging.error("invalid identity prefix: %s"%idents)
573 self.log.error("invalid identity prefix: %s"%idents)
577 return
574 return
578
575
579 client_id, queue_id = idents[:2]
576 client_id, queue_id = idents[:2]
580 try:
577 try:
581 msg = self.session.unpack_message(msg, content=False)
578 msg = self.session.unpack_message(msg, content=False)
582 except:
579 except:
583 logging.error("queue::engine %r sent invalid message to %r: %s"%(
580 self.log.error("queue::engine %r sent invalid message to %r: %s"%(
584 queue_id,client_id, msg), exc_info=True)
581 queue_id,client_id, msg), exc_info=True)
585 return
582 return
586
583
587 eid = self.by_ident.get(queue_id, None)
584 eid = self.by_ident.get(queue_id, None)
588 if eid is None:
585 if eid is None:
589 logging.error("queue::unknown engine %r is sending a reply: "%queue_id)
586 self.log.error("queue::unknown engine %r is sending a reply: "%queue_id)
590 logging.debug("queue:: %s"%msg[2:])
587 self.log.debug("queue:: %s"%msg[2:])
591 return
588 return
592
589
593 parent = msg['parent_header']
590 parent = msg['parent_header']
@@ -616,7 +613,7 b' class Hub(HasTraits):'
616 result['result_buffers'] = msg['buffers']
613 result['result_buffers'] = msg['buffers']
617 self.db.update_record(msg_id, result)
614 self.db.update_record(msg_id, result)
618 else:
615 else:
619 logging.debug("queue:: unknown msg finished %s"%msg_id)
616 self.log.debug("queue:: unknown msg finished %s"%msg_id)
620
617
621 #--------------------- Task Queue Traffic ------------------------------
618 #--------------------- Task Queue Traffic ------------------------------
622
619
@@ -627,7 +624,7 b' class Hub(HasTraits):'
627 try:
624 try:
628 msg = self.session.unpack_message(msg, content=False)
625 msg = self.session.unpack_message(msg, content=False)
629 except:
626 except:
630 logging.error("task::client %r sent invalid task message: %s"%(
627 self.log.error("task::client %r sent invalid task message: %s"%(
631 client_id, msg), exc_info=True)
628 client_id, msg), exc_info=True)
632 return
629 return
633 record = init_record(msg)
630 record = init_record(msg)
@@ -646,7 +643,7 b' class Hub(HasTraits):'
646 try:
643 try:
647 msg = self.session.unpack_message(msg, content=False)
644 msg = self.session.unpack_message(msg, content=False)
648 except:
645 except:
649 logging.error("task::invalid task result message send to %r: %s"%(
646 self.log.error("task::invalid task result message send to %r: %s"%(
650 client_id, msg), exc_info=True)
647 client_id, msg), exc_info=True)
651 raise
648 raise
652 return
649 return
@@ -654,7 +651,7 b' class Hub(HasTraits):'
654 parent = msg['parent_header']
651 parent = msg['parent_header']
655 if not parent:
652 if not parent:
656 # print msg
653 # print msg
657 logging.warn("Task %r had no parent!"%msg)
654 self.log.warn("Task %r had no parent!"%msg)
658 return
655 return
659 msg_id = parent['msg_id']
656 msg_id = parent['msg_id']
660
657
@@ -687,13 +684,13 b' class Hub(HasTraits):'
687 self.db.update_record(msg_id, result)
684 self.db.update_record(msg_id, result)
688
685
689 else:
686 else:
690 logging.debug("task::unknown task %s finished"%msg_id)
687 self.log.debug("task::unknown task %s finished"%msg_id)
691
688
692 def save_task_destination(self, idents, msg):
689 def save_task_destination(self, idents, msg):
693 try:
690 try:
694 msg = self.session.unpack_message(msg, content=True)
691 msg = self.session.unpack_message(msg, content=True)
695 except:
692 except:
696 logging.error("task::invalid task tracking message", exc_info=True)
693 self.log.error("task::invalid task tracking message", exc_info=True)
697 return
694 return
698 content = msg['content']
695 content = msg['content']
699 print (content)
696 print (content)
@@ -701,11 +698,11 b' class Hub(HasTraits):'
701 engine_uuid = content['engine_id']
698 engine_uuid = content['engine_id']
702 eid = self.by_ident[engine_uuid]
699 eid = self.by_ident[engine_uuid]
703
700
704 logging.info("task::task %s arrived on %s"%(msg_id, eid))
701 self.log.info("task::task %s arrived on %s"%(msg_id, eid))
705 # if msg_id in self.mia:
702 # if msg_id in self.mia:
706 # self.mia.remove(msg_id)
703 # self.mia.remove(msg_id)
707 # else:
704 # else:
708 # logging.debug("task::task %s not listed as MIA?!"%(msg_id))
705 # self.log.debug("task::task %s not listed as MIA?!"%(msg_id))
709
706
710 self.tasks[eid].append(msg_id)
707 self.tasks[eid].append(msg_id)
711 # self.pending[msg_id][1].update(received=datetime.now(),engine=(eid,engine_uuid))
708 # self.pending[msg_id][1].update(received=datetime.now(),engine=(eid,engine_uuid))
@@ -726,12 +723,12 b' class Hub(HasTraits):'
726 try:
723 try:
727 msg = self.session.unpack_message(msg, content=True)
724 msg = self.session.unpack_message(msg, content=True)
728 except:
725 except:
729 logging.error("iopub::invalid IOPub message", exc_info=True)
726 self.log.error("iopub::invalid IOPub message", exc_info=True)
730 return
727 return
731
728
732 parent = msg['parent_header']
729 parent = msg['parent_header']
733 if not parent:
730 if not parent:
734 logging.error("iopub::invalid IOPub message: %s"%msg)
731 self.log.error("iopub::invalid IOPub message: %s"%msg)
735 return
732 return
736 msg_id = parent['msg_id']
733 msg_id = parent['msg_id']
737 msg_type = msg['msg_type']
734 msg_type = msg['msg_type']
@@ -741,7 +738,7 b' class Hub(HasTraits):'
741 try:
738 try:
742 rec = self.db.get_record(msg_id)
739 rec = self.db.get_record(msg_id)
743 except:
740 except:
744 logging.error("iopub::IOPub message has invalid parent", exc_info=True)
741 self.log.error("iopub::IOPub message has invalid parent", exc_info=True)
745 return
742 return
746 # stream
743 # stream
747 d = {}
744 d = {}
@@ -765,7 +762,7 b' class Hub(HasTraits):'
765
762
766 def connection_request(self, client_id, msg):
763 def connection_request(self, client_id, msg):
767 """Reply with connection addresses for clients."""
764 """Reply with connection addresses for clients."""
768 logging.info("client::client %s connected"%client_id)
765 self.log.info("client::client %s connected"%client_id)
769 content = dict(status='ok')
766 content = dict(status='ok')
770 content.update(self.client_addrs)
767 content.update(self.client_addrs)
771 jsonable = {}
768 jsonable = {}
@@ -780,14 +777,14 b' class Hub(HasTraits):'
780 try:
777 try:
781 queue = content['queue']
778 queue = content['queue']
782 except KeyError:
779 except KeyError:
783 logging.error("registration::queue not specified", exc_info=True)
780 self.log.error("registration::queue not specified", exc_info=True)
784 return
781 return
785 heart = content.get('heartbeat', None)
782 heart = content.get('heartbeat', None)
786 """register a new engine, and create the socket(s) necessary"""
783 """register a new engine, and create the socket(s) necessary"""
787 eid = self._next_id
784 eid = self._next_id
788 # print (eid, queue, reg, heart)
785 # print (eid, queue, reg, heart)
789
786
790 logging.debug("registration::register_engine(%i, %r, %r, %r)"%(eid, queue, reg, heart))
787 self.log.debug("registration::register_engine(%i, %r, %r, %r)"%(eid, queue, reg, heart))
791
788
792 content = dict(id=eid,status='ok')
789 content = dict(id=eid,status='ok')
793 content.update(self.engine_addrs)
790 content.update(self.engine_addrs)
@@ -797,12 +794,12 b' class Hub(HasTraits):'
797 raise KeyError("queue_id %r in use"%queue)
794 raise KeyError("queue_id %r in use"%queue)
798 except:
795 except:
799 content = wrap_exception()
796 content = wrap_exception()
800 logging.error("queue_id %r in use"%queue, exc_info=True)
797 self.log.error("queue_id %r in use"%queue, exc_info=True)
801 elif heart in self.hearts: # need to check unique hearts?
798 elif heart in self.hearts: # need to check unique hearts?
802 try:
799 try:
803 raise KeyError("heart_id %r in use"%heart)
800 raise KeyError("heart_id %r in use"%heart)
804 except:
801 except:
805 logging.error("heart_id %r in use"%heart, exc_info=True)
802 self.log.error("heart_id %r in use"%heart, exc_info=True)
806 content = wrap_exception()
803 content = wrap_exception()
807 else:
804 else:
808 for h, pack in self.incoming_registrations.iteritems():
805 for h, pack in self.incoming_registrations.iteritems():
@@ -810,14 +807,14 b' class Hub(HasTraits):'
810 try:
807 try:
811 raise KeyError("heart_id %r in use"%heart)
808 raise KeyError("heart_id %r in use"%heart)
812 except:
809 except:
813 logging.error("heart_id %r in use"%heart, exc_info=True)
810 self.log.error("heart_id %r in use"%heart, exc_info=True)
814 content = wrap_exception()
811 content = wrap_exception()
815 break
812 break
816 elif queue == pack[1]:
813 elif queue == pack[1]:
817 try:
814 try:
818 raise KeyError("queue_id %r in use"%queue)
815 raise KeyError("queue_id %r in use"%queue)
819 except:
816 except:
820 logging.error("queue_id %r in use"%queue, exc_info=True)
817 self.log.error("queue_id %r in use"%queue, exc_info=True)
821 content = wrap_exception()
818 content = wrap_exception()
822 break
819 break
823
820
@@ -836,7 +833,7 b' class Hub(HasTraits):'
836 dc.start()
833 dc.start()
837 self.incoming_registrations[heart] = (eid,queue,reg[0],dc)
834 self.incoming_registrations[heart] = (eid,queue,reg[0],dc)
838 else:
835 else:
839 logging.error("registration::registration %i failed: %s"%(eid, content['evalue']))
836 self.log.error("registration::registration %i failed: %s"%(eid, content['evalue']))
840 return eid
837 return eid
841
838
842 def unregister_engine(self, ident, msg):
839 def unregister_engine(self, ident, msg):
@@ -844,9 +841,9 b' class Hub(HasTraits):'
844 try:
841 try:
845 eid = msg['content']['id']
842 eid = msg['content']['id']
846 except:
843 except:
847 logging.error("registration::bad engine id for unregistration: %s"%ident, exc_info=True)
844 self.log.error("registration::bad engine id for unregistration: %s"%ident, exc_info=True)
848 return
845 return
849 logging.info("registration::unregister_engine(%s)"%eid)
846 self.log.info("registration::unregister_engine(%s)"%eid)
850 content=dict(id=eid, queue=self.engines[eid].queue)
847 content=dict(id=eid, queue=self.engines[eid].queue)
851 self.ids.remove(eid)
848 self.ids.remove(eid)
852 self.keytable.pop(eid)
849 self.keytable.pop(eid)
@@ -867,9 +864,9 b' class Hub(HasTraits):'
867 try:
864 try:
868 (eid,queue,reg,purge) = self.incoming_registrations.pop(heart)
865 (eid,queue,reg,purge) = self.incoming_registrations.pop(heart)
869 except KeyError:
866 except KeyError:
870 logging.error("registration::tried to finish nonexistant registration", exc_info=True)
867 self.log.error("registration::tried to finish nonexistant registration", exc_info=True)
871 return
868 return
872 logging.info("registration::finished registering engine %i:%r"%(eid,queue))
869 self.log.info("registration::finished registering engine %i:%r"%(eid,queue))
873 if purge is not None:
870 if purge is not None:
874 purge.stop()
871 purge.stop()
875 control = queue
872 control = queue
@@ -885,11 +882,12 b' class Hub(HasTraits):'
885 content = dict(id=eid, queue=self.engines[eid].queue)
882 content = dict(id=eid, queue=self.engines[eid].queue)
886 if self.notifier:
883 if self.notifier:
887 self.session.send(self.notifier, "registration_notification", content=content)
884 self.session.send(self.notifier, "registration_notification", content=content)
885 self.log.info("engine::Engine Connected: %i"%eid)
888
886
889 def _purge_stalled_registration(self, heart):
887 def _purge_stalled_registration(self, heart):
890 if heart in self.incoming_registrations:
888 if heart in self.incoming_registrations:
891 eid = self.incoming_registrations.pop(heart)[0]
889 eid = self.incoming_registrations.pop(heart)[0]
892 logging.info("registration::purging stalled registration: %i"%eid)
890 self.log.info("registration::purging stalled registration: %i"%eid)
893 else:
891 else:
894 pass
892 pass
895
893
@@ -910,7 +908,7 b' class Hub(HasTraits):'
910 dc.start()
908 dc.start()
911
909
912 def _shutdown(self):
910 def _shutdown(self):
913 logging.info("hub::hub shutting down.")
911 self.log.info("hub::hub shutting down.")
914 time.sleep(0.1)
912 time.sleep(0.1)
915 sys.exit(0)
913 sys.exit(0)
916
914
@@ -314,11 +314,12 b' class IPClusterApp(ApplicationWithClusterDir):'
314 # and engine will be launched.
314 # and engine will be launched.
315 el_class = import_item(config.Global.engine_launcher)
315 el_class = import_item(config.Global.engine_launcher)
316 self.engine_launcher = el_class(
316 self.engine_launcher = el_class(
317 work_dir=self.cluster_dir, config=config
317 work_dir=self.cluster_dir, config=config, logname=self.log.name
318 )
318 )
319 cl_class = import_item(config.Global.controller_launcher)
319 cl_class = import_item(config.Global.controller_launcher)
320 self.controller_launcher = cl_class(
320 self.controller_launcher = cl_class(
321 work_dir=self.cluster_dir, config=config
321 work_dir=self.cluster_dir, config=config,
322 logname=self.log.name
322 )
323 )
323
324
324 # Setup signals
325 # Setup signals
@@ -348,11 +349,11 b' class IPClusterApp(ApplicationWithClusterDir):'
348 return d
349 return d
349
350
350 def startup_message(self, r=None):
351 def startup_message(self, r=None):
351 logging.info("IPython cluster: started")
352 self.log.info("IPython cluster: started")
352 return r
353 return r
353
354
354 def start_controller(self, r=None):
355 def start_controller(self, r=None):
355 # logging.info("In start_controller")
356 # self.log.info("In start_controller")
356 config = self.master_config
357 config = self.master_config
357 d = self.controller_launcher.start(
358 d = self.controller_launcher.start(
358 cluster_dir=config.Global.cluster_dir
359 cluster_dir=config.Global.cluster_dir
@@ -360,7 +361,7 b' class IPClusterApp(ApplicationWithClusterDir):'
360 return d
361 return d
361
362
362 def start_engines(self, r=None):
363 def start_engines(self, r=None):
363 # logging.info("In start_engines")
364 # self.log.info("In start_engines")
364 config = self.master_config
365 config = self.master_config
365 d = self.engine_launcher.start(
366 d = self.engine_launcher.start(
366 config.Global.n,
367 config.Global.n,
@@ -369,12 +370,12 b' class IPClusterApp(ApplicationWithClusterDir):'
369 return d
370 return d
370
371
371 def stop_controller(self, r=None):
372 def stop_controller(self, r=None):
372 # logging.info("In stop_controller")
373 # self.log.info("In stop_controller")
373 if self.controller_launcher.running:
374 if self.controller_launcher.running:
374 return self.controller_launcher.stop()
375 return self.controller_launcher.stop()
375
376
376 def stop_engines(self, r=None):
377 def stop_engines(self, r=None):
377 # logging.info("In stop_engines")
378 # self.log.info("In stop_engines")
378 if self.engine_launcher.running:
379 if self.engine_launcher.running:
379 d = self.engine_launcher.stop()
380 d = self.engine_launcher.stop()
380 # d.addErrback(self.log_err)
381 # d.addErrback(self.log_err)
@@ -383,16 +384,16 b' class IPClusterApp(ApplicationWithClusterDir):'
383 return None
384 return None
384
385
385 def log_err(self, f):
386 def log_err(self, f):
386 logging.error(f.getTraceback())
387 self.log.error(f.getTraceback())
387 return None
388 return None
388
389
389 def stop_launchers(self, r=None):
390 def stop_launchers(self, r=None):
390 if not self._stopping:
391 if not self._stopping:
391 self._stopping = True
392 self._stopping = True
392 # if isinstance(r, failure.Failure):
393 # if isinstance(r, failure.Failure):
393 # logging.error('Unexpected error in ipcluster:')
394 # self.log.error('Unexpected error in ipcluster:')
394 # logging.info(r.getTraceback())
395 # self.log.info(r.getTraceback())
395 logging.error("IPython cluster: stopping")
396 self.log.error("IPython cluster: stopping")
396 # These return deferreds. We are not doing anything with them
397 # These return deferreds. We are not doing anything with them
397 # but we are holding refs to them as a reminder that they
398 # but we are holding refs to them as a reminder that they
398 # do return deferreds.
399 # do return deferreds.
@@ -462,7 +463,7 b' class IPClusterApp(ApplicationWithClusterDir):'
462 try:
463 try:
463 self.loop.start()
464 self.loop.start()
464 except:
465 except:
465 logging.info("stopping...")
466 self.log.info("stopping...")
466 self.remove_pid_file()
467 self.remove_pid_file()
467
468
468 def start_app_stop(self):
469 def start_app_stop(self):
@@ -279,7 +279,7 b' class IPControllerApp(ApplicationWithClusterDir):'
279 c.SessionFactory.exec_key = ''
279 c.SessionFactory.exec_key = ''
280
280
281 try:
281 try:
282 self.factory = ControllerFactory(config=c)
282 self.factory = ControllerFactory(config=c, logname=self.log.name)
283 self.start_logging()
283 self.start_logging()
284 self.factory.construct()
284 self.factory.construct()
285 except:
285 except:
@@ -193,7 +193,7 b' class IPEngineApp(ApplicationWithClusterDir):'
193 # Create the underlying shell class and EngineService
193 # Create the underlying shell class and EngineService
194 # shell_class = import_item(self.master_config.Global.shell_class)
194 # shell_class = import_item(self.master_config.Global.shell_class)
195 try:
195 try:
196 self.engine = EngineFactory(config=config)
196 self.engine = EngineFactory(config=config, logname=self.log.name)
197 except:
197 except:
198 self.log.error("Couldn't start the Engine", exc_info=True)
198 self.log.error("Couldn't start the Engine", exc_info=True)
199 self.exit(1)
199 self.exit(1)
@@ -107,7 +107,7 b' class IPLoggerApp(ApplicationWithClusterDir):'
107 self.start_logging()
107 self.start_logging()
108
108
109 try:
109 try:
110 self.watcher = LogWatcher(config=self.master_config)
110 self.watcher = LogWatcher(config=self.master_config, logname=self.log.name)
111 except:
111 except:
112 self.log.error("Couldn't start the LogWatcher", exc_info=True)
112 self.log.error("Couldn't start the LogWatcher", exc_info=True)
113 self.exit(1)
113 self.exit(1)
@@ -115,6 +115,7 b' class IPLoggerApp(ApplicationWithClusterDir):'
115
115
116 def start_app(self):
116 def start_app(self):
117 try:
117 try:
118 self.watcher.start()
118 self.watcher.loop.start()
119 self.watcher.loop.start()
119 except KeyboardInterrupt:
120 except KeyboardInterrupt:
120 self.log.critical("Logging Interrupted, shutting down...\n")
121 self.log.critical("Logging Interrupted, shutting down...\n")
@@ -30,11 +30,12 b' from subprocess import Popen, PIPE'
30
30
31 from zmq.eventloop import ioloop
31 from zmq.eventloop import ioloop
32
32
33 from IPython.config.configurable import Configurable
33 # from IPython.config.configurable import Configurable
34 from IPython.utils.traitlets import Str, Int, List, Unicode, Instance
34 from IPython.utils.traitlets import Str, Int, List, Unicode, Instance
35 from IPython.utils.path import get_ipython_module_path
35 from IPython.utils.path import get_ipython_module_path
36 from IPython.utils.process import find_cmd, pycmd2argv, FindCmdError
36 from IPython.utils.process import find_cmd, pycmd2argv, FindCmdError
37
37
38 from factory import LoggingFactory
38 # from IPython.kernel.winhpcjob import (
39 # from IPython.kernel.winhpcjob import (
39 # IPControllerTask, IPEngineTask,
40 # IPControllerTask, IPEngineTask,
40 # IPControllerJob, IPEngineSetJob
41 # IPControllerJob, IPEngineSetJob
@@ -75,7 +76,7 b' class UnknownStatus(LauncherError):'
75 pass
76 pass
76
77
77
78
78 class BaseLauncher(Configurable):
79 class BaseLauncher(LoggingFactory):
79 """An asbtraction for starting, stopping and signaling a process."""
80 """An asbtraction for starting, stopping and signaling a process."""
80
81
81 # In all of the launchers, the work_dir is where child processes will be
82 # In all of the launchers, the work_dir is where child processes will be
@@ -90,8 +91,8 b' class BaseLauncher(Configurable):'
90 def _loop_default(self):
91 def _loop_default(self):
91 return ioloop.IOLoop.instance()
92 return ioloop.IOLoop.instance()
92
93
93 def __init__(self, work_dir=u'.', config=None):
94 def __init__(self, work_dir=u'.', config=None, **kwargs):
94 super(BaseLauncher, self).__init__(work_dir=work_dir, config=config)
95 super(BaseLauncher, self).__init__(work_dir=work_dir, config=config, **kwargs)
95 self.state = 'before' # can be before, running, after
96 self.state = 'before' # can be before, running, after
96 self.stop_callbacks = []
97 self.stop_callbacks = []
97 self.start_data = None
98 self.start_data = None
@@ -163,7 +164,7 b' class BaseLauncher(Configurable):'
163 a pass-through so it can be used as a callback.
164 a pass-through so it can be used as a callback.
164 """
165 """
165
166
166 logging.info('Process %r started: %r' % (self.args[0], data))
167 self.log.info('Process %r started: %r' % (self.args[0], data))
167 self.start_data = data
168 self.start_data = data
168 self.state = 'running'
169 self.state = 'running'
169 return data
170 return data
@@ -174,7 +175,7 b' class BaseLauncher(Configurable):'
174 This logs the process stopping and sets the state to 'after'. Call
175 This logs the process stopping and sets the state to 'after'. Call
175 this to trigger all the deferreds from :func:`observe_stop`."""
176 this to trigger all the deferreds from :func:`observe_stop`."""
176
177
177 logging.info('Process %r stopped: %r' % (self.args[0], data))
178 self.log.info('Process %r stopped: %r' % (self.args[0], data))
178 self.stop_data = data
179 self.stop_data = data
179 self.state = 'after'
180 self.state = 'after'
180 for i in range(len(self.stop_callbacks)):
181 for i in range(len(self.stop_callbacks)):
@@ -212,9 +213,9 b' class LocalProcessLauncher(BaseLauncher):'
212 cmd_and_args = List([])
213 cmd_and_args = List([])
213 poll_frequency = Int(100) # in ms
214 poll_frequency = Int(100) # in ms
214
215
215 def __init__(self, work_dir=u'.', config=None):
216 def __init__(self, work_dir=u'.', config=None, **kwargs):
216 super(LocalProcessLauncher, self).__init__(
217 super(LocalProcessLauncher, self).__init__(
217 work_dir=work_dir, config=config
218 work_dir=work_dir, config=config, **kwargs
218 )
219 )
219 self.process = None
220 self.process = None
220 self.start_deferred = None
221 self.start_deferred = None
@@ -259,7 +260,7 b' class LocalProcessLauncher(BaseLauncher):'
259 line = self.process.stdout.readline()
260 line = self.process.stdout.readline()
260 # a stopped process will be readable but return empty strings
261 # a stopped process will be readable but return empty strings
261 if line:
262 if line:
262 logging.info(line[:-1])
263 self.log.info(line[:-1])
263 else:
264 else:
264 self.poll()
265 self.poll()
265
266
@@ -267,7 +268,7 b' class LocalProcessLauncher(BaseLauncher):'
267 line = self.process.stderr.readline()
268 line = self.process.stderr.readline()
268 # a stopped process will be readable but return empty strings
269 # a stopped process will be readable but return empty strings
269 if line:
270 if line:
270 logging.error(line[:-1])
271 self.log.error(line[:-1])
271 else:
272 else:
272 self.poll()
273 self.poll()
273
274
@@ -294,7 +295,7 b' class LocalControllerLauncher(LocalProcessLauncher):'
294 """Start the controller by cluster_dir."""
295 """Start the controller by cluster_dir."""
295 self.controller_args.extend(['--cluster-dir', cluster_dir])
296 self.controller_args.extend(['--cluster-dir', cluster_dir])
296 self.cluster_dir = unicode(cluster_dir)
297 self.cluster_dir = unicode(cluster_dir)
297 logging.info("Starting LocalControllerLauncher: %r" % self.args)
298 self.log.info("Starting LocalControllerLauncher: %r" % self.args)
298 return super(LocalControllerLauncher, self).start()
299 return super(LocalControllerLauncher, self).start()
299
300
300
301
@@ -327,9 +328,9 b' class LocalEngineSetLauncher(BaseLauncher):'
327 # launcher class
328 # launcher class
328 launcher_class = LocalEngineLauncher
329 launcher_class = LocalEngineLauncher
329
330
330 def __init__(self, work_dir=u'.', config=None):
331 def __init__(self, work_dir=u'.', config=None, **kwargs):
331 super(LocalEngineSetLauncher, self).__init__(
332 super(LocalEngineSetLauncher, self).__init__(
332 work_dir=work_dir, config=config
333 work_dir=work_dir, config=config, **kwargs
333 )
334 )
334 self.launchers = {}
335 self.launchers = {}
335 self.stop_data = {}
336 self.stop_data = {}
@@ -339,14 +340,14 b' class LocalEngineSetLauncher(BaseLauncher):'
339 self.cluster_dir = unicode(cluster_dir)
340 self.cluster_dir = unicode(cluster_dir)
340 dlist = []
341 dlist = []
341 for i in range(n):
342 for i in range(n):
342 el = self.launcher_class(work_dir=self.work_dir, config=self.config)
343 el = self.launcher_class(work_dir=self.work_dir, config=self.config, logname=self.log.name)
343 # Copy the engine args over to each engine launcher.
344 # Copy the engine args over to each engine launcher.
344 import copy
345 import copy
345 el.engine_args = copy.deepcopy(self.engine_args)
346 el.engine_args = copy.deepcopy(self.engine_args)
346 el.on_stop(self._notice_engine_stopped)
347 el.on_stop(self._notice_engine_stopped)
347 d = el.start(cluster_dir)
348 d = el.start(cluster_dir)
348 if i==0:
349 if i==0:
349 logging.info("Starting LocalEngineSetLauncher: %r" % el.args)
350 self.log.info("Starting LocalEngineSetLauncher: %r" % el.args)
350 self.launchers[i] = el
351 self.launchers[i] = el
351 dlist.append(d)
352 dlist.append(d)
352 self.notify_start(dlist)
353 self.notify_start(dlist)
@@ -431,7 +432,7 b' class MPIExecControllerLauncher(MPIExecLauncher):'
431 """Start the controller by cluster_dir."""
432 """Start the controller by cluster_dir."""
432 self.controller_args.extend(['--cluster-dir', cluster_dir])
433 self.controller_args.extend(['--cluster-dir', cluster_dir])
433 self.cluster_dir = unicode(cluster_dir)
434 self.cluster_dir = unicode(cluster_dir)
434 logging.info("Starting MPIExecControllerLauncher: %r" % self.args)
435 self.log.info("Starting MPIExecControllerLauncher: %r" % self.args)
435 return super(MPIExecControllerLauncher, self).start(1)
436 return super(MPIExecControllerLauncher, self).start(1)
436
437
437 def find_args(self):
438 def find_args(self):
@@ -453,7 +454,7 b' class MPIExecEngineSetLauncher(MPIExecLauncher):'
453 self.engine_args.extend(['--cluster-dir', cluster_dir])
454 self.engine_args.extend(['--cluster-dir', cluster_dir])
454 self.cluster_dir = unicode(cluster_dir)
455 self.cluster_dir = unicode(cluster_dir)
455 self.n = n
456 self.n = n
456 logging.info('Starting MPIExecEngineSetLauncher: %r' % self.args)
457 self.log.info('Starting MPIExecEngineSetLauncher: %r' % self.args)
457 return super(MPIExecEngineSetLauncher, self).start(n)
458 return super(MPIExecEngineSetLauncher, self).start(n)
458
459
459 def find_args(self):
460 def find_args(self):
@@ -572,7 +573,7 b' class SSHEngineSetLauncher(LocalEngineSetLauncher):'
572 # else:
573 # else:
573 # raise LauncherError("Job id couldn't be determined: %s" % output)
574 # raise LauncherError("Job id couldn't be determined: %s" % output)
574 # self.job_id = job_id
575 # self.job_id = job_id
575 # logging.info('Job started with job id: %r' % job_id)
576 # self.log.info('Job started with job id: %r' % job_id)
576 # return job_id
577 # return job_id
577 #
578 #
578 # @inlineCallbacks
579 # @inlineCallbacks
@@ -584,7 +585,7 b' class SSHEngineSetLauncher(LocalEngineSetLauncher):'
584 # '/jobfile:%s' % self.job_file,
585 # '/jobfile:%s' % self.job_file,
585 # '/scheduler:%s' % self.scheduler
586 # '/scheduler:%s' % self.scheduler
586 # ]
587 # ]
587 # logging.info("Starting Win HPC Job: %s" % (self.job_cmd + ' ' + ' '.join(args),))
588 # self.log.info("Starting Win HPC Job: %s" % (self.job_cmd + ' ' + ' '.join(args),))
588 # # Twisted will raise DeprecationWarnings if we try to pass unicode to this
589 # # Twisted will raise DeprecationWarnings if we try to pass unicode to this
589 # output = yield getProcessOutput(str(self.job_cmd),
590 # output = yield getProcessOutput(str(self.job_cmd),
590 # [str(a) for a in args],
591 # [str(a) for a in args],
@@ -602,7 +603,7 b' class SSHEngineSetLauncher(LocalEngineSetLauncher):'
602 # self.job_id,
603 # self.job_id,
603 # '/scheduler:%s' % self.scheduler
604 # '/scheduler:%s' % self.scheduler
604 # ]
605 # ]
605 # logging.info("Stopping Win HPC Job: %s" % (self.job_cmd + ' ' + ' '.join(args),))
606 # self.log.info("Stopping Win HPC Job: %s" % (self.job_cmd + ' ' + ' '.join(args),))
606 # try:
607 # try:
607 # # Twisted will raise DeprecationWarnings if we try to pass unicode to this
608 # # Twisted will raise DeprecationWarnings if we try to pass unicode to this
608 # output = yield getProcessOutput(str(self.job_cmd),
609 # output = yield getProcessOutput(str(self.job_cmd),
@@ -633,7 +634,7 b' class SSHEngineSetLauncher(LocalEngineSetLauncher):'
633 # t.controller_args.extend(self.extra_args)
634 # t.controller_args.extend(self.extra_args)
634 # job.add_task(t)
635 # job.add_task(t)
635 #
636 #
636 # logging.info("Writing job description file: %s" % self.job_file)
637 # self.log.info("Writing job description file: %s" % self.job_file)
637 # job.write(self.job_file)
638 # job.write(self.job_file)
638 #
639 #
639 # @property
640 # @property
@@ -665,7 +666,7 b' class SSHEngineSetLauncher(LocalEngineSetLauncher):'
665 # t.engine_args.extend(self.extra_args)
666 # t.engine_args.extend(self.extra_args)
666 # job.add_task(t)
667 # job.add_task(t)
667 #
668 #
668 # logging.info("Writing job description file: %s" % self.job_file)
669 # self.log.info("Writing job description file: %s" % self.job_file)
669 # job.write(self.job_file)
670 # job.write(self.job_file)
670 #
671 #
671 # @property
672 # @property
@@ -729,14 +730,14 b' class SSHEngineSetLauncher(LocalEngineSetLauncher):'
729 # else:
730 # else:
730 # raise LauncherError("Job id couldn't be determined: %s" % output)
731 # raise LauncherError("Job id couldn't be determined: %s" % output)
731 # self.job_id = job_id
732 # self.job_id = job_id
732 # logging.info('Job started with job id: %r' % job_id)
733 # self.log.info('Job started with job id: %r' % job_id)
733 # return job_id
734 # return job_id
734 #
735 #
735 # def write_batch_script(self, n):
736 # def write_batch_script(self, n):
736 # """Instantiate and write the batch script to the work_dir."""
737 # """Instantiate and write the batch script to the work_dir."""
737 # self.context['n'] = n
738 # self.context['n'] = n
738 # script_as_string = Itpl.itplns(self.batch_template, self.context)
739 # script_as_string = Itpl.itplns(self.batch_template, self.context)
739 # logging.info('Writing instantiated batch script: %s' % self.batch_file)
740 # self.log.info('Writing instantiated batch script: %s' % self.batch_file)
740 # f = open(self.batch_file, 'w')
741 # f = open(self.batch_file, 'w')
741 # f.write(script_as_string)
742 # f.write(script_as_string)
742 # f.close()
743 # f.close()
@@ -783,7 +784,7 b' class SSHEngineSetLauncher(LocalEngineSetLauncher):'
783 # # ${cluster_dir}
784 # # ${cluster_dir}
784 # self.context['cluster_dir'] = cluster_dir
785 # self.context['cluster_dir'] = cluster_dir
785 # self.cluster_dir = unicode(cluster_dir)
786 # self.cluster_dir = unicode(cluster_dir)
786 # logging.info("Starting PBSControllerLauncher: %r" % self.args)
787 # self.log.info("Starting PBSControllerLauncher: %r" % self.args)
787 # return super(PBSControllerLauncher, self).start(1)
788 # return super(PBSControllerLauncher, self).start(1)
788 #
789 #
789 #
790 #
@@ -795,7 +796,7 b' class SSHEngineSetLauncher(LocalEngineSetLauncher):'
795 # """Start n engines by profile or cluster_dir."""
796 # """Start n engines by profile or cluster_dir."""
796 # self.program_args.extend(['--cluster-dir', cluster_dir])
797 # self.program_args.extend(['--cluster-dir', cluster_dir])
797 # self.cluster_dir = unicode(cluster_dir)
798 # self.cluster_dir = unicode(cluster_dir)
798 # logging.info('Starting PBSEngineSetLauncher: %r' % self.args)
799 # self.log.info('Starting PBSEngineSetLauncher: %r' % self.args)
799 # return super(PBSEngineSetLauncher, self).start(n)
800 # return super(PBSEngineSetLauncher, self).start(n)
800
801
801
802
@@ -819,6 +820,6 b' class IPClusterLauncher(LocalProcessLauncher):'
819 ['-n', repr(self.ipcluster_n)] + self.ipcluster_args
820 ['-n', repr(self.ipcluster_n)] + self.ipcluster_args
820
821
821 def start(self):
822 def start(self):
822 logging.info("Starting ipcluster: %r" % self.args)
823 self.log.info("Starting ipcluster: %r" % self.args)
823 return super(IPClusterLauncher, self).start()
824 return super(IPClusterLauncher, self).start()
824
825
@@ -18,15 +18,16 b' import logging'
18
18
19 import zmq
19 import zmq
20 from zmq.eventloop import ioloop, zmqstream
20 from zmq.eventloop import ioloop, zmqstream
21 from IPython.config.configurable import Configurable
22 from IPython.utils.traitlets import Int, Str, Instance, List
21 from IPython.utils.traitlets import Int, Str, Instance, List
23
22
23 from factory import LoggingFactory
24
24 #-----------------------------------------------------------------------------
25 #-----------------------------------------------------------------------------
25 # Classes
26 # Classes
26 #-----------------------------------------------------------------------------
27 #-----------------------------------------------------------------------------
27
28
28
29
29 class LogWatcher(Configurable):
30 class LogWatcher(LoggingFactory):
30 """A simple class that receives messages on a SUB socket, as published
31 """A simple class that receives messages on a SUB socket, as published
31 by subclasses of `zmq.log.handlers.PUBHandler`, and logs them itself.
32 by subclasses of `zmq.log.handlers.PUBHandler`, and logs them itself.
32
33
@@ -43,21 +44,25 b' class LogWatcher(Configurable):'
43 def _loop_default(self):
44 def _loop_default(self):
44 return ioloop.IOLoop.instance()
45 return ioloop.IOLoop.instance()
45
46
46 def __init__(self, config=None):
47 def __init__(self, **kwargs):
47 super(LogWatcher, self).__init__(config=config)
48 super(LogWatcher, self).__init__(**kwargs)
48 s = self.context.socket(zmq.SUB)
49 s = self.context.socket(zmq.SUB)
49 s.bind(self.url)
50 s.bind(self.url)
50 self.stream = zmqstream.ZMQStream(s, self.loop)
51 self.stream = zmqstream.ZMQStream(s, self.loop)
51 self.subscribe()
52 self.subscribe()
52 self.on_trait_change(self.subscribe, 'topics')
53 self.on_trait_change(self.subscribe, 'topics')
53
54
55 def start(self):
54 self.stream.on_recv(self.log_message)
56 self.stream.on_recv(self.log_message)
55
57
58 def stop(self):
59 self.stream.stop_on_recv()
60
56 def subscribe(self):
61 def subscribe(self):
57 """Update our SUB socket's subscriptions."""
62 """Update our SUB socket's subscriptions."""
58 self.stream.setsockopt(zmq.UNSUBSCRIBE, '')
63 self.stream.setsockopt(zmq.UNSUBSCRIBE, '')
59 for topic in self.topics:
64 for topic in self.topics:
60 logging.debug("Subscribing to: %r"%topic)
65 self.log.debug("Subscribing to: %r"%topic)
61 self.stream.setsockopt(zmq.SUBSCRIBE, topic)
66 self.stream.setsockopt(zmq.SUBSCRIBE, topic)
62
67
63 def _extract_level(self, topic_str):
68 def _extract_level(self, topic_str):
@@ -79,7 +84,7 b' class LogWatcher(Configurable):'
79 def log_message(self, raw):
84 def log_message(self, raw):
80 """receive and parse a message, then log it."""
85 """receive and parse a message, then log it."""
81 if len(raw) != 2 or '.' not in raw[0]:
86 if len(raw) != 2 or '.' not in raw[0]:
82 logging.error("Invalid log message: %s"%raw)
87 self.log.error("Invalid log message: %s"%raw)
83 return
88 return
84 else:
89 else:
85 topic, msg = raw
90 topic, msg = raw
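The messages `log_message` expects are two-part frames of the form `(topic, message)`, where the topic looks like `root.LEVEL` (for example `engine.INFO`) — exactly what pyzmq's `zmq.log.handlers.PUBHandler` publishes, as the class docstring notes. A rough sketch of the publishing side that would pair with such a watcher (the URL and root topic are placeholders):

    import logging
    import zmq
    from zmq.log.handlers import PUBHandler

    ctx = zmq.Context.instance()
    pub = ctx.socket(zmq.PUB)
    pub.connect('tcp://127.0.0.1:20202')   # the watcher's SUB socket bind()s here

    handler = PUBHandler(pub)
    handler.root_topic = 'engine'          # topics become e.g. 'engine.INFO'
    log = logging.getLogger('engine')
    log.setLevel(logging.DEBUG)
    log.addHandler(handler)

    # each record goes out as [topic, formatted message]; the watcher splits the
    # first frame on '.' to recover the level name before re-logging it
    log.info("engine is up")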
@@ -10,8 +10,9 b' Python Scheduler exists.'
10 #----------------------------------------------------------------------
10 #----------------------------------------------------------------------
11
11
12 from __future__ import print_function
12 from __future__ import print_function
13 from random import randint,random
13 import sys
14 import logging
14 import logging
15 from random import randint,random
15 from types import FunctionType
16 from types import FunctionType
16
17
17 try:
18 try:
@@ -24,7 +25,7 b' from zmq.eventloop import ioloop, zmqstream'
24
25
25 # local imports
26 # local imports
26 from IPython.external.decorator import decorator
27 from IPython.external.decorator import decorator
27 from IPython.config.configurable import Configurable
28 # from IPython.config.configurable import Configurable
28 from IPython.utils.traitlets import Instance, Dict, List, Set
29 from IPython.utils.traitlets import Instance, Dict, List, Set
29
30
30 import error
31 import error
@@ -32,12 +33,13 b' from client import Client'
32 from dependency import Dependency
33 from dependency import Dependency
33 import streamsession as ss
34 import streamsession as ss
34 from entry_point import connect_logger, local_logger
35 from entry_point import connect_logger, local_logger
36 from factory import LoggingFactory
35
37
36
38
37 @decorator
39 @decorator
38 def logged(f,self,*args,**kwargs):
40 def logged(f,self,*args,**kwargs):
39 # print ("#--------------------")
41 # print ("#--------------------")
40 logging.debug("scheduler::%s(*%s,**%s)"%(f.func_name, args, kwargs))
42 self.log.debug("scheduler::%s(*%s,**%s)"%(f.func_name, args, kwargs))
41 # print ("#--")
43 # print ("#--")
42 return f(self,*args, **kwargs)
44 return f(self,*args, **kwargs)
43
45
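The `@logged` decorator above assumes the wrapped method's `self` carries a `log` attribute. The same idea can be written without the IPython `decorator` package; a stand-alone sketch with made-up class and method names:

    import logging
    from functools import wraps

    def logged(f):
        @wraps(f)
        def wrapper(self, *args, **kwargs):
            # record every call on the instance's own logger, as the
            # scheduler's decorator now does with self.log.debug
            self.log.debug("scheduler::%s(*%s,**%s)" % (f.__name__, args, kwargs))
            return f(self, *args, **kwargs)
        return wrapper

    class Scheduler(object):
        def __init__(self, logname='scheduler'):
            self.log = logging.getLogger(logname)

        @logged
        def submit(self, msg_id):
            return msg_id

    logging.basicConfig(level=logging.DEBUG)
    Scheduler().submit('abc')   # logs: scheduler::submit(*('abc',),**{})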
@@ -108,7 +110,7 b' def leastload(loads):'
108 # store empty default dependency:
110 # store empty default dependency:
109 MET = Dependency([])
111 MET = Dependency([])
110
112
111 class TaskScheduler(Configurable):
113 class TaskScheduler(LoggingFactory):
112 """Python TaskScheduler object.
114 """Python TaskScheduler object.
113
115
114 This is the simplest object that supports msg_id based
116 This is the simplest object that supports msg_id based
@@ -153,7 +155,7 b' class TaskScheduler(Configurable):'
153 unregistration_notification = self._unregister_engine
155 unregistration_notification = self._unregister_engine
154 )
156 )
155 self.notifier_stream.on_recv(self.dispatch_notification)
157 self.notifier_stream.on_recv(self.dispatch_notification)
156 logging.info("Scheduler started...%r"%self)
158 self.log.info("Scheduler started...%r"%self)
157
159
158 def resume_receiving(self):
160 def resume_receiving(self):
159 """Resume accepting jobs."""
161 """Resume accepting jobs."""
@@ -180,7 +182,7 b' class TaskScheduler(Configurable):'
180 try:
182 try:
181 handler(str(msg['content']['queue']))
183 handler(str(msg['content']['queue']))
182 except KeyError:
184 except KeyError:
183 logging.error("task::Invalid notification msg: %s"%msg)
185 self.log.error("task::Invalid notification msg: %s"%msg)
184
186
185 @logged
187 @logged
186 def _register_engine(self, uid):
188 def _register_engine(self, uid):
@@ -236,7 +238,7 b' class TaskScheduler(Configurable):'
236 try:
238 try:
237 idents, msg = self.session.feed_identities(raw_msg, copy=False)
239 idents, msg = self.session.feed_identities(raw_msg, copy=False)
238 except Exception as e:
240 except Exception as e:
239 logging.error("task::Invaid msg: %s"%msg)
241 self.log.error("task::Invaid msg: %s"%msg)
240 return
242 return
241
243
242 # send to monitor
244 # send to monitor
@@ -277,7 +279,7 b' class TaskScheduler(Configurable):'
277 def fail_unreachable(self, msg_id):
279 def fail_unreachable(self, msg_id):
278 """a message has become unreachable"""
280 """a message has become unreachable"""
279 if msg_id not in self.depending:
281 if msg_id not in self.depending:
280 logging.error("msg %r already failed!"%msg_id)
282 self.log.error("msg %r already failed!"%msg_id)
281 return
283 return
282 raw_msg, after, follow = self.depending.pop(msg_id)
284 raw_msg, after, follow = self.depending.pop(msg_id)
283 for mid in follow.union(after):
285 for mid in follow.union(after):
@@ -369,7 +371,7 b' class TaskScheduler(Configurable):'
369 try:
371 try:
370 idents,msg = self.session.feed_identities(raw_msg, copy=False)
372 idents,msg = self.session.feed_identities(raw_msg, copy=False)
371 except Exception as e:
373 except Exception as e:
372 logging.error("task::Invaid result: %s"%msg)
374 self.log.error("task::Invaid result: %s"%msg)
373 return
375 return
374 msg = self.session.unpack_message(msg, content=False, copy=False)
376 msg = self.session.unpack_message(msg, content=False, copy=False)
375 header = msg['header']
377 header = msg['header']
@@ -470,7 +472,7 b' class TaskScheduler(Configurable):'
470
472
471
473
472
474
473 def launch_scheduler(in_addr, out_addr, mon_addr, not_addr, log_addr=None, loglevel=logging.DEBUG, scheme='weighted'):
475 def launch_scheduler(in_addr, out_addr, mon_addr, not_addr, logname='ZMQ', log_addr=None, loglevel=logging.DEBUG, scheme='weighted'):
474 from zmq.eventloop import ioloop
476 from zmq.eventloop import ioloop
475 from zmq.eventloop.zmqstream import ZMQStream
477 from zmq.eventloop.zmqstream import ZMQStream
476
478
@@ -490,13 +492,13 b' def launch_scheduler(in_addr, out_addr, mon_addr, not_addr, log_addr=None, logle'
490 scheme = globals().get(scheme, None)
492 scheme = globals().get(scheme, None)
491 # setup logging
493 # setup logging
492 if log_addr:
494 if log_addr:
493 connect_logger(ctx, log_addr, root="scheduler", loglevel=loglevel)
495 connect_logger(logname, ctx, log_addr, root="scheduler", loglevel=loglevel)
494 else:
496 else:
495 local_logger(loglevel)
497 local_logger(logname, loglevel)
496
498
497 scheduler = TaskScheduler(client_stream=ins, engine_stream=outs,
499 scheduler = TaskScheduler(client_stream=ins, engine_stream=outs,
498 mon_stream=mons,notifier_stream=nots,
500 mon_stream=mons,notifier_stream=nots,
499 scheme=scheme,io_loop=loop)
501 scheme=scheme,io_loop=loop, logname=logname)
500
502
501 try:
503 try:
502 loop.start()
504 loop.start()
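`launch_scheduler` now threads a `logname` through to `connect_logger`/`local_logger`, so the scheduler process either forwards its records over a PUB socket or logs locally under that name. The helpers live in `entry_point`; the sketch below is only a hedged approximation of what that choice amounts to, using pyzmq's `PUBHandler` (the address and logger name are placeholders, and the function is hypothetical):

    import logging
    import zmq
    from zmq.log.handlers import PUBHandler

    def setup_logging(logname, ctx, log_addr=None, loglevel=logging.DEBUG):
        # roughly the shape of connect_logger/local_logger: pick a named logger
        # and attach either a zmq PUBHandler (remote) or a StreamHandler (local)
        logger = logging.getLogger(logname)
        logger.setLevel(loglevel)
        if log_addr:
            sock = ctx.socket(zmq.PUB)
            sock.connect(log_addr)
            handler = PUBHandler(sock)
            handler.root_topic = 'scheduler'
        else:
            handler = logging.StreamHandler()
        logger.addHandler(handler)
        return logger

    log = setup_logging('ZMQ', zmq.Context.instance())
    log.info("Scheduler started...")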
@@ -110,7 +110,7 b' class Kernel(SessionFactory):'
110 s = _Passer()
110 s = _Passer()
111 content = dict(silent=True, user_variable=[],user_expressions=[])
111 content = dict(silent=True, user_variable=[],user_expressions=[])
112 for line in self.exec_lines:
112 for line in self.exec_lines:
113 logging.debug("executing initialization: %s"%line)
113 self.log.debug("executing initialization: %s"%line)
114 content.update({'code':line})
114 content.update({'code':line})
115 msg = self.session.msg('execute_request', content)
115 msg = self.session.msg('execute_request', content)
116 self.execute_request(s, [], msg)
116 self.execute_request(s, [], msg)
@@ -139,8 +139,8 b' class Kernel(SessionFactory):'
139
139
140 # assert self.reply_socketly_socket.rcvmore(), "Unexpected missing message part."
140 # assert self.reply_socketly_socket.rcvmore(), "Unexpected missing message part."
141 # msg = self.reply_socket.recv_json()
141 # msg = self.reply_socket.recv_json()
142 logging.info("Aborting:")
142 self.log.info("Aborting:")
143 logging.info(str(msg))
143 self.log.info(str(msg))
144 msg_type = msg['msg_type']
144 msg_type = msg['msg_type']
145 reply_type = msg_type.split('_')[0] + '_reply'
145 reply_type = msg_type.split('_')[0] + '_reply'
146 # reply_msg = self.session.msg(reply_type, {'status' : 'aborted'}, msg)
146 # reply_msg = self.session.msg(reply_type, {'status' : 'aborted'}, msg)
@@ -148,7 +148,7 b' class Kernel(SessionFactory):'
148 # self.reply_socket.send_json(reply_msg)
148 # self.reply_socket.send_json(reply_msg)
149 reply_msg = self.session.send(stream, reply_type,
149 reply_msg = self.session.send(stream, reply_type,
150 content={'status' : 'aborted'}, parent=msg, ident=idents)[0]
150 content={'status' : 'aborted'}, parent=msg, ident=idents)[0]
151 logging.debug(str(reply_msg))
151 self.log.debug(str(reply_msg))
152 # We need to wait a bit for requests to come in. This can probably
152 # We need to wait a bit for requests to come in. This can probably
153 # be set shorter for true asynchronous clients.
153 # be set shorter for true asynchronous clients.
154 time.sleep(0.05)
154 time.sleep(0.05)
@@ -166,7 +166,7 b' class Kernel(SessionFactory):'
166 content = dict(status='ok')
166 content = dict(status='ok')
167 reply_msg = self.session.send(stream, 'abort_reply', content=content,
167 reply_msg = self.session.send(stream, 'abort_reply', content=content,
168 parent=parent, ident=ident)[0]
168 parent=parent, ident=ident)[0]
169 logging.debug(str(reply_msg))
169 self.log.debug(str(reply_msg))
170
170
171 def shutdown_request(self, stream, ident, parent):
171 def shutdown_request(self, stream, ident, parent):
172 """kill ourself. This should really be handled in an external process"""
172 """kill ourself. This should really be handled in an external process"""
@@ -191,7 +191,7 b' class Kernel(SessionFactory):'
191 try:
191 try:
192 msg = self.session.unpack_message(msg, content=True, copy=False)
192 msg = self.session.unpack_message(msg, content=True, copy=False)
193 except:
193 except:
194 logging.error("Invalid Message", exc_info=True)
194 self.log.error("Invalid Message", exc_info=True)
195 return
195 return
196
196
197 header = msg['header']
197 header = msg['header']
@@ -199,7 +199,7 b' class Kernel(SessionFactory):'
199
199
200 handler = self.control_handlers.get(msg['msg_type'], None)
200 handler = self.control_handlers.get(msg['msg_type'], None)
201 if handler is None:
201 if handler is None:
202 logging.error("UNKNOWN CONTROL MESSAGE TYPE: %r"%msg['msg_type'])
202 self.log.error("UNKNOWN CONTROL MESSAGE TYPE: %r"%msg['msg_type'])
203 else:
203 else:
204 handler(self.control_stream, idents, msg)
204 handler(self.control_stream, idents, msg)
205
205
@@ -240,11 +240,11 b' class Kernel(SessionFactory):'
240 self._initial_exec_lines()
240 self._initial_exec_lines()
241
241
242 def execute_request(self, stream, ident, parent):
242 def execute_request(self, stream, ident, parent):
243 logging.debug('execute request %s'%parent)
243 self.log.debug('execute request %s'%parent)
244 try:
244 try:
245 code = parent[u'content'][u'code']
245 code = parent[u'content'][u'code']
246 except:
246 except:
247 logging.error("Got bad msg: %s"%parent, exc_info=True)
247 self.log.error("Got bad msg: %s"%parent, exc_info=True)
248 return
248 return
249 self.session.send(self.iopub_stream, u'pyin', {u'code':code},parent=parent,
249 self.session.send(self.iopub_stream, u'pyin', {u'code':code},parent=parent,
250 ident='%s.pyin'%self.prefix)
250 ident='%s.pyin'%self.prefix)
@@ -268,7 +268,7 b' class Kernel(SessionFactory):'
268
268
269 reply_msg = self.session.send(stream, u'execute_reply', reply_content, parent=parent,
269 reply_msg = self.session.send(stream, u'execute_reply', reply_content, parent=parent,
270 ident=ident, subheader = dict(started=started))
270 ident=ident, subheader = dict(started=started))
271 logging.debug(str(reply_msg))
271 self.log.debug(str(reply_msg))
272 if reply_msg['content']['status'] == u'error':
272 if reply_msg['content']['status'] == u'error':
273 self.abort_queues()
273 self.abort_queues()
274
274
@@ -290,7 +290,7 b' class Kernel(SessionFactory):'
290 msg_id = parent['header']['msg_id']
290 msg_id = parent['header']['msg_id']
291 bound = content.get('bound', False)
291 bound = content.get('bound', False)
292 except:
292 except:
293 logging.error("Got bad msg: %s"%parent, exc_info=True)
293 self.log.error("Got bad msg: %s"%parent, exc_info=True)
294 return
294 return
295 # pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent)
295 # pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent)
296 # self.iopub_stream.send(pyin_msg)
296 # self.iopub_stream.send(pyin_msg)
@@ -364,7 +364,7 b' class Kernel(SessionFactory):'
364 try:
364 try:
365 msg = self.session.unpack_message(msg, content=True, copy=False)
365 msg = self.session.unpack_message(msg, content=True, copy=False)
366 except:
366 except:
367 logging.error("Invalid Message", exc_info=True)
367 self.log.error("Invalid Message", exc_info=True)
368 return
368 return
369
369
370
370
@@ -379,7 +379,7 b' class Kernel(SessionFactory):'
379 return
379 return
380 handler = self.shell_handlers.get(msg['msg_type'], None)
380 handler = self.shell_handlers.get(msg['msg_type'], None)
381 if handler is None:
381 if handler is None:
382 logging.error("UNKNOWN MESSAGE TYPE: %r"%msg['msg_type'])
382 self.log.error("UNKNOWN MESSAGE TYPE: %r"%msg['msg_type'])
383 else:
383 else:
384 handler(stream, idents, msg)
384 handler(stream, idents, msg)
385
385
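The two dispatch hunks above follow the same pattern: unpack a message, look its `msg_type` up in a handler table, and report an unknown type on the instance logger. A compact, self-contained sketch of that dispatch shape (the message format and handler names are illustrative, not the actual kernel's):

    import logging

    class MiniKernel(object):
        def __init__(self, logname='kernel'):
            self.log = logging.getLogger(logname)
            self.shell_handlers = {'execute_request': self.execute_request}

        def execute_request(self, msg):
            self.log.debug('execute request %s' % msg)

        def dispatch(self, msg):
            try:
                msg_type = msg['msg_type']
            except Exception:
                # exc_info=True attaches the traceback to the log record
                self.log.error("Invalid Message", exc_info=True)
                return
            handler = self.shell_handlers.get(msg_type, None)
            if handler is None:
                self.log.error("UNKNOWN MESSAGE TYPE: %r" % msg_type)
            else:
                handler(msg)

    logging.basicConfig(level=logging.DEBUG)
    MiniKernel().dispatch({'msg_type': 'execute_request', 'content': {}})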
@@ -158,7 +158,7 b" def openssh_tunnel(lport, rport, server, remoteip='127.0.0.1', keyfile=None, pas"
158 ssh="ssh "
158 ssh="ssh "
159 if keyfile:
159 if keyfile:
160 ssh += "-i " + keyfile
160 ssh += "-i " + keyfile
161 cmd = ssh + " -f -L 127.0.0.1:%i:127.0.0.1:%i %s sleep %i"%(lport, rport, server, timeout)
161 cmd = ssh + " -f -L 127.0.0.1:%i:%s:%i %s sleep %i"%(lport, remoteip, rport, server, timeout)
162 tunnel = pexpect.spawn(cmd)
162 tunnel = pexpect.spawn(cmd)
163 failed = False
163 failed = False
164 while True:
164 while True:
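The one-line change to `openssh_tunnel` makes the forward honor `remoteip`: previously the `-L` spec always pointed at `127.0.0.1` on the far side, so the `remoteip` argument was silently ignored. A quick illustration of the command string the fixed line builds (sample hosts and ports only):

    # sample values; the real function fills these in from its arguments
    lport, rport = 10101, 10102
    server, remoteip, timeout = 'user@gateway.example.com', '10.0.0.5', 15

    ssh = "ssh "
    cmd = ssh + " -f -L 127.0.0.1:%i:%s:%i %s sleep %i" % (
        lport, remoteip, rport, server, timeout)
    print(cmd)
    # ssh  -f -L 127.0.0.1:10101:10.0.0.5:10102 user@gateway.example.com sleep 15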