##// END OF EJS Templates
Adding some temporary hacks to work around (Py)ZMQ bugs on Windows.
epatters -
Show More
@@ -1,840 +1,851 b''
1 """Base classes to manage the interaction with a running kernel.
1 """Base classes to manage the interaction with a running kernel.
2
2
3 Todo
3 Todo
4 ====
4 ====
5
5
6 * Create logger to handle debugging and console messages.
6 * Create logger to handle debugging and console messages.
7 """
7 """
8
8
9 #-----------------------------------------------------------------------------
9 #-----------------------------------------------------------------------------
10 # Copyright (C) 2008-2010 The IPython Development Team
10 # Copyright (C) 2008-2010 The IPython Development Team
11 #
11 #
12 # Distributed under the terms of the BSD License. The full license is in
12 # Distributed under the terms of the BSD License. The full license is in
13 # the file COPYING, distributed as part of this software.
13 # the file COPYING, distributed as part of this software.
14 #-----------------------------------------------------------------------------
14 #-----------------------------------------------------------------------------
15
15
16 #-----------------------------------------------------------------------------
16 #-----------------------------------------------------------------------------
17 # Imports
17 # Imports
18 #-----------------------------------------------------------------------------
18 #-----------------------------------------------------------------------------
19
19
20 # Standard library imports.
20 # Standard library imports.
21 from Queue import Queue, Empty
21 from Queue import Queue, Empty
22 from subprocess import Popen
22 from subprocess import Popen
23 import sys
23 from threading import Thread
24 from threading import Thread
24 import time
25 import time
25
26
26 # System library imports.
27 # System library imports.
27 import zmq
28 import zmq
28 from zmq import POLLIN, POLLOUT, POLLERR
29 from zmq import POLLIN, POLLOUT, POLLERR
29 from zmq.eventloop import ioloop
30 from zmq.eventloop import ioloop
30
31
31 # Local imports.
32 # Local imports.
32 from IPython.utils import io
33 from IPython.utils import io
33 from IPython.utils.traitlets import HasTraits, Any, Instance, Type, TCPAddress
34 from IPython.utils.traitlets import HasTraits, Any, Instance, Type, TCPAddress
34 from session import Session
35 from session import Session
35
36
36 #-----------------------------------------------------------------------------
37 #-----------------------------------------------------------------------------
37 # Constants and exceptions
38 # Constants and exceptions
38 #-----------------------------------------------------------------------------
39 #-----------------------------------------------------------------------------
39
40
40 LOCALHOST = '127.0.0.1'
41 LOCALHOST = '127.0.0.1'
41
42
42 class InvalidPortNumber(Exception):
43 class InvalidPortNumber(Exception):
43 pass
44 pass
44
45
45 #-----------------------------------------------------------------------------
46 #-----------------------------------------------------------------------------
46 # Utility functions
47 # Utility functions
47 #-----------------------------------------------------------------------------
48 #-----------------------------------------------------------------------------
48
49
49 # some utilities to validate message structure, these might get moved elsewhere
50 # some utilities to validate message structure, these might get moved elsewhere
50 # if they prove to have more generic utility
51 # if they prove to have more generic utility
51
52
52 def validate_string_list(lst):
53 def validate_string_list(lst):
53 """Validate that the input is a list of strings.
54 """Validate that the input is a list of strings.
54
55
55 Raises ValueError if not."""
56 Raises ValueError if not."""
56 if not isinstance(lst, list):
57 if not isinstance(lst, list):
57 raise ValueError('input %r must be a list' % lst)
58 raise ValueError('input %r must be a list' % lst)
58 for x in lst:
59 for x in lst:
59 if not isinstance(x, basestring):
60 if not isinstance(x, basestring):
60 raise ValueError('element %r in list must be a string' % x)
61 raise ValueError('element %r in list must be a string' % x)
61
62
62
63
63 def validate_string_dict(dct):
64 def validate_string_dict(dct):
64 """Validate that the input is a dict with string keys and values.
65 """Validate that the input is a dict with string keys and values.
65
66
66 Raises ValueError if not."""
67 Raises ValueError if not."""
67 for k,v in dct.iteritems():
68 for k,v in dct.iteritems():
68 if not isinstance(k, basestring):
69 if not isinstance(k, basestring):
69 raise ValueError('key %r in dict must be a string' % k)
70 raise ValueError('key %r in dict must be a string' % k)
70 if not isinstance(v, basestring):
71 if not isinstance(v, basestring):
71 raise ValueError('value %r in dict must be a string' % v)
72 raise ValueError('value %r in dict must be a string' % v)
72
73
73
74
74 #-----------------------------------------------------------------------------
75 #-----------------------------------------------------------------------------
75 # ZMQ Socket Channel classes
76 # ZMQ Socket Channel classes
76 #-----------------------------------------------------------------------------
77 #-----------------------------------------------------------------------------
77
78
78 class ZmqSocketChannel(Thread):
79 class ZmqSocketChannel(Thread):
79 """The base class for the channels that use ZMQ sockets.
80 """The base class for the channels that use ZMQ sockets.
80 """
81 """
81 context = None
82 context = None
82 session = None
83 session = None
83 socket = None
84 socket = None
84 ioloop = None
85 ioloop = None
85 iostate = None
86 iostate = None
86 _address = None
87 _address = None
87
88
88 def __init__(self, context, session, address):
89 def __init__(self, context, session, address):
89 """Create a channel
90 """Create a channel
90
91
91 Parameters
92 Parameters
92 ----------
93 ----------
93 context : :class:`zmq.Context`
94 context : :class:`zmq.Context`
94 The ZMQ context to use.
95 The ZMQ context to use.
95 session : :class:`session.Session`
96 session : :class:`session.Session`
96 The session to use.
97 The session to use.
97 address : tuple
98 address : tuple
98 Standard (ip, port) tuple that the kernel is listening on.
99 Standard (ip, port) tuple that the kernel is listening on.
99 """
100 """
100 super(ZmqSocketChannel, self).__init__()
101 super(ZmqSocketChannel, self).__init__()
101 self.daemon = True
102 self.daemon = True
102
103
103 self.context = context
104 self.context = context
104 self.session = session
105 self.session = session
105 if address[1] == 0:
106 if address[1] == 0:
106 message = 'The port number for a channel cannot be 0.'
107 message = 'The port number for a channel cannot be 0.'
107 raise InvalidPortNumber(message)
108 raise InvalidPortNumber(message)
108 self._address = address
109 self._address = address
109
110
110 def stop(self):
111 def stop(self):
111 """Stop the channel's activity.
112 """Stop the channel's activity.
112
113
113 This calls :method:`Thread.join` and returns when the thread
114 This calls :method:`Thread.join` and returns when the thread
114 terminates. :class:`RuntimeError` will be raised if
115 terminates. :class:`RuntimeError` will be raised if
115 :method:`self.start` is called again.
116 :method:`self.start` is called again.
116 """
117 """
117 self.join()
118 self.join()
118
119
119 @property
120 @property
120 def address(self):
121 def address(self):
121 """Get the channel's address as an (ip, port) tuple.
122 """Get the channel's address as an (ip, port) tuple.
122
123
123 By the default, the address is (localhost, 0), where 0 means a random
124 By the default, the address is (localhost, 0), where 0 means a random
124 port.
125 port.
125 """
126 """
126 return self._address
127 return self._address
127
128
128 def add_io_state(self, state):
129 def add_io_state(self, state):
129 """Add IO state to the eventloop.
130 """Add IO state to the eventloop.
130
131
131 Parameters
132 Parameters
132 ----------
133 ----------
133 state : zmq.POLLIN|zmq.POLLOUT|zmq.POLLERR
134 state : zmq.POLLIN|zmq.POLLOUT|zmq.POLLERR
134 The IO state flag to set.
135 The IO state flag to set.
135
136
136 This is thread safe as it uses the thread safe IOLoop.add_callback.
137 This is thread safe as it uses the thread safe IOLoop.add_callback.
137 """
138 """
138 def add_io_state_callback():
139 def add_io_state_callback():
139 if not self.iostate & state:
140 if not self.iostate & state:
140 self.iostate = self.iostate | state
141 self.iostate = self.iostate | state
141 self.ioloop.update_handler(self.socket, self.iostate)
142 self.ioloop.update_handler(self.socket, self.iostate)
142 self.ioloop.add_callback(add_io_state_callback)
143 self.ioloop.add_callback(add_io_state_callback)
143
144
144 def drop_io_state(self, state):
145 def drop_io_state(self, state):
145 """Drop IO state from the eventloop.
146 """Drop IO state from the eventloop.
146
147
147 Parameters
148 Parameters
148 ----------
149 ----------
149 state : zmq.POLLIN|zmq.POLLOUT|zmq.POLLERR
150 state : zmq.POLLIN|zmq.POLLOUT|zmq.POLLERR
150 The IO state flag to set.
151 The IO state flag to set.
151
152
152 This is thread safe as it uses the thread safe IOLoop.add_callback.
153 This is thread safe as it uses the thread safe IOLoop.add_callback.
153 """
154 """
154 def drop_io_state_callback():
155 def drop_io_state_callback():
155 if self.iostate & state:
156 if self.iostate & state:
156 self.iostate = self.iostate & (~state)
157 self.iostate = self.iostate & (~state)
157 self.ioloop.update_handler(self.socket, self.iostate)
158 self.ioloop.update_handler(self.socket, self.iostate)
158 self.ioloop.add_callback(drop_io_state_callback)
159 self.ioloop.add_callback(drop_io_state_callback)
159
160
160
161
161 class XReqSocketChannel(ZmqSocketChannel):
162 class XReqSocketChannel(ZmqSocketChannel):
162 """The XREQ channel for issues request/replies to the kernel.
163 """The XREQ channel for issues request/replies to the kernel.
163 """
164 """
164
165
165 command_queue = None
166 command_queue = None
166
167
167 def __init__(self, context, session, address):
168 def __init__(self, context, session, address):
168 self.command_queue = Queue()
169 self.command_queue = Queue()
169 super(XReqSocketChannel, self).__init__(context, session, address)
170 super(XReqSocketChannel, self).__init__(context, session, address)
170
171
171 def run(self):
172 def run(self):
172 """The thread's main activity. Call start() instead."""
173 """The thread's main activity. Call start() instead."""
173 self.socket = self.context.socket(zmq.XREQ)
174 self.socket = self.context.socket(zmq.XREQ)
174 self.socket.setsockopt(zmq.IDENTITY, self.session.session)
175 self.socket.setsockopt(zmq.IDENTITY, self.session.session)
175 self.socket.connect('tcp://%s:%i' % self.address)
176 self.socket.connect('tcp://%s:%i' % self.address)
176 self.ioloop = ioloop.IOLoop()
177 self.ioloop = ioloop.IOLoop()
177 self.iostate = POLLERR|POLLIN
178 self.iostate = POLLERR|POLLIN
178 self.ioloop.add_handler(self.socket, self._handle_events,
179 self.ioloop.add_handler(self.socket, self._handle_events,
179 self.iostate)
180 self.iostate)
180 self.ioloop.start()
181 self.ioloop.start()
181
182
182 def stop(self):
183 def stop(self):
183 self.ioloop.stop()
184 self.ioloop.stop()
184 super(XReqSocketChannel, self).stop()
185 super(XReqSocketChannel, self).stop()
185
186
186 def call_handlers(self, msg):
187 def call_handlers(self, msg):
187 """This method is called in the ioloop thread when a message arrives.
188 """This method is called in the ioloop thread when a message arrives.
188
189
189 Subclasses should override this method to handle incoming messages.
190 Subclasses should override this method to handle incoming messages.
190 It is important to remember that this method is called in the thread
191 It is important to remember that this method is called in the thread
191 so that some logic must be done to ensure that the application leve
192 so that some logic must be done to ensure that the application leve
192 handlers are called in the application thread.
193 handlers are called in the application thread.
193 """
194 """
194 raise NotImplementedError('call_handlers must be defined in a subclass.')
195 raise NotImplementedError('call_handlers must be defined in a subclass.')
195
196
196 def execute(self, code, silent=False,
197 def execute(self, code, silent=False,
197 user_variables=None, user_expressions=None):
198 user_variables=None, user_expressions=None):
198 """Execute code in the kernel.
199 """Execute code in the kernel.
199
200
200 Parameters
201 Parameters
201 ----------
202 ----------
202 code : str
203 code : str
203 A string of Python code.
204 A string of Python code.
204
205
205 silent : bool, optional (default False)
206 silent : bool, optional (default False)
206 If set, the kernel will execute the code as quietly possible.
207 If set, the kernel will execute the code as quietly possible.
207
208
208 user_variables : list, optional
209 user_variables : list, optional
209 A list of variable names to pull from the user's namespace. They
210 A list of variable names to pull from the user's namespace. They
210 will come back as a dict with these names as keys and their
211 will come back as a dict with these names as keys and their
211 :func:`repr` as values.
212 :func:`repr` as values.
212
213
213 user_expressions : dict, optional
214 user_expressions : dict, optional
214 A dict with string keys and to pull from the user's
215 A dict with string keys and to pull from the user's
215 namespace. They will come back as a dict with these names as keys
216 namespace. They will come back as a dict with these names as keys
216 and their :func:`repr` as values.
217 and their :func:`repr` as values.
217
218
218 Returns
219 Returns
219 -------
220 -------
220 The msg_id of the message sent.
221 The msg_id of the message sent.
221 """
222 """
222 if user_variables is None:
223 if user_variables is None:
223 user_variables = []
224 user_variables = []
224 if user_expressions is None:
225 if user_expressions is None:
225 user_expressions = {}
226 user_expressions = {}
226
227
227 # Don't waste network traffic if inputs are invalid
228 # Don't waste network traffic if inputs are invalid
228 if not isinstance(code, basestring):
229 if not isinstance(code, basestring):
229 raise ValueError('code %r must be a string' % code)
230 raise ValueError('code %r must be a string' % code)
230 validate_string_list(user_variables)
231 validate_string_list(user_variables)
231 validate_string_dict(user_expressions)
232 validate_string_dict(user_expressions)
232
233
233 # Create class for content/msg creation. Related to, but possibly
234 # Create class for content/msg creation. Related to, but possibly
234 # not in Session.
235 # not in Session.
235 content = dict(code=code, silent=silent,
236 content = dict(code=code, silent=silent,
236 user_variables=user_variables,
237 user_variables=user_variables,
237 user_expressions=user_expressions)
238 user_expressions=user_expressions)
238 msg = self.session.msg('execute_request', content)
239 msg = self.session.msg('execute_request', content)
239 self._queue_request(msg)
240 self._queue_request(msg)
240 return msg['header']['msg_id']
241 return msg['header']['msg_id']
241
242
242 def complete(self, text, line, cursor_pos, block=None):
243 def complete(self, text, line, cursor_pos, block=None):
243 """Tab complete text in the kernel's namespace.
244 """Tab complete text in the kernel's namespace.
244
245
245 Parameters
246 Parameters
246 ----------
247 ----------
247 text : str
248 text : str
248 The text to complete.
249 The text to complete.
249 line : str
250 line : str
250 The full line of text that is the surrounding context for the
251 The full line of text that is the surrounding context for the
251 text to complete.
252 text to complete.
252 cursor_pos : int
253 cursor_pos : int
253 The position of the cursor in the line where the completion was
254 The position of the cursor in the line where the completion was
254 requested.
255 requested.
255 block : str, optional
256 block : str, optional
256 The full block of code in which the completion is being requested.
257 The full block of code in which the completion is being requested.
257
258
258 Returns
259 Returns
259 -------
260 -------
260 The msg_id of the message sent.
261 The msg_id of the message sent.
261 """
262 """
262 content = dict(text=text, line=line, block=block, cursor_pos=cursor_pos)
263 content = dict(text=text, line=line, block=block, cursor_pos=cursor_pos)
263 msg = self.session.msg('complete_request', content)
264 msg = self.session.msg('complete_request', content)
264 self._queue_request(msg)
265 self._queue_request(msg)
265 return msg['header']['msg_id']
266 return msg['header']['msg_id']
266
267
267 def object_info(self, oname):
268 def object_info(self, oname):
268 """Get metadata information about an object.
269 """Get metadata information about an object.
269
270
270 Parameters
271 Parameters
271 ----------
272 ----------
272 oname : str
273 oname : str
273 A string specifying the object name.
274 A string specifying the object name.
274
275
275 Returns
276 Returns
276 -------
277 -------
277 The msg_id of the message sent.
278 The msg_id of the message sent.
278 """
279 """
279 content = dict(oname=oname)
280 content = dict(oname=oname)
280 msg = self.session.msg('object_info_request', content)
281 msg = self.session.msg('object_info_request', content)
281 self._queue_request(msg)
282 self._queue_request(msg)
282 return msg['header']['msg_id']
283 return msg['header']['msg_id']
283
284
284 def history(self, index=None, raw=False, output=True):
285 def history(self, index=None, raw=False, output=True):
285 """Get the history list.
286 """Get the history list.
286
287
287 Parameters
288 Parameters
288 ----------
289 ----------
289 index : n or (n1, n2) or None
290 index : n or (n1, n2) or None
290 If n, then the last entries. If a tuple, then all in
291 If n, then the last entries. If a tuple, then all in
291 range(n1, n2). If None, then all entries. Raises IndexError if
292 range(n1, n2). If None, then all entries. Raises IndexError if
292 the format of index is incorrect.
293 the format of index is incorrect.
293 raw : bool
294 raw : bool
294 If True, return the raw input.
295 If True, return the raw input.
295 output : bool
296 output : bool
296 If True, then return the output as well.
297 If True, then return the output as well.
297
298
298 Returns
299 Returns
299 -------
300 -------
300 The msg_id of the message sent.
301 The msg_id of the message sent.
301 """
302 """
302 content = dict(index=index, raw=raw, output=output)
303 content = dict(index=index, raw=raw, output=output)
303 msg = self.session.msg('history_request', content)
304 msg = self.session.msg('history_request', content)
304 self._queue_request(msg)
305 self._queue_request(msg)
305 return msg['header']['msg_id']
306 return msg['header']['msg_id']
306
307
307 def shutdown(self):
308 def shutdown(self):
308 """Request an immediate kernel shutdown.
309 """Request an immediate kernel shutdown.
309
310
310 Upon receipt of the (empty) reply, client code can safely assume that
311 Upon receipt of the (empty) reply, client code can safely assume that
311 the kernel has shut down and it's safe to forcefully terminate it if
312 the kernel has shut down and it's safe to forcefully terminate it if
312 it's still alive.
313 it's still alive.
313
314
314 The kernel will send the reply via a function registered with Python's
315 The kernel will send the reply via a function registered with Python's
315 atexit module, ensuring it's truly done as the kernel is done with all
316 atexit module, ensuring it's truly done as the kernel is done with all
316 normal operation.
317 normal operation.
317 """
318 """
318 # Send quit message to kernel. Once we implement kernel-side setattr,
319 # Send quit message to kernel. Once we implement kernel-side setattr,
319 # this should probably be done that way, but for now this will do.
320 # this should probably be done that way, but for now this will do.
320 msg = self.session.msg('shutdown_request', {})
321 msg = self.session.msg('shutdown_request', {})
321 self._queue_request(msg)
322 self._queue_request(msg)
322 return msg['header']['msg_id']
323 return msg['header']['msg_id']
323
324
324 def _handle_events(self, socket, events):
325 def _handle_events(self, socket, events):
325 if events & POLLERR:
326 if events & POLLERR:
326 self._handle_err()
327 self._handle_err()
327 if events & POLLOUT:
328 if events & POLLOUT:
328 self._handle_send()
329 self._handle_send()
329 if events & POLLIN:
330 if events & POLLIN:
330 self._handle_recv()
331 self._handle_recv()
331
332
332 def _handle_recv(self):
333 def _handle_recv(self):
333 msg = self.socket.recv_json()
334 msg = self.socket.recv_json()
334 self.call_handlers(msg)
335 self.call_handlers(msg)
335
336
336 def _handle_send(self):
337 def _handle_send(self):
337 try:
338 try:
338 msg = self.command_queue.get(False)
339 msg = self.command_queue.get(False)
339 except Empty:
340 except Empty:
340 pass
341 pass
341 else:
342 else:
342 self.socket.send_json(msg)
343 self.socket.send_json(msg)
343 if self.command_queue.empty():
344 if self.command_queue.empty():
344 self.drop_io_state(POLLOUT)
345 self.drop_io_state(POLLOUT)
345
346
346 def _handle_err(self):
347 def _handle_err(self):
347 # We don't want to let this go silently, so eventually we should log.
348 # We don't want to let this go silently, so eventually we should log.
348 raise zmq.ZMQError()
349 raise zmq.ZMQError()
349
350
350 def _queue_request(self, msg):
351 def _queue_request(self, msg):
351 self.command_queue.put(msg)
352 self.command_queue.put(msg)
352 self.add_io_state(POLLOUT)
353 self.add_io_state(POLLOUT)
353
354
354
355
355 class SubSocketChannel(ZmqSocketChannel):
356 class SubSocketChannel(ZmqSocketChannel):
356 """The SUB channel which listens for messages that the kernel publishes.
357 """The SUB channel which listens for messages that the kernel publishes.
357 """
358 """
358
359
359 def __init__(self, context, session, address):
360 def __init__(self, context, session, address):
360 super(SubSocketChannel, self).__init__(context, session, address)
361 super(SubSocketChannel, self).__init__(context, session, address)
361
362
362 def run(self):
363 def run(self):
363 """The thread's main activity. Call start() instead."""
364 """The thread's main activity. Call start() instead."""
364 self.socket = self.context.socket(zmq.SUB)
365 self.socket = self.context.socket(zmq.SUB)
365 self.socket.setsockopt(zmq.SUBSCRIBE,'')
366 self.socket.setsockopt(zmq.SUBSCRIBE,'')
366 self.socket.setsockopt(zmq.IDENTITY, self.session.session)
367 self.socket.setsockopt(zmq.IDENTITY, self.session.session)
367 self.socket.connect('tcp://%s:%i' % self.address)
368 self.socket.connect('tcp://%s:%i' % self.address)
368 self.ioloop = ioloop.IOLoop()
369 self.ioloop = ioloop.IOLoop()
369 self.iostate = POLLIN|POLLERR
370 self.iostate = POLLIN|POLLERR
370 self.ioloop.add_handler(self.socket, self._handle_events,
371 self.ioloop.add_handler(self.socket, self._handle_events,
371 self.iostate)
372 self.iostate)
372 self.ioloop.start()
373 self.ioloop.start()
373
374
374 def stop(self):
375 def stop(self):
375 self.ioloop.stop()
376 self.ioloop.stop()
376 super(SubSocketChannel, self).stop()
377 super(SubSocketChannel, self).stop()
377
378
378 def call_handlers(self, msg):
379 def call_handlers(self, msg):
379 """This method is called in the ioloop thread when a message arrives.
380 """This method is called in the ioloop thread when a message arrives.
380
381
381 Subclasses should override this method to handle incoming messages.
382 Subclasses should override this method to handle incoming messages.
382 It is important to remember that this method is called in the thread
383 It is important to remember that this method is called in the thread
383 so that some logic must be done to ensure that the application leve
384 so that some logic must be done to ensure that the application leve
384 handlers are called in the application thread.
385 handlers are called in the application thread.
385 """
386 """
386 raise NotImplementedError('call_handlers must be defined in a subclass.')
387 raise NotImplementedError('call_handlers must be defined in a subclass.')
387
388
388 def flush(self, timeout=1.0):
389 def flush(self, timeout=1.0):
389 """Immediately processes all pending messages on the SUB channel.
390 """Immediately processes all pending messages on the SUB channel.
390
391
391 Callers should use this method to ensure that :method:`call_handlers`
392 Callers should use this method to ensure that :method:`call_handlers`
392 has been called for all messages that have been received on the
393 has been called for all messages that have been received on the
393 0MQ SUB socket of this channel.
394 0MQ SUB socket of this channel.
394
395
395 This method is thread safe.
396 This method is thread safe.
396
397
397 Parameters
398 Parameters
398 ----------
399 ----------
399 timeout : float, optional
400 timeout : float, optional
400 The maximum amount of time to spend flushing, in seconds. The
401 The maximum amount of time to spend flushing, in seconds. The
401 default is one second.
402 default is one second.
402 """
403 """
403 # We do the IOLoop callback process twice to ensure that the IOLoop
404 # We do the IOLoop callback process twice to ensure that the IOLoop
404 # gets to perform at least one full poll.
405 # gets to perform at least one full poll.
405 stop_time = time.time() + timeout
406 stop_time = time.time() + timeout
406 for i in xrange(2):
407 for i in xrange(2):
407 self._flushed = False
408 self._flushed = False
408 self.ioloop.add_callback(self._flush)
409 self.ioloop.add_callback(self._flush)
409 while not self._flushed and time.time() < stop_time:
410 while not self._flushed and time.time() < stop_time:
410 time.sleep(0.01)
411 time.sleep(0.01)
411
412
412 def _handle_events(self, socket, events):
413 def _handle_events(self, socket, events):
413 # Turn on and off POLLOUT depending on if we have made a request
414 # Turn on and off POLLOUT depending on if we have made a request
414 if events & POLLERR:
415 if events & POLLERR:
415 self._handle_err()
416 self._handle_err()
416 if events & POLLIN:
417 if events & POLLIN:
417 self._handle_recv()
418 self._handle_recv()
418
419
419 def _handle_err(self):
420 def _handle_err(self):
420 # We don't want to let this go silently, so eventually we should log.
421 # We don't want to let this go silently, so eventually we should log.
421 raise zmq.ZMQError()
422 raise zmq.ZMQError()
422
423
423 def _handle_recv(self):
424 def _handle_recv(self):
424 # Get all of the messages we can
425 # Get all of the messages we can
425 while True:
426 while True:
426 try:
427 try:
427 msg = self.socket.recv_json(zmq.NOBLOCK)
428 msg = self.socket.recv_json(zmq.NOBLOCK)
428 except zmq.ZMQError:
429 except zmq.ZMQError:
429 # Check the errno?
430 # Check the errno?
430 # Will this trigger POLLERR?
431 # Will this trigger POLLERR?
431 break
432 break
432 else:
433 else:
433 self.call_handlers(msg)
434 self.call_handlers(msg)
434
435
435 def _flush(self):
436 def _flush(self):
436 """Callback for :method:`self.flush`."""
437 """Callback for :method:`self.flush`."""
437 self._flushed = True
438 self._flushed = True
438
439
439
440
440 class RepSocketChannel(ZmqSocketChannel):
441 class RepSocketChannel(ZmqSocketChannel):
441 """A reply channel to handle raw_input requests that the kernel makes."""
442 """A reply channel to handle raw_input requests that the kernel makes."""
442
443
443 msg_queue = None
444 msg_queue = None
444
445
445 def __init__(self, context, session, address):
446 def __init__(self, context, session, address):
446 self.msg_queue = Queue()
447 self.msg_queue = Queue()
447 super(RepSocketChannel, self).__init__(context, session, address)
448 super(RepSocketChannel, self).__init__(context, session, address)
448
449
449 def run(self):
450 def run(self):
450 """The thread's main activity. Call start() instead."""
451 """The thread's main activity. Call start() instead."""
451 self.socket = self.context.socket(zmq.XREQ)
452 self.socket = self.context.socket(zmq.XREQ)
452 self.socket.setsockopt(zmq.IDENTITY, self.session.session)
453 self.socket.setsockopt(zmq.IDENTITY, self.session.session)
453 self.socket.connect('tcp://%s:%i' % self.address)
454 self.socket.connect('tcp://%s:%i' % self.address)
454 self.ioloop = ioloop.IOLoop()
455 self.ioloop = ioloop.IOLoop()
455 self.iostate = POLLERR|POLLIN
456 self.iostate = POLLERR|POLLIN
456 self.ioloop.add_handler(self.socket, self._handle_events,
457 self.ioloop.add_handler(self.socket, self._handle_events,
457 self.iostate)
458 self.iostate)
458 self.ioloop.start()
459 self.ioloop.start()
459
460
460 def stop(self):
461 def stop(self):
461 self.ioloop.stop()
462 self.ioloop.stop()
462 super(RepSocketChannel, self).stop()
463 super(RepSocketChannel, self).stop()
463
464
464 def call_handlers(self, msg):
465 def call_handlers(self, msg):
465 """This method is called in the ioloop thread when a message arrives.
466 """This method is called in the ioloop thread when a message arrives.
466
467
467 Subclasses should override this method to handle incoming messages.
468 Subclasses should override this method to handle incoming messages.
468 It is important to remember that this method is called in the thread
469 It is important to remember that this method is called in the thread
469 so that some logic must be done to ensure that the application leve
470 so that some logic must be done to ensure that the application leve
470 handlers are called in the application thread.
471 handlers are called in the application thread.
471 """
472 """
472 raise NotImplementedError('call_handlers must be defined in a subclass.')
473 raise NotImplementedError('call_handlers must be defined in a subclass.')
473
474
474 def input(self, string):
475 def input(self, string):
475 """Send a string of raw input to the kernel."""
476 """Send a string of raw input to the kernel."""
476 content = dict(value=string)
477 content = dict(value=string)
477 msg = self.session.msg('input_reply', content)
478 msg = self.session.msg('input_reply', content)
478 self._queue_reply(msg)
479 self._queue_reply(msg)
479
480
480 def _handle_events(self, socket, events):
481 def _handle_events(self, socket, events):
481 if events & POLLERR:
482 if events & POLLERR:
482 self._handle_err()
483 self._handle_err()
483 if events & POLLOUT:
484 if events & POLLOUT:
484 self._handle_send()
485 self._handle_send()
485 if events & POLLIN:
486 if events & POLLIN:
486 self._handle_recv()
487 self._handle_recv()
487
488
488 def _handle_recv(self):
489 def _handle_recv(self):
489 msg = self.socket.recv_json()
490 msg = self.socket.recv_json()
490 self.call_handlers(msg)
491 self.call_handlers(msg)
491
492
492 def _handle_send(self):
493 def _handle_send(self):
493 try:
494 try:
494 msg = self.msg_queue.get(False)
495 msg = self.msg_queue.get(False)
495 except Empty:
496 except Empty:
496 pass
497 pass
497 else:
498 else:
498 self.socket.send_json(msg)
499 self.socket.send_json(msg)
499 if self.msg_queue.empty():
500 if self.msg_queue.empty():
500 self.drop_io_state(POLLOUT)
501 self.drop_io_state(POLLOUT)
501
502
502 def _handle_err(self):
503 def _handle_err(self):
503 # We don't want to let this go silently, so eventually we should log.
504 # We don't want to let this go silently, so eventually we should log.
504 raise zmq.ZMQError()
505 raise zmq.ZMQError()
505
506
506 def _queue_reply(self, msg):
507 def _queue_reply(self, msg):
507 self.msg_queue.put(msg)
508 self.msg_queue.put(msg)
508 self.add_io_state(POLLOUT)
509 self.add_io_state(POLLOUT)
509
510
510
511
511 class HBSocketChannel(ZmqSocketChannel):
512 class HBSocketChannel(ZmqSocketChannel):
512 """The heartbeat channel which monitors the kernel heartbeat."""
513 """The heartbeat channel which monitors the kernel heartbeat."""
513
514
514 time_to_dead = 3.0
515 time_to_dead = 3.0
515 socket = None
516 socket = None
516 poller = None
517 poller = None
517
518
518 def __init__(self, context, session, address):
519 def __init__(self, context, session, address):
519 super(HBSocketChannel, self).__init__(context, session, address)
520 super(HBSocketChannel, self).__init__(context, session, address)
520 self._running = False
521 self._running = False
521
522
522 def _create_socket(self):
523 def _create_socket(self):
523 self.socket = self.context.socket(zmq.REQ)
524 self.socket = self.context.socket(zmq.REQ)
524 self.socket.setsockopt(zmq.IDENTITY, self.session.session)
525 self.socket.setsockopt(zmq.IDENTITY, self.session.session)
525 self.socket.connect('tcp://%s:%i' % self.address)
526 self.socket.connect('tcp://%s:%i' % self.address)
526 self.poller = zmq.Poller()
527 self.poller = zmq.Poller()
527 self.poller.register(self.socket, zmq.POLLIN)
528 self.poller.register(self.socket, zmq.POLLIN)
528
529
529 def run(self):
530 def run(self):
530 """The thread's main activity. Call start() instead."""
531 """The thread's main activity. Call start() instead."""
531 self._create_socket()
532 self._create_socket()
532 self._running = True
533 self._running = True
533 # Wait 2 seconds for the kernel to come up and the sockets to auto
534 # Wait 2 seconds for the kernel to come up and the sockets to auto
534 # connect. If we don't we will see the kernel as dead. Also, before
535 # connect. If we don't we will see the kernel as dead. Also, before
535 # the sockets are connected, the poller.poll line below is returning
536 # the sockets are connected, the poller.poll line below is returning
536 # too fast. This avoids that because the polling doesn't start until
537 # too fast. This avoids that because the polling doesn't start until
537 # after the sockets are connected.
538 # after the sockets are connected.
538 time.sleep(2.0)
539 time.sleep(2.0)
539 while self._running:
540 while self._running:
540 since_last_heartbeat = 0.0
541 since_last_heartbeat = 0.0
541 request_time = time.time()
542 request_time = time.time()
542 try:
543 try:
543 #io.rprint('Ping from HB channel') # dbg
544 #io.rprint('Ping from HB channel') # dbg
544 self.socket.send_json('ping')
545 self.socket.send_json('ping')
545 except zmq.ZMQError, e:
546 except zmq.ZMQError, e:
546 #io.rprint('*** HB Error:', e) # dbg
547 #io.rprint('*** HB Error:', e) # dbg
547 if e.errno == zmq.EFSM:
548 if e.errno == zmq.EFSM:
548 #io.rprint('sleep...', self.time_to_dead) # dbg
549 #io.rprint('sleep...', self.time_to_dead) # dbg
549 time.sleep(self.time_to_dead)
550 time.sleep(self.time_to_dead)
550 self._create_socket()
551 self._create_socket()
551 else:
552 else:
552 raise
553 raise
553 else:
554 else:
554 while True:
555 while True:
555 try:
556 try:
556 self.socket.recv_json(zmq.NOBLOCK)
557 self.socket.recv_json(zmq.NOBLOCK)
557 except zmq.ZMQError, e:
558 except zmq.ZMQError, e:
558 #io.rprint('*** HB Error 2:', e) # dbg
559 #io.rprint('*** HB Error 2:', e) # dbg
559 if e.errno == zmq.EAGAIN:
560 if e.errno == zmq.EAGAIN:
560 before_poll = time.time()
561 before_poll = time.time()
561 until_dead = self.time_to_dead - (before_poll -
562 until_dead = self.time_to_dead - (before_poll -
562 request_time)
563 request_time)
563
564
564 # When the return value of poll() is an empty list,
565 # When the return value of poll() is an empty list,
565 # that is when things have gone wrong (zeromq bug).
566 # that is when things have gone wrong (zeromq bug).
566 # As long as it is not an empty list, poll is
567 # As long as it is not an empty list, poll is
567 # working correctly even if it returns quickly.
568 # working correctly even if it returns quickly.
568 # Note: poll timeout is in milliseconds.
569 # Note: poll timeout is in milliseconds.
569 self.poller.poll(1000*until_dead)
570 self.poller.poll(1000*until_dead)
570
571
571 since_last_heartbeat = time.time() - request_time
572 since_last_heartbeat = time.time() - request_time
572 if since_last_heartbeat > self.time_to_dead:
573 if since_last_heartbeat > self.time_to_dead:
573 self.call_handlers(since_last_heartbeat)
574 self.call_handlers(since_last_heartbeat)
574 break
575 break
575 else:
576 else:
576 # FIXME: We should probably log this instead.
577 # FIXME: We should probably log this instead.
577 raise
578 raise
578 else:
579 else:
579 until_dead = self.time_to_dead - (time.time() -
580 until_dead = self.time_to_dead - (time.time() -
580 request_time)
581 request_time)
581 if until_dead > 0.0:
582 if until_dead > 0.0:
582 #io.rprint('sleep...', self.time_to_dead) # dbg
583 #io.rprint('sleep...', self.time_to_dead) # dbg
583 time.sleep(until_dead)
584 time.sleep(until_dead)
584 break
585 break
585
586
586 def stop(self):
587 def stop(self):
587 self._running = False
588 self._running = False
588 super(HBSocketChannel, self).stop()
589 super(HBSocketChannel, self).stop()
589
590
590 def call_handlers(self, since_last_heartbeat):
591 def call_handlers(self, since_last_heartbeat):
591 """This method is called in the ioloop thread when a message arrives.
592 """This method is called in the ioloop thread when a message arrives.
592
593
593 Subclasses should override this method to handle incoming messages.
594 Subclasses should override this method to handle incoming messages.
594 It is important to remember that this method is called in the thread
595 It is important to remember that this method is called in the thread
595 so that some logic must be done to ensure that the application leve
596 so that some logic must be done to ensure that the application leve
596 handlers are called in the application thread.
597 handlers are called in the application thread.
597 """
598 """
598 raise NotImplementedError('call_handlers must be defined in a subclass.')
599 raise NotImplementedError('call_handlers must be defined in a subclass.')
599
600
600
601
601 #-----------------------------------------------------------------------------
602 #-----------------------------------------------------------------------------
602 # Main kernel manager class
603 # Main kernel manager class
603 #-----------------------------------------------------------------------------
604 #-----------------------------------------------------------------------------
604
605
605 class KernelManager(HasTraits):
606 class KernelManager(HasTraits):
606 """ Manages a kernel for a frontend.
607 """ Manages a kernel for a frontend.
607
608
608 The SUB channel is for the frontend to receive messages published by the
609 The SUB channel is for the frontend to receive messages published by the
609 kernel.
610 kernel.
610
611
611 The REQ channel is for the frontend to make requests of the kernel.
612 The REQ channel is for the frontend to make requests of the kernel.
612
613
613 The REP channel is for the kernel to request stdin (raw_input) from the
614 The REP channel is for the kernel to request stdin (raw_input) from the
614 frontend.
615 frontend.
615 """
616 """
616 # The PyZMQ Context to use for communication with the kernel.
617 # The PyZMQ Context to use for communication with the kernel.
617 context = Instance(zmq.Context,(),{})
618 context = Instance(zmq.Context,(),{})
618
619
619 # The Session to use for communication with the kernel.
620 # The Session to use for communication with the kernel.
620 session = Instance(Session,(),{})
621 session = Instance(Session,(),{})
621
622
622 # The kernel process with which the KernelManager is communicating.
623 # The kernel process with which the KernelManager is communicating.
623 kernel = Instance(Popen)
624 kernel = Instance(Popen)
624
625
625 # The addresses for the communication channels.
626 # The addresses for the communication channels.
626 xreq_address = TCPAddress((LOCALHOST, 0))
627 xreq_address = TCPAddress((LOCALHOST, 0))
627 sub_address = TCPAddress((LOCALHOST, 0))
628 sub_address = TCPAddress((LOCALHOST, 0))
628 rep_address = TCPAddress((LOCALHOST, 0))
629 rep_address = TCPAddress((LOCALHOST, 0))
629 hb_address = TCPAddress((LOCALHOST, 0))
630 hb_address = TCPAddress((LOCALHOST, 0))
630
631
631 # The classes to use for the various channels.
632 # The classes to use for the various channels.
632 xreq_channel_class = Type(XReqSocketChannel)
633 xreq_channel_class = Type(XReqSocketChannel)
633 sub_channel_class = Type(SubSocketChannel)
634 sub_channel_class = Type(SubSocketChannel)
634 rep_channel_class = Type(RepSocketChannel)
635 rep_channel_class = Type(RepSocketChannel)
635 hb_channel_class = Type(HBSocketChannel)
636 hb_channel_class = Type(HBSocketChannel)
636
637
637 # Protected traits.
638 # Protected traits.
638 _launch_args = Any
639 _launch_args = Any
639 _xreq_channel = Any
640 _xreq_channel = Any
640 _sub_channel = Any
641 _sub_channel = Any
641 _rep_channel = Any
642 _rep_channel = Any
642 _hb_channel = Any
643 _hb_channel = Any
643
644
644 #--------------------------------------------------------------------------
645 #--------------------------------------------------------------------------
645 # Channel management methods:
646 # Channel management methods:
646 #--------------------------------------------------------------------------
647 #--------------------------------------------------------------------------
647
648
648 def start_channels(self, xreq=True, sub=True, rep=True, hb=True):
649 def start_channels(self, xreq=True, sub=True, rep=True, hb=True):
649 """Starts the channels for this kernel.
650 """Starts the channels for this kernel.
650
651
651 This will create the channels if they do not exist and then start
652 This will create the channels if they do not exist and then start
652 them. If port numbers of 0 are being used (random ports) then you
653 them. If port numbers of 0 are being used (random ports) then you
653 must first call :method:`start_kernel`. If the channels have been
654 must first call :method:`start_kernel`. If the channels have been
654 stopped and you call this, :class:`RuntimeError` will be raised.
655 stopped and you call this, :class:`RuntimeError` will be raised.
655 """
656 """
656 if xreq:
657 if xreq:
657 self.xreq_channel.start()
658 self.xreq_channel.start()
658 if sub:
659 if sub:
659 self.sub_channel.start()
660 self.sub_channel.start()
660 if rep:
661 if rep:
661 self.rep_channel.start()
662 self.rep_channel.start()
662 if hb:
663 if hb:
663 self.hb_channel.start()
664 self.hb_channel.start()
664
665
665 def stop_channels(self):
666 def stop_channels(self):
666 """Stops all the running channels for this kernel.
667 """Stops all the running channels for this kernel.
667 """
668 """
668 if self.xreq_channel.is_alive():
669 if self.xreq_channel.is_alive():
669 self.xreq_channel.stop()
670 self.xreq_channel.stop()
670 if self.sub_channel.is_alive():
671 if self.sub_channel.is_alive():
671 self.sub_channel.stop()
672 self.sub_channel.stop()
672 if self.rep_channel.is_alive():
673 if self.rep_channel.is_alive():
673 self.rep_channel.stop()
674 self.rep_channel.stop()
674 if self.hb_channel.is_alive():
675 if self.hb_channel.is_alive():
675 self.hb_channel.stop()
676 self.hb_channel.stop()
676
677
677 @property
678 @property
678 def channels_running(self):
679 def channels_running(self):
679 """Are any of the channels created and running?"""
680 """Are any of the channels created and running?"""
680 return self.xreq_channel.is_alive() \
681 return self.xreq_channel.is_alive() \
681 or self.sub_channel.is_alive() \
682 or self.sub_channel.is_alive() \
682 or self.rep_channel.is_alive() \
683 or self.rep_channel.is_alive() \
683 or self.hb_channel.is_alive()
684 or self.hb_channel.is_alive()
684
685
685 #--------------------------------------------------------------------------
686 #--------------------------------------------------------------------------
686 # Kernel process management methods:
687 # Kernel process management methods:
687 #--------------------------------------------------------------------------
688 #--------------------------------------------------------------------------
688
689
689 def start_kernel(self, **kw):
690 def start_kernel(self, **kw):
690 """Starts a kernel process and configures the manager to use it.
691 """Starts a kernel process and configures the manager to use it.
691
692
692 If random ports (port=0) are being used, this method must be called
693 If random ports (port=0) are being used, this method must be called
693 before the channels are created.
694 before the channels are created.
694
695
695 Parameters:
696 Parameters:
696 -----------
697 -----------
697 ipython : bool, optional (default True)
698 ipython : bool, optional (default True)
698 Whether to use an IPython kernel instead of a plain Python kernel.
699 Whether to use an IPython kernel instead of a plain Python kernel.
699 """
700 """
700 xreq, sub, rep, hb = self.xreq_address, self.sub_address, \
701 xreq, sub, rep, hb = self.xreq_address, self.sub_address, \
701 self.rep_address, self.hb_address
702 self.rep_address, self.hb_address
702 if xreq[0] != LOCALHOST or sub[0] != LOCALHOST or \
703 if xreq[0] != LOCALHOST or sub[0] != LOCALHOST or \
703 rep[0] != LOCALHOST or hb[0] != LOCALHOST:
704 rep[0] != LOCALHOST or hb[0] != LOCALHOST:
704 raise RuntimeError("Can only launch a kernel on localhost."
705 raise RuntimeError("Can only launch a kernel on localhost."
705 "Make sure that the '*_address' attributes are "
706 "Make sure that the '*_address' attributes are "
706 "configured properly.")
707 "configured properly.")
707
708
708 self._launch_args = kw.copy()
709 self._launch_args = kw.copy()
709 if kw.pop('ipython', True):
710 if kw.pop('ipython', True):
710 from ipkernel import launch_kernel
711 from ipkernel import launch_kernel
711 else:
712 else:
712 from pykernel import launch_kernel
713 from pykernel import launch_kernel
713 self.kernel, xrep, pub, req, hb = launch_kernel(
714 self.kernel, xrep, pub, req, hb = launch_kernel(
714 xrep_port=xreq[1], pub_port=sub[1],
715 xrep_port=xreq[1], pub_port=sub[1],
715 req_port=rep[1], hb_port=hb[1], **kw)
716 req_port=rep[1], hb_port=hb[1], **kw)
716 self.xreq_address = (LOCALHOST, xrep)
717 self.xreq_address = (LOCALHOST, xrep)
717 self.sub_address = (LOCALHOST, pub)
718 self.sub_address = (LOCALHOST, pub)
718 self.rep_address = (LOCALHOST, req)
719 self.rep_address = (LOCALHOST, req)
719 self.hb_address = (LOCALHOST, hb)
720 self.hb_address = (LOCALHOST, hb)
720
721
721 def shutdown_kernel(self):
722 def shutdown_kernel(self):
722 """ Attempts to the stop the kernel process cleanly. If the kernel
723 """ Attempts to the stop the kernel process cleanly. If the kernel
723 cannot be stopped, it is killed, if possible.
724 cannot be stopped, it is killed, if possible.
724 """
725 """
726 # FIXME: Shutdown does not work on Windows due to ZMQ errors!
727 if sys.platform == 'win32':
728 self.kill_kernel()
729 return
730
725 self.xreq_channel.shutdown()
731 self.xreq_channel.shutdown()
726 # Don't send any additional kernel kill messages immediately, to give
732 # Don't send any additional kernel kill messages immediately, to give
727 # the kernel a chance to properly execute shutdown actions. Wait for at
733 # the kernel a chance to properly execute shutdown actions. Wait for at
728 # most 1s, checking every 0.1s.
734 # most 1s, checking every 0.1s.
729 for i in range(10):
735 for i in range(10):
730 if self.is_alive:
736 if self.is_alive:
731 time.sleep(0.1)
737 time.sleep(0.1)
732 else:
738 else:
733 break
739 break
734 else:
740 else:
735 # OK, we've waited long enough.
741 # OK, we've waited long enough.
736 if self.has_kernel:
742 if self.has_kernel:
737 self.kill_kernel()
743 self.kill_kernel()
738
744
739 def restart_kernel(self, instant_death=False):
745 def restart_kernel(self, instant_death=False):
740 """Restarts a kernel with the same arguments that were used to launch
746 """Restarts a kernel with the same arguments that were used to launch
741 it. If the old kernel was launched with random ports, the same ports
747 it. If the old kernel was launched with random ports, the same ports
742 will be used for the new kernel.
748 will be used for the new kernel.
743
749
744 Parameters
750 Parameters
745 ----------
751 ----------
746 instant_death : bool, optional
752 instant_death : bool, optional
747 If True, the kernel is forcefully restarted *immediately*, without
753 If True, the kernel is forcefully restarted *immediately*, without
748 having a chance to do any cleanup action. Otherwise the kernel is
754 having a chance to do any cleanup action. Otherwise the kernel is
749 given 1s to clean up before a forceful restart is issued.
755 given 1s to clean up before a forceful restart is issued.
750
756
751 In all cases the kernel is restarted, the only difference is whether
757 In all cases the kernel is restarted, the only difference is whether
752 it is given a chance to perform a clean shutdown or not.
758 it is given a chance to perform a clean shutdown or not.
753 """
759 """
754 if self._launch_args is None:
760 if self._launch_args is None:
755 raise RuntimeError("Cannot restart the kernel. "
761 raise RuntimeError("Cannot restart the kernel. "
756 "No previous call to 'start_kernel'.")
762 "No previous call to 'start_kernel'.")
757 else:
763 else:
758 if self.has_kernel:
764 if self.has_kernel:
759 if instant_death:
765 if instant_death:
760 self.kill_kernel()
766 self.kill_kernel()
761 else:
767 else:
762 self.shutdown_kernel()
768 self.shutdown_kernel()
763 self.start_kernel(**self._launch_args)
769 self.start_kernel(**self._launch_args)
764
770
771 # FIXME: Messages get dropped in Windows due to probable ZMQ bug
772 # unless there is some delay here.
773 if sys.platform == 'win32':
774 time.sleep(0.2)
775
765 @property
776 @property
766 def has_kernel(self):
777 def has_kernel(self):
767 """Returns whether a kernel process has been specified for the kernel
778 """Returns whether a kernel process has been specified for the kernel
768 manager.
779 manager.
769 """
780 """
770 return self.kernel is not None
781 return self.kernel is not None
771
782
772 def kill_kernel(self):
783 def kill_kernel(self):
773 """ Kill the running kernel. """
784 """ Kill the running kernel. """
774 if self.kernel is not None:
785 if self.kernel is not None:
775 self.kernel.kill()
786 self.kernel.kill()
776 self.kernel = None
787 self.kernel = None
777 else:
788 else:
778 raise RuntimeError("Cannot kill kernel. No kernel is running!")
789 raise RuntimeError("Cannot kill kernel. No kernel is running!")
779
790
780 def signal_kernel(self, signum):
791 def signal_kernel(self, signum):
781 """ Sends a signal to the kernel. """
792 """ Sends a signal to the kernel. """
782 if self.kernel is not None:
793 if self.kernel is not None:
783 self.kernel.send_signal(signum)
794 self.kernel.send_signal(signum)
784 else:
795 else:
785 raise RuntimeError("Cannot signal kernel. No kernel is running!")
796 raise RuntimeError("Cannot signal kernel. No kernel is running!")
786
797
787 @property
798 @property
788 def is_alive(self):
799 def is_alive(self):
789 """Is the kernel process still running?"""
800 """Is the kernel process still running?"""
790 # FIXME: not using a heartbeat means this method is broken for any
801 # FIXME: not using a heartbeat means this method is broken for any
791 # remote kernel, it's only capable of handling local kernels.
802 # remote kernel, it's only capable of handling local kernels.
792 if self.kernel is not None:
803 if self.kernel is not None:
793 if self.kernel.poll() is None:
804 if self.kernel.poll() is None:
794 return True
805 return True
795 else:
806 else:
796 return False
807 return False
797 else:
808 else:
798 # We didn't start the kernel with this KernelManager so we don't
809 # We didn't start the kernel with this KernelManager so we don't
799 # know if it is running. We should use a heartbeat for this case.
810 # know if it is running. We should use a heartbeat for this case.
800 return True
811 return True
801
812
802 #--------------------------------------------------------------------------
813 #--------------------------------------------------------------------------
803 # Channels used for communication with the kernel:
814 # Channels used for communication with the kernel:
804 #--------------------------------------------------------------------------
815 #--------------------------------------------------------------------------
805
816
806 @property
817 @property
807 def xreq_channel(self):
818 def xreq_channel(self):
808 """Get the REQ socket channel object to make requests of the kernel."""
819 """Get the REQ socket channel object to make requests of the kernel."""
809 if self._xreq_channel is None:
820 if self._xreq_channel is None:
810 self._xreq_channel = self.xreq_channel_class(self.context,
821 self._xreq_channel = self.xreq_channel_class(self.context,
811 self.session,
822 self.session,
812 self.xreq_address)
823 self.xreq_address)
813 return self._xreq_channel
824 return self._xreq_channel
814
825
815 @property
826 @property
816 def sub_channel(self):
827 def sub_channel(self):
817 """Get the SUB socket channel object."""
828 """Get the SUB socket channel object."""
818 if self._sub_channel is None:
829 if self._sub_channel is None:
819 self._sub_channel = self.sub_channel_class(self.context,
830 self._sub_channel = self.sub_channel_class(self.context,
820 self.session,
831 self.session,
821 self.sub_address)
832 self.sub_address)
822 return self._sub_channel
833 return self._sub_channel
823
834
824 @property
835 @property
825 def rep_channel(self):
836 def rep_channel(self):
826 """Get the REP socket channel object to handle stdin (raw_input)."""
837 """Get the REP socket channel object to handle stdin (raw_input)."""
827 if self._rep_channel is None:
838 if self._rep_channel is None:
828 self._rep_channel = self.rep_channel_class(self.context,
839 self._rep_channel = self.rep_channel_class(self.context,
829 self.session,
840 self.session,
830 self.rep_address)
841 self.rep_address)
831 return self._rep_channel
842 return self._rep_channel
832
843
833 @property
844 @property
834 def hb_channel(self):
845 def hb_channel(self):
835 """Get the REP socket channel object to handle stdin (raw_input)."""
846 """Get the REP socket channel object to handle stdin (raw_input)."""
836 if self._hb_channel is None:
847 if self._hb_channel is None:
837 self._hb_channel = self.hb_channel_class(self.context,
848 self._hb_channel = self.hb_channel_class(self.context,
838 self.session,
849 self.session,
839 self.hb_address)
850 self.hb_address)
840 return self._hb_channel
851 return self._hb_channel
General Comments 0
You need to be logged in to leave comments. Login now