engine.py
147 lines
| 4.8 KiB
| text/x-python
|
PythonLexer
MinRK
|
r3539 | #!/usr/bin/env python | |
"""A simple engine that talks to a controller over 0MQ. | |||
it handles registration, etc. and launches a kernel | |||
connected to the Controller's queue(s). | |||
""" | |||
MinRK
|
r3552 | from __future__ import print_function | |
MinRK
|
r3539 | import sys | |
import time | |||
import traceback | |||
import uuid | |||
MinRK
|
r3540 | from pprint import pprint | |
MinRK
|
r3539 | ||
import zmq | |||
from zmq.eventloop import ioloop, zmqstream | |||
from streamsession import Message, StreamSession | |||
from client import Client | |||
import streamkernel as kernel | |||
import heartmonitor | |||
MinRK
|
r3552 | from entry_point import make_base_argument_parser, connect_logger, parse_url | |
MinRK
|
r3539 | # import taskthread | |
# from log import logger | |||
def printer(*msg): | |||
MinRK
|
r3540 | pprint(msg) | |
MinRK
|
r3539 | ||
class Engine(object): | |||
"""IPython engine""" | |||
id=None | |||
context=None | |||
loop=None | |||
session=None | |||
MinRK
|
r3540 | ident=None | |
MinRK
|
r3539 | registrar=None | |
heart=None | |||
kernel=None | |||
MinRK
|
r3540 | def __init__(self, context, loop, session, registrar, client, ident=None, heart_id=None): | |
MinRK
|
r3539 | self.context = context | |
self.loop = loop | |||
self.session = session | |||
self.registrar = registrar | |||
self.client = client | |||
MinRK
|
r3540 | self.ident = ident if ident else str(uuid.uuid4()) | |
MinRK
|
r3539 | self.registrar.on_send(printer) | |
def register(self): | |||
MinRK
|
r3540 | content = dict(queue=self.ident, heartbeat=self.ident, control=self.ident) | |
MinRK
|
r3539 | self.registrar.on_recv(self.complete_registration) | |
self.session.send(self.registrar, "registration_request",content=content) | |||
def complete_registration(self, msg): | |||
# print msg | |||
idents,msg = self.session.feed_identities(msg) | |||
msg = Message(self.session.unpack_message(msg)) | |||
if msg.content.status == 'ok': | |||
self.session.username = str(msg.content.id) | |||
queue_addr = msg.content.queue | |||
if queue_addr: | |||
queue = self.context.socket(zmq.PAIR) | |||
MinRK
|
r3540 | queue.setsockopt(zmq.IDENTITY, self.ident) | |
MinRK
|
r3539 | queue.connect(str(queue_addr)) | |
self.queue = zmqstream.ZMQStream(queue, self.loop) | |||
control_addr = msg.content.control | |||
if control_addr: | |||
control = self.context.socket(zmq.PAIR) | |||
MinRK
|
r3540 | control.setsockopt(zmq.IDENTITY, self.ident) | |
MinRK
|
r3539 | control.connect(str(control_addr)) | |
self.control = zmqstream.ZMQStream(control, self.loop) | |||
task_addr = msg.content.task | |||
MinRK
|
r3552 | print (task_addr) | |
MinRK
|
r3539 | if task_addr: | |
# task as stream: | |||
task = self.context.socket(zmq.PAIR) | |||
MinRK
|
r3550 | task.setsockopt(zmq.IDENTITY, self.ident) | |
MinRK
|
r3539 | task.connect(str(task_addr)) | |
self.task_stream = zmqstream.ZMQStream(task, self.loop) | |||
# TaskThread: | |||
# mon_addr = msg.content.monitor | |||
MinRK
|
r3540 | # task = taskthread.TaskThread(zmq.PAIR, zmq.PUB, self.ident) | |
MinRK
|
r3539 | # task.connect_in(str(task_addr)) | |
# task.connect_out(str(mon_addr)) | |||
# self.task_stream = taskthread.QueueStream(*task.queues) | |||
# task.start() | |||
hbs = msg.content.heartbeat | |||
MinRK
|
r3540 | self.heart = heartmonitor.Heart(*map(str, hbs), heart_id=self.ident) | |
MinRK
|
r3539 | self.heart.start() | |
# ioloop.DelayedCallback(self.heart.start, 1000, self.loop).start() | |||
# placeholder for now: | |||
pub = self.context.socket(zmq.PUB) | |||
pub = zmqstream.ZMQStream(pub, self.loop) | |||
# create and start the kernel | |||
self.kernel = kernel.Kernel(self.session, self.control, self.queue, pub, self.task_stream, self.client) | |||
self.kernel.start() | |||
else: | |||
# logger.error("Registration Failed: %s"%msg) | |||
raise Exception("Registration Failed: %s"%msg) | |||
# logger.info("engine::completed registration with id %s"%self.session.username) | |||
MinRK
|
r3552 | print (msg) | |
MinRK
|
r3539 | ||
def unregister(self): | |||
self.session.send(self.registrar, "unregistration_request", content=dict(id=int(self.session.username))) | |||
time.sleep(1) | |||
sys.exit(0) | |||
def start(self): | |||
MinRK
|
r3552 | print ("registering") | |
MinRK
|
r3539 | self.register() | |
MinRK
|
r3550 | def main(): | |
MinRK
|
r3539 | ||
MinRK
|
r3552 | parser = make_base_argument_parser() | |
MinRK
|
r3550 | ||
args = parser.parse_args() | |||
MinRK
|
r3552 | parse_url(args) | |
MinRK
|
r3550 | ||
iface="%s://%s"%(args.transport,args.ip)+':%i' | |||
MinRK
|
r3552 | ||
MinRK
|
r3539 | loop = ioloop.IOLoop.instance() | |
session = StreamSession() | |||
ctx = zmq.Context() | |||
MinRK
|
r3550 | # setup logging | |
connect_logger(ctx, iface%args.logport, root="engine", loglevel=args.loglevel) | |||
reg_conn = iface % args.regport | |||
MinRK
|
r3552 | print (reg_conn) | |
print ("Starting the engine...", file=sys.__stderr__) | |||
MinRK
|
r3539 | ||
reg = ctx.socket(zmq.PAIR) | |||
reg.connect(reg_conn) | |||
reg = zmqstream.ZMQStream(reg, loop) | |||
client = Client(reg_conn) | |||
MinRK
|
r3550 | e = Engine(ctx, loop, session, reg, client, args.ident) | |
dc = ioloop.DelayedCallback(e.start, 100, loop) | |||
MinRK
|
r3539 | dc.start() | |
loop.start() |