##// END OF EJS Templates
add KernelManager.load_connection_file method...
MinRK -
Show More
@@ -1,1028 +1,1043
1 """Base classes to manage the interaction with a running kernel.
1 """Base classes to manage the interaction with a running kernel.
2
2
3 TODO
3 TODO
4 * Create logger to handle debugging and console messages.
4 * Create logger to handle debugging and console messages.
5 """
5 """
6
6
7 #-----------------------------------------------------------------------------
7 #-----------------------------------------------------------------------------
8 # Copyright (C) 2008-2010 The IPython Development Team
8 # Copyright (C) 2008-2010 The IPython Development Team
9 #
9 #
10 # Distributed under the terms of the BSD License. The full license is in
10 # Distributed under the terms of the BSD License. The full license is in
11 # the file COPYING, distributed as part of this software.
11 # the file COPYING, distributed as part of this software.
12 #-----------------------------------------------------------------------------
12 #-----------------------------------------------------------------------------
13
13
14 #-----------------------------------------------------------------------------
14 #-----------------------------------------------------------------------------
15 # Imports
15 # Imports
16 #-----------------------------------------------------------------------------
16 #-----------------------------------------------------------------------------
17
17
18 # Standard library imports.
18 # Standard library imports.
19 import errno
19 import errno
20 from Queue import Queue, Empty
20 from Queue import Queue, Empty
21 from subprocess import Popen
21 from subprocess import Popen
22 import os
22 import os
23 import signal
23 import signal
24 import sys
24 import sys
25 from threading import Thread
25 from threading import Thread
26 import time
26 import time
27
27
28 # System library imports.
28 # System library imports.
29 import zmq
29 import zmq
30 from zmq import POLLIN, POLLOUT, POLLERR
30 from zmq import POLLIN, POLLOUT, POLLERR
31 from zmq.eventloop import ioloop
31 from zmq.eventloop import ioloop
32 from zmq.utils import jsonapi as json
32
33
33 # Local imports.
34 # Local imports.
34 from IPython.config.loader import Config
35 from IPython.config.loader import Config
35 from IPython.utils.localinterfaces import LOCALHOST, LOCAL_IPS
36 from IPython.utils.localinterfaces import LOCALHOST, LOCAL_IPS
36 from IPython.utils.traitlets import (
37 from IPython.utils.traitlets import (
37 HasTraits, Any, Instance, Type, Unicode, Int, Bool
38 HasTraits, Any, Instance, Type, Unicode, Int, Bool
38 )
39 )
40 from IPython.utils.py3compat import str_to_bytes
39 from IPython.zmq.entry_point import write_connection_file
41 from IPython.zmq.entry_point import write_connection_file
40 from session import Session
42 from session import Session
41
43
42 #-----------------------------------------------------------------------------
44 #-----------------------------------------------------------------------------
43 # Constants and exceptions
45 # Constants and exceptions
44 #-----------------------------------------------------------------------------
46 #-----------------------------------------------------------------------------
45
47
46 class InvalidPortNumber(Exception):
48 class InvalidPortNumber(Exception):
47 pass
49 pass
48
50
49 #-----------------------------------------------------------------------------
51 #-----------------------------------------------------------------------------
50 # Utility functions
52 # Utility functions
51 #-----------------------------------------------------------------------------
53 #-----------------------------------------------------------------------------
52
54
53 # some utilities to validate message structure, these might get moved elsewhere
55 # some utilities to validate message structure, these might get moved elsewhere
54 # if they prove to have more generic utility
56 # if they prove to have more generic utility
55
57
56 def validate_string_list(lst):
58 def validate_string_list(lst):
57 """Validate that the input is a list of strings.
59 """Validate that the input is a list of strings.
58
60
59 Raises ValueError if not."""
61 Raises ValueError if not."""
60 if not isinstance(lst, list):
62 if not isinstance(lst, list):
61 raise ValueError('input %r must be a list' % lst)
63 raise ValueError('input %r must be a list' % lst)
62 for x in lst:
64 for x in lst:
63 if not isinstance(x, basestring):
65 if not isinstance(x, basestring):
64 raise ValueError('element %r in list must be a string' % x)
66 raise ValueError('element %r in list must be a string' % x)
65
67
66
68
67 def validate_string_dict(dct):
69 def validate_string_dict(dct):
68 """Validate that the input is a dict with string keys and values.
70 """Validate that the input is a dict with string keys and values.
69
71
70 Raises ValueError if not."""
72 Raises ValueError if not."""
71 for k,v in dct.iteritems():
73 for k,v in dct.iteritems():
72 if not isinstance(k, basestring):
74 if not isinstance(k, basestring):
73 raise ValueError('key %r in dict must be a string' % k)
75 raise ValueError('key %r in dict must be a string' % k)
74 if not isinstance(v, basestring):
76 if not isinstance(v, basestring):
75 raise ValueError('value %r in dict must be a string' % v)
77 raise ValueError('value %r in dict must be a string' % v)
76
78
77
79
78 #-----------------------------------------------------------------------------
80 #-----------------------------------------------------------------------------
79 # ZMQ Socket Channel classes
81 # ZMQ Socket Channel classes
80 #-----------------------------------------------------------------------------
82 #-----------------------------------------------------------------------------
81
83
82 class ZMQSocketChannel(Thread):
84 class ZMQSocketChannel(Thread):
83 """The base class for the channels that use ZMQ sockets.
85 """The base class for the channels that use ZMQ sockets.
84 """
86 """
85 context = None
87 context = None
86 session = None
88 session = None
87 socket = None
89 socket = None
88 ioloop = None
90 ioloop = None
89 iostate = None
91 iostate = None
90 _address = None
92 _address = None
91
93
92 def __init__(self, context, session, address):
94 def __init__(self, context, session, address):
93 """Create a channel
95 """Create a channel
94
96
95 Parameters
97 Parameters
96 ----------
98 ----------
97 context : :class:`zmq.Context`
99 context : :class:`zmq.Context`
98 The ZMQ context to use.
100 The ZMQ context to use.
99 session : :class:`session.Session`
101 session : :class:`session.Session`
100 The session to use.
102 The session to use.
101 address : tuple
103 address : tuple
102 Standard (ip, port) tuple that the kernel is listening on.
104 Standard (ip, port) tuple that the kernel is listening on.
103 """
105 """
104 super(ZMQSocketChannel, self).__init__()
106 super(ZMQSocketChannel, self).__init__()
105 self.daemon = True
107 self.daemon = True
106
108
107 self.context = context
109 self.context = context
108 self.session = session
110 self.session = session
109 if address[1] == 0:
111 if address[1] == 0:
110 message = 'The port number for a channel cannot be 0.'
112 message = 'The port number for a channel cannot be 0.'
111 raise InvalidPortNumber(message)
113 raise InvalidPortNumber(message)
112 self._address = address
114 self._address = address
113
115
114 def _run_loop(self):
116 def _run_loop(self):
115 """Run my loop, ignoring EINTR events in the poller"""
117 """Run my loop, ignoring EINTR events in the poller"""
116 while True:
118 while True:
117 try:
119 try:
118 self.ioloop.start()
120 self.ioloop.start()
119 except zmq.ZMQError as e:
121 except zmq.ZMQError as e:
120 if e.errno == errno.EINTR:
122 if e.errno == errno.EINTR:
121 continue
123 continue
122 else:
124 else:
123 raise
125 raise
124 else:
126 else:
125 break
127 break
126
128
127 def stop(self):
129 def stop(self):
128 """Stop the channel's activity.
130 """Stop the channel's activity.
129
131
130 This calls :method:`Thread.join` and returns when the thread
132 This calls :method:`Thread.join` and returns when the thread
131 terminates. :class:`RuntimeError` will be raised if
133 terminates. :class:`RuntimeError` will be raised if
132 :method:`self.start` is called again.
134 :method:`self.start` is called again.
133 """
135 """
134 self.join()
136 self.join()
135
137
136 @property
138 @property
137 def address(self):
139 def address(self):
138 """Get the channel's address as an (ip, port) tuple.
140 """Get the channel's address as an (ip, port) tuple.
139
141
140 By the default, the address is (localhost, 0), where 0 means a random
142 By the default, the address is (localhost, 0), where 0 means a random
141 port.
143 port.
142 """
144 """
143 return self._address
145 return self._address
144
146
145 def add_io_state(self, state):
147 def add_io_state(self, state):
146 """Add IO state to the eventloop.
148 """Add IO state to the eventloop.
147
149
148 Parameters
150 Parameters
149 ----------
151 ----------
150 state : zmq.POLLIN|zmq.POLLOUT|zmq.POLLERR
152 state : zmq.POLLIN|zmq.POLLOUT|zmq.POLLERR
151 The IO state flag to set.
153 The IO state flag to set.
152
154
153 This is thread safe as it uses the thread safe IOLoop.add_callback.
155 This is thread safe as it uses the thread safe IOLoop.add_callback.
154 """
156 """
155 def add_io_state_callback():
157 def add_io_state_callback():
156 if not self.iostate & state:
158 if not self.iostate & state:
157 self.iostate = self.iostate | state
159 self.iostate = self.iostate | state
158 self.ioloop.update_handler(self.socket, self.iostate)
160 self.ioloop.update_handler(self.socket, self.iostate)
159 self.ioloop.add_callback(add_io_state_callback)
161 self.ioloop.add_callback(add_io_state_callback)
160
162
161 def drop_io_state(self, state):
163 def drop_io_state(self, state):
162 """Drop IO state from the eventloop.
164 """Drop IO state from the eventloop.
163
165
164 Parameters
166 Parameters
165 ----------
167 ----------
166 state : zmq.POLLIN|zmq.POLLOUT|zmq.POLLERR
168 state : zmq.POLLIN|zmq.POLLOUT|zmq.POLLERR
167 The IO state flag to set.
169 The IO state flag to set.
168
170
169 This is thread safe as it uses the thread safe IOLoop.add_callback.
171 This is thread safe as it uses the thread safe IOLoop.add_callback.
170 """
172 """
171 def drop_io_state_callback():
173 def drop_io_state_callback():
172 if self.iostate & state:
174 if self.iostate & state:
173 self.iostate = self.iostate & (~state)
175 self.iostate = self.iostate & (~state)
174 self.ioloop.update_handler(self.socket, self.iostate)
176 self.ioloop.update_handler(self.socket, self.iostate)
175 self.ioloop.add_callback(drop_io_state_callback)
177 self.ioloop.add_callback(drop_io_state_callback)
176
178
177
179
178 class ShellSocketChannel(ZMQSocketChannel):
180 class ShellSocketChannel(ZMQSocketChannel):
179 """The XREQ channel for issues request/replies to the kernel.
181 """The XREQ channel for issues request/replies to the kernel.
180 """
182 """
181
183
182 command_queue = None
184 command_queue = None
183 # flag for whether execute requests should be allowed to call raw_input:
185 # flag for whether execute requests should be allowed to call raw_input:
184 allow_stdin = True
186 allow_stdin = True
185
187
186 def __init__(self, context, session, address):
188 def __init__(self, context, session, address):
187 super(ShellSocketChannel, self).__init__(context, session, address)
189 super(ShellSocketChannel, self).__init__(context, session, address)
188 self.command_queue = Queue()
190 self.command_queue = Queue()
189 self.ioloop = ioloop.IOLoop()
191 self.ioloop = ioloop.IOLoop()
190
192
191 def run(self):
193 def run(self):
192 """The thread's main activity. Call start() instead."""
194 """The thread's main activity. Call start() instead."""
193 self.socket = self.context.socket(zmq.DEALER)
195 self.socket = self.context.socket(zmq.DEALER)
194 self.socket.setsockopt(zmq.IDENTITY, self.session.bsession)
196 self.socket.setsockopt(zmq.IDENTITY, self.session.bsession)
195 self.socket.connect('tcp://%s:%i' % self.address)
197 self.socket.connect('tcp://%s:%i' % self.address)
196 self.iostate = POLLERR|POLLIN
198 self.iostate = POLLERR|POLLIN
197 self.ioloop.add_handler(self.socket, self._handle_events,
199 self.ioloop.add_handler(self.socket, self._handle_events,
198 self.iostate)
200 self.iostate)
199 self._run_loop()
201 self._run_loop()
200
202
201 def stop(self):
203 def stop(self):
202 self.ioloop.stop()
204 self.ioloop.stop()
203 super(ShellSocketChannel, self).stop()
205 super(ShellSocketChannel, self).stop()
204
206
205 def call_handlers(self, msg):
207 def call_handlers(self, msg):
206 """This method is called in the ioloop thread when a message arrives.
208 """This method is called in the ioloop thread when a message arrives.
207
209
208 Subclasses should override this method to handle incoming messages.
210 Subclasses should override this method to handle incoming messages.
209 It is important to remember that this method is called in the thread
211 It is important to remember that this method is called in the thread
210 so that some logic must be done to ensure that the application leve
212 so that some logic must be done to ensure that the application leve
211 handlers are called in the application thread.
213 handlers are called in the application thread.
212 """
214 """
213 raise NotImplementedError('call_handlers must be defined in a subclass.')
215 raise NotImplementedError('call_handlers must be defined in a subclass.')
214
216
215 def execute(self, code, silent=False,
217 def execute(self, code, silent=False,
216 user_variables=None, user_expressions=None, allow_stdin=None):
218 user_variables=None, user_expressions=None, allow_stdin=None):
217 """Execute code in the kernel.
219 """Execute code in the kernel.
218
220
219 Parameters
221 Parameters
220 ----------
222 ----------
221 code : str
223 code : str
222 A string of Python code.
224 A string of Python code.
223
225
224 silent : bool, optional (default False)
226 silent : bool, optional (default False)
225 If set, the kernel will execute the code as quietly possible.
227 If set, the kernel will execute the code as quietly possible.
226
228
227 user_variables : list, optional
229 user_variables : list, optional
228 A list of variable names to pull from the user's namespace. They
230 A list of variable names to pull from the user's namespace. They
229 will come back as a dict with these names as keys and their
231 will come back as a dict with these names as keys and their
230 :func:`repr` as values.
232 :func:`repr` as values.
231
233
232 user_expressions : dict, optional
234 user_expressions : dict, optional
233 A dict with string keys and to pull from the user's
235 A dict with string keys and to pull from the user's
234 namespace. They will come back as a dict with these names as keys
236 namespace. They will come back as a dict with these names as keys
235 and their :func:`repr` as values.
237 and their :func:`repr` as values.
236
238
237 allow_stdin : bool, optional
239 allow_stdin : bool, optional
238 Flag for
240 Flag for
239 A dict with string keys and to pull from the user's
241 A dict with string keys and to pull from the user's
240 namespace. They will come back as a dict with these names as keys
242 namespace. They will come back as a dict with these names as keys
241 and their :func:`repr` as values.
243 and their :func:`repr` as values.
242
244
243 Returns
245 Returns
244 -------
246 -------
245 The msg_id of the message sent.
247 The msg_id of the message sent.
246 """
248 """
247 if user_variables is None:
249 if user_variables is None:
248 user_variables = []
250 user_variables = []
249 if user_expressions is None:
251 if user_expressions is None:
250 user_expressions = {}
252 user_expressions = {}
251 if allow_stdin is None:
253 if allow_stdin is None:
252 allow_stdin = self.allow_stdin
254 allow_stdin = self.allow_stdin
253
255
254
256
255 # Don't waste network traffic if inputs are invalid
257 # Don't waste network traffic if inputs are invalid
256 if not isinstance(code, basestring):
258 if not isinstance(code, basestring):
257 raise ValueError('code %r must be a string' % code)
259 raise ValueError('code %r must be a string' % code)
258 validate_string_list(user_variables)
260 validate_string_list(user_variables)
259 validate_string_dict(user_expressions)
261 validate_string_dict(user_expressions)
260
262
261 # Create class for content/msg creation. Related to, but possibly
263 # Create class for content/msg creation. Related to, but possibly
262 # not in Session.
264 # not in Session.
263 content = dict(code=code, silent=silent,
265 content = dict(code=code, silent=silent,
264 user_variables=user_variables,
266 user_variables=user_variables,
265 user_expressions=user_expressions,
267 user_expressions=user_expressions,
266 allow_stdin=allow_stdin,
268 allow_stdin=allow_stdin,
267 )
269 )
268 msg = self.session.msg('execute_request', content)
270 msg = self.session.msg('execute_request', content)
269 self._queue_request(msg)
271 self._queue_request(msg)
270 return msg['header']['msg_id']
272 return msg['header']['msg_id']
271
273
272 def complete(self, text, line, cursor_pos, block=None):
274 def complete(self, text, line, cursor_pos, block=None):
273 """Tab complete text in the kernel's namespace.
275 """Tab complete text in the kernel's namespace.
274
276
275 Parameters
277 Parameters
276 ----------
278 ----------
277 text : str
279 text : str
278 The text to complete.
280 The text to complete.
279 line : str
281 line : str
280 The full line of text that is the surrounding context for the
282 The full line of text that is the surrounding context for the
281 text to complete.
283 text to complete.
282 cursor_pos : int
284 cursor_pos : int
283 The position of the cursor in the line where the completion was
285 The position of the cursor in the line where the completion was
284 requested.
286 requested.
285 block : str, optional
287 block : str, optional
286 The full block of code in which the completion is being requested.
288 The full block of code in which the completion is being requested.
287
289
288 Returns
290 Returns
289 -------
291 -------
290 The msg_id of the message sent.
292 The msg_id of the message sent.
291 """
293 """
292 content = dict(text=text, line=line, block=block, cursor_pos=cursor_pos)
294 content = dict(text=text, line=line, block=block, cursor_pos=cursor_pos)
293 msg = self.session.msg('complete_request', content)
295 msg = self.session.msg('complete_request', content)
294 self._queue_request(msg)
296 self._queue_request(msg)
295 return msg['header']['msg_id']
297 return msg['header']['msg_id']
296
298
297 def object_info(self, oname):
299 def object_info(self, oname):
298 """Get metadata information about an object.
300 """Get metadata information about an object.
299
301
300 Parameters
302 Parameters
301 ----------
303 ----------
302 oname : str
304 oname : str
303 A string specifying the object name.
305 A string specifying the object name.
304
306
305 Returns
307 Returns
306 -------
308 -------
307 The msg_id of the message sent.
309 The msg_id of the message sent.
308 """
310 """
309 content = dict(oname=oname)
311 content = dict(oname=oname)
310 msg = self.session.msg('object_info_request', content)
312 msg = self.session.msg('object_info_request', content)
311 self._queue_request(msg)
313 self._queue_request(msg)
312 return msg['header']['msg_id']
314 return msg['header']['msg_id']
313
315
314 def history(self, raw=True, output=False, hist_access_type='range', **kwargs):
316 def history(self, raw=True, output=False, hist_access_type='range', **kwargs):
315 """Get entries from the history list.
317 """Get entries from the history list.
316
318
317 Parameters
319 Parameters
318 ----------
320 ----------
319 raw : bool
321 raw : bool
320 If True, return the raw input.
322 If True, return the raw input.
321 output : bool
323 output : bool
322 If True, then return the output as well.
324 If True, then return the output as well.
323 hist_access_type : str
325 hist_access_type : str
324 'range' (fill in session, start and stop params), 'tail' (fill in n)
326 'range' (fill in session, start and stop params), 'tail' (fill in n)
325 or 'search' (fill in pattern param).
327 or 'search' (fill in pattern param).
326
328
327 session : int
329 session : int
328 For a range request, the session from which to get lines. Session
330 For a range request, the session from which to get lines. Session
329 numbers are positive integers; negative ones count back from the
331 numbers are positive integers; negative ones count back from the
330 current session.
332 current session.
331 start : int
333 start : int
332 The first line number of a history range.
334 The first line number of a history range.
333 stop : int
335 stop : int
334 The final (excluded) line number of a history range.
336 The final (excluded) line number of a history range.
335
337
336 n : int
338 n : int
337 The number of lines of history to get for a tail request.
339 The number of lines of history to get for a tail request.
338
340
339 pattern : str
341 pattern : str
340 The glob-syntax pattern for a search request.
342 The glob-syntax pattern for a search request.
341
343
342 Returns
344 Returns
343 -------
345 -------
344 The msg_id of the message sent.
346 The msg_id of the message sent.
345 """
347 """
346 content = dict(raw=raw, output=output, hist_access_type=hist_access_type,
348 content = dict(raw=raw, output=output, hist_access_type=hist_access_type,
347 **kwargs)
349 **kwargs)
348 msg = self.session.msg('history_request', content)
350 msg = self.session.msg('history_request', content)
349 self._queue_request(msg)
351 self._queue_request(msg)
350 return msg['header']['msg_id']
352 return msg['header']['msg_id']
351
353
352 def shutdown(self, restart=False):
354 def shutdown(self, restart=False):
353 """Request an immediate kernel shutdown.
355 """Request an immediate kernel shutdown.
354
356
355 Upon receipt of the (empty) reply, client code can safely assume that
357 Upon receipt of the (empty) reply, client code can safely assume that
356 the kernel has shut down and it's safe to forcefully terminate it if
358 the kernel has shut down and it's safe to forcefully terminate it if
357 it's still alive.
359 it's still alive.
358
360
359 The kernel will send the reply via a function registered with Python's
361 The kernel will send the reply via a function registered with Python's
360 atexit module, ensuring it's truly done as the kernel is done with all
362 atexit module, ensuring it's truly done as the kernel is done with all
361 normal operation.
363 normal operation.
362 """
364 """
363 # Send quit message to kernel. Once we implement kernel-side setattr,
365 # Send quit message to kernel. Once we implement kernel-side setattr,
364 # this should probably be done that way, but for now this will do.
366 # this should probably be done that way, but for now this will do.
365 msg = self.session.msg('shutdown_request', {'restart':restart})
367 msg = self.session.msg('shutdown_request', {'restart':restart})
366 self._queue_request(msg)
368 self._queue_request(msg)
367 return msg['header']['msg_id']
369 return msg['header']['msg_id']
368
370
369 def _handle_events(self, socket, events):
371 def _handle_events(self, socket, events):
370 if events & POLLERR:
372 if events & POLLERR:
371 self._handle_err()
373 self._handle_err()
372 if events & POLLOUT:
374 if events & POLLOUT:
373 self._handle_send()
375 self._handle_send()
374 if events & POLLIN:
376 if events & POLLIN:
375 self._handle_recv()
377 self._handle_recv()
376
378
377 def _handle_recv(self):
379 def _handle_recv(self):
378 ident,msg = self.session.recv(self.socket, 0)
380 ident,msg = self.session.recv(self.socket, 0)
379 self.call_handlers(msg)
381 self.call_handlers(msg)
380
382
381 def _handle_send(self):
383 def _handle_send(self):
382 try:
384 try:
383 msg = self.command_queue.get(False)
385 msg = self.command_queue.get(False)
384 except Empty:
386 except Empty:
385 pass
387 pass
386 else:
388 else:
387 self.session.send(self.socket,msg)
389 self.session.send(self.socket,msg)
388 if self.command_queue.empty():
390 if self.command_queue.empty():
389 self.drop_io_state(POLLOUT)
391 self.drop_io_state(POLLOUT)
390
392
391 def _handle_err(self):
393 def _handle_err(self):
392 # We don't want to let this go silently, so eventually we should log.
394 # We don't want to let this go silently, so eventually we should log.
393 raise zmq.ZMQError()
395 raise zmq.ZMQError()
394
396
395 def _queue_request(self, msg):
397 def _queue_request(self, msg):
396 self.command_queue.put(msg)
398 self.command_queue.put(msg)
397 self.add_io_state(POLLOUT)
399 self.add_io_state(POLLOUT)
398
400
399
401
400 class SubSocketChannel(ZMQSocketChannel):
402 class SubSocketChannel(ZMQSocketChannel):
401 """The SUB channel which listens for messages that the kernel publishes.
403 """The SUB channel which listens for messages that the kernel publishes.
402 """
404 """
403
405
404 def __init__(self, context, session, address):
406 def __init__(self, context, session, address):
405 super(SubSocketChannel, self).__init__(context, session, address)
407 super(SubSocketChannel, self).__init__(context, session, address)
406 self.ioloop = ioloop.IOLoop()
408 self.ioloop = ioloop.IOLoop()
407
409
408 def run(self):
410 def run(self):
409 """The thread's main activity. Call start() instead."""
411 """The thread's main activity. Call start() instead."""
410 self.socket = self.context.socket(zmq.SUB)
412 self.socket = self.context.socket(zmq.SUB)
411 self.socket.setsockopt(zmq.SUBSCRIBE,b'')
413 self.socket.setsockopt(zmq.SUBSCRIBE,b'')
412 self.socket.setsockopt(zmq.IDENTITY, self.session.bsession)
414 self.socket.setsockopt(zmq.IDENTITY, self.session.bsession)
413 self.socket.connect('tcp://%s:%i' % self.address)
415 self.socket.connect('tcp://%s:%i' % self.address)
414 self.iostate = POLLIN|POLLERR
416 self.iostate = POLLIN|POLLERR
415 self.ioloop.add_handler(self.socket, self._handle_events,
417 self.ioloop.add_handler(self.socket, self._handle_events,
416 self.iostate)
418 self.iostate)
417 self._run_loop()
419 self._run_loop()
418
420
419 def stop(self):
421 def stop(self):
420 self.ioloop.stop()
422 self.ioloop.stop()
421 super(SubSocketChannel, self).stop()
423 super(SubSocketChannel, self).stop()
422
424
423 def call_handlers(self, msg):
425 def call_handlers(self, msg):
424 """This method is called in the ioloop thread when a message arrives.
426 """This method is called in the ioloop thread when a message arrives.
425
427
426 Subclasses should override this method to handle incoming messages.
428 Subclasses should override this method to handle incoming messages.
427 It is important to remember that this method is called in the thread
429 It is important to remember that this method is called in the thread
428 so that some logic must be done to ensure that the application leve
430 so that some logic must be done to ensure that the application leve
429 handlers are called in the application thread.
431 handlers are called in the application thread.
430 """
432 """
431 raise NotImplementedError('call_handlers must be defined in a subclass.')
433 raise NotImplementedError('call_handlers must be defined in a subclass.')
432
434
433 def flush(self, timeout=1.0):
435 def flush(self, timeout=1.0):
434 """Immediately processes all pending messages on the SUB channel.
436 """Immediately processes all pending messages on the SUB channel.
435
437
436 Callers should use this method to ensure that :method:`call_handlers`
438 Callers should use this method to ensure that :method:`call_handlers`
437 has been called for all messages that have been received on the
439 has been called for all messages that have been received on the
438 0MQ SUB socket of this channel.
440 0MQ SUB socket of this channel.
439
441
440 This method is thread safe.
442 This method is thread safe.
441
443
442 Parameters
444 Parameters
443 ----------
445 ----------
444 timeout : float, optional
446 timeout : float, optional
445 The maximum amount of time to spend flushing, in seconds. The
447 The maximum amount of time to spend flushing, in seconds. The
446 default is one second.
448 default is one second.
447 """
449 """
448 # We do the IOLoop callback process twice to ensure that the IOLoop
450 # We do the IOLoop callback process twice to ensure that the IOLoop
449 # gets to perform at least one full poll.
451 # gets to perform at least one full poll.
450 stop_time = time.time() + timeout
452 stop_time = time.time() + timeout
451 for i in xrange(2):
453 for i in xrange(2):
452 self._flushed = False
454 self._flushed = False
453 self.ioloop.add_callback(self._flush)
455 self.ioloop.add_callback(self._flush)
454 while not self._flushed and time.time() < stop_time:
456 while not self._flushed and time.time() < stop_time:
455 time.sleep(0.01)
457 time.sleep(0.01)
456
458
457 def _handle_events(self, socket, events):
459 def _handle_events(self, socket, events):
458 # Turn on and off POLLOUT depending on if we have made a request
460 # Turn on and off POLLOUT depending on if we have made a request
459 if events & POLLERR:
461 if events & POLLERR:
460 self._handle_err()
462 self._handle_err()
461 if events & POLLIN:
463 if events & POLLIN:
462 self._handle_recv()
464 self._handle_recv()
463
465
464 def _handle_err(self):
466 def _handle_err(self):
465 # We don't want to let this go silently, so eventually we should log.
467 # We don't want to let this go silently, so eventually we should log.
466 raise zmq.ZMQError()
468 raise zmq.ZMQError()
467
469
468 def _handle_recv(self):
470 def _handle_recv(self):
469 # Get all of the messages we can
471 # Get all of the messages we can
470 while True:
472 while True:
471 try:
473 try:
472 ident,msg = self.session.recv(self.socket)
474 ident,msg = self.session.recv(self.socket)
473 except zmq.ZMQError:
475 except zmq.ZMQError:
474 # Check the errno?
476 # Check the errno?
475 # Will this trigger POLLERR?
477 # Will this trigger POLLERR?
476 break
478 break
477 else:
479 else:
478 if msg is None:
480 if msg is None:
479 break
481 break
480 self.call_handlers(msg)
482 self.call_handlers(msg)
481
483
482 def _flush(self):
484 def _flush(self):
483 """Callback for :method:`self.flush`."""
485 """Callback for :method:`self.flush`."""
484 self._flushed = True
486 self._flushed = True
485
487
486
488
487 class StdInSocketChannel(ZMQSocketChannel):
489 class StdInSocketChannel(ZMQSocketChannel):
488 """A reply channel to handle raw_input requests that the kernel makes."""
490 """A reply channel to handle raw_input requests that the kernel makes."""
489
491
490 msg_queue = None
492 msg_queue = None
491
493
492 def __init__(self, context, session, address):
494 def __init__(self, context, session, address):
493 super(StdInSocketChannel, self).__init__(context, session, address)
495 super(StdInSocketChannel, self).__init__(context, session, address)
494 self.ioloop = ioloop.IOLoop()
496 self.ioloop = ioloop.IOLoop()
495 self.msg_queue = Queue()
497 self.msg_queue = Queue()
496
498
497 def run(self):
499 def run(self):
498 """The thread's main activity. Call start() instead."""
500 """The thread's main activity. Call start() instead."""
499 self.socket = self.context.socket(zmq.DEALER)
501 self.socket = self.context.socket(zmq.DEALER)
500 self.socket.setsockopt(zmq.IDENTITY, self.session.bsession)
502 self.socket.setsockopt(zmq.IDENTITY, self.session.bsession)
501 self.socket.connect('tcp://%s:%i' % self.address)
503 self.socket.connect('tcp://%s:%i' % self.address)
502 self.iostate = POLLERR|POLLIN
504 self.iostate = POLLERR|POLLIN
503 self.ioloop.add_handler(self.socket, self._handle_events,
505 self.ioloop.add_handler(self.socket, self._handle_events,
504 self.iostate)
506 self.iostate)
505 self._run_loop()
507 self._run_loop()
506
508
507 def stop(self):
509 def stop(self):
508 self.ioloop.stop()
510 self.ioloop.stop()
509 super(StdInSocketChannel, self).stop()
511 super(StdInSocketChannel, self).stop()
510
512
511 def call_handlers(self, msg):
513 def call_handlers(self, msg):
512 """This method is called in the ioloop thread when a message arrives.
514 """This method is called in the ioloop thread when a message arrives.
513
515
514 Subclasses should override this method to handle incoming messages.
516 Subclasses should override this method to handle incoming messages.
515 It is important to remember that this method is called in the thread
517 It is important to remember that this method is called in the thread
516 so that some logic must be done to ensure that the application leve
518 so that some logic must be done to ensure that the application leve
517 handlers are called in the application thread.
519 handlers are called in the application thread.
518 """
520 """
519 raise NotImplementedError('call_handlers must be defined in a subclass.')
521 raise NotImplementedError('call_handlers must be defined in a subclass.')
520
522
521 def input(self, string):
523 def input(self, string):
522 """Send a string of raw input to the kernel."""
524 """Send a string of raw input to the kernel."""
523 content = dict(value=string)
525 content = dict(value=string)
524 msg = self.session.msg('input_reply', content)
526 msg = self.session.msg('input_reply', content)
525 self._queue_reply(msg)
527 self._queue_reply(msg)
526
528
527 def _handle_events(self, socket, events):
529 def _handle_events(self, socket, events):
528 if events & POLLERR:
530 if events & POLLERR:
529 self._handle_err()
531 self._handle_err()
530 if events & POLLOUT:
532 if events & POLLOUT:
531 self._handle_send()
533 self._handle_send()
532 if events & POLLIN:
534 if events & POLLIN:
533 self._handle_recv()
535 self._handle_recv()
534
536
535 def _handle_recv(self):
537 def _handle_recv(self):
536 ident,msg = self.session.recv(self.socket, 0)
538 ident,msg = self.session.recv(self.socket, 0)
537 self.call_handlers(msg)
539 self.call_handlers(msg)
538
540
539 def _handle_send(self):
541 def _handle_send(self):
540 try:
542 try:
541 msg = self.msg_queue.get(False)
543 msg = self.msg_queue.get(False)
542 except Empty:
544 except Empty:
543 pass
545 pass
544 else:
546 else:
545 self.session.send(self.socket,msg)
547 self.session.send(self.socket,msg)
546 if self.msg_queue.empty():
548 if self.msg_queue.empty():
547 self.drop_io_state(POLLOUT)
549 self.drop_io_state(POLLOUT)
548
550
549 def _handle_err(self):
551 def _handle_err(self):
550 # We don't want to let this go silently, so eventually we should log.
552 # We don't want to let this go silently, so eventually we should log.
551 raise zmq.ZMQError()
553 raise zmq.ZMQError()
552
554
553 def _queue_reply(self, msg):
555 def _queue_reply(self, msg):
554 self.msg_queue.put(msg)
556 self.msg_queue.put(msg)
555 self.add_io_state(POLLOUT)
557 self.add_io_state(POLLOUT)
556
558
557
559
558 class HBSocketChannel(ZMQSocketChannel):
560 class HBSocketChannel(ZMQSocketChannel):
559 """The heartbeat channel which monitors the kernel heartbeat.
561 """The heartbeat channel which monitors the kernel heartbeat.
560
562
561 Note that the heartbeat channel is paused by default. As long as you start
563 Note that the heartbeat channel is paused by default. As long as you start
562 this channel, the kernel manager will ensure that it is paused and un-paused
564 this channel, the kernel manager will ensure that it is paused and un-paused
563 as appropriate.
565 as appropriate.
564 """
566 """
565
567
566 time_to_dead = 3.0
568 time_to_dead = 3.0
567 socket = None
569 socket = None
568 poller = None
570 poller = None
569 _running = None
571 _running = None
570 _pause = None
572 _pause = None
571
573
572 def __init__(self, context, session, address):
574 def __init__(self, context, session, address):
573 super(HBSocketChannel, self).__init__(context, session, address)
575 super(HBSocketChannel, self).__init__(context, session, address)
574 self._running = False
576 self._running = False
575 self._pause = True
577 self._pause = True
576
578
577 def _create_socket(self):
579 def _create_socket(self):
578 self.socket = self.context.socket(zmq.REQ)
580 self.socket = self.context.socket(zmq.REQ)
579 self.socket.setsockopt(zmq.IDENTITY, self.session.bsession)
581 self.socket.setsockopt(zmq.IDENTITY, self.session.bsession)
580 self.socket.connect('tcp://%s:%i' % self.address)
582 self.socket.connect('tcp://%s:%i' % self.address)
581 self.poller = zmq.Poller()
583 self.poller = zmq.Poller()
582 self.poller.register(self.socket, zmq.POLLIN)
584 self.poller.register(self.socket, zmq.POLLIN)
583
585
584 def run(self):
586 def run(self):
585 """The thread's main activity. Call start() instead."""
587 """The thread's main activity. Call start() instead."""
586 self._create_socket()
588 self._create_socket()
587 self._running = True
589 self._running = True
588 while self._running:
590 while self._running:
589 if self._pause:
591 if self._pause:
590 time.sleep(self.time_to_dead)
592 time.sleep(self.time_to_dead)
591 else:
593 else:
592 since_last_heartbeat = 0.0
594 since_last_heartbeat = 0.0
593 request_time = time.time()
595 request_time = time.time()
594 try:
596 try:
595 #io.rprint('Ping from HB channel') # dbg
597 #io.rprint('Ping from HB channel') # dbg
596 self.socket.send(b'ping')
598 self.socket.send(b'ping')
597 except zmq.ZMQError, e:
599 except zmq.ZMQError, e:
598 #io.rprint('*** HB Error:', e) # dbg
600 #io.rprint('*** HB Error:', e) # dbg
599 if e.errno == zmq.EFSM:
601 if e.errno == zmq.EFSM:
600 #io.rprint('sleep...', self.time_to_dead) # dbg
602 #io.rprint('sleep...', self.time_to_dead) # dbg
601 time.sleep(self.time_to_dead)
603 time.sleep(self.time_to_dead)
602 self._create_socket()
604 self._create_socket()
603 else:
605 else:
604 raise
606 raise
605 else:
607 else:
606 while True:
608 while True:
607 try:
609 try:
608 self.socket.recv(zmq.NOBLOCK)
610 self.socket.recv(zmq.NOBLOCK)
609 except zmq.ZMQError, e:
611 except zmq.ZMQError, e:
610 #io.rprint('*** HB Error 2:', e) # dbg
612 #io.rprint('*** HB Error 2:', e) # dbg
611 if e.errno == zmq.EAGAIN:
613 if e.errno == zmq.EAGAIN:
612 before_poll = time.time()
614 before_poll = time.time()
613 until_dead = self.time_to_dead - (before_poll -
615 until_dead = self.time_to_dead - (before_poll -
614 request_time)
616 request_time)
615
617
616 # When the return value of poll() is an empty
618 # When the return value of poll() is an empty
617 # list, that is when things have gone wrong
619 # list, that is when things have gone wrong
618 # (zeromq bug). As long as it is not an empty
620 # (zeromq bug). As long as it is not an empty
619 # list, poll is working correctly even if it
621 # list, poll is working correctly even if it
620 # returns quickly. Note: poll timeout is in
622 # returns quickly. Note: poll timeout is in
621 # milliseconds.
623 # milliseconds.
622 if until_dead > 0.0:
624 if until_dead > 0.0:
623 while True:
625 while True:
624 try:
626 try:
625 self.poller.poll(1000 * until_dead)
627 self.poller.poll(1000 * until_dead)
626 except zmq.ZMQError as e:
628 except zmq.ZMQError as e:
627 if e.errno == errno.EINTR:
629 if e.errno == errno.EINTR:
628 continue
630 continue
629 else:
631 else:
630 raise
632 raise
631 else:
633 else:
632 break
634 break
633
635
634 since_last_heartbeat = time.time()-request_time
636 since_last_heartbeat = time.time()-request_time
635 if since_last_heartbeat > self.time_to_dead:
637 if since_last_heartbeat > self.time_to_dead:
636 self.call_handlers(since_last_heartbeat)
638 self.call_handlers(since_last_heartbeat)
637 break
639 break
638 else:
640 else:
639 # FIXME: We should probably log this instead.
641 # FIXME: We should probably log this instead.
640 raise
642 raise
641 else:
643 else:
642 until_dead = self.time_to_dead - (time.time() -
644 until_dead = self.time_to_dead - (time.time() -
643 request_time)
645 request_time)
644 if until_dead > 0.0:
646 if until_dead > 0.0:
645 #io.rprint('sleep...', self.time_to_dead) # dbg
647 #io.rprint('sleep...', self.time_to_dead) # dbg
646 time.sleep(until_dead)
648 time.sleep(until_dead)
647 break
649 break
648
650
649 def pause(self):
651 def pause(self):
650 """Pause the heartbeat."""
652 """Pause the heartbeat."""
651 self._pause = True
653 self._pause = True
652
654
653 def unpause(self):
655 def unpause(self):
654 """Unpause the heartbeat."""
656 """Unpause the heartbeat."""
655 self._pause = False
657 self._pause = False
656
658
657 def is_beating(self):
659 def is_beating(self):
658 """Is the heartbeat running and not paused."""
660 """Is the heartbeat running and not paused."""
659 if self.is_alive() and not self._pause:
661 if self.is_alive() and not self._pause:
660 return True
662 return True
661 else:
663 else:
662 return False
664 return False
663
665
664 def stop(self):
666 def stop(self):
665 self._running = False
667 self._running = False
666 super(HBSocketChannel, self).stop()
668 super(HBSocketChannel, self).stop()
667
669
668 def call_handlers(self, since_last_heartbeat):
670 def call_handlers(self, since_last_heartbeat):
669 """This method is called in the ioloop thread when a message arrives.
671 """This method is called in the ioloop thread when a message arrives.
670
672
671 Subclasses should override this method to handle incoming messages.
673 Subclasses should override this method to handle incoming messages.
672 It is important to remember that this method is called in the thread
674 It is important to remember that this method is called in the thread
673 so that some logic must be done to ensure that the application leve
675 so that some logic must be done to ensure that the application leve
674 handlers are called in the application thread.
676 handlers are called in the application thread.
675 """
677 """
676 raise NotImplementedError('call_handlers must be defined in a subclass.')
678 raise NotImplementedError('call_handlers must be defined in a subclass.')
677
679
678
680
679 #-----------------------------------------------------------------------------
681 #-----------------------------------------------------------------------------
680 # Main kernel manager class
682 # Main kernel manager class
681 #-----------------------------------------------------------------------------
683 #-----------------------------------------------------------------------------
682
684
683 class KernelManager(HasTraits):
685 class KernelManager(HasTraits):
684 """ Manages a kernel for a frontend.
686 """ Manages a kernel for a frontend.
685
687
686 The SUB channel is for the frontend to receive messages published by the
688 The SUB channel is for the frontend to receive messages published by the
687 kernel.
689 kernel.
688
690
689 The REQ channel is for the frontend to make requests of the kernel.
691 The REQ channel is for the frontend to make requests of the kernel.
690
692
691 The REP channel is for the kernel to request stdin (raw_input) from the
693 The REP channel is for the kernel to request stdin (raw_input) from the
692 frontend.
694 frontend.
693 """
695 """
694 # config object for passing to child configurables
696 # config object for passing to child configurables
695 config = Instance(Config)
697 config = Instance(Config)
696
698
697 # The PyZMQ Context to use for communication with the kernel.
699 # The PyZMQ Context to use for communication with the kernel.
698 context = Instance(zmq.Context)
700 context = Instance(zmq.Context)
699 def _context_default(self):
701 def _context_default(self):
700 return zmq.Context.instance()
702 return zmq.Context.instance()
701
703
702 # The Session to use for communication with the kernel.
704 # The Session to use for communication with the kernel.
703 session = Instance(Session)
705 session = Instance(Session)
704
706
705 # The kernel process with which the KernelManager is communicating.
707 # The kernel process with which the KernelManager is communicating.
706 kernel = Instance(Popen)
708 kernel = Instance(Popen)
707
709
708 # The addresses for the communication channels.
710 # The addresses for the communication channels.
709 connection_file = Unicode('')
711 connection_file = Unicode('')
710 ip = Unicode(LOCALHOST)
712 ip = Unicode(LOCALHOST)
711 shell_port = Int(0)
713 shell_port = Int(0)
712 iopub_port = Int(0)
714 iopub_port = Int(0)
713 stdin_port = Int(0)
715 stdin_port = Int(0)
714 hb_port = Int(0)
716 hb_port = Int(0)
715
717
716 # The classes to use for the various channels.
718 # The classes to use for the various channels.
717 shell_channel_class = Type(ShellSocketChannel)
719 shell_channel_class = Type(ShellSocketChannel)
718 sub_channel_class = Type(SubSocketChannel)
720 sub_channel_class = Type(SubSocketChannel)
719 stdin_channel_class = Type(StdInSocketChannel)
721 stdin_channel_class = Type(StdInSocketChannel)
720 hb_channel_class = Type(HBSocketChannel)
722 hb_channel_class = Type(HBSocketChannel)
721
723
722 # Protected traits.
724 # Protected traits.
723 _launch_args = Any
725 _launch_args = Any
724 _shell_channel = Any
726 _shell_channel = Any
725 _sub_channel = Any
727 _sub_channel = Any
726 _stdin_channel = Any
728 _stdin_channel = Any
727 _hb_channel = Any
729 _hb_channel = Any
728 _connection_file_written=Bool(False)
730 _connection_file_written=Bool(False)
729
731
730 def __init__(self, **kwargs):
732 def __init__(self, **kwargs):
731 super(KernelManager, self).__init__(**kwargs)
733 super(KernelManager, self).__init__(**kwargs)
732 if self.session is None:
734 if self.session is None:
733 self.session = Session(config=self.config)
735 self.session = Session(config=self.config)
734
736
735 def __del__(self):
737 def __del__(self):
736 if self._connection_file_written:
738 if self._connection_file_written:
737 # cleanup connection files on full shutdown of kernel we started
739 # cleanup connection files on full shutdown of kernel we started
738 self._connection_file_written = False
740 self._connection_file_written = False
739 try:
741 try:
740 os.remove(self.connection_file)
742 os.remove(self.connection_file)
741 except IOError:
743 except IOError:
742 pass
744 pass
743
745
744
746
745 #--------------------------------------------------------------------------
747 #--------------------------------------------------------------------------
746 # Channel management methods:
748 # Channel management methods:
747 #--------------------------------------------------------------------------
749 #--------------------------------------------------------------------------
748
750
749 def start_channels(self, shell=True, sub=True, stdin=True, hb=True):
751 def start_channels(self, shell=True, sub=True, stdin=True, hb=True):
750 """Starts the channels for this kernel.
752 """Starts the channels for this kernel.
751
753
752 This will create the channels if they do not exist and then start
754 This will create the channels if they do not exist and then start
753 them. If port numbers of 0 are being used (random ports) then you
755 them. If port numbers of 0 are being used (random ports) then you
754 must first call :method:`start_kernel`. If the channels have been
756 must first call :method:`start_kernel`. If the channels have been
755 stopped and you call this, :class:`RuntimeError` will be raised.
757 stopped and you call this, :class:`RuntimeError` will be raised.
756 """
758 """
757 if shell:
759 if shell:
758 self.shell_channel.start()
760 self.shell_channel.start()
759 if sub:
761 if sub:
760 self.sub_channel.start()
762 self.sub_channel.start()
761 if stdin:
763 if stdin:
762 self.stdin_channel.start()
764 self.stdin_channel.start()
763 self.shell_channel.allow_stdin = True
765 self.shell_channel.allow_stdin = True
764 else:
766 else:
765 self.shell_channel.allow_stdin = False
767 self.shell_channel.allow_stdin = False
766 if hb:
768 if hb:
767 self.hb_channel.start()
769 self.hb_channel.start()
768
770
769 def stop_channels(self):
771 def stop_channels(self):
770 """Stops all the running channels for this kernel.
772 """Stops all the running channels for this kernel.
771 """
773 """
772 if self.shell_channel.is_alive():
774 if self.shell_channel.is_alive():
773 self.shell_channel.stop()
775 self.shell_channel.stop()
774 if self.sub_channel.is_alive():
776 if self.sub_channel.is_alive():
775 self.sub_channel.stop()
777 self.sub_channel.stop()
776 if self.stdin_channel.is_alive():
778 if self.stdin_channel.is_alive():
777 self.stdin_channel.stop()
779 self.stdin_channel.stop()
778 if self.hb_channel.is_alive():
780 if self.hb_channel.is_alive():
779 self.hb_channel.stop()
781 self.hb_channel.stop()
780
782
781 @property
783 @property
782 def channels_running(self):
784 def channels_running(self):
783 """Are any of the channels created and running?"""
785 """Are any of the channels created and running?"""
784 return (self.shell_channel.is_alive() or self.sub_channel.is_alive() or
786 return (self.shell_channel.is_alive() or self.sub_channel.is_alive() or
785 self.stdin_channel.is_alive() or self.hb_channel.is_alive())
787 self.stdin_channel.is_alive() or self.hb_channel.is_alive())
786
788
787 #--------------------------------------------------------------------------
789 #--------------------------------------------------------------------------
788 # Kernel process management methods:
790 # Kernel process management methods:
789 #--------------------------------------------------------------------------
791 #--------------------------------------------------------------------------
790
792
793 def load_connection_file(self):
794 """load connection info from JSON dict in self.connection_file"""
795 with open(self.connection_file) as f:
796 cfg = json.loads(f.read())
797
798 self.ip = cfg['ip']
799 self.shell_port = cfg['shell_port']
800 self.stdin_port = cfg['stdin_port']
801 self.iopub_port = cfg['iopub_port']
802 self.hb_port = cfg['hb_port']
803 self.session.key = str_to_bytes(cfg['key'])
804
791 def write_connection_file(self):
805 def write_connection_file(self):
806 """write connection info to JSON dict in self.connection_file"""
792 if self._connection_file_written:
807 if self._connection_file_written:
793 return
808 return
794 self.connection_file,cfg = write_connection_file(self.connection_file,
809 self.connection_file,cfg = write_connection_file(self.connection_file,
795 ip=self.ip, key=self.session.key,
810 ip=self.ip, key=self.session.key,
796 stdin_port=self.stdin_port, iopub_port=self.iopub_port,
811 stdin_port=self.stdin_port, iopub_port=self.iopub_port,
797 shell_port=self.shell_port, hb_port=self.hb_port)
812 shell_port=self.shell_port, hb_port=self.hb_port)
798 # write_connection_file also sets default ports:
813 # write_connection_file also sets default ports:
799 self.shell_port = cfg['shell_port']
814 self.shell_port = cfg['shell_port']
800 self.stdin_port = cfg['stdin_port']
815 self.stdin_port = cfg['stdin_port']
801 self.iopub_port = cfg['iopub_port']
816 self.iopub_port = cfg['iopub_port']
802 self.hb_port = cfg['hb_port']
817 self.hb_port = cfg['hb_port']
803
818
804 self._connection_file_written = True
819 self._connection_file_written = True
805
820
806 def start_kernel(self, **kw):
821 def start_kernel(self, **kw):
807 """Starts a kernel process and configures the manager to use it.
822 """Starts a kernel process and configures the manager to use it.
808
823
809 If random ports (port=0) are being used, this method must be called
824 If random ports (port=0) are being used, this method must be called
810 before the channels are created.
825 before the channels are created.
811
826
812 Parameters:
827 Parameters:
813 -----------
828 -----------
814 ipython : bool, optional (default True)
829 ipython : bool, optional (default True)
815 Whether to use an IPython kernel instead of a plain Python kernel.
830 Whether to use an IPython kernel instead of a plain Python kernel.
816
831
817 launcher : callable, optional (default None)
832 launcher : callable, optional (default None)
818 A custom function for launching the kernel process (generally a
833 A custom function for launching the kernel process (generally a
819 wrapper around ``entry_point.base_launch_kernel``). In most cases,
834 wrapper around ``entry_point.base_launch_kernel``). In most cases,
820 it should not be necessary to use this parameter.
835 it should not be necessary to use this parameter.
821
836
822 **kw : optional
837 **kw : optional
823 See respective options for IPython and Python kernels.
838 See respective options for IPython and Python kernels.
824 """
839 """
825 if self.ip not in LOCAL_IPS:
840 if self.ip not in LOCAL_IPS:
826 raise RuntimeError("Can only launch a kernel on a local interface. "
841 raise RuntimeError("Can only launch a kernel on a local interface. "
827 "Make sure that the '*_address' attributes are "
842 "Make sure that the '*_address' attributes are "
828 "configured properly. "
843 "configured properly. "
829 "Currently valid addresses are: %s"%LOCAL_IPS
844 "Currently valid addresses are: %s"%LOCAL_IPS
830 )
845 )
831
846
832 # write connection file / get default ports
847 # write connection file / get default ports
833 self.write_connection_file()
848 self.write_connection_file()
834
849
835 self._launch_args = kw.copy()
850 self._launch_args = kw.copy()
836 launch_kernel = kw.pop('launcher', None)
851 launch_kernel = kw.pop('launcher', None)
837 if launch_kernel is None:
852 if launch_kernel is None:
838 if kw.pop('ipython', True):
853 if kw.pop('ipython', True):
839 from ipkernel import launch_kernel
854 from ipkernel import launch_kernel
840 else:
855 else:
841 from pykernel import launch_kernel
856 from pykernel import launch_kernel
842 self.kernel = launch_kernel(fname=self.connection_file, **kw)
857 self.kernel = launch_kernel(fname=self.connection_file, **kw)
843
858
844 def shutdown_kernel(self, restart=False):
859 def shutdown_kernel(self, restart=False):
845 """ Attempts to the stop the kernel process cleanly. If the kernel
860 """ Attempts to the stop the kernel process cleanly. If the kernel
846 cannot be stopped, it is killed, if possible.
861 cannot be stopped, it is killed, if possible.
847 """
862 """
848 # FIXME: Shutdown does not work on Windows due to ZMQ errors!
863 # FIXME: Shutdown does not work on Windows due to ZMQ errors!
849 if sys.platform == 'win32':
864 if sys.platform == 'win32':
850 self.kill_kernel()
865 self.kill_kernel()
851 return
866 return
852
867
853 # Pause the heart beat channel if it exists.
868 # Pause the heart beat channel if it exists.
854 if self._hb_channel is not None:
869 if self._hb_channel is not None:
855 self._hb_channel.pause()
870 self._hb_channel.pause()
856
871
857 # Don't send any additional kernel kill messages immediately, to give
872 # Don't send any additional kernel kill messages immediately, to give
858 # the kernel a chance to properly execute shutdown actions. Wait for at
873 # the kernel a chance to properly execute shutdown actions. Wait for at
859 # most 1s, checking every 0.1s.
874 # most 1s, checking every 0.1s.
860 self.shell_channel.shutdown(restart=restart)
875 self.shell_channel.shutdown(restart=restart)
861 for i in range(10):
876 for i in range(10):
862 if self.is_alive:
877 if self.is_alive:
863 time.sleep(0.1)
878 time.sleep(0.1)
864 else:
879 else:
865 break
880 break
866 else:
881 else:
867 # OK, we've waited long enough.
882 # OK, we've waited long enough.
868 if self.has_kernel:
883 if self.has_kernel:
869 self.kill_kernel()
884 self.kill_kernel()
870
885
871 if not restart and self._connection_file_written:
886 if not restart and self._connection_file_written:
872 # cleanup connection files on full shutdown of kernel we started
887 # cleanup connection files on full shutdown of kernel we started
873 self._connection_file_written = False
888 self._connection_file_written = False
874 try:
889 try:
875 os.remove(self.connection_file)
890 os.remove(self.connection_file)
876 except IOError:
891 except IOError:
877 pass
892 pass
878
893
879 def restart_kernel(self, now=False, **kw):
894 def restart_kernel(self, now=False, **kw):
880 """Restarts a kernel with the arguments that were used to launch it.
895 """Restarts a kernel with the arguments that were used to launch it.
881
896
882 If the old kernel was launched with random ports, the same ports will be
897 If the old kernel was launched with random ports, the same ports will be
883 used for the new kernel.
898 used for the new kernel.
884
899
885 Parameters
900 Parameters
886 ----------
901 ----------
887 now : bool, optional
902 now : bool, optional
888 If True, the kernel is forcefully restarted *immediately*, without
903 If True, the kernel is forcefully restarted *immediately*, without
889 having a chance to do any cleanup action. Otherwise the kernel is
904 having a chance to do any cleanup action. Otherwise the kernel is
890 given 1s to clean up before a forceful restart is issued.
905 given 1s to clean up before a forceful restart is issued.
891
906
892 In all cases the kernel is restarted, the only difference is whether
907 In all cases the kernel is restarted, the only difference is whether
893 it is given a chance to perform a clean shutdown or not.
908 it is given a chance to perform a clean shutdown or not.
894
909
895 **kw : optional
910 **kw : optional
896 Any options specified here will replace those used to launch the
911 Any options specified here will replace those used to launch the
897 kernel.
912 kernel.
898 """
913 """
899 if self._launch_args is None:
914 if self._launch_args is None:
900 raise RuntimeError("Cannot restart the kernel. "
915 raise RuntimeError("Cannot restart the kernel. "
901 "No previous call to 'start_kernel'.")
916 "No previous call to 'start_kernel'.")
902 else:
917 else:
903 # Stop currently running kernel.
918 # Stop currently running kernel.
904 if self.has_kernel:
919 if self.has_kernel:
905 if now:
920 if now:
906 self.kill_kernel()
921 self.kill_kernel()
907 else:
922 else:
908 self.shutdown_kernel(restart=True)
923 self.shutdown_kernel(restart=True)
909
924
910 # Start new kernel.
925 # Start new kernel.
911 self._launch_args.update(kw)
926 self._launch_args.update(kw)
912 self.start_kernel(**self._launch_args)
927 self.start_kernel(**self._launch_args)
913
928
914 # FIXME: Messages get dropped in Windows due to probable ZMQ bug
929 # FIXME: Messages get dropped in Windows due to probable ZMQ bug
915 # unless there is some delay here.
930 # unless there is some delay here.
916 if sys.platform == 'win32':
931 if sys.platform == 'win32':
917 time.sleep(0.2)
932 time.sleep(0.2)
918
933
919 @property
934 @property
920 def has_kernel(self):
935 def has_kernel(self):
921 """Returns whether a kernel process has been specified for the kernel
936 """Returns whether a kernel process has been specified for the kernel
922 manager.
937 manager.
923 """
938 """
924 return self.kernel is not None
939 return self.kernel is not None
925
940
926 def kill_kernel(self):
941 def kill_kernel(self):
927 """ Kill the running kernel. """
942 """ Kill the running kernel. """
928 if self.has_kernel:
943 if self.has_kernel:
929 # Pause the heart beat channel if it exists.
944 # Pause the heart beat channel if it exists.
930 if self._hb_channel is not None:
945 if self._hb_channel is not None:
931 self._hb_channel.pause()
946 self._hb_channel.pause()
932
947
933 # Attempt to kill the kernel.
948 # Attempt to kill the kernel.
934 try:
949 try:
935 self.kernel.kill()
950 self.kernel.kill()
936 except OSError, e:
951 except OSError, e:
937 # In Windows, we will get an Access Denied error if the process
952 # In Windows, we will get an Access Denied error if the process
938 # has already terminated. Ignore it.
953 # has already terminated. Ignore it.
939 if sys.platform == 'win32':
954 if sys.platform == 'win32':
940 if e.winerror != 5:
955 if e.winerror != 5:
941 raise
956 raise
942 # On Unix, we may get an ESRCH error if the process has already
957 # On Unix, we may get an ESRCH error if the process has already
943 # terminated. Ignore it.
958 # terminated. Ignore it.
944 else:
959 else:
945 from errno import ESRCH
960 from errno import ESRCH
946 if e.errno != ESRCH:
961 if e.errno != ESRCH:
947 raise
962 raise
948 self.kernel = None
963 self.kernel = None
949 else:
964 else:
950 raise RuntimeError("Cannot kill kernel. No kernel is running!")
965 raise RuntimeError("Cannot kill kernel. No kernel is running!")
951
966
952 def interrupt_kernel(self):
967 def interrupt_kernel(self):
953 """ Interrupts the kernel. Unlike ``signal_kernel``, this operation is
968 """ Interrupts the kernel. Unlike ``signal_kernel``, this operation is
954 well supported on all platforms.
969 well supported on all platforms.
955 """
970 """
956 if self.has_kernel:
971 if self.has_kernel:
957 if sys.platform == 'win32':
972 if sys.platform == 'win32':
958 from parentpoller import ParentPollerWindows as Poller
973 from parentpoller import ParentPollerWindows as Poller
959 Poller.send_interrupt(self.kernel.win32_interrupt_event)
974 Poller.send_interrupt(self.kernel.win32_interrupt_event)
960 else:
975 else:
961 self.kernel.send_signal(signal.SIGINT)
976 self.kernel.send_signal(signal.SIGINT)
962 else:
977 else:
963 raise RuntimeError("Cannot interrupt kernel. No kernel is running!")
978 raise RuntimeError("Cannot interrupt kernel. No kernel is running!")
964
979
965 def signal_kernel(self, signum):
980 def signal_kernel(self, signum):
966 """ Sends a signal to the kernel. Note that since only SIGTERM is
981 """ Sends a signal to the kernel. Note that since only SIGTERM is
967 supported on Windows, this function is only useful on Unix systems.
982 supported on Windows, this function is only useful on Unix systems.
968 """
983 """
969 if self.has_kernel:
984 if self.has_kernel:
970 self.kernel.send_signal(signum)
985 self.kernel.send_signal(signum)
971 else:
986 else:
972 raise RuntimeError("Cannot signal kernel. No kernel is running!")
987 raise RuntimeError("Cannot signal kernel. No kernel is running!")
973
988
974 @property
989 @property
975 def is_alive(self):
990 def is_alive(self):
976 """Is the kernel process still running?"""
991 """Is the kernel process still running?"""
977 # FIXME: not using a heartbeat means this method is broken for any
992 # FIXME: not using a heartbeat means this method is broken for any
978 # remote kernel, it's only capable of handling local kernels.
993 # remote kernel, it's only capable of handling local kernels.
979 if self.has_kernel:
994 if self.has_kernel:
980 if self.kernel.poll() is None:
995 if self.kernel.poll() is None:
981 return True
996 return True
982 else:
997 else:
983 return False
998 return False
984 else:
999 else:
985 # We didn't start the kernel with this KernelManager so we don't
1000 # We didn't start the kernel with this KernelManager so we don't
986 # know if it is running. We should use a heartbeat for this case.
1001 # know if it is running. We should use a heartbeat for this case.
987 return True
1002 return True
988
1003
989 #--------------------------------------------------------------------------
1004 #--------------------------------------------------------------------------
990 # Channels used for communication with the kernel:
1005 # Channels used for communication with the kernel:
991 #--------------------------------------------------------------------------
1006 #--------------------------------------------------------------------------
992
1007
993 @property
1008 @property
994 def shell_channel(self):
1009 def shell_channel(self):
995 """Get the REQ socket channel object to make requests of the kernel."""
1010 """Get the REQ socket channel object to make requests of the kernel."""
996 if self._shell_channel is None:
1011 if self._shell_channel is None:
997 self._shell_channel = self.shell_channel_class(self.context,
1012 self._shell_channel = self.shell_channel_class(self.context,
998 self.session,
1013 self.session,
999 (self.ip, self.shell_port))
1014 (self.ip, self.shell_port))
1000 return self._shell_channel
1015 return self._shell_channel
1001
1016
1002 @property
1017 @property
1003 def sub_channel(self):
1018 def sub_channel(self):
1004 """Get the SUB socket channel object."""
1019 """Get the SUB socket channel object."""
1005 if self._sub_channel is None:
1020 if self._sub_channel is None:
1006 self._sub_channel = self.sub_channel_class(self.context,
1021 self._sub_channel = self.sub_channel_class(self.context,
1007 self.session,
1022 self.session,
1008 (self.ip, self.iopub_port))
1023 (self.ip, self.iopub_port))
1009 return self._sub_channel
1024 return self._sub_channel
1010
1025
1011 @property
1026 @property
1012 def stdin_channel(self):
1027 def stdin_channel(self):
1013 """Get the REP socket channel object to handle stdin (raw_input)."""
1028 """Get the REP socket channel object to handle stdin (raw_input)."""
1014 if self._stdin_channel is None:
1029 if self._stdin_channel is None:
1015 self._stdin_channel = self.stdin_channel_class(self.context,
1030 self._stdin_channel = self.stdin_channel_class(self.context,
1016 self.session,
1031 self.session,
1017 (self.ip, self.stdin_port))
1032 (self.ip, self.stdin_port))
1018 return self._stdin_channel
1033 return self._stdin_channel
1019
1034
1020 @property
1035 @property
1021 def hb_channel(self):
1036 def hb_channel(self):
1022 """Get the heartbeat socket channel object to check that the
1037 """Get the heartbeat socket channel object to check that the
1023 kernel is alive."""
1038 kernel is alive."""
1024 if self._hb_channel is None:
1039 if self._hb_channel is None:
1025 self._hb_channel = self.hb_channel_class(self.context,
1040 self._hb_channel = self.hb_channel_class(self.context,
1026 self.session,
1041 self.session,
1027 (self.ip, self.hb_port))
1042 (self.ip, self.hb_port))
1028 return self._hb_channel
1043 return self._hb_channel
General Comments 0
You need to be logged in to leave comments. Login now