##// END OF EJS Templates
First semi-working draft of ipython based kernel.
Brian Granger -
Show More
@@ -1,359 +1,364 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 """A simple interactive kernel that talks to a frontend over 0MQ.
2 """A simple interactive kernel that talks to a frontend over 0MQ.
3
3
4 Things to do:
4 Things to do:
5
5
6 * Implement `set_parent` logic. Right before doing exec, the Kernel should
6 * Implement `set_parent` logic. Right before doing exec, the Kernel should
7 call set_parent on all the PUB objects with the message about to be executed.
7 call set_parent on all the PUB objects with the message about to be executed.
8 * Implement random port and security key logic.
8 * Implement random port and security key logic.
9 * Implement control messages.
9 * Implement control messages.
10 * Implement event loop and poll version.
10 * Implement event loop and poll version.
11 """
11 """
12
12
13 #-----------------------------------------------------------------------------
13 #-----------------------------------------------------------------------------
14 # Imports
14 # Imports
15 #-----------------------------------------------------------------------------
15 #-----------------------------------------------------------------------------
16
16
17 # Standard library imports.
17 # Standard library imports.
18 import __builtin__
18 import __builtin__
19 from code import CommandCompiler
19 from code import CommandCompiler
20 import os
20 import os
21 import sys
21 import sys
22 import time
22 import time
23 import traceback
23 import traceback
24
24
25 # System library imports.
25 # System library imports.
26 import zmq
26 import zmq
27
27
28 # Local imports.
28 # Local imports.
29 from IPython.config.configurable import Configurable
30 from IPython.core.iplib import InteractiveShell, InteractiveShellABC
29 from IPython.external.argparse import ArgumentParser
31 from IPython.external.argparse import ArgumentParser
30 from session import Session, Message
32 from IPython.utils.traitlets import Instance
33 from IPython.zmq.session import Session, Message
31 from completer import KernelCompleter
34 from completer import KernelCompleter
32 from .iostream import OutStream
35 from iostream import OutStream
33 from .displayhook import DisplayHook
36 from displayhook import DisplayHook
34 from .exitpoller import ExitPollerUnix, ExitPollerWindows
37 from exitpoller import ExitPollerUnix, ExitPollerWindows
35
38
36 #-----------------------------------------------------------------------------
39 #-----------------------------------------------------------------------------
37 # Main kernel class
40 # Main kernel class
38 #-----------------------------------------------------------------------------
41 #-----------------------------------------------------------------------------
39
42
40 class Kernel(object):
43 class Kernel(Configurable):
41
44
42 def __init__(self, session, reply_socket, pub_socket, req_socket):
45 shell = Instance('IPython.core.iplib.InteractiveShellABC')
43 self.session = session
46 session = Instance('IPython.zmq.session.Session')
44 self.reply_socket = reply_socket
47 reply_socket = Instance('zmq.Socket')
45 self.pub_socket = pub_socket
48 pub_socket = Instance('zmq.Socket')
46 self.req_socket = req_socket
49 req_socket = Instance('zmq.Socket')
47 self.user_ns = {}
50
48 self.history = []
51 def __init__(self, **kwargs):
49 self.compiler = CommandCompiler()
52 super(Kernel, self).__init__(**kwargs)
50 self.completer = KernelCompleter(self.user_ns)
53 self.shell = InteractiveShell.instance()
51
54
52 # Build dict of handlers for message types
55 # Build dict of handlers for message types
53 msg_types = [ 'execute_request', 'complete_request',
56 msg_types = [ 'execute_request', 'complete_request',
54 'object_info_request' ]
57 'object_info_request' ]
55 self.handlers = {}
58 self.handlers = {}
56 for msg_type in msg_types:
59 for msg_type in msg_types:
57 self.handlers[msg_type] = getattr(self, msg_type)
60 self.handlers[msg_type] = getattr(self, msg_type)
58
61
59 def abort_queue(self):
62 def abort_queue(self):
60 while True:
63 while True:
61 try:
64 try:
62 ident = self.reply_socket.recv(zmq.NOBLOCK)
65 ident = self.reply_socket.recv(zmq.NOBLOCK)
63 except zmq.ZMQError, e:
66 except zmq.ZMQError, e:
64 if e.errno == zmq.EAGAIN:
67 if e.errno == zmq.EAGAIN:
65 break
68 break
66 else:
69 else:
67 assert self.reply_socket.rcvmore(), "Unexpected missing message part."
70 assert self.reply_socket.rcvmore(), "Unexpected missing message part."
68 msg = self.reply_socket.recv_json()
71 msg = self.reply_socket.recv_json()
69 print>>sys.__stdout__, "Aborting:"
72 print>>sys.__stdout__, "Aborting:"
70 print>>sys.__stdout__, Message(msg)
73 print>>sys.__stdout__, Message(msg)
71 msg_type = msg['msg_type']
74 msg_type = msg['msg_type']
72 reply_type = msg_type.split('_')[0] + '_reply'
75 reply_type = msg_type.split('_')[0] + '_reply'
73 reply_msg = self.session.msg(reply_type, {'status' : 'aborted'}, msg)
76 reply_msg = self.session.msg(reply_type, {'status' : 'aborted'}, msg)
74 print>>sys.__stdout__, Message(reply_msg)
77 print>>sys.__stdout__, Message(reply_msg)
75 self.reply_socket.send(ident,zmq.SNDMORE)
78 self.reply_socket.send(ident,zmq.SNDMORE)
76 self.reply_socket.send_json(reply_msg)
79 self.reply_socket.send_json(reply_msg)
77 # We need to wait a bit for requests to come in. This can probably
80 # We need to wait a bit for requests to come in. This can probably
78 # be set shorter for true asynchronous clients.
81 # be set shorter for true asynchronous clients.
79 time.sleep(0.1)
82 time.sleep(0.1)
80
83
81 def execute_request(self, ident, parent):
84 def execute_request(self, ident, parent):
82 try:
85 try:
83 code = parent[u'content'][u'code']
86 code = parent[u'content'][u'code']
84 except:
87 except:
85 print>>sys.__stderr__, "Got bad msg: "
88 print>>sys.__stderr__, "Got bad msg: "
86 print>>sys.__stderr__, Message(parent)
89 print>>sys.__stderr__, Message(parent)
87 return
90 return
88 pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent)
91 pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent)
89 self.pub_socket.send_json(pyin_msg)
92 self.pub_socket.send_json(pyin_msg)
90
93
91 try:
94 try:
92 comp_code = self.compiler(code, '<zmq-kernel>')
93
94 # Replace raw_input. Note that is not sufficient to replace
95 # Replace raw_input. Note that is not sufficient to replace
95 # raw_input in the user namespace.
96 # raw_input in the user namespace.
96 raw_input = lambda prompt='': self.raw_input(prompt, ident, parent)
97 raw_input = lambda prompt='': self.raw_input(prompt, ident, parent)
97 __builtin__.raw_input = raw_input
98 __builtin__.raw_input = raw_input
98
99
99 # Configure the display hook.
100 # Configure the display hook.
100 sys.displayhook.set_parent(parent)
101 sys.displayhook.set_parent(parent)
101
102
102 exec comp_code in self.user_ns, self.user_ns
103 self.shell.runlines(code)
104 # exec comp_code in self.user_ns, self.user_ns
103 except:
105 except:
104 etype, evalue, tb = sys.exc_info()
106 etype, evalue, tb = sys.exc_info()
105 tb = traceback.format_exception(etype, evalue, tb)
107 tb = traceback.format_exception(etype, evalue, tb)
106 exc_content = {
108 exc_content = {
107 u'status' : u'error',
109 u'status' : u'error',
108 u'traceback' : tb,
110 u'traceback' : tb,
109 u'ename' : unicode(etype.__name__),
111 u'ename' : unicode(etype.__name__),
110 u'evalue' : unicode(evalue)
112 u'evalue' : unicode(evalue)
111 }
113 }
112 exc_msg = self.session.msg(u'pyerr', exc_content, parent)
114 exc_msg = self.session.msg(u'pyerr', exc_content, parent)
113 self.pub_socket.send_json(exc_msg)
115 self.pub_socket.send_json(exc_msg)
114 reply_content = exc_content
116 reply_content = exc_content
115 else:
117 else:
116 reply_content = {'status' : 'ok'}
118 reply_content = {'status' : 'ok'}
117
119
118 # Flush output before sending the reply.
120 # Flush output before sending the reply.
119 sys.stderr.flush()
121 sys.stderr.flush()
120 sys.stdout.flush()
122 sys.stdout.flush()
121
123
122 # Send the reply.
124 # Send the reply.
123 reply_msg = self.session.msg(u'execute_reply', reply_content, parent)
125 reply_msg = self.session.msg(u'execute_reply', reply_content, parent)
124 print>>sys.__stdout__, Message(reply_msg)
126 print>>sys.__stdout__, Message(reply_msg)
125 self.reply_socket.send(ident, zmq.SNDMORE)
127 self.reply_socket.send(ident, zmq.SNDMORE)
126 self.reply_socket.send_json(reply_msg)
128 self.reply_socket.send_json(reply_msg)
127 if reply_msg['content']['status'] == u'error':
129 if reply_msg['content']['status'] == u'error':
128 self.abort_queue()
130 self.abort_queue()
129
131
130 def raw_input(self, prompt, ident, parent):
132 def raw_input(self, prompt, ident, parent):
131 # Flush output before making the request.
133 # Flush output before making the request.
132 sys.stderr.flush()
134 sys.stderr.flush()
133 sys.stdout.flush()
135 sys.stdout.flush()
134
136
135 # Send the input request.
137 # Send the input request.
136 content = dict(prompt=prompt)
138 content = dict(prompt=prompt)
137 msg = self.session.msg(u'input_request', content, parent)
139 msg = self.session.msg(u'input_request', content, parent)
138 self.req_socket.send_json(msg)
140 self.req_socket.send_json(msg)
139
141
140 # Await a response.
142 # Await a response.
141 reply = self.req_socket.recv_json()
143 reply = self.req_socket.recv_json()
142 try:
144 try:
143 value = reply['content']['value']
145 value = reply['content']['value']
144 except:
146 except:
145 print>>sys.__stderr__, "Got bad raw_input reply: "
147 print>>sys.__stderr__, "Got bad raw_input reply: "
146 print>>sys.__stderr__, Message(parent)
148 print>>sys.__stderr__, Message(parent)
147 value = ''
149 value = ''
148 return value
150 return value
149
151
150 def complete_request(self, ident, parent):
152 def complete_request(self, ident, parent):
151 matches = {'matches' : self.complete(parent),
153 matches = {'matches' : self.complete(parent),
152 'status' : 'ok'}
154 'status' : 'ok'}
153 completion_msg = self.session.send(self.reply_socket, 'complete_reply',
155 completion_msg = self.session.send(self.reply_socket, 'complete_reply',
154 matches, parent, ident)
156 matches, parent, ident)
155 print >> sys.__stdout__, completion_msg
157 print >> sys.__stdout__, completion_msg
156
158
157 def complete(self, msg):
159 def complete(self, msg):
158 return self.completer.complete(msg.content.line, msg.content.text)
160 return self.shell.complete(msg.content.line)
159
161
160 def object_info_request(self, ident, parent):
162 def object_info_request(self, ident, parent):
161 context = parent['content']['oname'].split('.')
163 context = parent['content']['oname'].split('.')
162 object_info = self.object_info(context)
164 object_info = self.object_info(context)
163 msg = self.session.send(self.reply_socket, 'object_info_reply',
165 msg = self.session.send(self.reply_socket, 'object_info_reply',
164 object_info, parent, ident)
166 object_info, parent, ident)
165 print >> sys.__stdout__, msg
167 print >> sys.__stdout__, msg
166
168
167 def object_info(self, context):
169 def object_info(self, context):
168 symbol, leftover = self.symbol_from_context(context)
170 symbol, leftover = self.symbol_from_context(context)
169 if symbol is not None and not leftover:
171 if symbol is not None and not leftover:
170 doc = getattr(symbol, '__doc__', '')
172 doc = getattr(symbol, '__doc__', '')
171 else:
173 else:
172 doc = ''
174 doc = ''
173 object_info = dict(docstring = doc)
175 object_info = dict(docstring = doc)
174 return object_info
176 return object_info
175
177
176 def symbol_from_context(self, context):
178 def symbol_from_context(self, context):
177 if not context:
179 if not context:
178 return None, context
180 return None, context
179
181
180 base_symbol_string = context[0]
182 base_symbol_string = context[0]
181 symbol = self.user_ns.get(base_symbol_string, None)
183 symbol = self.shell.user_ns.get(base_symbol_string, None)
182 if symbol is None:
184 if symbol is None:
183 symbol = __builtin__.__dict__.get(base_symbol_string, None)
185 symbol = __builtin__.__dict__.get(base_symbol_string, None)
184 if symbol is None:
186 if symbol is None:
185 return None, context
187 return None, context
186
188
187 context = context[1:]
189 context = context[1:]
188 for i, name in enumerate(context):
190 for i, name in enumerate(context):
189 new_symbol = getattr(symbol, name, None)
191 new_symbol = getattr(symbol, name, None)
190 if new_symbol is None:
192 if new_symbol is None:
191 return symbol, context[i:]
193 return symbol, context[i:]
192 else:
194 else:
193 symbol = new_symbol
195 symbol = new_symbol
194
196
195 return symbol, []
197 return symbol, []
196
198
197 def start(self):
199 def start(self):
198 while True:
200 while True:
199 ident = self.reply_socket.recv()
201 ident = self.reply_socket.recv()
200 assert self.reply_socket.rcvmore(), "Missing message part."
202 assert self.reply_socket.rcvmore(), "Missing message part."
201 msg = self.reply_socket.recv_json()
203 msg = self.reply_socket.recv_json()
202 omsg = Message(msg)
204 omsg = Message(msg)
203 print>>sys.__stdout__
205 print>>sys.__stdout__
204 print>>sys.__stdout__, omsg
206 print>>sys.__stdout__, omsg
205 handler = self.handlers.get(omsg.msg_type, None)
207 handler = self.handlers.get(omsg.msg_type, None)
206 if handler is None:
208 if handler is None:
207 print >> sys.__stderr__, "UNKNOWN MESSAGE TYPE:", omsg
209 print >> sys.__stderr__, "UNKNOWN MESSAGE TYPE:", omsg
208 else:
210 else:
209 handler(ident, omsg)
211 handler(ident, omsg)
210
212
211 #-----------------------------------------------------------------------------
213 #-----------------------------------------------------------------------------
212 # Kernel main and launch functions
214 # Kernel main and launch functions
213 #-----------------------------------------------------------------------------
215 #-----------------------------------------------------------------------------
214
216
215 def bind_port(socket, ip, port):
217 def bind_port(socket, ip, port):
216 """ Binds the specified ZMQ socket. If the port is less than zero, a random
218 """ Binds the specified ZMQ socket. If the port is less than zero, a random
217 port is chosen. Returns the port that was bound.
219 port is chosen. Returns the port that was bound.
218 """
220 """
219 connection = 'tcp://%s' % ip
221 connection = 'tcp://%s' % ip
220 if port <= 0:
222 if port <= 0:
221 port = socket.bind_to_random_port(connection)
223 port = socket.bind_to_random_port(connection)
222 else:
224 else:
223 connection += ':%i' % port
225 connection += ':%i' % port
224 socket.bind(connection)
226 socket.bind(connection)
225 return port
227 return port
226
228
227
229
228 def main():
230 def main():
229 """ Main entry point for launching a kernel.
231 """ Main entry point for launching a kernel.
230 """
232 """
231 # Parse command line arguments.
233 # Parse command line arguments.
232 parser = ArgumentParser()
234 parser = ArgumentParser()
233 parser.add_argument('--ip', type=str, default='127.0.0.1',
235 parser.add_argument('--ip', type=str, default='127.0.0.1',
234 help='set the kernel\'s IP address [default: local]')
236 help='set the kernel\'s IP address [default: local]')
235 parser.add_argument('--xrep', type=int, metavar='PORT', default=0,
237 parser.add_argument('--xrep', type=int, metavar='PORT', default=0,
236 help='set the XREP channel port [default: random]')
238 help='set the XREP channel port [default: random]')
237 parser.add_argument('--pub', type=int, metavar='PORT', default=0,
239 parser.add_argument('--pub', type=int, metavar='PORT', default=0,
238 help='set the PUB channel port [default: random]')
240 help='set the PUB channel port [default: random]')
239 parser.add_argument('--req', type=int, metavar='PORT', default=0,
241 parser.add_argument('--req', type=int, metavar='PORT', default=0,
240 help='set the REQ channel port [default: random]')
242 help='set the REQ channel port [default: random]')
241 if sys.platform == 'win32':
243 if sys.platform == 'win32':
242 parser.add_argument('--parent', type=int, metavar='HANDLE',
244 parser.add_argument('--parent', type=int, metavar='HANDLE',
243 default=0, help='kill this process if the process '
245 default=0, help='kill this process if the process '
244 'with HANDLE dies')
246 'with HANDLE dies')
245 else:
247 else:
246 parser.add_argument('--parent', action='store_true',
248 parser.add_argument('--parent', action='store_true',
247 help='kill this process if its parent dies')
249 help='kill this process if its parent dies')
248 namespace = parser.parse_args()
250 namespace = parser.parse_args()
249
251
250 # Create a context, a session, and the kernel sockets.
252 # Create a context, a session, and the kernel sockets.
251 print >>sys.__stdout__, "Starting the kernel..."
253 print >>sys.__stdout__, "Starting the kernel..."
252 context = zmq.Context()
254 context = zmq.Context()
253 session = Session(username=u'kernel')
255 session = Session(username=u'kernel')
254
256
255 reply_socket = context.socket(zmq.XREP)
257 reply_socket = context.socket(zmq.XREP)
256 xrep_port = bind_port(reply_socket, namespace.ip, namespace.xrep)
258 xrep_port = bind_port(reply_socket, namespace.ip, namespace.xrep)
257 print >>sys.__stdout__, "XREP Channel on port", xrep_port
259 print >>sys.__stdout__, "XREP Channel on port", xrep_port
258
260
259 pub_socket = context.socket(zmq.PUB)
261 pub_socket = context.socket(zmq.PUB)
260 pub_port = bind_port(pub_socket, namespace.ip, namespace.pub)
262 pub_port = bind_port(pub_socket, namespace.ip, namespace.pub)
261 print >>sys.__stdout__, "PUB Channel on port", pub_port
263 print >>sys.__stdout__, "PUB Channel on port", pub_port
262
264
263 req_socket = context.socket(zmq.XREQ)
265 req_socket = context.socket(zmq.XREQ)
264 req_port = bind_port(req_socket, namespace.ip, namespace.req)
266 req_port = bind_port(req_socket, namespace.ip, namespace.req)
265 print >>sys.__stdout__, "REQ Channel on port", req_port
267 print >>sys.__stdout__, "REQ Channel on port", req_port
266
268
267 # Redirect input streams and set a display hook.
269 # Redirect input streams and set a display hook.
268 sys.stdout = OutStream(session, pub_socket, u'stdout')
270 sys.stdout = OutStream(session, pub_socket, u'stdout')
269 sys.stderr = OutStream(session, pub_socket, u'stderr')
271 sys.stderr = OutStream(session, pub_socket, u'stderr')
270 sys.displayhook = DisplayHook(session, pub_socket)
272 sys.displayhook = DisplayHook(session, pub_socket)
271
273
272 # Create the kernel.
274 # Create the kernel.
273 kernel = Kernel(session, reply_socket, pub_socket, req_socket)
275 kernel = Kernel(
276 session=session, reply_socket=reply_socket,
277 pub_socket=pub_socket, req_socket=req_socket
278 )
274
279
275 # Configure this kernel/process to die on parent termination, if necessary.
280 # Configure this kernel/process to die on parent termination, if necessary.
276 if namespace.parent:
281 if namespace.parent:
277 if sys.platform == 'win32':
282 if sys.platform == 'win32':
278 poller = ExitPollerWindows(namespace.parent)
283 poller = ExitPollerWindows(namespace.parent)
279 else:
284 else:
280 poller = ExitPollerUnix()
285 poller = ExitPollerUnix()
281 poller.start()
286 poller.start()
282
287
283 # Start the kernel mainloop.
288 # Start the kernel mainloop.
284 kernel.start()
289 kernel.start()
285
290
286
291
287 def launch_kernel(xrep_port=0, pub_port=0, req_port=0, independent=False):
292 def launch_kernel(xrep_port=0, pub_port=0, req_port=0, independent=False):
288 """ Launches a localhost kernel, binding to the specified ports.
293 """ Launches a localhost kernel, binding to the specified ports.
289
294
290 Parameters
295 Parameters
291 ----------
296 ----------
292 xrep_port : int, optional
297 xrep_port : int, optional
293 The port to use for XREP channel.
298 The port to use for XREP channel.
294
299
295 pub_port : int, optional
300 pub_port : int, optional
296 The port to use for the SUB channel.
301 The port to use for the SUB channel.
297
302
298 req_port : int, optional
303 req_port : int, optional
299 The port to use for the REQ (raw input) channel.
304 The port to use for the REQ (raw input) channel.
300
305
301 independent : bool, optional (default False)
306 independent : bool, optional (default False)
302 If set, the kernel process is guaranteed to survive if this process
307 If set, the kernel process is guaranteed to survive if this process
303 dies. If not set, an effort is made to ensure that the kernel is killed
308 dies. If not set, an effort is made to ensure that the kernel is killed
304 when this process dies. Note that in this case it is still good practice
309 when this process dies. Note that in this case it is still good practice
305 to kill kernels manually before exiting.
310 to kill kernels manually before exiting.
306
311
307 Returns
312 Returns
308 -------
313 -------
309 A tuple of form:
314 A tuple of form:
310 (kernel_process, xrep_port, pub_port, req_port)
315 (kernel_process, xrep_port, pub_port, req_port)
311 where kernel_process is a Popen object and the ports are integers.
316 where kernel_process is a Popen object and the ports are integers.
312 """
317 """
313 import socket
318 import socket
314 from subprocess import Popen
319 from subprocess import Popen
315
320
316 # Find open ports as necessary.
321 # Find open ports as necessary.
317 ports = []
322 ports = []
318 ports_needed = int(xrep_port <= 0) + int(pub_port <= 0) + int(req_port <= 0)
323 ports_needed = int(xrep_port <= 0) + int(pub_port <= 0) + int(req_port <= 0)
319 for i in xrange(ports_needed):
324 for i in xrange(ports_needed):
320 sock = socket.socket()
325 sock = socket.socket()
321 sock.bind(('', 0))
326 sock.bind(('', 0))
322 ports.append(sock)
327 ports.append(sock)
323 for i, sock in enumerate(ports):
328 for i, sock in enumerate(ports):
324 port = sock.getsockname()[1]
329 port = sock.getsockname()[1]
325 sock.close()
330 sock.close()
326 ports[i] = port
331 ports[i] = port
327 if xrep_port <= 0:
332 if xrep_port <= 0:
328 xrep_port = ports.pop(0)
333 xrep_port = ports.pop(0)
329 if pub_port <= 0:
334 if pub_port <= 0:
330 pub_port = ports.pop(0)
335 pub_port = ports.pop(0)
331 if req_port <= 0:
336 if req_port <= 0:
332 req_port = ports.pop(0)
337 req_port = ports.pop(0)
333
338
334 # Spawn a kernel.
339 # Spawn a kernel.
335 command = 'from IPython.zmq.ipkernel import main; main()'
340 command = 'from IPython.zmq.ipkernel import main; main()'
336 arguments = [ sys.executable, '-c', command, '--xrep', str(xrep_port),
341 arguments = [ sys.executable, '-c', command, '--xrep', str(xrep_port),
337 '--pub', str(pub_port), '--req', str(req_port) ]
342 '--pub', str(pub_port), '--req', str(req_port) ]
338 if independent:
343 if independent:
339 if sys.platform == 'win32':
344 if sys.platform == 'win32':
340 proc = Popen(['start', '/b'] + arguments, shell=True)
345 proc = Popen(['start', '/b'] + arguments, shell=True)
341 else:
346 else:
342 proc = Popen(arguments, preexec_fn=lambda: os.setsid())
347 proc = Popen(arguments, preexec_fn=lambda: os.setsid())
343 else:
348 else:
344 if sys.platform == 'win32':
349 if sys.platform == 'win32':
345 from _subprocess import DuplicateHandle, GetCurrentProcess, \
350 from _subprocess import DuplicateHandle, GetCurrentProcess, \
346 DUPLICATE_SAME_ACCESS
351 DUPLICATE_SAME_ACCESS
347 pid = GetCurrentProcess()
352 pid = GetCurrentProcess()
348 handle = DuplicateHandle(pid, pid, pid, 0,
353 handle = DuplicateHandle(pid, pid, pid, 0,
349 True, # Inheritable by new processes.
354 True, # Inheritable by new processes.
350 DUPLICATE_SAME_ACCESS)
355 DUPLICATE_SAME_ACCESS)
351 proc = Popen(arguments + ['--parent', str(int(handle))])
356 proc = Popen(arguments + ['--parent', str(int(handle))])
352 else:
357 else:
353 proc = Popen(arguments + ['--parent'])
358 proc = Popen(arguments + ['--parent'])
354
359
355 return proc, xrep_port, pub_port, req_port
360 return proc, xrep_port, pub_port, req_port
356
361
357
362
358 if __name__ == '__main__':
363 if __name__ == '__main__':
359 main()
364 main()
@@ -1,571 +1,571 b''
1 """Base classes to manage the interaction with a running kernel.
1 """Base classes to manage the interaction with a running kernel.
2
2
3 Todo
3 Todo
4 ====
4 ====
5
5
6 * Create logger to handle debugging and console messages.
6 * Create logger to handle debugging and console messages.
7 """
7 """
8
8
9 #-----------------------------------------------------------------------------
9 #-----------------------------------------------------------------------------
10 # Copyright (C) 2008-2010 The IPython Development Team
10 # Copyright (C) 2008-2010 The IPython Development Team
11 #
11 #
12 # Distributed under the terms of the BSD License. The full license is in
12 # Distributed under the terms of the BSD License. The full license is in
13 # the file COPYING, distributed as part of this software.
13 # the file COPYING, distributed as part of this software.
14 #-----------------------------------------------------------------------------
14 #-----------------------------------------------------------------------------
15
15
16 #-----------------------------------------------------------------------------
16 #-----------------------------------------------------------------------------
17 # Imports
17 # Imports
18 #-----------------------------------------------------------------------------
18 #-----------------------------------------------------------------------------
19
19
20 # Standard library imports.
20 # Standard library imports.
21 from Queue import Queue, Empty
21 from Queue import Queue, Empty
22 from subprocess import Popen
22 from subprocess import Popen
23 from threading import Thread
23 from threading import Thread
24 import time
24 import time
25
25
26 # System library imports.
26 # System library imports.
27 import zmq
27 import zmq
28 from zmq import POLLIN, POLLOUT, POLLERR
28 from zmq import POLLIN, POLLOUT, POLLERR
29 from zmq.eventloop import ioloop
29 from zmq.eventloop import ioloop
30
30
31 # Local imports.
31 # Local imports.
32 from IPython.utils.traitlets import HasTraits, Any, Instance, Type, TCPAddress
32 from IPython.utils.traitlets import HasTraits, Any, Instance, Type, TCPAddress
33 from .ipkernel import launch_kernel
33 from ipkernel import launch_kernel
34 from session import Session
34 from session import Session
35
35
36 #-----------------------------------------------------------------------------
36 #-----------------------------------------------------------------------------
37 # Constants and exceptions
37 # Constants and exceptions
38 #-----------------------------------------------------------------------------
38 #-----------------------------------------------------------------------------
39
39
40 LOCALHOST = '127.0.0.1'
40 LOCALHOST = '127.0.0.1'
41
41
42 class InvalidPortNumber(Exception):
42 class InvalidPortNumber(Exception):
43 pass
43 pass
44
44
45 #-----------------------------------------------------------------------------
45 #-----------------------------------------------------------------------------
46 # ZMQ Socket Channel classes
46 # ZMQ Socket Channel classes
47 #-----------------------------------------------------------------------------
47 #-----------------------------------------------------------------------------
48
48
49 class ZmqSocketChannel(Thread):
49 class ZmqSocketChannel(Thread):
50 """The base class for the channels that use ZMQ sockets.
50 """The base class for the channels that use ZMQ sockets.
51 """
51 """
52 context = None
52 context = None
53 session = None
53 session = None
54 socket = None
54 socket = None
55 ioloop = None
55 ioloop = None
56 iostate = None
56 iostate = None
57 _address = None
57 _address = None
58
58
59 def __init__(self, context, session, address):
59 def __init__(self, context, session, address):
60 """Create a channel
60 """Create a channel
61
61
62 Parameters
62 Parameters
63 ----------
63 ----------
64 context : :class:`zmq.Context`
64 context : :class:`zmq.Context`
65 The ZMQ context to use.
65 The ZMQ context to use.
66 session : :class:`session.Session`
66 session : :class:`session.Session`
67 The session to use.
67 The session to use.
68 address : tuple
68 address : tuple
69 Standard (ip, port) tuple that the kernel is listening on.
69 Standard (ip, port) tuple that the kernel is listening on.
70 """
70 """
71 super(ZmqSocketChannel, self).__init__()
71 super(ZmqSocketChannel, self).__init__()
72 self.daemon = True
72 self.daemon = True
73
73
74 self.context = context
74 self.context = context
75 self.session = session
75 self.session = session
76 if address[1] == 0:
76 if address[1] == 0:
77 message = 'The port number for a channel cannot be 0.'
77 message = 'The port number for a channel cannot be 0.'
78 raise InvalidPortNumber(message)
78 raise InvalidPortNumber(message)
79 self._address = address
79 self._address = address
80
80
81 def stop(self):
81 def stop(self):
82 """Stop the channel's activity.
82 """Stop the channel's activity.
83
83
84 This calls :method:`Thread.join` and returns when the thread
84 This calls :method:`Thread.join` and returns when the thread
85 terminates. :class:`RuntimeError` will be raised if
85 terminates. :class:`RuntimeError` will be raised if
86 :method:`self.start` is called again.
86 :method:`self.start` is called again.
87 """
87 """
88 self.join()
88 self.join()
89
89
90 @property
90 @property
91 def address(self):
91 def address(self):
92 """Get the channel's address as an (ip, port) tuple.
92 """Get the channel's address as an (ip, port) tuple.
93
93
94 By the default, the address is (localhost, 0), where 0 means a random
94 By the default, the address is (localhost, 0), where 0 means a random
95 port.
95 port.
96 """
96 """
97 return self._address
97 return self._address
98
98
99 def add_io_state(self, state):
99 def add_io_state(self, state):
100 """Add IO state to the eventloop.
100 """Add IO state to the eventloop.
101
101
102 Parameters
102 Parameters
103 ----------
103 ----------
104 state : zmq.POLLIN|zmq.POLLOUT|zmq.POLLERR
104 state : zmq.POLLIN|zmq.POLLOUT|zmq.POLLERR
105 The IO state flag to set.
105 The IO state flag to set.
106
106
107 This is thread safe as it uses the thread safe IOLoop.add_callback.
107 This is thread safe as it uses the thread safe IOLoop.add_callback.
108 """
108 """
109 def add_io_state_callback():
109 def add_io_state_callback():
110 if not self.iostate & state:
110 if not self.iostate & state:
111 self.iostate = self.iostate | state
111 self.iostate = self.iostate | state
112 self.ioloop.update_handler(self.socket, self.iostate)
112 self.ioloop.update_handler(self.socket, self.iostate)
113 self.ioloop.add_callback(add_io_state_callback)
113 self.ioloop.add_callback(add_io_state_callback)
114
114
115 def drop_io_state(self, state):
115 def drop_io_state(self, state):
116 """Drop IO state from the eventloop.
116 """Drop IO state from the eventloop.
117
117
118 Parameters
118 Parameters
119 ----------
119 ----------
120 state : zmq.POLLIN|zmq.POLLOUT|zmq.POLLERR
120 state : zmq.POLLIN|zmq.POLLOUT|zmq.POLLERR
121 The IO state flag to set.
121 The IO state flag to set.
122
122
123 This is thread safe as it uses the thread safe IOLoop.add_callback.
123 This is thread safe as it uses the thread safe IOLoop.add_callback.
124 """
124 """
125 def drop_io_state_callback():
125 def drop_io_state_callback():
126 if self.iostate & state:
126 if self.iostate & state:
127 self.iostate = self.iostate & (~state)
127 self.iostate = self.iostate & (~state)
128 self.ioloop.update_handler(self.socket, self.iostate)
128 self.ioloop.update_handler(self.socket, self.iostate)
129 self.ioloop.add_callback(drop_io_state_callback)
129 self.ioloop.add_callback(drop_io_state_callback)
130
130
131
131
132 class XReqSocketChannel(ZmqSocketChannel):
132 class XReqSocketChannel(ZmqSocketChannel):
133 """The XREQ channel for issues request/replies to the kernel.
133 """The XREQ channel for issues request/replies to the kernel.
134 """
134 """
135
135
136 command_queue = None
136 command_queue = None
137
137
138 def __init__(self, context, session, address):
138 def __init__(self, context, session, address):
139 self.command_queue = Queue()
139 self.command_queue = Queue()
140 super(XReqSocketChannel, self).__init__(context, session, address)
140 super(XReqSocketChannel, self).__init__(context, session, address)
141
141
142 def run(self):
142 def run(self):
143 """The thread's main activity. Call start() instead."""
143 """The thread's main activity. Call start() instead."""
144 self.socket = self.context.socket(zmq.XREQ)
144 self.socket = self.context.socket(zmq.XREQ)
145 self.socket.setsockopt(zmq.IDENTITY, self.session.session)
145 self.socket.setsockopt(zmq.IDENTITY, self.session.session)
146 self.socket.connect('tcp://%s:%i' % self.address)
146 self.socket.connect('tcp://%s:%i' % self.address)
147 self.ioloop = ioloop.IOLoop()
147 self.ioloop = ioloop.IOLoop()
148 self.iostate = POLLERR|POLLIN
148 self.iostate = POLLERR|POLLIN
149 self.ioloop.add_handler(self.socket, self._handle_events,
149 self.ioloop.add_handler(self.socket, self._handle_events,
150 self.iostate)
150 self.iostate)
151 self.ioloop.start()
151 self.ioloop.start()
152
152
153 def stop(self):
153 def stop(self):
154 self.ioloop.stop()
154 self.ioloop.stop()
155 super(XReqSocketChannel, self).stop()
155 super(XReqSocketChannel, self).stop()
156
156
157 def call_handlers(self, msg):
157 def call_handlers(self, msg):
158 """This method is called in the ioloop thread when a message arrives.
158 """This method is called in the ioloop thread when a message arrives.
159
159
160 Subclasses should override this method to handle incoming messages.
160 Subclasses should override this method to handle incoming messages.
161 It is important to remember that this method is called in the thread
161 It is important to remember that this method is called in the thread
162 so that some logic must be done to ensure that the application leve
162 so that some logic must be done to ensure that the application leve
163 handlers are called in the application thread.
163 handlers are called in the application thread.
164 """
164 """
165 raise NotImplementedError('call_handlers must be defined in a subclass.')
165 raise NotImplementedError('call_handlers must be defined in a subclass.')
166
166
167 def execute(self, code):
167 def execute(self, code):
168 """Execute code in the kernel.
168 """Execute code in the kernel.
169
169
170 Parameters
170 Parameters
171 ----------
171 ----------
172 code : str
172 code : str
173 A string of Python code.
173 A string of Python code.
174
174
175 Returns
175 Returns
176 -------
176 -------
177 The msg_id of the message sent.
177 The msg_id of the message sent.
178 """
178 """
179 # Create class for content/msg creation. Related to, but possibly
179 # Create class for content/msg creation. Related to, but possibly
180 # not in Session.
180 # not in Session.
181 content = dict(code=code)
181 content = dict(code=code)
182 msg = self.session.msg('execute_request', content)
182 msg = self.session.msg('execute_request', content)
183 self._queue_request(msg)
183 self._queue_request(msg)
184 return msg['header']['msg_id']
184 return msg['header']['msg_id']
185
185
186 def complete(self, text, line, block=None):
186 def complete(self, text, line, block=None):
187 """Tab complete text, line, block in the kernel's namespace.
187 """Tab complete text, line, block in the kernel's namespace.
188
188
189 Parameters
189 Parameters
190 ----------
190 ----------
191 text : str
191 text : str
192 The text to complete.
192 The text to complete.
193 line : str
193 line : str
194 The full line of text that is the surrounding context for the
194 The full line of text that is the surrounding context for the
195 text to complete.
195 text to complete.
196 block : str
196 block : str
197 The full block of code in which the completion is being requested.
197 The full block of code in which the completion is being requested.
198
198
199 Returns
199 Returns
200 -------
200 -------
201 The msg_id of the message sent.
201 The msg_id of the message sent.
202 """
202 """
203 content = dict(text=text, line=line)
203 content = dict(text=text, line=line)
204 msg = self.session.msg('complete_request', content)
204 msg = self.session.msg('complete_request', content)
205 self._queue_request(msg)
205 self._queue_request(msg)
206 return msg['header']['msg_id']
206 return msg['header']['msg_id']
207
207
208 def object_info(self, oname):
208 def object_info(self, oname):
209 """Get metadata information about an object.
209 """Get metadata information about an object.
210
210
211 Parameters
211 Parameters
212 ----------
212 ----------
213 oname : str
213 oname : str
214 A string specifying the object name.
214 A string specifying the object name.
215
215
216 Returns
216 Returns
217 -------
217 -------
218 The msg_id of the message sent.
218 The msg_id of the message sent.
219 """
219 """
220 content = dict(oname=oname)
220 content = dict(oname=oname)
221 msg = self.session.msg('object_info_request', content)
221 msg = self.session.msg('object_info_request', content)
222 self._queue_request(msg)
222 self._queue_request(msg)
223 return msg['header']['msg_id']
223 return msg['header']['msg_id']
224
224
225 def _handle_events(self, socket, events):
225 def _handle_events(self, socket, events):
226 if events & POLLERR:
226 if events & POLLERR:
227 self._handle_err()
227 self._handle_err()
228 if events & POLLOUT:
228 if events & POLLOUT:
229 self._handle_send()
229 self._handle_send()
230 if events & POLLIN:
230 if events & POLLIN:
231 self._handle_recv()
231 self._handle_recv()
232
232
233 def _handle_recv(self):
233 def _handle_recv(self):
234 msg = self.socket.recv_json()
234 msg = self.socket.recv_json()
235 self.call_handlers(msg)
235 self.call_handlers(msg)
236
236
237 def _handle_send(self):
237 def _handle_send(self):
238 try:
238 try:
239 msg = self.command_queue.get(False)
239 msg = self.command_queue.get(False)
240 except Empty:
240 except Empty:
241 pass
241 pass
242 else:
242 else:
243 self.socket.send_json(msg)
243 self.socket.send_json(msg)
244 if self.command_queue.empty():
244 if self.command_queue.empty():
245 self.drop_io_state(POLLOUT)
245 self.drop_io_state(POLLOUT)
246
246
247 def _handle_err(self):
247 def _handle_err(self):
248 # We don't want to let this go silently, so eventually we should log.
248 # We don't want to let this go silently, so eventually we should log.
249 raise zmq.ZMQError()
249 raise zmq.ZMQError()
250
250
251 def _queue_request(self, msg):
251 def _queue_request(self, msg):
252 self.command_queue.put(msg)
252 self.command_queue.put(msg)
253 self.add_io_state(POLLOUT)
253 self.add_io_state(POLLOUT)
254
254
255
255
256 class SubSocketChannel(ZmqSocketChannel):
256 class SubSocketChannel(ZmqSocketChannel):
257 """The SUB channel which listens for messages that the kernel publishes.
257 """The SUB channel which listens for messages that the kernel publishes.
258 """
258 """
259
259
260 def __init__(self, context, session, address):
260 def __init__(self, context, session, address):
261 super(SubSocketChannel, self).__init__(context, session, address)
261 super(SubSocketChannel, self).__init__(context, session, address)
262
262
263 def run(self):
263 def run(self):
264 """The thread's main activity. Call start() instead."""
264 """The thread's main activity. Call start() instead."""
265 self.socket = self.context.socket(zmq.SUB)
265 self.socket = self.context.socket(zmq.SUB)
266 self.socket.setsockopt(zmq.SUBSCRIBE,'')
266 self.socket.setsockopt(zmq.SUBSCRIBE,'')
267 self.socket.setsockopt(zmq.IDENTITY, self.session.session)
267 self.socket.setsockopt(zmq.IDENTITY, self.session.session)
268 self.socket.connect('tcp://%s:%i' % self.address)
268 self.socket.connect('tcp://%s:%i' % self.address)
269 self.ioloop = ioloop.IOLoop()
269 self.ioloop = ioloop.IOLoop()
270 self.iostate = POLLIN|POLLERR
270 self.iostate = POLLIN|POLLERR
271 self.ioloop.add_handler(self.socket, self._handle_events,
271 self.ioloop.add_handler(self.socket, self._handle_events,
272 self.iostate)
272 self.iostate)
273 self.ioloop.start()
273 self.ioloop.start()
274
274
275 def stop(self):
275 def stop(self):
276 self.ioloop.stop()
276 self.ioloop.stop()
277 super(SubSocketChannel, self).stop()
277 super(SubSocketChannel, self).stop()
278
278
279 def call_handlers(self, msg):
279 def call_handlers(self, msg):
280 """This method is called in the ioloop thread when a message arrives.
280 """This method is called in the ioloop thread when a message arrives.
281
281
282 Subclasses should override this method to handle incoming messages.
282 Subclasses should override this method to handle incoming messages.
283 It is important to remember that this method is called in the thread
283 It is important to remember that this method is called in the thread
284 so that some logic must be done to ensure that the application leve
284 so that some logic must be done to ensure that the application leve
285 handlers are called in the application thread.
285 handlers are called in the application thread.
286 """
286 """
287 raise NotImplementedError('call_handlers must be defined in a subclass.')
287 raise NotImplementedError('call_handlers must be defined in a subclass.')
288
288
289 def flush(self, timeout=1.0):
289 def flush(self, timeout=1.0):
290 """Immediately processes all pending messages on the SUB channel.
290 """Immediately processes all pending messages on the SUB channel.
291
291
292 Callers should use this method to ensure that :method:`call_handlers`
292 Callers should use this method to ensure that :method:`call_handlers`
293 has been called for all messages that have been received on the
293 has been called for all messages that have been received on the
294 0MQ SUB socket of this channel.
294 0MQ SUB socket of this channel.
295
295
296 This method is thread safe.
296 This method is thread safe.
297
297
298 Parameters
298 Parameters
299 ----------
299 ----------
300 timeout : float, optional
300 timeout : float, optional
301 The maximum amount of time to spend flushing, in seconds. The
301 The maximum amount of time to spend flushing, in seconds. The
302 default is one second.
302 default is one second.
303 """
303 """
304 # We do the IOLoop callback process twice to ensure that the IOLoop
304 # We do the IOLoop callback process twice to ensure that the IOLoop
305 # gets to perform at least one full poll.
305 # gets to perform at least one full poll.
306 stop_time = time.time() + timeout
306 stop_time = time.time() + timeout
307 for i in xrange(2):
307 for i in xrange(2):
308 self._flushed = False
308 self._flushed = False
309 self.ioloop.add_callback(self._flush)
309 self.ioloop.add_callback(self._flush)
310 while not self._flushed and time.time() < stop_time:
310 while not self._flushed and time.time() < stop_time:
311 time.sleep(0.01)
311 time.sleep(0.01)
312
312
313 def _handle_events(self, socket, events):
313 def _handle_events(self, socket, events):
314 # Turn on and off POLLOUT depending on if we have made a request
314 # Turn on and off POLLOUT depending on if we have made a request
315 if events & POLLERR:
315 if events & POLLERR:
316 self._handle_err()
316 self._handle_err()
317 if events & POLLIN:
317 if events & POLLIN:
318 self._handle_recv()
318 self._handle_recv()
319
319
320 def _handle_err(self):
320 def _handle_err(self):
321 # We don't want to let this go silently, so eventually we should log.
321 # We don't want to let this go silently, so eventually we should log.
322 raise zmq.ZMQError()
322 raise zmq.ZMQError()
323
323
324 def _handle_recv(self):
324 def _handle_recv(self):
325 # Get all of the messages we can
325 # Get all of the messages we can
326 while True:
326 while True:
327 try:
327 try:
328 msg = self.socket.recv_json(zmq.NOBLOCK)
328 msg = self.socket.recv_json(zmq.NOBLOCK)
329 except zmq.ZMQError:
329 except zmq.ZMQError:
330 # Check the errno?
330 # Check the errno?
331 # Will this trigger POLLERR?
331 # Will this trigger POLLERR?
332 break
332 break
333 else:
333 else:
334 self.call_handlers(msg)
334 self.call_handlers(msg)
335
335
336 def _flush(self):
336 def _flush(self):
337 """Callback for :method:`self.flush`."""
337 """Callback for :method:`self.flush`."""
338 self._flushed = True
338 self._flushed = True
339
339
340
340
341 class RepSocketChannel(ZmqSocketChannel):
341 class RepSocketChannel(ZmqSocketChannel):
342 """A reply channel to handle raw_input requests that the kernel makes."""
342 """A reply channel to handle raw_input requests that the kernel makes."""
343
343
344 msg_queue = None
344 msg_queue = None
345
345
346 def __init__(self, context, session, address):
346 def __init__(self, context, session, address):
347 self.msg_queue = Queue()
347 self.msg_queue = Queue()
348 super(RepSocketChannel, self).__init__(context, session, address)
348 super(RepSocketChannel, self).__init__(context, session, address)
349
349
350 def run(self):
350 def run(self):
351 """The thread's main activity. Call start() instead."""
351 """The thread's main activity. Call start() instead."""
352 self.socket = self.context.socket(zmq.XREQ)
352 self.socket = self.context.socket(zmq.XREQ)
353 self.socket.setsockopt(zmq.IDENTITY, self.session.session)
353 self.socket.setsockopt(zmq.IDENTITY, self.session.session)
354 self.socket.connect('tcp://%s:%i' % self.address)
354 self.socket.connect('tcp://%s:%i' % self.address)
355 self.ioloop = ioloop.IOLoop()
355 self.ioloop = ioloop.IOLoop()
356 self.iostate = POLLERR|POLLIN
356 self.iostate = POLLERR|POLLIN
357 self.ioloop.add_handler(self.socket, self._handle_events,
357 self.ioloop.add_handler(self.socket, self._handle_events,
358 self.iostate)
358 self.iostate)
359 self.ioloop.start()
359 self.ioloop.start()
360
360
361 def stop(self):
361 def stop(self):
362 self.ioloop.stop()
362 self.ioloop.stop()
363 super(RepSocketChannel, self).stop()
363 super(RepSocketChannel, self).stop()
364
364
365 def call_handlers(self, msg):
365 def call_handlers(self, msg):
366 """This method is called in the ioloop thread when a message arrives.
366 """This method is called in the ioloop thread when a message arrives.
367
367
368 Subclasses should override this method to handle incoming messages.
368 Subclasses should override this method to handle incoming messages.
369 It is important to remember that this method is called in the thread
369 It is important to remember that this method is called in the thread
370 so that some logic must be done to ensure that the application leve
370 so that some logic must be done to ensure that the application leve
371 handlers are called in the application thread.
371 handlers are called in the application thread.
372 """
372 """
373 raise NotImplementedError('call_handlers must be defined in a subclass.')
373 raise NotImplementedError('call_handlers must be defined in a subclass.')
374
374
375 def input(self, string):
375 def input(self, string):
376 """Send a string of raw input to the kernel."""
376 """Send a string of raw input to the kernel."""
377 content = dict(value=string)
377 content = dict(value=string)
378 msg = self.session.msg('input_reply', content)
378 msg = self.session.msg('input_reply', content)
379 self._queue_reply(msg)
379 self._queue_reply(msg)
380
380
381 def _handle_events(self, socket, events):
381 def _handle_events(self, socket, events):
382 if events & POLLERR:
382 if events & POLLERR:
383 self._handle_err()
383 self._handle_err()
384 if events & POLLOUT:
384 if events & POLLOUT:
385 self._handle_send()
385 self._handle_send()
386 if events & POLLIN:
386 if events & POLLIN:
387 self._handle_recv()
387 self._handle_recv()
388
388
389 def _handle_recv(self):
389 def _handle_recv(self):
390 msg = self.socket.recv_json()
390 msg = self.socket.recv_json()
391 self.call_handlers(msg)
391 self.call_handlers(msg)
392
392
393 def _handle_send(self):
393 def _handle_send(self):
394 try:
394 try:
395 msg = self.msg_queue.get(False)
395 msg = self.msg_queue.get(False)
396 except Empty:
396 except Empty:
397 pass
397 pass
398 else:
398 else:
399 self.socket.send_json(msg)
399 self.socket.send_json(msg)
400 if self.msg_queue.empty():
400 if self.msg_queue.empty():
401 self.drop_io_state(POLLOUT)
401 self.drop_io_state(POLLOUT)
402
402
403 def _handle_err(self):
403 def _handle_err(self):
404 # We don't want to let this go silently, so eventually we should log.
404 # We don't want to let this go silently, so eventually we should log.
405 raise zmq.ZMQError()
405 raise zmq.ZMQError()
406
406
407 def _queue_reply(self, msg):
407 def _queue_reply(self, msg):
408 self.msg_queue.put(msg)
408 self.msg_queue.put(msg)
409 self.add_io_state(POLLOUT)
409 self.add_io_state(POLLOUT)
410
410
411
411
412 #-----------------------------------------------------------------------------
412 #-----------------------------------------------------------------------------
413 # Main kernel manager class
413 # Main kernel manager class
414 #-----------------------------------------------------------------------------
414 #-----------------------------------------------------------------------------
415
415
416 class KernelManager(HasTraits):
416 class KernelManager(HasTraits):
417 """ Manages a kernel for a frontend.
417 """ Manages a kernel for a frontend.
418
418
419 The SUB channel is for the frontend to receive messages published by the
419 The SUB channel is for the frontend to receive messages published by the
420 kernel.
420 kernel.
421
421
422 The REQ channel is for the frontend to make requests of the kernel.
422 The REQ channel is for the frontend to make requests of the kernel.
423
423
424 The REP channel is for the kernel to request stdin (raw_input) from the
424 The REP channel is for the kernel to request stdin (raw_input) from the
425 frontend.
425 frontend.
426 """
426 """
427 # The PyZMQ Context to use for communication with the kernel.
427 # The PyZMQ Context to use for communication with the kernel.
428 context = Instance(zmq.Context,(),{})
428 context = Instance(zmq.Context,(),{})
429
429
430 # The Session to use for communication with the kernel.
430 # The Session to use for communication with the kernel.
431 session = Instance(Session,(),{})
431 session = Instance(Session,(),{})
432
432
433 # The kernel process with which the KernelManager is communicating.
433 # The kernel process with which the KernelManager is communicating.
434 kernel = Instance(Popen)
434 kernel = Instance(Popen)
435
435
436 # The classes to use for the various channels.
436 # The classes to use for the various channels.
437 xreq_channel_class = Type(XReqSocketChannel)
437 xreq_channel_class = Type(XReqSocketChannel)
438 sub_channel_class = Type(SubSocketChannel)
438 sub_channel_class = Type(SubSocketChannel)
439 rep_channel_class = Type(RepSocketChannel)
439 rep_channel_class = Type(RepSocketChannel)
440
440
441 # Protected traits.
441 # Protected traits.
442 xreq_address = TCPAddress((LOCALHOST, 0))
442 xreq_address = TCPAddress((LOCALHOST, 0))
443 sub_address = TCPAddress((LOCALHOST, 0))
443 sub_address = TCPAddress((LOCALHOST, 0))
444 rep_address = TCPAddress((LOCALHOST, 0))
444 rep_address = TCPAddress((LOCALHOST, 0))
445 _xreq_channel = Any
445 _xreq_channel = Any
446 _sub_channel = Any
446 _sub_channel = Any
447 _rep_channel = Any
447 _rep_channel = Any
448
448
449 def __init__(self, **kwargs):
449 def __init__(self, **kwargs):
450 super(KernelManager, self).__init__(**kwargs)
450 super(KernelManager, self).__init__(**kwargs)
451
451
452 #--------------------------------- -----------------------------------------
452 #--------------------------------- -----------------------------------------
453 # Channel management methods:
453 # Channel management methods:
454 #--------------------------------------------------------------------------
454 #--------------------------------------------------------------------------
455
455
456 def start_channels(self):
456 def start_channels(self):
457 """Starts the channels for this kernel.
457 """Starts the channels for this kernel.
458
458
459 This will create the channels if they do not exist and then start
459 This will create the channels if they do not exist and then start
460 them. If port numbers of 0 are being used (random ports) then you
460 them. If port numbers of 0 are being used (random ports) then you
461 must first call :method:`start_kernel`. If the channels have been
461 must first call :method:`start_kernel`. If the channels have been
462 stopped and you call this, :class:`RuntimeError` will be raised.
462 stopped and you call this, :class:`RuntimeError` will be raised.
463 """
463 """
464 self.xreq_channel.start()
464 self.xreq_channel.start()
465 self.sub_channel.start()
465 self.sub_channel.start()
466 self.rep_channel.start()
466 self.rep_channel.start()
467
467
468 def stop_channels(self):
468 def stop_channels(self):
469 """Stops the channels for this kernel.
469 """Stops the channels for this kernel.
470
470
471 This stops the channels by joining their threads. If the channels
471 This stops the channels by joining their threads. If the channels
472 were not started, :class:`RuntimeError` will be raised.
472 were not started, :class:`RuntimeError` will be raised.
473 """
473 """
474 self.xreq_channel.stop()
474 self.xreq_channel.stop()
475 self.sub_channel.stop()
475 self.sub_channel.stop()
476 self.rep_channel.stop()
476 self.rep_channel.stop()
477
477
478 @property
478 @property
479 def channels_running(self):
479 def channels_running(self):
480 """Are all of the channels created and running?"""
480 """Are all of the channels created and running?"""
481 return self.xreq_channel.is_alive() \
481 return self.xreq_channel.is_alive() \
482 and self.sub_channel.is_alive() \
482 and self.sub_channel.is_alive() \
483 and self.rep_channel.is_alive()
483 and self.rep_channel.is_alive()
484
484
485 #--------------------------------------------------------------------------
485 #--------------------------------------------------------------------------
486 # Kernel process management methods:
486 # Kernel process management methods:
487 #--------------------------------------------------------------------------
487 #--------------------------------------------------------------------------
488
488
489 def start_kernel(self):
489 def start_kernel(self):
490 """Starts a kernel process and configures the manager to use it.
490 """Starts a kernel process and configures the manager to use it.
491
491
492 If random ports (port=0) are being used, this method must be called
492 If random ports (port=0) are being used, this method must be called
493 before the channels are created.
493 before the channels are created.
494 """
494 """
495 xreq, sub, rep = self.xreq_address, self.sub_address, self.rep_address
495 xreq, sub, rep = self.xreq_address, self.sub_address, self.rep_address
496 if xreq[0] != LOCALHOST or sub[0] != LOCALHOST or rep[0] != LOCALHOST:
496 if xreq[0] != LOCALHOST or sub[0] != LOCALHOST or rep[0] != LOCALHOST:
497 raise RuntimeError("Can only launch a kernel on localhost."
497 raise RuntimeError("Can only launch a kernel on localhost."
498 "Make sure that the '*_address' attributes are "
498 "Make sure that the '*_address' attributes are "
499 "configured properly.")
499 "configured properly.")
500
500
501 self.kernel, xrep, pub, req = launch_kernel(
501 self.kernel, xrep, pub, req = launch_kernel(
502 xrep_port=xreq[1], pub_port=sub[1], req_port=rep[1])
502 xrep_port=xreq[1], pub_port=sub[1], req_port=rep[1])
503 self.xreq_address = (LOCALHOST, xrep)
503 self.xreq_address = (LOCALHOST, xrep)
504 self.sub_address = (LOCALHOST, pub)
504 self.sub_address = (LOCALHOST, pub)
505 self.rep_address = (LOCALHOST, req)
505 self.rep_address = (LOCALHOST, req)
506
506
507 @property
507 @property
508 def has_kernel(self):
508 def has_kernel(self):
509 """Returns whether a kernel process has been specified for the kernel
509 """Returns whether a kernel process has been specified for the kernel
510 manager.
510 manager.
511 """
511 """
512 return self.kernel is not None
512 return self.kernel is not None
513
513
514 def kill_kernel(self):
514 def kill_kernel(self):
515 """ Kill the running kernel. """
515 """ Kill the running kernel. """
516 if self.kernel is not None:
516 if self.kernel is not None:
517 self.kernel.kill()
517 self.kernel.kill()
518 self.kernel = None
518 self.kernel = None
519 else:
519 else:
520 raise RuntimeError("Cannot kill kernel. No kernel is running!")
520 raise RuntimeError("Cannot kill kernel. No kernel is running!")
521
521
522 def signal_kernel(self, signum):
522 def signal_kernel(self, signum):
523 """ Sends a signal to the kernel. """
523 """ Sends a signal to the kernel. """
524 if self.kernel is not None:
524 if self.kernel is not None:
525 self.kernel.send_signal(signum)
525 self.kernel.send_signal(signum)
526 else:
526 else:
527 raise RuntimeError("Cannot signal kernel. No kernel is running!")
527 raise RuntimeError("Cannot signal kernel. No kernel is running!")
528
528
529 @property
529 @property
530 def is_alive(self):
530 def is_alive(self):
531 """Is the kernel process still running?"""
531 """Is the kernel process still running?"""
532 if self.kernel is not None:
532 if self.kernel is not None:
533 if self.kernel.poll() is None:
533 if self.kernel.poll() is None:
534 return True
534 return True
535 else:
535 else:
536 return False
536 return False
537 else:
537 else:
538 # We didn't start the kernel with this KernelManager so we don't
538 # We didn't start the kernel with this KernelManager so we don't
539 # know if it is running. We should use a heartbeat for this case.
539 # know if it is running. We should use a heartbeat for this case.
540 return True
540 return True
541
541
542 #--------------------------------------------------------------------------
542 #--------------------------------------------------------------------------
543 # Channels used for communication with the kernel:
543 # Channels used for communication with the kernel:
544 #--------------------------------------------------------------------------
544 #--------------------------------------------------------------------------
545
545
546 @property
546 @property
547 def xreq_channel(self):
547 def xreq_channel(self):
548 """Get the REQ socket channel object to make requests of the kernel."""
548 """Get the REQ socket channel object to make requests of the kernel."""
549 if self._xreq_channel is None:
549 if self._xreq_channel is None:
550 self._xreq_channel = self.xreq_channel_class(self.context,
550 self._xreq_channel = self.xreq_channel_class(self.context,
551 self.session,
551 self.session,
552 self.xreq_address)
552 self.xreq_address)
553 return self._xreq_channel
553 return self._xreq_channel
554
554
555 @property
555 @property
556 def sub_channel(self):
556 def sub_channel(self):
557 """Get the SUB socket channel object."""
557 """Get the SUB socket channel object."""
558 if self._sub_channel is None:
558 if self._sub_channel is None:
559 self._sub_channel = self.sub_channel_class(self.context,
559 self._sub_channel = self.sub_channel_class(self.context,
560 self.session,
560 self.session,
561 self.sub_address)
561 self.sub_address)
562 return self._sub_channel
562 return self._sub_channel
563
563
564 @property
564 @property
565 def rep_channel(self):
565 def rep_channel(self):
566 """Get the REP socket channel object to handle stdin (raw_input)."""
566 """Get the REP socket channel object to handle stdin (raw_input)."""
567 if self._rep_channel is None:
567 if self._rep_channel is None:
568 self._rep_channel = self.rep_channel_class(self.context,
568 self._rep_channel = self.rep_channel_class(self.context,
569 self.session,
569 self.session,
570 self.rep_address)
570 self.rep_address)
571 return self._rep_channel
571 return self._rep_channel
@@ -1,359 +1,359 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 """A simple interactive kernel that talks to a frontend over 0MQ.
2 """A simple interactive kernel that talks to a frontend over 0MQ.
3
3
4 Things to do:
4 Things to do:
5
5
6 * Implement `set_parent` logic. Right before doing exec, the Kernel should
6 * Implement `set_parent` logic. Right before doing exec, the Kernel should
7 call set_parent on all the PUB objects with the message about to be executed.
7 call set_parent on all the PUB objects with the message about to be executed.
8 * Implement random port and security key logic.
8 * Implement random port and security key logic.
9 * Implement control messages.
9 * Implement control messages.
10 * Implement event loop and poll version.
10 * Implement event loop and poll version.
11 """
11 """
12
12
13 #-----------------------------------------------------------------------------
13 #-----------------------------------------------------------------------------
14 # Imports
14 # Imports
15 #-----------------------------------------------------------------------------
15 #-----------------------------------------------------------------------------
16
16
17 # Standard library imports.
17 # Standard library imports.
18 import __builtin__
18 import __builtin__
19 from code import CommandCompiler
19 from code import CommandCompiler
20 import os
20 import os
21 import sys
21 import sys
22 import time
22 import time
23 import traceback
23 import traceback
24
24
25 # System library imports.
25 # System library imports.
26 import zmq
26 import zmq
27
27
28 # Local imports.
28 # Local imports.
29 from IPython.external.argparse import ArgumentParser
29 from IPython.external.argparse import ArgumentParser
30 from session import Session, Message
30 from session import Session, Message
31 from completer import KernelCompleter
31 from completer import KernelCompleter
32 from .iostream import OutStream
32 from iostream import OutStream
33 from .displayhook import DisplayHook
33 from displayhook import DisplayHook
34 from .exitpoller import ExitPollerUnix, ExitPollerWindows
34 from exitpoller import ExitPollerUnix, ExitPollerWindows
35
35
36 #-----------------------------------------------------------------------------
36 #-----------------------------------------------------------------------------
37 # Main kernel class
37 # Main kernel class
38 #-----------------------------------------------------------------------------
38 #-----------------------------------------------------------------------------
39
39
40 class Kernel(object):
40 class Kernel(object):
41
41
42 def __init__(self, session, reply_socket, pub_socket, req_socket):
42 def __init__(self, session, reply_socket, pub_socket, req_socket):
43 self.session = session
43 self.session = session
44 self.reply_socket = reply_socket
44 self.reply_socket = reply_socket
45 self.pub_socket = pub_socket
45 self.pub_socket = pub_socket
46 self.req_socket = req_socket
46 self.req_socket = req_socket
47 self.user_ns = {}
47 self.user_ns = {}
48 self.history = []
48 self.history = []
49 self.compiler = CommandCompiler()
49 self.compiler = CommandCompiler()
50 self.completer = KernelCompleter(self.user_ns)
50 self.completer = KernelCompleter(self.user_ns)
51
51
52 # Build dict of handlers for message types
52 # Build dict of handlers for message types
53 msg_types = [ 'execute_request', 'complete_request',
53 msg_types = [ 'execute_request', 'complete_request',
54 'object_info_request' ]
54 'object_info_request' ]
55 self.handlers = {}
55 self.handlers = {}
56 for msg_type in msg_types:
56 for msg_type in msg_types:
57 self.handlers[msg_type] = getattr(self, msg_type)
57 self.handlers[msg_type] = getattr(self, msg_type)
58
58
59 def abort_queue(self):
59 def abort_queue(self):
60 while True:
60 while True:
61 try:
61 try:
62 ident = self.reply_socket.recv(zmq.NOBLOCK)
62 ident = self.reply_socket.recv(zmq.NOBLOCK)
63 except zmq.ZMQError, e:
63 except zmq.ZMQError, e:
64 if e.errno == zmq.EAGAIN:
64 if e.errno == zmq.EAGAIN:
65 break
65 break
66 else:
66 else:
67 assert self.reply_socket.rcvmore(), "Unexpected missing message part."
67 assert self.reply_socket.rcvmore(), "Unexpected missing message part."
68 msg = self.reply_socket.recv_json()
68 msg = self.reply_socket.recv_json()
69 print>>sys.__stdout__, "Aborting:"
69 print>>sys.__stdout__, "Aborting:"
70 print>>sys.__stdout__, Message(msg)
70 print>>sys.__stdout__, Message(msg)
71 msg_type = msg['msg_type']
71 msg_type = msg['msg_type']
72 reply_type = msg_type.split('_')[0] + '_reply'
72 reply_type = msg_type.split('_')[0] + '_reply'
73 reply_msg = self.session.msg(reply_type, {'status' : 'aborted'}, msg)
73 reply_msg = self.session.msg(reply_type, {'status' : 'aborted'}, msg)
74 print>>sys.__stdout__, Message(reply_msg)
74 print>>sys.__stdout__, Message(reply_msg)
75 self.reply_socket.send(ident,zmq.SNDMORE)
75 self.reply_socket.send(ident,zmq.SNDMORE)
76 self.reply_socket.send_json(reply_msg)
76 self.reply_socket.send_json(reply_msg)
77 # We need to wait a bit for requests to come in. This can probably
77 # We need to wait a bit for requests to come in. This can probably
78 # be set shorter for true asynchronous clients.
78 # be set shorter for true asynchronous clients.
79 time.sleep(0.1)
79 time.sleep(0.1)
80
80
81 def execute_request(self, ident, parent):
81 def execute_request(self, ident, parent):
82 try:
82 try:
83 code = parent[u'content'][u'code']
83 code = parent[u'content'][u'code']
84 except:
84 except:
85 print>>sys.__stderr__, "Got bad msg: "
85 print>>sys.__stderr__, "Got bad msg: "
86 print>>sys.__stderr__, Message(parent)
86 print>>sys.__stderr__, Message(parent)
87 return
87 return
88 pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent)
88 pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent)
89 self.pub_socket.send_json(pyin_msg)
89 self.pub_socket.send_json(pyin_msg)
90
90
91 try:
91 try:
92 comp_code = self.compiler(code, '<zmq-kernel>')
92 comp_code = self.compiler(code, '<zmq-kernel>')
93
93
94 # Replace raw_input. Note that is not sufficient to replace
94 # Replace raw_input. Note that is not sufficient to replace
95 # raw_input in the user namespace.
95 # raw_input in the user namespace.
96 raw_input = lambda prompt='': self.raw_input(prompt, ident, parent)
96 raw_input = lambda prompt='': self.raw_input(prompt, ident, parent)
97 __builtin__.raw_input = raw_input
97 __builtin__.raw_input = raw_input
98
98
99 # Configure the display hook.
99 # Configure the display hook.
100 sys.displayhook.set_parent(parent)
100 sys.displayhook.set_parent(parent)
101
101
102 exec comp_code in self.user_ns, self.user_ns
102 exec comp_code in self.user_ns, self.user_ns
103 except:
103 except:
104 etype, evalue, tb = sys.exc_info()
104 etype, evalue, tb = sys.exc_info()
105 tb = traceback.format_exception(etype, evalue, tb)
105 tb = traceback.format_exception(etype, evalue, tb)
106 exc_content = {
106 exc_content = {
107 u'status' : u'error',
107 u'status' : u'error',
108 u'traceback' : tb,
108 u'traceback' : tb,
109 u'ename' : unicode(etype.__name__),
109 u'ename' : unicode(etype.__name__),
110 u'evalue' : unicode(evalue)
110 u'evalue' : unicode(evalue)
111 }
111 }
112 exc_msg = self.session.msg(u'pyerr', exc_content, parent)
112 exc_msg = self.session.msg(u'pyerr', exc_content, parent)
113 self.pub_socket.send_json(exc_msg)
113 self.pub_socket.send_json(exc_msg)
114 reply_content = exc_content
114 reply_content = exc_content
115 else:
115 else:
116 reply_content = {'status' : 'ok'}
116 reply_content = {'status' : 'ok'}
117
117
118 # Flush output before sending the reply.
118 # Flush output before sending the reply.
119 sys.stderr.flush()
119 sys.stderr.flush()
120 sys.stdout.flush()
120 sys.stdout.flush()
121
121
122 # Send the reply.
122 # Send the reply.
123 reply_msg = self.session.msg(u'execute_reply', reply_content, parent)
123 reply_msg = self.session.msg(u'execute_reply', reply_content, parent)
124 print>>sys.__stdout__, Message(reply_msg)
124 print>>sys.__stdout__, Message(reply_msg)
125 self.reply_socket.send(ident, zmq.SNDMORE)
125 self.reply_socket.send(ident, zmq.SNDMORE)
126 self.reply_socket.send_json(reply_msg)
126 self.reply_socket.send_json(reply_msg)
127 if reply_msg['content']['status'] == u'error':
127 if reply_msg['content']['status'] == u'error':
128 self.abort_queue()
128 self.abort_queue()
129
129
130 def raw_input(self, prompt, ident, parent):
130 def raw_input(self, prompt, ident, parent):
131 # Flush output before making the request.
131 # Flush output before making the request.
132 sys.stderr.flush()
132 sys.stderr.flush()
133 sys.stdout.flush()
133 sys.stdout.flush()
134
134
135 # Send the input request.
135 # Send the input request.
136 content = dict(prompt=prompt)
136 content = dict(prompt=prompt)
137 msg = self.session.msg(u'input_request', content, parent)
137 msg = self.session.msg(u'input_request', content, parent)
138 self.req_socket.send_json(msg)
138 self.req_socket.send_json(msg)
139
139
140 # Await a response.
140 # Await a response.
141 reply = self.req_socket.recv_json()
141 reply = self.req_socket.recv_json()
142 try:
142 try:
143 value = reply['content']['value']
143 value = reply['content']['value']
144 except:
144 except:
145 print>>sys.__stderr__, "Got bad raw_input reply: "
145 print>>sys.__stderr__, "Got bad raw_input reply: "
146 print>>sys.__stderr__, Message(parent)
146 print>>sys.__stderr__, Message(parent)
147 value = ''
147 value = ''
148 return value
148 return value
149
149
150 def complete_request(self, ident, parent):
150 def complete_request(self, ident, parent):
151 matches = {'matches' : self.complete(parent),
151 matches = {'matches' : self.complete(parent),
152 'status' : 'ok'}
152 'status' : 'ok'}
153 completion_msg = self.session.send(self.reply_socket, 'complete_reply',
153 completion_msg = self.session.send(self.reply_socket, 'complete_reply',
154 matches, parent, ident)
154 matches, parent, ident)
155 print >> sys.__stdout__, completion_msg
155 print >> sys.__stdout__, completion_msg
156
156
157 def complete(self, msg):
157 def complete(self, msg):
158 return self.completer.complete(msg.content.line, msg.content.text)
158 return self.completer.complete(msg.content.line, msg.content.text)
159
159
160 def object_info_request(self, ident, parent):
160 def object_info_request(self, ident, parent):
161 context = parent['content']['oname'].split('.')
161 context = parent['content']['oname'].split('.')
162 object_info = self.object_info(context)
162 object_info = self.object_info(context)
163 msg = self.session.send(self.reply_socket, 'object_info_reply',
163 msg = self.session.send(self.reply_socket, 'object_info_reply',
164 object_info, parent, ident)
164 object_info, parent, ident)
165 print >> sys.__stdout__, msg
165 print >> sys.__stdout__, msg
166
166
167 def object_info(self, context):
167 def object_info(self, context):
168 symbol, leftover = self.symbol_from_context(context)
168 symbol, leftover = self.symbol_from_context(context)
169 if symbol is not None and not leftover:
169 if symbol is not None and not leftover:
170 doc = getattr(symbol, '__doc__', '')
170 doc = getattr(symbol, '__doc__', '')
171 else:
171 else:
172 doc = ''
172 doc = ''
173 object_info = dict(docstring = doc)
173 object_info = dict(docstring = doc)
174 return object_info
174 return object_info
175
175
176 def symbol_from_context(self, context):
176 def symbol_from_context(self, context):
177 if not context:
177 if not context:
178 return None, context
178 return None, context
179
179
180 base_symbol_string = context[0]
180 base_symbol_string = context[0]
181 symbol = self.user_ns.get(base_symbol_string, None)
181 symbol = self.user_ns.get(base_symbol_string, None)
182 if symbol is None:
182 if symbol is None:
183 symbol = __builtin__.__dict__.get(base_symbol_string, None)
183 symbol = __builtin__.__dict__.get(base_symbol_string, None)
184 if symbol is None:
184 if symbol is None:
185 return None, context
185 return None, context
186
186
187 context = context[1:]
187 context = context[1:]
188 for i, name in enumerate(context):
188 for i, name in enumerate(context):
189 new_symbol = getattr(symbol, name, None)
189 new_symbol = getattr(symbol, name, None)
190 if new_symbol is None:
190 if new_symbol is None:
191 return symbol, context[i:]
191 return symbol, context[i:]
192 else:
192 else:
193 symbol = new_symbol
193 symbol = new_symbol
194
194
195 return symbol, []
195 return symbol, []
196
196
197 def start(self):
197 def start(self):
198 while True:
198 while True:
199 ident = self.reply_socket.recv()
199 ident = self.reply_socket.recv()
200 assert self.reply_socket.rcvmore(), "Missing message part."
200 assert self.reply_socket.rcvmore(), "Missing message part."
201 msg = self.reply_socket.recv_json()
201 msg = self.reply_socket.recv_json()
202 omsg = Message(msg)
202 omsg = Message(msg)
203 print>>sys.__stdout__
203 print>>sys.__stdout__
204 print>>sys.__stdout__, omsg
204 print>>sys.__stdout__, omsg
205 handler = self.handlers.get(omsg.msg_type, None)
205 handler = self.handlers.get(omsg.msg_type, None)
206 if handler is None:
206 if handler is None:
207 print >> sys.__stderr__, "UNKNOWN MESSAGE TYPE:", omsg
207 print >> sys.__stderr__, "UNKNOWN MESSAGE TYPE:", omsg
208 else:
208 else:
209 handler(ident, omsg)
209 handler(ident, omsg)
210
210
211 #-----------------------------------------------------------------------------
211 #-----------------------------------------------------------------------------
212 # Kernel main and launch functions
212 # Kernel main and launch functions
213 #-----------------------------------------------------------------------------
213 #-----------------------------------------------------------------------------
214
214
215 def bind_port(socket, ip, port):
215 def bind_port(socket, ip, port):
216 """ Binds the specified ZMQ socket. If the port is less than zero, a random
216 """ Binds the specified ZMQ socket. If the port is less than zero, a random
217 port is chosen. Returns the port that was bound.
217 port is chosen. Returns the port that was bound.
218 """
218 """
219 connection = 'tcp://%s' % ip
219 connection = 'tcp://%s' % ip
220 if port <= 0:
220 if port <= 0:
221 port = socket.bind_to_random_port(connection)
221 port = socket.bind_to_random_port(connection)
222 else:
222 else:
223 connection += ':%i' % port
223 connection += ':%i' % port
224 socket.bind(connection)
224 socket.bind(connection)
225 return port
225 return port
226
226
227
227
228 def main():
228 def main():
229 """ Main entry point for launching a kernel.
229 """ Main entry point for launching a kernel.
230 """
230 """
231 # Parse command line arguments.
231 # Parse command line arguments.
232 parser = ArgumentParser()
232 parser = ArgumentParser()
233 parser.add_argument('--ip', type=str, default='127.0.0.1',
233 parser.add_argument('--ip', type=str, default='127.0.0.1',
234 help='set the kernel\'s IP address [default: local]')
234 help='set the kernel\'s IP address [default: local]')
235 parser.add_argument('--xrep', type=int, metavar='PORT', default=0,
235 parser.add_argument('--xrep', type=int, metavar='PORT', default=0,
236 help='set the XREP channel port [default: random]')
236 help='set the XREP channel port [default: random]')
237 parser.add_argument('--pub', type=int, metavar='PORT', default=0,
237 parser.add_argument('--pub', type=int, metavar='PORT', default=0,
238 help='set the PUB channel port [default: random]')
238 help='set the PUB channel port [default: random]')
239 parser.add_argument('--req', type=int, metavar='PORT', default=0,
239 parser.add_argument('--req', type=int, metavar='PORT', default=0,
240 help='set the REQ channel port [default: random]')
240 help='set the REQ channel port [default: random]')
241 if sys.platform == 'win32':
241 if sys.platform == 'win32':
242 parser.add_argument('--parent', type=int, metavar='HANDLE',
242 parser.add_argument('--parent', type=int, metavar='HANDLE',
243 default=0, help='kill this process if the process '
243 default=0, help='kill this process if the process '
244 'with HANDLE dies')
244 'with HANDLE dies')
245 else:
245 else:
246 parser.add_argument('--parent', action='store_true',
246 parser.add_argument('--parent', action='store_true',
247 help='kill this process if its parent dies')
247 help='kill this process if its parent dies')
248 namespace = parser.parse_args()
248 namespace = parser.parse_args()
249
249
250 # Create a context, a session, and the kernel sockets.
250 # Create a context, a session, and the kernel sockets.
251 print >>sys.__stdout__, "Starting the kernel..."
251 print >>sys.__stdout__, "Starting the kernel..."
252 context = zmq.Context()
252 context = zmq.Context()
253 session = Session(username=u'kernel')
253 session = Session(username=u'kernel')
254
254
255 reply_socket = context.socket(zmq.XREP)
255 reply_socket = context.socket(zmq.XREP)
256 xrep_port = bind_port(reply_socket, namespace.ip, namespace.xrep)
256 xrep_port = bind_port(reply_socket, namespace.ip, namespace.xrep)
257 print >>sys.__stdout__, "XREP Channel on port", xrep_port
257 print >>sys.__stdout__, "XREP Channel on port", xrep_port
258
258
259 pub_socket = context.socket(zmq.PUB)
259 pub_socket = context.socket(zmq.PUB)
260 pub_port = bind_port(pub_socket, namespace.ip, namespace.pub)
260 pub_port = bind_port(pub_socket, namespace.ip, namespace.pub)
261 print >>sys.__stdout__, "PUB Channel on port", pub_port
261 print >>sys.__stdout__, "PUB Channel on port", pub_port
262
262
263 req_socket = context.socket(zmq.XREQ)
263 req_socket = context.socket(zmq.XREQ)
264 req_port = bind_port(req_socket, namespace.ip, namespace.req)
264 req_port = bind_port(req_socket, namespace.ip, namespace.req)
265 print >>sys.__stdout__, "REQ Channel on port", req_port
265 print >>sys.__stdout__, "REQ Channel on port", req_port
266
266
267 # Redirect input streams and set a display hook.
267 # Redirect input streams and set a display hook.
268 sys.stdout = OutStream(session, pub_socket, u'stdout')
268 sys.stdout = OutStream(session, pub_socket, u'stdout')
269 sys.stderr = OutStream(session, pub_socket, u'stderr')
269 sys.stderr = OutStream(session, pub_socket, u'stderr')
270 sys.displayhook = DisplayHook(session, pub_socket)
270 sys.displayhook = DisplayHook(session, pub_socket)
271
271
272 # Create the kernel.
272 # Create the kernel.
273 kernel = Kernel(session, reply_socket, pub_socket, req_socket)
273 kernel = Kernel(session, reply_socket, pub_socket, req_socket)
274
274
275 # Configure this kernel/process to die on parent termination, if necessary.
275 # Configure this kernel/process to die on parent termination, if necessary.
276 if namespace.parent:
276 if namespace.parent:
277 if sys.platform == 'win32':
277 if sys.platform == 'win32':
278 poller = ExitPollerWindows(namespace.parent)
278 poller = ExitPollerWindows(namespace.parent)
279 else:
279 else:
280 poller = ExitPollerUnix()
280 poller = ExitPollerUnix()
281 poller.start()
281 poller.start()
282
282
283 # Start the kernel mainloop.
283 # Start the kernel mainloop.
284 kernel.start()
284 kernel.start()
285
285
286
286
287 def launch_kernel(xrep_port=0, pub_port=0, req_port=0, independent=False):
287 def launch_kernel(xrep_port=0, pub_port=0, req_port=0, independent=False):
288 """ Launches a localhost kernel, binding to the specified ports.
288 """ Launches a localhost kernel, binding to the specified ports.
289
289
290 Parameters
290 Parameters
291 ----------
291 ----------
292 xrep_port : int, optional
292 xrep_port : int, optional
293 The port to use for XREP channel.
293 The port to use for XREP channel.
294
294
295 pub_port : int, optional
295 pub_port : int, optional
296 The port to use for the SUB channel.
296 The port to use for the SUB channel.
297
297
298 req_port : int, optional
298 req_port : int, optional
299 The port to use for the REQ (raw input) channel.
299 The port to use for the REQ (raw input) channel.
300
300
301 independent : bool, optional (default False)
301 independent : bool, optional (default False)
302 If set, the kernel process is guaranteed to survive if this process
302 If set, the kernel process is guaranteed to survive if this process
303 dies. If not set, an effort is made to ensure that the kernel is killed
303 dies. If not set, an effort is made to ensure that the kernel is killed
304 when this process dies. Note that in this case it is still good practice
304 when this process dies. Note that in this case it is still good practice
305 to kill kernels manually before exiting.
305 to kill kernels manually before exiting.
306
306
307 Returns
307 Returns
308 -------
308 -------
309 A tuple of form:
309 A tuple of form:
310 (kernel_process, xrep_port, pub_port, req_port)
310 (kernel_process, xrep_port, pub_port, req_port)
311 where kernel_process is a Popen object and the ports are integers.
311 where kernel_process is a Popen object and the ports are integers.
312 """
312 """
313 import socket
313 import socket
314 from subprocess import Popen
314 from subprocess import Popen
315
315
316 # Find open ports as necessary.
316 # Find open ports as necessary.
317 ports = []
317 ports = []
318 ports_needed = int(xrep_port <= 0) + int(pub_port <= 0) + int(req_port <= 0)
318 ports_needed = int(xrep_port <= 0) + int(pub_port <= 0) + int(req_port <= 0)
319 for i in xrange(ports_needed):
319 for i in xrange(ports_needed):
320 sock = socket.socket()
320 sock = socket.socket()
321 sock.bind(('', 0))
321 sock.bind(('', 0))
322 ports.append(sock)
322 ports.append(sock)
323 for i, sock in enumerate(ports):
323 for i, sock in enumerate(ports):
324 port = sock.getsockname()[1]
324 port = sock.getsockname()[1]
325 sock.close()
325 sock.close()
326 ports[i] = port
326 ports[i] = port
327 if xrep_port <= 0:
327 if xrep_port <= 0:
328 xrep_port = ports.pop(0)
328 xrep_port = ports.pop(0)
329 if pub_port <= 0:
329 if pub_port <= 0:
330 pub_port = ports.pop(0)
330 pub_port = ports.pop(0)
331 if req_port <= 0:
331 if req_port <= 0:
332 req_port = ports.pop(0)
332 req_port = ports.pop(0)
333
333
334 # Spawn a kernel.
334 # Spawn a kernel.
335 command = 'from IPython.zmq.pykernel import main; main()'
335 command = 'from IPython.zmq.pykernel import main; main()'
336 arguments = [ sys.executable, '-c', command, '--xrep', str(xrep_port),
336 arguments = [ sys.executable, '-c', command, '--xrep', str(xrep_port),
337 '--pub', str(pub_port), '--req', str(req_port) ]
337 '--pub', str(pub_port), '--req', str(req_port) ]
338 if independent:
338 if independent:
339 if sys.platform == 'win32':
339 if sys.platform == 'win32':
340 proc = Popen(['start', '/b'] + arguments, shell=True)
340 proc = Popen(['start', '/b'] + arguments, shell=True)
341 else:
341 else:
342 proc = Popen(arguments, preexec_fn=lambda: os.setsid())
342 proc = Popen(arguments, preexec_fn=lambda: os.setsid())
343 else:
343 else:
344 if sys.platform == 'win32':
344 if sys.platform == 'win32':
345 from _subprocess import DuplicateHandle, GetCurrentProcess, \
345 from _subprocess import DuplicateHandle, GetCurrentProcess, \
346 DUPLICATE_SAME_ACCESS
346 DUPLICATE_SAME_ACCESS
347 pid = GetCurrentProcess()
347 pid = GetCurrentProcess()
348 handle = DuplicateHandle(pid, pid, pid, 0,
348 handle = DuplicateHandle(pid, pid, pid, 0,
349 True, # Inheritable by new processes.
349 True, # Inheritable by new processes.
350 DUPLICATE_SAME_ACCESS)
350 DUPLICATE_SAME_ACCESS)
351 proc = Popen(arguments + ['--parent', str(int(handle))])
351 proc = Popen(arguments + ['--parent', str(int(handle))])
352 else:
352 else:
353 proc = Popen(arguments + ['--parent'])
353 proc = Popen(arguments + ['--parent'])
354
354
355 return proc, xrep_port, pub_port, req_port
355 return proc, xrep_port, pub_port, req_port
356
356
357
357
358 if __name__ == '__main__':
358 if __name__ == '__main__':
359 main()
359 main()
General Comments 0
You need to be logged in to leave comments. Login now