##// END OF EJS Templates
set linger on every socket I can find...
MinRK -
Show More
@@ -1,634 +1,637 b''
1 1 """Base classes to manage a Client's interaction with a running kernel
2 2 """
3 3
4 4 #-----------------------------------------------------------------------------
5 5 # Copyright (C) 2013 The IPython Development Team
6 6 #
7 7 # Distributed under the terms of the BSD License. The full license is in
8 8 # the file COPYING, distributed as part of this software.
9 9 #-----------------------------------------------------------------------------
10 10
11 11 #-----------------------------------------------------------------------------
12 12 # Imports
13 13 #-----------------------------------------------------------------------------
14 14
15 15 from __future__ import absolute_import
16 16
17 17 # Standard library imports
18 18 import atexit
19 19 import errno
20 20 from threading import Thread
21 21 import time
22 22
23 23 import zmq
24 24 # import ZMQError in top-level namespace, to avoid ugly attribute-error messages
25 25 # during garbage collection of threads at exit:
26 26 from zmq import ZMQError
27 27 from zmq.eventloop import ioloop, zmqstream
28 28
29 29 # Local imports
30 30 from .channelsabc import (
31 31 ShellChannelABC, IOPubChannelABC,
32 32 HBChannelABC, StdInChannelABC,
33 33 )
34 34 from IPython.utils.py3compat import string_types, iteritems
35 35
36 36 #-----------------------------------------------------------------------------
37 37 # Constants and exceptions
38 38 #-----------------------------------------------------------------------------
39 39
40 40 class InvalidPortNumber(Exception):
41 41 pass
42 42
43 43 #-----------------------------------------------------------------------------
44 44 # Utility functions
45 45 #-----------------------------------------------------------------------------
46 46
47 47 # some utilities to validate message structure, these might get moved elsewhere
48 48 # if they prove to have more generic utility
49 49
50 50 def validate_string_list(lst):
51 51 """Validate that the input is a list of strings.
52 52
53 53 Raises ValueError if not."""
54 54 if not isinstance(lst, list):
55 55 raise ValueError('input %r must be a list' % lst)
56 56 for x in lst:
57 57 if not isinstance(x, string_types):
58 58 raise ValueError('element %r in list must be a string' % x)
59 59
60 60
61 61 def validate_string_dict(dct):
62 62 """Validate that the input is a dict with string keys and values.
63 63
64 64 Raises ValueError if not."""
65 65 for k,v in iteritems(dct):
66 66 if not isinstance(k, string_types):
67 67 raise ValueError('key %r in dict must be a string' % k)
68 68 if not isinstance(v, string_types):
69 69 raise ValueError('value %r in dict must be a string' % v)
70 70
71 71
72 72 #-----------------------------------------------------------------------------
73 73 # ZMQ Socket Channel classes
74 74 #-----------------------------------------------------------------------------
75 75
76 76 class ZMQSocketChannel(Thread):
77 77 """The base class for the channels that use ZMQ sockets."""
78 78 context = None
79 79 session = None
80 80 socket = None
81 81 ioloop = None
82 82 stream = None
83 83 _address = None
84 84 _exiting = False
85 85 proxy_methods = []
86 86
87 87 def __init__(self, context, session, address):
88 88 """Create a channel.
89 89
90 90 Parameters
91 91 ----------
92 92 context : :class:`zmq.Context`
93 93 The ZMQ context to use.
94 94 session : :class:`session.Session`
95 95 The session to use.
96 96 address : zmq url
97 97 Standard (ip, port) tuple that the kernel is listening on.
98 98 """
99 99 super(ZMQSocketChannel, self).__init__()
100 100 self.daemon = True
101 101
102 102 self.context = context
103 103 self.session = session
104 104 if isinstance(address, tuple):
105 105 if address[1] == 0:
106 106 message = 'The port number for a channel cannot be 0.'
107 107 raise InvalidPortNumber(message)
108 108 address = "tcp://%s:%i" % address
109 109 self._address = address
110 110 atexit.register(self._notice_exit)
111 111
112 112 def _notice_exit(self):
113 113 self._exiting = True
114 114
115 115 def _run_loop(self):
116 116 """Run my loop, ignoring EINTR events in the poller"""
117 117 while True:
118 118 try:
119 119 self.ioloop.start()
120 120 except ZMQError as e:
121 121 if e.errno == errno.EINTR:
122 122 continue
123 123 else:
124 124 raise
125 125 except Exception:
126 126 if self._exiting:
127 127 break
128 128 else:
129 129 raise
130 130 else:
131 131 break
132 132
133 133 def stop(self):
134 134 """Stop the channel's event loop and join its thread.
135 135
136 136 This calls :meth:`~threading.Thread.join` and returns when the thread
137 137 terminates. :class:`RuntimeError` will be raised if
138 138 :meth:`~threading.Thread.start` is called again.
139 139 """
140 140 if self.ioloop is not None:
141 141 self.ioloop.stop()
142 142 self.join()
143 143 self.close()
144 144
145 145 def close(self):
146 146 if self.ioloop is not None:
147 147 try:
148 148 self.ioloop.close(all_fds=True)
149 149 except Exception:
150 150 pass
151 151 if self.socket is not None:
152 152 try:
153 153 self.socket.close(linger=0)
154 154 except Exception:
155 155 pass
156 156 self.socket = None
157 157
158 158 @property
159 159 def address(self):
160 160 """Get the channel's address as a zmq url string.
161 161
162 162 These URLS have the form: 'tcp://127.0.0.1:5555'.
163 163 """
164 164 return self._address
165 165
166 166 def _queue_send(self, msg):
167 167 """Queue a message to be sent from the IOLoop's thread.
168 168
169 169 Parameters
170 170 ----------
171 171 msg : message to send
172 172
173 173 This is threadsafe, as it uses IOLoop.add_callback to give the loop's
174 174 thread control of the action.
175 175 """
176 176 def thread_send():
177 177 self.session.send(self.stream, msg)
178 178 self.ioloop.add_callback(thread_send)
179 179
180 180 def _handle_recv(self, msg):
181 181 """Callback for stream.on_recv.
182 182
183 183 Unpacks message, and calls handlers with it.
184 184 """
185 185 ident,smsg = self.session.feed_identities(msg)
186 186 self.call_handlers(self.session.unserialize(smsg))
187 187
188 188
189 189
190 190 class ShellChannel(ZMQSocketChannel):
191 191 """The shell channel for issuing request/replies to the kernel."""
192 192
193 193 command_queue = None
194 194 # flag for whether execute requests should be allowed to call raw_input:
195 195 allow_stdin = True
196 196 proxy_methods = [
197 197 'execute',
198 198 'complete',
199 199 'object_info',
200 200 'history',
201 201 'kernel_info',
202 202 'shutdown',
203 203 ]
204 204
205 205 def __init__(self, context, session, address):
206 206 super(ShellChannel, self).__init__(context, session, address)
207 207 self.ioloop = ioloop.IOLoop()
208 208
209 209 def run(self):
210 210 """The thread's main activity. Call start() instead."""
211 211 self.socket = self.context.socket(zmq.DEALER)
212 self.socket.linger = 1000
212 213 self.socket.setsockopt(zmq.IDENTITY, self.session.bsession)
213 214 self.socket.connect(self.address)
214 215 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
215 216 self.stream.on_recv(self._handle_recv)
216 217 self._run_loop()
217 218
218 219 def call_handlers(self, msg):
219 220 """This method is called in the ioloop thread when a message arrives.
220 221
221 222 Subclasses should override this method to handle incoming messages.
222 223 It is important to remember that this method is called in the thread
223 224 so that some logic must be done to ensure that the application level
224 225 handlers are called in the application thread.
225 226 """
226 227 raise NotImplementedError('call_handlers must be defined in a subclass.')
227 228
228 229 def execute(self, code, silent=False, store_history=True,
229 230 user_variables=None, user_expressions=None, allow_stdin=None):
230 231 """Execute code in the kernel.
231 232
232 233 Parameters
233 234 ----------
234 235 code : str
235 236 A string of Python code.
236 237
237 238 silent : bool, optional (default False)
238 239 If set, the kernel will execute the code as quietly possible, and
239 240 will force store_history to be False.
240 241
241 242 store_history : bool, optional (default True)
242 243 If set, the kernel will store command history. This is forced
243 244 to be False if silent is True.
244 245
245 246 user_variables : list, optional
246 247 A list of variable names to pull from the user's namespace. They
247 248 will come back as a dict with these names as keys and their
248 249 :func:`repr` as values.
249 250
250 251 user_expressions : dict, optional
251 252 A dict mapping names to expressions to be evaluated in the user's
252 253 dict. The expression values are returned as strings formatted using
253 254 :func:`repr`.
254 255
255 256 allow_stdin : bool, optional (default self.allow_stdin)
256 257 Flag for whether the kernel can send stdin requests to frontends.
257 258
258 259 Some frontends (e.g. the Notebook) do not support stdin requests.
259 260 If raw_input is called from code executed from such a frontend, a
260 261 StdinNotImplementedError will be raised.
261 262
262 263 Returns
263 264 -------
264 265 The msg_id of the message sent.
265 266 """
266 267 if user_variables is None:
267 268 user_variables = []
268 269 if user_expressions is None:
269 270 user_expressions = {}
270 271 if allow_stdin is None:
271 272 allow_stdin = self.allow_stdin
272 273
273 274
274 275 # Don't waste network traffic if inputs are invalid
275 276 if not isinstance(code, string_types):
276 277 raise ValueError('code %r must be a string' % code)
277 278 validate_string_list(user_variables)
278 279 validate_string_dict(user_expressions)
279 280
280 281 # Create class for content/msg creation. Related to, but possibly
281 282 # not in Session.
282 283 content = dict(code=code, silent=silent, store_history=store_history,
283 284 user_variables=user_variables,
284 285 user_expressions=user_expressions,
285 286 allow_stdin=allow_stdin,
286 287 )
287 288 msg = self.session.msg('execute_request', content)
288 289 self._queue_send(msg)
289 290 return msg['header']['msg_id']
290 291
291 292 def complete(self, text, line, cursor_pos, block=None):
292 293 """Tab complete text in the kernel's namespace.
293 294
294 295 Parameters
295 296 ----------
296 297 text : str
297 298 The text to complete.
298 299 line : str
299 300 The full line of text that is the surrounding context for the
300 301 text to complete.
301 302 cursor_pos : int
302 303 The position of the cursor in the line where the completion was
303 304 requested.
304 305 block : str, optional
305 306 The full block of code in which the completion is being requested.
306 307
307 308 Returns
308 309 -------
309 310 The msg_id of the message sent.
310 311 """
311 312 content = dict(text=text, line=line, block=block, cursor_pos=cursor_pos)
312 313 msg = self.session.msg('complete_request', content)
313 314 self._queue_send(msg)
314 315 return msg['header']['msg_id']
315 316
316 317 def object_info(self, oname, detail_level=0):
317 318 """Get metadata information about an object in the kernel's namespace.
318 319
319 320 Parameters
320 321 ----------
321 322 oname : str
322 323 A string specifying the object name.
323 324 detail_level : int, optional
324 325 The level of detail for the introspection (0-2)
325 326
326 327 Returns
327 328 -------
328 329 The msg_id of the message sent.
329 330 """
330 331 content = dict(oname=oname, detail_level=detail_level)
331 332 msg = self.session.msg('object_info_request', content)
332 333 self._queue_send(msg)
333 334 return msg['header']['msg_id']
334 335
335 336 def history(self, raw=True, output=False, hist_access_type='range', **kwargs):
336 337 """Get entries from the kernel's history list.
337 338
338 339 Parameters
339 340 ----------
340 341 raw : bool
341 342 If True, return the raw input.
342 343 output : bool
343 344 If True, then return the output as well.
344 345 hist_access_type : str
345 346 'range' (fill in session, start and stop params), 'tail' (fill in n)
346 347 or 'search' (fill in pattern param).
347 348
348 349 session : int
349 350 For a range request, the session from which to get lines. Session
350 351 numbers are positive integers; negative ones count back from the
351 352 current session.
352 353 start : int
353 354 The first line number of a history range.
354 355 stop : int
355 356 The final (excluded) line number of a history range.
356 357
357 358 n : int
358 359 The number of lines of history to get for a tail request.
359 360
360 361 pattern : str
361 362 The glob-syntax pattern for a search request.
362 363
363 364 Returns
364 365 -------
365 366 The msg_id of the message sent.
366 367 """
367 368 content = dict(raw=raw, output=output, hist_access_type=hist_access_type,
368 369 **kwargs)
369 370 msg = self.session.msg('history_request', content)
370 371 self._queue_send(msg)
371 372 return msg['header']['msg_id']
372 373
373 374 def kernel_info(self):
374 375 """Request kernel info."""
375 376 msg = self.session.msg('kernel_info_request')
376 377 self._queue_send(msg)
377 378 return msg['header']['msg_id']
378 379
379 380 def shutdown(self, restart=False):
380 381 """Request an immediate kernel shutdown.
381 382
382 383 Upon receipt of the (empty) reply, client code can safely assume that
383 384 the kernel has shut down and it's safe to forcefully terminate it if
384 385 it's still alive.
385 386
386 387 The kernel will send the reply via a function registered with Python's
387 388 atexit module, ensuring it's truly done as the kernel is done with all
388 389 normal operation.
389 390 """
390 391 # Send quit message to kernel. Once we implement kernel-side setattr,
391 392 # this should probably be done that way, but for now this will do.
392 393 msg = self.session.msg('shutdown_request', {'restart':restart})
393 394 self._queue_send(msg)
394 395 return msg['header']['msg_id']
395 396
396 397
397 398
398 399 class IOPubChannel(ZMQSocketChannel):
399 400 """The iopub channel which listens for messages that the kernel publishes.
400 401
401 402 This channel is where all output is published to frontends.
402 403 """
403 404
404 405 def __init__(self, context, session, address):
405 406 super(IOPubChannel, self).__init__(context, session, address)
406 407 self.ioloop = ioloop.IOLoop()
407 408
408 409 def run(self):
409 410 """The thread's main activity. Call start() instead."""
410 411 self.socket = self.context.socket(zmq.SUB)
412 self.socket.linger = 1000
411 413 self.socket.setsockopt(zmq.SUBSCRIBE,b'')
412 414 self.socket.setsockopt(zmq.IDENTITY, self.session.bsession)
413 415 self.socket.connect(self.address)
414 416 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
415 417 self.stream.on_recv(self._handle_recv)
416 418 self._run_loop()
417 419
418 420 def call_handlers(self, msg):
419 421 """This method is called in the ioloop thread when a message arrives.
420 422
421 423 Subclasses should override this method to handle incoming messages.
422 424 It is important to remember that this method is called in the thread
423 425 so that some logic must be done to ensure that the application leve
424 426 handlers are called in the application thread.
425 427 """
426 428 raise NotImplementedError('call_handlers must be defined in a subclass.')
427 429
428 430 def flush(self, timeout=1.0):
429 431 """Immediately processes all pending messages on the iopub channel.
430 432
431 433 Callers should use this method to ensure that :meth:`call_handlers`
432 434 has been called for all messages that have been received on the
433 435 0MQ SUB socket of this channel.
434 436
435 437 This method is thread safe.
436 438
437 439 Parameters
438 440 ----------
439 441 timeout : float, optional
440 442 The maximum amount of time to spend flushing, in seconds. The
441 443 default is one second.
442 444 """
443 445 # We do the IOLoop callback process twice to ensure that the IOLoop
444 446 # gets to perform at least one full poll.
445 447 stop_time = time.time() + timeout
446 448 for i in range(2):
447 449 self._flushed = False
448 450 self.ioloop.add_callback(self._flush)
449 451 while not self._flushed and time.time() < stop_time:
450 452 time.sleep(0.01)
451 453
452 454 def _flush(self):
453 455 """Callback for :method:`self.flush`."""
454 456 self.stream.flush()
455 457 self._flushed = True
456 458
457 459
458 460 class StdInChannel(ZMQSocketChannel):
459 461 """The stdin channel to handle raw_input requests that the kernel makes."""
460 462
461 463 msg_queue = None
462 464 proxy_methods = ['input']
463 465
464 466 def __init__(self, context, session, address):
465 467 super(StdInChannel, self).__init__(context, session, address)
466 468 self.ioloop = ioloop.IOLoop()
467 469
468 470 def run(self):
469 471 """The thread's main activity. Call start() instead."""
470 472 self.socket = self.context.socket(zmq.DEALER)
473 self.socket.linger = 1000
471 474 self.socket.setsockopt(zmq.IDENTITY, self.session.bsession)
472 475 self.socket.connect(self.address)
473 476 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
474 477 self.stream.on_recv(self._handle_recv)
475 478 self._run_loop()
476 479
477 480 def call_handlers(self, msg):
478 481 """This method is called in the ioloop thread when a message arrives.
479 482
480 483 Subclasses should override this method to handle incoming messages.
481 484 It is important to remember that this method is called in the thread
482 485 so that some logic must be done to ensure that the application leve
483 486 handlers are called in the application thread.
484 487 """
485 488 raise NotImplementedError('call_handlers must be defined in a subclass.')
486 489
487 490 def input(self, string):
488 491 """Send a string of raw input to the kernel."""
489 492 content = dict(value=string)
490 493 msg = self.session.msg('input_reply', content)
491 494 self._queue_send(msg)
492 495
493 496
494 497 class HBChannel(ZMQSocketChannel):
495 498 """The heartbeat channel which monitors the kernel heartbeat.
496 499
497 500 Note that the heartbeat channel is paused by default. As long as you start
498 501 this channel, the kernel manager will ensure that it is paused and un-paused
499 502 as appropriate.
500 503 """
501 504
502 505 time_to_dead = 3.0
503 506 socket = None
504 507 poller = None
505 508 _running = None
506 509 _pause = None
507 510 _beating = None
508 511
509 512 def __init__(self, context, session, address):
510 513 super(HBChannel, self).__init__(context, session, address)
511 514 self._running = False
512 515 self._pause =True
513 516 self.poller = zmq.Poller()
514 517
515 518 def _create_socket(self):
516 519 if self.socket is not None:
517 520 # close previous socket, before opening a new one
518 521 self.poller.unregister(self.socket)
519 522 self.socket.close()
520 523 self.socket = self.context.socket(zmq.REQ)
521 self.socket.setsockopt(zmq.LINGER, 0)
524 self.socket.linger = 1000
522 525 self.socket.connect(self.address)
523 526
524 527 self.poller.register(self.socket, zmq.POLLIN)
525 528
526 529 def _poll(self, start_time):
527 530 """poll for heartbeat replies until we reach self.time_to_dead.
528 531
529 532 Ignores interrupts, and returns the result of poll(), which
530 533 will be an empty list if no messages arrived before the timeout,
531 534 or the event tuple if there is a message to receive.
532 535 """
533 536
534 537 until_dead = self.time_to_dead - (time.time() - start_time)
535 538 # ensure poll at least once
536 539 until_dead = max(until_dead, 1e-3)
537 540 events = []
538 541 while True:
539 542 try:
540 543 events = self.poller.poll(1000 * until_dead)
541 544 except ZMQError as e:
542 545 if e.errno == errno.EINTR:
543 546 # ignore interrupts during heartbeat
544 547 # this may never actually happen
545 548 until_dead = self.time_to_dead - (time.time() - start_time)
546 549 until_dead = max(until_dead, 1e-3)
547 550 pass
548 551 else:
549 552 raise
550 553 except Exception:
551 554 if self._exiting:
552 555 break
553 556 else:
554 557 raise
555 558 else:
556 559 break
557 560 return events
558 561
559 562 def run(self):
560 563 """The thread's main activity. Call start() instead."""
561 564 self._create_socket()
562 565 self._running = True
563 566 self._beating = True
564 567
565 568 while self._running:
566 569 if self._pause:
567 570 # just sleep, and skip the rest of the loop
568 571 time.sleep(self.time_to_dead)
569 572 continue
570 573
571 574 since_last_heartbeat = 0.0
572 575 # io.rprint('Ping from HB channel') # dbg
573 576 # no need to catch EFSM here, because the previous event was
574 577 # either a recv or connect, which cannot be followed by EFSM
575 578 self.socket.send(b'ping')
576 579 request_time = time.time()
577 580 ready = self._poll(request_time)
578 581 if ready:
579 582 self._beating = True
580 583 # the poll above guarantees we have something to recv
581 584 self.socket.recv()
582 585 # sleep the remainder of the cycle
583 586 remainder = self.time_to_dead - (time.time() - request_time)
584 587 if remainder > 0:
585 588 time.sleep(remainder)
586 589 continue
587 590 else:
588 591 # nothing was received within the time limit, signal heart failure
589 592 self._beating = False
590 593 since_last_heartbeat = time.time() - request_time
591 594 self.call_handlers(since_last_heartbeat)
592 595 # and close/reopen the socket, because the REQ/REP cycle has been broken
593 596 self._create_socket()
594 597 continue
595 598
596 599 def pause(self):
597 600 """Pause the heartbeat."""
598 601 self._pause = True
599 602
600 603 def unpause(self):
601 604 """Unpause the heartbeat."""
602 605 self._pause = False
603 606
604 607 def is_beating(self):
605 608 """Is the heartbeat running and responsive (and not paused)."""
606 609 if self.is_alive() and not self._pause and self._beating:
607 610 return True
608 611 else:
609 612 return False
610 613
611 614 def stop(self):
612 615 """Stop the channel's event loop and join its thread."""
613 616 self._running = False
614 617 super(HBChannel, self).stop()
615 618
616 619 def call_handlers(self, since_last_heartbeat):
617 620 """This method is called in the ioloop thread when a message arrives.
618 621
619 622 Subclasses should override this method to handle incoming messages.
620 623 It is important to remember that this method is called in the thread
621 624 so that some logic must be done to ensure that the application level
622 625 handlers are called in the application thread.
623 626 """
624 627 raise NotImplementedError('call_handlers must be defined in a subclass.')
625 628
626 629
627 630 #---------------------------------------------------------------------#-----------------------------------------------------------------------------
628 631 # ABC Registration
629 632 #-----------------------------------------------------------------------------
630 633
631 634 ShellChannelABC.register(ShellChannel)
632 635 IOPubChannelABC.register(IOPubChannel)
633 636 HBChannelABC.register(HBChannel)
634 637 StdInChannelABC.register(StdInChannel)
@@ -1,67 +1,68 b''
1 1 """The client and server for a basic ping-pong style heartbeat.
2 2 """
3 3
4 4 #-----------------------------------------------------------------------------
5 5 # Copyright (C) 2008-2011 The IPython Development Team
6 6 #
7 7 # Distributed under the terms of the BSD License. The full license is in
8 8 # the file COPYING, distributed as part of this software.
9 9 #-----------------------------------------------------------------------------
10 10
11 11 #-----------------------------------------------------------------------------
12 12 # Imports
13 13 #-----------------------------------------------------------------------------
14 14
15 15 import errno
16 16 import os
17 17 import socket
18 18 from threading import Thread
19 19
20 20 import zmq
21 21
22 22 from IPython.utils.localinterfaces import localhost
23 23
24 24 #-----------------------------------------------------------------------------
25 25 # Code
26 26 #-----------------------------------------------------------------------------
27 27
28 28
29 29 class Heartbeat(Thread):
30 30 "A simple ping-pong style heartbeat that runs in a thread."
31 31
32 32 def __init__(self, context, addr=None):
33 33 if addr is None:
34 34 addr = ('tcp', localhost(), 0)
35 35 Thread.__init__(self)
36 36 self.context = context
37 37 self.transport, self.ip, self.port = addr
38 38 if self.port == 0:
39 39 if addr[0] == 'tcp':
40 40 s = socket.socket()
41 41 # '*' means all interfaces to 0MQ, which is '' to socket.socket
42 42 s.bind(('' if self.ip == '*' else self.ip, 0))
43 43 self.port = s.getsockname()[1]
44 44 s.close()
45 45 elif addr[0] == 'ipc':
46 46 self.port = 1
47 47 while os.path.exists("%s-%s" % (self.ip, self.port)):
48 48 self.port = self.port + 1
49 49 else:
50 50 raise ValueError("Unrecognized zmq transport: %s" % addr[0])
51 51 self.addr = (self.ip, self.port)
52 52 self.daemon = True
53 53
54 54 def run(self):
55 55 self.socket = self.context.socket(zmq.REP)
56 self.socket.linger = 1000
56 57 c = ':' if self.transport == 'tcp' else '-'
57 58 self.socket.bind('%s://%s' % (self.transport, self.ip) + c + str(self.port))
58 59 while True:
59 60 try:
60 61 zmq.device(zmq.FORWARDER, self.socket, self.socket)
61 62 except zmq.ZMQError as e:
62 63 if e.errno == errno.EINTR:
63 64 continue
64 65 else:
65 66 raise
66 67 else:
67 68 break
@@ -1,473 +1,477 b''
1 1 """An Application for launching a kernel
2 2
3 3 Authors
4 4 -------
5 5 * MinRK
6 6 """
7 7 #-----------------------------------------------------------------------------
8 8 # Copyright (C) 2011 The IPython Development Team
9 9 #
10 10 # Distributed under the terms of the BSD License. The full license is in
11 11 # the file COPYING.txt, distributed as part of this software.
12 12 #-----------------------------------------------------------------------------
13 13
14 14 #-----------------------------------------------------------------------------
15 15 # Imports
16 16 #-----------------------------------------------------------------------------
17 17
18 18 from __future__ import print_function
19 19
20 20 # Standard library imports
21 21 import atexit
22 22 import json
23 23 import os
24 24 import sys
25 25 import signal
26 26
27 27 # System library imports
28 28 import zmq
29 29 from zmq.eventloop import ioloop
30 30 from zmq.eventloop.zmqstream import ZMQStream
31 31
32 32 # IPython imports
33 33 from IPython.core.ultratb import FormattedTB
34 34 from IPython.core.application import (
35 35 BaseIPythonApplication, base_flags, base_aliases, catch_config_error
36 36 )
37 37 from IPython.core.profiledir import ProfileDir
38 38 from IPython.core.shellapp import (
39 39 InteractiveShellApp, shell_flags, shell_aliases
40 40 )
41 41 from IPython.utils import io
42 42 from IPython.utils.localinterfaces import localhost
43 43 from IPython.utils.path import filefind
44 44 from IPython.utils.py3compat import str_to_bytes
45 45 from IPython.utils.traitlets import (
46 46 Any, Instance, Dict, Unicode, Integer, Bool, CaselessStrEnum,
47 47 DottedObjectName,
48 48 )
49 49 from IPython.utils.importstring import import_item
50 50 from IPython.kernel import write_connection_file
51 51
52 52 # local imports
53 53 from .heartbeat import Heartbeat
54 54 from .ipkernel import Kernel
55 55 from .parentpoller import ParentPollerUnix, ParentPollerWindows
56 56 from .session import (
57 57 Session, session_flags, session_aliases, default_secure,
58 58 )
59 59 from .zmqshell import ZMQInteractiveShell
60 60
61 61 #-----------------------------------------------------------------------------
62 62 # Flags and Aliases
63 63 #-----------------------------------------------------------------------------
64 64
65 65 kernel_aliases = dict(base_aliases)
66 66 kernel_aliases.update({
67 67 'ip' : 'IPKernelApp.ip',
68 68 'hb' : 'IPKernelApp.hb_port',
69 69 'shell' : 'IPKernelApp.shell_port',
70 70 'iopub' : 'IPKernelApp.iopub_port',
71 71 'stdin' : 'IPKernelApp.stdin_port',
72 72 'control' : 'IPKernelApp.control_port',
73 73 'f' : 'IPKernelApp.connection_file',
74 74 'parent': 'IPKernelApp.parent_handle',
75 75 'transport': 'IPKernelApp.transport',
76 76 })
77 77 if sys.platform.startswith('win'):
78 78 kernel_aliases['interrupt'] = 'IPKernelApp.interrupt'
79 79
80 80 kernel_flags = dict(base_flags)
81 81 kernel_flags.update({
82 82 'no-stdout' : (
83 83 {'IPKernelApp' : {'no_stdout' : True}},
84 84 "redirect stdout to the null device"),
85 85 'no-stderr' : (
86 86 {'IPKernelApp' : {'no_stderr' : True}},
87 87 "redirect stderr to the null device"),
88 88 'pylab' : (
89 89 {'IPKernelApp' : {'pylab' : 'auto'}},
90 90 """Pre-load matplotlib and numpy for interactive use with
91 91 the default matplotlib backend."""),
92 92 })
93 93
94 94 # inherit flags&aliases for any IPython shell apps
95 95 kernel_aliases.update(shell_aliases)
96 96 kernel_flags.update(shell_flags)
97 97
98 98 # inherit flags&aliases for Sessions
99 99 kernel_aliases.update(session_aliases)
100 100 kernel_flags.update(session_flags)
101 101
102 102 _ctrl_c_message = """\
103 103 NOTE: When using the `ipython kernel` entry point, Ctrl-C will not work.
104 104
105 105 To exit, you will have to explicitly quit this process, by either sending
106 106 "quit" from a client, or using Ctrl-\\ in UNIX-like environments.
107 107
108 108 To read more about this, see https://github.com/ipython/ipython/issues/2049
109 109
110 110 """
111 111
112 112 #-----------------------------------------------------------------------------
113 113 # Application class for starting an IPython Kernel
114 114 #-----------------------------------------------------------------------------
115 115
116 116 class IPKernelApp(BaseIPythonApplication, InteractiveShellApp):
117 117 name='ipkernel'
118 118 aliases = Dict(kernel_aliases)
119 119 flags = Dict(kernel_flags)
120 120 classes = [Kernel, ZMQInteractiveShell, ProfileDir, Session]
121 121 # the kernel class, as an importstring
122 122 kernel_class = DottedObjectName('IPython.kernel.zmq.ipkernel.Kernel', config=True,
123 123 help="""The Kernel subclass to be used.
124 124
125 125 This should allow easy re-use of the IPKernelApp entry point
126 126 to configure and launch kernels other than IPython's own.
127 127 """)
128 128 kernel = Any()
129 129 poller = Any() # don't restrict this even though current pollers are all Threads
130 130 heartbeat = Instance(Heartbeat)
131 131 session = Instance('IPython.kernel.zmq.session.Session')
132 132 ports = Dict()
133 133
134 134 # ipkernel doesn't get its own config file
135 135 def _config_file_name_default(self):
136 136 return 'ipython_config.py'
137 137
138 138 # inherit config file name from parent:
139 139 parent_appname = Unicode(config=True)
140 140 def _parent_appname_changed(self, name, old, new):
141 141 if self.config_file_specified:
142 142 # it was manually specified, ignore
143 143 return
144 144 self.config_file_name = new.replace('-','_') + u'_config.py'
145 145 # don't let this count as specifying the config file
146 146 self.config_file_specified.remove(self.config_file_name)
147 147
148 148 # connection info:
149 149 transport = CaselessStrEnum(['tcp', 'ipc'], default_value='tcp', config=True)
150 150 ip = Unicode(config=True,
151 151 help="Set the IP or interface on which the kernel will listen.")
152 152 def _ip_default(self):
153 153 if self.transport == 'ipc':
154 154 if self.connection_file:
155 155 return os.path.splitext(self.abs_connection_file)[0] + '-ipc'
156 156 else:
157 157 return 'kernel-ipc'
158 158 else:
159 159 return localhost()
160 160
161 161 hb_port = Integer(0, config=True, help="set the heartbeat port [default: random]")
162 162 shell_port = Integer(0, config=True, help="set the shell (ROUTER) port [default: random]")
163 163 iopub_port = Integer(0, config=True, help="set the iopub (PUB) port [default: random]")
164 164 stdin_port = Integer(0, config=True, help="set the stdin (ROUTER) port [default: random]")
165 165 control_port = Integer(0, config=True, help="set the control (ROUTER) port [default: random]")
166 166 connection_file = Unicode('', config=True,
167 167 help="""JSON file in which to store connection info [default: kernel-<pid>.json]
168 168
169 169 This file will contain the IP, ports, and authentication key needed to connect
170 170 clients to this kernel. By default, this file will be created in the security dir
171 171 of the current profile, but can be specified by absolute path.
172 172 """)
173 173 @property
174 174 def abs_connection_file(self):
175 175 if os.path.basename(self.connection_file) == self.connection_file:
176 176 return os.path.join(self.profile_dir.security_dir, self.connection_file)
177 177 else:
178 178 return self.connection_file
179 179
180 180
181 181 # streams, etc.
182 182 no_stdout = Bool(False, config=True, help="redirect stdout to the null device")
183 183 no_stderr = Bool(False, config=True, help="redirect stderr to the null device")
184 184 outstream_class = DottedObjectName('IPython.kernel.zmq.iostream.OutStream',
185 185 config=True, help="The importstring for the OutStream factory")
186 186 displayhook_class = DottedObjectName('IPython.kernel.zmq.displayhook.ZMQDisplayHook',
187 187 config=True, help="The importstring for the DisplayHook factory")
188 188
189 189 # polling
190 190 parent_handle = Integer(0, config=True,
191 191 help="""kill this process if its parent dies. On Windows, the argument
192 192 specifies the HANDLE of the parent process, otherwise it is simply boolean.
193 193 """)
194 194 interrupt = Integer(0, config=True,
195 195 help="""ONLY USED ON WINDOWS
196 196 Interrupt this process when the parent is signaled.
197 197 """)
198 198
199 199 def init_crash_handler(self):
200 200 # Install minimal exception handling
201 201 sys.excepthook = FormattedTB(mode='Verbose', color_scheme='NoColor',
202 202 ostream=sys.__stdout__)
203 203
204 204 def init_poller(self):
205 205 if sys.platform == 'win32':
206 206 if self.interrupt or self.parent_handle:
207 207 self.poller = ParentPollerWindows(self.interrupt, self.parent_handle)
208 208 elif self.parent_handle:
209 209 self.poller = ParentPollerUnix()
210 210
211 211 def _bind_socket(self, s, port):
212 212 iface = '%s://%s' % (self.transport, self.ip)
213 213 if self.transport == 'tcp':
214 214 if port <= 0:
215 215 port = s.bind_to_random_port(iface)
216 216 else:
217 217 s.bind("tcp://%s:%i" % (self.ip, port))
218 218 elif self.transport == 'ipc':
219 219 if port <= 0:
220 220 port = 1
221 221 path = "%s-%i" % (self.ip, port)
222 222 while os.path.exists(path):
223 223 port = port + 1
224 224 path = "%s-%i" % (self.ip, port)
225 225 else:
226 226 path = "%s-%i" % (self.ip, port)
227 227 s.bind("ipc://%s" % path)
228 228 return port
229 229
230 230 def load_connection_file(self):
231 231 """load ip/port/hmac config from JSON connection file"""
232 232 try:
233 233 fname = filefind(self.connection_file, ['.', self.profile_dir.security_dir])
234 234 except IOError:
235 235 self.log.debug("Connection file not found: %s", self.connection_file)
236 236 # This means I own it, so I will clean it up:
237 237 atexit.register(self.cleanup_connection_file)
238 238 return
239 239 self.log.debug(u"Loading connection file %s", fname)
240 240 with open(fname) as f:
241 241 s = f.read()
242 242 cfg = json.loads(s)
243 243 self.transport = cfg.get('transport', self.transport)
244 244 if self.ip == self._ip_default() and 'ip' in cfg:
245 245 # not overridden by config or cl_args
246 246 self.ip = cfg['ip']
247 247 for channel in ('hb', 'shell', 'iopub', 'stdin', 'control'):
248 248 name = channel + '_port'
249 249 if getattr(self, name) == 0 and name in cfg:
250 250 # not overridden by config or cl_args
251 251 setattr(self, name, cfg[name])
252 252 if 'key' in cfg:
253 253 self.config.Session.key = str_to_bytes(cfg['key'])
254 254
255 255 def write_connection_file(self):
256 256 """write connection info to JSON file"""
257 257 cf = self.abs_connection_file
258 258 self.log.debug("Writing connection file: %s", cf)
259 259 write_connection_file(cf, ip=self.ip, key=self.session.key, transport=self.transport,
260 260 shell_port=self.shell_port, stdin_port=self.stdin_port, hb_port=self.hb_port,
261 261 iopub_port=self.iopub_port, control_port=self.control_port)
262 262
263 263 def cleanup_connection_file(self):
264 264 cf = self.abs_connection_file
265 265 self.log.debug("Cleaning up connection file: %s", cf)
266 266 try:
267 267 os.remove(cf)
268 268 except (IOError, OSError):
269 269 pass
270 270
271 271 self.cleanup_ipc_files()
272 272
273 273 def cleanup_ipc_files(self):
274 274 """cleanup ipc files if we wrote them"""
275 275 if self.transport != 'ipc':
276 276 return
277 277 for port in (self.shell_port, self.iopub_port, self.stdin_port, self.hb_port, self.control_port):
278 278 ipcfile = "%s-%i" % (self.ip, port)
279 279 try:
280 280 os.remove(ipcfile)
281 281 except (IOError, OSError):
282 282 pass
283 283
284 284 def init_connection_file(self):
285 285 if not self.connection_file:
286 286 self.connection_file = "kernel-%s.json"%os.getpid()
287 287 try:
288 288 self.load_connection_file()
289 289 except Exception:
290 290 self.log.error("Failed to load connection file: %r", self.connection_file, exc_info=True)
291 291 self.exit(1)
292 292
293 293 def init_sockets(self):
294 294 # Create a context, a session, and the kernel sockets.
295 295 self.log.info("Starting the kernel at pid: %i", os.getpid())
296 296 context = zmq.Context.instance()
297 297 # Uncomment this to try closing the context.
298 298 # atexit.register(context.term)
299 299
300 300 self.shell_socket = context.socket(zmq.ROUTER)
301 self.shell_socket.linger = 1000
301 302 self.shell_port = self._bind_socket(self.shell_socket, self.shell_port)
302 303 self.log.debug("shell ROUTER Channel on port: %i" % self.shell_port)
303 304
304 305 self.iopub_socket = context.socket(zmq.PUB)
306 self.iopub_socket.linger = 1000
305 307 self.iopub_port = self._bind_socket(self.iopub_socket, self.iopub_port)
306 308 self.log.debug("iopub PUB Channel on port: %i" % self.iopub_port)
307 309
308 310 self.stdin_socket = context.socket(zmq.ROUTER)
311 self.stdin_socket.linger = 1000
309 312 self.stdin_port = self._bind_socket(self.stdin_socket, self.stdin_port)
310 313 self.log.debug("stdin ROUTER Channel on port: %i" % self.stdin_port)
311 314
312 315 self.control_socket = context.socket(zmq.ROUTER)
316 self.control_socket.linger = 1000
313 317 self.control_port = self._bind_socket(self.control_socket, self.control_port)
314 318 self.log.debug("control ROUTER Channel on port: %i" % self.control_port)
315 319
316 320 def init_heartbeat(self):
317 321 """start the heart beating"""
318 322 # heartbeat doesn't share context, because it mustn't be blocked
319 323 # by the GIL, which is accessed by libzmq when freeing zero-copy messages
320 324 hb_ctx = zmq.Context()
321 325 self.heartbeat = Heartbeat(hb_ctx, (self.transport, self.ip, self.hb_port))
322 326 self.hb_port = self.heartbeat.port
323 327 self.log.debug("Heartbeat REP Channel on port: %i" % self.hb_port)
324 328 self.heartbeat.start()
325 329
326 330 def log_connection_info(self):
327 331 """display connection info, and store ports"""
328 332 basename = os.path.basename(self.connection_file)
329 333 if basename == self.connection_file or \
330 334 os.path.dirname(self.connection_file) == self.profile_dir.security_dir:
331 335 # use shortname
332 336 tail = basename
333 337 if self.profile != 'default':
334 338 tail += " --profile %s" % self.profile
335 339 else:
336 340 tail = self.connection_file
337 341 lines = [
338 342 "To connect another client to this kernel, use:",
339 343 " --existing %s" % tail,
340 344 ]
341 345 # log connection info
342 346 # info-level, so often not shown.
343 347 # frontends should use the %connect_info magic
344 348 # to see the connection info
345 349 for line in lines:
346 350 self.log.info(line)
347 351 # also raw print to the terminal if no parent_handle (`ipython kernel`)
348 352 if not self.parent_handle:
349 353 io.rprint(_ctrl_c_message)
350 354 for line in lines:
351 355 io.rprint(line)
352 356
353 357 self.ports = dict(shell=self.shell_port, iopub=self.iopub_port,
354 358 stdin=self.stdin_port, hb=self.hb_port,
355 359 control=self.control_port)
356 360
357 361 def init_session(self):
358 362 """create our session object"""
359 363 default_secure(self.config)
360 364 self.session = Session(parent=self, username=u'kernel')
361 365
362 366 def init_blackhole(self):
363 367 """redirects stdout/stderr to devnull if necessary"""
364 368 if self.no_stdout or self.no_stderr:
365 369 blackhole = open(os.devnull, 'w')
366 370 if self.no_stdout:
367 371 sys.stdout = sys.__stdout__ = blackhole
368 372 if self.no_stderr:
369 373 sys.stderr = sys.__stderr__ = blackhole
370 374
371 375 def init_io(self):
372 376 """Redirect input streams and set a display hook."""
373 377 if self.outstream_class:
374 378 outstream_factory = import_item(str(self.outstream_class))
375 379 sys.stdout = outstream_factory(self.session, self.iopub_socket, u'stdout')
376 380 sys.stderr = outstream_factory(self.session, self.iopub_socket, u'stderr')
377 381 if self.displayhook_class:
378 382 displayhook_factory = import_item(str(self.displayhook_class))
379 383 sys.displayhook = displayhook_factory(self.session, self.iopub_socket)
380 384
381 385 def init_signal(self):
382 386 signal.signal(signal.SIGINT, signal.SIG_IGN)
383 387
384 388 def init_kernel(self):
385 389 """Create the Kernel object itself"""
386 390 shell_stream = ZMQStream(self.shell_socket)
387 391 control_stream = ZMQStream(self.control_socket)
388 392
389 393 kernel_factory = import_item(str(self.kernel_class))
390 394
391 395 kernel = kernel_factory(parent=self, session=self.session,
392 396 shell_streams=[shell_stream, control_stream],
393 397 iopub_socket=self.iopub_socket,
394 398 stdin_socket=self.stdin_socket,
395 399 log=self.log,
396 400 profile_dir=self.profile_dir,
397 401 user_ns=self.user_ns,
398 402 )
399 403 kernel.record_ports(self.ports)
400 404 self.kernel = kernel
401 405
402 406 def init_gui_pylab(self):
403 407 """Enable GUI event loop integration, taking pylab into account."""
404 408
405 409 # Provide a wrapper for :meth:`InteractiveShellApp.init_gui_pylab`
406 410 # to ensure that any exception is printed straight to stderr.
407 411 # Normally _showtraceback associates the reply with an execution,
408 412 # which means frontends will never draw it, as this exception
409 413 # is not associated with any execute request.
410 414
411 415 shell = self.shell
412 416 _showtraceback = shell._showtraceback
413 417 try:
414 418 # replace pyerr-sending traceback with stderr
415 419 def print_tb(etype, evalue, stb):
416 420 print ("GUI event loop or pylab initialization failed",
417 421 file=io.stderr)
418 422 print (shell.InteractiveTB.stb2text(stb), file=io.stderr)
419 423 shell._showtraceback = print_tb
420 424 InteractiveShellApp.init_gui_pylab(self)
421 425 finally:
422 426 shell._showtraceback = _showtraceback
423 427
424 428 def init_shell(self):
425 429 self.shell = self.kernel.shell
426 430 self.shell.configurables.append(self)
427 431
428 432 @catch_config_error
429 433 def initialize(self, argv=None):
430 434 super(IPKernelApp, self).initialize(argv)
431 435 self.init_blackhole()
432 436 self.init_connection_file()
433 437 self.init_session()
434 438 self.init_poller()
435 439 self.init_sockets()
436 440 self.init_heartbeat()
437 441 # writing/displaying connection info must be *after* init_sockets/heartbeat
438 442 self.log_connection_info()
439 443 self.write_connection_file()
440 444 self.init_io()
441 445 self.init_signal()
442 446 self.init_kernel()
443 447 # shell init steps
444 448 self.init_path()
445 449 self.init_shell()
446 450 self.init_gui_pylab()
447 451 self.init_extensions()
448 452 self.init_code()
449 453 # flush stdout/stderr, so that anything written to these streams during
450 454 # initialization do not get associated with the first execution request
451 455 sys.stdout.flush()
452 456 sys.stderr.flush()
453 457
454 458 def start(self):
455 459 if self.poller is not None:
456 460 self.poller.start()
457 461 self.kernel.start()
458 462 try:
459 463 ioloop.IOLoop.instance().start()
460 464 except KeyboardInterrupt:
461 465 pass
462 466
463 467 launch_new_instance = IPKernelApp.launch_instance
464 468
465 469 def main():
466 470 """Run an IPKernel as an application"""
467 471 app = IPKernelApp.instance()
468 472 app.initialize()
469 473 app.start()
470 474
471 475
472 476 if __name__ == '__main__':
473 477 main()
General Comments 0
You need to be logged in to leave comments. Login now