use 'timestamp' datatype for timestamps in sqlitedb
MinRK -
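
The gist of the change: the sqlite columns that hold datetimes are now declared with sqlite3's built-in 'timestamp' type instead of a custom 'datetime' declared type, so the hand-rolled _adapt_datetime/_convert_datetime pair can be dropped, and a new _check_table guard moves to a fresh table name if an existing database still uses the old column types. As a minimal sketch (not part of the diff) of why the built-in type is enough once the connection is opened with detect_types=sqlite3.PARSE_DECLTYPES, as _init_db already does:

    import sqlite3
    from datetime import datetime

    # sqlite3 ships default adapters/converters for datetime objects via the
    # 'timestamp' declared type, so no custom registration is needed
    # (still true today, though deprecated in very recent Pythons)
    con = sqlite3.connect(':memory:', detect_types=sqlite3.PARSE_DECLTYPES)
    con.execute("CREATE TABLE t (submitted timestamp)")
    con.execute("INSERT INTO t VALUES (?)", (datetime.now(),))
    row = con.execute("SELECT submitted FROM t").fetchone()
    assert isinstance(row[0], datetime)  # round-trips as a datetime object
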
dictdb.py
@@ -1,180 +1,181 @@
1 """A Task logger that presents our DB interface,
1 """A Task logger that presents our DB interface,
2 but exists entirely in memory and implemented with dicts.
2 but exists entirely in memory and implemented with dicts.
3
3
4 TaskRecords are dicts of the form:
4 TaskRecords are dicts of the form:
5 {
5 {
6 'msg_id' : str(uuid),
6 'msg_id' : str(uuid),
7 'client_uuid' : str(uuid),
7 'client_uuid' : str(uuid),
8 'engine_uuid' : str(uuid) or None,
8 'engine_uuid' : str(uuid) or None,
9 'header' : dict(header),
9 'header' : dict(header),
10 'content': dict(content),
10 'content': dict(content),
11 'buffers': list(buffers),
11 'buffers': list(buffers),
12 'submitted': datetime,
12 'submitted': datetime,
13 'started': datetime or None,
13 'started': datetime or None,
14 'completed': datetime or None,
14 'completed': datetime or None,
15 'resubmitted': datetime or None,
15 'resubmitted': datetime or None,
16 'result_header' : dict(header) or None,
16 'result_header' : dict(header) or None,
17 'result_content' : dict(content) or None,
17 'result_content' : dict(content) or None,
18 'result_buffers' : list(buffers) or None,
18 'result_buffers' : list(buffers) or None,
19 }
19 }
20 With this info, many of the special categories of tasks can be defined by query:
20 With this info, many of the special categories of tasks can be defined by query:
21
21
22 pending: completed is None
22 pending: completed is None
23 client's outstanding: client_uuid = uuid && completed is None
23 client's outstanding: client_uuid = uuid && completed is None
24 MIA: arrived is None (and completed is None)
24 MIA: arrived is None (and completed is None)
25 etc.
25 etc.
26
26
27 EngineRecords are dicts of the form:
27 EngineRecords are dicts of the form:
28 {
28 {
29 'eid' : int(id),
29 'eid' : int(id),
30 'uuid': str(uuid)
30 'uuid': str(uuid)
31 }
31 }
32 This may be extended, but is currently.
32 This may be extended, but is currently.
33
33
34 We support a subset of mongodb operators:
34 We support a subset of mongodb operators:
35 $lt,$gt,$lte,$gte,$ne,$in,$nin,$all,$mod,$exists
35 $lt,$gt,$lte,$gte,$ne,$in,$nin,$all,$mod,$exists
36 """
36 """
37 #-----------------------------------------------------------------------------
37 #-----------------------------------------------------------------------------
38 # Copyright (C) 2010 The IPython Development Team
38 # Copyright (C) 2010 The IPython Development Team
39 #
39 #
40 # Distributed under the terms of the BSD License. The full license is in
40 # Distributed under the terms of the BSD License. The full license is in
41 # the file COPYING, distributed as part of this software.
41 # the file COPYING, distributed as part of this software.
42 #-----------------------------------------------------------------------------
42 #-----------------------------------------------------------------------------
43
43
44
44
45 from datetime import datetime
45 from datetime import datetime
46
46
47 from IPython.config.configurable import Configurable
47 from IPython.config.configurable import Configurable
48
48
49 from IPython.utils.traitlets import Dict, Unicode
49 from IPython.utils.traitlets import Dict, Unicode, Instance
50
50
51 filters = {
51 filters = {
52 '$lt' : lambda a,b: a < b,
52 '$lt' : lambda a,b: a < b,
     '$gt' : lambda a,b: a > b,
     '$eq' : lambda a,b: a == b,
     '$ne' : lambda a,b: a != b,
     '$lte': lambda a,b: a <= b,
     '$gte': lambda a,b: a >= b,
     '$in' : lambda a,b: a in b,
     '$nin': lambda a,b: a not in b,
     '$all': lambda a,b: all([ a in bb for bb in b ]),
     '$mod': lambda a,b: a%b[0] == b[1],
     '$exists' : lambda a,b: (b and a is not None) or (a is None and not b)
 }
 
 
 class CompositeFilter(object):
     """Composite filter for matching multiple properties."""
 
     def __init__(self, dikt):
         self.tests = []
         self.values = []
         for key, value in dikt.iteritems():
             self.tests.append(filters[key])
             self.values.append(value)
 
     def __call__(self, value):
         for test,check in zip(self.tests, self.values):
             if not test(value, check):
                 return False
         return True
 
 class BaseDB(Configurable):
     """Empty Parent class so traitlets work on DB."""
     # base configurable traits:
     session = Unicode("")
+    log = Instance('logging.Logger', ('root',))
 
 class DictDB(BaseDB):
     """Basic in-memory dict-based object for saving Task Records.
 
     This is the first object to present the DB interface
     for logging tasks out of memory.
 
     The interface is based on MongoDB, so adding a MongoDB
     backend should be straightforward.
     """
 
     _records = Dict()
 
     def _match_one(self, rec, tests):
         """Check if a specific record matches tests."""
         for key,test in tests.iteritems():
             if not test(rec.get(key, None)):
                 return False
         return True
 
     def _match(self, check):
         """Find all the matches for a check dict."""
         matches = []
         tests = {}
         for k,v in check.iteritems():
             if isinstance(v, dict):
                 tests[k] = CompositeFilter(v)
             else:
                 tests[k] = lambda o: o==v
 
         for rec in self._records.itervalues():
             if self._match_one(rec, tests):
                 matches.append(rec)
         return matches
 
     def _extract_subdict(self, rec, keys):
         """extract subdict of keys"""
         d = {}
         d['msg_id'] = rec['msg_id']
         for key in keys:
             d[key] = rec[key]
         return d
 
     def add_record(self, msg_id, rec):
         """Add a new Task Record, by msg_id."""
         if self._records.has_key(msg_id):
             raise KeyError("Already have msg_id %r"%(msg_id))
         self._records[msg_id] = rec
 
     def get_record(self, msg_id):
         """Get a specific Task Record, by msg_id."""
         if not self._records.has_key(msg_id):
             raise KeyError("No such msg_id %r"%(msg_id))
         return self._records[msg_id]
 
     def update_record(self, msg_id, rec):
         """Update the data in an existing record."""
         self._records[msg_id].update(rec)
 
     def drop_matching_records(self, check):
         """Remove a record from the DB."""
         matches = self._match(check)
         for m in matches:
             del self._records[m['msg_id']]
 
     def drop_record(self, msg_id):
         """Remove a record from the DB."""
         del self._records[msg_id]
 
 
     def find_records(self, check, keys=None):
         """Find records matching a query dict, optionally extracting subset of keys.
 
         Returns dict keyed by msg_id of matching records.
 
         Parameters
         ----------
 
         check: dict
             mongodb-style query argument
         keys: list of strs [optional]
             if specified, the subset of keys to extract. msg_id will *always* be
             included.
         """
         matches = self._match(check)
         if keys:
             return [ self._extract_subdict(rec, keys) for rec in matches ]
         else:
             return matches
 
 
     def get_history(self):
         """get all msg_ids, ordered by time submitted."""
         msg_ids = self._records.keys()
         return sorted(msg_ids, key=lambda m: self._records[m]['submitted'])
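
A hypothetical usage sketch (not part of the commit) of the DictDB query interface shown above; the record contents and msg_ids are illustrative:

    from datetime import datetime, timedelta

    db = DictDB()
    db.add_record('abc123', {'msg_id': 'abc123', 'client_uuid': 'c1',
                             'submitted': datetime.now(), 'completed': None})

    # pending tasks, as described in the module docstring: completed is None
    pending = db.find_records({'completed': None})

    # tasks submitted in the last hour, extracting only the client_uuid column
    cutoff = datetime.now() - timedelta(hours=1)
    recent = db.find_records({'submitted': {'$gte': cutoff}}, keys=['client_uuid'])
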
sqlitedb.py
@@ -1,339 +1,386 @@
1 """A TaskRecord backend using sqlite3"""
1 """A TaskRecord backend using sqlite3"""
2 #-----------------------------------------------------------------------------
2 #-----------------------------------------------------------------------------
3 # Copyright (C) 2011 The IPython Development Team
3 # Copyright (C) 2011 The IPython Development Team
4 #
4 #
5 # Distributed under the terms of the BSD License. The full license is in
5 # Distributed under the terms of the BSD License. The full license is in
6 # the file COPYING, distributed as part of this software.
6 # the file COPYING, distributed as part of this software.
7 #-----------------------------------------------------------------------------
7 #-----------------------------------------------------------------------------
8
8
9 import json
9 import json
10 import os
10 import os
11 import cPickle as pickle
11 import cPickle as pickle
12 from datetime import datetime
12 from datetime import datetime
13
13
14 import sqlite3
14 import sqlite3
15
15
16 from zmq.eventloop import ioloop
16 from zmq.eventloop import ioloop
17
17
18 from IPython.utils.traitlets import Unicode, Instance, List
18 from IPython.utils.traitlets import Unicode, Instance, List, Dict
19 from .dictdb import BaseDB
19 from .dictdb import BaseDB
20 from IPython.utils.jsonutil import date_default, extract_dates
20 from IPython.utils.jsonutil import date_default, extract_dates, squash_dates
21
21
22 #-----------------------------------------------------------------------------
22 #-----------------------------------------------------------------------------
23 # SQLite operators, adapters, and converters
23 # SQLite operators, adapters, and converters
24 #-----------------------------------------------------------------------------
24 #-----------------------------------------------------------------------------
25
25
26 operators = {
26 operators = {
27 '$lt' : "<",
27 '$lt' : "<",
28 '$gt' : ">",
28 '$gt' : ">",
29 # null is handled weird with ==,!=
29 # null is handled weird with ==,!=
30 '$eq' : "=",
30 '$eq' : "=",
31 '$ne' : "!=",
31 '$ne' : "!=",
32 '$lte': "<=",
32 '$lte': "<=",
33 '$gte': ">=",
33 '$gte': ">=",
34 '$in' : ('=', ' OR '),
34 '$in' : ('=', ' OR '),
35 '$nin': ('!=', ' AND '),
35 '$nin': ('!=', ' AND '),
36 # '$all': None,
36 # '$all': None,
37 # '$mod': None,
37 # '$mod': None,
38 # '$exists' : None
38 # '$exists' : None
39 }
39 }
40 null_operators = {
40 null_operators = {
41 '=' : "IS NULL",
41 '=' : "IS NULL",
42 '!=' : "IS NOT NULL",
42 '!=' : "IS NOT NULL",
43 }
43 }
44
44
45 def _adapt_datetime(dt):
46 return dt.strftime(ISO8601)
47
48 def _convert_datetime(ds):
49 if ds is None:
50 return ds
51 else:
52 return datetime.strptime(ds, ISO8601)
53
54 def _adapt_dict(d):
45 def _adapt_dict(d):
55 return json.dumps(d, default=date_default)
46 return json.dumps(d, default=date_default)
56
47
57 def _convert_dict(ds):
48 def _convert_dict(ds):
58 if ds is None:
49 if ds is None:
59 return ds
50 return ds
60 else:
51 else:
61 return extract_dates(json.loads(ds))
52 return extract_dates(json.loads(ds))
62
53
63 def _adapt_bufs(bufs):
54 def _adapt_bufs(bufs):
64 # this is *horrible*
55 # this is *horrible*
65 # copy buffers into single list and pickle it:
56 # copy buffers into single list and pickle it:
66 if bufs and isinstance(bufs[0], (bytes, buffer)):
57 if bufs and isinstance(bufs[0], (bytes, buffer)):
67 return sqlite3.Binary(pickle.dumps(map(bytes, bufs),-1))
58 return sqlite3.Binary(pickle.dumps(map(bytes, bufs),-1))
68 elif bufs:
59 elif bufs:
69 return bufs
60 return bufs
70 else:
61 else:
71 return None
62 return None
72
63
73 def _convert_bufs(bs):
64 def _convert_bufs(bs):
74 if bs is None:
65 if bs is None:
75 return []
66 return []
76 else:
67 else:
77 return pickle.loads(bytes(bs))
68 return pickle.loads(bytes(bs))
78
69
79 #-----------------------------------------------------------------------------
70 #-----------------------------------------------------------------------------
80 # SQLiteDB class
71 # SQLiteDB class
81 #-----------------------------------------------------------------------------
72 #-----------------------------------------------------------------------------
82
73
83 class SQLiteDB(BaseDB):
74 class SQLiteDB(BaseDB):
84 """SQLite3 TaskRecord backend."""
75 """SQLite3 TaskRecord backend."""
85
76
86 filename = Unicode('tasks.db', config=True,
77 filename = Unicode('tasks.db', config=True,
87 help="""The filename of the sqlite task database. [default: 'tasks.db']""")
78 help="""The filename of the sqlite task database. [default: 'tasks.db']""")
88 location = Unicode('', config=True,
79 location = Unicode('', config=True,
89 help="""The directory containing the sqlite task database. The default
80 help="""The directory containing the sqlite task database. The default
90 is to use the cluster_dir location.""")
81 is to use the cluster_dir location.""")
91 table = Unicode("", config=True,
82 table = Unicode("", config=True,
92 help="""The SQLite Table to use for storing tasks for this session. If unspecified,
83 help="""The SQLite Table to use for storing tasks for this session. If unspecified,
93 a new table will be created with the Hub's IDENT. Specifying the table will result
84 a new table will be created with the Hub's IDENT. Specifying the table will result
94 in tasks from previous sessions being available via Clients' db_query and
85 in tasks from previous sessions being available via Clients' db_query and
95 get_result methods.""")
86 get_result methods.""")
96
87
97 _db = Instance('sqlite3.Connection')
88 _db = Instance('sqlite3.Connection')
89 # the ordered list of column names
98 _keys = List(['msg_id' ,
90 _keys = List(['msg_id' ,
99 'header' ,
91 'header' ,
100 'content',
92 'content',
101 'buffers',
93 'buffers',
102 'submitted',
94 'submitted',
103 'client_uuid' ,
95 'client_uuid' ,
104 'engine_uuid' ,
96 'engine_uuid' ,
105 'started',
97 'started',
106 'completed',
98 'completed',
107 'resubmitted',
99 'resubmitted',
108 'result_header' ,
100 'result_header' ,
109 'result_content' ,
101 'result_content' ,
110 'result_buffers' ,
102 'result_buffers' ,
111 'queue' ,
103 'queue' ,
112 'pyin' ,
104 'pyin' ,
113 'pyout',
105 'pyout',
114 'pyerr',
106 'pyerr',
115 'stdout',
107 'stdout',
116 'stderr',
108 'stderr',
117 ])
109 ])
110 # sqlite datatypes for checking that db is current format
111 _types = Dict({'msg_id' : 'text' ,
112 'header' : 'dict text',
113 'content' : 'dict text',
114 'buffers' : 'bufs blob',
115 'submitted' : 'timestamp',
116 'client_uuid' : 'text',
117 'engine_uuid' : 'text',
118 'started' : 'timestamp',
119 'completed' : 'timestamp',
120 'resubmitted' : 'timestamp',
121 'result_header' : 'dict text',
122 'result_content' : 'dict text',
123 'result_buffers' : 'bufs blob',
124 'queue' : 'text',
125 'pyin' : 'text',
126 'pyout' : 'text',
127 'pyerr' : 'text',
128 'stdout' : 'text',
129 'stderr' : 'text',
130 })
118
131
119 def __init__(self, **kwargs):
132 def __init__(self, **kwargs):
120 super(SQLiteDB, self).__init__(**kwargs)
133 super(SQLiteDB, self).__init__(**kwargs)
121 if not self.table:
134 if not self.table:
122 # use session, and prefix _, since starting with # is illegal
135 # use session, and prefix _, since starting with # is illegal
123 self.table = '_'+self.session.replace('-','_')
136 self.table = '_'+self.session.replace('-','_')
124 if not self.location:
137 if not self.location:
125 # get current profile
138 # get current profile
126 from IPython.core.newapplication import BaseIPythonApplication
139 from IPython.core.newapplication import BaseIPythonApplication
127 if BaseIPythonApplication.initialized():
140 if BaseIPythonApplication.initialized():
128 app = BaseIPythonApplication.instance()
141 app = BaseIPythonApplication.instance()
129 if app.profile_dir is not None:
142 if app.profile_dir is not None:
130 self.location = app.profile_dir.location
143 self.location = app.profile_dir.location
131 else:
144 else:
132 self.location = u'.'
145 self.location = u'.'
133 else:
146 else:
134 self.location = u'.'
147 self.location = u'.'
135 self._init_db()
148 self._init_db()
136
149
137 # register db commit as 2s periodic callback
150 # register db commit as 2s periodic callback
138 # to prevent clogging pipes
151 # to prevent clogging pipes
139 # assumes we are being run in a zmq ioloop app
152 # assumes we are being run in a zmq ioloop app
140 loop = ioloop.IOLoop.instance()
153 loop = ioloop.IOLoop.instance()
141 pc = ioloop.PeriodicCallback(self._db.commit, 2000, loop)
154 pc = ioloop.PeriodicCallback(self._db.commit, 2000, loop)
142 pc.start()
155 pc.start()
143
156
144 def _defaults(self, keys=None):
157 def _defaults(self, keys=None):
145 """create an empty record"""
158 """create an empty record"""
146 d = {}
159 d = {}
147 keys = self._keys if keys is None else keys
160 keys = self._keys if keys is None else keys
148 for key in keys:
161 for key in keys:
149 d[key] = None
162 d[key] = None
150 return d
163 return d
151
164
165 def _check_table(self):
166 """Ensure that an incorrect table doesn't exist
167
168 If a bad (old) table does exist, return False
169 """
170 cursor = self._db.execute("PRAGMA table_info(%s)"%self.table)
171 lines = cursor.fetchall()
172 if not lines:
173 # table does not exist
174 return True
175 types = {}
176 keys = []
177 for line in lines:
178 keys.append(line[1])
179 types[line[1]] = line[2]
180 if self._keys != keys:
181 # key mismatch
182 self.log.warn('keys mismatch')
183 return False
184 for key in self._keys:
185 if types[key] != self._types[key]:
186 self.log.warn(
187 'type mismatch: %s: %s != %s'%(key,types[key],self._types[key])
188 )
189 return False
190 return True
191
152 def _init_db(self):
192 def _init_db(self):
153 """Connect to the database and get new session number."""
193 """Connect to the database and get new session number."""
154 # register adapters
194 # register adapters
155 sqlite3.register_adapter(datetime, _adapt_datetime)
156 sqlite3.register_converter('datetime', _convert_datetime)
157 sqlite3.register_adapter(dict, _adapt_dict)
195 sqlite3.register_adapter(dict, _adapt_dict)
158 sqlite3.register_converter('dict', _convert_dict)
196 sqlite3.register_converter('dict', _convert_dict)
159 sqlite3.register_adapter(list, _adapt_bufs)
197 sqlite3.register_adapter(list, _adapt_bufs)
160 sqlite3.register_converter('bufs', _convert_bufs)
198 sqlite3.register_converter('bufs', _convert_bufs)
161 # connect to the db
199 # connect to the db
162 dbfile = os.path.join(self.location, self.filename)
200 dbfile = os.path.join(self.location, self.filename)
163 self._db = sqlite3.connect(dbfile, detect_types=sqlite3.PARSE_DECLTYPES,
201 self._db = sqlite3.connect(dbfile, detect_types=sqlite3.PARSE_DECLTYPES,
164 # isolation_level = None)#,
202 # isolation_level = None)#,
165 cached_statements=64)
203 cached_statements=64)
166 # print dir(self._db)
204 # print dir(self._db)
205 first_table = self.table
206 i=0
207 while not self._check_table():
208 i+=1
209 self.table = first_table+'_%i'%i
210 self.log.warn(
211 "Table %s exists and doesn't match db format, trying %s"%
212 (first_table,self.table)
213 )
167
214
168 self._db.execute("""CREATE TABLE IF NOT EXISTS %s
215 self._db.execute("""CREATE TABLE IF NOT EXISTS %s
169 (msg_id text PRIMARY KEY,
216 (msg_id text PRIMARY KEY,
170 header dict text,
217 header dict text,
171 content dict text,
218 content dict text,
172 buffers bufs blob,
219 buffers bufs blob,
173 submitted datetime text,
220 submitted timestamp,
174 client_uuid text,
221 client_uuid text,
175 engine_uuid text,
222 engine_uuid text,
176 started datetime text,
223 started timestamp,
177 completed datetime text,
224 completed timestamp,
178 resubmitted datetime text,
225 resubmitted timestamp,
179 result_header dict text,
226 result_header dict text,
180 result_content dict text,
227 result_content dict text,
181 result_buffers bufs blob,
228 result_buffers bufs blob,
182 queue text,
229 queue text,
183 pyin text,
230 pyin text,
184 pyout text,
231 pyout text,
185 pyerr text,
232 pyerr text,
186 stdout text,
233 stdout text,
187 stderr text)
234 stderr text)
188 """%self.table)
235 """%self.table)
189 self._db.commit()
236 self._db.commit()
190
237
191 def _dict_to_list(self, d):
238 def _dict_to_list(self, d):
192 """turn a mongodb-style record dict into a list."""
239 """turn a mongodb-style record dict into a list."""
193
240
194 return [ d[key] for key in self._keys ]
241 return [ d[key] for key in self._keys ]
195
242
196 def _list_to_dict(self, line, keys=None):
243 def _list_to_dict(self, line, keys=None):
197 """Inverse of dict_to_list"""
244 """Inverse of dict_to_list"""
198 keys = self._keys if keys is None else keys
245 keys = self._keys if keys is None else keys
199 d = self._defaults(keys)
246 d = self._defaults(keys)
200 for key,value in zip(keys, line):
247 for key,value in zip(keys, line):
201 d[key] = value
248 d[key] = value
202
249
203 return d
250 return d
204
251
205 def _render_expression(self, check):
252 def _render_expression(self, check):
206 """Turn a mongodb-style search dict into an SQL query."""
253 """Turn a mongodb-style search dict into an SQL query."""
207 expressions = []
254 expressions = []
208 args = []
255 args = []
209
256
210 skeys = set(check.keys())
257 skeys = set(check.keys())
211 skeys.difference_update(set(self._keys))
258 skeys.difference_update(set(self._keys))
212 skeys.difference_update(set(['buffers', 'result_buffers']))
259 skeys.difference_update(set(['buffers', 'result_buffers']))
213 if skeys:
260 if skeys:
214 raise KeyError("Illegal testing key(s): %s"%skeys)
261 raise KeyError("Illegal testing key(s): %s"%skeys)
215
262
216 for name,sub_check in check.iteritems():
263 for name,sub_check in check.iteritems():
217 if isinstance(sub_check, dict):
264 if isinstance(sub_check, dict):
218 for test,value in sub_check.iteritems():
265 for test,value in sub_check.iteritems():
219 try:
266 try:
220 op = operators[test]
267 op = operators[test]
221 except KeyError:
268 except KeyError:
222 raise KeyError("Unsupported operator: %r"%test)
269 raise KeyError("Unsupported operator: %r"%test)
223 if isinstance(op, tuple):
270 if isinstance(op, tuple):
224 op, join = op
271 op, join = op
225
272
226 if value is None and op in null_operators:
273 if value is None and op in null_operators:
                         expr = "%s %s"%(name, null_operators[op])
                     else:
                         expr = "%s %s ?"%(name, op)
                         if isinstance(value, (tuple,list)):
                             if op in null_operators and any([v is None for v in value]):
                                 # equality tests don't work with NULL
                                 raise ValueError("Cannot use %r test with NULL values on SQLite backend"%test)
                             expr = '( %s )'%( join.join([expr]*len(value)) )
                             args.extend(value)
                         else:
                             args.append(value)
                     expressions.append(expr)
             else:
                 # it's an equality check
                 if sub_check is None:
                     expressions.append("%s IS NULL"%name)
                 else:
                     expressions.append("%s = ?"%name)
                     args.append(sub_check)
 
         expr = " AND ".join(expressions)
         return expr, args
 
     def add_record(self, msg_id, rec):
         """Add a new Task Record, by msg_id."""
         d = self._defaults()
         d.update(rec)
         d['msg_id'] = msg_id
         line = self._dict_to_list(d)
         tups = '(%s)'%(','.join(['?']*len(line)))
         self._db.execute("INSERT INTO %s VALUES %s"%(self.table, tups), line)
         # self._db.commit()
 
     def get_record(self, msg_id):
         """Get a specific Task Record, by msg_id."""
         cursor = self._db.execute("""SELECT * FROM %s WHERE msg_id==?"""%self.table, (msg_id,))
         line = cursor.fetchone()
         if line is None:
             raise KeyError("No such msg: %r"%msg_id)
         return self._list_to_dict(line)
 
     def update_record(self, msg_id, rec):
         """Update the data in an existing record."""
         query = "UPDATE %s SET "%self.table
         sets = []
         keys = sorted(rec.keys())
         values = []
         for key in keys:
             sets.append('%s = ?'%key)
             values.append(rec[key])
         query += ', '.join(sets)
         query += ' WHERE msg_id == ?'
         values.append(msg_id)
         self._db.execute(query, values)
         # self._db.commit()
 
     def drop_record(self, msg_id):
         """Remove a record from the DB."""
         self._db.execute("""DELETE FROM %s WHERE msg_id==?"""%self.table, (msg_id,))
         # self._db.commit()
 
     def drop_matching_records(self, check):
         """Remove a record from the DB."""
         expr,args = self._render_expression(check)
         query = "DELETE FROM %s WHERE %s"%(self.table, expr)
         self._db.execute(query,args)
         # self._db.commit()
 
     def find_records(self, check, keys=None):
         """Find records matching a query dict, optionally extracting subset of keys.
 
         Returns list of matching records.
 
         Parameters
         ----------
 
         check: dict
             mongodb-style query argument
         keys: list of strs [optional]
             if specified, the subset of keys to extract. msg_id will *always* be
             included.
         """
         if keys:
             bad_keys = [ key for key in keys if key not in self._keys ]
             if bad_keys:
                 raise KeyError("Bad record key(s): %s"%bad_keys)
 
         if keys:
             # ensure msg_id is present and first:
             if 'msg_id' in keys:
                 keys.remove('msg_id')
             keys.insert(0, 'msg_id')
             req = ', '.join(keys)
         else:
             req = '*'
         expr,args = self._render_expression(check)
         query = """SELECT %s FROM %s WHERE %s"""%(req, self.table, expr)
         cursor = self._db.execute(query, args)
         matches = cursor.fetchall()
         records = []
         for line in matches:
             rec = self._list_to_dict(line, keys)
             records.append(rec)
         return records
 
     def get_history(self):
         """get all msg_ids, ordered by time submitted."""
         query = """SELECT msg_id FROM %s ORDER by submitted ASC"""%self.table
         cursor = self._db.execute(query)
         # will be a list of length 1 tuples
         return [ tup[0] for tup in cursor.fetchall()]
 
-__all__ = ['SQLiteDB']
\ No newline at end of file
+__all__ = ['SQLiteDB']
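
A worked example (illustrative, not from the commit) of how _render_expression translates the mongodb-style query dicts into SQL for this backend, assuming db is an existing SQLiteDB instance; the order of the AND clauses can vary with dict iteration order:

    check = {'completed': None, 'engine_uuid': {'$in': ['e1', 'e2']}}
    expr, args = db._render_expression(check)
    # expr -> "completed IS NULL AND ( engine_uuid = ? OR engine_uuid = ? )"
    # args -> ['e1', 'e2']
    rows = db._db.execute(
        "SELECT msg_id FROM %s WHERE %s" % (db.table, expr), args).fetchall()
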