Show More
@@ -116,7 +116,10 b' flags.update({' | |||||
116 | select one of the true db backends. |
|
116 | select one of the true db backends. | |
117 | """), |
|
117 | """), | |
118 | 'reuse' : ({'IPControllerApp' : {'reuse_files' : True}}, |
|
118 | 'reuse' : ({'IPControllerApp' : {'reuse_files' : True}}, | |
119 | 'reuse existing json connection files') |
|
119 | 'reuse existing json connection files'), | |
|
120 | 'restore' : ({'IPControllerApp' : {'restore_engines' : True, 'reuse_files' : True}}, | |||
|
121 | 'Attempt to restore engines from a JSON file. ' | |||
|
122 | 'For use when resuming a crashed controller'), | |||
120 | }) |
|
123 | }) | |
121 |
|
124 | |||
122 | flags.update(session_flags) |
|
125 | flags.update(session_flags) | |
@@ -156,6 +159,10 b' class IPControllerApp(BaseParallelApplication):' | |||||
156 | If False, connection files will be removed on a clean exit. |
|
159 | If False, connection files will be removed on a clean exit. | |
157 | """ |
|
160 | """ | |
158 | ) |
|
161 | ) | |
|
162 | restore_engines = Bool(False, config=True, | |||
|
163 | help="""Reload engine state from JSON file | |||
|
164 | """ | |||
|
165 | ) | |||
159 | ssh_server = Unicode(u'', config=True, |
|
166 | ssh_server = Unicode(u'', config=True, | |
160 | help="""ssh url for clients to use when connecting to the Controller |
|
167 | help="""ssh url for clients to use when connecting to the Controller | |
161 | processes. It should be of the form: [user@]server[:port]. The |
|
168 | processes. It should be of the form: [user@]server[:port]. The | |
@@ -343,17 +350,24 b' class IPControllerApp(BaseParallelApplication):' | |||||
343 | edict.update(base) |
|
350 | edict.update(base) | |
344 | self.save_connection_dict(self.engine_json_file, edict) |
|
351 | self.save_connection_dict(self.engine_json_file, edict) | |
345 |
|
352 | |||
|
353 | fname = "engines%s.json" % self.cluster_id | |||
|
354 | self.factory.hub.engine_state_file = os.path.join(self.profile_dir.log_dir, fname) | |||
|
355 | if self.restore_engines: | |||
|
356 | self.factory.hub._load_engine_state() | |||
|
357 | ||||
346 | def init_schedulers(self): |
|
358 | def init_schedulers(self): | |
347 | children = self.children |
|
359 | children = self.children | |
348 | mq = import_item(str(self.mq_class)) |
|
360 | mq = import_item(str(self.mq_class)) | |
349 |
|
361 | |||
350 | f = self.factory |
|
362 | f = self.factory | |
|
363 | ident = f.session.bsession | |||
351 | # disambiguate url, in case of * |
|
364 | # disambiguate url, in case of * | |
352 | monitor_url = disambiguate_url(f.monitor_url) |
|
365 | monitor_url = disambiguate_url(f.monitor_url) | |
353 | # maybe_inproc = 'inproc://monitor' if self.use_threads else monitor_url |
|
366 | # maybe_inproc = 'inproc://monitor' if self.use_threads else monitor_url | |
354 | # IOPub relay (in a Process) |
|
367 | # IOPub relay (in a Process) | |
355 | q = mq(zmq.PUB, zmq.SUB, zmq.PUB, b'N/A',b'iopub') |
|
368 | q = mq(zmq.PUB, zmq.SUB, zmq.PUB, b'N/A',b'iopub') | |
356 | q.bind_in(f.client_url('iopub')) |
|
369 | q.bind_in(f.client_url('iopub')) | |
|
370 | q.setsockopt_in(zmq.IDENTITY, ident+"_iopub") | |||
357 | q.bind_out(f.engine_url('iopub')) |
|
371 | q.bind_out(f.engine_url('iopub')) | |
358 | q.setsockopt_out(zmq.SUBSCRIBE, b'') |
|
372 | q.setsockopt_out(zmq.SUBSCRIBE, b'') | |
359 | q.connect_mon(monitor_url) |
|
373 | q.connect_mon(monitor_url) | |
@@ -363,8 +377,9 b' class IPControllerApp(BaseParallelApplication):' | |||||
363 | # Multiplexer Queue (in a Process) |
|
377 | # Multiplexer Queue (in a Process) | |
364 | q = mq(zmq.ROUTER, zmq.ROUTER, zmq.PUB, b'in', b'out') |
|
378 | q = mq(zmq.ROUTER, zmq.ROUTER, zmq.PUB, b'in', b'out') | |
365 | q.bind_in(f.client_url('mux')) |
|
379 | q.bind_in(f.client_url('mux')) | |
366 | q.setsockopt_in(zmq.IDENTITY, b'mux') |
|
380 | q.setsockopt_in(zmq.IDENTITY, b'mux_in') | |
367 | q.bind_out(f.engine_url('mux')) |
|
381 | q.bind_out(f.engine_url('mux')) | |
|
382 | q.setsockopt_out(zmq.IDENTITY, b'mux_out') | |||
368 | q.connect_mon(monitor_url) |
|
383 | q.connect_mon(monitor_url) | |
369 | q.daemon=True |
|
384 | q.daemon=True | |
370 | children.append(q) |
|
385 | children.append(q) | |
@@ -372,8 +387,9 b' class IPControllerApp(BaseParallelApplication):' | |||||
372 | # Control Queue (in a Process) |
|
387 | # Control Queue (in a Process) | |
373 | q = mq(zmq.ROUTER, zmq.ROUTER, zmq.PUB, b'incontrol', b'outcontrol') |
|
388 | q = mq(zmq.ROUTER, zmq.ROUTER, zmq.PUB, b'incontrol', b'outcontrol') | |
374 | q.bind_in(f.client_url('control')) |
|
389 | q.bind_in(f.client_url('control')) | |
375 | q.setsockopt_in(zmq.IDENTITY, b'control') |
|
390 | q.setsockopt_in(zmq.IDENTITY, b'control_in') | |
376 | q.bind_out(f.engine_url('control')) |
|
391 | q.bind_out(f.engine_url('control')) | |
|
392 | q.setsockopt_out(zmq.IDENTITY, b'control_out') | |||
377 | q.connect_mon(monitor_url) |
|
393 | q.connect_mon(monitor_url) | |
378 | q.daemon=True |
|
394 | q.daemon=True | |
379 | children.append(q) |
|
395 | children.append(q) | |
@@ -387,8 +403,9 b' class IPControllerApp(BaseParallelApplication):' | |||||
387 | q = mq(zmq.ROUTER, zmq.DEALER, zmq.PUB, b'intask', b'outtask') |
|
403 | q = mq(zmq.ROUTER, zmq.DEALER, zmq.PUB, b'intask', b'outtask') | |
388 | # q.setsockopt_out(zmq.HWM, hub.hwm) |
|
404 | # q.setsockopt_out(zmq.HWM, hub.hwm) | |
389 | q.bind_in(f.client_url('task')) |
|
405 | q.bind_in(f.client_url('task')) | |
390 | q.setsockopt_in(zmq.IDENTITY, b'task') |
|
406 | q.setsockopt_in(zmq.IDENTITY, b'task_in') | |
391 | q.bind_out(f.engine_url('task')) |
|
407 | q.bind_out(f.engine_url('task')) | |
|
408 | q.setsockopt_out(zmq.IDENTITY, b'task_out') | |||
392 | q.connect_mon(monitor_url) |
|
409 | q.connect_mon(monitor_url) | |
393 | q.daemon=True |
|
410 | q.daemon=True | |
394 | children.append(q) |
|
411 | children.append(q) | |
@@ -398,7 +415,9 b' class IPControllerApp(BaseParallelApplication):' | |||||
398 | else: |
|
415 | else: | |
399 | self.log.info("task::using Python %s Task scheduler"%scheme) |
|
416 | self.log.info("task::using Python %s Task scheduler"%scheme) | |
400 | sargs = (f.client_url('task'), f.engine_url('task'), |
|
417 | sargs = (f.client_url('task'), f.engine_url('task'), | |
401 |
|
|
418 | monitor_url, disambiguate_url(f.client_url('notification')), | |
|
419 | disambiguate_url(f.client_url('registration')), | |||
|
420 | ) | |||
402 | kwargs = dict(logname='scheduler', loglevel=self.log_level, |
|
421 | kwargs = dict(logname='scheduler', loglevel=self.log_level, | |
403 | log_url = self.log_url, config=dict(self.config)) |
|
422 | log_url = self.log_url, config=dict(self.config)) | |
404 | if 'Process' in self.mq_class: |
|
423 | if 'Process' in self.mq_class: |
@@ -508,8 +508,9 b' class Client(HasTraits):' | |||||
508 | """Update our engines dict and _ids from a dict of the form: {id:uuid}.""" |
|
508 | """Update our engines dict and _ids from a dict of the form: {id:uuid}.""" | |
509 | for k,v in engines.iteritems(): |
|
509 | for k,v in engines.iteritems(): | |
510 | eid = int(k) |
|
510 | eid = int(k) | |
|
511 | if eid not in self._engines: | |||
|
512 | self._ids.append(eid) | |||
511 | self._engines[eid] = v |
|
513 | self._engines[eid] = v | |
512 | self._ids.append(eid) |
|
|||
513 | self._ids = sorted(self._ids) |
|
514 | self._ids = sorted(self._ids) | |
514 | if sorted(self._engines.keys()) != range(len(self._engines)) and \ |
|
515 | if sorted(self._engines.keys()) != range(len(self._engines)) and \ | |
515 | self._task_scheme == 'pure' and self._task_socket: |
|
516 | self._task_scheme == 'pure' and self._task_socket: | |
@@ -652,7 +653,7 b' class Client(HasTraits):' | |||||
652 | """Register a new engine, and update our connection info.""" |
|
653 | """Register a new engine, and update our connection info.""" | |
653 | content = msg['content'] |
|
654 | content = msg['content'] | |
654 | eid = content['id'] |
|
655 | eid = content['id'] | |
655 |
d = {eid : content[' |
|
656 | d = {eid : content['uuid']} | |
656 | self._update_engines(d) |
|
657 | self._update_engines(d) | |
657 |
|
658 | |||
658 | def _unregister_engine(self, msg): |
|
659 | def _unregister_engine(self, msg): |
@@ -18,6 +18,8 b' Authors:' | |||||
18 | #----------------------------------------------------------------------------- |
|
18 | #----------------------------------------------------------------------------- | |
19 | from __future__ import print_function |
|
19 | from __future__ import print_function | |
20 |
|
20 | |||
|
21 | import json | |||
|
22 | import os | |||
21 | import sys |
|
23 | import sys | |
22 | import time |
|
24 | import time | |
23 | from datetime import datetime |
|
25 | from datetime import datetime | |
@@ -107,17 +109,16 b' class EngineConnector(HasTraits):' | |||||
107 | """A simple object for accessing the various zmq connections of an object. |
|
109 | """A simple object for accessing the various zmq connections of an object. | |
108 | Attributes are: |
|
110 | Attributes are: | |
109 | id (int): engine ID |
|
111 | id (int): engine ID | |
110 | uuid (str): uuid (unused?) |
|
112 | uuid (unicode): engine UUID | |
111 | queue (str): identity of queue's DEALER socket |
|
113 | pending: set of msg_ids | |
112 | registration (str): identity of registration DEALER socket |
|
114 | stallback: DelayedCallback for stalled registration | |
113 | heartbeat (str): identity of heartbeat DEALER socket |
|
|||
114 | """ |
|
115 | """ | |
115 | id=Integer(0) |
|
116 | ||
116 | queue=CBytes() |
|
117 | id = Integer(0) | |
117 | control=CBytes() |
|
118 | uuid = Unicode() | |
118 | registration=CBytes() |
|
119 | pending = Set() | |
119 | heartbeat=CBytes() |
|
120 | stallback = Instance(ioloop.DelayedCallback) | |
120 | pending=Set() |
|
121 | ||
121 |
|
122 | |||
122 | _db_shortcuts = { |
|
123 | _db_shortcuts = { | |
123 | 'sqlitedb' : 'IPython.parallel.controller.sqlitedb.SQLiteDB', |
|
124 | 'sqlitedb' : 'IPython.parallel.controller.sqlitedb.SQLiteDB', | |
@@ -349,6 +350,9 b' class Hub(SessionFactory):' | |||||
349 | client_info: dict of zmq connection information for engines to connect |
|
350 | client_info: dict of zmq connection information for engines to connect | |
350 | to the queues. |
|
351 | to the queues. | |
351 | """ |
|
352 | """ | |
|
353 | ||||
|
354 | engine_state_file = Unicode() | |||
|
355 | ||||
352 | # internal data structures: |
|
356 | # internal data structures: | |
353 | ids=Set() # engine IDs |
|
357 | ids=Set() # engine IDs | |
354 | keytable=Dict() |
|
358 | keytable=Dict() | |
@@ -430,7 +434,7 b' class Hub(SessionFactory):' | |||||
430 | self.resubmit.on_recv(lambda msg: None, copy=False) |
|
434 | self.resubmit.on_recv(lambda msg: None, copy=False) | |
431 |
|
435 | |||
432 | self.log.info("hub::created hub") |
|
436 | self.log.info("hub::created hub") | |
433 |
|
437 | |||
434 | @property |
|
438 | @property | |
435 | def _next_id(self): |
|
439 | def _next_id(self): | |
436 | """gemerate a new ID. |
|
440 | """gemerate a new ID. | |
@@ -445,7 +449,7 b' class Hub(SessionFactory):' | |||||
445 | # while newid in self.ids or newid in incoming: |
|
449 | # while newid in self.ids or newid in incoming: | |
446 | # newid += 1 |
|
450 | # newid += 1 | |
447 | # return newid |
|
451 | # return newid | |
448 |
|
452 | |||
449 | #----------------------------------------------------------------------------- |
|
453 | #----------------------------------------------------------------------------- | |
450 | # message validation |
|
454 | # message validation | |
451 | #----------------------------------------------------------------------------- |
|
455 | #----------------------------------------------------------------------------- | |
@@ -561,11 +565,11 b' class Hub(SessionFactory):' | |||||
561 | triggers unregistration""" |
|
565 | triggers unregistration""" | |
562 | self.log.debug("heartbeat::handle_heart_failure(%r)", heart) |
|
566 | self.log.debug("heartbeat::handle_heart_failure(%r)", heart) | |
563 | eid = self.hearts.get(heart, None) |
|
567 | eid = self.hearts.get(heart, None) | |
564 |
|
|
568 | uuid = self.engines[eid].uuid | |
565 | if eid is None or self.keytable[eid] in self.dead_engines: |
|
569 | if eid is None or self.keytable[eid] in self.dead_engines: | |
566 | self.log.info("heartbeat::ignoring heart failure %r (not an engine or already dead)", heart) |
|
570 | self.log.info("heartbeat::ignoring heart failure %r (not an engine or already dead)", heart) | |
567 | else: |
|
571 | else: | |
568 |
self.unregister_engine(heart, dict(content=dict(id=eid, queue= |
|
572 | self.unregister_engine(heart, dict(content=dict(id=eid, queue=uuid))) | |
569 |
|
573 | |||
570 | #----------------------- MUX Queue Traffic ------------------------------ |
|
574 | #----------------------- MUX Queue Traffic ------------------------------ | |
571 |
|
575 | |||
@@ -873,7 +877,7 b' class Hub(SessionFactory):' | |||||
873 | jsonable = {} |
|
877 | jsonable = {} | |
874 | for k,v in self.keytable.iteritems(): |
|
878 | for k,v in self.keytable.iteritems(): | |
875 | if v not in self.dead_engines: |
|
879 | if v not in self.dead_engines: | |
876 |
jsonable[str(k)] = v |
|
880 | jsonable[str(k)] = v | |
877 | content['engines'] = jsonable |
|
881 | content['engines'] = jsonable | |
878 | self.session.send(self.query, 'connection_reply', content, parent=msg, ident=client_id) |
|
882 | self.session.send(self.query, 'connection_reply', content, parent=msg, ident=client_id) | |
879 |
|
883 | |||
@@ -881,47 +885,37 b' class Hub(SessionFactory):' | |||||
881 | """Register a new engine.""" |
|
885 | """Register a new engine.""" | |
882 | content = msg['content'] |
|
886 | content = msg['content'] | |
883 | try: |
|
887 | try: | |
884 |
|
|
888 | uuid = content['uuid'] | |
885 | except KeyError: |
|
889 | except KeyError: | |
886 | self.log.error("registration::queue not specified", exc_info=True) |
|
890 | self.log.error("registration::queue not specified", exc_info=True) | |
887 | return |
|
891 | return | |
888 | heart = content.get('heartbeat', None) |
|
892 | ||
889 | if heart: |
|
|||
890 | heart = cast_bytes(heart) |
|
|||
891 | """register a new engine, and create the socket(s) necessary""" |
|
|||
892 | eid = self._next_id |
|
893 | eid = self._next_id | |
893 | # print (eid, queue, reg, heart) |
|
|||
894 |
|
894 | |||
895 |
self.log.debug("registration::register_engine(%i, %r |
|
895 | self.log.debug("registration::register_engine(%i, %r)", eid, uuid) | |
896 |
|
896 | |||
897 | content = dict(id=eid,status='ok') |
|
897 | content = dict(id=eid,status='ok') | |
898 | # check if requesting available IDs: |
|
898 | # check if requesting available IDs: | |
899 |
if |
|
899 | if uuid in self.by_ident: | |
900 | try: |
|
900 | try: | |
901 |
raise KeyError(" |
|
901 | raise KeyError("uuid %r in use" % uuid) | |
902 | except: |
|
902 | except: | |
903 | content = error.wrap_exception() |
|
903 | content = error.wrap_exception() | |
904 |
self.log.error(" |
|
904 | self.log.error("uuid %r in use", uuid, exc_info=True) | |
905 | elif heart in self.hearts: # need to check unique hearts? |
|
|||
906 | try: |
|
|||
907 | raise KeyError("heart_id %r in use" % heart) |
|
|||
908 | except: |
|
|||
909 | self.log.error("heart_id %r in use", heart, exc_info=True) |
|
|||
910 | content = error.wrap_exception() |
|
|||
911 | else: |
|
905 | else: | |
912 |
for h, |
|
906 | for h, ec in self.incoming_registrations.iteritems(): | |
913 |
if |
|
907 | if uuid == h: | |
914 | try: |
|
908 | try: | |
915 |
raise KeyError("heart_id %r in use" % |
|
909 | raise KeyError("heart_id %r in use" % uuid) | |
916 | except: |
|
910 | except: | |
917 |
self.log.error("heart_id %r in use", |
|
911 | self.log.error("heart_id %r in use", uuid, exc_info=True) | |
918 | content = error.wrap_exception() |
|
912 | content = error.wrap_exception() | |
919 | break |
|
913 | break | |
920 |
elif |
|
914 | elif uuid == ec.uuid: | |
921 | try: |
|
915 | try: | |
922 |
raise KeyError(" |
|
916 | raise KeyError("uuid %r in use" % uuid) | |
923 | except: |
|
917 | except: | |
924 |
self.log.error(" |
|
918 | self.log.error("uuid %r in use", uuid, exc_info=True) | |
925 | content = error.wrap_exception() |
|
919 | content = error.wrap_exception() | |
926 | break |
|
920 | break | |
927 |
|
921 | |||
@@ -929,18 +923,21 b' class Hub(SessionFactory):' | |||||
929 | content=content, |
|
923 | content=content, | |
930 | ident=reg) |
|
924 | ident=reg) | |
931 |
|
925 | |||
|
926 | heart = util.asbytes(uuid) | |||
|
927 | ||||
932 | if content['status'] == 'ok': |
|
928 | if content['status'] == 'ok': | |
933 | if heart in self.heartmonitor.hearts: |
|
929 | if heart in self.heartmonitor.hearts: | |
934 | # already beating |
|
930 | # already beating | |
935 |
self.incoming_registrations[heart] = (eid, |
|
931 | self.incoming_registrations[heart] = EngineConnector(id=eid,uuid=uuid) | |
936 | self.finish_registration(heart) |
|
932 | self.finish_registration(heart) | |
937 | else: |
|
933 | else: | |
938 | purge = lambda : self._purge_stalled_registration(heart) |
|
934 | purge = lambda : self._purge_stalled_registration(heart) | |
939 | dc = ioloop.DelayedCallback(purge, self.registration_timeout, self.loop) |
|
935 | dc = ioloop.DelayedCallback(purge, self.registration_timeout, self.loop) | |
940 | dc.start() |
|
936 | dc.start() | |
941 |
self.incoming_registrations[heart] = (eid, |
|
937 | self.incoming_registrations[heart] = EngineConnector(id=eid,uuid=uuid,stallback=dc) | |
942 | else: |
|
938 | else: | |
943 | self.log.error("registration::registration %i failed: %r", eid, content['evalue']) |
|
939 | self.log.error("registration::registration %i failed: %r", eid, content['evalue']) | |
|
940 | ||||
944 | return eid |
|
941 | return eid | |
945 |
|
942 | |||
946 | def unregister_engine(self, ident, msg): |
|
943 | def unregister_engine(self, ident, msg): | |
@@ -953,7 +950,7 b' class Hub(SessionFactory):' | |||||
953 | self.log.info("registration::unregister_engine(%r)", eid) |
|
950 | self.log.info("registration::unregister_engine(%r)", eid) | |
954 | # print (eid) |
|
951 | # print (eid) | |
955 | uuid = self.keytable[eid] |
|
952 | uuid = self.keytable[eid] | |
956 |
content=dict(id=eid, |
|
953 | content=dict(id=eid, uuid=uuid) | |
957 | self.dead_engines.add(uuid) |
|
954 | self.dead_engines.add(uuid) | |
958 | # self.ids.remove(eid) |
|
955 | # self.ids.remove(eid) | |
959 | # uuid = self.keytable.pop(eid) |
|
956 | # uuid = self.keytable.pop(eid) | |
@@ -966,6 +963,8 b' class Hub(SessionFactory):' | |||||
966 | dc = ioloop.DelayedCallback(handleit, self.registration_timeout, self.loop) |
|
963 | dc = ioloop.DelayedCallback(handleit, self.registration_timeout, self.loop) | |
967 | dc.start() |
|
964 | dc.start() | |
968 | ############## TODO: HANDLE IT ################ |
|
965 | ############## TODO: HANDLE IT ################ | |
|
966 | ||||
|
967 | self._save_engine_state() | |||
969 |
|
968 | |||
970 | if self.notifier: |
|
969 | if self.notifier: | |
971 | self.session.send(self.notifier, "unregistration_notification", content=content) |
|
970 | self.session.send(self.notifier, "unregistration_notification", content=content) | |
@@ -1004,36 +1003,97 b' class Hub(SessionFactory):' | |||||
1004 | """Second half of engine registration, called after our HeartMonitor |
|
1003 | """Second half of engine registration, called after our HeartMonitor | |
1005 | has received a beat from the Engine's Heart.""" |
|
1004 | has received a beat from the Engine's Heart.""" | |
1006 | try: |
|
1005 | try: | |
1007 |
|
|
1006 | ec = self.incoming_registrations.pop(heart) | |
1008 | except KeyError: |
|
1007 | except KeyError: | |
1009 | self.log.error("registration::tried to finish nonexistant registration", exc_info=True) |
|
1008 | self.log.error("registration::tried to finish nonexistant registration", exc_info=True) | |
1010 | return |
|
1009 | return | |
1011 |
self.log.info("registration::finished registering engine %i:% |
|
1010 | self.log.info("registration::finished registering engine %i:%s", ec.id, ec.uuid) | |
1012 |
if |
|
1011 | if ec.stallback is not None: | |
1013 |
|
|
1012 | ec.stallback.stop() | |
1014 | control = queue |
|
1013 | eid = ec.id | |
1015 | self.ids.add(eid) |
|
1014 | self.ids.add(eid) | |
1016 |
self.keytable[eid] = |
|
1015 | self.keytable[eid] = ec.uuid | |
1017 | self.engines[eid] = EngineConnector(id=eid, queue=queue, registration=reg, |
|
1016 | self.engines[eid] = ec | |
1018 | control=control, heartbeat=heart) |
|
1017 | self.by_ident[ec.uuid] = ec.id | |
1019 | self.by_ident[queue] = eid |
|
|||
1020 | self.queues[eid] = list() |
|
1018 | self.queues[eid] = list() | |
1021 | self.tasks[eid] = list() |
|
1019 | self.tasks[eid] = list() | |
1022 | self.completed[eid] = list() |
|
1020 | self.completed[eid] = list() | |
1023 | self.hearts[heart] = eid |
|
1021 | self.hearts[heart] = eid | |
1024 |
content = dict(id=eid, |
|
1022 | content = dict(id=eid, uuid=self.engines[eid].uuid) | |
1025 | if self.notifier: |
|
1023 | if self.notifier: | |
1026 | self.session.send(self.notifier, "registration_notification", content=content) |
|
1024 | self.session.send(self.notifier, "registration_notification", content=content) | |
1027 | self.log.info("engine::Engine Connected: %i", eid) |
|
1025 | self.log.info("engine::Engine Connected: %i", eid) | |
|
1026 | ||||
|
1027 | self._save_engine_state() | |||
1028 |
|
1028 | |||
1029 | def _purge_stalled_registration(self, heart): |
|
1029 | def _purge_stalled_registration(self, heart): | |
1030 | if heart in self.incoming_registrations: |
|
1030 | if heart in self.incoming_registrations: | |
1031 |
e |
|
1031 | ec = self.incoming_registrations.pop(heart) | |
1032 | self.log.info("registration::purging stalled registration: %i", eid) |
|
1032 | self.log.info("registration::purging stalled registration: %i", ec.id) | |
1033 | else: |
|
1033 | else: | |
1034 | pass |
|
1034 | pass | |
1035 |
|
1035 | |||
1036 | #------------------------------------------------------------------------- |
|
1036 | #------------------------------------------------------------------------- | |
|
1037 | # Engine State | |||
|
1038 | #------------------------------------------------------------------------- | |||
|
1039 | ||||
|
1040 | ||||
|
1041 | def _cleanup_engine_state_file(self): | |||
|
1042 | """cleanup engine state mapping""" | |||
|
1043 | ||||
|
1044 | if os.path.exists(self.engine_state_file): | |||
|
1045 | self.log.debug("cleaning up engine state: %s", self.engine_state_file) | |||
|
1046 | try: | |||
|
1047 | os.remove(self.engine_state_file) | |||
|
1048 | except IOError: | |||
|
1049 | self.log.error("Couldn't cleanup file: %s", self.engine_state_file, exc_info=True) | |||
|
1050 | ||||
|
1051 | ||||
|
1052 | def _save_engine_state(self): | |||
|
1053 | """save engine mapping to JSON file""" | |||
|
1054 | if not self.engine_state_file: | |||
|
1055 | return | |||
|
1056 | self.log.debug("save engine state to %s" % self.engine_state_file) | |||
|
1057 | state = {} | |||
|
1058 | engines = {} | |||
|
1059 | for eid, ec in self.engines.iteritems(): | |||
|
1060 | if ec.uuid not in self.dead_engines: | |||
|
1061 | engines[eid] = ec.uuid | |||
|
1062 | ||||
|
1063 | state['engines'] = engines | |||
|
1064 | ||||
|
1065 | state['next_id'] = self._idcounter | |||
|
1066 | ||||
|
1067 | with open(self.engine_state_file, 'w') as f: | |||
|
1068 | json.dump(state, f) | |||
|
1069 | ||||
|
1070 | ||||
|
1071 | def _load_engine_state(self): | |||
|
1072 | """load engine mapping from JSON file""" | |||
|
1073 | if not os.path.exists(self.engine_state_file): | |||
|
1074 | return | |||
|
1075 | ||||
|
1076 | self.log.info("loading engine state from %s" % self.engine_state_file) | |||
|
1077 | ||||
|
1078 | with open(self.engine_state_file) as f: | |||
|
1079 | state = json.load(f) | |||
|
1080 | ||||
|
1081 | save_notifier = self.notifier | |||
|
1082 | self.notifier = None | |||
|
1083 | for eid, uuid in state['engines'].iteritems(): | |||
|
1084 | heart = uuid.encode('ascii') | |||
|
1085 | # start with this heart as current and beating: | |||
|
1086 | self.heartmonitor.responses.add(heart) | |||
|
1087 | self.heartmonitor.hearts.add(heart) | |||
|
1088 | ||||
|
1089 | self.incoming_registrations[heart] = EngineConnector(id=int(eid), uuid=uuid) | |||
|
1090 | self.finish_registration(heart) | |||
|
1091 | ||||
|
1092 | self.notifier = save_notifier | |||
|
1093 | ||||
|
1094 | self._idcounter = state['next_id'] | |||
|
1095 | ||||
|
1096 | #------------------------------------------------------------------------- | |||
1037 | # Client Requests |
|
1097 | # Client Requests | |
1038 | #------------------------------------------------------------------------- |
|
1098 | #------------------------------------------------------------------------- | |
1039 |
|
1099 | |||
@@ -1134,7 +1194,7 b' class Hub(SessionFactory):' | |||||
1134 | except: |
|
1194 | except: | |
1135 | reply = error.wrap_exception() |
|
1195 | reply = error.wrap_exception() | |
1136 | break |
|
1196 | break | |
1137 |
uid = self.engines[eid]. |
|
1197 | uid = self.engines[eid].uuid | |
1138 | try: |
|
1198 | try: | |
1139 | self.db.drop_matching_records(dict(engine_uuid=uid, completed={'$ne':None})) |
|
1199 | self.db.drop_matching_records(dict(engine_uuid=uid, completed={'$ne':None})) | |
1140 | except Exception: |
|
1200 | except Exception: |
@@ -189,6 +189,7 b' class TaskScheduler(SessionFactory):' | |||||
189 | engine_stream = Instance(zmqstream.ZMQStream) # engine-facing stream |
|
189 | engine_stream = Instance(zmqstream.ZMQStream) # engine-facing stream | |
190 | notifier_stream = Instance(zmqstream.ZMQStream) # hub-facing sub stream |
|
190 | notifier_stream = Instance(zmqstream.ZMQStream) # hub-facing sub stream | |
191 | mon_stream = Instance(zmqstream.ZMQStream) # hub-facing pub stream |
|
191 | mon_stream = Instance(zmqstream.ZMQStream) # hub-facing pub stream | |
|
192 | query_stream = Instance(zmqstream.ZMQStream) # hub-facing DEALER stream | |||
192 |
|
193 | |||
193 | # internals: |
|
194 | # internals: | |
194 | graph = Dict() # dict by msg_id of [ msg_ids that depend on key ] |
|
195 | graph = Dict() # dict by msg_id of [ msg_ids that depend on key ] | |
@@ -216,6 +217,9 b' class TaskScheduler(SessionFactory):' | |||||
216 | return self.session.bsession |
|
217 | return self.session.bsession | |
217 |
|
218 | |||
218 | def start(self): |
|
219 | def start(self): | |
|
220 | self.query_stream.on_recv(self.dispatch_query_reply) | |||
|
221 | self.session.send(self.query_stream, "connection_request", {}) | |||
|
222 | ||||
219 | self.engine_stream.on_recv(self.dispatch_result, copy=False) |
|
223 | self.engine_stream.on_recv(self.dispatch_result, copy=False) | |
220 | self.client_stream.on_recv(self.dispatch_submission, copy=False) |
|
224 | self.client_stream.on_recv(self.dispatch_submission, copy=False) | |
221 |
|
225 | |||
@@ -240,6 +244,24 b' class TaskScheduler(SessionFactory):' | |||||
240 | #----------------------------------------------------------------------- |
|
244 | #----------------------------------------------------------------------- | |
241 | # [Un]Registration Handling |
|
245 | # [Un]Registration Handling | |
242 | #----------------------------------------------------------------------- |
|
246 | #----------------------------------------------------------------------- | |
|
247 | ||||
|
248 | ||||
|
249 | def dispatch_query_reply(self, msg): | |||
|
250 | """handle reply to our initial connection request""" | |||
|
251 | try: | |||
|
252 | idents,msg = self.session.feed_identities(msg) | |||
|
253 | except ValueError: | |||
|
254 | self.log.warn("task::Invalid Message: %r",msg) | |||
|
255 | return | |||
|
256 | try: | |||
|
257 | msg = self.session.unserialize(msg) | |||
|
258 | except ValueError: | |||
|
259 | self.log.warn("task::Unauthorized message from: %r"%idents) | |||
|
260 | return | |||
|
261 | ||||
|
262 | content = msg['content'] | |||
|
263 | for uuid in content.get('engines', {}).values(): | |||
|
264 | self._register_engine(asbytes(uuid)) | |||
243 |
|
265 | |||
244 |
|
266 | |||
245 | @util.log_errors |
|
267 | @util.log_errors | |
@@ -263,7 +285,7 b' class TaskScheduler(SessionFactory):' | |||||
263 | self.log.error("Unhandled message type: %r"%msg_type) |
|
285 | self.log.error("Unhandled message type: %r"%msg_type) | |
264 | else: |
|
286 | else: | |
265 | try: |
|
287 | try: | |
266 |
handler(cast_bytes(msg['content'][' |
|
288 | handler(cast_bytes(msg['content']['uuid'])) | |
267 | except Exception: |
|
289 | except Exception: | |
268 | self.log.error("task::Invalid notification msg: %r", msg, exc_info=True) |
|
290 | self.log.error("task::Invalid notification msg: %r", msg, exc_info=True) | |
269 |
|
291 | |||
@@ -714,7 +736,7 b' class TaskScheduler(SessionFactory):' | |||||
714 |
|
736 | |||
715 |
|
737 | |||
716 |
|
738 | |||
717 | def launch_scheduler(in_addr, out_addr, mon_addr, not_addr, config=None, |
|
739 | def launch_scheduler(in_addr, out_addr, mon_addr, not_addr, reg_addr, config=None, | |
718 | logname='root', log_url=None, loglevel=logging.DEBUG, |
|
740 | logname='root', log_url=None, loglevel=logging.DEBUG, | |
719 | identity=b'task', in_thread=False): |
|
741 | identity=b'task', in_thread=False): | |
720 |
|
742 | |||
@@ -734,18 +756,21 b' def launch_scheduler(in_addr, out_addr, mon_addr, not_addr, config=None,' | |||||
734 | ctx = zmq.Context() |
|
756 | ctx = zmq.Context() | |
735 | loop = ioloop.IOLoop() |
|
757 | loop = ioloop.IOLoop() | |
736 | ins = ZMQStream(ctx.socket(zmq.ROUTER),loop) |
|
758 | ins = ZMQStream(ctx.socket(zmq.ROUTER),loop) | |
737 | ins.setsockopt(zmq.IDENTITY, identity) |
|
759 | ins.setsockopt(zmq.IDENTITY, identity+'_in') | |
738 | ins.bind(in_addr) |
|
760 | ins.bind(in_addr) | |
739 |
|
761 | |||
740 | outs = ZMQStream(ctx.socket(zmq.ROUTER),loop) |
|
762 | outs = ZMQStream(ctx.socket(zmq.ROUTER),loop) | |
741 | outs.setsockopt(zmq.IDENTITY, identity) |
|
763 | outs.setsockopt(zmq.IDENTITY, identity+'_out') | |
742 | outs.bind(out_addr) |
|
764 | outs.bind(out_addr) | |
743 | mons = zmqstream.ZMQStream(ctx.socket(zmq.PUB),loop) |
|
765 | mons = zmqstream.ZMQStream(ctx.socket(zmq.PUB),loop) | |
744 | mons.connect(mon_addr) |
|
766 | mons.connect(mon_addr) | |
745 | nots = zmqstream.ZMQStream(ctx.socket(zmq.SUB),loop) |
|
767 | nots = zmqstream.ZMQStream(ctx.socket(zmq.SUB),loop) | |
746 | nots.setsockopt(zmq.SUBSCRIBE, b'') |
|
768 | nots.setsockopt(zmq.SUBSCRIBE, b'') | |
747 | nots.connect(not_addr) |
|
769 | nots.connect(not_addr) | |
748 |
|
770 | |||
|
771 | querys = ZMQStream(ctx.socket(zmq.DEALER),loop) | |||
|
772 | querys.connect(reg_addr) | |||
|
773 | ||||
749 | # setup logging. |
|
774 | # setup logging. | |
750 | if in_thread: |
|
775 | if in_thread: | |
751 | log = Application.instance().log |
|
776 | log = Application.instance().log | |
@@ -757,6 +782,7 b' def launch_scheduler(in_addr, out_addr, mon_addr, not_addr, config=None,' | |||||
757 |
|
782 | |||
758 | scheduler = TaskScheduler(client_stream=ins, engine_stream=outs, |
|
783 | scheduler = TaskScheduler(client_stream=ins, engine_stream=outs, | |
759 | mon_stream=mons, notifier_stream=nots, |
|
784 | mon_stream=mons, notifier_stream=nots, | |
|
785 | query_stream=querys, | |||
760 | loop=loop, log=log, |
|
786 | loop=loop, log=log, | |
761 | config=config) |
|
787 | config=config) | |
762 | scheduler.start() |
|
788 | scheduler.start() |
@@ -129,7 +129,7 b' class EngineFactory(RegistrationFactory):' | |||||
129 | self.registrar = zmqstream.ZMQStream(reg, self.loop) |
|
129 | self.registrar = zmqstream.ZMQStream(reg, self.loop) | |
130 |
|
130 | |||
131 |
|
131 | |||
132 |
content = dict( |
|
132 | content = dict(uuid=self.ident) | |
133 | self.registrar.on_recv(lambda msg: self.complete_registration(msg, connect, maybe_tunnel)) |
|
133 | self.registrar.on_recv(lambda msg: self.complete_registration(msg, connect, maybe_tunnel)) | |
134 | # print (self.session.key) |
|
134 | # print (self.session.key) | |
135 | self.session.send(self.registrar, "registration_request", content=content) |
|
135 | self.session.send(self.registrar, "registration_request", content=content) |
@@ -43,9 +43,7 b' monitor the survival of the Engine process.' | |||||
43 | Message type: ``registration_request``:: |
|
43 | Message type: ``registration_request``:: | |
44 |
|
44 | |||
45 | content = { |
|
45 | content = { | |
46 |
' |
|
46 | 'uuid' : 'abcd-1234-...', # the zmq.IDENTITY of the engine's sockets | |
47 | 'control' : 'abcd-1234-...', # the control queue zmq.IDENTITY |
|
|||
48 | 'heartbeat' : 'abcd-1234-...' # the heartbeat zmq.IDENTITY |
|
|||
49 | } |
|
47 | } | |
50 |
|
48 | |||
51 | .. note:: |
|
49 | .. note:: | |
@@ -63,10 +61,6 b' Message type: ``registration_reply``::' | |||||
63 | 'status' : 'ok', # or 'error' |
|
61 | 'status' : 'ok', # or 'error' | |
64 | # if ok: |
|
62 | # if ok: | |
65 | 'id' : 0, # int, the engine id |
|
63 | 'id' : 0, # int, the engine id | |
66 | 'queue' : 'tcp://127.0.0.1:12345', # connection for engine side of the queue |
|
|||
67 | 'control' : 'tcp://...', # addr for control queue |
|
|||
68 | 'heartbeat' : ('tcp://...','tcp://...'), # tuple containing two interfaces needed for heartbeat |
|
|||
69 | 'task' : 'tcp://...', # addr for task queue, or None if no task queue running |
|
|||
70 | } |
|
64 | } | |
71 |
|
65 | |||
72 | Clients use the same socket as engines to start their connections. Connection requests |
|
66 | Clients use the same socket as engines to start their connections. Connection requests | |
@@ -84,11 +78,6 b' Message type: ``connection_reply``::' | |||||
84 |
|
78 | |||
85 | content = { |
|
79 | content = { | |
86 | 'status' : 'ok', # or 'error' |
|
80 | 'status' : 'ok', # or 'error' | |
87 | # if ok: |
|
|||
88 | 'queue' : 'tcp://127.0.0.1:12345', # connection for client side of the MUX queue |
|
|||
89 | 'task' : ('lru','tcp...'), # routing scheme and addr for task queue (len 2 tuple) |
|
|||
90 | 'query' : 'tcp...', # addr for methods to query the hub, like queue_request, etc. |
|
|||
91 | 'control' : 'tcp...', # addr for control methods, like abort, etc. |
|
|||
92 | } |
|
81 | } | |
93 |
|
82 | |||
94 | Heartbeat |
|
83 | Heartbeat | |
@@ -110,13 +99,14 b' Message type: ``registration_notification``::' | |||||
110 |
|
99 | |||
111 | content = { |
|
100 | content = { | |
112 | 'id' : 0, # engine ID that has been registered |
|
101 | 'id' : 0, # engine ID that has been registered | |
113 |
' |
|
102 | 'uuid' : 'engine_id' # the IDENT for the engine's sockets | |
114 | } |
|
103 | } | |
115 |
|
104 | |||
116 | Message type : ``unregistration_notification``:: |
|
105 | Message type : ``unregistration_notification``:: | |
117 |
|
106 | |||
118 | content = { |
|
107 | content = { | |
119 | 'id' : 0 # engine ID that has been unregistered |
|
108 | 'id' : 0 # engine ID that has been unregistered | |
|
109 | 'uuid' : 'engine_id' # the IDENT for the engine's sockets | |||
120 | } |
|
110 | } | |
121 |
|
111 | |||
122 |
|
112 |
General Comments 0
You need to be logged in to leave comments.
Login now