##// END OF EJS Templates
Fixes to the heartbeat channel...
MinRK -
Show More
@@ -1,152 +1,154 b''
1 1 """Implement a fully blocking kernel manager.
2 2
3 3 Useful for test suites and blocking terminal interfaces.
4 4 """
5 5 #-----------------------------------------------------------------------------
6 6 # Copyright (C) 2010-2011 The IPython Development Team
7 7 #
8 8 # Distributed under the terms of the BSD License. The full license is in
9 9 # the file COPYING.txt, distributed as part of this software.
10 10 #-----------------------------------------------------------------------------
11 11
12 12 #-----------------------------------------------------------------------------
13 13 # Imports
14 14 #-----------------------------------------------------------------------------
15 15 from __future__ import print_function
16 16
17 17 # Stdlib
18 18 from Queue import Queue, Empty
19 19 from threading import Event
20 20
21 21 # Our own
22 22 from IPython.utils import io
23 23 from IPython.utils.traitlets import Type
24 24
25 25 from .kernelmanager import (KernelManager, SubSocketChannel, HBSocketChannel,
26 26 ShellSocketChannel, StdInSocketChannel)
27 27
28 28 #-----------------------------------------------------------------------------
29 29 # Functions and classes
30 30 #-----------------------------------------------------------------------------
31 31
class BlockingSubSocketChannel(SubSocketChannel):
    """SUB channel that parks every received message in a thread-safe queue.

    ``call_handlers`` enqueues each message it is given; consumers pull the
    messages back out with :meth:`get_msg` or :meth:`get_msgs`.
    """

    def __init__(self, context, session, address=None):
        super(BlockingSubSocketChannel, self).__init__(context, session,
                                                       address)
        # Messages waiting to be consumed.
        self._in_queue = Queue()

    def call_handlers(self, msg):
        """Enqueue ``msg`` until a consumer asks for it."""
        #io.rprint('[[Sub]]', msg) # dbg
        self._in_queue.put(msg)

    def msg_ready(self):
        """Return True when at least one message is waiting in the queue."""
        return self._in_queue.qsize() != 0

    def get_msg(self, block=True, timeout=None):
        """Return the next queued message.

        Parameters
        ----------
        block : bool, optional
            Whether to wait for a message when the queue is empty.
        timeout : float, optional
            Seconds to wait when blocking.

        Raises
        ------
        Queue.Empty
            If no message becomes available.
        """
        # A blocking get with timeout=None cannot be interrupted, so an
        # (effectively infinite) finite timeout is substituted.
        wait = 1e6 if (block and timeout is None) else timeout
        return self._in_queue.get(block, wait)

    def get_msgs(self):
        """Return a list of all messages that are ready right now."""
        ready = []
        try:
            while True:
                ready.append(self.get_msg(block=False))
        except Empty:
            # queue drained
            pass
        return ready
67 67
68 68
class BlockingShellSocketChannel(ShellSocketChannel):
    """Shell channel whose replies are collected in a thread-safe queue.

    Incoming messages are enqueued by ``call_handlers`` and retrieved by the
    caller with :meth:`get_msg` / :meth:`get_msgs`.
    """

    def __init__(self, context, session, address=None):
        super(BlockingShellSocketChannel, self).__init__(context, session,
                                                         address)
        # Replies that have arrived but have not been consumed yet.
        self._in_queue = Queue()

    def call_handlers(self, msg):
        """Enqueue an incoming reply for later retrieval."""
        #io.rprint('[[Shell]]', msg) # dbg
        self._in_queue.put(msg)

    def msg_ready(self):
        """Return True if a received message is waiting to be read."""
        return self._in_queue.qsize() != 0

    def get_msg(self, block=True, timeout=None):
        """Return the next queued message.

        ``block`` and ``timeout`` are forwarded to ``Queue.get``;
        ``Queue.Empty`` is raised when nothing is available.
        """
        if block and timeout is None:
            # never hand timeout=None to a blocking get: it would become
            # uninterruptible
            return self._in_queue.get(block, 1e6)
        return self._in_queue.get(block, timeout)

    def get_msgs(self):
        """Return every message that is currently ready, oldest first."""
        collected = []
        try:
            while True:
                collected.append(self.get_msg(block=False))
        except Empty:
            # nothing left in the queue
            pass
        return collected
104 104
105 105
class BlockingStdInSocketChannel(StdInSocketChannel):
    """Stdin channel that queues the kernel's input requests for the caller.

    Incoming messages are enqueued by ``call_handlers`` and retrieved with
    :meth:`get_msg` / :meth:`get_msgs`.
    """

    def __init__(self, context, session, address=None):
        super(BlockingStdInSocketChannel, self).__init__(context, session, address)
        # Messages that have arrived but have not been consumed yet.
        self._in_queue = Queue()

    def call_handlers(self, msg):
        """Enqueue an incoming message for later retrieval."""
        #io.rprint('[[Rep]]', msg) # dbg
        self._in_queue.put(msg)

    def get_msg(self, block=True, timeout=None):
        """Get a message if there is one that is ready.

        Parameters
        ----------
        block : bool, optional
            Whether to wait for a message when the queue is empty.
        timeout : float, optional
            Seconds to wait when blocking.

        Raises
        ------
        Queue.Empty
            If no message becomes available.
        """
        if block and timeout is None:
            # never use timeout=None, because get becomes uninterruptible
            # (same guard as the blocking shell and sub channels)
            timeout = 1e6
        return self._in_queue.get(block, timeout)

    def get_msgs(self):
        """Get all messages that are currently ready."""
        msgs = []
        while True:
            try:
                msgs.append(self.get_msg(block=False))
            except Empty:
                break
        return msgs

    def msg_ready(self):
        """Is there a message that has been received?"""
        return not self._in_queue.empty()
133 133
134 134
class BlockingHBSocketChannel(HBSocketChannel):
    """Heartbeat channel for the blocking manager; missed beats are ignored."""

    # This kernel needs quicker monitoring, shorten to 1 sec.
    # less than 0.5s is unreliable, and will get occasional
    # false reports of missed beats.
    time_to_dead = 1.

    def call_handlers(self, since_last_heartbeat):
        """Do nothing when a heartbeat is missed.

        Parameters
        ----------
        since_last_heartbeat : float
            Seconds elapsed since the unanswered ping was sent.
        """
        pass
143 145
144 146
class BlockingKernelManager(KernelManager):
    """Kernel manager wired up to the blocking channel implementations."""

    # Channel classes used in place of the KernelManager defaults.
    shell_channel_class = Type(BlockingShellSocketChannel)
    sub_channel_class = Type(BlockingSubSocketChannel)
    stdin_channel_class = Type(BlockingStdInSocketChannel)
    hb_channel_class = Type(BlockingHBSocketChannel)
152 154
@@ -1,954 +1,962 b''
1 1 """Base classes to manage the interaction with a running kernel.
2 2
3 3 TODO
4 4 * Create logger to handle debugging and console messages.
5 5 """
6 6
7 7 #-----------------------------------------------------------------------------
8 8 # Copyright (C) 2008-2011 The IPython Development Team
9 9 #
10 10 # Distributed under the terms of the BSD License. The full license is in
11 11 # the file COPYING, distributed as part of this software.
12 12 #-----------------------------------------------------------------------------
13 13
14 14 #-----------------------------------------------------------------------------
15 15 # Imports
16 16 #-----------------------------------------------------------------------------
17 17
18 18 # Standard library imports.
19 19 import errno
20 20 import json
21 21 from subprocess import Popen
22 22 import os
23 23 import signal
24 24 import sys
25 25 from threading import Thread
26 26 import time
27 27
28 28 # System library imports.
29 29 import zmq
30 30 from zmq.eventloop import ioloop, zmqstream
31 31
32 32 # Local imports.
33 33 from IPython.config.loader import Config
34 34 from IPython.utils.localinterfaces import LOCALHOST, LOCAL_IPS
35 35 from IPython.utils.traitlets import (
36 36 HasTraits, Any, Instance, Type, Unicode, Integer, Bool
37 37 )
38 38 from IPython.utils.py3compat import str_to_bytes
39 39 from IPython.zmq.entry_point import write_connection_file
40 40 from session import Session
41 41
42 42 #-----------------------------------------------------------------------------
43 43 # Constants and exceptions
44 44 #-----------------------------------------------------------------------------
45 45
class InvalidPortNumber(Exception):
    """Raised when a channel is given an unusable port number (port 0)."""
48 48
49 49 #-----------------------------------------------------------------------------
50 50 # Utility functions
51 51 #-----------------------------------------------------------------------------
52 52
53 53 # some utilities to validate message structure, these might get moved elsewhere
54 54 # if they prove to have more generic utility
55 55
def validate_string_list(lst):
    """Validate that the input is a list of strings.

    Parameters
    ----------
    lst : object
        The value to check.

    Raises
    ------
    ValueError
        If ``lst`` is not a list, or if any element is not a string.
    """
    if not isinstance(lst, list):
        # Wrap in a 1-tuple: a bare tuple on the right of ``%`` would be
        # unpacked as the format arguments instead of being shown as a value.
        raise ValueError('input %r must be a list' % (lst,))
    for x in lst:
        if not isinstance(x, basestring):
            raise ValueError('element %r in list must be a string' % (x,))
65 65
66 66
def validate_string_dict(dct):
    """Validate that the input is a dict with string keys and values.

    Parameters
    ----------
    dct : object
        The value to check.

    Raises
    ------
    ValueError
        If ``dct`` is not a dict, or if any key or value is not a string.
    """
    if not isinstance(dct, dict):
        # Mirrors the list check in validate_string_list, so a wrong
        # container type gives the documented ValueError.
        raise ValueError('input %r must be a dict' % (dct,))
    for k, v in dct.items():
        # 1-tuples keep ``%`` from unpacking tuple keys/values as arguments.
        if not isinstance(k, basestring):
            raise ValueError('key %r in dict must be a string' % (k,))
        if not isinstance(v, basestring):
            raise ValueError('value %r in dict must be a string' % (v,))
76 76
77 77
78 78 #-----------------------------------------------------------------------------
79 79 # ZMQ Socket Channel classes
80 80 #-----------------------------------------------------------------------------
81 81
class ZMQSocketChannel(Thread):
    """Common base for the channel threads that talk over ZMQ sockets.

    Concrete channels set ``socket``, ``ioloop`` and ``stream`` themselves
    and implement ``call_handlers``.
    """
    context = None
    session = None
    socket = None
    ioloop = None
    stream = None
    _address = None

    def __init__(self, context, session, address):
        """Create a channel.

        Parameters
        ----------
        context : :class:`zmq.Context`
            The ZMQ context to use.
        session : :class:`session.Session`
            The session to use.
        address : tuple
            Standard (ip, port) tuple that the kernel is listening on.

        Raises
        ------
        InvalidPortNumber
            If the port in ``address`` is 0.
        """
        super(ZMQSocketChannel, self).__init__()
        # The thread is marked as a daemon.
        self.daemon = True

        self.context = context
        self.session = session
        port = address[1]
        if port == 0:
            raise InvalidPortNumber('The port number for a channel cannot be 0.')
        self._address = address

    def _run_loop(self):
        """Run ``self.ioloop`` until it returns, restarting it on EINTR."""
        while True:
            try:
                self.ioloop.start()
            except zmq.ZMQError as e:
                if e.errno != errno.EINTR:
                    raise
                # interrupted system call: just start the loop again
            else:
                return

    def stop(self):
        """Stop the channel's activity.

        This calls :method:`Thread.join` and returns when the thread
        terminates. :class:`RuntimeError` will be raised if
        :method:`self.start` is called again.
        """
        self.join()

    @property
    def address(self):
        """The (ip, port) tuple this channel was created with."""
        return self._address

    def _queue_send(self, msg):
        """Schedule ``msg`` to be sent from the IOLoop's thread.

        Parameters
        ----------
        msg : message to send

        The send is handed to the loop through ``IOLoop.add_callback`` rather
        than performed in the calling thread.
        """
        self.ioloop.add_callback(lambda: self.session.send(self.stream, msg))

    def _handle_recv(self, msg):
        """Callback for ``stream.on_recv``.

        Splits the identities off the raw message, unserializes the rest and
        passes the result to ``call_handlers``.
        """
        _idents, payload = self.session.feed_identities(msg)
        unpacked = self.session.unserialize(payload)
        self.call_handlers(unpacked)
167 167
168 168
class ShellSocketChannel(ZMQSocketChannel):
    """The DEALER (XREQ) channel for issuing requests to the kernel.

    Each request method builds a message, queues it for sending from the
    channel's IOLoop and returns the message id.
    """

    command_queue = None
    # flag for whether execute requests should be allowed to call raw_input:
    allow_stdin = True

    def __init__(self, context, session, address):
        super(ShellSocketChannel, self).__init__(context, session, address)
        self.ioloop = ioloop.IOLoop()

    def run(self):
        """The thread's main activity.  Call start() instead."""
        self.socket = self.context.socket(zmq.DEALER)
        self.socket.setsockopt(zmq.IDENTITY, self.session.bsession)
        self.socket.connect('tcp://%s:%i' % self.address)
        self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
        self.stream.on_recv(self._handle_recv)
        self._run_loop()

    def stop(self):
        """Stop the IOLoop, then wait for the channel thread to finish."""
        self.ioloop.stop()
        super(ShellSocketChannel, self).stop()

    def call_handlers(self, msg):
        """This method is called in the ioloop thread when a message arrives.

        Subclasses should override this method to handle incoming messages.
        It is important to remember that this method is called in the thread
        so that some logic must be done to ensure that the application level
        handlers are called in the application thread.
        """
        raise NotImplementedError('call_handlers must be defined in a subclass.')

    def execute(self, code, silent=False,
                user_variables=None, user_expressions=None, allow_stdin=None):
        """Execute code in the kernel.

        Parameters
        ----------
        code : str
            A string of Python code.

        silent : bool, optional (default False)
            If set, the kernel will execute the code as quietly possible.

        user_variables : list, optional
            A list of variable names to pull from the user's namespace.  They
            will come back as a dict with these names as keys and their
            :func:`repr` as values.

        user_expressions : dict, optional
            A dict with string keys and string values to pull from the user's
            namespace.  They will come back as a dict with these names as
            keys and their :func:`repr` as values.

        allow_stdin : bool, optional
            Flag for whether this request is allowed to call raw_input.
            When omitted, the channel's ``allow_stdin`` attribute is used.

        Returns
        -------
        The msg_id of the message sent.

        Raises
        ------
        ValueError
            If ``code`` is not a string, or ``user_variables`` /
            ``user_expressions`` fail validation.
        """
        if user_variables is None:
            user_variables = []
        if user_expressions is None:
            user_expressions = {}
        if allow_stdin is None:
            allow_stdin = self.allow_stdin

        # Don't waste network traffic if inputs are invalid
        if not isinstance(code, basestring):
            # 1-tuple so that a tuple ``code`` is reported, not unpacked as
            # the format arguments (which raised TypeError instead).
            raise ValueError('code %r must be a string' % (code,))
        validate_string_list(user_variables)
        validate_string_dict(user_expressions)

        # Create class for content/msg creation. Related to, but possibly
        # not in Session.
        content = dict(code=code, silent=silent,
                       user_variables=user_variables,
                       user_expressions=user_expressions,
                       allow_stdin=allow_stdin,
                       )
        msg = self.session.msg('execute_request', content)
        self._queue_send(msg)
        return msg['header']['msg_id']

    def complete(self, text, line, cursor_pos, block=None):
        """Tab complete text in the kernel's namespace.

        Parameters
        ----------
        text : str
            The text to complete.
        line : str
            The full line of text that is the surrounding context for the
            text to complete.
        cursor_pos : int
            The position of the cursor in the line where the completion was
            requested.
        block : str, optional
            The full block of code in which the completion is being requested.

        Returns
        -------
        The msg_id of the message sent.
        """
        content = dict(text=text, line=line, block=block, cursor_pos=cursor_pos)
        msg = self.session.msg('complete_request', content)
        self._queue_send(msg)
        return msg['header']['msg_id']

    def object_info(self, oname):
        """Get metadata information about an object.

        Parameters
        ----------
        oname : str
            A string specifying the object name.

        Returns
        -------
        The msg_id of the message sent.
        """
        content = dict(oname=oname)
        msg = self.session.msg('object_info_request', content)
        self._queue_send(msg)
        return msg['header']['msg_id']

    def history(self, raw=True, output=False, hist_access_type='range', **kwargs):
        """Get entries from the history list.

        Parameters
        ----------
        raw : bool
            If True, return the raw input.
        output : bool
            If True, then return the output as well.
        hist_access_type : str
            'range' (fill in session, start and stop params), 'tail' (fill in n)
             or 'search' (fill in pattern param).

        session : int
            For a range request, the session from which to get lines. Session
            numbers are positive integers; negative ones count back from the
            current session.
        start : int
            The first line number of a history range.
        stop : int
            The final (excluded) line number of a history range.

        n : int
            The number of lines of history to get for a tail request.

        pattern : str
            The glob-syntax pattern for a search request.

        Returns
        -------
        The msg_id of the message sent.
        """
        # Extra keyword arguments are passed through into the request content.
        content = dict(raw=raw, output=output, hist_access_type=hist_access_type,
                       **kwargs)
        msg = self.session.msg('history_request', content)
        self._queue_send(msg)
        return msg['header']['msg_id']

    def shutdown(self, restart=False):
        """Request an immediate kernel shutdown.

        Upon receipt of the (empty) reply, client code can safely assume that
        the kernel has shut down and it's safe to forcefully terminate it if
        it's still alive.

        The kernel will send the reply via a function registered with Python's
        atexit module, ensuring it's truly done as the kernel is done with all
        normal operation.

        Parameters
        ----------
        restart : bool, optional
            Sent to the kernel as the ``restart`` field of the request.

        Returns
        -------
        The msg_id of the message sent.
        """
        # Send quit message to kernel. Once we implement kernel-side setattr,
        # this should probably be done that way, but for now this will do.
        msg = self.session.msg('shutdown_request', {'restart':restart})
        self._queue_send(msg)
        return msg['header']['msg_id']
357 357
358 358
359 359
class SubSocketChannel(ZMQSocketChannel):
    """The SUB channel, which receives the messages the kernel publishes."""

    def __init__(self, context, session, address):
        super(SubSocketChannel, self).__init__(context, session, address)
        self.ioloop = ioloop.IOLoop()

    def run(self):
        """The thread's main activity.  Call start() instead."""
        sock = self.context.socket(zmq.SUB)
        self.socket = sock
        # an empty prefix subscribes to every published message
        sock.setsockopt(zmq.SUBSCRIBE,b'')
        sock.setsockopt(zmq.IDENTITY, self.session.bsession)
        sock.connect('tcp://%s:%i' % self.address)
        self.stream = zmqstream.ZMQStream(sock, self.ioloop)
        self.stream.on_recv(self._handle_recv)
        self._run_loop()

    def stop(self):
        """Stop the IOLoop, then wait for the channel thread to finish."""
        self.ioloop.stop()
        super(SubSocketChannel, self).stop()

    def call_handlers(self, msg):
        """Hook invoked in the ioloop thread for every message that arrives.

        Subclasses must override this.  Because it runs in the channel's
        thread, an implementation has to take care of getting application
        level handlers called in the application thread.
        """
        raise NotImplementedError('call_handlers must be defined in a subclass.')

    def flush(self, timeout=1.0):
        """Immediately processes all pending messages on the SUB channel.

        Callers should use this method to ensure that :method:`call_handlers`
        has been called for all messages that have been received on the
        0MQ SUB socket of this channel.

        The flush itself is scheduled onto the IOLoop with ``add_callback``;
        this method only waits for it to complete.

        Parameters
        ----------
        timeout : float, optional
            The maximum amount of time to spend flushing, in seconds.  The
            default is one second.
        """
        deadline = time.time() + timeout
        # We do the IOLoop callback process twice to ensure that the IOLoop
        # gets to perform at least one full poll.
        for _ in xrange(2):
            self._flushed = False
            self.ioloop.add_callback(self._flush)
            # wait for the callback to run, or for the deadline to pass
            while not self._flushed and time.time() < deadline:
                time.sleep(0.01)

    def _flush(self):
        """Callback for :method:`self.flush`."""
        self.stream.flush()
        self._flushed = True
420 420
421 421
class StdInSocketChannel(ZMQSocketChannel):
    """A reply channel to handle raw_input requests that the kernel makes."""

    msg_queue = None

    def __init__(self, context, session, address):
        super(StdInSocketChannel, self).__init__(context, session, address)
        self.ioloop = ioloop.IOLoop()

    def run(self):
        """The thread's main activity.  Call start() instead."""
        sock = self.context.socket(zmq.DEALER)
        self.socket = sock
        sock.setsockopt(zmq.IDENTITY, self.session.bsession)
        sock.connect('tcp://%s:%i' % self.address)
        self.stream = zmqstream.ZMQStream(sock, self.ioloop)
        self.stream.on_recv(self._handle_recv)
        self._run_loop()

    def stop(self):
        """Stop the IOLoop, then wait for the channel thread to finish."""
        self.ioloop.stop()
        super(StdInSocketChannel, self).stop()

    def call_handlers(self, msg):
        """Hook invoked in the ioloop thread for every message that arrives.

        Subclasses must override this.  Because it runs in the channel's
        thread, an implementation has to take care of getting application
        level handlers called in the application thread.
        """
        raise NotImplementedError('call_handlers must be defined in a subclass.')

    def input(self, string):
        """Send a string of raw input to the kernel."""
        reply = self.session.msg('input_reply', dict(value=string))
        self._queue_send(reply)
459 459
460 460
class HBSocketChannel(ZMQSocketChannel):
    """The heartbeat channel which monitors the kernel heartbeat.

    Note that the heartbeat channel is paused by default. As long as you start
    this channel, the kernel manager will ensure that it is paused and un-paused
    as appropriate.
    """

    # seconds to wait for a reply before a beat counts as missed
    time_to_dead = 3.0
    socket = None
    poller = None
    _running = None
    _pause = None
    _beating = None

    def __init__(self, context, session, address):
        super(HBSocketChannel, self).__init__(context, session, address)
        self._running = False
        self._pause = True
        # a single poller is kept for the channel's lifetime; sockets are
        # registered and unregistered in _create_socket
        self.poller = zmq.Poller()

    def _create_socket(self):
        """(Re)create the REQ socket and register it with the poller."""
        if self.socket is not None:
            # close previous socket, before opening a new one
            self.poller.unregister(self.socket)
            self.socket.close()
        self.socket = self.context.socket(zmq.REQ)
        self.socket.setsockopt(zmq.LINGER, 0)
        self.socket.connect('tcp://%s:%i' % self.address)

        self.poller.register(self.socket, zmq.POLLIN)

    def _poll(self, start_time):
        """poll for heartbeat replies until we reach self.time_to_dead

        Ignores interrupts, and returns the result of poll(), which
        will be an empty list if no messages arrived before the timeout,
        or the event tuple if there is a message to receive.

        Parameters
        ----------
        start_time : float
            ``time.time()`` value the ``time_to_dead`` window is measured from.
        """
        events = []
        while True:
            until_dead = self.time_to_dead - (time.time() - start_time)
            # ensure poll at least once
            until_dead = max(until_dead, 1e-3)
            try:
                # poll timeout is in milliseconds
                events = self.poller.poll(1000 * until_dead)
            except zmq.ZMQError as e:
                if e.errno != errno.EINTR:
                    raise
                # ignore interrupts during heartbeat (this may never actually
                # happen); loop to poll again for the remaining time
            else:
                break
        return events

    def run(self):
        """The thread's main activity.  Call start() instead."""
        self._create_socket()
        self._running = True
        self._beating = True

        while self._running:
            if self._pause:
                # just sleep, and skip the rest of the loop
                time.sleep(self.time_to_dead)
                continue

            # io.rprint('Ping from HB channel') # dbg
            # no need to catch EFSM here, because the previous event was
            # either a recv or connect, which cannot be followed by EFSM
            self.socket.send(b'ping')
            request_time = time.time()
            ready = self._poll(request_time)
            if ready:
                self._beating = True
                # the poll above guarantees we have something to recv
                self.socket.recv()
                # sleep the remainder of the cycle
                remainder = self.time_to_dead - (time.time() - request_time)
                if remainder > 0:
                    time.sleep(remainder)
            else:
                # nothing was received within the time limit, signal heart failure
                self._beating = False
                since_last_heartbeat = time.time() - request_time
                self.call_handlers(since_last_heartbeat)
                # and close/reopen the socket, because the REQ/REP cycle has been broken
                self._create_socket()

    def pause(self):
        """Pause the heartbeat."""
        self._pause = True

    def unpause(self):
        """Unpause the heartbeat."""
        self._pause = False

    def is_beating(self):
        """Is the heartbeat running and responsive (and not paused)."""
        if self.is_alive() and not self._pause and self._beating:
            return True
        else:
            return False

    def stop(self):
        """Ask the run loop to exit, then wait for the thread to finish."""
        self._running = False
        super(HBSocketChannel, self).stop()

    def call_handlers(self, since_last_heartbeat):
        """This method is called in the channel thread when a beat is missed.

        Subclasses should override this method to handle missed heartbeats.
        It is important to remember that this method is called in the thread
        so that some logic must be done to ensure that the application level
        handlers are called in the application thread.

        Parameters
        ----------
        since_last_heartbeat : float
            Seconds elapsed since the unanswered ping was sent.
        """
        raise NotImplementedError('call_handlers must be defined in a subclass.')
580 586
581 587
582 588 #-----------------------------------------------------------------------------
583 589 # Main kernel manager class
584 590 #-----------------------------------------------------------------------------
585 591
586 592 class KernelManager(HasTraits):
587 593 """ Manages a kernel for a frontend.
588 594
589 595 The SUB channel is for the frontend to receive messages published by the
590 596 kernel.
591 597
592 598 The REQ channel is for the frontend to make requests of the kernel.
593 599
594 600 The REP channel is for the kernel to request stdin (raw_input) from the
595 601 frontend.
596 602 """
597 603 # config object for passing to child configurables
598 604 config = Instance(Config)
599 605
600 606 # The PyZMQ Context to use for communication with the kernel.
601 607 context = Instance(zmq.Context)
def _context_default(self):
    """Return ``zmq.Context.instance()`` as the default for ``context``."""
    return zmq.Context.instance()
604 610
605 611 # The Session to use for communication with the kernel.
606 612 session = Instance(Session)
607 613
608 614 # The kernel process with which the KernelManager is communicating.
609 615 kernel = Instance(Popen)
610 616
611 617 # The addresses for the communication channels.
612 618 connection_file = Unicode('')
613 619 ip = Unicode(LOCALHOST)
def _ip_changed(self, name, old, new):
    """Replace the wildcard ip ``'*'`` with ``'0.0.0.0'``."""
    if new != '*':
        return
    self.ip = '0.0.0.0'
617 623 shell_port = Integer(0)
618 624 iopub_port = Integer(0)
619 625 stdin_port = Integer(0)
620 626 hb_port = Integer(0)
621 627
622 628 # The classes to use for the various channels.
623 629 shell_channel_class = Type(ShellSocketChannel)
624 630 sub_channel_class = Type(SubSocketChannel)
625 631 stdin_channel_class = Type(StdInSocketChannel)
626 632 hb_channel_class = Type(HBSocketChannel)
627 633
628 634 # Protected traits.
629 635 _launch_args = Any
630 636 _shell_channel = Any
631 637 _sub_channel = Any
632 638 _stdin_channel = Any
633 639 _hb_channel = Any
634 640 _connection_file_written=Bool(False)
635 641
def __init__(self, **kwargs):
    """Initialise the manager, creating a Session if none was supplied."""
    super(KernelManager, self).__init__(**kwargs)
    if self.session is not None:
        return
    # no session given: build one from this manager's config
    self.session = Session(config=self.config)
640 646
def __del__(self):
    """Clean up the connection file when the manager is garbage collected."""
    self.cleanup_connection_file()
643 649
644 650
645 651 #--------------------------------------------------------------------------
646 652 # Channel management methods:
647 653 #--------------------------------------------------------------------------
648 654
def start_channels(self, shell=True, sub=True, stdin=True, hb=True):
    """Starts the channels for this kernel.

    This will create the channels if they do not exist and then start
    them. If port numbers of 0 are being used (random ports) then you
    must first call :method:`start_kernel`. If the channels have been
    stopped and you call this, :class:`RuntimeError` will be raised.

    Parameters
    ----------
    shell, sub, stdin, hb : bool, optional
        Whether to start the corresponding channel.  ``stdin`` also sets
        ``allow_stdin`` on the shell channel.
    """
    if shell:
        self.shell_channel.start()
    if sub:
        self.sub_channel.start()
    if stdin:
        self.stdin_channel.start()
    # raw_input is only allowed when the stdin channel was started
    self.shell_channel.allow_stdin = True if stdin else False
    if hb:
        self.hb_channel.start()
668 674
def stop_channels(self):
    """Stops all the running channels for this kernel.

    Channels whose ``is_alive()`` is false are left alone.
    """
    for attr in ('shell_channel', 'sub_channel', 'stdin_channel',
                 'hb_channel'):
        channel = getattr(self, attr)
        if channel.is_alive():
            channel.stop()
680 686
@property
def channels_running(self):
    """Are any of the channels created and running?"""
    channel_attrs = ('shell_channel', 'sub_channel', 'stdin_channel',
                     'hb_channel')
    # checked in order, stopping at the first live channel
    for attr in channel_attrs:
        if getattr(self, attr).is_alive():
            return True
    return False
686 692
687 693 #--------------------------------------------------------------------------
688 694 # Kernel process management methods:
689 695 #--------------------------------------------------------------------------
690 696
def cleanup_connection_file(self):
    """Delete the connection file, but only if this manager wrote it.

    The "written" flag is cleared before the removal is attempted.  An
    :class:`OSError` from the removal (for instance because the file is
    already gone) is silently ignored.
    """
    if not self._connection_file_written:
        # Not our file: leave it alone.
        return

    self._connection_file_written = False
    try:
        os.remove(self.connection_file)
    except OSError:
        pass
703 709
def load_connection_file(self):
    """Read connection settings from the JSON file at ``self.connection_file``.

    Copies ``ip`` and the four port numbers from the file onto this
    manager and stores the file's ``key`` (converted to bytes) as the
    session key.
    """
    with open(self.connection_file) as stream:
        info = json.load(stream)

    # Same assignment order as the keys are listed here.
    for field in ('ip', 'shell_port', 'stdin_port', 'iopub_port', 'hb_port'):
        setattr(self, field, info[field])
    self.session.key = str_to_bytes(info['key'])
715 721
def write_connection_file(self):
    """Write the connection settings to ``self.connection_file`` as JSON.

    Does nothing if this manager has already written the file.  The
    module-level ``write_connection_file`` helper returns the file name
    actually used together with the final settings; both are stored
    back on the manager.
    """
    if self._connection_file_written:
        return

    fname, settings = write_connection_file(
        self.connection_file,
        ip=self.ip,
        key=self.session.key,
        stdin_port=self.stdin_port,
        iopub_port=self.iopub_port,
        shell_port=self.shell_port,
        hb_port=self.hb_port,
    )
    self.connection_file = fname

    # The helper also fills in default ports, so adopt what it chose.
    for port in ('shell_port', 'stdin_port', 'iopub_port', 'hb_port'):
        setattr(self, port, settings[port])

    self._connection_file_written = True
731 737
def start_kernel(self, **kw):
    """Launch a kernel process and attach this manager to it.

    When random ports (port=0) are in use, call this before the channels
    are created.

    Parameters
    ----------
    ipython : bool, optional (default True)
        Use an IPython kernel rather than a plain Python kernel.  Only
        consulted when no ``launcher`` is supplied.

    launcher : callable, optional (default None)
        Custom function used to launch the kernel process (generally a
        wrapper around ``entry_point.base_launch_kernel``).  Rarely
        needed.

    **kw : optional
        Remaining options are passed on to the launch function.

    Raises
    ------
    RuntimeError
        If ``self.ip`` is not one of the local interfaces.
    """
    if self.ip not in LOCAL_IPS:
        message = ("Can only launch a kernel on a local interface. "
                   "Make sure that the '*_address' attributes are "
                   "configured properly. "
                   "Currently valid addresses are: %s"%LOCAL_IPS)
        raise RuntimeError(message)

    # write connection file / get default ports
    self.write_connection_file()

    # Remember the full argument set (before popping) for restart_kernel.
    self._launch_args = kw.copy()

    launcher = kw.pop('launcher', None)
    if launcher is None:
        use_ipython = kw.pop('ipython', True)
        if use_ipython:
            from ipkernel import launch_kernel as launcher
        else:
            from pykernel import launch_kernel as launcher
    self.kernel = launcher(fname=self.connection_file, **kw)
769 775
def shutdown_kernel(self, restart=False):
    """Attempt to stop the kernel process cleanly, killing it if needed.

    A shutdown request is sent over the shell channel and the kernel is
    given up to one second (polled every 0.1s) to exit.  If it is still
    alive after that and a kernel process is attached, it is killed.

    Parameters
    ----------
    restart : bool, optional
        Passed on to the shell channel's shutdown request.  When False,
        the connection file is removed afterwards if this manager wrote
        it; when True it is kept.
    """
    # FIXME: Shutdown does not work on Windows due to ZMQ errors!
    if sys.platform == 'win32':
        self.kill_kernel()
        return

    # Pause the heart beat channel if it exists.
    if self._hb_channel is not None:
        self._hb_channel.pause()

    # Don't send any additional kernel kill messages immediately, to give
    # the kernel a chance to properly execute shutdown actions. Wait for at
    # most 1s, checking every 0.1s.
    self.shell_channel.shutdown(restart=restart)
    for _ in range(10):
        if self.is_alive:
            time.sleep(0.1)
        else:
            break
    else:
        # OK, we've waited long enough.
        if self.has_kernel:
            self.kill_kernel()

    if not restart:
        # Full shutdown: remove the connection file if we wrote it.
        # Delegating keeps the error handling in one place; os.remove
        # raises OSError, which the former inline `except IOError` did
        # not catch on Python 2.
        self.cleanup_connection_file()
804 810
def restart_kernel(self, now=False, **kw):
    """Restart the kernel using the arguments it was launched with.

    Parameters
    ----------
    now : bool, optional
        If True, a running kernel is killed *immediately* with no chance
        to clean up.  Otherwise it is asked to shut down first (see
        :meth:`shutdown_kernel`).  Either way a new kernel is started.

    **kw : optional
        Options given here override those remembered from the previous
        launch.

    Raises
    ------
    RuntimeError
        If :meth:`start_kernel` has never been called on this manager.
    """
    if self._launch_args is None:
        raise RuntimeError("Cannot restart the kernel. "
                           "No previous call to 'start_kernel'.")

    # Stop currently running kernel.
    if self.has_kernel:
        if now:
            self.kill_kernel()
        else:
            self.shutdown_kernel(restart=True)

    # Start new kernel with the merged launch options.
    self._launch_args.update(kw)
    self.start_kernel(**self._launch_args)

    # FIXME: Messages get dropped in Windows due to probable ZMQ bug
    # unless there is some delay here.
    if sys.platform == 'win32':
        time.sleep(0.2)
844 850
@property
def has_kernel(self):
    """True when a kernel process object is attached to this manager."""
    no_kernel = self.kernel is None
    return not no_kernel
851 857
def kill_kernel(self):
    """Kill the running kernel process.

    The heartbeat channel, if one has been created, is paused first.
    An :class:`OSError` indicating that the process has already
    terminated is ignored; afterwards ``self.kernel`` is reset to None.

    Raises
    ------
    RuntimeError
        If no kernel process is attached to this manager.
    OSError
        If killing the process fails for any other reason.
    """
    if not self.has_kernel:
        raise RuntimeError("Cannot kill kernel. No kernel is running!")

    # Pause the heart beat channel if it exists.
    if self._hb_channel is not None:
        self._hb_channel.pause()

    # Attempt to kill the kernel.
    try:
        self.kernel.kill()
    except OSError as e:
        if sys.platform == 'win32':
            # In Windows, we will get an Access Denied error (5) if the
            # process has already terminated. Ignore it.
            if e.winerror != 5:
                raise
        else:
            # On Unix, we may get an ESRCH error if the process has
            # already terminated. Ignore it.
            from errno import ESRCH
            if e.errno != ESRCH:
                raise
    self.kernel = None
877 883
def interrupt_kernel(self):
    """Interrupt the kernel.

    Unlike ``signal_kernel``, this operation is well supported on all
    platforms: on Windows the kernel's interrupt event is triggered, on
    other platforms SIGINT is sent to the kernel process.

    Raises
    ------
    RuntimeError
        If no kernel process is attached to this manager.
    """
    if not self.has_kernel:
        raise RuntimeError("Cannot interrupt kernel. No kernel is running!")

    if sys.platform != 'win32':
        self.kernel.send_signal(signal.SIGINT)
        return

    from parentpoller import ParentPollerWindows as Poller
    Poller.send_interrupt(self.kernel.win32_interrupt_event)
890 896
def signal_kernel(self, signum):
    """Send the signal ``signum`` to the kernel process.

    Note that since only SIGTERM is supported on Windows, this method is
    only useful on Unix systems.

    Raises
    ------
    RuntimeError
        If no kernel process is attached to this manager.
    """
    if not self.has_kernel:
        raise RuntimeError("Cannot signal kernel. No kernel is running!")
    self.kernel.send_signal(signum)
899 905
@property
def is_alive(self):
    """Is the kernel still running?

    Three cases are distinguished:

    * a kernel process is attached: it is alive while its ``poll()``
      returns None;
    * no process, but a heartbeat channel has been created: the answer
      comes from the channel's ``is_beating()``;
    * neither: there is no way to tell, so True is returned.
    """
    if self.has_kernel:
        return self.kernel.poll() is None

    heartbeat = self._hb_channel
    if heartbeat is not None:
        # We didn't start the kernel with this KernelManager so we
        # use the heartbeat.
        return heartbeat.is_beating()

    # no heartbeat and not local, we can't tell if it's running,
    # so naively return True
    return True
914 922
915 923 #--------------------------------------------------------------------------
916 924 # Channels used for communication with the kernel:
917 925 #--------------------------------------------------------------------------
918 926
@property
def shell_channel(self):
    """The shell channel used to make requests of the kernel.

    Built on first access from ``shell_channel_class`` with this
    manager's context, session and ``(ip, shell_port)`` address, then
    cached for later accesses.
    """
    channel = self._shell_channel
    if channel is not None:
        return channel
    channel = self.shell_channel_class(
        self.context, self.session, (self.ip, self.shell_port))
    self._shell_channel = channel
    return channel
927 935
@property
def sub_channel(self):
    """The SUB socket channel object.

    Built on first access from ``sub_channel_class`` with this manager's
    context, session and ``(ip, iopub_port)`` address, then cached for
    later accesses.
    """
    channel = self._sub_channel
    if channel is not None:
        return channel
    channel = self.sub_channel_class(
        self.context, self.session, (self.ip, self.iopub_port))
    self._sub_channel = channel
    return channel
936 944
@property
def stdin_channel(self):
    """The channel object that handles stdin (raw_input) requests.

    Built on first access from ``stdin_channel_class`` with this
    manager's context, session and ``(ip, stdin_port)`` address, then
    cached for later accesses.
    """
    channel = self._stdin_channel
    if channel is not None:
        return channel
    channel = self.stdin_channel_class(
        self.context, self.session, (self.ip, self.stdin_port))
    self._stdin_channel = channel
    return channel
945 953
@property
def hb_channel(self):
    """The heartbeat channel used to check that the kernel is alive.

    Built on first access from ``hb_channel_class`` with this manager's
    context, session and ``(ip, hb_port)`` address, then cached for
    later accesses.
    """
    channel = self._hb_channel
    if channel is not None:
        return channel
    channel = self.hb_channel_class(
        self.context, self.session, (self.ip, self.hb_port))
    self._hb_channel = channel
    return channel
General Comments 0
You need to be logged in to leave comments. Login now