##// END OF EJS Templates
enables resume of ipcontroller...
MinRK -
Show More
@@ -116,7 +116,10 b' flags.update({'
116 select one of the true db backends.
116 select one of the true db backends.
117 """),
117 """),
118 'reuse' : ({'IPControllerApp' : {'reuse_files' : True}},
118 'reuse' : ({'IPControllerApp' : {'reuse_files' : True}},
119 'reuse existing json connection files')
119 'reuse existing json connection files'),
120 'restore' : ({'IPControllerApp' : {'restore_engines' : True, 'reuse_files' : True}},
121 'Attempt to restore engines from a JSON file. '
122 'For use when resuming a crashed controller'),
120 })
123 })
121
124
122 flags.update(session_flags)
125 flags.update(session_flags)
@@ -156,6 +159,10 b' class IPControllerApp(BaseParallelApplication):'
156 If False, connection files will be removed on a clean exit.
159 If False, connection files will be removed on a clean exit.
157 """
160 """
158 )
161 )
162 restore_engines = Bool(False, config=True,
163 help="""Reload engine state from JSON file
164 """
165 )
159 ssh_server = Unicode(u'', config=True,
166 ssh_server = Unicode(u'', config=True,
160 help="""ssh url for clients to use when connecting to the Controller
167 help="""ssh url for clients to use when connecting to the Controller
161 processes. It should be of the form: [user@]server[:port]. The
168 processes. It should be of the form: [user@]server[:port]. The
@@ -343,17 +350,24 b' class IPControllerApp(BaseParallelApplication):'
343 edict.update(base)
350 edict.update(base)
344 self.save_connection_dict(self.engine_json_file, edict)
351 self.save_connection_dict(self.engine_json_file, edict)
345
352
353 fname = "engines%s.json" % self.cluster_id
354 self.factory.hub.engine_state_file = os.path.join(self.profile_dir.log_dir, fname)
355 if self.restore_engines:
356 self.factory.hub._load_engine_state()
357
346 def init_schedulers(self):
358 def init_schedulers(self):
347 children = self.children
359 children = self.children
348 mq = import_item(str(self.mq_class))
360 mq = import_item(str(self.mq_class))
349
361
350 f = self.factory
362 f = self.factory
363 ident = f.session.bsession
351 # disambiguate url, in case of *
364 # disambiguate url, in case of *
352 monitor_url = disambiguate_url(f.monitor_url)
365 monitor_url = disambiguate_url(f.monitor_url)
353 # maybe_inproc = 'inproc://monitor' if self.use_threads else monitor_url
366 # maybe_inproc = 'inproc://monitor' if self.use_threads else monitor_url
354 # IOPub relay (in a Process)
367 # IOPub relay (in a Process)
355 q = mq(zmq.PUB, zmq.SUB, zmq.PUB, b'N/A',b'iopub')
368 q = mq(zmq.PUB, zmq.SUB, zmq.PUB, b'N/A',b'iopub')
356 q.bind_in(f.client_url('iopub'))
369 q.bind_in(f.client_url('iopub'))
370 q.setsockopt_in(zmq.IDENTITY, ident+"_iopub")
357 q.bind_out(f.engine_url('iopub'))
371 q.bind_out(f.engine_url('iopub'))
358 q.setsockopt_out(zmq.SUBSCRIBE, b'')
372 q.setsockopt_out(zmq.SUBSCRIBE, b'')
359 q.connect_mon(monitor_url)
373 q.connect_mon(monitor_url)
@@ -363,8 +377,9 b' class IPControllerApp(BaseParallelApplication):'
363 # Multiplexer Queue (in a Process)
377 # Multiplexer Queue (in a Process)
364 q = mq(zmq.ROUTER, zmq.ROUTER, zmq.PUB, b'in', b'out')
378 q = mq(zmq.ROUTER, zmq.ROUTER, zmq.PUB, b'in', b'out')
365 q.bind_in(f.client_url('mux'))
379 q.bind_in(f.client_url('mux'))
366 q.setsockopt_in(zmq.IDENTITY, b'mux')
380 q.setsockopt_in(zmq.IDENTITY, b'mux_in')
367 q.bind_out(f.engine_url('mux'))
381 q.bind_out(f.engine_url('mux'))
382 q.setsockopt_out(zmq.IDENTITY, b'mux_out')
368 q.connect_mon(monitor_url)
383 q.connect_mon(monitor_url)
369 q.daemon=True
384 q.daemon=True
370 children.append(q)
385 children.append(q)
@@ -372,8 +387,9 b' class IPControllerApp(BaseParallelApplication):'
372 # Control Queue (in a Process)
387 # Control Queue (in a Process)
373 q = mq(zmq.ROUTER, zmq.ROUTER, zmq.PUB, b'incontrol', b'outcontrol')
388 q = mq(zmq.ROUTER, zmq.ROUTER, zmq.PUB, b'incontrol', b'outcontrol')
374 q.bind_in(f.client_url('control'))
389 q.bind_in(f.client_url('control'))
375 q.setsockopt_in(zmq.IDENTITY, b'control')
390 q.setsockopt_in(zmq.IDENTITY, b'control_in')
376 q.bind_out(f.engine_url('control'))
391 q.bind_out(f.engine_url('control'))
392 q.setsockopt_out(zmq.IDENTITY, b'control_out')
377 q.connect_mon(monitor_url)
393 q.connect_mon(monitor_url)
378 q.daemon=True
394 q.daemon=True
379 children.append(q)
395 children.append(q)
@@ -387,8 +403,9 b' class IPControllerApp(BaseParallelApplication):'
387 q = mq(zmq.ROUTER, zmq.DEALER, zmq.PUB, b'intask', b'outtask')
403 q = mq(zmq.ROUTER, zmq.DEALER, zmq.PUB, b'intask', b'outtask')
388 # q.setsockopt_out(zmq.HWM, hub.hwm)
404 # q.setsockopt_out(zmq.HWM, hub.hwm)
389 q.bind_in(f.client_url('task'))
405 q.bind_in(f.client_url('task'))
390 q.setsockopt_in(zmq.IDENTITY, b'task')
406 q.setsockopt_in(zmq.IDENTITY, b'task_in')
391 q.bind_out(f.engine_url('task'))
407 q.bind_out(f.engine_url('task'))
408 q.setsockopt_out(zmq.IDENTITY, b'task_out')
392 q.connect_mon(monitor_url)
409 q.connect_mon(monitor_url)
393 q.daemon=True
410 q.daemon=True
394 children.append(q)
411 children.append(q)
@@ -398,7 +415,9 b' class IPControllerApp(BaseParallelApplication):'
398 else:
415 else:
399 self.log.info("task::using Python %s Task scheduler"%scheme)
416 self.log.info("task::using Python %s Task scheduler"%scheme)
400 sargs = (f.client_url('task'), f.engine_url('task'),
417 sargs = (f.client_url('task'), f.engine_url('task'),
401 monitor_url, disambiguate_url(f.client_url('notification')))
418 monitor_url, disambiguate_url(f.client_url('notification')),
419 disambiguate_url(f.client_url('registration')),
420 )
402 kwargs = dict(logname='scheduler', loglevel=self.log_level,
421 kwargs = dict(logname='scheduler', loglevel=self.log_level,
403 log_url = self.log_url, config=dict(self.config))
422 log_url = self.log_url, config=dict(self.config))
404 if 'Process' in self.mq_class:
423 if 'Process' in self.mq_class:
@@ -508,8 +508,9 b' class Client(HasTraits):'
508 """Update our engines dict and _ids from a dict of the form: {id:uuid}."""
508 """Update our engines dict and _ids from a dict of the form: {id:uuid}."""
509 for k,v in engines.iteritems():
509 for k,v in engines.iteritems():
510 eid = int(k)
510 eid = int(k)
511 if eid not in self._engines:
512 self._ids.append(eid)
511 self._engines[eid] = v
513 self._engines[eid] = v
512 self._ids.append(eid)
513 self._ids = sorted(self._ids)
514 self._ids = sorted(self._ids)
514 if sorted(self._engines.keys()) != range(len(self._engines)) and \
515 if sorted(self._engines.keys()) != range(len(self._engines)) and \
515 self._task_scheme == 'pure' and self._task_socket:
516 self._task_scheme == 'pure' and self._task_socket:
@@ -652,7 +653,7 b' class Client(HasTraits):'
652 """Register a new engine, and update our connection info."""
653 """Register a new engine, and update our connection info."""
653 content = msg['content']
654 content = msg['content']
654 eid = content['id']
655 eid = content['id']
655 d = {eid : content['queue']}
656 d = {eid : content['uuid']}
656 self._update_engines(d)
657 self._update_engines(d)
657
658
658 def _unregister_engine(self, msg):
659 def _unregister_engine(self, msg):
@@ -18,6 +18,8 b' Authors:'
18 #-----------------------------------------------------------------------------
18 #-----------------------------------------------------------------------------
19 from __future__ import print_function
19 from __future__ import print_function
20
20
21 import json
22 import os
21 import sys
23 import sys
22 import time
24 import time
23 from datetime import datetime
25 from datetime import datetime
@@ -107,17 +109,16 b' class EngineConnector(HasTraits):'
107 """A simple object for accessing the various zmq connections of an object.
109 """A simple object for accessing the various zmq connections of an object.
108 Attributes are:
110 Attributes are:
109 id (int): engine ID
111 id (int): engine ID
110 uuid (str): uuid (unused?)
112 uuid (unicode): engine UUID
111 queue (str): identity of queue's DEALER socket
113 pending: set of msg_ids
112 registration (str): identity of registration DEALER socket
114 stallback: DelayedCallback for stalled registration
113 heartbeat (str): identity of heartbeat DEALER socket
114 """
115 """
115 id=Integer(0)
116
116 queue=CBytes()
117 id = Integer(0)
117 control=CBytes()
118 uuid = Unicode()
118 registration=CBytes()
119 pending = Set()
119 heartbeat=CBytes()
120 stallback = Instance(ioloop.DelayedCallback)
120 pending=Set()
121
121
122
122 _db_shortcuts = {
123 _db_shortcuts = {
123 'sqlitedb' : 'IPython.parallel.controller.sqlitedb.SQLiteDB',
124 'sqlitedb' : 'IPython.parallel.controller.sqlitedb.SQLiteDB',
@@ -349,6 +350,9 b' class Hub(SessionFactory):'
349 client_info: dict of zmq connection information for engines to connect
350 client_info: dict of zmq connection information for engines to connect
350 to the queues.
351 to the queues.
351 """
352 """
353
354 engine_state_file = Unicode()
355
352 # internal data structures:
356 # internal data structures:
353 ids=Set() # engine IDs
357 ids=Set() # engine IDs
354 keytable=Dict()
358 keytable=Dict()
@@ -430,7 +434,7 b' class Hub(SessionFactory):'
430 self.resubmit.on_recv(lambda msg: None, copy=False)
434 self.resubmit.on_recv(lambda msg: None, copy=False)
431
435
432 self.log.info("hub::created hub")
436 self.log.info("hub::created hub")
433
437
434 @property
438 @property
435 def _next_id(self):
439 def _next_id(self):
436 """generate a new ID.
440 """generate a new ID.
@@ -445,7 +449,7 b' class Hub(SessionFactory):'
445 # while newid in self.ids or newid in incoming:
449 # while newid in self.ids or newid in incoming:
446 # newid += 1
450 # newid += 1
447 # return newid
451 # return newid
448
452
449 #-----------------------------------------------------------------------------
453 #-----------------------------------------------------------------------------
450 # message validation
454 # message validation
451 #-----------------------------------------------------------------------------
455 #-----------------------------------------------------------------------------
@@ -561,11 +565,11 b' class Hub(SessionFactory):'
561 triggers unregistration"""
565 triggers unregistration"""
562 self.log.debug("heartbeat::handle_heart_failure(%r)", heart)
566 self.log.debug("heartbeat::handle_heart_failure(%r)", heart)
563 eid = self.hearts.get(heart, None)
567 eid = self.hearts.get(heart, None)
564 queue = self.engines[eid].queue
568 uuid = self.engines[eid].uuid
565 if eid is None or self.keytable[eid] in self.dead_engines:
569 if eid is None or self.keytable[eid] in self.dead_engines:
566 self.log.info("heartbeat::ignoring heart failure %r (not an engine or already dead)", heart)
570 self.log.info("heartbeat::ignoring heart failure %r (not an engine or already dead)", heart)
567 else:
571 else:
568 self.unregister_engine(heart, dict(content=dict(id=eid, queue=queue)))
572 self.unregister_engine(heart, dict(content=dict(id=eid, queue=uuid)))
569
573
570 #----------------------- MUX Queue Traffic ------------------------------
574 #----------------------- MUX Queue Traffic ------------------------------
571
575
@@ -873,7 +877,7 b' class Hub(SessionFactory):'
873 jsonable = {}
877 jsonable = {}
874 for k,v in self.keytable.iteritems():
878 for k,v in self.keytable.iteritems():
875 if v not in self.dead_engines:
879 if v not in self.dead_engines:
876 jsonable[str(k)] = v.decode('ascii')
880 jsonable[str(k)] = v
877 content['engines'] = jsonable
881 content['engines'] = jsonable
878 self.session.send(self.query, 'connection_reply', content, parent=msg, ident=client_id)
882 self.session.send(self.query, 'connection_reply', content, parent=msg, ident=client_id)
879
883
@@ -881,47 +885,37 b' class Hub(SessionFactory):'
881 """Register a new engine."""
885 """Register a new engine."""
882 content = msg['content']
886 content = msg['content']
883 try:
887 try:
884 queue = cast_bytes(content['queue'])
888 uuid = content['uuid']
885 except KeyError:
889 except KeyError:
886 self.log.error("registration::queue not specified", exc_info=True)
890 self.log.error("registration::queue not specified", exc_info=True)
887 return
891 return
888 heart = content.get('heartbeat', None)
892
889 if heart:
890 heart = cast_bytes(heart)
891 """register a new engine, and create the socket(s) necessary"""
892 eid = self._next_id
893 eid = self._next_id
893 # print (eid, queue, reg, heart)
894
894
895 self.log.debug("registration::register_engine(%i, %r, %r, %r)", eid, queue, reg, heart)
895 self.log.debug("registration::register_engine(%i, %r)", eid, uuid)
896
896
897 content = dict(id=eid,status='ok')
897 content = dict(id=eid,status='ok')
898 # check if requesting available IDs:
898 # check if requesting available IDs:
899 if queue in self.by_ident:
899 if uuid in self.by_ident:
900 try:
900 try:
901 raise KeyError("queue_id %r in use" % queue)
901 raise KeyError("uuid %r in use" % uuid)
902 except:
902 except:
903 content = error.wrap_exception()
903 content = error.wrap_exception()
904 self.log.error("queue_id %r in use", queue, exc_info=True)
904 self.log.error("uuid %r in use", uuid, exc_info=True)
905 elif heart in self.hearts: # need to check unique hearts?
906 try:
907 raise KeyError("heart_id %r in use" % heart)
908 except:
909 self.log.error("heart_id %r in use", heart, exc_info=True)
910 content = error.wrap_exception()
911 else:
905 else:
912 for h, pack in self.incoming_registrations.iteritems():
906 for h, ec in self.incoming_registrations.iteritems():
913 if heart == h:
907 if uuid == h:
914 try:
908 try:
915 raise KeyError("heart_id %r in use" % heart)
909 raise KeyError("heart_id %r in use" % uuid)
916 except:
910 except:
917 self.log.error("heart_id %r in use", heart, exc_info=True)
911 self.log.error("heart_id %r in use", uuid, exc_info=True)
918 content = error.wrap_exception()
912 content = error.wrap_exception()
919 break
913 break
920 elif queue == pack[1]:
914 elif uuid == ec.uuid:
921 try:
915 try:
922 raise KeyError("queue_id %r in use" % queue)
916 raise KeyError("uuid %r in use" % uuid)
923 except:
917 except:
924 self.log.error("queue_id %r in use", queue, exc_info=True)
918 self.log.error("uuid %r in use", uuid, exc_info=True)
925 content = error.wrap_exception()
919 content = error.wrap_exception()
926 break
920 break
927
921
@@ -929,18 +923,21 b' class Hub(SessionFactory):'
929 content=content,
923 content=content,
930 ident=reg)
924 ident=reg)
931
925
926 heart = util.asbytes(uuid)
927
932 if content['status'] == 'ok':
928 if content['status'] == 'ok':
933 if heart in self.heartmonitor.hearts:
929 if heart in self.heartmonitor.hearts:
934 # already beating
930 # already beating
935 self.incoming_registrations[heart] = (eid,queue,reg[0],None)
931 self.incoming_registrations[heart] = EngineConnector(id=eid,uuid=uuid)
936 self.finish_registration(heart)
932 self.finish_registration(heart)
937 else:
933 else:
938 purge = lambda : self._purge_stalled_registration(heart)
934 purge = lambda : self._purge_stalled_registration(heart)
939 dc = ioloop.DelayedCallback(purge, self.registration_timeout, self.loop)
935 dc = ioloop.DelayedCallback(purge, self.registration_timeout, self.loop)
940 dc.start()
936 dc.start()
941 self.incoming_registrations[heart] = (eid,queue,reg[0],dc)
937 self.incoming_registrations[heart] = EngineConnector(id=eid,uuid=uuid,stallback=dc)
942 else:
938 else:
943 self.log.error("registration::registration %i failed: %r", eid, content['evalue'])
939 self.log.error("registration::registration %i failed: %r", eid, content['evalue'])
940
944 return eid
941 return eid
945
942
946 def unregister_engine(self, ident, msg):
943 def unregister_engine(self, ident, msg):
@@ -953,7 +950,7 b' class Hub(SessionFactory):'
953 self.log.info("registration::unregister_engine(%r)", eid)
950 self.log.info("registration::unregister_engine(%r)", eid)
954 # print (eid)
951 # print (eid)
955 uuid = self.keytable[eid]
952 uuid = self.keytable[eid]
956 content=dict(id=eid, queue=uuid.decode('ascii'))
953 content=dict(id=eid, uuid=uuid)
957 self.dead_engines.add(uuid)
954 self.dead_engines.add(uuid)
958 # self.ids.remove(eid)
955 # self.ids.remove(eid)
959 # uuid = self.keytable.pop(eid)
956 # uuid = self.keytable.pop(eid)
@@ -966,6 +963,8 b' class Hub(SessionFactory):'
966 dc = ioloop.DelayedCallback(handleit, self.registration_timeout, self.loop)
963 dc = ioloop.DelayedCallback(handleit, self.registration_timeout, self.loop)
967 dc.start()
964 dc.start()
968 ############## TODO: HANDLE IT ################
965 ############## TODO: HANDLE IT ################
966
967 self._save_engine_state()
969
968
970 if self.notifier:
969 if self.notifier:
971 self.session.send(self.notifier, "unregistration_notification", content=content)
970 self.session.send(self.notifier, "unregistration_notification", content=content)
@@ -1004,36 +1003,97 b' class Hub(SessionFactory):'
1004 """Second half of engine registration, called after our HeartMonitor
1003 """Second half of engine registration, called after our HeartMonitor
1005 has received a beat from the Engine's Heart."""
1004 has received a beat from the Engine's Heart."""
1006 try:
1005 try:
1007 (eid,queue,reg,purge) = self.incoming_registrations.pop(heart)
1006 ec = self.incoming_registrations.pop(heart)
1008 except KeyError:
1007 except KeyError:
1009 self.log.error("registration::tried to finish nonexistant registration", exc_info=True)
1008 self.log.error("registration::tried to finish nonexistant registration", exc_info=True)
1010 return
1009 return
1011 self.log.info("registration::finished registering engine %i:%r", eid, queue)
1010 self.log.info("registration::finished registering engine %i:%s", ec.id, ec.uuid)
1012 if purge is not None:
1011 if ec.stallback is not None:
1013 purge.stop()
1012 ec.stallback.stop()
1014 control = queue
1013 eid = ec.id
1015 self.ids.add(eid)
1014 self.ids.add(eid)
1016 self.keytable[eid] = queue
1015 self.keytable[eid] = ec.uuid
1017 self.engines[eid] = EngineConnector(id=eid, queue=queue, registration=reg,
1016 self.engines[eid] = ec
1018 control=control, heartbeat=heart)
1017 self.by_ident[ec.uuid] = ec.id
1019 self.by_ident[queue] = eid
1020 self.queues[eid] = list()
1018 self.queues[eid] = list()
1021 self.tasks[eid] = list()
1019 self.tasks[eid] = list()
1022 self.completed[eid] = list()
1020 self.completed[eid] = list()
1023 self.hearts[heart] = eid
1021 self.hearts[heart] = eid
1024 content = dict(id=eid, queue=self.engines[eid].queue.decode('ascii'))
1022 content = dict(id=eid, uuid=self.engines[eid].uuid)
1025 if self.notifier:
1023 if self.notifier:
1026 self.session.send(self.notifier, "registration_notification", content=content)
1024 self.session.send(self.notifier, "registration_notification", content=content)
1027 self.log.info("engine::Engine Connected: %i", eid)
1025 self.log.info("engine::Engine Connected: %i", eid)
1026
1027 self._save_engine_state()
1028
1028
1029 def _purge_stalled_registration(self, heart):
1029 def _purge_stalled_registration(self, heart):
1030 if heart in self.incoming_registrations:
1030 if heart in self.incoming_registrations:
1031 eid = self.incoming_registrations.pop(heart)[0]
1031 ec = self.incoming_registrations.pop(heart)
1032 self.log.info("registration::purging stalled registration: %i", eid)
1032 self.log.info("registration::purging stalled registration: %i", ec.id)
1033 else:
1033 else:
1034 pass
1034 pass
1035
1035
1036 #-------------------------------------------------------------------------
1036 #-------------------------------------------------------------------------
1037 # Engine State
1038 #-------------------------------------------------------------------------
1039
1040
1041 def _cleanup_engine_state_file(self):
1042 """cleanup engine state mapping"""
1043
1044 if os.path.exists(self.engine_state_file):
1045 self.log.debug("cleaning up engine state: %s", self.engine_state_file)
1046 try:
1047 os.remove(self.engine_state_file)
1048 except IOError:
1049 self.log.error("Couldn't cleanup file: %s", self.engine_state_file, exc_info=True)
1050
1051
1052 def _save_engine_state(self):
1053 """save engine mapping to JSON file"""
1054 if not self.engine_state_file:
1055 return
1056 self.log.debug("save engine state to %s" % self.engine_state_file)
1057 state = {}
1058 engines = {}
1059 for eid, ec in self.engines.iteritems():
1060 if ec.uuid not in self.dead_engines:
1061 engines[eid] = ec.uuid
1062
1063 state['engines'] = engines
1064
1065 state['next_id'] = self._idcounter
1066
1067 with open(self.engine_state_file, 'w') as f:
1068 json.dump(state, f)
1069
1070
1071 def _load_engine_state(self):
1072 """load engine mapping from JSON file"""
1073 if not os.path.exists(self.engine_state_file):
1074 return
1075
1076 self.log.info("loading engine state from %s" % self.engine_state_file)
1077
1078 with open(self.engine_state_file) as f:
1079 state = json.load(f)
1080
1081 save_notifier = self.notifier
1082 self.notifier = None
1083 for eid, uuid in state['engines'].iteritems():
1084 heart = uuid.encode('ascii')
1085 # start with this heart as current and beating:
1086 self.heartmonitor.responses.add(heart)
1087 self.heartmonitor.hearts.add(heart)
1088
1089 self.incoming_registrations[heart] = EngineConnector(id=int(eid), uuid=uuid)
1090 self.finish_registration(heart)
1091
1092 self.notifier = save_notifier
1093
1094 self._idcounter = state['next_id']
1095
1096 #-------------------------------------------------------------------------
1037 # Client Requests
1097 # Client Requests
1038 #-------------------------------------------------------------------------
1098 #-------------------------------------------------------------------------
1039
1099
@@ -1134,7 +1194,7 b' class Hub(SessionFactory):'
1134 except:
1194 except:
1135 reply = error.wrap_exception()
1195 reply = error.wrap_exception()
1136 break
1196 break
1137 uid = self.engines[eid].queue
1197 uid = self.engines[eid].uuid
1138 try:
1198 try:
1139 self.db.drop_matching_records(dict(engine_uuid=uid, completed={'$ne':None}))
1199 self.db.drop_matching_records(dict(engine_uuid=uid, completed={'$ne':None}))
1140 except Exception:
1200 except Exception:
@@ -189,6 +189,7 b' class TaskScheduler(SessionFactory):'
189 engine_stream = Instance(zmqstream.ZMQStream) # engine-facing stream
189 engine_stream = Instance(zmqstream.ZMQStream) # engine-facing stream
190 notifier_stream = Instance(zmqstream.ZMQStream) # hub-facing sub stream
190 notifier_stream = Instance(zmqstream.ZMQStream) # hub-facing sub stream
191 mon_stream = Instance(zmqstream.ZMQStream) # hub-facing pub stream
191 mon_stream = Instance(zmqstream.ZMQStream) # hub-facing pub stream
192 query_stream = Instance(zmqstream.ZMQStream) # hub-facing DEALER stream
192
193
193 # internals:
194 # internals:
194 graph = Dict() # dict by msg_id of [ msg_ids that depend on key ]
195 graph = Dict() # dict by msg_id of [ msg_ids that depend on key ]
@@ -216,6 +217,9 b' class TaskScheduler(SessionFactory):'
216 return self.session.bsession
217 return self.session.bsession
217
218
218 def start(self):
219 def start(self):
220 self.query_stream.on_recv(self.dispatch_query_reply)
221 self.session.send(self.query_stream, "connection_request", {})
222
219 self.engine_stream.on_recv(self.dispatch_result, copy=False)
223 self.engine_stream.on_recv(self.dispatch_result, copy=False)
220 self.client_stream.on_recv(self.dispatch_submission, copy=False)
224 self.client_stream.on_recv(self.dispatch_submission, copy=False)
221
225
@@ -240,6 +244,24 b' class TaskScheduler(SessionFactory):'
240 #-----------------------------------------------------------------------
244 #-----------------------------------------------------------------------
241 # [Un]Registration Handling
245 # [Un]Registration Handling
242 #-----------------------------------------------------------------------
246 #-----------------------------------------------------------------------
247
248
249 def dispatch_query_reply(self, msg):
250 """handle reply to our initial connection request"""
251 try:
252 idents,msg = self.session.feed_identities(msg)
253 except ValueError:
254 self.log.warn("task::Invalid Message: %r",msg)
255 return
256 try:
257 msg = self.session.unserialize(msg)
258 except ValueError:
259 self.log.warn("task::Unauthorized message from: %r"%idents)
260 return
261
262 content = msg['content']
263 for uuid in content.get('engines', {}).values():
264 self._register_engine(asbytes(uuid))
243
265
244
266
245 @util.log_errors
267 @util.log_errors
@@ -263,7 +285,7 b' class TaskScheduler(SessionFactory):'
263 self.log.error("Unhandled message type: %r"%msg_type)
285 self.log.error("Unhandled message type: %r"%msg_type)
264 else:
286 else:
265 try:
287 try:
266 handler(cast_bytes(msg['content']['queue']))
288 handler(cast_bytes(msg['content']['uuid']))
267 except Exception:
289 except Exception:
268 self.log.error("task::Invalid notification msg: %r", msg, exc_info=True)
290 self.log.error("task::Invalid notification msg: %r", msg, exc_info=True)
269
291
@@ -714,7 +736,7 b' class TaskScheduler(SessionFactory):'
714
736
715
737
716
738
717 def launch_scheduler(in_addr, out_addr, mon_addr, not_addr, config=None,
739 def launch_scheduler(in_addr, out_addr, mon_addr, not_addr, reg_addr, config=None,
718 logname='root', log_url=None, loglevel=logging.DEBUG,
740 logname='root', log_url=None, loglevel=logging.DEBUG,
719 identity=b'task', in_thread=False):
741 identity=b'task', in_thread=False):
720
742
@@ -734,18 +756,21 b' def launch_scheduler(in_addr, out_addr, mon_addr, not_addr, config=None,'
734 ctx = zmq.Context()
756 ctx = zmq.Context()
735 loop = ioloop.IOLoop()
757 loop = ioloop.IOLoop()
736 ins = ZMQStream(ctx.socket(zmq.ROUTER),loop)
758 ins = ZMQStream(ctx.socket(zmq.ROUTER),loop)
737 ins.setsockopt(zmq.IDENTITY, identity)
759 ins.setsockopt(zmq.IDENTITY, identity+'_in')
738 ins.bind(in_addr)
760 ins.bind(in_addr)
739
761
740 outs = ZMQStream(ctx.socket(zmq.ROUTER),loop)
762 outs = ZMQStream(ctx.socket(zmq.ROUTER),loop)
741 outs.setsockopt(zmq.IDENTITY, identity)
763 outs.setsockopt(zmq.IDENTITY, identity+'_out')
742 outs.bind(out_addr)
764 outs.bind(out_addr)
743 mons = zmqstream.ZMQStream(ctx.socket(zmq.PUB),loop)
765 mons = zmqstream.ZMQStream(ctx.socket(zmq.PUB),loop)
744 mons.connect(mon_addr)
766 mons.connect(mon_addr)
745 nots = zmqstream.ZMQStream(ctx.socket(zmq.SUB),loop)
767 nots = zmqstream.ZMQStream(ctx.socket(zmq.SUB),loop)
746 nots.setsockopt(zmq.SUBSCRIBE, b'')
768 nots.setsockopt(zmq.SUBSCRIBE, b'')
747 nots.connect(not_addr)
769 nots.connect(not_addr)
748
770
771 querys = ZMQStream(ctx.socket(zmq.DEALER),loop)
772 querys.connect(reg_addr)
773
749 # setup logging.
774 # setup logging.
750 if in_thread:
775 if in_thread:
751 log = Application.instance().log
776 log = Application.instance().log
@@ -757,6 +782,7 b' def launch_scheduler(in_addr, out_addr, mon_addr, not_addr, config=None,'
757
782
758 scheduler = TaskScheduler(client_stream=ins, engine_stream=outs,
783 scheduler = TaskScheduler(client_stream=ins, engine_stream=outs,
759 mon_stream=mons, notifier_stream=nots,
784 mon_stream=mons, notifier_stream=nots,
785 query_stream=querys,
760 loop=loop, log=log,
786 loop=loop, log=log,
761 config=config)
787 config=config)
762 scheduler.start()
788 scheduler.start()
@@ -129,7 +129,7 b' class EngineFactory(RegistrationFactory):'
129 self.registrar = zmqstream.ZMQStream(reg, self.loop)
129 self.registrar = zmqstream.ZMQStream(reg, self.loop)
130
130
131
131
132 content = dict(queue=self.ident, heartbeat=self.ident, control=self.ident)
132 content = dict(uuid=self.ident)
133 self.registrar.on_recv(lambda msg: self.complete_registration(msg, connect, maybe_tunnel))
133 self.registrar.on_recv(lambda msg: self.complete_registration(msg, connect, maybe_tunnel))
134 # print (self.session.key)
134 # print (self.session.key)
135 self.session.send(self.registrar, "registration_request", content=content)
135 self.session.send(self.registrar, "registration_request", content=content)
@@ -43,9 +43,7 b' monitor the survival of the Engine process.'
43 Message type: ``registration_request``::
43 Message type: ``registration_request``::
44
44
45 content = {
45 content = {
46 'queue' : 'abcd-1234-...', # the MUX queue zmq.IDENTITY
46 'uuid' : 'abcd-1234-...', # the zmq.IDENTITY of the engine's sockets
47 'control' : 'abcd-1234-...', # the control queue zmq.IDENTITY
48 'heartbeat' : 'abcd-1234-...' # the heartbeat zmq.IDENTITY
49 }
47 }
50
48
51 .. note::
49 .. note::
@@ -63,10 +61,6 b' Message type: ``registration_reply``::'
63 'status' : 'ok', # or 'error'
61 'status' : 'ok', # or 'error'
64 # if ok:
62 # if ok:
65 'id' : 0, # int, the engine id
63 'id' : 0, # int, the engine id
66 'queue' : 'tcp://127.0.0.1:12345', # connection for engine side of the queue
67 'control' : 'tcp://...', # addr for control queue
68 'heartbeat' : ('tcp://...','tcp://...'), # tuple containing two interfaces needed for heartbeat
69 'task' : 'tcp://...', # addr for task queue, or None if no task queue running
70 }
64 }
71
65
72 Clients use the same socket as engines to start their connections. Connection requests
66 Clients use the same socket as engines to start their connections. Connection requests
@@ -84,11 +78,6 b' Message type: ``connection_reply``::'
84
78
85 content = {
79 content = {
86 'status' : 'ok', # or 'error'
80 'status' : 'ok', # or 'error'
87 # if ok:
88 'queue' : 'tcp://127.0.0.1:12345', # connection for client side of the MUX queue
89 'task' : ('lru','tcp...'), # routing scheme and addr for task queue (len 2 tuple)
90 'query' : 'tcp...', # addr for methods to query the hub, like queue_request, etc.
91 'control' : 'tcp...', # addr for control methods, like abort, etc.
92 }
81 }
93
82
94 Heartbeat
83 Heartbeat
@@ -110,13 +99,14 b' Message type: ``registration_notification``::'
110
99
111 content = {
100 content = {
112 'id' : 0, # engine ID that has been registered
101 'id' : 0, # engine ID that has been registered
113 'queue' : 'engine_id' # the IDENT for the engine's queue
102 'uuid' : 'engine_id' # the IDENT for the engine's sockets
114 }
103 }
115
104
116 Message type : ``unregistration_notification``::
105 Message type : ``unregistration_notification``::
117
106
118 content = {
107 content = {
119 'id' : 0 # engine ID that has been unregistered
108 'id' : 0 # engine ID that has been unregistered
109 'uuid' : 'engine_id' # the IDENT for the engine's sockets
120 }
110 }
121
111
122
112
General Comments 0
You need to be logged in to leave comments. Login now