##// END OF EJS Templates
Move ZMQ socket creation out of channels
Thomas Kluyver -
Show More
@@ -1,174 +1,171 b''
1 """Blocking channels
1 """Blocking channels
2
2
3 Useful for test suites and blocking terminal interfaces.
3 Useful for test suites and blocking terminal interfaces.
4 """
4 """
5
5
6 # Copyright (c) IPython Development Team.
6 # Copyright (c) IPython Development Team.
7 # Distributed under the terms of the Modified BSD License.
7 # Distributed under the terms of the Modified BSD License.
8
8
9 try:
9 try:
10 from queue import Queue, Empty # Py 3
10 from queue import Queue, Empty # Py 3
11 except ImportError:
11 except ImportError:
12 from Queue import Queue, Empty # Py 2
12 from Queue import Queue, Empty # Py 2
13
13
14 from IPython.kernel.channelsabc import ShellChannelABC, IOPubChannelABC, \
14 from IPython.kernel.channelsabc import ShellChannelABC, IOPubChannelABC, \
15 StdInChannelABC
15 StdInChannelABC
16 from IPython.kernel.channels import HBChannel,\
16 from IPython.kernel.channels import HBChannel,\
17 make_iopub_socket, make_shell_socket, make_stdin_socket,\
17 make_iopub_socket, make_shell_socket, make_stdin_socket,\
18 InvalidPortNumber, major_protocol_version
18 InvalidPortNumber, major_protocol_version
19 from IPython.utils.py3compat import string_types, iteritems
19 from IPython.utils.py3compat import string_types, iteritems
20
20
21 # some utilities to validate message structure, these might get moved elsewhere
21 # some utilities to validate message structure, these might get moved elsewhere
22 # if they prove to have more generic utility
22 # if they prove to have more generic utility
23
23
24 def validate_string_list(lst):
24 def validate_string_list(lst):
25 """Validate that the input is a list of strings.
25 """Validate that the input is a list of strings.
26
26
27 Raises ValueError if not."""
27 Raises ValueError if not."""
28 if not isinstance(lst, list):
28 if not isinstance(lst, list):
29 raise ValueError('input %r must be a list' % lst)
29 raise ValueError('input %r must be a list' % lst)
30 for x in lst:
30 for x in lst:
31 if not isinstance(x, string_types):
31 if not isinstance(x, string_types):
32 raise ValueError('element %r in list must be a string' % x)
32 raise ValueError('element %r in list must be a string' % x)
33
33
34
34
35 def validate_string_dict(dct):
35 def validate_string_dict(dct):
36 """Validate that the input is a dict with string keys and values.
36 """Validate that the input is a dict with string keys and values.
37
37
38 Raises ValueError if not."""
38 Raises ValueError if not."""
39 for k,v in iteritems(dct):
39 for k,v in iteritems(dct):
40 if not isinstance(k, string_types):
40 if not isinstance(k, string_types):
41 raise ValueError('key %r in dict must be a string' % k)
41 raise ValueError('key %r in dict must be a string' % k)
42 if not isinstance(v, string_types):
42 if not isinstance(v, string_types):
43 raise ValueError('value %r in dict must be a string' % v)
43 raise ValueError('value %r in dict must be a string' % v)
44
44
45
45
46 class ZMQSocketChannel(object):
46 class ZMQSocketChannel(object):
47 """The base class for the channels that use ZMQ sockets."""
47 """The base class for the channels that use ZMQ sockets."""
48 context = None
48 context = None
49 session = None
49 session = None
50 socket = None
50 socket = None
51 ioloop = None
51 ioloop = None
52 stream = None
52 stream = None
53 _address = None
53 _address = None
54 _exiting = False
54 _exiting = False
55 proxy_methods = []
55 proxy_methods = []
56
56
57 def __init__(self, context, session, address):
57 def __init__(self, socket, session):
58 """Create a channel.
58 """Create a channel.
59
59
60 Parameters
60 Parameters
61 ----------
61 ----------
62 context : :class:`zmq.Context`
62 context : :class:`zmq.Context`
63 The ZMQ context to use.
63 The ZMQ context to use.
64 session : :class:`session.Session`
64 session : :class:`session.Session`
65 The session to use.
65 The session to use.
66 address : zmq url
66 address : zmq url
67 Standard (ip, port) tuple that the kernel is listening on.
67 Standard (ip, port) tuple that the kernel is listening on.
68 """
68 """
69 super(ZMQSocketChannel, self).__init__()
69 super(ZMQSocketChannel, self).__init__()
70 self.daemon = True
70 self.daemon = True
71
71
72 self.context = context
72 self.socket = socket
73 self.session = session
73 self.session = session
74 if isinstance(address, tuple):
75 if address[1] == 0:
76 message = 'The port number for a channel cannot be 0.'
77 raise InvalidPortNumber(message)
78 address = "tcp://%s:%i" % address
79 self._address = address
80
74
81 def _recv(self, **kwargs):
75 def _recv(self, **kwargs):
82 msg = self.socket.recv_multipart(**kwargs)
76 msg = self.socket.recv_multipart(**kwargs)
83 ident,smsg = self.session.feed_identities(msg)
77 ident,smsg = self.session.feed_identities(msg)
84 return self.session.deserialize(smsg)
78 return self.session.deserialize(smsg)
85
79
86 def get_msg(self, block=True, timeout=None):
80 def get_msg(self, block=True, timeout=None):
87 """ Gets a message if there is one that is ready. """
81 """ Gets a message if there is one that is ready. """
88 if block:
82 if block:
89 if timeout is not None:
83 if timeout is not None:
90 timeout *= 1000 # seconds to ms
84 timeout *= 1000 # seconds to ms
91 ready = self.socket.poll(timeout)
85 ready = self.socket.poll(timeout)
92 else:
86 else:
93 ready = self.socket.poll(timeout=0)
87 ready = self.socket.poll(timeout=0)
94
88
95 if ready:
89 if ready:
96 return self._recv()
90 return self._recv()
97 else:
91 else:
98 raise Empty
92 raise Empty
99
93
100 def get_msgs(self):
94 def get_msgs(self):
101 """ Get all messages that are currently ready. """
95 """ Get all messages that are currently ready. """
102 msgs = []
96 msgs = []
103 while True:
97 while True:
104 try:
98 try:
105 msgs.append(self.get_msg(block=False))
99 msgs.append(self.get_msg(block=False))
106 except Empty:
100 except Empty:
107 break
101 break
108 return msgs
102 return msgs
109
103
110 def msg_ready(self):
104 def msg_ready(self):
111 """ Is there a message that has been received? """
105 """ Is there a message that has been received? """
112 return bool(self.socket.poll(timeout=0))
106 return bool(self.socket.poll(timeout=0))
113
107
114 def close(self):
108 def close(self):
115 if self.socket is not None:
109 if self.socket is not None:
116 try:
110 try:
117 self.socket.close(linger=0)
111 self.socket.close(linger=0)
118 except Exception:
112 except Exception:
119 pass
113 pass
120 self.socket = None
114 self.socket = None
121 stop = close
115 stop = close
122
116
123 def is_alive(self):
117 def is_alive(self):
124 return (self.socket is not None)
118 return (self.socket is not None)
125
119
126 @property
120 @property
127 def address(self):
121 def address(self):
128 """Get the channel's address as a zmq url string.
122 """Get the channel's address as a zmq url string.
129
123
130 These URLS have the form: 'tcp://127.0.0.1:5555'.
124 These URLS have the form: 'tcp://127.0.0.1:5555'.
131 """
125 """
132 return self._address
126 return self._address
133
127
134 def _queue_send(self, msg):
128 def _queue_send(self, msg):
135 """Pass a message to the ZMQ socket to send
129 """Pass a message to the ZMQ socket to send
136 """
130 """
137 self.session.send(self.socket, msg)
131 self.session.send(self.socket, msg)
138
132
133 def start(self):
134 pass
135
139
136
140 class BlockingShellChannel(ZMQSocketChannel):
137 class BlockingShellChannel(ZMQSocketChannel):
141 """The shell channel for issuing request/replies to the kernel."""
138 """The shell channel for issuing request/replies to the kernel."""
142
139
143 def start(self):
140 def start(self):
144 self.socket = make_stdin_socket(self.context, self.session.bsession, self.address)
141 self.socket = make_stdin_socket(self.context, self.session.bsession, self.address)
145
142
146
143
147 class BlockingIOPubChannel(ZMQSocketChannel):
144 class BlockingIOPubChannel(ZMQSocketChannel):
148 """The iopub channel which listens for messages that the kernel publishes.
145 """The iopub channel which listens for messages that the kernel publishes.
149
146
150 This channel is where all output is published to frontends.
147 This channel is where all output is published to frontends.
151 """
148 """
152 def start(self):
149 def start(self):
153 self.socket = make_iopub_socket(self.context, self.session.bsession, self.address)
150 self.socket = make_iopub_socket(self.context, self.session.bsession, self.address)
154
151
155 class BlockingStdInChannel(ZMQSocketChannel):
152 class BlockingStdInChannel(ZMQSocketChannel):
156 """The stdin channel to handle raw_input requests that the kernel makes."""
153 """The stdin channel to handle raw_input requests that the kernel makes."""
157 def start(self):
154 def start(self):
158 self.socket = make_stdin_socket(self.context, self.session.bsession, self.address)
155 self.socket = make_stdin_socket(self.context, self.session.bsession, self.address)
159
156
160 ShellChannelABC.register(BlockingShellChannel)
157 ShellChannelABC.register(BlockingShellChannel)
161 IOPubChannelABC.register(BlockingIOPubChannel)
158 IOPubChannelABC.register(BlockingIOPubChannel)
162 StdInChannelABC.register(BlockingStdInChannel)
159 StdInChannelABC.register(BlockingStdInChannel)
163
160
164
161
165 class BlockingHBChannel(HBChannel):
162 class BlockingHBChannel(HBChannel):
166
163
167 # This kernel needs quicker monitoring, shorten to 1 sec.
164 # This kernel needs quicker monitoring, shorten to 1 sec.
168 # less than 0.5s is unreliable, and will get occasional
165 # less than 0.5s is unreliable, and will get occasional
169 # false reports of missed beats.
166 # false reports of missed beats.
170 time_to_dead = 1.
167 time_to_dead = 1.
171
168
172 def call_handlers(self, since_last_heartbeat):
169 def call_handlers(self, since_last_heartbeat):
173 """ Pause beating on missed heartbeat. """
170 """ Pause beating on missed heartbeat. """
174 pass
171 pass
@@ -1,41 +1,38 b''
1 """Implements a fully blocking kernel client.
1 """Implements a fully blocking kernel client.
2
2
3 Useful for test suites and blocking terminal interfaces.
3 Useful for test suites and blocking terminal interfaces.
4 """
4 """
5 # Copyright (c) IPython Development Team.
5 # Copyright (c) IPython Development Team.
6 # Distributed under the terms of the Modified BSD License.
6 # Distributed under the terms of the Modified BSD License.
7
7
8 try:
8 try:
9 from queue import Empty # Python 3
9 from queue import Empty # Python 3
10 except ImportError:
10 except ImportError:
11 from Queue import Empty # Python 2
11 from Queue import Empty # Python 2
12
12
13 from IPython.utils.traitlets import Type
13 from IPython.utils.traitlets import Type
14 from IPython.kernel.client import KernelClient
14 from IPython.kernel.client import KernelClient
15 from .channels import (
15 from .channels import ZMQSocketChannel, BlockingHBChannel
16 BlockingIOPubChannel, BlockingHBChannel,
17 BlockingShellChannel, BlockingStdInChannel
18 )
19
16
20 class BlockingKernelClient(KernelClient):
17 class BlockingKernelClient(KernelClient):
21 def wait_for_ready(self):
18 def wait_for_ready(self):
22 # Wait for kernel info reply on shell channel
19 # Wait for kernel info reply on shell channel
23 while True:
20 while True:
24 msg = self.shell_channel.get_msg(block=True)
21 msg = self.shell_channel.get_msg(block=True)
25 if msg['msg_type'] == 'kernel_info_reply':
22 if msg['msg_type'] == 'kernel_info_reply':
26 self._handle_kernel_info_reply(msg)
23 self._handle_kernel_info_reply(msg)
27 break
24 break
28
25
29 # Flush IOPub channel
26 # Flush IOPub channel
30 while True:
27 while True:
31 try:
28 try:
32 msg = self.iopub_channel.get_msg(block=True, timeout=0.2)
29 msg = self.iopub_channel.get_msg(block=True, timeout=0.2)
33 print(msg['msg_type'])
30 print(msg['msg_type'])
34 except Empty:
31 except Empty:
35 break
32 break
36
33
37 # The classes to use for the various channels
34 # The classes to use for the various channels
38 shell_channel_class = Type(BlockingShellChannel)
35 shell_channel_class = Type(ZMQSocketChannel)
39 iopub_channel_class = Type(BlockingIOPubChannel)
36 iopub_channel_class = Type(ZMQSocketChannel)
40 stdin_channel_class = Type(BlockingStdInChannel)
37 stdin_channel_class = Type(ZMQSocketChannel)
41 hb_channel_class = Type(BlockingHBChannel)
38 hb_channel_class = Type(BlockingHBChannel)
@@ -1,369 +1,375 b''
1 """Base class to manage the interaction with a running kernel"""
1 """Base class to manage the interaction with a running kernel"""
2
2
3 # Copyright (c) IPython Development Team.
3 # Copyright (c) IPython Development Team.
4 # Distributed under the terms of the Modified BSD License.
4 # Distributed under the terms of the Modified BSD License.
5
5
6 from __future__ import absolute_import
6 from __future__ import absolute_import
7 from IPython.kernel.channels import validate_string_dict, major_protocol_version
7 from IPython.kernel.channels import validate_string_dict, major_protocol_version
8 from IPython.utils.py3compat import string_types
8 from IPython.utils.py3compat import string_types
9
9
10 import zmq
10 import zmq
11
11
12 from IPython.utils.traitlets import (
12 from IPython.utils.traitlets import (
13 Any, Instance, Type,
13 Any, Instance, Type,
14 )
14 )
15
15
16 from .channelsabc import (
16 from .channelsabc import (
17 ShellChannelABC, IOPubChannelABC, HBChannelABC, StdInChannelABC
17 ShellChannelABC, IOPubChannelABC, HBChannelABC, StdInChannelABC
18 )
18 )
19 from .channels import (
20 make_shell_socket, make_stdin_socket, make_iopub_socket
21 )
19 from .clientabc import KernelClientABC
22 from .clientabc import KernelClientABC
20 from .connect import ConnectionFileMixin
23 from .connect import ConnectionFileMixin
21
24
22
25
23 class KernelClient(ConnectionFileMixin):
26 class KernelClient(ConnectionFileMixin):
24 """Communicates with a single kernel on any host via zmq channels.
27 """Communicates with a single kernel on any host via zmq channels.
25
28
26 There are four channels associated with each kernel:
29 There are four channels associated with each kernel:
27
30
28 * shell: for request/reply calls to the kernel.
31 * shell: for request/reply calls to the kernel.
29 * iopub: for the kernel to publish results to frontends.
32 * iopub: for the kernel to publish results to frontends.
30 * hb: for monitoring the kernel's heartbeat.
33 * hb: for monitoring the kernel's heartbeat.
31 * stdin: for frontends to reply to raw_input calls in the kernel.
34 * stdin: for frontends to reply to raw_input calls in the kernel.
32
35
33 The methods of the channels are exposed as methods of the client itself
36 The methods of the channels are exposed as methods of the client itself
34 (KernelClient.execute, complete, history, etc.).
37 (KernelClient.execute, complete, history, etc.).
35 See the channels themselves for documentation of these methods.
38 See the channels themselves for documentation of these methods.
36
39
37 """
40 """
38
41
39 # The PyZMQ Context to use for communication with the kernel.
42 # The PyZMQ Context to use for communication with the kernel.
40 context = Instance(zmq.Context)
43 context = Instance(zmq.Context)
41 def _context_default(self):
44 def _context_default(self):
42 return zmq.Context.instance()
45 return zmq.Context.instance()
43
46
44 # The classes to use for the various channels
47 # The classes to use for the various channels
45 shell_channel_class = Type(ShellChannelABC)
48 shell_channel_class = Type(ShellChannelABC)
46 iopub_channel_class = Type(IOPubChannelABC)
49 iopub_channel_class = Type(IOPubChannelABC)
47 stdin_channel_class = Type(StdInChannelABC)
50 stdin_channel_class = Type(StdInChannelABC)
48 hb_channel_class = Type(HBChannelABC)
51 hb_channel_class = Type(HBChannelABC)
49
52
50 # Protected traits
53 # Protected traits
51 _shell_channel = Any
54 _shell_channel = Any
52 _iopub_channel = Any
55 _iopub_channel = Any
53 _stdin_channel = Any
56 _stdin_channel = Any
54 _hb_channel = Any
57 _hb_channel = Any
55
58
56 # flag for whether execute requests should be allowed to call raw_input:
59 # flag for whether execute requests should be allowed to call raw_input:
57 allow_stdin = True
60 allow_stdin = True
58
61
59 #--------------------------------------------------------------------------
62 #--------------------------------------------------------------------------
60 # Channel proxy methods
63 # Channel proxy methods
61 #--------------------------------------------------------------------------
64 #--------------------------------------------------------------------------
62
65
63 def _get_msg(channel, *args, **kwargs):
66 def _get_msg(channel, *args, **kwargs):
64 return channel.get_msg(*args, **kwargs)
67 return channel.get_msg(*args, **kwargs)
65
68
66 def get_shell_msg(self, *args, **kwargs):
69 def get_shell_msg(self, *args, **kwargs):
67 """Get a message from the shell channel"""
70 """Get a message from the shell channel"""
68 return self.shell_channel.get_msg(*args, **kwargs)
71 return self.shell_channel.get_msg(*args, **kwargs)
69
72
70 def get_iopub_msg(self, *args, **kwargs):
73 def get_iopub_msg(self, *args, **kwargs):
71 """Get a message from the iopub channel"""
74 """Get a message from the iopub channel"""
72 return self.iopub_channel.get_msg(*args, **kwargs)
75 return self.iopub_channel.get_msg(*args, **kwargs)
73
76
74 def get_stdin_msg(self, *args, **kwargs):
77 def get_stdin_msg(self, *args, **kwargs):
75 """Get a message from the stdin channel"""
78 """Get a message from the stdin channel"""
76 return self.stdin_channel.get_msg(*args, **kwargs)
79 return self.stdin_channel.get_msg(*args, **kwargs)
77
80
78 #--------------------------------------------------------------------------
81 #--------------------------------------------------------------------------
79 # Channel management methods
82 # Channel management methods
80 #--------------------------------------------------------------------------
83 #--------------------------------------------------------------------------
81
84
82 def start_channels(self, shell=True, iopub=True, stdin=True, hb=True):
85 def start_channels(self, shell=True, iopub=True, stdin=True, hb=True):
83 """Starts the channels for this kernel.
86 """Starts the channels for this kernel.
84
87
85 This will create the channels if they do not exist and then start
88 This will create the channels if they do not exist and then start
86 them (their activity runs in a thread). If port numbers of 0 are
89 them (their activity runs in a thread). If port numbers of 0 are
87 being used (random ports) then you must first call
90 being used (random ports) then you must first call
88 :meth:`start_kernel`. If the channels have been stopped and you
91 :meth:`start_kernel`. If the channels have been stopped and you
89 call this, :class:`RuntimeError` will be raised.
92 call this, :class:`RuntimeError` will be raised.
90 """
93 """
91 if shell:
94 if shell:
92 self.shell_channel.start()
95 self.shell_channel.start()
93 self.kernel_info()
96 self.kernel_info()
94 if iopub:
97 if iopub:
95 self.iopub_channel.start()
98 self.iopub_channel.start()
96 if stdin:
99 if stdin:
97 self.stdin_channel.start()
100 self.stdin_channel.start()
98 self.allow_stdin = True
101 self.allow_stdin = True
99 else:
102 else:
100 self.allow_stdin = False
103 self.allow_stdin = False
101 if hb:
104 if hb:
102 self.hb_channel.start()
105 self.hb_channel.start()
103
106
104 def stop_channels(self):
107 def stop_channels(self):
105 """Stops all the running channels for this kernel.
108 """Stops all the running channels for this kernel.
106
109
107 This stops their event loops and joins their threads.
110 This stops their event loops and joins their threads.
108 """
111 """
109 if self.shell_channel.is_alive():
112 if self.shell_channel.is_alive():
110 self.shell_channel.stop()
113 self.shell_channel.stop()
111 if self.iopub_channel.is_alive():
114 if self.iopub_channel.is_alive():
112 self.iopub_channel.stop()
115 self.iopub_channel.stop()
113 if self.stdin_channel.is_alive():
116 if self.stdin_channel.is_alive():
114 self.stdin_channel.stop()
117 self.stdin_channel.stop()
115 if self.hb_channel.is_alive():
118 if self.hb_channel.is_alive():
116 self.hb_channel.stop()
119 self.hb_channel.stop()
117
120
118 @property
121 @property
119 def channels_running(self):
122 def channels_running(self):
120 """Are any of the channels created and running?"""
123 """Are any of the channels created and running?"""
121 return (self.shell_channel.is_alive() or self.iopub_channel.is_alive() or
124 return (self.shell_channel.is_alive() or self.iopub_channel.is_alive() or
122 self.stdin_channel.is_alive() or self.hb_channel.is_alive())
125 self.stdin_channel.is_alive() or self.hb_channel.is_alive())
123
126
124 @property
127 @property
125 def shell_channel(self):
128 def shell_channel(self):
126 """Get the shell channel object for this kernel."""
129 """Get the shell channel object for this kernel."""
127 if self._shell_channel is None:
130 if self._shell_channel is None:
128 url = self._make_url('shell')
131 url = self._make_url('shell')
129 self.log.debug("connecting shell channel to %s", url)
132 self.log.debug("connecting shell channel to %s", url)
133 socket = make_shell_socket(self.context, self.session.bsession, url)
130 self._shell_channel = self.shell_channel_class(
134 self._shell_channel = self.shell_channel_class(
131 self.context, self.session, url
135 socket, self.session
132 )
136 )
133 return self._shell_channel
137 return self._shell_channel
134
138
135 @property
139 @property
136 def iopub_channel(self):
140 def iopub_channel(self):
137 """Get the iopub channel object for this kernel."""
141 """Get the iopub channel object for this kernel."""
138 if self._iopub_channel is None:
142 if self._iopub_channel is None:
139 url = self._make_url('iopub')
143 url = self._make_url('iopub')
140 self.log.debug("connecting iopub channel to %s", url)
144 self.log.debug("connecting iopub channel to %s", url)
145 socket = make_iopub_socket(self.context, self.session.bsession, url)
141 self._iopub_channel = self.iopub_channel_class(
146 self._iopub_channel = self.iopub_channel_class(
142 self.context, self.session, url
147 socket, self.session
143 )
148 )
144 return self._iopub_channel
149 return self._iopub_channel
145
150
146 @property
151 @property
147 def stdin_channel(self):
152 def stdin_channel(self):
148 """Get the stdin channel object for this kernel."""
153 """Get the stdin channel object for this kernel."""
149 if self._stdin_channel is None:
154 if self._stdin_channel is None:
150 url = self._make_url('stdin')
155 url = self._make_url('stdin')
151 self.log.debug("connecting stdin channel to %s", url)
156 self.log.debug("connecting stdin channel to %s", url)
157 socket = make_stdin_socket(self.context, self.session.bsession, url)
152 self._stdin_channel = self.stdin_channel_class(
158 self._stdin_channel = self.stdin_channel_class(
153 self.context, self.session, url
159 socket, self.session
154 )
160 )
155 return self._stdin_channel
161 return self._stdin_channel
156
162
157 @property
163 @property
158 def hb_channel(self):
164 def hb_channel(self):
159 """Get the hb channel object for this kernel."""
165 """Get the hb channel object for this kernel."""
160 if self._hb_channel is None:
166 if self._hb_channel is None:
161 url = self._make_url('hb')
167 url = self._make_url('hb')
162 self.log.debug("connecting heartbeat channel to %s", url)
168 self.log.debug("connecting heartbeat channel to %s", url)
163 self._hb_channel = self.hb_channel_class(
169 self._hb_channel = self.hb_channel_class(
164 self.context, self.session, url
170 self.context, self.session, url
165 )
171 )
166 return self._hb_channel
172 return self._hb_channel
167
173
168 def is_alive(self):
174 def is_alive(self):
169 """Is the kernel process still running?"""
175 """Is the kernel process still running?"""
170 if self._hb_channel is not None:
176 if self._hb_channel is not None:
171 # We didn't start the kernel with this KernelManager so we
177 # We didn't start the kernel with this KernelManager so we
172 # use the heartbeat.
178 # use the heartbeat.
173 return self._hb_channel.is_beating()
179 return self._hb_channel.is_beating()
174 else:
180 else:
175 # no heartbeat and not local, we can't tell if it's running,
181 # no heartbeat and not local, we can't tell if it's running,
176 # so naively return True
182 # so naively return True
177 return True
183 return True
178
184
179
185
180 # Methods to send specific messages on channels
186 # Methods to send specific messages on channels
181 def execute(self, code, silent=False, store_history=True,
187 def execute(self, code, silent=False, store_history=True,
182 user_expressions=None, allow_stdin=None):
188 user_expressions=None, allow_stdin=None):
183 """Execute code in the kernel.
189 """Execute code in the kernel.
184
190
185 Parameters
191 Parameters
186 ----------
192 ----------
187 code : str
193 code : str
188 A string of Python code.
194 A string of Python code.
189
195
190 silent : bool, optional (default False)
196 silent : bool, optional (default False)
191 If set, the kernel will execute the code as quietly possible, and
197 If set, the kernel will execute the code as quietly possible, and
192 will force store_history to be False.
198 will force store_history to be False.
193
199
194 store_history : bool, optional (default True)
200 store_history : bool, optional (default True)
195 If set, the kernel will store command history. This is forced
201 If set, the kernel will store command history. This is forced
196 to be False if silent is True.
202 to be False if silent is True.
197
203
198 user_expressions : dict, optional
204 user_expressions : dict, optional
199 A dict mapping names to expressions to be evaluated in the user's
205 A dict mapping names to expressions to be evaluated in the user's
200 dict. The expression values are returned as strings formatted using
206 dict. The expression values are returned as strings formatted using
201 :func:`repr`.
207 :func:`repr`.
202
208
203 allow_stdin : bool, optional (default self.allow_stdin)
209 allow_stdin : bool, optional (default self.allow_stdin)
204 Flag for whether the kernel can send stdin requests to frontends.
210 Flag for whether the kernel can send stdin requests to frontends.
205
211
206 Some frontends (e.g. the Notebook) do not support stdin requests.
212 Some frontends (e.g. the Notebook) do not support stdin requests.
207 If raw_input is called from code executed from such a frontend, a
213 If raw_input is called from code executed from such a frontend, a
208 StdinNotImplementedError will be raised.
214 StdinNotImplementedError will be raised.
209
215
210 Returns
216 Returns
211 -------
217 -------
212 The msg_id of the message sent.
218 The msg_id of the message sent.
213 """
219 """
214 if user_expressions is None:
220 if user_expressions is None:
215 user_expressions = {}
221 user_expressions = {}
216 if allow_stdin is None:
222 if allow_stdin is None:
217 allow_stdin = self.allow_stdin
223 allow_stdin = self.allow_stdin
218
224
219
225
220 # Don't waste network traffic if inputs are invalid
226 # Don't waste network traffic if inputs are invalid
221 if not isinstance(code, string_types):
227 if not isinstance(code, string_types):
222 raise ValueError('code %r must be a string' % code)
228 raise ValueError('code %r must be a string' % code)
223 validate_string_dict(user_expressions)
229 validate_string_dict(user_expressions)
224
230
225 # Create class for content/msg creation. Related to, but possibly
231 # Create class for content/msg creation. Related to, but possibly
226 # not in Session.
232 # not in Session.
227 content = dict(code=code, silent=silent, store_history=store_history,
233 content = dict(code=code, silent=silent, store_history=store_history,
228 user_expressions=user_expressions,
234 user_expressions=user_expressions,
229 allow_stdin=allow_stdin,
235 allow_stdin=allow_stdin,
230 )
236 )
231 msg = self.session.msg('execute_request', content)
237 msg = self.session.msg('execute_request', content)
232 self.shell_channel._queue_send(msg)
238 self.shell_channel._queue_send(msg)
233 return msg['header']['msg_id']
239 return msg['header']['msg_id']
234
240
235 def complete(self, code, cursor_pos=None):
241 def complete(self, code, cursor_pos=None):
236 """Tab complete text in the kernel's namespace.
242 """Tab complete text in the kernel's namespace.
237
243
238 Parameters
244 Parameters
239 ----------
245 ----------
240 code : str
246 code : str
241 The context in which completion is requested.
247 The context in which completion is requested.
242 Can be anything between a variable name and an entire cell.
248 Can be anything between a variable name and an entire cell.
243 cursor_pos : int, optional
249 cursor_pos : int, optional
244 The position of the cursor in the block of code where the completion was requested.
250 The position of the cursor in the block of code where the completion was requested.
245 Default: ``len(code)``
251 Default: ``len(code)``
246
252
247 Returns
253 Returns
248 -------
254 -------
249 The msg_id of the message sent.
255 The msg_id of the message sent.
250 """
256 """
251 if cursor_pos is None:
257 if cursor_pos is None:
252 cursor_pos = len(code)
258 cursor_pos = len(code)
253 content = dict(code=code, cursor_pos=cursor_pos)
259 content = dict(code=code, cursor_pos=cursor_pos)
254 msg = self.session.msg('complete_request', content)
260 msg = self.session.msg('complete_request', content)
255 self.shell_channel._queue_send(msg)
261 self.shell_channel._queue_send(msg)
256 return msg['header']['msg_id']
262 return msg['header']['msg_id']
257
263
258 def inspect(self, code, cursor_pos=None, detail_level=0):
264 def inspect(self, code, cursor_pos=None, detail_level=0):
259 """Get metadata information about an object in the kernel's namespace.
265 """Get metadata information about an object in the kernel's namespace.
260
266
261 It is up to the kernel to determine the appropriate object to inspect.
267 It is up to the kernel to determine the appropriate object to inspect.
262
268
263 Parameters
269 Parameters
264 ----------
270 ----------
265 code : str
271 code : str
266 The context in which info is requested.
272 The context in which info is requested.
267 Can be anything between a variable name and an entire cell.
273 Can be anything between a variable name and an entire cell.
268 cursor_pos : int, optional
274 cursor_pos : int, optional
269 The position of the cursor in the block of code where the info was requested.
275 The position of the cursor in the block of code where the info was requested.
270 Default: ``len(code)``
276 Default: ``len(code)``
271 detail_level : int, optional
277 detail_level : int, optional
272 The level of detail for the introspection (0-2)
278 The level of detail for the introspection (0-2)
273
279
274 Returns
280 Returns
275 -------
281 -------
276 The msg_id of the message sent.
282 The msg_id of the message sent.
277 """
283 """
278 if cursor_pos is None:
284 if cursor_pos is None:
279 cursor_pos = len(code)
285 cursor_pos = len(code)
280 content = dict(code=code, cursor_pos=cursor_pos,
286 content = dict(code=code, cursor_pos=cursor_pos,
281 detail_level=detail_level,
287 detail_level=detail_level,
282 )
288 )
283 msg = self.session.msg('inspect_request', content)
289 msg = self.session.msg('inspect_request', content)
284 self.shell_channel._queue_send(msg)
290 self.shell_channel._queue_send(msg)
285 return msg['header']['msg_id']
291 return msg['header']['msg_id']
286
292
287 def history(self, raw=True, output=False, hist_access_type='range', **kwargs):
293 def history(self, raw=True, output=False, hist_access_type='range', **kwargs):
288 """Get entries from the kernel's history list.
294 """Get entries from the kernel's history list.
289
295
290 Parameters
296 Parameters
291 ----------
297 ----------
292 raw : bool
298 raw : bool
293 If True, return the raw input.
299 If True, return the raw input.
294 output : bool
300 output : bool
295 If True, then return the output as well.
301 If True, then return the output as well.
296 hist_access_type : str
302 hist_access_type : str
297 'range' (fill in session, start and stop params), 'tail' (fill in n)
303 'range' (fill in session, start and stop params), 'tail' (fill in n)
298 or 'search' (fill in pattern param).
304 or 'search' (fill in pattern param).
299
305
300 session : int
306 session : int
301 For a range request, the session from which to get lines. Session
307 For a range request, the session from which to get lines. Session
302 numbers are positive integers; negative ones count back from the
308 numbers are positive integers; negative ones count back from the
303 current session.
309 current session.
304 start : int
310 start : int
305 The first line number of a history range.
311 The first line number of a history range.
306 stop : int
312 stop : int
307 The final (excluded) line number of a history range.
313 The final (excluded) line number of a history range.
308
314
309 n : int
315 n : int
310 The number of lines of history to get for a tail request.
316 The number of lines of history to get for a tail request.
311
317
312 pattern : str
318 pattern : str
313 The glob-syntax pattern for a search request.
319 The glob-syntax pattern for a search request.
314
320
315 Returns
321 Returns
316 -------
322 -------
317 The msg_id of the message sent.
323 The msg_id of the message sent.
318 """
324 """
319 content = dict(raw=raw, output=output, hist_access_type=hist_access_type,
325 content = dict(raw=raw, output=output, hist_access_type=hist_access_type,
320 **kwargs)
326 **kwargs)
321 msg = self.session.msg('history_request', content)
327 msg = self.session.msg('history_request', content)
322 self.shell_channel._queue_send(msg)
328 self.shell_channel._queue_send(msg)
323 return msg['header']['msg_id']
329 return msg['header']['msg_id']
324
330
325 def kernel_info(self):
331 def kernel_info(self):
326 """Request kernel info."""
332 """Request kernel info."""
327 msg = self.session.msg('kernel_info_request')
333 msg = self.session.msg('kernel_info_request')
328 self.shell_channel._queue_send(msg)
334 self.shell_channel._queue_send(msg)
329 return msg['header']['msg_id']
335 return msg['header']['msg_id']
330
336
331 def _handle_kernel_info_reply(self, msg):
337 def _handle_kernel_info_reply(self, msg):
332 """handle kernel info reply
338 """handle kernel info reply
333
339
334 sets protocol adaptation version
340 sets protocol adaptation version
335 """
341 """
336 adapt_version = int(msg['content']['protocol_version'].split('.')[0])
342 adapt_version = int(msg['content']['protocol_version'].split('.')[0])
337 if adapt_version != major_protocol_version:
343 if adapt_version != major_protocol_version:
338 self.session.adapt_version = adapt_version
344 self.session.adapt_version = adapt_version
339
345
340 def shutdown(self, restart=False):
346 def shutdown(self, restart=False):
341 """Request an immediate kernel shutdown.
347 """Request an immediate kernel shutdown.
342
348
343 Upon receipt of the (empty) reply, client code can safely assume that
349 Upon receipt of the (empty) reply, client code can safely assume that
344 the kernel has shut down and it's safe to forcefully terminate it if
350 the kernel has shut down and it's safe to forcefully terminate it if
345 it's still alive.
351 it's still alive.
346
352
347 The kernel will send the reply via a function registered with Python's
353 The kernel will send the reply via a function registered with Python's
348 atexit module, ensuring it's truly done as the kernel is done with all
354 atexit module, ensuring it's truly done as the kernel is done with all
349 normal operation.
355 normal operation.
350 """
356 """
351 # Send quit message to kernel. Once we implement kernel-side setattr,
357 # Send quit message to kernel. Once we implement kernel-side setattr,
352 # this should probably be done that way, but for now this will do.
358 # this should probably be done that way, but for now this will do.
353 msg = self.session.msg('shutdown_request', {'restart':restart})
359 msg = self.session.msg('shutdown_request', {'restart':restart})
354 self.shell_channel._queue_send(msg)
360 self.shell_channel._queue_send(msg)
355 return msg['header']['msg_id']
361 return msg['header']['msg_id']
356
362
357 def is_complete(self, code):
363 def is_complete(self, code):
358 msg = self.session.msg('is_complete_request', {'code': code})
364 msg = self.session.msg('is_complete_request', {'code': code})
359 self.shell_channel._queue_send(msg)
365 self.shell_channel._queue_send(msg)
360 return msg['header']['msg_id']
366 return msg['header']['msg_id']
361
367
362 def input(self, string):
368 def input(self, string):
363 """Send a string of raw input to the kernel."""
369 """Send a string of raw input to the kernel."""
364 content = dict(value=string)
370 content = dict(value=string)
365 msg = self.session.msg('input_reply', content)
371 msg = self.session.msg('input_reply', content)
366 self.stdin_channel._queue_send(msg)
372 self.stdin_channel._queue_send(msg)
367
373
368
374
369 KernelClientABC.register(KernelClient)
375 KernelClientABC.register(KernelClient)
@@ -1,381 +1,370 b''
1 """ Defines a KernelClient that provides signals and slots.
1 """ Defines a KernelClient that provides signals and slots.
2 """
2 """
3 import atexit
3 import atexit
4 import errno
4 import errno
5 from threading import Thread
5 from threading import Thread
6 import time
6 import time
7
7
8 import zmq
8 import zmq
9 # import ZMQError in top-level namespace, to avoid ugly attribute-error messages
9 # import ZMQError in top-level namespace, to avoid ugly attribute-error messages
10 # during garbage collection of threads at exit:
10 # during garbage collection of threads at exit:
11 from zmq import ZMQError
11 from zmq import ZMQError
12 from zmq.eventloop import ioloop, zmqstream
12 from zmq.eventloop import ioloop, zmqstream
13
13
14 from IPython.external.qt import QtCore
14 from IPython.external.qt import QtCore
15
15
16 # Local imports
16 # Local imports
17 from IPython.utils.traitlets import Type
17 from IPython.utils.traitlets import Type
18 from IPython.kernel.channels import HBChannel,\
18 from IPython.kernel.channels import HBChannel,\
19 make_shell_socket, make_iopub_socket, make_stdin_socket
19 make_shell_socket, make_iopub_socket, make_stdin_socket
20 from IPython.kernel import KernelClient
20 from IPython.kernel import KernelClient
21
21
22 from .kernel_mixins import (QtHBChannelMixin, QtKernelClientMixin)
22 from .kernel_mixins import (QtHBChannelMixin, QtKernelClientMixin)
23 from .util import SuperQObject
23 from .util import SuperQObject
24
24
25 class QtHBChannel(QtHBChannelMixin, HBChannel):
25 class QtHBChannel(QtHBChannelMixin, HBChannel):
26 pass
26 pass
27
27
28 from IPython.core.release import kernel_protocol_version_info
28 from IPython.core.release import kernel_protocol_version_info
29
29
30 from IPython.kernel.channelsabc import (
30 from IPython.kernel.channelsabc import (
31 ShellChannelABC, IOPubChannelABC, StdInChannelABC,
31 ShellChannelABC, IOPubChannelABC, StdInChannelABC,
32 )
32 )
33 from IPython.utils.py3compat import string_types, iteritems
33 from IPython.utils.py3compat import string_types, iteritems
34
34
35 major_protocol_version = kernel_protocol_version_info[0]
35 major_protocol_version = kernel_protocol_version_info[0]
36
36
37 class InvalidPortNumber(Exception):
37 class InvalidPortNumber(Exception):
38 pass
38 pass
39
39
40 # some utilities to validate message structure, these might get moved elsewhere
40 # some utilities to validate message structure, these might get moved elsewhere
41 # if they prove to have more generic utility
41 # if they prove to have more generic utility
42
42
43
43
44 def validate_string_dict(dct):
44 def validate_string_dict(dct):
45 """Validate that the input is a dict with string keys and values.
45 """Validate that the input is a dict with string keys and values.
46
46
47 Raises ValueError if not."""
47 Raises ValueError if not."""
48 for k,v in iteritems(dct):
48 for k,v in iteritems(dct):
49 if not isinstance(k, string_types):
49 if not isinstance(k, string_types):
50 raise ValueError('key %r in dict must be a string' % k)
50 raise ValueError('key %r in dict must be a string' % k)
51 if not isinstance(v, string_types):
51 if not isinstance(v, string_types):
52 raise ValueError('value %r in dict must be a string' % v)
52 raise ValueError('value %r in dict must be a string' % v)
53
53
54
54
55
55
56 class QtZMQSocketChannel(SuperQObject, Thread):
56 class QtZMQSocketChannel(SuperQObject, Thread):
57 """The base class for the channels that use ZMQ sockets."""
57 """The base class for the channels that use ZMQ sockets."""
58 context = None
59 session = None
58 session = None
60 socket = None
59 socket = None
61 ioloop = None
60 ioloop = None
62 stream = None
61 stream = None
63 _address = None
64 _exiting = False
62 _exiting = False
65 proxy_methods = []
63 proxy_methods = []
66
64
67 # Emitted when the channel is started.
65 # Emitted when the channel is started.
68 started = QtCore.Signal()
66 started = QtCore.Signal()
69
67
70 # Emitted when the channel is stopped.
68 # Emitted when the channel is stopped.
71 stopped = QtCore.Signal()
69 stopped = QtCore.Signal()
72
70
73 message_received = QtCore.Signal(object)
71 message_received = QtCore.Signal(object)
74
72
75 #---------------------------------------------------------------------------
73 #---------------------------------------------------------------------------
76 # InProcessChannel interface
74 # InProcessChannel interface
77 #---------------------------------------------------------------------------
75 #---------------------------------------------------------------------------
78
76
79 def call_handlers_later(self, *args, **kwds):
77 def call_handlers_later(self, *args, **kwds):
80 """ Call the message handlers later.
78 """ Call the message handlers later.
81 """
79 """
82 do_later = lambda: self.call_handlers(*args, **kwds)
80 do_later = lambda: self.call_handlers(*args, **kwds)
83 QtCore.QTimer.singleShot(0, do_later)
81 QtCore.QTimer.singleShot(0, do_later)
84
82
85 def process_events(self):
83 def process_events(self):
86 """ Process any pending GUI events.
84 """ Process any pending GUI events.
87 """
85 """
88 QtCore.QCoreApplication.instance().processEvents()
86 QtCore.QCoreApplication.instance().processEvents()
89
87
90 def __init__(self, context, session, address):
88 def __init__(self, socket, session):
91 """Create a channel.
89 """Create a channel.
92
90
93 Parameters
91 Parameters
94 ----------
92 ----------
95 context : :class:`zmq.Context`
93 context : :class:`zmq.Context`
96 The ZMQ context to use.
94 The ZMQ context to use.
97 session : :class:`session.Session`
95 session : :class:`session.Session`
98 The session to use.
96 The session to use.
99 address : zmq url
97 address : zmq url
100 Standard (ip, port) tuple that the kernel is listening on.
98 Standard (ip, port) tuple that the kernel is listening on.
101 """
99 """
102 super(QtZMQSocketChannel, self).__init__()
100 super(QtZMQSocketChannel, self).__init__()
103 self.daemon = True
101 self.daemon = True
104
102
105 self.context = context
103 self.socket = socket
106 self.session = session
104 self.session = session
107 if isinstance(address, tuple):
108 if address[1] == 0:
109 message = 'The port number for a channel cannot be 0.'
110 raise InvalidPortNumber(message)
111 address = "tcp://%s:%i" % address
112 self._address = address
113 atexit.register(self._notice_exit)
105 atexit.register(self._notice_exit)
114
106
115 def _notice_exit(self):
107 def _notice_exit(self):
116 self._exiting = True
108 self._exiting = True
117
109
118 def _run_loop(self):
110 def _run_loop(self):
119 """Run my loop, ignoring EINTR events in the poller"""
111 """Run my loop, ignoring EINTR events in the poller"""
120 while True:
112 while True:
121 try:
113 try:
122 self.ioloop.start()
114 self.ioloop.start()
123 except ZMQError as e:
115 except ZMQError as e:
124 if e.errno == errno.EINTR:
116 if e.errno == errno.EINTR:
125 continue
117 continue
126 else:
118 else:
127 raise
119 raise
128 except Exception:
120 except Exception:
129 if self._exiting:
121 if self._exiting:
130 break
122 break
131 else:
123 else:
132 raise
124 raise
133 else:
125 else:
134 break
126 break
135
127
136 def start(self):
128 def start(self):
137 """ Reimplemented to emit signal.
129 """ Reimplemented to emit signal.
138 """
130 """
139 super(QtZMQSocketChannel, self).start()
131 super(QtZMQSocketChannel, self).start()
140 self.started.emit()
132 self.started.emit()
141
133
142 def stop(self):
134 def stop(self):
143 """Stop the channel's event loop and join its thread.
135 """Stop the channel's event loop and join its thread.
144
136
145 This calls :meth:`~threading.Thread.join` and returns when the thread
137 This calls :meth:`~threading.Thread.join` and returns when the thread
146 terminates. :class:`RuntimeError` will be raised if
138 terminates. :class:`RuntimeError` will be raised if
147 :meth:`~threading.Thread.start` is called again.
139 :meth:`~threading.Thread.start` is called again.
148 """
140 """
149 if self.ioloop is not None:
141 if self.ioloop is not None:
150 self.ioloop.stop()
142 self.ioloop.stop()
151 self.join()
143 self.join()
152 self.close()
144 self.close()
153 self.stopped.emit()
145 self.stopped.emit()
154
146
155 def close(self):
147 def close(self):
156 if self.ioloop is not None:
148 if self.ioloop is not None:
157 try:
149 try:
158 self.ioloop.close(all_fds=True)
150 self.ioloop.close(all_fds=True)
159 except Exception:
151 except Exception:
160 pass
152 pass
161 if self.socket is not None:
153 if self.socket is not None:
162 try:
154 try:
163 self.socket.close(linger=0)
155 self.socket.close(linger=0)
164 except Exception:
156 except Exception:
165 pass
157 pass
166 self.socket = None
158 self.socket = None
167
159
168 @property
160 @property
169 def address(self):
161 def address(self):
170 """Get the channel's address as a zmq url string.
162 """Get the channel's address as a zmq url string.
171
163
172 These URLS have the form: 'tcp://127.0.0.1:5555'.
164 These URLS have the form: 'tcp://127.0.0.1:5555'.
173 """
165 """
174 return self._address
166 return self._address
175
167
176 def _queue_send(self, msg):
168 def _queue_send(self, msg):
177 """Queue a message to be sent from the IOLoop's thread.
169 """Queue a message to be sent from the IOLoop's thread.
178
170
179 Parameters
171 Parameters
180 ----------
172 ----------
181 msg : message to send
173 msg : message to send
182
174
183 This is threadsafe, as it uses IOLoop.add_callback to give the loop's
175 This is threadsafe, as it uses IOLoop.add_callback to give the loop's
184 thread control of the action.
176 thread control of the action.
185 """
177 """
186 def thread_send():
178 def thread_send():
187 self.session.send(self.stream, msg)
179 self.session.send(self.stream, msg)
188 self.ioloop.add_callback(thread_send)
180 self.ioloop.add_callback(thread_send)
189
181
190 def _handle_recv(self, msg):
182 def _handle_recv(self, msg):
191 """Callback for stream.on_recv.
183 """Callback for stream.on_recv.
192
184
193 Unpacks message, and calls handlers with it.
185 Unpacks message, and calls handlers with it.
194 """
186 """
195 ident,smsg = self.session.feed_identities(msg)
187 ident,smsg = self.session.feed_identities(msg)
196 msg = self.session.deserialize(smsg)
188 msg = self.session.deserialize(smsg)
197 self.call_handlers(msg)
189 self.call_handlers(msg)
198
190
199 def call_handlers(self, msg):
191 def call_handlers(self, msg):
200 """This method is called in the ioloop thread when a message arrives.
192 """This method is called in the ioloop thread when a message arrives.
201
193
202 Subclasses should override this method to handle incoming messages.
194 Subclasses should override this method to handle incoming messages.
203 It is important to remember that this method is called in the thread
195 It is important to remember that this method is called in the thread
204 so that some logic must be done to ensure that the application level
196 so that some logic must be done to ensure that the application level
205 handlers are called in the application thread.
197 handlers are called in the application thread.
206 """
198 """
207 # Emit the generic signal.
199 # Emit the generic signal.
208 self.message_received.emit(msg)
200 self.message_received.emit(msg)
209
201
210
202
211 class QtShellChannel(QtZMQSocketChannel):
203 class QtShellChannel(QtZMQSocketChannel):
212 """The shell channel for issuing request/replies to the kernel."""
204 """The shell channel for issuing request/replies to the kernel."""
213
205
214 # Emitted when a reply has been received for the corresponding request type.
206 # Emitted when a reply has been received for the corresponding request type.
215 execute_reply = QtCore.Signal(object)
207 execute_reply = QtCore.Signal(object)
216 complete_reply = QtCore.Signal(object)
208 complete_reply = QtCore.Signal(object)
217 inspect_reply = QtCore.Signal(object)
209 inspect_reply = QtCore.Signal(object)
218 history_reply = QtCore.Signal(object)
210 history_reply = QtCore.Signal(object)
219 kernel_info_reply = QtCore.Signal(object)
211 kernel_info_reply = QtCore.Signal(object)
220
212
221 def __init__(self, context, session, address):
213 def __init__(self, socket, session):
222 super(QtShellChannel, self).__init__(context, session, address)
214 super(QtShellChannel, self).__init__(socket, session)
223 self.ioloop = ioloop.IOLoop()
215 self.ioloop = ioloop.IOLoop()
224
216
225 def run(self):
217 def run(self):
226 """The thread's main activity. Call start() instead."""
218 """The thread's main activity. Call start() instead."""
227 self.socket = make_shell_socket(self.context, self.session.bsession, self.address)
228 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
219 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
229 self.stream.on_recv(self._handle_recv)
220 self.stream.on_recv(self._handle_recv)
230 self._run_loop()
221 self._run_loop()
231
222
232 def call_handlers(self, msg):
223 def call_handlers(self, msg):
233 super(QtShellChannel, self).call_handlers(msg)
224 super(QtShellChannel, self).call_handlers(msg)
234
225
235 # Catch kernel_info_reply for message spec adaptation
226 # Catch kernel_info_reply for message spec adaptation
236 msg_type = msg['header']['msg_type']
227 msg_type = msg['header']['msg_type']
237 if msg_type == 'kernel_info_reply':
228 if msg_type == 'kernel_info_reply':
238 self._handle_kernel_info_reply(msg)
229 self._handle_kernel_info_reply(msg)
239
230
240 # Emit specific signals
231 # Emit specific signals
241 signal = getattr(self, msg_type, None)
232 signal = getattr(self, msg_type, None)
242 if signal:
233 if signal:
243 signal.emit(msg)
234 signal.emit(msg)
244
235
245 def _handle_kernel_info_reply(self, msg):
236 def _handle_kernel_info_reply(self, msg):
246 """handle kernel info reply
237 """handle kernel info reply
247
238
248 sets protocol adaptation version
239 sets protocol adaptation version
249 """
240 """
250 adapt_version = int(msg['content']['protocol_version'].split('.')[0])
241 adapt_version = int(msg['content']['protocol_version'].split('.')[0])
251 if adapt_version != major_protocol_version:
242 if adapt_version != major_protocol_version:
252 self.session.adapt_version = adapt_version
243 self.session.adapt_version = adapt_version
253
244
254
245
255 class QtIOPubChannel(QtZMQSocketChannel):
246 class QtIOPubChannel(QtZMQSocketChannel):
256 """The iopub channel which listens for messages that the kernel publishes.
247 """The iopub channel which listens for messages that the kernel publishes.
257
248
258 This channel is where all output is published to frontends.
249 This channel is where all output is published to frontends.
259 """
250 """
260 # Emitted when a message of type 'stream' is received.
251 # Emitted when a message of type 'stream' is received.
261 stream_received = QtCore.Signal(object)
252 stream_received = QtCore.Signal(object)
262
253
263 # Emitted when a message of type 'execute_input' is received.
254 # Emitted when a message of type 'execute_input' is received.
264 execute_input_received = QtCore.Signal(object)
255 execute_input_received = QtCore.Signal(object)
265
256
266 # Emitted when a message of type 'execute_result' is received.
257 # Emitted when a message of type 'execute_result' is received.
267 execute_result_received = QtCore.Signal(object)
258 execute_result_received = QtCore.Signal(object)
268
259
269 # Emitted when a message of type 'error' is received.
260 # Emitted when a message of type 'error' is received.
270 error_received = QtCore.Signal(object)
261 error_received = QtCore.Signal(object)
271
262
272 # Emitted when a message of type 'display_data' is received
263 # Emitted when a message of type 'display_data' is received
273 display_data_received = QtCore.Signal(object)
264 display_data_received = QtCore.Signal(object)
274
265
275 # Emitted when a crash report message is received from the kernel's
266 # Emitted when a crash report message is received from the kernel's
276 # last-resort sys.excepthook.
267 # last-resort sys.excepthook.
277 crash_received = QtCore.Signal(object)
268 crash_received = QtCore.Signal(object)
278
269
279 # Emitted when a shutdown is noticed.
270 # Emitted when a shutdown is noticed.
280 shutdown_reply_received = QtCore.Signal(object)
271 shutdown_reply_received = QtCore.Signal(object)
281
272
282 def __init__(self, context, session, address):
273 def __init__(self, socket, session):
283 super(QtIOPubChannel, self).__init__(context, session, address)
274 super(QtIOPubChannel, self).__init__(socket, session)
284 self.ioloop = ioloop.IOLoop()
275 self.ioloop = ioloop.IOLoop()
285
276
286 def run(self):
277 def run(self):
287 """The thread's main activity. Call start() instead."""
278 """The thread's main activity. Call start() instead."""
288 self.socket = make_iopub_socket(self.context, self.session.bsession, self.address)
289 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
279 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
290 self.stream.on_recv(self._handle_recv)
280 self.stream.on_recv(self._handle_recv)
291 self._run_loop()
281 self._run_loop()
292
282
293 def call_handlers(self, msg):
283 def call_handlers(self, msg):
294 super(QtIOPubChannel, self).call_handlers(msg)
284 super(QtIOPubChannel, self).call_handlers(msg)
295
285
296 # Emit signals for specialized message types.
286 # Emit signals for specialized message types.
297 msg_type = msg['header']['msg_type']
287 msg_type = msg['header']['msg_type']
298 signal = getattr(self, msg_type + '_received', None)
288 signal = getattr(self, msg_type + '_received', None)
299 if signal:
289 if signal:
300 signal.emit(msg)
290 signal.emit(msg)
301
291
302 def flush(self, timeout=1.0):
292 def flush(self, timeout=1.0):
303 """Immediately processes all pending messages on the iopub channel.
293 """Immediately processes all pending messages on the iopub channel.
304
294
305 Callers should use this method to ensure that :meth:`call_handlers`
295 Callers should use this method to ensure that :meth:`call_handlers`
306 has been called for all messages that have been received on the
296 has been called for all messages that have been received on the
307 0MQ SUB socket of this channel.
297 0MQ SUB socket of this channel.
308
298
309 This method is thread safe.
299 This method is thread safe.
310
300
311 Parameters
301 Parameters
312 ----------
302 ----------
313 timeout : float, optional
303 timeout : float, optional
314 The maximum amount of time to spend flushing, in seconds. The
304 The maximum amount of time to spend flushing, in seconds. The
315 default is one second.
305 default is one second.
316 """
306 """
317 # We do the IOLoop callback process twice to ensure that the IOLoop
307 # We do the IOLoop callback process twice to ensure that the IOLoop
318 # gets to perform at least one full poll.
308 # gets to perform at least one full poll.
319 stop_time = time.time() + timeout
309 stop_time = time.time() + timeout
320 for i in range(2):
310 for i in range(2):
321 self._flushed = False
311 self._flushed = False
322 self.ioloop.add_callback(self._flush)
312 self.ioloop.add_callback(self._flush)
323 while not self._flushed and time.time() < stop_time:
313 while not self._flushed and time.time() < stop_time:
324 time.sleep(0.01)
314 time.sleep(0.01)
325
315
326 def _flush(self):
316 def _flush(self):
327 """Callback for :method:`self.flush`."""
317 """Callback for :method:`self.flush`."""
328 self.stream.flush()
318 self.stream.flush()
329 self._flushed = True
319 self._flushed = True
330
320
331
321
332 class QtStdInChannel(QtZMQSocketChannel):
322 class QtStdInChannel(QtZMQSocketChannel):
333 """The stdin channel to handle raw_input requests that the kernel makes."""
323 """The stdin channel to handle raw_input requests that the kernel makes."""
334
324
335 msg_queue = None
325 msg_queue = None
336 proxy_methods = ['input']
326 proxy_methods = ['input']
337
327
338 # Emitted when an input request is received.
328 # Emitted when an input request is received.
339 input_requested = QtCore.Signal(object)
329 input_requested = QtCore.Signal(object)
340
330
341 def __init__(self, context, session, address):
331 def __init__(self, socket, session):
342 super(QtStdInChannel, self).__init__(context, session, address)
332 super(QtStdInChannel, self).__init__(socket, session)
343 self.ioloop = ioloop.IOLoop()
333 self.ioloop = ioloop.IOLoop()
344
334
345 def run(self):
335 def run(self):
346 """The thread's main activity. Call start() instead."""
336 """The thread's main activity. Call start() instead."""
347 self.socket = make_stdin_socket(self.context, self.session.bsession, self.address)
348 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
337 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
349 self.stream.on_recv(self._handle_recv)
338 self.stream.on_recv(self._handle_recv)
350 self._run_loop()
339 self._run_loop()
351
340
352 def call_handlers(self, msg):
341 def call_handlers(self, msg):
353 super(QtStdInChannel, self).call_handlers(msg)
342 super(QtStdInChannel, self).call_handlers(msg)
354
343
355 # Emit signals for specialized message types.
344 # Emit signals for specialized message types.
356 msg_type = msg['header']['msg_type']
345 msg_type = msg['header']['msg_type']
357 if msg_type == 'input_request':
346 if msg_type == 'input_request':
358 self.input_requested.emit(msg)
347 self.input_requested.emit(msg)
359
348
360
349
361 ShellChannelABC.register(QtShellChannel)
350 ShellChannelABC.register(QtShellChannel)
362 IOPubChannelABC.register(QtIOPubChannel)
351 IOPubChannelABC.register(QtIOPubChannel)
363 StdInChannelABC.register(QtStdInChannel)
352 StdInChannelABC.register(QtStdInChannel)
364
353
365
354
366 class QtKernelClient(QtKernelClientMixin, KernelClient):
355 class QtKernelClient(QtKernelClientMixin, KernelClient):
367 """ A KernelClient that provides signals and slots.
356 """ A KernelClient that provides signals and slots.
368 """
357 """
369 def start_channels(self, shell=True, iopub=True, stdin=True, hb=True):
358 def start_channels(self, shell=True, iopub=True, stdin=True, hb=True):
370 if shell:
359 if shell:
371 self.shell_channel.kernel_info_reply.connect(self._handle_kernel_info_reply)
360 self.shell_channel.kernel_info_reply.connect(self._handle_kernel_info_reply)
372 super(QtKernelClient, self).start_channels(shell, iopub, stdin, hb)
361 super(QtKernelClient, self).start_channels(shell, iopub, stdin, hb)
373
362
374 def _handle_kernel_info_reply(self, msg):
363 def _handle_kernel_info_reply(self, msg):
375 super(QtKernelClient, self)._handle_kernel_info_reply(msg)
364 super(QtKernelClient, self)._handle_kernel_info_reply(msg)
376 self.shell_channel.kernel_info_reply.disconnect(self._handle_kernel_info_reply)
365 self.shell_channel.kernel_info_reply.disconnect(self._handle_kernel_info_reply)
377
366
378 iopub_channel_class = Type(QtIOPubChannel)
367 iopub_channel_class = Type(QtIOPubChannel)
379 shell_channel_class = Type(QtShellChannel)
368 shell_channel_class = Type(QtShellChannel)
380 stdin_channel_class = Type(QtStdInChannel)
369 stdin_channel_class = Type(QtStdInChannel)
381 hb_channel_class = Type(QtHBChannel)
370 hb_channel_class = Type(QtHBChannel)
General Comments 0
You need to be logged in to leave comments. Login now