##// END OF EJS Templates
added preliminary ssh tunneling support for clients
added preliminary ssh tunneling support for clients

File last commit:

r3550:3ab94ec8
r3572:99528f8b
Show More
controller.py
138 lines | 4.0 KiB | text/x-python | PythonLexer
#!/usr/bin/env python
"""A script to launch a controller with all its queues and connect it to a logger"""
import time
import logging
import zmq
from zmq.devices import ProcessMonitoredQueue, ThreadMonitoredQueue
from zmq.eventloop import ioloop
from zmq.eventloop.zmqstream import ZMQStream
from zmq.log import handlers
from IPython.zmq import log
from IPython.zmq.parallel import controller, heartmonitor, streamsession as session
def setup():
    """Wire up a basic controller and return the event loop that drives it.

    Executes ``config.py`` (resolved relative to the current working
    directory) into this module's globals and reads the interface and port
    numbers from the resulting ``config`` dict.  It then attaches a PUB log
    handler, opens the registrar, heartbeat, client and notifier sockets,
    launches the three monitored queues, and builds the Controller.

    Returns the shared ``IOLoop`` instance; the caller must start it.
    """
    context = zmq.Context()
    loop = ioloop.IOLoop.instance()

    # port config: config.py is expected to define a dict named `config`.
    execfile('config.py', globals())
    iface = config['interface']
    # Read every port before touching any socket, so a missing key fails early.
    (logport, rport, cport, cqport, eqport, ctport, etport,
     ccport, ecport, hport, nport) = [
        config[key] for key in (
            'logport', 'regport', 'clientport',
            'cqueueport', 'equeueport',
            'ctaskport', 'etaskport',
            'ccontrolport', 'econtrolport',
            'heartport', 'notifierport',
        )
    ]

    def url(port):
        """Format ``port`` as an address on the configured interface."""
        return "%s:%i" % (iface, port)

    # setup logging: publish log records to an external logger by connecting
    # (not binding) to its port.  There is no retry if the logger is absent.
    log_socket = context.socket(zmq.PUB)
    log_socket.connect(url(logport))
    pub_handler = handlers.PUBHandler(log_socket)
    pub_handler.setLevel(logging.DEBUG)
    pub_handler.root_topic = "controller"
    log.logger.addHandler(pub_handler)
    # NOTE(review): presumably lets the PUB connection settle before the
    # first records are emitted -- confirm.
    time.sleep(.5)

    ### Engine connections ###

    # Engine registrar socket
    registrar = ZMQStream(context.socket(zmq.XREP), loop)
    registrar.bind(url(rport))

    # heartbeat: PUB on hport, XREP on the port immediately after it.
    heart_pub = context.socket(zmq.PUB)
    heart_pub.bind(url(hport))
    heart_rep = context.socket(zmq.XREP)
    heart_rep.bind(url(hport + 1))
    heart_monitor = heartmonitor.HeartMonitor(
        loop,
        ZMQStream(heart_pub, loop),
        ZMQStream(heart_rep, loop),
        2500,
    )
    heart_monitor.start()

    ### Client connections ###

    # Clientele socket
    client_stream = ZMQStream(context.socket(zmq.XREP), loop)
    client_stream.bind(url(cport))
    notifier = ZMQStream(context.socket(zmq.PUB), loop)
    notifier.bind(url(nport))

    thesession = session.StreamSession(username="controller")

    # Monitor socket: subscribes to everything the queues publish, on a
    # randomly chosen port that the queues connect to below.
    mon_socket = context.socket(zmq.SUB)
    mon_socket.setsockopt(zmq.SUBSCRIBE, "")
    monport = mon_socket.bind_to_random_port(iface)
    mon_stream = ZMQStream(mon_socket, loop)

    def launch_queue(out_type, in_prefix, out_prefix, in_port, out_port):
        """Start one daemonized monitored queue (in a Process)."""
        device = ProcessMonitoredQueue(
            zmq.XREP, out_type, zmq.PUB, in_prefix, out_prefix)
        device.bind_in(url(in_port))
        device.bind_out(url(out_port))
        device.connect_mon(url(monport))
        device.daemon = True
        device.start()
        return device

    # build and launch the queues; the list only keeps the device objects
    # referenced until setup() returns.
    queues = [
        # Multiplexer Queue
        launch_queue(zmq.XREP, 'in', 'out', cqport, eqport),
        # Control Queue
        launch_queue(zmq.XREP, 'incontrol', 'outcontrol', ccport, ecport),
        # Task Queue (note the XREQ outgoing socket)
        launch_queue(zmq.XREQ, 'intask', 'outtask', ctport, etport),
    ]
    time.sleep(.25)

    # build connection dicts
    engine_addrs = dict(
        control=url(ecport),
        queue=url(eqport),
        heartbeat=(url(hport), url(hport + 1)),
        task=url(etport),
        monitor=url(monport),
    )
    client_addrs = dict(
        control=url(ccport),
        query=url(cport),
        queue=url(cqport),
        task=url(ctport),
        notification=url(nport),
    )

    con = controller.Controller(
        loop, thesession, mon_stream, registrar, heart_monitor,
        client_stream, notifier, None, engine_addrs, client_addrs)

    return loop
if __name__ == "__main__":
    # Build all sockets and queues, then run the event loop started here.
    loop = setup()
    loop.start()