SGE test related fixes...
MinRK
@@ -45,6 +45,30 @@ def _printer(*args, **kwargs):
45 45 print (args)
46 46 print (kwargs)
47 47
48 def empty_record():
49 """Return an empty dict with all record keys."""
50 return {
51 'msg_id' : None,
52 'header' : None,
53 'content': None,
54 'buffers': None,
55 'submitted': None,
56 'client_uuid' : None,
57 'engine_uuid' : None,
58 'started': None,
59 'completed': None,
60 'resubmitted': None,
61 'result_header' : None,
62 'result_content' : None,
63 'result_buffers' : None,
64 'queue' : None,
65 'pyin' : None,
66 'pyout': None,
67 'pyerr': None,
68 'stdout': '',
69 'stderr': '',
70 }
71
48 72 def init_record(msg):
49 73 """Initialize a TaskRecord based on a request."""
50 74 header = msg['header']
@@ -283,6 +307,7 @@ class Hub(LoggingFactory):
283 307 tasks=Dict() # pending msg_ids submitted as tasks, keyed by client_id
284 308 completed=Dict() # completed msg_ids keyed by engine_id
285 309 all_completed=Set() # completed msg_ids keyed by engine_id
310 dead_engines=Set() # uuids of engines that have died or unregistered
286 311 # mia=None
287 312 incoming_registrations=Dict()
288 313 registration_timeout=Int()
@@ -531,9 +556,21 @@ class Hub(LoggingFactory):
531 556 record['client_uuid'] = client_id
532 557 record['queue'] = 'mux'
533 558
559 try:
560 # it's possible iopub arrived first:
561 existing = self.db.get_record(msg_id)
562 for key,evalue in existing.iteritems():
563 rvalue = record[key]
564 if evalue and rvalue and evalue != rvalue:
565 self.log.error("conflicting initial state for record: %s:%s <> %s"%(msg_id, rvalue, evalue))
566 elif evalue and not rvalue:
567 record[key] = evalue
568 self.db.update_record(msg_id, record)
569 except KeyError:
570 self.db.add_record(msg_id, record)
571
534 572 self.pending.add(msg_id)
535 573 self.queues[eid].append(msg_id)
536 self.db.add_record(msg_id, record)
537 574
538 575 def save_queue_result(self, idents, msg):
539 576 if len(idents) < 2:
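
The new try/except above guards against out-of-order arrival: an IOPub message may already have created a record for this msg_id, so the Hub merges the initial request data into the existing record instead of blindly adding a new one, logging any genuine conflicts. The same merge is repeated in save_task_request further down. A minimal standalone sketch of that merge step (the merge_record helper name and the plain-dict records are illustrative assumptions; the Hub works directly against its db backend):

    import logging

    def merge_record(existing, record, msg_id, log=logging.getLogger("hub")):
        """Merge an already-stored record into a freshly built request record.

        Values only present in the stored record are kept; values present in
        both but different are reported as conflicts (the new value wins).
        """
        for key, evalue in existing.items():
            rvalue = record.get(key)
            if evalue and rvalue and evalue != rvalue:
                log.error("conflicting initial state for record: %s:%s <> %s",
                          msg_id, rvalue, evalue)
            elif evalue and not rvalue:
                record[key] = evalue
        return record

    # usage sketch: iopub output arrived before the request was saved
    existing = {'msg_id': 'abc', 'stdout': 'partial output\n', 'client_uuid': None}
    request = {'msg_id': 'abc', 'stdout': '', 'client_uuid': 'client-1'}
    merged = merge_record(existing, request, 'abc')
    assert merged['stdout'] == 'partial output\n'
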
@@ -551,7 +588,7 @@ class Hub(LoggingFactory):
551 588 eid = self.by_ident.get(queue_id, None)
552 589 if eid is None:
553 590 self.log.error("queue::unknown engine %r is sending a reply: "%queue_id)
554 self.log.debug("queue:: %s"%msg[2:])
591 # self.log.debug("queue:: %s"%msg[2:])
555 592 return
556 593
557 594 parent = msg['parent_header']
@@ -604,7 +641,18 @@ class Hub(LoggingFactory):
604 641 header = msg['header']
605 642 msg_id = header['msg_id']
606 643 self.pending.add(msg_id)
607 self.db.add_record(msg_id, record)
644 try:
645 # it's possible iopub arrived first:
646 existing = self.db.get_record(msg_id)
647 for key,evalue in existing.iteritems():
648 rvalue = record[key]
649 if evalue and rvalue and evalue != rvalue:
650 self.log.error("conflicting initial state for record: %s:%s <> %s"%(msg_id, rvalue, evalue))
651 elif evalue and not rvalue:
652 record[key] = evalue
653 self.db.update_record(msg_id, record)
654 except KeyError:
655 self.db.add_record(msg_id, record)
608 656
609 657 def save_task_result(self, idents, msg):
610 658 """save the result of a completed task."""
@@ -704,9 +752,10 @@ class Hub(LoggingFactory):
704 752 # ensure msg_id is in db
705 753 try:
706 754 rec = self.db.get_record(msg_id)
707 except:
708 self.log.error("iopub::IOPub message has invalid parent", exc_info=True)
709 return
755 except KeyError:
756 rec = empty_record()
757 rec['msg_id'] = msg_id
758 self.db.add_record(msg_id, rec)
710 759 # stream
711 760 d = {}
712 761 if msg_type == 'stream':
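
Conversely, when IOPub output shows up for a msg_id the Hub has never seen, the change above files it under a fresh placeholder built by empty_record() rather than dropping it. A rough sketch of that path for a 'stream' message, assuming the dict-based empty_record() defined earlier and a db object exposing get_record/add_record/update_record as in the diff (the handle_stream name is illustrative):

    def handle_stream(db, msg_id, content):
        """Append stream output to a task record, creating a stub if needed."""
        try:
            rec = db.get_record(msg_id)
        except KeyError:
            # iopub arrived before the request: start from an empty record
            rec = empty_record()
            rec['msg_id'] = msg_id
            db.add_record(msg_id, rec)
        name = content['name']                       # 'stdout' or 'stderr'
        update = {name: (rec[name] or '') + content['data']}
        db.update_record(msg_id, update)
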
@@ -734,7 +783,8 @@ class Hub(LoggingFactory):
734 783 content.update(self.client_info)
735 784 jsonable = {}
736 785 for k,v in self.keytable.iteritems():
737 jsonable[str(k)] = v
786 if v not in self.dead_engines:
787 jsonable[str(k)] = v
738 788 content['engines'] = jsonable
739 789 self.session.send(self.query, 'connection_reply', content, parent=msg, ident=client_id)
740 790
@@ -812,15 +862,20 @@ class Hub(LoggingFactory):
812 862 return
813 863 self.log.info("registration::unregister_engine(%s)"%eid)
814 864 # print (eid)
815 content=dict(id=eid, queue=self.engines[eid].queue)
816 self.ids.remove(eid)
817 uuid = self.keytable.pop(eid)
818 ec = self.engines.pop(eid)
819 self.hearts.pop(ec.heartbeat)
820 self.by_ident.pop(ec.queue)
821 self.completed.pop(eid)
822 self._handle_stranded_msgs(eid, uuid)
823 ############## TODO: HANDLE IT ################
865 uuid = self.keytable[eid]
866 content=dict(id=eid, queue=uuid)
867 self.dead_engines.add(uuid)
868 # self.ids.remove(eid)
869 # uuid = self.keytable.pop(eid)
870 #
871 # ec = self.engines.pop(eid)
872 # self.hearts.pop(ec.heartbeat)
873 # self.by_ident.pop(ec.queue)
874 # self.completed.pop(eid)
875 handleit = lambda : self._handle_stranded_msgs(eid, uuid)
876 dc = ioloop.DelayedCallback(handleit, self.registration_timeout, self.loop)
877 dc.start()
878 ############## TODO: HANDLE IT ################
824 879
825 880 if self.notifier:
826 881 self.session.send(self.notifier, "unregistration_notification", content=content)
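
Instead of tearing down the engine's bookkeeping immediately, the unregister path above now only marks the uuid dead and schedules the stranded-message cleanup to run one registration_timeout later, giving late replies a chance to arrive. A small sketch of that deferral pattern with the pyzmq-era ioloop API used in the diff (the cleanup and timeout_ms names are placeholders):

    from zmq.eventloop import ioloop

    def schedule_cleanup(cleanup, timeout_ms, loop=None):
        """Run `cleanup` once, `timeout_ms` milliseconds from now, on the ioloop."""
        loop = loop or ioloop.IOLoop.instance()
        dc = ioloop.DelayedCallback(cleanup, timeout_ms, loop)
        dc.start()
        return dc  # returning it lets the caller cancel via dc.stop() if needed

    # e.g. defer handling of messages stranded on a dead engine:
    # schedule_cleanup(lambda: hub._handle_stranded_msgs(eid, uuid),
    #                  hub.registration_timeout)

Newer tornado/pyzmq code would typically reach for loop.add_timeout (or call_later) for the same effect, but DelayedCallback matches what the change itself uses.
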
@@ -833,7 +888,7 @@ class Hub(LoggingFactory):
833 888 that the result failed and later receive the actual result.
834 889 """
835 890
836 outstanding = self.queues.pop(eid)
891 outstanding = self.queues[eid]
837 892
838 893 for msg_id in outstanding:
839 894 self.pending.remove(msg_id)
@@ -904,7 +904,7 @@ class PBSEngineSetLauncher(PBSLauncher):
904 904
905 905 def start(self, n, cluster_dir):
906 906 """Start n engines by profile or cluster_dir."""
907 self.log.info('Starting %n engines with PBSEngineSetLauncher: %r' % (n, self.args))
907 self.log.info('Starting %i engines with PBSEngineSetLauncher: %r' % (n, self.args))
908 908 return super(PBSEngineSetLauncher, self).start(n, cluster_dir)
909 909
910 910 #SGE is very similar to PBS
@@ -942,7 +942,7 @@ class SGEEngineSetLauncher(SGELauncher):
942 942
943 943 def start(self, n, cluster_dir):
944 944 """Start n engines by profile or cluster_dir."""
945 self.log.info('Starting %n engines with SGEEngineSetLauncher: %r' % (n, self.args))
945 self.log.info('Starting %i engines with SGEEngineSetLauncher: %r' % (n, self.args))
946 946 return super(SGEEngineSetLauncher, self).start(n, cluster_dir)
947 947
948 948
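
The two launcher fixes above replace '%n' with '%i': '%n' is not a valid conversion for Python %-formatting, so the old log.info call would raise a ValueError instead of reporting the engine count. A quick illustration (the empty list stands in for the launcher's self.args):

    n = 4
    try:
        msg = 'Starting %n engines with PBSEngineSetLauncher: %r' % (n, [])
    except ValueError as e:
        print('%%n is rejected: %s' % e)   # unsupported format character 'n'

    print('Starting %i engines with PBSEngineSetLauncher: %r' % (n, []))
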
@@ -13,6 +13,8 @@ from datetime import datetime
13 13
14 14 import sqlite3
15 15
16 from zmq.eventloop import ioloop
17
16 18 from IPython.utils.traitlets import CUnicode, CStr, Instance, List
17 19 from .dictdb import BaseDB
18 20 from .util import ISO8601
@@ -114,6 +116,13 @@ class SQLiteDB(BaseDB):
114 116 else:
115 117 self.location = '.'
116 118 self._init_db()
119
120 # register db commit as 2s periodic callback
121 # to prevent clogging pipes
122 # assumes we are being run in a zmq ioloop app
123 loop = ioloop.IOLoop.instance()
124 pc = ioloop.PeriodicCallback(self._db.commit, 2000, loop)
125 pc.start()
117 126
118 127 def _defaults(self):
119 128 """create an empty record"""
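
The SQLite changes in this file trade per-operation commits for a single commit every two seconds: the individual add/update/drop calls below no longer commit, and a PeriodicCallback flushes the connection on the zmq ioloop instead. A rough self-contained sketch of the same batching idea, mirroring the pyzmq-era PeriodicCallback(callback, ms, loop) signature used in the diff (table name and schema are made up for the example; it needs a running IOLoop, driven briefly here just for demonstration):

    import sqlite3
    import time
    from zmq.eventloop import ioloop

    db = sqlite3.connect(':memory:')
    db.execute("CREATE TABLE IF NOT EXISTS records (msg_id text PRIMARY KEY, stdout text)")

    def add_record(msg_id, stdout=''):
        # no commit here: the write stays in the open transaction
        db.execute("INSERT INTO records VALUES (?, ?)", (msg_id, stdout))

    loop = ioloop.IOLoop.instance()
    # flush at most once every 2 seconds instead of once per operation
    pc = ioloop.PeriodicCallback(db.commit, 2000, loop)
    pc.start()

    add_record('msg-1')
    add_record('msg-2')

    # let the loop run long enough for one commit, then exit
    loop.add_timeout(time.time() + 2.5, loop.stop)
    loop.start()
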
@@ -133,7 +142,9 @@ class SQLiteDB(BaseDB):
133 142 sqlite3.register_converter('bufs', _convert_bufs)
134 143 # connect to the db
135 144 dbfile = os.path.join(self.location, self.filename)
136 self._db = sqlite3.connect(dbfile, detect_types=sqlite3.PARSE_DECLTYPES, cached_statements=16)
145 self._db = sqlite3.connect(dbfile, detect_types=sqlite3.PARSE_DECLTYPES,
146 # isolation_level = None)#,
147 cached_statements=64)
137 148 # print dir(self._db)
138 149
139 150 self._db.execute("""CREATE TABLE IF NOT EXISTS %s
@@ -218,7 +229,7 @@ class SQLiteDB(BaseDB):
218 229 line = self._dict_to_list(d)
219 230 tups = '(%s)'%(','.join(['?']*len(line)))
220 231 self._db.execute("INSERT INTO %s VALUES %s"%(self.table, tups), line)
221 self._db.commit()
232 # self._db.commit()
222 233
223 234 def get_record(self, msg_id):
224 235 """Get a specific Task Record, by msg_id."""
@@ -240,19 +251,19 @@ class SQLiteDB(BaseDB):
240 251 query += ', '.join(sets)
241 252 query += ' WHERE msg_id == %r'%msg_id
242 253 self._db.execute(query, values)
243 self._db.commit()
254 # self._db.commit()
244 255
245 256 def drop_record(self, msg_id):
246 257 """Remove a record from the DB."""
247 258 self._db.execute("""DELETE FROM %s WHERE msg_id==?"""%self.table, (msg_id,))
248 self._db.commit()
259 # self._db.commit()
249 260
250 261 def drop_matching_records(self, check):
251 262 """Remove a record from the DB."""
252 263 expr,args = self._render_expression(check)
253 264 query = "DELETE FROM %s WHERE %s"%(self.table, expr)
254 265 self._db.execute(query,args)
255 self._db.commit()
266 # self._db.commit()
256 267
257 268 def find_records(self, check, id_only=False):
258 269 """Find records matching a query dict."""
@@ -28,7 +28,7 @@ import time
28 28 from numpy import exp, zeros, newaxis, sqrt
29 29
30 30 from IPython.external import argparse
31 from IPython.parallel.client import Client, Reference
31 from IPython.parallel import Client, Reference
32 32
33 33 def setup_partitioner(index, num_procs, gnum_cells, parts):
34 34 """create a partitioner in the engine namespace"""
@@ -28,7 +28,7 @@ import time
28 28 from numpy import exp, zeros, newaxis, sqrt
29 29
30 30 from IPython.external import argparse
31 from IPython.parallel.client import Client, Reference
31 from IPython.parallel import Client, Reference
32 32
33 33 def setup_partitioner(comm, addrs, index, num_procs, gnum_cells, parts):
34 34 """create a partitioner in the engine namespace"""
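
The example scripts now import Client and Reference from the top-level IPython.parallel package rather than the old IPython.parallel.client module. A brief usage sketch of the updated import (assumes a running ipcluster with engines available; the num_procs name and describe helper are illustrative):

    from IPython.parallel import Client, Reference

    rc = Client()                     # connect via the default profile's connection file
    view = rc[:]                      # DirectView on all engines
    view['num_procs'] = len(rc.ids)   # push a value into each engine's namespace

    def describe(n):
        return 'partitioning for %i processes' % n

    # the Reference resolves to the remote 'num_procs' on each engine
    print(view.apply_sync(describe, Reference('num_procs')))
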