"""Implements a fully blocking kernel client. Useful for test suites and blocking terminal interfaces. """ # Copyright (c) IPython Development Team. # Distributed under the terms of the Modified BSD License. try: from queue import Empty # Python 3 except ImportError: from Queue import Empty # Python 2 from IPython.utils.traitlets import Type from jupyter_client.channels import HBChannel from jupyter_client.client import KernelClient from .channels import ZMQSocketChannel class BlockingKernelClient(KernelClient): def wait_for_ready(self): # Wait for kernel info reply on shell channel while True: msg = self.shell_channel.get_msg(block=True) if msg['msg_type'] == 'kernel_info_reply': self._handle_kernel_info_reply(msg) break # Flush IOPub channel while True: try: msg = self.iopub_channel.get_msg(block=True, timeout=0.2) except Empty: break # The classes to use for the various channels shell_channel_class = Type(ZMQSocketChannel) iopub_channel_class = Type(ZMQSocketChannel) stdin_channel_class = Type(ZMQSocketChannel) hb_channel_class = Type(HBChannel)