##// END OF EJS Templates
* Fixed heartbeat thread not stopping cleanly....
epatters -
Show More
@@ -1,715 +1,723 b''
1 """Base classes to manage the interaction with a running kernel.
1 """Base classes to manage the interaction with a running kernel.
2
2
3 Todo
3 Todo
4 ====
4 ====
5
5
6 * Create logger to handle debugging and console messages.
6 * Create logger to handle debugging and console messages.
7 """
7 """
8
8
9 #-----------------------------------------------------------------------------
9 #-----------------------------------------------------------------------------
10 # Copyright (C) 2008-2010 The IPython Development Team
10 # Copyright (C) 2008-2010 The IPython Development Team
11 #
11 #
12 # Distributed under the terms of the BSD License. The full license is in
12 # Distributed under the terms of the BSD License. The full license is in
13 # the file COPYING, distributed as part of this software.
13 # the file COPYING, distributed as part of this software.
14 #-----------------------------------------------------------------------------
14 #-----------------------------------------------------------------------------
15
15
16 #-----------------------------------------------------------------------------
16 #-----------------------------------------------------------------------------
17 # Imports
17 # Imports
18 #-----------------------------------------------------------------------------
18 #-----------------------------------------------------------------------------
19
19
20 # Standard library imports.
20 # Standard library imports.
21 from Queue import Queue, Empty
21 from Queue import Queue, Empty
22 from subprocess import Popen
22 from subprocess import Popen
23 from threading import Thread
23 from threading import Thread
24 import time
24 import time
25
25
26 # System library imports.
26 # System library imports.
27 import zmq
27 import zmq
28 from zmq import POLLIN, POLLOUT, POLLERR
28 from zmq import POLLIN, POLLOUT, POLLERR
29 from zmq.eventloop import ioloop
29 from zmq.eventloop import ioloop
30
30
31 # Local imports.
31 # Local imports.
32 from IPython.utils.traitlets import HasTraits, Any, Instance, Type, TCPAddress
32 from IPython.utils.traitlets import HasTraits, Any, Instance, Type, TCPAddress
33 from session import Session
33 from session import Session
34
34
35 #-----------------------------------------------------------------------------
35 #-----------------------------------------------------------------------------
36 # Constants and exceptions
36 # Constants and exceptions
37 #-----------------------------------------------------------------------------
37 #-----------------------------------------------------------------------------
38
38
39 LOCALHOST = '127.0.0.1'
39 LOCALHOST = '127.0.0.1'
40
40
41 class InvalidPortNumber(Exception):
41 class InvalidPortNumber(Exception):
42 pass
42 pass
43
43
44 #-----------------------------------------------------------------------------
44 #-----------------------------------------------------------------------------
45 # ZMQ Socket Channel classes
45 # ZMQ Socket Channel classes
46 #-----------------------------------------------------------------------------
46 #-----------------------------------------------------------------------------
47
47
48 class ZmqSocketChannel(Thread):
48 class ZmqSocketChannel(Thread):
49 """The base class for the channels that use ZMQ sockets.
49 """The base class for the channels that use ZMQ sockets.
50 """
50 """
51 context = None
51 context = None
52 session = None
52 session = None
53 socket = None
53 socket = None
54 ioloop = None
54 ioloop = None
55 iostate = None
55 iostate = None
56 _address = None
56 _address = None
57
57
58 def __init__(self, context, session, address):
58 def __init__(self, context, session, address):
59 """Create a channel
59 """Create a channel
60
60
61 Parameters
61 Parameters
62 ----------
62 ----------
63 context : :class:`zmq.Context`
63 context : :class:`zmq.Context`
64 The ZMQ context to use.
64 The ZMQ context to use.
65 session : :class:`session.Session`
65 session : :class:`session.Session`
66 The session to use.
66 The session to use.
67 address : tuple
67 address : tuple
68 Standard (ip, port) tuple that the kernel is listening on.
68 Standard (ip, port) tuple that the kernel is listening on.
69 """
69 """
70 super(ZmqSocketChannel, self).__init__()
70 super(ZmqSocketChannel, self).__init__()
71 self.daemon = True
71 self.daemon = True
72
72
73 self.context = context
73 self.context = context
74 self.session = session
74 self.session = session
75 if address[1] == 0:
75 if address[1] == 0:
76 message = 'The port number for a channel cannot be 0.'
76 message = 'The port number for a channel cannot be 0.'
77 raise InvalidPortNumber(message)
77 raise InvalidPortNumber(message)
78 self._address = address
78 self._address = address
79
79
80 def stop(self):
80 def stop(self):
81 """Stop the channel's activity.
81 """Stop the channel's activity.
82
82
83 This calls :method:`Thread.join` and returns when the thread
83 This calls :method:`Thread.join` and returns when the thread
84 terminates. :class:`RuntimeError` will be raised if
84 terminates. :class:`RuntimeError` will be raised if
85 :method:`self.start` is called again.
85 :method:`self.start` is called again.
86 """
86 """
87 self.join()
87 self.join()
88
88
89 @property
89 @property
90 def address(self):
90 def address(self):
91 """Get the channel's address as an (ip, port) tuple.
91 """Get the channel's address as an (ip, port) tuple.
92
92
93 By the default, the address is (localhost, 0), where 0 means a random
93 By the default, the address is (localhost, 0), where 0 means a random
94 port.
94 port.
95 """
95 """
96 return self._address
96 return self._address
97
97
98 def add_io_state(self, state):
98 def add_io_state(self, state):
99 """Add IO state to the eventloop.
99 """Add IO state to the eventloop.
100
100
101 Parameters
101 Parameters
102 ----------
102 ----------
103 state : zmq.POLLIN|zmq.POLLOUT|zmq.POLLERR
103 state : zmq.POLLIN|zmq.POLLOUT|zmq.POLLERR
104 The IO state flag to set.
104 The IO state flag to set.
105
105
106 This is thread safe as it uses the thread safe IOLoop.add_callback.
106 This is thread safe as it uses the thread safe IOLoop.add_callback.
107 """
107 """
108 def add_io_state_callback():
108 def add_io_state_callback():
109 if not self.iostate & state:
109 if not self.iostate & state:
110 self.iostate = self.iostate | state
110 self.iostate = self.iostate | state
111 self.ioloop.update_handler(self.socket, self.iostate)
111 self.ioloop.update_handler(self.socket, self.iostate)
112 self.ioloop.add_callback(add_io_state_callback)
112 self.ioloop.add_callback(add_io_state_callback)
113
113
114 def drop_io_state(self, state):
114 def drop_io_state(self, state):
115 """Drop IO state from the eventloop.
115 """Drop IO state from the eventloop.
116
116
117 Parameters
117 Parameters
118 ----------
118 ----------
119 state : zmq.POLLIN|zmq.POLLOUT|zmq.POLLERR
119 state : zmq.POLLIN|zmq.POLLOUT|zmq.POLLERR
120 The IO state flag to set.
120 The IO state flag to set.
121
121
122 This is thread safe as it uses the thread safe IOLoop.add_callback.
122 This is thread safe as it uses the thread safe IOLoop.add_callback.
123 """
123 """
124 def drop_io_state_callback():
124 def drop_io_state_callback():
125 if self.iostate & state:
125 if self.iostate & state:
126 self.iostate = self.iostate & (~state)
126 self.iostate = self.iostate & (~state)
127 self.ioloop.update_handler(self.socket, self.iostate)
127 self.ioloop.update_handler(self.socket, self.iostate)
128 self.ioloop.add_callback(drop_io_state_callback)
128 self.ioloop.add_callback(drop_io_state_callback)
129
129
130
130
131 class XReqSocketChannel(ZmqSocketChannel):
131 class XReqSocketChannel(ZmqSocketChannel):
132 """The XREQ channel for issues request/replies to the kernel.
132 """The XREQ channel for issues request/replies to the kernel.
133 """
133 """
134
134
135 command_queue = None
135 command_queue = None
136
136
137 def __init__(self, context, session, address):
137 def __init__(self, context, session, address):
138 self.command_queue = Queue()
138 self.command_queue = Queue()
139 super(XReqSocketChannel, self).__init__(context, session, address)
139 super(XReqSocketChannel, self).__init__(context, session, address)
140
140
141 def run(self):
141 def run(self):
142 """The thread's main activity. Call start() instead."""
142 """The thread's main activity. Call start() instead."""
143 self.socket = self.context.socket(zmq.XREQ)
143 self.socket = self.context.socket(zmq.XREQ)
144 self.socket.setsockopt(zmq.IDENTITY, self.session.session)
144 self.socket.setsockopt(zmq.IDENTITY, self.session.session)
145 self.socket.connect('tcp://%s:%i' % self.address)
145 self.socket.connect('tcp://%s:%i' % self.address)
146 self.ioloop = ioloop.IOLoop()
146 self.ioloop = ioloop.IOLoop()
147 self.iostate = POLLERR|POLLIN
147 self.iostate = POLLERR|POLLIN
148 self.ioloop.add_handler(self.socket, self._handle_events,
148 self.ioloop.add_handler(self.socket, self._handle_events,
149 self.iostate)
149 self.iostate)
150 self.ioloop.start()
150 self.ioloop.start()
151
151
152 def stop(self):
152 def stop(self):
153 self.ioloop.stop()
153 self.ioloop.stop()
154 super(XReqSocketChannel, self).stop()
154 super(XReqSocketChannel, self).stop()
155
155
156 def call_handlers(self, msg):
156 def call_handlers(self, msg):
157 """This method is called in the ioloop thread when a message arrives.
157 """This method is called in the ioloop thread when a message arrives.
158
158
159 Subclasses should override this method to handle incoming messages.
159 Subclasses should override this method to handle incoming messages.
160 It is important to remember that this method is called in the thread
160 It is important to remember that this method is called in the thread
161 so that some logic must be done to ensure that the application leve
161 so that some logic must be done to ensure that the application leve
162 handlers are called in the application thread.
162 handlers are called in the application thread.
163 """
163 """
164 raise NotImplementedError('call_handlers must be defined in a subclass.')
164 raise NotImplementedError('call_handlers must be defined in a subclass.')
165
165
166 def execute(self, code, silent=False):
166 def execute(self, code, silent=False):
167 """Execute code in the kernel.
167 """Execute code in the kernel.
168
168
169 Parameters
169 Parameters
170 ----------
170 ----------
171 code : str
171 code : str
172 A string of Python code.
172 A string of Python code.
173 silent : bool, optional (default False)
173 silent : bool, optional (default False)
174 If set, the kernel will execute the code as quietly possible.
174 If set, the kernel will execute the code as quietly possible.
175
175
176 Returns
176 Returns
177 -------
177 -------
178 The msg_id of the message sent.
178 The msg_id of the message sent.
179 """
179 """
180 # Create class for content/msg creation. Related to, but possibly
180 # Create class for content/msg creation. Related to, but possibly
181 # not in Session.
181 # not in Session.
182 content = dict(code=code, silent=silent)
182 content = dict(code=code, silent=silent)
183 msg = self.session.msg('execute_request', content)
183 msg = self.session.msg('execute_request', content)
184 self._queue_request(msg)
184 self._queue_request(msg)
185 return msg['header']['msg_id']
185 return msg['header']['msg_id']
186
186
187 def complete(self, text, line, cursor_pos, block=None):
187 def complete(self, text, line, cursor_pos, block=None):
188 """Tab complete text in the kernel's namespace.
188 """Tab complete text in the kernel's namespace.
189
189
190 Parameters
190 Parameters
191 ----------
191 ----------
192 text : str
192 text : str
193 The text to complete.
193 The text to complete.
194 line : str
194 line : str
195 The full line of text that is the surrounding context for the
195 The full line of text that is the surrounding context for the
196 text to complete.
196 text to complete.
197 cursor_pos : int
197 cursor_pos : int
198 The position of the cursor in the line where the completion was
198 The position of the cursor in the line where the completion was
199 requested.
199 requested.
200 block : str, optional
200 block : str, optional
201 The full block of code in which the completion is being requested.
201 The full block of code in which the completion is being requested.
202
202
203 Returns
203 Returns
204 -------
204 -------
205 The msg_id of the message sent.
205 The msg_id of the message sent.
206 """
206 """
207 content = dict(text=text, line=line, block=block, cursor_pos=cursor_pos)
207 content = dict(text=text, line=line, block=block, cursor_pos=cursor_pos)
208 msg = self.session.msg('complete_request', content)
208 msg = self.session.msg('complete_request', content)
209 self._queue_request(msg)
209 self._queue_request(msg)
210 return msg['header']['msg_id']
210 return msg['header']['msg_id']
211
211
212 def object_info(self, oname):
212 def object_info(self, oname):
213 """Get metadata information about an object.
213 """Get metadata information about an object.
214
214
215 Parameters
215 Parameters
216 ----------
216 ----------
217 oname : str
217 oname : str
218 A string specifying the object name.
218 A string specifying the object name.
219
219
220 Returns
220 Returns
221 -------
221 -------
222 The msg_id of the message sent.
222 The msg_id of the message sent.
223 """
223 """
224 content = dict(oname=oname)
224 content = dict(oname=oname)
225 msg = self.session.msg('object_info_request', content)
225 msg = self.session.msg('object_info_request', content)
226 self._queue_request(msg)
226 self._queue_request(msg)
227 return msg['header']['msg_id']
227 return msg['header']['msg_id']
228
228
229 def history(self, index=None, raw=False, output=True):
229 def history(self, index=None, raw=False, output=True):
230 """Get the history list.
230 """Get the history list.
231
231
232 Parameters
232 Parameters
233 ----------
233 ----------
234 index : n or (n1, n2) or None
234 index : n or (n1, n2) or None
235 If n, then the last entries. If a tuple, then all in
235 If n, then the last entries. If a tuple, then all in
236 range(n1, n2). If None, then all entries. Raises IndexError if
236 range(n1, n2). If None, then all entries. Raises IndexError if
237 the format of index is incorrect.
237 the format of index is incorrect.
238 raw : bool
238 raw : bool
239 If True, return the raw input.
239 If True, return the raw input.
240 output : bool
240 output : bool
241 If True, then return the output as well.
241 If True, then return the output as well.
242
242
243 Returns
243 Returns
244 -------
244 -------
245 The msg_id of the message sent.
245 The msg_id of the message sent.
246 """
246 """
247 content = dict(index=index, raw=raw, output=output)
247 content = dict(index=index, raw=raw, output=output)
248 msg = self.session.msg('history_request', content)
248 msg = self.session.msg('history_request', content)
249 self._queue_request(msg)
249 self._queue_request(msg)
250 return msg['header']['msg_id']
250 return msg['header']['msg_id']
251
251
252 def prompt(self):
252 def prompt(self):
253 """Requests a prompt number from the kernel.
253 """Requests a prompt number from the kernel.
254
254
255 Returns
255 Returns
256 -------
256 -------
257 The msg_id of the message sent.
257 The msg_id of the message sent.
258 """
258 """
259 msg = self.session.msg('prompt_request')
259 msg = self.session.msg('prompt_request')
260 self._queue_request(msg)
260 self._queue_request(msg)
261 return msg['header']['msg_id']
261 return msg['header']['msg_id']
262
262
263 def _handle_events(self, socket, events):
263 def _handle_events(self, socket, events):
264 if events & POLLERR:
264 if events & POLLERR:
265 self._handle_err()
265 self._handle_err()
266 if events & POLLOUT:
266 if events & POLLOUT:
267 self._handle_send()
267 self._handle_send()
268 if events & POLLIN:
268 if events & POLLIN:
269 self._handle_recv()
269 self._handle_recv()
270
270
271 def _handle_recv(self):
271 def _handle_recv(self):
272 msg = self.socket.recv_json()
272 msg = self.socket.recv_json()
273 self.call_handlers(msg)
273 self.call_handlers(msg)
274
274
275 def _handle_send(self):
275 def _handle_send(self):
276 try:
276 try:
277 msg = self.command_queue.get(False)
277 msg = self.command_queue.get(False)
278 except Empty:
278 except Empty:
279 pass
279 pass
280 else:
280 else:
281 self.socket.send_json(msg)
281 self.socket.send_json(msg)
282 if self.command_queue.empty():
282 if self.command_queue.empty():
283 self.drop_io_state(POLLOUT)
283 self.drop_io_state(POLLOUT)
284
284
285 def _handle_err(self):
285 def _handle_err(self):
286 # We don't want to let this go silently, so eventually we should log.
286 # We don't want to let this go silently, so eventually we should log.
287 raise zmq.ZMQError()
287 raise zmq.ZMQError()
288
288
289 def _queue_request(self, msg):
289 def _queue_request(self, msg):
290 self.command_queue.put(msg)
290 self.command_queue.put(msg)
291 self.add_io_state(POLLOUT)
291 self.add_io_state(POLLOUT)
292
292
293
293
294 class SubSocketChannel(ZmqSocketChannel):
294 class SubSocketChannel(ZmqSocketChannel):
295 """The SUB channel which listens for messages that the kernel publishes.
295 """The SUB channel which listens for messages that the kernel publishes.
296 """
296 """
297
297
298 def __init__(self, context, session, address):
298 def __init__(self, context, session, address):
299 super(SubSocketChannel, self).__init__(context, session, address)
299 super(SubSocketChannel, self).__init__(context, session, address)
300
300
301 def run(self):
301 def run(self):
302 """The thread's main activity. Call start() instead."""
302 """The thread's main activity. Call start() instead."""
303 self.socket = self.context.socket(zmq.SUB)
303 self.socket = self.context.socket(zmq.SUB)
304 self.socket.setsockopt(zmq.SUBSCRIBE,'')
304 self.socket.setsockopt(zmq.SUBSCRIBE,'')
305 self.socket.setsockopt(zmq.IDENTITY, self.session.session)
305 self.socket.setsockopt(zmq.IDENTITY, self.session.session)
306 self.socket.connect('tcp://%s:%i' % self.address)
306 self.socket.connect('tcp://%s:%i' % self.address)
307 self.ioloop = ioloop.IOLoop()
307 self.ioloop = ioloop.IOLoop()
308 self.iostate = POLLIN|POLLERR
308 self.iostate = POLLIN|POLLERR
309 self.ioloop.add_handler(self.socket, self._handle_events,
309 self.ioloop.add_handler(self.socket, self._handle_events,
310 self.iostate)
310 self.iostate)
311 self.ioloop.start()
311 self.ioloop.start()
312
312
313 def stop(self):
313 def stop(self):
314 self.ioloop.stop()
314 self.ioloop.stop()
315 super(SubSocketChannel, self).stop()
315 super(SubSocketChannel, self).stop()
316
316
317 def call_handlers(self, msg):
317 def call_handlers(self, msg):
318 """This method is called in the ioloop thread when a message arrives.
318 """This method is called in the ioloop thread when a message arrives.
319
319
320 Subclasses should override this method to handle incoming messages.
320 Subclasses should override this method to handle incoming messages.
321 It is important to remember that this method is called in the thread
321 It is important to remember that this method is called in the thread
322 so that some logic must be done to ensure that the application leve
322 so that some logic must be done to ensure that the application leve
323 handlers are called in the application thread.
323 handlers are called in the application thread.
324 """
324 """
325 raise NotImplementedError('call_handlers must be defined in a subclass.')
325 raise NotImplementedError('call_handlers must be defined in a subclass.')
326
326
327 def flush(self, timeout=1.0):
327 def flush(self, timeout=1.0):
328 """Immediately processes all pending messages on the SUB channel.
328 """Immediately processes all pending messages on the SUB channel.
329
329
330 Callers should use this method to ensure that :method:`call_handlers`
330 Callers should use this method to ensure that :method:`call_handlers`
331 has been called for all messages that have been received on the
331 has been called for all messages that have been received on the
332 0MQ SUB socket of this channel.
332 0MQ SUB socket of this channel.
333
333
334 This method is thread safe.
334 This method is thread safe.
335
335
336 Parameters
336 Parameters
337 ----------
337 ----------
338 timeout : float, optional
338 timeout : float, optional
339 The maximum amount of time to spend flushing, in seconds. The
339 The maximum amount of time to spend flushing, in seconds. The
340 default is one second.
340 default is one second.
341 """
341 """
342 # We do the IOLoop callback process twice to ensure that the IOLoop
342 # We do the IOLoop callback process twice to ensure that the IOLoop
343 # gets to perform at least one full poll.
343 # gets to perform at least one full poll.
344 stop_time = time.time() + timeout
344 stop_time = time.time() + timeout
345 for i in xrange(2):
345 for i in xrange(2):
346 self._flushed = False
346 self._flushed = False
347 self.ioloop.add_callback(self._flush)
347 self.ioloop.add_callback(self._flush)
348 while not self._flushed and time.time() < stop_time:
348 while not self._flushed and time.time() < stop_time:
349 time.sleep(0.01)
349 time.sleep(0.01)
350
350
351 def _handle_events(self, socket, events):
351 def _handle_events(self, socket, events):
352 # Turn on and off POLLOUT depending on if we have made a request
352 # Turn on and off POLLOUT depending on if we have made a request
353 if events & POLLERR:
353 if events & POLLERR:
354 self._handle_err()
354 self._handle_err()
355 if events & POLLIN:
355 if events & POLLIN:
356 self._handle_recv()
356 self._handle_recv()
357
357
358 def _handle_err(self):
358 def _handle_err(self):
359 # We don't want to let this go silently, so eventually we should log.
359 # We don't want to let this go silently, so eventually we should log.
360 raise zmq.ZMQError()
360 raise zmq.ZMQError()
361
361
362 def _handle_recv(self):
362 def _handle_recv(self):
363 # Get all of the messages we can
363 # Get all of the messages we can
364 while True:
364 while True:
365 try:
365 try:
366 msg = self.socket.recv_json(zmq.NOBLOCK)
366 msg = self.socket.recv_json(zmq.NOBLOCK)
367 except zmq.ZMQError:
367 except zmq.ZMQError:
368 # Check the errno?
368 # Check the errno?
369 # Will this trigger POLLERR?
369 # Will this trigger POLLERR?
370 break
370 break
371 else:
371 else:
372 self.call_handlers(msg)
372 self.call_handlers(msg)
373
373
374 def _flush(self):
374 def _flush(self):
375 """Callback for :method:`self.flush`."""
375 """Callback for :method:`self.flush`."""
376 self._flushed = True
376 self._flushed = True
377
377
378
378
379 class RepSocketChannel(ZmqSocketChannel):
379 class RepSocketChannel(ZmqSocketChannel):
380 """A reply channel to handle raw_input requests that the kernel makes."""
380 """A reply channel to handle raw_input requests that the kernel makes."""
381
381
382 msg_queue = None
382 msg_queue = None
383
383
384 def __init__(self, context, session, address):
384 def __init__(self, context, session, address):
385 self.msg_queue = Queue()
385 self.msg_queue = Queue()
386 super(RepSocketChannel, self).__init__(context, session, address)
386 super(RepSocketChannel, self).__init__(context, session, address)
387
387
388 def run(self):
388 def run(self):
389 """The thread's main activity. Call start() instead."""
389 """The thread's main activity. Call start() instead."""
390 self.socket = self.context.socket(zmq.XREQ)
390 self.socket = self.context.socket(zmq.XREQ)
391 self.socket.setsockopt(zmq.IDENTITY, self.session.session)
391 self.socket.setsockopt(zmq.IDENTITY, self.session.session)
392 self.socket.connect('tcp://%s:%i' % self.address)
392 self.socket.connect('tcp://%s:%i' % self.address)
393 self.ioloop = ioloop.IOLoop()
393 self.ioloop = ioloop.IOLoop()
394 self.iostate = POLLERR|POLLIN
394 self.iostate = POLLERR|POLLIN
395 self.ioloop.add_handler(self.socket, self._handle_events,
395 self.ioloop.add_handler(self.socket, self._handle_events,
396 self.iostate)
396 self.iostate)
397 self.ioloop.start()
397 self.ioloop.start()
398
398
399 def stop(self):
399 def stop(self):
400 self.ioloop.stop()
400 self.ioloop.stop()
401 super(RepSocketChannel, self).stop()
401 super(RepSocketChannel, self).stop()
402
402
403 def call_handlers(self, msg):
403 def call_handlers(self, msg):
404 """This method is called in the ioloop thread when a message arrives.
404 """This method is called in the ioloop thread when a message arrives.
405
405
406 Subclasses should override this method to handle incoming messages.
406 Subclasses should override this method to handle incoming messages.
407 It is important to remember that this method is called in the thread
407 It is important to remember that this method is called in the thread
408 so that some logic must be done to ensure that the application leve
408 so that some logic must be done to ensure that the application leve
409 handlers are called in the application thread.
409 handlers are called in the application thread.
410 """
410 """
411 raise NotImplementedError('call_handlers must be defined in a subclass.')
411 raise NotImplementedError('call_handlers must be defined in a subclass.')
412
412
413 def input(self, string):
413 def input(self, string):
414 """Send a string of raw input to the kernel."""
414 """Send a string of raw input to the kernel."""
415 content = dict(value=string)
415 content = dict(value=string)
416 msg = self.session.msg('input_reply', content)
416 msg = self.session.msg('input_reply', content)
417 self._queue_reply(msg)
417 self._queue_reply(msg)
418
418
419 def _handle_events(self, socket, events):
419 def _handle_events(self, socket, events):
420 if events & POLLERR:
420 if events & POLLERR:
421 self._handle_err()
421 self._handle_err()
422 if events & POLLOUT:
422 if events & POLLOUT:
423 self._handle_send()
423 self._handle_send()
424 if events & POLLIN:
424 if events & POLLIN:
425 self._handle_recv()
425 self._handle_recv()
426
426
427 def _handle_recv(self):
427 def _handle_recv(self):
428 msg = self.socket.recv_json()
428 msg = self.socket.recv_json()
429 self.call_handlers(msg)
429 self.call_handlers(msg)
430
430
431 def _handle_send(self):
431 def _handle_send(self):
432 try:
432 try:
433 msg = self.msg_queue.get(False)
433 msg = self.msg_queue.get(False)
434 except Empty:
434 except Empty:
435 pass
435 pass
436 else:
436 else:
437 self.socket.send_json(msg)
437 self.socket.send_json(msg)
438 if self.msg_queue.empty():
438 if self.msg_queue.empty():
439 self.drop_io_state(POLLOUT)
439 self.drop_io_state(POLLOUT)
440
440
441 def _handle_err(self):
441 def _handle_err(self):
442 # We don't want to let this go silently, so eventually we should log.
442 # We don't want to let this go silently, so eventually we should log.
443 raise zmq.ZMQError()
443 raise zmq.ZMQError()
444
444
445 def _queue_reply(self, msg):
445 def _queue_reply(self, msg):
446 self.msg_queue.put(msg)
446 self.msg_queue.put(msg)
447 self.add_io_state(POLLOUT)
447 self.add_io_state(POLLOUT)
448
448
449
449
450 class HBSocketChannel(ZmqSocketChannel):
450 class HBSocketChannel(ZmqSocketChannel):
451 """The heartbeat channel which monitors the kernel heartbeat.
451 """The heartbeat channel which monitors the kernel heartbeat."""
452 """
453
452
454 time_to_dead = 5.0
453 time_to_dead = 5.0
455 socket = None
454 socket = None
456 poller = None
455 poller = None
457
456
458 def __init__(self, context, session, address):
457 def __init__(self, context, session, address):
459 super(HBSocketChannel, self).__init__(context, session, address)
458 super(HBSocketChannel, self).__init__(context, session, address)
459 self._running = False
460
460
461 def _create_socket(self):
461 def _create_socket(self):
462 self.socket = self.context.socket(zmq.REQ)
462 self.socket = self.context.socket(zmq.REQ)
463 self.socket.setsockopt(zmq.IDENTITY, self.session.session)
463 self.socket.setsockopt(zmq.IDENTITY, self.session.session)
464 self.socket.connect('tcp://%s:%i' % self.address)
464 self.socket.connect('tcp://%s:%i' % self.address)
465 self.poller = zmq.Poller()
465 self.poller = zmq.Poller()
466 self.poller.register(self.socket, zmq.POLLIN)
466 self.poller.register(self.socket, zmq.POLLIN)
467
467
468 def run(self):
468 def run(self):
469 """The thread's main activity. Call start() instead."""
469 """The thread's main activity. Call start() instead."""
470 self._create_socket()
470 self._create_socket()
471
471
472 while True:
472 self._running = True
473 while self._running:
473 since_last_heartbeat = 0.0
474 since_last_heartbeat = 0.0
474 request_time = time.time()
475 request_time = time.time()
475 try:
476 try:
476 self.socket.send_json('ping')
477 self.socket.send_json('ping')
477 except zmq.ZMQError, e:
478 except zmq.ZMQError, e:
478 if e.errno == zmq.EFSM:
479 if e.errno == zmq.EFSM:
479 time.sleep(self.time_to_dead)
480 time.sleep(self.time_to_dead)
480 self._create_socket()
481 self._create_socket()
481 else:
482 else:
482 raise
483 raise
483 else:
484 else:
484 while True:
485 while True:
485 try:
486 try:
486 reply = self.socket.recv_json(zmq.NOBLOCK)
487 reply = self.socket.recv_json(zmq.NOBLOCK)
487 except zmq.ZMQError, e:
488 except zmq.ZMQError, e:
488 if e.errno == zmq.EAGAIN:
489 if e.errno == zmq.EAGAIN:
489 until_dead = self.time_to_dead-(time.time()-request_time)
490 until_dead = self.time_to_dead - (time.time() -
491 request_time)
490 self.poller.poll(until_dead)
492 self.poller.poll(until_dead)
491 since_last_heartbeat = time.time() - request_time
493 since_last_heartbeat = time.time() - request_time
492 if since_last_heartbeat > self.time_to_dead:
494 if since_last_heartbeat > self.time_to_dead:
493 self.call_handlers(since_last_heartbeat)
495 self.call_handlers(since_last_heartbeat)
494 break
496 break
495 else:
497 else:
496 # We should probably log this instead
498 # FIXME: We should probably log this instead.
497 raise
499 raise
498 else:
500 else:
499 until_dead = self.time_to_dead-(time.time()-request_time)
501 until_dead = self.time_to_dead - (time.time() -
502 request_time)
500 if until_dead > 0.0:
503 if until_dead > 0.0:
501 time.sleep(until_dead)
504 time.sleep(until_dead)
502 break
505 break
503
506
507 def stop(self):
508 self._running = False
509 super(HBSocketChannel, self).stop()
510
504 def call_handlers(self, since_last_heartbeat):
511 def call_handlers(self, since_last_heartbeat):
505 """This method is called in the ioloop thread when a message arrives.
512 """This method is called in the ioloop thread when a message arrives.
506
513
507 Subclasses should override this method to handle incoming messages.
514 Subclasses should override this method to handle incoming messages.
508 It is important to remember that this method is called in the thread
515 It is important to remember that this method is called in the thread
509 so that some logic must be done to ensure that the application leve
516 so that some logic must be done to ensure that the application leve
510 handlers are called in the application thread.
517 handlers are called in the application thread.
511 """
518 """
512 raise NotImplementedError('call_handlers must be defined in a subclass.')
519 raise NotImplementedError('call_handlers must be defined in a subclass.')
513
520
514
521
515 #-----------------------------------------------------------------------------
522 #-----------------------------------------------------------------------------
516 # Main kernel manager class
523 # Main kernel manager class
517 #-----------------------------------------------------------------------------
524 #-----------------------------------------------------------------------------
518
525
519 class KernelManager(HasTraits):
526 class KernelManager(HasTraits):
520 """ Manages a kernel for a frontend.
527 """ Manages a kernel for a frontend.
521
528
522 The SUB channel is for the frontend to receive messages published by the
529 The SUB channel is for the frontend to receive messages published by the
523 kernel.
530 kernel.
524
531
525 The REQ channel is for the frontend to make requests of the kernel.
532 The REQ channel is for the frontend to make requests of the kernel.
526
533
527 The REP channel is for the kernel to request stdin (raw_input) from the
534 The REP channel is for the kernel to request stdin (raw_input) from the
528 frontend.
535 frontend.
529 """
536 """
530 # The PyZMQ Context to use for communication with the kernel.
537 # The PyZMQ Context to use for communication with the kernel.
531 context = Instance(zmq.Context,(),{})
538 context = Instance(zmq.Context,(),{})
532
539
533 # The Session to use for communication with the kernel.
540 # The Session to use for communication with the kernel.
534 session = Instance(Session,(),{})
541 session = Instance(Session,(),{})
535
542
536 # The kernel process with which the KernelManager is communicating.
543 # The kernel process with which the KernelManager is communicating.
537 kernel = Instance(Popen)
544 kernel = Instance(Popen)
538
545
539 # The addresses for the communication channels.
546 # The addresses for the communication channels.
540 xreq_address = TCPAddress((LOCALHOST, 0))
547 xreq_address = TCPAddress((LOCALHOST, 0))
541 sub_address = TCPAddress((LOCALHOST, 0))
548 sub_address = TCPAddress((LOCALHOST, 0))
542 rep_address = TCPAddress((LOCALHOST, 0))
549 rep_address = TCPAddress((LOCALHOST, 0))
543 hb_address = TCPAddress((LOCALHOST, 0))
550 hb_address = TCPAddress((LOCALHOST, 0))
544
551
545 # The classes to use for the various channels.
552 # The classes to use for the various channels.
546 xreq_channel_class = Type(XReqSocketChannel)
553 xreq_channel_class = Type(XReqSocketChannel)
547 sub_channel_class = Type(SubSocketChannel)
554 sub_channel_class = Type(SubSocketChannel)
548 rep_channel_class = Type(RepSocketChannel)
555 rep_channel_class = Type(RepSocketChannel)
549 hb_channel_class = Type(HBSocketChannel)
556 hb_channel_class = Type(HBSocketChannel)
550
557
551 # Protected traits.
558 # Protected traits.
552 _launch_args = Any
559 _launch_args = Any
553 _xreq_channel = Any
560 _xreq_channel = Any
554 _sub_channel = Any
561 _sub_channel = Any
555 _rep_channel = Any
562 _rep_channel = Any
556 _hb_channel = Any
563 _hb_channel = Any
557
564
558 #--------------------------------------------------------------------------
565 #--------------------------------------------------------------------------
559 # Channel management methods:
566 # Channel management methods:
560 #--------------------------------------------------------------------------
567 #--------------------------------------------------------------------------
561
568
562 def start_channels(self):
569 def start_channels(self):
563 """Starts the channels for this kernel.
570 """Starts the channels for this kernel.
564
571
565 This will create the channels if they do not exist and then start
572 This will create the channels if they do not exist and then start
566 them. If port numbers of 0 are being used (random ports) then you
573 them. If port numbers of 0 are being used (random ports) then you
567 must first call :method:`start_kernel`. If the channels have been
574 must first call :method:`start_kernel`. If the channels have been
568 stopped and you call this, :class:`RuntimeError` will be raised.
575 stopped and you call this, :class:`RuntimeError` will be raised.
569 """
576 """
570 self.xreq_channel.start()
577 self.xreq_channel.start()
571 self.sub_channel.start()
578 self.sub_channel.start()
572 self.rep_channel.start()
579 self.rep_channel.start()
573 self.hb_channel.start()
580 self.hb_channel.start()
574
581
575 def stop_channels(self):
582 def stop_channels(self):
576 """Stops the channels for this kernel.
583 """Stops the channels for this kernel.
577
584
578 This stops the channels by joining their threads. If the channels
585 This stops the channels by joining their threads. If the channels
579 were not started, :class:`RuntimeError` will be raised.
586 were not started, :class:`RuntimeError` will be raised.
580 """
587 """
581 self.xreq_channel.stop()
588 self.xreq_channel.stop()
582 self.sub_channel.stop()
589 self.sub_channel.stop()
583 self.rep_channel.stop()
590 self.rep_channel.stop()
584 self.hb_channel.stop()
591 self.hb_channel.stop()
585
592
586 @property
593 @property
587 def channels_running(self):
594 def channels_running(self):
588 """Are all of the channels created and running?"""
595 """Are all of the channels created and running?"""
589 return self.xreq_channel.is_alive() \
596 return self.xreq_channel.is_alive() \
590 and self.sub_channel.is_alive() \
597 and self.sub_channel.is_alive() \
591 and self.rep_channel.is_alive() \
598 and self.rep_channel.is_alive() \
592 and self.hb_channel.is_alive()
599 and self.hb_channel.is_alive()
593
600
594 #--------------------------------------------------------------------------
601 #--------------------------------------------------------------------------
595 # Kernel process management methods:
602 # Kernel process management methods:
596 #--------------------------------------------------------------------------
603 #--------------------------------------------------------------------------
597
604
598 def start_kernel(self, **kw):
605 def start_kernel(self, **kw):
599 """Starts a kernel process and configures the manager to use it.
606 """Starts a kernel process and configures the manager to use it.
600
607
601 If random ports (port=0) are being used, this method must be called
608 If random ports (port=0) are being used, this method must be called
602 before the channels are created.
609 before the channels are created.
603
610
604 Parameters:
611 Parameters:
605 -----------
612 -----------
606 ipython : bool, optional (default True)
613 ipython : bool, optional (default True)
607 Whether to use an IPython kernel instead of a plain Python kernel.
614 Whether to use an IPython kernel instead of a plain Python kernel.
608 """
615 """
609 xreq, sub, rep, hb = self.xreq_address, self.sub_address, \
616 xreq, sub, rep, hb = self.xreq_address, self.sub_address, \
610 self.rep_address, self.hb_address
617 self.rep_address, self.hb_address
611 if xreq[0] != LOCALHOST or sub[0] != LOCALHOST or rep[0] != LOCALHOST or hb[0] != LOCALHOST:
618 if xreq[0] != LOCALHOST or sub[0] != LOCALHOST or \
619 rep[0] != LOCALHOST or hb[0] != LOCALHOST:
612 raise RuntimeError("Can only launch a kernel on localhost."
620 raise RuntimeError("Can only launch a kernel on localhost."
613 "Make sure that the '*_address' attributes are "
621 "Make sure that the '*_address' attributes are "
614 "configured properly.")
622 "configured properly.")
615
623
616 self._launch_args = kw.copy()
624 self._launch_args = kw.copy()
617 if kw.pop('ipython', True):
625 if kw.pop('ipython', True):
618 from ipkernel import launch_kernel as launch
626 from ipkernel import launch_kernel as launch
619 else:
627 else:
620 from pykernel import launch_kernel as launch
628 from pykernel import launch_kernel as launch
621 self.kernel, xrep, pub, req, hb = launch(
629 self.kernel, xrep, pub, req, hb = launch(
622 xrep_port=xreq[1], pub_port=sub[1], req_port=rep[1],
630 xrep_port=xreq[1], pub_port=sub[1],
623 hb_port=hb[1], **kw)
631 req_port=rep[1], hb_port=hb[1], **kw)
624 self.xreq_address = (LOCALHOST, xrep)
632 self.xreq_address = (LOCALHOST, xrep)
625 self.sub_address = (LOCALHOST, pub)
633 self.sub_address = (LOCALHOST, pub)
626 self.rep_address = (LOCALHOST, req)
634 self.rep_address = (LOCALHOST, req)
627 self.hb_address = (LOCALHOST, hb)
635 self.hb_address = (LOCALHOST, hb)
628
636
629 def restart_kernel(self):
637 def restart_kernel(self):
630 """Restarts a kernel with the same arguments that were used to launch
638 """Restarts a kernel with the same arguments that were used to launch
631 it. If the old kernel was launched with random ports, the same ports
639 it. If the old kernel was launched with random ports, the same ports
632 will be used for the new kernel.
640 will be used for the new kernel.
633 """
641 """
634 if self._launch_args is None:
642 if self._launch_args is None:
635 raise RuntimeError("Cannot restart the kernel. "
643 raise RuntimeError("Cannot restart the kernel. "
636 "No previous call to 'start_kernel'.")
644 "No previous call to 'start_kernel'.")
637 else:
645 else:
638 if self.has_kernel:
646 if self.has_kernel:
639 self.kill_kernel()
647 self.kill_kernel()
640 self.start_kernel(*self._launch_args)
648 self.start_kernel(**self._launch_args)
641
649
642 @property
650 @property
643 def has_kernel(self):
651 def has_kernel(self):
644 """Returns whether a kernel process has been specified for the kernel
652 """Returns whether a kernel process has been specified for the kernel
645 manager.
653 manager.
646 """
654 """
647 return self.kernel is not None
655 return self.kernel is not None
648
656
649 def kill_kernel(self):
657 def kill_kernel(self):
650 """ Kill the running kernel. """
658 """ Kill the running kernel. """
651 if self.kernel is not None:
659 if self.kernel is not None:
652 self.kernel.kill()
660 self.kernel.kill()
653 self.kernel = None
661 self.kernel = None
654 else:
662 else:
655 raise RuntimeError("Cannot kill kernel. No kernel is running!")
663 raise RuntimeError("Cannot kill kernel. No kernel is running!")
656
664
657 def signal_kernel(self, signum):
665 def signal_kernel(self, signum):
658 """ Sends a signal to the kernel. """
666 """ Sends a signal to the kernel. """
659 if self.kernel is not None:
667 if self.kernel is not None:
660 self.kernel.send_signal(signum)
668 self.kernel.send_signal(signum)
661 else:
669 else:
662 raise RuntimeError("Cannot signal kernel. No kernel is running!")
670 raise RuntimeError("Cannot signal kernel. No kernel is running!")
663
671
664 @property
672 @property
665 def is_alive(self):
673 def is_alive(self):
666 """Is the kernel process still running?"""
674 """Is the kernel process still running?"""
667 if self.kernel is not None:
675 if self.kernel is not None:
668 if self.kernel.poll() is None:
676 if self.kernel.poll() is None:
669 return True
677 return True
670 else:
678 else:
671 return False
679 return False
672 else:
680 else:
673 # We didn't start the kernel with this KernelManager so we don't
681 # We didn't start the kernel with this KernelManager so we don't
674 # know if it is running. We should use a heartbeat for this case.
682 # know if it is running. We should use a heartbeat for this case.
675 return True
683 return True
676
684
677 #--------------------------------------------------------------------------
685 #--------------------------------------------------------------------------
678 # Channels used for communication with the kernel:
686 # Channels used for communication with the kernel:
679 #--------------------------------------------------------------------------
687 #--------------------------------------------------------------------------
680
688
681 @property
689 @property
682 def xreq_channel(self):
690 def xreq_channel(self):
683 """Get the REQ socket channel object to make requests of the kernel."""
691 """Get the REQ socket channel object to make requests of the kernel."""
684 if self._xreq_channel is None:
692 if self._xreq_channel is None:
685 self._xreq_channel = self.xreq_channel_class(self.context,
693 self._xreq_channel = self.xreq_channel_class(self.context,
686 self.session,
694 self.session,
687 self.xreq_address)
695 self.xreq_address)
688 return self._xreq_channel
696 return self._xreq_channel
689
697
690 @property
698 @property
691 def sub_channel(self):
699 def sub_channel(self):
692 """Get the SUB socket channel object."""
700 """Get the SUB socket channel object."""
693 if self._sub_channel is None:
701 if self._sub_channel is None:
694 self._sub_channel = self.sub_channel_class(self.context,
702 self._sub_channel = self.sub_channel_class(self.context,
695 self.session,
703 self.session,
696 self.sub_address)
704 self.sub_address)
697 return self._sub_channel
705 return self._sub_channel
698
706
699 @property
707 @property
700 def rep_channel(self):
708 def rep_channel(self):
701 """Get the REP socket channel object to handle stdin (raw_input)."""
709 """Get the REP socket channel object to handle stdin (raw_input)."""
702 if self._rep_channel is None:
710 if self._rep_channel is None:
703 self._rep_channel = self.rep_channel_class(self.context,
711 self._rep_channel = self.rep_channel_class(self.context,
704 self.session,
712 self.session,
705 self.rep_address)
713 self.rep_address)
706 return self._rep_channel
714 return self._rep_channel
707
715
708 @property
716 @property
709 def hb_channel(self):
717 def hb_channel(self):
710 """Get the REP socket channel object to handle stdin (raw_input)."""
718 """Get the REP socket channel object to handle stdin (raw_input)."""
711 if self._hb_channel is None:
719 if self._hb_channel is None:
712 self._hb_channel = self.hb_channel_class(self.context,
720 self._hb_channel = self.hb_channel_class(self.context,
713 self.session,
721 self.session,
714 self.hb_address)
722 self.hb_address)
715 return self._hb_channel
723 return self._hb_channel
General Comments 0
You need to be logged in to leave comments. Login now