##// END OF EJS Templates
Greatly increased frontend performance by improving kernel stdout/stderr buffering.
epatters -
Show More
@@ -1,535 +1,547 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 """A simple interactive kernel that talks to a frontend over 0MQ.
2 """A simple interactive kernel that talks to a frontend over 0MQ.
3
3
4 Things to do:
4 Things to do:
5
5
6 * Finish implementing `raw_input`.
6 * Finish implementing `raw_input`.
7 * Implement `set_parent` logic. Right before doing exec, the Kernel should
7 * Implement `set_parent` logic. Right before doing exec, the Kernel should
8 call set_parent on all the PUB objects with the message about to be executed.
8 call set_parent on all the PUB objects with the message about to be executed.
9 * Implement random port and security key logic.
9 * Implement random port and security key logic.
10 * Implement control messages.
10 * Implement control messages.
11 * Implement event loop and poll version.
11 * Implement event loop and poll version.
12 """
12 """
13
13
14 #-----------------------------------------------------------------------------
14 #-----------------------------------------------------------------------------
15 # Imports
15 # Imports
16 #-----------------------------------------------------------------------------
16 #-----------------------------------------------------------------------------
17
17
18 # Standard library imports.
18 # Standard library imports.
19 import __builtin__
19 import __builtin__
20 from code import CommandCompiler
20 from code import CommandCompiler
21 from cStringIO import StringIO
21 import os
22 import os
22 import sys
23 import sys
23 from threading import Thread
24 from threading import Thread
24 import time
25 import time
25 import traceback
26 import traceback
26
27
27 # System library imports.
28 # System library imports.
28 import zmq
29 import zmq
29
30
30 # Local imports.
31 # Local imports.
31 from IPython.external.argparse import ArgumentParser
32 from IPython.external.argparse import ArgumentParser
32 from session import Session, Message, extract_header
33 from session import Session, Message, extract_header
33 from completer import KernelCompleter
34 from completer import KernelCompleter
34
35
35 #-----------------------------------------------------------------------------
36 #-----------------------------------------------------------------------------
36 # Kernel and stream classes
37 # Kernel and stream classes
37 #-----------------------------------------------------------------------------
38 #-----------------------------------------------------------------------------
38
39
39 class InStream(object):
40 class InStream(object):
40 """ A file like object that reads from a 0MQ XREQ socket."""
41 """ A file like object that reads from a 0MQ XREQ socket."""
41
42
42 def __init__(self, session, socket):
43 def __init__(self, session, socket):
43 self.session = session
44 self.session = session
44 self.socket = socket
45 self.socket = socket
45
46
46 def close(self):
47 def close(self):
47 self.socket = None
48 self.socket = None
48
49
49 def flush(self):
50 def flush(self):
50 if self.socket is None:
51 if self.socket is None:
51 raise ValueError('I/O operation on closed file')
52 raise ValueError('I/O operation on closed file')
52
53
53 def isatty(self):
54 def isatty(self):
54 return False
55 return False
55
56
56 def next(self):
57 def next(self):
57 raise IOError('Seek not supported.')
58 raise IOError('Seek not supported.')
58
59
59 def read(self, size=-1):
60 def read(self, size=-1):
60 # FIXME: Do we want another request for this?
61 # FIXME: Do we want another request for this?
61 string = '\n'.join(self.readlines())
62 string = '\n'.join(self.readlines())
62 return self._truncate(string, size)
63 return self._truncate(string, size)
63
64
64 def readline(self, size=-1):
65 def readline(self, size=-1):
65 if self.socket is None:
66 if self.socket is None:
66 raise ValueError('I/O operation on closed file')
67 raise ValueError('I/O operation on closed file')
67 else:
68 else:
68 content = dict(size=size)
69 content = dict(size=size)
69 msg = self.session.msg('readline_request', content=content)
70 msg = self.session.msg('readline_request', content=content)
70 reply = self._request(msg)
71 reply = self._request(msg)
71 line = reply['content']['line']
72 line = reply['content']['line']
72 return self._truncate(line, size)
73 return self._truncate(line, size)
73
74
74 def readlines(self, sizehint=-1):
75 def readlines(self, sizehint=-1):
75 # Sizehint is ignored, as is permitted.
76 # Sizehint is ignored, as is permitted.
76 if self.socket is None:
77 if self.socket is None:
77 raise ValueError('I/O operation on closed file')
78 raise ValueError('I/O operation on closed file')
78 else:
79 else:
79 lines = []
80 lines = []
80 while True:
81 while True:
81 line = self.readline()
82 line = self.readline()
82 if line:
83 if line:
83 lines.append(line)
84 lines.append(line)
84 else:
85 else:
85 break
86 break
86 return lines
87 return lines
87
88
88 def seek(self, offset, whence=None):
89 def seek(self, offset, whence=None):
89 raise IOError('Seek not supported.')
90 raise IOError('Seek not supported.')
90
91
91 def write(self, string):
92 def write(self, string):
92 raise IOError('Write not supported on a read only stream.')
93 raise IOError('Write not supported on a read only stream.')
93
94
94 def writelines(self, sequence):
95 def writelines(self, sequence):
95 raise IOError('Write not supported on a read only stream.')
96 raise IOError('Write not supported on a read only stream.')
96
97
97 def _request(self, msg):
98 def _request(self, msg):
98 # Flush output before making the request. This ensures, for example,
99 # Flush output before making the request. This ensures, for example,
99 # that raw_input(prompt) actually gets a prompt written.
100 # that raw_input(prompt) actually gets a prompt written.
100 sys.stderr.flush()
101 sys.stderr.flush()
101 sys.stdout.flush()
102 sys.stdout.flush()
102
103
103 self.socket.send_json(msg)
104 self.socket.send_json(msg)
104 while True:
105 while True:
105 try:
106 try:
106 reply = self.socket.recv_json(zmq.NOBLOCK)
107 reply = self.socket.recv_json(zmq.NOBLOCK)
107 except zmq.ZMQError, e:
108 except zmq.ZMQError, e:
108 if e.errno == zmq.EAGAIN:
109 if e.errno == zmq.EAGAIN:
109 pass
110 pass
110 else:
111 else:
111 raise
112 raise
112 else:
113 else:
113 break
114 break
114 return reply
115 return reply
115
116
116 def _truncate(self, string, size):
117 def _truncate(self, string, size):
117 if size >= 0:
118 if size >= 0:
118 if isinstance(string, str):
119 if isinstance(string, str):
119 return string[:size]
120 return string[:size]
120 elif isinstance(string, unicode):
121 elif isinstance(string, unicode):
121 encoded = string.encode('utf-8')[:size]
122 encoded = string.encode('utf-8')[:size]
122 return encoded.decode('utf-8', 'ignore')
123 return encoded.decode('utf-8', 'ignore')
123 return string
124 return string
124
125
125
126
126 class OutStream(object):
127 class OutStream(object):
127 """A file like object that publishes the stream to a 0MQ PUB socket."""
128 """A file like object that publishes the stream to a 0MQ PUB socket."""
128
129
129 def __init__(self, session, pub_socket, name, max_buffer=200):
130 # The time interval between automatic flushes, in seconds.
131 flush_interval = 0.05
132
133 def __init__(self, session, pub_socket, name):
130 self.session = session
134 self.session = session
131 self.pub_socket = pub_socket
135 self.pub_socket = pub_socket
132 self.name = name
136 self.name = name
133 self._buffer = []
134 self._buffer_len = 0
135 self.max_buffer = max_buffer
136 self.parent_header = {}
137 self.parent_header = {}
138 self._new_buffer()
137
139
138 def set_parent(self, parent):
140 def set_parent(self, parent):
139 self.parent_header = extract_header(parent)
141 self.parent_header = extract_header(parent)
140
142
141 def close(self):
143 def close(self):
142 self.pub_socket = None
144 self.pub_socket = None
143
145
144 def flush(self):
146 def flush(self):
145 if self.pub_socket is None:
147 if self.pub_socket is None:
146 raise ValueError(u'I/O operation on closed file')
148 raise ValueError(u'I/O operation on closed file')
147 else:
149 else:
148 if self._buffer:
150 data = self._buffer.getvalue()
149 data = ''.join(self._buffer)
151 if data:
150 content = {u'name':self.name, u'data':data}
152 content = {u'name':self.name, u'data':data}
151 msg = self.session.msg(u'stream', content=content,
153 msg = self.session.msg(u'stream', content=content,
152 parent=self.parent_header)
154 parent=self.parent_header)
153 print>>sys.__stdout__, Message(msg)
155 print>>sys.__stdout__, Message(msg)
154 self.pub_socket.send_json(msg)
156 self.pub_socket.send_json(msg)
155 self._buffer_len = 0
157
156 self._buffer = []
158 self._buffer.close()
159 self._new_buffer()
157
160
158 def isatty(self):
161 def isatty(self):
159 return False
162 return False
160
163
161 def next(self):
164 def next(self):
162 raise IOError('Read not supported on a write only stream.')
165 raise IOError('Read not supported on a write only stream.')
163
166
164 def read(self, size=None):
167 def read(self, size=-1):
165 raise IOError('Read not supported on a write only stream.')
168 raise IOError('Read not supported on a write only stream.')
166
169
167 readline=read
170 def readline(self, size=-1):
171 raise IOError('Read not supported on a write only stream.')
168
172
169 def write(self, s):
173 def write(self, string):
170 if self.pub_socket is None:
174 if self.pub_socket is None:
171 raise ValueError('I/O operation on closed file')
175 raise ValueError('I/O operation on closed file')
172 else:
176 else:
173 self._buffer.append(s)
177 self._buffer.write(string)
174 self._buffer_len += len(s)
178 current_time = time.time()
175 self._maybe_send()
179 if self._start <= 0:
176
180 self._start = current_time
177 def _maybe_send(self):
181 elif current_time - self._start > self.flush_interval:
178 if '\n' in self._buffer[-1]:
182 self.flush()
179 self.flush()
180 if self._buffer_len > self.max_buffer:
181 self.flush()
182
183
183 def writelines(self, sequence):
184 def writelines(self, sequence):
184 if self.pub_socket is None:
185 if self.pub_socket is None:
185 raise ValueError('I/O operation on closed file')
186 raise ValueError('I/O operation on closed file')
186 else:
187 else:
187 for s in sequence:
188 for string in sequence:
188 self.write(s)
189 self.write(string)
190
191 def _new_buffer(self):
192 self._buffer = StringIO()
193 self._start = -1
189
194
190
195
191 class DisplayHook(object):
196 class DisplayHook(object):
192
197
193 def __init__(self, session, pub_socket):
198 def __init__(self, session, pub_socket):
194 self.session = session
199 self.session = session
195 self.pub_socket = pub_socket
200 self.pub_socket = pub_socket
196 self.parent_header = {}
201 self.parent_header = {}
197
202
198 def __call__(self, obj):
203 def __call__(self, obj):
199 if obj is None:
204 if obj is None:
200 return
205 return
201
206
202 __builtin__._ = obj
207 __builtin__._ = obj
203 msg = self.session.msg(u'pyout', {u'data':repr(obj)},
208 msg = self.session.msg(u'pyout', {u'data':repr(obj)},
204 parent=self.parent_header)
209 parent=self.parent_header)
205 self.pub_socket.send_json(msg)
210 self.pub_socket.send_json(msg)
206
211
207 def set_parent(self, parent):
212 def set_parent(self, parent):
208 self.parent_header = extract_header(parent)
213 self.parent_header = extract_header(parent)
209
214
210
215
211 class Kernel(object):
216 class Kernel(object):
212
217
213 def __init__(self, session, reply_socket, pub_socket):
218 def __init__(self, session, reply_socket, pub_socket):
214 self.session = session
219 self.session = session
215 self.reply_socket = reply_socket
220 self.reply_socket = reply_socket
216 self.pub_socket = pub_socket
221 self.pub_socket = pub_socket
217 self.user_ns = {}
222 self.user_ns = {}
218 self.history = []
223 self.history = []
219 self.compiler = CommandCompiler()
224 self.compiler = CommandCompiler()
220 self.completer = KernelCompleter(self.user_ns)
225 self.completer = KernelCompleter(self.user_ns)
221
226
222 # Build dict of handlers for message types
227 # Build dict of handlers for message types
223 msg_types = [ 'execute_request', 'complete_request',
228 msg_types = [ 'execute_request', 'complete_request',
224 'object_info_request' ]
229 'object_info_request' ]
225 self.handlers = {}
230 self.handlers = {}
226 for msg_type in msg_types:
231 for msg_type in msg_types:
227 self.handlers[msg_type] = getattr(self, msg_type)
232 self.handlers[msg_type] = getattr(self, msg_type)
228
233
229 def abort_queue(self):
234 def abort_queue(self):
230 while True:
235 while True:
231 try:
236 try:
232 ident = self.reply_socket.recv(zmq.NOBLOCK)
237 ident = self.reply_socket.recv(zmq.NOBLOCK)
233 except zmq.ZMQError, e:
238 except zmq.ZMQError, e:
234 if e.errno == zmq.EAGAIN:
239 if e.errno == zmq.EAGAIN:
235 break
240 break
236 else:
241 else:
237 assert self.reply_socket.rcvmore(), "Unexpected missing message part."
242 assert self.reply_socket.rcvmore(), "Unexpected missing message part."
238 msg = self.reply_socket.recv_json()
243 msg = self.reply_socket.recv_json()
239 print>>sys.__stdout__, "Aborting:"
244 print>>sys.__stdout__, "Aborting:"
240 print>>sys.__stdout__, Message(msg)
245 print>>sys.__stdout__, Message(msg)
241 msg_type = msg['msg_type']
246 msg_type = msg['msg_type']
242 reply_type = msg_type.split('_')[0] + '_reply'
247 reply_type = msg_type.split('_')[0] + '_reply'
243 reply_msg = self.session.msg(reply_type, {'status' : 'aborted'}, msg)
248 reply_msg = self.session.msg(reply_type, {'status' : 'aborted'}, msg)
244 print>>sys.__stdout__, Message(reply_msg)
249 print>>sys.__stdout__, Message(reply_msg)
245 self.reply_socket.send(ident,zmq.SNDMORE)
250 self.reply_socket.send(ident,zmq.SNDMORE)
246 self.reply_socket.send_json(reply_msg)
251 self.reply_socket.send_json(reply_msg)
247 # We need to wait a bit for requests to come in. This can probably
252 # We need to wait a bit for requests to come in. This can probably
248 # be set shorter for true asynchronous clients.
253 # be set shorter for true asynchronous clients.
249 time.sleep(0.1)
254 time.sleep(0.1)
250
255
251 def execute_request(self, ident, parent):
256 def execute_request(self, ident, parent):
252 try:
257 try:
253 code = parent[u'content'][u'code']
258 code = parent[u'content'][u'code']
254 except:
259 except:
255 print>>sys.__stderr__, "Got bad msg: "
260 print>>sys.__stderr__, "Got bad msg: "
256 print>>sys.__stderr__, Message(parent)
261 print>>sys.__stderr__, Message(parent)
257 return
262 return
258 pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent)
263 pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent)
259 self.pub_socket.send_json(pyin_msg)
264 self.pub_socket.send_json(pyin_msg)
265
260 try:
266 try:
261 comp_code = self.compiler(code, '<zmq-kernel>')
267 comp_code = self.compiler(code, '<zmq-kernel>')
262 sys.displayhook.set_parent(parent)
268 sys.displayhook.set_parent(parent)
263 exec comp_code in self.user_ns, self.user_ns
269 exec comp_code in self.user_ns, self.user_ns
264 except:
270 except:
265 result = u'error'
271 result = u'error'
266 etype, evalue, tb = sys.exc_info()
272 etype, evalue, tb = sys.exc_info()
267 tb = traceback.format_exception(etype, evalue, tb)
273 tb = traceback.format_exception(etype, evalue, tb)
268 exc_content = {
274 exc_content = {
269 u'status' : u'error',
275 u'status' : u'error',
270 u'traceback' : tb,
276 u'traceback' : tb,
271 u'ename' : unicode(etype.__name__),
277 u'ename' : unicode(etype.__name__),
272 u'evalue' : unicode(evalue)
278 u'evalue' : unicode(evalue)
273 }
279 }
274 exc_msg = self.session.msg(u'pyerr', exc_content, parent)
280 exc_msg = self.session.msg(u'pyerr', exc_content, parent)
275 self.pub_socket.send_json(exc_msg)
281 self.pub_socket.send_json(exc_msg)
276 reply_content = exc_content
282 reply_content = exc_content
277 else:
283 else:
278 reply_content = {'status' : 'ok'}
284 reply_content = {'status' : 'ok'}
285
286 # Flush output before sending the reply.
287 sys.stderr.flush()
288 sys.stdout.flush()
289
290 # Send the reply.
279 reply_msg = self.session.msg(u'execute_reply', reply_content, parent)
291 reply_msg = self.session.msg(u'execute_reply', reply_content, parent)
280 print>>sys.__stdout__, Message(reply_msg)
292 print>>sys.__stdout__, Message(reply_msg)
281 self.reply_socket.send(ident, zmq.SNDMORE)
293 self.reply_socket.send(ident, zmq.SNDMORE)
282 self.reply_socket.send_json(reply_msg)
294 self.reply_socket.send_json(reply_msg)
283 if reply_msg['content']['status'] == u'error':
295 if reply_msg['content']['status'] == u'error':
284 self.abort_queue()
296 self.abort_queue()
285
297
286 def complete_request(self, ident, parent):
298 def complete_request(self, ident, parent):
287 matches = {'matches' : self.complete(parent),
299 matches = {'matches' : self.complete(parent),
288 'status' : 'ok'}
300 'status' : 'ok'}
289 completion_msg = self.session.send(self.reply_socket, 'complete_reply',
301 completion_msg = self.session.send(self.reply_socket, 'complete_reply',
290 matches, parent, ident)
302 matches, parent, ident)
291 print >> sys.__stdout__, completion_msg
303 print >> sys.__stdout__, completion_msg
292
304
293 def complete(self, msg):
305 def complete(self, msg):
294 return self.completer.complete(msg.content.line, msg.content.text)
306 return self.completer.complete(msg.content.line, msg.content.text)
295
307
296 def object_info_request(self, ident, parent):
308 def object_info_request(self, ident, parent):
297 context = parent['content']['oname'].split('.')
309 context = parent['content']['oname'].split('.')
298 object_info = self.object_info(context)
310 object_info = self.object_info(context)
299 msg = self.session.send(self.reply_socket, 'object_info_reply',
311 msg = self.session.send(self.reply_socket, 'object_info_reply',
300 object_info, parent, ident)
312 object_info, parent, ident)
301 print >> sys.__stdout__, msg
313 print >> sys.__stdout__, msg
302
314
303 def object_info(self, context):
315 def object_info(self, context):
304 symbol, leftover = self.symbol_from_context(context)
316 symbol, leftover = self.symbol_from_context(context)
305 if symbol is not None and not leftover:
317 if symbol is not None and not leftover:
306 doc = getattr(symbol, '__doc__', '')
318 doc = getattr(symbol, '__doc__', '')
307 else:
319 else:
308 doc = ''
320 doc = ''
309 object_info = dict(docstring = doc)
321 object_info = dict(docstring = doc)
310 return object_info
322 return object_info
311
323
312 def symbol_from_context(self, context):
324 def symbol_from_context(self, context):
313 if not context:
325 if not context:
314 return None, context
326 return None, context
315
327
316 base_symbol_string = context[0]
328 base_symbol_string = context[0]
317 symbol = self.user_ns.get(base_symbol_string, None)
329 symbol = self.user_ns.get(base_symbol_string, None)
318 if symbol is None:
330 if symbol is None:
319 symbol = __builtin__.__dict__.get(base_symbol_string, None)
331 symbol = __builtin__.__dict__.get(base_symbol_string, None)
320 if symbol is None:
332 if symbol is None:
321 return None, context
333 return None, context
322
334
323 context = context[1:]
335 context = context[1:]
324 for i, name in enumerate(context):
336 for i, name in enumerate(context):
325 new_symbol = getattr(symbol, name, None)
337 new_symbol = getattr(symbol, name, None)
326 if new_symbol is None:
338 if new_symbol is None:
327 return symbol, context[i:]
339 return symbol, context[i:]
328 else:
340 else:
329 symbol = new_symbol
341 symbol = new_symbol
330
342
331 return symbol, []
343 return symbol, []
332
344
333 def start(self):
345 def start(self):
334 while True:
346 while True:
335 ident = self.reply_socket.recv()
347 ident = self.reply_socket.recv()
336 assert self.reply_socket.rcvmore(), "Unexpected missing message part."
348 assert self.reply_socket.rcvmore(), "Unexpected missing message part."
337 msg = self.reply_socket.recv_json()
349 msg = self.reply_socket.recv_json()
338 omsg = Message(msg)
350 omsg = Message(msg)
339 print>>sys.__stdout__
351 print>>sys.__stdout__
340 print>>sys.__stdout__, omsg
352 print>>sys.__stdout__, omsg
341 handler = self.handlers.get(omsg.msg_type, None)
353 handler = self.handlers.get(omsg.msg_type, None)
342 if handler is None:
354 if handler is None:
343 print >> sys.__stderr__, "UNKNOWN MESSAGE TYPE:", omsg
355 print >> sys.__stderr__, "UNKNOWN MESSAGE TYPE:", omsg
344 else:
356 else:
345 handler(ident, omsg)
357 handler(ident, omsg)
346
358
347 #-----------------------------------------------------------------------------
359 #-----------------------------------------------------------------------------
348 # Kernel main and launch functions
360 # Kernel main and launch functions
349 #-----------------------------------------------------------------------------
361 #-----------------------------------------------------------------------------
350
362
351 class ExitPollerUnix(Thread):
363 class ExitPollerUnix(Thread):
352 """ A Unix-specific daemon thread that terminates the program immediately
364 """ A Unix-specific daemon thread that terminates the program immediately
353 when this process' parent process no longer exists.
365 when this process' parent process no longer exists.
354 """
366 """
355
367
356 def __init__(self):
368 def __init__(self):
357 super(ExitPollerUnix, self).__init__()
369 super(ExitPollerUnix, self).__init__()
358 self.daemon = True
370 self.daemon = True
359
371
360 def run(self):
372 def run(self):
361 # We cannot use os.waitpid because it works only for child processes.
373 # We cannot use os.waitpid because it works only for child processes.
362 from errno import EINTR
374 from errno import EINTR
363 while True:
375 while True:
364 try:
376 try:
365 if os.getppid() == 1:
377 if os.getppid() == 1:
366 os._exit(1)
378 os._exit(1)
367 time.sleep(1.0)
379 time.sleep(1.0)
368 except OSError, e:
380 except OSError, e:
369 if e.errno == EINTR:
381 if e.errno == EINTR:
370 continue
382 continue
371 raise
383 raise
372
384
373 class ExitPollerWindows(Thread):
385 class ExitPollerWindows(Thread):
374 """ A Windows-specific daemon thread that terminates the program immediately
386 """ A Windows-specific daemon thread that terminates the program immediately
375 when a Win32 handle is signaled.
387 when a Win32 handle is signaled.
376 """
388 """
377
389
378 def __init__(self, handle):
390 def __init__(self, handle):
379 super(ExitPollerWindows, self).__init__()
391 super(ExitPollerWindows, self).__init__()
380 self.daemon = True
392 self.daemon = True
381 self.handle = handle
393 self.handle = handle
382
394
383 def run(self):
395 def run(self):
384 from _subprocess import WaitForSingleObject, WAIT_OBJECT_0, INFINITE
396 from _subprocess import WaitForSingleObject, WAIT_OBJECT_0, INFINITE
385 result = WaitForSingleObject(self.handle, INFINITE)
397 result = WaitForSingleObject(self.handle, INFINITE)
386 if result == WAIT_OBJECT_0:
398 if result == WAIT_OBJECT_0:
387 os._exit(1)
399 os._exit(1)
388
400
389
401
390 def bind_port(socket, ip, port):
402 def bind_port(socket, ip, port):
391 """ Binds the specified ZMQ socket. If the port is less than zero, a random
403 """ Binds the specified ZMQ socket. If the port is less than zero, a random
392 port is chosen. Returns the port that was bound.
404 port is chosen. Returns the port that was bound.
393 """
405 """
394 connection = 'tcp://%s' % ip
406 connection = 'tcp://%s' % ip
395 if port <= 0:
407 if port <= 0:
396 port = socket.bind_to_random_port(connection)
408 port = socket.bind_to_random_port(connection)
397 else:
409 else:
398 connection += ':%i' % port
410 connection += ':%i' % port
399 socket.bind(connection)
411 socket.bind(connection)
400 return port
412 return port
401
413
402
414
403 def main():
415 def main():
404 """ Main entry point for launching a kernel.
416 """ Main entry point for launching a kernel.
405 """
417 """
406 # Parse command line arguments.
418 # Parse command line arguments.
407 parser = ArgumentParser()
419 parser = ArgumentParser()
408 parser.add_argument('--ip', type=str, default='127.0.0.1',
420 parser.add_argument('--ip', type=str, default='127.0.0.1',
409 help='set the kernel\'s IP address [default: local]')
421 help='set the kernel\'s IP address [default: local]')
410 parser.add_argument('--xrep', type=int, metavar='PORT', default=0,
422 parser.add_argument('--xrep', type=int, metavar='PORT', default=0,
411 help='set the XREP channel port [default: random]')
423 help='set the XREP channel port [default: random]')
412 parser.add_argument('--pub', type=int, metavar='PORT', default=0,
424 parser.add_argument('--pub', type=int, metavar='PORT', default=0,
413 help='set the PUB channel port [default: random]')
425 help='set the PUB channel port [default: random]')
414 parser.add_argument('--req', type=int, metavar='PORT', default=0,
426 parser.add_argument('--req', type=int, metavar='PORT', default=0,
415 help='set the REQ channel port [default: random]')
427 help='set the REQ channel port [default: random]')
416 if sys.platform == 'win32':
428 if sys.platform == 'win32':
417 parser.add_argument('--parent', type=int, metavar='HANDLE',
429 parser.add_argument('--parent', type=int, metavar='HANDLE',
418 default=0, help='kill this process if the process '
430 default=0, help='kill this process if the process '
419 'with HANDLE dies')
431 'with HANDLE dies')
420 else:
432 else:
421 parser.add_argument('--parent', action='store_true',
433 parser.add_argument('--parent', action='store_true',
422 help='kill this process if its parent dies')
434 help='kill this process if its parent dies')
423 namespace = parser.parse_args()
435 namespace = parser.parse_args()
424
436
425 # Create a context, a session, and the kernel sockets.
437 # Create a context, a session, and the kernel sockets.
426 print >>sys.__stdout__, "Starting the kernel..."
438 print >>sys.__stdout__, "Starting the kernel..."
427 context = zmq.Context()
439 context = zmq.Context()
428 session = Session(username=u'kernel')
440 session = Session(username=u'kernel')
429
441
430 reply_socket = context.socket(zmq.XREP)
442 reply_socket = context.socket(zmq.XREP)
431 xrep_port = bind_port(reply_socket, namespace.ip, namespace.xrep)
443 xrep_port = bind_port(reply_socket, namespace.ip, namespace.xrep)
432 print >>sys.__stdout__, "XREP Channel on port", xrep_port
444 print >>sys.__stdout__, "XREP Channel on port", xrep_port
433
445
434 pub_socket = context.socket(zmq.PUB)
446 pub_socket = context.socket(zmq.PUB)
435 pub_port = bind_port(pub_socket, namespace.ip, namespace.pub)
447 pub_port = bind_port(pub_socket, namespace.ip, namespace.pub)
436 print >>sys.__stdout__, "PUB Channel on port", pub_port
448 print >>sys.__stdout__, "PUB Channel on port", pub_port
437
449
438 req_socket = context.socket(zmq.XREQ)
450 req_socket = context.socket(zmq.XREQ)
439 req_port = bind_port(req_socket, namespace.ip, namespace.req)
451 req_port = bind_port(req_socket, namespace.ip, namespace.req)
440 print >>sys.__stdout__, "REQ Channel on port", req_port
452 print >>sys.__stdout__, "REQ Channel on port", req_port
441
453
442 # Redirect input streams and set a display hook.
454 # Redirect input streams and set a display hook.
443 sys.stdin = InStream(session, req_socket)
455 sys.stdin = InStream(session, req_socket)
444 sys.stdout = OutStream(session, pub_socket, u'stdout')
456 sys.stdout = OutStream(session, pub_socket, u'stdout')
445 sys.stderr = OutStream(session, pub_socket, u'stderr')
457 sys.stderr = OutStream(session, pub_socket, u'stderr')
446 sys.displayhook = DisplayHook(session, pub_socket)
458 sys.displayhook = DisplayHook(session, pub_socket)
447
459
448 # Create the kernel.
460 # Create the kernel.
449 kernel = Kernel(session, reply_socket, pub_socket)
461 kernel = Kernel(session, reply_socket, pub_socket)
450
462
451 # Configure this kernel/process to die on parent termination, if necessary.
463 # Configure this kernel/process to die on parent termination, if necessary.
452 if namespace.parent:
464 if namespace.parent:
453 if sys.platform == 'win32':
465 if sys.platform == 'win32':
454 poller = ExitPollerWindows(namespace.parent)
466 poller = ExitPollerWindows(namespace.parent)
455 else:
467 else:
456 poller = ExitPollerUnix()
468 poller = ExitPollerUnix()
457 poller.start()
469 poller.start()
458
470
459 # Start the kernel mainloop.
471 # Start the kernel mainloop.
460 kernel.start()
472 kernel.start()
461
473
462
474
463 def launch_kernel(xrep_port=0, pub_port=0, req_port=0, independent=False):
475 def launch_kernel(xrep_port=0, pub_port=0, req_port=0, independent=False):
464 """ Launches a localhost kernel, binding to the specified ports.
476 """ Launches a localhost kernel, binding to the specified ports.
465
477
466 Parameters
478 Parameters
467 ----------
479 ----------
468 xrep_port : int, optional
480 xrep_port : int, optional
469 The port to use for XREP channel.
481 The port to use for XREP channel.
470
482
471 pub_port : int, optional
483 pub_port : int, optional
472 The port to use for the SUB channel.
484 The port to use for the SUB channel.
473
485
474 req_port : int, optional
486 req_port : int, optional
475 The port to use for the REQ (raw input) channel.
487 The port to use for the REQ (raw input) channel.
476
488
477 independent : bool, optional (default False)
489 independent : bool, optional (default False)
478 If set, the kernel process is guaranteed to survive if this process
490 If set, the kernel process is guaranteed to survive if this process
479 dies. If not set, an effort is made to ensure that the kernel is killed
491 dies. If not set, an effort is made to ensure that the kernel is killed
480 when this process dies. Note that in this case it is still good practice
492 when this process dies. Note that in this case it is still good practice
481 to kill kernels manually before exiting.
493 to kill kernels manually before exiting.
482
494
483 Returns
495 Returns
484 -------
496 -------
485 A tuple of form:
497 A tuple of form:
486 (kernel_process, xrep_port, pub_port, req_port)
498 (kernel_process, xrep_port, pub_port, req_port)
487 where kernel_process is a Popen object and the ports are integers.
499 where kernel_process is a Popen object and the ports are integers.
488 """
500 """
489 import socket
501 import socket
490 from subprocess import Popen
502 from subprocess import Popen
491
503
492 # Find open ports as necessary.
504 # Find open ports as necessary.
493 ports = []
505 ports = []
494 ports_needed = int(xrep_port <= 0) + int(pub_port <= 0) + int(req_port <= 0)
506 ports_needed = int(xrep_port <= 0) + int(pub_port <= 0) + int(req_port <= 0)
495 for i in xrange(ports_needed):
507 for i in xrange(ports_needed):
496 sock = socket.socket()
508 sock = socket.socket()
497 sock.bind(('', 0))
509 sock.bind(('', 0))
498 ports.append(sock)
510 ports.append(sock)
499 for i, sock in enumerate(ports):
511 for i, sock in enumerate(ports):
500 port = sock.getsockname()[1]
512 port = sock.getsockname()[1]
501 sock.close()
513 sock.close()
502 ports[i] = port
514 ports[i] = port
503 if xrep_port <= 0:
515 if xrep_port <= 0:
504 xrep_port = ports.pop(0)
516 xrep_port = ports.pop(0)
505 if pub_port <= 0:
517 if pub_port <= 0:
506 pub_port = ports.pop(0)
518 pub_port = ports.pop(0)
507 if req_port <= 0:
519 if req_port <= 0:
508 req_port = ports.pop(0)
520 req_port = ports.pop(0)
509
521
510 # Spawn a kernel.
522 # Spawn a kernel.
511 command = 'from IPython.zmq.kernel import main; main()'
523 command = 'from IPython.zmq.kernel import main; main()'
512 arguments = [ sys.executable, '-c', command, '--xrep', str(xrep_port),
524 arguments = [ sys.executable, '-c', command, '--xrep', str(xrep_port),
513 '--pub', str(pub_port), '--req', str(req_port) ]
525 '--pub', str(pub_port), '--req', str(req_port) ]
514 if independent:
526 if independent:
515 if sys.platform == 'win32':
527 if sys.platform == 'win32':
516 proc = Popen(['start', '/b'] + arguments, shell=True)
528 proc = Popen(['start', '/b'] + arguments, shell=True)
517 else:
529 else:
518 proc = Popen(arguments, preexec_fn=lambda: os.setsid())
530 proc = Popen(arguments, preexec_fn=lambda: os.setsid())
519 else:
531 else:
520 if sys.platform == 'win32':
532 if sys.platform == 'win32':
521 from _subprocess import DuplicateHandle, GetCurrentProcess, \
533 from _subprocess import DuplicateHandle, GetCurrentProcess, \
522 DUPLICATE_SAME_ACCESS
534 DUPLICATE_SAME_ACCESS
523 pid = GetCurrentProcess()
535 pid = GetCurrentProcess()
524 handle = DuplicateHandle(pid, pid, pid, 0,
536 handle = DuplicateHandle(pid, pid, pid, 0,
525 True, # Inheritable by new processes.
537 True, # Inheritable by new processes.
526 DUPLICATE_SAME_ACCESS)
538 DUPLICATE_SAME_ACCESS)
527 proc = Popen(arguments + ['--parent', str(int(handle))])
539 proc = Popen(arguments + ['--parent', str(int(handle))])
528 else:
540 else:
529 proc = Popen(arguments + ['--parent'])
541 proc = Popen(arguments + ['--parent'])
530
542
531 return proc, xrep_port, pub_port, req_port
543 return proc, xrep_port, pub_port, req_port
532
544
533
545
534 if __name__ == '__main__':
546 if __name__ == '__main__':
535 main()
547 main()
General Comments 0
You need to be logged in to leave comments. Login now