engine.py
140 lines
| 4.6 KiB
| text/x-python
|
PythonLexer
MinRK
|
r3539 | #!/usr/bin/env python | |
"""A simple engine that talks to a controller over 0MQ. | |||
it handles registration, etc. and launches a kernel | |||
connected to the Controller's queue(s). | |||
""" | |||
import sys | |||
import time | |||
import traceback | |||
import uuid | |||
MinRK
|
r3540 | from pprint import pprint | |
MinRK
|
r3539 | ||
import zmq | |||
from zmq.eventloop import ioloop, zmqstream | |||
from streamsession import Message, StreamSession | |||
from client import Client | |||
import streamkernel as kernel | |||
import heartmonitor | |||
# import taskthread | |||
# from log import logger | |||
def printer(*msg): | |||
MinRK
|
r3540 | pprint(msg) | |
MinRK
|
r3539 | ||
class Engine(object): | |||
"""IPython engine""" | |||
id=None | |||
context=None | |||
loop=None | |||
session=None | |||
MinRK
|
r3540 | ident=None | |
MinRK
|
r3539 | registrar=None | |
heart=None | |||
kernel=None | |||
MinRK
|
r3540 | def __init__(self, context, loop, session, registrar, client, ident=None, heart_id=None): | |
MinRK
|
r3539 | self.context = context | |
self.loop = loop | |||
self.session = session | |||
self.registrar = registrar | |||
self.client = client | |||
MinRK
|
r3540 | self.ident = ident if ident else str(uuid.uuid4()) | |
MinRK
|
r3539 | self.registrar.on_send(printer) | |
def register(self): | |||
MinRK
|
r3540 | content = dict(queue=self.ident, heartbeat=self.ident, control=self.ident) | |
MinRK
|
r3539 | self.registrar.on_recv(self.complete_registration) | |
self.session.send(self.registrar, "registration_request",content=content) | |||
def complete_registration(self, msg): | |||
# print msg | |||
idents,msg = self.session.feed_identities(msg) | |||
msg = Message(self.session.unpack_message(msg)) | |||
if msg.content.status == 'ok': | |||
self.session.username = str(msg.content.id) | |||
queue_addr = msg.content.queue | |||
if queue_addr: | |||
queue = self.context.socket(zmq.PAIR) | |||
MinRK
|
r3540 | queue.setsockopt(zmq.IDENTITY, self.ident) | |
MinRK
|
r3539 | queue.connect(str(queue_addr)) | |
self.queue = zmqstream.ZMQStream(queue, self.loop) | |||
control_addr = msg.content.control | |||
if control_addr: | |||
control = self.context.socket(zmq.PAIR) | |||
MinRK
|
r3540 | control.setsockopt(zmq.IDENTITY, self.ident) | |
MinRK
|
r3539 | control.connect(str(control_addr)) | |
self.control = zmqstream.ZMQStream(control, self.loop) | |||
task_addr = msg.content.task | |||
print task_addr | |||
if task_addr: | |||
# task as stream: | |||
task = self.context.socket(zmq.PAIR) | |||
task.connect(str(task_addr)) | |||
self.task_stream = zmqstream.ZMQStream(task, self.loop) | |||
# TaskThread: | |||
# mon_addr = msg.content.monitor | |||
MinRK
|
r3540 | # task = taskthread.TaskThread(zmq.PAIR, zmq.PUB, self.ident) | |
MinRK
|
r3539 | # task.connect_in(str(task_addr)) | |
# task.connect_out(str(mon_addr)) | |||
# self.task_stream = taskthread.QueueStream(*task.queues) | |||
# task.start() | |||
hbs = msg.content.heartbeat | |||
MinRK
|
r3540 | self.heart = heartmonitor.Heart(*map(str, hbs), heart_id=self.ident) | |
MinRK
|
r3539 | self.heart.start() | |
# ioloop.DelayedCallback(self.heart.start, 1000, self.loop).start() | |||
# placeholder for now: | |||
pub = self.context.socket(zmq.PUB) | |||
pub = zmqstream.ZMQStream(pub, self.loop) | |||
# create and start the kernel | |||
self.kernel = kernel.Kernel(self.session, self.control, self.queue, pub, self.task_stream, self.client) | |||
self.kernel.start() | |||
else: | |||
# logger.error("Registration Failed: %s"%msg) | |||
raise Exception("Registration Failed: %s"%msg) | |||
# logger.info("engine::completed registration with id %s"%self.session.username) | |||
print msg | |||
def unregister(self): | |||
self.session.send(self.registrar, "unregistration_request", content=dict(id=int(self.session.username))) | |||
time.sleep(1) | |||
sys.exit(0) | |||
def start(self): | |||
print "registering" | |||
self.register() | |||
if __name__ == '__main__': | |||
loop = ioloop.IOLoop.instance() | |||
session = StreamSession() | |||
ctx = zmq.Context() | |||
ip = '127.0.0.1' | |||
reg_port = 10101 | |||
connection = ('tcp://%s' % ip) + ':%i' | |||
reg_conn = connection % reg_port | |||
print reg_conn | |||
print >>sys.__stdout__, "Starting the engine..." | |||
reg = ctx.socket(zmq.PAIR) | |||
reg.connect(reg_conn) | |||
reg = zmqstream.ZMQStream(reg, loop) | |||
client = Client(reg_conn) | |||
if len(sys.argv) > 1: | |||
queue_id=sys.argv[1] | |||
else: | |||
queue_id = None | |||
e = Engine(ctx, loop, session, reg, client, queue_id) | |||
dc = ioloop.DelayedCallback(e.start, 500, loop) | |||
dc.start() | |||
loop.start() |