##// END OF EJS Templates
Adding some temporary hacks to work around (Py)ZMQ bugs on Windows.
epatters -
Show More
@@ -1,840 +1,851 b''
1 1 """Base classes to manage the interaction with a running kernel.
2 2
3 3 Todo
4 4 ====
5 5
6 6 * Create logger to handle debugging and console messages.
7 7 """
8 8
9 9 #-----------------------------------------------------------------------------
10 10 # Copyright (C) 2008-2010 The IPython Development Team
11 11 #
12 12 # Distributed under the terms of the BSD License. The full license is in
13 13 # the file COPYING, distributed as part of this software.
14 14 #-----------------------------------------------------------------------------
15 15
16 16 #-----------------------------------------------------------------------------
17 17 # Imports
18 18 #-----------------------------------------------------------------------------
19 19
20 20 # Standard library imports.
21 21 from Queue import Queue, Empty
22 22 from subprocess import Popen
23 import sys
23 24 from threading import Thread
24 25 import time
25 26
26 27 # System library imports.
27 28 import zmq
28 29 from zmq import POLLIN, POLLOUT, POLLERR
29 30 from zmq.eventloop import ioloop
30 31
31 32 # Local imports.
32 33 from IPython.utils import io
33 34 from IPython.utils.traitlets import HasTraits, Any, Instance, Type, TCPAddress
34 35 from session import Session
35 36
36 37 #-----------------------------------------------------------------------------
37 38 # Constants and exceptions
38 39 #-----------------------------------------------------------------------------
39 40
40 41 LOCALHOST = '127.0.0.1'
41 42
42 43 class InvalidPortNumber(Exception):
43 44 pass
44 45
45 46 #-----------------------------------------------------------------------------
46 47 # Utility functions
47 48 #-----------------------------------------------------------------------------
48 49
49 50 # some utilities to validate message structure, these might get moved elsewhere
50 51 # if they prove to have more generic utility
51 52
52 53 def validate_string_list(lst):
53 54 """Validate that the input is a list of strings.
54 55
55 56 Raises ValueError if not."""
56 57 if not isinstance(lst, list):
57 58 raise ValueError('input %r must be a list' % lst)
58 59 for x in lst:
59 60 if not isinstance(x, basestring):
60 61 raise ValueError('element %r in list must be a string' % x)
61 62
62 63
63 64 def validate_string_dict(dct):
64 65 """Validate that the input is a dict with string keys and values.
65 66
66 67 Raises ValueError if not."""
67 68 for k,v in dct.iteritems():
68 69 if not isinstance(k, basestring):
69 70 raise ValueError('key %r in dict must be a string' % k)
70 71 if not isinstance(v, basestring):
71 72 raise ValueError('value %r in dict must be a string' % v)
72 73
73 74
74 75 #-----------------------------------------------------------------------------
75 76 # ZMQ Socket Channel classes
76 77 #-----------------------------------------------------------------------------
77 78
78 79 class ZmqSocketChannel(Thread):
79 80 """The base class for the channels that use ZMQ sockets.
80 81 """
81 82 context = None
82 83 session = None
83 84 socket = None
84 85 ioloop = None
85 86 iostate = None
86 87 _address = None
87 88
88 89 def __init__(self, context, session, address):
89 90 """Create a channel
90 91
91 92 Parameters
92 93 ----------
93 94 context : :class:`zmq.Context`
94 95 The ZMQ context to use.
95 96 session : :class:`session.Session`
96 97 The session to use.
97 98 address : tuple
98 99 Standard (ip, port) tuple that the kernel is listening on.
99 100 """
100 101 super(ZmqSocketChannel, self).__init__()
101 102 self.daemon = True
102 103
103 104 self.context = context
104 105 self.session = session
105 106 if address[1] == 0:
106 107 message = 'The port number for a channel cannot be 0.'
107 108 raise InvalidPortNumber(message)
108 109 self._address = address
109 110
110 111 def stop(self):
111 112 """Stop the channel's activity.
112 113
113 114 This calls :method:`Thread.join` and returns when the thread
114 115 terminates. :class:`RuntimeError` will be raised if
115 116 :method:`self.start` is called again.
116 117 """
117 118 self.join()
118 119
119 120 @property
120 121 def address(self):
121 122 """Get the channel's address as an (ip, port) tuple.
122 123
123 124 By the default, the address is (localhost, 0), where 0 means a random
124 125 port.
125 126 """
126 127 return self._address
127 128
128 129 def add_io_state(self, state):
129 130 """Add IO state to the eventloop.
130 131
131 132 Parameters
132 133 ----------
133 134 state : zmq.POLLIN|zmq.POLLOUT|zmq.POLLERR
134 135 The IO state flag to set.
135 136
136 137 This is thread safe as it uses the thread safe IOLoop.add_callback.
137 138 """
138 139 def add_io_state_callback():
139 140 if not self.iostate & state:
140 141 self.iostate = self.iostate | state
141 142 self.ioloop.update_handler(self.socket, self.iostate)
142 143 self.ioloop.add_callback(add_io_state_callback)
143 144
144 145 def drop_io_state(self, state):
145 146 """Drop IO state from the eventloop.
146 147
147 148 Parameters
148 149 ----------
149 150 state : zmq.POLLIN|zmq.POLLOUT|zmq.POLLERR
150 151 The IO state flag to set.
151 152
152 153 This is thread safe as it uses the thread safe IOLoop.add_callback.
153 154 """
154 155 def drop_io_state_callback():
155 156 if self.iostate & state:
156 157 self.iostate = self.iostate & (~state)
157 158 self.ioloop.update_handler(self.socket, self.iostate)
158 159 self.ioloop.add_callback(drop_io_state_callback)
159 160
160 161
161 162 class XReqSocketChannel(ZmqSocketChannel):
162 163 """The XREQ channel for issues request/replies to the kernel.
163 164 """
164 165
165 166 command_queue = None
166 167
167 168 def __init__(self, context, session, address):
168 169 self.command_queue = Queue()
169 170 super(XReqSocketChannel, self).__init__(context, session, address)
170 171
171 172 def run(self):
172 173 """The thread's main activity. Call start() instead."""
173 174 self.socket = self.context.socket(zmq.XREQ)
174 175 self.socket.setsockopt(zmq.IDENTITY, self.session.session)
175 176 self.socket.connect('tcp://%s:%i' % self.address)
176 177 self.ioloop = ioloop.IOLoop()
177 178 self.iostate = POLLERR|POLLIN
178 179 self.ioloop.add_handler(self.socket, self._handle_events,
179 180 self.iostate)
180 181 self.ioloop.start()
181 182
182 183 def stop(self):
183 184 self.ioloop.stop()
184 185 super(XReqSocketChannel, self).stop()
185 186
186 187 def call_handlers(self, msg):
187 188 """This method is called in the ioloop thread when a message arrives.
188 189
189 190 Subclasses should override this method to handle incoming messages.
190 191 It is important to remember that this method is called in the thread
191 192 so that some logic must be done to ensure that the application leve
192 193 handlers are called in the application thread.
193 194 """
194 195 raise NotImplementedError('call_handlers must be defined in a subclass.')
195 196
196 197 def execute(self, code, silent=False,
197 198 user_variables=None, user_expressions=None):
198 199 """Execute code in the kernel.
199 200
200 201 Parameters
201 202 ----------
202 203 code : str
203 204 A string of Python code.
204 205
205 206 silent : bool, optional (default False)
206 207 If set, the kernel will execute the code as quietly possible.
207 208
208 209 user_variables : list, optional
209 210 A list of variable names to pull from the user's namespace. They
210 211 will come back as a dict with these names as keys and their
211 212 :func:`repr` as values.
212 213
213 214 user_expressions : dict, optional
214 215 A dict with string keys and to pull from the user's
215 216 namespace. They will come back as a dict with these names as keys
216 217 and their :func:`repr` as values.
217 218
218 219 Returns
219 220 -------
220 221 The msg_id of the message sent.
221 222 """
222 223 if user_variables is None:
223 224 user_variables = []
224 225 if user_expressions is None:
225 226 user_expressions = {}
226 227
227 228 # Don't waste network traffic if inputs are invalid
228 229 if not isinstance(code, basestring):
229 230 raise ValueError('code %r must be a string' % code)
230 231 validate_string_list(user_variables)
231 232 validate_string_dict(user_expressions)
232 233
233 234 # Create class for content/msg creation. Related to, but possibly
234 235 # not in Session.
235 236 content = dict(code=code, silent=silent,
236 237 user_variables=user_variables,
237 238 user_expressions=user_expressions)
238 239 msg = self.session.msg('execute_request', content)
239 240 self._queue_request(msg)
240 241 return msg['header']['msg_id']
241 242
242 243 def complete(self, text, line, cursor_pos, block=None):
243 244 """Tab complete text in the kernel's namespace.
244 245
245 246 Parameters
246 247 ----------
247 248 text : str
248 249 The text to complete.
249 250 line : str
250 251 The full line of text that is the surrounding context for the
251 252 text to complete.
252 253 cursor_pos : int
253 254 The position of the cursor in the line where the completion was
254 255 requested.
255 256 block : str, optional
256 257 The full block of code in which the completion is being requested.
257 258
258 259 Returns
259 260 -------
260 261 The msg_id of the message sent.
261 262 """
262 263 content = dict(text=text, line=line, block=block, cursor_pos=cursor_pos)
263 264 msg = self.session.msg('complete_request', content)
264 265 self._queue_request(msg)
265 266 return msg['header']['msg_id']
266 267
267 268 def object_info(self, oname):
268 269 """Get metadata information about an object.
269 270
270 271 Parameters
271 272 ----------
272 273 oname : str
273 274 A string specifying the object name.
274 275
275 276 Returns
276 277 -------
277 278 The msg_id of the message sent.
278 279 """
279 280 content = dict(oname=oname)
280 281 msg = self.session.msg('object_info_request', content)
281 282 self._queue_request(msg)
282 283 return msg['header']['msg_id']
283 284
284 285 def history(self, index=None, raw=False, output=True):
285 286 """Get the history list.
286 287
287 288 Parameters
288 289 ----------
289 290 index : n or (n1, n2) or None
290 291 If n, then the last entries. If a tuple, then all in
291 292 range(n1, n2). If None, then all entries. Raises IndexError if
292 293 the format of index is incorrect.
293 294 raw : bool
294 295 If True, return the raw input.
295 296 output : bool
296 297 If True, then return the output as well.
297 298
298 299 Returns
299 300 -------
300 301 The msg_id of the message sent.
301 302 """
302 303 content = dict(index=index, raw=raw, output=output)
303 304 msg = self.session.msg('history_request', content)
304 305 self._queue_request(msg)
305 306 return msg['header']['msg_id']
306 307
307 308 def shutdown(self):
308 309 """Request an immediate kernel shutdown.
309 310
310 311 Upon receipt of the (empty) reply, client code can safely assume that
311 312 the kernel has shut down and it's safe to forcefully terminate it if
312 313 it's still alive.
313 314
314 315 The kernel will send the reply via a function registered with Python's
315 316 atexit module, ensuring it's truly done as the kernel is done with all
316 317 normal operation.
317 318 """
318 319 # Send quit message to kernel. Once we implement kernel-side setattr,
319 320 # this should probably be done that way, but for now this will do.
320 321 msg = self.session.msg('shutdown_request', {})
321 322 self._queue_request(msg)
322 323 return msg['header']['msg_id']
323 324
324 325 def _handle_events(self, socket, events):
325 326 if events & POLLERR:
326 327 self._handle_err()
327 328 if events & POLLOUT:
328 329 self._handle_send()
329 330 if events & POLLIN:
330 331 self._handle_recv()
331 332
332 333 def _handle_recv(self):
333 334 msg = self.socket.recv_json()
334 335 self.call_handlers(msg)
335 336
336 337 def _handle_send(self):
337 338 try:
338 339 msg = self.command_queue.get(False)
339 340 except Empty:
340 341 pass
341 342 else:
342 343 self.socket.send_json(msg)
343 344 if self.command_queue.empty():
344 345 self.drop_io_state(POLLOUT)
345 346
346 347 def _handle_err(self):
347 348 # We don't want to let this go silently, so eventually we should log.
348 349 raise zmq.ZMQError()
349 350
350 351 def _queue_request(self, msg):
351 352 self.command_queue.put(msg)
352 353 self.add_io_state(POLLOUT)
353 354
354 355
355 356 class SubSocketChannel(ZmqSocketChannel):
356 357 """The SUB channel which listens for messages that the kernel publishes.
357 358 """
358 359
359 360 def __init__(self, context, session, address):
360 361 super(SubSocketChannel, self).__init__(context, session, address)
361 362
362 363 def run(self):
363 364 """The thread's main activity. Call start() instead."""
364 365 self.socket = self.context.socket(zmq.SUB)
365 366 self.socket.setsockopt(zmq.SUBSCRIBE,'')
366 367 self.socket.setsockopt(zmq.IDENTITY, self.session.session)
367 368 self.socket.connect('tcp://%s:%i' % self.address)
368 369 self.ioloop = ioloop.IOLoop()
369 370 self.iostate = POLLIN|POLLERR
370 371 self.ioloop.add_handler(self.socket, self._handle_events,
371 372 self.iostate)
372 373 self.ioloop.start()
373 374
374 375 def stop(self):
375 376 self.ioloop.stop()
376 377 super(SubSocketChannel, self).stop()
377 378
378 379 def call_handlers(self, msg):
379 380 """This method is called in the ioloop thread when a message arrives.
380 381
381 382 Subclasses should override this method to handle incoming messages.
382 383 It is important to remember that this method is called in the thread
383 384 so that some logic must be done to ensure that the application leve
384 385 handlers are called in the application thread.
385 386 """
386 387 raise NotImplementedError('call_handlers must be defined in a subclass.')
387 388
388 389 def flush(self, timeout=1.0):
389 390 """Immediately processes all pending messages on the SUB channel.
390 391
391 392 Callers should use this method to ensure that :method:`call_handlers`
392 393 has been called for all messages that have been received on the
393 394 0MQ SUB socket of this channel.
394 395
395 396 This method is thread safe.
396 397
397 398 Parameters
398 399 ----------
399 400 timeout : float, optional
400 401 The maximum amount of time to spend flushing, in seconds. The
401 402 default is one second.
402 403 """
403 404 # We do the IOLoop callback process twice to ensure that the IOLoop
404 405 # gets to perform at least one full poll.
405 406 stop_time = time.time() + timeout
406 407 for i in xrange(2):
407 408 self._flushed = False
408 409 self.ioloop.add_callback(self._flush)
409 410 while not self._flushed and time.time() < stop_time:
410 411 time.sleep(0.01)
411 412
412 413 def _handle_events(self, socket, events):
413 414 # Turn on and off POLLOUT depending on if we have made a request
414 415 if events & POLLERR:
415 416 self._handle_err()
416 417 if events & POLLIN:
417 418 self._handle_recv()
418 419
419 420 def _handle_err(self):
420 421 # We don't want to let this go silently, so eventually we should log.
421 422 raise zmq.ZMQError()
422 423
423 424 def _handle_recv(self):
424 425 # Get all of the messages we can
425 426 while True:
426 427 try:
427 428 msg = self.socket.recv_json(zmq.NOBLOCK)
428 429 except zmq.ZMQError:
429 430 # Check the errno?
430 431 # Will this trigger POLLERR?
431 432 break
432 433 else:
433 434 self.call_handlers(msg)
434 435
435 436 def _flush(self):
436 437 """Callback for :method:`self.flush`."""
437 438 self._flushed = True
438 439
439 440
440 441 class RepSocketChannel(ZmqSocketChannel):
441 442 """A reply channel to handle raw_input requests that the kernel makes."""
442 443
443 444 msg_queue = None
444 445
445 446 def __init__(self, context, session, address):
446 447 self.msg_queue = Queue()
447 448 super(RepSocketChannel, self).__init__(context, session, address)
448 449
449 450 def run(self):
450 451 """The thread's main activity. Call start() instead."""
451 452 self.socket = self.context.socket(zmq.XREQ)
452 453 self.socket.setsockopt(zmq.IDENTITY, self.session.session)
453 454 self.socket.connect('tcp://%s:%i' % self.address)
454 455 self.ioloop = ioloop.IOLoop()
455 456 self.iostate = POLLERR|POLLIN
456 457 self.ioloop.add_handler(self.socket, self._handle_events,
457 458 self.iostate)
458 459 self.ioloop.start()
459 460
460 461 def stop(self):
461 462 self.ioloop.stop()
462 463 super(RepSocketChannel, self).stop()
463 464
464 465 def call_handlers(self, msg):
465 466 """This method is called in the ioloop thread when a message arrives.
466 467
467 468 Subclasses should override this method to handle incoming messages.
468 469 It is important to remember that this method is called in the thread
469 470 so that some logic must be done to ensure that the application leve
470 471 handlers are called in the application thread.
471 472 """
472 473 raise NotImplementedError('call_handlers must be defined in a subclass.')
473 474
474 475 def input(self, string):
475 476 """Send a string of raw input to the kernel."""
476 477 content = dict(value=string)
477 478 msg = self.session.msg('input_reply', content)
478 479 self._queue_reply(msg)
479 480
480 481 def _handle_events(self, socket, events):
481 482 if events & POLLERR:
482 483 self._handle_err()
483 484 if events & POLLOUT:
484 485 self._handle_send()
485 486 if events & POLLIN:
486 487 self._handle_recv()
487 488
488 489 def _handle_recv(self):
489 490 msg = self.socket.recv_json()
490 491 self.call_handlers(msg)
491 492
492 493 def _handle_send(self):
493 494 try:
494 495 msg = self.msg_queue.get(False)
495 496 except Empty:
496 497 pass
497 498 else:
498 499 self.socket.send_json(msg)
499 500 if self.msg_queue.empty():
500 501 self.drop_io_state(POLLOUT)
501 502
502 503 def _handle_err(self):
503 504 # We don't want to let this go silently, so eventually we should log.
504 505 raise zmq.ZMQError()
505 506
506 507 def _queue_reply(self, msg):
507 508 self.msg_queue.put(msg)
508 509 self.add_io_state(POLLOUT)
509 510
510 511
511 512 class HBSocketChannel(ZmqSocketChannel):
512 513 """The heartbeat channel which monitors the kernel heartbeat."""
513 514
514 515 time_to_dead = 3.0
515 516 socket = None
516 517 poller = None
517 518
518 519 def __init__(self, context, session, address):
519 520 super(HBSocketChannel, self).__init__(context, session, address)
520 521 self._running = False
521 522
522 523 def _create_socket(self):
523 524 self.socket = self.context.socket(zmq.REQ)
524 525 self.socket.setsockopt(zmq.IDENTITY, self.session.session)
525 526 self.socket.connect('tcp://%s:%i' % self.address)
526 527 self.poller = zmq.Poller()
527 528 self.poller.register(self.socket, zmq.POLLIN)
528 529
529 530 def run(self):
530 531 """The thread's main activity. Call start() instead."""
531 532 self._create_socket()
532 533 self._running = True
533 534 # Wait 2 seconds for the kernel to come up and the sockets to auto
534 535 # connect. If we don't we will see the kernel as dead. Also, before
535 536 # the sockets are connected, the poller.poll line below is returning
536 537 # too fast. This avoids that because the polling doesn't start until
537 538 # after the sockets are connected.
538 539 time.sleep(2.0)
539 540 while self._running:
540 541 since_last_heartbeat = 0.0
541 542 request_time = time.time()
542 543 try:
543 544 #io.rprint('Ping from HB channel') # dbg
544 545 self.socket.send_json('ping')
545 546 except zmq.ZMQError, e:
546 547 #io.rprint('*** HB Error:', e) # dbg
547 548 if e.errno == zmq.EFSM:
548 549 #io.rprint('sleep...', self.time_to_dead) # dbg
549 550 time.sleep(self.time_to_dead)
550 551 self._create_socket()
551 552 else:
552 553 raise
553 554 else:
554 555 while True:
555 556 try:
556 557 self.socket.recv_json(zmq.NOBLOCK)
557 558 except zmq.ZMQError, e:
558 559 #io.rprint('*** HB Error 2:', e) # dbg
559 560 if e.errno == zmq.EAGAIN:
560 561 before_poll = time.time()
561 562 until_dead = self.time_to_dead - (before_poll -
562 563 request_time)
563 564
564 565 # When the return value of poll() is an empty list,
565 566 # that is when things have gone wrong (zeromq bug).
566 567 # As long as it is not an empty list, poll is
567 568 # working correctly even if it returns quickly.
568 569 # Note: poll timeout is in milliseconds.
569 570 self.poller.poll(1000*until_dead)
570 571
571 572 since_last_heartbeat = time.time() - request_time
572 573 if since_last_heartbeat > self.time_to_dead:
573 574 self.call_handlers(since_last_heartbeat)
574 575 break
575 576 else:
576 577 # FIXME: We should probably log this instead.
577 578 raise
578 579 else:
579 580 until_dead = self.time_to_dead - (time.time() -
580 581 request_time)
581 582 if until_dead > 0.0:
582 583 #io.rprint('sleep...', self.time_to_dead) # dbg
583 584 time.sleep(until_dead)
584 585 break
585 586
586 587 def stop(self):
587 588 self._running = False
588 589 super(HBSocketChannel, self).stop()
589 590
590 591 def call_handlers(self, since_last_heartbeat):
591 592 """This method is called in the ioloop thread when a message arrives.
592 593
593 594 Subclasses should override this method to handle incoming messages.
594 595 It is important to remember that this method is called in the thread
595 596 so that some logic must be done to ensure that the application leve
596 597 handlers are called in the application thread.
597 598 """
598 599 raise NotImplementedError('call_handlers must be defined in a subclass.')
599 600
600 601
601 602 #-----------------------------------------------------------------------------
602 603 # Main kernel manager class
603 604 #-----------------------------------------------------------------------------
604 605
605 606 class KernelManager(HasTraits):
606 607 """ Manages a kernel for a frontend.
607 608
608 609 The SUB channel is for the frontend to receive messages published by the
609 610 kernel.
610 611
611 612 The REQ channel is for the frontend to make requests of the kernel.
612 613
613 614 The REP channel is for the kernel to request stdin (raw_input) from the
614 615 frontend.
615 616 """
616 617 # The PyZMQ Context to use for communication with the kernel.
617 618 context = Instance(zmq.Context,(),{})
618 619
619 620 # The Session to use for communication with the kernel.
620 621 session = Instance(Session,(),{})
621 622
622 623 # The kernel process with which the KernelManager is communicating.
623 624 kernel = Instance(Popen)
624 625
625 626 # The addresses for the communication channels.
626 627 xreq_address = TCPAddress((LOCALHOST, 0))
627 628 sub_address = TCPAddress((LOCALHOST, 0))
628 629 rep_address = TCPAddress((LOCALHOST, 0))
629 630 hb_address = TCPAddress((LOCALHOST, 0))
630 631
631 632 # The classes to use for the various channels.
632 633 xreq_channel_class = Type(XReqSocketChannel)
633 634 sub_channel_class = Type(SubSocketChannel)
634 635 rep_channel_class = Type(RepSocketChannel)
635 636 hb_channel_class = Type(HBSocketChannel)
636 637
637 638 # Protected traits.
638 639 _launch_args = Any
639 640 _xreq_channel = Any
640 641 _sub_channel = Any
641 642 _rep_channel = Any
642 643 _hb_channel = Any
643 644
644 645 #--------------------------------------------------------------------------
645 646 # Channel management methods:
646 647 #--------------------------------------------------------------------------
647 648
648 649 def start_channels(self, xreq=True, sub=True, rep=True, hb=True):
649 650 """Starts the channels for this kernel.
650 651
651 652 This will create the channels if they do not exist and then start
652 653 them. If port numbers of 0 are being used (random ports) then you
653 654 must first call :method:`start_kernel`. If the channels have been
654 655 stopped and you call this, :class:`RuntimeError` will be raised.
655 656 """
656 657 if xreq:
657 658 self.xreq_channel.start()
658 659 if sub:
659 660 self.sub_channel.start()
660 661 if rep:
661 662 self.rep_channel.start()
662 663 if hb:
663 664 self.hb_channel.start()
664 665
665 666 def stop_channels(self):
666 667 """Stops all the running channels for this kernel.
667 668 """
668 669 if self.xreq_channel.is_alive():
669 670 self.xreq_channel.stop()
670 671 if self.sub_channel.is_alive():
671 672 self.sub_channel.stop()
672 673 if self.rep_channel.is_alive():
673 674 self.rep_channel.stop()
674 675 if self.hb_channel.is_alive():
675 676 self.hb_channel.stop()
676 677
677 678 @property
678 679 def channels_running(self):
679 680 """Are any of the channels created and running?"""
680 681 return self.xreq_channel.is_alive() \
681 682 or self.sub_channel.is_alive() \
682 683 or self.rep_channel.is_alive() \
683 684 or self.hb_channel.is_alive()
684 685
685 686 #--------------------------------------------------------------------------
686 687 # Kernel process management methods:
687 688 #--------------------------------------------------------------------------
688 689
689 690 def start_kernel(self, **kw):
690 691 """Starts a kernel process and configures the manager to use it.
691 692
692 693 If random ports (port=0) are being used, this method must be called
693 694 before the channels are created.
694 695
695 696 Parameters:
696 697 -----------
697 698 ipython : bool, optional (default True)
698 699 Whether to use an IPython kernel instead of a plain Python kernel.
699 700 """
700 701 xreq, sub, rep, hb = self.xreq_address, self.sub_address, \
701 702 self.rep_address, self.hb_address
702 703 if xreq[0] != LOCALHOST or sub[0] != LOCALHOST or \
703 704 rep[0] != LOCALHOST or hb[0] != LOCALHOST:
704 705 raise RuntimeError("Can only launch a kernel on localhost."
705 706 "Make sure that the '*_address' attributes are "
706 707 "configured properly.")
707 708
708 709 self._launch_args = kw.copy()
709 710 if kw.pop('ipython', True):
710 711 from ipkernel import launch_kernel
711 712 else:
712 713 from pykernel import launch_kernel
713 714 self.kernel, xrep, pub, req, hb = launch_kernel(
714 715 xrep_port=xreq[1], pub_port=sub[1],
715 716 req_port=rep[1], hb_port=hb[1], **kw)
716 717 self.xreq_address = (LOCALHOST, xrep)
717 718 self.sub_address = (LOCALHOST, pub)
718 719 self.rep_address = (LOCALHOST, req)
719 720 self.hb_address = (LOCALHOST, hb)
720 721
721 722 def shutdown_kernel(self):
722 723 """ Attempts to the stop the kernel process cleanly. If the kernel
723 724 cannot be stopped, it is killed, if possible.
724 725 """
726 # FIXME: Shutdown does not work on Windows due to ZMQ errors!
727 if sys.platform == 'win32':
728 self.kill_kernel()
729 return
730
725 731 self.xreq_channel.shutdown()
726 732 # Don't send any additional kernel kill messages immediately, to give
727 733 # the kernel a chance to properly execute shutdown actions. Wait for at
728 734 # most 1s, checking every 0.1s.
729 735 for i in range(10):
730 736 if self.is_alive:
731 737 time.sleep(0.1)
732 738 else:
733 739 break
734 740 else:
735 741 # OK, we've waited long enough.
736 742 if self.has_kernel:
737 743 self.kill_kernel()
738 744
739 745 def restart_kernel(self, instant_death=False):
740 746 """Restarts a kernel with the same arguments that were used to launch
741 747 it. If the old kernel was launched with random ports, the same ports
742 748 will be used for the new kernel.
743 749
744 750 Parameters
745 751 ----------
746 752 instant_death : bool, optional
747 753 If True, the kernel is forcefully restarted *immediately*, without
748 754 having a chance to do any cleanup action. Otherwise the kernel is
749 755 given 1s to clean up before a forceful restart is issued.
750 756
751 757 In all cases the kernel is restarted, the only difference is whether
752 758 it is given a chance to perform a clean shutdown or not.
753 759 """
754 760 if self._launch_args is None:
755 761 raise RuntimeError("Cannot restart the kernel. "
756 762 "No previous call to 'start_kernel'.")
757 763 else:
758 764 if self.has_kernel:
759 765 if instant_death:
760 766 self.kill_kernel()
761 767 else:
762 768 self.shutdown_kernel()
763 769 self.start_kernel(**self._launch_args)
764 770
771 # FIXME: Messages get dropped in Windows due to probable ZMQ bug
772 # unless there is some delay here.
773 if sys.platform == 'win32':
774 time.sleep(0.2)
775
765 776 @property
766 777 def has_kernel(self):
767 778 """Returns whether a kernel process has been specified for the kernel
768 779 manager.
769 780 """
770 781 return self.kernel is not None
771 782
772 783 def kill_kernel(self):
773 784 """ Kill the running kernel. """
774 785 if self.kernel is not None:
775 786 self.kernel.kill()
776 787 self.kernel = None
777 788 else:
778 789 raise RuntimeError("Cannot kill kernel. No kernel is running!")
779 790
780 791 def signal_kernel(self, signum):
781 792 """ Sends a signal to the kernel. """
782 793 if self.kernel is not None:
783 794 self.kernel.send_signal(signum)
784 795 else:
785 796 raise RuntimeError("Cannot signal kernel. No kernel is running!")
786 797
787 798 @property
788 799 def is_alive(self):
789 800 """Is the kernel process still running?"""
790 801 # FIXME: not using a heartbeat means this method is broken for any
791 802 # remote kernel, it's only capable of handling local kernels.
792 803 if self.kernel is not None:
793 804 if self.kernel.poll() is None:
794 805 return True
795 806 else:
796 807 return False
797 808 else:
798 809 # We didn't start the kernel with this KernelManager so we don't
799 810 # know if it is running. We should use a heartbeat for this case.
800 811 return True
801 812
802 813 #--------------------------------------------------------------------------
803 814 # Channels used for communication with the kernel:
804 815 #--------------------------------------------------------------------------
805 816
806 817 @property
807 818 def xreq_channel(self):
808 819 """Get the REQ socket channel object to make requests of the kernel."""
809 820 if self._xreq_channel is None:
810 821 self._xreq_channel = self.xreq_channel_class(self.context,
811 822 self.session,
812 823 self.xreq_address)
813 824 return self._xreq_channel
814 825
815 826 @property
816 827 def sub_channel(self):
817 828 """Get the SUB socket channel object."""
818 829 if self._sub_channel is None:
819 830 self._sub_channel = self.sub_channel_class(self.context,
820 831 self.session,
821 832 self.sub_address)
822 833 return self._sub_channel
823 834
824 835 @property
825 836 def rep_channel(self):
826 837 """Get the REP socket channel object to handle stdin (raw_input)."""
827 838 if self._rep_channel is None:
828 839 self._rep_channel = self.rep_channel_class(self.context,
829 840 self.session,
830 841 self.rep_address)
831 842 return self._rep_channel
832 843
833 844 @property
834 845 def hb_channel(self):
835 846 """Get the REP socket channel object to handle stdin (raw_input)."""
836 847 if self._hb_channel is None:
837 848 self._hb_channel = self.hb_channel_class(self.context,
838 849 self.session,
839 850 self.hb_address)
840 851 return self._hb_channel
General Comments 0
You need to be logged in to leave comments. Login now