##// END OF EJS Templates
Correction to docstring...
Thomas Kluyver -
Show More
@@ -1,909 +1,910 b''
1 1 """Base classes to manage the interaction with a running kernel.
2 2
3 3 TODO
4 4 * Create logger to handle debugging and console messages.
5 5 """
6 6
7 7 #-----------------------------------------------------------------------------
8 8 # Copyright (C) 2008-2010 The IPython Development Team
9 9 #
10 10 # Distributed under the terms of the BSD License. The full license is in
11 11 # the file COPYING, distributed as part of this software.
12 12 #-----------------------------------------------------------------------------
13 13
14 14 #-----------------------------------------------------------------------------
15 15 # Imports
16 16 #-----------------------------------------------------------------------------
17 17
18 18 # Standard library imports.
19 19 import atexit
20 20 from Queue import Queue, Empty
21 21 from subprocess import Popen
22 22 import signal
23 23 import sys
24 24 from threading import Thread
25 25 import time
26 26 import logging
27 27
28 28 # System library imports.
29 29 import zmq
30 30 from zmq import POLLIN, POLLOUT, POLLERR
31 31 from zmq.eventloop import ioloop
32 32
33 33 # Local imports.
34 34 from IPython.utils import io
35 35 from IPython.utils.localinterfaces import LOCALHOST, LOCAL_IPS
36 36 from IPython.utils.traitlets import HasTraits, Any, Instance, Type, TCPAddress
37 37 from session import Session, Message
38 38
39 39 #-----------------------------------------------------------------------------
40 40 # Constants and exceptions
41 41 #-----------------------------------------------------------------------------
42 42
43 43 class InvalidPortNumber(Exception):
44 44 pass
45 45
46 46 #-----------------------------------------------------------------------------
47 47 # Utility functions
48 48 #-----------------------------------------------------------------------------
49 49
50 50 # some utilities to validate message structure, these might get moved elsewhere
51 51 # if they prove to have more generic utility
52 52
53 53 def validate_string_list(lst):
54 54 """Validate that the input is a list of strings.
55 55
56 56 Raises ValueError if not."""
57 57 if not isinstance(lst, list):
58 58 raise ValueError('input %r must be a list' % lst)
59 59 for x in lst:
60 60 if not isinstance(x, basestring):
61 61 raise ValueError('element %r in list must be a string' % x)
62 62
63 63
64 64 def validate_string_dict(dct):
65 65 """Validate that the input is a dict with string keys and values.
66 66
67 67 Raises ValueError if not."""
68 68 for k,v in dct.iteritems():
69 69 if not isinstance(k, basestring):
70 70 raise ValueError('key %r in dict must be a string' % k)
71 71 if not isinstance(v, basestring):
72 72 raise ValueError('value %r in dict must be a string' % v)
73 73
74 74
75 75 #-----------------------------------------------------------------------------
76 76 # ZMQ Socket Channel classes
77 77 #-----------------------------------------------------------------------------
78 78
79 79 class ZmqSocketChannel(Thread):
80 80 """The base class for the channels that use ZMQ sockets.
81 81 """
82 82 context = None
83 83 session = None
84 84 socket = None
85 85 ioloop = None
86 86 iostate = None
87 87 _address = None
88 88
89 89 def __init__(self, context, session, address):
90 90 """Create a channel
91 91
92 92 Parameters
93 93 ----------
94 94 context : :class:`zmq.Context`
95 95 The ZMQ context to use.
96 96 session : :class:`session.Session`
97 97 The session to use.
98 98 address : tuple
99 99 Standard (ip, port) tuple that the kernel is listening on.
100 100 """
101 101 super(ZmqSocketChannel, self).__init__()
102 102 self.daemon = True
103 103
104 104 self.context = context
105 105 self.session = session
106 106 if address[1] == 0:
107 107 message = 'The port number for a channel cannot be 0.'
108 108 raise InvalidPortNumber(message)
109 109 self._address = address
110 110
111 111 def stop(self):
112 112 """Stop the channel's activity.
113 113
114 114 This calls :method:`Thread.join` and returns when the thread
115 115 terminates. :class:`RuntimeError` will be raised if
116 116 :method:`self.start` is called again.
117 117 """
118 118 self.join()
119 119
120 120 @property
121 121 def address(self):
122 122 """Get the channel's address as an (ip, port) tuple.
123 123
124 124 By the default, the address is (localhost, 0), where 0 means a random
125 125 port.
126 126 """
127 127 return self._address
128 128
129 129 def add_io_state(self, state):
130 130 """Add IO state to the eventloop.
131 131
132 132 Parameters
133 133 ----------
134 134 state : zmq.POLLIN|zmq.POLLOUT|zmq.POLLERR
135 135 The IO state flag to set.
136 136
137 137 This is thread safe as it uses the thread safe IOLoop.add_callback.
138 138 """
139 139 def add_io_state_callback():
140 140 if not self.iostate & state:
141 141 self.iostate = self.iostate | state
142 142 self.ioloop.update_handler(self.socket, self.iostate)
143 143 self.ioloop.add_callback(add_io_state_callback)
144 144
145 145 def drop_io_state(self, state):
146 146 """Drop IO state from the eventloop.
147 147
148 148 Parameters
149 149 ----------
150 150 state : zmq.POLLIN|zmq.POLLOUT|zmq.POLLERR
151 151 The IO state flag to set.
152 152
153 153 This is thread safe as it uses the thread safe IOLoop.add_callback.
154 154 """
155 155 def drop_io_state_callback():
156 156 if self.iostate & state:
157 157 self.iostate = self.iostate & (~state)
158 158 self.ioloop.update_handler(self.socket, self.iostate)
159 159 self.ioloop.add_callback(drop_io_state_callback)
160 160
161 161
162 162 class XReqSocketChannel(ZmqSocketChannel):
163 163 """The XREQ channel for issues request/replies to the kernel.
164 164 """
165 165
166 166 command_queue = None
167 167
168 168 def __init__(self, context, session, address):
169 169 super(XReqSocketChannel, self).__init__(context, session, address)
170 170 self.command_queue = Queue()
171 171 self.ioloop = ioloop.IOLoop()
172 172
173 173 def run(self):
174 174 """The thread's main activity. Call start() instead."""
175 175 self.socket = self.context.socket(zmq.XREQ)
176 176 self.socket.setsockopt(zmq.IDENTITY, self.session.session)
177 177 self.socket.connect('tcp://%s:%i' % self.address)
178 178 self.iostate = POLLERR|POLLIN
179 179 self.ioloop.add_handler(self.socket, self._handle_events,
180 180 self.iostate)
181 181 self.ioloop.start()
182 182
183 183 def stop(self):
184 184 self.ioloop.stop()
185 185 super(XReqSocketChannel, self).stop()
186 186
187 187 def call_handlers(self, msg):
188 188 """This method is called in the ioloop thread when a message arrives.
189 189
190 190 Subclasses should override this method to handle incoming messages.
191 191 It is important to remember that this method is called in the thread
192 192 so that some logic must be done to ensure that the application leve
193 193 handlers are called in the application thread.
194 194 """
195 195 raise NotImplementedError('call_handlers must be defined in a subclass.')
196 196
197 197 def execute(self, code, silent=False,
198 198 user_variables=None, user_expressions=None):
199 199 """Execute code in the kernel.
200 200
201 201 Parameters
202 202 ----------
203 203 code : str
204 204 A string of Python code.
205 205
206 206 silent : bool, optional (default False)
207 207 If set, the kernel will execute the code as quietly possible.
208 208
209 209 user_variables : list, optional
210 210 A list of variable names to pull from the user's namespace. They
211 211 will come back as a dict with these names as keys and their
212 212 :func:`repr` as values.
213 213
214 214 user_expressions : dict, optional
215 215 A dict with string keys and to pull from the user's
216 216 namespace. They will come back as a dict with these names as keys
217 217 and their :func:`repr` as values.
218 218
219 219 Returns
220 220 -------
221 221 The msg_id of the message sent.
222 222 """
223 223 if user_variables is None:
224 224 user_variables = []
225 225 if user_expressions is None:
226 226 user_expressions = {}
227 227
228 228 # Don't waste network traffic if inputs are invalid
229 229 if not isinstance(code, basestring):
230 230 raise ValueError('code %r must be a string' % code)
231 231 validate_string_list(user_variables)
232 232 validate_string_dict(user_expressions)
233 233
234 234 # Create class for content/msg creation. Related to, but possibly
235 235 # not in Session.
236 236 content = dict(code=code, silent=silent,
237 237 user_variables=user_variables,
238 238 user_expressions=user_expressions)
239 239 msg = self.session.msg('execute_request', content)
240 240 self._queue_request(msg)
241 241 return msg['header']['msg_id']
242 242
243 243 def complete(self, text, line, cursor_pos, block=None):
244 244 """Tab complete text in the kernel's namespace.
245 245
246 246 Parameters
247 247 ----------
248 248 text : str
249 249 The text to complete.
250 250 line : str
251 251 The full line of text that is the surrounding context for the
252 252 text to complete.
253 253 cursor_pos : int
254 254 The position of the cursor in the line where the completion was
255 255 requested.
256 256 block : str, optional
257 257 The full block of code in which the completion is being requested.
258 258
259 259 Returns
260 260 -------
261 261 The msg_id of the message sent.
262 262 """
263 263 content = dict(text=text, line=line, block=block, cursor_pos=cursor_pos)
264 264 msg = self.session.msg('complete_request', content)
265 265 self._queue_request(msg)
266 266 return msg['header']['msg_id']
267 267
268 268 def object_info(self, oname):
269 269 """Get metadata information about an object.
270 270
271 271 Parameters
272 272 ----------
273 273 oname : str
274 274 A string specifying the object name.
275 275
276 276 Returns
277 277 -------
278 278 The msg_id of the message sent.
279 279 """
280 280 content = dict(oname=oname)
281 281 msg = self.session.msg('object_info_request', content)
282 282 self._queue_request(msg)
283 283 return msg['header']['msg_id']
284 284
285 285 def history(self, index=None, raw=False, output=True):
286 286 """Get the history list.
287 287
288 288 Parameters
289 289 ----------
290 290 index : n or (n1, n2) or None
291 291 If n, then the last entries. If a tuple, then all in
292 292 range(n1, n2). If None, then all entries. Raises IndexError if
293 293 the format of index is incorrect.
294 294 raw : bool
295 295 If True, return the raw input.
296 296 output : bool
297 297 If True, then return the output as well.
298 298
299 299 Returns
300 300 -------
301 301 The msg_id of the message sent.
302 302 """
303 303 content = dict(index=index, raw=raw, output=output)
304 304 msg = self.session.msg('history_request', content)
305 305 self._queue_request(msg)
306 306 return msg['header']['msg_id']
307 307
308 308 def shutdown(self, restart=False):
309 309 """Request an immediate kernel shutdown.
310 310
311 311 Upon receipt of the (empty) reply, client code can safely assume that
312 312 the kernel has shut down and it's safe to forcefully terminate it if
313 313 it's still alive.
314 314
315 315 The kernel will send the reply via a function registered with Python's
316 316 atexit module, ensuring it's truly done as the kernel is done with all
317 317 normal operation.
318 318 """
319 319 # Send quit message to kernel. Once we implement kernel-side setattr,
320 320 # this should probably be done that way, but for now this will do.
321 321 msg = self.session.msg('shutdown_request', {'restart':restart})
322 322 self._queue_request(msg)
323 323 return msg['header']['msg_id']
324 324
325 325 def _handle_events(self, socket, events):
326 326 if events & POLLERR:
327 327 self._handle_err()
328 328 if events & POLLOUT:
329 329 self._handle_send()
330 330 if events & POLLIN:
331 331 self._handle_recv()
332 332
333 333 def _handle_recv(self):
334 334 ident,msg = self.session.recv(self.socket, 0)
335 335 self.call_handlers(msg)
336 336
337 337 def _handle_send(self):
338 338 try:
339 339 msg = self.command_queue.get(False)
340 340 except Empty:
341 341 pass
342 342 else:
343 343 self.session.send(self.socket,msg)
344 344 if self.command_queue.empty():
345 345 self.drop_io_state(POLLOUT)
346 346
347 347 def _handle_err(self):
348 348 # We don't want to let this go silently, so eventually we should log.
349 349 raise zmq.ZMQError()
350 350
351 351 def _queue_request(self, msg):
352 352 self.command_queue.put(msg)
353 353 self.add_io_state(POLLOUT)
354 354
355 355
356 356 class SubSocketChannel(ZmqSocketChannel):
357 357 """The SUB channel which listens for messages that the kernel publishes.
358 358 """
359 359
360 360 def __init__(self, context, session, address):
361 361 super(SubSocketChannel, self).__init__(context, session, address)
362 362 self.ioloop = ioloop.IOLoop()
363 363
364 364 def run(self):
365 365 """The thread's main activity. Call start() instead."""
366 366 self.socket = self.context.socket(zmq.SUB)
367 367 self.socket.setsockopt(zmq.SUBSCRIBE,'')
368 368 self.socket.setsockopt(zmq.IDENTITY, self.session.session)
369 369 self.socket.connect('tcp://%s:%i' % self.address)
370 370 self.iostate = POLLIN|POLLERR
371 371 self.ioloop.add_handler(self.socket, self._handle_events,
372 372 self.iostate)
373 373 self.ioloop.start()
374 374
375 375 def stop(self):
376 376 self.ioloop.stop()
377 377 super(SubSocketChannel, self).stop()
378 378
379 379 def call_handlers(self, msg):
380 380 """This method is called in the ioloop thread when a message arrives.
381 381
382 382 Subclasses should override this method to handle incoming messages.
383 383 It is important to remember that this method is called in the thread
384 384 so that some logic must be done to ensure that the application leve
385 385 handlers are called in the application thread.
386 386 """
387 387 raise NotImplementedError('call_handlers must be defined in a subclass.')
388 388
389 389 def flush(self, timeout=1.0):
390 390 """Immediately processes all pending messages on the SUB channel.
391 391
392 392 Callers should use this method to ensure that :method:`call_handlers`
393 393 has been called for all messages that have been received on the
394 394 0MQ SUB socket of this channel.
395 395
396 396 This method is thread safe.
397 397
398 398 Parameters
399 399 ----------
400 400 timeout : float, optional
401 401 The maximum amount of time to spend flushing, in seconds. The
402 402 default is one second.
403 403 """
404 404 # We do the IOLoop callback process twice to ensure that the IOLoop
405 405 # gets to perform at least one full poll.
406 406 stop_time = time.time() + timeout
407 407 for i in xrange(2):
408 408 self._flushed = False
409 409 self.ioloop.add_callback(self._flush)
410 410 while not self._flushed and time.time() < stop_time:
411 411 time.sleep(0.01)
412 412
413 413 def _handle_events(self, socket, events):
414 414 # Turn on and off POLLOUT depending on if we have made a request
415 415 if events & POLLERR:
416 416 self._handle_err()
417 417 if events & POLLIN:
418 418 self._handle_recv()
419 419
420 420 def _handle_err(self):
421 421 # We don't want to let this go silently, so eventually we should log.
422 422 raise zmq.ZMQError()
423 423
424 424 def _handle_recv(self):
425 425 # Get all of the messages we can
426 426 while True:
427 427 try:
428 428 ident,msg = self.session.recv(self.socket)
429 429 except zmq.ZMQError:
430 430 # Check the errno?
431 431 # Will this trigger POLLERR?
432 432 break
433 433 else:
434 434 if msg is None:
435 435 break
436 436 self.call_handlers(msg)
437 437
438 438 def _flush(self):
439 439 """Callback for :method:`self.flush`."""
440 440 self._flushed = True
441 441
442 442
443 443 class RepSocketChannel(ZmqSocketChannel):
444 444 """A reply channel to handle raw_input requests that the kernel makes."""
445 445
446 446 msg_queue = None
447 447
448 448 def __init__(self, context, session, address):
449 449 super(RepSocketChannel, self).__init__(context, session, address)
450 450 self.ioloop = ioloop.IOLoop()
451 451 self.msg_queue = Queue()
452 452
453 453 def run(self):
454 454 """The thread's main activity. Call start() instead."""
455 455 self.socket = self.context.socket(zmq.XREQ)
456 456 self.socket.setsockopt(zmq.IDENTITY, self.session.session)
457 457 self.socket.connect('tcp://%s:%i' % self.address)
458 458 self.iostate = POLLERR|POLLIN
459 459 self.ioloop.add_handler(self.socket, self._handle_events,
460 460 self.iostate)
461 461 self.ioloop.start()
462 462
463 463 def stop(self):
464 464 self.ioloop.stop()
465 465 super(RepSocketChannel, self).stop()
466 466
467 467 def call_handlers(self, msg):
468 468 """This method is called in the ioloop thread when a message arrives.
469 469
470 470 Subclasses should override this method to handle incoming messages.
471 471 It is important to remember that this method is called in the thread
472 472 so that some logic must be done to ensure that the application leve
473 473 handlers are called in the application thread.
474 474 """
475 475 raise NotImplementedError('call_handlers must be defined in a subclass.')
476 476
477 477 def input(self, string):
478 478 """Send a string of raw input to the kernel."""
479 479 content = dict(value=string)
480 480 msg = self.session.msg('input_reply', content)
481 481 self._queue_reply(msg)
482 482
483 483 def _handle_events(self, socket, events):
484 484 if events & POLLERR:
485 485 self._handle_err()
486 486 if events & POLLOUT:
487 487 self._handle_send()
488 488 if events & POLLIN:
489 489 self._handle_recv()
490 490
491 491 def _handle_recv(self):
492 492 ident,msg = self.session.recv(self.socket, 0)
493 493 self.call_handlers(msg)
494 494
495 495 def _handle_send(self):
496 496 try:
497 497 msg = self.msg_queue.get(False)
498 498 except Empty:
499 499 pass
500 500 else:
501 501 self.session.send(self.socket,msg)
502 502 if self.msg_queue.empty():
503 503 self.drop_io_state(POLLOUT)
504 504
505 505 def _handle_err(self):
506 506 # We don't want to let this go silently, so eventually we should log.
507 507 raise zmq.ZMQError()
508 508
509 509 def _queue_reply(self, msg):
510 510 self.msg_queue.put(msg)
511 511 self.add_io_state(POLLOUT)
512 512
513 513
514 514 class HBSocketChannel(ZmqSocketChannel):
515 515 """The heartbeat channel which monitors the kernel heartbeat.
516 516
517 517 Note that the heartbeat channel is paused by default. As long as you start
518 518 this channel, the kernel manager will ensure that it is paused and un-paused
519 519 as appropriate.
520 520 """
521 521
522 522 time_to_dead = 3.0
523 523 socket = None
524 524 poller = None
525 525 _running = None
526 526 _pause = None
527 527
528 528 def __init__(self, context, session, address):
529 529 super(HBSocketChannel, self).__init__(context, session, address)
530 530 self._running = False
531 531 self._pause = True
532 532
533 533 def _create_socket(self):
534 534 self.socket = self.context.socket(zmq.REQ)
535 535 self.socket.setsockopt(zmq.IDENTITY, self.session.session)
536 536 self.socket.connect('tcp://%s:%i' % self.address)
537 537 self.poller = zmq.Poller()
538 538 self.poller.register(self.socket, zmq.POLLIN)
539 539
540 540 def run(self):
541 541 """The thread's main activity. Call start() instead."""
542 542 self._create_socket()
543 543 self._running = True
544 544 while self._running:
545 545 if self._pause:
546 546 time.sleep(self.time_to_dead)
547 547 else:
548 548 since_last_heartbeat = 0.0
549 549 request_time = time.time()
550 550 try:
551 551 #io.rprint('Ping from HB channel') # dbg
552 552 self.socket.send(b'ping')
553 553 except zmq.ZMQError, e:
554 554 #io.rprint('*** HB Error:', e) # dbg
555 555 if e.errno == zmq.EFSM:
556 556 #io.rprint('sleep...', self.time_to_dead) # dbg
557 557 time.sleep(self.time_to_dead)
558 558 self._create_socket()
559 559 else:
560 560 raise
561 561 else:
562 562 while True:
563 563 try:
564 564 self.socket.recv(zmq.NOBLOCK)
565 565 except zmq.ZMQError, e:
566 566 #io.rprint('*** HB Error 2:', e) # dbg
567 567 if e.errno == zmq.EAGAIN:
568 568 before_poll = time.time()
569 569 until_dead = self.time_to_dead - (before_poll -
570 570 request_time)
571 571
572 572 # When the return value of poll() is an empty
573 573 # list, that is when things have gone wrong
574 574 # (zeromq bug). As long as it is not an empty
575 575 # list, poll is working correctly even if it
576 576 # returns quickly. Note: poll timeout is in
577 577 # milliseconds.
578 578 self.poller.poll(1000*until_dead)
579 579
580 580 since_last_heartbeat = time.time()-request_time
581 581 if since_last_heartbeat > self.time_to_dead:
582 582 self.call_handlers(since_last_heartbeat)
583 583 break
584 584 else:
585 585 # FIXME: We should probably log this instead.
586 586 raise
587 587 else:
588 588 until_dead = self.time_to_dead - (time.time() -
589 589 request_time)
590 590 if until_dead > 0.0:
591 591 #io.rprint('sleep...', self.time_to_dead) # dbg
592 592 time.sleep(until_dead)
593 593 break
594 594
595 595 def pause(self):
596 596 """Pause the heartbeat."""
597 597 self._pause = True
598 598
599 599 def unpause(self):
600 600 """Unpause the heartbeat."""
601 601 self._pause = False
602 602
603 603 def is_beating(self):
604 604 """Is the heartbeat running and not paused."""
605 605 if self.is_alive() and not self._pause:
606 606 return True
607 607 else:
608 608 return False
609 609
610 610 def stop(self):
611 611 self._running = False
612 612 super(HBSocketChannel, self).stop()
613 613
614 614 def call_handlers(self, since_last_heartbeat):
615 615 """This method is called in the ioloop thread when a message arrives.
616 616
617 617 Subclasses should override this method to handle incoming messages.
618 618 It is important to remember that this method is called in the thread
619 619 so that some logic must be done to ensure that the application leve
620 620 handlers are called in the application thread.
621 621 """
622 622 raise NotImplementedError('call_handlers must be defined in a subclass.')
623 623
624 624
625 625 #-----------------------------------------------------------------------------
626 626 # Main kernel manager class
627 627 #-----------------------------------------------------------------------------
628 628
629 629 class KernelManager(HasTraits):
630 630 """ Manages a kernel for a frontend.
631 631
632 632 The SUB channel is for the frontend to receive messages published by the
633 633 kernel.
634 634
635 635 The REQ channel is for the frontend to make requests of the kernel.
636 636
637 637 The REP channel is for the kernel to request stdin (raw_input) from the
638 638 frontend.
639 639 """
640 640 # The PyZMQ Context to use for communication with the kernel.
641 641 context = Instance(zmq.Context,(),{})
642 642
643 643 # The Session to use for communication with the kernel.
644 644 session = Instance(Session,(),{})
645 645
646 646 # The kernel process with which the KernelManager is communicating.
647 647 kernel = Instance(Popen)
648 648
649 649 # The addresses for the communication channels.
650 650 xreq_address = TCPAddress((LOCALHOST, 0))
651 651 sub_address = TCPAddress((LOCALHOST, 0))
652 652 rep_address = TCPAddress((LOCALHOST, 0))
653 653 hb_address = TCPAddress((LOCALHOST, 0))
654 654
655 655 # The classes to use for the various channels.
656 656 xreq_channel_class = Type(XReqSocketChannel)
657 657 sub_channel_class = Type(SubSocketChannel)
658 658 rep_channel_class = Type(RepSocketChannel)
659 659 hb_channel_class = Type(HBSocketChannel)
660 660
661 661 # Protected traits.
662 662 _launch_args = Any
663 663 _xreq_channel = Any
664 664 _sub_channel = Any
665 665 _rep_channel = Any
666 666 _hb_channel = Any
667 667
668 668 def __init__(self, **kwargs):
669 669 super(KernelManager, self).__init__(**kwargs)
670 670 # Uncomment this to try closing the context.
671 671 # atexit.register(self.context.close)
672 672
673 673 #--------------------------------------------------------------------------
674 674 # Channel management methods:
675 675 #--------------------------------------------------------------------------
676 676
677 677 def start_channels(self, xreq=True, sub=True, rep=True, hb=True):
678 678 """Starts the channels for this kernel.
679 679
680 680 This will create the channels if they do not exist and then start
681 681 them. If port numbers of 0 are being used (random ports) then you
682 682 must first call :method:`start_kernel`. If the channels have been
683 683 stopped and you call this, :class:`RuntimeError` will be raised.
684 684 """
685 685 if xreq:
686 686 self.xreq_channel.start()
687 687 if sub:
688 688 self.sub_channel.start()
689 689 if rep:
690 690 self.rep_channel.start()
691 691 if hb:
692 692 self.hb_channel.start()
693 693
694 694 def stop_channels(self):
695 695 """Stops all the running channels for this kernel.
696 696 """
697 697 if self.xreq_channel.is_alive():
698 698 self.xreq_channel.stop()
699 699 if self.sub_channel.is_alive():
700 700 self.sub_channel.stop()
701 701 if self.rep_channel.is_alive():
702 702 self.rep_channel.stop()
703 703 if self.hb_channel.is_alive():
704 704 self.hb_channel.stop()
705 705
706 706 @property
707 707 def channels_running(self):
708 708 """Are any of the channels created and running?"""
709 709 return (self.xreq_channel.is_alive() or self.sub_channel.is_alive() or
710 710 self.rep_channel.is_alive() or self.hb_channel.is_alive())
711 711
712 712 #--------------------------------------------------------------------------
713 713 # Kernel process management methods:
714 714 #--------------------------------------------------------------------------
715 715
716 716 def start_kernel(self, **kw):
717 717 """Starts a kernel process and configures the manager to use it.
718 718
719 719 If random ports (port=0) are being used, this method must be called
720 720 before the channels are created.
721 721
722 722 Parameters:
723 723 -----------
724 724 ipython : bool, optional (default True)
725 725 Whether to use an IPython kernel instead of a plain Python kernel.
726 726 """
727 727 xreq, sub, rep, hb = self.xreq_address, self.sub_address, \
728 728 self.rep_address, self.hb_address
729 729 if xreq[0] not in LOCAL_IPS or sub[0] not in LOCAL_IPS or \
730 730 rep[0] not in LOCAL_IPS or hb[0] not in LOCAL_IPS:
731 731 raise RuntimeError("Can only launch a kernel on a local interface. "
732 732 "Make sure that the '*_address' attributes are "
733 733 "configured properly. "
734 734 "Currently valid addresses are: %s"%LOCAL_IPS
735 735 )
736 736
737 737 self._launch_args = kw.copy()
738 738 if kw.pop('ipython', True):
739 739 from ipkernel import launch_kernel
740 740 else:
741 741 from pykernel import launch_kernel
742 742 self.kernel, xrep, pub, req, _hb = launch_kernel(
743 743 xrep_port=xreq[1], pub_port=sub[1],
744 744 req_port=rep[1], hb_port=hb[1], **kw)
745 745 self.xreq_address = (xreq[0], xrep)
746 746 self.sub_address = (sub[0], pub)
747 747 self.rep_address = (rep[0], req)
748 748 self.hb_address = (hb[0], _hb)
749 749
750 750 def shutdown_kernel(self, restart=False):
751 751 """ Attempts to the stop the kernel process cleanly. If the kernel
752 752 cannot be stopped, it is killed, if possible.
753 753 """
754 754 # FIXME: Shutdown does not work on Windows due to ZMQ errors!
755 755 if sys.platform == 'win32':
756 756 self.kill_kernel()
757 757 return
758 758
759 759 # Pause the heart beat channel if it exists.
760 760 if self._hb_channel is not None:
761 761 self._hb_channel.pause()
762 762
763 763 # Don't send any additional kernel kill messages immediately, to give
764 764 # the kernel a chance to properly execute shutdown actions. Wait for at
765 765 # most 1s, checking every 0.1s.
766 766 self.xreq_channel.shutdown(restart=restart)
767 767 for i in range(10):
768 768 if self.is_alive:
769 769 time.sleep(0.1)
770 770 else:
771 771 break
772 772 else:
773 773 # OK, we've waited long enough.
774 774 if self.has_kernel:
775 775 self.kill_kernel()
776 776
777 777 def restart_kernel(self, now=False):
778 778 """Restarts a kernel with the same arguments that were used to launch
779 779 it. If the old kernel was launched with random ports, the same ports
780 780 will be used for the new kernel.
781 781
782 782 Parameters
783 783 ----------
784 784 now : bool, optional
785 785 If True, the kernel is forcefully restarted *immediately*, without
786 786 having a chance to do any cleanup action. Otherwise the kernel is
787 787 given 1s to clean up before a forceful restart is issued.
788 788
789 789 In all cases the kernel is restarted, the only difference is whether
790 790 it is given a chance to perform a clean shutdown or not.
791 791 """
792 792 if self._launch_args is None:
793 793 raise RuntimeError("Cannot restart the kernel. "
794 794 "No previous call to 'start_kernel'.")
795 795 else:
796 796 if self.has_kernel:
797 797 if now:
798 798 self.kill_kernel()
799 799 else:
800 800 self.shutdown_kernel(restart=True)
801 801 self.start_kernel(**self._launch_args)
802 802
803 803 # FIXME: Messages get dropped in Windows due to probable ZMQ bug
804 804 # unless there is some delay here.
805 805 if sys.platform == 'win32':
806 806 time.sleep(0.2)
807 807
808 808 @property
809 809 def has_kernel(self):
810 810 """Returns whether a kernel process has been specified for the kernel
811 811 manager.
812 812 """
813 813 return self.kernel is not None
814 814
815 815 def kill_kernel(self):
816 816 """ Kill the running kernel. """
817 817 if self.has_kernel:
818 818 # Pause the heart beat channel if it exists.
819 819 if self._hb_channel is not None:
820 820 self._hb_channel.pause()
821 821
822 822 # Attempt to kill the kernel.
823 823 try:
824 824 self.kernel.kill()
825 825 except OSError, e:
826 826 # In Windows, we will get an Access Denied error if the process
827 827 # has already terminated. Ignore it.
828 828 if not (sys.platform == 'win32' and e.winerror == 5):
829 829 raise
830 830 self.kernel = None
831 831 else:
832 832 raise RuntimeError("Cannot kill kernel. No kernel is running!")
833 833
834 834 def interrupt_kernel(self):
835 835 """ Interrupts the kernel. Unlike ``signal_kernel``, this operation is
836 836 well supported on all platforms.
837 837 """
838 838 if self.has_kernel:
839 839 if sys.platform == 'win32':
840 840 from parentpoller import ParentPollerWindows as Poller
841 841 Poller.send_interrupt(self.kernel.win32_interrupt_event)
842 842 else:
843 843 self.kernel.send_signal(signal.SIGINT)
844 844 else:
845 845 raise RuntimeError("Cannot interrupt kernel. No kernel is running!")
846 846
847 847 def signal_kernel(self, signum):
848 848 """ Sends a signal to the kernel. Note that since only SIGTERM is
849 849 supported on Windows, this function is only useful on Unix systems.
850 850 """
851 851 if self.has_kernel:
852 852 self.kernel.send_signal(signum)
853 853 else:
854 854 raise RuntimeError("Cannot signal kernel. No kernel is running!")
855 855
856 856 @property
857 857 def is_alive(self):
858 858 """Is the kernel process still running?"""
859 859 # FIXME: not using a heartbeat means this method is broken for any
860 860 # remote kernel, it's only capable of handling local kernels.
861 861 if self.has_kernel:
862 862 if self.kernel.poll() is None:
863 863 return True
864 864 else:
865 865 return False
866 866 else:
867 867 # We didn't start the kernel with this KernelManager so we don't
868 868 # know if it is running. We should use a heartbeat for this case.
869 869 return True
870 870
871 871 #--------------------------------------------------------------------------
872 872 # Channels used for communication with the kernel:
873 873 #--------------------------------------------------------------------------
874 874
875 875 @property
876 876 def xreq_channel(self):
877 877 """Get the REQ socket channel object to make requests of the kernel."""
878 878 if self._xreq_channel is None:
879 879 self._xreq_channel = self.xreq_channel_class(self.context,
880 880 self.session,
881 881 self.xreq_address)
882 882 return self._xreq_channel
883 883
884 884 @property
885 885 def sub_channel(self):
886 886 """Get the SUB socket channel object."""
887 887 if self._sub_channel is None:
888 888 self._sub_channel = self.sub_channel_class(self.context,
889 889 self.session,
890 890 self.sub_address)
891 891 return self._sub_channel
892 892
893 893 @property
894 894 def rep_channel(self):
895 895 """Get the REP socket channel object to handle stdin (raw_input)."""
896 896 if self._rep_channel is None:
897 897 self._rep_channel = self.rep_channel_class(self.context,
898 898 self.session,
899 899 self.rep_address)
900 900 return self._rep_channel
901 901
902 902 @property
903 903 def hb_channel(self):
904 """Get the REP socket channel object to handle stdin (raw_input)."""
904 """Get the heartbeat socket channel object to check that the
905 kernel is alive."""
905 906 if self._hb_channel is None:
906 907 self._hb_channel = self.hb_channel_class(self.context,
907 908 self.session,
908 909 self.hb_address)
909 910 return self._hb_channel
General Comments 0
You need to be logged in to leave comments. Login now