__init__.py
180 lines
| 5.2 KiB
| text/x-python
|
PythonLexer
r1 | #!/usr/bin/env python | |||
r5042 | ||||
r1 | """.. currentmodule:: migrate.versioning.util""" | |||
import warnings | ||||
import logging | ||||
from decorator import decorator | ||||
from pkg_resources import EntryPoint | ||||
from sqlalchemy import create_engine | ||||
from sqlalchemy.engine import Engine | ||||
from sqlalchemy.pool import StaticPool | ||||
from rhodecode.lib.dbmigrate.migrate import exceptions | ||||
from rhodecode.lib.dbmigrate.migrate.versioning.util.keyedinstance import KeyedInstance | ||||
from rhodecode.lib.dbmigrate.migrate.versioning.util.importpath import import_path | ||||
log = logging.getLogger(__name__) | ||||
r3425 | ||||
def load_model(dotted_name):
    """Import a module and return a module-level object from it.

    :param dotted_name: path to the model as a string of the form
        ``some.python.module:Class``. The legacy all-dots form
        ``some.python.module.Class`` is still accepted, but emits a
        ``MigrateDeprecationWarning`` and is rewritten by replacing the last
        dot with a colon. Any non-string value is assumed to be an already
        loaded object.
    :returns: the object named by ``dotted_name``, or ``dotted_name`` itself
        when it is not a string

    .. versionchanged:: 0.5.4

    """
    if not isinstance(dotted_name, str):
        # Assume it's already loaded.
        return dotted_name

    if ':' not in dotted_name:
        # backwards compatibility
        warnings.warn('model should be in form of module.model:User '
                      'and not module.model.User', exceptions.MigrateDeprecationWarning)
        dotted_name = ':'.join(dotted_name.rsplit('.', 1))

    # ``resolve()`` imports the target without checking distribution
    # requirements, which is what ``load(False)`` did; passing arguments to
    # ``load()`` is deprecated in pkg_resources.
    return EntryPoint.parse('x=%s' % dotted_name).resolve()
def asbool(obj):
    """Coerce ``obj`` to a real ``bool`` or raise ``ValueError``.

    Strings are stripped and lower-cased, then matched against the accepted
    spellings of true (``true, yes, on, y, t, 1``) and false
    (``false, no, off, n, f, 0``). Non-string values are accepted only if
    they compare equal to ``True`` or ``False``.

    :raises ValueError: if ``obj`` cannot be read as a boolean
    """
    truthy = ('true', 'yes', 'on', 'y', 't', '1')
    falsy = ('false', 'no', 'off', 'n', 'f', '0')

    if isinstance(obj, str):
        text = obj.strip().lower()
        if text in truthy:
            return True
        if text in falsy:
            return False
        # The message reports the normalised (stripped, lower-cased) text.
        raise ValueError("String is not true/false: %r" % text)

    if obj not in (True, False):
        raise ValueError("String is not true/false: %r" % obj)
    return bool(obj)
def guess_obj_type(obj):
    """Guess the intended Python type of a value, typically a string.

    Tries ``int(obj)`` first and then :func:`asbool`; the first conversion
    that succeeds wins. If neither succeeds, ``obj`` is returned unchanged.

    Because ``int()`` is tried first, ``'1'`` and ``'0'`` come back as
    integers rather than booleans, and a non-string number is passed through
    ``int()`` as well (so a float is truncated).

    :param obj: value to convert
    :returns: an ``int``, a ``bool``, or ``obj`` itself

    .. versionadded:: 0.5.4
    """
    # ``except Exception`` keeps the best-effort behaviour for ordinary
    # conversion failures but, unlike a bare ``except``, lets
    # KeyboardInterrupt / SystemExit propagate.
    try:
        return int(obj)
    except Exception:
        pass

    try:
        return asbool(obj)
    except Exception:
        return obj
@decorator
def catch_known_errors(f, *a, **kw):
    """Decorator that translates known API errors into ``KnownError``.

    Calls ``f(*a, **kw)`` and returns its result. If the call raises
    ``exceptions.PathFoundError``, an ``exceptions.KnownError`` is raised
    instead, with a message built from the first argument of the original
    exception.

    .. versionadded: 0.5.4
    """
    try:
        outcome = f(*a, **kw)
    except exceptions.PathFoundError as err:
        existing_path = err.args[0]
        raise exceptions.KnownError("The path %s already exists" % existing_path)
    return outcome
def construct_engine(engine, **opts):
    """.. versionadded:: 0.5.4

    Constructs and returns SQLAlchemy engine.

    Currently, there are 2 ways to pass create_engine options to
    :mod:`migrate.versioning.api` functions:

    :param engine: connection string or an existing engine
    :param engine_dict: python dictionary of options to pass to
        `create_engine`; it is copied, never modified
    :param engine_arg_*: keyword parameters to pass to `create_engine`
        (evaluated with :func:`migrate.versioning.util.guess_obj_type`)
    :param echo: deprecated way of enabling engine echo; a true value emits
        a ``MigrateDeprecationWarning`` and sets ``echo`` for `create_engine`
    :type engine_dict: dict
    :type engine: string or Engine instance
    :type engine_arg_*: string
    :returns: SQLAlchemy Engine
    :raises ValueError: if ``engine`` is neither an Engine nor a string

    .. note::

        keyword parameters override ``engine_dict`` values.

    """
    if isinstance(engine, Engine):
        return engine
    elif not isinstance(engine, str):
        raise ValueError("you need to pass either an existing engine or a database uri")

    # get options for create_engine
    engine_dict = opts.get('engine_dict')
    if engine_dict and isinstance(engine_dict, dict):
        # Work on a copy: the echo / engine_arg_* handling below writes into
        # kwargs and must not leak those keys into the caller's dictionary.
        kwargs = dict(engine_dict)
    else:
        kwargs = {}

    # DEPRECATED: handle echo the old way
    echo = asbool(opts.get('echo', False))
    if echo:
        warnings.warn(
            'echo=True parameter is deprecated, pass engine_arg_echo=True or engine_dict={"echo": True}',
            exceptions.MigrateDeprecationWarning)
        kwargs['echo'] = echo

    # parse keyword arguments
    prefix = 'engine_arg_'
    for key, value in opts.items():
        if key.startswith(prefix):
            kwargs[key[len(prefix):]] = guess_obj_type(value)

    log.debug('Constructing engine')
    # TODO: return create_engine(engine, poolclass=StaticPool, **kwargs)
    # seems like 0.5.x branch does not work with engine.dispose and staticpool
    return create_engine(engine, **kwargs)
@decorator
def with_engine(f, *a, **kw):
    """Decorator for :mod:`migrate.versioning.api` functions
    to safely close resources after function usage.

    The first positional argument is handed, together with the keyword
    arguments, to :func:`construct_engine`; the resulting engine is made
    available to the wrapped function as ``kw['engine']``.

    After the wrapped function finishes (normally or by raising), the engine
    is disposed, unless it is the very object the caller passed in.

    .. versionadded: 0.6.0
    """
    url = a[0]
    engine = construct_engine(url, **kw)
    kw['engine'] = engine
    try:
        return f(*a, **kw)
    finally:
        # An engine supplied by the caller stays under the caller's control.
        created_here = engine is not url
        if created_here and isinstance(engine, Engine):
            log.debug('Disposing SQLAlchemy engine %s', engine)
            engine.dispose()
class Memoize:
    """Memoize(fn) - an instance which acts like fn but memoizes its arguments

    Results are cached in ``self.memo``, keyed by the tuple of positional
    arguments. Calls whose arguments are not hashable cannot be cached; they
    are forwarded to ``fn`` on every call instead of failing.

    ActiveState Code 52201
    """

    def __init__(self, fn):
        # fn: the callable to wrap; memo: maps argument tuples to results.
        self.fn = fn
        self.memo = {}

    def __call__(self, *args):
        """Return ``fn(*args)``, reusing the cached result when there is one."""
        try:
            return self.memo[args]
        except KeyError:
            # Not cached yet: compute and store below.
            pass
        except TypeError:
            # Unhashable arguments (e.g. a list): bypass the cache.
            return self.fn(*args)

        result = self.memo[args] = self.fn(*args)
        return result