##// END OF EJS Templates
Safe kernel killing in Windows.
epatters -
Show More
@@ -1,892 +1,899 b''
1 1 """Base classes to manage the interaction with a running kernel.
2 2
3 3 TODO
4 4 * Create logger to handle debugging and console messages.
5 5 """
6 6
7 7 #-----------------------------------------------------------------------------
8 8 # Copyright (C) 2008-2010 The IPython Development Team
9 9 #
10 10 # Distributed under the terms of the BSD License. The full license is in
11 11 # the file COPYING, distributed as part of this software.
12 12 #-----------------------------------------------------------------------------
13 13
14 14 #-----------------------------------------------------------------------------
15 15 # Imports
16 16 #-----------------------------------------------------------------------------
17 17
18 18 # Standard library imports.
19 19 from Queue import Queue, Empty
20 20 from subprocess import Popen
21 21 import signal
22 22 import sys
23 23 from threading import Thread
24 24 import time
25 25
26 26 # System library imports.
27 27 import zmq
28 28 from zmq import POLLIN, POLLOUT, POLLERR
29 29 from zmq.eventloop import ioloop
30 30
31 31 # Local imports.
32 32 from IPython.utils import io
33 33 from IPython.utils.traitlets import HasTraits, Any, Instance, Type, TCPAddress
34 34 from session import Session
35 35
36 36 #-----------------------------------------------------------------------------
37 37 # Constants and exceptions
38 38 #-----------------------------------------------------------------------------
39 39
40 40 LOCALHOST = '127.0.0.1'
41 41
42 42 class InvalidPortNumber(Exception):
43 43 pass
44 44
45 45 #-----------------------------------------------------------------------------
46 46 # Utility functions
47 47 #-----------------------------------------------------------------------------
48 48
49 49 # some utilities to validate message structure, these might get moved elsewhere
50 50 # if they prove to have more generic utility
51 51
52 52 def validate_string_list(lst):
53 53 """Validate that the input is a list of strings.
54 54
55 55 Raises ValueError if not."""
56 56 if not isinstance(lst, list):
57 57 raise ValueError('input %r must be a list' % lst)
58 58 for x in lst:
59 59 if not isinstance(x, basestring):
60 60 raise ValueError('element %r in list must be a string' % x)
61 61
62 62
63 63 def validate_string_dict(dct):
64 64 """Validate that the input is a dict with string keys and values.
65 65
66 66 Raises ValueError if not."""
67 67 for k,v in dct.iteritems():
68 68 if not isinstance(k, basestring):
69 69 raise ValueError('key %r in dict must be a string' % k)
70 70 if not isinstance(v, basestring):
71 71 raise ValueError('value %r in dict must be a string' % v)
72 72
73 73
74 74 #-----------------------------------------------------------------------------
75 75 # ZMQ Socket Channel classes
76 76 #-----------------------------------------------------------------------------
77 77
78 78 class ZmqSocketChannel(Thread):
79 79 """The base class for the channels that use ZMQ sockets.
80 80 """
81 81 context = None
82 82 session = None
83 83 socket = None
84 84 ioloop = None
85 85 iostate = None
86 86 _address = None
87 87
88 88 def __init__(self, context, session, address):
89 89 """Create a channel
90 90
91 91 Parameters
92 92 ----------
93 93 context : :class:`zmq.Context`
94 94 The ZMQ context to use.
95 95 session : :class:`session.Session`
96 96 The session to use.
97 97 address : tuple
98 98 Standard (ip, port) tuple that the kernel is listening on.
99 99 """
100 100 super(ZmqSocketChannel, self).__init__()
101 101 self.daemon = True
102 102
103 103 self.context = context
104 104 self.session = session
105 105 if address[1] == 0:
106 106 message = 'The port number for a channel cannot be 0.'
107 107 raise InvalidPortNumber(message)
108 108 self._address = address
109 109
110 110 def stop(self):
111 111 """Stop the channel's activity.
112 112
113 113 This calls :method:`Thread.join` and returns when the thread
114 114 terminates. :class:`RuntimeError` will be raised if
115 115 :method:`self.start` is called again.
116 116 """
117 117 self.join()
118 118
119 119 @property
120 120 def address(self):
121 121 """Get the channel's address as an (ip, port) tuple.
122 122
123 123 By the default, the address is (localhost, 0), where 0 means a random
124 124 port.
125 125 """
126 126 return self._address
127 127
128 128 def add_io_state(self, state):
129 129 """Add IO state to the eventloop.
130 130
131 131 Parameters
132 132 ----------
133 133 state : zmq.POLLIN|zmq.POLLOUT|zmq.POLLERR
134 134 The IO state flag to set.
135 135
136 136 This is thread safe as it uses the thread safe IOLoop.add_callback.
137 137 """
138 138 def add_io_state_callback():
139 139 if not self.iostate & state:
140 140 self.iostate = self.iostate | state
141 141 self.ioloop.update_handler(self.socket, self.iostate)
142 142 self.ioloop.add_callback(add_io_state_callback)
143 143
144 144 def drop_io_state(self, state):
145 145 """Drop IO state from the eventloop.
146 146
147 147 Parameters
148 148 ----------
149 149 state : zmq.POLLIN|zmq.POLLOUT|zmq.POLLERR
150 150 The IO state flag to set.
151 151
152 152 This is thread safe as it uses the thread safe IOLoop.add_callback.
153 153 """
154 154 def drop_io_state_callback():
155 155 if self.iostate & state:
156 156 self.iostate = self.iostate & (~state)
157 157 self.ioloop.update_handler(self.socket, self.iostate)
158 158 self.ioloop.add_callback(drop_io_state_callback)
159 159
160 160
161 161 class XReqSocketChannel(ZmqSocketChannel):
162 162 """The XREQ channel for issues request/replies to the kernel.
163 163 """
164 164
165 165 command_queue = None
166 166
167 167 def __init__(self, context, session, address):
168 168 super(XReqSocketChannel, self).__init__(context, session, address)
169 169 self.command_queue = Queue()
170 170 self.ioloop = ioloop.IOLoop()
171 171
172 172 def run(self):
173 173 """The thread's main activity. Call start() instead."""
174 174 self.socket = self.context.socket(zmq.XREQ)
175 175 self.socket.setsockopt(zmq.IDENTITY, self.session.session)
176 176 self.socket.connect('tcp://%s:%i' % self.address)
177 177 self.iostate = POLLERR|POLLIN
178 178 self.ioloop.add_handler(self.socket, self._handle_events,
179 179 self.iostate)
180 180 self.ioloop.start()
181 181
182 182 def stop(self):
183 183 self.ioloop.stop()
184 184 super(XReqSocketChannel, self).stop()
185 185
186 186 def call_handlers(self, msg):
187 187 """This method is called in the ioloop thread when a message arrives.
188 188
189 189 Subclasses should override this method to handle incoming messages.
190 190 It is important to remember that this method is called in the thread
191 191 so that some logic must be done to ensure that the application leve
192 192 handlers are called in the application thread.
193 193 """
194 194 raise NotImplementedError('call_handlers must be defined in a subclass.')
195 195
196 196 def execute(self, code, silent=False,
197 197 user_variables=None, user_expressions=None):
198 198 """Execute code in the kernel.
199 199
200 200 Parameters
201 201 ----------
202 202 code : str
203 203 A string of Python code.
204 204
205 205 silent : bool, optional (default False)
206 206 If set, the kernel will execute the code as quietly possible.
207 207
208 208 user_variables : list, optional
209 209 A list of variable names to pull from the user's namespace. They
210 210 will come back as a dict with these names as keys and their
211 211 :func:`repr` as values.
212 212
213 213 user_expressions : dict, optional
214 214 A dict with string keys and to pull from the user's
215 215 namespace. They will come back as a dict with these names as keys
216 216 and their :func:`repr` as values.
217 217
218 218 Returns
219 219 -------
220 220 The msg_id of the message sent.
221 221 """
222 222 if user_variables is None:
223 223 user_variables = []
224 224 if user_expressions is None:
225 225 user_expressions = {}
226 226
227 227 # Don't waste network traffic if inputs are invalid
228 228 if not isinstance(code, basestring):
229 229 raise ValueError('code %r must be a string' % code)
230 230 validate_string_list(user_variables)
231 231 validate_string_dict(user_expressions)
232 232
233 233 # Create class for content/msg creation. Related to, but possibly
234 234 # not in Session.
235 235 content = dict(code=code, silent=silent,
236 236 user_variables=user_variables,
237 237 user_expressions=user_expressions)
238 238 msg = self.session.msg('execute_request', content)
239 239 self._queue_request(msg)
240 240 return msg['header']['msg_id']
241 241
242 242 def complete(self, text, line, cursor_pos, block=None):
243 243 """Tab complete text in the kernel's namespace.
244 244
245 245 Parameters
246 246 ----------
247 247 text : str
248 248 The text to complete.
249 249 line : str
250 250 The full line of text that is the surrounding context for the
251 251 text to complete.
252 252 cursor_pos : int
253 253 The position of the cursor in the line where the completion was
254 254 requested.
255 255 block : str, optional
256 256 The full block of code in which the completion is being requested.
257 257
258 258 Returns
259 259 -------
260 260 The msg_id of the message sent.
261 261 """
262 262 content = dict(text=text, line=line, block=block, cursor_pos=cursor_pos)
263 263 msg = self.session.msg('complete_request', content)
264 264 self._queue_request(msg)
265 265 return msg['header']['msg_id']
266 266
267 267 def object_info(self, oname):
268 268 """Get metadata information about an object.
269 269
270 270 Parameters
271 271 ----------
272 272 oname : str
273 273 A string specifying the object name.
274 274
275 275 Returns
276 276 -------
277 277 The msg_id of the message sent.
278 278 """
279 279 content = dict(oname=oname)
280 280 msg = self.session.msg('object_info_request', content)
281 281 self._queue_request(msg)
282 282 return msg['header']['msg_id']
283 283
284 284 def history(self, index=None, raw=False, output=True):
285 285 """Get the history list.
286 286
287 287 Parameters
288 288 ----------
289 289 index : n or (n1, n2) or None
290 290 If n, then the last entries. If a tuple, then all in
291 291 range(n1, n2). If None, then all entries. Raises IndexError if
292 292 the format of index is incorrect.
293 293 raw : bool
294 294 If True, return the raw input.
295 295 output : bool
296 296 If True, then return the output as well.
297 297
298 298 Returns
299 299 -------
300 300 The msg_id of the message sent.
301 301 """
302 302 content = dict(index=index, raw=raw, output=output)
303 303 msg = self.session.msg('history_request', content)
304 304 self._queue_request(msg)
305 305 return msg['header']['msg_id']
306 306
307 307 def shutdown(self):
308 308 """Request an immediate kernel shutdown.
309 309
310 310 Upon receipt of the (empty) reply, client code can safely assume that
311 311 the kernel has shut down and it's safe to forcefully terminate it if
312 312 it's still alive.
313 313
314 314 The kernel will send the reply via a function registered with Python's
315 315 atexit module, ensuring it's truly done as the kernel is done with all
316 316 normal operation.
317 317 """
318 318 # Send quit message to kernel. Once we implement kernel-side setattr,
319 319 # this should probably be done that way, but for now this will do.
320 320 msg = self.session.msg('shutdown_request', {})
321 321 self._queue_request(msg)
322 322 return msg['header']['msg_id']
323 323
324 324 def _handle_events(self, socket, events):
325 325 if events & POLLERR:
326 326 self._handle_err()
327 327 if events & POLLOUT:
328 328 self._handle_send()
329 329 if events & POLLIN:
330 330 self._handle_recv()
331 331
332 332 def _handle_recv(self):
333 333 msg = self.socket.recv_json()
334 334 self.call_handlers(msg)
335 335
336 336 def _handle_send(self):
337 337 try:
338 338 msg = self.command_queue.get(False)
339 339 except Empty:
340 340 pass
341 341 else:
342 342 self.socket.send_json(msg)
343 343 if self.command_queue.empty():
344 344 self.drop_io_state(POLLOUT)
345 345
346 346 def _handle_err(self):
347 347 # We don't want to let this go silently, so eventually we should log.
348 348 raise zmq.ZMQError()
349 349
350 350 def _queue_request(self, msg):
351 351 self.command_queue.put(msg)
352 352 self.add_io_state(POLLOUT)
353 353
354 354
355 355 class SubSocketChannel(ZmqSocketChannel):
356 356 """The SUB channel which listens for messages that the kernel publishes.
357 357 """
358 358
359 359 def __init__(self, context, session, address):
360 360 super(SubSocketChannel, self).__init__(context, session, address)
361 361 self.ioloop = ioloop.IOLoop()
362 362
363 363 def run(self):
364 364 """The thread's main activity. Call start() instead."""
365 365 self.socket = self.context.socket(zmq.SUB)
366 366 self.socket.setsockopt(zmq.SUBSCRIBE,'')
367 367 self.socket.setsockopt(zmq.IDENTITY, self.session.session)
368 368 self.socket.connect('tcp://%s:%i' % self.address)
369 369 self.iostate = POLLIN|POLLERR
370 370 self.ioloop.add_handler(self.socket, self._handle_events,
371 371 self.iostate)
372 372 self.ioloop.start()
373 373
374 374 def stop(self):
375 375 self.ioloop.stop()
376 376 super(SubSocketChannel, self).stop()
377 377
378 378 def call_handlers(self, msg):
379 379 """This method is called in the ioloop thread when a message arrives.
380 380
381 381 Subclasses should override this method to handle incoming messages.
382 382 It is important to remember that this method is called in the thread
383 383 so that some logic must be done to ensure that the application leve
384 384 handlers are called in the application thread.
385 385 """
386 386 raise NotImplementedError('call_handlers must be defined in a subclass.')
387 387
388 388 def flush(self, timeout=1.0):
389 389 """Immediately processes all pending messages on the SUB channel.
390 390
391 391 Callers should use this method to ensure that :method:`call_handlers`
392 392 has been called for all messages that have been received on the
393 393 0MQ SUB socket of this channel.
394 394
395 395 This method is thread safe.
396 396
397 397 Parameters
398 398 ----------
399 399 timeout : float, optional
400 400 The maximum amount of time to spend flushing, in seconds. The
401 401 default is one second.
402 402 """
403 403 # We do the IOLoop callback process twice to ensure that the IOLoop
404 404 # gets to perform at least one full poll.
405 405 stop_time = time.time() + timeout
406 406 for i in xrange(2):
407 407 self._flushed = False
408 408 self.ioloop.add_callback(self._flush)
409 409 while not self._flushed and time.time() < stop_time:
410 410 time.sleep(0.01)
411 411
412 412 def _handle_events(self, socket, events):
413 413 # Turn on and off POLLOUT depending on if we have made a request
414 414 if events & POLLERR:
415 415 self._handle_err()
416 416 if events & POLLIN:
417 417 self._handle_recv()
418 418
419 419 def _handle_err(self):
420 420 # We don't want to let this go silently, so eventually we should log.
421 421 raise zmq.ZMQError()
422 422
423 423 def _handle_recv(self):
424 424 # Get all of the messages we can
425 425 while True:
426 426 try:
427 427 msg = self.socket.recv_json(zmq.NOBLOCK)
428 428 except zmq.ZMQError:
429 429 # Check the errno?
430 430 # Will this trigger POLLERR?
431 431 break
432 432 else:
433 433 self.call_handlers(msg)
434 434
435 435 def _flush(self):
436 436 """Callback for :method:`self.flush`."""
437 437 self._flushed = True
438 438
439 439
440 440 class RepSocketChannel(ZmqSocketChannel):
441 441 """A reply channel to handle raw_input requests that the kernel makes."""
442 442
443 443 msg_queue = None
444 444
445 445 def __init__(self, context, session, address):
446 446 super(RepSocketChannel, self).__init__(context, session, address)
447 447 self.ioloop = ioloop.IOLoop()
448 448 self.msg_queue = Queue()
449 449
450 450 def run(self):
451 451 """The thread's main activity. Call start() instead."""
452 452 self.socket = self.context.socket(zmq.XREQ)
453 453 self.socket.setsockopt(zmq.IDENTITY, self.session.session)
454 454 self.socket.connect('tcp://%s:%i' % self.address)
455 455 self.iostate = POLLERR|POLLIN
456 456 self.ioloop.add_handler(self.socket, self._handle_events,
457 457 self.iostate)
458 458 self.ioloop.start()
459 459
460 460 def stop(self):
461 461 self.ioloop.stop()
462 462 super(RepSocketChannel, self).stop()
463 463
464 464 def call_handlers(self, msg):
465 465 """This method is called in the ioloop thread when a message arrives.
466 466
467 467 Subclasses should override this method to handle incoming messages.
468 468 It is important to remember that this method is called in the thread
469 469 so that some logic must be done to ensure that the application leve
470 470 handlers are called in the application thread.
471 471 """
472 472 raise NotImplementedError('call_handlers must be defined in a subclass.')
473 473
474 474 def input(self, string):
475 475 """Send a string of raw input to the kernel."""
476 476 content = dict(value=string)
477 477 msg = self.session.msg('input_reply', content)
478 478 self._queue_reply(msg)
479 479
480 480 def _handle_events(self, socket, events):
481 481 if events & POLLERR:
482 482 self._handle_err()
483 483 if events & POLLOUT:
484 484 self._handle_send()
485 485 if events & POLLIN:
486 486 self._handle_recv()
487 487
488 488 def _handle_recv(self):
489 489 msg = self.socket.recv_json()
490 490 self.call_handlers(msg)
491 491
492 492 def _handle_send(self):
493 493 try:
494 494 msg = self.msg_queue.get(False)
495 495 except Empty:
496 496 pass
497 497 else:
498 498 self.socket.send_json(msg)
499 499 if self.msg_queue.empty():
500 500 self.drop_io_state(POLLOUT)
501 501
502 502 def _handle_err(self):
503 503 # We don't want to let this go silently, so eventually we should log.
504 504 raise zmq.ZMQError()
505 505
506 506 def _queue_reply(self, msg):
507 507 self.msg_queue.put(msg)
508 508 self.add_io_state(POLLOUT)
509 509
510 510
511 511 class HBSocketChannel(ZmqSocketChannel):
512 512 """The heartbeat channel which monitors the kernel heartbeat.
513 513
514 514 Note that the heartbeat channel is paused by default. As long as you start
515 515 this channel, the kernel manager will ensure that it is paused and un-paused
516 516 as appropriate.
517 517 """
518 518
519 519 time_to_dead = 3.0
520 520 socket = None
521 521 poller = None
522 522 _running = None
523 523 _pause = None
524 524
525 525 def __init__(self, context, session, address):
526 526 super(HBSocketChannel, self).__init__(context, session, address)
527 527 self._running = False
528 528 self._pause = True
529 529
530 530 def _create_socket(self):
531 531 self.socket = self.context.socket(zmq.REQ)
532 532 self.socket.setsockopt(zmq.IDENTITY, self.session.session)
533 533 self.socket.connect('tcp://%s:%i' % self.address)
534 534 self.poller = zmq.Poller()
535 535 self.poller.register(self.socket, zmq.POLLIN)
536 536
537 537 def run(self):
538 538 """The thread's main activity. Call start() instead."""
539 539 self._create_socket()
540 540 self._running = True
541 541 while self._running:
542 542 if self._pause:
543 543 time.sleep(self.time_to_dead)
544 544 else:
545 545 since_last_heartbeat = 0.0
546 546 request_time = time.time()
547 547 try:
548 548 #io.rprint('Ping from HB channel') # dbg
549 549 self.socket.send_json('ping')
550 550 except zmq.ZMQError, e:
551 551 #io.rprint('*** HB Error:', e) # dbg
552 552 if e.errno == zmq.EFSM:
553 553 #io.rprint('sleep...', self.time_to_dead) # dbg
554 554 time.sleep(self.time_to_dead)
555 555 self._create_socket()
556 556 else:
557 557 raise
558 558 else:
559 559 while True:
560 560 try:
561 561 self.socket.recv_json(zmq.NOBLOCK)
562 562 except zmq.ZMQError, e:
563 563 #io.rprint('*** HB Error 2:', e) # dbg
564 564 if e.errno == zmq.EAGAIN:
565 565 before_poll = time.time()
566 566 until_dead = self.time_to_dead - (before_poll -
567 567 request_time)
568 568
569 569 # When the return value of poll() is an empty
570 570 # list, that is when things have gone wrong
571 571 # (zeromq bug). As long as it is not an empty
572 572 # list, poll is working correctly even if it
573 573 # returns quickly. Note: poll timeout is in
574 574 # milliseconds.
575 575 self.poller.poll(1000*until_dead)
576 576
577 577 since_last_heartbeat = time.time()-request_time
578 578 if since_last_heartbeat > self.time_to_dead:
579 579 self.call_handlers(since_last_heartbeat)
580 580 break
581 581 else:
582 582 # FIXME: We should probably log this instead.
583 583 raise
584 584 else:
585 585 until_dead = self.time_to_dead - (time.time() -
586 586 request_time)
587 587 if until_dead > 0.0:
588 588 #io.rprint('sleep...', self.time_to_dead) # dbg
589 589 time.sleep(until_dead)
590 590 break
591 591
592 592 def pause(self):
593 593 """Pause the heartbeat."""
594 594 self._pause = True
595 595
596 596 def unpause(self):
597 597 """Unpause the heartbeat."""
598 598 self._pause = False
599 599
600 600 def is_beating(self):
601 601 """Is the heartbeat running and not paused."""
602 602 if self.is_alive() and not self._pause:
603 603 return True
604 604 else:
605 605 return False
606 606
607 607 def stop(self):
608 608 self._running = False
609 609 super(HBSocketChannel, self).stop()
610 610
611 611 def call_handlers(self, since_last_heartbeat):
612 612 """This method is called in the ioloop thread when a message arrives.
613 613
614 614 Subclasses should override this method to handle incoming messages.
615 615 It is important to remember that this method is called in the thread
616 616 so that some logic must be done to ensure that the application leve
617 617 handlers are called in the application thread.
618 618 """
619 619 raise NotImplementedError('call_handlers must be defined in a subclass.')
620 620
621 621
622 622 #-----------------------------------------------------------------------------
623 623 # Main kernel manager class
624 624 #-----------------------------------------------------------------------------
625 625
626 626 class KernelManager(HasTraits):
627 627 """ Manages a kernel for a frontend.
628 628
629 629 The SUB channel is for the frontend to receive messages published by the
630 630 kernel.
631 631
632 632 The REQ channel is for the frontend to make requests of the kernel.
633 633
634 634 The REP channel is for the kernel to request stdin (raw_input) from the
635 635 frontend.
636 636 """
637 637 # The PyZMQ Context to use for communication with the kernel.
638 638 context = Instance(zmq.Context,(),{})
639 639
640 640 # The Session to use for communication with the kernel.
641 641 session = Instance(Session,(),{})
642 642
643 643 # The kernel process with which the KernelManager is communicating.
644 644 kernel = Instance(Popen)
645 645
646 646 # The addresses for the communication channels.
647 647 xreq_address = TCPAddress((LOCALHOST, 0))
648 648 sub_address = TCPAddress((LOCALHOST, 0))
649 649 rep_address = TCPAddress((LOCALHOST, 0))
650 650 hb_address = TCPAddress((LOCALHOST, 0))
651 651
652 652 # The classes to use for the various channels.
653 653 xreq_channel_class = Type(XReqSocketChannel)
654 654 sub_channel_class = Type(SubSocketChannel)
655 655 rep_channel_class = Type(RepSocketChannel)
656 656 hb_channel_class = Type(HBSocketChannel)
657 657
658 658 # Protected traits.
659 659 _launch_args = Any
660 660 _xreq_channel = Any
661 661 _sub_channel = Any
662 662 _rep_channel = Any
663 663 _hb_channel = Any
664 664
665 665 #--------------------------------------------------------------------------
666 666 # Channel management methods:
667 667 #--------------------------------------------------------------------------
668 668
669 669 def start_channels(self, xreq=True, sub=True, rep=True, hb=True):
670 670 """Starts the channels for this kernel.
671 671
672 672 This will create the channels if they do not exist and then start
673 673 them. If port numbers of 0 are being used (random ports) then you
674 674 must first call :method:`start_kernel`. If the channels have been
675 675 stopped and you call this, :class:`RuntimeError` will be raised.
676 676 """
677 677 if xreq:
678 678 self.xreq_channel.start()
679 679 if sub:
680 680 self.sub_channel.start()
681 681 if rep:
682 682 self.rep_channel.start()
683 683 if hb:
684 684 self.hb_channel.start()
685 685
686 686 def stop_channels(self):
687 687 """Stops all the running channels for this kernel.
688 688 """
689 689 if self.xreq_channel.is_alive():
690 690 self.xreq_channel.stop()
691 691 if self.sub_channel.is_alive():
692 692 self.sub_channel.stop()
693 693 if self.rep_channel.is_alive():
694 694 self.rep_channel.stop()
695 695 if self.hb_channel.is_alive():
696 696 self.hb_channel.stop()
697 697
698 698 @property
699 699 def channels_running(self):
700 700 """Are any of the channels created and running?"""
701 701 return (self.xreq_channel.is_alive() or self.sub_channel.is_alive() or
702 702 self.rep_channel.is_alive() or self.hb_channel.is_alive())
703 703
704 704 #--------------------------------------------------------------------------
705 705 # Kernel process management methods:
706 706 #--------------------------------------------------------------------------
707 707
708 708 def start_kernel(self, **kw):
709 709 """Starts a kernel process and configures the manager to use it.
710 710
711 711 If random ports (port=0) are being used, this method must be called
712 712 before the channels are created.
713 713
714 714 Parameters:
715 715 -----------
716 716 ipython : bool, optional (default True)
717 717 Whether to use an IPython kernel instead of a plain Python kernel.
718 718 """
719 719 xreq, sub, rep, hb = self.xreq_address, self.sub_address, \
720 720 self.rep_address, self.hb_address
721 721 if xreq[0] != LOCALHOST or sub[0] != LOCALHOST or \
722 722 rep[0] != LOCALHOST or hb[0] != LOCALHOST:
723 723 raise RuntimeError("Can only launch a kernel on localhost."
724 724 "Make sure that the '*_address' attributes are "
725 725 "configured properly.")
726 726
727 727 self._launch_args = kw.copy()
728 728 if kw.pop('ipython', True):
729 729 from ipkernel import launch_kernel
730 730 else:
731 731 from pykernel import launch_kernel
732 732 self.kernel, xrep, pub, req, hb = launch_kernel(
733 733 xrep_port=xreq[1], pub_port=sub[1],
734 734 req_port=rep[1], hb_port=hb[1], **kw)
735 735 self.xreq_address = (LOCALHOST, xrep)
736 736 self.sub_address = (LOCALHOST, pub)
737 737 self.rep_address = (LOCALHOST, req)
738 738 self.hb_address = (LOCALHOST, hb)
739 739
740 740 def shutdown_kernel(self):
741 741 """ Attempts to the stop the kernel process cleanly. If the kernel
742 742 cannot be stopped, it is killed, if possible.
743 743 """
744 744 # FIXME: Shutdown does not work on Windows due to ZMQ errors!
745 745 if sys.platform == 'win32':
746 746 self.kill_kernel()
747 747 return
748 748
749 749 # Pause the heart beat channel if it exists.
750 750 if self._hb_channel is not None:
751 751 self._hb_channel.pause()
752 752
753 753 # Don't send any additional kernel kill messages immediately, to give
754 754 # the kernel a chance to properly execute shutdown actions. Wait for at
755 755 # most 1s, checking every 0.1s.
756 756 self.xreq_channel.shutdown()
757 757 for i in range(10):
758 758 if self.is_alive:
759 759 time.sleep(0.1)
760 760 else:
761 761 break
762 762 else:
763 763 # OK, we've waited long enough.
764 764 if self.has_kernel:
765 765 self.kill_kernel()
766 766
767 767 def restart_kernel(self, now=False):
768 768 """Restarts a kernel with the same arguments that were used to launch
769 769 it. If the old kernel was launched with random ports, the same ports
770 770 will be used for the new kernel.
771 771
772 772 Parameters
773 773 ----------
774 774 now : bool, optional
775 775 If True, the kernel is forcefully restarted *immediately*, without
776 776 having a chance to do any cleanup action. Otherwise the kernel is
777 777 given 1s to clean up before a forceful restart is issued.
778 778
779 779 In all cases the kernel is restarted, the only difference is whether
780 780 it is given a chance to perform a clean shutdown or not.
781 781 """
782 782 if self._launch_args is None:
783 783 raise RuntimeError("Cannot restart the kernel. "
784 784 "No previous call to 'start_kernel'.")
785 785 else:
786 786 if self.has_kernel:
787 787 if now:
788 788 self.kill_kernel()
789 789 else:
790 790 self.shutdown_kernel()
791 791 self.start_kernel(**self._launch_args)
792 792
793 793 # FIXME: Messages get dropped in Windows due to probable ZMQ bug
794 794 # unless there is some delay here.
795 795 if sys.platform == 'win32':
796 796 time.sleep(0.2)
797 797
798 798 @property
799 799 def has_kernel(self):
800 800 """Returns whether a kernel process has been specified for the kernel
801 801 manager.
802 802 """
803 803 return self.kernel is not None
804 804
805 805 def kill_kernel(self):
806 806 """ Kill the running kernel. """
807 807 if self.has_kernel:
808 808 # Pause the heart beat channel if it exists.
809 809 if self._hb_channel is not None:
810 810 self._hb_channel.pause()
811 811
812 self.kernel.kill()
812 # Attempt to kill the kernel.
813 try:
814 self.kernel.kill()
815 except OSError, e:
816 # In Windows, we will get an Access Denied error if the process
817 # has already terminated. Ignore it.
818 if not (sys.platform == 'win32' and e.winerror == 5):
819 raise
813 820 self.kernel = None
814 821 else:
815 822 raise RuntimeError("Cannot kill kernel. No kernel is running!")
816 823
817 824 def interrupt_kernel(self):
818 825 """ Interrupts the kernel. Unlike ``signal_kernel``, this operation is
819 826 well supported on all platforms.
820 827 """
821 828 if self.has_kernel:
822 829 if sys.platform == 'win32':
823 830 from parentpoller import ParentPollerWindows as Poller
824 831 Poller.send_interrupt(self.kernel.win32_interrupt_event)
825 832 else:
826 833 self.kernel.send_signal(signal.SIGINT)
827 834 else:
828 835 raise RuntimeError("Cannot interrupt kernel. No kernel is running!")
829 836
830 837 def signal_kernel(self, signum):
831 838 """ Sends a signal to the kernel. Note that since only SIGTERM is
832 839 supported on Windows, this function is only useful on Unix systems.
833 840 """
834 841 if self.has_kernel:
835 842 self.kernel.send_signal(signum)
836 843 else:
837 844 raise RuntimeError("Cannot signal kernel. No kernel is running!")
838 845
839 846 @property
840 847 def is_alive(self):
841 848 """Is the kernel process still running?"""
842 849 # FIXME: not using a heartbeat means this method is broken for any
843 850 # remote kernel, it's only capable of handling local kernels.
844 851 if self.has_kernel:
845 852 if self.kernel.poll() is None:
846 853 return True
847 854 else:
848 855 return False
849 856 else:
850 857 # We didn't start the kernel with this KernelManager so we don't
851 858 # know if it is running. We should use a heartbeat for this case.
852 859 return True
853 860
854 861 #--------------------------------------------------------------------------
855 862 # Channels used for communication with the kernel:
856 863 #--------------------------------------------------------------------------
857 864
858 865 @property
859 866 def xreq_channel(self):
860 867 """Get the REQ socket channel object to make requests of the kernel."""
861 868 if self._xreq_channel is None:
862 869 self._xreq_channel = self.xreq_channel_class(self.context,
863 870 self.session,
864 871 self.xreq_address)
865 872 return self._xreq_channel
866 873
867 874 @property
868 875 def sub_channel(self):
869 876 """Get the SUB socket channel object."""
870 877 if self._sub_channel is None:
871 878 self._sub_channel = self.sub_channel_class(self.context,
872 879 self.session,
873 880 self.sub_address)
874 881 return self._sub_channel
875 882
876 883 @property
877 884 def rep_channel(self):
878 885 """Get the REP socket channel object to handle stdin (raw_input)."""
879 886 if self._rep_channel is None:
880 887 self._rep_channel = self.rep_channel_class(self.context,
881 888 self.session,
882 889 self.rep_address)
883 890 return self._rep_channel
884 891
885 892 @property
886 893 def hb_channel(self):
887 894 """Get the REP socket channel object to handle stdin (raw_input)."""
888 895 if self._hb_channel is None:
889 896 self._hb_channel = self.hb_channel_class(self.context,
890 897 self.session,
891 898 self.hb_address)
892 899 return self._hb_channel
General Comments 0
You need to be logged in to leave comments. Login now