##// END OF EJS Templates
Remaned kernel.py to pykernel.py and create ipkernel.py.
Brian Granger -
Show More
@@ -0,0 +1,359 b''
1 #!/usr/bin/env python
2 """A simple interactive kernel that talks to a frontend over 0MQ.
3
4 Things to do:
5
6 * Implement `set_parent` logic. Right before doing exec, the Kernel should
7 call set_parent on all the PUB objects with the message about to be executed.
8 * Implement random port and security key logic.
9 * Implement control messages.
10 * Implement event loop and poll version.
11 """
12
13 #-----------------------------------------------------------------------------
14 # Imports
15 #-----------------------------------------------------------------------------
16
17 # Standard library imports.
18 import __builtin__
19 from code import CommandCompiler
20 import os
21 import sys
22 import time
23 import traceback
24
25 # System library imports.
26 import zmq
27
28 # Local imports.
29 from IPython.external.argparse import ArgumentParser
30 from session import Session, Message
31 from completer import KernelCompleter
32 from .iostream import OutStream
33 from .displayhook import DisplayHook
34 from .exitpoller import ExitPollerUnix, ExitPollerWindows
35
36 #-----------------------------------------------------------------------------
37 # Main kernel class
38 #-----------------------------------------------------------------------------
39
40 class Kernel(object):
41
42 def __init__(self, session, reply_socket, pub_socket, req_socket):
43 self.session = session
44 self.reply_socket = reply_socket
45 self.pub_socket = pub_socket
46 self.req_socket = req_socket
47 self.user_ns = {}
48 self.history = []
49 self.compiler = CommandCompiler()
50 self.completer = KernelCompleter(self.user_ns)
51
52 # Build dict of handlers for message types
53 msg_types = [ 'execute_request', 'complete_request',
54 'object_info_request' ]
55 self.handlers = {}
56 for msg_type in msg_types:
57 self.handlers[msg_type] = getattr(self, msg_type)
58
59 def abort_queue(self):
60 while True:
61 try:
62 ident = self.reply_socket.recv(zmq.NOBLOCK)
63 except zmq.ZMQError, e:
64 if e.errno == zmq.EAGAIN:
65 break
66 else:
67 assert self.reply_socket.rcvmore(), "Unexpected missing message part."
68 msg = self.reply_socket.recv_json()
69 print>>sys.__stdout__, "Aborting:"
70 print>>sys.__stdout__, Message(msg)
71 msg_type = msg['msg_type']
72 reply_type = msg_type.split('_')[0] + '_reply'
73 reply_msg = self.session.msg(reply_type, {'status' : 'aborted'}, msg)
74 print>>sys.__stdout__, Message(reply_msg)
75 self.reply_socket.send(ident,zmq.SNDMORE)
76 self.reply_socket.send_json(reply_msg)
77 # We need to wait a bit for requests to come in. This can probably
78 # be set shorter for true asynchronous clients.
79 time.sleep(0.1)
80
81 def execute_request(self, ident, parent):
82 try:
83 code = parent[u'content'][u'code']
84 except:
85 print>>sys.__stderr__, "Got bad msg: "
86 print>>sys.__stderr__, Message(parent)
87 return
88 pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent)
89 self.pub_socket.send_json(pyin_msg)
90
91 try:
92 comp_code = self.compiler(code, '<zmq-kernel>')
93
94 # Replace raw_input. Note that is not sufficient to replace
95 # raw_input in the user namespace.
96 raw_input = lambda prompt='': self.raw_input(prompt, ident, parent)
97 __builtin__.raw_input = raw_input
98
99 # Configure the display hook.
100 sys.displayhook.set_parent(parent)
101
102 exec comp_code in self.user_ns, self.user_ns
103 except:
104 etype, evalue, tb = sys.exc_info()
105 tb = traceback.format_exception(etype, evalue, tb)
106 exc_content = {
107 u'status' : u'error',
108 u'traceback' : tb,
109 u'ename' : unicode(etype.__name__),
110 u'evalue' : unicode(evalue)
111 }
112 exc_msg = self.session.msg(u'pyerr', exc_content, parent)
113 self.pub_socket.send_json(exc_msg)
114 reply_content = exc_content
115 else:
116 reply_content = {'status' : 'ok'}
117
118 # Flush output before sending the reply.
119 sys.stderr.flush()
120 sys.stdout.flush()
121
122 # Send the reply.
123 reply_msg = self.session.msg(u'execute_reply', reply_content, parent)
124 print>>sys.__stdout__, Message(reply_msg)
125 self.reply_socket.send(ident, zmq.SNDMORE)
126 self.reply_socket.send_json(reply_msg)
127 if reply_msg['content']['status'] == u'error':
128 self.abort_queue()
129
130 def raw_input(self, prompt, ident, parent):
131 # Flush output before making the request.
132 sys.stderr.flush()
133 sys.stdout.flush()
134
135 # Send the input request.
136 content = dict(prompt=prompt)
137 msg = self.session.msg(u'input_request', content, parent)
138 self.req_socket.send_json(msg)
139
140 # Await a response.
141 reply = self.req_socket.recv_json()
142 try:
143 value = reply['content']['value']
144 except:
145 print>>sys.__stderr__, "Got bad raw_input reply: "
146 print>>sys.__stderr__, Message(parent)
147 value = ''
148 return value
149
150 def complete_request(self, ident, parent):
151 matches = {'matches' : self.complete(parent),
152 'status' : 'ok'}
153 completion_msg = self.session.send(self.reply_socket, 'complete_reply',
154 matches, parent, ident)
155 print >> sys.__stdout__, completion_msg
156
157 def complete(self, msg):
158 return self.completer.complete(msg.content.line, msg.content.text)
159
160 def object_info_request(self, ident, parent):
161 context = parent['content']['oname'].split('.')
162 object_info = self.object_info(context)
163 msg = self.session.send(self.reply_socket, 'object_info_reply',
164 object_info, parent, ident)
165 print >> sys.__stdout__, msg
166
167 def object_info(self, context):
168 symbol, leftover = self.symbol_from_context(context)
169 if symbol is not None and not leftover:
170 doc = getattr(symbol, '__doc__', '')
171 else:
172 doc = ''
173 object_info = dict(docstring = doc)
174 return object_info
175
176 def symbol_from_context(self, context):
177 if not context:
178 return None, context
179
180 base_symbol_string = context[0]
181 symbol = self.user_ns.get(base_symbol_string, None)
182 if symbol is None:
183 symbol = __builtin__.__dict__.get(base_symbol_string, None)
184 if symbol is None:
185 return None, context
186
187 context = context[1:]
188 for i, name in enumerate(context):
189 new_symbol = getattr(symbol, name, None)
190 if new_symbol is None:
191 return symbol, context[i:]
192 else:
193 symbol = new_symbol
194
195 return symbol, []
196
197 def start(self):
198 while True:
199 ident = self.reply_socket.recv()
200 assert self.reply_socket.rcvmore(), "Missing message part."
201 msg = self.reply_socket.recv_json()
202 omsg = Message(msg)
203 print>>sys.__stdout__
204 print>>sys.__stdout__, omsg
205 handler = self.handlers.get(omsg.msg_type, None)
206 if handler is None:
207 print >> sys.__stderr__, "UNKNOWN MESSAGE TYPE:", omsg
208 else:
209 handler(ident, omsg)
210
211 #-----------------------------------------------------------------------------
212 # Kernel main and launch functions
213 #-----------------------------------------------------------------------------
214
215 def bind_port(socket, ip, port):
216 """ Binds the specified ZMQ socket. If the port is less than zero, a random
217 port is chosen. Returns the port that was bound.
218 """
219 connection = 'tcp://%s' % ip
220 if port <= 0:
221 port = socket.bind_to_random_port(connection)
222 else:
223 connection += ':%i' % port
224 socket.bind(connection)
225 return port
226
227
228 def main():
229 """ Main entry point for launching a kernel.
230 """
231 # Parse command line arguments.
232 parser = ArgumentParser()
233 parser.add_argument('--ip', type=str, default='127.0.0.1',
234 help='set the kernel\'s IP address [default: local]')
235 parser.add_argument('--xrep', type=int, metavar='PORT', default=0,
236 help='set the XREP channel port [default: random]')
237 parser.add_argument('--pub', type=int, metavar='PORT', default=0,
238 help='set the PUB channel port [default: random]')
239 parser.add_argument('--req', type=int, metavar='PORT', default=0,
240 help='set the REQ channel port [default: random]')
241 if sys.platform == 'win32':
242 parser.add_argument('--parent', type=int, metavar='HANDLE',
243 default=0, help='kill this process if the process '
244 'with HANDLE dies')
245 else:
246 parser.add_argument('--parent', action='store_true',
247 help='kill this process if its parent dies')
248 namespace = parser.parse_args()
249
250 # Create a context, a session, and the kernel sockets.
251 print >>sys.__stdout__, "Starting the kernel..."
252 context = zmq.Context()
253 session = Session(username=u'kernel')
254
255 reply_socket = context.socket(zmq.XREP)
256 xrep_port = bind_port(reply_socket, namespace.ip, namespace.xrep)
257 print >>sys.__stdout__, "XREP Channel on port", xrep_port
258
259 pub_socket = context.socket(zmq.PUB)
260 pub_port = bind_port(pub_socket, namespace.ip, namespace.pub)
261 print >>sys.__stdout__, "PUB Channel on port", pub_port
262
263 req_socket = context.socket(zmq.XREQ)
264 req_port = bind_port(req_socket, namespace.ip, namespace.req)
265 print >>sys.__stdout__, "REQ Channel on port", req_port
266
267 # Redirect input streams and set a display hook.
268 sys.stdout = OutStream(session, pub_socket, u'stdout')
269 sys.stderr = OutStream(session, pub_socket, u'stderr')
270 sys.displayhook = DisplayHook(session, pub_socket)
271
272 # Create the kernel.
273 kernel = Kernel(session, reply_socket, pub_socket, req_socket)
274
275 # Configure this kernel/process to die on parent termination, if necessary.
276 if namespace.parent:
277 if sys.platform == 'win32':
278 poller = ExitPollerWindows(namespace.parent)
279 else:
280 poller = ExitPollerUnix()
281 poller.start()
282
283 # Start the kernel mainloop.
284 kernel.start()
285
286
287 def launch_kernel(xrep_port=0, pub_port=0, req_port=0, independent=False):
288 """ Launches a localhost kernel, binding to the specified ports.
289
290 Parameters
291 ----------
292 xrep_port : int, optional
293 The port to use for XREP channel.
294
295 pub_port : int, optional
296 The port to use for the SUB channel.
297
298 req_port : int, optional
299 The port to use for the REQ (raw input) channel.
300
301 independent : bool, optional (default False)
302 If set, the kernel process is guaranteed to survive if this process
303 dies. If not set, an effort is made to ensure that the kernel is killed
304 when this process dies. Note that in this case it is still good practice
305 to kill kernels manually before exiting.
306
307 Returns
308 -------
309 A tuple of form:
310 (kernel_process, xrep_port, pub_port, req_port)
311 where kernel_process is a Popen object and the ports are integers.
312 """
313 import socket
314 from subprocess import Popen
315
316 # Find open ports as necessary.
317 ports = []
318 ports_needed = int(xrep_port <= 0) + int(pub_port <= 0) + int(req_port <= 0)
319 for i in xrange(ports_needed):
320 sock = socket.socket()
321 sock.bind(('', 0))
322 ports.append(sock)
323 for i, sock in enumerate(ports):
324 port = sock.getsockname()[1]
325 sock.close()
326 ports[i] = port
327 if xrep_port <= 0:
328 xrep_port = ports.pop(0)
329 if pub_port <= 0:
330 pub_port = ports.pop(0)
331 if req_port <= 0:
332 req_port = ports.pop(0)
333
334 # Spawn a kernel.
335 command = 'from IPython.zmq.pykernel import main; main()'
336 arguments = [ sys.executable, '-c', command, '--xrep', str(xrep_port),
337 '--pub', str(pub_port), '--req', str(req_port) ]
338 if independent:
339 if sys.platform == 'win32':
340 proc = Popen(['start', '/b'] + arguments, shell=True)
341 else:
342 proc = Popen(arguments, preexec_fn=lambda: os.setsid())
343 else:
344 if sys.platform == 'win32':
345 from _subprocess import DuplicateHandle, GetCurrentProcess, \
346 DUPLICATE_SAME_ACCESS
347 pid = GetCurrentProcess()
348 handle = DuplicateHandle(pid, pid, pid, 0,
349 True, # Inheritable by new processes.
350 DUPLICATE_SAME_ACCESS)
351 proc = Popen(arguments + ['--parent', str(int(handle))])
352 else:
353 proc = Popen(arguments + ['--parent'])
354
355 return proc, xrep_port, pub_port, req_port
356
357
358 if __name__ == '__main__':
359 main()
@@ -332,7 +332,7 b' def launch_kernel(xrep_port=0, pub_port=0, req_port=0, independent=False):'
332 req_port = ports.pop(0)
332 req_port = ports.pop(0)
333
333
334 # Spawn a kernel.
334 # Spawn a kernel.
335 command = 'from IPython.zmq.kernel import main; main()'
335 command = 'from IPython.zmq.ipkernel import main; main()'
336 arguments = [ sys.executable, '-c', command, '--xrep', str(xrep_port),
336 arguments = [ sys.executable, '-c', command, '--xrep', str(xrep_port),
337 '--pub', str(pub_port), '--req', str(req_port) ]
337 '--pub', str(pub_port), '--req', str(req_port) ]
338 if independent:
338 if independent:
@@ -30,7 +30,7 b' from zmq.eventloop import ioloop'
30
30
31 # Local imports.
31 # Local imports.
32 from IPython.utils.traitlets import HasTraits, Any, Instance, Type, TCPAddress
32 from IPython.utils.traitlets import HasTraits, Any, Instance, Type, TCPAddress
33 from kernel import launch_kernel
33 from .ipkernel import launch_kernel
34 from session import Session
34 from session import Session
35
35
36 #-----------------------------------------------------------------------------
36 #-----------------------------------------------------------------------------
General Comments 0
You need to be logged in to leave comments. Login now