##// END OF EJS Templates
Move ZMQ socket creation out of channels
Thomas Kluyver -
Show More
@@ -54,7 +54,7 b' class ZMQSocketChannel(object):'
54 54 _exiting = False
55 55 proxy_methods = []
56 56
57 def __init__(self, context, session, address):
57 def __init__(self, socket, session):
58 58 """Create a channel.
59 59
60 60 Parameters
@@ -69,14 +69,8 b' class ZMQSocketChannel(object):'
69 69 super(ZMQSocketChannel, self).__init__()
70 70 self.daemon = True
71 71
72 self.context = context
72 self.socket = socket
73 73 self.session = session
74 if isinstance(address, tuple):
75 if address[1] == 0:
76 message = 'The port number for a channel cannot be 0.'
77 raise InvalidPortNumber(message)
78 address = "tcp://%s:%i" % address
79 self._address = address
80 74
81 75 def _recv(self, **kwargs):
82 76 msg = self.socket.recv_multipart(**kwargs)
@@ -136,6 +130,9 b' class ZMQSocketChannel(object):'
136 130 """
137 131 self.session.send(self.socket, msg)
138 132
133 def start(self):
134 pass
135
139 136
140 137 class BlockingShellChannel(ZMQSocketChannel):
141 138 """The shell channel for issuing request/replies to the kernel."""
@@ -12,10 +12,7 b' except ImportError:'
12 12
13 13 from IPython.utils.traitlets import Type
14 14 from IPython.kernel.client import KernelClient
15 from .channels import (
16 BlockingIOPubChannel, BlockingHBChannel,
17 BlockingShellChannel, BlockingStdInChannel
18 )
15 from .channels import ZMQSocketChannel, BlockingHBChannel
19 16
20 17 class BlockingKernelClient(KernelClient):
21 18 def wait_for_ready(self):
@@ -35,7 +32,7 b' class BlockingKernelClient(KernelClient):'
35 32 break
36 33
37 34 # The classes to use for the various channels
38 shell_channel_class = Type(BlockingShellChannel)
39 iopub_channel_class = Type(BlockingIOPubChannel)
40 stdin_channel_class = Type(BlockingStdInChannel)
35 shell_channel_class = Type(ZMQSocketChannel)
36 iopub_channel_class = Type(ZMQSocketChannel)
37 stdin_channel_class = Type(ZMQSocketChannel)
41 38 hb_channel_class = Type(BlockingHBChannel)
@@ -16,6 +16,9 b' from IPython.utils.traitlets import ('
16 16 from .channelsabc import (
17 17 ShellChannelABC, IOPubChannelABC, HBChannelABC, StdInChannelABC
18 18 )
19 from .channels import (
20 make_shell_socket, make_stdin_socket, make_iopub_socket
21 )
19 22 from .clientabc import KernelClientABC
20 23 from .connect import ConnectionFileMixin
21 24
@@ -127,8 +130,9 b' class KernelClient(ConnectionFileMixin):'
127 130 if self._shell_channel is None:
128 131 url = self._make_url('shell')
129 132 self.log.debug("connecting shell channel to %s", url)
133 socket = make_shell_socket(self.context, self.session.bsession, url)
130 134 self._shell_channel = self.shell_channel_class(
131 self.context, self.session, url
135 socket, self.session
132 136 )
133 137 return self._shell_channel
134 138
@@ -138,8 +142,9 b' class KernelClient(ConnectionFileMixin):'
138 142 if self._iopub_channel is None:
139 143 url = self._make_url('iopub')
140 144 self.log.debug("connecting iopub channel to %s", url)
145 socket = make_iopub_socket(self.context, self.session.bsession, url)
141 146 self._iopub_channel = self.iopub_channel_class(
142 self.context, self.session, url
147 socket, self.session
143 148 )
144 149 return self._iopub_channel
145 150
@@ -149,8 +154,9 b' class KernelClient(ConnectionFileMixin):'
149 154 if self._stdin_channel is None:
150 155 url = self._make_url('stdin')
151 156 self.log.debug("connecting stdin channel to %s", url)
157 socket = make_stdin_socket(self.context, self.session.bsession, url)
152 158 self._stdin_channel = self.stdin_channel_class(
153 self.context, self.session, url
159 socket, self.session
154 160 )
155 161 return self._stdin_channel
156 162
@@ -55,12 +55,10 b' def validate_string_dict(dct):'
55 55
56 56 class QtZMQSocketChannel(SuperQObject, Thread):
57 57 """The base class for the channels that use ZMQ sockets."""
58 context = None
59 58 session = None
60 59 socket = None
61 60 ioloop = None
62 61 stream = None
63 _address = None
64 62 _exiting = False
65 63 proxy_methods = []
66 64
@@ -87,7 +85,7 b' class QtZMQSocketChannel(SuperQObject, Thread):'
87 85 """
88 86 QtCore.QCoreApplication.instance().processEvents()
89 87
90 def __init__(self, context, session, address):
88 def __init__(self, socket, session):
91 89 """Create a channel.
92 90
93 91 Parameters
@@ -102,14 +100,8 b' class QtZMQSocketChannel(SuperQObject, Thread):'
102 100 super(QtZMQSocketChannel, self).__init__()
103 101 self.daemon = True
104 102
105 self.context = context
103 self.socket = socket
106 104 self.session = session
107 if isinstance(address, tuple):
108 if address[1] == 0:
109 message = 'The port number for a channel cannot be 0.'
110 raise InvalidPortNumber(message)
111 address = "tcp://%s:%i" % address
112 self._address = address
113 105 atexit.register(self._notice_exit)
114 106
115 107 def _notice_exit(self):
@@ -218,13 +210,12 b' class QtShellChannel(QtZMQSocketChannel):'
218 210 history_reply = QtCore.Signal(object)
219 211 kernel_info_reply = QtCore.Signal(object)
220 212
221 def __init__(self, context, session, address):
222 super(QtShellChannel, self).__init__(context, session, address)
213 def __init__(self, socket, session):
214 super(QtShellChannel, self).__init__(socket, session)
223 215 self.ioloop = ioloop.IOLoop()
224 216
225 217 def run(self):
226 218 """The thread's main activity. Call start() instead."""
227 self.socket = make_shell_socket(self.context, self.session.bsession, self.address)
228 219 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
229 220 self.stream.on_recv(self._handle_recv)
230 221 self._run_loop()
@@ -279,13 +270,12 b' class QtIOPubChannel(QtZMQSocketChannel):'
279 270 # Emitted when a shutdown is noticed.
280 271 shutdown_reply_received = QtCore.Signal(object)
281 272
282 def __init__(self, context, session, address):
283 super(QtIOPubChannel, self).__init__(context, session, address)
273 def __init__(self, socket, session):
274 super(QtIOPubChannel, self).__init__(socket, session)
284 275 self.ioloop = ioloop.IOLoop()
285 276
286 277 def run(self):
287 278 """The thread's main activity. Call start() instead."""
288 self.socket = make_iopub_socket(self.context, self.session.bsession, self.address)
289 279 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
290 280 self.stream.on_recv(self._handle_recv)
291 281 self._run_loop()
@@ -338,13 +328,12 b' class QtStdInChannel(QtZMQSocketChannel):'
338 328 # Emitted when an input request is received.
339 329 input_requested = QtCore.Signal(object)
340 330
341 def __init__(self, context, session, address):
342 super(QtStdInChannel, self).__init__(context, session, address)
331 def __init__(self, socket, session):
332 super(QtStdInChannel, self).__init__(socket, session)
343 333 self.ioloop = ioloop.IOLoop()
344 334
345 335 def run(self):
346 336 """The thread's main activity. Call start() instead."""
347 self.socket = make_stdin_socket(self.context, self.session.bsession, self.address)
348 337 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
349 338 self.stream.on_recv(self._handle_recv)
350 339 self._run_loop()
General Comments 0
You need to be logged in to leave comments. Login now