##// END OF EJS Templates
added dependencies & Python scheduler
added dependencies & Python scheduler

File last commit:

r3540:c369179d
r3548:48fdd06d
Show More
engine.py
140 lines | 4.6 KiB | text/x-python | PythonLexer
MinRK
prep newparallel for rebase...
r3539 #!/usr/bin/env python
"""A simple engine that talks to a controller over 0MQ.
it handles registration, etc. and launches a kernel
connected to the Controller's queue(s).
"""
import sys
import time
import traceback
import uuid
MinRK
control channel progress
r3540 from pprint import pprint
MinRK
prep newparallel for rebase...
r3539
import zmq
from zmq.eventloop import ioloop, zmqstream
from streamsession import Message, StreamSession
from client import Client
import streamkernel as kernel
import heartmonitor
# import taskthread
# from log import logger
def printer(*msg):
MinRK
control channel progress
r3540 pprint(msg)
MinRK
prep newparallel for rebase...
r3539
class Engine(object):
"""IPython engine"""
id=None
context=None
loop=None
session=None
MinRK
control channel progress
r3540 ident=None
MinRK
prep newparallel for rebase...
r3539 registrar=None
heart=None
kernel=None
MinRK
control channel progress
r3540 def __init__(self, context, loop, session, registrar, client, ident=None, heart_id=None):
MinRK
prep newparallel for rebase...
r3539 self.context = context
self.loop = loop
self.session = session
self.registrar = registrar
self.client = client
MinRK
control channel progress
r3540 self.ident = ident if ident else str(uuid.uuid4())
MinRK
prep newparallel for rebase...
r3539 self.registrar.on_send(printer)
def register(self):
MinRK
control channel progress
r3540 content = dict(queue=self.ident, heartbeat=self.ident, control=self.ident)
MinRK
prep newparallel for rebase...
r3539 self.registrar.on_recv(self.complete_registration)
self.session.send(self.registrar, "registration_request",content=content)
def complete_registration(self, msg):
# print msg
idents,msg = self.session.feed_identities(msg)
msg = Message(self.session.unpack_message(msg))
if msg.content.status == 'ok':
self.session.username = str(msg.content.id)
queue_addr = msg.content.queue
if queue_addr:
queue = self.context.socket(zmq.PAIR)
MinRK
control channel progress
r3540 queue.setsockopt(zmq.IDENTITY, self.ident)
MinRK
prep newparallel for rebase...
r3539 queue.connect(str(queue_addr))
self.queue = zmqstream.ZMQStream(queue, self.loop)
control_addr = msg.content.control
if control_addr:
control = self.context.socket(zmq.PAIR)
MinRK
control channel progress
r3540 control.setsockopt(zmq.IDENTITY, self.ident)
MinRK
prep newparallel for rebase...
r3539 control.connect(str(control_addr))
self.control = zmqstream.ZMQStream(control, self.loop)
task_addr = msg.content.task
print task_addr
if task_addr:
# task as stream:
task = self.context.socket(zmq.PAIR)
task.connect(str(task_addr))
self.task_stream = zmqstream.ZMQStream(task, self.loop)
# TaskThread:
# mon_addr = msg.content.monitor
MinRK
control channel progress
r3540 # task = taskthread.TaskThread(zmq.PAIR, zmq.PUB, self.ident)
MinRK
prep newparallel for rebase...
r3539 # task.connect_in(str(task_addr))
# task.connect_out(str(mon_addr))
# self.task_stream = taskthread.QueueStream(*task.queues)
# task.start()
hbs = msg.content.heartbeat
MinRK
control channel progress
r3540 self.heart = heartmonitor.Heart(*map(str, hbs), heart_id=self.ident)
MinRK
prep newparallel for rebase...
r3539 self.heart.start()
# ioloop.DelayedCallback(self.heart.start, 1000, self.loop).start()
# placeholder for now:
pub = self.context.socket(zmq.PUB)
pub = zmqstream.ZMQStream(pub, self.loop)
# create and start the kernel
self.kernel = kernel.Kernel(self.session, self.control, self.queue, pub, self.task_stream, self.client)
self.kernel.start()
else:
# logger.error("Registration Failed: %s"%msg)
raise Exception("Registration Failed: %s"%msg)
# logger.info("engine::completed registration with id %s"%self.session.username)
print msg
def unregister(self):
self.session.send(self.registrar, "unregistration_request", content=dict(id=int(self.session.username)))
time.sleep(1)
sys.exit(0)
def start(self):
print "registering"
self.register()
if __name__ == '__main__':
loop = ioloop.IOLoop.instance()
session = StreamSession()
ctx = zmq.Context()
ip = '127.0.0.1'
reg_port = 10101
connection = ('tcp://%s' % ip) + ':%i'
reg_conn = connection % reg_port
print reg_conn
print >>sys.__stdout__, "Starting the engine..."
reg = ctx.socket(zmq.PAIR)
reg.connect(reg_conn)
reg = zmqstream.ZMQStream(reg, loop)
client = Client(reg_conn)
if len(sys.argv) > 1:
queue_id=sys.argv[1]
else:
queue_id = None
e = Engine(ctx, loop, session, reg, client, queue_id)
dc = ioloop.DelayedCallback(e.start, 500, loop)
dc.start()
loop.start()