Show More
@@ -4,6 +4,8 from session import extract_header | |||
|
4 | 4 | |
|
5 | 5 | class DisplayHook(object): |
|
6 | 6 | |
|
7 | topic=None | |
|
8 | ||
|
7 | 9 | def __init__(self, session, pub_socket): |
|
8 | 10 | self.session = session |
|
9 | 11 | self.pub_socket = pub_socket |
@@ -15,7 +17,7 class DisplayHook(object): | |||
|
15 | 17 | |
|
16 | 18 | __builtin__._ = obj |
|
17 | 19 | msg = self.session.send(self.pub_socket, u'pyout', {u'data':repr(obj)}, |
|
18 | parent=self.parent_header) | |
|
20 | parent=self.parent_header, ident=self.topic) | |
|
19 | 21 | |
|
20 | 22 | def set_parent(self, parent): |
|
21 | 23 | self.parent_header = extract_header(parent) No newline at end of file |
@@ -23,7 +23,8 class OutStream(object): | |||
|
23 | 23 | |
|
24 | 24 | # The time interval between automatic flushes, in seconds. |
|
25 | 25 | flush_interval = 0.05 |
|
26 | ||
|
26 | topic=None | |
|
27 | ||
|
27 | 28 | def __init__(self, session, pub_socket, name): |
|
28 | 29 | self.session = session |
|
29 | 30 | self.pub_socket = pub_socket |
@@ -49,10 +50,10 class OutStream(object): | |||
|
49 | 50 | enc = sys.stdin.encoding or sys.getdefaultencoding() |
|
50 | 51 | data = data.decode(enc, 'replace') |
|
51 | 52 | content = {u'name':self.name, u'data':data} |
|
52 | msg = self.session.send(self.pub_socket, u'stream', | |
|
53 |
|
|
|
54 | parent=self.parent_header) | |
|
53 | msg = self.session.send(self.pub_socket, u'stream', content=content, | |
|
54 | parent=self.parent_header, ident=self.topic) | |
|
55 | 55 | logger.debug(msg) |
|
56 | ||
|
56 | 57 | self._buffer.close() |
|
57 | 58 | self._new_buffer() |
|
58 | 59 |
@@ -137,11 +137,13 class AsyncResult(object): | |||
|
137 | 137 | return self._metadata |
|
138 | 138 | |
|
139 | 139 | @property |
|
140 | @check_ready | |
|
141 | 140 | def result_dict(self): |
|
142 | 141 | """result property as a dict.""" |
|
143 | 142 | return self.get_dict(0) |
|
144 | 143 | |
|
144 | def __dict__(self): | |
|
145 | return self.get_dict(0) | |
|
146 | ||
|
145 | 147 | #------------------------------------- |
|
146 | 148 | # dict-access |
|
147 | 149 | #------------------------------------- |
@@ -103,6 +103,32 class ResultDict(dict): | |||
|
103 | 103 | raise res |
|
104 | 104 | return res |
|
105 | 105 | |
|
106 | class Metadata(dict): | |
|
107 | """Subclass of dict for initializing metadata values.""" | |
|
108 | def __init__(self, *args, **kwargs): | |
|
109 | dict.__init__(self) | |
|
110 | md = {'msg_id' : None, | |
|
111 | 'submitted' : None, | |
|
112 | 'started' : None, | |
|
113 | 'completed' : None, | |
|
114 | 'received' : None, | |
|
115 | 'engine_uuid' : None, | |
|
116 | 'engine_id' : None, | |
|
117 | 'follow' : None, | |
|
118 | 'after' : None, | |
|
119 | 'status' : None, | |
|
120 | ||
|
121 | 'pyin' : None, | |
|
122 | 'pyout' : None, | |
|
123 | 'pyerr' : None, | |
|
124 | 'stdout' : '', | |
|
125 | 'stderr' : '', | |
|
126 | } | |
|
127 | self.update(md) | |
|
128 | self.update(dict(*args, **kwargs)) | |
|
129 | ||
|
130 | ||
|
131 | ||
|
106 | 132 | class Client(object): |
|
107 | 133 | """A semi-synchronous client to the IPython ZMQ controller |
|
108 | 134 | |
@@ -196,6 +222,7 class Client(object): | |||
|
196 | 222 | _registration_socket=None |
|
197 | 223 | _query_socket=None |
|
198 | 224 | _control_socket=None |
|
225 | _iopub_socket=None | |
|
199 | 226 | _notification_socket=None |
|
200 | 227 | _mux_socket=None |
|
201 | 228 | _task_socket=None |
@@ -325,6 +352,11 class Client(object): | |||
|
325 | 352 | self._control_socket = self.context.socket(zmq.PAIR) |
|
326 | 353 | self._control_socket.setsockopt(zmq.IDENTITY, self.session.session) |
|
327 | 354 | connect_socket(self._control_socket, content.control) |
|
355 | if content.iopub: | |
|
356 | self._iopub_socket = self.context.socket(zmq.SUB) | |
|
357 | self._iopub_socket.setsockopt(zmq.SUBSCRIBE, '') | |
|
358 | self._iopub_socket.setsockopt(zmq.IDENTITY, self.session.session) | |
|
359 | connect_socket(self._iopub_socket, content.iopub) | |
|
328 | 360 | self._update_engines(dict(content.engines)) |
|
329 | 361 | |
|
330 | 362 | else: |
@@ -350,8 +382,8 class Client(object): | |||
|
350 | 382 | if eid in self._ids: |
|
351 | 383 | self._ids.remove(eid) |
|
352 | 384 | self._engines.pop(eid) |
|
353 |
|
|
|
354 |
def _ |
|
|
385 | ||
|
386 | def _extract_metadata(self, header, parent, content): | |
|
355 | 387 | md = {'msg_id' : parent['msg_id'], |
|
356 | 388 | 'submitted' : datetime.strptime(parent['date'], ss.ISO8601), |
|
357 | 389 | 'started' : datetime.strptime(header['started'], ss.ISO8601), |
@@ -361,8 +393,8 class Client(object): | |||
|
361 | 393 | 'engine_id' : self._engines.get(header['engine'], None), |
|
362 | 394 | 'follow' : parent['follow'], |
|
363 | 395 | 'after' : parent['after'], |
|
364 | 'status' : content['status'] | |
|
365 |
|
|
|
396 | 'status' : content['status'], | |
|
397 | } | |
|
366 | 398 | return md |
|
367 | 399 | |
|
368 | 400 | def _handle_execute_reply(self, msg): |
@@ -390,8 +422,12 class Client(object): | |||
|
390 | 422 | content = msg['content'] |
|
391 | 423 | header = msg['header'] |
|
392 | 424 | |
|
393 | self.metadata[msg_id] = self._build_metadata(header, parent, content) | |
|
425 | # construct metadata: | |
|
426 | md = self.metadata.setdefault(msg_id, Metadata()) | |
|
427 | md.update(self._extract_metadata(header, parent, content)) | |
|
428 | self.metadata[msg_id] = md | |
|
394 | 429 | |
|
430 | # construct result: | |
|
395 | 431 | if content['status'] == 'ok': |
|
396 | 432 | self.results[msg_id] = ss.unserialize_object(msg['buffers'])[0] |
|
397 | 433 | elif content['status'] == 'aborted': |
@@ -448,6 +484,37 class Client(object): | |||
|
448 | 484 | pprint(msg) |
|
449 | 485 | msg = self.session.recv(sock, mode=zmq.NOBLOCK) |
|
450 | 486 | |
|
487 | def _flush_iopub(self, sock): | |
|
488 | """Flush replies from the iopub channel waiting | |
|
489 | in the ZMQ queue. | |
|
490 | """ | |
|
491 | msg = self.session.recv(sock, mode=zmq.NOBLOCK) | |
|
492 | while msg is not None: | |
|
493 | if self.debug: | |
|
494 | pprint(msg) | |
|
495 | msg = msg[-1] | |
|
496 | parent = msg['parent_header'] | |
|
497 | msg_id = parent['msg_id'] | |
|
498 | content = msg['content'] | |
|
499 | header = msg['header'] | |
|
500 | msg_type = msg['msg_type'] | |
|
501 | ||
|
502 | # init metadata: | |
|
503 | md = self.metadata.setdefault(msg_id, Metadata()) | |
|
504 | ||
|
505 | if msg_type == 'stream': | |
|
506 | name = content['name'] | |
|
507 | s = md[name] or '' | |
|
508 | md[name] = s + content['data'] | |
|
509 | elif msg_type == 'pyerr': | |
|
510 | md.update({'pyerr' : ss.unwrap_exception(content)}) | |
|
511 | else: | |
|
512 | md.update({msg_type : content['data']}) | |
|
513 | ||
|
514 | self.metadata[msg_id] = md | |
|
515 | ||
|
516 | msg = self.session.recv(sock, mode=zmq.NOBLOCK) | |
|
517 | ||
|
451 | 518 | #-------------------------------------------------------------------------- |
|
452 | 519 | # getitem |
|
453 | 520 | #-------------------------------------------------------------------------- |
@@ -501,6 +568,8 class Client(object): | |||
|
501 | 568 | self._flush_results(self._task_socket) |
|
502 | 569 | if self._control_socket: |
|
503 | 570 | self._flush_control(self._control_socket) |
|
571 | if self._iopub_socket: | |
|
572 | self._flush_iopub(self._iopub_socket) | |
|
504 | 573 | |
|
505 | 574 | def barrier(self, msg_ids=None, timeout=-1): |
|
506 | 575 | """waits on one or more `msg_ids`, for up to `timeout` seconds. |
@@ -966,10 +1035,13 class Client(object): | |||
|
966 | 1035 | parent = rec['header'] |
|
967 | 1036 | header = rec['result_header'] |
|
968 | 1037 | rcontent = rec['result_content'] |
|
1038 | iodict = rec['io'] | |
|
969 | 1039 | if isinstance(rcontent, str): |
|
970 | 1040 | rcontent = self.session.unpack(rcontent) |
|
971 | 1041 | |
|
972 | self.metadata[msg_id] = self._build_metadata(header, parent, rcontent) | |
|
1042 | md = self.metadata.setdefault(msg_id, Metadata()) | |
|
1043 | md.update(self._extract_metadata(header, parent, rcontent)) | |
|
1044 | md.update(iodict) | |
|
973 | 1045 | |
|
974 | 1046 | if rcontent['status'] == 'ok': |
|
975 | 1047 | res,buffers = ss.unserialize_object(buffers) |
@@ -58,8 +58,8 def make_argument_parser(): | |||
|
58 | 58 | help='set the PUB socket for registration notification [default: random]') |
|
59 | 59 | parser.add_argument('--hb', type=str, metavar='PORTS', |
|
60 | 60 | help='set the 2 ports for heartbeats [default: random]') |
|
61 |
parser.add_argument('--ping', type=int, default= |
|
|
62 |
help='set the heartbeat period in ms [default: |
|
|
61 | parser.add_argument('--ping', type=int, default=100, | |
|
62 | help='set the heartbeat period in ms [default: 100]') | |
|
63 | 63 | parser.add_argument('--monitor', type=int, metavar='PORT', default=0, |
|
64 | 64 | help='set the SUB port for queue monitoring [default: random]') |
|
65 | 65 | parser.add_argument('--mux', type=str, metavar='PORTS', |
@@ -68,11 +68,15 def make_argument_parser(): | |||
|
68 | 68 | help='set the XREP/XREQ ports for the task queue [default: random]') |
|
69 | 69 | parser.add_argument('--control', type=str, metavar='PORTS', |
|
70 | 70 | help='set the XREP ports for the control queue [default: random]') |
|
71 |
parser.add_argument('-- |
|
|
71 | parser.add_argument('--iopub', type=str, metavar='PORTS', | |
|
72 | help='set the PUB/SUB ports for the iopub relay [default: random]') | |
|
73 | parser.add_argument('--scheduler', type=str, default='lru', | |
|
72 | 74 | choices = ['pure', 'lru', 'plainrandom', 'weighted', 'twobin','leastload'], |
|
73 |
help='select the task scheduler [default: |
|
|
75 | help='select the task scheduler [default: Python LRU]') | |
|
74 | 76 | parser.add_argument('--mongodb', action='store_true', |
|
75 | 77 | help='Use MongoDB task storage [default: in-memory]') |
|
78 | parser.add_argument('--session', type=str, default=None, | |
|
79 | help='Manually specify the session id.') | |
|
76 | 80 | |
|
77 | 81 | return parser |
|
78 | 82 | |
@@ -95,6 +99,11 def main(argv=None): | |||
|
95 | 99 | else: |
|
96 | 100 | mux = None |
|
97 | 101 | random_ports += 2 |
|
102 | if args.iopub: | |
|
103 | iopub = split_ports(args.iopub, 2) | |
|
104 | else: | |
|
105 | iopub = None | |
|
106 | random_ports += 2 | |
|
98 | 107 | if args.task: |
|
99 | 108 | task = split_ports(args.task, 2) |
|
100 | 109 | else: |
@@ -139,7 +148,8 def main(argv=None): | |||
|
139 | 148 | if args.execkey and not os.path.isfile(args.execkey): |
|
140 | 149 | generate_exec_key(args.execkey) |
|
141 | 150 | |
|
142 |
thesession = session.StreamSession(username=args.ident or "controller", |
|
|
151 | thesession = session.StreamSession(username=args.ident or "controller", | |
|
152 | keyfile=args.execkey, session=args.session) | |
|
143 | 153 | |
|
144 | 154 | ### build and launch the queues ### |
|
145 | 155 | |
@@ -151,6 +161,19 def main(argv=None): | |||
|
151 | 161 | |
|
152 | 162 | ports = select_random_ports(random_ports) |
|
153 | 163 | children = [] |
|
164 | ||
|
165 | # IOPub relay (in a Process) | |
|
166 | if not iopub: | |
|
167 | iopub = (ports.pop(),ports.pop()) | |
|
168 | q = ProcessMonitoredQueue(zmq.SUB, zmq.PUB, zmq.PUB, 'iopub', 'N/A') | |
|
169 | q.bind_in(iface%iopub[1]) | |
|
170 | q.bind_out(iface%iopub[0]) | |
|
171 | q.setsockopt_in(zmq.SUBSCRIBE, '') | |
|
172 | q.connect_mon(iface%monport) | |
|
173 | q.daemon=True | |
|
174 | q.start() | |
|
175 | children.append(q.launcher) | |
|
176 | ||
|
154 | 177 | # Multiplexer Queue (in a Process) |
|
155 | 178 | if not mux: |
|
156 | 179 | mux = (ports.pop(),ports.pop()) |
@@ -204,6 +227,7 def main(argv=None): | |||
|
204 | 227 | 'queue': iface%mux[1], |
|
205 | 228 | 'heartbeat': (iface%hb[0], iface%hb[1]), |
|
206 | 229 | 'task' : iface%task[1], |
|
230 | 'iopub' : iface%iopub[1], | |
|
207 | 231 | 'monitor' : iface%monport, |
|
208 | 232 | } |
|
209 | 233 | |
@@ -212,8 +236,11 def main(argv=None): | |||
|
212 | 236 | 'query': iface%cport, |
|
213 | 237 | 'queue': iface%mux[0], |
|
214 | 238 | 'task' : iface%task[0], |
|
239 | 'iopub' : iface%iopub[0], | |
|
215 | 240 | 'notification': iface%nport |
|
216 | 241 | } |
|
242 | ||
|
243 | # register relay of signals to the children | |
|
217 | 244 | signal_children(children) |
|
218 | 245 | hub = Hub(loop, thesession, sub, reg, hmon, c, n, db, engine_addrs, client_addrs) |
|
219 | 246 | dc = ioloop.DelayedCallback(lambda : print("Controller started..."), 100, loop) |
@@ -26,7 +26,7 from entry_point import make_base_argument_parser, connect_logger, parse_url | |||
|
26 | 26 | |
|
27 | 27 | |
|
28 | 28 | def printer(*msg): |
|
29 | pprint(msg) | |
|
29 | pprint(msg, stream=sys.__stdout__) | |
|
30 | 30 | |
|
31 | 31 | class Engine(object): |
|
32 | 32 | """IPython engine""" |
@@ -69,19 +69,12 class Engine(object): | |||
|
69 | 69 | shell_addrs = [str(queue_addr)] |
|
70 | 70 | control_addr = str(msg.content.control) |
|
71 | 71 | task_addr = msg.content.task |
|
72 | iopub_addr = msg.content.iopub | |
|
72 | 73 | if task_addr: |
|
73 | 74 | shell_addrs.append(str(task_addr)) |
|
74 | 75 | |
|
75 | 76 | hb_addrs = msg.content.heartbeat |
|
76 | 77 | # ioloop.DelayedCallback(self.heart.start, 1000, self.loop).start() |
|
77 | ||
|
78 | # placeholder for no, since pub isn't hooked up: | |
|
79 | sub = self.context.socket(zmq.SUB) | |
|
80 | sub = zmqstream.ZMQStream(sub, self.loop) | |
|
81 | sub.on_recv(lambda *a: None) | |
|
82 | port = sub.bind_to_random_port("tcp://%s"%LOCALHOST) | |
|
83 | iopub_addr = "tcp://%s:%i"%(LOCALHOST,12345) | |
|
84 | ||
|
85 | 78 | k = make_kernel(self.ident, control_addr, shell_addrs, iopub_addr, |
|
86 | 79 | hb_addrs, client_addr=None, loop=self.loop, |
|
87 | 80 | context=self.context, key=self.session.key)[-1] |
@@ -96,7 +89,7 class Engine(object): | |||
|
96 | 89 | |
|
97 | 90 | # logger.info("engine::completed registration with id %s"%self.session.username) |
|
98 | 91 | |
|
99 | print (msg) | |
|
92 | print (msg,file=sys.__stdout__) | |
|
100 | 93 | |
|
101 | 94 | def unregister(self): |
|
102 | 95 | self.session.send(self.registrar, "unregistration_request", content=dict(id=int(self.session.username))) |
@@ -104,7 +97,7 class Engine(object): | |||
|
104 | 97 | sys.exit(0) |
|
105 | 98 | |
|
106 | 99 | def start(self): |
|
107 | print ("registering") | |
|
100 | print ("registering",file=sys.__stdout__) | |
|
108 | 101 | self.register() |
|
109 | 102 | |
|
110 | 103 | |
@@ -128,7 +121,7 def main(argv=None, user_ns=None): | |||
|
128 | 121 | connect_logger(ctx, iface%args.logport, root="engine", loglevel=args.loglevel) |
|
129 | 122 | |
|
130 | 123 | reg_conn = iface % args.regport |
|
131 | print (reg_conn) | |
|
124 | print (reg_conn, file=sys.__stdout__) | |
|
132 | 125 | print ("Starting the engine...", file=sys.__stderr__) |
|
133 | 126 | |
|
134 | 127 | reg = ctx.socket(zmq.PAIR) |
@@ -91,7 +91,7 def make_base_argument_parser(): | |||
|
91 | 91 | help='set the XREP port for registration [default: 10101]') |
|
92 | 92 | parser.add_argument('--logport', type=int, metavar='PORT', default=20202, |
|
93 | 93 | help='set the PUB port for logging [default: 10201]') |
|
94 |
parser.add_argument('--loglevel', type= |
|
|
94 | parser.add_argument('--loglevel', type=str, metavar='LEVEL', default=logging.DEBUG, | |
|
95 | 95 | help='set the log level [default: DEBUG]') |
|
96 | 96 | parser.add_argument('--ident', type=str, |
|
97 | 97 | help='set the ZMQ identity [default: random]') |
@@ -107,6 +107,11 def make_base_argument_parser(): | |||
|
107 | 107 | |
|
108 | 108 | |
|
109 | 109 | def connect_logger(context, iface, root="ip", loglevel=logging.DEBUG): |
|
110 | try: | |
|
111 | loglevel = int(loglevel) | |
|
112 | except ValueError: | |
|
113 | if isinstance(loglevel, str): | |
|
114 | loglevel = getattr(logging, loglevel) | |
|
110 | 115 | lsock = context.socket(zmq.PUB) |
|
111 | 116 | lsock.connect(iface) |
|
112 | 117 | handler = handlers.PUBHandler(lsock) |
@@ -41,6 +41,10 else: | |||
|
41 | 41 | def _passer(*args, **kwargs): |
|
42 | 42 | return |
|
43 | 43 | |
|
44 | def _printer(*args, **kwargs): | |
|
45 | print (args) | |
|
46 | print (kwargs) | |
|
47 | ||
|
44 | 48 | def init_record(msg): |
|
45 | 49 | """return an empty TaskRecord dict, with all keys initialized with None.""" |
|
46 | 50 | header = msg['header'] |
@@ -58,7 +62,12 def init_record(msg): | |||
|
58 | 62 | 'result_header' : None, |
|
59 | 63 | 'result_content' : None, |
|
60 | 64 | 'result_buffers' : None, |
|
61 | 'queue' : None | |
|
65 | 'queue' : None, | |
|
66 | 'pyin' : None, | |
|
67 | 'pyout': None, | |
|
68 | 'pyerr': None, | |
|
69 | 'stdout': '', | |
|
70 | 'stderr': '', | |
|
62 | 71 | } |
|
63 | 72 | |
|
64 | 73 | |
@@ -181,19 +190,20 class Hub(object): | |||
|
181 | 190 | # register our callbacks |
|
182 | 191 | self.registrar.on_recv(self.dispatch_register_request) |
|
183 | 192 | self.clientele.on_recv(self.dispatch_client_msg) |
|
184 |
self.queue.on_recv(self.dispatch_ |
|
|
193 | self.queue.on_recv(self.dispatch_monitor_traffic) | |
|
185 | 194 | |
|
186 | 195 | if heartbeat is not None: |
|
187 | 196 | heartbeat.add_heart_failure_handler(self.handle_heart_failure) |
|
188 | 197 | heartbeat.add_new_heart_handler(self.handle_new_heart) |
|
189 | 198 | |
|
190 |
self. |
|
|
199 | self.monitor_handlers = { 'in' : self.save_queue_request, | |
|
191 | 200 | 'out': self.save_queue_result, |
|
192 | 201 | 'intask': self.save_task_request, |
|
193 | 202 | 'outtask': self.save_task_result, |
|
194 | 203 | 'tracktask': self.save_task_destination, |
|
195 | 204 | 'incontrol': _passer, |
|
196 | 205 | 'outcontrol': _passer, |
|
206 | 'iopub': self.save_iopub_message, | |
|
197 | 207 | } |
|
198 | 208 | |
|
199 | 209 | self.client_handlers = {'queue_request': self.queue_status, |
@@ -262,7 +272,7 class Hub(object): | |||
|
262 | 272 | try: |
|
263 | 273 | msg = self.session.unpack_message(msg[1:], content=True) |
|
264 | 274 | except: |
|
265 | logger.error("client::Invalid Message %s"%msg) | |
|
275 | logger.error("client::Invalid Message %s"%msg, exc_info=True) | |
|
266 | 276 | return False |
|
267 | 277 | |
|
268 | 278 | msg_type = msg.get('msg_type', None) |
@@ -299,19 +309,20 class Hub(object): | |||
|
299 | 309 | else: |
|
300 | 310 | handler(idents, msg) |
|
301 | 311 | |
|
302 |
def dispatch_ |
|
|
303 |
"""all ME and Task queue messages come through here |
|
|
304 | logger.debug("queue traffic: %s"%msg[:2]) | |
|
312 | def dispatch_monitor_traffic(self, msg): | |
|
313 | """all ME and Task queue messages come through here, as well as | |
|
314 | IOPub traffic.""" | |
|
315 | logger.debug("monitor traffic: %s"%msg[:2]) | |
|
305 | 316 | switch = msg[0] |
|
306 | 317 | idents, msg = self.session.feed_identities(msg[1:]) |
|
307 | 318 | if not idents: |
|
308 |
logger.error("Bad |
|
|
319 | logger.error("Bad Monitor Message: %s"%msg) | |
|
309 | 320 | return |
|
310 |
handler = self. |
|
|
321 | handler = self.monitor_handlers.get(switch, None) | |
|
311 | 322 | if handler is not None: |
|
312 | 323 | handler(idents, msg) |
|
313 | 324 | else: |
|
314 |
logger.error("Invalid m |
|
|
325 | logger.error("Invalid monitor topic: %s"%switch) | |
|
315 | 326 | |
|
316 | 327 | |
|
317 | 328 | def dispatch_client_msg(self, msg): |
@@ -486,7 +497,7 class Hub(object): | |||
|
486 | 497 | msg = self.session.unpack_message(msg, content=False) |
|
487 | 498 | except: |
|
488 | 499 | logger.error("task::invalid task result message send to %r: %s"%( |
|
489 | client_id, msg)) | |
|
500 | client_id, msg), exc_info=True) | |
|
490 | 501 | raise |
|
491 | 502 | return |
|
492 | 503 | |
@@ -532,7 +543,7 class Hub(object): | |||
|
532 | 543 | try: |
|
533 | 544 | msg = self.session.unpack_message(msg, content=True) |
|
534 | 545 | except: |
|
535 | logger.error("task::invalid task tracking message") | |
|
546 | logger.error("task::invalid task tracking message", exc_info=True) | |
|
536 | 547 | return |
|
537 | 548 | content = msg['content'] |
|
538 | 549 | print (content) |
@@ -557,6 +568,43 class Hub(object): | |||
|
557 | 568 | # self.session.send('mia_reply', content=content, idents=client_id) |
|
558 | 569 | |
|
559 | 570 | |
|
571 | #--------------------- IOPub Traffic ------------------------------ | |
|
572 | ||
|
573 | def save_iopub_message(self, topics, msg): | |
|
574 | """save an iopub message into the db""" | |
|
575 | print (topics) | |
|
576 | try: | |
|
577 | msg = self.session.unpack_message(msg, content=True) | |
|
578 | except: | |
|
579 | logger.error("iopub::invalid IOPub message", exc_info=True) | |
|
580 | return | |
|
581 | ||
|
582 | parent = msg['parent_header'] | |
|
583 | msg_id = parent['msg_id'] | |
|
584 | msg_type = msg['msg_type'] | |
|
585 | content = msg['content'] | |
|
586 | ||
|
587 | # ensure msg_id is in db | |
|
588 | try: | |
|
589 | rec = self.db.get_record(msg_id) | |
|
590 | except: | |
|
591 | logger.error("iopub::IOPub message has invalid parent", exc_info=True) | |
|
592 | return | |
|
593 | # stream | |
|
594 | d = {} | |
|
595 | if msg_type == 'stream': | |
|
596 | name = content['name'] | |
|
597 | s = rec[name] or '' | |
|
598 | d[name] = s + content['data'] | |
|
599 | ||
|
600 | elif msg_type == 'pyerr': | |
|
601 | d['pyerr'] = content | |
|
602 | else: | |
|
603 | d[msg_type] = content['data'] | |
|
604 | ||
|
605 | self.db.update_record(msg_id, d) | |
|
606 | ||
|
607 | ||
|
560 | 608 | |
|
561 | 609 | #------------------------------------------------------------------------- |
|
562 | 610 | # Registration requests |
@@ -579,7 +627,7 class Hub(object): | |||
|
579 | 627 | try: |
|
580 | 628 | queue = content['queue'] |
|
581 | 629 | except KeyError: |
|
582 | logger.error("registration::queue not specified") | |
|
630 | logger.error("registration::queue not specified", exc_info=True) | |
|
583 | 631 | return |
|
584 | 632 | heart = content.get('heartbeat', None) |
|
585 | 633 | """register a new engine, and create the socket(s) necessary""" |
@@ -639,7 +687,7 class Hub(object): | |||
|
639 | 687 | try: |
|
640 | 688 | eid = msg['content']['id'] |
|
641 | 689 | except: |
|
642 | logger.error("registration::bad engine id for unregistration: %s"%ident) | |
|
690 | logger.error("registration::bad engine id for unregistration: %s"%ident, exc_info=True) | |
|
643 | 691 | return |
|
644 | 692 | logger.info("registration::unregister_engine(%s)"%eid) |
|
645 | 693 | content=dict(id=eid, queue=self.engines[eid].queue) |
@@ -662,7 +710,7 class Hub(object): | |||
|
662 | 710 | try: |
|
663 | 711 | (eid,queue,reg,purge) = self.incoming_registrations.pop(heart) |
|
664 | 712 | except KeyError: |
|
665 | logger.error("registration::tried to finish nonexistant registration") | |
|
713 | logger.error("registration::tried to finish nonexistant registration", exc_info=True) | |
|
666 | 714 | return |
|
667 | 715 | logger.info("registration::finished registering engine %i:%r"%(eid,queue)) |
|
668 | 716 | if purge is not None: |
@@ -820,9 +868,13 class Hub(object): | |||
|
820 | 868 | completed.append(msg_id) |
|
821 | 869 | if not statusonly: |
|
822 | 870 | rec = records[msg_id] |
|
871 | io_dict = {} | |
|
872 | for key in 'pyin pyout pyerr stdout stderr'.split(): | |
|
873 | io_dict[key] = rec[key] | |
|
823 | 874 | content[msg_id] = { 'result_content': rec['result_content'], |
|
824 | 875 | 'header': rec['header'], |
|
825 | 876 | 'result_header' : rec['result_header'], |
|
877 | 'io' : io_dict, | |
|
826 | 878 | } |
|
827 | 879 | buffers.extend(map(str, rec['result_buffers'])) |
|
828 | 880 | else: |
@@ -30,9 +30,9 from IPython.external.decorator import decorator | |||
|
30 | 30 | |
|
31 | 31 | @decorator |
|
32 | 32 | def logged(f,self,*args,**kwargs): |
|
33 | print ("#--------------------") | |
|
34 | print ("%s(*%s,**%s)"%(f.func_name, args, kwargs)) | |
|
35 | print ("#--") | |
|
33 | # print ("#--------------------") | |
|
34 | # print ("%s(*%s,**%s)"%(f.func_name, args, kwargs)) | |
|
35 | # print ("#--") | |
|
36 | 36 | return f(self,*args, **kwargs) |
|
37 | 37 | |
|
38 | 38 | #---------------------------------------------------------------------- |
@@ -28,6 +28,9 from IPython.core import ultratb | |||
|
28 | 28 | from IPython.utils.traitlets import HasTraits, Instance, List |
|
29 | 29 | from IPython.zmq.completer import KernelCompleter |
|
30 | 30 | from IPython.zmq.log import logger # a Logger object |
|
31 | from IPython.zmq.iostream import OutStream | |
|
32 | from IPython.zmq.displayhook import DisplayHook | |
|
33 | ||
|
31 | 34 | |
|
32 | 35 | from streamsession import StreamSession, Message, extract_header, serialize_object,\ |
|
33 | 36 | unpack_apply_message, ISO8601, wrap_exception |
@@ -36,7 +39,7 import heartmonitor | |||
|
36 | 39 | from client import Client |
|
37 | 40 | |
|
38 | 41 | def printer(*args): |
|
39 | pprint(args) | |
|
42 | pprint(args, stream=sys.__stdout__) | |
|
40 | 43 | |
|
41 | 44 | #----------------------------------------------------------------------------- |
|
42 | 45 | # Main kernel class |
@@ -59,6 +62,7 class Kernel(HasTraits): | |||
|
59 | 62 | def __init__(self, **kwargs): |
|
60 | 63 | super(Kernel, self).__init__(**kwargs) |
|
61 | 64 | self.identity = self.shell_streams[0].getsockopt(zmq.IDENTITY) |
|
65 | self.prefix = 'engine.%s'%self.identity | |
|
62 | 66 | self.user_ns = {} |
|
63 | 67 | self.history = [] |
|
64 | 68 | self.compiler = CommandCompiler() |
@@ -212,18 +216,22 class Kernel(HasTraits): | |||
|
212 | 216 | return |
|
213 | 217 | # pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent) |
|
214 | 218 | # self.iopub_stream.send(pyin_msg) |
|
215 |
self.session.send(self.iopub_stream, u'pyin', {u'code':code},parent=parent |
|
|
219 | self.session.send(self.iopub_stream, u'pyin', {u'code':code},parent=parent, | |
|
220 | ident=self.identity+'.pyin') | |
|
216 | 221 | started = datetime.now().strftime(ISO8601) |
|
217 | 222 | try: |
|
218 | 223 | comp_code = self.compiler(code, '<zmq-kernel>') |
|
219 | 224 | # allow for not overriding displayhook |
|
220 | 225 | if hasattr(sys.displayhook, 'set_parent'): |
|
221 | 226 | sys.displayhook.set_parent(parent) |
|
227 | sys.stdout.set_parent(parent) | |
|
228 | sys.stderr.set_parent(parent) | |
|
222 | 229 | exec comp_code in self.user_ns, self.user_ns |
|
223 | 230 | except: |
|
224 | 231 | exc_content = self._wrap_exception('execute') |
|
225 | 232 | # exc_msg = self.session.msg(u'pyerr', exc_content, parent) |
|
226 |
self.session.send(self.iopub_stream, u'pyerr', exc_content, parent=parent |
|
|
233 | self.session.send(self.iopub_stream, u'pyerr', exc_content, parent=parent, | |
|
234 | ident=self.identity+'.pyerr') | |
|
227 | 235 | reply_content = exc_content |
|
228 | 236 | else: |
|
229 | 237 | reply_content = {'status' : 'ok'} |
@@ -247,7 +255,7 class Kernel(HasTraits): | |||
|
247 | 255 | return self.completer.complete(msg.content.line, msg.content.text) |
|
248 | 256 | |
|
249 | 257 | def apply_request(self, stream, ident, parent): |
|
250 | print (parent) | |
|
258 | # print (parent) | |
|
251 | 259 | try: |
|
252 | 260 | content = parent[u'content'] |
|
253 | 261 | bufs = parent[u'buffers'] |
@@ -266,6 +274,8 class Kernel(HasTraits): | |||
|
266 | 274 | # allow for not overriding displayhook |
|
267 | 275 | if hasattr(sys.displayhook, 'set_parent'): |
|
268 | 276 | sys.displayhook.set_parent(parent) |
|
277 | sys.stdout.set_parent(parent) | |
|
278 | sys.stderr.set_parent(parent) | |
|
269 | 279 | # exec "f(*args,**kwargs)" in self.user_ns, self.user_ns |
|
270 | 280 | if bound: |
|
271 | 281 | working = self.user_ns |
@@ -305,7 +315,8 class Kernel(HasTraits): | |||
|
305 | 315 | except: |
|
306 | 316 | exc_content = self._wrap_exception('apply') |
|
307 | 317 | # exc_msg = self.session.msg(u'pyerr', exc_content, parent) |
|
308 |
self.session.send(self.iopub_stream, u'pyerr', exc_content, parent=parent |
|
|
318 | self.session.send(self.iopub_stream, u'pyerr', exc_content, parent=parent, | |
|
319 | ident=self.identity+'.pyerr') | |
|
309 | 320 | reply_content = exc_content |
|
310 | 321 | result_buf = [] |
|
311 | 322 | |
@@ -318,7 +329,7 class Kernel(HasTraits): | |||
|
318 | 329 | # self.reply_socket.send_json(reply_msg) |
|
319 | 330 | reply_msg = self.session.send(stream, u'apply_reply', reply_content, |
|
320 | 331 | parent=parent, ident=ident,buffers=result_buf, subheader=sub) |
|
321 | print(Message(reply_msg), file=sys.__stdout__) | |
|
332 | # print(Message(reply_msg), file=sys.__stdout__) | |
|
322 | 333 | # if reply_msg['content']['status'] == u'error': |
|
323 | 334 | # self.abort_queues() |
|
324 | 335 | |
@@ -364,7 +375,7 class Kernel(HasTraits): | |||
|
364 | 375 | |
|
365 | 376 | if self.iopub_stream: |
|
366 | 377 | self.iopub_stream.on_err(printer) |
|
367 | self.iopub_stream.on_send(printer) | |
|
378 | # self.iopub_stream.on_send(printer) | |
|
368 | 379 | |
|
369 | 380 | #### while True mode: |
|
370 | 381 | # while True: |
@@ -388,7 +399,9 class Kernel(HasTraits): | |||
|
388 | 399 | # time.sleep(1e-3) |
|
389 | 400 | |
|
390 | 401 | def make_kernel(identity, control_addr, shell_addrs, iopub_addr, hb_addrs, |
|
391 |
client_addr=None, loop=None, context=None, key=None |
|
|
402 | client_addr=None, loop=None, context=None, key=None, | |
|
403 | out_stream_factory=OutStream, display_hook_factory=DisplayHook): | |
|
404 | ||
|
392 | 405 | # create loop, context, and session: |
|
393 | 406 | if loop is None: |
|
394 | 407 | loop = ioloop.IOLoop.instance() |
@@ -417,6 +430,17 def make_kernel(identity, control_addr, shell_addrs, iopub_addr, hb_addrs, | |||
|
417 | 430 | iopub_stream.setsockopt(zmq.IDENTITY, identity) |
|
418 | 431 | iopub_stream.connect(iopub_addr) |
|
419 | 432 | |
|
433 | # Redirect input streams and set a display hook. | |
|
434 | if out_stream_factory: | |
|
435 | sys.stdout = out_stream_factory(session, iopub_stream, u'stdout') | |
|
436 | sys.stdout.topic = identity+'.stdout' | |
|
437 | sys.stderr = out_stream_factory(session, iopub_stream, u'stderr') | |
|
438 | sys.stderr.topic = identity+'.stderr' | |
|
439 | if display_hook_factory: | |
|
440 | sys.displayhook = display_hook_factory(session, iopub_stream) | |
|
441 | sys.displayhook.topic = identity+'.pyout' | |
|
442 | ||
|
443 | ||
|
420 | 444 | # launch heartbeat |
|
421 | 445 | heart = heartmonitor.Heart(*map(str, hb_addrs), heart_id=identity) |
|
422 | 446 | heart.start() |
@@ -336,7 +336,7 class StreamSession(object): | |||
|
336 | 336 | return header.get('key', None) == self.key |
|
337 | 337 | |
|
338 | 338 | |
|
339 | def send(self, stream, msg_type, content=None, buffers=None, parent=None, subheader=None, ident=None): | |
|
339 | def send(self, stream, msg_or_type, content=None, buffers=None, parent=None, subheader=None, ident=None): | |
|
340 | 340 | """Build and send a message via stream or socket. |
|
341 | 341 | |
|
342 | 342 | Parameters |
@@ -344,10 +344,9 class StreamSession(object): | |||
|
344 | 344 | |
|
345 | 345 | stream : zmq.Socket or ZMQStream |
|
346 | 346 | the socket-like object used to send the data |
|
347 | msg_type : str or Message/dict | |
|
348 | Normally, msg_type will be | |
|
349 | ||
|
350 | ||
|
347 | msg_or_type : str or Message/dict | |
|
348 | Normally, msg_or_type will be a msg_type unless a message is being sent more | |
|
349 | than once. | |
|
351 | 350 | |
|
352 | 351 | Returns |
|
353 | 352 | ------- |
@@ -356,13 +355,13 class StreamSession(object): | |||
|
356 | 355 | the nice wrapped dict-like object containing the headers |
|
357 | 356 | |
|
358 | 357 | """ |
|
359 | if isinstance(msg_type, (Message, dict)): | |
|
358 | if isinstance(msg_or_type, (Message, dict)): | |
|
360 | 359 | # we got a Message, not a msg_type |
|
361 | 360 | # don't build a new Message |
|
362 | msg = msg_type | |
|
361 | msg = msg_or_type | |
|
363 | 362 | content = msg['content'] |
|
364 | 363 | else: |
|
365 | msg = self.msg(msg_type, content, parent, subheader) | |
|
364 | msg = self.msg(msg_or_type, content, parent, subheader) | |
|
366 | 365 | buffers = [] if buffers is None else buffers |
|
367 | 366 | to_send = [] |
|
368 | 367 | if isinstance(ident, list): |
General Comments 0
You need to be logged in to leave comments.
Login now