controller.py
138 lines
| 4.0 KiB
| text/x-python
|
PythonLexer
MinRK
|
r3539 | #!/usr/bin/env python | |
"""A script to launch a controller with all its queues and connect it to a logger""" | |||
import time | |||
import logging | |||
import zmq | |||
from zmq.devices import ProcessMonitoredQueue, ThreadMonitoredQueue | |||
from zmq.eventloop import ioloop | |||
from zmq.eventloop.zmqstream import ZMQStream | |||
from zmq.log import handlers | |||
from IPython.zmq import log | |||
from IPython.zmq.parallel import controller, heartmonitor, streamsession as session | |||
def setup(): | |||
"""setup a basic controller and open client,registrar, and logging ports. Start the Queue and the heartbeat""" | |||
MinRK
|
r3550 | ctx = zmq.Context() | |
MinRK
|
r3539 | loop = ioloop.IOLoop.instance() | |
# port config | |||
# config={} | |||
execfile('config.py', globals()) | |||
iface = config['interface'] | |||
logport = config['logport'] | |||
rport = config['regport'] | |||
cport = config['clientport'] | |||
cqport = config['cqueueport'] | |||
eqport = config['equeueport'] | |||
ctport = config['ctaskport'] | |||
etport = config['etaskport'] | |||
ccport = config['ccontrolport'] | |||
ecport = config['econtrolport'] | |||
hport = config['heartport'] | |||
nport = config['notifierport'] | |||
# setup logging | |||
lsock = ctx.socket(zmq.PUB) | |||
lsock.connect('%s:%i'%(iface,logport)) | |||
# connected=False | |||
# while not connected: | |||
# try: | |||
# except: | |||
# logport = logport + 1 | |||
# else: | |||
# connected=True | |||
# | |||
handler = handlers.PUBHandler(lsock) | |||
handler.setLevel(logging.DEBUG) | |||
handler.root_topic = "controller" | |||
log.logger.addHandler(handler) | |||
time.sleep(.5) | |||
### Engine connections ### | |||
# Engine registrar socket | |||
reg = ZMQStream(ctx.socket(zmq.XREP), loop) | |||
reg.bind("%s:%i"%(iface, rport)) | |||
# heartbeat | |||
hpub = ctx.socket(zmq.PUB) | |||
hpub.bind("%s:%i"%(iface, hport)) | |||
hrep = ctx.socket(zmq.XREP) | |||
hrep.bind("%s:%i"%(iface, hport+1)) | |||
hb = heartmonitor.HeartMonitor(loop, ZMQStream(hpub,loop), ZMQStream(hrep,loop),2500) | |||
hb.start() | |||
### Client connections ### | |||
# Clientele socket | |||
c = ZMQStream(ctx.socket(zmq.XREP), loop) | |||
c.bind("%s:%i"%(iface, cport)) | |||
n = ZMQStream(ctx.socket(zmq.PUB), loop) | |||
n.bind("%s:%i"%(iface, nport)) | |||
thesession = session.StreamSession(username="controller") | |||
# build and launch the queue | |||
sub = ctx.socket(zmq.SUB) | |||
sub.setsockopt(zmq.SUBSCRIBE, "") | |||
monport = sub.bind_to_random_port(iface) | |||
sub = ZMQStream(sub, loop) | |||
# Multiplexer Queue (in a Process) | |||
q = ProcessMonitoredQueue(zmq.XREP, zmq.XREP, zmq.PUB, 'in', 'out') | |||
q.bind_in("%s:%i"%(iface, cqport)) | |||
q.bind_out("%s:%i"%(iface, eqport)) | |||
q.connect_mon("%s:%i"%(iface, monport)) | |||
q.daemon=True | |||
q.start() | |||
# Control Queue (in a Process) | |||
q = ProcessMonitoredQueue(zmq.XREP, zmq.XREP, zmq.PUB, 'incontrol', 'outcontrol') | |||
q.bind_in("%s:%i"%(iface, ccport)) | |||
q.bind_out("%s:%i"%(iface, ecport)) | |||
q.connect_mon("%s:%i"%(iface, monport)) | |||
q.daemon=True | |||
q.start() | |||
# Task Queue (in a Process) | |||
q = ProcessMonitoredQueue(zmq.XREP, zmq.XREQ, zmq.PUB, 'intask', 'outtask') | |||
q.bind_in("%s:%i"%(iface, ctport)) | |||
q.bind_out("%s:%i"%(iface, etport)) | |||
q.connect_mon("%s:%i"%(iface, monport)) | |||
q.daemon=True | |||
q.start() | |||
time.sleep(.25) | |||
# build connection dicts | |||
engine_addrs = { | |||
'control' : "%s:%i"%(iface, ecport), | |||
'queue': "%s:%i"%(iface, eqport), | |||
'heartbeat': ("%s:%i"%(iface, hport), "%s:%i"%(iface, hport+1)), | |||
'task' : "%s:%i"%(iface, etport), | |||
'monitor' : "%s:%i"%(iface, monport), | |||
} | |||
client_addrs = { | |||
'control' : "%s:%i"%(iface, ccport), | |||
MinRK
|
r3540 | 'query': "%s:%i"%(iface, cport), | |
MinRK
|
r3539 | 'queue': "%s:%i"%(iface, cqport), | |
'task' : "%s:%i"%(iface, ctport), | |||
'notification': "%s:%i"%(iface, nport) | |||
} | |||
con = controller.Controller(loop, thesession, sub, reg, hb, c, n, None, engine_addrs, client_addrs) | |||
return loop | |||
if __name__ == '__main__': | |||
loop = setup() | |||
loop.start() |