##// END OF EJS Templates
adapt kernel's ipcluster and Launchers to newparallel
adapt kernel's ipcluster and Launchers to newparallel

File last commit:

r3604:2c044319
r3605:2d79d3e4
Show More
streamkernel.py
490 lines | 18.2 KiB | text/x-python | PythonLexer
MinRK
prep newparallel for rebase...
r3539 #!/usr/bin/env python
"""
Kernel adapted from kernel.py to use ZMQ Streams
"""
MinRK
Parallel kernel/engine startup looks a bit more like pykernel
r3569 #-----------------------------------------------------------------------------
# Imports
#-----------------------------------------------------------------------------
# Standard library imports.
MinRK
use print_function
r3553 from __future__ import print_function
MinRK
prep newparallel for rebase...
r3539 import __builtin__
MinRK
Parallel kernel/engine startup looks a bit more like pykernel
r3569 from code import CommandCompiler
MinRK
control channel progress
r3540 import os
MinRK
prep newparallel for rebase...
r3539 import sys
import time
import traceback
MinRK
improved logging + Hub,Engine,Scheduler are Configurable
r3603 import logging
MinRK
add timestamps all messages; fix reply on wrong channel bug.
r3578 from datetime import datetime
MinRK
prep newparallel for rebase...
r3539 from signal import SIGTERM, SIGKILL
MinRK
control channel progress
r3540 from pprint import pprint
MinRK
prep newparallel for rebase...
r3539
MinRK
Parallel kernel/engine startup looks a bit more like pykernel
r3569 # System library imports.
MinRK
prep newparallel for rebase...
r3539 import zmq
from zmq.eventloop import ioloop, zmqstream
MinRK
Parallel kernel/engine startup looks a bit more like pykernel
r3569 # Local imports.
MinRK
adapt kernel/error.py to zmq, improve error propagation.
r3583 from IPython.core import ultratb
MinRK
Refactor newparallel to use Config system...
r3604 from IPython.utils.traitlets import HasTraits, Instance, List, Int, Dict, Set, Str
MinRK
added dependency decorator
r3546 from IPython.zmq.completer import KernelCompleter
MinRK
propagate iopub to clients
r3602 from IPython.zmq.iostream import OutStream
from IPython.zmq.displayhook import DisplayHook
MinRK
Refactor newparallel to use Config system...
r3604 from factory import SessionFactory
MinRK
prep newparallel for rebase...
r3539 from streamsession import StreamSession, Message, extract_header, serialize_object,\
MinRK
Clients can now shutdown the controller.
r3580 unpack_apply_message, ISO8601, wrap_exception
MinRK
added dependency decorator
r3546 from dependency import UnmetDependency
MinRK
Parallel kernel/engine startup looks a bit more like pykernel
r3569 import heartmonitor
from client import Client
MinRK
prep newparallel for rebase...
r3539
MinRK
control channel progress
def printer(*args):
    """Pretty-print all positional arguments, as one tuple, to the real stdout.

    Output goes to ``sys.__stdout__`` rather than ``sys.stdout``, so it is
    still written to the original stream when ``sys.stdout`` has been
    replaced.
    """
    real_stdout = sys.__stdout__
    pprint(args, stream=real_stdout)
MinRK
control channel progress
r3540
MinRK
Refactor newparallel to use Config system...
r3604
class _Passer:
"""Empty class that implements `send()` that does nothing."""
def send(self, *args, **kwargs):
pass
send_multipart = send
MinRK
Parallel kernel/engine startup looks a bit more like pykernel
r3569 #-----------------------------------------------------------------------------
# Main kernel class
#-----------------------------------------------------------------------------
MinRK
prep newparallel for rebase...
r3539
MinRK
Refactor newparallel to use Config system...
class Kernel(SessionFactory):
    """Engine-side kernel that serves requests arriving on ZMQ streams.

    Messages from ``control_stream`` are routed through
    ``control_handlers``; messages from each of ``shell_streams`` are routed
    through ``shell_handlers``.  Both tables map a message type to the bound
    method of the same name.
    """

    #---------------------------------------------------------------------------
    # Kernel interface
    #---------------------------------------------------------------------------

    # kwargs:
    int_id = Int(-1, config=True)       # used to build the "engine.<int_id>" prefix
    user_ns = Dict(config=True)         # namespace in which requests are executed
    exec_lines = List(config=True)      # code lines run by _initial_exec_lines()

    control_stream = Instance(zmqstream.ZMQStream)
    task_stream = Instance(zmqstream.ZMQStream)
    iopub_stream = Instance(zmqstream.ZMQStream)
    client = Instance('IPython.zmq.parallel.client.Client')

    # internals
    shell_streams = List()
    compiler = Instance(CommandCompiler, (), {})
    completer = Instance(KernelCompleter)

    aborted = Set()                     # msg_ids to answer with status 'aborted'
    shell_handlers = Dict()             # msg_type -> handler for shell streams
    control_handlers = Dict()           # msg_type -> handler for the control stream

    def _set_prefix(self):
        """Rebuild the ``engine.<int_id>`` prefix used in iopub idents."""
        self.prefix = "engine.%s"%self.int_id

    def _connect_completer(self):
        """Create a completer bound to the current ``user_ns``."""
        self.completer = KernelCompleter(self.user_ns)

    def __init__(self, **kwargs):
        """Initialise traits from ``kwargs``, build handler tables, run exec_lines."""
        super(Kernel, self).__init__(**kwargs)
        self._set_prefix()
        self._connect_completer()

        self.on_trait_change(self._set_prefix, 'id')
        # The prefix is derived from ``int_id``, so follow that trait as well.
        self.on_trait_change(self._set_prefix, 'int_id')
        self.on_trait_change(self._connect_completer, 'user_ns')

        # Build dict of handlers for message types
        for msg_type in ['execute_request', 'complete_request', 'apply_request',
                         'clear_request']:
            self.shell_handlers[msg_type] = getattr(self, msg_type)

        # Every shell request may also be sent over the control channel.
        control_types = ['shutdown_request', 'abort_request'] + list(self.shell_handlers)
        for msg_type in control_types:
            self.control_handlers[msg_type] = getattr(self, msg_type)

        self._initial_exec_lines()

    def _wrap_exception(self, method=None):
        """Return ``wrap_exception`` content tagged with this engine and ``method``.

        Called from inside ``except`` blocks in this class.
        """
        e_info = dict(engineid=self.ident, method=method)
        return wrap_exception(e_info)

    def _initial_exec_lines(self):
        """Run each entry of ``exec_lines`` as a silent execute_request.

        Replies are sent to a ``_Passer``, i.e. discarded.
        """
        sink = _Passer()
        content = dict(silent=True, user_variable=[],user_expressions=[])
        for line in self.exec_lines:
            logging.debug("executing initialization: %s"%line)
            content.update({'code':line})
            msg = self.session.msg('execute_request', content)
            self.execute_request(sink, [], msg)

    def _set_output_parents(self, parent):
        """Tell the display hook and output streams which request is running.

        Each of ``sys.displayhook``, ``sys.stdout`` and ``sys.stderr`` is
        only touched when it provides ``set_parent``.
        """
        for target in (sys.displayhook, sys.stdout, sys.stderr):
            if hasattr(target, 'set_parent'):
                target.set_parent(parent)

    #-------------------- control handlers -----------------------------
    def abort_queues(self):
        """Abort everything pending on every truthy shell stream."""
        for stream in self.shell_streams:
            if stream:
                self.abort_queue(stream)

    def abort_queue(self, stream):
        """Receive pending messages from ``stream``, replying 'aborted' to each.

        Stops as soon as a non-blocking receive raises ``zmq.ZMQError``
        (EAGAIN or otherwise) or returns ``None``.
        """
        while True:
            try:
                received = self.session.recv(stream, zmq.NOBLOCK,content=True)
            except zmq.ZMQError:
                return
            if received is None:
                return
            idents, msg = received

            logging.info("Aborting:")
            logging.info(str(msg))
            msg_type = msg['msg_type']
            reply_type = msg_type.split('_')[0] + '_reply'
            # NOTE(review): the result of session.send() is indexed with [0]
            # here and in abort_request, but used directly in execute_request
            # -- confirm what send() returns.
            reply_msg = self.session.send(stream, reply_type,
                        content={'status' : 'aborted'}, parent=msg, ident=idents)[0]
            logging.debug(str(reply_msg))
            # We need to wait a bit for requests to come in.  This can probably
            # be set shorter for true asynchronous clients.
            time.sleep(0.05)

    def abort_request(self, stream, ident, parent):
        """Abort specific messages by id, or all queued messages if none given.

        ``parent['content']['msg_ids']`` may be a single id string or a
        sequence of ids.  When it is missing or empty the shell queues are
        aborted instead.  An ``abort_reply`` with status 'ok' is always sent.
        """
        msg_ids = parent['content'].get('msg_ids', None)
        if isinstance(msg_ids, basestring):
            msg_ids = [msg_ids]
        if not msg_ids:
            # Nothing specific requested (possibly None): abort what is queued.
            self.abort_queues()
        else:
            for mid in msg_ids:
                self.aborted.add(str(mid))

        content = dict(status='ok')
        reply_msg = self.session.send(stream, 'abort_reply', content=content,
                parent=parent, ident=ident)[0]
        logging.debug(str(reply_msg))

    def shutdown_request(self, stream, ident, parent):
        """kill ourself.  This should really be handled in an external process"""
        try:
            self.abort_queues()
        except Exception:
            content = self._wrap_exception('shutdown')
        else:
            content = dict(parent['content'])
            content['status'] = 'ok'
        self.session.send(stream, 'shutdown_reply',
                          content=content, parent=parent, ident=ident)
        # Exit via a delayed callback (delay argument 1000) on our loop rather
        # than immediately -- presumably so the reply can go out first.
        dc = ioloop.DelayedCallback(lambda : sys.exit(0), 1000, self.loop)
        dc.start()

    def dispatch_control(self, msg):
        """Unpack a control-channel message and call its handler."""
        idents,msg = self.session.feed_identities(msg, copy=False)
        try:
            msg = self.session.unpack_message(msg, content=True, copy=False)
        except Exception:
            logging.error("Invalid Message", exc_info=True)
            return

        handler = self.control_handlers.get(msg['msg_type'], None)
        if handler is None:
            logging.error("UNKNOWN CONTROL MESSAGE TYPE: %r"%msg['msg_type'])
        else:
            handler(self.control_stream, idents, msg)

    #-------------------- queue helpers ------------------------------

    def check_dependencies(self, dependencies):
        """Return True when ``dependencies`` are satisfied according to the client.

        ``dependencies`` is either a collection of ids (all must be done) or
        a two-item sequence ``('any' | 'all', ids)``.  Empty dependencies are
        always met.
        """
        if not dependencies:
            return True
        if len(dependencies) == 2 and dependencies[0] in 'any all'.split():
            anyorall = dependencies[0]
            dependencies = dependencies[1]
        else:
            anyorall = 'all'
        results = self.client.get_results(dependencies,status_only=True)
        if results['status'] != 'ok':
            return False

        if anyorall == 'any':
            if not results['completed']:
                return False
        else:
            if results['pending']:
                return False

        return True

    def check_aborted(self, msg_id):
        """Return True if ``msg_id`` has been marked for abortion."""
        return msg_id in self.aborted

    #-------------------- queue handlers -----------------------------

    def clear_request(self, stream, idents, parent):
        """Clear our namespace."""
        self.user_ns = {}
        self.session.send(stream, 'clear_reply', ident=idents, parent=parent,
                          content = dict(status='ok'))
        self._initial_exec_lines()

    def execute_request(self, stream, ident, parent):
        """Compile and run ``parent['content']['code']`` in ``user_ns``.

        Publishes 'pyin' (and 'pyerr' on failure) on the iopub stream, sends
        an 'execute_reply' on ``stream``, and aborts the shell queues when
        the reply status is 'error'.
        """
        logging.debug('execute request %s'%parent)
        try:
            code = parent[u'content'][u'code']
        except Exception:
            logging.error("Got bad msg: %s"%parent, exc_info=True)
            return
        self.session.send(self.iopub_stream, u'pyin', {u'code':code},parent=parent,
                            ident='%s.pyin'%self.prefix)
        started = datetime.now().strftime(ISO8601)
        try:
            comp_code = self.compiler(code, '<zmq-kernel>')
            self._set_output_parents(parent)
            # Tuple form of exec: valid on Python 2 and 3, same meaning as
            # ``exec comp_code in self.user_ns, self.user_ns``.
            exec(comp_code, self.user_ns, self.user_ns)
        except:
            # Deliberately bare: whatever user code raises is reported back.
            exc_content = self._wrap_exception('execute')
            self.session.send(self.iopub_stream, u'pyerr', exc_content, parent=parent,
                            ident='%s.pyerr'%self.prefix)
            reply_content = exc_content
        else:
            reply_content = {'status' : 'ok'}

        reply_msg = self.session.send(stream, u'execute_reply', reply_content, parent=parent,
                    ident=ident, subheader = dict(started=started))
        logging.debug(str(reply_msg))
        if reply_msg['content']['status'] == u'error':
            self.abort_queues()

    def complete_request(self, stream, ident, parent):
        """Send a 'complete_reply' carrying the matches for ``parent``."""
        matches = {'matches' : self.complete(parent),
                   'status' : 'ok'}
        self.session.send(stream, 'complete_reply',
                          matches, parent, ident)

    def complete(self, msg):
        """Return completer matches for ``msg.content.line`` / ``msg.content.text``."""
        # NOTE(review): uses attribute access, while the other handlers index
        # the message like a dict -- confirm the message type supports both.
        return self.completer.complete(msg.content.line, msg.content.text)

    def apply_request(self, stream, ident, parent):
        """Unpack a function call from ``parent``, run it, and send 'apply_reply'.

        When the content has ``bound`` set, the call runs inside ``user_ns``
        under temporary names built from the message id; these names,
        including the one holding the result, are removed afterwards whether
        or not the call succeeded.  Otherwise a throw-away namespace is used.
        The serialized result is sent back in the reply buffers.
        """
        try:
            content = parent[u'content']
            bufs = parent[u'buffers']
            msg_id = parent['header']['msg_id']
            bound = content.get('bound', False)
        except Exception:
            logging.error("Got bad msg: %s"%parent, exc_info=True)
            return
        sub = {'dependencies_met' : True, 'engine' : self.ident,
                'started': datetime.now().strftime(ISO8601)}
        try:
            self._set_output_parents(parent)
            if bound:
                working = self.user_ns
                suffix = str(msg_id).replace("-","")
                prefix = "_"
            else:
                working = dict()
                suffix = prefix = "_" # prevent keyword collisions with lambda
            f,args,kwargs = unpack_apply_message(bufs, working, copy=False)
            if hasattr(f, 'func_name'):
                fname = f.func_name
            else:
                fname = f.__name__

            fname = prefix+fname.strip('<>')+suffix
            argname = prefix+"args"+suffix
            kwargname = prefix+"kwargs"+suffix
            resultname = prefix+"result"+suffix

            ns = { fname : f, argname : args, kwargname : kwargs }
            working.update(ns)
            code = "%s=%s(*%s,**%s)"%(resultname, fname, argname, kwargname)
            try:
                # Tuple form of exec: valid on Python 2 and 3.
                exec(code, working, working)
                result = working.get(resultname)
            finally:
                # Never leave the temporaries behind in the user's namespace,
                # not even when the call raised.
                if bound:
                    for key in list(ns) + [resultname]:
                        working.pop(key, None)

            packed_result,buf = serialize_object(result)
            result_buf = [packed_result]+buf
        except:
            # Deliberately bare: whatever user code raises is reported back.
            exc_content = self._wrap_exception('apply')
            self.session.send(self.iopub_stream, u'pyerr', exc_content, parent=parent,
                                ident='%s.pyerr'%self.prefix)
            reply_content = exc_content
            result_buf = []

            if exc_content['ename'] == UnmetDependency.__name__:
                sub['dependencies_met'] = False
        else:
            reply_content = {'status' : 'ok'}

        self.session.send(stream, u'apply_reply', reply_content,
                    parent=parent, ident=ident,buffers=result_buf, subheader=sub)

    def dispatch_queue(self, stream, msg):
        """Unpack a shell-channel message and call its handler.

        Messages whose id was marked by ``abort_request`` are answered with
        status 'aborted' instead of being handled.
        """
        # Flush the control stream first -- presumably so that pending
        # control requests (e.g. aborts) are seen before this message runs.
        if self.control_stream:
            self.control_stream.flush()
        idents,msg = self.session.feed_identities(msg, copy=False)
        try:
            msg = self.session.unpack_message(msg, content=True, copy=False)
        except Exception:
            logging.error("Invalid Message", exc_info=True)
            return

        msg_id = msg['header']['msg_id']
        if self.check_aborted(msg_id):
            self.aborted.remove(msg_id)
            # is it safe to assume a msg_id will not be resubmitted?
            reply_type = msg['msg_type'].split('_')[0] + '_reply'
            self.session.send(stream, reply_type,
                        content={'status' : 'aborted'}, parent=msg, ident=idents)
            return
        handler = self.shell_handlers.get(msg['msg_type'], None)
        if handler is None:
            logging.error("UNKNOWN MESSAGE TYPE: %r"%msg['msg_type'])
        else:
            handler(stream, idents, msg)

    def start(self):
        """Register the dispatchers and error printers on the streams."""
        #### stream mode:
        if self.control_stream:
            self.control_stream.on_recv(self.dispatch_control, copy=False)
            self.control_stream.on_err(printer)

        def make_dispatcher(stream):
            # Bind ``stream`` per call so each callback keeps its own stream.
            def dispatcher(msg):
                return self.dispatch_queue(stream, msg)
            return dispatcher

        for s in self.shell_streams:
            s.on_recv(make_dispatcher(s), copy=False)

        if self.iopub_stream:
            self.iopub_stream.on_err(printer)
MinRK
prep newparallel for rebase...
r3539
#### while True mode:
# while True:
# idle = True
# try:
MinRK
Parallel kernel/engine startup looks a bit more like pykernel
r3569 # msg = self.shell_stream.socket.recv_multipart(
MinRK
prep newparallel for rebase...
r3539 # zmq.NOBLOCK, copy=False)
# except zmq.ZMQError, e:
# if e.errno != zmq.EAGAIN:
# raise e
# else:
# idle=False
MinRK
Parallel kernel/engine startup looks a bit more like pykernel
r3569 # self.dispatch_queue(self.shell_stream, msg)
MinRK
prep newparallel for rebase...
r3539 #
# if not self.task_stream.empty():
# idle=False
# msg = self.task_stream.recv_multipart()
# self.dispatch_queue(self.task_stream, msg)
# if idle:
# # don't busywait
# time.sleep(1e-3)
MinRK
improved logging + Hub,Engine,Scheduler are Configurable
def make_kernel(int_id, identity, control_addr, shell_addrs, iopub_addr, hb_addrs,
                client_addr=None, loop=None, context=None, key=None,
                out_stream_factory=OutStream, display_hook_factory=DisplayHook):
    """NO LONGER IN USE

    Connect the control, shell and iopub streams, optionally redirect
    stdout/stderr/displayhook, start a heartbeat, then build and start a
    Kernel.  Returns the tuple ``(loop, context, kernel)``.
    """
    # create loop, context, and session:
    if loop is None:
        loop = ioloop.IOLoop.instance()
    if context is None:
        context = zmq.Context()
    session = StreamSession(key=key)

    def connected_stream(socket_type, addr):
        # Every stream carries our identity and connects to a single address.
        new_stream = zmqstream.ZMQStream(context.socket(socket_type), loop)
        new_stream.setsockopt(zmq.IDENTITY, identity)
        new_stream.connect(addr)
        return new_stream

    # create Control Stream
    control_stream = connected_stream(zmq.PAIR, control_addr)

    # create Shell Streams (MUX, Task, etc.):
    shell_streams = []
    for shell_addr in shell_addrs:
        shell_streams.append(connected_stream(zmq.PAIR, shell_addr))

    # create iopub stream:
    iopub_stream = connected_stream(zmq.PUB, iopub_addr)

    # Redirect input streams and set a display hook.
    if out_stream_factory:
        sys.stdout = out_stream_factory(session, iopub_stream, u'stdout')
        sys.stdout.topic = 'engine.%i.stdout'%int_id
        sys.stderr = out_stream_factory(session, iopub_stream, u'stderr')
        sys.stderr.topic = 'engine.%i.stderr'%int_id
    if display_hook_factory:
        sys.displayhook = display_hook_factory(session, iopub_stream)
        sys.displayhook.topic = 'engine.%i.pyout'%int_id

    # launch heartbeat
    heart_addrs = [str(hb_addr) for hb_addr in hb_addrs]
    heart = heartmonitor.Heart(*heart_addrs, heart_id=identity)
    heart.start()

    # create (optional) Client
    client = Client(client_addr, username=identity) if client_addr else None

    kernel = Kernel(id=int_id, session=session, control_stream=control_stream,
                    shell_streams=shell_streams, iopub_stream=iopub_stream,
                    client=client, loop=loop)
    kernel.start()
    return loop, context, kernel
MinRK
prep newparallel for rebase...
r3539