##// END OF EJS Templates
propagate iopub to clients
MinRK -
Show More
@@ -4,6 +4,8 b' from session import extract_header'
4
4
5 class DisplayHook(object):
5 class DisplayHook(object):
6
6
7 topic=None
8
7 def __init__(self, session, pub_socket):
9 def __init__(self, session, pub_socket):
8 self.session = session
10 self.session = session
9 self.pub_socket = pub_socket
11 self.pub_socket = pub_socket
@@ -15,7 +17,7 b' class DisplayHook(object):'
15
17
16 __builtin__._ = obj
18 __builtin__._ = obj
17 msg = self.session.send(self.pub_socket, u'pyout', {u'data':repr(obj)},
19 msg = self.session.send(self.pub_socket, u'pyout', {u'data':repr(obj)},
18 parent=self.parent_header)
20 parent=self.parent_header, ident=self.topic)
19
21
20 def set_parent(self, parent):
22 def set_parent(self, parent):
21 self.parent_header = extract_header(parent) No newline at end of file
23 self.parent_header = extract_header(parent)
@@ -23,7 +23,8 b' class OutStream(object):'
23
23
24 # The time interval between automatic flushes, in seconds.
24 # The time interval between automatic flushes, in seconds.
25 flush_interval = 0.05
25 flush_interval = 0.05
26
26 topic=None
27
27 def __init__(self, session, pub_socket, name):
28 def __init__(self, session, pub_socket, name):
28 self.session = session
29 self.session = session
29 self.pub_socket = pub_socket
30 self.pub_socket = pub_socket
@@ -49,10 +50,10 b' class OutStream(object):'
49 enc = sys.stdin.encoding or sys.getdefaultencoding()
50 enc = sys.stdin.encoding or sys.getdefaultencoding()
50 data = data.decode(enc, 'replace')
51 data = data.decode(enc, 'replace')
51 content = {u'name':self.name, u'data':data}
52 content = {u'name':self.name, u'data':data}
52 msg = self.session.send(self.pub_socket, u'stream',
53 msg = self.session.send(self.pub_socket, u'stream', content=content,
53 content=content,
54 parent=self.parent_header, ident=self.topic)
54 parent=self.parent_header)
55 logger.debug(msg)
55 logger.debug(msg)
56
56 self._buffer.close()
57 self._buffer.close()
57 self._new_buffer()
58 self._new_buffer()
58
59
@@ -137,11 +137,13 b' class AsyncResult(object):'
137 return self._metadata
137 return self._metadata
138
138
139 @property
139 @property
140 @check_ready
141 def result_dict(self):
140 def result_dict(self):
142 """result property as a dict."""
141 """result property as a dict."""
143 return self.get_dict(0)
142 return self.get_dict(0)
144
143
144 def __dict__(self):
145 return self.get_dict(0)
146
145 #-------------------------------------
147 #-------------------------------------
146 # dict-access
148 # dict-access
147 #-------------------------------------
149 #-------------------------------------
@@ -103,6 +103,32 b' class ResultDict(dict):'
103 raise res
103 raise res
104 return res
104 return res
105
105
106 class Metadata(dict):
107 """Subclass of dict for initializing metadata values."""
108 def __init__(self, *args, **kwargs):
109 dict.__init__(self)
110 md = {'msg_id' : None,
111 'submitted' : None,
112 'started' : None,
113 'completed' : None,
114 'received' : None,
115 'engine_uuid' : None,
116 'engine_id' : None,
117 'follow' : None,
118 'after' : None,
119 'status' : None,
120
121 'pyin' : None,
122 'pyout' : None,
123 'pyerr' : None,
124 'stdout' : '',
125 'stderr' : '',
126 }
127 self.update(md)
128 self.update(dict(*args, **kwargs))
129
130
131
106 class Client(object):
132 class Client(object):
107 """A semi-synchronous client to the IPython ZMQ controller
133 """A semi-synchronous client to the IPython ZMQ controller
108
134
@@ -196,6 +222,7 b' class Client(object):'
196 _registration_socket=None
222 _registration_socket=None
197 _query_socket=None
223 _query_socket=None
198 _control_socket=None
224 _control_socket=None
225 _iopub_socket=None
199 _notification_socket=None
226 _notification_socket=None
200 _mux_socket=None
227 _mux_socket=None
201 _task_socket=None
228 _task_socket=None
@@ -325,6 +352,11 b' class Client(object):'
325 self._control_socket = self.context.socket(zmq.PAIR)
352 self._control_socket = self.context.socket(zmq.PAIR)
326 self._control_socket.setsockopt(zmq.IDENTITY, self.session.session)
353 self._control_socket.setsockopt(zmq.IDENTITY, self.session.session)
327 connect_socket(self._control_socket, content.control)
354 connect_socket(self._control_socket, content.control)
355 if content.iopub:
356 self._iopub_socket = self.context.socket(zmq.SUB)
357 self._iopub_socket.setsockopt(zmq.SUBSCRIBE, '')
358 self._iopub_socket.setsockopt(zmq.IDENTITY, self.session.session)
359 connect_socket(self._iopub_socket, content.iopub)
328 self._update_engines(dict(content.engines))
360 self._update_engines(dict(content.engines))
329
361
330 else:
362 else:
@@ -350,8 +382,8 b' class Client(object):'
350 if eid in self._ids:
382 if eid in self._ids:
351 self._ids.remove(eid)
383 self._ids.remove(eid)
352 self._engines.pop(eid)
384 self._engines.pop(eid)
353 #
385
354 def _build_metadata(self, header, parent, content):
386 def _extract_metadata(self, header, parent, content):
355 md = {'msg_id' : parent['msg_id'],
387 md = {'msg_id' : parent['msg_id'],
356 'submitted' : datetime.strptime(parent['date'], ss.ISO8601),
388 'submitted' : datetime.strptime(parent['date'], ss.ISO8601),
357 'started' : datetime.strptime(header['started'], ss.ISO8601),
389 'started' : datetime.strptime(header['started'], ss.ISO8601),
@@ -361,8 +393,8 b' class Client(object):'
361 'engine_id' : self._engines.get(header['engine'], None),
393 'engine_id' : self._engines.get(header['engine'], None),
362 'follow' : parent['follow'],
394 'follow' : parent['follow'],
363 'after' : parent['after'],
395 'after' : parent['after'],
364 'status' : content['status']
396 'status' : content['status'],
365 }
397 }
366 return md
398 return md
367
399
368 def _handle_execute_reply(self, msg):
400 def _handle_execute_reply(self, msg):
@@ -390,8 +422,12 b' class Client(object):'
390 content = msg['content']
422 content = msg['content']
391 header = msg['header']
423 header = msg['header']
392
424
393 self.metadata[msg_id] = self._build_metadata(header, parent, content)
425 # construct metadata:
426 md = self.metadata.setdefault(msg_id, Metadata())
427 md.update(self._extract_metadata(header, parent, content))
428 self.metadata[msg_id] = md
394
429
430 # construct result:
395 if content['status'] == 'ok':
431 if content['status'] == 'ok':
396 self.results[msg_id] = ss.unserialize_object(msg['buffers'])[0]
432 self.results[msg_id] = ss.unserialize_object(msg['buffers'])[0]
397 elif content['status'] == 'aborted':
433 elif content['status'] == 'aborted':
@@ -448,6 +484,37 b' class Client(object):'
448 pprint(msg)
484 pprint(msg)
449 msg = self.session.recv(sock, mode=zmq.NOBLOCK)
485 msg = self.session.recv(sock, mode=zmq.NOBLOCK)
450
486
487 def _flush_iopub(self, sock):
488 """Flush replies from the iopub channel waiting
489 in the ZMQ queue.
490 """
491 msg = self.session.recv(sock, mode=zmq.NOBLOCK)
492 while msg is not None:
493 if self.debug:
494 pprint(msg)
495 msg = msg[-1]
496 parent = msg['parent_header']
497 msg_id = parent['msg_id']
498 content = msg['content']
499 header = msg['header']
500 msg_type = msg['msg_type']
501
502 # init metadata:
503 md = self.metadata.setdefault(msg_id, Metadata())
504
505 if msg_type == 'stream':
506 name = content['name']
507 s = md[name] or ''
508 md[name] = s + content['data']
509 elif msg_type == 'pyerr':
510 md.update({'pyerr' : ss.unwrap_exception(content)})
511 else:
512 md.update({msg_type : content['data']})
513
514 self.metadata[msg_id] = md
515
516 msg = self.session.recv(sock, mode=zmq.NOBLOCK)
517
451 #--------------------------------------------------------------------------
518 #--------------------------------------------------------------------------
452 # getitem
519 # getitem
453 #--------------------------------------------------------------------------
520 #--------------------------------------------------------------------------
@@ -501,6 +568,8 b' class Client(object):'
501 self._flush_results(self._task_socket)
568 self._flush_results(self._task_socket)
502 if self._control_socket:
569 if self._control_socket:
503 self._flush_control(self._control_socket)
570 self._flush_control(self._control_socket)
571 if self._iopub_socket:
572 self._flush_iopub(self._iopub_socket)
504
573
505 def barrier(self, msg_ids=None, timeout=-1):
574 def barrier(self, msg_ids=None, timeout=-1):
506 """waits on one or more `msg_ids`, for up to `timeout` seconds.
575 """waits on one or more `msg_ids`, for up to `timeout` seconds.
@@ -966,10 +1035,13 b' class Client(object):'
966 parent = rec['header']
1035 parent = rec['header']
967 header = rec['result_header']
1036 header = rec['result_header']
968 rcontent = rec['result_content']
1037 rcontent = rec['result_content']
1038 iodict = rec['io']
969 if isinstance(rcontent, str):
1039 if isinstance(rcontent, str):
970 rcontent = self.session.unpack(rcontent)
1040 rcontent = self.session.unpack(rcontent)
971
1041
972 self.metadata[msg_id] = self._build_metadata(header, parent, rcontent)
1042 md = self.metadata.setdefault(msg_id, Metadata())
1043 md.update(self._extract_metadata(header, parent, rcontent))
1044 md.update(iodict)
973
1045
974 if rcontent['status'] == 'ok':
1046 if rcontent['status'] == 'ok':
975 res,buffers = ss.unserialize_object(buffers)
1047 res,buffers = ss.unserialize_object(buffers)
@@ -58,8 +58,8 b' def make_argument_parser():'
58 help='set the PUB socket for registration notification [default: random]')
58 help='set the PUB socket for registration notification [default: random]')
59 parser.add_argument('--hb', type=str, metavar='PORTS',
59 parser.add_argument('--hb', type=str, metavar='PORTS',
60 help='set the 2 ports for heartbeats [default: random]')
60 help='set the 2 ports for heartbeats [default: random]')
61 parser.add_argument('--ping', type=int, default=3000,
61 parser.add_argument('--ping', type=int, default=100,
62 help='set the heartbeat period in ms [default: 3000]')
62 help='set the heartbeat period in ms [default: 100]')
63 parser.add_argument('--monitor', type=int, metavar='PORT', default=0,
63 parser.add_argument('--monitor', type=int, metavar='PORT', default=0,
64 help='set the SUB port for queue monitoring [default: random]')
64 help='set the SUB port for queue monitoring [default: random]')
65 parser.add_argument('--mux', type=str, metavar='PORTS',
65 parser.add_argument('--mux', type=str, metavar='PORTS',
@@ -68,11 +68,15 b' def make_argument_parser():'
68 help='set the XREP/XREQ ports for the task queue [default: random]')
68 help='set the XREP/XREQ ports for the task queue [default: random]')
69 parser.add_argument('--control', type=str, metavar='PORTS',
69 parser.add_argument('--control', type=str, metavar='PORTS',
70 help='set the XREP ports for the control queue [default: random]')
70 help='set the XREP ports for the control queue [default: random]')
71 parser.add_argument('--scheduler', type=str, default='pure',
71 parser.add_argument('--iopub', type=str, metavar='PORTS',
72 help='set the PUB/SUB ports for the iopub relay [default: random]')
73 parser.add_argument('--scheduler', type=str, default='lru',
72 choices = ['pure', 'lru', 'plainrandom', 'weighted', 'twobin','leastload'],
74 choices = ['pure', 'lru', 'plainrandom', 'weighted', 'twobin','leastload'],
73 help='select the task scheduler [default: pure ZMQ]')
75 help='select the task scheduler [default: Python LRU]')
74 parser.add_argument('--mongodb', action='store_true',
76 parser.add_argument('--mongodb', action='store_true',
75 help='Use MongoDB task storage [default: in-memory]')
77 help='Use MongoDB task storage [default: in-memory]')
78 parser.add_argument('--session', type=str, default=None,
79 help='Manually specify the session id.')
76
80
77 return parser
81 return parser
78
82
@@ -95,6 +99,11 b' def main(argv=None):'
95 else:
99 else:
96 mux = None
100 mux = None
97 random_ports += 2
101 random_ports += 2
102 if args.iopub:
103 iopub = split_ports(args.iopub, 2)
104 else:
105 iopub = None
106 random_ports += 2
98 if args.task:
107 if args.task:
99 task = split_ports(args.task, 2)
108 task = split_ports(args.task, 2)
100 else:
109 else:
@@ -139,7 +148,8 b' def main(argv=None):'
139 if args.execkey and not os.path.isfile(args.execkey):
148 if args.execkey and not os.path.isfile(args.execkey):
140 generate_exec_key(args.execkey)
149 generate_exec_key(args.execkey)
141
150
142 thesession = session.StreamSession(username=args.ident or "controller", keyfile=args.execkey)
151 thesession = session.StreamSession(username=args.ident or "controller",
152 keyfile=args.execkey, session=args.session)
143
153
144 ### build and launch the queues ###
154 ### build and launch the queues ###
145
155
@@ -151,6 +161,19 b' def main(argv=None):'
151
161
152 ports = select_random_ports(random_ports)
162 ports = select_random_ports(random_ports)
153 children = []
163 children = []
164
165 # IOPub relay (in a Process)
166 if not iopub:
167 iopub = (ports.pop(),ports.pop())
168 q = ProcessMonitoredQueue(zmq.SUB, zmq.PUB, zmq.PUB, 'iopub', 'N/A')
169 q.bind_in(iface%iopub[1])
170 q.bind_out(iface%iopub[0])
171 q.setsockopt_in(zmq.SUBSCRIBE, '')
172 q.connect_mon(iface%monport)
173 q.daemon=True
174 q.start()
175 children.append(q.launcher)
176
154 # Multiplexer Queue (in a Process)
177 # Multiplexer Queue (in a Process)
155 if not mux:
178 if not mux:
156 mux = (ports.pop(),ports.pop())
179 mux = (ports.pop(),ports.pop())
@@ -204,6 +227,7 b' def main(argv=None):'
204 'queue': iface%mux[1],
227 'queue': iface%mux[1],
205 'heartbeat': (iface%hb[0], iface%hb[1]),
228 'heartbeat': (iface%hb[0], iface%hb[1]),
206 'task' : iface%task[1],
229 'task' : iface%task[1],
230 'iopub' : iface%iopub[1],
207 'monitor' : iface%monport,
231 'monitor' : iface%monport,
208 }
232 }
209
233
@@ -212,8 +236,11 b' def main(argv=None):'
212 'query': iface%cport,
236 'query': iface%cport,
213 'queue': iface%mux[0],
237 'queue': iface%mux[0],
214 'task' : iface%task[0],
238 'task' : iface%task[0],
239 'iopub' : iface%iopub[0],
215 'notification': iface%nport
240 'notification': iface%nport
216 }
241 }
242
243 # register relay of signals to the children
217 signal_children(children)
244 signal_children(children)
218 hub = Hub(loop, thesession, sub, reg, hmon, c, n, db, engine_addrs, client_addrs)
245 hub = Hub(loop, thesession, sub, reg, hmon, c, n, db, engine_addrs, client_addrs)
219 dc = ioloop.DelayedCallback(lambda : print("Controller started..."), 100, loop)
246 dc = ioloop.DelayedCallback(lambda : print("Controller started..."), 100, loop)
@@ -26,7 +26,7 b' from entry_point import make_base_argument_parser, connect_logger, parse_url'
26
26
27
27
28 def printer(*msg):
28 def printer(*msg):
29 pprint(msg)
29 pprint(msg, stream=sys.__stdout__)
30
30
31 class Engine(object):
31 class Engine(object):
32 """IPython engine"""
32 """IPython engine"""
@@ -69,19 +69,12 b' class Engine(object):'
69 shell_addrs = [str(queue_addr)]
69 shell_addrs = [str(queue_addr)]
70 control_addr = str(msg.content.control)
70 control_addr = str(msg.content.control)
71 task_addr = msg.content.task
71 task_addr = msg.content.task
72 iopub_addr = msg.content.iopub
72 if task_addr:
73 if task_addr:
73 shell_addrs.append(str(task_addr))
74 shell_addrs.append(str(task_addr))
74
75
75 hb_addrs = msg.content.heartbeat
76 hb_addrs = msg.content.heartbeat
76 # ioloop.DelayedCallback(self.heart.start, 1000, self.loop).start()
77 # ioloop.DelayedCallback(self.heart.start, 1000, self.loop).start()
77
78 # placeholder for no, since pub isn't hooked up:
79 sub = self.context.socket(zmq.SUB)
80 sub = zmqstream.ZMQStream(sub, self.loop)
81 sub.on_recv(lambda *a: None)
82 port = sub.bind_to_random_port("tcp://%s"%LOCALHOST)
83 iopub_addr = "tcp://%s:%i"%(LOCALHOST,12345)
84
85 k = make_kernel(self.ident, control_addr, shell_addrs, iopub_addr,
78 k = make_kernel(self.ident, control_addr, shell_addrs, iopub_addr,
86 hb_addrs, client_addr=None, loop=self.loop,
79 hb_addrs, client_addr=None, loop=self.loop,
87 context=self.context, key=self.session.key)[-1]
80 context=self.context, key=self.session.key)[-1]
@@ -96,7 +89,7 b' class Engine(object):'
96
89
97 # logger.info("engine::completed registration with id %s"%self.session.username)
90 # logger.info("engine::completed registration with id %s"%self.session.username)
98
91
99 print (msg)
92 print (msg,file=sys.__stdout__)
100
93
101 def unregister(self):
94 def unregister(self):
102 self.session.send(self.registrar, "unregistration_request", content=dict(id=int(self.session.username)))
95 self.session.send(self.registrar, "unregistration_request", content=dict(id=int(self.session.username)))
@@ -104,7 +97,7 b' class Engine(object):'
104 sys.exit(0)
97 sys.exit(0)
105
98
106 def start(self):
99 def start(self):
107 print ("registering")
100 print ("registering",file=sys.__stdout__)
108 self.register()
101 self.register()
109
102
110
103
@@ -128,7 +121,7 b' def main(argv=None, user_ns=None):'
128 connect_logger(ctx, iface%args.logport, root="engine", loglevel=args.loglevel)
121 connect_logger(ctx, iface%args.logport, root="engine", loglevel=args.loglevel)
129
122
130 reg_conn = iface % args.regport
123 reg_conn = iface % args.regport
131 print (reg_conn)
124 print (reg_conn, file=sys.__stdout__)
132 print ("Starting the engine...", file=sys.__stderr__)
125 print ("Starting the engine...", file=sys.__stderr__)
133
126
134 reg = ctx.socket(zmq.PAIR)
127 reg = ctx.socket(zmq.PAIR)
@@ -91,7 +91,7 b' def make_base_argument_parser():'
91 help='set the XREP port for registration [default: 10101]')
91 help='set the XREP port for registration [default: 10101]')
92 parser.add_argument('--logport', type=int, metavar='PORT', default=20202,
92 parser.add_argument('--logport', type=int, metavar='PORT', default=20202,
93 help='set the PUB port for logging [default: 10201]')
93 help='set the PUB port for logging [default: 10201]')
94 parser.add_argument('--loglevel', type=int, metavar='LEVEL', default=logging.DEBUG,
94 parser.add_argument('--loglevel', type=str, metavar='LEVEL', default=logging.DEBUG,
95 help='set the log level [default: DEBUG]')
95 help='set the log level [default: DEBUG]')
96 parser.add_argument('--ident', type=str,
96 parser.add_argument('--ident', type=str,
97 help='set the ZMQ identity [default: random]')
97 help='set the ZMQ identity [default: random]')
@@ -107,6 +107,11 b' def make_base_argument_parser():'
107
107
108
108
109 def connect_logger(context, iface, root="ip", loglevel=logging.DEBUG):
109 def connect_logger(context, iface, root="ip", loglevel=logging.DEBUG):
110 try:
111 loglevel = int(loglevel)
112 except ValueError:
113 if isinstance(loglevel, str):
114 loglevel = getattr(logging, loglevel)
110 lsock = context.socket(zmq.PUB)
115 lsock = context.socket(zmq.PUB)
111 lsock.connect(iface)
116 lsock.connect(iface)
112 handler = handlers.PUBHandler(lsock)
117 handler = handlers.PUBHandler(lsock)
@@ -41,6 +41,10 b' else:'
41 def _passer(*args, **kwargs):
41 def _passer(*args, **kwargs):
42 return
42 return
43
43
44 def _printer(*args, **kwargs):
45 print (args)
46 print (kwargs)
47
44 def init_record(msg):
48 def init_record(msg):
45 """return an empty TaskRecord dict, with all keys initialized with None."""
49 """return an empty TaskRecord dict, with all keys initialized with None."""
46 header = msg['header']
50 header = msg['header']
@@ -58,7 +62,12 b' def init_record(msg):'
58 'result_header' : None,
62 'result_header' : None,
59 'result_content' : None,
63 'result_content' : None,
60 'result_buffers' : None,
64 'result_buffers' : None,
61 'queue' : None
65 'queue' : None,
66 'pyin' : None,
67 'pyout': None,
68 'pyerr': None,
69 'stdout': '',
70 'stderr': '',
62 }
71 }
63
72
64
73
@@ -181,19 +190,20 b' class Hub(object):'
181 # register our callbacks
190 # register our callbacks
182 self.registrar.on_recv(self.dispatch_register_request)
191 self.registrar.on_recv(self.dispatch_register_request)
183 self.clientele.on_recv(self.dispatch_client_msg)
192 self.clientele.on_recv(self.dispatch_client_msg)
184 self.queue.on_recv(self.dispatch_queue_traffic)
193 self.queue.on_recv(self.dispatch_monitor_traffic)
185
194
186 if heartbeat is not None:
195 if heartbeat is not None:
187 heartbeat.add_heart_failure_handler(self.handle_heart_failure)
196 heartbeat.add_heart_failure_handler(self.handle_heart_failure)
188 heartbeat.add_new_heart_handler(self.handle_new_heart)
197 heartbeat.add_new_heart_handler(self.handle_new_heart)
189
198
190 self.queue_handlers = { 'in' : self.save_queue_request,
199 self.monitor_handlers = { 'in' : self.save_queue_request,
191 'out': self.save_queue_result,
200 'out': self.save_queue_result,
192 'intask': self.save_task_request,
201 'intask': self.save_task_request,
193 'outtask': self.save_task_result,
202 'outtask': self.save_task_result,
194 'tracktask': self.save_task_destination,
203 'tracktask': self.save_task_destination,
195 'incontrol': _passer,
204 'incontrol': _passer,
196 'outcontrol': _passer,
205 'outcontrol': _passer,
206 'iopub': self.save_iopub_message,
197 }
207 }
198
208
199 self.client_handlers = {'queue_request': self.queue_status,
209 self.client_handlers = {'queue_request': self.queue_status,
@@ -262,7 +272,7 b' class Hub(object):'
262 try:
272 try:
263 msg = self.session.unpack_message(msg[1:], content=True)
273 msg = self.session.unpack_message(msg[1:], content=True)
264 except:
274 except:
265 logger.error("client::Invalid Message %s"%msg)
275 logger.error("client::Invalid Message %s"%msg, exc_info=True)
266 return False
276 return False
267
277
268 msg_type = msg.get('msg_type', None)
278 msg_type = msg.get('msg_type', None)
@@ -299,19 +309,20 b' class Hub(object):'
299 else:
309 else:
300 handler(idents, msg)
310 handler(idents, msg)
301
311
302 def dispatch_queue_traffic(self, msg):
312 def dispatch_monitor_traffic(self, msg):
303 """all ME and Task queue messages come through here"""
313 """all ME and Task queue messages come through here, as well as
304 logger.debug("queue traffic: %s"%msg[:2])
314 IOPub traffic."""
315 logger.debug("monitor traffic: %s"%msg[:2])
305 switch = msg[0]
316 switch = msg[0]
306 idents, msg = self.session.feed_identities(msg[1:])
317 idents, msg = self.session.feed_identities(msg[1:])
307 if not idents:
318 if not idents:
308 logger.error("Bad Queue Message: %s"%msg)
319 logger.error("Bad Monitor Message: %s"%msg)
309 return
320 return
310 handler = self.queue_handlers.get(switch, None)
321 handler = self.monitor_handlers.get(switch, None)
311 if handler is not None:
322 if handler is not None:
312 handler(idents, msg)
323 handler(idents, msg)
313 else:
324 else:
314 logger.error("Invalid message topic: %s"%switch)
325 logger.error("Invalid monitor topic: %s"%switch)
315
326
316
327
317 def dispatch_client_msg(self, msg):
328 def dispatch_client_msg(self, msg):
@@ -486,7 +497,7 b' class Hub(object):'
486 msg = self.session.unpack_message(msg, content=False)
497 msg = self.session.unpack_message(msg, content=False)
487 except:
498 except:
488 logger.error("task::invalid task result message send to %r: %s"%(
499 logger.error("task::invalid task result message send to %r: %s"%(
489 client_id, msg))
500 client_id, msg), exc_info=True)
490 raise
501 raise
491 return
502 return
492
503
@@ -532,7 +543,7 b' class Hub(object):'
532 try:
543 try:
533 msg = self.session.unpack_message(msg, content=True)
544 msg = self.session.unpack_message(msg, content=True)
534 except:
545 except:
535 logger.error("task::invalid task tracking message")
546 logger.error("task::invalid task tracking message", exc_info=True)
536 return
547 return
537 content = msg['content']
548 content = msg['content']
538 print (content)
549 print (content)
@@ -557,6 +568,43 b' class Hub(object):'
557 # self.session.send('mia_reply', content=content, idents=client_id)
568 # self.session.send('mia_reply', content=content, idents=client_id)
558
569
559
570
571 #--------------------- IOPub Traffic ------------------------------
572
573 def save_iopub_message(self, topics, msg):
574 """save an iopub message into the db"""
575 print (topics)
576 try:
577 msg = self.session.unpack_message(msg, content=True)
578 except:
579 logger.error("iopub::invalid IOPub message", exc_info=True)
580 return
581
582 parent = msg['parent_header']
583 msg_id = parent['msg_id']
584 msg_type = msg['msg_type']
585 content = msg['content']
586
587 # ensure msg_id is in db
588 try:
589 rec = self.db.get_record(msg_id)
590 except:
591 logger.error("iopub::IOPub message has invalid parent", exc_info=True)
592 return
593 # stream
594 d = {}
595 if msg_type == 'stream':
596 name = content['name']
597 s = rec[name] or ''
598 d[name] = s + content['data']
599
600 elif msg_type == 'pyerr':
601 d['pyerr'] = content
602 else:
603 d[msg_type] = content['data']
604
605 self.db.update_record(msg_id, d)
606
607
560
608
561 #-------------------------------------------------------------------------
609 #-------------------------------------------------------------------------
562 # Registration requests
610 # Registration requests
@@ -579,7 +627,7 b' class Hub(object):'
579 try:
627 try:
580 queue = content['queue']
628 queue = content['queue']
581 except KeyError:
629 except KeyError:
582 logger.error("registration::queue not specified")
630 logger.error("registration::queue not specified", exc_info=True)
583 return
631 return
584 heart = content.get('heartbeat', None)
632 heart = content.get('heartbeat', None)
585 """register a new engine, and create the socket(s) necessary"""
633 """register a new engine, and create the socket(s) necessary"""
@@ -639,7 +687,7 b' class Hub(object):'
639 try:
687 try:
640 eid = msg['content']['id']
688 eid = msg['content']['id']
641 except:
689 except:
642 logger.error("registration::bad engine id for unregistration: %s"%ident)
690 logger.error("registration::bad engine id for unregistration: %s"%ident, exc_info=True)
643 return
691 return
644 logger.info("registration::unregister_engine(%s)"%eid)
692 logger.info("registration::unregister_engine(%s)"%eid)
645 content=dict(id=eid, queue=self.engines[eid].queue)
693 content=dict(id=eid, queue=self.engines[eid].queue)
@@ -662,7 +710,7 b' class Hub(object):'
662 try:
710 try:
663 (eid,queue,reg,purge) = self.incoming_registrations.pop(heart)
711 (eid,queue,reg,purge) = self.incoming_registrations.pop(heart)
664 except KeyError:
712 except KeyError:
665 logger.error("registration::tried to finish nonexistant registration")
713 logger.error("registration::tried to finish nonexistant registration", exc_info=True)
666 return
714 return
667 logger.info("registration::finished registering engine %i:%r"%(eid,queue))
715 logger.info("registration::finished registering engine %i:%r"%(eid,queue))
668 if purge is not None:
716 if purge is not None:
@@ -820,9 +868,13 b' class Hub(object):'
820 completed.append(msg_id)
868 completed.append(msg_id)
821 if not statusonly:
869 if not statusonly:
822 rec = records[msg_id]
870 rec = records[msg_id]
871 io_dict = {}
872 for key in 'pyin pyout pyerr stdout stderr'.split():
873 io_dict[key] = rec[key]
823 content[msg_id] = { 'result_content': rec['result_content'],
874 content[msg_id] = { 'result_content': rec['result_content'],
824 'header': rec['header'],
875 'header': rec['header'],
825 'result_header' : rec['result_header'],
876 'result_header' : rec['result_header'],
877 'io' : io_dict,
826 }
878 }
827 buffers.extend(map(str, rec['result_buffers']))
879 buffers.extend(map(str, rec['result_buffers']))
828 else:
880 else:
@@ -30,9 +30,9 b' from IPython.external.decorator import decorator'
30
30
31 @decorator
31 @decorator
32 def logged(f,self,*args,**kwargs):
32 def logged(f,self,*args,**kwargs):
33 print ("#--------------------")
33 # print ("#--------------------")
34 print ("%s(*%s,**%s)"%(f.func_name, args, kwargs))
34 # print ("%s(*%s,**%s)"%(f.func_name, args, kwargs))
35 print ("#--")
35 # print ("#--")
36 return f(self,*args, **kwargs)
36 return f(self,*args, **kwargs)
37
37
38 #----------------------------------------------------------------------
38 #----------------------------------------------------------------------
@@ -28,6 +28,9 b' from IPython.core import ultratb'
28 from IPython.utils.traitlets import HasTraits, Instance, List
28 from IPython.utils.traitlets import HasTraits, Instance, List
29 from IPython.zmq.completer import KernelCompleter
29 from IPython.zmq.completer import KernelCompleter
30 from IPython.zmq.log import logger # a Logger object
30 from IPython.zmq.log import logger # a Logger object
31 from IPython.zmq.iostream import OutStream
32 from IPython.zmq.displayhook import DisplayHook
33
31
34
32 from streamsession import StreamSession, Message, extract_header, serialize_object,\
35 from streamsession import StreamSession, Message, extract_header, serialize_object,\
33 unpack_apply_message, ISO8601, wrap_exception
36 unpack_apply_message, ISO8601, wrap_exception
@@ -36,7 +39,7 b' import heartmonitor'
36 from client import Client
39 from client import Client
37
40
38 def printer(*args):
41 def printer(*args):
39 pprint(args)
42 pprint(args, stream=sys.__stdout__)
40
43
41 #-----------------------------------------------------------------------------
44 #-----------------------------------------------------------------------------
42 # Main kernel class
45 # Main kernel class
@@ -59,6 +62,7 b' class Kernel(HasTraits):'
59 def __init__(self, **kwargs):
62 def __init__(self, **kwargs):
60 super(Kernel, self).__init__(**kwargs)
63 super(Kernel, self).__init__(**kwargs)
61 self.identity = self.shell_streams[0].getsockopt(zmq.IDENTITY)
64 self.identity = self.shell_streams[0].getsockopt(zmq.IDENTITY)
65 self.prefix = 'engine.%s'%self.identity
62 self.user_ns = {}
66 self.user_ns = {}
63 self.history = []
67 self.history = []
64 self.compiler = CommandCompiler()
68 self.compiler = CommandCompiler()
@@ -212,18 +216,22 b' class Kernel(HasTraits):'
212 return
216 return
213 # pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent)
217 # pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent)
214 # self.iopub_stream.send(pyin_msg)
218 # self.iopub_stream.send(pyin_msg)
215 self.session.send(self.iopub_stream, u'pyin', {u'code':code},parent=parent)
219 self.session.send(self.iopub_stream, u'pyin', {u'code':code},parent=parent,
220 ident=self.identity+'.pyin')
216 started = datetime.now().strftime(ISO8601)
221 started = datetime.now().strftime(ISO8601)
217 try:
222 try:
218 comp_code = self.compiler(code, '<zmq-kernel>')
223 comp_code = self.compiler(code, '<zmq-kernel>')
219 # allow for not overriding displayhook
224 # allow for not overriding displayhook
220 if hasattr(sys.displayhook, 'set_parent'):
225 if hasattr(sys.displayhook, 'set_parent'):
221 sys.displayhook.set_parent(parent)
226 sys.displayhook.set_parent(parent)
227 sys.stdout.set_parent(parent)
228 sys.stderr.set_parent(parent)
222 exec comp_code in self.user_ns, self.user_ns
229 exec comp_code in self.user_ns, self.user_ns
223 except:
230 except:
224 exc_content = self._wrap_exception('execute')
231 exc_content = self._wrap_exception('execute')
225 # exc_msg = self.session.msg(u'pyerr', exc_content, parent)
232 # exc_msg = self.session.msg(u'pyerr', exc_content, parent)
226 self.session.send(self.iopub_stream, u'pyerr', exc_content, parent=parent)
233 self.session.send(self.iopub_stream, u'pyerr', exc_content, parent=parent,
234 ident=self.identity+'.pyerr')
227 reply_content = exc_content
235 reply_content = exc_content
228 else:
236 else:
229 reply_content = {'status' : 'ok'}
237 reply_content = {'status' : 'ok'}
@@ -247,7 +255,7 b' class Kernel(HasTraits):'
247 return self.completer.complete(msg.content.line, msg.content.text)
255 return self.completer.complete(msg.content.line, msg.content.text)
248
256
249 def apply_request(self, stream, ident, parent):
257 def apply_request(self, stream, ident, parent):
250 print (parent)
258 # print (parent)
251 try:
259 try:
252 content = parent[u'content']
260 content = parent[u'content']
253 bufs = parent[u'buffers']
261 bufs = parent[u'buffers']
@@ -266,6 +274,8 b' class Kernel(HasTraits):'
266 # allow for not overriding displayhook
274 # allow for not overriding displayhook
267 if hasattr(sys.displayhook, 'set_parent'):
275 if hasattr(sys.displayhook, 'set_parent'):
268 sys.displayhook.set_parent(parent)
276 sys.displayhook.set_parent(parent)
277 sys.stdout.set_parent(parent)
278 sys.stderr.set_parent(parent)
269 # exec "f(*args,**kwargs)" in self.user_ns, self.user_ns
279 # exec "f(*args,**kwargs)" in self.user_ns, self.user_ns
270 if bound:
280 if bound:
271 working = self.user_ns
281 working = self.user_ns
@@ -305,7 +315,8 b' class Kernel(HasTraits):'
305 except:
315 except:
306 exc_content = self._wrap_exception('apply')
316 exc_content = self._wrap_exception('apply')
307 # exc_msg = self.session.msg(u'pyerr', exc_content, parent)
317 # exc_msg = self.session.msg(u'pyerr', exc_content, parent)
308 self.session.send(self.iopub_stream, u'pyerr', exc_content, parent=parent)
318 self.session.send(self.iopub_stream, u'pyerr', exc_content, parent=parent,
319 ident=self.identity+'.pyerr')
309 reply_content = exc_content
320 reply_content = exc_content
310 result_buf = []
321 result_buf = []
311
322
@@ -318,7 +329,7 b' class Kernel(HasTraits):'
318 # self.reply_socket.send_json(reply_msg)
329 # self.reply_socket.send_json(reply_msg)
319 reply_msg = self.session.send(stream, u'apply_reply', reply_content,
330 reply_msg = self.session.send(stream, u'apply_reply', reply_content,
320 parent=parent, ident=ident,buffers=result_buf, subheader=sub)
331 parent=parent, ident=ident,buffers=result_buf, subheader=sub)
321 print(Message(reply_msg), file=sys.__stdout__)
332 # print(Message(reply_msg), file=sys.__stdout__)
322 # if reply_msg['content']['status'] == u'error':
333 # if reply_msg['content']['status'] == u'error':
323 # self.abort_queues()
334 # self.abort_queues()
324
335
@@ -364,7 +375,7 b' class Kernel(HasTraits):'
364
375
365 if self.iopub_stream:
376 if self.iopub_stream:
366 self.iopub_stream.on_err(printer)
377 self.iopub_stream.on_err(printer)
367 self.iopub_stream.on_send(printer)
378 # self.iopub_stream.on_send(printer)
368
379
369 #### while True mode:
380 #### while True mode:
370 # while True:
381 # while True:
@@ -388,7 +399,9 b' class Kernel(HasTraits):'
388 # time.sleep(1e-3)
399 # time.sleep(1e-3)
389
400
390 def make_kernel(identity, control_addr, shell_addrs, iopub_addr, hb_addrs,
401 def make_kernel(identity, control_addr, shell_addrs, iopub_addr, hb_addrs,
391 client_addr=None, loop=None, context=None, key=None):
402 client_addr=None, loop=None, context=None, key=None,
403 out_stream_factory=OutStream, display_hook_factory=DisplayHook):
404
392 # create loop, context, and session:
405 # create loop, context, and session:
393 if loop is None:
406 if loop is None:
394 loop = ioloop.IOLoop.instance()
407 loop = ioloop.IOLoop.instance()
@@ -417,6 +430,17 b' def make_kernel(identity, control_addr, shell_addrs, iopub_addr, hb_addrs,'
417 iopub_stream.setsockopt(zmq.IDENTITY, identity)
430 iopub_stream.setsockopt(zmq.IDENTITY, identity)
418 iopub_stream.connect(iopub_addr)
431 iopub_stream.connect(iopub_addr)
419
432
433 # Redirect input streams and set a display hook.
434 if out_stream_factory:
435 sys.stdout = out_stream_factory(session, iopub_stream, u'stdout')
436 sys.stdout.topic = identity+'.stdout'
437 sys.stderr = out_stream_factory(session, iopub_stream, u'stderr')
438 sys.stderr.topic = identity+'.stderr'
439 if display_hook_factory:
440 sys.displayhook = display_hook_factory(session, iopub_stream)
441 sys.displayhook.topic = identity+'.pyout'
442
443
420 # launch heartbeat
444 # launch heartbeat
421 heart = heartmonitor.Heart(*map(str, hb_addrs), heart_id=identity)
445 heart = heartmonitor.Heart(*map(str, hb_addrs), heart_id=identity)
422 heart.start()
446 heart.start()
@@ -336,7 +336,7 b' class StreamSession(object):'
336 return header.get('key', None) == self.key
336 return header.get('key', None) == self.key
337
337
338
338
339 def send(self, stream, msg_type, content=None, buffers=None, parent=None, subheader=None, ident=None):
339 def send(self, stream, msg_or_type, content=None, buffers=None, parent=None, subheader=None, ident=None):
340 """Build and send a message via stream or socket.
340 """Build and send a message via stream or socket.
341
341
342 Parameters
342 Parameters
@@ -344,10 +344,9 b' class StreamSession(object):'
344
344
345 stream : zmq.Socket or ZMQStream
345 stream : zmq.Socket or ZMQStream
346 the socket-like object used to send the data
346 the socket-like object used to send the data
347 msg_type : str or Message/dict
347 msg_or_type : str or Message/dict
348 Normally, msg_type will be
348 Normally, msg_or_type will be a msg_type unless a message is being sent more
349
349 than once.
350
351
350
352 Returns
351 Returns
353 -------
352 -------
@@ -356,13 +355,13 b' class StreamSession(object):'
356 the nice wrapped dict-like object containing the headers
355 the nice wrapped dict-like object containing the headers
357
356
358 """
357 """
359 if isinstance(msg_type, (Message, dict)):
358 if isinstance(msg_or_type, (Message, dict)):
360 # we got a Message, not a msg_type
359 # we got a Message, not a msg_type
361 # don't build a new Message
360 # don't build a new Message
362 msg = msg_type
361 msg = msg_or_type
363 content = msg['content']
362 content = msg['content']
364 else:
363 else:
365 msg = self.msg(msg_type, content, parent, subheader)
364 msg = self.msg(msg_or_type, content, parent, subheader)
366 buffers = [] if buffers is None else buffers
365 buffers = [] if buffers is None else buffers
367 to_send = []
366 to_send = []
368 if isinstance(ident, list):
367 if isinstance(ident, list):
General Comments 0
You need to be logged in to leave comments. Login now