From 600411fc4546d469bae7c722f144b968d8a08959 2011-04-08 00:38:15 From: MinRK Date: 2011-04-08 00:38:15 Subject: [PATCH] improved logging + Hub,Engine,Scheduler are Configurable --- diff --git a/IPython/zmq/log.py b/IPython/zmq/log.py index 797cd28..6d37219 100644 --- a/IPython/zmq/log.py +++ b/IPython/zmq/log.py @@ -21,7 +21,3 @@ class EnginePUBHandler(PUBHandler): else: return "engine" - -logger = logging.getLogger('ipzmq') -logger.setLevel(logging.DEBUG) - diff --git a/IPython/zmq/parallel/client.py b/IPython/zmq/parallel/client.py index 33644e8..f881deb 100644 --- a/IPython/zmq/parallel/client.py +++ b/IPython/zmq/parallel/client.py @@ -90,11 +90,6 @@ def defaultblock(f, self, *args, **kwargs): # Classes #-------------------------------------------------------------------------- -class AbortedTask(object): - """A basic wrapper object describing an aborted task.""" - def __init__(self, msg_id): - self.msg_id = msg_id - class ResultDict(dict): """A subclass of dict that raises errors if it has them.""" def __getitem__(self, key): @@ -332,10 +327,10 @@ class Client(object): msg = ss.Message(msg) content = msg.content if content.status == 'ok': - if content.queue: + if content.mux: self._mux_socket = self.context.socket(zmq.PAIR) self._mux_socket.setsockopt(zmq.IDENTITY, self.session.session) - connect_socket(self._mux_socket, content.queue) + connect_socket(self._mux_socket, content.mux) if content.task: self._task_socket = self.context.socket(zmq.PAIR) self._task_socket.setsockopt(zmq.IDENTITY, self.session.session) diff --git a/IPython/zmq/parallel/controller.py b/IPython/zmq/parallel/controller.py index a3c404d..3d3fed1 100755 --- a/IPython/zmq/parallel/controller.py +++ b/IPython/zmq/parallel/controller.py @@ -17,6 +17,7 @@ from __future__ import print_function import os import time +import logging from multiprocessing import Process import zmq @@ -29,7 +30,8 @@ from IPython.zmq.entry_point import bind_port from hub import Hub from entry_point import 
(make_base_argument_parser, select_random_ports, split_ports, - connect_logger, parse_url, signal_children, generate_exec_key) + connect_logger, parse_url, signal_children, generate_exec_key, + local_logger) import streamsession as session @@ -118,8 +120,6 @@ def main(argv=None): ctx = zmq.Context() loop = ioloop.IOLoop.instance() - # setup logging - connect_logger(ctx, iface%args.logport, root="controller", loglevel=args.loglevel) # Registrar socket reg = ZMQStream(ctx.socket(zmq.XREP), loop) @@ -207,7 +207,9 @@ def main(argv=None): q.start() children.append(q.launcher) else: - sargs = (iface%task[0],iface%task[1],iface%monport,iface%nport,args.scheduler) + log_addr = iface%args.logport if args.logport else None + sargs = (iface%task[0], iface%task[1], iface%monport, iface%nport, + log_addr, args.loglevel, args.scheduler) print (sargs) q = Process(target=launch_scheduler, args=sargs) q.daemon=True @@ -224,7 +226,7 @@ def main(argv=None): # build connection dicts engine_addrs = { 'control' : iface%control[1], - 'queue': iface%mux[1], + 'mux': iface%mux[1], 'heartbeat': (iface%hb[0], iface%hb[1]), 'task' : iface%task[1], 'iopub' : iface%iopub[1], @@ -234,15 +236,24 @@ def main(argv=None): client_addrs = { 'control' : iface%control[0], 'query': iface%cport, - 'queue': iface%mux[0], + 'mux': iface%mux[0], 'task' : iface%task[0], 'iopub' : iface%iopub[0], 'notification': iface%nport } + # setup logging + if args.logport: + connect_logger(ctx, iface%args.logport, root="controller", loglevel=args.loglevel) + else: + local_logger(args.loglevel) + # register relay of signals to the children signal_children(children) - hub = Hub(loop, thesession, sub, reg, hmon, c, n, db, engine_addrs, client_addrs) + hub = Hub(loop=loop, session=thesession, monitor=sub, heartmonitor=hmon, + registrar=reg, clientele=c, notifier=n, db=db, + engine_addrs=engine_addrs, client_addrs=client_addrs) + dc = ioloop.DelayedCallback(lambda : print("Controller started..."), 100, loop) dc.start() 
loop.start() diff --git a/IPython/zmq/parallel/engine.py b/IPython/zmq/parallel/engine.py index 501ef19..aa56e00 100755 --- a/IPython/zmq/parallel/engine.py +++ b/IPython/zmq/parallel/engine.py @@ -8,49 +8,48 @@ import sys import time import traceback import uuid +import logging from pprint import pprint import zmq from zmq.eventloop import ioloop, zmqstream -from IPython.utils.traitlets import HasTraits -from IPython.utils.localinterfaces import LOCALHOST +# internal +from IPython.config.configurable import Configurable +from IPython.utils.traitlets import Instance, Str, Dict +# from IPython.utils.localinterfaces import LOCALHOST from streamsession import Message, StreamSession -from client import Client from streamkernel import Kernel, make_kernel import heartmonitor -from entry_point import make_base_argument_parser, connect_logger, parse_url +from entry_point import (make_base_argument_parser, connect_engine_logger, parse_url, + local_logger) # import taskthread -# from log import logger - +logger = logging.getLogger() def printer(*msg): - pprint(msg, stream=sys.__stdout__) + # print (logger.handlers, file=sys.__stdout__) + logger.info(str(msg)) -class Engine(object): +class Engine(Configurable): """IPython engine""" - id=None - context=None - loop=None - session=None - ident=None - registrar=None - heart=None kernel=None - user_ns=None + id=None - def __init__(self, context, loop, session, registrar, client=None, ident=None, - heart_id=None, user_ns=None): - self.context = context - self.loop = loop - self.session = session - self.registrar = registrar - self.client = client - self.ident = ident if ident else str(uuid.uuid4()) + # configurables: + context=Instance(zmq.Context) + loop=Instance(ioloop.IOLoop) + session=Instance(StreamSession) + ident=Str() + registrar=Instance(zmqstream.ZMQStream) + user_ns=Dict() + + def __init__(self, **kwargs): + super(Engine, self).__init__(**kwargs) + if not self.ident: + self.ident = str(uuid.uuid4()) 
self.registrar.on_send(printer) - self.user_ns = user_ns def register(self): @@ -64,9 +63,10 @@ class Engine(object): idents,msg = self.session.feed_identities(msg) msg = Message(self.session.unpack_message(msg)) if msg.content.status == 'ok': - self.session.username = str(msg.content.id) - queue_addr = msg.content.queue - shell_addrs = [str(queue_addr)] + self.id = int(msg.content.id) + self.session.username = 'engine-%i'%self.id + queue_addr = msg.content.mux + shell_addrs = [ str(queue_addr) ] control_addr = str(msg.content.control) task_addr = msg.content.task iopub_addr = msg.content.iopub @@ -75,7 +75,7 @@ class Engine(object): hb_addrs = msg.content.heartbeat # ioloop.DelayedCallback(self.heart.start, 1000, self.loop).start() - k = make_kernel(self.ident, control_addr, shell_addrs, iopub_addr, + k = make_kernel(self.id, self.ident, control_addr, shell_addrs, iopub_addr, hb_addrs, client_addr=None, loop=self.loop, context=self.context, key=self.session.key)[-1] self.kernel = k @@ -84,12 +84,12 @@ class Engine(object): self.kernel.user_ns = self.user_ns else: - # logger.error("Registration Failed: %s"%msg) + logger.error("Registration Failed: %s"%msg) raise Exception("Registration Failed: %s"%msg) - # logger.info("engine::completed registration with id %s"%self.session.username) + logger.info("completed registration with id %i"%self.id) - print (msg,file=sys.__stdout__) + # logger.info(str(msg)) def unregister(self): self.session.send(self.registrar, "unregistration_request", content=dict(id=int(self.session.username))) @@ -97,7 +97,7 @@ class Engine(object): sys.exit(0) def start(self): - print ("registering",file=sys.__stdout__) + logger.info("registering") self.register() @@ -118,7 +118,6 @@ def main(argv=None, user_ns=None): ctx = zmq.Context() # setup logging - connect_logger(ctx, iface%args.logport, root="engine", loglevel=args.loglevel) reg_conn = iface % args.regport print (reg_conn, file=sys.__stdout__) @@ -127,10 +126,16 @@ def main(argv=None, 
user_ns=None): reg = ctx.socket(zmq.PAIR) reg.connect(reg_conn) reg = zmqstream.ZMQStream(reg, loop) - client = None - e = Engine(ctx, loop, session, reg, client, args.ident, user_ns=user_ns) - dc = ioloop.DelayedCallback(e.start, 100, loop) + e = Engine(context=ctx, loop=loop, session=session, registrar=reg, + ident=args.ident or '', user_ns=user_ns) + if args.logport: + print ("connecting logger to %s"%(iface%args.logport), file=sys.__stdout__) + connect_engine_logger(ctx, iface%args.logport, e, loglevel=args.loglevel) + else: + local_logger(args.loglevel) + + dc = ioloop.DelayedCallback(e.start, 0, loop) dc.start() loop.start() diff --git a/IPython/zmq/parallel/entry_point.py b/IPython/zmq/parallel/entry_point.py index 1ae1544..a2f1c5b 100644 --- a/IPython/zmq/parallel/entry_point.py +++ b/IPython/zmq/parallel/entry_point.py @@ -22,7 +22,7 @@ from zmq.log import handlers # Local imports. from IPython.core.ultratb import FormattedTB from IPython.external.argparse import ArgumentParser -from IPython.zmq.log import logger +from IPython.zmq.log import EnginePUBHandler def split_ports(s, n): """Parser helper for multiport strings""" @@ -82,6 +82,7 @@ def make_base_argument_parser(): """ Creates an ArgumentParser for the generic arguments supported by all ipcluster entry points. 
""" + parser = ArgumentParser() parser.add_argument('--ip', type=str, default='127.0.0.1', help='set the controller\'s IP address [default: local]') @@ -89,10 +90,10 @@ def make_base_argument_parser(): help='set the transport to use [default: tcp]') parser.add_argument('--regport', type=int, metavar='PORT', default=10101, help='set the XREP port for registration [default: 10101]') - parser.add_argument('--logport', type=int, metavar='PORT', default=20202, - help='set the PUB port for logging [default: 10201]') - parser.add_argument('--loglevel', type=str, metavar='LEVEL', default=logging.DEBUG, - help='set the log level [default: DEBUG]') + parser.add_argument('--logport', type=int, metavar='PORT', default=0, + help='set the PUB port for remote logging [default: log to stdout]') + parser.add_argument('--loglevel', type=str, metavar='LEVEL', default=logging.INFO, + help='set the log level [default: INFO]') parser.add_argument('--ident', type=str, help='set the ZMQ identity [default: random]') parser.add_argument('--packer', type=str, default='json', @@ -105,17 +106,42 @@ def make_base_argument_parser(): return parser - -def connect_logger(context, iface, root="ip", loglevel=logging.DEBUG): +def integer_loglevel(loglevel): try: loglevel = int(loglevel) except ValueError: if isinstance(loglevel, str): loglevel = getattr(logging, loglevel) + return loglevel + +def connect_logger(context, iface, root="ip", loglevel=logging.DEBUG): + loglevel = integer_loglevel(loglevel) lsock = context.socket(zmq.PUB) lsock.connect(iface) handler = handlers.PUBHandler(lsock) handler.setLevel(loglevel) handler.root_topic = root + logger = logging.getLogger() + logger.addHandler(handler) + logger.setLevel(loglevel) + +def connect_engine_logger(context, iface, engine, loglevel=logging.DEBUG): + logger = logging.getLogger() + loglevel = integer_loglevel(loglevel) + lsock = context.socket(zmq.PUB) + lsock.connect(iface) + handler = EnginePUBHandler(engine, lsock) + handler.setLevel(loglevel) 
+ logger.addHandler(handler) + logger.setLevel(loglevel) + +def local_logger(loglevel=logging.DEBUG): + loglevel = integer_loglevel(loglevel) + logger = logging.getLogger() + if logger.handlers: + # if there are any handlers, skip the hookup + return + handler = logging.StreamHandler() + handler.setLevel(loglevel) logger.addHandler(handler) - \ No newline at end of file + logger.setLevel(loglevel) diff --git a/IPython/zmq/parallel/heartmonitor.py b/IPython/zmq/parallel/heartmonitor.py index 34dcf6f..33df58f 100644 --- a/IPython/zmq/parallel/heartmonitor.py +++ b/IPython/zmq/parallel/heartmonitor.py @@ -7,13 +7,13 @@ and hearts are tracked based on their XREQ identities. from __future__ import print_function import time import uuid +import logging import zmq from zmq.devices import ProcessDevice,ThreadDevice from zmq.eventloop import ioloop, zmqstream -#internal -from IPython.zmq.log import logger +logger = logging.getLogger() class Heart(object): """A basic heart object for responding to a HeartMonitor. @@ -53,6 +53,7 @@ class HeartMonitor(object): hearts=None on_probation=None last_ping=None + # debug=False def __init__(self, loop, pingstream, pongstream, period=1000): self.loop = loop @@ -84,19 +85,6 @@ class HeartMonitor(object): """add a new handler for heart failure""" logger.debug("heartbeat::new heart failure handler: %s"%handler) self._failure_handlers.add(handler) - - # def _flush(self): - # """override IOLoop triggers""" - # while True: - # try: - # msg = self.pongstream.socket.recv_multipart(zmq.NOBLOCK) - # logger.warn("IOLoop triggered beat with incoming heartbeat waiting to be handled") - # except zmq.ZMQError: - # return - # else: - # self.handle_pong(msg) - # # print '.' 
- # def beat(self): self.pongstream.flush() @@ -105,7 +93,7 @@ class HeartMonitor(object): toc = time.time() self.lifetime += toc-self.tic self.tic = toc - logger.debug("heartbeat::%s"%self.lifetime) + # logger.debug("heartbeat::%s"%self.lifetime) goodhearts = self.hearts.intersection(self.responses) missed_beats = self.hearts.difference(goodhearts) heartfailures = self.on_probation.intersection(missed_beats) @@ -144,7 +132,7 @@ class HeartMonitor(object): "a heart just beat" if msg[1] == str(self.lifetime): delta = time.time()-self.tic - logger.debug("heartbeat::heart %r took %.2f ms to respond"%(msg[0], 1000*delta)) + # logger.debug("heartbeat::heart %r took %.2f ms to respond"%(msg[0], 1000*delta)) self.responses.add(msg[0]) elif msg[1] == str(self.last_ping): delta = time.time()-self.tic + (self.lifetime-self.last_ping) diff --git a/IPython/zmq/parallel/hub.py b/IPython/zmq/parallel/hub.py index 681af4e..ca42577 100755 --- a/IPython/zmq/parallel/hub.py +++ b/IPython/zmq/parallel/hub.py @@ -18,14 +18,19 @@ from __future__ import print_function import sys from datetime import datetime import time +import logging import zmq -from zmq.eventloop import ioloop +from zmq.eventloop import ioloop, zmqstream # internal: -from IPython.zmq.log import logger # a Logger object +from IPython.config.configurable import Configurable +from IPython.utils.traitlets import HasTraits, Instance, Int, Str, Dict +# from IPython.zmq.log import logger # a Logger object from streamsession import Message, wrap_exception, ISO8601 +from heartmonitor import HeartMonitor +from util import validate_url_container try: from pymongo.binary import Binary @@ -38,6 +43,8 @@ else: # Code #----------------------------------------------------------------------------- +logger = logging.getLogger() + def _passer(*args, **kwargs): return @@ -46,7 +53,7 @@ def _printer(*args, **kwargs): print (kwargs) def init_record(msg): - """return an empty TaskRecord dict, with all keys initialized with None.""" + 
"""Initialize a TaskRecord based on a request.""" header = msg['header'] return { 'msg_id' : header['msg_id'], @@ -71,7 +78,7 @@ def init_record(msg): } -class EngineConnector(object): +class EngineConnector(HasTraits): """A simple object for accessing the various zmq connections of an object. Attributes are: id (int): engine ID @@ -80,22 +87,18 @@ class EngineConnector(object): registration (str): identity of registration XREQ socket heartbeat (str): identity of heartbeat XREQ socket """ - id=0 - queue=None - control=None - registration=None - heartbeat=None - pending=None - - def __init__(self, id, queue, registration, control, heartbeat=None): - logger.info("engine::Engine Connected: %i"%id) - self.id = id - self.queue = queue - self.registration = registration - self.control = control - self.heartbeat = heartbeat - -class Hub(object): + id=Int(0) + queue=Str() + control=Str() + registration=Str() + heartbeat=Str() + pending=Instance(set) + + def __init__(self, **kwargs): + super(EngineConnector, self).__init__(**kwargs) + logger.info("engine::Engine Connected: %i"%self.id) + +class Hub(Configurable): """The IPython Controller Hub with 0MQ connections Parameters @@ -123,26 +126,25 @@ class Hub(object): clients=None hearts=None pending=None - results=None tasks=None completed=None - mia=None + # mia=None incoming_registrations=None registration_timeout=None - #objects from constructor: - loop=None - registrar=None - clientelle=None - queue=None - heartbeat=None - notifier=None - db=None - client_addr=None - engine_addrs=None + # objects from constructor: + loop=Instance(ioloop.IOLoop) + registrar=Instance(zmqstream.ZMQStream) + clientele=Instance(zmqstream.ZMQStream) + monitor=Instance(zmqstream.ZMQStream) + heartmonitor=Instance(HeartMonitor) + notifier=Instance(zmqstream.ZMQStream) + db=Instance(object) + client_addrs=Dict() + engine_addrs=Dict() - def __init__(self, loop, session, queue, registrar, heartbeat, clientele, notifier, db, engine_addrs, 
client_addrs): + def __init__(self, **kwargs): """ # universal: loop: IOLoop for creating future connections @@ -158,6 +160,8 @@ class Hub(object): engine_addrs: zmq address/protocol dict for engine connections client_addrs: zmq address/protocol dict for client connections """ + + super(Hub, self).__init__(**kwargs) self.ids = set() self.keytable={} self.incoming_registrations={} @@ -166,35 +170,44 @@ class Hub(object): self.clients = {} self.hearts = {} # self.mia = set() - + self.registration_timeout = max(5000, 2*self.heartmonitor.period) + # this is the stuff that will move to DB: + self.pending = set() # pending messages, keyed by msg_id + self.queues = {} # pending msg_ids keyed by engine_id + self.tasks = {} # pending msg_ids submitted as tasks, keyed by client_id + self.completed = {} # completed msg_ids keyed by engine_id + self.all_completed = set() + self._idcounter = 0 # self.sockets = {} - self.loop = loop - self.session = session - self.registrar = registrar - self.clientele = clientele - self.queue = queue - self.heartbeat = heartbeat - self.notifier = notifier - self.db = db + # self.loop = loop + # self.session = session + # self.registrar = registrar + # self.clientele = clientele + # self.queue = queue + # self.heartmonitor = heartbeat + # self.notifier = notifier + # self.db = db # validate connection dicts: - self.client_addrs = client_addrs - assert isinstance(client_addrs['queue'], str) - assert isinstance(client_addrs['control'], str) + # self.client_addrs = client_addrs + validate_url_container(self.client_addrs) + + # assert isinstance(self.client_addrs['queue'], str) + # assert isinstance(self.client_addrs['control'], str) # self.hb_addrs = hb_addrs - self.engine_addrs = engine_addrs - assert isinstance(engine_addrs['queue'], str) - assert isinstance(client_addrs['control'], str) - assert len(engine_addrs['heartbeat']) == 2 + validate_url_container(self.engine_addrs) + # self.engine_addrs = engine_addrs + # assert 
isinstance(self.engine_addrs['queue'], str) + # assert isinstance(self.engine_addrs['control'], str) + # assert len(engine_addrs['heartbeat']) == 2 # register our callbacks self.registrar.on_recv(self.dispatch_register_request) self.clientele.on_recv(self.dispatch_client_msg) - self.queue.on_recv(self.dispatch_monitor_traffic) + self.monitor.on_recv(self.dispatch_monitor_traffic) - if heartbeat is not None: - heartbeat.add_heart_failure_handler(self.handle_heart_failure) - heartbeat.add_new_heart_handler(self.handle_new_heart) + self.heartmonitor.add_heart_failure_handler(self.handle_heart_failure) + self.heartmonitor.add_new_heart_handler(self.handle_new_heart) self.monitor_handlers = { 'in' : self.save_queue_request, 'out': self.save_queue_result, @@ -218,25 +231,21 @@ class Hub(object): 'unregistration_request' : self.unregister_engine, 'connection_request': self.connection_request, } - self.registration_timeout = max(5000, 2*self.heartbeat.period) - # this is the stuff that will move to DB: - # self.results = {} # completed results - self.pending = set() # pending messages, keyed by msg_id - self.queues = {} # pending msg_ids keyed by engine_id - self.tasks = {} # pending msg_ids submitted as tasks, keyed by client_id - self.completed = {} # completed msg_ids keyed by engine_id - self.all_completed = set() logger.info("controller::created controller") - def _new_id(self): + @property + def _next_id(self): """gemerate a new ID""" - newid = 0 - incoming = [id[0] for id in self.incoming_registrations.itervalues()] - # print newid, self.ids, self.incoming_registrations - while newid in self.ids or newid in incoming: - newid += 1 + newid = self._idcounter + self._idcounter += 1 return newid + # newid = 0 + # incoming = [id[0] for id in self.incoming_registrations.itervalues()] + # # print newid, self.ids, self.incoming_registrations + # while newid in self.ids or newid in incoming: + # newid += 1 + # return newid 
#----------------------------------------------------------------------------- # message validation @@ -580,6 +589,9 @@ class Hub(object): return parent = msg['parent_header'] + if not parent: + logger.error("iopub::invalid IOPub message: %s"%msg) + return msg_id = parent['msg_id'] msg_type = msg['msg_type'] content = msg['content'] @@ -631,7 +643,7 @@ class Hub(object): return heart = content.get('heartbeat', None) """register a new engine, and create the socket(s) necessary""" - eid = self._new_id() + eid = self._next_id # print (eid, queue, reg, heart) logger.debug("registration::register_engine(%i, %r, %r, %r)"%(eid, queue, reg, heart)) @@ -644,10 +656,12 @@ class Hub(object): raise KeyError("queue_id %r in use"%queue) except: content = wrap_exception() + logger.error("queue_id %r in use"%queue, exc_info=True) elif heart in self.hearts: # need to check unique hearts? try: raise KeyError("heart_id %r in use"%heart) except: + logger.error("heart_id %r in use"%heart, exc_info=True) content = wrap_exception() else: for h, pack in self.incoming_registrations.iteritems(): @@ -655,12 +669,14 @@ class Hub(object): try: raise KeyError("heart_id %r in use"%heart) except: + logger.error("heart_id %r in use"%heart, exc_info=True) content = wrap_exception() break elif queue == pack[1]: try: raise KeyError("queue_id %r in use"%queue) except: + logger.error("queue_id %r in use"%queue, exc_info=True) content = wrap_exception() break @@ -669,15 +685,15 @@ class Hub(object): ident=reg) if content['status'] == 'ok': - if heart in self.heartbeat.hearts: + if heart in self.heartmonitor.hearts: # already beating - self.incoming_registrations[heart] = (eid,queue,reg,None) + self.incoming_registrations[heart] = (eid,queue,reg[0],None) self.finish_registration(heart) else: purge = lambda : self._purge_stalled_registration(heart) dc = ioloop.DelayedCallback(purge, self.registration_timeout, self.loop) dc.start() - self.incoming_registrations[heart] = (eid,queue,reg,dc) + 
self.incoming_registrations[heart] = (eid,queue,reg[0],dc) else: logger.error("registration::registration %i failed: %s"%(eid, content['evalue'])) return eid @@ -718,7 +734,8 @@ class Hub(object): control = queue self.ids.add(eid) self.keytable[eid] = queue - self.engines[eid] = EngineConnector(eid, queue, reg, control, heart) + self.engines[eid] = EngineConnector(id=eid, queue=queue, registration=reg, + control=control, heartbeat=heart) self.by_ident[queue] = eid self.queues[eid] = list() self.tasks[eid] = list() diff --git a/IPython/zmq/parallel/scheduler.py b/IPython/zmq/parallel/scheduler.py index 9cfaa92..7dc920e 100644 --- a/IPython/zmq/parallel/scheduler.py +++ b/IPython/zmq/parallel/scheduler.py @@ -11,6 +11,8 @@ Python Scheduler exists. from __future__ import print_function from random import randint,random +import logging +from types import FunctionType try: import numpy @@ -21,17 +23,22 @@ import zmq from zmq.eventloop import ioloop, zmqstream # local imports -from IPython.zmq.log import logger # a Logger object +from IPython.external.decorator import decorator +from IPython.config.configurable import Configurable +from IPython.utils.traitlets import Instance + from client import Client from dependency import Dependency import streamsession as ss +from entry_point import connect_logger, local_logger -from IPython.external.decorator import decorator + +logger = logging.getLogger() @decorator def logged(f,self,*args,**kwargs): # print ("#--------------------") - # print ("%s(*%s,**%s)"%(f.func_name, args, kwargs)) + logger.debug("scheduler::%s(*%s,**%s)"%(f.func_name, args, kwargs)) # print ("#--") return f(self,*args, **kwargs) @@ -99,7 +106,7 @@ def leastload(loads): #--------------------------------------------------------------------- # Classes #--------------------------------------------------------------------- -class TaskScheduler(object): +class TaskScheduler(Configurable): """Python TaskScheduler object. 
This is the simplest object that supports msg_id based @@ -108,10 +115,15 @@ class TaskScheduler(object): """ - scheme = leastload # function for determining the destination - client_stream = None # client-facing stream - engine_stream = None # engine-facing stream - mon_stream = None # controller-facing stream + # configurables: + scheme = Instance(FunctionType, default=leastload) # function for determining the destination + client_stream = Instance(zmqstream.ZMQStream) # client-facing stream + engine_stream = Instance(zmqstream.ZMQStream) # engine-facing stream + notifier_stream = Instance(zmqstream.ZMQStream) # hub-facing sub stream + mon_stream = Instance(zmqstream.ZMQStream) # hub-facing pub stream + io_loop = Instance(ioloop.IOLoop) + + # internals: dependencies = None # dict by msg_id of [ msg_ids that depend on key ] depending = None # dict by msg_id of (msg_id, raw_msg, after, follow) pending = None # dict by engine_uuid of submitted tasks @@ -123,23 +135,10 @@ class TaskScheduler(object): blacklist = None # dict by msg_id of locations where a job has encountered UnmetDependency - def __init__(self, client_stream, engine_stream, mon_stream, - notifier_stream, scheme=None, io_loop=None): - if io_loop is None: - io_loop = ioloop.IOLoop.instance() - self.io_loop = io_loop - self.client_stream = client_stream - self.engine_stream = engine_stream - self.mon_stream = mon_stream - self.notifier_stream = notifier_stream - - if scheme is not None: - self.scheme = scheme - else: - self.scheme = TaskScheduler.scheme + def __init__(self, **kwargs): + super(TaskScheduler, self).__init__(**kwargs) self.session = ss.StreamSession(username="TaskScheduler") - self.dependencies = {} self.depending = {} self.completed = {} @@ -150,12 +149,13 @@ class TaskScheduler(object): self.targets = [] self.loads = [] - engine_stream.on_recv(self.dispatch_result, copy=False) + self.engine_stream.on_recv(self.dispatch_result, copy=False) self._notification_handlers = dict( 
registration_notification = self._register_engine, unregistration_notification = self._unregister_engine ) self.notifier_stream.on_recv(self.dispatch_notification) + logger.info("Scheduler started...%r"%self) def resume_receiving(self): """Resume accepting jobs.""" @@ -183,6 +183,7 @@ class TaskScheduler(object): handler(str(msg['content']['queue'])) except KeyError: logger.error("task::Invalid notification msg: %s"%msg) + @logged def _register_engine(self, uid): """New engine with ident `uid` became available.""" @@ -306,7 +307,8 @@ class TaskScheduler(object): self.add_job(idx) self.pending[target][msg_id] = (msg, follow) content = dict(msg_id=msg_id, engine_id=target) - self.session.send(self.mon_stream, 'task_destination', content=content, ident='tracktask') + self.session.send(self.mon_stream, 'task_destination', content=content, + ident=['tracktask',self.session.session]) #----------------------------------------------------------------------- # Result Handling @@ -395,7 +397,7 @@ class TaskScheduler(object): -def launch_scheduler(in_addr, out_addr, mon_addr, not_addr, scheme='weighted'): +def launch_scheduler(in_addr, out_addr, mon_addr, not_addr, log_addr=None, loglevel=logging.DEBUG, scheme='weighted'): from zmq.eventloop import ioloop from zmq.eventloop.zmqstream import ZMQStream @@ -414,7 +416,15 @@ def launch_scheduler(in_addr, out_addr, mon_addr, not_addr, scheme='weighted'): nots.setsockopt(zmq.SUBSCRIBE, '') nots.connect(not_addr) - scheduler = TaskScheduler(ins,outs,mons,nots,scheme,loop) + # setup logging + if log_addr: + connect_logger(ctx, log_addr, root="scheduler", loglevel=loglevel) + else: + local_logger(loglevel) + + scheduler = TaskScheduler(client_stream=ins, engine_stream=outs, + mon_stream=mons,notifier_stream=nots, + scheme=scheme,io_loop=loop) loop.start() diff --git a/IPython/zmq/parallel/streamkernel.py b/IPython/zmq/parallel/streamkernel.py index 749088c..4fcad1c 100755 --- a/IPython/zmq/parallel/streamkernel.py +++ 
b/IPython/zmq/parallel/streamkernel.py @@ -15,6 +15,7 @@ import os import sys import time import traceback +import logging from datetime import datetime from signal import SIGTERM, SIGKILL from pprint import pprint @@ -25,9 +26,8 @@ from zmq.eventloop import ioloop, zmqstream # Local imports. from IPython.core import ultratb -from IPython.utils.traitlets import HasTraits, Instance, List +from IPython.utils.traitlets import HasTraits, Instance, List, Int from IPython.zmq.completer import KernelCompleter -from IPython.zmq.log import logger # a Logger object from IPython.zmq.iostream import OutStream from IPython.zmq.displayhook import DisplayHook @@ -38,6 +38,8 @@ from dependency import UnmetDependency import heartmonitor from client import Client +logger = logging.getLogger() + def printer(*args): pprint(args, stream=sys.__stdout__) @@ -51,8 +53,9 @@ class Kernel(HasTraits): # Kernel interface #--------------------------------------------------------------------------- + id = Int(-1) session = Instance(StreamSession) - shell_streams = Instance(list) + shell_streams = List() control_stream = Instance(zmqstream.ZMQStream) task_stream = Instance(zmqstream.ZMQStream) iopub_stream = Instance(zmqstream.ZMQStream) @@ -62,7 +65,8 @@ class Kernel(HasTraits): def __init__(self, **kwargs): super(Kernel, self).__init__(**kwargs) self.identity = self.shell_streams[0].getsockopt(zmq.IDENTITY) - self.prefix = 'engine.%s'%self.identity + self.prefix = 'engine.%s'%self.id + logger.root_topic = self.prefix self.user_ns = {} self.history = [] self.compiler = CommandCompiler() @@ -108,8 +112,8 @@ class Kernel(HasTraits): # assert self.reply_socketly_socket.rcvmore(), "Unexpected missing message part." 
# msg = self.reply_socket.recv_json() - print ("Aborting:", file=sys.__stdout__) - print (Message(msg), file=sys.__stdout__) + logger.info("Aborting:") + logger.info(str(msg)) msg_type = msg['msg_type'] reply_type = msg_type.split('_')[0] + '_reply' # reply_msg = self.session.msg(reply_type, {'status' : 'aborted'}, msg) @@ -117,7 +121,7 @@ class Kernel(HasTraits): # self.reply_socket.send_json(reply_msg) reply_msg = self.session.send(stream, reply_type, content={'status' : 'aborted'}, parent=msg, ident=idents)[0] - print(Message(reply_msg), file=sys.__stdout__) + logger.debug(str(reply_msg)) # We need to wait a bit for requests to come in. This can probably # be set shorter for true asynchronous clients. time.sleep(0.05) @@ -135,7 +139,7 @@ class Kernel(HasTraits): content = dict(status='ok') reply_msg = self.session.send(stream, 'abort_reply', content=content, parent=parent, ident=ident)[0] - print(Message(reply_msg), file=sys.__stdout__) + logger(Message(reply_msg), file=sys.__stdout__) def shutdown_request(self, stream, ident, parent): """kill ourself. 
This should really be handled in an external process""" @@ -168,7 +172,7 @@ class Kernel(HasTraits): handler = self.control_handlers.get(msg['msg_type'], None) if handler is None: - print ("UNKNOWN CONTROL MESSAGE TYPE:", msg, file=sys.__stderr__) + logger.error("UNKNOWN CONTROL MESSAGE TYPE: %r"%msg['msg_type']) else: handler(self.control_stream, idents, msg) @@ -211,13 +215,12 @@ class Kernel(HasTraits): try: code = parent[u'content'][u'code'] except: - print("Got bad msg: ", file=sys.__stderr__) - print(Message(parent), file=sys.__stderr__) + logger.error("Got bad msg: %s"%parent, exc_info=True) return # pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent) # self.iopub_stream.send(pyin_msg) self.session.send(self.iopub_stream, u'pyin', {u'code':code},parent=parent, - ident=self.identity+'.pyin') + ident='%s.pyin'%self.prefix) started = datetime.now().strftime(ISO8601) try: comp_code = self.compiler(code, '') @@ -231,7 +234,7 @@ class Kernel(HasTraits): exc_content = self._wrap_exception('execute') # exc_msg = self.session.msg(u'pyerr', exc_content, parent) self.session.send(self.iopub_stream, u'pyerr', exc_content, parent=parent, - ident=self.identity+'.pyerr') + ident='%s.pyerr'%self.prefix) reply_content = exc_content else: reply_content = {'status' : 'ok'} @@ -240,7 +243,7 @@ class Kernel(HasTraits): # self.reply_socket.send_json(reply_msg) reply_msg = self.session.send(stream, u'execute_reply', reply_content, parent=parent, ident=ident, subheader = dict(started=started)) - print(Message(reply_msg), file=sys.__stdout__) + logger.debug(str(reply_msg)) if reply_msg['content']['status'] == u'error': self.abort_queues() @@ -262,8 +265,7 @@ class Kernel(HasTraits): msg_id = parent['header']['msg_id'] bound = content.get('bound', False) except: - print("Got bad msg: ", file=sys.__stderr__) - print(Message(parent), file=sys.__stderr__) + logger.error("Got bad msg: %s"%parent, exc_info=True) return # pyin_msg = self.session.msg(u'pyin',{u'code':code}, 
parent=parent) # self.iopub_stream.send(pyin_msg) @@ -316,7 +318,7 @@ class Kernel(HasTraits): exc_content = self._wrap_exception('apply') # exc_msg = self.session.msg(u'pyerr', exc_content, parent) self.session.send(self.iopub_stream, u'pyerr', exc_content, parent=parent, - ident=self.identity+'.pyerr') + ident='%s.pyerr'%self.prefix) reply_content = exc_content result_buf = [] @@ -354,7 +356,7 @@ class Kernel(HasTraits): return handler = self.shell_handlers.get(msg['msg_type'], None) if handler is None: - print ("UNKNOWN MESSAGE TYPE:", msg, file=sys.__stderr__) + logger.error("UNKNOWN MESSAGE TYPE: %r"%msg['msg_type']) else: handler(stream, idents, msg) @@ -398,7 +400,7 @@ class Kernel(HasTraits): # # don't busywait # time.sleep(1e-3) -def make_kernel(identity, control_addr, shell_addrs, iopub_addr, hb_addrs, +def make_kernel(int_id, identity, control_addr, shell_addrs, iopub_addr, hb_addrs, client_addr=None, loop=None, context=None, key=None, out_stream_factory=OutStream, display_hook_factory=DisplayHook): @@ -410,7 +412,7 @@ def make_kernel(identity, control_addr, shell_addrs, iopub_addr, hb_addrs, c = context session = StreamSession(key=key) # print (session.key) - print (control_addr, shell_addrs, iopub_addr, hb_addrs) + # print (control_addr, shell_addrs, iopub_addr, hb_addrs) # create Control Stream control_stream = zmqstream.ZMQStream(c.socket(zmq.PAIR), loop) @@ -433,12 +435,12 @@ def make_kernel(identity, control_addr, shell_addrs, iopub_addr, hb_addrs, # Redirect input streams and set a display hook. 
if out_stream_factory: sys.stdout = out_stream_factory(session, iopub_stream, u'stdout') - sys.stdout.topic = identity+'.stdout' + sys.stdout.topic = 'engine.%i.stdout'%int_id sys.stderr = out_stream_factory(session, iopub_stream, u'stderr') - sys.stderr.topic = identity+'.stderr' + sys.stderr.topic = 'engine.%i.stderr'%int_id if display_hook_factory: sys.displayhook = display_hook_factory(session, iopub_stream) - sys.displayhook.topic = identity+'.pyout' + sys.displayhook.topic = 'engine.%i.pyout'%int_id # launch heartbeat @@ -451,7 +453,7 @@ def make_kernel(identity, control_addr, shell_addrs, iopub_addr, hb_addrs, else: client = None - kernel = Kernel(session=session, control_stream=control_stream, + kernel = Kernel(id=int_id, session=session, control_stream=control_stream, shell_streams=shell_streams, iopub_stream=iopub_stream, client=client, loop=loop) kernel.start() diff --git a/IPython/zmq/parallel/util.py b/IPython/zmq/parallel/util.py index 67dfe26..61d9512 100644 --- a/IPython/zmq/parallel/util.py +++ b/IPython/zmq/parallel/util.py @@ -1,4 +1,5 @@ """some generic utilities""" +import re class ReverseDict(dict): """simple double-keyed subset of dict methods.""" @@ -33,3 +34,46 @@ class ReverseDict(dict): return default +def validate_url(url): + """validate a url for zeromq""" + if not isinstance(url, basestring): + raise TypeError("url must be a string, not %r"%type(url)) + url = url.lower() + + proto_addr = url.split('://') + assert len(proto_addr) == 2, 'Invalid url: %r'%url + proto, addr = proto_addr + assert proto in ['tcp','pgm','epgm','ipc','inproc'], "Invalid protocol: %r"%proto + + # domain pattern adapted from http://www.regexlib.com/REDetails.aspx?regexp_id=391 + # author: Remi Sabourin + pat = re.compile(r'^([\w\d]([\w\d\-]{0,61}[\w\d])?\.)*[\w\d]([\w\d\-]{0,61}[\w\d])?$') + + if proto == 'tcp': + lis = addr.split(':') + assert len(lis) == 2, 'Invalid url: %r'%url + addr,s_port = lis + try: + port = int(s_port) + except ValueError: + raise 
AssertionError("Invalid port %r in url: %r"%(s_port, url)) + + assert pat.match(addr) is not None, 'Invalid url: %r'%url + + else: + # only validate tcp urls currently + pass + + return True + + +def validate_url_container(container): + """validate a potentially nested collection of urls.""" + if isinstance(container, basestring): + url = container + return validate_url(url) + elif isinstance(container, dict): + container = container.itervalues() + + for element in container: + validate_url_container(element) \ No newline at end of file diff --git a/examples/zmqontroller/logwatcher.py b/examples/zmqontroller/logwatcher.py index b8f8b09..7887818 100644 --- a/examples/zmqontroller/logwatcher.py +++ b/examples/zmqontroller/logwatcher.py @@ -19,6 +19,7 @@ # You should have received a copy of the Lesser GNU General Public License # along with this program. If not, see . +import sys import zmq logport = 20202 def main(topics, addrs): @@ -26,20 +27,27 @@ def main(topics, addrs): context = zmq.Context() socket = context.socket(zmq.SUB) for topic in topics: + print "Subscribing to: %r"%topic socket.setsockopt(zmq.SUBSCRIBE, topic) if addrs: for addr in addrs: print "Connecting to: ", addr socket.connect(addr) else: - socket.bind('tcp://127.0.0.1:%i'%logport) + socket.bind('tcp://*:%i'%logport) while True: # topic = socket.recv() # print topic - topic, msg = socket.recv_multipart() - # msg = socket.recv_pyobj() - print "%s | %s " % (topic, msg), + # print 'tic' + raw = socket.recv_multipart() + if len(raw) != 2: + print "!!! invalid log message: %s"%raw + else: + topic, msg = raw + # don't newline, since log messages always newline: + print "%s | %s" % (topic, msg), + sys.stdout.flush() if __name__ == '__main__': import sys