##// END OF EJS Templates
major cleanup of client code + purge_request implemented
major cleanup of client code + purge_request implemented

File last commit:

r3552:d405cd69
r3555:077c1af7
Show More
controller.py
923 lines | 33.1 KiB | text/x-python | PythonLexer
#!/usr/bin/env python
"""The IPython Controller with 0MQ
This is the master object that handles connections from engines, clients, and
"""
#-----------------------------------------------------------------------------
# Copyright (C) 2010 The IPython Development Team
#
# Distributed under the terms of the BSD License. The full license is in
# the file COPYING, distributed as part of this software.
#-----------------------------------------------------------------------------
#-----------------------------------------------------------------------------
# Imports
#-----------------------------------------------------------------------------
from __future__ import print_function
from datetime import datetime
import logging
import zmq
from zmq.eventloop import zmqstream, ioloop
import uuid
# internal:
from IPython.zmq.log import logger # a Logger object
from IPython.zmq.entry_point import bind_port
from streamsession import Message, wrap_exception
from entry_point import (make_base_argument_parser, select_random_ports, split_ports,
connect_logger, parse_url)
# from messages import json # use the same import switches
#-----------------------------------------------------------------------------
# Code
#-----------------------------------------------------------------------------
class ReverseDict(dict):
"""simple double-keyed subset of dict methods."""
def __init__(self, *args, **kwargs):
dict.__init__(self, *args, **kwargs)
self.reverse = dict()
for key, value in self.iteritems():
self.reverse[value] = key
def __getitem__(self, key):
try:
return dict.__getitem__(self, key)
except KeyError:
return self.reverse[key]
def __setitem__(self, key, value):
if key in self.reverse:
raise KeyError("Can't have key %r on both sides!"%key)
dict.__setitem__(self, key, value)
self.reverse[value] = key
def pop(self, key):
value = dict.pop(self, key)
self.d1.pop(value)
return value
class EngineConnector(object):
"""A simple object for accessing the various zmq connections of an object.
Attributes are:
id (int): engine ID
uuid (str): uuid (unused?)
queue (str): identity of queue's XREQ socket
registration (str): identity of registration XREQ socket
heartbeat (str): identity of heartbeat XREQ socket
"""
id=0
queue=None
control=None
registration=None
heartbeat=None
pending=None
def __init__(self, id, queue, registration, control, heartbeat=None):
logger.info("engine::Engine Connected: %i"%id)
self.id = id
self.queue = queue
self.registration = registration
self.control = control
self.heartbeat = heartbeat
class Controller(object):
"""The IPython Controller with 0MQ connections
Parameters
==========
loop: zmq IOLoop instance
session: StreamSession object
<removed> context: zmq context for creating new connections (?)
registrar: ZMQStream for engine registration requests (XREP)
clientele: ZMQStream for client connections (XREP)
not used for jobs, only query/control commands
queue: ZMQStream for monitoring the command queue (SUB)
heartbeat: HeartMonitor object checking the pulse of the engines
db_stream: connection to db for out of memory logging of commands
NotImplemented
queue_addr: zmq connection address of the XREP socket for the queue
hb_addr: zmq connection address of the PUB socket for heartbeats
task_addr: zmq connection address of the XREQ socket for task queue
"""
# internal data structures:
ids=None # engine IDs
keytable=None
engines=None
clients=None
hearts=None
pending=None
results=None
tasks=None
completed=None
mia=None
incoming_registrations=None
registration_timeout=None
#objects from constructor:
loop=None
registrar=None
clientelle=None
queue=None
heartbeat=None
notifier=None
db=None
client_addr=None
engine_addrs=None
def __init__(self, loop, session, queue, registrar, heartbeat, clientele, notifier, db, engine_addrs, client_addrs):
"""
# universal:
loop: IOLoop for creating future connections
session: streamsession for sending serialized data
# engine:
queue: ZMQStream for monitoring queue messages
registrar: ZMQStream for engine registration
heartbeat: HeartMonitor object for tracking engines
# client:
clientele: ZMQStream for client connections
# extra:
db: ZMQStream for db connection (NotImplemented)
engine_addrs: zmq address/protocol dict for engine connections
client_addrs: zmq address/protocol dict for client connections
"""
self.ids = set()
self.keytable={}
self.incoming_registrations={}
self.engines = {}
self.by_ident = {}
self.clients = {}
self.hearts = {}
self.mia = set()
# self.sockets = {}
self.loop = loop
self.session = session
self.registrar = registrar
self.clientele = clientele
self.queue = queue
self.heartbeat = heartbeat
self.notifier = notifier
self.db = db
self.client_addrs = client_addrs
assert isinstance(client_addrs['queue'], str)
# self.hb_addrs = hb_addrs
self.engine_addrs = engine_addrs
assert isinstance(engine_addrs['queue'], str)
assert len(engine_addrs['heartbeat']) == 2
# register our callbacks
self.registrar.on_recv(self.dispatch_register_request)
self.clientele.on_recv(self.dispatch_client_msg)
self.queue.on_recv(self.dispatch_queue_traffic)
if heartbeat is not None:
heartbeat.add_heart_failure_handler(self.handle_heart_failure)
heartbeat.add_new_heart_handler(self.handle_new_heart)
if self.db is not None:
self.db.on_recv(self.dispatch_db)
self.client_handlers = {'queue_request': self.queue_status,
'result_request': self.get_results,
'purge_request': self.purge_results,
'resubmit_request': self.resubmit_task,
}
self.registrar_handlers = {'registration_request' : self.register_engine,
'unregistration_request' : self.unregister_engine,
'connection_request': self.connection_request,
}
#
# this is the stuff that will move to DB:
self.results = {} # completed results
self.pending = {} # pending messages, keyed by msg_id
self.queues = {} # pending msg_ids keyed by engine_id
self.tasks = {} # pending msg_ids submitted as tasks, keyed by client_id
self.completed = {} # completed msg_ids keyed by engine_id
self.registration_timeout = max(5000, 2*self.heartbeat.period)
logger.info("controller::created controller")
def _new_id(self):
"""gemerate a new ID"""
newid = 0
incoming = [id[0] for id in self.incoming_registrations.itervalues()]
# print newid, self.ids, self.incoming_registrations
while newid in self.ids or newid in incoming:
newid += 1
return newid
#-----------------------------------------------------------------------------
# message validation
#-----------------------------------------------------------------------------
def _validate_targets(self, targets):
"""turn any valid targets argument into a list of integer ids"""
if targets is None:
# default to all
targets = self.ids
if isinstance(targets, (int,str,unicode)):
# only one target specified
targets = [targets]
_targets = []
for t in targets:
# map raw identities to ids
if isinstance(t, (str,unicode)):
t = self.by_ident.get(t, t)
_targets.append(t)
targets = _targets
bad_targets = [ t for t in targets if t not in self.ids ]
if bad_targets:
raise IndexError("No Such Engine: %r"%bad_targets)
if not targets:
raise IndexError("No Engines Registered")
return targets
def _validate_client_msg(self, msg):
"""validates and unpacks headers of a message. Returns False if invalid,
(ident, header, parent, content)"""
client_id = msg[0]
try:
msg = self.session.unpack_message(msg[1:], content=True)
except:
logger.error("client::Invalid Message %s"%msg)
return False
msg_type = msg.get('msg_type', None)
if msg_type is None:
return False
header = msg.get('header')
# session doesn't handle split content for now:
return client_id, msg
#-----------------------------------------------------------------------------
# dispatch methods (1 per stream)
#-----------------------------------------------------------------------------
def dispatch_register_request(self, msg):
""""""
logger.debug("registration::dispatch_register_request(%s)"%msg)
idents,msg = self.session.feed_identities(msg)
print (idents,msg, len(msg))
try:
msg = self.session.unpack_message(msg,content=True)
except Exception, e:
logger.error("registration::got bad registration message: %s"%msg)
raise e
return
msg_type = msg['msg_type']
content = msg['content']
handler = self.registrar_handlers.get(msg_type, None)
if handler is None:
logger.error("registration::got bad registration message: %s"%msg)
else:
handler(idents, msg)
def dispatch_queue_traffic(self, msg):
"""all ME and Task queue messages come through here"""
logger.debug("queue traffic: %s"%msg[:2])
switch = msg[0]
idents, msg = self.session.feed_identities(msg[1:])
if switch == 'in':
self.save_queue_request(idents, msg)
elif switch == 'out':
self.save_queue_result(idents, msg)
elif switch == 'intask':
self.save_task_request(idents, msg)
elif switch == 'outtask':
self.save_task_result(idents, msg)
elif switch == 'tracktask':
self.save_task_destination(idents, msg)
elif switch in ('incontrol', 'outcontrol'):
pass
else:
logger.error("Invalid message topic: %s"%switch)
def dispatch_client_msg(self, msg):
"""Route messages from clients"""
idents, msg = self.session.feed_identities(msg)
client_id = idents[0]
try:
msg = self.session.unpack_message(msg, content=True)
except:
content = wrap_exception()
logger.error("Bad Client Message: %s"%msg)
self.session.send(self.clientele, "controller_error", ident=client_id,
content=content)
return
# print client_id, header, parent, content
#switch on message type:
msg_type = msg['msg_type']
logger.info("client:: client %s requested %s"%(client_id, msg_type))
handler = self.client_handlers.get(msg_type, None)
try:
assert handler is not None, "Bad Message Type: %s"%msg_type
except:
content = wrap_exception()
logger.error("Bad Message Type: %s"%msg_type)
self.session.send(self.clientele, "controller_error", ident=client_id,
content=content)
return
else:
handler(client_id, msg)
def dispatch_db(self, msg):
""""""
raise NotImplementedError
#---------------------------------------------------------------------------
# handler methods (1 per event)
#---------------------------------------------------------------------------
#----------------------- Heartbeat --------------------------------------
def handle_new_heart(self, heart):
"""handler to attach to heartbeater.
Called when a new heart starts to beat.
Triggers completion of registration."""
logger.debug("heartbeat::handle_new_heart(%r)"%heart)
if heart not in self.incoming_registrations:
logger.info("heartbeat::ignoring new heart: %r"%heart)
else:
self.finish_registration(heart)
def handle_heart_failure(self, heart):
"""handler to attach to heartbeater.
called when a previously registered heart fails to respond to beat request.
triggers unregistration"""
logger.debug("heartbeat::handle_heart_failure(%r)"%heart)
eid = self.hearts.get(heart, None)
queue = self.engines[eid].queue
if eid is None:
logger.info("heartbeat::ignoring heart failure %r"%heart)
else:
self.unregister_engine(heart, dict(content=dict(id=eid, queue=queue)))
#----------------------- MUX Queue Traffic ------------------------------
def save_queue_request(self, idents, msg):
queue_id, client_id = idents[:2]
try:
msg = self.session.unpack_message(msg, content=False)
except:
logger.error("queue::client %r sent invalid message to %r: %s"%(client_id, queue_id, msg))
return
eid = self.by_ident.get(queue_id, None)
if eid is None:
logger.error("queue::target %r not registered"%queue_id)
logger.debug("queue:: valid are: %s"%(self.by_ident.keys()))
return
header = msg['header']
msg_id = header['msg_id']
info = dict(submit=datetime.now(),
received=None,
engine=(eid, queue_id))
self.pending[msg_id] = ( msg, info )
self.queues[eid][0].append(msg_id)
def save_queue_result(self, idents, msg):
client_id, queue_id = idents[:2]
try:
msg = self.session.unpack_message(msg, content=False)
except:
logger.error("queue::engine %r sent invalid message to %r: %s"%(
queue_id,client_id, msg))
return
eid = self.by_ident.get(queue_id, None)
if eid is None:
logger.error("queue::unknown engine %r is sending a reply: "%queue_id)
logger.debug("queue:: %s"%msg[2:])
return
parent = msg['parent_header']
if not parent:
return
msg_id = parent['msg_id']
self.results[msg_id] = msg
if msg_id in self.pending:
self.pending.pop(msg_id)
self.queues[eid][0].remove(msg_id)
self.completed[eid].append(msg_id)
else:
logger.debug("queue:: unknown msg finished %s"%msg_id)
#--------------------- Task Queue Traffic ------------------------------
def save_task_request(self, idents, msg):
client_id = idents[0]
try:
msg = self.session.unpack_message(msg, content=False)
except:
logger.error("task::client %r sent invalid task message: %s"%(
client_id, msg))
return
header = msg['header']
msg_id = header['msg_id']
self.mia.add(msg_id)
self.pending[msg_id] = msg
if not self.tasks.has_key(client_id):
self.tasks[client_id] = []
self.tasks[client_id].append(msg_id)
def save_task_result(self, idents, msg):
client_id = idents[0]
try:
msg = self.session.unpack_message(msg, content=False)
except:
logger.error("task::invalid task result message send to %r: %s"%(
client_id, msg))
return
parent = msg['parent_header']
if not parent:
# print msg
# logger.warn("")
return
msg_id = parent['msg_id']
self.results[msg_id] = msg
if msg_id in self.pending:
self.pending.pop(msg_id)
if msg_id in self.mia:
self.mia.remove(msg_id)
else:
logger.debug("task::unknown task %s finished"%msg_id)
def save_task_destination(self, idents, msg):
try:
msg = self.session.unpack_message(msg, content=True)
except:
logger.error("task::invalid task tracking message")
return
content = msg['content']
print (content)
msg_id = content['msg_id']
engine_uuid = content['engine_id']
for eid,queue_id in self.keytable.iteritems():
if queue_id == engine_uuid:
break
logger.info("task::task %s arrived on %s"%(msg_id, eid))
if msg_id in self.mia:
self.mia.remove(msg_id)
else:
logger.debug("task::task %s not listed as MIA?!"%(msg_id))
self.tasks[engine_uuid].append(msg_id)
def mia_task_request(self, idents, msg):
client_id = idents[0]
content = dict(mia=self.mia,status='ok')
self.session.send('mia_reply', content=content, idents=client_id)
#-------------------- Registration -----------------------------
def connection_request(self, client_id, msg):
"""reply with connection addresses for clients"""
logger.info("client::client %s connected"%client_id)
content = dict(status='ok')
content.update(self.client_addrs)
jsonable = {}
for k,v in self.keytable.iteritems():
jsonable[str(k)] = v
content['engines'] = jsonable
self.session.send(self.registrar, 'connection_reply', content, parent=msg, ident=client_id)
def register_engine(self, reg, msg):
"""register an engine"""
content = msg['content']
try:
queue = content['queue']
except KeyError:
logger.error("registration::queue not specified")
return
heart = content.get('heartbeat', None)
"""register a new engine, and create the socket(s) necessary"""
eid = self._new_id()
# print (eid, queue, reg, heart)
logger.debug("registration::register_engine(%i, %r, %r, %r)"%(eid, queue, reg, heart))
content = dict(id=eid,status='ok')
content.update(self.engine_addrs)
# check if requesting available IDs:
if queue in self.by_ident:
content = {'status': 'error', 'reason': "queue_id %r in use"%queue}
elif heart in self.hearts: # need to check unique hearts?
content = {'status': 'error', 'reason': "heart_id %r in use"%heart}
else:
for h, pack in self.incoming_registrations.iteritems():
if heart == h:
content = {'status': 'error', 'reason': "heart_id %r in use"%heart}
break
elif queue == pack[1]:
content = {'status': 'error', 'reason': "queue_id %r in use"%queue}
break
msg = self.session.send(self.registrar, "registration_reply",
content=content,
ident=reg)
if content['status'] == 'ok':
if heart in self.heartbeat.hearts:
# already beating
self.incoming_registrations[heart] = (eid,queue,reg,None)
self.finish_registration(heart)
else:
purge = lambda : self._purge_stalled_registration(heart)
dc = ioloop.DelayedCallback(purge, self.registration_timeout, self.loop)
dc.start()
self.incoming_registrations[heart] = (eid,queue,reg,dc)
else:
logger.error("registration::registration %i failed: %s"%(eid, content['reason']))
return eid
def unregister_engine(self, ident, msg):
try:
eid = msg['content']['id']
except:
logger.error("registration::bad engine id for unregistration: %s"%ident)
return
logger.info("registration::unregister_engine(%s)"%eid)
content=dict(id=eid, queue=self.engines[eid].queue)
self.ids.remove(eid)
self.keytable.pop(eid)
ec = self.engines.pop(eid)
self.hearts.pop(ec.heartbeat)
self.by_ident.pop(ec.queue)
self.completed.pop(eid)
for msg_id in self.queues.pop(eid)[0]:
msg = self.pending.pop(msg_id)
############## TODO: HANDLE IT ################
if self.notifier:
self.session.send(self.notifier, "unregistration_notification", content=content)
def finish_registration(self, heart):
try:
(eid,queue,reg,purge) = self.incoming_registrations.pop(heart)
except KeyError:
logger.error("registration::tried to finish nonexistant registration")
return
logger.info("registration::finished registering engine %i:%r"%(eid,queue))
if purge is not None:
purge.stop()
control = queue
self.ids.add(eid)
self.keytable[eid] = queue
self.engines[eid] = EngineConnector(eid, queue, reg, control, heart)
self.by_ident[queue] = eid
self.queues[eid] = ([],[])
self.completed[eid] = list()
self.hearts[heart] = eid
content = dict(id=eid, queue=self.engines[eid].queue)
if self.notifier:
self.session.send(self.notifier, "registration_notification", content=content)
def _purge_stalled_registration(self, heart):
if heart in self.incoming_registrations:
eid = self.incoming_registrations.pop(heart)[0]
logger.info("registration::purging stalled registration: %i"%eid)
else:
pass
#------------------- Client Requests -------------------------------
def check_load(self, client_id, msg):
content = msg['content']
try:
targets = content['targets']
targets = self._validate_targets(targets)
except:
content = wrap_exception()
self.session.send(self.clientele, "controller_error",
content=content, ident=client_id)
return
content = dict(status='ok')
# loads = {}
for t in targets:
content[str(t)] = len(self.queues[t])
self.session.send(self.clientele, "load_reply", content=content, ident=client_id)
def queue_status(self, client_id, msg):
"""handle queue_status request"""
content = msg['content']
targets = content['targets']
try:
targets = self._validate_targets(targets)
except:
content = wrap_exception()
self.session.send(self.clientele, "controller_error",
content=content, ident=client_id)
return
verbose = msg.get('verbose', False)
content = dict()
for t in targets:
queue = self.queues[t]
completed = self.completed[t]
if not verbose:
queue = len(queue)
completed = len(completed)
content[str(t)] = {'queue': queue, 'completed': completed }
# pending
self.session.send(self.clientele, "queue_reply", content=content, ident=client_id)
def purge_results(self, client_id, msg):
content = msg['content']
msg_ids = content.get('msg_ids', [])
reply = dict(status='ok')
if msg_ids == 'all':
self.results = {}
else:
for msg_id in msg_ids:
if msg_id in self.results:
self.results.pop(msg_id)
else:
if msg_id in self.pending:
reply = dict(status='error', reason="msg pending: %r"%msg_id)
else:
reply = dict(status='error', reason="No such msg: %r"%msg_id)
break
eids = content.get('engine_ids', [])
for eid in eids:
if eid not in self.engines:
reply = dict(status='error', reason="No such engine: %i"%eid)
break
msg_ids = self.completed.pop(eid)
for msg_id in msg_ids:
self.results.pop(msg_id)
self.sesison.send(self.clientele, 'purge_reply', content=reply, ident=client_id)
def resubmit_task(self, client_id, msg, buffers):
content = msg['content']
header = msg['header']
msg_ids = content.get('msg_ids', [])
reply = dict(status='ok')
if msg_ids == 'all':
self.results = {}
else:
for msg_id in msg_ids:
if msg_id in self.results:
self.results.pop(msg_id)
else:
if msg_id in self.pending:
reply = dict(status='error', reason="msg pending: %r"%msg_id)
else:
reply = dict(status='error', reason="No such msg: %r"%msg_id)
break
eids = content.get('engine_ids', [])
for eid in eids:
if eid not in self.engines:
reply = dict(status='error', reason="No such engine: %i"%eid)
break
msg_ids = self.completed.pop(eid)
for msg_id in msg_ids:
self.results.pop(msg_id)
self.sesison.send(self.clientele, 'purge_reply', content=reply, ident=client_id)
def get_results(self, client_id, msg):
"""get the result of 1 or more messages"""
content = msg['content']
msg_ids = set(content['msg_ids'])
statusonly = content.get('status_only', False)
pending = []
completed = []
content = dict(status='ok')
content['pending'] = pending
content['completed'] = completed
for msg_id in msg_ids:
if msg_id in self.pending:
pending.append(msg_id)
elif msg_id in self.results:
completed.append(msg_id)
if not statusonly:
content[msg_id] = self.results[msg_id]['content']
else:
content = dict(status='error')
content['reason'] = 'no such message: '+msg_id
break
self.session.send(self.clientele, "result_reply", content=content,
parent=msg, ident=client_id)
############ OLD METHODS for Python Relay Controller ###################
def _validate_engine_msg(self, msg):
"""validates and unpacks headers of a message. Returns False if invalid,
(ident, message)"""
ident = msg[0]
try:
msg = self.session.unpack_message(msg[1:], content=False)
except:
logger.error("engine.%s::Invalid Message %s"%(ident, msg))
return False
try:
eid = msg.header.username
assert self.engines.has_key(eid)
except:
logger.error("engine::Invalid Engine ID %s"%(ident))
return False
return eid, msg
#--------------------
# Entry Point
#--------------------
def make_argument_parser():
"""Make an argument parser"""
parser = make_base_argument_parser()
parser.add_argument('--client', type=int, metavar='PORT', default=0,
help='set the XREP port for clients [default: random]')
parser.add_argument('--notice', type=int, metavar='PORT', default=0,
help='set the PUB socket for registration notification [default: random]')
parser.add_argument('--hb', type=str, metavar='PORTS',
help='set the 2 ports for heartbeats [default: random]')
parser.add_argument('--ping', type=int, default=3000,
help='set the heartbeat period in ms [default: 3000]')
parser.add_argument('--monitor', type=int, metavar='PORT', default=0,
help='set the SUB port for queue monitoring [default: random]')
parser.add_argument('--mux', type=str, metavar='PORTS',
help='set the XREP ports for the MUX queue [default: random]')
parser.add_argument('--task', type=str, metavar='PORTS',
help='set the XREP/XREQ ports for the task queue [default: random]')
parser.add_argument('--control', type=str, metavar='PORTS',
help='set the XREP ports for the control queue [default: random]')
parser.add_argument('--scheduler', type=str, default='pure',
choices = ['pure', 'lru', 'plainrandom', 'weighted', 'twobin','leastload'],
help='select the task scheduler [default: pure ZMQ]')
return parser
def main():
import time
from multiprocessing import Process
from zmq.eventloop.zmqstream import ZMQStream
from zmq.devices import ProcessMonitoredQueue
from zmq.log import handlers
import streamsession as session
import heartmonitor
from scheduler import launch_scheduler
parser = make_argument_parser()
args = parser.parse_args()
parse_url(args)
iface="%s://%s"%(args.transport,args.ip)+':%i'
random_ports = 0
if args.hb:
hb = split_ports(args.hb, 2)
else:
hb = select_random_ports(2)
if args.mux:
mux = split_ports(args.mux, 2)
else:
mux = None
random_ports += 2
if args.task:
task = split_ports(args.task, 2)
else:
task = None
random_ports += 2
if args.control:
control = split_ports(args.control, 2)
else:
control = None
random_ports += 2
ctx = zmq.Context()
loop = ioloop.IOLoop.instance()
# setup logging
connect_logger(ctx, iface%args.logport, root="controller", loglevel=args.loglevel)
# Registrar socket
reg = ZMQStream(ctx.socket(zmq.XREP), loop)
regport = bind_port(reg, args.ip, args.regport)
### Engine connections ###
# heartbeat
hpub = ctx.socket(zmq.PUB)
bind_port(hpub, args.ip, hb[0])
hrep = ctx.socket(zmq.XREP)
bind_port(hrep, args.ip, hb[1])
hmon = heartmonitor.HeartMonitor(loop, ZMQStream(hpub,loop), ZMQStream(hrep,loop),args.ping)
hmon.start()
### Client connections ###
# Clientele socket
c = ZMQStream(ctx.socket(zmq.XREP), loop)
cport = bind_port(c, args.ip, args.client)
# Notifier socket
n = ZMQStream(ctx.socket(zmq.PUB), loop)
nport = bind_port(n, args.ip, args.notice)
thesession = session.StreamSession(username=args.ident or "controller")
### build and launch the queues ###
# monitor socket
sub = ctx.socket(zmq.SUB)
sub.setsockopt(zmq.SUBSCRIBE, "")
monport = bind_port(sub, args.ip, args.monitor)
sub = ZMQStream(sub, loop)
ports = select_random_ports(random_ports)
# Multiplexer Queue (in a Process)
if not mux:
mux = (ports.pop(),ports.pop())
q = ProcessMonitoredQueue(zmq.XREP, zmq.XREP, zmq.PUB, 'in', 'out')
q.bind_in(iface%mux[0])
q.bind_out(iface%mux[1])
q.connect_mon(iface%monport)
q.daemon=True
q.start()
# Control Queue (in a Process)
if not control:
control = (ports.pop(),ports.pop())
q = ProcessMonitoredQueue(zmq.XREP, zmq.XREP, zmq.PUB, 'incontrol', 'outcontrol')
q.bind_in(iface%control[0])
q.bind_out(iface%control[1])
q.connect_mon(iface%monport)
q.daemon=True
q.start()
# Task Queue (in a Process)
if not task:
task = (ports.pop(),ports.pop())
if args.scheduler == 'pure':
q = ProcessMonitoredQueue(zmq.XREP, zmq.XREQ, zmq.PUB, 'intask', 'outtask')
q.bind_in(iface%task[0])
q.bind_out(iface%task[1])
q.connect_mon(iface%monport)
q.daemon=True
q.start()
else:
sargs = (iface%task[0],iface%task[1],iface%monport,iface%nport,args.scheduler)
print (sargs)
p = Process(target=launch_scheduler, args=sargs)
p.daemon=True
p.start()
time.sleep(.25)
# build connection dicts
engine_addrs = {
'control' : iface%control[1],
'queue': iface%mux[1],
'heartbeat': (iface%hb[0], iface%hb[1]),
'task' : iface%task[1],
'monitor' : iface%monport,
}
client_addrs = {
'control' : iface%control[0],
'query': iface%cport,
'queue': iface%mux[0],
'task' : iface%task[0],
'notification': iface%nport
}
con = Controller(loop, thesession, sub, reg, hmon, c, n, None, engine_addrs, client_addrs)
dc = ioloop.DelayedCallback(lambda : print("Controller started..."), 100, loop)
loop.start()
if __name__ == '__main__':
main()