##// END OF EJS Templates
Factor out code to set up ZMQ sockets
Thomas Kluyver -
Show More
@@ -14,8 +14,9 b' try:'
14 except ImportError:
14 except ImportError:
15 from Queue import Queue, Empty # Py 2
15 from Queue import Queue, Empty # Py 2
16
16
17 from IPython.kernel.channels import IOPubChannel, HBChannel, \
17 from IPython.kernel.channels import HBChannel,\
18 ShellChannel, StdInChannel, InvalidPortNumber, major_protocol_version
18 make_iopub_socket, make_shell_socket, make_stdin_socket,\
19 InvalidPortNumber, major_protocol_version
19 from IPython.utils.py3compat import string_types, iteritems
20 from IPython.utils.py3compat import string_types, iteritems
20
21
21 # some utilities to validate message structure, these might get moved elsewhere
22 # some utilities to validate message structure, these might get moved elsewhere
@@ -154,10 +155,7 b' class BlockingShellChannel(ZMQSocketChannel):'
154 ]
155 ]
155
156
156 def start(self):
157 def start(self):
157 self.socket = self.context.socket(zmq.DEALER)
158 self.socket = make_stdin_socket(self.context, self.session.bsession, self.address)
158 self.socket.linger = 1000
159 self.socket.setsockopt(zmq.IDENTITY, self.session.bsession)
160 self.socket.connect(self.address)
161
159
162 def execute(self, code, silent=False, store_history=True,
160 def execute(self, code, silent=False, store_history=True,
163 user_expressions=None, allow_stdin=None):
161 user_expressions=None, allow_stdin=None):
@@ -354,12 +352,7 b' class BlockingIOPubChannel(ZMQSocketChannel):'
354 This channel is where all output is published to frontends.
352 This channel is where all output is published to frontends.
355 """
353 """
356 def start(self):
354 def start(self):
357 self.socket = self.context.socket(zmq.SUB)
355 self.socket = make_iopub_socket(self.context, self.session.bsession, self.address)
358 self.socket.linger = 1000
359 self.socket.setsockopt(zmq.SUBSCRIBE,b'')
360 self.socket.setsockopt(zmq.IDENTITY, self.session.bsession)
361 self.socket.connect(self.address)
362
363
356
364 class BlockingStdInChannel(ZMQSocketChannel):
357 class BlockingStdInChannel(ZMQSocketChannel):
365 """The stdin channel to handle raw_input requests that the kernel makes."""
358 """The stdin channel to handle raw_input requests that the kernel makes."""
@@ -367,10 +360,7 b' class BlockingStdInChannel(ZMQSocketChannel):'
367 proxy_methods = ['input']
360 proxy_methods = ['input']
368
361
369 def start(self):
362 def start(self):
370 self.socket = self.context.socket(zmq.DEALER)
363 self.socket = make_stdin_socket(self.context, self.session.bsession, self.address)
371 self.socket.linger = 1000
372 self.socket.setsockopt(zmq.IDENTITY, self.session.bsession)
373 self.socket.connect(self.address)
374
364
375 def input(self, string):
365 def input(self, string):
376 """Send a string of raw input to the kernel."""
366 """Send a string of raw input to the kernel."""
@@ -500,6 +500,27 b' class StdInChannel(ZMQSocketChannel):'
500 msg = self.session.msg('input_reply', content)
500 msg = self.session.msg('input_reply', content)
501 self._queue_send(msg)
501 self._queue_send(msg)
502
502
503 def make_shell_socket(context, identity, address):
504 socket = context.socket(zmq.DEALER)
505 socket.linger = 1000
506 socket.setsockopt(zmq.IDENTITY, identity)
507 socket.connect(address)
508 return socket
509
510 def make_iopub_socket(context, identity, address):
511 socket = context.socket(zmq.SUB)
512 socket.linger = 1000
513 socket.setsockopt(zmq.SUBSCRIBE,b'')
514 socket.setsockopt(zmq.IDENTITY, identity)
515 socket.connect(address)
516 return socket
517
518 def make_stdin_socket(context, identity, address):
519 socket = context.socket(zmq.DEALER)
520 socket.linger = 1000
521 socket.setsockopt(zmq.IDENTITY, identity)
522 socket.connect(address)
523 return socket
503
524
504 class HBChannel(ZMQSocketChannel):
525 class HBChannel(ZMQSocketChannel):
505 """The heartbeat channel which monitors the kernel heartbeat.
526 """The heartbeat channel which monitors the kernel heartbeat.
@@ -15,7 +15,8 b' from IPython.external.qt import QtCore'
15
15
16 # Local imports
16 # Local imports
17 from IPython.utils.traitlets import Type
17 from IPython.utils.traitlets import Type
18 from IPython.kernel.channels import HBChannel
18 from IPython.kernel.channels import HBChannel,\
19 make_shell_socket, make_iopub_socket, make_stdin_socket
19 from IPython.kernel import KernelClient
20 from IPython.kernel import KernelClient
20
21
21 from .kernel_mixins import (QtHBChannelMixin, QtKernelClientMixin)
22 from .kernel_mixins import (QtHBChannelMixin, QtKernelClientMixin)
@@ -39,16 +40,6 b' class InvalidPortNumber(Exception):'
39 # some utilities to validate message structure, these might get moved elsewhere
40 # some utilities to validate message structure, these might get moved elsewhere
40 # if they prove to have more generic utility
41 # if they prove to have more generic utility
41
42
42 def validate_string_list(lst):
43 """Validate that the input is a list of strings.
44
45 Raises ValueError if not."""
46 if not isinstance(lst, list):
47 raise ValueError('input %r must be a list' % lst)
48 for x in lst:
49 if not isinstance(x, string_types):
50 raise ValueError('element %r in list must be a string' % x)
51
52
43
53 def validate_string_dict(dct):
44 def validate_string_dict(dct):
54 """Validate that the input is a dict with string keys and values.
45 """Validate that the input is a dict with string keys and values.
@@ -246,10 +237,7 b' class QtShellChannel(QtZMQSocketChannel):'
246
237
247 def run(self):
238 def run(self):
248 """The thread's main activity. Call start() instead."""
239 """The thread's main activity. Call start() instead."""
249 self.socket = self.context.socket(zmq.DEALER)
240 self.socket = make_shell_socket(self.context, self.session.bsession, self.address)
250 self.socket.linger = 1000
251 self.socket.setsockopt(zmq.IDENTITY, self.session.bsession)
252 self.socket.connect(self.address)
253 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
241 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
254 self.stream.on_recv(self._handle_recv)
242 self.stream.on_recv(self._handle_recv)
255 self._run_loop()
243 self._run_loop()
@@ -482,11 +470,7 b' class QtIOPubChannel(QtZMQSocketChannel):'
482
470
483 def run(self):
471 def run(self):
484 """The thread's main activity. Call start() instead."""
472 """The thread's main activity. Call start() instead."""
485 self.socket = self.context.socket(zmq.SUB)
473 self.socket = make_iopub_socket(self.context, self.session.bsession, self.address)
486 self.socket.linger = 1000
487 self.socket.setsockopt(zmq.SUBSCRIBE,b'')
488 self.socket.setsockopt(zmq.IDENTITY, self.session.bsession)
489 self.socket.connect(self.address)
490 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
474 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
491 self.stream.on_recv(self._handle_recv)
475 self.stream.on_recv(self._handle_recv)
492 self._run_loop()
476 self._run_loop()
@@ -545,10 +529,7 b' class QtStdInChannel(QtZMQSocketChannel):'
545
529
546 def run(self):
530 def run(self):
547 """The thread's main activity. Call start() instead."""
531 """The thread's main activity. Call start() instead."""
548 self.socket = self.context.socket(zmq.DEALER)
532 self.socket = make_stdin_socket(self.context, self.session.bsession, self.address)
549 self.socket.linger = 1000
550 self.socket.setsockopt(zmq.IDENTITY, self.session.bsession)
551 self.socket.connect(self.address)
552 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
533 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
553 self.stream.on_recv(self._handle_recv)
534 self.stream.on_recv(self._handle_recv)
554 self._run_loop()
535 self._run_loop()
General Comments 0
You need to be logged in to leave comments. Login now