diff --git a/IPython/zmq/parallel/clusterdir.py b/IPython/zmq/parallel/clusterdir.py index 8afddbb..8f0290e 100755 --- a/IPython/zmq/parallel/clusterdir.py +++ b/IPython/zmq/parallel/clusterdir.py @@ -480,12 +480,11 @@ class ApplicationWithClusterDir(Application): open_log_file = None else: open_log_file = sys.stdout - logger = logging.getLogger() - level = self.log_level - self.log = logger - # since we've reconnected the logger, we need to reconnect the log-level - self.log_level = level - if open_log_file is not None and self._log_handler not in self.log.handlers: + if open_log_file is not None: + self.log.removeHandler(self._log_handler) + self._log_handler = logging.StreamHandler(open_log_file) + self._log_formatter = logging.Formatter("[%(name)s] %(message)s") + self._log_handler.setFormatter(self._log_formatter) self.log.addHandler(self._log_handler) # log.startLogging(open_log_file) diff --git a/IPython/zmq/parallel/controller.py b/IPython/zmq/parallel/controller.py index 1039112..6365c25 100755 --- a/IPython/zmq/parallel/controller.py +++ b/IPython/zmq/parallel/controller.py @@ -44,14 +44,12 @@ class ControllerFactory(HubFactory): children = List() mq_class = Str('zmq.devices.ProcessMonitoredQueue') - def _update_mq(self): - self.mq_class = 'zmq.devices.%sMonitoredQueue'%('Thread' if self.usethreads else 'Process') + def _usethreads_changed(self, name, old, new): + self.mq_class = 'zmq.devices.%sMonitoredQueue'%('Thread' if new else 'Process') def __init__(self, **kwargs): super(ControllerFactory, self).__init__(**kwargs) self.subconstructors.append(self.construct_schedulers) - self._update_mq() - self.on_trait_change(self._update_mq, 'usethreads') def start(self): super(ControllerFactory, self).start() @@ -91,7 +89,7 @@ class ControllerFactory(HubFactory): children.append(q) # Task Queue (in a Process) if self.scheme == 'pure': - logging.warn("task::using pure XREQ Task scheduler") + self.log.warn("task::using pure XREQ Task scheduler") q = mq(zmq.XREP, zmq.XREQ, zmq.PUB, 'intask', 'outtask') q.bind_in(self.client_addrs['task']) q.bind_out(self.engine_addrs['task']) @@ -99,10 +97,10 @@ class ControllerFactory(HubFactory): q.daemon=True children.append(q) elif self.scheme == 'none': - logging.warn("task::using no Task scheduler") + self.log.warn("task::using no Task scheduler") else: - logging.warn("task::using Python %s Task scheduler"%self.scheme) + self.log.warn("task::using Python %s Task scheduler"%self.scheme) sargs = (self.client_addrs['task'], self.engine_addrs['task'], self.monitor_url, self.client_addrs['notification']) q = Process(target=launch_scheduler, args=sargs, kwargs = dict(scheme=self.scheme)) q.daemon=True diff --git a/IPython/zmq/parallel/engine.py b/IPython/zmq/parallel/engine.py index 41509d9..6bd1ddf 100755 --- a/IPython/zmq/parallel/engine.py +++ b/IPython/zmq/parallel/engine.py @@ -25,8 +25,8 @@ from streamkernel import Kernel import heartmonitor def printer(*msg): - # print (logging.handlers, file=sys.__stdout__) - logging.info(str(msg)) + # print (self.log.handlers, file=sys.__stdout__) + self.log.info(str(msg)) class EngineFactory(RegistrationFactory): """IPython engine""" @@ -54,7 +54,7 @@ class EngineFactory(RegistrationFactory): def register(self): """send the registration_request""" - logging.info("registering") + self.log.info("registering") content = dict(queue=self.ident, heartbeat=self.ident, control=self.ident) self.registrar.on_recv(self.complete_registration) # print (self.session.key) @@ -112,10 +112,9 @@ class EngineFactory(RegistrationFactory): sys.displayhook = self.display_hook_factory(self.session, iopub_stream) sys.displayhook.topic = 'engine.%i.pyout'%self.id - self.kernel = Kernel(int_id=self.id, ident=self.ident, session=self.session, - control_stream=control_stream, - shell_streams=shell_streams, iopub_stream=iopub_stream, loop=loop, - user_ns = self.user_ns, config=self.config) + self.kernel = Kernel(config=self.config, int_id=self.id, ident=self.ident, session=self.session, + control_stream=control_stream, shell_streams=shell_streams, iopub_stream=iopub_stream, + loop=loop, user_ns = self.user_ns, logname=self.log.name) self.kernel.start() heart = heartmonitor.Heart(*map(str, hb_addrs), heart_id=identity) @@ -124,10 +123,10 @@ class EngineFactory(RegistrationFactory): else: - logging.error("Registration Failed: %s"%msg) + self.log.error("Registration Failed: %s"%msg) raise Exception("Registration Failed: %s"%msg) - logging.info("Completed registration with id %i"%self.id) + self.log.info("Completed registration with id %i"%self.id) def unregister(self): diff --git a/IPython/zmq/parallel/entry_point.py b/IPython/zmq/parallel/entry_point.py index eb1c63a..2d44e9f 100644 --- a/IPython/zmq/parallel/entry_point.py +++ b/IPython/zmq/parallel/entry_point.py @@ -79,8 +79,8 @@ def integer_loglevel(loglevel): loglevel = getattr(logging, loglevel) return loglevel -def connect_logger(context, iface, root="ip", loglevel=logging.DEBUG): - logger = logging.getLogger() +def connect_logger(logname, context, iface, root="ip", loglevel=logging.DEBUG): + logger = logging.getLogger(logname) if any([isinstance(h, handlers.PUBHandler) for h in logger.handlers]): # don't add a second PUBHandler return @@ -106,9 +106,9 @@ def connect_engine_logger(context, iface, engine, loglevel=logging.DEBUG): logger.addHandler(handler) logger.setLevel(loglevel) -def local_logger(loglevel=logging.DEBUG): +def local_logger(logname, loglevel=logging.DEBUG): loglevel = integer_loglevel(loglevel) - logger = logging.getLogger() + logger = logging.getLogger(logname) if any([isinstance(h, logging.StreamHandler) for h in logger.handlers]): # don't add a second StreamHandler return diff --git a/IPython/zmq/parallel/factory.py b/IPython/zmq/parallel/factory.py index 04d5173..f406be3 100644 --- a/IPython/zmq/parallel/factory.py +++ b/IPython/zmq/parallel/factory.py @@ -29,9 +29,15 @@ import IPython.zmq.parallel.streamsession as ss #----------------------------------------------------------------------------- # Classes #----------------------------------------------------------------------------- +class LoggingFactory(Configurable): + """A most basic class, that has a `log` (type:`Logger`) attribute, set via a `logname` Trait.""" + log = Instance('logging.Logger', ('ZMQ', logging.WARN)) + logname = CStr('ZMQ') + def _logname_changed(self, name, old, new): + self.log = logging.getLogger(new) + - -class SessionFactory(Configurable): +class SessionFactory(LoggingFactory): """The Base factory from which every factory in IPython.zmq.parallel inherits""" packer = Str('',config=True) @@ -41,14 +47,14 @@ class SessionFactory(Configurable): return str(uuid.uuid4()) username = Str(os.environ.get('USER','username'),config=True) exec_key = CUnicode('',config=True) - # not configurable: context = Instance('zmq.Context', (), {}) session = Instance('IPython.zmq.parallel.streamsession.StreamSession') - loop = Instance('zmq.eventloop.ioloop.IOLoop') + loop = Instance('zmq.eventloop.ioloop.IOLoop', allow_none=False) def _loop_default(self): return IOLoop.instance() + def __init__(self, **kwargs): super(SessionFactory, self).__init__(**kwargs) diff --git a/IPython/zmq/parallel/heartmonitor.py b/IPython/zmq/parallel/heartmonitor.py index b722f4f..436e7f7 100644 --- a/IPython/zmq/parallel/heartmonitor.py +++ b/IPython/zmq/parallel/heartmonitor.py @@ -13,6 +13,9 @@ import zmq from zmq.devices import ProcessDevice,ThreadDevice from zmq.eventloop import ioloop, zmqstream +from IPython.utils.traitlets import Set, Instance, CFloat, Bool +from factory import LoggingFactory + class Heart(object): """A basic heart object for responding to a HeartMonitor. This is a simple wrapper with defaults for the most common @@ -39,36 +42,35 @@ class Heart(object): def start(self): return self.device.start() -class HeartMonitor(object): +class HeartMonitor(LoggingFactory): """A basic HeartMonitor class pingstream: a PUB stream pongstream: an XREP stream period: the period of the heartbeat in milliseconds""" - loop=None - pingstream=None - pongstream=None - period=None - hearts=None - on_probation=None - last_ping=None - # debug=False - def __init__(self, loop, pingstream, pongstream, period=1000): - self.loop = loop - self.period = period + period=CFloat(1000, config=True) # in milliseconds + + pingstream=Instance('zmq.eventloop.zmqstream.ZMQStream') + pongstream=Instance('zmq.eventloop.zmqstream.ZMQStream') + loop = Instance('zmq.eventloop.ioloop.IOLoop') + def _loop_default(self): + return ioloop.IOLoop.instance() + debug=Bool(False) + + # not settable: + hearts=Set() + responses=Set() + on_probation=Set() + last_ping=CFloat(0) + _new_handlers = Set() + _failure_handlers = Set() + lifetime = CFloat(0) + tic = CFloat(0) + + def __init__(self, **kwargs): + super(HeartMonitor, self).__init__(**kwargs) - self.pingstream = pingstream - self.pongstream = pongstream self.pongstream.on_recv(self.handle_pong) - - self.hearts = set() - self.responses = set() - self.on_probation = set() - self.lifetime = 0 - self.tic = time.time() - - self._new_handlers = set() - self._failure_handlers = set() def start(self): self.caller = ioloop.PeriodicCallback(self.beat, self.period, self.loop) @@ -76,12 +78,12 @@ class HeartMonitor(object): def add_new_heart_handler(self, handler): """add a new handler for new hearts""" - logging.debug("heartbeat::new_heart_handler: %s"%handler) + self.log.debug("heartbeat::new_heart_handler: %s"%handler) self._new_handlers.add(handler) def add_heart_failure_handler(self, handler): """add a new handler for heart failure""" - logging.debug("heartbeat::new heart failure handler: %s"%handler) + self.log.debug("heartbeat::new heart failure handler: %s"%handler) self._failure_handlers.add(handler) def beat(self): @@ -91,7 +93,7 @@ class HeartMonitor(object): toc = time.time() self.lifetime += toc-self.tic self.tic = toc - # logging.debug("heartbeat::%s"%self.lifetime) + # self.log.debug("heartbeat::%s"%self.lifetime) goodhearts = self.hearts.intersection(self.responses) missed_beats = self.hearts.difference(goodhearts) heartfailures = self.on_probation.intersection(missed_beats) @@ -101,7 +103,7 @@ class HeartMonitor(object): self.on_probation = missed_beats.intersection(self.hearts) self.responses = set() # print self.on_probation, self.hearts - # logging.debug("heartbeat::beat %.3f, %i beating hearts"%(self.lifetime, len(self.hearts))) + # self.log.debug("heartbeat::beat %.3f, %i beating hearts"%(self.lifetime, len(self.hearts))) self.pingstream.send(str(self.lifetime)) def handle_new_heart(self, heart): @@ -109,7 +111,7 @@ class HeartMonitor(object): for handler in self._new_handlers: handler(heart) else: - logging.info("heartbeat::yay, got new heart %s!"%heart) + self.log.info("heartbeat::yay, got new heart %s!"%heart) self.hearts.add(heart) def handle_heart_failure(self, heart): @@ -118,10 +120,10 @@ class HeartMonitor(object): try: handler(heart) except Exception as e: - logging.error("heartbeat::Bad Handler! %s"%handler, exc_info=True) + self.log.error("heartbeat::Bad Handler! %s"%handler, exc_info=True) pass else: - logging.info("heartbeat::Heart %s failed :("%heart) + self.log.info("heartbeat::Heart %s failed :("%heart) self.hearts.remove(heart) @@ -129,14 +131,14 @@ class HeartMonitor(object): "a heart just beat" if msg[1] == str(self.lifetime): delta = time.time()-self.tic - # logging.debug("heartbeat::heart %r took %.2f ms to respond"%(msg[0], 1000*delta)) + # self.log.debug("heartbeat::heart %r took %.2f ms to respond"%(msg[0], 1000*delta)) self.responses.add(msg[0]) elif msg[1] == str(self.last_ping): delta = time.time()-self.tic + (self.lifetime-self.last_ping) - logging.warn("heartbeat::heart %r missed a beat, and took %.2f ms to respond"%(msg[0], 1000*delta)) + self.log.warn("heartbeat::heart %r missed a beat, and took %.2f ms to respond"%(msg[0], 1000*delta)) self.responses.add(msg[0]) else: - logging.warn("heartbeat::got bad heartbeat (possibly old?): %s (current=%.3f)"% + self.log.warn("heartbeat::got bad heartbeat (possibly old?): %s (current=%.3f)"% (msg[1],self.lifetime)) diff --git a/IPython/zmq/parallel/hub.py b/IPython/zmq/parallel/hub.py index e215ece..2b3a2aa 100755 --- a/IPython/zmq/parallel/hub.py +++ b/IPython/zmq/parallel/hub.py @@ -30,7 +30,7 @@ from IPython.utils.traitlets import HasTraits, Instance, Int, Str, Dict, Set, Li from IPython.utils.importstring import import_item from entry_point import select_random_ports -from factory import RegistrationFactory +from factory import RegistrationFactory, LoggingFactory from streamsession import Message, wrap_exception, ISO8601 from heartmonitor import HeartMonitor @@ -95,10 +95,6 @@ class EngineConnector(HasTraits): registration=Str() heartbeat=Str() pending=Set() - - def __init__(self, **kwargs): - super(EngineConnector, self).__init__(**kwargs) - logging.info("engine::Engine Connected: %i"%self.id) class HubFactory(RegistrationFactory): """The Configurable for setting up a Hub.""" @@ -193,7 +189,7 @@ class HubFactory(RegistrationFactory): def start(self): assert self._constructed, "must be constructed by self.construct() first!" self.heartmonitor.start() - logging.info("Heartmonitor started") + self.log.info("Heartmonitor started") def construct_hub(self): """construct""" @@ -206,10 +202,10 @@ class HubFactory(RegistrationFactory): # Registrar socket reg = ZMQStream(ctx.socket(zmq.XREP), loop) reg.bind(client_iface % self.regport) - logging.info("Hub listening on %s for registration."%(client_iface%self.regport)) + self.log.info("Hub listening on %s for registration."%(client_iface%self.regport)) if self.client_ip != self.engine_ip: reg.bind(engine_iface % self.regport) - logging.info("Hub listening on %s for registration."%(engine_iface%self.regport)) + self.log.info("Hub listening on %s for registration."%(engine_iface%self.regport)) ### Engine connections ### @@ -218,8 +214,8 @@ class HubFactory(RegistrationFactory): hpub.bind(engine_iface % self.hb[0]) hrep = ctx.socket(zmq.XREP) hrep.bind(engine_iface % self.hb[1]) - - self.heartmonitor = HeartMonitor(loop, ZMQStream(hpub,loop), ZMQStream(hrep,loop), self.ping) + self.heartmonitor = HeartMonitor(loop=loop, pingstream=ZMQStream(hpub,loop), pongstream=ZMQStream(hrep,loop), + period=self.ping, logname=self.log.name) ### Client connections ### # Clientele socket @@ -259,14 +255,15 @@ class HubFactory(RegistrationFactory): 'iopub' : client_iface%self.iopub[0], 'notification': client_iface%self.notifier_port } - logging.debug("hub::Hub engine addrs: %s"%self.engine_addrs) - logging.debug("hub::Hub client addrs: %s"%self.client_addrs) + self.log.debug("hub::Hub engine addrs: %s"%self.engine_addrs) + self.log.debug("hub::Hub client addrs: %s"%self.client_addrs) self.hub = Hub(loop=loop, session=self.session, monitor=sub, heartmonitor=self.heartmonitor, registrar=reg, clientele=c, notifier=n, db=self.db, - engine_addrs=self.engine_addrs, client_addrs=self.client_addrs) + engine_addrs=self.engine_addrs, client_addrs=self.client_addrs, + logname=self.log.name) -class Hub(HasTraits): +class Hub(LoggingFactory): """The IPython Controller Hub with 0MQ connections Parameters @@ -371,7 +368,7 @@ class Hub(HasTraits): 'connection_request': self.connection_request, } - logging.info("hub::created hub") + self.log.info("hub::created hub") @property def _next_id(self): @@ -422,7 +419,7 @@ class Hub(HasTraits): try: msg = self.session.unpack_message(msg[1:], content=True) except: - logging.error("client::Invalid Message %s"%msg, exc_info=True) + self.log.error("client::Invalid Message %s"%msg, exc_info=True) return False msg_type = msg.get('msg_type', None) @@ -439,15 +436,15 @@ class Hub(HasTraits): def dispatch_register_request(self, msg): """""" - logging.debug("registration::dispatch_register_request(%s)"%msg) + self.log.debug("registration::dispatch_register_request(%s)"%msg) idents,msg = self.session.feed_identities(msg) if not idents: - logging.error("Bad Queue Message: %s"%msg, exc_info=True) + self.log.error("Bad Queue Message: %s"%msg, exc_info=True) return try: msg = self.session.unpack_message(msg,content=True) except: - logging.error("registration::got bad registration message: %s"%msg, exc_info=True) + self.log.error("registration::got bad registration message: %s"%msg, exc_info=True) return msg_type = msg['msg_type'] @@ -455,38 +452,38 @@ class Hub(HasTraits): handler = self.registrar_handlers.get(msg_type, None) if handler is None: - logging.error("registration::got bad registration message: %s"%msg) + self.log.error("registration::got bad registration message: %s"%msg) else: handler(idents, msg) def dispatch_monitor_traffic(self, msg): """all ME and Task queue messages come through here, as well as IOPub traffic.""" - logging.debug("monitor traffic: %s"%msg[:2]) + self.log.debug("monitor traffic: %s"%msg[:2]) switch = msg[0] idents, msg = self.session.feed_identities(msg[1:]) if not idents: - logging.error("Bad Monitor Message: %s"%msg) + self.log.error("Bad Monitor Message: %s"%msg) return handler = self.monitor_handlers.get(switch, None) if handler is not None: handler(idents, msg) else: - logging.error("Invalid monitor topic: %s"%switch) + self.log.error("Invalid monitor topic: %s"%switch) def dispatch_client_msg(self, msg): """Route messages from clients""" idents, msg = self.session.feed_identities(msg) if not idents: - logging.error("Bad Client Message: %s"%msg) + self.log.error("Bad Client Message: %s"%msg) return client_id = idents[0] try: msg = self.session.unpack_message(msg, content=True) except: content = wrap_exception() - logging.error("Bad Client Message: %s"%msg, exc_info=True) + self.log.error("Bad Client Message: %s"%msg, exc_info=True) self.session.send(self.clientele, "hub_error", ident=client_id, content=content) return @@ -494,13 +491,13 @@ class Hub(HasTraits): # print client_id, header, parent, content #switch on message type: msg_type = msg['msg_type'] - logging.info("client:: client %s requested %s"%(client_id, msg_type)) + self.log.info("client:: client %s requested %s"%(client_id, msg_type)) handler = self.client_handlers.get(msg_type, None) try: assert handler is not None, "Bad Message Type: %s"%msg_type except: content = wrap_exception() - logging.error("Bad Message Type: %s"%msg_type, exc_info=True) + self.log.error("Bad Message Type: %s"%msg_type, exc_info=True) self.session.send(self.clientele, "hub_error", ident=client_id, content=content) return @@ -521,9 +518,9 @@ class Hub(HasTraits): """handler to attach to heartbeater. Called when a new heart starts to beat. Triggers completion of registration.""" - logging.debug("heartbeat::handle_new_heart(%r)"%heart) + self.log.debug("heartbeat::handle_new_heart(%r)"%heart) if heart not in self.incoming_registrations: - logging.info("heartbeat::ignoring new heart: %r"%heart) + self.log.info("heartbeat::ignoring new heart: %r"%heart) else: self.finish_registration(heart) @@ -532,11 +529,11 @@ class Hub(HasTraits): """handler to attach to heartbeater. called when a previously registered heart fails to respond to beat request. triggers unregistration""" - logging.debug("heartbeat::handle_heart_failure(%r)"%heart) + self.log.debug("heartbeat::handle_heart_failure(%r)"%heart) eid = self.hearts.get(heart, None) queue = self.engines[eid].queue if eid is None: - logging.info("heartbeat::ignoring heart failure %r"%heart) + self.log.info("heartbeat::ignoring heart failure %r"%heart) else: self.unregister_engine(heart, dict(content=dict(id=eid, queue=queue))) @@ -544,19 +541,19 @@ class Hub(HasTraits): def save_queue_request(self, idents, msg): if len(idents) < 2: - logging.error("invalid identity prefix: %s"%idents) + self.log.error("invalid identity prefix: %s"%idents) return queue_id, client_id = idents[:2] try: msg = self.session.unpack_message(msg, content=False) except: - logging.error("queue::client %r sent invalid message to %r: %s"%(client_id, queue_id, msg), exc_info=True) + self.log.error("queue::client %r sent invalid message to %r: %s"%(client_id, queue_id, msg), exc_info=True) return eid = self.by_ident.get(queue_id, None) if eid is None: - logging.error("queue::target %r not registered"%queue_id) - logging.debug("queue:: valid are: %s"%(self.by_ident.keys())) + self.log.error("queue::target %r not registered"%queue_id) + self.log.debug("queue:: valid are: %s"%(self.by_ident.keys())) return header = msg['header'] @@ -573,21 +570,21 @@ class Hub(HasTraits): def save_queue_result(self, idents, msg): if len(idents) < 2: - logging.error("invalid identity prefix: %s"%idents) + self.log.error("invalid identity prefix: %s"%idents) return client_id, queue_id = idents[:2] try: msg = self.session.unpack_message(msg, content=False) except: - logging.error("queue::engine %r sent invalid message to %r: %s"%( + self.log.error("queue::engine %r sent invalid message to %r: %s"%( queue_id,client_id, msg), exc_info=True) return eid = self.by_ident.get(queue_id, None) if eid is None: - logging.error("queue::unknown engine %r is sending a reply: "%queue_id) - logging.debug("queue:: %s"%msg[2:]) + self.log.error("queue::unknown engine %r is sending a reply: "%queue_id) + self.log.debug("queue:: %s"%msg[2:]) return parent = msg['parent_header'] @@ -616,7 +613,7 @@ class Hub(HasTraits): result['result_buffers'] = msg['buffers'] self.db.update_record(msg_id, result) else: - logging.debug("queue:: unknown msg finished %s"%msg_id) + self.log.debug("queue:: unknown msg finished %s"%msg_id) #--------------------- Task Queue Traffic ------------------------------ @@ -627,7 +624,7 @@ class Hub(HasTraits): try: msg = self.session.unpack_message(msg, content=False) except: - logging.error("task::client %r sent invalid task message: %s"%( + self.log.error("task::client %r sent invalid task message: %s"%( client_id, msg), exc_info=True) return record = init_record(msg) @@ -646,7 +643,7 @@ class Hub(HasTraits): try: msg = self.session.unpack_message(msg, content=False) except: - logging.error("task::invalid task result message send to %r: %s"%( + self.log.error("task::invalid task result message send to %r: %s"%( client_id, msg), exc_info=True) raise return @@ -654,7 +651,7 @@ class Hub(HasTraits): parent = msg['parent_header'] if not parent: # print msg - logging.warn("Task %r had no parent!"%msg) + self.log.warn("Task %r had no parent!"%msg) return msg_id = parent['msg_id'] @@ -687,13 +684,13 @@ class Hub(HasTraits): self.db.update_record(msg_id, result) else: - logging.debug("task::unknown task %s finished"%msg_id) + self.log.debug("task::unknown task %s finished"%msg_id) def save_task_destination(self, idents, msg): try: msg = self.session.unpack_message(msg, content=True) except: - logging.error("task::invalid task tracking message", exc_info=True) + self.log.error("task::invalid task tracking message", exc_info=True) return content = msg['content'] print (content) @@ -701,11 +698,11 @@ class Hub(HasTraits): engine_uuid = content['engine_id'] eid = self.by_ident[engine_uuid] - logging.info("task::task %s arrived on %s"%(msg_id, eid)) + self.log.info("task::task %s arrived on %s"%(msg_id, eid)) # if msg_id in self.mia: # self.mia.remove(msg_id) # else: - # logging.debug("task::task %s not listed as MIA?!"%(msg_id)) + # self.log.debug("task::task %s not listed as MIA?!"%(msg_id)) self.tasks[eid].append(msg_id) # self.pending[msg_id][1].update(received=datetime.now(),engine=(eid,engine_uuid)) @@ -726,12 +723,12 @@ class Hub(HasTraits): try: msg = self.session.unpack_message(msg, content=True) except: - logging.error("iopub::invalid IOPub message", exc_info=True) + self.log.error("iopub::invalid IOPub message", exc_info=True) return parent = msg['parent_header'] if not parent: - logging.error("iopub::invalid IOPub message: %s"%msg) + self.log.error("iopub::invalid IOPub message: %s"%msg) return msg_id = parent['msg_id'] msg_type = msg['msg_type'] @@ -741,7 +738,7 @@ class Hub(HasTraits): try: rec = self.db.get_record(msg_id) except: - logging.error("iopub::IOPub message has invalid parent", exc_info=True) + self.log.error("iopub::IOPub message has invalid parent", exc_info=True) return # stream d = {} @@ -765,7 +762,7 @@ class Hub(HasTraits): def connection_request(self, client_id, msg): """Reply with connection addresses for clients.""" - logging.info("client::client %s connected"%client_id) + self.log.info("client::client %s connected"%client_id) content = dict(status='ok') content.update(self.client_addrs) jsonable = {} @@ -780,14 +777,14 @@ class Hub(HasTraits): try: queue = content['queue'] except KeyError: - logging.error("registration::queue not specified", exc_info=True) + self.log.error("registration::queue not specified", exc_info=True) return heart = content.get('heartbeat', None) """register a new engine, and create the socket(s) necessary""" eid = self._next_id # print (eid, queue, reg, heart) - logging.debug("registration::register_engine(%i, %r, %r, %r)"%(eid, queue, reg, heart)) + self.log.debug("registration::register_engine(%i, %r, %r, %r)"%(eid, queue, reg, heart)) content = dict(id=eid,status='ok') content.update(self.engine_addrs) @@ -797,12 +794,12 @@ class Hub(HasTraits): raise KeyError("queue_id %r in use"%queue) except: content = wrap_exception() - logging.error("queue_id %r in use"%queue, exc_info=True) + self.log.error("queue_id %r in use"%queue, exc_info=True) elif heart in self.hearts: # need to check unique hearts? try: raise KeyError("heart_id %r in use"%heart) except: - logging.error("heart_id %r in use"%heart, exc_info=True) + self.log.error("heart_id %r in use"%heart, exc_info=True) content = wrap_exception() else: for h, pack in self.incoming_registrations.iteritems(): @@ -810,14 +807,14 @@ class Hub(HasTraits): try: raise KeyError("heart_id %r in use"%heart) except: - logging.error("heart_id %r in use"%heart, exc_info=True) + self.log.error("heart_id %r in use"%heart, exc_info=True) content = wrap_exception() break elif queue == pack[1]: try: raise KeyError("queue_id %r in use"%queue) except: - logging.error("queue_id %r in use"%queue, exc_info=True) + self.log.error("queue_id %r in use"%queue, exc_info=True) content = wrap_exception() break @@ -836,7 +833,7 @@ class Hub(HasTraits): dc.start() self.incoming_registrations[heart] = (eid,queue,reg[0],dc) else: - logging.error("registration::registration %i failed: %s"%(eid, content['evalue'])) + self.log.error("registration::registration %i failed: %s"%(eid, content['evalue'])) return eid def unregister_engine(self, ident, msg): @@ -844,9 +841,9 @@ class Hub(HasTraits): try: eid = msg['content']['id'] except: - logging.error("registration::bad engine id for unregistration: %s"%ident, exc_info=True) + self.log.error("registration::bad engine id for unregistration: %s"%ident, exc_info=True) return - logging.info("registration::unregister_engine(%s)"%eid) + self.log.info("registration::unregister_engine(%s)"%eid) content=dict(id=eid, queue=self.engines[eid].queue) self.ids.remove(eid) self.keytable.pop(eid) @@ -867,9 +864,9 @@ class Hub(HasTraits): try: (eid,queue,reg,purge) = self.incoming_registrations.pop(heart) except KeyError: - logging.error("registration::tried to finish nonexistant registration", exc_info=True) + self.log.error("registration::tried to finish nonexistant registration", exc_info=True) return - logging.info("registration::finished registering engine %i:%r"%(eid,queue)) + self.log.info("registration::finished registering engine %i:%r"%(eid,queue)) if purge is not None: purge.stop() control = queue @@ -885,11 +882,12 @@ class Hub(HasTraits): content = dict(id=eid, queue=self.engines[eid].queue) if self.notifier: self.session.send(self.notifier, "registration_notification", content=content) + self.log.info("engine::Engine Connected: %i"%eid) def _purge_stalled_registration(self, heart): if heart in self.incoming_registrations: eid = self.incoming_registrations.pop(heart)[0] - logging.info("registration::purging stalled registration: %i"%eid) + self.log.info("registration::purging stalled registration: %i"%eid) else: pass @@ -910,7 +908,7 @@ class Hub(HasTraits): dc.start() def _shutdown(self): - logging.info("hub::hub shutting down.") + self.log.info("hub::hub shutting down.") time.sleep(0.1) sys.exit(0) diff --git a/IPython/zmq/parallel/ipclusterapp.py b/IPython/zmq/parallel/ipclusterapp.py index a311ef9..fb70113 100755 --- a/IPython/zmq/parallel/ipclusterapp.py +++ b/IPython/zmq/parallel/ipclusterapp.py @@ -314,11 +314,12 @@ class IPClusterApp(ApplicationWithClusterDir): # and engine will be launched. el_class = import_item(config.Global.engine_launcher) self.engine_launcher = el_class( - work_dir=self.cluster_dir, config=config + work_dir=self.cluster_dir, config=config, logname=self.log.name ) cl_class = import_item(config.Global.controller_launcher) self.controller_launcher = cl_class( - work_dir=self.cluster_dir, config=config + work_dir=self.cluster_dir, config=config, + logname=self.log.name ) # Setup signals @@ -348,11 +349,11 @@ class IPClusterApp(ApplicationWithClusterDir): return d def startup_message(self, r=None): - logging.info("IPython cluster: started") + self.log.info("IPython cluster: started") return r def start_controller(self, r=None): - # logging.info("In start_controller") + # self.log.info("In start_controller") config = self.master_config d = self.controller_launcher.start( cluster_dir=config.Global.cluster_dir @@ -360,7 +361,7 @@ class IPClusterApp(ApplicationWithClusterDir): return d def start_engines(self, r=None): - # logging.info("In start_engines") + # self.log.info("In start_engines") config = self.master_config d = self.engine_launcher.start( config.Global.n, @@ -369,12 +370,12 @@ class IPClusterApp(ApplicationWithClusterDir): return d def stop_controller(self, r=None): - # logging.info("In stop_controller") + # self.log.info("In stop_controller") if self.controller_launcher.running: return self.controller_launcher.stop() def stop_engines(self, r=None): - # logging.info("In stop_engines") + # self.log.info("In stop_engines") if self.engine_launcher.running: d = self.engine_launcher.stop() # d.addErrback(self.log_err) @@ -383,16 +384,16 @@ class IPClusterApp(ApplicationWithClusterDir): return None def log_err(self, f): - logging.error(f.getTraceback()) + self.log.error(f.getTraceback()) return None def stop_launchers(self, r=None): if not self._stopping: self._stopping = True # if isinstance(r, failure.Failure): - # logging.error('Unexpected error in ipcluster:') - # logging.info(r.getTraceback()) - logging.error("IPython cluster: stopping") + # self.log.error('Unexpected error in ipcluster:') + # self.log.info(r.getTraceback()) + self.log.error("IPython cluster: stopping") # These return deferreds. We are not doing anything with them # but we are holding refs to them as a reminder that they # do return deferreds. @@ -462,7 +463,7 @@ class IPClusterApp(ApplicationWithClusterDir): try: self.loop.start() except: - logging.info("stopping...") + self.log.info("stopping...") self.remove_pid_file() def start_app_stop(self): diff --git a/IPython/zmq/parallel/ipcontrollerapp.py b/IPython/zmq/parallel/ipcontrollerapp.py index 1a65825..a2ce0c0 100755 --- a/IPython/zmq/parallel/ipcontrollerapp.py +++ b/IPython/zmq/parallel/ipcontrollerapp.py @@ -279,7 +279,7 @@ class IPControllerApp(ApplicationWithClusterDir): c.SessionFactory.exec_key = '' try: - self.factory = ControllerFactory(config=c) + self.factory = ControllerFactory(config=c, logname=self.log.name) self.start_logging() self.factory.construct() except: diff --git a/IPython/zmq/parallel/ipengineapp.py b/IPython/zmq/parallel/ipengineapp.py index a75c239..3599e03 100755 --- a/IPython/zmq/parallel/ipengineapp.py +++ b/IPython/zmq/parallel/ipengineapp.py @@ -193,7 +193,7 @@ class IPEngineApp(ApplicationWithClusterDir): # Create the underlying shell class and EngineService # shell_class = import_item(self.master_config.Global.shell_class) try: - self.engine = EngineFactory(config=config) + self.engine = EngineFactory(config=config, logname=self.log.name) except: self.log.error("Couldn't start the Engine", exc_info=True) self.exit(1) diff --git a/IPython/zmq/parallel/iploggerapp.py b/IPython/zmq/parallel/iploggerapp.py index 940ba18..e8d906d 100755 --- a/IPython/zmq/parallel/iploggerapp.py +++ b/IPython/zmq/parallel/iploggerapp.py @@ -107,7 +107,7 @@ class IPLoggerApp(ApplicationWithClusterDir): self.start_logging() try: - self.watcher = LogWatcher(config=self.master_config) + self.watcher = LogWatcher(config=self.master_config, logname=self.log.name) except: self.log.error("Couldn't start the LogWatcher", exc_info=True) self.exit(1) @@ -115,6 +115,7 @@ class IPLoggerApp(ApplicationWithClusterDir): def start_app(self): try: + self.watcher.start() self.watcher.loop.start() except KeyboardInterrupt: self.log.critical("Logging Interrupted, shutting down...\n") diff --git a/IPython/zmq/parallel/launcher.py b/IPython/zmq/parallel/launcher.py index 1b05ee9..82b4240 100644 --- a/IPython/zmq/parallel/launcher.py +++ b/IPython/zmq/parallel/launcher.py @@ -30,11 +30,12 @@ from subprocess import Popen, PIPE from zmq.eventloop import ioloop -from IPython.config.configurable import Configurable +# from IPython.config.configurable import Configurable from IPython.utils.traitlets import Str, Int, List, Unicode, Instance from IPython.utils.path import get_ipython_module_path from IPython.utils.process import find_cmd, pycmd2argv, FindCmdError +from factory import LoggingFactory # from IPython.kernel.winhpcjob import ( # IPControllerTask, IPEngineTask, # IPControllerJob, IPEngineSetJob @@ -75,7 +76,7 @@ class UnknownStatus(LauncherError): pass -class BaseLauncher(Configurable): +class BaseLauncher(LoggingFactory): """An asbtraction for starting, stopping and signaling a process.""" # In all of the launchers, the work_dir is where child processes will be @@ -90,8 +91,8 @@ class BaseLauncher(Configurable): def _loop_default(self): return ioloop.IOLoop.instance() - def __init__(self, work_dir=u'.', config=None): - super(BaseLauncher, self).__init__(work_dir=work_dir, config=config) + def __init__(self, work_dir=u'.', config=None, **kwargs): + super(BaseLauncher, self).__init__(work_dir=work_dir, config=config, **kwargs) self.state = 'before' # can be before, running, after self.stop_callbacks = [] self.start_data = None @@ -163,7 +164,7 @@ class BaseLauncher(Configurable): a pass-through so it can be used as a callback. """ - logging.info('Process %r started: %r' % (self.args[0], data)) + self.log.info('Process %r started: %r' % (self.args[0], data)) self.start_data = data self.state = 'running' return data @@ -174,7 +175,7 @@ class BaseLauncher(Configurable): This logs the process stopping and sets the state to 'after'. Call this to trigger all the deferreds from :func:`observe_stop`.""" - logging.info('Process %r stopped: %r' % (self.args[0], data)) + self.log.info('Process %r stopped: %r' % (self.args[0], data)) self.stop_data = data self.state = 'after' for i in range(len(self.stop_callbacks)): @@ -212,9 +213,9 @@ class LocalProcessLauncher(BaseLauncher): cmd_and_args = List([]) poll_frequency = Int(100) # in ms - def __init__(self, work_dir=u'.', config=None): + def __init__(self, work_dir=u'.', config=None, **kwargs): super(LocalProcessLauncher, self).__init__( - work_dir=work_dir, config=config + work_dir=work_dir, config=config, **kwargs ) self.process = None self.start_deferred = None @@ -259,7 +260,7 @@ class LocalProcessLauncher(BaseLauncher): line = self.process.stdout.readline() # a stopped process will be readable but return empty strings if line: - logging.info(line[:-1]) + self.log.info(line[:-1]) else: self.poll() @@ -267,7 +268,7 @@ class LocalProcessLauncher(BaseLauncher): line = self.process.stderr.readline() # a stopped process will be readable but return empty strings if line: - logging.error(line[:-1]) + self.log.error(line[:-1]) else: self.poll() @@ -294,7 +295,7 @@ class LocalControllerLauncher(LocalProcessLauncher): """Start the controller by cluster_dir.""" self.controller_args.extend(['--cluster-dir', cluster_dir]) self.cluster_dir = unicode(cluster_dir) - logging.info("Starting LocalControllerLauncher: %r" % self.args) + self.log.info("Starting LocalControllerLauncher: %r" % self.args) return super(LocalControllerLauncher, self).start() @@ -327,9 +328,9 @@ class LocalEngineSetLauncher(BaseLauncher): # launcher class launcher_class = LocalEngineLauncher - def __init__(self, work_dir=u'.', config=None): + def __init__(self, work_dir=u'.', config=None, **kwargs): super(LocalEngineSetLauncher, self).__init__( - work_dir=work_dir, config=config + work_dir=work_dir, config=config, **kwargs ) self.launchers = {} self.stop_data = {} @@ -339,14 +340,14 @@ class LocalEngineSetLauncher(BaseLauncher): self.cluster_dir = unicode(cluster_dir) dlist = [] for i in range(n): - el = self.launcher_class(work_dir=self.work_dir, config=self.config) + el = self.launcher_class(work_dir=self.work_dir, config=self.config, logname=self.log.name) # Copy the engine args over to each engine launcher. import copy el.engine_args = copy.deepcopy(self.engine_args) el.on_stop(self._notice_engine_stopped) d = el.start(cluster_dir) if i==0: - logging.info("Starting LocalEngineSetLauncher: %r" % el.args) + self.log.info("Starting LocalEngineSetLauncher: %r" % el.args) self.launchers[i] = el dlist.append(d) self.notify_start(dlist) @@ -431,7 +432,7 @@ class MPIExecControllerLauncher(MPIExecLauncher): """Start the controller by cluster_dir.""" self.controller_args.extend(['--cluster-dir', cluster_dir]) self.cluster_dir = unicode(cluster_dir) - logging.info("Starting MPIExecControllerLauncher: %r" % self.args) + self.log.info("Starting MPIExecControllerLauncher: %r" % self.args) return super(MPIExecControllerLauncher, self).start(1) def find_args(self): @@ -453,7 +454,7 @@ class MPIExecEngineSetLauncher(MPIExecLauncher): self.engine_args.extend(['--cluster-dir', cluster_dir]) self.cluster_dir = unicode(cluster_dir) self.n = n - logging.info('Starting MPIExecEngineSetLauncher: %r' % self.args) + self.log.info('Starting MPIExecEngineSetLauncher: %r' % self.args) return super(MPIExecEngineSetLauncher, self).start(n) def find_args(self): @@ -572,7 +573,7 @@ class SSHEngineSetLauncher(LocalEngineSetLauncher): # else: # raise LauncherError("Job id couldn't be determined: %s" % output) # self.job_id = job_id -# logging.info('Job started with job id: %r' % job_id) +# self.log.info('Job started with job id: %r' % job_id) # return job_id # # @inlineCallbacks @@ -584,7 +585,7 @@ class SSHEngineSetLauncher(LocalEngineSetLauncher): # '/jobfile:%s' % self.job_file, # '/scheduler:%s' % self.scheduler # ] -# logging.info("Starting Win HPC Job: %s" % (self.job_cmd + ' ' + ' '.join(args),)) +# self.log.info("Starting Win HPC Job: %s" % (self.job_cmd + ' ' + ' '.join(args),)) # # Twisted will raise DeprecationWarnings if we try to pass unicode to this # output = yield getProcessOutput(str(self.job_cmd), # [str(a) for a in args], @@ -602,7 +603,7 @@ class SSHEngineSetLauncher(LocalEngineSetLauncher): # self.job_id, # '/scheduler:%s' % self.scheduler # ] -# logging.info("Stopping Win HPC Job: %s" % (self.job_cmd + ' ' + ' '.join(args),)) +# self.log.info("Stopping Win HPC Job: %s" % (self.job_cmd + ' ' + ' '.join(args),)) # try: # # Twisted will raise DeprecationWarnings if we try to pass unicode to this # output = yield getProcessOutput(str(self.job_cmd), @@ -633,7 +634,7 @@ class SSHEngineSetLauncher(LocalEngineSetLauncher): # t.controller_args.extend(self.extra_args) # job.add_task(t) # -# logging.info("Writing job description file: %s" % self.job_file) +# self.log.info("Writing job description file: %s" % self.job_file) # job.write(self.job_file) # # @property @@ -665,7 +666,7 @@ class SSHEngineSetLauncher(LocalEngineSetLauncher): # t.engine_args.extend(self.extra_args) # job.add_task(t) # -# logging.info("Writing job description file: %s" % self.job_file) +# self.log.info("Writing job description file: %s" % self.job_file) # job.write(self.job_file) # # @property @@ -729,14 +730,14 @@ class SSHEngineSetLauncher(LocalEngineSetLauncher): # else: # raise LauncherError("Job id couldn't be determined: %s" % output) # self.job_id = job_id -# logging.info('Job started with job id: %r' % job_id) +# self.log.info('Job started with job id: %r' % job_id) # return job_id # # def write_batch_script(self, n): # """Instantiate and write the batch script to the work_dir.""" # self.context['n'] = n # script_as_string = Itpl.itplns(self.batch_template, self.context) -# logging.info('Writing instantiated batch script: %s' % self.batch_file) +# self.log.info('Writing instantiated batch script: %s' % self.batch_file) # f = open(self.batch_file, 'w') # f.write(script_as_string) # f.close() @@ -783,7 +784,7 @@ class SSHEngineSetLauncher(LocalEngineSetLauncher): # # ${cluster_dir} # self.context['cluster_dir'] = cluster_dir # self.cluster_dir = unicode(cluster_dir) -# logging.info("Starting PBSControllerLauncher: %r" % self.args) +# self.log.info("Starting PBSControllerLauncher: %r" % self.args) # return super(PBSControllerLauncher, self).start(1) # # @@ -795,7 +796,7 @@ class SSHEngineSetLauncher(LocalEngineSetLauncher): # """Start n engines by profile or cluster_dir.""" # self.program_args.extend(['--cluster-dir', cluster_dir]) # self.cluster_dir = unicode(cluster_dir) -# logging.info('Starting PBSEngineSetLauncher: %r' % self.args) +# self.log.info('Starting PBSEngineSetLauncher: %r' % self.args) # return super(PBSEngineSetLauncher, self).start(n) @@ -819,6 +820,6 @@ class IPClusterLauncher(LocalProcessLauncher): ['-n', repr(self.ipcluster_n)] + self.ipcluster_args def start(self): - logging.info("Starting ipcluster: %r" % self.args) + self.log.info("Starting ipcluster: %r" % self.args) return super(IPClusterLauncher, self).start() diff --git a/IPython/zmq/parallel/logwatcher.py b/IPython/zmq/parallel/logwatcher.py index 6f02195..5fc506f 100644 --- a/IPython/zmq/parallel/logwatcher.py +++ b/IPython/zmq/parallel/logwatcher.py @@ -18,15 +18,16 @@ import logging import zmq from zmq.eventloop import ioloop, zmqstream -from IPython.config.configurable import Configurable from IPython.utils.traitlets import Int, Str, Instance, List +from factory import LoggingFactory + #----------------------------------------------------------------------------- # Classes #----------------------------------------------------------------------------- -class LogWatcher(Configurable): +class LogWatcher(LoggingFactory): """A simple class that receives messages on a SUB socket, as published by subclasses of `zmq.log.handlers.PUBHandler`, and logs them itself. @@ -43,21 +44,25 @@ class LogWatcher(Configurable): def _loop_default(self): return ioloop.IOLoop.instance() - def __init__(self, config=None): - super(LogWatcher, self).__init__(config=config) + def __init__(self, **kwargs): + super(LogWatcher, self).__init__(**kwargs) s = self.context.socket(zmq.SUB) s.bind(self.url) self.stream = zmqstream.ZMQStream(s, self.loop) self.subscribe() self.on_trait_change(self.subscribe, 'topics') - + + def start(self): self.stream.on_recv(self.log_message) + def stop(self): + self.stream.stop_on_recv() + def subscribe(self): """Update our SUB socket's subscriptions.""" self.stream.setsockopt(zmq.UNSUBSCRIBE, '') for topic in self.topics: - logging.debug("Subscribing to: %r"%topic) + self.log.debug("Subscribing to: %r"%topic) self.stream.setsockopt(zmq.SUBSCRIBE, topic) def _extract_level(self, topic_str): @@ -79,7 +84,7 @@ class LogWatcher(Configurable): def log_message(self, raw): """receive and parse a message, then log it.""" if len(raw) != 2 or '.' not in raw[0]: - logging.error("Invalid log message: %s"%raw) + self.log.error("Invalid log message: %s"%raw) return else: topic, msg = raw diff --git a/IPython/zmq/parallel/scheduler.py b/IPython/zmq/parallel/scheduler.py index 3275b6f..7dbec99 100644 --- a/IPython/zmq/parallel/scheduler.py +++ b/IPython/zmq/parallel/scheduler.py @@ -10,8 +10,9 @@ Python Scheduler exists. #---------------------------------------------------------------------- from __future__ import print_function -from random import randint,random +import sys import logging +from random import randint,random from types import FunctionType try: @@ -24,7 +25,7 @@ from zmq.eventloop import ioloop, zmqstream # local imports from IPython.external.decorator import decorator -from IPython.config.configurable import Configurable +# from IPython.config.configurable import Configurable from IPython.utils.traitlets import Instance, Dict, List, Set import error @@ -32,12 +33,13 @@ from client import Client from dependency import Dependency import streamsession as ss from entry_point import connect_logger, local_logger +from factory import LoggingFactory @decorator def logged(f,self,*args,**kwargs): # print ("#--------------------") - logging.debug("scheduler::%s(*%s,**%s)"%(f.func_name, args, kwargs)) + self.log.debug("scheduler::%s(*%s,**%s)"%(f.func_name, args, kwargs)) # print ("#--") return f(self,*args, **kwargs) @@ -108,7 +110,7 @@ def leastload(loads): # store empty default dependency: MET = Dependency([]) -class TaskScheduler(Configurable): +class TaskScheduler(LoggingFactory): """Python TaskScheduler object. This is the simplest object that supports msg_id based @@ -153,7 +155,7 @@ class TaskScheduler(Configurable): unregistration_notification = self._unregister_engine ) self.notifier_stream.on_recv(self.dispatch_notification) - logging.info("Scheduler started...%r"%self) + self.log.info("Scheduler started...%r"%self) def resume_receiving(self): """Resume accepting jobs.""" @@ -180,7 +182,7 @@ class TaskScheduler(Configurable): try: handler(str(msg['content']['queue'])) except KeyError: - logging.error("task::Invalid notification msg: %s"%msg) + self.log.error("task::Invalid notification msg: %s"%msg) @logged def _register_engine(self, uid): @@ -236,7 +238,7 @@ class TaskScheduler(Configurable): try: idents, msg = self.session.feed_identities(raw_msg, copy=False) except Exception as e: - logging.error("task::Invaid msg: %s"%msg) + self.log.error("task::Invaid msg: %s"%msg) return # send to monitor @@ -277,7 +279,7 @@ class TaskScheduler(Configurable): def fail_unreachable(self, msg_id): """a message has become unreachable""" if msg_id not in self.depending: - logging.error("msg %r already failed!"%msg_id) + self.log.error("msg %r already failed!"%msg_id) return raw_msg, after, follow = self.depending.pop(msg_id) for mid in follow.union(after): @@ -369,7 +371,7 @@ class TaskScheduler(Configurable): try: idents,msg = self.session.feed_identities(raw_msg, copy=False) except Exception as e: - logging.error("task::Invaid result: %s"%msg) + self.log.error("task::Invaid result: %s"%msg) return msg = self.session.unpack_message(msg, content=False, copy=False) header = msg['header'] @@ -470,7 +472,7 @@ class TaskScheduler(Configurable): -def launch_scheduler(in_addr, out_addr, mon_addr, not_addr, log_addr=None, loglevel=logging.DEBUG, scheme='weighted'): +def launch_scheduler(in_addr, out_addr, mon_addr, not_addr, logname='ZMQ', log_addr=None, loglevel=logging.DEBUG, scheme='weighted'): from zmq.eventloop import ioloop from zmq.eventloop.zmqstream import ZMQStream @@ -490,13 +492,13 @@ def launch_scheduler(in_addr, out_addr, mon_addr, not_addr, log_addr=None, logle scheme = globals().get(scheme, None) # setup logging if log_addr: - connect_logger(ctx, log_addr, root="scheduler", loglevel=loglevel) + connect_logger(logname, ctx, log_addr, root="scheduler", loglevel=loglevel) else: - local_logger(loglevel) + local_logger(logname, loglevel) scheduler = TaskScheduler(client_stream=ins, engine_stream=outs, mon_stream=mons,notifier_stream=nots, - scheme=scheme,io_loop=loop) + scheme=scheme,io_loop=loop, logname=logname) try: loop.start() diff --git a/IPython/zmq/parallel/streamkernel.py b/IPython/zmq/parallel/streamkernel.py index f30a4ca..a34d0ca 100755 --- a/IPython/zmq/parallel/streamkernel.py +++ b/IPython/zmq/parallel/streamkernel.py @@ -110,7 +110,7 @@ class Kernel(SessionFactory): s = _Passer() content = dict(silent=True, user_variable=[],user_expressions=[]) for line in self.exec_lines: - logging.debug("executing initialization: %s"%line) + self.log.debug("executing initialization: %s"%line) content.update({'code':line}) msg = self.session.msg('execute_request', content) self.execute_request(s, [], msg) @@ -139,8 +139,8 @@ class Kernel(SessionFactory): # assert self.reply_socketly_socket.rcvmore(), "Unexpected missing message part." # msg = self.reply_socket.recv_json() - logging.info("Aborting:") - logging.info(str(msg)) + self.log.info("Aborting:") + self.log.info(str(msg)) msg_type = msg['msg_type'] reply_type = msg_type.split('_')[0] + '_reply' # reply_msg = self.session.msg(reply_type, {'status' : 'aborted'}, msg) @@ -148,7 +148,7 @@ class Kernel(SessionFactory): # self.reply_socket.send_json(reply_msg) reply_msg = self.session.send(stream, reply_type, content={'status' : 'aborted'}, parent=msg, ident=idents)[0] - logging.debug(str(reply_msg)) + self.log.debug(str(reply_msg)) # We need to wait a bit for requests to come in. This can probably # be set shorter for true asynchronous clients. time.sleep(0.05) @@ -166,7 +166,7 @@ class Kernel(SessionFactory): content = dict(status='ok') reply_msg = self.session.send(stream, 'abort_reply', content=content, parent=parent, ident=ident)[0] - logging.debug(str(reply_msg)) + self.log.debug(str(reply_msg)) def shutdown_request(self, stream, ident, parent): """kill ourself. This should really be handled in an external process""" @@ -191,7 +191,7 @@ class Kernel(SessionFactory): try: msg = self.session.unpack_message(msg, content=True, copy=False) except: - logging.error("Invalid Message", exc_info=True) + self.log.error("Invalid Message", exc_info=True) return header = msg['header'] @@ -199,7 +199,7 @@ class Kernel(SessionFactory): handler = self.control_handlers.get(msg['msg_type'], None) if handler is None: - logging.error("UNKNOWN CONTROL MESSAGE TYPE: %r"%msg['msg_type']) + self.log.error("UNKNOWN CONTROL MESSAGE TYPE: %r"%msg['msg_type']) else: handler(self.control_stream, idents, msg) @@ -240,11 +240,11 @@ class Kernel(SessionFactory): self._initial_exec_lines() def execute_request(self, stream, ident, parent): - logging.debug('execute request %s'%parent) + self.log.debug('execute request %s'%parent) try: code = parent[u'content'][u'code'] except: - logging.error("Got bad msg: %s"%parent, exc_info=True) + self.log.error("Got bad msg: %s"%parent, exc_info=True) return self.session.send(self.iopub_stream, u'pyin', {u'code':code},parent=parent, ident='%s.pyin'%self.prefix) @@ -268,7 +268,7 @@ class Kernel(SessionFactory): reply_msg = self.session.send(stream, u'execute_reply', reply_content, parent=parent, ident=ident, subheader = dict(started=started)) - logging.debug(str(reply_msg)) + self.log.debug(str(reply_msg)) if reply_msg['content']['status'] == u'error': self.abort_queues() @@ -290,7 +290,7 @@ class Kernel(SessionFactory): msg_id = parent['header']['msg_id'] bound = content.get('bound', False) except: - logging.error("Got bad msg: %s"%parent, exc_info=True) + self.log.error("Got bad msg: %s"%parent, exc_info=True) return # pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent) # self.iopub_stream.send(pyin_msg) @@ -364,7 +364,7 @@ class Kernel(SessionFactory): try: msg = self.session.unpack_message(msg, content=True, copy=False) except: - logging.error("Invalid Message", exc_info=True) + self.log.error("Invalid Message", exc_info=True) return @@ -379,7 +379,7 @@ class Kernel(SessionFactory): return handler = self.shell_handlers.get(msg['msg_type'], None) if handler is None: - logging.error("UNKNOWN MESSAGE TYPE: %r"%msg['msg_type']) + self.log.error("UNKNOWN MESSAGE TYPE: %r"%msg['msg_type']) else: handler(stream, idents, msg) diff --git a/IPython/zmq/tunnel.py b/IPython/zmq/tunnel.py index a943d77..868b0a5 100644 --- a/IPython/zmq/tunnel.py +++ b/IPython/zmq/tunnel.py @@ -158,7 +158,7 @@ def openssh_tunnel(lport, rport, server, remoteip='127.0.0.1', keyfile=None, pas ssh="ssh " if keyfile: ssh += "-i " + keyfile - cmd = ssh + " -f -L 127.0.0.1:%i:127.0.0.1:%i %s sleep %i"%(lport, rport, server, timeout) + cmd = ssh + " -f -L 127.0.0.1:%i:%s:%i %s sleep %i"%(lport, remoteip, rport, server, timeout) tunnel = pexpect.spawn(cmd) failed = False while True: