Show More
@@ -116,7 +116,10 b' flags.update({' | |||
|
116 | 116 | select one of the true db backends. |
|
117 | 117 | """), |
|
118 | 118 | 'reuse' : ({'IPControllerApp' : {'reuse_files' : True}}, |
|
119 | 'reuse existing json connection files') | |
|
119 | 'reuse existing json connection files'), | |
|
120 | 'restore' : ({'IPControllerApp' : {'restore_engines' : True, 'reuse_files' : True}}, | |
|
121 | 'Attempt to restore engines from a JSON file. ' | |
|
122 | 'For use when resuming a crashed controller'), | |
|
120 | 123 | }) |
|
121 | 124 | |
|
122 | 125 | flags.update(session_flags) |
@@ -156,6 +159,10 b' class IPControllerApp(BaseParallelApplication):' | |||
|
156 | 159 | If False, connection files will be removed on a clean exit. |
|
157 | 160 | """ |
|
158 | 161 | ) |
|
162 | restore_engines = Bool(False, config=True, | |
|
163 | help="""Reload engine state from JSON file | |
|
164 | """ | |
|
165 | ) | |
|
159 | 166 | ssh_server = Unicode(u'', config=True, |
|
160 | 167 | help="""ssh url for clients to use when connecting to the Controller |
|
161 | 168 | processes. It should be of the form: [user@]server[:port]. The |
@@ -343,17 +350,24 b' class IPControllerApp(BaseParallelApplication):' | |||
|
343 | 350 | edict.update(base) |
|
344 | 351 | self.save_connection_dict(self.engine_json_file, edict) |
|
345 | 352 | |
|
353 | fname = "engines%s.json" % self.cluster_id | |
|
354 | self.factory.hub.engine_state_file = os.path.join(self.profile_dir.log_dir, fname) | |
|
355 | if self.restore_engines: | |
|
356 | self.factory.hub._load_engine_state() | |
|
357 | ||
|
346 | 358 | def init_schedulers(self): |
|
347 | 359 | children = self.children |
|
348 | 360 | mq = import_item(str(self.mq_class)) |
|
349 | 361 | |
|
350 | 362 | f = self.factory |
|
363 | ident = f.session.bsession | |
|
351 | 364 | # disambiguate url, in case of * |
|
352 | 365 | monitor_url = disambiguate_url(f.monitor_url) |
|
353 | 366 | # maybe_inproc = 'inproc://monitor' if self.use_threads else monitor_url |
|
354 | 367 | # IOPub relay (in a Process) |
|
355 | 368 | q = mq(zmq.PUB, zmq.SUB, zmq.PUB, b'N/A',b'iopub') |
|
356 | 369 | q.bind_in(f.client_url('iopub')) |
|
370 | q.setsockopt_in(zmq.IDENTITY, ident+"_iopub") | |
|
357 | 371 | q.bind_out(f.engine_url('iopub')) |
|
358 | 372 | q.setsockopt_out(zmq.SUBSCRIBE, b'') |
|
359 | 373 | q.connect_mon(monitor_url) |
@@ -363,8 +377,9 b' class IPControllerApp(BaseParallelApplication):' | |||
|
363 | 377 | # Multiplexer Queue (in a Process) |
|
364 | 378 | q = mq(zmq.ROUTER, zmq.ROUTER, zmq.PUB, b'in', b'out') |
|
365 | 379 | q.bind_in(f.client_url('mux')) |
|
366 | q.setsockopt_in(zmq.IDENTITY, b'mux') | |
|
380 | q.setsockopt_in(zmq.IDENTITY, b'mux_in') | |
|
367 | 381 | q.bind_out(f.engine_url('mux')) |
|
382 | q.setsockopt_out(zmq.IDENTITY, b'mux_out') | |
|
368 | 383 | q.connect_mon(monitor_url) |
|
369 | 384 | q.daemon=True |
|
370 | 385 | children.append(q) |
@@ -372,8 +387,9 b' class IPControllerApp(BaseParallelApplication):' | |||
|
372 | 387 | # Control Queue (in a Process) |
|
373 | 388 | q = mq(zmq.ROUTER, zmq.ROUTER, zmq.PUB, b'incontrol', b'outcontrol') |
|
374 | 389 | q.bind_in(f.client_url('control')) |
|
375 | q.setsockopt_in(zmq.IDENTITY, b'control') | |
|
390 | q.setsockopt_in(zmq.IDENTITY, b'control_in') | |
|
376 | 391 | q.bind_out(f.engine_url('control')) |
|
392 | q.setsockopt_out(zmq.IDENTITY, b'control_out') | |
|
377 | 393 | q.connect_mon(monitor_url) |
|
378 | 394 | q.daemon=True |
|
379 | 395 | children.append(q) |
@@ -387,8 +403,9 b' class IPControllerApp(BaseParallelApplication):' | |||
|
387 | 403 | q = mq(zmq.ROUTER, zmq.DEALER, zmq.PUB, b'intask', b'outtask') |
|
388 | 404 | # q.setsockopt_out(zmq.HWM, hub.hwm) |
|
389 | 405 | q.bind_in(f.client_url('task')) |
|
390 | q.setsockopt_in(zmq.IDENTITY, b'task') | |
|
406 | q.setsockopt_in(zmq.IDENTITY, b'task_in') | |
|
391 | 407 | q.bind_out(f.engine_url('task')) |
|
408 | q.setsockopt_out(zmq.IDENTITY, b'task_out') | |
|
392 | 409 | q.connect_mon(monitor_url) |
|
393 | 410 | q.daemon=True |
|
394 | 411 | children.append(q) |
@@ -398,7 +415,9 b' class IPControllerApp(BaseParallelApplication):' | |||
|
398 | 415 | else: |
|
399 | 416 | self.log.info("task::using Python %s Task scheduler"%scheme) |
|
400 | 417 | sargs = (f.client_url('task'), f.engine_url('task'), |
|
401 |
|
|
|
418 | monitor_url, disambiguate_url(f.client_url('notification')), | |
|
419 | disambiguate_url(f.client_url('registration')), | |
|
420 | ) | |
|
402 | 421 | kwargs = dict(logname='scheduler', loglevel=self.log_level, |
|
403 | 422 | log_url = self.log_url, config=dict(self.config)) |
|
404 | 423 | if 'Process' in self.mq_class: |
@@ -508,8 +508,9 b' class Client(HasTraits):' | |||
|
508 | 508 | """Update our engines dict and _ids from a dict of the form: {id:uuid}.""" |
|
509 | 509 | for k,v in engines.iteritems(): |
|
510 | 510 | eid = int(k) |
|
511 | if eid not in self._engines: | |
|
512 | self._ids.append(eid) | |
|
511 | 513 | self._engines[eid] = v |
|
512 | self._ids.append(eid) | |
|
513 | 514 | self._ids = sorted(self._ids) |
|
514 | 515 | if sorted(self._engines.keys()) != range(len(self._engines)) and \ |
|
515 | 516 | self._task_scheme == 'pure' and self._task_socket: |
@@ -652,7 +653,7 b' class Client(HasTraits):' | |||
|
652 | 653 | """Register a new engine, and update our connection info.""" |
|
653 | 654 | content = msg['content'] |
|
654 | 655 | eid = content['id'] |
|
655 |
d = {eid : content[' |
|
|
656 | d = {eid : content['uuid']} | |
|
656 | 657 | self._update_engines(d) |
|
657 | 658 | |
|
658 | 659 | def _unregister_engine(self, msg): |
@@ -18,6 +18,8 b' Authors:' | |||
|
18 | 18 | #----------------------------------------------------------------------------- |
|
19 | 19 | from __future__ import print_function |
|
20 | 20 | |
|
21 | import json | |
|
22 | import os | |
|
21 | 23 | import sys |
|
22 | 24 | import time |
|
23 | 25 | from datetime import datetime |
@@ -107,17 +109,16 b' class EngineConnector(HasTraits):' | |||
|
107 | 109 | """A simple object for accessing the various zmq connections of an object. |
|
108 | 110 | Attributes are: |
|
109 | 111 | id (int): engine ID |
|
110 | uuid (str): uuid (unused?) | |
|
111 | queue (str): identity of queue's DEALER socket | |
|
112 | registration (str): identity of registration DEALER socket | |
|
113 | heartbeat (str): identity of heartbeat DEALER socket | |
|
112 | uuid (unicode): engine UUID | |
|
113 | pending: set of msg_ids | |
|
114 | stallback: DelayedCallback for stalled registration | |
|
114 | 115 | """ |
|
115 | id=Integer(0) | |
|
116 | queue=CBytes() | |
|
117 | control=CBytes() | |
|
118 | registration=CBytes() | |
|
119 | heartbeat=CBytes() | |
|
120 | pending=Set() | |
|
116 | ||
|
117 | id = Integer(0) | |
|
118 | uuid = Unicode() | |
|
119 | pending = Set() | |
|
120 | stallback = Instance(ioloop.DelayedCallback) | |
|
121 | ||
|
121 | 122 | |
|
122 | 123 | _db_shortcuts = { |
|
123 | 124 | 'sqlitedb' : 'IPython.parallel.controller.sqlitedb.SQLiteDB', |
@@ -349,6 +350,9 b' class Hub(SessionFactory):' | |||
|
349 | 350 | client_info: dict of zmq connection information for engines to connect |
|
350 | 351 | to the queues. |
|
351 | 352 | """ |
|
353 | ||
|
354 | engine_state_file = Unicode() | |
|
355 | ||
|
352 | 356 | # internal data structures: |
|
353 | 357 | ids=Set() # engine IDs |
|
354 | 358 | keytable=Dict() |
@@ -430,7 +434,7 b' class Hub(SessionFactory):' | |||
|
430 | 434 | self.resubmit.on_recv(lambda msg: None, copy=False) |
|
431 | 435 | |
|
432 | 436 | self.log.info("hub::created hub") |
|
433 | ||
|
437 | ||
|
434 | 438 | @property |
|
435 | 439 | def _next_id(self): |
|
436 | 440 | """gemerate a new ID. |
@@ -445,7 +449,7 b' class Hub(SessionFactory):' | |||
|
445 | 449 | # while newid in self.ids or newid in incoming: |
|
446 | 450 | # newid += 1 |
|
447 | 451 | # return newid |
|
448 | ||
|
452 | ||
|
449 | 453 | #----------------------------------------------------------------------------- |
|
450 | 454 | # message validation |
|
451 | 455 | #----------------------------------------------------------------------------- |
@@ -561,11 +565,11 b' class Hub(SessionFactory):' | |||
|
561 | 565 | triggers unregistration""" |
|
562 | 566 | self.log.debug("heartbeat::handle_heart_failure(%r)", heart) |
|
563 | 567 | eid = self.hearts.get(heart, None) |
|
564 |
|
|
|
568 | uuid = self.engines[eid].uuid | |
|
565 | 569 | if eid is None or self.keytable[eid] in self.dead_engines: |
|
566 | 570 | self.log.info("heartbeat::ignoring heart failure %r (not an engine or already dead)", heart) |
|
567 | 571 | else: |
|
568 |
self.unregister_engine(heart, dict(content=dict(id=eid, queue= |
|
|
572 | self.unregister_engine(heart, dict(content=dict(id=eid, queue=uuid))) | |
|
569 | 573 | |
|
570 | 574 | #----------------------- MUX Queue Traffic ------------------------------ |
|
571 | 575 | |
@@ -873,7 +877,7 b' class Hub(SessionFactory):' | |||
|
873 | 877 | jsonable = {} |
|
874 | 878 | for k,v in self.keytable.iteritems(): |
|
875 | 879 | if v not in self.dead_engines: |
|
876 |
jsonable[str(k)] = v |
|
|
880 | jsonable[str(k)] = v | |
|
877 | 881 | content['engines'] = jsonable |
|
878 | 882 | self.session.send(self.query, 'connection_reply', content, parent=msg, ident=client_id) |
|
879 | 883 | |
@@ -881,47 +885,37 b' class Hub(SessionFactory):' | |||
|
881 | 885 | """Register a new engine.""" |
|
882 | 886 | content = msg['content'] |
|
883 | 887 | try: |
|
884 |
|
|
|
888 | uuid = content['uuid'] | |
|
885 | 889 | except KeyError: |
|
886 | 890 | self.log.error("registration::queue not specified", exc_info=True) |
|
887 | 891 | return |
|
888 | heart = content.get('heartbeat', None) | |
|
889 | if heart: | |
|
890 | heart = cast_bytes(heart) | |
|
891 | """register a new engine, and create the socket(s) necessary""" | |
|
892 | ||
|
892 | 893 | eid = self._next_id |
|
893 | # print (eid, queue, reg, heart) | |
|
894 | 894 | |
|
895 |
self.log.debug("registration::register_engine(%i, %r |
|
|
895 | self.log.debug("registration::register_engine(%i, %r)", eid, uuid) | |
|
896 | 896 | |
|
897 | 897 | content = dict(id=eid,status='ok') |
|
898 | 898 | # check if requesting available IDs: |
|
899 |
if |
|
|
899 | if uuid in self.by_ident: | |
|
900 | 900 | try: |
|
901 |
raise KeyError(" |
|
|
901 | raise KeyError("uuid %r in use" % uuid) | |
|
902 | 902 | except: |
|
903 | 903 | content = error.wrap_exception() |
|
904 |
self.log.error(" |
|
|
905 | elif heart in self.hearts: # need to check unique hearts? | |
|
906 | try: | |
|
907 | raise KeyError("heart_id %r in use" % heart) | |
|
908 | except: | |
|
909 | self.log.error("heart_id %r in use", heart, exc_info=True) | |
|
910 | content = error.wrap_exception() | |
|
904 | self.log.error("uuid %r in use", uuid, exc_info=True) | |
|
911 | 905 | else: |
|
912 |
for h, |
|
|
913 |
if |
|
|
906 | for h, ec in self.incoming_registrations.iteritems(): | |
|
907 | if uuid == h: | |
|
914 | 908 | try: |
|
915 |
raise KeyError("heart_id %r in use" % |
|
|
909 | raise KeyError("heart_id %r in use" % uuid) | |
|
916 | 910 | except: |
|
917 |
self.log.error("heart_id %r in use", |
|
|
911 | self.log.error("heart_id %r in use", uuid, exc_info=True) | |
|
918 | 912 | content = error.wrap_exception() |
|
919 | 913 | break |
|
920 |
elif |
|
|
914 | elif uuid == ec.uuid: | |
|
921 | 915 | try: |
|
922 |
raise KeyError(" |
|
|
916 | raise KeyError("uuid %r in use" % uuid) | |
|
923 | 917 | except: |
|
924 |
self.log.error(" |
|
|
918 | self.log.error("uuid %r in use", uuid, exc_info=True) | |
|
925 | 919 | content = error.wrap_exception() |
|
926 | 920 | break |
|
927 | 921 | |
@@ -929,18 +923,21 b' class Hub(SessionFactory):' | |||
|
929 | 923 | content=content, |
|
930 | 924 | ident=reg) |
|
931 | 925 | |
|
926 | heart = util.asbytes(uuid) | |
|
927 | ||
|
932 | 928 | if content['status'] == 'ok': |
|
933 | 929 | if heart in self.heartmonitor.hearts: |
|
934 | 930 | # already beating |
|
935 |
self.incoming_registrations[heart] = (eid, |
|
|
931 | self.incoming_registrations[heart] = EngineConnector(id=eid,uuid=uuid) | |
|
936 | 932 | self.finish_registration(heart) |
|
937 | 933 | else: |
|
938 | 934 | purge = lambda : self._purge_stalled_registration(heart) |
|
939 | 935 | dc = ioloop.DelayedCallback(purge, self.registration_timeout, self.loop) |
|
940 | 936 | dc.start() |
|
941 |
self.incoming_registrations[heart] = (eid, |
|
|
937 | self.incoming_registrations[heart] = EngineConnector(id=eid,uuid=uuid,stallback=dc) | |
|
942 | 938 | else: |
|
943 | 939 | self.log.error("registration::registration %i failed: %r", eid, content['evalue']) |
|
940 | ||
|
944 | 941 | return eid |
|
945 | 942 | |
|
946 | 943 | def unregister_engine(self, ident, msg): |
@@ -953,7 +950,7 b' class Hub(SessionFactory):' | |||
|
953 | 950 | self.log.info("registration::unregister_engine(%r)", eid) |
|
954 | 951 | # print (eid) |
|
955 | 952 | uuid = self.keytable[eid] |
|
956 |
content=dict(id=eid, |
|
|
953 | content=dict(id=eid, uuid=uuid) | |
|
957 | 954 | self.dead_engines.add(uuid) |
|
958 | 955 | # self.ids.remove(eid) |
|
959 | 956 | # uuid = self.keytable.pop(eid) |
@@ -966,6 +963,8 b' class Hub(SessionFactory):' | |||
|
966 | 963 | dc = ioloop.DelayedCallback(handleit, self.registration_timeout, self.loop) |
|
967 | 964 | dc.start() |
|
968 | 965 | ############## TODO: HANDLE IT ################ |
|
966 | ||
|
967 | self._save_engine_state() | |
|
969 | 968 | |
|
970 | 969 | if self.notifier: |
|
971 | 970 | self.session.send(self.notifier, "unregistration_notification", content=content) |
@@ -1004,36 +1003,97 b' class Hub(SessionFactory):' | |||
|
1004 | 1003 | """Second half of engine registration, called after our HeartMonitor |
|
1005 | 1004 | has received a beat from the Engine's Heart.""" |
|
1006 | 1005 | try: |
|
1007 |
|
|
|
1006 | ec = self.incoming_registrations.pop(heart) | |
|
1008 | 1007 | except KeyError: |
|
1009 | 1008 | self.log.error("registration::tried to finish nonexistant registration", exc_info=True) |
|
1010 | 1009 | return |
|
1011 |
self.log.info("registration::finished registering engine %i:% |
|
|
1012 |
if |
|
|
1013 |
|
|
|
1014 | control = queue | |
|
1010 | self.log.info("registration::finished registering engine %i:%s", ec.id, ec.uuid) | |
|
1011 | if ec.stallback is not None: | |
|
1012 | ec.stallback.stop() | |
|
1013 | eid = ec.id | |
|
1015 | 1014 | self.ids.add(eid) |
|
1016 |
self.keytable[eid] = |
|
|
1017 | self.engines[eid] = EngineConnector(id=eid, queue=queue, registration=reg, | |
|
1018 | control=control, heartbeat=heart) | |
|
1019 | self.by_ident[queue] = eid | |
|
1015 | self.keytable[eid] = ec.uuid | |
|
1016 | self.engines[eid] = ec | |
|
1017 | self.by_ident[ec.uuid] = ec.id | |
|
1020 | 1018 | self.queues[eid] = list() |
|
1021 | 1019 | self.tasks[eid] = list() |
|
1022 | 1020 | self.completed[eid] = list() |
|
1023 | 1021 | self.hearts[heart] = eid |
|
1024 |
content = dict(id=eid, |
|
|
1022 | content = dict(id=eid, uuid=self.engines[eid].uuid) | |
|
1025 | 1023 | if self.notifier: |
|
1026 | 1024 | self.session.send(self.notifier, "registration_notification", content=content) |
|
1027 | 1025 | self.log.info("engine::Engine Connected: %i", eid) |
|
1026 | ||
|
1027 | self._save_engine_state() | |
|
1028 | 1028 | |
|
1029 | 1029 | def _purge_stalled_registration(self, heart): |
|
1030 | 1030 | if heart in self.incoming_registrations: |
|
1031 |
e |
|
|
1032 | self.log.info("registration::purging stalled registration: %i", eid) | |
|
1031 | ec = self.incoming_registrations.pop(heart) | |
|
1032 | self.log.info("registration::purging stalled registration: %i", ec.id) | |
|
1033 | 1033 | else: |
|
1034 | 1034 | pass |
|
1035 | 1035 | |
|
1036 | 1036 | #------------------------------------------------------------------------- |
|
1037 | # Engine State | |
|
1038 | #------------------------------------------------------------------------- | |
|
1039 | ||
|
1040 | ||
|
1041 | def _cleanup_engine_state_file(self): | |
|
1042 | """cleanup engine state mapping""" | |
|
1043 | ||
|
1044 | if os.path.exists(self.engine_state_file): | |
|
1045 | self.log.debug("cleaning up engine state: %s", self.engine_state_file) | |
|
1046 | try: | |
|
1047 | os.remove(self.engine_state_file) | |
|
1048 | except IOError: | |
|
1049 | self.log.error("Couldn't cleanup file: %s", self.engine_state_file, exc_info=True) | |
|
1050 | ||
|
1051 | ||
|
1052 | def _save_engine_state(self): | |
|
1053 | """save engine mapping to JSON file""" | |
|
1054 | if not self.engine_state_file: | |
|
1055 | return | |
|
1056 | self.log.debug("save engine state to %s" % self.engine_state_file) | |
|
1057 | state = {} | |
|
1058 | engines = {} | |
|
1059 | for eid, ec in self.engines.iteritems(): | |
|
1060 | if ec.uuid not in self.dead_engines: | |
|
1061 | engines[eid] = ec.uuid | |
|
1062 | ||
|
1063 | state['engines'] = engines | |
|
1064 | ||
|
1065 | state['next_id'] = self._idcounter | |
|
1066 | ||
|
1067 | with open(self.engine_state_file, 'w') as f: | |
|
1068 | json.dump(state, f) | |
|
1069 | ||
|
1070 | ||
|
1071 | def _load_engine_state(self): | |
|
1072 | """load engine mapping from JSON file""" | |
|
1073 | if not os.path.exists(self.engine_state_file): | |
|
1074 | return | |
|
1075 | ||
|
1076 | self.log.info("loading engine state from %s" % self.engine_state_file) | |
|
1077 | ||
|
1078 | with open(self.engine_state_file) as f: | |
|
1079 | state = json.load(f) | |
|
1080 | ||
|
1081 | save_notifier = self.notifier | |
|
1082 | self.notifier = None | |
|
1083 | for eid, uuid in state['engines'].iteritems(): | |
|
1084 | heart = uuid.encode('ascii') | |
|
1085 | # start with this heart as current and beating: | |
|
1086 | self.heartmonitor.responses.add(heart) | |
|
1087 | self.heartmonitor.hearts.add(heart) | |
|
1088 | ||
|
1089 | self.incoming_registrations[heart] = EngineConnector(id=int(eid), uuid=uuid) | |
|
1090 | self.finish_registration(heart) | |
|
1091 | ||
|
1092 | self.notifier = save_notifier | |
|
1093 | ||
|
1094 | self._idcounter = state['next_id'] | |
|
1095 | ||
|
1096 | #------------------------------------------------------------------------- | |
|
1037 | 1097 | # Client Requests |
|
1038 | 1098 | #------------------------------------------------------------------------- |
|
1039 | 1099 | |
@@ -1134,7 +1194,7 b' class Hub(SessionFactory):' | |||
|
1134 | 1194 | except: |
|
1135 | 1195 | reply = error.wrap_exception() |
|
1136 | 1196 | break |
|
1137 |
uid = self.engines[eid]. |
|
|
1197 | uid = self.engines[eid].uuid | |
|
1138 | 1198 | try: |
|
1139 | 1199 | self.db.drop_matching_records(dict(engine_uuid=uid, completed={'$ne':None})) |
|
1140 | 1200 | except Exception: |
@@ -189,6 +189,7 b' class TaskScheduler(SessionFactory):' | |||
|
189 | 189 | engine_stream = Instance(zmqstream.ZMQStream) # engine-facing stream |
|
190 | 190 | notifier_stream = Instance(zmqstream.ZMQStream) # hub-facing sub stream |
|
191 | 191 | mon_stream = Instance(zmqstream.ZMQStream) # hub-facing pub stream |
|
192 | query_stream = Instance(zmqstream.ZMQStream) # hub-facing DEALER stream | |
|
192 | 193 | |
|
193 | 194 | # internals: |
|
194 | 195 | graph = Dict() # dict by msg_id of [ msg_ids that depend on key ] |
@@ -216,6 +217,9 b' class TaskScheduler(SessionFactory):' | |||
|
216 | 217 | return self.session.bsession |
|
217 | 218 | |
|
218 | 219 | def start(self): |
|
220 | self.query_stream.on_recv(self.dispatch_query_reply) | |
|
221 | self.session.send(self.query_stream, "connection_request", {}) | |
|
222 | ||
|
219 | 223 | self.engine_stream.on_recv(self.dispatch_result, copy=False) |
|
220 | 224 | self.client_stream.on_recv(self.dispatch_submission, copy=False) |
|
221 | 225 | |
@@ -240,6 +244,24 b' class TaskScheduler(SessionFactory):' | |||
|
240 | 244 | #----------------------------------------------------------------------- |
|
241 | 245 | # [Un]Registration Handling |
|
242 | 246 | #----------------------------------------------------------------------- |
|
247 | ||
|
248 | ||
|
249 | def dispatch_query_reply(self, msg): | |
|
250 | """handle reply to our initial connection request""" | |
|
251 | try: | |
|
252 | idents,msg = self.session.feed_identities(msg) | |
|
253 | except ValueError: | |
|
254 | self.log.warn("task::Invalid Message: %r",msg) | |
|
255 | return | |
|
256 | try: | |
|
257 | msg = self.session.unserialize(msg) | |
|
258 | except ValueError: | |
|
259 | self.log.warn("task::Unauthorized message from: %r"%idents) | |
|
260 | return | |
|
261 | ||
|
262 | content = msg['content'] | |
|
263 | for uuid in content.get('engines', {}).values(): | |
|
264 | self._register_engine(asbytes(uuid)) | |
|
243 | 265 | |
|
244 | 266 | |
|
245 | 267 | @util.log_errors |
@@ -263,7 +285,7 b' class TaskScheduler(SessionFactory):' | |||
|
263 | 285 | self.log.error("Unhandled message type: %r"%msg_type) |
|
264 | 286 | else: |
|
265 | 287 | try: |
|
266 |
handler(cast_bytes(msg['content'][' |
|
|
288 | handler(cast_bytes(msg['content']['uuid'])) | |
|
267 | 289 | except Exception: |
|
268 | 290 | self.log.error("task::Invalid notification msg: %r", msg, exc_info=True) |
|
269 | 291 | |
@@ -714,7 +736,7 b' class TaskScheduler(SessionFactory):' | |||
|
714 | 736 | |
|
715 | 737 | |
|
716 | 738 | |
|
717 | def launch_scheduler(in_addr, out_addr, mon_addr, not_addr, config=None, | |
|
739 | def launch_scheduler(in_addr, out_addr, mon_addr, not_addr, reg_addr, config=None, | |
|
718 | 740 | logname='root', log_url=None, loglevel=logging.DEBUG, |
|
719 | 741 | identity=b'task', in_thread=False): |
|
720 | 742 | |
@@ -734,18 +756,21 b' def launch_scheduler(in_addr, out_addr, mon_addr, not_addr, config=None,' | |||
|
734 | 756 | ctx = zmq.Context() |
|
735 | 757 | loop = ioloop.IOLoop() |
|
736 | 758 | ins = ZMQStream(ctx.socket(zmq.ROUTER),loop) |
|
737 | ins.setsockopt(zmq.IDENTITY, identity) | |
|
759 | ins.setsockopt(zmq.IDENTITY, identity+'_in') | |
|
738 | 760 | ins.bind(in_addr) |
|
739 | 761 | |
|
740 | 762 | outs = ZMQStream(ctx.socket(zmq.ROUTER),loop) |
|
741 | outs.setsockopt(zmq.IDENTITY, identity) | |
|
763 | outs.setsockopt(zmq.IDENTITY, identity+'_out') | |
|
742 | 764 | outs.bind(out_addr) |
|
743 | 765 | mons = zmqstream.ZMQStream(ctx.socket(zmq.PUB),loop) |
|
744 | 766 | mons.connect(mon_addr) |
|
745 | 767 | nots = zmqstream.ZMQStream(ctx.socket(zmq.SUB),loop) |
|
746 | 768 | nots.setsockopt(zmq.SUBSCRIBE, b'') |
|
747 | 769 | nots.connect(not_addr) |
|
748 | ||
|
770 | ||
|
771 | querys = ZMQStream(ctx.socket(zmq.DEALER),loop) | |
|
772 | querys.connect(reg_addr) | |
|
773 | ||
|
749 | 774 | # setup logging. |
|
750 | 775 | if in_thread: |
|
751 | 776 | log = Application.instance().log |
@@ -757,6 +782,7 b' def launch_scheduler(in_addr, out_addr, mon_addr, not_addr, config=None,' | |||
|
757 | 782 | |
|
758 | 783 | scheduler = TaskScheduler(client_stream=ins, engine_stream=outs, |
|
759 | 784 | mon_stream=mons, notifier_stream=nots, |
|
785 | query_stream=querys, | |
|
760 | 786 | loop=loop, log=log, |
|
761 | 787 | config=config) |
|
762 | 788 | scheduler.start() |
@@ -129,7 +129,7 b' class EngineFactory(RegistrationFactory):' | |||
|
129 | 129 | self.registrar = zmqstream.ZMQStream(reg, self.loop) |
|
130 | 130 | |
|
131 | 131 | |
|
132 |
content = dict( |
|
|
132 | content = dict(uuid=self.ident) | |
|
133 | 133 | self.registrar.on_recv(lambda msg: self.complete_registration(msg, connect, maybe_tunnel)) |
|
134 | 134 | # print (self.session.key) |
|
135 | 135 | self.session.send(self.registrar, "registration_request", content=content) |
@@ -43,9 +43,7 b' monitor the survival of the Engine process.' | |||
|
43 | 43 | Message type: ``registration_request``:: |
|
44 | 44 | |
|
45 | 45 | content = { |
|
46 |
' |
|
|
47 | 'control' : 'abcd-1234-...', # the control queue zmq.IDENTITY | |
|
48 | 'heartbeat' : 'abcd-1234-...' # the heartbeat zmq.IDENTITY | |
|
46 | 'uuid' : 'abcd-1234-...', # the zmq.IDENTITY of the engine's sockets | |
|
49 | 47 | } |
|
50 | 48 | |
|
51 | 49 | .. note:: |
@@ -63,10 +61,6 b' Message type: ``registration_reply``::' | |||
|
63 | 61 | 'status' : 'ok', # or 'error' |
|
64 | 62 | # if ok: |
|
65 | 63 | 'id' : 0, # int, the engine id |
|
66 | 'queue' : 'tcp://127.0.0.1:12345', # connection for engine side of the queue | |
|
67 | 'control' : 'tcp://...', # addr for control queue | |
|
68 | 'heartbeat' : ('tcp://...','tcp://...'), # tuple containing two interfaces needed for heartbeat | |
|
69 | 'task' : 'tcp://...', # addr for task queue, or None if no task queue running | |
|
70 | 64 | } |
|
71 | 65 | |
|
72 | 66 | Clients use the same socket as engines to start their connections. Connection requests |
@@ -84,11 +78,6 b' Message type: ``connection_reply``::' | |||
|
84 | 78 | |
|
85 | 79 | content = { |
|
86 | 80 | 'status' : 'ok', # or 'error' |
|
87 | # if ok: | |
|
88 | 'queue' : 'tcp://127.0.0.1:12345', # connection for client side of the MUX queue | |
|
89 | 'task' : ('lru','tcp...'), # routing scheme and addr for task queue (len 2 tuple) | |
|
90 | 'query' : 'tcp...', # addr for methods to query the hub, like queue_request, etc. | |
|
91 | 'control' : 'tcp...', # addr for control methods, like abort, etc. | |
|
92 | 81 | } |
|
93 | 82 | |
|
94 | 83 | Heartbeat |
@@ -110,13 +99,14 b' Message type: ``registration_notification``::' | |||
|
110 | 99 | |
|
111 | 100 | content = { |
|
112 | 101 | 'id' : 0, # engine ID that has been registered |
|
113 |
' |
|
|
102 | 'uuid' : 'engine_id' # the IDENT for the engine's sockets | |
|
114 | 103 | } |
|
115 | 104 | |
|
116 | 105 | Message type : ``unregistration_notification``:: |
|
117 | 106 | |
|
118 | 107 | content = { |
|
119 | 108 | 'id' : 0 # engine ID that has been unregistered |
|
109 | 'uuid' : 'engine_id' # the IDENT for the engine's sockets | |
|
120 | 110 | } |
|
121 | 111 | |
|
122 | 112 |
General Comments 0
You need to be logged in to leave comments.
Login now