sqlitedb.py
414 lines
| 13.7 KiB
| text/x-python
|
PythonLexer
MinRK
|
r16567 | """A TaskRecord backend using sqlite3""" | ||
MinRK
|
r4018 | |||
MinRK
|
r16567 | # Copyright (c) IPython Development Team. | ||
# Distributed under the terms of the Modified BSD License. | ||||
MinRK
|
r3652 | |||
import json | ||||
import os | ||||
Thomas Kluyver
|
r13354 | try: | ||
import cPickle as pickle | ||||
except ImportError: | ||||
import pickle | ||||
MinRK
|
r3652 | from datetime import datetime | ||
MinRK
|
r5147 | try: | ||
import sqlite3 | ||||
except ImportError: | ||||
sqlite3 = None | ||||
MinRK
|
r3652 | |||
MinRK
|
r3668 | from zmq.eventloop import ioloop | ||
MinRK
|
r4009 | from IPython.utils.traitlets import Unicode, Instance, List, Dict | ||
MinRK
|
r3652 | from .dictdb import BaseDB | ||
MinRK
|
r4009 | from IPython.utils.jsonutil import date_default, extract_dates, squash_dates | ||
Thomas Kluyver
|
r13361 | from IPython.utils.py3compat import iteritems | ||
MinRK
|
r3652 | |||
#----------------------------------------------------------------------------- | ||||
# SQLite operators, adapters, and converters | ||||
#----------------------------------------------------------------------------- | ||||
MinRK
|
r4155 | try: | ||
buffer | ||||
except NameError: | ||||
# py3k | ||||
buffer = memoryview | ||||
MinRK
|
r3652 | operators = { | ||
MinRK
|
r3780 | '$lt' : "<", | ||
MinRK
|
r3652 | '$gt' : ">", | ||
# null is handled weird with ==,!= | ||||
MinRK
|
r3875 | '$eq' : "=", | ||
'$ne' : "!=", | ||||
MinRK
|
r3652 | '$lte': "<=", | ||
'$gte': ">=", | ||||
MinRK
|
r3875 | '$in' : ('=', ' OR '), | ||
'$nin': ('!=', ' AND '), | ||||
MinRK
|
r3652 | # '$all': None, | ||
# '$mod': None, | ||||
# '$exists' : None | ||||
} | ||||
MinRK
|
r3875 | null_operators = { | ||
'=' : "IS NULL", | ||||
'!=' : "IS NOT NULL", | ||||
} | ||||
MinRK
|
r3652 | |||
def _adapt_dict(d): | ||||
MinRK
|
r4006 | return json.dumps(d, default=date_default) | ||
MinRK
|
r3652 | |||
def _convert_dict(ds): | ||||
if ds is None: | ||||
return ds | ||||
else: | ||||
MinRK
|
r4155 | if isinstance(ds, bytes): | ||
# If I understand the sqlite doc correctly, this will always be utf8 | ||||
ds = ds.decode('utf8') | ||||
MinRK
|
r4161 | return extract_dates(json.loads(ds)) | ||
MinRK
|
r3652 | |||
def _adapt_bufs(bufs): | ||||
# this is *horrible* | ||||
# copy buffers into single list and pickle it: | ||||
if bufs and isinstance(bufs[0], (bytes, buffer)): | ||||
Thomas Kluyver
|
r13383 | return sqlite3.Binary(pickle.dumps(list(map(bytes, bufs)),-1)) | ||
MinRK
|
r3652 | elif bufs: | ||
return bufs | ||||
else: | ||||
return None | ||||
def _convert_bufs(bs): | ||||
if bs is None: | ||||
return [] | ||||
else: | ||||
return pickle.loads(bytes(bs)) | ||||
#----------------------------------------------------------------------------- | ||||
# SQLiteDB class | ||||
#----------------------------------------------------------------------------- | ||||
class SQLiteDB(BaseDB): | ||||
"""SQLite3 TaskRecord backend.""" | ||||
Bernardo B. Marques
|
r4872 | |||
MinRK
|
r3988 | filename = Unicode('tasks.db', config=True, | ||
MinRK
|
r3985 | help="""The filename of the sqlite task database. [default: 'tasks.db']""") | ||
MinRK
|
r3988 | location = Unicode('', config=True, | ||
MinRK
|
r3985 | help="""The directory containing the sqlite task database. The default | ||
is to use the cluster_dir location.""") | ||||
MinRK
|
r12255 | table = Unicode("ipython-tasks", config=True, | ||
MinRK
|
r3985 | help="""The SQLite Table to use for storing tasks for this session. If unspecified, | ||
a new table will be created with the Hub's IDENT. Specifying the table will result | ||||
in tasks from previous sessions being available via Clients' db_query and | ||||
get_result methods.""") | ||||
Bernardo B. Marques
|
r4872 | |||
MinRK
|
r5147 | if sqlite3 is not None: | ||
Sylvain Corlay
|
r20940 | _db = Instance('sqlite3.Connection', allow_none=True) | ||
MinRK
|
r5147 | else: | ||
_db = None | ||||
MinRK
|
r4009 | # the ordered list of column names | ||
MinRK
|
r3652 | _keys = List(['msg_id' , | ||
'header' , | ||||
MinRK
|
r7957 | 'metadata', | ||
MinRK
|
r3652 | 'content', | ||
'buffers', | ||||
'submitted', | ||||
'client_uuid' , | ||||
'engine_uuid' , | ||||
'started', | ||||
'completed', | ||||
'resubmitted', | ||||
MinRK
|
r6469 | 'received', | ||
MinRK
|
r3652 | 'result_header' , | ||
MinRK
|
r7957 | 'result_metadata', | ||
MinRK
|
r3652 | 'result_content' , | ||
'result_buffers' , | ||||
'queue' , | ||||
MinRK
|
r16567 | 'execute_input' , | ||
MinRK
|
r16568 | 'execute_result', | ||
MinRK
|
r16569 | 'error', | ||
MinRK
|
r3652 | 'stdout', | ||
'stderr', | ||||
]) | ||||
MinRK
|
r4009 | # sqlite datatypes for checking that db is current format | ||
_types = Dict({'msg_id' : 'text' , | ||||
'header' : 'dict text', | ||||
MinRK
|
r7957 | 'metadata' : 'dict text', | ||
MinRK
|
r4009 | 'content' : 'dict text', | ||
'buffers' : 'bufs blob', | ||||
'submitted' : 'timestamp', | ||||
'client_uuid' : 'text', | ||||
'engine_uuid' : 'text', | ||||
'started' : 'timestamp', | ||||
'completed' : 'timestamp', | ||||
MinRK
|
r6817 | 'resubmitted' : 'text', | ||
MinRK
|
r6469 | 'received' : 'timestamp', | ||
MinRK
|
r4009 | 'result_header' : 'dict text', | ||
MinRK
|
r7957 | 'result_metadata' : 'dict text', | ||
MinRK
|
r4009 | 'result_content' : 'dict text', | ||
'result_buffers' : 'bufs blob', | ||||
'queue' : 'text', | ||||
MinRK
|
r16567 | 'execute_input' : 'text', | ||
MinRK
|
r16568 | 'execute_result' : 'text', | ||
MinRK
|
r16569 | 'error' : 'text', | ||
MinRK
|
r4009 | 'stdout' : 'text', | ||
'stderr' : 'text', | ||||
}) | ||||
Bernardo B. Marques
|
r4872 | |||
MinRK
|
r3652 | def __init__(self, **kwargs): | ||
super(SQLiteDB, self).__init__(**kwargs) | ||||
MinRK
|
r5147 | if sqlite3 is None: | ||
raise ImportError("SQLiteDB requires sqlite3") | ||||
MinRK
|
r3652 | if not self.table: | ||
# use session, and prefix _, since starting with # is illegal | ||||
self.table = '_'+self.session.replace('-','_') | ||||
if not self.location: | ||||
MinRK
|
r3992 | # get current profile | ||
MinRK
|
r4023 | from IPython.core.application import BaseIPythonApplication | ||
MinRK
|
r3992 | if BaseIPythonApplication.initialized(): | ||
app = BaseIPythonApplication.instance() | ||||
if app.profile_dir is not None: | ||||
self.location = app.profile_dir.location | ||||
else: | ||||
self.location = u'.' | ||||
MinRK
|
r3652 | else: | ||
MinRK
|
r3992 | self.location = u'.' | ||
MinRK
|
r3652 | self._init_db() | ||
Bernardo B. Marques
|
r4872 | |||
MinRK
|
r3668 | # register db commit as 2s periodic callback | ||
# to prevent clogging pipes | ||||
# assumes we are being run in a zmq ioloop app | ||||
loop = ioloop.IOLoop.instance() | ||||
pc = ioloop.PeriodicCallback(self._db.commit, 2000, loop) | ||||
pc.start() | ||||
Bernardo B. Marques
|
r4872 | |||
MinRK
|
r3780 | def _defaults(self, keys=None): | ||
MinRK
|
r3652 | """create an empty record""" | ||
d = {} | ||||
MinRK
|
r3780 | keys = self._keys if keys is None else keys | ||
for key in keys: | ||||
MinRK
|
r3652 | d[key] = None | ||
return d | ||||
Bernardo B. Marques
|
r4872 | |||
MinRK
|
r4009 | def _check_table(self): | ||
"""Ensure that an incorrect table doesn't exist | ||||
Bernardo B. Marques
|
r4872 | |||
MinRK
|
r4009 | If a bad (old) table does exist, return False | ||
""" | ||||
MinRK
|
r12256 | cursor = self._db.execute("PRAGMA table_info('%s')"%self.table) | ||
MinRK
|
r4009 | lines = cursor.fetchall() | ||
if not lines: | ||||
# table does not exist | ||||
return True | ||||
types = {} | ||||
keys = [] | ||||
for line in lines: | ||||
keys.append(line[1]) | ||||
types[line[1]] = line[2] | ||||
if self._keys != keys: | ||||
# key mismatch | ||||
self.log.warn('keys mismatch') | ||||
return False | ||||
for key in self._keys: | ||||
if types[key] != self._types[key]: | ||||
self.log.warn( | ||||
'type mismatch: %s: %s != %s'%(key,types[key],self._types[key]) | ||||
) | ||||
return False | ||||
return True | ||||
Bernardo B. Marques
|
r4872 | |||
MinRK
|
r3652 | def _init_db(self): | ||
"""Connect to the database and get new session number.""" | ||||
# register adapters | ||||
sqlite3.register_adapter(dict, _adapt_dict) | ||||
sqlite3.register_converter('dict', _convert_dict) | ||||
sqlite3.register_adapter(list, _adapt_bufs) | ||||
sqlite3.register_converter('bufs', _convert_bufs) | ||||
# connect to the db | ||||
dbfile = os.path.join(self.location, self.filename) | ||||
Bernardo B. Marques
|
r4872 | self._db = sqlite3.connect(dbfile, detect_types=sqlite3.PARSE_DECLTYPES, | ||
MinRK
|
r3668 | # isolation_level = None)#, | ||
cached_statements=64) | ||||
MinRK
|
r3666 | # print dir(self._db) | ||
MinRK
|
r6815 | first_table = previous_table = self.table | ||
MinRK
|
r4009 | i=0 | ||
while not self._check_table(): | ||||
i+=1 | ||||
self.table = first_table+'_%i'%i | ||||
self.log.warn( | ||||
"Table %s exists and doesn't match db format, trying %s"% | ||||
MinRK
|
r6815 | (previous_table, self.table) | ||
MinRK
|
r4009 | ) | ||
MinRK
|
r6815 | previous_table = self.table | ||
Bernardo B. Marques
|
r4872 | |||
MinRK
|
r12256 | self._db.execute("""CREATE TABLE IF NOT EXISTS '%s' | ||
MinRK
|
r3652 | (msg_id text PRIMARY KEY, | ||
header dict text, | ||||
MinRK
|
r7957 | metadata dict text, | ||
MinRK
|
r3652 | content dict text, | ||
buffers bufs blob, | ||||
MinRK
|
r4009 | submitted timestamp, | ||
MinRK
|
r3652 | client_uuid text, | ||
engine_uuid text, | ||||
MinRK
|
r4009 | started timestamp, | ||
completed timestamp, | ||||
MinRK
|
r6817 | resubmitted text, | ||
MinRK
|
r6469 | received timestamp, | ||
MinRK
|
r3652 | result_header dict text, | ||
MinRK
|
r7957 | result_metadata dict text, | ||
MinRK
|
r3652 | result_content dict text, | ||
result_buffers bufs blob, | ||||
queue text, | ||||
MinRK
|
r16567 | execute_input text, | ||
MinRK
|
r16568 | execute_result text, | ||
MinRK
|
r16569 | error text, | ||
MinRK
|
r3652 | stdout text, | ||
stderr text) | ||||
"""%self.table) | ||||
self._db.commit() | ||||
Bernardo B. Marques
|
r4872 | |||
MinRK
|
r3652 | def _dict_to_list(self, d): | ||
"""turn a mongodb-style record dict into a list.""" | ||||
Bernardo B. Marques
|
r4872 | |||
MinRK
|
r3652 | return [ d[key] for key in self._keys ] | ||
Bernardo B. Marques
|
r4872 | |||
MinRK
|
r3780 | def _list_to_dict(self, line, keys=None): | ||
MinRK
|
r3652 | """Inverse of dict_to_list""" | ||
MinRK
|
r3780 | keys = self._keys if keys is None else keys | ||
d = self._defaults(keys) | ||||
for key,value in zip(keys, line): | ||||
MinRK
|
r3652 | d[key] = value | ||
Bernardo B. Marques
|
r4872 | |||
MinRK
|
r3652 | return d | ||
Bernardo B. Marques
|
r4872 | |||
MinRK
|
r3652 | def _render_expression(self, check): | ||
"""Turn a mongodb-style search dict into an SQL query.""" | ||||
expressions = [] | ||||
args = [] | ||||
Bernardo B. Marques
|
r4872 | |||
MinRK
|
r3652 | skeys = set(check.keys()) | ||
skeys.difference_update(set(self._keys)) | ||||
skeys.difference_update(set(['buffers', 'result_buffers'])) | ||||
if skeys: | ||||
raise KeyError("Illegal testing key(s): %s"%skeys) | ||||
Bernardo B. Marques
|
r4872 | |||
Thomas Kluyver
|
r13361 | for name,sub_check in iteritems(check): | ||
MinRK
|
r3652 | if isinstance(sub_check, dict): | ||
Thomas Kluyver
|
r13361 | for test,value in iteritems(sub_check): | ||
MinRK
|
r3652 | try: | ||
op = operators[test] | ||||
except KeyError: | ||||
raise KeyError("Unsupported operator: %r"%test) | ||||
if isinstance(op, tuple): | ||||
op, join = op | ||||
Bernardo B. Marques
|
r4872 | |||
MinRK
|
r3875 | if value is None and op in null_operators: | ||
MinRK
|
r5884 | expr = "%s %s" % (name, null_operators[op]) | ||
MinRK
|
r3652 | else: | ||
MinRK
|
r3875 | expr = "%s %s ?"%(name, op) | ||
if isinstance(value, (tuple,list)): | ||||
if op in null_operators and any([v is None for v in value]): | ||||
# equality tests don't work with NULL | ||||
raise ValueError("Cannot use %r test with NULL values on SQLite backend"%test) | ||||
expr = '( %s )'%( join.join([expr]*len(value)) ) | ||||
args.extend(value) | ||||
else: | ||||
args.append(value) | ||||
MinRK
|
r3652 | expressions.append(expr) | ||
else: | ||||
# it's an equality check | ||||
MinRK
|
r3875 | if sub_check is None: | ||
MinRK
|
r5884 | expressions.append("%s IS NULL" % name) | ||
MinRK
|
r3875 | else: | ||
expressions.append("%s = ?"%name) | ||||
args.append(sub_check) | ||||
Bernardo B. Marques
|
r4872 | |||
MinRK
|
r3652 | expr = " AND ".join(expressions) | ||
return expr, args | ||||
Bernardo B. Marques
|
r4872 | |||
MinRK
|
r3652 | def add_record(self, msg_id, rec): | ||
"""Add a new Task Record, by msg_id.""" | ||||
d = self._defaults() | ||||
d.update(rec) | ||||
d['msg_id'] = msg_id | ||||
line = self._dict_to_list(d) | ||||
tups = '(%s)'%(','.join(['?']*len(line))) | ||||
MinRK
|
r12256 | self._db.execute("INSERT INTO '%s' VALUES %s"%(self.table, tups), line) | ||
MinRK
|
r3668 | # self._db.commit() | ||
Bernardo B. Marques
|
r4872 | |||
MinRK
|
r3652 | def get_record(self, msg_id): | ||
"""Get a specific Task Record, by msg_id.""" | ||||
MinRK
|
r12256 | cursor = self._db.execute("""SELECT * FROM '%s' WHERE msg_id==?"""%self.table, (msg_id,)) | ||
MinRK
|
r3652 | line = cursor.fetchone() | ||
if line is None: | ||||
raise KeyError("No such msg: %r"%msg_id) | ||||
return self._list_to_dict(line) | ||||
Bernardo B. Marques
|
r4872 | |||
MinRK
|
r3652 | def update_record(self, msg_id, rec): | ||
"""Update the data in an existing record.""" | ||||
MinRK
|
r12256 | query = "UPDATE '%s' SET "%self.table | ||
MinRK
|
r3652 | sets = [] | ||
keys = sorted(rec.keys()) | ||||
values = [] | ||||
for key in keys: | ||||
sets.append('%s = ?'%key) | ||||
values.append(rec[key]) | ||||
query += ', '.join(sets) | ||||
MinRK
|
r3780 | query += ' WHERE msg_id == ?' | ||
values.append(msg_id) | ||||
MinRK
|
r3652 | self._db.execute(query, values) | ||
MinRK
|
r3668 | # self._db.commit() | ||
Bernardo B. Marques
|
r4872 | |||
MinRK
|
r3652 | def drop_record(self, msg_id): | ||
"""Remove a record from the DB.""" | ||||
MinRK
|
r12256 | self._db.execute("""DELETE FROM '%s' WHERE msg_id==?"""%self.table, (msg_id,)) | ||
MinRK
|
r3668 | # self._db.commit() | ||
Bernardo B. Marques
|
r4872 | |||
MinRK
|
r3652 | def drop_matching_records(self, check): | ||
"""Remove a record from the DB.""" | ||||
expr,args = self._render_expression(check) | ||||
MinRK
|
r12256 | query = "DELETE FROM '%s' WHERE %s"%(self.table, expr) | ||
MinRK
|
r3652 | self._db.execute(query,args) | ||
MinRK
|
r3668 | # self._db.commit() | ||
Bernardo B. Marques
|
r4872 | |||
MinRK
|
r3780 | def find_records(self, check, keys=None): | ||
"""Find records matching a query dict, optionally extracting subset of keys. | ||||
Bernardo B. Marques
|
r4872 | |||
MinRK
|
r3780 | Returns list of matching records. | ||
Bernardo B. Marques
|
r4872 | |||
MinRK
|
r3780 | Parameters | ||
---------- | ||||
Bernardo B. Marques
|
r4872 | |||
MinRK
|
r3780 | check: dict | ||
mongodb-style query argument | ||||
keys: list of strs [optional] | ||||
if specified, the subset of keys to extract. msg_id will *always* be | ||||
included. | ||||
""" | ||||
if keys: | ||||
bad_keys = [ key for key in keys if key not in self._keys ] | ||||
if bad_keys: | ||||
raise KeyError("Bad record key(s): %s"%bad_keys) | ||||
Bernardo B. Marques
|
r4872 | |||
MinRK
|
r3780 | if keys: | ||
# ensure msg_id is present and first: | ||||
if 'msg_id' in keys: | ||||
keys.remove('msg_id') | ||||
keys.insert(0, 'msg_id') | ||||
req = ', '.join(keys) | ||||
else: | ||||
req = '*' | ||||
MinRK
|
r3652 | expr,args = self._render_expression(check) | ||
MinRK
|
r12256 | query = """SELECT %s FROM '%s' WHERE %s"""%(req, self.table, expr) | ||
MinRK
|
r3652 | cursor = self._db.execute(query, args) | ||
matches = cursor.fetchall() | ||||
MinRK
|
r3780 | records = [] | ||
for line in matches: | ||||
rec = self._list_to_dict(line, keys) | ||||
records.append(rec) | ||||
return records | ||||
Bernardo B. Marques
|
r4872 | |||
MinRK
|
r3780 | def get_history(self): | ||
"""get all msg_ids, ordered by time submitted.""" | ||||
MinRK
|
r12256 | query = """SELECT msg_id FROM '%s' ORDER by submitted ASC"""%self.table | ||
MinRK
|
r3780 | cursor = self._db.execute(query) | ||
# will be a list of length 1 tuples | ||||
return [ tup[0] for tup in cursor.fetchall()] | ||||
MinRK
|
r3652 | |||
Sylvain Corlay
|
r20940 | __all__ = ['SQLiteDB'] | ||