##// END OF EJS Templates
Added pausing to the heartbeat channel.
Brian Granger -
Show More
@@ -1,851 +1,867 b''
1 1 """Base classes to manage the interaction with a running kernel.
2 2
3 3 Todo
4 4 ====
5 5
6 6 * Create logger to handle debugging and console messages.
7 7 """
8 8
9 9 #-----------------------------------------------------------------------------
10 10 # Copyright (C) 2008-2010 The IPython Development Team
11 11 #
12 12 # Distributed under the terms of the BSD License. The full license is in
13 13 # the file COPYING, distributed as part of this software.
14 14 #-----------------------------------------------------------------------------
15 15
16 16 #-----------------------------------------------------------------------------
17 17 # Imports
18 18 #-----------------------------------------------------------------------------
19 19
20 20 # Standard library imports.
21 21 from Queue import Queue, Empty
22 22 from subprocess import Popen
23 23 import sys
24 24 from threading import Thread
25 25 import time
26 26
27 27 # System library imports.
28 28 import zmq
29 29 from zmq import POLLIN, POLLOUT, POLLERR
30 30 from zmq.eventloop import ioloop
31 31
32 32 # Local imports.
33 33 from IPython.utils import io
34 34 from IPython.utils.traitlets import HasTraits, Any, Instance, Type, TCPAddress
35 35 from session import Session
36 36
37 37 #-----------------------------------------------------------------------------
38 38 # Constants and exceptions
39 39 #-----------------------------------------------------------------------------
40 40
41 41 LOCALHOST = '127.0.0.1'
42 42
43 43 class InvalidPortNumber(Exception):
44 44 pass
45 45
46 46 #-----------------------------------------------------------------------------
47 47 # Utility functions
48 48 #-----------------------------------------------------------------------------
49 49
50 50 # some utilities to validate message structure, these might get moved elsewhere
51 51 # if they prove to have more generic utility
52 52
53 53 def validate_string_list(lst):
54 54 """Validate that the input is a list of strings.
55 55
56 56 Raises ValueError if not."""
57 57 if not isinstance(lst, list):
58 58 raise ValueError('input %r must be a list' % lst)
59 59 for x in lst:
60 60 if not isinstance(x, basestring):
61 61 raise ValueError('element %r in list must be a string' % x)
62 62
63 63
64 64 def validate_string_dict(dct):
65 65 """Validate that the input is a dict with string keys and values.
66 66
67 67 Raises ValueError if not."""
68 68 for k,v in dct.iteritems():
69 69 if not isinstance(k, basestring):
70 70 raise ValueError('key %r in dict must be a string' % k)
71 71 if not isinstance(v, basestring):
72 72 raise ValueError('value %r in dict must be a string' % v)
73 73
74 74
75 75 #-----------------------------------------------------------------------------
76 76 # ZMQ Socket Channel classes
77 77 #-----------------------------------------------------------------------------
78 78
79 79 class ZmqSocketChannel(Thread):
80 80 """The base class for the channels that use ZMQ sockets.
81 81 """
82 82 context = None
83 83 session = None
84 84 socket = None
85 85 ioloop = None
86 86 iostate = None
87 87 _address = None
88 88
89 89 def __init__(self, context, session, address):
90 90 """Create a channel
91 91
92 92 Parameters
93 93 ----------
94 94 context : :class:`zmq.Context`
95 95 The ZMQ context to use.
96 96 session : :class:`session.Session`
97 97 The session to use.
98 98 address : tuple
99 99 Standard (ip, port) tuple that the kernel is listening on.
100 100 """
101 101 super(ZmqSocketChannel, self).__init__()
102 102 self.daemon = True
103 103
104 104 self.context = context
105 105 self.session = session
106 106 if address[1] == 0:
107 107 message = 'The port number for a channel cannot be 0.'
108 108 raise InvalidPortNumber(message)
109 109 self._address = address
110 110
111 111 def stop(self):
112 112 """Stop the channel's activity.
113 113
114 114 This calls :method:`Thread.join` and returns when the thread
115 115 terminates. :class:`RuntimeError` will be raised if
116 116 :method:`self.start` is called again.
117 117 """
118 118 self.join()
119 119
120 120 @property
121 121 def address(self):
122 122 """Get the channel's address as an (ip, port) tuple.
123 123
124 124 By the default, the address is (localhost, 0), where 0 means a random
125 125 port.
126 126 """
127 127 return self._address
128 128
129 129 def add_io_state(self, state):
130 130 """Add IO state to the eventloop.
131 131
132 132 Parameters
133 133 ----------
134 134 state : zmq.POLLIN|zmq.POLLOUT|zmq.POLLERR
135 135 The IO state flag to set.
136 136
137 137 This is thread safe as it uses the thread safe IOLoop.add_callback.
138 138 """
139 139 def add_io_state_callback():
140 140 if not self.iostate & state:
141 141 self.iostate = self.iostate | state
142 142 self.ioloop.update_handler(self.socket, self.iostate)
143 143 self.ioloop.add_callback(add_io_state_callback)
144 144
145 145 def drop_io_state(self, state):
146 146 """Drop IO state from the eventloop.
147 147
148 148 Parameters
149 149 ----------
150 150 state : zmq.POLLIN|zmq.POLLOUT|zmq.POLLERR
151 151 The IO state flag to set.
152 152
153 153 This is thread safe as it uses the thread safe IOLoop.add_callback.
154 154 """
155 155 def drop_io_state_callback():
156 156 if self.iostate & state:
157 157 self.iostate = self.iostate & (~state)
158 158 self.ioloop.update_handler(self.socket, self.iostate)
159 159 self.ioloop.add_callback(drop_io_state_callback)
160 160
161 161
162 162 class XReqSocketChannel(ZmqSocketChannel):
163 163 """The XREQ channel for issues request/replies to the kernel.
164 164 """
165 165
166 166 command_queue = None
167 167
168 168 def __init__(self, context, session, address):
169 169 super(XReqSocketChannel, self).__init__(context, session, address)
170 170 self.command_queue = Queue()
171 171 self.ioloop = ioloop.IOLoop()
172 172
173 173 def run(self):
174 174 """The thread's main activity. Call start() instead."""
175 175 self.socket = self.context.socket(zmq.XREQ)
176 176 self.socket.setsockopt(zmq.IDENTITY, self.session.session)
177 177 self.socket.connect('tcp://%s:%i' % self.address)
178 178 self.iostate = POLLERR|POLLIN
179 179 self.ioloop.add_handler(self.socket, self._handle_events,
180 180 self.iostate)
181 181 self.ioloop.start()
182 182
183 183 def stop(self):
184 184 self.ioloop.stop()
185 185 super(XReqSocketChannel, self).stop()
186 186
187 187 def call_handlers(self, msg):
188 188 """This method is called in the ioloop thread when a message arrives.
189 189
190 190 Subclasses should override this method to handle incoming messages.
191 191 It is important to remember that this method is called in the thread
192 192 so that some logic must be done to ensure that the application leve
193 193 handlers are called in the application thread.
194 194 """
195 195 raise NotImplementedError('call_handlers must be defined in a subclass.')
196 196
197 197 def execute(self, code, silent=False,
198 198 user_variables=None, user_expressions=None):
199 199 """Execute code in the kernel.
200 200
201 201 Parameters
202 202 ----------
203 203 code : str
204 204 A string of Python code.
205 205
206 206 silent : bool, optional (default False)
207 207 If set, the kernel will execute the code as quietly possible.
208 208
209 209 user_variables : list, optional
210 210 A list of variable names to pull from the user's namespace. They
211 211 will come back as a dict with these names as keys and their
212 212 :func:`repr` as values.
213 213
214 214 user_expressions : dict, optional
215 215 A dict with string keys and to pull from the user's
216 216 namespace. They will come back as a dict with these names as keys
217 217 and their :func:`repr` as values.
218 218
219 219 Returns
220 220 -------
221 221 The msg_id of the message sent.
222 222 """
223 223 if user_variables is None:
224 224 user_variables = []
225 225 if user_expressions is None:
226 226 user_expressions = {}
227 227
228 228 # Don't waste network traffic if inputs are invalid
229 229 if not isinstance(code, basestring):
230 230 raise ValueError('code %r must be a string' % code)
231 231 validate_string_list(user_variables)
232 232 validate_string_dict(user_expressions)
233 233
234 234 # Create class for content/msg creation. Related to, but possibly
235 235 # not in Session.
236 236 content = dict(code=code, silent=silent,
237 237 user_variables=user_variables,
238 238 user_expressions=user_expressions)
239 239 msg = self.session.msg('execute_request', content)
240 240 self._queue_request(msg)
241 241 return msg['header']['msg_id']
242 242
243 243 def complete(self, text, line, cursor_pos, block=None):
244 244 """Tab complete text in the kernel's namespace.
245 245
246 246 Parameters
247 247 ----------
248 248 text : str
249 249 The text to complete.
250 250 line : str
251 251 The full line of text that is the surrounding context for the
252 252 text to complete.
253 253 cursor_pos : int
254 254 The position of the cursor in the line where the completion was
255 255 requested.
256 256 block : str, optional
257 257 The full block of code in which the completion is being requested.
258 258
259 259 Returns
260 260 -------
261 261 The msg_id of the message sent.
262 262 """
263 263 content = dict(text=text, line=line, block=block, cursor_pos=cursor_pos)
264 264 msg = self.session.msg('complete_request', content)
265 265 self._queue_request(msg)
266 266 return msg['header']['msg_id']
267 267
268 268 def object_info(self, oname):
269 269 """Get metadata information about an object.
270 270
271 271 Parameters
272 272 ----------
273 273 oname : str
274 274 A string specifying the object name.
275 275
276 276 Returns
277 277 -------
278 278 The msg_id of the message sent.
279 279 """
280 280 content = dict(oname=oname)
281 281 msg = self.session.msg('object_info_request', content)
282 282 self._queue_request(msg)
283 283 return msg['header']['msg_id']
284 284
285 285 def history(self, index=None, raw=False, output=True):
286 286 """Get the history list.
287 287
288 288 Parameters
289 289 ----------
290 290 index : n or (n1, n2) or None
291 291 If n, then the last entries. If a tuple, then all in
292 292 range(n1, n2). If None, then all entries. Raises IndexError if
293 293 the format of index is incorrect.
294 294 raw : bool
295 295 If True, return the raw input.
296 296 output : bool
297 297 If True, then return the output as well.
298 298
299 299 Returns
300 300 -------
301 301 The msg_id of the message sent.
302 302 """
303 303 content = dict(index=index, raw=raw, output=output)
304 304 msg = self.session.msg('history_request', content)
305 305 self._queue_request(msg)
306 306 return msg['header']['msg_id']
307 307
308 308 def shutdown(self):
309 309 """Request an immediate kernel shutdown.
310 310
311 311 Upon receipt of the (empty) reply, client code can safely assume that
312 312 the kernel has shut down and it's safe to forcefully terminate it if
313 313 it's still alive.
314 314
315 315 The kernel will send the reply via a function registered with Python's
316 316 atexit module, ensuring it's truly done as the kernel is done with all
317 317 normal operation.
318 318 """
319 319 # Send quit message to kernel. Once we implement kernel-side setattr,
320 320 # this should probably be done that way, but for now this will do.
321 321 msg = self.session.msg('shutdown_request', {})
322 322 self._queue_request(msg)
323 323 return msg['header']['msg_id']
324 324
325 325 def _handle_events(self, socket, events):
326 326 if events & POLLERR:
327 327 self._handle_err()
328 328 if events & POLLOUT:
329 329 self._handle_send()
330 330 if events & POLLIN:
331 331 self._handle_recv()
332 332
333 333 def _handle_recv(self):
334 334 msg = self.socket.recv_json()
335 335 self.call_handlers(msg)
336 336
337 337 def _handle_send(self):
338 338 try:
339 339 msg = self.command_queue.get(False)
340 340 except Empty:
341 341 pass
342 342 else:
343 343 self.socket.send_json(msg)
344 344 if self.command_queue.empty():
345 345 self.drop_io_state(POLLOUT)
346 346
347 347 def _handle_err(self):
348 348 # We don't want to let this go silently, so eventually we should log.
349 349 raise zmq.ZMQError()
350 350
351 351 def _queue_request(self, msg):
352 352 self.command_queue.put(msg)
353 353 self.add_io_state(POLLOUT)
354 354
355 355
356 356 class SubSocketChannel(ZmqSocketChannel):
357 357 """The SUB channel which listens for messages that the kernel publishes.
358 358 """
359 359
360 360 def __init__(self, context, session, address):
361 361 super(SubSocketChannel, self).__init__(context, session, address)
362 362 self.ioloop = ioloop.IOLoop()
363 363
364 364 def run(self):
365 365 """The thread's main activity. Call start() instead."""
366 366 self.socket = self.context.socket(zmq.SUB)
367 367 self.socket.setsockopt(zmq.SUBSCRIBE,'')
368 368 self.socket.setsockopt(zmq.IDENTITY, self.session.session)
369 369 self.socket.connect('tcp://%s:%i' % self.address)
370 370 self.iostate = POLLIN|POLLERR
371 371 self.ioloop.add_handler(self.socket, self._handle_events,
372 372 self.iostate)
373 373 self.ioloop.start()
374 374
375 375 def stop(self):
376 376 self.ioloop.stop()
377 377 super(SubSocketChannel, self).stop()
378 378
379 379 def call_handlers(self, msg):
380 380 """This method is called in the ioloop thread when a message arrives.
381 381
382 382 Subclasses should override this method to handle incoming messages.
383 383 It is important to remember that this method is called in the thread
384 384 so that some logic must be done to ensure that the application leve
385 385 handlers are called in the application thread.
386 386 """
387 387 raise NotImplementedError('call_handlers must be defined in a subclass.')
388 388
389 389 def flush(self, timeout=1.0):
390 390 """Immediately processes all pending messages on the SUB channel.
391 391
392 392 Callers should use this method to ensure that :method:`call_handlers`
393 393 has been called for all messages that have been received on the
394 394 0MQ SUB socket of this channel.
395 395
396 396 This method is thread safe.
397 397
398 398 Parameters
399 399 ----------
400 400 timeout : float, optional
401 401 The maximum amount of time to spend flushing, in seconds. The
402 402 default is one second.
403 403 """
404 404 # We do the IOLoop callback process twice to ensure that the IOLoop
405 405 # gets to perform at least one full poll.
406 406 stop_time = time.time() + timeout
407 407 for i in xrange(2):
408 408 self._flushed = False
409 409 self.ioloop.add_callback(self._flush)
410 410 while not self._flushed and time.time() < stop_time:
411 411 time.sleep(0.01)
412 412
413 413 def _handle_events(self, socket, events):
414 414 # Turn on and off POLLOUT depending on if we have made a request
415 415 if events & POLLERR:
416 416 self._handle_err()
417 417 if events & POLLIN:
418 418 self._handle_recv()
419 419
420 420 def _handle_err(self):
421 421 # We don't want to let this go silently, so eventually we should log.
422 422 raise zmq.ZMQError()
423 423
424 424 def _handle_recv(self):
425 425 # Get all of the messages we can
426 426 while True:
427 427 try:
428 428 msg = self.socket.recv_json(zmq.NOBLOCK)
429 429 except zmq.ZMQError:
430 430 # Check the errno?
431 431 # Will this trigger POLLERR?
432 432 break
433 433 else:
434 434 self.call_handlers(msg)
435 435
436 436 def _flush(self):
437 437 """Callback for :method:`self.flush`."""
438 438 self._flushed = True
439 439
440 440
441 441 class RepSocketChannel(ZmqSocketChannel):
442 442 """A reply channel to handle raw_input requests that the kernel makes."""
443 443
444 444 msg_queue = None
445 445
446 446 def __init__(self, context, session, address):
447 447 super(RepSocketChannel, self).__init__(context, session, address)
448 448 self.ioloop = ioloop.IOLoop()
449 449 self.msg_queue = Queue()
450 450
451 451 def run(self):
452 452 """The thread's main activity. Call start() instead."""
453 453 self.socket = self.context.socket(zmq.XREQ)
454 454 self.socket.setsockopt(zmq.IDENTITY, self.session.session)
455 455 self.socket.connect('tcp://%s:%i' % self.address)
456 456 self.iostate = POLLERR|POLLIN
457 457 self.ioloop.add_handler(self.socket, self._handle_events,
458 458 self.iostate)
459 459 self.ioloop.start()
460 460
461 461 def stop(self):
462 462 self.ioloop.stop()
463 463 super(RepSocketChannel, self).stop()
464 464
465 465 def call_handlers(self, msg):
466 466 """This method is called in the ioloop thread when a message arrives.
467 467
468 468 Subclasses should override this method to handle incoming messages.
469 469 It is important to remember that this method is called in the thread
470 470 so that some logic must be done to ensure that the application leve
471 471 handlers are called in the application thread.
472 472 """
473 473 raise NotImplementedError('call_handlers must be defined in a subclass.')
474 474
475 475 def input(self, string):
476 476 """Send a string of raw input to the kernel."""
477 477 content = dict(value=string)
478 478 msg = self.session.msg('input_reply', content)
479 479 self._queue_reply(msg)
480 480
481 481 def _handle_events(self, socket, events):
482 482 if events & POLLERR:
483 483 self._handle_err()
484 484 if events & POLLOUT:
485 485 self._handle_send()
486 486 if events & POLLIN:
487 487 self._handle_recv()
488 488
489 489 def _handle_recv(self):
490 490 msg = self.socket.recv_json()
491 491 self.call_handlers(msg)
492 492
493 493 def _handle_send(self):
494 494 try:
495 495 msg = self.msg_queue.get(False)
496 496 except Empty:
497 497 pass
498 498 else:
499 499 self.socket.send_json(msg)
500 500 if self.msg_queue.empty():
501 501 self.drop_io_state(POLLOUT)
502 502
503 503 def _handle_err(self):
504 504 # We don't want to let this go silently, so eventually we should log.
505 505 raise zmq.ZMQError()
506 506
507 507 def _queue_reply(self, msg):
508 508 self.msg_queue.put(msg)
509 509 self.add_io_state(POLLOUT)
510 510
511 511
512 512 class HBSocketChannel(ZmqSocketChannel):
513 513 """The heartbeat channel which monitors the kernel heartbeat."""
514 514
515 515 time_to_dead = 3.0
516 516 socket = None
517 517 poller = None
518 _running = None
519 _pause = None
518 520
519 521 def __init__(self, context, session, address):
520 522 super(HBSocketChannel, self).__init__(context, session, address)
521 523 self._running = False
524 self._pause = False
522 525
523 526 def _create_socket(self):
524 527 self.socket = self.context.socket(zmq.REQ)
525 528 self.socket.setsockopt(zmq.IDENTITY, self.session.session)
526 529 self.socket.connect('tcp://%s:%i' % self.address)
527 530 self.poller = zmq.Poller()
528 531 self.poller.register(self.socket, zmq.POLLIN)
529 532
530 533 def run(self):
531 534 """The thread's main activity. Call start() instead."""
532 535 self._create_socket()
533 536 self._running = True
534 537 # Wait 2 seconds for the kernel to come up and the sockets to auto
535 538 # connect. If we don't we will see the kernel as dead. Also, before
536 539 # the sockets are connected, the poller.poll line below is returning
537 540 # too fast. This avoids that because the polling doesn't start until
538 541 # after the sockets are connected.
539 542 time.sleep(2.0)
540 543 while self._running:
541 since_last_heartbeat = 0.0
542 request_time = time.time()
543 try:
544 #io.rprint('Ping from HB channel') # dbg
545 self.socket.send_json('ping')
546 except zmq.ZMQError, e:
547 #io.rprint('*** HB Error:', e) # dbg
548 if e.errno == zmq.EFSM:
549 #io.rprint('sleep...', self.time_to_dead) # dbg
550 time.sleep(self.time_to_dead)
551 self._create_socket()
552 else:
553 raise
544 if self._pause:
545 time.sleep(self.time_to_dead)
554 546 else:
555 while True:
556 try:
557 self.socket.recv_json(zmq.NOBLOCK)
558 except zmq.ZMQError, e:
559 #io.rprint('*** HB Error 2:', e) # dbg
560 if e.errno == zmq.EAGAIN:
561 before_poll = time.time()
562 until_dead = self.time_to_dead - (before_poll -
563 request_time)
564
565 # When the return value of poll() is an empty list,
566 # that is when things have gone wrong (zeromq bug).
567 # As long as it is not an empty list, poll is
568 # working correctly even if it returns quickly.
569 # Note: poll timeout is in milliseconds.
570 self.poller.poll(1000*until_dead)
547 since_last_heartbeat = 0.0
548 request_time = time.time()
549 try:
550 #io.rprint('Ping from HB channel') # dbg
551 self.socket.send_json('ping')
552 except zmq.ZMQError, e:
553 #io.rprint('*** HB Error:', e) # dbg
554 if e.errno == zmq.EFSM:
555 #io.rprint('sleep...', self.time_to_dead) # dbg
556 time.sleep(self.time_to_dead)
557 self._create_socket()
558 else:
559 raise
560 else:
561 while True:
562 try:
563 self.socket.recv_json(zmq.NOBLOCK)
564 except zmq.ZMQError, e:
565 #io.rprint('*** HB Error 2:', e) # dbg
566 if e.errno == zmq.EAGAIN:
567 before_poll = time.time()
568 until_dead = self.time_to_dead - (before_poll -
569 request_time)
570
571 # When the return value of poll() is an empty list,
572 # that is when things have gone wrong (zeromq bug).
573 # As long as it is not an empty list, poll is
574 # working correctly even if it returns quickly.
575 # Note: poll timeout is in milliseconds.
576 self.poller.poll(1000*until_dead)
571 577
572 since_last_heartbeat = time.time() - request_time
573 if since_last_heartbeat > self.time_to_dead:
574 self.call_handlers(since_last_heartbeat)
575 break
578 since_last_heartbeat = time.time() - request_time
579 if since_last_heartbeat > self.time_to_dead:
580 self.call_handlers(since_last_heartbeat)
581 break
582 else:
583 # FIXME: We should probably log this instead.
584 raise
576 585 else:
577 # FIXME: We should probably log this instead.
578 raise
579 else:
580 until_dead = self.time_to_dead - (time.time() -
581 request_time)
582 if until_dead > 0.0:
583 #io.rprint('sleep...', self.time_to_dead) # dbg
584 time.sleep(until_dead)
585 break
586 until_dead = self.time_to_dead - (time.time() -
587 request_time)
588 if until_dead > 0.0:
589 #io.rprint('sleep...', self.time_to_dead) # dbg
590 time.sleep(until_dead)
591 break
592
593 def pause(self):
594 """Pause the heartbeat."""
595 self._pause = True
596
597 def unpause(self):
598 """Unpause the heartbeat."""
599 self._pause = False
600
601 def is_beating(self):
602 """Is the heartbeat running and not paused."""
603 if self.is_alive() and not self._pause:
604 return True
605 else:
606 return False
586 607
587 608 def stop(self):
588 609 self._running = False
589 610 super(HBSocketChannel, self).stop()
590 611
591 612 def call_handlers(self, since_last_heartbeat):
592 613 """This method is called in the ioloop thread when a message arrives.
593 614
594 615 Subclasses should override this method to handle incoming messages.
595 616 It is important to remember that this method is called in the thread
596 617 so that some logic must be done to ensure that the application leve
597 618 handlers are called in the application thread.
598 619 """
599 620 raise NotImplementedError('call_handlers must be defined in a subclass.')
600 621
601 622
602 623 #-----------------------------------------------------------------------------
603 624 # Main kernel manager class
604 625 #-----------------------------------------------------------------------------
605 626
606 627 class KernelManager(HasTraits):
607 628 """ Manages a kernel for a frontend.
608 629
609 630 The SUB channel is for the frontend to receive messages published by the
610 631 kernel.
611 632
612 633 The REQ channel is for the frontend to make requests of the kernel.
613 634
614 635 The REP channel is for the kernel to request stdin (raw_input) from the
615 636 frontend.
616 637 """
617 638 # The PyZMQ Context to use for communication with the kernel.
618 639 context = Instance(zmq.Context,(),{})
619 640
620 641 # The Session to use for communication with the kernel.
621 642 session = Instance(Session,(),{})
622 643
623 644 # The kernel process with which the KernelManager is communicating.
624 645 kernel = Instance(Popen)
625 646
626 647 # The addresses for the communication channels.
627 648 xreq_address = TCPAddress((LOCALHOST, 0))
628 649 sub_address = TCPAddress((LOCALHOST, 0))
629 650 rep_address = TCPAddress((LOCALHOST, 0))
630 651 hb_address = TCPAddress((LOCALHOST, 0))
631 652
632 653 # The classes to use for the various channels.
633 654 xreq_channel_class = Type(XReqSocketChannel)
634 655 sub_channel_class = Type(SubSocketChannel)
635 656 rep_channel_class = Type(RepSocketChannel)
636 657 hb_channel_class = Type(HBSocketChannel)
637 658
638 659 # Protected traits.
639 660 _launch_args = Any
640 661 _xreq_channel = Any
641 662 _sub_channel = Any
642 663 _rep_channel = Any
643 664 _hb_channel = Any
644 665
645 666 #--------------------------------------------------------------------------
646 667 # Channel management methods:
647 668 #--------------------------------------------------------------------------
648 669
649 def start_channels(self, xreq=True, sub=True, rep=True, hb=True):
650 """Starts the channels for this kernel.
670 def start_channels(self, xreq=True, sub=True, rep=True):
671 """Starts the channels for this kernel, but not the heartbeat.
651 672
652 673 This will create the channels if they do not exist and then start
653 674 them. If port numbers of 0 are being used (random ports) then you
654 675 must first call :method:`start_kernel`. If the channels have been
655 676 stopped and you call this, :class:`RuntimeError` will be raised.
656 677 """
657 678 if xreq:
658 679 self.xreq_channel.start()
659 680 if sub:
660 681 self.sub_channel.start()
661 682 if rep:
662 683 self.rep_channel.start()
663 if hb:
664 self.hb_channel.start()
665 684
666 685 def stop_channels(self):
667 686 """Stops all the running channels for this kernel.
668 687 """
669 688 if self.xreq_channel.is_alive():
670 689 self.xreq_channel.stop()
671 690 if self.sub_channel.is_alive():
672 691 self.sub_channel.stop()
673 692 if self.rep_channel.is_alive():
674 693 self.rep_channel.stop()
675 if self.hb_channel.is_alive():
676 self.hb_channel.stop()
677 694
678 695 @property
679 696 def channels_running(self):
680 697 """Are any of the channels created and running?"""
681 698 return self.xreq_channel.is_alive() \
682 699 or self.sub_channel.is_alive() \
683 or self.rep_channel.is_alive() \
684 or self.hb_channel.is_alive()
700 or self.rep_channel.is_alive()
685 701
686 702 #--------------------------------------------------------------------------
687 703 # Kernel process management methods:
688 704 #--------------------------------------------------------------------------
689 705
690 706 def start_kernel(self, **kw):
691 707 """Starts a kernel process and configures the manager to use it.
692 708
693 709 If random ports (port=0) are being used, this method must be called
694 710 before the channels are created.
695 711
696 712 Parameters:
697 713 -----------
698 714 ipython : bool, optional (default True)
699 715 Whether to use an IPython kernel instead of a plain Python kernel.
700 716 """
701 717 xreq, sub, rep, hb = self.xreq_address, self.sub_address, \
702 718 self.rep_address, self.hb_address
703 719 if xreq[0] != LOCALHOST or sub[0] != LOCALHOST or \
704 720 rep[0] != LOCALHOST or hb[0] != LOCALHOST:
705 721 raise RuntimeError("Can only launch a kernel on localhost."
706 722 "Make sure that the '*_address' attributes are "
707 723 "configured properly.")
708 724
709 725 self._launch_args = kw.copy()
710 726 if kw.pop('ipython', True):
711 727 from ipkernel import launch_kernel
712 728 else:
713 729 from pykernel import launch_kernel
714 730 self.kernel, xrep, pub, req, hb = launch_kernel(
715 731 xrep_port=xreq[1], pub_port=sub[1],
716 732 req_port=rep[1], hb_port=hb[1], **kw)
717 733 self.xreq_address = (LOCALHOST, xrep)
718 734 self.sub_address = (LOCALHOST, pub)
719 735 self.rep_address = (LOCALHOST, req)
720 736 self.hb_address = (LOCALHOST, hb)
721 737
722 738 def shutdown_kernel(self):
723 739 """ Attempts to the stop the kernel process cleanly. If the kernel
724 740 cannot be stopped, it is killed, if possible.
725 741 """
726 742 # FIXME: Shutdown does not work on Windows due to ZMQ errors!
727 743 if sys.platform == 'win32':
728 744 self.kill_kernel()
729 745 return
730 746
731 747 self.xreq_channel.shutdown()
732 748 # Don't send any additional kernel kill messages immediately, to give
733 749 # the kernel a chance to properly execute shutdown actions. Wait for at
734 750 # most 1s, checking every 0.1s.
735 751 for i in range(10):
736 752 if self.is_alive:
737 753 time.sleep(0.1)
738 754 else:
739 755 break
740 756 else:
741 757 # OK, we've waited long enough.
742 758 if self.has_kernel:
743 759 self.kill_kernel()
744 760
745 761 def restart_kernel(self, instant_death=False):
746 762 """Restarts a kernel with the same arguments that were used to launch
747 763 it. If the old kernel was launched with random ports, the same ports
748 764 will be used for the new kernel.
749 765
750 766 Parameters
751 767 ----------
752 768 instant_death : bool, optional
753 769 If True, the kernel is forcefully restarted *immediately*, without
754 770 having a chance to do any cleanup action. Otherwise the kernel is
755 771 given 1s to clean up before a forceful restart is issued.
756 772
757 773 In all cases the kernel is restarted, the only difference is whether
758 774 it is given a chance to perform a clean shutdown or not.
759 775 """
760 776 if self._launch_args is None:
761 777 raise RuntimeError("Cannot restart the kernel. "
762 778 "No previous call to 'start_kernel'.")
763 779 else:
764 780 if self.has_kernel:
765 781 if instant_death:
766 782 self.kill_kernel()
767 783 else:
768 784 self.shutdown_kernel()
769 785 self.start_kernel(**self._launch_args)
770 786
771 787 # FIXME: Messages get dropped in Windows due to probable ZMQ bug
772 788 # unless there is some delay here.
773 789 if sys.platform == 'win32':
774 790 time.sleep(0.2)
775 791
776 792 @property
777 793 def has_kernel(self):
778 794 """Returns whether a kernel process has been specified for the kernel
779 795 manager.
780 796 """
781 797 return self.kernel is not None
782 798
783 799 def kill_kernel(self):
784 800 """ Kill the running kernel. """
785 801 if self.kernel is not None:
786 802 self.kernel.kill()
787 803 self.kernel = None
788 804 else:
789 805 raise RuntimeError("Cannot kill kernel. No kernel is running!")
790 806
791 807 def signal_kernel(self, signum):
792 808 """ Sends a signal to the kernel. """
793 809 if self.kernel is not None:
794 810 self.kernel.send_signal(signum)
795 811 else:
796 812 raise RuntimeError("Cannot signal kernel. No kernel is running!")
797 813
798 814 @property
799 815 def is_alive(self):
800 816 """Is the kernel process still running?"""
801 817 # FIXME: not using a heartbeat means this method is broken for any
802 818 # remote kernel, it's only capable of handling local kernels.
803 819 if self.kernel is not None:
804 820 if self.kernel.poll() is None:
805 821 return True
806 822 else:
807 823 return False
808 824 else:
809 825 # We didn't start the kernel with this KernelManager so we don't
810 826 # know if it is running. We should use a heartbeat for this case.
811 827 return True
812 828
813 829 #--------------------------------------------------------------------------
814 830 # Channels used for communication with the kernel:
815 831 #--------------------------------------------------------------------------
816 832
817 833 @property
818 834 def xreq_channel(self):
819 835 """Get the REQ socket channel object to make requests of the kernel."""
820 836 if self._xreq_channel is None:
821 837 self._xreq_channel = self.xreq_channel_class(self.context,
822 838 self.session,
823 839 self.xreq_address)
824 840 return self._xreq_channel
825 841
826 842 @property
827 843 def sub_channel(self):
828 844 """Get the SUB socket channel object."""
829 845 if self._sub_channel is None:
830 846 self._sub_channel = self.sub_channel_class(self.context,
831 847 self.session,
832 848 self.sub_address)
833 849 return self._sub_channel
834 850
835 851 @property
836 852 def rep_channel(self):
837 853 """Get the REP socket channel object to handle stdin (raw_input)."""
838 854 if self._rep_channel is None:
839 855 self._rep_channel = self.rep_channel_class(self.context,
840 856 self.session,
841 857 self.rep_address)
842 858 return self._rep_channel
843 859
844 860 @property
845 861 def hb_channel(self):
846 862 """Get the REP socket channel object to handle stdin (raw_input)."""
847 863 if self._hb_channel is None:
848 864 self._hb_channel = self.hb_channel_class(self.context,
849 865 self.session,
850 866 self.hb_address)
851 867 return self._hb_channel
General Comments 0
You need to be logged in to leave comments. Login now