##// END OF EJS Templates
enable IPC transport for kernels...
MinRK -
Show More
@@ -24,6 +24,7 b' Authors:'
24 24 import atexit
25 25 import json
26 26 import os
27 import shutil
27 28 import signal
28 29 import sys
29 30 import uuid
@@ -38,7 +39,7 b' from IPython.zmq.blockingkernelmanager import BlockingKernelManager'
38 39 from IPython.utils.path import filefind
39 40 from IPython.utils.py3compat import str_to_bytes
40 41 from IPython.utils.traitlets import (
41 Dict, List, Unicode, CUnicode, Int, CBool, Any
42 Dict, List, Unicode, CUnicode, Int, CBool, Any, CaselessStrEnum
42 43 )
43 44 from IPython.zmq.ipkernel import (
44 45 flags as ipkernel_flags,
@@ -151,12 +152,27 b' class IPythonConsoleApp(Configurable):'
151 152 # create requested profiles by default, if they don't exist:
152 153 auto_create = CBool(True)
153 154 # connection info:
154 ip = Unicode(LOCALHOST, config=True,
155
156 transport = CaselessStrEnum(['tcp', 'ipc'], default_value='tcp', config=True)
157
158 ip = Unicode(config=True,
155 159 help="""Set the kernel\'s IP address [default localhost].
156 160 If the IP address is something other than localhost, then
157 161 Consoles on other machines will be able to connect
158 162 to the Kernel, so be careful!"""
159 163 )
164 def _ip_default(self):
165 if self.transport == 'tcp':
166 return LOCALHOST
167 else:
168 # this can fire early if ip is given,
169 # in which case our return value is meaningless
170 if not hasattr(self, 'profile_dir'):
171 return ''
172 ipcdir = os.path.join(self.profile_dir.security_dir, 'kernel-%s' % os.getpid())
173 os.makedirs(ipcdir)
174 atexit.register(lambda : shutil.rmtree(ipcdir))
175 return os.path.join(ipcdir, 'ipc')
160 176
161 177 sshserver = Unicode('', config=True,
162 178 help="""The SSH server to use to connect to the kernel.""")
@@ -166,11 +182,11 b' class IPythonConsoleApp(Configurable):'
166 182 hb_port = Int(0, config=True,
167 183 help="set the heartbeat port [default: random]")
168 184 shell_port = Int(0, config=True,
169 help="set the shell (XREP) port [default: random]")
185 help="set the shell (ROUTER) port [default: random]")
170 186 iopub_port = Int(0, config=True,
171 187 help="set the iopub (PUB) port [default: random]")
172 188 stdin_port = Int(0, config=True,
173 help="set the stdin (XREQ) port [default: random]")
189 help="set the stdin (DEALER) port [default: random]")
174 190 connection_file = Unicode('', config=True,
175 191 help="""JSON file in which to store connection info [default: kernel-<pid>.json]
176 192
@@ -256,10 +272,10 b' class IPythonConsoleApp(Configurable):'
256 272 return
257 273 self.log.debug(u"Loading connection file %s", fname)
258 274 with open(fname) as f:
259 s = f.read()
260 cfg = json.loads(s)
261 if self.ip == LOCALHOST and 'ip' in cfg:
262 # not overridden by config or cl_args
275 cfg = json.load(f)
276
277 self.transport = cfg.get('transport', 'tcp')
278 if 'ip' in cfg:
263 279 self.ip = cfg['ip']
264 280 for channel in ('hb', 'shell', 'iopub', 'stdin'):
265 281 name = channel + '_port'
@@ -268,12 +284,17 b' class IPythonConsoleApp(Configurable):'
268 284 setattr(self, name, cfg[name])
269 285 if 'key' in cfg:
270 286 self.config.Session.key = str_to_bytes(cfg['key'])
287
271 288
272 289 def init_ssh(self):
273 290 """set up ssh tunnels, if needed."""
274 291 if not self.sshserver and not self.sshkey:
275 292 return
276 293
294 if self.transport != 'tcp':
295 self.log.error("Can only use ssh tunnels with TCP sockets, not %s", self.transport)
296 return
297
277 298 if self.sshkey and not self.sshserver:
278 299 # specifying just the key implies that we are connecting directly
279 300 self.sshserver = self.ip
@@ -326,6 +347,7 b' class IPythonConsoleApp(Configurable):'
326 347
327 348 # Create a KernelManager and start a kernel.
328 349 self.kernel_manager = self.kernel_manager_class(
350 transport=self.transport,
329 351 ip=self.ip,
330 352 shell_port=self.shell_port,
331 353 iopub_port=self.iopub_port,
@@ -21,7 +21,7 b' from IPython.utils.py3compat import bytes_to_str'
21 21 from parentpoller import ParentPollerWindows
22 22
23 23 def write_connection_file(fname=None, shell_port=0, iopub_port=0, stdin_port=0, hb_port=0,
24 ip=LOCALHOST, key=b''):
24 ip=LOCALHOST, key=b'', transport='tcp'):
25 25 """Generates a JSON config file, including the selection of random ports.
26 26
27 27 Parameters
@@ -54,17 +54,26 b' def write_connection_file(fname=None, shell_port=0, iopub_port=0, stdin_port=0, '
54 54 fname = tempfile.mktemp('.json')
55 55
56 56 # Find open ports as necessary.
57
57 58 ports = []
58 59 ports_needed = int(shell_port <= 0) + int(iopub_port <= 0) + \
59 60 int(stdin_port <= 0) + int(hb_port <= 0)
60 for i in xrange(ports_needed):
61 sock = socket.socket()
62 sock.bind(('', 0))
63 ports.append(sock)
64 for i, sock in enumerate(ports):
65 port = sock.getsockname()[1]
66 sock.close()
67 ports[i] = port
61 if transport == 'tcp':
62 for i in range(ports_needed):
63 sock = socket.socket()
64 sock.bind(('', 0))
65 ports.append(sock)
66 for i, sock in enumerate(ports):
67 port = sock.getsockname()[1]
68 sock.close()
69 ports[i] = port
70 else:
71 N = 1
72 for i in range(ports_needed):
73 while os.path.exists("%s-%s" % (ip, str(N))):
74 N += 1
75 ports.append(N)
76 N += 1
68 77 if shell_port <= 0:
69 78 shell_port = ports.pop(0)
70 79 if iopub_port <= 0:
@@ -81,6 +90,7 b' def write_connection_file(fname=None, shell_port=0, iopub_port=0, stdin_port=0, '
81 90 )
82 91 cfg['ip'] = ip
83 92 cfg['key'] = bytes_to_str(key)
93 cfg['transport'] = transport
84 94
85 95 with open(fname, 'w') as f:
86 96 f.write(json.dumps(cfg, indent=2))
@@ -12,6 +12,7 b''
12 12 # Imports
13 13 #-----------------------------------------------------------------------------
14 14
15 import os
15 16 import socket
16 17 import sys
17 18 from threading import Thread
@@ -28,21 +29,28 b' from IPython.utils.localinterfaces import LOCALHOST'
28 29 class Heartbeat(Thread):
29 30 "A simple ping-pong style heartbeat that runs in a thread."
30 31
31 def __init__(self, context, addr=(LOCALHOST, 0)):
32 def __init__(self, context, addr=('tcp', LOCALHOST, 0)):
32 33 Thread.__init__(self)
33 34 self.context = context
34 self.ip, self.port = addr
35 self.transport, self.ip, self.port = addr
35 36 if self.port == 0:
36 s = socket.socket()
37 # '*' means all interfaces to 0MQ, which is '' to socket.socket
38 s.bind(('' if self.ip == '*' else self.ip, 0))
39 self.port = s.getsockname()[1]
40 s.close()
37 if addr[0] == 'tcp':
38 s = socket.socket()
39 # '*' means all interfaces to 0MQ, which is '' to socket.socket
40 s.bind(('' if self.ip == '*' else self.ip, 0))
41 self.port = s.getsockname()[1]
42 s.close()
43 elif addr[0] == 'ipc':
44 while os.path.exists(self.ip + '-' + self.port):
45 self.port = self.port + 1
46 else:
47 raise ValueError("Unrecognized zmq transport: %s" % addr[0])
41 48 self.addr = (self.ip, self.port)
42 49 self.daemon = True
43 50
44 51 def run(self):
45 52 self.socket = self.context.socket(zmq.REP)
46 self.socket.bind('tcp://%s:%i' % self.addr)
53 c = ':' if self.transport == 'tcp' else '-'
54 self.socket.bind('%s://%s' % (self.transport, self.ip) + c + str(self.port))
47 55 zmq.device(zmq.FORWARDER, self.socket, self.socket)
48 56
@@ -35,8 +35,10 b' from IPython.utils import io'
35 35 from IPython.utils.localinterfaces import LOCALHOST
36 36 from IPython.utils.path import filefind
37 37 from IPython.utils.py3compat import str_to_bytes
38 from IPython.utils.traitlets import (Any, Instance, Dict, Unicode, Integer, Bool,
39 DottedObjectName)
38 from IPython.utils.traitlets import (
39 Any, Instance, Dict, Unicode, Integer, Bool, CaselessStrEnum,
40 DottedObjectName,
41 )
40 42 from IPython.utils.importstring import import_item
41 43 # local imports
42 44 from IPython.zmq.entry_point import write_connection_file
@@ -109,6 +111,7 b' class KernelApp(BaseIPythonApplication):'
109 111 self.config_file_specified = False
110 112
111 113 # connection info:
114 transport = CaselessStrEnum(['tcp', 'ipc'], default_value='tcp', config=True)
112 115 ip = Unicode(LOCALHOST, config=True,
113 116 help="Set the IP or interface on which the kernel will listen.")
114 117 hb_port = Integer(0, config=True, help="set the heartbeat port [default: random]")
@@ -154,11 +157,12 b' class KernelApp(BaseIPythonApplication):'
154 157 self.poller = ParentPollerUnix()
155 158
156 159 def _bind_socket(self, s, port):
157 iface = 'tcp://%s' % self.ip
158 if port <= 0:
160 iface = '%s://%s' % (self.transport, self.ip)
161 if port <= 0 and self.transport == 'tcp':
159 162 port = s.bind_to_random_port(iface)
160 163 else:
161 s.bind(iface + ':%i'%port)
164 c = ':' if self.transport == 'tcp' else '-'
165 s.bind(iface + c + str(port))
162 166 return port
163 167
164 168 def load_connection_file(self):
@@ -174,6 +178,7 b' class KernelApp(BaseIPythonApplication):'
174 178 with open(fname) as f:
175 179 s = f.read()
176 180 cfg = json.loads(s)
181 self.transport = cfg.get('transport', self.transport)
177 182 if self.ip == LOCALHOST and 'ip' in cfg:
178 183 # not overridden by config or cl_args
179 184 self.ip = cfg['ip']
@@ -191,7 +196,7 b' class KernelApp(BaseIPythonApplication):'
191 196 cf = os.path.join(self.profile_dir.security_dir, self.connection_file)
192 197 else:
193 198 cf = self.connection_file
194 write_connection_file(cf, ip=self.ip, key=self.session.key,
199 write_connection_file(cf, ip=self.ip, key=self.session.key, transport=self.transport,
195 200 shell_port=self.shell_port, stdin_port=self.stdin_port, hb_port=self.hb_port,
196 201 iopub_port=self.iopub_port)
197 202
@@ -204,6 +209,19 b' class KernelApp(BaseIPythonApplication):'
204 209 os.remove(cf)
205 210 except (IOError, OSError):
206 211 pass
212
213 self._cleanup_ipc_files()
214
215 def _cleanup_ipc_files(self):
216 """cleanup ipc files if we wrote them"""
217 if self.transport != 'ipc':
218 return
219 for port in (self.shell_port, self.iopub_port, self.stdin_port, self.hb_port):
220 ipcfile = "%s-%i" % (self.ip, port)
221 try:
222 os.remove(ipcfile)
223 except (IOError, OSError):
224 pass
207 225
208 226 def init_connection_file(self):
209 227 if not self.connection_file:
@@ -238,7 +256,7 b' class KernelApp(BaseIPythonApplication):'
238 256 # heartbeat doesn't share context, because it mustn't be blocked
239 257 # by the GIL, which is accessed by libzmq when freeing zero-copy messages
240 258 hb_ctx = zmq.Context()
241 self.heartbeat = Heartbeat(hb_ctx, (self.ip, self.hb_port))
259 self.heartbeat = Heartbeat(hb_ctx, (self.transport, self.ip, self.hb_port))
242 260 self.hb_port = self.heartbeat.port
243 261 self.log.debug("Heartbeat REP Channel on port: %i"%self.hb_port)
244 262 self.heartbeat.start()
@@ -37,7 +37,7 b' from zmq.eventloop import ioloop, zmqstream'
37 37 from IPython.config.loader import Config
38 38 from IPython.utils.localinterfaces import LOCALHOST, LOCAL_IPS
39 39 from IPython.utils.traitlets import (
40 HasTraits, Any, Instance, Type, Unicode, Integer, Bool
40 HasTraits, Any, Instance, Type, Unicode, Integer, Bool, CaselessStrEnum
41 41 )
42 42 from IPython.utils.py3compat import str_to_bytes
43 43 from IPython.zmq.entry_point import write_connection_file
@@ -103,7 +103,7 b' class ZMQSocketChannel(Thread):'
103 103 The ZMQ context to use.
104 104 session : :class:`session.Session`
105 105 The session to use.
106 address : tuple
106 address : zmq url
107 107 Standard (ip, port) tuple that the kernel is listening on.
108 108 """
109 109 super(ZMQSocketChannel, self).__init__()
@@ -111,9 +111,11 b' class ZMQSocketChannel(Thread):'
111 111
112 112 self.context = context
113 113 self.session = session
114 if address[1] == 0:
115 message = 'The port number for a channel cannot be 0.'
116 raise InvalidPortNumber(message)
114 if isinstance(address, tuple):
115 if address[1] == 0:
116 message = 'The port number for a channel cannot be 0.'
117 raise InvalidPortNumber(message)
118 address = "tcp://%s:%i" % address
117 119 self._address = address
118 120 atexit.register(self._notice_exit)
119 121
@@ -149,10 +151,7 b' class ZMQSocketChannel(Thread):'
149 151
150 152 @property
151 153 def address(self):
152 """Get the channel's address as an (ip, port) tuple.
153
154 By the default, the address is (localhost, 0), where 0 means a random
155 port.
154 """Get the channel's address as a zmq url string ('tcp://127.0.0.1:5555').
156 155 """
157 156 return self._address
158 157
@@ -196,7 +195,7 b' class ShellSocketChannel(ZMQSocketChannel):'
196 195 """The thread's main activity. Call start() instead."""
197 196 self.socket = self.context.socket(zmq.DEALER)
198 197 self.socket.setsockopt(zmq.IDENTITY, self.session.bsession)
199 self.socket.connect('tcp://%s:%i' % self.address)
198 self.socket.connect(self.address)
200 199 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
201 200 self.stream.on_recv(self._handle_recv)
202 201 self._run_loop()
@@ -390,7 +389,7 b' class SubSocketChannel(ZMQSocketChannel):'
390 389 self.socket = self.context.socket(zmq.SUB)
391 390 self.socket.setsockopt(zmq.SUBSCRIBE,b'')
392 391 self.socket.setsockopt(zmq.IDENTITY, self.session.bsession)
393 self.socket.connect('tcp://%s:%i' % self.address)
392 self.socket.connect(self.address)
394 393 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
395 394 self.stream.on_recv(self._handle_recv)
396 395 self._run_loop()
@@ -456,7 +455,7 b' class StdInSocketChannel(ZMQSocketChannel):'
456 455 """The thread's main activity. Call start() instead."""
457 456 self.socket = self.context.socket(zmq.DEALER)
458 457 self.socket.setsockopt(zmq.IDENTITY, self.session.bsession)
459 self.socket.connect('tcp://%s:%i' % self.address)
458 self.socket.connect(self.address)
460 459 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
461 460 self.stream.on_recv(self._handle_recv)
462 461 self._run_loop()
@@ -515,7 +514,7 b' class HBSocketChannel(ZMQSocketChannel):'
515 514 self.socket.close()
516 515 self.socket = self.context.socket(zmq.REQ)
517 516 self.socket.setsockopt(zmq.LINGER, 0)
518 self.socket.connect('tcp://%s:%i' % self.address)
517 self.socket.connect(self.address)
519 518
520 519 self.poller.register(self.socket, zmq.POLLIN)
521 520
@@ -654,6 +653,10 b' class KernelManager(HasTraits):'
654 653
655 654 # The addresses for the communication channels.
656 655 connection_file = Unicode('')
656
657 transport = CaselessStrEnum(['tcp', 'ipc'], default_value='tcp')
658
659
657 660 ip = Unicode(LOCALHOST)
658 661 def _ip_changed(self, name, old, new):
659 662 if new == '*':
@@ -742,7 +745,20 b' class KernelManager(HasTraits):'
742 745 self._connection_file_written = False
743 746 try:
744 747 os.remove(self.connection_file)
745 except OSError:
748 except (IOError, OSError):
749 pass
750
751 self._cleanup_ipc_files()
752
753 def _cleanup_ipc_files(self):
754 """cleanup ipc files if we wrote them"""
755 if self.transport != 'ipc':
756 return
757 for port in (self.shell_port, self.iopub_port, self.stdin_port, self.hb_port):
758 ipcfile = "%s-%i" % (self.ip, port)
759 try:
760 os.remove(ipcfile)
761 except (IOError, OSError):
746 762 pass
747 763
748 764 def load_connection_file(self):
@@ -750,6 +766,9 b' class KernelManager(HasTraits):'
750 766 with open(self.connection_file) as f:
751 767 cfg = json.loads(f.read())
752 768
769 from pprint import pprint
770 pprint(cfg)
771 self.transport = cfg.get('transport', 'tcp')
753 772 self.ip = cfg['ip']
754 773 self.shell_port = cfg['shell_port']
755 774 self.stdin_port = cfg['stdin_port']
@@ -762,7 +781,7 b' class KernelManager(HasTraits):'
762 781 if self._connection_file_written:
763 782 return
764 783 self.connection_file,cfg = write_connection_file(self.connection_file,
765 ip=self.ip, key=self.session.key,
784 transport=self.transport, ip=self.ip, key=self.session.key,
766 785 stdin_port=self.stdin_port, iopub_port=self.iopub_port,
767 786 shell_port=self.shell_port, hb_port=self.hb_port)
768 787 # write_connection_file also sets default ports:
@@ -789,7 +808,7 b' class KernelManager(HasTraits):'
789 808 **kw : optional
790 809 See respective options for IPython and Python kernels.
791 810 """
792 if self.ip not in LOCAL_IPS:
811 if self.transport == 'tcp' and self.ip not in LOCAL_IPS:
793 812 raise RuntimeError("Can only launch a kernel on a local interface. "
794 813 "Make sure that the '*_address' attributes are "
795 814 "configured properly. "
@@ -956,13 +975,21 b' class KernelManager(HasTraits):'
956 975 # Channels used for communication with the kernel:
957 976 #--------------------------------------------------------------------------
958 977
978 def _make_url(self, port):
979 """make a zmq url with a port"""
980 if self.transport == 'tcp':
981 return "tcp://%s:%i" % (self.ip, port)
982 else:
983 return "%s://%s-%s" % (self.transport, self.ip, port)
984
959 985 @property
960 986 def shell_channel(self):
961 987 """Get the REQ socket channel object to make requests of the kernel."""
962 988 if self._shell_channel is None:
963 989 self._shell_channel = self.shell_channel_class(self.context,
964 self.session,
965 (self.ip, self.shell_port))
990 self.session,
991 self._make_url(self.shell_port),
992 )
966 993 return self._shell_channel
967 994
968 995 @property
@@ -971,7 +998,8 b' class KernelManager(HasTraits):'
971 998 if self._sub_channel is None:
972 999 self._sub_channel = self.sub_channel_class(self.context,
973 1000 self.session,
974 (self.ip, self.iopub_port))
1001 self._make_url(self.iopub_port),
1002 )
975 1003 return self._sub_channel
976 1004
977 1005 @property
@@ -979,8 +1007,9 b' class KernelManager(HasTraits):'
979 1007 """Get the REP socket channel object to handle stdin (raw_input)."""
980 1008 if self._stdin_channel is None:
981 1009 self._stdin_channel = self.stdin_channel_class(self.context,
982 self.session,
983 (self.ip, self.stdin_port))
1010 self.session,
1011 self._make_url(self.stdin_port),
1012 )
984 1013 return self._stdin_channel
985 1014
986 1015 @property
@@ -989,6 +1018,7 b' class KernelManager(HasTraits):'
989 1018 kernel is alive."""
990 1019 if self._hb_channel is None:
991 1020 self._hb_channel = self.hb_channel_class(self.context,
992 self.session,
993 (self.ip, self.hb_port))
1021 self.session,
1022 self._make_url(self.hb_port),
1023 )
994 1024 return self._hb_channel
General Comments 0
You need to be logged in to leave comments. Login now