##// END OF EJS Templates
all sends/recvs now via Session.send/recv....
MinRK -
Show More
@@ -54,7 +54,8 b' class ClientCompleter(object):'
54
54
55 # Give the kernel up to 0.5s to respond
55 # Give the kernel up to 0.5s to respond
56 for i in range(5):
56 for i in range(5):
57 rep = self.session.recv(self.socket)
57 ident,rep = self.session.recv(self.socket)
58 rep = Message(rep)
58 if rep is not None and rep.msg_type == 'complete_reply':
59 if rep is not None and rep.msg_type == 'complete_reply':
59 matches = rep.content.matches
60 matches = rep.content.matches
60 break
61 break
@@ -14,9 +14,8 b' class DisplayHook(object):'
14 return
14 return
15
15
16 __builtin__._ = obj
16 __builtin__._ = obj
17 msg = self.session.msg(u'pyout', {u'data':repr(obj)},
17 msg = self.session.send(self.pub_socket, u'pyout', {u'data':repr(obj)},
18 parent=self.parent_header)
18 parent=self.parent_header)
19 self.pub_socket.send_json(msg)
20
19
21 def set_parent(self, parent):
20 def set_parent(self, parent):
22 self.parent_header = extract_header(parent) No newline at end of file
21 self.parent_header = extract_header(parent)
@@ -92,10 +92,10 b' class Console(code.InteractiveConsole):'
92
92
93 def recv_output(self):
93 def recv_output(self):
94 while True:
94 while True:
95 omsg = self.session.recv(self.sub_socket)
95 ident,msg = self.session.recv(self.sub_socket)
96 if omsg is None:
96 if msg is None:
97 break
97 break
98 self.handle_output(omsg)
98 self.handle_output(Message(msg))
99
99
100 def handle_reply(self, rep):
100 def handle_reply(self, rep):
101 # Handle any side effects on output channels
101 # Handle any side effects on output channels
@@ -114,9 +114,10 b' class Console(code.InteractiveConsole):'
114 print >> sys.stderr, ab
114 print >> sys.stderr, ab
115
115
116 def recv_reply(self):
116 def recv_reply(self):
117 rep = self.session.recv(self.request_socket)
117 ident,rep = self.session.recv(self.request_socket)
118 self.handle_reply(rep)
118 mrep = Message(rep)
119 return rep
119 self.handle_reply(mrep)
120 return mrep
120
121
121 def runcode(self, code):
122 def runcode(self, code):
122 # We can't pickle code objects, so fetch the actual source
123 # We can't pickle code objects, so fetch the actual source
@@ -37,10 +37,9 b' class OutStream(object):'
37 data = self._buffer.getvalue()
37 data = self._buffer.getvalue()
38 if data:
38 if data:
39 content = {u'name':self.name, u'data':data}
39 content = {u'name':self.name, u'data':data}
40 msg = self.session.msg(u'stream', content=content,
40 msg = self.session.send(self.pub_socket, u'stream', content=content,
41 parent=self.parent_header)
41 parent=self.parent_header)
42 io.raw_print(msg)
42 io.raw_print(msg)
43 self.pub_socket.send_json(msg)
44
43
45 self._buffer.close()
44 self._buffer.close()
46 self._new_buffer()
45 self._new_buffer()
@@ -106,17 +106,14 b' class Kernel(Configurable):'
106 def do_one_iteration(self):
106 def do_one_iteration(self):
107 """Do one iteration of the kernel's evaluation loop.
107 """Do one iteration of the kernel's evaluation loop.
108 """
108 """
109 try:
109 ident,msg = self.session.recv(self.reply_socket, zmq.NOBLOCK)
110 ident = self.reply_socket.recv(zmq.NOBLOCK)
110 if msg is None:
111 except zmq.ZMQError, e:
112 if e.errno == zmq.EAGAIN:
113 return
111 return
114 else:
112
115 raise
116 # This assert will raise in versions of zeromq 2.0.7 and lesser.
113 # This assert will raise in versions of zeromq 2.0.7 and lesser.
117 # We now require 2.0.8 or above, so we can uncomment for safety.
114 # We now require 2.0.8 or above, so we can uncomment for safety.
118 assert self.reply_socket.rcvmore(), "Missing message part."
115 # print(ident,msg, file=sys.__stdout__)
119 msg = self.reply_socket.recv_json()
116 assert ident is not None, "Missing message part."
120
117
121 # Print some info about this message and leave a '--->' marker, so it's
118 # Print some info about this message and leave a '--->' marker, so it's
122 # easier to trace visually the message chain when debugging. Each
119 # easier to trace visually the message chain when debugging. Each
@@ -169,17 +166,15 b' class Kernel(Configurable):'
169 def _publish_pyin(self, code, parent):
166 def _publish_pyin(self, code, parent):
170 """Publish the code request on the pyin stream."""
167 """Publish the code request on the pyin stream."""
171
168
172 pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent)
169 pyin_msg = self.session.send(self.pub_socket, u'pyin',{u'code':code}, parent=parent)
173 self.pub_socket.send_json(pyin_msg)
174
170
175 def execute_request(self, ident, parent):
171 def execute_request(self, ident, parent):
176
172
177 status_msg = self.session.msg(
173 status_msg = self.session.send(self.pub_socket,
178 u'status',
174 u'status',
179 {u'execution_state':u'busy'},
175 {u'execution_state':u'busy'},
180 parent=parent
176 parent=parent
181 )
177 )
182 self.pub_socket.send_json(status_msg)
183
178
184 try:
179 try:
185 content = parent[u'content']
180 content = parent[u'content']
@@ -264,7 +259,7 b' class Kernel(Configurable):'
264 shell.payload_manager.clear_payload()
259 shell.payload_manager.clear_payload()
265
260
266 # Send the reply.
261 # Send the reply.
267 reply_msg = self.session.msg(u'execute_reply', reply_content, parent)
262 reply_msg = self.session.send(self.reply_socket, u'execute_reply', reply_content, parent, ident=ident)
268 io.raw_print(reply_msg)
263 io.raw_print(reply_msg)
269
264
270 # Flush output before sending the reply.
265 # Flush output before sending the reply.
@@ -276,17 +271,14 b' class Kernel(Configurable):'
276 if self._execute_sleep:
271 if self._execute_sleep:
277 time.sleep(self._execute_sleep)
272 time.sleep(self._execute_sleep)
278
273
279 self.reply_socket.send(ident, zmq.SNDMORE)
280 self.reply_socket.send_json(reply_msg)
281 if reply_msg['content']['status'] == u'error':
274 if reply_msg['content']['status'] == u'error':
282 self._abort_queue()
275 self._abort_queue()
283
276
284 status_msg = self.session.msg(
277 status_msg = self.session.send(self.pub_socket,
285 u'status',
278 u'status',
286 {u'execution_state':u'idle'},
279 {u'execution_state':u'idle'},
287 parent=parent
280 parent=parent
288 )
281 )
289 self.pub_socket.send_json(status_msg)
290
282
291 def complete_request(self, ident, parent):
283 def complete_request(self, ident, parent):
292 txt, matches = self._complete(parent)
284 txt, matches = self._complete(parent)
@@ -335,22 +327,18 b' class Kernel(Configurable):'
335
327
336 def _abort_queue(self):
328 def _abort_queue(self):
337 while True:
329 while True:
338 try:
330 ident,msg = self.session.recv(self.reply_socket, zmq.NOBLOCK)
339 ident = self.reply_socket.recv(zmq.NOBLOCK)
331 if msg is None:
340 except zmq.ZMQError, e:
341 if e.errno == zmq.EAGAIN:
342 break
332 break
343 else:
333 else:
344 assert self.reply_socket.rcvmore(), \
334 assert ident is not None, \
345 "Unexpected missing message part."
335 "Unexpected missing message part."
346 msg = self.reply_socket.recv_json()
347 io.raw_print("Aborting:\n", Message(msg))
336 io.raw_print("Aborting:\n", Message(msg))
348 msg_type = msg['msg_type']
337 msg_type = msg['msg_type']
349 reply_type = msg_type.split('_')[0] + '_reply'
338 reply_type = msg_type.split('_')[0] + '_reply'
350 reply_msg = self.session.msg(reply_type, {'status' : 'aborted'}, msg)
339 reply_msg = self.session.send(self.reply_socket, reply_type,
340 {'status' : 'aborted'}, msg, ident=ident)
351 io.raw_print(reply_msg)
341 io.raw_print(reply_msg)
352 self.reply_socket.send(ident,zmq.SNDMORE)
353 self.reply_socket.send_json(reply_msg)
354 # We need to wait a bit for requests to come in. This can probably
342 # We need to wait a bit for requests to come in. This can probably
355 # be set shorter for true asynchronous clients.
343 # be set shorter for true asynchronous clients.
356 time.sleep(0.1)
344 time.sleep(0.1)
@@ -362,11 +350,10 b' class Kernel(Configurable):'
362
350
363 # Send the input request.
351 # Send the input request.
364 content = dict(prompt=prompt)
352 content = dict(prompt=prompt)
365 msg = self.session.msg(u'input_request', content, parent)
353 msg = self.session.send(self.req_socket, u'input_request', content, parent)
366 self.req_socket.send_json(msg)
367
354
368 # Await a response.
355 # Await a response.
369 reply = self.req_socket.recv_json()
356 ident, reply = self.session.recv(self.req_socket, 0)
370 try:
357 try:
371 value = reply['content']['value']
358 value = reply['content']['value']
372 except:
359 except:
@@ -423,8 +410,8 b' class Kernel(Configurable):'
423 """
410 """
424 # io.rprint("Kernel at_shutdown") # dbg
411 # io.rprint("Kernel at_shutdown") # dbg
425 if self._shutdown_message is not None:
412 if self._shutdown_message is not None:
426 self.reply_socket.send_json(self._shutdown_message)
413 self.session.send(self.reply_socket, self._shutdown_message)
427 self.pub_socket.send_json(self._shutdown_message)
414 self.session.send(self.pub_socket, self._shutdown_message)
428 io.raw_print(self._shutdown_message)
415 io.raw_print(self._shutdown_message)
429 # A very short sleep to give zmq time to flush its message buffers
416 # A very short sleep to give zmq time to flush its message buffers
430 # before Python truly shuts down.
417 # before Python truly shuts down.
@@ -33,7 +33,7 b' from zmq.eventloop import ioloop'
33 from IPython.utils import io
33 from IPython.utils import io
34 from IPython.utils.localinterfaces import LOCALHOST, LOCAL_IPS
34 from IPython.utils.localinterfaces import LOCALHOST, LOCAL_IPS
35 from IPython.utils.traitlets import HasTraits, Any, Instance, Type, TCPAddress
35 from IPython.utils.traitlets import HasTraits, Any, Instance, Type, TCPAddress
36 from session import Session
36 from session import Session, Message
37
37
38 #-----------------------------------------------------------------------------
38 #-----------------------------------------------------------------------------
39 # Constants and exceptions
39 # Constants and exceptions
@@ -330,7 +330,7 b' class XReqSocketChannel(ZmqSocketChannel):'
330 self._handle_recv()
330 self._handle_recv()
331
331
332 def _handle_recv(self):
332 def _handle_recv(self):
333 msg = self.socket.recv_json()
333 ident,msg = self.session.recv(self.socket, 0)
334 self.call_handlers(msg)
334 self.call_handlers(msg)
335
335
336 def _handle_send(self):
336 def _handle_send(self):
@@ -339,7 +339,7 b' class XReqSocketChannel(ZmqSocketChannel):'
339 except Empty:
339 except Empty:
340 pass
340 pass
341 else:
341 else:
342 self.socket.send_json(msg)
342 self.session.send(self.socket,msg)
343 if self.command_queue.empty():
343 if self.command_queue.empty():
344 self.drop_io_state(POLLOUT)
344 self.drop_io_state(POLLOUT)
345
345
@@ -424,12 +424,14 b' class SubSocketChannel(ZmqSocketChannel):'
424 # Get all of the messages we can
424 # Get all of the messages we can
425 while True:
425 while True:
426 try:
426 try:
427 msg = self.socket.recv_json(zmq.NOBLOCK)
427 ident,msg = self.session.recv(self.socket)
428 except zmq.ZMQError:
428 except zmq.ZMQError:
429 # Check the errno?
429 # Check the errno?
430 # Will this trigger POLLERR?
430 # Will this trigger POLLERR?
431 break
431 break
432 else:
432 else:
433 if msg is None:
434 break
433 self.call_handlers(msg)
435 self.call_handlers(msg)
434
436
435 def _flush(self):
437 def _flush(self):
@@ -486,7 +488,7 b' class RepSocketChannel(ZmqSocketChannel):'
486 self._handle_recv()
488 self._handle_recv()
487
489
488 def _handle_recv(self):
490 def _handle_recv(self):
489 msg = self.socket.recv_json()
491 ident,msg = self.session.recv(self.socket, 0)
490 self.call_handlers(msg)
492 self.call_handlers(msg)
491
493
492 def _handle_send(self):
494 def _handle_send(self):
@@ -495,7 +497,7 b' class RepSocketChannel(ZmqSocketChannel):'
495 except Empty:
497 except Empty:
496 pass
498 pass
497 else:
499 else:
498 self.socket.send_json(msg)
500 self.session.send(self.socket,msg)
499 if self.msg_queue.empty():
501 if self.msg_queue.empty():
500 self.drop_io_state(POLLOUT)
502 self.drop_io_state(POLLOUT)
501
503
@@ -546,7 +548,7 b' class HBSocketChannel(ZmqSocketChannel):'
546 request_time = time.time()
548 request_time = time.time()
547 try:
549 try:
548 #io.rprint('Ping from HB channel') # dbg
550 #io.rprint('Ping from HB channel') # dbg
549 self.socket.send_json('ping')
551 self.socket.send(b'ping')
550 except zmq.ZMQError, e:
552 except zmq.ZMQError, e:
551 #io.rprint('*** HB Error:', e) # dbg
553 #io.rprint('*** HB Error:', e) # dbg
552 if e.errno == zmq.EFSM:
554 if e.errno == zmq.EFSM:
@@ -558,7 +560,7 b' class HBSocketChannel(ZmqSocketChannel):'
558 else:
560 else:
559 while True:
561 while True:
560 try:
562 try:
561 self.socket.recv_json(zmq.NOBLOCK)
563 self.socket.recv(zmq.NOBLOCK)
562 except zmq.ZMQError, e:
564 except zmq.ZMQError, e:
563 #io.rprint('*** HB Error 2:', e) # dbg
565 #io.rprint('*** HB Error 2:', e) # dbg
564 if e.errno == zmq.EAGAIN:
566 if e.errno == zmq.EAGAIN:
@@ -69,9 +69,8 b' class Kernel(HasTraits):'
69 """ Start the kernel main loop.
69 """ Start the kernel main loop.
70 """
70 """
71 while True:
71 while True:
72 ident = self.reply_socket.recv()
72 ident,msg = self.session.recv(self.reply_socket,0)
73 assert self.reply_socket.rcvmore(), "Missing message part."
73 assert ident is not None, "Missing message part."
74 msg = self.reply_socket.recv_json()
75 omsg = Message(msg)
74 omsg = Message(msg)
76 print>>sys.__stdout__
75 print>>sys.__stdout__
77 print>>sys.__stdout__, omsg
76 print>>sys.__stdout__, omsg
@@ -105,8 +104,7 b' class Kernel(HasTraits):'
105 print>>sys.__stderr__, "Got bad msg: "
104 print>>sys.__stderr__, "Got bad msg: "
106 print>>sys.__stderr__, Message(parent)
105 print>>sys.__stderr__, Message(parent)
107 return
106 return
108 pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent)
107 pyin_msg = self.session.send(self.pub_socket, u'pyin',{u'code':code}, parent=parent)
109 self.pub_socket.send_json(pyin_msg)
110
108
111 try:
109 try:
112 comp_code = self.compiler(code, '<zmq-kernel>')
110 comp_code = self.compiler(code, '<zmq-kernel>')
@@ -131,8 +129,7 b' class Kernel(HasTraits):'
131 u'ename' : unicode(etype.__name__),
129 u'ename' : unicode(etype.__name__),
132 u'evalue' : unicode(evalue)
130 u'evalue' : unicode(evalue)
133 }
131 }
134 exc_msg = self.session.msg(u'pyerr', exc_content, parent)
132 exc_msg = self.session.send(self.pub_socket, u'pyerr', exc_content, parent)
135 self.pub_socket.send_json(exc_msg)
136 reply_content = exc_content
133 reply_content = exc_content
137 else:
134 else:
138 reply_content = { 'status' : 'ok', 'payload' : {} }
135 reply_content = { 'status' : 'ok', 'payload' : {} }
@@ -142,10 +139,8 b' class Kernel(HasTraits):'
142 sys.stdout.flush()
139 sys.stdout.flush()
143
140
144 # Send the reply.
141 # Send the reply.
145 reply_msg = self.session.msg(u'execute_reply', reply_content, parent)
142 reply_msg = self.session.send(self.reply_socket, u'execute_reply', reply_content, parent, ident=ident)
146 print>>sys.__stdout__, Message(reply_msg)
143 print>>sys.__stdout__, Message(reply_msg)
147 self.reply_socket.send(ident, zmq.SNDMORE)
148 self.reply_socket.send_json(reply_msg)
149 if reply_msg['content']['status'] == u'error':
144 if reply_msg['content']['status'] == u'error':
150 self._abort_queue()
145 self._abort_queue()
151
146
@@ -180,21 +175,18 b' class Kernel(HasTraits):'
180 def _abort_queue(self):
175 def _abort_queue(self):
181 while True:
176 while True:
182 try:
177 try:
183 ident = self.reply_socket.recv(zmq.NOBLOCK)
178 ident,msg = self.session.recv(self.reply_socket, zmq.NOBLOCK)
184 except zmq.ZMQError, e:
179 except zmq.ZMQError, e:
185 if e.errno == zmq.EAGAIN:
180 if e.errno == zmq.EAGAIN:
186 break
181 break
187 else:
182 else:
188 assert self.reply_socket.rcvmore(), "Missing message part."
183 assert ident is not None, "Missing message part."
189 msg = self.reply_socket.recv_json()
190 print>>sys.__stdout__, "Aborting:"
184 print>>sys.__stdout__, "Aborting:"
191 print>>sys.__stdout__, Message(msg)
185 print>>sys.__stdout__, Message(msg)
192 msg_type = msg['msg_type']
186 msg_type = msg['msg_type']
193 reply_type = msg_type.split('_')[0] + '_reply'
187 reply_type = msg_type.split('_')[0] + '_reply'
194 reply_msg = self.session.msg(reply_type, {'status':'aborted'}, msg)
188 reply_msg = self.session.send(self.reply_socket, reply_type, {'status':'aborted'}, msg, ident=ident)
195 print>>sys.__stdout__, Message(reply_msg)
189 print>>sys.__stdout__, Message(reply_msg)
196 self.reply_socket.send(ident,zmq.SNDMORE)
197 self.reply_socket.send_json(reply_msg)
198 # We need to wait a bit for requests to come in. This can probably
190 # We need to wait a bit for requests to come in. This can probably
199 # be set shorter for true asynchronous clients.
191 # be set shorter for true asynchronous clients.
200 time.sleep(0.1)
192 time.sleep(0.1)
@@ -206,11 +198,10 b' class Kernel(HasTraits):'
206
198
207 # Send the input request.
199 # Send the input request.
208 content = dict(prompt=prompt)
200 content = dict(prompt=prompt)
209 msg = self.session.msg(u'input_request', content, parent)
201 msg = self.session.send(self.req_socket, u'input_request', content, parent)
210 self.req_socket.send_json(msg)
211
202
212 # Await a response.
203 # Await a response.
213 reply = self.req_socket.recv_json()
204 ident,reply = self.session.recv(self.req_socket, 0)
214 try:
205 try:
215 value = reply['content']['value']
206 value = reply['content']['value']
216 except:
207 except:
@@ -4,6 +4,8 b' import pprint'
4
4
5 import zmq
5 import zmq
6
6
7 from zmq.utils import jsonapi as json
8
7 class Message(object):
9 class Message(object):
8 """A simple message object that maps dict keys to attributes.
10 """A simple message object that maps dict keys to attributes.
9
11
@@ -86,24 +88,35 b' class Session(object):'
86 return msg
88 return msg
87
89
88 def send(self, socket, msg_type, content=None, parent=None, ident=None):
90 def send(self, socket, msg_type, content=None, parent=None, ident=None):
91 if isinstance(msg_type, (Message, dict)):
92 msg = dict(msg_type)
93 else:
89 msg = self.msg(msg_type, content, parent)
94 msg = self.msg(msg_type, content, parent)
90 if ident is not None:
95 if ident is not None:
91 socket.send(ident, zmq.SNDMORE)
96 socket.send(ident, zmq.SNDMORE)
92 socket.send_json(msg)
97 socket.send_json(msg)
93 omsg = Message(msg)
98 # omsg = Message(msg)
94 return omsg
99 return msg
95
100
96 def recv(self, socket, mode=zmq.NOBLOCK):
101 def recv(self, socket, mode=zmq.NOBLOCK):
97 try:
102 try:
98 msg = socket.recv_json(mode)
103 msg = socket.recv_multipart(mode)
99 except zmq.ZMQError, e:
104 except zmq.ZMQError, e:
100 if e.errno == zmq.EAGAIN:
105 if e.errno == zmq.EAGAIN:
101 # We can convert EAGAIN to None as we know in this case
106 # We can convert EAGAIN to None as we know in this case
102 # recv_json won't return None.
107 # recv_json won't return None.
103 return None
108 return None,None
104 else:
109 else:
105 raise
110 raise
106 return Message(msg)
111 if len(msg) == 1:
112 ident=None
113 msg = msg[0]
114 elif len(msg) == 2:
115 ident, msg = msg
116 else:
117 raise ValueError("Got message with length > 2, which is invalid")
118
119 return ident, json.loads(msg)
107
120
108 def test_msg2obj():
121 def test_msg2obj():
109 am = dict(x=1)
122 am = dict(x=1)
@@ -71,7 +71,7 b' class ZMQDisplayHook(DisplayHook):'
71
71
72 def finish_displayhook(self):
72 def finish_displayhook(self):
73 """Finish up all displayhook activities."""
73 """Finish up all displayhook activities."""
74 self.pub_socket.send_json(self.msg)
74 self.session.send(self.pub_socket, self.msg)
75 self.msg = None
75 self.msg = None
76
76
77
77
@@ -126,10 +126,9 b' class ZMQInteractiveShell(InteractiveShell):'
126 }
126 }
127
127
128 dh = self.displayhook
128 dh = self.displayhook
129 exc_msg = dh.session.msg(u'pyerr', exc_content, dh.parent_header)
130 # Send exception info over pub socket for other clients than the caller
129 # Send exception info over pub socket for other clients than the caller
131 # to pick up
130 # to pick up
132 dh.pub_socket.send_json(exc_msg)
131 exc_msg = dh.session.send(dh.pub_socket, u'pyerr', exc_content, dh.parent_header)
133
132
134 # FIXME - Hack: store exception info in shell object. Right now, the
133 # FIXME - Hack: store exception info in shell object. Right now, the
135 # caller is reading this info after the fact, we need to fix this logic
134 # caller is reading this info after the fact, we need to fix this logic
General Comments 0
You need to be logged in to leave comments. Login now