##// END OF EJS Templates
Fixes to the heartbeat channel...
MinRK -
Show More
@@ -1,152 +1,154 b''
1 """Implement a fully blocking kernel manager.
1 """Implement a fully blocking kernel manager.
2
2
3 Useful for test suites and blocking terminal interfaces.
3 Useful for test suites and blocking terminal interfaces.
4 """
4 """
5 #-----------------------------------------------------------------------------
5 #-----------------------------------------------------------------------------
6 # Copyright (C) 2010-2011 The IPython Development Team
6 # Copyright (C) 2010-2011 The IPython Development Team
7 #
7 #
8 # Distributed under the terms of the BSD License. The full license is in
8 # Distributed under the terms of the BSD License. The full license is in
9 # the file COPYING.txt, distributed as part of this software.
9 # the file COPYING.txt, distributed as part of this software.
10 #-----------------------------------------------------------------------------
10 #-----------------------------------------------------------------------------
11
11
12 #-----------------------------------------------------------------------------
12 #-----------------------------------------------------------------------------
13 # Imports
13 # Imports
14 #-----------------------------------------------------------------------------
14 #-----------------------------------------------------------------------------
15 from __future__ import print_function
15 from __future__ import print_function
16
16
17 # Stdlib
17 # Stdlib
18 from Queue import Queue, Empty
18 from Queue import Queue, Empty
19 from threading import Event
19 from threading import Event
20
20
21 # Our own
21 # Our own
22 from IPython.utils import io
22 from IPython.utils import io
23 from IPython.utils.traitlets import Type
23 from IPython.utils.traitlets import Type
24
24
25 from .kernelmanager import (KernelManager, SubSocketChannel, HBSocketChannel,
25 from .kernelmanager import (KernelManager, SubSocketChannel, HBSocketChannel,
26 ShellSocketChannel, StdInSocketChannel)
26 ShellSocketChannel, StdInSocketChannel)
27
27
28 #-----------------------------------------------------------------------------
28 #-----------------------------------------------------------------------------
29 # Functions and classes
29 # Functions and classes
30 #-----------------------------------------------------------------------------
30 #-----------------------------------------------------------------------------
31
31
32 class BlockingSubSocketChannel(SubSocketChannel):
32 class BlockingSubSocketChannel(SubSocketChannel):
33
33
34 def __init__(self, context, session, address=None):
34 def __init__(self, context, session, address=None):
35 super(BlockingSubSocketChannel, self).__init__(context, session,
35 super(BlockingSubSocketChannel, self).__init__(context, session,
36 address)
36 address)
37 self._in_queue = Queue()
37 self._in_queue = Queue()
38
38
39 def call_handlers(self, msg):
39 def call_handlers(self, msg):
40 #io.rprint('[[Sub]]', msg) # dbg
40 #io.rprint('[[Sub]]', msg) # dbg
41 self._in_queue.put(msg)
41 self._in_queue.put(msg)
42
42
43 def msg_ready(self):
43 def msg_ready(self):
44 """Is there a message that has been received?"""
44 """Is there a message that has been received?"""
45 if self._in_queue.qsize() == 0:
45 if self._in_queue.qsize() == 0:
46 return False
46 return False
47 else:
47 else:
48 return True
48 return True
49
49
50 def get_msg(self, block=True, timeout=None):
50 def get_msg(self, block=True, timeout=None):
51 """Get a message if there is one that is ready."""
51 """Get a message if there is one that is ready."""
52 if block and timeout is None:
52 if block and timeout is None:
53 # never use timeout=None, because get
53 # never use timeout=None, because get
54 # becomes uninterruptible
54 # becomes uninterruptible
55 timeout = 1e6
55 timeout = 1e6
56 return self._in_queue.get(block, timeout)
56 return self._in_queue.get(block, timeout)
57
57
58 def get_msgs(self):
58 def get_msgs(self):
59 """Get all messages that are currently ready."""
59 """Get all messages that are currently ready."""
60 msgs = []
60 msgs = []
61 while True:
61 while True:
62 try:
62 try:
63 msgs.append(self.get_msg(block=False))
63 msgs.append(self.get_msg(block=False))
64 except Empty:
64 except Empty:
65 break
65 break
66 return msgs
66 return msgs
67
67
68
68
69 class BlockingShellSocketChannel(ShellSocketChannel):
69 class BlockingShellSocketChannel(ShellSocketChannel):
70
70
71 def __init__(self, context, session, address=None):
71 def __init__(self, context, session, address=None):
72 super(BlockingShellSocketChannel, self).__init__(context, session,
72 super(BlockingShellSocketChannel, self).__init__(context, session,
73 address)
73 address)
74 self._in_queue = Queue()
74 self._in_queue = Queue()
75
75
76 def call_handlers(self, msg):
76 def call_handlers(self, msg):
77 #io.rprint('[[Shell]]', msg) # dbg
77 #io.rprint('[[Shell]]', msg) # dbg
78 self._in_queue.put(msg)
78 self._in_queue.put(msg)
79
79
80 def msg_ready(self):
80 def msg_ready(self):
81 """Is there a message that has been received?"""
81 """Is there a message that has been received?"""
82 if self._in_queue.qsize() == 0:
82 if self._in_queue.qsize() == 0:
83 return False
83 return False
84 else:
84 else:
85 return True
85 return True
86
86
87 def get_msg(self, block=True, timeout=None):
87 def get_msg(self, block=True, timeout=None):
88 """Get a message if there is one that is ready."""
88 """Get a message if there is one that is ready."""
89 if block and timeout is None:
89 if block and timeout is None:
90 # never use timeout=None, because get
90 # never use timeout=None, because get
91 # becomes uninterruptible
91 # becomes uninterruptible
92 timeout = 1e6
92 timeout = 1e6
93 return self._in_queue.get(block, timeout)
93 return self._in_queue.get(block, timeout)
94
94
95 def get_msgs(self):
95 def get_msgs(self):
96 """Get all messages that are currently ready."""
96 """Get all messages that are currently ready."""
97 msgs = []
97 msgs = []
98 while True:
98 while True:
99 try:
99 try:
100 msgs.append(self.get_msg(block=False))
100 msgs.append(self.get_msg(block=False))
101 except Empty:
101 except Empty:
102 break
102 break
103 return msgs
103 return msgs
104
104
105
105
106 class BlockingStdInSocketChannel(StdInSocketChannel):
106 class BlockingStdInSocketChannel(StdInSocketChannel):
107
107
108 def __init__(self, context, session, address=None):
108 def __init__(self, context, session, address=None):
109 super(BlockingStdInSocketChannel, self).__init__(context, session, address)
109 super(BlockingStdInSocketChannel, self).__init__(context, session, address)
110 self._in_queue = Queue()
110 self._in_queue = Queue()
111
111
112 def call_handlers(self, msg):
112 def call_handlers(self, msg):
113 #io.rprint('[[Rep]]', msg) # dbg
113 #io.rprint('[[Rep]]', msg) # dbg
114 self._in_queue.put(msg)
114 self._in_queue.put(msg)
115
115
116 def get_msg(self, block=True, timeout=None):
116 def get_msg(self, block=True, timeout=None):
117 "Gets a message if there is one that is ready."
117 "Gets a message if there is one that is ready."
118 return self._in_queue.get(block, timeout)
118 return self._in_queue.get(block, timeout)
119
119
120 def get_msgs(self):
120 def get_msgs(self):
121 """Get all messages that are currently ready."""
121 """Get all messages that are currently ready."""
122 msgs = []
122 msgs = []
123 while True:
123 while True:
124 try:
124 try:
125 msgs.append(self.get_msg(block=False))
125 msgs.append(self.get_msg(block=False))
126 except Empty:
126 except Empty:
127 break
127 break
128 return msgs
128 return msgs
129
129
130 def msg_ready(self):
130 def msg_ready(self):
131 "Is there a message that has been received?"
131 "Is there a message that has been received?"
132 return not self._in_queue.empty()
132 return not self._in_queue.empty()
133
133
134
134
135 class BlockingHBSocketChannel(HBSocketChannel):
135 class BlockingHBSocketChannel(HBSocketChannel):
136
136
137 # This kernel needs rapid monitoring capabilities
137 # This kernel needs quicker monitoring, shorten to 1 sec.
138 time_to_dead = 0.2
138 # less than 0.5s is unreliable, and will get occasional
139 # false reports of missed beats.
140 time_to_dead = 1.
139
141
140 def call_handlers(self, since_last_heartbeat):
142 def call_handlers(self, since_last_heartbeat):
141 #io.rprint('[[Heart]]', since_last_heartbeat) # dbg
143 """pause beating on missed heartbeat"""
142 pass
144 pass
143
145
144
146
145 class BlockingKernelManager(KernelManager):
147 class BlockingKernelManager(KernelManager):
146
148
147 # The classes to use for the various channels.
149 # The classes to use for the various channels.
148 shell_channel_class = Type(BlockingShellSocketChannel)
150 shell_channel_class = Type(BlockingShellSocketChannel)
149 sub_channel_class = Type(BlockingSubSocketChannel)
151 sub_channel_class = Type(BlockingSubSocketChannel)
150 stdin_channel_class = Type(BlockingStdInSocketChannel)
152 stdin_channel_class = Type(BlockingStdInSocketChannel)
151 hb_channel_class = Type(BlockingHBSocketChannel)
153 hb_channel_class = Type(BlockingHBSocketChannel)
152
154
@@ -1,954 +1,962 b''
1 """Base classes to manage the interaction with a running kernel.
1 """Base classes to manage the interaction with a running kernel.
2
2
3 TODO
3 TODO
4 * Create logger to handle debugging and console messages.
4 * Create logger to handle debugging and console messages.
5 """
5 """
6
6
7 #-----------------------------------------------------------------------------
7 #-----------------------------------------------------------------------------
8 # Copyright (C) 2008-2011 The IPython Development Team
8 # Copyright (C) 2008-2011 The IPython Development Team
9 #
9 #
10 # Distributed under the terms of the BSD License. The full license is in
10 # Distributed under the terms of the BSD License. The full license is in
11 # the file COPYING, distributed as part of this software.
11 # the file COPYING, distributed as part of this software.
12 #-----------------------------------------------------------------------------
12 #-----------------------------------------------------------------------------
13
13
14 #-----------------------------------------------------------------------------
14 #-----------------------------------------------------------------------------
15 # Imports
15 # Imports
16 #-----------------------------------------------------------------------------
16 #-----------------------------------------------------------------------------
17
17
18 # Standard library imports.
18 # Standard library imports.
19 import errno
19 import errno
20 import json
20 import json
21 from subprocess import Popen
21 from subprocess import Popen
22 import os
22 import os
23 import signal
23 import signal
24 import sys
24 import sys
25 from threading import Thread
25 from threading import Thread
26 import time
26 import time
27
27
28 # System library imports.
28 # System library imports.
29 import zmq
29 import zmq
30 from zmq.eventloop import ioloop, zmqstream
30 from zmq.eventloop import ioloop, zmqstream
31
31
32 # Local imports.
32 # Local imports.
33 from IPython.config.loader import Config
33 from IPython.config.loader import Config
34 from IPython.utils.localinterfaces import LOCALHOST, LOCAL_IPS
34 from IPython.utils.localinterfaces import LOCALHOST, LOCAL_IPS
35 from IPython.utils.traitlets import (
35 from IPython.utils.traitlets import (
36 HasTraits, Any, Instance, Type, Unicode, Integer, Bool
36 HasTraits, Any, Instance, Type, Unicode, Integer, Bool
37 )
37 )
38 from IPython.utils.py3compat import str_to_bytes
38 from IPython.utils.py3compat import str_to_bytes
39 from IPython.zmq.entry_point import write_connection_file
39 from IPython.zmq.entry_point import write_connection_file
40 from session import Session
40 from session import Session
41
41
42 #-----------------------------------------------------------------------------
42 #-----------------------------------------------------------------------------
43 # Constants and exceptions
43 # Constants and exceptions
44 #-----------------------------------------------------------------------------
44 #-----------------------------------------------------------------------------
45
45
46 class InvalidPortNumber(Exception):
46 class InvalidPortNumber(Exception):
47 pass
47 pass
48
48
49 #-----------------------------------------------------------------------------
49 #-----------------------------------------------------------------------------
50 # Utility functions
50 # Utility functions
51 #-----------------------------------------------------------------------------
51 #-----------------------------------------------------------------------------
52
52
53 # some utilities to validate message structure, these might get moved elsewhere
53 # some utilities to validate message structure, these might get moved elsewhere
54 # if they prove to have more generic utility
54 # if they prove to have more generic utility
55
55
56 def validate_string_list(lst):
56 def validate_string_list(lst):
57 """Validate that the input is a list of strings.
57 """Validate that the input is a list of strings.
58
58
59 Raises ValueError if not."""
59 Raises ValueError if not."""
60 if not isinstance(lst, list):
60 if not isinstance(lst, list):
61 raise ValueError('input %r must be a list' % lst)
61 raise ValueError('input %r must be a list' % lst)
62 for x in lst:
62 for x in lst:
63 if not isinstance(x, basestring):
63 if not isinstance(x, basestring):
64 raise ValueError('element %r in list must be a string' % x)
64 raise ValueError('element %r in list must be a string' % x)
65
65
66
66
67 def validate_string_dict(dct):
67 def validate_string_dict(dct):
68 """Validate that the input is a dict with string keys and values.
68 """Validate that the input is a dict with string keys and values.
69
69
70 Raises ValueError if not."""
70 Raises ValueError if not."""
71 for k,v in dct.iteritems():
71 for k,v in dct.iteritems():
72 if not isinstance(k, basestring):
72 if not isinstance(k, basestring):
73 raise ValueError('key %r in dict must be a string' % k)
73 raise ValueError('key %r in dict must be a string' % k)
74 if not isinstance(v, basestring):
74 if not isinstance(v, basestring):
75 raise ValueError('value %r in dict must be a string' % v)
75 raise ValueError('value %r in dict must be a string' % v)
76
76
77
77
78 #-----------------------------------------------------------------------------
78 #-----------------------------------------------------------------------------
79 # ZMQ Socket Channel classes
79 # ZMQ Socket Channel classes
80 #-----------------------------------------------------------------------------
80 #-----------------------------------------------------------------------------
81
81
82 class ZMQSocketChannel(Thread):
82 class ZMQSocketChannel(Thread):
83 """The base class for the channels that use ZMQ sockets.
83 """The base class for the channels that use ZMQ sockets.
84 """
84 """
85 context = None
85 context = None
86 session = None
86 session = None
87 socket = None
87 socket = None
88 ioloop = None
88 ioloop = None
89 stream = None
89 stream = None
90 _address = None
90 _address = None
91
91
92 def __init__(self, context, session, address):
92 def __init__(self, context, session, address):
93 """Create a channel
93 """Create a channel
94
94
95 Parameters
95 Parameters
96 ----------
96 ----------
97 context : :class:`zmq.Context`
97 context : :class:`zmq.Context`
98 The ZMQ context to use.
98 The ZMQ context to use.
99 session : :class:`session.Session`
99 session : :class:`session.Session`
100 The session to use.
100 The session to use.
101 address : tuple
101 address : tuple
102 Standard (ip, port) tuple that the kernel is listening on.
102 Standard (ip, port) tuple that the kernel is listening on.
103 """
103 """
104 super(ZMQSocketChannel, self).__init__()
104 super(ZMQSocketChannel, self).__init__()
105 self.daemon = True
105 self.daemon = True
106
106
107 self.context = context
107 self.context = context
108 self.session = session
108 self.session = session
109 if address[1] == 0:
109 if address[1] == 0:
110 message = 'The port number for a channel cannot be 0.'
110 message = 'The port number for a channel cannot be 0.'
111 raise InvalidPortNumber(message)
111 raise InvalidPortNumber(message)
112 self._address = address
112 self._address = address
113
113
114 def _run_loop(self):
114 def _run_loop(self):
115 """Run my loop, ignoring EINTR events in the poller"""
115 """Run my loop, ignoring EINTR events in the poller"""
116 while True:
116 while True:
117 try:
117 try:
118 self.ioloop.start()
118 self.ioloop.start()
119 except zmq.ZMQError as e:
119 except zmq.ZMQError as e:
120 if e.errno == errno.EINTR:
120 if e.errno == errno.EINTR:
121 continue
121 continue
122 else:
122 else:
123 raise
123 raise
124 else:
124 else:
125 break
125 break
126
126
127 def stop(self):
127 def stop(self):
128 """Stop the channel's activity.
128 """Stop the channel's activity.
129
129
130 This calls :method:`Thread.join` and returns when the thread
130 This calls :method:`Thread.join` and returns when the thread
131 terminates. :class:`RuntimeError` will be raised if
131 terminates. :class:`RuntimeError` will be raised if
132 :method:`self.start` is called again.
132 :method:`self.start` is called again.
133 """
133 """
134 self.join()
134 self.join()
135
135
136 @property
136 @property
137 def address(self):
137 def address(self):
138 """Get the channel's address as an (ip, port) tuple.
138 """Get the channel's address as an (ip, port) tuple.
139
139
140 By the default, the address is (localhost, 0), where 0 means a random
140 By the default, the address is (localhost, 0), where 0 means a random
141 port.
141 port.
142 """
142 """
143 return self._address
143 return self._address
144
144
145 def _queue_send(self, msg):
145 def _queue_send(self, msg):
146 """Queue a message to be sent from the IOLoop's thread.
146 """Queue a message to be sent from the IOLoop's thread.
147
147
148 Parameters
148 Parameters
149 ----------
149 ----------
150 msg : message to send
150 msg : message to send
151
151
152 This is threadsafe, as it uses IOLoop.add_callback to give the loop's
152 This is threadsafe, as it uses IOLoop.add_callback to give the loop's
153 thread control of the action.
153 thread control of the action.
154 """
154 """
155 def thread_send():
155 def thread_send():
156 self.session.send(self.stream, msg)
156 self.session.send(self.stream, msg)
157 self.ioloop.add_callback(thread_send)
157 self.ioloop.add_callback(thread_send)
158
158
159 def _handle_recv(self, msg):
159 def _handle_recv(self, msg):
160 """callback for stream.on_recv
160 """callback for stream.on_recv
161
161
162 unpacks message, and calls handlers with it.
162 unpacks message, and calls handlers with it.
163 """
163 """
164 ident,smsg = self.session.feed_identities(msg)
164 ident,smsg = self.session.feed_identities(msg)
165 self.call_handlers(self.session.unserialize(smsg))
165 self.call_handlers(self.session.unserialize(smsg))
166
166
167
167
168
168
169 class ShellSocketChannel(ZMQSocketChannel):
169 class ShellSocketChannel(ZMQSocketChannel):
170 """The XREQ channel for issues request/replies to the kernel.
170 """The XREQ channel for issues request/replies to the kernel.
171 """
171 """
172
172
173 command_queue = None
173 command_queue = None
174 # flag for whether execute requests should be allowed to call raw_input:
174 # flag for whether execute requests should be allowed to call raw_input:
175 allow_stdin = True
175 allow_stdin = True
176
176
177 def __init__(self, context, session, address):
177 def __init__(self, context, session, address):
178 super(ShellSocketChannel, self).__init__(context, session, address)
178 super(ShellSocketChannel, self).__init__(context, session, address)
179 self.ioloop = ioloop.IOLoop()
179 self.ioloop = ioloop.IOLoop()
180
180
181 def run(self):
181 def run(self):
182 """The thread's main activity. Call start() instead."""
182 """The thread's main activity. Call start() instead."""
183 self.socket = self.context.socket(zmq.DEALER)
183 self.socket = self.context.socket(zmq.DEALER)
184 self.socket.setsockopt(zmq.IDENTITY, self.session.bsession)
184 self.socket.setsockopt(zmq.IDENTITY, self.session.bsession)
185 self.socket.connect('tcp://%s:%i' % self.address)
185 self.socket.connect('tcp://%s:%i' % self.address)
186 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
186 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
187 self.stream.on_recv(self._handle_recv)
187 self.stream.on_recv(self._handle_recv)
188 self._run_loop()
188 self._run_loop()
189
189
190 def stop(self):
190 def stop(self):
191 self.ioloop.stop()
191 self.ioloop.stop()
192 super(ShellSocketChannel, self).stop()
192 super(ShellSocketChannel, self).stop()
193
193
194 def call_handlers(self, msg):
194 def call_handlers(self, msg):
195 """This method is called in the ioloop thread when a message arrives.
195 """This method is called in the ioloop thread when a message arrives.
196
196
197 Subclasses should override this method to handle incoming messages.
197 Subclasses should override this method to handle incoming messages.
198 It is important to remember that this method is called in the thread
198 It is important to remember that this method is called in the thread
199 so that some logic must be done to ensure that the application leve
199 so that some logic must be done to ensure that the application leve
200 handlers are called in the application thread.
200 handlers are called in the application thread.
201 """
201 """
202 raise NotImplementedError('call_handlers must be defined in a subclass.')
202 raise NotImplementedError('call_handlers must be defined in a subclass.')
203
203
204 def execute(self, code, silent=False,
204 def execute(self, code, silent=False,
205 user_variables=None, user_expressions=None, allow_stdin=None):
205 user_variables=None, user_expressions=None, allow_stdin=None):
206 """Execute code in the kernel.
206 """Execute code in the kernel.
207
207
208 Parameters
208 Parameters
209 ----------
209 ----------
210 code : str
210 code : str
211 A string of Python code.
211 A string of Python code.
212
212
213 silent : bool, optional (default False)
213 silent : bool, optional (default False)
214 If set, the kernel will execute the code as quietly possible.
214 If set, the kernel will execute the code as quietly possible.
215
215
216 user_variables : list, optional
216 user_variables : list, optional
217 A list of variable names to pull from the user's namespace. They
217 A list of variable names to pull from the user's namespace. They
218 will come back as a dict with these names as keys and their
218 will come back as a dict with these names as keys and their
219 :func:`repr` as values.
219 :func:`repr` as values.
220
220
221 user_expressions : dict, optional
221 user_expressions : dict, optional
222 A dict with string keys and to pull from the user's
222 A dict with string keys and to pull from the user's
223 namespace. They will come back as a dict with these names as keys
223 namespace. They will come back as a dict with these names as keys
224 and their :func:`repr` as values.
224 and their :func:`repr` as values.
225
225
226 allow_stdin : bool, optional
226 allow_stdin : bool, optional
227 Flag for
227 Flag for
228 A dict with string keys and to pull from the user's
228 A dict with string keys and to pull from the user's
229 namespace. They will come back as a dict with these names as keys
229 namespace. They will come back as a dict with these names as keys
230 and their :func:`repr` as values.
230 and their :func:`repr` as values.
231
231
232 Returns
232 Returns
233 -------
233 -------
234 The msg_id of the message sent.
234 The msg_id of the message sent.
235 """
235 """
236 if user_variables is None:
236 if user_variables is None:
237 user_variables = []
237 user_variables = []
238 if user_expressions is None:
238 if user_expressions is None:
239 user_expressions = {}
239 user_expressions = {}
240 if allow_stdin is None:
240 if allow_stdin is None:
241 allow_stdin = self.allow_stdin
241 allow_stdin = self.allow_stdin
242
242
243
243
244 # Don't waste network traffic if inputs are invalid
244 # Don't waste network traffic if inputs are invalid
245 if not isinstance(code, basestring):
245 if not isinstance(code, basestring):
246 raise ValueError('code %r must be a string' % code)
246 raise ValueError('code %r must be a string' % code)
247 validate_string_list(user_variables)
247 validate_string_list(user_variables)
248 validate_string_dict(user_expressions)
248 validate_string_dict(user_expressions)
249
249
250 # Create class for content/msg creation. Related to, but possibly
250 # Create class for content/msg creation. Related to, but possibly
251 # not in Session.
251 # not in Session.
252 content = dict(code=code, silent=silent,
252 content = dict(code=code, silent=silent,
253 user_variables=user_variables,
253 user_variables=user_variables,
254 user_expressions=user_expressions,
254 user_expressions=user_expressions,
255 allow_stdin=allow_stdin,
255 allow_stdin=allow_stdin,
256 )
256 )
257 msg = self.session.msg('execute_request', content)
257 msg = self.session.msg('execute_request', content)
258 self._queue_send(msg)
258 self._queue_send(msg)
259 return msg['header']['msg_id']
259 return msg['header']['msg_id']
260
260
261 def complete(self, text, line, cursor_pos, block=None):
261 def complete(self, text, line, cursor_pos, block=None):
262 """Tab complete text in the kernel's namespace.
262 """Tab complete text in the kernel's namespace.
263
263
264 Parameters
264 Parameters
265 ----------
265 ----------
266 text : str
266 text : str
267 The text to complete.
267 The text to complete.
268 line : str
268 line : str
269 The full line of text that is the surrounding context for the
269 The full line of text that is the surrounding context for the
270 text to complete.
270 text to complete.
271 cursor_pos : int
271 cursor_pos : int
272 The position of the cursor in the line where the completion was
272 The position of the cursor in the line where the completion was
273 requested.
273 requested.
274 block : str, optional
274 block : str, optional
275 The full block of code in which the completion is being requested.
275 The full block of code in which the completion is being requested.
276
276
277 Returns
277 Returns
278 -------
278 -------
279 The msg_id of the message sent.
279 The msg_id of the message sent.
280 """
280 """
281 content = dict(text=text, line=line, block=block, cursor_pos=cursor_pos)
281 content = dict(text=text, line=line, block=block, cursor_pos=cursor_pos)
282 msg = self.session.msg('complete_request', content)
282 msg = self.session.msg('complete_request', content)
283 self._queue_send(msg)
283 self._queue_send(msg)
284 return msg['header']['msg_id']
284 return msg['header']['msg_id']
285
285
286 def object_info(self, oname):
286 def object_info(self, oname):
287 """Get metadata information about an object.
287 """Get metadata information about an object.
288
288
289 Parameters
289 Parameters
290 ----------
290 ----------
291 oname : str
291 oname : str
292 A string specifying the object name.
292 A string specifying the object name.
293
293
294 Returns
294 Returns
295 -------
295 -------
296 The msg_id of the message sent.
296 The msg_id of the message sent.
297 """
297 """
298 content = dict(oname=oname)
298 content = dict(oname=oname)
299 msg = self.session.msg('object_info_request', content)
299 msg = self.session.msg('object_info_request', content)
300 self._queue_send(msg)
300 self._queue_send(msg)
301 return msg['header']['msg_id']
301 return msg['header']['msg_id']
302
302
303 def history(self, raw=True, output=False, hist_access_type='range', **kwargs):
303 def history(self, raw=True, output=False, hist_access_type='range', **kwargs):
304 """Get entries from the history list.
304 """Get entries from the history list.
305
305
306 Parameters
306 Parameters
307 ----------
307 ----------
308 raw : bool
308 raw : bool
309 If True, return the raw input.
309 If True, return the raw input.
310 output : bool
310 output : bool
311 If True, then return the output as well.
311 If True, then return the output as well.
312 hist_access_type : str
312 hist_access_type : str
313 'range' (fill in session, start and stop params), 'tail' (fill in n)
313 'range' (fill in session, start and stop params), 'tail' (fill in n)
314 or 'search' (fill in pattern param).
314 or 'search' (fill in pattern param).
315
315
316 session : int
316 session : int
317 For a range request, the session from which to get lines. Session
317 For a range request, the session from which to get lines. Session
318 numbers are positive integers; negative ones count back from the
318 numbers are positive integers; negative ones count back from the
319 current session.
319 current session.
320 start : int
320 start : int
321 The first line number of a history range.
321 The first line number of a history range.
322 stop : int
322 stop : int
323 The final (excluded) line number of a history range.
323 The final (excluded) line number of a history range.
324
324
325 n : int
325 n : int
326 The number of lines of history to get for a tail request.
326 The number of lines of history to get for a tail request.
327
327
328 pattern : str
328 pattern : str
329 The glob-syntax pattern for a search request.
329 The glob-syntax pattern for a search request.
330
330
331 Returns
331 Returns
332 -------
332 -------
333 The msg_id of the message sent.
333 The msg_id of the message sent.
334 """
334 """
335 content = dict(raw=raw, output=output, hist_access_type=hist_access_type,
335 content = dict(raw=raw, output=output, hist_access_type=hist_access_type,
336 **kwargs)
336 **kwargs)
337 msg = self.session.msg('history_request', content)
337 msg = self.session.msg('history_request', content)
338 self._queue_send(msg)
338 self._queue_send(msg)
339 return msg['header']['msg_id']
339 return msg['header']['msg_id']
340
340
341 def shutdown(self, restart=False):
341 def shutdown(self, restart=False):
342 """Request an immediate kernel shutdown.
342 """Request an immediate kernel shutdown.
343
343
344 Upon receipt of the (empty) reply, client code can safely assume that
344 Upon receipt of the (empty) reply, client code can safely assume that
345 the kernel has shut down and it's safe to forcefully terminate it if
345 the kernel has shut down and it's safe to forcefully terminate it if
346 it's still alive.
346 it's still alive.
347
347
348 The kernel will send the reply via a function registered with Python's
348 The kernel will send the reply via a function registered with Python's
349 atexit module, ensuring it's truly done as the kernel is done with all
349 atexit module, ensuring it's truly done as the kernel is done with all
350 normal operation.
350 normal operation.
351 """
351 """
352 # Send quit message to kernel. Once we implement kernel-side setattr,
352 # Send quit message to kernel. Once we implement kernel-side setattr,
353 # this should probably be done that way, but for now this will do.
353 # this should probably be done that way, but for now this will do.
354 msg = self.session.msg('shutdown_request', {'restart':restart})
354 msg = self.session.msg('shutdown_request', {'restart':restart})
355 self._queue_send(msg)
355 self._queue_send(msg)
356 return msg['header']['msg_id']
356 return msg['header']['msg_id']
357
357
358
358
359
359
360 class SubSocketChannel(ZMQSocketChannel):
360 class SubSocketChannel(ZMQSocketChannel):
361 """The SUB channel which listens for messages that the kernel publishes.
361 """The SUB channel which listens for messages that the kernel publishes.
362 """
362 """
363
363
364 def __init__(self, context, session, address):
364 def __init__(self, context, session, address):
365 super(SubSocketChannel, self).__init__(context, session, address)
365 super(SubSocketChannel, self).__init__(context, session, address)
366 self.ioloop = ioloop.IOLoop()
366 self.ioloop = ioloop.IOLoop()
367
367
368 def run(self):
368 def run(self):
369 """The thread's main activity. Call start() instead."""
369 """The thread's main activity. Call start() instead."""
370 self.socket = self.context.socket(zmq.SUB)
370 self.socket = self.context.socket(zmq.SUB)
371 self.socket.setsockopt(zmq.SUBSCRIBE,b'')
371 self.socket.setsockopt(zmq.SUBSCRIBE,b'')
372 self.socket.setsockopt(zmq.IDENTITY, self.session.bsession)
372 self.socket.setsockopt(zmq.IDENTITY, self.session.bsession)
373 self.socket.connect('tcp://%s:%i' % self.address)
373 self.socket.connect('tcp://%s:%i' % self.address)
374 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
374 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
375 self.stream.on_recv(self._handle_recv)
375 self.stream.on_recv(self._handle_recv)
376 self._run_loop()
376 self._run_loop()
377
377
378 def stop(self):
378 def stop(self):
379 self.ioloop.stop()
379 self.ioloop.stop()
380 super(SubSocketChannel, self).stop()
380 super(SubSocketChannel, self).stop()
381
381
382 def call_handlers(self, msg):
382 def call_handlers(self, msg):
383 """This method is called in the ioloop thread when a message arrives.
383 """This method is called in the ioloop thread when a message arrives.
384
384
385 Subclasses should override this method to handle incoming messages.
385 Subclasses should override this method to handle incoming messages.
386 It is important to remember that this method is called in the thread
386 It is important to remember that this method is called in the thread
387 so that some logic must be done to ensure that the application leve
387 so that some logic must be done to ensure that the application leve
388 handlers are called in the application thread.
388 handlers are called in the application thread.
389 """
389 """
390 raise NotImplementedError('call_handlers must be defined in a subclass.')
390 raise NotImplementedError('call_handlers must be defined in a subclass.')
391
391
392 def flush(self, timeout=1.0):
392 def flush(self, timeout=1.0):
393 """Immediately processes all pending messages on the SUB channel.
393 """Immediately processes all pending messages on the SUB channel.
394
394
395 Callers should use this method to ensure that :method:`call_handlers`
395 Callers should use this method to ensure that :method:`call_handlers`
396 has been called for all messages that have been received on the
396 has been called for all messages that have been received on the
397 0MQ SUB socket of this channel.
397 0MQ SUB socket of this channel.
398
398
399 This method is thread safe.
399 This method is thread safe.
400
400
401 Parameters
401 Parameters
402 ----------
402 ----------
403 timeout : float, optional
403 timeout : float, optional
404 The maximum amount of time to spend flushing, in seconds. The
404 The maximum amount of time to spend flushing, in seconds. The
405 default is one second.
405 default is one second.
406 """
406 """
407 # We do the IOLoop callback process twice to ensure that the IOLoop
407 # We do the IOLoop callback process twice to ensure that the IOLoop
408 # gets to perform at least one full poll.
408 # gets to perform at least one full poll.
409 stop_time = time.time() + timeout
409 stop_time = time.time() + timeout
410 for i in xrange(2):
410 for i in xrange(2):
411 self._flushed = False
411 self._flushed = False
412 self.ioloop.add_callback(self._flush)
412 self.ioloop.add_callback(self._flush)
413 while not self._flushed and time.time() < stop_time:
413 while not self._flushed and time.time() < stop_time:
414 time.sleep(0.01)
414 time.sleep(0.01)
415
415
416 def _flush(self):
416 def _flush(self):
417 """Callback for :method:`self.flush`."""
417 """Callback for :method:`self.flush`."""
418 self.stream.flush()
418 self.stream.flush()
419 self._flushed = True
419 self._flushed = True
420
420
421
421
422 class StdInSocketChannel(ZMQSocketChannel):
422 class StdInSocketChannel(ZMQSocketChannel):
423 """A reply channel to handle raw_input requests that the kernel makes."""
423 """A reply channel to handle raw_input requests that the kernel makes."""
424
424
425 msg_queue = None
425 msg_queue = None
426
426
427 def __init__(self, context, session, address):
427 def __init__(self, context, session, address):
428 super(StdInSocketChannel, self).__init__(context, session, address)
428 super(StdInSocketChannel, self).__init__(context, session, address)
429 self.ioloop = ioloop.IOLoop()
429 self.ioloop = ioloop.IOLoop()
430
430
431 def run(self):
431 def run(self):
432 """The thread's main activity. Call start() instead."""
432 """The thread's main activity. Call start() instead."""
433 self.socket = self.context.socket(zmq.DEALER)
433 self.socket = self.context.socket(zmq.DEALER)
434 self.socket.setsockopt(zmq.IDENTITY, self.session.bsession)
434 self.socket.setsockopt(zmq.IDENTITY, self.session.bsession)
435 self.socket.connect('tcp://%s:%i' % self.address)
435 self.socket.connect('tcp://%s:%i' % self.address)
436 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
436 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
437 self.stream.on_recv(self._handle_recv)
437 self.stream.on_recv(self._handle_recv)
438 self._run_loop()
438 self._run_loop()
439
439
440 def stop(self):
440 def stop(self):
441 self.ioloop.stop()
441 self.ioloop.stop()
442 super(StdInSocketChannel, self).stop()
442 super(StdInSocketChannel, self).stop()
443
443
444 def call_handlers(self, msg):
444 def call_handlers(self, msg):
445 """This method is called in the ioloop thread when a message arrives.
445 """This method is called in the ioloop thread when a message arrives.
446
446
447 Subclasses should override this method to handle incoming messages.
447 Subclasses should override this method to handle incoming messages.
448 It is important to remember that this method is called in the thread
448 It is important to remember that this method is called in the thread
449 so that some logic must be done to ensure that the application leve
449 so that some logic must be done to ensure that the application leve
450 handlers are called in the application thread.
450 handlers are called in the application thread.
451 """
451 """
452 raise NotImplementedError('call_handlers must be defined in a subclass.')
452 raise NotImplementedError('call_handlers must be defined in a subclass.')
453
453
454 def input(self, string):
454 def input(self, string):
455 """Send a string of raw input to the kernel."""
455 """Send a string of raw input to the kernel."""
456 content = dict(value=string)
456 content = dict(value=string)
457 msg = self.session.msg('input_reply', content)
457 msg = self.session.msg('input_reply', content)
458 self._queue_send(msg)
458 self._queue_send(msg)
459
459
460
460
461 class HBSocketChannel(ZMQSocketChannel):
461 class HBSocketChannel(ZMQSocketChannel):
462 """The heartbeat channel which monitors the kernel heartbeat.
462 """The heartbeat channel which monitors the kernel heartbeat.
463
463
464 Note that the heartbeat channel is paused by default. As long as you start
464 Note that the heartbeat channel is paused by default. As long as you start
465 this channel, the kernel manager will ensure that it is paused and un-paused
465 this channel, the kernel manager will ensure that it is paused and un-paused
466 as appropriate.
466 as appropriate.
467 """
467 """
468
468
469 time_to_dead = 3.0
469 time_to_dead = 3.0
470 socket = None
470 socket = None
471 poller = None
471 poller = None
472 _running = None
472 _running = None
473 _pause = None
473 _pause = None
474 _beating = None
474
475
475 def __init__(self, context, session, address):
476 def __init__(self, context, session, address):
476 super(HBSocketChannel, self).__init__(context, session, address)
477 super(HBSocketChannel, self).__init__(context, session, address)
477 self._running = False
478 self._running = False
478 self._pause = True
479 self._pause =True
480 self.poller = zmq.Poller()
479
481
480 def _create_socket(self):
482 def _create_socket(self):
483 if self.socket is not None:
484 # close previous socket, before opening a new one
485 self.poller.unregister(self.socket)
486 self.socket.close()
481 self.socket = self.context.socket(zmq.REQ)
487 self.socket = self.context.socket(zmq.REQ)
482 self.socket.setsockopt(zmq.IDENTITY, self.session.bsession)
488 self.socket.setsockopt(zmq.LINGER, 0)
483 self.socket.connect('tcp://%s:%i' % self.address)
489 self.socket.connect('tcp://%s:%i' % self.address)
484 self.poller = zmq.Poller()
490
485 self.poller.register(self.socket, zmq.POLLIN)
491 self.poller.register(self.socket, zmq.POLLIN)
492
493 def _poll(self, start_time):
494 """poll for heartbeat replies until we reach self.time_to_dead
495
496 Ignores interrupts, and returns the result of poll(), which
497 will be an empty list if no messages arrived before the timeout,
498 or the event tuple if there is a message to receive.
499 """
500
501 until_dead = self.time_to_dead - (time.time() - start_time)
502 # ensure poll at least once
503 until_dead = max(until_dead, 1e-3)
504 events = []
505 while True:
506 try:
507 events = self.poller.poll(1000 * until_dead)
508 except zmq.ZMQError as e:
509 if e.errno == errno.EINTR:
510 # ignore interrupts during heartbeat
511 # this may never actually happen
512 until_dead = self.time_to_dead - (time.time() - start_time)
513 until_dead = max(until_dead, 1e-3)
514 pass
515 else:
516 raise
517 else:
518 break
519 return events
486
520
487 def run(self):
521 def run(self):
488 """The thread's main activity. Call start() instead."""
522 """The thread's main activity. Call start() instead."""
489 self._create_socket()
523 self._create_socket()
490 self._running = True
524 self._running = True
525 self._beating = True
526
491 while self._running:
527 while self._running:
492 if self._pause:
528 if self._pause:
529 # just sleep, and skip the rest of the loop
493 time.sleep(self.time_to_dead)
530 time.sleep(self.time_to_dead)
531 continue
532
533 since_last_heartbeat = 0.0
534 # io.rprint('Ping from HB channel') # dbg
535 # no need to catch EFSM here, because the previous event was
536 # either a recv or connect, which cannot be followed by EFSM
537 self.socket.send(b'ping')
538 request_time = time.time()
539 ready = self._poll(request_time)
540 if ready:
541 self._beating = True
542 # the poll above guarantees we have something to recv
543 self.socket.recv()
544 # sleep the remainder of the cycle
545 remainder = self.time_to_dead - (time.time() - request_time)
546 if remainder > 0:
547 time.sleep(remainder)
548 continue
494 else:
549 else:
495 since_last_heartbeat = 0.0
550 # nothing was received within the time limit, signal heart failure
496 request_time = time.time()
551 self._beating = False
497 try:
552 since_last_heartbeat = time.time() - request_time
498 #io.rprint('Ping from HB channel') # dbg
553 self.call_handlers(since_last_heartbeat)
499 self.socket.send(b'ping')
554 # and close/reopen the socket, because the REQ/REP cycle has been broken
500 except zmq.ZMQError, e:
555 self._create_socket()
501 #io.rprint('*** HB Error:', e) # dbg
556 continue
502 if e.errno == zmq.EFSM:
503 #io.rprint('sleep...', self.time_to_dead) # dbg
504 time.sleep(self.time_to_dead)
505 self._create_socket()
506 else:
507 raise
508 else:
509 while True:
510 try:
511 self.socket.recv(zmq.NOBLOCK)
512 except zmq.ZMQError, e:
513 #io.rprint('*** HB Error 2:', e) # dbg
514 if e.errno == zmq.EAGAIN:
515 before_poll = time.time()
516 until_dead = self.time_to_dead - (before_poll -
517 request_time)
518
519 # When the return value of poll() is an empty
520 # list, that is when things have gone wrong
521 # (zeromq bug). As long as it is not an empty
522 # list, poll is working correctly even if it
523 # returns quickly. Note: poll timeout is in
524 # milliseconds.
525 if until_dead > 0.0:
526 while True:
527 try:
528 self.poller.poll(1000 * until_dead)
529 except zmq.ZMQError as e:
530 if e.errno == errno.EINTR:
531 continue
532 else:
533 raise
534 else:
535 break
536
537 since_last_heartbeat = time.time()-request_time
538 if since_last_heartbeat > self.time_to_dead:
539 self.call_handlers(since_last_heartbeat)
540 break
541 else:
542 # FIXME: We should probably log this instead.
543 raise
544 else:
545 until_dead = self.time_to_dead - (time.time() -
546 request_time)
547 if until_dead > 0.0:
548 #io.rprint('sleep...', self.time_to_dead) # dbg
549 time.sleep(until_dead)
550 break
551
557
552 def pause(self):
558 def pause(self):
553 """Pause the heartbeat."""
559 """Pause the heartbeat."""
554 self._pause = True
560 self._pause = True
555
561
556 def unpause(self):
562 def unpause(self):
557 """Unpause the heartbeat."""
563 """Unpause the heartbeat."""
558 self._pause = False
564 self._pause = False
559
565
560 def is_beating(self):
566 def is_beating(self):
561 """Is the heartbeat running and not paused."""
567 """Is the heartbeat running and responsive (and not paused)."""
562 if self.is_alive() and not self._pause:
568 if self.is_alive() and not self._pause and self._beating:
563 return True
569 return True
564 else:
570 else:
565 return False
571 return False
566
572
567 def stop(self):
573 def stop(self):
568 self._running = False
574 self._running = False
569 super(HBSocketChannel, self).stop()
575 super(HBSocketChannel, self).stop()
570
576
571 def call_handlers(self, since_last_heartbeat):
577 def call_handlers(self, since_last_heartbeat):
572 """This method is called in the ioloop thread when a message arrives.
578 """This method is called in the ioloop thread when a message arrives.
573
579
574 Subclasses should override this method to handle incoming messages.
580 Subclasses should override this method to handle incoming messages.
575 It is important to remember that this method is called in the thread
581 It is important to remember that this method is called in the thread
576 so that some logic must be done to ensure that the application leve
582 so that some logic must be done to ensure that the application level
577 handlers are called in the application thread.
583 handlers are called in the application thread.
578 """
584 """
579 raise NotImplementedError('call_handlers must be defined in a subclass.')
585 raise NotImplementedError('call_handlers must be defined in a subclass.')
580
586
581
587
582 #-----------------------------------------------------------------------------
588 #-----------------------------------------------------------------------------
583 # Main kernel manager class
589 # Main kernel manager class
584 #-----------------------------------------------------------------------------
590 #-----------------------------------------------------------------------------
585
591
586 class KernelManager(HasTraits):
592 class KernelManager(HasTraits):
587 """ Manages a kernel for a frontend.
593 """ Manages a kernel for a frontend.
588
594
589 The SUB channel is for the frontend to receive messages published by the
595 The SUB channel is for the frontend to receive messages published by the
590 kernel.
596 kernel.
591
597
592 The REQ channel is for the frontend to make requests of the kernel.
598 The REQ channel is for the frontend to make requests of the kernel.
593
599
594 The REP channel is for the kernel to request stdin (raw_input) from the
600 The REP channel is for the kernel to request stdin (raw_input) from the
595 frontend.
601 frontend.
596 """
602 """
597 # config object for passing to child configurables
603 # config object for passing to child configurables
598 config = Instance(Config)
604 config = Instance(Config)
599
605
600 # The PyZMQ Context to use for communication with the kernel.
606 # The PyZMQ Context to use for communication with the kernel.
601 context = Instance(zmq.Context)
607 context = Instance(zmq.Context)
602 def _context_default(self):
608 def _context_default(self):
603 return zmq.Context.instance()
609 return zmq.Context.instance()
604
610
605 # The Session to use for communication with the kernel.
611 # The Session to use for communication with the kernel.
606 session = Instance(Session)
612 session = Instance(Session)
607
613
608 # The kernel process with which the KernelManager is communicating.
614 # The kernel process with which the KernelManager is communicating.
609 kernel = Instance(Popen)
615 kernel = Instance(Popen)
610
616
611 # The addresses for the communication channels.
617 # The addresses for the communication channels.
612 connection_file = Unicode('')
618 connection_file = Unicode('')
613 ip = Unicode(LOCALHOST)
619 ip = Unicode(LOCALHOST)
614 def _ip_changed(self, name, old, new):
620 def _ip_changed(self, name, old, new):
615 if new == '*':
621 if new == '*':
616 self.ip = '0.0.0.0'
622 self.ip = '0.0.0.0'
617 shell_port = Integer(0)
623 shell_port = Integer(0)
618 iopub_port = Integer(0)
624 iopub_port = Integer(0)
619 stdin_port = Integer(0)
625 stdin_port = Integer(0)
620 hb_port = Integer(0)
626 hb_port = Integer(0)
621
627
622 # The classes to use for the various channels.
628 # The classes to use for the various channels.
623 shell_channel_class = Type(ShellSocketChannel)
629 shell_channel_class = Type(ShellSocketChannel)
624 sub_channel_class = Type(SubSocketChannel)
630 sub_channel_class = Type(SubSocketChannel)
625 stdin_channel_class = Type(StdInSocketChannel)
631 stdin_channel_class = Type(StdInSocketChannel)
626 hb_channel_class = Type(HBSocketChannel)
632 hb_channel_class = Type(HBSocketChannel)
627
633
628 # Protected traits.
634 # Protected traits.
629 _launch_args = Any
635 _launch_args = Any
630 _shell_channel = Any
636 _shell_channel = Any
631 _sub_channel = Any
637 _sub_channel = Any
632 _stdin_channel = Any
638 _stdin_channel = Any
633 _hb_channel = Any
639 _hb_channel = Any
634 _connection_file_written=Bool(False)
640 _connection_file_written=Bool(False)
635
641
636 def __init__(self, **kwargs):
642 def __init__(self, **kwargs):
637 super(KernelManager, self).__init__(**kwargs)
643 super(KernelManager, self).__init__(**kwargs)
638 if self.session is None:
644 if self.session is None:
639 self.session = Session(config=self.config)
645 self.session = Session(config=self.config)
640
646
641 def __del__(self):
647 def __del__(self):
642 self.cleanup_connection_file()
648 self.cleanup_connection_file()
643
649
644
650
645 #--------------------------------------------------------------------------
651 #--------------------------------------------------------------------------
646 # Channel management methods:
652 # Channel management methods:
647 #--------------------------------------------------------------------------
653 #--------------------------------------------------------------------------
648
654
649 def start_channels(self, shell=True, sub=True, stdin=True, hb=True):
655 def start_channels(self, shell=True, sub=True, stdin=True, hb=True):
650 """Starts the channels for this kernel.
656 """Starts the channels for this kernel.
651
657
652 This will create the channels if they do not exist and then start
658 This will create the channels if they do not exist and then start
653 them. If port numbers of 0 are being used (random ports) then you
659 them. If port numbers of 0 are being used (random ports) then you
654 must first call :method:`start_kernel`. If the channels have been
660 must first call :method:`start_kernel`. If the channels have been
655 stopped and you call this, :class:`RuntimeError` will be raised.
661 stopped and you call this, :class:`RuntimeError` will be raised.
656 """
662 """
657 if shell:
663 if shell:
658 self.shell_channel.start()
664 self.shell_channel.start()
659 if sub:
665 if sub:
660 self.sub_channel.start()
666 self.sub_channel.start()
661 if stdin:
667 if stdin:
662 self.stdin_channel.start()
668 self.stdin_channel.start()
663 self.shell_channel.allow_stdin = True
669 self.shell_channel.allow_stdin = True
664 else:
670 else:
665 self.shell_channel.allow_stdin = False
671 self.shell_channel.allow_stdin = False
666 if hb:
672 if hb:
667 self.hb_channel.start()
673 self.hb_channel.start()
668
674
669 def stop_channels(self):
675 def stop_channels(self):
670 """Stops all the running channels for this kernel.
676 """Stops all the running channels for this kernel.
671 """
677 """
672 if self.shell_channel.is_alive():
678 if self.shell_channel.is_alive():
673 self.shell_channel.stop()
679 self.shell_channel.stop()
674 if self.sub_channel.is_alive():
680 if self.sub_channel.is_alive():
675 self.sub_channel.stop()
681 self.sub_channel.stop()
676 if self.stdin_channel.is_alive():
682 if self.stdin_channel.is_alive():
677 self.stdin_channel.stop()
683 self.stdin_channel.stop()
678 if self.hb_channel.is_alive():
684 if self.hb_channel.is_alive():
679 self.hb_channel.stop()
685 self.hb_channel.stop()
680
686
681 @property
687 @property
682 def channels_running(self):
688 def channels_running(self):
683 """Are any of the channels created and running?"""
689 """Are any of the channels created and running?"""
684 return (self.shell_channel.is_alive() or self.sub_channel.is_alive() or
690 return (self.shell_channel.is_alive() or self.sub_channel.is_alive() or
685 self.stdin_channel.is_alive() or self.hb_channel.is_alive())
691 self.stdin_channel.is_alive() or self.hb_channel.is_alive())
686
692
687 #--------------------------------------------------------------------------
693 #--------------------------------------------------------------------------
688 # Kernel process management methods:
694 # Kernel process management methods:
689 #--------------------------------------------------------------------------
695 #--------------------------------------------------------------------------
690
696
691 def cleanup_connection_file(self):
697 def cleanup_connection_file(self):
692 """cleanup connection file *if we wrote it*
698 """cleanup connection file *if we wrote it*
693
699
694 Will not raise if the connection file was already removed somehow.
700 Will not raise if the connection file was already removed somehow.
695 """
701 """
696 if self._connection_file_written:
702 if self._connection_file_written:
697 # cleanup connection files on full shutdown of kernel we started
703 # cleanup connection files on full shutdown of kernel we started
698 self._connection_file_written = False
704 self._connection_file_written = False
699 try:
705 try:
700 os.remove(self.connection_file)
706 os.remove(self.connection_file)
701 except OSError:
707 except OSError:
702 pass
708 pass
703
709
704 def load_connection_file(self):
710 def load_connection_file(self):
705 """load connection info from JSON dict in self.connection_file"""
711 """load connection info from JSON dict in self.connection_file"""
706 with open(self.connection_file) as f:
712 with open(self.connection_file) as f:
707 cfg = json.loads(f.read())
713 cfg = json.loads(f.read())
708
714
709 self.ip = cfg['ip']
715 self.ip = cfg['ip']
710 self.shell_port = cfg['shell_port']
716 self.shell_port = cfg['shell_port']
711 self.stdin_port = cfg['stdin_port']
717 self.stdin_port = cfg['stdin_port']
712 self.iopub_port = cfg['iopub_port']
718 self.iopub_port = cfg['iopub_port']
713 self.hb_port = cfg['hb_port']
719 self.hb_port = cfg['hb_port']
714 self.session.key = str_to_bytes(cfg['key'])
720 self.session.key = str_to_bytes(cfg['key'])
715
721
716 def write_connection_file(self):
722 def write_connection_file(self):
717 """write connection info to JSON dict in self.connection_file"""
723 """write connection info to JSON dict in self.connection_file"""
718 if self._connection_file_written:
724 if self._connection_file_written:
719 return
725 return
720 self.connection_file,cfg = write_connection_file(self.connection_file,
726 self.connection_file,cfg = write_connection_file(self.connection_file,
721 ip=self.ip, key=self.session.key,
727 ip=self.ip, key=self.session.key,
722 stdin_port=self.stdin_port, iopub_port=self.iopub_port,
728 stdin_port=self.stdin_port, iopub_port=self.iopub_port,
723 shell_port=self.shell_port, hb_port=self.hb_port)
729 shell_port=self.shell_port, hb_port=self.hb_port)
724 # write_connection_file also sets default ports:
730 # write_connection_file also sets default ports:
725 self.shell_port = cfg['shell_port']
731 self.shell_port = cfg['shell_port']
726 self.stdin_port = cfg['stdin_port']
732 self.stdin_port = cfg['stdin_port']
727 self.iopub_port = cfg['iopub_port']
733 self.iopub_port = cfg['iopub_port']
728 self.hb_port = cfg['hb_port']
734 self.hb_port = cfg['hb_port']
729
735
730 self._connection_file_written = True
736 self._connection_file_written = True
731
737
732 def start_kernel(self, **kw):
738 def start_kernel(self, **kw):
733 """Starts a kernel process and configures the manager to use it.
739 """Starts a kernel process and configures the manager to use it.
734
740
735 If random ports (port=0) are being used, this method must be called
741 If random ports (port=0) are being used, this method must be called
736 before the channels are created.
742 before the channels are created.
737
743
738 Parameters:
744 Parameters:
739 -----------
745 -----------
740 ipython : bool, optional (default True)
746 ipython : bool, optional (default True)
741 Whether to use an IPython kernel instead of a plain Python kernel.
747 Whether to use an IPython kernel instead of a plain Python kernel.
742
748
743 launcher : callable, optional (default None)
749 launcher : callable, optional (default None)
744 A custom function for launching the kernel process (generally a
750 A custom function for launching the kernel process (generally a
745 wrapper around ``entry_point.base_launch_kernel``). In most cases,
751 wrapper around ``entry_point.base_launch_kernel``). In most cases,
746 it should not be necessary to use this parameter.
752 it should not be necessary to use this parameter.
747
753
748 **kw : optional
754 **kw : optional
749 See respective options for IPython and Python kernels.
755 See respective options for IPython and Python kernels.
750 """
756 """
751 if self.ip not in LOCAL_IPS:
757 if self.ip not in LOCAL_IPS:
752 raise RuntimeError("Can only launch a kernel on a local interface. "
758 raise RuntimeError("Can only launch a kernel on a local interface. "
753 "Make sure that the '*_address' attributes are "
759 "Make sure that the '*_address' attributes are "
754 "configured properly. "
760 "configured properly. "
755 "Currently valid addresses are: %s"%LOCAL_IPS
761 "Currently valid addresses are: %s"%LOCAL_IPS
756 )
762 )
757
763
758 # write connection file / get default ports
764 # write connection file / get default ports
759 self.write_connection_file()
765 self.write_connection_file()
760
766
761 self._launch_args = kw.copy()
767 self._launch_args = kw.copy()
762 launch_kernel = kw.pop('launcher', None)
768 launch_kernel = kw.pop('launcher', None)
763 if launch_kernel is None:
769 if launch_kernel is None:
764 if kw.pop('ipython', True):
770 if kw.pop('ipython', True):
765 from ipkernel import launch_kernel
771 from ipkernel import launch_kernel
766 else:
772 else:
767 from pykernel import launch_kernel
773 from pykernel import launch_kernel
768 self.kernel = launch_kernel(fname=self.connection_file, **kw)
774 self.kernel = launch_kernel(fname=self.connection_file, **kw)
769
775
770 def shutdown_kernel(self, restart=False):
776 def shutdown_kernel(self, restart=False):
771 """ Attempts to the stop the kernel process cleanly. If the kernel
777 """ Attempts to the stop the kernel process cleanly. If the kernel
772 cannot be stopped, it is killed, if possible.
778 cannot be stopped, it is killed, if possible.
773 """
779 """
774 # FIXME: Shutdown does not work on Windows due to ZMQ errors!
780 # FIXME: Shutdown does not work on Windows due to ZMQ errors!
775 if sys.platform == 'win32':
781 if sys.platform == 'win32':
776 self.kill_kernel()
782 self.kill_kernel()
777 return
783 return
778
784
779 # Pause the heart beat channel if it exists.
785 # Pause the heart beat channel if it exists.
780 if self._hb_channel is not None:
786 if self._hb_channel is not None:
781 self._hb_channel.pause()
787 self._hb_channel.pause()
782
788
783 # Don't send any additional kernel kill messages immediately, to give
789 # Don't send any additional kernel kill messages immediately, to give
784 # the kernel a chance to properly execute shutdown actions. Wait for at
790 # the kernel a chance to properly execute shutdown actions. Wait for at
785 # most 1s, checking every 0.1s.
791 # most 1s, checking every 0.1s.
786 self.shell_channel.shutdown(restart=restart)
792 self.shell_channel.shutdown(restart=restart)
787 for i in range(10):
793 for i in range(10):
788 if self.is_alive:
794 if self.is_alive:
789 time.sleep(0.1)
795 time.sleep(0.1)
790 else:
796 else:
791 break
797 break
792 else:
798 else:
793 # OK, we've waited long enough.
799 # OK, we've waited long enough.
794 if self.has_kernel:
800 if self.has_kernel:
795 self.kill_kernel()
801 self.kill_kernel()
796
802
797 if not restart and self._connection_file_written:
803 if not restart and self._connection_file_written:
798 # cleanup connection files on full shutdown of kernel we started
804 # cleanup connection files on full shutdown of kernel we started
799 self._connection_file_written = False
805 self._connection_file_written = False
800 try:
806 try:
801 os.remove(self.connection_file)
807 os.remove(self.connection_file)
802 except IOError:
808 except IOError:
803 pass
809 pass
804
810
805 def restart_kernel(self, now=False, **kw):
811 def restart_kernel(self, now=False, **kw):
806 """Restarts a kernel with the arguments that were used to launch it.
812 """Restarts a kernel with the arguments that were used to launch it.
807
813
808 If the old kernel was launched with random ports, the same ports will be
814 If the old kernel was launched with random ports, the same ports will be
809 used for the new kernel.
815 used for the new kernel.
810
816
811 Parameters
817 Parameters
812 ----------
818 ----------
813 now : bool, optional
819 now : bool, optional
814 If True, the kernel is forcefully restarted *immediately*, without
820 If True, the kernel is forcefully restarted *immediately*, without
815 having a chance to do any cleanup action. Otherwise the kernel is
821 having a chance to do any cleanup action. Otherwise the kernel is
816 given 1s to clean up before a forceful restart is issued.
822 given 1s to clean up before a forceful restart is issued.
817
823
818 In all cases the kernel is restarted, the only difference is whether
824 In all cases the kernel is restarted, the only difference is whether
819 it is given a chance to perform a clean shutdown or not.
825 it is given a chance to perform a clean shutdown or not.
820
826
821 **kw : optional
827 **kw : optional
822 Any options specified here will replace those used to launch the
828 Any options specified here will replace those used to launch the
823 kernel.
829 kernel.
824 """
830 """
825 if self._launch_args is None:
831 if self._launch_args is None:
826 raise RuntimeError("Cannot restart the kernel. "
832 raise RuntimeError("Cannot restart the kernel. "
827 "No previous call to 'start_kernel'.")
833 "No previous call to 'start_kernel'.")
828 else:
834 else:
829 # Stop currently running kernel.
835 # Stop currently running kernel.
830 if self.has_kernel:
836 if self.has_kernel:
831 if now:
837 if now:
832 self.kill_kernel()
838 self.kill_kernel()
833 else:
839 else:
834 self.shutdown_kernel(restart=True)
840 self.shutdown_kernel(restart=True)
835
841
836 # Start new kernel.
842 # Start new kernel.
837 self._launch_args.update(kw)
843 self._launch_args.update(kw)
838 self.start_kernel(**self._launch_args)
844 self.start_kernel(**self._launch_args)
839
845
840 # FIXME: Messages get dropped in Windows due to probable ZMQ bug
846 # FIXME: Messages get dropped in Windows due to probable ZMQ bug
841 # unless there is some delay here.
847 # unless there is some delay here.
842 if sys.platform == 'win32':
848 if sys.platform == 'win32':
843 time.sleep(0.2)
849 time.sleep(0.2)
844
850
845 @property
851 @property
846 def has_kernel(self):
852 def has_kernel(self):
847 """Returns whether a kernel process has been specified for the kernel
853 """Returns whether a kernel process has been specified for the kernel
848 manager.
854 manager.
849 """
855 """
850 return self.kernel is not None
856 return self.kernel is not None
851
857
852 def kill_kernel(self):
858 def kill_kernel(self):
853 """ Kill the running kernel. """
859 """ Kill the running kernel. """
854 if self.has_kernel:
860 if self.has_kernel:
855 # Pause the heart beat channel if it exists.
861 # Pause the heart beat channel if it exists.
856 if self._hb_channel is not None:
862 if self._hb_channel is not None:
857 self._hb_channel.pause()
863 self._hb_channel.pause()
858
864
859 # Attempt to kill the kernel.
865 # Attempt to kill the kernel.
860 try:
866 try:
861 self.kernel.kill()
867 self.kernel.kill()
862 except OSError, e:
868 except OSError, e:
863 # In Windows, we will get an Access Denied error if the process
869 # In Windows, we will get an Access Denied error if the process
864 # has already terminated. Ignore it.
870 # has already terminated. Ignore it.
865 if sys.platform == 'win32':
871 if sys.platform == 'win32':
866 if e.winerror != 5:
872 if e.winerror != 5:
867 raise
873 raise
868 # On Unix, we may get an ESRCH error if the process has already
874 # On Unix, we may get an ESRCH error if the process has already
869 # terminated. Ignore it.
875 # terminated. Ignore it.
870 else:
876 else:
871 from errno import ESRCH
877 from errno import ESRCH
872 if e.errno != ESRCH:
878 if e.errno != ESRCH:
873 raise
879 raise
874 self.kernel = None
880 self.kernel = None
875 else:
881 else:
876 raise RuntimeError("Cannot kill kernel. No kernel is running!")
882 raise RuntimeError("Cannot kill kernel. No kernel is running!")
877
883
878 def interrupt_kernel(self):
884 def interrupt_kernel(self):
879 """ Interrupts the kernel. Unlike ``signal_kernel``, this operation is
885 """ Interrupts the kernel. Unlike ``signal_kernel``, this operation is
880 well supported on all platforms.
886 well supported on all platforms.
881 """
887 """
882 if self.has_kernel:
888 if self.has_kernel:
883 if sys.platform == 'win32':
889 if sys.platform == 'win32':
884 from parentpoller import ParentPollerWindows as Poller
890 from parentpoller import ParentPollerWindows as Poller
885 Poller.send_interrupt(self.kernel.win32_interrupt_event)
891 Poller.send_interrupt(self.kernel.win32_interrupt_event)
886 else:
892 else:
887 self.kernel.send_signal(signal.SIGINT)
893 self.kernel.send_signal(signal.SIGINT)
888 else:
894 else:
889 raise RuntimeError("Cannot interrupt kernel. No kernel is running!")
895 raise RuntimeError("Cannot interrupt kernel. No kernel is running!")
890
896
891 def signal_kernel(self, signum):
897 def signal_kernel(self, signum):
892 """ Sends a signal to the kernel. Note that since only SIGTERM is
898 """ Sends a signal to the kernel. Note that since only SIGTERM is
893 supported on Windows, this function is only useful on Unix systems.
899 supported on Windows, this function is only useful on Unix systems.
894 """
900 """
895 if self.has_kernel:
901 if self.has_kernel:
896 self.kernel.send_signal(signum)
902 self.kernel.send_signal(signum)
897 else:
903 else:
898 raise RuntimeError("Cannot signal kernel. No kernel is running!")
904 raise RuntimeError("Cannot signal kernel. No kernel is running!")
899
905
900 @property
906 @property
901 def is_alive(self):
907 def is_alive(self):
902 """Is the kernel process still running?"""
908 """Is the kernel process still running?"""
903 # FIXME: not using a heartbeat means this method is broken for any
904 # remote kernel, it's only capable of handling local kernels.
905 if self.has_kernel:
909 if self.has_kernel:
906 if self.kernel.poll() is None:
910 if self.kernel.poll() is None:
907 return True
911 return True
908 else:
912 else:
909 return False
913 return False
914 elif self._hb_channel is not None:
915 # We didn't start the kernel with this KernelManager so we
916 # use the heartbeat.
917 return self._hb_channel.is_beating()
910 else:
918 else:
911 # We didn't start the kernel with this KernelManager so we don't
919 # no heartbeat and not local, we can't tell if it's running,
912 # know if it is running. We should use a heartbeat for this case.
920 # so naively return True
913 return True
921 return True
914
922
915 #--------------------------------------------------------------------------
923 #--------------------------------------------------------------------------
916 # Channels used for communication with the kernel:
924 # Channels used for communication with the kernel:
917 #--------------------------------------------------------------------------
925 #--------------------------------------------------------------------------
918
926
919 @property
927 @property
920 def shell_channel(self):
928 def shell_channel(self):
921 """Get the REQ socket channel object to make requests of the kernel."""
929 """Get the REQ socket channel object to make requests of the kernel."""
922 if self._shell_channel is None:
930 if self._shell_channel is None:
923 self._shell_channel = self.shell_channel_class(self.context,
931 self._shell_channel = self.shell_channel_class(self.context,
924 self.session,
932 self.session,
925 (self.ip, self.shell_port))
933 (self.ip, self.shell_port))
926 return self._shell_channel
934 return self._shell_channel
927
935
928 @property
936 @property
929 def sub_channel(self):
937 def sub_channel(self):
930 """Get the SUB socket channel object."""
938 """Get the SUB socket channel object."""
931 if self._sub_channel is None:
939 if self._sub_channel is None:
932 self._sub_channel = self.sub_channel_class(self.context,
940 self._sub_channel = self.sub_channel_class(self.context,
933 self.session,
941 self.session,
934 (self.ip, self.iopub_port))
942 (self.ip, self.iopub_port))
935 return self._sub_channel
943 return self._sub_channel
936
944
937 @property
945 @property
938 def stdin_channel(self):
946 def stdin_channel(self):
939 """Get the REP socket channel object to handle stdin (raw_input)."""
947 """Get the REP socket channel object to handle stdin (raw_input)."""
940 if self._stdin_channel is None:
948 if self._stdin_channel is None:
941 self._stdin_channel = self.stdin_channel_class(self.context,
949 self._stdin_channel = self.stdin_channel_class(self.context,
942 self.session,
950 self.session,
943 (self.ip, self.stdin_port))
951 (self.ip, self.stdin_port))
944 return self._stdin_channel
952 return self._stdin_channel
945
953
946 @property
954 @property
947 def hb_channel(self):
955 def hb_channel(self):
948 """Get the heartbeat socket channel object to check that the
956 """Get the heartbeat socket channel object to check that the
949 kernel is alive."""
957 kernel is alive."""
950 if self._hb_channel is None:
958 if self._hb_channel is None:
951 self._hb_channel = self.hb_channel_class(self.context,
959 self._hb_channel = self.hb_channel_class(self.context,
952 self.session,
960 self.session,
953 (self.ip, self.hb_port))
961 (self.ip, self.hb_port))
954 return self._hb_channel
962 return self._hb_channel
General Comments 0
You need to be logged in to leave comments. Login now