##// END OF EJS Templates
Merge pull request #1868 from minrk/ipc...
Bussonnier Matthias -
r8501:af13023a merge
parent child Browse files
Show More
@@ -24,6 +24,7 b' Authors:'
24 24 import atexit
25 25 import json
26 26 import os
27 import shutil
27 28 import signal
28 29 import sys
29 30 import uuid
@@ -38,7 +39,7 b' from IPython.zmq.blockingkernelmanager import BlockingKernelManager'
38 39 from IPython.utils.path import filefind
39 40 from IPython.utils.py3compat import str_to_bytes
40 41 from IPython.utils.traitlets import (
41 Dict, List, Unicode, CUnicode, Int, CBool, Any
42 Dict, List, Unicode, CUnicode, Int, CBool, Any, CaselessStrEnum
42 43 )
43 44 from IPython.zmq.ipkernel import (
44 45 flags as ipkernel_flags,
@@ -151,12 +152,27 b' class IPythonConsoleApp(Configurable):'
151 152 # create requested profiles by default, if they don't exist:
152 153 auto_create = CBool(True)
153 154 # connection info:
154 ip = Unicode(LOCALHOST, config=True,
155
156 transport = CaselessStrEnum(['tcp', 'ipc'], default_value='tcp', config=True)
157
158 ip = Unicode(config=True,
155 159 help="""Set the kernel\'s IP address [default localhost].
156 160 If the IP address is something other than localhost, then
157 161 Consoles on other machines will be able to connect
158 162 to the Kernel, so be careful!"""
159 163 )
164 def _ip_default(self):
165 if self.transport == 'tcp':
166 return LOCALHOST
167 else:
168 # this can fire early if ip is given,
169 # in which case our return value is meaningless
170 if not hasattr(self, 'profile_dir'):
171 return ''
172 ipcdir = os.path.join(self.profile_dir.security_dir, 'kernel-%s' % os.getpid())
173 os.makedirs(ipcdir)
174 atexit.register(lambda : shutil.rmtree(ipcdir))
175 return os.path.join(ipcdir, 'ipc')
160 176
161 177 sshserver = Unicode('', config=True,
162 178 help="""The SSH server to use to connect to the kernel.""")
@@ -256,10 +272,10 b' class IPythonConsoleApp(Configurable):'
256 272 return
257 273 self.log.debug(u"Loading connection file %s", fname)
258 274 with open(fname) as f:
259 s = f.read()
260 cfg = json.loads(s)
261 if self.ip == LOCALHOST and 'ip' in cfg:
262 # not overridden by config or cl_args
275 cfg = json.load(f)
276
277 self.transport = cfg.get('transport', 'tcp')
278 if 'ip' in cfg:
263 279 self.ip = cfg['ip']
264 280 for channel in ('hb', 'shell', 'iopub', 'stdin'):
265 281 name = channel + '_port'
@@ -269,11 +285,16 b' class IPythonConsoleApp(Configurable):'
269 285 if 'key' in cfg:
270 286 self.config.Session.key = str_to_bytes(cfg['key'])
271 287
288
272 289 def init_ssh(self):
273 290 """set up ssh tunnels, if needed."""
274 291 if not self.sshserver and not self.sshkey:
275 292 return
276 293
294 if self.transport != 'tcp':
295 self.log.error("Can only use ssh tunnels with TCP sockets, not %s", self.transport)
296 return
297
277 298 if self.sshkey and not self.sshserver:
278 299 # specifying just the key implies that we are connecting directly
279 300 self.sshserver = self.ip
@@ -326,6 +347,7 b' class IPythonConsoleApp(Configurable):'
326 347
327 348 # Create a KernelManager and start a kernel.
328 349 self.kernel_manager = self.kernel_manager_class(
350 transport=self.transport,
329 351 ip=self.ip,
330 352 shell_port=self.shell_port,
331 353 iopub_port=self.iopub_port,
@@ -21,7 +21,7 b' from IPython.utils.py3compat import bytes_to_str'
21 21 from parentpoller import ParentPollerWindows
22 22
23 23 def write_connection_file(fname=None, shell_port=0, iopub_port=0, stdin_port=0, hb_port=0,
24 ip=LOCALHOST, key=b''):
24 ip=LOCALHOST, key=b'', transport='tcp'):
25 25 """Generates a JSON config file, including the selection of random ports.
26 26
27 27 Parameters
@@ -54,10 +54,12 b' def write_connection_file(fname=None, shell_port=0, iopub_port=0, stdin_port=0, '
54 54 fname = tempfile.mktemp('.json')
55 55
56 56 # Find open ports as necessary.
57
57 58 ports = []
58 59 ports_needed = int(shell_port <= 0) + int(iopub_port <= 0) + \
59 60 int(stdin_port <= 0) + int(hb_port <= 0)
60 for i in xrange(ports_needed):
61 if transport == 'tcp':
62 for i in range(ports_needed):
61 63 sock = socket.socket()
62 64 sock.bind(('', 0))
63 65 ports.append(sock)
@@ -65,6 +67,13 b' def write_connection_file(fname=None, shell_port=0, iopub_port=0, stdin_port=0, '
65 67 port = sock.getsockname()[1]
66 68 sock.close()
67 69 ports[i] = port
70 else:
71 N = 1
72 for i in range(ports_needed):
73 while os.path.exists("%s-%s" % (ip, str(N))):
74 N += 1
75 ports.append(N)
76 N += 1
68 77 if shell_port <= 0:
69 78 shell_port = ports.pop(0)
70 79 if iopub_port <= 0:
@@ -81,6 +90,7 b' def write_connection_file(fname=None, shell_port=0, iopub_port=0, stdin_port=0, '
81 90 )
82 91 cfg['ip'] = ip
83 92 cfg['key'] = bytes_to_str(key)
93 cfg['transport'] = transport
84 94
85 95 with open(fname, 'w') as f:
86 96 f.write(json.dumps(cfg, indent=2))
@@ -12,6 +12,7 b''
12 12 # Imports
13 13 #-----------------------------------------------------------------------------
14 14
15 import os
15 16 import socket
16 17 import sys
17 18 from threading import Thread
@@ -28,21 +29,28 b' from IPython.utils.localinterfaces import LOCALHOST'
28 29 class Heartbeat(Thread):
29 30 "A simple ping-pong style heartbeat that runs in a thread."
30 31
31 def __init__(self, context, addr=(LOCALHOST, 0)):
32 def __init__(self, context, addr=('tcp', LOCALHOST, 0)):
32 33 Thread.__init__(self)
33 34 self.context = context
34 self.ip, self.port = addr
35 self.transport, self.ip, self.port = addr
35 36 if self.port == 0:
37 if addr[0] == 'tcp':
36 38 s = socket.socket()
37 39 # '*' means all interfaces to 0MQ, which is '' to socket.socket
38 40 s.bind(('' if self.ip == '*' else self.ip, 0))
39 41 self.port = s.getsockname()[1]
40 42 s.close()
43 elif addr[0] == 'ipc':
44 while os.path.exists(self.ip + '-' + self.port):
45 self.port = self.port + 1
46 else:
47 raise ValueError("Unrecognized zmq transport: %s" % addr[0])
41 48 self.addr = (self.ip, self.port)
42 49 self.daemon = True
43 50
44 51 def run(self):
45 52 self.socket = self.context.socket(zmq.REP)
46 self.socket.bind('tcp://%s:%i' % self.addr)
53 c = ':' if self.transport == 'tcp' else '-'
54 self.socket.bind('%s://%s' % (self.transport, self.ip) + c + str(self.port))
47 55 zmq.device(zmq.FORWARDER, self.socket, self.socket)
48 56
@@ -35,8 +35,10 b' from IPython.utils import io'
35 35 from IPython.utils.localinterfaces import LOCALHOST
36 36 from IPython.utils.path import filefind
37 37 from IPython.utils.py3compat import str_to_bytes
38 from IPython.utils.traitlets import (Any, Instance, Dict, Unicode, Integer, Bool,
39 DottedObjectName)
38 from IPython.utils.traitlets import (
39 Any, Instance, Dict, Unicode, Integer, Bool, CaselessStrEnum,
40 DottedObjectName,
41 )
40 42 from IPython.utils.importstring import import_item
41 43 # local imports
42 44 from IPython.zmq.entry_point import write_connection_file
@@ -109,6 +111,7 b' class KernelApp(BaseIPythonApplication):'
109 111 self.config_file_specified = False
110 112
111 113 # connection info:
114 transport = CaselessStrEnum(['tcp', 'ipc'], default_value='tcp', config=True)
112 115 ip = Unicode(LOCALHOST, config=True,
113 116 help="Set the IP or interface on which the kernel will listen.")
114 117 hb_port = Integer(0, config=True, help="set the heartbeat port [default: random]")
@@ -154,11 +157,12 b' class KernelApp(BaseIPythonApplication):'
154 157 self.poller = ParentPollerUnix()
155 158
156 159 def _bind_socket(self, s, port):
157 iface = 'tcp://%s' % self.ip
158 if port <= 0:
160 iface = '%s://%s' % (self.transport, self.ip)
161 if port <= 0 and self.transport == 'tcp':
159 162 port = s.bind_to_random_port(iface)
160 163 else:
161 s.bind(iface + ':%i'%port)
164 c = ':' if self.transport == 'tcp' else '-'
165 s.bind(iface + c + str(port))
162 166 return port
163 167
164 168 def load_connection_file(self):
@@ -174,6 +178,7 b' class KernelApp(BaseIPythonApplication):'
174 178 with open(fname) as f:
175 179 s = f.read()
176 180 cfg = json.loads(s)
181 self.transport = cfg.get('transport', self.transport)
177 182 if self.ip == LOCALHOST and 'ip' in cfg:
178 183 # not overridden by config or cl_args
179 184 self.ip = cfg['ip']
@@ -191,7 +196,7 b' class KernelApp(BaseIPythonApplication):'
191 196 cf = os.path.join(self.profile_dir.security_dir, self.connection_file)
192 197 else:
193 198 cf = self.connection_file
194 write_connection_file(cf, ip=self.ip, key=self.session.key,
199 write_connection_file(cf, ip=self.ip, key=self.session.key, transport=self.transport,
195 200 shell_port=self.shell_port, stdin_port=self.stdin_port, hb_port=self.hb_port,
196 201 iopub_port=self.iopub_port)
197 202
@@ -205,6 +210,19 b' class KernelApp(BaseIPythonApplication):'
205 210 except (IOError, OSError):
206 211 pass
207 212
213 self._cleanup_ipc_files()
214
215 def _cleanup_ipc_files(self):
216 """cleanup ipc files if we wrote them"""
217 if self.transport != 'ipc':
218 return
219 for port in (self.shell_port, self.iopub_port, self.stdin_port, self.hb_port):
220 ipcfile = "%s-%i" % (self.ip, port)
221 try:
222 os.remove(ipcfile)
223 except (IOError, OSError):
224 pass
225
208 226 def init_connection_file(self):
209 227 if not self.connection_file:
210 228 self.connection_file = "kernel-%s.json"%os.getpid()
@@ -238,7 +256,7 b' class KernelApp(BaseIPythonApplication):'
238 256 # heartbeat doesn't share context, because it mustn't be blocked
239 257 # by the GIL, which is accessed by libzmq when freeing zero-copy messages
240 258 hb_ctx = zmq.Context()
241 self.heartbeat = Heartbeat(hb_ctx, (self.ip, self.hb_port))
259 self.heartbeat = Heartbeat(hb_ctx, (self.transport, self.ip, self.hb_port))
242 260 self.hb_port = self.heartbeat.port
243 261 self.log.debug("Heartbeat REP Channel on port: %i"%self.hb_port)
244 262 self.heartbeat.start()
@@ -37,7 +37,7 b' from zmq.eventloop import ioloop, zmqstream'
37 37 from IPython.config.loader import Config
38 38 from IPython.utils.localinterfaces import LOCALHOST, LOCAL_IPS
39 39 from IPython.utils.traitlets import (
40 HasTraits, Any, Instance, Type, Unicode, Integer, Bool
40 HasTraits, Any, Instance, Type, Unicode, Integer, Bool, CaselessStrEnum
41 41 )
42 42 from IPython.utils.py3compat import str_to_bytes
43 43 from IPython.zmq.entry_point import write_connection_file
@@ -103,7 +103,7 b' class ZMQSocketChannel(Thread):'
103 103 The ZMQ context to use.
104 104 session : :class:`session.Session`
105 105 The session to use.
106 address : tuple
106 address : zmq url
107 107 Standard (ip, port) tuple that the kernel is listening on.
108 108 """
109 109 super(ZMQSocketChannel, self).__init__()
@@ -111,9 +111,11 b' class ZMQSocketChannel(Thread):'
111 111
112 112 self.context = context
113 113 self.session = session
114 if isinstance(address, tuple):
114 115 if address[1] == 0:
115 116 message = 'The port number for a channel cannot be 0.'
116 117 raise InvalidPortNumber(message)
118 address = "tcp://%s:%i" % address
117 119 self._address = address
118 120 atexit.register(self._notice_exit)
119 121
@@ -149,10 +151,7 b' class ZMQSocketChannel(Thread):'
149 151
150 152 @property
151 153 def address(self):
152 """Get the channel's address as an (ip, port) tuple.
153
154 By the default, the address is (localhost, 0), where 0 means a random
155 port.
154 """Get the channel's address as a zmq url string ('tcp://127.0.0.1:5555').
156 155 """
157 156 return self._address
158 157
@@ -196,7 +195,7 b' class ShellSocketChannel(ZMQSocketChannel):'
196 195 """The thread's main activity. Call start() instead."""
197 196 self.socket = self.context.socket(zmq.DEALER)
198 197 self.socket.setsockopt(zmq.IDENTITY, self.session.bsession)
199 self.socket.connect('tcp://%s:%i' % self.address)
198 self.socket.connect(self.address)
200 199 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
201 200 self.stream.on_recv(self._handle_recv)
202 201 self._run_loop()
@@ -396,7 +395,7 b' class SubSocketChannel(ZMQSocketChannel):'
396 395 self.socket = self.context.socket(zmq.SUB)
397 396 self.socket.setsockopt(zmq.SUBSCRIBE,b'')
398 397 self.socket.setsockopt(zmq.IDENTITY, self.session.bsession)
399 self.socket.connect('tcp://%s:%i' % self.address)
398 self.socket.connect(self.address)
400 399 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
401 400 self.stream.on_recv(self._handle_recv)
402 401 self._run_loop()
@@ -462,7 +461,7 b' class StdInSocketChannel(ZMQSocketChannel):'
462 461 """The thread's main activity. Call start() instead."""
463 462 self.socket = self.context.socket(zmq.DEALER)
464 463 self.socket.setsockopt(zmq.IDENTITY, self.session.bsession)
465 self.socket.connect('tcp://%s:%i' % self.address)
464 self.socket.connect(self.address)
466 465 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
467 466 self.stream.on_recv(self._handle_recv)
468 467 self._run_loop()
@@ -521,7 +520,7 b' class HBSocketChannel(ZMQSocketChannel):'
521 520 self.socket.close()
522 521 self.socket = self.context.socket(zmq.REQ)
523 522 self.socket.setsockopt(zmq.LINGER, 0)
524 self.socket.connect('tcp://%s:%i' % self.address)
523 self.socket.connect(self.address)
525 524
526 525 self.poller.register(self.socket, zmq.POLLIN)
527 526
@@ -660,6 +659,10 b' class KernelManager(HasTraits):'
660 659
661 660 # The addresses for the communication channels.
662 661 connection_file = Unicode('')
662
663 transport = CaselessStrEnum(['tcp', 'ipc'], default_value='tcp')
664
665
663 666 ip = Unicode(LOCALHOST)
664 667 def _ip_changed(self, name, old, new):
665 668 if new == '*':
@@ -748,7 +751,20 b' class KernelManager(HasTraits):'
748 751 self._connection_file_written = False
749 752 try:
750 753 os.remove(self.connection_file)
751 except OSError:
754 except (IOError, OSError):
755 pass
756
757 self._cleanup_ipc_files()
758
759 def _cleanup_ipc_files(self):
760 """cleanup ipc files if we wrote them"""
761 if self.transport != 'ipc':
762 return
763 for port in (self.shell_port, self.iopub_port, self.stdin_port, self.hb_port):
764 ipcfile = "%s-%i" % (self.ip, port)
765 try:
766 os.remove(ipcfile)
767 except (IOError, OSError):
752 768 pass
753 769
754 770 def load_connection_file(self):
@@ -756,6 +772,9 b' class KernelManager(HasTraits):'
756 772 with open(self.connection_file) as f:
757 773 cfg = json.loads(f.read())
758 774
775 from pprint import pprint
776 pprint(cfg)
777 self.transport = cfg.get('transport', 'tcp')
759 778 self.ip = cfg['ip']
760 779 self.shell_port = cfg['shell_port']
761 780 self.stdin_port = cfg['stdin_port']
@@ -768,7 +787,7 b' class KernelManager(HasTraits):'
768 787 if self._connection_file_written:
769 788 return
770 789 self.connection_file,cfg = write_connection_file(self.connection_file,
771 ip=self.ip, key=self.session.key,
790 transport=self.transport, ip=self.ip, key=self.session.key,
772 791 stdin_port=self.stdin_port, iopub_port=self.iopub_port,
773 792 shell_port=self.shell_port, hb_port=self.hb_port)
774 793 # write_connection_file also sets default ports:
@@ -795,7 +814,7 b' class KernelManager(HasTraits):'
795 814 **kw : optional
796 815 See respective options for IPython and Python kernels.
797 816 """
798 if self.ip not in LOCAL_IPS:
817 if self.transport == 'tcp' and self.ip not in LOCAL_IPS:
799 818 raise RuntimeError("Can only launch a kernel on a local interface. "
800 819 "Make sure that the '*_address' attributes are "
801 820 "configured properly. "
@@ -974,13 +993,21 b' class KernelManager(HasTraits):'
974 993 # Channels used for communication with the kernel:
975 994 #--------------------------------------------------------------------------
976 995
996 def _make_url(self, port):
997 """make a zmq url with a port"""
998 if self.transport == 'tcp':
999 return "tcp://%s:%i" % (self.ip, port)
1000 else:
1001 return "%s://%s-%s" % (self.transport, self.ip, port)
1002
977 1003 @property
978 1004 def shell_channel(self):
979 1005 """Get the REQ socket channel object to make requests of the kernel."""
980 1006 if self._shell_channel is None:
981 1007 self._shell_channel = self.shell_channel_class(self.context,
982 1008 self.session,
983 (self.ip, self.shell_port))
1009 self._make_url(self.shell_port),
1010 )
984 1011 return self._shell_channel
985 1012
986 1013 @property
@@ -989,7 +1016,8 b' class KernelManager(HasTraits):'
989 1016 if self._sub_channel is None:
990 1017 self._sub_channel = self.sub_channel_class(self.context,
991 1018 self.session,
992 (self.ip, self.iopub_port))
1019 self._make_url(self.iopub_port),
1020 )
993 1021 return self._sub_channel
994 1022
995 1023 @property
@@ -998,7 +1026,8 b' class KernelManager(HasTraits):'
998 1026 if self._stdin_channel is None:
999 1027 self._stdin_channel = self.stdin_channel_class(self.context,
1000 1028 self.session,
1001 (self.ip, self.stdin_port))
1029 self._make_url(self.stdin_port),
1030 )
1002 1031 return self._stdin_channel
1003 1032
1004 1033 @property
@@ -1008,5 +1037,6 b' class KernelManager(HasTraits):'
1008 1037 if self._hb_channel is None:
1009 1038 self._hb_channel = self.hb_channel_class(self.context,
1010 1039 self.session,
1011 (self.ip, self.hb_port))
1040 self._make_url(self.hb_port),
1041 )
1012 1042 return self._hb_channel
General Comments 0
You need to be logged in to leave comments. Login now