From 8554e339e08c0dc794daa7cf102f205e7f15ac10 2011-04-08 00:38:15 From: MinRK Date: 2011-04-08 00:38:15 Subject: [PATCH] propagate iopub to clients --- diff --git a/IPython/zmq/displayhook.py b/IPython/zmq/displayhook.py index eb36beb..905f129 100644 --- a/IPython/zmq/displayhook.py +++ b/IPython/zmq/displayhook.py @@ -4,6 +4,8 @@ from session import extract_header class DisplayHook(object): + topic=None + def __init__(self, session, pub_socket): self.session = session self.pub_socket = pub_socket @@ -15,7 +17,7 @@ class DisplayHook(object): __builtin__._ = obj msg = self.session.send(self.pub_socket, u'pyout', {u'data':repr(obj)}, - parent=self.parent_header) + parent=self.parent_header, ident=self.topic) def set_parent(self, parent): self.parent_header = extract_header(parent) \ No newline at end of file diff --git a/IPython/zmq/iostream.py b/IPython/zmq/iostream.py index 55f0eeb..a2e0c2c 100644 --- a/IPython/zmq/iostream.py +++ b/IPython/zmq/iostream.py @@ -23,7 +23,8 @@ class OutStream(object): # The time interval between automatic flushes, in seconds. flush_interval = 0.05 - + topic=None + def __init__(self, session, pub_socket, name): self.session = session self.pub_socket = pub_socket @@ -49,10 +50,10 @@ class OutStream(object): enc = sys.stdin.encoding or sys.getdefaultencoding() data = data.decode(enc, 'replace') content = {u'name':self.name, u'data':data} - msg = self.session.send(self.pub_socket, u'stream', - content=content, - parent=self.parent_header) + msg = self.session.send(self.pub_socket, u'stream', content=content, + parent=self.parent_header, ident=self.topic) logger.debug(msg) + self._buffer.close() self._new_buffer() diff --git a/IPython/zmq/parallel/asyncresult.py b/IPython/zmq/parallel/asyncresult.py index e96b75b..efcee41 100644 --- a/IPython/zmq/parallel/asyncresult.py +++ b/IPython/zmq/parallel/asyncresult.py @@ -137,11 +137,13 @@ class AsyncResult(object): return self._metadata @property - @check_ready def result_dict(self): """result property as a dict.""" return self.get_dict(0) + def __dict__(self): + return self.get_dict(0) + #------------------------------------- # dict-access #------------------------------------- diff --git a/IPython/zmq/parallel/client.py b/IPython/zmq/parallel/client.py index cfd857b..33644e8 100644 --- a/IPython/zmq/parallel/client.py +++ b/IPython/zmq/parallel/client.py @@ -103,6 +103,32 @@ class ResultDict(dict): raise res return res +class Metadata(dict): + """Subclass of dict for initializing metadata values.""" + def __init__(self, *args, **kwargs): + dict.__init__(self) + md = {'msg_id' : None, + 'submitted' : None, + 'started' : None, + 'completed' : None, + 'received' : None, + 'engine_uuid' : None, + 'engine_id' : None, + 'follow' : None, + 'after' : None, + 'status' : None, + + 'pyin' : None, + 'pyout' : None, + 'pyerr' : None, + 'stdout' : '', + 'stderr' : '', + } + self.update(md) + self.update(dict(*args, **kwargs)) + + + class Client(object): """A semi-synchronous client to the IPython ZMQ controller @@ -196,6 +222,7 @@ class Client(object): _registration_socket=None _query_socket=None _control_socket=None + _iopub_socket=None _notification_socket=None _mux_socket=None _task_socket=None @@ -325,6 +352,11 @@ class Client(object): self._control_socket = self.context.socket(zmq.PAIR) self._control_socket.setsockopt(zmq.IDENTITY, self.session.session) connect_socket(self._control_socket, content.control) + if content.iopub: + self._iopub_socket = self.context.socket(zmq.SUB) + self._iopub_socket.setsockopt(zmq.SUBSCRIBE, '') + self._iopub_socket.setsockopt(zmq.IDENTITY, self.session.session) + connect_socket(self._iopub_socket, content.iopub) self._update_engines(dict(content.engines)) else: @@ -350,8 +382,8 @@ class Client(object): if eid in self._ids: self._ids.remove(eid) self._engines.pop(eid) - # - def _build_metadata(self, header, parent, content): + + def _extract_metadata(self, header, parent, content): md = {'msg_id' : parent['msg_id'], 'submitted' : datetime.strptime(parent['date'], ss.ISO8601), 'started' : datetime.strptime(header['started'], ss.ISO8601), @@ -361,8 +393,8 @@ class Client(object): 'engine_id' : self._engines.get(header['engine'], None), 'follow' : parent['follow'], 'after' : parent['after'], - 'status' : content['status'] - } + 'status' : content['status'], + } return md def _handle_execute_reply(self, msg): @@ -390,8 +422,12 @@ class Client(object): content = msg['content'] header = msg['header'] - self.metadata[msg_id] = self._build_metadata(header, parent, content) + # construct metadata: + md = self.metadata.setdefault(msg_id, Metadata()) + md.update(self._extract_metadata(header, parent, content)) + self.metadata[msg_id] = md + # construct result: if content['status'] == 'ok': self.results[msg_id] = ss.unserialize_object(msg['buffers'])[0] elif content['status'] == 'aborted': @@ -448,6 +484,37 @@ class Client(object): pprint(msg) msg = self.session.recv(sock, mode=zmq.NOBLOCK) + def _flush_iopub(self, sock): + """Flush replies from the iopub channel waiting + in the ZMQ queue. + """ + msg = self.session.recv(sock, mode=zmq.NOBLOCK) + while msg is not None: + if self.debug: + pprint(msg) + msg = msg[-1] + parent = msg['parent_header'] + msg_id = parent['msg_id'] + content = msg['content'] + header = msg['header'] + msg_type = msg['msg_type'] + + # init metadata: + md = self.metadata.setdefault(msg_id, Metadata()) + + if msg_type == 'stream': + name = content['name'] + s = md[name] or '' + md[name] = s + content['data'] + elif msg_type == 'pyerr': + md.update({'pyerr' : ss.unwrap_exception(content)}) + else: + md.update({msg_type : content['data']}) + + self.metadata[msg_id] = md + + msg = self.session.recv(sock, mode=zmq.NOBLOCK) + #-------------------------------------------------------------------------- # getitem #-------------------------------------------------------------------------- @@ -501,6 +568,8 @@ class Client(object): self._flush_results(self._task_socket) if self._control_socket: self._flush_control(self._control_socket) + if self._iopub_socket: + self._flush_iopub(self._iopub_socket) def barrier(self, msg_ids=None, timeout=-1): """waits on one or more `msg_ids`, for up to `timeout` seconds. @@ -966,10 +1035,13 @@ class Client(object): parent = rec['header'] header = rec['result_header'] rcontent = rec['result_content'] + iodict = rec['io'] if isinstance(rcontent, str): rcontent = self.session.unpack(rcontent) - self.metadata[msg_id] = self._build_metadata(header, parent, rcontent) + md = self.metadata.setdefault(msg_id, Metadata()) + md.update(self._extract_metadata(header, parent, rcontent)) + md.update(iodict) if rcontent['status'] == 'ok': res,buffers = ss.unserialize_object(buffers) diff --git a/IPython/zmq/parallel/controller.py b/IPython/zmq/parallel/controller.py index f83a648..a3c404d 100755 --- a/IPython/zmq/parallel/controller.py +++ b/IPython/zmq/parallel/controller.py @@ -58,8 +58,8 @@ def make_argument_parser(): help='set the PUB socket for registration notification [default: random]') parser.add_argument('--hb', type=str, metavar='PORTS', help='set the 2 ports for heartbeats [default: random]') - parser.add_argument('--ping', type=int, default=3000, - help='set the heartbeat period in ms [default: 3000]') + parser.add_argument('--ping', type=int, default=100, + help='set the heartbeat period in ms [default: 100]') parser.add_argument('--monitor', type=int, metavar='PORT', default=0, help='set the SUB port for queue monitoring [default: random]') parser.add_argument('--mux', type=str, metavar='PORTS', @@ -68,11 +68,15 @@ def make_argument_parser(): help='set the XREP/XREQ ports for the task queue [default: random]') parser.add_argument('--control', type=str, metavar='PORTS', help='set the XREP ports for the control queue [default: random]') - parser.add_argument('--scheduler', type=str, default='pure', + parser.add_argument('--iopub', type=str, metavar='PORTS', + help='set the PUB/SUB ports for the iopub relay [default: random]') + parser.add_argument('--scheduler', type=str, default='lru', choices = ['pure', 'lru', 'plainrandom', 'weighted', 'twobin','leastload'], - help='select the task scheduler [default: pure ZMQ]') + help='select the task scheduler [default: Python LRU]') parser.add_argument('--mongodb', action='store_true', help='Use MongoDB task storage [default: in-memory]') + parser.add_argument('--session', type=str, default=None, + help='Manually specify the session id.') return parser @@ -95,6 +99,11 @@ def main(argv=None): else: mux = None random_ports += 2 + if args.iopub: + iopub = split_ports(args.iopub, 2) + else: + iopub = None + random_ports += 2 if args.task: task = split_ports(args.task, 2) else: @@ -139,7 +148,8 @@ def main(argv=None): if args.execkey and not os.path.isfile(args.execkey): generate_exec_key(args.execkey) - thesession = session.StreamSession(username=args.ident or "controller", keyfile=args.execkey) + thesession = session.StreamSession(username=args.ident or "controller", + keyfile=args.execkey, session=args.session) ### build and launch the queues ### @@ -151,6 +161,19 @@ def main(argv=None): ports = select_random_ports(random_ports) children = [] + + # IOPub relay (in a Process) + if not iopub: + iopub = (ports.pop(),ports.pop()) + q = ProcessMonitoredQueue(zmq.SUB, zmq.PUB, zmq.PUB, 'iopub', 'N/A') + q.bind_in(iface%iopub[1]) + q.bind_out(iface%iopub[0]) + q.setsockopt_in(zmq.SUBSCRIBE, '') + q.connect_mon(iface%monport) + q.daemon=True + q.start() + children.append(q.launcher) + # Multiplexer Queue (in a Process) if not mux: mux = (ports.pop(),ports.pop()) @@ -204,6 +227,7 @@ def main(argv=None): 'queue': iface%mux[1], 'heartbeat': (iface%hb[0], iface%hb[1]), 'task' : iface%task[1], + 'iopub' : iface%iopub[1], 'monitor' : iface%monport, } @@ -212,8 +236,11 @@ def main(argv=None): 'query': iface%cport, 'queue': iface%mux[0], 'task' : iface%task[0], + 'iopub' : iface%iopub[0], 'notification': iface%nport } + + # register relay of signals to the children signal_children(children) hub = Hub(loop, thesession, sub, reg, hmon, c, n, db, engine_addrs, client_addrs) dc = ioloop.DelayedCallback(lambda : print("Controller started..."), 100, loop) diff --git a/IPython/zmq/parallel/engine.py b/IPython/zmq/parallel/engine.py index fc368a3..501ef19 100755 --- a/IPython/zmq/parallel/engine.py +++ b/IPython/zmq/parallel/engine.py @@ -26,7 +26,7 @@ from entry_point import make_base_argument_parser, connect_logger, parse_url def printer(*msg): - pprint(msg) + pprint(msg, stream=sys.__stdout__) class Engine(object): """IPython engine""" @@ -69,19 +69,12 @@ class Engine(object): shell_addrs = [str(queue_addr)] control_addr = str(msg.content.control) task_addr = msg.content.task + iopub_addr = msg.content.iopub if task_addr: shell_addrs.append(str(task_addr)) hb_addrs = msg.content.heartbeat # ioloop.DelayedCallback(self.heart.start, 1000, self.loop).start() - - # placeholder for no, since pub isn't hooked up: - sub = self.context.socket(zmq.SUB) - sub = zmqstream.ZMQStream(sub, self.loop) - sub.on_recv(lambda *a: None) - port = sub.bind_to_random_port("tcp://%s"%LOCALHOST) - iopub_addr = "tcp://%s:%i"%(LOCALHOST,12345) - k = make_kernel(self.ident, control_addr, shell_addrs, iopub_addr, hb_addrs, client_addr=None, loop=self.loop, context=self.context, key=self.session.key)[-1] @@ -96,7 +89,7 @@ class Engine(object): # logger.info("engine::completed registration with id %s"%self.session.username) - print (msg) + print (msg,file=sys.__stdout__) def unregister(self): self.session.send(self.registrar, "unregistration_request", content=dict(id=int(self.session.username))) @@ -104,7 +97,7 @@ class Engine(object): sys.exit(0) def start(self): - print ("registering") + print ("registering",file=sys.__stdout__) self.register() @@ -128,7 +121,7 @@ def main(argv=None, user_ns=None): connect_logger(ctx, iface%args.logport, root="engine", loglevel=args.loglevel) reg_conn = iface % args.regport - print (reg_conn) + print (reg_conn, file=sys.__stdout__) print ("Starting the engine...", file=sys.__stderr__) reg = ctx.socket(zmq.PAIR) diff --git a/IPython/zmq/parallel/entry_point.py b/IPython/zmq/parallel/entry_point.py index 2036186..1ae1544 100644 --- a/IPython/zmq/parallel/entry_point.py +++ b/IPython/zmq/parallel/entry_point.py @@ -91,7 +91,7 @@ def make_base_argument_parser(): help='set the XREP port for registration [default: 10101]') parser.add_argument('--logport', type=int, metavar='PORT', default=20202, help='set the PUB port for logging [default: 10201]') - parser.add_argument('--loglevel', type=int, metavar='LEVEL', default=logging.DEBUG, + parser.add_argument('--loglevel', type=str, metavar='LEVEL', default=logging.DEBUG, help='set the log level [default: DEBUG]') parser.add_argument('--ident', type=str, help='set the ZMQ identity [default: random]') @@ -107,6 +107,11 @@ def make_base_argument_parser(): def connect_logger(context, iface, root="ip", loglevel=logging.DEBUG): + try: + loglevel = int(loglevel) + except ValueError: + if isinstance(loglevel, str): + loglevel = getattr(logging, loglevel) lsock = context.socket(zmq.PUB) lsock.connect(iface) handler = handlers.PUBHandler(lsock) diff --git a/IPython/zmq/parallel/hub.py b/IPython/zmq/parallel/hub.py index 73679df..681af4e 100755 --- a/IPython/zmq/parallel/hub.py +++ b/IPython/zmq/parallel/hub.py @@ -41,6 +41,10 @@ else: def _passer(*args, **kwargs): return +def _printer(*args, **kwargs): + print (args) + print (kwargs) + def init_record(msg): """return an empty TaskRecord dict, with all keys initialized with None.""" header = msg['header'] @@ -58,7 +62,12 @@ def init_record(msg): 'result_header' : None, 'result_content' : None, 'result_buffers' : None, - 'queue' : None + 'queue' : None, + 'pyin' : None, + 'pyout': None, + 'pyerr': None, + 'stdout': '', + 'stderr': '', } @@ -181,19 +190,20 @@ class Hub(object): # register our callbacks self.registrar.on_recv(self.dispatch_register_request) self.clientele.on_recv(self.dispatch_client_msg) - self.queue.on_recv(self.dispatch_queue_traffic) + self.queue.on_recv(self.dispatch_monitor_traffic) if heartbeat is not None: heartbeat.add_heart_failure_handler(self.handle_heart_failure) heartbeat.add_new_heart_handler(self.handle_new_heart) - self.queue_handlers = { 'in' : self.save_queue_request, + self.monitor_handlers = { 'in' : self.save_queue_request, 'out': self.save_queue_result, 'intask': self.save_task_request, 'outtask': self.save_task_result, 'tracktask': self.save_task_destination, 'incontrol': _passer, 'outcontrol': _passer, + 'iopub': self.save_iopub_message, } self.client_handlers = {'queue_request': self.queue_status, @@ -262,7 +272,7 @@ class Hub(object): try: msg = self.session.unpack_message(msg[1:], content=True) except: - logger.error("client::Invalid Message %s"%msg) + logger.error("client::Invalid Message %s"%msg, exc_info=True) return False msg_type = msg.get('msg_type', None) @@ -299,19 +309,20 @@ class Hub(object): else: handler(idents, msg) - def dispatch_queue_traffic(self, msg): - """all ME and Task queue messages come through here""" - logger.debug("queue traffic: %s"%msg[:2]) + def dispatch_monitor_traffic(self, msg): + """all ME and Task queue messages come through here, as well as + IOPub traffic.""" + logger.debug("monitor traffic: %s"%msg[:2]) switch = msg[0] idents, msg = self.session.feed_identities(msg[1:]) if not idents: - logger.error("Bad Queue Message: %s"%msg) + logger.error("Bad Monitor Message: %s"%msg) return - handler = self.queue_handlers.get(switch, None) + handler = self.monitor_handlers.get(switch, None) if handler is not None: handler(idents, msg) else: - logger.error("Invalid message topic: %s"%switch) + logger.error("Invalid monitor topic: %s"%switch) def dispatch_client_msg(self, msg): @@ -486,7 +497,7 @@ class Hub(object): msg = self.session.unpack_message(msg, content=False) except: logger.error("task::invalid task result message send to %r: %s"%( - client_id, msg)) + client_id, msg), exc_info=True) raise return @@ -532,7 +543,7 @@ class Hub(object): try: msg = self.session.unpack_message(msg, content=True) except: - logger.error("task::invalid task tracking message") + logger.error("task::invalid task tracking message", exc_info=True) return content = msg['content'] print (content) @@ -557,6 +568,43 @@ class Hub(object): # self.session.send('mia_reply', content=content, idents=client_id) + #--------------------- IOPub Traffic ------------------------------ + + def save_iopub_message(self, topics, msg): + """save an iopub message into the db""" + print (topics) + try: + msg = self.session.unpack_message(msg, content=True) + except: + logger.error("iopub::invalid IOPub message", exc_info=True) + return + + parent = msg['parent_header'] + msg_id = parent['msg_id'] + msg_type = msg['msg_type'] + content = msg['content'] + + # ensure msg_id is in db + try: + rec = self.db.get_record(msg_id) + except: + logger.error("iopub::IOPub message has invalid parent", exc_info=True) + return + # stream + d = {} + if msg_type == 'stream': + name = content['name'] + s = rec[name] or '' + d[name] = s + content['data'] + + elif msg_type == 'pyerr': + d['pyerr'] = content + else: + d[msg_type] = content['data'] + + self.db.update_record(msg_id, d) + + #------------------------------------------------------------------------- # Registration requests @@ -579,7 +627,7 @@ class Hub(object): try: queue = content['queue'] except KeyError: - logger.error("registration::queue not specified") + logger.error("registration::queue not specified", exc_info=True) return heart = content.get('heartbeat', None) """register a new engine, and create the socket(s) necessary""" @@ -639,7 +687,7 @@ class Hub(object): try: eid = msg['content']['id'] except: - logger.error("registration::bad engine id for unregistration: %s"%ident) + logger.error("registration::bad engine id for unregistration: %s"%ident, exc_info=True) return logger.info("registration::unregister_engine(%s)"%eid) content=dict(id=eid, queue=self.engines[eid].queue) @@ -662,7 +710,7 @@ class Hub(object): try: (eid,queue,reg,purge) = self.incoming_registrations.pop(heart) except KeyError: - logger.error("registration::tried to finish nonexistant registration") + logger.error("registration::tried to finish nonexistant registration", exc_info=True) return logger.info("registration::finished registering engine %i:%r"%(eid,queue)) if purge is not None: @@ -820,9 +868,13 @@ class Hub(object): completed.append(msg_id) if not statusonly: rec = records[msg_id] + io_dict = {} + for key in 'pyin pyout pyerr stdout stderr'.split(): + io_dict[key] = rec[key] content[msg_id] = { 'result_content': rec['result_content'], 'header': rec['header'], 'result_header' : rec['result_header'], + 'io' : io_dict, } buffers.extend(map(str, rec['result_buffers'])) else: diff --git a/IPython/zmq/parallel/scheduler.py b/IPython/zmq/parallel/scheduler.py index 87b333a..9cfaa92 100644 --- a/IPython/zmq/parallel/scheduler.py +++ b/IPython/zmq/parallel/scheduler.py @@ -30,9 +30,9 @@ from IPython.external.decorator import decorator @decorator def logged(f,self,*args,**kwargs): - print ("#--------------------") - print ("%s(*%s,**%s)"%(f.func_name, args, kwargs)) - print ("#--") + # print ("#--------------------") + # print ("%s(*%s,**%s)"%(f.func_name, args, kwargs)) + # print ("#--") return f(self,*args, **kwargs) #---------------------------------------------------------------------- diff --git a/IPython/zmq/parallel/streamkernel.py b/IPython/zmq/parallel/streamkernel.py index 1024b66..749088c 100755 --- a/IPython/zmq/parallel/streamkernel.py +++ b/IPython/zmq/parallel/streamkernel.py @@ -28,6 +28,9 @@ from IPython.core import ultratb from IPython.utils.traitlets import HasTraits, Instance, List from IPython.zmq.completer import KernelCompleter from IPython.zmq.log import logger # a Logger object +from IPython.zmq.iostream import OutStream +from IPython.zmq.displayhook import DisplayHook + from streamsession import StreamSession, Message, extract_header, serialize_object,\ unpack_apply_message, ISO8601, wrap_exception @@ -36,7 +39,7 @@ import heartmonitor from client import Client def printer(*args): - pprint(args) + pprint(args, stream=sys.__stdout__) #----------------------------------------------------------------------------- # Main kernel class @@ -59,6 +62,7 @@ class Kernel(HasTraits): def __init__(self, **kwargs): super(Kernel, self).__init__(**kwargs) self.identity = self.shell_streams[0].getsockopt(zmq.IDENTITY) + self.prefix = 'engine.%s'%self.identity self.user_ns = {} self.history = [] self.compiler = CommandCompiler() @@ -212,18 +216,22 @@ class Kernel(HasTraits): return # pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent) # self.iopub_stream.send(pyin_msg) - self.session.send(self.iopub_stream, u'pyin', {u'code':code},parent=parent) + self.session.send(self.iopub_stream, u'pyin', {u'code':code},parent=parent, + ident=self.identity+'.pyin') started = datetime.now().strftime(ISO8601) try: comp_code = self.compiler(code, '') # allow for not overriding displayhook if hasattr(sys.displayhook, 'set_parent'): sys.displayhook.set_parent(parent) + sys.stdout.set_parent(parent) + sys.stderr.set_parent(parent) exec comp_code in self.user_ns, self.user_ns except: exc_content = self._wrap_exception('execute') # exc_msg = self.session.msg(u'pyerr', exc_content, parent) - self.session.send(self.iopub_stream, u'pyerr', exc_content, parent=parent) + self.session.send(self.iopub_stream, u'pyerr', exc_content, parent=parent, + ident=self.identity+'.pyerr') reply_content = exc_content else: reply_content = {'status' : 'ok'} @@ -247,7 +255,7 @@ class Kernel(HasTraits): return self.completer.complete(msg.content.line, msg.content.text) def apply_request(self, stream, ident, parent): - print (parent) + # print (parent) try: content = parent[u'content'] bufs = parent[u'buffers'] @@ -266,6 +274,8 @@ class Kernel(HasTraits): # allow for not overriding displayhook if hasattr(sys.displayhook, 'set_parent'): sys.displayhook.set_parent(parent) + sys.stdout.set_parent(parent) + sys.stderr.set_parent(parent) # exec "f(*args,**kwargs)" in self.user_ns, self.user_ns if bound: working = self.user_ns @@ -305,7 +315,8 @@ class Kernel(HasTraits): except: exc_content = self._wrap_exception('apply') # exc_msg = self.session.msg(u'pyerr', exc_content, parent) - self.session.send(self.iopub_stream, u'pyerr', exc_content, parent=parent) + self.session.send(self.iopub_stream, u'pyerr', exc_content, parent=parent, + ident=self.identity+'.pyerr') reply_content = exc_content result_buf = [] @@ -318,7 +329,7 @@ class Kernel(HasTraits): # self.reply_socket.send_json(reply_msg) reply_msg = self.session.send(stream, u'apply_reply', reply_content, parent=parent, ident=ident,buffers=result_buf, subheader=sub) - print(Message(reply_msg), file=sys.__stdout__) + # print(Message(reply_msg), file=sys.__stdout__) # if reply_msg['content']['status'] == u'error': # self.abort_queues() @@ -364,7 +375,7 @@ class Kernel(HasTraits): if self.iopub_stream: self.iopub_stream.on_err(printer) - self.iopub_stream.on_send(printer) + # self.iopub_stream.on_send(printer) #### while True mode: # while True: @@ -388,7 +399,9 @@ class Kernel(HasTraits): # time.sleep(1e-3) def make_kernel(identity, control_addr, shell_addrs, iopub_addr, hb_addrs, - client_addr=None, loop=None, context=None, key=None): + client_addr=None, loop=None, context=None, key=None, + out_stream_factory=OutStream, display_hook_factory=DisplayHook): + # create loop, context, and session: if loop is None: loop = ioloop.IOLoop.instance() @@ -417,6 +430,17 @@ def make_kernel(identity, control_addr, shell_addrs, iopub_addr, hb_addrs, iopub_stream.setsockopt(zmq.IDENTITY, identity) iopub_stream.connect(iopub_addr) + # Redirect input streams and set a display hook. + if out_stream_factory: + sys.stdout = out_stream_factory(session, iopub_stream, u'stdout') + sys.stdout.topic = identity+'.stdout' + sys.stderr = out_stream_factory(session, iopub_stream, u'stderr') + sys.stderr.topic = identity+'.stderr' + if display_hook_factory: + sys.displayhook = display_hook_factory(session, iopub_stream) + sys.displayhook.topic = identity+'.pyout' + + # launch heartbeat heart = heartmonitor.Heart(*map(str, hb_addrs), heart_id=identity) heart.start() diff --git a/IPython/zmq/parallel/streamsession.py b/IPython/zmq/parallel/streamsession.py index 58aba18..e012a64 100644 --- a/IPython/zmq/parallel/streamsession.py +++ b/IPython/zmq/parallel/streamsession.py @@ -336,7 +336,7 @@ class StreamSession(object): return header.get('key', None) == self.key - def send(self, stream, msg_type, content=None, buffers=None, parent=None, subheader=None, ident=None): + def send(self, stream, msg_or_type, content=None, buffers=None, parent=None, subheader=None, ident=None): """Build and send a message via stream or socket. Parameters @@ -344,10 +344,9 @@ class StreamSession(object): stream : zmq.Socket or ZMQStream the socket-like object used to send the data - msg_type : str or Message/dict - Normally, msg_type will be - - + msg_or_type : str or Message/dict + Normally, msg_or_type will be a msg_type unless a message is being sent more + than once. Returns ------- @@ -356,13 +355,13 @@ class StreamSession(object): the nice wrapped dict-like object containing the headers """ - if isinstance(msg_type, (Message, dict)): + if isinstance(msg_or_type, (Message, dict)): # we got a Message, not a msg_type # don't build a new Message - msg = msg_type + msg = msg_or_type content = msg['content'] else: - msg = self.msg(msg_type, content, parent, subheader) + msg = self.msg(msg_or_type, content, parent, subheader) buffers = [] if buffers is None else buffers to_send = [] if isinstance(ident, list):