# Copyright (C) 2010-2023 RhodeCode GmbH
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License, version 3
# (only), as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
#
# This program is dual-licensed. If you wish to learn more about the
# RhodeCode Enterprise Edition, including its added features, Support services,
# and proprietary license terms, please see https://rhodecode.com/licenses/

import sqlalchemy
from sqlalchemy import UnicodeText
from sqlalchemy.ext.mutable import Mutable, MutableList, MutableDict

from rhodecode.lib import ext_json


class JsonRaw(str):
    """
    Allows interacting with a JSON types field using a raw string.

    For example:
        db_instance = JsonTable()
        db_instance.enabled = True
        db_instance.json_data = JsonRaw('{"a": 4}')

    This will bypass serialization/checks, and allow storing raw values
    """


class JSONEncodedObj(sqlalchemy.types.TypeDecorator):
    """
    Represents an immutable structure as a json-encoded string.

    If default is, for example, a dict, then a NULL value in the
    database will be exposed as an empty dict.
    """

    impl = UnicodeText
    # class-level defaults; `safe_json` / `enforce_str` kwargs override them
    # per instance in __init__
    safe = True
    enforce_str = True
    cache_ok = True

    def __init__(self, *args, **kwargs):
        """
        Consume the JSON specific options and forward the rest to the base type.

        Recognised keyword arguments (all optional, removed from ``kwargs``):

        :param default: zero-argument callable (e.g. ``dict`` or ``list``)
            invoked to build the value returned for empty / undecodable
            column content. ``None`` disables this substitution.
        :param safe_json: stored as ``self.safe``; when true, a decoding
            error is replaced by ``default()`` (if a default is set) instead
            of being raised.
        :param enforce_str: when true values are serialized with
            ``ext_json.str_json``, otherwise with ``ext_json.json.dumps``.
        :param dialect_map: mapping of dialect name -> column type used
            instead of ``impl`` for that dialect.
        """
        self.default = kwargs.pop('default', None)
        self.safe = kwargs.pop('safe_json', self.safe)
        self.enforce_str = kwargs.pop('enforce_str', self.enforce_str)
        self.dialect_map = kwargs.pop('dialect_map', {})
        super().__init__(*args, **kwargs)

    def load_dialect_impl(self, dialect):
        """
        Return the type descriptor for `dialect`, preferring an entry from
        ``dialect_map`` (looked up by ``dialect.name``) over ``impl``.
        """
        if dialect.name in self.dialect_map:
            return dialect.type_descriptor(self.dialect_map[dialect.name])
        return dialect.type_descriptor(self.impl)

    def process_bind_param(self, value, dialect):
        """
        Convert a python value into what gets bound to the statement.

        ``None`` and :class:`JsonRaw` instances are passed through untouched,
        everything else is serialized according to ``enforce_str``.
        """
        if value is None or isinstance(value, JsonRaw):
            # nothing to serialize: NULL stays NULL, raw strings are
            # stored exactly as given
            return value

        if self.enforce_str:
            return ext_json.str_json(value)
        return ext_json.json.dumps(value)

    def process_result_value(self, value, dialect):
        """
        Convert the stored string back into a python structure.

        With a ``default`` configured, falsy column content (``None``, empty
        string) and the literal ``'""'`` yield ``default()``. Any other
        non-``None`` value is decoded with ``ext_json.json.loads``; a decoding
        failure yields ``default()`` when both ``safe`` and ``default`` are
        set, and is re-raised otherwise.
        """
        if self.default is not None and (not value or value == '""'):
            return self.default()

        if value is None:
            return value

        try:
            return ext_json.json.loads(value)
        except Exception:
            if self.safe and self.default is not None:
                return self.default()
            raise


class MutationObj(Mutable):
    """
    Entry point for mutation tracking of JSON columns.

    Plain dicts / lists are converted into :class:`MutationDict` /
    :class:`MutationList`; any other value is handed back unchanged.
    """

    @classmethod
    def coerce(cls, key, value):
        """
        Wrap plain ``dict`` / ``list`` values into their tracked counterparts.

        Values of any other type (including ``None``) are returned as-is.
        """
        if isinstance(value, dict) and not isinstance(value, MutationDict):
            return MutationDict.coerce(key, value)
        if isinstance(value, list) and not isinstance(value, MutationList):
            return MutationList.coerce(key, value)
        return value

    def de_coerce(self) -> "MutationObj":
        """Return the object itself; the container subclasses return plain copies."""
        return self

    @classmethod
    def _listen_on_attribute(cls, attribute, coerce, parent_cls):
        """
        Register the ORM event listeners for `attribute`.

        Listeners are only installed when `parent_cls` is the class that
        owns the attribute; all of them are registered with
        ``propagate=True``.

        NOTE(review): MutationDict / MutationList derive from SQLAlchemy's
        MutableDict / MutableList and are not subclasses of MutationObj, so
        when `cls` is MutationObj the ``isinstance(..., cls)`` checks below
        do not match values produced by ``coerce`` - confirm this is intended.
        """
        key = attribute.key
        if parent_cls is not attribute.class_:
            return

        # rely on "propagate" here
        parent_cls = attribute.class_

        def load(state, *args):
            # used for both the 'load' and the 'refresh' event
            val = state.dict.get(key, None)
            if coerce:
                val = cls.coerce(key, val)
                state.dict[key] = val
            if isinstance(val, cls):
                val._parents[state.obj()] = key

        # trailing underscore: do not shadow the `set` builtin
        def set_(target, value, oldvalue, initiator):
            if not isinstance(value, cls):
                value = cls.coerce(key, value)
            if isinstance(value, cls):
                value._parents[target.obj()] = key
            if isinstance(oldvalue, cls):
                # the replaced value no longer belongs to this parent
                oldvalue._parents.pop(target.obj(), None)
            # registered with retval=True, the returned value is what gets set
            return value

        # trailing underscore: do not shadow the stdlib `pickle` module name
        def pickle_(state, state_dict):
            val = state.dict.get(key, None)
            if isinstance(val, cls):
                state_dict.setdefault('ext.mutable.values', []).append(val)

        def unpickle(state, state_dict):
            if 'ext.mutable.values' in state_dict:
                for val in state_dict['ext.mutable.values']:
                    val._parents[state.obj()] = key

        sqlalchemy.event.listen(
            parent_cls, 'load', load, raw=True, propagate=True)
        sqlalchemy.event.listen(
            parent_cls, 'refresh', load, raw=True, propagate=True)
        sqlalchemy.event.listen(
            parent_cls, 'pickle', pickle_, raw=True, propagate=True)
        sqlalchemy.event.listen(
            attribute, 'set', set_, raw=True, retval=True, propagate=True)
        sqlalchemy.event.listen(
            parent_cls, 'unpickle', unpickle, raw=True, propagate=True)


class MutationList(MutableList):
    """``MutableList`` that can hand out a plain ``list`` copy of itself."""

    def de_coerce(self):
        """Return the content as a new plain ``list``."""
        return list(self)


class MutationDict(MutableDict):
    """``MutableDict`` that can hand out a plain ``dict`` copy of itself."""

    def de_coerce(self):
        """Return the content as a new plain ``dict``."""
        return dict(self)


def JsonType(impl=None, **kwargs):
    """
    Helper for using a mutation obj, it allows to use .with_variant easily.

    `impl` selects the default factory of the created type: ``'list'`` uses
    ``list``, ``'dict'`` uses ``dict``, anything else leaves the default
    unset. Remaining keyword arguments go to :class:`JSONEncodedObj`.

    example::

        settings = Column('settings_json',
                          MutationObj.as_mutable(
                              JsonType(dialect_map=dict(mysql=UnicodeText(16384)))))
    """
    if impl == 'list':
        return JSONEncodedObj(default=list, **kwargs)
    elif impl == 'dict':
        return JSONEncodedObj(default=dict, **kwargs)
    else:
        return JSONEncodedObj(**kwargs)