simplify IPython.parallel connections...
MinRK
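For orientation, a rough sketch (not part of the changeset) of what the consolidated ipcontroller-client.json written by save_connection_dict might contain after this change. The keys mirror the `base` dict, `f.client_info`, and the 'ssh' entry built in the controller hunks below; the addresses, ports, and key value are made up for illustration:

    # Hypothetical contents of ipcontroller-client.json after this change,
    # shown as a Python dict (all values below are illustrative only):
    client_connection_info = {
        'ssh' : '',                                 # SSH server for tunneling, if any
        'exec_key' : 'a0436f6c-1916-498b-8eb9-e81ab9368e84',
        'location' : '10.0.2.15',                   # controller IP, used for URL disambiguation
        'pack' : 'json',                            # f.session.packer
        'unpack' : 'json',                          # f.session.unpacker
        # per-channel URLs from f.client_info:
        'registration' : 'tcp://127.0.0.1:55001',
        'control' : 'tcp://127.0.0.1:55002',
        'mux' : 'tcp://127.0.0.1:55003',
        'task' : 'tcp://127.0.0.1:55004',
        'task_scheme' : 'leastload',                # assumed default scheme name
        'iopub' : 'tcp://127.0.0.1:55005',
        'notification' : 'tcp://127.0.0.1:55006',
    }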
@@ -209,7 +209,7 @@ class IPControllerApp(BaseParallelApplication):
     def save_connection_dict(self, fname, cdict):
         """save a connection dict to json file."""
         c = self.config
-        url = cdict['url']
+        url = cdict['registration']
         location = cdict['location']
         if not location:
             try:
@@ -314,15 +314,21 @@ class IPControllerApp(BaseParallelApplication):
         if self.write_connection_files:
            # save to new json config files
            f = self.factory
-            cdict = {'exec_key' : f.session.key.decode('ascii'),
-                    'ssh' : self.ssh_server,
-                    'url' : "%s://%s:%s"%(f.client_transport, f.client_ip, f.regport),
-                    'location' : self.location
-                    }
+            base = {
+                'exec_key' : f.session.key.decode('ascii'),
+                'location' : self.location,
+                'pack' : f.session.packer,
+                'unpack' : f.session.unpacker,
+            }
+
+            cdict = {'ssh' : self.ssh_server}
+            cdict.update(f.client_info)
+            cdict.update(base)
             self.save_connection_dict(self.client_json_file, cdict)
-            edict = cdict
-            edict['url']="%s://%s:%s"%((f.client_transport, f.client_ip, f.regport))
-            edict['ssh'] = self.engine_ssh_server
+
+            edict = {'ssh' : self.engine_ssh_server}
+            edict.update(f.engine_info)
+            edict.update(base)
             self.save_connection_dict(self.engine_json_file, edict)
 
     def init_schedulers(self):
@@ -379,7 +385,7 @@ class IPControllerApp(BaseParallelApplication):
 
         else:
             self.log.info("task::using Python %s Task scheduler"%scheme)
-            sargs = (hub.client_info['task'][1], hub.engine_info['task'],
+            sargs = (hub.client_info['task'], hub.engine_info['task'],
                     monitor_url, disambiguate_url(hub.client_info['notification']))
             kwargs = dict(logname='scheduler', loglevel=self.log_level,
                             log_url = self.log_url, config=dict(self.config))
@@ -211,24 +211,35 @@ class IPEngineApp(BaseParallelApplication):
         with open(self.url_file) as f:
             d = json.loads(f.read())
 
-        if 'exec_key' in d:
-            config.Session.key = cast_bytes(d['exec_key'])
-
+        # allow hand-override of location for disambiguation
+        # and ssh-server
         try:
             config.EngineFactory.location
         except AttributeError:
             config.EngineFactory.location = d['location']
 
-        d['url'] = disambiguate_url(d['url'], config.EngineFactory.location)
-        try:
-            config.EngineFactory.url
-        except AttributeError:
-            config.EngineFactory.url = d['url']
-
         try:
             config.EngineFactory.sshserver
         except AttributeError:
-            config.EngineFactory.sshserver = d['ssh']
+            config.EngineFactory.sshserver = d.get('ssh')
+
+        location = config.EngineFactory.location
+
+        for key in ('registration', 'hb_ping', 'hb_pong', 'mux', 'task', 'control'):
+            d[key] = disambiguate_url(d[key], location)
+
+        # DO NOT allow override of basic URLs, serialization, or exec_key
+        # JSON file takes top priority there
+        config.Session.key = asbytes(d['exec_key'])
+
+        config.EngineFactory.url = d['registration']
+
+        config.Session.packer = d['pack']
+        config.Session.unpacker = d['unpack']
+
+        self.log.debug("Config changed:")
+        self.log.debug("%r", config)
+        self.connection_info = d
 
     def bind_kernel(self, **kwargs):
         """Promote engine to listening kernel, accessible to frontends."""
@@ -320,7 +331,9 @@ class IPEngineApp(BaseParallelApplication):
         # shell_class = import_item(self.master_config.Global.shell_class)
         # print self.config
         try:
-            self.engine = EngineFactory(config=config, log=self.log)
+            self.engine = EngineFactory(config=config, log=self.log,
+                connection_info=self.connection_info,
+            )
         except:
             self.log.error("Couldn't start the Engine", exc_info=True)
             self.exit(1)
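As background for the disambiguate_url calls above and in the client hunks below, a simplified, assumption-labeled sketch (not code from this changeset) of what URL disambiguation amounts to here: each URL in the JSON file may be bound to a wildcard interface, and the connecting process rewrites it against the controller's 'location' before use. Roughly:

    def disambiguate_url_sketch(url, location=None):
        """Simplified illustration of the assumed behavior of
        IPython.parallel.util.disambiguate_url: rewrite a URL bound to all
        interfaces into an address the connecting process can actually reach."""
        proto, rest = url.split('://', 1)
        ip, port = rest.rsplit(':', 1)
        if ip in ('0.0.0.0', '*'):
            # fall back to the controller's known location, else loopback
            ip = location or '127.0.0.1'
        return '%s://%s:%s' % (proto, ip, port)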
@@ -217,7 +217,9 @@ class Client(HasTraits):
     Parameters
     ----------
 
-    url_or_file : bytes or unicode; zmq url or path to ipcontroller-client.json
+    url_file : str/unicode; path to ipcontroller-client.json
+        This JSON file should contain all the information needed to connect to a cluster,
+        and is likely the only argument needed.
         Connection information for the Hub's registration. If a json connector
         file is given, then likely no further configuration is necessary.
         [Default: use profile]
@@ -239,14 +241,6 @@ class Client(HasTraits):
         If specified, this will be relayed to the Session for configuration
     username : str
         set username for the session object
-    packer : str (import_string) or callable
-        Can be either the simple keyword 'json' or 'pickle', or an import_string to a
-        function to serialize messages. Must support same input as
-        JSON, and output must be bytes.
-        You can pass a callable directly as `pack`
-    unpacker : str (import_string) or callable
-        The inverse of packer. Only necessary if packer is specified as *not* one
-        of 'json' or 'pickle'.
 
     #-------------- ssh related args ----------------
     # These are args for configuring the ssh tunnel to be used
@@ -271,17 +265,6 @@ class Client(HasTraits):
         flag for whether to use paramiko instead of shell ssh for tunneling.
         [default: True on win32, False else]
 
-    ------- exec authentication args -------
-    If even localhost is untrusted, you can have some protection against
-    unauthorized execution by signing messages with HMAC digests.
-    Messages are still sent as cleartext, so if someone can snoop your
-    loopback traffic this will not protect your privacy, but will prevent
-    unauthorized execution.
-
-    exec_key : str
-        an authentication key or file containing a key
-        default: None
-
 
     Attributes
     ----------
@@ -378,8 +361,8 @@ class Client(HasTraits):
         # don't raise on positional args
         return HasTraits.__new__(self, **kw)
 
-    def __init__(self, url_or_file=None, profile=None, profile_dir=None, ipython_dir=None,
-            context=None, debug=False, exec_key=None,
+    def __init__(self, url_file=None, profile=None, profile_dir=None, ipython_dir=None,
+            context=None, debug=False,
             sshserver=None, sshkey=None, password=None, paramiko=None,
             timeout=10, **extra_args
             ):
@@ -391,38 +374,38 @@ class Client(HasTraits):
             context = zmq.Context.instance()
         self._context = context
         self._stop_spinning = Event()
+
+        if 'url_or_file' in extra_args:
+            url_file = extra_args['url_or_file']
+            warnings.warn("url_or_file arg no longer supported, use url_file", DeprecationWarning)
+
+        if url_file and util.is_url(url_file):
+            raise ValueError("single urls cannot be specified, url-files must be used.")
 
         self._setup_profile_dir(self.profile, profile_dir, ipython_dir)
+
         if self._cd is not None:
-            if url_or_file is None:
-                url_or_file = pjoin(self._cd.security_dir, 'ipcontroller-client.json')
-        if url_or_file is None:
+            if url_file is None:
+                url_file = pjoin(self._cd.security_dir, 'ipcontroller-client.json')
+        if url_file is None:
             raise ValueError(
                 "I can't find enough information to connect to a hub!"
-                " Please specify at least one of url_or_file or profile."
+                " Please specify at least one of url_file or profile."
             )
 
-        if not util.is_url(url_or_file):
-            # it's not a url, try for a file
-            if not os.path.exists(url_or_file):
-                if self._cd:
-                    url_or_file = os.path.join(self._cd.security_dir, url_or_file)
-                if not os.path.exists(url_or_file):
-                    raise IOError("Connection file not found: %r" % url_or_file)
-            with open(url_or_file) as f:
-                cfg = json.loads(f.read())
-        else:
-            cfg = {'url':url_or_file}
+        with open(url_file) as f:
+            cfg = json.load(f)
+
+        self._task_scheme = cfg['task_scheme']
 
         # sync defaults from args, json:
         if sshserver:
             cfg['ssh'] = sshserver
-        if exec_key:
-            cfg['exec_key'] = exec_key
-        exec_key = cfg['exec_key']
+
         location = cfg.setdefault('location', None)
-        cfg['url'] = util.disambiguate_url(cfg['url'], location)
-        url = cfg['url']
+        for key in ('control', 'task', 'mux', 'notification', 'registration'):
+            cfg[key] = util.disambiguate_url(cfg[key], location)
+        url = cfg['registration']
         proto,addr,port = util.split_url(url)
         if location is not None and addr == '127.0.0.1':
             # location specified, and connection is expected to be local
@@ -457,12 +440,10 @@ class Client(HasTraits):
             ssh_kwargs = dict(keyfile=sshkey, password=password, paramiko=paramiko)
 
         # configure and construct the session
-        if exec_key is not None:
-            if os.path.isfile(exec_key):
-                extra_args['keyfile'] = exec_key
-            else:
-                exec_key = cast_bytes(exec_key)
-                extra_args['key'] = exec_key
+        extra_args['packer'] = cfg['pack']
+        extra_args['unpacker'] = cfg['unpack']
+        extra_args['key'] = cfg['exec_key']
+
         self.session = Session(**extra_args)
 
         self._query_socket = self._context.socket(zmq.DEALER)
@@ -583,7 +564,7 @@ class Client(HasTraits):
         self._connected=True
 
         def connect_socket(s, url):
-            url = util.disambiguate_url(url, self._config['location'])
+            # url = util.disambiguate_url(url, self._config['location'])
             if self._ssh:
                 return tunnel.tunnel_connection(s, url, sshserver, **ssh_kwargs)
             else:
@@ -600,30 +581,28 @@ class Client(HasTraits):
         idents,msg = self.session.recv(self._query_socket,mode=0)
         if self.debug:
             pprint(msg)
-        msg = Message(msg)
-        content = msg.content
-        self._config['registration'] = dict(content)
-        if content.status == 'ok':
-            ident = self.session.bsession
-            if content.mux:
-                self._mux_socket = self._context.socket(zmq.DEALER)
-                connect_socket(self._mux_socket, content.mux)
-            if content.task:
-                self._task_scheme, task_addr = content.task
-                self._task_socket = self._context.socket(zmq.DEALER)
-                connect_socket(self._task_socket, task_addr)
-            if content.notification:
-                self._notification_socket = self._context.socket(zmq.SUB)
-                connect_socket(self._notification_socket, content.notification)
-                self._notification_socket.setsockopt(zmq.SUBSCRIBE, b'')
-            if content.control:
-                self._control_socket = self._context.socket(zmq.DEALER)
-                connect_socket(self._control_socket, content.control)
-            if content.iopub:
-                self._iopub_socket = self._context.socket(zmq.SUB)
-                self._iopub_socket.setsockopt(zmq.SUBSCRIBE, b'')
-                connect_socket(self._iopub_socket, content.iopub)
-            self._update_engines(dict(content.engines))
+        content = msg['content']
+        # self._config['registration'] = dict(content)
+        cfg = self._config
+        if content['status'] == 'ok':
+            self._mux_socket = self._context.socket(zmq.DEALER)
+            connect_socket(self._mux_socket, cfg['mux'])
+
+            self._task_socket = self._context.socket(zmq.DEALER)
+            connect_socket(self._task_socket, cfg['task'])
+
+            self._notification_socket = self._context.socket(zmq.SUB)
+            self._notification_socket.setsockopt(zmq.SUBSCRIBE, b'')
+            connect_socket(self._notification_socket, cfg['notification'])
+
+            self._control_socket = self._context.socket(zmq.DEALER)
+            connect_socket(self._control_socket, cfg['control'])
+
+            self._iopub_socket = self._context.socket(zmq.SUB)
+            self._iopub_socket.setsockopt(zmq.SUBSCRIBE, b'')
+            connect_socket(self._iopub_socket, cfg['iopub'])
+
+            self._update_engines(dict(content['engines']))
         else:
             self._connected = False
             raise Exception("Failed to connect!")
@@ -239,30 +239,61 @@ class HubFactory(RegistrationFactory):
         ctx = self.context
         loop = self.loop
 
+        try:
+            scheme = self.config.TaskScheduler.scheme_name
+        except AttributeError:
+            from .scheduler import TaskScheduler
+            scheme = TaskScheduler.scheme_name.get_default_value()
+
+        # build connection dicts
+        engine = self.engine_info = {
+            'registration' : engine_iface % self.regport,
+            'control' : engine_iface % self.control[1],
+            'mux' : engine_iface % self.mux[1],
+            'hb_ping' : engine_iface % self.hb[0],
+            'hb_pong' : engine_iface % self.hb[1],
+            'task' : engine_iface % self.task[1],
+            'iopub' : engine_iface % self.iopub[1],
+        }
+
+        client = self.client_info = {
+            'registration' : client_iface % self.regport,
+            'control' : client_iface % self.control[0],
+            'mux' : client_iface % self.mux[0],
+            'task' : client_iface % self.task[0],
+            'task_scheme' : scheme,
+            'iopub' : client_iface % self.iopub[0],
+            'notification' : client_iface % self.notifier_port,
+        }
+
+        self.log.debug("Hub engine addrs: %s", self.engine_info)
+        self.log.debug("Hub client addrs: %s", self.client_info)
+
         # Registrar socket
         q = ZMQStream(ctx.socket(zmq.ROUTER), loop)
-        q.bind(client_iface % self.regport)
+        q.bind(client['registration'])
         self.log.info("Hub listening on %s for registration.", client_iface % self.regport)
         if self.client_ip != self.engine_ip:
-            q.bind(engine_iface % self.regport)
+            q.bind(engine['registration'])
             self.log.info("Hub listening on %s for registration.", engine_iface % self.regport)
 
         ### Engine connections ###
 
         # heartbeat
         hpub = ctx.socket(zmq.PUB)
-        hpub.bind(engine_iface % self.hb[0])
+        hpub.bind(engine['hb_ping'])
         hrep = ctx.socket(zmq.ROUTER)
-        hrep.bind(engine_iface % self.hb[1])
+        hrep.bind(engine['hb_pong'])
         self.heartmonitor = HeartMonitor(loop=loop, config=self.config, log=self.log,
                                 pingstream=ZMQStream(hpub,loop),
                                 pongstream=ZMQStream(hrep,loop)
                             )
 
         ### Client connections ###
+
         # Notifier socket
         n = ZMQStream(ctx.socket(zmq.PUB), loop)
-        n.bind(client_iface%self.notifier_port)
+        n.bind(client['notification'])
 
         ### build and launch the queues ###
 
@@ -279,35 +310,10 @@ class HubFactory(RegistrationFactory):
         self.db = import_item(str(db_class))(session=self.session.session,
                                             config=self.config, log=self.log)
         time.sleep(.25)
-        try:
-            scheme = self.config.TaskScheduler.scheme_name
-        except AttributeError:
-            from .scheduler import TaskScheduler
-            scheme = TaskScheduler.scheme_name.get_default_value()
-        # build connection dicts
-        self.engine_info = {
-            'control' : engine_iface%self.control[1],
-            'mux': engine_iface%self.mux[1],
-            'heartbeat': (engine_iface%self.hb[0], engine_iface%self.hb[1]),
-            'task' : engine_iface%self.task[1],
-            'iopub' : engine_iface%self.iopub[1],
-            # 'monitor' : engine_iface%self.mon_port,
-            }
-
-        self.client_info = {
-            'control' : client_iface%self.control[0],
-            'mux': client_iface%self.mux[0],
-            'task' : (scheme, client_iface%self.task[0]),
-            'iopub' : client_iface%self.iopub[0],
-            'notification': client_iface%self.notifier_port
-            }
-        self.log.debug("Hub engine addrs: %s", self.engine_info)
-        self.log.debug("Hub client addrs: %s", self.client_info)
 
         # resubmit stream
         r = ZMQStream(ctx.socket(zmq.DEALER), loop)
-        url = util.disambiguate_url(self.client_info['task'][-1])
-        r.setsockopt(zmq.IDENTITY, self.session.bsession)
+        url = util.disambiguate_url(self.client_info['task'])
         r.connect(url)
 
         self.hub = Hub(loop=loop, session=self.session, monitor=sub, heartmonitor=self.heartmonitor,
@@ -384,8 +390,8 @@ class Hub(SessionFactory):
 
         # validate connection dicts:
         for k,v in self.client_info.iteritems():
-            if k == 'task':
-                util.validate_url_container(v[1])
+            if k == 'task_scheme':
+                continue
             else:
                 util.validate_url_container(v)
         # util.validate_url_container(self.client_info)
@@ -865,7 +871,6 @@ class Hub(SessionFactory):
         """Reply with connection addresses for clients."""
         self.log.info("client::client %r connected", client_id)
         content = dict(status='ok')
-        content.update(self.client_info)
         jsonable = {}
         for k,v in self.keytable.iteritems():
             if v not in self.dead_engines:
@@ -891,7 +896,6 @@ class Hub(SessionFactory):
         self.log.debug("registration::register_engine(%i, %r, %r, %r)", eid, queue, reg, heart)
 
         content = dict(id=eid,status='ok')
-        content.update(self.engine_info)
         # check if requesting available IDs:
         if queue in self.by_ident:
             try:
@@ -50,7 +50,7 @@ class EngineFactory(RegistrationFactory):
         help="""The location (an IP address) of the controller. This is
         used for disambiguating URLs, to determine whether
         loopback should be used to connect or the public address.""")
-    timeout=CFloat(2,config=True,
+    timeout=CFloat(5, config=True,
         help="""The time (in seconds) to wait for the Controller to respond
         to registration requests before giving up.""")
     sshserver=Unicode(config=True,
@@ -61,10 +61,11 @@ class EngineFactory(RegistrationFactory):
         help="""Whether to use paramiko instead of openssh for tunnels.""")
 
     # not configurable:
-    user_ns=Dict()
-    id=Integer(allow_none=True)
-    registrar=Instance('zmq.eventloop.zmqstream.ZMQStream')
-    kernel=Instance(Kernel)
+    connection_info = Dict()
+    user_ns = Dict()
+    id = Integer(allow_none=True)
+    registrar = Instance('zmq.eventloop.zmqstream.ZMQStream')
+    kernel = Instance(Kernel)
 
     bident = CBytes()
     ident = Unicode()
@@ -96,7 +97,7 @@ class EngineFactory(RegistrationFactory):
         def connect(s, url):
             url = disambiguate_url(url, self.location)
             if self.using_ssh:
-                self.log.debug("Tunneling connection to %s via %s"%(url, self.sshserver))
+                self.log.debug("Tunneling connection to %s via %s", url, self.sshserver)
                 return tunnel.tunnel_connection(s, url, self.sshserver,
                             keyfile=self.sshkey, paramiko=self.paramiko,
                             password=password,
@@ -108,12 +109,12 @@ class EngineFactory(RegistrationFactory):
             """like connect, but don't complete the connection (for use by heartbeat)"""
             url = disambiguate_url(url, self.location)
             if self.using_ssh:
-                self.log.debug("Tunneling connection to %s via %s"%(url, self.sshserver))
+                self.log.debug("Tunneling connection to %s via %s", url, self.sshserver)
                 url,tunnelobj = tunnel.open_tunnel(url, self.sshserver,
                             keyfile=self.sshkey, paramiko=self.paramiko,
                             password=password,
                 )
-            return url
+            return str(url)
         return connect, maybe_tunnel
 
     def register(self):
@@ -131,7 +132,7 @@ class EngineFactory(RegistrationFactory):
         content = dict(queue=self.ident, heartbeat=self.ident, control=self.ident)
         self.registrar.on_recv(lambda msg: self.complete_registration(msg, connect, maybe_tunnel))
         # print (self.session.key)
-        self.session.send(self.registrar, "registration_request",content=content)
+        self.session.send(self.registrar, "registration_request", content=content)
 
     def complete_registration(self, msg, connect, maybe_tunnel):
         # print msg
@@ -140,50 +141,39 @@ class EngineFactory(RegistrationFactory):
         loop = self.loop
         identity = self.bident
         idents,msg = self.session.feed_identities(msg)
-        msg = Message(self.session.unserialize(msg))
-
-        if msg.content.status == 'ok':
-            self.id = int(msg.content.id)
+        msg = self.session.unserialize(msg)
+        content = msg['content']
+        info = self.connection_info
+
+        if content['status'] == 'ok':
+            self.id = int(content['id'])
 
             # launch heartbeat
-            hb_addrs = msg.content.heartbeat
-
             # possibly forward hb ports with tunnels
-            hb_addrs = [ maybe_tunnel(addr) for addr in hb_addrs ]
-            heart = Heart(*map(str, hb_addrs), heart_id=identity)
+            hb_ping = maybe_tunnel(info['hb_ping'])
+            hb_pong = maybe_tunnel(info['hb_pong'])
+
+            heart = Heart(hb_ping, hb_pong, heart_id=identity)
             heart.start()
 
-            # create Shell Streams (MUX, Task, etc.):
-            queue_addr = msg.content.mux
-            shell_addrs = [ str(queue_addr) ]
-            task_addr = msg.content.task
-            if task_addr:
-                shell_addrs.append(str(task_addr))
-
-            # Uncomment this to go back to two-socket model
-            # shell_streams = []
-            # for addr in shell_addrs:
-            #     stream = zmqstream.ZMQStream(ctx.socket(zmq.ROUTER), loop)
-            #     stream.setsockopt(zmq.IDENTITY, identity)
-            #     stream.connect(disambiguate_url(addr, self.location))
-            #     shell_streams.append(stream)
-
-            # Now use only one shell stream for mux and tasks
+            # create Shell Connections (MUX, Task, etc.):
+            shell_addrs = map(str, [info['mux'], info['task']])
+
+            # Use only one shell stream for mux and tasks
             stream = zmqstream.ZMQStream(ctx.socket(zmq.ROUTER), loop)
             stream.setsockopt(zmq.IDENTITY, identity)
             shell_streams = [stream]
             for addr in shell_addrs:
                 connect(stream, addr)
-            # end single stream-socket
 
             # control stream:
-            control_addr = str(msg.content.control)
+            control_addr = str(info['control'])
             control_stream = zmqstream.ZMQStream(ctx.socket(zmq.ROUTER), loop)
             control_stream.setsockopt(zmq.IDENTITY, identity)
             connect(control_stream, control_addr)
 
             # create iopub stream:
-            iopub_addr = msg.content.iopub
+            iopub_addr = info['iopub']
             iopub_socket = ctx.socket(zmq.PUB)
             iopub_socket.setsockopt(zmq.IDENTITY, identity)
             connect(iopub_socket, iopub_addr)
@@ -257,9 +257,11 @@ class Session(Configurable):
         if new.lower() == 'json':
             self.pack = json_packer
             self.unpack = json_unpacker
+            self.unpacker = new
         elif new.lower() == 'pickle':
             self.pack = pickle_packer
             self.unpack = pickle_unpacker
+            self.unpacker = new
         else:
             self.pack = import_item(str(new))
 
@@ -270,9 +272,11 @@ class Session(Configurable):
         if new.lower() == 'json':
             self.pack = json_packer
             self.unpack = json_unpacker
+            self.packer = new
         elif new.lower() == 'pickle':
             self.pack = pickle_packer
             self.unpack = pickle_unpacker
+            self.packer = new
         else:
             self.unpack = import_item(str(new))
 