##// END OF EJS Templates
test: move test environment initialization from the main code to tests
Anton Schur -
r6562:6cc40e54 default
parent child Browse files
Show More
@@ -1,215 +1,196 b''
1 # -*- coding: utf-8 -*-
1 # -*- coding: utf-8 -*-
2 # This program is free software: you can redistribute it and/or modify
2 # This program is free software: you can redistribute it and/or modify
3 # it under the terms of the GNU General Public License as published by
3 # it under the terms of the GNU General Public License as published by
4 # the Free Software Foundation, either version 3 of the License, or
4 # the Free Software Foundation, either version 3 of the License, or
5 # (at your option) any later version.
5 # (at your option) any later version.
6 #
6 #
7 # This program is distributed in the hope that it will be useful,
7 # This program is distributed in the hope that it will be useful,
8 # but WITHOUT ANY WARRANTY; without even the implied warranty of
8 # but WITHOUT ANY WARRANTY; without even the implied warranty of
9 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
9 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 # GNU General Public License for more details.
10 # GNU General Public License for more details.
11 #
11 #
12 # You should have received a copy of the GNU General Public License
12 # You should have received a copy of the GNU General Public License
13 # along with this program. If not, see <http://www.gnu.org/licenses/>.
13 # along with this program. If not, see <http://www.gnu.org/licenses/>.
14 """
14 """
15 Global configuration file for TurboGears2 specific settings in Kallithea.
15 Global configuration file for TurboGears2 specific settings in Kallithea.
16
16
17 This file complements the .ini file.
17 This file complements the .ini file.
18 """
18 """
19
19
20 import platform
20 import platform
21 import os, sys, logging
21 import os, sys, logging
22
22
23 import tg
23 import tg
24 from tg import hooks
24 from tg import hooks
25 from tg.configuration import AppConfig
25 from tg.configuration import AppConfig
26 from tg.support.converters import asbool
26 from tg.support.converters import asbool
27 import alembic
27 import alembic
28 from alembic.script.base import ScriptDirectory
28 from alembic.script.base import ScriptDirectory
29 from alembic.migration import MigrationContext
29 from alembic.migration import MigrationContext
30 from sqlalchemy import create_engine
30 from sqlalchemy import create_engine
31
31
32 from kallithea.lib.middleware.https_fixup import HttpsFixup
32 from kallithea.lib.middleware.https_fixup import HttpsFixup
33 from kallithea.lib.middleware.simplegit import SimpleGit
33 from kallithea.lib.middleware.simplegit import SimpleGit
34 from kallithea.lib.middleware.simplehg import SimpleHg
34 from kallithea.lib.middleware.simplehg import SimpleHg
35 from kallithea.lib.auth import set_available_permissions
35 from kallithea.lib.auth import set_available_permissions
36 from kallithea.lib.db_manage import DbManage
36 from kallithea.lib.db_manage import DbManage
37 from kallithea.lib.utils import load_rcextensions, make_ui, set_app_settings, set_vcs_config, \
37 from kallithea.lib.utils import load_rcextensions, make_ui, set_app_settings, set_vcs_config, \
38 set_indexer_config, check_git_version, repo2db_mapper
38 set_indexer_config, check_git_version, repo2db_mapper
39 from kallithea.lib.utils2 import str2bool
39 from kallithea.lib.utils2 import str2bool
40 from kallithea.model.scm import ScmModel
40 from kallithea.model.scm import ScmModel
41
41
42 import formencode
42 import formencode
43 import kallithea
43 import kallithea
44
44
45 log = logging.getLogger(__name__)
45 log = logging.getLogger(__name__)
46
46
47
47
48 class KallitheaAppConfig(AppConfig):
48 class KallitheaAppConfig(AppConfig):
49 # Note: AppConfig has a misleading name, as it's not the application
49 # Note: AppConfig has a misleading name, as it's not the application
50 # configuration, but the application configurator. The AppConfig values are
50 # configuration, but the application configurator. The AppConfig values are
51 # used as a template to create the actual configuration, which might
51 # used as a template to create the actual configuration, which might
52 # overwrite or extend the one provided by the configurator template.
52 # overwrite or extend the one provided by the configurator template.
53
53
54 # To make it clear, AppConfig creates the config and sets into it the same
54 # To make it clear, AppConfig creates the config and sets into it the same
55 # values that AppConfig itself has. Then the values from the config file and
55 # values that AppConfig itself has. Then the values from the config file and
56 # gearbox options are loaded and merged into the configuration. Then an
56 # gearbox options are loaded and merged into the configuration. Then an
57 # after_init_config(conf) method of AppConfig is called for any change that
57 # after_init_config(conf) method of AppConfig is called for any change that
58 # might depend on options provided by configuration files.
58 # might depend on options provided by configuration files.
59
59
60 def __init__(self):
60 def __init__(self):
61 super(KallitheaAppConfig, self).__init__()
61 super(KallitheaAppConfig, self).__init__()
62
62
63 self['package'] = kallithea
63 self['package'] = kallithea
64
64
65 self['prefer_toscawidgets2'] = False
65 self['prefer_toscawidgets2'] = False
66 self['use_toscawidgets'] = False
66 self['use_toscawidgets'] = False
67
67
68 self['renderers'] = []
68 self['renderers'] = []
69
69
70 # Enable json in expose
70 # Enable json in expose
71 self['renderers'].append('json')
71 self['renderers'].append('json')
72
72
73 # Configure template rendering
73 # Configure template rendering
74 self['renderers'].append('mako')
74 self['renderers'].append('mako')
75 self['default_renderer'] = 'mako'
75 self['default_renderer'] = 'mako'
76 self['use_dotted_templatenames'] = False
76 self['use_dotted_templatenames'] = False
77
77
78 # Configure Sessions, store data as JSON to avoid pickle security issues
78 # Configure Sessions, store data as JSON to avoid pickle security issues
79 self['session.enabled'] = True
79 self['session.enabled'] = True
80 self['session.data_serializer'] = 'json'
80 self['session.data_serializer'] = 'json'
81
81
82 # Configure the base SQLALchemy Setup
82 # Configure the base SQLALchemy Setup
83 self['use_sqlalchemy'] = True
83 self['use_sqlalchemy'] = True
84 self['model'] = kallithea.model.base
84 self['model'] = kallithea.model.base
85 self['DBSession'] = kallithea.model.meta.Session
85 self['DBSession'] = kallithea.model.meta.Session
86
86
87 # Configure App without an authentication backend.
87 # Configure App without an authentication backend.
88 self['auth_backend'] = None
88 self['auth_backend'] = None
89
89
90 # Use custom error page for these errors. By default, Turbogears2 does not add
90 # Use custom error page for these errors. By default, Turbogears2 does not add
91 # 400 in this list.
91 # 400 in this list.
92 # Explicitly listing all is considered more robust than appending to defaults,
92 # Explicitly listing all is considered more robust than appending to defaults,
93 # in light of possible future framework changes.
93 # in light of possible future framework changes.
94 self['errorpage.status_codes'] = [400, 401, 403, 404]
94 self['errorpage.status_codes'] = [400, 401, 403, 404]
95
95
96 # Disable transaction manager -- currently Kallithea takes care of transactions itself
96 # Disable transaction manager -- currently Kallithea takes care of transactions itself
97 self['tm.enabled'] = False
97 self['tm.enabled'] = False
98
98
99 base_config = KallitheaAppConfig()
99 base_config = KallitheaAppConfig()
100
100
101 # TODO still needed as long as we use pylonslib
101 # TODO still needed as long as we use pylonslib
102 sys.modules['pylons'] = tg
102 sys.modules['pylons'] = tg
103
103
104 # DebugBar, a debug toolbar for TurboGears2.
104 # DebugBar, a debug toolbar for TurboGears2.
105 # (https://github.com/TurboGears/tgext.debugbar)
105 # (https://github.com/TurboGears/tgext.debugbar)
106 # To enable it, install 'tgext.debugbar' and 'kajiki', and run Kallithea with
106 # To enable it, install 'tgext.debugbar' and 'kajiki', and run Kallithea with
107 # 'debug = true' (not in production!)
107 # 'debug = true' (not in production!)
108 # See the Kallithea documentation for more information.
108 # See the Kallithea documentation for more information.
109 try:
109 try:
110 from tgext.debugbar import enable_debugbar
110 from tgext.debugbar import enable_debugbar
111 import kajiki # only to check its existence
111 import kajiki # only to check its existence
112 except ImportError:
112 except ImportError:
113 pass
113 pass
114 else:
114 else:
115 base_config['renderers'].append('kajiki')
115 base_config['renderers'].append('kajiki')
116 enable_debugbar(base_config)
116 enable_debugbar(base_config)
117
117
118
118
119 def setup_configuration(app):
119 def setup_configuration(app):
120 config = app.config
120 config = app.config
121
121
122 if config.get('ignore_alembic_revision', False):
122 if config.get('ignore_alembic_revision', False):
123 log.warn('database alembic revision checking is disabled')
123 log.warn('database alembic revision checking is disabled')
124 else:
124 else:
125 dbconf = config['sqlalchemy.url']
125 dbconf = config['sqlalchemy.url']
126 alembic_cfg = alembic.config.Config()
126 alembic_cfg = alembic.config.Config()
127 alembic_cfg.set_main_option('script_location', 'kallithea:alembic')
127 alembic_cfg.set_main_option('script_location', 'kallithea:alembic')
128 alembic_cfg.set_main_option('sqlalchemy.url', dbconf)
128 alembic_cfg.set_main_option('sqlalchemy.url', dbconf)
129 script_dir = ScriptDirectory.from_config(alembic_cfg)
129 script_dir = ScriptDirectory.from_config(alembic_cfg)
130 available_heads = sorted(script_dir.get_heads())
130 available_heads = sorted(script_dir.get_heads())
131
131
132 engine = create_engine(dbconf)
132 engine = create_engine(dbconf)
133 with engine.connect() as conn:
133 with engine.connect() as conn:
134 context = MigrationContext.configure(conn)
134 context = MigrationContext.configure(conn)
135 current_heads = sorted(str(s) for s in context.get_current_heads())
135 current_heads = sorted(str(s) for s in context.get_current_heads())
136 if current_heads != available_heads:
136 if current_heads != available_heads:
137 log.error('Failed to run Kallithea:\n\n'
137 log.error('Failed to run Kallithea:\n\n'
138 'The database version does not match the Kallithea version.\n'
138 'The database version does not match the Kallithea version.\n'
139 'Please read the documentation on how to upgrade or downgrade the database.\n'
139 'Please read the documentation on how to upgrade or downgrade the database.\n'
140 'Current database version id(s): %s\n'
140 'Current database version id(s): %s\n'
141 'Expected database version id(s): %s\n'
141 'Expected database version id(s): %s\n'
142 'If you are a developer and you know what you are doing, you can add `ignore_alembic_revision = True` '
142 'If you are a developer and you know what you are doing, you can add `ignore_alembic_revision = True` '
143 'to your .ini file to skip the check.\n' % (' '.join(current_heads), ' '.join(available_heads)))
143 'to your .ini file to skip the check.\n' % (' '.join(current_heads), ' '.join(available_heads)))
144 sys.exit(1)
144 sys.exit(1)
145
145
146 # store some globals into kallithea
146 # store some globals into kallithea
147 kallithea.CELERY_ON = str2bool(config['app_conf'].get('use_celery'))
147 kallithea.CELERY_ON = str2bool(config['app_conf'].get('use_celery'))
148 kallithea.CELERY_EAGER = str2bool(config['app_conf'].get('celery.always.eager'))
148 kallithea.CELERY_EAGER = str2bool(config['app_conf'].get('celery.always.eager'))
149 kallithea.CONFIG = config
149 kallithea.CONFIG = config
150
150
151 load_rcextensions(root_path=config['here'])
151 load_rcextensions(root_path=config['here'])
152
152
153 # FIXME move test setup code out of here
154 test = os.path.split(config['__file__'])[-1] == 'test.ini'
155 if test:
156 test_env = not int(os.environ.get('KALLITHEA_NO_TMP_PATH', 0))
157 test_index = not int(os.environ.get('KALLITHEA_WHOOSH_TEST_DISABLE', 0))
158 if os.environ.get('TEST_DB'):
159 # swap config if we pass environment variable
160 config['sqlalchemy.url'] = os.environ.get('TEST_DB')
161
162 from kallithea.tests.fixture import create_test_env, create_test_index
163 from kallithea.tests.base import TESTS_TMP_PATH
164 #set KALLITHEA_NO_TMP_PATH=1 to disable re-creating the database and
165 #test repos
166 if test_env:
167 create_test_env(TESTS_TMP_PATH, config)
168 #set KALLITHEA_WHOOSH_TEST_DISABLE=1 to disable whoosh index during tests
169 if test_index:
170 create_test_index(TESTS_TMP_PATH, config, True)
171
172 set_available_permissions(config)
153 set_available_permissions(config)
173 repos_path = make_ui('db').configitems('paths')[0][1]
154 repos_path = make_ui('db').configitems('paths')[0][1]
174 config['base_path'] = repos_path
155 config['base_path'] = repos_path
175 set_app_settings(config)
156 set_app_settings(config)
176
157
177 instance_id = kallithea.CONFIG.get('instance_id', '*')
158 instance_id = kallithea.CONFIG.get('instance_id', '*')
178 if instance_id == '*':
159 if instance_id == '*':
179 instance_id = '%s-%s' % (platform.uname()[1], os.getpid())
160 instance_id = '%s-%s' % (platform.uname()[1], os.getpid())
180 kallithea.CONFIG['instance_id'] = instance_id
161 kallithea.CONFIG['instance_id'] = instance_id
181
162
182 # update kallithea.CONFIG with the meanwhile changed 'config'
163 # update kallithea.CONFIG with the meanwhile changed 'config'
183 kallithea.CONFIG.update(config)
164 kallithea.CONFIG.update(config)
184
165
185 # configure vcs and indexer libraries (they are supposed to be independent
166 # configure vcs and indexer libraries (they are supposed to be independent
186 # as much as possible and thus avoid importing tg.config or
167 # as much as possible and thus avoid importing tg.config or
187 # kallithea.CONFIG).
168 # kallithea.CONFIG).
188 set_vcs_config(kallithea.CONFIG)
169 set_vcs_config(kallithea.CONFIG)
189 set_indexer_config(kallithea.CONFIG)
170 set_indexer_config(kallithea.CONFIG)
190
171
191 check_git_version()
172 check_git_version()
192
173
193 if str2bool(config.get('initial_repo_scan', True)):
174 if str2bool(config.get('initial_repo_scan', True)):
194 repo2db_mapper(ScmModel().repo_scan(repos_path),
175 repo2db_mapper(ScmModel().repo_scan(repos_path),
195 remove_obsolete=False, install_git_hooks=False)
176 remove_obsolete=False, install_git_hooks=False)
196
177
197 formencode.api.set_stdtranslation(languages=[config.get('lang')])
178 formencode.api.set_stdtranslation(languages=[config.get('lang')])
198
179
199 hooks.register('configure_new_app', setup_configuration)
180 hooks.register('configure_new_app', setup_configuration)
200
181
201
182
202 def setup_application(app):
183 def setup_application(app):
203 config = app.config
184 config = app.config
204
185
205 # we want our low level middleware to get to the request ASAP. We don't
186 # we want our low level middleware to get to the request ASAP. We don't
206 # need any stack middleware in them - especially no StatusCodeRedirect buffering
187 # need any stack middleware in them - especially no StatusCodeRedirect buffering
207 app = SimpleHg(app, config)
188 app = SimpleHg(app, config)
208 app = SimpleGit(app, config)
189 app = SimpleGit(app, config)
209
190
210 # Enable https redirects based on HTTP_X_URL_SCHEME set by proxy
191 # Enable https redirects based on HTTP_X_URL_SCHEME set by proxy
211 if any(asbool(config.get(x)) for x in ['https_fixup', 'force_https', 'use_htsts']):
192 if any(asbool(config.get(x)) for x in ['https_fixup', 'force_https', 'use_htsts']):
212 app = HttpsFixup(app, config)
193 app = HttpsFixup(app, config)
213 return app
194 return app
214
195
215 hooks.register('before_config', setup_application)
196 hooks.register('before_config', setup_application)
@@ -1,123 +1,141 b''
1 import os
1 import os
2 import sys
2 import sys
3 import logging
3 import logging
4 import pkg_resources
4 import pkg_resources
5
5
6 from paste.deploy import loadapp
6 from paste.deploy import loadwsgi
7 from routes.util import URLGenerator
7 from routes.util import URLGenerator
8 from tg import config
9
8
10 import pytest
9 import pytest
11 from kallithea.controllers.root import RootController
10 from kallithea.controllers.root import RootController
12 from kallithea.model.user import UserModel
11 from kallithea.model.user import UserModel
13 from kallithea.model.meta import Session
12 from kallithea.model.meta import Session
14 from kallithea.model.db import Setting, User, UserIpMap
13 from kallithea.model.db import Setting, User, UserIpMap
15 from kallithea.tests.base import invalidate_all_caches, TEST_USER_REGULAR_LOGIN
14 from kallithea.tests.base import invalidate_all_caches, TEST_USER_REGULAR_LOGIN
16 import kallithea.tests.base # FIXME: needed for setting testapp instance!!!
15 import kallithea.tests.base # FIXME: needed for setting testapp instance!!!
17
16
18 from tg.util.webtest import test_context
17 from tg.util.webtest import test_context
19
18
20 def pytest_configure():
19 def pytest_configure():
21 path = os.getcwd()
20 path = os.getcwd()
22 sys.path.insert(0, path)
21 sys.path.insert(0, path)
23 pkg_resources.working_set.add_entry(path)
22 pkg_resources.working_set.add_entry(path)
24
23
25 # Disable INFO logging of test database creation, restore with NOTSET
24 # Disable INFO logging of test database creation, restore with NOTSET
26 logging.disable(logging.INFO)
25 logging.disable(logging.INFO)
27 kallithea.tests.base.testapp = loadapp('config:kallithea/tests/test.ini', relative_to=path)
26
27 context = loadwsgi.loadcontext(loadwsgi.APP, 'config:kallithea/tests/test.ini', relative_to=path)
28
29 test_env = not int(os.environ.get('KALLITHEA_NO_TMP_PATH', 0))
30 test_index = not int(os.environ.get('KALLITHEA_WHOOSH_TEST_DISABLE', 0))
31 if os.environ.get('TEST_DB'):
32 # swap config if we pass environment variable
33 context.local_conf['sqlalchemy.url'] = os.environ.get('TEST_DB')
34
35 from kallithea.tests.fixture import create_test_env, create_test_index
36 from kallithea.tests.base import TESTS_TMP_PATH
37 # set KALLITHEA_NO_TMP_PATH=1 to disable re-creating the database and
38 # test repos
39 if test_env:
40 create_test_env(TESTS_TMP_PATH, context.config())
41 # set KALLITHEA_WHOOSH_TEST_DISABLE=1 to disable whoosh index during tests
42 if test_index:
43 create_test_index(TESTS_TMP_PATH, context.config(), True)
44
45 kallithea.tests.base.testapp = context.create()
28 logging.disable(logging.NOTSET)
46 logging.disable(logging.NOTSET)
29
47
30 kallithea.tests.base.url = URLGenerator(RootController().mapper, kallithea.tests.base.environ)
48 kallithea.tests.base.url = URLGenerator(RootController().mapper, kallithea.tests.base.environ)
31
49
32
50
33 @pytest.fixture
51 @pytest.fixture
34 def create_test_user():
52 def create_test_user():
35 """Provide users that automatically disappear after test is over."""
53 """Provide users that automatically disappear after test is over."""
36 test_user_ids = []
54 test_user_ids = []
37 def _create_test_user(user_form):
55 def _create_test_user(user_form):
38 user = UserModel().create(user_form)
56 user = UserModel().create(user_form)
39 test_user_ids.append(user.user_id)
57 test_user_ids.append(user.user_id)
40 return user
58 return user
41 yield _create_test_user
59 yield _create_test_user
42 for user_id in test_user_ids:
60 for user_id in test_user_ids:
43 UserModel().delete(user_id)
61 UserModel().delete(user_id)
44 Session().commit()
62 Session().commit()
45
63
46
64
47 def _set_settings(*kvtseq):
65 def _set_settings(*kvtseq):
48 session = Session()
66 session = Session()
49 for kvt in kvtseq:
67 for kvt in kvtseq:
50 assert len(kvt) in (2, 3)
68 assert len(kvt) in (2, 3)
51 k = kvt[0]
69 k = kvt[0]
52 v = kvt[1]
70 v = kvt[1]
53 t = kvt[2] if len(kvt) == 3 else 'unicode'
71 t = kvt[2] if len(kvt) == 3 else 'unicode'
54 Setting.create_or_update(k, v, t)
72 Setting.create_or_update(k, v, t)
55 session.commit()
73 session.commit()
56
74
57
75
58 @pytest.fixture
76 @pytest.fixture
59 def set_test_settings():
77 def set_test_settings():
60 """Restore settings after test is over."""
78 """Restore settings after test is over."""
61 # Save settings.
79 # Save settings.
62 settings_snapshot = [
80 settings_snapshot = [
63 (s.app_settings_name, s.app_settings_value, s.app_settings_type)
81 (s.app_settings_name, s.app_settings_value, s.app_settings_type)
64 for s in Setting.query().all()]
82 for s in Setting.query().all()]
65 yield _set_settings
83 yield _set_settings
66 # Restore settings.
84 # Restore settings.
67 session = Session()
85 session = Session()
68 keys = frozenset(k for (k, v, t) in settings_snapshot)
86 keys = frozenset(k for (k, v, t) in settings_snapshot)
69 for s in Setting.query().all():
87 for s in Setting.query().all():
70 if s.app_settings_name not in keys:
88 if s.app_settings_name not in keys:
71 session.delete(s)
89 session.delete(s)
72 for k, v, t in settings_snapshot:
90 for k, v, t in settings_snapshot:
73 if t == 'list' and hasattr(v, '__iter__'):
91 if t == 'list' and hasattr(v, '__iter__'):
74 v = ','.join(v) # Quirk: must format list value manually.
92 v = ','.join(v) # Quirk: must format list value manually.
75 Setting.create_or_update(k, v, t)
93 Setting.create_or_update(k, v, t)
76 session.commit()
94 session.commit()
77
95
78 @pytest.fixture
96 @pytest.fixture
79 def auto_clear_ip_permissions():
97 def auto_clear_ip_permissions():
80 """Fixture that provides nothing but clearing IP permissions upon test
98 """Fixture that provides nothing but clearing IP permissions upon test
81 exit. This clearing is needed to avoid other test failing to make fake http
99 exit. This clearing is needed to avoid other test failing to make fake http
82 accesses."""
100 accesses."""
83 yield
101 yield
84 # cleanup
102 # cleanup
85 user_model = UserModel()
103 user_model = UserModel()
86
104
87 user_ids = []
105 user_ids = []
88 user_ids.append(User.get_default_user().user_id)
106 user_ids.append(User.get_default_user().user_id)
89 user_ids.append(User.get_by_username(TEST_USER_REGULAR_LOGIN).user_id)
107 user_ids.append(User.get_by_username(TEST_USER_REGULAR_LOGIN).user_id)
90
108
91 for user_id in user_ids:
109 for user_id in user_ids:
92 for ip in UserIpMap.query().filter(UserIpMap.user_id == user_id):
110 for ip in UserIpMap.query().filter(UserIpMap.user_id == user_id):
93 user_model.delete_extra_ip(user_id, ip.ip_id)
111 user_model.delete_extra_ip(user_id, ip.ip_id)
94
112
95 # IP permissions are cached, need to invalidate this cache explicitly
113 # IP permissions are cached, need to invalidate this cache explicitly
96 invalidate_all_caches()
114 invalidate_all_caches()
97
115
98 @pytest.fixture
116 @pytest.fixture
99 def test_context_fixture(app_fixture):
117 def test_context_fixture(app_fixture):
100 """
118 """
101 Encompass the entire test using this fixture in a test_context,
119 Encompass the entire test using this fixture in a test_context,
102 making sure that certain functionality still works even if no call to
120 making sure that certain functionality still works even if no call to
103 self.app.get/post has been made.
121 self.app.get/post has been made.
104 The typical error message indicating you need a test_context is:
122 The typical error message indicating you need a test_context is:
105 TypeError: No object (name: context) has been registered for this thread
123 TypeError: No object (name: context) has been registered for this thread
106
124
107 The standard way to fix this is simply using the test_context context
125 The standard way to fix this is simply using the test_context context
108 manager directly inside your test:
126 manager directly inside your test:
109 with test_context(self.app):
127 with test_context(self.app):
110 <actions>
128 <actions>
111 but if test setup code (xUnit-style or pytest fixtures) also needs to be
129 but if test setup code (xUnit-style or pytest fixtures) also needs to be
112 executed inside the test context, that method is not possible.
130 executed inside the test context, that method is not possible.
113 Even if there is no such setup code, the fixture may reduce code complexity
131 Even if there is no such setup code, the fixture may reduce code complexity
114 if the entire test needs to run inside a test context.
132 if the entire test needs to run inside a test context.
115
133
116 To apply this fixture (like any other fixture) to all test methods of a
134 To apply this fixture (like any other fixture) to all test methods of a
117 class, use the following class decorator:
135 class, use the following class decorator:
118 @pytest.mark.usefixtures("test_context_fixture")
136 @pytest.mark.usefixtures("test_context_fixture")
119 class TestFoo(TestController):
137 class TestFoo(TestController):
120 ...
138 ...
121 """
139 """
122 with test_context(app_fixture):
140 with test_context(app_fixture):
123 yield
141 yield
@@ -1,390 +1,390 b''
1 # -*- coding: utf-8 -*-
1 # -*- coding: utf-8 -*-
2 # This program is free software: you can redistribute it and/or modify
2 # This program is free software: you can redistribute it and/or modify
3 # it under the terms of the GNU General Public License as published by
3 # it under the terms of the GNU General Public License as published by
4 # the Free Software Foundation, either version 3 of the License, or
4 # the Free Software Foundation, either version 3 of the License, or
5 # (at your option) any later version.
5 # (at your option) any later version.
6 #
6 #
7 # This program is distributed in the hope that it will be useful,
7 # This program is distributed in the hope that it will be useful,
8 # but WITHOUT ANY WARRANTY; without even the implied warranty of
8 # but WITHOUT ANY WARRANTY; without even the implied warranty of
9 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
9 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 # GNU General Public License for more details.
10 # GNU General Public License for more details.
11 #
11 #
12 # You should have received a copy of the GNU General Public License
12 # You should have received a copy of the GNU General Public License
13 # along with this program. If not, see <http://www.gnu.org/licenses/>.
13 # along with this program. If not, see <http://www.gnu.org/licenses/>.
14
14
15 """
15 """
16 Helpers for fixture generation
16 Helpers for fixture generation
17 """
17 """
18
18
19 import logging
19 import logging
20 import os
20 import os
21 import shutil
21 import shutil
22 import tarfile
22 import tarfile
23 from os.path import dirname
23 from os.path import dirname
24
24
25 from kallithea.model.db import Repository, User, RepoGroup, UserGroup, Gist
25 from kallithea.model.db import Repository, User, RepoGroup, UserGroup, Gist
26 from kallithea.model.meta import Session
26 from kallithea.model.meta import Session
27 from kallithea.model.repo import RepoModel
27 from kallithea.model.repo import RepoModel
28 from kallithea.model.user import UserModel
28 from kallithea.model.user import UserModel
29 from kallithea.model.repo_group import RepoGroupModel
29 from kallithea.model.repo_group import RepoGroupModel
30 from kallithea.model.user_group import UserGroupModel
30 from kallithea.model.user_group import UserGroupModel
31 from kallithea.model.gist import GistModel
31 from kallithea.model.gist import GistModel
32 from kallithea.model.scm import ScmModel
32 from kallithea.model.scm import ScmModel
33 from kallithea.lib.db_manage import DbManage
33 from kallithea.lib.db_manage import DbManage
34 from kallithea.lib.vcs.backends.base import EmptyChangeset
34 from kallithea.lib.vcs.backends.base import EmptyChangeset
35 from kallithea.tests.base import invalidate_all_caches, GIT_REPO, HG_REPO, TESTS_TMP_PATH, TEST_USER_ADMIN_LOGIN
35 from kallithea.tests.base import invalidate_all_caches, GIT_REPO, HG_REPO, TESTS_TMP_PATH, TEST_USER_ADMIN_LOGIN
36
36
37
37
38 log = logging.getLogger(__name__)
38 log = logging.getLogger(__name__)
39
39
40 FIXTURES = os.path.join(dirname(dirname(os.path.abspath(__file__))), 'tests', 'fixtures')
40 FIXTURES = os.path.join(dirname(dirname(os.path.abspath(__file__))), 'tests', 'fixtures')
41
41
42
42
43 def error_function(*args, **kwargs):
43 def error_function(*args, **kwargs):
44 raise Exception('Total Crash !')
44 raise Exception('Total Crash !')
45
45
46
46
47 class Fixture(object):
47 class Fixture(object):
48
48
49 def __init__(self):
49 def __init__(self):
50 pass
50 pass
51
51
52 def anon_access(self, status):
52 def anon_access(self, status):
53 """
53 """
54 Context manager for controlling anonymous access.
54 Context manager for controlling anonymous access.
55 Anon access will be set and committed, but restored again when exiting the block.
55 Anon access will be set and committed, but restored again when exiting the block.
56
56
57 Usage:
57 Usage:
58
58
59 fixture = Fixture()
59 fixture = Fixture()
60 with fixture.anon_access(False):
60 with fixture.anon_access(False):
61 stuff
61 stuff
62 """
62 """
63
63
64 class context(object):
64 class context(object):
65 def __enter__(self):
65 def __enter__(self):
66 anon = User.get_default_user()
66 anon = User.get_default_user()
67 self._before = anon.active
67 self._before = anon.active
68 anon.active = status
68 anon.active = status
69 Session().commit()
69 Session().commit()
70 invalidate_all_caches()
70 invalidate_all_caches()
71
71
72 def __exit__(self, exc_type, exc_val, exc_tb):
72 def __exit__(self, exc_type, exc_val, exc_tb):
73 anon = User.get_default_user()
73 anon = User.get_default_user()
74 anon.active = self._before
74 anon.active = self._before
75 Session().commit()
75 Session().commit()
76
76
77 return context()
77 return context()
78
78
79 def _get_repo_create_params(self, **custom):
79 def _get_repo_create_params(self, **custom):
80 """Return form values to be validated through RepoForm"""
80 """Return form values to be validated through RepoForm"""
81 defs = dict(
81 defs = dict(
82 repo_name=None,
82 repo_name=None,
83 repo_type='hg',
83 repo_type='hg',
84 clone_uri='',
84 clone_uri='',
85 repo_group=u'-1',
85 repo_group=u'-1',
86 repo_description=u'DESC',
86 repo_description=u'DESC',
87 repo_private=False,
87 repo_private=False,
88 repo_landing_rev='rev:tip',
88 repo_landing_rev='rev:tip',
89 repo_copy_permissions=False,
89 repo_copy_permissions=False,
90 repo_state=Repository.STATE_CREATED,
90 repo_state=Repository.STATE_CREATED,
91 )
91 )
92 defs.update(custom)
92 defs.update(custom)
93 if 'repo_name_full' not in custom:
93 if 'repo_name_full' not in custom:
94 defs.update({'repo_name_full': defs['repo_name']})
94 defs.update({'repo_name_full': defs['repo_name']})
95
95
96 # fix the repo name if passed as repo_name_full
96 # fix the repo name if passed as repo_name_full
97 if defs['repo_name']:
97 if defs['repo_name']:
98 defs['repo_name'] = defs['repo_name'].split('/')[-1]
98 defs['repo_name'] = defs['repo_name'].split('/')[-1]
99
99
100 return defs
100 return defs
101
101
102 def _get_repo_group_create_params(self, **custom):
102 def _get_repo_group_create_params(self, **custom):
103 """Return form values to be validated through RepoGroupForm"""
103 """Return form values to be validated through RepoGroupForm"""
104 defs = dict(
104 defs = dict(
105 group_name=None,
105 group_name=None,
106 group_description=u'DESC',
106 group_description=u'DESC',
107 parent_group_id=u'-1',
107 parent_group_id=u'-1',
108 perms_updates=[],
108 perms_updates=[],
109 perms_new=[],
109 perms_new=[],
110 enable_locking=False,
110 enable_locking=False,
111 recursive=False
111 recursive=False
112 )
112 )
113 defs.update(custom)
113 defs.update(custom)
114
114
115 return defs
115 return defs
116
116
117 def _get_user_create_params(self, name, **custom):
117 def _get_user_create_params(self, name, **custom):
118 defs = dict(
118 defs = dict(
119 username=name,
119 username=name,
120 password='qweqwe',
120 password='qweqwe',
121 email='%s+test@example.com' % name,
121 email='%s+test@example.com' % name,
122 firstname=u'TestUser',
122 firstname=u'TestUser',
123 lastname=u'Test',
123 lastname=u'Test',
124 active=True,
124 active=True,
125 admin=False,
125 admin=False,
126 extern_type='internal',
126 extern_type='internal',
127 extern_name=None
127 extern_name=None
128 )
128 )
129 defs.update(custom)
129 defs.update(custom)
130
130
131 return defs
131 return defs
132
132
133 def _get_user_group_create_params(self, name, **custom):
133 def _get_user_group_create_params(self, name, **custom):
134 defs = dict(
134 defs = dict(
135 users_group_name=name,
135 users_group_name=name,
136 user_group_description=u'DESC',
136 user_group_description=u'DESC',
137 users_group_active=True,
137 users_group_active=True,
138 user_group_data={},
138 user_group_data={},
139 )
139 )
140 defs.update(custom)
140 defs.update(custom)
141
141
142 return defs
142 return defs
143
143
144 def create_repo(self, name, repo_group=None, **kwargs):
144 def create_repo(self, name, repo_group=None, **kwargs):
145 if 'skip_if_exists' in kwargs:
145 if 'skip_if_exists' in kwargs:
146 del kwargs['skip_if_exists']
146 del kwargs['skip_if_exists']
147 r = Repository.get_by_repo_name(name)
147 r = Repository.get_by_repo_name(name)
148 if r:
148 if r:
149 return r
149 return r
150
150
151 if isinstance(repo_group, RepoGroup):
151 if isinstance(repo_group, RepoGroup):
152 repo_group = repo_group.group_id
152 repo_group = repo_group.group_id
153
153
154 form_data = self._get_repo_create_params(repo_name=name, **kwargs)
154 form_data = self._get_repo_create_params(repo_name=name, **kwargs)
155 form_data['repo_group'] = repo_group # patch form dict so it can be used directly by model
155 form_data['repo_group'] = repo_group # patch form dict so it can be used directly by model
156 cur_user = kwargs.get('cur_user', TEST_USER_ADMIN_LOGIN)
156 cur_user = kwargs.get('cur_user', TEST_USER_ADMIN_LOGIN)
157 RepoModel().create(form_data, cur_user)
157 RepoModel().create(form_data, cur_user)
158 Session().commit()
158 Session().commit()
159 ScmModel().mark_for_invalidation(name)
159 ScmModel().mark_for_invalidation(name)
160 return Repository.get_by_repo_name(name)
160 return Repository.get_by_repo_name(name)
161
161
162 def create_fork(self, repo_to_fork, fork_name, **kwargs):
162 def create_fork(self, repo_to_fork, fork_name, **kwargs):
163 repo_to_fork = Repository.get_by_repo_name(repo_to_fork)
163 repo_to_fork = Repository.get_by_repo_name(repo_to_fork)
164
164
165 form_data = self._get_repo_create_params(repo_name=fork_name,
165 form_data = self._get_repo_create_params(repo_name=fork_name,
166 fork_parent_id=repo_to_fork,
166 fork_parent_id=repo_to_fork,
167 repo_type=repo_to_fork.repo_type,
167 repo_type=repo_to_fork.repo_type,
168 **kwargs)
168 **kwargs)
169 # patch form dict so it can be used directly by model
169 # patch form dict so it can be used directly by model
170 form_data['description'] = form_data['repo_description']
170 form_data['description'] = form_data['repo_description']
171 form_data['private'] = form_data['repo_private']
171 form_data['private'] = form_data['repo_private']
172 form_data['landing_rev'] = form_data['repo_landing_rev']
172 form_data['landing_rev'] = form_data['repo_landing_rev']
173
173
174 owner = kwargs.get('cur_user', TEST_USER_ADMIN_LOGIN)
174 owner = kwargs.get('cur_user', TEST_USER_ADMIN_LOGIN)
175 RepoModel().create_fork(form_data, cur_user=owner)
175 RepoModel().create_fork(form_data, cur_user=owner)
176 Session().commit()
176 Session().commit()
177 ScmModel().mark_for_invalidation(fork_name)
177 ScmModel().mark_for_invalidation(fork_name)
178 r = Repository.get_by_repo_name(fork_name)
178 r = Repository.get_by_repo_name(fork_name)
179 assert r
179 assert r
180 return r
180 return r
181
181
    def destroy_repo(self, repo_name, **kwargs):
        """Delete repository *repo_name*; extra kwargs go to RepoModel.delete."""
        RepoModel().delete(repo_name, **kwargs)
        Session().commit()
185
185
186 def create_repo_group(self, name, parent_group_id=None, **kwargs):
186 def create_repo_group(self, name, parent_group_id=None, **kwargs):
187 if 'skip_if_exists' in kwargs:
187 if 'skip_if_exists' in kwargs:
188 del kwargs['skip_if_exists']
188 del kwargs['skip_if_exists']
189 gr = RepoGroup.get_by_group_name(group_name=name)
189 gr = RepoGroup.get_by_group_name(group_name=name)
190 if gr:
190 if gr:
191 return gr
191 return gr
192 form_data = self._get_repo_group_create_params(group_name=name, **kwargs)
192 form_data = self._get_repo_group_create_params(group_name=name, **kwargs)
193 gr = RepoGroupModel().create(
193 gr = RepoGroupModel().create(
194 group_name=form_data['group_name'],
194 group_name=form_data['group_name'],
195 group_description=form_data['group_name'],
195 group_description=form_data['group_name'],
196 parent=parent_group_id,
196 parent=parent_group_id,
197 owner=kwargs.get('cur_user', TEST_USER_ADMIN_LOGIN),
197 owner=kwargs.get('cur_user', TEST_USER_ADMIN_LOGIN),
198 )
198 )
199 Session().commit()
199 Session().commit()
200 gr = RepoGroup.get_by_group_name(gr.group_name)
200 gr = RepoGroup.get_by_group_name(gr.group_name)
201 return gr
201 return gr
202
202
    def destroy_repo_group(self, repogroupid):
        """Delete the repository group identified by *repogroupid*."""
        RepoGroupModel().delete(repogroupid)
        Session().commit()
206
206
207 def create_user(self, name, **kwargs):
207 def create_user(self, name, **kwargs):
208 if 'skip_if_exists' in kwargs:
208 if 'skip_if_exists' in kwargs:
209 del kwargs['skip_if_exists']
209 del kwargs['skip_if_exists']
210 user = User.get_by_username(name)
210 user = User.get_by_username(name)
211 if user:
211 if user:
212 return user
212 return user
213 form_data = self._get_user_create_params(name, **kwargs)
213 form_data = self._get_user_create_params(name, **kwargs)
214 user = UserModel().create(form_data)
214 user = UserModel().create(form_data)
215 Session().commit()
215 Session().commit()
216 user = User.get_by_username(user.username)
216 user = User.get_by_username(user.username)
217 return user
217 return user
218
218
    def destroy_user(self, userid):
        """Delete the user identified by *userid*."""
        UserModel().delete(userid)
        Session().commit()
222
222
223 def create_user_group(self, name, **kwargs):
223 def create_user_group(self, name, **kwargs):
224 if 'skip_if_exists' in kwargs:
224 if 'skip_if_exists' in kwargs:
225 del kwargs['skip_if_exists']
225 del kwargs['skip_if_exists']
226 gr = UserGroup.get_by_group_name(group_name=name)
226 gr = UserGroup.get_by_group_name(group_name=name)
227 if gr:
227 if gr:
228 return gr
228 return gr
229 form_data = self._get_user_group_create_params(name, **kwargs)
229 form_data = self._get_user_group_create_params(name, **kwargs)
230 owner = kwargs.get('cur_user', TEST_USER_ADMIN_LOGIN)
230 owner = kwargs.get('cur_user', TEST_USER_ADMIN_LOGIN)
231 user_group = UserGroupModel().create(
231 user_group = UserGroupModel().create(
232 name=form_data['users_group_name'],
232 name=form_data['users_group_name'],
233 description=form_data['user_group_description'],
233 description=form_data['user_group_description'],
234 owner=owner, active=form_data['users_group_active'],
234 owner=owner, active=form_data['users_group_active'],
235 group_data=form_data['user_group_data'])
235 group_data=form_data['user_group_data'])
236 Session().commit()
236 Session().commit()
237 user_group = UserGroup.get_by_group_name(user_group.users_group_name)
237 user_group = UserGroup.get_by_group_name(user_group.users_group_name)
238 return user_group
238 return user_group
239
239
    def destroy_user_group(self, usergroupid):
        """Force-delete the user group identified by *usergroupid*."""
        UserGroupModel().delete(user_group=usergroupid, force=True)
        Session().commit()
243
243
244 def create_gist(self, **kwargs):
244 def create_gist(self, **kwargs):
245 form_data = {
245 form_data = {
246 'description': u'new-gist',
246 'description': u'new-gist',
247 'owner': TEST_USER_ADMIN_LOGIN,
247 'owner': TEST_USER_ADMIN_LOGIN,
248 'gist_type': Gist.GIST_PUBLIC,
248 'gist_type': Gist.GIST_PUBLIC,
249 'lifetime': -1,
249 'lifetime': -1,
250 'gist_mapping': {'filename1.txt':{'content':'hello world'},}
250 'gist_mapping': {'filename1.txt':{'content':'hello world'},}
251 }
251 }
252 form_data.update(kwargs)
252 form_data.update(kwargs)
253 gist = GistModel().create(
253 gist = GistModel().create(
254 description=form_data['description'],owner=form_data['owner'],
254 description=form_data['description'],owner=form_data['owner'],
255 gist_mapping=form_data['gist_mapping'], gist_type=form_data['gist_type'],
255 gist_mapping=form_data['gist_mapping'], gist_type=form_data['gist_type'],
256 lifetime=form_data['lifetime']
256 lifetime=form_data['lifetime']
257 )
257 )
258 Session().commit()
258 Session().commit()
259
259
260 return gist
260 return gist
261
261
262 def destroy_gists(self, gistid=None):
262 def destroy_gists(self, gistid=None):
263 for g in Gist.query():
263 for g in Gist.query():
264 if gistid:
264 if gistid:
265 if gistid == g.gist_access_id:
265 if gistid == g.gist_access_id:
266 GistModel().delete(g)
266 GistModel().delete(g)
267 else:
267 else:
268 GistModel().delete(g)
268 GistModel().delete(g)
269 Session().commit()
269 Session().commit()
270
270
271 def load_resource(self, resource_name, strip=True):
271 def load_resource(self, resource_name, strip=True):
272 with open(os.path.join(FIXTURES, resource_name), 'rb') as f:
272 with open(os.path.join(FIXTURES, resource_name), 'rb') as f:
273 source = f.read()
273 source = f.read()
274 if strip:
274 if strip:
275 source = source.strip()
275 source = source.strip()
276
276
277 return source
277 return source
278
278
279 def commit_change(self, repo, filename, content, message, vcs_type,
279 def commit_change(self, repo, filename, content, message, vcs_type,
280 parent=None, newfile=False, author=None):
280 parent=None, newfile=False, author=None):
281 repo = Repository.get_by_repo_name(repo)
281 repo = Repository.get_by_repo_name(repo)
282 _cs = parent
282 _cs = parent
283 if parent is None:
283 if parent is None:
284 _cs = EmptyChangeset(alias=vcs_type)
284 _cs = EmptyChangeset(alias=vcs_type)
285 if author is None:
285 if author is None:
286 author = TEST_USER_ADMIN_LOGIN
286 author = TEST_USER_ADMIN_LOGIN
287
287
288 if newfile:
288 if newfile:
289 nodes = {
289 nodes = {
290 filename: {
290 filename: {
291 'content': content
291 'content': content
292 }
292 }
293 }
293 }
294 cs = ScmModel().create_nodes(
294 cs = ScmModel().create_nodes(
295 user=TEST_USER_ADMIN_LOGIN, repo=repo,
295 user=TEST_USER_ADMIN_LOGIN, repo=repo,
296 message=message,
296 message=message,
297 nodes=nodes,
297 nodes=nodes,
298 parent_cs=_cs,
298 parent_cs=_cs,
299 author=author,
299 author=author,
300 )
300 )
301 else:
301 else:
302 cs = ScmModel().commit_change(
302 cs = ScmModel().commit_change(
303 repo=repo.scm_instance, repo_name=repo.repo_name,
303 repo=repo.scm_instance, repo_name=repo.repo_name,
304 cs=parent, user=TEST_USER_ADMIN_LOGIN,
304 cs=parent, user=TEST_USER_ADMIN_LOGIN,
305 author=author,
305 author=author,
306 message=message,
306 message=message,
307 content=content,
307 content=content,
308 f_path=filename
308 f_path=filename
309 )
309 )
310 return cs
310 return cs
311
311
312
312
313 #==============================================================================
313 #==============================================================================
314 # Global test environment setup
314 # Global test environment setup
315 #==============================================================================
315 #==============================================================================
316
316
def create_test_env(repos_test_path, config):
    """
    Make a fresh test database and install the test vcs repositories
    (hg and git fixtures) into a temporary directory.

    :param repos_test_path: directory receiving the test repositories
    :param config: application configuration dict (``sqlalchemy.url``,
        ``here``, ``index_dir``, ``cache_dir``)
    """
    # PART ONE: create the database
    dbconf = config['sqlalchemy.url']
    log.debug('making test db %s', dbconf)

    # create the test dir if it doesn't exist
    if not os.path.isdir(repos_test_path):
        log.debug('Creating testdir %s', repos_test_path)
        os.makedirs(repos_test_path)

    dbmanage = DbManage(log_sql=True, dbconf=dbconf, root=config['here'],
                        tests=True)
    dbmanage.create_tables(override=True)
    # for tests, dynamically set new root paths based on generated content
    dbmanage.create_settings(dbmanage.config_prompt(repos_test_path))
    dbmanage.create_default_user()
    dbmanage.admin_prompt()
    dbmanage.create_permissions()
    dbmanage.populate_default_permissions()
    Session().commit()

    # PART TWO: make the test vcs repositories
    log.debug('making test vcs repositories')

    idx_path = config['index_dir']
    data_path = config['cache_dir']

    # clean index and data left over from a previous run
    if idx_path and os.path.exists(idx_path):
        log.debug('remove %s', idx_path)
        shutil.rmtree(idx_path)

    if data_path and os.path.exists(data_path):
        log.debug('remove %s', data_path)
        shutil.rmtree(data_path)

    # CREATE DEFAULT TEST REPOS from the bundled fixture tarballs;
    # 'with' guarantees the tar files are closed even if extraction fails
    # (the previous explicit .close() was skipped on error)
    with tarfile.open(os.path.join(FIXTURES, 'vcs_test_hg.tar.gz')) as tar:
        tar.extractall(os.path.join(TESTS_TMP_PATH, HG_REPO))

    with tarfile.open(os.path.join(FIXTURES, 'vcs_test_git.tar.gz')) as tar:
        tar.extractall(os.path.join(TESTS_TMP_PATH, GIT_REPO))

    # LOAD VCS test stuff
    from kallithea.tests.vcs import setup_package
    setup_package()
369
369
370
370
def create_test_index(repo_location, config, full_index):
    """
    Make the default test (whoosh) index under the configured index dir.

    Silently does nothing when another process already holds the
    indexing lock.

    :param repo_location: root of the repositories to index
    :param config: application configuration dict (``index_dir``)
    :param full_index: whether to build a full index or update incrementally
    """
    from kallithea.lib.indexers.daemon import WhooshIndexingDaemon
    from kallithea.lib.pidlock import DaemonLock, LockHeld

    index_location = os.path.join(config['index_dir'])
    if not os.path.exists(index_location):
        os.makedirs(index_location)

    try:
        l = DaemonLock(file_=os.path.join(dirname(index_location), 'make_index.lock'))
    except LockHeld:
        # another process is already indexing - nothing to do
        return
    try:
        WhooshIndexingDaemon(index_location=index_location,
                             repo_location=repo_location) \
            .run(full_index=full_index)
    finally:
        # fix: release the lock even if indexing raises; previously the
        # lock file leaked when .run() failed
        l.release()
General Comments 0
You need to be logged in to leave comments. Login now