##// END OF EJS Templates
use stdlib json to load connection files in zmq.kernelmanager...
MinRK -
Show More
@@ -1,1043 +1,1043
1 """Base classes to manage the interaction with a running kernel.
1 """Base classes to manage the interaction with a running kernel.
2
2
3 TODO
3 TODO
4 * Create logger to handle debugging and console messages.
4 * Create logger to handle debugging and console messages.
5 """
5 """
6
6
7 #-----------------------------------------------------------------------------
7 #-----------------------------------------------------------------------------
8 # Copyright (C) 2008-2010 The IPython Development Team
8 # Copyright (C) 2008-2010 The IPython Development Team
9 #
9 #
10 # Distributed under the terms of the BSD License. The full license is in
10 # Distributed under the terms of the BSD License. The full license is in
11 # the file COPYING, distributed as part of this software.
11 # the file COPYING, distributed as part of this software.
12 #-----------------------------------------------------------------------------
12 #-----------------------------------------------------------------------------
13
13
14 #-----------------------------------------------------------------------------
14 #-----------------------------------------------------------------------------
15 # Imports
15 # Imports
16 #-----------------------------------------------------------------------------
16 #-----------------------------------------------------------------------------
17
17
18 # Standard library imports.
18 # Standard library imports.
19 import errno
19 import errno
20 import json
20 from Queue import Queue, Empty
21 from Queue import Queue, Empty
21 from subprocess import Popen
22 from subprocess import Popen
22 import os
23 import os
23 import signal
24 import signal
24 import sys
25 import sys
25 from threading import Thread
26 from threading import Thread
26 import time
27 import time
27
28
28 # System library imports.
29 # System library imports.
29 import zmq
30 import zmq
30 from zmq import POLLIN, POLLOUT, POLLERR
31 from zmq import POLLIN, POLLOUT, POLLERR
31 from zmq.eventloop import ioloop
32 from zmq.eventloop import ioloop
32 from zmq.utils import jsonapi as json
33
33
34 # Local imports.
34 # Local imports.
35 from IPython.config.loader import Config
35 from IPython.config.loader import Config
36 from IPython.utils.localinterfaces import LOCALHOST, LOCAL_IPS
36 from IPython.utils.localinterfaces import LOCALHOST, LOCAL_IPS
37 from IPython.utils.traitlets import (
37 from IPython.utils.traitlets import (
38 HasTraits, Any, Instance, Type, Unicode, Int, Bool
38 HasTraits, Any, Instance, Type, Unicode, Int, Bool
39 )
39 )
40 from IPython.utils.py3compat import str_to_bytes
40 from IPython.utils.py3compat import str_to_bytes
41 from IPython.zmq.entry_point import write_connection_file
41 from IPython.zmq.entry_point import write_connection_file
42 from session import Session
42 from session import Session
43
43
44 #-----------------------------------------------------------------------------
44 #-----------------------------------------------------------------------------
45 # Constants and exceptions
45 # Constants and exceptions
46 #-----------------------------------------------------------------------------
46 #-----------------------------------------------------------------------------
47
47
48 class InvalidPortNumber(Exception):
48 class InvalidPortNumber(Exception):
49 pass
49 pass
50
50
51 #-----------------------------------------------------------------------------
51 #-----------------------------------------------------------------------------
52 # Utility functions
52 # Utility functions
53 #-----------------------------------------------------------------------------
53 #-----------------------------------------------------------------------------
54
54
55 # some utilities to validate message structure, these might get moved elsewhere
55 # some utilities to validate message structure, these might get moved elsewhere
56 # if they prove to have more generic utility
56 # if they prove to have more generic utility
57
57
58 def validate_string_list(lst):
58 def validate_string_list(lst):
59 """Validate that the input is a list of strings.
59 """Validate that the input is a list of strings.
60
60
61 Raises ValueError if not."""
61 Raises ValueError if not."""
62 if not isinstance(lst, list):
62 if not isinstance(lst, list):
63 raise ValueError('input %r must be a list' % lst)
63 raise ValueError('input %r must be a list' % lst)
64 for x in lst:
64 for x in lst:
65 if not isinstance(x, basestring):
65 if not isinstance(x, basestring):
66 raise ValueError('element %r in list must be a string' % x)
66 raise ValueError('element %r in list must be a string' % x)
67
67
68
68
69 def validate_string_dict(dct):
69 def validate_string_dict(dct):
70 """Validate that the input is a dict with string keys and values.
70 """Validate that the input is a dict with string keys and values.
71
71
72 Raises ValueError if not."""
72 Raises ValueError if not."""
73 for k,v in dct.iteritems():
73 for k,v in dct.iteritems():
74 if not isinstance(k, basestring):
74 if not isinstance(k, basestring):
75 raise ValueError('key %r in dict must be a string' % k)
75 raise ValueError('key %r in dict must be a string' % k)
76 if not isinstance(v, basestring):
76 if not isinstance(v, basestring):
77 raise ValueError('value %r in dict must be a string' % v)
77 raise ValueError('value %r in dict must be a string' % v)
78
78
79
79
80 #-----------------------------------------------------------------------------
80 #-----------------------------------------------------------------------------
81 # ZMQ Socket Channel classes
81 # ZMQ Socket Channel classes
82 #-----------------------------------------------------------------------------
82 #-----------------------------------------------------------------------------
83
83
84 class ZMQSocketChannel(Thread):
84 class ZMQSocketChannel(Thread):
85 """The base class for the channels that use ZMQ sockets.
85 """The base class for the channels that use ZMQ sockets.
86 """
86 """
87 context = None
87 context = None
88 session = None
88 session = None
89 socket = None
89 socket = None
90 ioloop = None
90 ioloop = None
91 iostate = None
91 iostate = None
92 _address = None
92 _address = None
93
93
94 def __init__(self, context, session, address):
94 def __init__(self, context, session, address):
95 """Create a channel
95 """Create a channel
96
96
97 Parameters
97 Parameters
98 ----------
98 ----------
99 context : :class:`zmq.Context`
99 context : :class:`zmq.Context`
100 The ZMQ context to use.
100 The ZMQ context to use.
101 session : :class:`session.Session`
101 session : :class:`session.Session`
102 The session to use.
102 The session to use.
103 address : tuple
103 address : tuple
104 Standard (ip, port) tuple that the kernel is listening on.
104 Standard (ip, port) tuple that the kernel is listening on.
105 """
105 """
106 super(ZMQSocketChannel, self).__init__()
106 super(ZMQSocketChannel, self).__init__()
107 self.daemon = True
107 self.daemon = True
108
108
109 self.context = context
109 self.context = context
110 self.session = session
110 self.session = session
111 if address[1] == 0:
111 if address[1] == 0:
112 message = 'The port number for a channel cannot be 0.'
112 message = 'The port number for a channel cannot be 0.'
113 raise InvalidPortNumber(message)
113 raise InvalidPortNumber(message)
114 self._address = address
114 self._address = address
115
115
116 def _run_loop(self):
116 def _run_loop(self):
117 """Run my loop, ignoring EINTR events in the poller"""
117 """Run my loop, ignoring EINTR events in the poller"""
118 while True:
118 while True:
119 try:
119 try:
120 self.ioloop.start()
120 self.ioloop.start()
121 except zmq.ZMQError as e:
121 except zmq.ZMQError as e:
122 if e.errno == errno.EINTR:
122 if e.errno == errno.EINTR:
123 continue
123 continue
124 else:
124 else:
125 raise
125 raise
126 else:
126 else:
127 break
127 break
128
128
129 def stop(self):
129 def stop(self):
130 """Stop the channel's activity.
130 """Stop the channel's activity.
131
131
132 This calls :method:`Thread.join` and returns when the thread
132 This calls :method:`Thread.join` and returns when the thread
133 terminates. :class:`RuntimeError` will be raised if
133 terminates. :class:`RuntimeError` will be raised if
134 :method:`self.start` is called again.
134 :method:`self.start` is called again.
135 """
135 """
136 self.join()
136 self.join()
137
137
138 @property
138 @property
139 def address(self):
139 def address(self):
140 """Get the channel's address as an (ip, port) tuple.
140 """Get the channel's address as an (ip, port) tuple.
141
141
142 By the default, the address is (localhost, 0), where 0 means a random
142 By the default, the address is (localhost, 0), where 0 means a random
143 port.
143 port.
144 """
144 """
145 return self._address
145 return self._address
146
146
147 def add_io_state(self, state):
147 def add_io_state(self, state):
148 """Add IO state to the eventloop.
148 """Add IO state to the eventloop.
149
149
150 Parameters
150 Parameters
151 ----------
151 ----------
152 state : zmq.POLLIN|zmq.POLLOUT|zmq.POLLERR
152 state : zmq.POLLIN|zmq.POLLOUT|zmq.POLLERR
153 The IO state flag to set.
153 The IO state flag to set.
154
154
155 This is thread safe as it uses the thread safe IOLoop.add_callback.
155 This is thread safe as it uses the thread safe IOLoop.add_callback.
156 """
156 """
157 def add_io_state_callback():
157 def add_io_state_callback():
158 if not self.iostate & state:
158 if not self.iostate & state:
159 self.iostate = self.iostate | state
159 self.iostate = self.iostate | state
160 self.ioloop.update_handler(self.socket, self.iostate)
160 self.ioloop.update_handler(self.socket, self.iostate)
161 self.ioloop.add_callback(add_io_state_callback)
161 self.ioloop.add_callback(add_io_state_callback)
162
162
163 def drop_io_state(self, state):
163 def drop_io_state(self, state):
164 """Drop IO state from the eventloop.
164 """Drop IO state from the eventloop.
165
165
166 Parameters
166 Parameters
167 ----------
167 ----------
168 state : zmq.POLLIN|zmq.POLLOUT|zmq.POLLERR
168 state : zmq.POLLIN|zmq.POLLOUT|zmq.POLLERR
169 The IO state flag to set.
169 The IO state flag to set.
170
170
171 This is thread safe as it uses the thread safe IOLoop.add_callback.
171 This is thread safe as it uses the thread safe IOLoop.add_callback.
172 """
172 """
173 def drop_io_state_callback():
173 def drop_io_state_callback():
174 if self.iostate & state:
174 if self.iostate & state:
175 self.iostate = self.iostate & (~state)
175 self.iostate = self.iostate & (~state)
176 self.ioloop.update_handler(self.socket, self.iostate)
176 self.ioloop.update_handler(self.socket, self.iostate)
177 self.ioloop.add_callback(drop_io_state_callback)
177 self.ioloop.add_callback(drop_io_state_callback)
178
178
179
179
180 class ShellSocketChannel(ZMQSocketChannel):
180 class ShellSocketChannel(ZMQSocketChannel):
181 """The XREQ channel for issues request/replies to the kernel.
181 """The XREQ channel for issues request/replies to the kernel.
182 """
182 """
183
183
184 command_queue = None
184 command_queue = None
185 # flag for whether execute requests should be allowed to call raw_input:
185 # flag for whether execute requests should be allowed to call raw_input:
186 allow_stdin = True
186 allow_stdin = True
187
187
188 def __init__(self, context, session, address):
188 def __init__(self, context, session, address):
189 super(ShellSocketChannel, self).__init__(context, session, address)
189 super(ShellSocketChannel, self).__init__(context, session, address)
190 self.command_queue = Queue()
190 self.command_queue = Queue()
191 self.ioloop = ioloop.IOLoop()
191 self.ioloop = ioloop.IOLoop()
192
192
193 def run(self):
193 def run(self):
194 """The thread's main activity. Call start() instead."""
194 """The thread's main activity. Call start() instead."""
195 self.socket = self.context.socket(zmq.DEALER)
195 self.socket = self.context.socket(zmq.DEALER)
196 self.socket.setsockopt(zmq.IDENTITY, self.session.bsession)
196 self.socket.setsockopt(zmq.IDENTITY, self.session.bsession)
197 self.socket.connect('tcp://%s:%i' % self.address)
197 self.socket.connect('tcp://%s:%i' % self.address)
198 self.iostate = POLLERR|POLLIN
198 self.iostate = POLLERR|POLLIN
199 self.ioloop.add_handler(self.socket, self._handle_events,
199 self.ioloop.add_handler(self.socket, self._handle_events,
200 self.iostate)
200 self.iostate)
201 self._run_loop()
201 self._run_loop()
202
202
203 def stop(self):
203 def stop(self):
204 self.ioloop.stop()
204 self.ioloop.stop()
205 super(ShellSocketChannel, self).stop()
205 super(ShellSocketChannel, self).stop()
206
206
207 def call_handlers(self, msg):
207 def call_handlers(self, msg):
208 """This method is called in the ioloop thread when a message arrives.
208 """This method is called in the ioloop thread when a message arrives.
209
209
210 Subclasses should override this method to handle incoming messages.
210 Subclasses should override this method to handle incoming messages.
211 It is important to remember that this method is called in the thread
211 It is important to remember that this method is called in the thread
212 so that some logic must be done to ensure that the application leve
212 so that some logic must be done to ensure that the application leve
213 handlers are called in the application thread.
213 handlers are called in the application thread.
214 """
214 """
215 raise NotImplementedError('call_handlers must be defined in a subclass.')
215 raise NotImplementedError('call_handlers must be defined in a subclass.')
216
216
217 def execute(self, code, silent=False,
217 def execute(self, code, silent=False,
218 user_variables=None, user_expressions=None, allow_stdin=None):
218 user_variables=None, user_expressions=None, allow_stdin=None):
219 """Execute code in the kernel.
219 """Execute code in the kernel.
220
220
221 Parameters
221 Parameters
222 ----------
222 ----------
223 code : str
223 code : str
224 A string of Python code.
224 A string of Python code.
225
225
226 silent : bool, optional (default False)
226 silent : bool, optional (default False)
227 If set, the kernel will execute the code as quietly possible.
227 If set, the kernel will execute the code as quietly possible.
228
228
229 user_variables : list, optional
229 user_variables : list, optional
230 A list of variable names to pull from the user's namespace. They
230 A list of variable names to pull from the user's namespace. They
231 will come back as a dict with these names as keys and their
231 will come back as a dict with these names as keys and their
232 :func:`repr` as values.
232 :func:`repr` as values.
233
233
234 user_expressions : dict, optional
234 user_expressions : dict, optional
235 A dict with string keys and to pull from the user's
235 A dict with string keys and to pull from the user's
236 namespace. They will come back as a dict with these names as keys
236 namespace. They will come back as a dict with these names as keys
237 and their :func:`repr` as values.
237 and their :func:`repr` as values.
238
238
239 allow_stdin : bool, optional
239 allow_stdin : bool, optional
240 Flag for
240 Flag for
241 A dict with string keys and to pull from the user's
241 A dict with string keys and to pull from the user's
242 namespace. They will come back as a dict with these names as keys
242 namespace. They will come back as a dict with these names as keys
243 and their :func:`repr` as values.
243 and their :func:`repr` as values.
244
244
245 Returns
245 Returns
246 -------
246 -------
247 The msg_id of the message sent.
247 The msg_id of the message sent.
248 """
248 """
249 if user_variables is None:
249 if user_variables is None:
250 user_variables = []
250 user_variables = []
251 if user_expressions is None:
251 if user_expressions is None:
252 user_expressions = {}
252 user_expressions = {}
253 if allow_stdin is None:
253 if allow_stdin is None:
254 allow_stdin = self.allow_stdin
254 allow_stdin = self.allow_stdin
255
255
256
256
257 # Don't waste network traffic if inputs are invalid
257 # Don't waste network traffic if inputs are invalid
258 if not isinstance(code, basestring):
258 if not isinstance(code, basestring):
259 raise ValueError('code %r must be a string' % code)
259 raise ValueError('code %r must be a string' % code)
260 validate_string_list(user_variables)
260 validate_string_list(user_variables)
261 validate_string_dict(user_expressions)
261 validate_string_dict(user_expressions)
262
262
263 # Create class for content/msg creation. Related to, but possibly
263 # Create class for content/msg creation. Related to, but possibly
264 # not in Session.
264 # not in Session.
265 content = dict(code=code, silent=silent,
265 content = dict(code=code, silent=silent,
266 user_variables=user_variables,
266 user_variables=user_variables,
267 user_expressions=user_expressions,
267 user_expressions=user_expressions,
268 allow_stdin=allow_stdin,
268 allow_stdin=allow_stdin,
269 )
269 )
270 msg = self.session.msg('execute_request', content)
270 msg = self.session.msg('execute_request', content)
271 self._queue_request(msg)
271 self._queue_request(msg)
272 return msg['header']['msg_id']
272 return msg['header']['msg_id']
273
273
274 def complete(self, text, line, cursor_pos, block=None):
274 def complete(self, text, line, cursor_pos, block=None):
275 """Tab complete text in the kernel's namespace.
275 """Tab complete text in the kernel's namespace.
276
276
277 Parameters
277 Parameters
278 ----------
278 ----------
279 text : str
279 text : str
280 The text to complete.
280 The text to complete.
281 line : str
281 line : str
282 The full line of text that is the surrounding context for the
282 The full line of text that is the surrounding context for the
283 text to complete.
283 text to complete.
284 cursor_pos : int
284 cursor_pos : int
285 The position of the cursor in the line where the completion was
285 The position of the cursor in the line where the completion was
286 requested.
286 requested.
287 block : str, optional
287 block : str, optional
288 The full block of code in which the completion is being requested.
288 The full block of code in which the completion is being requested.
289
289
290 Returns
290 Returns
291 -------
291 -------
292 The msg_id of the message sent.
292 The msg_id of the message sent.
293 """
293 """
294 content = dict(text=text, line=line, block=block, cursor_pos=cursor_pos)
294 content = dict(text=text, line=line, block=block, cursor_pos=cursor_pos)
295 msg = self.session.msg('complete_request', content)
295 msg = self.session.msg('complete_request', content)
296 self._queue_request(msg)
296 self._queue_request(msg)
297 return msg['header']['msg_id']
297 return msg['header']['msg_id']
298
298
299 def object_info(self, oname):
299 def object_info(self, oname):
300 """Get metadata information about an object.
300 """Get metadata information about an object.
301
301
302 Parameters
302 Parameters
303 ----------
303 ----------
304 oname : str
304 oname : str
305 A string specifying the object name.
305 A string specifying the object name.
306
306
307 Returns
307 Returns
308 -------
308 -------
309 The msg_id of the message sent.
309 The msg_id of the message sent.
310 """
310 """
311 content = dict(oname=oname)
311 content = dict(oname=oname)
312 msg = self.session.msg('object_info_request', content)
312 msg = self.session.msg('object_info_request', content)
313 self._queue_request(msg)
313 self._queue_request(msg)
314 return msg['header']['msg_id']
314 return msg['header']['msg_id']
315
315
316 def history(self, raw=True, output=False, hist_access_type='range', **kwargs):
316 def history(self, raw=True, output=False, hist_access_type='range', **kwargs):
317 """Get entries from the history list.
317 """Get entries from the history list.
318
318
319 Parameters
319 Parameters
320 ----------
320 ----------
321 raw : bool
321 raw : bool
322 If True, return the raw input.
322 If True, return the raw input.
323 output : bool
323 output : bool
324 If True, then return the output as well.
324 If True, then return the output as well.
325 hist_access_type : str
325 hist_access_type : str
326 'range' (fill in session, start and stop params), 'tail' (fill in n)
326 'range' (fill in session, start and stop params), 'tail' (fill in n)
327 or 'search' (fill in pattern param).
327 or 'search' (fill in pattern param).
328
328
329 session : int
329 session : int
330 For a range request, the session from which to get lines. Session
330 For a range request, the session from which to get lines. Session
331 numbers are positive integers; negative ones count back from the
331 numbers are positive integers; negative ones count back from the
332 current session.
332 current session.
333 start : int
333 start : int
334 The first line number of a history range.
334 The first line number of a history range.
335 stop : int
335 stop : int
336 The final (excluded) line number of a history range.
336 The final (excluded) line number of a history range.
337
337
338 n : int
338 n : int
339 The number of lines of history to get for a tail request.
339 The number of lines of history to get for a tail request.
340
340
341 pattern : str
341 pattern : str
342 The glob-syntax pattern for a search request.
342 The glob-syntax pattern for a search request.
343
343
344 Returns
344 Returns
345 -------
345 -------
346 The msg_id of the message sent.
346 The msg_id of the message sent.
347 """
347 """
348 content = dict(raw=raw, output=output, hist_access_type=hist_access_type,
348 content = dict(raw=raw, output=output, hist_access_type=hist_access_type,
349 **kwargs)
349 **kwargs)
350 msg = self.session.msg('history_request', content)
350 msg = self.session.msg('history_request', content)
351 self._queue_request(msg)
351 self._queue_request(msg)
352 return msg['header']['msg_id']
352 return msg['header']['msg_id']
353
353
354 def shutdown(self, restart=False):
354 def shutdown(self, restart=False):
355 """Request an immediate kernel shutdown.
355 """Request an immediate kernel shutdown.
356
356
357 Upon receipt of the (empty) reply, client code can safely assume that
357 Upon receipt of the (empty) reply, client code can safely assume that
358 the kernel has shut down and it's safe to forcefully terminate it if
358 the kernel has shut down and it's safe to forcefully terminate it if
359 it's still alive.
359 it's still alive.
360
360
361 The kernel will send the reply via a function registered with Python's
361 The kernel will send the reply via a function registered with Python's
362 atexit module, ensuring it's truly done as the kernel is done with all
362 atexit module, ensuring it's truly done as the kernel is done with all
363 normal operation.
363 normal operation.
364 """
364 """
365 # Send quit message to kernel. Once we implement kernel-side setattr,
365 # Send quit message to kernel. Once we implement kernel-side setattr,
366 # this should probably be done that way, but for now this will do.
366 # this should probably be done that way, but for now this will do.
367 msg = self.session.msg('shutdown_request', {'restart':restart})
367 msg = self.session.msg('shutdown_request', {'restart':restart})
368 self._queue_request(msg)
368 self._queue_request(msg)
369 return msg['header']['msg_id']
369 return msg['header']['msg_id']
370
370
371 def _handle_events(self, socket, events):
371 def _handle_events(self, socket, events):
372 if events & POLLERR:
372 if events & POLLERR:
373 self._handle_err()
373 self._handle_err()
374 if events & POLLOUT:
374 if events & POLLOUT:
375 self._handle_send()
375 self._handle_send()
376 if events & POLLIN:
376 if events & POLLIN:
377 self._handle_recv()
377 self._handle_recv()
378
378
379 def _handle_recv(self):
379 def _handle_recv(self):
380 ident,msg = self.session.recv(self.socket, 0)
380 ident,msg = self.session.recv(self.socket, 0)
381 self.call_handlers(msg)
381 self.call_handlers(msg)
382
382
383 def _handle_send(self):
383 def _handle_send(self):
384 try:
384 try:
385 msg = self.command_queue.get(False)
385 msg = self.command_queue.get(False)
386 except Empty:
386 except Empty:
387 pass
387 pass
388 else:
388 else:
389 self.session.send(self.socket,msg)
389 self.session.send(self.socket,msg)
390 if self.command_queue.empty():
390 if self.command_queue.empty():
391 self.drop_io_state(POLLOUT)
391 self.drop_io_state(POLLOUT)
392
392
393 def _handle_err(self):
393 def _handle_err(self):
394 # We don't want to let this go silently, so eventually we should log.
394 # We don't want to let this go silently, so eventually we should log.
395 raise zmq.ZMQError()
395 raise zmq.ZMQError()
396
396
397 def _queue_request(self, msg):
397 def _queue_request(self, msg):
398 self.command_queue.put(msg)
398 self.command_queue.put(msg)
399 self.add_io_state(POLLOUT)
399 self.add_io_state(POLLOUT)
400
400
401
401
402 class SubSocketChannel(ZMQSocketChannel):
402 class SubSocketChannel(ZMQSocketChannel):
403 """The SUB channel which listens for messages that the kernel publishes.
403 """The SUB channel which listens for messages that the kernel publishes.
404 """
404 """
405
405
406 def __init__(self, context, session, address):
406 def __init__(self, context, session, address):
407 super(SubSocketChannel, self).__init__(context, session, address)
407 super(SubSocketChannel, self).__init__(context, session, address)
408 self.ioloop = ioloop.IOLoop()
408 self.ioloop = ioloop.IOLoop()
409
409
410 def run(self):
410 def run(self):
411 """The thread's main activity. Call start() instead."""
411 """The thread's main activity. Call start() instead."""
412 self.socket = self.context.socket(zmq.SUB)
412 self.socket = self.context.socket(zmq.SUB)
413 self.socket.setsockopt(zmq.SUBSCRIBE,b'')
413 self.socket.setsockopt(zmq.SUBSCRIBE,b'')
414 self.socket.setsockopt(zmq.IDENTITY, self.session.bsession)
414 self.socket.setsockopt(zmq.IDENTITY, self.session.bsession)
415 self.socket.connect('tcp://%s:%i' % self.address)
415 self.socket.connect('tcp://%s:%i' % self.address)
416 self.iostate = POLLIN|POLLERR
416 self.iostate = POLLIN|POLLERR
417 self.ioloop.add_handler(self.socket, self._handle_events,
417 self.ioloop.add_handler(self.socket, self._handle_events,
418 self.iostate)
418 self.iostate)
419 self._run_loop()
419 self._run_loop()
420
420
421 def stop(self):
421 def stop(self):
422 self.ioloop.stop()
422 self.ioloop.stop()
423 super(SubSocketChannel, self).stop()
423 super(SubSocketChannel, self).stop()
424
424
425 def call_handlers(self, msg):
425 def call_handlers(self, msg):
426 """This method is called in the ioloop thread when a message arrives.
426 """This method is called in the ioloop thread when a message arrives.
427
427
428 Subclasses should override this method to handle incoming messages.
428 Subclasses should override this method to handle incoming messages.
429 It is important to remember that this method is called in the thread
429 It is important to remember that this method is called in the thread
430 so that some logic must be done to ensure that the application leve
430 so that some logic must be done to ensure that the application leve
431 handlers are called in the application thread.
431 handlers are called in the application thread.
432 """
432 """
433 raise NotImplementedError('call_handlers must be defined in a subclass.')
433 raise NotImplementedError('call_handlers must be defined in a subclass.')
434
434
435 def flush(self, timeout=1.0):
435 def flush(self, timeout=1.0):
436 """Immediately processes all pending messages on the SUB channel.
436 """Immediately processes all pending messages on the SUB channel.
437
437
438 Callers should use this method to ensure that :method:`call_handlers`
438 Callers should use this method to ensure that :method:`call_handlers`
439 has been called for all messages that have been received on the
439 has been called for all messages that have been received on the
440 0MQ SUB socket of this channel.
440 0MQ SUB socket of this channel.
441
441
442 This method is thread safe.
442 This method is thread safe.
443
443
444 Parameters
444 Parameters
445 ----------
445 ----------
446 timeout : float, optional
446 timeout : float, optional
447 The maximum amount of time to spend flushing, in seconds. The
447 The maximum amount of time to spend flushing, in seconds. The
448 default is one second.
448 default is one second.
449 """
449 """
450 # We do the IOLoop callback process twice to ensure that the IOLoop
450 # We do the IOLoop callback process twice to ensure that the IOLoop
451 # gets to perform at least one full poll.
451 # gets to perform at least one full poll.
452 stop_time = time.time() + timeout
452 stop_time = time.time() + timeout
453 for i in xrange(2):
453 for i in xrange(2):
454 self._flushed = False
454 self._flushed = False
455 self.ioloop.add_callback(self._flush)
455 self.ioloop.add_callback(self._flush)
456 while not self._flushed and time.time() < stop_time:
456 while not self._flushed and time.time() < stop_time:
457 time.sleep(0.01)
457 time.sleep(0.01)
458
458
459 def _handle_events(self, socket, events):
459 def _handle_events(self, socket, events):
460 # Turn on and off POLLOUT depending on if we have made a request
460 # Turn on and off POLLOUT depending on if we have made a request
461 if events & POLLERR:
461 if events & POLLERR:
462 self._handle_err()
462 self._handle_err()
463 if events & POLLIN:
463 if events & POLLIN:
464 self._handle_recv()
464 self._handle_recv()
465
465
466 def _handle_err(self):
466 def _handle_err(self):
467 # We don't want to let this go silently, so eventually we should log.
467 # We don't want to let this go silently, so eventually we should log.
468 raise zmq.ZMQError()
468 raise zmq.ZMQError()
469
469
470 def _handle_recv(self):
470 def _handle_recv(self):
471 # Get all of the messages we can
471 # Get all of the messages we can
472 while True:
472 while True:
473 try:
473 try:
474 ident,msg = self.session.recv(self.socket)
474 ident,msg = self.session.recv(self.socket)
475 except zmq.ZMQError:
475 except zmq.ZMQError:
476 # Check the errno?
476 # Check the errno?
477 # Will this trigger POLLERR?
477 # Will this trigger POLLERR?
478 break
478 break
479 else:
479 else:
480 if msg is None:
480 if msg is None:
481 break
481 break
482 self.call_handlers(msg)
482 self.call_handlers(msg)
483
483
484 def _flush(self):
484 def _flush(self):
485 """Callback for :method:`self.flush`."""
485 """Callback for :method:`self.flush`."""
486 self._flushed = True
486 self._flushed = True
487
487
488
488
489 class StdInSocketChannel(ZMQSocketChannel):
489 class StdInSocketChannel(ZMQSocketChannel):
490 """A reply channel to handle raw_input requests that the kernel makes."""
490 """A reply channel to handle raw_input requests that the kernel makes."""
491
491
492 msg_queue = None
492 msg_queue = None
493
493
494 def __init__(self, context, session, address):
494 def __init__(self, context, session, address):
495 super(StdInSocketChannel, self).__init__(context, session, address)
495 super(StdInSocketChannel, self).__init__(context, session, address)
496 self.ioloop = ioloop.IOLoop()
496 self.ioloop = ioloop.IOLoop()
497 self.msg_queue = Queue()
497 self.msg_queue = Queue()
498
498
499 def run(self):
499 def run(self):
500 """The thread's main activity. Call start() instead."""
500 """The thread's main activity. Call start() instead."""
501 self.socket = self.context.socket(zmq.DEALER)
501 self.socket = self.context.socket(zmq.DEALER)
502 self.socket.setsockopt(zmq.IDENTITY, self.session.bsession)
502 self.socket.setsockopt(zmq.IDENTITY, self.session.bsession)
503 self.socket.connect('tcp://%s:%i' % self.address)
503 self.socket.connect('tcp://%s:%i' % self.address)
504 self.iostate = POLLERR|POLLIN
504 self.iostate = POLLERR|POLLIN
505 self.ioloop.add_handler(self.socket, self._handle_events,
505 self.ioloop.add_handler(self.socket, self._handle_events,
506 self.iostate)
506 self.iostate)
507 self._run_loop()
507 self._run_loop()
508
508
509 def stop(self):
509 def stop(self):
510 self.ioloop.stop()
510 self.ioloop.stop()
511 super(StdInSocketChannel, self).stop()
511 super(StdInSocketChannel, self).stop()
512
512
513 def call_handlers(self, msg):
513 def call_handlers(self, msg):
514 """This method is called in the ioloop thread when a message arrives.
514 """This method is called in the ioloop thread when a message arrives.
515
515
516 Subclasses should override this method to handle incoming messages.
516 Subclasses should override this method to handle incoming messages.
517 It is important to remember that this method is called in the thread
517 It is important to remember that this method is called in the thread
518 so that some logic must be done to ensure that the application leve
518 so that some logic must be done to ensure that the application leve
519 handlers are called in the application thread.
519 handlers are called in the application thread.
520 """
520 """
521 raise NotImplementedError('call_handlers must be defined in a subclass.')
521 raise NotImplementedError('call_handlers must be defined in a subclass.')
522
522
523 def input(self, string):
523 def input(self, string):
524 """Send a string of raw input to the kernel."""
524 """Send a string of raw input to the kernel."""
525 content = dict(value=string)
525 content = dict(value=string)
526 msg = self.session.msg('input_reply', content)
526 msg = self.session.msg('input_reply', content)
527 self._queue_reply(msg)
527 self._queue_reply(msg)
528
528
529 def _handle_events(self, socket, events):
529 def _handle_events(self, socket, events):
530 if events & POLLERR:
530 if events & POLLERR:
531 self._handle_err()
531 self._handle_err()
532 if events & POLLOUT:
532 if events & POLLOUT:
533 self._handle_send()
533 self._handle_send()
534 if events & POLLIN:
534 if events & POLLIN:
535 self._handle_recv()
535 self._handle_recv()
536
536
537 def _handle_recv(self):
537 def _handle_recv(self):
538 ident,msg = self.session.recv(self.socket, 0)
538 ident,msg = self.session.recv(self.socket, 0)
539 self.call_handlers(msg)
539 self.call_handlers(msg)
540
540
541 def _handle_send(self):
541 def _handle_send(self):
542 try:
542 try:
543 msg = self.msg_queue.get(False)
543 msg = self.msg_queue.get(False)
544 except Empty:
544 except Empty:
545 pass
545 pass
546 else:
546 else:
547 self.session.send(self.socket,msg)
547 self.session.send(self.socket,msg)
548 if self.msg_queue.empty():
548 if self.msg_queue.empty():
549 self.drop_io_state(POLLOUT)
549 self.drop_io_state(POLLOUT)
550
550
551 def _handle_err(self):
551 def _handle_err(self):
552 # We don't want to let this go silently, so eventually we should log.
552 # We don't want to let this go silently, so eventually we should log.
553 raise zmq.ZMQError()
553 raise zmq.ZMQError()
554
554
555 def _queue_reply(self, msg):
555 def _queue_reply(self, msg):
556 self.msg_queue.put(msg)
556 self.msg_queue.put(msg)
557 self.add_io_state(POLLOUT)
557 self.add_io_state(POLLOUT)
558
558
559
559
560 class HBSocketChannel(ZMQSocketChannel):
560 class HBSocketChannel(ZMQSocketChannel):
561 """The heartbeat channel which monitors the kernel heartbeat.
561 """The heartbeat channel which monitors the kernel heartbeat.
562
562
563 Note that the heartbeat channel is paused by default. As long as you start
563 Note that the heartbeat channel is paused by default. As long as you start
564 this channel, the kernel manager will ensure that it is paused and un-paused
564 this channel, the kernel manager will ensure that it is paused and un-paused
565 as appropriate.
565 as appropriate.
566 """
566 """
567
567
568 time_to_dead = 3.0
568 time_to_dead = 3.0
569 socket = None
569 socket = None
570 poller = None
570 poller = None
571 _running = None
571 _running = None
572 _pause = None
572 _pause = None
573
573
574 def __init__(self, context, session, address):
574 def __init__(self, context, session, address):
575 super(HBSocketChannel, self).__init__(context, session, address)
575 super(HBSocketChannel, self).__init__(context, session, address)
576 self._running = False
576 self._running = False
577 self._pause = True
577 self._pause = True
578
578
579 def _create_socket(self):
579 def _create_socket(self):
580 self.socket = self.context.socket(zmq.REQ)
580 self.socket = self.context.socket(zmq.REQ)
581 self.socket.setsockopt(zmq.IDENTITY, self.session.bsession)
581 self.socket.setsockopt(zmq.IDENTITY, self.session.bsession)
582 self.socket.connect('tcp://%s:%i' % self.address)
582 self.socket.connect('tcp://%s:%i' % self.address)
583 self.poller = zmq.Poller()
583 self.poller = zmq.Poller()
584 self.poller.register(self.socket, zmq.POLLIN)
584 self.poller.register(self.socket, zmq.POLLIN)
585
585
586 def run(self):
586 def run(self):
587 """The thread's main activity. Call start() instead."""
587 """The thread's main activity. Call start() instead."""
588 self._create_socket()
588 self._create_socket()
589 self._running = True
589 self._running = True
590 while self._running:
590 while self._running:
591 if self._pause:
591 if self._pause:
592 time.sleep(self.time_to_dead)
592 time.sleep(self.time_to_dead)
593 else:
593 else:
594 since_last_heartbeat = 0.0
594 since_last_heartbeat = 0.0
595 request_time = time.time()
595 request_time = time.time()
596 try:
596 try:
597 #io.rprint('Ping from HB channel') # dbg
597 #io.rprint('Ping from HB channel') # dbg
598 self.socket.send(b'ping')
598 self.socket.send(b'ping')
599 except zmq.ZMQError, e:
599 except zmq.ZMQError, e:
600 #io.rprint('*** HB Error:', e) # dbg
600 #io.rprint('*** HB Error:', e) # dbg
601 if e.errno == zmq.EFSM:
601 if e.errno == zmq.EFSM:
602 #io.rprint('sleep...', self.time_to_dead) # dbg
602 #io.rprint('sleep...', self.time_to_dead) # dbg
603 time.sleep(self.time_to_dead)
603 time.sleep(self.time_to_dead)
604 self._create_socket()
604 self._create_socket()
605 else:
605 else:
606 raise
606 raise
607 else:
607 else:
608 while True:
608 while True:
609 try:
609 try:
610 self.socket.recv(zmq.NOBLOCK)
610 self.socket.recv(zmq.NOBLOCK)
611 except zmq.ZMQError, e:
611 except zmq.ZMQError, e:
612 #io.rprint('*** HB Error 2:', e) # dbg
612 #io.rprint('*** HB Error 2:', e) # dbg
613 if e.errno == zmq.EAGAIN:
613 if e.errno == zmq.EAGAIN:
614 before_poll = time.time()
614 before_poll = time.time()
615 until_dead = self.time_to_dead - (before_poll -
615 until_dead = self.time_to_dead - (before_poll -
616 request_time)
616 request_time)
617
617
618 # When the return value of poll() is an empty
618 # When the return value of poll() is an empty
619 # list, that is when things have gone wrong
619 # list, that is when things have gone wrong
620 # (zeromq bug). As long as it is not an empty
620 # (zeromq bug). As long as it is not an empty
621 # list, poll is working correctly even if it
621 # list, poll is working correctly even if it
622 # returns quickly. Note: poll timeout is in
622 # returns quickly. Note: poll timeout is in
623 # milliseconds.
623 # milliseconds.
624 if until_dead > 0.0:
624 if until_dead > 0.0:
625 while True:
625 while True:
626 try:
626 try:
627 self.poller.poll(1000 * until_dead)
627 self.poller.poll(1000 * until_dead)
628 except zmq.ZMQError as e:
628 except zmq.ZMQError as e:
629 if e.errno == errno.EINTR:
629 if e.errno == errno.EINTR:
630 continue
630 continue
631 else:
631 else:
632 raise
632 raise
633 else:
633 else:
634 break
634 break
635
635
636 since_last_heartbeat = time.time()-request_time
636 since_last_heartbeat = time.time()-request_time
637 if since_last_heartbeat > self.time_to_dead:
637 if since_last_heartbeat > self.time_to_dead:
638 self.call_handlers(since_last_heartbeat)
638 self.call_handlers(since_last_heartbeat)
639 break
639 break
640 else:
640 else:
641 # FIXME: We should probably log this instead.
641 # FIXME: We should probably log this instead.
642 raise
642 raise
643 else:
643 else:
644 until_dead = self.time_to_dead - (time.time() -
644 until_dead = self.time_to_dead - (time.time() -
645 request_time)
645 request_time)
646 if until_dead > 0.0:
646 if until_dead > 0.0:
647 #io.rprint('sleep...', self.time_to_dead) # dbg
647 #io.rprint('sleep...', self.time_to_dead) # dbg
648 time.sleep(until_dead)
648 time.sleep(until_dead)
649 break
649 break
650
650
651 def pause(self):
651 def pause(self):
652 """Pause the heartbeat."""
652 """Pause the heartbeat."""
653 self._pause = True
653 self._pause = True
654
654
655 def unpause(self):
655 def unpause(self):
656 """Unpause the heartbeat."""
656 """Unpause the heartbeat."""
657 self._pause = False
657 self._pause = False
658
658
659 def is_beating(self):
659 def is_beating(self):
660 """Is the heartbeat running and not paused."""
660 """Is the heartbeat running and not paused."""
661 if self.is_alive() and not self._pause:
661 if self.is_alive() and not self._pause:
662 return True
662 return True
663 else:
663 else:
664 return False
664 return False
665
665
666 def stop(self):
666 def stop(self):
667 self._running = False
667 self._running = False
668 super(HBSocketChannel, self).stop()
668 super(HBSocketChannel, self).stop()
669
669
670 def call_handlers(self, since_last_heartbeat):
670 def call_handlers(self, since_last_heartbeat):
671 """This method is called in the ioloop thread when a message arrives.
671 """This method is called in the ioloop thread when a message arrives.
672
672
673 Subclasses should override this method to handle incoming messages.
673 Subclasses should override this method to handle incoming messages.
674 It is important to remember that this method is called in the thread
674 It is important to remember that this method is called in the thread
675 so that some logic must be done to ensure that the application leve
675 so that some logic must be done to ensure that the application leve
676 handlers are called in the application thread.
676 handlers are called in the application thread.
677 """
677 """
678 raise NotImplementedError('call_handlers must be defined in a subclass.')
678 raise NotImplementedError('call_handlers must be defined in a subclass.')
679
679
680
680
681 #-----------------------------------------------------------------------------
681 #-----------------------------------------------------------------------------
682 # Main kernel manager class
682 # Main kernel manager class
683 #-----------------------------------------------------------------------------
683 #-----------------------------------------------------------------------------
684
684
685 class KernelManager(HasTraits):
685 class KernelManager(HasTraits):
686 """ Manages a kernel for a frontend.
686 """ Manages a kernel for a frontend.
687
687
688 The SUB channel is for the frontend to receive messages published by the
688 The SUB channel is for the frontend to receive messages published by the
689 kernel.
689 kernel.
690
690
691 The REQ channel is for the frontend to make requests of the kernel.
691 The REQ channel is for the frontend to make requests of the kernel.
692
692
693 The REP channel is for the kernel to request stdin (raw_input) from the
693 The REP channel is for the kernel to request stdin (raw_input) from the
694 frontend.
694 frontend.
695 """
695 """
696 # config object for passing to child configurables
696 # config object for passing to child configurables
697 config = Instance(Config)
697 config = Instance(Config)
698
698
699 # The PyZMQ Context to use for communication with the kernel.
699 # The PyZMQ Context to use for communication with the kernel.
700 context = Instance(zmq.Context)
700 context = Instance(zmq.Context)
701 def _context_default(self):
701 def _context_default(self):
702 return zmq.Context.instance()
702 return zmq.Context.instance()
703
703
704 # The Session to use for communication with the kernel.
704 # The Session to use for communication with the kernel.
705 session = Instance(Session)
705 session = Instance(Session)
706
706
707 # The kernel process with which the KernelManager is communicating.
707 # The kernel process with which the KernelManager is communicating.
708 kernel = Instance(Popen)
708 kernel = Instance(Popen)
709
709
710 # The addresses for the communication channels.
710 # The addresses for the communication channels.
711 connection_file = Unicode('')
711 connection_file = Unicode('')
712 ip = Unicode(LOCALHOST)
712 ip = Unicode(LOCALHOST)
713 shell_port = Int(0)
713 shell_port = Int(0)
714 iopub_port = Int(0)
714 iopub_port = Int(0)
715 stdin_port = Int(0)
715 stdin_port = Int(0)
716 hb_port = Int(0)
716 hb_port = Int(0)
717
717
718 # The classes to use for the various channels.
718 # The classes to use for the various channels.
719 shell_channel_class = Type(ShellSocketChannel)
719 shell_channel_class = Type(ShellSocketChannel)
720 sub_channel_class = Type(SubSocketChannel)
720 sub_channel_class = Type(SubSocketChannel)
721 stdin_channel_class = Type(StdInSocketChannel)
721 stdin_channel_class = Type(StdInSocketChannel)
722 hb_channel_class = Type(HBSocketChannel)
722 hb_channel_class = Type(HBSocketChannel)
723
723
724 # Protected traits.
724 # Protected traits.
725 _launch_args = Any
725 _launch_args = Any
726 _shell_channel = Any
726 _shell_channel = Any
727 _sub_channel = Any
727 _sub_channel = Any
728 _stdin_channel = Any
728 _stdin_channel = Any
729 _hb_channel = Any
729 _hb_channel = Any
730 _connection_file_written=Bool(False)
730 _connection_file_written=Bool(False)
731
731
732 def __init__(self, **kwargs):
732 def __init__(self, **kwargs):
733 super(KernelManager, self).__init__(**kwargs)
733 super(KernelManager, self).__init__(**kwargs)
734 if self.session is None:
734 if self.session is None:
735 self.session = Session(config=self.config)
735 self.session = Session(config=self.config)
736
736
737 def __del__(self):
737 def __del__(self):
738 if self._connection_file_written:
738 if self._connection_file_written:
739 # cleanup connection files on full shutdown of kernel we started
739 # cleanup connection files on full shutdown of kernel we started
740 self._connection_file_written = False
740 self._connection_file_written = False
741 try:
741 try:
742 os.remove(self.connection_file)
742 os.remove(self.connection_file)
743 except IOError:
743 except IOError:
744 pass
744 pass
745
745
746
746
747 #--------------------------------------------------------------------------
747 #--------------------------------------------------------------------------
748 # Channel management methods:
748 # Channel management methods:
749 #--------------------------------------------------------------------------
749 #--------------------------------------------------------------------------
750
750
751 def start_channels(self, shell=True, sub=True, stdin=True, hb=True):
751 def start_channels(self, shell=True, sub=True, stdin=True, hb=True):
752 """Starts the channels for this kernel.
752 """Starts the channels for this kernel.
753
753
754 This will create the channels if they do not exist and then start
754 This will create the channels if they do not exist and then start
755 them. If port numbers of 0 are being used (random ports) then you
755 them. If port numbers of 0 are being used (random ports) then you
756 must first call :method:`start_kernel`. If the channels have been
756 must first call :method:`start_kernel`. If the channels have been
757 stopped and you call this, :class:`RuntimeError` will be raised.
757 stopped and you call this, :class:`RuntimeError` will be raised.
758 """
758 """
759 if shell:
759 if shell:
760 self.shell_channel.start()
760 self.shell_channel.start()
761 if sub:
761 if sub:
762 self.sub_channel.start()
762 self.sub_channel.start()
763 if stdin:
763 if stdin:
764 self.stdin_channel.start()
764 self.stdin_channel.start()
765 self.shell_channel.allow_stdin = True
765 self.shell_channel.allow_stdin = True
766 else:
766 else:
767 self.shell_channel.allow_stdin = False
767 self.shell_channel.allow_stdin = False
768 if hb:
768 if hb:
769 self.hb_channel.start()
769 self.hb_channel.start()
770
770
771 def stop_channels(self):
771 def stop_channels(self):
772 """Stops all the running channels for this kernel.
772 """Stops all the running channels for this kernel.
773 """
773 """
774 if self.shell_channel.is_alive():
774 if self.shell_channel.is_alive():
775 self.shell_channel.stop()
775 self.shell_channel.stop()
776 if self.sub_channel.is_alive():
776 if self.sub_channel.is_alive():
777 self.sub_channel.stop()
777 self.sub_channel.stop()
778 if self.stdin_channel.is_alive():
778 if self.stdin_channel.is_alive():
779 self.stdin_channel.stop()
779 self.stdin_channel.stop()
780 if self.hb_channel.is_alive():
780 if self.hb_channel.is_alive():
781 self.hb_channel.stop()
781 self.hb_channel.stop()
782
782
783 @property
783 @property
784 def channels_running(self):
784 def channels_running(self):
785 """Are any of the channels created and running?"""
785 """Are any of the channels created and running?"""
786 return (self.shell_channel.is_alive() or self.sub_channel.is_alive() or
786 return (self.shell_channel.is_alive() or self.sub_channel.is_alive() or
787 self.stdin_channel.is_alive() or self.hb_channel.is_alive())
787 self.stdin_channel.is_alive() or self.hb_channel.is_alive())
788
788
789 #--------------------------------------------------------------------------
789 #--------------------------------------------------------------------------
790 # Kernel process management methods:
790 # Kernel process management methods:
791 #--------------------------------------------------------------------------
791 #--------------------------------------------------------------------------
792
792
793 def load_connection_file(self):
793 def load_connection_file(self):
794 """load connection info from JSON dict in self.connection_file"""
794 """load connection info from JSON dict in self.connection_file"""
795 with open(self.connection_file) as f:
795 with open(self.connection_file) as f:
796 cfg = json.loads(f.read())
796 cfg = json.loads(f.read())
797
797
798 self.ip = cfg['ip']
798 self.ip = cfg['ip']
799 self.shell_port = cfg['shell_port']
799 self.shell_port = cfg['shell_port']
800 self.stdin_port = cfg['stdin_port']
800 self.stdin_port = cfg['stdin_port']
801 self.iopub_port = cfg['iopub_port']
801 self.iopub_port = cfg['iopub_port']
802 self.hb_port = cfg['hb_port']
802 self.hb_port = cfg['hb_port']
803 self.session.key = str_to_bytes(cfg['key'])
803 self.session.key = str_to_bytes(cfg['key'])
804
804
805 def write_connection_file(self):
805 def write_connection_file(self):
806 """write connection info to JSON dict in self.connection_file"""
806 """write connection info to JSON dict in self.connection_file"""
807 if self._connection_file_written:
807 if self._connection_file_written:
808 return
808 return
809 self.connection_file,cfg = write_connection_file(self.connection_file,
809 self.connection_file,cfg = write_connection_file(self.connection_file,
810 ip=self.ip, key=self.session.key,
810 ip=self.ip, key=self.session.key,
811 stdin_port=self.stdin_port, iopub_port=self.iopub_port,
811 stdin_port=self.stdin_port, iopub_port=self.iopub_port,
812 shell_port=self.shell_port, hb_port=self.hb_port)
812 shell_port=self.shell_port, hb_port=self.hb_port)
813 # write_connection_file also sets default ports:
813 # write_connection_file also sets default ports:
814 self.shell_port = cfg['shell_port']
814 self.shell_port = cfg['shell_port']
815 self.stdin_port = cfg['stdin_port']
815 self.stdin_port = cfg['stdin_port']
816 self.iopub_port = cfg['iopub_port']
816 self.iopub_port = cfg['iopub_port']
817 self.hb_port = cfg['hb_port']
817 self.hb_port = cfg['hb_port']
818
818
819 self._connection_file_written = True
819 self._connection_file_written = True
820
820
821 def start_kernel(self, **kw):
821 def start_kernel(self, **kw):
822 """Starts a kernel process and configures the manager to use it.
822 """Starts a kernel process and configures the manager to use it.
823
823
824 If random ports (port=0) are being used, this method must be called
824 If random ports (port=0) are being used, this method must be called
825 before the channels are created.
825 before the channels are created.
826
826
827 Parameters:
827 Parameters:
828 -----------
828 -----------
829 ipython : bool, optional (default True)
829 ipython : bool, optional (default True)
830 Whether to use an IPython kernel instead of a plain Python kernel.
830 Whether to use an IPython kernel instead of a plain Python kernel.
831
831
832 launcher : callable, optional (default None)
832 launcher : callable, optional (default None)
833 A custom function for launching the kernel process (generally a
833 A custom function for launching the kernel process (generally a
834 wrapper around ``entry_point.base_launch_kernel``). In most cases,
834 wrapper around ``entry_point.base_launch_kernel``). In most cases,
835 it should not be necessary to use this parameter.
835 it should not be necessary to use this parameter.
836
836
837 **kw : optional
837 **kw : optional
838 See respective options for IPython and Python kernels.
838 See respective options for IPython and Python kernels.
839 """
839 """
840 if self.ip not in LOCAL_IPS:
840 if self.ip not in LOCAL_IPS:
841 raise RuntimeError("Can only launch a kernel on a local interface. "
841 raise RuntimeError("Can only launch a kernel on a local interface. "
842 "Make sure that the '*_address' attributes are "
842 "Make sure that the '*_address' attributes are "
843 "configured properly. "
843 "configured properly. "
844 "Currently valid addresses are: %s"%LOCAL_IPS
844 "Currently valid addresses are: %s"%LOCAL_IPS
845 )
845 )
846
846
847 # write connection file / get default ports
847 # write connection file / get default ports
848 self.write_connection_file()
848 self.write_connection_file()
849
849
850 self._launch_args = kw.copy()
850 self._launch_args = kw.copy()
851 launch_kernel = kw.pop('launcher', None)
851 launch_kernel = kw.pop('launcher', None)
852 if launch_kernel is None:
852 if launch_kernel is None:
853 if kw.pop('ipython', True):
853 if kw.pop('ipython', True):
854 from ipkernel import launch_kernel
854 from ipkernel import launch_kernel
855 else:
855 else:
856 from pykernel import launch_kernel
856 from pykernel import launch_kernel
857 self.kernel = launch_kernel(fname=self.connection_file, **kw)
857 self.kernel = launch_kernel(fname=self.connection_file, **kw)
858
858
859 def shutdown_kernel(self, restart=False):
859 def shutdown_kernel(self, restart=False):
860 """ Attempts to the stop the kernel process cleanly. If the kernel
860 """ Attempts to the stop the kernel process cleanly. If the kernel
861 cannot be stopped, it is killed, if possible.
861 cannot be stopped, it is killed, if possible.
862 """
862 """
863 # FIXME: Shutdown does not work on Windows due to ZMQ errors!
863 # FIXME: Shutdown does not work on Windows due to ZMQ errors!
864 if sys.platform == 'win32':
864 if sys.platform == 'win32':
865 self.kill_kernel()
865 self.kill_kernel()
866 return
866 return
867
867
868 # Pause the heart beat channel if it exists.
868 # Pause the heart beat channel if it exists.
869 if self._hb_channel is not None:
869 if self._hb_channel is not None:
870 self._hb_channel.pause()
870 self._hb_channel.pause()
871
871
872 # Don't send any additional kernel kill messages immediately, to give
872 # Don't send any additional kernel kill messages immediately, to give
873 # the kernel a chance to properly execute shutdown actions. Wait for at
873 # the kernel a chance to properly execute shutdown actions. Wait for at
874 # most 1s, checking every 0.1s.
874 # most 1s, checking every 0.1s.
875 self.shell_channel.shutdown(restart=restart)
875 self.shell_channel.shutdown(restart=restart)
876 for i in range(10):
876 for i in range(10):
877 if self.is_alive:
877 if self.is_alive:
878 time.sleep(0.1)
878 time.sleep(0.1)
879 else:
879 else:
880 break
880 break
881 else:
881 else:
882 # OK, we've waited long enough.
882 # OK, we've waited long enough.
883 if self.has_kernel:
883 if self.has_kernel:
884 self.kill_kernel()
884 self.kill_kernel()
885
885
886 if not restart and self._connection_file_written:
886 if not restart and self._connection_file_written:
887 # cleanup connection files on full shutdown of kernel we started
887 # cleanup connection files on full shutdown of kernel we started
888 self._connection_file_written = False
888 self._connection_file_written = False
889 try:
889 try:
890 os.remove(self.connection_file)
890 os.remove(self.connection_file)
891 except IOError:
891 except IOError:
892 pass
892 pass
893
893
894 def restart_kernel(self, now=False, **kw):
894 def restart_kernel(self, now=False, **kw):
895 """Restarts a kernel with the arguments that were used to launch it.
895 """Restarts a kernel with the arguments that were used to launch it.
896
896
897 If the old kernel was launched with random ports, the same ports will be
897 If the old kernel was launched with random ports, the same ports will be
898 used for the new kernel.
898 used for the new kernel.
899
899
900 Parameters
900 Parameters
901 ----------
901 ----------
902 now : bool, optional
902 now : bool, optional
903 If True, the kernel is forcefully restarted *immediately*, without
903 If True, the kernel is forcefully restarted *immediately*, without
904 having a chance to do any cleanup action. Otherwise the kernel is
904 having a chance to do any cleanup action. Otherwise the kernel is
905 given 1s to clean up before a forceful restart is issued.
905 given 1s to clean up before a forceful restart is issued.
906
906
907 In all cases the kernel is restarted, the only difference is whether
907 In all cases the kernel is restarted, the only difference is whether
908 it is given a chance to perform a clean shutdown or not.
908 it is given a chance to perform a clean shutdown or not.
909
909
910 **kw : optional
910 **kw : optional
911 Any options specified here will replace those used to launch the
911 Any options specified here will replace those used to launch the
912 kernel.
912 kernel.
913 """
913 """
914 if self._launch_args is None:
914 if self._launch_args is None:
915 raise RuntimeError("Cannot restart the kernel. "
915 raise RuntimeError("Cannot restart the kernel. "
916 "No previous call to 'start_kernel'.")
916 "No previous call to 'start_kernel'.")
917 else:
917 else:
918 # Stop currently running kernel.
918 # Stop currently running kernel.
919 if self.has_kernel:
919 if self.has_kernel:
920 if now:
920 if now:
921 self.kill_kernel()
921 self.kill_kernel()
922 else:
922 else:
923 self.shutdown_kernel(restart=True)
923 self.shutdown_kernel(restart=True)
924
924
925 # Start new kernel.
925 # Start new kernel.
926 self._launch_args.update(kw)
926 self._launch_args.update(kw)
927 self.start_kernel(**self._launch_args)
927 self.start_kernel(**self._launch_args)
928
928
929 # FIXME: Messages get dropped in Windows due to probable ZMQ bug
929 # FIXME: Messages get dropped in Windows due to probable ZMQ bug
930 # unless there is some delay here.
930 # unless there is some delay here.
931 if sys.platform == 'win32':
931 if sys.platform == 'win32':
932 time.sleep(0.2)
932 time.sleep(0.2)
933
933
934 @property
934 @property
935 def has_kernel(self):
935 def has_kernel(self):
936 """Returns whether a kernel process has been specified for the kernel
936 """Returns whether a kernel process has been specified for the kernel
937 manager.
937 manager.
938 """
938 """
939 return self.kernel is not None
939 return self.kernel is not None
940
940
941 def kill_kernel(self):
941 def kill_kernel(self):
942 """ Kill the running kernel. """
942 """ Kill the running kernel. """
943 if self.has_kernel:
943 if self.has_kernel:
944 # Pause the heart beat channel if it exists.
944 # Pause the heart beat channel if it exists.
945 if self._hb_channel is not None:
945 if self._hb_channel is not None:
946 self._hb_channel.pause()
946 self._hb_channel.pause()
947
947
948 # Attempt to kill the kernel.
948 # Attempt to kill the kernel.
949 try:
949 try:
950 self.kernel.kill()
950 self.kernel.kill()
951 except OSError, e:
951 except OSError, e:
952 # In Windows, we will get an Access Denied error if the process
952 # In Windows, we will get an Access Denied error if the process
953 # has already terminated. Ignore it.
953 # has already terminated. Ignore it.
954 if sys.platform == 'win32':
954 if sys.platform == 'win32':
955 if e.winerror != 5:
955 if e.winerror != 5:
956 raise
956 raise
957 # On Unix, we may get an ESRCH error if the process has already
957 # On Unix, we may get an ESRCH error if the process has already
958 # terminated. Ignore it.
958 # terminated. Ignore it.
959 else:
959 else:
960 from errno import ESRCH
960 from errno import ESRCH
961 if e.errno != ESRCH:
961 if e.errno != ESRCH:
962 raise
962 raise
963 self.kernel = None
963 self.kernel = None
964 else:
964 else:
965 raise RuntimeError("Cannot kill kernel. No kernel is running!")
965 raise RuntimeError("Cannot kill kernel. No kernel is running!")
966
966
967 def interrupt_kernel(self):
967 def interrupt_kernel(self):
968 """ Interrupts the kernel. Unlike ``signal_kernel``, this operation is
968 """ Interrupts the kernel. Unlike ``signal_kernel``, this operation is
969 well supported on all platforms.
969 well supported on all platforms.
970 """
970 """
971 if self.has_kernel:
971 if self.has_kernel:
972 if sys.platform == 'win32':
972 if sys.platform == 'win32':
973 from parentpoller import ParentPollerWindows as Poller
973 from parentpoller import ParentPollerWindows as Poller
974 Poller.send_interrupt(self.kernel.win32_interrupt_event)
974 Poller.send_interrupt(self.kernel.win32_interrupt_event)
975 else:
975 else:
976 self.kernel.send_signal(signal.SIGINT)
976 self.kernel.send_signal(signal.SIGINT)
977 else:
977 else:
978 raise RuntimeError("Cannot interrupt kernel. No kernel is running!")
978 raise RuntimeError("Cannot interrupt kernel. No kernel is running!")
979
979
980 def signal_kernel(self, signum):
980 def signal_kernel(self, signum):
981 """ Sends a signal to the kernel. Note that since only SIGTERM is
981 """ Sends a signal to the kernel. Note that since only SIGTERM is
982 supported on Windows, this function is only useful on Unix systems.
982 supported on Windows, this function is only useful on Unix systems.
983 """
983 """
984 if self.has_kernel:
984 if self.has_kernel:
985 self.kernel.send_signal(signum)
985 self.kernel.send_signal(signum)
986 else:
986 else:
987 raise RuntimeError("Cannot signal kernel. No kernel is running!")
987 raise RuntimeError("Cannot signal kernel. No kernel is running!")
988
988
989 @property
989 @property
990 def is_alive(self):
990 def is_alive(self):
991 """Is the kernel process still running?"""
991 """Is the kernel process still running?"""
992 # FIXME: not using a heartbeat means this method is broken for any
992 # FIXME: not using a heartbeat means this method is broken for any
993 # remote kernel, it's only capable of handling local kernels.
993 # remote kernel, it's only capable of handling local kernels.
994 if self.has_kernel:
994 if self.has_kernel:
995 if self.kernel.poll() is None:
995 if self.kernel.poll() is None:
996 return True
996 return True
997 else:
997 else:
998 return False
998 return False
999 else:
999 else:
1000 # We didn't start the kernel with this KernelManager so we don't
1000 # We didn't start the kernel with this KernelManager so we don't
1001 # know if it is running. We should use a heartbeat for this case.
1001 # know if it is running. We should use a heartbeat for this case.
1002 return True
1002 return True
1003
1003
1004 #--------------------------------------------------------------------------
1004 #--------------------------------------------------------------------------
1005 # Channels used for communication with the kernel:
1005 # Channels used for communication with the kernel:
1006 #--------------------------------------------------------------------------
1006 #--------------------------------------------------------------------------
1007
1007
1008 @property
1008 @property
1009 def shell_channel(self):
1009 def shell_channel(self):
1010 """Get the REQ socket channel object to make requests of the kernel."""
1010 """Get the REQ socket channel object to make requests of the kernel."""
1011 if self._shell_channel is None:
1011 if self._shell_channel is None:
1012 self._shell_channel = self.shell_channel_class(self.context,
1012 self._shell_channel = self.shell_channel_class(self.context,
1013 self.session,
1013 self.session,
1014 (self.ip, self.shell_port))
1014 (self.ip, self.shell_port))
1015 return self._shell_channel
1015 return self._shell_channel
1016
1016
1017 @property
1017 @property
1018 def sub_channel(self):
1018 def sub_channel(self):
1019 """Get the SUB socket channel object."""
1019 """Get the SUB socket channel object."""
1020 if self._sub_channel is None:
1020 if self._sub_channel is None:
1021 self._sub_channel = self.sub_channel_class(self.context,
1021 self._sub_channel = self.sub_channel_class(self.context,
1022 self.session,
1022 self.session,
1023 (self.ip, self.iopub_port))
1023 (self.ip, self.iopub_port))
1024 return self._sub_channel
1024 return self._sub_channel
1025
1025
1026 @property
1026 @property
1027 def stdin_channel(self):
1027 def stdin_channel(self):
1028 """Get the REP socket channel object to handle stdin (raw_input)."""
1028 """Get the REP socket channel object to handle stdin (raw_input)."""
1029 if self._stdin_channel is None:
1029 if self._stdin_channel is None:
1030 self._stdin_channel = self.stdin_channel_class(self.context,
1030 self._stdin_channel = self.stdin_channel_class(self.context,
1031 self.session,
1031 self.session,
1032 (self.ip, self.stdin_port))
1032 (self.ip, self.stdin_port))
1033 return self._stdin_channel
1033 return self._stdin_channel
1034
1034
1035 @property
1035 @property
1036 def hb_channel(self):
1036 def hb_channel(self):
1037 """Get the heartbeat socket channel object to check that the
1037 """Get the heartbeat socket channel object to check that the
1038 kernel is alive."""
1038 kernel is alive."""
1039 if self._hb_channel is None:
1039 if self._hb_channel is None:
1040 self._hb_channel = self.hb_channel_class(self.context,
1040 self._hb_channel = self.hb_channel_class(self.context,
1041 self.session,
1041 self.session,
1042 (self.ip, self.hb_port))
1042 (self.ip, self.hb_port))
1043 return self._hb_channel
1043 return self._hb_channel
General Comments 0
You need to be logged in to leave comments. Login now