##// END OF EJS Templates
If commit operation is used out of pylons request scope use blank request instead
If commit operation is used out of pylons request scope use blank request instead

File last commit:

r1442:7f31de15 beta
r3481:c9214877 beta
Show More
py.py
160 lines | 5.5 KiB | text/x-python | PythonLexer
added dbmigrate package, added model changes...
r833 #!/usr/bin/env python
# -*- coding: utf-8 -*-
import shutil
import warnings
import logging
updated sqlalchemy migrate to latest version
r1061 import inspect
added dbmigrate package, added model changes...
r833 from StringIO import StringIO
fixed imports on migrate, added getting current version from database
r835 from rhodecode.lib.dbmigrate import migrate
from rhodecode.lib.dbmigrate.migrate.versioning import genmodel, schemadiff
from rhodecode.lib.dbmigrate.migrate.versioning.config import operations
from rhodecode.lib.dbmigrate.migrate.versioning.template import Template
from rhodecode.lib.dbmigrate.migrate.versioning.script import base
from rhodecode.lib.dbmigrate.migrate.versioning.util import import_path, load_model, with_engine
from rhodecode.lib.dbmigrate.migrate.exceptions import MigrateDeprecationWarning, InvalidScriptError, ScriptError
added dbmigrate package, added model changes...
r833
log = logging.getLogger(__name__)
__all__ = ['PythonScript']
class PythonScript(base.BaseScript):
"""Base for Python scripts"""
@classmethod
def create(cls, path, **opts):
"""Create an empty migration script at specified path
source code cleanup: remove trailing white space, normalize file endings
r1203
added dbmigrate package, added model changes...
r833 :returns: :class:`PythonScript instance <migrate.versioning.script.py.PythonScript>`"""
cls.require_notfound(path)
src = Template(opts.pop('templates_path', None)).get_script(theme=opts.pop('templates_theme', None))
shutil.copy(src, path)
return cls(path)
@classmethod
def make_update_script_for_model(cls, engine, oldmodel,
model, repository, **opts):
"""Create a migration script based on difference between two SA models.
source code cleanup: remove trailing white space, normalize file endings
r1203
added dbmigrate package, added model changes...
r833 :param repository: path to migrate repository
:param oldmodel: dotted.module.name:SAClass or SAClass object
:param model: dotted.module.name:SAClass or SAClass object
:param engine: SQLAlchemy engine
:type repository: string or :class:`Repository instance <migrate.versioning.repository.Repository>`
:type oldmodel: string or Class
:type model: string or Class
:type engine: Engine instance
:returns: Upgrade / Downgrade script
:rtype: string
"""
source code cleanup: remove trailing white space, normalize file endings
r1203
added dbmigrate package, added model changes...
r833 if isinstance(repository, basestring):
# oh dear, an import cycle!
fixed imports on migrate, added getting current version from database
r835 from rhodecode.lib.dbmigrate.migrate.versioning.repository import Repository
added dbmigrate package, added model changes...
r833 repository = Repository(repository)
oldmodel = load_model(oldmodel)
model = load_model(model)
# Compute differences.
diff = schemadiff.getDiffOfModelAgainstModel(
update migrations for 1.2
r1442 model,
added dbmigrate package, added model changes...
r833 oldmodel,
excludeTables=[repository.version_table])
# TODO: diff can be False (there is no difference?)
decls, upgradeCommands, downgradeCommands = \
update migrations for 1.2
r1442 genmodel.ModelGenerator(diff,engine).genB2AMigration()
added dbmigrate package, added model changes...
r833
# Store differences into file.
src = Template(opts.pop('templates_path', None)).get_script(opts.pop('templates_theme', None))
f = open(src)
contents = f.read()
f.close()
# generate source
search = 'def upgrade(migrate_engine):'
contents = contents.replace(search, '\n\n'.join((decls, search)), 1)
if upgradeCommands:
contents = contents.replace(' pass', upgradeCommands, 1)
if downgradeCommands:
contents = contents.replace(' pass', downgradeCommands, 1)
return contents
@classmethod
def verify_module(cls, path):
"""Ensure path is a valid script
source code cleanup: remove trailing white space, normalize file endings
r1203
added dbmigrate package, added model changes...
r833 :param path: Script location
:type path: string
:raises: :exc:`InvalidScriptError <migrate.exceptions.InvalidScriptError>`
:returns: Python module
"""
# Try to import and get the upgrade() func
module = import_path(path)
try:
assert callable(module.upgrade)
except Exception, e:
raise InvalidScriptError(path + ': %s' % str(e))
return module
def preview_sql(self, url, step, **args):
source code cleanup: remove trailing white space, normalize file endings
r1203 """Mocks SQLAlchemy Engine to store all executed calls in a string
added dbmigrate package, added model changes...
r833 and runs :meth:`PythonScript.run <migrate.versioning.script.py.PythonScript.run>`
:returns: SQL file
"""
buf = StringIO()
args['engine_arg_strategy'] = 'mock'
args['engine_arg_executor'] = lambda s, p = '': buf.write(str(s) + p)
@with_engine
def go(url, step, **kw):
engine = kw.pop('engine')
self.run(engine, step)
return buf.getvalue()
return go(url, step, **args)
def run(self, engine, step):
source code cleanup: remove trailing white space, normalize file endings
r1203 """Core method of Script file.
added dbmigrate package, added model changes...
r833 Exectues :func:`update` or :func:`downgrade` functions
:param engine: SQLAlchemy Engine
:param step: Operation to run
:type engine: string
:type step: int
"""
if step > 0:
op = 'upgrade'
elif step < 0:
op = 'downgrade'
else:
raise ScriptError("%d is not a valid step" % step)
funcname = base.operations[op]
script_func = self._func(funcname)
updated sqlalchemy migrate to latest version
r1061 # check for old way of using engine
if not inspect.getargspec(script_func)[0]:
raise TypeError("upgrade/downgrade functions must accept engine"
" parameter (since version 0.5.4)")
script_func(engine)
added dbmigrate package, added model changes...
r833
@property
def module(self):
"""Calls :meth:`migrate.versioning.script.py.verify_module`
and returns it.
"""
if not hasattr(self, '_module'):
self._module = self.verify_module(self.path)
return self._module
def _func(self, funcname):
if not hasattr(self.module, funcname):
msg = "Function '%s' is not defined in this script"
raise ScriptError(msg % funcname)
return getattr(self.module, funcname)