From bff463b522944975f7cc61c9222b006b85c87bdf 2012-07-21 04:43:07 From: Fernando Perez Date: 2012-07-21 04:43:07 Subject: [PATCH] Merge pull request #1471 from minrk/connection simplify IPython.parallel connections and enable Controller Resume Rolls back two-stage connection info, putting more information into the connection files. This makes it easier to use hand-crafted ssh tunnels, as all ports are read from the file, rather than from the reply to registration/connection requests. It is no longer possible to connect to the Controller without a connection file. Adding the serialization method to the connection file also makes it harder for custom serialization to result in a mismatch in configuration between the various objects. --- diff --git a/IPython/parallel/apps/ipcontrollerapp.py b/IPython/parallel/apps/ipcontrollerapp.py index 1c36420..28f4eaf 100755 --- a/IPython/parallel/apps/ipcontrollerapp.py +++ b/IPython/parallel/apps/ipcontrollerapp.py @@ -116,7 +116,10 @@ flags.update({ select one of the true db backends. """), 'reuse' : ({'IPControllerApp' : {'reuse_files' : True}}, - 'reuse existing json connection files') + 'reuse existing json connection files'), + 'restore' : ({'IPControllerApp' : {'restore_engines' : True, 'reuse_files' : True}}, + 'Attempt to restore engines from a JSON file. ' + 'For use when resuming a crashed controller'), }) flags.update(session_flags) @@ -156,6 +159,10 @@ class IPControllerApp(BaseParallelApplication): If False, connection files will be removed on a clean exit. """ ) + restore_engines = Bool(False, config=True, + help="""Reload engine state from JSON file + """ + ) ssh_server = Unicode(u'', config=True, help="""ssh url for clients to use when connecting to the Controller processes. It should be of the form: [user@]server[:port]. The @@ -209,21 +216,17 @@ class IPControllerApp(BaseParallelApplication): def save_connection_dict(self, fname, cdict): """save a connection dict to json file.""" c = self.config - url = cdict['url'] + url = cdict['registration'] location = cdict['location'] + if not location: try: - proto,ip,port = split_url(url) - except AssertionError: - pass - else: - try: - location = socket.gethostbyname_ex(socket.gethostname())[2][-1] - except (socket.gaierror, IndexError): - self.log.warn("Could not identify this machine's IP, assuming 127.0.0.1." - " You may need to specify '--location=' to help" - " IPython decide when to connect via loopback.") - location = '127.0.0.1' + location = socket.gethostbyname_ex(socket.gethostname())[2][-1] + except (socket.gaierror, IndexError): + self.log.warn("Could not identify this machine's IP, assuming 127.0.0.1." + " You may need to specify '--location=' to help" + " IPython decide when to connect via loopback.") + location = '127.0.0.1' cdict['location'] = location fname = os.path.join(self.profile_dir.security_dir, fname) self.log.info("writing connection info to %s", fname) @@ -235,35 +238,51 @@ class IPControllerApp(BaseParallelApplication): """load config from existing json connector files.""" c = self.config self.log.debug("loading config from JSON") - # load from engine config + + # load engine config + fname = os.path.join(self.profile_dir.security_dir, self.engine_json_file) self.log.info("loading connection info from %s", fname) with open(fname) as f: - cfg = json.loads(f.read()) - key = cfg['exec_key'] + ecfg = json.loads(f.read()) + # json gives unicode, Session.key wants bytes - c.Session.key = key.encode('ascii') - xport,addr = cfg['url'].split('://') - c.HubFactory.engine_transport = xport - ip,ports = addr.split(':') + c.Session.key = ecfg['exec_key'].encode('ascii') + + xport,ip = ecfg['interface'].split('://') + c.HubFactory.engine_ip = ip - c.HubFactory.regport = int(ports) - self.location = cfg['location'] + c.HubFactory.engine_transport = xport + + self.location = ecfg['location'] if not self.engine_ssh_server: - self.engine_ssh_server = cfg['ssh'] + self.engine_ssh_server = ecfg['ssh'] + # load client config + fname = os.path.join(self.profile_dir.security_dir, self.client_json_file) self.log.info("loading connection info from %s", fname) with open(fname) as f: - cfg = json.loads(f.read()) - assert key == cfg['exec_key'], "exec_key mismatch between engine and client keys" - xport,addr = cfg['url'].split('://') + ccfg = json.loads(f.read()) + + for key in ('exec_key', 'registration', 'pack', 'unpack'): + assert ccfg[key] == ecfg[key], "mismatch between engine and client info: %r" % key + + xport,addr = ccfg['interface'].split('://') + c.HubFactory.client_transport = xport - ip,ports = addr.split(':') c.HubFactory.client_ip = ip if not self.ssh_server: - self.ssh_server = cfg['ssh'] - assert int(ports) == c.HubFactory.regport, "regport mismatch" + self.ssh_server = ccfg['ssh'] + + # load port config: + c.HubFactory.regport = ecfg['registration'] + c.HubFactory.hb = (ecfg['hb_ping'], ecfg['hb_pong']) + c.HubFactory.control = (ccfg['control'], ecfg['control']) + c.HubFactory.mux = (ccfg['mux'], ecfg['mux']) + c.HubFactory.task = (ccfg['task'], ecfg['task']) + c.HubFactory.iopub = (ccfg['iopub'], ecfg['iopub']) + c.HubFactory.notifier_port = ccfg['notification'] def cleanup_connection_files(self): if self.reuse_files: @@ -314,29 +333,42 @@ class IPControllerApp(BaseParallelApplication): if self.write_connection_files: # save to new json config files f = self.factory - cdict = {'exec_key' : f.session.key.decode('ascii'), - 'ssh' : self.ssh_server, - 'url' : "%s://%s:%s"%(f.client_transport, f.client_ip, f.regport), - 'location' : self.location - } + base = { + 'exec_key' : f.session.key.decode('ascii'), + 'location' : self.location, + 'pack' : f.session.packer, + 'unpack' : f.session.unpacker, + } + + cdict = {'ssh' : self.ssh_server} + cdict.update(f.client_info) + cdict.update(base) self.save_connection_dict(self.client_json_file, cdict) - edict = cdict - edict['url']="%s://%s:%s"%((f.client_transport, f.client_ip, f.regport)) - edict['ssh'] = self.engine_ssh_server + + edict = {'ssh' : self.engine_ssh_server} + edict.update(f.engine_info) + edict.update(base) self.save_connection_dict(self.engine_json_file, edict) + fname = "engines%s.json" % self.cluster_id + self.factory.hub.engine_state_file = os.path.join(self.profile_dir.log_dir, fname) + if self.restore_engines: + self.factory.hub._load_engine_state() + def init_schedulers(self): children = self.children mq = import_item(str(self.mq_class)) - hub = self.factory + f = self.factory + ident = f.session.bsession # disambiguate url, in case of * - monitor_url = disambiguate_url(hub.monitor_url) + monitor_url = disambiguate_url(f.monitor_url) # maybe_inproc = 'inproc://monitor' if self.use_threads else monitor_url # IOPub relay (in a Process) q = mq(zmq.PUB, zmq.SUB, zmq.PUB, b'N/A',b'iopub') - q.bind_in(hub.client_info['iopub']) - q.bind_out(hub.engine_info['iopub']) + q.bind_in(f.client_url('iopub')) + q.setsockopt_in(zmq.IDENTITY, ident + b"_iopub") + q.bind_out(f.engine_url('iopub')) q.setsockopt_out(zmq.SUBSCRIBE, b'') q.connect_mon(monitor_url) q.daemon=True @@ -344,18 +376,20 @@ class IPControllerApp(BaseParallelApplication): # Multiplexer Queue (in a Process) q = mq(zmq.ROUTER, zmq.ROUTER, zmq.PUB, b'in', b'out') - q.bind_in(hub.client_info['mux']) - q.setsockopt_in(zmq.IDENTITY, b'mux') - q.bind_out(hub.engine_info['mux']) + q.bind_in(f.client_url('mux')) + q.setsockopt_in(zmq.IDENTITY, b'mux_in') + q.bind_out(f.engine_url('mux')) + q.setsockopt_out(zmq.IDENTITY, b'mux_out') q.connect_mon(monitor_url) q.daemon=True children.append(q) # Control Queue (in a Process) q = mq(zmq.ROUTER, zmq.ROUTER, zmq.PUB, b'incontrol', b'outcontrol') - q.bind_in(hub.client_info['control']) - q.setsockopt_in(zmq.IDENTITY, b'control') - q.bind_out(hub.engine_info['control']) + q.bind_in(f.client_url('control')) + q.setsockopt_in(zmq.IDENTITY, b'control_in') + q.bind_out(f.engine_url('control')) + q.setsockopt_out(zmq.IDENTITY, b'control_out') q.connect_mon(monitor_url) q.daemon=True children.append(q) @@ -368,9 +402,10 @@ class IPControllerApp(BaseParallelApplication): self.log.warn("task::using pure DEALER Task scheduler") q = mq(zmq.ROUTER, zmq.DEALER, zmq.PUB, b'intask', b'outtask') # q.setsockopt_out(zmq.HWM, hub.hwm) - q.bind_in(hub.client_info['task'][1]) - q.setsockopt_in(zmq.IDENTITY, b'task') - q.bind_out(hub.engine_info['task']) + q.bind_in(f.client_url('task')) + q.setsockopt_in(zmq.IDENTITY, b'task_in') + q.bind_out(f.engine_url('task')) + q.setsockopt_out(zmq.IDENTITY, b'task_out') q.connect_mon(monitor_url) q.daemon=True children.append(q) @@ -379,8 +414,10 @@ class IPControllerApp(BaseParallelApplication): else: self.log.info("task::using Python %s Task scheduler"%scheme) - sargs = (hub.client_info['task'][1], hub.engine_info['task'], - monitor_url, disambiguate_url(hub.client_info['notification'])) + sargs = (f.client_url('task'), f.engine_url('task'), + monitor_url, disambiguate_url(f.client_url('notification')), + disambiguate_url(f.client_url('registration')), + ) kwargs = dict(logname='scheduler', loglevel=self.log_level, log_url = self.log_url, config=dict(self.config)) if 'Process' in self.mq_class: diff --git a/IPython/parallel/apps/ipengineapp.py b/IPython/parallel/apps/ipengineapp.py index d24f574..8e4bf22 100755 --- a/IPython/parallel/apps/ipengineapp.py +++ b/IPython/parallel/apps/ipengineapp.py @@ -45,7 +45,7 @@ from IPython.zmq.session import ( from IPython.config.configurable import Configurable from IPython.parallel.engine.engine import EngineFactory -from IPython.parallel.util import disambiguate_url +from IPython.parallel.util import disambiguate_ip_address from IPython.utils.importstring import import_item from IPython.utils.py3compat import cast_bytes @@ -211,24 +211,36 @@ class IPEngineApp(BaseParallelApplication): with open(self.url_file) as f: d = json.loads(f.read()) - if 'exec_key' in d: - config.Session.key = cast_bytes(d['exec_key']) - + # allow hand-override of location for disambiguation + # and ssh-server try: config.EngineFactory.location except AttributeError: config.EngineFactory.location = d['location'] - d['url'] = disambiguate_url(d['url'], config.EngineFactory.location) - try: - config.EngineFactory.url - except AttributeError: - config.EngineFactory.url = d['url'] - try: config.EngineFactory.sshserver except AttributeError: - config.EngineFactory.sshserver = d['ssh'] + config.EngineFactory.sshserver = d.get('ssh') + + location = config.EngineFactory.location + + proto, ip = d['interface'].split('://') + ip = disambiguate_ip_address(ip) + d['interface'] = '%s://%s' % (proto, ip) + + # DO NOT allow override of basic URLs, serialization, or exec_key + # JSON file takes top priority there + config.Session.key = cast_bytes(d['exec_key']) + + config.EngineFactory.url = d['interface'] + ':%i' % d['registration'] + + config.Session.packer = d['pack'] + config.Session.unpacker = d['unpack'] + + self.log.debug("Config changed:") + self.log.debug("%r", config) + self.connection_info = d def bind_kernel(self, **kwargs): """Promote engine to listening kernel, accessible to frontends.""" @@ -320,7 +332,9 @@ class IPEngineApp(BaseParallelApplication): # shell_class = import_item(self.master_config.Global.shell_class) # print self.config try: - self.engine = EngineFactory(config=config, log=self.log) + self.engine = EngineFactory(config=config, log=self.log, + connection_info=self.connection_info, + ) except: self.log.error("Couldn't start the Engine", exc_info=True) self.exit(1) diff --git a/IPython/parallel/client/client.py b/IPython/parallel/client/client.py index 09903e0..a61036e 100644 --- a/IPython/parallel/client/client.py +++ b/IPython/parallel/client/client.py @@ -217,7 +217,9 @@ class Client(HasTraits): Parameters ---------- - url_or_file : bytes or unicode; zmq url or path to ipcontroller-client.json + url_file : str/unicode; path to ipcontroller-client.json + This JSON file should contain all the information needed to connect to a cluster, + and is likely the only argument needed. Connection information for the Hub's registration. If a json connector file is given, then likely no further configuration is necessary. [Default: use profile] @@ -239,14 +241,6 @@ class Client(HasTraits): If specified, this will be relayed to the Session for configuration username : str set username for the session object - packer : str (import_string) or callable - Can be either the simple keyword 'json' or 'pickle', or an import_string to a - function to serialize messages. Must support same input as - JSON, and output must be bytes. - You can pass a callable directly as `pack` - unpacker : str (import_string) or callable - The inverse of packer. Only necessary if packer is specified as *not* one - of 'json' or 'pickle'. #-------------- ssh related args ---------------- # These are args for configuring the ssh tunnel to be used @@ -271,17 +265,6 @@ class Client(HasTraits): flag for whether to use paramiko instead of shell ssh for tunneling. [default: True on win32, False else] - ------- exec authentication args ------- - If even localhost is untrusted, you can have some protection against - unauthorized execution by signing messages with HMAC digests. - Messages are still sent as cleartext, so if someone can snoop your - loopback traffic this will not protect your privacy, but will prevent - unauthorized execution. - - exec_key : str - an authentication key or file containing a key - default: None - Attributes ---------- @@ -378,8 +361,8 @@ class Client(HasTraits): # don't raise on positional args return HasTraits.__new__(self, **kw) - def __init__(self, url_or_file=None, profile=None, profile_dir=None, ipython_dir=None, - context=None, debug=False, exec_key=None, + def __init__(self, url_file=None, profile=None, profile_dir=None, ipython_dir=None, + context=None, debug=False, sshserver=None, sshkey=None, password=None, paramiko=None, timeout=10, **extra_args ): @@ -391,39 +374,46 @@ class Client(HasTraits): context = zmq.Context.instance() self._context = context self._stop_spinning = Event() + + if 'url_or_file' in extra_args: + url_file = extra_args['url_or_file'] + warnings.warn("url_or_file arg no longer supported, use url_file", DeprecationWarning) + + if url_file and util.is_url(url_file): + raise ValueError("single urls cannot be specified, url-files must be used.") self._setup_profile_dir(self.profile, profile_dir, ipython_dir) + if self._cd is not None: - if url_or_file is None: - url_or_file = pjoin(self._cd.security_dir, 'ipcontroller-client.json') - if url_or_file is None: + if url_file is None: + url_file = pjoin(self._cd.security_dir, 'ipcontroller-client.json') + if url_file is None: raise ValueError( "I can't find enough information to connect to a hub!" - " Please specify at least one of url_or_file or profile." + " Please specify at least one of url_file or profile." ) - - if not util.is_url(url_or_file): - # it's not a url, try for a file - if not os.path.exists(url_or_file): - if self._cd: - url_or_file = os.path.join(self._cd.security_dir, url_or_file) - if not os.path.exists(url_or_file): - raise IOError("Connection file not found: %r" % url_or_file) - with open(url_or_file) as f: - cfg = json.loads(f.read()) - else: - cfg = {'url':url_or_file} + + with open(url_file) as f: + cfg = json.load(f) + + self._task_scheme = cfg['task_scheme'] # sync defaults from args, json: if sshserver: cfg['ssh'] = sshserver - if exec_key: - cfg['exec_key'] = exec_key - exec_key = cfg['exec_key'] + location = cfg.setdefault('location', None) - cfg['url'] = util.disambiguate_url(cfg['url'], location) - url = cfg['url'] - proto,addr,port = util.split_url(url) + + proto,addr = cfg['interface'].split('://') + addr = util.disambiguate_ip_address(addr) + cfg['interface'] = "%s://%s" % (proto, addr) + + # turn interface,port into full urls: + for key in ('control', 'task', 'mux', 'iopub', 'notification', 'registration'): + cfg[key] = cfg['interface'] + ':%i' % cfg[key] + + url = cfg['registration'] + if location is not None and addr == '127.0.0.1': # location specified, and connection is expected to be local if location not in LOCAL_IPS and not sshserver: @@ -448,7 +438,7 @@ class Client(HasTraits): self._ssh = bool(sshserver or sshkey or password) if self._ssh and sshserver is None: # default to ssh via localhost - sshserver = url.split('://')[1].split(':')[0] + sshserver = addr if self._ssh and password is None: if tunnel.try_passwordless_ssh(sshserver, sshkey, paramiko): password=False @@ -457,20 +447,18 @@ class Client(HasTraits): ssh_kwargs = dict(keyfile=sshkey, password=password, paramiko=paramiko) # configure and construct the session - if exec_key is not None: - if os.path.isfile(exec_key): - extra_args['keyfile'] = exec_key - else: - exec_key = cast_bytes(exec_key) - extra_args['key'] = exec_key + extra_args['packer'] = cfg['pack'] + extra_args['unpacker'] = cfg['unpack'] + extra_args['key'] = cast_bytes(cfg['exec_key']) + self.session = Session(**extra_args) self._query_socket = self._context.socket(zmq.DEALER) - self._query_socket.setsockopt(zmq.IDENTITY, self.session.bsession) + if self._ssh: - tunnel.tunnel_connection(self._query_socket, url, sshserver, **ssh_kwargs) + tunnel.tunnel_connection(self._query_socket, cfg['registration'], sshserver, **ssh_kwargs) else: - self._query_socket.connect(url) + self._query_socket.connect(cfg['registration']) self.session.debug = self.debug @@ -520,8 +508,9 @@ class Client(HasTraits): """Update our engines dict and _ids from a dict of the form: {id:uuid}.""" for k,v in engines.iteritems(): eid = int(k) + if eid not in self._engines: + self._ids.append(eid) self._engines[eid] = v - self._ids.append(eid) self._ids = sorted(self._ids) if sorted(self._engines.keys()) != range(len(self._engines)) and \ self._task_scheme == 'pure' and self._task_socket: @@ -583,7 +572,7 @@ class Client(HasTraits): self._connected=True def connect_socket(s, url): - url = util.disambiguate_url(url, self._config['location']) + # url = util.disambiguate_url(url, self._config['location']) if self._ssh: return tunnel.tunnel_connection(s, url, sshserver, **ssh_kwargs) else: @@ -600,38 +589,28 @@ class Client(HasTraits): idents,msg = self.session.recv(self._query_socket,mode=0) if self.debug: pprint(msg) - msg = Message(msg) - content = msg.content - self._config['registration'] = dict(content) - if content.status == 'ok': - ident = self.session.bsession - if content.mux: - self._mux_socket = self._context.socket(zmq.DEALER) - self._mux_socket.setsockopt(zmq.IDENTITY, ident) - connect_socket(self._mux_socket, content.mux) - if content.task: - self._task_scheme, task_addr = content.task - self._task_socket = self._context.socket(zmq.DEALER) - self._task_socket.setsockopt(zmq.IDENTITY, ident) - connect_socket(self._task_socket, task_addr) - if content.notification: - self._notification_socket = self._context.socket(zmq.SUB) - connect_socket(self._notification_socket, content.notification) - self._notification_socket.setsockopt(zmq.SUBSCRIBE, b'') - # if content.query: - # self._query_socket = self._context.socket(zmq.DEALER) - # self._query_socket.setsockopt(zmq.IDENTITY, self.session.bsession) - # connect_socket(self._query_socket, content.query) - if content.control: - self._control_socket = self._context.socket(zmq.DEALER) - self._control_socket.setsockopt(zmq.IDENTITY, ident) - connect_socket(self._control_socket, content.control) - if content.iopub: - self._iopub_socket = self._context.socket(zmq.SUB) - self._iopub_socket.setsockopt(zmq.SUBSCRIBE, b'') - self._iopub_socket.setsockopt(zmq.IDENTITY, ident) - connect_socket(self._iopub_socket, content.iopub) - self._update_engines(dict(content.engines)) + content = msg['content'] + # self._config['registration'] = dict(content) + cfg = self._config + if content['status'] == 'ok': + self._mux_socket = self._context.socket(zmq.DEALER) + connect_socket(self._mux_socket, cfg['mux']) + + self._task_socket = self._context.socket(zmq.DEALER) + connect_socket(self._task_socket, cfg['task']) + + self._notification_socket = self._context.socket(zmq.SUB) + self._notification_socket.setsockopt(zmq.SUBSCRIBE, b'') + connect_socket(self._notification_socket, cfg['notification']) + + self._control_socket = self._context.socket(zmq.DEALER) + connect_socket(self._control_socket, cfg['control']) + + self._iopub_socket = self._context.socket(zmq.SUB) + self._iopub_socket.setsockopt(zmq.SUBSCRIBE, b'') + connect_socket(self._iopub_socket, cfg['iopub']) + + self._update_engines(dict(content['engines'])) else: self._connected = False raise Exception("Failed to connect!") @@ -674,7 +653,7 @@ class Client(HasTraits): """Register a new engine, and update our connection info.""" content = msg['content'] eid = content['id'] - d = {eid : content['queue']} + d = {eid : content['uuid']} self._update_engines(d) def _unregister_engine(self, msg): diff --git a/IPython/parallel/controller/hub.py b/IPython/parallel/controller/hub.py index 9a5c6fb..49cdd67 100644 --- a/IPython/parallel/controller/hub.py +++ b/IPython/parallel/controller/hub.py @@ -18,6 +18,8 @@ Authors: #----------------------------------------------------------------------------- from __future__ import print_function +import json +import os import sys import time from datetime import datetime @@ -107,17 +109,16 @@ class EngineConnector(HasTraits): """A simple object for accessing the various zmq connections of an object. Attributes are: id (int): engine ID - uuid (str): uuid (unused?) - queue (str): identity of queue's DEALER socket - registration (str): identity of registration DEALER socket - heartbeat (str): identity of heartbeat DEALER socket + uuid (unicode): engine UUID + pending: set of msg_ids + stallback: DelayedCallback for stalled registration """ - id=Integer(0) - queue=CBytes() - control=CBytes() - registration=CBytes() - heartbeat=CBytes() - pending=Set() + + id = Integer(0) + uuid = Unicode() + pending = Set() + stallback = Instance(ioloop.DelayedCallback) + _db_shortcuts = { 'sqlitedb' : 'IPython.parallel.controller.sqlitedb.SQLiteDB', @@ -131,29 +132,29 @@ class HubFactory(RegistrationFactory): # port-pairs for monitoredqueues: hb = Tuple(Integer,Integer,config=True, - help="""DEALER/SUB Port pair for Engine heartbeats""") + help="""PUB/ROUTER Port pair for Engine heartbeats""") def _hb_default(self): return tuple(util.select_random_ports(2)) mux = Tuple(Integer,Integer,config=True, - help="""Engine/Client Port pair for MUX queue""") + help="""Client/Engine Port pair for MUX queue""") def _mux_default(self): return tuple(util.select_random_ports(2)) task = Tuple(Integer,Integer,config=True, - help="""Engine/Client Port pair for Task queue""") + help="""Client/Engine Port pair for Task queue""") def _task_default(self): return tuple(util.select_random_ports(2)) control = Tuple(Integer,Integer,config=True, - help="""Engine/Client Port pair for Control queue""") + help="""Client/Engine Port pair for Control queue""") def _control_default(self): return tuple(util.select_random_ports(2)) iopub = Tuple(Integer,Integer,config=True, - help="""Engine/Client Port pair for IOPub relay""") + help="""Client/Engine Port pair for IOPub relay""") def _iopub_default(self): return tuple(util.select_random_ports(2)) @@ -231,38 +232,77 @@ class HubFactory(RegistrationFactory): self.heartmonitor.start() self.log.info("Heartmonitor started") + def client_url(self, channel): + """return full zmq url for a named client channel""" + return "%s://%s:%i" % (self.client_transport, self.client_ip, self.client_info[channel]) + + def engine_url(self, channel): + """return full zmq url for a named engine channel""" + return "%s://%s:%i" % (self.engine_transport, self.engine_ip, self.engine_info[channel]) + def init_hub(self): - """construct""" - client_iface = "%s://%s:" % (self.client_transport, self.client_ip) + "%i" - engine_iface = "%s://%s:" % (self.engine_transport, self.engine_ip) + "%i" + """construct Hub object""" ctx = self.context loop = self.loop + try: + scheme = self.config.TaskScheduler.scheme_name + except AttributeError: + from .scheduler import TaskScheduler + scheme = TaskScheduler.scheme_name.get_default_value() + + # build connection dicts + engine = self.engine_info = { + 'interface' : "%s://%s" % (self.engine_transport, self.engine_ip), + 'registration' : self.regport, + 'control' : self.control[1], + 'mux' : self.mux[1], + 'hb_ping' : self.hb[0], + 'hb_pong' : self.hb[1], + 'task' : self.task[1], + 'iopub' : self.iopub[1], + } + + client = self.client_info = { + 'interface' : "%s://%s" % (self.client_transport, self.client_ip), + 'registration' : self.regport, + 'control' : self.control[0], + 'mux' : self.mux[0], + 'task' : self.task[0], + 'task_scheme' : scheme, + 'iopub' : self.iopub[0], + 'notification' : self.notifier_port, + } + + self.log.debug("Hub engine addrs: %s", self.engine_info) + self.log.debug("Hub client addrs: %s", self.client_info) + # Registrar socket q = ZMQStream(ctx.socket(zmq.ROUTER), loop) - q.bind(client_iface % self.regport) - self.log.info("Hub listening on %s for registration.", client_iface % self.regport) + q.bind(self.client_url('registration')) + self.log.info("Hub listening on %s for registration.", self.client_url('registration')) if self.client_ip != self.engine_ip: - q.bind(engine_iface % self.regport) - self.log.info("Hub listening on %s for registration.", engine_iface % self.regport) + q.bind(self.engine_url('registration')) + self.log.info("Hub listening on %s for registration.", self.engine_url('registration')) ### Engine connections ### # heartbeat hpub = ctx.socket(zmq.PUB) - hpub.bind(engine_iface % self.hb[0]) + hpub.bind(self.engine_url('hb_ping')) hrep = ctx.socket(zmq.ROUTER) - hrep.bind(engine_iface % self.hb[1]) + hrep.bind(self.engine_url('hb_pong')) self.heartmonitor = HeartMonitor(loop=loop, config=self.config, log=self.log, pingstream=ZMQStream(hpub,loop), pongstream=ZMQStream(hrep,loop) ) ### Client connections ### + # Notifier socket n = ZMQStream(ctx.socket(zmq.PUB), loop) - n.bind(client_iface%self.notifier_port) + n.bind(self.client_url('notification')) ### build and launch the queues ### @@ -279,35 +319,10 @@ class HubFactory(RegistrationFactory): self.db = import_item(str(db_class))(session=self.session.session, config=self.config, log=self.log) time.sleep(.25) - try: - scheme = self.config.TaskScheduler.scheme_name - except AttributeError: - from .scheduler import TaskScheduler - scheme = TaskScheduler.scheme_name.get_default_value() - # build connection dicts - self.engine_info = { - 'control' : engine_iface%self.control[1], - 'mux': engine_iface%self.mux[1], - 'heartbeat': (engine_iface%self.hb[0], engine_iface%self.hb[1]), - 'task' : engine_iface%self.task[1], - 'iopub' : engine_iface%self.iopub[1], - # 'monitor' : engine_iface%self.mon_port, - } - - self.client_info = { - 'control' : client_iface%self.control[0], - 'mux': client_iface%self.mux[0], - 'task' : (scheme, client_iface%self.task[0]), - 'iopub' : client_iface%self.iopub[0], - 'notification': client_iface%self.notifier_port - } - self.log.debug("Hub engine addrs: %s", self.engine_info) - self.log.debug("Hub client addrs: %s", self.client_info) # resubmit stream r = ZMQStream(ctx.socket(zmq.DEALER), loop) - url = util.disambiguate_url(self.client_info['task'][-1]) - r.setsockopt(zmq.IDENTITY, self.session.bsession) + url = util.disambiguate_url(self.client_url('task')) r.connect(url) self.hub = Hub(loop=loop, session=self.session, monitor=sub, heartmonitor=self.heartmonitor, @@ -335,6 +350,9 @@ class Hub(SessionFactory): client_info: dict of zmq connection information for engines to connect to the queues. """ + + engine_state_file = Unicode() + # internal data structures: ids=Set() # engine IDs keytable=Dict() @@ -382,15 +400,6 @@ class Hub(SessionFactory): super(Hub, self).__init__(**kwargs) self.registration_timeout = max(5000, 2*self.heartmonitor.period) - # validate connection dicts: - for k,v in self.client_info.iteritems(): - if k == 'task': - util.validate_url_container(v[1]) - else: - util.validate_url_container(v) - # util.validate_url_container(self.client_info) - util.validate_url_container(self.engine_info) - # register our callbacks self.query.on_recv(self.dispatch_query) self.monitor.on_recv(self.dispatch_monitor_traffic) @@ -425,7 +434,7 @@ class Hub(SessionFactory): self.resubmit.on_recv(lambda msg: None, copy=False) self.log.info("hub::created hub") - + @property def _next_id(self): """gemerate a new ID. @@ -440,7 +449,7 @@ class Hub(SessionFactory): # while newid in self.ids or newid in incoming: # newid += 1 # return newid - + #----------------------------------------------------------------------------- # message validation #----------------------------------------------------------------------------- @@ -556,11 +565,11 @@ class Hub(SessionFactory): triggers unregistration""" self.log.debug("heartbeat::handle_heart_failure(%r)", heart) eid = self.hearts.get(heart, None) - queue = self.engines[eid].queue + uuid = self.engines[eid].uuid if eid is None or self.keytable[eid] in self.dead_engines: self.log.info("heartbeat::ignoring heart failure %r (not an engine or already dead)", heart) else: - self.unregister_engine(heart, dict(content=dict(id=eid, queue=queue))) + self.unregister_engine(heart, dict(content=dict(id=eid, queue=uuid))) #----------------------- MUX Queue Traffic ------------------------------ @@ -585,7 +594,7 @@ class Hub(SessionFactory): self.log.info("queue::client %r submitted request %r to %s", client_id, msg_id, eid) # Unicode in records record['engine_uuid'] = queue_id.decode('ascii') - record['client_uuid'] = client_id.decode('ascii') + record['client_uuid'] = msg['header']['session'] record['queue'] = 'mux' try: @@ -677,7 +686,7 @@ class Hub(SessionFactory): return record = init_record(msg) - record['client_uuid'] = client_id.decode('ascii') + record['client_uuid'] = msg['header']['session'] record['queue'] = 'task' header = msg['header'] msg_id = header['msg_id'] @@ -865,11 +874,10 @@ class Hub(SessionFactory): """Reply with connection addresses for clients.""" self.log.info("client::client %r connected", client_id) content = dict(status='ok') - content.update(self.client_info) jsonable = {} for k,v in self.keytable.iteritems(): if v not in self.dead_engines: - jsonable[str(k)] = v.decode('ascii') + jsonable[str(k)] = v content['engines'] = jsonable self.session.send(self.query, 'connection_reply', content, parent=msg, ident=client_id) @@ -877,48 +885,37 @@ class Hub(SessionFactory): """Register a new engine.""" content = msg['content'] try: - queue = cast_bytes(content['queue']) + uuid = content['uuid'] except KeyError: self.log.error("registration::queue not specified", exc_info=True) return - heart = content.get('heartbeat', None) - if heart: - heart = cast_bytes(heart) - """register a new engine, and create the socket(s) necessary""" + eid = self._next_id - # print (eid, queue, reg, heart) - self.log.debug("registration::register_engine(%i, %r, %r, %r)", eid, queue, reg, heart) + self.log.debug("registration::register_engine(%i, %r)", eid, uuid) content = dict(id=eid,status='ok') - content.update(self.engine_info) # check if requesting available IDs: - if queue in self.by_ident: - try: - raise KeyError("queue_id %r in use" % queue) - except: - content = error.wrap_exception() - self.log.error("queue_id %r in use", queue, exc_info=True) - elif heart in self.hearts: # need to check unique hearts? + if cast_bytes(uuid) in self.by_ident: try: - raise KeyError("heart_id %r in use" % heart) + raise KeyError("uuid %r in use" % uuid) except: - self.log.error("heart_id %r in use", heart, exc_info=True) content = error.wrap_exception() + self.log.error("uuid %r in use", uuid, exc_info=True) else: - for h, pack in self.incoming_registrations.iteritems(): - if heart == h: + for h, ec in self.incoming_registrations.iteritems(): + if uuid == h: try: - raise KeyError("heart_id %r in use" % heart) + raise KeyError("heart_id %r in use" % uuid) except: - self.log.error("heart_id %r in use", heart, exc_info=True) + self.log.error("heart_id %r in use", uuid, exc_info=True) content = error.wrap_exception() break - elif queue == pack[1]: + elif uuid == ec.uuid: try: - raise KeyError("queue_id %r in use" % queue) + raise KeyError("uuid %r in use" % uuid) except: - self.log.error("queue_id %r in use", queue, exc_info=True) + self.log.error("uuid %r in use", uuid, exc_info=True) content = error.wrap_exception() break @@ -926,18 +923,21 @@ class Hub(SessionFactory): content=content, ident=reg) + heart = cast_bytes(uuid) + if content['status'] == 'ok': if heart in self.heartmonitor.hearts: # already beating - self.incoming_registrations[heart] = (eid,queue,reg[0],None) + self.incoming_registrations[heart] = EngineConnector(id=eid,uuid=uuid) self.finish_registration(heart) else: purge = lambda : self._purge_stalled_registration(heart) dc = ioloop.DelayedCallback(purge, self.registration_timeout, self.loop) dc.start() - self.incoming_registrations[heart] = (eid,queue,reg[0],dc) + self.incoming_registrations[heart] = EngineConnector(id=eid,uuid=uuid,stallback=dc) else: self.log.error("registration::registration %i failed: %r", eid, content['evalue']) + return eid def unregister_engine(self, ident, msg): @@ -950,7 +950,7 @@ class Hub(SessionFactory): self.log.info("registration::unregister_engine(%r)", eid) # print (eid) uuid = self.keytable[eid] - content=dict(id=eid, queue=uuid.decode('ascii')) + content=dict(id=eid, uuid=uuid) self.dead_engines.add(uuid) # self.ids.remove(eid) # uuid = self.keytable.pop(eid) @@ -963,6 +963,8 @@ class Hub(SessionFactory): dc = ioloop.DelayedCallback(handleit, self.registration_timeout, self.loop) dc.start() ############## TODO: HANDLE IT ################ + + self._save_engine_state() if self.notifier: self.session.send(self.notifier, "unregistration_notification", content=content) @@ -1001,36 +1003,97 @@ class Hub(SessionFactory): """Second half of engine registration, called after our HeartMonitor has received a beat from the Engine's Heart.""" try: - (eid,queue,reg,purge) = self.incoming_registrations.pop(heart) + ec = self.incoming_registrations.pop(heart) except KeyError: self.log.error("registration::tried to finish nonexistant registration", exc_info=True) return - self.log.info("registration::finished registering engine %i:%r", eid, queue) - if purge is not None: - purge.stop() - control = queue + self.log.info("registration::finished registering engine %i:%s", ec.id, ec.uuid) + if ec.stallback is not None: + ec.stallback.stop() + eid = ec.id self.ids.add(eid) - self.keytable[eid] = queue - self.engines[eid] = EngineConnector(id=eid, queue=queue, registration=reg, - control=control, heartbeat=heart) - self.by_ident[queue] = eid + self.keytable[eid] = ec.uuid + self.engines[eid] = ec + self.by_ident[cast_bytes(ec.uuid)] = ec.id self.queues[eid] = list() self.tasks[eid] = list() self.completed[eid] = list() self.hearts[heart] = eid - content = dict(id=eid, queue=self.engines[eid].queue.decode('ascii')) + content = dict(id=eid, uuid=self.engines[eid].uuid) if self.notifier: self.session.send(self.notifier, "registration_notification", content=content) self.log.info("engine::Engine Connected: %i", eid) + + self._save_engine_state() def _purge_stalled_registration(self, heart): if heart in self.incoming_registrations: - eid = self.incoming_registrations.pop(heart)[0] - self.log.info("registration::purging stalled registration: %i", eid) + ec = self.incoming_registrations.pop(heart) + self.log.info("registration::purging stalled registration: %i", ec.id) else: pass #------------------------------------------------------------------------- + # Engine State + #------------------------------------------------------------------------- + + + def _cleanup_engine_state_file(self): + """cleanup engine state mapping""" + + if os.path.exists(self.engine_state_file): + self.log.debug("cleaning up engine state: %s", self.engine_state_file) + try: + os.remove(self.engine_state_file) + except IOError: + self.log.error("Couldn't cleanup file: %s", self.engine_state_file, exc_info=True) + + + def _save_engine_state(self): + """save engine mapping to JSON file""" + if not self.engine_state_file: + return + self.log.debug("save engine state to %s" % self.engine_state_file) + state = {} + engines = {} + for eid, ec in self.engines.iteritems(): + if ec.uuid not in self.dead_engines: + engines[eid] = ec.uuid + + state['engines'] = engines + + state['next_id'] = self._idcounter + + with open(self.engine_state_file, 'w') as f: + json.dump(state, f) + + + def _load_engine_state(self): + """load engine mapping from JSON file""" + if not os.path.exists(self.engine_state_file): + return + + self.log.info("loading engine state from %s" % self.engine_state_file) + + with open(self.engine_state_file) as f: + state = json.load(f) + + save_notifier = self.notifier + self.notifier = None + for eid, uuid in state['engines'].iteritems(): + heart = uuid.encode('ascii') + # start with this heart as current and beating: + self.heartmonitor.responses.add(heart) + self.heartmonitor.hearts.add(heart) + + self.incoming_registrations[heart] = EngineConnector(id=int(eid), uuid=uuid) + self.finish_registration(heart) + + self.notifier = save_notifier + + self._idcounter = state['next_id'] + + #------------------------------------------------------------------------- # Client Requests #------------------------------------------------------------------------- @@ -1131,7 +1194,7 @@ class Hub(SessionFactory): except: reply = error.wrap_exception() break - uid = self.engines[eid].queue + uid = self.engines[eid].uuid try: self.db.drop_matching_records(dict(engine_uuid=uid, completed={'$ne':None})) except Exception: @@ -1205,6 +1268,7 @@ class Hub(SessionFactory): self.db.add_record(msg_id, init_record(msg)) except Exception: self.log.error("db::DB Error updating record: %s", msg_id, exc_info=True) + return finish(error.wrap_exception()) finish(dict(status='ok', resubmitted=resubmitted)) diff --git a/IPython/parallel/controller/scheduler.py b/IPython/parallel/controller/scheduler.py index f339d3e..39e2365 100644 --- a/IPython/parallel/controller/scheduler.py +++ b/IPython/parallel/controller/scheduler.py @@ -189,6 +189,7 @@ class TaskScheduler(SessionFactory): engine_stream = Instance(zmqstream.ZMQStream) # engine-facing stream notifier_stream = Instance(zmqstream.ZMQStream) # hub-facing sub stream mon_stream = Instance(zmqstream.ZMQStream) # hub-facing pub stream + query_stream = Instance(zmqstream.ZMQStream) # hub-facing DEALER stream # internals: graph = Dict() # dict by msg_id of [ msg_ids that depend on key ] @@ -216,6 +217,9 @@ class TaskScheduler(SessionFactory): return self.session.bsession def start(self): + self.query_stream.on_recv(self.dispatch_query_reply) + self.session.send(self.query_stream, "connection_request", {}) + self.engine_stream.on_recv(self.dispatch_result, copy=False) self.client_stream.on_recv(self.dispatch_submission, copy=False) @@ -240,6 +244,24 @@ class TaskScheduler(SessionFactory): #----------------------------------------------------------------------- # [Un]Registration Handling #----------------------------------------------------------------------- + + + def dispatch_query_reply(self, msg): + """handle reply to our initial connection request""" + try: + idents,msg = self.session.feed_identities(msg) + except ValueError: + self.log.warn("task::Invalid Message: %r",msg) + return + try: + msg = self.session.unserialize(msg) + except ValueError: + self.log.warn("task::Unauthorized message from: %r"%idents) + return + + content = msg['content'] + for uuid in content.get('engines', {}).values(): + self._register_engine(cast_bytes(uuid)) @util.log_errors @@ -263,7 +285,7 @@ class TaskScheduler(SessionFactory): self.log.error("Unhandled message type: %r"%msg_type) else: try: - handler(cast_bytes(msg['content']['queue'])) + handler(cast_bytes(msg['content']['uuid'])) except Exception: self.log.error("task::Invalid notification msg: %r", msg, exc_info=True) @@ -714,7 +736,7 @@ class TaskScheduler(SessionFactory): -def launch_scheduler(in_addr, out_addr, mon_addr, not_addr, config=None, +def launch_scheduler(in_addr, out_addr, mon_addr, not_addr, reg_addr, config=None, logname='root', log_url=None, loglevel=logging.DEBUG, identity=b'task', in_thread=False): @@ -734,18 +756,21 @@ def launch_scheduler(in_addr, out_addr, mon_addr, not_addr, config=None, ctx = zmq.Context() loop = ioloop.IOLoop() ins = ZMQStream(ctx.socket(zmq.ROUTER),loop) - ins.setsockopt(zmq.IDENTITY, identity) + ins.setsockopt(zmq.IDENTITY, identity + b'_in') ins.bind(in_addr) outs = ZMQStream(ctx.socket(zmq.ROUTER),loop) - outs.setsockopt(zmq.IDENTITY, identity) + outs.setsockopt(zmq.IDENTITY, identity + b'_out') outs.bind(out_addr) mons = zmqstream.ZMQStream(ctx.socket(zmq.PUB),loop) mons.connect(mon_addr) nots = zmqstream.ZMQStream(ctx.socket(zmq.SUB),loop) nots.setsockopt(zmq.SUBSCRIBE, b'') nots.connect(not_addr) - + + querys = ZMQStream(ctx.socket(zmq.DEALER),loop) + querys.connect(reg_addr) + # setup logging. if in_thread: log = Application.instance().log @@ -757,6 +782,7 @@ def launch_scheduler(in_addr, out_addr, mon_addr, not_addr, config=None, scheduler = TaskScheduler(client_stream=ins, engine_stream=outs, mon_stream=mons, notifier_stream=nots, + query_stream=querys, loop=loop, log=log, config=config) scheduler.start() diff --git a/IPython/parallel/engine/engine.py b/IPython/parallel/engine/engine.py index 136af66..046d54a 100644 --- a/IPython/parallel/engine/engine.py +++ b/IPython/parallel/engine/engine.py @@ -50,7 +50,7 @@ class EngineFactory(RegistrationFactory): help="""The location (an IP address) of the controller. This is used for disambiguating URLs, to determine whether loopback should be used to connect or the public address.""") - timeout=CFloat(2,config=True, + timeout=CFloat(5, config=True, help="""The time (in seconds) to wait for the Controller to respond to registration requests before giving up.""") sshserver=Unicode(config=True, @@ -61,10 +61,11 @@ class EngineFactory(RegistrationFactory): help="""Whether to use paramiko instead of openssh for tunnels.""") # not configurable: - user_ns=Dict() - id=Integer(allow_none=True) - registrar=Instance('zmq.eventloop.zmqstream.ZMQStream') - kernel=Instance(Kernel) + connection_info = Dict() + user_ns = Dict() + id = Integer(allow_none=True) + registrar = Instance('zmq.eventloop.zmqstream.ZMQStream') + kernel = Instance(Kernel) bident = CBytes() ident = Unicode() @@ -96,7 +97,7 @@ class EngineFactory(RegistrationFactory): def connect(s, url): url = disambiguate_url(url, self.location) if self.using_ssh: - self.log.debug("Tunneling connection to %s via %s"%(url, self.sshserver)) + self.log.debug("Tunneling connection to %s via %s", url, self.sshserver) return tunnel.tunnel_connection(s, url, self.sshserver, keyfile=self.sshkey, paramiko=self.paramiko, password=password, @@ -108,12 +109,12 @@ class EngineFactory(RegistrationFactory): """like connect, but don't complete the connection (for use by heartbeat)""" url = disambiguate_url(url, self.location) if self.using_ssh: - self.log.debug("Tunneling connection to %s via %s"%(url, self.sshserver)) + self.log.debug("Tunneling connection to %s via %s", url, self.sshserver) url,tunnelobj = tunnel.open_tunnel(url, self.sshserver, keyfile=self.sshkey, paramiko=self.paramiko, password=password, ) - return url + return str(url) return connect, maybe_tunnel def register(self): @@ -128,10 +129,10 @@ class EngineFactory(RegistrationFactory): self.registrar = zmqstream.ZMQStream(reg, self.loop) - content = dict(queue=self.ident, heartbeat=self.ident, control=self.ident) + content = dict(uuid=self.ident) self.registrar.on_recv(lambda msg: self.complete_registration(msg, connect, maybe_tunnel)) # print (self.session.key) - self.session.send(self.registrar, "registration_request",content=content) + self.session.send(self.registrar, "registration_request", content=content) def complete_registration(self, msg, connect, maybe_tunnel): # print msg @@ -140,50 +141,43 @@ class EngineFactory(RegistrationFactory): loop = self.loop identity = self.bident idents,msg = self.session.feed_identities(msg) - msg = Message(self.session.unserialize(msg)) - - if msg.content.status == 'ok': - self.id = int(msg.content.id) + msg = self.session.unserialize(msg) + content = msg['content'] + info = self.connection_info + + def url(key): + """get zmq url for given channel""" + return str(info["interface"] + ":%i" % info[key]) + + if content['status'] == 'ok': + self.id = int(content['id']) # launch heartbeat - hb_addrs = msg.content.heartbeat - # possibly forward hb ports with tunnels - hb_addrs = [ maybe_tunnel(addr) for addr in hb_addrs ] - heart = Heart(*map(str, hb_addrs), heart_id=identity) + hb_ping = maybe_tunnel(url('hb_ping')) + hb_pong = maybe_tunnel(url('hb_pong')) + + heart = Heart(hb_ping, hb_pong, heart_id=identity) heart.start() - # create Shell Streams (MUX, Task, etc.): - queue_addr = msg.content.mux - shell_addrs = [ str(queue_addr) ] - task_addr = msg.content.task - if task_addr: - shell_addrs.append(str(task_addr)) - - # Uncomment this to go back to two-socket model - # shell_streams = [] - # for addr in shell_addrs: - # stream = zmqstream.ZMQStream(ctx.socket(zmq.ROUTER), loop) - # stream.setsockopt(zmq.IDENTITY, identity) - # stream.connect(disambiguate_url(addr, self.location)) - # shell_streams.append(stream) - - # Now use only one shell stream for mux and tasks + # create Shell Connections (MUX, Task, etc.): + shell_addrs = url('mux'), url('task') + + # Use only one shell stream for mux and tasks stream = zmqstream.ZMQStream(ctx.socket(zmq.ROUTER), loop) stream.setsockopt(zmq.IDENTITY, identity) shell_streams = [stream] for addr in shell_addrs: connect(stream, addr) - # end single stream-socket # control stream: - control_addr = str(msg.content.control) + control_addr = url('control') control_stream = zmqstream.ZMQStream(ctx.socket(zmq.ROUTER), loop) control_stream.setsockopt(zmq.IDENTITY, identity) connect(control_stream, control_addr) # create iopub stream: - iopub_addr = msg.content.iopub + iopub_addr = url('iopub') iopub_socket = ctx.socket(zmq.PUB) iopub_socket.setsockopt(zmq.IDENTITY, identity) connect(iopub_socket, iopub_addr) diff --git a/IPython/zmq/session.py b/IPython/zmq/session.py index 04d70d2..4cf1ab2 100644 --- a/IPython/zmq/session.py +++ b/IPython/zmq/session.py @@ -257,9 +257,11 @@ class Session(Configurable): if new.lower() == 'json': self.pack = json_packer self.unpack = json_unpacker + self.unpacker = new elif new.lower() == 'pickle': self.pack = pickle_packer self.unpack = pickle_unpacker + self.unpacker = new else: self.pack = import_item(str(new)) @@ -270,9 +272,11 @@ class Session(Configurable): if new.lower() == 'json': self.pack = json_packer self.unpack = json_unpacker + self.packer = new elif new.lower() == 'pickle': self.pack = pickle_packer self.unpack = pickle_unpacker + self.packer = new else: self.unpack = import_item(str(new)) diff --git a/docs/source/development/parallel_messages.txt b/docs/source/development/parallel_messages.txt index 674b27b..50a02bc 100644 --- a/docs/source/development/parallel_messages.txt +++ b/docs/source/development/parallel_messages.txt @@ -43,9 +43,7 @@ monitor the survival of the Engine process. Message type: ``registration_request``:: content = { - 'queue' : 'abcd-1234-...', # the MUX queue zmq.IDENTITY - 'control' : 'abcd-1234-...', # the control queue zmq.IDENTITY - 'heartbeat' : 'abcd-1234-...' # the heartbeat zmq.IDENTITY + 'uuid' : 'abcd-1234-...', # the zmq.IDENTITY of the engine's sockets } .. note:: @@ -63,10 +61,6 @@ Message type: ``registration_reply``:: 'status' : 'ok', # or 'error' # if ok: 'id' : 0, # int, the engine id - 'queue' : 'tcp://127.0.0.1:12345', # connection for engine side of the queue - 'control' : 'tcp://...', # addr for control queue - 'heartbeat' : ('tcp://...','tcp://...'), # tuple containing two interfaces needed for heartbeat - 'task' : 'tcp://...', # addr for task queue, or None if no task queue running } Clients use the same socket as engines to start their connections. Connection requests @@ -84,11 +78,6 @@ Message type: ``connection_reply``:: content = { 'status' : 'ok', # or 'error' - # if ok: - 'queue' : 'tcp://127.0.0.1:12345', # connection for client side of the MUX queue - 'task' : ('lru','tcp...'), # routing scheme and addr for task queue (len 2 tuple) - 'query' : 'tcp...', # addr for methods to query the hub, like queue_request, etc. - 'control' : 'tcp...', # addr for control methods, like abort, etc. } Heartbeat @@ -110,13 +99,14 @@ Message type: ``registration_notification``:: content = { 'id' : 0, # engine ID that has been registered - 'queue' : 'engine_id' # the IDENT for the engine's queue + 'uuid' : 'engine_id' # the IDENT for the engine's sockets } Message type : ``unregistration_notification``:: content = { 'id' : 0 # engine ID that has been unregistered + 'uuid' : 'engine_id' # the IDENT for the engine's sockets }