##// END OF EJS Templates
Factor out code to set up ZMQ sockets
Thomas Kluyver -
Show More
@@ -14,8 +14,9 b' try:'
14 14 except ImportError:
15 15 from Queue import Queue, Empty # Py 2
16 16
17 from IPython.kernel.channels import IOPubChannel, HBChannel, \
18 ShellChannel, StdInChannel, InvalidPortNumber, major_protocol_version
17 from IPython.kernel.channels import HBChannel,\
18 make_iopub_socket, make_shell_socket, make_stdin_socket,\
19 InvalidPortNumber, major_protocol_version
19 20 from IPython.utils.py3compat import string_types, iteritems
20 21
21 22 # some utilities to validate message structure, these might get moved elsewhere
@@ -154,10 +155,7 b' class BlockingShellChannel(ZMQSocketChannel):'
154 155 ]
155 156
156 157 def start(self):
157 self.socket = self.context.socket(zmq.DEALER)
158 self.socket.linger = 1000
159 self.socket.setsockopt(zmq.IDENTITY, self.session.bsession)
160 self.socket.connect(self.address)
158 self.socket = make_stdin_socket(self.context, self.session.bsession, self.address)
161 159
162 160 def execute(self, code, silent=False, store_history=True,
163 161 user_expressions=None, allow_stdin=None):
@@ -354,12 +352,7 b' class BlockingIOPubChannel(ZMQSocketChannel):'
354 352 This channel is where all output is published to frontends.
355 353 """
356 354 def start(self):
357 self.socket = self.context.socket(zmq.SUB)
358 self.socket.linger = 1000
359 self.socket.setsockopt(zmq.SUBSCRIBE,b'')
360 self.socket.setsockopt(zmq.IDENTITY, self.session.bsession)
361 self.socket.connect(self.address)
362
355 self.socket = make_iopub_socket(self.context, self.session.bsession, self.address)
363 356
364 357 class BlockingStdInChannel(ZMQSocketChannel):
365 358 """The stdin channel to handle raw_input requests that the kernel makes."""
@@ -367,10 +360,7 b' class BlockingStdInChannel(ZMQSocketChannel):'
367 360 proxy_methods = ['input']
368 361
369 362 def start(self):
370 self.socket = self.context.socket(zmq.DEALER)
371 self.socket.linger = 1000
372 self.socket.setsockopt(zmq.IDENTITY, self.session.bsession)
373 self.socket.connect(self.address)
363 self.socket = make_stdin_socket(self.context, self.session.bsession, self.address)
374 364
375 365 def input(self, string):
376 366 """Send a string of raw input to the kernel."""
@@ -500,6 +500,27 b' class StdInChannel(ZMQSocketChannel):'
500 500 msg = self.session.msg('input_reply', content)
501 501 self._queue_send(msg)
502 502
503 def make_shell_socket(context, identity, address):
504 socket = context.socket(zmq.DEALER)
505 socket.linger = 1000
506 socket.setsockopt(zmq.IDENTITY, identity)
507 socket.connect(address)
508 return socket
509
510 def make_iopub_socket(context, identity, address):
511 socket = context.socket(zmq.SUB)
512 socket.linger = 1000
513 socket.setsockopt(zmq.SUBSCRIBE,b'')
514 socket.setsockopt(zmq.IDENTITY, identity)
515 socket.connect(address)
516 return socket
517
518 def make_stdin_socket(context, identity, address):
519 socket = context.socket(zmq.DEALER)
520 socket.linger = 1000
521 socket.setsockopt(zmq.IDENTITY, identity)
522 socket.connect(address)
523 return socket
503 524
504 525 class HBChannel(ZMQSocketChannel):
505 526 """The heartbeat channel which monitors the kernel heartbeat.
@@ -15,7 +15,8 b' from IPython.external.qt import QtCore'
15 15
16 16 # Local imports
17 17 from IPython.utils.traitlets import Type
18 from IPython.kernel.channels import HBChannel
18 from IPython.kernel.channels import HBChannel,\
19 make_shell_socket, make_iopub_socket, make_stdin_socket
19 20 from IPython.kernel import KernelClient
20 21
21 22 from .kernel_mixins import (QtHBChannelMixin, QtKernelClientMixin)
@@ -39,16 +40,6 b' class InvalidPortNumber(Exception):'
39 40 # some utilities to validate message structure, these might get moved elsewhere
40 41 # if they prove to have more generic utility
41 42
42 def validate_string_list(lst):
43 """Validate that the input is a list of strings.
44
45 Raises ValueError if not."""
46 if not isinstance(lst, list):
47 raise ValueError('input %r must be a list' % lst)
48 for x in lst:
49 if not isinstance(x, string_types):
50 raise ValueError('element %r in list must be a string' % x)
51
52 43
53 44 def validate_string_dict(dct):
54 45 """Validate that the input is a dict with string keys and values.
@@ -246,10 +237,7 b' class QtShellChannel(QtZMQSocketChannel):'
246 237
247 238 def run(self):
248 239 """The thread's main activity. Call start() instead."""
249 self.socket = self.context.socket(zmq.DEALER)
250 self.socket.linger = 1000
251 self.socket.setsockopt(zmq.IDENTITY, self.session.bsession)
252 self.socket.connect(self.address)
240 self.socket = make_shell_socket(self.context, self.session.bsession, self.address)
253 241 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
254 242 self.stream.on_recv(self._handle_recv)
255 243 self._run_loop()
@@ -482,11 +470,7 b' class QtIOPubChannel(QtZMQSocketChannel):'
482 470
483 471 def run(self):
484 472 """The thread's main activity. Call start() instead."""
485 self.socket = self.context.socket(zmq.SUB)
486 self.socket.linger = 1000
487 self.socket.setsockopt(zmq.SUBSCRIBE,b'')
488 self.socket.setsockopt(zmq.IDENTITY, self.session.bsession)
489 self.socket.connect(self.address)
473 self.socket = make_iopub_socket(self.context, self.session.bsession, self.address)
490 474 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
491 475 self.stream.on_recv(self._handle_recv)
492 476 self._run_loop()
@@ -545,10 +529,7 b' class QtStdInChannel(QtZMQSocketChannel):'
545 529
546 530 def run(self):
547 531 """The thread's main activity. Call start() instead."""
548 self.socket = self.context.socket(zmq.DEALER)
549 self.socket.linger = 1000
550 self.socket.setsockopt(zmq.IDENTITY, self.session.bsession)
551 self.socket.connect(self.address)
532 self.socket = make_stdin_socket(self.context, self.session.bsession, self.address)
552 533 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
553 534 self.stream.on_recv(self._handle_recv)
554 535 self._run_loop()
General Comments 0
You need to be logged in to leave comments. Login now