##// END OF EJS Templates
s/channelabc/channelsabc/
MinRK -
Show More
@@ -1,648 +1,648 b''
1 1 """Base classes to manage a Client's interaction with a running kernel
2 2 """
3 3
4 4 #-----------------------------------------------------------------------------
5 5 # Copyright (C) 2013 The IPython Development Team
6 6 #
7 7 # Distributed under the terms of the BSD License. The full license is in
8 8 # the file COPYING, distributed as part of this software.
9 9 #-----------------------------------------------------------------------------
10 10
11 11 #-----------------------------------------------------------------------------
12 12 # Imports
13 13 #-----------------------------------------------------------------------------
14 14
15 15 from __future__ import absolute_import
16 16
17 17 # Standard library imports
18 18 import atexit
19 19 import errno
20 20 from threading import Thread
21 21 import time
22 22
23 23 import zmq
24 24 # import ZMQError in top-level namespace, to avoid ugly attribute-error messages
25 25 # during garbage collection of threads at exit:
26 26 from zmq import ZMQError
27 27 from zmq.eventloop import ioloop, zmqstream
28 28
29 29 # Local imports
30 from .channelabc import (
30 from .channelsabc import (
31 31 ShellChannelABC, IOPubChannelABC,
32 32 HBChannelABC, StdInChannelABC,
33 33 )
34 34
35 35 #-----------------------------------------------------------------------------
36 36 # Constants and exceptions
37 37 #-----------------------------------------------------------------------------
38 38
39 39 class InvalidPortNumber(Exception):
40 40 pass
41 41
42 42 #-----------------------------------------------------------------------------
43 43 # Utility functions
44 44 #-----------------------------------------------------------------------------
45 45
46 46 # some utilities to validate message structure, these might get moved elsewhere
47 47 # if they prove to have more generic utility
48 48
49 49 def validate_string_list(lst):
50 50 """Validate that the input is a list of strings.
51 51
52 52 Raises ValueError if not."""
53 53 if not isinstance(lst, list):
54 54 raise ValueError('input %r must be a list' % lst)
55 55 for x in lst:
56 56 if not isinstance(x, basestring):
57 57 raise ValueError('element %r in list must be a string' % x)
58 58
59 59
60 60 def validate_string_dict(dct):
61 61 """Validate that the input is a dict with string keys and values.
62 62
63 63 Raises ValueError if not."""
64 64 for k,v in dct.iteritems():
65 65 if not isinstance(k, basestring):
66 66 raise ValueError('key %r in dict must be a string' % k)
67 67 if not isinstance(v, basestring):
68 68 raise ValueError('value %r in dict must be a string' % v)
69 69
70 70
71 71 #-----------------------------------------------------------------------------
72 72 # ZMQ Socket Channel classes
73 73 #-----------------------------------------------------------------------------
74 74
75 75 class ZMQSocketChannel(Thread):
76 76 """The base class for the channels that use ZMQ sockets."""
77 77 context = None
78 78 session = None
79 79 socket = None
80 80 ioloop = None
81 81 stream = None
82 82 _address = None
83 83 _exiting = False
84 84 proxy_methods = []
85 85
86 86 def __init__(self, context, session, address):
87 87 """Create a channel.
88 88
89 89 Parameters
90 90 ----------
91 91 context : :class:`zmq.Context`
92 92 The ZMQ context to use.
93 93 session : :class:`session.Session`
94 94 The session to use.
95 95 address : zmq url
96 96 Standard (ip, port) tuple that the kernel is listening on.
97 97 """
98 98 super(ZMQSocketChannel, self).__init__()
99 99 self.daemon = True
100 100
101 101 self.context = context
102 102 self.session = session
103 103 if isinstance(address, tuple):
104 104 if address[1] == 0:
105 105 message = 'The port number for a channel cannot be 0.'
106 106 raise InvalidPortNumber(message)
107 107 address = "tcp://%s:%i" % address
108 108 self._address = address
109 109 atexit.register(self._notice_exit)
110 110
111 111 def _notice_exit(self):
112 112 self._exiting = True
113 113
114 114 def _run_loop(self):
115 115 """Run my loop, ignoring EINTR events in the poller"""
116 116 while True:
117 117 try:
118 118 self.ioloop.start()
119 119 except ZMQError as e:
120 120 if e.errno == errno.EINTR:
121 121 continue
122 122 else:
123 123 raise
124 124 except Exception:
125 125 if self._exiting:
126 126 break
127 127 else:
128 128 raise
129 129 else:
130 130 break
131 131
132 132 def stop(self):
133 133 """Stop the channel's event loop and join its thread.
134 134
135 135 This calls :method:`Thread.join` and returns when the thread
136 136 terminates. :class:`RuntimeError` will be raised if
137 137 :method:`self.start` is called again.
138 138 """
139 139 self.join()
140 140
141 141 @property
142 142 def address(self):
143 143 """Get the channel's address as a zmq url string.
144 144
145 145 These URLS have the form: 'tcp://127.0.0.1:5555'.
146 146 """
147 147 return self._address
148 148
149 149 def _queue_send(self, msg):
150 150 """Queue a message to be sent from the IOLoop's thread.
151 151
152 152 Parameters
153 153 ----------
154 154 msg : message to send
155 155
156 156 This is threadsafe, as it uses IOLoop.add_callback to give the loop's
157 157 thread control of the action.
158 158 """
159 159 def thread_send():
160 160 self.session.send(self.stream, msg)
161 161 self.ioloop.add_callback(thread_send)
162 162
163 163 def _handle_recv(self, msg):
164 164 """Callback for stream.on_recv.
165 165
166 166 Unpacks message, and calls handlers with it.
167 167 """
168 168 ident,smsg = self.session.feed_identities(msg)
169 169 self.call_handlers(self.session.unserialize(smsg))
170 170
171 171
172 172
173 173 class ShellChannel(ZMQSocketChannel):
174 174 """The shell channel for issuing request/replies to the kernel."""
175 175
176 176 command_queue = None
177 177 # flag for whether execute requests should be allowed to call raw_input:
178 178 allow_stdin = True
179 179 proxy_methods = [
180 180 'execute',
181 181 'complete',
182 182 'object_info',
183 183 'history',
184 184 'kernel_info',
185 185 'shutdown',
186 186 ]
187 187
188 188 def __init__(self, context, session, address):
189 189 super(ShellChannel, self).__init__(context, session, address)
190 190 self.ioloop = ioloop.IOLoop()
191 191
192 192 def run(self):
193 193 """The thread's main activity. Call start() instead."""
194 194 self.socket = self.context.socket(zmq.DEALER)
195 195 self.socket.setsockopt(zmq.IDENTITY, self.session.bsession)
196 196 self.socket.connect(self.address)
197 197 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
198 198 self.stream.on_recv(self._handle_recv)
199 199 self._run_loop()
200 200 try:
201 201 self.socket.close()
202 202 except:
203 203 pass
204 204
205 205 def stop(self):
206 206 """Stop the channel's event loop and join its thread."""
207 207 self.ioloop.stop()
208 208 super(ShellChannel, self).stop()
209 209
210 210 def call_handlers(self, msg):
211 211 """This method is called in the ioloop thread when a message arrives.
212 212
213 213 Subclasses should override this method to handle incoming messages.
214 214 It is important to remember that this method is called in the thread
215 215 so that some logic must be done to ensure that the application level
216 216 handlers are called in the application thread.
217 217 """
218 218 raise NotImplementedError('call_handlers must be defined in a subclass.')
219 219
220 220 def execute(self, code, silent=False, store_history=True,
221 221 user_variables=None, user_expressions=None, allow_stdin=None):
222 222 """Execute code in the kernel.
223 223
224 224 Parameters
225 225 ----------
226 226 code : str
227 227 A string of Python code.
228 228
229 229 silent : bool, optional (default False)
230 230 If set, the kernel will execute the code as quietly possible, and
231 231 will force store_history to be False.
232 232
233 233 store_history : bool, optional (default True)
234 234 If set, the kernel will store command history. This is forced
235 235 to be False if silent is True.
236 236
237 237 user_variables : list, optional
238 238 A list of variable names to pull from the user's namespace. They
239 239 will come back as a dict with these names as keys and their
240 240 :func:`repr` as values.
241 241
242 242 user_expressions : dict, optional
243 243 A dict mapping names to expressions to be evaluated in the user's
244 244 dict. The expression values are returned as strings formatted using
245 245 :func:`repr`.
246 246
247 247 allow_stdin : bool, optional (default self.allow_stdin)
248 248 Flag for whether the kernel can send stdin requests to frontends.
249 249
250 250 Some frontends (e.g. the Notebook) do not support stdin requests.
251 251 If raw_input is called from code executed from such a frontend, a
252 252 StdinNotImplementedError will be raised.
253 253
254 254 Returns
255 255 -------
256 256 The msg_id of the message sent.
257 257 """
258 258 if user_variables is None:
259 259 user_variables = []
260 260 if user_expressions is None:
261 261 user_expressions = {}
262 262 if allow_stdin is None:
263 263 allow_stdin = self.allow_stdin
264 264
265 265
266 266 # Don't waste network traffic if inputs are invalid
267 267 if not isinstance(code, basestring):
268 268 raise ValueError('code %r must be a string' % code)
269 269 validate_string_list(user_variables)
270 270 validate_string_dict(user_expressions)
271 271
272 272 # Create class for content/msg creation. Related to, but possibly
273 273 # not in Session.
274 274 content = dict(code=code, silent=silent, store_history=store_history,
275 275 user_variables=user_variables,
276 276 user_expressions=user_expressions,
277 277 allow_stdin=allow_stdin,
278 278 )
279 279 msg = self.session.msg('execute_request', content)
280 280 self._queue_send(msg)
281 281 return msg['header']['msg_id']
282 282
283 283 def complete(self, text, line, cursor_pos, block=None):
284 284 """Tab complete text in the kernel's namespace.
285 285
286 286 Parameters
287 287 ----------
288 288 text : str
289 289 The text to complete.
290 290 line : str
291 291 The full line of text that is the surrounding context for the
292 292 text to complete.
293 293 cursor_pos : int
294 294 The position of the cursor in the line where the completion was
295 295 requested.
296 296 block : str, optional
297 297 The full block of code in which the completion is being requested.
298 298
299 299 Returns
300 300 -------
301 301 The msg_id of the message sent.
302 302 """
303 303 content = dict(text=text, line=line, block=block, cursor_pos=cursor_pos)
304 304 msg = self.session.msg('complete_request', content)
305 305 self._queue_send(msg)
306 306 return msg['header']['msg_id']
307 307
308 308 def object_info(self, oname, detail_level=0):
309 309 """Get metadata information about an object in the kernel's namespace.
310 310
311 311 Parameters
312 312 ----------
313 313 oname : str
314 314 A string specifying the object name.
315 315 detail_level : int, optional
316 316 The level of detail for the introspection (0-2)
317 317
318 318 Returns
319 319 -------
320 320 The msg_id of the message sent.
321 321 """
322 322 content = dict(oname=oname, detail_level=detail_level)
323 323 msg = self.session.msg('object_info_request', content)
324 324 self._queue_send(msg)
325 325 return msg['header']['msg_id']
326 326
327 327 def history(self, raw=True, output=False, hist_access_type='range', **kwargs):
328 328 """Get entries from the kernel's history list.
329 329
330 330 Parameters
331 331 ----------
332 332 raw : bool
333 333 If True, return the raw input.
334 334 output : bool
335 335 If True, then return the output as well.
336 336 hist_access_type : str
337 337 'range' (fill in session, start and stop params), 'tail' (fill in n)
338 338 or 'search' (fill in pattern param).
339 339
340 340 session : int
341 341 For a range request, the session from which to get lines. Session
342 342 numbers are positive integers; negative ones count back from the
343 343 current session.
344 344 start : int
345 345 The first line number of a history range.
346 346 stop : int
347 347 The final (excluded) line number of a history range.
348 348
349 349 n : int
350 350 The number of lines of history to get for a tail request.
351 351
352 352 pattern : str
353 353 The glob-syntax pattern for a search request.
354 354
355 355 Returns
356 356 -------
357 357 The msg_id of the message sent.
358 358 """
359 359 content = dict(raw=raw, output=output, hist_access_type=hist_access_type,
360 360 **kwargs)
361 361 msg = self.session.msg('history_request', content)
362 362 self._queue_send(msg)
363 363 return msg['header']['msg_id']
364 364
365 365 def kernel_info(self):
366 366 """Request kernel info."""
367 367 msg = self.session.msg('kernel_info_request')
368 368 self._queue_send(msg)
369 369 return msg['header']['msg_id']
370 370
371 371 def shutdown(self, restart=False):
372 372 """Request an immediate kernel shutdown.
373 373
374 374 Upon receipt of the (empty) reply, client code can safely assume that
375 375 the kernel has shut down and it's safe to forcefully terminate it if
376 376 it's still alive.
377 377
378 378 The kernel will send the reply via a function registered with Python's
379 379 atexit module, ensuring it's truly done as the kernel is done with all
380 380 normal operation.
381 381 """
382 382 # Send quit message to kernel. Once we implement kernel-side setattr,
383 383 # this should probably be done that way, but for now this will do.
384 384 msg = self.session.msg('shutdown_request', {'restart':restart})
385 385 self._queue_send(msg)
386 386 return msg['header']['msg_id']
387 387
388 388
389 389
390 390 class IOPubChannel(ZMQSocketChannel):
391 391 """The iopub channel which listens for messages that the kernel publishes.
392 392
393 393 This channel is where all output is published to frontends.
394 394 """
395 395
396 396 def __init__(self, context, session, address):
397 397 super(IOPubChannel, self).__init__(context, session, address)
398 398 self.ioloop = ioloop.IOLoop()
399 399
400 400 def run(self):
401 401 """The thread's main activity. Call start() instead."""
402 402 self.socket = self.context.socket(zmq.SUB)
403 403 self.socket.setsockopt(zmq.SUBSCRIBE,b'')
404 404 self.socket.setsockopt(zmq.IDENTITY, self.session.bsession)
405 405 self.socket.connect(self.address)
406 406 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
407 407 self.stream.on_recv(self._handle_recv)
408 408 self._run_loop()
409 409 try:
410 410 self.socket.close()
411 411 except:
412 412 pass
413 413
414 414 def stop(self):
415 415 """Stop the channel's event loop and join its thread."""
416 416 self.ioloop.stop()
417 417 super(IOPubChannel, self).stop()
418 418
419 419 def call_handlers(self, msg):
420 420 """This method is called in the ioloop thread when a message arrives.
421 421
422 422 Subclasses should override this method to handle incoming messages.
423 423 It is important to remember that this method is called in the thread
424 424 so that some logic must be done to ensure that the application leve
425 425 handlers are called in the application thread.
426 426 """
427 427 raise NotImplementedError('call_handlers must be defined in a subclass.')
428 428
429 429 def flush(self, timeout=1.0):
430 430 """Immediately processes all pending messages on the iopub channel.
431 431
432 432 Callers should use this method to ensure that :method:`call_handlers`
433 433 has been called for all messages that have been received on the
434 434 0MQ SUB socket of this channel.
435 435
436 436 This method is thread safe.
437 437
438 438 Parameters
439 439 ----------
440 440 timeout : float, optional
441 441 The maximum amount of time to spend flushing, in seconds. The
442 442 default is one second.
443 443 """
444 444 # We do the IOLoop callback process twice to ensure that the IOLoop
445 445 # gets to perform at least one full poll.
446 446 stop_time = time.time() + timeout
447 447 for i in xrange(2):
448 448 self._flushed = False
449 449 self.ioloop.add_callback(self._flush)
450 450 while not self._flushed and time.time() < stop_time:
451 451 time.sleep(0.01)
452 452
453 453 def _flush(self):
454 454 """Callback for :method:`self.flush`."""
455 455 self.stream.flush()
456 456 self._flushed = True
457 457
458 458
459 459 class StdInChannel(ZMQSocketChannel):
460 460 """The stdin channel to handle raw_input requests that the kernel makes."""
461 461
462 462 msg_queue = None
463 463 proxy_methods = ['input']
464 464
465 465 def __init__(self, context, session, address):
466 466 super(StdInChannel, self).__init__(context, session, address)
467 467 self.ioloop = ioloop.IOLoop()
468 468
469 469 def run(self):
470 470 """The thread's main activity. Call start() instead."""
471 471 self.socket = self.context.socket(zmq.DEALER)
472 472 self.socket.setsockopt(zmq.IDENTITY, self.session.bsession)
473 473 self.socket.connect(self.address)
474 474 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
475 475 self.stream.on_recv(self._handle_recv)
476 476 self._run_loop()
477 477 try:
478 478 self.socket.close()
479 479 except:
480 480 pass
481 481
482 482 def stop(self):
483 483 """Stop the channel's event loop and join its thread."""
484 484 self.ioloop.stop()
485 485 super(StdInChannel, self).stop()
486 486
487 487 def call_handlers(self, msg):
488 488 """This method is called in the ioloop thread when a message arrives.
489 489
490 490 Subclasses should override this method to handle incoming messages.
491 491 It is important to remember that this method is called in the thread
492 492 so that some logic must be done to ensure that the application leve
493 493 handlers are called in the application thread.
494 494 """
495 495 raise NotImplementedError('call_handlers must be defined in a subclass.')
496 496
497 497 def input(self, string):
498 498 """Send a string of raw input to the kernel."""
499 499 content = dict(value=string)
500 500 msg = self.session.msg('input_reply', content)
501 501 self._queue_send(msg)
502 502
503 503
504 504 class HBChannel(ZMQSocketChannel):
505 505 """The heartbeat channel which monitors the kernel heartbeat.
506 506
507 507 Note that the heartbeat channel is paused by default. As long as you start
508 508 this channel, the kernel manager will ensure that it is paused and un-paused
509 509 as appropriate.
510 510 """
511 511
512 512 time_to_dead = 3.0
513 513 socket = None
514 514 poller = None
515 515 _running = None
516 516 _pause = None
517 517 _beating = None
518 518
519 519 def __init__(self, context, session, address):
520 520 super(HBChannel, self).__init__(context, session, address)
521 521 self._running = False
522 522 self._pause =True
523 523 self.poller = zmq.Poller()
524 524
525 525 def _create_socket(self):
526 526 if self.socket is not None:
527 527 # close previous socket, before opening a new one
528 528 self.poller.unregister(self.socket)
529 529 self.socket.close()
530 530 self.socket = self.context.socket(zmq.REQ)
531 531 self.socket.setsockopt(zmq.LINGER, 0)
532 532 self.socket.connect(self.address)
533 533
534 534 self.poller.register(self.socket, zmq.POLLIN)
535 535
536 536 def _poll(self, start_time):
537 537 """poll for heartbeat replies until we reach self.time_to_dead.
538 538
539 539 Ignores interrupts, and returns the result of poll(), which
540 540 will be an empty list if no messages arrived before the timeout,
541 541 or the event tuple if there is a message to receive.
542 542 """
543 543
544 544 until_dead = self.time_to_dead - (time.time() - start_time)
545 545 # ensure poll at least once
546 546 until_dead = max(until_dead, 1e-3)
547 547 events = []
548 548 while True:
549 549 try:
550 550 events = self.poller.poll(1000 * until_dead)
551 551 except ZMQError as e:
552 552 if e.errno == errno.EINTR:
553 553 # ignore interrupts during heartbeat
554 554 # this may never actually happen
555 555 until_dead = self.time_to_dead - (time.time() - start_time)
556 556 until_dead = max(until_dead, 1e-3)
557 557 pass
558 558 else:
559 559 raise
560 560 except Exception:
561 561 if self._exiting:
562 562 break
563 563 else:
564 564 raise
565 565 else:
566 566 break
567 567 return events
568 568
569 569 def run(self):
570 570 """The thread's main activity. Call start() instead."""
571 571 self._create_socket()
572 572 self._running = True
573 573 self._beating = True
574 574
575 575 while self._running:
576 576 if self._pause:
577 577 # just sleep, and skip the rest of the loop
578 578 time.sleep(self.time_to_dead)
579 579 continue
580 580
581 581 since_last_heartbeat = 0.0
582 582 # io.rprint('Ping from HB channel') # dbg
583 583 # no need to catch EFSM here, because the previous event was
584 584 # either a recv or connect, which cannot be followed by EFSM
585 585 self.socket.send(b'ping')
586 586 request_time = time.time()
587 587 ready = self._poll(request_time)
588 588 if ready:
589 589 self._beating = True
590 590 # the poll above guarantees we have something to recv
591 591 self.socket.recv()
592 592 # sleep the remainder of the cycle
593 593 remainder = self.time_to_dead - (time.time() - request_time)
594 594 if remainder > 0:
595 595 time.sleep(remainder)
596 596 continue
597 597 else:
598 598 # nothing was received within the time limit, signal heart failure
599 599 self._beating = False
600 600 since_last_heartbeat = time.time() - request_time
601 601 self.call_handlers(since_last_heartbeat)
602 602 # and close/reopen the socket, because the REQ/REP cycle has been broken
603 603 self._create_socket()
604 604 continue
605 605 try:
606 606 self.socket.close()
607 607 except:
608 608 pass
609 609
610 610 def pause(self):
611 611 """Pause the heartbeat."""
612 612 self._pause = True
613 613
614 614 def unpause(self):
615 615 """Unpause the heartbeat."""
616 616 self._pause = False
617 617
618 618 def is_beating(self):
619 619 """Is the heartbeat running and responsive (and not paused)."""
620 620 if self.is_alive() and not self._pause and self._beating:
621 621 return True
622 622 else:
623 623 return False
624 624
625 625 def stop(self):
626 626 """Stop the channel's event loop and join its thread."""
627 627 self._running = False
628 628 super(HBChannel, self).stop()
629 629
630 630 def call_handlers(self, since_last_heartbeat):
631 631 """This method is called in the ioloop thread when a message arrives.
632 632
633 633 Subclasses should override this method to handle incoming messages.
634 634 It is important to remember that this method is called in the thread
635 635 so that some logic must be done to ensure that the application level
636 636 handlers are called in the application thread.
637 637 """
638 638 raise NotImplementedError('call_handlers must be defined in a subclass.')
639 639
640 640
641 641 #---------------------------------------------------------------------#-----------------------------------------------------------------------------
642 642 # ABC Registration
643 643 #-----------------------------------------------------------------------------
644 644
645 645 ShellChannelABC.register(ShellChannel)
646 646 IOPubChannelABC.register(IOPubChannel)
647 647 HBChannelABC.register(HBChannel)
648 648 StdInChannelABC.register(StdInChannel)
@@ -1,127 +1,126 b''
1 1 """Abstract base classes for kernel client channels"""
2 2
3 3 #-----------------------------------------------------------------------------
4 4 # Copyright (C) 2013 The IPython Development Team
5 5 #
6 6 # Distributed under the terms of the BSD License. The full license is in
7 7 # the file COPYING, distributed as part of this software.
8 8 #-----------------------------------------------------------------------------
9 9
10 10 #-----------------------------------------------------------------------------
11 11 # Imports
12 12 #-----------------------------------------------------------------------------
13 13
14 14 # Standard library imports
15 15 import abc
16 16
17 17 #-----------------------------------------------------------------------------
18 18 # Channels
19 19 #-----------------------------------------------------------------------------
20 20
21 21
22 22 class ChannelABC(object):
23 23 """A base class for all channel ABCs."""
24 24
25 25 __metaclass__ = abc.ABCMeta
26 26
27 27 @abc.abstractmethod
28 28 def start(self):
29 29 pass
30 30
31 31 @abc.abstractmethod
32 32 def stop(self):
33 33 pass
34 34
35 35 @abc.abstractmethod
36 36 def is_alive(self):
37 37 pass
38 38
39 39
40 40 class ShellChannelABC(ChannelABC):
41 41 """ShellChannel ABC.
42 42
43 43 The docstrings for this class can be found in the base implementation:
44 44
45 45 `IPython.kernel.channels.ShellChannel`
46 46 """
47 47
48 48 @abc.abstractproperty
49 49 def allow_stdin(self):
50 50 pass
51 51
52 52 @abc.abstractmethod
53 53 def execute(self, code, silent=False, store_history=True,
54 54 user_variables=None, user_expressions=None, allow_stdin=None):
55 55 pass
56 56
57 57 @abc.abstractmethod
58 58 def complete(self, text, line, cursor_pos, block=None):
59 59 pass
60 60
61 61 @abc.abstractmethod
62 62 def object_info(self, oname, detail_level=0):
63 63 pass
64 64
65 65 @abc.abstractmethod
66 66 def history(self, raw=True, output=False, hist_access_type='range', **kwargs):
67 67 pass
68 68
69 69 @abc.abstractmethod
70 70 def kernel_info(self):
71 71 pass
72 72
73 73 @abc.abstractmethod
74 74 def shutdown(self, restart=False):
75 75 pass
76 76
77 77
78 78 class IOPubChannelABC(ChannelABC):
79 79 """IOPubChannel ABC.
80 80
81 81 The docstrings for this class can be found in the base implementation:
82 82
83 83 `IPython.kernel.channels.IOPubChannel`
84 84 """
85 85
86 86 @abc.abstractmethod
87 87 def flush(self, timeout=1.0):
88 88 pass
89 89
90 90
91 91 class StdInChannelABC(ChannelABC):
92 92 """StdInChannel ABC.
93 93
94 94 The docstrings for this class can be found in the base implementation:
95 95
96 96 `IPython.kernel.channels.StdInChannel`
97 97 """
98 98
99 99 @abc.abstractmethod
100 100 def input(self, string):
101 101 pass
102 102
103 103
104 104 class HBChannelABC(ChannelABC):
105 105 """HBChannel ABC.
106 106
107 107 The docstrings for this class can be found in the base implementation:
108 108
109 109 `IPython.kernel.channels.HBChannel`
110 110 """
111 111
112 112 @abc.abstractproperty
113 113 def time_to_dead(self):
114 114 pass
115 115
116 116 @abc.abstractmethod
117 117 def pause(self):
118 118 pass
119 119
120 120 @abc.abstractmethod
121 121 def unpause(self):
122 122 pass
123 123
124 124 @abc.abstractmethod
125 125 def is_beating(self):
126 126 pass
127
@@ -1,194 +1,194 b''
1 1 """ A kernel client for in-process kernels. """
2 2
3 3 #-----------------------------------------------------------------------------
4 4 # Copyright (C) 2012 The IPython Development Team
5 5 #
6 6 # Distributed under the terms of the BSD License. The full license is in
7 7 # the file COPYING, distributed as part of this software.
8 8 #-----------------------------------------------------------------------------
9 9
10 10 #-----------------------------------------------------------------------------
11 11 # Imports
12 12 #-----------------------------------------------------------------------------
13 13
14 14 # IPython imports
15 from IPython.kernel.channelabc import (
15 from IPython.kernel.channelsabc import (
16 16 ShellChannelABC, IOPubChannelABC,
17 17 HBChannelABC, StdInChannelABC,
18 18 )
19 19
20 20 # Local imports
21 21 from .socket import DummySocket
22 22
23 23 #-----------------------------------------------------------------------------
24 24 # Channel classes
25 25 #-----------------------------------------------------------------------------
26 26
27 27 class InProcessChannel(object):
28 28 """Base class for in-process channels."""
29 29 proxy_methods = []
30 30
31 31 def __init__(self, client):
32 32 super(InProcessChannel, self).__init__()
33 33 self.client = client
34 34 self._is_alive = False
35 35
36 36 #--------------------------------------------------------------------------
37 37 # Channel interface
38 38 #--------------------------------------------------------------------------
39 39
40 40 def is_alive(self):
41 41 return self._is_alive
42 42
43 43 def start(self):
44 44 self._is_alive = True
45 45
46 46 def stop(self):
47 47 self._is_alive = False
48 48
49 49 def call_handlers(self, msg):
50 50 """ This method is called in the main thread when a message arrives.
51 51
52 52 Subclasses should override this method to handle incoming messages.
53 53 """
54 54 raise NotImplementedError('call_handlers must be defined in a subclass.')
55 55
56 56 #--------------------------------------------------------------------------
57 57 # InProcessChannel interface
58 58 #--------------------------------------------------------------------------
59 59
60 60 def call_handlers_later(self, *args, **kwds):
61 61 """ Call the message handlers later.
62 62
63 63 The default implementation just calls the handlers immediately, but this
64 64 method exists so that GUI toolkits can defer calling the handlers until
65 65 after the event loop has run, as expected by GUI frontends.
66 66 """
67 67 self.call_handlers(*args, **kwds)
68 68
69 69 def process_events(self):
70 70 """ Process any pending GUI events.
71 71
72 72 This method will be never be called from a frontend without an event
73 73 loop (e.g., a terminal frontend).
74 74 """
75 75 raise NotImplementedError
76 76
77 77
78 78 class InProcessShellChannel(InProcessChannel):
79 79 """See `IPython.kernel.channels.ShellChannel` for docstrings."""
80 80
81 81 # flag for whether execute requests should be allowed to call raw_input
82 82 allow_stdin = True
83 83 proxy_methods = [
84 84 'execute',
85 85 'complete',
86 86 'object_info',
87 87 'history',
88 88 'shutdown',
89 89 ]
90 90
91 91 #--------------------------------------------------------------------------
92 92 # ShellChannel interface
93 93 #--------------------------------------------------------------------------
94 94
95 95 def execute(self, code, silent=False, store_history=True,
96 96 user_variables=[], user_expressions={}, allow_stdin=None):
97 97 if allow_stdin is None:
98 98 allow_stdin = self.allow_stdin
99 99 content = dict(code=code, silent=silent, store_history=store_history,
100 100 user_variables=user_variables,
101 101 user_expressions=user_expressions,
102 102 allow_stdin=allow_stdin)
103 103 msg = self.client.session.msg('execute_request', content)
104 104 self._dispatch_to_kernel(msg)
105 105 return msg['header']['msg_id']
106 106
107 107 def complete(self, text, line, cursor_pos, block=None):
108 108 content = dict(text=text, line=line, block=block, cursor_pos=cursor_pos)
109 109 msg = self.client.session.msg('complete_request', content)
110 110 self._dispatch_to_kernel(msg)
111 111 return msg['header']['msg_id']
112 112
113 113 def object_info(self, oname, detail_level=0):
114 114 content = dict(oname=oname, detail_level=detail_level)
115 115 msg = self.client.session.msg('object_info_request', content)
116 116 self._dispatch_to_kernel(msg)
117 117 return msg['header']['msg_id']
118 118
119 119 def history(self, raw=True, output=False, hist_access_type='range', **kwds):
120 120 content = dict(raw=raw, output=output,
121 121 hist_access_type=hist_access_type, **kwds)
122 122 msg = self.client.session.msg('history_request', content)
123 123 self._dispatch_to_kernel(msg)
124 124 return msg['header']['msg_id']
125 125
126 126 def shutdown(self, restart=False):
127 127 # FIXME: What to do here?
128 128 raise NotImplementedError('Cannot shutdown in-process kernel')
129 129
130 130 #--------------------------------------------------------------------------
131 131 # Protected interface
132 132 #--------------------------------------------------------------------------
133 133
134 134 def _dispatch_to_kernel(self, msg):
135 135 """ Send a message to the kernel and handle a reply.
136 136 """
137 137 kernel = self.client.kernel
138 138 if kernel is None:
139 139 raise RuntimeError('Cannot send request. No kernel exists.')
140 140
141 141 stream = DummySocket()
142 142 self.client.session.send(stream, msg)
143 143 msg_parts = stream.recv_multipart()
144 144 kernel.dispatch_shell(stream, msg_parts)
145 145
146 146 idents, reply_msg = self.client.session.recv(stream, copy=False)
147 147 self.call_handlers_later(reply_msg)
148 148
149 149
150 150 class InProcessIOPubChannel(InProcessChannel):
151 151 """See `IPython.kernel.channels.IOPubChannel` for docstrings."""
152 152
153 153 def flush(self, timeout=1.0):
154 154 pass
155 155
156 156
157 157 class InProcessStdInChannel(InProcessChannel):
158 158 """See `IPython.kernel.channels.StdInChannel` for docstrings."""
159 159
160 160 proxy_methods = ['input']
161 161
162 162 def input(self, string):
163 163 kernel = self.client.kernel
164 164 if kernel is None:
165 165 raise RuntimeError('Cannot send input reply. No kernel exists.')
166 166 kernel.raw_input_str = string
167 167
168 168
169 169 class InProcessHBChannel(InProcessChannel):
170 170 """See `IPython.kernel.channels.HBChannel` for docstrings."""
171 171
172 172 time_to_dead = 3.0
173 173
174 174 def __init__(self, *args, **kwds):
175 175 super(InProcessHBChannel, self).__init__(*args, **kwds)
176 176 self._pause = True
177 177
178 178 def pause(self):
179 179 self._pause = True
180 180
181 181 def unpause(self):
182 182 self._pause = False
183 183
184 184 def is_beating(self):
185 185 return not self._pause
186 186
187 187 #-----------------------------------------------------------------------------
188 188 # ABC Registration
189 189 #-----------------------------------------------------------------------------
190 190
191 191 ShellChannelABC.register(InProcessShellChannel)
192 192 IOPubChannelABC.register(InProcessIOPubChannel)
193 193 HBChannelABC.register(InProcessHBChannel)
194 194 StdInChannelABC.register(InProcessStdInChannel)
General Comments 0
You need to be logged in to leave comments. Login now