##// END OF EJS Templates
Adding temp refs to msg_type to prevent nested dict gets.
Brian E. Granger -
Show More
@@ -1,438 +1,440 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 """
2 """
3 Kernel adapted from kernel.py to use ZMQ Streams
3 Kernel adapted from kernel.py to use ZMQ Streams
4
4
5 Authors:
5 Authors:
6
6
7 * Min RK
7 * Min RK
8 * Brian Granger
8 * Brian Granger
9 * Fernando Perez
9 * Fernando Perez
10 * Evan Patterson
10 * Evan Patterson
11 """
11 """
12 #-----------------------------------------------------------------------------
12 #-----------------------------------------------------------------------------
13 # Copyright (C) 2010-2011 The IPython Development Team
13 # Copyright (C) 2010-2011 The IPython Development Team
14 #
14 #
15 # Distributed under the terms of the BSD License. The full license is in
15 # Distributed under the terms of the BSD License. The full license is in
16 # the file COPYING, distributed as part of this software.
16 # the file COPYING, distributed as part of this software.
17 #-----------------------------------------------------------------------------
17 #-----------------------------------------------------------------------------
18
18
19 #-----------------------------------------------------------------------------
19 #-----------------------------------------------------------------------------
20 # Imports
20 # Imports
21 #-----------------------------------------------------------------------------
21 #-----------------------------------------------------------------------------
22
22
23 # Standard library imports.
23 # Standard library imports.
24 from __future__ import print_function
24 from __future__ import print_function
25
25
26 import sys
26 import sys
27 import time
27 import time
28
28
29 from code import CommandCompiler
29 from code import CommandCompiler
30 from datetime import datetime
30 from datetime import datetime
31 from pprint import pprint
31 from pprint import pprint
32
32
33 # System library imports.
33 # System library imports.
34 import zmq
34 import zmq
35 from zmq.eventloop import ioloop, zmqstream
35 from zmq.eventloop import ioloop, zmqstream
36
36
37 # Local imports.
37 # Local imports.
38 from IPython.utils.traitlets import Instance, List, Int, Dict, Set, Unicode, CBytes
38 from IPython.utils.traitlets import Instance, List, Int, Dict, Set, Unicode, CBytes
39 from IPython.zmq.completer import KernelCompleter
39 from IPython.zmq.completer import KernelCompleter
40
40
41 from IPython.parallel.error import wrap_exception
41 from IPython.parallel.error import wrap_exception
42 from IPython.parallel.factory import SessionFactory
42 from IPython.parallel.factory import SessionFactory
43 from IPython.parallel.util import serialize_object, unpack_apply_message, asbytes
43 from IPython.parallel.util import serialize_object, unpack_apply_message, asbytes
44
44
45 def printer(*args):
45 def printer(*args):
46 pprint(args, stream=sys.__stdout__)
46 pprint(args, stream=sys.__stdout__)
47
47
48
48
49 class _Passer(zmqstream.ZMQStream):
49 class _Passer(zmqstream.ZMQStream):
50 """Empty class that implements `send()` that does nothing.
50 """Empty class that implements `send()` that does nothing.
51
51
52 Subclass ZMQStream for Session typechecking
52 Subclass ZMQStream for Session typechecking
53
53
54 """
54 """
55 def __init__(self, *args, **kwargs):
55 def __init__(self, *args, **kwargs):
56 pass
56 pass
57
57
58 def send(self, *args, **kwargs):
58 def send(self, *args, **kwargs):
59 pass
59 pass
60 send_multipart = send
60 send_multipart = send
61
61
62
62
63 #-----------------------------------------------------------------------------
63 #-----------------------------------------------------------------------------
64 # Main kernel class
64 # Main kernel class
65 #-----------------------------------------------------------------------------
65 #-----------------------------------------------------------------------------
66
66
67 class Kernel(SessionFactory):
67 class Kernel(SessionFactory):
68
68
69 #---------------------------------------------------------------------------
69 #---------------------------------------------------------------------------
70 # Kernel interface
70 # Kernel interface
71 #---------------------------------------------------------------------------
71 #---------------------------------------------------------------------------
72
72
73 # kwargs:
73 # kwargs:
74 exec_lines = List(Unicode, config=True,
74 exec_lines = List(Unicode, config=True,
75 help="List of lines to execute")
75 help="List of lines to execute")
76
76
77 # identities:
77 # identities:
78 int_id = Int(-1)
78 int_id = Int(-1)
79 bident = CBytes()
79 bident = CBytes()
80 ident = Unicode()
80 ident = Unicode()
81 def _ident_changed(self, name, old, new):
81 def _ident_changed(self, name, old, new):
82 self.bident = asbytes(new)
82 self.bident = asbytes(new)
83
83
84 user_ns = Dict(config=True, help="""Set the user's namespace of the Kernel""")
84 user_ns = Dict(config=True, help="""Set the user's namespace of the Kernel""")
85
85
86 control_stream = Instance(zmqstream.ZMQStream)
86 control_stream = Instance(zmqstream.ZMQStream)
87 task_stream = Instance(zmqstream.ZMQStream)
87 task_stream = Instance(zmqstream.ZMQStream)
88 iopub_stream = Instance(zmqstream.ZMQStream)
88 iopub_stream = Instance(zmqstream.ZMQStream)
89 client = Instance('IPython.parallel.Client')
89 client = Instance('IPython.parallel.Client')
90
90
91 # internals
91 # internals
92 shell_streams = List()
92 shell_streams = List()
93 compiler = Instance(CommandCompiler, (), {})
93 compiler = Instance(CommandCompiler, (), {})
94 completer = Instance(KernelCompleter)
94 completer = Instance(KernelCompleter)
95
95
96 aborted = Set()
96 aborted = Set()
97 shell_handlers = Dict()
97 shell_handlers = Dict()
98 control_handlers = Dict()
98 control_handlers = Dict()
99
99
100 def _set_prefix(self):
100 def _set_prefix(self):
101 self.prefix = "engine.%s"%self.int_id
101 self.prefix = "engine.%s"%self.int_id
102
102
103 def _connect_completer(self):
103 def _connect_completer(self):
104 self.completer = KernelCompleter(self.user_ns)
104 self.completer = KernelCompleter(self.user_ns)
105
105
106 def __init__(self, **kwargs):
106 def __init__(self, **kwargs):
107 super(Kernel, self).__init__(**kwargs)
107 super(Kernel, self).__init__(**kwargs)
108 self._set_prefix()
108 self._set_prefix()
109 self._connect_completer()
109 self._connect_completer()
110
110
111 self.on_trait_change(self._set_prefix, 'id')
111 self.on_trait_change(self._set_prefix, 'id')
112 self.on_trait_change(self._connect_completer, 'user_ns')
112 self.on_trait_change(self._connect_completer, 'user_ns')
113
113
114 # Build dict of handlers for message types
114 # Build dict of handlers for message types
115 for msg_type in ['execute_request', 'complete_request', 'apply_request',
115 for msg_type in ['execute_request', 'complete_request', 'apply_request',
116 'clear_request']:
116 'clear_request']:
117 self.shell_handlers[msg_type] = getattr(self, msg_type)
117 self.shell_handlers[msg_type] = getattr(self, msg_type)
118
118
119 for msg_type in ['shutdown_request', 'abort_request']+self.shell_handlers.keys():
119 for msg_type in ['shutdown_request', 'abort_request']+self.shell_handlers.keys():
120 self.control_handlers[msg_type] = getattr(self, msg_type)
120 self.control_handlers[msg_type] = getattr(self, msg_type)
121
121
122 self._initial_exec_lines()
122 self._initial_exec_lines()
123
123
124 def _wrap_exception(self, method=None):
124 def _wrap_exception(self, method=None):
125 e_info = dict(engine_uuid=self.ident, engine_id=self.int_id, method=method)
125 e_info = dict(engine_uuid=self.ident, engine_id=self.int_id, method=method)
126 content=wrap_exception(e_info)
126 content=wrap_exception(e_info)
127 return content
127 return content
128
128
129 def _initial_exec_lines(self):
129 def _initial_exec_lines(self):
130 s = _Passer()
130 s = _Passer()
131 content = dict(silent=True, user_variable=[],user_expressions=[])
131 content = dict(silent=True, user_variable=[],user_expressions=[])
132 for line in self.exec_lines:
132 for line in self.exec_lines:
133 self.log.debug("executing initialization: %s"%line)
133 self.log.debug("executing initialization: %s"%line)
134 content.update({'code':line})
134 content.update({'code':line})
135 msg = self.session.msg('execute_request', content)
135 msg = self.session.msg('execute_request', content)
136 self.execute_request(s, [], msg)
136 self.execute_request(s, [], msg)
137
137
138
138
139 #-------------------- control handlers -----------------------------
139 #-------------------- control handlers -----------------------------
140 def abort_queues(self):
140 def abort_queues(self):
141 for stream in self.shell_streams:
141 for stream in self.shell_streams:
142 if stream:
142 if stream:
143 self.abort_queue(stream)
143 self.abort_queue(stream)
144
144
145 def abort_queue(self, stream):
145 def abort_queue(self, stream):
146 while True:
146 while True:
147 idents,msg = self.session.recv(stream, zmq.NOBLOCK, content=True)
147 idents,msg = self.session.recv(stream, zmq.NOBLOCK, content=True)
148 if msg is None:
148 if msg is None:
149 return
149 return
150
150
151 self.log.info("Aborting:")
151 self.log.info("Aborting:")
152 self.log.info(str(msg))
152 self.log.info(str(msg))
153 msg_type = msg['header']['msg_type']
153 msg_type = msg['header']['msg_type']
154 reply_type = msg_type.split('_')[0] + '_reply'
154 reply_type = msg_type.split('_')[0] + '_reply'
155 # reply_msg = self.session.msg(reply_type, {'status' : 'aborted'}, msg)
155 # reply_msg = self.session.msg(reply_type, {'status' : 'aborted'}, msg)
156 # self.reply_socket.send(ident,zmq.SNDMORE)
156 # self.reply_socket.send(ident,zmq.SNDMORE)
157 # self.reply_socket.send_json(reply_msg)
157 # self.reply_socket.send_json(reply_msg)
158 reply_msg = self.session.send(stream, reply_type,
158 reply_msg = self.session.send(stream, reply_type,
159 content={'status' : 'aborted'}, parent=msg, ident=idents)
159 content={'status' : 'aborted'}, parent=msg, ident=idents)
160 self.log.debug(str(reply_msg))
160 self.log.debug(str(reply_msg))
161 # We need to wait a bit for requests to come in. This can probably
161 # We need to wait a bit for requests to come in. This can probably
162 # be set shorter for true asynchronous clients.
162 # be set shorter for true asynchronous clients.
163 time.sleep(0.05)
163 time.sleep(0.05)
164
164
165 def abort_request(self, stream, ident, parent):
165 def abort_request(self, stream, ident, parent):
166 """abort a specifig msg by id"""
166 """abort a specifig msg by id"""
167 msg_ids = parent['content'].get('msg_ids', None)
167 msg_ids = parent['content'].get('msg_ids', None)
168 if isinstance(msg_ids, basestring):
168 if isinstance(msg_ids, basestring):
169 msg_ids = [msg_ids]
169 msg_ids = [msg_ids]
170 if not msg_ids:
170 if not msg_ids:
171 self.abort_queues()
171 self.abort_queues()
172 for mid in msg_ids:
172 for mid in msg_ids:
173 self.aborted.add(str(mid))
173 self.aborted.add(str(mid))
174
174
175 content = dict(status='ok')
175 content = dict(status='ok')
176 reply_msg = self.session.send(stream, 'abort_reply', content=content,
176 reply_msg = self.session.send(stream, 'abort_reply', content=content,
177 parent=parent, ident=ident)
177 parent=parent, ident=ident)
178 self.log.debug(str(reply_msg))
178 self.log.debug(str(reply_msg))
179
179
180 def shutdown_request(self, stream, ident, parent):
180 def shutdown_request(self, stream, ident, parent):
181 """kill ourself. This should really be handled in an external process"""
181 """kill ourself. This should really be handled in an external process"""
182 try:
182 try:
183 self.abort_queues()
183 self.abort_queues()
184 except:
184 except:
185 content = self._wrap_exception('shutdown')
185 content = self._wrap_exception('shutdown')
186 else:
186 else:
187 content = dict(parent['content'])
187 content = dict(parent['content'])
188 content['status'] = 'ok'
188 content['status'] = 'ok'
189 msg = self.session.send(stream, 'shutdown_reply',
189 msg = self.session.send(stream, 'shutdown_reply',
190 content=content, parent=parent, ident=ident)
190 content=content, parent=parent, ident=ident)
191 self.log.debug(str(msg))
191 self.log.debug(str(msg))
192 dc = ioloop.DelayedCallback(lambda : sys.exit(0), 1000, self.loop)
192 dc = ioloop.DelayedCallback(lambda : sys.exit(0), 1000, self.loop)
193 dc.start()
193 dc.start()
194
194
195 def dispatch_control(self, msg):
195 def dispatch_control(self, msg):
196 idents,msg = self.session.feed_identities(msg, copy=False)
196 idents,msg = self.session.feed_identities(msg, copy=False)
197 try:
197 try:
198 msg = self.session.unserialize(msg, content=True, copy=False)
198 msg = self.session.unserialize(msg, content=True, copy=False)
199 except:
199 except:
200 self.log.error("Invalid Message", exc_info=True)
200 self.log.error("Invalid Message", exc_info=True)
201 return
201 return
202 else:
202 else:
203 self.log.debug("Control received, %s", msg)
203 self.log.debug("Control received, %s", msg)
204
204
205 header = msg['header']
205 header = msg['header']
206 msg_id = header['msg_id']
206 msg_id = header['msg_id']
207
207 msg_type = header['msg_type']
208 handler = self.control_handlers.get(msg['header']['msg_type'], None)
208
209 handler = self.control_handlers.get(msg_type, None)
209 if handler is None:
210 if handler is None:
210 self.log.error("UNKNOWN CONTROL MESSAGE TYPE: %r"%msg['header']['msg_type'])
211 self.log.error("UNKNOWN CONTROL MESSAGE TYPE: %r"%msg_type)
211 else:
212 else:
212 handler(self.control_stream, idents, msg)
213 handler(self.control_stream, idents, msg)
213
214
214
215
215 #-------------------- queue helpers ------------------------------
216 #-------------------- queue helpers ------------------------------
216
217
217 def check_dependencies(self, dependencies):
218 def check_dependencies(self, dependencies):
218 if not dependencies:
219 if not dependencies:
219 return True
220 return True
220 if len(dependencies) == 2 and dependencies[0] in 'any all'.split():
221 if len(dependencies) == 2 and dependencies[0] in 'any all'.split():
221 anyorall = dependencies[0]
222 anyorall = dependencies[0]
222 dependencies = dependencies[1]
223 dependencies = dependencies[1]
223 else:
224 else:
224 anyorall = 'all'
225 anyorall = 'all'
225 results = self.client.get_results(dependencies,status_only=True)
226 results = self.client.get_results(dependencies,status_only=True)
226 if results['status'] != 'ok':
227 if results['status'] != 'ok':
227 return False
228 return False
228
229
229 if anyorall == 'any':
230 if anyorall == 'any':
230 if not results['completed']:
231 if not results['completed']:
231 return False
232 return False
232 else:
233 else:
233 if results['pending']:
234 if results['pending']:
234 return False
235 return False
235
236
236 return True
237 return True
237
238
238 def check_aborted(self, msg_id):
239 def check_aborted(self, msg_id):
239 return msg_id in self.aborted
240 return msg_id in self.aborted
240
241
241 #-------------------- queue handlers -----------------------------
242 #-------------------- queue handlers -----------------------------
242
243
243 def clear_request(self, stream, idents, parent):
244 def clear_request(self, stream, idents, parent):
244 """Clear our namespace."""
245 """Clear our namespace."""
245 self.user_ns = {}
246 self.user_ns = {}
246 msg = self.session.send(stream, 'clear_reply', ident=idents, parent=parent,
247 msg = self.session.send(stream, 'clear_reply', ident=idents, parent=parent,
247 content = dict(status='ok'))
248 content = dict(status='ok'))
248 self._initial_exec_lines()
249 self._initial_exec_lines()
249
250
250 def execute_request(self, stream, ident, parent):
251 def execute_request(self, stream, ident, parent):
251 self.log.debug('execute request %s'%parent)
252 self.log.debug('execute request %s'%parent)
252 try:
253 try:
253 code = parent[u'content'][u'code']
254 code = parent[u'content'][u'code']
254 except:
255 except:
255 self.log.error("Got bad msg: %s"%parent, exc_info=True)
256 self.log.error("Got bad msg: %s"%parent, exc_info=True)
256 return
257 return
257 self.session.send(self.iopub_stream, u'pyin', {u'code':code},parent=parent,
258 self.session.send(self.iopub_stream, u'pyin', {u'code':code},parent=parent,
258 ident=asbytes('%s.pyin'%self.prefix))
259 ident=asbytes('%s.pyin'%self.prefix))
259 started = datetime.now()
260 started = datetime.now()
260 try:
261 try:
261 comp_code = self.compiler(code, '<zmq-kernel>')
262 comp_code = self.compiler(code, '<zmq-kernel>')
262 # allow for not overriding displayhook
263 # allow for not overriding displayhook
263 if hasattr(sys.displayhook, 'set_parent'):
264 if hasattr(sys.displayhook, 'set_parent'):
264 sys.displayhook.set_parent(parent)
265 sys.displayhook.set_parent(parent)
265 sys.stdout.set_parent(parent)
266 sys.stdout.set_parent(parent)
266 sys.stderr.set_parent(parent)
267 sys.stderr.set_parent(parent)
267 exec comp_code in self.user_ns, self.user_ns
268 exec comp_code in self.user_ns, self.user_ns
268 except:
269 except:
269 exc_content = self._wrap_exception('execute')
270 exc_content = self._wrap_exception('execute')
270 # exc_msg = self.session.msg(u'pyerr', exc_content, parent)
271 # exc_msg = self.session.msg(u'pyerr', exc_content, parent)
271 self.session.send(self.iopub_stream, u'pyerr', exc_content, parent=parent,
272 self.session.send(self.iopub_stream, u'pyerr', exc_content, parent=parent,
272 ident=asbytes('%s.pyerr'%self.prefix))
273 ident=asbytes('%s.pyerr'%self.prefix))
273 reply_content = exc_content
274 reply_content = exc_content
274 else:
275 else:
275 reply_content = {'status' : 'ok'}
276 reply_content = {'status' : 'ok'}
276
277
277 reply_msg = self.session.send(stream, u'execute_reply', reply_content, parent=parent,
278 reply_msg = self.session.send(stream, u'execute_reply', reply_content, parent=parent,
278 ident=ident, subheader = dict(started=started))
279 ident=ident, subheader = dict(started=started))
279 self.log.debug(str(reply_msg))
280 self.log.debug(str(reply_msg))
280 if reply_msg['content']['status'] == u'error':
281 if reply_msg['content']['status'] == u'error':
281 self.abort_queues()
282 self.abort_queues()
282
283
283 def complete_request(self, stream, ident, parent):
284 def complete_request(self, stream, ident, parent):
284 matches = {'matches' : self.complete(parent),
285 matches = {'matches' : self.complete(parent),
285 'status' : 'ok'}
286 'status' : 'ok'}
286 completion_msg = self.session.send(stream, 'complete_reply',
287 completion_msg = self.session.send(stream, 'complete_reply',
287 matches, parent, ident)
288 matches, parent, ident)
288 # print >> sys.__stdout__, completion_msg
289 # print >> sys.__stdout__, completion_msg
289
290
290 def complete(self, msg):
291 def complete(self, msg):
291 return self.completer.complete(msg.content.line, msg.content.text)
292 return self.completer.complete(msg.content.line, msg.content.text)
292
293
293 def apply_request(self, stream, ident, parent):
294 def apply_request(self, stream, ident, parent):
294 # flush previous reply, so this request won't block it
295 # flush previous reply, so this request won't block it
295 stream.flush(zmq.POLLOUT)
296 stream.flush(zmq.POLLOUT)
296 try:
297 try:
297 content = parent[u'content']
298 content = parent[u'content']
298 bufs = parent[u'buffers']
299 bufs = parent[u'buffers']
299 msg_id = parent['header']['msg_id']
300 msg_id = parent['header']['msg_id']
300 # bound = parent['header'].get('bound', False)
301 # bound = parent['header'].get('bound', False)
301 except:
302 except:
302 self.log.error("Got bad msg: %s"%parent, exc_info=True)
303 self.log.error("Got bad msg: %s"%parent, exc_info=True)
303 return
304 return
304 # pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent)
305 # pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent)
305 # self.iopub_stream.send(pyin_msg)
306 # self.iopub_stream.send(pyin_msg)
306 # self.session.send(self.iopub_stream, u'pyin', {u'code':code},parent=parent)
307 # self.session.send(self.iopub_stream, u'pyin', {u'code':code},parent=parent)
307 sub = {'dependencies_met' : True, 'engine' : self.ident,
308 sub = {'dependencies_met' : True, 'engine' : self.ident,
308 'started': datetime.now()}
309 'started': datetime.now()}
309 try:
310 try:
310 # allow for not overriding displayhook
311 # allow for not overriding displayhook
311 if hasattr(sys.displayhook, 'set_parent'):
312 if hasattr(sys.displayhook, 'set_parent'):
312 sys.displayhook.set_parent(parent)
313 sys.displayhook.set_parent(parent)
313 sys.stdout.set_parent(parent)
314 sys.stdout.set_parent(parent)
314 sys.stderr.set_parent(parent)
315 sys.stderr.set_parent(parent)
315 # exec "f(*args,**kwargs)" in self.user_ns, self.user_ns
316 # exec "f(*args,**kwargs)" in self.user_ns, self.user_ns
316 working = self.user_ns
317 working = self.user_ns
317 # suffix =
318 # suffix =
318 prefix = "_"+str(msg_id).replace("-","")+"_"
319 prefix = "_"+str(msg_id).replace("-","")+"_"
319
320
320 f,args,kwargs = unpack_apply_message(bufs, working, copy=False)
321 f,args,kwargs = unpack_apply_message(bufs, working, copy=False)
321 # if bound:
322 # if bound:
322 # bound_ns = Namespace(working)
323 # bound_ns = Namespace(working)
323 # args = [bound_ns]+list(args)
324 # args = [bound_ns]+list(args)
324
325
325 fname = getattr(f, '__name__', 'f')
326 fname = getattr(f, '__name__', 'f')
326
327
327 fname = prefix+"f"
328 fname = prefix+"f"
328 argname = prefix+"args"
329 argname = prefix+"args"
329 kwargname = prefix+"kwargs"
330 kwargname = prefix+"kwargs"
330 resultname = prefix+"result"
331 resultname = prefix+"result"
331
332
332 ns = { fname : f, argname : args, kwargname : kwargs , resultname : None }
333 ns = { fname : f, argname : args, kwargname : kwargs , resultname : None }
333 # print ns
334 # print ns
334 working.update(ns)
335 working.update(ns)
335 code = "%s=%s(*%s,**%s)"%(resultname, fname, argname, kwargname)
336 code = "%s=%s(*%s,**%s)"%(resultname, fname, argname, kwargname)
336 try:
337 try:
337 exec code in working,working
338 exec code in working,working
338 result = working.get(resultname)
339 result = working.get(resultname)
339 finally:
340 finally:
340 for key in ns.iterkeys():
341 for key in ns.iterkeys():
341 working.pop(key)
342 working.pop(key)
342 # if bound:
343 # if bound:
343 # working.update(bound_ns)
344 # working.update(bound_ns)
344
345
345 packed_result,buf = serialize_object(result)
346 packed_result,buf = serialize_object(result)
346 result_buf = [packed_result]+buf
347 result_buf = [packed_result]+buf
347 except:
348 except:
348 exc_content = self._wrap_exception('apply')
349 exc_content = self._wrap_exception('apply')
349 # exc_msg = self.session.msg(u'pyerr', exc_content, parent)
350 # exc_msg = self.session.msg(u'pyerr', exc_content, parent)
350 self.session.send(self.iopub_stream, u'pyerr', exc_content, parent=parent,
351 self.session.send(self.iopub_stream, u'pyerr', exc_content, parent=parent,
351 ident=asbytes('%s.pyerr'%self.prefix))
352 ident=asbytes('%s.pyerr'%self.prefix))
352 reply_content = exc_content
353 reply_content = exc_content
353 result_buf = []
354 result_buf = []
354
355
355 if exc_content['ename'] == 'UnmetDependency':
356 if exc_content['ename'] == 'UnmetDependency':
356 sub['dependencies_met'] = False
357 sub['dependencies_met'] = False
357 else:
358 else:
358 reply_content = {'status' : 'ok'}
359 reply_content = {'status' : 'ok'}
359
360
360 # put 'ok'/'error' status in header, for scheduler introspection:
361 # put 'ok'/'error' status in header, for scheduler introspection:
361 sub['status'] = reply_content['status']
362 sub['status'] = reply_content['status']
362
363
363 reply_msg = self.session.send(stream, u'apply_reply', reply_content,
364 reply_msg = self.session.send(stream, u'apply_reply', reply_content,
364 parent=parent, ident=ident,buffers=result_buf, subheader=sub)
365 parent=parent, ident=ident,buffers=result_buf, subheader=sub)
365
366
366 # flush i/o
367 # flush i/o
367 # should this be before reply_msg is sent, like in the single-kernel code,
368 # should this be before reply_msg is sent, like in the single-kernel code,
368 # or should nothing get in the way of real results?
369 # or should nothing get in the way of real results?
369 sys.stdout.flush()
370 sys.stdout.flush()
370 sys.stderr.flush()
371 sys.stderr.flush()
371
372
372 def dispatch_queue(self, stream, msg):
373 def dispatch_queue(self, stream, msg):
373 self.control_stream.flush()
374 self.control_stream.flush()
374 idents,msg = self.session.feed_identities(msg, copy=False)
375 idents,msg = self.session.feed_identities(msg, copy=False)
375 try:
376 try:
376 msg = self.session.unserialize(msg, content=True, copy=False)
377 msg = self.session.unserialize(msg, content=True, copy=False)
377 except:
378 except:
378 self.log.error("Invalid Message", exc_info=True)
379 self.log.error("Invalid Message", exc_info=True)
379 return
380 return
380 else:
381 else:
381 self.log.debug("Message received, %s", msg)
382 self.log.debug("Message received, %s", msg)
382
383
383
384
384 header = msg['header']
385 header = msg['header']
385 msg_id = header['msg_id']
386 msg_id = header['msg_id']
387 msg['header']['msg_type']
386 if self.check_aborted(msg_id):
388 if self.check_aborted(msg_id):
387 self.aborted.remove(msg_id)
389 self.aborted.remove(msg_id)
388 # is it safe to assume a msg_id will not be resubmitted?
390 # is it safe to assume a msg_id will not be resubmitted?
389 reply_type = msg['header']['msg_type'].split('_')[0] + '_reply'
391 reply_type = msg_type.split('_')[0] + '_reply'
390 status = {'status' : 'aborted'}
392 status = {'status' : 'aborted'}
391 reply_msg = self.session.send(stream, reply_type, subheader=status,
393 reply_msg = self.session.send(stream, reply_type, subheader=status,
392 content=status, parent=msg, ident=idents)
394 content=status, parent=msg, ident=idents)
393 return
395 return
394 handler = self.shell_handlers.get(msg['header']['msg_type'], None)
396 handler = self.shell_handlers.get(msg_type, None)
395 if handler is None:
397 if handler is None:
396 self.log.error("UNKNOWN MESSAGE TYPE: %r"%msg['header']['msg_type'])
398 self.log.error("UNKNOWN MESSAGE TYPE: %r"%msg_type)
397 else:
399 else:
398 handler(stream, idents, msg)
400 handler(stream, idents, msg)
399
401
400 def start(self):
402 def start(self):
401 #### stream mode:
403 #### stream mode:
402 if self.control_stream:
404 if self.control_stream:
403 self.control_stream.on_recv(self.dispatch_control, copy=False)
405 self.control_stream.on_recv(self.dispatch_control, copy=False)
404 self.control_stream.on_err(printer)
406 self.control_stream.on_err(printer)
405
407
406 def make_dispatcher(stream):
408 def make_dispatcher(stream):
407 def dispatcher(msg):
409 def dispatcher(msg):
408 return self.dispatch_queue(stream, msg)
410 return self.dispatch_queue(stream, msg)
409 return dispatcher
411 return dispatcher
410
412
411 for s in self.shell_streams:
413 for s in self.shell_streams:
412 s.on_recv(make_dispatcher(s), copy=False)
414 s.on_recv(make_dispatcher(s), copy=False)
413 s.on_err(printer)
415 s.on_err(printer)
414
416
415 if self.iopub_stream:
417 if self.iopub_stream:
416 self.iopub_stream.on_err(printer)
418 self.iopub_stream.on_err(printer)
417
419
418 #### while True mode:
420 #### while True mode:
419 # while True:
421 # while True:
420 # idle = True
422 # idle = True
421 # try:
423 # try:
422 # msg = self.shell_stream.socket.recv_multipart(
424 # msg = self.shell_stream.socket.recv_multipart(
423 # zmq.NOBLOCK, copy=False)
425 # zmq.NOBLOCK, copy=False)
424 # except zmq.ZMQError, e:
426 # except zmq.ZMQError, e:
425 # if e.errno != zmq.EAGAIN:
427 # if e.errno != zmq.EAGAIN:
426 # raise e
428 # raise e
427 # else:
429 # else:
428 # idle=False
430 # idle=False
429 # self.dispatch_queue(self.shell_stream, msg)
431 # self.dispatch_queue(self.shell_stream, msg)
430 #
432 #
431 # if not self.task_stream.empty():
433 # if not self.task_stream.empty():
432 # idle=False
434 # idle=False
433 # msg = self.task_stream.recv_multipart()
435 # msg = self.task_stream.recv_multipart()
434 # self.dispatch_queue(self.task_stream, msg)
436 # self.dispatch_queue(self.task_stream, msg)
435 # if idle:
437 # if idle:
436 # # don't busywait
438 # # don't busywait
437 # time.sleep(1e-3)
439 # time.sleep(1e-3)
438
440
@@ -1,179 +1,180 b''
1 """Tests for db backends
1 """Tests for db backends
2
2
3 Authors:
3 Authors:
4
4
5 * Min RK
5 * Min RK
6 """
6 """
7
7
8 #-------------------------------------------------------------------------------
8 #-------------------------------------------------------------------------------
9 # Copyright (C) 2011 The IPython Development Team
9 # Copyright (C) 2011 The IPython Development Team
10 #
10 #
11 # Distributed under the terms of the BSD License. The full license is in
11 # Distributed under the terms of the BSD License. The full license is in
12 # the file COPYING, distributed as part of this software.
12 # the file COPYING, distributed as part of this software.
13 #-------------------------------------------------------------------------------
13 #-------------------------------------------------------------------------------
14
14
15 #-------------------------------------------------------------------------------
15 #-------------------------------------------------------------------------------
16 # Imports
16 # Imports
17 #-------------------------------------------------------------------------------
17 #-------------------------------------------------------------------------------
18
18
19 from __future__ import division
19 from __future__ import division
20
20
21 import tempfile
21 import tempfile
22 import time
22 import time
23
23
24 from datetime import datetime, timedelta
24 from datetime import datetime, timedelta
25 from unittest import TestCase
25 from unittest import TestCase
26
26
27 from nose import SkipTest
27 from nose import SkipTest
28
28
29 from IPython.parallel import error
29 from IPython.parallel import error
30 from IPython.parallel.controller.dictdb import DictDB
30 from IPython.parallel.controller.dictdb import DictDB
31 from IPython.parallel.controller.sqlitedb import SQLiteDB
31 from IPython.parallel.controller.sqlitedb import SQLiteDB
32 from IPython.parallel.controller.hub import init_record, empty_record
32 from IPython.parallel.controller.hub import init_record, empty_record
33
33
34 from IPython.zmq.session import Session
34 from IPython.zmq.session import Session
35
35
36
36
37 #-------------------------------------------------------------------------------
37 #-------------------------------------------------------------------------------
38 # TestCases
38 # TestCases
39 #-------------------------------------------------------------------------------
39 #-------------------------------------------------------------------------------
40
40
41 class TestDictBackend(TestCase):
41 class TestDictBackend(TestCase):
42 def setUp(self):
42 def setUp(self):
43 self.session = Session()
43 self.session = Session()
44 self.db = self.create_db()
44 self.db = self.create_db()
45 self.load_records(16)
45 self.load_records(16)
46
46
47 def create_db(self):
47 def create_db(self):
48 return DictDB()
48 return DictDB()
49
49
50 def load_records(self, n=1):
50 def load_records(self, n=1):
51 """load n records for testing"""
51 """load n records for testing"""
52 #sleep 1/10 s, to ensure timestamp is different to previous calls
52 #sleep 1/10 s, to ensure timestamp is different to previous calls
53 time.sleep(0.1)
53 time.sleep(0.1)
54 msg_ids = []
54 msg_ids = []
55 for i in range(n):
55 for i in range(n):
56 msg = self.session.msg('apply_request', content=dict(a=5))
56 msg = self.session.msg('apply_request', content=dict(a=5))
57 msg['buffers'] = []
57 msg['buffers'] = []
58 rec = init_record(msg)
58 rec = init_record(msg)
59 msg_ids.append(msg['header']['msg_id'])
59 msg_id = msg['header']['msg_id']
60 self.db.add_record(msg['header']['msg_id'], rec)
60 msg_ids.append(msg_id)
61 self.db.add_record(msg_id, rec)
61 return msg_ids
62 return msg_ids
62
63
63 def test_add_record(self):
64 def test_add_record(self):
64 before = self.db.get_history()
65 before = self.db.get_history()
65 self.load_records(5)
66 self.load_records(5)
66 after = self.db.get_history()
67 after = self.db.get_history()
67 self.assertEquals(len(after), len(before)+5)
68 self.assertEquals(len(after), len(before)+5)
68 self.assertEquals(after[:-5],before)
69 self.assertEquals(after[:-5],before)
69
70
70 def test_drop_record(self):
71 def test_drop_record(self):
71 msg_id = self.load_records()[-1]
72 msg_id = self.load_records()[-1]
72 rec = self.db.get_record(msg_id)
73 rec = self.db.get_record(msg_id)
73 self.db.drop_record(msg_id)
74 self.db.drop_record(msg_id)
74 self.assertRaises(KeyError,self.db.get_record, msg_id)
75 self.assertRaises(KeyError,self.db.get_record, msg_id)
75
76
76 def _round_to_millisecond(self, dt):
77 def _round_to_millisecond(self, dt):
77 """necessary because mongodb rounds microseconds"""
78 """necessary because mongodb rounds microseconds"""
78 micro = dt.microsecond
79 micro = dt.microsecond
79 extra = int(str(micro)[-3:])
80 extra = int(str(micro)[-3:])
80 return dt - timedelta(microseconds=extra)
81 return dt - timedelta(microseconds=extra)
81
82
82 def test_update_record(self):
83 def test_update_record(self):
83 now = self._round_to_millisecond(datetime.now())
84 now = self._round_to_millisecond(datetime.now())
84 #
85 #
85 msg_id = self.db.get_history()[-1]
86 msg_id = self.db.get_history()[-1]
86 rec1 = self.db.get_record(msg_id)
87 rec1 = self.db.get_record(msg_id)
87 data = {'stdout': 'hello there', 'completed' : now}
88 data = {'stdout': 'hello there', 'completed' : now}
88 self.db.update_record(msg_id, data)
89 self.db.update_record(msg_id, data)
89 rec2 = self.db.get_record(msg_id)
90 rec2 = self.db.get_record(msg_id)
90 self.assertEquals(rec2['stdout'], 'hello there')
91 self.assertEquals(rec2['stdout'], 'hello there')
91 self.assertEquals(rec2['completed'], now)
92 self.assertEquals(rec2['completed'], now)
92 rec1.update(data)
93 rec1.update(data)
93 self.assertEquals(rec1, rec2)
94 self.assertEquals(rec1, rec2)
94
95
95 # def test_update_record_bad(self):
96 # def test_update_record_bad(self):
96 # """test updating nonexistant records"""
97 # """test updating nonexistant records"""
97 # msg_id = str(uuid.uuid4())
98 # msg_id = str(uuid.uuid4())
98 # data = {'stdout': 'hello there'}
99 # data = {'stdout': 'hello there'}
99 # self.assertRaises(KeyError, self.db.update_record, msg_id, data)
100 # self.assertRaises(KeyError, self.db.update_record, msg_id, data)
100
101
101 def test_find_records_dt(self):
102 def test_find_records_dt(self):
102 """test finding records by date"""
103 """test finding records by date"""
103 hist = self.db.get_history()
104 hist = self.db.get_history()
104 middle = self.db.get_record(hist[len(hist)//2])
105 middle = self.db.get_record(hist[len(hist)//2])
105 tic = middle['submitted']
106 tic = middle['submitted']
106 before = self.db.find_records({'submitted' : {'$lt' : tic}})
107 before = self.db.find_records({'submitted' : {'$lt' : tic}})
107 after = self.db.find_records({'submitted' : {'$gte' : tic}})
108 after = self.db.find_records({'submitted' : {'$gte' : tic}})
108 self.assertEquals(len(before)+len(after),len(hist))
109 self.assertEquals(len(before)+len(after),len(hist))
109 for b in before:
110 for b in before:
110 self.assertTrue(b['submitted'] < tic)
111 self.assertTrue(b['submitted'] < tic)
111 for a in after:
112 for a in after:
112 self.assertTrue(a['submitted'] >= tic)
113 self.assertTrue(a['submitted'] >= tic)
113 same = self.db.find_records({'submitted' : tic})
114 same = self.db.find_records({'submitted' : tic})
114 for s in same:
115 for s in same:
115 self.assertTrue(s['submitted'] == tic)
116 self.assertTrue(s['submitted'] == tic)
116
117
117 def test_find_records_keys(self):
118 def test_find_records_keys(self):
118 """test extracting subset of record keys"""
119 """test extracting subset of record keys"""
119 found = self.db.find_records({'msg_id': {'$ne' : ''}},keys=['submitted', 'completed'])
120 found = self.db.find_records({'msg_id': {'$ne' : ''}},keys=['submitted', 'completed'])
120 for rec in found:
121 for rec in found:
121 self.assertEquals(set(rec.keys()), set(['msg_id', 'submitted', 'completed']))
122 self.assertEquals(set(rec.keys()), set(['msg_id', 'submitted', 'completed']))
122
123
123 def test_find_records_msg_id(self):
124 def test_find_records_msg_id(self):
124 """ensure msg_id is always in found records"""
125 """ensure msg_id is always in found records"""
125 found = self.db.find_records({'msg_id': {'$ne' : ''}},keys=['submitted', 'completed'])
126 found = self.db.find_records({'msg_id': {'$ne' : ''}},keys=['submitted', 'completed'])
126 for rec in found:
127 for rec in found:
127 self.assertTrue('msg_id' in rec.keys())
128 self.assertTrue('msg_id' in rec.keys())
128 found = self.db.find_records({'msg_id': {'$ne' : ''}},keys=['submitted'])
129 found = self.db.find_records({'msg_id': {'$ne' : ''}},keys=['submitted'])
129 for rec in found:
130 for rec in found:
130 self.assertTrue('msg_id' in rec.keys())
131 self.assertTrue('msg_id' in rec.keys())
131 found = self.db.find_records({'msg_id': {'$ne' : ''}},keys=['msg_id'])
132 found = self.db.find_records({'msg_id': {'$ne' : ''}},keys=['msg_id'])
132 for rec in found:
133 for rec in found:
133 self.assertTrue('msg_id' in rec.keys())
134 self.assertTrue('msg_id' in rec.keys())
134
135
135 def test_find_records_in(self):
136 def test_find_records_in(self):
136 """test finding records with '$in','$nin' operators"""
137 """test finding records with '$in','$nin' operators"""
137 hist = self.db.get_history()
138 hist = self.db.get_history()
138 even = hist[::2]
139 even = hist[::2]
139 odd = hist[1::2]
140 odd = hist[1::2]
140 recs = self.db.find_records({ 'msg_id' : {'$in' : even}})
141 recs = self.db.find_records({ 'msg_id' : {'$in' : even}})
141 found = [ r['msg_id'] for r in recs ]
142 found = [ r['msg_id'] for r in recs ]
142 self.assertEquals(set(even), set(found))
143 self.assertEquals(set(even), set(found))
143 recs = self.db.find_records({ 'msg_id' : {'$nin' : even}})
144 recs = self.db.find_records({ 'msg_id' : {'$nin' : even}})
144 found = [ r['msg_id'] for r in recs ]
145 found = [ r['msg_id'] for r in recs ]
145 self.assertEquals(set(odd), set(found))
146 self.assertEquals(set(odd), set(found))
146
147
147 def test_get_history(self):
148 def test_get_history(self):
148 msg_ids = self.db.get_history()
149 msg_ids = self.db.get_history()
149 latest = datetime(1984,1,1)
150 latest = datetime(1984,1,1)
150 for msg_id in msg_ids:
151 for msg_id in msg_ids:
151 rec = self.db.get_record(msg_id)
152 rec = self.db.get_record(msg_id)
152 newt = rec['submitted']
153 newt = rec['submitted']
153 self.assertTrue(newt >= latest)
154 self.assertTrue(newt >= latest)
154 latest = newt
155 latest = newt
155 msg_id = self.load_records(1)[-1]
156 msg_id = self.load_records(1)[-1]
156 self.assertEquals(self.db.get_history()[-1],msg_id)
157 self.assertEquals(self.db.get_history()[-1],msg_id)
157
158
158 def test_datetime(self):
159 def test_datetime(self):
159 """get/set timestamps with datetime objects"""
160 """get/set timestamps with datetime objects"""
160 msg_id = self.db.get_history()[-1]
161 msg_id = self.db.get_history()[-1]
161 rec = self.db.get_record(msg_id)
162 rec = self.db.get_record(msg_id)
162 self.assertTrue(isinstance(rec['submitted'], datetime))
163 self.assertTrue(isinstance(rec['submitted'], datetime))
163 self.db.update_record(msg_id, dict(completed=datetime.now()))
164 self.db.update_record(msg_id, dict(completed=datetime.now()))
164 rec = self.db.get_record(msg_id)
165 rec = self.db.get_record(msg_id)
165 self.assertTrue(isinstance(rec['completed'], datetime))
166 self.assertTrue(isinstance(rec['completed'], datetime))
166
167
167 def test_drop_matching(self):
168 def test_drop_matching(self):
168 msg_ids = self.load_records(10)
169 msg_ids = self.load_records(10)
169 query = {'msg_id' : {'$in':msg_ids}}
170 query = {'msg_id' : {'$in':msg_ids}}
170 self.db.drop_matching_records(query)
171 self.db.drop_matching_records(query)
171 recs = self.db.find_records(query)
172 recs = self.db.find_records(query)
172 self.assertEquals(len(recs), 0)
173 self.assertEquals(len(recs), 0)
173
174
174 class TestSQLiteBackend(TestDictBackend):
175 class TestSQLiteBackend(TestDictBackend):
175 def create_db(self):
176 def create_db(self):
176 return SQLiteDB(location=tempfile.gettempdir())
177 return SQLiteDB(location=tempfile.gettempdir())
177
178
178 def tearDown(self):
179 def tearDown(self):
179 self.db._db.close()
180 self.db._db.close()
@@ -1,678 +1,679 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 """A simple interactive kernel that talks to a frontend over 0MQ.
2 """A simple interactive kernel that talks to a frontend over 0MQ.
3
3
4 Things to do:
4 Things to do:
5
5
6 * Implement `set_parent` logic. Right before doing exec, the Kernel should
6 * Implement `set_parent` logic. Right before doing exec, the Kernel should
7 call set_parent on all the PUB objects with the message about to be executed.
7 call set_parent on all the PUB objects with the message about to be executed.
8 * Implement random port and security key logic.
8 * Implement random port and security key logic.
9 * Implement control messages.
9 * Implement control messages.
10 * Implement event loop and poll version.
10 * Implement event loop and poll version.
11 """
11 """
12
12
13 #-----------------------------------------------------------------------------
13 #-----------------------------------------------------------------------------
14 # Imports
14 # Imports
15 #-----------------------------------------------------------------------------
15 #-----------------------------------------------------------------------------
16 from __future__ import print_function
16 from __future__ import print_function
17
17
18 # Standard library imports.
18 # Standard library imports.
19 import __builtin__
19 import __builtin__
20 import atexit
20 import atexit
21 import sys
21 import sys
22 import time
22 import time
23 import traceback
23 import traceback
24 import logging
24 import logging
25 # System library imports.
25 # System library imports.
26 import zmq
26 import zmq
27
27
28 # Local imports.
28 # Local imports.
29 from IPython.config.configurable import Configurable
29 from IPython.config.configurable import Configurable
30 from IPython.config.application import boolean_flag
30 from IPython.config.application import boolean_flag
31 from IPython.core.application import ProfileDir
31 from IPython.core.application import ProfileDir
32 from IPython.core.shellapp import (
32 from IPython.core.shellapp import (
33 InteractiveShellApp, shell_flags, shell_aliases
33 InteractiveShellApp, shell_flags, shell_aliases
34 )
34 )
35 from IPython.utils import io
35 from IPython.utils import io
36 from IPython.utils.jsonutil import json_clean
36 from IPython.utils.jsonutil import json_clean
37 from IPython.lib import pylabtools
37 from IPython.lib import pylabtools
38 from IPython.utils.traitlets import (
38 from IPython.utils.traitlets import (
39 List, Instance, Float, Dict, Bool, Int, Unicode, CaselessStrEnum
39 List, Instance, Float, Dict, Bool, Int, Unicode, CaselessStrEnum
40 )
40 )
41
41
42 from entry_point import base_launch_kernel
42 from entry_point import base_launch_kernel
43 from kernelapp import KernelApp, kernel_flags, kernel_aliases
43 from kernelapp import KernelApp, kernel_flags, kernel_aliases
44 from iostream import OutStream
44 from iostream import OutStream
45 from session import Session, Message
45 from session import Session, Message
46 from zmqshell import ZMQInteractiveShell
46 from zmqshell import ZMQInteractiveShell
47
47
48
48
49
49
50 #-----------------------------------------------------------------------------
50 #-----------------------------------------------------------------------------
51 # Main kernel class
51 # Main kernel class
52 #-----------------------------------------------------------------------------
52 #-----------------------------------------------------------------------------
53
53
54 class Kernel(Configurable):
54 class Kernel(Configurable):
55
55
56 #---------------------------------------------------------------------------
56 #---------------------------------------------------------------------------
57 # Kernel interface
57 # Kernel interface
58 #---------------------------------------------------------------------------
58 #---------------------------------------------------------------------------
59
59
60 shell = Instance('IPython.core.interactiveshell.InteractiveShellABC')
60 shell = Instance('IPython.core.interactiveshell.InteractiveShellABC')
61 session = Instance(Session)
61 session = Instance(Session)
62 shell_socket = Instance('zmq.Socket')
62 shell_socket = Instance('zmq.Socket')
63 iopub_socket = Instance('zmq.Socket')
63 iopub_socket = Instance('zmq.Socket')
64 stdin_socket = Instance('zmq.Socket')
64 stdin_socket = Instance('zmq.Socket')
65 log = Instance(logging.Logger)
65 log = Instance(logging.Logger)
66
66
67 # Private interface
67 # Private interface
68
68
69 # Time to sleep after flushing the stdout/err buffers in each execute
69 # Time to sleep after flushing the stdout/err buffers in each execute
70 # cycle. While this introduces a hard limit on the minimal latency of the
70 # cycle. While this introduces a hard limit on the minimal latency of the
71 # execute cycle, it helps prevent output synchronization problems for
71 # execute cycle, it helps prevent output synchronization problems for
72 # clients.
72 # clients.
73 # Units are in seconds. The minimum zmq latency on local host is probably
73 # Units are in seconds. The minimum zmq latency on local host is probably
74 # ~150 microseconds, set this to 500us for now. We may need to increase it
74 # ~150 microseconds, set this to 500us for now. We may need to increase it
75 # a little if it's not enough after more interactive testing.
75 # a little if it's not enough after more interactive testing.
76 _execute_sleep = Float(0.0005, config=True)
76 _execute_sleep = Float(0.0005, config=True)
77
77
78 # Frequency of the kernel's event loop.
78 # Frequency of the kernel's event loop.
79 # Units are in seconds, kernel subclasses for GUI toolkits may need to
79 # Units are in seconds, kernel subclasses for GUI toolkits may need to
80 # adapt to milliseconds.
80 # adapt to milliseconds.
81 _poll_interval = Float(0.05, config=True)
81 _poll_interval = Float(0.05, config=True)
82
82
83 # If the shutdown was requested over the network, we leave here the
83 # If the shutdown was requested over the network, we leave here the
84 # necessary reply message so it can be sent by our registered atexit
84 # necessary reply message so it can be sent by our registered atexit
85 # handler. This ensures that the reply is only sent to clients truly at
85 # handler. This ensures that the reply is only sent to clients truly at
86 # the end of our shutdown process (which happens after the underlying
86 # the end of our shutdown process (which happens after the underlying
87 # IPython shell's own shutdown).
87 # IPython shell's own shutdown).
88 _shutdown_message = None
88 _shutdown_message = None
89
89
90 # This is a dict of port number that the kernel is listening on. It is set
90 # This is a dict of port number that the kernel is listening on. It is set
91 # by record_ports and used by connect_request.
91 # by record_ports and used by connect_request.
92 _recorded_ports = Dict()
92 _recorded_ports = Dict()
93
93
94
94
95
95
96 def __init__(self, **kwargs):
96 def __init__(self, **kwargs):
97 super(Kernel, self).__init__(**kwargs)
97 super(Kernel, self).__init__(**kwargs)
98
98
99 # Before we even start up the shell, register *first* our exit handlers
99 # Before we even start up the shell, register *first* our exit handlers
100 # so they come before the shell's
100 # so they come before the shell's
101 atexit.register(self._at_shutdown)
101 atexit.register(self._at_shutdown)
102
102
103 # Initialize the InteractiveShell subclass
103 # Initialize the InteractiveShell subclass
104 self.shell = ZMQInteractiveShell.instance(config=self.config)
104 self.shell = ZMQInteractiveShell.instance(config=self.config)
105 self.shell.displayhook.session = self.session
105 self.shell.displayhook.session = self.session
106 self.shell.displayhook.pub_socket = self.iopub_socket
106 self.shell.displayhook.pub_socket = self.iopub_socket
107 self.shell.display_pub.session = self.session
107 self.shell.display_pub.session = self.session
108 self.shell.display_pub.pub_socket = self.iopub_socket
108 self.shell.display_pub.pub_socket = self.iopub_socket
109
109
110 # TMP - hack while developing
110 # TMP - hack while developing
111 self.shell._reply_content = None
111 self.shell._reply_content = None
112
112
113 # Build dict of handlers for message types
113 # Build dict of handlers for message types
114 msg_types = [ 'execute_request', 'complete_request',
114 msg_types = [ 'execute_request', 'complete_request',
115 'object_info_request', 'history_request',
115 'object_info_request', 'history_request',
116 'connect_request', 'shutdown_request']
116 'connect_request', 'shutdown_request']
117 self.handlers = {}
117 self.handlers = {}
118 for msg_type in msg_types:
118 for msg_type in msg_types:
119 self.handlers[msg_type] = getattr(self, msg_type)
119 self.handlers[msg_type] = getattr(self, msg_type)
120
120
121 def do_one_iteration(self):
121 def do_one_iteration(self):
122 """Do one iteration of the kernel's evaluation loop.
122 """Do one iteration of the kernel's evaluation loop.
123 """
123 """
124 ident,msg = self.session.recv(self.shell_socket, zmq.NOBLOCK)
124 ident,msg = self.session.recv(self.shell_socket, zmq.NOBLOCK)
125 msg_type = msg['header']['msg_type']
125 if msg is None:
126 if msg is None:
126 return
127 return
127
128
128 # This assert will raise in versions of zeromq 2.0.7 and lesser.
129 # This assert will raise in versions of zeromq 2.0.7 and lesser.
129 # We now require 2.0.8 or above, so we can uncomment for safety.
130 # We now require 2.0.8 or above, so we can uncomment for safety.
130 # print(ident,msg, file=sys.__stdout__)
131 # print(ident,msg, file=sys.__stdout__)
131 assert ident is not None, "Missing message part."
132 assert ident is not None, "Missing message part."
132
133
133 # Print some info about this message and leave a '--->' marker, so it's
134 # Print some info about this message and leave a '--->' marker, so it's
134 # easier to trace visually the message chain when debugging. Each
135 # easier to trace visually the message chain when debugging. Each
135 # handler prints its message at the end.
136 # handler prints its message at the end.
136 self.log.debug('\n*** MESSAGE TYPE:'+str(msg['header']['msg_type'])+'***')
137 self.log.debug('\n*** MESSAGE TYPE:'+str(msg_type)+'***')
137 self.log.debug(' Content: '+str(msg['content'])+'\n --->\n ')
138 self.log.debug(' Content: '+str(msg['content'])+'\n --->\n ')
138
139
139 # Find and call actual handler for message
140 # Find and call actual handler for message
140 handler = self.handlers.get(msg['header']['msg_type'], None)
141 handler = self.handlers.get(msg_type, None)
141 if handler is None:
142 if handler is None:
142 self.log.error("UNKNOWN MESSAGE TYPE:" +str(msg))
143 self.log.error("UNKNOWN MESSAGE TYPE:" +str(msg))
143 else:
144 else:
144 handler(ident, msg)
145 handler(ident, msg)
145
146
146 # Check whether we should exit, in case the incoming message set the
147 # Check whether we should exit, in case the incoming message set the
147 # exit flag on
148 # exit flag on
148 if self.shell.exit_now:
149 if self.shell.exit_now:
149 self.log.debug('\nExiting IPython kernel...')
150 self.log.debug('\nExiting IPython kernel...')
150 # We do a normal, clean exit, which allows any actions registered
151 # We do a normal, clean exit, which allows any actions registered
151 # via atexit (such as history saving) to take place.
152 # via atexit (such as history saving) to take place.
152 sys.exit(0)
153 sys.exit(0)
153
154
154
155
155 def start(self):
156 def start(self):
156 """ Start the kernel main loop.
157 """ Start the kernel main loop.
157 """
158 """
158 poller = zmq.Poller()
159 poller = zmq.Poller()
159 poller.register(self.shell_socket, zmq.POLLIN)
160 poller.register(self.shell_socket, zmq.POLLIN)
160 while True:
161 while True:
161 try:
162 try:
162 # scale by extra factor of 10, because there is no
163 # scale by extra factor of 10, because there is no
163 # reason for this to be anything less than ~ 0.1s
164 # reason for this to be anything less than ~ 0.1s
164 # since it is a real poller and will respond
165 # since it is a real poller and will respond
165 # to events immediately
166 # to events immediately
166 poller.poll(10*1000*self._poll_interval)
167 poller.poll(10*1000*self._poll_interval)
167 self.do_one_iteration()
168 self.do_one_iteration()
168 except KeyboardInterrupt:
169 except KeyboardInterrupt:
169 # Ctrl-C shouldn't crash the kernel
170 # Ctrl-C shouldn't crash the kernel
170 io.raw_print("KeyboardInterrupt caught in kernel")
171 io.raw_print("KeyboardInterrupt caught in kernel")
171
172
172 def record_ports(self, ports):
173 def record_ports(self, ports):
173 """Record the ports that this kernel is using.
174 """Record the ports that this kernel is using.
174
175
175 The creator of the Kernel instance must call this methods if they
176 The creator of the Kernel instance must call this methods if they
176 want the :meth:`connect_request` method to return the port numbers.
177 want the :meth:`connect_request` method to return the port numbers.
177 """
178 """
178 self._recorded_ports = ports
179 self._recorded_ports = ports
179
180
180 #---------------------------------------------------------------------------
181 #---------------------------------------------------------------------------
181 # Kernel request handlers
182 # Kernel request handlers
182 #---------------------------------------------------------------------------
183 #---------------------------------------------------------------------------
183
184
184 def _publish_pyin(self, code, parent):
185 def _publish_pyin(self, code, parent):
185 """Publish the code request on the pyin stream."""
186 """Publish the code request on the pyin stream."""
186
187
187 pyin_msg = self.session.send(self.iopub_socket, u'pyin',{u'code':code}, parent=parent)
188 pyin_msg = self.session.send(self.iopub_socket, u'pyin',{u'code':code}, parent=parent)
188
189
189 def execute_request(self, ident, parent):
190 def execute_request(self, ident, parent):
190
191
191 status_msg = self.session.send(self.iopub_socket,
192 status_msg = self.session.send(self.iopub_socket,
192 u'status',
193 u'status',
193 {u'execution_state':u'busy'},
194 {u'execution_state':u'busy'},
194 parent=parent
195 parent=parent
195 )
196 )
196
197
197 try:
198 try:
198 content = parent[u'content']
199 content = parent[u'content']
199 code = content[u'code']
200 code = content[u'code']
200 silent = content[u'silent']
201 silent = content[u'silent']
201 except:
202 except:
202 self.log.error("Got bad msg: ")
203 self.log.error("Got bad msg: ")
203 self.log.error(str(Message(parent)))
204 self.log.error(str(Message(parent)))
204 return
205 return
205
206
206 shell = self.shell # we'll need this a lot here
207 shell = self.shell # we'll need this a lot here
207
208
208 # Replace raw_input. Note that is not sufficient to replace
209 # Replace raw_input. Note that is not sufficient to replace
209 # raw_input in the user namespace.
210 # raw_input in the user namespace.
210 raw_input = lambda prompt='': self._raw_input(prompt, ident, parent)
211 raw_input = lambda prompt='': self._raw_input(prompt, ident, parent)
211 __builtin__.raw_input = raw_input
212 __builtin__.raw_input = raw_input
212
213
213 # Set the parent message of the display hook and out streams.
214 # Set the parent message of the display hook and out streams.
214 shell.displayhook.set_parent(parent)
215 shell.displayhook.set_parent(parent)
215 shell.display_pub.set_parent(parent)
216 shell.display_pub.set_parent(parent)
216 sys.stdout.set_parent(parent)
217 sys.stdout.set_parent(parent)
217 sys.stderr.set_parent(parent)
218 sys.stderr.set_parent(parent)
218
219
219 # Re-broadcast our input for the benefit of listening clients, and
220 # Re-broadcast our input for the benefit of listening clients, and
220 # start computing output
221 # start computing output
221 if not silent:
222 if not silent:
222 self._publish_pyin(code, parent)
223 self._publish_pyin(code, parent)
223
224
224 reply_content = {}
225 reply_content = {}
225 try:
226 try:
226 if silent:
227 if silent:
227 # run_code uses 'exec' mode, so no displayhook will fire, and it
228 # run_code uses 'exec' mode, so no displayhook will fire, and it
228 # doesn't call logging or history manipulations. Print
229 # doesn't call logging or history manipulations. Print
229 # statements in that code will obviously still execute.
230 # statements in that code will obviously still execute.
230 shell.run_code(code)
231 shell.run_code(code)
231 else:
232 else:
232 # FIXME: the shell calls the exception handler itself.
233 # FIXME: the shell calls the exception handler itself.
233 shell.run_cell(code)
234 shell.run_cell(code)
234 except:
235 except:
235 status = u'error'
236 status = u'error'
236 # FIXME: this code right now isn't being used yet by default,
237 # FIXME: this code right now isn't being used yet by default,
237 # because the run_cell() call above directly fires off exception
238 # because the run_cell() call above directly fires off exception
238 # reporting. This code, therefore, is only active in the scenario
239 # reporting. This code, therefore, is only active in the scenario
239 # where runlines itself has an unhandled exception. We need to
240 # where runlines itself has an unhandled exception. We need to
240 # uniformize this, for all exception construction to come from a
241 # uniformize this, for all exception construction to come from a
241 # single location in the codbase.
242 # single location in the codbase.
242 etype, evalue, tb = sys.exc_info()
243 etype, evalue, tb = sys.exc_info()
243 tb_list = traceback.format_exception(etype, evalue, tb)
244 tb_list = traceback.format_exception(etype, evalue, tb)
244 reply_content.update(shell._showtraceback(etype, evalue, tb_list))
245 reply_content.update(shell._showtraceback(etype, evalue, tb_list))
245 else:
246 else:
246 status = u'ok'
247 status = u'ok'
247
248
248 reply_content[u'status'] = status
249 reply_content[u'status'] = status
249
250
250 # Return the execution counter so clients can display prompts
251 # Return the execution counter so clients can display prompts
251 reply_content['execution_count'] = shell.execution_count -1
252 reply_content['execution_count'] = shell.execution_count -1
252
253
253 # FIXME - fish exception info out of shell, possibly left there by
254 # FIXME - fish exception info out of shell, possibly left there by
254 # runlines. We'll need to clean up this logic later.
255 # runlines. We'll need to clean up this logic later.
255 if shell._reply_content is not None:
256 if shell._reply_content is not None:
256 reply_content.update(shell._reply_content)
257 reply_content.update(shell._reply_content)
257 # reset after use
258 # reset after use
258 shell._reply_content = None
259 shell._reply_content = None
259
260
260 # At this point, we can tell whether the main code execution succeeded
261 # At this point, we can tell whether the main code execution succeeded
261 # or not. If it did, we proceed to evaluate user_variables/expressions
262 # or not. If it did, we proceed to evaluate user_variables/expressions
262 if reply_content['status'] == 'ok':
263 if reply_content['status'] == 'ok':
263 reply_content[u'user_variables'] = \
264 reply_content[u'user_variables'] = \
264 shell.user_variables(content[u'user_variables'])
265 shell.user_variables(content[u'user_variables'])
265 reply_content[u'user_expressions'] = \
266 reply_content[u'user_expressions'] = \
266 shell.user_expressions(content[u'user_expressions'])
267 shell.user_expressions(content[u'user_expressions'])
267 else:
268 else:
268 # If there was an error, don't even try to compute variables or
269 # If there was an error, don't even try to compute variables or
269 # expressions
270 # expressions
270 reply_content[u'user_variables'] = {}
271 reply_content[u'user_variables'] = {}
271 reply_content[u'user_expressions'] = {}
272 reply_content[u'user_expressions'] = {}
272
273
273 # Payloads should be retrieved regardless of outcome, so we can both
274 # Payloads should be retrieved regardless of outcome, so we can both
274 # recover partial output (that could have been generated early in a
275 # recover partial output (that could have been generated early in a
275 # block, before an error) and clear the payload system always.
276 # block, before an error) and clear the payload system always.
276 reply_content[u'payload'] = shell.payload_manager.read_payload()
277 reply_content[u'payload'] = shell.payload_manager.read_payload()
277 # Be agressive about clearing the payload because we don't want
278 # Be agressive about clearing the payload because we don't want
278 # it to sit in memory until the next execute_request comes in.
279 # it to sit in memory until the next execute_request comes in.
279 shell.payload_manager.clear_payload()
280 shell.payload_manager.clear_payload()
280
281
281 # Flush output before sending the reply.
282 # Flush output before sending the reply.
282 sys.stdout.flush()
283 sys.stdout.flush()
283 sys.stderr.flush()
284 sys.stderr.flush()
284 # FIXME: on rare occasions, the flush doesn't seem to make it to the
285 # FIXME: on rare occasions, the flush doesn't seem to make it to the
285 # clients... This seems to mitigate the problem, but we definitely need
286 # clients... This seems to mitigate the problem, but we definitely need
286 # to better understand what's going on.
287 # to better understand what's going on.
287 if self._execute_sleep:
288 if self._execute_sleep:
288 time.sleep(self._execute_sleep)
289 time.sleep(self._execute_sleep)
289
290
290 # Send the reply.
291 # Send the reply.
291 reply_msg = self.session.send(self.shell_socket, u'execute_reply',
292 reply_msg = self.session.send(self.shell_socket, u'execute_reply',
292 reply_content, parent, ident=ident)
293 reply_content, parent, ident=ident)
293 self.log.debug(str(reply_msg))
294 self.log.debug(str(reply_msg))
294
295
295 if reply_msg['content']['status'] == u'error':
296 if reply_msg['content']['status'] == u'error':
296 self._abort_queue()
297 self._abort_queue()
297
298
298 status_msg = self.session.send(self.iopub_socket,
299 status_msg = self.session.send(self.iopub_socket,
299 u'status',
300 u'status',
300 {u'execution_state':u'idle'},
301 {u'execution_state':u'idle'},
301 parent=parent
302 parent=parent
302 )
303 )
303
304
304 def complete_request(self, ident, parent):
305 def complete_request(self, ident, parent):
305 txt, matches = self._complete(parent)
306 txt, matches = self._complete(parent)
306 matches = {'matches' : matches,
307 matches = {'matches' : matches,
307 'matched_text' : txt,
308 'matched_text' : txt,
308 'status' : 'ok'}
309 'status' : 'ok'}
309 completion_msg = self.session.send(self.shell_socket, 'complete_reply',
310 completion_msg = self.session.send(self.shell_socket, 'complete_reply',
310 matches, parent, ident)
311 matches, parent, ident)
311 self.log.debug(str(completion_msg))
312 self.log.debug(str(completion_msg))
312
313
313 def object_info_request(self, ident, parent):
314 def object_info_request(self, ident, parent):
314 object_info = self.shell.object_inspect(parent['content']['oname'])
315 object_info = self.shell.object_inspect(parent['content']['oname'])
315 # Before we send this object over, we scrub it for JSON usage
316 # Before we send this object over, we scrub it for JSON usage
316 oinfo = json_clean(object_info)
317 oinfo = json_clean(object_info)
317 msg = self.session.send(self.shell_socket, 'object_info_reply',
318 msg = self.session.send(self.shell_socket, 'object_info_reply',
318 oinfo, parent, ident)
319 oinfo, parent, ident)
319 self.log.debug(msg)
320 self.log.debug(msg)
320
321
321 def history_request(self, ident, parent):
322 def history_request(self, ident, parent):
322 # We need to pull these out, as passing **kwargs doesn't work with
323 # We need to pull these out, as passing **kwargs doesn't work with
323 # unicode keys before Python 2.6.5.
324 # unicode keys before Python 2.6.5.
324 hist_access_type = parent['content']['hist_access_type']
325 hist_access_type = parent['content']['hist_access_type']
325 raw = parent['content']['raw']
326 raw = parent['content']['raw']
326 output = parent['content']['output']
327 output = parent['content']['output']
327 if hist_access_type == 'tail':
328 if hist_access_type == 'tail':
328 n = parent['content']['n']
329 n = parent['content']['n']
329 hist = self.shell.history_manager.get_tail(n, raw=raw, output=output,
330 hist = self.shell.history_manager.get_tail(n, raw=raw, output=output,
330 include_latest=True)
331 include_latest=True)
331
332
332 elif hist_access_type == 'range':
333 elif hist_access_type == 'range':
333 session = parent['content']['session']
334 session = parent['content']['session']
334 start = parent['content']['start']
335 start = parent['content']['start']
335 stop = parent['content']['stop']
336 stop = parent['content']['stop']
336 hist = self.shell.history_manager.get_range(session, start, stop,
337 hist = self.shell.history_manager.get_range(session, start, stop,
337 raw=raw, output=output)
338 raw=raw, output=output)
338
339
339 elif hist_access_type == 'search':
340 elif hist_access_type == 'search':
340 pattern = parent['content']['pattern']
341 pattern = parent['content']['pattern']
341 hist = self.shell.history_manager.search(pattern, raw=raw, output=output)
342 hist = self.shell.history_manager.search(pattern, raw=raw, output=output)
342
343
343 else:
344 else:
344 hist = []
345 hist = []
345 content = {'history' : list(hist)}
346 content = {'history' : list(hist)}
346 msg = self.session.send(self.shell_socket, 'history_reply',
347 msg = self.session.send(self.shell_socket, 'history_reply',
347 content, parent, ident)
348 content, parent, ident)
348 self.log.debug(str(msg))
349 self.log.debug(str(msg))
349
350
350 def connect_request(self, ident, parent):
351 def connect_request(self, ident, parent):
351 if self._recorded_ports is not None:
352 if self._recorded_ports is not None:
352 content = self._recorded_ports.copy()
353 content = self._recorded_ports.copy()
353 else:
354 else:
354 content = {}
355 content = {}
355 msg = self.session.send(self.shell_socket, 'connect_reply',
356 msg = self.session.send(self.shell_socket, 'connect_reply',
356 content, parent, ident)
357 content, parent, ident)
357 self.log.debug(msg)
358 self.log.debug(msg)
358
359
359 def shutdown_request(self, ident, parent):
360 def shutdown_request(self, ident, parent):
360 self.shell.exit_now = True
361 self.shell.exit_now = True
361 self._shutdown_message = self.session.msg(u'shutdown_reply', parent['content'], parent)
362 self._shutdown_message = self.session.msg(u'shutdown_reply', parent['content'], parent)
362 sys.exit(0)
363 sys.exit(0)
363
364
364 #---------------------------------------------------------------------------
365 #---------------------------------------------------------------------------
365 # Protected interface
366 # Protected interface
366 #---------------------------------------------------------------------------
367 #---------------------------------------------------------------------------
367
368
368 def _abort_queue(self):
369 def _abort_queue(self):
369 while True:
370 while True:
370 ident,msg = self.session.recv(self.shell_socket, zmq.NOBLOCK)
371 ident,msg = self.session.recv(self.shell_socket, zmq.NOBLOCK)
371 if msg is None:
372 if msg is None:
372 break
373 break
373 else:
374 else:
374 assert ident is not None, \
375 assert ident is not None, \
375 "Unexpected missing message part."
376 "Unexpected missing message part."
376
377
377 self.log.debug("Aborting:\n"+str(Message(msg)))
378 self.log.debug("Aborting:\n"+str(Message(msg)))
378 msg_type = msg['header']['msg_type']
379 msg_type = msg['header']['msg_type']
379 reply_type = msg_type.split('_')[0] + '_reply'
380 reply_type = msg_type.split('_')[0] + '_reply'
380 reply_msg = self.session.send(self.shell_socket, reply_type,
381 reply_msg = self.session.send(self.shell_socket, reply_type,
381 {'status' : 'aborted'}, msg, ident=ident)
382 {'status' : 'aborted'}, msg, ident=ident)
382 self.log.debug(reply_msg)
383 self.log.debug(reply_msg)
383 # We need to wait a bit for requests to come in. This can probably
384 # We need to wait a bit for requests to come in. This can probably
384 # be set shorter for true asynchronous clients.
385 # be set shorter for true asynchronous clients.
385 time.sleep(0.1)
386 time.sleep(0.1)
386
387
387 def _raw_input(self, prompt, ident, parent):
388 def _raw_input(self, prompt, ident, parent):
388 # Flush output before making the request.
389 # Flush output before making the request.
389 sys.stderr.flush()
390 sys.stderr.flush()
390 sys.stdout.flush()
391 sys.stdout.flush()
391
392
392 # Send the input request.
393 # Send the input request.
393 content = dict(prompt=prompt)
394 content = dict(prompt=prompt)
394 msg = self.session.send(self.stdin_socket, u'input_request', content, parent)
395 msg = self.session.send(self.stdin_socket, u'input_request', content, parent)
395
396
396 # Await a response.
397 # Await a response.
397 ident, reply = self.session.recv(self.stdin_socket, 0)
398 ident, reply = self.session.recv(self.stdin_socket, 0)
398 try:
399 try:
399 value = reply['content']['value']
400 value = reply['content']['value']
400 except:
401 except:
401 self.log.error("Got bad raw_input reply: ")
402 self.log.error("Got bad raw_input reply: ")
402 self.log.error(str(Message(parent)))
403 self.log.error(str(Message(parent)))
403 value = ''
404 value = ''
404 return value
405 return value
405
406
406 def _complete(self, msg):
407 def _complete(self, msg):
407 c = msg['content']
408 c = msg['content']
408 try:
409 try:
409 cpos = int(c['cursor_pos'])
410 cpos = int(c['cursor_pos'])
410 except:
411 except:
411 # If we don't get something that we can convert to an integer, at
412 # If we don't get something that we can convert to an integer, at
412 # least attempt the completion guessing the cursor is at the end of
413 # least attempt the completion guessing the cursor is at the end of
413 # the text, if there's any, and otherwise of the line
414 # the text, if there's any, and otherwise of the line
414 cpos = len(c['text'])
415 cpos = len(c['text'])
415 if cpos==0:
416 if cpos==0:
416 cpos = len(c['line'])
417 cpos = len(c['line'])
417 return self.shell.complete(c['text'], c['line'], cpos)
418 return self.shell.complete(c['text'], c['line'], cpos)
418
419
419 def _object_info(self, context):
420 def _object_info(self, context):
420 symbol, leftover = self._symbol_from_context(context)
421 symbol, leftover = self._symbol_from_context(context)
421 if symbol is not None and not leftover:
422 if symbol is not None and not leftover:
422 doc = getattr(symbol, '__doc__', '')
423 doc = getattr(symbol, '__doc__', '')
423 else:
424 else:
424 doc = ''
425 doc = ''
425 object_info = dict(docstring = doc)
426 object_info = dict(docstring = doc)
426 return object_info
427 return object_info
427
428
428 def _symbol_from_context(self, context):
429 def _symbol_from_context(self, context):
429 if not context:
430 if not context:
430 return None, context
431 return None, context
431
432
432 base_symbol_string = context[0]
433 base_symbol_string = context[0]
433 symbol = self.shell.user_ns.get(base_symbol_string, None)
434 symbol = self.shell.user_ns.get(base_symbol_string, None)
434 if symbol is None:
435 if symbol is None:
435 symbol = __builtin__.__dict__.get(base_symbol_string, None)
436 symbol = __builtin__.__dict__.get(base_symbol_string, None)
436 if symbol is None:
437 if symbol is None:
437 return None, context
438 return None, context
438
439
439 context = context[1:]
440 context = context[1:]
440 for i, name in enumerate(context):
441 for i, name in enumerate(context):
441 new_symbol = getattr(symbol, name, None)
442 new_symbol = getattr(symbol, name, None)
442 if new_symbol is None:
443 if new_symbol is None:
443 return symbol, context[i:]
444 return symbol, context[i:]
444 else:
445 else:
445 symbol = new_symbol
446 symbol = new_symbol
446
447
447 return symbol, []
448 return symbol, []
448
449
449 def _at_shutdown(self):
450 def _at_shutdown(self):
450 """Actions taken at shutdown by the kernel, called by python's atexit.
451 """Actions taken at shutdown by the kernel, called by python's atexit.
451 """
452 """
452 # io.rprint("Kernel at_shutdown") # dbg
453 # io.rprint("Kernel at_shutdown") # dbg
453 if self._shutdown_message is not None:
454 if self._shutdown_message is not None:
454 self.session.send(self.shell_socket, self._shutdown_message)
455 self.session.send(self.shell_socket, self._shutdown_message)
455 self.session.send(self.iopub_socket, self._shutdown_message)
456 self.session.send(self.iopub_socket, self._shutdown_message)
456 self.log.debug(str(self._shutdown_message))
457 self.log.debug(str(self._shutdown_message))
457 # A very short sleep to give zmq time to flush its message buffers
458 # A very short sleep to give zmq time to flush its message buffers
458 # before Python truly shuts down.
459 # before Python truly shuts down.
459 time.sleep(0.01)
460 time.sleep(0.01)
460
461
461
462
462 class QtKernel(Kernel):
463 class QtKernel(Kernel):
463 """A Kernel subclass with Qt support."""
464 """A Kernel subclass with Qt support."""
464
465
465 def start(self):
466 def start(self):
466 """Start a kernel with QtPy4 event loop integration."""
467 """Start a kernel with QtPy4 event loop integration."""
467
468
468 from IPython.external.qt_for_kernel import QtCore
469 from IPython.external.qt_for_kernel import QtCore
469 from IPython.lib.guisupport import get_app_qt4, start_event_loop_qt4
470 from IPython.lib.guisupport import get_app_qt4, start_event_loop_qt4
470
471
471 self.app = get_app_qt4([" "])
472 self.app = get_app_qt4([" "])
472 self.app.setQuitOnLastWindowClosed(False)
473 self.app.setQuitOnLastWindowClosed(False)
473 self.timer = QtCore.QTimer()
474 self.timer = QtCore.QTimer()
474 self.timer.timeout.connect(self.do_one_iteration)
475 self.timer.timeout.connect(self.do_one_iteration)
475 # Units for the timer are in milliseconds
476 # Units for the timer are in milliseconds
476 self.timer.start(1000*self._poll_interval)
477 self.timer.start(1000*self._poll_interval)
477 start_event_loop_qt4(self.app)
478 start_event_loop_qt4(self.app)
478
479
479
480
480 class WxKernel(Kernel):
481 class WxKernel(Kernel):
481 """A Kernel subclass with Wx support."""
482 """A Kernel subclass with Wx support."""
482
483
483 def start(self):
484 def start(self):
484 """Start a kernel with wx event loop support."""
485 """Start a kernel with wx event loop support."""
485
486
486 import wx
487 import wx
487 from IPython.lib.guisupport import start_event_loop_wx
488 from IPython.lib.guisupport import start_event_loop_wx
488
489
489 doi = self.do_one_iteration
490 doi = self.do_one_iteration
490 # Wx uses milliseconds
491 # Wx uses milliseconds
491 poll_interval = int(1000*self._poll_interval)
492 poll_interval = int(1000*self._poll_interval)
492
493
493 # We have to put the wx.Timer in a wx.Frame for it to fire properly.
494 # We have to put the wx.Timer in a wx.Frame for it to fire properly.
494 # We make the Frame hidden when we create it in the main app below.
495 # We make the Frame hidden when we create it in the main app below.
495 class TimerFrame(wx.Frame):
496 class TimerFrame(wx.Frame):
496 def __init__(self, func):
497 def __init__(self, func):
497 wx.Frame.__init__(self, None, -1)
498 wx.Frame.__init__(self, None, -1)
498 self.timer = wx.Timer(self)
499 self.timer = wx.Timer(self)
499 # Units for the timer are in milliseconds
500 # Units for the timer are in milliseconds
500 self.timer.Start(poll_interval)
501 self.timer.Start(poll_interval)
501 self.Bind(wx.EVT_TIMER, self.on_timer)
502 self.Bind(wx.EVT_TIMER, self.on_timer)
502 self.func = func
503 self.func = func
503
504
504 def on_timer(self, event):
505 def on_timer(self, event):
505 self.func()
506 self.func()
506
507
507 # We need a custom wx.App to create our Frame subclass that has the
508 # We need a custom wx.App to create our Frame subclass that has the
508 # wx.Timer to drive the ZMQ event loop.
509 # wx.Timer to drive the ZMQ event loop.
509 class IPWxApp(wx.App):
510 class IPWxApp(wx.App):
510 def OnInit(self):
511 def OnInit(self):
511 self.frame = TimerFrame(doi)
512 self.frame = TimerFrame(doi)
512 self.frame.Show(False)
513 self.frame.Show(False)
513 return True
514 return True
514
515
515 # The redirect=False here makes sure that wx doesn't replace
516 # The redirect=False here makes sure that wx doesn't replace
516 # sys.stdout/stderr with its own classes.
517 # sys.stdout/stderr with its own classes.
517 self.app = IPWxApp(redirect=False)
518 self.app = IPWxApp(redirect=False)
518 start_event_loop_wx(self.app)
519 start_event_loop_wx(self.app)
519
520
520
521
521 class TkKernel(Kernel):
522 class TkKernel(Kernel):
522 """A Kernel subclass with Tk support."""
523 """A Kernel subclass with Tk support."""
523
524
524 def start(self):
525 def start(self):
525 """Start a Tk enabled event loop."""
526 """Start a Tk enabled event loop."""
526
527
527 import Tkinter
528 import Tkinter
528 doi = self.do_one_iteration
529 doi = self.do_one_iteration
529 # Tk uses milliseconds
530 # Tk uses milliseconds
530 poll_interval = int(1000*self._poll_interval)
531 poll_interval = int(1000*self._poll_interval)
531 # For Tkinter, we create a Tk object and call its withdraw method.
532 # For Tkinter, we create a Tk object and call its withdraw method.
532 class Timer(object):
533 class Timer(object):
533 def __init__(self, func):
534 def __init__(self, func):
534 self.app = Tkinter.Tk()
535 self.app = Tkinter.Tk()
535 self.app.withdraw()
536 self.app.withdraw()
536 self.func = func
537 self.func = func
537
538
538 def on_timer(self):
539 def on_timer(self):
539 self.func()
540 self.func()
540 self.app.after(poll_interval, self.on_timer)
541 self.app.after(poll_interval, self.on_timer)
541
542
542 def start(self):
543 def start(self):
543 self.on_timer() # Call it once to get things going.
544 self.on_timer() # Call it once to get things going.
544 self.app.mainloop()
545 self.app.mainloop()
545
546
546 self.timer = Timer(doi)
547 self.timer = Timer(doi)
547 self.timer.start()
548 self.timer.start()
548
549
549
550
550 class GTKKernel(Kernel):
551 class GTKKernel(Kernel):
551 """A Kernel subclass with GTK support."""
552 """A Kernel subclass with GTK support."""
552
553
553 def start(self):
554 def start(self):
554 """Start the kernel, coordinating with the GTK event loop"""
555 """Start the kernel, coordinating with the GTK event loop"""
555 from .gui.gtkembed import GTKEmbed
556 from .gui.gtkembed import GTKEmbed
556
557
557 gtk_kernel = GTKEmbed(self)
558 gtk_kernel = GTKEmbed(self)
558 gtk_kernel.start()
559 gtk_kernel.start()
559
560
560
561
561 #-----------------------------------------------------------------------------
562 #-----------------------------------------------------------------------------
562 # Aliases and Flags for the IPKernelApp
563 # Aliases and Flags for the IPKernelApp
563 #-----------------------------------------------------------------------------
564 #-----------------------------------------------------------------------------
564
565
565 flags = dict(kernel_flags)
566 flags = dict(kernel_flags)
566 flags.update(shell_flags)
567 flags.update(shell_flags)
567
568
568 addflag = lambda *args: flags.update(boolean_flag(*args))
569 addflag = lambda *args: flags.update(boolean_flag(*args))
569
570
570 flags['pylab'] = (
571 flags['pylab'] = (
571 {'IPKernelApp' : {'pylab' : 'auto'}},
572 {'IPKernelApp' : {'pylab' : 'auto'}},
572 """Pre-load matplotlib and numpy for interactive use with
573 """Pre-load matplotlib and numpy for interactive use with
573 the default matplotlib backend."""
574 the default matplotlib backend."""
574 )
575 )
575
576
576 aliases = dict(kernel_aliases)
577 aliases = dict(kernel_aliases)
577 aliases.update(shell_aliases)
578 aliases.update(shell_aliases)
578
579
579 # it's possible we don't want short aliases for *all* of these:
580 # it's possible we don't want short aliases for *all* of these:
580 aliases.update(dict(
581 aliases.update(dict(
581 pylab='IPKernelApp.pylab',
582 pylab='IPKernelApp.pylab',
582 ))
583 ))
583
584
584 #-----------------------------------------------------------------------------
585 #-----------------------------------------------------------------------------
585 # The IPKernelApp class
586 # The IPKernelApp class
586 #-----------------------------------------------------------------------------
587 #-----------------------------------------------------------------------------
587
588
588 class IPKernelApp(KernelApp, InteractiveShellApp):
589 class IPKernelApp(KernelApp, InteractiveShellApp):
589 name = 'ipkernel'
590 name = 'ipkernel'
590
591
591 aliases = Dict(aliases)
592 aliases = Dict(aliases)
592 flags = Dict(flags)
593 flags = Dict(flags)
593 classes = [Kernel, ZMQInteractiveShell, ProfileDir, Session]
594 classes = [Kernel, ZMQInteractiveShell, ProfileDir, Session]
594 # configurables
595 # configurables
595 pylab = CaselessStrEnum(['tk', 'qt', 'wx', 'gtk', 'osx', 'inline', 'auto'],
596 pylab = CaselessStrEnum(['tk', 'qt', 'wx', 'gtk', 'osx', 'inline', 'auto'],
596 config=True,
597 config=True,
597 help="""Pre-load matplotlib and numpy for interactive use,
598 help="""Pre-load matplotlib and numpy for interactive use,
598 selecting a particular matplotlib backend and loop integration.
599 selecting a particular matplotlib backend and loop integration.
599 """
600 """
600 )
601 )
601 pylab_import_all = Bool(True, config=True,
602 pylab_import_all = Bool(True, config=True,
602 help="""If true, an 'import *' is done from numpy and pylab,
603 help="""If true, an 'import *' is done from numpy and pylab,
603 when using pylab"""
604 when using pylab"""
604 )
605 )
605 def initialize(self, argv=None):
606 def initialize(self, argv=None):
606 super(IPKernelApp, self).initialize(argv)
607 super(IPKernelApp, self).initialize(argv)
607 self.init_shell()
608 self.init_shell()
608 self.init_extensions()
609 self.init_extensions()
609 self.init_code()
610 self.init_code()
610
611
611 def init_kernel(self):
612 def init_kernel(self):
612 kernel_factory = Kernel
613 kernel_factory = Kernel
613
614
614 kernel_map = {
615 kernel_map = {
615 'qt' : QtKernel,
616 'qt' : QtKernel,
616 'qt4': QtKernel,
617 'qt4': QtKernel,
617 'inline': Kernel,
618 'inline': Kernel,
618 'osx': TkKernel,
619 'osx': TkKernel,
619 'wx' : WxKernel,
620 'wx' : WxKernel,
620 'tk' : TkKernel,
621 'tk' : TkKernel,
621 'gtk': GTKKernel,
622 'gtk': GTKKernel,
622 }
623 }
623
624
624 if self.pylab:
625 if self.pylab:
625 key = None if self.pylab == 'auto' else self.pylab
626 key = None if self.pylab == 'auto' else self.pylab
626 gui, backend = pylabtools.find_gui_and_backend(key)
627 gui, backend = pylabtools.find_gui_and_backend(key)
627 kernel_factory = kernel_map.get(gui)
628 kernel_factory = kernel_map.get(gui)
628 if kernel_factory is None:
629 if kernel_factory is None:
629 raise ValueError('GUI is not supported: %r' % gui)
630 raise ValueError('GUI is not supported: %r' % gui)
630 pylabtools.activate_matplotlib(backend)
631 pylabtools.activate_matplotlib(backend)
631
632
632 kernel = kernel_factory(config=self.config, session=self.session,
633 kernel = kernel_factory(config=self.config, session=self.session,
633 shell_socket=self.shell_socket,
634 shell_socket=self.shell_socket,
634 iopub_socket=self.iopub_socket,
635 iopub_socket=self.iopub_socket,
635 stdin_socket=self.stdin_socket,
636 stdin_socket=self.stdin_socket,
636 log=self.log
637 log=self.log
637 )
638 )
638 self.kernel = kernel
639 self.kernel = kernel
639 kernel.record_ports(self.ports)
640 kernel.record_ports(self.ports)
640
641
641 if self.pylab:
642 if self.pylab:
642 import_all = self.pylab_import_all
643 import_all = self.pylab_import_all
643 pylabtools.import_pylab(kernel.shell.user_ns, backend, import_all,
644 pylabtools.import_pylab(kernel.shell.user_ns, backend, import_all,
644 shell=kernel.shell)
645 shell=kernel.shell)
645
646
646 def init_shell(self):
647 def init_shell(self):
647 self.shell = self.kernel.shell
648 self.shell = self.kernel.shell
648
649
649
650
650 #-----------------------------------------------------------------------------
651 #-----------------------------------------------------------------------------
651 # Kernel main and launch functions
652 # Kernel main and launch functions
652 #-----------------------------------------------------------------------------
653 #-----------------------------------------------------------------------------
653
654
654 def launch_kernel(*args, **kwargs):
655 def launch_kernel(*args, **kwargs):
655 """Launches a localhost IPython kernel, binding to the specified ports.
656 """Launches a localhost IPython kernel, binding to the specified ports.
656
657
657 This function simply calls entry_point.base_launch_kernel with the right first
658 This function simply calls entry_point.base_launch_kernel with the right first
658 command to start an ipkernel. See base_launch_kernel for arguments.
659 command to start an ipkernel. See base_launch_kernel for arguments.
659
660
660 Returns
661 Returns
661 -------
662 -------
662 A tuple of form:
663 A tuple of form:
663 (kernel_process, shell_port, iopub_port, stdin_port, hb_port)
664 (kernel_process, shell_port, iopub_port, stdin_port, hb_port)
664 where kernel_process is a Popen object and the ports are integers.
665 where kernel_process is a Popen object and the ports are integers.
665 """
666 """
666 return base_launch_kernel('from IPython.zmq.ipkernel import main; main()',
667 return base_launch_kernel('from IPython.zmq.ipkernel import main; main()',
667 *args, **kwargs)
668 *args, **kwargs)
668
669
669
670
670 def main():
671 def main():
671 """Run an IPKernel as an application"""
672 """Run an IPKernel as an application"""
672 app = IPKernelApp.instance()
673 app = IPKernelApp.instance()
673 app.initialize()
674 app.initialize()
674 app.start()
675 app.start()
675
676
676
677
677 if __name__ == '__main__':
678 if __name__ == '__main__':
678 main()
679 main()
General Comments 0
You need to be logged in to leave comments. Login now