##// END OF EJS Templates
use 'timestamp' datatype for timestamps in sqlitedb
MinRK -
Show More
@@ -1,180 +1,181 b''
1 1 """A Task logger that presents our DB interface,
2 2 but exists entirely in memory and implemented with dicts.
3 3
4 4 TaskRecords are dicts of the form:
5 5 {
6 6 'msg_id' : str(uuid),
7 7 'client_uuid' : str(uuid),
8 8 'engine_uuid' : str(uuid) or None,
9 9 'header' : dict(header),
10 10 'content': dict(content),
11 11 'buffers': list(buffers),
12 12 'submitted': datetime,
13 13 'started': datetime or None,
14 14 'completed': datetime or None,
15 15 'resubmitted': datetime or None,
16 16 'result_header' : dict(header) or None,
17 17 'result_content' : dict(content) or None,
18 18 'result_buffers' : list(buffers) or None,
19 19 }
20 20 With this info, many of the special categories of tasks can be defined by query:
21 21
22 22 pending: completed is None
23 23 client's outstanding: client_uuid = uuid && completed is None
24 24 MIA: arrived is None (and completed is None)
25 25 etc.
26 26
27 27 EngineRecords are dicts of the form:
28 28 {
29 29 'eid' : int(id),
30 30 'uuid': str(uuid)
31 31 }
32 32 This may be extended, but is currently.
33 33
34 34 We support a subset of mongodb operators:
35 35 $lt,$gt,$lte,$gte,$ne,$in,$nin,$all,$mod,$exists
36 36 """
37 37 #-----------------------------------------------------------------------------
38 38 # Copyright (C) 2010 The IPython Development Team
39 39 #
40 40 # Distributed under the terms of the BSD License. The full license is in
41 41 # the file COPYING, distributed as part of this software.
42 42 #-----------------------------------------------------------------------------
43 43
44 44
45 45 from datetime import datetime
46 46
47 47 from IPython.config.configurable import Configurable
48 48
49 from IPython.utils.traitlets import Dict, Unicode
49 from IPython.utils.traitlets import Dict, Unicode, Instance
50 50
51 51 filters = {
52 52 '$lt' : lambda a,b: a < b,
53 53 '$gt' : lambda a,b: b > a,
54 54 '$eq' : lambda a,b: a == b,
55 55 '$ne' : lambda a,b: a != b,
56 56 '$lte': lambda a,b: a <= b,
57 57 '$gte': lambda a,b: a >= b,
58 58 '$in' : lambda a,b: a in b,
59 59 '$nin': lambda a,b: a not in b,
60 60 '$all': lambda a,b: all([ a in bb for bb in b ]),
61 61 '$mod': lambda a,b: a%b[0] == b[1],
62 62 '$exists' : lambda a,b: (b and a is not None) or (a is None and not b)
63 63 }
64 64
65 65
66 66 class CompositeFilter(object):
67 67 """Composite filter for matching multiple properties."""
68 68
69 69 def __init__(self, dikt):
70 70 self.tests = []
71 71 self.values = []
72 72 for key, value in dikt.iteritems():
73 73 self.tests.append(filters[key])
74 74 self.values.append(value)
75 75
76 76 def __call__(self, value):
77 77 for test,check in zip(self.tests, self.values):
78 78 if not test(value, check):
79 79 return False
80 80 return True
81 81
82 82 class BaseDB(Configurable):
83 83 """Empty Parent class so traitlets work on DB."""
84 84 # base configurable traits:
85 85 session = Unicode("")
86 log = Instance('logging.Logger', ('root',))
86 87
87 88 class DictDB(BaseDB):
88 89 """Basic in-memory dict-based object for saving Task Records.
89 90
90 91 This is the first object to present the DB interface
91 92 for logging tasks out of memory.
92 93
93 94 The interface is based on MongoDB, so adding a MongoDB
94 95 backend should be straightforward.
95 96 """
96 97
97 98 _records = Dict()
98 99
99 100 def _match_one(self, rec, tests):
100 101 """Check if a specific record matches tests."""
101 102 for key,test in tests.iteritems():
102 103 if not test(rec.get(key, None)):
103 104 return False
104 105 return True
105 106
106 107 def _match(self, check):
107 108 """Find all the matches for a check dict."""
108 109 matches = []
109 110 tests = {}
110 111 for k,v in check.iteritems():
111 112 if isinstance(v, dict):
112 113 tests[k] = CompositeFilter(v)
113 114 else:
114 115 tests[k] = lambda o: o==v
115 116
116 117 for rec in self._records.itervalues():
117 118 if self._match_one(rec, tests):
118 119 matches.append(rec)
119 120 return matches
120 121
121 122 def _extract_subdict(self, rec, keys):
122 123 """extract subdict of keys"""
123 124 d = {}
124 125 d['msg_id'] = rec['msg_id']
125 126 for key in keys:
126 127 d[key] = rec[key]
127 128 return d
128 129
129 130 def add_record(self, msg_id, rec):
130 131 """Add a new Task Record, by msg_id."""
131 132 if self._records.has_key(msg_id):
132 133 raise KeyError("Already have msg_id %r"%(msg_id))
133 134 self._records[msg_id] = rec
134 135
135 136 def get_record(self, msg_id):
136 137 """Get a specific Task Record, by msg_id."""
137 138 if not self._records.has_key(msg_id):
138 139 raise KeyError("No such msg_id %r"%(msg_id))
139 140 return self._records[msg_id]
140 141
141 142 def update_record(self, msg_id, rec):
142 143 """Update the data in an existing record."""
143 144 self._records[msg_id].update(rec)
144 145
145 146 def drop_matching_records(self, check):
146 147 """Remove a record from the DB."""
147 148 matches = self._match(check)
148 149 for m in matches:
149 150 del self._records[m['msg_id']]
150 151
151 152 def drop_record(self, msg_id):
152 153 """Remove a record from the DB."""
153 154 del self._records[msg_id]
154 155
155 156
156 157 def find_records(self, check, keys=None):
157 158 """Find records matching a query dict, optionally extracting subset of keys.
158 159
159 160 Returns dict keyed by msg_id of matching records.
160 161
161 162 Parameters
162 163 ----------
163 164
164 165 check: dict
165 166 mongodb-style query argument
166 167 keys: list of strs [optional]
167 168 if specified, the subset of keys to extract. msg_id will *always* be
168 169 included.
169 170 """
170 171 matches = self._match(check)
171 172 if keys:
172 173 return [ self._extract_subdict(rec, keys) for rec in matches ]
173 174 else:
174 175 return matches
175 176
176 177
177 178 def get_history(self):
178 179 """get all msg_ids, ordered by time submitted."""
179 180 msg_ids = self._records.keys()
180 181 return sorted(msg_ids, key=lambda m: self._records[m]['submitted'])
@@ -1,339 +1,386 b''
1 1 """A TaskRecord backend using sqlite3"""
2 2 #-----------------------------------------------------------------------------
3 3 # Copyright (C) 2011 The IPython Development Team
4 4 #
5 5 # Distributed under the terms of the BSD License. The full license is in
6 6 # the file COPYING, distributed as part of this software.
7 7 #-----------------------------------------------------------------------------
8 8
9 9 import json
10 10 import os
11 11 import cPickle as pickle
12 12 from datetime import datetime
13 13
14 14 import sqlite3
15 15
16 16 from zmq.eventloop import ioloop
17 17
18 from IPython.utils.traitlets import Unicode, Instance, List
18 from IPython.utils.traitlets import Unicode, Instance, List, Dict
19 19 from .dictdb import BaseDB
20 from IPython.utils.jsonutil import date_default, extract_dates
20 from IPython.utils.jsonutil import date_default, extract_dates, squash_dates
21 21
22 22 #-----------------------------------------------------------------------------
23 23 # SQLite operators, adapters, and converters
24 24 #-----------------------------------------------------------------------------
25 25
26 26 operators = {
27 27 '$lt' : "<",
28 28 '$gt' : ">",
29 29 # null is handled weird with ==,!=
30 30 '$eq' : "=",
31 31 '$ne' : "!=",
32 32 '$lte': "<=",
33 33 '$gte': ">=",
34 34 '$in' : ('=', ' OR '),
35 35 '$nin': ('!=', ' AND '),
36 36 # '$all': None,
37 37 # '$mod': None,
38 38 # '$exists' : None
39 39 }
40 40 null_operators = {
41 41 '=' : "IS NULL",
42 42 '!=' : "IS NOT NULL",
43 43 }
44 44
45 def _adapt_datetime(dt):
46 return dt.strftime(ISO8601)
47
48 def _convert_datetime(ds):
49 if ds is None:
50 return ds
51 else:
52 return datetime.strptime(ds, ISO8601)
53
54 45 def _adapt_dict(d):
55 46 return json.dumps(d, default=date_default)
56 47
57 48 def _convert_dict(ds):
58 49 if ds is None:
59 50 return ds
60 51 else:
61 52 return extract_dates(json.loads(ds))
62 53
63 54 def _adapt_bufs(bufs):
64 55 # this is *horrible*
65 56 # copy buffers into single list and pickle it:
66 57 if bufs and isinstance(bufs[0], (bytes, buffer)):
67 58 return sqlite3.Binary(pickle.dumps(map(bytes, bufs),-1))
68 59 elif bufs:
69 60 return bufs
70 61 else:
71 62 return None
72 63
73 64 def _convert_bufs(bs):
74 65 if bs is None:
75 66 return []
76 67 else:
77 68 return pickle.loads(bytes(bs))
78 69
79 70 #-----------------------------------------------------------------------------
80 71 # SQLiteDB class
81 72 #-----------------------------------------------------------------------------
82 73
83 74 class SQLiteDB(BaseDB):
84 75 """SQLite3 TaskRecord backend."""
85 76
86 77 filename = Unicode('tasks.db', config=True,
87 78 help="""The filename of the sqlite task database. [default: 'tasks.db']""")
88 79 location = Unicode('', config=True,
89 80 help="""The directory containing the sqlite task database. The default
90 81 is to use the cluster_dir location.""")
91 82 table = Unicode("", config=True,
92 83 help="""The SQLite Table to use for storing tasks for this session. If unspecified,
93 84 a new table will be created with the Hub's IDENT. Specifying the table will result
94 85 in tasks from previous sessions being available via Clients' db_query and
95 86 get_result methods.""")
96 87
97 88 _db = Instance('sqlite3.Connection')
89 # the ordered list of column names
98 90 _keys = List(['msg_id' ,
99 91 'header' ,
100 92 'content',
101 93 'buffers',
102 94 'submitted',
103 95 'client_uuid' ,
104 96 'engine_uuid' ,
105 97 'started',
106 98 'completed',
107 99 'resubmitted',
108 100 'result_header' ,
109 101 'result_content' ,
110 102 'result_buffers' ,
111 103 'queue' ,
112 104 'pyin' ,
113 105 'pyout',
114 106 'pyerr',
115 107 'stdout',
116 108 'stderr',
117 109 ])
110 # sqlite datatypes for checking that db is current format
111 _types = Dict({'msg_id' : 'text' ,
112 'header' : 'dict text',
113 'content' : 'dict text',
114 'buffers' : 'bufs blob',
115 'submitted' : 'timestamp',
116 'client_uuid' : 'text',
117 'engine_uuid' : 'text',
118 'started' : 'timestamp',
119 'completed' : 'timestamp',
120 'resubmitted' : 'timestamp',
121 'result_header' : 'dict text',
122 'result_content' : 'dict text',
123 'result_buffers' : 'bufs blob',
124 'queue' : 'text',
125 'pyin' : 'text',
126 'pyout' : 'text',
127 'pyerr' : 'text',
128 'stdout' : 'text',
129 'stderr' : 'text',
130 })
118 131
119 132 def __init__(self, **kwargs):
120 133 super(SQLiteDB, self).__init__(**kwargs)
121 134 if not self.table:
122 135 # use session, and prefix _, since starting with # is illegal
123 136 self.table = '_'+self.session.replace('-','_')
124 137 if not self.location:
125 138 # get current profile
126 139 from IPython.core.newapplication import BaseIPythonApplication
127 140 if BaseIPythonApplication.initialized():
128 141 app = BaseIPythonApplication.instance()
129 142 if app.profile_dir is not None:
130 143 self.location = app.profile_dir.location
131 144 else:
132 145 self.location = u'.'
133 146 else:
134 147 self.location = u'.'
135 148 self._init_db()
136 149
137 150 # register db commit as 2s periodic callback
138 151 # to prevent clogging pipes
139 152 # assumes we are being run in a zmq ioloop app
140 153 loop = ioloop.IOLoop.instance()
141 154 pc = ioloop.PeriodicCallback(self._db.commit, 2000, loop)
142 155 pc.start()
143 156
144 157 def _defaults(self, keys=None):
145 158 """create an empty record"""
146 159 d = {}
147 160 keys = self._keys if keys is None else keys
148 161 for key in keys:
149 162 d[key] = None
150 163 return d
151 164
165 def _check_table(self):
166 """Ensure that an incorrect table doesn't exist
167
168 If a bad (old) table does exist, return False
169 """
170 cursor = self._db.execute("PRAGMA table_info(%s)"%self.table)
171 lines = cursor.fetchall()
172 if not lines:
173 # table does not exist
174 return True
175 types = {}
176 keys = []
177 for line in lines:
178 keys.append(line[1])
179 types[line[1]] = line[2]
180 if self._keys != keys:
181 # key mismatch
182 self.log.warn('keys mismatch')
183 return False
184 for key in self._keys:
185 if types[key] != self._types[key]:
186 self.log.warn(
187 'type mismatch: %s: %s != %s'%(key,types[key],self._types[key])
188 )
189 return False
190 return True
191
152 192 def _init_db(self):
153 193 """Connect to the database and get new session number."""
154 194 # register adapters
155 sqlite3.register_adapter(datetime, _adapt_datetime)
156 sqlite3.register_converter('datetime', _convert_datetime)
157 195 sqlite3.register_adapter(dict, _adapt_dict)
158 196 sqlite3.register_converter('dict', _convert_dict)
159 197 sqlite3.register_adapter(list, _adapt_bufs)
160 198 sqlite3.register_converter('bufs', _convert_bufs)
161 199 # connect to the db
162 200 dbfile = os.path.join(self.location, self.filename)
163 201 self._db = sqlite3.connect(dbfile, detect_types=sqlite3.PARSE_DECLTYPES,
164 202 # isolation_level = None)#,
165 203 cached_statements=64)
166 204 # print dir(self._db)
205 first_table = self.table
206 i=0
207 while not self._check_table():
208 i+=1
209 self.table = first_table+'_%i'%i
210 self.log.warn(
211 "Table %s exists and doesn't match db format, trying %s"%
212 (first_table,self.table)
213 )
167 214
168 215 self._db.execute("""CREATE TABLE IF NOT EXISTS %s
169 216 (msg_id text PRIMARY KEY,
170 217 header dict text,
171 218 content dict text,
172 219 buffers bufs blob,
173 submitted datetime text,
220 submitted timestamp,
174 221 client_uuid text,
175 222 engine_uuid text,
176 started datetime text,
177 completed datetime text,
178 resubmitted datetime text,
223 started timestamp,
224 completed timestamp,
225 resubmitted timestamp,
179 226 result_header dict text,
180 227 result_content dict text,
181 228 result_buffers bufs blob,
182 229 queue text,
183 230 pyin text,
184 231 pyout text,
185 232 pyerr text,
186 233 stdout text,
187 234 stderr text)
188 235 """%self.table)
189 236 self._db.commit()
190 237
191 238 def _dict_to_list(self, d):
192 239 """turn a mongodb-style record dict into a list."""
193 240
194 241 return [ d[key] for key in self._keys ]
195 242
196 243 def _list_to_dict(self, line, keys=None):
197 244 """Inverse of dict_to_list"""
198 245 keys = self._keys if keys is None else keys
199 246 d = self._defaults(keys)
200 247 for key,value in zip(keys, line):
201 248 d[key] = value
202 249
203 250 return d
204 251
205 252 def _render_expression(self, check):
206 253 """Turn a mongodb-style search dict into an SQL query."""
207 254 expressions = []
208 255 args = []
209 256
210 257 skeys = set(check.keys())
211 258 skeys.difference_update(set(self._keys))
212 259 skeys.difference_update(set(['buffers', 'result_buffers']))
213 260 if skeys:
214 261 raise KeyError("Illegal testing key(s): %s"%skeys)
215 262
216 263 for name,sub_check in check.iteritems():
217 264 if isinstance(sub_check, dict):
218 265 for test,value in sub_check.iteritems():
219 266 try:
220 267 op = operators[test]
221 268 except KeyError:
222 269 raise KeyError("Unsupported operator: %r"%test)
223 270 if isinstance(op, tuple):
224 271 op, join = op
225 272
226 273 if value is None and op in null_operators:
227 274 expr = "%s %s"%null_operators[op]
228 275 else:
229 276 expr = "%s %s ?"%(name, op)
230 277 if isinstance(value, (tuple,list)):
231 278 if op in null_operators and any([v is None for v in value]):
232 279 # equality tests don't work with NULL
233 280 raise ValueError("Cannot use %r test with NULL values on SQLite backend"%test)
234 281 expr = '( %s )'%( join.join([expr]*len(value)) )
235 282 args.extend(value)
236 283 else:
237 284 args.append(value)
238 285 expressions.append(expr)
239 286 else:
240 287 # it's an equality check
241 288 if sub_check is None:
242 289 expressions.append("%s IS NULL")
243 290 else:
244 291 expressions.append("%s = ?"%name)
245 292 args.append(sub_check)
246 293
247 294 expr = " AND ".join(expressions)
248 295 return expr, args
249 296
250 297 def add_record(self, msg_id, rec):
251 298 """Add a new Task Record, by msg_id."""
252 299 d = self._defaults()
253 300 d.update(rec)
254 301 d['msg_id'] = msg_id
255 302 line = self._dict_to_list(d)
256 303 tups = '(%s)'%(','.join(['?']*len(line)))
257 304 self._db.execute("INSERT INTO %s VALUES %s"%(self.table, tups), line)
258 305 # self._db.commit()
259 306
260 307 def get_record(self, msg_id):
261 308 """Get a specific Task Record, by msg_id."""
262 309 cursor = self._db.execute("""SELECT * FROM %s WHERE msg_id==?"""%self.table, (msg_id,))
263 310 line = cursor.fetchone()
264 311 if line is None:
265 312 raise KeyError("No such msg: %r"%msg_id)
266 313 return self._list_to_dict(line)
267 314
268 315 def update_record(self, msg_id, rec):
269 316 """Update the data in an existing record."""
270 317 query = "UPDATE %s SET "%self.table
271 318 sets = []
272 319 keys = sorted(rec.keys())
273 320 values = []
274 321 for key in keys:
275 322 sets.append('%s = ?'%key)
276 323 values.append(rec[key])
277 324 query += ', '.join(sets)
278 325 query += ' WHERE msg_id == ?'
279 326 values.append(msg_id)
280 327 self._db.execute(query, values)
281 328 # self._db.commit()
282 329
283 330 def drop_record(self, msg_id):
284 331 """Remove a record from the DB."""
285 332 self._db.execute("""DELETE FROM %s WHERE msg_id==?"""%self.table, (msg_id,))
286 333 # self._db.commit()
287 334
288 335 def drop_matching_records(self, check):
289 336 """Remove a record from the DB."""
290 337 expr,args = self._render_expression(check)
291 338 query = "DELETE FROM %s WHERE %s"%(self.table, expr)
292 339 self._db.execute(query,args)
293 340 # self._db.commit()
294 341
295 342 def find_records(self, check, keys=None):
296 343 """Find records matching a query dict, optionally extracting subset of keys.
297 344
298 345 Returns list of matching records.
299 346
300 347 Parameters
301 348 ----------
302 349
303 350 check: dict
304 351 mongodb-style query argument
305 352 keys: list of strs [optional]
306 353 if specified, the subset of keys to extract. msg_id will *always* be
307 354 included.
308 355 """
309 356 if keys:
310 357 bad_keys = [ key for key in keys if key not in self._keys ]
311 358 if bad_keys:
312 359 raise KeyError("Bad record key(s): %s"%bad_keys)
313 360
314 361 if keys:
315 362 # ensure msg_id is present and first:
316 363 if 'msg_id' in keys:
317 364 keys.remove('msg_id')
318 365 keys.insert(0, 'msg_id')
319 366 req = ', '.join(keys)
320 367 else:
321 368 req = '*'
322 369 expr,args = self._render_expression(check)
323 370 query = """SELECT %s FROM %s WHERE %s"""%(req, self.table, expr)
324 371 cursor = self._db.execute(query, args)
325 372 matches = cursor.fetchall()
326 373 records = []
327 374 for line in matches:
328 375 rec = self._list_to_dict(line, keys)
329 376 records.append(rec)
330 377 return records
331 378
332 379 def get_history(self):
333 380 """get all msg_ids, ordered by time submitted."""
334 381 query = """SELECT msg_id FROM %s ORDER by submitted ASC"""%self.table
335 382 cursor = self._db.execute(query)
336 383 # will be a list of length 1 tuples
337 384 return [ tup[0] for tup in cursor.fetchall()]
338 385
339 386 __all__ = ['SQLiteDB'] No newline at end of file
General Comments 0
You need to be logged in to leave comments. Login now