##// END OF EJS Templates
Add parameter for custom launch function to KernelManager.start_kernel.
epatters -
Show More
@@ -1,976 +1,983 b''
1 """Base classes to manage the interaction with a running kernel.
1 """Base classes to manage the interaction with a running kernel.
2
2
3 TODO
3 TODO
4 * Create logger to handle debugging and console messages.
4 * Create logger to handle debugging and console messages.
5 """
5 """
6
6
7 #-----------------------------------------------------------------------------
7 #-----------------------------------------------------------------------------
8 # Copyright (C) 2008-2010 The IPython Development Team
8 # Copyright (C) 2008-2010 The IPython Development Team
9 #
9 #
10 # Distributed under the terms of the BSD License. The full license is in
10 # Distributed under the terms of the BSD License. The full license is in
11 # the file COPYING, distributed as part of this software.
11 # the file COPYING, distributed as part of this software.
12 #-----------------------------------------------------------------------------
12 #-----------------------------------------------------------------------------
13
13
14 #-----------------------------------------------------------------------------
14 #-----------------------------------------------------------------------------
15 # Imports
15 # Imports
16 #-----------------------------------------------------------------------------
16 #-----------------------------------------------------------------------------
17
17
18 # Standard library imports.
18 # Standard library imports.
19 import atexit
19 import atexit
20 import errno
20 import errno
21 from Queue import Queue, Empty
21 from Queue import Queue, Empty
22 from subprocess import Popen
22 from subprocess import Popen
23 import signal
23 import signal
24 import sys
24 import sys
25 from threading import Thread
25 from threading import Thread
26 import time
26 import time
27 import logging
27 import logging
28
28
29 # System library imports.
29 # System library imports.
30 import zmq
30 import zmq
31 from zmq import POLLIN, POLLOUT, POLLERR
31 from zmq import POLLIN, POLLOUT, POLLERR
32 from zmq.eventloop import ioloop
32 from zmq.eventloop import ioloop
33
33
34 # Local imports.
34 # Local imports.
35 from IPython.config.loader import Config
35 from IPython.config.loader import Config
36 from IPython.utils import io
36 from IPython.utils import io
37 from IPython.utils.localinterfaces import LOCALHOST, LOCAL_IPS
37 from IPython.utils.localinterfaces import LOCALHOST, LOCAL_IPS
38 from IPython.utils.traitlets import HasTraits, Any, Instance, Type, TCPAddress
38 from IPython.utils.traitlets import HasTraits, Any, Instance, Type, TCPAddress
39 from session import Session, Message
39 from session import Session, Message
40
40
41 #-----------------------------------------------------------------------------
41 #-----------------------------------------------------------------------------
42 # Constants and exceptions
42 # Constants and exceptions
43 #-----------------------------------------------------------------------------
43 #-----------------------------------------------------------------------------
44
44
45 class InvalidPortNumber(Exception):
45 class InvalidPortNumber(Exception):
46 pass
46 pass
47
47
48 #-----------------------------------------------------------------------------
48 #-----------------------------------------------------------------------------
49 # Utility functions
49 # Utility functions
50 #-----------------------------------------------------------------------------
50 #-----------------------------------------------------------------------------
51
51
52 # some utilities to validate message structure, these might get moved elsewhere
52 # some utilities to validate message structure, these might get moved elsewhere
53 # if they prove to have more generic utility
53 # if they prove to have more generic utility
54
54
55 def validate_string_list(lst):
55 def validate_string_list(lst):
56 """Validate that the input is a list of strings.
56 """Validate that the input is a list of strings.
57
57
58 Raises ValueError if not."""
58 Raises ValueError if not."""
59 if not isinstance(lst, list):
59 if not isinstance(lst, list):
60 raise ValueError('input %r must be a list' % lst)
60 raise ValueError('input %r must be a list' % lst)
61 for x in lst:
61 for x in lst:
62 if not isinstance(x, basestring):
62 if not isinstance(x, basestring):
63 raise ValueError('element %r in list must be a string' % x)
63 raise ValueError('element %r in list must be a string' % x)
64
64
65
65
66 def validate_string_dict(dct):
66 def validate_string_dict(dct):
67 """Validate that the input is a dict with string keys and values.
67 """Validate that the input is a dict with string keys and values.
68
68
69 Raises ValueError if not."""
69 Raises ValueError if not."""
70 for k,v in dct.iteritems():
70 for k,v in dct.iteritems():
71 if not isinstance(k, basestring):
71 if not isinstance(k, basestring):
72 raise ValueError('key %r in dict must be a string' % k)
72 raise ValueError('key %r in dict must be a string' % k)
73 if not isinstance(v, basestring):
73 if not isinstance(v, basestring):
74 raise ValueError('value %r in dict must be a string' % v)
74 raise ValueError('value %r in dict must be a string' % v)
75
75
76
76
77 #-----------------------------------------------------------------------------
77 #-----------------------------------------------------------------------------
78 # ZMQ Socket Channel classes
78 # ZMQ Socket Channel classes
79 #-----------------------------------------------------------------------------
79 #-----------------------------------------------------------------------------
80
80
81 class ZMQSocketChannel(Thread):
81 class ZMQSocketChannel(Thread):
82 """The base class for the channels that use ZMQ sockets.
82 """The base class for the channels that use ZMQ sockets.
83 """
83 """
84 context = None
84 context = None
85 session = None
85 session = None
86 socket = None
86 socket = None
87 ioloop = None
87 ioloop = None
88 iostate = None
88 iostate = None
89 _address = None
89 _address = None
90
90
91 def __init__(self, context, session, address):
91 def __init__(self, context, session, address):
92 """Create a channel
92 """Create a channel
93
93
94 Parameters
94 Parameters
95 ----------
95 ----------
96 context : :class:`zmq.Context`
96 context : :class:`zmq.Context`
97 The ZMQ context to use.
97 The ZMQ context to use.
98 session : :class:`session.Session`
98 session : :class:`session.Session`
99 The session to use.
99 The session to use.
100 address : tuple
100 address : tuple
101 Standard (ip, port) tuple that the kernel is listening on.
101 Standard (ip, port) tuple that the kernel is listening on.
102 """
102 """
103 super(ZMQSocketChannel, self).__init__()
103 super(ZMQSocketChannel, self).__init__()
104 self.daemon = True
104 self.daemon = True
105
105
106 self.context = context
106 self.context = context
107 self.session = session
107 self.session = session
108 if address[1] == 0:
108 if address[1] == 0:
109 message = 'The port number for a channel cannot be 0.'
109 message = 'The port number for a channel cannot be 0.'
110 raise InvalidPortNumber(message)
110 raise InvalidPortNumber(message)
111 self._address = address
111 self._address = address
112
112
113 def _run_loop(self):
113 def _run_loop(self):
114 """Run my loop, ignoring EINTR events in the poller"""
114 """Run my loop, ignoring EINTR events in the poller"""
115 while True:
115 while True:
116 try:
116 try:
117 self.ioloop.start()
117 self.ioloop.start()
118 except zmq.ZMQError as e:
118 except zmq.ZMQError as e:
119 if e.errno == errno.EINTR:
119 if e.errno == errno.EINTR:
120 continue
120 continue
121 else:
121 else:
122 raise
122 raise
123 else:
123 else:
124 break
124 break
125
125
126 def stop(self):
126 def stop(self):
127 """Stop the channel's activity.
127 """Stop the channel's activity.
128
128
129 This calls :method:`Thread.join` and returns when the thread
129 This calls :method:`Thread.join` and returns when the thread
130 terminates. :class:`RuntimeError` will be raised if
130 terminates. :class:`RuntimeError` will be raised if
131 :method:`self.start` is called again.
131 :method:`self.start` is called again.
132 """
132 """
133 self.join()
133 self.join()
134
134
135 @property
135 @property
136 def address(self):
136 def address(self):
137 """Get the channel's address as an (ip, port) tuple.
137 """Get the channel's address as an (ip, port) tuple.
138
138
139 By the default, the address is (localhost, 0), where 0 means a random
139 By the default, the address is (localhost, 0), where 0 means a random
140 port.
140 port.
141 """
141 """
142 return self._address
142 return self._address
143
143
144 def add_io_state(self, state):
144 def add_io_state(self, state):
145 """Add IO state to the eventloop.
145 """Add IO state to the eventloop.
146
146
147 Parameters
147 Parameters
148 ----------
148 ----------
149 state : zmq.POLLIN|zmq.POLLOUT|zmq.POLLERR
149 state : zmq.POLLIN|zmq.POLLOUT|zmq.POLLERR
150 The IO state flag to set.
150 The IO state flag to set.
151
151
152 This is thread safe as it uses the thread safe IOLoop.add_callback.
152 This is thread safe as it uses the thread safe IOLoop.add_callback.
153 """
153 """
154 def add_io_state_callback():
154 def add_io_state_callback():
155 if not self.iostate & state:
155 if not self.iostate & state:
156 self.iostate = self.iostate | state
156 self.iostate = self.iostate | state
157 self.ioloop.update_handler(self.socket, self.iostate)
157 self.ioloop.update_handler(self.socket, self.iostate)
158 self.ioloop.add_callback(add_io_state_callback)
158 self.ioloop.add_callback(add_io_state_callback)
159
159
160 def drop_io_state(self, state):
160 def drop_io_state(self, state):
161 """Drop IO state from the eventloop.
161 """Drop IO state from the eventloop.
162
162
163 Parameters
163 Parameters
164 ----------
164 ----------
165 state : zmq.POLLIN|zmq.POLLOUT|zmq.POLLERR
165 state : zmq.POLLIN|zmq.POLLOUT|zmq.POLLERR
166 The IO state flag to set.
166 The IO state flag to set.
167
167
168 This is thread safe as it uses the thread safe IOLoop.add_callback.
168 This is thread safe as it uses the thread safe IOLoop.add_callback.
169 """
169 """
170 def drop_io_state_callback():
170 def drop_io_state_callback():
171 if self.iostate & state:
171 if self.iostate & state:
172 self.iostate = self.iostate & (~state)
172 self.iostate = self.iostate & (~state)
173 self.ioloop.update_handler(self.socket, self.iostate)
173 self.ioloop.update_handler(self.socket, self.iostate)
174 self.ioloop.add_callback(drop_io_state_callback)
174 self.ioloop.add_callback(drop_io_state_callback)
175
175
176
176
177 class ShellSocketChannel(ZMQSocketChannel):
177 class ShellSocketChannel(ZMQSocketChannel):
178 """The XREQ channel for issues request/replies to the kernel.
178 """The XREQ channel for issues request/replies to the kernel.
179 """
179 """
180
180
181 command_queue = None
181 command_queue = None
182
182
183 def __init__(self, context, session, address):
183 def __init__(self, context, session, address):
184 super(ShellSocketChannel, self).__init__(context, session, address)
184 super(ShellSocketChannel, self).__init__(context, session, address)
185 self.command_queue = Queue()
185 self.command_queue = Queue()
186 self.ioloop = ioloop.IOLoop()
186 self.ioloop = ioloop.IOLoop()
187
187
188 def run(self):
188 def run(self):
189 """The thread's main activity. Call start() instead."""
189 """The thread's main activity. Call start() instead."""
190 self.socket = self.context.socket(zmq.XREQ)
190 self.socket = self.context.socket(zmq.XREQ)
191 self.socket.setsockopt(zmq.IDENTITY, self.session.session)
191 self.socket.setsockopt(zmq.IDENTITY, self.session.session)
192 self.socket.connect('tcp://%s:%i' % self.address)
192 self.socket.connect('tcp://%s:%i' % self.address)
193 self.iostate = POLLERR|POLLIN
193 self.iostate = POLLERR|POLLIN
194 self.ioloop.add_handler(self.socket, self._handle_events,
194 self.ioloop.add_handler(self.socket, self._handle_events,
195 self.iostate)
195 self.iostate)
196 self._run_loop()
196 self._run_loop()
197
197
198 def stop(self):
198 def stop(self):
199 self.ioloop.stop()
199 self.ioloop.stop()
200 super(ShellSocketChannel, self).stop()
200 super(ShellSocketChannel, self).stop()
201
201
202 def call_handlers(self, msg):
202 def call_handlers(self, msg):
203 """This method is called in the ioloop thread when a message arrives.
203 """This method is called in the ioloop thread when a message arrives.
204
204
205 Subclasses should override this method to handle incoming messages.
205 Subclasses should override this method to handle incoming messages.
206 It is important to remember that this method is called in the thread
206 It is important to remember that this method is called in the thread
207 so that some logic must be done to ensure that the application leve
207 so that some logic must be done to ensure that the application leve
208 handlers are called in the application thread.
208 handlers are called in the application thread.
209 """
209 """
210 raise NotImplementedError('call_handlers must be defined in a subclass.')
210 raise NotImplementedError('call_handlers must be defined in a subclass.')
211
211
212 def execute(self, code, silent=False,
212 def execute(self, code, silent=False,
213 user_variables=None, user_expressions=None):
213 user_variables=None, user_expressions=None):
214 """Execute code in the kernel.
214 """Execute code in the kernel.
215
215
216 Parameters
216 Parameters
217 ----------
217 ----------
218 code : str
218 code : str
219 A string of Python code.
219 A string of Python code.
220
220
221 silent : bool, optional (default False)
221 silent : bool, optional (default False)
222 If set, the kernel will execute the code as quietly possible.
222 If set, the kernel will execute the code as quietly possible.
223
223
224 user_variables : list, optional
224 user_variables : list, optional
225 A list of variable names to pull from the user's namespace. They
225 A list of variable names to pull from the user's namespace. They
226 will come back as a dict with these names as keys and their
226 will come back as a dict with these names as keys and their
227 :func:`repr` as values.
227 :func:`repr` as values.
228
228
229 user_expressions : dict, optional
229 user_expressions : dict, optional
230 A dict with string keys and to pull from the user's
230 A dict with string keys and to pull from the user's
231 namespace. They will come back as a dict with these names as keys
231 namespace. They will come back as a dict with these names as keys
232 and their :func:`repr` as values.
232 and their :func:`repr` as values.
233
233
234 Returns
234 Returns
235 -------
235 -------
236 The msg_id of the message sent.
236 The msg_id of the message sent.
237 """
237 """
238 if user_variables is None:
238 if user_variables is None:
239 user_variables = []
239 user_variables = []
240 if user_expressions is None:
240 if user_expressions is None:
241 user_expressions = {}
241 user_expressions = {}
242
242
243 # Don't waste network traffic if inputs are invalid
243 # Don't waste network traffic if inputs are invalid
244 if not isinstance(code, basestring):
244 if not isinstance(code, basestring):
245 raise ValueError('code %r must be a string' % code)
245 raise ValueError('code %r must be a string' % code)
246 validate_string_list(user_variables)
246 validate_string_list(user_variables)
247 validate_string_dict(user_expressions)
247 validate_string_dict(user_expressions)
248
248
249 # Create class for content/msg creation. Related to, but possibly
249 # Create class for content/msg creation. Related to, but possibly
250 # not in Session.
250 # not in Session.
251 content = dict(code=code, silent=silent,
251 content = dict(code=code, silent=silent,
252 user_variables=user_variables,
252 user_variables=user_variables,
253 user_expressions=user_expressions)
253 user_expressions=user_expressions)
254 msg = self.session.msg('execute_request', content)
254 msg = self.session.msg('execute_request', content)
255 self._queue_request(msg)
255 self._queue_request(msg)
256 return msg['header']['msg_id']
256 return msg['header']['msg_id']
257
257
258 def complete(self, text, line, cursor_pos, block=None):
258 def complete(self, text, line, cursor_pos, block=None):
259 """Tab complete text in the kernel's namespace.
259 """Tab complete text in the kernel's namespace.
260
260
261 Parameters
261 Parameters
262 ----------
262 ----------
263 text : str
263 text : str
264 The text to complete.
264 The text to complete.
265 line : str
265 line : str
266 The full line of text that is the surrounding context for the
266 The full line of text that is the surrounding context for the
267 text to complete.
267 text to complete.
268 cursor_pos : int
268 cursor_pos : int
269 The position of the cursor in the line where the completion was
269 The position of the cursor in the line where the completion was
270 requested.
270 requested.
271 block : str, optional
271 block : str, optional
272 The full block of code in which the completion is being requested.
272 The full block of code in which the completion is being requested.
273
273
274 Returns
274 Returns
275 -------
275 -------
276 The msg_id of the message sent.
276 The msg_id of the message sent.
277 """
277 """
278 content = dict(text=text, line=line, block=block, cursor_pos=cursor_pos)
278 content = dict(text=text, line=line, block=block, cursor_pos=cursor_pos)
279 msg = self.session.msg('complete_request', content)
279 msg = self.session.msg('complete_request', content)
280 self._queue_request(msg)
280 self._queue_request(msg)
281 return msg['header']['msg_id']
281 return msg['header']['msg_id']
282
282
283 def object_info(self, oname):
283 def object_info(self, oname):
284 """Get metadata information about an object.
284 """Get metadata information about an object.
285
285
286 Parameters
286 Parameters
287 ----------
287 ----------
288 oname : str
288 oname : str
289 A string specifying the object name.
289 A string specifying the object name.
290
290
291 Returns
291 Returns
292 -------
292 -------
293 The msg_id of the message sent.
293 The msg_id of the message sent.
294 """
294 """
295 content = dict(oname=oname)
295 content = dict(oname=oname)
296 msg = self.session.msg('object_info_request', content)
296 msg = self.session.msg('object_info_request', content)
297 self._queue_request(msg)
297 self._queue_request(msg)
298 return msg['header']['msg_id']
298 return msg['header']['msg_id']
299
299
300 def history(self, raw=True, output=False, hist_access_type='range', **kwargs):
300 def history(self, raw=True, output=False, hist_access_type='range', **kwargs):
301 """Get entries from the history list.
301 """Get entries from the history list.
302
302
303 Parameters
303 Parameters
304 ----------
304 ----------
305 raw : bool
305 raw : bool
306 If True, return the raw input.
306 If True, return the raw input.
307 output : bool
307 output : bool
308 If True, then return the output as well.
308 If True, then return the output as well.
309 hist_access_type : str
309 hist_access_type : str
310 'range' (fill in session, start and stop params), 'tail' (fill in n)
310 'range' (fill in session, start and stop params), 'tail' (fill in n)
311 or 'search' (fill in pattern param).
311 or 'search' (fill in pattern param).
312
312
313 session : int
313 session : int
314 For a range request, the session from which to get lines. Session
314 For a range request, the session from which to get lines. Session
315 numbers are positive integers; negative ones count back from the
315 numbers are positive integers; negative ones count back from the
316 current session.
316 current session.
317 start : int
317 start : int
318 The first line number of a history range.
318 The first line number of a history range.
319 stop : int
319 stop : int
320 The final (excluded) line number of a history range.
320 The final (excluded) line number of a history range.
321
321
322 n : int
322 n : int
323 The number of lines of history to get for a tail request.
323 The number of lines of history to get for a tail request.
324
324
325 pattern : str
325 pattern : str
326 The glob-syntax pattern for a search request.
326 The glob-syntax pattern for a search request.
327
327
328 Returns
328 Returns
329 -------
329 -------
330 The msg_id of the message sent.
330 The msg_id of the message sent.
331 """
331 """
332 content = dict(raw=raw, output=output, hist_access_type=hist_access_type,
332 content = dict(raw=raw, output=output, hist_access_type=hist_access_type,
333 **kwargs)
333 **kwargs)
334 msg = self.session.msg('history_request', content)
334 msg = self.session.msg('history_request', content)
335 self._queue_request(msg)
335 self._queue_request(msg)
336 return msg['header']['msg_id']
336 return msg['header']['msg_id']
337
337
338 def shutdown(self, restart=False):
338 def shutdown(self, restart=False):
339 """Request an immediate kernel shutdown.
339 """Request an immediate kernel shutdown.
340
340
341 Upon receipt of the (empty) reply, client code can safely assume that
341 Upon receipt of the (empty) reply, client code can safely assume that
342 the kernel has shut down and it's safe to forcefully terminate it if
342 the kernel has shut down and it's safe to forcefully terminate it if
343 it's still alive.
343 it's still alive.
344
344
345 The kernel will send the reply via a function registered with Python's
345 The kernel will send the reply via a function registered with Python's
346 atexit module, ensuring it's truly done as the kernel is done with all
346 atexit module, ensuring it's truly done as the kernel is done with all
347 normal operation.
347 normal operation.
348 """
348 """
349 # Send quit message to kernel. Once we implement kernel-side setattr,
349 # Send quit message to kernel. Once we implement kernel-side setattr,
350 # this should probably be done that way, but for now this will do.
350 # this should probably be done that way, but for now this will do.
351 msg = self.session.msg('shutdown_request', {'restart':restart})
351 msg = self.session.msg('shutdown_request', {'restart':restart})
352 self._queue_request(msg)
352 self._queue_request(msg)
353 return msg['header']['msg_id']
353 return msg['header']['msg_id']
354
354
355 def _handle_events(self, socket, events):
355 def _handle_events(self, socket, events):
356 if events & POLLERR:
356 if events & POLLERR:
357 self._handle_err()
357 self._handle_err()
358 if events & POLLOUT:
358 if events & POLLOUT:
359 self._handle_send()
359 self._handle_send()
360 if events & POLLIN:
360 if events & POLLIN:
361 self._handle_recv()
361 self._handle_recv()
362
362
363 def _handle_recv(self):
363 def _handle_recv(self):
364 ident,msg = self.session.recv(self.socket, 0)
364 ident,msg = self.session.recv(self.socket, 0)
365 self.call_handlers(msg)
365 self.call_handlers(msg)
366
366
367 def _handle_send(self):
367 def _handle_send(self):
368 try:
368 try:
369 msg = self.command_queue.get(False)
369 msg = self.command_queue.get(False)
370 except Empty:
370 except Empty:
371 pass
371 pass
372 else:
372 else:
373 self.session.send(self.socket,msg)
373 self.session.send(self.socket,msg)
374 if self.command_queue.empty():
374 if self.command_queue.empty():
375 self.drop_io_state(POLLOUT)
375 self.drop_io_state(POLLOUT)
376
376
377 def _handle_err(self):
377 def _handle_err(self):
378 # We don't want to let this go silently, so eventually we should log.
378 # We don't want to let this go silently, so eventually we should log.
379 raise zmq.ZMQError()
379 raise zmq.ZMQError()
380
380
381 def _queue_request(self, msg):
381 def _queue_request(self, msg):
382 self.command_queue.put(msg)
382 self.command_queue.put(msg)
383 self.add_io_state(POLLOUT)
383 self.add_io_state(POLLOUT)
384
384
385
385
386 class SubSocketChannel(ZMQSocketChannel):
386 class SubSocketChannel(ZMQSocketChannel):
387 """The SUB channel which listens for messages that the kernel publishes.
387 """The SUB channel which listens for messages that the kernel publishes.
388 """
388 """
389
389
390 def __init__(self, context, session, address):
390 def __init__(self, context, session, address):
391 super(SubSocketChannel, self).__init__(context, session, address)
391 super(SubSocketChannel, self).__init__(context, session, address)
392 self.ioloop = ioloop.IOLoop()
392 self.ioloop = ioloop.IOLoop()
393
393
394 def run(self):
394 def run(self):
395 """The thread's main activity. Call start() instead."""
395 """The thread's main activity. Call start() instead."""
396 self.socket = self.context.socket(zmq.SUB)
396 self.socket = self.context.socket(zmq.SUB)
397 self.socket.setsockopt(zmq.SUBSCRIBE,'')
397 self.socket.setsockopt(zmq.SUBSCRIBE,'')
398 self.socket.setsockopt(zmq.IDENTITY, self.session.session)
398 self.socket.setsockopt(zmq.IDENTITY, self.session.session)
399 self.socket.connect('tcp://%s:%i' % self.address)
399 self.socket.connect('tcp://%s:%i' % self.address)
400 self.iostate = POLLIN|POLLERR
400 self.iostate = POLLIN|POLLERR
401 self.ioloop.add_handler(self.socket, self._handle_events,
401 self.ioloop.add_handler(self.socket, self._handle_events,
402 self.iostate)
402 self.iostate)
403 self._run_loop()
403 self._run_loop()
404
404
405 def stop(self):
405 def stop(self):
406 self.ioloop.stop()
406 self.ioloop.stop()
407 super(SubSocketChannel, self).stop()
407 super(SubSocketChannel, self).stop()
408
408
409 def call_handlers(self, msg):
409 def call_handlers(self, msg):
410 """This method is called in the ioloop thread when a message arrives.
410 """This method is called in the ioloop thread when a message arrives.
411
411
412 Subclasses should override this method to handle incoming messages.
412 Subclasses should override this method to handle incoming messages.
413 It is important to remember that this method is called in the thread
413 It is important to remember that this method is called in the thread
414 so that some logic must be done to ensure that the application leve
414 so that some logic must be done to ensure that the application leve
415 handlers are called in the application thread.
415 handlers are called in the application thread.
416 """
416 """
417 raise NotImplementedError('call_handlers must be defined in a subclass.')
417 raise NotImplementedError('call_handlers must be defined in a subclass.')
418
418
419 def flush(self, timeout=1.0):
419 def flush(self, timeout=1.0):
420 """Immediately processes all pending messages on the SUB channel.
420 """Immediately processes all pending messages on the SUB channel.
421
421
422 Callers should use this method to ensure that :method:`call_handlers`
422 Callers should use this method to ensure that :method:`call_handlers`
423 has been called for all messages that have been received on the
423 has been called for all messages that have been received on the
424 0MQ SUB socket of this channel.
424 0MQ SUB socket of this channel.
425
425
426 This method is thread safe.
426 This method is thread safe.
427
427
428 Parameters
428 Parameters
429 ----------
429 ----------
430 timeout : float, optional
430 timeout : float, optional
431 The maximum amount of time to spend flushing, in seconds. The
431 The maximum amount of time to spend flushing, in seconds. The
432 default is one second.
432 default is one second.
433 """
433 """
434 # We do the IOLoop callback process twice to ensure that the IOLoop
434 # We do the IOLoop callback process twice to ensure that the IOLoop
435 # gets to perform at least one full poll.
435 # gets to perform at least one full poll.
436 stop_time = time.time() + timeout
436 stop_time = time.time() + timeout
437 for i in xrange(2):
437 for i in xrange(2):
438 self._flushed = False
438 self._flushed = False
439 self.ioloop.add_callback(self._flush)
439 self.ioloop.add_callback(self._flush)
440 while not self._flushed and time.time() < stop_time:
440 while not self._flushed and time.time() < stop_time:
441 time.sleep(0.01)
441 time.sleep(0.01)
442
442
443 def _handle_events(self, socket, events):
443 def _handle_events(self, socket, events):
444 # Turn on and off POLLOUT depending on if we have made a request
444 # Turn on and off POLLOUT depending on if we have made a request
445 if events & POLLERR:
445 if events & POLLERR:
446 self._handle_err()
446 self._handle_err()
447 if events & POLLIN:
447 if events & POLLIN:
448 self._handle_recv()
448 self._handle_recv()
449
449
450 def _handle_err(self):
450 def _handle_err(self):
451 # We don't want to let this go silently, so eventually we should log.
451 # We don't want to let this go silently, so eventually we should log.
452 raise zmq.ZMQError()
452 raise zmq.ZMQError()
453
453
454 def _handle_recv(self):
454 def _handle_recv(self):
455 # Get all of the messages we can
455 # Get all of the messages we can
456 while True:
456 while True:
457 try:
457 try:
458 ident,msg = self.session.recv(self.socket)
458 ident,msg = self.session.recv(self.socket)
459 except zmq.ZMQError:
459 except zmq.ZMQError:
460 # Check the errno?
460 # Check the errno?
461 # Will this trigger POLLERR?
461 # Will this trigger POLLERR?
462 break
462 break
463 else:
463 else:
464 if msg is None:
464 if msg is None:
465 break
465 break
466 self.call_handlers(msg)
466 self.call_handlers(msg)
467
467
468 def _flush(self):
468 def _flush(self):
469 """Callback for :method:`self.flush`."""
469 """Callback for :method:`self.flush`."""
470 self._flushed = True
470 self._flushed = True
471
471
472
472
473 class StdInSocketChannel(ZMQSocketChannel):
473 class StdInSocketChannel(ZMQSocketChannel):
474 """A reply channel to handle raw_input requests that the kernel makes."""
474 """A reply channel to handle raw_input requests that the kernel makes."""
475
475
476 msg_queue = None
476 msg_queue = None
477
477
478 def __init__(self, context, session, address):
478 def __init__(self, context, session, address):
479 super(StdInSocketChannel, self).__init__(context, session, address)
479 super(StdInSocketChannel, self).__init__(context, session, address)
480 self.ioloop = ioloop.IOLoop()
480 self.ioloop = ioloop.IOLoop()
481 self.msg_queue = Queue()
481 self.msg_queue = Queue()
482
482
483 def run(self):
483 def run(self):
484 """The thread's main activity. Call start() instead."""
484 """The thread's main activity. Call start() instead."""
485 self.socket = self.context.socket(zmq.XREQ)
485 self.socket = self.context.socket(zmq.XREQ)
486 self.socket.setsockopt(zmq.IDENTITY, self.session.session)
486 self.socket.setsockopt(zmq.IDENTITY, self.session.session)
487 self.socket.connect('tcp://%s:%i' % self.address)
487 self.socket.connect('tcp://%s:%i' % self.address)
488 self.iostate = POLLERR|POLLIN
488 self.iostate = POLLERR|POLLIN
489 self.ioloop.add_handler(self.socket, self._handle_events,
489 self.ioloop.add_handler(self.socket, self._handle_events,
490 self.iostate)
490 self.iostate)
491 self._run_loop()
491 self._run_loop()
492
492
493 def stop(self):
493 def stop(self):
494 self.ioloop.stop()
494 self.ioloop.stop()
495 super(StdInSocketChannel, self).stop()
495 super(StdInSocketChannel, self).stop()
496
496
497 def call_handlers(self, msg):
497 def call_handlers(self, msg):
498 """This method is called in the ioloop thread when a message arrives.
498 """This method is called in the ioloop thread when a message arrives.
499
499
500 Subclasses should override this method to handle incoming messages.
500 Subclasses should override this method to handle incoming messages.
501 It is important to remember that this method is called in the thread
501 It is important to remember that this method is called in the thread
502 so that some logic must be done to ensure that the application leve
502 so that some logic must be done to ensure that the application leve
503 handlers are called in the application thread.
503 handlers are called in the application thread.
504 """
504 """
505 raise NotImplementedError('call_handlers must be defined in a subclass.')
505 raise NotImplementedError('call_handlers must be defined in a subclass.')
506
506
507 def input(self, string):
507 def input(self, string):
508 """Send a string of raw input to the kernel."""
508 """Send a string of raw input to the kernel."""
509 content = dict(value=string)
509 content = dict(value=string)
510 msg = self.session.msg('input_reply', content)
510 msg = self.session.msg('input_reply', content)
511 self._queue_reply(msg)
511 self._queue_reply(msg)
512
512
513 def _handle_events(self, socket, events):
513 def _handle_events(self, socket, events):
514 if events & POLLERR:
514 if events & POLLERR:
515 self._handle_err()
515 self._handle_err()
516 if events & POLLOUT:
516 if events & POLLOUT:
517 self._handle_send()
517 self._handle_send()
518 if events & POLLIN:
518 if events & POLLIN:
519 self._handle_recv()
519 self._handle_recv()
520
520
521 def _handle_recv(self):
521 def _handle_recv(self):
522 ident,msg = self.session.recv(self.socket, 0)
522 ident,msg = self.session.recv(self.socket, 0)
523 self.call_handlers(msg)
523 self.call_handlers(msg)
524
524
525 def _handle_send(self):
525 def _handle_send(self):
526 try:
526 try:
527 msg = self.msg_queue.get(False)
527 msg = self.msg_queue.get(False)
528 except Empty:
528 except Empty:
529 pass
529 pass
530 else:
530 else:
531 self.session.send(self.socket,msg)
531 self.session.send(self.socket,msg)
532 if self.msg_queue.empty():
532 if self.msg_queue.empty():
533 self.drop_io_state(POLLOUT)
533 self.drop_io_state(POLLOUT)
534
534
535 def _handle_err(self):
535 def _handle_err(self):
536 # We don't want to let this go silently, so eventually we should log.
536 # We don't want to let this go silently, so eventually we should log.
537 raise zmq.ZMQError()
537 raise zmq.ZMQError()
538
538
539 def _queue_reply(self, msg):
539 def _queue_reply(self, msg):
540 self.msg_queue.put(msg)
540 self.msg_queue.put(msg)
541 self.add_io_state(POLLOUT)
541 self.add_io_state(POLLOUT)
542
542
543
543
544 class HBSocketChannel(ZMQSocketChannel):
544 class HBSocketChannel(ZMQSocketChannel):
545 """The heartbeat channel which monitors the kernel heartbeat.
545 """The heartbeat channel which monitors the kernel heartbeat.
546
546
547 Note that the heartbeat channel is paused by default. As long as you start
547 Note that the heartbeat channel is paused by default. As long as you start
548 this channel, the kernel manager will ensure that it is paused and un-paused
548 this channel, the kernel manager will ensure that it is paused and un-paused
549 as appropriate.
549 as appropriate.
550 """
550 """
551
551
552 time_to_dead = 3.0
552 time_to_dead = 3.0
553 socket = None
553 socket = None
554 poller = None
554 poller = None
555 _running = None
555 _running = None
556 _pause = None
556 _pause = None
557
557
558 def __init__(self, context, session, address):
558 def __init__(self, context, session, address):
559 super(HBSocketChannel, self).__init__(context, session, address)
559 super(HBSocketChannel, self).__init__(context, session, address)
560 self._running = False
560 self._running = False
561 self._pause = True
561 self._pause = True
562
562
563 def _create_socket(self):
563 def _create_socket(self):
564 self.socket = self.context.socket(zmq.REQ)
564 self.socket = self.context.socket(zmq.REQ)
565 self.socket.setsockopt(zmq.IDENTITY, self.session.session)
565 self.socket.setsockopt(zmq.IDENTITY, self.session.session)
566 self.socket.connect('tcp://%s:%i' % self.address)
566 self.socket.connect('tcp://%s:%i' % self.address)
567 self.poller = zmq.Poller()
567 self.poller = zmq.Poller()
568 self.poller.register(self.socket, zmq.POLLIN)
568 self.poller.register(self.socket, zmq.POLLIN)
569
569
570 def run(self):
570 def run(self):
571 """The thread's main activity. Call start() instead."""
571 """The thread's main activity. Call start() instead."""
572 self._create_socket()
572 self._create_socket()
573 self._running = True
573 self._running = True
574 while self._running:
574 while self._running:
575 if self._pause:
575 if self._pause:
576 time.sleep(self.time_to_dead)
576 time.sleep(self.time_to_dead)
577 else:
577 else:
578 since_last_heartbeat = 0.0
578 since_last_heartbeat = 0.0
579 request_time = time.time()
579 request_time = time.time()
580 try:
580 try:
581 #io.rprint('Ping from HB channel') # dbg
581 #io.rprint('Ping from HB channel') # dbg
582 self.socket.send(b'ping')
582 self.socket.send(b'ping')
583 except zmq.ZMQError, e:
583 except zmq.ZMQError, e:
584 #io.rprint('*** HB Error:', e) # dbg
584 #io.rprint('*** HB Error:', e) # dbg
585 if e.errno == zmq.EFSM:
585 if e.errno == zmq.EFSM:
586 #io.rprint('sleep...', self.time_to_dead) # dbg
586 #io.rprint('sleep...', self.time_to_dead) # dbg
587 time.sleep(self.time_to_dead)
587 time.sleep(self.time_to_dead)
588 self._create_socket()
588 self._create_socket()
589 else:
589 else:
590 raise
590 raise
591 else:
591 else:
592 while True:
592 while True:
593 try:
593 try:
594 self.socket.recv(zmq.NOBLOCK)
594 self.socket.recv(zmq.NOBLOCK)
595 except zmq.ZMQError, e:
595 except zmq.ZMQError, e:
596 #io.rprint('*** HB Error 2:', e) # dbg
596 #io.rprint('*** HB Error 2:', e) # dbg
597 if e.errno == zmq.EAGAIN:
597 if e.errno == zmq.EAGAIN:
598 before_poll = time.time()
598 before_poll = time.time()
599 until_dead = self.time_to_dead - (before_poll -
599 until_dead = self.time_to_dead - (before_poll -
600 request_time)
600 request_time)
601
601
602 # When the return value of poll() is an empty
602 # When the return value of poll() is an empty
603 # list, that is when things have gone wrong
603 # list, that is when things have gone wrong
604 # (zeromq bug). As long as it is not an empty
604 # (zeromq bug). As long as it is not an empty
605 # list, poll is working correctly even if it
605 # list, poll is working correctly even if it
606 # returns quickly. Note: poll timeout is in
606 # returns quickly. Note: poll timeout is in
607 # milliseconds.
607 # milliseconds.
608 if until_dead > 0.0:
608 if until_dead > 0.0:
609 while True:
609 while True:
610 try:
610 try:
611 self.poller.poll(1000 * until_dead)
611 self.poller.poll(1000 * until_dead)
612 except zmq.ZMQError as e:
612 except zmq.ZMQError as e:
613 if e.errno == errno.EINTR:
613 if e.errno == errno.EINTR:
614 continue
614 continue
615 else:
615 else:
616 raise
616 raise
617 else:
617 else:
618 break
618 break
619
619
620 since_last_heartbeat = time.time()-request_time
620 since_last_heartbeat = time.time()-request_time
621 if since_last_heartbeat > self.time_to_dead:
621 if since_last_heartbeat > self.time_to_dead:
622 self.call_handlers(since_last_heartbeat)
622 self.call_handlers(since_last_heartbeat)
623 break
623 break
624 else:
624 else:
625 # FIXME: We should probably log this instead.
625 # FIXME: We should probably log this instead.
626 raise
626 raise
627 else:
627 else:
628 until_dead = self.time_to_dead - (time.time() -
628 until_dead = self.time_to_dead - (time.time() -
629 request_time)
629 request_time)
630 if until_dead > 0.0:
630 if until_dead > 0.0:
631 #io.rprint('sleep...', self.time_to_dead) # dbg
631 #io.rprint('sleep...', self.time_to_dead) # dbg
632 time.sleep(until_dead)
632 time.sleep(until_dead)
633 break
633 break
634
634
635 def pause(self):
635 def pause(self):
636 """Pause the heartbeat."""
636 """Pause the heartbeat."""
637 self._pause = True
637 self._pause = True
638
638
639 def unpause(self):
639 def unpause(self):
640 """Unpause the heartbeat."""
640 """Unpause the heartbeat."""
641 self._pause = False
641 self._pause = False
642
642
643 def is_beating(self):
643 def is_beating(self):
644 """Is the heartbeat running and not paused."""
644 """Is the heartbeat running and not paused."""
645 if self.is_alive() and not self._pause:
645 if self.is_alive() and not self._pause:
646 return True
646 return True
647 else:
647 else:
648 return False
648 return False
649
649
650 def stop(self):
650 def stop(self):
651 self._running = False
651 self._running = False
652 super(HBSocketChannel, self).stop()
652 super(HBSocketChannel, self).stop()
653
653
654 def call_handlers(self, since_last_heartbeat):
654 def call_handlers(self, since_last_heartbeat):
655 """This method is called in the ioloop thread when a message arrives.
655 """This method is called in the ioloop thread when a message arrives.
656
656
657 Subclasses should override this method to handle incoming messages.
657 Subclasses should override this method to handle incoming messages.
658 It is important to remember that this method is called in the thread
658 It is important to remember that this method is called in the thread
659 so that some logic must be done to ensure that the application leve
659 so that some logic must be done to ensure that the application leve
660 handlers are called in the application thread.
660 handlers are called in the application thread.
661 """
661 """
662 raise NotImplementedError('call_handlers must be defined in a subclass.')
662 raise NotImplementedError('call_handlers must be defined in a subclass.')
663
663
664
664
665 #-----------------------------------------------------------------------------
665 #-----------------------------------------------------------------------------
666 # Main kernel manager class
666 # Main kernel manager class
667 #-----------------------------------------------------------------------------
667 #-----------------------------------------------------------------------------
668
668
669 class KernelManager(HasTraits):
669 class KernelManager(HasTraits):
670 """ Manages a kernel for a frontend.
670 """ Manages a kernel for a frontend.
671
671
672 The SUB channel is for the frontend to receive messages published by the
672 The SUB channel is for the frontend to receive messages published by the
673 kernel.
673 kernel.
674
674
675 The REQ channel is for the frontend to make requests of the kernel.
675 The REQ channel is for the frontend to make requests of the kernel.
676
676
677 The REP channel is for the kernel to request stdin (raw_input) from the
677 The REP channel is for the kernel to request stdin (raw_input) from the
678 frontend.
678 frontend.
679 """
679 """
680 # config object for passing to child configurables
680 # config object for passing to child configurables
681 config = Instance(Config)
681 config = Instance(Config)
682
682
683 # The PyZMQ Context to use for communication with the kernel.
683 # The PyZMQ Context to use for communication with the kernel.
684 context = Instance(zmq.Context)
684 context = Instance(zmq.Context)
685 def _context_default(self):
685 def _context_default(self):
686 return zmq.Context.instance()
686 return zmq.Context.instance()
687
687
688 # The Session to use for communication with the kernel.
688 # The Session to use for communication with the kernel.
689 session = Instance(Session)
689 session = Instance(Session)
690
690
691 # The kernel process with which the KernelManager is communicating.
691 # The kernel process with which the KernelManager is communicating.
692 kernel = Instance(Popen)
692 kernel = Instance(Popen)
693
693
694 # The addresses for the communication channels.
694 # The addresses for the communication channels.
695 shell_address = TCPAddress((LOCALHOST, 0))
695 shell_address = TCPAddress((LOCALHOST, 0))
696 sub_address = TCPAddress((LOCALHOST, 0))
696 sub_address = TCPAddress((LOCALHOST, 0))
697 stdin_address = TCPAddress((LOCALHOST, 0))
697 stdin_address = TCPAddress((LOCALHOST, 0))
698 hb_address = TCPAddress((LOCALHOST, 0))
698 hb_address = TCPAddress((LOCALHOST, 0))
699
699
700 # The classes to use for the various channels.
700 # The classes to use for the various channels.
701 shell_channel_class = Type(ShellSocketChannel)
701 shell_channel_class = Type(ShellSocketChannel)
702 sub_channel_class = Type(SubSocketChannel)
702 sub_channel_class = Type(SubSocketChannel)
703 stdin_channel_class = Type(StdInSocketChannel)
703 stdin_channel_class = Type(StdInSocketChannel)
704 hb_channel_class = Type(HBSocketChannel)
704 hb_channel_class = Type(HBSocketChannel)
705
705
706 # Protected traits.
706 # Protected traits.
707 _launch_args = Any
707 _launch_args = Any
708 _shell_channel = Any
708 _shell_channel = Any
709 _sub_channel = Any
709 _sub_channel = Any
710 _stdin_channel = Any
710 _stdin_channel = Any
711 _hb_channel = Any
711 _hb_channel = Any
712
712
713 def __init__(self, **kwargs):
713 def __init__(self, **kwargs):
714 super(KernelManager, self).__init__(**kwargs)
714 super(KernelManager, self).__init__(**kwargs)
715 if self.session is None:
715 if self.session is None:
716 self.session = Session(config=self.config)
716 self.session = Session(config=self.config)
717 # Uncomment this to try closing the context.
717 # Uncomment this to try closing the context.
718 # atexit.register(self.context.term)
718 # atexit.register(self.context.term)
719
719
720 #--------------------------------------------------------------------------
720 #--------------------------------------------------------------------------
721 # Channel management methods:
721 # Channel management methods:
722 #--------------------------------------------------------------------------
722 #--------------------------------------------------------------------------
723
723
724 def start_channels(self, shell=True, sub=True, stdin=True, hb=True):
724 def start_channels(self, shell=True, sub=True, stdin=True, hb=True):
725 """Starts the channels for this kernel.
725 """Starts the channels for this kernel.
726
726
727 This will create the channels if they do not exist and then start
727 This will create the channels if they do not exist and then start
728 them. If port numbers of 0 are being used (random ports) then you
728 them. If port numbers of 0 are being used (random ports) then you
729 must first call :method:`start_kernel`. If the channels have been
729 must first call :method:`start_kernel`. If the channels have been
730 stopped and you call this, :class:`RuntimeError` will be raised.
730 stopped and you call this, :class:`RuntimeError` will be raised.
731 """
731 """
732 if shell:
732 if shell:
733 self.shell_channel.start()
733 self.shell_channel.start()
734 if sub:
734 if sub:
735 self.sub_channel.start()
735 self.sub_channel.start()
736 if stdin:
736 if stdin:
737 self.stdin_channel.start()
737 self.stdin_channel.start()
738 if hb:
738 if hb:
739 self.hb_channel.start()
739 self.hb_channel.start()
740
740
741 def stop_channels(self):
741 def stop_channels(self):
742 """Stops all the running channels for this kernel.
742 """Stops all the running channels for this kernel.
743 """
743 """
744 if self.shell_channel.is_alive():
744 if self.shell_channel.is_alive():
745 self.shell_channel.stop()
745 self.shell_channel.stop()
746 if self.sub_channel.is_alive():
746 if self.sub_channel.is_alive():
747 self.sub_channel.stop()
747 self.sub_channel.stop()
748 if self.stdin_channel.is_alive():
748 if self.stdin_channel.is_alive():
749 self.stdin_channel.stop()
749 self.stdin_channel.stop()
750 if self.hb_channel.is_alive():
750 if self.hb_channel.is_alive():
751 self.hb_channel.stop()
751 self.hb_channel.stop()
752
752
753 @property
753 @property
754 def channels_running(self):
754 def channels_running(self):
755 """Are any of the channels created and running?"""
755 """Are any of the channels created and running?"""
756 return (self.shell_channel.is_alive() or self.sub_channel.is_alive() or
756 return (self.shell_channel.is_alive() or self.sub_channel.is_alive() or
757 self.stdin_channel.is_alive() or self.hb_channel.is_alive())
757 self.stdin_channel.is_alive() or self.hb_channel.is_alive())
758
758
759 #--------------------------------------------------------------------------
759 #--------------------------------------------------------------------------
760 # Kernel process management methods:
760 # Kernel process management methods:
761 #--------------------------------------------------------------------------
761 #--------------------------------------------------------------------------
762
762
763 def start_kernel(self, **kw):
763 def start_kernel(self, **kw):
764 """Starts a kernel process and configures the manager to use it.
764 """Starts a kernel process and configures the manager to use it.
765
765
766 If random ports (port=0) are being used, this method must be called
766 If random ports (port=0) are being used, this method must be called
767 before the channels are created.
767 before the channels are created.
768
768
769 Parameters:
769 Parameters:
770 -----------
770 -----------
771 ipython : bool, optional (default True)
771 ipython : bool, optional (default True)
772 Whether to use an IPython kernel instead of a plain Python kernel.
772 Whether to use an IPython kernel instead of a plain Python kernel.
773
773
774 launcher : callable, optional (default None)
775 A custom function for launching the kernel process (generally a
776 wrapper around ``entry_point.base_launch_kernel``). In most cases,
777 it should not be necessary to use this parameter.
778
774 **kw : optional
779 **kw : optional
775 See respective options for IPython and Python kernels.
780 See respective options for IPython and Python kernels.
776 """
781 """
777 shell, sub, stdin, hb = self.shell_address, self.sub_address, \
782 shell, sub, stdin, hb = self.shell_address, self.sub_address, \
778 self.stdin_address, self.hb_address
783 self.stdin_address, self.hb_address
779 if shell[0] not in LOCAL_IPS or sub[0] not in LOCAL_IPS or \
784 if shell[0] not in LOCAL_IPS or sub[0] not in LOCAL_IPS or \
780 stdin[0] not in LOCAL_IPS or hb[0] not in LOCAL_IPS:
785 stdin[0] not in LOCAL_IPS or hb[0] not in LOCAL_IPS:
781 raise RuntimeError("Can only launch a kernel on a local interface. "
786 raise RuntimeError("Can only launch a kernel on a local interface. "
782 "Make sure that the '*_address' attributes are "
787 "Make sure that the '*_address' attributes are "
783 "configured properly. "
788 "configured properly. "
784 "Currently valid addresses are: %s"%LOCAL_IPS
789 "Currently valid addresses are: %s"%LOCAL_IPS
785 )
790 )
786
791
787 self._launch_args = kw.copy()
792 self._launch_args = kw.copy()
788 if kw.pop('ipython', True):
793 launch_kernel = kw.pop('launcher', None)
789 from ipkernel import launch_kernel
794 if launch_kernel is None:
790 else:
795 if kw.pop('ipython', True):
791 from pykernel import launch_kernel
796 from ipkernel import launch_kernel
797 else:
798 from pykernel import launch_kernel
792 self.kernel, xrep, pub, req, _hb = launch_kernel(
799 self.kernel, xrep, pub, req, _hb = launch_kernel(
793 shell_port=shell[1], iopub_port=sub[1],
800 shell_port=shell[1], iopub_port=sub[1],
794 stdin_port=stdin[1], hb_port=hb[1], **kw)
801 stdin_port=stdin[1], hb_port=hb[1], **kw)
795 self.shell_address = (shell[0], xrep)
802 self.shell_address = (shell[0], xrep)
796 self.sub_address = (sub[0], pub)
803 self.sub_address = (sub[0], pub)
797 self.stdin_address = (stdin[0], req)
804 self.stdin_address = (stdin[0], req)
798 self.hb_address = (hb[0], _hb)
805 self.hb_address = (hb[0], _hb)
799
806
800 def shutdown_kernel(self, restart=False):
807 def shutdown_kernel(self, restart=False):
801 """ Attempts to the stop the kernel process cleanly. If the kernel
808 """ Attempts to the stop the kernel process cleanly. If the kernel
802 cannot be stopped, it is killed, if possible.
809 cannot be stopped, it is killed, if possible.
803 """
810 """
804 # FIXME: Shutdown does not work on Windows due to ZMQ errors!
811 # FIXME: Shutdown does not work on Windows due to ZMQ errors!
805 if sys.platform == 'win32':
812 if sys.platform == 'win32':
806 self.kill_kernel()
813 self.kill_kernel()
807 return
814 return
808
815
809 # Pause the heart beat channel if it exists.
816 # Pause the heart beat channel if it exists.
810 if self._hb_channel is not None:
817 if self._hb_channel is not None:
811 self._hb_channel.pause()
818 self._hb_channel.pause()
812
819
813 # Don't send any additional kernel kill messages immediately, to give
820 # Don't send any additional kernel kill messages immediately, to give
814 # the kernel a chance to properly execute shutdown actions. Wait for at
821 # the kernel a chance to properly execute shutdown actions. Wait for at
815 # most 1s, checking every 0.1s.
822 # most 1s, checking every 0.1s.
816 self.shell_channel.shutdown(restart=restart)
823 self.shell_channel.shutdown(restart=restart)
817 for i in range(10):
824 for i in range(10):
818 if self.is_alive:
825 if self.is_alive:
819 time.sleep(0.1)
826 time.sleep(0.1)
820 else:
827 else:
821 break
828 break
822 else:
829 else:
823 # OK, we've waited long enough.
830 # OK, we've waited long enough.
824 if self.has_kernel:
831 if self.has_kernel:
825 self.kill_kernel()
832 self.kill_kernel()
826
833
827 def restart_kernel(self, now=False, **kw):
834 def restart_kernel(self, now=False, **kw):
828 """Restarts a kernel with the arguments that were used to launch it.
835 """Restarts a kernel with the arguments that were used to launch it.
829
836
830 If the old kernel was launched with random ports, the same ports will be
837 If the old kernel was launched with random ports, the same ports will be
831 used for the new kernel.
838 used for the new kernel.
832
839
833 Parameters
840 Parameters
834 ----------
841 ----------
835 now : bool, optional
842 now : bool, optional
836 If True, the kernel is forcefully restarted *immediately*, without
843 If True, the kernel is forcefully restarted *immediately*, without
837 having a chance to do any cleanup action. Otherwise the kernel is
844 having a chance to do any cleanup action. Otherwise the kernel is
838 given 1s to clean up before a forceful restart is issued.
845 given 1s to clean up before a forceful restart is issued.
839
846
840 In all cases the kernel is restarted, the only difference is whether
847 In all cases the kernel is restarted, the only difference is whether
841 it is given a chance to perform a clean shutdown or not.
848 it is given a chance to perform a clean shutdown or not.
842
849
843 **kw : optional
850 **kw : optional
844 Any options specified here will replace those used to launch the
851 Any options specified here will replace those used to launch the
845 kernel.
852 kernel.
846 """
853 """
847 if self._launch_args is None:
854 if self._launch_args is None:
848 raise RuntimeError("Cannot restart the kernel. "
855 raise RuntimeError("Cannot restart the kernel. "
849 "No previous call to 'start_kernel'.")
856 "No previous call to 'start_kernel'.")
850 else:
857 else:
851 # Stop currently running kernel.
858 # Stop currently running kernel.
852 if self.has_kernel:
859 if self.has_kernel:
853 if now:
860 if now:
854 self.kill_kernel()
861 self.kill_kernel()
855 else:
862 else:
856 self.shutdown_kernel(restart=True)
863 self.shutdown_kernel(restart=True)
857
864
858 # Start new kernel.
865 # Start new kernel.
859 self._launch_args.update(kw)
866 self._launch_args.update(kw)
860 self.start_kernel(**self._launch_args)
867 self.start_kernel(**self._launch_args)
861
868
862 # FIXME: Messages get dropped in Windows due to probable ZMQ bug
869 # FIXME: Messages get dropped in Windows due to probable ZMQ bug
863 # unless there is some delay here.
870 # unless there is some delay here.
864 if sys.platform == 'win32':
871 if sys.platform == 'win32':
865 time.sleep(0.2)
872 time.sleep(0.2)
866
873
867 @property
874 @property
868 def has_kernel(self):
875 def has_kernel(self):
869 """Returns whether a kernel process has been specified for the kernel
876 """Returns whether a kernel process has been specified for the kernel
870 manager.
877 manager.
871 """
878 """
872 return self.kernel is not None
879 return self.kernel is not None
873
880
874 def kill_kernel(self):
881 def kill_kernel(self):
875 """ Kill the running kernel. """
882 """ Kill the running kernel. """
876 if self.has_kernel:
883 if self.has_kernel:
877 # Pause the heart beat channel if it exists.
884 # Pause the heart beat channel if it exists.
878 if self._hb_channel is not None:
885 if self._hb_channel is not None:
879 self._hb_channel.pause()
886 self._hb_channel.pause()
880
887
881 # Attempt to kill the kernel.
888 # Attempt to kill the kernel.
882 try:
889 try:
883 self.kernel.kill()
890 self.kernel.kill()
884 except OSError, e:
891 except OSError, e:
885 # In Windows, we will get an Access Denied error if the process
892 # In Windows, we will get an Access Denied error if the process
886 # has already terminated. Ignore it.
893 # has already terminated. Ignore it.
887 if sys.platform == 'win32':
894 if sys.platform == 'win32':
888 if e.winerror != 5:
895 if e.winerror != 5:
889 raise
896 raise
890 # On Unix, we may get an ESRCH error if the process has already
897 # On Unix, we may get an ESRCH error if the process has already
891 # terminated. Ignore it.
898 # terminated. Ignore it.
892 else:
899 else:
893 from errno import ESRCH
900 from errno import ESRCH
894 if e.errno != ESRCH:
901 if e.errno != ESRCH:
895 raise
902 raise
896 self.kernel = None
903 self.kernel = None
897 else:
904 else:
898 raise RuntimeError("Cannot kill kernel. No kernel is running!")
905 raise RuntimeError("Cannot kill kernel. No kernel is running!")
899
906
900 def interrupt_kernel(self):
907 def interrupt_kernel(self):
901 """ Interrupts the kernel. Unlike ``signal_kernel``, this operation is
908 """ Interrupts the kernel. Unlike ``signal_kernel``, this operation is
902 well supported on all platforms.
909 well supported on all platforms.
903 """
910 """
904 if self.has_kernel:
911 if self.has_kernel:
905 if sys.platform == 'win32':
912 if sys.platform == 'win32':
906 from parentpoller import ParentPollerWindows as Poller
913 from parentpoller import ParentPollerWindows as Poller
907 Poller.send_interrupt(self.kernel.win32_interrupt_event)
914 Poller.send_interrupt(self.kernel.win32_interrupt_event)
908 else:
915 else:
909 self.kernel.send_signal(signal.SIGINT)
916 self.kernel.send_signal(signal.SIGINT)
910 else:
917 else:
911 raise RuntimeError("Cannot interrupt kernel. No kernel is running!")
918 raise RuntimeError("Cannot interrupt kernel. No kernel is running!")
912
919
913 def signal_kernel(self, signum):
920 def signal_kernel(self, signum):
914 """ Sends a signal to the kernel. Note that since only SIGTERM is
921 """ Sends a signal to the kernel. Note that since only SIGTERM is
915 supported on Windows, this function is only useful on Unix systems.
922 supported on Windows, this function is only useful on Unix systems.
916 """
923 """
917 if self.has_kernel:
924 if self.has_kernel:
918 self.kernel.send_signal(signum)
925 self.kernel.send_signal(signum)
919 else:
926 else:
920 raise RuntimeError("Cannot signal kernel. No kernel is running!")
927 raise RuntimeError("Cannot signal kernel. No kernel is running!")
921
928
922 @property
929 @property
923 def is_alive(self):
930 def is_alive(self):
924 """Is the kernel process still running?"""
931 """Is the kernel process still running?"""
925 # FIXME: not using a heartbeat means this method is broken for any
932 # FIXME: not using a heartbeat means this method is broken for any
926 # remote kernel, it's only capable of handling local kernels.
933 # remote kernel, it's only capable of handling local kernels.
927 if self.has_kernel:
934 if self.has_kernel:
928 if self.kernel.poll() is None:
935 if self.kernel.poll() is None:
929 return True
936 return True
930 else:
937 else:
931 return False
938 return False
932 else:
939 else:
933 # We didn't start the kernel with this KernelManager so we don't
940 # We didn't start the kernel with this KernelManager so we don't
934 # know if it is running. We should use a heartbeat for this case.
941 # know if it is running. We should use a heartbeat for this case.
935 return True
942 return True
936
943
937 #--------------------------------------------------------------------------
944 #--------------------------------------------------------------------------
938 # Channels used for communication with the kernel:
945 # Channels used for communication with the kernel:
939 #--------------------------------------------------------------------------
946 #--------------------------------------------------------------------------
940
947
941 @property
948 @property
942 def shell_channel(self):
949 def shell_channel(self):
943 """Get the REQ socket channel object to make requests of the kernel."""
950 """Get the REQ socket channel object to make requests of the kernel."""
944 if self._shell_channel is None:
951 if self._shell_channel is None:
945 self._shell_channel = self.shell_channel_class(self.context,
952 self._shell_channel = self.shell_channel_class(self.context,
946 self.session,
953 self.session,
947 self.shell_address)
954 self.shell_address)
948 return self._shell_channel
955 return self._shell_channel
949
956
950 @property
957 @property
951 def sub_channel(self):
958 def sub_channel(self):
952 """Get the SUB socket channel object."""
959 """Get the SUB socket channel object."""
953 if self._sub_channel is None:
960 if self._sub_channel is None:
954 self._sub_channel = self.sub_channel_class(self.context,
961 self._sub_channel = self.sub_channel_class(self.context,
955 self.session,
962 self.session,
956 self.sub_address)
963 self.sub_address)
957 return self._sub_channel
964 return self._sub_channel
958
965
959 @property
966 @property
960 def stdin_channel(self):
967 def stdin_channel(self):
961 """Get the REP socket channel object to handle stdin (raw_input)."""
968 """Get the REP socket channel object to handle stdin (raw_input)."""
962 if self._stdin_channel is None:
969 if self._stdin_channel is None:
963 self._stdin_channel = self.stdin_channel_class(self.context,
970 self._stdin_channel = self.stdin_channel_class(self.context,
964 self.session,
971 self.session,
965 self.stdin_address)
972 self.stdin_address)
966 return self._stdin_channel
973 return self._stdin_channel
967
974
968 @property
975 @property
969 def hb_channel(self):
976 def hb_channel(self):
970 """Get the heartbeat socket channel object to check that the
977 """Get the heartbeat socket channel object to check that the
971 kernel is alive."""
978 kernel is alive."""
972 if self._hb_channel is None:
979 if self._hb_channel is None:
973 self._hb_channel = self.hb_channel_class(self.context,
980 self._hb_channel = self.hb_channel_class(self.context,
974 self.session,
981 self.session,
975 self.hb_address)
982 self.hb_address)
976 return self._hb_channel
983 return self._hb_channel
General Comments 0
You need to be logged in to leave comments. Login now