Show More
@@ -211,19 +211,15 b' class IPControllerApp(BaseParallelApplication):' | |||
|
211 | 211 | c = self.config |
|
212 | 212 | url = cdict['registration'] |
|
213 | 213 | location = cdict['location'] |
|
214 | ||
|
214 | 215 | if not location: |
|
215 | 216 | try: |
|
216 | proto,ip,port = split_url(url) | |
|
217 |
except |
|
|
218 | pass | |
|
219 | else: | |
|
220 | try: | |
|
221 | location = socket.gethostbyname_ex(socket.gethostname())[2][-1] | |
|
222 | except (socket.gaierror, IndexError): | |
|
223 | self.log.warn("Could not identify this machine's IP, assuming 127.0.0.1." | |
|
224 | " You may need to specify '--location=<external_ip_address>' to help" | |
|
225 | " IPython decide when to connect via loopback.") | |
|
226 | location = '127.0.0.1' | |
|
217 | location = socket.gethostbyname_ex(socket.gethostname())[2][-1] | |
|
218 | except (socket.gaierror, IndexError): | |
|
219 | self.log.warn("Could not identify this machine's IP, assuming 127.0.0.1." | |
|
220 | " You may need to specify '--location=<external_ip_address>' to help" | |
|
221 | " IPython decide when to connect via loopback.") | |
|
222 | location = '127.0.0.1' | |
|
227 | 223 | cdict['location'] = location |
|
228 | 224 | fname = os.path.join(self.profile_dir.security_dir, fname) |
|
229 | 225 | self.log.info("writing connection info to %s", fname) |
@@ -235,35 +231,51 b' class IPControllerApp(BaseParallelApplication):' | |||
|
235 | 231 | """load config from existing json connector files.""" |
|
236 | 232 | c = self.config |
|
237 | 233 | self.log.debug("loading config from JSON") |
|
238 | # load from engine config | |
|
234 | ||
|
235 | # load engine config | |
|
236 | ||
|
239 | 237 | fname = os.path.join(self.profile_dir.security_dir, self.engine_json_file) |
|
240 | 238 | self.log.info("loading connection info from %s", fname) |
|
241 | 239 | with open(fname) as f: |
|
242 | cfg = json.loads(f.read()) | |
|
243 | key = cfg['exec_key'] | |
|
240 | ecfg = json.loads(f.read()) | |
|
241 | ||
|
244 | 242 | # json gives unicode, Session.key wants bytes |
|
245 | c.Session.key = key.encode('ascii') | |
|
246 | xport,addr = cfg['url'].split('://') | |
|
247 | c.HubFactory.engine_transport = xport | |
|
248 | ip,ports = addr.split(':') | |
|
243 | c.Session.key = ecfg['exec_key'].encode('ascii') | |
|
244 | ||
|
245 | xport,ip = ecfg['interface'].split('://') | |
|
246 | ||
|
249 | 247 | c.HubFactory.engine_ip = ip |
|
250 |
c.HubFactory. |
|
|
251 | self.location = cfg['location'] | |
|
248 | c.HubFactory.engine_transport = xport | |
|
249 | ||
|
250 | self.location = ecfg['location'] | |
|
252 | 251 | if not self.engine_ssh_server: |
|
253 | self.engine_ssh_server = cfg['ssh'] | |
|
252 | self.engine_ssh_server = ecfg['ssh'] | |
|
253 | ||
|
254 | 254 | # load client config |
|
255 | ||
|
255 | 256 | fname = os.path.join(self.profile_dir.security_dir, self.client_json_file) |
|
256 | 257 | self.log.info("loading connection info from %s", fname) |
|
257 | 258 | with open(fname) as f: |
|
258 | cfg = json.loads(f.read()) | |
|
259 | assert key == cfg['exec_key'], "exec_key mismatch between engine and client keys" | |
|
260 | xport,addr = cfg['url'].split('://') | |
|
259 | ccfg = json.loads(f.read()) | |
|
260 | ||
|
261 | for key in ('exec_key', 'registration', 'pack', 'unpack'): | |
|
262 | assert ccfg[key] == ecfg[key], "mismatch between engine and client info: %r" % key | |
|
263 | ||
|
264 | xport,addr = ccfg['interface'].split('://') | |
|
265 | ||
|
261 | 266 | c.HubFactory.client_transport = xport |
|
262 | ip,ports = addr.split(':') | |
|
263 | 267 | c.HubFactory.client_ip = ip |
|
264 | 268 | if not self.ssh_server: |
|
265 | self.ssh_server = cfg['ssh'] | |
|
266 | assert int(ports) == c.HubFactory.regport, "regport mismatch" | |
|
269 | self.ssh_server = ccfg['ssh'] | |
|
270 | ||
|
271 | # load port config: | |
|
272 | c.HubFactory.regport = ecfg['registration'] | |
|
273 | c.HubFactory.hb = (ecfg['hb_ping'], ecfg['hb_pong']) | |
|
274 | c.HubFactory.control = (ccfg['control'], ecfg['control']) | |
|
275 | c.HubFactory.mux = (ccfg['mux'], ecfg['mux']) | |
|
276 | c.HubFactory.task = (ccfg['task'], ecfg['task']) | |
|
277 | c.HubFactory.iopub = (ccfg['iopub'], ecfg['iopub']) | |
|
278 | c.HubFactory.notifier_port = ccfg['notification'] | |
|
267 | 279 | |
|
268 | 280 | def cleanup_connection_files(self): |
|
269 | 281 | if self.reuse_files: |
@@ -335,14 +347,14 b' class IPControllerApp(BaseParallelApplication):' | |||
|
335 | 347 | children = self.children |
|
336 | 348 | mq = import_item(str(self.mq_class)) |
|
337 | 349 | |
|
338 |
|
|
|
350 | f = self.factory | |
|
339 | 351 | # disambiguate url, in case of * |
|
340 |
monitor_url = disambiguate_url( |
|
|
352 | monitor_url = disambiguate_url(f.monitor_url) | |
|
341 | 353 | # maybe_inproc = 'inproc://monitor' if self.use_threads else monitor_url |
|
342 | 354 | # IOPub relay (in a Process) |
|
343 | 355 | q = mq(zmq.PUB, zmq.SUB, zmq.PUB, b'N/A',b'iopub') |
|
344 |
q.bind_in( |
|
|
345 |
q.bind_out( |
|
|
356 | q.bind_in(f.client_url('iopub')) | |
|
357 | q.bind_out(f.engine_url('iopub')) | |
|
346 | 358 | q.setsockopt_out(zmq.SUBSCRIBE, b'') |
|
347 | 359 | q.connect_mon(monitor_url) |
|
348 | 360 | q.daemon=True |
@@ -350,18 +362,18 b' class IPControllerApp(BaseParallelApplication):' | |||
|
350 | 362 | |
|
351 | 363 | # Multiplexer Queue (in a Process) |
|
352 | 364 | q = mq(zmq.ROUTER, zmq.ROUTER, zmq.PUB, b'in', b'out') |
|
353 |
q.bind_in( |
|
|
365 | q.bind_in(f.client_url('mux')) | |
|
354 | 366 | q.setsockopt_in(zmq.IDENTITY, b'mux') |
|
355 |
q.bind_out( |
|
|
367 | q.bind_out(f.engine_url('mux')) | |
|
356 | 368 | q.connect_mon(monitor_url) |
|
357 | 369 | q.daemon=True |
|
358 | 370 | children.append(q) |
|
359 | 371 | |
|
360 | 372 | # Control Queue (in a Process) |
|
361 | 373 | q = mq(zmq.ROUTER, zmq.ROUTER, zmq.PUB, b'incontrol', b'outcontrol') |
|
362 |
q.bind_in( |
|
|
374 | q.bind_in(f.client_url('control')) | |
|
363 | 375 | q.setsockopt_in(zmq.IDENTITY, b'control') |
|
364 |
q.bind_out( |
|
|
376 | q.bind_out(f.engine_url('control')) | |
|
365 | 377 | q.connect_mon(monitor_url) |
|
366 | 378 | q.daemon=True |
|
367 | 379 | children.append(q) |
@@ -374,9 +386,9 b' class IPControllerApp(BaseParallelApplication):' | |||
|
374 | 386 | self.log.warn("task::using pure DEALER Task scheduler") |
|
375 | 387 | q = mq(zmq.ROUTER, zmq.DEALER, zmq.PUB, b'intask', b'outtask') |
|
376 | 388 | # q.setsockopt_out(zmq.HWM, hub.hwm) |
|
377 |
q.bind_in( |
|
|
389 | q.bind_in(f.client_url('task')) | |
|
378 | 390 | q.setsockopt_in(zmq.IDENTITY, b'task') |
|
379 |
q.bind_out( |
|
|
391 | q.bind_out(f.engine_url('task')) | |
|
380 | 392 | q.connect_mon(monitor_url) |
|
381 | 393 | q.daemon=True |
|
382 | 394 | children.append(q) |
@@ -385,8 +397,8 b' class IPControllerApp(BaseParallelApplication):' | |||
|
385 | 397 | |
|
386 | 398 | else: |
|
387 | 399 | self.log.info("task::using Python %s Task scheduler"%scheme) |
|
388 |
sargs = ( |
|
|
389 |
monitor_url, disambiguate_url( |
|
|
400 | sargs = (f.client_url('task'), f.engine_url('task'), | |
|
401 | monitor_url, disambiguate_url(f.client_url('notification'))) | |
|
390 | 402 | kwargs = dict(logname='scheduler', loglevel=self.log_level, |
|
391 | 403 | log_url = self.log_url, config=dict(self.config)) |
|
392 | 404 | if 'Process' in self.mq_class: |
@@ -45,7 +45,7 b' from IPython.zmq.session import (' | |||
|
45 | 45 | from IPython.config.configurable import Configurable |
|
46 | 46 | |
|
47 | 47 | from IPython.parallel.engine.engine import EngineFactory |
|
48 |
from IPython.parallel.util import disambiguate_ |
|
|
48 | from IPython.parallel.util import disambiguate_ip_address | |
|
49 | 49 | |
|
50 | 50 | from IPython.utils.importstring import import_item |
|
51 | 51 | from IPython.utils.py3compat import cast_bytes |
@@ -225,14 +225,15 b' class IPEngineApp(BaseParallelApplication):' | |||
|
225 | 225 | |
|
226 | 226 | location = config.EngineFactory.location |
|
227 | 227 | |
|
228 | for key in ('registration', 'hb_ping', 'hb_pong', 'mux', 'task', 'control'): | |
|
229 | d[key] = disambiguate_url(d[key], location) | |
|
228 | proto, ip = d['interface'].split('://') | |
|
229 | ip = disambiguate_ip_address(ip) | |
|
230 | d['interface'] = '%s://%s' % (proto, ip) | |
|
230 | 231 | |
|
231 | 232 | # DO NOT allow override of basic URLs, serialization, or exec_key |
|
232 | 233 | # JSON file takes top priority there |
|
233 | config.Session.key = asbytes(d['exec_key']) | |
|
234 | config.Session.key = cast_bytes(d['exec_key']) | |
|
234 | 235 | |
|
235 | config.EngineFactory.url = d['registration'] | |
|
236 | config.EngineFactory.url = d['interface'] + ':%i' % d['registration'] | |
|
236 | 237 | |
|
237 | 238 | config.Session.packer = d['pack'] |
|
238 | 239 | config.Session.unpacker = d['unpack'] |
@@ -403,10 +403,17 b' class Client(HasTraits):' | |||
|
403 | 403 | cfg['ssh'] = sshserver |
|
404 | 404 | |
|
405 | 405 | location = cfg.setdefault('location', None) |
|
406 | for key in ('control', 'task', 'mux', 'notification', 'registration'): | |
|
407 | cfg[key] = util.disambiguate_url(cfg[key], location) | |
|
406 | ||
|
407 | proto,addr = cfg['interface'].split('://') | |
|
408 | addr = util.disambiguate_ip_address(addr) | |
|
409 | cfg['interface'] = "%s://%s" % (proto, addr) | |
|
410 | ||
|
411 | # turn interface,port into full urls: | |
|
412 | for key in ('control', 'task', 'mux', 'iopub', 'notification', 'registration'): | |
|
413 | cfg[key] = cfg['interface'] + ':%i' % cfg[key] | |
|
414 | ||
|
408 | 415 | url = cfg['registration'] |
|
409 | proto,addr,port = util.split_url(url) | |
|
416 | ||
|
410 | 417 | if location is not None and addr == '127.0.0.1': |
|
411 | 418 | # location specified, and connection is expected to be local |
|
412 | 419 | if location not in LOCAL_IPS and not sshserver: |
@@ -431,7 +438,7 b' class Client(HasTraits):' | |||
|
431 | 438 | self._ssh = bool(sshserver or sshkey or password) |
|
432 | 439 | if self._ssh and sshserver is None: |
|
433 | 440 | # default to ssh via localhost |
|
434 | sshserver = url.split('://')[1].split(':')[0] | |
|
441 | sshserver = addr | |
|
435 | 442 | if self._ssh and password is None: |
|
436 | 443 | if tunnel.try_passwordless_ssh(sshserver, sshkey, paramiko): |
|
437 | 444 | password=False |
@@ -449,9 +456,9 b' class Client(HasTraits):' | |||
|
449 | 456 | self._query_socket = self._context.socket(zmq.DEALER) |
|
450 | 457 | |
|
451 | 458 | if self._ssh: |
|
452 |
tunnel.tunnel_connection(self._query_socket, |
|
|
459 | tunnel.tunnel_connection(self._query_socket, cfg['registration'], sshserver, **ssh_kwargs) | |
|
453 | 460 | else: |
|
454 |
self._query_socket.connect( |
|
|
461 | self._query_socket.connect(cfg['registration']) | |
|
455 | 462 | |
|
456 | 463 | self.session.debug = self.debug |
|
457 | 464 |
@@ -131,29 +131,29 b' class HubFactory(RegistrationFactory):' | |||
|
131 | 131 | |
|
132 | 132 | # port-pairs for monitoredqueues: |
|
133 | 133 | hb = Tuple(Integer,Integer,config=True, |
|
134 |
help=""" |
|
|
134 | help="""PUB/ROUTER Port pair for Engine heartbeats""") | |
|
135 | 135 | def _hb_default(self): |
|
136 | 136 | return tuple(util.select_random_ports(2)) |
|
137 | 137 | |
|
138 | 138 | mux = Tuple(Integer,Integer,config=True, |
|
139 |
help=""" |
|
|
139 | help="""Client/Engine Port pair for MUX queue""") | |
|
140 | 140 | |
|
141 | 141 | def _mux_default(self): |
|
142 | 142 | return tuple(util.select_random_ports(2)) |
|
143 | 143 | |
|
144 | 144 | task = Tuple(Integer,Integer,config=True, |
|
145 |
help=""" |
|
|
145 | help="""Client/Engine Port pair for Task queue""") | |
|
146 | 146 | def _task_default(self): |
|
147 | 147 | return tuple(util.select_random_ports(2)) |
|
148 | 148 | |
|
149 | 149 | control = Tuple(Integer,Integer,config=True, |
|
150 |
help=""" |
|
|
150 | help="""Client/Engine Port pair for Control queue""") | |
|
151 | 151 | |
|
152 | 152 | def _control_default(self): |
|
153 | 153 | return tuple(util.select_random_ports(2)) |
|
154 | 154 | |
|
155 | 155 | iopub = Tuple(Integer,Integer,config=True, |
|
156 |
help=""" |
|
|
156 | help="""Client/Engine Port pair for IOPub relay""") | |
|
157 | 157 | |
|
158 | 158 | def _iopub_default(self): |
|
159 | 159 | return tuple(util.select_random_ports(2)) |
@@ -231,10 +231,16 b' class HubFactory(RegistrationFactory):' | |||
|
231 | 231 | self.heartmonitor.start() |
|
232 | 232 | self.log.info("Heartmonitor started") |
|
233 | 233 | |
|
234 | def client_url(self, channel): | |
|
235 | """return full zmq url for a named client channel""" | |
|
236 | return "%s://%s:%i" % (self.client_transport, self.client_ip, self.client_info[channel]) | |
|
237 | ||
|
238 | def engine_url(self, channel): | |
|
239 | """return full zmq url for a named engine channel""" | |
|
240 | return "%s://%s:%i" % (self.engine_transport, self.engine_ip, self.engine_info[channel]) | |
|
241 | ||
|
234 | 242 | def init_hub(self): |
|
235 | """construct""" | |
|
236 | client_iface = "%s://%s:" % (self.client_transport, self.client_ip) + "%i" | |
|
237 | engine_iface = "%s://%s:" % (self.engine_transport, self.engine_ip) + "%i" | |
|
243 | """construct Hub object""" | |
|
238 | 244 | |
|
239 | 245 | ctx = self.context |
|
240 | 246 | loop = self.loop |
@@ -247,23 +253,25 b' class HubFactory(RegistrationFactory):' | |||
|
247 | 253 | |
|
248 | 254 | # build connection dicts |
|
249 | 255 | engine = self.engine_info = { |
|
250 | 'registration' : engine_iface % self.regport, | |
|
251 | 'control' : engine_iface % self.control[1], | |
|
252 |
' |
|
|
253 |
' |
|
|
254 |
'hb_p |
|
|
255 |
' |
|
|
256 |
' |
|
|
256 | 'interface' : "%s://%s" % (self.engine_transport, self.engine_ip), | |
|
257 | 'registration' : self.regport, | |
|
258 | 'control' : self.control[1], | |
|
259 | 'mux' : self.mux[1], | |
|
260 | 'hb_ping' : self.hb[0], | |
|
261 | 'hb_pong' : self.hb[1], | |
|
262 | 'task' : self.task[1], | |
|
263 | 'iopub' : self.iopub[1], | |
|
257 | 264 | } |
|
258 | 265 | |
|
259 | 266 | client = self.client_info = { |
|
260 | 'registration' : client_iface % self.regport, | |
|
261 | 'control' : client_iface % self.control[0], | |
|
262 |
' |
|
|
263 |
' |
|
|
267 | 'interface' : "%s://%s" % (self.client_transport, self.client_ip), | |
|
268 | 'registration' : self.regport, | |
|
269 | 'control' : self.control[0], | |
|
270 | 'mux' : self.mux[0], | |
|
271 | 'task' : self.task[0], | |
|
264 | 272 | 'task_scheme' : scheme, |
|
265 |
'iopub' : |
|
|
266 |
'notification' : |
|
|
273 | 'iopub' : self.iopub[0], | |
|
274 | 'notification' : self.notifier_port, | |
|
267 | 275 | } |
|
268 | 276 | |
|
269 | 277 | self.log.debug("Hub engine addrs: %s", self.engine_info) |
@@ -271,19 +279,19 b' class HubFactory(RegistrationFactory):' | |||
|
271 | 279 | |
|
272 | 280 | # Registrar socket |
|
273 | 281 | q = ZMQStream(ctx.socket(zmq.ROUTER), loop) |
|
274 |
q.bind(client |
|
|
275 |
self.log.info("Hub listening on %s for registration.", |
|
|
282 | q.bind(self.client_url('registration')) | |
|
283 | self.log.info("Hub listening on %s for registration.", self.client_url('registration')) | |
|
276 | 284 | if self.client_ip != self.engine_ip: |
|
277 |
q.bind(engine |
|
|
278 |
self.log.info("Hub listening on %s for registration.", |
|
|
285 | q.bind(self.engine_url('registration')) | |
|
286 | self.log.info("Hub listening on %s for registration.", self.engine_url('registration')) | |
|
279 | 287 | |
|
280 | 288 | ### Engine connections ### |
|
281 | 289 | |
|
282 | 290 | # heartbeat |
|
283 | 291 | hpub = ctx.socket(zmq.PUB) |
|
284 |
hpub.bind(engine |
|
|
292 | hpub.bind(self.engine_url('hb_ping')) | |
|
285 | 293 | hrep = ctx.socket(zmq.ROUTER) |
|
286 |
hrep.bind(engine |
|
|
294 | hrep.bind(self.engine_url('hb_pong')) | |
|
287 | 295 | self.heartmonitor = HeartMonitor(loop=loop, config=self.config, log=self.log, |
|
288 | 296 | pingstream=ZMQStream(hpub,loop), |
|
289 | 297 | pongstream=ZMQStream(hrep,loop) |
@@ -293,7 +301,7 b' class HubFactory(RegistrationFactory):' | |||
|
293 | 301 | |
|
294 | 302 | # Notifier socket |
|
295 | 303 | n = ZMQStream(ctx.socket(zmq.PUB), loop) |
|
296 |
n.bind(client |
|
|
304 | n.bind(self.client_url('notification')) | |
|
297 | 305 | |
|
298 | 306 | ### build and launch the queues ### |
|
299 | 307 | |
@@ -313,7 +321,7 b' class HubFactory(RegistrationFactory):' | |||
|
313 | 321 | |
|
314 | 322 | # resubmit stream |
|
315 | 323 | r = ZMQStream(ctx.socket(zmq.DEALER), loop) |
|
316 |
url = util.disambiguate_url(self.client_ |
|
|
324 | url = util.disambiguate_url(self.client_url('task')) | |
|
317 | 325 | r.connect(url) |
|
318 | 326 | |
|
319 | 327 | self.hub = Hub(loop=loop, session=self.session, monitor=sub, heartmonitor=self.heartmonitor, |
@@ -388,15 +396,6 b' class Hub(SessionFactory):' | |||
|
388 | 396 | super(Hub, self).__init__(**kwargs) |
|
389 | 397 | self.registration_timeout = max(5000, 2*self.heartmonitor.period) |
|
390 | 398 | |
|
391 | # validate connection dicts: | |
|
392 | for k,v in self.client_info.iteritems(): | |
|
393 | if k == 'task_scheme': | |
|
394 | continue | |
|
395 | else: | |
|
396 | util.validate_url_container(v) | |
|
397 | # util.validate_url_container(self.client_info) | |
|
398 | util.validate_url_container(self.engine_info) | |
|
399 | ||
|
400 | 399 | # register our callbacks |
|
401 | 400 | self.query.on_recv(self.dispatch_query) |
|
402 | 401 | self.monitor.on_recv(self.dispatch_monitor_traffic) |
@@ -145,19 +145,23 b' class EngineFactory(RegistrationFactory):' | |||
|
145 | 145 | content = msg['content'] |
|
146 | 146 | info = self.connection_info |
|
147 | 147 | |
|
148 | def url(key): | |
|
149 | """get zmq url for given channel""" | |
|
150 | return str(info["interface"] + ":%i" % info[key]) | |
|
151 | ||
|
148 | 152 | if content['status'] == 'ok': |
|
149 | 153 | self.id = int(content['id']) |
|
150 | 154 | |
|
151 | 155 | # launch heartbeat |
|
152 | 156 | # possibly forward hb ports with tunnels |
|
153 |
hb_ping = maybe_tunnel( |
|
|
154 |
hb_pong = maybe_tunnel( |
|
|
157 | hb_ping = maybe_tunnel(url('hb_ping')) | |
|
158 | hb_pong = maybe_tunnel(url('hb_pong')) | |
|
155 | 159 | |
|
156 | 160 | heart = Heart(hb_ping, hb_pong, heart_id=identity) |
|
157 | 161 | heart.start() |
|
158 | 162 | |
|
159 | 163 | # create Shell Connections (MUX, Task, etc.): |
|
160 |
shell_addrs = |
|
|
164 | shell_addrs = url('mux'), url('task') | |
|
161 | 165 | |
|
162 | 166 | # Use only one shell stream for mux and tasks |
|
163 | 167 | stream = zmqstream.ZMQStream(ctx.socket(zmq.ROUTER), loop) |
@@ -167,13 +171,13 b' class EngineFactory(RegistrationFactory):' | |||
|
167 | 171 | connect(stream, addr) |
|
168 | 172 | |
|
169 | 173 | # control stream: |
|
170 |
control_addr = |
|
|
174 | control_addr = url('control') | |
|
171 | 175 | control_stream = zmqstream.ZMQStream(ctx.socket(zmq.ROUTER), loop) |
|
172 | 176 | control_stream.setsockopt(zmq.IDENTITY, identity) |
|
173 | 177 | connect(control_stream, control_addr) |
|
174 | 178 | |
|
175 | 179 | # create iopub stream: |
|
176 |
iopub_addr = |
|
|
180 | iopub_addr = url('iopub') | |
|
177 | 181 | iopub_socket = ctx.socket(zmq.PUB) |
|
178 | 182 | iopub_socket.setsockopt(zmq.IDENTITY, identity) |
|
179 | 183 | connect(iopub_socket, iopub_addr) |
General Comments 0
You need to be logged in to leave comments.
Login now