##// END OF EJS Templates
Merge branch 'kernel-logging' of https://github.com/omazapa/ipython into omazapa-kernel-logging...
Fernando Perez -
r3322:9d65bd5f merge
parent child Browse files
Show More
@@ -1,84 +1,92 b''
1 import logging
1 import sys
2 import sys
2 import time
3 import time
3 from cStringIO import StringIO
4 from cStringIO import StringIO
4
5
5 from session import extract_header, Message
6 from session import extract_header, Message
6
7
7 from IPython.utils import io
8 from IPython.utils import io
8
9
9 #-----------------------------------------------------------------------------
10 #-----------------------------------------------------------------------------
11 # Globals
12 #-----------------------------------------------------------------------------
13
14 # Module-level logger
15 logger = logging.getLogger(__name__)
16
17 #-----------------------------------------------------------------------------
10 # Stream classes
18 # Stream classes
11 #-----------------------------------------------------------------------------
19 #-----------------------------------------------------------------------------
12
20
13 class OutStream(object):
21 class OutStream(object):
14 """A file like object that publishes the stream to a 0MQ PUB socket."""
22 """A file like object that publishes the stream to a 0MQ PUB socket."""
15
23
16 # The time interval between automatic flushes, in seconds.
24 # The time interval between automatic flushes, in seconds.
17 flush_interval = 0.05
25 flush_interval = 0.05
18
26
19 def __init__(self, session, pub_socket, name):
27 def __init__(self, session, pub_socket, name):
20 self.session = session
28 self.session = session
21 self.pub_socket = pub_socket
29 self.pub_socket = pub_socket
22 self.name = name
30 self.name = name
23 self.parent_header = {}
31 self.parent_header = {}
24 self._new_buffer()
32 self._new_buffer()
25
33
26 def set_parent(self, parent):
34 def set_parent(self, parent):
27 self.parent_header = extract_header(parent)
35 self.parent_header = extract_header(parent)
28
36
29 def close(self):
37 def close(self):
30 self.pub_socket = None
38 self.pub_socket = None
31
39
32 def flush(self):
40 def flush(self):
33 #io.rprint('>>>flushing output buffer: %s<<<' % self.name) # dbg
41 #io.rprint('>>>flushing output buffer: %s<<<' % self.name) # dbg
34 if self.pub_socket is None:
42 if self.pub_socket is None:
35 raise ValueError(u'I/O operation on closed file')
43 raise ValueError(u'I/O operation on closed file')
36 else:
44 else:
37 data = self._buffer.getvalue()
45 data = self._buffer.getvalue()
38 if data:
46 if data:
39 content = {u'name':self.name, u'data':data}
47 content = {u'name':self.name, u'data':data}
40 msg = self.session.send(self.pub_socket, u'stream', content=content,
48 msg = self.session.send(self.pub_socket, u'stream',
49 content=content,
41 parent=self.parent_header)
50 parent=self.parent_header)
42 io.raw_print(msg)
51 logger.debug(msg)
43
44 self._buffer.close()
52 self._buffer.close()
45 self._new_buffer()
53 self._new_buffer()
46
54
47 def isatty(self):
55 def isatty(self):
48 return False
56 return False
49
57
50 def next(self):
58 def next(self):
51 raise IOError('Read not supported on a write only stream.')
59 raise IOError('Read not supported on a write only stream.')
52
60
53 def read(self, size=-1):
61 def read(self, size=-1):
54 raise IOError('Read not supported on a write only stream.')
62 raise IOError('Read not supported on a write only stream.')
55
63
56 def readline(self, size=-1):
64 def readline(self, size=-1):
57 raise IOError('Read not supported on a write only stream.')
65 raise IOError('Read not supported on a write only stream.')
58
66
59 def write(self, string):
67 def write(self, string):
60 if self.pub_socket is None:
68 if self.pub_socket is None:
61 raise ValueError('I/O operation on closed file')
69 raise ValueError('I/O operation on closed file')
62 else:
70 else:
63 # We can only send raw bytes, not unicode objects, so we encode
71 # We can only send raw bytes, not unicode objects, so we encode
64 # into utf-8 for all frontends if we get unicode inputs.
72 # into utf-8 for all frontends if we get unicode inputs.
65 if type(string) == unicode:
73 if type(string) == unicode:
66 string = string.encode('utf-8')
74 string = string.encode('utf-8')
67
75
68 self._buffer.write(string)
76 self._buffer.write(string)
69 current_time = time.time()
77 current_time = time.time()
70 if self._start <= 0:
78 if self._start <= 0:
71 self._start = current_time
79 self._start = current_time
72 elif current_time - self._start > self.flush_interval:
80 elif current_time - self._start > self.flush_interval:
73 self.flush()
81 self.flush()
74
82
75 def writelines(self, sequence):
83 def writelines(self, sequence):
76 if self.pub_socket is None:
84 if self.pub_socket is None:
77 raise ValueError('I/O operation on closed file')
85 raise ValueError('I/O operation on closed file')
78 else:
86 else:
79 for string in sequence:
87 for string in sequence:
80 self.write(string)
88 self.write(string)
81
89
82 def _new_buffer(self):
90 def _new_buffer(self):
83 self._buffer = StringIO()
91 self._buffer = StringIO()
84 self._start = -1
92 self._start = -1
@@ -1,632 +1,641 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 """A simple interactive kernel that talks to a frontend over 0MQ.
2 """A simple interactive kernel that talks to a frontend over 0MQ.
3
3
4 Things to do:
4 Things to do:
5
5
6 * Implement `set_parent` logic. Right before doing exec, the Kernel should
6 * Implement `set_parent` logic. Right before doing exec, the Kernel should
7 call set_parent on all the PUB objects with the message about to be executed.
7 call set_parent on all the PUB objects with the message about to be executed.
8 * Implement random port and security key logic.
8 * Implement random port and security key logic.
9 * Implement control messages.
9 * Implement control messages.
10 * Implement event loop and poll version.
10 * Implement event loop and poll version.
11 """
11 """
12
12
13 #-----------------------------------------------------------------------------
13 #-----------------------------------------------------------------------------
14 # Imports
14 # Imports
15 #-----------------------------------------------------------------------------
15 #-----------------------------------------------------------------------------
16 from __future__ import print_function
16 from __future__ import print_function
17
17
18 # Standard library imports.
18 # Standard library imports.
19 import __builtin__
19 import __builtin__
20 import atexit
20 import atexit
21 import sys
21 import sys
22 import time
22 import time
23 import traceback
23 import traceback
24
24 import logging
25 # System library imports.
25 # System library imports.
26 import zmq
26 import zmq
27
27
28 # Local imports.
28 # Local imports.
29 from IPython.config.configurable import Configurable
29 from IPython.config.configurable import Configurable
30 from IPython.utils import io
30 from IPython.utils import io
31 from IPython.utils.jsonutil import json_clean
31 from IPython.utils.jsonutil import json_clean
32 from IPython.lib import pylabtools
32 from IPython.lib import pylabtools
33 from IPython.utils.traitlets import Instance, Float
33 from IPython.utils.traitlets import Instance, Float
34 from entry_point import (base_launch_kernel, make_argument_parser, make_kernel,
34 from entry_point import (base_launch_kernel, make_argument_parser, make_kernel,
35 start_kernel)
35 start_kernel)
36 from iostream import OutStream
36 from iostream import OutStream
37 from session import Session, Message
37 from session import Session, Message
38 from zmqshell import ZMQInteractiveShell
38 from zmqshell import ZMQInteractiveShell
39
39
40 #-----------------------------------------------------------------------------
40 #-----------------------------------------------------------------------------
41 # Globals
42 #-----------------------------------------------------------------------------
43
44 # Module-level logger
45 logger = logging.getLogger(__name__)
46
47 #-----------------------------------------------------------------------------
41 # Main kernel class
48 # Main kernel class
42 #-----------------------------------------------------------------------------
49 #-----------------------------------------------------------------------------
43
50
44 class Kernel(Configurable):
51 class Kernel(Configurable):
45
52
46 #---------------------------------------------------------------------------
53 #---------------------------------------------------------------------------
47 # Kernel interface
54 # Kernel interface
48 #---------------------------------------------------------------------------
55 #---------------------------------------------------------------------------
49
56
50 shell = Instance('IPython.core.interactiveshell.InteractiveShellABC')
57 shell = Instance('IPython.core.interactiveshell.InteractiveShellABC')
51 session = Instance(Session)
58 session = Instance(Session)
52 reply_socket = Instance('zmq.Socket')
59 reply_socket = Instance('zmq.Socket')
53 pub_socket = Instance('zmq.Socket')
60 pub_socket = Instance('zmq.Socket')
54 req_socket = Instance('zmq.Socket')
61 req_socket = Instance('zmq.Socket')
55
62
56 # Private interface
63 # Private interface
57
64
58 # Time to sleep after flushing the stdout/err buffers in each execute
65 # Time to sleep after flushing the stdout/err buffers in each execute
59 # cycle. While this introduces a hard limit on the minimal latency of the
66 # cycle. While this introduces a hard limit on the minimal latency of the
60 # execute cycle, it helps prevent output synchronization problems for
67 # execute cycle, it helps prevent output synchronization problems for
61 # clients.
68 # clients.
62 # Units are in seconds. The minimum zmq latency on local host is probably
69 # Units are in seconds. The minimum zmq latency on local host is probably
63 # ~150 microseconds, set this to 500us for now. We may need to increase it
70 # ~150 microseconds, set this to 500us for now. We may need to increase it
64 # a little if it's not enough after more interactive testing.
71 # a little if it's not enough after more interactive testing.
65 _execute_sleep = Float(0.0005, config=True)
72 _execute_sleep = Float(0.0005, config=True)
66
73
67 # Frequency of the kernel's event loop.
74 # Frequency of the kernel's event loop.
68 # Units are in seconds, kernel subclasses for GUI toolkits may need to
75 # Units are in seconds, kernel subclasses for GUI toolkits may need to
69 # adapt to milliseconds.
76 # adapt to milliseconds.
70 _poll_interval = Float(0.05, config=True)
77 _poll_interval = Float(0.05, config=True)
71
78
72 # If the shutdown was requested over the network, we leave here the
79 # If the shutdown was requested over the network, we leave here the
73 # necessary reply message so it can be sent by our registered atexit
80 # necessary reply message so it can be sent by our registered atexit
74 # handler. This ensures that the reply is only sent to clients truly at
81 # handler. This ensures that the reply is only sent to clients truly at
75 # the end of our shutdown process (which happens after the underlying
82 # the end of our shutdown process (which happens after the underlying
76 # IPython shell's own shutdown).
83 # IPython shell's own shutdown).
77 _shutdown_message = None
84 _shutdown_message = None
78
85
79 # This is a dict of port number that the kernel is listening on. It is set
86 # This is a dict of port number that the kernel is listening on. It is set
80 # by record_ports and used by connect_request.
87 # by record_ports and used by connect_request.
81 _recorded_ports = None
88 _recorded_ports = None
82
89
90
83 def __init__(self, **kwargs):
91 def __init__(self, **kwargs):
84 super(Kernel, self).__init__(**kwargs)
92 super(Kernel, self).__init__(**kwargs)
85
93
86 # Before we even start up the shell, register *first* our exit handlers
94 # Before we even start up the shell, register *first* our exit handlers
87 # so they come before the shell's
95 # so they come before the shell's
88 atexit.register(self._at_shutdown)
96 atexit.register(self._at_shutdown)
89
97
90 # Initialize the InteractiveShell subclass
98 # Initialize the InteractiveShell subclass
91 self.shell = ZMQInteractiveShell.instance()
99 self.shell = ZMQInteractiveShell.instance()
92 self.shell.displayhook.session = self.session
100 self.shell.displayhook.session = self.session
93 self.shell.displayhook.pub_socket = self.pub_socket
101 self.shell.displayhook.pub_socket = self.pub_socket
94 self.shell.display_pub.session = self.session
102 self.shell.display_pub.session = self.session
95 self.shell.display_pub.pub_socket = self.pub_socket
103 self.shell.display_pub.pub_socket = self.pub_socket
96
104
97 # TMP - hack while developing
105 # TMP - hack while developing
98 self.shell._reply_content = None
106 self.shell._reply_content = None
99
107
100 # Build dict of handlers for message types
108 # Build dict of handlers for message types
101 msg_types = [ 'execute_request', 'complete_request',
109 msg_types = [ 'execute_request', 'complete_request',
102 'object_info_request', 'history_request',
110 'object_info_request', 'history_request',
103 'connect_request', 'shutdown_request']
111 'connect_request', 'shutdown_request']
104 self.handlers = {}
112 self.handlers = {}
105 for msg_type in msg_types:
113 for msg_type in msg_types:
106 self.handlers[msg_type] = getattr(self, msg_type)
114 self.handlers[msg_type] = getattr(self, msg_type)
107
115
108 def do_one_iteration(self):
116 def do_one_iteration(self):
109 """Do one iteration of the kernel's evaluation loop.
117 """Do one iteration of the kernel's evaluation loop.
110 """
118 """
111 ident,msg = self.session.recv(self.reply_socket, zmq.NOBLOCK)
119 ident,msg = self.session.recv(self.reply_socket, zmq.NOBLOCK)
112 if msg is None:
120 if msg is None:
113 return
121 return
114
122
115 # This assert will raise in versions of zeromq 2.0.7 and lesser.
123 # This assert will raise in versions of zeromq 2.0.7 and lesser.
116 # We now require 2.0.8 or above, so we can uncomment for safety.
124 # We now require 2.0.8 or above, so we can uncomment for safety.
117 # print(ident,msg, file=sys.__stdout__)
125 # print(ident,msg, file=sys.__stdout__)
118 assert ident is not None, "Missing message part."
126 assert ident is not None, "Missing message part."
119
127
120 # Print some info about this message and leave a '--->' marker, so it's
128 # Print some info about this message and leave a '--->' marker, so it's
121 # easier to trace visually the message chain when debugging. Each
129 # easier to trace visually the message chain when debugging. Each
122 # handler prints its message at the end.
130 # handler prints its message at the end.
123 # Eventually we'll move these from stdout to a logger.
131 # Eventually we'll move these from stdout to a logger.
124 io.raw_print('\n*** MESSAGE TYPE:', msg['msg_type'], '***')
132 logger.debug('\n*** MESSAGE TYPE:'+str(msg['msg_type'])+'***')
125 io.raw_print(' Content: ', msg['content'],
133 logger.debug(' Content: '+str(msg['content'])+'\n --->\n ')
126 '\n --->\n ', sep='', end='')
127
134
128 # Find and call actual handler for message
135 # Find and call actual handler for message
129 handler = self.handlers.get(msg['msg_type'], None)
136 handler = self.handlers.get(msg['msg_type'], None)
130 if handler is None:
137 if handler is None:
131 io.raw_print_err("UNKNOWN MESSAGE TYPE:", msg)
138 logger.error("UNKNOWN MESSAGE TYPE:" +str(msg))
132 else:
139 else:
133 handler(ident, msg)
140 handler(ident, msg)
134
141
135 # Check whether we should exit, in case the incoming message set the
142 # Check whether we should exit, in case the incoming message set the
136 # exit flag on
143 # exit flag on
137 if self.shell.exit_now:
144 if self.shell.exit_now:
138 io.raw_print('\nExiting IPython kernel...')
145 logger.debug('\nExiting IPython kernel...')
139 # We do a normal, clean exit, which allows any actions registered
146 # We do a normal, clean exit, which allows any actions registered
140 # via atexit (such as history saving) to take place.
147 # via atexit (such as history saving) to take place.
141 sys.exit(0)
148 sys.exit(0)
142
149
143
150
144 def start(self):
151 def start(self):
145 """ Start the kernel main loop.
152 """ Start the kernel main loop.
146 """
153 """
147 while True:
154 while True:
148 time.sleep(self._poll_interval)
155 time.sleep(self._poll_interval)
149 self.do_one_iteration()
156 self.do_one_iteration()
150
157
151 def record_ports(self, xrep_port, pub_port, req_port, hb_port):
158 def record_ports(self, xrep_port, pub_port, req_port, hb_port):
152 """Record the ports that this kernel is using.
159 """Record the ports that this kernel is using.
153
160
154 The creator of the Kernel instance must call this methods if they
161 The creator of the Kernel instance must call this methods if they
155 want the :meth:`connect_request` method to return the port numbers.
162 want the :meth:`connect_request` method to return the port numbers.
156 """
163 """
157 self._recorded_ports = {
164 self._recorded_ports = {
158 'xrep_port' : xrep_port,
165 'xrep_port' : xrep_port,
159 'pub_port' : pub_port,
166 'pub_port' : pub_port,
160 'req_port' : req_port,
167 'req_port' : req_port,
161 'hb_port' : hb_port
168 'hb_port' : hb_port
162 }
169 }
163
170
164 #---------------------------------------------------------------------------
171 #---------------------------------------------------------------------------
165 # Kernel request handlers
172 # Kernel request handlers
166 #---------------------------------------------------------------------------
173 #---------------------------------------------------------------------------
167
174
168 def _publish_pyin(self, code, parent):
175 def _publish_pyin(self, code, parent):
169 """Publish the code request on the pyin stream."""
176 """Publish the code request on the pyin stream."""
170
177
171 pyin_msg = self.session.send(self.pub_socket, u'pyin',{u'code':code}, parent=parent)
178 pyin_msg = self.session.send(self.pub_socket, u'pyin',{u'code':code}, parent=parent)
172
179
173 def execute_request(self, ident, parent):
180 def execute_request(self, ident, parent):
174
181
175 status_msg = self.session.send(self.pub_socket,
182 status_msg = self.session.send(self.pub_socket,
176 u'status',
183 u'status',
177 {u'execution_state':u'busy'},
184 {u'execution_state':u'busy'},
178 parent=parent
185 parent=parent
179 )
186 )
180
187
181 try:
188 try:
182 content = parent[u'content']
189 content = parent[u'content']
183 code = content[u'code']
190 code = content[u'code']
184 silent = content[u'silent']
191 silent = content[u'silent']
185 except:
192 except:
186 io.raw_print_err("Got bad msg: ")
193 logger.error("Got bad msg: ")
187 io.raw_print_err(Message(parent))
194 logger.error(str(Message(parent)))
188 return
195 return
189
196
190 shell = self.shell # we'll need this a lot here
197 shell = self.shell # we'll need this a lot here
191
198
192 # Replace raw_input. Note that is not sufficient to replace
199 # Replace raw_input. Note that is not sufficient to replace
193 # raw_input in the user namespace.
200 # raw_input in the user namespace.
194 raw_input = lambda prompt='': self._raw_input(prompt, ident, parent)
201 raw_input = lambda prompt='': self._raw_input(prompt, ident, parent)
195 __builtin__.raw_input = raw_input
202 __builtin__.raw_input = raw_input
196
203
197 # Set the parent message of the display hook and out streams.
204 # Set the parent message of the display hook and out streams.
198 shell.displayhook.set_parent(parent)
205 shell.displayhook.set_parent(parent)
199 shell.display_pub.set_parent(parent)
206 shell.display_pub.set_parent(parent)
200 sys.stdout.set_parent(parent)
207 sys.stdout.set_parent(parent)
201 sys.stderr.set_parent(parent)
208 sys.stderr.set_parent(parent)
202
209
203 # Re-broadcast our input for the benefit of listening clients, and
210 # Re-broadcast our input for the benefit of listening clients, and
204 # start computing output
211 # start computing output
205 if not silent:
212 if not silent:
206 self._publish_pyin(code, parent)
213 self._publish_pyin(code, parent)
207
214
208 reply_content = {}
215 reply_content = {}
209 try:
216 try:
210 if silent:
217 if silent:
211 # run_code uses 'exec' mode, so no displayhook will fire, and it
218 # run_code uses 'exec' mode, so no displayhook will fire, and it
212 # doesn't call logging or history manipulations. Print
219 # doesn't call logging or history manipulations. Print
213 # statements in that code will obviously still execute.
220 # statements in that code will obviously still execute.
214 shell.run_code(code)
221 shell.run_code(code)
215 else:
222 else:
216 # FIXME: the shell calls the exception handler itself.
223 # FIXME: the shell calls the exception handler itself.
217 shell._reply_content = None
224 shell._reply_content = None
218 shell.run_cell(code)
225 shell.run_cell(code)
219 except:
226 except:
220 status = u'error'
227 status = u'error'
221 # FIXME: this code right now isn't being used yet by default,
228 # FIXME: this code right now isn't being used yet by default,
222 # because the runlines() call above directly fires off exception
229 # because the runlines() call above directly fires off exception
223 # reporting. This code, therefore, is only active in the scenario
230 # reporting. This code, therefore, is only active in the scenario
224 # where runlines itself has an unhandled exception. We need to
231 # where runlines itself has an unhandled exception. We need to
225 # uniformize this, for all exception construction to come from a
232 # uniformize this, for all exception construction to come from a
226 # single location in the codbase.
233 # single location in the codbase.
227 etype, evalue, tb = sys.exc_info()
234 etype, evalue, tb = sys.exc_info()
228 tb_list = traceback.format_exception(etype, evalue, tb)
235 tb_list = traceback.format_exception(etype, evalue, tb)
229 reply_content.update(shell._showtraceback(etype, evalue, tb_list))
236 reply_content.update(shell._showtraceback(etype, evalue, tb_list))
230 else:
237 else:
231 status = u'ok'
238 status = u'ok'
232
239
233 reply_content[u'status'] = status
240 reply_content[u'status'] = status
234
241
235 # Return the execution counter so clients can display prompts
242 # Return the execution counter so clients can display prompts
236 reply_content['execution_count'] = shell.execution_count -1
243 reply_content['execution_count'] = shell.execution_count -1
237
244
238 # FIXME - fish exception info out of shell, possibly left there by
245 # FIXME - fish exception info out of shell, possibly left there by
239 # runlines. We'll need to clean up this logic later.
246 # runlines. We'll need to clean up this logic later.
240 if shell._reply_content is not None:
247 if shell._reply_content is not None:
241 reply_content.update(shell._reply_content)
248 reply_content.update(shell._reply_content)
242
249
243 # At this point, we can tell whether the main code execution succeeded
250 # At this point, we can tell whether the main code execution succeeded
244 # or not. If it did, we proceed to evaluate user_variables/expressions
251 # or not. If it did, we proceed to evaluate user_variables/expressions
245 if reply_content['status'] == 'ok':
252 if reply_content['status'] == 'ok':
246 reply_content[u'user_variables'] = \
253 reply_content[u'user_variables'] = \
247 shell.user_variables(content[u'user_variables'])
254 shell.user_variables(content[u'user_variables'])
248 reply_content[u'user_expressions'] = \
255 reply_content[u'user_expressions'] = \
249 shell.user_expressions(content[u'user_expressions'])
256 shell.user_expressions(content[u'user_expressions'])
250 else:
257 else:
251 # If there was an error, don't even try to compute variables or
258 # If there was an error, don't even try to compute variables or
252 # expressions
259 # expressions
253 reply_content[u'user_variables'] = {}
260 reply_content[u'user_variables'] = {}
254 reply_content[u'user_expressions'] = {}
261 reply_content[u'user_expressions'] = {}
255
262
256 # Payloads should be retrieved regardless of outcome, so we can both
263 # Payloads should be retrieved regardless of outcome, so we can both
257 # recover partial output (that could have been generated early in a
264 # recover partial output (that could have been generated early in a
258 # block, before an error) and clear the payload system always.
265 # block, before an error) and clear the payload system always.
259 reply_content[u'payload'] = shell.payload_manager.read_payload()
266 reply_content[u'payload'] = shell.payload_manager.read_payload()
260 # Be agressive about clearing the payload because we don't want
267 # Be agressive about clearing the payload because we don't want
261 # it to sit in memory until the next execute_request comes in.
268 # it to sit in memory until the next execute_request comes in.
262 shell.payload_manager.clear_payload()
269 shell.payload_manager.clear_payload()
263
270
264 # Send the reply.
271 # Send the reply.
265 reply_msg = self.session.send(self.reply_socket, u'execute_reply', reply_content, parent, ident=ident)
272 reply_msg = self.session.send(self.reply_socket, u'execute_reply',
266 io.raw_print(reply_msg)
273 reply_content, parent, ident=ident)
274 logger.debug(str(reply_msg))
267
275
268 # Flush output before sending the reply.
276 # Flush output before sending the reply.
269 sys.stdout.flush()
277 sys.stdout.flush()
270 sys.stderr.flush()
278 sys.stderr.flush()
271 # FIXME: on rare occasions, the flush doesn't seem to make it to the
279 # FIXME: on rare occasions, the flush doesn't seem to make it to the
272 # clients... This seems to mitigate the problem, but we definitely need
280 # clients... This seems to mitigate the problem, but we definitely need
273 # to better understand what's going on.
281 # to better understand what's going on.
274 if self._execute_sleep:
282 if self._execute_sleep:
275 time.sleep(self._execute_sleep)
283 time.sleep(self._execute_sleep)
276
284
277 if reply_msg['content']['status'] == u'error':
285 if reply_msg['content']['status'] == u'error':
278 self._abort_queue()
286 self._abort_queue()
279
287
280 status_msg = self.session.send(self.pub_socket,
288 status_msg = self.session.send(self.pub_socket,
281 u'status',
289 u'status',
282 {u'execution_state':u'idle'},
290 {u'execution_state':u'idle'},
283 parent=parent
291 parent=parent
284 )
292 )
285
293
286 def complete_request(self, ident, parent):
294 def complete_request(self, ident, parent):
287 txt, matches = self._complete(parent)
295 txt, matches = self._complete(parent)
288 matches = {'matches' : matches,
296 matches = {'matches' : matches,
289 'matched_text' : txt,
297 'matched_text' : txt,
290 'status' : 'ok'}
298 'status' : 'ok'}
291 completion_msg = self.session.send(self.reply_socket, 'complete_reply',
299 completion_msg = self.session.send(self.reply_socket, 'complete_reply',
292 matches, parent, ident)
300 matches, parent, ident)
293 io.raw_print(completion_msg)
301 logger.debug(str(completion_msg))
294
302
295 def object_info_request(self, ident, parent):
303 def object_info_request(self, ident, parent):
296 object_info = self.shell.object_inspect(parent['content']['oname'])
304 object_info = self.shell.object_inspect(parent['content']['oname'])
297 # Before we send this object over, we scrub it for JSON usage
305 # Before we send this object over, we scrub it for JSON usage
298 oinfo = json_clean(object_info)
306 oinfo = json_clean(object_info)
299 msg = self.session.send(self.reply_socket, 'object_info_reply',
307 msg = self.session.send(self.reply_socket, 'object_info_reply',
300 oinfo, parent, ident)
308 oinfo, parent, ident)
301 io.raw_print(msg)
309 logger.debug(msg)
302
310
303 def history_request(self, ident, parent):
311 def history_request(self, ident, parent):
304 output = parent['content']['output']
312 output = parent['content']['output']
305 index = parent['content']['index']
313 index = parent['content']['index']
306 raw = parent['content']['raw']
314 raw = parent['content']['raw']
307 hist = self.shell.get_history(index=index, raw=raw, output=output)
315 hist = self.shell.get_history(index=index, raw=raw, output=output)
308 content = {'history' : hist}
316 content = {'history' : hist}
309 msg = self.session.send(self.reply_socket, 'history_reply',
317 msg = self.session.send(self.reply_socket, 'history_reply',
310 content, parent, ident)
318 content, parent, ident)
311 io.raw_print(msg)
319 logger.debug(str(msg))
312
320
313 def connect_request(self, ident, parent):
321 def connect_request(self, ident, parent):
314 if self._recorded_ports is not None:
322 if self._recorded_ports is not None:
315 content = self._recorded_ports.copy()
323 content = self._recorded_ports.copy()
316 else:
324 else:
317 content = {}
325 content = {}
318 msg = self.session.send(self.reply_socket, 'connect_reply',
326 msg = self.session.send(self.reply_socket, 'connect_reply',
319 content, parent, ident)
327 content, parent, ident)
320 io.raw_print(msg)
328 logger.debug(msg)
321
329
322 def shutdown_request(self, ident, parent):
330 def shutdown_request(self, ident, parent):
323 self.shell.exit_now = True
331 self.shell.exit_now = True
324 self._shutdown_message = self.session.msg(u'shutdown_reply', parent['content'], parent)
332 self._shutdown_message = self.session.msg(u'shutdown_reply', parent['content'], parent)
325 sys.exit(0)
333 sys.exit(0)
326
334
327 #---------------------------------------------------------------------------
335 #---------------------------------------------------------------------------
328 # Protected interface
336 # Protected interface
329 #---------------------------------------------------------------------------
337 #---------------------------------------------------------------------------
330
338
331 def _abort_queue(self):
339 def _abort_queue(self):
332 while True:
340 while True:
333 ident,msg = self.session.recv(self.reply_socket, zmq.NOBLOCK)
341 ident,msg = self.session.recv(self.reply_socket, zmq.NOBLOCK)
334 if msg is None:
342 if msg is None:
335 break
343 break
336 else:
344 else:
337 assert ident is not None, \
345 assert ident is not None, \
338 "Unexpected missing message part."
346 "Unexpected missing message part."
339 io.raw_print("Aborting:\n", Message(msg))
347
348 logger.debug("Aborting:\n"+str(Message(msg)))
340 msg_type = msg['msg_type']
349 msg_type = msg['msg_type']
341 reply_type = msg_type.split('_')[0] + '_reply'
350 reply_type = msg_type.split('_')[0] + '_reply'
342 reply_msg = self.session.send(self.reply_socket, reply_type,
351 reply_msg = self.session.send(self.reply_socket, reply_type,
343 {'status' : 'aborted'}, msg, ident=ident)
352 {'status' : 'aborted'}, msg, ident=ident)
344 io.raw_print(reply_msg)
353 logger.debug(reply_msg)
345 # We need to wait a bit for requests to come in. This can probably
354 # We need to wait a bit for requests to come in. This can probably
346 # be set shorter for true asynchronous clients.
355 # be set shorter for true asynchronous clients.
347 time.sleep(0.1)
356 time.sleep(0.1)
348
357
349 def _raw_input(self, prompt, ident, parent):
358 def _raw_input(self, prompt, ident, parent):
350 # Flush output before making the request.
359 # Flush output before making the request.
351 sys.stderr.flush()
360 sys.stderr.flush()
352 sys.stdout.flush()
361 sys.stdout.flush()
353
362
354 # Send the input request.
363 # Send the input request.
355 content = dict(prompt=prompt)
364 content = dict(prompt=prompt)
356 msg = self.session.send(self.req_socket, u'input_request', content, parent)
365 msg = self.session.send(self.req_socket, u'input_request', content, parent)
357
366
358 # Await a response.
367 # Await a response.
359 ident, reply = self.session.recv(self.req_socket, 0)
368 ident, reply = self.session.recv(self.req_socket, 0)
360 try:
369 try:
361 value = reply['content']['value']
370 value = reply['content']['value']
362 except:
371 except:
363 io.raw_print_err("Got bad raw_input reply: ")
372 logger.error("Got bad raw_input reply: ")
364 io.raw_print_err(Message(parent))
373 logger.error(str(Message(parent)))
365 value = ''
374 value = ''
366 return value
375 return value
367
376
368 def _complete(self, msg):
377 def _complete(self, msg):
369 c = msg['content']
378 c = msg['content']
370 try:
379 try:
371 cpos = int(c['cursor_pos'])
380 cpos = int(c['cursor_pos'])
372 except:
381 except:
373 # If we don't get something that we can convert to an integer, at
382 # If we don't get something that we can convert to an integer, at
374 # least attempt the completion guessing the cursor is at the end of
383 # least attempt the completion guessing the cursor is at the end of
375 # the text, if there's any, and otherwise of the line
384 # the text, if there's any, and otherwise of the line
376 cpos = len(c['text'])
385 cpos = len(c['text'])
377 if cpos==0:
386 if cpos==0:
378 cpos = len(c['line'])
387 cpos = len(c['line'])
379 return self.shell.complete(c['text'], c['line'], cpos)
388 return self.shell.complete(c['text'], c['line'], cpos)
380
389
381 def _object_info(self, context):
390 def _object_info(self, context):
382 symbol, leftover = self._symbol_from_context(context)
391 symbol, leftover = self._symbol_from_context(context)
383 if symbol is not None and not leftover:
392 if symbol is not None and not leftover:
384 doc = getattr(symbol, '__doc__', '')
393 doc = getattr(symbol, '__doc__', '')
385 else:
394 else:
386 doc = ''
395 doc = ''
387 object_info = dict(docstring = doc)
396 object_info = dict(docstring = doc)
388 return object_info
397 return object_info
389
398
390 def _symbol_from_context(self, context):
399 def _symbol_from_context(self, context):
391 if not context:
400 if not context:
392 return None, context
401 return None, context
393
402
394 base_symbol_string = context[0]
403 base_symbol_string = context[0]
395 symbol = self.shell.user_ns.get(base_symbol_string, None)
404 symbol = self.shell.user_ns.get(base_symbol_string, None)
396 if symbol is None:
405 if symbol is None:
397 symbol = __builtin__.__dict__.get(base_symbol_string, None)
406 symbol = __builtin__.__dict__.get(base_symbol_string, None)
398 if symbol is None:
407 if symbol is None:
399 return None, context
408 return None, context
400
409
401 context = context[1:]
410 context = context[1:]
402 for i, name in enumerate(context):
411 for i, name in enumerate(context):
403 new_symbol = getattr(symbol, name, None)
412 new_symbol = getattr(symbol, name, None)
404 if new_symbol is None:
413 if new_symbol is None:
405 return symbol, context[i:]
414 return symbol, context[i:]
406 else:
415 else:
407 symbol = new_symbol
416 symbol = new_symbol
408
417
409 return symbol, []
418 return symbol, []
410
419
411 def _at_shutdown(self):
420 def _at_shutdown(self):
412 """Actions taken at shutdown by the kernel, called by python's atexit.
421 """Actions taken at shutdown by the kernel, called by python's atexit.
413 """
422 """
414 # io.rprint("Kernel at_shutdown") # dbg
423 # io.rprint("Kernel at_shutdown") # dbg
415 if self._shutdown_message is not None:
424 if self._shutdown_message is not None:
416 self.session.send(self.reply_socket, self._shutdown_message)
425 self.session.send(self.reply_socket, self._shutdown_message)
417 self.session.send(self.pub_socket, self._shutdown_message)
426 self.session.send(self.pub_socket, self._shutdown_message)
418 io.raw_print(self._shutdown_message)
427 logger.debug(str(self._shutdown_message))
419 # A very short sleep to give zmq time to flush its message buffers
428 # A very short sleep to give zmq time to flush its message buffers
420 # before Python truly shuts down.
429 # before Python truly shuts down.
421 time.sleep(0.01)
430 time.sleep(0.01)
422
431
423
432
424 class QtKernel(Kernel):
433 class QtKernel(Kernel):
425 """A Kernel subclass with Qt support."""
434 """A Kernel subclass with Qt support."""
426
435
427 def start(self):
436 def start(self):
428 """Start a kernel with QtPy4 event loop integration."""
437 """Start a kernel with QtPy4 event loop integration."""
429
438
430 from PyQt4 import QtCore
439 from PyQt4 import QtCore
431 from IPython.lib.guisupport import get_app_qt4, start_event_loop_qt4
440 from IPython.lib.guisupport import get_app_qt4, start_event_loop_qt4
432
441
433 self.app = get_app_qt4([" "])
442 self.app = get_app_qt4([" "])
434 self.app.setQuitOnLastWindowClosed(False)
443 self.app.setQuitOnLastWindowClosed(False)
435 self.timer = QtCore.QTimer()
444 self.timer = QtCore.QTimer()
436 self.timer.timeout.connect(self.do_one_iteration)
445 self.timer.timeout.connect(self.do_one_iteration)
437 # Units for the timer are in milliseconds
446 # Units for the timer are in milliseconds
438 self.timer.start(1000*self._poll_interval)
447 self.timer.start(1000*self._poll_interval)
439 start_event_loop_qt4(self.app)
448 start_event_loop_qt4(self.app)
440
449
441
450
442 class WxKernel(Kernel):
451 class WxKernel(Kernel):
443 """A Kernel subclass with Wx support."""
452 """A Kernel subclass with Wx support."""
444
453
445 def start(self):
454 def start(self):
446 """Start a kernel with wx event loop support."""
455 """Start a kernel with wx event loop support."""
447
456
448 import wx
457 import wx
449 from IPython.lib.guisupport import start_event_loop_wx
458 from IPython.lib.guisupport import start_event_loop_wx
450
459
451 doi = self.do_one_iteration
460 doi = self.do_one_iteration
452 # Wx uses milliseconds
461 # Wx uses milliseconds
453 poll_interval = int(1000*self._poll_interval)
462 poll_interval = int(1000*self._poll_interval)
454
463
455 # We have to put the wx.Timer in a wx.Frame for it to fire properly.
464 # We have to put the wx.Timer in a wx.Frame for it to fire properly.
456 # We make the Frame hidden when we create it in the main app below.
465 # We make the Frame hidden when we create it in the main app below.
457 class TimerFrame(wx.Frame):
466 class TimerFrame(wx.Frame):
458 def __init__(self, func):
467 def __init__(self, func):
459 wx.Frame.__init__(self, None, -1)
468 wx.Frame.__init__(self, None, -1)
460 self.timer = wx.Timer(self)
469 self.timer = wx.Timer(self)
461 # Units for the timer are in milliseconds
470 # Units for the timer are in milliseconds
462 self.timer.Start(poll_interval)
471 self.timer.Start(poll_interval)
463 self.Bind(wx.EVT_TIMER, self.on_timer)
472 self.Bind(wx.EVT_TIMER, self.on_timer)
464 self.func = func
473 self.func = func
465
474
466 def on_timer(self, event):
475 def on_timer(self, event):
467 self.func()
476 self.func()
468
477
469 # We need a custom wx.App to create our Frame subclass that has the
478 # We need a custom wx.App to create our Frame subclass that has the
470 # wx.Timer to drive the ZMQ event loop.
479 # wx.Timer to drive the ZMQ event loop.
471 class IPWxApp(wx.App):
480 class IPWxApp(wx.App):
472 def OnInit(self):
481 def OnInit(self):
473 self.frame = TimerFrame(doi)
482 self.frame = TimerFrame(doi)
474 self.frame.Show(False)
483 self.frame.Show(False)
475 return True
484 return True
476
485
477 # The redirect=False here makes sure that wx doesn't replace
486 # The redirect=False here makes sure that wx doesn't replace
478 # sys.stdout/stderr with its own classes.
487 # sys.stdout/stderr with its own classes.
479 self.app = IPWxApp(redirect=False)
488 self.app = IPWxApp(redirect=False)
480 start_event_loop_wx(self.app)
489 start_event_loop_wx(self.app)
481
490
482
491
483 class TkKernel(Kernel):
492 class TkKernel(Kernel):
484 """A Kernel subclass with Tk support."""
493 """A Kernel subclass with Tk support."""
485
494
486 def start(self):
495 def start(self):
487 """Start a Tk enabled event loop."""
496 """Start a Tk enabled event loop."""
488
497
489 import Tkinter
498 import Tkinter
490 doi = self.do_one_iteration
499 doi = self.do_one_iteration
491 # Tk uses milliseconds
500 # Tk uses milliseconds
492 poll_interval = int(1000*self._poll_interval)
501 poll_interval = int(1000*self._poll_interval)
493 # For Tkinter, we create a Tk object and call its withdraw method.
502 # For Tkinter, we create a Tk object and call its withdraw method.
494 class Timer(object):
503 class Timer(object):
495 def __init__(self, func):
504 def __init__(self, func):
496 self.app = Tkinter.Tk()
505 self.app = Tkinter.Tk()
497 self.app.withdraw()
506 self.app.withdraw()
498 self.func = func
507 self.func = func
499
508
500 def on_timer(self):
509 def on_timer(self):
501 self.func()
510 self.func()
502 self.app.after(poll_interval, self.on_timer)
511 self.app.after(poll_interval, self.on_timer)
503
512
504 def start(self):
513 def start(self):
505 self.on_timer() # Call it once to get things going.
514 self.on_timer() # Call it once to get things going.
506 self.app.mainloop()
515 self.app.mainloop()
507
516
508 self.timer = Timer(doi)
517 self.timer = Timer(doi)
509 self.timer.start()
518 self.timer.start()
510
519
511
520
512 class GTKKernel(Kernel):
521 class GTKKernel(Kernel):
513 """A Kernel subclass with GTK support."""
522 """A Kernel subclass with GTK support."""
514
523
515 def start(self):
524 def start(self):
516 """Start the kernel, coordinating with the GTK event loop"""
525 """Start the kernel, coordinating with the GTK event loop"""
517 from .gui.gtkembed import GTKEmbed
526 from .gui.gtkembed import GTKEmbed
518
527
519 gtk_kernel = GTKEmbed(self)
528 gtk_kernel = GTKEmbed(self)
520 gtk_kernel.start()
529 gtk_kernel.start()
521
530
522
531
523 #-----------------------------------------------------------------------------
532 #-----------------------------------------------------------------------------
524 # Kernel main and launch functions
533 # Kernel main and launch functions
525 #-----------------------------------------------------------------------------
534 #-----------------------------------------------------------------------------
526
535
527 def launch_kernel(ip=None, xrep_port=0, pub_port=0, req_port=0, hb_port=0,
536 def launch_kernel(ip=None, xrep_port=0, pub_port=0, req_port=0, hb_port=0,
528 independent=False, pylab=False, colors=None):
537 independent=False, pylab=False, colors=None):
529 """Launches a localhost kernel, binding to the specified ports.
538 """Launches a localhost kernel, binding to the specified ports.
530
539
531 Parameters
540 Parameters
532 ----------
541 ----------
533 ip : str, optional
542 ip : str, optional
534 The ip address the kernel will bind to.
543 The ip address the kernel will bind to.
535
544
536 xrep_port : int, optional
545 xrep_port : int, optional
537 The port to use for XREP channel.
546 The port to use for XREP channel.
538
547
539 pub_port : int, optional
548 pub_port : int, optional
540 The port to use for the SUB channel.
549 The port to use for the SUB channel.
541
550
542 req_port : int, optional
551 req_port : int, optional
543 The port to use for the REQ (raw input) channel.
552 The port to use for the REQ (raw input) channel.
544
553
545 hb_port : int, optional
554 hb_port : int, optional
546 The port to use for the hearbeat REP channel.
555 The port to use for the hearbeat REP channel.
547
556
548 independent : bool, optional (default False)
557 independent : bool, optional (default False)
549 If set, the kernel process is guaranteed to survive if this process
558 If set, the kernel process is guaranteed to survive if this process
550 dies. If not set, an effort is made to ensure that the kernel is killed
559 dies. If not set, an effort is made to ensure that the kernel is killed
551 when this process dies. Note that in this case it is still good practice
560 when this process dies. Note that in this case it is still good practice
552 to kill kernels manually before exiting.
561 to kill kernels manually before exiting.
553
562
554 pylab : bool or string, optional (default False)
563 pylab : bool or string, optional (default False)
555 If not False, the kernel will be launched with pylab enabled. If a
564 If not False, the kernel will be launched with pylab enabled. If a
556 string is passed, matplotlib will use the specified backend. Otherwise,
565 string is passed, matplotlib will use the specified backend. Otherwise,
557 matplotlib's default backend will be used.
566 matplotlib's default backend will be used.
558
567
559 colors : None or string, optional (default None)
568 colors : None or string, optional (default None)
560 If not None, specify the color scheme. One of (NoColor, LightBG, Linux)
569 If not None, specify the color scheme. One of (NoColor, LightBG, Linux)
561
570
562 Returns
571 Returns
563 -------
572 -------
564 A tuple of form:
573 A tuple of form:
565 (kernel_process, xrep_port, pub_port, req_port)
574 (kernel_process, xrep_port, pub_port, req_port)
566 where kernel_process is a Popen object and the ports are integers.
575 where kernel_process is a Popen object and the ports are integers.
567 """
576 """
568 extra_arguments = []
577 extra_arguments = []
569 if pylab:
578 if pylab:
570 extra_arguments.append('--pylab')
579 extra_arguments.append('--pylab')
571 if isinstance(pylab, basestring):
580 if isinstance(pylab, basestring):
572 extra_arguments.append(pylab)
581 extra_arguments.append(pylab)
573 if ip is not None:
582 if ip is not None:
574 extra_arguments.append('--ip')
583 extra_arguments.append('--ip')
575 if isinstance(ip, basestring):
584 if isinstance(ip, basestring):
576 extra_arguments.append(ip)
585 extra_arguments.append(ip)
577 if colors is not None:
586 if colors is not None:
578 extra_arguments.append('--colors')
587 extra_arguments.append('--colors')
579 extra_arguments.append(colors)
588 extra_arguments.append(colors)
580 return base_launch_kernel('from IPython.zmq.ipkernel import main; main()',
589 return base_launch_kernel('from IPython.zmq.ipkernel import main; main()',
581 xrep_port, pub_port, req_port, hb_port,
590 xrep_port, pub_port, req_port, hb_port,
582 independent, extra_arguments)
591 independent, extra_arguments)
583
592
584
593
585 def main():
594 def main():
586 """ The IPython kernel main entry point.
595 """ The IPython kernel main entry point.
587 """
596 """
588 parser = make_argument_parser()
597 parser = make_argument_parser()
589 parser.add_argument('--pylab', type=str, metavar='GUI', nargs='?',
598 parser.add_argument('--pylab', type=str, metavar='GUI', nargs='?',
590 const='auto', help = \
599 const='auto', help = \
591 "Pre-load matplotlib and numpy for interactive use. If GUI is not \
600 "Pre-load matplotlib and numpy for interactive use. If GUI is not \
592 given, the GUI backend is matplotlib's, otherwise use one of: \
601 given, the GUI backend is matplotlib's, otherwise use one of: \
593 ['tk', 'gtk', 'qt', 'wx', 'inline'].")
602 ['tk', 'gtk', 'qt', 'wx', 'inline'].")
594 parser.add_argument('--colors',
603 parser.add_argument('--colors',
595 type=str, dest='colors',
604 type=str, dest='colors',
596 help="Set the color scheme (NoColor, Linux, and LightBG).",
605 help="Set the color scheme (NoColor, Linux, and LightBG).",
597 metavar='ZMQInteractiveShell.colors')
606 metavar='ZMQInteractiveShell.colors')
598 namespace = parser.parse_args()
607 namespace = parser.parse_args()
599
608
600 kernel_class = Kernel
609 kernel_class = Kernel
601
610
602 kernel_classes = {
611 kernel_classes = {
603 'qt' : QtKernel,
612 'qt' : QtKernel,
604 'qt4': QtKernel,
613 'qt4': QtKernel,
605 'inline': Kernel,
614 'inline': Kernel,
606 'wx' : WxKernel,
615 'wx' : WxKernel,
607 'tk' : TkKernel,
616 'tk' : TkKernel,
608 'gtk': GTKKernel,
617 'gtk': GTKKernel,
609 }
618 }
610 if namespace.pylab:
619 if namespace.pylab:
611 if namespace.pylab == 'auto':
620 if namespace.pylab == 'auto':
612 gui, backend = pylabtools.find_gui_and_backend()
621 gui, backend = pylabtools.find_gui_and_backend()
613 else:
622 else:
614 gui, backend = pylabtools.find_gui_and_backend(namespace.pylab)
623 gui, backend = pylabtools.find_gui_and_backend(namespace.pylab)
615 kernel_class = kernel_classes.get(gui)
624 kernel_class = kernel_classes.get(gui)
616 if kernel_class is None:
625 if kernel_class is None:
617 raise ValueError('GUI is not supported: %r' % gui)
626 raise ValueError('GUI is not supported: %r' % gui)
618 pylabtools.activate_matplotlib(backend)
627 pylabtools.activate_matplotlib(backend)
619 if namespace.colors:
628 if namespace.colors:
620 ZMQInteractiveShell.colors=namespace.colors
629 ZMQInteractiveShell.colors=namespace.colors
621
630
622 kernel = make_kernel(namespace, kernel_class, OutStream)
631 kernel = make_kernel(namespace, kernel_class, OutStream)
623
632
624 if namespace.pylab:
633 if namespace.pylab:
625 pylabtools.import_pylab(kernel.shell.user_ns, backend,
634 pylabtools.import_pylab(kernel.shell.user_ns, backend,
626 shell=kernel.shell)
635 shell=kernel.shell)
627
636
628 start_kernel(namespace, kernel)
637 start_kernel(namespace, kernel)
629
638
630
639
631 if __name__ == '__main__':
640 if __name__ == '__main__':
632 main()
641 main()
@@ -1,908 +1,909 b''
1 """Base classes to manage the interaction with a running kernel.
1 """Base classes to manage the interaction with a running kernel.
2
2
3 TODO
3 TODO
4 * Create logger to handle debugging and console messages.
4 * Create logger to handle debugging and console messages.
5 """
5 """
6
6
7 #-----------------------------------------------------------------------------
7 #-----------------------------------------------------------------------------
8 # Copyright (C) 2008-2010 The IPython Development Team
8 # Copyright (C) 2008-2010 The IPython Development Team
9 #
9 #
10 # Distributed under the terms of the BSD License. The full license is in
10 # Distributed under the terms of the BSD License. The full license is in
11 # the file COPYING, distributed as part of this software.
11 # the file COPYING, distributed as part of this software.
12 #-----------------------------------------------------------------------------
12 #-----------------------------------------------------------------------------
13
13
14 #-----------------------------------------------------------------------------
14 #-----------------------------------------------------------------------------
15 # Imports
15 # Imports
16 #-----------------------------------------------------------------------------
16 #-----------------------------------------------------------------------------
17
17
18 # Standard library imports.
18 # Standard library imports.
19 import atexit
19 import atexit
20 from Queue import Queue, Empty
20 from Queue import Queue, Empty
21 from subprocess import Popen
21 from subprocess import Popen
22 import signal
22 import signal
23 import sys
23 import sys
24 from threading import Thread
24 from threading import Thread
25 import time
25 import time
26 import logging
26
27
27 # System library imports.
28 # System library imports.
28 import zmq
29 import zmq
29 from zmq import POLLIN, POLLOUT, POLLERR
30 from zmq import POLLIN, POLLOUT, POLLERR
30 from zmq.eventloop import ioloop
31 from zmq.eventloop import ioloop
31
32
32 # Local imports.
33 # Local imports.
33 from IPython.utils import io
34 from IPython.utils import io
34 from IPython.utils.localinterfaces import LOCALHOST, LOCAL_IPS
35 from IPython.utils.localinterfaces import LOCALHOST, LOCAL_IPS
35 from IPython.utils.traitlets import HasTraits, Any, Instance, Type, TCPAddress
36 from IPython.utils.traitlets import HasTraits, Any, Instance, Type, TCPAddress
36 from session import Session, Message
37 from session import Session, Message
37
38
38 #-----------------------------------------------------------------------------
39 #-----------------------------------------------------------------------------
39 # Constants and exceptions
40 # Constants and exceptions
40 #-----------------------------------------------------------------------------
41 #-----------------------------------------------------------------------------
41
42
42 class InvalidPortNumber(Exception):
43 class InvalidPortNumber(Exception):
43 pass
44 pass
44
45
45 #-----------------------------------------------------------------------------
46 #-----------------------------------------------------------------------------
46 # Utility functions
47 # Utility functions
47 #-----------------------------------------------------------------------------
48 #-----------------------------------------------------------------------------
48
49
49 # some utilities to validate message structure, these might get moved elsewhere
50 # some utilities to validate message structure, these might get moved elsewhere
50 # if they prove to have more generic utility
51 # if they prove to have more generic utility
51
52
52 def validate_string_list(lst):
53 def validate_string_list(lst):
53 """Validate that the input is a list of strings.
54 """Validate that the input is a list of strings.
54
55
55 Raises ValueError if not."""
56 Raises ValueError if not."""
56 if not isinstance(lst, list):
57 if not isinstance(lst, list):
57 raise ValueError('input %r must be a list' % lst)
58 raise ValueError('input %r must be a list' % lst)
58 for x in lst:
59 for x in lst:
59 if not isinstance(x, basestring):
60 if not isinstance(x, basestring):
60 raise ValueError('element %r in list must be a string' % x)
61 raise ValueError('element %r in list must be a string' % x)
61
62
62
63
63 def validate_string_dict(dct):
64 def validate_string_dict(dct):
64 """Validate that the input is a dict with string keys and values.
65 """Validate that the input is a dict with string keys and values.
65
66
66 Raises ValueError if not."""
67 Raises ValueError if not."""
67 for k,v in dct.iteritems():
68 for k,v in dct.iteritems():
68 if not isinstance(k, basestring):
69 if not isinstance(k, basestring):
69 raise ValueError('key %r in dict must be a string' % k)
70 raise ValueError('key %r in dict must be a string' % k)
70 if not isinstance(v, basestring):
71 if not isinstance(v, basestring):
71 raise ValueError('value %r in dict must be a string' % v)
72 raise ValueError('value %r in dict must be a string' % v)
72
73
73
74
74 #-----------------------------------------------------------------------------
75 #-----------------------------------------------------------------------------
75 # ZMQ Socket Channel classes
76 # ZMQ Socket Channel classes
76 #-----------------------------------------------------------------------------
77 #-----------------------------------------------------------------------------
77
78
78 class ZmqSocketChannel(Thread):
79 class ZmqSocketChannel(Thread):
79 """The base class for the channels that use ZMQ sockets.
80 """The base class for the channels that use ZMQ sockets.
80 """
81 """
81 context = None
82 context = None
82 session = None
83 session = None
83 socket = None
84 socket = None
84 ioloop = None
85 ioloop = None
85 iostate = None
86 iostate = None
86 _address = None
87 _address = None
87
88
88 def __init__(self, context, session, address):
89 def __init__(self, context, session, address):
89 """Create a channel
90 """Create a channel
90
91
91 Parameters
92 Parameters
92 ----------
93 ----------
93 context : :class:`zmq.Context`
94 context : :class:`zmq.Context`
94 The ZMQ context to use.
95 The ZMQ context to use.
95 session : :class:`session.Session`
96 session : :class:`session.Session`
96 The session to use.
97 The session to use.
97 address : tuple
98 address : tuple
98 Standard (ip, port) tuple that the kernel is listening on.
99 Standard (ip, port) tuple that the kernel is listening on.
99 """
100 """
100 super(ZmqSocketChannel, self).__init__()
101 super(ZmqSocketChannel, self).__init__()
101 self.daemon = True
102 self.daemon = True
102
103
103 self.context = context
104 self.context = context
104 self.session = session
105 self.session = session
105 if address[1] == 0:
106 if address[1] == 0:
106 message = 'The port number for a channel cannot be 0.'
107 message = 'The port number for a channel cannot be 0.'
107 raise InvalidPortNumber(message)
108 raise InvalidPortNumber(message)
108 self._address = address
109 self._address = address
109
110
110 def stop(self):
111 def stop(self):
111 """Stop the channel's activity.
112 """Stop the channel's activity.
112
113
113 This calls :method:`Thread.join` and returns when the thread
114 This calls :method:`Thread.join` and returns when the thread
114 terminates. :class:`RuntimeError` will be raised if
115 terminates. :class:`RuntimeError` will be raised if
115 :method:`self.start` is called again.
116 :method:`self.start` is called again.
116 """
117 """
117 self.join()
118 self.join()
118
119
119 @property
120 @property
120 def address(self):
121 def address(self):
121 """Get the channel's address as an (ip, port) tuple.
122 """Get the channel's address as an (ip, port) tuple.
122
123
123 By the default, the address is (localhost, 0), where 0 means a random
124 By the default, the address is (localhost, 0), where 0 means a random
124 port.
125 port.
125 """
126 """
126 return self._address
127 return self._address
127
128
128 def add_io_state(self, state):
129 def add_io_state(self, state):
129 """Add IO state to the eventloop.
130 """Add IO state to the eventloop.
130
131
131 Parameters
132 Parameters
132 ----------
133 ----------
133 state : zmq.POLLIN|zmq.POLLOUT|zmq.POLLERR
134 state : zmq.POLLIN|zmq.POLLOUT|zmq.POLLERR
134 The IO state flag to set.
135 The IO state flag to set.
135
136
136 This is thread safe as it uses the thread safe IOLoop.add_callback.
137 This is thread safe as it uses the thread safe IOLoop.add_callback.
137 """
138 """
138 def add_io_state_callback():
139 def add_io_state_callback():
139 if not self.iostate & state:
140 if not self.iostate & state:
140 self.iostate = self.iostate | state
141 self.iostate = self.iostate | state
141 self.ioloop.update_handler(self.socket, self.iostate)
142 self.ioloop.update_handler(self.socket, self.iostate)
142 self.ioloop.add_callback(add_io_state_callback)
143 self.ioloop.add_callback(add_io_state_callback)
143
144
144 def drop_io_state(self, state):
145 def drop_io_state(self, state):
145 """Drop IO state from the eventloop.
146 """Drop IO state from the eventloop.
146
147
147 Parameters
148 Parameters
148 ----------
149 ----------
149 state : zmq.POLLIN|zmq.POLLOUT|zmq.POLLERR
150 state : zmq.POLLIN|zmq.POLLOUT|zmq.POLLERR
150 The IO state flag to set.
151 The IO state flag to set.
151
152
152 This is thread safe as it uses the thread safe IOLoop.add_callback.
153 This is thread safe as it uses the thread safe IOLoop.add_callback.
153 """
154 """
154 def drop_io_state_callback():
155 def drop_io_state_callback():
155 if self.iostate & state:
156 if self.iostate & state:
156 self.iostate = self.iostate & (~state)
157 self.iostate = self.iostate & (~state)
157 self.ioloop.update_handler(self.socket, self.iostate)
158 self.ioloop.update_handler(self.socket, self.iostate)
158 self.ioloop.add_callback(drop_io_state_callback)
159 self.ioloop.add_callback(drop_io_state_callback)
159
160
160
161
161 class XReqSocketChannel(ZmqSocketChannel):
162 class XReqSocketChannel(ZmqSocketChannel):
162 """The XREQ channel for issues request/replies to the kernel.
163 """The XREQ channel for issues request/replies to the kernel.
163 """
164 """
164
165
165 command_queue = None
166 command_queue = None
166
167
167 def __init__(self, context, session, address):
168 def __init__(self, context, session, address):
168 super(XReqSocketChannel, self).__init__(context, session, address)
169 super(XReqSocketChannel, self).__init__(context, session, address)
169 self.command_queue = Queue()
170 self.command_queue = Queue()
170 self.ioloop = ioloop.IOLoop()
171 self.ioloop = ioloop.IOLoop()
171
172
172 def run(self):
173 def run(self):
173 """The thread's main activity. Call start() instead."""
174 """The thread's main activity. Call start() instead."""
174 self.socket = self.context.socket(zmq.XREQ)
175 self.socket = self.context.socket(zmq.XREQ)
175 self.socket.setsockopt(zmq.IDENTITY, self.session.session)
176 self.socket.setsockopt(zmq.IDENTITY, self.session.session)
176 self.socket.connect('tcp://%s:%i' % self.address)
177 self.socket.connect('tcp://%s:%i' % self.address)
177 self.iostate = POLLERR|POLLIN
178 self.iostate = POLLERR|POLLIN
178 self.ioloop.add_handler(self.socket, self._handle_events,
179 self.ioloop.add_handler(self.socket, self._handle_events,
179 self.iostate)
180 self.iostate)
180 self.ioloop.start()
181 self.ioloop.start()
181
182
182 def stop(self):
183 def stop(self):
183 self.ioloop.stop()
184 self.ioloop.stop()
184 super(XReqSocketChannel, self).stop()
185 super(XReqSocketChannel, self).stop()
185
186
186 def call_handlers(self, msg):
187 def call_handlers(self, msg):
187 """This method is called in the ioloop thread when a message arrives.
188 """This method is called in the ioloop thread when a message arrives.
188
189
189 Subclasses should override this method to handle incoming messages.
190 Subclasses should override this method to handle incoming messages.
190 It is important to remember that this method is called in the thread
191 It is important to remember that this method is called in the thread
191 so that some logic must be done to ensure that the application leve
192 so that some logic must be done to ensure that the application leve
192 handlers are called in the application thread.
193 handlers are called in the application thread.
193 """
194 """
194 raise NotImplementedError('call_handlers must be defined in a subclass.')
195 raise NotImplementedError('call_handlers must be defined in a subclass.')
195
196
196 def execute(self, code, silent=False,
197 def execute(self, code, silent=False,
197 user_variables=None, user_expressions=None):
198 user_variables=None, user_expressions=None):
198 """Execute code in the kernel.
199 """Execute code in the kernel.
199
200
200 Parameters
201 Parameters
201 ----------
202 ----------
202 code : str
203 code : str
203 A string of Python code.
204 A string of Python code.
204
205
205 silent : bool, optional (default False)
206 silent : bool, optional (default False)
206 If set, the kernel will execute the code as quietly possible.
207 If set, the kernel will execute the code as quietly possible.
207
208
208 user_variables : list, optional
209 user_variables : list, optional
209 A list of variable names to pull from the user's namespace. They
210 A list of variable names to pull from the user's namespace. They
210 will come back as a dict with these names as keys and their
211 will come back as a dict with these names as keys and their
211 :func:`repr` as values.
212 :func:`repr` as values.
212
213
213 user_expressions : dict, optional
214 user_expressions : dict, optional
214 A dict with string keys and to pull from the user's
215 A dict with string keys and to pull from the user's
215 namespace. They will come back as a dict with these names as keys
216 namespace. They will come back as a dict with these names as keys
216 and their :func:`repr` as values.
217 and their :func:`repr` as values.
217
218
218 Returns
219 Returns
219 -------
220 -------
220 The msg_id of the message sent.
221 The msg_id of the message sent.
221 """
222 """
222 if user_variables is None:
223 if user_variables is None:
223 user_variables = []
224 user_variables = []
224 if user_expressions is None:
225 if user_expressions is None:
225 user_expressions = {}
226 user_expressions = {}
226
227
227 # Don't waste network traffic if inputs are invalid
228 # Don't waste network traffic if inputs are invalid
228 if not isinstance(code, basestring):
229 if not isinstance(code, basestring):
229 raise ValueError('code %r must be a string' % code)
230 raise ValueError('code %r must be a string' % code)
230 validate_string_list(user_variables)
231 validate_string_list(user_variables)
231 validate_string_dict(user_expressions)
232 validate_string_dict(user_expressions)
232
233
233 # Create class for content/msg creation. Related to, but possibly
234 # Create class for content/msg creation. Related to, but possibly
234 # not in Session.
235 # not in Session.
235 content = dict(code=code, silent=silent,
236 content = dict(code=code, silent=silent,
236 user_variables=user_variables,
237 user_variables=user_variables,
237 user_expressions=user_expressions)
238 user_expressions=user_expressions)
238 msg = self.session.msg('execute_request', content)
239 msg = self.session.msg('execute_request', content)
239 self._queue_request(msg)
240 self._queue_request(msg)
240 return msg['header']['msg_id']
241 return msg['header']['msg_id']
241
242
242 def complete(self, text, line, cursor_pos, block=None):
243 def complete(self, text, line, cursor_pos, block=None):
243 """Tab complete text in the kernel's namespace.
244 """Tab complete text in the kernel's namespace.
244
245
245 Parameters
246 Parameters
246 ----------
247 ----------
247 text : str
248 text : str
248 The text to complete.
249 The text to complete.
249 line : str
250 line : str
250 The full line of text that is the surrounding context for the
251 The full line of text that is the surrounding context for the
251 text to complete.
252 text to complete.
252 cursor_pos : int
253 cursor_pos : int
253 The position of the cursor in the line where the completion was
254 The position of the cursor in the line where the completion was
254 requested.
255 requested.
255 block : str, optional
256 block : str, optional
256 The full block of code in which the completion is being requested.
257 The full block of code in which the completion is being requested.
257
258
258 Returns
259 Returns
259 -------
260 -------
260 The msg_id of the message sent.
261 The msg_id of the message sent.
261 """
262 """
262 content = dict(text=text, line=line, block=block, cursor_pos=cursor_pos)
263 content = dict(text=text, line=line, block=block, cursor_pos=cursor_pos)
263 msg = self.session.msg('complete_request', content)
264 msg = self.session.msg('complete_request', content)
264 self._queue_request(msg)
265 self._queue_request(msg)
265 return msg['header']['msg_id']
266 return msg['header']['msg_id']
266
267
267 def object_info(self, oname):
268 def object_info(self, oname):
268 """Get metadata information about an object.
269 """Get metadata information about an object.
269
270
270 Parameters
271 Parameters
271 ----------
272 ----------
272 oname : str
273 oname : str
273 A string specifying the object name.
274 A string specifying the object name.
274
275
275 Returns
276 Returns
276 -------
277 -------
277 The msg_id of the message sent.
278 The msg_id of the message sent.
278 """
279 """
279 content = dict(oname=oname)
280 content = dict(oname=oname)
280 msg = self.session.msg('object_info_request', content)
281 msg = self.session.msg('object_info_request', content)
281 self._queue_request(msg)
282 self._queue_request(msg)
282 return msg['header']['msg_id']
283 return msg['header']['msg_id']
283
284
284 def history(self, index=None, raw=False, output=True):
285 def history(self, index=None, raw=False, output=True):
285 """Get the history list.
286 """Get the history list.
286
287
287 Parameters
288 Parameters
288 ----------
289 ----------
289 index : n or (n1, n2) or None
290 index : n or (n1, n2) or None
290 If n, then the last entries. If a tuple, then all in
291 If n, then the last entries. If a tuple, then all in
291 range(n1, n2). If None, then all entries. Raises IndexError if
292 range(n1, n2). If None, then all entries. Raises IndexError if
292 the format of index is incorrect.
293 the format of index is incorrect.
293 raw : bool
294 raw : bool
294 If True, return the raw input.
295 If True, return the raw input.
295 output : bool
296 output : bool
296 If True, then return the output as well.
297 If True, then return the output as well.
297
298
298 Returns
299 Returns
299 -------
300 -------
300 The msg_id of the message sent.
301 The msg_id of the message sent.
301 """
302 """
302 content = dict(index=index, raw=raw, output=output)
303 content = dict(index=index, raw=raw, output=output)
303 msg = self.session.msg('history_request', content)
304 msg = self.session.msg('history_request', content)
304 self._queue_request(msg)
305 self._queue_request(msg)
305 return msg['header']['msg_id']
306 return msg['header']['msg_id']
306
307
307 def shutdown(self, restart=False):
308 def shutdown(self, restart=False):
308 """Request an immediate kernel shutdown.
309 """Request an immediate kernel shutdown.
309
310
310 Upon receipt of the (empty) reply, client code can safely assume that
311 Upon receipt of the (empty) reply, client code can safely assume that
311 the kernel has shut down and it's safe to forcefully terminate it if
312 the kernel has shut down and it's safe to forcefully terminate it if
312 it's still alive.
313 it's still alive.
313
314
314 The kernel will send the reply via a function registered with Python's
315 The kernel will send the reply via a function registered with Python's
315 atexit module, ensuring it's truly done as the kernel is done with all
316 atexit module, ensuring it's truly done as the kernel is done with all
316 normal operation.
317 normal operation.
317 """
318 """
318 # Send quit message to kernel. Once we implement kernel-side setattr,
319 # Send quit message to kernel. Once we implement kernel-side setattr,
319 # this should probably be done that way, but for now this will do.
320 # this should probably be done that way, but for now this will do.
320 msg = self.session.msg('shutdown_request', {'restart':restart})
321 msg = self.session.msg('shutdown_request', {'restart':restart})
321 self._queue_request(msg)
322 self._queue_request(msg)
322 return msg['header']['msg_id']
323 return msg['header']['msg_id']
323
324
324 def _handle_events(self, socket, events):
325 def _handle_events(self, socket, events):
325 if events & POLLERR:
326 if events & POLLERR:
326 self._handle_err()
327 self._handle_err()
327 if events & POLLOUT:
328 if events & POLLOUT:
328 self._handle_send()
329 self._handle_send()
329 if events & POLLIN:
330 if events & POLLIN:
330 self._handle_recv()
331 self._handle_recv()
331
332
332 def _handle_recv(self):
333 def _handle_recv(self):
333 ident,msg = self.session.recv(self.socket, 0)
334 ident,msg = self.session.recv(self.socket, 0)
334 self.call_handlers(msg)
335 self.call_handlers(msg)
335
336
336 def _handle_send(self):
337 def _handle_send(self):
337 try:
338 try:
338 msg = self.command_queue.get(False)
339 msg = self.command_queue.get(False)
339 except Empty:
340 except Empty:
340 pass
341 pass
341 else:
342 else:
342 self.session.send(self.socket,msg)
343 self.session.send(self.socket,msg)
343 if self.command_queue.empty():
344 if self.command_queue.empty():
344 self.drop_io_state(POLLOUT)
345 self.drop_io_state(POLLOUT)
345
346
346 def _handle_err(self):
347 def _handle_err(self):
347 # We don't want to let this go silently, so eventually we should log.
348 # We don't want to let this go silently, so eventually we should log.
348 raise zmq.ZMQError()
349 raise zmq.ZMQError()
349
350
350 def _queue_request(self, msg):
351 def _queue_request(self, msg):
351 self.command_queue.put(msg)
352 self.command_queue.put(msg)
352 self.add_io_state(POLLOUT)
353 self.add_io_state(POLLOUT)
353
354
354
355
355 class SubSocketChannel(ZmqSocketChannel):
356 class SubSocketChannel(ZmqSocketChannel):
356 """The SUB channel which listens for messages that the kernel publishes.
357 """The SUB channel which listens for messages that the kernel publishes.
357 """
358 """
358
359
359 def __init__(self, context, session, address):
360 def __init__(self, context, session, address):
360 super(SubSocketChannel, self).__init__(context, session, address)
361 super(SubSocketChannel, self).__init__(context, session, address)
361 self.ioloop = ioloop.IOLoop()
362 self.ioloop = ioloop.IOLoop()
362
363
363 def run(self):
364 def run(self):
364 """The thread's main activity. Call start() instead."""
365 """The thread's main activity. Call start() instead."""
365 self.socket = self.context.socket(zmq.SUB)
366 self.socket = self.context.socket(zmq.SUB)
366 self.socket.setsockopt(zmq.SUBSCRIBE,'')
367 self.socket.setsockopt(zmq.SUBSCRIBE,'')
367 self.socket.setsockopt(zmq.IDENTITY, self.session.session)
368 self.socket.setsockopt(zmq.IDENTITY, self.session.session)
368 self.socket.connect('tcp://%s:%i' % self.address)
369 self.socket.connect('tcp://%s:%i' % self.address)
369 self.iostate = POLLIN|POLLERR
370 self.iostate = POLLIN|POLLERR
370 self.ioloop.add_handler(self.socket, self._handle_events,
371 self.ioloop.add_handler(self.socket, self._handle_events,
371 self.iostate)
372 self.iostate)
372 self.ioloop.start()
373 self.ioloop.start()
373
374
374 def stop(self):
375 def stop(self):
375 self.ioloop.stop()
376 self.ioloop.stop()
376 super(SubSocketChannel, self).stop()
377 super(SubSocketChannel, self).stop()
377
378
378 def call_handlers(self, msg):
379 def call_handlers(self, msg):
379 """This method is called in the ioloop thread when a message arrives.
380 """This method is called in the ioloop thread when a message arrives.
380
381
381 Subclasses should override this method to handle incoming messages.
382 Subclasses should override this method to handle incoming messages.
382 It is important to remember that this method is called in the thread
383 It is important to remember that this method is called in the thread
383 so that some logic must be done to ensure that the application leve
384 so that some logic must be done to ensure that the application leve
384 handlers are called in the application thread.
385 handlers are called in the application thread.
385 """
386 """
386 raise NotImplementedError('call_handlers must be defined in a subclass.')
387 raise NotImplementedError('call_handlers must be defined in a subclass.')
387
388
388 def flush(self, timeout=1.0):
389 def flush(self, timeout=1.0):
389 """Immediately processes all pending messages on the SUB channel.
390 """Immediately processes all pending messages on the SUB channel.
390
391
391 Callers should use this method to ensure that :method:`call_handlers`
392 Callers should use this method to ensure that :method:`call_handlers`
392 has been called for all messages that have been received on the
393 has been called for all messages that have been received on the
393 0MQ SUB socket of this channel.
394 0MQ SUB socket of this channel.
394
395
395 This method is thread safe.
396 This method is thread safe.
396
397
397 Parameters
398 Parameters
398 ----------
399 ----------
399 timeout : float, optional
400 timeout : float, optional
400 The maximum amount of time to spend flushing, in seconds. The
401 The maximum amount of time to spend flushing, in seconds. The
401 default is one second.
402 default is one second.
402 """
403 """
403 # We do the IOLoop callback process twice to ensure that the IOLoop
404 # We do the IOLoop callback process twice to ensure that the IOLoop
404 # gets to perform at least one full poll.
405 # gets to perform at least one full poll.
405 stop_time = time.time() + timeout
406 stop_time = time.time() + timeout
406 for i in xrange(2):
407 for i in xrange(2):
407 self._flushed = False
408 self._flushed = False
408 self.ioloop.add_callback(self._flush)
409 self.ioloop.add_callback(self._flush)
409 while not self._flushed and time.time() < stop_time:
410 while not self._flushed and time.time() < stop_time:
410 time.sleep(0.01)
411 time.sleep(0.01)
411
412
412 def _handle_events(self, socket, events):
413 def _handle_events(self, socket, events):
413 # Turn on and off POLLOUT depending on if we have made a request
414 # Turn on and off POLLOUT depending on if we have made a request
414 if events & POLLERR:
415 if events & POLLERR:
415 self._handle_err()
416 self._handle_err()
416 if events & POLLIN:
417 if events & POLLIN:
417 self._handle_recv()
418 self._handle_recv()
418
419
419 def _handle_err(self):
420 def _handle_err(self):
420 # We don't want to let this go silently, so eventually we should log.
421 # We don't want to let this go silently, so eventually we should log.
421 raise zmq.ZMQError()
422 raise zmq.ZMQError()
422
423
423 def _handle_recv(self):
424 def _handle_recv(self):
424 # Get all of the messages we can
425 # Get all of the messages we can
425 while True:
426 while True:
426 try:
427 try:
427 ident,msg = self.session.recv(self.socket)
428 ident,msg = self.session.recv(self.socket)
428 except zmq.ZMQError:
429 except zmq.ZMQError:
429 # Check the errno?
430 # Check the errno?
430 # Will this trigger POLLERR?
431 # Will this trigger POLLERR?
431 break
432 break
432 else:
433 else:
433 if msg is None:
434 if msg is None:
434 break
435 break
435 self.call_handlers(msg)
436 self.call_handlers(msg)
436
437
437 def _flush(self):
438 def _flush(self):
438 """Callback for :method:`self.flush`."""
439 """Callback for :method:`self.flush`."""
439 self._flushed = True
440 self._flushed = True
440
441
441
442
442 class RepSocketChannel(ZmqSocketChannel):
443 class RepSocketChannel(ZmqSocketChannel):
443 """A reply channel to handle raw_input requests that the kernel makes."""
444 """A reply channel to handle raw_input requests that the kernel makes."""
444
445
445 msg_queue = None
446 msg_queue = None
446
447
447 def __init__(self, context, session, address):
448 def __init__(self, context, session, address):
448 super(RepSocketChannel, self).__init__(context, session, address)
449 super(RepSocketChannel, self).__init__(context, session, address)
449 self.ioloop = ioloop.IOLoop()
450 self.ioloop = ioloop.IOLoop()
450 self.msg_queue = Queue()
451 self.msg_queue = Queue()
451
452
452 def run(self):
453 def run(self):
453 """The thread's main activity. Call start() instead."""
454 """The thread's main activity. Call start() instead."""
454 self.socket = self.context.socket(zmq.XREQ)
455 self.socket = self.context.socket(zmq.XREQ)
455 self.socket.setsockopt(zmq.IDENTITY, self.session.session)
456 self.socket.setsockopt(zmq.IDENTITY, self.session.session)
456 self.socket.connect('tcp://%s:%i' % self.address)
457 self.socket.connect('tcp://%s:%i' % self.address)
457 self.iostate = POLLERR|POLLIN
458 self.iostate = POLLERR|POLLIN
458 self.ioloop.add_handler(self.socket, self._handle_events,
459 self.ioloop.add_handler(self.socket, self._handle_events,
459 self.iostate)
460 self.iostate)
460 self.ioloop.start()
461 self.ioloop.start()
461
462
462 def stop(self):
463 def stop(self):
463 self.ioloop.stop()
464 self.ioloop.stop()
464 super(RepSocketChannel, self).stop()
465 super(RepSocketChannel, self).stop()
465
466
466 def call_handlers(self, msg):
467 def call_handlers(self, msg):
467 """This method is called in the ioloop thread when a message arrives.
468 """This method is called in the ioloop thread when a message arrives.
468
469
469 Subclasses should override this method to handle incoming messages.
470 Subclasses should override this method to handle incoming messages.
470 It is important to remember that this method is called in the thread
471 It is important to remember that this method is called in the thread
471 so that some logic must be done to ensure that the application leve
472 so that some logic must be done to ensure that the application leve
472 handlers are called in the application thread.
473 handlers are called in the application thread.
473 """
474 """
474 raise NotImplementedError('call_handlers must be defined in a subclass.')
475 raise NotImplementedError('call_handlers must be defined in a subclass.')
475
476
476 def input(self, string):
477 def input(self, string):
477 """Send a string of raw input to the kernel."""
478 """Send a string of raw input to the kernel."""
478 content = dict(value=string)
479 content = dict(value=string)
479 msg = self.session.msg('input_reply', content)
480 msg = self.session.msg('input_reply', content)
480 self._queue_reply(msg)
481 self._queue_reply(msg)
481
482
482 def _handle_events(self, socket, events):
483 def _handle_events(self, socket, events):
483 if events & POLLERR:
484 if events & POLLERR:
484 self._handle_err()
485 self._handle_err()
485 if events & POLLOUT:
486 if events & POLLOUT:
486 self._handle_send()
487 self._handle_send()
487 if events & POLLIN:
488 if events & POLLIN:
488 self._handle_recv()
489 self._handle_recv()
489
490
490 def _handle_recv(self):
491 def _handle_recv(self):
491 ident,msg = self.session.recv(self.socket, 0)
492 ident,msg = self.session.recv(self.socket, 0)
492 self.call_handlers(msg)
493 self.call_handlers(msg)
493
494
494 def _handle_send(self):
495 def _handle_send(self):
495 try:
496 try:
496 msg = self.msg_queue.get(False)
497 msg = self.msg_queue.get(False)
497 except Empty:
498 except Empty:
498 pass
499 pass
499 else:
500 else:
500 self.session.send(self.socket,msg)
501 self.session.send(self.socket,msg)
501 if self.msg_queue.empty():
502 if self.msg_queue.empty():
502 self.drop_io_state(POLLOUT)
503 self.drop_io_state(POLLOUT)
503
504
504 def _handle_err(self):
505 def _handle_err(self):
505 # We don't want to let this go silently, so eventually we should log.
506 # We don't want to let this go silently, so eventually we should log.
506 raise zmq.ZMQError()
507 raise zmq.ZMQError()
507
508
508 def _queue_reply(self, msg):
509 def _queue_reply(self, msg):
509 self.msg_queue.put(msg)
510 self.msg_queue.put(msg)
510 self.add_io_state(POLLOUT)
511 self.add_io_state(POLLOUT)
511
512
512
513
513 class HBSocketChannel(ZmqSocketChannel):
514 class HBSocketChannel(ZmqSocketChannel):
514 """The heartbeat channel which monitors the kernel heartbeat.
515 """The heartbeat channel which monitors the kernel heartbeat.
515
516
516 Note that the heartbeat channel is paused by default. As long as you start
517 Note that the heartbeat channel is paused by default. As long as you start
517 this channel, the kernel manager will ensure that it is paused and un-paused
518 this channel, the kernel manager will ensure that it is paused and un-paused
518 as appropriate.
519 as appropriate.
519 """
520 """
520
521
521 time_to_dead = 3.0
522 time_to_dead = 3.0
522 socket = None
523 socket = None
523 poller = None
524 poller = None
524 _running = None
525 _running = None
525 _pause = None
526 _pause = None
526
527
527 def __init__(self, context, session, address):
528 def __init__(self, context, session, address):
528 super(HBSocketChannel, self).__init__(context, session, address)
529 super(HBSocketChannel, self).__init__(context, session, address)
529 self._running = False
530 self._running = False
530 self._pause = True
531 self._pause = True
531
532
532 def _create_socket(self):
533 def _create_socket(self):
533 self.socket = self.context.socket(zmq.REQ)
534 self.socket = self.context.socket(zmq.REQ)
534 self.socket.setsockopt(zmq.IDENTITY, self.session.session)
535 self.socket.setsockopt(zmq.IDENTITY, self.session.session)
535 self.socket.connect('tcp://%s:%i' % self.address)
536 self.socket.connect('tcp://%s:%i' % self.address)
536 self.poller = zmq.Poller()
537 self.poller = zmq.Poller()
537 self.poller.register(self.socket, zmq.POLLIN)
538 self.poller.register(self.socket, zmq.POLLIN)
538
539
539 def run(self):
540 def run(self):
540 """The thread's main activity. Call start() instead."""
541 """The thread's main activity. Call start() instead."""
541 self._create_socket()
542 self._create_socket()
542 self._running = True
543 self._running = True
543 while self._running:
544 while self._running:
544 if self._pause:
545 if self._pause:
545 time.sleep(self.time_to_dead)
546 time.sleep(self.time_to_dead)
546 else:
547 else:
547 since_last_heartbeat = 0.0
548 since_last_heartbeat = 0.0
548 request_time = time.time()
549 request_time = time.time()
549 try:
550 try:
550 #io.rprint('Ping from HB channel') # dbg
551 #io.rprint('Ping from HB channel') # dbg
551 self.socket.send(b'ping')
552 self.socket.send(b'ping')
552 except zmq.ZMQError, e:
553 except zmq.ZMQError, e:
553 #io.rprint('*** HB Error:', e) # dbg
554 #io.rprint('*** HB Error:', e) # dbg
554 if e.errno == zmq.EFSM:
555 if e.errno == zmq.EFSM:
555 #io.rprint('sleep...', self.time_to_dead) # dbg
556 #io.rprint('sleep...', self.time_to_dead) # dbg
556 time.sleep(self.time_to_dead)
557 time.sleep(self.time_to_dead)
557 self._create_socket()
558 self._create_socket()
558 else:
559 else:
559 raise
560 raise
560 else:
561 else:
561 while True:
562 while True:
562 try:
563 try:
563 self.socket.recv(zmq.NOBLOCK)
564 self.socket.recv(zmq.NOBLOCK)
564 except zmq.ZMQError, e:
565 except zmq.ZMQError, e:
565 #io.rprint('*** HB Error 2:', e) # dbg
566 #io.rprint('*** HB Error 2:', e) # dbg
566 if e.errno == zmq.EAGAIN:
567 if e.errno == zmq.EAGAIN:
567 before_poll = time.time()
568 before_poll = time.time()
568 until_dead = self.time_to_dead - (before_poll -
569 until_dead = self.time_to_dead - (before_poll -
569 request_time)
570 request_time)
570
571
571 # When the return value of poll() is an empty
572 # When the return value of poll() is an empty
572 # list, that is when things have gone wrong
573 # list, that is when things have gone wrong
573 # (zeromq bug). As long as it is not an empty
574 # (zeromq bug). As long as it is not an empty
574 # list, poll is working correctly even if it
575 # list, poll is working correctly even if it
575 # returns quickly. Note: poll timeout is in
576 # returns quickly. Note: poll timeout is in
576 # milliseconds.
577 # milliseconds.
577 self.poller.poll(1000*until_dead)
578 self.poller.poll(1000*until_dead)
578
579
579 since_last_heartbeat = time.time()-request_time
580 since_last_heartbeat = time.time()-request_time
580 if since_last_heartbeat > self.time_to_dead:
581 if since_last_heartbeat > self.time_to_dead:
581 self.call_handlers(since_last_heartbeat)
582 self.call_handlers(since_last_heartbeat)
582 break
583 break
583 else:
584 else:
584 # FIXME: We should probably log this instead.
585 # FIXME: We should probably log this instead.
585 raise
586 raise
586 else:
587 else:
587 until_dead = self.time_to_dead - (time.time() -
588 until_dead = self.time_to_dead - (time.time() -
588 request_time)
589 request_time)
589 if until_dead > 0.0:
590 if until_dead > 0.0:
590 #io.rprint('sleep...', self.time_to_dead) # dbg
591 #io.rprint('sleep...', self.time_to_dead) # dbg
591 time.sleep(until_dead)
592 time.sleep(until_dead)
592 break
593 break
593
594
594 def pause(self):
595 def pause(self):
595 """Pause the heartbeat."""
596 """Pause the heartbeat."""
596 self._pause = True
597 self._pause = True
597
598
598 def unpause(self):
599 def unpause(self):
599 """Unpause the heartbeat."""
600 """Unpause the heartbeat."""
600 self._pause = False
601 self._pause = False
601
602
602 def is_beating(self):
603 def is_beating(self):
603 """Is the heartbeat running and not paused."""
604 """Is the heartbeat running and not paused."""
604 if self.is_alive() and not self._pause:
605 if self.is_alive() and not self._pause:
605 return True
606 return True
606 else:
607 else:
607 return False
608 return False
608
609
609 def stop(self):
610 def stop(self):
610 self._running = False
611 self._running = False
611 super(HBSocketChannel, self).stop()
612 super(HBSocketChannel, self).stop()
612
613
613 def call_handlers(self, since_last_heartbeat):
614 def call_handlers(self, since_last_heartbeat):
614 """This method is called in the ioloop thread when a message arrives.
615 """This method is called in the ioloop thread when a message arrives.
615
616
616 Subclasses should override this method to handle incoming messages.
617 Subclasses should override this method to handle incoming messages.
617 It is important to remember that this method is called in the thread
618 It is important to remember that this method is called in the thread
618 so that some logic must be done to ensure that the application leve
619 so that some logic must be done to ensure that the application leve
619 handlers are called in the application thread.
620 handlers are called in the application thread.
620 """
621 """
621 raise NotImplementedError('call_handlers must be defined in a subclass.')
622 raise NotImplementedError('call_handlers must be defined in a subclass.')
622
623
623
624
624 #-----------------------------------------------------------------------------
625 #-----------------------------------------------------------------------------
625 # Main kernel manager class
626 # Main kernel manager class
626 #-----------------------------------------------------------------------------
627 #-----------------------------------------------------------------------------
627
628
628 class KernelManager(HasTraits):
629 class KernelManager(HasTraits):
629 """ Manages a kernel for a frontend.
630 """ Manages a kernel for a frontend.
630
631
631 The SUB channel is for the frontend to receive messages published by the
632 The SUB channel is for the frontend to receive messages published by the
632 kernel.
633 kernel.
633
634
634 The REQ channel is for the frontend to make requests of the kernel.
635 The REQ channel is for the frontend to make requests of the kernel.
635
636
636 The REP channel is for the kernel to request stdin (raw_input) from the
637 The REP channel is for the kernel to request stdin (raw_input) from the
637 frontend.
638 frontend.
638 """
639 """
639 # The PyZMQ Context to use for communication with the kernel.
640 # The PyZMQ Context to use for communication with the kernel.
640 context = Instance(zmq.Context,(),{})
641 context = Instance(zmq.Context,(),{})
641
642
642 # The Session to use for communication with the kernel.
643 # The Session to use for communication with the kernel.
643 session = Instance(Session,(),{})
644 session = Instance(Session,(),{})
644
645
645 # The kernel process with which the KernelManager is communicating.
646 # The kernel process with which the KernelManager is communicating.
646 kernel = Instance(Popen)
647 kernel = Instance(Popen)
647
648
648 # The addresses for the communication channels.
649 # The addresses for the communication channels.
649 xreq_address = TCPAddress((LOCALHOST, 0))
650 xreq_address = TCPAddress((LOCALHOST, 0))
650 sub_address = TCPAddress((LOCALHOST, 0))
651 sub_address = TCPAddress((LOCALHOST, 0))
651 rep_address = TCPAddress((LOCALHOST, 0))
652 rep_address = TCPAddress((LOCALHOST, 0))
652 hb_address = TCPAddress((LOCALHOST, 0))
653 hb_address = TCPAddress((LOCALHOST, 0))
653
654
654 # The classes to use for the various channels.
655 # The classes to use for the various channels.
655 xreq_channel_class = Type(XReqSocketChannel)
656 xreq_channel_class = Type(XReqSocketChannel)
656 sub_channel_class = Type(SubSocketChannel)
657 sub_channel_class = Type(SubSocketChannel)
657 rep_channel_class = Type(RepSocketChannel)
658 rep_channel_class = Type(RepSocketChannel)
658 hb_channel_class = Type(HBSocketChannel)
659 hb_channel_class = Type(HBSocketChannel)
659
660
660 # Protected traits.
661 # Protected traits.
661 _launch_args = Any
662 _launch_args = Any
662 _xreq_channel = Any
663 _xreq_channel = Any
663 _sub_channel = Any
664 _sub_channel = Any
664 _rep_channel = Any
665 _rep_channel = Any
665 _hb_channel = Any
666 _hb_channel = Any
666
667
667 def __init__(self, **kwargs):
668 def __init__(self, **kwargs):
668 super(KernelManager, self).__init__(**kwargs)
669 super(KernelManager, self).__init__(**kwargs)
669 # Uncomment this to try closing the context.
670 # Uncomment this to try closing the context.
670 # atexit.register(self.context.close)
671 # atexit.register(self.context.close)
671
672
672 #--------------------------------------------------------------------------
673 #--------------------------------------------------------------------------
673 # Channel management methods:
674 # Channel management methods:
674 #--------------------------------------------------------------------------
675 #--------------------------------------------------------------------------
675
676
676 def start_channels(self, xreq=True, sub=True, rep=True, hb=True):
677 def start_channels(self, xreq=True, sub=True, rep=True, hb=True):
677 """Starts the channels for this kernel.
678 """Starts the channels for this kernel.
678
679
679 This will create the channels if they do not exist and then start
680 This will create the channels if they do not exist and then start
680 them. If port numbers of 0 are being used (random ports) then you
681 them. If port numbers of 0 are being used (random ports) then you
681 must first call :method:`start_kernel`. If the channels have been
682 must first call :method:`start_kernel`. If the channels have been
682 stopped and you call this, :class:`RuntimeError` will be raised.
683 stopped and you call this, :class:`RuntimeError` will be raised.
683 """
684 """
684 if xreq:
685 if xreq:
685 self.xreq_channel.start()
686 self.xreq_channel.start()
686 if sub:
687 if sub:
687 self.sub_channel.start()
688 self.sub_channel.start()
688 if rep:
689 if rep:
689 self.rep_channel.start()
690 self.rep_channel.start()
690 if hb:
691 if hb:
691 self.hb_channel.start()
692 self.hb_channel.start()
692
693
693 def stop_channels(self):
694 def stop_channels(self):
694 """Stops all the running channels for this kernel.
695 """Stops all the running channels for this kernel.
695 """
696 """
696 if self.xreq_channel.is_alive():
697 if self.xreq_channel.is_alive():
697 self.xreq_channel.stop()
698 self.xreq_channel.stop()
698 if self.sub_channel.is_alive():
699 if self.sub_channel.is_alive():
699 self.sub_channel.stop()
700 self.sub_channel.stop()
700 if self.rep_channel.is_alive():
701 if self.rep_channel.is_alive():
701 self.rep_channel.stop()
702 self.rep_channel.stop()
702 if self.hb_channel.is_alive():
703 if self.hb_channel.is_alive():
703 self.hb_channel.stop()
704 self.hb_channel.stop()
704
705
705 @property
706 @property
706 def channels_running(self):
707 def channels_running(self):
707 """Are any of the channels created and running?"""
708 """Are any of the channels created and running?"""
708 return (self.xreq_channel.is_alive() or self.sub_channel.is_alive() or
709 return (self.xreq_channel.is_alive() or self.sub_channel.is_alive() or
709 self.rep_channel.is_alive() or self.hb_channel.is_alive())
710 self.rep_channel.is_alive() or self.hb_channel.is_alive())
710
711
711 #--------------------------------------------------------------------------
712 #--------------------------------------------------------------------------
712 # Kernel process management methods:
713 # Kernel process management methods:
713 #--------------------------------------------------------------------------
714 #--------------------------------------------------------------------------
714
715
715 def start_kernel(self, **kw):
716 def start_kernel(self, **kw):
716 """Starts a kernel process and configures the manager to use it.
717 """Starts a kernel process and configures the manager to use it.
717
718
718 If random ports (port=0) are being used, this method must be called
719 If random ports (port=0) are being used, this method must be called
719 before the channels are created.
720 before the channels are created.
720
721
721 Parameters:
722 Parameters:
722 -----------
723 -----------
723 ipython : bool, optional (default True)
724 ipython : bool, optional (default True)
724 Whether to use an IPython kernel instead of a plain Python kernel.
725 Whether to use an IPython kernel instead of a plain Python kernel.
725 """
726 """
726 xreq, sub, rep, hb = self.xreq_address, self.sub_address, \
727 xreq, sub, rep, hb = self.xreq_address, self.sub_address, \
727 self.rep_address, self.hb_address
728 self.rep_address, self.hb_address
728 if xreq[0] not in LOCAL_IPS or sub[0] not in LOCAL_IPS or \
729 if xreq[0] not in LOCAL_IPS or sub[0] not in LOCAL_IPS or \
729 rep[0] not in LOCAL_IPS or hb[0] not in LOCAL_IPS:
730 rep[0] not in LOCAL_IPS or hb[0] not in LOCAL_IPS:
730 raise RuntimeError("Can only launch a kernel on a local interface. "
731 raise RuntimeError("Can only launch a kernel on a local interface. "
731 "Make sure that the '*_address' attributes are "
732 "Make sure that the '*_address' attributes are "
732 "configured properly. "
733 "configured properly. "
733 "Currently valid addresses are: %s"%LOCAL_IPS
734 "Currently valid addresses are: %s"%LOCAL_IPS
734 )
735 )
735
736
736 self._launch_args = kw.copy()
737 self._launch_args = kw.copy()
737 if kw.pop('ipython', True):
738 if kw.pop('ipython', True):
738 from ipkernel import launch_kernel
739 from ipkernel import launch_kernel
739 else:
740 else:
740 from pykernel import launch_kernel
741 from pykernel import launch_kernel
741 self.kernel, xrep, pub, req, _hb = launch_kernel(
742 self.kernel, xrep, pub, req, _hb = launch_kernel(
742 xrep_port=xreq[1], pub_port=sub[1],
743 xrep_port=xreq[1], pub_port=sub[1],
743 req_port=rep[1], hb_port=hb[1], **kw)
744 req_port=rep[1], hb_port=hb[1], **kw)
744 self.xreq_address = (xreq[0], xrep)
745 self.xreq_address = (xreq[0], xrep)
745 self.sub_address = (sub[0], pub)
746 self.sub_address = (sub[0], pub)
746 self.rep_address = (rep[0], req)
747 self.rep_address = (rep[0], req)
747 self.hb_address = (hb[0], _hb)
748 self.hb_address = (hb[0], _hb)
748
749
749 def shutdown_kernel(self, restart=False):
750 def shutdown_kernel(self, restart=False):
750 """ Attempts to the stop the kernel process cleanly. If the kernel
751 """ Attempts to the stop the kernel process cleanly. If the kernel
751 cannot be stopped, it is killed, if possible.
752 cannot be stopped, it is killed, if possible.
752 """
753 """
753 # FIXME: Shutdown does not work on Windows due to ZMQ errors!
754 # FIXME: Shutdown does not work on Windows due to ZMQ errors!
754 if sys.platform == 'win32':
755 if sys.platform == 'win32':
755 self.kill_kernel()
756 self.kill_kernel()
756 return
757 return
757
758
758 # Pause the heart beat channel if it exists.
759 # Pause the heart beat channel if it exists.
759 if self._hb_channel is not None:
760 if self._hb_channel is not None:
760 self._hb_channel.pause()
761 self._hb_channel.pause()
761
762
762 # Don't send any additional kernel kill messages immediately, to give
763 # Don't send any additional kernel kill messages immediately, to give
763 # the kernel a chance to properly execute shutdown actions. Wait for at
764 # the kernel a chance to properly execute shutdown actions. Wait for at
764 # most 1s, checking every 0.1s.
765 # most 1s, checking every 0.1s.
765 self.xreq_channel.shutdown(restart=restart)
766 self.xreq_channel.shutdown(restart=restart)
766 for i in range(10):
767 for i in range(10):
767 if self.is_alive:
768 if self.is_alive:
768 time.sleep(0.1)
769 time.sleep(0.1)
769 else:
770 else:
770 break
771 break
771 else:
772 else:
772 # OK, we've waited long enough.
773 # OK, we've waited long enough.
773 if self.has_kernel:
774 if self.has_kernel:
774 self.kill_kernel()
775 self.kill_kernel()
775
776
776 def restart_kernel(self, now=False):
777 def restart_kernel(self, now=False):
777 """Restarts a kernel with the same arguments that were used to launch
778 """Restarts a kernel with the same arguments that were used to launch
778 it. If the old kernel was launched with random ports, the same ports
779 it. If the old kernel was launched with random ports, the same ports
779 will be used for the new kernel.
780 will be used for the new kernel.
780
781
781 Parameters
782 Parameters
782 ----------
783 ----------
783 now : bool, optional
784 now : bool, optional
784 If True, the kernel is forcefully restarted *immediately*, without
785 If True, the kernel is forcefully restarted *immediately*, without
785 having a chance to do any cleanup action. Otherwise the kernel is
786 having a chance to do any cleanup action. Otherwise the kernel is
786 given 1s to clean up before a forceful restart is issued.
787 given 1s to clean up before a forceful restart is issued.
787
788
788 In all cases the kernel is restarted, the only difference is whether
789 In all cases the kernel is restarted, the only difference is whether
789 it is given a chance to perform a clean shutdown or not.
790 it is given a chance to perform a clean shutdown or not.
790 """
791 """
791 if self._launch_args is None:
792 if self._launch_args is None:
792 raise RuntimeError("Cannot restart the kernel. "
793 raise RuntimeError("Cannot restart the kernel. "
793 "No previous call to 'start_kernel'.")
794 "No previous call to 'start_kernel'.")
794 else:
795 else:
795 if self.has_kernel:
796 if self.has_kernel:
796 if now:
797 if now:
797 self.kill_kernel()
798 self.kill_kernel()
798 else:
799 else:
799 self.shutdown_kernel(restart=True)
800 self.shutdown_kernel(restart=True)
800 self.start_kernel(**self._launch_args)
801 self.start_kernel(**self._launch_args)
801
802
802 # FIXME: Messages get dropped in Windows due to probable ZMQ bug
803 # FIXME: Messages get dropped in Windows due to probable ZMQ bug
803 # unless there is some delay here.
804 # unless there is some delay here.
804 if sys.platform == 'win32':
805 if sys.platform == 'win32':
805 time.sleep(0.2)
806 time.sleep(0.2)
806
807
807 @property
808 @property
808 def has_kernel(self):
809 def has_kernel(self):
809 """Returns whether a kernel process has been specified for the kernel
810 """Returns whether a kernel process has been specified for the kernel
810 manager.
811 manager.
811 """
812 """
812 return self.kernel is not None
813 return self.kernel is not None
813
814
814 def kill_kernel(self):
815 def kill_kernel(self):
815 """ Kill the running kernel. """
816 """ Kill the running kernel. """
816 if self.has_kernel:
817 if self.has_kernel:
817 # Pause the heart beat channel if it exists.
818 # Pause the heart beat channel if it exists.
818 if self._hb_channel is not None:
819 if self._hb_channel is not None:
819 self._hb_channel.pause()
820 self._hb_channel.pause()
820
821
821 # Attempt to kill the kernel.
822 # Attempt to kill the kernel.
822 try:
823 try:
823 self.kernel.kill()
824 self.kernel.kill()
824 except OSError, e:
825 except OSError, e:
825 # In Windows, we will get an Access Denied error if the process
826 # In Windows, we will get an Access Denied error if the process
826 # has already terminated. Ignore it.
827 # has already terminated. Ignore it.
827 if not (sys.platform == 'win32' and e.winerror == 5):
828 if not (sys.platform == 'win32' and e.winerror == 5):
828 raise
829 raise
829 self.kernel = None
830 self.kernel = None
830 else:
831 else:
831 raise RuntimeError("Cannot kill kernel. No kernel is running!")
832 raise RuntimeError("Cannot kill kernel. No kernel is running!")
832
833
833 def interrupt_kernel(self):
834 def interrupt_kernel(self):
834 """ Interrupts the kernel. Unlike ``signal_kernel``, this operation is
835 """ Interrupts the kernel. Unlike ``signal_kernel``, this operation is
835 well supported on all platforms.
836 well supported on all platforms.
836 """
837 """
837 if self.has_kernel:
838 if self.has_kernel:
838 if sys.platform == 'win32':
839 if sys.platform == 'win32':
839 from parentpoller import ParentPollerWindows as Poller
840 from parentpoller import ParentPollerWindows as Poller
840 Poller.send_interrupt(self.kernel.win32_interrupt_event)
841 Poller.send_interrupt(self.kernel.win32_interrupt_event)
841 else:
842 else:
842 self.kernel.send_signal(signal.SIGINT)
843 self.kernel.send_signal(signal.SIGINT)
843 else:
844 else:
844 raise RuntimeError("Cannot interrupt kernel. No kernel is running!")
845 raise RuntimeError("Cannot interrupt kernel. No kernel is running!")
845
846
846 def signal_kernel(self, signum):
847 def signal_kernel(self, signum):
847 """ Sends a signal to the kernel. Note that since only SIGTERM is
848 """ Sends a signal to the kernel. Note that since only SIGTERM is
848 supported on Windows, this function is only useful on Unix systems.
849 supported on Windows, this function is only useful on Unix systems.
849 """
850 """
850 if self.has_kernel:
851 if self.has_kernel:
851 self.kernel.send_signal(signum)
852 self.kernel.send_signal(signum)
852 else:
853 else:
853 raise RuntimeError("Cannot signal kernel. No kernel is running!")
854 raise RuntimeError("Cannot signal kernel. No kernel is running!")
854
855
855 @property
856 @property
856 def is_alive(self):
857 def is_alive(self):
857 """Is the kernel process still running?"""
858 """Is the kernel process still running?"""
858 # FIXME: not using a heartbeat means this method is broken for any
859 # FIXME: not using a heartbeat means this method is broken for any
859 # remote kernel, it's only capable of handling local kernels.
860 # remote kernel, it's only capable of handling local kernels.
860 if self.has_kernel:
861 if self.has_kernel:
861 if self.kernel.poll() is None:
862 if self.kernel.poll() is None:
862 return True
863 return True
863 else:
864 else:
864 return False
865 return False
865 else:
866 else:
866 # We didn't start the kernel with this KernelManager so we don't
867 # We didn't start the kernel with this KernelManager so we don't
867 # know if it is running. We should use a heartbeat for this case.
868 # know if it is running. We should use a heartbeat for this case.
868 return True
869 return True
869
870
870 #--------------------------------------------------------------------------
871 #--------------------------------------------------------------------------
871 # Channels used for communication with the kernel:
872 # Channels used for communication with the kernel:
872 #--------------------------------------------------------------------------
873 #--------------------------------------------------------------------------
873
874
874 @property
875 @property
875 def xreq_channel(self):
876 def xreq_channel(self):
876 """Get the REQ socket channel object to make requests of the kernel."""
877 """Get the REQ socket channel object to make requests of the kernel."""
877 if self._xreq_channel is None:
878 if self._xreq_channel is None:
878 self._xreq_channel = self.xreq_channel_class(self.context,
879 self._xreq_channel = self.xreq_channel_class(self.context,
879 self.session,
880 self.session,
880 self.xreq_address)
881 self.xreq_address)
881 return self._xreq_channel
882 return self._xreq_channel
882
883
883 @property
884 @property
884 def sub_channel(self):
885 def sub_channel(self):
885 """Get the SUB socket channel object."""
886 """Get the SUB socket channel object."""
886 if self._sub_channel is None:
887 if self._sub_channel is None:
887 self._sub_channel = self.sub_channel_class(self.context,
888 self._sub_channel = self.sub_channel_class(self.context,
888 self.session,
889 self.session,
889 self.sub_address)
890 self.sub_address)
890 return self._sub_channel
891 return self._sub_channel
891
892
892 @property
893 @property
893 def rep_channel(self):
894 def rep_channel(self):
894 """Get the REP socket channel object to handle stdin (raw_input)."""
895 """Get the REP socket channel object to handle stdin (raw_input)."""
895 if self._rep_channel is None:
896 if self._rep_channel is None:
896 self._rep_channel = self.rep_channel_class(self.context,
897 self._rep_channel = self.rep_channel_class(self.context,
897 self.session,
898 self.session,
898 self.rep_address)
899 self.rep_address)
899 return self._rep_channel
900 return self._rep_channel
900
901
901 @property
902 @property
902 def hb_channel(self):
903 def hb_channel(self):
903 """Get the REP socket channel object to handle stdin (raw_input)."""
904 """Get the REP socket channel object to handle stdin (raw_input)."""
904 if self._hb_channel is None:
905 if self._hb_channel is None:
905 self._hb_channel = self.hb_channel_class(self.context,
906 self._hb_channel = self.hb_channel_class(self.context,
906 self.session,
907 self.session,
907 self.hb_address)
908 self.hb_address)
908 return self._hb_channel
909 return self._hb_channel
General Comments 0
You need to be logged in to leave comments. Login now