Show More
@@ -209,7 +209,7 b' class IPControllerApp(BaseParallelApplication):' | |||||
209 | def save_connection_dict(self, fname, cdict): |
|
209 | def save_connection_dict(self, fname, cdict): | |
210 | """save a connection dict to json file.""" |
|
210 | """save a connection dict to json file.""" | |
211 | c = self.config |
|
211 | c = self.config | |
212 |
url = cdict[' |
|
212 | url = cdict['registration'] | |
213 | location = cdict['location'] |
|
213 | location = cdict['location'] | |
214 | if not location: |
|
214 | if not location: | |
215 | try: |
|
215 | try: | |
@@ -314,15 +314,21 b' class IPControllerApp(BaseParallelApplication):' | |||||
314 | if self.write_connection_files: |
|
314 | if self.write_connection_files: | |
315 | # save to new json config files |
|
315 | # save to new json config files | |
316 | f = self.factory |
|
316 | f = self.factory | |
317 | cdict = {'exec_key' : f.session.key.decode('ascii'), |
|
317 | base = { | |
318 | 'ssh' : self.ssh_server, |
|
318 | 'exec_key' : f.session.key.decode('ascii'), | |
319 | 'url' : "%s://%s:%s"%(f.client_transport, f.client_ip, f.regport), |
|
319 | 'location' : self.location, | |
320 | 'location' : self.location |
|
320 | 'pack' : f.session.packer, | |
|
321 | 'unpack' : f.session.unpacker, | |||
321 |
|
|
322 | } | |
|
323 | ||||
|
324 | cdict = {'ssh' : self.ssh_server} | |||
|
325 | cdict.update(f.client_info) | |||
|
326 | cdict.update(base) | |||
322 | self.save_connection_dict(self.client_json_file, cdict) |
|
327 | self.save_connection_dict(self.client_json_file, cdict) | |
323 |
|
|
328 | ||
324 | edict['url']="%s://%s:%s"%((f.client_transport, f.client_ip, f.regport)) |
|
329 | edict = {'ssh' : self.engine_ssh_server} | |
325 |
edict |
|
330 | edict.update(f.engine_info) | |
|
331 | edict.update(base) | |||
326 | self.save_connection_dict(self.engine_json_file, edict) |
|
332 | self.save_connection_dict(self.engine_json_file, edict) | |
327 |
|
333 | |||
328 | def init_schedulers(self): |
|
334 | def init_schedulers(self): | |
@@ -379,7 +385,7 b' class IPControllerApp(BaseParallelApplication):' | |||||
379 |
|
385 | |||
380 | else: |
|
386 | else: | |
381 | self.log.info("task::using Python %s Task scheduler"%scheme) |
|
387 | self.log.info("task::using Python %s Task scheduler"%scheme) | |
382 |
sargs = (hub.client_info['task'] |
|
388 | sargs = (hub.client_info['task'], hub.engine_info['task'], | |
383 | monitor_url, disambiguate_url(hub.client_info['notification'])) |
|
389 | monitor_url, disambiguate_url(hub.client_info['notification'])) | |
384 | kwargs = dict(logname='scheduler', loglevel=self.log_level, |
|
390 | kwargs = dict(logname='scheduler', loglevel=self.log_level, | |
385 | log_url = self.log_url, config=dict(self.config)) |
|
391 | log_url = self.log_url, config=dict(self.config)) |
@@ -211,24 +211,35 b' class IPEngineApp(BaseParallelApplication):' | |||||
211 | with open(self.url_file) as f: |
|
211 | with open(self.url_file) as f: | |
212 | d = json.loads(f.read()) |
|
212 | d = json.loads(f.read()) | |
213 |
|
213 | |||
214 | if 'exec_key' in d: |
|
214 | # allow hand-override of location for disambiguation | |
215 | config.Session.key = cast_bytes(d['exec_key']) |
|
215 | # and ssh-server | |
216 |
|
||||
217 | try: |
|
216 | try: | |
218 | config.EngineFactory.location |
|
217 | config.EngineFactory.location | |
219 | except AttributeError: |
|
218 | except AttributeError: | |
220 | config.EngineFactory.location = d['location'] |
|
219 | config.EngineFactory.location = d['location'] | |
221 |
|
220 | |||
222 | d['url'] = disambiguate_url(d['url'], config.EngineFactory.location) |
|
|||
223 | try: |
|
|||
224 | config.EngineFactory.url |
|
|||
225 | except AttributeError: |
|
|||
226 | config.EngineFactory.url = d['url'] |
|
|||
227 |
|
||||
228 | try: |
|
221 | try: | |
229 | config.EngineFactory.sshserver |
|
222 | config.EngineFactory.sshserver | |
230 | except AttributeError: |
|
223 | except AttributeError: | |
231 |
config.EngineFactory.sshserver = d |
|
224 | config.EngineFactory.sshserver = d.get('ssh') | |
|
225 | ||||
|
226 | location = config.EngineFactory.location | |||
|
227 | ||||
|
228 | for key in ('registration', 'hb_ping', 'hb_pong', 'mux', 'task', 'control'): | |||
|
229 | d[key] = disambiguate_url(d[key], location) | |||
|
230 | ||||
|
231 | # DO NOT allow override of basic URLs, serialization, or exec_key | |||
|
232 | # JSON file takes top priority there | |||
|
233 | config.Session.key = asbytes(d['exec_key']) | |||
|
234 | ||||
|
235 | config.EngineFactory.url = d['registration'] | |||
|
236 | ||||
|
237 | config.Session.packer = d['pack'] | |||
|
238 | config.Session.unpacker = d['unpack'] | |||
|
239 | ||||
|
240 | self.log.debug("Config changed:") | |||
|
241 | self.log.debug("%r", config) | |||
|
242 | self.connection_info = d | |||
232 |
|
243 | |||
233 | def bind_kernel(self, **kwargs): |
|
244 | def bind_kernel(self, **kwargs): | |
234 | """Promote engine to listening kernel, accessible to frontends.""" |
|
245 | """Promote engine to listening kernel, accessible to frontends.""" | |
@@ -320,7 +331,9 b' class IPEngineApp(BaseParallelApplication):' | |||||
320 | # shell_class = import_item(self.master_config.Global.shell_class) |
|
331 | # shell_class = import_item(self.master_config.Global.shell_class) | |
321 | # print self.config |
|
332 | # print self.config | |
322 | try: |
|
333 | try: | |
323 |
self.engine = EngineFactory(config=config, log=self.log |
|
334 | self.engine = EngineFactory(config=config, log=self.log, | |
|
335 | connection_info=self.connection_info, | |||
|
336 | ) | |||
324 | except: |
|
337 | except: | |
325 | self.log.error("Couldn't start the Engine", exc_info=True) |
|
338 | self.log.error("Couldn't start the Engine", exc_info=True) | |
326 | self.exit(1) |
|
339 | self.exit(1) |
@@ -217,7 +217,9 b' class Client(HasTraits):' | |||||
217 | Parameters |
|
217 | Parameters | |
218 | ---------- |
|
218 | ---------- | |
219 |
|
219 | |||
220 |
url_ |
|
220 | url_file : str/unicode; path to ipcontroller-client.json | |
|
221 | This JSON file should contain all the information needed to connect to a cluster, | |||
|
222 | and is likely the only argument needed. | |||
221 | Connection information for the Hub's registration. If a json connector |
|
223 | Connection information for the Hub's registration. If a json connector | |
222 | file is given, then likely no further configuration is necessary. |
|
224 | file is given, then likely no further configuration is necessary. | |
223 | [Default: use profile] |
|
225 | [Default: use profile] | |
@@ -239,14 +241,6 b' class Client(HasTraits):' | |||||
239 | If specified, this will be relayed to the Session for configuration |
|
241 | If specified, this will be relayed to the Session for configuration | |
240 | username : str |
|
242 | username : str | |
241 | set username for the session object |
|
243 | set username for the session object | |
242 | packer : str (import_string) or callable |
|
|||
243 | Can be either the simple keyword 'json' or 'pickle', or an import_string to a |
|
|||
244 | function to serialize messages. Must support same input as |
|
|||
245 | JSON, and output must be bytes. |
|
|||
246 | You can pass a callable directly as `pack` |
|
|||
247 | unpacker : str (import_string) or callable |
|
|||
248 | The inverse of packer. Only necessary if packer is specified as *not* one |
|
|||
249 | of 'json' or 'pickle'. |
|
|||
250 |
|
244 | |||
251 | #-------------- ssh related args ---------------- |
|
245 | #-------------- ssh related args ---------------- | |
252 | # These are args for configuring the ssh tunnel to be used |
|
246 | # These are args for configuring the ssh tunnel to be used | |
@@ -271,17 +265,6 b' class Client(HasTraits):' | |||||
271 | flag for whether to use paramiko instead of shell ssh for tunneling. |
|
265 | flag for whether to use paramiko instead of shell ssh for tunneling. | |
272 | [default: True on win32, False else] |
|
266 | [default: True on win32, False else] | |
273 |
|
267 | |||
274 | ------- exec authentication args ------- |
|
|||
275 | If even localhost is untrusted, you can have some protection against |
|
|||
276 | unauthorized execution by signing messages with HMAC digests. |
|
|||
277 | Messages are still sent as cleartext, so if someone can snoop your |
|
|||
278 | loopback traffic this will not protect your privacy, but will prevent |
|
|||
279 | unauthorized execution. |
|
|||
280 |
|
||||
281 | exec_key : str |
|
|||
282 | an authentication key or file containing a key |
|
|||
283 | default: None |
|
|||
284 |
|
||||
285 |
|
268 | |||
286 | Attributes |
|
269 | Attributes | |
287 | ---------- |
|
270 | ---------- | |
@@ -378,8 +361,8 b' class Client(HasTraits):' | |||||
378 | # don't raise on positional args |
|
361 | # don't raise on positional args | |
379 | return HasTraits.__new__(self, **kw) |
|
362 | return HasTraits.__new__(self, **kw) | |
380 |
|
363 | |||
381 |
def __init__(self, url_ |
|
364 | def __init__(self, url_file=None, profile=None, profile_dir=None, ipython_dir=None, | |
382 |
context=None, debug=False, |
|
365 | context=None, debug=False, | |
383 | sshserver=None, sshkey=None, password=None, paramiko=None, |
|
366 | sshserver=None, sshkey=None, password=None, paramiko=None, | |
384 | timeout=10, **extra_args |
|
367 | timeout=10, **extra_args | |
385 | ): |
|
368 | ): | |
@@ -392,37 +375,37 b' class Client(HasTraits):' | |||||
392 | self._context = context |
|
375 | self._context = context | |
393 | self._stop_spinning = Event() |
|
376 | self._stop_spinning = Event() | |
394 |
|
377 | |||
|
378 | if 'url_or_file' in extra_args: | |||
|
379 | url_file = extra_args['url_or_file'] | |||
|
380 | warnings.warn("url_or_file arg no longer supported, use url_file", DeprecationWarning) | |||
|
381 | ||||
|
382 | if url_file and util.is_url(url_file): | |||
|
383 | raise ValueError("single urls cannot be specified, url-files must be used.") | |||
|
384 | ||||
395 | self._setup_profile_dir(self.profile, profile_dir, ipython_dir) |
|
385 | self._setup_profile_dir(self.profile, profile_dir, ipython_dir) | |
|
386 | ||||
396 | if self._cd is not None: |
|
387 | if self._cd is not None: | |
397 |
if url_ |
|
388 | if url_file is None: | |
398 |
url_ |
|
389 | url_file = pjoin(self._cd.security_dir, 'ipcontroller-client.json') | |
399 |
if url_ |
|
390 | if url_file is None: | |
400 | raise ValueError( |
|
391 | raise ValueError( | |
401 | "I can't find enough information to connect to a hub!" |
|
392 | "I can't find enough information to connect to a hub!" | |
402 |
" Please specify at least one of url_ |
|
393 | " Please specify at least one of url_file or profile." | |
403 | ) |
|
394 | ) | |
404 |
|
395 | |||
405 |
|
|
396 | with open(url_file) as f: | |
406 | # it's not a url, try for a file |
|
397 | cfg = json.load(f) | |
407 | if not os.path.exists(url_or_file): |
|
398 | ||
408 | if self._cd: |
|
399 | self._task_scheme = cfg['task_scheme'] | |
409 | url_or_file = os.path.join(self._cd.security_dir, url_or_file) |
|
|||
410 | if not os.path.exists(url_or_file): |
|
|||
411 | raise IOError("Connection file not found: %r" % url_or_file) |
|
|||
412 | with open(url_or_file) as f: |
|
|||
413 | cfg = json.loads(f.read()) |
|
|||
414 | else: |
|
|||
415 | cfg = {'url':url_or_file} |
|
|||
416 |
|
400 | |||
417 | # sync defaults from args, json: |
|
401 | # sync defaults from args, json: | |
418 | if sshserver: |
|
402 | if sshserver: | |
419 | cfg['ssh'] = sshserver |
|
403 | cfg['ssh'] = sshserver | |
420 | if exec_key: |
|
404 | ||
421 | cfg['exec_key'] = exec_key |
|
|||
422 | exec_key = cfg['exec_key'] |
|
|||
423 | location = cfg.setdefault('location', None) |
|
405 | location = cfg.setdefault('location', None) | |
424 | cfg['url'] = util.disambiguate_url(cfg['url'], location) |
|
406 | for key in ('control', 'task', 'mux', 'notification', 'registration'): | |
425 | url = cfg['url'] |
|
407 | cfg[key] = util.disambiguate_url(cfg[key], location) | |
|
408 | url = cfg['registration'] | |||
426 | proto,addr,port = util.split_url(url) |
|
409 | proto,addr,port = util.split_url(url) | |
427 | if location is not None and addr == '127.0.0.1': |
|
410 | if location is not None and addr == '127.0.0.1': | |
428 | # location specified, and connection is expected to be local |
|
411 | # location specified, and connection is expected to be local | |
@@ -457,12 +440,10 b' class Client(HasTraits):' | |||||
457 | ssh_kwargs = dict(keyfile=sshkey, password=password, paramiko=paramiko) |
|
440 | ssh_kwargs = dict(keyfile=sshkey, password=password, paramiko=paramiko) | |
458 |
|
441 | |||
459 | # configure and construct the session |
|
442 | # configure and construct the session | |
460 | if exec_key is not None: |
|
443 | extra_args['packer'] = cfg['pack'] | |
461 | if os.path.isfile(exec_key): |
|
444 | extra_args['unpacker'] = cfg['unpack'] | |
462 |
|
|
445 | extra_args['key'] = cfg['exec_key'] | |
463 |
|
|
446 | ||
464 | exec_key = cast_bytes(exec_key) |
|
|||
465 | extra_args['key'] = exec_key |
|
|||
466 | self.session = Session(**extra_args) |
|
447 | self.session = Session(**extra_args) | |
467 |
|
448 | |||
468 | self._query_socket = self._context.socket(zmq.DEALER) |
|
449 | self._query_socket = self._context.socket(zmq.DEALER) | |
@@ -583,7 +564,7 b' class Client(HasTraits):' | |||||
583 | self._connected=True |
|
564 | self._connected=True | |
584 |
|
565 | |||
585 | def connect_socket(s, url): |
|
566 | def connect_socket(s, url): | |
586 | url = util.disambiguate_url(url, self._config['location']) |
|
567 | # url = util.disambiguate_url(url, self._config['location']) | |
587 | if self._ssh: |
|
568 | if self._ssh: | |
588 | return tunnel.tunnel_connection(s, url, sshserver, **ssh_kwargs) |
|
569 | return tunnel.tunnel_connection(s, url, sshserver, **ssh_kwargs) | |
589 | else: |
|
570 | else: | |
@@ -600,30 +581,28 b' class Client(HasTraits):' | |||||
600 | idents,msg = self.session.recv(self._query_socket,mode=0) |
|
581 | idents,msg = self.session.recv(self._query_socket,mode=0) | |
601 | if self.debug: |
|
582 | if self.debug: | |
602 | pprint(msg) |
|
583 | pprint(msg) | |
603 | msg = Message(msg) |
|
584 | content = msg['content'] | |
604 | content = msg.content |
|
585 | # self._config['registration'] = dict(content) | |
605 | self._config['registration'] = dict(content) |
|
586 | cfg = self._config | |
606 |
if content |
|
587 | if content['status'] == 'ok': | |
607 | ident = self.session.bsession |
|
|||
608 | if content.mux: |
|
|||
609 |
|
|
588 | self._mux_socket = self._context.socket(zmq.DEALER) | |
610 |
|
|
589 | connect_socket(self._mux_socket, cfg['mux']) | |
611 | if content.task: |
|
590 | ||
612 | self._task_scheme, task_addr = content.task |
|
|||
613 |
|
|
591 | self._task_socket = self._context.socket(zmq.DEALER) | |
614 |
|
|
592 | connect_socket(self._task_socket, cfg['task']) | |
615 | if content.notification: |
|
593 | ||
616 |
|
|
594 | self._notification_socket = self._context.socket(zmq.SUB) | |
617 | connect_socket(self._notification_socket, content.notification) |
|
|||
618 |
|
|
595 | self._notification_socket.setsockopt(zmq.SUBSCRIBE, b'') | |
619 | if content.control: |
|
596 | connect_socket(self._notification_socket, cfg['notification']) | |
|
597 | ||||
620 |
|
|
598 | self._control_socket = self._context.socket(zmq.DEALER) | |
621 |
|
|
599 | connect_socket(self._control_socket, cfg['control']) | |
622 | if content.iopub: |
|
600 | ||
623 |
|
|
601 | self._iopub_socket = self._context.socket(zmq.SUB) | |
624 |
|
|
602 | self._iopub_socket.setsockopt(zmq.SUBSCRIBE, b'') | |
625 |
|
|
603 | connect_socket(self._iopub_socket, cfg['iopub']) | |
626 | self._update_engines(dict(content.engines)) |
|
604 | ||
|
605 | self._update_engines(dict(content['engines'])) | |||
627 | else: |
|
606 | else: | |
628 | self._connected = False |
|
607 | self._connected = False | |
629 | raise Exception("Failed to connect!") |
|
608 | raise Exception("Failed to connect!") |
@@ -239,30 +239,61 b' class HubFactory(RegistrationFactory):' | |||||
239 | ctx = self.context |
|
239 | ctx = self.context | |
240 | loop = self.loop |
|
240 | loop = self.loop | |
241 |
|
241 | |||
|
242 | try: | |||
|
243 | scheme = self.config.TaskScheduler.scheme_name | |||
|
244 | except AttributeError: | |||
|
245 | from .scheduler import TaskScheduler | |||
|
246 | scheme = TaskScheduler.scheme_name.get_default_value() | |||
|
247 | ||||
|
248 | # build connection dicts | |||
|
249 | engine = self.engine_info = { | |||
|
250 | 'registration' : engine_iface % self.regport, | |||
|
251 | 'control' : engine_iface % self.control[1], | |||
|
252 | 'mux' : engine_iface % self.mux[1], | |||
|
253 | 'hb_ping' : engine_iface % self.hb[0], | |||
|
254 | 'hb_pong' : engine_iface % self.hb[1], | |||
|
255 | 'task' : engine_iface % self.task[1], | |||
|
256 | 'iopub' : engine_iface % self.iopub[1], | |||
|
257 | } | |||
|
258 | ||||
|
259 | client = self.client_info = { | |||
|
260 | 'registration' : client_iface % self.regport, | |||
|
261 | 'control' : client_iface % self.control[0], | |||
|
262 | 'mux' : client_iface % self.mux[0], | |||
|
263 | 'task' : client_iface % self.task[0], | |||
|
264 | 'task_scheme' : scheme, | |||
|
265 | 'iopub' : client_iface % self.iopub[0], | |||
|
266 | 'notification' : client_iface % self.notifier_port, | |||
|
267 | } | |||
|
268 | ||||
|
269 | self.log.debug("Hub engine addrs: %s", self.engine_info) | |||
|
270 | self.log.debug("Hub client addrs: %s", self.client_info) | |||
|
271 | ||||
242 | # Registrar socket |
|
272 | # Registrar socket | |
243 | q = ZMQStream(ctx.socket(zmq.ROUTER), loop) |
|
273 | q = ZMQStream(ctx.socket(zmq.ROUTER), loop) | |
244 |
q.bind(client |
|
274 | q.bind(client['registration']) | |
245 | self.log.info("Hub listening on %s for registration.", client_iface % self.regport) |
|
275 | self.log.info("Hub listening on %s for registration.", client_iface % self.regport) | |
246 | if self.client_ip != self.engine_ip: |
|
276 | if self.client_ip != self.engine_ip: | |
247 |
q.bind(engine |
|
277 | q.bind(engine['registration']) | |
248 | self.log.info("Hub listening on %s for registration.", engine_iface % self.regport) |
|
278 | self.log.info("Hub listening on %s for registration.", engine_iface % self.regport) | |
249 |
|
279 | |||
250 | ### Engine connections ### |
|
280 | ### Engine connections ### | |
251 |
|
281 | |||
252 | # heartbeat |
|
282 | # heartbeat | |
253 | hpub = ctx.socket(zmq.PUB) |
|
283 | hpub = ctx.socket(zmq.PUB) | |
254 |
hpub.bind(engine |
|
284 | hpub.bind(engine['hb_ping']) | |
255 | hrep = ctx.socket(zmq.ROUTER) |
|
285 | hrep = ctx.socket(zmq.ROUTER) | |
256 |
hrep.bind(engine |
|
286 | hrep.bind(engine['hb_pong']) | |
257 | self.heartmonitor = HeartMonitor(loop=loop, config=self.config, log=self.log, |
|
287 | self.heartmonitor = HeartMonitor(loop=loop, config=self.config, log=self.log, | |
258 | pingstream=ZMQStream(hpub,loop), |
|
288 | pingstream=ZMQStream(hpub,loop), | |
259 | pongstream=ZMQStream(hrep,loop) |
|
289 | pongstream=ZMQStream(hrep,loop) | |
260 | ) |
|
290 | ) | |
261 |
|
291 | |||
262 | ### Client connections ### |
|
292 | ### Client connections ### | |
|
293 | ||||
263 | # Notifier socket |
|
294 | # Notifier socket | |
264 | n = ZMQStream(ctx.socket(zmq.PUB), loop) |
|
295 | n = ZMQStream(ctx.socket(zmq.PUB), loop) | |
265 |
n.bind(client |
|
296 | n.bind(client['notification']) | |
266 |
|
297 | |||
267 | ### build and launch the queues ### |
|
298 | ### build and launch the queues ### | |
268 |
|
299 | |||
@@ -279,35 +310,10 b' class HubFactory(RegistrationFactory):' | |||||
279 | self.db = import_item(str(db_class))(session=self.session.session, |
|
310 | self.db = import_item(str(db_class))(session=self.session.session, | |
280 | config=self.config, log=self.log) |
|
311 | config=self.config, log=self.log) | |
281 | time.sleep(.25) |
|
312 | time.sleep(.25) | |
282 | try: |
|
|||
283 | scheme = self.config.TaskScheduler.scheme_name |
|
|||
284 | except AttributeError: |
|
|||
285 | from .scheduler import TaskScheduler |
|
|||
286 | scheme = TaskScheduler.scheme_name.get_default_value() |
|
|||
287 | # build connection dicts |
|
|||
288 | self.engine_info = { |
|
|||
289 | 'control' : engine_iface%self.control[1], |
|
|||
290 | 'mux': engine_iface%self.mux[1], |
|
|||
291 | 'heartbeat': (engine_iface%self.hb[0], engine_iface%self.hb[1]), |
|
|||
292 | 'task' : engine_iface%self.task[1], |
|
|||
293 | 'iopub' : engine_iface%self.iopub[1], |
|
|||
294 | # 'monitor' : engine_iface%self.mon_port, |
|
|||
295 | } |
|
|||
296 |
|
||||
297 | self.client_info = { |
|
|||
298 | 'control' : client_iface%self.control[0], |
|
|||
299 | 'mux': client_iface%self.mux[0], |
|
|||
300 | 'task' : (scheme, client_iface%self.task[0]), |
|
|||
301 | 'iopub' : client_iface%self.iopub[0], |
|
|||
302 | 'notification': client_iface%self.notifier_port |
|
|||
303 | } |
|
|||
304 | self.log.debug("Hub engine addrs: %s", self.engine_info) |
|
|||
305 | self.log.debug("Hub client addrs: %s", self.client_info) |
|
|||
306 |
|
313 | |||
307 | # resubmit stream |
|
314 | # resubmit stream | |
308 | r = ZMQStream(ctx.socket(zmq.DEALER), loop) |
|
315 | r = ZMQStream(ctx.socket(zmq.DEALER), loop) | |
309 |
url = util.disambiguate_url(self.client_info['task'] |
|
316 | url = util.disambiguate_url(self.client_info['task']) | |
310 | r.setsockopt(zmq.IDENTITY, self.session.bsession) |
|
|||
311 | r.connect(url) |
|
317 | r.connect(url) | |
312 |
|
318 | |||
313 | self.hub = Hub(loop=loop, session=self.session, monitor=sub, heartmonitor=self.heartmonitor, |
|
319 | self.hub = Hub(loop=loop, session=self.session, monitor=sub, heartmonitor=self.heartmonitor, | |
@@ -384,8 +390,8 b' class Hub(SessionFactory):' | |||||
384 |
|
390 | |||
385 | # validate connection dicts: |
|
391 | # validate connection dicts: | |
386 | for k,v in self.client_info.iteritems(): |
|
392 | for k,v in self.client_info.iteritems(): | |
387 | if k == 'task': |
|
393 | if k == 'task_scheme': | |
388 |
|
|
394 | continue | |
389 | else: |
|
395 | else: | |
390 | util.validate_url_container(v) |
|
396 | util.validate_url_container(v) | |
391 | # util.validate_url_container(self.client_info) |
|
397 | # util.validate_url_container(self.client_info) | |
@@ -865,7 +871,6 b' class Hub(SessionFactory):' | |||||
865 | """Reply with connection addresses for clients.""" |
|
871 | """Reply with connection addresses for clients.""" | |
866 | self.log.info("client::client %r connected", client_id) |
|
872 | self.log.info("client::client %r connected", client_id) | |
867 | content = dict(status='ok') |
|
873 | content = dict(status='ok') | |
868 | content.update(self.client_info) |
|
|||
869 | jsonable = {} |
|
874 | jsonable = {} | |
870 | for k,v in self.keytable.iteritems(): |
|
875 | for k,v in self.keytable.iteritems(): | |
871 | if v not in self.dead_engines: |
|
876 | if v not in self.dead_engines: | |
@@ -891,7 +896,6 b' class Hub(SessionFactory):' | |||||
891 | self.log.debug("registration::register_engine(%i, %r, %r, %r)", eid, queue, reg, heart) |
|
896 | self.log.debug("registration::register_engine(%i, %r, %r, %r)", eid, queue, reg, heart) | |
892 |
|
897 | |||
893 | content = dict(id=eid,status='ok') |
|
898 | content = dict(id=eid,status='ok') | |
894 | content.update(self.engine_info) |
|
|||
895 | # check if requesting available IDs: |
|
899 | # check if requesting available IDs: | |
896 | if queue in self.by_ident: |
|
900 | if queue in self.by_ident: | |
897 | try: |
|
901 | try: |
@@ -50,7 +50,7 b' class EngineFactory(RegistrationFactory):' | |||||
50 | help="""The location (an IP address) of the controller. This is |
|
50 | help="""The location (an IP address) of the controller. This is | |
51 | used for disambiguating URLs, to determine whether |
|
51 | used for disambiguating URLs, to determine whether | |
52 | loopback should be used to connect or the public address.""") |
|
52 | loopback should be used to connect or the public address.""") | |
53 |
timeout=CFloat( |
|
53 | timeout=CFloat(5, config=True, | |
54 | help="""The time (in seconds) to wait for the Controller to respond |
|
54 | help="""The time (in seconds) to wait for the Controller to respond | |
55 | to registration requests before giving up.""") |
|
55 | to registration requests before giving up.""") | |
56 | sshserver=Unicode(config=True, |
|
56 | sshserver=Unicode(config=True, | |
@@ -61,6 +61,7 b' class EngineFactory(RegistrationFactory):' | |||||
61 | help="""Whether to use paramiko instead of openssh for tunnels.""") |
|
61 | help="""Whether to use paramiko instead of openssh for tunnels.""") | |
62 |
|
62 | |||
63 | # not configurable: |
|
63 | # not configurable: | |
|
64 | connection_info = Dict() | |||
64 | user_ns=Dict() |
|
65 | user_ns = Dict() | |
65 | id=Integer(allow_none=True) |
|
66 | id = Integer(allow_none=True) | |
66 | registrar=Instance('zmq.eventloop.zmqstream.ZMQStream') |
|
67 | registrar = Instance('zmq.eventloop.zmqstream.ZMQStream') | |
@@ -96,7 +97,7 b' class EngineFactory(RegistrationFactory):' | |||||
96 | def connect(s, url): |
|
97 | def connect(s, url): | |
97 | url = disambiguate_url(url, self.location) |
|
98 | url = disambiguate_url(url, self.location) | |
98 | if self.using_ssh: |
|
99 | if self.using_ssh: | |
99 |
self.log.debug("Tunneling connection to %s via %s" |
|
100 | self.log.debug("Tunneling connection to %s via %s", url, self.sshserver) | |
100 | return tunnel.tunnel_connection(s, url, self.sshserver, |
|
101 | return tunnel.tunnel_connection(s, url, self.sshserver, | |
101 | keyfile=self.sshkey, paramiko=self.paramiko, |
|
102 | keyfile=self.sshkey, paramiko=self.paramiko, | |
102 | password=password, |
|
103 | password=password, | |
@@ -108,12 +109,12 b' class EngineFactory(RegistrationFactory):' | |||||
108 | """like connect, but don't complete the connection (for use by heartbeat)""" |
|
109 | """like connect, but don't complete the connection (for use by heartbeat)""" | |
109 | url = disambiguate_url(url, self.location) |
|
110 | url = disambiguate_url(url, self.location) | |
110 | if self.using_ssh: |
|
111 | if self.using_ssh: | |
111 |
self.log.debug("Tunneling connection to %s via %s" |
|
112 | self.log.debug("Tunneling connection to %s via %s", url, self.sshserver) | |
112 | url,tunnelobj = tunnel.open_tunnel(url, self.sshserver, |
|
113 | url,tunnelobj = tunnel.open_tunnel(url, self.sshserver, | |
113 | keyfile=self.sshkey, paramiko=self.paramiko, |
|
114 | keyfile=self.sshkey, paramiko=self.paramiko, | |
114 | password=password, |
|
115 | password=password, | |
115 | ) |
|
116 | ) | |
116 | return url |
|
117 | return str(url) | |
117 | return connect, maybe_tunnel |
|
118 | return connect, maybe_tunnel | |
118 |
|
119 | |||
119 | def register(self): |
|
120 | def register(self): | |
@@ -140,50 +141,39 b' class EngineFactory(RegistrationFactory):' | |||||
140 | loop = self.loop |
|
141 | loop = self.loop | |
141 | identity = self.bident |
|
142 | identity = self.bident | |
142 | idents,msg = self.session.feed_identities(msg) |
|
143 | idents,msg = self.session.feed_identities(msg) | |
143 |
msg = |
|
144 | msg = self.session.unserialize(msg) | |
|
145 | content = msg['content'] | |||
|
146 | info = self.connection_info | |||
144 |
|
147 | |||
145 |
if |
|
148 | if content['status'] == 'ok': | |
146 |
self.id = int( |
|
149 | self.id = int(content['id']) | |
147 |
|
150 | |||
148 | # launch heartbeat |
|
151 | # launch heartbeat | |
149 | hb_addrs = msg.content.heartbeat |
|
|||
150 |
|
||||
151 | # possibly forward hb ports with tunnels |
|
152 | # possibly forward hb ports with tunnels | |
152 |
hb_ |
|
153 | hb_ping = maybe_tunnel(info['hb_ping']) | |
153 | heart = Heart(*map(str, hb_addrs), heart_id=identity) |
|
154 | hb_pong = maybe_tunnel(info['hb_pong']) | |
|
155 | ||||
|
156 | heart = Heart(hb_ping, hb_pong, heart_id=identity) | |||
154 | heart.start() |
|
157 | heart.start() | |
155 |
|
158 | |||
156 |
# create Shell |
|
159 | # create Shell Connections (MUX, Task, etc.): | |
157 | queue_addr = msg.content.mux |
|
160 | shell_addrs = map(str, [info['mux'], info['task']]) | |
158 | shell_addrs = [ str(queue_addr) ] |
|
161 | ||
159 | task_addr = msg.content.task |
|
162 | # Use only one shell stream for mux and tasks | |
160 | if task_addr: |
|
|||
161 | shell_addrs.append(str(task_addr)) |
|
|||
162 |
|
||||
163 | # Uncomment this to go back to two-socket model |
|
|||
164 | # shell_streams = [] |
|
|||
165 | # for addr in shell_addrs: |
|
|||
166 | # stream = zmqstream.ZMQStream(ctx.socket(zmq.ROUTER), loop) |
|
|||
167 | # stream.setsockopt(zmq.IDENTITY, identity) |
|
|||
168 | # stream.connect(disambiguate_url(addr, self.location)) |
|
|||
169 | # shell_streams.append(stream) |
|
|||
170 |
|
||||
171 | # Now use only one shell stream for mux and tasks |
|
|||
172 | stream = zmqstream.ZMQStream(ctx.socket(zmq.ROUTER), loop) |
|
163 | stream = zmqstream.ZMQStream(ctx.socket(zmq.ROUTER), loop) | |
173 | stream.setsockopt(zmq.IDENTITY, identity) |
|
164 | stream.setsockopt(zmq.IDENTITY, identity) | |
174 | shell_streams = [stream] |
|
165 | shell_streams = [stream] | |
175 | for addr in shell_addrs: |
|
166 | for addr in shell_addrs: | |
176 | connect(stream, addr) |
|
167 | connect(stream, addr) | |
177 | # end single stream-socket |
|
|||
178 |
|
168 | |||
179 | # control stream: |
|
169 | # control stream: | |
180 |
control_addr = str( |
|
170 | control_addr = str(info['control']) | |
181 | control_stream = zmqstream.ZMQStream(ctx.socket(zmq.ROUTER), loop) |
|
171 | control_stream = zmqstream.ZMQStream(ctx.socket(zmq.ROUTER), loop) | |
182 | control_stream.setsockopt(zmq.IDENTITY, identity) |
|
172 | control_stream.setsockopt(zmq.IDENTITY, identity) | |
183 | connect(control_stream, control_addr) |
|
173 | connect(control_stream, control_addr) | |
184 |
|
174 | |||
185 | # create iopub stream: |
|
175 | # create iopub stream: | |
186 |
iopub_addr = |
|
176 | iopub_addr = info['iopub'] | |
187 | iopub_socket = ctx.socket(zmq.PUB) |
|
177 | iopub_socket = ctx.socket(zmq.PUB) | |
188 | iopub_socket.setsockopt(zmq.IDENTITY, identity) |
|
178 | iopub_socket.setsockopt(zmq.IDENTITY, identity) | |
189 | connect(iopub_socket, iopub_addr) |
|
179 | connect(iopub_socket, iopub_addr) |
@@ -257,9 +257,11 b' class Session(Configurable):' | |||||
257 | if new.lower() == 'json': |
|
257 | if new.lower() == 'json': | |
258 | self.pack = json_packer |
|
258 | self.pack = json_packer | |
259 | self.unpack = json_unpacker |
|
259 | self.unpack = json_unpacker | |
|
260 | self.unpacker = new | |||
260 | elif new.lower() == 'pickle': |
|
261 | elif new.lower() == 'pickle': | |
261 | self.pack = pickle_packer |
|
262 | self.pack = pickle_packer | |
262 | self.unpack = pickle_unpacker |
|
263 | self.unpack = pickle_unpacker | |
|
264 | self.unpacker = new | |||
263 | else: |
|
265 | else: | |
264 | self.pack = import_item(str(new)) |
|
266 | self.pack = import_item(str(new)) | |
265 |
|
267 | |||
@@ -270,9 +272,11 b' class Session(Configurable):' | |||||
270 | if new.lower() == 'json': |
|
272 | if new.lower() == 'json': | |
271 | self.pack = json_packer |
|
273 | self.pack = json_packer | |
272 | self.unpack = json_unpacker |
|
274 | self.unpack = json_unpacker | |
|
275 | self.packer = new | |||
273 | elif new.lower() == 'pickle': |
|
276 | elif new.lower() == 'pickle': | |
274 | self.pack = pickle_packer |
|
277 | self.pack = pickle_packer | |
275 | self.unpack = pickle_unpacker |
|
278 | self.unpack = pickle_unpacker | |
|
279 | self.packer = new | |||
276 | else: |
|
280 | else: | |
277 | self.unpack = import_item(str(new)) |
|
281 | self.unpack = import_item(str(new)) | |
278 |
|
282 |
General Comments 0
You need to be logged in to leave comments.
Login now