##// END OF EJS Templates
deps: bumped pycryptodome==3.21.0 for security issue
deps: bumped pycryptodome==3.21.0 for security issue

File last commit:

r5608:6d33e504 default
r5640:acc4336c default
Show More
jsonalchemy.py
181 lines | 5.9 KiB | text/x-python | PythonLexer
# Copyright (C) 2010-2024 RhodeCode GmbH
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License, version 3
# (only), as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# This program is dual-licensed. If you wish to learn more about the
# RhodeCode Enterprise Edition, including its added features, Support services,
# and proprietary license terms, please see https://rhodecode.com/licenses/
import sqlalchemy
from sqlalchemy import UnicodeText
from sqlalchemy.ext.mutable import Mutable, MutableList, MutableDict
from rhodecode.lib import ext_json
class JsonRaw(str):
"""
Allows interacting with a JSON types field using a raw string.
For example:
db_instance = JsonTable()
db_instance.enabled = True
db_instance.json_data = JsonRaw('{"a": 4}')
This will bypass serialization/checks, and allow storing
raw values
"""
pass
class JSONEncodedObj(sqlalchemy.types.TypeDecorator):
"""
Represents an immutable structure as a json-encoded string.
If default is, for example, a dict, then a NULL value in the
database will be exposed as an empty dict.
"""
impl = UnicodeText
safe = True
enforce_str = True
cache_ok = True
def __init__(self, *args, **kwargs):
self.default = kwargs.pop('default', None)
self.safe = kwargs.pop('safe_json', self.safe)
self.enforce_str = kwargs.pop('enforce_str', self.enforce_str)
self.dialect_map = kwargs.pop('dialect_map', {})
super(JSONEncodedObj, self).__init__(*args, **kwargs)
def load_dialect_impl(self, dialect):
if dialect.name in self.dialect_map:
return dialect.type_descriptor(self.dialect_map[dialect.name])
return dialect.type_descriptor(self.impl)
def process_bind_param(self, value, dialect):
if isinstance(value, JsonRaw):
value = value
elif value is not None:
if self.enforce_str:
value = ext_json.str_json(value)
else:
value = ext_json.json.dumps(value)
return value
def process_result_value(self, value, dialect):
if self.default is not None and (not value or value == '""'):
return self.default()
if value is not None:
try:
value = ext_json.json.loads(value)
except Exception:
if self.safe and self.default is not None:
return self.default()
else:
raise
return value
class MutationObj(Mutable):
@classmethod
def coerce(cls, key, value):
if isinstance(value, dict) and not isinstance(value, MutationDict):
return MutationDict.coerce(key, value)
if isinstance(value, list) and not isinstance(value, MutationList):
return MutationList.coerce(key, value)
return value
def de_coerce(self) -> "MutationObj":
return self
@classmethod
def _listen_on_attribute(cls, attribute, coerce, parent_cls):
key = attribute.key
if parent_cls is not attribute.class_:
return
# rely on "propagate" here
parent_cls = attribute.class_
def load(state, *args):
val = state.dict.get(key, None)
if coerce:
val = cls.coerce(key, val)
state.dict[key] = val
if isinstance(val, cls):
val._parents[state.obj()] = key
def set(target, value, oldvalue, initiator):
if not isinstance(value, cls):
value = cls.coerce(key, value)
if isinstance(value, cls):
value._parents[target.obj()] = key
if isinstance(oldvalue, cls):
oldvalue._parents.pop(target.obj(), None)
return value
def pickle(state, state_dict):
val = state.dict.get(key, None)
if isinstance(val, cls):
if 'ext.mutable.values' not in state_dict:
state_dict['ext.mutable.values'] = []
state_dict['ext.mutable.values'].append(val)
def unpickle(state, state_dict):
if 'ext.mutable.values' in state_dict:
for val in state_dict['ext.mutable.values']:
val._parents[state.obj()] = key
sqlalchemy.event.listen(parent_cls, 'load', load, raw=True,
propagate=True)
sqlalchemy.event.listen(parent_cls, 'refresh', load, raw=True,
propagate=True)
sqlalchemy.event.listen(parent_cls, 'pickle', pickle, raw=True,
propagate=True)
sqlalchemy.event.listen(attribute, 'set', set, raw=True, retval=True,
propagate=True)
sqlalchemy.event.listen(parent_cls, 'unpickle', unpickle, raw=True,
propagate=True)
class MutationList(MutableList):
def de_coerce(self):
return list(self)
class MutationDict(MutableDict):
def de_coerce(self):
return dict(self)
def JsonType(impl=None, **kwargs):
"""
Helper for using a mutation obj, it allows to use .with_variant easily.
example::
settings = Column('settings_json',
MutationObj.as_mutable(
JsonType(dialect_map=dict(mysql=UnicodeText(16384))))
"""
if impl == 'list':
return JSONEncodedObj(default=list, **kwargs)
elif impl == 'dict':
return JSONEncodedObj(default=dict, **kwargs)
else:
return JSONEncodedObj(**kwargs)