##// END OF EJS Templates
simplify IPython.parallel connections...
MinRK -
Show More
@@ -209,7 +209,7 b' class IPControllerApp(BaseParallelApplication):'
209 209 def save_connection_dict(self, fname, cdict):
210 210 """save a connection dict to json file."""
211 211 c = self.config
212 url = cdict['url']
212 url = cdict['registration']
213 213 location = cdict['location']
214 214 if not location:
215 215 try:
@@ -314,15 +314,21 b' class IPControllerApp(BaseParallelApplication):'
314 314 if self.write_connection_files:
315 315 # save to new json config files
316 316 f = self.factory
317 cdict = {'exec_key' : f.session.key.decode('ascii'),
318 'ssh' : self.ssh_server,
319 'url' : "%s://%s:%s"%(f.client_transport, f.client_ip, f.regport),
320 'location' : self.location
321 }
317 base = {
318 'exec_key' : f.session.key.decode('ascii'),
319 'location' : self.location,
320 'pack' : f.session.packer,
321 'unpack' : f.session.unpacker,
322 }
323
324 cdict = {'ssh' : self.ssh_server}
325 cdict.update(f.client_info)
326 cdict.update(base)
322 327 self.save_connection_dict(self.client_json_file, cdict)
323 edict = cdict
324 edict['url']="%s://%s:%s"%((f.client_transport, f.client_ip, f.regport))
325 edict['ssh'] = self.engine_ssh_server
328
329 edict = {'ssh' : self.engine_ssh_server}
330 edict.update(f.engine_info)
331 edict.update(base)
326 332 self.save_connection_dict(self.engine_json_file, edict)
327 333
328 334 def init_schedulers(self):
@@ -379,7 +385,7 b' class IPControllerApp(BaseParallelApplication):'
379 385
380 386 else:
381 387 self.log.info("task::using Python %s Task scheduler"%scheme)
382 sargs = (hub.client_info['task'][1], hub.engine_info['task'],
388 sargs = (hub.client_info['task'], hub.engine_info['task'],
383 389 monitor_url, disambiguate_url(hub.client_info['notification']))
384 390 kwargs = dict(logname='scheduler', loglevel=self.log_level,
385 391 log_url = self.log_url, config=dict(self.config))
@@ -211,24 +211,35 b' class IPEngineApp(BaseParallelApplication):'
211 211 with open(self.url_file) as f:
212 212 d = json.loads(f.read())
213 213
214 if 'exec_key' in d:
215 config.Session.key = cast_bytes(d['exec_key'])
216
214 # allow hand-override of location for disambiguation
215 # and ssh-server
217 216 try:
218 217 config.EngineFactory.location
219 218 except AttributeError:
220 219 config.EngineFactory.location = d['location']
221 220
222 d['url'] = disambiguate_url(d['url'], config.EngineFactory.location)
223 try:
224 config.EngineFactory.url
225 except AttributeError:
226 config.EngineFactory.url = d['url']
227
228 221 try:
229 222 config.EngineFactory.sshserver
230 223 except AttributeError:
231 config.EngineFactory.sshserver = d['ssh']
224 config.EngineFactory.sshserver = d.get('ssh')
225
226 location = config.EngineFactory.location
227
228 for key in ('registration', 'hb_ping', 'hb_pong', 'mux', 'task', 'control'):
229 d[key] = disambiguate_url(d[key], location)
230
231 # DO NOT allow override of basic URLs, serialization, or exec_key
232 # JSON file takes top priority there
233 config.Session.key = asbytes(d['exec_key'])
234
235 config.EngineFactory.url = d['registration']
236
237 config.Session.packer = d['pack']
238 config.Session.unpacker = d['unpack']
239
240 self.log.debug("Config changed:")
241 self.log.debug("%r", config)
242 self.connection_info = d
232 243
233 244 def bind_kernel(self, **kwargs):
234 245 """Promote engine to listening kernel, accessible to frontends."""
@@ -320,7 +331,9 b' class IPEngineApp(BaseParallelApplication):'
320 331 # shell_class = import_item(self.master_config.Global.shell_class)
321 332 # print self.config
322 333 try:
323 self.engine = EngineFactory(config=config, log=self.log)
334 self.engine = EngineFactory(config=config, log=self.log,
335 connection_info=self.connection_info,
336 )
324 337 except:
325 338 self.log.error("Couldn't start the Engine", exc_info=True)
326 339 self.exit(1)
@@ -217,7 +217,9 b' class Client(HasTraits):'
217 217 Parameters
218 218 ----------
219 219
220 url_or_file : bytes or unicode; zmq url or path to ipcontroller-client.json
220 url_file : str/unicode; path to ipcontroller-client.json
221 This JSON file should contain all the information needed to connect to a cluster,
222 and is likely the only argument needed.
221 223 Connection information for the Hub's registration. If a json connector
222 224 file is given, then likely no further configuration is necessary.
223 225 [Default: use profile]
@@ -239,14 +241,6 b' class Client(HasTraits):'
239 241 If specified, this will be relayed to the Session for configuration
240 242 username : str
241 243 set username for the session object
242 packer : str (import_string) or callable
243 Can be either the simple keyword 'json' or 'pickle', or an import_string to a
244 function to serialize messages. Must support same input as
245 JSON, and output must be bytes.
246 You can pass a callable directly as `pack`
247 unpacker : str (import_string) or callable
248 The inverse of packer. Only necessary if packer is specified as *not* one
249 of 'json' or 'pickle'.
250 244
251 245 #-------------- ssh related args ----------------
252 246 # These are args for configuring the ssh tunnel to be used
@@ -271,17 +265,6 b' class Client(HasTraits):'
271 265 flag for whether to use paramiko instead of shell ssh for tunneling.
272 266 [default: True on win32, False else]
273 267
274 ------- exec authentication args -------
275 If even localhost is untrusted, you can have some protection against
276 unauthorized execution by signing messages with HMAC digests.
277 Messages are still sent as cleartext, so if someone can snoop your
278 loopback traffic this will not protect your privacy, but will prevent
279 unauthorized execution.
280
281 exec_key : str
282 an authentication key or file containing a key
283 default: None
284
285 268
286 269 Attributes
287 270 ----------
@@ -378,8 +361,8 b' class Client(HasTraits):'
378 361 # don't raise on positional args
379 362 return HasTraits.__new__(self, **kw)
380 363
381 def __init__(self, url_or_file=None, profile=None, profile_dir=None, ipython_dir=None,
382 context=None, debug=False, exec_key=None,
364 def __init__(self, url_file=None, profile=None, profile_dir=None, ipython_dir=None,
365 context=None, debug=False,
383 366 sshserver=None, sshkey=None, password=None, paramiko=None,
384 367 timeout=10, **extra_args
385 368 ):
@@ -391,38 +374,38 b' class Client(HasTraits):'
391 374 context = zmq.Context.instance()
392 375 self._context = context
393 376 self._stop_spinning = Event()
377
378 if 'url_or_file' in extra_args:
379 url_file = extra_args['url_or_file']
380 warnings.warn("url_or_file arg no longer supported, use url_file", DeprecationWarning)
381
382 if url_file and util.is_url(url_file):
383 raise ValueError("single urls cannot be specified, url-files must be used.")
394 384
395 385 self._setup_profile_dir(self.profile, profile_dir, ipython_dir)
386
396 387 if self._cd is not None:
397 if url_or_file is None:
398 url_or_file = pjoin(self._cd.security_dir, 'ipcontroller-client.json')
399 if url_or_file is None:
388 if url_file is None:
389 url_file = pjoin(self._cd.security_dir, 'ipcontroller-client.json')
390 if url_file is None:
400 391 raise ValueError(
401 392 "I can't find enough information to connect to a hub!"
402 " Please specify at least one of url_or_file or profile."
393 " Please specify at least one of url_file or profile."
403 394 )
404
405 if not util.is_url(url_or_file):
406 # it's not a url, try for a file
407 if not os.path.exists(url_or_file):
408 if self._cd:
409 url_or_file = os.path.join(self._cd.security_dir, url_or_file)
410 if not os.path.exists(url_or_file):
411 raise IOError("Connection file not found: %r" % url_or_file)
412 with open(url_or_file) as f:
413 cfg = json.loads(f.read())
414 else:
415 cfg = {'url':url_or_file}
395
396 with open(url_file) as f:
397 cfg = json.load(f)
398
399 self._task_scheme = cfg['task_scheme']
416 400
417 401 # sync defaults from args, json:
418 402 if sshserver:
419 403 cfg['ssh'] = sshserver
420 if exec_key:
421 cfg['exec_key'] = exec_key
422 exec_key = cfg['exec_key']
404
423 405 location = cfg.setdefault('location', None)
424 cfg['url'] = util.disambiguate_url(cfg['url'], location)
425 url = cfg['url']
406 for key in ('control', 'task', 'mux', 'notification', 'registration'):
407 cfg[key] = util.disambiguate_url(cfg[key], location)
408 url = cfg['registration']
426 409 proto,addr,port = util.split_url(url)
427 410 if location is not None and addr == '127.0.0.1':
428 411 # location specified, and connection is expected to be local
@@ -457,12 +440,10 b' class Client(HasTraits):'
457 440 ssh_kwargs = dict(keyfile=sshkey, password=password, paramiko=paramiko)
458 441
459 442 # configure and construct the session
460 if exec_key is not None:
461 if os.path.isfile(exec_key):
462 extra_args['keyfile'] = exec_key
463 else:
464 exec_key = cast_bytes(exec_key)
465 extra_args['key'] = exec_key
443 extra_args['packer'] = cfg['pack']
444 extra_args['unpacker'] = cfg['unpack']
445 extra_args['key'] = cfg['exec_key']
446
466 447 self.session = Session(**extra_args)
467 448
468 449 self._query_socket = self._context.socket(zmq.DEALER)
@@ -583,7 +564,7 b' class Client(HasTraits):'
583 564 self._connected=True
584 565
585 566 def connect_socket(s, url):
586 url = util.disambiguate_url(url, self._config['location'])
567 # url = util.disambiguate_url(url, self._config['location'])
587 568 if self._ssh:
588 569 return tunnel.tunnel_connection(s, url, sshserver, **ssh_kwargs)
589 570 else:
@@ -600,30 +581,28 b' class Client(HasTraits):'
600 581 idents,msg = self.session.recv(self._query_socket,mode=0)
601 582 if self.debug:
602 583 pprint(msg)
603 msg = Message(msg)
604 content = msg.content
605 self._config['registration'] = dict(content)
606 if content.status == 'ok':
607 ident = self.session.bsession
608 if content.mux:
609 self._mux_socket = self._context.socket(zmq.DEALER)
610 connect_socket(self._mux_socket, content.mux)
611 if content.task:
612 self._task_scheme, task_addr = content.task
613 self._task_socket = self._context.socket(zmq.DEALER)
614 connect_socket(self._task_socket, task_addr)
615 if content.notification:
616 self._notification_socket = self._context.socket(zmq.SUB)
617 connect_socket(self._notification_socket, content.notification)
618 self._notification_socket.setsockopt(zmq.SUBSCRIBE, b'')
619 if content.control:
620 self._control_socket = self._context.socket(zmq.DEALER)
621 connect_socket(self._control_socket, content.control)
622 if content.iopub:
623 self._iopub_socket = self._context.socket(zmq.SUB)
624 self._iopub_socket.setsockopt(zmq.SUBSCRIBE, b'')
625 connect_socket(self._iopub_socket, content.iopub)
626 self._update_engines(dict(content.engines))
584 content = msg['content']
585 # self._config['registration'] = dict(content)
586 cfg = self._config
587 if content['status'] == 'ok':
588 self._mux_socket = self._context.socket(zmq.DEALER)
589 connect_socket(self._mux_socket, cfg['mux'])
590
591 self._task_socket = self._context.socket(zmq.DEALER)
592 connect_socket(self._task_socket, cfg['task'])
593
594 self._notification_socket = self._context.socket(zmq.SUB)
595 self._notification_socket.setsockopt(zmq.SUBSCRIBE, b'')
596 connect_socket(self._notification_socket, cfg['notification'])
597
598 self._control_socket = self._context.socket(zmq.DEALER)
599 connect_socket(self._control_socket, cfg['control'])
600
601 self._iopub_socket = self._context.socket(zmq.SUB)
602 self._iopub_socket.setsockopt(zmq.SUBSCRIBE, b'')
603 connect_socket(self._iopub_socket, cfg['iopub'])
604
605 self._update_engines(dict(content['engines']))
627 606 else:
628 607 self._connected = False
629 608 raise Exception("Failed to connect!")
@@ -239,30 +239,61 b' class HubFactory(RegistrationFactory):'
239 239 ctx = self.context
240 240 loop = self.loop
241 241
242 try:
243 scheme = self.config.TaskScheduler.scheme_name
244 except AttributeError:
245 from .scheduler import TaskScheduler
246 scheme = TaskScheduler.scheme_name.get_default_value()
247
248 # build connection dicts
249 engine = self.engine_info = {
250 'registration' : engine_iface % self.regport,
251 'control' : engine_iface % self.control[1],
252 'mux' : engine_iface % self.mux[1],
253 'hb_ping' : engine_iface % self.hb[0],
254 'hb_pong' : engine_iface % self.hb[1],
255 'task' : engine_iface % self.task[1],
256 'iopub' : engine_iface % self.iopub[1],
257 }
258
259 client = self.client_info = {
260 'registration' : client_iface % self.regport,
261 'control' : client_iface % self.control[0],
262 'mux' : client_iface % self.mux[0],
263 'task' : client_iface % self.task[0],
264 'task_scheme' : scheme,
265 'iopub' : client_iface % self.iopub[0],
266 'notification' : client_iface % self.notifier_port,
267 }
268
269 self.log.debug("Hub engine addrs: %s", self.engine_info)
270 self.log.debug("Hub client addrs: %s", self.client_info)
271
242 272 # Registrar socket
243 273 q = ZMQStream(ctx.socket(zmq.ROUTER), loop)
244 q.bind(client_iface % self.regport)
274 q.bind(client['registration'])
245 275 self.log.info("Hub listening on %s for registration.", client_iface % self.regport)
246 276 if self.client_ip != self.engine_ip:
247 q.bind(engine_iface % self.regport)
277 q.bind(engine['registration'])
248 278 self.log.info("Hub listening on %s for registration.", engine_iface % self.regport)
249 279
250 280 ### Engine connections ###
251 281
252 282 # heartbeat
253 283 hpub = ctx.socket(zmq.PUB)
254 hpub.bind(engine_iface % self.hb[0])
284 hpub.bind(engine['hb_ping'])
255 285 hrep = ctx.socket(zmq.ROUTER)
256 hrep.bind(engine_iface % self.hb[1])
286 hrep.bind(engine['hb_pong'])
257 287 self.heartmonitor = HeartMonitor(loop=loop, config=self.config, log=self.log,
258 288 pingstream=ZMQStream(hpub,loop),
259 289 pongstream=ZMQStream(hrep,loop)
260 290 )
261 291
262 292 ### Client connections ###
293
263 294 # Notifier socket
264 295 n = ZMQStream(ctx.socket(zmq.PUB), loop)
265 n.bind(client_iface%self.notifier_port)
296 n.bind(client['notification'])
266 297
267 298 ### build and launch the queues ###
268 299
@@ -279,35 +310,10 b' class HubFactory(RegistrationFactory):'
279 310 self.db = import_item(str(db_class))(session=self.session.session,
280 311 config=self.config, log=self.log)
281 312 time.sleep(.25)
282 try:
283 scheme = self.config.TaskScheduler.scheme_name
284 except AttributeError:
285 from .scheduler import TaskScheduler
286 scheme = TaskScheduler.scheme_name.get_default_value()
287 # build connection dicts
288 self.engine_info = {
289 'control' : engine_iface%self.control[1],
290 'mux': engine_iface%self.mux[1],
291 'heartbeat': (engine_iface%self.hb[0], engine_iface%self.hb[1]),
292 'task' : engine_iface%self.task[1],
293 'iopub' : engine_iface%self.iopub[1],
294 # 'monitor' : engine_iface%self.mon_port,
295 }
296
297 self.client_info = {
298 'control' : client_iface%self.control[0],
299 'mux': client_iface%self.mux[0],
300 'task' : (scheme, client_iface%self.task[0]),
301 'iopub' : client_iface%self.iopub[0],
302 'notification': client_iface%self.notifier_port
303 }
304 self.log.debug("Hub engine addrs: %s", self.engine_info)
305 self.log.debug("Hub client addrs: %s", self.client_info)
306 313
307 314 # resubmit stream
308 315 r = ZMQStream(ctx.socket(zmq.DEALER), loop)
309 url = util.disambiguate_url(self.client_info['task'][-1])
310 r.setsockopt(zmq.IDENTITY, self.session.bsession)
316 url = util.disambiguate_url(self.client_info['task'])
311 317 r.connect(url)
312 318
313 319 self.hub = Hub(loop=loop, session=self.session, monitor=sub, heartmonitor=self.heartmonitor,
@@ -384,8 +390,8 b' class Hub(SessionFactory):'
384 390
385 391 # validate connection dicts:
386 392 for k,v in self.client_info.iteritems():
387 if k == 'task':
388 util.validate_url_container(v[1])
393 if k == 'task_scheme':
394 continue
389 395 else:
390 396 util.validate_url_container(v)
391 397 # util.validate_url_container(self.client_info)
@@ -865,7 +871,6 b' class Hub(SessionFactory):'
865 871 """Reply with connection addresses for clients."""
866 872 self.log.info("client::client %r connected", client_id)
867 873 content = dict(status='ok')
868 content.update(self.client_info)
869 874 jsonable = {}
870 875 for k,v in self.keytable.iteritems():
871 876 if v not in self.dead_engines:
@@ -891,7 +896,6 b' class Hub(SessionFactory):'
891 896 self.log.debug("registration::register_engine(%i, %r, %r, %r)", eid, queue, reg, heart)
892 897
893 898 content = dict(id=eid,status='ok')
894 content.update(self.engine_info)
895 899 # check if requesting available IDs:
896 900 if queue in self.by_ident:
897 901 try:
@@ -50,7 +50,7 b' class EngineFactory(RegistrationFactory):'
50 50 help="""The location (an IP address) of the controller. This is
51 51 used for disambiguating URLs, to determine whether
52 52 loopback should be used to connect or the public address.""")
53 timeout=CFloat(2,config=True,
53 timeout=CFloat(5, config=True,
54 54 help="""The time (in seconds) to wait for the Controller to respond
55 55 to registration requests before giving up.""")
56 56 sshserver=Unicode(config=True,
@@ -61,10 +61,11 b' class EngineFactory(RegistrationFactory):'
61 61 help="""Whether to use paramiko instead of openssh for tunnels.""")
62 62
63 63 # not configurable:
64 user_ns=Dict()
65 id=Integer(allow_none=True)
66 registrar=Instance('zmq.eventloop.zmqstream.ZMQStream')
67 kernel=Instance(Kernel)
64 connection_info = Dict()
65 user_ns = Dict()
66 id = Integer(allow_none=True)
67 registrar = Instance('zmq.eventloop.zmqstream.ZMQStream')
68 kernel = Instance(Kernel)
68 69
69 70 bident = CBytes()
70 71 ident = Unicode()
@@ -96,7 +97,7 b' class EngineFactory(RegistrationFactory):'
96 97 def connect(s, url):
97 98 url = disambiguate_url(url, self.location)
98 99 if self.using_ssh:
99 self.log.debug("Tunneling connection to %s via %s"%(url, self.sshserver))
100 self.log.debug("Tunneling connection to %s via %s", url, self.sshserver)
100 101 return tunnel.tunnel_connection(s, url, self.sshserver,
101 102 keyfile=self.sshkey, paramiko=self.paramiko,
102 103 password=password,
@@ -108,12 +109,12 b' class EngineFactory(RegistrationFactory):'
108 109 """like connect, but don't complete the connection (for use by heartbeat)"""
109 110 url = disambiguate_url(url, self.location)
110 111 if self.using_ssh:
111 self.log.debug("Tunneling connection to %s via %s"%(url, self.sshserver))
112 self.log.debug("Tunneling connection to %s via %s", url, self.sshserver)
112 113 url,tunnelobj = tunnel.open_tunnel(url, self.sshserver,
113 114 keyfile=self.sshkey, paramiko=self.paramiko,
114 115 password=password,
115 116 )
116 return url
117 return str(url)
117 118 return connect, maybe_tunnel
118 119
119 120 def register(self):
@@ -131,7 +132,7 b' class EngineFactory(RegistrationFactory):'
131 132 content = dict(queue=self.ident, heartbeat=self.ident, control=self.ident)
132 133 self.registrar.on_recv(lambda msg: self.complete_registration(msg, connect, maybe_tunnel))
133 134 # print (self.session.key)
134 self.session.send(self.registrar, "registration_request",content=content)
135 self.session.send(self.registrar, "registration_request", content=content)
135 136
136 137 def complete_registration(self, msg, connect, maybe_tunnel):
137 138 # print msg
@@ -140,50 +141,39 b' class EngineFactory(RegistrationFactory):'
140 141 loop = self.loop
141 142 identity = self.bident
142 143 idents,msg = self.session.feed_identities(msg)
143 msg = Message(self.session.unserialize(msg))
144
145 if msg.content.status == 'ok':
146 self.id = int(msg.content.id)
144 msg = self.session.unserialize(msg)
145 content = msg['content']
146 info = self.connection_info
147
148 if content['status'] == 'ok':
149 self.id = int(content['id'])
147 150
148 151 # launch heartbeat
149 hb_addrs = msg.content.heartbeat
150
151 152 # possibly forward hb ports with tunnels
152 hb_addrs = [ maybe_tunnel(addr) for addr in hb_addrs ]
153 heart = Heart(*map(str, hb_addrs), heart_id=identity)
153 hb_ping = maybe_tunnel(info['hb_ping'])
154 hb_pong = maybe_tunnel(info['hb_pong'])
155
156 heart = Heart(hb_ping, hb_pong, heart_id=identity)
154 157 heart.start()
155 158
156 # create Shell Streams (MUX, Task, etc.):
157 queue_addr = msg.content.mux
158 shell_addrs = [ str(queue_addr) ]
159 task_addr = msg.content.task
160 if task_addr:
161 shell_addrs.append(str(task_addr))
162
163 # Uncomment this to go back to two-socket model
164 # shell_streams = []
165 # for addr in shell_addrs:
166 # stream = zmqstream.ZMQStream(ctx.socket(zmq.ROUTER), loop)
167 # stream.setsockopt(zmq.IDENTITY, identity)
168 # stream.connect(disambiguate_url(addr, self.location))
169 # shell_streams.append(stream)
170
171 # Now use only one shell stream for mux and tasks
159 # create Shell Connections (MUX, Task, etc.):
160 shell_addrs = map(str, [info['mux'], info['task']])
161
162 # Use only one shell stream for mux and tasks
172 163 stream = zmqstream.ZMQStream(ctx.socket(zmq.ROUTER), loop)
173 164 stream.setsockopt(zmq.IDENTITY, identity)
174 165 shell_streams = [stream]
175 166 for addr in shell_addrs:
176 167 connect(stream, addr)
177 # end single stream-socket
178 168
179 169 # control stream:
180 control_addr = str(msg.content.control)
170 control_addr = str(info['control'])
181 171 control_stream = zmqstream.ZMQStream(ctx.socket(zmq.ROUTER), loop)
182 172 control_stream.setsockopt(zmq.IDENTITY, identity)
183 173 connect(control_stream, control_addr)
184 174
185 175 # create iopub stream:
186 iopub_addr = msg.content.iopub
176 iopub_addr = info['iopub']
187 177 iopub_socket = ctx.socket(zmq.PUB)
188 178 iopub_socket.setsockopt(zmq.IDENTITY, identity)
189 179 connect(iopub_socket, iopub_addr)
@@ -257,9 +257,11 b' class Session(Configurable):'
257 257 if new.lower() == 'json':
258 258 self.pack = json_packer
259 259 self.unpack = json_unpacker
260 self.unpacker = new
260 261 elif new.lower() == 'pickle':
261 262 self.pack = pickle_packer
262 263 self.unpack = pickle_unpacker
264 self.unpacker = new
263 265 else:
264 266 self.pack = import_item(str(new))
265 267
@@ -270,9 +272,11 b' class Session(Configurable):'
270 272 if new.lower() == 'json':
271 273 self.pack = json_packer
272 274 self.unpack = json_unpacker
275 self.packer = new
273 276 elif new.lower() == 'pickle':
274 277 self.pack = pickle_packer
275 278 self.unpack = pickle_unpacker
279 self.packer = new
276 280 else:
277 281 self.unpack = import_item(str(new))
278 282
General Comments 0
You need to be logged in to leave comments. Login now