@@ -209,7 +209,7 b' class IPControllerApp(BaseParallelApplication):' | |||
|
209 | 209 | def save_connection_dict(self, fname, cdict): |
|
210 | 210 | """save a connection dict to json file.""" |
|
211 | 211 | c = self.config |
|
212 | url = cdict['url'] |
|
|
212 | url = cdict['registration'] | |
|
213 | 213 | location = cdict['location'] |
|
214 | 214 | if not location: |
|
215 | 215 | try: |
@@ -314,15 +314,21 b' class IPControllerApp(BaseParallelApplication):' | |||
|
314 | 314 | if self.write_connection_files: |
|
315 | 315 | # save to new json config files |
|
316 | 316 | f = self.factory |
|
317 | cdict = {'exec_key' : f.session.key.decode('ascii'), | |
|
318 | 'ssh' : self.ssh_server, | |
|
319 | 'url' : "%s://%s:%s"%(f.client_transport, f.client_ip, f.regport), | |
|
320 | 'location' : self.location | |
|
317 | base = { | |
|
318 | 'exec_key' : f.session.key.decode('ascii'), | |
|
319 | 'location' : self.location, | |
|
320 | 'pack' : f.session.packer, | |
|
321 | 'unpack' : f.session.unpacker, | |
|
321 | 322 | } |
|
|
323 | ||
|
324 | cdict = {'ssh' : self.ssh_server} | |
|
325 | cdict.update(f.client_info) | |
|
326 | cdict.update(base) | |
|
322 | 327 | self.save_connection_dict(self.client_json_file, cdict) |
|
323 | edict = cdict |
|
|
|
324 | edict['url']="%s://%s:%s"%((f.client_transport, f.client_ip, f.regport)) | |
|
325 | edict['ssh'] = self.engine_ssh_server |
|
|
328 | ||
|
329 | edict = {'ssh' : self.engine_ssh_server} | |
|
330 | edict.update(f.engine_info) | |
|
331 | edict.update(base) | |
|
326 | 332 | self.save_connection_dict(self.engine_json_file, edict) |
|
327 | 333 | |
|
328 | 334 | def init_schedulers(self): |
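For orientation: after this hunk the controller composes both JSON files from the shared base dict plus the per-role URLs in f.client_info / f.engine_info, instead of a single 'url' entry. A minimal sketch of roughly what the resulting ipcontroller-client.json contains; every host, port, and key below is hypothetical:

    # Hypothetical contents of ipcontroller-client.json after this change
    # (keys follow the diff; all values are invented for illustration).
    cdict = {
        'ssh'          : '',                        # self.ssh_server
        'exec_key'     : 'abc123',                  # f.session.key
        'location'     : '10.0.1.5',                # controller IP
        'pack'         : 'json',                    # f.session.packer
        'unpack'       : 'json',                    # f.session.unpacker
        # merged in from f.client_info:
        'registration' : 'tcp://10.0.1.5:55001',
        'mux'          : 'tcp://10.0.1.5:55002',
        'task'         : 'tcp://10.0.1.5:55003',
        'task_scheme'  : 'leastload',
        'control'      : 'tcp://10.0.1.5:55004',
        'iopub'        : 'tcp://10.0.1.5:55005',
        'notification' : 'tcp://10.0.1.5:55006',
    }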
@@ -379,7 +385,7 b' class IPControllerApp(BaseParallelApplication):' | |||
|
379 | 385 | |
|
380 | 386 | else: |
|
381 | 387 | self.log.info("task::using Python %s Task scheduler"%scheme) |
|
382 | sargs = (hub.client_info['task'][1], hub.engine_info['task'], |
|
|
388 | sargs = (hub.client_info['task'], hub.engine_info['task'], | |
|
383 | 389 | monitor_url, disambiguate_url(hub.client_info['notification'])) |
|
384 | 390 | kwargs = dict(logname='scheduler', loglevel=self.log_level, |
|
385 | 391 | log_url = self.log_url, config=dict(self.config)) |
@@ -211,24 +211,35 b' class IPEngineApp(BaseParallelApplication):' | |||
|
211 | 211 | with open(self.url_file) as f: |
|
212 | 212 | d = json.loads(f.read()) |
|
213 | 213 | |
|
214 | if 'exec_key' in d: | |
|
215 | config.Session.key = cast_bytes(d['exec_key']) | |
|
216 | ||
|
214 | # allow hand-override of location for disambiguation | |
|
215 | # and ssh-server | |
|
217 | 216 | try: |
|
218 | 217 | config.EngineFactory.location |
|
219 | 218 | except AttributeError: |
|
220 | 219 | config.EngineFactory.location = d['location'] |
|
221 | 220 | |
|
222 | d['url'] = disambiguate_url(d['url'], config.EngineFactory.location) | |
|
223 | try: | |
|
224 | config.EngineFactory.url | |
|
225 | except AttributeError: | |
|
226 | config.EngineFactory.url = d['url'] | |
|
227 | ||
|
228 | 221 | try: |
|
229 | 222 | config.EngineFactory.sshserver |
|
230 | 223 | except AttributeError: |
|
231 | config.EngineFactory.sshserver = d['ssh'] |
|
|
224 | config.EngineFactory.sshserver = d.get('ssh') | |
|
225 | ||
|
226 | location = config.EngineFactory.location | |
|
227 | ||
|
228 | for key in ('registration', 'hb_ping', 'hb_pong', 'mux', 'task', 'control'): | |
|
229 | d[key] = disambiguate_url(d[key], location) | |
|
230 | ||
|
231 | # DO NOT allow override of basic URLs, serialization, or exec_key | |
|
232 | # JSON file takes top priority there | |
|
233 | config.Session.key = asbytes(d['exec_key']) | |
|
234 | ||
|
235 | config.EngineFactory.url = d['registration'] | |
|
236 | ||
|
237 | config.Session.packer = d['pack'] | |
|
238 | config.Session.unpacker = d['unpack'] | |
|
239 | ||
|
240 | self.log.debug("Config changed:") | |
|
241 | self.log.debug("%r", config) | |
|
242 | self.connection_info = d | |
|
232 | 243 | |
|
233 | 244 | def bind_kernel(self, **kwargs): |
|
234 | 245 | """Promote engine to listening kernel, accessible to frontends.""" |
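The net effect of this hunk is that the engine takes everything except location and sshserver straight from the JSON file: each URL is disambiguated against the controller's location, and the key and serialization settings can no longer be overridden locally. A rough sketch of that loading logic (not the literal code; the path is illustrative):

    # Sketch of the engine-side loading introduced above.
    import json
    from IPython.parallel.util import disambiguate_url

    with open('/path/to/ipcontroller-engine.json') as f:   # illustrative path
        d = json.load(f)

    location = d['location']
    for key in ('registration', 'hb_ping', 'hb_pong', 'mux', 'task', 'control'):
        # roughly: rewrite wildcard/loopback hosts to the controller's location
        d[key] = disambiguate_url(d[key], location)

    # exec_key, pack, and unpack are then applied to the Session config verbatim.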
@@ -320,7 +331,9 b' class IPEngineApp(BaseParallelApplication):' | |||
|
320 | 331 | # shell_class = import_item(self.master_config.Global.shell_class) |
|
321 | 332 | # print self.config |
|
322 | 333 | try: |
|
323 | self.engine = EngineFactory(config=config, log=self.log) |
|
|
334 | self.engine = EngineFactory(config=config, log=self.log, | |
|
335 | connection_info=self.connection_info, | |
|
336 | ) | |
|
324 | 337 | except: |
|
325 | 338 | self.log.error("Couldn't start the Engine", exc_info=True) |
|
326 | 339 | self.exit(1) |
@@ -217,7 +217,9 b' class Client(HasTraits):' | |||
|
217 | 217 | Parameters |
|
218 | 218 | ---------- |
|
219 | 219 | |
|
220 | url_or_file : str/unicode; url or path to ipcontroller-client.json |
|
|
220 | url_file : str/unicode; path to ipcontroller-client.json | |
|
221 | This JSON file should contain all the information needed to connect to a cluster, | |
|
222 | and is likely the only argument needed. | |
|
221 | 223 | Connection information for the Hub's registration. If a json connector |
|
222 | 224 | file is given, then likely no further configuration is necessary. |
|
223 | 225 | [Default: use profile] |
@@ -239,14 +241,6 b' class Client(HasTraits):' | |||
|
239 | 241 | If specified, this will be relayed to the Session for configuration |
|
240 | 242 | username : str |
|
241 | 243 | set username for the session object |
|
242 | packer : str (import_string) or callable | |
|
243 | Can be either the simple keyword 'json' or 'pickle', or an import_string to a | |
|
244 | function to serialize messages. Must support same input as | |
|
245 | JSON, and output must be bytes. | |
|
246 | You can pass a callable directly as `pack` | |
|
247 | unpacker : str (import_string) or callable | |
|
248 | The inverse of packer. Only necessary if packer is specified as *not* one | |
|
249 | of 'json' or 'pickle'. | |
|
250 | 244 | |
|
251 | 245 | #-------------- ssh related args ---------------- |
|
252 | 246 | # These are args for configuring the ssh tunnel to be used |
@@ -271,17 +265,6 b' class Client(HasTraits):' | |||
|
271 | 265 | flag for whether to use paramiko instead of shell ssh for tunneling. |
|
272 | 266 | [default: True on win32, False else] |
|
273 | 267 | |
|
274 | ------- exec authentication args ------- | |
|
275 | If even localhost is untrusted, you can have some protection against | |
|
276 | unauthorized execution by signing messages with HMAC digests. | |
|
277 | Messages are still sent as cleartext, so if someone can snoop your | |
|
278 | loopback traffic this will not protect your privacy, but will prevent | |
|
279 | unauthorized execution. | |
|
280 | ||
|
281 | exec_key : str | |
|
282 | an authentication key or file containing a key | |
|
283 | default: None | |
|
284 | ||
|
285 | 268 | |
|
286 | 269 | Attributes |
|
287 | 270 | ---------- |
@@ -378,8 +361,8 b' class Client(HasTraits):' | |||
|
378 | 361 | # don't raise on positional args |
|
379 | 362 | return HasTraits.__new__(self, **kw) |
|
380 | 363 | |
|
381 | def __init__(self, url_or_file=None, profile=None, profile_dir=None, ipython_dir=None, |
|
|
382 | context=None, debug=False, exec_key=None, |
|
|
364 | def __init__(self, url_file=None, profile=None, profile_dir=None, ipython_dir=None, | |
|
365 | context=None, debug=False, | |
|
383 | 366 | sshserver=None, sshkey=None, password=None, paramiko=None, |
|
384 | 367 | timeout=10, **extra_args |
|
385 | 368 | ): |
@@ -392,37 +375,37 b' class Client(HasTraits):' | |||
|
392 | 375 | self._context = context |
|
393 | 376 | self._stop_spinning = Event() |
|
394 | 377 | |
|
378 | if 'url_or_file' in extra_args: | |
|
379 | url_file = extra_args['url_or_file'] | |
|
380 | warnings.warn("url_or_file arg no longer supported, use url_file", DeprecationWarning) | |
|
381 | ||
|
382 | if url_file and util.is_url(url_file): | |
|
383 | raise ValueError("single urls cannot be specified, url-files must be used.") | |
|
384 | ||
|
395 | 385 | self._setup_profile_dir(self.profile, profile_dir, ipython_dir) |
|
386 | ||
|
396 | 387 | if self._cd is not None: |
|
397 | if url_or_file is None: |
|
|
398 | url_or_file = pjoin(self._cd.security_dir, 'ipcontroller-client.json') |
|
|
399 | if url_or_file is None: |
|
|
388 | if url_file is None: | |
|
389 | url_file = pjoin(self._cd.security_dir, 'ipcontroller-client.json') | |
|
390 | if url_file is None: | |
|
400 | 391 | raise ValueError( |
|
401 | 392 | "I can't find enough information to connect to a hub!" |
|
402 | " Please specify at least one of url_or_file or profile." |
|
|
393 | " Please specify at least one of url_file or profile." | |
|
403 | 394 | ) |
|
404 | 395 | |
|
405 | if not util.is_url(url_or_file): |
|
|
|
406 | # it's not a url, try for a file | |
|
407 | if not os.path.exists(url_or_file): | |
|
408 | if self._cd: | |
|
409 | url_or_file = os.path.join(self._cd.security_dir, url_or_file) | |
|
410 | if not os.path.exists(url_or_file): | |
|
411 | raise IOError("Connection file not found: %r" % url_or_file) | |
|
412 | with open(url_or_file) as f: | |
|
413 | cfg = json.loads(f.read()) | |
|
414 | else: | |
|
415 | cfg = {'url':url_or_file} | |
|
396 | with open(url_file) as f: | |
|
397 | cfg = json.load(f) | |
|
398 | ||
|
399 | self._task_scheme = cfg['task_scheme'] | |
|
416 | 400 | |
|
417 | 401 | # sync defaults from args, json: |
|
418 | 402 | if sshserver: |
|
419 | 403 | cfg['ssh'] = sshserver |
|
420 | if exec_key: | |
|
421 | cfg['exec_key'] = exec_key | |
|
422 | exec_key = cfg['exec_key'] | |
|
404 | ||
|
423 | 405 | location = cfg.setdefault('location', None) |
|
424 | cfg['url'] = util.disambiguate_url(cfg['url'], location) | |
|
425 | url = cfg['url'] | |
|
406 | for key in ('control', 'task', 'mux', 'notification', 'registration'): | |
|
407 | cfg[key] = util.disambiguate_url(cfg[key], location) | |
|
408 | url = cfg['registration'] | |
|
426 | 409 | proto,addr,port = util.split_url(url) |
|
427 | 410 | if location is not None and addr == '127.0.0.1': |
|
428 | 411 | # location specified, and connection is expected to be local |
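With the hunks above, a client is always constructed from a connection file (or a profile that locates one); a bare zmq URL is rejected. A minimal usage sketch, with an illustrative path:

    from IPython.parallel import Client

    # Path is illustrative; by default the file is looked up in the active
    # profile's security directory.
    rc = Client(url_file='/path/to/security/ipcontroller-client.json')

    # Equivalent, relying on the profile default:
    rc = Client(profile='default')

    # Passing a plain URL now raises ValueError, and the old url_or_file
    # keyword only triggers a DeprecationWarning before being used as url_file.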
@@ -457,12 +440,10 b' class Client(HasTraits):' | |||
|
457 | 440 | ssh_kwargs = dict(keyfile=sshkey, password=password, paramiko=paramiko) |
|
458 | 441 | |
|
459 | 442 | # configure and construct the session |
|
460 | if exec_key is not None: | |
|
461 | if os.path.isfile(exec_key): | |
|
462 | extra_args['keyfile'] = exec_key |
|
|
|
463 | else: |
|
|
|
464 | exec_key = cast_bytes(exec_key) | |
|
465 | extra_args['key'] = exec_key | |
|
443 | extra_args['packer'] = cfg['pack'] | |
|
444 | extra_args['unpacker'] = cfg['unpack'] | |
|
445 | extra_args['key'] = cfg['exec_key'] | |
|
446 | ||
|
466 | 447 | self.session = Session(**extra_args) |
|
467 | 448 | |
|
468 | 449 | self._query_socket = self._context.socket(zmq.DEALER) |
@@ -583,7 +564,7 b' class Client(HasTraits):' | |||
|
583 | 564 | self._connected=True |
|
584 | 565 | |
|
585 | 566 | def connect_socket(s, url): |
|
586 | url = util.disambiguate_url(url, self._config['location']) | |
|
567 | # url = util.disambiguate_url(url, self._config['location']) | |
|
587 | 568 | if self._ssh: |
|
588 | 569 | return tunnel.tunnel_connection(s, url, sshserver, **ssh_kwargs) |
|
589 | 570 | else: |
@@ -600,30 +581,28 b' class Client(HasTraits):' | |||
|
600 | 581 | idents,msg = self.session.recv(self._query_socket,mode=0) |
|
601 | 582 | if self.debug: |
|
602 | 583 | pprint(msg) |
|
603 | msg = Message(msg) | |
|
604 | content = msg.content | |
|
605 | self._config['registration'] = dict(content) | |
|
606 | if content.status == 'ok': |
|
|
607 | ident = self.session.bsession | |
|
608 | if content.mux: | |
|
584 | content = msg['content'] | |
|
585 | # self._config['registration'] = dict(content) | |
|
586 | cfg = self._config | |
|
587 | if content['status'] == 'ok': | |
|
609 | 588 | self._mux_socket = self._context.socket(zmq.DEALER) |
|
|
610 | connect_socket(self._mux_socket, content.mux) |
|
|
|
611 | if content.task: | |
|
612 | self._task_scheme, task_addr = content.task | |
|
589 | connect_socket(self._mux_socket, cfg['mux']) | |
|
590 | ||
|
613 | 591 | self._task_socket = self._context.socket(zmq.DEALER) |
|
|
614 | connect_socket(self._task_socket, task_addr) |
|
|
|
615 | if content.notification: | |
|
592 | connect_socket(self._task_socket, cfg['task']) | |
|
593 | ||
|
616 | 594 | self._notification_socket = self._context.socket(zmq.SUB) |
|
|
617 | connect_socket(self._notification_socket, content.notification) | |
|
618 | 595 | self._notification_socket.setsockopt(zmq.SUBSCRIBE, b'') |
|
|
619 | if content.control: | |
|
596 | connect_socket(self._notification_socket, cfg['notification']) | |
|
597 | ||
|
620 | 598 | self._control_socket = self._context.socket(zmq.DEALER) |
|
|
621 | connect_socket(self._control_socket, content.control) |
|
|
|
622 | if content.iopub: | |
|
599 | connect_socket(self._control_socket, cfg['control']) | |
|
600 | ||
|
623 | 601 | self._iopub_socket = self._context.socket(zmq.SUB) |
|
|
624 | 602 | self._iopub_socket.setsockopt(zmq.SUBSCRIBE, b'') |
|
|
625 | connect_socket(self._iopub_socket, content.iopub) |
|
|
|
626 | self._update_engines(dict(content.engines)) | |
|
603 | connect_socket(self._iopub_socket, cfg['iopub']) | |
|
604 | ||
|
605 | self._update_engines(dict(content['engines'])) | |
|
627 | 606 | else: |
|
628 | 607 | self._connected = False |
|
629 | 608 | raise Exception("Failed to connect!") |
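Taken together with the Hub hunks below (which stop copying client_info into the reply), the registration reply consumed here only needs a status and the engine table; all URLs now come from the locally loaded cfg. Roughly:

    # Rough shape of the connection reply after this change (illustrative):
    content = {
        'status'  : 'ok',
        'engines' : {},   # current engine id -> identity mapping; empty here
    }
    # The client then calls connect_socket() with cfg['mux'], cfg['task'],
    # cfg['notification'], cfg['control'], and cfg['iopub'] as shown above.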
@@ -239,30 +239,61 b' class HubFactory(RegistrationFactory):' | |||
|
239 | 239 | ctx = self.context |
|
240 | 240 | loop = self.loop |
|
241 | 241 | |
|
242 | try: | |
|
243 | scheme = self.config.TaskScheduler.scheme_name | |
|
244 | except AttributeError: | |
|
245 | from .scheduler import TaskScheduler | |
|
246 | scheme = TaskScheduler.scheme_name.get_default_value() | |
|
247 | ||
|
248 | # build connection dicts | |
|
249 | engine = self.engine_info = { | |
|
250 | 'registration' : engine_iface % self.regport, | |
|
251 | 'control' : engine_iface % self.control[1], | |
|
252 | 'mux' : engine_iface % self.mux[1], | |
|
253 | 'hb_ping' : engine_iface % self.hb[0], | |
|
254 | 'hb_pong' : engine_iface % self.hb[1], | |
|
255 | 'task' : engine_iface % self.task[1], | |
|
256 | 'iopub' : engine_iface % self.iopub[1], | |
|
257 | } | |
|
258 | ||
|
259 | client = self.client_info = { | |
|
260 | 'registration' : client_iface % self.regport, | |
|
261 | 'control' : client_iface % self.control[0], | |
|
262 | 'mux' : client_iface % self.mux[0], | |
|
263 | 'task' : client_iface % self.task[0], | |
|
264 | 'task_scheme' : scheme, | |
|
265 | 'iopub' : client_iface % self.iopub[0], | |
|
266 | 'notification' : client_iface % self.notifier_port, | |
|
267 | } | |
|
268 | ||
|
269 | self.log.debug("Hub engine addrs: %s", self.engine_info) | |
|
270 | self.log.debug("Hub client addrs: %s", self.client_info) | |
|
271 | ||
|
242 | 272 | # Registrar socket |
|
243 | 273 | q = ZMQStream(ctx.socket(zmq.ROUTER), loop) |
|
244 | q.bind(client_iface % self.regport) |
|
|
274 | q.bind(client['registration']) | |
|
245 | 275 | self.log.info("Hub listening on %s for registration.", client_iface % self.regport) |
|
246 | 276 | if self.client_ip != self.engine_ip: |
|
247 | q.bind(engine_iface % self.regport) |
|
|
277 | q.bind(engine['registration']) | |
|
248 | 278 | self.log.info("Hub listening on %s for registration.", engine_iface % self.regport) |
|
249 | 279 | |
|
250 | 280 | ### Engine connections ### |
|
251 | 281 | |
|
252 | 282 | # heartbeat |
|
253 | 283 | hpub = ctx.socket(zmq.PUB) |
|
254 | hpub.bind(engine_iface % self.hb[0]) |
|
|
284 | hpub.bind(engine['hb_ping']) | |
|
255 | 285 | hrep = ctx.socket(zmq.ROUTER) |
|
256 | hrep.bind(engine_iface % self.hb[1]) |
|
|
286 | hrep.bind(engine['hb_pong']) | |
|
257 | 287 | self.heartmonitor = HeartMonitor(loop=loop, config=self.config, log=self.log, |
|
258 | 288 | pingstream=ZMQStream(hpub,loop), |
|
259 | 289 | pongstream=ZMQStream(hrep,loop) |
|
260 | 290 | ) |
|
261 | 291 | |
|
262 | 292 | ### Client connections ### |
|
293 | ||
|
263 | 294 | # Notifier socket |
|
264 | 295 | n = ZMQStream(ctx.socket(zmq.PUB), loop) |
|
265 | n.bind(client_iface % self.notifier_port) |
|
|
296 | n.bind(client['notification']) | |
|
266 | 297 | |
|
267 | 298 | ### build and launch the queues ### |
|
268 | 299 | |
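To make the new flat dicts concrete: engine_iface and client_iface are printf-style templates (built earlier in init_hub from the transport and IP), so each entry above expands to a plain URL. A hypothetical expansion, with an assumed template and invented ports:

    client_iface = 'tcp://127.0.0.1:%i'      # assumed shape of the template
    regport, notifier_port = 55001, 55006    # hypothetical ports

    client_info = {
        'registration' : client_iface % regport,        # 'tcp://127.0.0.1:55001'
        'notification' : client_iface % notifier_port,  # 'tcp://127.0.0.1:55006'
        'task_scheme'  : 'leastload',                    # TaskScheduler default
        # ... control/mux/task/iopub expand the same way ...
    }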
@@ -279,35 +310,10 b' class HubFactory(RegistrationFactory):' | |||
|
279 | 310 | self.db = import_item(str(db_class))(session=self.session.session, |
|
280 | 311 | config=self.config, log=self.log) |
|
281 | 312 | time.sleep(.25) |
|
282 | try: | |
|
283 | scheme = self.config.TaskScheduler.scheme_name | |
|
284 | except AttributeError: | |
|
285 | from .scheduler import TaskScheduler | |
|
286 | scheme = TaskScheduler.scheme_name.get_default_value() | |
|
287 | # build connection dicts | |
|
288 | self.engine_info = { | |
|
289 | 'control' : engine_iface%self.control[1], | |
|
290 | 'mux': engine_iface%self.mux[1], | |
|
291 | 'heartbeat': (engine_iface%self.hb[0], engine_iface%self.hb[1]), | |
|
292 | 'task' : engine_iface%self.task[1], | |
|
293 | 'iopub' : engine_iface%self.iopub[1], | |
|
294 | # 'monitor' : engine_iface%self.mon_port, | |
|
295 | } | |
|
296 | ||
|
297 | self.client_info = { | |
|
298 | 'control' : client_iface%self.control[0], | |
|
299 | 'mux': client_iface%self.mux[0], | |
|
300 | 'task' : (scheme, client_iface%self.task[0]), | |
|
301 | 'iopub' : client_iface%self.iopub[0], | |
|
302 | 'notification': client_iface%self.notifier_port | |
|
303 | } | |
|
304 | self.log.debug("Hub engine addrs: %s", self.engine_info) | |
|
305 | self.log.debug("Hub client addrs: %s", self.client_info) | |
|
306 | 313 | |
|
307 | 314 | # resubmit stream |
|
308 | 315 | r = ZMQStream(ctx.socket(zmq.DEALER), loop) |
|
309 | url = util.disambiguate_url(self.client_info['task'][-1]) |
|
|
310 | r.setsockopt(zmq.IDENTITY, self.session.bsession) | |
|
316 | url = util.disambiguate_url(self.client_info['task']) | |
|
311 | 317 | r.connect(url) |
|
312 | 318 | |
|
313 | 319 | self.hub = Hub(loop=loop, session=self.session, monitor=sub, heartmonitor=self.heartmonitor, |
@@ -384,8 +390,8 b' class Hub(SessionFactory):' | |||
|
384 | 390 | |
|
385 | 391 | # validate connection dicts: |
|
386 | 392 | for k,v in self.client_info.iteritems(): |
|
387 | if k == 'task': | |
|
388 | util.validate_url_container(v[1]) |
|
|
|
393 | if k == 'task_scheme': | |
|
394 | continue | |
|
389 | 395 | else: |
|
390 | 396 | util.validate_url_container(v) |
|
391 | 397 | # util.validate_url_container(self.client_info) |
@@ -865,7 +871,6 b' class Hub(SessionFactory):' | |||
|
865 | 871 | """Reply with connection addresses for clients.""" |
|
866 | 872 | self.log.info("client::client %r connected", client_id) |
|
867 | 873 | content = dict(status='ok') |
|
868 | content.update(self.client_info) | |
|
869 | 874 | jsonable = {} |
|
870 | 875 | for k,v in self.keytable.iteritems(): |
|
871 | 876 | if v not in self.dead_engines: |
@@ -891,7 +896,6 b' class Hub(SessionFactory):' | |||
|
891 | 896 | self.log.debug("registration::register_engine(%i, %r, %r, %r)", eid, queue, reg, heart) |
|
892 | 897 | |
|
893 | 898 | content = dict(id=eid,status='ok') |
|
894 | content.update(self.engine_info) | |
|
895 | 899 | # check if requesting available IDs: |
|
896 | 900 | if queue in self.by_ident: |
|
897 | 901 | try: |
@@ -50,7 +50,7 b' class EngineFactory(RegistrationFactory):' | |||
|
50 | 50 | help="""The location (an IP address) of the controller. This is |
|
51 | 51 | used for disambiguating URLs, to determine whether |
|
52 | 52 | loopback should be used to connect or the public address.""") |
|
53 | timeout=CFloat(2, config=True, |
|
|
53 | timeout=CFloat(5, config=True, | |
|
54 | 54 | help="""The time (in seconds) to wait for the Controller to respond |
|
55 | 55 | to registration requests before giving up.""") |
|
56 | 56 | sshserver=Unicode(config=True, |
@@ -61,6 +61,7 b' class EngineFactory(RegistrationFactory):' | |||
|
61 | 61 | help="""Whether to use paramiko instead of openssh for tunnels.""") |
|
62 | 62 | |
|
63 | 63 | # not configurable: |
|
64 | connection_info = Dict() | |
|
64 | 65 | user_ns=Dict() |
|
65 | 66 | id=Integer(allow_none=True) |
|
66 | 67 | registrar=Instance('zmq.eventloop.zmqstream.ZMQStream') |
@@ -96,7 +97,7 b' class EngineFactory(RegistrationFactory):' | |||
|
96 | 97 | def connect(s, url): |
|
97 | 98 | url = disambiguate_url(url, self.location) |
|
98 | 99 | if self.using_ssh: |
|
99 | self.log.debug("Tunneling connection to %s via %s"%(url, self.sshserver)) |
|
|
100 | self.log.debug("Tunneling connection to %s via %s", url, self.sshserver) | |
|
100 | 101 | return tunnel.tunnel_connection(s, url, self.sshserver, |
|
101 | 102 | keyfile=self.sshkey, paramiko=self.paramiko, |
|
102 | 103 | password=password, |
@@ -108,12 +109,12 b' class EngineFactory(RegistrationFactory):' | |||
|
108 | 109 | """like connect, but don't complete the connection (for use by heartbeat)""" |
|
109 | 110 | url = disambiguate_url(url, self.location) |
|
110 | 111 | if self.using_ssh: |
|
111 | self.log.debug("Tunneling connection to %s via %s"%(url, self.sshserver)) |
|
|
112 | self.log.debug("Tunneling connection to %s via %s", url, self.sshserver) | |
|
112 | 113 | url,tunnelobj = tunnel.open_tunnel(url, self.sshserver, |
|
113 | 114 | keyfile=self.sshkey, paramiko=self.paramiko, |
|
114 | 115 | password=password, |
|
115 | 116 | ) |
|
116 | return url | |
|
117 | return str(url) | |
|
117 | 118 | return connect, maybe_tunnel |
|
118 | 119 | |
|
119 | 120 | def register(self): |
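Both connect and maybe_tunnel above funnel every address through disambiguate_url, so bind-all or loopback URLs published by the controller still resolve to something the engine can reach. Roughly, with made-up addresses:

    from IPython.parallel.util import disambiguate_url

    # wildcard/loopback hosts are replaced by the controller's location:
    disambiguate_url('tcp://0.0.0.0:10101', '10.0.1.5')   # ~ 'tcp://10.0.1.5:10101'
    # concrete hosts pass through unchanged:
    disambiguate_url('tcp://10.0.1.7:10101', '10.0.1.5')  # ~ 'tcp://10.0.1.7:10101'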
@@ -140,50 +141,39 b' class EngineFactory(RegistrationFactory):' | |||
|
140 | 141 | loop = self.loop |
|
141 | 142 | identity = self.bident |
|
142 | 143 | idents,msg = self.session.feed_identities(msg) |
|
143 | msg = Message(self.session.unserialize(msg)) |
|
|
144 | msg = self.session.unserialize(msg) | |
|
145 | content = msg['content'] | |
|
146 | info = self.connection_info | |
|
144 | 147 | |
|
145 | if msg.content.status == 'ok': |
|
|
146 | self.id = int(msg.content.id) |
|
|
148 | if content['status'] == 'ok': | |
|
149 | self.id = int(content['id']) | |
|
147 | 150 | |
|
148 | 151 | # launch heartbeat |
|
149 | hb_addrs = msg.content.heartbeat | |
|
150 | ||
|
151 | 152 | # possibly forward hb ports with tunnels |
|
152 | hb_addrs = [ maybe_tunnel(addr) for addr in hb_addrs ] |
|
|
153 | heart = Heart(*map(str, hb_addrs), heart_id=identity) | |
|
153 | hb_ping = maybe_tunnel(info['hb_ping']) | |
|
154 | hb_pong = maybe_tunnel(info['hb_pong']) | |
|
155 | ||
|
156 | heart = Heart(hb_ping, hb_pong, heart_id=identity) | |
|
154 | 157 | heart.start() |
|
155 | 158 | |
|
156 | # create Shell Streams (MUX, Task, etc.): |
|
|
157 | queue_addr = msg.content.mux | |
|
158 | shell_addrs = [ str(queue_addr) ] | |
|
159 | task_addr = msg.content.task | |
|
160 | if task_addr: | |
|
161 | shell_addrs.append(str(task_addr)) | |
|
162 | ||
|
163 | # Uncomment this to go back to two-socket model | |
|
164 | # shell_streams = [] | |
|
165 | # for addr in shell_addrs: | |
|
166 | # stream = zmqstream.ZMQStream(ctx.socket(zmq.ROUTER), loop) | |
|
167 | # stream.setsockopt(zmq.IDENTITY, identity) | |
|
168 | # stream.connect(disambiguate_url(addr, self.location)) | |
|
169 | # shell_streams.append(stream) | |
|
170 | ||
|
171 | # Now use only one shell stream for mux and tasks | |
|
159 | # create Shell Connections (MUX, Task, etc.): | |
|
160 | shell_addrs = map(str, [info['mux'], info['task']]) | |
|
161 | ||
|
162 | # Use only one shell stream for mux and tasks | |
|
172 | 163 | stream = zmqstream.ZMQStream(ctx.socket(zmq.ROUTER), loop) |
|
173 | 164 | stream.setsockopt(zmq.IDENTITY, identity) |
|
174 | 165 | shell_streams = [stream] |
|
175 | 166 | for addr in shell_addrs: |
|
176 | 167 | connect(stream, addr) |
|
177 | # end single stream-socket | |
|
178 | 168 | |
|
179 | 169 | # control stream: |
|
180 | control_addr = str(info['control']) |
|
|
170 | control_addr = str(info['control']) | |
|
181 | 171 | control_stream = zmqstream.ZMQStream(ctx.socket(zmq.ROUTER), loop) |
|
182 | 172 | control_stream.setsockopt(zmq.IDENTITY, identity) |
|
183 | 173 | connect(control_stream, control_addr) |
|
184 | 174 | |
|
185 | 175 | # create iopub stream: |
|
186 | iopub_addr = msg.content.iopub |
|
|
176 | iopub_addr = info['iopub'] | |
|
187 | 177 | iopub_socket = ctx.socket(zmq.PUB) |
|
188 | 178 | iopub_socket.setsockopt(zmq.IDENTITY, identity) |
|
189 | 179 | connect(iopub_socket, iopub_addr) |
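The info dict read throughout this method is the engine-side counterpart of the controller's new connection file, delivered via the connection_info trait added to EngineFactory above. A sketch of its expected shape, with hypothetical addresses:

    # Hypothetical connection_info for an engine; keys mirror the ones the
    # code above reads (registration/mux/task/control/iopub/hb_ping/hb_pong).
    connection_info = {
        'registration' : 'tcp://10.0.1.5:55001',
        'mux'          : 'tcp://10.0.1.5:55011',
        'task'         : 'tcp://10.0.1.5:55012',
        'control'      : 'tcp://10.0.1.5:55013',
        'iopub'        : 'tcp://10.0.1.5:55014',
        'hb_ping'      : 'tcp://10.0.1.5:55015',
        'hb_pong'      : 'tcp://10.0.1.5:55016',
    }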
@@ -257,9 +257,11 b' class Session(Configurable):' | |||
|
257 | 257 | if new.lower() == 'json': |
|
258 | 258 | self.pack = json_packer |
|
259 | 259 | self.unpack = json_unpacker |
|
260 | self.unpacker = new | |
|
260 | 261 | elif new.lower() == 'pickle': |
|
261 | 262 | self.pack = pickle_packer |
|
262 | 263 | self.unpack = pickle_unpacker |
|
264 | self.unpacker = new | |
|
263 | 265 | else: |
|
264 | 266 | self.pack = import_item(str(new)) |
|
265 | 267 | |
@@ -270,9 +272,11 b' class Session(Configurable):' | |||
|
270 | 272 | if new.lower() == 'json': |
|
271 | 273 | self.pack = json_packer |
|
272 | 274 | self.unpack = json_unpacker |
|
275 | self.packer = new | |
|
273 | 276 | elif new.lower() == 'pickle': |
|
274 | 277 | self.pack = pickle_packer |
|
275 | 278 | self.unpack = pickle_unpacker |
|
279 | self.packer = new | |
|
276 | 280 | else: |
|
277 | 281 | self.unpack = import_item(str(new)) |
|
278 | 282 |
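The added self.unpacker = new and self.packer = new lines keep the two string traits in step whenever one of the built-in schemes is selected, which matters now that the controller serializes session.packer / session.unpacker into the connection files above. A rough illustration of the intended behaviour; the import location is the one used in this era of IPython and should be treated as an assumption:

    from IPython.zmq.session import Session   # assumed import location

    s = Session(packer='pickle')
    # With this patch, selecting a built-in packer also updates the unpacker
    # trait (and vice versa), so both report the same scheme:
    assert s.packer == 'pickle' and s.unpacker == 'pickle'   # expected after this patch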