Show More
@@ -54,7 +54,8 b' class ClientCompleter(object):' | |||
|
54 | 54 | |
|
55 | 55 | # Give the kernel up to 0.5s to respond |
|
56 | 56 | for i in range(5): |
|
57 | rep = self.session.recv(self.socket) | |
|
57 | ident,rep = self.session.recv(self.socket) | |
|
58 | rep = Message(rep) | |
|
58 | 59 | if rep is not None and rep.msg_type == 'complete_reply': |
|
59 | 60 | matches = rep.content.matches |
|
60 | 61 | break |
@@ -14,9 +14,8 b' class DisplayHook(object):' | |||
|
14 | 14 | return |
|
15 | 15 | |
|
16 | 16 | __builtin__._ = obj |
|
17 |
msg = self.session. |
|
|
17 | msg = self.session.send(self.pub_socket, u'pyout', {u'data':repr(obj)}, | |
|
18 | 18 | parent=self.parent_header) |
|
19 | self.pub_socket.send_json(msg) | |
|
20 | 19 | |
|
21 | 20 | def set_parent(self, parent): |
|
22 | 21 | self.parent_header = extract_header(parent) No newline at end of file |
@@ -92,10 +92,10 b' class Console(code.InteractiveConsole):' | |||
|
92 | 92 | |
|
93 | 93 | def recv_output(self): |
|
94 | 94 | while True: |
|
95 |
|
|
|
96 |
if |
|
|
95 | ident,msg = self.session.recv(self.sub_socket) | |
|
96 | if msg is None: | |
|
97 | 97 | break |
|
98 |
self.handle_output( |
|
|
98 | self.handle_output(Message(msg)) | |
|
99 | 99 | |
|
100 | 100 | def handle_reply(self, rep): |
|
101 | 101 | # Handle any side effects on output channels |
@@ -114,9 +114,10 b' class Console(code.InteractiveConsole):' | |||
|
114 | 114 | print >> sys.stderr, ab |
|
115 | 115 | |
|
116 | 116 | def recv_reply(self): |
|
117 | rep = self.session.recv(self.request_socket) | |
|
118 | self.handle_reply(rep) | |
|
119 | return rep | |
|
117 | ident,rep = self.session.recv(self.request_socket) | |
|
118 | mrep = Message(rep) | |
|
119 | self.handle_reply(mrep) | |
|
120 | return mrep | |
|
120 | 121 | |
|
121 | 122 | def runcode(self, code): |
|
122 | 123 | # We can't pickle code objects, so fetch the actual source |
@@ -37,10 +37,9 b' class OutStream(object):' | |||
|
37 | 37 | data = self._buffer.getvalue() |
|
38 | 38 | if data: |
|
39 | 39 | content = {u'name':self.name, u'data':data} |
|
40 |
msg = self.session. |
|
|
40 | msg = self.session.send(self.pub_socket, u'stream', content=content, | |
|
41 | 41 | parent=self.parent_header) |
|
42 | 42 | io.raw_print(msg) |
|
43 | self.pub_socket.send_json(msg) | |
|
44 | 43 | |
|
45 | 44 | self._buffer.close() |
|
46 | 45 | self._new_buffer() |
@@ -106,17 +106,14 b' class Kernel(Configurable):' | |||
|
106 | 106 | def do_one_iteration(self): |
|
107 | 107 | """Do one iteration of the kernel's evaluation loop. |
|
108 | 108 | """ |
|
109 | try: | |
|
110 | ident = self.reply_socket.recv(zmq.NOBLOCK) | |
|
111 | except zmq.ZMQError, e: | |
|
112 | if e.errno == zmq.EAGAIN: | |
|
109 | ident,msg = self.session.recv(self.reply_socket, zmq.NOBLOCK) | |
|
110 | if msg is None: | |
|
113 | 111 |
|
|
114 |
|
|
|
115 | raise | |
|
112 | ||
|
116 | 113 | # This assert will raise in versions of zeromq 2.0.7 and lesser. |
|
117 | 114 | # We now require 2.0.8 or above, so we can uncomment for safety. |
|
118 | assert self.reply_socket.rcvmore(), "Missing message part." | |
|
119 | msg = self.reply_socket.recv_json() | |
|
115 | # print(ident,msg, file=sys.__stdout__) | |
|
116 | assert ident is not None, "Missing message part." | |
|
120 | 117 | |
|
121 | 118 | # Print some info about this message and leave a '--->' marker, so it's |
|
122 | 119 | # easier to trace visually the message chain when debugging. Each |
@@ -169,17 +166,15 b' class Kernel(Configurable):' | |||
|
169 | 166 | def _publish_pyin(self, code, parent): |
|
170 | 167 | """Publish the code request on the pyin stream.""" |
|
171 | 168 | |
|
172 |
pyin_msg = self.session. |
|
|
173 | self.pub_socket.send_json(pyin_msg) | |
|
169 | pyin_msg = self.session.send(self.pub_socket, u'pyin',{u'code':code}, parent=parent) | |
|
174 | 170 | |
|
175 | 171 | def execute_request(self, ident, parent): |
|
176 | 172 | |
|
177 |
status_msg = self.session. |
|
|
173 | status_msg = self.session.send(self.pub_socket, | |
|
178 | 174 | u'status', |
|
179 | 175 | {u'execution_state':u'busy'}, |
|
180 | 176 | parent=parent |
|
181 | 177 | ) |
|
182 | self.pub_socket.send_json(status_msg) | |
|
183 | 178 | |
|
184 | 179 | try: |
|
185 | 180 | content = parent[u'content'] |
@@ -264,7 +259,7 b' class Kernel(Configurable):' | |||
|
264 | 259 | shell.payload_manager.clear_payload() |
|
265 | 260 | |
|
266 | 261 | # Send the reply. |
|
267 |
reply_msg = self.session. |
|
|
262 | reply_msg = self.session.send(self.reply_socket, u'execute_reply', reply_content, parent, ident=ident) | |
|
268 | 263 | io.raw_print(reply_msg) |
|
269 | 264 | |
|
270 | 265 | # Flush output before sending the reply. |
@@ -276,17 +271,14 b' class Kernel(Configurable):' | |||
|
276 | 271 | if self._execute_sleep: |
|
277 | 272 | time.sleep(self._execute_sleep) |
|
278 | 273 | |
|
279 | self.reply_socket.send(ident, zmq.SNDMORE) | |
|
280 | self.reply_socket.send_json(reply_msg) | |
|
281 | 274 | if reply_msg['content']['status'] == u'error': |
|
282 | 275 | self._abort_queue() |
|
283 | 276 | |
|
284 |
status_msg = self.session. |
|
|
277 | status_msg = self.session.send(self.pub_socket, | |
|
285 | 278 | u'status', |
|
286 | 279 | {u'execution_state':u'idle'}, |
|
287 | 280 | parent=parent |
|
288 | 281 | ) |
|
289 | self.pub_socket.send_json(status_msg) | |
|
290 | 282 | |
|
291 | 283 | def complete_request(self, ident, parent): |
|
292 | 284 | txt, matches = self._complete(parent) |
@@ -335,22 +327,18 b' class Kernel(Configurable):' | |||
|
335 | 327 | |
|
336 | 328 | def _abort_queue(self): |
|
337 | 329 | while True: |
|
338 | try: | |
|
339 | ident = self.reply_socket.recv(zmq.NOBLOCK) | |
|
340 | except zmq.ZMQError, e: | |
|
341 | if e.errno == zmq.EAGAIN: | |
|
330 | ident,msg = self.session.recv(self.reply_socket, zmq.NOBLOCK) | |
|
331 | if msg is None: | |
|
342 | 332 |
|
|
343 | 333 | else: |
|
344 | assert self.reply_socket.rcvmore(), \ | |
|
334 | assert ident is not None, \ | |
|
345 | 335 | "Unexpected missing message part." |
|
346 | msg = self.reply_socket.recv_json() | |
|
347 | 336 | io.raw_print("Aborting:\n", Message(msg)) |
|
348 | 337 | msg_type = msg['msg_type'] |
|
349 | 338 | reply_type = msg_type.split('_')[0] + '_reply' |
|
350 |
reply_msg = self.session. |
|
|
339 | reply_msg = self.session.send(self.reply_socket, reply_type, | |
|
340 | {'status' : 'aborted'}, msg, ident=ident) | |
|
351 | 341 | io.raw_print(reply_msg) |
|
352 | self.reply_socket.send(ident,zmq.SNDMORE) | |
|
353 | self.reply_socket.send_json(reply_msg) | |
|
354 | 342 | # We need to wait a bit for requests to come in. This can probably |
|
355 | 343 | # be set shorter for true asynchronous clients. |
|
356 | 344 | time.sleep(0.1) |
@@ -362,11 +350,10 b' class Kernel(Configurable):' | |||
|
362 | 350 | |
|
363 | 351 | # Send the input request. |
|
364 | 352 | content = dict(prompt=prompt) |
|
365 |
msg = self.session. |
|
|
366 | self.req_socket.send_json(msg) | |
|
353 | msg = self.session.send(self.req_socket, u'input_request', content, parent) | |
|
367 | 354 | |
|
368 | 355 | # Await a response. |
|
369 |
reply = self.req_socket |
|
|
356 | ident, reply = self.session.recv(self.req_socket, 0) | |
|
370 | 357 | try: |
|
371 | 358 | value = reply['content']['value'] |
|
372 | 359 | except: |
@@ -423,8 +410,8 b' class Kernel(Configurable):' | |||
|
423 | 410 | """ |
|
424 | 411 | # io.rprint("Kernel at_shutdown") # dbg |
|
425 | 412 | if self._shutdown_message is not None: |
|
426 |
self.reply_socket |
|
|
427 |
self.pub_socket |
|
|
413 | self.session.send(self.reply_socket, self._shutdown_message) | |
|
414 | self.session.send(self.pub_socket, self._shutdown_message) | |
|
428 | 415 | io.raw_print(self._shutdown_message) |
|
429 | 416 | # A very short sleep to give zmq time to flush its message buffers |
|
430 | 417 | # before Python truly shuts down. |
@@ -33,7 +33,7 b' from zmq.eventloop import ioloop' | |||
|
33 | 33 | from IPython.utils import io |
|
34 | 34 | from IPython.utils.localinterfaces import LOCALHOST, LOCAL_IPS |
|
35 | 35 | from IPython.utils.traitlets import HasTraits, Any, Instance, Type, TCPAddress |
|
36 | from session import Session | |
|
36 | from session import Session, Message | |
|
37 | 37 | |
|
38 | 38 | #----------------------------------------------------------------------------- |
|
39 | 39 | # Constants and exceptions |
@@ -330,7 +330,7 b' class XReqSocketChannel(ZmqSocketChannel):' | |||
|
330 | 330 | self._handle_recv() |
|
331 | 331 | |
|
332 | 332 | def _handle_recv(self): |
|
333 | msg = self.socket.recv_json() | |
|
333 | ident,msg = self.session.recv(self.socket, 0) | |
|
334 | 334 | self.call_handlers(msg) |
|
335 | 335 | |
|
336 | 336 | def _handle_send(self): |
@@ -339,7 +339,7 b' class XReqSocketChannel(ZmqSocketChannel):' | |||
|
339 | 339 | except Empty: |
|
340 | 340 | pass |
|
341 | 341 | else: |
|
342 |
self.socket |
|
|
342 | self.session.send(self.socket,msg) | |
|
343 | 343 | if self.command_queue.empty(): |
|
344 | 344 | self.drop_io_state(POLLOUT) |
|
345 | 345 | |
@@ -424,12 +424,14 b' class SubSocketChannel(ZmqSocketChannel):' | |||
|
424 | 424 | # Get all of the messages we can |
|
425 | 425 | while True: |
|
426 | 426 | try: |
|
427 | msg = self.socket.recv_json(zmq.NOBLOCK) | |
|
427 | ident,msg = self.session.recv(self.socket) | |
|
428 | 428 | except zmq.ZMQError: |
|
429 | 429 | # Check the errno? |
|
430 | 430 | # Will this trigger POLLERR? |
|
431 | 431 | break |
|
432 | 432 | else: |
|
433 | if msg is None: | |
|
434 | break | |
|
433 | 435 | self.call_handlers(msg) |
|
434 | 436 | |
|
435 | 437 | def _flush(self): |
@@ -486,7 +488,7 b' class RepSocketChannel(ZmqSocketChannel):' | |||
|
486 | 488 | self._handle_recv() |
|
487 | 489 | |
|
488 | 490 | def _handle_recv(self): |
|
489 | msg = self.socket.recv_json() | |
|
491 | ident,msg = self.session.recv(self.socket, 0) | |
|
490 | 492 | self.call_handlers(msg) |
|
491 | 493 | |
|
492 | 494 | def _handle_send(self): |
@@ -495,7 +497,7 b' class RepSocketChannel(ZmqSocketChannel):' | |||
|
495 | 497 | except Empty: |
|
496 | 498 | pass |
|
497 | 499 | else: |
|
498 |
self.socket |
|
|
500 | self.session.send(self.socket,msg) | |
|
499 | 501 | if self.msg_queue.empty(): |
|
500 | 502 | self.drop_io_state(POLLOUT) |
|
501 | 503 | |
@@ -546,7 +548,7 b' class HBSocketChannel(ZmqSocketChannel):' | |||
|
546 | 548 | request_time = time.time() |
|
547 | 549 | try: |
|
548 | 550 | #io.rprint('Ping from HB channel') # dbg |
|
549 |
self.socket.send |
|
|
551 | self.socket.send(b'ping') | |
|
550 | 552 | except zmq.ZMQError, e: |
|
551 | 553 | #io.rprint('*** HB Error:', e) # dbg |
|
552 | 554 | if e.errno == zmq.EFSM: |
@@ -558,7 +560,7 b' class HBSocketChannel(ZmqSocketChannel):' | |||
|
558 | 560 | else: |
|
559 | 561 | while True: |
|
560 | 562 | try: |
|
561 |
self.socket.recv |
|
|
563 | self.socket.recv(zmq.NOBLOCK) | |
|
562 | 564 | except zmq.ZMQError, e: |
|
563 | 565 | #io.rprint('*** HB Error 2:', e) # dbg |
|
564 | 566 | if e.errno == zmq.EAGAIN: |
@@ -69,9 +69,8 b' class Kernel(HasTraits):' | |||
|
69 | 69 | """ Start the kernel main loop. |
|
70 | 70 | """ |
|
71 | 71 | while True: |
|
72 |
ident = self. |
|
|
73 |
assert |
|
|
74 | msg = self.reply_socket.recv_json() | |
|
72 | ident,msg = self.session.recv(self.reply_socket,0) | |
|
73 | assert ident is not None, "Missing message part." | |
|
75 | 74 | omsg = Message(msg) |
|
76 | 75 | print>>sys.__stdout__ |
|
77 | 76 | print>>sys.__stdout__, omsg |
@@ -105,8 +104,7 b' class Kernel(HasTraits):' | |||
|
105 | 104 | print>>sys.__stderr__, "Got bad msg: " |
|
106 | 105 | print>>sys.__stderr__, Message(parent) |
|
107 | 106 | return |
|
108 |
pyin_msg = self.session. |
|
|
109 | self.pub_socket.send_json(pyin_msg) | |
|
107 | pyin_msg = self.session.send(self.pub_socket, u'pyin',{u'code':code}, parent=parent) | |
|
110 | 108 | |
|
111 | 109 | try: |
|
112 | 110 | comp_code = self.compiler(code, '<zmq-kernel>') |
@@ -131,8 +129,7 b' class Kernel(HasTraits):' | |||
|
131 | 129 | u'ename' : unicode(etype.__name__), |
|
132 | 130 | u'evalue' : unicode(evalue) |
|
133 | 131 | } |
|
134 |
exc_msg = self.session. |
|
|
135 | self.pub_socket.send_json(exc_msg) | |
|
132 | exc_msg = self.session.send(self.pub_socket, u'pyerr', exc_content, parent) | |
|
136 | 133 | reply_content = exc_content |
|
137 | 134 | else: |
|
138 | 135 | reply_content = { 'status' : 'ok', 'payload' : {} } |
@@ -142,10 +139,8 b' class Kernel(HasTraits):' | |||
|
142 | 139 | sys.stdout.flush() |
|
143 | 140 | |
|
144 | 141 | # Send the reply. |
|
145 |
reply_msg = self.session. |
|
|
142 | reply_msg = self.session.send(self.reply_socket, u'execute_reply', reply_content, parent, ident=ident) | |
|
146 | 143 | print>>sys.__stdout__, Message(reply_msg) |
|
147 | self.reply_socket.send(ident, zmq.SNDMORE) | |
|
148 | self.reply_socket.send_json(reply_msg) | |
|
149 | 144 | if reply_msg['content']['status'] == u'error': |
|
150 | 145 | self._abort_queue() |
|
151 | 146 | |
@@ -180,21 +175,18 b' class Kernel(HasTraits):' | |||
|
180 | 175 | def _abort_queue(self): |
|
181 | 176 | while True: |
|
182 | 177 | try: |
|
183 |
ident = self. |
|
|
178 | ident,msg = self.session.recv(self.reply_socket, zmq.NOBLOCK) | |
|
184 | 179 | except zmq.ZMQError, e: |
|
185 | 180 | if e.errno == zmq.EAGAIN: |
|
186 | 181 | break |
|
187 | 182 | else: |
|
188 |
assert |
|
|
189 | msg = self.reply_socket.recv_json() | |
|
183 | assert ident is not None, "Missing message part." | |
|
190 | 184 | print>>sys.__stdout__, "Aborting:" |
|
191 | 185 | print>>sys.__stdout__, Message(msg) |
|
192 | 186 | msg_type = msg['msg_type'] |
|
193 | 187 | reply_type = msg_type.split('_')[0] + '_reply' |
|
194 |
reply_msg = self.session. |
|
|
188 | reply_msg = self.session.send(self.reply_socket, reply_type, {'status':'aborted'}, msg, ident=ident) | |
|
195 | 189 | print>>sys.__stdout__, Message(reply_msg) |
|
196 | self.reply_socket.send(ident,zmq.SNDMORE) | |
|
197 | self.reply_socket.send_json(reply_msg) | |
|
198 | 190 | # We need to wait a bit for requests to come in. This can probably |
|
199 | 191 | # be set shorter for true asynchronous clients. |
|
200 | 192 | time.sleep(0.1) |
@@ -206,11 +198,10 b' class Kernel(HasTraits):' | |||
|
206 | 198 | |
|
207 | 199 | # Send the input request. |
|
208 | 200 | content = dict(prompt=prompt) |
|
209 |
msg = self.session. |
|
|
210 | self.req_socket.send_json(msg) | |
|
201 | msg = self.session.send(self.req_socket, u'input_request', content, parent) | |
|
211 | 202 | |
|
212 | 203 | # Await a response. |
|
213 |
reply = self.req_socket |
|
|
204 | ident,reply = self.session.recv(self.req_socket, 0) | |
|
214 | 205 | try: |
|
215 | 206 | value = reply['content']['value'] |
|
216 | 207 | except: |
@@ -4,6 +4,8 b' import pprint' | |||
|
4 | 4 | |
|
5 | 5 | import zmq |
|
6 | 6 | |
|
7 | from zmq.utils import jsonapi as json | |
|
8 | ||
|
7 | 9 | class Message(object): |
|
8 | 10 | """A simple message object that maps dict keys to attributes. |
|
9 | 11 | |
@@ -86,24 +88,35 b' class Session(object):' | |||
|
86 | 88 | return msg |
|
87 | 89 | |
|
88 | 90 | def send(self, socket, msg_type, content=None, parent=None, ident=None): |
|
91 | if isinstance(msg_type, (Message, dict)): | |
|
92 | msg = dict(msg_type) | |
|
93 | else: | |
|
89 | 94 | msg = self.msg(msg_type, content, parent) |
|
90 | 95 | if ident is not None: |
|
91 | 96 | socket.send(ident, zmq.SNDMORE) |
|
92 | 97 | socket.send_json(msg) |
|
93 | omsg = Message(msg) | |
|
94 |
return |
|
|
98 | # omsg = Message(msg) | |
|
99 | return msg | |
|
95 | 100 | |
|
96 | 101 | def recv(self, socket, mode=zmq.NOBLOCK): |
|
97 | 102 | try: |
|
98 |
msg = socket.recv_ |
|
|
103 | msg = socket.recv_multipart(mode) | |
|
99 | 104 | except zmq.ZMQError, e: |
|
100 | 105 | if e.errno == zmq.EAGAIN: |
|
101 | 106 | # We can convert EAGAIN to None as we know in this case |
|
102 | 107 | # recv_json won't return None. |
|
103 | return None | |
|
108 | return None,None | |
|
104 | 109 | else: |
|
105 | 110 | raise |
|
106 | return Message(msg) | |
|
111 | if len(msg) == 1: | |
|
112 | ident=None | |
|
113 | msg = msg[0] | |
|
114 | elif len(msg) == 2: | |
|
115 | ident, msg = msg | |
|
116 | else: | |
|
117 | raise ValueError("Got message with length > 2, which is invalid") | |
|
118 | ||
|
119 | return ident, json.loads(msg) | |
|
107 | 120 | |
|
108 | 121 | def test_msg2obj(): |
|
109 | 122 | am = dict(x=1) |
@@ -71,7 +71,7 b' class ZMQDisplayHook(DisplayHook):' | |||
|
71 | 71 | |
|
72 | 72 | def finish_displayhook(self): |
|
73 | 73 | """Finish up all displayhook activities.""" |
|
74 |
self.pub_socket |
|
|
74 | self.session.send(self.pub_socket, self.msg) | |
|
75 | 75 | self.msg = None |
|
76 | 76 | |
|
77 | 77 | |
@@ -126,10 +126,9 b' class ZMQInteractiveShell(InteractiveShell):' | |||
|
126 | 126 | } |
|
127 | 127 | |
|
128 | 128 | dh = self.displayhook |
|
129 | exc_msg = dh.session.msg(u'pyerr', exc_content, dh.parent_header) | |
|
130 | 129 | # Send exception info over pub socket for other clients than the caller |
|
131 | 130 | # to pick up |
|
132 | dh.pub_socket.send_json(exc_msg) | |
|
131 | exc_msg = dh.session.send(dh.pub_socket, u'pyerr', exc_content, dh.parent_header) | |
|
133 | 132 | |
|
134 | 133 | # FIXME - Hack: store exception info in shell object. Right now, the |
|
135 | 134 | # caller is reading this info after the fact, we need to fix this logic |
General Comments 0
You need to be logged in to leave comments.
Login now