controller.py
905 lines
| 33.1 KiB
| text/x-python
|
PythonLexer
MinRK
|
r3539 | #!/usr/bin/env python | ||
"""The IPython Controller with 0MQ | ||||
MinRK
|
r3556 | This is the master object that handles connections from engines and clients, | ||
and monitors traffic through the various queues. | ||||
MinRK
|
r3539 | """ | ||
#----------------------------------------------------------------------------- | ||||
MinRK
|
r3551 | # Copyright (C) 2010 The IPython Development Team | ||
MinRK
|
r3539 | # | ||
# Distributed under the terms of the BSD License. The full license is in | ||||
# the file COPYING, distributed as part of this software. | ||||
#----------------------------------------------------------------------------- | ||||
#----------------------------------------------------------------------------- | ||||
# Imports | ||||
#----------------------------------------------------------------------------- | ||||
MinRK
|
r3552 | from __future__ import print_function | ||
MinRK
|
r3539 | from datetime import datetime | ||
MinRK
|
r3550 | import logging | ||
MinRK
|
r3539 | |||
import zmq | ||||
from zmq.eventloop import zmqstream, ioloop | ||||
import uuid | ||||
# internal: | ||||
from IPython.zmq.log import logger # a Logger object | ||||
MinRK
|
r3550 | from IPython.zmq.entry_point import bind_port | ||
MinRK
|
r3539 | |||
MinRK
|
r3550 | from streamsession import Message, wrap_exception | ||
MinRK
|
r3552 | from entry_point import (make_base_argument_parser, select_random_ports, split_ports, | ||
connect_logger, parse_url) | ||||
MinRK
|
r3539 | |||
#----------------------------------------------------------------------------- | ||||
# Code | ||||
#----------------------------------------------------------------------------- | ||||
MinRK
|
r3556 | def _passer(*args, **kwargs): | ||
return | ||||
MinRK
|
class ReverseDict(dict):
    """Simple double-keyed subset of dict methods.

    Every ``key -> value`` pair stored through the constructor or through
    item assignment is mirrored as ``value -> key`` in ``self.reverse``, so
    a lookup succeeds from either side.  Only ``__init__``, ``__getitem__``,
    ``__setitem__`` and ``pop`` keep the mirror up to date; other dict
    methods (``update``, ``del``, ...) do not.
    """

    def __init__(self, *args, **kwargs):
        dict.__init__(self, *args, **kwargs)
        self.reverse = dict()
        # dict.items exists on both Python 2 and 3 (iteritems is 2-only).
        for key, value in dict.items(self):
            self.reverse[value] = key

    def __getitem__(self, key):
        """Return the forward value for `key`, else the reverse one.

        Raises KeyError when `key` is on neither side.
        """
        try:
            return dict.__getitem__(self, key)
        except KeyError:
            return self.reverse[key]

    def __setitem__(self, key, value):
        """Store ``key -> value`` and its mirror.

        Raises KeyError if `key` is already present as a value.
        """
        if key in self.reverse:
            raise KeyError("Can't have key %r on both sides!"%key)
        dict.__setitem__(self, key, value)
        self.reverse[value] = key

    def pop(self, key):
        """Remove `key` and its mirrored entry; return the forward value."""
        value = dict.pop(self, key)
        # the mirror lives in self.reverse
        self.reverse.pop(value)
        return value
class EngineConnector(object):
    """Record of the zmq identities that belong to one engine.

    Attributes are:
    id (int): engine ID
    queue (str): identity of queue's XREQ socket
    registration (str): identity of registration XREQ socket
    control: identity used for the engine's control channel
    heartbeat (str): identity of heartbeat XREQ socket
    pending: never set by the constructor; stays at the class default (None)
    """
    id = 0
    queue = control = registration = heartbeat = pending = None

    def __init__(self, id, queue, registration, control, heartbeat=None):
        # the connection is announced before any attribute is stored
        logger.info("engine::Engine Connected: %i"%id)
        (self.id, self.queue, self.registration,
         self.control, self.heartbeat) = (id, queue, registration,
                                          control, heartbeat)
class Controller(object):
    """The IPython Controller with 0MQ connections

    Parameters
    ==========
    loop: zmq IOLoop instance
    session: StreamSession object
    queue: ZMQStream for monitoring the command queue (SUB)
    registrar: ZMQStream for engine registration requests (XREP)
    heartbeat: HeartMonitor object checking the pulse of the engines
    clientele: ZMQStream for client connections (XREP)
                not used for jobs, only query/control commands
    notifier: ZMQStream for broadcasting engine registration changes (PUB)
    db: connection to db for out of memory logging of commands
                NotImplemented
    engine_addrs: dict of zmq connection information for engines to connect
                to the queues.
    client_addrs: dict of zmq connection information for clients to connect
                to the queues.
    """
    # internal data structures:
    ids=None # engine IDs
    keytable=None
    engines=None
    clients=None
    hearts=None
    pending=None
    results=None
    tasks=None
    completed=None
    mia=None
    incoming_registrations=None
    registration_timeout=None

    #objects from constructor:
    loop=None
    registrar=None
    clientele=None
    queue=None
    heartbeat=None
    notifier=None
    db=None
    client_addrs=None
    engine_addrs=None

    # Misspelled historical names, kept so that any external code reading
    # them keeps working; the constructor assigns the names above.
    clientelle=None
    client_addr=None
def __init__(self, loop, session, queue, registrar, heartbeat, clientele, notifier, db, engine_addrs, client_addrs):
    """Store the streams, validate the address dicts and register callbacks.

    # universal:
    loop: IOLoop for creating future connections
    session: streamsession for sending serialized data
    # engine:
    queue: ZMQStream for monitoring queue messages
    registrar: ZMQStream for engine registration
    heartbeat: HeartMonitor object for tracking engines (may be None)
    # client:
    clientele: ZMQStream for client connections
    notifier: ZMQStream used to publish (un)registration notifications
    # extra:
    db: ZMQStream for db connection (NotImplemented)
    engine_addrs: zmq address/protocol dict for engine connections
    client_addrs: zmq address/protocol dict for client connections
    """
    self.ids = set()
    self.keytable={}
    self.incoming_registrations={}
    self.engines = {}
    self.by_ident = {}
    self.clients = {}
    self.hearts = {}
    self.mia = set()

    self.loop = loop
    self.session = session
    self.registrar = registrar
    self.clientele = clientele
    self.queue = queue
    self.heartbeat = heartbeat
    self.notifier = notifier
    self.db = db

    # validate connection dicts:
    self.client_addrs = client_addrs
    assert isinstance(client_addrs['queue'], str)
    assert isinstance(client_addrs['control'], str)
    self.engine_addrs = engine_addrs
    assert isinstance(engine_addrs['queue'], str)
    # check the engine-side control address, not the client one again
    assert isinstance(engine_addrs['control'], str)
    assert len(engine_addrs['heartbeat']) == 2

    # register our callbacks
    self.registrar.on_recv(self.dispatch_register_request)
    self.clientele.on_recv(self.dispatch_client_msg)
    self.queue.on_recv(self.dispatch_queue_traffic)

    if heartbeat is not None:
        heartbeat.add_heart_failure_handler(self.handle_heart_failure)
        heartbeat.add_new_heart_handler(self.handle_new_heart)

    # monitor topic -> handler(idents, msg)
    self.queue_handlers = { 'in' : self.save_queue_request,
                            'out': self.save_queue_result,
                            'intask': self.save_task_request,
                            'outtask': self.save_task_result,
                            'tracktask': self.save_task_destination,
                            'incontrol': _passer,
                            'outcontrol': _passer,
                            }

    # client msg_type -> handler(client_id, msg)
    self.client_handlers = {'queue_request': self.queue_status,
                            'result_request': self.get_results,
                            'purge_request': self.purge_results,
                            'load_request': self.check_load,
                            'resubmit_request': self.resubmit_task,
                            }

    # registrar msg_type -> handler(idents, msg)
    self.registrar_handlers = {'registration_request' : self.register_engine,
                            'unregistration_request' : self.unregister_engine,
                            'connection_request': self.connection_request,
                            }
    #
    # this is the stuff that will move to DB:
    self.results = {} # completed results
    self.pending = {} # pending messages, keyed by msg_id
    self.queues = {} # pending msg_ids keyed by engine_id
    self.tasks = {} # pending msg_ids submitted as tasks
    self.completed = {} # completed msg_ids keyed by engine_id

    # heartbeat is optional above, so do not dereference it unconditionally
    if heartbeat is not None:
        self.registration_timeout = max(5000, 2*heartbeat.period)
    else:
        self.registration_timeout = 5000

    logger.info("controller::created controller")
def _new_id(self): | ||||
"""gemerate a new ID""" | ||||
newid = 0 | ||||
incoming = [id[0] for id in self.incoming_registrations.itervalues()] | ||||
# print newid, self.ids, self.incoming_registrations | ||||
while newid in self.ids or newid in incoming: | ||||
newid += 1 | ||||
return newid | ||||
#----------------------------------------------------------------------------- | ||||
# message validation | ||||
#----------------------------------------------------------------------------- | ||||
MinRK
|
r3551 | |||
MinRK
|
def _validate_targets(self, targets):
    """Turn any valid targets argument into a list of integer ids.

    `targets` may be None (meaning every registered engine), a single int
    or string, or an iterable of those.  Strings are looked up in
    ``self.by_ident``; a string with no entry is kept unchanged.

    Raises IndexError if any resolved target is not in ``self.ids`` or if
    the resulting list is empty.
    """
    if targets is None:
        # default to all
        targets = self.ids
    if isinstance(targets, (int,str,unicode)):
        # only one target specified
        targets = [targets]

    # map raw identities to ids
    resolved = [self.by_ident.get(t, t) if isinstance(t, (str,unicode)) else t
                for t in targets]

    unknown = [t for t in resolved if t not in self.ids]
    if unknown:
        raise IndexError("No Such Engine: %r"%unknown)
    if not resolved:
        raise IndexError("No Engines Registered")
    return resolved
def _validate_client_msg(self, msg): | ||||
"""validates and unpacks headers of a message. Returns False if invalid, | ||||
(ident, header, parent, content)""" | ||||
client_id = msg[0] | ||||
try: | ||||
msg = self.session.unpack_message(msg[1:], content=True) | ||||
except: | ||||
logger.error("client::Invalid Message %s"%msg) | ||||
return False | ||||
msg_type = msg.get('msg_type', None) | ||||
if msg_type is None: | ||||
return False | ||||
header = msg.get('header') | ||||
# session doesn't handle split content for now: | ||||
return client_id, msg | ||||
#----------------------------------------------------------------------------- | ||||
MinRK
|
r3551 | # dispatch methods (1 per stream) | ||
MinRK
|
r3539 | #----------------------------------------------------------------------------- | ||
def dispatch_register_request(self, msg):
    """Route a message received on the registrar stream.

    The identities are split off, the message is unpacked, and the handler
    registered for its 'msg_type' in ``self.registrar_handlers`` is called
    as ``handler(idents, msg)``.  Messages that cannot be unpacked, or whose
    type has no handler, are logged and dropped.
    """
    logger.debug("registration::dispatch_register_request(%s)"%msg)
    idents,msg = self.session.feed_identities(msg)
    try:
        msg = self.session.unpack_message(msg,content=True)
    except Exception:
        # log and drop, like the other dispatchers, rather than raising
        # out of the stream callback
        logger.error("registration::got bad registration message: %s"%msg)
        return

    handler = self.registrar_handlers.get(msg['msg_type'], None)
    if handler is None:
        logger.error("registration::got bad registration message: %s"%msg)
    else:
        handler(idents, msg)
def dispatch_queue_traffic(self, msg):
    """all ME and Task queue messages come through here

    The first frame is the topic; it selects a handler from
    ``self.queue_handlers``, which is called with the identities and the
    remaining frames.  Unknown topics are logged as errors.
    """
    logger.debug("queue traffic: %s"%msg[:2])
    topic, frames = msg[0], msg[1:]
    idents, msg = self.session.feed_identities(frames)

    handler = self.queue_handlers.get(topic, None)
    if handler is None:
        logger.error("Invalid message topic: %s"%topic)
        return
    handler(idents, msg)
def dispatch_client_msg(self, msg):
    """Route messages from clients.

    The message is unpacked and passed to the handler registered for its
    'msg_type' in ``self.client_handlers`` as ``handler(client_id, msg)``.
    On an unpacking failure or an unknown message type, a
    "controller_error" reply is sent back to the client instead.
    """
    idents, msg = self.session.feed_identities(msg)
    client_id = idents[0]
    try:
        msg = self.session.unpack_message(msg, content=True)
    except Exception:
        content = wrap_exception()
        logger.error("Bad Client Message: %s"%msg)
        self.session.send(self.clientele, "controller_error", ident=client_id,
                content=content)
        return

    #switch on message type:
    msg_type = msg['msg_type']
    logger.info("client:: client %s requested %s"%(client_id, msg_type))
    handler = self.client_handlers.get(msg_type, None)
    if handler is None:
        # An explicit raise (instead of `assert`) keeps this check alive
        # under `python -O`.  wrap_exception() takes no arguments, so it
        # presumably reads the active exception -- hence raise-and-catch.
        try:
            raise AssertionError("Bad Message Type: %s"%msg_type)
        except AssertionError:
            content = wrap_exception()
        logger.error("Bad Message Type: %s"%msg_type)
        self.session.send(self.clientele, "controller_error", ident=client_id,
                content=content)
        return

    handler(client_id, msg)
def dispatch_db(self, msg):
    """Placeholder for routing db messages; always raises NotImplementedError."""
    raise NotImplementedError()
#--------------------------------------------------------------------------- | ||||
# handler methods (1 per event) | ||||
#--------------------------------------------------------------------------- | ||||
#----------------------- Heartbeat -------------------------------------- | ||||
def handle_new_heart(self, heart):
    """handler to attach to heartbeater.

    Called when a new heart starts to beat.  If a registration is waiting
    on this heart, it is completed; any other heart is ignored.
    """
    logger.debug("heartbeat::handle_new_heart(%r)"%heart)
    if heart in self.incoming_registrations:
        self.finish_registration(heart)
        return
    logger.info("heartbeat::ignoring new heart: %r"%heart)
def handle_heart_failure(self, heart):
    """handler to attach to heartbeater.

    Called when a previously registered heart fails to respond to a beat
    request.  Triggers unregistration of the engine owning that heart;
    hearts that do not map to an engine are ignored.
    """
    logger.debug("heartbeat::handle_heart_failure(%r)"%heart)
    eid = self.hearts.get(heart, None)
    if eid is None:
        logger.info("heartbeat::ignoring heart failure %r"%heart)
        return
    # only look the engine up once we know the heart belongs to one
    queue = self.engines[eid].queue
    self.unregister_engine(heart, dict(content=dict(id=eid, queue=queue)))
MinRK
|
r3539 | |||
#----------------------- MUX Queue Traffic ------------------------------ | ||||
def save_queue_request(self, idents, msg):
    """Record a request submitted through the MUX queue as pending.

    `idents` starts with (queue_id, client_id).  The request is stored in
    ``self.pending`` under its msg_id together with its submission time and
    target engine, and the msg_id is appended to that engine's queue list.
    Invalid messages and unregistered targets are logged and dropped.
    """
    queue_id, client_id = idents[:2]

    try:
        msg = self.session.unpack_message(msg, content=False)
    except Exception:
        logger.error("queue::client %r sent invalid message to %r: %s"%(client_id, queue_id, msg))
        return

    eid = self.by_ident.get(queue_id, None)
    if eid is None:
        logger.error("queue::target %r not registered"%queue_id)
        logger.debug("queue::    valid are: %s"%(self.by_ident.keys()))
        return

    msg_id = msg['header']['msg_id']
    info = dict(submit=datetime.now(),
                received=None,
                engine=(eid, queue_id))
    self.pending[msg_id] = ( msg, info )
    self.queues[eid].append(msg_id)
MinRK
|
r3539 | |||
def save_queue_result(self, idents, msg):
    """Record a reply that came back through the MUX queue.

    `idents` starts with (client_id, queue_id).  The reply is stored in
    ``self.results`` under its parent's msg_id; if that msg_id was pending,
    it is moved from the engine's queue list to its completed list.
    Invalid messages, unknown engines and replies without a parent header
    are dropped.
    """
    client_id, queue_id = idents[:2]

    try:
        msg = self.session.unpack_message(msg, content=False)
    except Exception:
        logger.error("queue::engine %r sent invalid message to %r: %s"%(
                queue_id,client_id, msg))
        return

    eid = self.by_ident.get(queue_id, None)
    if eid is None:
        logger.error("queue::unknown engine %r is sending a reply: "%queue_id)
        # msg is the unpacked message here, so it cannot be sliced
        logger.debug("queue::       %s"%msg)
        return

    parent = msg['parent_header']
    if not parent:
        return
    msg_id = parent['msg_id']
    self.results[msg_id] = msg
    if msg_id in self.pending:
        self.pending.pop(msg_id)
        self.queues[eid].remove(msg_id)
        self.completed[eid].append(msg_id)
    else:
        logger.debug("queue:: unknown msg finished %s"%msg_id)
#--------------------- Task Queue Traffic ------------------------------ | ||||
def save_task_request(self, idents, msg):
    """Save the submission of a task.

    The task is marked as MIA (no destination known yet), stored in
    ``self.pending`` with its submission time, and its msg_id is appended to
    the list kept in ``self.tasks`` under the submitting client's identity.
    Invalid messages are logged and dropped.
    """
    client_id = idents[0]

    try:
        msg = self.session.unpack_message(msg, content=False)
    except Exception:
        logger.error("task::client %r sent invalid task message: %s"%(
                client_id, msg))
        return

    msg_id = msg['header']['msg_id']
    self.mia.add(msg_id)
    info = dict(submit=datetime.now(),
                received=None,
                engine=None)
    self.pending[msg_id] = (msg, info)
    # NOTE(review): other handlers key self.tasks by engine id; entries added
    # here under client_id are never removed -- confirm whether still wanted.
    self.tasks.setdefault(client_id, []).append(msg_id)
def save_task_result(self, idents, msg):
    """Save the result of a completed task.

    `idents` starts with (client_id, engine_uuid).  The result is stored in
    ``self.results`` under its parent's msg_id; if that task was pending and
    tracked on the replying engine, it is moved to the engine's completed
    list.  Invalid messages, unknown engines and results without a parent
    header are dropped.
    """
    client_id, engine_uuid = idents[:2]
    try:
        msg = self.session.unpack_message(msg, content=False)
    except Exception:
        logger.error("task::invalid task result message send to %r: %s"%(
                client_id, msg))
        return

    # log and drop results from unregistered engines, as the MUX handlers do,
    # instead of raising KeyError inside the stream callback
    eid = self.by_ident.get(engine_uuid, None)
    if eid is None:
        logger.error("task::unknown engine %r is sending a result"%engine_uuid)
        return

    parent = msg['parent_header']
    if not parent:
        return
    msg_id = parent['msg_id']
    self.results[msg_id] = msg
    if msg_id in self.pending and msg_id in self.tasks[eid]:
        self.pending.pop(msg_id)
        if msg_id in self.mia:
            self.mia.remove(msg_id)
        self.completed[eid].append(msg_id)
        self.tasks[eid].remove(msg_id)
    else:
        logger.debug("task::unknown task %s finished"%msg_id)
MinRK
|
r3539 | |||
def save_task_destination(self, idents, msg):
    """Record which engine a task was delivered to.

    The message content carries 'msg_id' and 'engine_id'.  The task is
    removed from the MIA set, appended to the engine's task list, and its
    pending record (if any) is updated with the arrival time and engine.
    Invalid messages and unknown engines are logged and dropped.
    """
    try:
        msg = self.session.unpack_message(msg, content=True)
    except Exception:
        logger.error("task::invalid task tracking message")
        return
    content = msg['content']
    msg_id = content['msg_id']
    engine_uuid = content['engine_id']

    eid = self.by_ident.get(engine_uuid, None)
    if eid is None:
        logger.error("task::task %s arrived on unknown engine %r"%(msg_id, engine_uuid))
        return

    logger.info("task::task %s arrived on %s"%(msg_id, eid))
    if msg_id in self.mia:
        self.mia.remove(msg_id)
    else:
        logger.debug("task::task %s not listed as MIA?!"%(msg_id))

    self.tasks[eid].append(msg_id)
    record = self.pending.get(msg_id, None)
    if record is None:
        # no submission was recorded for this task, so nothing to update
        logger.debug("task::task %s not listed as pending"%(msg_id))
    else:
        record[1].update(received=datetime.now(),engine=(eid,engine_uuid))
MinRK
|
r3539 | |||
def mia_task_request(self, idents, msg):
    """Reply to a client with the msg_ids of tasks currently listed as MIA.

    The reply is a 'mia_reply' sent on the client stream to the first
    identity in `idents`.
    """
    client_id = idents[0]
    # send a sorted list: a set is not a serializable content value
    content = dict(mia=sorted(self.mia), status='ok')
    # session.send takes the stream first and `ident=`, as everywhere else
    self.session.send(self.clientele, 'mia_reply', content=content, ident=client_id)
MinRK
|
r3556 | #------------------------------------------------------------------------- | ||
# Registration requests | ||||
#------------------------------------------------------------------------- | ||||
MinRK
|
r3539 | |||
def connection_request(self, client_id, msg):
    """Reply with connection addresses for clients.

    The 'connection_reply' carries status 'ok', every entry of
    ``self.client_addrs``, and under 'engines' the key table with its keys
    converted to strings.  It is sent on the registrar stream.
    """
    logger.info("client::client %s connected"%client_id)
    # string keys, so the table can be serialized
    engines = dict((str(eid), ident) for eid, ident in self.keytable.iteritems())
    content = dict(status='ok')
    content.update(self.client_addrs)
    content['engines'] = engines
    self.session.send(self.registrar, 'connection_reply', content, parent=msg, ident=client_id)
def register_engine(self, reg, msg):
    """Register a new engine.

    `reg` is the identity to reply to; the message content must carry
    'queue' and may carry 'heartbeat'.  A 'registration_reply' is always
    sent: status 'ok' plus ``self.engine_addrs`` and the new id, or status
    'error' when the queue or heart identity is already in use by a
    registered engine or by a registration in progress.

    On success the registration is parked in
    ``self.incoming_registrations`` until the heart beats.  If it is
    already beating the registration is finished immediately; otherwise a
    DelayedCallback purges it after ``self.registration_timeout``.

    Returns the engine id that was proposed (also on a failed
    registration), or None if no queue was specified.
    """
    content = msg['content']
    try:
        queue = content['queue']
    except KeyError:
        logger.error("registration::queue not specified")
        return
    heart = content.get('heartbeat', None)
    eid = self._new_id()
    logger.debug("registration::register_engine(%i, %r, %r, %r)"%(eid, queue, reg, heart))

    content = dict(id=eid,status='ok')
    content.update(self.engine_addrs)
    # check if requesting available IDs:
    pending_queues = [pack[1] for pack in self.incoming_registrations.values()]
    if queue in self.by_ident:
        content = {'status': 'error', 'reason': "queue_id %r in use"%queue}
    elif heart in self.hearts: # need to check unique hearts?
        content = {'status': 'error', 'reason': "heart_id %r in use"%heart}
    elif heart in self.incoming_registrations:
        content = {'status': 'error', 'reason': "heart_id %r in use"%heart}
    elif queue in pending_queues:
        content = {'status': 'error', 'reason': "queue_id %r in use"%queue}

    self.session.send(self.registrar, "registration_reply",
            content=content,
            ident=reg)

    if content['status'] == 'ok':
        if heart in self.heartbeat.hearts:
            # already beating
            self.incoming_registrations[heart] = (eid,queue,reg,None)
            self.finish_registration(heart)
        else:
            purge = lambda : self._purge_stalled_registration(heart)
            dc = ioloop.DelayedCallback(purge, self.registration_timeout, self.loop)
            dc.start()
            self.incoming_registrations[heart] = (eid,queue,reg,dc)
    else:
        logger.error("registration::registration %i failed: %s"%(eid, content['reason']))
    return eid
def unregister_engine(self, ident, msg):
    """Unregister an engine that explicitly requested to leave.

    The engine id is read from ``msg['content']['id']``.  All bookkeeping
    for that engine is removed, its pending MUX requests are discarded, and
    an 'unregistration_notification' is published if a notifier exists.
    Malformed requests and unknown engine ids are logged and ignored.
    """
    try:
        eid = msg['content']['id']
    except (KeyError, TypeError):
        logger.error("registration::bad engine id for unregistration: %s"%ident)
        return
    logger.info("registration::unregister_engine(%s)"%eid)

    ec = self.engines.pop(eid, None)
    if ec is None:
        logger.error("registration::no such engine to unregister: %s"%eid)
        return

    content=dict(id=eid, queue=ec.queue)
    self.ids.remove(eid)
    self.keytable.pop(eid)
    self.hearts.pop(ec.heartbeat)
    self.by_ident.pop(ec.queue)
    self.completed.pop(eid)
    # the per-engine task list is created at registration; release it too
    self.tasks.pop(eid, None)
    for msg_id in self.queues.pop(eid):
        self.pending.pop(msg_id, None)
        ############## TODO: HANDLE IT ################

    if self.notifier:
        self.session.send(self.notifier, "unregistration_notification", content=content)
def finish_registration(self, heart):
    """Second half of engine registration, called after our HeartMonitor
    has received a beat from the Engine's Heart.

    Moves the registration parked under `heart` into the controller's
    tables and publishes a 'registration_notification' if a notifier
    exists.  An unknown `heart` is logged and ignored.
    """
    try:
        eid, queue, reg, purge = self.incoming_registrations.pop(heart)
    except KeyError:
        logger.error("registration::tried to finish nonexistant registration")
        return

    logger.info("registration::finished registering engine %i:%r"%(eid,queue))
    if purge is not None:
        # cancel the pending stalled-registration callback
        purge.stop()

    self.ids.add(eid)
    self.keytable[eid] = queue
    # the queue identity doubles as the control identity
    connector = EngineConnector(eid, queue, reg, queue, heart)
    self.engines[eid] = connector
    self.by_ident[queue] = eid
    self.queues[eid] = []
    self.tasks[eid] = []
    self.completed[eid] = []
    self.hearts[heart] = eid

    notification = dict(id=eid, queue=connector.queue)
    if self.notifier:
        self.session.send(self.notifier, "registration_notification", content=notification)
def _purge_stalled_registration(self, heart): | ||||
if heart in self.incoming_registrations: | ||||
eid = self.incoming_registrations.pop(heart)[0] | ||||
logger.info("registration::purging stalled registration: %i"%eid) | ||||
else: | ||||
pass | ||||
MinRK
|
r3556 | #------------------------------------------------------------------------- | ||
# Client Requests | ||||
#------------------------------------------------------------------------- | ||||
MinRK
|
r3539 | |||
def check_load(self, client_id, msg):
    """Reply with the number of outstanding jobs on each requested engine.

    The load of an engine is the length of its MUX queue list plus the
    length of its task list.  The 'load_reply' content maps ``str(engine
    id)`` to that number.  If the targets are missing or invalid, a
    'controller_error' reply is sent instead.
    """
    content = msg['content']
    try:
        targets = content['targets']
        targets = self._validate_targets(targets)
    except Exception:
        content = wrap_exception()
        self.session.send(self.clientele, "controller_error",
                content=content, ident=client_id)
        return

    content = dict(status='ok')
    for t in targets:
        # str(t), not bytes(t): bytes(3) is three NUL bytes on Python 3
        content[str(t)] = len(self.queues[t])+len(self.tasks[t])
    self.session.send(self.clientele, "load_reply", content=content, ident=client_id)
def queue_status(self, client_id, msg):
    """Return the Queue status of one or more targets.

    if verbose: return the msg_ids
    else: return len of each type.
    keys: queue (pending MUX jobs)
        tasks (pending Task jobs)
        completed (finished jobs from both queues)

    The 'queue_reply' content maps ``str(engine id)`` to a dict with those
    three keys.  If the targets are missing or invalid, a
    'controller_error' reply is sent instead.
    """
    content = msg['content']
    try:
        targets = self._validate_targets(content['targets'])
    except Exception:
        content = wrap_exception()
        self.session.send(self.clientele, "controller_error",
                content=content, ident=client_id)
        return

    verbose = content.get('verbose', False)
    content = dict(status='ok')
    for t in targets:
        queue = self.queues[t]
        completed = self.completed[t]
        tasks = self.tasks[t]
        if not verbose:
            queue = len(queue)
            completed = len(completed)
            tasks = len(tasks)
        # str(t), not bytes(t): bytes(3) is three NUL bytes on Python 3
        content[str(t)] = {'queue': queue, 'completed': completed , 'tasks': tasks}
    self.session.send(self.clientele, "queue_reply", content=content, ident=client_id)
def purge_results(self, client_id, msg):
    """Purge results from memory.  This method is more valuable before we
    move to a DB based message storage mechanism.

    Content keys:
    msg_ids: 'all' to drop every stored result, or a list of msg_ids to
        drop individually.  The first id that has no stored result stops
        the loop and turns the reply into an error.
    engine_ids: engines whose completed results should all be dropped.

    A 'purge_reply' with the resulting status is always sent to the client.
    """
    content = msg['content']
    msg_ids = content.get('msg_ids', [])
    reply = dict(status='ok')
    if msg_ids == 'all':
        self.results = {}
    else:
        for msg_id in msg_ids:
            if msg_id in self.results:
                self.results.pop(msg_id)
            else:
                if msg_id in self.pending:
                    reply = dict(status='error', reason="msg pending: %r"%msg_id)
                else:
                    reply = dict(status='error', reason="No such msg: %r"%msg_id)
                break

    for eid in content.get('engine_ids', []):
        if eid not in self.engines:
            reply = dict(status='error', reason="No such engine: %i"%eid)
            break
        finished = self.completed.get(eid, [])
        # keep the key: result handlers append to self.completed[eid]
        self.completed[eid] = []
        for msg_id in finished:
            # the result may already have been purged by msg_id above
            self.results.pop(msg_id, None)

    self.session.send(self.clientele, 'purge_reply', content=reply, ident=client_id)
def resubmit_task(self, client_id, msg, buffers=None):
    """Resubmit a task.  Not implemented: always raises NotImplementedError.

    `buffers` is optional because client handlers are invoked as
    ``handler(client_id, msg)``.
    """
    raise NotImplementedError
MinRK
|
r3539 | |||
def get_results(self, client_id, msg):
    """Get the result of 1 or more messages.

    The 'result_reply' lists the requested msg_ids under 'pending' or
    'completed'; unless 'status_only' is set, the content of each completed
    result is included under its msg_id.  The first msg_id that is neither
    pending nor completed replaces the whole reply with an error.
    """
    request = msg['content']
    wanted = set(request['msg_ids'])
    statusonly = request.get('status_only', False)

    pending, completed = [], []
    content = dict(status='ok', pending=pending, completed=completed)
    for msg_id in wanted:
        if msg_id in self.pending:
            pending.append(msg_id)
            continue
        if msg_id in self.results:
            completed.append(msg_id)
            if not statusonly:
                content[msg_id] = self.results[msg_id]['content']
            continue
        content = dict(status='error')
        content['reason'] = 'no such message: '+msg_id
        break

    self.session.send(self.clientele, "result_reply", content=content,
                                        parent=msg, ident=client_id)
MinRK
|
r3556 | #------------------------------------------------------------------------- | ||
MinRK
|
r3550 | # Entry Point | ||
MinRK
|
r3556 | #------------------------------------------------------------------------- | ||
MinRK
|
def make_argument_parser():
    """Make an argument parser

    Starts from the shared base parser and adds the controller's own
    port, heartbeat and scheduler options.
    """
    parser = make_base_argument_parser()

    # (flag, add_argument keyword arguments), in the order they are added
    options = [
        ('--client', dict(type=int, metavar='PORT', default=0,
            help='set the XREP port for clients [default: random]')),
        ('--notice', dict(type=int, metavar='PORT', default=0,
            help='set the PUB socket for registration notification [default: random]')),
        ('--hb', dict(type=str, metavar='PORTS',
            help='set the 2 ports for heartbeats [default: random]')),
        ('--ping', dict(type=int, default=3000,
            help='set the heartbeat period in ms [default: 3000]')),
        ('--monitor', dict(type=int, metavar='PORT', default=0,
            help='set the SUB port for queue monitoring [default: random]')),
        ('--mux', dict(type=str, metavar='PORTS',
            help='set the XREP ports for the MUX queue [default: random]')),
        ('--task', dict(type=str, metavar='PORTS',
            help='set the XREP/XREQ ports for the task queue [default: random]')),
        ('--control', dict(type=str, metavar='PORTS',
            help='set the XREP ports for the control queue [default: random]')),
        ('--scheduler', dict(type=str, default='pure',
            choices = ['pure', 'lru', 'plainrandom', 'weighted', 'twobin','leastload'],
            help='select the task scheduler [default: pure ZMQ]')),
    ]
    for flag, spec in options:
        parser.add_argument(flag, **spec)

    return parser
MinRK
|
r3550 | |||
MinRK
|
def main():
    """Entry point: build every socket and queue, then run the Controller.

    Parses the command line, binds the registrar, heartbeat, client,
    notifier and monitor sockets, launches the MUX, control and task queues
    in separate processes, creates the Controller and starts the IOLoop.
    Ports that were not given on the command line are picked at random.
    """
    import time
    from multiprocessing import Process

    from zmq.eventloop.zmqstream import ZMQStream
    from zmq.devices import ProcessMonitoredQueue
    from zmq.log import handlers

    import streamsession as session
    import heartmonitor
    from scheduler import launch_scheduler

    parser = make_argument_parser()

    args = parser.parse_args()
    parse_url(args)

    # address template; the port is filled in with `iface % port`
    iface="%s://%s"%(args.transport,args.ip)+':%i'

    # count how many queue ports still have to be chosen at random
    random_ports = 0
    if args.hb:
        hb = split_ports(args.hb, 2)
    else:
        hb = select_random_ports(2)
    if args.mux:
        mux = split_ports(args.mux, 2)
    else:
        mux = None
        random_ports += 2
    if args.task:
        task = split_ports(args.task, 2)
    else:
        task = None
        random_ports += 2
    if args.control:
        control = split_ports(args.control, 2)
    else:
        control = None
        random_ports += 2

    ctx = zmq.Context()
    loop = ioloop.IOLoop.instance()

    # setup logging
    connect_logger(ctx, iface%args.logport, root="controller", loglevel=args.loglevel)

    # Registrar socket
    reg = ZMQStream(ctx.socket(zmq.XREP), loop)
    regport = bind_port(reg, args.ip, args.regport)

    ### Engine connections ###

    # heartbeat
    hpub = ctx.socket(zmq.PUB)
    bind_port(hpub, args.ip, hb[0])
    hrep = ctx.socket(zmq.XREP)
    bind_port(hrep, args.ip, hb[1])

    hmon = heartmonitor.HeartMonitor(loop, ZMQStream(hpub,loop), ZMQStream(hrep,loop),args.ping)
    hmon.start()

    ### Client connections ###
    # Clientele socket
    c = ZMQStream(ctx.socket(zmq.XREP), loop)
    cport = bind_port(c, args.ip, args.client)
    # Notifier socket
    n = ZMQStream(ctx.socket(zmq.PUB), loop)
    nport = bind_port(n, args.ip, args.notice)

    thesession = session.StreamSession(username=args.ident or "controller")

    ### build and launch the queues ###

    # monitor socket
    sub = ctx.socket(zmq.SUB)
    sub.setsockopt(zmq.SUBSCRIBE, "")
    monport = bind_port(sub, args.ip, args.monitor)
    sub = ZMQStream(sub, loop)

    ports = select_random_ports(random_ports)
    # Multiplexer Queue (in a Process)
    if not mux:
        mux = (ports.pop(),ports.pop())
    q = ProcessMonitoredQueue(zmq.XREP, zmq.XREP, zmq.PUB, 'in', 'out')
    q.bind_in(iface%mux[0])
    q.bind_out(iface%mux[1])
    q.connect_mon(iface%monport)
    q.daemon=True
    q.start()

    # Control Queue (in a Process)
    if not control:
        control = (ports.pop(),ports.pop())
    q = ProcessMonitoredQueue(zmq.XREP, zmq.XREP, zmq.PUB, 'incontrol', 'outcontrol')
    q.bind_in(iface%control[0])
    q.bind_out(iface%control[1])
    q.connect_mon(iface%monport)
    q.daemon=True
    q.start()

    # Task Queue (in a Process)
    if not task:
        task = (ports.pop(),ports.pop())
    if args.scheduler == 'pure':
        q = ProcessMonitoredQueue(zmq.XREP, zmq.XREQ, zmq.PUB, 'intask', 'outtask')
        q.bind_in(iface%task[0])
        q.bind_out(iface%task[1])
        q.connect_mon(iface%monport)
        q.daemon=True
        q.start()
    else:
        sargs = (iface%task[0],iface%task[1],iface%monport,iface%nport,args.scheduler)
        p = Process(target=launch_scheduler, args=sargs)
        p.daemon=True
        p.start()

    # give the queue processes a moment before building the controller
    time.sleep(.25)

    # build connection dicts
    engine_addrs = {
        'control' : iface%control[1],
        'queue': iface%mux[1],
        'heartbeat': (iface%hb[0], iface%hb[1]),
        'task' : iface%task[1],
        'monitor' : iface%monport,
        }

    client_addrs = {
        'control' : iface%control[0],
        'query': iface%cport,
        'queue': iface%mux[0],
        'task' : iface%task[0],
        'notification': iface%nport
        }
    # keep a reference so the controller lives as long as the loop runs
    con = Controller(loop, thesession, sub, reg, hmon, c, n, None, engine_addrs, client_addrs)
    dc = ioloop.DelayedCallback(lambda : print("Controller started..."), 100, loop)
    # a DelayedCallback does nothing until it is started
    dc.start()
    loop.start()
MinRK
|
r3552 | |||
if __name__ == '__main__': | ||||
main() | ||||