##// END OF EJS Templates
Merge pull request #1020 from takluyver/dollar-formatter...
Merge pull request #1020 from takluyver/dollar-formatter Dollar formatter for ! shell calls. This uses a subclass of `FullEvalFormatter` for filling in variables in shell commands like `!echo $spam`, instead of Itpl. I've retained the `{foo}` style standard formatting for complex expressions, so it can be picked up by the more powerful built in parser. `$foo` works for names and attribute access. FullEvalFormatter now always outputs unicode, even if you gave it a bytes string. This avoids problems when you mix 8-bit non-ascii strings and unicode (which e.g. `StringIO.StringIO` suffers from). Closes #822 (test was added in 31ab23f).

File last commit:

r5344:293d3eed
r5358:09c9952f merge
Show More
hub.py
1290 lines | 48.3 KiB | text/x-python | PythonLexer
MinRK
Controller renamed to Hub (keeping ipcontrollerz)
r3599 """The IPython Controller Hub with 0MQ
This is the master object that handles connections from engines and clients,
and monitors traffic through the various queues.
MinRK
update recently changed modules with Authors in docstring
r4018
Authors:
* Min RK
MinRK
Controller renamed to Hub (keeping ipcontrollerz)
r3599 """
#-----------------------------------------------------------------------------
# Copyright (C) 2010 The IPython Development Team
#
# Distributed under the terms of the BSD License. The full license is in
# the file COPYING, distributed as part of this software.
#-----------------------------------------------------------------------------
#-----------------------------------------------------------------------------
# Imports
#-----------------------------------------------------------------------------
from __future__ import print_function
import sys
import time
MinRK
resort imports in a cleaner order
r3631 from datetime import datetime
MinRK
Controller renamed to Hub (keeping ipcontrollerz)
r3599
import zmq
MinRK
Refactor newparallel to use Config system...
r3604 from zmq.eventloop import ioloop
from zmq.eventloop.zmqstream import ZMQStream
MinRK
Controller renamed to Hub (keeping ipcontrollerz)
r3599
# internal:
MinRK
Refactor newparallel to use Config system...
r3604 from IPython.utils.importstring import import_item
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 from IPython.utils.traitlets import (
MinRK
add Integer traitlet...
r5344 HasTraits, Instance, Integer, Unicode, Dict, Set, Tuple, CBytes, DottedObjectName
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 )
MinRK
Refactor newparallel to use Config system...
r3604
MinRK
General improvements to database backend...
r3780 from IPython.parallel import error, util
MinRK
reorganize Factory classes to follow relocation of Session object
r4007 from IPython.parallel.factory import RegistrationFactory
from IPython.zmq.session import SessionFactory
MinRK
Controller renamed to Hub (keeping ipcontrollerz)
r3599
MinRK
eliminate relative imports
r3642 from .heartmonitor import HeartMonitor
MinRK
Controller renamed to Hub (keeping ipcontrollerz)
r3599
#-----------------------------------------------------------------------------
# Code
#-----------------------------------------------------------------------------
def _passer(*args, **kwargs):
return
MinRK
propagate iopub to clients
r3602 def _printer(*args, **kwargs):
print (args)
print (kwargs)
MinRK
SGE test related fixes...
r3668 def empty_record():
"""Return an empty dict with all record keys."""
return {
'msg_id' : None,
'header' : None,
'content': None,
'buffers': None,
'submitted': None,
'client_uuid' : None,
'engine_uuid' : None,
'started': None,
'completed': None,
'resubmitted': None,
'result_header' : None,
'result_content' : None,
'result_buffers' : None,
'queue' : None,
'pyin' : None,
'pyout': None,
'pyerr': None,
'stdout': '',
'stderr': '',
}
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
Controller renamed to Hub (keeping ipcontrollerz)
r3599 def init_record(msg):
MinRK
improved logging + Hub,Engine,Scheduler are Configurable
r3603 """Initialize a TaskRecord based on a request."""
MinRK
handle datetime objects in Session...
r4008 header = msg['header']
MinRK
Controller renamed to Hub (keeping ipcontrollerz)
r3599 return {
'msg_id' : header['msg_id'],
'header' : header,
'content': msg['content'],
'buffers': msg['buffers'],
MinRK
merge IPython.parallel.streamsession into IPython.zmq.session...
r4006 'submitted': header['date'],
MinRK
Controller renamed to Hub (keeping ipcontrollerz)
r3599 'client_uuid' : None,
'engine_uuid' : None,
'started': None,
'completed': None,
'resubmitted': None,
'result_header' : None,
'result_content' : None,
'result_buffers' : None,
MinRK
propagate iopub to clients
r3602 'queue' : None,
'pyin' : None,
'pyout': None,
'pyerr': None,
'stdout': '',
'stderr': '',
MinRK
Controller renamed to Hub (keeping ipcontrollerz)
r3599 }
MinRK
improved logging + Hub,Engine,Scheduler are Configurable
r3603 class EngineConnector(HasTraits):
MinRK
Controller renamed to Hub (keeping ipcontrollerz)
r3599 """A simple object for accessing the various zmq connections of an object.
Attributes are:
id (int): engine ID
uuid (str): uuid (unused?)
queue (str): identity of queue's XREQ socket
registration (str): identity of registration XREQ socket
heartbeat (str): identity of heartbeat XREQ socket
"""
MinRK
add Integer traitlet...
r5344 id=Integer(0)
MinRK
use CBytes in EngineConnector
r4074 queue=CBytes()
control=CBytes()
registration=CBytes()
heartbeat=CBytes()
MinRK
Refactor newparallel to use Config system...
r3604 pending=Set()
class HubFactory(RegistrationFactory):
"""The Configurable for setting up a Hub."""
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
Refactor newparallel to use Config system...
r3604 # port-pairs for monitoredqueues:
MinRK
add Integer traitlet...
r5344 hb = Tuple(Integer,Integer,config=True,
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 help="""XREQ/SUB Port pair for Engine heartbeats""")
MinRK
Refactor newparallel to use Config system...
r3604 def _hb_default(self):
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 return tuple(util.select_random_ports(2))
MinRK
add Integer traitlet...
r5344 mux = Tuple(Integer,Integer,config=True,
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 help="""Engine/Client Port pair for MUX queue""")
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
Refactor newparallel to use Config system...
r3604 def _mux_default(self):
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 return tuple(util.select_random_ports(2))
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
add Integer traitlet...
r5344 task = Tuple(Integer,Integer,config=True,
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 help="""Engine/Client Port pair for Task queue""")
MinRK
Refactor newparallel to use Config system...
r3604 def _task_default(self):
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 return tuple(util.select_random_ports(2))
MinRK
add Integer traitlet...
r5344 control = Tuple(Integer,Integer,config=True,
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 help="""Engine/Client Port pair for Control queue""")
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
Refactor newparallel to use Config system...
r3604 def _control_default(self):
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 return tuple(util.select_random_ports(2))
MinRK
add Integer traitlet...
r5344 iopub = Tuple(Integer,Integer,config=True,
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 help="""Engine/Client Port pair for IOPub relay""")
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
Refactor newparallel to use Config system...
r3604 def _iopub_default(self):
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 return tuple(util.select_random_ports(2))
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
Refactor newparallel to use Config system...
r3604 # single ports:
MinRK
add Integer traitlet...
r5344 mon_port = Integer(config=True,
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 help="""Monitor (SUB) port for queue traffic""")
MinRK
Refactor newparallel to use Config system...
r3604 def _mon_port_default(self):
MinRK
General improvements to database backend...
r3780 return util.select_random_ports(1)[0]
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
add Integer traitlet...
r5344 notifier_port = Integer(config=True,
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 help="""PUB port for sending engine status notifications""")
MinRK
Refactor newparallel to use Config system...
r3604 def _notifier_port_default(self):
MinRK
General improvements to database backend...
r3780 return util.select_random_ports(1)[0]
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
cleanup parallel traits...
r3988 engine_ip = Unicode('127.0.0.1', config=True,
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 help="IP on which to listen for engine connections. [default: loopback]")
MinRK
cleanup parallel traits...
r3988 engine_transport = Unicode('tcp', config=True,
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 help="0MQ transport for engine connections. [default: tcp]")
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
cleanup parallel traits...
r3988 client_ip = Unicode('127.0.0.1', config=True,
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 help="IP on which to listen for client connections. [default: loopback]")
MinRK
cleanup parallel traits...
r3988 client_transport = Unicode('tcp', config=True,
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 help="0MQ transport for client connections. [default : tcp]")
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
cleanup parallel traits...
r3988 monitor_ip = Unicode('127.0.0.1', config=True,
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 help="IP on which to listen for monitor messages. [default: loopback]")
MinRK
cleanup parallel traits...
r3988 monitor_transport = Unicode('tcp', config=True,
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 help="0MQ transport for monitor messages. [default : tcp]")
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
cleanup parallel traits...
r3988 monitor_url = Unicode('')
Bernardo B. Marques
remove all trailling spaces
r4872
Thomas Kluyver
Use DottedObjectName traits in zmq and parallel modules.
r4055 db_class = DottedObjectName('IPython.parallel.controller.dictdb.DictDB',
config=True, help="""The class to use for the DB backend""")
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
Refactor newparallel to use Config system...
r3604 # not configurable
MinRK
organize IPython.parallel into subpackages
r3673 db = Instance('IPython.parallel.controller.dictdb.BaseDB')
heartmonitor = Instance('IPython.parallel.controller.heartmonitor.HeartMonitor')
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
adapt kernel's ipcluster and Launchers to newparallel
r3605 def _ip_changed(self, name, old, new):
self.engine_ip = new
self.client_ip = new
self.monitor_ip = new
self._update_monitor_url()
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
Refactor newparallel to use Config system...
r3604 def _update_monitor_url(self):
self.monitor_url = "%s://%s:%i"%(self.monitor_transport, self.monitor_ip, self.mon_port)
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
adapt kernel's ipcluster and Launchers to newparallel
r3605 def _transport_changed(self, name, old, new):
self.engine_transport = new
self.client_transport = new
self.monitor_transport = new
MinRK
Refactor newparallel to use Config system...
r3604 self._update_monitor_url()
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
Refactor newparallel to use Config system...
r3604 def __init__(self, **kwargs):
super(HubFactory, self).__init__(**kwargs)
self._update_monitor_url()
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
Refactor newparallel to use Config system...
r3604 def construct(self):
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 self.init_hub()
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
Refactor newparallel to use Config system...
r3604 def start(self):
self.heartmonitor.start()
MinRK
rework logging connections
r3610 self.log.info("Heartmonitor started")
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 def init_hub(self):
MinRK
Refactor newparallel to use Config system...
r3604 """construct"""
client_iface = "%s://%s:"%(self.client_transport, self.client_ip) + "%i"
engine_iface = "%s://%s:"%(self.engine_transport, self.engine_ip) + "%i"
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
Refactor newparallel to use Config system...
r3604 ctx = self.context
loop = self.loop
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
Refactor newparallel to use Config system...
r3604 # Registrar socket
MinRK
use ROUTER/DEALER socket names instead of XREP/XREQ...
r4725 q = ZMQStream(ctx.socket(zmq.ROUTER), loop)
MinRK
remove all PAIR sockets, Merge registration+query
r3657 q.bind(client_iface % self.regport)
MinRK
rework logging connections
r3610 self.log.info("Hub listening on %s for registration."%(client_iface%self.regport))
MinRK
Refactor newparallel to use Config system...
r3604 if self.client_ip != self.engine_ip:
MinRK
remove all PAIR sockets, Merge registration+query
r3657 q.bind(engine_iface % self.regport)
MinRK
rework logging connections
r3610 self.log.info("Hub listening on %s for registration."%(engine_iface%self.regport))
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
Refactor newparallel to use Config system...
r3604 ### Engine connections ###
# heartbeat
hpub = ctx.socket(zmq.PUB)
hpub.bind(engine_iface % self.hb[0])
MinRK
use ROUTER/DEALER socket names instead of XREP/XREQ...
r4725 hrep = ctx.socket(zmq.ROUTER)
MinRK
Refactor newparallel to use Config system...
r3604 hrep.bind(engine_iface % self.hb[1])
MinRK
reorganize Factory classes to follow relocation of Session object
r4007 self.heartmonitor = HeartMonitor(loop=loop, config=self.config, log=self.log,
pingstream=ZMQStream(hpub,loop),
pongstream=ZMQStream(hrep,loop)
)
MinRK
Refactor newparallel to use Config system...
r3604
### Client connections ###
# Notifier socket
n = ZMQStream(ctx.socket(zmq.PUB), loop)
n.bind(client_iface%self.notifier_port)
### build and launch the queues ###
# monitor socket
sub = ctx.socket(zmq.SUB)
Thomas Kluyver
Tweaks to improve automated conversion to Python 3 code.
r4110 sub.setsockopt(zmq.SUBSCRIBE, b"")
MinRK
Refactor newparallel to use Config system...
r3604 sub.bind(self.monitor_url)
MinRK
add default ip<x>z_config files
r3630 sub.bind('inproc://monitor')
MinRK
Refactor newparallel to use Config system...
r3604 sub = ZMQStream(sub, loop)
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
Refactor newparallel to use Config system...
r3604 # connect the db
MinRK
Add SQLite backend, DB backends are Configurable...
r3646 self.log.info('Hub using DB backend: %r'%(self.db_class.split()[-1]))
MinRK
remove all PAIR sockets, Merge registration+query
r3657 # cdir = self.config.Global.cluster_dir
Bernardo B. Marques
remove all trailling spaces
r4872 self.db = import_item(str(self.db_class))(session=self.session.session,
MinRK
handle datetime objects in Session...
r4008 config=self.config, log=self.log)
MinRK
Refactor newparallel to use Config system...
r3604 time.sleep(.25)
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 try:
scheme = self.config.TaskScheduler.scheme_name
except AttributeError:
from .scheduler import TaskScheduler
scheme = TaskScheduler.scheme_name.get_default_value()
MinRK
Refactor newparallel to use Config system...
r3604 # build connection dicts
MinRK
newparallel tweaks, fixes...
r3622 self.engine_info = {
MinRK
Refactor newparallel to use Config system...
r3604 'control' : engine_iface%self.control[1],
'mux': engine_iface%self.mux[1],
'heartbeat': (engine_iface%self.hb[0], engine_iface%self.hb[1]),
'task' : engine_iface%self.task[1],
'iopub' : engine_iface%self.iopub[1],
# 'monitor' : engine_iface%self.mon_port,
}
MinRK
newparallel tweaks, fixes...
r3622 self.client_info = {
MinRK
Refactor newparallel to use Config system...
r3604 'control' : client_iface%self.control[0],
'mux': client_iface%self.mux[0],
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 'task' : (scheme, client_iface%self.task[0]),
MinRK
Refactor newparallel to use Config system...
r3604 'iopub' : client_iface%self.iopub[0],
'notification': client_iface%self.notifier_port
}
MinRK
Add SQLite backend, DB backends are Configurable...
r3646 self.log.debug("Hub engine addrs: %s"%self.engine_info)
self.log.debug("Hub client addrs: %s"%self.client_info)
MinRK
add Client.resubmit for re-running tasks...
r3874
# resubmit stream
MinRK
use ROUTER/DEALER socket names instead of XREP/XREQ...
r4725 r = ZMQStream(ctx.socket(zmq.DEALER), loop)
MinRK
add Client.resubmit for re-running tasks...
r3874 url = util.disambiguate_url(self.client_info['task'][-1])
MinRK
add Session.bsession trait for session id as bytes
r4770 r.setsockopt(zmq.IDENTITY, self.session.bsession)
MinRK
add Client.resubmit for re-running tasks...
r3874 r.connect(url)
MinRK
Refactor newparallel to use Config system...
r3604 self.hub = Hub(loop=loop, session=self.session, monitor=sub, heartmonitor=self.heartmonitor,
MinRK
add Client.resubmit for re-running tasks...
r3874 query=q, notifier=n, resubmit=r, db=self.db,
MinRK
newparallel tweaks, fixes...
r3622 engine_info=self.engine_info, client_info=self.client_info,
MinRK
reorganize Factory classes to follow relocation of Session object
r4007 log=self.log)
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
Refactor newparallel to use Config system...
r3604
MinRK
reorganize Factory classes to follow relocation of Session object
r4007 class Hub(SessionFactory):
MinRK
Controller renamed to Hub (keeping ipcontrollerz)
r3599 """The IPython Controller Hub with 0MQ connections
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
Controller renamed to Hub (keeping ipcontrollerz)
r3599 Parameters
==========
loop: zmq IOLoop instance
MinRK
merge IPython.parallel.streamsession into IPython.zmq.session...
r4006 session: Session object
MinRK
Controller renamed to Hub (keeping ipcontrollerz)
r3599 <removed> context: zmq context for creating new connections (?)
queue: ZMQStream for monitoring the command queue (SUB)
MinRK
remove all PAIR sockets, Merge registration+query
r3657 query: ZMQStream for engine registration and client queries requests (XREP)
MinRK
Controller renamed to Hub (keeping ipcontrollerz)
r3599 heartbeat: HeartMonitor object checking the pulse of the engines
notifier: ZMQStream for broadcasting engine registration changes (PUB)
db: connection to db for out of memory logging of commands
NotImplemented
MinRK
newparallel tweaks, fixes...
r3622 engine_info: dict of zmq connection information for engines to connect
MinRK
Controller renamed to Hub (keeping ipcontrollerz)
r3599 to the queues.
MinRK
newparallel tweaks, fixes...
r3622 client_info: dict of zmq connection information for engines to connect
MinRK
Controller renamed to Hub (keeping ipcontrollerz)
r3599 to the queues.
"""
# internal data structures:
MinRK
Refactor newparallel to use Config system...
r3604 ids=Set() # engine IDs
keytable=Dict()
by_ident=Dict()
engines=Dict()
clients=Dict()
hearts=Dict()
pending=Set()
queues=Dict() # pending msg_ids keyed by engine_id
tasks=Dict() # pending msg_ids submitted as tasks, keyed by client_id
completed=Dict() # completed msg_ids keyed by engine_id
all_completed=Set() # completed msg_ids keyed by engine_id
MinRK
SGE test related fixes...
r3668 dead_engines=Set() # completed msg_ids keyed by engine_id
MinRK
better handle aborted/unschedulers tasks
r3687 unassigned=Set() # set of task msg_ds not yet assigned a destination
MinRK
Refactor newparallel to use Config system...
r3604 incoming_registrations=Dict()
MinRK
add Integer traitlet...
r5344 registration_timeout=Integer()
_idcounter=Integer(0)
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
improved logging + Hub,Engine,Scheduler are Configurable
r3603 # objects from constructor:
MinRK
remove all PAIR sockets, Merge registration+query
r3657 query=Instance(ZMQStream)
MinRK
Refactor newparallel to use Config system...
r3604 monitor=Instance(ZMQStream)
notifier=Instance(ZMQStream)
MinRK
add Client.resubmit for re-running tasks...
r3874 resubmit=Instance(ZMQStream)
heartmonitor=Instance(HeartMonitor)
MinRK
improved logging + Hub,Engine,Scheduler are Configurable
r3603 db=Instance(object)
MinRK
newparallel tweaks, fixes...
r3622 client_info=Dict()
engine_info=Dict()
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
improved logging + Hub,Engine,Scheduler are Configurable
r3603 def __init__(self, **kwargs):
MinRK
Controller renamed to Hub (keeping ipcontrollerz)
r3599 """
# universal:
loop: IOLoop for creating future connections
session: streamsession for sending serialized data
# engine:
queue: ZMQStream for monitoring queue messages
MinRK
remove all PAIR sockets, Merge registration+query
r3657 query: ZMQStream for engine+client registration and client requests
MinRK
Controller renamed to Hub (keeping ipcontrollerz)
r3599 heartbeat: HeartMonitor object for tracking engines
# extra:
db: ZMQStream for db connection (NotImplemented)
MinRK
newparallel tweaks, fixes...
r3622 engine_info: zmq address/protocol dict for engine connections
client_info: zmq address/protocol dict for client connections
MinRK
Controller renamed to Hub (keeping ipcontrollerz)
r3599 """
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
improved logging + Hub,Engine,Scheduler are Configurable
r3603 super(Hub, self).__init__(**kwargs)
self.registration_timeout = max(5000, 2*self.heartmonitor.period)
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
Controller renamed to Hub (keeping ipcontrollerz)
r3599 # validate connection dicts:
MinRK
newparallel tweaks, fixes...
r3622 for k,v in self.client_info.iteritems():
if k == 'task':
MinRK
General improvements to database backend...
r3780 util.validate_url_container(v[1])
MinRK
newparallel tweaks, fixes...
r3622 else:
MinRK
General improvements to database backend...
r3780 util.validate_url_container(v)
# util.validate_url_container(self.client_info)
util.validate_url_container(self.engine_info)
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
Controller renamed to Hub (keeping ipcontrollerz)
r3599 # register our callbacks
MinRK
remove all PAIR sockets, Merge registration+query
r3657 self.query.on_recv(self.dispatch_query)
MinRK
improved logging + Hub,Engine,Scheduler are Configurable
r3603 self.monitor.on_recv(self.dispatch_monitor_traffic)
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
improved logging + Hub,Engine,Scheduler are Configurable
r3603 self.heartmonitor.add_heart_failure_handler(self.handle_heart_failure)
self.heartmonitor.add_new_heart_handler(self.handle_new_heart)
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
update parallel code for py3k...
r4155 self.monitor_handlers = {b'in' : self.save_queue_request,
b'out': self.save_queue_result,
b'intask': self.save_task_request,
b'outtask': self.save_task_result,
b'tracktask': self.save_task_destination,
b'incontrol': _passer,
b'outcontrol': _passer,
b'iopub': self.save_iopub_message,
MinRK
Controller renamed to Hub (keeping ipcontrollerz)
r3599 }
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
remove all PAIR sockets, Merge registration+query
r3657 self.query_handlers = {'queue_request': self.queue_status,
MinRK
Controller renamed to Hub (keeping ipcontrollerz)
r3599 'result_request': self.get_results,
MinRK
General improvements to database backend...
r3780 'history_request': self.get_history,
'db_request': self.db_query,
MinRK
Controller renamed to Hub (keeping ipcontrollerz)
r3599 'purge_request': self.purge_results,
'load_request': self.check_load,
'resubmit_request': self.resubmit_task,
'shutdown_request': self.shutdown_request,
MinRK
remove all PAIR sockets, Merge registration+query
r3657 'registration_request' : self.register_engine,
MinRK
Controller renamed to Hub (keeping ipcontrollerz)
r3599 'unregistration_request' : self.unregister_engine,
'connection_request': self.connection_request,
}
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
add Client.resubmit for re-running tasks...
r3874 # ignore resubmit replies
self.resubmit.on_recv(lambda msg: None, copy=False)
MinRK
rework logging connections
r3610 self.log.info("hub::created hub")
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
improved logging + Hub,Engine,Scheduler are Configurable
r3603 @property
def _next_id(self):
MinRK
adapt kernel's ipcluster and Launchers to newparallel
r3605 """gemerate a new ID.
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
adapt kernel's ipcluster and Launchers to newparallel
r3605 No longer reuse old ids, just count from 0."""
MinRK
improved logging + Hub,Engine,Scheduler are Configurable
r3603 newid = self._idcounter
self._idcounter += 1
MinRK
Controller renamed to Hub (keeping ipcontrollerz)
r3599 return newid
MinRK
improved logging + Hub,Engine,Scheduler are Configurable
r3603 # newid = 0
# incoming = [id[0] for id in self.incoming_registrations.itervalues()]
# # print newid, self.ids, self.incoming_registrations
# while newid in self.ids or newid in incoming:
# newid += 1
# return newid
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
Controller renamed to Hub (keeping ipcontrollerz)
r3599 #-----------------------------------------------------------------------------
# message validation
#-----------------------------------------------------------------------------
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
Controller renamed to Hub (keeping ipcontrollerz)
r3599 def _validate_targets(self, targets):
"""turn any valid targets argument into a list of integer ids"""
if targets is None:
# default to all
targets = self.ids
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
Controller renamed to Hub (keeping ipcontrollerz)
r3599 if isinstance(targets, (int,str,unicode)):
# only one target specified
targets = [targets]
_targets = []
for t in targets:
# map raw identities to ids
if isinstance(t, (str,unicode)):
t = self.by_ident.get(t, t)
_targets.append(t)
targets = _targets
bad_targets = [ t for t in targets if t not in self.ids ]
if bad_targets:
raise IndexError("No Such Engine: %r"%bad_targets)
if not targets:
raise IndexError("No Engines Registered")
return targets
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
Controller renamed to Hub (keeping ipcontrollerz)
r3599 #-----------------------------------------------------------------------------
# dispatch methods (1 per stream)
#-----------------------------------------------------------------------------
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
propagate iopub to clients
r3602 def dispatch_monitor_traffic(self, msg):
"""all ME and Task queue messages come through here, as well as
IOPub traffic."""
MinRK
add Client.resubmit for re-running tasks...
r3874 self.log.debug("monitor traffic: %r"%msg[:2])
MinRK
Controller renamed to Hub (keeping ipcontrollerz)
r3599 switch = msg[0]
MinRK
cleanup Hub/Scheduler to prevent '%s'%<nonascii> errors
r3996 try:
idents, msg = self.session.feed_identities(msg[1:])
except ValueError:
idents=[]
MinRK
Controller renamed to Hub (keeping ipcontrollerz)
r3599 if not idents:
MinRK
add Client.resubmit for re-running tasks...
r3874 self.log.error("Bad Monitor Message: %r"%msg)
MinRK
Controller renamed to Hub (keeping ipcontrollerz)
r3599 return
MinRK
propagate iopub to clients
r3602 handler = self.monitor_handlers.get(switch, None)
MinRK
Controller renamed to Hub (keeping ipcontrollerz)
r3599 if handler is not None:
handler(idents, msg)
else:
MinRK
add Client.resubmit for re-running tasks...
r3874 self.log.error("Invalid monitor topic: %r"%switch)
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
remove all PAIR sockets, Merge registration+query
r3657 def dispatch_query(self, msg):
"""Route registration requests and queries from clients."""
MinRK
use HMAC digest to sign messages instead of cleartext key...
r4000 try:
idents, msg = self.session.feed_identities(msg)
except ValueError:
idents = []
MinRK
Controller renamed to Hub (keeping ipcontrollerz)
r3599 if not idents:
MinRK
add Client.resubmit for re-running tasks...
r3874 self.log.error("Bad Query Message: %r"%msg)
MinRK
Controller renamed to Hub (keeping ipcontrollerz)
r3599 return
client_id = idents[0]
try:
Brian E. Granger
Renaming unpack_message to unserialize and updating docstrings.
r4231 msg = self.session.unserialize(msg, content=True)
MinRK
use HMAC digest to sign messages instead of cleartext key...
r4000 except Exception:
MinRK
cleanup pass
r3644 content = error.wrap_exception()
MinRK
add Client.resubmit for re-running tasks...
r3874 self.log.error("Bad Query Message: %r"%msg, exc_info=True)
Bernardo B. Marques
remove all trailling spaces
r4872 self.session.send(self.query, "hub_error", ident=client_id,
MinRK
Controller renamed to Hub (keeping ipcontrollerz)
r3599 content=content)
return
# print client_id, header, parent, content
#switch on message type:
Brian E. Granger
Fixing code to assume msg_type and msg_id are top-level....
r4230 msg_type = msg['header']['msg_type']
MinRK
add Client.resubmit for re-running tasks...
r3874 self.log.info("client::client %r requested %r"%(client_id, msg_type))
MinRK
remove all PAIR sockets, Merge registration+query
r3657 handler = self.query_handlers.get(msg_type, None)
MinRK
Controller renamed to Hub (keeping ipcontrollerz)
r3599 try:
MinRK
add Client.resubmit for re-running tasks...
r3874 assert handler is not None, "Bad Message Type: %r"%msg_type
MinRK
Controller renamed to Hub (keeping ipcontrollerz)
r3599 except:
MinRK
cleanup pass
r3644 content = error.wrap_exception()
MinRK
add Client.resubmit for re-running tasks...
r3874 self.log.error("Bad Message Type: %r"%msg_type, exc_info=True)
Bernardo B. Marques
remove all trailling spaces
r4872 self.session.send(self.query, "hub_error", ident=client_id,
MinRK
Controller renamed to Hub (keeping ipcontrollerz)
r3599 content=content)
return
MinRK
add Client.resubmit for re-running tasks...
r3874
MinRK
Controller renamed to Hub (keeping ipcontrollerz)
r3599 else:
MinRK
remove all PAIR sockets, Merge registration+query
r3657 handler(idents, msg)
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
Controller renamed to Hub (keeping ipcontrollerz)
r3599 def dispatch_db(self, msg):
""""""
raise NotImplementedError
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
Controller renamed to Hub (keeping ipcontrollerz)
r3599 #---------------------------------------------------------------------------
# handler methods (1 per event)
#---------------------------------------------------------------------------
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
Controller renamed to Hub (keeping ipcontrollerz)
r3599 #----------------------- Heartbeat --------------------------------------
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
Controller renamed to Hub (keeping ipcontrollerz)
r3599 def handle_new_heart(self, heart):
"""handler to attach to heartbeater.
Called when a new heart starts to beat.
Triggers completion of registration."""
MinRK
rework logging connections
r3610 self.log.debug("heartbeat::handle_new_heart(%r)"%heart)
MinRK
Controller renamed to Hub (keeping ipcontrollerz)
r3599 if heart not in self.incoming_registrations:
MinRK
rework logging connections
r3610 self.log.info("heartbeat::ignoring new heart: %r"%heart)
MinRK
Controller renamed to Hub (keeping ipcontrollerz)
r3599 else:
self.finish_registration(heart)
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
Controller renamed to Hub (keeping ipcontrollerz)
r3599 def handle_heart_failure(self, heart):
"""handler to attach to heartbeater.
called when a previously registered heart fails to respond to beat request.
triggers unregistration"""
MinRK
rework logging connections
r3610 self.log.debug("heartbeat::handle_heart_failure(%r)"%heart)
MinRK
Controller renamed to Hub (keeping ipcontrollerz)
r3599 eid = self.hearts.get(heart, None)
queue = self.engines[eid].queue
if eid is None:
MinRK
rework logging connections
r3610 self.log.info("heartbeat::ignoring heart failure %r"%heart)
MinRK
Controller renamed to Hub (keeping ipcontrollerz)
r3599 else:
self.unregister_engine(heart, dict(content=dict(id=eid, queue=queue)))
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
Controller renamed to Hub (keeping ipcontrollerz)
r3599 #----------------------- MUX Queue Traffic ------------------------------
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
Controller renamed to Hub (keeping ipcontrollerz)
r3599 def save_queue_request(self, idents, msg):
if len(idents) < 2:
MinRK
cleanup Hub/Scheduler to prevent '%s'%<nonascii> errors
r3996 self.log.error("invalid identity prefix: %r"%idents)
MinRK
Controller renamed to Hub (keeping ipcontrollerz)
r3599 return
queue_id, client_id = idents[:2]
try:
Brian E. Granger
Renaming unpack_message to unserialize and updating docstrings.
r4231 msg = self.session.unserialize(msg)
MinRK
cleanup Hub/Scheduler to prevent '%s'%<nonascii> errors
r3996 except Exception:
self.log.error("queue::client %r sent invalid message to %r: %r"%(client_id, queue_id, msg), exc_info=True)
MinRK
Controller renamed to Hub (keeping ipcontrollerz)
r3599 return
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
Controller renamed to Hub (keeping ipcontrollerz)
r3599 eid = self.by_ident.get(queue_id, None)
if eid is None:
MinRK
rework logging connections
r3610 self.log.error("queue::target %r not registered"%queue_id)
MinRK
cleanup Hub/Scheduler to prevent '%s'%<nonascii> errors
r3996 self.log.debug("queue:: valid are: %r"%(self.by_ident.keys()))
MinRK
Controller renamed to Hub (keeping ipcontrollerz)
r3599 return
record = init_record(msg)
MinRK
handle datetime objects in Session...
r4008 msg_id = record['msg_id']
MinRK
update parallel code for py3k...
r4155 # Unicode in records
MinRK
enforce ascii identities in parallel code...
r4160 record['engine_uuid'] = queue_id.decode('ascii')
record['client_uuid'] = client_id.decode('ascii')
MinRK
Controller renamed to Hub (keeping ipcontrollerz)
r3599 record['queue'] = 'mux'
MinRK
Add SQLite backend, DB backends are Configurable...
r3646
MinRK
SGE test related fixes...
r3668 try:
# it's posible iopub arrived first:
existing = self.db.get_record(msg_id)
for key,evalue in existing.iteritems():
MinRK
add Client.resubmit for re-running tasks...
r3874 rvalue = record.get(key, None)
MinRK
SGE test related fixes...
r3668 if evalue and rvalue and evalue != rvalue:
MinRK
add Client.resubmit for re-running tasks...
r3874 self.log.warn("conflicting initial state for record: %r:%r <%r> %r"%(msg_id, rvalue, key, evalue))
MinRK
SGE test related fixes...
r3668 elif evalue and not rvalue:
record[key] = evalue
MinRK
prevent few remaining db requests from crashing Hub...
r4014 try:
self.db.update_record(msg_id, record)
except Exception:
self.log.error("DB Error updating record %r"%msg_id, exc_info=True)
MinRK
SGE test related fixes...
r3668 except KeyError:
MinRK
prevent few remaining db requests from crashing Hub...
r4014 try:
self.db.add_record(msg_id, record)
except Exception:
self.log.error("DB Error adding record %r"%msg_id, exc_info=True)
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
Controller renamed to Hub (keeping ipcontrollerz)
r3599 self.pending.add(msg_id)
self.queues[eid].append(msg_id)
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
Controller renamed to Hub (keeping ipcontrollerz)
r3599 def save_queue_result(self, idents, msg):
if len(idents) < 2:
MinRK
cleanup Hub/Scheduler to prevent '%s'%<nonascii> errors
r3996 self.log.error("invalid identity prefix: %r"%idents)
MinRK
Controller renamed to Hub (keeping ipcontrollerz)
r3599 return
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
Controller renamed to Hub (keeping ipcontrollerz)
r3599 client_id, queue_id = idents[:2]
try:
Brian E. Granger
Renaming unpack_message to unserialize and updating docstrings.
r4231 msg = self.session.unserialize(msg)
MinRK
cleanup Hub/Scheduler to prevent '%s'%<nonascii> errors
r3996 except Exception:
self.log.error("queue::engine %r sent invalid message to %r: %r"%(
MinRK
Controller renamed to Hub (keeping ipcontrollerz)
r3599 queue_id,client_id, msg), exc_info=True)
return
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
Controller renamed to Hub (keeping ipcontrollerz)
r3599 eid = self.by_ident.get(queue_id, None)
if eid is None:
MinRK
rework logging connections
r3610 self.log.error("queue::unknown engine %r is sending a reply: "%queue_id)
MinRK
Controller renamed to Hub (keeping ipcontrollerz)
r3599 return
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
Controller renamed to Hub (keeping ipcontrollerz)
r3599 parent = msg['parent_header']
if not parent:
return
msg_id = parent['msg_id']
if msg_id in self.pending:
self.pending.remove(msg_id)
self.all_completed.add(msg_id)
self.queues[eid].remove(msg_id)
self.completed[eid].append(msg_id)
MinRK
more graceful handling of dying engines
r3651 elif msg_id not in self.all_completed:
# it could be a result from a dead engine that died before delivering the
# result
MinRK
cleanup Hub/Scheduler to prevent '%s'%<nonascii> errors
r3996 self.log.warn("queue:: unknown msg finished %r"%msg_id)
MinRK
more graceful handling of dying engines
r3651 return
# update record anyway, because the unregistration could have been premature
MinRK
handle datetime objects in Session...
r4008 rheader = msg['header']
MinRK
merge IPython.parallel.streamsession into IPython.zmq.session...
r4006 completed = rheader['date']
MinRK
more graceful handling of dying engines
r3651 started = rheader.get('started', None)
result = {
'result_header' : rheader,
'result_content': msg['content'],
'started' : started,
'completed' : completed
}
MinRK
Add SQLite backend, DB backends are Configurable...
r3646
MinRK
more graceful handling of dying engines
r3651 result['result_buffers'] = msg['buffers']
MinRK
General improvements to database backend...
r3780 try:
self.db.update_record(msg_id, result)
except Exception:
self.log.error("DB Error updating record %r"%msg_id, exc_info=True)
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
Controller renamed to Hub (keeping ipcontrollerz)
r3599 #--------------------- Task Queue Traffic ------------------------------
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
Controller renamed to Hub (keeping ipcontrollerz)
r3599 def save_task_request(self, idents, msg):
"""Save the submission of a task."""
client_id = idents[0]
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
Controller renamed to Hub (keeping ipcontrollerz)
r3599 try:
Brian E. Granger
Renaming unpack_message to unserialize and updating docstrings.
r4231 msg = self.session.unserialize(msg)
MinRK
cleanup Hub/Scheduler to prevent '%s'%<nonascii> errors
r3996 except Exception:
self.log.error("task::client %r sent invalid task message: %r"%(
MinRK
Controller renamed to Hub (keeping ipcontrollerz)
r3599 client_id, msg), exc_info=True)
return
record = init_record(msg)
MinRK
Add SQLite backend, DB backends are Configurable...
r3646
MinRK
Controller renamed to Hub (keeping ipcontrollerz)
r3599 record['client_uuid'] = client_id
record['queue'] = 'task'
header = msg['header']
msg_id = header['msg_id']
self.pending.add(msg_id)
MinRK
better handle aborted/unschedulers tasks
r3687 self.unassigned.add(msg_id)
MinRK
SGE test related fixes...
r3668 try:
# it's posible iopub arrived first:
existing = self.db.get_record(msg_id)
MinRK
add Client.resubmit for re-running tasks...
r3874 if existing['resubmitted']:
for key in ('submitted', 'client_uuid', 'buffers'):
# don't clobber these keys on resubmit
# submitted and client_uuid should be different
# and buffers might be big, and shouldn't have changed
record.pop(key)
# still check content,header which should not change
# but are not expensive to compare as buffers
MinRK
SGE test related fixes...
r3668 for key,evalue in existing.iteritems():
MinRK
add Client.resubmit for re-running tasks...
r3874 if key.endswith('buffers'):
# don't compare buffers
continue
rvalue = record.get(key, None)
MinRK
SGE test related fixes...
r3668 if evalue and rvalue and evalue != rvalue:
MinRK
add Client.resubmit for re-running tasks...
r3874 self.log.warn("conflicting initial state for record: %r:%r <%r> %r"%(msg_id, rvalue, key, evalue))
MinRK
SGE test related fixes...
r3668 elif evalue and not rvalue:
record[key] = evalue
MinRK
prevent few remaining db requests from crashing Hub...
r4014 try:
self.db.update_record(msg_id, record)
except Exception:
self.log.error("DB Error updating record %r"%msg_id, exc_info=True)
MinRK
SGE test related fixes...
r3668 except KeyError:
MinRK
prevent few remaining db requests from crashing Hub...
r4014 try:
self.db.add_record(msg_id, record)
except Exception:
self.log.error("DB Error adding record %r"%msg_id, exc_info=True)
MinRK
General improvements to database backend...
r3780 except Exception:
self.log.error("DB Error saving task request %r"%msg_id, exc_info=True)
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
Controller renamed to Hub (keeping ipcontrollerz)
r3599 def save_task_result(self, idents, msg):
"""save the result of a completed task."""
client_id = idents[0]
try:
Brian E. Granger
Renaming unpack_message to unserialize and updating docstrings.
r4231 msg = self.session.unserialize(msg)
MinRK
cleanup Hub/Scheduler to prevent '%s'%<nonascii> errors
r3996 except Exception:
self.log.error("task::invalid task result message send to %r: %r"%(
MinRK
propagate iopub to clients
r3602 client_id, msg), exc_info=True)
MinRK
Controller renamed to Hub (keeping ipcontrollerz)
r3599 return
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
Controller renamed to Hub (keeping ipcontrollerz)
r3599 parent = msg['parent_header']
if not parent:
# print msg
MinRK
rework logging connections
r3610 self.log.warn("Task %r had no parent!"%msg)
MinRK
Controller renamed to Hub (keeping ipcontrollerz)
r3599 return
msg_id = parent['msg_id']
MinRK
better handle aborted/unschedulers tasks
r3687 if msg_id in self.unassigned:
self.unassigned.remove(msg_id)
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
handle datetime objects in Session...
r4008 header = msg['header']
MinRK
Controller renamed to Hub (keeping ipcontrollerz)
r3599 engine_uuid = header.get('engine', None)
eid = self.by_ident.get(engine_uuid, None)
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
Controller renamed to Hub (keeping ipcontrollerz)
r3599 if msg_id in self.pending:
self.pending.remove(msg_id)
self.all_completed.add(msg_id)
if eid is not None:
self.completed[eid].append(msg_id)
if msg_id in self.tasks[eid]:
self.tasks[eid].remove(msg_id)
MinRK
merge IPython.parallel.streamsession into IPython.zmq.session...
r4006 completed = header['date']
MinRK
Controller renamed to Hub (keeping ipcontrollerz)
r3599 started = header.get('started', None)
result = {
'result_header' : header,
'result_content': msg['content'],
'started' : started,
'completed' : completed,
'engine_uuid': engine_uuid
}
MinRK
Add SQLite backend, DB backends are Configurable...
r3646
result['result_buffers'] = msg['buffers']
MinRK
General improvements to database backend...
r3780 try:
self.db.update_record(msg_id, result)
except Exception:
self.log.error("DB Error saving task request %r"%msg_id, exc_info=True)
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
Controller renamed to Hub (keeping ipcontrollerz)
r3599 else:
MinRK
cleanup Hub/Scheduler to prevent '%s'%<nonascii> errors
r3996 self.log.debug("task::unknown task %r finished"%msg_id)
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
Controller renamed to Hub (keeping ipcontrollerz)
r3599 def save_task_destination(self, idents, msg):
try:
Brian E. Granger
Renaming unpack_message to unserialize and updating docstrings.
r4231 msg = self.session.unserialize(msg, content=True)
MinRK
cleanup Hub/Scheduler to prevent '%s'%<nonascii> errors
r3996 except Exception:
MinRK
rework logging connections
r3610 self.log.error("task::invalid task tracking message", exc_info=True)
MinRK
Controller renamed to Hub (keeping ipcontrollerz)
r3599 return
content = msg['content']
MinRK
testing fixes
r3641 # print (content)
MinRK
Controller renamed to Hub (keeping ipcontrollerz)
r3599 msg_id = content['msg_id']
engine_uuid = content['engine_id']
MinRK
cleanup per review...
r4161 eid = self.by_ident[util.asbytes(engine_uuid)]
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
cleanup Hub/Scheduler to prevent '%s'%<nonascii> errors
r3996 self.log.info("task::task %r arrived on %r"%(msg_id, eid))
MinRK
better handle aborted/unschedulers tasks
r3687 if msg_id in self.unassigned:
self.unassigned.remove(msg_id)
MinRK
Controller renamed to Hub (keeping ipcontrollerz)
r3599 # else:
MinRK
cleanup Hub/Scheduler to prevent '%s'%<nonascii> errors
r3996 # self.log.debug("task::task %r not listed as MIA?!"%(msg_id))
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
Controller renamed to Hub (keeping ipcontrollerz)
r3599 self.tasks[eid].append(msg_id)
# self.pending[msg_id][1].update(received=datetime.now(),engine=(eid,engine_uuid))
MinRK
General improvements to database backend...
r3780 try:
self.db.update_record(msg_id, dict(engine_uuid=engine_uuid))
except Exception:
self.log.error("DB Error saving task destination %r"%msg_id, exc_info=True)
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
Controller renamed to Hub (keeping ipcontrollerz)
r3599 def mia_task_request(self, idents, msg):
raise NotImplementedError
client_id = idents[0]
# content = dict(mia=self.mia,status='ok')
# self.session.send('mia_reply', content=content, idents=client_id)
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
propagate iopub to clients
r3602 #--------------------- IOPub Traffic ------------------------------
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
propagate iopub to clients
r3602 def save_iopub_message(self, topics, msg):
"""save an iopub message into the db"""
MinRK
testing fixes
r3641 # print (topics)
MinRK
propagate iopub to clients
r3602 try:
Brian E. Granger
Renaming unpack_message to unserialize and updating docstrings.
r4231 msg = self.session.unserialize(msg, content=True)
MinRK
cleanup Hub/Scheduler to prevent '%s'%<nonascii> errors
r3996 except Exception:
MinRK
rework logging connections
r3610 self.log.error("iopub::invalid IOPub message", exc_info=True)
MinRK
propagate iopub to clients
r3602 return
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
propagate iopub to clients
r3602 parent = msg['parent_header']
MinRK
improved logging + Hub,Engine,Scheduler are Configurable
r3603 if not parent:
MinRK
cleanup Hub/Scheduler to prevent '%s'%<nonascii> errors
r3996 self.log.error("iopub::invalid IOPub message: %r"%msg)
MinRK
improved logging + Hub,Engine,Scheduler are Configurable
r3603 return
MinRK
propagate iopub to clients
r3602 msg_id = parent['msg_id']
Brian E. Granger
Fixing code to assume msg_type and msg_id are top-level....
r4230 msg_type = msg['header']['msg_type']
MinRK
propagate iopub to clients
r3602 content = msg['content']
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
propagate iopub to clients
r3602 # ensure msg_id is in db
try:
rec = self.db.get_record(msg_id)
MinRK
SGE test related fixes...
r3668 except KeyError:
rec = empty_record()
rec['msg_id'] = msg_id
self.db.add_record(msg_id, rec)
MinRK
propagate iopub to clients
r3602 # stream
d = {}
if msg_type == 'stream':
name = content['name']
s = rec[name] or ''
d[name] = s + content['data']
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
propagate iopub to clients
r3602 elif msg_type == 'pyerr':
d['pyerr'] = content
MinRK
add '-s' for startup script in ipengine...
r3684 elif msg_type == 'pyin':
d['pyin'] = content['code']
MinRK
propagate iopub to clients
r3602 else:
MinRK
add '-s' for startup script in ipengine...
r3684 d[msg_type] = content.get('data', '')
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
General improvements to database backend...
r3780 try:
self.db.update_record(msg_id, d)
except Exception:
self.log.error("DB Error saving iopub message %r"%msg_id, exc_info=True)
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
Controller renamed to Hub (keeping ipcontrollerz)
r3599 #-------------------------------------------------------------------------
# Registration requests
#-------------------------------------------------------------------------
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
Controller renamed to Hub (keeping ipcontrollerz)
r3599 def connection_request(self, client_id, msg):
"""Reply with connection addresses for clients."""
MinRK
cleanup Hub/Scheduler to prevent '%s'%<nonascii> errors
r3996 self.log.info("client::client %r connected"%client_id)
MinRK
Controller renamed to Hub (keeping ipcontrollerz)
r3599 content = dict(status='ok')
MinRK
newparallel tweaks, fixes...
r3622 content.update(self.client_info)
MinRK
Controller renamed to Hub (keeping ipcontrollerz)
r3599 jsonable = {}
for k,v in self.keytable.iteritems():
MinRK
SGE test related fixes...
r3668 if v not in self.dead_engines:
MinRK
enforce ascii identities in parallel code...
r4160 jsonable[str(k)] = v.decode('ascii')
MinRK
Controller renamed to Hub (keeping ipcontrollerz)
r3599 content['engines'] = jsonable
MinRK
remove all PAIR sockets, Merge registration+query
r3657 self.session.send(self.query, 'connection_reply', content, parent=msg, ident=client_id)
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
Controller renamed to Hub (keeping ipcontrollerz)
r3599 def register_engine(self, reg, msg):
"""Register a new engine."""
content = msg['content']
try:
MinRK
cleanup per review...
r4161 queue = util.asbytes(content['queue'])
MinRK
Controller renamed to Hub (keeping ipcontrollerz)
r3599 except KeyError:
MinRK
rework logging connections
r3610 self.log.error("registration::queue not specified", exc_info=True)
MinRK
Controller renamed to Hub (keeping ipcontrollerz)
r3599 return
heart = content.get('heartbeat', None)
MinRK
update parallel code for py3k...
r4155 if heart:
MinRK
cleanup per review...
r4161 heart = util.asbytes(heart)
MinRK
Controller renamed to Hub (keeping ipcontrollerz)
r3599 """register a new engine, and create the socket(s) necessary"""
MinRK
improved logging + Hub,Engine,Scheduler are Configurable
r3603 eid = self._next_id
MinRK
Controller renamed to Hub (keeping ipcontrollerz)
r3599 # print (eid, queue, reg, heart)
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
rework logging connections
r3610 self.log.debug("registration::register_engine(%i, %r, %r, %r)"%(eid, queue, reg, heart))
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
Controller renamed to Hub (keeping ipcontrollerz)
r3599 content = dict(id=eid,status='ok')
MinRK
newparallel tweaks, fixes...
r3622 content.update(self.engine_info)
MinRK
Controller renamed to Hub (keeping ipcontrollerz)
r3599 # check if requesting available IDs:
if queue in self.by_ident:
try:
raise KeyError("queue_id %r in use"%queue)
except:
MinRK
cleanup pass
r3644 content = error.wrap_exception()
MinRK
rework logging connections
r3610 self.log.error("queue_id %r in use"%queue, exc_info=True)
MinRK
Controller renamed to Hub (keeping ipcontrollerz)
r3599 elif heart in self.hearts: # need to check unique hearts?
try:
raise KeyError("heart_id %r in use"%heart)
except:
MinRK
rework logging connections
r3610 self.log.error("heart_id %r in use"%heart, exc_info=True)
MinRK
cleanup pass
r3644 content = error.wrap_exception()
MinRK
Controller renamed to Hub (keeping ipcontrollerz)
r3599 else:
for h, pack in self.incoming_registrations.iteritems():
if heart == h:
try:
raise KeyError("heart_id %r in use"%heart)
except:
MinRK
rework logging connections
r3610 self.log.error("heart_id %r in use"%heart, exc_info=True)
MinRK
cleanup pass
r3644 content = error.wrap_exception()
MinRK
Controller renamed to Hub (keeping ipcontrollerz)
r3599 break
elif queue == pack[1]:
try:
raise KeyError("queue_id %r in use"%queue)
except:
MinRK
rework logging connections
r3610 self.log.error("queue_id %r in use"%queue, exc_info=True)
MinRK
cleanup pass
r3644 content = error.wrap_exception()
MinRK
Controller renamed to Hub (keeping ipcontrollerz)
r3599 break
Bernardo B. Marques
remove all trailling spaces
r4872
msg = self.session.send(self.query, "registration_reply",
content=content,
MinRK
Controller renamed to Hub (keeping ipcontrollerz)
r3599 ident=reg)
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
Controller renamed to Hub (keeping ipcontrollerz)
r3599 if content['status'] == 'ok':
MinRK
improved logging + Hub,Engine,Scheduler are Configurable
r3603 if heart in self.heartmonitor.hearts:
MinRK
Controller renamed to Hub (keeping ipcontrollerz)
r3599 # already beating
MinRK
improved logging + Hub,Engine,Scheduler are Configurable
r3603 self.incoming_registrations[heart] = (eid,queue,reg[0],None)
MinRK
Controller renamed to Hub (keeping ipcontrollerz)
r3599 self.finish_registration(heart)
else:
purge = lambda : self._purge_stalled_registration(heart)
dc = ioloop.DelayedCallback(purge, self.registration_timeout, self.loop)
dc.start()
MinRK
improved logging + Hub,Engine,Scheduler are Configurable
r3603 self.incoming_registrations[heart] = (eid,queue,reg[0],dc)
MinRK
Controller renamed to Hub (keeping ipcontrollerz)
r3599 else:
MinRK
cleanup Hub/Scheduler to prevent '%s'%<nonascii> errors
r3996 self.log.error("registration::registration %i failed: %r"%(eid, content['evalue']))
MinRK
Controller renamed to Hub (keeping ipcontrollerz)
r3599 return eid
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
Controller renamed to Hub (keeping ipcontrollerz)
r3599 def unregister_engine(self, ident, msg):
"""Unregister an engine that explicitly requested to leave."""
try:
eid = msg['content']['id']
except:
MinRK
cleanup Hub/Scheduler to prevent '%s'%<nonascii> errors
r3996 self.log.error("registration::bad engine id for unregistration: %r"%ident, exc_info=True)
MinRK
Controller renamed to Hub (keeping ipcontrollerz)
r3599 return
MinRK
cleanup Hub/Scheduler to prevent '%s'%<nonascii> errors
r3996 self.log.info("registration::unregister_engine(%r)"%eid)
MinRK
more graceful handling of dying engines
r3651 # print (eid)
MinRK
SGE test related fixes...
r3668 uuid = self.keytable[eid]
MinRK
cleanup per review...
r4161 content=dict(id=eid, queue=uuid.decode('ascii'))
MinRK
SGE test related fixes...
r3668 self.dead_engines.add(uuid)
# self.ids.remove(eid)
# uuid = self.keytable.pop(eid)
Bernardo B. Marques
remove all trailling spaces
r4872 #
MinRK
SGE test related fixes...
r3668 # ec = self.engines.pop(eid)
# self.hearts.pop(ec.heartbeat)
# self.by_ident.pop(ec.queue)
# self.completed.pop(eid)
handleit = lambda : self._handle_stranded_msgs(eid, uuid)
dc = ioloop.DelayedCallback(handleit, self.registration_timeout, self.loop)
dc.start()
############## TODO: HANDLE IT ################
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
Controller renamed to Hub (keeping ipcontrollerz)
r3599 if self.notifier:
self.session.send(self.notifier, "unregistration_notification", content=content)
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
more graceful handling of dying engines
r3651 def _handle_stranded_msgs(self, eid, uuid):
"""Handle messages known to be on an engine when the engine unregisters.
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
more graceful handling of dying engines
r3651 It is possible that this will fire prematurely - that is, an engine will
go down after completing a result, and the client will be notified
that the result failed and later receive the actual result.
"""
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
SGE test related fixes...
r3668 outstanding = self.queues[eid]
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
more graceful handling of dying engines
r3651 for msg_id in outstanding:
self.pending.remove(msg_id)
self.all_completed.add(msg_id)
try:
raise error.EngineError("Engine %r died while running task %r"%(eid, msg_id))
except:
content = error.wrap_exception()
# build a fake header:
header = {}
header['engine'] = uuid
MinRK
General improvements to database backend...
r3780 header['date'] = datetime.now()
MinRK
more graceful handling of dying engines
r3651 rec = dict(result_content=content, result_header=header, result_buffers=[])
rec['completed'] = header['date']
rec['engine_uuid'] = uuid
MinRK
General improvements to database backend...
r3780 try:
self.db.update_record(msg_id, rec)
except Exception:
self.log.error("DB Error handling stranded msg %r"%msg_id, exc_info=True)
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
Controller renamed to Hub (keeping ipcontrollerz)
r3599 def finish_registration(self, heart):
"""Second half of engine registration, called after our HeartMonitor
has received a beat from the Engine's Heart."""
Bernardo B. Marques
remove all trailling spaces
r4872 try:
MinRK
Controller renamed to Hub (keeping ipcontrollerz)
r3599 (eid,queue,reg,purge) = self.incoming_registrations.pop(heart)
except KeyError:
MinRK
rework logging connections
r3610 self.log.error("registration::tried to finish nonexistant registration", exc_info=True)
MinRK
Controller renamed to Hub (keeping ipcontrollerz)
r3599 return
MinRK
rework logging connections
r3610 self.log.info("registration::finished registering engine %i:%r"%(eid,queue))
MinRK
Controller renamed to Hub (keeping ipcontrollerz)
r3599 if purge is not None:
purge.stop()
control = queue
self.ids.add(eid)
self.keytable[eid] = queue
Bernardo B. Marques
remove all trailling spaces
r4872 self.engines[eid] = EngineConnector(id=eid, queue=queue, registration=reg,
MinRK
improved logging + Hub,Engine,Scheduler are Configurable
r3603 control=control, heartbeat=heart)
MinRK
Controller renamed to Hub (keeping ipcontrollerz)
r3599 self.by_ident[queue] = eid
self.queues[eid] = list()
self.tasks[eid] = list()
self.completed[eid] = list()
self.hearts[heart] = eid
MinRK
cleanup per review...
r4161 content = dict(id=eid, queue=self.engines[eid].queue.decode('ascii'))
MinRK
Controller renamed to Hub (keeping ipcontrollerz)
r3599 if self.notifier:
self.session.send(self.notifier, "registration_notification", content=content)
MinRK
rework logging connections
r3610 self.log.info("engine::Engine Connected: %i"%eid)
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
Controller renamed to Hub (keeping ipcontrollerz)
r3599 def _purge_stalled_registration(self, heart):
if heart in self.incoming_registrations:
eid = self.incoming_registrations.pop(heart)[0]
MinRK
rework logging connections
r3610 self.log.info("registration::purging stalled registration: %i"%eid)
MinRK
Controller renamed to Hub (keeping ipcontrollerz)
r3599 else:
pass
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
Controller renamed to Hub (keeping ipcontrollerz)
r3599 #-------------------------------------------------------------------------
# Client Requests
#-------------------------------------------------------------------------
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
Controller renamed to Hub (keeping ipcontrollerz)
r3599 def shutdown_request(self, client_id, msg):
"""handle shutdown request."""
MinRK
remove all PAIR sockets, Merge registration+query
r3657 self.session.send(self.query, 'shutdown_reply', content={'status': 'ok'}, ident=client_id)
MinRK
update API after sagedays29...
r3664 # also notify other clients of shutdown
self.session.send(self.notifier, 'shutdown_notice', content={'status': 'ok'})
MinRK
Controller renamed to Hub (keeping ipcontrollerz)
r3599 dc = ioloop.DelayedCallback(lambda : self._shutdown(), 1000, self.loop)
dc.start()
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
Controller renamed to Hub (keeping ipcontrollerz)
r3599 def _shutdown(self):
MinRK
rework logging connections
r3610 self.log.info("hub::hub shutting down.")
MinRK
Controller renamed to Hub (keeping ipcontrollerz)
r3599 time.sleep(0.1)
sys.exit(0)
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
Controller renamed to Hub (keeping ipcontrollerz)
r3599 def check_load(self, client_id, msg):
content = msg['content']
try:
targets = content['targets']
targets = self._validate_targets(targets)
except:
MinRK
cleanup pass
r3644 content = error.wrap_exception()
Bernardo B. Marques
remove all trailling spaces
r4872 self.session.send(self.query, "hub_error",
MinRK
Controller renamed to Hub (keeping ipcontrollerz)
r3599 content=content, ident=client_id)
return
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
Controller renamed to Hub (keeping ipcontrollerz)
r3599 content = dict(status='ok')
# loads = {}
for t in targets:
content[bytes(t)] = len(self.queues[t])+len(self.tasks[t])
MinRK
remove all PAIR sockets, Merge registration+query
r3657 self.session.send(self.query, "load_reply", content=content, ident=client_id)
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
Controller renamed to Hub (keeping ipcontrollerz)
r3599 def queue_status(self, client_id, msg):
"""Return the Queue status of one or more targets.
if verbose: return the msg_ids
else: return len of each type.
keys: queue (pending MUX jobs)
tasks (pending Task jobs)
completed (finished jobs from both queues)"""
content = msg['content']
targets = content['targets']
try:
targets = self._validate_targets(targets)
except:
MinRK
cleanup pass
r3644 content = error.wrap_exception()
Bernardo B. Marques
remove all trailling spaces
r4872 self.session.send(self.query, "hub_error",
MinRK
Controller renamed to Hub (keeping ipcontrollerz)
r3599 content=content, ident=client_id)
return
verbose = content.get('verbose', False)
content = dict(status='ok')
for t in targets:
queue = self.queues[t]
completed = self.completed[t]
tasks = self.tasks[t]
if not verbose:
queue = len(queue)
completed = len(completed)
tasks = len(tasks)
MinRK
update parallel code for py3k...
r4155 content[str(t)] = {'queue': queue, 'completed': completed , 'tasks': tasks}
MinRK
better handle aborted/unschedulers tasks
r3687 content['unassigned'] = list(self.unassigned) if verbose else len(self.unassigned)
MinRK
update parallel code for py3k...
r4155 # print (content)
MinRK
remove all PAIR sockets, Merge registration+query
r3657 self.session.send(self.query, "queue_reply", content=content, ident=client_id)
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
Controller renamed to Hub (keeping ipcontrollerz)
r3599 def purge_results(self, client_id, msg):
"""Purge results from memory. This method is more valuable before we move
to a DB based message storage mechanism."""
content = msg['content']
MinRK
fix purge_results for args other than specified msg_id...
r4146 self.log.info("Dropping records with %s", content)
MinRK
Controller renamed to Hub (keeping ipcontrollerz)
r3599 msg_ids = content.get('msg_ids', [])
reply = dict(status='ok')
if msg_ids == 'all':
MinRK
General improvements to database backend...
r3780 try:
self.db.drop_matching_records(dict(completed={'$ne':None}))
except Exception:
reply = error.wrap_exception()
MinRK
Controller renamed to Hub (keeping ipcontrollerz)
r3599 else:
MinRK
various db backend fixes...
r3875 pending = filter(lambda m: m in self.pending, msg_ids)
if pending:
try:
raise IndexError("msg pending: %r"%pending[0])
except:
reply = error.wrap_exception()
else:
try:
self.db.drop_matching_records(dict(msg_id={'$in':msg_ids}))
except Exception:
reply = error.wrap_exception()
if reply['status'] == 'ok':
eids = content.get('engine_ids', [])
for eid in eids:
if eid not in self.engines:
MinRK
Controller renamed to Hub (keeping ipcontrollerz)
r3599 try:
MinRK
various db backend fixes...
r3875 raise IndexError("No such engine: %i"%eid)
MinRK
Controller renamed to Hub (keeping ipcontrollerz)
r3599 except:
MinRK
cleanup pass
r3644 reply = error.wrap_exception()
MinRK
various db backend fixes...
r3875 break
uid = self.engines[eid].queue
MinRK
Controller renamed to Hub (keeping ipcontrollerz)
r3599 try:
MinRK
various db backend fixes...
r3875 self.db.drop_matching_records(dict(engine_uuid=uid, completed={'$ne':None}))
except Exception:
MinRK
cleanup pass
r3644 reply = error.wrap_exception()
MinRK
various db backend fixes...
r3875 break
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
remove all PAIR sockets, Merge registration+query
r3657 self.session.send(self.query, 'purge_reply', content=reply, ident=client_id)
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
add Client.resubmit for re-running tasks...
r3874 def resubmit_task(self, client_id, msg):
"""Resubmit one or more tasks."""
def finish(reply):
self.session.send(self.query, 'resubmit_reply', content=reply, ident=client_id)
content = msg['content']
msg_ids = content['msg_ids']
reply = dict(status='ok')
try:
records = self.db.find_records({'msg_id' : {'$in' : msg_ids}}, keys=[
'header', 'content', 'buffers'])
except Exception:
self.log.error('db::db error finding tasks to resubmit', exc_info=True)
return finish(error.wrap_exception())
# validate msg_ids
found_ids = [ rec['msg_id'] for rec in records ]
invalid_ids = filter(lambda m: m in self.pending, found_ids)
if len(records) > len(msg_ids):
try:
raise RuntimeError("DB appears to be in an inconsistent state."
"More matching records were found than should exist")
except Exception:
return finish(error.wrap_exception())
elif len(records) < len(msg_ids):
missing = [ m for m in msg_ids if m not in found_ids ]
try:
MinRK
cleanup Hub/Scheduler to prevent '%s'%<nonascii> errors
r3996 raise KeyError("No such msg(s): %r"%missing)
MinRK
add Client.resubmit for re-running tasks...
r3874 except KeyError:
return finish(error.wrap_exception())
elif invalid_ids:
msg_id = invalid_ids[0]
try:
raise ValueError("Task %r appears to be inflight"%(msg_id))
except Exception:
return finish(error.wrap_exception())
# clear the existing records
MinRK
use HMAC digest to sign messages instead of cleartext key...
r4000 now = datetime.now()
MinRK
add Client.resubmit for re-running tasks...
r3874 rec = empty_record()
map(rec.pop, ['msg_id', 'header', 'content', 'buffers', 'submitted'])
MinRK
use HMAC digest to sign messages instead of cleartext key...
r4000 rec['resubmitted'] = now
MinRK
add Client.resubmit for re-running tasks...
r3874 rec['queue'] = 'task'
rec['client_uuid'] = client_id[0]
try:
for msg_id in msg_ids:
self.all_completed.discard(msg_id)
self.db.update_record(msg_id, rec)
except Exception:
self.log.error('db::db error upating record', exc_info=True)
reply = error.wrap_exception()
else:
# send the messages
for rec in records:
header = rec['header']
MinRK
use HMAC digest to sign messages instead of cleartext key...
r4000 # include resubmitted in header to prevent digest collision
MinRK
handle datetime objects in Session...
r4008 header['resubmitted'] = now
MinRK
add Client.resubmit for re-running tasks...
r3874 msg = self.session.msg(header['msg_type'])
msg['content'] = rec['content']
msg['header'] = header
Brian E. Granger
Fixing docstrings and a few more places for msg_id/msg_type.
r4232 msg['header']['msg_id'] = rec['msg_id']
MinRK
add Client.resubmit for re-running tasks...
r3874 self.session.send(self.resubmit, msg, buffers=rec['buffers'])
finish(dict(status='ok'))
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
General improvements to database backend...
r3780 def _extract_record(self, rec):
"""decompose a TaskRecord dict into subsection of reply for get_result"""
io_dict = {}
for key in 'pyin pyout pyerr stdout stderr'.split():
io_dict[key] = rec[key]
content = { 'result_content': rec['result_content'],
'header': rec['header'],
'result_header' : rec['result_header'],
'io' : io_dict,
}
if rec['result_buffers']:
MinRK
update parallel code for py3k...
r4155 buffers = map(bytes, rec['result_buffers'])
MinRK
General improvements to database backend...
r3780 else:
buffers = []
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
General improvements to database backend...
r3780 return content, buffers
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
Controller renamed to Hub (keeping ipcontrollerz)
r3599 def get_results(self, client_id, msg):
"""Get the result of 1 or more messages."""
content = msg['content']
msg_ids = sorted(set(content['msg_ids']))
statusonly = content.get('status_only', False)
pending = []
completed = []
content = dict(status='ok')
content['pending'] = pending
content['completed'] = completed
buffers = []
if not statusonly:
MinRK
General improvements to database backend...
r3780 try:
matches = self.db.find_records(dict(msg_id={'$in':msg_ids}))
# turn match list into dict, for faster lookup
records = {}
for rec in matches:
records[rec['msg_id']] = rec
except Exception:
content = error.wrap_exception()
Bernardo B. Marques
remove all trailling spaces
r4872 self.session.send(self.query, "result_reply", content=content,
MinRK
General improvements to database backend...
r3780 parent=msg, ident=client_id)
return
else:
records = {}
MinRK
Controller renamed to Hub (keeping ipcontrollerz)
r3599 for msg_id in msg_ids:
if msg_id in self.pending:
pending.append(msg_id)
MinRK
add Client.resubmit for re-running tasks...
r3874 elif msg_id in self.all_completed:
MinRK
Controller renamed to Hub (keeping ipcontrollerz)
r3599 completed.append(msg_id)
if not statusonly:
MinRK
General improvements to database backend...
r3780 c,bufs = self._extract_record(records[msg_id])
content[msg_id] = c
buffers.extend(bufs)
MinRK
add Client.resubmit for re-running tasks...
r3874 elif msg_id in records:
if rec['completed']:
completed.append(msg_id)
c,bufs = self._extract_record(records[msg_id])
content[msg_id] = c
buffers.extend(bufs)
else:
pending.append(msg_id)
MinRK
Controller renamed to Hub (keeping ipcontrollerz)
r3599 else:
try:
raise KeyError('No such message: '+msg_id)
except:
MinRK
cleanup pass
r3644 content = error.wrap_exception()
MinRK
Controller renamed to Hub (keeping ipcontrollerz)
r3599 break
Bernardo B. Marques
remove all trailling spaces
r4872 self.session.send(self.query, "result_reply", content=content,
MinRK
Controller renamed to Hub (keeping ipcontrollerz)
r3599 parent=msg, ident=client_id,
buffers=buffers)
MinRK
General improvements to database backend...
r3780 def get_history(self, client_id, msg):
"""Get a list of all msg_ids in our DB records"""
try:
msg_ids = self.db.get_history()
except Exception as e:
content = error.wrap_exception()
else:
content = dict(status='ok', history=msg_ids)
Bernardo B. Marques
remove all trailling spaces
r4872
self.session.send(self.query, "history_reply", content=content,
MinRK
General improvements to database backend...
r3780 parent=msg, ident=client_id)
def db_query(self, client_id, msg):
"""Perform a raw query on the task record database."""
content = msg['content']
query = content.get('query', {})
keys = content.get('keys', None)
buffers = []
empty = list()
try:
records = self.db.find_records(query, keys)
except Exception as e:
content = error.wrap_exception()
else:
# extract buffers from reply content:
if keys is not None:
buffer_lens = [] if 'buffers' in keys else None
result_buffer_lens = [] if 'result_buffers' in keys else None
else:
buffer_lens = []
result_buffer_lens = []
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
General improvements to database backend...
r3780 for rec in records:
# buffers may be None, so double check
if buffer_lens is not None:
b = rec.pop('buffers', empty) or empty
buffer_lens.append(len(b))
buffers.extend(b)
if result_buffer_lens is not None:
rb = rec.pop('result_buffers', empty) or empty
result_buffer_lens.append(len(rb))
buffers.extend(rb)
content = dict(status='ok', records=records, buffer_lens=buffer_lens,
result_buffer_lens=result_buffer_lens)
MinRK
update parallel code for py3k...
r4155 # self.log.debug (content)
Bernardo B. Marques
remove all trailling spaces
r4872 self.session.send(self.query, "db_reply", content=content,
MinRK
General improvements to database backend...
r3780 parent=msg, ident=client_id,
buffers=buffers)