SGE test-related fixes...
MinRK
@@ -45,6 +45,30 @@ def _printer(*args, **kwargs):
     print (args)
     print (kwargs)
 
+def empty_record():
+    """Return an empty dict with all record keys."""
+    return {
+        'msg_id' : None,
+        'header' : None,
+        'content': None,
+        'buffers': None,
+        'submitted': None,
+        'client_uuid' : None,
+        'engine_uuid' : None,
+        'started': None,
+        'completed': None,
+        'resubmitted': None,
+        'result_header' : None,
+        'result_content' : None,
+        'result_buffers' : None,
+        'queue' : None,
+        'pyin' : None,
+        'pyout': None,
+        'pyerr': None,
+        'stdout': '',
+        'stderr': '',
+    }
+
 def init_record(msg):
     """Initialize a TaskRecord based on a request."""
     header = msg['header']
@@ -283,6 +307,7 @@ class Hub(LoggingFactory):
     tasks=Dict() # pending msg_ids submitted as tasks, keyed by client_id
     completed=Dict() # completed msg_ids keyed by engine_id
     all_completed=Set() # completed msg_ids keyed by engine_id
+    dead_engines=Set() # uuids of engines that have died
     # mia=None
     incoming_registrations=Dict()
     registration_timeout=Int()
@@ -531,9 +556,21 @@ class Hub(LoggingFactory):
         record['client_uuid'] = client_id
         record['queue'] = 'mux'
 
+        try:
+            # it's possible iopub arrived first:
+            existing = self.db.get_record(msg_id)
+            for key,evalue in existing.iteritems():
+                rvalue = record[key]
+                if evalue and rvalue and evalue != rvalue:
+                    self.log.error("conflicting initial state for record: %s:%s <> %s"%(msg_id, rvalue, evalue))
+                elif evalue and not rvalue:
+                    record[key] = evalue
+            self.db.update_record(msg_id, record)
+        except KeyError:
+            self.db.add_record(msg_id, record)
+
         self.pending.add(msg_id)
         self.queues[eid].append(msg_id)
-        self.db.add_record(msg_id, record)
 
     def save_queue_result(self, idents, msg):
         if len(idents) < 2:
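The try/except added above merges a freshly initialized record with any partial record that reached the database first (for example, IOPub output that arrived before the request). A minimal sketch of that merge on plain dicts, factored into a standalone helper for clarity (the helper name and sample values are illustrative, not part of this change):

    def merge_initial_record(existing, record):
        """Fold a pre-existing partial record into a freshly built one.

        Values already stored win when the new record has nothing for that
        key; genuine conflicts keep the new value and are reported back.
        """
        conflicts = []
        for key, evalue in existing.items():
            rvalue = record.get(key)
            if evalue and rvalue and evalue != rvalue:
                conflicts.append(key)
            elif evalue and not rvalue:
                record[key] = evalue
        return record, conflicts

    existing = {'msg_id': 'abc', 'stdout': 'early output\n', 'header': None}
    fresh = {'msg_id': 'abc', 'stdout': '', 'header': {'msg_id': 'abc'}}
    merged, conflicts = merge_initial_record(existing, fresh)
    assert merged['stdout'] == 'early output\n' and not conflicts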
@@ -551,7 +588,7 @@ class Hub(LoggingFactory):
         eid = self.by_ident.get(queue_id, None)
         if eid is None:
             self.log.error("queue::unknown engine %r is sending a reply: "%queue_id)
-            self.log.debug("queue:: %s"%msg[2:])
+            # self.log.debug("queue:: %s"%msg[2:])
             return
 
         parent = msg['parent_header']
@@ -604,7 +641,18 @@ class Hub(LoggingFactory):
         header = msg['header']
         msg_id = header['msg_id']
         self.pending.add(msg_id)
-        self.db.add_record(msg_id, record)
+        try:
+            # it's possible iopub arrived first:
+            existing = self.db.get_record(msg_id)
+            for key,evalue in existing.iteritems():
+                rvalue = record[key]
+                if evalue and rvalue and evalue != rvalue:
+                    self.log.error("conflicting initial state for record: %s:%s <> %s"%(msg_id, rvalue, evalue))
+                elif evalue and not rvalue:
+                    record[key] = evalue
+            self.db.update_record(msg_id, record)
+        except KeyError:
+            self.db.add_record(msg_id, record)
 
     def save_task_result(self, idents, msg):
         """save the result of a completed task."""
@@ -704,9 +752,10 @@ class Hub(LoggingFactory):
         # ensure msg_id is in db
         try:
             rec = self.db.get_record(msg_id)
-        except:
-            self.log.error("iopub::IOPub message has invalid parent", exc_info=True)
-            return
+        except KeyError:
+            rec = empty_record()
+            rec['msg_id'] = msg_id
+            self.db.add_record(msg_id, rec)
         # stream
         d = {}
         if msg_type == 'stream':
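Together with the merge above, the new except KeyError branch means stream output that beats its request into the Hub is kept rather than dropped. A rough end-to-end sketch of the out-of-order flow, with a plain dict standing in for the DB backend (function and variable names here are hypothetical):

    db = {}   # msg_id -> record, stand-in for DictDB/SQLiteDB

    def save_iopub(msg_id, text):
        rec = db.get(msg_id)
        if rec is None:                       # iopub beat the request: stub record
            rec = {'msg_id': msg_id, 'stdout': '', 'header': None}
            db[msg_id] = rec
        rec['stdout'] += text

    def save_request(msg_id, record):
        existing = db.get(msg_id)
        if existing is not None:              # same merge as in the hunks above
            for key, evalue in existing.items():
                if evalue and not record.get(key):
                    record[key] = evalue
        db[msg_id] = record

    save_iopub('abc', 'hello\n')              # arrives first
    save_request('abc', {'msg_id': 'abc', 'stdout': '', 'header': {'date': 'now'}})
    assert db['abc']['stdout'] == 'hello\n'   # early output survives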
@@ -734,7 +783,8 @@ class Hub(LoggingFactory):
         content.update(self.client_info)
         jsonable = {}
         for k,v in self.keytable.iteritems():
-            jsonable[str(k)] = v
+            if v not in self.dead_engines:
+                jsonable[str(k)] = v
         content['engines'] = jsonable
         self.session.send(self.query, 'connection_reply', content, parent=msg, ident=client_id)
 
@@ -812,15 +862,20 @@ class Hub(LoggingFactory):
             return
         self.log.info("registration::unregister_engine(%s)"%eid)
         # print (eid)
-        content=dict(id=eid, queue=self.engines[eid].queue)
-        self.ids.remove(eid)
-        uuid = self.keytable.pop(eid)
-        ec = self.engines.pop(eid)
-        self.hearts.pop(ec.heartbeat)
-        self.by_ident.pop(ec.queue)
-        self.completed.pop(eid)
-        self._handle_stranded_msgs(eid, uuid)
-        ############## TODO: HANDLE IT ################
+        uuid = self.keytable[eid]
+        content=dict(id=eid, queue=uuid)
+        self.dead_engines.add(uuid)
+        # self.ids.remove(eid)
+        # uuid = self.keytable.pop(eid)
+        #
+        # ec = self.engines.pop(eid)
+        # self.hearts.pop(ec.heartbeat)
+        # self.by_ident.pop(ec.queue)
+        # self.completed.pop(eid)
+        handleit = lambda : self._handle_stranded_msgs(eid, uuid)
+        dc = ioloop.DelayedCallback(handleit, self.registration_timeout, self.loop)
+        dc.start()
+        ############## TODO: HANDLE IT ################
 
         if self.notifier:
             self.session.send(self.notifier, "unregistration_notification", content=content)
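unregister_engine now only marks the engine's uuid as dead and defers the stranded-message cleanup by registration_timeout, which gives a reply already in flight a window to be recorded normally. The one-shot scheduling pattern in isolation, using the same pyzmq DelayedCallback API as the hunk above (delay is in milliseconds; the callback body is illustrative):

    from zmq.eventloop import ioloop

    loop = ioloop.IOLoop.instance()

    def cleanup():
        print('handling stranded msgs for the dead engine')
        loop.stop()                                   # only to end this demo

    dc = ioloop.DelayedCallback(cleanup, 3000, loop)  # fire once, 3 s from now
    dc.start()
    loop.start()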
@@ -833,7 +888,7 @@ class Hub(LoggingFactory):
         that the result failed and later receive the actual result.
         """
 
-        outstanding = self.queues.pop(eid)
+        outstanding = self.queues[eid]
 
         for msg_id in outstanding:
             self.pending.remove(msg_id)
@@ -904,7 +904,7 @@ class PBSEngineSetLauncher(PBSLauncher):
 
     def start(self, n, cluster_dir):
         """Start n engines by profile or cluster_dir."""
-        self.log.info('Starting %n engines with PBSEngineSetLauncher: %r' % (n, self.args))
+        self.log.info('Starting %i engines with PBSEngineSetLauncher: %r' % (n, self.args))
         return super(PBSEngineSetLauncher, self).start(n, cluster_dir)
 
 #SGE is very similar to PBS
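The launcher change is small but real: %n is not a printf-style conversion in Python, so the old log line raised instead of logging the moment engines were started. A quick illustration:

    n = 4
    print('Starting %i engines' % (n,))        # OK: 'Starting 4 engines'
    try:
        print('Starting %n engines' % (n,))    # ValueError: unsupported format character 'n'
    except ValueError as err:
        print(err)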
@@ -942,7 +942,7 @@ class SGEEngineSetLauncher(SGELauncher):
 
     def start(self, n, cluster_dir):
         """Start n engines by profile or cluster_dir."""
-        self.log.info('Starting %n engines with SGEEngineSetLauncher: %r' % (n, self.args))
+        self.log.info('Starting %i engines with SGEEngineSetLauncher: %r' % (n, self.args))
         return super(SGEEngineSetLauncher, self).start(n, cluster_dir)
 
 
@@ -13,6 +13,8 @@ from datetime import datetime
 
 import sqlite3
 
+from zmq.eventloop import ioloop
+
 from IPython.utils.traitlets import CUnicode, CStr, Instance, List
 from .dictdb import BaseDB
 from .util import ISO8601
@@ -114,6 +116,13 @@ class SQLiteDB(BaseDB):
         else:
             self.location = '.'
         self._init_db()
+
+        # register db commit as 2s periodic callback
+        # to prevent clogging pipes
+        # assumes we are being run in a zmq ioloop app
+        loop = ioloop.IOLoop.instance()
+        pc = ioloop.PeriodicCallback(self._db.commit, 2000, loop)
+        pc.start()
 
     def _defaults(self):
         """create an empty record"""
@@ -133,7 +142,9 @@ class SQLiteDB(BaseDB):
         sqlite3.register_converter('bufs', _convert_bufs)
         # connect to the db
         dbfile = os.path.join(self.location, self.filename)
-        self._db = sqlite3.connect(dbfile, detect_types=sqlite3.PARSE_DECLTYPES, cached_statements=16)
+        self._db = sqlite3.connect(dbfile, detect_types=sqlite3.PARSE_DECLTYPES,
+            # isolation_level = None)#,
+            cached_statements=64)
         # print dir(self._db)
 
         self._db.execute("""CREATE TABLE IF NOT EXISTS %s
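For context on the connect() call being rewrapped here: detect_types=sqlite3.PARSE_DECLTYPES is what makes registered converters (such as the 'bufs' one a few lines up) fire based on the declared column type. A self-contained illustration with a hypothetical 'json' column type:

    import json
    import sqlite3

    sqlite3.register_adapter(dict, json.dumps)
    sqlite3.register_converter('json', lambda b: json.loads(b.decode('utf-8')))

    conn = sqlite3.connect(':memory:', detect_types=sqlite3.PARSE_DECLTYPES)
    conn.execute("CREATE TABLE r (msg_id TEXT, header json)")
    conn.execute("INSERT INTO r VALUES (?, ?)", ('abc', {'date': 'now'}))
    print(conn.execute("SELECT header FROM r").fetchone()[0])   # comes back as a dict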
@@ -218,7 +229,7 @@ class SQLiteDB(BaseDB):
         line = self._dict_to_list(d)
         tups = '(%s)'%(','.join(['?']*len(line)))
         self._db.execute("INSERT INTO %s VALUES %s"%(self.table, tups), line)
-        self._db.commit()
+        # self._db.commit()
 
     def get_record(self, msg_id):
         """Get a specific Task Record, by msg_id."""
@@ -240,19 +251,19 @@ class SQLiteDB(BaseDB):
         query += ', '.join(sets)
         query += ' WHERE msg_id == %r'%msg_id
         self._db.execute(query, values)
-        self._db.commit()
+        # self._db.commit()
 
     def drop_record(self, msg_id):
         """Remove a record from the DB."""
         self._db.execute("""DELETE FROM %s WHERE mgs_id==?"""%self.table, (msg_id,))
-        self._db.commit()
+        # self._db.commit()
 
     def drop_matching_records(self, check):
         """Remove a record from the DB."""
         expr,args = self._render_expression(check)
         query = "DELETE FROM %s WHERE %s"%(self.table, expr)
         self._db.execute(query,args)
-        self._db.commit()
+        # self._db.commit()
 
     def find_records(self, check, id_only=False):
         """Find records matching a query dict."""
@@ -28,7 +28,7 @@ import time
 from numpy import exp, zeros, newaxis, sqrt
 
 from IPython.external import argparse
-from IPython.parallel.client import Client, Reference
+from IPython.parallel import Client, Reference
 
 def setup_partitioner(index, num_procs, gnum_cells, parts):
     """create a partitioner in the engine namespace"""
@@ -28,7 +28,7 @@ import time
 from numpy import exp, zeros, newaxis, sqrt
 
 from IPython.external import argparse
-from IPython.parallel.client import Client, Reference
+from IPython.parallel import Client, Reference
 
 def setup_partitioner(comm, addrs, index, num_procs, gnum_cells, parts):
     """create a partitioner in the engine namespace"""