##// END OF EJS Templates
control channel progress
control channel progress

File last commit:

r3540:c369179d
r3540:c369179d
Show More
controller.py
138 lines | 4.0 KiB | text/x-python | PythonLexer
#!/usr/bin/env python
"""A script to launch a controller with all its queues and connect it to a logger"""
import time
import logging
import zmq
from zmq.devices import ProcessMonitoredQueue, ThreadMonitoredQueue
from zmq.eventloop import ioloop
from zmq.eventloop.zmqstream import ZMQStream
from zmq.log import handlers
from IPython.zmq import log
from IPython.zmq.parallel import controller, heartmonitor, streamsession as session
def setup():
    """Wire up a basic controller and return the (not yet started) IOLoop.

    The interface and every port number are taken from a ``config`` dict,
    which is expected to appear in this module's globals after ``config.py``
    (looked up relative to the current working directory) has been exec'd.

    In order, this function:

    * connects a PUB socket to the log port and attaches a ``PUBHandler``
      publishing under the root topic ``"controller"``;
    * binds the engine registrar socket and the two heartbeat sockets
      (``heartport`` and ``heartport + 1``), then starts the heart monitor;
    * binds the client query socket and the notification PUB socket;
    * binds a SUB monitor socket to a random port and launches three
      ``ProcessMonitoredQueue`` devices (multiplexer, control, task) that
      connect their monitor side to it;
    * hands all streams and the engine/client address dicts to
      ``controller.Controller``.

    The caller is responsible for calling ``start()`` on the returned loop.
    """
    context = zmq.Context(1)
    loop = ioloop.IOLoop.instance()

    # Port configuration: exec'ing config.py into the module globals is what
    # makes the name ``config`` resolvable below.
    execfile('config.py', globals())
    iface = config['interface']
    # Every key is read up front so a missing entry fails before any socket
    # is created.
    logport = config['logport']
    rport = config['regport']
    cport = config['clientport']
    cqport = config['cqueueport']
    eqport = config['equeueport']
    ctport = config['ctaskport']
    etport = config['etaskport']
    ccport = config['ccontrolport']
    ecport = config['econtrolport']
    hport = config['heartport']
    nport = config['notifierport']

    def url(port):
        """Return the address of *port* on the configured interface."""
        return "%s:%i" % (iface, port)

    # --- logging -----------------------------------------------------------
    # The log socket connects (rather than binds) to the log port.  A
    # disabled retry loop that bumped ``logport`` on failure used to sit here.
    log_sock = context.socket(zmq.PUB)
    log_sock.connect(url(logport))
    pub_handler = handlers.PUBHandler(log_sock)
    pub_handler.setLevel(logging.DEBUG)
    pub_handler.root_topic = "controller"
    log.logger.addHandler(pub_handler)
    # Pause before going on -- presumably to let the PUB connection settle
    # so early log messages are not dropped (confirm).
    time.sleep(.5)

    # --- engine connections ------------------------------------------------
    # Engine registrar socket.
    registrar = ZMQStream(context.socket(zmq.XREP), loop)
    registrar.bind(url(rport))

    # Heartbeat: PUB on the heart port, XREP on the port right after it.
    ping = context.socket(zmq.PUB)
    ping.bind(url(hport))
    pong = context.socket(zmq.XREP)
    pong.bind(url(hport + 1))
    # 2500 is passed through as HeartMonitor's fourth argument (presumably
    # the beat period in milliseconds -- confirm against heartmonitor).
    monitor = heartmonitor.HeartMonitor(
        loop, ZMQStream(ping, loop), ZMQStream(pong, loop), 2500)
    monitor.start()

    # --- client connections ------------------------------------------------
    # Clientele (query) socket and notification publisher.
    clientele = ZMQStream(context.socket(zmq.XREP), loop)
    clientele.bind(url(cport))
    notifier = ZMQStream(context.socket(zmq.PUB), loop)
    notifier.bind(url(nport))

    thesession = session.StreamSession(username="controller")

    # --- queues ------------------------------------------------------------
    # Monitor socket: subscribed to every topic, bound to a random port that
    # each queue device then connects its monitor output to.
    mon_sock = context.socket(zmq.SUB)
    mon_sock.setsockopt(zmq.SUBSCRIBE, "")
    monport = mon_sock.bind_to_random_port(iface)
    mon_stream = ZMQStream(mon_sock, loop)

    # (in type, out type, in topic, out topic, client port, engine port);
    # each entry becomes one monitored queue running in its own process.
    queue_specs = (
        # Multiplexer queue
        (zmq.XREP, zmq.XREP, 'in', 'out', cqport, eqport),
        # Control queue
        (zmq.XREP, zmq.XREP, 'incontrol', 'outcontrol', ccport, ecport),
        # Task queue
        (zmq.XREP, zmq.XREQ, 'intask', 'outtask', ctport, etport),
    )
    for in_type, out_type, in_topic, out_topic, in_port, out_port in queue_specs:
        device = ProcessMonitoredQueue(
            in_type, out_type, zmq.PUB, in_topic, out_topic)
        device.bind_in(url(in_port))
        device.bind_out(url(out_port))
        device.connect_mon(url(monport))
        device.daemon = True
        device.start()
    # Short pause after launching the queue processes before continuing.
    time.sleep(.25)

    # --- connection dicts --------------------------------------------------
    # Addresses handed to engines ...
    engine_addrs = {
        'control': url(ecport),
        'queue': url(eqport),
        'heartbeat': (url(hport), url(hport + 1)),
        'task': url(etport),
        'monitor': url(monport),
    }
    # ... and to clients.
    client_addrs = {
        'control': url(ccport),
        'query': url(cport),
        'queue': url(cqport),
        'task': url(ctport),
        'notification': url(nport),
    }

    # The None fills the positional slot between the notifier stream and the
    # address dicts; its meaning is defined by controller.Controller.
    hub = controller.Controller(
        loop, thesession, mon_stream, registrar, monitor,
        clientele, notifier, None, engine_addrs, client_addrs)
    return loop
if __name__ == '__main__':
    # Script entry point: build the controller plumbing, then hand control
    # to the event loop that setup() returns.
    loop = setup()
    loop.start()