##// END OF EJS Templates
Remove unused copies of validate_string_dict
Thomas Kluyver -
Show More
@@ -1,107 +1,92
1 """Blocking channels
1 """Blocking channels
2
2
3 Useful for test suites and blocking terminal interfaces.
3 Useful for test suites and blocking terminal interfaces.
4 """
4 """
5
5
6 # Copyright (c) IPython Development Team.
6 # Copyright (c) IPython Development Team.
7 # Distributed under the terms of the Modified BSD License.
7 # Distributed under the terms of the Modified BSD License.
8
8
9 try:
9 try:
10 from queue import Queue, Empty # Py 3
10 from queue import Queue, Empty # Py 3
11 except ImportError:
11 except ImportError:
12 from Queue import Queue, Empty # Py 2
12 from Queue import Queue, Empty # Py 2
13
13
14 from IPython.utils.py3compat import string_types, iteritems
15
16 # some utilities to validate message structure, these might get moved elsewhere
17 # if they prove to have more generic utility
18
19 def validate_string_dict(dct):
20 """Validate that the input is a dict with string keys and values.
21
22 Raises ValueError if not."""
23 for k,v in iteritems(dct):
24 if not isinstance(k, string_types):
25 raise ValueError('key %r in dict must be a string' % k)
26 if not isinstance(v, string_types):
27 raise ValueError('value %r in dict must be a string' % v)
28
29
14
30 class ZMQSocketChannel(object):
15 class ZMQSocketChannel(object):
31 """A ZMQ socket in a simple blocking API"""
16 """A ZMQ socket in a simple blocking API"""
32 session = None
17 session = None
33 socket = None
18 socket = None
34 stream = None
19 stream = None
35 _exiting = False
20 _exiting = False
36 proxy_methods = []
21 proxy_methods = []
37
22
38 def __init__(self, socket, session, loop=None):
23 def __init__(self, socket, session, loop=None):
39 """Create a channel.
24 """Create a channel.
40
25
41 Parameters
26 Parameters
42 ----------
27 ----------
43 socket : :class:`zmq.Socket`
28 socket : :class:`zmq.Socket`
44 The ZMQ socket to use.
29 The ZMQ socket to use.
45 session : :class:`session.Session`
30 session : :class:`session.Session`
46 The session to use.
31 The session to use.
47 loop
32 loop
48 Unused here, for other implementations
33 Unused here, for other implementations
49 """
34 """
50 super(ZMQSocketChannel, self).__init__()
35 super(ZMQSocketChannel, self).__init__()
51
36
52 self.socket = socket
37 self.socket = socket
53 self.session = session
38 self.session = session
54
39
55 def _recv(self, **kwargs):
40 def _recv(self, **kwargs):
56 msg = self.socket.recv_multipart(**kwargs)
41 msg = self.socket.recv_multipart(**kwargs)
57 ident,smsg = self.session.feed_identities(msg)
42 ident,smsg = self.session.feed_identities(msg)
58 return self.session.deserialize(smsg)
43 return self.session.deserialize(smsg)
59
44
60 def get_msg(self, block=True, timeout=None):
45 def get_msg(self, block=True, timeout=None):
61 """ Gets a message if there is one that is ready. """
46 """ Gets a message if there is one that is ready. """
62 if block:
47 if block:
63 if timeout is not None:
48 if timeout is not None:
64 timeout *= 1000 # seconds to ms
49 timeout *= 1000 # seconds to ms
65 ready = self.socket.poll(timeout)
50 ready = self.socket.poll(timeout)
66 else:
51 else:
67 ready = self.socket.poll(timeout=0)
52 ready = self.socket.poll(timeout=0)
68
53
69 if ready:
54 if ready:
70 return self._recv()
55 return self._recv()
71 else:
56 else:
72 raise Empty
57 raise Empty
73
58
74 def get_msgs(self):
59 def get_msgs(self):
75 """ Get all messages that are currently ready. """
60 """ Get all messages that are currently ready. """
76 msgs = []
61 msgs = []
77 while True:
62 while True:
78 try:
63 try:
79 msgs.append(self.get_msg(block=False))
64 msgs.append(self.get_msg(block=False))
80 except Empty:
65 except Empty:
81 break
66 break
82 return msgs
67 return msgs
83
68
84 def msg_ready(self):
69 def msg_ready(self):
85 """ Is there a message that has been received? """
70 """ Is there a message that has been received? """
86 return bool(self.socket.poll(timeout=0))
71 return bool(self.socket.poll(timeout=0))
87
72
88 def close(self):
73 def close(self):
89 if self.socket is not None:
74 if self.socket is not None:
90 try:
75 try:
91 self.socket.close(linger=0)
76 self.socket.close(linger=0)
92 except Exception:
77 except Exception:
93 pass
78 pass
94 self.socket = None
79 self.socket = None
95 stop = close
80 stop = close
96
81
97 def is_alive(self):
82 def is_alive(self):
98 return (self.socket is not None)
83 return (self.socket is not None)
99
84
100 def _queue_send(self, msg):
85 def _queue_send(self, msg):
101 """Pass a message to the ZMQ socket to send
86 """Pass a message to the ZMQ socket to send
102 """
87 """
103 self.session.send(self.socket, msg)
88 self.session.send(self.socket, msg)
104
89
105 def start(self):
90 def start(self):
106 pass
91 pass
107
92
@@ -1,245 +1,226
1 """Base classes to manage a Client's interaction with a running kernel"""
1 """Base classes to manage a Client's interaction with a running kernel"""
2
2
3 # Copyright (c) IPython Development Team.
3 # Copyright (c) IPython Development Team.
4 # Distributed under the terms of the Modified BSD License.
4 # Distributed under the terms of the Modified BSD License.
5
5
6 from __future__ import absolute_import
6 from __future__ import absolute_import
7
7
8 import atexit
8 import atexit
9 import errno
9 import errno
10 from threading import Thread
10 from threading import Thread
11 import time
11 import time
12
12
13 import zmq
13 import zmq
14 # import ZMQError in top-level namespace, to avoid ugly attribute-error messages
14 # import ZMQError in top-level namespace, to avoid ugly attribute-error messages
15 # during garbage collection of threads at exit:
15 # during garbage collection of threads at exit:
16 from zmq import ZMQError
16 from zmq import ZMQError
17 from zmq.eventloop import ioloop, zmqstream
18
17
19 from IPython.core.release import kernel_protocol_version_info
18 from IPython.core.release import kernel_protocol_version_info
20
19
21 from .channelsabc import HBChannelABC
20 from .channelsabc import HBChannelABC
22 from IPython.utils.py3compat import string_types, iteritems
23
21
24 #-----------------------------------------------------------------------------
22 #-----------------------------------------------------------------------------
25 # Constants and exceptions
23 # Constants and exceptions
26 #-----------------------------------------------------------------------------
24 #-----------------------------------------------------------------------------
27
25
28 major_protocol_version = kernel_protocol_version_info[0]
26 major_protocol_version = kernel_protocol_version_info[0]
29
27
30 class InvalidPortNumber(Exception):
28 class InvalidPortNumber(Exception):
31 pass
29 pass
32
30
33 #-----------------------------------------------------------------------------
34 # Utility functions
35 #-----------------------------------------------------------------------------
36
37 # some utilities to validate message structure, these might get moved elsewhere
38 # if they prove to have more generic utility
39
40 def validate_string_dict(dct):
41 """Validate that the input is a dict with string keys and values.
42
43 Raises ValueError if not."""
44 for k,v in iteritems(dct):
45 if not isinstance(k, string_types):
46 raise ValueError('key %r in dict must be a string' % k)
47 if not isinstance(v, string_types):
48 raise ValueError('value %r in dict must be a string' % v)
49
50
31
51 def make_shell_socket(context, identity, address):
32 def make_shell_socket(context, identity, address):
52 socket = context.socket(zmq.DEALER)
33 socket = context.socket(zmq.DEALER)
53 socket.linger = 1000
34 socket.linger = 1000
54 socket.setsockopt(zmq.IDENTITY, identity)
35 socket.setsockopt(zmq.IDENTITY, identity)
55 socket.connect(address)
36 socket.connect(address)
56 return socket
37 return socket
57
38
58 def make_iopub_socket(context, identity, address):
39 def make_iopub_socket(context, identity, address):
59 socket = context.socket(zmq.SUB)
40 socket = context.socket(zmq.SUB)
60 socket.linger = 1000
41 socket.linger = 1000
61 socket.setsockopt(zmq.SUBSCRIBE,b'')
42 socket.setsockopt(zmq.SUBSCRIBE,b'')
62 socket.setsockopt(zmq.IDENTITY, identity)
43 socket.setsockopt(zmq.IDENTITY, identity)
63 socket.connect(address)
44 socket.connect(address)
64 return socket
45 return socket
65
46
66 def make_stdin_socket(context, identity, address):
47 def make_stdin_socket(context, identity, address):
67 socket = context.socket(zmq.DEALER)
48 socket = context.socket(zmq.DEALER)
68 socket.linger = 1000
49 socket.linger = 1000
69 socket.setsockopt(zmq.IDENTITY, identity)
50 socket.setsockopt(zmq.IDENTITY, identity)
70 socket.connect(address)
51 socket.connect(address)
71 return socket
52 return socket
72
53
73 class HBChannel(Thread):
54 class HBChannel(Thread):
74 """The heartbeat channel which monitors the kernel heartbeat.
55 """The heartbeat channel which monitors the kernel heartbeat.
75
56
76 Note that the heartbeat channel is paused by default. As long as you start
57 Note that the heartbeat channel is paused by default. As long as you start
77 this channel, the kernel manager will ensure that it is paused and un-paused
58 this channel, the kernel manager will ensure that it is paused and un-paused
78 as appropriate.
59 as appropriate.
79 """
60 """
80 context = None
61 context = None
81 session = None
62 session = None
82 socket = None
63 socket = None
83 address = None
64 address = None
84 _exiting = False
65 _exiting = False
85
66
86 time_to_dead = 1.
67 time_to_dead = 1.
87 poller = None
68 poller = None
88 _running = None
69 _running = None
89 _pause = None
70 _pause = None
90 _beating = None
71 _beating = None
91
72
92 def __init__(self, context, session, address):
73 def __init__(self, context, session, address):
93 """Create the heartbeat monitor thread.
74 """Create the heartbeat monitor thread.
94
75
95 Parameters
76 Parameters
96 ----------
77 ----------
97 context : :class:`zmq.Context`
78 context : :class:`zmq.Context`
98 The ZMQ context to use.
79 The ZMQ context to use.
99 session : :class:`session.Session`
80 session : :class:`session.Session`
100 The session to use.
81 The session to use.
101 address : zmq url
82 address : zmq url
102 Standard (ip, port) tuple that the kernel is listening on.
83 Standard (ip, port) tuple that the kernel is listening on.
103 """
84 """
104 super(HBChannel, self).__init__()
85 super(HBChannel, self).__init__()
105 self.daemon = True
86 self.daemon = True
106
87
107 self.context = context
88 self.context = context
108 self.session = session
89 self.session = session
109 if isinstance(address, tuple):
90 if isinstance(address, tuple):
110 if address[1] == 0:
91 if address[1] == 0:
111 message = 'The port number for a channel cannot be 0.'
92 message = 'The port number for a channel cannot be 0.'
112 raise InvalidPortNumber(message)
93 raise InvalidPortNumber(message)
113 address = "tcp://%s:%i" % address
94 address = "tcp://%s:%i" % address
114 self.address = address
95 self.address = address
115 atexit.register(self._notice_exit)
96 atexit.register(self._notice_exit)
116
97
117 self._running = False
98 self._running = False
118 self._pause = True
99 self._pause = True
119 self.poller = zmq.Poller()
100 self.poller = zmq.Poller()
120
101
121 def _notice_exit(self):
102 def _notice_exit(self):
122 self._exiting = True
103 self._exiting = True
123
104
124 def _create_socket(self):
105 def _create_socket(self):
125 if self.socket is not None:
106 if self.socket is not None:
126 # close previous socket, before opening a new one
107 # close previous socket, before opening a new one
127 self.poller.unregister(self.socket)
108 self.poller.unregister(self.socket)
128 self.socket.close()
109 self.socket.close()
129 self.socket = self.context.socket(zmq.REQ)
110 self.socket = self.context.socket(zmq.REQ)
130 self.socket.linger = 1000
111 self.socket.linger = 1000
131 self.socket.connect(self.address)
112 self.socket.connect(self.address)
132
113
133 self.poller.register(self.socket, zmq.POLLIN)
114 self.poller.register(self.socket, zmq.POLLIN)
134
115
135 def _poll(self, start_time):
116 def _poll(self, start_time):
136 """poll for heartbeat replies until we reach self.time_to_dead.
117 """poll for heartbeat replies until we reach self.time_to_dead.
137
118
138 Ignores interrupts, and returns the result of poll(), which
119 Ignores interrupts, and returns the result of poll(), which
139 will be an empty list if no messages arrived before the timeout,
120 will be an empty list if no messages arrived before the timeout,
140 or the event tuple if there is a message to receive.
121 or the event tuple if there is a message to receive.
141 """
122 """
142
123
143 until_dead = self.time_to_dead - (time.time() - start_time)
124 until_dead = self.time_to_dead - (time.time() - start_time)
144 # ensure poll at least once
125 # ensure poll at least once
145 until_dead = max(until_dead, 1e-3)
126 until_dead = max(until_dead, 1e-3)
146 events = []
127 events = []
147 while True:
128 while True:
148 try:
129 try:
149 events = self.poller.poll(1000 * until_dead)
130 events = self.poller.poll(1000 * until_dead)
150 except ZMQError as e:
131 except ZMQError as e:
151 if e.errno == errno.EINTR:
132 if e.errno == errno.EINTR:
152 # ignore interrupts during heartbeat
133 # ignore interrupts during heartbeat
153 # this may never actually happen
134 # this may never actually happen
154 until_dead = self.time_to_dead - (time.time() - start_time)
135 until_dead = self.time_to_dead - (time.time() - start_time)
155 until_dead = max(until_dead, 1e-3)
136 until_dead = max(until_dead, 1e-3)
156 pass
137 pass
157 else:
138 else:
158 raise
139 raise
159 except Exception:
140 except Exception:
160 if self._exiting:
141 if self._exiting:
161 break
142 break
162 else:
143 else:
163 raise
144 raise
164 else:
145 else:
165 break
146 break
166 return events
147 return events
167
148
168 def run(self):
149 def run(self):
169 """The thread's main activity. Call start() instead."""
150 """The thread's main activity. Call start() instead."""
170 self._create_socket()
151 self._create_socket()
171 self._running = True
152 self._running = True
172 self._beating = True
153 self._beating = True
173
154
174 while self._running:
155 while self._running:
175 if self._pause:
156 if self._pause:
176 # just sleep, and skip the rest of the loop
157 # just sleep, and skip the rest of the loop
177 time.sleep(self.time_to_dead)
158 time.sleep(self.time_to_dead)
178 continue
159 continue
179
160
180 since_last_heartbeat = 0.0
161 since_last_heartbeat = 0.0
181 # io.rprint('Ping from HB channel') # dbg
162 # io.rprint('Ping from HB channel') # dbg
182 # no need to catch EFSM here, because the previous event was
163 # no need to catch EFSM here, because the previous event was
183 # either a recv or connect, which cannot be followed by EFSM
164 # either a recv or connect, which cannot be followed by EFSM
184 self.socket.send(b'ping')
165 self.socket.send(b'ping')
185 request_time = time.time()
166 request_time = time.time()
186 ready = self._poll(request_time)
167 ready = self._poll(request_time)
187 if ready:
168 if ready:
188 self._beating = True
169 self._beating = True
189 # the poll above guarantees we have something to recv
170 # the poll above guarantees we have something to recv
190 self.socket.recv()
171 self.socket.recv()
191 # sleep the remainder of the cycle
172 # sleep the remainder of the cycle
192 remainder = self.time_to_dead - (time.time() - request_time)
173 remainder = self.time_to_dead - (time.time() - request_time)
193 if remainder > 0:
174 if remainder > 0:
194 time.sleep(remainder)
175 time.sleep(remainder)
195 continue
176 continue
196 else:
177 else:
197 # nothing was received within the time limit, signal heart failure
178 # nothing was received within the time limit, signal heart failure
198 self._beating = False
179 self._beating = False
199 since_last_heartbeat = time.time() - request_time
180 since_last_heartbeat = time.time() - request_time
200 self.call_handlers(since_last_heartbeat)
181 self.call_handlers(since_last_heartbeat)
201 # and close/reopen the socket, because the REQ/REP cycle has been broken
182 # and close/reopen the socket, because the REQ/REP cycle has been broken
202 self._create_socket()
183 self._create_socket()
203 continue
184 continue
204
185
205 def pause(self):
186 def pause(self):
206 """Pause the heartbeat."""
187 """Pause the heartbeat."""
207 self._pause = True
188 self._pause = True
208
189
209 def unpause(self):
190 def unpause(self):
210 """Unpause the heartbeat."""
191 """Unpause the heartbeat."""
211 self._pause = False
192 self._pause = False
212
193
213 def is_beating(self):
194 def is_beating(self):
214 """Is the heartbeat running and responsive (and not paused)."""
195 """Is the heartbeat running and responsive (and not paused)."""
215 if self.is_alive() and not self._pause and self._beating:
196 if self.is_alive() and not self._pause and self._beating:
216 return True
197 return True
217 else:
198 else:
218 return False
199 return False
219
200
220 def stop(self):
201 def stop(self):
221 """Stop the channel's event loop and join its thread."""
202 """Stop the channel's event loop and join its thread."""
222 self._running = False
203 self._running = False
223 self.join()
204 self.join()
224 self.close()
205 self.close()
225
206
226 def close(self):
207 def close(self):
227 if self.socket is not None:
208 if self.socket is not None:
228 try:
209 try:
229 self.socket.close(linger=0)
210 self.socket.close(linger=0)
230 except Exception:
211 except Exception:
231 pass
212 pass
232 self.socket = None
213 self.socket = None
233
214
234 def call_handlers(self, since_last_heartbeat):
215 def call_handlers(self, since_last_heartbeat):
235 """This method is called in the ioloop thread when a message arrives.
216 """This method is called in the ioloop thread when a message arrives.
236
217
237 Subclasses should override this method to handle incoming messages.
218 Subclasses should override this method to handle incoming messages.
238 It is important to remember that this method is called in the thread
219 It is important to remember that this method is called in the thread
239 so that some logic must be done to ensure that the application level
220 so that some logic must be done to ensure that the application level
240 handlers are called in the application thread.
221 handlers are called in the application thread.
241 """
222 """
242 pass
223 pass
243
224
244
225
245 HBChannelABC.register(HBChannel)
226 HBChannelABC.register(HBChannel)
@@ -1,375 +1,389
1 """Base class to manage the interaction with a running kernel"""
1 """Base class to manage the interaction with a running kernel"""
2
2
3 # Copyright (c) IPython Development Team.
3 # Copyright (c) IPython Development Team.
4 # Distributed under the terms of the Modified BSD License.
4 # Distributed under the terms of the Modified BSD License.
5
5
6 from __future__ import absolute_import
6 from __future__ import absolute_import
7 from IPython.kernel.channels import validate_string_dict, major_protocol_version
7 from IPython.kernel.channels import major_protocol_version
8 from IPython.utils.py3compat import string_types
8 from IPython.utils.py3compat import string_types, iteritems
9
9
10 import zmq
10 import zmq
11
11
12 from IPython.utils.traitlets import (
12 from IPython.utils.traitlets import (
13 Any, Instance, Type,
13 Any, Instance, Type,
14 )
14 )
15
15
16 from .channelsabc import (ChannelABC, HBChannelABC)
16 from .channelsabc import (ChannelABC, HBChannelABC)
17 from .channels import (
17 from .channels import (
18 make_shell_socket, make_stdin_socket, make_iopub_socket
18 make_shell_socket, make_stdin_socket, make_iopub_socket
19 )
19 )
20 from .clientabc import KernelClientABC
20 from .clientabc import KernelClientABC
21 from .connect import ConnectionFileMixin
21 from .connect import ConnectionFileMixin
22
22
23
23
24 # some utilities to validate message structure, these might get moved elsewhere
25 # if they prove to have more generic utility
26
27 def validate_string_dict(dct):
28 """Validate that the input is a dict with string keys and values.
29
30 Raises ValueError if not."""
31 for k,v in iteritems(dct):
32 if not isinstance(k, string_types):
33 raise ValueError('key %r in dict must be a string' % k)
34 if not isinstance(v, string_types):
35 raise ValueError('value %r in dict must be a string' % v)
36
37
24 class KernelClient(ConnectionFileMixin):
38 class KernelClient(ConnectionFileMixin):
25 """Communicates with a single kernel on any host via zmq channels.
39 """Communicates with a single kernel on any host via zmq channels.
26
40
27 There are four channels associated with each kernel:
41 There are four channels associated with each kernel:
28
42
29 * shell: for request/reply calls to the kernel.
43 * shell: for request/reply calls to the kernel.
30 * iopub: for the kernel to publish results to frontends.
44 * iopub: for the kernel to publish results to frontends.
31 * hb: for monitoring the kernel's heartbeat.
45 * hb: for monitoring the kernel's heartbeat.
32 * stdin: for frontends to reply to raw_input calls in the kernel.
46 * stdin: for frontends to reply to raw_input calls in the kernel.
33
47
34 The methods of the channels are exposed as methods of the client itself
48 The methods of the channels are exposed as methods of the client itself
35 (KernelClient.execute, complete, history, etc.).
49 (KernelClient.execute, complete, history, etc.).
36 See the channels themselves for documentation of these methods.
50 See the channels themselves for documentation of these methods.
37
51
38 """
52 """
39
53
40 # The PyZMQ Context to use for communication with the kernel.
54 # The PyZMQ Context to use for communication with the kernel.
41 context = Instance(zmq.Context)
55 context = Instance(zmq.Context)
42 def _context_default(self):
56 def _context_default(self):
43 return zmq.Context.instance()
57 return zmq.Context.instance()
44
58
45 # The classes to use for the various channels
59 # The classes to use for the various channels
46 shell_channel_class = Type(ChannelABC)
60 shell_channel_class = Type(ChannelABC)
47 iopub_channel_class = Type(ChannelABC)
61 iopub_channel_class = Type(ChannelABC)
48 stdin_channel_class = Type(ChannelABC)
62 stdin_channel_class = Type(ChannelABC)
49 hb_channel_class = Type(HBChannelABC)
63 hb_channel_class = Type(HBChannelABC)
50
64
51 # Protected traits
65 # Protected traits
52 _shell_channel = Any
66 _shell_channel = Any
53 _iopub_channel = Any
67 _iopub_channel = Any
54 _stdin_channel = Any
68 _stdin_channel = Any
55 _hb_channel = Any
69 _hb_channel = Any
56
70
57 # flag for whether execute requests should be allowed to call raw_input:
71 # flag for whether execute requests should be allowed to call raw_input:
58 allow_stdin = True
72 allow_stdin = True
59
73
60 #--------------------------------------------------------------------------
74 #--------------------------------------------------------------------------
61 # Channel proxy methods
75 # Channel proxy methods
62 #--------------------------------------------------------------------------
76 #--------------------------------------------------------------------------
63
77
64 def _get_msg(channel, *args, **kwargs):
78 def _get_msg(channel, *args, **kwargs):
65 return channel.get_msg(*args, **kwargs)
79 return channel.get_msg(*args, **kwargs)
66
80
67 def get_shell_msg(self, *args, **kwargs):
81 def get_shell_msg(self, *args, **kwargs):
68 """Get a message from the shell channel"""
82 """Get a message from the shell channel"""
69 return self.shell_channel.get_msg(*args, **kwargs)
83 return self.shell_channel.get_msg(*args, **kwargs)
70
84
71 def get_iopub_msg(self, *args, **kwargs):
85 def get_iopub_msg(self, *args, **kwargs):
72 """Get a message from the iopub channel"""
86 """Get a message from the iopub channel"""
73 return self.iopub_channel.get_msg(*args, **kwargs)
87 return self.iopub_channel.get_msg(*args, **kwargs)
74
88
75 def get_stdin_msg(self, *args, **kwargs):
89 def get_stdin_msg(self, *args, **kwargs):
76 """Get a message from the stdin channel"""
90 """Get a message from the stdin channel"""
77 return self.stdin_channel.get_msg(*args, **kwargs)
91 return self.stdin_channel.get_msg(*args, **kwargs)
78
92
79 #--------------------------------------------------------------------------
93 #--------------------------------------------------------------------------
80 # Channel management methods
94 # Channel management methods
81 #--------------------------------------------------------------------------
95 #--------------------------------------------------------------------------
82
96
83 def start_channels(self, shell=True, iopub=True, stdin=True, hb=True):
97 def start_channels(self, shell=True, iopub=True, stdin=True, hb=True):
84 """Starts the channels for this kernel.
98 """Starts the channels for this kernel.
85
99
86 This will create the channels if they do not exist and then start
100 This will create the channels if they do not exist and then start
87 them (their activity runs in a thread). If port numbers of 0 are
101 them (their activity runs in a thread). If port numbers of 0 are
88 being used (random ports) then you must first call
102 being used (random ports) then you must first call
89 :meth:`start_kernel`. If the channels have been stopped and you
103 :meth:`start_kernel`. If the channels have been stopped and you
90 call this, :class:`RuntimeError` will be raised.
104 call this, :class:`RuntimeError` will be raised.
91 """
105 """
92 if shell:
106 if shell:
93 self.shell_channel.start()
107 self.shell_channel.start()
94 self.kernel_info()
108 self.kernel_info()
95 if iopub:
109 if iopub:
96 self.iopub_channel.start()
110 self.iopub_channel.start()
97 if stdin:
111 if stdin:
98 self.stdin_channel.start()
112 self.stdin_channel.start()
99 self.allow_stdin = True
113 self.allow_stdin = True
100 else:
114 else:
101 self.allow_stdin = False
115 self.allow_stdin = False
102 if hb:
116 if hb:
103 self.hb_channel.start()
117 self.hb_channel.start()
104
118
105 def stop_channels(self):
119 def stop_channels(self):
106 """Stops all the running channels for this kernel.
120 """Stops all the running channels for this kernel.
107
121
108 This stops their event loops and joins their threads.
122 This stops their event loops and joins their threads.
109 """
123 """
110 if self.shell_channel.is_alive():
124 if self.shell_channel.is_alive():
111 self.shell_channel.stop()
125 self.shell_channel.stop()
112 if self.iopub_channel.is_alive():
126 if self.iopub_channel.is_alive():
113 self.iopub_channel.stop()
127 self.iopub_channel.stop()
114 if self.stdin_channel.is_alive():
128 if self.stdin_channel.is_alive():
115 self.stdin_channel.stop()
129 self.stdin_channel.stop()
116 if self.hb_channel.is_alive():
130 if self.hb_channel.is_alive():
117 self.hb_channel.stop()
131 self.hb_channel.stop()
118
132
119 @property
133 @property
120 def channels_running(self):
134 def channels_running(self):
121 """Are any of the channels created and running?"""
135 """Are any of the channels created and running?"""
122 return (self.shell_channel.is_alive() or self.iopub_channel.is_alive() or
136 return (self.shell_channel.is_alive() or self.iopub_channel.is_alive() or
123 self.stdin_channel.is_alive() or self.hb_channel.is_alive())
137 self.stdin_channel.is_alive() or self.hb_channel.is_alive())
124
138
125 ioloop = None # Overridden in subclasses that use pyzmq event loop
139 ioloop = None # Overridden in subclasses that use pyzmq event loop
126
140
127 @property
141 @property
128 def shell_channel(self):
142 def shell_channel(self):
129 """Get the shell channel object for this kernel."""
143 """Get the shell channel object for this kernel."""
130 if self._shell_channel is None:
144 if self._shell_channel is None:
131 url = self._make_url('shell')
145 url = self._make_url('shell')
132 self.log.debug("connecting shell channel to %s", url)
146 self.log.debug("connecting shell channel to %s", url)
133 socket = make_shell_socket(self.context, self.session.bsession, url)
147 socket = make_shell_socket(self.context, self.session.bsession, url)
134 self._shell_channel = self.shell_channel_class(
148 self._shell_channel = self.shell_channel_class(
135 socket, self.session, self.ioloop
149 socket, self.session, self.ioloop
136 )
150 )
137 return self._shell_channel
151 return self._shell_channel
138
152
139 @property
153 @property
140 def iopub_channel(self):
154 def iopub_channel(self):
141 """Get the iopub channel object for this kernel."""
155 """Get the iopub channel object for this kernel."""
142 if self._iopub_channel is None:
156 if self._iopub_channel is None:
143 url = self._make_url('iopub')
157 url = self._make_url('iopub')
144 self.log.debug("connecting iopub channel to %s", url)
158 self.log.debug("connecting iopub channel to %s", url)
145 socket = make_iopub_socket(self.context, self.session.bsession, url)
159 socket = make_iopub_socket(self.context, self.session.bsession, url)
146 self._iopub_channel = self.iopub_channel_class(
160 self._iopub_channel = self.iopub_channel_class(
147 socket, self.session, self.ioloop
161 socket, self.session, self.ioloop
148 )
162 )
149 return self._iopub_channel
163 return self._iopub_channel
150
164
151 @property
165 @property
152 def stdin_channel(self):
166 def stdin_channel(self):
153 """Get the stdin channel object for this kernel."""
167 """Get the stdin channel object for this kernel."""
154 if self._stdin_channel is None:
168 if self._stdin_channel is None:
155 url = self._make_url('stdin')
169 url = self._make_url('stdin')
156 self.log.debug("connecting stdin channel to %s", url)
170 self.log.debug("connecting stdin channel to %s", url)
157 socket = make_stdin_socket(self.context, self.session.bsession, url)
171 socket = make_stdin_socket(self.context, self.session.bsession, url)
158 self._stdin_channel = self.stdin_channel_class(
172 self._stdin_channel = self.stdin_channel_class(
159 socket, self.session, self.ioloop
173 socket, self.session, self.ioloop
160 )
174 )
161 return self._stdin_channel
175 return self._stdin_channel
162
176
163 @property
177 @property
164 def hb_channel(self):
178 def hb_channel(self):
165 """Get the hb channel object for this kernel."""
179 """Get the hb channel object for this kernel."""
166 if self._hb_channel is None:
180 if self._hb_channel is None:
167 url = self._make_url('hb')
181 url = self._make_url('hb')
168 self.log.debug("connecting heartbeat channel to %s", url)
182 self.log.debug("connecting heartbeat channel to %s", url)
169 self._hb_channel = self.hb_channel_class(
183 self._hb_channel = self.hb_channel_class(
170 self.context, self.session, url
184 self.context, self.session, url
171 )
185 )
172 return self._hb_channel
186 return self._hb_channel
173
187
174 def is_alive(self):
188 def is_alive(self):
175 """Is the kernel process still running?"""
189 """Is the kernel process still running?"""
176 if self._hb_channel is not None:
190 if self._hb_channel is not None:
177 # We didn't start the kernel with this KernelManager so we
191 # We didn't start the kernel with this KernelManager so we
178 # use the heartbeat.
192 # use the heartbeat.
179 return self._hb_channel.is_beating()
193 return self._hb_channel.is_beating()
180 else:
194 else:
181 # no heartbeat and not local, we can't tell if it's running,
195 # no heartbeat and not local, we can't tell if it's running,
182 # so naively return True
196 # so naively return True
183 return True
197 return True
184
198
185
199
186 # Methods to send specific messages on channels
200 # Methods to send specific messages on channels
187 def execute(self, code, silent=False, store_history=True,
201 def execute(self, code, silent=False, store_history=True,
188 user_expressions=None, allow_stdin=None):
202 user_expressions=None, allow_stdin=None):
189 """Execute code in the kernel.
203 """Execute code in the kernel.
190
204
191 Parameters
205 Parameters
192 ----------
206 ----------
193 code : str
207 code : str
194 A string of Python code.
208 A string of Python code.
195
209
196 silent : bool, optional (default False)
210 silent : bool, optional (default False)
197 If set, the kernel will execute the code as quietly possible, and
211 If set, the kernel will execute the code as quietly possible, and
198 will force store_history to be False.
212 will force store_history to be False.
199
213
200 store_history : bool, optional (default True)
214 store_history : bool, optional (default True)
201 If set, the kernel will store command history. This is forced
215 If set, the kernel will store command history. This is forced
202 to be False if silent is True.
216 to be False if silent is True.
203
217
204 user_expressions : dict, optional
218 user_expressions : dict, optional
205 A dict mapping names to expressions to be evaluated in the user's
219 A dict mapping names to expressions to be evaluated in the user's
206 dict. The expression values are returned as strings formatted using
220 dict. The expression values are returned as strings formatted using
207 :func:`repr`.
221 :func:`repr`.
208
222
209 allow_stdin : bool, optional (default self.allow_stdin)
223 allow_stdin : bool, optional (default self.allow_stdin)
210 Flag for whether the kernel can send stdin requests to frontends.
224 Flag for whether the kernel can send stdin requests to frontends.
211
225
212 Some frontends (e.g. the Notebook) do not support stdin requests.
226 Some frontends (e.g. the Notebook) do not support stdin requests.
213 If raw_input is called from code executed from such a frontend, a
227 If raw_input is called from code executed from such a frontend, a
214 StdinNotImplementedError will be raised.
228 StdinNotImplementedError will be raised.
215
229
216 Returns
230 Returns
217 -------
231 -------
218 The msg_id of the message sent.
232 The msg_id of the message sent.
219 """
233 """
220 if user_expressions is None:
234 if user_expressions is None:
221 user_expressions = {}
235 user_expressions = {}
222 if allow_stdin is None:
236 if allow_stdin is None:
223 allow_stdin = self.allow_stdin
237 allow_stdin = self.allow_stdin
224
238
225
239
226 # Don't waste network traffic if inputs are invalid
240 # Don't waste network traffic if inputs are invalid
227 if not isinstance(code, string_types):
241 if not isinstance(code, string_types):
228 raise ValueError('code %r must be a string' % code)
242 raise ValueError('code %r must be a string' % code)
229 validate_string_dict(user_expressions)
243 validate_string_dict(user_expressions)
230
244
231 # Create class for content/msg creation. Related to, but possibly
245 # Create class for content/msg creation. Related to, but possibly
232 # not in Session.
246 # not in Session.
233 content = dict(code=code, silent=silent, store_history=store_history,
247 content = dict(code=code, silent=silent, store_history=store_history,
234 user_expressions=user_expressions,
248 user_expressions=user_expressions,
235 allow_stdin=allow_stdin,
249 allow_stdin=allow_stdin,
236 )
250 )
237 msg = self.session.msg('execute_request', content)
251 msg = self.session.msg('execute_request', content)
238 self.shell_channel._queue_send(msg)
252 self.shell_channel._queue_send(msg)
239 return msg['header']['msg_id']
253 return msg['header']['msg_id']
240
254
241 def complete(self, code, cursor_pos=None):
255 def complete(self, code, cursor_pos=None):
242 """Tab complete text in the kernel's namespace.
256 """Tab complete text in the kernel's namespace.
243
257
244 Parameters
258 Parameters
245 ----------
259 ----------
246 code : str
260 code : str
247 The context in which completion is requested.
261 The context in which completion is requested.
248 Can be anything between a variable name and an entire cell.
262 Can be anything between a variable name and an entire cell.
249 cursor_pos : int, optional
263 cursor_pos : int, optional
250 The position of the cursor in the block of code where the completion was requested.
264 The position of the cursor in the block of code where the completion was requested.
251 Default: ``len(code)``
265 Default: ``len(code)``
252
266
253 Returns
267 Returns
254 -------
268 -------
255 The msg_id of the message sent.
269 The msg_id of the message sent.
256 """
270 """
257 if cursor_pos is None:
271 if cursor_pos is None:
258 cursor_pos = len(code)
272 cursor_pos = len(code)
259 content = dict(code=code, cursor_pos=cursor_pos)
273 content = dict(code=code, cursor_pos=cursor_pos)
260 msg = self.session.msg('complete_request', content)
274 msg = self.session.msg('complete_request', content)
261 self.shell_channel._queue_send(msg)
275 self.shell_channel._queue_send(msg)
262 return msg['header']['msg_id']
276 return msg['header']['msg_id']
263
277
264 def inspect(self, code, cursor_pos=None, detail_level=0):
278 def inspect(self, code, cursor_pos=None, detail_level=0):
265 """Get metadata information about an object in the kernel's namespace.
279 """Get metadata information about an object in the kernel's namespace.
266
280
267 It is up to the kernel to determine the appropriate object to inspect.
281 It is up to the kernel to determine the appropriate object to inspect.
268
282
269 Parameters
283 Parameters
270 ----------
284 ----------
271 code : str
285 code : str
272 The context in which info is requested.
286 The context in which info is requested.
273 Can be anything between a variable name and an entire cell.
287 Can be anything between a variable name and an entire cell.
274 cursor_pos : int, optional
288 cursor_pos : int, optional
275 The position of the cursor in the block of code where the info was requested.
289 The position of the cursor in the block of code where the info was requested.
276 Default: ``len(code)``
290 Default: ``len(code)``
277 detail_level : int, optional
291 detail_level : int, optional
278 The level of detail for the introspection (0-2)
292 The level of detail for the introspection (0-2)
279
293
280 Returns
294 Returns
281 -------
295 -------
282 The msg_id of the message sent.
296 The msg_id of the message sent.
283 """
297 """
284 if cursor_pos is None:
298 if cursor_pos is None:
285 cursor_pos = len(code)
299 cursor_pos = len(code)
286 content = dict(code=code, cursor_pos=cursor_pos,
300 content = dict(code=code, cursor_pos=cursor_pos,
287 detail_level=detail_level,
301 detail_level=detail_level,
288 )
302 )
289 msg = self.session.msg('inspect_request', content)
303 msg = self.session.msg('inspect_request', content)
290 self.shell_channel._queue_send(msg)
304 self.shell_channel._queue_send(msg)
291 return msg['header']['msg_id']
305 return msg['header']['msg_id']
292
306
293 def history(self, raw=True, output=False, hist_access_type='range', **kwargs):
307 def history(self, raw=True, output=False, hist_access_type='range', **kwargs):
294 """Get entries from the kernel's history list.
308 """Get entries from the kernel's history list.
295
309
296 Parameters
310 Parameters
297 ----------
311 ----------
298 raw : bool
312 raw : bool
299 If True, return the raw input.
313 If True, return the raw input.
300 output : bool
314 output : bool
301 If True, then return the output as well.
315 If True, then return the output as well.
302 hist_access_type : str
316 hist_access_type : str
303 'range' (fill in session, start and stop params), 'tail' (fill in n)
317 'range' (fill in session, start and stop params), 'tail' (fill in n)
304 or 'search' (fill in pattern param).
318 or 'search' (fill in pattern param).
305
319
306 session : int
320 session : int
307 For a range request, the session from which to get lines. Session
321 For a range request, the session from which to get lines. Session
308 numbers are positive integers; negative ones count back from the
322 numbers are positive integers; negative ones count back from the
309 current session.
323 current session.
310 start : int
324 start : int
311 The first line number of a history range.
325 The first line number of a history range.
312 stop : int
326 stop : int
313 The final (excluded) line number of a history range.
327 The final (excluded) line number of a history range.
314
328
315 n : int
329 n : int
316 The number of lines of history to get for a tail request.
330 The number of lines of history to get for a tail request.
317
331
318 pattern : str
332 pattern : str
319 The glob-syntax pattern for a search request.
333 The glob-syntax pattern for a search request.
320
334
321 Returns
335 Returns
322 -------
336 -------
323 The msg_id of the message sent.
337 The msg_id of the message sent.
324 """
338 """
325 content = dict(raw=raw, output=output, hist_access_type=hist_access_type,
339 content = dict(raw=raw, output=output, hist_access_type=hist_access_type,
326 **kwargs)
340 **kwargs)
327 msg = self.session.msg('history_request', content)
341 msg = self.session.msg('history_request', content)
328 self.shell_channel._queue_send(msg)
342 self.shell_channel._queue_send(msg)
329 return msg['header']['msg_id']
343 return msg['header']['msg_id']
330
344
331 def kernel_info(self):
345 def kernel_info(self):
332 """Request kernel info."""
346 """Request kernel info."""
333 msg = self.session.msg('kernel_info_request')
347 msg = self.session.msg('kernel_info_request')
334 self.shell_channel._queue_send(msg)
348 self.shell_channel._queue_send(msg)
335 return msg['header']['msg_id']
349 return msg['header']['msg_id']
336
350
337 def _handle_kernel_info_reply(self, msg):
351 def _handle_kernel_info_reply(self, msg):
338 """handle kernel info reply
352 """handle kernel info reply
339
353
340 sets protocol adaptation version
354 sets protocol adaptation version
341 """
355 """
342 adapt_version = int(msg['content']['protocol_version'].split('.')[0])
356 adapt_version = int(msg['content']['protocol_version'].split('.')[0])
343 if adapt_version != major_protocol_version:
357 if adapt_version != major_protocol_version:
344 self.session.adapt_version = adapt_version
358 self.session.adapt_version = adapt_version
345
359
346 def shutdown(self, restart=False):
360 def shutdown(self, restart=False):
347 """Request an immediate kernel shutdown.
361 """Request an immediate kernel shutdown.
348
362
349 Upon receipt of the (empty) reply, client code can safely assume that
363 Upon receipt of the (empty) reply, client code can safely assume that
350 the kernel has shut down and it's safe to forcefully terminate it if
364 the kernel has shut down and it's safe to forcefully terminate it if
351 it's still alive.
365 it's still alive.
352
366
353 The kernel will send the reply via a function registered with Python's
367 The kernel will send the reply via a function registered with Python's
354 atexit module, ensuring it's truly done as the kernel is done with all
368 atexit module, ensuring it's truly done as the kernel is done with all
355 normal operation.
369 normal operation.
356 """
370 """
357 # Send quit message to kernel. Once we implement kernel-side setattr,
371 # Send quit message to kernel. Once we implement kernel-side setattr,
358 # this should probably be done that way, but for now this will do.
372 # this should probably be done that way, but for now this will do.
359 msg = self.session.msg('shutdown_request', {'restart':restart})
373 msg = self.session.msg('shutdown_request', {'restart':restart})
360 self.shell_channel._queue_send(msg)
374 self.shell_channel._queue_send(msg)
361 return msg['header']['msg_id']
375 return msg['header']['msg_id']
362
376
363 def is_complete(self, code):
377 def is_complete(self, code):
364 msg = self.session.msg('is_complete_request', {'code': code})
378 msg = self.session.msg('is_complete_request', {'code': code})
365 self.shell_channel._queue_send(msg)
379 self.shell_channel._queue_send(msg)
366 return msg['header']['msg_id']
380 return msg['header']['msg_id']
367
381
368 def input(self, string):
382 def input(self, string):
369 """Send a string of raw input to the kernel."""
383 """Send a string of raw input to the kernel."""
370 content = dict(value=string)
384 content = dict(value=string)
371 msg = self.session.msg('input_reply', content)
385 msg = self.session.msg('input_reply', content)
372 self.stdin_channel._queue_send(msg)
386 self.stdin_channel._queue_send(msg)
373
387
374
388
375 KernelClientABC.register(KernelClient)
389 KernelClientABC.register(KernelClient)
@@ -1,267 +1,250
1 """ Defines a KernelClient that provides signals and slots.
1 """ Defines a KernelClient that provides signals and slots.
2 """
2 """
3 import atexit
3 import atexit
4 import errno
4 import errno
5 from threading import Thread
5 from threading import Thread
6 import time
6 import time
7
7
8 import zmq
8 import zmq
9 # import ZMQError in top-level namespace, to avoid ugly attribute-error messages
9 # import ZMQError in top-level namespace, to avoid ugly attribute-error messages
10 # during garbage collection of threads at exit:
10 # during garbage collection of threads at exit:
11 from zmq import ZMQError
11 from zmq import ZMQError
12 from zmq.eventloop import ioloop, zmqstream
12 from zmq.eventloop import ioloop, zmqstream
13
13
14 from IPython.external.qt import QtCore
14 from IPython.external.qt import QtCore
15
15
16 # Local imports
16 # Local imports
17 from IPython.utils.traitlets import Type, Instance
17 from IPython.utils.traitlets import Type, Instance
18 from IPython.kernel.channels import HBChannel,\
18 from IPython.kernel.channels import HBChannel,\
19 make_shell_socket, make_iopub_socket, make_stdin_socket
19 make_shell_socket, make_iopub_socket, make_stdin_socket
20 from IPython.kernel import KernelClient
20 from IPython.kernel import KernelClient
21
21
22 from .kernel_mixins import QtKernelClientMixin
22 from .kernel_mixins import QtKernelClientMixin
23 from .util import SuperQObject
23 from .util import SuperQObject
24
24
25 class QtHBChannel(SuperQObject, HBChannel):
25 class QtHBChannel(SuperQObject, HBChannel):
26 # A longer timeout than the base class
26 # A longer timeout than the base class
27 time_to_dead = 3.0
27 time_to_dead = 3.0
28
28
29 # Emitted when the kernel has died.
29 # Emitted when the kernel has died.
30 kernel_died = QtCore.Signal(object)
30 kernel_died = QtCore.Signal(object)
31
31
32 def call_handlers(self, since_last_heartbeat):
32 def call_handlers(self, since_last_heartbeat):
33 """ Reimplemented to emit signals instead of making callbacks.
33 """ Reimplemented to emit signals instead of making callbacks.
34 """
34 """
35 # Emit the generic signal.
35 # Emit the generic signal.
36 self.kernel_died.emit(since_last_heartbeat)
36 self.kernel_died.emit(since_last_heartbeat)
37
37
38 from IPython.core.release import kernel_protocol_version_info
38 from IPython.core.release import kernel_protocol_version_info
39
39
40 from IPython.utils.py3compat import string_types, iteritems
41
42 major_protocol_version = kernel_protocol_version_info[0]
40 major_protocol_version = kernel_protocol_version_info[0]
43
41
44 class InvalidPortNumber(Exception):
42 class InvalidPortNumber(Exception):
45 pass
43 pass
46
44
47 # some utilities to validate message structure, these might get moved elsewhere
48 # if they prove to have more generic utility
49
50
51 def validate_string_dict(dct):
52 """Validate that the input is a dict with string keys and values.
53
54 Raises ValueError if not."""
55 for k,v in iteritems(dct):
56 if not isinstance(k, string_types):
57 raise ValueError('key %r in dict must be a string' % k)
58 if not isinstance(v, string_types):
59 raise ValueError('value %r in dict must be a string' % v)
60
61
62
45
63 class QtZMQSocketChannel(SuperQObject):
46 class QtZMQSocketChannel(SuperQObject):
64 """A ZMQ socket emitting a Qt signal when a message is received."""
47 """A ZMQ socket emitting a Qt signal when a message is received."""
65 session = None
48 session = None
66 socket = None
49 socket = None
67 ioloop = None
50 ioloop = None
68 stream = None
51 stream = None
69
52
70 message_received = QtCore.Signal(object)
53 message_received = QtCore.Signal(object)
71
54
72 def process_events(self):
55 def process_events(self):
73 """ Process any pending GUI events.
56 """ Process any pending GUI events.
74 """
57 """
75 QtCore.QCoreApplication.instance().processEvents()
58 QtCore.QCoreApplication.instance().processEvents()
76
59
77 def __init__(self, socket, session, loop):
60 def __init__(self, socket, session, loop):
78 """Create a channel.
61 """Create a channel.
79
62
80 Parameters
63 Parameters
81 ----------
64 ----------
82 socket : :class:`zmq.Socket`
65 socket : :class:`zmq.Socket`
83 The ZMQ socket to use.
66 The ZMQ socket to use.
84 session : :class:`session.Session`
67 session : :class:`session.Session`
85 The session to use.
68 The session to use.
86 loop
69 loop
87 A pyzmq ioloop to connect the socket to using a ZMQStream
70 A pyzmq ioloop to connect the socket to using a ZMQStream
88 """
71 """
89 super(QtZMQSocketChannel, self).__init__()
72 super(QtZMQSocketChannel, self).__init__()
90
73
91 self.socket = socket
74 self.socket = socket
92 self.session = session
75 self.session = session
93 self.ioloop = loop
76 self.ioloop = loop
94
77
95 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
78 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
96 self.stream.on_recv(self._handle_recv)
79 self.stream.on_recv(self._handle_recv)
97
80
98 _is_alive = False
81 _is_alive = False
99 def is_alive(self):
82 def is_alive(self):
100 return self._is_alive
83 return self._is_alive
101
84
102 def start(self):
85 def start(self):
103 self._is_alive = True
86 self._is_alive = True
104
87
105 def stop(self):
88 def stop(self):
106 self._is_alive = False
89 self._is_alive = False
107
90
108 def close(self):
91 def close(self):
109 if self.socket is not None:
92 if self.socket is not None:
110 try:
93 try:
111 self.socket.close(linger=0)
94 self.socket.close(linger=0)
112 except Exception:
95 except Exception:
113 pass
96 pass
114 self.socket = None
97 self.socket = None
115
98
116 def _queue_send(self, msg):
99 def _queue_send(self, msg):
117 """Queue a message to be sent from the IOLoop's thread.
100 """Queue a message to be sent from the IOLoop's thread.
118
101
119 Parameters
102 Parameters
120 ----------
103 ----------
121 msg : message to send
104 msg : message to send
122
105
123 This is threadsafe, as it uses IOLoop.add_callback to give the loop's
106 This is threadsafe, as it uses IOLoop.add_callback to give the loop's
124 thread control of the action.
107 thread control of the action.
125 """
108 """
126 def thread_send():
109 def thread_send():
127 self.session.send(self.stream, msg)
110 self.session.send(self.stream, msg)
128 self.ioloop.add_callback(thread_send)
111 self.ioloop.add_callback(thread_send)
129
112
130 def _handle_recv(self, msg):
113 def _handle_recv(self, msg):
131 """Callback for stream.on_recv.
114 """Callback for stream.on_recv.
132
115
133 Unpacks message, and calls handlers with it.
116 Unpacks message, and calls handlers with it.
134 """
117 """
135 ident,smsg = self.session.feed_identities(msg)
118 ident,smsg = self.session.feed_identities(msg)
136 msg = self.session.deserialize(smsg)
119 msg = self.session.deserialize(smsg)
137 self.call_handlers(msg)
120 self.call_handlers(msg)
138
121
139 def call_handlers(self, msg):
122 def call_handlers(self, msg):
140 """This method is called in the ioloop thread when a message arrives.
123 """This method is called in the ioloop thread when a message arrives.
141
124
142 Subclasses should override this method to handle incoming messages.
125 Subclasses should override this method to handle incoming messages.
143 It is important to remember that this method is called in the thread
126 It is important to remember that this method is called in the thread
144 so that some logic must be done to ensure that the application level
127 so that some logic must be done to ensure that the application level
145 handlers are called in the application thread.
128 handlers are called in the application thread.
146 """
129 """
147 # Emit the generic signal.
130 # Emit the generic signal.
148 self.message_received.emit(msg)
131 self.message_received.emit(msg)
149
132
150 def flush(self, timeout=1.0):
133 def flush(self, timeout=1.0):
151 """Immediately processes all pending messages on this channel.
134 """Immediately processes all pending messages on this channel.
152
135
153 This is only used for the IOPub channel.
136 This is only used for the IOPub channel.
154
137
155 Callers should use this method to ensure that :meth:`call_handlers`
138 Callers should use this method to ensure that :meth:`call_handlers`
156 has been called for all messages that have been received on the
139 has been called for all messages that have been received on the
157 0MQ SUB socket of this channel.
140 0MQ SUB socket of this channel.
158
141
159 This method is thread safe.
142 This method is thread safe.
160
143
161 Parameters
144 Parameters
162 ----------
145 ----------
163 timeout : float, optional
146 timeout : float, optional
164 The maximum amount of time to spend flushing, in seconds. The
147 The maximum amount of time to spend flushing, in seconds. The
165 default is one second.
148 default is one second.
166 """
149 """
167 # We do the IOLoop callback process twice to ensure that the IOLoop
150 # We do the IOLoop callback process twice to ensure that the IOLoop
168 # gets to perform at least one full poll.
151 # gets to perform at least one full poll.
169 stop_time = time.time() + timeout
152 stop_time = time.time() + timeout
170 for i in range(2):
153 for i in range(2):
171 self._flushed = False
154 self._flushed = False
172 self.ioloop.add_callback(self._flush)
155 self.ioloop.add_callback(self._flush)
173 while not self._flushed and time.time() < stop_time:
156 while not self._flushed and time.time() < stop_time:
174 time.sleep(0.01)
157 time.sleep(0.01)
175
158
176 def _flush(self):
159 def _flush(self):
177 """Callback for :method:`self.flush`."""
160 """Callback for :method:`self.flush`."""
178 self.stream.flush()
161 self.stream.flush()
179 self._flushed = True
162 self._flushed = True
180
163
181
164
182 class IOLoopThread(Thread):
165 class IOLoopThread(Thread):
183 """Run a pyzmq ioloop in a thread to send and receive messages
166 """Run a pyzmq ioloop in a thread to send and receive messages
184 """
167 """
185 def __init__(self, loop):
168 def __init__(self, loop):
186 super(IOLoopThread, self).__init__()
169 super(IOLoopThread, self).__init__()
187 self.daemon = True
170 self.daemon = True
188 atexit.register(self._notice_exit)
171 atexit.register(self._notice_exit)
189 self.ioloop = loop or ioloop.IOLoop()
172 self.ioloop = loop or ioloop.IOLoop()
190
173
191 def _notice_exit(self):
174 def _notice_exit(self):
192 self._exiting = True
175 self._exiting = True
193
176
194 def run(self):
177 def run(self):
195 """Run my loop, ignoring EINTR events in the poller"""
178 """Run my loop, ignoring EINTR events in the poller"""
196 while True:
179 while True:
197 try:
180 try:
198 self.ioloop.start()
181 self.ioloop.start()
199 except ZMQError as e:
182 except ZMQError as e:
200 if e.errno == errno.EINTR:
183 if e.errno == errno.EINTR:
201 continue
184 continue
202 else:
185 else:
203 raise
186 raise
204 except Exception:
187 except Exception:
205 if self._exiting:
188 if self._exiting:
206 break
189 break
207 else:
190 else:
208 raise
191 raise
209 else:
192 else:
210 break
193 break
211
194
212 def stop(self):
195 def stop(self):
213 """Stop the channel's event loop and join its thread.
196 """Stop the channel's event loop and join its thread.
214
197
215 This calls :meth:`~threading.Thread.join` and returns when the thread
198 This calls :meth:`~threading.Thread.join` and returns when the thread
216 terminates. :class:`RuntimeError` will be raised if
199 terminates. :class:`RuntimeError` will be raised if
217 :meth:`~threading.Thread.start` is called again.
200 :meth:`~threading.Thread.start` is called again.
218 """
201 """
219 if self.ioloop is not None:
202 if self.ioloop is not None:
220 self.ioloop.stop()
203 self.ioloop.stop()
221 self.join()
204 self.join()
222 self.close()
205 self.close()
223
206
224 def close(self):
207 def close(self):
225 if self.ioloop is not None:
208 if self.ioloop is not None:
226 try:
209 try:
227 self.ioloop.close(all_fds=True)
210 self.ioloop.close(all_fds=True)
228 except Exception:
211 except Exception:
229 pass
212 pass
230
213
231
214
232 class QtKernelClient(QtKernelClientMixin, KernelClient):
215 class QtKernelClient(QtKernelClientMixin, KernelClient):
233 """ A KernelClient that provides signals and slots.
216 """ A KernelClient that provides signals and slots.
234 """
217 """
235
218
236 _ioloop = None
219 _ioloop = None
237 @property
220 @property
238 def ioloop(self):
221 def ioloop(self):
239 if self._ioloop is None:
222 if self._ioloop is None:
240 self._ioloop = ioloop.IOLoop()
223 self._ioloop = ioloop.IOLoop()
241 return self._ioloop
224 return self._ioloop
242
225
243 ioloop_thread = Instance(IOLoopThread)
226 ioloop_thread = Instance(IOLoopThread)
244
227
245 def start_channels(self, shell=True, iopub=True, stdin=True, hb=True):
228 def start_channels(self, shell=True, iopub=True, stdin=True, hb=True):
246 if shell:
229 if shell:
247 self.shell_channel.message_received.connect(self._check_kernel_info_reply)
230 self.shell_channel.message_received.connect(self._check_kernel_info_reply)
248
231
249 self.ioloop_thread = IOLoopThread(self.ioloop)
232 self.ioloop_thread = IOLoopThread(self.ioloop)
250 self.ioloop_thread.start()
233 self.ioloop_thread.start()
251
234
252 super(QtKernelClient, self).start_channels(shell, iopub, stdin, hb)
235 super(QtKernelClient, self).start_channels(shell, iopub, stdin, hb)
253
236
254 def _check_kernel_info_reply(self, msg):
237 def _check_kernel_info_reply(self, msg):
255 if msg['msg_type'] == 'kernel_info_reply':
238 if msg['msg_type'] == 'kernel_info_reply':
256 self._handle_kernel_info_reply(msg)
239 self._handle_kernel_info_reply(msg)
257 self.shell_channel.message_received.disconnect(self._check_kernel_info_reply)
240 self.shell_channel.message_received.disconnect(self._check_kernel_info_reply)
258
241
259 def stop_channels(self):
242 def stop_channels(self):
260 super(QtKernelClient, self).stop_channels()
243 super(QtKernelClient, self).stop_channels()
261 if self.ioloop_thread.is_alive():
244 if self.ioloop_thread.is_alive():
262 self.ioloop_thread.stop()
245 self.ioloop_thread.stop()
263
246
264 iopub_channel_class = Type(QtZMQSocketChannel)
247 iopub_channel_class = Type(QtZMQSocketChannel)
265 shell_channel_class = Type(QtZMQSocketChannel)
248 shell_channel_class = Type(QtZMQSocketChannel)
266 stdin_channel_class = Type(QtZMQSocketChannel)
249 stdin_channel_class = Type(QtZMQSocketChannel)
267 hb_channel_class = Type(QtHBChannel)
250 hb_channel_class = Type(QtHBChannel)
General Comments 0
You need to be logged in to leave comments. Login now