##// END OF EJS Templates
Remaned kernel.py to pykernel.py and create ipkernel.py.
Brian Granger -
Show More
@@ -0,0 +1,359 b''
1 #!/usr/bin/env python
2 """A simple interactive kernel that talks to a frontend over 0MQ.
3
4 Things to do:
5
6 * Implement `set_parent` logic. Right before doing exec, the Kernel should
7 call set_parent on all the PUB objects with the message about to be executed.
8 * Implement random port and security key logic.
9 * Implement control messages.
10 * Implement event loop and poll version.
11 """
12
13 #-----------------------------------------------------------------------------
14 # Imports
15 #-----------------------------------------------------------------------------
16
17 # Standard library imports.
18 import __builtin__
19 from code import CommandCompiler
20 import os
21 import sys
22 import time
23 import traceback
24
25 # System library imports.
26 import zmq
27
28 # Local imports.
29 from IPython.external.argparse import ArgumentParser
30 from session import Session, Message
31 from completer import KernelCompleter
32 from .iostream import OutStream
33 from .displayhook import DisplayHook
34 from .exitpoller import ExitPollerUnix, ExitPollerWindows
35
36 #-----------------------------------------------------------------------------
37 # Main kernel class
38 #-----------------------------------------------------------------------------
39
40 class Kernel(object):
41
42 def __init__(self, session, reply_socket, pub_socket, req_socket):
43 self.session = session
44 self.reply_socket = reply_socket
45 self.pub_socket = pub_socket
46 self.req_socket = req_socket
47 self.user_ns = {}
48 self.history = []
49 self.compiler = CommandCompiler()
50 self.completer = KernelCompleter(self.user_ns)
51
52 # Build dict of handlers for message types
53 msg_types = [ 'execute_request', 'complete_request',
54 'object_info_request' ]
55 self.handlers = {}
56 for msg_type in msg_types:
57 self.handlers[msg_type] = getattr(self, msg_type)
58
59 def abort_queue(self):
60 while True:
61 try:
62 ident = self.reply_socket.recv(zmq.NOBLOCK)
63 except zmq.ZMQError, e:
64 if e.errno == zmq.EAGAIN:
65 break
66 else:
67 assert self.reply_socket.rcvmore(), "Unexpected missing message part."
68 msg = self.reply_socket.recv_json()
69 print>>sys.__stdout__, "Aborting:"
70 print>>sys.__stdout__, Message(msg)
71 msg_type = msg['msg_type']
72 reply_type = msg_type.split('_')[0] + '_reply'
73 reply_msg = self.session.msg(reply_type, {'status' : 'aborted'}, msg)
74 print>>sys.__stdout__, Message(reply_msg)
75 self.reply_socket.send(ident,zmq.SNDMORE)
76 self.reply_socket.send_json(reply_msg)
77 # We need to wait a bit for requests to come in. This can probably
78 # be set shorter for true asynchronous clients.
79 time.sleep(0.1)
80
81 def execute_request(self, ident, parent):
82 try:
83 code = parent[u'content'][u'code']
84 except:
85 print>>sys.__stderr__, "Got bad msg: "
86 print>>sys.__stderr__, Message(parent)
87 return
88 pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent)
89 self.pub_socket.send_json(pyin_msg)
90
91 try:
92 comp_code = self.compiler(code, '<zmq-kernel>')
93
94 # Replace raw_input. Note that is not sufficient to replace
95 # raw_input in the user namespace.
96 raw_input = lambda prompt='': self.raw_input(prompt, ident, parent)
97 __builtin__.raw_input = raw_input
98
99 # Configure the display hook.
100 sys.displayhook.set_parent(parent)
101
102 exec comp_code in self.user_ns, self.user_ns
103 except:
104 etype, evalue, tb = sys.exc_info()
105 tb = traceback.format_exception(etype, evalue, tb)
106 exc_content = {
107 u'status' : u'error',
108 u'traceback' : tb,
109 u'ename' : unicode(etype.__name__),
110 u'evalue' : unicode(evalue)
111 }
112 exc_msg = self.session.msg(u'pyerr', exc_content, parent)
113 self.pub_socket.send_json(exc_msg)
114 reply_content = exc_content
115 else:
116 reply_content = {'status' : 'ok'}
117
118 # Flush output before sending the reply.
119 sys.stderr.flush()
120 sys.stdout.flush()
121
122 # Send the reply.
123 reply_msg = self.session.msg(u'execute_reply', reply_content, parent)
124 print>>sys.__stdout__, Message(reply_msg)
125 self.reply_socket.send(ident, zmq.SNDMORE)
126 self.reply_socket.send_json(reply_msg)
127 if reply_msg['content']['status'] == u'error':
128 self.abort_queue()
129
130 def raw_input(self, prompt, ident, parent):
131 # Flush output before making the request.
132 sys.stderr.flush()
133 sys.stdout.flush()
134
135 # Send the input request.
136 content = dict(prompt=prompt)
137 msg = self.session.msg(u'input_request', content, parent)
138 self.req_socket.send_json(msg)
139
140 # Await a response.
141 reply = self.req_socket.recv_json()
142 try:
143 value = reply['content']['value']
144 except:
145 print>>sys.__stderr__, "Got bad raw_input reply: "
146 print>>sys.__stderr__, Message(parent)
147 value = ''
148 return value
149
150 def complete_request(self, ident, parent):
151 matches = {'matches' : self.complete(parent),
152 'status' : 'ok'}
153 completion_msg = self.session.send(self.reply_socket, 'complete_reply',
154 matches, parent, ident)
155 print >> sys.__stdout__, completion_msg
156
157 def complete(self, msg):
158 return self.completer.complete(msg.content.line, msg.content.text)
159
160 def object_info_request(self, ident, parent):
161 context = parent['content']['oname'].split('.')
162 object_info = self.object_info(context)
163 msg = self.session.send(self.reply_socket, 'object_info_reply',
164 object_info, parent, ident)
165 print >> sys.__stdout__, msg
166
167 def object_info(self, context):
168 symbol, leftover = self.symbol_from_context(context)
169 if symbol is not None and not leftover:
170 doc = getattr(symbol, '__doc__', '')
171 else:
172 doc = ''
173 object_info = dict(docstring = doc)
174 return object_info
175
176 def symbol_from_context(self, context):
177 if not context:
178 return None, context
179
180 base_symbol_string = context[0]
181 symbol = self.user_ns.get(base_symbol_string, None)
182 if symbol is None:
183 symbol = __builtin__.__dict__.get(base_symbol_string, None)
184 if symbol is None:
185 return None, context
186
187 context = context[1:]
188 for i, name in enumerate(context):
189 new_symbol = getattr(symbol, name, None)
190 if new_symbol is None:
191 return symbol, context[i:]
192 else:
193 symbol = new_symbol
194
195 return symbol, []
196
197 def start(self):
198 while True:
199 ident = self.reply_socket.recv()
200 assert self.reply_socket.rcvmore(), "Missing message part."
201 msg = self.reply_socket.recv_json()
202 omsg = Message(msg)
203 print>>sys.__stdout__
204 print>>sys.__stdout__, omsg
205 handler = self.handlers.get(omsg.msg_type, None)
206 if handler is None:
207 print >> sys.__stderr__, "UNKNOWN MESSAGE TYPE:", omsg
208 else:
209 handler(ident, omsg)
210
211 #-----------------------------------------------------------------------------
212 # Kernel main and launch functions
213 #-----------------------------------------------------------------------------
214
215 def bind_port(socket, ip, port):
216 """ Binds the specified ZMQ socket. If the port is less than zero, a random
217 port is chosen. Returns the port that was bound.
218 """
219 connection = 'tcp://%s' % ip
220 if port <= 0:
221 port = socket.bind_to_random_port(connection)
222 else:
223 connection += ':%i' % port
224 socket.bind(connection)
225 return port
226
227
228 def main():
229 """ Main entry point for launching a kernel.
230 """
231 # Parse command line arguments.
232 parser = ArgumentParser()
233 parser.add_argument('--ip', type=str, default='127.0.0.1',
234 help='set the kernel\'s IP address [default: local]')
235 parser.add_argument('--xrep', type=int, metavar='PORT', default=0,
236 help='set the XREP channel port [default: random]')
237 parser.add_argument('--pub', type=int, metavar='PORT', default=0,
238 help='set the PUB channel port [default: random]')
239 parser.add_argument('--req', type=int, metavar='PORT', default=0,
240 help='set the REQ channel port [default: random]')
241 if sys.platform == 'win32':
242 parser.add_argument('--parent', type=int, metavar='HANDLE',
243 default=0, help='kill this process if the process '
244 'with HANDLE dies')
245 else:
246 parser.add_argument('--parent', action='store_true',
247 help='kill this process if its parent dies')
248 namespace = parser.parse_args()
249
250 # Create a context, a session, and the kernel sockets.
251 print >>sys.__stdout__, "Starting the kernel..."
252 context = zmq.Context()
253 session = Session(username=u'kernel')
254
255 reply_socket = context.socket(zmq.XREP)
256 xrep_port = bind_port(reply_socket, namespace.ip, namespace.xrep)
257 print >>sys.__stdout__, "XREP Channel on port", xrep_port
258
259 pub_socket = context.socket(zmq.PUB)
260 pub_port = bind_port(pub_socket, namespace.ip, namespace.pub)
261 print >>sys.__stdout__, "PUB Channel on port", pub_port
262
263 req_socket = context.socket(zmq.XREQ)
264 req_port = bind_port(req_socket, namespace.ip, namespace.req)
265 print >>sys.__stdout__, "REQ Channel on port", req_port
266
267 # Redirect input streams and set a display hook.
268 sys.stdout = OutStream(session, pub_socket, u'stdout')
269 sys.stderr = OutStream(session, pub_socket, u'stderr')
270 sys.displayhook = DisplayHook(session, pub_socket)
271
272 # Create the kernel.
273 kernel = Kernel(session, reply_socket, pub_socket, req_socket)
274
275 # Configure this kernel/process to die on parent termination, if necessary.
276 if namespace.parent:
277 if sys.platform == 'win32':
278 poller = ExitPollerWindows(namespace.parent)
279 else:
280 poller = ExitPollerUnix()
281 poller.start()
282
283 # Start the kernel mainloop.
284 kernel.start()
285
286
287 def launch_kernel(xrep_port=0, pub_port=0, req_port=0, independent=False):
288 """ Launches a localhost kernel, binding to the specified ports.
289
290 Parameters
291 ----------
292 xrep_port : int, optional
293 The port to use for XREP channel.
294
295 pub_port : int, optional
296 The port to use for the SUB channel.
297
298 req_port : int, optional
299 The port to use for the REQ (raw input) channel.
300
301 independent : bool, optional (default False)
302 If set, the kernel process is guaranteed to survive if this process
303 dies. If not set, an effort is made to ensure that the kernel is killed
304 when this process dies. Note that in this case it is still good practice
305 to kill kernels manually before exiting.
306
307 Returns
308 -------
309 A tuple of form:
310 (kernel_process, xrep_port, pub_port, req_port)
311 where kernel_process is a Popen object and the ports are integers.
312 """
313 import socket
314 from subprocess import Popen
315
316 # Find open ports as necessary.
317 ports = []
318 ports_needed = int(xrep_port <= 0) + int(pub_port <= 0) + int(req_port <= 0)
319 for i in xrange(ports_needed):
320 sock = socket.socket()
321 sock.bind(('', 0))
322 ports.append(sock)
323 for i, sock in enumerate(ports):
324 port = sock.getsockname()[1]
325 sock.close()
326 ports[i] = port
327 if xrep_port <= 0:
328 xrep_port = ports.pop(0)
329 if pub_port <= 0:
330 pub_port = ports.pop(0)
331 if req_port <= 0:
332 req_port = ports.pop(0)
333
334 # Spawn a kernel.
335 command = 'from IPython.zmq.pykernel import main; main()'
336 arguments = [ sys.executable, '-c', command, '--xrep', str(xrep_port),
337 '--pub', str(pub_port), '--req', str(req_port) ]
338 if independent:
339 if sys.platform == 'win32':
340 proc = Popen(['start', '/b'] + arguments, shell=True)
341 else:
342 proc = Popen(arguments, preexec_fn=lambda: os.setsid())
343 else:
344 if sys.platform == 'win32':
345 from _subprocess import DuplicateHandle, GetCurrentProcess, \
346 DUPLICATE_SAME_ACCESS
347 pid = GetCurrentProcess()
348 handle = DuplicateHandle(pid, pid, pid, 0,
349 True, # Inheritable by new processes.
350 DUPLICATE_SAME_ACCESS)
351 proc = Popen(arguments + ['--parent', str(int(handle))])
352 else:
353 proc = Popen(arguments + ['--parent'])
354
355 return proc, xrep_port, pub_port, req_port
356
357
358 if __name__ == '__main__':
359 main()
@@ -1,359 +1,359 b''
1 1 #!/usr/bin/env python
2 2 """A simple interactive kernel that talks to a frontend over 0MQ.
3 3
4 4 Things to do:
5 5
6 6 * Implement `set_parent` logic. Right before doing exec, the Kernel should
7 7 call set_parent on all the PUB objects with the message about to be executed.
8 8 * Implement random port and security key logic.
9 9 * Implement control messages.
10 10 * Implement event loop and poll version.
11 11 """
12 12
13 13 #-----------------------------------------------------------------------------
14 14 # Imports
15 15 #-----------------------------------------------------------------------------
16 16
17 17 # Standard library imports.
18 18 import __builtin__
19 19 from code import CommandCompiler
20 20 import os
21 21 import sys
22 22 import time
23 23 import traceback
24 24
25 25 # System library imports.
26 26 import zmq
27 27
28 28 # Local imports.
29 29 from IPython.external.argparse import ArgumentParser
30 30 from session import Session, Message
31 31 from completer import KernelCompleter
32 32 from .iostream import OutStream
33 33 from .displayhook import DisplayHook
34 34 from .exitpoller import ExitPollerUnix, ExitPollerWindows
35 35
36 36 #-----------------------------------------------------------------------------
37 37 # Main kernel class
38 38 #-----------------------------------------------------------------------------
39 39
40 40 class Kernel(object):
41 41
42 42 def __init__(self, session, reply_socket, pub_socket, req_socket):
43 43 self.session = session
44 44 self.reply_socket = reply_socket
45 45 self.pub_socket = pub_socket
46 46 self.req_socket = req_socket
47 47 self.user_ns = {}
48 48 self.history = []
49 49 self.compiler = CommandCompiler()
50 50 self.completer = KernelCompleter(self.user_ns)
51 51
52 52 # Build dict of handlers for message types
53 53 msg_types = [ 'execute_request', 'complete_request',
54 54 'object_info_request' ]
55 55 self.handlers = {}
56 56 for msg_type in msg_types:
57 57 self.handlers[msg_type] = getattr(self, msg_type)
58 58
59 59 def abort_queue(self):
60 60 while True:
61 61 try:
62 62 ident = self.reply_socket.recv(zmq.NOBLOCK)
63 63 except zmq.ZMQError, e:
64 64 if e.errno == zmq.EAGAIN:
65 65 break
66 66 else:
67 67 assert self.reply_socket.rcvmore(), "Unexpected missing message part."
68 68 msg = self.reply_socket.recv_json()
69 69 print>>sys.__stdout__, "Aborting:"
70 70 print>>sys.__stdout__, Message(msg)
71 71 msg_type = msg['msg_type']
72 72 reply_type = msg_type.split('_')[0] + '_reply'
73 73 reply_msg = self.session.msg(reply_type, {'status' : 'aborted'}, msg)
74 74 print>>sys.__stdout__, Message(reply_msg)
75 75 self.reply_socket.send(ident,zmq.SNDMORE)
76 76 self.reply_socket.send_json(reply_msg)
77 77 # We need to wait a bit for requests to come in. This can probably
78 78 # be set shorter for true asynchronous clients.
79 79 time.sleep(0.1)
80 80
81 81 def execute_request(self, ident, parent):
82 82 try:
83 83 code = parent[u'content'][u'code']
84 84 except:
85 85 print>>sys.__stderr__, "Got bad msg: "
86 86 print>>sys.__stderr__, Message(parent)
87 87 return
88 88 pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent)
89 89 self.pub_socket.send_json(pyin_msg)
90 90
91 91 try:
92 92 comp_code = self.compiler(code, '<zmq-kernel>')
93 93
94 94 # Replace raw_input. Note that is not sufficient to replace
95 95 # raw_input in the user namespace.
96 96 raw_input = lambda prompt='': self.raw_input(prompt, ident, parent)
97 97 __builtin__.raw_input = raw_input
98 98
99 99 # Configure the display hook.
100 100 sys.displayhook.set_parent(parent)
101 101
102 102 exec comp_code in self.user_ns, self.user_ns
103 103 except:
104 104 etype, evalue, tb = sys.exc_info()
105 105 tb = traceback.format_exception(etype, evalue, tb)
106 106 exc_content = {
107 107 u'status' : u'error',
108 108 u'traceback' : tb,
109 109 u'ename' : unicode(etype.__name__),
110 110 u'evalue' : unicode(evalue)
111 111 }
112 112 exc_msg = self.session.msg(u'pyerr', exc_content, parent)
113 113 self.pub_socket.send_json(exc_msg)
114 114 reply_content = exc_content
115 115 else:
116 116 reply_content = {'status' : 'ok'}
117 117
118 118 # Flush output before sending the reply.
119 119 sys.stderr.flush()
120 120 sys.stdout.flush()
121 121
122 122 # Send the reply.
123 123 reply_msg = self.session.msg(u'execute_reply', reply_content, parent)
124 124 print>>sys.__stdout__, Message(reply_msg)
125 125 self.reply_socket.send(ident, zmq.SNDMORE)
126 126 self.reply_socket.send_json(reply_msg)
127 127 if reply_msg['content']['status'] == u'error':
128 128 self.abort_queue()
129 129
130 130 def raw_input(self, prompt, ident, parent):
131 131 # Flush output before making the request.
132 132 sys.stderr.flush()
133 133 sys.stdout.flush()
134 134
135 135 # Send the input request.
136 136 content = dict(prompt=prompt)
137 137 msg = self.session.msg(u'input_request', content, parent)
138 138 self.req_socket.send_json(msg)
139 139
140 140 # Await a response.
141 141 reply = self.req_socket.recv_json()
142 142 try:
143 143 value = reply['content']['value']
144 144 except:
145 145 print>>sys.__stderr__, "Got bad raw_input reply: "
146 146 print>>sys.__stderr__, Message(parent)
147 147 value = ''
148 148 return value
149 149
150 150 def complete_request(self, ident, parent):
151 151 matches = {'matches' : self.complete(parent),
152 152 'status' : 'ok'}
153 153 completion_msg = self.session.send(self.reply_socket, 'complete_reply',
154 154 matches, parent, ident)
155 155 print >> sys.__stdout__, completion_msg
156 156
157 157 def complete(self, msg):
158 158 return self.completer.complete(msg.content.line, msg.content.text)
159 159
160 160 def object_info_request(self, ident, parent):
161 161 context = parent['content']['oname'].split('.')
162 162 object_info = self.object_info(context)
163 163 msg = self.session.send(self.reply_socket, 'object_info_reply',
164 164 object_info, parent, ident)
165 165 print >> sys.__stdout__, msg
166 166
167 167 def object_info(self, context):
168 168 symbol, leftover = self.symbol_from_context(context)
169 169 if symbol is not None and not leftover:
170 170 doc = getattr(symbol, '__doc__', '')
171 171 else:
172 172 doc = ''
173 173 object_info = dict(docstring = doc)
174 174 return object_info
175 175
176 176 def symbol_from_context(self, context):
177 177 if not context:
178 178 return None, context
179 179
180 180 base_symbol_string = context[0]
181 181 symbol = self.user_ns.get(base_symbol_string, None)
182 182 if symbol is None:
183 183 symbol = __builtin__.__dict__.get(base_symbol_string, None)
184 184 if symbol is None:
185 185 return None, context
186 186
187 187 context = context[1:]
188 188 for i, name in enumerate(context):
189 189 new_symbol = getattr(symbol, name, None)
190 190 if new_symbol is None:
191 191 return symbol, context[i:]
192 192 else:
193 193 symbol = new_symbol
194 194
195 195 return symbol, []
196 196
197 197 def start(self):
198 198 while True:
199 199 ident = self.reply_socket.recv()
200 200 assert self.reply_socket.rcvmore(), "Missing message part."
201 201 msg = self.reply_socket.recv_json()
202 202 omsg = Message(msg)
203 203 print>>sys.__stdout__
204 204 print>>sys.__stdout__, omsg
205 205 handler = self.handlers.get(omsg.msg_type, None)
206 206 if handler is None:
207 207 print >> sys.__stderr__, "UNKNOWN MESSAGE TYPE:", omsg
208 208 else:
209 209 handler(ident, omsg)
210 210
211 211 #-----------------------------------------------------------------------------
212 212 # Kernel main and launch functions
213 213 #-----------------------------------------------------------------------------
214 214
215 215 def bind_port(socket, ip, port):
216 216 """ Binds the specified ZMQ socket. If the port is less than zero, a random
217 217 port is chosen. Returns the port that was bound.
218 218 """
219 219 connection = 'tcp://%s' % ip
220 220 if port <= 0:
221 221 port = socket.bind_to_random_port(connection)
222 222 else:
223 223 connection += ':%i' % port
224 224 socket.bind(connection)
225 225 return port
226 226
227 227
228 228 def main():
229 229 """ Main entry point for launching a kernel.
230 230 """
231 231 # Parse command line arguments.
232 232 parser = ArgumentParser()
233 233 parser.add_argument('--ip', type=str, default='127.0.0.1',
234 234 help='set the kernel\'s IP address [default: local]')
235 235 parser.add_argument('--xrep', type=int, metavar='PORT', default=0,
236 236 help='set the XREP channel port [default: random]')
237 237 parser.add_argument('--pub', type=int, metavar='PORT', default=0,
238 238 help='set the PUB channel port [default: random]')
239 239 parser.add_argument('--req', type=int, metavar='PORT', default=0,
240 240 help='set the REQ channel port [default: random]')
241 241 if sys.platform == 'win32':
242 242 parser.add_argument('--parent', type=int, metavar='HANDLE',
243 243 default=0, help='kill this process if the process '
244 244 'with HANDLE dies')
245 245 else:
246 246 parser.add_argument('--parent', action='store_true',
247 247 help='kill this process if its parent dies')
248 248 namespace = parser.parse_args()
249 249
250 250 # Create a context, a session, and the kernel sockets.
251 251 print >>sys.__stdout__, "Starting the kernel..."
252 252 context = zmq.Context()
253 253 session = Session(username=u'kernel')
254 254
255 255 reply_socket = context.socket(zmq.XREP)
256 256 xrep_port = bind_port(reply_socket, namespace.ip, namespace.xrep)
257 257 print >>sys.__stdout__, "XREP Channel on port", xrep_port
258 258
259 259 pub_socket = context.socket(zmq.PUB)
260 260 pub_port = bind_port(pub_socket, namespace.ip, namespace.pub)
261 261 print >>sys.__stdout__, "PUB Channel on port", pub_port
262 262
263 263 req_socket = context.socket(zmq.XREQ)
264 264 req_port = bind_port(req_socket, namespace.ip, namespace.req)
265 265 print >>sys.__stdout__, "REQ Channel on port", req_port
266 266
267 267 # Redirect input streams and set a display hook.
268 268 sys.stdout = OutStream(session, pub_socket, u'stdout')
269 269 sys.stderr = OutStream(session, pub_socket, u'stderr')
270 270 sys.displayhook = DisplayHook(session, pub_socket)
271 271
272 272 # Create the kernel.
273 273 kernel = Kernel(session, reply_socket, pub_socket, req_socket)
274 274
275 275 # Configure this kernel/process to die on parent termination, if necessary.
276 276 if namespace.parent:
277 277 if sys.platform == 'win32':
278 278 poller = ExitPollerWindows(namespace.parent)
279 279 else:
280 280 poller = ExitPollerUnix()
281 281 poller.start()
282 282
283 283 # Start the kernel mainloop.
284 284 kernel.start()
285 285
286 286
287 287 def launch_kernel(xrep_port=0, pub_port=0, req_port=0, independent=False):
288 288 """ Launches a localhost kernel, binding to the specified ports.
289 289
290 290 Parameters
291 291 ----------
292 292 xrep_port : int, optional
293 293 The port to use for XREP channel.
294 294
295 295 pub_port : int, optional
296 296 The port to use for the SUB channel.
297 297
298 298 req_port : int, optional
299 299 The port to use for the REQ (raw input) channel.
300 300
301 301 independent : bool, optional (default False)
302 302 If set, the kernel process is guaranteed to survive if this process
303 303 dies. If not set, an effort is made to ensure that the kernel is killed
304 304 when this process dies. Note that in this case it is still good practice
305 305 to kill kernels manually before exiting.
306 306
307 307 Returns
308 308 -------
309 309 A tuple of form:
310 310 (kernel_process, xrep_port, pub_port, req_port)
311 311 where kernel_process is a Popen object and the ports are integers.
312 312 """
313 313 import socket
314 314 from subprocess import Popen
315 315
316 316 # Find open ports as necessary.
317 317 ports = []
318 318 ports_needed = int(xrep_port <= 0) + int(pub_port <= 0) + int(req_port <= 0)
319 319 for i in xrange(ports_needed):
320 320 sock = socket.socket()
321 321 sock.bind(('', 0))
322 322 ports.append(sock)
323 323 for i, sock in enumerate(ports):
324 324 port = sock.getsockname()[1]
325 325 sock.close()
326 326 ports[i] = port
327 327 if xrep_port <= 0:
328 328 xrep_port = ports.pop(0)
329 329 if pub_port <= 0:
330 330 pub_port = ports.pop(0)
331 331 if req_port <= 0:
332 332 req_port = ports.pop(0)
333 333
334 334 # Spawn a kernel.
335 command = 'from IPython.zmq.kernel import main; main()'
335 command = 'from IPython.zmq.ipkernel import main; main()'
336 336 arguments = [ sys.executable, '-c', command, '--xrep', str(xrep_port),
337 337 '--pub', str(pub_port), '--req', str(req_port) ]
338 338 if independent:
339 339 if sys.platform == 'win32':
340 340 proc = Popen(['start', '/b'] + arguments, shell=True)
341 341 else:
342 342 proc = Popen(arguments, preexec_fn=lambda: os.setsid())
343 343 else:
344 344 if sys.platform == 'win32':
345 345 from _subprocess import DuplicateHandle, GetCurrentProcess, \
346 346 DUPLICATE_SAME_ACCESS
347 347 pid = GetCurrentProcess()
348 348 handle = DuplicateHandle(pid, pid, pid, 0,
349 349 True, # Inheritable by new processes.
350 350 DUPLICATE_SAME_ACCESS)
351 351 proc = Popen(arguments + ['--parent', str(int(handle))])
352 352 else:
353 353 proc = Popen(arguments + ['--parent'])
354 354
355 355 return proc, xrep_port, pub_port, req_port
356 356
357 357
358 358 if __name__ == '__main__':
359 359 main()
@@ -1,571 +1,571 b''
1 1 """Base classes to manage the interaction with a running kernel.
2 2
3 3 Todo
4 4 ====
5 5
6 6 * Create logger to handle debugging and console messages.
7 7 """
8 8
9 9 #-----------------------------------------------------------------------------
10 10 # Copyright (C) 2008-2010 The IPython Development Team
11 11 #
12 12 # Distributed under the terms of the BSD License. The full license is in
13 13 # the file COPYING, distributed as part of this software.
14 14 #-----------------------------------------------------------------------------
15 15
16 16 #-----------------------------------------------------------------------------
17 17 # Imports
18 18 #-----------------------------------------------------------------------------
19 19
20 20 # Standard library imports.
21 21 from Queue import Queue, Empty
22 22 from subprocess import Popen
23 23 from threading import Thread
24 24 import time
25 25
26 26 # System library imports.
27 27 import zmq
28 28 from zmq import POLLIN, POLLOUT, POLLERR
29 29 from zmq.eventloop import ioloop
30 30
31 31 # Local imports.
32 32 from IPython.utils.traitlets import HasTraits, Any, Instance, Type, TCPAddress
33 from kernel import launch_kernel
33 from .ipkernel import launch_kernel
34 34 from session import Session
35 35
36 36 #-----------------------------------------------------------------------------
37 37 # Constants and exceptions
38 38 #-----------------------------------------------------------------------------
39 39
40 40 LOCALHOST = '127.0.0.1'
41 41
42 42 class InvalidPortNumber(Exception):
43 43 pass
44 44
45 45 #-----------------------------------------------------------------------------
46 46 # ZMQ Socket Channel classes
47 47 #-----------------------------------------------------------------------------
48 48
49 49 class ZmqSocketChannel(Thread):
50 50 """The base class for the channels that use ZMQ sockets.
51 51 """
52 52 context = None
53 53 session = None
54 54 socket = None
55 55 ioloop = None
56 56 iostate = None
57 57 _address = None
58 58
59 59 def __init__(self, context, session, address):
60 60 """Create a channel
61 61
62 62 Parameters
63 63 ----------
64 64 context : :class:`zmq.Context`
65 65 The ZMQ context to use.
66 66 session : :class:`session.Session`
67 67 The session to use.
68 68 address : tuple
69 69 Standard (ip, port) tuple that the kernel is listening on.
70 70 """
71 71 super(ZmqSocketChannel, self).__init__()
72 72 self.daemon = True
73 73
74 74 self.context = context
75 75 self.session = session
76 76 if address[1] == 0:
77 77 message = 'The port number for a channel cannot be 0.'
78 78 raise InvalidPortNumber(message)
79 79 self._address = address
80 80
81 81 def stop(self):
82 82 """Stop the channel's activity.
83 83
84 84 This calls :method:`Thread.join` and returns when the thread
85 85 terminates. :class:`RuntimeError` will be raised if
86 86 :method:`self.start` is called again.
87 87 """
88 88 self.join()
89 89
90 90 @property
91 91 def address(self):
92 92 """Get the channel's address as an (ip, port) tuple.
93 93
94 94 By the default, the address is (localhost, 0), where 0 means a random
95 95 port.
96 96 """
97 97 return self._address
98 98
99 99 def add_io_state(self, state):
100 100 """Add IO state to the eventloop.
101 101
102 102 Parameters
103 103 ----------
104 104 state : zmq.POLLIN|zmq.POLLOUT|zmq.POLLERR
105 105 The IO state flag to set.
106 106
107 107 This is thread safe as it uses the thread safe IOLoop.add_callback.
108 108 """
109 109 def add_io_state_callback():
110 110 if not self.iostate & state:
111 111 self.iostate = self.iostate | state
112 112 self.ioloop.update_handler(self.socket, self.iostate)
113 113 self.ioloop.add_callback(add_io_state_callback)
114 114
115 115 def drop_io_state(self, state):
116 116 """Drop IO state from the eventloop.
117 117
118 118 Parameters
119 119 ----------
120 120 state : zmq.POLLIN|zmq.POLLOUT|zmq.POLLERR
121 121 The IO state flag to set.
122 122
123 123 This is thread safe as it uses the thread safe IOLoop.add_callback.
124 124 """
125 125 def drop_io_state_callback():
126 126 if self.iostate & state:
127 127 self.iostate = self.iostate & (~state)
128 128 self.ioloop.update_handler(self.socket, self.iostate)
129 129 self.ioloop.add_callback(drop_io_state_callback)
130 130
131 131
132 132 class XReqSocketChannel(ZmqSocketChannel):
133 133 """The XREQ channel for issues request/replies to the kernel.
134 134 """
135 135
136 136 command_queue = None
137 137
138 138 def __init__(self, context, session, address):
139 139 self.command_queue = Queue()
140 140 super(XReqSocketChannel, self).__init__(context, session, address)
141 141
142 142 def run(self):
143 143 """The thread's main activity. Call start() instead."""
144 144 self.socket = self.context.socket(zmq.XREQ)
145 145 self.socket.setsockopt(zmq.IDENTITY, self.session.session)
146 146 self.socket.connect('tcp://%s:%i' % self.address)
147 147 self.ioloop = ioloop.IOLoop()
148 148 self.iostate = POLLERR|POLLIN
149 149 self.ioloop.add_handler(self.socket, self._handle_events,
150 150 self.iostate)
151 151 self.ioloop.start()
152 152
153 153 def stop(self):
154 154 self.ioloop.stop()
155 155 super(XReqSocketChannel, self).stop()
156 156
157 157 def call_handlers(self, msg):
158 158 """This method is called in the ioloop thread when a message arrives.
159 159
160 160 Subclasses should override this method to handle incoming messages.
161 161 It is important to remember that this method is called in the thread
162 162 so that some logic must be done to ensure that the application leve
163 163 handlers are called in the application thread.
164 164 """
165 165 raise NotImplementedError('call_handlers must be defined in a subclass.')
166 166
167 167 def execute(self, code):
168 168 """Execute code in the kernel.
169 169
170 170 Parameters
171 171 ----------
172 172 code : str
173 173 A string of Python code.
174 174
175 175 Returns
176 176 -------
177 177 The msg_id of the message sent.
178 178 """
179 179 # Create class for content/msg creation. Related to, but possibly
180 180 # not in Session.
181 181 content = dict(code=code)
182 182 msg = self.session.msg('execute_request', content)
183 183 self._queue_request(msg)
184 184 return msg['header']['msg_id']
185 185
186 186 def complete(self, text, line, block=None):
187 187 """Tab complete text, line, block in the kernel's namespace.
188 188
189 189 Parameters
190 190 ----------
191 191 text : str
192 192 The text to complete.
193 193 line : str
194 194 The full line of text that is the surrounding context for the
195 195 text to complete.
196 196 block : str
197 197 The full block of code in which the completion is being requested.
198 198
199 199 Returns
200 200 -------
201 201 The msg_id of the message sent.
202 202 """
203 203 content = dict(text=text, line=line)
204 204 msg = self.session.msg('complete_request', content)
205 205 self._queue_request(msg)
206 206 return msg['header']['msg_id']
207 207
208 208 def object_info(self, oname):
209 209 """Get metadata information about an object.
210 210
211 211 Parameters
212 212 ----------
213 213 oname : str
214 214 A string specifying the object name.
215 215
216 216 Returns
217 217 -------
218 218 The msg_id of the message sent.
219 219 """
220 220 content = dict(oname=oname)
221 221 msg = self.session.msg('object_info_request', content)
222 222 self._queue_request(msg)
223 223 return msg['header']['msg_id']
224 224
225 225 def _handle_events(self, socket, events):
226 226 if events & POLLERR:
227 227 self._handle_err()
228 228 if events & POLLOUT:
229 229 self._handle_send()
230 230 if events & POLLIN:
231 231 self._handle_recv()
232 232
233 233 def _handle_recv(self):
234 234 msg = self.socket.recv_json()
235 235 self.call_handlers(msg)
236 236
237 237 def _handle_send(self):
238 238 try:
239 239 msg = self.command_queue.get(False)
240 240 except Empty:
241 241 pass
242 242 else:
243 243 self.socket.send_json(msg)
244 244 if self.command_queue.empty():
245 245 self.drop_io_state(POLLOUT)
246 246
247 247 def _handle_err(self):
248 248 # We don't want to let this go silently, so eventually we should log.
249 249 raise zmq.ZMQError()
250 250
251 251 def _queue_request(self, msg):
252 252 self.command_queue.put(msg)
253 253 self.add_io_state(POLLOUT)
254 254
255 255
256 256 class SubSocketChannel(ZmqSocketChannel):
257 257 """The SUB channel which listens for messages that the kernel publishes.
258 258 """
259 259
260 260 def __init__(self, context, session, address):
261 261 super(SubSocketChannel, self).__init__(context, session, address)
262 262
263 263 def run(self):
264 264 """The thread's main activity. Call start() instead."""
265 265 self.socket = self.context.socket(zmq.SUB)
266 266 self.socket.setsockopt(zmq.SUBSCRIBE,'')
267 267 self.socket.setsockopt(zmq.IDENTITY, self.session.session)
268 268 self.socket.connect('tcp://%s:%i' % self.address)
269 269 self.ioloop = ioloop.IOLoop()
270 270 self.iostate = POLLIN|POLLERR
271 271 self.ioloop.add_handler(self.socket, self._handle_events,
272 272 self.iostate)
273 273 self.ioloop.start()
274 274
275 275 def stop(self):
276 276 self.ioloop.stop()
277 277 super(SubSocketChannel, self).stop()
278 278
279 279 def call_handlers(self, msg):
280 280 """This method is called in the ioloop thread when a message arrives.
281 281
282 282 Subclasses should override this method to handle incoming messages.
283 283 It is important to remember that this method is called in the thread
284 284 so that some logic must be done to ensure that the application leve
285 285 handlers are called in the application thread.
286 286 """
287 287 raise NotImplementedError('call_handlers must be defined in a subclass.')
288 288
289 289 def flush(self, timeout=1.0):
290 290 """Immediately processes all pending messages on the SUB channel.
291 291
292 292 Callers should use this method to ensure that :method:`call_handlers`
293 293 has been called for all messages that have been received on the
294 294 0MQ SUB socket of this channel.
295 295
296 296 This method is thread safe.
297 297
298 298 Parameters
299 299 ----------
300 300 timeout : float, optional
301 301 The maximum amount of time to spend flushing, in seconds. The
302 302 default is one second.
303 303 """
304 304 # We do the IOLoop callback process twice to ensure that the IOLoop
305 305 # gets to perform at least one full poll.
306 306 stop_time = time.time() + timeout
307 307 for i in xrange(2):
308 308 self._flushed = False
309 309 self.ioloop.add_callback(self._flush)
310 310 while not self._flushed and time.time() < stop_time:
311 311 time.sleep(0.01)
312 312
313 313 def _handle_events(self, socket, events):
314 314 # Turn on and off POLLOUT depending on if we have made a request
315 315 if events & POLLERR:
316 316 self._handle_err()
317 317 if events & POLLIN:
318 318 self._handle_recv()
319 319
320 320 def _handle_err(self):
321 321 # We don't want to let this go silently, so eventually we should log.
322 322 raise zmq.ZMQError()
323 323
324 324 def _handle_recv(self):
325 325 # Get all of the messages we can
326 326 while True:
327 327 try:
328 328 msg = self.socket.recv_json(zmq.NOBLOCK)
329 329 except zmq.ZMQError:
330 330 # Check the errno?
331 331 # Will this trigger POLLERR?
332 332 break
333 333 else:
334 334 self.call_handlers(msg)
335 335
336 336 def _flush(self):
337 337 """Callback for :method:`self.flush`."""
338 338 self._flushed = True
339 339
340 340
341 341 class RepSocketChannel(ZmqSocketChannel):
342 342 """A reply channel to handle raw_input requests that the kernel makes."""
343 343
344 344 msg_queue = None
345 345
346 346 def __init__(self, context, session, address):
347 347 self.msg_queue = Queue()
348 348 super(RepSocketChannel, self).__init__(context, session, address)
349 349
350 350 def run(self):
351 351 """The thread's main activity. Call start() instead."""
352 352 self.socket = self.context.socket(zmq.XREQ)
353 353 self.socket.setsockopt(zmq.IDENTITY, self.session.session)
354 354 self.socket.connect('tcp://%s:%i' % self.address)
355 355 self.ioloop = ioloop.IOLoop()
356 356 self.iostate = POLLERR|POLLIN
357 357 self.ioloop.add_handler(self.socket, self._handle_events,
358 358 self.iostate)
359 359 self.ioloop.start()
360 360
361 361 def stop(self):
362 362 self.ioloop.stop()
363 363 super(RepSocketChannel, self).stop()
364 364
365 365 def call_handlers(self, msg):
366 366 """This method is called in the ioloop thread when a message arrives.
367 367
368 368 Subclasses should override this method to handle incoming messages.
369 369 It is important to remember that this method is called in the thread
370 370 so that some logic must be done to ensure that the application leve
371 371 handlers are called in the application thread.
372 372 """
373 373 raise NotImplementedError('call_handlers must be defined in a subclass.')
374 374
375 375 def input(self, string):
376 376 """Send a string of raw input to the kernel."""
377 377 content = dict(value=string)
378 378 msg = self.session.msg('input_reply', content)
379 379 self._queue_reply(msg)
380 380
381 381 def _handle_events(self, socket, events):
382 382 if events & POLLERR:
383 383 self._handle_err()
384 384 if events & POLLOUT:
385 385 self._handle_send()
386 386 if events & POLLIN:
387 387 self._handle_recv()
388 388
389 389 def _handle_recv(self):
390 390 msg = self.socket.recv_json()
391 391 self.call_handlers(msg)
392 392
393 393 def _handle_send(self):
394 394 try:
395 395 msg = self.msg_queue.get(False)
396 396 except Empty:
397 397 pass
398 398 else:
399 399 self.socket.send_json(msg)
400 400 if self.msg_queue.empty():
401 401 self.drop_io_state(POLLOUT)
402 402
403 403 def _handle_err(self):
404 404 # We don't want to let this go silently, so eventually we should log.
405 405 raise zmq.ZMQError()
406 406
407 407 def _queue_reply(self, msg):
408 408 self.msg_queue.put(msg)
409 409 self.add_io_state(POLLOUT)
410 410
411 411
412 412 #-----------------------------------------------------------------------------
413 413 # Main kernel manager class
414 414 #-----------------------------------------------------------------------------
415 415
416 416 class KernelManager(HasTraits):
417 417 """ Manages a kernel for a frontend.
418 418
419 419 The SUB channel is for the frontend to receive messages published by the
420 420 kernel.
421 421
422 422 The REQ channel is for the frontend to make requests of the kernel.
423 423
424 424 The REP channel is for the kernel to request stdin (raw_input) from the
425 425 frontend.
426 426 """
427 427 # The PyZMQ Context to use for communication with the kernel.
428 428 context = Instance(zmq.Context,(),{})
429 429
430 430 # The Session to use for communication with the kernel.
431 431 session = Instance(Session,(),{})
432 432
433 433 # The kernel process with which the KernelManager is communicating.
434 434 kernel = Instance(Popen)
435 435
436 436 # The classes to use for the various channels.
437 437 xreq_channel_class = Type(XReqSocketChannel)
438 438 sub_channel_class = Type(SubSocketChannel)
439 439 rep_channel_class = Type(RepSocketChannel)
440 440
441 441 # Protected traits.
442 442 xreq_address = TCPAddress((LOCALHOST, 0))
443 443 sub_address = TCPAddress((LOCALHOST, 0))
444 444 rep_address = TCPAddress((LOCALHOST, 0))
445 445 _xreq_channel = Any
446 446 _sub_channel = Any
447 447 _rep_channel = Any
448 448
449 449 def __init__(self, **kwargs):
450 450 super(KernelManager, self).__init__(**kwargs)
451 451
452 452 #--------------------------------- -----------------------------------------
453 453 # Channel management methods:
454 454 #--------------------------------------------------------------------------
455 455
456 456 def start_channels(self):
457 457 """Starts the channels for this kernel.
458 458
459 459 This will create the channels if they do not exist and then start
460 460 them. If port numbers of 0 are being used (random ports) then you
461 461 must first call :method:`start_kernel`. If the channels have been
462 462 stopped and you call this, :class:`RuntimeError` will be raised.
463 463 """
464 464 self.xreq_channel.start()
465 465 self.sub_channel.start()
466 466 self.rep_channel.start()
467 467
468 468 def stop_channels(self):
469 469 """Stops the channels for this kernel.
470 470
471 471 This stops the channels by joining their threads. If the channels
472 472 were not started, :class:`RuntimeError` will be raised.
473 473 """
474 474 self.xreq_channel.stop()
475 475 self.sub_channel.stop()
476 476 self.rep_channel.stop()
477 477
478 478 @property
479 479 def channels_running(self):
480 480 """Are all of the channels created and running?"""
481 481 return self.xreq_channel.is_alive() \
482 482 and self.sub_channel.is_alive() \
483 483 and self.rep_channel.is_alive()
484 484
485 485 #--------------------------------------------------------------------------
486 486 # Kernel process management methods:
487 487 #--------------------------------------------------------------------------
488 488
489 489 def start_kernel(self):
490 490 """Starts a kernel process and configures the manager to use it.
491 491
492 492 If random ports (port=0) are being used, this method must be called
493 493 before the channels are created.
494 494 """
495 495 xreq, sub, rep = self.xreq_address, self.sub_address, self.rep_address
496 496 if xreq[0] != LOCALHOST or sub[0] != LOCALHOST or rep[0] != LOCALHOST:
497 497 raise RuntimeError("Can only launch a kernel on localhost."
498 498 "Make sure that the '*_address' attributes are "
499 499 "configured properly.")
500 500
501 501 self.kernel, xrep, pub, req = launch_kernel(
502 502 xrep_port=xreq[1], pub_port=sub[1], req_port=rep[1])
503 503 self.xreq_address = (LOCALHOST, xrep)
504 504 self.sub_address = (LOCALHOST, pub)
505 505 self.rep_address = (LOCALHOST, req)
506 506
507 507 @property
508 508 def has_kernel(self):
509 509 """Returns whether a kernel process has been specified for the kernel
510 510 manager.
511 511 """
512 512 return self.kernel is not None
513 513
514 514 def kill_kernel(self):
515 515 """ Kill the running kernel. """
516 516 if self.kernel is not None:
517 517 self.kernel.kill()
518 518 self.kernel = None
519 519 else:
520 520 raise RuntimeError("Cannot kill kernel. No kernel is running!")
521 521
522 522 def signal_kernel(self, signum):
523 523 """ Sends a signal to the kernel. """
524 524 if self.kernel is not None:
525 525 self.kernel.send_signal(signum)
526 526 else:
527 527 raise RuntimeError("Cannot signal kernel. No kernel is running!")
528 528
529 529 @property
530 530 def is_alive(self):
531 531 """Is the kernel process still running?"""
532 532 if self.kernel is not None:
533 533 if self.kernel.poll() is None:
534 534 return True
535 535 else:
536 536 return False
537 537 else:
538 538 # We didn't start the kernel with this KernelManager so we don't
539 539 # know if it is running. We should use a heartbeat for this case.
540 540 return True
541 541
542 542 #--------------------------------------------------------------------------
543 543 # Channels used for communication with the kernel:
544 544 #--------------------------------------------------------------------------
545 545
546 546 @property
547 547 def xreq_channel(self):
548 548 """Get the REQ socket channel object to make requests of the kernel."""
549 549 if self._xreq_channel is None:
550 550 self._xreq_channel = self.xreq_channel_class(self.context,
551 551 self.session,
552 552 self.xreq_address)
553 553 return self._xreq_channel
554 554
555 555 @property
556 556 def sub_channel(self):
557 557 """Get the SUB socket channel object."""
558 558 if self._sub_channel is None:
559 559 self._sub_channel = self.sub_channel_class(self.context,
560 560 self.session,
561 561 self.sub_address)
562 562 return self._sub_channel
563 563
564 564 @property
565 565 def rep_channel(self):
566 566 """Get the REP socket channel object to handle stdin (raw_input)."""
567 567 if self._rep_channel is None:
568 568 self._rep_channel = self.rep_channel_class(self.context,
569 569 self.session,
570 570 self.rep_address)
571 571 return self._rep_channel
General Comments 0
You need to be logged in to leave comments. Login now