##// END OF EJS Templates
properly handle nothing to recv in pykernel._abort_queue...
MinRK -
Show More
@@ -1,304 +1,302 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 """A simple interactive kernel that talks to a frontend over 0MQ.
2 """A simple interactive kernel that talks to a frontend over 0MQ.
3
3
4 Things to do:
4 Things to do:
5
5
6 * Implement `set_parent` logic. Right before doing exec, the Kernel should
6 * Implement `set_parent` logic. Right before doing exec, the Kernel should
7 call set_parent on all the PUB objects with the message about to be executed.
7 call set_parent on all the PUB objects with the message about to be executed.
8 * Implement random port and security key logic.
8 * Implement random port and security key logic.
9 * Implement control messages.
9 * Implement control messages.
10 * Implement event loop and poll version.
10 * Implement event loop and poll version.
11 """
11 """
12
12
13 #-----------------------------------------------------------------------------
13 #-----------------------------------------------------------------------------
14 # Imports
14 # Imports
15 #-----------------------------------------------------------------------------
15 #-----------------------------------------------------------------------------
16
16
17 # Standard library imports.
17 # Standard library imports.
18 import __builtin__
18 import __builtin__
19 from code import CommandCompiler
19 from code import CommandCompiler
20 import sys
20 import sys
21 import time
21 import time
22 import traceback
22 import traceback
23
23
24 # System library imports.
24 # System library imports.
25 import zmq
25 import zmq
26
26
27 # Local imports.
27 # Local imports.
28 from IPython.utils.traitlets import HasTraits, Instance
28 from IPython.utils.traitlets import HasTraits, Instance
29 from completer import KernelCompleter
29 from completer import KernelCompleter
30 from entry_point import base_launch_kernel, make_default_main
30 from entry_point import base_launch_kernel, make_default_main
31 from session import Session, Message
31 from session import Session, Message
32
32
33 #-----------------------------------------------------------------------------
33 #-----------------------------------------------------------------------------
34 # Main kernel class
34 # Main kernel class
35 #-----------------------------------------------------------------------------
35 #-----------------------------------------------------------------------------
36
36
37 class Kernel(HasTraits):
37 class Kernel(HasTraits):
38
38
39 # Private interface
39 # Private interface
40
40
41 # This is a dict of port number that the kernel is listening on. It is set
41 # This is a dict of port number that the kernel is listening on. It is set
42 # by record_ports and used by connect_request.
42 # by record_ports and used by connect_request.
43 _recorded_ports = None
43 _recorded_ports = None
44
44
45 #---------------------------------------------------------------------------
45 #---------------------------------------------------------------------------
46 # Kernel interface
46 # Kernel interface
47 #---------------------------------------------------------------------------
47 #---------------------------------------------------------------------------
48
48
49 session = Instance(Session)
49 session = Instance(Session)
50 reply_socket = Instance('zmq.Socket')
50 reply_socket = Instance('zmq.Socket')
51 pub_socket = Instance('zmq.Socket')
51 pub_socket = Instance('zmq.Socket')
52 req_socket = Instance('zmq.Socket')
52 req_socket = Instance('zmq.Socket')
53
53
54 def __init__(self, **kwargs):
54 def __init__(self, **kwargs):
55 super(Kernel, self).__init__(**kwargs)
55 super(Kernel, self).__init__(**kwargs)
56 self.user_ns = {}
56 self.user_ns = {}
57 self.history = []
57 self.history = []
58 self.compiler = CommandCompiler()
58 self.compiler = CommandCompiler()
59 self.completer = KernelCompleter(self.user_ns)
59 self.completer = KernelCompleter(self.user_ns)
60
60
61 # Build dict of handlers for message types
61 # Build dict of handlers for message types
62 msg_types = [ 'execute_request', 'complete_request',
62 msg_types = [ 'execute_request', 'complete_request',
63 'object_info_request', 'shutdown_request' ]
63 'object_info_request', 'shutdown_request' ]
64 self.handlers = {}
64 self.handlers = {}
65 for msg_type in msg_types:
65 for msg_type in msg_types:
66 self.handlers[msg_type] = getattr(self, msg_type)
66 self.handlers[msg_type] = getattr(self, msg_type)
67
67
68 def start(self):
68 def start(self):
69 """ Start the kernel main loop.
69 """ Start the kernel main loop.
70 """
70 """
71 while True:
71 while True:
72 ident,msg = self.session.recv(self.reply_socket,0)
72 ident,msg = self.session.recv(self.reply_socket,0)
73 assert ident is not None, "Missing message part."
73 assert ident is not None, "Missing message part."
74 omsg = Message(msg)
74 omsg = Message(msg)
75 print>>sys.__stdout__
75 print>>sys.__stdout__
76 print>>sys.__stdout__, omsg
76 print>>sys.__stdout__, omsg
77 handler = self.handlers.get(omsg.msg_type, None)
77 handler = self.handlers.get(omsg.msg_type, None)
78 if handler is None:
78 if handler is None:
79 print >> sys.__stderr__, "UNKNOWN MESSAGE TYPE:", omsg
79 print >> sys.__stderr__, "UNKNOWN MESSAGE TYPE:", omsg
80 else:
80 else:
81 handler(ident, omsg)
81 handler(ident, omsg)
82
82
83 def record_ports(self, xrep_port, pub_port, req_port, hb_port):
83 def record_ports(self, xrep_port, pub_port, req_port, hb_port):
84 """Record the ports that this kernel is using.
84 """Record the ports that this kernel is using.
85
85
86 The creator of the Kernel instance must call this methods if they
86 The creator of the Kernel instance must call this methods if they
87 want the :meth:`connect_request` method to return the port numbers.
87 want the :meth:`connect_request` method to return the port numbers.
88 """
88 """
89 self._recorded_ports = {
89 self._recorded_ports = {
90 'xrep_port' : xrep_port,
90 'xrep_port' : xrep_port,
91 'pub_port' : pub_port,
91 'pub_port' : pub_port,
92 'req_port' : req_port,
92 'req_port' : req_port,
93 'hb_port' : hb_port
93 'hb_port' : hb_port
94 }
94 }
95
95
96 #---------------------------------------------------------------------------
96 #---------------------------------------------------------------------------
97 # Kernel request handlers
97 # Kernel request handlers
98 #---------------------------------------------------------------------------
98 #---------------------------------------------------------------------------
99
99
100 def execute_request(self, ident, parent):
100 def execute_request(self, ident, parent):
101 try:
101 try:
102 code = parent[u'content'][u'code']
102 code = parent[u'content'][u'code']
103 except:
103 except:
104 print>>sys.__stderr__, "Got bad msg: "
104 print>>sys.__stderr__, "Got bad msg: "
105 print>>sys.__stderr__, Message(parent)
105 print>>sys.__stderr__, Message(parent)
106 return
106 return
107 pyin_msg = self.session.send(self.pub_socket, u'pyin',{u'code':code}, parent=parent)
107 pyin_msg = self.session.send(self.pub_socket, u'pyin',{u'code':code}, parent=parent)
108
108
109 try:
109 try:
110 comp_code = self.compiler(code, '<zmq-kernel>')
110 comp_code = self.compiler(code, '<zmq-kernel>')
111
111
112 # Replace raw_input. Note that is not sufficient to replace
112 # Replace raw_input. Note that is not sufficient to replace
113 # raw_input in the user namespace.
113 # raw_input in the user namespace.
114 raw_input = lambda prompt='': self._raw_input(prompt, ident, parent)
114 raw_input = lambda prompt='': self._raw_input(prompt, ident, parent)
115 __builtin__.raw_input = raw_input
115 __builtin__.raw_input = raw_input
116
116
117 # Set the parent message of the display hook and out streams.
117 # Set the parent message of the display hook and out streams.
118 sys.displayhook.set_parent(parent)
118 sys.displayhook.set_parent(parent)
119 sys.stdout.set_parent(parent)
119 sys.stdout.set_parent(parent)
120 sys.stderr.set_parent(parent)
120 sys.stderr.set_parent(parent)
121
121
122 exec comp_code in self.user_ns, self.user_ns
122 exec comp_code in self.user_ns, self.user_ns
123 except:
123 except:
124 etype, evalue, tb = sys.exc_info()
124 etype, evalue, tb = sys.exc_info()
125 tb = traceback.format_exception(etype, evalue, tb)
125 tb = traceback.format_exception(etype, evalue, tb)
126 exc_content = {
126 exc_content = {
127 u'status' : u'error',
127 u'status' : u'error',
128 u'traceback' : tb,
128 u'traceback' : tb,
129 u'ename' : unicode(etype.__name__),
129 u'ename' : unicode(etype.__name__),
130 u'evalue' : unicode(evalue)
130 u'evalue' : unicode(evalue)
131 }
131 }
132 exc_msg = self.session.send(self.pub_socket, u'pyerr', exc_content, parent)
132 exc_msg = self.session.send(self.pub_socket, u'pyerr', exc_content, parent)
133 reply_content = exc_content
133 reply_content = exc_content
134 else:
134 else:
135 reply_content = { 'status' : 'ok', 'payload' : {} }
135 reply_content = { 'status' : 'ok', 'payload' : {} }
136
136
137 # Flush output before sending the reply.
137 # Flush output before sending the reply.
138 sys.stderr.flush()
138 sys.stderr.flush()
139 sys.stdout.flush()
139 sys.stdout.flush()
140
140
141 # Send the reply.
141 # Send the reply.
142 reply_msg = self.session.send(self.reply_socket, u'execute_reply', reply_content, parent, ident=ident)
142 reply_msg = self.session.send(self.reply_socket, u'execute_reply', reply_content, parent, ident=ident)
143 print>>sys.__stdout__, Message(reply_msg)
143 print>>sys.__stdout__, Message(reply_msg)
144 if reply_msg['content']['status'] == u'error':
144 if reply_msg['content']['status'] == u'error':
145 self._abort_queue()
145 self._abort_queue()
146
146
147 def complete_request(self, ident, parent):
147 def complete_request(self, ident, parent):
148 matches = {'matches' : self._complete(parent),
148 matches = {'matches' : self._complete(parent),
149 'status' : 'ok'}
149 'status' : 'ok'}
150 completion_msg = self.session.send(self.reply_socket, 'complete_reply',
150 completion_msg = self.session.send(self.reply_socket, 'complete_reply',
151 matches, parent, ident)
151 matches, parent, ident)
152 print >> sys.__stdout__, completion_msg
152 print >> sys.__stdout__, completion_msg
153
153
154 def object_info_request(self, ident, parent):
154 def object_info_request(self, ident, parent):
155 context = parent['content']['oname'].split('.')
155 context = parent['content']['oname'].split('.')
156 object_info = self._object_info(context)
156 object_info = self._object_info(context)
157 msg = self.session.send(self.reply_socket, 'object_info_reply',
157 msg = self.session.send(self.reply_socket, 'object_info_reply',
158 object_info, parent, ident)
158 object_info, parent, ident)
159 print >> sys.__stdout__, msg
159 print >> sys.__stdout__, msg
160
160
161 def shutdown_request(self, ident, parent):
161 def shutdown_request(self, ident, parent):
162 content = dict(parent['content'])
162 content = dict(parent['content'])
163 msg = self.session.send(self.reply_socket, 'shutdown_reply',
163 msg = self.session.send(self.reply_socket, 'shutdown_reply',
164 content, parent, ident)
164 content, parent, ident)
165 msg = self.session.send(self.pub_socket, 'shutdown_reply',
165 msg = self.session.send(self.pub_socket, 'shutdown_reply',
166 content, parent, ident)
166 content, parent, ident)
167 print >> sys.__stdout__, msg
167 print >> sys.__stdout__, msg
168 time.sleep(0.1)
168 time.sleep(0.1)
169 sys.exit(0)
169 sys.exit(0)
170
170
171 #---------------------------------------------------------------------------
171 #---------------------------------------------------------------------------
172 # Protected interface
172 # Protected interface
173 #---------------------------------------------------------------------------
173 #---------------------------------------------------------------------------
174
174
175 def _abort_queue(self):
175 def _abort_queue(self):
176 while True:
176 while True:
177 try:
177 ident,msg = self.session.recv(self.reply_socket, zmq.NOBLOCK)
178 ident,msg = self.session.recv(self.reply_socket, zmq.NOBLOCK)
178 if msg is None:
179 except zmq.ZMQError, e:
179 break
180 if e.errno == zmq.EAGAIN:
181 break
182 else:
180 else:
183 assert ident is not None, "Missing message part."
181 assert ident is not None, "Unexpected missing message part."
184 print>>sys.__stdout__, "Aborting:"
182 print>>sys.__stdout__, "Aborting:"
185 print>>sys.__stdout__, Message(msg)
183 print>>sys.__stdout__, Message(msg)
186 msg_type = msg['msg_type']
184 msg_type = msg['msg_type']
187 reply_type = msg_type.split('_')[0] + '_reply'
185 reply_type = msg_type.split('_')[0] + '_reply'
188 reply_msg = self.session.send(self.reply_socket, reply_type, {'status':'aborted'}, msg, ident=ident)
186 reply_msg = self.session.send(self.reply_socket, reply_type, {'status':'aborted'}, msg, ident=ident)
189 print>>sys.__stdout__, Message(reply_msg)
187 print>>sys.__stdout__, Message(reply_msg)
190 # We need to wait a bit for requests to come in. This can probably
188 # We need to wait a bit for requests to come in. This can probably
191 # be set shorter for true asynchronous clients.
189 # be set shorter for true asynchronous clients.
192 time.sleep(0.1)
190 time.sleep(0.1)
193
191
194 def _raw_input(self, prompt, ident, parent):
192 def _raw_input(self, prompt, ident, parent):
195 # Flush output before making the request.
193 # Flush output before making the request.
196 sys.stderr.flush()
194 sys.stderr.flush()
197 sys.stdout.flush()
195 sys.stdout.flush()
198
196
199 # Send the input request.
197 # Send the input request.
200 content = dict(prompt=prompt)
198 content = dict(prompt=prompt)
201 msg = self.session.send(self.req_socket, u'input_request', content, parent)
199 msg = self.session.send(self.req_socket, u'input_request', content, parent)
202
200
203 # Await a response.
201 # Await a response.
204 ident,reply = self.session.recv(self.req_socket, 0)
202 ident,reply = self.session.recv(self.req_socket, 0)
205 try:
203 try:
206 value = reply['content']['value']
204 value = reply['content']['value']
207 except:
205 except:
208 print>>sys.__stderr__, "Got bad raw_input reply: "
206 print>>sys.__stderr__, "Got bad raw_input reply: "
209 print>>sys.__stderr__, Message(parent)
207 print>>sys.__stderr__, Message(parent)
210 value = ''
208 value = ''
211 return value
209 return value
212
210
213 def _complete(self, msg):
211 def _complete(self, msg):
214 return self.completer.complete(msg.content.line, msg.content.text)
212 return self.completer.complete(msg.content.line, msg.content.text)
215
213
216 def _object_info(self, context):
214 def _object_info(self, context):
217 symbol, leftover = self._symbol_from_context(context)
215 symbol, leftover = self._symbol_from_context(context)
218 if symbol is not None and not leftover:
216 if symbol is not None and not leftover:
219 doc = getattr(symbol, '__doc__', '')
217 doc = getattr(symbol, '__doc__', '')
220 else:
218 else:
221 doc = ''
219 doc = ''
222 object_info = dict(docstring = doc)
220 object_info = dict(docstring = doc)
223 return object_info
221 return object_info
224
222
225 def _symbol_from_context(self, context):
223 def _symbol_from_context(self, context):
226 if not context:
224 if not context:
227 return None, context
225 return None, context
228
226
229 base_symbol_string = context[0]
227 base_symbol_string = context[0]
230 symbol = self.user_ns.get(base_symbol_string, None)
228 symbol = self.user_ns.get(base_symbol_string, None)
231 if symbol is None:
229 if symbol is None:
232 symbol = __builtin__.__dict__.get(base_symbol_string, None)
230 symbol = __builtin__.__dict__.get(base_symbol_string, None)
233 if symbol is None:
231 if symbol is None:
234 return None, context
232 return None, context
235
233
236 context = context[1:]
234 context = context[1:]
237 for i, name in enumerate(context):
235 for i, name in enumerate(context):
238 new_symbol = getattr(symbol, name, None)
236 new_symbol = getattr(symbol, name, None)
239 if new_symbol is None:
237 if new_symbol is None:
240 return symbol, context[i:]
238 return symbol, context[i:]
241 else:
239 else:
242 symbol = new_symbol
240 symbol = new_symbol
243
241
244 return symbol, []
242 return symbol, []
245
243
246 #-----------------------------------------------------------------------------
244 #-----------------------------------------------------------------------------
247 # Kernel main and launch functions
245 # Kernel main and launch functions
248 #-----------------------------------------------------------------------------
246 #-----------------------------------------------------------------------------
249
247
250 def launch_kernel(ip=None, xrep_port=0, pub_port=0, req_port=0, hb_port=0,
248 def launch_kernel(ip=None, xrep_port=0, pub_port=0, req_port=0, hb_port=0,
251 stdin=None, stdout=None, stderr=None,
249 stdin=None, stdout=None, stderr=None,
252 executable=None, independent=False):
250 executable=None, independent=False):
253 """ Launches a localhost kernel, binding to the specified ports.
251 """ Launches a localhost kernel, binding to the specified ports.
254
252
255 Parameters
253 Parameters
256 ----------
254 ----------
257 ip : str, optional
255 ip : str, optional
258 The ip address the kernel will bind to.
256 The ip address the kernel will bind to.
259
257
260 xrep_port : int, optional
258 xrep_port : int, optional
261 The port to use for XREP channel.
259 The port to use for XREP channel.
262
260
263 pub_port : int, optional
261 pub_port : int, optional
264 The port to use for the SUB channel.
262 The port to use for the SUB channel.
265
263
266 req_port : int, optional
264 req_port : int, optional
267 The port to use for the REQ (raw input) channel.
265 The port to use for the REQ (raw input) channel.
268
266
269 hb_port : int, optional
267 hb_port : int, optional
270 The port to use for the hearbeat REP channel.
268 The port to use for the hearbeat REP channel.
271
269
272 stdin, stdout, stderr : optional (default None)
270 stdin, stdout, stderr : optional (default None)
273 Standards streams, as defined in subprocess.Popen.
271 Standards streams, as defined in subprocess.Popen.
274
272
275 executable : str, optional (default sys.executable)
273 executable : str, optional (default sys.executable)
276 The Python executable to use for the kernel process.
274 The Python executable to use for the kernel process.
277
275
278 independent : bool, optional (default False)
276 independent : bool, optional (default False)
279 If set, the kernel process is guaranteed to survive if this process
277 If set, the kernel process is guaranteed to survive if this process
280 dies. If not set, an effort is made to ensure that the kernel is killed
278 dies. If not set, an effort is made to ensure that the kernel is killed
281 when this process dies. Note that in this case it is still good practice
279 when this process dies. Note that in this case it is still good practice
282 to kill kernels manually before exiting.
280 to kill kernels manually before exiting.
283
281
284 Returns
282 Returns
285 -------
283 -------
286 A tuple of form:
284 A tuple of form:
287 (kernel_process, xrep_port, pub_port, req_port)
285 (kernel_process, xrep_port, pub_port, req_port)
288 where kernel_process is a Popen object and the ports are integers.
286 where kernel_process is a Popen object and the ports are integers.
289 """
287 """
290 extra_arguments = []
288 extra_arguments = []
291 if ip is not None:
289 if ip is not None:
292 extra_arguments.append('--ip')
290 extra_arguments.append('--ip')
293 if isinstance(ip, basestring):
291 if isinstance(ip, basestring):
294 extra_arguments.append(ip)
292 extra_arguments.append(ip)
295
293
296 return base_launch_kernel('from IPython.zmq.pykernel import main; main()',
294 return base_launch_kernel('from IPython.zmq.pykernel import main; main()',
297 xrep_port, pub_port, req_port, hb_port,
295 xrep_port, pub_port, req_port, hb_port,
298 stdin, stdout, stderr,
296 stdin, stdout, stderr,
299 executable, independent, extra_arguments)
297 executable, independent, extra_arguments)
300
298
301 main = make_default_main(Kernel)
299 main = make_default_main(Kernel)
302
300
303 if __name__ == '__main__':
301 if __name__ == '__main__':
304 main()
302 main()
General Comments 0
You need to be logged in to leave comments. Login now