##// END OF EJS Templates
Move ZMQ socket creation out of channels
Thomas Kluyver -
Show More
@@ -54,7 +54,7 b' class ZMQSocketChannel(object):'
54 _exiting = False
54 _exiting = False
55 proxy_methods = []
55 proxy_methods = []
56
56
57 def __init__(self, context, session, address):
57 def __init__(self, socket, session):
58 """Create a channel.
58 """Create a channel.
59
59
60 Parameters
60 Parameters
@@ -69,14 +69,8 b' class ZMQSocketChannel(object):'
69 super(ZMQSocketChannel, self).__init__()
69 super(ZMQSocketChannel, self).__init__()
70 self.daemon = True
70 self.daemon = True
71
71
72 self.context = context
72 self.socket = socket
73 self.session = session
73 self.session = session
74 if isinstance(address, tuple):
75 if address[1] == 0:
76 message = 'The port number for a channel cannot be 0.'
77 raise InvalidPortNumber(message)
78 address = "tcp://%s:%i" % address
79 self._address = address
80
74
81 def _recv(self, **kwargs):
75 def _recv(self, **kwargs):
82 msg = self.socket.recv_multipart(**kwargs)
76 msg = self.socket.recv_multipart(**kwargs)
@@ -136,6 +130,9 b' class ZMQSocketChannel(object):'
136 """
130 """
137 self.session.send(self.socket, msg)
131 self.session.send(self.socket, msg)
138
132
133 def start(self):
134 pass
135
139
136
140 class BlockingShellChannel(ZMQSocketChannel):
137 class BlockingShellChannel(ZMQSocketChannel):
141 """The shell channel for issuing request/replies to the kernel."""
138 """The shell channel for issuing request/replies to the kernel."""
@@ -12,10 +12,7 b' except ImportError:'
12
12
13 from IPython.utils.traitlets import Type
13 from IPython.utils.traitlets import Type
14 from IPython.kernel.client import KernelClient
14 from IPython.kernel.client import KernelClient
15 from .channels import (
15 from .channels import ZMQSocketChannel, BlockingHBChannel
16 BlockingIOPubChannel, BlockingHBChannel,
17 BlockingShellChannel, BlockingStdInChannel
18 )
19
16
20 class BlockingKernelClient(KernelClient):
17 class BlockingKernelClient(KernelClient):
21 def wait_for_ready(self):
18 def wait_for_ready(self):
@@ -35,7 +32,7 b' class BlockingKernelClient(KernelClient):'
35 break
32 break
36
33
37 # The classes to use for the various channels
34 # The classes to use for the various channels
38 shell_channel_class = Type(BlockingShellChannel)
35 shell_channel_class = Type(ZMQSocketChannel)
39 iopub_channel_class = Type(BlockingIOPubChannel)
36 iopub_channel_class = Type(ZMQSocketChannel)
40 stdin_channel_class = Type(BlockingStdInChannel)
37 stdin_channel_class = Type(ZMQSocketChannel)
41 hb_channel_class = Type(BlockingHBChannel)
38 hb_channel_class = Type(BlockingHBChannel)
@@ -16,6 +16,9 b' from IPython.utils.traitlets import ('
16 from .channelsabc import (
16 from .channelsabc import (
17 ShellChannelABC, IOPubChannelABC, HBChannelABC, StdInChannelABC
17 ShellChannelABC, IOPubChannelABC, HBChannelABC, StdInChannelABC
18 )
18 )
19 from .channels import (
20 make_shell_socket, make_stdin_socket, make_iopub_socket
21 )
19 from .clientabc import KernelClientABC
22 from .clientabc import KernelClientABC
20 from .connect import ConnectionFileMixin
23 from .connect import ConnectionFileMixin
21
24
@@ -127,8 +130,9 b' class KernelClient(ConnectionFileMixin):'
127 if self._shell_channel is None:
130 if self._shell_channel is None:
128 url = self._make_url('shell')
131 url = self._make_url('shell')
129 self.log.debug("connecting shell channel to %s", url)
132 self.log.debug("connecting shell channel to %s", url)
133 socket = make_shell_socket(self.context, self.session.bsession, url)
130 self._shell_channel = self.shell_channel_class(
134 self._shell_channel = self.shell_channel_class(
131 self.context, self.session, url
135 socket, self.session
132 )
136 )
133 return self._shell_channel
137 return self._shell_channel
134
138
@@ -138,8 +142,9 b' class KernelClient(ConnectionFileMixin):'
138 if self._iopub_channel is None:
142 if self._iopub_channel is None:
139 url = self._make_url('iopub')
143 url = self._make_url('iopub')
140 self.log.debug("connecting iopub channel to %s", url)
144 self.log.debug("connecting iopub channel to %s", url)
145 socket = make_iopub_socket(self.context, self.session.bsession, url)
141 self._iopub_channel = self.iopub_channel_class(
146 self._iopub_channel = self.iopub_channel_class(
142 self.context, self.session, url
147 socket, self.session
143 )
148 )
144 return self._iopub_channel
149 return self._iopub_channel
145
150
@@ -149,8 +154,9 b' class KernelClient(ConnectionFileMixin):'
149 if self._stdin_channel is None:
154 if self._stdin_channel is None:
150 url = self._make_url('stdin')
155 url = self._make_url('stdin')
151 self.log.debug("connecting stdin channel to %s", url)
156 self.log.debug("connecting stdin channel to %s", url)
157 socket = make_stdin_socket(self.context, self.session.bsession, url)
152 self._stdin_channel = self.stdin_channel_class(
158 self._stdin_channel = self.stdin_channel_class(
153 self.context, self.session, url
159 socket, self.session
154 )
160 )
155 return self._stdin_channel
161 return self._stdin_channel
156
162
@@ -55,12 +55,10 b' def validate_string_dict(dct):'
55
55
56 class QtZMQSocketChannel(SuperQObject, Thread):
56 class QtZMQSocketChannel(SuperQObject, Thread):
57 """The base class for the channels that use ZMQ sockets."""
57 """The base class for the channels that use ZMQ sockets."""
58 context = None
59 session = None
58 session = None
60 socket = None
59 socket = None
61 ioloop = None
60 ioloop = None
62 stream = None
61 stream = None
63 _address = None
64 _exiting = False
62 _exiting = False
65 proxy_methods = []
63 proxy_methods = []
66
64
@@ -87,7 +85,7 b' class QtZMQSocketChannel(SuperQObject, Thread):'
87 """
85 """
88 QtCore.QCoreApplication.instance().processEvents()
86 QtCore.QCoreApplication.instance().processEvents()
89
87
90 def __init__(self, context, session, address):
88 def __init__(self, socket, session):
91 """Create a channel.
89 """Create a channel.
92
90
93 Parameters
91 Parameters
@@ -102,14 +100,8 b' class QtZMQSocketChannel(SuperQObject, Thread):'
102 super(QtZMQSocketChannel, self).__init__()
100 super(QtZMQSocketChannel, self).__init__()
103 self.daemon = True
101 self.daemon = True
104
102
105 self.context = context
103 self.socket = socket
106 self.session = session
104 self.session = session
107 if isinstance(address, tuple):
108 if address[1] == 0:
109 message = 'The port number for a channel cannot be 0.'
110 raise InvalidPortNumber(message)
111 address = "tcp://%s:%i" % address
112 self._address = address
113 atexit.register(self._notice_exit)
105 atexit.register(self._notice_exit)
114
106
115 def _notice_exit(self):
107 def _notice_exit(self):
@@ -218,13 +210,12 b' class QtShellChannel(QtZMQSocketChannel):'
218 history_reply = QtCore.Signal(object)
210 history_reply = QtCore.Signal(object)
219 kernel_info_reply = QtCore.Signal(object)
211 kernel_info_reply = QtCore.Signal(object)
220
212
221 def __init__(self, context, session, address):
213 def __init__(self, socket, session):
222 super(QtShellChannel, self).__init__(context, session, address)
214 super(QtShellChannel, self).__init__(socket, session)
223 self.ioloop = ioloop.IOLoop()
215 self.ioloop = ioloop.IOLoop()
224
216
225 def run(self):
217 def run(self):
226 """The thread's main activity. Call start() instead."""
218 """The thread's main activity. Call start() instead."""
227 self.socket = make_shell_socket(self.context, self.session.bsession, self.address)
228 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
219 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
229 self.stream.on_recv(self._handle_recv)
220 self.stream.on_recv(self._handle_recv)
230 self._run_loop()
221 self._run_loop()
@@ -279,13 +270,12 b' class QtIOPubChannel(QtZMQSocketChannel):'
279 # Emitted when a shutdown is noticed.
270 # Emitted when a shutdown is noticed.
280 shutdown_reply_received = QtCore.Signal(object)
271 shutdown_reply_received = QtCore.Signal(object)
281
272
282 def __init__(self, context, session, address):
273 def __init__(self, socket, session):
283 super(QtIOPubChannel, self).__init__(context, session, address)
274 super(QtIOPubChannel, self).__init__(socket, session)
284 self.ioloop = ioloop.IOLoop()
275 self.ioloop = ioloop.IOLoop()
285
276
286 def run(self):
277 def run(self):
287 """The thread's main activity. Call start() instead."""
278 """The thread's main activity. Call start() instead."""
288 self.socket = make_iopub_socket(self.context, self.session.bsession, self.address)
289 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
279 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
290 self.stream.on_recv(self._handle_recv)
280 self.stream.on_recv(self._handle_recv)
291 self._run_loop()
281 self._run_loop()
@@ -338,13 +328,12 b' class QtStdInChannel(QtZMQSocketChannel):'
338 # Emitted when an input request is received.
328 # Emitted when an input request is received.
339 input_requested = QtCore.Signal(object)
329 input_requested = QtCore.Signal(object)
340
330
341 def __init__(self, context, session, address):
331 def __init__(self, socket, session):
342 super(QtStdInChannel, self).__init__(context, session, address)
332 super(QtStdInChannel, self).__init__(socket, session)
343 self.ioloop = ioloop.IOLoop()
333 self.ioloop = ioloop.IOLoop()
344
334
345 def run(self):
335 def run(self):
346 """The thread's main activity. Call start() instead."""
336 """The thread's main activity. Call start() instead."""
347 self.socket = make_stdin_socket(self.context, self.session.bsession, self.address)
348 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
337 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
349 self.stream.on_recv(self._handle_recv)
338 self.stream.on_recv(self._handle_recv)
350 self._run_loop()
339 self._run_loop()
General Comments 0
You need to be logged in to leave comments. Login now