##// END OF EJS Templates
ignore EINTR in channel loops...
MinRK -
Show More
@@ -1,945 +1,968
1 """Base classes to manage the interaction with a running kernel.
1 """Base classes to manage the interaction with a running kernel.
2
2
3 TODO
3 TODO
4 * Create logger to handle debugging and console messages.
4 * Create logger to handle debugging and console messages.
5 """
5 """
6
6
7 #-----------------------------------------------------------------------------
7 #-----------------------------------------------------------------------------
8 # Copyright (C) 2008-2010 The IPython Development Team
8 # Copyright (C) 2008-2010 The IPython Development Team
9 #
9 #
10 # Distributed under the terms of the BSD License. The full license is in
10 # Distributed under the terms of the BSD License. The full license is in
11 # the file COPYING, distributed as part of this software.
11 # the file COPYING, distributed as part of this software.
12 #-----------------------------------------------------------------------------
12 #-----------------------------------------------------------------------------
13
13
14 #-----------------------------------------------------------------------------
14 #-----------------------------------------------------------------------------
15 # Imports
15 # Imports
16 #-----------------------------------------------------------------------------
16 #-----------------------------------------------------------------------------
17
17
18 # Standard library imports.
18 # Standard library imports.
19 import atexit
19 import atexit
20 import errno
20 from Queue import Queue, Empty
21 from Queue import Queue, Empty
21 from subprocess import Popen
22 from subprocess import Popen
22 import signal
23 import signal
23 import sys
24 import sys
24 from threading import Thread
25 from threading import Thread
25 import time
26 import time
26 import logging
27 import logging
27
28
28 # System library imports.
29 # System library imports.
29 import zmq
30 import zmq
30 from zmq import POLLIN, POLLOUT, POLLERR
31 from zmq import POLLIN, POLLOUT, POLLERR
31 from zmq.eventloop import ioloop
32 from zmq.eventloop import ioloop
32
33
33 # Local imports.
34 # Local imports.
34 from IPython.utils import io
35 from IPython.utils import io
35 from IPython.utils.localinterfaces import LOCALHOST, LOCAL_IPS
36 from IPython.utils.localinterfaces import LOCALHOST, LOCAL_IPS
36 from IPython.utils.traitlets import HasTraits, Any, Instance, Type, TCPAddress
37 from IPython.utils.traitlets import HasTraits, Any, Instance, Type, TCPAddress
37 from session import Session, Message
38 from session import Session, Message
38
39
39 #-----------------------------------------------------------------------------
40 #-----------------------------------------------------------------------------
40 # Constants and exceptions
41 # Constants and exceptions
41 #-----------------------------------------------------------------------------
42 #-----------------------------------------------------------------------------
42
43
43 class InvalidPortNumber(Exception):
44 class InvalidPortNumber(Exception):
44 pass
45 pass
45
46
46 #-----------------------------------------------------------------------------
47 #-----------------------------------------------------------------------------
47 # Utility functions
48 # Utility functions
48 #-----------------------------------------------------------------------------
49 #-----------------------------------------------------------------------------
49
50
50 # some utilities to validate message structure, these might get moved elsewhere
51 # some utilities to validate message structure, these might get moved elsewhere
51 # if they prove to have more generic utility
52 # if they prove to have more generic utility
52
53
53 def validate_string_list(lst):
54 def validate_string_list(lst):
54 """Validate that the input is a list of strings.
55 """Validate that the input is a list of strings.
55
56
56 Raises ValueError if not."""
57 Raises ValueError if not."""
57 if not isinstance(lst, list):
58 if not isinstance(lst, list):
58 raise ValueError('input %r must be a list' % lst)
59 raise ValueError('input %r must be a list' % lst)
59 for x in lst:
60 for x in lst:
60 if not isinstance(x, basestring):
61 if not isinstance(x, basestring):
61 raise ValueError('element %r in list must be a string' % x)
62 raise ValueError('element %r in list must be a string' % x)
62
63
63
64
64 def validate_string_dict(dct):
65 def validate_string_dict(dct):
65 """Validate that the input is a dict with string keys and values.
66 """Validate that the input is a dict with string keys and values.
66
67
67 Raises ValueError if not."""
68 Raises ValueError if not."""
68 for k,v in dct.iteritems():
69 for k,v in dct.iteritems():
69 if not isinstance(k, basestring):
70 if not isinstance(k, basestring):
70 raise ValueError('key %r in dict must be a string' % k)
71 raise ValueError('key %r in dict must be a string' % k)
71 if not isinstance(v, basestring):
72 if not isinstance(v, basestring):
72 raise ValueError('value %r in dict must be a string' % v)
73 raise ValueError('value %r in dict must be a string' % v)
73
74
74
75
75 #-----------------------------------------------------------------------------
76 #-----------------------------------------------------------------------------
76 # ZMQ Socket Channel classes
77 # ZMQ Socket Channel classes
77 #-----------------------------------------------------------------------------
78 #-----------------------------------------------------------------------------
78
79
79 class ZmqSocketChannel(Thread):
80 class ZmqSocketChannel(Thread):
80 """The base class for the channels that use ZMQ sockets.
81 """The base class for the channels that use ZMQ sockets.
81 """
82 """
82 context = None
83 context = None
83 session = None
84 session = None
84 socket = None
85 socket = None
85 ioloop = None
86 ioloop = None
86 iostate = None
87 iostate = None
87 _address = None
88 _address = None
88
89
89 def __init__(self, context, session, address):
90 def __init__(self, context, session, address):
90 """Create a channel
91 """Create a channel
91
92
92 Parameters
93 Parameters
93 ----------
94 ----------
94 context : :class:`zmq.Context`
95 context : :class:`zmq.Context`
95 The ZMQ context to use.
96 The ZMQ context to use.
96 session : :class:`session.Session`
97 session : :class:`session.Session`
97 The session to use.
98 The session to use.
98 address : tuple
99 address : tuple
99 Standard (ip, port) tuple that the kernel is listening on.
100 Standard (ip, port) tuple that the kernel is listening on.
100 """
101 """
101 super(ZmqSocketChannel, self).__init__()
102 super(ZmqSocketChannel, self).__init__()
102 self.daemon = True
103 self.daemon = True
103
104
104 self.context = context
105 self.context = context
105 self.session = session
106 self.session = session
106 if address[1] == 0:
107 if address[1] == 0:
107 message = 'The port number for a channel cannot be 0.'
108 message = 'The port number for a channel cannot be 0.'
108 raise InvalidPortNumber(message)
109 raise InvalidPortNumber(message)
109 self._address = address
110 self._address = address
110
111
112 def _run_loop(self):
113 """Run my loop, ignoring EINTR events in the poller"""
114 while True:
115 try:
116 self.ioloop.start()
117 except zmq.ZMQError as e:
118 if e.errno == errno.EINTR:
119 continue
120 else:
121 raise
122 else:
123 break
124
111 def stop(self):
125 def stop(self):
112 """Stop the channel's activity.
126 """Stop the channel's activity.
113
127
114 This calls :method:`Thread.join` and returns when the thread
128 This calls :method:`Thread.join` and returns when the thread
115 terminates. :class:`RuntimeError` will be raised if
129 terminates. :class:`RuntimeError` will be raised if
116 :method:`self.start` is called again.
130 :method:`self.start` is called again.
117 """
131 """
118 self.join()
132 self.join()
119
133
120 @property
134 @property
121 def address(self):
135 def address(self):
122 """Get the channel's address as an (ip, port) tuple.
136 """Get the channel's address as an (ip, port) tuple.
123
137
124 By the default, the address is (localhost, 0), where 0 means a random
138 By the default, the address is (localhost, 0), where 0 means a random
125 port.
139 port.
126 """
140 """
127 return self._address
141 return self._address
128
142
129 def add_io_state(self, state):
143 def add_io_state(self, state):
130 """Add IO state to the eventloop.
144 """Add IO state to the eventloop.
131
145
132 Parameters
146 Parameters
133 ----------
147 ----------
134 state : zmq.POLLIN|zmq.POLLOUT|zmq.POLLERR
148 state : zmq.POLLIN|zmq.POLLOUT|zmq.POLLERR
135 The IO state flag to set.
149 The IO state flag to set.
136
150
137 This is thread safe as it uses the thread safe IOLoop.add_callback.
151 This is thread safe as it uses the thread safe IOLoop.add_callback.
138 """
152 """
139 def add_io_state_callback():
153 def add_io_state_callback():
140 if not self.iostate & state:
154 if not self.iostate & state:
141 self.iostate = self.iostate | state
155 self.iostate = self.iostate | state
142 self.ioloop.update_handler(self.socket, self.iostate)
156 self.ioloop.update_handler(self.socket, self.iostate)
143 self.ioloop.add_callback(add_io_state_callback)
157 self.ioloop.add_callback(add_io_state_callback)
144
158
145 def drop_io_state(self, state):
159 def drop_io_state(self, state):
146 """Drop IO state from the eventloop.
160 """Drop IO state from the eventloop.
147
161
148 Parameters
162 Parameters
149 ----------
163 ----------
150 state : zmq.POLLIN|zmq.POLLOUT|zmq.POLLERR
164 state : zmq.POLLIN|zmq.POLLOUT|zmq.POLLERR
151 The IO state flag to set.
165 The IO state flag to set.
152
166
153 This is thread safe as it uses the thread safe IOLoop.add_callback.
167 This is thread safe as it uses the thread safe IOLoop.add_callback.
154 """
168 """
155 def drop_io_state_callback():
169 def drop_io_state_callback():
156 if self.iostate & state:
170 if self.iostate & state:
157 self.iostate = self.iostate & (~state)
171 self.iostate = self.iostate & (~state)
158 self.ioloop.update_handler(self.socket, self.iostate)
172 self.ioloop.update_handler(self.socket, self.iostate)
159 self.ioloop.add_callback(drop_io_state_callback)
173 self.ioloop.add_callback(drop_io_state_callback)
160
174
161
175
162 class XReqSocketChannel(ZmqSocketChannel):
176 class XReqSocketChannel(ZmqSocketChannel):
163 """The XREQ channel for issues request/replies to the kernel.
177 """The XREQ channel for issues request/replies to the kernel.
164 """
178 """
165
179
166 command_queue = None
180 command_queue = None
167
181
168 def __init__(self, context, session, address):
182 def __init__(self, context, session, address):
169 super(XReqSocketChannel, self).__init__(context, session, address)
183 super(XReqSocketChannel, self).__init__(context, session, address)
170 self.command_queue = Queue()
184 self.command_queue = Queue()
171 self.ioloop = ioloop.IOLoop()
185 self.ioloop = ioloop.IOLoop()
172
186
173 def run(self):
187 def run(self):
174 """The thread's main activity. Call start() instead."""
188 """The thread's main activity. Call start() instead."""
175 self.socket = self.context.socket(zmq.XREQ)
189 self.socket = self.context.socket(zmq.XREQ)
176 self.socket.setsockopt(zmq.IDENTITY, self.session.session)
190 self.socket.setsockopt(zmq.IDENTITY, self.session.session)
177 self.socket.connect('tcp://%s:%i' % self.address)
191 self.socket.connect('tcp://%s:%i' % self.address)
178 self.iostate = POLLERR|POLLIN
192 self.iostate = POLLERR|POLLIN
179 self.ioloop.add_handler(self.socket, self._handle_events,
193 self.ioloop.add_handler(self.socket, self._handle_events,
180 self.iostate)
194 self.iostate)
181 self.ioloop.start()
195 self._run_loop()
182
196
183 def stop(self):
197 def stop(self):
184 self.ioloop.stop()
198 self.ioloop.stop()
185 super(XReqSocketChannel, self).stop()
199 super(XReqSocketChannel, self).stop()
186
200
187 def call_handlers(self, msg):
201 def call_handlers(self, msg):
188 """This method is called in the ioloop thread when a message arrives.
202 """This method is called in the ioloop thread when a message arrives.
189
203
190 Subclasses should override this method to handle incoming messages.
204 Subclasses should override this method to handle incoming messages.
191 It is important to remember that this method is called in the thread
205 It is important to remember that this method is called in the thread
192 so that some logic must be done to ensure that the application leve
206 so that some logic must be done to ensure that the application leve
193 handlers are called in the application thread.
207 handlers are called in the application thread.
194 """
208 """
195 raise NotImplementedError('call_handlers must be defined in a subclass.')
209 raise NotImplementedError('call_handlers must be defined in a subclass.')
196
210
197 def execute(self, code, silent=False,
211 def execute(self, code, silent=False,
198 user_variables=None, user_expressions=None):
212 user_variables=None, user_expressions=None):
199 """Execute code in the kernel.
213 """Execute code in the kernel.
200
214
201 Parameters
215 Parameters
202 ----------
216 ----------
203 code : str
217 code : str
204 A string of Python code.
218 A string of Python code.
205
219
206 silent : bool, optional (default False)
220 silent : bool, optional (default False)
207 If set, the kernel will execute the code as quietly possible.
221 If set, the kernel will execute the code as quietly possible.
208
222
209 user_variables : list, optional
223 user_variables : list, optional
210 A list of variable names to pull from the user's namespace. They
224 A list of variable names to pull from the user's namespace. They
211 will come back as a dict with these names as keys and their
225 will come back as a dict with these names as keys and their
212 :func:`repr` as values.
226 :func:`repr` as values.
213
227
214 user_expressions : dict, optional
228 user_expressions : dict, optional
215 A dict with string keys and to pull from the user's
229 A dict with string keys and to pull from the user's
216 namespace. They will come back as a dict with these names as keys
230 namespace. They will come back as a dict with these names as keys
217 and their :func:`repr` as values.
231 and their :func:`repr` as values.
218
232
219 Returns
233 Returns
220 -------
234 -------
221 The msg_id of the message sent.
235 The msg_id of the message sent.
222 """
236 """
223 if user_variables is None:
237 if user_variables is None:
224 user_variables = []
238 user_variables = []
225 if user_expressions is None:
239 if user_expressions is None:
226 user_expressions = {}
240 user_expressions = {}
227
241
228 # Don't waste network traffic if inputs are invalid
242 # Don't waste network traffic if inputs are invalid
229 if not isinstance(code, basestring):
243 if not isinstance(code, basestring):
230 raise ValueError('code %r must be a string' % code)
244 raise ValueError('code %r must be a string' % code)
231 validate_string_list(user_variables)
245 validate_string_list(user_variables)
232 validate_string_dict(user_expressions)
246 validate_string_dict(user_expressions)
233
247
234 # Create class for content/msg creation. Related to, but possibly
248 # Create class for content/msg creation. Related to, but possibly
235 # not in Session.
249 # not in Session.
236 content = dict(code=code, silent=silent,
250 content = dict(code=code, silent=silent,
237 user_variables=user_variables,
251 user_variables=user_variables,
238 user_expressions=user_expressions)
252 user_expressions=user_expressions)
239 msg = self.session.msg('execute_request', content)
253 msg = self.session.msg('execute_request', content)
240 self._queue_request(msg)
254 self._queue_request(msg)
241 return msg['header']['msg_id']
255 return msg['header']['msg_id']
242
256
243 def complete(self, text, line, cursor_pos, block=None):
257 def complete(self, text, line, cursor_pos, block=None):
244 """Tab complete text in the kernel's namespace.
258 """Tab complete text in the kernel's namespace.
245
259
246 Parameters
260 Parameters
247 ----------
261 ----------
248 text : str
262 text : str
249 The text to complete.
263 The text to complete.
250 line : str
264 line : str
251 The full line of text that is the surrounding context for the
265 The full line of text that is the surrounding context for the
252 text to complete.
266 text to complete.
253 cursor_pos : int
267 cursor_pos : int
254 The position of the cursor in the line where the completion was
268 The position of the cursor in the line where the completion was
255 requested.
269 requested.
256 block : str, optional
270 block : str, optional
257 The full block of code in which the completion is being requested.
271 The full block of code in which the completion is being requested.
258
272
259 Returns
273 Returns
260 -------
274 -------
261 The msg_id of the message sent.
275 The msg_id of the message sent.
262 """
276 """
263 content = dict(text=text, line=line, block=block, cursor_pos=cursor_pos)
277 content = dict(text=text, line=line, block=block, cursor_pos=cursor_pos)
264 msg = self.session.msg('complete_request', content)
278 msg = self.session.msg('complete_request', content)
265 self._queue_request(msg)
279 self._queue_request(msg)
266 return msg['header']['msg_id']
280 return msg['header']['msg_id']
267
281
268 def object_info(self, oname):
282 def object_info(self, oname):
269 """Get metadata information about an object.
283 """Get metadata information about an object.
270
284
271 Parameters
285 Parameters
272 ----------
286 ----------
273 oname : str
287 oname : str
274 A string specifying the object name.
288 A string specifying the object name.
275
289
276 Returns
290 Returns
277 -------
291 -------
278 The msg_id of the message sent.
292 The msg_id of the message sent.
279 """
293 """
280 content = dict(oname=oname)
294 content = dict(oname=oname)
281 msg = self.session.msg('object_info_request', content)
295 msg = self.session.msg('object_info_request', content)
282 self._queue_request(msg)
296 self._queue_request(msg)
283 return msg['header']['msg_id']
297 return msg['header']['msg_id']
284
298
285 def history(self, raw=True, output=False, hist_access_type='range', **kwargs):
299 def history(self, raw=True, output=False, hist_access_type='range', **kwargs):
286 """Get entries from the history list.
300 """Get entries from the history list.
287
301
288 Parameters
302 Parameters
289 ----------
303 ----------
290 raw : bool
304 raw : bool
291 If True, return the raw input.
305 If True, return the raw input.
292 output : bool
306 output : bool
293 If True, then return the output as well.
307 If True, then return the output as well.
294 hist_access_type : str
308 hist_access_type : str
295 'range' (fill in session, start and stop params), 'tail' (fill in n)
309 'range' (fill in session, start and stop params), 'tail' (fill in n)
296 or 'search' (fill in pattern param).
310 or 'search' (fill in pattern param).
297
311
298 session : int
312 session : int
299 For a range request, the session from which to get lines. Session
313 For a range request, the session from which to get lines. Session
300 numbers are positive integers; negative ones count back from the
314 numbers are positive integers; negative ones count back from the
301 current session.
315 current session.
302 start : int
316 start : int
303 The first line number of a history range.
317 The first line number of a history range.
304 stop : int
318 stop : int
305 The final (excluded) line number of a history range.
319 The final (excluded) line number of a history range.
306
320
307 n : int
321 n : int
308 The number of lines of history to get for a tail request.
322 The number of lines of history to get for a tail request.
309
323
310 pattern : str
324 pattern : str
311 The glob-syntax pattern for a search request.
325 The glob-syntax pattern for a search request.
312
326
313 Returns
327 Returns
314 -------
328 -------
315 The msg_id of the message sent.
329 The msg_id of the message sent.
316 """
330 """
317 content = dict(raw=raw, output=output, hist_access_type=hist_access_type,
331 content = dict(raw=raw, output=output, hist_access_type=hist_access_type,
318 **kwargs)
332 **kwargs)
319 msg = self.session.msg('history_request', content)
333 msg = self.session.msg('history_request', content)
320 self._queue_request(msg)
334 self._queue_request(msg)
321 return msg['header']['msg_id']
335 return msg['header']['msg_id']
322
336
323 def shutdown(self, restart=False):
337 def shutdown(self, restart=False):
324 """Request an immediate kernel shutdown.
338 """Request an immediate kernel shutdown.
325
339
326 Upon receipt of the (empty) reply, client code can safely assume that
340 Upon receipt of the (empty) reply, client code can safely assume that
327 the kernel has shut down and it's safe to forcefully terminate it if
341 the kernel has shut down and it's safe to forcefully terminate it if
328 it's still alive.
342 it's still alive.
329
343
330 The kernel will send the reply via a function registered with Python's
344 The kernel will send the reply via a function registered with Python's
331 atexit module, ensuring it's truly done as the kernel is done with all
345 atexit module, ensuring it's truly done as the kernel is done with all
332 normal operation.
346 normal operation.
333 """
347 """
334 # Send quit message to kernel. Once we implement kernel-side setattr,
348 # Send quit message to kernel. Once we implement kernel-side setattr,
335 # this should probably be done that way, but for now this will do.
349 # this should probably be done that way, but for now this will do.
336 msg = self.session.msg('shutdown_request', {'restart':restart})
350 msg = self.session.msg('shutdown_request', {'restart':restart})
337 self._queue_request(msg)
351 self._queue_request(msg)
338 return msg['header']['msg_id']
352 return msg['header']['msg_id']
339
353
340 def _handle_events(self, socket, events):
354 def _handle_events(self, socket, events):
341 if events & POLLERR:
355 if events & POLLERR:
342 self._handle_err()
356 self._handle_err()
343 if events & POLLOUT:
357 if events & POLLOUT:
344 self._handle_send()
358 self._handle_send()
345 if events & POLLIN:
359 if events & POLLIN:
346 self._handle_recv()
360 self._handle_recv()
347
361
348 def _handle_recv(self):
362 def _handle_recv(self):
349 ident,msg = self.session.recv(self.socket, 0)
363 ident,msg = self.session.recv(self.socket, 0)
350 self.call_handlers(msg)
364 self.call_handlers(msg)
351
365
352 def _handle_send(self):
366 def _handle_send(self):
353 try:
367 try:
354 msg = self.command_queue.get(False)
368 msg = self.command_queue.get(False)
355 except Empty:
369 except Empty:
356 pass
370 pass
357 else:
371 else:
358 self.session.send(self.socket,msg)
372 self.session.send(self.socket,msg)
359 if self.command_queue.empty():
373 if self.command_queue.empty():
360 self.drop_io_state(POLLOUT)
374 self.drop_io_state(POLLOUT)
361
375
362 def _handle_err(self):
376 def _handle_err(self):
363 # We don't want to let this go silently, so eventually we should log.
377 # We don't want to let this go silently, so eventually we should log.
364 raise zmq.ZMQError()
378 raise zmq.ZMQError()
365
379
366 def _queue_request(self, msg):
380 def _queue_request(self, msg):
367 self.command_queue.put(msg)
381 self.command_queue.put(msg)
368 self.add_io_state(POLLOUT)
382 self.add_io_state(POLLOUT)
369
383
370
384
371 class SubSocketChannel(ZmqSocketChannel):
385 class SubSocketChannel(ZmqSocketChannel):
372 """The SUB channel which listens for messages that the kernel publishes.
386 """The SUB channel which listens for messages that the kernel publishes.
373 """
387 """
374
388
375 def __init__(self, context, session, address):
389 def __init__(self, context, session, address):
376 super(SubSocketChannel, self).__init__(context, session, address)
390 super(SubSocketChannel, self).__init__(context, session, address)
377 self.ioloop = ioloop.IOLoop()
391 self.ioloop = ioloop.IOLoop()
378
392
379 def run(self):
393 def run(self):
380 """The thread's main activity. Call start() instead."""
394 """The thread's main activity. Call start() instead."""
381 self.socket = self.context.socket(zmq.SUB)
395 self.socket = self.context.socket(zmq.SUB)
382 self.socket.setsockopt(zmq.SUBSCRIBE,'')
396 self.socket.setsockopt(zmq.SUBSCRIBE,'')
383 self.socket.setsockopt(zmq.IDENTITY, self.session.session)
397 self.socket.setsockopt(zmq.IDENTITY, self.session.session)
384 self.socket.connect('tcp://%s:%i' % self.address)
398 self.socket.connect('tcp://%s:%i' % self.address)
385 self.iostate = POLLIN|POLLERR
399 self.iostate = POLLIN|POLLERR
386 self.ioloop.add_handler(self.socket, self._handle_events,
400 self.ioloop.add_handler(self.socket, self._handle_events,
387 self.iostate)
401 self.iostate)
388 self.ioloop.start()
402 self._run_loop()
389
403
390 def stop(self):
404 def stop(self):
391 self.ioloop.stop()
405 self.ioloop.stop()
392 super(SubSocketChannel, self).stop()
406 super(SubSocketChannel, self).stop()
393
407
394 def call_handlers(self, msg):
408 def call_handlers(self, msg):
395 """This method is called in the ioloop thread when a message arrives.
409 """This method is called in the ioloop thread when a message arrives.
396
410
397 Subclasses should override this method to handle incoming messages.
411 Subclasses should override this method to handle incoming messages.
398 It is important to remember that this method is called in the thread
412 It is important to remember that this method is called in the thread
399 so that some logic must be done to ensure that the application leve
413 so that some logic must be done to ensure that the application leve
400 handlers are called in the application thread.
414 handlers are called in the application thread.
401 """
415 """
402 raise NotImplementedError('call_handlers must be defined in a subclass.')
416 raise NotImplementedError('call_handlers must be defined in a subclass.')
403
417
404 def flush(self, timeout=1.0):
418 def flush(self, timeout=1.0):
405 """Immediately processes all pending messages on the SUB channel.
419 """Immediately processes all pending messages on the SUB channel.
406
420
407 Callers should use this method to ensure that :method:`call_handlers`
421 Callers should use this method to ensure that :method:`call_handlers`
408 has been called for all messages that have been received on the
422 has been called for all messages that have been received on the
409 0MQ SUB socket of this channel.
423 0MQ SUB socket of this channel.
410
424
411 This method is thread safe.
425 This method is thread safe.
412
426
413 Parameters
427 Parameters
414 ----------
428 ----------
415 timeout : float, optional
429 timeout : float, optional
416 The maximum amount of time to spend flushing, in seconds. The
430 The maximum amount of time to spend flushing, in seconds. The
417 default is one second.
431 default is one second.
418 """
432 """
419 # We do the IOLoop callback process twice to ensure that the IOLoop
433 # We do the IOLoop callback process twice to ensure that the IOLoop
420 # gets to perform at least one full poll.
434 # gets to perform at least one full poll.
421 stop_time = time.time() + timeout
435 stop_time = time.time() + timeout
422 for i in xrange(2):
436 for i in xrange(2):
423 self._flushed = False
437 self._flushed = False
424 self.ioloop.add_callback(self._flush)
438 self.ioloop.add_callback(self._flush)
425 while not self._flushed and time.time() < stop_time:
439 while not self._flushed and time.time() < stop_time:
426 time.sleep(0.01)
440 time.sleep(0.01)
427
441
428 def _handle_events(self, socket, events):
442 def _handle_events(self, socket, events):
429 # Turn on and off POLLOUT depending on if we have made a request
443 # Turn on and off POLLOUT depending on if we have made a request
430 if events & POLLERR:
444 if events & POLLERR:
431 self._handle_err()
445 self._handle_err()
432 if events & POLLIN:
446 if events & POLLIN:
433 self._handle_recv()
447 self._handle_recv()
434
448
435 def _handle_err(self):
449 def _handle_err(self):
436 # We don't want to let this go silently, so eventually we should log.
450 # We don't want to let this go silently, so eventually we should log.
437 raise zmq.ZMQError()
451 raise zmq.ZMQError()
438
452
439 def _handle_recv(self):
453 def _handle_recv(self):
440 # Get all of the messages we can
454 # Get all of the messages we can
441 while True:
455 while True:
442 try:
456 try:
443 ident,msg = self.session.recv(self.socket)
457 ident,msg = self.session.recv(self.socket)
444 except zmq.ZMQError:
458 except zmq.ZMQError:
445 # Check the errno?
459 # Check the errno?
446 # Will this trigger POLLERR?
460 # Will this trigger POLLERR?
447 break
461 break
448 else:
462 else:
449 if msg is None:
463 if msg is None:
450 break
464 break
451 self.call_handlers(msg)
465 self.call_handlers(msg)
452
466
453 def _flush(self):
467 def _flush(self):
454 """Callback for :method:`self.flush`."""
468 """Callback for :method:`self.flush`."""
455 self._flushed = True
469 self._flushed = True
456
470
457
471
458 class RepSocketChannel(ZmqSocketChannel):
472 class RepSocketChannel(ZmqSocketChannel):
459 """A reply channel to handle raw_input requests that the kernel makes."""
473 """A reply channel to handle raw_input requests that the kernel makes."""
460
474
461 msg_queue = None
475 msg_queue = None
462
476
463 def __init__(self, context, session, address):
477 def __init__(self, context, session, address):
464 super(RepSocketChannel, self).__init__(context, session, address)
478 super(RepSocketChannel, self).__init__(context, session, address)
465 self.ioloop = ioloop.IOLoop()
479 self.ioloop = ioloop.IOLoop()
466 self.msg_queue = Queue()
480 self.msg_queue = Queue()
467
481
468 def run(self):
482 def run(self):
469 """The thread's main activity. Call start() instead."""
483 """The thread's main activity. Call start() instead."""
470 self.socket = self.context.socket(zmq.XREQ)
484 self.socket = self.context.socket(zmq.XREQ)
471 self.socket.setsockopt(zmq.IDENTITY, self.session.session)
485 self.socket.setsockopt(zmq.IDENTITY, self.session.session)
472 self.socket.connect('tcp://%s:%i' % self.address)
486 self.socket.connect('tcp://%s:%i' % self.address)
473 self.iostate = POLLERR|POLLIN
487 self.iostate = POLLERR|POLLIN
474 self.ioloop.add_handler(self.socket, self._handle_events,
488 self.ioloop.add_handler(self.socket, self._handle_events,
475 self.iostate)
489 self.iostate)
476 self.ioloop.start()
490 self._run_loop()
477
491
478 def stop(self):
492 def stop(self):
479 self.ioloop.stop()
493 self.ioloop.stop()
480 super(RepSocketChannel, self).stop()
494 super(RepSocketChannel, self).stop()
481
495
482 def call_handlers(self, msg):
496 def call_handlers(self, msg):
483 """This method is called in the ioloop thread when a message arrives.
497 """This method is called in the ioloop thread when a message arrives.
484
498
485 Subclasses should override this method to handle incoming messages.
499 Subclasses should override this method to handle incoming messages.
486 It is important to remember that this method is called in the thread
500 It is important to remember that this method is called in the thread
487 so that some logic must be done to ensure that the application leve
501 so that some logic must be done to ensure that the application leve
488 handlers are called in the application thread.
502 handlers are called in the application thread.
489 """
503 """
490 raise NotImplementedError('call_handlers must be defined in a subclass.')
504 raise NotImplementedError('call_handlers must be defined in a subclass.')
491
505
492 def input(self, string):
506 def input(self, string):
493 """Send a string of raw input to the kernel."""
507 """Send a string of raw input to the kernel."""
494 content = dict(value=string)
508 content = dict(value=string)
495 msg = self.session.msg('input_reply', content)
509 msg = self.session.msg('input_reply', content)
496 self._queue_reply(msg)
510 self._queue_reply(msg)
497
511
498 def _handle_events(self, socket, events):
512 def _handle_events(self, socket, events):
499 if events & POLLERR:
513 if events & POLLERR:
500 self._handle_err()
514 self._handle_err()
501 if events & POLLOUT:
515 if events & POLLOUT:
502 self._handle_send()
516 self._handle_send()
503 if events & POLLIN:
517 if events & POLLIN:
504 self._handle_recv()
518 self._handle_recv()
505
519
506 def _handle_recv(self):
520 def _handle_recv(self):
507 ident,msg = self.session.recv(self.socket, 0)
521 ident,msg = self.session.recv(self.socket, 0)
508 self.call_handlers(msg)
522 self.call_handlers(msg)
509
523
510 def _handle_send(self):
524 def _handle_send(self):
511 try:
525 try:
512 msg = self.msg_queue.get(False)
526 msg = self.msg_queue.get(False)
513 except Empty:
527 except Empty:
514 pass
528 pass
515 else:
529 else:
516 self.session.send(self.socket,msg)
530 self.session.send(self.socket,msg)
517 if self.msg_queue.empty():
531 if self.msg_queue.empty():
518 self.drop_io_state(POLLOUT)
532 self.drop_io_state(POLLOUT)
519
533
520 def _handle_err(self):
534 def _handle_err(self):
521 # We don't want to let this go silently, so eventually we should log.
535 # We don't want to let this go silently, so eventually we should log.
522 raise zmq.ZMQError()
536 raise zmq.ZMQError()
523
537
524 def _queue_reply(self, msg):
538 def _queue_reply(self, msg):
525 self.msg_queue.put(msg)
539 self.msg_queue.put(msg)
526 self.add_io_state(POLLOUT)
540 self.add_io_state(POLLOUT)
527
541
528
542
529 class HBSocketChannel(ZmqSocketChannel):
543 class HBSocketChannel(ZmqSocketChannel):
530 """The heartbeat channel which monitors the kernel heartbeat.
544 """The heartbeat channel which monitors the kernel heartbeat.
531
545
532 Note that the heartbeat channel is paused by default. As long as you start
546 Note that the heartbeat channel is paused by default. As long as you start
533 this channel, the kernel manager will ensure that it is paused and un-paused
547 this channel, the kernel manager will ensure that it is paused and un-paused
534 as appropriate.
548 as appropriate.
535 """
549 """
536
550
537 time_to_dead = 3.0
551 time_to_dead = 3.0
538 socket = None
552 socket = None
539 poller = None
553 poller = None
540 _running = None
554 _running = None
541 _pause = None
555 _pause = None
542
556
543 def __init__(self, context, session, address):
557 def __init__(self, context, session, address):
544 super(HBSocketChannel, self).__init__(context, session, address)
558 super(HBSocketChannel, self).__init__(context, session, address)
545 self._running = False
559 self._running = False
546 self._pause = True
560 self._pause = True
547
561
548 def _create_socket(self):
562 def _create_socket(self):
549 self.socket = self.context.socket(zmq.REQ)
563 self.socket = self.context.socket(zmq.REQ)
550 self.socket.setsockopt(zmq.IDENTITY, self.session.session)
564 self.socket.setsockopt(zmq.IDENTITY, self.session.session)
551 self.socket.connect('tcp://%s:%i' % self.address)
565 self.socket.connect('tcp://%s:%i' % self.address)
552 self.poller = zmq.Poller()
566 self.poller = zmq.Poller()
553 self.poller.register(self.socket, zmq.POLLIN)
567 self.poller.register(self.socket, zmq.POLLIN)
554
568
555 def run(self):
569 def run(self):
556 """The thread's main activity. Call start() instead."""
570 """The thread's main activity. Call start() instead."""
557 self._create_socket()
571 self._create_socket()
558 self._running = True
572 self._running = True
559 while self._running:
573 while self._running:
560 if self._pause:
574 if self._pause:
561 time.sleep(self.time_to_dead)
575 time.sleep(self.time_to_dead)
562 else:
576 else:
563 since_last_heartbeat = 0.0
577 since_last_heartbeat = 0.0
564 request_time = time.time()
578 request_time = time.time()
565 try:
579 try:
566 #io.rprint('Ping from HB channel') # dbg
580 #io.rprint('Ping from HB channel') # dbg
567 self.socket.send(b'ping')
581 self.socket.send(b'ping')
568 except zmq.ZMQError, e:
582 except zmq.ZMQError, e:
569 #io.rprint('*** HB Error:', e) # dbg
583 #io.rprint('*** HB Error:', e) # dbg
570 if e.errno == zmq.EFSM:
584 if e.errno == zmq.EFSM:
571 #io.rprint('sleep...', self.time_to_dead) # dbg
585 #io.rprint('sleep...', self.time_to_dead) # dbg
572 time.sleep(self.time_to_dead)
586 time.sleep(self.time_to_dead)
573 self._create_socket()
587 self._create_socket()
574 else:
588 else:
575 raise
589 raise
576 else:
590 else:
577 while True:
591 while True:
578 try:
592 try:
579 self.socket.recv(zmq.NOBLOCK)
593 self.socket.recv(zmq.NOBLOCK)
580 except zmq.ZMQError, e:
594 except zmq.ZMQError, e:
581 #io.rprint('*** HB Error 2:', e) # dbg
595 #io.rprint('*** HB Error 2:', e) # dbg
582 if e.errno == zmq.EAGAIN:
596 if e.errno == zmq.EAGAIN:
583 before_poll = time.time()
597 before_poll = time.time()
584 until_dead = self.time_to_dead - (before_poll -
598 until_dead = self.time_to_dead - (before_poll -
585 request_time)
599 request_time)
586
600
587 # When the return value of poll() is an empty
601 # When the return value of poll() is an empty
588 # list, that is when things have gone wrong
602 # list, that is when things have gone wrong
589 # (zeromq bug). As long as it is not an empty
603 # (zeromq bug). As long as it is not an empty
590 # list, poll is working correctly even if it
604 # list, poll is working correctly even if it
591 # returns quickly. Note: poll timeout is in
605 # returns quickly. Note: poll timeout is in
592 # milliseconds.
606 # milliseconds.
593 if until_dead > 0.0:
607 if until_dead > 0.0:
608 while True:
609 try:
594 self.poller.poll(1000 * until_dead)
610 self.poller.poll(1000 * until_dead)
611 except zmq.ZMQError as e:
612 if e.errno == errno.EINTR:
613 continue
614 else:
615 raise
616 else:
617 break
595
618
596 since_last_heartbeat = time.time()-request_time
619 since_last_heartbeat = time.time()-request_time
597 if since_last_heartbeat > self.time_to_dead:
620 if since_last_heartbeat > self.time_to_dead:
598 self.call_handlers(since_last_heartbeat)
621 self.call_handlers(since_last_heartbeat)
599 break
622 break
600 else:
623 else:
601 # FIXME: We should probably log this instead.
624 # FIXME: We should probably log this instead.
602 raise
625 raise
603 else:
626 else:
604 until_dead = self.time_to_dead - (time.time() -
627 until_dead = self.time_to_dead - (time.time() -
605 request_time)
628 request_time)
606 if until_dead > 0.0:
629 if until_dead > 0.0:
607 #io.rprint('sleep...', self.time_to_dead) # dbg
630 #io.rprint('sleep...', self.time_to_dead) # dbg
608 time.sleep(until_dead)
631 time.sleep(until_dead)
609 break
632 break
610
633
611 def pause(self):
634 def pause(self):
612 """Pause the heartbeat."""
635 """Pause the heartbeat."""
613 self._pause = True
636 self._pause = True
614
637
615 def unpause(self):
638 def unpause(self):
616 """Unpause the heartbeat."""
639 """Unpause the heartbeat."""
617 self._pause = False
640 self._pause = False
618
641
619 def is_beating(self):
642 def is_beating(self):
620 """Is the heartbeat running and not paused."""
643 """Is the heartbeat running and not paused."""
621 if self.is_alive() and not self._pause:
644 if self.is_alive() and not self._pause:
622 return True
645 return True
623 else:
646 else:
624 return False
647 return False
625
648
626 def stop(self):
649 def stop(self):
627 self._running = False
650 self._running = False
628 super(HBSocketChannel, self).stop()
651 super(HBSocketChannel, self).stop()
629
652
630 def call_handlers(self, since_last_heartbeat):
653 def call_handlers(self, since_last_heartbeat):
631 """This method is called in the ioloop thread when a message arrives.
654 """This method is called in the ioloop thread when a message arrives.
632
655
633 Subclasses should override this method to handle incoming messages.
656 Subclasses should override this method to handle incoming messages.
634 It is important to remember that this method is called in the thread
657 It is important to remember that this method is called in the thread
635 so that some logic must be done to ensure that the application leve
658 so that some logic must be done to ensure that the application leve
636 handlers are called in the application thread.
659 handlers are called in the application thread.
637 """
660 """
638 raise NotImplementedError('call_handlers must be defined in a subclass.')
661 raise NotImplementedError('call_handlers must be defined in a subclass.')
639
662
640
663
641 #-----------------------------------------------------------------------------
664 #-----------------------------------------------------------------------------
642 # Main kernel manager class
665 # Main kernel manager class
643 #-----------------------------------------------------------------------------
666 #-----------------------------------------------------------------------------
644
667
645 class KernelManager(HasTraits):
668 class KernelManager(HasTraits):
646 """ Manages a kernel for a frontend.
669 """ Manages a kernel for a frontend.
647
670
648 The SUB channel is for the frontend to receive messages published by the
671 The SUB channel is for the frontend to receive messages published by the
649 kernel.
672 kernel.
650
673
651 The REQ channel is for the frontend to make requests of the kernel.
674 The REQ channel is for the frontend to make requests of the kernel.
652
675
653 The REP channel is for the kernel to request stdin (raw_input) from the
676 The REP channel is for the kernel to request stdin (raw_input) from the
654 frontend.
677 frontend.
655 """
678 """
656 # The PyZMQ Context to use for communication with the kernel.
679 # The PyZMQ Context to use for communication with the kernel.
657 context = Instance(zmq.Context,(),{})
680 context = Instance(zmq.Context,(),{})
658
681
659 # The Session to use for communication with the kernel.
682 # The Session to use for communication with the kernel.
660 session = Instance(Session,(),{})
683 session = Instance(Session,(),{})
661
684
662 # The kernel process with which the KernelManager is communicating.
685 # The kernel process with which the KernelManager is communicating.
663 kernel = Instance(Popen)
686 kernel = Instance(Popen)
664
687
665 # The addresses for the communication channels.
688 # The addresses for the communication channels.
666 xreq_address = TCPAddress((LOCALHOST, 0))
689 xreq_address = TCPAddress((LOCALHOST, 0))
667 sub_address = TCPAddress((LOCALHOST, 0))
690 sub_address = TCPAddress((LOCALHOST, 0))
668 rep_address = TCPAddress((LOCALHOST, 0))
691 rep_address = TCPAddress((LOCALHOST, 0))
669 hb_address = TCPAddress((LOCALHOST, 0))
692 hb_address = TCPAddress((LOCALHOST, 0))
670
693
671 # The classes to use for the various channels.
694 # The classes to use for the various channels.
672 xreq_channel_class = Type(XReqSocketChannel)
695 xreq_channel_class = Type(XReqSocketChannel)
673 sub_channel_class = Type(SubSocketChannel)
696 sub_channel_class = Type(SubSocketChannel)
674 rep_channel_class = Type(RepSocketChannel)
697 rep_channel_class = Type(RepSocketChannel)
675 hb_channel_class = Type(HBSocketChannel)
698 hb_channel_class = Type(HBSocketChannel)
676
699
677 # Protected traits.
700 # Protected traits.
678 _launch_args = Any
701 _launch_args = Any
679 _xreq_channel = Any
702 _xreq_channel = Any
680 _sub_channel = Any
703 _sub_channel = Any
681 _rep_channel = Any
704 _rep_channel = Any
682 _hb_channel = Any
705 _hb_channel = Any
683
706
684 def __init__(self, **kwargs):
707 def __init__(self, **kwargs):
685 super(KernelManager, self).__init__(**kwargs)
708 super(KernelManager, self).__init__(**kwargs)
686 # Uncomment this to try closing the context.
709 # Uncomment this to try closing the context.
687 # atexit.register(self.context.close)
710 # atexit.register(self.context.close)
688
711
689 #--------------------------------------------------------------------------
712 #--------------------------------------------------------------------------
690 # Channel management methods:
713 # Channel management methods:
691 #--------------------------------------------------------------------------
714 #--------------------------------------------------------------------------
692
715
693 def start_channels(self, xreq=True, sub=True, rep=True, hb=True):
716 def start_channels(self, xreq=True, sub=True, rep=True, hb=True):
694 """Starts the channels for this kernel.
717 """Starts the channels for this kernel.
695
718
696 This will create the channels if they do not exist and then start
719 This will create the channels if they do not exist and then start
697 them. If port numbers of 0 are being used (random ports) then you
720 them. If port numbers of 0 are being used (random ports) then you
698 must first call :method:`start_kernel`. If the channels have been
721 must first call :method:`start_kernel`. If the channels have been
699 stopped and you call this, :class:`RuntimeError` will be raised.
722 stopped and you call this, :class:`RuntimeError` will be raised.
700 """
723 """
701 if xreq:
724 if xreq:
702 self.xreq_channel.start()
725 self.xreq_channel.start()
703 if sub:
726 if sub:
704 self.sub_channel.start()
727 self.sub_channel.start()
705 if rep:
728 if rep:
706 self.rep_channel.start()
729 self.rep_channel.start()
707 if hb:
730 if hb:
708 self.hb_channel.start()
731 self.hb_channel.start()
709
732
710 def stop_channels(self):
733 def stop_channels(self):
711 """Stops all the running channels for this kernel.
734 """Stops all the running channels for this kernel.
712 """
735 """
713 if self.xreq_channel.is_alive():
736 if self.xreq_channel.is_alive():
714 self.xreq_channel.stop()
737 self.xreq_channel.stop()
715 if self.sub_channel.is_alive():
738 if self.sub_channel.is_alive():
716 self.sub_channel.stop()
739 self.sub_channel.stop()
717 if self.rep_channel.is_alive():
740 if self.rep_channel.is_alive():
718 self.rep_channel.stop()
741 self.rep_channel.stop()
719 if self.hb_channel.is_alive():
742 if self.hb_channel.is_alive():
720 self.hb_channel.stop()
743 self.hb_channel.stop()
721
744
722 @property
745 @property
723 def channels_running(self):
746 def channels_running(self):
724 """Are any of the channels created and running?"""
747 """Are any of the channels created and running?"""
725 return (self.xreq_channel.is_alive() or self.sub_channel.is_alive() or
748 return (self.xreq_channel.is_alive() or self.sub_channel.is_alive() or
726 self.rep_channel.is_alive() or self.hb_channel.is_alive())
749 self.rep_channel.is_alive() or self.hb_channel.is_alive())
727
750
728 #--------------------------------------------------------------------------
751 #--------------------------------------------------------------------------
729 # Kernel process management methods:
752 # Kernel process management methods:
730 #--------------------------------------------------------------------------
753 #--------------------------------------------------------------------------
731
754
732 def start_kernel(self, **kw):
755 def start_kernel(self, **kw):
733 """Starts a kernel process and configures the manager to use it.
756 """Starts a kernel process and configures the manager to use it.
734
757
735 If random ports (port=0) are being used, this method must be called
758 If random ports (port=0) are being used, this method must be called
736 before the channels are created.
759 before the channels are created.
737
760
738 Parameters:
761 Parameters:
739 -----------
762 -----------
740 ipython : bool, optional (default True)
763 ipython : bool, optional (default True)
741 Whether to use an IPython kernel instead of a plain Python kernel.
764 Whether to use an IPython kernel instead of a plain Python kernel.
742
765
743 **kw : optional
766 **kw : optional
744 See respective options for IPython and Python kernels.
767 See respective options for IPython and Python kernels.
745 """
768 """
746 xreq, sub, rep, hb = self.xreq_address, self.sub_address, \
769 xreq, sub, rep, hb = self.xreq_address, self.sub_address, \
747 self.rep_address, self.hb_address
770 self.rep_address, self.hb_address
748 if xreq[0] not in LOCAL_IPS or sub[0] not in LOCAL_IPS or \
771 if xreq[0] not in LOCAL_IPS or sub[0] not in LOCAL_IPS or \
749 rep[0] not in LOCAL_IPS or hb[0] not in LOCAL_IPS:
772 rep[0] not in LOCAL_IPS or hb[0] not in LOCAL_IPS:
750 raise RuntimeError("Can only launch a kernel on a local interface. "
773 raise RuntimeError("Can only launch a kernel on a local interface. "
751 "Make sure that the '*_address' attributes are "
774 "Make sure that the '*_address' attributes are "
752 "configured properly. "
775 "configured properly. "
753 "Currently valid addresses are: %s"%LOCAL_IPS
776 "Currently valid addresses are: %s"%LOCAL_IPS
754 )
777 )
755
778
756 self._launch_args = kw.copy()
779 self._launch_args = kw.copy()
757 if kw.pop('ipython', True):
780 if kw.pop('ipython', True):
758 from ipkernel import launch_kernel
781 from ipkernel import launch_kernel
759 else:
782 else:
760 from pykernel import launch_kernel
783 from pykernel import launch_kernel
761 self.kernel, xrep, pub, req, _hb = launch_kernel(
784 self.kernel, xrep, pub, req, _hb = launch_kernel(
762 xrep_port=xreq[1], pub_port=sub[1],
785 xrep_port=xreq[1], pub_port=sub[1],
763 req_port=rep[1], hb_port=hb[1], **kw)
786 req_port=rep[1], hb_port=hb[1], **kw)
764 self.xreq_address = (xreq[0], xrep)
787 self.xreq_address = (xreq[0], xrep)
765 self.sub_address = (sub[0], pub)
788 self.sub_address = (sub[0], pub)
766 self.rep_address = (rep[0], req)
789 self.rep_address = (rep[0], req)
767 self.hb_address = (hb[0], _hb)
790 self.hb_address = (hb[0], _hb)
768
791
769 def shutdown_kernel(self, restart=False):
792 def shutdown_kernel(self, restart=False):
770 """ Attempts to the stop the kernel process cleanly. If the kernel
793 """ Attempts to the stop the kernel process cleanly. If the kernel
771 cannot be stopped, it is killed, if possible.
794 cannot be stopped, it is killed, if possible.
772 """
795 """
773 # FIXME: Shutdown does not work on Windows due to ZMQ errors!
796 # FIXME: Shutdown does not work on Windows due to ZMQ errors!
774 if sys.platform == 'win32':
797 if sys.platform == 'win32':
775 self.kill_kernel()
798 self.kill_kernel()
776 return
799 return
777
800
778 # Pause the heart beat channel if it exists.
801 # Pause the heart beat channel if it exists.
779 if self._hb_channel is not None:
802 if self._hb_channel is not None:
780 self._hb_channel.pause()
803 self._hb_channel.pause()
781
804
782 # Don't send any additional kernel kill messages immediately, to give
805 # Don't send any additional kernel kill messages immediately, to give
783 # the kernel a chance to properly execute shutdown actions. Wait for at
806 # the kernel a chance to properly execute shutdown actions. Wait for at
784 # most 1s, checking every 0.1s.
807 # most 1s, checking every 0.1s.
785 self.xreq_channel.shutdown(restart=restart)
808 self.xreq_channel.shutdown(restart=restart)
786 for i in range(10):
809 for i in range(10):
787 if self.is_alive:
810 if self.is_alive:
788 time.sleep(0.1)
811 time.sleep(0.1)
789 else:
812 else:
790 break
813 break
791 else:
814 else:
792 # OK, we've waited long enough.
815 # OK, we've waited long enough.
793 if self.has_kernel:
816 if self.has_kernel:
794 self.kill_kernel()
817 self.kill_kernel()
795
818
796 def restart_kernel(self, now=False, **kw):
819 def restart_kernel(self, now=False, **kw):
797 """Restarts a kernel with the arguments that were used to launch it.
820 """Restarts a kernel with the arguments that were used to launch it.
798
821
799 If the old kernel was launched with random ports, the same ports will be
822 If the old kernel was launched with random ports, the same ports will be
800 used for the new kernel.
823 used for the new kernel.
801
824
802 Parameters
825 Parameters
803 ----------
826 ----------
804 now : bool, optional
827 now : bool, optional
805 If True, the kernel is forcefully restarted *immediately*, without
828 If True, the kernel is forcefully restarted *immediately*, without
806 having a chance to do any cleanup action. Otherwise the kernel is
829 having a chance to do any cleanup action. Otherwise the kernel is
807 given 1s to clean up before a forceful restart is issued.
830 given 1s to clean up before a forceful restart is issued.
808
831
809 In all cases the kernel is restarted, the only difference is whether
832 In all cases the kernel is restarted, the only difference is whether
810 it is given a chance to perform a clean shutdown or not.
833 it is given a chance to perform a clean shutdown or not.
811
834
812 **kw : optional
835 **kw : optional
813 Any options specified here will replace those used to launch the
836 Any options specified here will replace those used to launch the
814 kernel.
837 kernel.
815 """
838 """
816 if self._launch_args is None:
839 if self._launch_args is None:
817 raise RuntimeError("Cannot restart the kernel. "
840 raise RuntimeError("Cannot restart the kernel. "
818 "No previous call to 'start_kernel'.")
841 "No previous call to 'start_kernel'.")
819 else:
842 else:
820 # Stop currently running kernel.
843 # Stop currently running kernel.
821 if self.has_kernel:
844 if self.has_kernel:
822 if now:
845 if now:
823 self.kill_kernel()
846 self.kill_kernel()
824 else:
847 else:
825 self.shutdown_kernel(restart=True)
848 self.shutdown_kernel(restart=True)
826
849
827 # Start new kernel.
850 # Start new kernel.
828 self._launch_args.update(kw)
851 self._launch_args.update(kw)
829 self.start_kernel(**self._launch_args)
852 self.start_kernel(**self._launch_args)
830
853
831 # FIXME: Messages get dropped in Windows due to probable ZMQ bug
854 # FIXME: Messages get dropped in Windows due to probable ZMQ bug
832 # unless there is some delay here.
855 # unless there is some delay here.
833 if sys.platform == 'win32':
856 if sys.platform == 'win32':
834 time.sleep(0.2)
857 time.sleep(0.2)
835
858
836 @property
859 @property
837 def has_kernel(self):
860 def has_kernel(self):
838 """Returns whether a kernel process has been specified for the kernel
861 """Returns whether a kernel process has been specified for the kernel
839 manager.
862 manager.
840 """
863 """
841 return self.kernel is not None
864 return self.kernel is not None
842
865
843 def kill_kernel(self):
866 def kill_kernel(self):
844 """ Kill the running kernel. """
867 """ Kill the running kernel. """
845 if self.has_kernel:
868 if self.has_kernel:
846 # Pause the heart beat channel if it exists.
869 # Pause the heart beat channel if it exists.
847 if self._hb_channel is not None:
870 if self._hb_channel is not None:
848 self._hb_channel.pause()
871 self._hb_channel.pause()
849
872
850 # Attempt to kill the kernel.
873 # Attempt to kill the kernel.
851 try:
874 try:
852 self.kernel.kill()
875 self.kernel.kill()
853 except OSError, e:
876 except OSError, e:
854 # In Windows, we will get an Access Denied error if the process
877 # In Windows, we will get an Access Denied error if the process
855 # has already terminated. Ignore it.
878 # has already terminated. Ignore it.
856 if sys.platform == 'win32':
879 if sys.platform == 'win32':
857 if e.winerror != 5:
880 if e.winerror != 5:
858 raise
881 raise
859 # On Unix, we may get an ESRCH error if the process has already
882 # On Unix, we may get an ESRCH error if the process has already
860 # terminated. Ignore it.
883 # terminated. Ignore it.
861 else:
884 else:
862 from errno import ESRCH
885 from errno import ESRCH
863 if e.errno != ESRCH:
886 if e.errno != ESRCH:
864 raise
887 raise
865 self.kernel = None
888 self.kernel = None
866 else:
889 else:
867 raise RuntimeError("Cannot kill kernel. No kernel is running!")
890 raise RuntimeError("Cannot kill kernel. No kernel is running!")
868
891
869 def interrupt_kernel(self):
892 def interrupt_kernel(self):
870 """ Interrupts the kernel. Unlike ``signal_kernel``, this operation is
893 """ Interrupts the kernel. Unlike ``signal_kernel``, this operation is
871 well supported on all platforms.
894 well supported on all platforms.
872 """
895 """
873 if self.has_kernel:
896 if self.has_kernel:
874 if sys.platform == 'win32':
897 if sys.platform == 'win32':
875 from parentpoller import ParentPollerWindows as Poller
898 from parentpoller import ParentPollerWindows as Poller
876 Poller.send_interrupt(self.kernel.win32_interrupt_event)
899 Poller.send_interrupt(self.kernel.win32_interrupt_event)
877 else:
900 else:
878 self.kernel.send_signal(signal.SIGINT)
901 self.kernel.send_signal(signal.SIGINT)
879 else:
902 else:
880 raise RuntimeError("Cannot interrupt kernel. No kernel is running!")
903 raise RuntimeError("Cannot interrupt kernel. No kernel is running!")
881
904
882 def signal_kernel(self, signum):
905 def signal_kernel(self, signum):
883 """ Sends a signal to the kernel. Note that since only SIGTERM is
906 """ Sends a signal to the kernel. Note that since only SIGTERM is
884 supported on Windows, this function is only useful on Unix systems.
907 supported on Windows, this function is only useful on Unix systems.
885 """
908 """
886 if self.has_kernel:
909 if self.has_kernel:
887 self.kernel.send_signal(signum)
910 self.kernel.send_signal(signum)
888 else:
911 else:
889 raise RuntimeError("Cannot signal kernel. No kernel is running!")
912 raise RuntimeError("Cannot signal kernel. No kernel is running!")
890
913
891 @property
914 @property
892 def is_alive(self):
915 def is_alive(self):
893 """Is the kernel process still running?"""
916 """Is the kernel process still running?"""
894 # FIXME: not using a heartbeat means this method is broken for any
917 # FIXME: not using a heartbeat means this method is broken for any
895 # remote kernel, it's only capable of handling local kernels.
918 # remote kernel, it's only capable of handling local kernels.
896 if self.has_kernel:
919 if self.has_kernel:
897 if self.kernel.poll() is None:
920 if self.kernel.poll() is None:
898 return True
921 return True
899 else:
922 else:
900 return False
923 return False
901 else:
924 else:
902 # We didn't start the kernel with this KernelManager so we don't
925 # We didn't start the kernel with this KernelManager so we don't
903 # know if it is running. We should use a heartbeat for this case.
926 # know if it is running. We should use a heartbeat for this case.
904 return True
927 return True
905
928
906 #--------------------------------------------------------------------------
929 #--------------------------------------------------------------------------
907 # Channels used for communication with the kernel:
930 # Channels used for communication with the kernel:
908 #--------------------------------------------------------------------------
931 #--------------------------------------------------------------------------
909
932
910 @property
933 @property
911 def xreq_channel(self):
934 def xreq_channel(self):
912 """Get the REQ socket channel object to make requests of the kernel."""
935 """Get the REQ socket channel object to make requests of the kernel."""
913 if self._xreq_channel is None:
936 if self._xreq_channel is None:
914 self._xreq_channel = self.xreq_channel_class(self.context,
937 self._xreq_channel = self.xreq_channel_class(self.context,
915 self.session,
938 self.session,
916 self.xreq_address)
939 self.xreq_address)
917 return self._xreq_channel
940 return self._xreq_channel
918
941
919 @property
942 @property
920 def sub_channel(self):
943 def sub_channel(self):
921 """Get the SUB socket channel object."""
944 """Get the SUB socket channel object."""
922 if self._sub_channel is None:
945 if self._sub_channel is None:
923 self._sub_channel = self.sub_channel_class(self.context,
946 self._sub_channel = self.sub_channel_class(self.context,
924 self.session,
947 self.session,
925 self.sub_address)
948 self.sub_address)
926 return self._sub_channel
949 return self._sub_channel
927
950
928 @property
951 @property
929 def rep_channel(self):
952 def rep_channel(self):
930 """Get the REP socket channel object to handle stdin (raw_input)."""
953 """Get the REP socket channel object to handle stdin (raw_input)."""
931 if self._rep_channel is None:
954 if self._rep_channel is None:
932 self._rep_channel = self.rep_channel_class(self.context,
955 self._rep_channel = self.rep_channel_class(self.context,
933 self.session,
956 self.session,
934 self.rep_address)
957 self.rep_address)
935 return self._rep_channel
958 return self._rep_channel
936
959
937 @property
960 @property
938 def hb_channel(self):
961 def hb_channel(self):
939 """Get the heartbeat socket channel object to check that the
962 """Get the heartbeat socket channel object to check that the
940 kernel is alive."""
963 kernel is alive."""
941 if self._hb_channel is None:
964 if self._hb_channel is None:
942 self._hb_channel = self.hb_channel_class(self.context,
965 self._hb_channel = self.hb_channel_class(self.context,
943 self.session,
966 self.session,
944 self.hb_address)
967 self.hb_address)
945 return self._hb_channel
968 return self._hb_channel
General Comments 0
You need to be logged in to leave comments. Login now