hub.py
1270 lines
| 48.3 KiB
| text/x-python
|
PythonLexer
MinRK
|
r3599 | #!/usr/bin/env python | ||
"""The IPython Controller Hub with 0MQ | ||||
This is the master object that handles connections from engines and clients, | ||||
and monitors traffic through the various queues. | ||||
""" | ||||
#----------------------------------------------------------------------------- | ||||
# Copyright (C) 2010 The IPython Development Team | ||||
# | ||||
# Distributed under the terms of the BSD License. The full license is in | ||||
# the file COPYING, distributed as part of this software. | ||||
#----------------------------------------------------------------------------- | ||||
#----------------------------------------------------------------------------- | ||||
# Imports | ||||
#----------------------------------------------------------------------------- | ||||
from __future__ import print_function | ||||
import sys | ||||
import time | ||||
MinRK
|
r3631 | from datetime import datetime | ||
MinRK
|
r3599 | |||
import zmq | ||||
MinRK
|
r3604 | from zmq.eventloop import ioloop | ||
from zmq.eventloop.zmqstream import ZMQStream | ||||
MinRK
|
r3599 | |||
# internal: | ||||
MinRK
|
r3604 | from IPython.utils.importstring import import_item | ||
MinRK
|
r3985 | from IPython.utils.traitlets import ( | ||
MinRK
|
r3988 | HasTraits, Instance, Int, Unicode, Dict, Set, Tuple, CStr | ||
MinRK
|
r3985 | ) | ||
MinRK
|
r3604 | |||
MinRK
|
r3780 | from IPython.parallel import error, util | ||
MinRK
|
r3673 | from IPython.parallel.factory import RegistrationFactory, LoggingFactory | ||
MinRK
|
r3599 | |||
MinRK
|
r3642 | from .heartmonitor import HeartMonitor | ||
MinRK
|
r3599 | |||
#----------------------------------------------------------------------------- | ||||
# Code | ||||
#----------------------------------------------------------------------------- | ||||
def _passer(*args, **kwargs): | ||||
return | ||||
MinRK
|
r3602 | def _printer(*args, **kwargs): | ||
print (args) | ||||
print (kwargs) | ||||
MinRK
|
r3668 | def empty_record(): | ||
"""Return an empty dict with all record keys.""" | ||||
return { | ||||
'msg_id' : None, | ||||
'header' : None, | ||||
'content': None, | ||||
'buffers': None, | ||||
'submitted': None, | ||||
'client_uuid' : None, | ||||
'engine_uuid' : None, | ||||
'started': None, | ||||
'completed': None, | ||||
'resubmitted': None, | ||||
'result_header' : None, | ||||
'result_content' : None, | ||||
'result_buffers' : None, | ||||
'queue' : None, | ||||
'pyin' : None, | ||||
'pyout': None, | ||||
'pyerr': None, | ||||
'stdout': '', | ||||
'stderr': '', | ||||
} | ||||
MinRK
|
r3599 | def init_record(msg): | ||
MinRK
|
r3603 | """Initialize a TaskRecord based on a request.""" | ||
MinRK
|
r3599 | header = msg['header'] | ||
return { | ||||
'msg_id' : header['msg_id'], | ||||
'header' : header, | ||||
'content': msg['content'], | ||||
'buffers': msg['buffers'], | ||||
MinRK
|
r3780 | 'submitted': datetime.strptime(header['date'], util.ISO8601), | ||
MinRK
|
r3599 | 'client_uuid' : None, | ||
'engine_uuid' : None, | ||||
'started': None, | ||||
'completed': None, | ||||
'resubmitted': None, | ||||
'result_header' : None, | ||||
'result_content' : None, | ||||
'result_buffers' : None, | ||||
MinRK
|
r3602 | 'queue' : None, | ||
'pyin' : None, | ||||
'pyout': None, | ||||
'pyerr': None, | ||||
'stdout': '', | ||||
'stderr': '', | ||||
MinRK
|
r3599 | } | ||
MinRK
|
r3603 | class EngineConnector(HasTraits): | ||
MinRK
|
r3599 | """A simple object for accessing the various zmq connections of an object. | ||
Attributes are: | ||||
id (int): engine ID | ||||
uuid (str): uuid (unused?) | ||||
queue (str): identity of queue's XREQ socket | ||||
registration (str): identity of registration XREQ socket | ||||
heartbeat (str): identity of heartbeat XREQ socket | ||||
""" | ||||
MinRK
|
r3603 | id=Int(0) | ||
MinRK
|
r3988 | queue=CStr() | ||
control=CStr() | ||||
registration=CStr() | ||||
heartbeat=CStr() | ||||
MinRK
|
r3604 | pending=Set() | ||
class HubFactory(RegistrationFactory): | ||||
"""The Configurable for setting up a Hub.""" | ||||
# port-pairs for monitoredqueues: | ||||
MinRK
|
r3985 | hb = Tuple(Int,Int,config=True, | ||
help="""XREQ/SUB Port pair for Engine heartbeats""") | ||||
MinRK
|
r3604 | def _hb_default(self): | ||
MinRK
|
r3985 | return tuple(util.select_random_ports(2)) | ||
mux = Tuple(Int,Int,config=True, | ||||
help="""Engine/Client Port pair for MUX queue""") | ||||
MinRK
|
r3604 | |||
def _mux_default(self): | ||||
MinRK
|
r3985 | return tuple(util.select_random_ports(2)) | ||
MinRK
|
r3604 | |||
MinRK
|
r3985 | task = Tuple(Int,Int,config=True, | ||
help="""Engine/Client Port pair for Task queue""") | ||||
MinRK
|
r3604 | def _task_default(self): | ||
MinRK
|
r3985 | return tuple(util.select_random_ports(2)) | ||
control = Tuple(Int,Int,config=True, | ||||
help="""Engine/Client Port pair for Control queue""") | ||||
MinRK
|
r3604 | |||
def _control_default(self): | ||||
MinRK
|
r3985 | return tuple(util.select_random_ports(2)) | ||
iopub = Tuple(Int,Int,config=True, | ||||
help="""Engine/Client Port pair for IOPub relay""") | ||||
MinRK
|
r3604 | |||
def _iopub_default(self): | ||||
MinRK
|
r3985 | return tuple(util.select_random_ports(2)) | ||
MinRK
|
r3604 | |||
# single ports: | ||||
MinRK
|
r3985 | mon_port = Int(config=True, | ||
help="""Monitor (SUB) port for queue traffic""") | ||||
MinRK
|
r3604 | def _mon_port_default(self): | ||
MinRK
|
r3780 | return util.select_random_ports(1)[0] | ||
MinRK
|
r3604 | |||
MinRK
|
r3985 | notifier_port = Int(config=True, | ||
help="""PUB port for sending engine status notifications""") | ||||
MinRK
|
r3604 | def _notifier_port_default(self): | ||
MinRK
|
r3780 | return util.select_random_ports(1)[0] | ||
MinRK
|
r3604 | |||
MinRK
|
r3988 | engine_ip = Unicode('127.0.0.1', config=True, | ||
MinRK
|
r3985 | help="IP on which to listen for engine connections. [default: loopback]") | ||
MinRK
|
r3988 | engine_transport = Unicode('tcp', config=True, | ||
MinRK
|
r3985 | help="0MQ transport for engine connections. [default: tcp]") | ||
MinRK
|
r3604 | |||
MinRK
|
r3988 | client_ip = Unicode('127.0.0.1', config=True, | ||
MinRK
|
r3985 | help="IP on which to listen for client connections. [default: loopback]") | ||
MinRK
|
r3988 | client_transport = Unicode('tcp', config=True, | ||
MinRK
|
r3985 | help="0MQ transport for client connections. [default : tcp]") | ||
MinRK
|
r3604 | |||
MinRK
|
r3988 | monitor_ip = Unicode('127.0.0.1', config=True, | ||
MinRK
|
r3985 | help="IP on which to listen for monitor messages. [default: loopback]") | ||
MinRK
|
r3988 | monitor_transport = Unicode('tcp', config=True, | ||
MinRK
|
r3985 | help="0MQ transport for monitor messages. [default : tcp]") | ||
MinRK
|
r3604 | |||
MinRK
|
r3988 | monitor_url = Unicode('') | ||
MinRK
|
r3604 | |||
MinRK
|
r3988 | db_class = Unicode('IPython.parallel.controller.dictdb.DictDB', config=True, | ||
MinRK
|
r3985 | help="""The class to use for the DB backend""") | ||
MinRK
|
r3604 | |||
# not configurable | ||||
MinRK
|
r3673 | db = Instance('IPython.parallel.controller.dictdb.BaseDB') | ||
heartmonitor = Instance('IPython.parallel.controller.heartmonitor.HeartMonitor') | ||||
MinRK
|
r3604 | |||
MinRK
|
r3605 | def _ip_changed(self, name, old, new): | ||
self.engine_ip = new | ||||
self.client_ip = new | ||||
self.monitor_ip = new | ||||
self._update_monitor_url() | ||||
MinRK
|
r3604 | def _update_monitor_url(self): | ||
self.monitor_url = "%s://%s:%i"%(self.monitor_transport, self.monitor_ip, self.mon_port) | ||||
MinRK
|
r3605 | def _transport_changed(self, name, old, new): | ||
self.engine_transport = new | ||||
self.client_transport = new | ||||
self.monitor_transport = new | ||||
MinRK
|
r3604 | self._update_monitor_url() | ||
MinRK
|
r3603 | |||
MinRK
|
r3604 | def __init__(self, **kwargs): | ||
super(HubFactory, self).__init__(**kwargs) | ||||
self._update_monitor_url() | ||||
def construct(self): | ||||
MinRK
|
r3985 | self.init_hub() | ||
MinRK
|
r3604 | |||
def start(self): | ||||
self.heartmonitor.start() | ||||
MinRK
|
r3610 | self.log.info("Heartmonitor started") | ||
MinRK
|
r3604 | |||
MinRK
|
r3985 | def init_hub(self): | ||
MinRK
|
r3604 | """construct""" | ||
client_iface = "%s://%s:"%(self.client_transport, self.client_ip) + "%i" | ||||
engine_iface = "%s://%s:"%(self.engine_transport, self.engine_ip) + "%i" | ||||
ctx = self.context | ||||
loop = self.loop | ||||
# Registrar socket | ||||
MinRK
|
r3657 | q = ZMQStream(ctx.socket(zmq.XREP), loop) | ||
q.bind(client_iface % self.regport) | ||||
MinRK
|
r3610 | self.log.info("Hub listening on %s for registration."%(client_iface%self.regport)) | ||
MinRK
|
r3604 | if self.client_ip != self.engine_ip: | ||
MinRK
|
r3657 | q.bind(engine_iface % self.regport) | ||
MinRK
|
r3610 | self.log.info("Hub listening on %s for registration."%(engine_iface%self.regport)) | ||
MinRK
|
r3604 | |||
### Engine connections ### | ||||
# heartbeat | ||||
hpub = ctx.socket(zmq.PUB) | ||||
hpub.bind(engine_iface % self.hb[0]) | ||||
hrep = ctx.socket(zmq.XREP) | ||||
hrep.bind(engine_iface % self.hb[1]) | ||||
MinRK
|
r3610 | self.heartmonitor = HeartMonitor(loop=loop, pingstream=ZMQStream(hpub,loop), pongstream=ZMQStream(hrep,loop), | ||
MinRK
|
r3985 | config=self.config) | ||
MinRK
|
r3604 | |||
### Client connections ### | ||||
# Notifier socket | ||||
n = ZMQStream(ctx.socket(zmq.PUB), loop) | ||||
n.bind(client_iface%self.notifier_port) | ||||
### build and launch the queues ### | ||||
# monitor socket | ||||
sub = ctx.socket(zmq.SUB) | ||||
sub.setsockopt(zmq.SUBSCRIBE, "") | ||||
sub.bind(self.monitor_url) | ||||
MinRK
|
r3630 | sub.bind('inproc://monitor') | ||
MinRK
|
r3604 | sub = ZMQStream(sub, loop) | ||
# connect the db | ||||
MinRK
|
r3646 | self.log.info('Hub using DB backend: %r'%(self.db_class.split()[-1])) | ||
MinRK
|
r3657 | # cdir = self.config.Global.cluster_dir | ||
MinRK
|
r3988 | self.db = import_item(str(self.db_class))(session=self.session.session, config=self.config) | ||
MinRK
|
r3604 | time.sleep(.25) | ||
MinRK
|
r3985 | try: | ||
scheme = self.config.TaskScheduler.scheme_name | ||||
except AttributeError: | ||||
from .scheduler import TaskScheduler | ||||
scheme = TaskScheduler.scheme_name.get_default_value() | ||||
MinRK
|
r3604 | # build connection dicts | ||
MinRK
|
r3622 | self.engine_info = { | ||
MinRK
|
r3604 | 'control' : engine_iface%self.control[1], | ||
'mux': engine_iface%self.mux[1], | ||||
'heartbeat': (engine_iface%self.hb[0], engine_iface%self.hb[1]), | ||||
'task' : engine_iface%self.task[1], | ||||
'iopub' : engine_iface%self.iopub[1], | ||||
# 'monitor' : engine_iface%self.mon_port, | ||||
} | ||||
MinRK
|
r3622 | self.client_info = { | ||
MinRK
|
r3604 | 'control' : client_iface%self.control[0], | ||
'mux': client_iface%self.mux[0], | ||||
MinRK
|
r3985 | 'task' : (scheme, client_iface%self.task[0]), | ||
MinRK
|
r3604 | 'iopub' : client_iface%self.iopub[0], | ||
'notification': client_iface%self.notifier_port | ||||
} | ||||
MinRK
|
r3646 | self.log.debug("Hub engine addrs: %s"%self.engine_info) | ||
self.log.debug("Hub client addrs: %s"%self.client_info) | ||||
MinRK
|
r3874 | |||
# resubmit stream | ||||
r = ZMQStream(ctx.socket(zmq.XREQ), loop) | ||||
url = util.disambiguate_url(self.client_info['task'][-1]) | ||||
r.setsockopt(zmq.IDENTITY, self.session.session) | ||||
r.connect(url) | ||||
MinRK
|
r3604 | self.hub = Hub(loop=loop, session=self.session, monitor=sub, heartmonitor=self.heartmonitor, | ||
MinRK
|
r3874 | query=q, notifier=n, resubmit=r, db=self.db, | ||
MinRK
|
r3622 | engine_info=self.engine_info, client_info=self.client_info, | ||
MinRK
|
r3610 | logname=self.log.name) | ||
MinRK
|
r3604 | |||
MinRK
|
r3610 | class Hub(LoggingFactory): | ||
MinRK
|
r3599 | """The IPython Controller Hub with 0MQ connections | ||
Parameters | ||||
========== | ||||
loop: zmq IOLoop instance | ||||
session: StreamSession object | ||||
<removed> context: zmq context for creating new connections (?) | ||||
queue: ZMQStream for monitoring the command queue (SUB) | ||||
MinRK
|
r3657 | query: ZMQStream for engine registration and client queries requests (XREP) | ||
MinRK
|
r3599 | heartbeat: HeartMonitor object checking the pulse of the engines | ||
notifier: ZMQStream for broadcasting engine registration changes (PUB) | ||||
db: connection to db for out of memory logging of commands | ||||
NotImplemented | ||||
MinRK
|
r3622 | engine_info: dict of zmq connection information for engines to connect | ||
MinRK
|
r3599 | to the queues. | ||
MinRK
|
r3622 | client_info: dict of zmq connection information for engines to connect | ||
MinRK
|
r3599 | to the queues. | ||
""" | ||||
# internal data structures: | ||||
MinRK
|
r3604 | ids=Set() # engine IDs | ||
keytable=Dict() | ||||
by_ident=Dict() | ||||
engines=Dict() | ||||
clients=Dict() | ||||
hearts=Dict() | ||||
pending=Set() | ||||
queues=Dict() # pending msg_ids keyed by engine_id | ||||
tasks=Dict() # pending msg_ids submitted as tasks, keyed by client_id | ||||
completed=Dict() # completed msg_ids keyed by engine_id | ||||
all_completed=Set() # completed msg_ids keyed by engine_id | ||||
MinRK
|
r3668 | dead_engines=Set() # completed msg_ids keyed by engine_id | ||
MinRK
|
r3687 | unassigned=Set() # set of task msg_ds not yet assigned a destination | ||
MinRK
|
r3604 | incoming_registrations=Dict() | ||
registration_timeout=Int() | ||||
_idcounter=Int(0) | ||||
MinRK
|
r3599 | |||
MinRK
|
r3603 | # objects from constructor: | ||
loop=Instance(ioloop.IOLoop) | ||||
MinRK
|
r3657 | query=Instance(ZMQStream) | ||
MinRK
|
r3604 | monitor=Instance(ZMQStream) | ||
notifier=Instance(ZMQStream) | ||||
MinRK
|
r3874 | resubmit=Instance(ZMQStream) | ||
heartmonitor=Instance(HeartMonitor) | ||||
MinRK
|
r3603 | db=Instance(object) | ||
MinRK
|
r3622 | client_info=Dict() | ||
engine_info=Dict() | ||||
MinRK
|
r3599 | |||
MinRK
|
r3603 | def __init__(self, **kwargs): | ||
MinRK
|
r3599 | """ | ||
# universal: | ||||
loop: IOLoop for creating future connections | ||||
session: streamsession for sending serialized data | ||||
# engine: | ||||
queue: ZMQStream for monitoring queue messages | ||||
MinRK
|
r3657 | query: ZMQStream for engine+client registration and client requests | ||
MinRK
|
r3599 | heartbeat: HeartMonitor object for tracking engines | ||
# extra: | ||||
db: ZMQStream for db connection (NotImplemented) | ||||
MinRK
|
r3622 | engine_info: zmq address/protocol dict for engine connections | ||
client_info: zmq address/protocol dict for client connections | ||||
MinRK
|
r3599 | """ | ||
MinRK
|
r3603 | |||
super(Hub, self).__init__(**kwargs) | ||||
self.registration_timeout = max(5000, 2*self.heartmonitor.period) | ||||
MinRK
|
r3599 | |||
# validate connection dicts: | ||||
MinRK
|
r3622 | for k,v in self.client_info.iteritems(): | ||
if k == 'task': | ||||
MinRK
|
r3780 | util.validate_url_container(v[1]) | ||
MinRK
|
r3622 | else: | ||
MinRK
|
r3780 | util.validate_url_container(v) | ||
# util.validate_url_container(self.client_info) | ||||
util.validate_url_container(self.engine_info) | ||||
MinRK
|
r3599 | |||
# register our callbacks | ||||
MinRK
|
r3657 | self.query.on_recv(self.dispatch_query) | ||
MinRK
|
r3603 | self.monitor.on_recv(self.dispatch_monitor_traffic) | ||
MinRK
|
r3599 | |||
MinRK
|
r3603 | self.heartmonitor.add_heart_failure_handler(self.handle_heart_failure) | ||
self.heartmonitor.add_new_heart_handler(self.handle_new_heart) | ||||
MinRK
|
r3599 | |||
MinRK
|
r3602 | self.monitor_handlers = { 'in' : self.save_queue_request, | ||
MinRK
|
r3599 | 'out': self.save_queue_result, | ||
'intask': self.save_task_request, | ||||
'outtask': self.save_task_result, | ||||
'tracktask': self.save_task_destination, | ||||
'incontrol': _passer, | ||||
'outcontrol': _passer, | ||||
MinRK
|
r3602 | 'iopub': self.save_iopub_message, | ||
MinRK
|
r3599 | } | ||
MinRK
|
r3657 | self.query_handlers = {'queue_request': self.queue_status, | ||
MinRK
|
r3599 | 'result_request': self.get_results, | ||
MinRK
|
r3780 | 'history_request': self.get_history, | ||
'db_request': self.db_query, | ||||
MinRK
|
r3599 | 'purge_request': self.purge_results, | ||
'load_request': self.check_load, | ||||
'resubmit_request': self.resubmit_task, | ||||
'shutdown_request': self.shutdown_request, | ||||
MinRK
|
r3657 | 'registration_request' : self.register_engine, | ||
MinRK
|
r3599 | 'unregistration_request' : self.unregister_engine, | ||
'connection_request': self.connection_request, | ||||
} | ||||
MinRK
|
r3874 | # ignore resubmit replies | ||
self.resubmit.on_recv(lambda msg: None, copy=False) | ||||
MinRK
|
r3610 | self.log.info("hub::created hub") | ||
MinRK
|
r3599 | |||
MinRK
|
r3603 | @property | ||
def _next_id(self): | ||||
MinRK
|
r3605 | """gemerate a new ID. | ||
No longer reuse old ids, just count from 0.""" | ||||
MinRK
|
r3603 | newid = self._idcounter | ||
self._idcounter += 1 | ||||
MinRK
|
r3599 | return newid | ||
MinRK
|
r3603 | # newid = 0 | ||
# incoming = [id[0] for id in self.incoming_registrations.itervalues()] | ||||
# # print newid, self.ids, self.incoming_registrations | ||||
# while newid in self.ids or newid in incoming: | ||||
# newid += 1 | ||||
# return newid | ||||
MinRK
|
r3599 | |||
#----------------------------------------------------------------------------- | ||||
# message validation | ||||
#----------------------------------------------------------------------------- | ||||
def _validate_targets(self, targets): | ||||
"""turn any valid targets argument into a list of integer ids""" | ||||
if targets is None: | ||||
# default to all | ||||
targets = self.ids | ||||
if isinstance(targets, (int,str,unicode)): | ||||
# only one target specified | ||||
targets = [targets] | ||||
_targets = [] | ||||
for t in targets: | ||||
# map raw identities to ids | ||||
if isinstance(t, (str,unicode)): | ||||
t = self.by_ident.get(t, t) | ||||
_targets.append(t) | ||||
targets = _targets | ||||
bad_targets = [ t for t in targets if t not in self.ids ] | ||||
if bad_targets: | ||||
raise IndexError("No Such Engine: %r"%bad_targets) | ||||
if not targets: | ||||
raise IndexError("No Engines Registered") | ||||
return targets | ||||
#----------------------------------------------------------------------------- | ||||
# dispatch methods (1 per stream) | ||||
#----------------------------------------------------------------------------- | ||||
MinRK
|
r3602 | def dispatch_monitor_traffic(self, msg): | ||
"""all ME and Task queue messages come through here, as well as | ||||
IOPub traffic.""" | ||||
MinRK
|
r3874 | self.log.debug("monitor traffic: %r"%msg[:2]) | ||
MinRK
|
r3599 | switch = msg[0] | ||
MinRK
|
r3996 | try: | ||
idents, msg = self.session.feed_identities(msg[1:]) | ||||
except ValueError: | ||||
idents=[] | ||||
MinRK
|
r3599 | if not idents: | ||
MinRK
|
r3874 | self.log.error("Bad Monitor Message: %r"%msg) | ||
MinRK
|
r3599 | return | ||
MinRK
|
r3602 | handler = self.monitor_handlers.get(switch, None) | ||
MinRK
|
r3599 | if handler is not None: | ||
handler(idents, msg) | ||||
else: | ||||
MinRK
|
r3874 | self.log.error("Invalid monitor topic: %r"%switch) | ||
MinRK
|
r3599 | |||
MinRK
|
r3657 | def dispatch_query(self, msg): | ||
"""Route registration requests and queries from clients.""" | ||||
MinRK
|
r3599 | idents, msg = self.session.feed_identities(msg) | ||
if not idents: | ||||
MinRK
|
r3874 | self.log.error("Bad Query Message: %r"%msg) | ||
MinRK
|
r3599 | return | ||
client_id = idents[0] | ||||
try: | ||||
msg = self.session.unpack_message(msg, content=True) | ||||
except: | ||||
MinRK
|
r3644 | content = error.wrap_exception() | ||
MinRK
|
r3874 | self.log.error("Bad Query Message: %r"%msg, exc_info=True) | ||
MinRK
|
r3657 | self.session.send(self.query, "hub_error", ident=client_id, | ||
MinRK
|
r3599 | content=content) | ||
return | ||||
# print client_id, header, parent, content | ||||
#switch on message type: | ||||
msg_type = msg['msg_type'] | ||||
MinRK
|
r3874 | self.log.info("client::client %r requested %r"%(client_id, msg_type)) | ||
MinRK
|
r3657 | handler = self.query_handlers.get(msg_type, None) | ||
MinRK
|
r3599 | try: | ||
MinRK
|
r3874 | assert handler is not None, "Bad Message Type: %r"%msg_type | ||
MinRK
|
r3599 | except: | ||
MinRK
|
r3644 | content = error.wrap_exception() | ||
MinRK
|
r3874 | self.log.error("Bad Message Type: %r"%msg_type, exc_info=True) | ||
MinRK
|
r3657 | self.session.send(self.query, "hub_error", ident=client_id, | ||
MinRK
|
r3599 | content=content) | ||
return | ||||
MinRK
|
r3874 | |||
MinRK
|
r3599 | else: | ||
MinRK
|
r3657 | handler(idents, msg) | ||
MinRK
|
r3599 | |||
def dispatch_db(self, msg): | ||||
"""""" | ||||
raise NotImplementedError | ||||
#--------------------------------------------------------------------------- | ||||
# handler methods (1 per event) | ||||
#--------------------------------------------------------------------------- | ||||
#----------------------- Heartbeat -------------------------------------- | ||||
def handle_new_heart(self, heart): | ||||
"""handler to attach to heartbeater. | ||||
Called when a new heart starts to beat. | ||||
Triggers completion of registration.""" | ||||
MinRK
|
r3610 | self.log.debug("heartbeat::handle_new_heart(%r)"%heart) | ||
MinRK
|
r3599 | if heart not in self.incoming_registrations: | ||
MinRK
|
r3610 | self.log.info("heartbeat::ignoring new heart: %r"%heart) | ||
MinRK
|
r3599 | else: | ||
self.finish_registration(heart) | ||||
def handle_heart_failure(self, heart): | ||||
"""handler to attach to heartbeater. | ||||
called when a previously registered heart fails to respond to beat request. | ||||
triggers unregistration""" | ||||
MinRK
|
r3610 | self.log.debug("heartbeat::handle_heart_failure(%r)"%heart) | ||
MinRK
|
r3599 | eid = self.hearts.get(heart, None) | ||
queue = self.engines[eid].queue | ||||
if eid is None: | ||||
MinRK
|
r3610 | self.log.info("heartbeat::ignoring heart failure %r"%heart) | ||
MinRK
|
r3599 | else: | ||
self.unregister_engine(heart, dict(content=dict(id=eid, queue=queue))) | ||||
#----------------------- MUX Queue Traffic ------------------------------ | ||||
def save_queue_request(self, idents, msg): | ||||
if len(idents) < 2: | ||||
MinRK
|
r3996 | self.log.error("invalid identity prefix: %r"%idents) | ||
MinRK
|
r3599 | return | ||
queue_id, client_id = idents[:2] | ||||
try: | ||||
msg = self.session.unpack_message(msg, content=False) | ||||
MinRK
|
r3996 | except Exception: | ||
self.log.error("queue::client %r sent invalid message to %r: %r"%(client_id, queue_id, msg), exc_info=True) | ||||
MinRK
|
r3599 | return | ||
eid = self.by_ident.get(queue_id, None) | ||||
if eid is None: | ||||
MinRK
|
r3610 | self.log.error("queue::target %r not registered"%queue_id) | ||
MinRK
|
r3996 | self.log.debug("queue:: valid are: %r"%(self.by_ident.keys())) | ||
MinRK
|
r3599 | return | ||
header = msg['header'] | ||||
msg_id = header['msg_id'] | ||||
record = init_record(msg) | ||||
record['engine_uuid'] = queue_id | ||||
record['client_uuid'] = client_id | ||||
record['queue'] = 'mux' | ||||
MinRK
|
r3646 | |||
MinRK
|
r3668 | try: | ||
# it's posible iopub arrived first: | ||||
existing = self.db.get_record(msg_id) | ||||
for key,evalue in existing.iteritems(): | ||||
MinRK
|
r3874 | rvalue = record.get(key, None) | ||
MinRK
|
r3668 | if evalue and rvalue and evalue != rvalue: | ||
MinRK
|
r3874 | self.log.warn("conflicting initial state for record: %r:%r <%r> %r"%(msg_id, rvalue, key, evalue)) | ||
MinRK
|
r3668 | elif evalue and not rvalue: | ||
record[key] = evalue | ||||
self.db.update_record(msg_id, record) | ||||
except KeyError: | ||||
self.db.add_record(msg_id, record) | ||||
MinRK
|
r3599 | self.pending.add(msg_id) | ||
self.queues[eid].append(msg_id) | ||||
def save_queue_result(self, idents, msg): | ||||
if len(idents) < 2: | ||||
MinRK
|
r3996 | self.log.error("invalid identity prefix: %r"%idents) | ||
MinRK
|
r3599 | return | ||
client_id, queue_id = idents[:2] | ||||
try: | ||||
msg = self.session.unpack_message(msg, content=False) | ||||
MinRK
|
r3996 | except Exception: | ||
self.log.error("queue::engine %r sent invalid message to %r: %r"%( | ||||
MinRK
|
r3599 | queue_id,client_id, msg), exc_info=True) | ||
return | ||||
eid = self.by_ident.get(queue_id, None) | ||||
if eid is None: | ||||
MinRK
|
r3610 | self.log.error("queue::unknown engine %r is sending a reply: "%queue_id) | ||
MinRK
|
r3599 | return | ||
parent = msg['parent_header'] | ||||
if not parent: | ||||
return | ||||
msg_id = parent['msg_id'] | ||||
if msg_id in self.pending: | ||||
self.pending.remove(msg_id) | ||||
self.all_completed.add(msg_id) | ||||
self.queues[eid].remove(msg_id) | ||||
self.completed[eid].append(msg_id) | ||||
MinRK
|
r3651 | elif msg_id not in self.all_completed: | ||
# it could be a result from a dead engine that died before delivering the | ||||
# result | ||||
MinRK
|
r3996 | self.log.warn("queue:: unknown msg finished %r"%msg_id) | ||
MinRK
|
r3651 | return | ||
# update record anyway, because the unregistration could have been premature | ||||
rheader = msg['header'] | ||||
MinRK
|
r3780 | completed = datetime.strptime(rheader['date'], util.ISO8601) | ||
MinRK
|
r3651 | started = rheader.get('started', None) | ||
if started is not None: | ||||
MinRK
|
r3780 | started = datetime.strptime(started, util.ISO8601) | ||
MinRK
|
r3651 | result = { | ||
'result_header' : rheader, | ||||
'result_content': msg['content'], | ||||
'started' : started, | ||||
'completed' : completed | ||||
} | ||||
MinRK
|
r3646 | |||
MinRK
|
r3651 | result['result_buffers'] = msg['buffers'] | ||
MinRK
|
r3780 | try: | ||
self.db.update_record(msg_id, result) | ||||
except Exception: | ||||
self.log.error("DB Error updating record %r"%msg_id, exc_info=True) | ||||
MinRK
|
r3651 | |||
MinRK
|
r3599 | |||
#--------------------- Task Queue Traffic ------------------------------ | ||||
def save_task_request(self, idents, msg): | ||||
"""Save the submission of a task.""" | ||||
client_id = idents[0] | ||||
try: | ||||
msg = self.session.unpack_message(msg, content=False) | ||||
MinRK
|
r3996 | except Exception: | ||
self.log.error("task::client %r sent invalid task message: %r"%( | ||||
MinRK
|
r3599 | client_id, msg), exc_info=True) | ||
return | ||||
record = init_record(msg) | ||||
MinRK
|
r3646 | |||
MinRK
|
r3599 | record['client_uuid'] = client_id | ||
record['queue'] = 'task' | ||||
header = msg['header'] | ||||
msg_id = header['msg_id'] | ||||
self.pending.add(msg_id) | ||||
MinRK
|
r3687 | self.unassigned.add(msg_id) | ||
MinRK
|
r3668 | try: | ||
# it's posible iopub arrived first: | ||||
existing = self.db.get_record(msg_id) | ||||
MinRK
|
r3874 | if existing['resubmitted']: | ||
for key in ('submitted', 'client_uuid', 'buffers'): | ||||
# don't clobber these keys on resubmit | ||||
# submitted and client_uuid should be different | ||||
# and buffers might be big, and shouldn't have changed | ||||
record.pop(key) | ||||
# still check content,header which should not change | ||||
# but are not expensive to compare as buffers | ||||
MinRK
|
r3668 | for key,evalue in existing.iteritems(): | ||
MinRK
|
r3874 | if key.endswith('buffers'): | ||
# don't compare buffers | ||||
continue | ||||
rvalue = record.get(key, None) | ||||
MinRK
|
r3668 | if evalue and rvalue and evalue != rvalue: | ||
MinRK
|
r3874 | self.log.warn("conflicting initial state for record: %r:%r <%r> %r"%(msg_id, rvalue, key, evalue)) | ||
MinRK
|
r3668 | elif evalue and not rvalue: | ||
record[key] = evalue | ||||
self.db.update_record(msg_id, record) | ||||
except KeyError: | ||||
self.db.add_record(msg_id, record) | ||||
MinRK
|
r3780 | except Exception: | ||
self.log.error("DB Error saving task request %r"%msg_id, exc_info=True) | ||||
MinRK
|
r3599 | |||
def save_task_result(self, idents, msg): | ||||
"""save the result of a completed task.""" | ||||
client_id = idents[0] | ||||
try: | ||||
msg = self.session.unpack_message(msg, content=False) | ||||
MinRK
|
r3996 | except Exception: | ||
self.log.error("task::invalid task result message send to %r: %r"%( | ||||
MinRK
|
r3602 | client_id, msg), exc_info=True) | ||
MinRK
|
r3599 | return | ||
parent = msg['parent_header'] | ||||
if not parent: | ||||
# print msg | ||||
MinRK
|
r3610 | self.log.warn("Task %r had no parent!"%msg) | ||
MinRK
|
r3599 | return | ||
msg_id = parent['msg_id'] | ||||
MinRK
|
r3687 | if msg_id in self.unassigned: | ||
self.unassigned.remove(msg_id) | ||||
MinRK
|
r3599 | |||
header = msg['header'] | ||||
engine_uuid = header.get('engine', None) | ||||
eid = self.by_ident.get(engine_uuid, None) | ||||
if msg_id in self.pending: | ||||
self.pending.remove(msg_id) | ||||
self.all_completed.add(msg_id) | ||||
if eid is not None: | ||||
self.completed[eid].append(msg_id) | ||||
if msg_id in self.tasks[eid]: | ||||
self.tasks[eid].remove(msg_id) | ||||
MinRK
|
r3780 | completed = datetime.strptime(header['date'], util.ISO8601) | ||
MinRK
|
r3599 | started = header.get('started', None) | ||
if started is not None: | ||||
MinRK
|
r3780 | started = datetime.strptime(started, util.ISO8601) | ||
MinRK
|
r3599 | result = { | ||
'result_header' : header, | ||||
'result_content': msg['content'], | ||||
'started' : started, | ||||
'completed' : completed, | ||||
'engine_uuid': engine_uuid | ||||
} | ||||
MinRK
|
r3646 | |||
result['result_buffers'] = msg['buffers'] | ||||
MinRK
|
r3780 | try: | ||
self.db.update_record(msg_id, result) | ||||
except Exception: | ||||
self.log.error("DB Error saving task request %r"%msg_id, exc_info=True) | ||||
MinRK
|
r3599 | |||
else: | ||||
MinRK
|
r3996 | self.log.debug("task::unknown task %r finished"%msg_id) | ||
MinRK
|
r3599 | |||
def save_task_destination(self, idents, msg): | ||||
try: | ||||
msg = self.session.unpack_message(msg, content=True) | ||||
MinRK
|
r3996 | except Exception: | ||
MinRK
|
r3610 | self.log.error("task::invalid task tracking message", exc_info=True) | ||
MinRK
|
r3599 | return | ||
content = msg['content'] | ||||
MinRK
|
r3641 | # print (content) | ||
MinRK
|
r3599 | msg_id = content['msg_id'] | ||
engine_uuid = content['engine_id'] | ||||
eid = self.by_ident[engine_uuid] | ||||
MinRK
|
r3996 | self.log.info("task::task %r arrived on %r"%(msg_id, eid)) | ||
MinRK
|
r3687 | if msg_id in self.unassigned: | ||
self.unassigned.remove(msg_id) | ||||
MinRK
|
r3599 | # else: | ||
MinRK
|
r3996 | # self.log.debug("task::task %r not listed as MIA?!"%(msg_id)) | ||
MinRK
|
r3599 | |||
self.tasks[eid].append(msg_id) | ||||
# self.pending[msg_id][1].update(received=datetime.now(),engine=(eid,engine_uuid)) | ||||
MinRK
|
r3780 | try: | ||
self.db.update_record(msg_id, dict(engine_uuid=engine_uuid)) | ||||
except Exception: | ||||
self.log.error("DB Error saving task destination %r"%msg_id, exc_info=True) | ||||
MinRK
|
r3599 | |||
def mia_task_request(self, idents, msg): | ||||
raise NotImplementedError | ||||
client_id = idents[0] | ||||
# content = dict(mia=self.mia,status='ok') | ||||
# self.session.send('mia_reply', content=content, idents=client_id) | ||||
MinRK
|
r3602 | #--------------------- IOPub Traffic ------------------------------ | ||
def save_iopub_message(self, topics, msg): | ||||
"""save an iopub message into the db""" | ||||
MinRK
|
r3641 | # print (topics) | ||
MinRK
|
r3602 | try: | ||
msg = self.session.unpack_message(msg, content=True) | ||||
MinRK
|
r3996 | except Exception: | ||
MinRK
|
r3610 | self.log.error("iopub::invalid IOPub message", exc_info=True) | ||
MinRK
|
r3602 | return | ||
parent = msg['parent_header'] | ||||
MinRK
|
r3603 | if not parent: | ||
MinRK
|
r3996 | self.log.error("iopub::invalid IOPub message: %r"%msg) | ||
MinRK
|
r3603 | return | ||
MinRK
|
r3602 | msg_id = parent['msg_id'] | ||
msg_type = msg['msg_type'] | ||||
content = msg['content'] | ||||
# ensure msg_id is in db | ||||
try: | ||||
rec = self.db.get_record(msg_id) | ||||
MinRK
|
r3668 | except KeyError: | ||
rec = empty_record() | ||||
rec['msg_id'] = msg_id | ||||
self.db.add_record(msg_id, rec) | ||||
MinRK
|
r3602 | # stream | ||
d = {} | ||||
if msg_type == 'stream': | ||||
name = content['name'] | ||||
s = rec[name] or '' | ||||
d[name] = s + content['data'] | ||||
elif msg_type == 'pyerr': | ||||
d['pyerr'] = content | ||||
MinRK
|
r3684 | elif msg_type == 'pyin': | ||
d['pyin'] = content['code'] | ||||
MinRK
|
r3602 | else: | ||
MinRK
|
r3684 | d[msg_type] = content.get('data', '') | ||
MinRK
|
r3602 | |||
MinRK
|
r3780 | try: | ||
self.db.update_record(msg_id, d) | ||||
except Exception: | ||||
self.log.error("DB Error saving iopub message %r"%msg_id, exc_info=True) | ||||
MinRK
|
r3602 | |||
MinRK
|
r3599 | |||
#------------------------------------------------------------------------- | ||||
# Registration requests | ||||
#------------------------------------------------------------------------- | ||||
def connection_request(self, client_id, msg): | ||||
"""Reply with connection addresses for clients.""" | ||||
MinRK
|
r3996 | self.log.info("client::client %r connected"%client_id) | ||
MinRK
|
r3599 | content = dict(status='ok') | ||
MinRK
|
r3622 | content.update(self.client_info) | ||
MinRK
|
r3599 | jsonable = {} | ||
for k,v in self.keytable.iteritems(): | ||||
MinRK
|
r3668 | if v not in self.dead_engines: | ||
jsonable[str(k)] = v | ||||
MinRK
|
r3599 | content['engines'] = jsonable | ||
MinRK
|
r3657 | self.session.send(self.query, 'connection_reply', content, parent=msg, ident=client_id) | ||
MinRK
|
r3599 | |||
def register_engine(self, reg, msg): | ||||
"""Register a new engine.""" | ||||
content = msg['content'] | ||||
try: | ||||
queue = content['queue'] | ||||
except KeyError: | ||||
MinRK
|
r3610 | self.log.error("registration::queue not specified", exc_info=True) | ||
MinRK
|
r3599 | return | ||
heart = content.get('heartbeat', None) | ||||
"""register a new engine, and create the socket(s) necessary""" | ||||
MinRK
|
r3603 | eid = self._next_id | ||
MinRK
|
r3599 | # print (eid, queue, reg, heart) | ||
MinRK
|
r3610 | self.log.debug("registration::register_engine(%i, %r, %r, %r)"%(eid, queue, reg, heart)) | ||
MinRK
|
r3599 | |||
content = dict(id=eid,status='ok') | ||||
MinRK
|
r3622 | content.update(self.engine_info) | ||
MinRK
|
r3599 | # check if requesting available IDs: | ||
if queue in self.by_ident: | ||||
try: | ||||
raise KeyError("queue_id %r in use"%queue) | ||||
except: | ||||
MinRK
|
r3644 | content = error.wrap_exception() | ||
MinRK
|
r3610 | self.log.error("queue_id %r in use"%queue, exc_info=True) | ||
MinRK
|
r3599 | elif heart in self.hearts: # need to check unique hearts? | ||
try: | ||||
raise KeyError("heart_id %r in use"%heart) | ||||
except: | ||||
MinRK
|
r3610 | self.log.error("heart_id %r in use"%heart, exc_info=True) | ||
MinRK
|
r3644 | content = error.wrap_exception() | ||
MinRK
|
r3599 | else: | ||
for h, pack in self.incoming_registrations.iteritems(): | ||||
if heart == h: | ||||
try: | ||||
raise KeyError("heart_id %r in use"%heart) | ||||
except: | ||||
MinRK
|
r3610 | self.log.error("heart_id %r in use"%heart, exc_info=True) | ||
MinRK
|
r3644 | content = error.wrap_exception() | ||
MinRK
|
r3599 | break | ||
elif queue == pack[1]: | ||||
try: | ||||
raise KeyError("queue_id %r in use"%queue) | ||||
except: | ||||
MinRK
|
r3610 | self.log.error("queue_id %r in use"%queue, exc_info=True) | ||
MinRK
|
r3644 | content = error.wrap_exception() | ||
MinRK
|
r3599 | break | ||
MinRK
|
r3657 | msg = self.session.send(self.query, "registration_reply", | ||
MinRK
|
r3599 | content=content, | ||
ident=reg) | ||||
if content['status'] == 'ok': | ||||
MinRK
|
r3603 | if heart in self.heartmonitor.hearts: | ||
MinRK
|
r3599 | # already beating | ||
MinRK
|
r3603 | self.incoming_registrations[heart] = (eid,queue,reg[0],None) | ||
MinRK
|
r3599 | self.finish_registration(heart) | ||
else: | ||||
purge = lambda : self._purge_stalled_registration(heart) | ||||
dc = ioloop.DelayedCallback(purge, self.registration_timeout, self.loop) | ||||
dc.start() | ||||
MinRK
|
r3603 | self.incoming_registrations[heart] = (eid,queue,reg[0],dc) | ||
MinRK
|
r3599 | else: | ||
MinRK
|
r3996 | self.log.error("registration::registration %i failed: %r"%(eid, content['evalue'])) | ||
MinRK
|
r3599 | return eid | ||
def unregister_engine(self, ident, msg): | ||||
"""Unregister an engine that explicitly requested to leave.""" | ||||
try: | ||||
eid = msg['content']['id'] | ||||
except: | ||||
MinRK
|
r3996 | self.log.error("registration::bad engine id for unregistration: %r"%ident, exc_info=True) | ||
MinRK
|
r3599 | return | ||
MinRK
|
r3996 | self.log.info("registration::unregister_engine(%r)"%eid) | ||
MinRK
|
r3651 | # print (eid) | ||
MinRK
|
r3668 | uuid = self.keytable[eid] | ||
content=dict(id=eid, queue=uuid) | ||||
self.dead_engines.add(uuid) | ||||
# self.ids.remove(eid) | ||||
# uuid = self.keytable.pop(eid) | ||||
# | ||||
# ec = self.engines.pop(eid) | ||||
# self.hearts.pop(ec.heartbeat) | ||||
# self.by_ident.pop(ec.queue) | ||||
# self.completed.pop(eid) | ||||
handleit = lambda : self._handle_stranded_msgs(eid, uuid) | ||||
dc = ioloop.DelayedCallback(handleit, self.registration_timeout, self.loop) | ||||
dc.start() | ||||
############## TODO: HANDLE IT ################ | ||||
MinRK
|
r3599 | |||
if self.notifier: | ||||
self.session.send(self.notifier, "unregistration_notification", content=content) | ||||
MinRK
|
r3651 | def _handle_stranded_msgs(self, eid, uuid): | ||
"""Handle messages known to be on an engine when the engine unregisters. | ||||
It is possible that this will fire prematurely - that is, an engine will | ||||
go down after completing a result, and the client will be notified | ||||
that the result failed and later receive the actual result. | ||||
""" | ||||
MinRK
|
r3668 | outstanding = self.queues[eid] | ||
MinRK
|
r3651 | |||
for msg_id in outstanding: | ||||
self.pending.remove(msg_id) | ||||
self.all_completed.add(msg_id) | ||||
try: | ||||
raise error.EngineError("Engine %r died while running task %r"%(eid, msg_id)) | ||||
except: | ||||
content = error.wrap_exception() | ||||
# build a fake header: | ||||
header = {} | ||||
header['engine'] = uuid | ||||
MinRK
|
r3780 | header['date'] = datetime.now() | ||
MinRK
|
r3651 | rec = dict(result_content=content, result_header=header, result_buffers=[]) | ||
rec['completed'] = header['date'] | ||||
rec['engine_uuid'] = uuid | ||||
MinRK
|
r3780 | try: | ||
self.db.update_record(msg_id, rec) | ||||
except Exception: | ||||
self.log.error("DB Error handling stranded msg %r"%msg_id, exc_info=True) | ||||
MinRK
|
r3651 | |||
MinRK
|
r3599 | def finish_registration(self, heart): | ||
"""Second half of engine registration, called after our HeartMonitor | ||||
has received a beat from the Engine's Heart.""" | ||||
try: | ||||
(eid,queue,reg,purge) = self.incoming_registrations.pop(heart) | ||||
except KeyError: | ||||
MinRK
|
r3610 | self.log.error("registration::tried to finish nonexistant registration", exc_info=True) | ||
MinRK
|
r3599 | return | ||
MinRK
|
r3610 | self.log.info("registration::finished registering engine %i:%r"%(eid,queue)) | ||
MinRK
|
r3599 | if purge is not None: | ||
purge.stop() | ||||
control = queue | ||||
self.ids.add(eid) | ||||
self.keytable[eid] = queue | ||||
MinRK
|
r3603 | self.engines[eid] = EngineConnector(id=eid, queue=queue, registration=reg, | ||
control=control, heartbeat=heart) | ||||
MinRK
|
r3599 | self.by_ident[queue] = eid | ||
self.queues[eid] = list() | ||||
self.tasks[eid] = list() | ||||
self.completed[eid] = list() | ||||
self.hearts[heart] = eid | ||||
content = dict(id=eid, queue=self.engines[eid].queue) | ||||
if self.notifier: | ||||
self.session.send(self.notifier, "registration_notification", content=content) | ||||
MinRK
|
r3610 | self.log.info("engine::Engine Connected: %i"%eid) | ||
MinRK
|
r3599 | |||
def _purge_stalled_registration(self, heart): | ||||
if heart in self.incoming_registrations: | ||||
eid = self.incoming_registrations.pop(heart)[0] | ||||
MinRK
|
r3610 | self.log.info("registration::purging stalled registration: %i"%eid) | ||
MinRK
|
r3599 | else: | ||
pass | ||||
#------------------------------------------------------------------------- | ||||
# Client Requests | ||||
#------------------------------------------------------------------------- | ||||
def shutdown_request(self, client_id, msg): | ||||
"""handle shutdown request.""" | ||||
MinRK
|
r3657 | self.session.send(self.query, 'shutdown_reply', content={'status': 'ok'}, ident=client_id) | ||
MinRK
|
r3664 | # also notify other clients of shutdown | ||
self.session.send(self.notifier, 'shutdown_notice', content={'status': 'ok'}) | ||||
MinRK
|
r3599 | dc = ioloop.DelayedCallback(lambda : self._shutdown(), 1000, self.loop) | ||
dc.start() | ||||
def _shutdown(self): | ||||
MinRK
|
r3610 | self.log.info("hub::hub shutting down.") | ||
MinRK
|
r3599 | time.sleep(0.1) | ||
sys.exit(0) | ||||
def check_load(self, client_id, msg): | ||||
content = msg['content'] | ||||
try: | ||||
targets = content['targets'] | ||||
targets = self._validate_targets(targets) | ||||
except: | ||||
MinRK
|
r3644 | content = error.wrap_exception() | ||
MinRK
|
r3657 | self.session.send(self.query, "hub_error", | ||
MinRK
|
r3599 | content=content, ident=client_id) | ||
return | ||||
content = dict(status='ok') | ||||
# loads = {} | ||||
for t in targets: | ||||
content[bytes(t)] = len(self.queues[t])+len(self.tasks[t]) | ||||
MinRK
|
r3657 | self.session.send(self.query, "load_reply", content=content, ident=client_id) | ||
MinRK
|
r3599 | |||
def queue_status(self, client_id, msg): | ||||
"""Return the Queue status of one or more targets. | ||||
if verbose: return the msg_ids | ||||
else: return len of each type. | ||||
keys: queue (pending MUX jobs) | ||||
tasks (pending Task jobs) | ||||
completed (finished jobs from both queues)""" | ||||
content = msg['content'] | ||||
targets = content['targets'] | ||||
try: | ||||
targets = self._validate_targets(targets) | ||||
except: | ||||
MinRK
|
r3644 | content = error.wrap_exception() | ||
MinRK
|
r3657 | self.session.send(self.query, "hub_error", | ||
MinRK
|
r3599 | content=content, ident=client_id) | ||
return | ||||
verbose = content.get('verbose', False) | ||||
content = dict(status='ok') | ||||
for t in targets: | ||||
queue = self.queues[t] | ||||
completed = self.completed[t] | ||||
tasks = self.tasks[t] | ||||
if not verbose: | ||||
queue = len(queue) | ||||
completed = len(completed) | ||||
tasks = len(tasks) | ||||
content[bytes(t)] = {'queue': queue, 'completed': completed , 'tasks': tasks} | ||||
MinRK
|
r3687 | content['unassigned'] = list(self.unassigned) if verbose else len(self.unassigned) | ||
MinRK
|
r3657 | self.session.send(self.query, "queue_reply", content=content, ident=client_id) | ||
MinRK
|
r3599 | |||
def purge_results(self, client_id, msg): | ||||
"""Purge results from memory. This method is more valuable before we move | ||||
to a DB based message storage mechanism.""" | ||||
content = msg['content'] | ||||
msg_ids = content.get('msg_ids', []) | ||||
reply = dict(status='ok') | ||||
if msg_ids == 'all': | ||||
MinRK
|
r3780 | try: | ||
self.db.drop_matching_records(dict(completed={'$ne':None})) | ||||
except Exception: | ||||
reply = error.wrap_exception() | ||||
MinRK
|
r3599 | else: | ||
MinRK
|
r3875 | pending = filter(lambda m: m in self.pending, msg_ids) | ||
if pending: | ||||
try: | ||||
raise IndexError("msg pending: %r"%pending[0]) | ||||
except: | ||||
reply = error.wrap_exception() | ||||
else: | ||||
try: | ||||
self.db.drop_matching_records(dict(msg_id={'$in':msg_ids})) | ||||
except Exception: | ||||
reply = error.wrap_exception() | ||||
if reply['status'] == 'ok': | ||||
eids = content.get('engine_ids', []) | ||||
for eid in eids: | ||||
if eid not in self.engines: | ||||
MinRK
|
r3599 | try: | ||
MinRK
|
r3875 | raise IndexError("No such engine: %i"%eid) | ||
MinRK
|
r3599 | except: | ||
MinRK
|
r3644 | reply = error.wrap_exception() | ||
MinRK
|
r3875 | break | ||
msg_ids = self.completed.pop(eid) | ||||
uid = self.engines[eid].queue | ||||
MinRK
|
r3599 | try: | ||
MinRK
|
r3875 | self.db.drop_matching_records(dict(engine_uuid=uid, completed={'$ne':None})) | ||
except Exception: | ||||
MinRK
|
r3644 | reply = error.wrap_exception() | ||
MinRK
|
r3875 | break | ||
MinRK
|
r3599 | |||
MinRK
|
r3657 | self.session.send(self.query, 'purge_reply', content=reply, ident=client_id) | ||
MinRK
|
r3599 | |||
MinRK
|
r3874 | def resubmit_task(self, client_id, msg): | ||
"""Resubmit one or more tasks.""" | ||||
def finish(reply): | ||||
self.session.send(self.query, 'resubmit_reply', content=reply, ident=client_id) | ||||
content = msg['content'] | ||||
msg_ids = content['msg_ids'] | ||||
reply = dict(status='ok') | ||||
try: | ||||
records = self.db.find_records({'msg_id' : {'$in' : msg_ids}}, keys=[ | ||||
'header', 'content', 'buffers']) | ||||
except Exception: | ||||
self.log.error('db::db error finding tasks to resubmit', exc_info=True) | ||||
return finish(error.wrap_exception()) | ||||
# validate msg_ids | ||||
found_ids = [ rec['msg_id'] for rec in records ] | ||||
invalid_ids = filter(lambda m: m in self.pending, found_ids) | ||||
if len(records) > len(msg_ids): | ||||
try: | ||||
raise RuntimeError("DB appears to be in an inconsistent state." | ||||
"More matching records were found than should exist") | ||||
except Exception: | ||||
return finish(error.wrap_exception()) | ||||
elif len(records) < len(msg_ids): | ||||
missing = [ m for m in msg_ids if m not in found_ids ] | ||||
try: | ||||
MinRK
|
r3996 | raise KeyError("No such msg(s): %r"%missing) | ||
MinRK
|
r3874 | except KeyError: | ||
return finish(error.wrap_exception()) | ||||
elif invalid_ids: | ||||
msg_id = invalid_ids[0] | ||||
try: | ||||
raise ValueError("Task %r appears to be inflight"%(msg_id)) | ||||
except Exception: | ||||
return finish(error.wrap_exception()) | ||||
# clear the existing records | ||||
rec = empty_record() | ||||
map(rec.pop, ['msg_id', 'header', 'content', 'buffers', 'submitted']) | ||||
rec['resubmitted'] = datetime.now() | ||||
rec['queue'] = 'task' | ||||
rec['client_uuid'] = client_id[0] | ||||
try: | ||||
for msg_id in msg_ids: | ||||
self.all_completed.discard(msg_id) | ||||
self.db.update_record(msg_id, rec) | ||||
except Exception: | ||||
self.log.error('db::db error upating record', exc_info=True) | ||||
reply = error.wrap_exception() | ||||
else: | ||||
# send the messages | ||||
for rec in records: | ||||
header = rec['header'] | ||||
msg = self.session.msg(header['msg_type']) | ||||
msg['content'] = rec['content'] | ||||
msg['header'] = header | ||||
msg['msg_id'] = rec['msg_id'] | ||||
self.session.send(self.resubmit, msg, buffers=rec['buffers']) | ||||
finish(dict(status='ok')) | ||||
MinRK
|
r3599 | |||
MinRK
|
r3780 | def _extract_record(self, rec): | ||
"""decompose a TaskRecord dict into subsection of reply for get_result""" | ||||
io_dict = {} | ||||
for key in 'pyin pyout pyerr stdout stderr'.split(): | ||||
io_dict[key] = rec[key] | ||||
content = { 'result_content': rec['result_content'], | ||||
'header': rec['header'], | ||||
'result_header' : rec['result_header'], | ||||
'io' : io_dict, | ||||
} | ||||
if rec['result_buffers']: | ||||
buffers = map(str, rec['result_buffers']) | ||||
else: | ||||
buffers = [] | ||||
return content, buffers | ||||
MinRK
|
r3599 | def get_results(self, client_id, msg): | ||
"""Get the result of 1 or more messages.""" | ||||
content = msg['content'] | ||||
msg_ids = sorted(set(content['msg_ids'])) | ||||
statusonly = content.get('status_only', False) | ||||
pending = [] | ||||
completed = [] | ||||
content = dict(status='ok') | ||||
content['pending'] = pending | ||||
content['completed'] = completed | ||||
buffers = [] | ||||
if not statusonly: | ||||
MinRK
|
r3780 | try: | ||
matches = self.db.find_records(dict(msg_id={'$in':msg_ids})) | ||||
# turn match list into dict, for faster lookup | ||||
records = {} | ||||
for rec in matches: | ||||
records[rec['msg_id']] = rec | ||||
except Exception: | ||||
content = error.wrap_exception() | ||||
self.session.send(self.query, "result_reply", content=content, | ||||
parent=msg, ident=client_id) | ||||
return | ||||
else: | ||||
records = {} | ||||
MinRK
|
r3599 | for msg_id in msg_ids: | ||
if msg_id in self.pending: | ||||
pending.append(msg_id) | ||||
MinRK
|
r3874 | elif msg_id in self.all_completed: | ||
MinRK
|
r3599 | completed.append(msg_id) | ||
if not statusonly: | ||||
MinRK
|
r3780 | c,bufs = self._extract_record(records[msg_id]) | ||
content[msg_id] = c | ||||
buffers.extend(bufs) | ||||
MinRK
|
r3874 | elif msg_id in records: | ||
if rec['completed']: | ||||
completed.append(msg_id) | ||||
c,bufs = self._extract_record(records[msg_id]) | ||||
content[msg_id] = c | ||||
buffers.extend(bufs) | ||||
else: | ||||
pending.append(msg_id) | ||||
MinRK
|
r3599 | else: | ||
try: | ||||
raise KeyError('No such message: '+msg_id) | ||||
except: | ||||
MinRK
|
r3644 | content = error.wrap_exception() | ||
MinRK
|
r3599 | break | ||
MinRK
|
r3657 | self.session.send(self.query, "result_reply", content=content, | ||
MinRK
|
r3599 | parent=msg, ident=client_id, | ||
buffers=buffers) | ||||
MinRK
|
r3780 | def get_history(self, client_id, msg): | ||
"""Get a list of all msg_ids in our DB records""" | ||||
try: | ||||
msg_ids = self.db.get_history() | ||||
except Exception as e: | ||||
content = error.wrap_exception() | ||||
else: | ||||
content = dict(status='ok', history=msg_ids) | ||||
self.session.send(self.query, "history_reply", content=content, | ||||
parent=msg, ident=client_id) | ||||
def db_query(self, client_id, msg): | ||||
"""Perform a raw query on the task record database.""" | ||||
content = msg['content'] | ||||
query = content.get('query', {}) | ||||
keys = content.get('keys', None) | ||||
query = util.extract_dates(query) | ||||
buffers = [] | ||||
empty = list() | ||||
try: | ||||
records = self.db.find_records(query, keys) | ||||
except Exception as e: | ||||
content = error.wrap_exception() | ||||
else: | ||||
# extract buffers from reply content: | ||||
if keys is not None: | ||||
buffer_lens = [] if 'buffers' in keys else None | ||||
result_buffer_lens = [] if 'result_buffers' in keys else None | ||||
else: | ||||
buffer_lens = [] | ||||
result_buffer_lens = [] | ||||
for rec in records: | ||||
# buffers may be None, so double check | ||||
if buffer_lens is not None: | ||||
b = rec.pop('buffers', empty) or empty | ||||
buffer_lens.append(len(b)) | ||||
buffers.extend(b) | ||||
if result_buffer_lens is not None: | ||||
rb = rec.pop('result_buffers', empty) or empty | ||||
result_buffer_lens.append(len(rb)) | ||||
buffers.extend(rb) | ||||
content = dict(status='ok', records=records, buffer_lens=buffer_lens, | ||||
result_buffer_lens=result_buffer_lens) | ||||
self.session.send(self.query, "db_reply", content=content, | ||||
parent=msg, ident=client_id, | ||||
buffers=buffers) | ||||