##// END OF EJS Templates
general parallel code cleanup
general parallel code cleanup

File last commit:

r3552:d405cd69
r3556:b8dd49c8
Show More
engine.py
147 lines | 4.8 KiB | text/x-python | PythonLexer
MinRK
prep newparallel for rebase...
r3539 #!/usr/bin/env python
"""A simple engine that talks to a controller over 0MQ.
it handles registration, etc. and launches a kernel
connected to the Controller's queue(s).
"""
MinRK
added simple cluster entry point
r3552 from __future__ import print_function
MinRK
prep newparallel for rebase...
r3539 import sys
import time
import traceback
import uuid
MinRK
control channel progress
r3540 from pprint import pprint
MinRK
prep newparallel for rebase...
r3539
import zmq
from zmq.eventloop import ioloop, zmqstream
from streamsession import Message, StreamSession
from client import Client
import streamkernel as kernel
import heartmonitor
MinRK
added simple cluster entry point
r3552 from entry_point import make_base_argument_parser, connect_logger, parse_url
MinRK
prep newparallel for rebase...
r3539 # import taskthread
# from log import logger
def printer(*msg):
MinRK
control channel progress
r3540 pprint(msg)
MinRK
prep newparallel for rebase...
r3539
class Engine(object):
"""IPython engine"""
id=None
context=None
loop=None
session=None
MinRK
control channel progress
r3540 ident=None
MinRK
prep newparallel for rebase...
r3539 registrar=None
heart=None
kernel=None
MinRK
control channel progress
r3540 def __init__(self, context, loop, session, registrar, client, ident=None, heart_id=None):
MinRK
prep newparallel for rebase...
r3539 self.context = context
self.loop = loop
self.session = session
self.registrar = registrar
self.client = client
MinRK
control channel progress
r3540 self.ident = ident if ident else str(uuid.uuid4())
MinRK
prep newparallel for rebase...
r3539 self.registrar.on_send(printer)
def register(self):
MinRK
control channel progress
r3540 content = dict(queue=self.ident, heartbeat=self.ident, control=self.ident)
MinRK
prep newparallel for rebase...
r3539 self.registrar.on_recv(self.complete_registration)
self.session.send(self.registrar, "registration_request",content=content)
def complete_registration(self, msg):
# print msg
idents,msg = self.session.feed_identities(msg)
msg = Message(self.session.unpack_message(msg))
if msg.content.status == 'ok':
self.session.username = str(msg.content.id)
queue_addr = msg.content.queue
if queue_addr:
queue = self.context.socket(zmq.PAIR)
MinRK
control channel progress
r3540 queue.setsockopt(zmq.IDENTITY, self.ident)
MinRK
prep newparallel for rebase...
r3539 queue.connect(str(queue_addr))
self.queue = zmqstream.ZMQStream(queue, self.loop)
control_addr = msg.content.control
if control_addr:
control = self.context.socket(zmq.PAIR)
MinRK
control channel progress
r3540 control.setsockopt(zmq.IDENTITY, self.ident)
MinRK
prep newparallel for rebase...
r3539 control.connect(str(control_addr))
self.control = zmqstream.ZMQStream(control, self.loop)
task_addr = msg.content.task
MinRK
added simple cluster entry point
r3552 print (task_addr)
MinRK
prep newparallel for rebase...
r3539 if task_addr:
# task as stream:
task = self.context.socket(zmq.PAIR)
MinRK
added zmq controller/engine entry points
r3550 task.setsockopt(zmq.IDENTITY, self.ident)
MinRK
prep newparallel for rebase...
r3539 task.connect(str(task_addr))
self.task_stream = zmqstream.ZMQStream(task, self.loop)
# TaskThread:
# mon_addr = msg.content.monitor
MinRK
control channel progress
r3540 # task = taskthread.TaskThread(zmq.PAIR, zmq.PUB, self.ident)
MinRK
prep newparallel for rebase...
r3539 # task.connect_in(str(task_addr))
# task.connect_out(str(mon_addr))
# self.task_stream = taskthread.QueueStream(*task.queues)
# task.start()
hbs = msg.content.heartbeat
MinRK
control channel progress
r3540 self.heart = heartmonitor.Heart(*map(str, hbs), heart_id=self.ident)
MinRK
prep newparallel for rebase...
r3539 self.heart.start()
# ioloop.DelayedCallback(self.heart.start, 1000, self.loop).start()
# placeholder for now:
pub = self.context.socket(zmq.PUB)
pub = zmqstream.ZMQStream(pub, self.loop)
# create and start the kernel
self.kernel = kernel.Kernel(self.session, self.control, self.queue, pub, self.task_stream, self.client)
self.kernel.start()
else:
# logger.error("Registration Failed: %s"%msg)
raise Exception("Registration Failed: %s"%msg)
# logger.info("engine::completed registration with id %s"%self.session.username)
MinRK
added simple cluster entry point
r3552 print (msg)
MinRK
prep newparallel for rebase...
r3539
def unregister(self):
self.session.send(self.registrar, "unregistration_request", content=dict(id=int(self.session.username)))
time.sleep(1)
sys.exit(0)
def start(self):
MinRK
added simple cluster entry point
r3552 print ("registering")
MinRK
prep newparallel for rebase...
r3539 self.register()
MinRK
added zmq controller/engine entry points
r3550 def main():
MinRK
prep newparallel for rebase...
r3539
MinRK
added simple cluster entry point
r3552 parser = make_base_argument_parser()
MinRK
added zmq controller/engine entry points
r3550
args = parser.parse_args()
MinRK
added simple cluster entry point
r3552 parse_url(args)
MinRK
added zmq controller/engine entry points
r3550
iface="%s://%s"%(args.transport,args.ip)+':%i'
MinRK
added simple cluster entry point
r3552
MinRK
prep newparallel for rebase...
r3539 loop = ioloop.IOLoop.instance()
session = StreamSession()
ctx = zmq.Context()
MinRK
added zmq controller/engine entry points
r3550 # setup logging
connect_logger(ctx, iface%args.logport, root="engine", loglevel=args.loglevel)
reg_conn = iface % args.regport
MinRK
added simple cluster entry point
r3552 print (reg_conn)
print ("Starting the engine...", file=sys.__stderr__)
MinRK
prep newparallel for rebase...
r3539
reg = ctx.socket(zmq.PAIR)
reg.connect(reg_conn)
reg = zmqstream.ZMQStream(reg, loop)
client = Client(reg_conn)
MinRK
added zmq controller/engine entry points
r3550 e = Engine(ctx, loop, session, reg, client, args.ident)
dc = ioloop.DelayedCallback(e.start, 100, loop)
MinRK
prep newparallel for rebase...
r3539 dc.start()
loop.start()