##// END OF EJS Templates
Replace links to launchpad bugs in comments/docstrings with equivalent github links.
Replace links to launchpad bugs in comments/docstrings with equivalent github links.

File last commit:

r3875:ffe043d4
r3917:03c097c7
Show More
hub.py
1282 lines | 48.5 KiB | text/x-python | PythonLexer
MinRK
Controller renamed to Hub (keeping ipcontrollerz)
r3599 #!/usr/bin/env python
"""The IPython Controller Hub with 0MQ
This is the master object that handles connections from engines and clients,
and monitors traffic through the various queues.
"""
#-----------------------------------------------------------------------------
# Copyright (C) 2010 The IPython Development Team
#
# Distributed under the terms of the BSD License. The full license is in
# the file COPYING, distributed as part of this software.
#-----------------------------------------------------------------------------
#-----------------------------------------------------------------------------
# Imports
#-----------------------------------------------------------------------------
from __future__ import print_function
import sys
import time
MinRK
resort imports in a cleaner order
r3631 from datetime import datetime
MinRK
Controller renamed to Hub (keeping ipcontrollerz)
r3599
import zmq
MinRK
Refactor newparallel to use Config system...
r3604 from zmq.eventloop import ioloop
from zmq.eventloop.zmqstream import ZMQStream
MinRK
Controller renamed to Hub (keeping ipcontrollerz)
r3599
# internal:
MinRK
Refactor newparallel to use Config system...
r3604 from IPython.utils.importstring import import_item
MinRK
resort imports in a cleaner order
r3631 from IPython.utils.traitlets import HasTraits, Instance, Int, CStr, Str, Dict, Set, List, Bool
MinRK
Refactor newparallel to use Config system...
r3604
MinRK
General improvements to database backend...
r3780 from IPython.parallel import error, util
MinRK
organize IPython.parallel into subpackages
r3673 from IPython.parallel.factory import RegistrationFactory, LoggingFactory
MinRK
Controller renamed to Hub (keeping ipcontrollerz)
r3599
MinRK
eliminate relative imports
r3642 from .heartmonitor import HeartMonitor
MinRK
Controller renamed to Hub (keeping ipcontrollerz)
r3599
#-----------------------------------------------------------------------------
# Code
#-----------------------------------------------------------------------------
def _passer(*args, **kwargs):
return
MinRK
propagate iopub to clients
r3602 def _printer(*args, **kwargs):
print (args)
print (kwargs)
MinRK
SGE test related fixes...
def empty_record():
    """Return a new dict holding every record key in its unset state.

    All keys map to None except 'stdout' and 'stderr', which start as
    empty strings.
    """
    unset_keys = (
        'msg_id',
        'header',
        'content',
        'buffers',
        'submitted',
        'client_uuid',
        'engine_uuid',
        'started',
        'completed',
        'resubmitted',
        'result_header',
        'result_content',
        'result_buffers',
        'queue',
        'pyin',
        'pyout',
        'pyerr',
    )
    record = dict.fromkeys(unset_keys)
    record['stdout'] = ''
    record['stderr'] = ''
    return record
MinRK
Controller renamed to Hub (keeping ipcontrollerz)
def init_record(msg):
    """Initialize a TaskRecord based on a request.

    Copies the header, content and buffers out of `msg`, parses the header's
    'date' field (format util.ISO8601) into the 'submitted' datetime, and
    leaves every other record field unset.
    """
    header = msg['header']
    msg_id = header['msg_id']
    content = msg['content']
    buffers = msg['buffers']
    submitted = datetime.strptime(header['date'], util.ISO8601)
    return dict(
        msg_id=msg_id,
        header=header,
        content=content,
        buffers=buffers,
        submitted=submitted,
        client_uuid=None,
        engine_uuid=None,
        started=None,
        completed=None,
        resubmitted=None,
        result_header=None,
        result_content=None,
        result_buffers=None,
        queue=None,
        pyin=None,
        pyout=None,
        pyerr=None,
        stdout='',
        stderr='',
    )
MinRK
improved logging + Hub,Engine,Scheduler are Configurable
class EngineConnector(HasTraits):
    """A simple object for accessing the various zmq connections of an object.
    Attributes are:
    id (int): engine ID
    queue (str): identity of queue's XREQ socket
    control (str): presumably the identity of the control socket -- TODO confirm
    registration (str): identity of registration XREQ socket
    heartbeat (str): identity of heartbeat XREQ socket
    pending (set): presumably pending msg_ids for this engine -- TODO confirm
    """
    id = Int(0)
    queue = Str()
    control = Str()
    registration = Str()
    heartbeat = Str()
    pending = Set()
class HubFactory(RegistrationFactory):
    """The Configurable for setting up a Hub.

    Holds the configurable ports, addresses and transports, and builds the
    sockets, the HeartMonitor, the DB backend and finally the Hub itself
    when construct() is called.
    """

    # name of a scheduler scheme
    scheme = Str('leastload', config=True)

    # port-pairs for monitoredqueues:
    # (index 0 is advertised to clients, index 1 to engines; see construct_hub)
    hb = Instance(list, config=True)
    def _hb_default(self):
        return util.select_random_ports(2)

    mux = Instance(list, config=True)
    def _mux_default(self):
        return util.select_random_ports(2)

    task = Instance(list, config=True)
    def _task_default(self):
        return util.select_random_ports(2)

    control = Instance(list, config=True)
    def _control_default(self):
        return util.select_random_ports(2)

    iopub = Instance(list, config=True)
    def _iopub_default(self):
        return util.select_random_ports(2)

    # single ports:
    mon_port = Instance(int, config=True)
    def _mon_port_default(self):
        return util.select_random_ports(1)[0]

    notifier_port = Instance(int, config=True)
    def _notifier_port_default(self):
        return util.select_random_ports(1)[0]

    ping = Int(1000, config=True) # ping frequency

    engine_ip = CStr('127.0.0.1', config=True)
    engine_transport = CStr('tcp', config=True)

    client_ip = CStr('127.0.0.1', config=True)
    client_transport = CStr('tcp', config=True)

    monitor_ip = CStr('127.0.0.1', config=True)
    monitor_transport = CStr('tcp', config=True)

    # derived from monitor_transport, monitor_ip and mon_port
    monitor_url = CStr('')

    db_class = CStr('IPython.parallel.controller.dictdb.DictDB', config=True)

    # not configurable
    db = Instance('IPython.parallel.controller.dictdb.BaseDB')
    heartmonitor = Instance('IPython.parallel.controller.heartmonitor.HeartMonitor')
    subconstructors = List()
    _constructed = Bool(False)

    def _ip_changed(self, name, old, new):
        """Propagate a change of `ip` to the engine, client and monitor ips."""
        self.engine_ip = new
        self.client_ip = new
        self.monitor_ip = new
        self._update_monitor_url()

    def _update_monitor_url(self):
        """Rebuild monitor_url from the monitor transport, ip and port."""
        self.monitor_url = "%s://%s:%i"%(self.monitor_transport, self.monitor_ip, self.mon_port)

    def _transport_changed(self, name, old, new):
        """Propagate a change of `transport` to engine, client and monitor."""
        self.engine_transport = new
        self.client_transport = new
        self.monitor_transport = new
        self._update_monitor_url()

    def __init__(self, **kwargs):
        super(HubFactory, self).__init__(**kwargs)
        self._update_monitor_url()
        # self.on_trait_change(self._sync_ips, 'ip')
        # self.on_trait_change(self._sync_transports, 'transport')
        self.subconstructors.append(self.construct_hub)

    def construct(self):
        """Run every registered subconstructor exactly once."""
        assert not self._constructed, "already constructed!"
        for subc in self.subconstructors:
            subc()
        self._constructed = True

    def start(self):
        """Start the heart monitor; construct() must have been called first."""
        assert self._constructed, "must be constructed by self.construct() first!"
        self.heartmonitor.start()
        self.log.info("Heartmonitor started")

    def construct_hub(self):
        """Create the Hub's sockets, heart monitor and DB, then the Hub itself.

        Sets self.heartmonitor, self.db, self.engine_info, self.client_info
        and self.hub.
        """
        # url templates; the trailing "%i" is filled in with a port later
        client_iface = "%s://%s:"%(self.client_transport, self.client_ip) + "%i"
        engine_iface = "%s://%s:"%(self.engine_transport, self.engine_ip) + "%i"

        ctx = self.context
        loop = self.loop

        # Registrar socket
        # NOTE(review): regport is not defined in this class; presumably it
        # comes from RegistrationFactory -- confirm.
        q = ZMQStream(ctx.socket(zmq.XREP), loop)
        q.bind(client_iface % self.regport)
        self.log.info("Hub listening on %s for registration."%(client_iface%self.regport))
        if self.client_ip != self.engine_ip:
            # engines reach us on a different address: listen there as well
            q.bind(engine_iface % self.regport)
            self.log.info("Hub listening on %s for registration."%(engine_iface%self.regport))

        ### Engine connections ###

        # heartbeat
        hpub = ctx.socket(zmq.PUB)
        hpub.bind(engine_iface % self.hb[0])
        hrep = ctx.socket(zmq.XREP)
        hrep.bind(engine_iface % self.hb[1])
        self.heartmonitor = HeartMonitor(loop=loop, pingstream=ZMQStream(hpub,loop), pongstream=ZMQStream(hrep,loop),
                                period=self.ping, logname=self.log.name)

        ### Client connections ###
        # Notifier socket
        n = ZMQStream(ctx.socket(zmq.PUB), loop)
        n.bind(client_iface%self.notifier_port)

        ### build and launch the queues ###

        # monitor socket
        sub = ctx.socket(zmq.SUB)
        sub.setsockopt(zmq.SUBSCRIBE, "")
        sub.bind(self.monitor_url)
        sub.bind('inproc://monitor')
        sub = ZMQStream(sub, loop)

        # connect the db
        # db_class is a dotted import path; log only its final component.
        # (split() with no argument splits on whitespace and returned the
        # whole path.)
        self.log.info('Hub using DB backend: %r'%(self.db_class.rsplit('.', 1)[-1]))
        # cdir = self.config.Global.cluster_dir
        self.db = import_item(self.db_class)(session=self.session.session, config=self.config)
        time.sleep(.25)

        # build connection dicts
        self.engine_info = {
            'control' : engine_iface%self.control[1],
            'mux': engine_iface%self.mux[1],
            'heartbeat': (engine_iface%self.hb[0], engine_iface%self.hb[1]),
            'task' : engine_iface%self.task[1],
            'iopub' : engine_iface%self.iopub[1],
            # 'monitor' : engine_iface%self.mon_port,
            }

        self.client_info = {
            'control' : client_iface%self.control[0],
            'mux': client_iface%self.mux[0],
            'task' : (self.scheme, client_iface%self.task[0]),
            'iopub' : client_iface%self.iopub[0],
            'notification': client_iface%self.notifier_port
            }
        self.log.debug("Hub engine addrs: %s"%self.engine_info)
        self.log.debug("Hub client addrs: %s"%self.client_info)

        # resubmit stream
        # connects to the client-side task url, using the session id as the
        # socket identity
        r = ZMQStream(ctx.socket(zmq.XREQ), loop)
        url = util.disambiguate_url(self.client_info['task'][-1])
        r.setsockopt(zmq.IDENTITY, self.session.session)
        r.connect(url)

        self.hub = Hub(loop=loop, session=self.session, monitor=sub, heartmonitor=self.heartmonitor,
                query=q, notifier=n, resubmit=r, db=self.db,
                engine_info=self.engine_info, client_info=self.client_info,
                logname=self.log.name)
MinRK
Refactor newparallel to use Config system...
r3604
MinRK
rework logging connections
r3610 class Hub(LoggingFactory):
MinRK
Controller renamed to Hub (keeping ipcontrollerz)
r3599 """The IPython Controller Hub with 0MQ connections
Parameters
==========
loop: zmq IOLoop instance
session: StreamSession object
<removed> context: zmq context for creating new connections (?)
queue: ZMQStream for monitoring the command queue (SUB)
MinRK
remove all PAIR sockets, Merge registration+query
r3657 query: ZMQStream for engine registration and client queries requests (XREP)
MinRK
Controller renamed to Hub (keeping ipcontrollerz)
r3599 heartbeat: HeartMonitor object checking the pulse of the engines
notifier: ZMQStream for broadcasting engine registration changes (PUB)
db: connection to db for out of memory logging of commands
NotImplemented
MinRK
newparallel tweaks, fixes...
r3622 engine_info: dict of zmq connection information for engines to connect
MinRK
Controller renamed to Hub (keeping ipcontrollerz)
r3599 to the queues.
MinRK
newparallel tweaks, fixes...
client_info: dict of zmq connection information for clients to connect
to the queues.
"""
# internal data structures:
ids = Set() # engine IDs
keytable = Dict()
by_ident = Dict()
engines = Dict()
clients = Dict()
hearts = Dict()
pending = Set()
queues = Dict() # pending msg_ids keyed by engine_id
tasks = Dict() # pending msg_ids submitted as tasks, keyed by client_id
completed = Dict() # completed msg_ids keyed by engine_id
all_completed = Set() # set of all completed msg_ids
dead_engines = Set() # presumably identities of engines that have died -- TODO confirm
unassigned = Set() # set of task msg_ids not yet assigned a destination
incoming_registrations = Dict()
registration_timeout = Int()
_idcounter = Int(0)

# objects from constructor:
loop = Instance(ioloop.IOLoop)
query = Instance(ZMQStream)
monitor = Instance(ZMQStream)
notifier = Instance(ZMQStream)
resubmit = Instance(ZMQStream)
heartmonitor = Instance(HeartMonitor)
db = Instance(object)
client_info = Dict()
engine_info = Dict()
MinRK
Controller renamed to Hub (keeping ipcontrollerz)
r3599
MinRK
improved logging + Hub,Engine,Scheduler are Configurable
def __init__(self, **kwargs):
    """
    Build the Hub from its keyword traits and wire up the stream callbacks.

    # universal:
    loop: IOLoop for creating future connections
    session: streamsession for sending serialized data
    # engine:
    monitor: ZMQStream for monitoring queue messages
    query: ZMQStream for engine+client registration and client requests
    heartmonitor: HeartMonitor object for tracking engines
    # extra:
    db: database backend object used to store the records
    engine_info: zmq address/protocol dict for engine connections
    client_info: zmq address/protocol dict for client connections
    """

    super(Hub, self).__init__(**kwargs)
    heartmonitor = self.heartmonitor
    self.registration_timeout = max(5000, 2*heartmonitor.period)

    # validate connection dicts:
    for name, addr in self.client_info.iteritems():
        # the 'task' entry is a (scheme, url) pair: only the url is validated
        util.validate_url_container(addr[1] if name == 'task' else addr)
    # util.validate_url_container(self.client_info)
    util.validate_url_container(self.engine_info)

    # register our callbacks
    self.query.on_recv(self.dispatch_query)
    self.monitor.on_recv(self.dispatch_monitor_traffic)

    heartmonitor.add_heart_failure_handler(self.handle_heart_failure)
    heartmonitor.add_new_heart_handler(self.handle_new_heart)

    # monitor topic -> handler(idents, msg)
    self.monitor_handlers = {
        'in': self.save_queue_request,
        'out': self.save_queue_result,
        'intask': self.save_task_request,
        'outtask': self.save_task_result,
        'tracktask': self.save_task_destination,
        'incontrol': _passer,
        'outcontrol': _passer,
        'iopub': self.save_iopub_message,
    }

    # query msg_type -> handler(idents, msg)
    self.query_handlers = {
        'queue_request': self.queue_status,
        'result_request': self.get_results,
        'history_request': self.get_history,
        'db_request': self.db_query,
        'purge_request': self.purge_results,
        'load_request': self.check_load,
        'resubmit_request': self.resubmit_task,
        'shutdown_request': self.shutdown_request,
        'registration_request': self.register_engine,
        'unregistration_request': self.unregister_engine,
        'connection_request': self.connection_request,
    }

    # ignore resubmit replies
    self.resubmit.on_recv(lambda msg: None, copy=False)

    self.log.info("hub::created hub")
MinRK
Controller renamed to Hub (keeping ipcontrollerz)
r3599
MinRK
improved logging + Hub,Engine,Scheduler are Configurable
r3603 @property
def _next_id(self):
MinRK
adapt kernel's ipcluster and Launchers to newparallel
r3605 """gemerate a new ID.
No longer reuse old ids, just count from 0."""
MinRK
improved logging + Hub,Engine,Scheduler are Configurable
r3603 newid = self._idcounter
self._idcounter += 1
MinRK
Controller renamed to Hub (keeping ipcontrollerz)
r3599 return newid
MinRK
improved logging + Hub,Engine,Scheduler are Configurable
r3603 # newid = 0
# incoming = [id[0] for id in self.incoming_registrations.itervalues()]
# # print newid, self.ids, self.incoming_registrations
# while newid in self.ids or newid in incoming:
# newid += 1
# return newid
MinRK
Controller renamed to Hub (keeping ipcontrollerz)
r3599
#-----------------------------------------------------------------------------
# message validation
#-----------------------------------------------------------------------------
def _validate_targets(self, targets):
"""turn any valid targets argument into a list of integer ids"""
if targets is None:
# default to all
targets = self.ids
if isinstance(targets, (int,str,unicode)):
# only one target specified
targets = [targets]
_targets = []
for t in targets:
# map raw identities to ids
if isinstance(t, (str,unicode)):
t = self.by_ident.get(t, t)
_targets.append(t)
targets = _targets
bad_targets = [ t for t in targets if t not in self.ids ]
if bad_targets:
raise IndexError("No Such Engine: %r"%bad_targets)
if not targets:
raise IndexError("No Engines Registered")
return targets
#-----------------------------------------------------------------------------
# dispatch methods (1 per stream)
#-----------------------------------------------------------------------------
MinRK
remove all PAIR sockets, Merge registration+query
r3657 # def dispatch_registration_request(self, msg):
# """"""
# self.log.debug("registration::dispatch_register_request(%s)"%msg)
# idents,msg = self.session.feed_identities(msg)
# if not idents:
# self.log.error("Bad Query Message: %s"%msg, exc_info=True)
# return
# try:
# msg = self.session.unpack_message(msg,content=True)
# except:
# self.log.error("registration::got bad registration message: %s"%msg, exc_info=True)
# return
#
# msg_type = msg['msg_type']
# content = msg['content']
#
# handler = self.query_handlers.get(msg_type, None)
# if handler is None:
# self.log.error("registration::got bad registration message: %s"%msg)
# else:
# handler(idents, msg)
MinRK
Controller renamed to Hub (keeping ipcontrollerz)
r3599
MinRK
propagate iopub to clients
def dispatch_monitor_traffic(self, msg):
    """Route a message from the monitor stream to its topic handler.

    The first frame is the topic, looked up in self.monitor_handlers; the
    remaining frames are split into identities and message by the session.
    Messages without identities, or with an unknown topic, are logged as
    errors and dropped.
    """
    self.log.debug("monitor traffic: %r"%msg[:2])
    topic = msg[0]
    idents, msg = self.session.feed_identities(msg[1:])
    if not idents:
        self.log.error("Bad Monitor Message: %r"%msg)
        return

    handler = self.monitor_handlers.get(topic, None)
    if handler is None:
        self.log.error("Invalid monitor topic: %r"%topic)
        return
    handler(idents, msg)
MinRK
Controller renamed to Hub (keeping ipcontrollerz)
r3599
MinRK
remove all PAIR sockets, Merge registration+query
def dispatch_query(self, msg):
    """Route registration requests and queries from clients.

    Splits the identities off `msg`, unpacks it, and calls the handler
    registered in self.query_handlers for its msg_type as
    handler(idents, msg).  A message that cannot be unpacked, or whose
    type has no handler, is logged and answered with a "hub_error" reply
    to the sender.
    """
    idents, msg = self.session.feed_identities(msg)
    if not idents:
        self.log.error("Bad Query Message: %r"%msg)
        return
    client_id = idents[0]
    try:
        msg = self.session.unpack_message(msg, content=True)
    except Exception:
        # `except Exception` rather than a bare except, so that
        # KeyboardInterrupt/SystemExit are not swallowed here
        content = error.wrap_exception()
        self.log.error("Bad Query Message: %r"%msg, exc_info=True)
        self.session.send(self.query, "hub_error", ident=client_id,
                content=content)
        return

    # print client_id, header, parent, content
    #switch on message type:
    msg_type = msg['msg_type']
    self.log.info("client::client %r requested %r"%(client_id, msg_type))
    handler = self.query_handlers.get(msg_type, None)
    if handler is None:
        # An explicit check instead of an `assert` statement: asserts are
        # removed under `python -O`, which would let the None handler be
        # called below.  The AssertionError is still raised and caught so
        # that wrap_exception() has an active exception to report
        # (presumably it wraps the exception currently being handled --
        # TODO confirm) and the reply stays the same as before.
        try:
            raise AssertionError("Bad Message Type: %r"%msg_type)
        except AssertionError:
            content = error.wrap_exception()
            self.log.error("Bad Message Type: %r"%msg_type, exc_info=True)
            self.session.send(self.query, "hub_error", ident=client_id,
                    content=content)
        return

    handler(idents, msg)
MinRK
Controller renamed to Hub (keeping ipcontrollerz)
r3599
def dispatch_db(self, msg):
    """Placeholder for dispatching db messages; always raises NotImplementedError."""
    raise NotImplementedError()
#---------------------------------------------------------------------------
# handler methods (1 per event)
#---------------------------------------------------------------------------
#----------------------- Heartbeat --------------------------------------
def handle_new_heart(self, heart):
    """Handler to attach to the heartbeater.

    Called when a new heart starts to beat.  If the heart belongs to an
    incoming registration, that registration is completed; otherwise the
    heart is ignored."""
    self.log.debug("heartbeat::handle_new_heart(%r)"%heart)
    if heart in self.incoming_registrations:
        self.finish_registration(heart)
        return
    self.log.info("heartbeat::ignoring new heart: %r"%heart)
def handle_heart_failure(self, heart):
    """Handler to attach to the heartbeater.

    Called when a previously registered heart fails to respond to a beat
    request.  Triggers unregistration of the engine that owns the heart;
    a heart that maps to no engine is logged and ignored."""
    self.log.debug("heartbeat::handle_heart_failure(%r)"%heart)
    eid = self.hearts.get(heart, None)
    if eid is None:
        self.log.info("heartbeat::ignoring heart failure %r"%heart)
        return
    # Look the engine up only after eid is known to be valid: doing it
    # before the None check raised KeyError for unknown hearts.
    queue = self.engines[eid].queue
    self.unregister_engine(heart, dict(content=dict(id=eid, queue=queue)))
#----------------------- MUX Queue Traffic ------------------------------
def save_queue_request(self, idents, msg):
    """Save a request submitted through the MUX queue.

    `idents` must start with (queue_id, client_id).  A record is built from
    the message and stored in the db, merged with any record that already
    exists for the same msg_id; the msg_id is then added to self.pending
    and to the target engine's queue.

    Invalid messages and unregistered targets are logged and dropped.  DB
    failures are logged and do not stop the pending/queue bookkeeping.
    """
    if len(idents) < 2:
        self.log.error("invalid identity prefix: %s"%idents)
        return
    queue_id, client_id = idents[:2]

    try:
        msg = self.session.unpack_message(msg, content=False)
    except Exception:
        self.log.error("queue::client %r sent invalid message to %r: %s"%(client_id, queue_id, msg), exc_info=True)
        return

    eid = self.by_ident.get(queue_id, None)
    if eid is None:
        self.log.error("queue::target %r not registered"%queue_id)
        self.log.debug("queue:: valid are: %s"%(self.by_ident.keys()))
        return

    header = msg['header']
    msg_id = header['msg_id']
    record = init_record(msg)
    record['engine_uuid'] = queue_id
    record['client_uuid'] = client_id
    record['queue'] = 'mux'

    try:
        # it's possible iopub arrived first:
        existing = self.db.get_record(msg_id)
        for key,evalue in existing.iteritems():
            rvalue = record.get(key, None)
            if evalue and rvalue and evalue != rvalue:
                self.log.warn("conflicting initial state for record: %r:%r <%r> %r"%(msg_id, rvalue, key, evalue))
            elif evalue and not rvalue:
                # keep what was already stored when the request has no value
                record[key] = evalue
        self.db.update_record(msg_id, record)
    except KeyError:
        # no record for this msg_id yet (presumably what get_record signals
        # with KeyError -- TODO confirm): create it
        try:
            self.db.add_record(msg_id, record)
        except Exception:
            self.log.error("DB Error adding record %r"%msg_id, exc_info=True)
    except Exception:
        # log DB failures instead of letting them escape the stream
        # callback, as save_queue_result and save_task_request already do
        self.log.error("DB Error updating record %r"%msg_id, exc_info=True)

    self.pending.add(msg_id)
    self.queues[eid].append(msg_id)
def save_queue_result(self, idents, msg):
    """Save the reply to a request that went through the MUX queue.

    `idents` must start with (client_id, queue_id).  The msg_id is taken
    from the reply's parent header; if it is pending it is moved to the
    completed collections.  The result header, content, buffers and the
    started/completed timestamps are then written to the db record.
    """
    if len(idents) < 2:
        self.log.error("invalid identity prefix: %s"%idents)
        return
    client_id, queue_id = idents[:2]

    try:
        msg = self.session.unpack_message(msg, content=False)
    except:
        self.log.error("queue::engine %r sent invalid message to %r: %s"%(
                queue_id,client_id, msg), exc_info=True)
        return

    eid = self.by_ident.get(queue_id, None)
    if eid is None:
        self.log.error("queue::unknown engine %r is sending a reply: "%queue_id)
        return

    parent = msg['parent_header']
    if not parent:
        return
    msg_id = parent['msg_id']

    if msg_id in self.pending:
        self.pending.remove(msg_id)
        self.all_completed.add(msg_id)
        self.queues[eid].remove(msg_id)
        self.completed[eid].append(msg_id)
    elif msg_id not in self.all_completed:
        # it could be a result from a dead engine that died before
        # delivering the result
        self.log.warn("queue:: unknown msg finished %s"%msg_id)
        return

    # update record anyway, because the unregistration could have been premature
    reply_header = msg['header']
    finished_at = datetime.strptime(reply_header['date'], util.ISO8601)
    began_at = reply_header.get('started', None)
    if began_at is not None:
        began_at = datetime.strptime(began_at, util.ISO8601)

    result = {
        'result_header' : reply_header,
        'result_content': msg['content'],
        'started' : began_at,
        'completed' : finished_at,
        'result_buffers' : msg['buffers'],
    }
    try:
        self.db.update_record(msg_id, result)
    except Exception:
        self.log.error("DB Error updating record %r"%msg_id, exc_info=True)
MinRK
more graceful handling of dying engines
r3651
MinRK
Controller renamed to Hub (keeping ipcontrollerz)
r3599
#--------------------- Task Queue Traffic ------------------------------
def save_task_request(self, idents, msg):
    """Save the submission of a task.

    The raw message is unpacked, an initial record is built for it, and
    the msg_id is marked as both pending and unassigned.  The record is
    then stored in the DB: added if no record exists yet for this msg_id,
    otherwise merged into the existing one (iopub traffic may have
    arrived first, or the task may be a resubmission).

    Parameters
    ----------
    idents : sequence
        Routing identities; the first element is recorded as the
        submitting client's uuid.
    msg : raw message
        Passed to ``self.session.unpack_message`` with ``content=False``.
    """
    client_id = idents[0]
    try:
        msg = self.session.unpack_message(msg, content=False)
    except Exception:
        self.log.error("task::client %r sent invalid task message: %s"%(
                client_id, msg), exc_info=True)
        return
    record = init_record(msg)

    record['client_uuid'] = client_id
    record['queue'] = 'task'
    msg_id = msg['header']['msg_id']
    self.pending.add(msg_id)
    self.unassigned.add(msg_id)
    try:
        # Only a KeyError from the lookup itself means "no such record";
        # a KeyError raised while merging must not trigger add_record.
        try:
            # it's possible iopub arrived first:
            existing = self.db.get_record(msg_id)
        except KeyError:
            existing = None

        if existing is None:
            self.db.add_record(msg_id, record)
            return

        if existing.get('resubmitted'):
            for key in ('submitted', 'client_uuid', 'buffers'):
                # don't clobber these keys on resubmit
                # submitted and client_uuid should be different
                # and buffers might be big, and shouldn't have changed
                record.pop(key, None)
            # still check content,header which should not change
            # but are not expensive to compare as buffers

        for key, evalue in existing.items():
            if key.endswith('buffers'):
                # don't compare buffers
                continue
            rvalue = record.get(key, None)
            if evalue and rvalue and evalue != rvalue:
                self.log.warn("conflicting initial state for record: %r:%r <%r> %r"%(msg_id, rvalue, key, evalue))
            elif evalue and not rvalue:
                # keep what the DB already knows when the new record is empty
                record[key] = evalue
        self.db.update_record(msg_id, record)
    except Exception:
        self.log.error("DB Error saving task request %r"%msg_id, exc_info=True)
MinRK
Controller renamed to Hub (keeping ipcontrollerz)
r3599
def save_task_result(self, idents, msg):
    """Save the result of a completed task.

    Unpacks the raw message and, if its parent msg_id is currently
    pending, moves it to the completed state and writes the result
    (headers, content, buffers, start/completion times, engine uuid)
    to the DB.  Invalid messages, messages without a parent header and
    results for tasks that are not pending are logged and ignored.

    Parameters
    ----------
    idents : sequence
        Routing identities; the first element is the client identity,
        used only in the error log.
    msg : raw message
        Passed to ``self.session.unpack_message`` with ``content=False``.
    """
    client_id = idents[0]
    try:
        msg = self.session.unpack_message(msg, content=False)
    except Exception:
        # log and drop, like the other traffic handlers, instead of
        # re-raising into the event loop
        self.log.error("task::invalid task result message send to %r: %s"%(
                client_id, msg), exc_info=True)
        return

    parent = msg['parent_header']
    if not parent:
        self.log.warn("Task %r had no parent!"%msg)
        return
    msg_id = parent['msg_id']
    if msg_id in self.unassigned:
        self.unassigned.remove(msg_id)

    header = msg['header']
    engine_uuid = header.get('engine', None)
    eid = self.by_ident.get(engine_uuid, None)

    if msg_id not in self.pending:
        self.log.debug("task::unknown task %s finished"%msg_id)
        return

    self.pending.remove(msg_id)
    self.all_completed.add(msg_id)
    if eid is not None:
        self.completed[eid].append(msg_id)
        if msg_id in self.tasks[eid]:
            self.tasks[eid].remove(msg_id)

    completed = datetime.strptime(header['date'], util.ISO8601)
    started = header.get('started', None)
    if started is not None:
        started = datetime.strptime(started, util.ISO8601)
    result = {
        'result_header' : header,
        'result_content': msg['content'],
        'started' : started,
        'completed' : completed,
        'engine_uuid': engine_uuid,
        'result_buffers': msg['buffers'],
    }
    try:
        self.db.update_record(msg_id, result)
    except Exception:
        self.log.error("DB Error saving task result %r"%msg_id, exc_info=True)
MinRK
Controller renamed to Hub (keeping ipcontrollerz)
r3599
def save_task_destination(self, idents, msg):
    """Record which engine a task has been delivered to.

    The unpacked message content supplies ``msg_id`` and ``engine_id``.
    The task is removed from the unassigned set, appended to the
    engine's task list, and the engine uuid is written to the DB record.
    A tracking message naming an engine that is not in ``self.by_ident``
    is logged and dropped.

    Parameters
    ----------
    idents : sequence
        Routing identities (unused here).
    msg : raw message
        Passed to ``self.session.unpack_message`` with ``content=True``.
    """
    try:
        msg = self.session.unpack_message(msg, content=True)
    except Exception:
        self.log.error("task::invalid task tracking message", exc_info=True)
        return
    content = msg['content']
    msg_id = content['msg_id']
    engine_uuid = content['engine_id']

    eid = self.by_ident.get(engine_uuid, None)
    if eid is None:
        # don't let an unknown engine crash the handler with a KeyError
        self.log.error("task::task %s arrived on unknown engine %r"%(msg_id, engine_uuid))
        return

    self.log.info("task::task %s arrived on %s"%(msg_id, eid))
    if msg_id in self.unassigned:
        self.unassigned.remove(msg_id)

    self.tasks[eid].append(msg_id)
    try:
        self.db.update_record(msg_id, dict(engine_uuid=engine_uuid))
    except Exception:
        self.log.error("DB Error saving task destination %r"%msg_id, exc_info=True)
MinRK
Controller renamed to Hub (keeping ipcontrollerz)
r3599
def mia_task_request(self, idents, msg):
    """Placeholder handler; not implemented.

    Raises
    ------
    NotImplementedError
        Always.
    """
    raise NotImplementedError
MinRK
propagate iopub to clients
r3602 #--------------------- IOPub Traffic ------------------------------
def save_iopub_message(self, topics, msg):
    """Save an iopub message into the db.

    The message's parent msg_id selects the DB record, which is created
    empty if it does not exist yet.  What gets stored depends on the
    message type:

    * ``stream``: the data is appended to whatever the record already
      holds under the stream name,
    * ``pyerr``: the whole content is stored,
    * ``pyin``: the submitted code is stored,
    * anything else: ``content['data']`` (or ``''``) is stored under the
      message type.

    Parameters
    ----------
    topics : unused here.
    msg : raw message
        Passed to ``self.session.unpack_message`` with ``content=True``.
    """
    try:
        msg = self.session.unpack_message(msg, content=True)
    except Exception:
        self.log.error("iopub::invalid IOPub message", exc_info=True)
        return

    parent = msg['parent_header']
    if not parent:
        self.log.error("iopub::invalid IOPub message: %s"%msg)
        return
    msg_id = parent['msg_id']
    msg_type = msg['msg_type']
    content = msg['content']

    # ensure msg_id is in db
    try:
        try:
            rec = self.db.get_record(msg_id)
        except KeyError:
            rec = empty_record()
            rec['msg_id'] = msg_id
            self.db.add_record(msg_id, rec)
    except Exception:
        self.log.error("DB Error saving iopub message %r"%msg_id, exc_info=True)
        return

    d = {}
    if msg_type == 'stream':
        name = content['name']
        # the record may hold None, or lack the key altogether
        s = rec.get(name) or ''
        d[name] = s + content['data']
    elif msg_type == 'pyerr':
        d['pyerr'] = content
    elif msg_type == 'pyin':
        d['pyin'] = content['code']
    else:
        d[msg_type] = content.get('data', '')

    try:
        self.db.update_record(msg_id, d)
    except Exception:
        self.log.error("DB Error saving iopub message %r"%msg_id, exc_info=True)
MinRK
propagate iopub to clients
r3602
MinRK
Controller renamed to Hub (keeping ipcontrollerz)
r3599
#-------------------------------------------------------------------------
# Registration requests
#-------------------------------------------------------------------------
def connection_request(self, client_id, msg):
    """Answer a client's connection request.

    The reply carries ``status='ok'``, everything in ``self.client_info``
    and an ``engines`` table mapping each engine id (as a string) to its
    uuid, leaving out engines whose uuid is in ``self.dead_engines``.
    """
    self.log.info("client::client %s connected"%client_id)

    reply = dict(status='ok')
    reply.update(self.client_info)
    # ids are stringified for the reply; dead engines are not advertised
    reply['engines'] = dict(
        (str(eid), uuid)
        for eid, uuid in self.keytable.items()
        if uuid not in self.dead_engines
    )
    self.session.send(self.query, 'connection_reply', reply, parent=msg, ident=client_id)
MinRK
Controller renamed to Hub (keeping ipcontrollerz)
r3599
def register_engine(self, reg, msg):
    """Register a new engine (first half of registration).

    A fresh engine id is taken from ``self._next_id`` and the requested
    queue/heartbeat identities are checked against engines that are
    already registered (``self.by_ident``, ``self.hearts``) and against
    registrations still in flight (``self.incoming_registrations``).
    A "registration_reply" is always sent to ``reg``: the engine info
    on success, a wrapped ``KeyError`` on conflict.

    On success the registration is parked in
    ``self.incoming_registrations``.  If the heart is already beating it
    is finished immediately; otherwise a delayed callback is started
    that purges the registration after ``self.registration_timeout``.

    Parameters
    ----------
    reg : sequence
        Identity of the registering engine; used as the reply ident, and
        its first element is stored with the pending registration.
    msg : dict
        Unpacked registration request; ``msg['content']`` must contain
        ``queue`` and may contain ``heartbeat``.

    Returns
    -------
    The engine id that was allotted, or None if no queue was specified.
    """
    content = msg['content']
    try:
        queue = content['queue']
    except KeyError:
        self.log.error("registration::queue not specified", exc_info=True)
        return
    heart = content.get('heartbeat', None)
    eid = self._next_id
    self.log.debug("registration::register_engine(%i, %r, %r, %r)"%(eid, queue, reg, heart))

    content = dict(id=eid,status='ok')
    content.update(self.engine_info)

    # check if requesting available IDs:
    conflict = None
    if queue in self.by_ident:
        conflict = "queue_id %r in use"%queue
    elif heart in self.hearts:
        conflict = "heart_id %r in use"%heart
    else:
        # also compare against registrations that have not finished yet
        for h, pack in self.incoming_registrations.items():
            if heart == h:
                conflict = "heart_id %r in use"%heart
                break
            elif queue == pack[1]:
                conflict = "queue_id %r in use"%queue
                break

    if conflict is not None:
        # wrap_exception() is called from inside an except block, as in
        # the rest of this file -- presumably it reads the active exception
        try:
            raise KeyError(conflict)
        except KeyError:
            self.log.error(conflict, exc_info=True)
            content = error.wrap_exception()

    self.session.send(self.query, "registration_reply",
            content=content,
            ident=reg)

    if content['status'] == 'ok':
        if heart in self.heartmonitor.hearts:
            # already beating
            self.incoming_registrations[heart] = (eid,queue,reg[0],None)
            self.finish_registration(heart)
        else:
            purge = lambda : self._purge_stalled_registration(heart)
            dc = ioloop.DelayedCallback(purge, self.registration_timeout, self.loop)
            dc.start()
            self.incoming_registrations[heart] = (eid,queue,reg[0],dc)
    else:
        self.log.error("registration::registration %i failed: %s"%(eid, content['evalue']))
    return eid
def unregister_engine(self, ident, msg):
    """Unregister an engine that explicitly requested to leave.

    The engine's uuid is added to ``self.dead_engines`` and a delayed
    callback is started that runs ``_handle_stranded_msgs`` for it after
    ``self.registration_timeout``.  If a notifier is configured, an
    "unregistration_notification" is sent on it.

    A request without a usable engine id, or naming an id that is not
    in ``self.keytable``, is logged and ignored.

    Parameters
    ----------
    ident : identity of the requester; used only in the error log.
    msg : dict
        Unpacked request; the engine id is read from
        ``msg['content']['id']``.
    """
    try:
        eid = msg['content']['id']
    except Exception:
        self.log.error("registration::bad engine id for unregistration: %s"%ident, exc_info=True)
        return
    self.log.info("registration::unregister_engine(%s)"%eid)

    try:
        engine_uuid = self.keytable[eid]
    except KeyError:
        # unknown id: nothing to mark dead, don't crash the handler
        self.log.error("registration::unknown engine id for unregistration: %r"%eid)
        return

    content = dict(id=eid, queue=engine_uuid)
    self.dead_engines.add(engine_uuid)

    # The engine's bookkeeping (ids, keytable, hearts, ...) is left in
    # place here; only its outstanding messages are dealt with, later.
    handleit = lambda : self._handle_stranded_msgs(eid, engine_uuid)
    dc = ioloop.DelayedCallback(handleit, self.registration_timeout, self.loop)
    dc.start()

    if self.notifier:
        self.session.send(self.notifier, "unregistration_notification", content=content)
MinRK
more graceful handling of dying engines
def _handle_stranded_msgs(self, eid, uuid):
    """Fail every message still queued on an engine that has unregistered.

    Each msg_id in ``self.queues[eid]`` is moved from pending to
    completed, and its DB record is given a wrapped ``EngineError`` as
    result content together with a fabricated result header.

    This may fire prematurely: an engine can go down after completing a
    result, so a client may be told that the result failed and later
    receive the actual result.
    """
    for msg_id in self.queues[eid]:
        self.pending.remove(msg_id)
        self.all_completed.add(msg_id)
        try:
            raise error.EngineError("Engine %r died while running task %r"%(eid, msg_id))
        except Exception:
            failure = error.wrap_exception()

        # there is no real reply, so build a fake header
        stamp = datetime.now()
        fake_header = {'engine': uuid, 'date': stamp}
        rec = {
            'result_content': failure,
            'result_header': fake_header,
            'result_buffers': [],
            'completed': stamp,
            'engine_uuid': uuid,
        }
        try:
            self.db.update_record(msg_id, rec)
        except Exception:
            self.log.error("DB Error handling stranded msg %r"%msg_id, exc_info=True)
MinRK
more graceful handling of dying engines
r3651
MinRK
Controller renamed to Hub (keeping ipcontrollerz)
def finish_registration(self, heart):
    """Complete an engine registration (second half).

    Called once our HeartMonitor has received a beat from the Engine's
    Heart.  The pending registration stored under ``heart`` is removed,
    its purge timer (if any) is stopped, and the engine is entered into
    the hub's tables.  If a notifier is configured, a
    "registration_notification" is sent on it.
    """
    try:
        eid, queue, reg, stall_timer = self.incoming_registrations.pop(heart)
    except KeyError:
        self.log.error("registration::tried to finish nonexistant registration", exc_info=True)
        return
    self.log.info("registration::finished registering engine %i:%r"%(eid,queue))

    if stall_timer is not None:
        stall_timer.stop()

    self.ids.add(eid)
    self.keytable[eid] = queue
    # the control identity is the same as the queue identity
    self.engines[eid] = EngineConnector(id=eid, queue=queue, registration=reg,
                                        control=queue, heartbeat=heart)
    self.by_ident[queue] = eid
    # every engine starts with empty queue/task/completed lists
    for table in (self.queues, self.tasks, self.completed):
        table[eid] = list()
    self.hearts[heart] = eid

    notice = dict(id=eid, queue=self.engines[eid].queue)
    if self.notifier:
        self.session.send(self.notifier, "registration_notification", content=notice)
    self.log.info("engine::Engine Connected: %i"%eid)
MinRK
Controller renamed to Hub (keeping ipcontrollerz)
r3599
def _purge_stalled_registration(self, heart):
    """Drop the pending registration stored under ``heart``, if any.

    Nothing happens when no registration is pending for that heart.
    """
    if heart not in self.incoming_registrations:
        return
    stalled = self.incoming_registrations.pop(heart)
    # the engine id is the first element of the stored tuple
    self.log.info("registration::purging stalled registration: %i"%stalled[0])
#-------------------------------------------------------------------------
# Client Requests
#-------------------------------------------------------------------------
def shutdown_request(self, client_id, msg):
    """Handle a shutdown request.

    Replies 'ok' to the requesting client, notifies other clients on the
    notifier (when one is configured), and starts a delayed callback
    that runs ``self._shutdown`` (delay argument: 1000).
    """
    self.session.send(self.query, 'shutdown_reply', content={'status': 'ok'}, ident=client_id)
    # also notify other clients of shutdown; the notifier is optional,
    # as in the (un)registration handlers
    if self.notifier:
        self.session.send(self.notifier, 'shutdown_notice', content={'status': 'ok'})
    dc = ioloop.DelayedCallback(lambda : self._shutdown(), 1000, self.loop)
    dc.start()
def _shutdown(self):
    """Log the shutdown, pause for 0.1 seconds, then exit with status 0."""
    self.log.info("hub::hub shutting down.")
    time.sleep(0.1)
    raise SystemExit(0)
def check_load(self, client_id, msg):
    """Reply with the load of one or more engines.

    The load of a target is the number of entries in its queue plus the
    number of entries in its task list.  The "load_reply" content maps
    each target (as a string) to that number, next to ``status='ok'``.
    If the targets are missing or fail validation, a "hub_error" with
    the wrapped exception is sent instead.
    """
    content = msg['content']
    try:
        targets = content['targets']
        targets = self._validate_targets(targets)
    except Exception:
        content = error.wrap_exception()
        self.session.send(self.query, "hub_error",
                content=content, ident=client_id)
        return

    content = dict(status='ok')
    for t in targets:
        # str() rather than bytes(): identical on Python 2, and still
        # produces the id's text on Python 3
        content[str(t)] = len(self.queues[t])+len(self.tasks[t])
    self.session.send(self.query, "load_reply", content=content, ident=client_id)
MinRK
Controller renamed to Hub (keeping ipcontrollerz)
r3599
def queue_status(self, client_id, msg):
    """Return the Queue status of one or more targets.

    if verbose: return the msg_ids
    else: return len of each type.

    keys: queue (pending MUX jobs)
        tasks (pending Task jobs)
        completed (finished jobs from both queues)

    The reply also carries ``unassigned``: the unassigned task ids when
    verbose, otherwise their count.  If the targets are missing or fail
    validation, a "hub_error" with the wrapped exception is sent instead.
    """
    content = msg['content']
    try:
        # looked up inside the try, as in check_load, so that a request
        # without targets gets an error reply
        targets = content['targets']
        targets = self._validate_targets(targets)
    except Exception:
        content = error.wrap_exception()
        self.session.send(self.query, "hub_error",
                content=content, ident=client_id)
        return

    verbose = content.get('verbose', False)
    content = dict(status='ok')
    for t in targets:
        queue = self.queues[t]
        completed = self.completed[t]
        tasks = self.tasks[t]
        if not verbose:
            queue = len(queue)
            completed = len(completed)
            tasks = len(tasks)
        # str() rather than bytes(): identical on Python 2, and still
        # produces the id's text on Python 3
        content[str(t)] = {'queue': queue, 'completed': completed , 'tasks': tasks}
    content['unassigned'] = list(self.unassigned) if verbose else len(self.unassigned)

    self.session.send(self.query, "queue_reply", content=content, ident=client_id)
MinRK
Controller renamed to Hub (keeping ipcontrollerz)
r3599
def purge_results(self, client_id, msg):
    """Purge results from memory and from the DB.

    ``msg['content']['msg_ids']`` is either the string ``'all'`` (drop
    every record that has a completion time) or a list of msg_ids to
    drop; asking to purge a msg_id that is still pending is an error.
    ``engine_ids`` may additionally list engines whose completed records
    are to be dropped.  A "purge_reply" is always sent: ``status='ok'``
    or the wrapped exception.
    """
    content = msg['content']
    msg_ids = content.get('msg_ids', [])
    reply = dict(status='ok')
    if msg_ids == 'all':
        try:
            self.db.drop_matching_records(dict(completed={'$ne':None}))
        except Exception:
            reply = error.wrap_exception()
    else:
        pending = [m for m in msg_ids if m in self.pending]
        if pending:
            try:
                raise IndexError("msg pending: %r"%pending[0])
            except Exception:
                reply = error.wrap_exception()
        else:
            try:
                self.db.drop_matching_records(dict(msg_id={'$in':msg_ids}))
            except Exception:
                reply = error.wrap_exception()

        if reply['status'] == 'ok':
            for eid in content.get('engine_ids', []):
                if eid not in self.engines:
                    try:
                        raise IndexError("No such engine: %i"%eid)
                    except Exception:
                        reply = error.wrap_exception()
                    break
                uid = self.engines[eid].queue
                try:
                    self.db.drop_matching_records(dict(engine_uuid=uid, completed={'$ne':None}))
                except Exception:
                    reply = error.wrap_exception()
                    break
                # Forget the engine's completed ids, but keep its key:
                # other handlers index self.completed[eid] directly.
                self.completed[eid] = []

    self.session.send(self.query, 'purge_reply', content=reply, ident=client_id)
MinRK
Controller renamed to Hub (keeping ipcontrollerz)
r3599
MinRK
add Client.resubmit for re-running tasks...
def resubmit_task(self, client_id, msg):
    """Resubmit one or more tasks.

    The records for ``msg['content']['msg_ids']`` are looked up in the
    DB.  Resubmission is refused, with a wrapped exception in the reply,
    when the DB returns more records than were requested, when some
    msg_ids have no record, or when any of the tasks is still pending.
    Otherwise the result portion of each record is reset, the msg_id is
    removed from ``self.all_completed``, and the original message is
    sent again on ``self.resubmit``.

    A "resubmit_reply" is always sent to the client: ``status='ok'``, or
    the wrapped exception if anything failed.
    """
    def finish(reply):
        self.session.send(self.query, 'resubmit_reply', content=reply, ident=client_id)

    content = msg['content']
    msg_ids = content['msg_ids']
    reply = dict(status='ok')
    try:
        records = self.db.find_records({'msg_id' : {'$in' : msg_ids}}, keys=[
            'header', 'content', 'buffers'])
    except Exception:
        self.log.error('db::db error finding tasks to resubmit', exc_info=True)
        return finish(error.wrap_exception())

    # validate msg_ids
    # NOTE(review): relies on the DB returning 'msg_id' in each record
    # even though it is not among the requested keys -- confirm.
    found_ids = [ rec['msg_id'] for rec in records ]
    invalid_ids = [ m for m in found_ids if m in self.pending ]
    if len(records) > len(msg_ids):
        try:
            raise RuntimeError("DB appears to be in an inconsistent state."
                "More matching records were found than should exist")
        except Exception:
            return finish(error.wrap_exception())
    elif len(records) < len(msg_ids):
        missing = [ m for m in msg_ids if m not in found_ids ]
        try:
            raise KeyError("No such msg(s): %s"%missing)
        except KeyError:
            return finish(error.wrap_exception())
    elif invalid_ids:
        msg_id = invalid_ids[0]
        try:
            raise ValueError("Task %r appears to be inflight"%(msg_id))
        except Exception:
            return finish(error.wrap_exception())

    # clear the existing records: an empty record minus the keys that
    # describe the original submission, which must be kept
    blank = empty_record()
    for key in ['msg_id', 'header', 'content', 'buffers', 'submitted']:
        blank.pop(key)
    blank['resubmitted'] = datetime.now()
    blank['queue'] = 'task'
    blank['client_uuid'] = client_id[0]
    try:
        for msg_id in msg_ids:
            self.all_completed.discard(msg_id)
            self.db.update_record(msg_id, blank)
    except Exception:
        self.log.error('db::db error upating record', exc_info=True)
        reply = error.wrap_exception()
    else:
        # send the messages
        for rec in records:
            header = rec['header']
            resubmission = self.session.msg(header['msg_type'])
            resubmission['content'] = rec['content']
            resubmission['header'] = header
            resubmission['msg_id'] = rec['msg_id']
            self.session.send(self.resubmit, resubmission, buffers=rec['buffers'])

    # report the real outcome: 'ok', or the wrapped DB error from above
    finish(reply)
MinRK
Controller renamed to Hub (keeping ipcontrollerz)
r3599
MinRK
General improvements to database backend...
def _extract_record(self, rec):
    """Split a TaskRecord dict into the pieces of a get_result reply.

    Returns a ``(content, buffers)`` pair: ``content`` holds the record's
    header, result header, result content and an ``io`` dict with the
    pyin/pyout/pyerr/stdout/stderr entries; ``buffers`` is the list of
    result buffers, each converted with ``str`` (empty when the record
    has none).
    """
    io_keys = ('pyin', 'pyout', 'pyerr', 'stdout', 'stderr')
    captured_io = dict((key, rec[key]) for key in io_keys)

    content = {
        'result_content': rec['result_content'],
        'header': rec['header'],
        'result_header' : rec['result_header'],
        'io' : captured_io,
    }

    raw_buffers = rec['result_buffers']
    if not raw_buffers:
        return content, []
    return content, [str(buf) for buf in raw_buffers]
MinRK
Controller renamed to Hub (keeping ipcontrollerz)
def get_results(self, client_id, msg):
    """Get the result of 1 or more messages.

    Every requested msg_id is classified as pending or completed.
    Unless ``status_only`` is set in the request, the records are
    fetched from the DB and the reply also carries, per completed
    msg_id, the extracted record content, with the result buffers sent
    alongside.  A msg_id that is neither tracked in memory nor present
    in the DB turns the whole reply into a wrapped ``KeyError``.
    """
    content = msg['content']
    msg_ids = sorted(set(content['msg_ids']))
    statusonly = content.get('status_only', False)
    pending = []
    completed = []
    content = dict(status='ok')
    content['pending'] = pending
    content['completed'] = completed
    buffers = []
    if not statusonly:
        try:
            matches = self.db.find_records(dict(msg_id={'$in':msg_ids}))
            # turn match list into dict, for faster lookup
            records = {}
            for match in matches:
                records[match['msg_id']] = match
        except Exception:
            content = error.wrap_exception()
            self.session.send(self.query, "result_reply", content=content,
                                                parent=msg, ident=client_id)
            return
    else:
        records = {}

    for msg_id in msg_ids:
        if msg_id in self.pending:
            pending.append(msg_id)
        elif msg_id in self.all_completed:
            completed.append(msg_id)
            if not statusonly:
                c,bufs = self._extract_record(records[msg_id])
                content[msg_id] = c
                buffers.extend(bufs)
        elif msg_id in records:
            # not tracked in memory: go by this message's own DB record
            rec = records[msg_id]
            if rec['completed']:
                completed.append(msg_id)
                c,bufs = self._extract_record(rec)
                content[msg_id] = c
                buffers.extend(bufs)
            else:
                pending.append(msg_id)
        else:
            try:
                raise KeyError('No such message: '+msg_id)
            except Exception:
                content = error.wrap_exception()
            break
    self.session.send(self.query, "result_reply", content=content,
                                        parent=msg, ident=client_id,
                                        buffers=buffers)
MinRK
General improvements to database backend...
def get_history(self, client_id, msg):
    """Reply with the list of all msg_ids in our DB records.

    Sends a "history_reply" whose content is ``status='ok'`` plus the
    history, or the wrapped exception if the DB lookup fails.
    """
    try:
        history = self.db.get_history()
    except Exception:
        reply = error.wrap_exception()
    else:
        reply = dict(status='ok', history=history)

    self.session.send(self.query, "history_reply", content=reply,
                                        parent=msg, ident=client_id)
def db_query(self, client_id, msg):
    """Perform a raw query on the task record database.

    ``msg['content']`` may carry ``query`` (default ``{}``) and ``keys``
    (default None).  Matching records are returned in a "db_reply".
    Their ``buffers`` / ``result_buffers`` entries are stripped out of
    the records and sent as message buffers, with ``buffer_lens`` and
    ``result_buffer_lens`` listing how many buffers each record
    contributed (None when that kind of buffer was not requested).  On
    a DB error the reply content is the wrapped exception.
    """
    request = msg['content']
    query = request.get('query', {})
    keys = request.get('keys', None)
    query = util.extract_dates(query)
    buffers = []
    try:
        records = self.db.find_records(query, keys)
    except Exception:
        content = error.wrap_exception()
    else:
        # without an explicit key list, both kinds of buffers are extracted
        want_buffers = keys is None or 'buffers' in keys
        want_result_buffers = keys is None or 'result_buffers' in keys
        buffer_lens = [] if want_buffers else None
        result_buffer_lens = [] if want_result_buffers else None

        # extract buffers from reply content:
        for rec in records:
            # buffers may be None, so double check
            if want_buffers:
                bufs = rec.pop('buffers', None) or []
                buffer_lens.append(len(bufs))
                buffers.extend(bufs)
            if want_result_buffers:
                result_bufs = rec.pop('result_buffers', None) or []
                result_buffer_lens.append(len(result_bufs))
                buffers.extend(result_bufs)
        content = dict(status='ok', records=records, buffer_lens=buffer_lens,
                                result_buffer_lens=result_buffer_lens)

    self.session.send(self.query, "db_reply", content=content,
                                        parent=msg, ident=client_id,
                                        buffers=buffers)