Show More
@@ -45,6 +45,30 b' def _printer(*args, **kwargs):' | |||
|
45 | 45 | print (args) |
|
46 | 46 | print (kwargs) |
|
47 | 47 | |
|
48 | def empty_record(): | |
|
49 | """Return an empty dict with all record keys.""" | |
|
50 | return { | |
|
51 | 'msg_id' : None, | |
|
52 | 'header' : None, | |
|
53 | 'content': None, | |
|
54 | 'buffers': None, | |
|
55 | 'submitted': None, | |
|
56 | 'client_uuid' : None, | |
|
57 | 'engine_uuid' : None, | |
|
58 | 'started': None, | |
|
59 | 'completed': None, | |
|
60 | 'resubmitted': None, | |
|
61 | 'result_header' : None, | |
|
62 | 'result_content' : None, | |
|
63 | 'result_buffers' : None, | |
|
64 | 'queue' : None, | |
|
65 | 'pyin' : None, | |
|
66 | 'pyout': None, | |
|
67 | 'pyerr': None, | |
|
68 | 'stdout': '', | |
|
69 | 'stderr': '', | |
|
70 | } | |
|
71 | ||
|
48 | 72 | def init_record(msg): |
|
49 | 73 | """Initialize a TaskRecord based on a request.""" |
|
50 | 74 | header = msg['header'] |
@@ -283,6 +307,7 b' class Hub(LoggingFactory):' | |||
|
283 | 307 | tasks=Dict() # pending msg_ids submitted as tasks, keyed by client_id |
|
284 | 308 | completed=Dict() # completed msg_ids keyed by engine_id |
|
285 | 309 | all_completed=Set() # completed msg_ids keyed by engine_id |
|
310 | dead_engines=Set() # completed msg_ids keyed by engine_id | |
|
286 | 311 | # mia=None |
|
287 | 312 | incoming_registrations=Dict() |
|
288 | 313 | registration_timeout=Int() |
@@ -531,9 +556,21 b' class Hub(LoggingFactory):' | |||
|
531 | 556 | record['client_uuid'] = client_id |
|
532 | 557 | record['queue'] = 'mux' |
|
533 | 558 | |
|
559 | try: | |
|
560 | # it's posible iopub arrived first: | |
|
561 | existing = self.db.get_record(msg_id) | |
|
562 | for key,evalue in existing.iteritems(): | |
|
563 | rvalue = record[key] | |
|
564 | if evalue and rvalue and evalue != rvalue: | |
|
565 | self.log.error("conflicting initial state for record: %s:%s <> %s"%(msg_id, rvalue, evalue)) | |
|
566 | elif evalue and not rvalue: | |
|
567 | record[key] = evalue | |
|
568 | self.db.update_record(msg_id, record) | |
|
569 | except KeyError: | |
|
570 | self.db.add_record(msg_id, record) | |
|
571 | ||
|
534 | 572 | self.pending.add(msg_id) |
|
535 | 573 | self.queues[eid].append(msg_id) |
|
536 | self.db.add_record(msg_id, record) | |
|
537 | 574 | |
|
538 | 575 | def save_queue_result(self, idents, msg): |
|
539 | 576 | if len(idents) < 2: |
@@ -551,7 +588,7 b' class Hub(LoggingFactory):' | |||
|
551 | 588 | eid = self.by_ident.get(queue_id, None) |
|
552 | 589 | if eid is None: |
|
553 | 590 | self.log.error("queue::unknown engine %r is sending a reply: "%queue_id) |
|
554 | self.log.debug("queue:: %s"%msg[2:]) | |
|
591 | # self.log.debug("queue:: %s"%msg[2:]) | |
|
555 | 592 | return |
|
556 | 593 | |
|
557 | 594 | parent = msg['parent_header'] |
@@ -604,7 +641,18 b' class Hub(LoggingFactory):' | |||
|
604 | 641 | header = msg['header'] |
|
605 | 642 | msg_id = header['msg_id'] |
|
606 | 643 | self.pending.add(msg_id) |
|
607 | self.db.add_record(msg_id, record) | |
|
644 | try: | |
|
645 | # it's posible iopub arrived first: | |
|
646 | existing = self.db.get_record(msg_id) | |
|
647 | for key,evalue in existing.iteritems(): | |
|
648 | rvalue = record[key] | |
|
649 | if evalue and rvalue and evalue != rvalue: | |
|
650 | self.log.error("conflicting initial state for record: %s:%s <> %s"%(msg_id, rvalue, evalue)) | |
|
651 | elif evalue and not rvalue: | |
|
652 | record[key] = evalue | |
|
653 | self.db.update_record(msg_id, record) | |
|
654 | except KeyError: | |
|
655 | self.db.add_record(msg_id, record) | |
|
608 | 656 | |
|
609 | 657 | def save_task_result(self, idents, msg): |
|
610 | 658 | """save the result of a completed task.""" |
@@ -704,9 +752,10 b' class Hub(LoggingFactory):' | |||
|
704 | 752 | # ensure msg_id is in db |
|
705 | 753 | try: |
|
706 | 754 | rec = self.db.get_record(msg_id) |
|
707 | except: | |
|
708 | self.log.error("iopub::IOPub message has invalid parent", exc_info=True) | |
|
709 | return | |
|
755 | except KeyError: | |
|
756 | rec = empty_record() | |
|
757 | rec['msg_id'] = msg_id | |
|
758 | self.db.add_record(msg_id, rec) | |
|
710 | 759 | # stream |
|
711 | 760 | d = {} |
|
712 | 761 | if msg_type == 'stream': |
@@ -734,7 +783,8 b' class Hub(LoggingFactory):' | |||
|
734 | 783 | content.update(self.client_info) |
|
735 | 784 | jsonable = {} |
|
736 | 785 | for k,v in self.keytable.iteritems(): |
|
737 | jsonable[str(k)] = v | |
|
786 | if v not in self.dead_engines: | |
|
787 | jsonable[str(k)] = v | |
|
738 | 788 | content['engines'] = jsonable |
|
739 | 789 | self.session.send(self.query, 'connection_reply', content, parent=msg, ident=client_id) |
|
740 | 790 | |
@@ -812,15 +862,20 b' class Hub(LoggingFactory):' | |||
|
812 | 862 | return |
|
813 | 863 | self.log.info("registration::unregister_engine(%s)"%eid) |
|
814 | 864 | # print (eid) |
|
815 | content=dict(id=eid, queue=self.engines[eid].queue) | |
|
816 | self.ids.remove(eid) | |
|
817 | uuid = self.keytable.pop(eid) | |
|
818 |
|
|
|
819 | self.hearts.pop(ec.heartbeat) | |
|
820 | self.by_ident.pop(ec.queue) | |
|
821 |
|
|
|
822 | self._handle_stranded_msgs(eid, uuid) | |
|
823 | ############## TODO: HANDLE IT ################ | |
|
865 | uuid = self.keytable[eid] | |
|
866 | content=dict(id=eid, queue=uuid) | |
|
867 | self.dead_engines.add(uuid) | |
|
868 | # self.ids.remove(eid) | |
|
869 | # uuid = self.keytable.pop(eid) | |
|
870 | # | |
|
871 | # ec = self.engines.pop(eid) | |
|
872 | # self.hearts.pop(ec.heartbeat) | |
|
873 | # self.by_ident.pop(ec.queue) | |
|
874 | # self.completed.pop(eid) | |
|
875 | handleit = lambda : self._handle_stranded_msgs(eid, uuid) | |
|
876 | dc = ioloop.DelayedCallback(handleit, self.registration_timeout, self.loop) | |
|
877 | dc.start() | |
|
878 | ############## TODO: HANDLE IT ################ | |
|
824 | 879 | |
|
825 | 880 | if self.notifier: |
|
826 | 881 | self.session.send(self.notifier, "unregistration_notification", content=content) |
@@ -833,7 +888,7 b' class Hub(LoggingFactory):' | |||
|
833 | 888 | that the result failed and later receive the actual result. |
|
834 | 889 | """ |
|
835 | 890 | |
|
836 |
outstanding = self.queues |
|
|
891 | outstanding = self.queues[eid] | |
|
837 | 892 | |
|
838 | 893 | for msg_id in outstanding: |
|
839 | 894 | self.pending.remove(msg_id) |
@@ -904,7 +904,7 b' class PBSEngineSetLauncher(PBSLauncher):' | |||
|
904 | 904 | |
|
905 | 905 | def start(self, n, cluster_dir): |
|
906 | 906 | """Start n engines by profile or cluster_dir.""" |
|
907 |
self.log.info('Starting % |
|
|
907 | self.log.info('Starting %i engines with PBSEngineSetLauncher: %r' % (n, self.args)) | |
|
908 | 908 | return super(PBSEngineSetLauncher, self).start(n, cluster_dir) |
|
909 | 909 | |
|
910 | 910 | #SGE is very similar to PBS |
@@ -942,7 +942,7 b' class SGEEngineSetLauncher(SGELauncher):' | |||
|
942 | 942 | |
|
943 | 943 | def start(self, n, cluster_dir): |
|
944 | 944 | """Start n engines by profile or cluster_dir.""" |
|
945 |
self.log.info('Starting % |
|
|
945 | self.log.info('Starting %i engines with SGEEngineSetLauncher: %r' % (n, self.args)) | |
|
946 | 946 | return super(SGEEngineSetLauncher, self).start(n, cluster_dir) |
|
947 | 947 | |
|
948 | 948 |
@@ -13,6 +13,8 b' from datetime import datetime' | |||
|
13 | 13 | |
|
14 | 14 | import sqlite3 |
|
15 | 15 | |
|
16 | from zmq.eventloop import ioloop | |
|
17 | ||
|
16 | 18 | from IPython.utils.traitlets import CUnicode, CStr, Instance, List |
|
17 | 19 | from .dictdb import BaseDB |
|
18 | 20 | from .util import ISO8601 |
@@ -114,6 +116,13 b' class SQLiteDB(BaseDB):' | |||
|
114 | 116 | else: |
|
115 | 117 | self.location = '.' |
|
116 | 118 | self._init_db() |
|
119 | ||
|
120 | # register db commit as 2s periodic callback | |
|
121 | # to prevent clogging pipes | |
|
122 | # assumes we are being run in a zmq ioloop app | |
|
123 | loop = ioloop.IOLoop.instance() | |
|
124 | pc = ioloop.PeriodicCallback(self._db.commit, 2000, loop) | |
|
125 | pc.start() | |
|
117 | 126 | |
|
118 | 127 | def _defaults(self): |
|
119 | 128 | """create an empty record""" |
@@ -133,7 +142,9 b' class SQLiteDB(BaseDB):' | |||
|
133 | 142 | sqlite3.register_converter('bufs', _convert_bufs) |
|
134 | 143 | # connect to the db |
|
135 | 144 | dbfile = os.path.join(self.location, self.filename) |
|
136 |
self._db = sqlite3.connect(dbfile, detect_types=sqlite3.PARSE_DECLTYPES, |
|
|
145 | self._db = sqlite3.connect(dbfile, detect_types=sqlite3.PARSE_DECLTYPES, | |
|
146 | # isolation_level = None)#, | |
|
147 | cached_statements=64) | |
|
137 | 148 | # print dir(self._db) |
|
138 | 149 | |
|
139 | 150 | self._db.execute("""CREATE TABLE IF NOT EXISTS %s |
@@ -218,7 +229,7 b' class SQLiteDB(BaseDB):' | |||
|
218 | 229 | line = self._dict_to_list(d) |
|
219 | 230 | tups = '(%s)'%(','.join(['?']*len(line))) |
|
220 | 231 | self._db.execute("INSERT INTO %s VALUES %s"%(self.table, tups), line) |
|
221 | self._db.commit() | |
|
232 | # self._db.commit() | |
|
222 | 233 | |
|
223 | 234 | def get_record(self, msg_id): |
|
224 | 235 | """Get a specific Task Record, by msg_id.""" |
@@ -240,19 +251,19 b' class SQLiteDB(BaseDB):' | |||
|
240 | 251 | query += ', '.join(sets) |
|
241 | 252 | query += ' WHERE msg_id == %r'%msg_id |
|
242 | 253 | self._db.execute(query, values) |
|
243 | self._db.commit() | |
|
254 | # self._db.commit() | |
|
244 | 255 | |
|
245 | 256 | def drop_record(self, msg_id): |
|
246 | 257 | """Remove a record from the DB.""" |
|
247 | 258 | self._db.execute("""DELETE FROM %s WHERE mgs_id==?"""%self.table, (msg_id,)) |
|
248 | self._db.commit() | |
|
259 | # self._db.commit() | |
|
249 | 260 | |
|
250 | 261 | def drop_matching_records(self, check): |
|
251 | 262 | """Remove a record from the DB.""" |
|
252 | 263 | expr,args = self._render_expression(check) |
|
253 | 264 | query = "DELETE FROM %s WHERE %s"%(self.table, expr) |
|
254 | 265 | self._db.execute(query,args) |
|
255 | self._db.commit() | |
|
266 | # self._db.commit() | |
|
256 | 267 | |
|
257 | 268 | def find_records(self, check, id_only=False): |
|
258 | 269 | """Find records matching a query dict.""" |
@@ -28,7 +28,7 b' import time' | |||
|
28 | 28 | from numpy import exp, zeros, newaxis, sqrt |
|
29 | 29 | |
|
30 | 30 | from IPython.external import argparse |
|
31 |
from IPython.parallel |
|
|
31 | from IPython.parallel import Client, Reference | |
|
32 | 32 | |
|
33 | 33 | def setup_partitioner(index, num_procs, gnum_cells, parts): |
|
34 | 34 | """create a partitioner in the engine namespace""" |
@@ -28,7 +28,7 b' import time' | |||
|
28 | 28 | from numpy import exp, zeros, newaxis, sqrt |
|
29 | 29 | |
|
30 | 30 | from IPython.external import argparse |
|
31 |
from IPython.parallel |
|
|
31 | from IPython.parallel import Client, Reference | |
|
32 | 32 | |
|
33 | 33 | def setup_partitioner(comm, addrs, index, num_procs, gnum_cells, parts): |
|
34 | 34 | """create a partitioner in the engine namespace""" |
General Comments 0
You need to be logged in to leave comments.
Login now