Show More
@@ -45,6 +45,30 b' def _printer(*args, **kwargs):' | |||||
45 | print (args) |
|
45 | print (args) | |
46 | print (kwargs) |
|
46 | print (kwargs) | |
47 |
|
47 | |||
|
48 | def empty_record(): | |||
|
49 | """Return an empty dict with all record keys.""" | |||
|
50 | return { | |||
|
51 | 'msg_id' : None, | |||
|
52 | 'header' : None, | |||
|
53 | 'content': None, | |||
|
54 | 'buffers': None, | |||
|
55 | 'submitted': None, | |||
|
56 | 'client_uuid' : None, | |||
|
57 | 'engine_uuid' : None, | |||
|
58 | 'started': None, | |||
|
59 | 'completed': None, | |||
|
60 | 'resubmitted': None, | |||
|
61 | 'result_header' : None, | |||
|
62 | 'result_content' : None, | |||
|
63 | 'result_buffers' : None, | |||
|
64 | 'queue' : None, | |||
|
65 | 'pyin' : None, | |||
|
66 | 'pyout': None, | |||
|
67 | 'pyerr': None, | |||
|
68 | 'stdout': '', | |||
|
69 | 'stderr': '', | |||
|
70 | } | |||
|
71 | ||||
48 | def init_record(msg): |
|
72 | def init_record(msg): | |
49 | """Initialize a TaskRecord based on a request.""" |
|
73 | """Initialize a TaskRecord based on a request.""" | |
50 | header = msg['header'] |
|
74 | header = msg['header'] | |
@@ -283,6 +307,7 b' class Hub(LoggingFactory):' | |||||
283 | tasks=Dict() # pending msg_ids submitted as tasks, keyed by client_id |
|
307 | tasks=Dict() # pending msg_ids submitted as tasks, keyed by client_id | |
284 | completed=Dict() # completed msg_ids keyed by engine_id |
|
308 | completed=Dict() # completed msg_ids keyed by engine_id | |
285 | all_completed=Set() # completed msg_ids keyed by engine_id |
|
309 | all_completed=Set() # completed msg_ids keyed by engine_id | |
|
310 | dead_engines=Set() # completed msg_ids keyed by engine_id | |||
286 | # mia=None |
|
311 | # mia=None | |
287 | incoming_registrations=Dict() |
|
312 | incoming_registrations=Dict() | |
288 | registration_timeout=Int() |
|
313 | registration_timeout=Int() | |
@@ -531,9 +556,21 b' class Hub(LoggingFactory):' | |||||
531 | record['client_uuid'] = client_id |
|
556 | record['client_uuid'] = client_id | |
532 | record['queue'] = 'mux' |
|
557 | record['queue'] = 'mux' | |
533 |
|
558 | |||
|
559 | try: | |||
|
560 | # it's posible iopub arrived first: | |||
|
561 | existing = self.db.get_record(msg_id) | |||
|
562 | for key,evalue in existing.iteritems(): | |||
|
563 | rvalue = record[key] | |||
|
564 | if evalue and rvalue and evalue != rvalue: | |||
|
565 | self.log.error("conflicting initial state for record: %s:%s <> %s"%(msg_id, rvalue, evalue)) | |||
|
566 | elif evalue and not rvalue: | |||
|
567 | record[key] = evalue | |||
|
568 | self.db.update_record(msg_id, record) | |||
|
569 | except KeyError: | |||
|
570 | self.db.add_record(msg_id, record) | |||
|
571 | ||||
534 | self.pending.add(msg_id) |
|
572 | self.pending.add(msg_id) | |
535 | self.queues[eid].append(msg_id) |
|
573 | self.queues[eid].append(msg_id) | |
536 | self.db.add_record(msg_id, record) |
|
|||
537 |
|
574 | |||
538 | def save_queue_result(self, idents, msg): |
|
575 | def save_queue_result(self, idents, msg): | |
539 | if len(idents) < 2: |
|
576 | if len(idents) < 2: | |
@@ -551,7 +588,7 b' class Hub(LoggingFactory):' | |||||
551 | eid = self.by_ident.get(queue_id, None) |
|
588 | eid = self.by_ident.get(queue_id, None) | |
552 | if eid is None: |
|
589 | if eid is None: | |
553 | self.log.error("queue::unknown engine %r is sending a reply: "%queue_id) |
|
590 | self.log.error("queue::unknown engine %r is sending a reply: "%queue_id) | |
554 | self.log.debug("queue:: %s"%msg[2:]) |
|
591 | # self.log.debug("queue:: %s"%msg[2:]) | |
555 | return |
|
592 | return | |
556 |
|
593 | |||
557 | parent = msg['parent_header'] |
|
594 | parent = msg['parent_header'] | |
@@ -604,7 +641,18 b' class Hub(LoggingFactory):' | |||||
604 | header = msg['header'] |
|
641 | header = msg['header'] | |
605 | msg_id = header['msg_id'] |
|
642 | msg_id = header['msg_id'] | |
606 | self.pending.add(msg_id) |
|
643 | self.pending.add(msg_id) | |
607 | self.db.add_record(msg_id, record) |
|
644 | try: | |
|
645 | # it's posible iopub arrived first: | |||
|
646 | existing = self.db.get_record(msg_id) | |||
|
647 | for key,evalue in existing.iteritems(): | |||
|
648 | rvalue = record[key] | |||
|
649 | if evalue and rvalue and evalue != rvalue: | |||
|
650 | self.log.error("conflicting initial state for record: %s:%s <> %s"%(msg_id, rvalue, evalue)) | |||
|
651 | elif evalue and not rvalue: | |||
|
652 | record[key] = evalue | |||
|
653 | self.db.update_record(msg_id, record) | |||
|
654 | except KeyError: | |||
|
655 | self.db.add_record(msg_id, record) | |||
608 |
|
656 | |||
609 | def save_task_result(self, idents, msg): |
|
657 | def save_task_result(self, idents, msg): | |
610 | """save the result of a completed task.""" |
|
658 | """save the result of a completed task.""" | |
@@ -704,9 +752,10 b' class Hub(LoggingFactory):' | |||||
704 | # ensure msg_id is in db |
|
752 | # ensure msg_id is in db | |
705 | try: |
|
753 | try: | |
706 | rec = self.db.get_record(msg_id) |
|
754 | rec = self.db.get_record(msg_id) | |
707 | except: |
|
755 | except KeyError: | |
708 | self.log.error("iopub::IOPub message has invalid parent", exc_info=True) |
|
756 | rec = empty_record() | |
709 | return |
|
757 | rec['msg_id'] = msg_id | |
|
758 | self.db.add_record(msg_id, rec) | |||
710 | # stream |
|
759 | # stream | |
711 | d = {} |
|
760 | d = {} | |
712 | if msg_type == 'stream': |
|
761 | if msg_type == 'stream': | |
@@ -734,7 +783,8 b' class Hub(LoggingFactory):' | |||||
734 | content.update(self.client_info) |
|
783 | content.update(self.client_info) | |
735 | jsonable = {} |
|
784 | jsonable = {} | |
736 | for k,v in self.keytable.iteritems(): |
|
785 | for k,v in self.keytable.iteritems(): | |
737 | jsonable[str(k)] = v |
|
786 | if v not in self.dead_engines: | |
|
787 | jsonable[str(k)] = v | |||
738 | content['engines'] = jsonable |
|
788 | content['engines'] = jsonable | |
739 | self.session.send(self.query, 'connection_reply', content, parent=msg, ident=client_id) |
|
789 | self.session.send(self.query, 'connection_reply', content, parent=msg, ident=client_id) | |
740 |
|
790 | |||
@@ -812,15 +862,20 b' class Hub(LoggingFactory):' | |||||
812 | return |
|
862 | return | |
813 | self.log.info("registration::unregister_engine(%s)"%eid) |
|
863 | self.log.info("registration::unregister_engine(%s)"%eid) | |
814 | # print (eid) |
|
864 | # print (eid) | |
815 | content=dict(id=eid, queue=self.engines[eid].queue) |
|
865 | uuid = self.keytable[eid] | |
816 | self.ids.remove(eid) |
|
866 | content=dict(id=eid, queue=uuid) | |
817 | uuid = self.keytable.pop(eid) |
|
867 | self.dead_engines.add(uuid) | |
818 |
|
|
868 | # self.ids.remove(eid) | |
819 | self.hearts.pop(ec.heartbeat) |
|
869 | # uuid = self.keytable.pop(eid) | |
820 | self.by_ident.pop(ec.queue) |
|
870 | # | |
821 |
|
|
871 | # ec = self.engines.pop(eid) | |
822 | self._handle_stranded_msgs(eid, uuid) |
|
872 | # self.hearts.pop(ec.heartbeat) | |
823 | ############## TODO: HANDLE IT ################ |
|
873 | # self.by_ident.pop(ec.queue) | |
|
874 | # self.completed.pop(eid) | |||
|
875 | handleit = lambda : self._handle_stranded_msgs(eid, uuid) | |||
|
876 | dc = ioloop.DelayedCallback(handleit, self.registration_timeout, self.loop) | |||
|
877 | dc.start() | |||
|
878 | ############## TODO: HANDLE IT ################ | |||
824 |
|
879 | |||
825 | if self.notifier: |
|
880 | if self.notifier: | |
826 | self.session.send(self.notifier, "unregistration_notification", content=content) |
|
881 | self.session.send(self.notifier, "unregistration_notification", content=content) | |
@@ -833,7 +888,7 b' class Hub(LoggingFactory):' | |||||
833 | that the result failed and later receive the actual result. |
|
888 | that the result failed and later receive the actual result. | |
834 | """ |
|
889 | """ | |
835 |
|
890 | |||
836 |
outstanding = self.queues |
|
891 | outstanding = self.queues[eid] | |
837 |
|
892 | |||
838 | for msg_id in outstanding: |
|
893 | for msg_id in outstanding: | |
839 | self.pending.remove(msg_id) |
|
894 | self.pending.remove(msg_id) |
@@ -904,7 +904,7 b' class PBSEngineSetLauncher(PBSLauncher):' | |||||
904 |
|
904 | |||
905 | def start(self, n, cluster_dir): |
|
905 | def start(self, n, cluster_dir): | |
906 | """Start n engines by profile or cluster_dir.""" |
|
906 | """Start n engines by profile or cluster_dir.""" | |
907 |
self.log.info('Starting % |
|
907 | self.log.info('Starting %i engines with PBSEngineSetLauncher: %r' % (n, self.args)) | |
908 | return super(PBSEngineSetLauncher, self).start(n, cluster_dir) |
|
908 | return super(PBSEngineSetLauncher, self).start(n, cluster_dir) | |
909 |
|
909 | |||
910 | #SGE is very similar to PBS |
|
910 | #SGE is very similar to PBS | |
@@ -942,7 +942,7 b' class SGEEngineSetLauncher(SGELauncher):' | |||||
942 |
|
942 | |||
943 | def start(self, n, cluster_dir): |
|
943 | def start(self, n, cluster_dir): | |
944 | """Start n engines by profile or cluster_dir.""" |
|
944 | """Start n engines by profile or cluster_dir.""" | |
945 |
self.log.info('Starting % |
|
945 | self.log.info('Starting %i engines with SGEEngineSetLauncher: %r' % (n, self.args)) | |
946 | return super(SGEEngineSetLauncher, self).start(n, cluster_dir) |
|
946 | return super(SGEEngineSetLauncher, self).start(n, cluster_dir) | |
947 |
|
947 | |||
948 |
|
948 |
@@ -13,6 +13,8 b' from datetime import datetime' | |||||
13 |
|
13 | |||
14 | import sqlite3 |
|
14 | import sqlite3 | |
15 |
|
15 | |||
|
16 | from zmq.eventloop import ioloop | |||
|
17 | ||||
16 | from IPython.utils.traitlets import CUnicode, CStr, Instance, List |
|
18 | from IPython.utils.traitlets import CUnicode, CStr, Instance, List | |
17 | from .dictdb import BaseDB |
|
19 | from .dictdb import BaseDB | |
18 | from .util import ISO8601 |
|
20 | from .util import ISO8601 | |
@@ -114,6 +116,13 b' class SQLiteDB(BaseDB):' | |||||
114 | else: |
|
116 | else: | |
115 | self.location = '.' |
|
117 | self.location = '.' | |
116 | self._init_db() |
|
118 | self._init_db() | |
|
119 | ||||
|
120 | # register db commit as 2s periodic callback | |||
|
121 | # to prevent clogging pipes | |||
|
122 | # assumes we are being run in a zmq ioloop app | |||
|
123 | loop = ioloop.IOLoop.instance() | |||
|
124 | pc = ioloop.PeriodicCallback(self._db.commit, 2000, loop) | |||
|
125 | pc.start() | |||
117 |
|
126 | |||
118 | def _defaults(self): |
|
127 | def _defaults(self): | |
119 | """create an empty record""" |
|
128 | """create an empty record""" | |
@@ -133,7 +142,9 b' class SQLiteDB(BaseDB):' | |||||
133 | sqlite3.register_converter('bufs', _convert_bufs) |
|
142 | sqlite3.register_converter('bufs', _convert_bufs) | |
134 | # connect to the db |
|
143 | # connect to the db | |
135 | dbfile = os.path.join(self.location, self.filename) |
|
144 | dbfile = os.path.join(self.location, self.filename) | |
136 |
self._db = sqlite3.connect(dbfile, detect_types=sqlite3.PARSE_DECLTYPES, |
|
145 | self._db = sqlite3.connect(dbfile, detect_types=sqlite3.PARSE_DECLTYPES, | |
|
146 | # isolation_level = None)#, | |||
|
147 | cached_statements=64) | |||
137 | # print dir(self._db) |
|
148 | # print dir(self._db) | |
138 |
|
149 | |||
139 | self._db.execute("""CREATE TABLE IF NOT EXISTS %s |
|
150 | self._db.execute("""CREATE TABLE IF NOT EXISTS %s | |
@@ -218,7 +229,7 b' class SQLiteDB(BaseDB):' | |||||
218 | line = self._dict_to_list(d) |
|
229 | line = self._dict_to_list(d) | |
219 | tups = '(%s)'%(','.join(['?']*len(line))) |
|
230 | tups = '(%s)'%(','.join(['?']*len(line))) | |
220 | self._db.execute("INSERT INTO %s VALUES %s"%(self.table, tups), line) |
|
231 | self._db.execute("INSERT INTO %s VALUES %s"%(self.table, tups), line) | |
221 | self._db.commit() |
|
232 | # self._db.commit() | |
222 |
|
233 | |||
223 | def get_record(self, msg_id): |
|
234 | def get_record(self, msg_id): | |
224 | """Get a specific Task Record, by msg_id.""" |
|
235 | """Get a specific Task Record, by msg_id.""" | |
@@ -240,19 +251,19 b' class SQLiteDB(BaseDB):' | |||||
240 | query += ', '.join(sets) |
|
251 | query += ', '.join(sets) | |
241 | query += ' WHERE msg_id == %r'%msg_id |
|
252 | query += ' WHERE msg_id == %r'%msg_id | |
242 | self._db.execute(query, values) |
|
253 | self._db.execute(query, values) | |
243 | self._db.commit() |
|
254 | # self._db.commit() | |
244 |
|
255 | |||
245 | def drop_record(self, msg_id): |
|
256 | def drop_record(self, msg_id): | |
246 | """Remove a record from the DB.""" |
|
257 | """Remove a record from the DB.""" | |
247 | self._db.execute("""DELETE FROM %s WHERE mgs_id==?"""%self.table, (msg_id,)) |
|
258 | self._db.execute("""DELETE FROM %s WHERE mgs_id==?"""%self.table, (msg_id,)) | |
248 | self._db.commit() |
|
259 | # self._db.commit() | |
249 |
|
260 | |||
250 | def drop_matching_records(self, check): |
|
261 | def drop_matching_records(self, check): | |
251 | """Remove a record from the DB.""" |
|
262 | """Remove a record from the DB.""" | |
252 | expr,args = self._render_expression(check) |
|
263 | expr,args = self._render_expression(check) | |
253 | query = "DELETE FROM %s WHERE %s"%(self.table, expr) |
|
264 | query = "DELETE FROM %s WHERE %s"%(self.table, expr) | |
254 | self._db.execute(query,args) |
|
265 | self._db.execute(query,args) | |
255 | self._db.commit() |
|
266 | # self._db.commit() | |
256 |
|
267 | |||
257 | def find_records(self, check, id_only=False): |
|
268 | def find_records(self, check, id_only=False): | |
258 | """Find records matching a query dict.""" |
|
269 | """Find records matching a query dict.""" |
@@ -28,7 +28,7 b' import time' | |||||
28 | from numpy import exp, zeros, newaxis, sqrt |
|
28 | from numpy import exp, zeros, newaxis, sqrt | |
29 |
|
29 | |||
30 | from IPython.external import argparse |
|
30 | from IPython.external import argparse | |
31 |
from IPython.parallel |
|
31 | from IPython.parallel import Client, Reference | |
32 |
|
32 | |||
33 | def setup_partitioner(index, num_procs, gnum_cells, parts): |
|
33 | def setup_partitioner(index, num_procs, gnum_cells, parts): | |
34 | """create a partitioner in the engine namespace""" |
|
34 | """create a partitioner in the engine namespace""" |
@@ -28,7 +28,7 b' import time' | |||||
28 | from numpy import exp, zeros, newaxis, sqrt |
|
28 | from numpy import exp, zeros, newaxis, sqrt | |
29 |
|
29 | |||
30 | from IPython.external import argparse |
|
30 | from IPython.external import argparse | |
31 |
from IPython.parallel |
|
31 | from IPython.parallel import Client, Reference | |
32 |
|
32 | |||
33 | def setup_partitioner(comm, addrs, index, num_procs, gnum_cells, parts): |
|
33 | def setup_partitioner(comm, addrs, index, num_procs, gnum_cells, parts): | |
34 | """create a partitioner in the engine namespace""" |
|
34 | """create a partitioner in the engine namespace""" |
General Comments 0
You need to be logged in to leave comments.
Login now