#!/usr/bin/env python # encoding: utf-8 """The IPython Controller with 0MQ This is the master object that handles connections from engines, clients, and """ #----------------------------------------------------------------------------- # Copyright (C) 2008-2009 The IPython Development Team # # Distributed under the terms of the BSD License. The full license is in # the file COPYING, distributed as part of this software. #----------------------------------------------------------------------------- #----------------------------------------------------------------------------- # Imports #----------------------------------------------------------------------------- from datetime import datetime import zmq from zmq.eventloop import zmqstream, ioloop import uuid # internal: from streamsession import Message, wrap_exception # default_unpacker as unpack, default_packer as pack from IPython.zmq.log import logger # a Logger object # from messages import json # use the same import switches #----------------------------------------------------------------------------- # Code #----------------------------------------------------------------------------- class ReverseDict(dict): """simple double-keyed subset of dict methods.""" def __init__(self, *args, **kwargs): dict.__init__(self, *args, **kwargs) self.reverse = dict() for key, value in self.iteritems(): self.reverse[value] = key def __getitem__(self, key): try: return dict.__getitem__(self, key) except KeyError: return self.reverse[key] def __setitem__(self, key, value): if key in self.reverse: raise KeyError("Can't have key %r on both sides!"%key) dict.__setitem__(self, key, value) self.reverse[value] = key def pop(self, key): value = dict.pop(self, key) self.d1.pop(value) return value class EngineConnector(object): """A simple object for accessing the various zmq connections of an object. Attributes are: id (int): engine ID uuid (str): uuid (unused?) queue (str): identity of queue's XREQ socket registration (str): identity of registration XREQ socket heartbeat (str): identity of heartbeat XREQ socket """ id=0 queue=None control=None registration=None heartbeat=None pending=None def __init__(self, id, queue, registration, control, heartbeat=None): logger.info("engine::Engine Connected: %i"%id) self.id = id self.queue = queue self.registration = registration self.control = control self.heartbeat = heartbeat class Controller(object): """The IPython Controller with 0MQ connections Parameters ========== loop: zmq IOLoop instance session: StreamSession object context: zmq context for creating new connections (?) registrar: ZMQStream for engine registration requests (XREP) clientele: ZMQStream for client connections (XREP) not used for jobs, only query/control commands queue: ZMQStream for monitoring the command queue (SUB) heartbeat: HeartMonitor object checking the pulse of the engines db_stream: connection to db for out of memory logging of commands NotImplemented queue_addr: zmq connection address of the XREP socket for the queue hb_addr: zmq connection address of the PUB socket for heartbeats task_addr: zmq connection address of the XREQ socket for task queue """ # internal data structures: ids=None # engine IDs keytable=None engines=None clients=None hearts=None pending=None results=None tasks=None completed=None mia=None incoming_registrations=None registration_timeout=None #objects from constructor: loop=None registrar=None clientelle=None queue=None heartbeat=None notifier=None db=None client_addr=None engine_addrs=None def __init__(self, loop, session, queue, registrar, heartbeat, clientele, notifier, db, engine_addrs, client_addrs): """ # universal: loop: IOLoop for creating future connections session: streamsession for sending serialized data # engine: queue: ZMQStream for monitoring queue messages registrar: ZMQStream for engine registration heartbeat: HeartMonitor object for tracking engines # client: clientele: ZMQStream for client connections # extra: db: ZMQStream for db connection (NotImplemented) engine_addrs: zmq address/protocol dict for engine connections client_addrs: zmq address/protocol dict for client connections """ self.ids = set() self.keytable={} self.incoming_registrations={} self.engines = {} self.by_ident = {} self.clients = {} self.hearts = {} self.mia = set() # self.sockets = {} self.loop = loop self.session = session self.registrar = registrar self.clientele = clientele self.queue = queue self.heartbeat = heartbeat self.notifier = notifier self.db = db self.client_addrs = client_addrs assert isinstance(client_addrs['queue'], str) # self.hb_addrs = hb_addrs self.engine_addrs = engine_addrs assert isinstance(engine_addrs['queue'], str) assert len(engine_addrs['heartbeat']) == 2 # register our callbacks self.registrar.on_recv(self.dispatch_register_request) self.clientele.on_recv(self.dispatch_client_msg) self.queue.on_recv(self.dispatch_queue_traffic) if heartbeat is not None: heartbeat.add_heart_failure_handler(self.handle_heart_failure) heartbeat.add_new_heart_handler(self.handle_new_heart) if self.db is not None: self.db.on_recv(self.dispatch_db) self.client_handlers = {'queue_request': self.queue_status, 'result_request': self.get_results, 'purge_request': self.purge_results, 'resubmit_request': self.resubmit_task, } self.registrar_handlers = {'registration_request' : self.register_engine, 'unregistration_request' : self.unregister_engine, 'connection_request': self.connection_request, } # # this is the stuff that will move to DB: self.results = {} # completed results self.pending = {} # pending messages, keyed by msg_id self.queues = {} # pending msg_ids keyed by engine_id self.tasks = {} # pending msg_ids submitted as tasks, keyed by client_id self.completed = {} # completed msg_ids keyed by engine_id self.registration_timeout = max(5000, 2*self.heartbeat.period) logger.info("controller::created controller") def _new_id(self): """gemerate a new ID""" newid = 0 incoming = [id[0] for id in self.incoming_registrations.itervalues()] # print newid, self.ids, self.incoming_registrations while newid in self.ids or newid in incoming: newid += 1 return newid #----------------------------------------------------------------------------- # message validation #----------------------------------------------------------------------------- def _validate_targets(self, targets): """turn any valid targets argument into a list of integer ids""" if targets is None: # default to all targets = self.ids if isinstance(targets, (int,str,unicode)): # only one target specified targets = [targets] _targets = [] for t in targets: # map raw identities to ids if isinstance(t, (str,unicode)): t = self.by_ident.get(t, t) _targets.append(t) targets = _targets bad_targets = [ t for t in targets if t not in self.ids ] if bad_targets: raise IndexError("No Such Engine: %r"%bad_targets) if not targets: raise IndexError("No Engines Registered") return targets def _validate_client_msg(self, msg): """validates and unpacks headers of a message. Returns False if invalid, (ident, header, parent, content)""" client_id = msg[0] try: msg = self.session.unpack_message(msg[1:], content=True) except: logger.error("client::Invalid Message %s"%msg) return False msg_type = msg.get('msg_type', None) if msg_type is None: return False header = msg.get('header') # session doesn't handle split content for now: return client_id, msg #----------------------------------------------------------------------------- # dispatch methods (1 per socket) #----------------------------------------------------------------------------- def dispatch_register_request(self, msg): """""" logger.debug("registration::dispatch_register_request(%s)"%msg) idents,msg = self.session.feed_identities(msg) print idents,msg, len(msg) try: msg = self.session.unpack_message(msg,content=True) except Exception, e: logger.error("registration::got bad registration message: %s"%msg) raise e return msg_type = msg['msg_type'] content = msg['content'] handler = self.registrar_handlers.get(msg_type, None) if handler is None: logger.error("registration::got bad registration message: %s"%msg) else: handler(idents, msg) def dispatch_queue_traffic(self, msg): """all ME and Task queue messages come through here""" logger.debug("queue traffic: %s"%msg[:2]) switch = msg[0] idents, msg = self.session.feed_identities(msg[1:]) if switch == 'in': self.save_queue_request(idents, msg) elif switch == 'out': self.save_queue_result(idents, msg) elif switch == 'intask': self.save_task_request(idents, msg) elif switch == 'outtask': self.save_task_result(idents, msg) elif switch == 'tracktask': self.save_task_destination(idents, msg) elif switch in ('incontrol', 'outcontrol'): pass else: logger.error("Invalid message topic: %s"%switch) def dispatch_client_msg(self, msg): """Route messages from clients""" idents, msg = self.session.feed_identities(msg) client_id = idents[0] try: msg = self.session.unpack_message(msg, content=True) except: content = wrap_exception() logger.error("Bad Client Message: %s"%msg) self.session.send(self.clientele, "controller_error", ident=client_id, content=content) return # print client_id, header, parent, content #switch on message type: msg_type = msg['msg_type'] logger.info("client:: client %s requested %s"%(client_id, msg_type)) handler = self.client_handlers.get(msg_type, None) try: assert handler is not None, "Bad Message Type: %s"%msg_type except: content = wrap_exception() logger.error("Bad Message Type: %s"%msg_type) self.session.send(self.clientele, "controller_error", ident=client_id, content=content) return else: handler(client_id, msg) def dispatch_db(self, msg): """""" raise NotImplementedError #--------------------------------------------------------------------------- # handler methods (1 per event) #--------------------------------------------------------------------------- #----------------------- Heartbeat -------------------------------------- def handle_new_heart(self, heart): """handler to attach to heartbeater. Called when a new heart starts to beat. Triggers completion of registration.""" logger.debug("heartbeat::handle_new_heart(%r)"%heart) if heart not in self.incoming_registrations: logger.info("heartbeat::ignoring new heart: %r"%heart) else: self.finish_registration(heart) def handle_heart_failure(self, heart): """handler to attach to heartbeater. called when a previously registered heart fails to respond to beat request. triggers unregistration""" logger.debug("heartbeat::handle_heart_failure(%r)"%heart) eid = self.hearts.get(heart, None) if eid is None: logger.info("heartbeat::ignoring heart failure %r"%heart) else: self.unregister_engine(heart, dict(content=dict(id=eid))) #----------------------- MUX Queue Traffic ------------------------------ def save_queue_request(self, idents, msg): queue_id, client_id = idents[:2] try: msg = self.session.unpack_message(msg, content=False) except: logger.error("queue::client %r sent invalid message to %r: %s"%(client_id, queue_id, msg)) return eid = self.by_ident.get(queue_id, None) if eid is None: logger.error("queue::target %r not registered"%queue_id) logger.debug("queue:: valid are: %s"%(self.by_ident.keys())) return header = msg['header'] msg_id = header['msg_id'] info = dict(submit=datetime.now(), received=None, engine=(eid, queue_id)) self.pending[msg_id] = ( msg, info ) self.queues[eid][0].append(msg_id) def save_queue_result(self, idents, msg): client_id, queue_id = idents[:2] try: msg = self.session.unpack_message(msg, content=False) except: logger.error("queue::engine %r sent invalid message to %r: %s"%( queue_id,client_id, msg)) return eid = self.by_ident.get(queue_id, None) if eid is None: logger.error("queue::unknown engine %r is sending a reply: "%queue_id) logger.debug("queue:: %s"%msg[2:]) return parent = msg['parent_header'] if not parent: return msg_id = parent['msg_id'] self.results[msg_id] = msg if msg_id in self.pending: self.pending.pop(msg_id) self.queues[eid][0].remove(msg_id) self.completed[eid].append(msg_id) else: logger.debug("queue:: unknown msg finished %s"%msg_id) #--------------------- Task Queue Traffic ------------------------------ def save_task_request(self, idents, msg): client_id = idents[0] try: msg = self.session.unpack_message(msg, content=False) except: logger.error("task::client %r sent invalid task message: %s"%( client_id, msg)) return header = msg['header'] msg_id = header['msg_id'] self.mia.add(msg_id) self.pending[msg_id] = msg if not self.tasks.has_key(client_id): self.tasks[client_id] = [] self.tasks[client_id].append(msg_id) def save_task_result(self, idents, msg): client_id = idents[0] try: msg = self.session.unpack_message(msg, content=False) except: logger.error("task::invalid task result message send to %r: %s"%( client_id, msg)) return parent = msg['parent_header'] if not parent: # print msg # logger.warn("") return msg_id = parent['msg_id'] self.results[msg_id] = msg if msg_id in self.pending: self.pending.pop(msg_id) if msg_id in self.mia: self.mia.remove(msg_id) else: logger.debug("task:: unknown task %s finished"%msg_id) def save_task_destination(self, idents, msg): try: msg = self.session.unpack_message(msg, content=True) except: logger.error("task::invalid task tracking message") return content = msg['content'] print content msg_id = content['msg_id'] engine_uuid = content['engine_id'] for eid,queue_id in self.keytable.iteritems(): if queue_id == engine_uuid: break logger.info("task:: task %s arrived on %s"%(msg_id, eid)) if msg_id in self.mia: self.mia.remove(msg_id) else: logger.debug("task::task %s not listed as MIA?!"%(msg_id)) self.tasks[engine_uuid].append(msg_id) def mia_task_request(self, idents, msg): client_id = idents[0] content = dict(mia=self.mia,status='ok') self.session.send('mia_reply', content=content, idents=client_id) #-------------------- Registration ----------------------------- def connection_request(self, client_id, msg): """reply with connection addresses for clients""" logger.info("client::client %s connected"%client_id) content = dict(status='ok') content.update(self.client_addrs) jsonable = {} for k,v in self.keytable.iteritems(): jsonable[str(k)] = v content['engines'] = jsonable self.session.send(self.registrar, 'connection_reply', content, parent=msg, ident=client_id) def register_engine(self, reg, msg): """register an engine""" content = msg['content'] try: queue = content['queue'] except KeyError: logger.error("registration::queue not specified") return heart = content.get('heartbeat', None) """register a new engine, and create the socket(s) necessary""" eid = self._new_id() # print (eid, queue, reg, heart) logger.debug("registration::register_engine(%i, %r, %r, %r)"%(eid, queue, reg, heart)) content = dict(id=eid,status='ok') content.update(self.engine_addrs) # check if requesting available IDs: if queue in self.by_ident: content = {'status': 'error', 'reason': "queue_id %r in use"%queue} elif heart in self.hearts: # need to check unique hearts? content = {'status': 'error', 'reason': "heart_id %r in use"%heart} else: for h, pack in self.incoming_registrations.iteritems(): if heart == h: content = {'status': 'error', 'reason': "heart_id %r in use"%heart} break elif queue == pack[1]: content = {'status': 'error', 'reason': "queue_id %r in use"%queue} break msg = self.session.send(self.registrar, "registration_reply", content=content, ident=reg) if content['status'] == 'ok': if heart in self.heartbeat.hearts: # already beating self.incoming_registrations[heart] = (eid,queue,reg,None) self.finish_registration(heart) else: purge = lambda : self._purge_stalled_registration(heart) dc = ioloop.DelayedCallback(purge, self.registration_timeout, self.loop) dc.start() self.incoming_registrations[heart] = (eid,queue,reg,dc) else: logger.error("registration::registration %i failed: %s"%(eid, content['reason'])) return eid def unregister_engine(self, ident, msg): try: eid = msg['content']['id'] except: logger.error("registration::bad engine id for unregistration: %s"%ident) return logger.info("registration::unregister_engine(%s)"%eid) content=dict(id=eid, queue=self.engines[eid].queue) self.ids.remove(eid) self.keytable.pop(eid) ec = self.engines.pop(eid) self.hearts.pop(ec.heartbeat) self.by_ident.pop(ec.queue) self.completed.pop(eid) for msg_id in self.queues.pop(eid)[0]: msg = self.pending.pop(msg_id) ############## TODO: HANDLE IT ################ if self.notifier: self.session.send(self.notifier, "unregistration_notification", content=content) def finish_registration(self, heart): try: (eid,queue,reg,purge) = self.incoming_registrations.pop(heart) except KeyError: logger.error("registration::tried to finish nonexistant registration") return logger.info("registration::finished registering engine %i:%r"%(eid,queue)) if purge is not None: purge.stop() control = queue self.ids.add(eid) self.keytable[eid] = queue self.engines[eid] = EngineConnector(eid, queue, reg, control, heart) self.by_ident[queue] = eid self.queues[eid] = ([],[]) self.completed[eid] = list() self.hearts[heart] = eid content = dict(id=eid, queue=self.engines[eid].queue) if self.notifier: self.session.send(self.notifier, "registration_notification", content=content) def _purge_stalled_registration(self, heart): if heart in self.incoming_registrations: eid = self.incoming_registrations.pop(heart)[0] logger.info("registration::purging stalled registration: %i"%eid) else: pass #------------------- Client Requests ------------------------------- def check_load(self, client_id, msg): content = msg['content'] try: targets = content['targets'] targets = self._validate_targets(targets) except: content = wrap_exception() self.session.send(self.clientele, "controller_error", content=content, ident=client_id) return content = dict(status='ok') # loads = {} for t in targets: content[str(t)] = len(self.queues[t]) self.session.send(self.clientele, "load_reply", content=content, ident=client_id) def queue_status(self, client_id, msg): """handle queue_status request""" content = msg['content'] targets = content['targets'] try: targets = self._validate_targets(targets) except: content = wrap_exception() self.session.send(self.clientele, "controller_error", content=content, ident=client_id) return verbose = msg.get('verbose', False) content = dict() for t in targets: queue = self.queues[t] completed = self.completed[t] if not verbose: queue = len(queue) completed = len(completed) content[str(t)] = {'queue': queue, 'completed': completed } # pending self.session.send(self.clientele, "queue_reply", content=content, ident=client_id) def job_status(self, client_id, msg): """handle queue_status request""" content = msg['content'] msg_ids = content['msg_ids'] try: targets = self._validate_targets(targets) except: content = wrap_exception() self.session.send(self.clientele, "controller_error", content=content, ident=client_id) return verbose = msg.get('verbose', False) content = dict() for t in targets: queue = self.queues[t] completed = self.completed[t] if not verbose: queue = len(queue) completed = len(completed) content[str(t)] = {'queue': queue, 'completed': completed } # pending self.session.send(self.clientele, "queue_reply", content=content, ident=client_id) def purge_results(self, client_id, msg): content = msg['content'] msg_ids = content.get('msg_ids', []) reply = dict(status='ok') if msg_ids == 'all': self.results = {} else: for msg_id in msg_ids: if msg_id in self.results: self.results.pop(msg_id) else: if msg_id in self.pending: reply = dict(status='error', reason="msg pending: %r"%msg_id) else: reply = dict(status='error', reason="No such msg: %r"%msg_id) break eids = content.get('engine_ids', []) for eid in eids: if eid not in self.engines: reply = dict(status='error', reason="No such engine: %i"%eid) break msg_ids = self.completed.pop(eid) for msg_id in msg_ids: self.results.pop(msg_id) self.sesison.send(self.clientele, 'purge_reply', content=reply, ident=client_id) def resubmit_task(self, client_id, msg, buffers): content = msg['content'] header = msg['header'] msg_ids = content.get('msg_ids', []) reply = dict(status='ok') if msg_ids == 'all': self.results = {} else: for msg_id in msg_ids: if msg_id in self.results: self.results.pop(msg_id) else: if msg_id in self.pending: reply = dict(status='error', reason="msg pending: %r"%msg_id) else: reply = dict(status='error', reason="No such msg: %r"%msg_id) break eids = content.get('engine_ids', []) for eid in eids: if eid not in self.engines: reply = dict(status='error', reason="No such engine: %i"%eid) break msg_ids = self.completed.pop(eid) for msg_id in msg_ids: self.results.pop(msg_id) self.sesison.send(self.clientele, 'purge_reply', content=reply, ident=client_id) def get_results(self, client_id, msg): """get the result of 1 or more messages""" content = msg['content'] msg_ids = set(content['msg_ids']) statusonly = content.get('status_only', False) pending = [] completed = [] content = dict(status='ok') content['pending'] = pending content['completed'] = completed for msg_id in msg_ids: if msg_id in self.pending: pending.append(msg_id) elif msg_id in self.results: completed.append(msg_id) if not statusonly: content[msg_id] = self.results[msg_id]['content'] else: content = dict(status='error') content['reason'] = 'no such message: '+msg_id break self.session.send(self.clientele, "result_reply", content=content, parent=msg, ident=client_id) ############ OLD METHODS for Python Relay Controller ################### def _validate_engine_msg(self, msg): """validates and unpacks headers of a message. Returns False if invalid, (ident, message)""" ident = msg[0] try: msg = self.session.unpack_message(msg[1:], content=False) except: logger.error("engine.%s::Invalid Message %s"%(ident, msg)) return False try: eid = msg.header.username assert self.engines.has_key(eid) except: logger.error("engine::Invalid Engine ID %s"%(ident)) return False return eid, msg