##// END OF EJS Templates
test: move test environment initialization from the main code to tests
Anton Schur -
r6562:6cc40e54 default
parent child Browse files
Show More
@@ -1,215 +1,196 b''
1 1 # -*- coding: utf-8 -*-
2 2 # This program is free software: you can redistribute it and/or modify
3 3 # it under the terms of the GNU General Public License as published by
4 4 # the Free Software Foundation, either version 3 of the License, or
5 5 # (at your option) any later version.
6 6 #
7 7 # This program is distributed in the hope that it will be useful,
8 8 # but WITHOUT ANY WARRANTY; without even the implied warranty of
9 9 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 10 # GNU General Public License for more details.
11 11 #
12 12 # You should have received a copy of the GNU General Public License
13 13 # along with this program. If not, see <http://www.gnu.org/licenses/>.
14 14 """
15 15 Global configuration file for TurboGears2 specific settings in Kallithea.
16 16
17 17 This file complements the .ini file.
18 18 """
19 19
20 20 import platform
21 21 import os, sys, logging
22 22
23 23 import tg
24 24 from tg import hooks
25 25 from tg.configuration import AppConfig
26 26 from tg.support.converters import asbool
27 27 import alembic
28 28 from alembic.script.base import ScriptDirectory
29 29 from alembic.migration import MigrationContext
30 30 from sqlalchemy import create_engine
31 31
32 32 from kallithea.lib.middleware.https_fixup import HttpsFixup
33 33 from kallithea.lib.middleware.simplegit import SimpleGit
34 34 from kallithea.lib.middleware.simplehg import SimpleHg
35 35 from kallithea.lib.auth import set_available_permissions
36 36 from kallithea.lib.db_manage import DbManage
37 37 from kallithea.lib.utils import load_rcextensions, make_ui, set_app_settings, set_vcs_config, \
38 38 set_indexer_config, check_git_version, repo2db_mapper
39 39 from kallithea.lib.utils2 import str2bool
40 40 from kallithea.model.scm import ScmModel
41 41
42 42 import formencode
43 43 import kallithea
44 44
45 45 log = logging.getLogger(__name__)
46 46
47 47
48 48 class KallitheaAppConfig(AppConfig):
49 49 # Note: AppConfig has a misleading name, as it's not the application
50 50 # configuration, but the application configurator. The AppConfig values are
51 51 # used as a template to create the actual configuration, which might
52 52 # overwrite or extend the one provided by the configurator template.
53 53
54 54 # To make it clear, AppConfig creates the config and sets into it the same
55 55 # values that AppConfig itself has. Then the values from the config file and
56 56 # gearbox options are loaded and merged into the configuration. Then an
57 57 # after_init_config(conf) method of AppConfig is called for any change that
58 58 # might depend on options provided by configuration files.
59 59
60 60 def __init__(self):
61 61 super(KallitheaAppConfig, self).__init__()
62 62
63 63 self['package'] = kallithea
64 64
65 65 self['prefer_toscawidgets2'] = False
66 66 self['use_toscawidgets'] = False
67 67
68 68 self['renderers'] = []
69 69
70 70 # Enable json in expose
71 71 self['renderers'].append('json')
72 72
73 73 # Configure template rendering
74 74 self['renderers'].append('mako')
75 75 self['default_renderer'] = 'mako'
76 76 self['use_dotted_templatenames'] = False
77 77
78 78 # Configure Sessions, store data as JSON to avoid pickle security issues
79 79 self['session.enabled'] = True
80 80 self['session.data_serializer'] = 'json'
81 81
82 82 # Configure the base SQLALchemy Setup
83 83 self['use_sqlalchemy'] = True
84 84 self['model'] = kallithea.model.base
85 85 self['DBSession'] = kallithea.model.meta.Session
86 86
87 87 # Configure App without an authentication backend.
88 88 self['auth_backend'] = None
89 89
90 90 # Use custom error page for these errors. By default, Turbogears2 does not add
91 91 # 400 in this list.
92 92 # Explicitly listing all is considered more robust than appending to defaults,
93 93 # in light of possible future framework changes.
94 94 self['errorpage.status_codes'] = [400, 401, 403, 404]
95 95
96 96 # Disable transaction manager -- currently Kallithea takes care of transactions itself
97 97 self['tm.enabled'] = False
98 98
99 99 base_config = KallitheaAppConfig()
100 100
101 101 # TODO still needed as long as we use pylonslib
102 102 sys.modules['pylons'] = tg
103 103
104 104 # DebugBar, a debug toolbar for TurboGears2.
105 105 # (https://github.com/TurboGears/tgext.debugbar)
106 106 # To enable it, install 'tgext.debugbar' and 'kajiki', and run Kallithea with
107 107 # 'debug = true' (not in production!)
108 108 # See the Kallithea documentation for more information.
109 109 try:
110 110 from tgext.debugbar import enable_debugbar
111 111 import kajiki # only to check its existence
112 112 except ImportError:
113 113 pass
114 114 else:
115 115 base_config['renderers'].append('kajiki')
116 116 enable_debugbar(base_config)
117 117
118 118
119 119 def setup_configuration(app):
120 120 config = app.config
121 121
122 122 if config.get('ignore_alembic_revision', False):
123 123 log.warn('database alembic revision checking is disabled')
124 124 else:
125 125 dbconf = config['sqlalchemy.url']
126 126 alembic_cfg = alembic.config.Config()
127 127 alembic_cfg.set_main_option('script_location', 'kallithea:alembic')
128 128 alembic_cfg.set_main_option('sqlalchemy.url', dbconf)
129 129 script_dir = ScriptDirectory.from_config(alembic_cfg)
130 130 available_heads = sorted(script_dir.get_heads())
131 131
132 132 engine = create_engine(dbconf)
133 133 with engine.connect() as conn:
134 134 context = MigrationContext.configure(conn)
135 135 current_heads = sorted(str(s) for s in context.get_current_heads())
136 136 if current_heads != available_heads:
137 137 log.error('Failed to run Kallithea:\n\n'
138 138 'The database version does not match the Kallithea version.\n'
139 139 'Please read the documentation on how to upgrade or downgrade the database.\n'
140 140 'Current database version id(s): %s\n'
141 141 'Expected database version id(s): %s\n'
142 142 'If you are a developer and you know what you are doing, you can add `ignore_alembic_revision = True` '
143 143 'to your .ini file to skip the check.\n' % (' '.join(current_heads), ' '.join(available_heads)))
144 144 sys.exit(1)
145 145
146 146 # store some globals into kallithea
147 147 kallithea.CELERY_ON = str2bool(config['app_conf'].get('use_celery'))
148 148 kallithea.CELERY_EAGER = str2bool(config['app_conf'].get('celery.always.eager'))
149 149 kallithea.CONFIG = config
150 150
151 151 load_rcextensions(root_path=config['here'])
152 152
153 # FIXME move test setup code out of here
154 test = os.path.split(config['__file__'])[-1] == 'test.ini'
155 if test:
156 test_env = not int(os.environ.get('KALLITHEA_NO_TMP_PATH', 0))
157 test_index = not int(os.environ.get('KALLITHEA_WHOOSH_TEST_DISABLE', 0))
158 if os.environ.get('TEST_DB'):
159 # swap config if we pass environment variable
160 config['sqlalchemy.url'] = os.environ.get('TEST_DB')
161
162 from kallithea.tests.fixture import create_test_env, create_test_index
163 from kallithea.tests.base import TESTS_TMP_PATH
164 #set KALLITHEA_NO_TMP_PATH=1 to disable re-creating the database and
165 #test repos
166 if test_env:
167 create_test_env(TESTS_TMP_PATH, config)
168 #set KALLITHEA_WHOOSH_TEST_DISABLE=1 to disable whoosh index during tests
169 if test_index:
170 create_test_index(TESTS_TMP_PATH, config, True)
171
172 153 set_available_permissions(config)
173 154 repos_path = make_ui('db').configitems('paths')[0][1]
174 155 config['base_path'] = repos_path
175 156 set_app_settings(config)
176 157
177 158 instance_id = kallithea.CONFIG.get('instance_id', '*')
178 159 if instance_id == '*':
179 160 instance_id = '%s-%s' % (platform.uname()[1], os.getpid())
180 161 kallithea.CONFIG['instance_id'] = instance_id
181 162
182 163 # update kallithea.CONFIG with the meanwhile changed 'config'
183 164 kallithea.CONFIG.update(config)
184 165
185 166 # configure vcs and indexer libraries (they are supposed to be independent
186 167 # as much as possible and thus avoid importing tg.config or
187 168 # kallithea.CONFIG).
188 169 set_vcs_config(kallithea.CONFIG)
189 170 set_indexer_config(kallithea.CONFIG)
190 171
191 172 check_git_version()
192 173
193 174 if str2bool(config.get('initial_repo_scan', True)):
194 175 repo2db_mapper(ScmModel().repo_scan(repos_path),
195 176 remove_obsolete=False, install_git_hooks=False)
196 177
197 178 formencode.api.set_stdtranslation(languages=[config.get('lang')])
198 179
199 180 hooks.register('configure_new_app', setup_configuration)
200 181
201 182
202 183 def setup_application(app):
203 184 config = app.config
204 185
205 186 # we want our low level middleware to get to the request ASAP. We don't
206 187 # need any stack middleware in them - especially no StatusCodeRedirect buffering
207 188 app = SimpleHg(app, config)
208 189 app = SimpleGit(app, config)
209 190
210 191 # Enable https redirects based on HTTP_X_URL_SCHEME set by proxy
211 192 if any(asbool(config.get(x)) for x in ['https_fixup', 'force_https', 'use_htsts']):
212 193 app = HttpsFixup(app, config)
213 194 return app
214 195
215 196 hooks.register('before_config', setup_application)
@@ -1,123 +1,141 b''
1 1 import os
2 2 import sys
3 3 import logging
4 4 import pkg_resources
5 5
6 from paste.deploy import loadapp
6 from paste.deploy import loadwsgi
7 7 from routes.util import URLGenerator
8 from tg import config
9 8
10 9 import pytest
11 10 from kallithea.controllers.root import RootController
12 11 from kallithea.model.user import UserModel
13 12 from kallithea.model.meta import Session
14 13 from kallithea.model.db import Setting, User, UserIpMap
15 14 from kallithea.tests.base import invalidate_all_caches, TEST_USER_REGULAR_LOGIN
16 15 import kallithea.tests.base # FIXME: needed for setting testapp instance!!!
17 16
18 17 from tg.util.webtest import test_context
19 18
20 19 def pytest_configure():
21 20 path = os.getcwd()
22 21 sys.path.insert(0, path)
23 22 pkg_resources.working_set.add_entry(path)
24 23
25 24 # Disable INFO logging of test database creation, restore with NOTSET
26 25 logging.disable(logging.INFO)
27 kallithea.tests.base.testapp = loadapp('config:kallithea/tests/test.ini', relative_to=path)
26
27 context = loadwsgi.loadcontext(loadwsgi.APP, 'config:kallithea/tests/test.ini', relative_to=path)
28
29 test_env = not int(os.environ.get('KALLITHEA_NO_TMP_PATH', 0))
30 test_index = not int(os.environ.get('KALLITHEA_WHOOSH_TEST_DISABLE', 0))
31 if os.environ.get('TEST_DB'):
32 # swap config if we pass environment variable
33 context.local_conf['sqlalchemy.url'] = os.environ.get('TEST_DB')
34
35 from kallithea.tests.fixture import create_test_env, create_test_index
36 from kallithea.tests.base import TESTS_TMP_PATH
37 # set KALLITHEA_NO_TMP_PATH=1 to disable re-creating the database and
38 # test repos
39 if test_env:
40 create_test_env(TESTS_TMP_PATH, context.config())
41 # set KALLITHEA_WHOOSH_TEST_DISABLE=1 to disable whoosh index during tests
42 if test_index:
43 create_test_index(TESTS_TMP_PATH, context.config(), True)
44
45 kallithea.tests.base.testapp = context.create()
28 46 logging.disable(logging.NOTSET)
29 47
30 48 kallithea.tests.base.url = URLGenerator(RootController().mapper, kallithea.tests.base.environ)
31 49
32 50
33 51 @pytest.fixture
34 52 def create_test_user():
35 53 """Provide users that automatically disappear after test is over."""
36 54 test_user_ids = []
37 55 def _create_test_user(user_form):
38 56 user = UserModel().create(user_form)
39 57 test_user_ids.append(user.user_id)
40 58 return user
41 59 yield _create_test_user
42 60 for user_id in test_user_ids:
43 61 UserModel().delete(user_id)
44 62 Session().commit()
45 63
46 64
47 65 def _set_settings(*kvtseq):
48 66 session = Session()
49 67 for kvt in kvtseq:
50 68 assert len(kvt) in (2, 3)
51 69 k = kvt[0]
52 70 v = kvt[1]
53 71 t = kvt[2] if len(kvt) == 3 else 'unicode'
54 72 Setting.create_or_update(k, v, t)
55 73 session.commit()
56 74
57 75
58 76 @pytest.fixture
59 77 def set_test_settings():
60 78 """Restore settings after test is over."""
61 79 # Save settings.
62 80 settings_snapshot = [
63 81 (s.app_settings_name, s.app_settings_value, s.app_settings_type)
64 82 for s in Setting.query().all()]
65 83 yield _set_settings
66 84 # Restore settings.
67 85 session = Session()
68 86 keys = frozenset(k for (k, v, t) in settings_snapshot)
69 87 for s in Setting.query().all():
70 88 if s.app_settings_name not in keys:
71 89 session.delete(s)
72 90 for k, v, t in settings_snapshot:
73 91 if t == 'list' and hasattr(v, '__iter__'):
74 92 v = ','.join(v) # Quirk: must format list value manually.
75 93 Setting.create_or_update(k, v, t)
76 94 session.commit()
77 95
78 96 @pytest.fixture
79 97 def auto_clear_ip_permissions():
80 98 """Fixture that provides nothing but clearing IP permissions upon test
81 99 exit. This clearing is needed to avoid other test failing to make fake http
82 100 accesses."""
83 101 yield
84 102 # cleanup
85 103 user_model = UserModel()
86 104
87 105 user_ids = []
88 106 user_ids.append(User.get_default_user().user_id)
89 107 user_ids.append(User.get_by_username(TEST_USER_REGULAR_LOGIN).user_id)
90 108
91 109 for user_id in user_ids:
92 110 for ip in UserIpMap.query().filter(UserIpMap.user_id == user_id):
93 111 user_model.delete_extra_ip(user_id, ip.ip_id)
94 112
95 113 # IP permissions are cached, need to invalidate this cache explicitly
96 114 invalidate_all_caches()
97 115
98 116 @pytest.fixture
99 117 def test_context_fixture(app_fixture):
100 118 """
101 119 Encompass the entire test using this fixture in a test_context,
102 120 making sure that certain functionality still works even if no call to
103 121 self.app.get/post has been made.
104 122 The typical error message indicating you need a test_context is:
105 123 TypeError: No object (name: context) has been registered for this thread
106 124
107 125 The standard way to fix this is simply using the test_context context
108 126 manager directly inside your test:
109 127 with test_context(self.app):
110 128 <actions>
111 129 but if test setup code (xUnit-style or pytest fixtures) also needs to be
112 130 executed inside the test context, that method is not possible.
113 131 Even if there is no such setup code, the fixture may reduce code complexity
114 132 if the entire test needs to run inside a test context.
115 133
116 134 To apply this fixture (like any other fixture) to all test methods of a
117 135 class, use the following class decorator:
118 136 @pytest.mark.usefixtures("test_context_fixture")
119 137 class TestFoo(TestController):
120 138 ...
121 139 """
122 140 with test_context(app_fixture):
123 141 yield
@@ -1,390 +1,390 b''
1 1 # -*- coding: utf-8 -*-
2 2 # This program is free software: you can redistribute it and/or modify
3 3 # it under the terms of the GNU General Public License as published by
4 4 # the Free Software Foundation, either version 3 of the License, or
5 5 # (at your option) any later version.
6 6 #
7 7 # This program is distributed in the hope that it will be useful,
8 8 # but WITHOUT ANY WARRANTY; without even the implied warranty of
9 9 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 10 # GNU General Public License for more details.
11 11 #
12 12 # You should have received a copy of the GNU General Public License
13 13 # along with this program. If not, see <http://www.gnu.org/licenses/>.
14 14
15 15 """
16 16 Helpers for fixture generation
17 17 """
18 18
19 19 import logging
20 20 import os
21 21 import shutil
22 22 import tarfile
23 23 from os.path import dirname
24 24
25 25 from kallithea.model.db import Repository, User, RepoGroup, UserGroup, Gist
26 26 from kallithea.model.meta import Session
27 27 from kallithea.model.repo import RepoModel
28 28 from kallithea.model.user import UserModel
29 29 from kallithea.model.repo_group import RepoGroupModel
30 30 from kallithea.model.user_group import UserGroupModel
31 31 from kallithea.model.gist import GistModel
32 32 from kallithea.model.scm import ScmModel
33 33 from kallithea.lib.db_manage import DbManage
34 34 from kallithea.lib.vcs.backends.base import EmptyChangeset
35 35 from kallithea.tests.base import invalidate_all_caches, GIT_REPO, HG_REPO, TESTS_TMP_PATH, TEST_USER_ADMIN_LOGIN
36 36
37 37
38 38 log = logging.getLogger(__name__)
39 39
40 40 FIXTURES = os.path.join(dirname(dirname(os.path.abspath(__file__))), 'tests', 'fixtures')
41 41
42 42
43 43 def error_function(*args, **kwargs):
44 44 raise Exception('Total Crash !')
45 45
46 46
47 47 class Fixture(object):
48 48
49 49 def __init__(self):
50 50 pass
51 51
52 52 def anon_access(self, status):
53 53 """
54 54 Context manager for controlling anonymous access.
55 55 Anon access will be set and committed, but restored again when exiting the block.
56 56
57 57 Usage:
58 58
59 59 fixture = Fixture()
60 60 with fixture.anon_access(False):
61 61 stuff
62 62 """
63 63
64 64 class context(object):
65 65 def __enter__(self):
66 66 anon = User.get_default_user()
67 67 self._before = anon.active
68 68 anon.active = status
69 69 Session().commit()
70 70 invalidate_all_caches()
71 71
72 72 def __exit__(self, exc_type, exc_val, exc_tb):
73 73 anon = User.get_default_user()
74 74 anon.active = self._before
75 75 Session().commit()
76 76
77 77 return context()
78 78
79 79 def _get_repo_create_params(self, **custom):
80 80 """Return form values to be validated through RepoForm"""
81 81 defs = dict(
82 82 repo_name=None,
83 83 repo_type='hg',
84 84 clone_uri='',
85 85 repo_group=u'-1',
86 86 repo_description=u'DESC',
87 87 repo_private=False,
88 88 repo_landing_rev='rev:tip',
89 89 repo_copy_permissions=False,
90 90 repo_state=Repository.STATE_CREATED,
91 91 )
92 92 defs.update(custom)
93 93 if 'repo_name_full' not in custom:
94 94 defs.update({'repo_name_full': defs['repo_name']})
95 95
96 96 # fix the repo name if passed as repo_name_full
97 97 if defs['repo_name']:
98 98 defs['repo_name'] = defs['repo_name'].split('/')[-1]
99 99
100 100 return defs
101 101
102 102 def _get_repo_group_create_params(self, **custom):
103 103 """Return form values to be validated through RepoGroupForm"""
104 104 defs = dict(
105 105 group_name=None,
106 106 group_description=u'DESC',
107 107 parent_group_id=u'-1',
108 108 perms_updates=[],
109 109 perms_new=[],
110 110 enable_locking=False,
111 111 recursive=False
112 112 )
113 113 defs.update(custom)
114 114
115 115 return defs
116 116
117 117 def _get_user_create_params(self, name, **custom):
118 118 defs = dict(
119 119 username=name,
120 120 password='qweqwe',
121 121 email='%s+test@example.com' % name,
122 122 firstname=u'TestUser',
123 123 lastname=u'Test',
124 124 active=True,
125 125 admin=False,
126 126 extern_type='internal',
127 127 extern_name=None
128 128 )
129 129 defs.update(custom)
130 130
131 131 return defs
132 132
133 133 def _get_user_group_create_params(self, name, **custom):
134 134 defs = dict(
135 135 users_group_name=name,
136 136 user_group_description=u'DESC',
137 137 users_group_active=True,
138 138 user_group_data={},
139 139 )
140 140 defs.update(custom)
141 141
142 142 return defs
143 143
144 144 def create_repo(self, name, repo_group=None, **kwargs):
145 145 if 'skip_if_exists' in kwargs:
146 146 del kwargs['skip_if_exists']
147 147 r = Repository.get_by_repo_name(name)
148 148 if r:
149 149 return r
150 150
151 151 if isinstance(repo_group, RepoGroup):
152 152 repo_group = repo_group.group_id
153 153
154 154 form_data = self._get_repo_create_params(repo_name=name, **kwargs)
155 155 form_data['repo_group'] = repo_group # patch form dict so it can be used directly by model
156 156 cur_user = kwargs.get('cur_user', TEST_USER_ADMIN_LOGIN)
157 157 RepoModel().create(form_data, cur_user)
158 158 Session().commit()
159 159 ScmModel().mark_for_invalidation(name)
160 160 return Repository.get_by_repo_name(name)
161 161
162 162 def create_fork(self, repo_to_fork, fork_name, **kwargs):
163 163 repo_to_fork = Repository.get_by_repo_name(repo_to_fork)
164 164
165 165 form_data = self._get_repo_create_params(repo_name=fork_name,
166 166 fork_parent_id=repo_to_fork,
167 167 repo_type=repo_to_fork.repo_type,
168 168 **kwargs)
169 169 # patch form dict so it can be used directly by model
170 170 form_data['description'] = form_data['repo_description']
171 171 form_data['private'] = form_data['repo_private']
172 172 form_data['landing_rev'] = form_data['repo_landing_rev']
173 173
174 174 owner = kwargs.get('cur_user', TEST_USER_ADMIN_LOGIN)
175 175 RepoModel().create_fork(form_data, cur_user=owner)
176 176 Session().commit()
177 177 ScmModel().mark_for_invalidation(fork_name)
178 178 r = Repository.get_by_repo_name(fork_name)
179 179 assert r
180 180 return r
181 181
182 182 def destroy_repo(self, repo_name, **kwargs):
183 183 RepoModel().delete(repo_name, **kwargs)
184 184 Session().commit()
185 185
186 186 def create_repo_group(self, name, parent_group_id=None, **kwargs):
187 187 if 'skip_if_exists' in kwargs:
188 188 del kwargs['skip_if_exists']
189 189 gr = RepoGroup.get_by_group_name(group_name=name)
190 190 if gr:
191 191 return gr
192 192 form_data = self._get_repo_group_create_params(group_name=name, **kwargs)
193 193 gr = RepoGroupModel().create(
194 194 group_name=form_data['group_name'],
195 195 group_description=form_data['group_name'],
196 196 parent=parent_group_id,
197 197 owner=kwargs.get('cur_user', TEST_USER_ADMIN_LOGIN),
198 198 )
199 199 Session().commit()
200 200 gr = RepoGroup.get_by_group_name(gr.group_name)
201 201 return gr
202 202
203 203 def destroy_repo_group(self, repogroupid):
204 204 RepoGroupModel().delete(repogroupid)
205 205 Session().commit()
206 206
207 207 def create_user(self, name, **kwargs):
208 208 if 'skip_if_exists' in kwargs:
209 209 del kwargs['skip_if_exists']
210 210 user = User.get_by_username(name)
211 211 if user:
212 212 return user
213 213 form_data = self._get_user_create_params(name, **kwargs)
214 214 user = UserModel().create(form_data)
215 215 Session().commit()
216 216 user = User.get_by_username(user.username)
217 217 return user
218 218
219 219 def destroy_user(self, userid):
220 220 UserModel().delete(userid)
221 221 Session().commit()
222 222
223 223 def create_user_group(self, name, **kwargs):
224 224 if 'skip_if_exists' in kwargs:
225 225 del kwargs['skip_if_exists']
226 226 gr = UserGroup.get_by_group_name(group_name=name)
227 227 if gr:
228 228 return gr
229 229 form_data = self._get_user_group_create_params(name, **kwargs)
230 230 owner = kwargs.get('cur_user', TEST_USER_ADMIN_LOGIN)
231 231 user_group = UserGroupModel().create(
232 232 name=form_data['users_group_name'],
233 233 description=form_data['user_group_description'],
234 234 owner=owner, active=form_data['users_group_active'],
235 235 group_data=form_data['user_group_data'])
236 236 Session().commit()
237 237 user_group = UserGroup.get_by_group_name(user_group.users_group_name)
238 238 return user_group
239 239
240 240 def destroy_user_group(self, usergroupid):
241 241 UserGroupModel().delete(user_group=usergroupid, force=True)
242 242 Session().commit()
243 243
244 244 def create_gist(self, **kwargs):
245 245 form_data = {
246 246 'description': u'new-gist',
247 247 'owner': TEST_USER_ADMIN_LOGIN,
248 248 'gist_type': Gist.GIST_PUBLIC,
249 249 'lifetime': -1,
250 250 'gist_mapping': {'filename1.txt':{'content':'hello world'},}
251 251 }
252 252 form_data.update(kwargs)
253 253 gist = GistModel().create(
254 254 description=form_data['description'],owner=form_data['owner'],
255 255 gist_mapping=form_data['gist_mapping'], gist_type=form_data['gist_type'],
256 256 lifetime=form_data['lifetime']
257 257 )
258 258 Session().commit()
259 259
260 260 return gist
261 261
262 262 def destroy_gists(self, gistid=None):
263 263 for g in Gist.query():
264 264 if gistid:
265 265 if gistid == g.gist_access_id:
266 266 GistModel().delete(g)
267 267 else:
268 268 GistModel().delete(g)
269 269 Session().commit()
270 270
271 271 def load_resource(self, resource_name, strip=True):
272 272 with open(os.path.join(FIXTURES, resource_name), 'rb') as f:
273 273 source = f.read()
274 274 if strip:
275 275 source = source.strip()
276 276
277 277 return source
278 278
279 279 def commit_change(self, repo, filename, content, message, vcs_type,
280 280 parent=None, newfile=False, author=None):
281 281 repo = Repository.get_by_repo_name(repo)
282 282 _cs = parent
283 283 if parent is None:
284 284 _cs = EmptyChangeset(alias=vcs_type)
285 285 if author is None:
286 286 author = TEST_USER_ADMIN_LOGIN
287 287
288 288 if newfile:
289 289 nodes = {
290 290 filename: {
291 291 'content': content
292 292 }
293 293 }
294 294 cs = ScmModel().create_nodes(
295 295 user=TEST_USER_ADMIN_LOGIN, repo=repo,
296 296 message=message,
297 297 nodes=nodes,
298 298 parent_cs=_cs,
299 299 author=author,
300 300 )
301 301 else:
302 302 cs = ScmModel().commit_change(
303 303 repo=repo.scm_instance, repo_name=repo.repo_name,
304 304 cs=parent, user=TEST_USER_ADMIN_LOGIN,
305 305 author=author,
306 306 message=message,
307 307 content=content,
308 308 f_path=filename
309 309 )
310 310 return cs
311 311
312 312
313 313 #==============================================================================
314 314 # Global test environment setup
315 315 #==============================================================================
316 316
317 317 def create_test_env(repos_test_path, config):
318 318 """
319 319 Makes a fresh database and
320 320 install test repository into tmp dir
321 321 """
322 322
323 323 # PART ONE create db
324 324 dbconf = config['sqlalchemy.url']
325 325 log.debug('making test db %s', dbconf)
326 326
327 327 # create test dir if it doesn't exist
328 328 if not os.path.isdir(repos_test_path):
329 329 log.debug('Creating testdir %s', repos_test_path)
330 330 os.makedirs(repos_test_path)
331 331
332 332 dbmanage = DbManage(log_sql=True, dbconf=dbconf, root=config['here'],
333 333 tests=True)
334 334 dbmanage.create_tables(override=True)
335 335 # for tests dynamically set new root paths based on generated content
336 336 dbmanage.create_settings(dbmanage.config_prompt(repos_test_path))
337 337 dbmanage.create_default_user()
338 338 dbmanage.admin_prompt()
339 339 dbmanage.create_permissions()
340 340 dbmanage.populate_default_permissions()
341 341 Session().commit()
342 342 # PART TWO make test repo
343 343 log.debug('making test vcs repositories')
344 344
345 idx_path = config['app_conf']['index_dir']
346 data_path = config['app_conf']['cache_dir']
345 idx_path = config['index_dir']
346 data_path = config['cache_dir']
347 347
348 348 #clean index and data
349 349 if idx_path and os.path.exists(idx_path):
350 350 log.debug('remove %s', idx_path)
351 351 shutil.rmtree(idx_path)
352 352
353 353 if data_path and os.path.exists(data_path):
354 354 log.debug('remove %s', data_path)
355 355 shutil.rmtree(data_path)
356 356
357 357 #CREATE DEFAULT TEST REPOS
358 358 tar = tarfile.open(os.path.join(FIXTURES, 'vcs_test_hg.tar.gz'))
359 359 tar.extractall(os.path.join(TESTS_TMP_PATH, HG_REPO))
360 360 tar.close()
361 361
362 362 tar = tarfile.open(os.path.join(FIXTURES, 'vcs_test_git.tar.gz'))
363 363 tar.extractall(os.path.join(TESTS_TMP_PATH, GIT_REPO))
364 364 tar.close()
365 365
366 366 #LOAD VCS test stuff
367 367 from kallithea.tests.vcs import setup_package
368 368 setup_package()
369 369
370 370
371 371 def create_test_index(repo_location, config, full_index):
372 372 """
373 373 Makes default test index
374 374 """
375 375
376 376 from kallithea.lib.indexers.daemon import WhooshIndexingDaemon
377 377 from kallithea.lib.pidlock import DaemonLock, LockHeld
378 378
379 index_location = os.path.join(config['app_conf']['index_dir'])
379 index_location = os.path.join(config['index_dir'])
380 380 if not os.path.exists(index_location):
381 381 os.makedirs(index_location)
382 382
383 383 try:
384 384 l = DaemonLock(file_=os.path.join(dirname(index_location), 'make_index.lock'))
385 385 WhooshIndexingDaemon(index_location=index_location,
386 386 repo_location=repo_location) \
387 387 .run(full_index=full_index)
388 388 l.release()
389 389 except LockHeld:
390 390 pass
General Comments 0
You need to be logged in to leave comments. Login now