##// END OF EJS Templates
Separating kernel into smaller pieces.
Brian Granger -
Show More
@@ -0,0 +1,22 b''
1 import __builtin__
2
3 from session import extract_header
4
5 class DisplayHook(object):
6
7 def __init__(self, session, pub_socket):
8 self.session = session
9 self.pub_socket = pub_socket
10 self.parent_header = {}
11
12 def __call__(self, obj):
13 if obj is None:
14 return
15
16 __builtin__._ = obj
17 msg = self.session.msg(u'pyout', {u'data':repr(obj)},
18 parent=self.parent_header)
19 self.pub_socket.send_json(msg)
20
21 def set_parent(self, parent):
22 self.parent_header = extract_header(parent) No newline at end of file
@@ -0,0 +1,42 b''
1 import os
2 import time
3 from threading import Thread
4
5
6 class ExitPollerUnix(Thread):
7 """ A Unix-specific daemon thread that terminates the program immediately
8 when the parent process no longer exists.
9 """
10
11 def __init__(self):
12 super(ExitPollerUnix, self).__init__()
13 self.daemon = True
14
15 def run(self):
16 # We cannot use os.waitpid because it works only for child processes.
17 from errno import EINTR
18 while True:
19 try:
20 if os.getppid() == 1:
21 os._exit(1)
22 time.sleep(1.0)
23 except OSError, e:
24 if e.errno == EINTR:
25 continue
26 raise
27
28 class ExitPollerWindows(Thread):
29 """ A Windows-specific daemon thread that terminates the program immediately
30 when a Win32 handle is signaled.
31 """
32
33 def __init__(self, handle):
34 super(ExitPollerWindows, self).__init__()
35 self.daemon = True
36 self.handle = handle
37
38 def run(self):
39 from _subprocess import WaitForSingleObject, WAIT_OBJECT_0, INFINITE
40 result = WaitForSingleObject(self.handle, INFINITE)
41 if result == WAIT_OBJECT_0:
42 os._exit(1) No newline at end of file
@@ -0,0 +1,77 b''
1 import sys
2 import time
3 from cStringIO import StringIO
4
5 from session import extract_header, Message
6
7 #-----------------------------------------------------------------------------
8 # Stream classes
9 #-----------------------------------------------------------------------------
10
11 class OutStream(object):
12 """A file like object that publishes the stream to a 0MQ PUB socket."""
13
14 # The time interval between automatic flushes, in seconds.
15 flush_interval = 0.05
16
17 def __init__(self, session, pub_socket, name):
18 self.session = session
19 self.pub_socket = pub_socket
20 self.name = name
21 self.parent_header = {}
22 self._new_buffer()
23
24 def set_parent(self, parent):
25 self.parent_header = extract_header(parent)
26
27 def close(self):
28 self.pub_socket = None
29
30 def flush(self):
31 if self.pub_socket is None:
32 raise ValueError(u'I/O operation on closed file')
33 else:
34 data = self._buffer.getvalue()
35 if data:
36 content = {u'name':self.name, u'data':data}
37 msg = self.session.msg(u'stream', content=content,
38 parent=self.parent_header)
39 print>>sys.__stdout__, Message(msg)
40 self.pub_socket.send_json(msg)
41
42 self._buffer.close()
43 self._new_buffer()
44
45 def isatty(self):
46 return False
47
48 def next(self):
49 raise IOError('Read not supported on a write only stream.')
50
51 def read(self, size=-1):
52 raise IOError('Read not supported on a write only stream.')
53
54 def readline(self, size=-1):
55 raise IOError('Read not supported on a write only stream.')
56
57 def write(self, string):
58 if self.pub_socket is None:
59 raise ValueError('I/O operation on closed file')
60 else:
61 self._buffer.write(string)
62 current_time = time.time()
63 if self._start <= 0:
64 self._start = current_time
65 elif current_time - self._start > self.flush_interval:
66 self.flush()
67
68 def writelines(self, sequence):
69 if self.pub_socket is None:
70 raise ValueError('I/O operation on closed file')
71 else:
72 for string in sequence:
73 self.write(string)
74
75 def _new_buffer(self):
76 self._buffer = StringIO()
77 self._start = -1 No newline at end of file
@@ -1,488 +1,359 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 """A simple interactive kernel that talks to a frontend over 0MQ.
2 """A simple interactive kernel that talks to a frontend over 0MQ.
3
3
4 Things to do:
4 Things to do:
5
5
6 * Finish implementing `raw_input`.
7 * Implement `set_parent` logic. Right before doing exec, the Kernel should
6 * Implement `set_parent` logic. Right before doing exec, the Kernel should
8 call set_parent on all the PUB objects with the message about to be executed.
7 call set_parent on all the PUB objects with the message about to be executed.
9 * Implement random port and security key logic.
8 * Implement random port and security key logic.
10 * Implement control messages.
9 * Implement control messages.
11 * Implement event loop and poll version.
10 * Implement event loop and poll version.
12 """
11 """
13
12
14 #-----------------------------------------------------------------------------
13 #-----------------------------------------------------------------------------
15 # Imports
14 # Imports
16 #-----------------------------------------------------------------------------
15 #-----------------------------------------------------------------------------
17
16
18 # Standard library imports.
17 # Standard library imports.
19 import __builtin__
18 import __builtin__
20 from code import CommandCompiler
19 from code import CommandCompiler
21 from cStringIO import StringIO
22 import os
20 import os
23 import sys
21 import sys
24 from threading import Thread
25 import time
22 import time
26 import traceback
23 import traceback
27
24
28 # System library imports.
25 # System library imports.
29 import zmq
26 import zmq
30
27
31 # Local imports.
28 # Local imports.
32 from IPython.external.argparse import ArgumentParser
29 from IPython.external.argparse import ArgumentParser
33 from session import Session, Message, extract_header
30 from session import Session, Message
34 from completer import KernelCompleter
31 from completer import KernelCompleter
32 from .iostream import OutStream
33 from .displayhook import DisplayHook
34 from .exitpoller import ExitPollerUnix, ExitPollerWindows
35
35
36 #-----------------------------------------------------------------------------
36 #-----------------------------------------------------------------------------
37 # Kernel and stream classes
37 # Main kernel class
38 #-----------------------------------------------------------------------------
38 #-----------------------------------------------------------------------------
39
39
40 class OutStream(object):
41 """A file like object that publishes the stream to a 0MQ PUB socket."""
42
43 # The time interval between automatic flushes, in seconds.
44 flush_interval = 0.05
45
46 def __init__(self, session, pub_socket, name):
47 self.session = session
48 self.pub_socket = pub_socket
49 self.name = name
50 self.parent_header = {}
51 self._new_buffer()
52
53 def set_parent(self, parent):
54 self.parent_header = extract_header(parent)
55
56 def close(self):
57 self.pub_socket = None
58
59 def flush(self):
60 if self.pub_socket is None:
61 raise ValueError(u'I/O operation on closed file')
62 else:
63 data = self._buffer.getvalue()
64 if data:
65 content = {u'name':self.name, u'data':data}
66 msg = self.session.msg(u'stream', content=content,
67 parent=self.parent_header)
68 print>>sys.__stdout__, Message(msg)
69 self.pub_socket.send_json(msg)
70
71 self._buffer.close()
72 self._new_buffer()
73
74 def isatty(self):
75 return False
76
77 def next(self):
78 raise IOError('Read not supported on a write only stream.')
79
80 def read(self, size=-1):
81 raise IOError('Read not supported on a write only stream.')
82
83 def readline(self, size=-1):
84 raise IOError('Read not supported on a write only stream.')
85
86 def write(self, string):
87 if self.pub_socket is None:
88 raise ValueError('I/O operation on closed file')
89 else:
90 self._buffer.write(string)
91 current_time = time.time()
92 if self._start <= 0:
93 self._start = current_time
94 elif current_time - self._start > self.flush_interval:
95 self.flush()
96
97 def writelines(self, sequence):
98 if self.pub_socket is None:
99 raise ValueError('I/O operation on closed file')
100 else:
101 for string in sequence:
102 self.write(string)
103
104 def _new_buffer(self):
105 self._buffer = StringIO()
106 self._start = -1
107
108
109 class DisplayHook(object):
110
111 def __init__(self, session, pub_socket):
112 self.session = session
113 self.pub_socket = pub_socket
114 self.parent_header = {}
115
116 def __call__(self, obj):
117 if obj is None:
118 return
119
120 __builtin__._ = obj
121 msg = self.session.msg(u'pyout', {u'data':repr(obj)},
122 parent=self.parent_header)
123 self.pub_socket.send_json(msg)
124
125 def set_parent(self, parent):
126 self.parent_header = extract_header(parent)
127
128
129 class Kernel(object):
40 class Kernel(object):
130
41
131 def __init__(self, session, reply_socket, pub_socket, req_socket):
42 def __init__(self, session, reply_socket, pub_socket, req_socket):
132 self.session = session
43 self.session = session
133 self.reply_socket = reply_socket
44 self.reply_socket = reply_socket
134 self.pub_socket = pub_socket
45 self.pub_socket = pub_socket
135 self.req_socket = req_socket
46 self.req_socket = req_socket
136 self.user_ns = {}
47 self.user_ns = {}
137 self.history = []
48 self.history = []
138 self.compiler = CommandCompiler()
49 self.compiler = CommandCompiler()
139 self.completer = KernelCompleter(self.user_ns)
50 self.completer = KernelCompleter(self.user_ns)
140
51
141 # Build dict of handlers for message types
52 # Build dict of handlers for message types
142 msg_types = [ 'execute_request', 'complete_request',
53 msg_types = [ 'execute_request', 'complete_request',
143 'object_info_request' ]
54 'object_info_request' ]
144 self.handlers = {}
55 self.handlers = {}
145 for msg_type in msg_types:
56 for msg_type in msg_types:
146 self.handlers[msg_type] = getattr(self, msg_type)
57 self.handlers[msg_type] = getattr(self, msg_type)
147
58
148 def abort_queue(self):
59 def abort_queue(self):
149 while True:
60 while True:
150 try:
61 try:
151 ident = self.reply_socket.recv(zmq.NOBLOCK)
62 ident = self.reply_socket.recv(zmq.NOBLOCK)
152 except zmq.ZMQError, e:
63 except zmq.ZMQError, e:
153 if e.errno == zmq.EAGAIN:
64 if e.errno == zmq.EAGAIN:
154 break
65 break
155 else:
66 else:
156 assert self.reply_socket.rcvmore(), "Unexpected missing message part."
67 assert self.reply_socket.rcvmore(), "Unexpected missing message part."
157 msg = self.reply_socket.recv_json()
68 msg = self.reply_socket.recv_json()
158 print>>sys.__stdout__, "Aborting:"
69 print>>sys.__stdout__, "Aborting:"
159 print>>sys.__stdout__, Message(msg)
70 print>>sys.__stdout__, Message(msg)
160 msg_type = msg['msg_type']
71 msg_type = msg['msg_type']
161 reply_type = msg_type.split('_')[0] + '_reply'
72 reply_type = msg_type.split('_')[0] + '_reply'
162 reply_msg = self.session.msg(reply_type, {'status' : 'aborted'}, msg)
73 reply_msg = self.session.msg(reply_type, {'status' : 'aborted'}, msg)
163 print>>sys.__stdout__, Message(reply_msg)
74 print>>sys.__stdout__, Message(reply_msg)
164 self.reply_socket.send(ident,zmq.SNDMORE)
75 self.reply_socket.send(ident,zmq.SNDMORE)
165 self.reply_socket.send_json(reply_msg)
76 self.reply_socket.send_json(reply_msg)
166 # We need to wait a bit for requests to come in. This can probably
77 # We need to wait a bit for requests to come in. This can probably
167 # be set shorter for true asynchronous clients.
78 # be set shorter for true asynchronous clients.
168 time.sleep(0.1)
79 time.sleep(0.1)
169
80
170 def execute_request(self, ident, parent):
81 def execute_request(self, ident, parent):
171 try:
82 try:
172 code = parent[u'content'][u'code']
83 code = parent[u'content'][u'code']
173 except:
84 except:
174 print>>sys.__stderr__, "Got bad msg: "
85 print>>sys.__stderr__, "Got bad msg: "
175 print>>sys.__stderr__, Message(parent)
86 print>>sys.__stderr__, Message(parent)
176 return
87 return
177 pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent)
88 pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent)
178 self.pub_socket.send_json(pyin_msg)
89 self.pub_socket.send_json(pyin_msg)
179
90
180 try:
91 try:
181 comp_code = self.compiler(code, '<zmq-kernel>')
92 comp_code = self.compiler(code, '<zmq-kernel>')
182
93
183 # Replace raw_input. Note that is not sufficient to replace
94 # Replace raw_input. Note that is not sufficient to replace
184 # raw_input in the user namespace.
95 # raw_input in the user namespace.
185 raw_input = lambda prompt='': self.raw_input(prompt, ident, parent)
96 raw_input = lambda prompt='': self.raw_input(prompt, ident, parent)
186 __builtin__.raw_input = raw_input
97 __builtin__.raw_input = raw_input
187
98
188 # Configure the display hook.
99 # Configure the display hook.
189 sys.displayhook.set_parent(parent)
100 sys.displayhook.set_parent(parent)
190
101
191 exec comp_code in self.user_ns, self.user_ns
102 exec comp_code in self.user_ns, self.user_ns
192 except:
103 except:
193 result = u'error'
194 etype, evalue, tb = sys.exc_info()
104 etype, evalue, tb = sys.exc_info()
195 tb = traceback.format_exception(etype, evalue, tb)
105 tb = traceback.format_exception(etype, evalue, tb)
196 exc_content = {
106 exc_content = {
197 u'status' : u'error',
107 u'status' : u'error',
198 u'traceback' : tb,
108 u'traceback' : tb,
199 u'ename' : unicode(etype.__name__),
109 u'ename' : unicode(etype.__name__),
200 u'evalue' : unicode(evalue)
110 u'evalue' : unicode(evalue)
201 }
111 }
202 exc_msg = self.session.msg(u'pyerr', exc_content, parent)
112 exc_msg = self.session.msg(u'pyerr', exc_content, parent)
203 self.pub_socket.send_json(exc_msg)
113 self.pub_socket.send_json(exc_msg)
204 reply_content = exc_content
114 reply_content = exc_content
205 else:
115 else:
206 reply_content = {'status' : 'ok'}
116 reply_content = {'status' : 'ok'}
207
117
208 # Flush output before sending the reply.
118 # Flush output before sending the reply.
209 sys.stderr.flush()
119 sys.stderr.flush()
210 sys.stdout.flush()
120 sys.stdout.flush()
211
121
212 # Send the reply.
122 # Send the reply.
213 reply_msg = self.session.msg(u'execute_reply', reply_content, parent)
123 reply_msg = self.session.msg(u'execute_reply', reply_content, parent)
214 print>>sys.__stdout__, Message(reply_msg)
124 print>>sys.__stdout__, Message(reply_msg)
215 self.reply_socket.send(ident, zmq.SNDMORE)
125 self.reply_socket.send(ident, zmq.SNDMORE)
216 self.reply_socket.send_json(reply_msg)
126 self.reply_socket.send_json(reply_msg)
217 if reply_msg['content']['status'] == u'error':
127 if reply_msg['content']['status'] == u'error':
218 self.abort_queue()
128 self.abort_queue()
219
129
220 def raw_input(self, prompt, ident, parent):
130 def raw_input(self, prompt, ident, parent):
221 # Flush output before making the request.
131 # Flush output before making the request.
222 sys.stderr.flush()
132 sys.stderr.flush()
223 sys.stdout.flush()
133 sys.stdout.flush()
224
134
225 # Send the input request.
135 # Send the input request.
226 content = dict(prompt=prompt)
136 content = dict(prompt=prompt)
227 msg = self.session.msg(u'input_request', content, parent)
137 msg = self.session.msg(u'input_request', content, parent)
228 self.req_socket.send_json(msg)
138 self.req_socket.send_json(msg)
229
139
230 # Await a response.
140 # Await a response.
231 reply = self.req_socket.recv_json()
141 reply = self.req_socket.recv_json()
232 try:
142 try:
233 value = reply['content']['value']
143 value = reply['content']['value']
234 except:
144 except:
235 print>>sys.__stderr__, "Got bad raw_input reply: "
145 print>>sys.__stderr__, "Got bad raw_input reply: "
236 print>>sys.__stderr__, Message(parent)
146 print>>sys.__stderr__, Message(parent)
237 value = ''
147 value = ''
238 return value
148 return value
239
149
240 def complete_request(self, ident, parent):
150 def complete_request(self, ident, parent):
241 matches = {'matches' : self.complete(parent),
151 matches = {'matches' : self.complete(parent),
242 'status' : 'ok'}
152 'status' : 'ok'}
243 completion_msg = self.session.send(self.reply_socket, 'complete_reply',
153 completion_msg = self.session.send(self.reply_socket, 'complete_reply',
244 matches, parent, ident)
154 matches, parent, ident)
245 print >> sys.__stdout__, completion_msg
155 print >> sys.__stdout__, completion_msg
246
156
247 def complete(self, msg):
157 def complete(self, msg):
248 return self.completer.complete(msg.content.line, msg.content.text)
158 return self.completer.complete(msg.content.line, msg.content.text)
249
159
250 def object_info_request(self, ident, parent):
160 def object_info_request(self, ident, parent):
251 context = parent['content']['oname'].split('.')
161 context = parent['content']['oname'].split('.')
252 object_info = self.object_info(context)
162 object_info = self.object_info(context)
253 msg = self.session.send(self.reply_socket, 'object_info_reply',
163 msg = self.session.send(self.reply_socket, 'object_info_reply',
254 object_info, parent, ident)
164 object_info, parent, ident)
255 print >> sys.__stdout__, msg
165 print >> sys.__stdout__, msg
256
166
257 def object_info(self, context):
167 def object_info(self, context):
258 symbol, leftover = self.symbol_from_context(context)
168 symbol, leftover = self.symbol_from_context(context)
259 if symbol is not None and not leftover:
169 if symbol is not None and not leftover:
260 doc = getattr(symbol, '__doc__', '')
170 doc = getattr(symbol, '__doc__', '')
261 else:
171 else:
262 doc = ''
172 doc = ''
263 object_info = dict(docstring = doc)
173 object_info = dict(docstring = doc)
264 return object_info
174 return object_info
265
175
266 def symbol_from_context(self, context):
176 def symbol_from_context(self, context):
267 if not context:
177 if not context:
268 return None, context
178 return None, context
269
179
270 base_symbol_string = context[0]
180 base_symbol_string = context[0]
271 symbol = self.user_ns.get(base_symbol_string, None)
181 symbol = self.user_ns.get(base_symbol_string, None)
272 if symbol is None:
182 if symbol is None:
273 symbol = __builtin__.__dict__.get(base_symbol_string, None)
183 symbol = __builtin__.__dict__.get(base_symbol_string, None)
274 if symbol is None:
184 if symbol is None:
275 return None, context
185 return None, context
276
186
277 context = context[1:]
187 context = context[1:]
278 for i, name in enumerate(context):
188 for i, name in enumerate(context):
279 new_symbol = getattr(symbol, name, None)
189 new_symbol = getattr(symbol, name, None)
280 if new_symbol is None:
190 if new_symbol is None:
281 return symbol, context[i:]
191 return symbol, context[i:]
282 else:
192 else:
283 symbol = new_symbol
193 symbol = new_symbol
284
194
285 return symbol, []
195 return symbol, []
286
196
287 def start(self):
197 def start(self):
288 while True:
198 while True:
289 ident = self.reply_socket.recv()
199 ident = self.reply_socket.recv()
290 assert self.reply_socket.rcvmore(), "Missing message part."
200 assert self.reply_socket.rcvmore(), "Missing message part."
291 msg = self.reply_socket.recv_json()
201 msg = self.reply_socket.recv_json()
292 omsg = Message(msg)
202 omsg = Message(msg)
293 print>>sys.__stdout__
203 print>>sys.__stdout__
294 print>>sys.__stdout__, omsg
204 print>>sys.__stdout__, omsg
295 handler = self.handlers.get(omsg.msg_type, None)
205 handler = self.handlers.get(omsg.msg_type, None)
296 if handler is None:
206 if handler is None:
297 print >> sys.__stderr__, "UNKNOWN MESSAGE TYPE:", omsg
207 print >> sys.__stderr__, "UNKNOWN MESSAGE TYPE:", omsg
298 else:
208 else:
299 handler(ident, omsg)
209 handler(ident, omsg)
300
210
301 #-----------------------------------------------------------------------------
211 #-----------------------------------------------------------------------------
302 # Kernel main and launch functions
212 # Kernel main and launch functions
303 #-----------------------------------------------------------------------------
213 #-----------------------------------------------------------------------------
304
214
305 class ExitPollerUnix(Thread):
306 """ A Unix-specific daemon thread that terminates the program immediately
307 when the parent process no longer exists.
308 """
309
310 def __init__(self):
311 super(ExitPollerUnix, self).__init__()
312 self.daemon = True
313
314 def run(self):
315 # We cannot use os.waitpid because it works only for child processes.
316 from errno import EINTR
317 while True:
318 try:
319 if os.getppid() == 1:
320 os._exit(1)
321 time.sleep(1.0)
322 except OSError, e:
323 if e.errno == EINTR:
324 continue
325 raise
326
327 class ExitPollerWindows(Thread):
328 """ A Windows-specific daemon thread that terminates the program immediately
329 when a Win32 handle is signaled.
330 """
331
332 def __init__(self, handle):
333 super(ExitPollerWindows, self).__init__()
334 self.daemon = True
335 self.handle = handle
336
337 def run(self):
338 from _subprocess import WaitForSingleObject, WAIT_OBJECT_0, INFINITE
339 result = WaitForSingleObject(self.handle, INFINITE)
340 if result == WAIT_OBJECT_0:
341 os._exit(1)
342
343
344 def bind_port(socket, ip, port):
215 def bind_port(socket, ip, port):
345 """ Binds the specified ZMQ socket. If the port is less than zero, a random
216 """ Binds the specified ZMQ socket. If the port is less than zero, a random
346 port is chosen. Returns the port that was bound.
217 port is chosen. Returns the port that was bound.
347 """
218 """
348 connection = 'tcp://%s' % ip
219 connection = 'tcp://%s' % ip
349 if port <= 0:
220 if port <= 0:
350 port = socket.bind_to_random_port(connection)
221 port = socket.bind_to_random_port(connection)
351 else:
222 else:
352 connection += ':%i' % port
223 connection += ':%i' % port
353 socket.bind(connection)
224 socket.bind(connection)
354 return port
225 return port
355
226
356
227
357 def main():
228 def main():
358 """ Main entry point for launching a kernel.
229 """ Main entry point for launching a kernel.
359 """
230 """
360 # Parse command line arguments.
231 # Parse command line arguments.
361 parser = ArgumentParser()
232 parser = ArgumentParser()
362 parser.add_argument('--ip', type=str, default='127.0.0.1',
233 parser.add_argument('--ip', type=str, default='127.0.0.1',
363 help='set the kernel\'s IP address [default: local]')
234 help='set the kernel\'s IP address [default: local]')
364 parser.add_argument('--xrep', type=int, metavar='PORT', default=0,
235 parser.add_argument('--xrep', type=int, metavar='PORT', default=0,
365 help='set the XREP channel port [default: random]')
236 help='set the XREP channel port [default: random]')
366 parser.add_argument('--pub', type=int, metavar='PORT', default=0,
237 parser.add_argument('--pub', type=int, metavar='PORT', default=0,
367 help='set the PUB channel port [default: random]')
238 help='set the PUB channel port [default: random]')
368 parser.add_argument('--req', type=int, metavar='PORT', default=0,
239 parser.add_argument('--req', type=int, metavar='PORT', default=0,
369 help='set the REQ channel port [default: random]')
240 help='set the REQ channel port [default: random]')
370 if sys.platform == 'win32':
241 if sys.platform == 'win32':
371 parser.add_argument('--parent', type=int, metavar='HANDLE',
242 parser.add_argument('--parent', type=int, metavar='HANDLE',
372 default=0, help='kill this process if the process '
243 default=0, help='kill this process if the process '
373 'with HANDLE dies')
244 'with HANDLE dies')
374 else:
245 else:
375 parser.add_argument('--parent', action='store_true',
246 parser.add_argument('--parent', action='store_true',
376 help='kill this process if its parent dies')
247 help='kill this process if its parent dies')
377 namespace = parser.parse_args()
248 namespace = parser.parse_args()
378
249
379 # Create a context, a session, and the kernel sockets.
250 # Create a context, a session, and the kernel sockets.
380 print >>sys.__stdout__, "Starting the kernel..."
251 print >>sys.__stdout__, "Starting the kernel..."
381 context = zmq.Context()
252 context = zmq.Context()
382 session = Session(username=u'kernel')
253 session = Session(username=u'kernel')
383
254
384 reply_socket = context.socket(zmq.XREP)
255 reply_socket = context.socket(zmq.XREP)
385 xrep_port = bind_port(reply_socket, namespace.ip, namespace.xrep)
256 xrep_port = bind_port(reply_socket, namespace.ip, namespace.xrep)
386 print >>sys.__stdout__, "XREP Channel on port", xrep_port
257 print >>sys.__stdout__, "XREP Channel on port", xrep_port
387
258
388 pub_socket = context.socket(zmq.PUB)
259 pub_socket = context.socket(zmq.PUB)
389 pub_port = bind_port(pub_socket, namespace.ip, namespace.pub)
260 pub_port = bind_port(pub_socket, namespace.ip, namespace.pub)
390 print >>sys.__stdout__, "PUB Channel on port", pub_port
261 print >>sys.__stdout__, "PUB Channel on port", pub_port
391
262
392 req_socket = context.socket(zmq.XREQ)
263 req_socket = context.socket(zmq.XREQ)
393 req_port = bind_port(req_socket, namespace.ip, namespace.req)
264 req_port = bind_port(req_socket, namespace.ip, namespace.req)
394 print >>sys.__stdout__, "REQ Channel on port", req_port
265 print >>sys.__stdout__, "REQ Channel on port", req_port
395
266
396 # Redirect input streams and set a display hook.
267 # Redirect input streams and set a display hook.
397 sys.stdout = OutStream(session, pub_socket, u'stdout')
268 sys.stdout = OutStream(session, pub_socket, u'stdout')
398 sys.stderr = OutStream(session, pub_socket, u'stderr')
269 sys.stderr = OutStream(session, pub_socket, u'stderr')
399 sys.displayhook = DisplayHook(session, pub_socket)
270 sys.displayhook = DisplayHook(session, pub_socket)
400
271
401 # Create the kernel.
272 # Create the kernel.
402 kernel = Kernel(session, reply_socket, pub_socket, req_socket)
273 kernel = Kernel(session, reply_socket, pub_socket, req_socket)
403
274
404 # Configure this kernel/process to die on parent termination, if necessary.
275 # Configure this kernel/process to die on parent termination, if necessary.
405 if namespace.parent:
276 if namespace.parent:
406 if sys.platform == 'win32':
277 if sys.platform == 'win32':
407 poller = ExitPollerWindows(namespace.parent)
278 poller = ExitPollerWindows(namespace.parent)
408 else:
279 else:
409 poller = ExitPollerUnix()
280 poller = ExitPollerUnix()
410 poller.start()
281 poller.start()
411
282
412 # Start the kernel mainloop.
283 # Start the kernel mainloop.
413 kernel.start()
284 kernel.start()
414
285
415
286
416 def launch_kernel(xrep_port=0, pub_port=0, req_port=0, independent=False):
287 def launch_kernel(xrep_port=0, pub_port=0, req_port=0, independent=False):
417 """ Launches a localhost kernel, binding to the specified ports.
288 """ Launches a localhost kernel, binding to the specified ports.
418
289
419 Parameters
290 Parameters
420 ----------
291 ----------
421 xrep_port : int, optional
292 xrep_port : int, optional
422 The port to use for XREP channel.
293 The port to use for XREP channel.
423
294
424 pub_port : int, optional
295 pub_port : int, optional
425 The port to use for the SUB channel.
296 The port to use for the SUB channel.
426
297
427 req_port : int, optional
298 req_port : int, optional
428 The port to use for the REQ (raw input) channel.
299 The port to use for the REQ (raw input) channel.
429
300
430 independent : bool, optional (default False)
301 independent : bool, optional (default False)
431 If set, the kernel process is guaranteed to survive if this process
302 If set, the kernel process is guaranteed to survive if this process
432 dies. If not set, an effort is made to ensure that the kernel is killed
303 dies. If not set, an effort is made to ensure that the kernel is killed
433 when this process dies. Note that in this case it is still good practice
304 when this process dies. Note that in this case it is still good practice
434 to kill kernels manually before exiting.
305 to kill kernels manually before exiting.
435
306
436 Returns
307 Returns
437 -------
308 -------
438 A tuple of form:
309 A tuple of form:
439 (kernel_process, xrep_port, pub_port, req_port)
310 (kernel_process, xrep_port, pub_port, req_port)
440 where kernel_process is a Popen object and the ports are integers.
311 where kernel_process is a Popen object and the ports are integers.
441 """
312 """
442 import socket
313 import socket
443 from subprocess import Popen
314 from subprocess import Popen
444
315
445 # Find open ports as necessary.
316 # Find open ports as necessary.
446 ports = []
317 ports = []
447 ports_needed = int(xrep_port <= 0) + int(pub_port <= 0) + int(req_port <= 0)
318 ports_needed = int(xrep_port <= 0) + int(pub_port <= 0) + int(req_port <= 0)
448 for i in xrange(ports_needed):
319 for i in xrange(ports_needed):
449 sock = socket.socket()
320 sock = socket.socket()
450 sock.bind(('', 0))
321 sock.bind(('', 0))
451 ports.append(sock)
322 ports.append(sock)
452 for i, sock in enumerate(ports):
323 for i, sock in enumerate(ports):
453 port = sock.getsockname()[1]
324 port = sock.getsockname()[1]
454 sock.close()
325 sock.close()
455 ports[i] = port
326 ports[i] = port
456 if xrep_port <= 0:
327 if xrep_port <= 0:
457 xrep_port = ports.pop(0)
328 xrep_port = ports.pop(0)
458 if pub_port <= 0:
329 if pub_port <= 0:
459 pub_port = ports.pop(0)
330 pub_port = ports.pop(0)
460 if req_port <= 0:
331 if req_port <= 0:
461 req_port = ports.pop(0)
332 req_port = ports.pop(0)
462
333
463 # Spawn a kernel.
334 # Spawn a kernel.
464 command = 'from IPython.zmq.kernel import main; main()'
335 command = 'from IPython.zmq.kernel import main; main()'
465 arguments = [ sys.executable, '-c', command, '--xrep', str(xrep_port),
336 arguments = [ sys.executable, '-c', command, '--xrep', str(xrep_port),
466 '--pub', str(pub_port), '--req', str(req_port) ]
337 '--pub', str(pub_port), '--req', str(req_port) ]
467 if independent:
338 if independent:
468 if sys.platform == 'win32':
339 if sys.platform == 'win32':
469 proc = Popen(['start', '/b'] + arguments, shell=True)
340 proc = Popen(['start', '/b'] + arguments, shell=True)
470 else:
341 else:
471 proc = Popen(arguments, preexec_fn=lambda: os.setsid())
342 proc = Popen(arguments, preexec_fn=lambda: os.setsid())
472 else:
343 else:
473 if sys.platform == 'win32':
344 if sys.platform == 'win32':
474 from _subprocess import DuplicateHandle, GetCurrentProcess, \
345 from _subprocess import DuplicateHandle, GetCurrentProcess, \
475 DUPLICATE_SAME_ACCESS
346 DUPLICATE_SAME_ACCESS
476 pid = GetCurrentProcess()
347 pid = GetCurrentProcess()
477 handle = DuplicateHandle(pid, pid, pid, 0,
348 handle = DuplicateHandle(pid, pid, pid, 0,
478 True, # Inheritable by new processes.
349 True, # Inheritable by new processes.
479 DUPLICATE_SAME_ACCESS)
350 DUPLICATE_SAME_ACCESS)
480 proc = Popen(arguments + ['--parent', str(int(handle))])
351 proc = Popen(arguments + ['--parent', str(int(handle))])
481 else:
352 else:
482 proc = Popen(arguments + ['--parent'])
353 proc = Popen(arguments + ['--parent'])
483
354
484 return proc, xrep_port, pub_port, req_port
355 return proc, xrep_port, pub_port, req_port
485
356
486
357
487 if __name__ == '__main__':
358 if __name__ == '__main__':
488 main()
359 main()
General Comments 0
You need to be logged in to leave comments. Login now