From 456307a8cd7c68d2aceb757932f77f93ae8d7d58 2012-07-18 14:38:17 From: MinRK Date: 2012-07-18 14:38:17 Subject: [PATCH] enables resume of ipcontroller cleans up EngineConnector and registration messages further, only storing single UUID for each engine (all the stored UUIDs happened to be the same already). Message spec docs updated to reflect changes to message formats. --- diff --git a/IPython/parallel/apps/ipcontrollerapp.py b/IPython/parallel/apps/ipcontrollerapp.py index bfbf121..461988a 100755 --- a/IPython/parallel/apps/ipcontrollerapp.py +++ b/IPython/parallel/apps/ipcontrollerapp.py @@ -116,7 +116,10 @@ flags.update({ select one of the true db backends. """), 'reuse' : ({'IPControllerApp' : {'reuse_files' : True}}, - 'reuse existing json connection files') + 'reuse existing json connection files'), + 'restore' : ({'IPControllerApp' : {'restore_engines' : True, 'reuse_files' : True}}, + 'Attempt to restore engines from a JSON file. ' + 'For use when resuming a crashed controller'), }) flags.update(session_flags) @@ -156,6 +159,10 @@ class IPControllerApp(BaseParallelApplication): If False, connection files will be removed on a clean exit. """ ) + restore_engines = Bool(False, config=True, + help="""Reload engine state from JSON file + """ + ) ssh_server = Unicode(u'', config=True, help="""ssh url for clients to use when connecting to the Controller processes. It should be of the form: [user@]server[:port]. 
The @@ -343,17 +350,24 @@ class IPControllerApp(BaseParallelApplication): edict.update(base) self.save_connection_dict(self.engine_json_file, edict) + fname = "engines%s.json" % self.cluster_id + self.factory.hub.engine_state_file = os.path.join(self.profile_dir.log_dir, fname) + if self.restore_engines: + self.factory.hub._load_engine_state() + def init_schedulers(self): children = self.children mq = import_item(str(self.mq_class)) f = self.factory + ident = f.session.bsession # disambiguate url, in case of * monitor_url = disambiguate_url(f.monitor_url) # maybe_inproc = 'inproc://monitor' if self.use_threads else monitor_url # IOPub relay (in a Process) q = mq(zmq.PUB, zmq.SUB, zmq.PUB, b'N/A',b'iopub') q.bind_in(f.client_url('iopub')) + q.setsockopt_in(zmq.IDENTITY, ident+"_iopub") q.bind_out(f.engine_url('iopub')) q.setsockopt_out(zmq.SUBSCRIBE, b'') q.connect_mon(monitor_url) @@ -363,8 +377,9 @@ class IPControllerApp(BaseParallelApplication): # Multiplexer Queue (in a Process) q = mq(zmq.ROUTER, zmq.ROUTER, zmq.PUB, b'in', b'out') q.bind_in(f.client_url('mux')) - q.setsockopt_in(zmq.IDENTITY, b'mux') + q.setsockopt_in(zmq.IDENTITY, b'mux_in') q.bind_out(f.engine_url('mux')) + q.setsockopt_out(zmq.IDENTITY, b'mux_out') q.connect_mon(monitor_url) q.daemon=True children.append(q) @@ -372,8 +387,9 @@ class IPControllerApp(BaseParallelApplication): # Control Queue (in a Process) q = mq(zmq.ROUTER, zmq.ROUTER, zmq.PUB, b'incontrol', b'outcontrol') q.bind_in(f.client_url('control')) - q.setsockopt_in(zmq.IDENTITY, b'control') + q.setsockopt_in(zmq.IDENTITY, b'control_in') q.bind_out(f.engine_url('control')) + q.setsockopt_out(zmq.IDENTITY, b'control_out') q.connect_mon(monitor_url) q.daemon=True children.append(q) @@ -387,8 +403,9 @@ class IPControllerApp(BaseParallelApplication): q = mq(zmq.ROUTER, zmq.DEALER, zmq.PUB, b'intask', b'outtask') # q.setsockopt_out(zmq.HWM, hub.hwm) q.bind_in(f.client_url('task')) - q.setsockopt_in(zmq.IDENTITY, b'task') + 
q.setsockopt_in(zmq.IDENTITY, b'task_in') q.bind_out(f.engine_url('task')) + q.setsockopt_out(zmq.IDENTITY, b'task_out') q.connect_mon(monitor_url) q.daemon=True children.append(q) @@ -398,7 +415,9 @@ class IPControllerApp(BaseParallelApplication): else: self.log.info("task::using Python %s Task scheduler"%scheme) sargs = (f.client_url('task'), f.engine_url('task'), - monitor_url, disambiguate_url(f.client_url('notification'))) + monitor_url, disambiguate_url(f.client_url('notification')), + disambiguate_url(f.client_url('registration')), + ) kwargs = dict(logname='scheduler', loglevel=self.log_level, log_url = self.log_url, config=dict(self.config)) if 'Process' in self.mq_class: diff --git a/IPython/parallel/client/client.py b/IPython/parallel/client/client.py index 2d513bf..0b06a36 100644 --- a/IPython/parallel/client/client.py +++ b/IPython/parallel/client/client.py @@ -508,8 +508,9 @@ class Client(HasTraits): """Update our engines dict and _ids from a dict of the form: {id:uuid}.""" for k,v in engines.iteritems(): eid = int(k) + if eid not in self._engines: + self._ids.append(eid) self._engines[eid] = v - self._ids.append(eid) self._ids = sorted(self._ids) if sorted(self._engines.keys()) != range(len(self._engines)) and \ self._task_scheme == 'pure' and self._task_socket: @@ -652,7 +653,7 @@ class Client(HasTraits): """Register a new engine, and update our connection info.""" content = msg['content'] eid = content['id'] - d = {eid : content['queue']} + d = {eid : content['uuid']} self._update_engines(d) def _unregister_engine(self, msg): diff --git a/IPython/parallel/controller/hub.py b/IPython/parallel/controller/hub.py index 2d1cce6..0516a60 100644 --- a/IPython/parallel/controller/hub.py +++ b/IPython/parallel/controller/hub.py @@ -18,6 +18,8 @@ Authors: #----------------------------------------------------------------------------- from __future__ import print_function +import json +import os import sys import time from datetime import datetime @@ -107,17 
+109,16 @@ class EngineConnector(HasTraits): """A simple object for accessing the various zmq connections of an object. Attributes are: id (int): engine ID - uuid (str): uuid (unused?) - queue (str): identity of queue's DEALER socket - registration (str): identity of registration DEALER socket - heartbeat (str): identity of heartbeat DEALER socket + uuid (unicode): engine UUID + pending: set of msg_ids + stallback: DelayedCallback for stalled registration """ - id=Integer(0) - queue=CBytes() - control=CBytes() - registration=CBytes() - heartbeat=CBytes() - pending=Set() + + id = Integer(0) + uuid = Unicode() + pending = Set() + stallback = Instance(ioloop.DelayedCallback) + _db_shortcuts = { 'sqlitedb' : 'IPython.parallel.controller.sqlitedb.SQLiteDB', @@ -349,6 +350,9 @@ class Hub(SessionFactory): client_info: dict of zmq connection information for engines to connect to the queues. """ + + engine_state_file = Unicode() + # internal data structures: ids=Set() # engine IDs keytable=Dict() @@ -430,7 +434,7 @@ class Hub(SessionFactory): self.resubmit.on_recv(lambda msg: None, copy=False) self.log.info("hub::created hub") - + @property def _next_id(self): """gemerate a new ID. 
@@ -445,7 +449,7 @@ class Hub(SessionFactory): # while newid in self.ids or newid in incoming: # newid += 1 # return newid - + #----------------------------------------------------------------------------- # message validation #----------------------------------------------------------------------------- @@ -561,11 +565,11 @@ class Hub(SessionFactory): triggers unregistration""" self.log.debug("heartbeat::handle_heart_failure(%r)", heart) eid = self.hearts.get(heart, None) - queue = self.engines[eid].queue + uuid = self.engines[eid].uuid if eid is None or self.keytable[eid] in self.dead_engines: self.log.info("heartbeat::ignoring heart failure %r (not an engine or already dead)", heart) else: - self.unregister_engine(heart, dict(content=dict(id=eid, queue=queue))) + self.unregister_engine(heart, dict(content=dict(id=eid, queue=uuid))) #----------------------- MUX Queue Traffic ------------------------------ @@ -873,7 +877,7 @@ class Hub(SessionFactory): jsonable = {} for k,v in self.keytable.iteritems(): if v not in self.dead_engines: - jsonable[str(k)] = v.decode('ascii') + jsonable[str(k)] = v content['engines'] = jsonable self.session.send(self.query, 'connection_reply', content, parent=msg, ident=client_id) @@ -881,47 +885,37 @@ class Hub(SessionFactory): """Register a new engine.""" content = msg['content'] try: - queue = cast_bytes(content['queue']) + uuid = content['uuid'] except KeyError: self.log.error("registration::queue not specified", exc_info=True) return - heart = content.get('heartbeat', None) - if heart: - heart = cast_bytes(heart) - """register a new engine, and create the socket(s) necessary""" + eid = self._next_id - # print (eid, queue, reg, heart) - self.log.debug("registration::register_engine(%i, %r, %r, %r)", eid, queue, reg, heart) + self.log.debug("registration::register_engine(%i, %r)", eid, uuid) content = dict(id=eid,status='ok') # check if requesting available IDs: - if queue in self.by_ident: + if uuid in self.by_ident: try: - raise 
KeyError("queue_id %r in use" % queue) + raise KeyError("uuid %r in use" % uuid) except: content = error.wrap_exception() - self.log.error("queue_id %r in use", queue, exc_info=True) - elif heart in self.hearts: # need to check unique hearts? - try: - raise KeyError("heart_id %r in use" % heart) - except: - self.log.error("heart_id %r in use", heart, exc_info=True) - content = error.wrap_exception() + self.log.error("uuid %r in use", uuid, exc_info=True) else: - for h, pack in self.incoming_registrations.iteritems(): - if heart == h: + for h, ec in self.incoming_registrations.iteritems(): + if uuid == h: try: - raise KeyError("heart_id %r in use" % heart) + raise KeyError("heart_id %r in use" % uuid) except: - self.log.error("heart_id %r in use", heart, exc_info=True) + self.log.error("heart_id %r in use", uuid, exc_info=True) content = error.wrap_exception() break - elif queue == pack[1]: + elif uuid == ec.uuid: try: - raise KeyError("queue_id %r in use" % queue) + raise KeyError("uuid %r in use" % uuid) except: - self.log.error("queue_id %r in use", queue, exc_info=True) + self.log.error("uuid %r in use", uuid, exc_info=True) content = error.wrap_exception() break @@ -929,18 +923,21 @@ class Hub(SessionFactory): content=content, ident=reg) + heart = util.asbytes(uuid) + if content['status'] == 'ok': if heart in self.heartmonitor.hearts: # already beating - self.incoming_registrations[heart] = (eid,queue,reg[0],None) + self.incoming_registrations[heart] = EngineConnector(id=eid,uuid=uuid) self.finish_registration(heart) else: purge = lambda : self._purge_stalled_registration(heart) dc = ioloop.DelayedCallback(purge, self.registration_timeout, self.loop) dc.start() - self.incoming_registrations[heart] = (eid,queue,reg[0],dc) + self.incoming_registrations[heart] = EngineConnector(id=eid,uuid=uuid,stallback=dc) else: self.log.error("registration::registration %i failed: %r", eid, content['evalue']) + return eid def unregister_engine(self, ident, msg): @@ -953,7 
+950,7 @@ class Hub(SessionFactory): self.log.info("registration::unregister_engine(%r)", eid) # print (eid) uuid = self.keytable[eid] - content=dict(id=eid, queue=uuid.decode('ascii')) + content=dict(id=eid, uuid=uuid) self.dead_engines.add(uuid) # self.ids.remove(eid) # uuid = self.keytable.pop(eid) @@ -966,6 +963,8 @@ class Hub(SessionFactory): dc = ioloop.DelayedCallback(handleit, self.registration_timeout, self.loop) dc.start() ############## TODO: HANDLE IT ################ + + self._save_engine_state() if self.notifier: self.session.send(self.notifier, "unregistration_notification", content=content) @@ -1004,36 +1003,97 @@ class Hub(SessionFactory): """Second half of engine registration, called after our HeartMonitor has received a beat from the Engine's Heart.""" try: - (eid,queue,reg,purge) = self.incoming_registrations.pop(heart) + ec = self.incoming_registrations.pop(heart) except KeyError: self.log.error("registration::tried to finish nonexistant registration", exc_info=True) return - self.log.info("registration::finished registering engine %i:%r", eid, queue) - if purge is not None: - purge.stop() - control = queue + self.log.info("registration::finished registering engine %i:%s", ec.id, ec.uuid) + if ec.stallback is not None: + ec.stallback.stop() + eid = ec.id self.ids.add(eid) - self.keytable[eid] = queue - self.engines[eid] = EngineConnector(id=eid, queue=queue, registration=reg, - control=control, heartbeat=heart) - self.by_ident[queue] = eid + self.keytable[eid] = ec.uuid + self.engines[eid] = ec + self.by_ident[ec.uuid] = ec.id self.queues[eid] = list() self.tasks[eid] = list() self.completed[eid] = list() self.hearts[heart] = eid - content = dict(id=eid, queue=self.engines[eid].queue.decode('ascii')) + content = dict(id=eid, uuid=self.engines[eid].uuid) if self.notifier: self.session.send(self.notifier, "registration_notification", content=content) self.log.info("engine::Engine Connected: %i", eid) + + self._save_engine_state() def 
_purge_stalled_registration(self, heart): if heart in self.incoming_registrations: - eid = self.incoming_registrations.pop(heart)[0] - self.log.info("registration::purging stalled registration: %i", eid) + ec = self.incoming_registrations.pop(heart) + self.log.info("registration::purging stalled registration: %i", ec.id) else: pass #------------------------------------------------------------------------- + # Engine State + #------------------------------------------------------------------------- + + + def _cleanup_engine_state_file(self): + """cleanup engine state mapping""" + + if os.path.exists(self.engine_state_file): + self.log.debug("cleaning up engine state: %s", self.engine_state_file) + try: + os.remove(self.engine_state_file) + except IOError: + self.log.error("Couldn't cleanup file: %s", self.engine_state_file, exc_info=True) + + + def _save_engine_state(self): + """save engine mapping to JSON file""" + if not self.engine_state_file: + return + self.log.debug("save engine state to %s" % self.engine_state_file) + state = {} + engines = {} + for eid, ec in self.engines.iteritems(): + if ec.uuid not in self.dead_engines: + engines[eid] = ec.uuid + + state['engines'] = engines + + state['next_id'] = self._idcounter + + with open(self.engine_state_file, 'w') as f: + json.dump(state, f) + + + def _load_engine_state(self): + """load engine mapping from JSON file""" + if not os.path.exists(self.engine_state_file): + return + + self.log.info("loading engine state from %s" % self.engine_state_file) + + with open(self.engine_state_file) as f: + state = json.load(f) + + save_notifier = self.notifier + self.notifier = None + for eid, uuid in state['engines'].iteritems(): + heart = uuid.encode('ascii') + # start with this heart as current and beating: + self.heartmonitor.responses.add(heart) + self.heartmonitor.hearts.add(heart) + + self.incoming_registrations[heart] = EngineConnector(id=int(eid), uuid=uuid) + self.finish_registration(heart) + + self.notifier = 
save_notifier + + self._idcounter = state['next_id'] + + #------------------------------------------------------------------------- # Client Requests #------------------------------------------------------------------------- @@ -1134,7 +1194,7 @@ class Hub(SessionFactory): except: reply = error.wrap_exception() break - uid = self.engines[eid].queue + uid = self.engines[eid].uuid try: self.db.drop_matching_records(dict(engine_uuid=uid, completed={'$ne':None})) except Exception: diff --git a/IPython/parallel/controller/scheduler.py b/IPython/parallel/controller/scheduler.py index f339d3e..34b05ec 100644 --- a/IPython/parallel/controller/scheduler.py +++ b/IPython/parallel/controller/scheduler.py @@ -189,6 +189,7 @@ class TaskScheduler(SessionFactory): engine_stream = Instance(zmqstream.ZMQStream) # engine-facing stream notifier_stream = Instance(zmqstream.ZMQStream) # hub-facing sub stream mon_stream = Instance(zmqstream.ZMQStream) # hub-facing pub stream + query_stream = Instance(zmqstream.ZMQStream) # hub-facing DEALER stream # internals: graph = Dict() # dict by msg_id of [ msg_ids that depend on key ] @@ -216,6 +217,9 @@ class TaskScheduler(SessionFactory): return self.session.bsession def start(self): + self.query_stream.on_recv(self.dispatch_query_reply) + self.session.send(self.query_stream, "connection_request", {}) + self.engine_stream.on_recv(self.dispatch_result, copy=False) self.client_stream.on_recv(self.dispatch_submission, copy=False) @@ -240,6 +244,24 @@ class TaskScheduler(SessionFactory): #----------------------------------------------------------------------- # [Un]Registration Handling #----------------------------------------------------------------------- + + + def dispatch_query_reply(self, msg): + """handle reply to our initial connection request""" + try: + idents,msg = self.session.feed_identities(msg) + except ValueError: + self.log.warn("task::Invalid Message: %r",msg) + return + try: + msg = self.session.unserialize(msg) + except 
ValueError: + self.log.warn("task::Unauthorized message from: %r"%idents) + return + + content = msg['content'] + for uuid in content.get('engines', {}).values(): + self._register_engine(asbytes(uuid)) @util.log_errors @@ -263,7 +285,7 @@ class TaskScheduler(SessionFactory): self.log.error("Unhandled message type: %r"%msg_type) else: try: - handler(cast_bytes(msg['content']['queue'])) + handler(cast_bytes(msg['content']['uuid'])) except Exception: self.log.error("task::Invalid notification msg: %r", msg, exc_info=True) @@ -714,7 +736,7 @@ class TaskScheduler(SessionFactory): -def launch_scheduler(in_addr, out_addr, mon_addr, not_addr, config=None, +def launch_scheduler(in_addr, out_addr, mon_addr, not_addr, reg_addr, config=None, logname='root', log_url=None, loglevel=logging.DEBUG, identity=b'task', in_thread=False): @@ -734,18 +756,21 @@ def launch_scheduler(in_addr, out_addr, mon_addr, not_addr, config=None, ctx = zmq.Context() loop = ioloop.IOLoop() ins = ZMQStream(ctx.socket(zmq.ROUTER),loop) - ins.setsockopt(zmq.IDENTITY, identity) + ins.setsockopt(zmq.IDENTITY, identity+'_in') ins.bind(in_addr) outs = ZMQStream(ctx.socket(zmq.ROUTER),loop) - outs.setsockopt(zmq.IDENTITY, identity) + outs.setsockopt(zmq.IDENTITY, identity+'_out') outs.bind(out_addr) mons = zmqstream.ZMQStream(ctx.socket(zmq.PUB),loop) mons.connect(mon_addr) nots = zmqstream.ZMQStream(ctx.socket(zmq.SUB),loop) nots.setsockopt(zmq.SUBSCRIBE, b'') nots.connect(not_addr) - + + querys = ZMQStream(ctx.socket(zmq.DEALER),loop) + querys.connect(reg_addr) + # setup logging. 
if in_thread: log = Application.instance().log @@ -757,6 +782,7 @@ def launch_scheduler(in_addr, out_addr, mon_addr, not_addr, config=None, scheduler = TaskScheduler(client_stream=ins, engine_stream=outs, mon_stream=mons, notifier_stream=nots, + query_stream=querys, loop=loop, log=log, config=config) scheduler.start() diff --git a/IPython/parallel/engine/engine.py b/IPython/parallel/engine/engine.py index 79504d9..046d54a 100644 --- a/IPython/parallel/engine/engine.py +++ b/IPython/parallel/engine/engine.py @@ -129,7 +129,7 @@ class EngineFactory(RegistrationFactory): self.registrar = zmqstream.ZMQStream(reg, self.loop) - content = dict(queue=self.ident, heartbeat=self.ident, control=self.ident) + content = dict(uuid=self.ident) self.registrar.on_recv(lambda msg: self.complete_registration(msg, connect, maybe_tunnel)) # print (self.session.key) self.session.send(self.registrar, "registration_request", content=content) diff --git a/docs/source/development/parallel_messages.txt b/docs/source/development/parallel_messages.txt index 674b27b..50a02bc 100644 --- a/docs/source/development/parallel_messages.txt +++ b/docs/source/development/parallel_messages.txt @@ -43,9 +43,7 @@ monitor the survival of the Engine process. Message type: ``registration_request``:: content = { - 'queue' : 'abcd-1234-...', # the MUX queue zmq.IDENTITY - 'control' : 'abcd-1234-...', # the control queue zmq.IDENTITY - 'heartbeat' : 'abcd-1234-...' # the heartbeat zmq.IDENTITY + 'uuid' : 'abcd-1234-...', # the zmq.IDENTITY of the engine's sockets } .. 
note:: @@ -63,10 +61,6 @@ Message type: ``registration_reply``:: 'status' : 'ok', # or 'error' # if ok: 'id' : 0, # int, the engine id - 'queue' : 'tcp://127.0.0.1:12345', # connection for engine side of the queue - 'control' : 'tcp://...', # addr for control queue - 'heartbeat' : ('tcp://...','tcp://...'), # tuple containing two interfaces needed for heartbeat - 'task' : 'tcp://...', # addr for task queue, or None if no task queue running } Clients use the same socket as engines to start their connections. Connection requests @@ -84,11 +78,7 @@ Message type: ``connection_reply``:: content = { 'status' : 'ok', # or 'error' - # if ok: - 'queue' : 'tcp://127.0.0.1:12345', # connection for client side of the MUX queue - 'task' : ('lru','tcp...'), # routing scheme and addr for task queue (len 2 tuple) - 'query' : 'tcp...', # addr for methods to query the hub, like queue_request, etc. - 'control' : 'tcp...', # addr for control methods, like abort, etc. + 'engines' : {'0' : 'abcd-1234-...'}, # if ok: live engines, mapping engine ID (as str) to engine UUID } Heartbeat @@ -110,13 +100,14 @@ Message type: ``registration_notification``:: content = { 'id' : 0, # engine ID that has been registered - 'queue' : 'engine_id' # the IDENT for the engine's queue + 'uuid' : 'abcd-1234-...' # the IDENT for the engine's sockets } Message type : ``unregistration_notification``:: content = { - 'id' : 0 # engine ID that has been unregistered + 'id' : 0, # engine ID that has been unregistered + 'uuid' : 'abcd-1234-...' # the IDENT for the engine's sockets }