##// END OF EJS Templates
Safe kernel killing in Windows.
epatters -
Show More
@@ -1,892 +1,899 b''
1 """Base classes to manage the interaction with a running kernel.
1 """Base classes to manage the interaction with a running kernel.
2
2
3 TODO
3 TODO
4 * Create logger to handle debugging and console messages.
4 * Create logger to handle debugging and console messages.
5 """
5 """
6
6
7 #-----------------------------------------------------------------------------
7 #-----------------------------------------------------------------------------
8 # Copyright (C) 2008-2010 The IPython Development Team
8 # Copyright (C) 2008-2010 The IPython Development Team
9 #
9 #
10 # Distributed under the terms of the BSD License. The full license is in
10 # Distributed under the terms of the BSD License. The full license is in
11 # the file COPYING, distributed as part of this software.
11 # the file COPYING, distributed as part of this software.
12 #-----------------------------------------------------------------------------
12 #-----------------------------------------------------------------------------
13
13
14 #-----------------------------------------------------------------------------
14 #-----------------------------------------------------------------------------
15 # Imports
15 # Imports
16 #-----------------------------------------------------------------------------
16 #-----------------------------------------------------------------------------
17
17
18 # Standard library imports.
18 # Standard library imports.
19 from Queue import Queue, Empty
19 from Queue import Queue, Empty
20 from subprocess import Popen
20 from subprocess import Popen
21 import signal
21 import signal
22 import sys
22 import sys
23 from threading import Thread
23 from threading import Thread
24 import time
24 import time
25
25
26 # System library imports.
26 # System library imports.
27 import zmq
27 import zmq
28 from zmq import POLLIN, POLLOUT, POLLERR
28 from zmq import POLLIN, POLLOUT, POLLERR
29 from zmq.eventloop import ioloop
29 from zmq.eventloop import ioloop
30
30
31 # Local imports.
31 # Local imports.
32 from IPython.utils import io
32 from IPython.utils import io
33 from IPython.utils.traitlets import HasTraits, Any, Instance, Type, TCPAddress
33 from IPython.utils.traitlets import HasTraits, Any, Instance, Type, TCPAddress
34 from session import Session
34 from session import Session
35
35
36 #-----------------------------------------------------------------------------
36 #-----------------------------------------------------------------------------
37 # Constants and exceptions
37 # Constants and exceptions
38 #-----------------------------------------------------------------------------
38 #-----------------------------------------------------------------------------
39
39
40 LOCALHOST = '127.0.0.1'
40 LOCALHOST = '127.0.0.1'
41
41
42 class InvalidPortNumber(Exception):
42 class InvalidPortNumber(Exception):
43 pass
43 pass
44
44
45 #-----------------------------------------------------------------------------
45 #-----------------------------------------------------------------------------
46 # Utility functions
46 # Utility functions
47 #-----------------------------------------------------------------------------
47 #-----------------------------------------------------------------------------
48
48
49 # some utilities to validate message structure, these might get moved elsewhere
49 # some utilities to validate message structure, these might get moved elsewhere
50 # if they prove to have more generic utility
50 # if they prove to have more generic utility
51
51
52 def validate_string_list(lst):
52 def validate_string_list(lst):
53 """Validate that the input is a list of strings.
53 """Validate that the input is a list of strings.
54
54
55 Raises ValueError if not."""
55 Raises ValueError if not."""
56 if not isinstance(lst, list):
56 if not isinstance(lst, list):
57 raise ValueError('input %r must be a list' % lst)
57 raise ValueError('input %r must be a list' % lst)
58 for x in lst:
58 for x in lst:
59 if not isinstance(x, basestring):
59 if not isinstance(x, basestring):
60 raise ValueError('element %r in list must be a string' % x)
60 raise ValueError('element %r in list must be a string' % x)
61
61
62
62
63 def validate_string_dict(dct):
63 def validate_string_dict(dct):
64 """Validate that the input is a dict with string keys and values.
64 """Validate that the input is a dict with string keys and values.
65
65
66 Raises ValueError if not."""
66 Raises ValueError if not."""
67 for k,v in dct.iteritems():
67 for k,v in dct.iteritems():
68 if not isinstance(k, basestring):
68 if not isinstance(k, basestring):
69 raise ValueError('key %r in dict must be a string' % k)
69 raise ValueError('key %r in dict must be a string' % k)
70 if not isinstance(v, basestring):
70 if not isinstance(v, basestring):
71 raise ValueError('value %r in dict must be a string' % v)
71 raise ValueError('value %r in dict must be a string' % v)
72
72
73
73
74 #-----------------------------------------------------------------------------
74 #-----------------------------------------------------------------------------
75 # ZMQ Socket Channel classes
75 # ZMQ Socket Channel classes
76 #-----------------------------------------------------------------------------
76 #-----------------------------------------------------------------------------
77
77
78 class ZmqSocketChannel(Thread):
78 class ZmqSocketChannel(Thread):
79 """The base class for the channels that use ZMQ sockets.
79 """The base class for the channels that use ZMQ sockets.
80 """
80 """
81 context = None
81 context = None
82 session = None
82 session = None
83 socket = None
83 socket = None
84 ioloop = None
84 ioloop = None
85 iostate = None
85 iostate = None
86 _address = None
86 _address = None
87
87
88 def __init__(self, context, session, address):
88 def __init__(self, context, session, address):
89 """Create a channel
89 """Create a channel
90
90
91 Parameters
91 Parameters
92 ----------
92 ----------
93 context : :class:`zmq.Context`
93 context : :class:`zmq.Context`
94 The ZMQ context to use.
94 The ZMQ context to use.
95 session : :class:`session.Session`
95 session : :class:`session.Session`
96 The session to use.
96 The session to use.
97 address : tuple
97 address : tuple
98 Standard (ip, port) tuple that the kernel is listening on.
98 Standard (ip, port) tuple that the kernel is listening on.
99 """
99 """
100 super(ZmqSocketChannel, self).__init__()
100 super(ZmqSocketChannel, self).__init__()
101 self.daemon = True
101 self.daemon = True
102
102
103 self.context = context
103 self.context = context
104 self.session = session
104 self.session = session
105 if address[1] == 0:
105 if address[1] == 0:
106 message = 'The port number for a channel cannot be 0.'
106 message = 'The port number for a channel cannot be 0.'
107 raise InvalidPortNumber(message)
107 raise InvalidPortNumber(message)
108 self._address = address
108 self._address = address
109
109
110 def stop(self):
110 def stop(self):
111 """Stop the channel's activity.
111 """Stop the channel's activity.
112
112
113 This calls :method:`Thread.join` and returns when the thread
113 This calls :method:`Thread.join` and returns when the thread
114 terminates. :class:`RuntimeError` will be raised if
114 terminates. :class:`RuntimeError` will be raised if
115 :method:`self.start` is called again.
115 :method:`self.start` is called again.
116 """
116 """
117 self.join()
117 self.join()
118
118
119 @property
119 @property
120 def address(self):
120 def address(self):
121 """Get the channel's address as an (ip, port) tuple.
121 """Get the channel's address as an (ip, port) tuple.
122
122
123 By the default, the address is (localhost, 0), where 0 means a random
123 By the default, the address is (localhost, 0), where 0 means a random
124 port.
124 port.
125 """
125 """
126 return self._address
126 return self._address
127
127
128 def add_io_state(self, state):
128 def add_io_state(self, state):
129 """Add IO state to the eventloop.
129 """Add IO state to the eventloop.
130
130
131 Parameters
131 Parameters
132 ----------
132 ----------
133 state : zmq.POLLIN|zmq.POLLOUT|zmq.POLLERR
133 state : zmq.POLLIN|zmq.POLLOUT|zmq.POLLERR
134 The IO state flag to set.
134 The IO state flag to set.
135
135
136 This is thread safe as it uses the thread safe IOLoop.add_callback.
136 This is thread safe as it uses the thread safe IOLoop.add_callback.
137 """
137 """
138 def add_io_state_callback():
138 def add_io_state_callback():
139 if not self.iostate & state:
139 if not self.iostate & state:
140 self.iostate = self.iostate | state
140 self.iostate = self.iostate | state
141 self.ioloop.update_handler(self.socket, self.iostate)
141 self.ioloop.update_handler(self.socket, self.iostate)
142 self.ioloop.add_callback(add_io_state_callback)
142 self.ioloop.add_callback(add_io_state_callback)
143
143
144 def drop_io_state(self, state):
144 def drop_io_state(self, state):
145 """Drop IO state from the eventloop.
145 """Drop IO state from the eventloop.
146
146
147 Parameters
147 Parameters
148 ----------
148 ----------
149 state : zmq.POLLIN|zmq.POLLOUT|zmq.POLLERR
149 state : zmq.POLLIN|zmq.POLLOUT|zmq.POLLERR
150 The IO state flag to set.
150 The IO state flag to set.
151
151
152 This is thread safe as it uses the thread safe IOLoop.add_callback.
152 This is thread safe as it uses the thread safe IOLoop.add_callback.
153 """
153 """
154 def drop_io_state_callback():
154 def drop_io_state_callback():
155 if self.iostate & state:
155 if self.iostate & state:
156 self.iostate = self.iostate & (~state)
156 self.iostate = self.iostate & (~state)
157 self.ioloop.update_handler(self.socket, self.iostate)
157 self.ioloop.update_handler(self.socket, self.iostate)
158 self.ioloop.add_callback(drop_io_state_callback)
158 self.ioloop.add_callback(drop_io_state_callback)
159
159
160
160
161 class XReqSocketChannel(ZmqSocketChannel):
161 class XReqSocketChannel(ZmqSocketChannel):
162 """The XREQ channel for issues request/replies to the kernel.
162 """The XREQ channel for issues request/replies to the kernel.
163 """
163 """
164
164
165 command_queue = None
165 command_queue = None
166
166
167 def __init__(self, context, session, address):
167 def __init__(self, context, session, address):
168 super(XReqSocketChannel, self).__init__(context, session, address)
168 super(XReqSocketChannel, self).__init__(context, session, address)
169 self.command_queue = Queue()
169 self.command_queue = Queue()
170 self.ioloop = ioloop.IOLoop()
170 self.ioloop = ioloop.IOLoop()
171
171
172 def run(self):
172 def run(self):
173 """The thread's main activity. Call start() instead."""
173 """The thread's main activity. Call start() instead."""
174 self.socket = self.context.socket(zmq.XREQ)
174 self.socket = self.context.socket(zmq.XREQ)
175 self.socket.setsockopt(zmq.IDENTITY, self.session.session)
175 self.socket.setsockopt(zmq.IDENTITY, self.session.session)
176 self.socket.connect('tcp://%s:%i' % self.address)
176 self.socket.connect('tcp://%s:%i' % self.address)
177 self.iostate = POLLERR|POLLIN
177 self.iostate = POLLERR|POLLIN
178 self.ioloop.add_handler(self.socket, self._handle_events,
178 self.ioloop.add_handler(self.socket, self._handle_events,
179 self.iostate)
179 self.iostate)
180 self.ioloop.start()
180 self.ioloop.start()
181
181
182 def stop(self):
182 def stop(self):
183 self.ioloop.stop()
183 self.ioloop.stop()
184 super(XReqSocketChannel, self).stop()
184 super(XReqSocketChannel, self).stop()
185
185
186 def call_handlers(self, msg):
186 def call_handlers(self, msg):
187 """This method is called in the ioloop thread when a message arrives.
187 """This method is called in the ioloop thread when a message arrives.
188
188
189 Subclasses should override this method to handle incoming messages.
189 Subclasses should override this method to handle incoming messages.
190 It is important to remember that this method is called in the thread
190 It is important to remember that this method is called in the thread
191 so that some logic must be done to ensure that the application leve
191 so that some logic must be done to ensure that the application leve
192 handlers are called in the application thread.
192 handlers are called in the application thread.
193 """
193 """
194 raise NotImplementedError('call_handlers must be defined in a subclass.')
194 raise NotImplementedError('call_handlers must be defined in a subclass.')
195
195
196 def execute(self, code, silent=False,
196 def execute(self, code, silent=False,
197 user_variables=None, user_expressions=None):
197 user_variables=None, user_expressions=None):
198 """Execute code in the kernel.
198 """Execute code in the kernel.
199
199
200 Parameters
200 Parameters
201 ----------
201 ----------
202 code : str
202 code : str
203 A string of Python code.
203 A string of Python code.
204
204
205 silent : bool, optional (default False)
205 silent : bool, optional (default False)
206 If set, the kernel will execute the code as quietly possible.
206 If set, the kernel will execute the code as quietly possible.
207
207
208 user_variables : list, optional
208 user_variables : list, optional
209 A list of variable names to pull from the user's namespace. They
209 A list of variable names to pull from the user's namespace. They
210 will come back as a dict with these names as keys and their
210 will come back as a dict with these names as keys and their
211 :func:`repr` as values.
211 :func:`repr` as values.
212
212
213 user_expressions : dict, optional
213 user_expressions : dict, optional
214 A dict with string keys and to pull from the user's
214 A dict with string keys and to pull from the user's
215 namespace. They will come back as a dict with these names as keys
215 namespace. They will come back as a dict with these names as keys
216 and their :func:`repr` as values.
216 and their :func:`repr` as values.
217
217
218 Returns
218 Returns
219 -------
219 -------
220 The msg_id of the message sent.
220 The msg_id of the message sent.
221 """
221 """
222 if user_variables is None:
222 if user_variables is None:
223 user_variables = []
223 user_variables = []
224 if user_expressions is None:
224 if user_expressions is None:
225 user_expressions = {}
225 user_expressions = {}
226
226
227 # Don't waste network traffic if inputs are invalid
227 # Don't waste network traffic if inputs are invalid
228 if not isinstance(code, basestring):
228 if not isinstance(code, basestring):
229 raise ValueError('code %r must be a string' % code)
229 raise ValueError('code %r must be a string' % code)
230 validate_string_list(user_variables)
230 validate_string_list(user_variables)
231 validate_string_dict(user_expressions)
231 validate_string_dict(user_expressions)
232
232
233 # Create class for content/msg creation. Related to, but possibly
233 # Create class for content/msg creation. Related to, but possibly
234 # not in Session.
234 # not in Session.
235 content = dict(code=code, silent=silent,
235 content = dict(code=code, silent=silent,
236 user_variables=user_variables,
236 user_variables=user_variables,
237 user_expressions=user_expressions)
237 user_expressions=user_expressions)
238 msg = self.session.msg('execute_request', content)
238 msg = self.session.msg('execute_request', content)
239 self._queue_request(msg)
239 self._queue_request(msg)
240 return msg['header']['msg_id']
240 return msg['header']['msg_id']
241
241
242 def complete(self, text, line, cursor_pos, block=None):
242 def complete(self, text, line, cursor_pos, block=None):
243 """Tab complete text in the kernel's namespace.
243 """Tab complete text in the kernel's namespace.
244
244
245 Parameters
245 Parameters
246 ----------
246 ----------
247 text : str
247 text : str
248 The text to complete.
248 The text to complete.
249 line : str
249 line : str
250 The full line of text that is the surrounding context for the
250 The full line of text that is the surrounding context for the
251 text to complete.
251 text to complete.
252 cursor_pos : int
252 cursor_pos : int
253 The position of the cursor in the line where the completion was
253 The position of the cursor in the line where the completion was
254 requested.
254 requested.
255 block : str, optional
255 block : str, optional
256 The full block of code in which the completion is being requested.
256 The full block of code in which the completion is being requested.
257
257
258 Returns
258 Returns
259 -------
259 -------
260 The msg_id of the message sent.
260 The msg_id of the message sent.
261 """
261 """
262 content = dict(text=text, line=line, block=block, cursor_pos=cursor_pos)
262 content = dict(text=text, line=line, block=block, cursor_pos=cursor_pos)
263 msg = self.session.msg('complete_request', content)
263 msg = self.session.msg('complete_request', content)
264 self._queue_request(msg)
264 self._queue_request(msg)
265 return msg['header']['msg_id']
265 return msg['header']['msg_id']
266
266
267 def object_info(self, oname):
267 def object_info(self, oname):
268 """Get metadata information about an object.
268 """Get metadata information about an object.
269
269
270 Parameters
270 Parameters
271 ----------
271 ----------
272 oname : str
272 oname : str
273 A string specifying the object name.
273 A string specifying the object name.
274
274
275 Returns
275 Returns
276 -------
276 -------
277 The msg_id of the message sent.
277 The msg_id of the message sent.
278 """
278 """
279 content = dict(oname=oname)
279 content = dict(oname=oname)
280 msg = self.session.msg('object_info_request', content)
280 msg = self.session.msg('object_info_request', content)
281 self._queue_request(msg)
281 self._queue_request(msg)
282 return msg['header']['msg_id']
282 return msg['header']['msg_id']
283
283
284 def history(self, index=None, raw=False, output=True):
284 def history(self, index=None, raw=False, output=True):
285 """Get the history list.
285 """Get the history list.
286
286
287 Parameters
287 Parameters
288 ----------
288 ----------
289 index : n or (n1, n2) or None
289 index : n or (n1, n2) or None
290 If n, then the last entries. If a tuple, then all in
290 If n, then the last entries. If a tuple, then all in
291 range(n1, n2). If None, then all entries. Raises IndexError if
291 range(n1, n2). If None, then all entries. Raises IndexError if
292 the format of index is incorrect.
292 the format of index is incorrect.
293 raw : bool
293 raw : bool
294 If True, return the raw input.
294 If True, return the raw input.
295 output : bool
295 output : bool
296 If True, then return the output as well.
296 If True, then return the output as well.
297
297
298 Returns
298 Returns
299 -------
299 -------
300 The msg_id of the message sent.
300 The msg_id of the message sent.
301 """
301 """
302 content = dict(index=index, raw=raw, output=output)
302 content = dict(index=index, raw=raw, output=output)
303 msg = self.session.msg('history_request', content)
303 msg = self.session.msg('history_request', content)
304 self._queue_request(msg)
304 self._queue_request(msg)
305 return msg['header']['msg_id']
305 return msg['header']['msg_id']
306
306
307 def shutdown(self):
307 def shutdown(self):
308 """Request an immediate kernel shutdown.
308 """Request an immediate kernel shutdown.
309
309
310 Upon receipt of the (empty) reply, client code can safely assume that
310 Upon receipt of the (empty) reply, client code can safely assume that
311 the kernel has shut down and it's safe to forcefully terminate it if
311 the kernel has shut down and it's safe to forcefully terminate it if
312 it's still alive.
312 it's still alive.
313
313
314 The kernel will send the reply via a function registered with Python's
314 The kernel will send the reply via a function registered with Python's
315 atexit module, ensuring it's truly done as the kernel is done with all
315 atexit module, ensuring it's truly done as the kernel is done with all
316 normal operation.
316 normal operation.
317 """
317 """
318 # Send quit message to kernel. Once we implement kernel-side setattr,
318 # Send quit message to kernel. Once we implement kernel-side setattr,
319 # this should probably be done that way, but for now this will do.
319 # this should probably be done that way, but for now this will do.
320 msg = self.session.msg('shutdown_request', {})
320 msg = self.session.msg('shutdown_request', {})
321 self._queue_request(msg)
321 self._queue_request(msg)
322 return msg['header']['msg_id']
322 return msg['header']['msg_id']
323
323
324 def _handle_events(self, socket, events):
324 def _handle_events(self, socket, events):
325 if events & POLLERR:
325 if events & POLLERR:
326 self._handle_err()
326 self._handle_err()
327 if events & POLLOUT:
327 if events & POLLOUT:
328 self._handle_send()
328 self._handle_send()
329 if events & POLLIN:
329 if events & POLLIN:
330 self._handle_recv()
330 self._handle_recv()
331
331
332 def _handle_recv(self):
332 def _handle_recv(self):
333 msg = self.socket.recv_json()
333 msg = self.socket.recv_json()
334 self.call_handlers(msg)
334 self.call_handlers(msg)
335
335
336 def _handle_send(self):
336 def _handle_send(self):
337 try:
337 try:
338 msg = self.command_queue.get(False)
338 msg = self.command_queue.get(False)
339 except Empty:
339 except Empty:
340 pass
340 pass
341 else:
341 else:
342 self.socket.send_json(msg)
342 self.socket.send_json(msg)
343 if self.command_queue.empty():
343 if self.command_queue.empty():
344 self.drop_io_state(POLLOUT)
344 self.drop_io_state(POLLOUT)
345
345
346 def _handle_err(self):
346 def _handle_err(self):
347 # We don't want to let this go silently, so eventually we should log.
347 # We don't want to let this go silently, so eventually we should log.
348 raise zmq.ZMQError()
348 raise zmq.ZMQError()
349
349
350 def _queue_request(self, msg):
350 def _queue_request(self, msg):
351 self.command_queue.put(msg)
351 self.command_queue.put(msg)
352 self.add_io_state(POLLOUT)
352 self.add_io_state(POLLOUT)
353
353
354
354
355 class SubSocketChannel(ZmqSocketChannel):
355 class SubSocketChannel(ZmqSocketChannel):
356 """The SUB channel which listens for messages that the kernel publishes.
356 """The SUB channel which listens for messages that the kernel publishes.
357 """
357 """
358
358
359 def __init__(self, context, session, address):
359 def __init__(self, context, session, address):
360 super(SubSocketChannel, self).__init__(context, session, address)
360 super(SubSocketChannel, self).__init__(context, session, address)
361 self.ioloop = ioloop.IOLoop()
361 self.ioloop = ioloop.IOLoop()
362
362
363 def run(self):
363 def run(self):
364 """The thread's main activity. Call start() instead."""
364 """The thread's main activity. Call start() instead."""
365 self.socket = self.context.socket(zmq.SUB)
365 self.socket = self.context.socket(zmq.SUB)
366 self.socket.setsockopt(zmq.SUBSCRIBE,'')
366 self.socket.setsockopt(zmq.SUBSCRIBE,'')
367 self.socket.setsockopt(zmq.IDENTITY, self.session.session)
367 self.socket.setsockopt(zmq.IDENTITY, self.session.session)
368 self.socket.connect('tcp://%s:%i' % self.address)
368 self.socket.connect('tcp://%s:%i' % self.address)
369 self.iostate = POLLIN|POLLERR
369 self.iostate = POLLIN|POLLERR
370 self.ioloop.add_handler(self.socket, self._handle_events,
370 self.ioloop.add_handler(self.socket, self._handle_events,
371 self.iostate)
371 self.iostate)
372 self.ioloop.start()
372 self.ioloop.start()
373
373
374 def stop(self):
374 def stop(self):
375 self.ioloop.stop()
375 self.ioloop.stop()
376 super(SubSocketChannel, self).stop()
376 super(SubSocketChannel, self).stop()
377
377
378 def call_handlers(self, msg):
378 def call_handlers(self, msg):
379 """This method is called in the ioloop thread when a message arrives.
379 """This method is called in the ioloop thread when a message arrives.
380
380
381 Subclasses should override this method to handle incoming messages.
381 Subclasses should override this method to handle incoming messages.
382 It is important to remember that this method is called in the thread
382 It is important to remember that this method is called in the thread
383 so that some logic must be done to ensure that the application leve
383 so that some logic must be done to ensure that the application leve
384 handlers are called in the application thread.
384 handlers are called in the application thread.
385 """
385 """
386 raise NotImplementedError('call_handlers must be defined in a subclass.')
386 raise NotImplementedError('call_handlers must be defined in a subclass.')
387
387
388 def flush(self, timeout=1.0):
388 def flush(self, timeout=1.0):
389 """Immediately processes all pending messages on the SUB channel.
389 """Immediately processes all pending messages on the SUB channel.
390
390
391 Callers should use this method to ensure that :method:`call_handlers`
391 Callers should use this method to ensure that :method:`call_handlers`
392 has been called for all messages that have been received on the
392 has been called for all messages that have been received on the
393 0MQ SUB socket of this channel.
393 0MQ SUB socket of this channel.
394
394
395 This method is thread safe.
395 This method is thread safe.
396
396
397 Parameters
397 Parameters
398 ----------
398 ----------
399 timeout : float, optional
399 timeout : float, optional
400 The maximum amount of time to spend flushing, in seconds. The
400 The maximum amount of time to spend flushing, in seconds. The
401 default is one second.
401 default is one second.
402 """
402 """
403 # We do the IOLoop callback process twice to ensure that the IOLoop
403 # We do the IOLoop callback process twice to ensure that the IOLoop
404 # gets to perform at least one full poll.
404 # gets to perform at least one full poll.
405 stop_time = time.time() + timeout
405 stop_time = time.time() + timeout
406 for i in xrange(2):
406 for i in xrange(2):
407 self._flushed = False
407 self._flushed = False
408 self.ioloop.add_callback(self._flush)
408 self.ioloop.add_callback(self._flush)
409 while not self._flushed and time.time() < stop_time:
409 while not self._flushed and time.time() < stop_time:
410 time.sleep(0.01)
410 time.sleep(0.01)
411
411
412 def _handle_events(self, socket, events):
412 def _handle_events(self, socket, events):
413 # Turn on and off POLLOUT depending on if we have made a request
413 # Turn on and off POLLOUT depending on if we have made a request
414 if events & POLLERR:
414 if events & POLLERR:
415 self._handle_err()
415 self._handle_err()
416 if events & POLLIN:
416 if events & POLLIN:
417 self._handle_recv()
417 self._handle_recv()
418
418
419 def _handle_err(self):
419 def _handle_err(self):
420 # We don't want to let this go silently, so eventually we should log.
420 # We don't want to let this go silently, so eventually we should log.
421 raise zmq.ZMQError()
421 raise zmq.ZMQError()
422
422
423 def _handle_recv(self):
423 def _handle_recv(self):
424 # Get all of the messages we can
424 # Get all of the messages we can
425 while True:
425 while True:
426 try:
426 try:
427 msg = self.socket.recv_json(zmq.NOBLOCK)
427 msg = self.socket.recv_json(zmq.NOBLOCK)
428 except zmq.ZMQError:
428 except zmq.ZMQError:
429 # Check the errno?
429 # Check the errno?
430 # Will this trigger POLLERR?
430 # Will this trigger POLLERR?
431 break
431 break
432 else:
432 else:
433 self.call_handlers(msg)
433 self.call_handlers(msg)
434
434
435 def _flush(self):
435 def _flush(self):
436 """Callback for :method:`self.flush`."""
436 """Callback for :method:`self.flush`."""
437 self._flushed = True
437 self._flushed = True
438
438
439
439
440 class RepSocketChannel(ZmqSocketChannel):
440 class RepSocketChannel(ZmqSocketChannel):
441 """A reply channel to handle raw_input requests that the kernel makes."""
441 """A reply channel to handle raw_input requests that the kernel makes."""
442
442
443 msg_queue = None
443 msg_queue = None
444
444
445 def __init__(self, context, session, address):
445 def __init__(self, context, session, address):
446 super(RepSocketChannel, self).__init__(context, session, address)
446 super(RepSocketChannel, self).__init__(context, session, address)
447 self.ioloop = ioloop.IOLoop()
447 self.ioloop = ioloop.IOLoop()
448 self.msg_queue = Queue()
448 self.msg_queue = Queue()
449
449
450 def run(self):
450 def run(self):
451 """The thread's main activity. Call start() instead."""
451 """The thread's main activity. Call start() instead."""
452 self.socket = self.context.socket(zmq.XREQ)
452 self.socket = self.context.socket(zmq.XREQ)
453 self.socket.setsockopt(zmq.IDENTITY, self.session.session)
453 self.socket.setsockopt(zmq.IDENTITY, self.session.session)
454 self.socket.connect('tcp://%s:%i' % self.address)
454 self.socket.connect('tcp://%s:%i' % self.address)
455 self.iostate = POLLERR|POLLIN
455 self.iostate = POLLERR|POLLIN
456 self.ioloop.add_handler(self.socket, self._handle_events,
456 self.ioloop.add_handler(self.socket, self._handle_events,
457 self.iostate)
457 self.iostate)
458 self.ioloop.start()
458 self.ioloop.start()
459
459
460 def stop(self):
460 def stop(self):
461 self.ioloop.stop()
461 self.ioloop.stop()
462 super(RepSocketChannel, self).stop()
462 super(RepSocketChannel, self).stop()
463
463
464 def call_handlers(self, msg):
464 def call_handlers(self, msg):
465 """This method is called in the ioloop thread when a message arrives.
465 """This method is called in the ioloop thread when a message arrives.
466
466
467 Subclasses should override this method to handle incoming messages.
467 Subclasses should override this method to handle incoming messages.
468 It is important to remember that this method is called in the thread
468 It is important to remember that this method is called in the thread
469 so that some logic must be done to ensure that the application leve
469 so that some logic must be done to ensure that the application leve
470 handlers are called in the application thread.
470 handlers are called in the application thread.
471 """
471 """
472 raise NotImplementedError('call_handlers must be defined in a subclass.')
472 raise NotImplementedError('call_handlers must be defined in a subclass.')
473
473
474 def input(self, string):
474 def input(self, string):
475 """Send a string of raw input to the kernel."""
475 """Send a string of raw input to the kernel."""
476 content = dict(value=string)
476 content = dict(value=string)
477 msg = self.session.msg('input_reply', content)
477 msg = self.session.msg('input_reply', content)
478 self._queue_reply(msg)
478 self._queue_reply(msg)
479
479
480 def _handle_events(self, socket, events):
480 def _handle_events(self, socket, events):
481 if events & POLLERR:
481 if events & POLLERR:
482 self._handle_err()
482 self._handle_err()
483 if events & POLLOUT:
483 if events & POLLOUT:
484 self._handle_send()
484 self._handle_send()
485 if events & POLLIN:
485 if events & POLLIN:
486 self._handle_recv()
486 self._handle_recv()
487
487
488 def _handle_recv(self):
488 def _handle_recv(self):
489 msg = self.socket.recv_json()
489 msg = self.socket.recv_json()
490 self.call_handlers(msg)
490 self.call_handlers(msg)
491
491
492 def _handle_send(self):
492 def _handle_send(self):
493 try:
493 try:
494 msg = self.msg_queue.get(False)
494 msg = self.msg_queue.get(False)
495 except Empty:
495 except Empty:
496 pass
496 pass
497 else:
497 else:
498 self.socket.send_json(msg)
498 self.socket.send_json(msg)
499 if self.msg_queue.empty():
499 if self.msg_queue.empty():
500 self.drop_io_state(POLLOUT)
500 self.drop_io_state(POLLOUT)
501
501
502 def _handle_err(self):
502 def _handle_err(self):
503 # We don't want to let this go silently, so eventually we should log.
503 # We don't want to let this go silently, so eventually we should log.
504 raise zmq.ZMQError()
504 raise zmq.ZMQError()
505
505
506 def _queue_reply(self, msg):
506 def _queue_reply(self, msg):
507 self.msg_queue.put(msg)
507 self.msg_queue.put(msg)
508 self.add_io_state(POLLOUT)
508 self.add_io_state(POLLOUT)
509
509
510
510
511 class HBSocketChannel(ZmqSocketChannel):
511 class HBSocketChannel(ZmqSocketChannel):
512 """The heartbeat channel which monitors the kernel heartbeat.
512 """The heartbeat channel which monitors the kernel heartbeat.
513
513
514 Note that the heartbeat channel is paused by default. As long as you start
514 Note that the heartbeat channel is paused by default. As long as you start
515 this channel, the kernel manager will ensure that it is paused and un-paused
515 this channel, the kernel manager will ensure that it is paused and un-paused
516 as appropriate.
516 as appropriate.
517 """
517 """
518
518
519 time_to_dead = 3.0
519 time_to_dead = 3.0
520 socket = None
520 socket = None
521 poller = None
521 poller = None
522 _running = None
522 _running = None
523 _pause = None
523 _pause = None
524
524
525 def __init__(self, context, session, address):
525 def __init__(self, context, session, address):
526 super(HBSocketChannel, self).__init__(context, session, address)
526 super(HBSocketChannel, self).__init__(context, session, address)
527 self._running = False
527 self._running = False
528 self._pause = True
528 self._pause = True
529
529
530 def _create_socket(self):
530 def _create_socket(self):
531 self.socket = self.context.socket(zmq.REQ)
531 self.socket = self.context.socket(zmq.REQ)
532 self.socket.setsockopt(zmq.IDENTITY, self.session.session)
532 self.socket.setsockopt(zmq.IDENTITY, self.session.session)
533 self.socket.connect('tcp://%s:%i' % self.address)
533 self.socket.connect('tcp://%s:%i' % self.address)
534 self.poller = zmq.Poller()
534 self.poller = zmq.Poller()
535 self.poller.register(self.socket, zmq.POLLIN)
535 self.poller.register(self.socket, zmq.POLLIN)
536
536
537 def run(self):
537 def run(self):
538 """The thread's main activity. Call start() instead."""
538 """The thread's main activity. Call start() instead."""
539 self._create_socket()
539 self._create_socket()
540 self._running = True
540 self._running = True
541 while self._running:
541 while self._running:
542 if self._pause:
542 if self._pause:
543 time.sleep(self.time_to_dead)
543 time.sleep(self.time_to_dead)
544 else:
544 else:
545 since_last_heartbeat = 0.0
545 since_last_heartbeat = 0.0
546 request_time = time.time()
546 request_time = time.time()
547 try:
547 try:
548 #io.rprint('Ping from HB channel') # dbg
548 #io.rprint('Ping from HB channel') # dbg
549 self.socket.send_json('ping')
549 self.socket.send_json('ping')
550 except zmq.ZMQError, e:
550 except zmq.ZMQError, e:
551 #io.rprint('*** HB Error:', e) # dbg
551 #io.rprint('*** HB Error:', e) # dbg
552 if e.errno == zmq.EFSM:
552 if e.errno == zmq.EFSM:
553 #io.rprint('sleep...', self.time_to_dead) # dbg
553 #io.rprint('sleep...', self.time_to_dead) # dbg
554 time.sleep(self.time_to_dead)
554 time.sleep(self.time_to_dead)
555 self._create_socket()
555 self._create_socket()
556 else:
556 else:
557 raise
557 raise
558 else:
558 else:
559 while True:
559 while True:
560 try:
560 try:
561 self.socket.recv_json(zmq.NOBLOCK)
561 self.socket.recv_json(zmq.NOBLOCK)
562 except zmq.ZMQError, e:
562 except zmq.ZMQError, e:
563 #io.rprint('*** HB Error 2:', e) # dbg
563 #io.rprint('*** HB Error 2:', e) # dbg
564 if e.errno == zmq.EAGAIN:
564 if e.errno == zmq.EAGAIN:
565 before_poll = time.time()
565 before_poll = time.time()
566 until_dead = self.time_to_dead - (before_poll -
566 until_dead = self.time_to_dead - (before_poll -
567 request_time)
567 request_time)
568
568
569 # When the return value of poll() is an empty
569 # When the return value of poll() is an empty
570 # list, that is when things have gone wrong
570 # list, that is when things have gone wrong
571 # (zeromq bug). As long as it is not an empty
571 # (zeromq bug). As long as it is not an empty
572 # list, poll is working correctly even if it
572 # list, poll is working correctly even if it
573 # returns quickly. Note: poll timeout is in
573 # returns quickly. Note: poll timeout is in
574 # milliseconds.
574 # milliseconds.
575 self.poller.poll(1000*until_dead)
575 self.poller.poll(1000*until_dead)
576
576
577 since_last_heartbeat = time.time()-request_time
577 since_last_heartbeat = time.time()-request_time
578 if since_last_heartbeat > self.time_to_dead:
578 if since_last_heartbeat > self.time_to_dead:
579 self.call_handlers(since_last_heartbeat)
579 self.call_handlers(since_last_heartbeat)
580 break
580 break
581 else:
581 else:
582 # FIXME: We should probably log this instead.
582 # FIXME: We should probably log this instead.
583 raise
583 raise
584 else:
584 else:
585 until_dead = self.time_to_dead - (time.time() -
585 until_dead = self.time_to_dead - (time.time() -
586 request_time)
586 request_time)
587 if until_dead > 0.0:
587 if until_dead > 0.0:
588 #io.rprint('sleep...', self.time_to_dead) # dbg
588 #io.rprint('sleep...', self.time_to_dead) # dbg
589 time.sleep(until_dead)
589 time.sleep(until_dead)
590 break
590 break
591
591
592 def pause(self):
592 def pause(self):
593 """Pause the heartbeat."""
593 """Pause the heartbeat."""
594 self._pause = True
594 self._pause = True
595
595
596 def unpause(self):
596 def unpause(self):
597 """Unpause the heartbeat."""
597 """Unpause the heartbeat."""
598 self._pause = False
598 self._pause = False
599
599
600 def is_beating(self):
600 def is_beating(self):
601 """Is the heartbeat running and not paused."""
601 """Is the heartbeat running and not paused."""
602 if self.is_alive() and not self._pause:
602 if self.is_alive() and not self._pause:
603 return True
603 return True
604 else:
604 else:
605 return False
605 return False
606
606
607 def stop(self):
607 def stop(self):
608 self._running = False
608 self._running = False
609 super(HBSocketChannel, self).stop()
609 super(HBSocketChannel, self).stop()
610
610
611 def call_handlers(self, since_last_heartbeat):
611 def call_handlers(self, since_last_heartbeat):
612 """This method is called in the ioloop thread when a message arrives.
612 """This method is called in the ioloop thread when a message arrives.
613
613
614 Subclasses should override this method to handle incoming messages.
614 Subclasses should override this method to handle incoming messages.
615 It is important to remember that this method is called in the thread
615 It is important to remember that this method is called in the thread
616 so that some logic must be done to ensure that the application leve
616 so that some logic must be done to ensure that the application leve
617 handlers are called in the application thread.
617 handlers are called in the application thread.
618 """
618 """
619 raise NotImplementedError('call_handlers must be defined in a subclass.')
619 raise NotImplementedError('call_handlers must be defined in a subclass.')
620
620
621
621
622 #-----------------------------------------------------------------------------
622 #-----------------------------------------------------------------------------
623 # Main kernel manager class
623 # Main kernel manager class
624 #-----------------------------------------------------------------------------
624 #-----------------------------------------------------------------------------
625
625
626 class KernelManager(HasTraits):
626 class KernelManager(HasTraits):
627 """ Manages a kernel for a frontend.
627 """ Manages a kernel for a frontend.
628
628
629 The SUB channel is for the frontend to receive messages published by the
629 The SUB channel is for the frontend to receive messages published by the
630 kernel.
630 kernel.
631
631
632 The REQ channel is for the frontend to make requests of the kernel.
632 The REQ channel is for the frontend to make requests of the kernel.
633
633
634 The REP channel is for the kernel to request stdin (raw_input) from the
634 The REP channel is for the kernel to request stdin (raw_input) from the
635 frontend.
635 frontend.
636 """
636 """
637 # The PyZMQ Context to use for communication with the kernel.
637 # The PyZMQ Context to use for communication with the kernel.
638 context = Instance(zmq.Context,(),{})
638 context = Instance(zmq.Context,(),{})
639
639
640 # The Session to use for communication with the kernel.
640 # The Session to use for communication with the kernel.
641 session = Instance(Session,(),{})
641 session = Instance(Session,(),{})
642
642
643 # The kernel process with which the KernelManager is communicating.
643 # The kernel process with which the KernelManager is communicating.
644 kernel = Instance(Popen)
644 kernel = Instance(Popen)
645
645
646 # The addresses for the communication channels.
646 # The addresses for the communication channels.
647 xreq_address = TCPAddress((LOCALHOST, 0))
647 xreq_address = TCPAddress((LOCALHOST, 0))
648 sub_address = TCPAddress((LOCALHOST, 0))
648 sub_address = TCPAddress((LOCALHOST, 0))
649 rep_address = TCPAddress((LOCALHOST, 0))
649 rep_address = TCPAddress((LOCALHOST, 0))
650 hb_address = TCPAddress((LOCALHOST, 0))
650 hb_address = TCPAddress((LOCALHOST, 0))
651
651
652 # The classes to use for the various channels.
652 # The classes to use for the various channels.
653 xreq_channel_class = Type(XReqSocketChannel)
653 xreq_channel_class = Type(XReqSocketChannel)
654 sub_channel_class = Type(SubSocketChannel)
654 sub_channel_class = Type(SubSocketChannel)
655 rep_channel_class = Type(RepSocketChannel)
655 rep_channel_class = Type(RepSocketChannel)
656 hb_channel_class = Type(HBSocketChannel)
656 hb_channel_class = Type(HBSocketChannel)
657
657
658 # Protected traits.
658 # Protected traits.
659 _launch_args = Any
659 _launch_args = Any
660 _xreq_channel = Any
660 _xreq_channel = Any
661 _sub_channel = Any
661 _sub_channel = Any
662 _rep_channel = Any
662 _rep_channel = Any
663 _hb_channel = Any
663 _hb_channel = Any
664
664
665 #--------------------------------------------------------------------------
665 #--------------------------------------------------------------------------
666 # Channel management methods:
666 # Channel management methods:
667 #--------------------------------------------------------------------------
667 #--------------------------------------------------------------------------
668
668
669 def start_channels(self, xreq=True, sub=True, rep=True, hb=True):
669 def start_channels(self, xreq=True, sub=True, rep=True, hb=True):
670 """Starts the channels for this kernel.
670 """Starts the channels for this kernel.
671
671
672 This will create the channels if they do not exist and then start
672 This will create the channels if they do not exist and then start
673 them. If port numbers of 0 are being used (random ports) then you
673 them. If port numbers of 0 are being used (random ports) then you
674 must first call :method:`start_kernel`. If the channels have been
674 must first call :method:`start_kernel`. If the channels have been
675 stopped and you call this, :class:`RuntimeError` will be raised.
675 stopped and you call this, :class:`RuntimeError` will be raised.
676 """
676 """
677 if xreq:
677 if xreq:
678 self.xreq_channel.start()
678 self.xreq_channel.start()
679 if sub:
679 if sub:
680 self.sub_channel.start()
680 self.sub_channel.start()
681 if rep:
681 if rep:
682 self.rep_channel.start()
682 self.rep_channel.start()
683 if hb:
683 if hb:
684 self.hb_channel.start()
684 self.hb_channel.start()
685
685
686 def stop_channels(self):
686 def stop_channels(self):
687 """Stops all the running channels for this kernel.
687 """Stops all the running channels for this kernel.
688 """
688 """
689 if self.xreq_channel.is_alive():
689 if self.xreq_channel.is_alive():
690 self.xreq_channel.stop()
690 self.xreq_channel.stop()
691 if self.sub_channel.is_alive():
691 if self.sub_channel.is_alive():
692 self.sub_channel.stop()
692 self.sub_channel.stop()
693 if self.rep_channel.is_alive():
693 if self.rep_channel.is_alive():
694 self.rep_channel.stop()
694 self.rep_channel.stop()
695 if self.hb_channel.is_alive():
695 if self.hb_channel.is_alive():
696 self.hb_channel.stop()
696 self.hb_channel.stop()
697
697
698 @property
698 @property
699 def channels_running(self):
699 def channels_running(self):
700 """Are any of the channels created and running?"""
700 """Are any of the channels created and running?"""
701 return (self.xreq_channel.is_alive() or self.sub_channel.is_alive() or
701 return (self.xreq_channel.is_alive() or self.sub_channel.is_alive() or
702 self.rep_channel.is_alive() or self.hb_channel.is_alive())
702 self.rep_channel.is_alive() or self.hb_channel.is_alive())
703
703
704 #--------------------------------------------------------------------------
704 #--------------------------------------------------------------------------
705 # Kernel process management methods:
705 # Kernel process management methods:
706 #--------------------------------------------------------------------------
706 #--------------------------------------------------------------------------
707
707
708 def start_kernel(self, **kw):
708 def start_kernel(self, **kw):
709 """Starts a kernel process and configures the manager to use it.
709 """Starts a kernel process and configures the manager to use it.
710
710
711 If random ports (port=0) are being used, this method must be called
711 If random ports (port=0) are being used, this method must be called
712 before the channels are created.
712 before the channels are created.
713
713
714 Parameters:
714 Parameters:
715 -----------
715 -----------
716 ipython : bool, optional (default True)
716 ipython : bool, optional (default True)
717 Whether to use an IPython kernel instead of a plain Python kernel.
717 Whether to use an IPython kernel instead of a plain Python kernel.
718 """
718 """
719 xreq, sub, rep, hb = self.xreq_address, self.sub_address, \
719 xreq, sub, rep, hb = self.xreq_address, self.sub_address, \
720 self.rep_address, self.hb_address
720 self.rep_address, self.hb_address
721 if xreq[0] != LOCALHOST or sub[0] != LOCALHOST or \
721 if xreq[0] != LOCALHOST or sub[0] != LOCALHOST or \
722 rep[0] != LOCALHOST or hb[0] != LOCALHOST:
722 rep[0] != LOCALHOST or hb[0] != LOCALHOST:
723 raise RuntimeError("Can only launch a kernel on localhost."
723 raise RuntimeError("Can only launch a kernel on localhost."
724 "Make sure that the '*_address' attributes are "
724 "Make sure that the '*_address' attributes are "
725 "configured properly.")
725 "configured properly.")
726
726
727 self._launch_args = kw.copy()
727 self._launch_args = kw.copy()
728 if kw.pop('ipython', True):
728 if kw.pop('ipython', True):
729 from ipkernel import launch_kernel
729 from ipkernel import launch_kernel
730 else:
730 else:
731 from pykernel import launch_kernel
731 from pykernel import launch_kernel
732 self.kernel, xrep, pub, req, hb = launch_kernel(
732 self.kernel, xrep, pub, req, hb = launch_kernel(
733 xrep_port=xreq[1], pub_port=sub[1],
733 xrep_port=xreq[1], pub_port=sub[1],
734 req_port=rep[1], hb_port=hb[1], **kw)
734 req_port=rep[1], hb_port=hb[1], **kw)
735 self.xreq_address = (LOCALHOST, xrep)
735 self.xreq_address = (LOCALHOST, xrep)
736 self.sub_address = (LOCALHOST, pub)
736 self.sub_address = (LOCALHOST, pub)
737 self.rep_address = (LOCALHOST, req)
737 self.rep_address = (LOCALHOST, req)
738 self.hb_address = (LOCALHOST, hb)
738 self.hb_address = (LOCALHOST, hb)
739
739
740 def shutdown_kernel(self):
740 def shutdown_kernel(self):
741 """ Attempts to the stop the kernel process cleanly. If the kernel
741 """ Attempts to the stop the kernel process cleanly. If the kernel
742 cannot be stopped, it is killed, if possible.
742 cannot be stopped, it is killed, if possible.
743 """
743 """
744 # FIXME: Shutdown does not work on Windows due to ZMQ errors!
744 # FIXME: Shutdown does not work on Windows due to ZMQ errors!
745 if sys.platform == 'win32':
745 if sys.platform == 'win32':
746 self.kill_kernel()
746 self.kill_kernel()
747 return
747 return
748
748
749 # Pause the heart beat channel if it exists.
749 # Pause the heart beat channel if it exists.
750 if self._hb_channel is not None:
750 if self._hb_channel is not None:
751 self._hb_channel.pause()
751 self._hb_channel.pause()
752
752
753 # Don't send any additional kernel kill messages immediately, to give
753 # Don't send any additional kernel kill messages immediately, to give
754 # the kernel a chance to properly execute shutdown actions. Wait for at
754 # the kernel a chance to properly execute shutdown actions. Wait for at
755 # most 1s, checking every 0.1s.
755 # most 1s, checking every 0.1s.
756 self.xreq_channel.shutdown()
756 self.xreq_channel.shutdown()
757 for i in range(10):
757 for i in range(10):
758 if self.is_alive:
758 if self.is_alive:
759 time.sleep(0.1)
759 time.sleep(0.1)
760 else:
760 else:
761 break
761 break
762 else:
762 else:
763 # OK, we've waited long enough.
763 # OK, we've waited long enough.
764 if self.has_kernel:
764 if self.has_kernel:
765 self.kill_kernel()
765 self.kill_kernel()
766
766
767 def restart_kernel(self, now=False):
767 def restart_kernel(self, now=False):
768 """Restarts a kernel with the same arguments that were used to launch
768 """Restarts a kernel with the same arguments that were used to launch
769 it. If the old kernel was launched with random ports, the same ports
769 it. If the old kernel was launched with random ports, the same ports
770 will be used for the new kernel.
770 will be used for the new kernel.
771
771
772 Parameters
772 Parameters
773 ----------
773 ----------
774 now : bool, optional
774 now : bool, optional
775 If True, the kernel is forcefully restarted *immediately*, without
775 If True, the kernel is forcefully restarted *immediately*, without
776 having a chance to do any cleanup action. Otherwise the kernel is
776 having a chance to do any cleanup action. Otherwise the kernel is
777 given 1s to clean up before a forceful restart is issued.
777 given 1s to clean up before a forceful restart is issued.
778
778
779 In all cases the kernel is restarted, the only difference is whether
779 In all cases the kernel is restarted, the only difference is whether
780 it is given a chance to perform a clean shutdown or not.
780 it is given a chance to perform a clean shutdown or not.
781 """
781 """
782 if self._launch_args is None:
782 if self._launch_args is None:
783 raise RuntimeError("Cannot restart the kernel. "
783 raise RuntimeError("Cannot restart the kernel. "
784 "No previous call to 'start_kernel'.")
784 "No previous call to 'start_kernel'.")
785 else:
785 else:
786 if self.has_kernel:
786 if self.has_kernel:
787 if now:
787 if now:
788 self.kill_kernel()
788 self.kill_kernel()
789 else:
789 else:
790 self.shutdown_kernel()
790 self.shutdown_kernel()
791 self.start_kernel(**self._launch_args)
791 self.start_kernel(**self._launch_args)
792
792
793 # FIXME: Messages get dropped in Windows due to probable ZMQ bug
793 # FIXME: Messages get dropped in Windows due to probable ZMQ bug
794 # unless there is some delay here.
794 # unless there is some delay here.
795 if sys.platform == 'win32':
795 if sys.platform == 'win32':
796 time.sleep(0.2)
796 time.sleep(0.2)
797
797
798 @property
798 @property
799 def has_kernel(self):
799 def has_kernel(self):
800 """Returns whether a kernel process has been specified for the kernel
800 """Returns whether a kernel process has been specified for the kernel
801 manager.
801 manager.
802 """
802 """
803 return self.kernel is not None
803 return self.kernel is not None
804
804
805 def kill_kernel(self):
805 def kill_kernel(self):
806 """ Kill the running kernel. """
806 """ Kill the running kernel. """
807 if self.has_kernel:
807 if self.has_kernel:
808 # Pause the heart beat channel if it exists.
808 # Pause the heart beat channel if it exists.
809 if self._hb_channel is not None:
809 if self._hb_channel is not None:
810 self._hb_channel.pause()
810 self._hb_channel.pause()
811
811
812 self.kernel.kill()
812 # Attempt to kill the kernel.
813 try:
814 self.kernel.kill()
815 except OSError, e:
816 # In Windows, we will get an Access Denied error if the process
817 # has already terminated. Ignore it.
818 if not (sys.platform == 'win32' and e.winerror == 5):
819 raise
813 self.kernel = None
820 self.kernel = None
814 else:
821 else:
815 raise RuntimeError("Cannot kill kernel. No kernel is running!")
822 raise RuntimeError("Cannot kill kernel. No kernel is running!")
816
823
817 def interrupt_kernel(self):
824 def interrupt_kernel(self):
818 """ Interrupts the kernel. Unlike ``signal_kernel``, this operation is
825 """ Interrupts the kernel. Unlike ``signal_kernel``, this operation is
819 well supported on all platforms.
826 well supported on all platforms.
820 """
827 """
821 if self.has_kernel:
828 if self.has_kernel:
822 if sys.platform == 'win32':
829 if sys.platform == 'win32':
823 from parentpoller import ParentPollerWindows as Poller
830 from parentpoller import ParentPollerWindows as Poller
824 Poller.send_interrupt(self.kernel.win32_interrupt_event)
831 Poller.send_interrupt(self.kernel.win32_interrupt_event)
825 else:
832 else:
826 self.kernel.send_signal(signal.SIGINT)
833 self.kernel.send_signal(signal.SIGINT)
827 else:
834 else:
828 raise RuntimeError("Cannot interrupt kernel. No kernel is running!")
835 raise RuntimeError("Cannot interrupt kernel. No kernel is running!")
829
836
830 def signal_kernel(self, signum):
837 def signal_kernel(self, signum):
831 """ Sends a signal to the kernel. Note that since only SIGTERM is
838 """ Sends a signal to the kernel. Note that since only SIGTERM is
832 supported on Windows, this function is only useful on Unix systems.
839 supported on Windows, this function is only useful on Unix systems.
833 """
840 """
834 if self.has_kernel:
841 if self.has_kernel:
835 self.kernel.send_signal(signum)
842 self.kernel.send_signal(signum)
836 else:
843 else:
837 raise RuntimeError("Cannot signal kernel. No kernel is running!")
844 raise RuntimeError("Cannot signal kernel. No kernel is running!")
838
845
839 @property
846 @property
840 def is_alive(self):
847 def is_alive(self):
841 """Is the kernel process still running?"""
848 """Is the kernel process still running?"""
842 # FIXME: not using a heartbeat means this method is broken for any
849 # FIXME: not using a heartbeat means this method is broken for any
843 # remote kernel, it's only capable of handling local kernels.
850 # remote kernel, it's only capable of handling local kernels.
844 if self.has_kernel:
851 if self.has_kernel:
845 if self.kernel.poll() is None:
852 if self.kernel.poll() is None:
846 return True
853 return True
847 else:
854 else:
848 return False
855 return False
849 else:
856 else:
850 # We didn't start the kernel with this KernelManager so we don't
857 # We didn't start the kernel with this KernelManager so we don't
851 # know if it is running. We should use a heartbeat for this case.
858 # know if it is running. We should use a heartbeat for this case.
852 return True
859 return True
853
860
854 #--------------------------------------------------------------------------
861 #--------------------------------------------------------------------------
855 # Channels used for communication with the kernel:
862 # Channels used for communication with the kernel:
856 #--------------------------------------------------------------------------
863 #--------------------------------------------------------------------------
857
864
858 @property
865 @property
859 def xreq_channel(self):
866 def xreq_channel(self):
860 """Get the REQ socket channel object to make requests of the kernel."""
867 """Get the REQ socket channel object to make requests of the kernel."""
861 if self._xreq_channel is None:
868 if self._xreq_channel is None:
862 self._xreq_channel = self.xreq_channel_class(self.context,
869 self._xreq_channel = self.xreq_channel_class(self.context,
863 self.session,
870 self.session,
864 self.xreq_address)
871 self.xreq_address)
865 return self._xreq_channel
872 return self._xreq_channel
866
873
867 @property
874 @property
868 def sub_channel(self):
875 def sub_channel(self):
869 """Get the SUB socket channel object."""
876 """Get the SUB socket channel object."""
870 if self._sub_channel is None:
877 if self._sub_channel is None:
871 self._sub_channel = self.sub_channel_class(self.context,
878 self._sub_channel = self.sub_channel_class(self.context,
872 self.session,
879 self.session,
873 self.sub_address)
880 self.sub_address)
874 return self._sub_channel
881 return self._sub_channel
875
882
876 @property
883 @property
877 def rep_channel(self):
884 def rep_channel(self):
878 """Get the REP socket channel object to handle stdin (raw_input)."""
885 """Get the REP socket channel object to handle stdin (raw_input)."""
879 if self._rep_channel is None:
886 if self._rep_channel is None:
880 self._rep_channel = self.rep_channel_class(self.context,
887 self._rep_channel = self.rep_channel_class(self.context,
881 self.session,
888 self.session,
882 self.rep_address)
889 self.rep_address)
883 return self._rep_channel
890 return self._rep_channel
884
891
885 @property
892 @property
886 def hb_channel(self):
893 def hb_channel(self):
887 """Get the REP socket channel object to handle stdin (raw_input)."""
894 """Get the REP socket channel object to handle stdin (raw_input)."""
888 if self._hb_channel is None:
895 if self._hb_channel is None:
889 self._hb_channel = self.hb_channel_class(self.context,
896 self._hb_channel = self.hb_channel_class(self.context,
890 self.session,
897 self.session,
891 self.hb_address)
898 self.hb_address)
892 return self._hb_channel
899 return self._hb_channel
General Comments 0
You need to be logged in to leave comments. Login now