##// END OF EJS Templates
propagate iopub to clients
MinRK -
Show More
@@ -4,6 +4,8 b' from session import extract_header'
4 4
5 5 class DisplayHook(object):
6 6
7 topic=None
8
7 9 def __init__(self, session, pub_socket):
8 10 self.session = session
9 11 self.pub_socket = pub_socket
@@ -15,7 +17,7 b' class DisplayHook(object):'
15 17
16 18 __builtin__._ = obj
17 19 msg = self.session.send(self.pub_socket, u'pyout', {u'data':repr(obj)},
18 parent=self.parent_header)
20 parent=self.parent_header, ident=self.topic)
19 21
20 22 def set_parent(self, parent):
21 23 self.parent_header = extract_header(parent) No newline at end of file
@@ -23,7 +23,8 b' class OutStream(object):'
23 23
24 24 # The time interval between automatic flushes, in seconds.
25 25 flush_interval = 0.05
26
26 topic=None
27
27 28 def __init__(self, session, pub_socket, name):
28 29 self.session = session
29 30 self.pub_socket = pub_socket
@@ -49,10 +50,10 b' class OutStream(object):'
49 50 enc = sys.stdin.encoding or sys.getdefaultencoding()
50 51 data = data.decode(enc, 'replace')
51 52 content = {u'name':self.name, u'data':data}
52 msg = self.session.send(self.pub_socket, u'stream',
53 content=content,
54 parent=self.parent_header)
53 msg = self.session.send(self.pub_socket, u'stream', content=content,
54 parent=self.parent_header, ident=self.topic)
55 55 logger.debug(msg)
56
56 57 self._buffer.close()
57 58 self._new_buffer()
58 59
@@ -137,11 +137,13 b' class AsyncResult(object):'
137 137 return self._metadata
138 138
139 139 @property
140 @check_ready
141 140 def result_dict(self):
142 141 """result property as a dict."""
143 142 return self.get_dict(0)
144 143
144 def __dict__(self):
145 return self.get_dict(0)
146
145 147 #-------------------------------------
146 148 # dict-access
147 149 #-------------------------------------
@@ -103,6 +103,32 b' class ResultDict(dict):'
103 103 raise res
104 104 return res
105 105
106 class Metadata(dict):
107 """Subclass of dict for initializing metadata values."""
108 def __init__(self, *args, **kwargs):
109 dict.__init__(self)
110 md = {'msg_id' : None,
111 'submitted' : None,
112 'started' : None,
113 'completed' : None,
114 'received' : None,
115 'engine_uuid' : None,
116 'engine_id' : None,
117 'follow' : None,
118 'after' : None,
119 'status' : None,
120
121 'pyin' : None,
122 'pyout' : None,
123 'pyerr' : None,
124 'stdout' : '',
125 'stderr' : '',
126 }
127 self.update(md)
128 self.update(dict(*args, **kwargs))
129
130
131
106 132 class Client(object):
107 133 """A semi-synchronous client to the IPython ZMQ controller
108 134
@@ -196,6 +222,7 b' class Client(object):'
196 222 _registration_socket=None
197 223 _query_socket=None
198 224 _control_socket=None
225 _iopub_socket=None
199 226 _notification_socket=None
200 227 _mux_socket=None
201 228 _task_socket=None
@@ -325,6 +352,11 b' class Client(object):'
325 352 self._control_socket = self.context.socket(zmq.PAIR)
326 353 self._control_socket.setsockopt(zmq.IDENTITY, self.session.session)
327 354 connect_socket(self._control_socket, content.control)
355 if content.iopub:
356 self._iopub_socket = self.context.socket(zmq.SUB)
357 self._iopub_socket.setsockopt(zmq.SUBSCRIBE, '')
358 self._iopub_socket.setsockopt(zmq.IDENTITY, self.session.session)
359 connect_socket(self._iopub_socket, content.iopub)
328 360 self._update_engines(dict(content.engines))
329 361
330 362 else:
@@ -350,8 +382,8 b' class Client(object):'
350 382 if eid in self._ids:
351 383 self._ids.remove(eid)
352 384 self._engines.pop(eid)
353 #
354 def _build_metadata(self, header, parent, content):
385
386 def _extract_metadata(self, header, parent, content):
355 387 md = {'msg_id' : parent['msg_id'],
356 388 'submitted' : datetime.strptime(parent['date'], ss.ISO8601),
357 389 'started' : datetime.strptime(header['started'], ss.ISO8601),
@@ -361,8 +393,8 b' class Client(object):'
361 393 'engine_id' : self._engines.get(header['engine'], None),
362 394 'follow' : parent['follow'],
363 395 'after' : parent['after'],
364 'status' : content['status']
365 }
396 'status' : content['status'],
397 }
366 398 return md
367 399
368 400 def _handle_execute_reply(self, msg):
@@ -390,8 +422,12 b' class Client(object):'
390 422 content = msg['content']
391 423 header = msg['header']
392 424
393 self.metadata[msg_id] = self._build_metadata(header, parent, content)
425 # construct metadata:
426 md = self.metadata.setdefault(msg_id, Metadata())
427 md.update(self._extract_metadata(header, parent, content))
428 self.metadata[msg_id] = md
394 429
430 # construct result:
395 431 if content['status'] == 'ok':
396 432 self.results[msg_id] = ss.unserialize_object(msg['buffers'])[0]
397 433 elif content['status'] == 'aborted':
@@ -448,6 +484,37 b' class Client(object):'
448 484 pprint(msg)
449 485 msg = self.session.recv(sock, mode=zmq.NOBLOCK)
450 486
487 def _flush_iopub(self, sock):
488 """Flush replies from the iopub channel waiting
489 in the ZMQ queue.
490 """
491 msg = self.session.recv(sock, mode=zmq.NOBLOCK)
492 while msg is not None:
493 if self.debug:
494 pprint(msg)
495 msg = msg[-1]
496 parent = msg['parent_header']
497 msg_id = parent['msg_id']
498 content = msg['content']
499 header = msg['header']
500 msg_type = msg['msg_type']
501
502 # init metadata:
503 md = self.metadata.setdefault(msg_id, Metadata())
504
505 if msg_type == 'stream':
506 name = content['name']
507 s = md[name] or ''
508 md[name] = s + content['data']
509 elif msg_type == 'pyerr':
510 md.update({'pyerr' : ss.unwrap_exception(content)})
511 else:
512 md.update({msg_type : content['data']})
513
514 self.metadata[msg_id] = md
515
516 msg = self.session.recv(sock, mode=zmq.NOBLOCK)
517
451 518 #--------------------------------------------------------------------------
452 519 # getitem
453 520 #--------------------------------------------------------------------------
@@ -501,6 +568,8 b' class Client(object):'
501 568 self._flush_results(self._task_socket)
502 569 if self._control_socket:
503 570 self._flush_control(self._control_socket)
571 if self._iopub_socket:
572 self._flush_iopub(self._iopub_socket)
504 573
505 574 def barrier(self, msg_ids=None, timeout=-1):
506 575 """waits on one or more `msg_ids`, for up to `timeout` seconds.
@@ -966,10 +1035,13 b' class Client(object):'
966 1035 parent = rec['header']
967 1036 header = rec['result_header']
968 1037 rcontent = rec['result_content']
1038 iodict = rec['io']
969 1039 if isinstance(rcontent, str):
970 1040 rcontent = self.session.unpack(rcontent)
971 1041
972 self.metadata[msg_id] = self._build_metadata(header, parent, rcontent)
1042 md = self.metadata.setdefault(msg_id, Metadata())
1043 md.update(self._extract_metadata(header, parent, rcontent))
1044 md.update(iodict)
973 1045
974 1046 if rcontent['status'] == 'ok':
975 1047 res,buffers = ss.unserialize_object(buffers)
@@ -58,8 +58,8 b' def make_argument_parser():'
58 58 help='set the PUB socket for registration notification [default: random]')
59 59 parser.add_argument('--hb', type=str, metavar='PORTS',
60 60 help='set the 2 ports for heartbeats [default: random]')
61 parser.add_argument('--ping', type=int, default=3000,
62 help='set the heartbeat period in ms [default: 3000]')
61 parser.add_argument('--ping', type=int, default=100,
62 help='set the heartbeat period in ms [default: 100]')
63 63 parser.add_argument('--monitor', type=int, metavar='PORT', default=0,
64 64 help='set the SUB port for queue monitoring [default: random]')
65 65 parser.add_argument('--mux', type=str, metavar='PORTS',
@@ -68,11 +68,15 b' def make_argument_parser():'
68 68 help='set the XREP/XREQ ports for the task queue [default: random]')
69 69 parser.add_argument('--control', type=str, metavar='PORTS',
70 70 help='set the XREP ports for the control queue [default: random]')
71 parser.add_argument('--scheduler', type=str, default='pure',
71 parser.add_argument('--iopub', type=str, metavar='PORTS',
72 help='set the PUB/SUB ports for the iopub relay [default: random]')
73 parser.add_argument('--scheduler', type=str, default='lru',
72 74 choices = ['pure', 'lru', 'plainrandom', 'weighted', 'twobin','leastload'],
73 help='select the task scheduler [default: pure ZMQ]')
75 help='select the task scheduler [default: Python LRU]')
74 76 parser.add_argument('--mongodb', action='store_true',
75 77 help='Use MongoDB task storage [default: in-memory]')
78 parser.add_argument('--session', type=str, default=None,
79 help='Manually specify the session id.')
76 80
77 81 return parser
78 82
@@ -95,6 +99,11 b' def main(argv=None):'
95 99 else:
96 100 mux = None
97 101 random_ports += 2
102 if args.iopub:
103 iopub = split_ports(args.iopub, 2)
104 else:
105 iopub = None
106 random_ports += 2
98 107 if args.task:
99 108 task = split_ports(args.task, 2)
100 109 else:
@@ -139,7 +148,8 b' def main(argv=None):'
139 148 if args.execkey and not os.path.isfile(args.execkey):
140 149 generate_exec_key(args.execkey)
141 150
142 thesession = session.StreamSession(username=args.ident or "controller", keyfile=args.execkey)
151 thesession = session.StreamSession(username=args.ident or "controller",
152 keyfile=args.execkey, session=args.session)
143 153
144 154 ### build and launch the queues ###
145 155
@@ -151,6 +161,19 b' def main(argv=None):'
151 161
152 162 ports = select_random_ports(random_ports)
153 163 children = []
164
165 # IOPub relay (in a Process)
166 if not iopub:
167 iopub = (ports.pop(),ports.pop())
168 q = ProcessMonitoredQueue(zmq.SUB, zmq.PUB, zmq.PUB, 'iopub', 'N/A')
169 q.bind_in(iface%iopub[1])
170 q.bind_out(iface%iopub[0])
171 q.setsockopt_in(zmq.SUBSCRIBE, '')
172 q.connect_mon(iface%monport)
173 q.daemon=True
174 q.start()
175 children.append(q.launcher)
176
154 177 # Multiplexer Queue (in a Process)
155 178 if not mux:
156 179 mux = (ports.pop(),ports.pop())
@@ -204,6 +227,7 b' def main(argv=None):'
204 227 'queue': iface%mux[1],
205 228 'heartbeat': (iface%hb[0], iface%hb[1]),
206 229 'task' : iface%task[1],
230 'iopub' : iface%iopub[1],
207 231 'monitor' : iface%monport,
208 232 }
209 233
@@ -212,8 +236,11 b' def main(argv=None):'
212 236 'query': iface%cport,
213 237 'queue': iface%mux[0],
214 238 'task' : iface%task[0],
239 'iopub' : iface%iopub[0],
215 240 'notification': iface%nport
216 241 }
242
243 # register relay of signals to the children
217 244 signal_children(children)
218 245 hub = Hub(loop, thesession, sub, reg, hmon, c, n, db, engine_addrs, client_addrs)
219 246 dc = ioloop.DelayedCallback(lambda : print("Controller started..."), 100, loop)
@@ -26,7 +26,7 b' from entry_point import make_base_argument_parser, connect_logger, parse_url'
26 26
27 27
28 28 def printer(*msg):
29 pprint(msg)
29 pprint(msg, stream=sys.__stdout__)
30 30
31 31 class Engine(object):
32 32 """IPython engine"""
@@ -69,19 +69,12 b' class Engine(object):'
69 69 shell_addrs = [str(queue_addr)]
70 70 control_addr = str(msg.content.control)
71 71 task_addr = msg.content.task
72 iopub_addr = msg.content.iopub
72 73 if task_addr:
73 74 shell_addrs.append(str(task_addr))
74 75
75 76 hb_addrs = msg.content.heartbeat
76 77 # ioloop.DelayedCallback(self.heart.start, 1000, self.loop).start()
77
78 # placeholder for no, since pub isn't hooked up:
79 sub = self.context.socket(zmq.SUB)
80 sub = zmqstream.ZMQStream(sub, self.loop)
81 sub.on_recv(lambda *a: None)
82 port = sub.bind_to_random_port("tcp://%s"%LOCALHOST)
83 iopub_addr = "tcp://%s:%i"%(LOCALHOST,12345)
84
85 78 k = make_kernel(self.ident, control_addr, shell_addrs, iopub_addr,
86 79 hb_addrs, client_addr=None, loop=self.loop,
87 80 context=self.context, key=self.session.key)[-1]
@@ -96,7 +89,7 b' class Engine(object):'
96 89
97 90 # logger.info("engine::completed registration with id %s"%self.session.username)
98 91
99 print (msg)
92 print (msg,file=sys.__stdout__)
100 93
101 94 def unregister(self):
102 95 self.session.send(self.registrar, "unregistration_request", content=dict(id=int(self.session.username)))
@@ -104,7 +97,7 b' class Engine(object):'
104 97 sys.exit(0)
105 98
106 99 def start(self):
107 print ("registering")
100 print ("registering",file=sys.__stdout__)
108 101 self.register()
109 102
110 103
@@ -128,7 +121,7 b' def main(argv=None, user_ns=None):'
128 121 connect_logger(ctx, iface%args.logport, root="engine", loglevel=args.loglevel)
129 122
130 123 reg_conn = iface % args.regport
131 print (reg_conn)
124 print (reg_conn, file=sys.__stdout__)
132 125 print ("Starting the engine...", file=sys.__stderr__)
133 126
134 127 reg = ctx.socket(zmq.PAIR)
@@ -91,7 +91,7 b' def make_base_argument_parser():'
91 91 help='set the XREP port for registration [default: 10101]')
92 92 parser.add_argument('--logport', type=int, metavar='PORT', default=20202,
93 93 help='set the PUB port for logging [default: 10201]')
94 parser.add_argument('--loglevel', type=int, metavar='LEVEL', default=logging.DEBUG,
94 parser.add_argument('--loglevel', type=str, metavar='LEVEL', default=logging.DEBUG,
95 95 help='set the log level [default: DEBUG]')
96 96 parser.add_argument('--ident', type=str,
97 97 help='set the ZMQ identity [default: random]')
@@ -107,6 +107,11 b' def make_base_argument_parser():'
107 107
108 108
109 109 def connect_logger(context, iface, root="ip", loglevel=logging.DEBUG):
110 try:
111 loglevel = int(loglevel)
112 except ValueError:
113 if isinstance(loglevel, str):
114 loglevel = getattr(logging, loglevel)
110 115 lsock = context.socket(zmq.PUB)
111 116 lsock.connect(iface)
112 117 handler = handlers.PUBHandler(lsock)
@@ -41,6 +41,10 b' else:'
41 41 def _passer(*args, **kwargs):
42 42 return
43 43
44 def _printer(*args, **kwargs):
45 print (args)
46 print (kwargs)
47
44 48 def init_record(msg):
45 49 """return an empty TaskRecord dict, with all keys initialized with None."""
46 50 header = msg['header']
@@ -58,7 +62,12 b' def init_record(msg):'
58 62 'result_header' : None,
59 63 'result_content' : None,
60 64 'result_buffers' : None,
61 'queue' : None
65 'queue' : None,
66 'pyin' : None,
67 'pyout': None,
68 'pyerr': None,
69 'stdout': '',
70 'stderr': '',
62 71 }
63 72
64 73
@@ -181,19 +190,20 b' class Hub(object):'
181 190 # register our callbacks
182 191 self.registrar.on_recv(self.dispatch_register_request)
183 192 self.clientele.on_recv(self.dispatch_client_msg)
184 self.queue.on_recv(self.dispatch_queue_traffic)
193 self.queue.on_recv(self.dispatch_monitor_traffic)
185 194
186 195 if heartbeat is not None:
187 196 heartbeat.add_heart_failure_handler(self.handle_heart_failure)
188 197 heartbeat.add_new_heart_handler(self.handle_new_heart)
189 198
190 self.queue_handlers = { 'in' : self.save_queue_request,
199 self.monitor_handlers = { 'in' : self.save_queue_request,
191 200 'out': self.save_queue_result,
192 201 'intask': self.save_task_request,
193 202 'outtask': self.save_task_result,
194 203 'tracktask': self.save_task_destination,
195 204 'incontrol': _passer,
196 205 'outcontrol': _passer,
206 'iopub': self.save_iopub_message,
197 207 }
198 208
199 209 self.client_handlers = {'queue_request': self.queue_status,
@@ -262,7 +272,7 b' class Hub(object):'
262 272 try:
263 273 msg = self.session.unpack_message(msg[1:], content=True)
264 274 except:
265 logger.error("client::Invalid Message %s"%msg)
275 logger.error("client::Invalid Message %s"%msg, exc_info=True)
266 276 return False
267 277
268 278 msg_type = msg.get('msg_type', None)
@@ -299,19 +309,20 b' class Hub(object):'
299 309 else:
300 310 handler(idents, msg)
301 311
302 def dispatch_queue_traffic(self, msg):
303 """all ME and Task queue messages come through here"""
304 logger.debug("queue traffic: %s"%msg[:2])
312 def dispatch_monitor_traffic(self, msg):
313 """all ME and Task queue messages come through here, as well as
314 IOPub traffic."""
315 logger.debug("monitor traffic: %s"%msg[:2])
305 316 switch = msg[0]
306 317 idents, msg = self.session.feed_identities(msg[1:])
307 318 if not idents:
308 logger.error("Bad Queue Message: %s"%msg)
319 logger.error("Bad Monitor Message: %s"%msg)
309 320 return
310 handler = self.queue_handlers.get(switch, None)
321 handler = self.monitor_handlers.get(switch, None)
311 322 if handler is not None:
312 323 handler(idents, msg)
313 324 else:
314 logger.error("Invalid message topic: %s"%switch)
325 logger.error("Invalid monitor topic: %s"%switch)
315 326
316 327
317 328 def dispatch_client_msg(self, msg):
@@ -486,7 +497,7 b' class Hub(object):'
486 497 msg = self.session.unpack_message(msg, content=False)
487 498 except:
488 499 logger.error("task::invalid task result message send to %r: %s"%(
489 client_id, msg))
500 client_id, msg), exc_info=True)
490 501 raise
491 502 return
492 503
@@ -532,7 +543,7 b' class Hub(object):'
532 543 try:
533 544 msg = self.session.unpack_message(msg, content=True)
534 545 except:
535 logger.error("task::invalid task tracking message")
546 logger.error("task::invalid task tracking message", exc_info=True)
536 547 return
537 548 content = msg['content']
538 549 print (content)
@@ -557,6 +568,43 b' class Hub(object):'
557 568 # self.session.send('mia_reply', content=content, idents=client_id)
558 569
559 570
571 #--------------------- IOPub Traffic ------------------------------
572
573 def save_iopub_message(self, topics, msg):
574 """save an iopub message into the db"""
575 print (topics)
576 try:
577 msg = self.session.unpack_message(msg, content=True)
578 except:
579 logger.error("iopub::invalid IOPub message", exc_info=True)
580 return
581
582 parent = msg['parent_header']
583 msg_id = parent['msg_id']
584 msg_type = msg['msg_type']
585 content = msg['content']
586
587 # ensure msg_id is in db
588 try:
589 rec = self.db.get_record(msg_id)
590 except:
591 logger.error("iopub::IOPub message has invalid parent", exc_info=True)
592 return
593 # stream
594 d = {}
595 if msg_type == 'stream':
596 name = content['name']
597 s = rec[name] or ''
598 d[name] = s + content['data']
599
600 elif msg_type == 'pyerr':
601 d['pyerr'] = content
602 else:
603 d[msg_type] = content['data']
604
605 self.db.update_record(msg_id, d)
606
607
560 608
561 609 #-------------------------------------------------------------------------
562 610 # Registration requests
@@ -579,7 +627,7 b' class Hub(object):'
579 627 try:
580 628 queue = content['queue']
581 629 except KeyError:
582 logger.error("registration::queue not specified")
630 logger.error("registration::queue not specified", exc_info=True)
583 631 return
584 632 heart = content.get('heartbeat', None)
585 633 """register a new engine, and create the socket(s) necessary"""
@@ -639,7 +687,7 b' class Hub(object):'
639 687 try:
640 688 eid = msg['content']['id']
641 689 except:
642 logger.error("registration::bad engine id for unregistration: %s"%ident)
690 logger.error("registration::bad engine id for unregistration: %s"%ident, exc_info=True)
643 691 return
644 692 logger.info("registration::unregister_engine(%s)"%eid)
645 693 content=dict(id=eid, queue=self.engines[eid].queue)
@@ -662,7 +710,7 b' class Hub(object):'
662 710 try:
663 711 (eid,queue,reg,purge) = self.incoming_registrations.pop(heart)
664 712 except KeyError:
665 logger.error("registration::tried to finish nonexistant registration")
713 logger.error("registration::tried to finish nonexistant registration", exc_info=True)
666 714 return
667 715 logger.info("registration::finished registering engine %i:%r"%(eid,queue))
668 716 if purge is not None:
@@ -820,9 +868,13 b' class Hub(object):'
820 868 completed.append(msg_id)
821 869 if not statusonly:
822 870 rec = records[msg_id]
871 io_dict = {}
872 for key in 'pyin pyout pyerr stdout stderr'.split():
873 io_dict[key] = rec[key]
823 874 content[msg_id] = { 'result_content': rec['result_content'],
824 875 'header': rec['header'],
825 876 'result_header' : rec['result_header'],
877 'io' : io_dict,
826 878 }
827 879 buffers.extend(map(str, rec['result_buffers']))
828 880 else:
@@ -30,9 +30,9 b' from IPython.external.decorator import decorator'
30 30
31 31 @decorator
32 32 def logged(f,self,*args,**kwargs):
33 print ("#--------------------")
34 print ("%s(*%s,**%s)"%(f.func_name, args, kwargs))
35 print ("#--")
33 # print ("#--------------------")
34 # print ("%s(*%s,**%s)"%(f.func_name, args, kwargs))
35 # print ("#--")
36 36 return f(self,*args, **kwargs)
37 37
38 38 #----------------------------------------------------------------------
@@ -28,6 +28,9 b' from IPython.core import ultratb'
28 28 from IPython.utils.traitlets import HasTraits, Instance, List
29 29 from IPython.zmq.completer import KernelCompleter
30 30 from IPython.zmq.log import logger # a Logger object
31 from IPython.zmq.iostream import OutStream
32 from IPython.zmq.displayhook import DisplayHook
33
31 34
32 35 from streamsession import StreamSession, Message, extract_header, serialize_object,\
33 36 unpack_apply_message, ISO8601, wrap_exception
@@ -36,7 +39,7 b' import heartmonitor'
36 39 from client import Client
37 40
38 41 def printer(*args):
39 pprint(args)
42 pprint(args, stream=sys.__stdout__)
40 43
41 44 #-----------------------------------------------------------------------------
42 45 # Main kernel class
@@ -59,6 +62,7 b' class Kernel(HasTraits):'
59 62 def __init__(self, **kwargs):
60 63 super(Kernel, self).__init__(**kwargs)
61 64 self.identity = self.shell_streams[0].getsockopt(zmq.IDENTITY)
65 self.prefix = 'engine.%s'%self.identity
62 66 self.user_ns = {}
63 67 self.history = []
64 68 self.compiler = CommandCompiler()
@@ -212,18 +216,22 b' class Kernel(HasTraits):'
212 216 return
213 217 # pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent)
214 218 # self.iopub_stream.send(pyin_msg)
215 self.session.send(self.iopub_stream, u'pyin', {u'code':code},parent=parent)
219 self.session.send(self.iopub_stream, u'pyin', {u'code':code},parent=parent,
220 ident=self.identity+'.pyin')
216 221 started = datetime.now().strftime(ISO8601)
217 222 try:
218 223 comp_code = self.compiler(code, '<zmq-kernel>')
219 224 # allow for not overriding displayhook
220 225 if hasattr(sys.displayhook, 'set_parent'):
221 226 sys.displayhook.set_parent(parent)
227 sys.stdout.set_parent(parent)
228 sys.stderr.set_parent(parent)
222 229 exec comp_code in self.user_ns, self.user_ns
223 230 except:
224 231 exc_content = self._wrap_exception('execute')
225 232 # exc_msg = self.session.msg(u'pyerr', exc_content, parent)
226 self.session.send(self.iopub_stream, u'pyerr', exc_content, parent=parent)
233 self.session.send(self.iopub_stream, u'pyerr', exc_content, parent=parent,
234 ident=self.identity+'.pyerr')
227 235 reply_content = exc_content
228 236 else:
229 237 reply_content = {'status' : 'ok'}
@@ -247,7 +255,7 b' class Kernel(HasTraits):'
247 255 return self.completer.complete(msg.content.line, msg.content.text)
248 256
249 257 def apply_request(self, stream, ident, parent):
250 print (parent)
258 # print (parent)
251 259 try:
252 260 content = parent[u'content']
253 261 bufs = parent[u'buffers']
@@ -266,6 +274,8 b' class Kernel(HasTraits):'
266 274 # allow for not overriding displayhook
267 275 if hasattr(sys.displayhook, 'set_parent'):
268 276 sys.displayhook.set_parent(parent)
277 sys.stdout.set_parent(parent)
278 sys.stderr.set_parent(parent)
269 279 # exec "f(*args,**kwargs)" in self.user_ns, self.user_ns
270 280 if bound:
271 281 working = self.user_ns
@@ -305,7 +315,8 b' class Kernel(HasTraits):'
305 315 except:
306 316 exc_content = self._wrap_exception('apply')
307 317 # exc_msg = self.session.msg(u'pyerr', exc_content, parent)
308 self.session.send(self.iopub_stream, u'pyerr', exc_content, parent=parent)
318 self.session.send(self.iopub_stream, u'pyerr', exc_content, parent=parent,
319 ident=self.identity+'.pyerr')
309 320 reply_content = exc_content
310 321 result_buf = []
311 322
@@ -318,7 +329,7 b' class Kernel(HasTraits):'
318 329 # self.reply_socket.send_json(reply_msg)
319 330 reply_msg = self.session.send(stream, u'apply_reply', reply_content,
320 331 parent=parent, ident=ident,buffers=result_buf, subheader=sub)
321 print(Message(reply_msg), file=sys.__stdout__)
332 # print(Message(reply_msg), file=sys.__stdout__)
322 333 # if reply_msg['content']['status'] == u'error':
323 334 # self.abort_queues()
324 335
@@ -364,7 +375,7 b' class Kernel(HasTraits):'
364 375
365 376 if self.iopub_stream:
366 377 self.iopub_stream.on_err(printer)
367 self.iopub_stream.on_send(printer)
378 # self.iopub_stream.on_send(printer)
368 379
369 380 #### while True mode:
370 381 # while True:
@@ -388,7 +399,9 b' class Kernel(HasTraits):'
388 399 # time.sleep(1e-3)
389 400
390 401 def make_kernel(identity, control_addr, shell_addrs, iopub_addr, hb_addrs,
391 client_addr=None, loop=None, context=None, key=None):
402 client_addr=None, loop=None, context=None, key=None,
403 out_stream_factory=OutStream, display_hook_factory=DisplayHook):
404
392 405 # create loop, context, and session:
393 406 if loop is None:
394 407 loop = ioloop.IOLoop.instance()
@@ -417,6 +430,17 b' def make_kernel(identity, control_addr, shell_addrs, iopub_addr, hb_addrs,'
417 430 iopub_stream.setsockopt(zmq.IDENTITY, identity)
418 431 iopub_stream.connect(iopub_addr)
419 432
433 # Redirect input streams and set a display hook.
434 if out_stream_factory:
435 sys.stdout = out_stream_factory(session, iopub_stream, u'stdout')
436 sys.stdout.topic = identity+'.stdout'
437 sys.stderr = out_stream_factory(session, iopub_stream, u'stderr')
438 sys.stderr.topic = identity+'.stderr'
439 if display_hook_factory:
440 sys.displayhook = display_hook_factory(session, iopub_stream)
441 sys.displayhook.topic = identity+'.pyout'
442
443
420 444 # launch heartbeat
421 445 heart = heartmonitor.Heart(*map(str, hb_addrs), heart_id=identity)
422 446 heart.start()
@@ -336,7 +336,7 b' class StreamSession(object):'
336 336 return header.get('key', None) == self.key
337 337
338 338
339 def send(self, stream, msg_type, content=None, buffers=None, parent=None, subheader=None, ident=None):
339 def send(self, stream, msg_or_type, content=None, buffers=None, parent=None, subheader=None, ident=None):
340 340 """Build and send a message via stream or socket.
341 341
342 342 Parameters
@@ -344,10 +344,9 b' class StreamSession(object):'
344 344
345 345 stream : zmq.Socket or ZMQStream
346 346 the socket-like object used to send the data
347 msg_type : str or Message/dict
348 Normally, msg_type will be
349
350
347 msg_or_type : str or Message/dict
348 Normally, msg_or_type will be a msg_type unless a message is being sent more
349 than once.
351 350
352 351 Returns
353 352 -------
@@ -356,13 +355,13 b' class StreamSession(object):'
356 355 the nice wrapped dict-like object containing the headers
357 356
358 357 """
359 if isinstance(msg_type, (Message, dict)):
358 if isinstance(msg_or_type, (Message, dict)):
360 359 # we got a Message, not a msg_type
361 360 # don't build a new Message
362 msg = msg_type
361 msg = msg_or_type
363 362 content = msg['content']
364 363 else:
365 msg = self.msg(msg_type, content, parent, subheader)
364 msg = self.msg(msg_or_type, content, parent, subheader)
366 365 buffers = [] if buffers is None else buffers
367 366 to_send = []
368 367 if isinstance(ident, list):
General Comments 0
You need to be logged in to leave comments. Login now