##// END OF EJS Templates
use logging instead of `print >>` in pykernel
MinRK -
Show More
@@ -1,213 +1,214 b''
1 1 #!/usr/bin/env python
2 2 """An Application for launching a kernel
3 3
4 4 Authors
5 5 -------
6 6 * MinRK
7 7 """
8 8 #-----------------------------------------------------------------------------
9 9 # Copyright (C) 2011 The IPython Development Team
10 10 #
11 11 # Distributed under the terms of the BSD License. The full license is in
12 12 # the file COPYING.txt, distributed as part of this software.
13 13 #-----------------------------------------------------------------------------
14 14
15 15 #-----------------------------------------------------------------------------
16 16 # Imports
17 17 #-----------------------------------------------------------------------------
18 18
19 19 # Standard library imports.
20 20 import os
21 21 import sys
22 22
23 23 # System library imports.
24 24 import zmq
25 25
26 26 # IPython imports.
27 27 from IPython.core.ultratb import FormattedTB
28 28 from IPython.core.newapplication import (
29 29 BaseIPythonApplication, base_flags, base_aliases
30 30 )
31 31 from IPython.utils import io
32 32 from IPython.utils.localinterfaces import LOCALHOST
33 33 from IPython.utils.traitlets import Any, Instance, Dict, Unicode, Int, Bool
34 34 from IPython.utils.importstring import import_item
35 35 # local imports
36 36 from IPython.zmq.heartbeat import Heartbeat
37 37 from IPython.zmq.parentpoller import ParentPollerUnix, ParentPollerWindows
38 38 from IPython.zmq.session import Session
39 39
40 40
41 41 #-----------------------------------------------------------------------------
42 42 # Flags and Aliases
43 43 #-----------------------------------------------------------------------------
44 44
45 45 kernel_aliases = dict(base_aliases)
46 46 kernel_aliases.update({
47 47 'ip' : 'KernelApp.ip',
48 48 'hb' : 'KernelApp.hb_port',
49 49 'shell' : 'KernelApp.shell_port',
50 50 'iopub' : 'KernelApp.iopub_port',
51 51 'stdin' : 'KernelApp.stdin_port',
52 52 'parent': 'KernelApp.parent',
53 53 })
54 54 if sys.platform.startswith('win'):
55 55 kernel_aliases['interrupt'] = 'KernelApp.interrupt'
56 56
57 57 kernel_flags = dict(base_flags)
58 58 kernel_flags.update({
59 59 'no-stdout' : (
60 60 {'KernelApp' : {'no_stdout' : True}},
61 61 "redirect stdout to the null device"),
62 62 'no-stderr' : (
63 63 {'KernelApp' : {'no_stderr' : True}},
64 64 "redirect stderr to the null device"),
65 65 })
66 66
67 67
68 68 #-----------------------------------------------------------------------------
69 69 # Application class for starting a Kernel
70 70 #-----------------------------------------------------------------------------
71 71
72 72 class KernelApp(BaseIPythonApplication):
73 73 name='pykernel'
74 74 aliases = Dict(kernel_aliases)
75 75 flags = Dict(kernel_flags)
76 76
77 77 # the kernel class, as an importstring
78 78 kernel_class = Unicode('IPython.zmq.pykernel.Kernel')
79 79 kernel = Any()
80 80 poller = Any() # don't restrict this even though current pollers are all Threads
81 81 heartbeat = Instance(Heartbeat)
82 82 session = Instance('IPython.zmq.session.Session')
83 83 ports = Dict()
84 84
85 85 # connection info:
86 86 ip = Unicode(LOCALHOST, config=True,
87 87 help="Set the IP or interface on which the kernel will listen.")
88 88 hb_port = Int(0, config=True, help="set the heartbeat port [default: random]")
89 89 shell_port = Int(0, config=True, help="set the shell (XREP) port [default: random]")
90 90 iopub_port = Int(0, config=True, help="set the iopub (PUB) port [default: random]")
91 91 stdin_port = Int(0, config=True, help="set the stdin (XREQ) port [default: random]")
92 92
93 93 # streams, etc.
94 94 no_stdout = Bool(False, config=True, help="redirect stdout to the null device")
95 95 no_stderr = Bool(False, config=True, help="redirect stderr to the null device")
96 96 outstream_class = Unicode('IPython.zmq.iostream.OutStream', config=True,
97 97 help="The importstring for the OutStream factory")
98 98 displayhook_class = Unicode('IPython.zmq.displayhook.DisplayHook', config=True,
99 99 help="The importstring for the DisplayHook factory")
100 100
101 101 # polling
102 102 parent = Int(0, config=True,
103 103 help="""kill this process if its parent dies. On Windows, the argument
104 104 specifies the HANDLE of the parent process, otherwise it is simply boolean.
105 105 """)
106 106 interrupt = Int(0, config=True,
107 107 help="""ONLY USED ON WINDOWS
108 108 Interrupt this process when the parent is signalled.
109 109 """)
110 110
111 111 def init_crash_handler(self):
112 112 # Install minimal exception handling
113 113 sys.excepthook = FormattedTB(mode='Verbose', color_scheme='NoColor',
114 114 ostream=sys.__stdout__)
115 115
116 116 def init_poller(self):
117 117 if sys.platform == 'win32':
118 118 if self.interrupt or self.parent:
119 119 self.poller = ParentPollerWindows(self.interrupt, self.parent)
120 120 elif self.parent:
121 121 self.poller = ParentPollerUnix()
122 122
123 123 def _bind_socket(self, s, port):
124 124 iface = 'tcp://%s' % self.ip
125 125 if port <= 0:
126 126 port = s.bind_to_random_port(iface)
127 127 else:
128 128 s.bind(iface + ':%i'%port)
129 129 return port
130 130
131 131 def init_sockets(self):
132 132 # Create a context, a session, and the kernel sockets.
133 133 io.raw_print("Starting the kernel at pid:", os.getpid())
134 134 context = zmq.Context.instance()
135 135 # Uncomment this to try closing the context.
136 136 # atexit.register(context.term)
137 137
138 138 self.shell_socket = context.socket(zmq.XREP)
139 139 self.shell_port = self._bind_socket(self.shell_socket, self.shell_port)
140 140 self.log.debug("shell XREP Channel on port: %i"%self.shell_port)
141 141
142 142 self.iopub_socket = context.socket(zmq.PUB)
143 143 self.iopub_port = self._bind_socket(self.iopub_socket, self.iopub_port)
144 144 self.log.debug("iopub PUB Channel on port: %i"%self.iopub_port)
145 145
146 146 self.stdin_socket = context.socket(zmq.XREQ)
147 147 self.stdin_port = self._bind_socket(self.stdin_socket, self.stdin_port)
148 148 self.log.debug("stdin XREQ Channel on port: %i"%self.stdin_port)
149 149
150 150 self.heartbeat = Heartbeat(context, (self.ip, self.hb_port))
151 151 self.hb_port = self.heartbeat.port
152 152 self.log.debug("Heartbeat REP Channel on port: %i"%self.hb_port)
153 153
154 154 # Helper to make it easier to connect to an existing kernel, until we have
155 155 # single-port connection negotiation fully implemented.
156 156 self.log.info("To connect another client to this kernel, use:")
157 157 self.log.info("--external shell={0} iopub={1} stdin={2} hb={3}".format(
158 158 self.shell_port, self.iopub_port, self.stdin_port, self.hb_port))
159 159
160 160
161 161 self.ports = dict(shell=self.shell_port, iopub=self.iopub_port,
162 162 stdin=self.stdin_port, hb=self.hb_port)
163 163
164 164 def init_session(self):
165 165 """create our session object"""
166 166 self.session = Session(username=u'kernel')
167 167
168 168 def init_io(self):
169 169 """redirects stdout/stderr, and installs a display hook"""
170 170 # Re-direct stdout/stderr, if necessary.
171 171 if self.no_stdout or self.no_stderr:
172 172 blackhole = file(os.devnull, 'w')
173 173 if self.no_stdout:
174 174 sys.stdout = sys.__stdout__ = blackhole
175 175 if self.no_stderr:
176 176 sys.stderr = sys.__stderr__ = blackhole
177 177
178 178 # Redirect input streams and set a display hook.
179 179
180 180 if self.outstream_class:
181 181 outstream_factory = import_item(str(self.outstream_class))
182 182 sys.stdout = outstream_factory(self.session, self.iopub_socket, u'stdout')
183 183 sys.stderr = outstream_factory(self.session, self.iopub_socket, u'stderr')
184 184 if self.displayhook_class:
185 185 displayhook_factory = import_item(str(self.displayhook_class))
186 186 sys.displayhook = displayhook_factory(self.session, self.iopub_socket)
187 187
188 188 def init_kernel(self):
189 189 """Create the Kernel object itself"""
190 190 kernel_factory = import_item(str(self.kernel_class))
191 191 self.kernel = kernel_factory(config=self.config, session=self.session,
192 192 shell_socket=self.shell_socket,
193 193 iopub_socket=self.iopub_socket,
194 194 stdin_socket=self.stdin_socket,
195 log=self.log
195 196 )
196 197 self.kernel.record_ports(self.ports)
197 198
198 199 def initialize(self, argv=None):
199 200 super(KernelApp, self).initialize(argv)
200 201 self.init_session()
201 202 self.init_poller()
202 203 self.init_sockets()
203 204 self.init_io()
204 205 self.init_kernel()
205 206
206 207 def start(self):
207 208 self.heartbeat.start()
208 209 if self.poller is not None:
209 210 self.poller.start()
210 211 try:
211 212 self.kernel.start()
212 213 except KeyboardInterrupt:
213 214 pass
@@ -1,280 +1,278 b''
1 1 #!/usr/bin/env python
2 2 """A simple interactive kernel that talks to a frontend over 0MQ.
3 3
4 4 Things to do:
5 5
6 6 * Implement `set_parent` logic. Right before doing exec, the Kernel should
7 7 call set_parent on all the PUB objects with the message about to be executed.
8 8 * Implement random port and security key logic.
9 9 * Implement control messages.
10 10 * Implement event loop and poll version.
11 11 """
12 12
13 13 #-----------------------------------------------------------------------------
14 14 # Imports
15 15 #-----------------------------------------------------------------------------
16 16
17 17 # Standard library imports.
18 18 import __builtin__
19 19 from code import CommandCompiler
20 20 import sys
21 21 import time
22 22 import traceback
23 23
24 24 # System library imports.
25 25 import zmq
26 26
27 27 # Local imports.
28 28 from IPython.utils.traitlets import HasTraits, Instance, Dict, Float
29 29 from completer import KernelCompleter
30 30 from entry_point import base_launch_kernel
31 31 from session import Session, Message
32 32 from kernelapp import KernelApp
33 33
34 34 #-----------------------------------------------------------------------------
35 35 # Main kernel class
36 36 #-----------------------------------------------------------------------------
37 37
38 38 class Kernel(HasTraits):
39 39
40 40 # Private interface
41 41
42 42 # Time to sleep after flushing the stdout/err buffers in each execute
43 43 # cycle. While this introduces a hard limit on the minimal latency of the
44 44 # execute cycle, it helps prevent output synchronization problems for
45 45 # clients.
46 46 # Units are in seconds. The minimum zmq latency on local host is probably
47 47 # ~150 microseconds, set this to 500us for now. We may need to increase it
48 48 # a little if it's not enough after more interactive testing.
49 49 _execute_sleep = Float(0.0005, config=True)
50 50
51 51 # This is a dict of port number that the kernel is listening on. It is set
52 52 # by record_ports and used by connect_request.
53 53 _recorded_ports = Dict()
54 54
55 55 #---------------------------------------------------------------------------
56 56 # Kernel interface
57 57 #---------------------------------------------------------------------------
58 58
59 59 session = Instance(Session)
60 60 shell_socket = Instance('zmq.Socket')
61 61 iopub_socket = Instance('zmq.Socket')
62 62 stdin_socket = Instance('zmq.Socket')
63 log = Instance('logging.Logger')
63 64
64 65 def __init__(self, **kwargs):
65 66 super(Kernel, self).__init__(**kwargs)
66 67 self.user_ns = {}
67 68 self.history = []
68 69 self.compiler = CommandCompiler()
69 70 self.completer = KernelCompleter(self.user_ns)
70 71
71 72 # Build dict of handlers for message types
72 73 msg_types = [ 'execute_request', 'complete_request',
73 74 'object_info_request', 'shutdown_request' ]
74 75 self.handlers = {}
75 76 for msg_type in msg_types:
76 77 self.handlers[msg_type] = getattr(self, msg_type)
77 78
78 79 def start(self):
79 80 """ Start the kernel main loop.
80 81 """
81 82 while True:
82 83 ident,msg = self.session.recv(self.shell_socket,0)
83 84 assert ident is not None, "Missing message part."
84 85 omsg = Message(msg)
85 print>>sys.__stdout__
86 print>>sys.__stdout__, omsg
86 self.log.debug(str(omsg))
87 87 handler = self.handlers.get(omsg.msg_type, None)
88 88 if handler is None:
89 print >> sys.__stderr__, "UNKNOWN MESSAGE TYPE:", omsg
89 self.log.error("UNKNOWN MESSAGE TYPE: %s"%omsg)
90 90 else:
91 91 handler(ident, omsg)
92 92
93 93 def record_ports(self, ports):
94 94 """Record the ports that this kernel is using.
95 95
96 96 The creator of the Kernel instance must call this methods if they
97 97 want the :meth:`connect_request` method to return the port numbers.
98 98 """
99 99 self._recorded_ports = ports
100 100
101 101 #---------------------------------------------------------------------------
102 102 # Kernel request handlers
103 103 #---------------------------------------------------------------------------
104 104
105 105 def execute_request(self, ident, parent):
106 106 try:
107 107 code = parent[u'content'][u'code']
108 108 except:
109 print>>sys.__stderr__, "Got bad msg: "
110 print>>sys.__stderr__, Message(parent)
109 self.log.error("Got bad msg: %s"%Message(parent))
111 110 return
112 111 pyin_msg = self.session.send(self.iopub_socket, u'pyin',{u'code':code}, parent=parent)
113 112
114 113 try:
115 114 comp_code = self.compiler(code, '<zmq-kernel>')
116 115
117 116 # Replace raw_input. Note that is not sufficient to replace
118 117 # raw_input in the user namespace.
119 118 raw_input = lambda prompt='': self._raw_input(prompt, ident, parent)
120 119 __builtin__.raw_input = raw_input
121 120
122 121 # Set the parent message of the display hook and out streams.
123 122 sys.displayhook.set_parent(parent)
124 123 sys.stdout.set_parent(parent)
125 124 sys.stderr.set_parent(parent)
126 125
127 126 exec comp_code in self.user_ns, self.user_ns
128 127 except:
129 128 etype, evalue, tb = sys.exc_info()
130 129 tb = traceback.format_exception(etype, evalue, tb)
131 130 exc_content = {
132 131 u'status' : u'error',
133 132 u'traceback' : tb,
134 133 u'ename' : unicode(etype.__name__),
135 134 u'evalue' : unicode(evalue)
136 135 }
137 136 exc_msg = self.session.send(self.iopub_socket, u'pyerr', exc_content, parent)
138 137 reply_content = exc_content
139 138 else:
140 139 reply_content = { 'status' : 'ok', 'payload' : {} }
141 140
142 141 # Flush output before sending the reply.
143 142 sys.stderr.flush()
144 143 sys.stdout.flush()
145 144 # FIXME: on rare occasions, the flush doesn't seem to make it to the
146 145 # clients... This seems to mitigate the problem, but we definitely need
147 146 # to better understand what's going on.
148 147 if self._execute_sleep:
149 148 time.sleep(self._execute_sleep)
150 149
151 150 # Send the reply.
152 151 reply_msg = self.session.send(self.shell_socket, u'execute_reply', reply_content, parent, ident=ident)
153 print>>sys.__stdout__, Message(reply_msg)
152 self.log.debug(Message(reply_msg))
154 153 if reply_msg['content']['status'] == u'error':
155 154 self._abort_queue()
156 155
157 156 def complete_request(self, ident, parent):
158 157 matches = {'matches' : self._complete(parent),
159 158 'status' : 'ok'}
160 159 completion_msg = self.session.send(self.shell_socket, 'complete_reply',
161 160 matches, parent, ident)
162 print >> sys.__stdout__, completion_msg
161 self.log.debug(completion_msg)
163 162
164 163 def object_info_request(self, ident, parent):
165 164 context = parent['content']['oname'].split('.')
166 165 object_info = self._object_info(context)
167 166 msg = self.session.send(self.shell_socket, 'object_info_reply',
168 167 object_info, parent, ident)
169 print >> sys.__stdout__, msg
168 self.log.debug(msg)
170 169
171 170 def shutdown_request(self, ident, parent):
172 171 content = dict(parent['content'])
173 172 msg = self.session.send(self.shell_socket, 'shutdown_reply',
174 173 content, parent, ident)
175 174 msg = self.session.send(self.iopub_socket, 'shutdown_reply',
176 175 content, parent, ident)
177 print >> sys.__stdout__, msg
176 self.log.debug(msg)
178 177 time.sleep(0.1)
179 178 sys.exit(0)
180 179
181 180 #---------------------------------------------------------------------------
182 181 # Protected interface
183 182 #---------------------------------------------------------------------------
184 183
185 184 def _abort_queue(self):
186 185 while True:
187 ident,msg = self.session.recv(self.reply_socket, zmq.NOBLOCK)
186 ident,msg = self.session.recv(self.shell_socket, zmq.NOBLOCK)
188 187 if msg is None:
188 # msg=None on EAGAIN
189 189 break
190 190 else:
191 assert ident is not None, "Unexpected missing message part."
192 print>>sys.__stdout__, "Aborting:"
193 print>>sys.__stdout__, Message(msg)
191 assert ident is not None, "Missing message part."
192 self.log.debug("Aborting: %s"%Message(msg))
194 193 msg_type = msg['msg_type']
195 194 reply_type = msg_type.split('_')[0] + '_reply'
196 195 reply_msg = self.session.send(self.shell_socket, reply_type, {'status':'aborted'}, msg, ident=ident)
197 print>>sys.__stdout__, Message(reply_msg)
196 self.log.debug(Message(reply_msg))
198 197 # We need to wait a bit for requests to come in. This can probably
199 198 # be set shorter for true asynchronous clients.
200 199 time.sleep(0.1)
201 200
202 201 def _raw_input(self, prompt, ident, parent):
203 202 # Flush output before making the request.
204 203 sys.stderr.flush()
205 204 sys.stdout.flush()
206 205
207 206 # Send the input request.
208 207 content = dict(prompt=prompt)
209 208 msg = self.session.send(self.stdin_socket, u'input_request', content, parent)
210 209
211 210 # Await a response.
212 211 ident,reply = self.session.recv(self.stdin_socket, 0)
213 212 try:
214 213 value = reply['content']['value']
215 214 except:
216 print>>sys.__stderr__, "Got bad raw_input reply: "
217 print>>sys.__stderr__, Message(parent)
215 self.log.error("Got bad raw_input reply: %s"%Message(parent))
218 216 value = ''
219 217 return value
220 218
221 219 def _complete(self, msg):
222 220 return self.completer.complete(msg.content.line, msg.content.text)
223 221
224 222 def _object_info(self, context):
225 223 symbol, leftover = self._symbol_from_context(context)
226 224 if symbol is not None and not leftover:
227 225 doc = getattr(symbol, '__doc__', '')
228 226 else:
229 227 doc = ''
230 228 object_info = dict(docstring = doc)
231 229 return object_info
232 230
233 231 def _symbol_from_context(self, context):
234 232 if not context:
235 233 return None, context
236 234
237 235 base_symbol_string = context[0]
238 236 symbol = self.user_ns.get(base_symbol_string, None)
239 237 if symbol is None:
240 238 symbol = __builtin__.__dict__.get(base_symbol_string, None)
241 239 if symbol is None:
242 240 return None, context
243 241
244 242 context = context[1:]
245 243 for i, name in enumerate(context):
246 244 new_symbol = getattr(symbol, name, None)
247 245 if new_symbol is None:
248 246 return symbol, context[i:]
249 247 else:
250 248 symbol = new_symbol
251 249
252 250 return symbol, []
253 251
254 252 #-----------------------------------------------------------------------------
255 253 # Kernel main and launch functions
256 254 #-----------------------------------------------------------------------------
257 255
258 256 def launch_kernel(*args, **kwargs):
259 257 """ Launches a simple Python kernel, binding to the specified ports.
260 258
261 259 This function simply calls entry_point.base_launch_kernel with the right first
262 260 command to start a pykernel. See base_launch_kernel for arguments.
263 261
264 262 Returns
265 263 -------
266 264 A tuple of form:
267 265 (kernel_process, xrep_port, pub_port, req_port, hb_port)
268 266 where kernel_process is a Popen object and the ports are integers.
269 267 """
270 268 return base_launch_kernel('from IPython.zmq.pykernel import main; main()',
271 269 *args, **kwargs)
272 270
273 271 def main():
274 272 """Run a PyKernel as an application"""
275 273 app = KernelApp.instance()
276 274 app.initialize()
277 275 app.start()
278 276
279 277 if __name__ == '__main__':
280 278 main()
General Comments 0
You need to be logged in to leave comments. Login now