##// END OF EJS Templates
Merge branch 'minrk-session.send' into trunk
Brian Granger -
r3272:6d84fb9b merge
parent child Browse files
Show More
@@ -54,7 +54,8 b' class ClientCompleter(object):'
54 54
55 55 # Give the kernel up to 0.5s to respond
56 56 for i in range(5):
57 rep = self.session.recv(self.socket)
57 ident,rep = self.session.recv(self.socket)
58 rep = Message(rep)
58 59 if rep is not None and rep.msg_type == 'complete_reply':
59 60 matches = rep.content.matches
60 61 break
@@ -14,9 +14,8 b' class DisplayHook(object):'
14 14 return
15 15
16 16 __builtin__._ = obj
17 msg = self.session.msg(u'pyout', {u'data':repr(obj)},
17 msg = self.session.send(self.pub_socket, u'pyout', {u'data':repr(obj)},
18 18 parent=self.parent_header)
19 self.pub_socket.send_json(msg)
20 19
21 20 def set_parent(self, parent):
22 21 self.parent_header = extract_header(parent) No newline at end of file
@@ -92,10 +92,10 b' class Console(code.InteractiveConsole):'
92 92
93 93 def recv_output(self):
94 94 while True:
95 omsg = self.session.recv(self.sub_socket)
96 if omsg is None:
95 ident,msg = self.session.recv(self.sub_socket)
96 if msg is None:
97 97 break
98 self.handle_output(omsg)
98 self.handle_output(Message(msg))
99 99
100 100 def handle_reply(self, rep):
101 101 # Handle any side effects on output channels
@@ -114,9 +114,10 b' class Console(code.InteractiveConsole):'
114 114 print >> sys.stderr, ab
115 115
116 116 def recv_reply(self):
117 rep = self.session.recv(self.request_socket)
118 self.handle_reply(rep)
119 return rep
117 ident,rep = self.session.recv(self.request_socket)
118 mrep = Message(rep)
119 self.handle_reply(mrep)
120 return mrep
120 121
121 122 def runcode(self, code):
122 123 # We can't pickle code objects, so fetch the actual source
@@ -37,10 +37,9 b' class OutStream(object):'
37 37 data = self._buffer.getvalue()
38 38 if data:
39 39 content = {u'name':self.name, u'data':data}
40 msg = self.session.msg(u'stream', content=content,
40 msg = self.session.send(self.pub_socket, u'stream', content=content,
41 41 parent=self.parent_header)
42 42 io.raw_print(msg)
43 self.pub_socket.send_json(msg)
44 43
45 44 self._buffer.close()
46 45 self._new_buffer()
@@ -106,17 +106,14 b' class Kernel(Configurable):'
106 106 def do_one_iteration(self):
107 107 """Do one iteration of the kernel's evaluation loop.
108 108 """
109 try:
110 ident = self.reply_socket.recv(zmq.NOBLOCK)
111 except zmq.ZMQError, e:
112 if e.errno == zmq.EAGAIN:
113 return
114 else:
115 raise
109 ident,msg = self.session.recv(self.reply_socket, zmq.NOBLOCK)
110 if msg is None:
111 return
112
116 113 # This assert will raise in versions of zeromq 2.0.7 and lesser.
117 114 # We now require 2.0.8 or above, so we can uncomment for safety.
118 assert self.reply_socket.rcvmore(), "Missing message part."
119 msg = self.reply_socket.recv_json()
115 # print(ident,msg, file=sys.__stdout__)
116 assert ident is not None, "Missing message part."
120 117
121 118 # Print some info about this message and leave a '--->' marker, so it's
122 119 # easier to trace visually the message chain when debugging. Each
@@ -169,17 +166,15 b' class Kernel(Configurable):'
169 166 def _publish_pyin(self, code, parent):
170 167 """Publish the code request on the pyin stream."""
171 168
172 pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent)
173 self.pub_socket.send_json(pyin_msg)
169 pyin_msg = self.session.send(self.pub_socket, u'pyin',{u'code':code}, parent=parent)
174 170
175 171 def execute_request(self, ident, parent):
176 172
177 status_msg = self.session.msg(
173 status_msg = self.session.send(self.pub_socket,
178 174 u'status',
179 175 {u'execution_state':u'busy'},
180 176 parent=parent
181 177 )
182 self.pub_socket.send_json(status_msg)
183 178
184 179 try:
185 180 content = parent[u'content']
@@ -264,7 +259,7 b' class Kernel(Configurable):'
264 259 shell.payload_manager.clear_payload()
265 260
266 261 # Send the reply.
267 reply_msg = self.session.msg(u'execute_reply', reply_content, parent)
262 reply_msg = self.session.send(self.reply_socket, u'execute_reply', reply_content, parent, ident=ident)
268 263 io.raw_print(reply_msg)
269 264
270 265 # Flush output before sending the reply.
@@ -276,17 +271,14 b' class Kernel(Configurable):'
276 271 if self._execute_sleep:
277 272 time.sleep(self._execute_sleep)
278 273
279 self.reply_socket.send(ident, zmq.SNDMORE)
280 self.reply_socket.send_json(reply_msg)
281 274 if reply_msg['content']['status'] == u'error':
282 275 self._abort_queue()
283 276
284 status_msg = self.session.msg(
277 status_msg = self.session.send(self.pub_socket,
285 278 u'status',
286 279 {u'execution_state':u'idle'},
287 280 parent=parent
288 281 )
289 self.pub_socket.send_json(status_msg)
290 282
291 283 def complete_request(self, ident, parent):
292 284 txt, matches = self._complete(parent)
@@ -335,22 +327,18 b' class Kernel(Configurable):'
335 327
336 328 def _abort_queue(self):
337 329 while True:
338 try:
339 ident = self.reply_socket.recv(zmq.NOBLOCK)
340 except zmq.ZMQError, e:
341 if e.errno == zmq.EAGAIN:
342 break
330 ident,msg = self.session.recv(self.reply_socket, zmq.NOBLOCK)
331 if msg is None:
332 break
343 333 else:
344 assert self.reply_socket.rcvmore(), \
334 assert ident is not None, \
345 335 "Unexpected missing message part."
346 msg = self.reply_socket.recv_json()
347 336 io.raw_print("Aborting:\n", Message(msg))
348 337 msg_type = msg['msg_type']
349 338 reply_type = msg_type.split('_')[0] + '_reply'
350 reply_msg = self.session.msg(reply_type, {'status' : 'aborted'}, msg)
339 reply_msg = self.session.send(self.reply_socket, reply_type,
340 {'status' : 'aborted'}, msg, ident=ident)
351 341 io.raw_print(reply_msg)
352 self.reply_socket.send(ident,zmq.SNDMORE)
353 self.reply_socket.send_json(reply_msg)
354 342 # We need to wait a bit for requests to come in. This can probably
355 343 # be set shorter for true asynchronous clients.
356 344 time.sleep(0.1)
@@ -362,11 +350,10 b' class Kernel(Configurable):'
362 350
363 351 # Send the input request.
364 352 content = dict(prompt=prompt)
365 msg = self.session.msg(u'input_request', content, parent)
366 self.req_socket.send_json(msg)
353 msg = self.session.send(self.req_socket, u'input_request', content, parent)
367 354
368 355 # Await a response.
369 reply = self.req_socket.recv_json()
356 ident, reply = self.session.recv(self.req_socket, 0)
370 357 try:
371 358 value = reply['content']['value']
372 359 except:
@@ -423,8 +410,8 b' class Kernel(Configurable):'
423 410 """
424 411 # io.rprint("Kernel at_shutdown") # dbg
425 412 if self._shutdown_message is not None:
426 self.reply_socket.send_json(self._shutdown_message)
427 self.pub_socket.send_json(self._shutdown_message)
413 self.session.send(self.reply_socket, self._shutdown_message)
414 self.session.send(self.pub_socket, self._shutdown_message)
428 415 io.raw_print(self._shutdown_message)
429 416 # A very short sleep to give zmq time to flush its message buffers
430 417 # before Python truly shuts down.
@@ -33,7 +33,7 b' from zmq.eventloop import ioloop'
33 33 from IPython.utils import io
34 34 from IPython.utils.localinterfaces import LOCALHOST, LOCAL_IPS
35 35 from IPython.utils.traitlets import HasTraits, Any, Instance, Type, TCPAddress
36 from session import Session
36 from session import Session, Message
37 37
38 38 #-----------------------------------------------------------------------------
39 39 # Constants and exceptions
@@ -330,7 +330,7 b' class XReqSocketChannel(ZmqSocketChannel):'
330 330 self._handle_recv()
331 331
332 332 def _handle_recv(self):
333 msg = self.socket.recv_json()
333 ident,msg = self.session.recv(self.socket, 0)
334 334 self.call_handlers(msg)
335 335
336 336 def _handle_send(self):
@@ -339,7 +339,7 b' class XReqSocketChannel(ZmqSocketChannel):'
339 339 except Empty:
340 340 pass
341 341 else:
342 self.socket.send_json(msg)
342 self.session.send(self.socket,msg)
343 343 if self.command_queue.empty():
344 344 self.drop_io_state(POLLOUT)
345 345
@@ -424,12 +424,14 b' class SubSocketChannel(ZmqSocketChannel):'
424 424 # Get all of the messages we can
425 425 while True:
426 426 try:
427 msg = self.socket.recv_json(zmq.NOBLOCK)
427 ident,msg = self.session.recv(self.socket)
428 428 except zmq.ZMQError:
429 429 # Check the errno?
430 430 # Will this trigger POLLERR?
431 431 break
432 432 else:
433 if msg is None:
434 break
433 435 self.call_handlers(msg)
434 436
435 437 def _flush(self):
@@ -486,7 +488,7 b' class RepSocketChannel(ZmqSocketChannel):'
486 488 self._handle_recv()
487 489
488 490 def _handle_recv(self):
489 msg = self.socket.recv_json()
491 ident,msg = self.session.recv(self.socket, 0)
490 492 self.call_handlers(msg)
491 493
492 494 def _handle_send(self):
@@ -495,7 +497,7 b' class RepSocketChannel(ZmqSocketChannel):'
495 497 except Empty:
496 498 pass
497 499 else:
498 self.socket.send_json(msg)
500 self.session.send(self.socket,msg)
499 501 if self.msg_queue.empty():
500 502 self.drop_io_state(POLLOUT)
501 503
@@ -546,7 +548,7 b' class HBSocketChannel(ZmqSocketChannel):'
546 548 request_time = time.time()
547 549 try:
548 550 #io.rprint('Ping from HB channel') # dbg
549 self.socket.send_json('ping')
551 self.socket.send(b'ping')
550 552 except zmq.ZMQError, e:
551 553 #io.rprint('*** HB Error:', e) # dbg
552 554 if e.errno == zmq.EFSM:
@@ -558,7 +560,7 b' class HBSocketChannel(ZmqSocketChannel):'
558 560 else:
559 561 while True:
560 562 try:
561 self.socket.recv_json(zmq.NOBLOCK)
563 self.socket.recv(zmq.NOBLOCK)
562 564 except zmq.ZMQError, e:
563 565 #io.rprint('*** HB Error 2:', e) # dbg
564 566 if e.errno == zmq.EAGAIN:
@@ -69,9 +69,8 b' class Kernel(HasTraits):'
69 69 """ Start the kernel main loop.
70 70 """
71 71 while True:
72 ident = self.reply_socket.recv()
73 assert self.reply_socket.rcvmore(), "Missing message part."
74 msg = self.reply_socket.recv_json()
72 ident,msg = self.session.recv(self.reply_socket,0)
73 assert ident is not None, "Missing message part."
75 74 omsg = Message(msg)
76 75 print>>sys.__stdout__
77 76 print>>sys.__stdout__, omsg
@@ -105,8 +104,7 b' class Kernel(HasTraits):'
105 104 print>>sys.__stderr__, "Got bad msg: "
106 105 print>>sys.__stderr__, Message(parent)
107 106 return
108 pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent)
109 self.pub_socket.send_json(pyin_msg)
107 pyin_msg = self.session.send(self.pub_socket, u'pyin',{u'code':code}, parent=parent)
110 108
111 109 try:
112 110 comp_code = self.compiler(code, '<zmq-kernel>')
@@ -131,8 +129,7 b' class Kernel(HasTraits):'
131 129 u'ename' : unicode(etype.__name__),
132 130 u'evalue' : unicode(evalue)
133 131 }
134 exc_msg = self.session.msg(u'pyerr', exc_content, parent)
135 self.pub_socket.send_json(exc_msg)
132 exc_msg = self.session.send(self.pub_socket, u'pyerr', exc_content, parent)
136 133 reply_content = exc_content
137 134 else:
138 135 reply_content = { 'status' : 'ok', 'payload' : {} }
@@ -142,10 +139,8 b' class Kernel(HasTraits):'
142 139 sys.stdout.flush()
143 140
144 141 # Send the reply.
145 reply_msg = self.session.msg(u'execute_reply', reply_content, parent)
142 reply_msg = self.session.send(self.reply_socket, u'execute_reply', reply_content, parent, ident=ident)
146 143 print>>sys.__stdout__, Message(reply_msg)
147 self.reply_socket.send(ident, zmq.SNDMORE)
148 self.reply_socket.send_json(reply_msg)
149 144 if reply_msg['content']['status'] == u'error':
150 145 self._abort_queue()
151 146
@@ -180,21 +175,18 b' class Kernel(HasTraits):'
180 175 def _abort_queue(self):
181 176 while True:
182 177 try:
183 ident = self.reply_socket.recv(zmq.NOBLOCK)
178 ident,msg = self.session.recv(self.reply_socket, zmq.NOBLOCK)
184 179 except zmq.ZMQError, e:
185 180 if e.errno == zmq.EAGAIN:
186 181 break
187 182 else:
188 assert self.reply_socket.rcvmore(), "Missing message part."
189 msg = self.reply_socket.recv_json()
183 assert ident is not None, "Missing message part."
190 184 print>>sys.__stdout__, "Aborting:"
191 185 print>>sys.__stdout__, Message(msg)
192 186 msg_type = msg['msg_type']
193 187 reply_type = msg_type.split('_')[0] + '_reply'
194 reply_msg = self.session.msg(reply_type, {'status':'aborted'}, msg)
188 reply_msg = self.session.send(self.reply_socket, reply_type, {'status':'aborted'}, msg, ident=ident)
195 189 print>>sys.__stdout__, Message(reply_msg)
196 self.reply_socket.send(ident,zmq.SNDMORE)
197 self.reply_socket.send_json(reply_msg)
198 190 # We need to wait a bit for requests to come in. This can probably
199 191 # be set shorter for true asynchronous clients.
200 192 time.sleep(0.1)
@@ -206,11 +198,10 b' class Kernel(HasTraits):'
206 198
207 199 # Send the input request.
208 200 content = dict(prompt=prompt)
209 msg = self.session.msg(u'input_request', content, parent)
210 self.req_socket.send_json(msg)
201 msg = self.session.send(self.req_socket, u'input_request', content, parent)
211 202
212 203 # Await a response.
213 reply = self.req_socket.recv_json()
204 ident,reply = self.session.recv(self.req_socket, 0)
214 205 try:
215 206 value = reply['content']['value']
216 207 except:
@@ -4,6 +4,8 b' import pprint'
4 4
5 5 import zmq
6 6
7 from zmq.utils import jsonapi as json
8
7 9 class Message(object):
8 10 """A simple message object that maps dict keys to attributes.
9 11
@@ -78,6 +80,10 b' class Session(object):'
78 80 return h
79 81
80 82 def msg(self, msg_type, content=None, parent=None):
83 """Construct a standard-form message, with a given type, content, and parent.
84
85 NOT to be called directly.
86 """
81 87 msg = {}
82 88 msg['header'] = self.msg_header()
83 89 msg['parent_header'] = {} if parent is None else extract_header(parent)
@@ -85,25 +91,81 b' class Session(object):'
85 91 msg['content'] = {} if content is None else content
86 92 return msg
87 93
88 def send(self, socket, msg_type, content=None, parent=None, ident=None):
89 msg = self.msg(msg_type, content, parent)
94 def send(self, socket, msg_or_type, content=None, parent=None, ident=None):
95 """send a message via a socket, using a uniform message pattern.
96
97 Parameters
98 ----------
99 socket : zmq.Socket
100 The socket on which to send.
101 msg_or_type : Message/dict or str
102 if str : then a new message will be constructed from content,parent
103 if Message/dict : then content and parent are ignored, and the message
104 is sent. This is only for use when sending a Message for a second time.
105 content : dict, optional
106 The contents of the message
107 parent : dict, optional
108 The parent header, or parent message, of this message
109 ident : bytes, optional
110 The zmq.IDENTITY prefix of the destination.
111 Only for use on certain socket types.
112
113 Returns
114 -------
115 msg : dict
116 The message, as constructed by self.msg(msg_type,content,parent)
117 """
118 if isinstance(msg_type, (Message, dict)):
119 msg = dict(msg_type)
120 else:
121 msg = self.msg(msg_type, content, parent)
90 122 if ident is not None:
91 123 socket.send(ident, zmq.SNDMORE)
92 124 socket.send_json(msg)
93 omsg = Message(msg)
94 return omsg
95
125 return msg
126
96 127 def recv(self, socket, mode=zmq.NOBLOCK):
128 """recv a message on a socket.
129
130 Receive an optionally identity-prefixed message, as sent via session.send().
131
132 Parameters
133 ----------
134
135 socket : zmq.Socket
136 The socket on which to recv a message.
137 mode : int, optional
138 the mode flag passed to socket.recv
139 default: zmq.NOBLOCK
140
141 Returns
142 -------
143 (ident,msg) : tuple
144 always length 2. If no message received, then return is (None,None)
145 ident : bytes or None
146 the identity prefix is there was one, None otherwise.
147 msg : dict or None
148 The actual message. If mode==zmq.NOBLOCK and no message was waiting,
149 it will be None.
150 """
97 151 try:
98 msg = socket.recv_json(mode)
152 msg = socket.recv_multipart(mode)
99 153 except zmq.ZMQError, e:
100 154 if e.errno == zmq.EAGAIN:
101 155 # We can convert EAGAIN to None as we know in this case
102 156 # recv_json won't return None.
103 return None
157 return None,None
104 158 else:
105 159 raise
106 return Message(msg)
160 if len(msg) == 1:
161 ident=None
162 msg = msg[0]
163 elif len(msg) == 2:
164 ident, msg = msg
165 else:
166 raise ValueError("Got message with length > 2, which is invalid")
167
168 return ident, json.loads(msg)
107 169
108 170 def test_msg2obj():
109 171 am = dict(x=1)
@@ -71,7 +71,7 b' class ZMQDisplayHook(DisplayHook):'
71 71
72 72 def finish_displayhook(self):
73 73 """Finish up all displayhook activities."""
74 self.pub_socket.send_json(self.msg)
74 self.session.send(self.pub_socket, self.msg)
75 75 self.msg = None
76 76
77 77
@@ -126,10 +126,9 b' class ZMQInteractiveShell(InteractiveShell):'
126 126 }
127 127
128 128 dh = self.displayhook
129 exc_msg = dh.session.msg(u'pyerr', exc_content, dh.parent_header)
130 129 # Send exception info over pub socket for other clients than the caller
131 130 # to pick up
132 dh.pub_socket.send_json(exc_msg)
131 exc_msg = dh.session.send(dh.pub_socket, u'pyerr', exc_content, dh.parent_header)
133 132
134 133 # FIXME - Hack: store exception info in shell object. Right now, the
135 134 # caller is reading this info after the fact, we need to fix this logic
General Comments 0
You need to be logged in to leave comments. Login now