Show More
@@ -54,7 +54,8 b' class ClientCompleter(object):' | |||||
54 |
|
54 | |||
55 | # Give the kernel up to 0.5s to respond |
|
55 | # Give the kernel up to 0.5s to respond | |
56 | for i in range(5): |
|
56 | for i in range(5): | |
57 | rep = self.session.recv(self.socket) |
|
57 | ident,rep = self.session.recv(self.socket) | |
|
58 | rep = Message(rep) | |||
58 | if rep is not None and rep.msg_type == 'complete_reply': |
|
59 | if rep is not None and rep.msg_type == 'complete_reply': | |
59 | matches = rep.content.matches |
|
60 | matches = rep.content.matches | |
60 | break |
|
61 | break |
@@ -14,9 +14,8 b' class DisplayHook(object):' | |||||
14 | return |
|
14 | return | |
15 |
|
15 | |||
16 | __builtin__._ = obj |
|
16 | __builtin__._ = obj | |
17 |
msg = self.session. |
|
17 | msg = self.session.send(self.pub_socket, u'pyout', {u'data':repr(obj)}, | |
18 | parent=self.parent_header) |
|
18 | parent=self.parent_header) | |
19 | self.pub_socket.send_json(msg) |
|
|||
20 |
|
19 | |||
21 | def set_parent(self, parent): |
|
20 | def set_parent(self, parent): | |
22 | self.parent_header = extract_header(parent) No newline at end of file |
|
21 | self.parent_header = extract_header(parent) |
@@ -92,10 +92,10 b' class Console(code.InteractiveConsole):' | |||||
92 |
|
92 | |||
93 | def recv_output(self): |
|
93 | def recv_output(self): | |
94 | while True: |
|
94 | while True: | |
95 |
|
|
95 | ident,msg = self.session.recv(self.sub_socket) | |
96 |
if |
|
96 | if msg is None: | |
97 | break |
|
97 | break | |
98 |
self.handle_output( |
|
98 | self.handle_output(Message(msg)) | |
99 |
|
99 | |||
100 | def handle_reply(self, rep): |
|
100 | def handle_reply(self, rep): | |
101 | # Handle any side effects on output channels |
|
101 | # Handle any side effects on output channels | |
@@ -114,9 +114,10 b' class Console(code.InteractiveConsole):' | |||||
114 | print >> sys.stderr, ab |
|
114 | print >> sys.stderr, ab | |
115 |
|
115 | |||
116 | def recv_reply(self): |
|
116 | def recv_reply(self): | |
117 | rep = self.session.recv(self.request_socket) |
|
117 | ident,rep = self.session.recv(self.request_socket) | |
118 | self.handle_reply(rep) |
|
118 | mrep = Message(rep) | |
119 | return rep |
|
119 | self.handle_reply(mrep) | |
|
120 | return mrep | |||
120 |
|
121 | |||
121 | def runcode(self, code): |
|
122 | def runcode(self, code): | |
122 | # We can't pickle code objects, so fetch the actual source |
|
123 | # We can't pickle code objects, so fetch the actual source |
@@ -37,10 +37,9 b' class OutStream(object):' | |||||
37 | data = self._buffer.getvalue() |
|
37 | data = self._buffer.getvalue() | |
38 | if data: |
|
38 | if data: | |
39 | content = {u'name':self.name, u'data':data} |
|
39 | content = {u'name':self.name, u'data':data} | |
40 |
msg = self.session. |
|
40 | msg = self.session.send(self.pub_socket, u'stream', content=content, | |
41 | parent=self.parent_header) |
|
41 | parent=self.parent_header) | |
42 | io.raw_print(msg) |
|
42 | io.raw_print(msg) | |
43 | self.pub_socket.send_json(msg) |
|
|||
44 |
|
43 | |||
45 | self._buffer.close() |
|
44 | self._buffer.close() | |
46 | self._new_buffer() |
|
45 | self._new_buffer() |
@@ -106,17 +106,14 b' class Kernel(Configurable):' | |||||
106 | def do_one_iteration(self): |
|
106 | def do_one_iteration(self): | |
107 | """Do one iteration of the kernel's evaluation loop. |
|
107 | """Do one iteration of the kernel's evaluation loop. | |
108 | """ |
|
108 | """ | |
109 | try: |
|
109 | ident,msg = self.session.recv(self.reply_socket, zmq.NOBLOCK) | |
110 | ident = self.reply_socket.recv(zmq.NOBLOCK) |
|
110 | if msg is None: | |
111 | except zmq.ZMQError, e: |
|
111 | return | |
112 | if e.errno == zmq.EAGAIN: |
|
112 | ||
113 | return |
|
|||
114 | else: |
|
|||
115 | raise |
|
|||
116 | # This assert will raise in versions of zeromq 2.0.7 and lesser. |
|
113 | # This assert will raise in versions of zeromq 2.0.7 and lesser. | |
117 | # We now require 2.0.8 or above, so we can uncomment for safety. |
|
114 | # We now require 2.0.8 or above, so we can uncomment for safety. | |
118 | assert self.reply_socket.rcvmore(), "Missing message part." |
|
115 | # print(ident,msg, file=sys.__stdout__) | |
119 | msg = self.reply_socket.recv_json() |
|
116 | assert ident is not None, "Missing message part." | |
120 |
|
117 | |||
121 | # Print some info about this message and leave a '--->' marker, so it's |
|
118 | # Print some info about this message and leave a '--->' marker, so it's | |
122 | # easier to trace visually the message chain when debugging. Each |
|
119 | # easier to trace visually the message chain when debugging. Each | |
@@ -169,17 +166,15 b' class Kernel(Configurable):' | |||||
169 | def _publish_pyin(self, code, parent): |
|
166 | def _publish_pyin(self, code, parent): | |
170 | """Publish the code request on the pyin stream.""" |
|
167 | """Publish the code request on the pyin stream.""" | |
171 |
|
168 | |||
172 |
pyin_msg = self.session. |
|
169 | pyin_msg = self.session.send(self.pub_socket, u'pyin',{u'code':code}, parent=parent) | |
173 | self.pub_socket.send_json(pyin_msg) |
|
|||
174 |
|
170 | |||
175 | def execute_request(self, ident, parent): |
|
171 | def execute_request(self, ident, parent): | |
176 |
|
172 | |||
177 |
status_msg = self.session. |
|
173 | status_msg = self.session.send(self.pub_socket, | |
178 | u'status', |
|
174 | u'status', | |
179 | {u'execution_state':u'busy'}, |
|
175 | {u'execution_state':u'busy'}, | |
180 | parent=parent |
|
176 | parent=parent | |
181 | ) |
|
177 | ) | |
182 | self.pub_socket.send_json(status_msg) |
|
|||
183 |
|
178 | |||
184 | try: |
|
179 | try: | |
185 | content = parent[u'content'] |
|
180 | content = parent[u'content'] | |
@@ -264,7 +259,7 b' class Kernel(Configurable):' | |||||
264 | shell.payload_manager.clear_payload() |
|
259 | shell.payload_manager.clear_payload() | |
265 |
|
260 | |||
266 | # Send the reply. |
|
261 | # Send the reply. | |
267 |
reply_msg = self.session. |
|
262 | reply_msg = self.session.send(self.reply_socket, u'execute_reply', reply_content, parent, ident=ident) | |
268 | io.raw_print(reply_msg) |
|
263 | io.raw_print(reply_msg) | |
269 |
|
264 | |||
270 | # Flush output before sending the reply. |
|
265 | # Flush output before sending the reply. | |
@@ -276,17 +271,14 b' class Kernel(Configurable):' | |||||
276 | if self._execute_sleep: |
|
271 | if self._execute_sleep: | |
277 | time.sleep(self._execute_sleep) |
|
272 | time.sleep(self._execute_sleep) | |
278 |
|
273 | |||
279 | self.reply_socket.send(ident, zmq.SNDMORE) |
|
|||
280 | self.reply_socket.send_json(reply_msg) |
|
|||
281 | if reply_msg['content']['status'] == u'error': |
|
274 | if reply_msg['content']['status'] == u'error': | |
282 | self._abort_queue() |
|
275 | self._abort_queue() | |
283 |
|
276 | |||
284 |
status_msg = self.session. |
|
277 | status_msg = self.session.send(self.pub_socket, | |
285 | u'status', |
|
278 | u'status', | |
286 | {u'execution_state':u'idle'}, |
|
279 | {u'execution_state':u'idle'}, | |
287 | parent=parent |
|
280 | parent=parent | |
288 | ) |
|
281 | ) | |
289 | self.pub_socket.send_json(status_msg) |
|
|||
290 |
|
282 | |||
291 | def complete_request(self, ident, parent): |
|
283 | def complete_request(self, ident, parent): | |
292 | txt, matches = self._complete(parent) |
|
284 | txt, matches = self._complete(parent) | |
@@ -335,22 +327,18 b' class Kernel(Configurable):' | |||||
335 |
|
327 | |||
336 | def _abort_queue(self): |
|
328 | def _abort_queue(self): | |
337 | while True: |
|
329 | while True: | |
338 | try: |
|
330 | ident,msg = self.session.recv(self.reply_socket, zmq.NOBLOCK) | |
339 | ident = self.reply_socket.recv(zmq.NOBLOCK) |
|
331 | if msg is None: | |
340 | except zmq.ZMQError, e: |
|
332 | break | |
341 | if e.errno == zmq.EAGAIN: |
|
|||
342 | break |
|
|||
343 | else: |
|
333 | else: | |
344 | assert self.reply_socket.rcvmore(), \ |
|
334 | assert ident is not None, \ | |
345 | "Unexpected missing message part." |
|
335 | "Unexpected missing message part." | |
346 | msg = self.reply_socket.recv_json() |
|
|||
347 | io.raw_print("Aborting:\n", Message(msg)) |
|
336 | io.raw_print("Aborting:\n", Message(msg)) | |
348 | msg_type = msg['msg_type'] |
|
337 | msg_type = msg['msg_type'] | |
349 | reply_type = msg_type.split('_')[0] + '_reply' |
|
338 | reply_type = msg_type.split('_')[0] + '_reply' | |
350 |
reply_msg = self.session. |
|
339 | reply_msg = self.session.send(self.reply_socket, reply_type, | |
|
340 | {'status' : 'aborted'}, msg, ident=ident) | |||
351 | io.raw_print(reply_msg) |
|
341 | io.raw_print(reply_msg) | |
352 | self.reply_socket.send(ident,zmq.SNDMORE) |
|
|||
353 | self.reply_socket.send_json(reply_msg) |
|
|||
354 | # We need to wait a bit for requests to come in. This can probably |
|
342 | # We need to wait a bit for requests to come in. This can probably | |
355 | # be set shorter for true asynchronous clients. |
|
343 | # be set shorter for true asynchronous clients. | |
356 | time.sleep(0.1) |
|
344 | time.sleep(0.1) | |
@@ -362,11 +350,10 b' class Kernel(Configurable):' | |||||
362 |
|
350 | |||
363 | # Send the input request. |
|
351 | # Send the input request. | |
364 | content = dict(prompt=prompt) |
|
352 | content = dict(prompt=prompt) | |
365 |
msg = self.session. |
|
353 | msg = self.session.send(self.req_socket, u'input_request', content, parent) | |
366 | self.req_socket.send_json(msg) |
|
|||
367 |
|
354 | |||
368 | # Await a response. |
|
355 | # Await a response. | |
369 |
reply = self.req_socket |
|
356 | ident, reply = self.session.recv(self.req_socket, 0) | |
370 | try: |
|
357 | try: | |
371 | value = reply['content']['value'] |
|
358 | value = reply['content']['value'] | |
372 | except: |
|
359 | except: | |
@@ -423,8 +410,8 b' class Kernel(Configurable):' | |||||
423 | """ |
|
410 | """ | |
424 | # io.rprint("Kernel at_shutdown") # dbg |
|
411 | # io.rprint("Kernel at_shutdown") # dbg | |
425 | if self._shutdown_message is not None: |
|
412 | if self._shutdown_message is not None: | |
426 |
self.reply_socket |
|
413 | self.session.send(self.reply_socket, self._shutdown_message) | |
427 |
self.pub_socket |
|
414 | self.session.send(self.pub_socket, self._shutdown_message) | |
428 | io.raw_print(self._shutdown_message) |
|
415 | io.raw_print(self._shutdown_message) | |
429 | # A very short sleep to give zmq time to flush its message buffers |
|
416 | # A very short sleep to give zmq time to flush its message buffers | |
430 | # before Python truly shuts down. |
|
417 | # before Python truly shuts down. |
@@ -33,7 +33,7 b' from zmq.eventloop import ioloop' | |||||
33 | from IPython.utils import io |
|
33 | from IPython.utils import io | |
34 | from IPython.utils.localinterfaces import LOCALHOST, LOCAL_IPS |
|
34 | from IPython.utils.localinterfaces import LOCALHOST, LOCAL_IPS | |
35 | from IPython.utils.traitlets import HasTraits, Any, Instance, Type, TCPAddress |
|
35 | from IPython.utils.traitlets import HasTraits, Any, Instance, Type, TCPAddress | |
36 | from session import Session |
|
36 | from session import Session, Message | |
37 |
|
37 | |||
38 | #----------------------------------------------------------------------------- |
|
38 | #----------------------------------------------------------------------------- | |
39 | # Constants and exceptions |
|
39 | # Constants and exceptions | |
@@ -330,7 +330,7 b' class XReqSocketChannel(ZmqSocketChannel):' | |||||
330 | self._handle_recv() |
|
330 | self._handle_recv() | |
331 |
|
331 | |||
332 | def _handle_recv(self): |
|
332 | def _handle_recv(self): | |
333 | msg = self.socket.recv_json() |
|
333 | ident,msg = self.session.recv(self.socket, 0) | |
334 | self.call_handlers(msg) |
|
334 | self.call_handlers(msg) | |
335 |
|
335 | |||
336 | def _handle_send(self): |
|
336 | def _handle_send(self): | |
@@ -339,7 +339,7 b' class XReqSocketChannel(ZmqSocketChannel):' | |||||
339 | except Empty: |
|
339 | except Empty: | |
340 | pass |
|
340 | pass | |
341 | else: |
|
341 | else: | |
342 |
self.socket |
|
342 | self.session.send(self.socket,msg) | |
343 | if self.command_queue.empty(): |
|
343 | if self.command_queue.empty(): | |
344 | self.drop_io_state(POLLOUT) |
|
344 | self.drop_io_state(POLLOUT) | |
345 |
|
345 | |||
@@ -424,12 +424,14 b' class SubSocketChannel(ZmqSocketChannel):' | |||||
424 | # Get all of the messages we can |
|
424 | # Get all of the messages we can | |
425 | while True: |
|
425 | while True: | |
426 | try: |
|
426 | try: | |
427 | msg = self.socket.recv_json(zmq.NOBLOCK) |
|
427 | ident,msg = self.session.recv(self.socket) | |
428 | except zmq.ZMQError: |
|
428 | except zmq.ZMQError: | |
429 | # Check the errno? |
|
429 | # Check the errno? | |
430 | # Will this trigger POLLERR? |
|
430 | # Will this trigger POLLERR? | |
431 | break |
|
431 | break | |
432 | else: |
|
432 | else: | |
|
433 | if msg is None: | |||
|
434 | break | |||
433 | self.call_handlers(msg) |
|
435 | self.call_handlers(msg) | |
434 |
|
436 | |||
435 | def _flush(self): |
|
437 | def _flush(self): | |
@@ -486,7 +488,7 b' class RepSocketChannel(ZmqSocketChannel):' | |||||
486 | self._handle_recv() |
|
488 | self._handle_recv() | |
487 |
|
489 | |||
488 | def _handle_recv(self): |
|
490 | def _handle_recv(self): | |
489 | msg = self.socket.recv_json() |
|
491 | ident,msg = self.session.recv(self.socket, 0) | |
490 | self.call_handlers(msg) |
|
492 | self.call_handlers(msg) | |
491 |
|
493 | |||
492 | def _handle_send(self): |
|
494 | def _handle_send(self): | |
@@ -495,7 +497,7 b' class RepSocketChannel(ZmqSocketChannel):' | |||||
495 | except Empty: |
|
497 | except Empty: | |
496 | pass |
|
498 | pass | |
497 | else: |
|
499 | else: | |
498 |
self.socket |
|
500 | self.session.send(self.socket,msg) | |
499 | if self.msg_queue.empty(): |
|
501 | if self.msg_queue.empty(): | |
500 | self.drop_io_state(POLLOUT) |
|
502 | self.drop_io_state(POLLOUT) | |
501 |
|
503 | |||
@@ -546,7 +548,7 b' class HBSocketChannel(ZmqSocketChannel):' | |||||
546 | request_time = time.time() |
|
548 | request_time = time.time() | |
547 | try: |
|
549 | try: | |
548 | #io.rprint('Ping from HB channel') # dbg |
|
550 | #io.rprint('Ping from HB channel') # dbg | |
549 |
self.socket.send |
|
551 | self.socket.send(b'ping') | |
550 | except zmq.ZMQError, e: |
|
552 | except zmq.ZMQError, e: | |
551 | #io.rprint('*** HB Error:', e) # dbg |
|
553 | #io.rprint('*** HB Error:', e) # dbg | |
552 | if e.errno == zmq.EFSM: |
|
554 | if e.errno == zmq.EFSM: | |
@@ -558,7 +560,7 b' class HBSocketChannel(ZmqSocketChannel):' | |||||
558 | else: |
|
560 | else: | |
559 | while True: |
|
561 | while True: | |
560 | try: |
|
562 | try: | |
561 |
self.socket.recv |
|
563 | self.socket.recv(zmq.NOBLOCK) | |
562 | except zmq.ZMQError, e: |
|
564 | except zmq.ZMQError, e: | |
563 | #io.rprint('*** HB Error 2:', e) # dbg |
|
565 | #io.rprint('*** HB Error 2:', e) # dbg | |
564 | if e.errno == zmq.EAGAIN: |
|
566 | if e.errno == zmq.EAGAIN: |
@@ -69,9 +69,8 b' class Kernel(HasTraits):' | |||||
69 | """ Start the kernel main loop. |
|
69 | """ Start the kernel main loop. | |
70 | """ |
|
70 | """ | |
71 | while True: |
|
71 | while True: | |
72 |
ident = self. |
|
72 | ident,msg = self.session.recv(self.reply_socket,0) | |
73 |
assert |
|
73 | assert ident is not None, "Missing message part." | |
74 | msg = self.reply_socket.recv_json() |
|
|||
75 | omsg = Message(msg) |
|
74 | omsg = Message(msg) | |
76 | print>>sys.__stdout__ |
|
75 | print>>sys.__stdout__ | |
77 | print>>sys.__stdout__, omsg |
|
76 | print>>sys.__stdout__, omsg | |
@@ -105,8 +104,7 b' class Kernel(HasTraits):' | |||||
105 | print>>sys.__stderr__, "Got bad msg: " |
|
104 | print>>sys.__stderr__, "Got bad msg: " | |
106 | print>>sys.__stderr__, Message(parent) |
|
105 | print>>sys.__stderr__, Message(parent) | |
107 | return |
|
106 | return | |
108 |
pyin_msg = self.session. |
|
107 | pyin_msg = self.session.send(self.pub_socket, u'pyin',{u'code':code}, parent=parent) | |
109 | self.pub_socket.send_json(pyin_msg) |
|
|||
110 |
|
108 | |||
111 | try: |
|
109 | try: | |
112 | comp_code = self.compiler(code, '<zmq-kernel>') |
|
110 | comp_code = self.compiler(code, '<zmq-kernel>') | |
@@ -131,8 +129,7 b' class Kernel(HasTraits):' | |||||
131 | u'ename' : unicode(etype.__name__), |
|
129 | u'ename' : unicode(etype.__name__), | |
132 | u'evalue' : unicode(evalue) |
|
130 | u'evalue' : unicode(evalue) | |
133 | } |
|
131 | } | |
134 |
exc_msg = self.session. |
|
132 | exc_msg = self.session.send(self.pub_socket, u'pyerr', exc_content, parent) | |
135 | self.pub_socket.send_json(exc_msg) |
|
|||
136 | reply_content = exc_content |
|
133 | reply_content = exc_content | |
137 | else: |
|
134 | else: | |
138 | reply_content = { 'status' : 'ok', 'payload' : {} } |
|
135 | reply_content = { 'status' : 'ok', 'payload' : {} } | |
@@ -142,10 +139,8 b' class Kernel(HasTraits):' | |||||
142 | sys.stdout.flush() |
|
139 | sys.stdout.flush() | |
143 |
|
140 | |||
144 | # Send the reply. |
|
141 | # Send the reply. | |
145 |
reply_msg = self.session. |
|
142 | reply_msg = self.session.send(self.reply_socket, u'execute_reply', reply_content, parent, ident=ident) | |
146 | print>>sys.__stdout__, Message(reply_msg) |
|
143 | print>>sys.__stdout__, Message(reply_msg) | |
147 | self.reply_socket.send(ident, zmq.SNDMORE) |
|
|||
148 | self.reply_socket.send_json(reply_msg) |
|
|||
149 | if reply_msg['content']['status'] == u'error': |
|
144 | if reply_msg['content']['status'] == u'error': | |
150 | self._abort_queue() |
|
145 | self._abort_queue() | |
151 |
|
146 | |||
@@ -180,21 +175,18 b' class Kernel(HasTraits):' | |||||
180 | def _abort_queue(self): |
|
175 | def _abort_queue(self): | |
181 | while True: |
|
176 | while True: | |
182 | try: |
|
177 | try: | |
183 |
ident = self. |
|
178 | ident,msg = self.session.recv(self.reply_socket, zmq.NOBLOCK) | |
184 | except zmq.ZMQError, e: |
|
179 | except zmq.ZMQError, e: | |
185 | if e.errno == zmq.EAGAIN: |
|
180 | if e.errno == zmq.EAGAIN: | |
186 | break |
|
181 | break | |
187 | else: |
|
182 | else: | |
188 |
assert |
|
183 | assert ident is not None, "Missing message part." | |
189 | msg = self.reply_socket.recv_json() |
|
|||
190 | print>>sys.__stdout__, "Aborting:" |
|
184 | print>>sys.__stdout__, "Aborting:" | |
191 | print>>sys.__stdout__, Message(msg) |
|
185 | print>>sys.__stdout__, Message(msg) | |
192 | msg_type = msg['msg_type'] |
|
186 | msg_type = msg['msg_type'] | |
193 | reply_type = msg_type.split('_')[0] + '_reply' |
|
187 | reply_type = msg_type.split('_')[0] + '_reply' | |
194 |
reply_msg = self.session. |
|
188 | reply_msg = self.session.send(self.reply_socket, reply_type, {'status':'aborted'}, msg, ident=ident) | |
195 | print>>sys.__stdout__, Message(reply_msg) |
|
189 | print>>sys.__stdout__, Message(reply_msg) | |
196 | self.reply_socket.send(ident,zmq.SNDMORE) |
|
|||
197 | self.reply_socket.send_json(reply_msg) |
|
|||
198 | # We need to wait a bit for requests to come in. This can probably |
|
190 | # We need to wait a bit for requests to come in. This can probably | |
199 | # be set shorter for true asynchronous clients. |
|
191 | # be set shorter for true asynchronous clients. | |
200 | time.sleep(0.1) |
|
192 | time.sleep(0.1) | |
@@ -206,11 +198,10 b' class Kernel(HasTraits):' | |||||
206 |
|
198 | |||
207 | # Send the input request. |
|
199 | # Send the input request. | |
208 | content = dict(prompt=prompt) |
|
200 | content = dict(prompt=prompt) | |
209 |
msg = self.session. |
|
201 | msg = self.session.send(self.req_socket, u'input_request', content, parent) | |
210 | self.req_socket.send_json(msg) |
|
|||
211 |
|
202 | |||
212 | # Await a response. |
|
203 | # Await a response. | |
213 |
reply = self.req_socket |
|
204 | ident,reply = self.session.recv(self.req_socket, 0) | |
214 | try: |
|
205 | try: | |
215 | value = reply['content']['value'] |
|
206 | value = reply['content']['value'] | |
216 | except: |
|
207 | except: |
@@ -4,6 +4,8 b' import pprint' | |||||
4 |
|
4 | |||
5 | import zmq |
|
5 | import zmq | |
6 |
|
6 | |||
|
7 | from zmq.utils import jsonapi as json | |||
|
8 | ||||
7 | class Message(object): |
|
9 | class Message(object): | |
8 | """A simple message object that maps dict keys to attributes. |
|
10 | """A simple message object that maps dict keys to attributes. | |
9 |
|
11 | |||
@@ -86,24 +88,35 b' class Session(object):' | |||||
86 | return msg |
|
88 | return msg | |
87 |
|
89 | |||
88 | def send(self, socket, msg_type, content=None, parent=None, ident=None): |
|
90 | def send(self, socket, msg_type, content=None, parent=None, ident=None): | |
89 | msg = self.msg(msg_type, content, parent) |
|
91 | if isinstance(msg_type, (Message, dict)): | |
|
92 | msg = dict(msg_type) | |||
|
93 | else: | |||
|
94 | msg = self.msg(msg_type, content, parent) | |||
90 | if ident is not None: |
|
95 | if ident is not None: | |
91 | socket.send(ident, zmq.SNDMORE) |
|
96 | socket.send(ident, zmq.SNDMORE) | |
92 | socket.send_json(msg) |
|
97 | socket.send_json(msg) | |
93 | omsg = Message(msg) |
|
98 | # omsg = Message(msg) | |
94 |
return |
|
99 | return msg | |
95 |
|
100 | |||
96 | def recv(self, socket, mode=zmq.NOBLOCK): |
|
101 | def recv(self, socket, mode=zmq.NOBLOCK): | |
97 | try: |
|
102 | try: | |
98 |
msg = socket.recv_ |
|
103 | msg = socket.recv_multipart(mode) | |
99 | except zmq.ZMQError, e: |
|
104 | except zmq.ZMQError, e: | |
100 | if e.errno == zmq.EAGAIN: |
|
105 | if e.errno == zmq.EAGAIN: | |
101 | # We can convert EAGAIN to None as we know in this case |
|
106 | # We can convert EAGAIN to None as we know in this case | |
102 | # recv_json won't return None. |
|
107 | # recv_json won't return None. | |
103 | return None |
|
108 | return None,None | |
104 | else: |
|
109 | else: | |
105 | raise |
|
110 | raise | |
106 | return Message(msg) |
|
111 | if len(msg) == 1: | |
|
112 | ident=None | |||
|
113 | msg = msg[0] | |||
|
114 | elif len(msg) == 2: | |||
|
115 | ident, msg = msg | |||
|
116 | else: | |||
|
117 | raise ValueError("Got message with length > 2, which is invalid") | |||
|
118 | ||||
|
119 | return ident, json.loads(msg) | |||
107 |
|
120 | |||
108 | def test_msg2obj(): |
|
121 | def test_msg2obj(): | |
109 | am = dict(x=1) |
|
122 | am = dict(x=1) |
@@ -71,7 +71,7 b' class ZMQDisplayHook(DisplayHook):' | |||||
71 |
|
71 | |||
72 | def finish_displayhook(self): |
|
72 | def finish_displayhook(self): | |
73 | """Finish up all displayhook activities.""" |
|
73 | """Finish up all displayhook activities.""" | |
74 |
self.pub_socket |
|
74 | self.session.send(self.pub_socket, self.msg) | |
75 | self.msg = None |
|
75 | self.msg = None | |
76 |
|
76 | |||
77 |
|
77 | |||
@@ -126,10 +126,9 b' class ZMQInteractiveShell(InteractiveShell):' | |||||
126 | } |
|
126 | } | |
127 |
|
127 | |||
128 | dh = self.displayhook |
|
128 | dh = self.displayhook | |
129 | exc_msg = dh.session.msg(u'pyerr', exc_content, dh.parent_header) |
|
|||
130 | # Send exception info over pub socket for other clients than the caller |
|
129 | # Send exception info over pub socket for other clients than the caller | |
131 | # to pick up |
|
130 | # to pick up | |
132 | dh.pub_socket.send_json(exc_msg) |
|
131 | exc_msg = dh.session.send(dh.pub_socket, u'pyerr', exc_content, dh.parent_header) | |
133 |
|
132 | |||
134 | # FIXME - Hack: store exception info in shell object. Right now, the |
|
133 | # FIXME - Hack: store exception info in shell object. Right now, the | |
135 | # caller is reading this info after the fact, we need to fix this logic |
|
134 | # caller is reading this info after the fact, we need to fix this logic |
General Comments 0
You need to be logged in to leave comments.
Login now