##// END OF EJS Templates
Merge pull request #1471 from minrk/connection...
Fernando Perez -
r7962:bff463b5 merge
parent child Browse files
Show More
@@ -116,7 +116,10 b' flags.update({'
116 116 select one of the true db backends.
117 117 """),
118 118 'reuse' : ({'IPControllerApp' : {'reuse_files' : True}},
119 'reuse existing json connection files')
119 'reuse existing json connection files'),
120 'restore' : ({'IPControllerApp' : {'restore_engines' : True, 'reuse_files' : True}},
121 'Attempt to restore engines from a JSON file. '
122 'For use when resuming a crashed controller'),
120 123 })
121 124
122 125 flags.update(session_flags)
@@ -156,6 +159,10 b' class IPControllerApp(BaseParallelApplication):'
156 159 If False, connection files will be removed on a clean exit.
157 160 """
158 161 )
162 restore_engines = Bool(False, config=True,
163 help="""Reload engine state from JSON file
164 """
165 )
159 166 ssh_server = Unicode(u'', config=True,
160 167 help="""ssh url for clients to use when connecting to the Controller
161 168 processes. It should be of the form: [user@]server[:port]. The
@@ -209,21 +216,17 b' class IPControllerApp(BaseParallelApplication):'
209 216 def save_connection_dict(self, fname, cdict):
210 217 """save a connection dict to json file."""
211 218 c = self.config
212 url = cdict['url']
219 url = cdict['registration']
213 220 location = cdict['location']
221
214 222 if not location:
215 223 try:
216 proto,ip,port = split_url(url)
217 except AssertionError:
218 pass
219 else:
220 try:
221 location = socket.gethostbyname_ex(socket.gethostname())[2][-1]
222 except (socket.gaierror, IndexError):
223 self.log.warn("Could not identify this machine's IP, assuming 127.0.0.1."
224 " You may need to specify '--location=<external_ip_address>' to help"
225 " IPython decide when to connect via loopback.")
226 location = '127.0.0.1'
224 location = socket.gethostbyname_ex(socket.gethostname())[2][-1]
225 except (socket.gaierror, IndexError):
226 self.log.warn("Could not identify this machine's IP, assuming 127.0.0.1."
227 " You may need to specify '--location=<external_ip_address>' to help"
228 " IPython decide when to connect via loopback.")
229 location = '127.0.0.1'
227 230 cdict['location'] = location
228 231 fname = os.path.join(self.profile_dir.security_dir, fname)
229 232 self.log.info("writing connection info to %s", fname)
@@ -235,35 +238,51 b' class IPControllerApp(BaseParallelApplication):'
235 238 """load config from existing json connector files."""
236 239 c = self.config
237 240 self.log.debug("loading config from JSON")
238 # load from engine config
241
242 # load engine config
243
239 244 fname = os.path.join(self.profile_dir.security_dir, self.engine_json_file)
240 245 self.log.info("loading connection info from %s", fname)
241 246 with open(fname) as f:
242 cfg = json.loads(f.read())
243 key = cfg['exec_key']
247 ecfg = json.loads(f.read())
248
244 249 # json gives unicode, Session.key wants bytes
245 c.Session.key = key.encode('ascii')
246 xport,addr = cfg['url'].split('://')
247 c.HubFactory.engine_transport = xport
248 ip,ports = addr.split(':')
250 c.Session.key = ecfg['exec_key'].encode('ascii')
251
252 xport,ip = ecfg['interface'].split('://')
253
249 254 c.HubFactory.engine_ip = ip
250 c.HubFactory.regport = int(ports)
251 self.location = cfg['location']
255 c.HubFactory.engine_transport = xport
256
257 self.location = ecfg['location']
252 258 if not self.engine_ssh_server:
253 self.engine_ssh_server = cfg['ssh']
259 self.engine_ssh_server = ecfg['ssh']
260
254 261 # load client config
262
255 263 fname = os.path.join(self.profile_dir.security_dir, self.client_json_file)
256 264 self.log.info("loading connection info from %s", fname)
257 265 with open(fname) as f:
258 cfg = json.loads(f.read())
259 assert key == cfg['exec_key'], "exec_key mismatch between engine and client keys"
260 xport,addr = cfg['url'].split('://')
266 ccfg = json.loads(f.read())
267
268 for key in ('exec_key', 'registration', 'pack', 'unpack'):
269 assert ccfg[key] == ecfg[key], "mismatch between engine and client info: %r" % key
270
271 xport,addr = ccfg['interface'].split('://')
272
261 273 c.HubFactory.client_transport = xport
262 ip,ports = addr.split(':')
263 274 c.HubFactory.client_ip = ip
264 275 if not self.ssh_server:
265 self.ssh_server = cfg['ssh']
266 assert int(ports) == c.HubFactory.regport, "regport mismatch"
276 self.ssh_server = ccfg['ssh']
277
278 # load port config:
279 c.HubFactory.regport = ecfg['registration']
280 c.HubFactory.hb = (ecfg['hb_ping'], ecfg['hb_pong'])
281 c.HubFactory.control = (ccfg['control'], ecfg['control'])
282 c.HubFactory.mux = (ccfg['mux'], ecfg['mux'])
283 c.HubFactory.task = (ccfg['task'], ecfg['task'])
284 c.HubFactory.iopub = (ccfg['iopub'], ecfg['iopub'])
285 c.HubFactory.notifier_port = ccfg['notification']
267 286
268 287 def cleanup_connection_files(self):
269 288 if self.reuse_files:
@@ -314,29 +333,42 b' class IPControllerApp(BaseParallelApplication):'
314 333 if self.write_connection_files:
315 334 # save to new json config files
316 335 f = self.factory
317 cdict = {'exec_key' : f.session.key.decode('ascii'),
318 'ssh' : self.ssh_server,
319 'url' : "%s://%s:%s"%(f.client_transport, f.client_ip, f.regport),
320 'location' : self.location
321 }
336 base = {
337 'exec_key' : f.session.key.decode('ascii'),
338 'location' : self.location,
339 'pack' : f.session.packer,
340 'unpack' : f.session.unpacker,
341 }
342
343 cdict = {'ssh' : self.ssh_server}
344 cdict.update(f.client_info)
345 cdict.update(base)
322 346 self.save_connection_dict(self.client_json_file, cdict)
323 edict = cdict
324 edict['url']="%s://%s:%s"%((f.client_transport, f.client_ip, f.regport))
325 edict['ssh'] = self.engine_ssh_server
347
348 edict = {'ssh' : self.engine_ssh_server}
349 edict.update(f.engine_info)
350 edict.update(base)
326 351 self.save_connection_dict(self.engine_json_file, edict)
327 352
353 fname = "engines%s.json" % self.cluster_id
354 self.factory.hub.engine_state_file = os.path.join(self.profile_dir.log_dir, fname)
355 if self.restore_engines:
356 self.factory.hub._load_engine_state()
357
328 358 def init_schedulers(self):
329 359 children = self.children
330 360 mq = import_item(str(self.mq_class))
331 361
332 hub = self.factory
362 f = self.factory
363 ident = f.session.bsession
333 364 # disambiguate url, in case of *
334 monitor_url = disambiguate_url(hub.monitor_url)
365 monitor_url = disambiguate_url(f.monitor_url)
335 366 # maybe_inproc = 'inproc://monitor' if self.use_threads else monitor_url
336 367 # IOPub relay (in a Process)
337 368 q = mq(zmq.PUB, zmq.SUB, zmq.PUB, b'N/A',b'iopub')
338 q.bind_in(hub.client_info['iopub'])
339 q.bind_out(hub.engine_info['iopub'])
369 q.bind_in(f.client_url('iopub'))
370 q.setsockopt_in(zmq.IDENTITY, ident + b"_iopub")
371 q.bind_out(f.engine_url('iopub'))
340 372 q.setsockopt_out(zmq.SUBSCRIBE, b'')
341 373 q.connect_mon(monitor_url)
342 374 q.daemon=True
@@ -344,18 +376,20 b' class IPControllerApp(BaseParallelApplication):'
344 376
345 377 # Multiplexer Queue (in a Process)
346 378 q = mq(zmq.ROUTER, zmq.ROUTER, zmq.PUB, b'in', b'out')
347 q.bind_in(hub.client_info['mux'])
348 q.setsockopt_in(zmq.IDENTITY, b'mux')
349 q.bind_out(hub.engine_info['mux'])
379 q.bind_in(f.client_url('mux'))
380 q.setsockopt_in(zmq.IDENTITY, b'mux_in')
381 q.bind_out(f.engine_url('mux'))
382 q.setsockopt_out(zmq.IDENTITY, b'mux_out')
350 383 q.connect_mon(monitor_url)
351 384 q.daemon=True
352 385 children.append(q)
353 386
354 387 # Control Queue (in a Process)
355 388 q = mq(zmq.ROUTER, zmq.ROUTER, zmq.PUB, b'incontrol', b'outcontrol')
356 q.bind_in(hub.client_info['control'])
357 q.setsockopt_in(zmq.IDENTITY, b'control')
358 q.bind_out(hub.engine_info['control'])
389 q.bind_in(f.client_url('control'))
390 q.setsockopt_in(zmq.IDENTITY, b'control_in')
391 q.bind_out(f.engine_url('control'))
392 q.setsockopt_out(zmq.IDENTITY, b'control_out')
359 393 q.connect_mon(monitor_url)
360 394 q.daemon=True
361 395 children.append(q)
@@ -368,9 +402,10 b' class IPControllerApp(BaseParallelApplication):'
368 402 self.log.warn("task::using pure DEALER Task scheduler")
369 403 q = mq(zmq.ROUTER, zmq.DEALER, zmq.PUB, b'intask', b'outtask')
370 404 # q.setsockopt_out(zmq.HWM, hub.hwm)
371 q.bind_in(hub.client_info['task'][1])
372 q.setsockopt_in(zmq.IDENTITY, b'task')
373 q.bind_out(hub.engine_info['task'])
405 q.bind_in(f.client_url('task'))
406 q.setsockopt_in(zmq.IDENTITY, b'task_in')
407 q.bind_out(f.engine_url('task'))
408 q.setsockopt_out(zmq.IDENTITY, b'task_out')
374 409 q.connect_mon(monitor_url)
375 410 q.daemon=True
376 411 children.append(q)
@@ -379,8 +414,10 b' class IPControllerApp(BaseParallelApplication):'
379 414
380 415 else:
381 416 self.log.info("task::using Python %s Task scheduler"%scheme)
382 sargs = (hub.client_info['task'][1], hub.engine_info['task'],
383 monitor_url, disambiguate_url(hub.client_info['notification']))
417 sargs = (f.client_url('task'), f.engine_url('task'),
418 monitor_url, disambiguate_url(f.client_url('notification')),
419 disambiguate_url(f.client_url('registration')),
420 )
384 421 kwargs = dict(logname='scheduler', loglevel=self.log_level,
385 422 log_url = self.log_url, config=dict(self.config))
386 423 if 'Process' in self.mq_class:
@@ -45,7 +45,7 b' from IPython.zmq.session import ('
45 45 from IPython.config.configurable import Configurable
46 46
47 47 from IPython.parallel.engine.engine import EngineFactory
48 from IPython.parallel.util import disambiguate_url
48 from IPython.parallel.util import disambiguate_ip_address
49 49
50 50 from IPython.utils.importstring import import_item
51 51 from IPython.utils.py3compat import cast_bytes
@@ -211,24 +211,36 b' class IPEngineApp(BaseParallelApplication):'
211 211 with open(self.url_file) as f:
212 212 d = json.loads(f.read())
213 213
214 if 'exec_key' in d:
215 config.Session.key = cast_bytes(d['exec_key'])
216
214 # allow hand-override of location for disambiguation
215 # and ssh-server
217 216 try:
218 217 config.EngineFactory.location
219 218 except AttributeError:
220 219 config.EngineFactory.location = d['location']
221 220
222 d['url'] = disambiguate_url(d['url'], config.EngineFactory.location)
223 try:
224 config.EngineFactory.url
225 except AttributeError:
226 config.EngineFactory.url = d['url']
227
228 221 try:
229 222 config.EngineFactory.sshserver
230 223 except AttributeError:
231 config.EngineFactory.sshserver = d['ssh']
224 config.EngineFactory.sshserver = d.get('ssh')
225
226 location = config.EngineFactory.location
227
228 proto, ip = d['interface'].split('://')
229 ip = disambiguate_ip_address(ip)
230 d['interface'] = '%s://%s' % (proto, ip)
231
232 # DO NOT allow override of basic URLs, serialization, or exec_key
233 # JSON file takes top priority there
234 config.Session.key = cast_bytes(d['exec_key'])
235
236 config.EngineFactory.url = d['interface'] + ':%i' % d['registration']
237
238 config.Session.packer = d['pack']
239 config.Session.unpacker = d['unpack']
240
241 self.log.debug("Config changed:")
242 self.log.debug("%r", config)
243 self.connection_info = d
232 244
233 245 def bind_kernel(self, **kwargs):
234 246 """Promote engine to listening kernel, accessible to frontends."""
@@ -320,7 +332,9 b' class IPEngineApp(BaseParallelApplication):'
320 332 # shell_class = import_item(self.master_config.Global.shell_class)
321 333 # print self.config
322 334 try:
323 self.engine = EngineFactory(config=config, log=self.log)
335 self.engine = EngineFactory(config=config, log=self.log,
336 connection_info=self.connection_info,
337 )
324 338 except:
325 339 self.log.error("Couldn't start the Engine", exc_info=True)
326 340 self.exit(1)
@@ -217,7 +217,9 b' class Client(HasTraits):'
217 217 Parameters
218 218 ----------
219 219
220 url_or_file : bytes or unicode; zmq url or path to ipcontroller-client.json
220 url_file : str/unicode; path to ipcontroller-client.json
221 This JSON file should contain all the information needed to connect to a cluster,
222 and is likely the only argument needed.
221 223 Connection information for the Hub's registration. If a json connector
222 224 file is given, then likely no further configuration is necessary.
223 225 [Default: use profile]
@@ -239,14 +241,6 b' class Client(HasTraits):'
239 241 If specified, this will be relayed to the Session for configuration
240 242 username : str
241 243 set username for the session object
242 packer : str (import_string) or callable
243 Can be either the simple keyword 'json' or 'pickle', or an import_string to a
244 function to serialize messages. Must support same input as
245 JSON, and output must be bytes.
246 You can pass a callable directly as `pack`
247 unpacker : str (import_string) or callable
248 The inverse of packer. Only necessary if packer is specified as *not* one
249 of 'json' or 'pickle'.
250 244
251 245 #-------------- ssh related args ----------------
252 246 # These are args for configuring the ssh tunnel to be used
@@ -271,17 +265,6 b' class Client(HasTraits):'
271 265 flag for whether to use paramiko instead of shell ssh for tunneling.
272 266 [default: True on win32, False else]
273 267
274 ------- exec authentication args -------
275 If even localhost is untrusted, you can have some protection against
276 unauthorized execution by signing messages with HMAC digests.
277 Messages are still sent as cleartext, so if someone can snoop your
278 loopback traffic this will not protect your privacy, but will prevent
279 unauthorized execution.
280
281 exec_key : str
282 an authentication key or file containing a key
283 default: None
284
285 268
286 269 Attributes
287 270 ----------
@@ -378,8 +361,8 b' class Client(HasTraits):'
378 361 # don't raise on positional args
379 362 return HasTraits.__new__(self, **kw)
380 363
381 def __init__(self, url_or_file=None, profile=None, profile_dir=None, ipython_dir=None,
382 context=None, debug=False, exec_key=None,
364 def __init__(self, url_file=None, profile=None, profile_dir=None, ipython_dir=None,
365 context=None, debug=False,
383 366 sshserver=None, sshkey=None, password=None, paramiko=None,
384 367 timeout=10, **extra_args
385 368 ):
@@ -391,39 +374,46 b' class Client(HasTraits):'
391 374 context = zmq.Context.instance()
392 375 self._context = context
393 376 self._stop_spinning = Event()
377
378 if 'url_or_file' in extra_args:
379 url_file = extra_args['url_or_file']
380 warnings.warn("url_or_file arg no longer supported, use url_file", DeprecationWarning)
381
382 if url_file and util.is_url(url_file):
383 raise ValueError("single urls cannot be specified, url-files must be used.")
394 384
395 385 self._setup_profile_dir(self.profile, profile_dir, ipython_dir)
386
396 387 if self._cd is not None:
397 if url_or_file is None:
398 url_or_file = pjoin(self._cd.security_dir, 'ipcontroller-client.json')
399 if url_or_file is None:
388 if url_file is None:
389 url_file = pjoin(self._cd.security_dir, 'ipcontroller-client.json')
390 if url_file is None:
400 391 raise ValueError(
401 392 "I can't find enough information to connect to a hub!"
402 " Please specify at least one of url_or_file or profile."
393 " Please specify at least one of url_file or profile."
403 394 )
404
405 if not util.is_url(url_or_file):
406 # it's not a url, try for a file
407 if not os.path.exists(url_or_file):
408 if self._cd:
409 url_or_file = os.path.join(self._cd.security_dir, url_or_file)
410 if not os.path.exists(url_or_file):
411 raise IOError("Connection file not found: %r" % url_or_file)
412 with open(url_or_file) as f:
413 cfg = json.loads(f.read())
414 else:
415 cfg = {'url':url_or_file}
395
396 with open(url_file) as f:
397 cfg = json.load(f)
398
399 self._task_scheme = cfg['task_scheme']
416 400
417 401 # sync defaults from args, json:
418 402 if sshserver:
419 403 cfg['ssh'] = sshserver
420 if exec_key:
421 cfg['exec_key'] = exec_key
422 exec_key = cfg['exec_key']
404
423 405 location = cfg.setdefault('location', None)
424 cfg['url'] = util.disambiguate_url(cfg['url'], location)
425 url = cfg['url']
426 proto,addr,port = util.split_url(url)
406
407 proto,addr = cfg['interface'].split('://')
408 addr = util.disambiguate_ip_address(addr)
409 cfg['interface'] = "%s://%s" % (proto, addr)
410
411 # turn interface,port into full urls:
412 for key in ('control', 'task', 'mux', 'iopub', 'notification', 'registration'):
413 cfg[key] = cfg['interface'] + ':%i' % cfg[key]
414
415 url = cfg['registration']
416
427 417 if location is not None and addr == '127.0.0.1':
428 418 # location specified, and connection is expected to be local
429 419 if location not in LOCAL_IPS and not sshserver:
@@ -448,7 +438,7 b' class Client(HasTraits):'
448 438 self._ssh = bool(sshserver or sshkey or password)
449 439 if self._ssh and sshserver is None:
450 440 # default to ssh via localhost
451 sshserver = url.split('://')[1].split(':')[0]
441 sshserver = addr
452 442 if self._ssh and password is None:
453 443 if tunnel.try_passwordless_ssh(sshserver, sshkey, paramiko):
454 444 password=False
@@ -457,20 +447,18 b' class Client(HasTraits):'
457 447 ssh_kwargs = dict(keyfile=sshkey, password=password, paramiko=paramiko)
458 448
459 449 # configure and construct the session
460 if exec_key is not None:
461 if os.path.isfile(exec_key):
462 extra_args['keyfile'] = exec_key
463 else:
464 exec_key = cast_bytes(exec_key)
465 extra_args['key'] = exec_key
450 extra_args['packer'] = cfg['pack']
451 extra_args['unpacker'] = cfg['unpack']
452 extra_args['key'] = cast_bytes(cfg['exec_key'])
453
466 454 self.session = Session(**extra_args)
467 455
468 456 self._query_socket = self._context.socket(zmq.DEALER)
469 self._query_socket.setsockopt(zmq.IDENTITY, self.session.bsession)
457
470 458 if self._ssh:
471 tunnel.tunnel_connection(self._query_socket, url, sshserver, **ssh_kwargs)
459 tunnel.tunnel_connection(self._query_socket, cfg['registration'], sshserver, **ssh_kwargs)
472 460 else:
473 self._query_socket.connect(url)
461 self._query_socket.connect(cfg['registration'])
474 462
475 463 self.session.debug = self.debug
476 464
@@ -520,8 +508,9 b' class Client(HasTraits):'
520 508 """Update our engines dict and _ids from a dict of the form: {id:uuid}."""
521 509 for k,v in engines.iteritems():
522 510 eid = int(k)
511 if eid not in self._engines:
512 self._ids.append(eid)
523 513 self._engines[eid] = v
524 self._ids.append(eid)
525 514 self._ids = sorted(self._ids)
526 515 if sorted(self._engines.keys()) != range(len(self._engines)) and \
527 516 self._task_scheme == 'pure' and self._task_socket:
@@ -583,7 +572,7 b' class Client(HasTraits):'
583 572 self._connected=True
584 573
585 574 def connect_socket(s, url):
586 url = util.disambiguate_url(url, self._config['location'])
575 # url = util.disambiguate_url(url, self._config['location'])
587 576 if self._ssh:
588 577 return tunnel.tunnel_connection(s, url, sshserver, **ssh_kwargs)
589 578 else:
@@ -600,38 +589,28 b' class Client(HasTraits):'
600 589 idents,msg = self.session.recv(self._query_socket,mode=0)
601 590 if self.debug:
602 591 pprint(msg)
603 msg = Message(msg)
604 content = msg.content
605 self._config['registration'] = dict(content)
606 if content.status == 'ok':
607 ident = self.session.bsession
608 if content.mux:
609 self._mux_socket = self._context.socket(zmq.DEALER)
610 self._mux_socket.setsockopt(zmq.IDENTITY, ident)
611 connect_socket(self._mux_socket, content.mux)
612 if content.task:
613 self._task_scheme, task_addr = content.task
614 self._task_socket = self._context.socket(zmq.DEALER)
615 self._task_socket.setsockopt(zmq.IDENTITY, ident)
616 connect_socket(self._task_socket, task_addr)
617 if content.notification:
618 self._notification_socket = self._context.socket(zmq.SUB)
619 connect_socket(self._notification_socket, content.notification)
620 self._notification_socket.setsockopt(zmq.SUBSCRIBE, b'')
621 # if content.query:
622 # self._query_socket = self._context.socket(zmq.DEALER)
623 # self._query_socket.setsockopt(zmq.IDENTITY, self.session.bsession)
624 # connect_socket(self._query_socket, content.query)
625 if content.control:
626 self._control_socket = self._context.socket(zmq.DEALER)
627 self._control_socket.setsockopt(zmq.IDENTITY, ident)
628 connect_socket(self._control_socket, content.control)
629 if content.iopub:
630 self._iopub_socket = self._context.socket(zmq.SUB)
631 self._iopub_socket.setsockopt(zmq.SUBSCRIBE, b'')
632 self._iopub_socket.setsockopt(zmq.IDENTITY, ident)
633 connect_socket(self._iopub_socket, content.iopub)
634 self._update_engines(dict(content.engines))
592 content = msg['content']
593 # self._config['registration'] = dict(content)
594 cfg = self._config
595 if content['status'] == 'ok':
596 self._mux_socket = self._context.socket(zmq.DEALER)
597 connect_socket(self._mux_socket, cfg['mux'])
598
599 self._task_socket = self._context.socket(zmq.DEALER)
600 connect_socket(self._task_socket, cfg['task'])
601
602 self._notification_socket = self._context.socket(zmq.SUB)
603 self._notification_socket.setsockopt(zmq.SUBSCRIBE, b'')
604 connect_socket(self._notification_socket, cfg['notification'])
605
606 self._control_socket = self._context.socket(zmq.DEALER)
607 connect_socket(self._control_socket, cfg['control'])
608
609 self._iopub_socket = self._context.socket(zmq.SUB)
610 self._iopub_socket.setsockopt(zmq.SUBSCRIBE, b'')
611 connect_socket(self._iopub_socket, cfg['iopub'])
612
613 self._update_engines(dict(content['engines']))
635 614 else:
636 615 self._connected = False
637 616 raise Exception("Failed to connect!")
@@ -674,7 +653,7 b' class Client(HasTraits):'
674 653 """Register a new engine, and update our connection info."""
675 654 content = msg['content']
676 655 eid = content['id']
677 d = {eid : content['queue']}
656 d = {eid : content['uuid']}
678 657 self._update_engines(d)
679 658
680 659 def _unregister_engine(self, msg):
@@ -18,6 +18,8 b' Authors:'
18 18 #-----------------------------------------------------------------------------
19 19 from __future__ import print_function
20 20
21 import json
22 import os
21 23 import sys
22 24 import time
23 25 from datetime import datetime
@@ -107,17 +109,16 b' class EngineConnector(HasTraits):'
107 109 """A simple object for accessing the various zmq connections of an object.
108 110 Attributes are:
109 111 id (int): engine ID
110 uuid (str): uuid (unused?)
111 queue (str): identity of queue's DEALER socket
112 registration (str): identity of registration DEALER socket
113 heartbeat (str): identity of heartbeat DEALER socket
112 uuid (unicode): engine UUID
113 pending: set of msg_ids
114 stallback: DelayedCallback for stalled registration
114 115 """
115 id=Integer(0)
116 queue=CBytes()
117 control=CBytes()
118 registration=CBytes()
119 heartbeat=CBytes()
120 pending=Set()
116
117 id = Integer(0)
118 uuid = Unicode()
119 pending = Set()
120 stallback = Instance(ioloop.DelayedCallback)
121
121 122
122 123 _db_shortcuts = {
123 124 'sqlitedb' : 'IPython.parallel.controller.sqlitedb.SQLiteDB',
@@ -131,29 +132,29 b' class HubFactory(RegistrationFactory):'
131 132
132 133 # port-pairs for monitoredqueues:
133 134 hb = Tuple(Integer,Integer,config=True,
134 help="""DEALER/SUB Port pair for Engine heartbeats""")
135 help="""PUB/ROUTER Port pair for Engine heartbeats""")
135 136 def _hb_default(self):
136 137 return tuple(util.select_random_ports(2))
137 138
138 139 mux = Tuple(Integer,Integer,config=True,
139 help="""Engine/Client Port pair for MUX queue""")
140 help="""Client/Engine Port pair for MUX queue""")
140 141
141 142 def _mux_default(self):
142 143 return tuple(util.select_random_ports(2))
143 144
144 145 task = Tuple(Integer,Integer,config=True,
145 help="""Engine/Client Port pair for Task queue""")
146 help="""Client/Engine Port pair for Task queue""")
146 147 def _task_default(self):
147 148 return tuple(util.select_random_ports(2))
148 149
149 150 control = Tuple(Integer,Integer,config=True,
150 help="""Engine/Client Port pair for Control queue""")
151 help="""Client/Engine Port pair for Control queue""")
151 152
152 153 def _control_default(self):
153 154 return tuple(util.select_random_ports(2))
154 155
155 156 iopub = Tuple(Integer,Integer,config=True,
156 help="""Engine/Client Port pair for IOPub relay""")
157 help="""Client/Engine Port pair for IOPub relay""")
157 158
158 159 def _iopub_default(self):
159 160 return tuple(util.select_random_ports(2))
@@ -231,38 +232,77 b' class HubFactory(RegistrationFactory):'
231 232 self.heartmonitor.start()
232 233 self.log.info("Heartmonitor started")
233 234
235 def client_url(self, channel):
236 """return full zmq url for a named client channel"""
237 return "%s://%s:%i" % (self.client_transport, self.client_ip, self.client_info[channel])
238
239 def engine_url(self, channel):
240 """return full zmq url for a named engine channel"""
241 return "%s://%s:%i" % (self.engine_transport, self.engine_ip, self.engine_info[channel])
242
234 243 def init_hub(self):
235 """construct"""
236 client_iface = "%s://%s:" % (self.client_transport, self.client_ip) + "%i"
237 engine_iface = "%s://%s:" % (self.engine_transport, self.engine_ip) + "%i"
244 """construct Hub object"""
238 245
239 246 ctx = self.context
240 247 loop = self.loop
241 248
249 try:
250 scheme = self.config.TaskScheduler.scheme_name
251 except AttributeError:
252 from .scheduler import TaskScheduler
253 scheme = TaskScheduler.scheme_name.get_default_value()
254
255 # build connection dicts
256 engine = self.engine_info = {
257 'interface' : "%s://%s" % (self.engine_transport, self.engine_ip),
258 'registration' : self.regport,
259 'control' : self.control[1],
260 'mux' : self.mux[1],
261 'hb_ping' : self.hb[0],
262 'hb_pong' : self.hb[1],
263 'task' : self.task[1],
264 'iopub' : self.iopub[1],
265 }
266
267 client = self.client_info = {
268 'interface' : "%s://%s" % (self.client_transport, self.client_ip),
269 'registration' : self.regport,
270 'control' : self.control[0],
271 'mux' : self.mux[0],
272 'task' : self.task[0],
273 'task_scheme' : scheme,
274 'iopub' : self.iopub[0],
275 'notification' : self.notifier_port,
276 }
277
278 self.log.debug("Hub engine addrs: %s", self.engine_info)
279 self.log.debug("Hub client addrs: %s", self.client_info)
280
242 281 # Registrar socket
243 282 q = ZMQStream(ctx.socket(zmq.ROUTER), loop)
244 q.bind(client_iface % self.regport)
245 self.log.info("Hub listening on %s for registration.", client_iface % self.regport)
283 q.bind(self.client_url('registration'))
284 self.log.info("Hub listening on %s for registration.", self.client_url('registration'))
246 285 if self.client_ip != self.engine_ip:
247 q.bind(engine_iface % self.regport)
248 self.log.info("Hub listening on %s for registration.", engine_iface % self.regport)
286 q.bind(self.engine_url('registration'))
287 self.log.info("Hub listening on %s for registration.", self.engine_url('registration'))
249 288
250 289 ### Engine connections ###
251 290
252 291 # heartbeat
253 292 hpub = ctx.socket(zmq.PUB)
254 hpub.bind(engine_iface % self.hb[0])
293 hpub.bind(self.engine_url('hb_ping'))
255 294 hrep = ctx.socket(zmq.ROUTER)
256 hrep.bind(engine_iface % self.hb[1])
295 hrep.bind(self.engine_url('hb_pong'))
257 296 self.heartmonitor = HeartMonitor(loop=loop, config=self.config, log=self.log,
258 297 pingstream=ZMQStream(hpub,loop),
259 298 pongstream=ZMQStream(hrep,loop)
260 299 )
261 300
262 301 ### Client connections ###
302
263 303 # Notifier socket
264 304 n = ZMQStream(ctx.socket(zmq.PUB), loop)
265 n.bind(client_iface%self.notifier_port)
305 n.bind(self.client_url('notification'))
266 306
267 307 ### build and launch the queues ###
268 308
@@ -279,35 +319,10 b' class HubFactory(RegistrationFactory):'
279 319 self.db = import_item(str(db_class))(session=self.session.session,
280 320 config=self.config, log=self.log)
281 321 time.sleep(.25)
282 try:
283 scheme = self.config.TaskScheduler.scheme_name
284 except AttributeError:
285 from .scheduler import TaskScheduler
286 scheme = TaskScheduler.scheme_name.get_default_value()
287 # build connection dicts
288 self.engine_info = {
289 'control' : engine_iface%self.control[1],
290 'mux': engine_iface%self.mux[1],
291 'heartbeat': (engine_iface%self.hb[0], engine_iface%self.hb[1]),
292 'task' : engine_iface%self.task[1],
293 'iopub' : engine_iface%self.iopub[1],
294 # 'monitor' : engine_iface%self.mon_port,
295 }
296
297 self.client_info = {
298 'control' : client_iface%self.control[0],
299 'mux': client_iface%self.mux[0],
300 'task' : (scheme, client_iface%self.task[0]),
301 'iopub' : client_iface%self.iopub[0],
302 'notification': client_iface%self.notifier_port
303 }
304 self.log.debug("Hub engine addrs: %s", self.engine_info)
305 self.log.debug("Hub client addrs: %s", self.client_info)
306 322
307 323 # resubmit stream
308 324 r = ZMQStream(ctx.socket(zmq.DEALER), loop)
309 url = util.disambiguate_url(self.client_info['task'][-1])
310 r.setsockopt(zmq.IDENTITY, self.session.bsession)
325 url = util.disambiguate_url(self.client_url('task'))
311 326 r.connect(url)
312 327
313 328 self.hub = Hub(loop=loop, session=self.session, monitor=sub, heartmonitor=self.heartmonitor,
@@ -335,6 +350,9 b' class Hub(SessionFactory):'
335 350 client_info: dict of zmq connection information for engines to connect
336 351 to the queues.
337 352 """
353
354 engine_state_file = Unicode()
355
338 356 # internal data structures:
339 357 ids=Set() # engine IDs
340 358 keytable=Dict()
@@ -382,15 +400,6 b' class Hub(SessionFactory):'
382 400 super(Hub, self).__init__(**kwargs)
383 401 self.registration_timeout = max(5000, 2*self.heartmonitor.period)
384 402
385 # validate connection dicts:
386 for k,v in self.client_info.iteritems():
387 if k == 'task':
388 util.validate_url_container(v[1])
389 else:
390 util.validate_url_container(v)
391 # util.validate_url_container(self.client_info)
392 util.validate_url_container(self.engine_info)
393
394 403 # register our callbacks
395 404 self.query.on_recv(self.dispatch_query)
396 405 self.monitor.on_recv(self.dispatch_monitor_traffic)
@@ -425,7 +434,7 b' class Hub(SessionFactory):'
425 434 self.resubmit.on_recv(lambda msg: None, copy=False)
426 435
427 436 self.log.info("hub::created hub")
428
437
429 438 @property
430 439 def _next_id(self):
431 440 """gemerate a new ID.
@@ -440,7 +449,7 b' class Hub(SessionFactory):'
440 449 # while newid in self.ids or newid in incoming:
441 450 # newid += 1
442 451 # return newid
443
452
444 453 #-----------------------------------------------------------------------------
445 454 # message validation
446 455 #-----------------------------------------------------------------------------
@@ -556,11 +565,11 b' class Hub(SessionFactory):'
556 565 triggers unregistration"""
557 566 self.log.debug("heartbeat::handle_heart_failure(%r)", heart)
558 567 eid = self.hearts.get(heart, None)
559 queue = self.engines[eid].queue
568 uuid = self.engines[eid].uuid
560 569 if eid is None or self.keytable[eid] in self.dead_engines:
561 570 self.log.info("heartbeat::ignoring heart failure %r (not an engine or already dead)", heart)
562 571 else:
563 self.unregister_engine(heart, dict(content=dict(id=eid, queue=queue)))
572 self.unregister_engine(heart, dict(content=dict(id=eid, queue=uuid)))
564 573
565 574 #----------------------- MUX Queue Traffic ------------------------------
566 575
@@ -585,7 +594,7 b' class Hub(SessionFactory):'
585 594 self.log.info("queue::client %r submitted request %r to %s", client_id, msg_id, eid)
586 595 # Unicode in records
587 596 record['engine_uuid'] = queue_id.decode('ascii')
588 record['client_uuid'] = client_id.decode('ascii')
597 record['client_uuid'] = msg['header']['session']
589 598 record['queue'] = 'mux'
590 599
591 600 try:
@@ -677,7 +686,7 b' class Hub(SessionFactory):'
677 686 return
678 687 record = init_record(msg)
679 688
680 record['client_uuid'] = client_id.decode('ascii')
689 record['client_uuid'] = msg['header']['session']
681 690 record['queue'] = 'task'
682 691 header = msg['header']
683 692 msg_id = header['msg_id']
@@ -865,11 +874,10 b' class Hub(SessionFactory):'
865 874 """Reply with connection addresses for clients."""
866 875 self.log.info("client::client %r connected", client_id)
867 876 content = dict(status='ok')
868 content.update(self.client_info)
869 877 jsonable = {}
870 878 for k,v in self.keytable.iteritems():
871 879 if v not in self.dead_engines:
872 jsonable[str(k)] = v.decode('ascii')
880 jsonable[str(k)] = v
873 881 content['engines'] = jsonable
874 882 self.session.send(self.query, 'connection_reply', content, parent=msg, ident=client_id)
875 883
@@ -877,48 +885,37 b' class Hub(SessionFactory):'
877 885 """Register a new engine."""
878 886 content = msg['content']
879 887 try:
880 queue = cast_bytes(content['queue'])
888 uuid = content['uuid']
881 889 except KeyError:
882 890 self.log.error("registration::queue not specified", exc_info=True)
883 891 return
884 heart = content.get('heartbeat', None)
885 if heart:
886 heart = cast_bytes(heart)
887 """register a new engine, and create the socket(s) necessary"""
892
888 893 eid = self._next_id
889 # print (eid, queue, reg, heart)
890 894
891 self.log.debug("registration::register_engine(%i, %r, %r, %r)", eid, queue, reg, heart)
895 self.log.debug("registration::register_engine(%i, %r)", eid, uuid)
892 896
893 897 content = dict(id=eid,status='ok')
894 content.update(self.engine_info)
895 898 # check if requesting available IDs:
896 if queue in self.by_ident:
897 try:
898 raise KeyError("queue_id %r in use" % queue)
899 except:
900 content = error.wrap_exception()
901 self.log.error("queue_id %r in use", queue, exc_info=True)
902 elif heart in self.hearts: # need to check unique hearts?
899 if cast_bytes(uuid) in self.by_ident:
903 900 try:
904 raise KeyError("heart_id %r in use" % heart)
901 raise KeyError("uuid %r in use" % uuid)
905 902 except:
906 self.log.error("heart_id %r in use", heart, exc_info=True)
907 903 content = error.wrap_exception()
904 self.log.error("uuid %r in use", uuid, exc_info=True)
908 905 else:
909 for h, pack in self.incoming_registrations.iteritems():
910 if heart == h:
906 for h, ec in self.incoming_registrations.iteritems():
907 if uuid == h:
911 908 try:
912 raise KeyError("heart_id %r in use" % heart)
909 raise KeyError("heart_id %r in use" % uuid)
913 910 except:
914 self.log.error("heart_id %r in use", heart, exc_info=True)
911 self.log.error("heart_id %r in use", uuid, exc_info=True)
915 912 content = error.wrap_exception()
916 913 break
917 elif queue == pack[1]:
914 elif uuid == ec.uuid:
918 915 try:
919 raise KeyError("queue_id %r in use" % queue)
916 raise KeyError("uuid %r in use" % uuid)
920 917 except:
921 self.log.error("queue_id %r in use", queue, exc_info=True)
918 self.log.error("uuid %r in use", uuid, exc_info=True)
922 919 content = error.wrap_exception()
923 920 break
924 921
@@ -926,18 +923,21 b' class Hub(SessionFactory):'
926 923 content=content,
927 924 ident=reg)
928 925
926 heart = cast_bytes(uuid)
927
929 928 if content['status'] == 'ok':
930 929 if heart in self.heartmonitor.hearts:
931 930 # already beating
932 self.incoming_registrations[heart] = (eid,queue,reg[0],None)
931 self.incoming_registrations[heart] = EngineConnector(id=eid,uuid=uuid)
933 932 self.finish_registration(heart)
934 933 else:
935 934 purge = lambda : self._purge_stalled_registration(heart)
936 935 dc = ioloop.DelayedCallback(purge, self.registration_timeout, self.loop)
937 936 dc.start()
938 self.incoming_registrations[heart] = (eid,queue,reg[0],dc)
937 self.incoming_registrations[heart] = EngineConnector(id=eid,uuid=uuid,stallback=dc)
939 938 else:
940 939 self.log.error("registration::registration %i failed: %r", eid, content['evalue'])
940
941 941 return eid
942 942
943 943 def unregister_engine(self, ident, msg):
@@ -950,7 +950,7 b' class Hub(SessionFactory):'
950 950 self.log.info("registration::unregister_engine(%r)", eid)
951 951 # print (eid)
952 952 uuid = self.keytable[eid]
953 content=dict(id=eid, queue=uuid.decode('ascii'))
953 content=dict(id=eid, uuid=uuid)
954 954 self.dead_engines.add(uuid)
955 955 # self.ids.remove(eid)
956 956 # uuid = self.keytable.pop(eid)
@@ -963,6 +963,8 b' class Hub(SessionFactory):'
963 963 dc = ioloop.DelayedCallback(handleit, self.registration_timeout, self.loop)
964 964 dc.start()
965 965 ############## TODO: HANDLE IT ################
966
967 self._save_engine_state()
966 968
967 969 if self.notifier:
968 970 self.session.send(self.notifier, "unregistration_notification", content=content)
@@ -1001,36 +1003,97 b' class Hub(SessionFactory):'
1001 1003 """Second half of engine registration, called after our HeartMonitor
1002 1004 has received a beat from the Engine's Heart."""
1003 1005 try:
1004 (eid,queue,reg,purge) = self.incoming_registrations.pop(heart)
1006 ec = self.incoming_registrations.pop(heart)
1005 1007 except KeyError:
1006 1008 self.log.error("registration::tried to finish nonexistant registration", exc_info=True)
1007 1009 return
1008 self.log.info("registration::finished registering engine %i:%r", eid, queue)
1009 if purge is not None:
1010 purge.stop()
1011 control = queue
1010 self.log.info("registration::finished registering engine %i:%s", ec.id, ec.uuid)
1011 if ec.stallback is not None:
1012 ec.stallback.stop()
1013 eid = ec.id
1012 1014 self.ids.add(eid)
1013 self.keytable[eid] = queue
1014 self.engines[eid] = EngineConnector(id=eid, queue=queue, registration=reg,
1015 control=control, heartbeat=heart)
1016 self.by_ident[queue] = eid
1015 self.keytable[eid] = ec.uuid
1016 self.engines[eid] = ec
1017 self.by_ident[cast_bytes(ec.uuid)] = ec.id
1017 1018 self.queues[eid] = list()
1018 1019 self.tasks[eid] = list()
1019 1020 self.completed[eid] = list()
1020 1021 self.hearts[heart] = eid
1021 content = dict(id=eid, queue=self.engines[eid].queue.decode('ascii'))
1022 content = dict(id=eid, uuid=self.engines[eid].uuid)
1022 1023 if self.notifier:
1023 1024 self.session.send(self.notifier, "registration_notification", content=content)
1024 1025 self.log.info("engine::Engine Connected: %i", eid)
1026
1027 self._save_engine_state()
1025 1028
1026 1029 def _purge_stalled_registration(self, heart):
1027 1030 if heart in self.incoming_registrations:
1028 eid = self.incoming_registrations.pop(heart)[0]
1029 self.log.info("registration::purging stalled registration: %i", eid)
1031 ec = self.incoming_registrations.pop(heart)
1032 self.log.info("registration::purging stalled registration: %i", ec.id)
1030 1033 else:
1031 1034 pass
1032 1035
1033 1036 #-------------------------------------------------------------------------
1037 # Engine State
1038 #-------------------------------------------------------------------------
1039
1040
1041 def _cleanup_engine_state_file(self):
1042 """cleanup engine state mapping"""
1043
1044 if os.path.exists(self.engine_state_file):
1045 self.log.debug("cleaning up engine state: %s", self.engine_state_file)
1046 try:
1047 os.remove(self.engine_state_file)
1048 except IOError:
1049 self.log.error("Couldn't cleanup file: %s", self.engine_state_file, exc_info=True)
1050
1051
1052 def _save_engine_state(self):
1053 """save engine mapping to JSON file"""
1054 if not self.engine_state_file:
1055 return
1056 self.log.debug("save engine state to %s" % self.engine_state_file)
1057 state = {}
1058 engines = {}
1059 for eid, ec in self.engines.iteritems():
1060 if ec.uuid not in self.dead_engines:
1061 engines[eid] = ec.uuid
1062
1063 state['engines'] = engines
1064
1065 state['next_id'] = self._idcounter
1066
1067 with open(self.engine_state_file, 'w') as f:
1068 json.dump(state, f)
1069
1070
1071 def _load_engine_state(self):
1072 """load engine mapping from JSON file"""
1073 if not os.path.exists(self.engine_state_file):
1074 return
1075
1076 self.log.info("loading engine state from %s" % self.engine_state_file)
1077
1078 with open(self.engine_state_file) as f:
1079 state = json.load(f)
1080
1081 save_notifier = self.notifier
1082 self.notifier = None
1083 for eid, uuid in state['engines'].iteritems():
1084 heart = uuid.encode('ascii')
1085 # start with this heart as current and beating:
1086 self.heartmonitor.responses.add(heart)
1087 self.heartmonitor.hearts.add(heart)
1088
1089 self.incoming_registrations[heart] = EngineConnector(id=int(eid), uuid=uuid)
1090 self.finish_registration(heart)
1091
1092 self.notifier = save_notifier
1093
1094 self._idcounter = state['next_id']
1095
1096 #-------------------------------------------------------------------------
1034 1097 # Client Requests
1035 1098 #-------------------------------------------------------------------------
1036 1099
@@ -1131,7 +1194,7 b' class Hub(SessionFactory):'
1131 1194 except:
1132 1195 reply = error.wrap_exception()
1133 1196 break
1134 uid = self.engines[eid].queue
1197 uid = self.engines[eid].uuid
1135 1198 try:
1136 1199 self.db.drop_matching_records(dict(engine_uuid=uid, completed={'$ne':None}))
1137 1200 except Exception:
@@ -1205,6 +1268,7 b' class Hub(SessionFactory):'
1205 1268 self.db.add_record(msg_id, init_record(msg))
1206 1269 except Exception:
1207 1270 self.log.error("db::DB Error updating record: %s", msg_id, exc_info=True)
1271 return finish(error.wrap_exception())
1208 1272
1209 1273 finish(dict(status='ok', resubmitted=resubmitted))
1210 1274
@@ -189,6 +189,7 b' class TaskScheduler(SessionFactory):'
189 189 engine_stream = Instance(zmqstream.ZMQStream) # engine-facing stream
190 190 notifier_stream = Instance(zmqstream.ZMQStream) # hub-facing sub stream
191 191 mon_stream = Instance(zmqstream.ZMQStream) # hub-facing pub stream
192 query_stream = Instance(zmqstream.ZMQStream) # hub-facing DEALER stream
192 193
193 194 # internals:
194 195 graph = Dict() # dict by msg_id of [ msg_ids that depend on key ]
@@ -216,6 +217,9 b' class TaskScheduler(SessionFactory):'
216 217 return self.session.bsession
217 218
218 219 def start(self):
220 self.query_stream.on_recv(self.dispatch_query_reply)
221 self.session.send(self.query_stream, "connection_request", {})
222
219 223 self.engine_stream.on_recv(self.dispatch_result, copy=False)
220 224 self.client_stream.on_recv(self.dispatch_submission, copy=False)
221 225
@@ -240,6 +244,24 b' class TaskScheduler(SessionFactory):'
240 244 #-----------------------------------------------------------------------
241 245 # [Un]Registration Handling
242 246 #-----------------------------------------------------------------------
247
248
249 def dispatch_query_reply(self, msg):
250 """handle reply to our initial connection request"""
251 try:
252 idents,msg = self.session.feed_identities(msg)
253 except ValueError:
254 self.log.warn("task::Invalid Message: %r",msg)
255 return
256 try:
257 msg = self.session.unserialize(msg)
258 except ValueError:
259 self.log.warn("task::Unauthorized message from: %r"%idents)
260 return
261
262 content = msg['content']
263 for uuid in content.get('engines', {}).values():
264 self._register_engine(cast_bytes(uuid))
243 265
244 266
245 267 @util.log_errors
@@ -263,7 +285,7 b' class TaskScheduler(SessionFactory):'
263 285 self.log.error("Unhandled message type: %r"%msg_type)
264 286 else:
265 287 try:
266 handler(cast_bytes(msg['content']['queue']))
288 handler(cast_bytes(msg['content']['uuid']))
267 289 except Exception:
268 290 self.log.error("task::Invalid notification msg: %r", msg, exc_info=True)
269 291
@@ -714,7 +736,7 b' class TaskScheduler(SessionFactory):'
714 736
715 737
716 738
717 def launch_scheduler(in_addr, out_addr, mon_addr, not_addr, config=None,
739 def launch_scheduler(in_addr, out_addr, mon_addr, not_addr, reg_addr, config=None,
718 740 logname='root', log_url=None, loglevel=logging.DEBUG,
719 741 identity=b'task', in_thread=False):
720 742
@@ -734,18 +756,21 b' def launch_scheduler(in_addr, out_addr, mon_addr, not_addr, config=None,'
734 756 ctx = zmq.Context()
735 757 loop = ioloop.IOLoop()
736 758 ins = ZMQStream(ctx.socket(zmq.ROUTER),loop)
737 ins.setsockopt(zmq.IDENTITY, identity)
759 ins.setsockopt(zmq.IDENTITY, identity + b'_in')
738 760 ins.bind(in_addr)
739 761
740 762 outs = ZMQStream(ctx.socket(zmq.ROUTER),loop)
741 outs.setsockopt(zmq.IDENTITY, identity)
763 outs.setsockopt(zmq.IDENTITY, identity + b'_out')
742 764 outs.bind(out_addr)
743 765 mons = zmqstream.ZMQStream(ctx.socket(zmq.PUB),loop)
744 766 mons.connect(mon_addr)
745 767 nots = zmqstream.ZMQStream(ctx.socket(zmq.SUB),loop)
746 768 nots.setsockopt(zmq.SUBSCRIBE, b'')
747 769 nots.connect(not_addr)
748
770
771 querys = ZMQStream(ctx.socket(zmq.DEALER),loop)
772 querys.connect(reg_addr)
773
749 774 # setup logging.
750 775 if in_thread:
751 776 log = Application.instance().log
@@ -757,6 +782,7 b' def launch_scheduler(in_addr, out_addr, mon_addr, not_addr, config=None,'
757 782
758 783 scheduler = TaskScheduler(client_stream=ins, engine_stream=outs,
759 784 mon_stream=mons, notifier_stream=nots,
785 query_stream=querys,
760 786 loop=loop, log=log,
761 787 config=config)
762 788 scheduler.start()
@@ -50,7 +50,7 b' class EngineFactory(RegistrationFactory):'
50 50 help="""The location (an IP address) of the controller. This is
51 51 used for disambiguating URLs, to determine whether
52 52 loopback should be used to connect or the public address.""")
53 timeout=CFloat(2,config=True,
53 timeout=CFloat(5, config=True,
54 54 help="""The time (in seconds) to wait for the Controller to respond
55 55 to registration requests before giving up.""")
56 56 sshserver=Unicode(config=True,
@@ -61,10 +61,11 b' class EngineFactory(RegistrationFactory):'
61 61 help="""Whether to use paramiko instead of openssh for tunnels.""")
62 62
63 63 # not configurable:
64 user_ns=Dict()
65 id=Integer(allow_none=True)
66 registrar=Instance('zmq.eventloop.zmqstream.ZMQStream')
67 kernel=Instance(Kernel)
64 connection_info = Dict()
65 user_ns = Dict()
66 id = Integer(allow_none=True)
67 registrar = Instance('zmq.eventloop.zmqstream.ZMQStream')
68 kernel = Instance(Kernel)
68 69
69 70 bident = CBytes()
70 71 ident = Unicode()
@@ -96,7 +97,7 b' class EngineFactory(RegistrationFactory):'
96 97 def connect(s, url):
97 98 url = disambiguate_url(url, self.location)
98 99 if self.using_ssh:
99 self.log.debug("Tunneling connection to %s via %s"%(url, self.sshserver))
100 self.log.debug("Tunneling connection to %s via %s", url, self.sshserver)
100 101 return tunnel.tunnel_connection(s, url, self.sshserver,
101 102 keyfile=self.sshkey, paramiko=self.paramiko,
102 103 password=password,
@@ -108,12 +109,12 b' class EngineFactory(RegistrationFactory):'
108 109 """like connect, but don't complete the connection (for use by heartbeat)"""
109 110 url = disambiguate_url(url, self.location)
110 111 if self.using_ssh:
111 self.log.debug("Tunneling connection to %s via %s"%(url, self.sshserver))
112 self.log.debug("Tunneling connection to %s via %s", url, self.sshserver)
112 113 url,tunnelobj = tunnel.open_tunnel(url, self.sshserver,
113 114 keyfile=self.sshkey, paramiko=self.paramiko,
114 115 password=password,
115 116 )
116 return url
117 return str(url)
117 118 return connect, maybe_tunnel
118 119
119 120 def register(self):
@@ -128,10 +129,10 b' class EngineFactory(RegistrationFactory):'
128 129 self.registrar = zmqstream.ZMQStream(reg, self.loop)
129 130
130 131
131 content = dict(queue=self.ident, heartbeat=self.ident, control=self.ident)
132 content = dict(uuid=self.ident)
132 133 self.registrar.on_recv(lambda msg: self.complete_registration(msg, connect, maybe_tunnel))
133 134 # print (self.session.key)
134 self.session.send(self.registrar, "registration_request",content=content)
135 self.session.send(self.registrar, "registration_request", content=content)
135 136
136 137 def complete_registration(self, msg, connect, maybe_tunnel):
137 138 # print msg
@@ -140,50 +141,43 b' class EngineFactory(RegistrationFactory):'
140 141 loop = self.loop
141 142 identity = self.bident
142 143 idents,msg = self.session.feed_identities(msg)
143 msg = Message(self.session.unserialize(msg))
144
145 if msg.content.status == 'ok':
146 self.id = int(msg.content.id)
144 msg = self.session.unserialize(msg)
145 content = msg['content']
146 info = self.connection_info
147
148 def url(key):
149 """get zmq url for given channel"""
150 return str(info["interface"] + ":%i" % info[key])
151
152 if content['status'] == 'ok':
153 self.id = int(content['id'])
147 154
148 155 # launch heartbeat
149 hb_addrs = msg.content.heartbeat
150
151 156 # possibly forward hb ports with tunnels
152 hb_addrs = [ maybe_tunnel(addr) for addr in hb_addrs ]
153 heart = Heart(*map(str, hb_addrs), heart_id=identity)
157 hb_ping = maybe_tunnel(url('hb_ping'))
158 hb_pong = maybe_tunnel(url('hb_pong'))
159
160 heart = Heart(hb_ping, hb_pong, heart_id=identity)
154 161 heart.start()
155 162
156 # create Shell Streams (MUX, Task, etc.):
157 queue_addr = msg.content.mux
158 shell_addrs = [ str(queue_addr) ]
159 task_addr = msg.content.task
160 if task_addr:
161 shell_addrs.append(str(task_addr))
162
163 # Uncomment this to go back to two-socket model
164 # shell_streams = []
165 # for addr in shell_addrs:
166 # stream = zmqstream.ZMQStream(ctx.socket(zmq.ROUTER), loop)
167 # stream.setsockopt(zmq.IDENTITY, identity)
168 # stream.connect(disambiguate_url(addr, self.location))
169 # shell_streams.append(stream)
170
171 # Now use only one shell stream for mux and tasks
163 # create Shell Connections (MUX, Task, etc.):
164 shell_addrs = url('mux'), url('task')
165
166 # Use only one shell stream for mux and tasks
172 167 stream = zmqstream.ZMQStream(ctx.socket(zmq.ROUTER), loop)
173 168 stream.setsockopt(zmq.IDENTITY, identity)
174 169 shell_streams = [stream]
175 170 for addr in shell_addrs:
176 171 connect(stream, addr)
177 # end single stream-socket
178 172
179 173 # control stream:
180 control_addr = str(msg.content.control)
174 control_addr = url('control')
181 175 control_stream = zmqstream.ZMQStream(ctx.socket(zmq.ROUTER), loop)
182 176 control_stream.setsockopt(zmq.IDENTITY, identity)
183 177 connect(control_stream, control_addr)
184 178
185 179 # create iopub stream:
186 iopub_addr = msg.content.iopub
180 iopub_addr = url('iopub')
187 181 iopub_socket = ctx.socket(zmq.PUB)
188 182 iopub_socket.setsockopt(zmq.IDENTITY, identity)
189 183 connect(iopub_socket, iopub_addr)
@@ -257,9 +257,11 b' class Session(Configurable):'
257 257 if new.lower() == 'json':
258 258 self.pack = json_packer
259 259 self.unpack = json_unpacker
260 self.unpacker = new
260 261 elif new.lower() == 'pickle':
261 262 self.pack = pickle_packer
262 263 self.unpack = pickle_unpacker
264 self.unpacker = new
263 265 else:
264 266 self.pack = import_item(str(new))
265 267
@@ -270,9 +272,11 b' class Session(Configurable):'
270 272 if new.lower() == 'json':
271 273 self.pack = json_packer
272 274 self.unpack = json_unpacker
275 self.packer = new
273 276 elif new.lower() == 'pickle':
274 277 self.pack = pickle_packer
275 278 self.unpack = pickle_unpacker
279 self.packer = new
276 280 else:
277 281 self.unpack = import_item(str(new))
278 282
@@ -43,9 +43,7 b' monitor the survival of the Engine process.'
43 43 Message type: ``registration_request``::
44 44
45 45 content = {
46 'queue' : 'abcd-1234-...', # the MUX queue zmq.IDENTITY
47 'control' : 'abcd-1234-...', # the control queue zmq.IDENTITY
48 'heartbeat' : 'abcd-1234-...' # the heartbeat zmq.IDENTITY
46 'uuid' : 'abcd-1234-...', # the zmq.IDENTITY of the engine's sockets
49 47 }
50 48
51 49 .. note::
@@ -63,10 +61,6 b' Message type: ``registration_reply``::'
63 61 'status' : 'ok', # or 'error'
64 62 # if ok:
65 63 'id' : 0, # int, the engine id
66 'queue' : 'tcp://127.0.0.1:12345', # connection for engine side of the queue
67 'control' : 'tcp://...', # addr for control queue
68 'heartbeat' : ('tcp://...','tcp://...'), # tuple containing two interfaces needed for heartbeat
69 'task' : 'tcp://...', # addr for task queue, or None if no task queue running
70 64 }
71 65
72 66 Clients use the same socket as engines to start their connections. Connection requests
@@ -84,11 +78,6 b' Message type: ``connection_reply``::'
84 78
85 79 content = {
86 80 'status' : 'ok', # or 'error'
87 # if ok:
88 'queue' : 'tcp://127.0.0.1:12345', # connection for client side of the MUX queue
89 'task' : ('lru','tcp...'), # routing scheme and addr for task queue (len 2 tuple)
90 'query' : 'tcp...', # addr for methods to query the hub, like queue_request, etc.
91 'control' : 'tcp...', # addr for control methods, like abort, etc.
92 81 }
93 82
94 83 Heartbeat
@@ -110,13 +99,14 b' Message type: ``registration_notification``::'
110 99
111 100 content = {
112 101 'id' : 0, # engine ID that has been registered
113 'queue' : 'engine_id' # the IDENT for the engine's queue
102 'uuid' : 'engine_id' # the IDENT for the engine's sockets
114 103 }
115 104
116 105 Message type : ``unregistration_notification``::
117 106
118 107 content = {
119 108 'id' : 0 # engine ID that has been unregistered
109 'uuid' : 'engine_id' # the IDENT for the engine's sockets
120 110 }
121 111
122 112
General Comments 0
You need to be logged in to leave comments. Login now