##// END OF EJS Templates
use individual ports, rather than full urls in connection files
MinRK -
Show More
@@ -211,19 +211,15 b' class IPControllerApp(BaseParallelApplication):'
211 211 c = self.config
212 212 url = cdict['registration']
213 213 location = cdict['location']
214
214 215 if not location:
215 216 try:
216 proto,ip,port = split_url(url)
217 except AssertionError:
218 pass
219 else:
220 try:
221 location = socket.gethostbyname_ex(socket.gethostname())[2][-1]
222 except (socket.gaierror, IndexError):
223 self.log.warn("Could not identify this machine's IP, assuming 127.0.0.1."
224 " You may need to specify '--location=<external_ip_address>' to help"
225 " IPython decide when to connect via loopback.")
226 location = '127.0.0.1'
217 location = socket.gethostbyname_ex(socket.gethostname())[2][-1]
218 except (socket.gaierror, IndexError):
219 self.log.warn("Could not identify this machine's IP, assuming 127.0.0.1."
220 " You may need to specify '--location=<external_ip_address>' to help"
221 " IPython decide when to connect via loopback.")
222 location = '127.0.0.1'
227 223 cdict['location'] = location
228 224 fname = os.path.join(self.profile_dir.security_dir, fname)
229 225 self.log.info("writing connection info to %s", fname)
@@ -235,35 +231,51 b' class IPControllerApp(BaseParallelApplication):'
235 231 """load config from existing json connector files."""
236 232 c = self.config
237 233 self.log.debug("loading config from JSON")
238 # load from engine config
234
235 # load engine config
236
239 237 fname = os.path.join(self.profile_dir.security_dir, self.engine_json_file)
240 238 self.log.info("loading connection info from %s", fname)
241 239 with open(fname) as f:
242 cfg = json.loads(f.read())
243 key = cfg['exec_key']
240 ecfg = json.loads(f.read())
241
244 242 # json gives unicode, Session.key wants bytes
245 c.Session.key = key.encode('ascii')
246 xport,addr = cfg['url'].split('://')
247 c.HubFactory.engine_transport = xport
248 ip,ports = addr.split(':')
243 c.Session.key = ecfg['exec_key'].encode('ascii')
244
245 xport,ip = ecfg['interface'].split('://')
246
249 247 c.HubFactory.engine_ip = ip
250 c.HubFactory.regport = int(ports)
251 self.location = cfg['location']
248 c.HubFactory.engine_transport = xport
249
250 self.location = ecfg['location']
252 251 if not self.engine_ssh_server:
253 self.engine_ssh_server = cfg['ssh']
252 self.engine_ssh_server = ecfg['ssh']
253
254 254 # load client config
255
255 256 fname = os.path.join(self.profile_dir.security_dir, self.client_json_file)
256 257 self.log.info("loading connection info from %s", fname)
257 258 with open(fname) as f:
258 cfg = json.loads(f.read())
259 assert key == cfg['exec_key'], "exec_key mismatch between engine and client keys"
260 xport,addr = cfg['url'].split('://')
259 ccfg = json.loads(f.read())
260
261 for key in ('exec_key', 'registration', 'pack', 'unpack'):
262 assert ccfg[key] == ecfg[key], "mismatch between engine and client info: %r" % key
263
264 xport,addr = ccfg['interface'].split('://')
265
261 266 c.HubFactory.client_transport = xport
262 ip,ports = addr.split(':')
263 267 c.HubFactory.client_ip = ip
264 268 if not self.ssh_server:
265 self.ssh_server = cfg['ssh']
266 assert int(ports) == c.HubFactory.regport, "regport mismatch"
269 self.ssh_server = ccfg['ssh']
270
271 # load port config:
272 c.HubFactory.regport = ecfg['registration']
273 c.HubFactory.hb = (ecfg['hb_ping'], ecfg['hb_pong'])
274 c.HubFactory.control = (ccfg['control'], ecfg['control'])
275 c.HubFactory.mux = (ccfg['mux'], ecfg['mux'])
276 c.HubFactory.task = (ccfg['task'], ecfg['task'])
277 c.HubFactory.iopub = (ccfg['iopub'], ecfg['iopub'])
278 c.HubFactory.notifier_port = ccfg['notification']
267 279
268 280 def cleanup_connection_files(self):
269 281 if self.reuse_files:
@@ -335,14 +347,14 b' class IPControllerApp(BaseParallelApplication):'
335 347 children = self.children
336 348 mq = import_item(str(self.mq_class))
337 349
338 hub = self.factory
350 f = self.factory
339 351 # disambiguate url, in case of *
340 monitor_url = disambiguate_url(hub.monitor_url)
352 monitor_url = disambiguate_url(f.monitor_url)
341 353 # maybe_inproc = 'inproc://monitor' if self.use_threads else monitor_url
342 354 # IOPub relay (in a Process)
343 355 q = mq(zmq.PUB, zmq.SUB, zmq.PUB, b'N/A',b'iopub')
344 q.bind_in(hub.client_info['iopub'])
345 q.bind_out(hub.engine_info['iopub'])
356 q.bind_in(f.client_url('iopub'))
357 q.bind_out(f.engine_url('iopub'))
346 358 q.setsockopt_out(zmq.SUBSCRIBE, b'')
347 359 q.connect_mon(monitor_url)
348 360 q.daemon=True
@@ -350,18 +362,18 b' class IPControllerApp(BaseParallelApplication):'
350 362
351 363 # Multiplexer Queue (in a Process)
352 364 q = mq(zmq.ROUTER, zmq.ROUTER, zmq.PUB, b'in', b'out')
353 q.bind_in(hub.client_info['mux'])
365 q.bind_in(f.client_url('mux'))
354 366 q.setsockopt_in(zmq.IDENTITY, b'mux')
355 q.bind_out(hub.engine_info['mux'])
367 q.bind_out(f.engine_url('mux'))
356 368 q.connect_mon(monitor_url)
357 369 q.daemon=True
358 370 children.append(q)
359 371
360 372 # Control Queue (in a Process)
361 373 q = mq(zmq.ROUTER, zmq.ROUTER, zmq.PUB, b'incontrol', b'outcontrol')
362 q.bind_in(hub.client_info['control'])
374 q.bind_in(f.client_url('control'))
363 375 q.setsockopt_in(zmq.IDENTITY, b'control')
364 q.bind_out(hub.engine_info['control'])
376 q.bind_out(f.engine_url('control'))
365 377 q.connect_mon(monitor_url)
366 378 q.daemon=True
367 379 children.append(q)
@@ -374,9 +386,9 b' class IPControllerApp(BaseParallelApplication):'
374 386 self.log.warn("task::using pure DEALER Task scheduler")
375 387 q = mq(zmq.ROUTER, zmq.DEALER, zmq.PUB, b'intask', b'outtask')
376 388 # q.setsockopt_out(zmq.HWM, hub.hwm)
377 q.bind_in(hub.client_info['task'][1])
389 q.bind_in(f.client_url('task'))
378 390 q.setsockopt_in(zmq.IDENTITY, b'task')
379 q.bind_out(hub.engine_info['task'])
391 q.bind_out(f.engine_url('task'))
380 392 q.connect_mon(monitor_url)
381 393 q.daemon=True
382 394 children.append(q)
@@ -385,8 +397,8 b' class IPControllerApp(BaseParallelApplication):'
385 397
386 398 else:
387 399 self.log.info("task::using Python %s Task scheduler"%scheme)
388 sargs = (hub.client_info['task'], hub.engine_info['task'],
389 monitor_url, disambiguate_url(hub.client_info['notification']))
400 sargs = (f.client_url('task'), f.engine_url('task'),
401 monitor_url, disambiguate_url(f.client_url('notification')))
390 402 kwargs = dict(logname='scheduler', loglevel=self.log_level,
391 403 log_url = self.log_url, config=dict(self.config))
392 404 if 'Process' in self.mq_class:
@@ -45,7 +45,7 b' from IPython.zmq.session import ('
45 45 from IPython.config.configurable import Configurable
46 46
47 47 from IPython.parallel.engine.engine import EngineFactory
48 from IPython.parallel.util import disambiguate_url
48 from IPython.parallel.util import disambiguate_ip_address
49 49
50 50 from IPython.utils.importstring import import_item
51 51 from IPython.utils.py3compat import cast_bytes
@@ -225,14 +225,15 b' class IPEngineApp(BaseParallelApplication):'
225 225
226 226 location = config.EngineFactory.location
227 227
228 for key in ('registration', 'hb_ping', 'hb_pong', 'mux', 'task', 'control'):
229 d[key] = disambiguate_url(d[key], location)
228 proto, ip = d['interface'].split('://')
229 ip = disambiguate_ip_address(ip)
230 d['interface'] = '%s://%s' % (proto, ip)
230 231
231 232 # DO NOT allow override of basic URLs, serialization, or exec_key
232 233 # JSON file takes top priority there
233 config.Session.key = asbytes(d['exec_key'])
234 config.Session.key = cast_bytes(d['exec_key'])
234 235
235 config.EngineFactory.url = d['registration']
236 config.EngineFactory.url = d['interface'] + ':%i' % d['registration']
236 237
237 238 config.Session.packer = d['pack']
238 239 config.Session.unpacker = d['unpack']
@@ -403,10 +403,17 b' class Client(HasTraits):'
403 403 cfg['ssh'] = sshserver
404 404
405 405 location = cfg.setdefault('location', None)
406 for key in ('control', 'task', 'mux', 'notification', 'registration'):
407 cfg[key] = util.disambiguate_url(cfg[key], location)
406
407 proto,addr = cfg['interface'].split('://')
408 addr = util.disambiguate_ip_address(addr)
409 cfg['interface'] = "%s://%s" % (proto, addr)
410
411 # turn interface,port into full urls:
412 for key in ('control', 'task', 'mux', 'iopub', 'notification', 'registration'):
413 cfg[key] = cfg['interface'] + ':%i' % cfg[key]
414
408 415 url = cfg['registration']
409 proto,addr,port = util.split_url(url)
416
410 417 if location is not None and addr == '127.0.0.1':
411 418 # location specified, and connection is expected to be local
412 419 if location not in LOCAL_IPS and not sshserver:
@@ -431,7 +438,7 b' class Client(HasTraits):'
431 438 self._ssh = bool(sshserver or sshkey or password)
432 439 if self._ssh and sshserver is None:
433 440 # default to ssh via localhost
434 sshserver = url.split('://')[1].split(':')[0]
441 sshserver = addr
435 442 if self._ssh and password is None:
436 443 if tunnel.try_passwordless_ssh(sshserver, sshkey, paramiko):
437 444 password=False
@@ -449,9 +456,9 b' class Client(HasTraits):'
449 456 self._query_socket = self._context.socket(zmq.DEALER)
450 457
451 458 if self._ssh:
452 tunnel.tunnel_connection(self._query_socket, url, sshserver, **ssh_kwargs)
459 tunnel.tunnel_connection(self._query_socket, cfg['registration'], sshserver, **ssh_kwargs)
453 460 else:
454 self._query_socket.connect(url)
461 self._query_socket.connect(cfg['registration'])
455 462
456 463 self.session.debug = self.debug
457 464
@@ -131,29 +131,29 b' class HubFactory(RegistrationFactory):'
131 131
132 132 # port-pairs for monitoredqueues:
133 133 hb = Tuple(Integer,Integer,config=True,
134 help="""DEALER/SUB Port pair for Engine heartbeats""")
134 help="""PUB/ROUTER Port pair for Engine heartbeats""")
135 135 def _hb_default(self):
136 136 return tuple(util.select_random_ports(2))
137 137
138 138 mux = Tuple(Integer,Integer,config=True,
139 help="""Engine/Client Port pair for MUX queue""")
139 help="""Client/Engine Port pair for MUX queue""")
140 140
141 141 def _mux_default(self):
142 142 return tuple(util.select_random_ports(2))
143 143
144 144 task = Tuple(Integer,Integer,config=True,
145 help="""Engine/Client Port pair for Task queue""")
145 help="""Client/Engine Port pair for Task queue""")
146 146 def _task_default(self):
147 147 return tuple(util.select_random_ports(2))
148 148
149 149 control = Tuple(Integer,Integer,config=True,
150 help="""Engine/Client Port pair for Control queue""")
150 help="""Client/Engine Port pair for Control queue""")
151 151
152 152 def _control_default(self):
153 153 return tuple(util.select_random_ports(2))
154 154
155 155 iopub = Tuple(Integer,Integer,config=True,
156 help="""Engine/Client Port pair for IOPub relay""")
156 help="""Client/Engine Port pair for IOPub relay""")
157 157
158 158 def _iopub_default(self):
159 159 return tuple(util.select_random_ports(2))
@@ -231,10 +231,16 b' class HubFactory(RegistrationFactory):'
231 231 self.heartmonitor.start()
232 232 self.log.info("Heartmonitor started")
233 233
234 def client_url(self, channel):
235 """return full zmq url for a named client channel"""
236 return "%s://%s:%i" % (self.client_transport, self.client_ip, self.client_info[channel])
237
238 def engine_url(self, channel):
239 """return full zmq url for a named engine channel"""
240 return "%s://%s:%i" % (self.engine_transport, self.engine_ip, self.engine_info[channel])
241
234 242 def init_hub(self):
235 """construct"""
236 client_iface = "%s://%s:" % (self.client_transport, self.client_ip) + "%i"
237 engine_iface = "%s://%s:" % (self.engine_transport, self.engine_ip) + "%i"
243 """construct Hub object"""
238 244
239 245 ctx = self.context
240 246 loop = self.loop
@@ -247,23 +253,25 b' class HubFactory(RegistrationFactory):'
247 253
248 254 # build connection dicts
249 255 engine = self.engine_info = {
250 'registration' : engine_iface % self.regport,
251 'control' : engine_iface % self.control[1],
252 'mux' : engine_iface % self.mux[1],
253 'hb_ping' : engine_iface % self.hb[0],
254 'hb_pong' : engine_iface % self.hb[1],
255 'task' : engine_iface % self.task[1],
256 'iopub' : engine_iface % self.iopub[1],
256 'interface' : "%s://%s" % (self.engine_transport, self.engine_ip),
257 'registration' : self.regport,
258 'control' : self.control[1],
259 'mux' : self.mux[1],
260 'hb_ping' : self.hb[0],
261 'hb_pong' : self.hb[1],
262 'task' : self.task[1],
263 'iopub' : self.iopub[1],
257 264 }
258 265
259 266 client = self.client_info = {
260 'registration' : client_iface % self.regport,
261 'control' : client_iface % self.control[0],
262 'mux' : client_iface % self.mux[0],
263 'task' : client_iface % self.task[0],
267 'interface' : "%s://%s" % (self.client_transport, self.client_ip),
268 'registration' : self.regport,
269 'control' : self.control[0],
270 'mux' : self.mux[0],
271 'task' : self.task[0],
264 272 'task_scheme' : scheme,
265 'iopub' : client_iface % self.iopub[0],
266 'notification' : client_iface % self.notifier_port,
273 'iopub' : self.iopub[0],
274 'notification' : self.notifier_port,
267 275 }
268 276
269 277 self.log.debug("Hub engine addrs: %s", self.engine_info)
@@ -271,19 +279,19 b' class HubFactory(RegistrationFactory):'
271 279
272 280 # Registrar socket
273 281 q = ZMQStream(ctx.socket(zmq.ROUTER), loop)
274 q.bind(client['registration'])
275 self.log.info("Hub listening on %s for registration.", client_iface % self.regport)
282 q.bind(self.client_url('registration'))
283 self.log.info("Hub listening on %s for registration.", self.client_url('registration'))
276 284 if self.client_ip != self.engine_ip:
277 q.bind(engine['registration'])
278 self.log.info("Hub listening on %s for registration.", engine_iface % self.regport)
285 q.bind(self.engine_url('registration'))
286 self.log.info("Hub listening on %s for registration.", self.engine_url('registration'))
279 287
280 288 ### Engine connections ###
281 289
282 290 # heartbeat
283 291 hpub = ctx.socket(zmq.PUB)
284 hpub.bind(engine['hb_ping'])
292 hpub.bind(self.engine_url('hb_ping'))
285 293 hrep = ctx.socket(zmq.ROUTER)
286 hrep.bind(engine['hb_pong'])
294 hrep.bind(self.engine_url('hb_pong'))
287 295 self.heartmonitor = HeartMonitor(loop=loop, config=self.config, log=self.log,
288 296 pingstream=ZMQStream(hpub,loop),
289 297 pongstream=ZMQStream(hrep,loop)
@@ -293,7 +301,7 b' class HubFactory(RegistrationFactory):'
293 301
294 302 # Notifier socket
295 303 n = ZMQStream(ctx.socket(zmq.PUB), loop)
296 n.bind(client['notification'])
304 n.bind(self.client_url('notification'))
297 305
298 306 ### build and launch the queues ###
299 307
@@ -313,7 +321,7 b' class HubFactory(RegistrationFactory):'
313 321
314 322 # resubmit stream
315 323 r = ZMQStream(ctx.socket(zmq.DEALER), loop)
316 url = util.disambiguate_url(self.client_info['task'])
324 url = util.disambiguate_url(self.client_url('task'))
317 325 r.connect(url)
318 326
319 327 self.hub = Hub(loop=loop, session=self.session, monitor=sub, heartmonitor=self.heartmonitor,
@@ -388,15 +396,6 b' class Hub(SessionFactory):'
388 396 super(Hub, self).__init__(**kwargs)
389 397 self.registration_timeout = max(5000, 2*self.heartmonitor.period)
390 398
391 # validate connection dicts:
392 for k,v in self.client_info.iteritems():
393 if k == 'task_scheme':
394 continue
395 else:
396 util.validate_url_container(v)
397 # util.validate_url_container(self.client_info)
398 util.validate_url_container(self.engine_info)
399
400 399 # register our callbacks
401 400 self.query.on_recv(self.dispatch_query)
402 401 self.monitor.on_recv(self.dispatch_monitor_traffic)
@@ -145,19 +145,23 b' class EngineFactory(RegistrationFactory):'
145 145 content = msg['content']
146 146 info = self.connection_info
147 147
148 def url(key):
149 """get zmq url for given channel"""
150 return str(info["interface"] + ":%i" % info[key])
151
148 152 if content['status'] == 'ok':
149 153 self.id = int(content['id'])
150 154
151 155 # launch heartbeat
152 156 # possibly forward hb ports with tunnels
153 hb_ping = maybe_tunnel(info['hb_ping'])
154 hb_pong = maybe_tunnel(info['hb_pong'])
157 hb_ping = maybe_tunnel(url('hb_ping'))
158 hb_pong = maybe_tunnel(url('hb_pong'))
155 159
156 160 heart = Heart(hb_ping, hb_pong, heart_id=identity)
157 161 heart.start()
158 162
159 163 # create Shell Connections (MUX, Task, etc.):
160 shell_addrs = map(str, [info['mux'], info['task']])
164 shell_addrs = url('mux'), url('task')
161 165
162 166 # Use only one shell stream for mux and tasks
163 167 stream = zmqstream.ZMQStream(ctx.socket(zmq.ROUTER), loop)
@@ -167,13 +171,13 b' class EngineFactory(RegistrationFactory):'
167 171 connect(stream, addr)
168 172
169 173 # control stream:
170 control_addr = str(info['control'])
174 control_addr = url('control')
171 175 control_stream = zmqstream.ZMQStream(ctx.socket(zmq.ROUTER), loop)
172 176 control_stream.setsockopt(zmq.IDENTITY, identity)
173 177 connect(control_stream, control_addr)
174 178
175 179 # create iopub stream:
176 iopub_addr = info['iopub']
180 iopub_addr = url('iopub')
177 181 iopub_socket = ctx.socket(zmq.PUB)
178 182 iopub_socket.setsockopt(zmq.IDENTITY, identity)
179 183 connect(iopub_socket, iopub_addr)
General Comments 0
You need to be logged in to leave comments. Login now