##// END OF EJS Templates
enables resume of ipcontroller...
MinRK -
Show More
@@ -116,7 +116,10 b' flags.update({'
116 116 select one of the true db backends.
117 117 """),
118 118 'reuse' : ({'IPControllerApp' : {'reuse_files' : True}},
119 'reuse existing json connection files')
119 'reuse existing json connection files'),
120 'restore' : ({'IPControllerApp' : {'restore_engines' : True, 'reuse_files' : True}},
121 'Attempt to restore engines from a JSON file. '
122 'For use when resuming a crashed controller'),
120 123 })
121 124
122 125 flags.update(session_flags)
@@ -156,6 +159,10 b' class IPControllerApp(BaseParallelApplication):'
156 159 If False, connection files will be removed on a clean exit.
157 160 """
158 161 )
162 restore_engines = Bool(False, config=True,
163 help="""Reload engine state from JSON file
164 """
165 )
159 166 ssh_server = Unicode(u'', config=True,
160 167 help="""ssh url for clients to use when connecting to the Controller
161 168 processes. It should be of the form: [user@]server[:port]. The
@@ -343,17 +350,24 b' class IPControllerApp(BaseParallelApplication):'
343 350 edict.update(base)
344 351 self.save_connection_dict(self.engine_json_file, edict)
345 352
353 fname = "engines%s.json" % self.cluster_id
354 self.factory.hub.engine_state_file = os.path.join(self.profile_dir.log_dir, fname)
355 if self.restore_engines:
356 self.factory.hub._load_engine_state()
357
346 358 def init_schedulers(self):
347 359 children = self.children
348 360 mq = import_item(str(self.mq_class))
349 361
350 362 f = self.factory
363 ident = f.session.bsession
351 364 # disambiguate url, in case of *
352 365 monitor_url = disambiguate_url(f.monitor_url)
353 366 # maybe_inproc = 'inproc://monitor' if self.use_threads else monitor_url
354 367 # IOPub relay (in a Process)
355 368 q = mq(zmq.PUB, zmq.SUB, zmq.PUB, b'N/A',b'iopub')
356 369 q.bind_in(f.client_url('iopub'))
370 q.setsockopt_in(zmq.IDENTITY, ident+"_iopub")
357 371 q.bind_out(f.engine_url('iopub'))
358 372 q.setsockopt_out(zmq.SUBSCRIBE, b'')
359 373 q.connect_mon(monitor_url)
@@ -363,8 +377,9 b' class IPControllerApp(BaseParallelApplication):'
363 377 # Multiplexer Queue (in a Process)
364 378 q = mq(zmq.ROUTER, zmq.ROUTER, zmq.PUB, b'in', b'out')
365 379 q.bind_in(f.client_url('mux'))
366 q.setsockopt_in(zmq.IDENTITY, b'mux')
380 q.setsockopt_in(zmq.IDENTITY, b'mux_in')
367 381 q.bind_out(f.engine_url('mux'))
382 q.setsockopt_out(zmq.IDENTITY, b'mux_out')
368 383 q.connect_mon(monitor_url)
369 384 q.daemon=True
370 385 children.append(q)
@@ -372,8 +387,9 b' class IPControllerApp(BaseParallelApplication):'
372 387 # Control Queue (in a Process)
373 388 q = mq(zmq.ROUTER, zmq.ROUTER, zmq.PUB, b'incontrol', b'outcontrol')
374 389 q.bind_in(f.client_url('control'))
375 q.setsockopt_in(zmq.IDENTITY, b'control')
390 q.setsockopt_in(zmq.IDENTITY, b'control_in')
376 391 q.bind_out(f.engine_url('control'))
392 q.setsockopt_out(zmq.IDENTITY, b'control_out')
377 393 q.connect_mon(monitor_url)
378 394 q.daemon=True
379 395 children.append(q)
@@ -387,8 +403,9 b' class IPControllerApp(BaseParallelApplication):'
387 403 q = mq(zmq.ROUTER, zmq.DEALER, zmq.PUB, b'intask', b'outtask')
388 404 # q.setsockopt_out(zmq.HWM, hub.hwm)
389 405 q.bind_in(f.client_url('task'))
390 q.setsockopt_in(zmq.IDENTITY, b'task')
406 q.setsockopt_in(zmq.IDENTITY, b'task_in')
391 407 q.bind_out(f.engine_url('task'))
408 q.setsockopt_out(zmq.IDENTITY, b'task_out')
392 409 q.connect_mon(monitor_url)
393 410 q.daemon=True
394 411 children.append(q)
@@ -398,7 +415,9 b' class IPControllerApp(BaseParallelApplication):'
398 415 else:
399 416 self.log.info("task::using Python %s Task scheduler"%scheme)
400 417 sargs = (f.client_url('task'), f.engine_url('task'),
401 monitor_url, disambiguate_url(f.client_url('notification')))
418 monitor_url, disambiguate_url(f.client_url('notification')),
419 disambiguate_url(f.client_url('registration')),
420 )
402 421 kwargs = dict(logname='scheduler', loglevel=self.log_level,
403 422 log_url = self.log_url, config=dict(self.config))
404 423 if 'Process' in self.mq_class:
@@ -508,8 +508,9 b' class Client(HasTraits):'
508 508 """Update our engines dict and _ids from a dict of the form: {id:uuid}."""
509 509 for k,v in engines.iteritems():
510 510 eid = int(k)
511 self._engines[eid] = v
511 if eid not in self._engines:
512 512 self._ids.append(eid)
513 self._engines[eid] = v
513 514 self._ids = sorted(self._ids)
514 515 if sorted(self._engines.keys()) != range(len(self._engines)) and \
515 516 self._task_scheme == 'pure' and self._task_socket:
@@ -652,7 +653,7 b' class Client(HasTraits):'
652 653 """Register a new engine, and update our connection info."""
653 654 content = msg['content']
654 655 eid = content['id']
655 d = {eid : content['queue']}
656 d = {eid : content['uuid']}
656 657 self._update_engines(d)
657 658
658 659 def _unregister_engine(self, msg):
@@ -18,6 +18,8 b' Authors:'
18 18 #-----------------------------------------------------------------------------
19 19 from __future__ import print_function
20 20
21 import json
22 import os
21 23 import sys
22 24 import time
23 25 from datetime import datetime
@@ -107,17 +109,16 b' class EngineConnector(HasTraits):'
107 109 """A simple object for accessing the various zmq connections of an object.
108 110 Attributes are:
109 111 id (int): engine ID
110 uuid (str): uuid (unused?)
111 queue (str): identity of queue's DEALER socket
112 registration (str): identity of registration DEALER socket
113 heartbeat (str): identity of heartbeat DEALER socket
112 uuid (unicode): engine UUID
113 pending: set of msg_ids
114 stallback: DelayedCallback for stalled registration
114 115 """
116
115 117 id=Integer(0)
116 queue=CBytes()
117 control=CBytes()
118 registration=CBytes()
119 heartbeat=CBytes()
118 uuid = Unicode()
120 119 pending=Set()
120 stallback = Instance(ioloop.DelayedCallback)
121
121 122
122 123 _db_shortcuts = {
123 124 'sqlitedb' : 'IPython.parallel.controller.sqlitedb.SQLiteDB',
@@ -349,6 +350,9 b' class Hub(SessionFactory):'
349 350 client_info: dict of zmq connection information for engines to connect
350 351 to the queues.
351 352 """
353
354 engine_state_file = Unicode()
355
352 356 # internal data structures:
353 357 ids=Set() # engine IDs
354 358 keytable=Dict()
@@ -561,11 +565,11 b' class Hub(SessionFactory):'
561 565 triggers unregistration"""
562 566 self.log.debug("heartbeat::handle_heart_failure(%r)", heart)
563 567 eid = self.hearts.get(heart, None)
564 queue = self.engines[eid].queue
568 uuid = self.engines[eid].uuid
565 569 if eid is None or self.keytable[eid] in self.dead_engines:
566 570 self.log.info("heartbeat::ignoring heart failure %r (not an engine or already dead)", heart)
567 571 else:
568 self.unregister_engine(heart, dict(content=dict(id=eid, queue=queue)))
572 self.unregister_engine(heart, dict(content=dict(id=eid, queue=uuid)))
569 573
570 574 #----------------------- MUX Queue Traffic ------------------------------
571 575
@@ -873,7 +877,7 b' class Hub(SessionFactory):'
873 877 jsonable = {}
874 878 for k,v in self.keytable.iteritems():
875 879 if v not in self.dead_engines:
876 jsonable[str(k)] = v.decode('ascii')
880 jsonable[str(k)] = v
877 881 content['engines'] = jsonable
878 882 self.session.send(self.query, 'connection_reply', content, parent=msg, ident=client_id)
879 883
@@ -881,47 +885,37 b' class Hub(SessionFactory):'
881 885 """Register a new engine."""
882 886 content = msg['content']
883 887 try:
884 queue = cast_bytes(content['queue'])
888 uuid = content['uuid']
885 889 except KeyError:
886 890 self.log.error("registration::queue not specified", exc_info=True)
887 891 return
888 heart = content.get('heartbeat', None)
889 if heart:
890 heart = cast_bytes(heart)
891 """register a new engine, and create the socket(s) necessary"""
892
892 893 eid = self._next_id
893 # print (eid, queue, reg, heart)
894 894
895 self.log.debug("registration::register_engine(%i, %r, %r, %r)", eid, queue, reg, heart)
895 self.log.debug("registration::register_engine(%i, %r)", eid, uuid)
896 896
897 897 content = dict(id=eid,status='ok')
898 898 # check if requesting available IDs:
899 if queue in self.by_ident:
899 if uuid in self.by_ident:
900 900 try:
901 raise KeyError("queue_id %r in use" % queue)
901 raise KeyError("uuid %r in use" % uuid)
902 902 except:
903 903 content = error.wrap_exception()
904 self.log.error("queue_id %r in use", queue, exc_info=True)
905 elif heart in self.hearts: # need to check unique hearts?
906 try:
907 raise KeyError("heart_id %r in use" % heart)
908 except:
909 self.log.error("heart_id %r in use", heart, exc_info=True)
910 content = error.wrap_exception()
904 self.log.error("uuid %r in use", uuid, exc_info=True)
911 905 else:
912 for h, pack in self.incoming_registrations.iteritems():
913 if heart == h:
906 for h, ec in self.incoming_registrations.iteritems():
907 if uuid == h:
914 908 try:
915 raise KeyError("heart_id %r in use" % heart)
909 raise KeyError("heart_id %r in use" % uuid)
916 910 except:
917 self.log.error("heart_id %r in use", heart, exc_info=True)
911 self.log.error("heart_id %r in use", uuid, exc_info=True)
918 912 content = error.wrap_exception()
919 913 break
920 elif queue == pack[1]:
914 elif uuid == ec.uuid:
921 915 try:
922 raise KeyError("queue_id %r in use" % queue)
916 raise KeyError("uuid %r in use" % uuid)
923 917 except:
924 self.log.error("queue_id %r in use", queue, exc_info=True)
918 self.log.error("uuid %r in use", uuid, exc_info=True)
925 919 content = error.wrap_exception()
926 920 break
927 921
@@ -929,18 +923,21 b' class Hub(SessionFactory):'
929 923 content=content,
930 924 ident=reg)
931 925
926 heart = util.asbytes(uuid)
927
932 928 if content['status'] == 'ok':
933 929 if heart in self.heartmonitor.hearts:
934 930 # already beating
935 self.incoming_registrations[heart] = (eid,queue,reg[0],None)
931 self.incoming_registrations[heart] = EngineConnector(id=eid,uuid=uuid)
936 932 self.finish_registration(heart)
937 933 else:
938 934 purge = lambda : self._purge_stalled_registration(heart)
939 935 dc = ioloop.DelayedCallback(purge, self.registration_timeout, self.loop)
940 936 dc.start()
941 self.incoming_registrations[heart] = (eid,queue,reg[0],dc)
937 self.incoming_registrations[heart] = EngineConnector(id=eid,uuid=uuid,stallback=dc)
942 938 else:
943 939 self.log.error("registration::registration %i failed: %r", eid, content['evalue'])
940
944 941 return eid
945 942
946 943 def unregister_engine(self, ident, msg):
@@ -953,7 +950,7 b' class Hub(SessionFactory):'
953 950 self.log.info("registration::unregister_engine(%r)", eid)
954 951 # print (eid)
955 952 uuid = self.keytable[eid]
956 content=dict(id=eid, queue=uuid.decode('ascii'))
953 content=dict(id=eid, uuid=uuid)
957 954 self.dead_engines.add(uuid)
958 955 # self.ids.remove(eid)
959 956 # uuid = self.keytable.pop(eid)
@@ -967,6 +964,8 b' class Hub(SessionFactory):'
967 964 dc.start()
968 965 ############## TODO: HANDLE IT ################
969 966
967 self._save_engine_state()
968
970 969 if self.notifier:
971 970 self.session.send(self.notifier, "unregistration_notification", content=content)
972 971
@@ -1004,36 +1003,97 b' class Hub(SessionFactory):'
1004 1003 """Second half of engine registration, called after our HeartMonitor
1005 1004 has received a beat from the Engine's Heart."""
1006 1005 try:
1007 (eid,queue,reg,purge) = self.incoming_registrations.pop(heart)
1006 ec = self.incoming_registrations.pop(heart)
1008 1007 except KeyError:
1009 1008 self.log.error("registration::tried to finish nonexistant registration", exc_info=True)
1010 1009 return
1011 self.log.info("registration::finished registering engine %i:%r", eid, queue)
1012 if purge is not None:
1013 purge.stop()
1014 control = queue
1010 self.log.info("registration::finished registering engine %i:%s", ec.id, ec.uuid)
1011 if ec.stallback is not None:
1012 ec.stallback.stop()
1013 eid = ec.id
1015 1014 self.ids.add(eid)
1016 self.keytable[eid] = queue
1017 self.engines[eid] = EngineConnector(id=eid, queue=queue, registration=reg,
1018 control=control, heartbeat=heart)
1019 self.by_ident[queue] = eid
1015 self.keytable[eid] = ec.uuid
1016 self.engines[eid] = ec
1017 self.by_ident[ec.uuid] = ec.id
1020 1018 self.queues[eid] = list()
1021 1019 self.tasks[eid] = list()
1022 1020 self.completed[eid] = list()
1023 1021 self.hearts[heart] = eid
1024 content = dict(id=eid, queue=self.engines[eid].queue.decode('ascii'))
1022 content = dict(id=eid, uuid=self.engines[eid].uuid)
1025 1023 if self.notifier:
1026 1024 self.session.send(self.notifier, "registration_notification", content=content)
1027 1025 self.log.info("engine::Engine Connected: %i", eid)
1028 1026
1027 self._save_engine_state()
1028
1029 1029 def _purge_stalled_registration(self, heart):
1030 1030 if heart in self.incoming_registrations:
1031 eid = self.incoming_registrations.pop(heart)[0]
1032 self.log.info("registration::purging stalled registration: %i", eid)
1031 ec = self.incoming_registrations.pop(heart)
1032 self.log.info("registration::purging stalled registration: %i", ec.id)
1033 1033 else:
1034 1034 pass
1035 1035
1036 1036 #-------------------------------------------------------------------------
1037 # Engine State
1038 #-------------------------------------------------------------------------
1039
1040
1041 def _cleanup_engine_state_file(self):
1042 """cleanup engine state mapping"""
1043
1044 if os.path.exists(self.engine_state_file):
1045 self.log.debug("cleaning up engine state: %s", self.engine_state_file)
1046 try:
1047 os.remove(self.engine_state_file)
1048 except IOError:
1049 self.log.error("Couldn't cleanup file: %s", self.engine_state_file, exc_info=True)
1050
1051
1052 def _save_engine_state(self):
1053 """save engine mapping to JSON file"""
1054 if not self.engine_state_file:
1055 return
1056 self.log.debug("save engine state to %s" % self.engine_state_file)
1057 state = {}
1058 engines = {}
1059 for eid, ec in self.engines.iteritems():
1060 if ec.uuid not in self.dead_engines:
1061 engines[eid] = ec.uuid
1062
1063 state['engines'] = engines
1064
1065 state['next_id'] = self._idcounter
1066
1067 with open(self.engine_state_file, 'w') as f:
1068 json.dump(state, f)
1069
1070
1071 def _load_engine_state(self):
1072 """load engine mapping from JSON file"""
1073 if not os.path.exists(self.engine_state_file):
1074 return
1075
1076 self.log.info("loading engine state from %s" % self.engine_state_file)
1077
1078 with open(self.engine_state_file) as f:
1079 state = json.load(f)
1080
1081 save_notifier = self.notifier
1082 self.notifier = None
1083 for eid, uuid in state['engines'].iteritems():
1084 heart = uuid.encode('ascii')
1085 # start with this heart as current and beating:
1086 self.heartmonitor.responses.add(heart)
1087 self.heartmonitor.hearts.add(heart)
1088
1089 self.incoming_registrations[heart] = EngineConnector(id=int(eid), uuid=uuid)
1090 self.finish_registration(heart)
1091
1092 self.notifier = save_notifier
1093
1094 self._idcounter = state['next_id']
1095
1096 #-------------------------------------------------------------------------
1037 1097 # Client Requests
1038 1098 #-------------------------------------------------------------------------
1039 1099
@@ -1134,7 +1194,7 b' class Hub(SessionFactory):'
1134 1194 except:
1135 1195 reply = error.wrap_exception()
1136 1196 break
1137 uid = self.engines[eid].queue
1197 uid = self.engines[eid].uuid
1138 1198 try:
1139 1199 self.db.drop_matching_records(dict(engine_uuid=uid, completed={'$ne':None}))
1140 1200 except Exception:
@@ -189,6 +189,7 b' class TaskScheduler(SessionFactory):'
189 189 engine_stream = Instance(zmqstream.ZMQStream) # engine-facing stream
190 190 notifier_stream = Instance(zmqstream.ZMQStream) # hub-facing sub stream
191 191 mon_stream = Instance(zmqstream.ZMQStream) # hub-facing pub stream
192 query_stream = Instance(zmqstream.ZMQStream) # hub-facing DEALER stream
192 193
193 194 # internals:
194 195 graph = Dict() # dict by msg_id of [ msg_ids that depend on key ]
@@ -216,6 +217,9 b' class TaskScheduler(SessionFactory):'
216 217 return self.session.bsession
217 218
218 219 def start(self):
220 self.query_stream.on_recv(self.dispatch_query_reply)
221 self.session.send(self.query_stream, "connection_request", {})
222
219 223 self.engine_stream.on_recv(self.dispatch_result, copy=False)
220 224 self.client_stream.on_recv(self.dispatch_submission, copy=False)
221 225
@@ -242,6 +246,24 b' class TaskScheduler(SessionFactory):'
242 246 #-----------------------------------------------------------------------
243 247
244 248
249 def dispatch_query_reply(self, msg):
250 """handle reply to our initial connection request"""
251 try:
252 idents,msg = self.session.feed_identities(msg)
253 except ValueError:
254 self.log.warn("task::Invalid Message: %r",msg)
255 return
256 try:
257 msg = self.session.unserialize(msg)
258 except ValueError:
259 self.log.warn("task::Unauthorized message from: %r"%idents)
260 return
261
262 content = msg['content']
263 for uuid in content.get('engines', {}).values():
264 self._register_engine(asbytes(uuid))
265
266
245 267 @util.log_errors
246 268 def dispatch_notification(self, msg):
247 269 """dispatch register/unregister events."""
@@ -263,7 +285,7 b' class TaskScheduler(SessionFactory):'
263 285 self.log.error("Unhandled message type: %r"%msg_type)
264 286 else:
265 287 try:
266 handler(cast_bytes(msg['content']['queue']))
288 handler(cast_bytes(msg['content']['uuid']))
267 289 except Exception:
268 290 self.log.error("task::Invalid notification msg: %r", msg, exc_info=True)
269 291
@@ -714,7 +736,7 b' class TaskScheduler(SessionFactory):'
714 736
715 737
716 738
717 def launch_scheduler(in_addr, out_addr, mon_addr, not_addr, config=None,
739 def launch_scheduler(in_addr, out_addr, mon_addr, not_addr, reg_addr, config=None,
718 740 logname='root', log_url=None, loglevel=logging.DEBUG,
719 741 identity=b'task', in_thread=False):
720 742
@@ -734,11 +756,11 b' def launch_scheduler(in_addr, out_addr, mon_addr, not_addr, config=None,'
734 756 ctx = zmq.Context()
735 757 loop = ioloop.IOLoop()
736 758 ins = ZMQStream(ctx.socket(zmq.ROUTER),loop)
737 ins.setsockopt(zmq.IDENTITY, identity)
759 ins.setsockopt(zmq.IDENTITY, identity+'_in')
738 760 ins.bind(in_addr)
739 761
740 762 outs = ZMQStream(ctx.socket(zmq.ROUTER),loop)
741 outs.setsockopt(zmq.IDENTITY, identity)
763 outs.setsockopt(zmq.IDENTITY, identity+'_out')
742 764 outs.bind(out_addr)
743 765 mons = zmqstream.ZMQStream(ctx.socket(zmq.PUB),loop)
744 766 mons.connect(mon_addr)
@@ -746,6 +768,9 b' def launch_scheduler(in_addr, out_addr, mon_addr, not_addr, config=None,'
746 768 nots.setsockopt(zmq.SUBSCRIBE, b'')
747 769 nots.connect(not_addr)
748 770
771 querys = ZMQStream(ctx.socket(zmq.DEALER),loop)
772 querys.connect(reg_addr)
773
749 774 # setup logging.
750 775 if in_thread:
751 776 log = Application.instance().log
@@ -757,6 +782,7 b' def launch_scheduler(in_addr, out_addr, mon_addr, not_addr, config=None,'
757 782
758 783 scheduler = TaskScheduler(client_stream=ins, engine_stream=outs,
759 784 mon_stream=mons, notifier_stream=nots,
785 query_stream=querys,
760 786 loop=loop, log=log,
761 787 config=config)
762 788 scheduler.start()
@@ -129,7 +129,7 b' class EngineFactory(RegistrationFactory):'
129 129 self.registrar = zmqstream.ZMQStream(reg, self.loop)
130 130
131 131
132 content = dict(queue=self.ident, heartbeat=self.ident, control=self.ident)
132 content = dict(uuid=self.ident)
133 133 self.registrar.on_recv(lambda msg: self.complete_registration(msg, connect, maybe_tunnel))
134 134 # print (self.session.key)
135 135 self.session.send(self.registrar, "registration_request", content=content)
@@ -43,9 +43,7 b' monitor the survival of the Engine process.'
43 43 Message type: ``registration_request``::
44 44
45 45 content = {
46 'queue' : 'abcd-1234-...', # the MUX queue zmq.IDENTITY
47 'control' : 'abcd-1234-...', # the control queue zmq.IDENTITY
48 'heartbeat' : 'abcd-1234-...' # the heartbeat zmq.IDENTITY
46 'uuid' : 'abcd-1234-...', # the zmq.IDENTITY of the engine's sockets
49 47 }
50 48
51 49 .. note::
@@ -63,10 +61,6 b' Message type: ``registration_reply``::'
63 61 'status' : 'ok', # or 'error'
64 62 # if ok:
65 63 'id' : 0, # int, the engine id
66 'queue' : 'tcp://127.0.0.1:12345', # connection for engine side of the queue
67 'control' : 'tcp://...', # addr for control queue
68 'heartbeat' : ('tcp://...','tcp://...'), # tuple containing two interfaces needed for heartbeat
69 'task' : 'tcp://...', # addr for task queue, or None if no task queue running
70 64 }
71 65
72 66 Clients use the same socket as engines to start their connections. Connection requests
@@ -84,11 +78,6 b' Message type: ``connection_reply``::'
84 78
85 79 content = {
86 80 'status' : 'ok', # or 'error'
87 # if ok:
88 'queue' : 'tcp://127.0.0.1:12345', # connection for client side of the MUX queue
89 'task' : ('lru','tcp...'), # routing scheme and addr for task queue (len 2 tuple)
90 'query' : 'tcp...', # addr for methods to query the hub, like queue_request, etc.
91 'control' : 'tcp...', # addr for control methods, like abort, etc.
92 81 }
93 82
94 83 Heartbeat
@@ -110,13 +99,14 b' Message type: ``registration_notification``::'
110 99
111 100 content = {
112 101 'id' : 0, # engine ID that has been registered
113 'queue' : 'engine_id' # the IDENT for the engine's queue
102 'uuid' : 'engine_id' # the IDENT for the engine's sockets
114 103 }
115 104
116 105 Message type : ``unregistration_notification``::
117 106
118 107 content = {
119 108 'id' : 0 # engine ID that has been unregistered
109 'uuid' : 'engine_id' # the IDENT for the engine's sockets
120 110 }
121 111
122 112
General Comments 0
You need to be logged in to leave comments. Login now