##// END OF EJS Templates
Merge pull request #685 from minrk/enginessh...
Fernando Perez -
r4590:52dffc06 merge
parent child Browse files
Show More
@@ -110,6 +110,22 b' def tunnel_connection(socket, addr, server, keyfile=None, password=None, paramik'
110 selected local port of the tunnel.
110 selected local port of the tunnel.
111
111
112 """
112 """
113 new_url, tunnel = open_tunnel(addr, server, keyfile=keyfile, password=password, paramiko=paramiko)
114 socket.connect(new_url)
115 return tunnel
116
117
118 def open_tunnel(addr, server, keyfile=None, password=None, paramiko=None):
119 """Open a tunneled connection from a 0MQ url.
120
121 For use inside tunnel_connection.
122
123 Returns
124 -------
125
126 (url, tunnel): The 0MQ url that has been forwarded, and the tunnel object
127 """
128
113 lport = select_random_ports(1)[0]
129 lport = select_random_ports(1)[0]
114 transport, addr = addr.split('://')
130 transport, addr = addr.split('://')
115 ip,rport = addr.split(':')
131 ip,rport = addr.split(':')
@@ -121,8 +137,7 b' def tunnel_connection(socket, addr, server, keyfile=None, password=None, paramik'
121 else:
137 else:
122 tunnelf = openssh_tunnel
138 tunnelf = openssh_tunnel
123 tunnel = tunnelf(lport, rport, server, remoteip=ip, keyfile=keyfile, password=password)
139 tunnel = tunnelf(lport, rport, server, remoteip=ip, keyfile=keyfile, password=password)
124 socket.connect('tcp://127.0.0.1:%i'%lport)
140 return 'tcp://127.0.0.1:%i'%lport, tunnel
125 return tunnel
126
141
127 def openssh_tunnel(lport, rport, server, remoteip='127.0.0.1', keyfile=None, password=None, timeout=15):
142 def openssh_tunnel(lport, rport, server, remoteip='127.0.0.1', keyfile=None, password=None, timeout=15):
128 """Create an ssh tunnel using command-line ssh that connects port lport
143 """Create an ssh tunnel using command-line ssh that connects port lport
@@ -116,6 +116,7 b" flags.update(boolean_flag('secure', 'IPControllerApp.secure',"
116 aliases = dict(
116 aliases = dict(
117 secure = 'IPControllerApp.secure',
117 secure = 'IPControllerApp.secure',
118 ssh = 'IPControllerApp.ssh_server',
118 ssh = 'IPControllerApp.ssh_server',
119 enginessh = 'IPControllerApp.engine_ssh_server',
119 location = 'IPControllerApp.location',
120 location = 'IPControllerApp.location',
120
121
121 ident = 'Session.session',
122 ident = 'Session.session',
@@ -158,6 +159,11 b' class IPControllerApp(BaseParallelApplication):'
158 processes. It should be of the form: [user@]server[:port]. The
159 processes. It should be of the form: [user@]server[:port]. The
159 Controller's listening addresses must be accessible from the ssh server""",
160 Controller's listening addresses must be accessible from the ssh server""",
160 )
161 )
162 engine_ssh_server = Unicode(u'', config=True,
163 help="""ssh url for engines to use when connecting to the Controller
164 processes. It should be of the form: [user@]server[:port]. The
165 Controller's listening addresses must be accessible from the ssh server""",
166 )
161 location = Unicode(u'', config=True,
167 location = Unicode(u'', config=True,
162 help="""The external IP or domain name of the Controller, used for disambiguating
168 help="""The external IP or domain name of the Controller, used for disambiguating
163 engine and client connections.""",
169 engine and client connections.""",
@@ -218,6 +224,8 b' class IPControllerApp(BaseParallelApplication):'
218 c.HubFactory.engine_ip = ip
224 c.HubFactory.engine_ip = ip
219 c.HubFactory.regport = int(ports)
225 c.HubFactory.regport = int(ports)
220 self.location = cfg['location']
226 self.location = cfg['location']
227 if not self.engine_ssh_server:
228 self.engine_ssh_server = cfg['ssh']
221 # load client config
229 # load client config
222 with open(os.path.join(self.profile_dir.security_dir, 'ipcontroller-client.json')) as f:
230 with open(os.path.join(self.profile_dir.security_dir, 'ipcontroller-client.json')) as f:
223 cfg = json.loads(f.read())
231 cfg = json.loads(f.read())
@@ -226,7 +234,8 b' class IPControllerApp(BaseParallelApplication):'
226 c.HubFactory.client_transport = xport
234 c.HubFactory.client_transport = xport
227 ip,ports = addr.split(':')
235 ip,ports = addr.split(':')
228 c.HubFactory.client_ip = ip
236 c.HubFactory.client_ip = ip
229 self.ssh_server = cfg['ssh']
237 if not self.ssh_server:
238 self.ssh_server = cfg['ssh']
230 assert int(ports) == c.HubFactory.regport, "regport mismatch"
239 assert int(ports) == c.HubFactory.regport, "regport mismatch"
231
240
232 def init_hub(self):
241 def init_hub(self):
@@ -271,6 +280,7 b' class IPControllerApp(BaseParallelApplication):'
271 self.save_connection_dict('ipcontroller-client.json', cdict)
280 self.save_connection_dict('ipcontroller-client.json', cdict)
272 edict = cdict
281 edict = cdict
273 edict['url']="%s://%s:%s"%((f.client_transport, f.client_ip, f.regport))
282 edict['url']="%s://%s:%s"%((f.client_transport, f.client_ip, f.regport))
283 edict['ssh'] = self.engine_ssh_server
274 self.save_connection_dict('ipcontroller-engine.json', edict)
284 self.save_connection_dict('ipcontroller-engine.json', edict)
275
285
276 #
286 #
@@ -118,6 +118,8 b' aliases = dict('
118 keyfile = 'Session.keyfile',
118 keyfile = 'Session.keyfile',
119
119
120 url = 'EngineFactory.url',
120 url = 'EngineFactory.url',
121 ssh = 'EngineFactory.sshserver',
122 sshkey = 'EngineFactory.sshkey',
121 ip = 'EngineFactory.ip',
123 ip = 'EngineFactory.ip',
122 transport = 'EngineFactory.transport',
124 transport = 'EngineFactory.transport',
123 port = 'EngineFactory.regport',
125 port = 'EngineFactory.regport',
@@ -192,6 +194,40 b' class IPEngineApp(BaseParallelApplication):'
192 self.profile_dir.security_dir,
194 self.profile_dir.security_dir,
193 self.url_file_name
195 self.url_file_name
194 )
196 )
197
198 def load_connector_file(self):
199 """load config from a JSON connector file,
200 at a *lower* priority than command-line/config files.
201 """
202
203 self.log.info("Loading url_file %r"%self.url_file)
204 config = self.config
205
206 with open(self.url_file) as f:
207 d = json.loads(f.read())
208
209 try:
210 config.Session.key
211 except AttributeError:
212 if d['exec_key']:
213 config.Session.key = asbytes(d['exec_key'])
214
215 try:
216 config.EngineFactory.location
217 except AttributeError:
218 config.EngineFactory.location = d['location']
219
220 d['url'] = disambiguate_url(d['url'], config.EngineFactory.location)
221 try:
222 config.EngineFactory.url
223 except AttributeError:
224 config.EngineFactory.url = d['url']
225
226 try:
227 config.EngineFactory.sshserver
228 except AttributeError:
229 config.EngineFactory.sshserver = d['ssh']
230
195 def init_engine(self):
231 def init_engine(self):
196 # This is the working dir by now.
232 # This is the working dir by now.
197 sys.path.insert(0, '')
233 sys.path.insert(0, '')
@@ -219,14 +255,7 b' class IPEngineApp(BaseParallelApplication):'
219 time.sleep(0.1)
255 time.sleep(0.1)
220
256
221 if os.path.exists(self.url_file):
257 if os.path.exists(self.url_file):
222 self.log.info("Loading url_file %r"%self.url_file)
258 self.load_connector_file()
223 with open(self.url_file) as f:
224 d = json.loads(f.read())
225 if d['exec_key']:
226 config.Session.key = asbytes(d['exec_key'])
227 d['url'] = disambiguate_url(d['url'], d['location'])
228 config.EngineFactory.url = d['url']
229 config.EngineFactory.location = d['location']
230 elif not url_specified:
259 elif not url_specified:
231 self.log.critical("Fatal: url file never arrived: %s"%self.url_file)
260 self.log.critical("Fatal: url file never arrived: %s"%self.url_file)
232 self.exit(1)
261 self.exit(1)
@@ -253,7 +282,7 b' class IPEngineApp(BaseParallelApplication):'
253 except:
282 except:
254 self.log.error("Couldn't start the Engine", exc_info=True)
283 self.log.error("Couldn't start the Engine", exc_info=True)
255 self.exit(1)
284 self.exit(1)
256
285
257 def forward_logging(self):
286 def forward_logging(self):
258 if self.log_url:
287 if self.log_url:
259 self.log.info("Forwarding logging to %s"%self.log_url)
288 self.log.info("Forwarding logging to %s"%self.log_url)
@@ -265,7 +294,7 b' class IPEngineApp(BaseParallelApplication):'
265 handler.setLevel(self.log_level)
294 handler.setLevel(self.log_level)
266 self.log.addHandler(handler)
295 self.log.addHandler(handler)
267 self._log_handler = handler
296 self._log_handler = handler
268 #
297
269 def init_mpi(self):
298 def init_mpi(self):
270 global mpi
299 global mpi
271 self.mpi = MPI(config=self.config)
300 self.mpi = MPI(config=self.config)
@@ -56,7 +56,7 b' from zmq.eventloop import ioloop'
56 from IPython.config.application import Application
56 from IPython.config.application import Application
57 from IPython.config.configurable import LoggingConfigurable
57 from IPython.config.configurable import LoggingConfigurable
58 from IPython.utils.text import EvalFormatter
58 from IPython.utils.text import EvalFormatter
59 from IPython.utils.traitlets import Any, Int, List, Unicode, Dict, Instance
59 from IPython.utils.traitlets import Any, Int, CFloat, List, Unicode, Dict, Instance
60 from IPython.utils.path import get_ipython_module_path
60 from IPython.utils.path import get_ipython_module_path
61 from IPython.utils.process import find_cmd, pycmd2argv, FindCmdError
61 from IPython.utils.process import find_cmd, pycmd2argv, FindCmdError
62
62
@@ -364,6 +364,12 b' class LocalEngineSetLauncher(BaseLauncher):'
364 ['--log-to-file','--log-level=%i'%logging.INFO], config=True,
364 ['--log-to-file','--log-level=%i'%logging.INFO], config=True,
365 help="command-line arguments to pass to ipengine"
365 help="command-line arguments to pass to ipengine"
366 )
366 )
367 delay = CFloat(0.1, config=True,
368 help="""delay (in seconds) between starting each engine after the first.
369 This can help force the engines to get their ids in order, or limit
370 process flood when starting many engines."""
371 )
372
367 # launcher class
373 # launcher class
368 launcher_class = LocalEngineLauncher
374 launcher_class = LocalEngineLauncher
369
375
@@ -381,6 +387,8 b' class LocalEngineSetLauncher(BaseLauncher):'
381 self.profile_dir = unicode(profile_dir)
387 self.profile_dir = unicode(profile_dir)
382 dlist = []
388 dlist = []
383 for i in range(n):
389 for i in range(n):
390 if i > 0:
391 time.sleep(self.delay)
384 el = self.launcher_class(work_dir=self.work_dir, config=self.config, log=self.log)
392 el = self.launcher_class(work_dir=self.work_dir, config=self.config, log=self.log)
385 # Copy the engine args over to each engine launcher.
393 # Copy the engine args over to each engine launcher.
386 el.engine_args = copy.deepcopy(self.engine_args)
394 el.engine_args = copy.deepcopy(self.engine_args)
@@ -603,6 +611,8 b' class SSHEngineSetLauncher(LocalEngineSetLauncher):'
603 else:
611 else:
604 user=None
612 user=None
605 for i in range(n):
613 for i in range(n):
614 if i > 0:
615 time.sleep(self.delay)
606 el = self.launcher_class(work_dir=self.work_dir, config=self.config, log=self.log)
616 el = self.launcher_class(work_dir=self.work_dir, config=self.config, log=self.log)
607
617
608 # Copy the engine args over to each engine launcher.
618 # Copy the engine args over to each engine launcher.
@@ -171,7 +171,7 b' class Client(HasTraits):'
171 A string of the form passed to ssh, i.e. 'server.tld' or 'user@server.tld:port'
171 A string of the form passed to ssh, i.e. 'server.tld' or 'user@server.tld:port'
172 If keyfile or password is specified, and this is not, it will default to
172 If keyfile or password is specified, and this is not, it will default to
173 the ip given in addr.
173 the ip given in addr.
174 sshkey : str; path to public ssh key file
174 sshkey : str; path to ssh private key file
175 This specifies a key to be used in ssh login, default None.
175 This specifies a key to be used in ssh login, default None.
176 Regular default ssh keys will be used without specifying this argument.
176 Regular default ssh keys will be used without specifying this argument.
177 password : str
177 password : str
@@ -17,12 +17,16 b' from __future__ import print_function'
17
17
18 import sys
18 import sys
19 import time
19 import time
20 from getpass import getpass
20
21
21 import zmq
22 import zmq
22 from zmq.eventloop import ioloop, zmqstream
23 from zmq.eventloop import ioloop, zmqstream
23
24
25 from IPython.external.ssh import tunnel
24 # internal
26 # internal
25 from IPython.utils.traitlets import Instance, Dict, Int, Type, CFloat, Unicode, CBytes
27 from IPython.utils.traitlets import (
28 Instance, Dict, Int, Type, CFloat, Unicode, CBytes, Bool
29 )
26 # from IPython.utils.localinterfaces import LOCALHOST
30 # from IPython.utils.localinterfaces import LOCALHOST
27
31
28 from IPython.parallel.controller.heartmonitor import Heart
32 from IPython.parallel.controller.heartmonitor import Heart
@@ -50,6 +54,12 b' class EngineFactory(RegistrationFactory):'
50 timeout=CFloat(2,config=True,
54 timeout=CFloat(2,config=True,
51 help="""The time (in seconds) to wait for the Controller to respond
55 help="""The time (in seconds) to wait for the Controller to respond
52 to registration requests before giving up.""")
56 to registration requests before giving up.""")
57 sshserver=Unicode(config=True,
58 help="""The SSH server to use for tunneling connections to the Controller.""")
59 sshkey=Unicode(config=True,
60 help="""The SSH private key file to use when tunneling connections to the Controller.""")
61 paramiko=Bool(sys.platform == 'win32', config=True,
62 help="""Whether to use paramiko instead of openssh for tunnels.""")
53
63
54 # not configurable:
64 # not configurable:
55 user_ns=Dict()
65 user_ns=Dict()
@@ -61,28 +71,70 b' class EngineFactory(RegistrationFactory):'
61 ident = Unicode()
71 ident = Unicode()
62 def _ident_changed(self, name, old, new):
72 def _ident_changed(self, name, old, new):
63 self.bident = asbytes(new)
73 self.bident = asbytes(new)
74 using_ssh=Bool(False)
64
75
65
76
66 def __init__(self, **kwargs):
77 def __init__(self, **kwargs):
67 super(EngineFactory, self).__init__(**kwargs)
78 super(EngineFactory, self).__init__(**kwargs)
68 self.ident = self.session.session
79 self.ident = self.session.session
69 ctx = self.context
80
81 def init_connector(self):
82 """construct connection function, which handles tunnels."""
83 self.using_ssh = bool(self.sshkey or self.sshserver)
70
84
71 reg = ctx.socket(zmq.XREQ)
85 if self.sshkey and not self.sshserver:
72 reg.setsockopt(zmq.IDENTITY, self.bident)
86 # We are using ssh directly to the controller, tunneling localhost to localhost
73 reg.connect(self.url)
87 self.sshserver = self.url.split('://')[1].split(':')[0]
74 self.registrar = zmqstream.ZMQStream(reg, self.loop)
88
89 if self.using_ssh:
90 if tunnel.try_passwordless_ssh(self.sshserver, self.sshkey, self.paramiko):
91 password=False
92 else:
93 password = getpass("SSH Password for %s: "%self.sshserver)
94 else:
95 password = False
96
97 def connect(s, url):
98 url = disambiguate_url(url, self.location)
99 if self.using_ssh:
100 self.log.debug("Tunneling connection to %s via %s"%(url, self.sshserver))
101 return tunnel.tunnel_connection(s, url, self.sshserver,
102 keyfile=self.sshkey, paramiko=self.paramiko,
103 password=password,
104 )
105 else:
106 return s.connect(url)
107
108 def maybe_tunnel(url):
109 """like connect, but don't complete the connection (for use by heartbeat)"""
110 url = disambiguate_url(url, self.location)
111 if self.using_ssh:
112 self.log.debug("Tunneling connection to %s via %s"%(url, self.sshserver))
113 url,tunnelobj = tunnel.open_tunnel(url, self.sshserver,
114 keyfile=self.sshkey, paramiko=self.paramiko,
115 password=password,
116 )
117 return url
118 return connect, maybe_tunnel
75
119
76 def register(self):
120 def register(self):
77 """send the registration_request"""
121 """send the registration_request"""
78
122
79 self.log.info("Registering with controller at %s"%self.url)
123 self.log.info("Registering with controller at %s"%self.url)
124 ctx = self.context
125 connect,maybe_tunnel = self.init_connector()
126 reg = ctx.socket(zmq.XREQ)
127 reg.setsockopt(zmq.IDENTITY, self.bident)
128 connect(reg, self.url)
129 self.registrar = zmqstream.ZMQStream(reg, self.loop)
130
131
80 content = dict(queue=self.ident, heartbeat=self.ident, control=self.ident)
132 content = dict(queue=self.ident, heartbeat=self.ident, control=self.ident)
81 self.registrar.on_recv(self.complete_registration)
133 self.registrar.on_recv(lambda msg: self.complete_registration(msg, connect, maybe_tunnel))
82 # print (self.session.key)
134 # print (self.session.key)
83 self.session.send(self.registrar, "registration_request",content=content)
135 self.session.send(self.registrar, "registration_request",content=content)
84
136
85 def complete_registration(self, msg):
137 def complete_registration(self, msg, connect, maybe_tunnel):
86 # print msg
138 # print msg
87 self._abort_dc.stop()
139 self._abort_dc.stop()
88 ctx = self.context
140 ctx = self.context
@@ -94,6 +146,14 b' class EngineFactory(RegistrationFactory):'
94 if msg.content.status == 'ok':
146 if msg.content.status == 'ok':
95 self.id = int(msg.content.id)
147 self.id = int(msg.content.id)
96
148
149 # launch heartbeat
150 hb_addrs = msg.content.heartbeat
151
152 # possibly forward hb ports with tunnels
153 hb_addrs = [ maybe_tunnel(addr) for addr in hb_addrs ]
154 heart = Heart(*map(str, hb_addrs), heart_id=identity)
155 heart.start()
156
97 # create Shell Streams (MUX, Task, etc.):
157 # create Shell Streams (MUX, Task, etc.):
98 queue_addr = msg.content.mux
158 queue_addr = msg.content.mux
99 shell_addrs = [ str(queue_addr) ]
159 shell_addrs = [ str(queue_addr) ]
@@ -114,24 +174,20 b' class EngineFactory(RegistrationFactory):'
114 stream.setsockopt(zmq.IDENTITY, identity)
174 stream.setsockopt(zmq.IDENTITY, identity)
115 shell_streams = [stream]
175 shell_streams = [stream]
116 for addr in shell_addrs:
176 for addr in shell_addrs:
117 stream.connect(disambiguate_url(addr, self.location))
177 connect(stream, addr)
118 # end single stream-socket
178 # end single stream-socket
119
179
120 # control stream:
180 # control stream:
121 control_addr = str(msg.content.control)
181 control_addr = str(msg.content.control)
122 control_stream = zmqstream.ZMQStream(ctx.socket(zmq.XREP), loop)
182 control_stream = zmqstream.ZMQStream(ctx.socket(zmq.XREP), loop)
123 control_stream.setsockopt(zmq.IDENTITY, identity)
183 control_stream.setsockopt(zmq.IDENTITY, identity)
124 control_stream.connect(disambiguate_url(control_addr, self.location))
184 connect(control_stream, control_addr)
125
185
126 # create iopub stream:
186 # create iopub stream:
127 iopub_addr = msg.content.iopub
187 iopub_addr = msg.content.iopub
128 iopub_stream = zmqstream.ZMQStream(ctx.socket(zmq.PUB), loop)
188 iopub_stream = zmqstream.ZMQStream(ctx.socket(zmq.PUB), loop)
129 iopub_stream.setsockopt(zmq.IDENTITY, identity)
189 iopub_stream.setsockopt(zmq.IDENTITY, identity)
130 iopub_stream.connect(disambiguate_url(iopub_addr, self.location))
190 connect(iopub_stream, iopub_addr)
131
132 # launch heartbeat
133 hb_addrs = msg.content.heartbeat
134 # print (hb_addrs)
135
191
136 # # Redirect input streams and set a display hook.
192 # # Redirect input streams and set a display hook.
137 if self.out_stream_factory:
193 if self.out_stream_factory:
@@ -147,9 +203,6 b' class EngineFactory(RegistrationFactory):'
147 control_stream=control_stream, shell_streams=shell_streams, iopub_stream=iopub_stream,
203 control_stream=control_stream, shell_streams=shell_streams, iopub_stream=iopub_stream,
148 loop=loop, user_ns = self.user_ns, log=self.log)
204 loop=loop, user_ns = self.user_ns, log=self.log)
149 self.kernel.start()
205 self.kernel.start()
150 hb_addrs = [ disambiguate_url(addr, self.location) for addr in hb_addrs ]
151 heart = Heart(*map(str, hb_addrs), heart_id=identity)
152 heart.start()
153
206
154
207
155 else:
208 else:
@@ -484,6 +484,49 b' The ``file`` flag works like this::'
484 (:file:`~/.ipython/profile_<name>/security` is the same on all of them), then things
484 (:file:`~/.ipython/profile_<name>/security` is the same on all of them), then things
485 will just work!
485 will just work!
486
486
487 SSH Tunnels
488 ***********
489
490 If your engines are not on the same LAN as the controller, or you are on a highly
491 restricted network where your nodes cannot see each others ports, then you can
492 use SSH tunnels to connect engines to the controller.
493
494 .. note::
495
496 This does not work in all cases. Manual tunnels may be an option, but are
497 highly inconvenient. Support for manual tunnels will be improved.
498
499 You can instruct all engines to use ssh, by specifying the ssh server in
500 :file:`ipcontroller-engine.json`:
501
502 .. I know this is really JSON, but the example is a subset of Python:
503 .. sourcecode:: python
504
505 {
506 "url":"tcp://192.168.1.123:56951",
507 "exec_key":"26f4c040-587d-4a4e-b58b-030b96399584",
508 "ssh":"user@example.com",
509 "location":"192.168.1.123"
510 }
511
512 This will be specified if you give the ``--enginessh=use@example.com`` argument when
513 starting :command:`ipcontroller`.
514
515 Or you can specify an ssh server on the command-line when starting an engine::
516
517 $> ipengine --profile=foo --ssh=my.login.node
518
519 For example, if your system is totally restricted, then all connections will actually be
520 loopback, and ssh tunnels will be used to connect engines to the controller::
521
522 [node1] $> ipcontroller --enginessh=node1
523 [node2] $> ipengine
524 [node3] $> ipcluster engines --n=4
525
526 Or if you want to start many engines on each node, the command `ipcluster engines --n=4`
527 without any configuration is equivalent to running ipengine 4 times.
528
529
487 Make JSON files persistent
530 Make JSON files persistent
488 --------------------------
531 --------------------------
489
532
@@ -105,9 +105,6 b' use OpenSSH or Paramiko, or the tunneling utilities are insufficient, then they '
105 construct the tunnels themselves, and simply connect clients and engines as if the
105 construct the tunnels themselves, and simply connect clients and engines as if the
106 controller were on loopback on the connecting machine.
106 controller were on loopback on the connecting machine.
107
107
108 .. note::
109
110 There is not currently tunneling available for engines.
111
108
112 Authentication
109 Authentication
113 --------------
110 --------------
General Comments 0
You need to be logged in to leave comments. Login now