##// END OF EJS Templates
Kernel subprocess parent polling is now implemented for Windows.
epatters -
Show More
@@ -1,480 +1,526 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 """A simple interactive kernel that talks to a frontend over 0MQ.
2 """A simple interactive kernel that talks to a frontend over 0MQ.
3
3
4 Things to do:
4 Things to do:
5
5
6 * Finish implementing `raw_input`.
6 * Finish implementing `raw_input`.
7 * Implement `set_parent` logic. Right before doing exec, the Kernel should
7 * Implement `set_parent` logic. Right before doing exec, the Kernel should
8 call set_parent on all the PUB objects with the message about to be executed.
8 call set_parent on all the PUB objects with the message about to be executed.
9 * Implement random port and security key logic.
9 * Implement random port and security key logic.
10 * Implement control messages.
10 * Implement control messages.
11 * Implement event loop and poll version.
11 * Implement event loop and poll version.
12 """
12 """
13
13
14 #-----------------------------------------------------------------------------
15 # Imports
16 #-----------------------------------------------------------------------------
17
14 # Standard library imports.
18 # Standard library imports.
15 import __builtin__
19 import __builtin__
16 from code import CommandCompiler
20 from code import CommandCompiler
17 import os
21 import os
18 import sys
22 import sys
23 from threading import Thread
19 import time
24 import time
20 import traceback
25 import traceback
21
26
22 # System library imports.
27 # System library imports.
23 import zmq
28 import zmq
24
29
25 # Local imports.
30 # Local imports.
26 from IPython.external.argparse import ArgumentParser
31 from IPython.external.argparse import ArgumentParser
27 from session import Session, Message, extract_header
32 from session import Session, Message, extract_header
28 from completer import KernelCompleter
33 from completer import KernelCompleter
29
34
35 #-----------------------------------------------------------------------------
36 # Kernel and stream classes
37 #-----------------------------------------------------------------------------
30
38
31 class InStream(object):
39 class InStream(object):
32 """ A file like object that reads from a 0MQ XREQ socket."""
40 """ A file like object that reads from a 0MQ XREQ socket."""
33
41
34 def __init__(self, session, socket):
42 def __init__(self, session, socket):
35 self.session = session
43 self.session = session
36 self.socket = socket
44 self.socket = socket
37
45
38 def close(self):
46 def close(self):
39 self.socket = None
47 self.socket = None
40
48
41 def flush(self):
49 def flush(self):
42 if self.socket is None:
50 if self.socket is None:
43 raise ValueError('I/O operation on closed file')
51 raise ValueError('I/O operation on closed file')
44
52
45 def isatty(self):
53 def isatty(self):
46 return False
54 return False
47
55
48 def next(self):
56 def next(self):
49 raise IOError('Seek not supported.')
57 raise IOError('Seek not supported.')
50
58
51 def read(self, size=-1):
59 def read(self, size=-1):
52 # FIXME: Do we want another request for this?
60 # FIXME: Do we want another request for this?
53 string = '\n'.join(self.readlines())
61 string = '\n'.join(self.readlines())
54 return self._truncate(string, size)
62 return self._truncate(string, size)
55
63
56 def readline(self, size=-1):
64 def readline(self, size=-1):
57 if self.socket is None:
65 if self.socket is None:
58 raise ValueError('I/O operation on closed file')
66 raise ValueError('I/O operation on closed file')
59 else:
67 else:
60 content = dict(size=size)
68 content = dict(size=size)
61 msg = self.session.msg('readline_request', content=content)
69 msg = self.session.msg('readline_request', content=content)
62 reply = self._request(msg)
70 reply = self._request(msg)
63 line = reply['content']['line']
71 line = reply['content']['line']
64 return self._truncate(line, size)
72 return self._truncate(line, size)
65
73
66 def readlines(self, sizehint=-1):
74 def readlines(self, sizehint=-1):
67 # Sizehint is ignored, as is permitted.
75 # Sizehint is ignored, as is permitted.
68 if self.socket is None:
76 if self.socket is None:
69 raise ValueError('I/O operation on closed file')
77 raise ValueError('I/O operation on closed file')
70 else:
78 else:
71 lines = []
79 lines = []
72 while True:
80 while True:
73 line = self.readline()
81 line = self.readline()
74 if line:
82 if line:
75 lines.append(line)
83 lines.append(line)
76 else:
84 else:
77 break
85 break
78 return lines
86 return lines
79
87
80 def seek(self, offset, whence=None):
88 def seek(self, offset, whence=None):
81 raise IOError('Seek not supported.')
89 raise IOError('Seek not supported.')
82
90
83 def write(self, string):
91 def write(self, string):
84 raise IOError('Write not supported on a read only stream.')
92 raise IOError('Write not supported on a read only stream.')
85
93
86 def writelines(self, sequence):
94 def writelines(self, sequence):
87 raise IOError('Write not supported on a read only stream.')
95 raise IOError('Write not supported on a read only stream.')
88
96
89 def _request(self, msg):
97 def _request(self, msg):
90 # Flush output before making the request. This ensures, for example,
98 # Flush output before making the request. This ensures, for example,
91 # that raw_input(prompt) actually gets a prompt written.
99 # that raw_input(prompt) actually gets a prompt written.
92 sys.stderr.flush()
100 sys.stderr.flush()
93 sys.stdout.flush()
101 sys.stdout.flush()
94
102
95 self.socket.send_json(msg)
103 self.socket.send_json(msg)
96 while True:
104 while True:
97 try:
105 try:
98 reply = self.socket.recv_json(zmq.NOBLOCK)
106 reply = self.socket.recv_json(zmq.NOBLOCK)
99 except zmq.ZMQError, e:
107 except zmq.ZMQError, e:
100 if e.errno == zmq.EAGAIN:
108 if e.errno == zmq.EAGAIN:
101 pass
109 pass
102 else:
110 else:
103 raise
111 raise
104 else:
112 else:
105 break
113 break
106 return reply
114 return reply
107
115
108 def _truncate(self, string, size):
116 def _truncate(self, string, size):
109 if size >= 0:
117 if size >= 0:
110 if isinstance(string, str):
118 if isinstance(string, str):
111 return string[:size]
119 return string[:size]
112 elif isinstance(string, unicode):
120 elif isinstance(string, unicode):
113 encoded = string.encode('utf-8')[:size]
121 encoded = string.encode('utf-8')[:size]
114 return encoded.decode('utf-8', 'ignore')
122 return encoded.decode('utf-8', 'ignore')
115 return string
123 return string
116
124
117
125
118 class OutStream(object):
126 class OutStream(object):
119 """A file like object that publishes the stream to a 0MQ PUB socket."""
127 """A file like object that publishes the stream to a 0MQ PUB socket."""
120
128
121 def __init__(self, session, pub_socket, name, max_buffer=200):
129 def __init__(self, session, pub_socket, name, max_buffer=200):
122 self.session = session
130 self.session = session
123 self.pub_socket = pub_socket
131 self.pub_socket = pub_socket
124 self.name = name
132 self.name = name
125 self._buffer = []
133 self._buffer = []
126 self._buffer_len = 0
134 self._buffer_len = 0
127 self.max_buffer = max_buffer
135 self.max_buffer = max_buffer
128 self.parent_header = {}
136 self.parent_header = {}
129
137
130 def set_parent(self, parent):
138 def set_parent(self, parent):
131 self.parent_header = extract_header(parent)
139 self.parent_header = extract_header(parent)
132
140
133 def close(self):
141 def close(self):
134 self.pub_socket = None
142 self.pub_socket = None
135
143
136 def flush(self):
144 def flush(self):
137 if self.pub_socket is None:
145 if self.pub_socket is None:
138 raise ValueError(u'I/O operation on closed file')
146 raise ValueError(u'I/O operation on closed file')
139 else:
147 else:
140 if self._buffer:
148 if self._buffer:
141 data = ''.join(self._buffer)
149 data = ''.join(self._buffer)
142 content = {u'name':self.name, u'data':data}
150 content = {u'name':self.name, u'data':data}
143 msg = self.session.msg(u'stream', content=content,
151 msg = self.session.msg(u'stream', content=content,
144 parent=self.parent_header)
152 parent=self.parent_header)
145 print>>sys.__stdout__, Message(msg)
153 print>>sys.__stdout__, Message(msg)
146 self.pub_socket.send_json(msg)
154 self.pub_socket.send_json(msg)
147 self._buffer_len = 0
155 self._buffer_len = 0
148 self._buffer = []
156 self._buffer = []
149
157
150 def isatty(self):
158 def isatty(self):
151 return False
159 return False
152
160
153 def next(self):
161 def next(self):
154 raise IOError('Read not supported on a write only stream.')
162 raise IOError('Read not supported on a write only stream.')
155
163
156 def read(self, size=None):
164 def read(self, size=None):
157 raise IOError('Read not supported on a write only stream.')
165 raise IOError('Read not supported on a write only stream.')
158
166
159 readline=read
167 readline=read
160
168
161 def write(self, s):
169 def write(self, s):
162 if self.pub_socket is None:
170 if self.pub_socket is None:
163 raise ValueError('I/O operation on closed file')
171 raise ValueError('I/O operation on closed file')
164 else:
172 else:
165 self._buffer.append(s)
173 self._buffer.append(s)
166 self._buffer_len += len(s)
174 self._buffer_len += len(s)
167 self._maybe_send()
175 self._maybe_send()
168
176
169 def _maybe_send(self):
177 def _maybe_send(self):
170 if '\n' in self._buffer[-1]:
178 if '\n' in self._buffer[-1]:
171 self.flush()
179 self.flush()
172 if self._buffer_len > self.max_buffer:
180 if self._buffer_len > self.max_buffer:
173 self.flush()
181 self.flush()
174
182
175 def writelines(self, sequence):
183 def writelines(self, sequence):
176 if self.pub_socket is None:
184 if self.pub_socket is None:
177 raise ValueError('I/O operation on closed file')
185 raise ValueError('I/O operation on closed file')
178 else:
186 else:
179 for s in sequence:
187 for s in sequence:
180 self.write(s)
188 self.write(s)
181
189
182
190
183 class DisplayHook(object):
191 class DisplayHook(object):
184
192
185 def __init__(self, session, pub_socket):
193 def __init__(self, session, pub_socket):
186 self.session = session
194 self.session = session
187 self.pub_socket = pub_socket
195 self.pub_socket = pub_socket
188 self.parent_header = {}
196 self.parent_header = {}
189
197
190 def __call__(self, obj):
198 def __call__(self, obj):
191 if obj is None:
199 if obj is None:
192 return
200 return
193
201
194 __builtin__._ = obj
202 __builtin__._ = obj
195 msg = self.session.msg(u'pyout', {u'data':repr(obj)},
203 msg = self.session.msg(u'pyout', {u'data':repr(obj)},
196 parent=self.parent_header)
204 parent=self.parent_header)
197 self.pub_socket.send_json(msg)
205 self.pub_socket.send_json(msg)
198
206
199 def set_parent(self, parent):
207 def set_parent(self, parent):
200 self.parent_header = extract_header(parent)
208 self.parent_header = extract_header(parent)
201
209
202
210
203 class Kernel(object):
211 class Kernel(object):
204
212
205 def __init__(self, session, reply_socket, pub_socket):
213 def __init__(self, session, reply_socket, pub_socket):
206 self.session = session
214 self.session = session
207 self.reply_socket = reply_socket
215 self.reply_socket = reply_socket
208 self.pub_socket = pub_socket
216 self.pub_socket = pub_socket
209 self.user_ns = {}
217 self.user_ns = {}
210 self.history = []
218 self.history = []
211 self.compiler = CommandCompiler()
219 self.compiler = CommandCompiler()
212 self.completer = KernelCompleter(self.user_ns)
220 self.completer = KernelCompleter(self.user_ns)
213 self.poll_ppid = False
214
221
215 # Build dict of handlers for message types
222 # Build dict of handlers for message types
216 msg_types = [ 'execute_request', 'complete_request',
223 msg_types = [ 'execute_request', 'complete_request',
217 'object_info_request' ]
224 'object_info_request' ]
218 self.handlers = {}
225 self.handlers = {}
219 for msg_type in msg_types:
226 for msg_type in msg_types:
220 self.handlers[msg_type] = getattr(self, msg_type)
227 self.handlers[msg_type] = getattr(self, msg_type)
221
228
222 def abort_queue(self):
229 def abort_queue(self):
223 while True:
230 while True:
224 try:
231 try:
225 ident = self.reply_socket.recv(zmq.NOBLOCK)
232 ident = self.reply_socket.recv(zmq.NOBLOCK)
226 except zmq.ZMQError, e:
233 except zmq.ZMQError, e:
227 if e.errno == zmq.EAGAIN:
234 if e.errno == zmq.EAGAIN:
228 break
235 break
229 else:
236 else:
230 assert self.reply_socket.rcvmore(), "Unexpected missing message part."
237 assert self.reply_socket.rcvmore(), "Unexpected missing message part."
231 msg = self.reply_socket.recv_json()
238 msg = self.reply_socket.recv_json()
232 print>>sys.__stdout__, "Aborting:"
239 print>>sys.__stdout__, "Aborting:"
233 print>>sys.__stdout__, Message(msg)
240 print>>sys.__stdout__, Message(msg)
234 msg_type = msg['msg_type']
241 msg_type = msg['msg_type']
235 reply_type = msg_type.split('_')[0] + '_reply'
242 reply_type = msg_type.split('_')[0] + '_reply'
236 reply_msg = self.session.msg(reply_type, {'status' : 'aborted'}, msg)
243 reply_msg = self.session.msg(reply_type, {'status' : 'aborted'}, msg)
237 print>>sys.__stdout__, Message(reply_msg)
244 print>>sys.__stdout__, Message(reply_msg)
238 self.reply_socket.send(ident,zmq.SNDMORE)
245 self.reply_socket.send(ident,zmq.SNDMORE)
239 self.reply_socket.send_json(reply_msg)
246 self.reply_socket.send_json(reply_msg)
240 # We need to wait a bit for requests to come in. This can probably
247 # We need to wait a bit for requests to come in. This can probably
241 # be set shorter for true asynchronous clients.
248 # be set shorter for true asynchronous clients.
242 time.sleep(0.1)
249 time.sleep(0.1)
243
250
244 def execute_request(self, ident, parent):
251 def execute_request(self, ident, parent):
245 try:
252 try:
246 code = parent[u'content'][u'code']
253 code = parent[u'content'][u'code']
247 except:
254 except:
248 print>>sys.__stderr__, "Got bad msg: "
255 print>>sys.__stderr__, "Got bad msg: "
249 print>>sys.__stderr__, Message(parent)
256 print>>sys.__stderr__, Message(parent)
250 return
257 return
251 pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent)
258 pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent)
252 self.pub_socket.send_json(pyin_msg)
259 self.pub_socket.send_json(pyin_msg)
253 try:
260 try:
254 comp_code = self.compiler(code, '<zmq-kernel>')
261 comp_code = self.compiler(code, '<zmq-kernel>')
255 sys.displayhook.set_parent(parent)
262 sys.displayhook.set_parent(parent)
256 exec comp_code in self.user_ns, self.user_ns
263 exec comp_code in self.user_ns, self.user_ns
257 except:
264 except:
258 result = u'error'
265 result = u'error'
259 etype, evalue, tb = sys.exc_info()
266 etype, evalue, tb = sys.exc_info()
260 tb = traceback.format_exception(etype, evalue, tb)
267 tb = traceback.format_exception(etype, evalue, tb)
261 exc_content = {
268 exc_content = {
262 u'status' : u'error',
269 u'status' : u'error',
263 u'traceback' : tb,
270 u'traceback' : tb,
264 u'etype' : unicode(etype),
271 u'etype' : unicode(etype),
265 u'evalue' : unicode(evalue)
272 u'evalue' : unicode(evalue)
266 }
273 }
267 exc_msg = self.session.msg(u'pyerr', exc_content, parent)
274 exc_msg = self.session.msg(u'pyerr', exc_content, parent)
268 self.pub_socket.send_json(exc_msg)
275 self.pub_socket.send_json(exc_msg)
269 reply_content = exc_content
276 reply_content = exc_content
270 else:
277 else:
271 reply_content = {'status' : 'ok'}
278 reply_content = {'status' : 'ok'}
272 reply_msg = self.session.msg(u'execute_reply', reply_content, parent)
279 reply_msg = self.session.msg(u'execute_reply', reply_content, parent)
273 print>>sys.__stdout__, Message(reply_msg)
280 print>>sys.__stdout__, Message(reply_msg)
274 self.reply_socket.send(ident, zmq.SNDMORE)
281 self.reply_socket.send(ident, zmq.SNDMORE)
275 self.reply_socket.send_json(reply_msg)
282 self.reply_socket.send_json(reply_msg)
276 if reply_msg['content']['status'] == u'error':
283 if reply_msg['content']['status'] == u'error':
277 self.abort_queue()
284 self.abort_queue()
278
285
279 def complete_request(self, ident, parent):
286 def complete_request(self, ident, parent):
280 matches = {'matches' : self.complete(parent),
287 matches = {'matches' : self.complete(parent),
281 'status' : 'ok'}
288 'status' : 'ok'}
282 completion_msg = self.session.send(self.reply_socket, 'complete_reply',
289 completion_msg = self.session.send(self.reply_socket, 'complete_reply',
283 matches, parent, ident)
290 matches, parent, ident)
284 print >> sys.__stdout__, completion_msg
291 print >> sys.__stdout__, completion_msg
285
292
286 def complete(self, msg):
293 def complete(self, msg):
287 return self.completer.complete(msg.content.line, msg.content.text)
294 return self.completer.complete(msg.content.line, msg.content.text)
288
295
289 def object_info_request(self, ident, parent):
296 def object_info_request(self, ident, parent):
290 context = parent['content']['oname'].split('.')
297 context = parent['content']['oname'].split('.')
291 object_info = self.object_info(context)
298 object_info = self.object_info(context)
292 msg = self.session.send(self.reply_socket, 'object_info_reply',
299 msg = self.session.send(self.reply_socket, 'object_info_reply',
293 object_info, parent, ident)
300 object_info, parent, ident)
294 print >> sys.__stdout__, msg
301 print >> sys.__stdout__, msg
295
302
296 def object_info(self, context):
303 def object_info(self, context):
297 symbol, leftover = self.symbol_from_context(context)
304 symbol, leftover = self.symbol_from_context(context)
298 if symbol is not None and not leftover:
305 if symbol is not None and not leftover:
299 doc = getattr(symbol, '__doc__', '')
306 doc = getattr(symbol, '__doc__', '')
300 else:
307 else:
301 doc = ''
308 doc = ''
302 object_info = dict(docstring = doc)
309 object_info = dict(docstring = doc)
303 return object_info
310 return object_info
304
311
305 def symbol_from_context(self, context):
312 def symbol_from_context(self, context):
306 if not context:
313 if not context:
307 return None, context
314 return None, context
308
315
309 base_symbol_string = context[0]
316 base_symbol_string = context[0]
310 symbol = self.user_ns.get(base_symbol_string, None)
317 symbol = self.user_ns.get(base_symbol_string, None)
311 if symbol is None:
318 if symbol is None:
312 symbol = __builtin__.__dict__.get(base_symbol_string, None)
319 symbol = __builtin__.__dict__.get(base_symbol_string, None)
313 if symbol is None:
320 if symbol is None:
314 return None, context
321 return None, context
315
322
316 context = context[1:]
323 context = context[1:]
317 for i, name in enumerate(context):
324 for i, name in enumerate(context):
318 new_symbol = getattr(symbol, name, None)
325 new_symbol = getattr(symbol, name, None)
319 if new_symbol is None:
326 if new_symbol is None:
320 return symbol, context[i:]
327 return symbol, context[i:]
321 else:
328 else:
322 symbol = new_symbol
329 symbol = new_symbol
323
330
324 return symbol, []
331 return symbol, []
325
332
326 def start(self):
333 def start(self):
327 while True:
334 while True:
328 if self.poll_ppid and os.getppid() == 1:
329 print>>sys.__stderr__, "KILLED KERNEL. No parent process."
330 os._exit(1)
331
332 ident = self.reply_socket.recv()
335 ident = self.reply_socket.recv()
333 assert self.reply_socket.rcvmore(), "Unexpected missing message part."
336 assert self.reply_socket.rcvmore(), "Unexpected missing message part."
334 msg = self.reply_socket.recv_json()
337 msg = self.reply_socket.recv_json()
335 omsg = Message(msg)
338 omsg = Message(msg)
336 print>>sys.__stdout__
339 print>>sys.__stdout__
337 print>>sys.__stdout__, omsg
340 print>>sys.__stdout__, omsg
338 handler = self.handlers.get(omsg.msg_type, None)
341 handler = self.handlers.get(omsg.msg_type, None)
339 if handler is None:
342 if handler is None:
340 print >> sys.__stderr__, "UNKNOWN MESSAGE TYPE:", omsg
343 print >> sys.__stderr__, "UNKNOWN MESSAGE TYPE:", omsg
341 else:
344 else:
342 handler(ident, omsg)
345 handler(ident, omsg)
343
346
347 #-----------------------------------------------------------------------------
348 # Kernel main and launch functions
349 #-----------------------------------------------------------------------------
350
351 class UnixPoller(Thread):
352
353 def __init__(self):
354 super(UnixPoller, self).__init__()
355 self.daemon = True
356
357 def run(self):
358 while True:
359 if os.getppid() == 1:
360 os._exit(1)
361 time.sleep(5.0)
362
363 class WindowsPoller(Thread):
364
365 def __init__(self, handle):
366 super(WindowsPoller, self).__init__()
367 self.daemon = True
368 self.handle = handle
369
370 def run(self):
371 from _subprocess import WaitForSingleObject, WAIT_OBJECT_0, INFINITE
372 result = WaitForSingleObject(self.handle, INFINITE)
373 if result == WAIT_OBJECT_0:
374 os._exit(1)
344
375
345 def bind_port(socket, ip, port):
376 def bind_port(socket, ip, port):
346 """ Binds the specified ZMQ socket. If the port is less than zero, a random
377 """ Binds the specified ZMQ socket. If the port is less than zero, a random
347 port is chosen. Returns the port that was bound.
378 port is chosen. Returns the port that was bound.
348 """
379 """
349 connection = 'tcp://%s' % ip
380 connection = 'tcp://%s' % ip
350 if port <= 0:
381 if port <= 0:
351 port = socket.bind_to_random_port(connection)
382 port = socket.bind_to_random_port(connection)
352 else:
383 else:
353 connection += ':%i' % port
384 connection += ':%i' % port
354 socket.bind(connection)
385 socket.bind(connection)
355 return port
386 return port
356
387
357 def main():
388 def main():
358 """ Main entry point for launching a kernel.
389 """ Main entry point for launching a kernel.
359 """
390 """
360 # Parse command line arguments.
391 # Parse command line arguments.
361 parser = ArgumentParser()
392 parser = ArgumentParser()
362 parser.add_argument('--ip', type=str, default='127.0.0.1',
393 parser.add_argument('--ip', type=str, default='127.0.0.1',
363 help='set the kernel\'s IP address [default: local]')
394 help='set the kernel\'s IP address [default: local]')
364 parser.add_argument('--xrep', type=int, metavar='PORT', default=0,
395 parser.add_argument('--xrep', type=int, metavar='PORT', default=0,
365 help='set the XREP channel port [default: random]')
396 help='set the XREP channel port [default: random]')
366 parser.add_argument('--pub', type=int, metavar='PORT', default=0,
397 parser.add_argument('--pub', type=int, metavar='PORT', default=0,
367 help='set the PUB channel port [default: random]')
398 help='set the PUB channel port [default: random]')
368 parser.add_argument('--req', type=int, metavar='PORT', default=0,
399 parser.add_argument('--req', type=int, metavar='PORT', default=0,
369 help='set the REQ channel port [default: random]')
400 help='set the REQ channel port [default: random]')
370 parser.add_argument('--require-parent', action='store_true',
401 if sys.platform == 'win32':
371 help='ensure that this process dies with its parent')
402 parser.add_argument('--parent', type=int, metavar='HANDLE',
403 default=0, help='kill this process if the process '
404 'with HANDLE dies')
405 else:
406 parser.add_argument('--parent', action='store_true',
407 help='kill this process if its parent dies')
372 namespace = parser.parse_args()
408 namespace = parser.parse_args()
373
409
374 # Create a context, a session, and the kernel sockets.
410 # Create a context, a session, and the kernel sockets.
375 print >>sys.__stdout__, "Starting the kernel..."
411 print >>sys.__stdout__, "Starting the kernel..."
376 context = zmq.Context()
412 context = zmq.Context()
377 session = Session(username=u'kernel')
413 session = Session(username=u'kernel')
378
414
379 reply_socket = context.socket(zmq.XREP)
415 reply_socket = context.socket(zmq.XREP)
380 xrep_port = bind_port(reply_socket, namespace.ip, namespace.xrep)
416 xrep_port = bind_port(reply_socket, namespace.ip, namespace.xrep)
381 print >>sys.__stdout__, "XREP Channel on port", xrep_port
417 print >>sys.__stdout__, "XREP Channel on port", xrep_port
382
418
383 pub_socket = context.socket(zmq.PUB)
419 pub_socket = context.socket(zmq.PUB)
384 pub_port = bind_port(pub_socket, namespace.ip, namespace.pub)
420 pub_port = bind_port(pub_socket, namespace.ip, namespace.pub)
385 print >>sys.__stdout__, "PUB Channel on port", pub_port
421 print >>sys.__stdout__, "PUB Channel on port", pub_port
386
422
387 req_socket = context.socket(zmq.XREQ)
423 req_socket = context.socket(zmq.XREQ)
388 req_port = bind_port(req_socket, namespace.ip, namespace.req)
424 req_port = bind_port(req_socket, namespace.ip, namespace.req)
389 print >>sys.__stdout__, "REQ Channel on port", req_port
425 print >>sys.__stdout__, "REQ Channel on port", req_port
390
426
391 # Redirect input streams and set a display hook.
427 # Redirect input streams and set a display hook.
392 sys.stdin = InStream(session, req_socket)
428 sys.stdin = InStream(session, req_socket)
393 sys.stdout = OutStream(session, pub_socket, u'stdout')
429 sys.stdout = OutStream(session, pub_socket, u'stdout')
394 sys.stderr = OutStream(session, pub_socket, u'stderr')
430 sys.stderr = OutStream(session, pub_socket, u'stderr')
395 sys.displayhook = DisplayHook(session, pub_socket)
431 sys.displayhook = DisplayHook(session, pub_socket)
396
432
397 # Create the kernel.
433 # Create the kernel.
398 kernel = Kernel(session, reply_socket, pub_socket)
434 kernel = Kernel(session, reply_socket, pub_socket)
399
435
400 # Configure this kernel/process to die on parent termination, if necessary.
436 # Configure this kernel/process to die on parent termination, if necessary.
401 if namespace.require_parent:
437 if namespace.parent:
402 if sys.platform == 'linux2':
438 if sys.platform == 'linux2':
403 import ctypes, ctypes.util, signal
439 import ctypes, ctypes.util, signal
404 PR_SET_PDEATHSIG = 1
440 PR_SET_PDEATHSIG = 1
405 libc = ctypes.CDLL(ctypes.util.find_library('c'))
441 libc = ctypes.CDLL(ctypes.util.find_library('c'))
406 libc.prctl(PR_SET_PDEATHSIG, signal.SIGKILL)
442 libc.prctl(PR_SET_PDEATHSIG, signal.SIGKILL)
407
443 elif sys.platform == 'win32':
408 elif sys.platform != 'win32':
444 poller = WindowsPoller(namespace.parent)
409 kernel.poll_ppid = True
445 poller.start()
446 else:
447 poller = UnixPoller()
448 poller.start()
410
449
411 # Start the kernel mainloop.
450 # Start the kernel mainloop.
412 kernel.start()
451 kernel.start()
413
452
414
453
415 def launch_kernel(xrep_port=0, pub_port=0, req_port=0, independent=False):
454 def launch_kernel(xrep_port=0, pub_port=0, req_port=0, independent=False):
416 """ Launches a localhost kernel, binding to the specified ports.
455 """ Launches a localhost kernel, binding to the specified ports.
417
456
418 Parameters
457 Parameters
419 ----------
458 ----------
420 xrep_port : int, optional
459 xrep_port : int, optional
421 The port to use for XREP channel.
460 The port to use for XREP channel.
422
461
423 pub_port : int, optional
462 pub_port : int, optional
424 The port to use for the SUB channel.
463 The port to use for the SUB channel.
425
464
426 req_port : int, optional
465 req_port : int, optional
427 The port to use for the REQ (raw input) channel.
466 The port to use for the REQ (raw input) channel.
428
467
429 independent : bool, optional (default False)
468 independent : bool, optional (default False)
430 If set, the kernel process is guaranteed to survive if this process
469 If set, the kernel process is guaranteed to survive if this process
431 dies. If not set, an effort is made to ensure that the kernel is killed
470 dies. If not set, an effort is made to ensure that the kernel is killed
432 when this process dies. Note that in this case it is still good practice
471 when this process dies. Note that in this case it is still good practice
433 to attempt to kill kernels manually before exiting.
472 to kill kernels manually before exiting.
434
473
435 Returns
474 Returns
436 -------
475 -------
437 A tuple of form:
476 A tuple of form:
438 (kernel_process, xrep_port, pub_port, req_port)
477 (kernel_process, xrep_port, pub_port, req_port)
439 where kernel_process is a Popen object and the ports are integers.
478 where kernel_process is a Popen object and the ports are integers.
440 """
479 """
441 import socket
480 import socket
442 from subprocess import Popen
481 from subprocess import Popen
443
482
444 # Find open ports as necessary.
483 # Find open ports as necessary.
445 ports = []
484 ports = []
446 ports_needed = int(xrep_port <= 0) + int(pub_port <= 0) + int(req_port <= 0)
485 ports_needed = int(xrep_port <= 0) + int(pub_port <= 0) + int(req_port <= 0)
447 for i in xrange(ports_needed):
486 for i in xrange(ports_needed):
448 sock = socket.socket()
487 sock = socket.socket()
449 sock.bind(('', 0))
488 sock.bind(('', 0))
450 ports.append(sock)
489 ports.append(sock)
451 for i, sock in enumerate(ports):
490 for i, sock in enumerate(ports):
452 port = sock.getsockname()[1]
491 port = sock.getsockname()[1]
453 sock.close()
492 sock.close()
454 ports[i] = port
493 ports[i] = port
455 if xrep_port <= 0:
494 if xrep_port <= 0:
456 xrep_port = ports.pop(0)
495 xrep_port = ports.pop(0)
457 if pub_port <= 0:
496 if pub_port <= 0:
458 pub_port = ports.pop(0)
497 pub_port = ports.pop(0)
459 if req_port <= 0:
498 if req_port <= 0:
460 req_port = ports.pop(0)
499 req_port = ports.pop(0)
461
500
462 # Spawn a kernel.
501 # Spawn a kernel.
463 command = 'from IPython.zmq.kernel import main; main()'
502 command = 'from IPython.zmq.kernel import main; main()'
464 arguments = [ sys.executable, '-c', command, '--xrep', str(xrep_port),
503 arguments = [ sys.executable, '-c', command, '--xrep', str(xrep_port),
465 '--pub', str(pub_port), '--req', str(req_port) ]
504 '--pub', str(pub_port), '--req', str(req_port) ]
466
467 if independent:
505 if independent:
468 if sys.platform == 'win32':
506 if sys.platform == 'win32':
469 proc = Popen(['start', '/b'] + arguments, shell=True)
507 proc = Popen(['start', '/b'] + arguments, shell=True)
470 else:
508 else:
471 proc = Popen(arguments, preexec_fn=lambda: os.setsid())
509 proc = Popen(arguments, preexec_fn=lambda: os.setsid())
472
473 else:
510 else:
474 proc = Popen(arguments + ['--require-parent'])
511 if sys.platform == 'win32':
512 from _subprocess import DuplicateHandle, GetCurrentProcess, \
513 DUPLICATE_SAME_ACCESS
514 pid = GetCurrentProcess()
515 handle = DuplicateHandle(pid, pid, pid, 0,
516 True, # Inheritable by new processes.
517 DUPLICATE_SAME_ACCESS)
518 proc = Popen(arguments + ['--parent', str(int(handle))])
519 else:
520 proc = Popen(arguments + ['--parent'])
475
521
476 return proc, xrep_port, pub_port, req_port
522 return proc, xrep_port, pub_port, req_port
477
523
478
524
479 if __name__ == '__main__':
525 if __name__ == '__main__':
480 main()
526 main()
General Comments 0
You need to be logged in to leave comments. Login now