##// END OF EJS Templates
fix(caching): fixed problems with Cache query for users....
fix(caching): fixed problems with Cache query for users. The old way of querying caused the user get query to be always cached, and returning old results even in 2fa forms. The new limited query doesn't cache the user object resolving issues

File last commit:

r5223:6c71ac19 default
r5365:ae8a165b default
Show More
jsonalchemy.py
182 lines | 5.9 KiB | text/x-python | PythonLexer
# Copyright (C) 2010-2023 RhodeCode GmbH
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License, version 3
# (only), as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# This program is dual-licensed. If you wish to learn more about the
# RhodeCode Enterprise Edition, including its added features, Support services,
# and proprietary license terms, please see https://rhodecode.com/licenses/
import sqlalchemy
from sqlalchemy import UnicodeText
from sqlalchemy.ext.mutable import Mutable, MutableList, MutableDict
from rhodecode.lib import ext_json
class JsonRaw(str):
"""
Allows interacting with a JSON types field using a raw string.
For example:
db_instance = JsonTable()
db_instance.enabled = True
db_instance.json_data = JsonRaw('{"a": 4}')
This will bypass serialization/checks, and allow storing
raw values
"""
pass
class JSONEncodedObj(sqlalchemy.types.TypeDecorator):
"""
Represents an immutable structure as a json-encoded string.
If default is, for example, a dict, then a NULL value in the
database will be exposed as an empty dict.
"""
impl = UnicodeText
safe = True
enforce_str = True
cache_ok = True
def __init__(self, *args, **kwargs):
self.default = kwargs.pop('default', None)
self.safe = kwargs.pop('safe_json', self.safe)
self.enforce_str = kwargs.pop('enforce_str', self.enforce_str)
self.dialect_map = kwargs.pop('dialect_map', {})
super(JSONEncodedObj, self).__init__(*args, **kwargs)
def load_dialect_impl(self, dialect):
if dialect.name in self.dialect_map:
return dialect.type_descriptor(self.dialect_map[dialect.name])
return dialect.type_descriptor(self.impl)
def process_bind_param(self, value, dialect):
if isinstance(value, JsonRaw):
value = value
elif value is not None:
if self.enforce_str:
value = ext_json.str_json(value)
else:
value = ext_json.json.dumps(value)
return value
def process_result_value(self, value, dialect):
if self.default is not None and (not value or value == '""'):
return self.default()
if value is not None:
try:
value = ext_json.json.loads(value)
except Exception:
if self.safe and self.default is not None:
return self.default()
else:
raise
return value
class MutationObj(Mutable):
@classmethod
def coerce(cls, key, value):
if isinstance(value, dict) and not isinstance(value, MutationDict):
return MutationDict.coerce(key, value)
if isinstance(value, list) and not isinstance(value, MutationList):
return MutationList.coerce(key, value)
return value
def de_coerce(self) -> "MutationObj":
return self
@classmethod
def _listen_on_attribute(cls, attribute, coerce, parent_cls):
key = attribute.key
if parent_cls is not attribute.class_:
return
# rely on "propagate" here
parent_cls = attribute.class_
def load(state, *args):
val = state.dict.get(key, None)
if coerce:
val = cls.coerce(key, val)
state.dict[key] = val
if isinstance(val, cls):
val._parents[state.obj()] = key
def set(target, value, oldvalue, initiator):
if not isinstance(value, cls):
value = cls.coerce(key, value)
if isinstance(value, cls):
value._parents[target.obj()] = key
if isinstance(oldvalue, cls):
oldvalue._parents.pop(target.obj(), None)
return value
def pickle(state, state_dict):
val = state.dict.get(key, None)
if isinstance(val, cls):
if 'ext.mutable.values' not in state_dict:
state_dict['ext.mutable.values'] = []
state_dict['ext.mutable.values'].append(val)
def unpickle(state, state_dict):
if 'ext.mutable.values' in state_dict:
for val in state_dict['ext.mutable.values']:
val._parents[state.obj()] = key
sqlalchemy.event.listen(parent_cls, 'load', load, raw=True,
propagate=True)
sqlalchemy.event.listen(parent_cls, 'refresh', load, raw=True,
propagate=True)
sqlalchemy.event.listen(parent_cls, 'pickle', pickle, raw=True,
propagate=True)
sqlalchemy.event.listen(attribute, 'set', set, raw=True, retval=True,
propagate=True)
sqlalchemy.event.listen(parent_cls, 'unpickle', unpickle, raw=True,
propagate=True)
class MutationList(MutableList):
def de_coerce(self):
return list(self)
class MutationDict(MutableDict):
def de_coerce(self):
return dict(self)
def JsonType(impl=None, **kwargs):
"""
Helper for using a mutation obj, it allows to use .with_variant easily.
example::
settings = Column('settings_json',
MutationObj.as_mutable(
JsonType(dialect_map=dict(mysql=UnicodeText(16384))))
"""
if impl == 'list':
return JSONEncodedObj(default=list, **kwargs)
elif impl == 'dict':
return JSONEncodedObj(default=dict, **kwargs)
else:
return JSONEncodedObj(**kwargs)