#!/usr/bin/env python """A simple engine that talks to a controller over 0MQ. it handles registration, etc. and launches a kernel connected to the Controller's queue(s). """ from __future__ import print_function import sys import time import traceback import uuid from pprint import pprint import zmq from zmq.eventloop import ioloop, zmqstream from IPython.utils.traitlets import HasTraits from IPython.utils.localinterfaces import LOCALHOST from streamsession import Message, StreamSession from client import Client from streamkernel import Kernel, make_kernel import heartmonitor from entry_point import make_base_argument_parser, connect_logger, parse_url # import taskthread # from log import logger def printer(*msg): pprint(msg) class Engine(object): """IPython engine""" id=None context=None loop=None session=None ident=None registrar=None heart=None kernel=None user_ns=None def __init__(self, context, loop, session, registrar, client=None, ident=None, heart_id=None, user_ns=None): self.context = context self.loop = loop self.session = session self.registrar = registrar self.client = client self.ident = ident if ident else str(uuid.uuid4()) self.registrar.on_send(printer) self.user_ns = user_ns def register(self): content = dict(queue=self.ident, heartbeat=self.ident, control=self.ident) self.registrar.on_recv(self.complete_registration) # print (self.session.key) self.session.send(self.registrar, "registration_request",content=content) def complete_registration(self, msg): # print msg idents,msg = self.session.feed_identities(msg) msg = Message(self.session.unpack_message(msg)) if msg.content.status == 'ok': self.session.username = str(msg.content.id) queue_addr = msg.content.queue shell_addrs = [str(queue_addr)] control_addr = str(msg.content.control) task_addr = msg.content.task if task_addr: shell_addrs.append(str(task_addr)) hb_addrs = msg.content.heartbeat # ioloop.DelayedCallback(self.heart.start, 1000, self.loop).start() # placeholder for no, since pub isn't hooked up: sub = self.context.socket(zmq.SUB) sub = zmqstream.ZMQStream(sub, self.loop) sub.on_recv(lambda *a: None) port = sub.bind_to_random_port("tcp://%s"%LOCALHOST) iopub_addr = "tcp://%s:%i"%(LOCALHOST,12345) k = make_kernel(self.ident, control_addr, shell_addrs, iopub_addr, hb_addrs, client_addr=None, loop=self.loop, context=self.context, key=self.session.key)[-1] self.kernel = k if self.user_ns is not None: self.user_ns.update(self.kernel.user_ns) self.kernel.user_ns = self.user_ns else: # logger.error("Registration Failed: %s"%msg) raise Exception("Registration Failed: %s"%msg) # logger.info("engine::completed registration with id %s"%self.session.username) print (msg) def unregister(self): self.session.send(self.registrar, "unregistration_request", content=dict(id=int(self.session.username))) time.sleep(1) sys.exit(0) def start(self): print ("registering") self.register() def main(argv=None, user_ns=None): parser = make_base_argument_parser() args = parser.parse_args(argv) parse_url(args) iface="%s://%s"%(args.transport,args.ip)+':%i' loop = ioloop.IOLoop.instance() session = StreamSession(keyfile=args.execkey) # print (session.key) ctx = zmq.Context() # setup logging connect_logger(ctx, iface%args.logport, root="engine", loglevel=args.loglevel) reg_conn = iface % args.regport print (reg_conn) print ("Starting the engine...", file=sys.__stderr__) reg = ctx.socket(zmq.PAIR) reg.connect(reg_conn) reg = zmqstream.ZMQStream(reg, loop) client = None e = Engine(ctx, loop, session, reg, client, args.ident, user_ns=user_ns) dc = ioloop.DelayedCallback(e.start, 100, loop) dc.start() loop.start() # Execution as a script if __name__ == '__main__': main()