diff --git a/IPython/parallel/apps/ipcontrollerapp.py b/IPython/parallel/apps/ipcontrollerapp.py
index bfbf121..461988a 100755
--- a/IPython/parallel/apps/ipcontrollerapp.py
+++ b/IPython/parallel/apps/ipcontrollerapp.py
@@ -116,7 +116,10 @@ flags.update({
    select one of the true db backends.
    """),
    'reuse' : ({'IPControllerApp' : {'reuse_files' : True}},
-        'reuse existing json connection files')
+        'reuse existing json connection files'),
+    'restore' : ({'IPControllerApp' : {'restore_engines' : True, 'reuse_files' : True}},
+        'Attempt to restore engines from a JSON file. '
+        'For use when resuming a crashed controller'),
 })
 
 flags.update(session_flags)
@@ -156,6 +159,10 @@ class IPControllerApp(BaseParallelApplication):
         If False, connection files will be removed on a clean exit.
         """
     )
+    restore_engines = Bool(False, config=True,
+        help="""Reload engine state from JSON file
+        """
+    )
     ssh_server = Unicode(u'', config=True,
         help="""ssh url for clients to use when connecting to the Controller
         processes. It should be of the form: [user@]server[:port]. The
@@ -343,17 +350,24 @@ class IPControllerApp(BaseParallelApplication):
             edict.update(base)
             self.save_connection_dict(self.engine_json_file, edict)
 
+        fname = "engines%s.json" % self.cluster_id
+        self.factory.hub.engine_state_file = os.path.join(self.profile_dir.log_dir, fname)
+        if self.restore_engines:
+            self.factory.hub._load_engine_state()
+
     def init_schedulers(self):
         children = self.children
         mq = import_item(str(self.mq_class))
 
         f = self.factory
+        ident = f.session.bsession
         # disambiguate url, in case of *
         monitor_url = disambiguate_url(f.monitor_url)
         # maybe_inproc = 'inproc://monitor' if self.use_threads else monitor_url
         # IOPub relay (in a Process)
         q = mq(zmq.PUB, zmq.SUB, zmq.PUB, b'N/A',b'iopub')
         q.bind_in(f.client_url('iopub'))
+        q.setsockopt_in(zmq.IDENTITY, ident+"_iopub")
         q.bind_out(f.engine_url('iopub'))
         q.setsockopt_out(zmq.SUBSCRIBE, b'')
         q.connect_mon(monitor_url)
@@ -363,8 +377,9 @@ class IPControllerApp(BaseParallelApplication):
         # Multiplexer Queue (in a Process)
         q = mq(zmq.ROUTER, zmq.ROUTER, zmq.PUB, b'in', b'out')
         q.bind_in(f.client_url('mux'))
-        q.setsockopt_in(zmq.IDENTITY, b'mux')
+        q.setsockopt_in(zmq.IDENTITY, b'mux_in')
         q.bind_out(f.engine_url('mux'))
+        q.setsockopt_out(zmq.IDENTITY, b'mux_out')
         q.connect_mon(monitor_url)
         q.daemon=True
         children.append(q)
@@ -372,8 +387,9 @@ class IPControllerApp(BaseParallelApplication):
         # Control Queue (in a Process)
         q = mq(zmq.ROUTER, zmq.ROUTER, zmq.PUB, b'incontrol', b'outcontrol')
         q.bind_in(f.client_url('control'))
-        q.setsockopt_in(zmq.IDENTITY, b'control')
+        q.setsockopt_in(zmq.IDENTITY, b'control_in')
         q.bind_out(f.engine_url('control'))
+        q.setsockopt_out(zmq.IDENTITY, b'control_out')
         q.connect_mon(monitor_url)
         q.daemon=True
         children.append(q)
@@ -387,8 +403,9 @@ class IPControllerApp(BaseParallelApplication):
             q = mq(zmq.ROUTER, zmq.DEALER, zmq.PUB, b'intask', b'outtask')
             # q.setsockopt_out(zmq.HWM, hub.hwm)
             q.bind_in(f.client_url('task'))
-            q.setsockopt_in(zmq.IDENTITY, b'task')
+            q.setsockopt_in(zmq.IDENTITY, b'task_in')
             q.bind_out(f.engine_url('task'))
+            q.setsockopt_out(zmq.IDENTITY, b'task_out')
             q.connect_mon(monitor_url)
             q.daemon=True
             children.append(q)
@@ -398,7 +415,9 @@ class IPControllerApp(BaseParallelApplication):
         else:
             self.log.info("task::using Python %s Task scheduler"%scheme)
             sargs = (f.client_url('task'), f.engine_url('task'),
-                    monitor_url, disambiguate_url(f.client_url('notification')))
+                    monitor_url, disambiguate_url(f.client_url('notification')),
+                    disambiguate_url(f.client_url('registration')),
+            )
             kwargs = dict(logname='scheduler', loglevel=self.log_level,
                            log_url = self.log_url, config=dict(self.config))
             if 'Process' in self.mq_class:
diff --git a/IPython/parallel/client/client.py b/IPython/parallel/client/client.py
index 2d513bf..0b06a36 100644
--- a/IPython/parallel/client/client.py
+++ b/IPython/parallel/client/client.py
@@ -508,8 +508,9 @@ class Client(HasTraits):
         """Update our engines dict and _ids from a dict of the form: {id:uuid}."""
         for k,v in engines.iteritems():
             eid = int(k)
+            if eid not in self._engines:
+                self._ids.append(eid)
             self._engines[eid] = v
-            self._ids.append(eid)
         self._ids = sorted(self._ids)
         if sorted(self._engines.keys()) != range(len(self._engines)) and \
                         self._task_scheme == 'pure' and self._task_socket:
@@ -652,7 +653,7 @@ class Client(HasTraits):
         """Register a new engine, and update our connection info."""
         content = msg['content']
         eid = content['id']
-        d = {eid : content['queue']}
+        d = {eid : content['uuid']}
         self._update_engines(d)
 
     def _unregister_engine(self, msg):
diff --git a/IPython/parallel/controller/hub.py b/IPython/parallel/controller/hub.py
index 2d1cce6..0516a60 100644
--- a/IPython/parallel/controller/hub.py
+++ b/IPython/parallel/controller/hub.py
@@ -18,6 +18,8 @@ Authors:
 #-----------------------------------------------------------------------------
 
 from __future__ import print_function
+import json
+import os
 import sys
 import time
 from datetime import datetime
@@ -107,17 +109,16 @@ class EngineConnector(HasTraits):
     """A simple object for accessing the various zmq connections of an object.
     Attributes are:
     id (int): engine ID
-    uuid (str): uuid (unused?)
-    queue (str): identity of queue's DEALER socket
-    registration (str): identity of registration DEALER socket
-    heartbeat (str): identity of heartbeat DEALER socket
+    uuid (unicode): engine UUID
+    pending: set of msg_ids
+    stallback: DelayedCallback for stalled registration
     """
-    id=Integer(0)
-    queue=CBytes()
-    control=CBytes()
-    registration=CBytes()
-    heartbeat=CBytes()
-    pending=Set()
+
+    id = Integer(0)
+    uuid = Unicode()
+    pending = Set()
+    stallback = Instance(ioloop.DelayedCallback)
+
 
 _db_shortcuts = {
     'sqlitedb' : 'IPython.parallel.controller.sqlitedb.SQLiteDB',
@@ -349,6 +350,9 @@ class Hub(SessionFactory):
     client_info: dict of zmq connection information for engines to connect
                 to the queues.
     """
+
+    engine_state_file = Unicode()
+
     # internal data structures:
     ids=Set() # engine IDs
     keytable=Dict()
@@ -430,7 +434,7 @@ class Hub(SessionFactory):
         self.resubmit.on_recv(lambda msg: None, copy=False)
 
         self.log.info("hub::created hub")
-        
+
     @property
     def _next_id(self):
         """gemerate a new ID.
@@ -445,7 +449,7 @@ class Hub(SessionFactory):
         #     while newid in self.ids or newid in incoming:
         #         newid += 1
         #     return newid
-        
+
     #-----------------------------------------------------------------------------
     # message validation
     #-----------------------------------------------------------------------------
@@ -561,11 +565,11 @@ class Hub(SessionFactory):
         triggers unregistration"""
         self.log.debug("heartbeat::handle_heart_failure(%r)", heart)
         eid = self.hearts.get(heart, None)
-        queue = self.engines[eid].queue
+        uuid = self.engines[eid].uuid
         if eid is None or self.keytable[eid] in self.dead_engines:
             self.log.info("heartbeat::ignoring heart failure %r (not an engine or already dead)", heart)
         else:
-            self.unregister_engine(heart, dict(content=dict(id=eid, queue=queue)))
+            self.unregister_engine(heart, dict(content=dict(id=eid, queue=uuid)))
 
     #----------------------- MUX Queue Traffic ------------------------------
 
@@ -873,7 +877,7 @@ class Hub(SessionFactory):
         jsonable = {}
         for k,v in self.keytable.iteritems():
             if v not in self.dead_engines:
-                jsonable[str(k)] = v.decode('ascii')
+                jsonable[str(k)] = v
         content['engines'] = jsonable
         self.session.send(self.query, 'connection_reply', content, parent=msg, ident=client_id)
 
@@ -881,47 +885,37 @@ class Hub(SessionFactory):
         """Register a new engine."""
         content = msg['content']
         try:
-            queue = cast_bytes(content['queue'])
+            uuid = content['uuid']
         except KeyError:
             self.log.error("registration::queue not specified", exc_info=True)
             return
-        heart = content.get('heartbeat', None)
-        if heart:
-            heart = cast_bytes(heart)
-        """register a new engine, and create the socket(s) necessary"""
+
         eid = self._next_id
-        # print (eid, queue, reg, heart)
-        self.log.debug("registration::register_engine(%i, %r, %r, %r)", eid, queue, reg, heart)
+
+        self.log.debug("registration::register_engine(%i, %r)", eid, uuid)
 
         content = dict(id=eid,status='ok')
         # check if requesting available IDs:
-        if queue in self.by_ident:
+        if uuid in self.by_ident:
             try:
-                raise KeyError("queue_id %r in use" % queue)
+                raise KeyError("uuid %r in use" % uuid)
             except:
                 content = error.wrap_exception()
-                self.log.error("queue_id %r in use", queue, exc_info=True)
-        elif heart in self.hearts: # need to check unique hearts?
-            try:
-                raise KeyError("heart_id %r in use" % heart)
-            except:
-                self.log.error("heart_id %r in use", heart, exc_info=True)
-                content = error.wrap_exception()
+                self.log.error("uuid %r in use", uuid, exc_info=True)
         else:
-            for h, pack in self.incoming_registrations.iteritems():
-                if heart == h:
+            for h, ec in self.incoming_registrations.iteritems():
+                if uuid == h:
                     try:
-                        raise KeyError("heart_id %r in use" % heart)
+                        raise KeyError("heart_id %r in use" % uuid)
                     except:
-                        self.log.error("heart_id %r in use", heart, exc_info=True)
+                        self.log.error("heart_id %r in use", uuid, exc_info=True)
                         content = error.wrap_exception()
                     break
-                elif queue == pack[1]:
+                elif uuid == ec.uuid:
                     try:
-                        raise KeyError("queue_id %r in use" % queue)
+                        raise KeyError("uuid %r in use" % uuid)
                     except:
-                        self.log.error("queue_id %r in use", queue, exc_info=True)
+                        self.log.error("uuid %r in use", uuid, exc_info=True)
                         content = error.wrap_exception()
                     break
 
@@ -929,18 +923,21 @@ class Hub(SessionFactory):
                 content=content,
                 ident=reg)
 
+        heart = util.asbytes(uuid)
+
         if content['status'] == 'ok':
             if heart in self.heartmonitor.hearts:
                 # already beating
-                self.incoming_registrations[heart] = (eid,queue,reg[0],None)
+                self.incoming_registrations[heart] = EngineConnector(id=eid,uuid=uuid)
                 self.finish_registration(heart)
             else:
                 purge = lambda : self._purge_stalled_registration(heart)
                 dc = ioloop.DelayedCallback(purge, self.registration_timeout, self.loop)
                 dc.start()
-                self.incoming_registrations[heart] = (eid,queue,reg[0],dc)
+                self.incoming_registrations[heart] = EngineConnector(id=eid,uuid=uuid,stallback=dc)
         else:
             self.log.error("registration::registration %i failed: %r", eid, content['evalue'])
+
         return eid
 
     def unregister_engine(self, ident, msg):
@@ -953,7 +950,7 @@ class Hub(SessionFactory):
         self.log.info("registration::unregister_engine(%r)", eid)
         # print (eid)
         uuid = self.keytable[eid]
-        content=dict(id=eid, queue=uuid.decode('ascii'))
+        content=dict(id=eid, uuid=uuid)
         self.dead_engines.add(uuid)
         # self.ids.remove(eid)
         # uuid = self.keytable.pop(eid)
@@ -966,6 +963,8 @@ class Hub(SessionFactory):
         dc = ioloop.DelayedCallback(handleit, self.registration_timeout, self.loop)
         dc.start()
         ############## TODO: HANDLE IT ################
+
+        self._save_engine_state()
 
         if self.notifier:
             self.session.send(self.notifier, "unregistration_notification", content=content)
@@ -1004,36 +1003,97 @@ class Hub(SessionFactory):
         """Second half of engine registration, called after our
         HeartMonitor has received a beat from the Engine's Heart."""
         try:
-            (eid,queue,reg,purge) = self.incoming_registrations.pop(heart)
+            ec = self.incoming_registrations.pop(heart)
         except KeyError:
             self.log.error("registration::tried to finish nonexistant registration", exc_info=True)
             return
-        self.log.info("registration::finished registering engine %i:%r", eid, queue)
-        if purge is not None:
-            purge.stop()
-        control = queue
+        self.log.info("registration::finished registering engine %i:%s", ec.id, ec.uuid)
+        if ec.stallback is not None:
+            ec.stallback.stop()
+        eid = ec.id
         self.ids.add(eid)
-        self.keytable[eid] = queue
-        self.engines[eid] = EngineConnector(id=eid, queue=queue, registration=reg,
-                            control=control, heartbeat=heart)
-        self.by_ident[queue] = eid
+        self.keytable[eid] = ec.uuid
+        self.engines[eid] = ec
+        self.by_ident[ec.uuid] = ec.id
         self.queues[eid] = list()
         self.tasks[eid] = list()
         self.completed[eid] = list()
         self.hearts[heart] = eid
-        content = dict(id=eid, queue=self.engines[eid].queue.decode('ascii'))
+        content = dict(id=eid, uuid=self.engines[eid].uuid)
         if self.notifier:
             self.session.send(self.notifier, "registration_notification", content=content)
         self.log.info("engine::Engine Connected: %i", eid)
+
+        self._save_engine_state()
 
     def _purge_stalled_registration(self, heart):
         if heart in self.incoming_registrations:
-            eid = self.incoming_registrations.pop(heart)[0]
-            self.log.info("registration::purging stalled registration: %i", eid)
+            ec = self.incoming_registrations.pop(heart)
+            self.log.info("registration::purging stalled registration: %i", ec.id)
         else:
             pass
 
     #-------------------------------------------------------------------------
+    # Engine State
+    #-------------------------------------------------------------------------
+
+
+    def _cleanup_engine_state_file(self):
+        """cleanup engine state mapping"""
+
+        if os.path.exists(self.engine_state_file):
+            self.log.debug("cleaning up engine state: %s", self.engine_state_file)
+            try:
+                os.remove(self.engine_state_file)
+            except IOError:
+                self.log.error("Couldn't cleanup file: %s", self.engine_state_file, exc_info=True)
+
+
+    def _save_engine_state(self):
+        """save engine mapping to JSON file"""
+        if not self.engine_state_file:
+            return
+        self.log.debug("save engine state to %s" % self.engine_state_file)
+        state = {}
+        engines = {}
+        for eid, ec in self.engines.iteritems():
+            if ec.uuid not in self.dead_engines:
+                engines[eid] = ec.uuid
+
+        state['engines'] = engines
+
+        state['next_id'] = self._idcounter
+
+        with open(self.engine_state_file, 'w') as f:
+            json.dump(state, f)
+
+
+    def _load_engine_state(self):
+        """load engine mapping from JSON file"""
+        if not os.path.exists(self.engine_state_file):
+            return
+
+        self.log.info("loading engine state from %s" % self.engine_state_file)
+
+        with open(self.engine_state_file) as f:
+            state = json.load(f)
+
+        save_notifier = self.notifier
+        self.notifier = None
+        for eid, uuid in state['engines'].iteritems():
+            heart = uuid.encode('ascii')
+            # start with this heart as current and beating:
+            self.heartmonitor.responses.add(heart)
+            self.heartmonitor.hearts.add(heart)
+
+            self.incoming_registrations[heart] = EngineConnector(id=int(eid), uuid=uuid)
+            self.finish_registration(heart)
+
+        self.notifier = save_notifier
+
+        self._idcounter = state['next_id']
+
+
+    #-------------------------------------------------------------------------
     # Client Requests
     #-------------------------------------------------------------------------
 
@@ -1134,7 +1194,7 @@ class Hub(SessionFactory):
             except:
                 reply = error.wrap_exception()
                 break
-            uid = self.engines[eid].queue
+            uid = self.engines[eid].uuid
             try:
                 self.db.drop_matching_records(dict(engine_uuid=uid, completed={'$ne':None}))
             except Exception:
diff --git a/IPython/parallel/controller/scheduler.py b/IPython/parallel/controller/scheduler.py
index f339d3e..34b05ec 100644
--- a/IPython/parallel/controller/scheduler.py
+++ b/IPython/parallel/controller/scheduler.py
@@ -189,6 +189,7 @@ class TaskScheduler(SessionFactory):
     engine_stream = Instance(zmqstream.ZMQStream) # engine-facing stream
     notifier_stream = Instance(zmqstream.ZMQStream) # hub-facing sub stream
     mon_stream = Instance(zmqstream.ZMQStream) # hub-facing pub stream
+    query_stream = Instance(zmqstream.ZMQStream) # hub-facing DEALER stream
 
     # internals:
     graph = Dict() # dict by msg_id of [ msg_ids that depend on key ]
@@ -216,6 +217,9 @@ class TaskScheduler(SessionFactory):
         return self.session.bsession
 
     def start(self):
+        self.query_stream.on_recv(self.dispatch_query_reply)
+        self.session.send(self.query_stream, "connection_request", {})
+
         self.engine_stream.on_recv(self.dispatch_result, copy=False)
         self.client_stream.on_recv(self.dispatch_submission, copy=False)
 
@@ -240,6 +244,24 @@ class TaskScheduler(SessionFactory):
     #-----------------------------------------------------------------------
     # [Un]Registration Handling
     #-----------------------------------------------------------------------
+
+
+    def dispatch_query_reply(self, msg):
+        """handle reply to our initial connection request"""
+        try:
+            idents,msg = self.session.feed_identities(msg)
+        except ValueError:
+            self.log.warn("task::Invalid Message: %r",msg)
+            return
+        try:
+            msg = self.session.unserialize(msg)
+        except ValueError:
+            self.log.warn("task::Unauthorized message from: %r"%idents)
+            return
+
+        content = msg['content']
+        for uuid in content.get('engines', {}).values():
+            self._register_engine(asbytes(uuid))
 
 
     @util.log_errors
@@ -263,7 +285,7 @@ class TaskScheduler(SessionFactory):
             self.log.error("Unhandled message type: %r"%msg_type)
         else:
             try:
-                handler(cast_bytes(msg['content']['queue']))
+                handler(cast_bytes(msg['content']['uuid']))
             except Exception:
                 self.log.error("task::Invalid notification msg: %r", msg, exc_info=True)
 
@@ -714,7 +736,7 @@ class TaskScheduler(SessionFactory):
 
 
 
-def launch_scheduler(in_addr, out_addr, mon_addr, not_addr, config=None,
+def launch_scheduler(in_addr, out_addr, mon_addr, not_addr, reg_addr, config=None,
                         logname='root', log_url=None, loglevel=logging.DEBUG,
                         identity=b'task', in_thread=False):
 
@@ -734,18 +756,21 @@ def launch_scheduler(in_addr, out_addr, mon_addr, not_addr, config=None,
        ctx = zmq.Context()
        loop = ioloop.IOLoop()
        ins = ZMQStream(ctx.socket(zmq.ROUTER),loop)
-        ins.setsockopt(zmq.IDENTITY, identity)
+        ins.setsockopt(zmq.IDENTITY, identity+'_in')
        ins.bind(in_addr)
        outs = ZMQStream(ctx.socket(zmq.ROUTER),loop)
-        outs.setsockopt(zmq.IDENTITY, identity)
+        outs.setsockopt(zmq.IDENTITY, identity+'_out')
        outs.bind(out_addr)
        mons = zmqstream.ZMQStream(ctx.socket(zmq.PUB),loop)
        mons.connect(mon_addr)
        nots = zmqstream.ZMQStream(ctx.socket(zmq.SUB),loop)
        nots.setsockopt(zmq.SUBSCRIBE, b'')
        nots.connect(not_addr)
-        
+
+        querys = ZMQStream(ctx.socket(zmq.DEALER),loop)
+        querys.connect(reg_addr)
+
        # setup logging.
        if in_thread:
            log = Application.instance().log
@@ -757,6 +782,7 @@ def launch_scheduler(in_addr, out_addr, mon_addr, not_addr, config=None,
 
     scheduler = TaskScheduler(client_stream=ins, engine_stream=outs,
                             mon_stream=mons, notifier_stream=nots,
+                            query_stream=querys,
                             loop=loop, log=log,
                             config=config)
     scheduler.start()
diff --git a/IPython/parallel/engine/engine.py b/IPython/parallel/engine/engine.py
index 79504d9..046d54a 100644
--- a/IPython/parallel/engine/engine.py
+++ b/IPython/parallel/engine/engine.py
@@ -129,7 +129,7 @@ class EngineFactory(RegistrationFactory):
 
         self.registrar = zmqstream.ZMQStream(reg, self.loop)
 
-        content = dict(queue=self.ident, heartbeat=self.ident, control=self.ident)
+        content = dict(uuid=self.ident)
         self.registrar.on_recv(lambda msg: self.complete_registration(msg, connect, maybe_tunnel))
         # print (self.session.key)
         self.session.send(self.registrar, "registration_request", content=content)
diff --git a/docs/source/development/parallel_messages.txt b/docs/source/development/parallel_messages.txt
index 674b27b..50a02bc 100644
--- a/docs/source/development/parallel_messages.txt
+++ b/docs/source/development/parallel_messages.txt
@@ -43,9 +43,7 @@ monitor the survival of the Engine process.
 Message type: ``registration_request``::
 
     content = {
-        'queue' : 'abcd-1234-...', # the MUX queue zmq.IDENTITY
-        'control' : 'abcd-1234-...', # the control queue zmq.IDENTITY
-        'heartbeat' : 'abcd-1234-...' # the heartbeat zmq.IDENTITY
+        'uuid' : 'abcd-1234-...', # the zmq.IDENTITY of the engine's sockets
     }
 
 .. note::
@@ -63,10 +61,6 @@ Message type: ``registration_reply``::
         'status' : 'ok', # or 'error'
         # if ok:
         'id' : 0, # int, the engine id
-        'queue' : 'tcp://127.0.0.1:12345', # connection for engine side of the queue
-        'control' : 'tcp://...', # addr for control queue
-        'heartbeat' : ('tcp://...','tcp://...'), # tuple containing two interfaces needed for heartbeat
-        'task' : 'tcp://...', # addr for task queue, or None if no task queue running
     }
 
 Clients use the same socket as engines to start their connections. Connection requests
@@ -84,11 +78,6 @@ Message type: ``connection_reply``::
 
     content = {
         'status' : 'ok', # or 'error'
-        # if ok:
-        'queue' : 'tcp://127.0.0.1:12345', # connection for client side of the MUX queue
-        'task' : ('lru','tcp...'), # routing scheme and addr for task queue (len 2 tuple)
-        'query' : 'tcp...', # addr for methods to query the hub, like queue_request, etc.
-        'control' : 'tcp...', # addr for control methods, like abort, etc.
     }
 
 Heartbeat
@@ -110,13 +99,14 @@ Message type: ``registration_notification``::
 
     content = {
         'id' : 0, # engine ID that has been registered
-        'queue' : 'engine_id' # the IDENT for the engine's queue
+        'uuid' : 'engine_id' # the IDENT for the engine's sockets
     }
 
 Message type : ``unregistration_notification``::
 
     content = {
         'id' : 0 # engine ID that has been unregistered
+        'uuid' : 'engine_id' # the IDENT for the engine's sockets
     }
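
Note on the engine-state file (illustrative, not part of the patch itself): ``_save_engine_state`` above dumps the live id-to-uuid mapping plus the next id counter as plain JSON into ``engines<cluster_id>.json`` under the profile's log directory, and ``_load_engine_state`` replays those entries through ``finish_registration`` at startup. Assuming two engines had registered before a controller crash, the file would look something like the following (the UUID values here are made up)::

    {
      "engines": {
        "0": "8c7f4e9a-0000-0000-0000-000000000000",
        "1": "d41d8cd9-0000-0000-0000-000000000000"
      },
      "next_id": 2
    }

Given the new ``restore`` flag defined above (which also sets ``reuse_files``), resuming would then presumably be a matter of restarting with ``ipcontroller --restore``, so returning engines keep their ids and clients can reconnect using the existing JSON connection files.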