##// END OF EJS Templates
tests: use custom test app for better debug of .mustcontain function....
marcink -
r1256:c3a5baac default
parent child Browse files
Show More
@@ -1,261 +1,260 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2010-2016 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 import os
22 22 import time
23 23 import logging
24 24 import datetime
25 25 import hashlib
26 26 import tempfile
27 27 from os.path import join as jn
28 28
29 29 from tempfile import _RandomNameSequence
30 30
31 31 from paste.deploy import loadapp
32 32 from paste.script.appinstall import SetupCommand
33 33
34 34 import pylons
35 35 import pylons.test
36 36 from pylons import config, url
37 37 from pylons.i18n.translation import _get_translator
38 38 from pylons.util import ContextObj
39 39
40 40 from routes.util import URLGenerator
41 from webtest import TestApp
42 41 from nose.plugins.skip import SkipTest
43 42 import pytest
44 43
45 44 from rhodecode import is_windows
46 45 from rhodecode.config.routing import ADMIN_PREFIX
47 46 from rhodecode.model.meta import Session
48 47 from rhodecode.model.db import User
49 48 from rhodecode.lib import auth
50 49 from rhodecode.lib.helpers import flash, link_to
51 50 from rhodecode.lib.utils2 import safe_unicode, safe_str
52 51 from rhodecode.tests.utils import get_session_from_response
53 52
54 53 # TODO: johbo: Solve time zone related issues and remove this tweak
55 54 os.environ['TZ'] = 'UTC'
56 55 if not is_windows:
57 56 time.tzset()
58 57
59 58 log = logging.getLogger(__name__)
60 59
61 60 __all__ = [
62 61 'get_new_dir', 'TestController', 'SkipTest',
63 62 'url', 'link_to', 'ldap_lib_installed', 'clear_all_caches',
64 63 'assert_session_flash', 'login_user',
65 64 'TESTS_TMP_PATH', 'HG_REPO', 'GIT_REPO', 'SVN_REPO',
66 65 'NEW_HG_REPO', 'NEW_GIT_REPO',
67 66 'HG_FORK', 'GIT_FORK', 'TEST_USER_ADMIN_LOGIN', 'TEST_USER_ADMIN_PASS',
68 67 'TEST_USER_REGULAR_LOGIN', 'TEST_USER_REGULAR_PASS',
69 68 'TEST_USER_REGULAR_EMAIL', 'TEST_USER_REGULAR2_LOGIN',
70 69 'TEST_USER_REGULAR2_PASS', 'TEST_USER_REGULAR2_EMAIL', 'TEST_HG_REPO',
71 70 'TEST_HG_REPO_CLONE', 'TEST_HG_REPO_PULL', 'TEST_GIT_REPO',
72 71 'TEST_GIT_REPO_CLONE', 'TEST_GIT_REPO_PULL', 'SCM_TESTS',
73 72 ]
74 73
75 74 # Invoke websetup with the current config file
76 75 # SetupCommand('setup-app').run([config_file])
77 76
78 77 # SOME GLOBALS FOR TESTS
79 78 TEST_DIR = tempfile.gettempdir()
80 79
81 80 TESTS_TMP_PATH = jn(TEST_DIR, 'rc_test_%s' % _RandomNameSequence().next())
82 81 TEST_USER_ADMIN_LOGIN = 'test_admin'
83 82 TEST_USER_ADMIN_PASS = 'test12'
84 83 TEST_USER_ADMIN_EMAIL = 'test_admin@mail.com'
85 84
86 85 TEST_USER_REGULAR_LOGIN = 'test_regular'
87 86 TEST_USER_REGULAR_PASS = 'test12'
88 87 TEST_USER_REGULAR_EMAIL = 'test_regular@mail.com'
89 88
90 89 TEST_USER_REGULAR2_LOGIN = 'test_regular2'
91 90 TEST_USER_REGULAR2_PASS = 'test12'
92 91 TEST_USER_REGULAR2_EMAIL = 'test_regular2@mail.com'
93 92
94 93 HG_REPO = 'vcs_test_hg'
95 94 GIT_REPO = 'vcs_test_git'
96 95 SVN_REPO = 'vcs_test_svn'
97 96
98 97 NEW_HG_REPO = 'vcs_test_hg_new'
99 98 NEW_GIT_REPO = 'vcs_test_git_new'
100 99
101 100 HG_FORK = 'vcs_test_hg_fork'
102 101 GIT_FORK = 'vcs_test_git_fork'
103 102
104 103 ## VCS
105 104 SCM_TESTS = ['hg', 'git']
106 105 uniq_suffix = str(int(time.mktime(datetime.datetime.now().timetuple())))
107 106
108 107 TEST_GIT_REPO = jn(TESTS_TMP_PATH, GIT_REPO)
109 108 TEST_GIT_REPO_CLONE = jn(TESTS_TMP_PATH, 'vcsgitclone%s' % uniq_suffix)
110 109 TEST_GIT_REPO_PULL = jn(TESTS_TMP_PATH, 'vcsgitpull%s' % uniq_suffix)
111 110
112 111 TEST_HG_REPO = jn(TESTS_TMP_PATH, HG_REPO)
113 112 TEST_HG_REPO_CLONE = jn(TESTS_TMP_PATH, 'vcshgclone%s' % uniq_suffix)
114 113 TEST_HG_REPO_PULL = jn(TESTS_TMP_PATH, 'vcshgpull%s' % uniq_suffix)
115 114
116 115 TEST_REPO_PREFIX = 'vcs-test'
117 116
118 117
119 118 # skip ldap tests if LDAP lib is not installed
120 119 ldap_lib_installed = False
121 120 try:
122 121 import ldap
123 122 ldap_lib_installed = True
124 123 except ImportError:
125 124 # means that python-ldap is not installed
126 125 pass
127 126
128 127
129 128 def clear_all_caches():
130 129 from beaker.cache import cache_managers
131 130 for _cache in cache_managers.values():
132 131 _cache.clear()
133 132
134 133
135 134 def get_new_dir(title):
136 135 """
137 136 Returns always new directory path.
138 137 """
139 138 from rhodecode.tests.vcs.utils import get_normalized_path
140 139 name_parts = [TEST_REPO_PREFIX]
141 140 if title:
142 141 name_parts.append(title)
143 142 hex_str = hashlib.sha1('%s %s' % (os.getpid(), time.time())).hexdigest()
144 143 name_parts.append(hex_str)
145 144 name = '-'.join(name_parts)
146 145 path = os.path.join(TEST_DIR, name)
147 146 return get_normalized_path(path)
148 147
149 148
150 149 @pytest.mark.usefixtures('app', 'index_location')
151 150 class TestController(object):
152 151
153 152 maxDiff = None
154 153
155 154 def log_user(self, username=TEST_USER_ADMIN_LOGIN,
156 155 password=TEST_USER_ADMIN_PASS):
157 156 self._logged_username = username
158 157 self._session = login_user_session(self.app, username, password)
159 158 self.csrf_token = auth.get_csrf_token(self._session)
160 159
161 160 return self._session['rhodecode_user']
162 161
163 162 def logout_user(self):
164 163 logout_user_session(self.app, auth.get_csrf_token(self._session))
165 164 self.csrf_token = None
166 165 self._logged_username = None
167 166 self._session = None
168 167
169 168 def _get_logged_user(self):
170 169 return User.get_by_username(self._logged_username)
171 170
172 171 # TODO: remove, use plain assert in tests
173 172 def assertEqual(self, a, b, msg=None):
174 173 if msg:
175 174 assert a == b, msg
176 175 else:
177 176 assert a == b
178 177
179 178
180 179 def login_user_session(
181 180 app, username=TEST_USER_ADMIN_LOGIN, password=TEST_USER_ADMIN_PASS):
182 181 from rhodecode.tests.functional.test_login import login_url
183 182 response = app.post(
184 183 login_url,
185 184 {'username': username, 'password': password})
186 185 if 'invalid user name' in response.body:
187 186 pytest.fail('could not login using %s %s' % (username, password))
188 187
189 188 assert response.status == '302 Found'
190 189 response = response.follow()
191 190 assert response.status == '200 OK'
192 191
193 192 session = get_session_from_response(response)
194 193 assert 'rhodecode_user' in session
195 194 rc_user = session['rhodecode_user']
196 195 assert rc_user.get('username') == username
197 196 assert rc_user.get('is_authenticated')
198 197
199 198 return session
200 199
201 200
202 201 def logout_user_session(app, csrf_token):
203 202 from rhodecode.tests.functional.test_login import logut_url
204 203 app.post(logut_url, {'csrf_token': csrf_token}, status=302)
205 204
206 205
207 206 def login_user(app, username=TEST_USER_ADMIN_LOGIN,
208 207 password=TEST_USER_ADMIN_PASS):
209 208 return login_user_session(app, username, password)['rhodecode_user']
210 209
211 210
212 211 def assert_session_flash(response=None, msg=None, category=None):
213 212 """
214 213 Assert on a flash message in the current session.
215 214
216 215 :param msg: Required. The expected message. Will be evaluated if a
217 216 :class:`LazyString` is passed in.
218 217 :param response: Optional. For functional testing, pass in the response
219 218 object. Otherwise don't pass in any value.
220 219 :param category: Optional. If passed, the message category will be
221 220 checked as well.
222 221 """
223 222 if msg is None:
224 223 raise ValueError("Parameter msg is required.")
225 224
226 225 messages = flash.pop_messages()
227 226 msg = _eval_if_lazy(msg)
228 227
229 228 assert messages, 'unable to find message `%s` in empty flash list' % msg
230 229 message = messages[0]
231 230
232 231 message_text = _eval_if_lazy(message.message)
233 232
234 233 if msg not in message_text:
235 234 msg = u'msg `%s` not found in session flash: got `%s` instead' % (
236 235 msg, message_text)
237 236 pytest.fail(safe_str(msg))
238 237 if category:
239 238 assert category == message.category
240 239
241 240
242 241 def _eval_if_lazy(value):
243 242 return value.eval() if hasattr(value, 'eval') else value
244 243
245 244
246 245 def assert_not_in_session_flash(response, msg, category=None):
247 246 assert 'flash' in response.session, 'Response session has no flash key'
248 247 message_category, message_text = response.session['flash'][0]
249 248 if msg in message_text:
250 249 msg = u'msg `%s` found in session flash: got `%s` instead' % (
251 250 msg, message_text)
252 251 pytest.fail(safe_str(msg))
253 252 if category:
254 253 assert category == message_category
255 254
256 255
257 256 def assert_session_flash_is_empty(response):
258 257 if 'flash' in response.session:
259 258 msg = 'flash messages are present in session:%s' % \
260 259 response.session['flash'][0]
261 260 pytest.fail(safe_str(msg))
@@ -1,89 +1,89 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2010-2016 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 import wsgiref.simple_server
22 22
23 23 import pytest
24 import webtest
25 24
25 from rhodecode.tests.utils import CustomTestApp
26 26 from rhodecode.lib.middleware import csrf
27 27
28 28
29 29 def test_origin_checker_no_origin():
30 30 app = csrf.OriginChecker(
31 31 wsgiref.simple_server.demo_app, 'https://safe.org')
32 app = webtest.TestApp(app)
32 app = CustomTestApp(app)
33 33
34 34 app.post('/foo')
35 35
36 36
37 37 def test_origin_checker_null_origin():
38 38 app = csrf.OriginChecker(
39 39 wsgiref.simple_server.demo_app, 'https://safe.org')
40 app = webtest.TestApp(app)
40 app = CustomTestApp(app)
41 41
42 42 app.post('/foo', headers={'Origin': 'null'})
43 43
44 44
45 45 @pytest.mark.parametrize('origin', [
46 46 'http://safe.org',
47 47 'http://safe.org:80',
48 48 'http://safe.org http://redirect',
49 49 ])
50 50 def test_origin_checker_valid_origin(origin):
51 51 app = csrf.OriginChecker(
52 52 wsgiref.simple_server.demo_app, 'http://safe.org')
53 app = webtest.TestApp(app)
53 app = CustomTestApp(app)
54 54
55 55 app.post('/foo', headers={'Origin': origin})
56 56
57 57
58 58 @pytest.mark.parametrize('origin', [
59 59 'https://safe.org',
60 60 'https://safe.org:443',
61 61 'https://safe.org https://redirect',
62 62 ])
63 63 def test_origin_checker_valid_origin_https(origin):
64 64 app = csrf.OriginChecker(
65 65 wsgiref.simple_server.demo_app, 'https://safe.org')
66 app = webtest.TestApp(app)
66 app = CustomTestApp(app)
67 67
68 68 app.post('/foo', headers={'Origin': origin})
69 69
70 70
71 71 @pytest.mark.parametrize('origin', [
72 72 'http://www.evil.org',
73 73 'https://www.evil.org',
74 74 'foo',
75 75 ])
76 76 def test_origin_checker_invalid_origin(origin):
77 77 app = csrf.OriginChecker(
78 78 wsgiref.simple_server.demo_app, 'https://safe.org')
79 app = webtest.TestApp(app)
79 app = CustomTestApp(app)
80 80
81 81 app.post('/foo', headers={'Origin': origin}, status=403)
82 82
83 83
84 84 def test_origin_checker_invalid_origin_skipped_url():
85 85 app = csrf.OriginChecker(
86 86 wsgiref.simple_server.demo_app, 'https://safe.org', skip_urls=['/foo'])
87 app = webtest.TestApp(app)
87 app = CustomTestApp(app)
88 88
89 89 app.post('/foo', headers={'Origin': 'http://www.evil.org'})
@@ -1,461 +1,462 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2010-2016 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 import base64
22 22
23 23 import mock
24 24 import pytest
25 import webtest.app
25
26 from rhodecode.tests.utils import CustomTestApp
26 27
27 28 from rhodecode.lib.caching_query import FromCache
28 29 from rhodecode.lib.hooks_daemon import DummyHooksCallbackDaemon
29 30 from rhodecode.lib.middleware import simplevcs
30 31 from rhodecode.lib.middleware.https_fixup import HttpsFixup
31 32 from rhodecode.lib.middleware.utils import scm_app_http
32 33 from rhodecode.model.db import User, _hash_key
33 34 from rhodecode.model.meta import Session
34 35 from rhodecode.tests import (
35 36 HG_REPO, TEST_USER_ADMIN_LOGIN, TEST_USER_ADMIN_PASS)
36 37 from rhodecode.tests.lib.middleware import mock_scm_app
37 38 from rhodecode.tests.utils import set_anonymous_access
38 39
39 40
40 41 class StubVCSController(simplevcs.SimpleVCS):
41 42
42 43 SCM = 'hg'
43 44 stub_response_body = tuple()
44 45
45 46 def __init__(self, *args, **kwargs):
46 47 super(StubVCSController, self).__init__(*args, **kwargs)
47 48 self._action = 'pull'
48 49 self._name = HG_REPO
49 50 self.set_repo_names(None)
50 51
51 52 def _get_repository_name(self, environ):
52 53 return self._name
53 54
54 55 def _get_action(self, environ):
55 56 return self._action
56 57
57 58 def _create_wsgi_app(self, repo_path, repo_name, config):
58 59 def fake_app(environ, start_response):
59 60 start_response('200 OK', [])
60 61 return self.stub_response_body
61 62 return fake_app
62 63
63 64 def _create_config(self, extras, repo_name):
64 65 return None
65 66
66 67
67 68 @pytest.fixture
68 69 def vcscontroller(pylonsapp, config_stub):
69 70 config_stub.testing_securitypolicy()
70 71 config_stub.include('rhodecode.authentication')
71 72
72 73 set_anonymous_access(True)
73 74 controller = StubVCSController(pylonsapp, pylonsapp.config, None)
74 75 app = HttpsFixup(controller, pylonsapp.config)
75 app = webtest.app.TestApp(app)
76 app = CustomTestApp(app)
76 77
77 78 _remove_default_user_from_query_cache()
78 79
79 80 # Sanity checks that things are set up correctly
80 81 app.get('/' + HG_REPO, status=200)
81 82
82 83 app.controller = controller
83 84 return app
84 85
85 86
86 87 def _remove_default_user_from_query_cache():
87 88 user = User.get_default_user(cache=True)
88 89 query = Session().query(User).filter(User.username == user.username)
89 90 query = query.options(FromCache(
90 91 "sql_cache_short", "get_user_%s" % _hash_key(user.username)))
91 92 query.invalidate()
92 93 Session().expire(user)
93 94
94 95
95 96 @pytest.fixture
96 97 def disable_anonymous_user(request, pylonsapp):
97 98 set_anonymous_access(False)
98 99
99 100 @request.addfinalizer
100 101 def cleanup():
101 102 set_anonymous_access(True)
102 103
103 104
104 105 def test_handles_exceptions_during_permissions_checks(
105 106 vcscontroller, disable_anonymous_user):
106 107 user_and_pass = '%s:%s' % (TEST_USER_ADMIN_LOGIN, TEST_USER_ADMIN_PASS)
107 108 auth_password = base64.encodestring(user_and_pass).strip()
108 109 extra_environ = {
109 110 'AUTH_TYPE': 'Basic',
110 111 'HTTP_AUTHORIZATION': 'Basic %s' % auth_password,
111 112 'REMOTE_USER': TEST_USER_ADMIN_LOGIN,
112 113 }
113 114
114 115 # Verify that things are hooked up correctly
115 116 vcscontroller.get('/', status=200, extra_environ=extra_environ)
116 117
117 118 # Simulate trouble during permission checks
118 119 with mock.patch('rhodecode.model.db.User.get_by_username',
119 120 side_effect=Exception) as get_user:
120 121 # Verify that a correct 500 is returned and check that the expected
121 122 # code path was hit.
122 123 vcscontroller.get('/', status=500, extra_environ=extra_environ)
123 124 assert get_user.called
124 125
125 126
126 127 def test_returns_forbidden_if_no_anonymous_access(
127 128 vcscontroller, disable_anonymous_user):
128 129 vcscontroller.get('/', status=401)
129 130
130 131
131 132 class StubFailVCSController(simplevcs.SimpleVCS):
132 133 def _handle_request(self, environ, start_response):
133 134 raise Exception("BOOM")
134 135
135 136
136 137 @pytest.fixture(scope='module')
137 138 def fail_controller(pylonsapp):
138 139 controller = StubFailVCSController(pylonsapp, pylonsapp.config, None)
139 140 controller = HttpsFixup(controller, pylonsapp.config)
140 controller = webtest.app.TestApp(controller)
141 controller = CustomTestApp(controller)
141 142 return controller
142 143
143 144
144 145 def test_handles_exceptions_as_internal_server_error(fail_controller):
145 146 fail_controller.get('/', status=500)
146 147
147 148
148 149 def test_provides_traceback_for_appenlight(fail_controller):
149 150 response = fail_controller.get(
150 151 '/', status=500, extra_environ={'appenlight.client': 'fake'})
151 152 assert 'appenlight.__traceback' in response.request.environ
152 153
153 154
154 155 def test_provides_utils_scm_app_as_scm_app_by_default(pylonsapp):
155 156 controller = StubVCSController(pylonsapp, pylonsapp.config, None)
156 157 assert controller.scm_app is scm_app_http
157 158
158 159
159 160 def test_allows_to_override_scm_app_via_config(pylonsapp):
160 161 config = pylonsapp.config.copy()
161 162 config['vcs.scm_app_implementation'] = (
162 163 'rhodecode.tests.lib.middleware.mock_scm_app')
163 164 controller = StubVCSController(pylonsapp, config, None)
164 165 assert controller.scm_app is mock_scm_app
165 166
166 167
167 168 @pytest.mark.parametrize('query_string, expected', [
168 169 ('cmd=stub_command', True),
169 170 ('cmd=listkeys', False),
170 171 ])
171 172 def test_should_check_locking(query_string, expected):
172 173 result = simplevcs._should_check_locking(query_string)
173 174 assert result == expected
174 175
175 176
176 177 class TestShadowRepoRegularExpression(object):
177 178 pr_segment = 'pull-request'
178 179 shadow_segment = 'repository'
179 180
180 181 @pytest.mark.parametrize('url, expected', [
181 182 # repo with/without groups
182 183 ('My-Repo/{pr_segment}/1/{shadow_segment}', True),
183 184 ('Group/My-Repo/{pr_segment}/2/{shadow_segment}', True),
184 185 ('Group/Sub-Group/My-Repo/{pr_segment}/3/{shadow_segment}', True),
185 186 ('Group/Sub-Group1/Sub-Group2/My-Repo/{pr_segment}/3/{shadow_segment}', True),
186 187
187 188 # pull request ID
188 189 ('MyRepo/{pr_segment}/1/{shadow_segment}', True),
189 190 ('MyRepo/{pr_segment}/1234567890/{shadow_segment}', True),
190 191 ('MyRepo/{pr_segment}/-1/{shadow_segment}', False),
191 192 ('MyRepo/{pr_segment}/invalid/{shadow_segment}', False),
192 193
193 194 # unicode
194 195 (u'Sp€çîál-Repö/{pr_segment}/1/{shadow_segment}', True),
195 196 (u'Sp€çîál-Gröüp/Sp€çîál-Repö/{pr_segment}/1/{shadow_segment}', True),
196 197
197 198 # trailing/leading slash
198 199 ('/My-Repo/{pr_segment}/1/{shadow_segment}', False),
199 200 ('My-Repo/{pr_segment}/1/{shadow_segment}/', False),
200 201 ('/My-Repo/{pr_segment}/1/{shadow_segment}/', False),
201 202
202 203 # misc
203 204 ('My-Repo/{pr_segment}/1/{shadow_segment}/extra', False),
204 205 ('My-Repo/{pr_segment}/1/{shadow_segment}extra', False),
205 206 ])
206 207 def test_shadow_repo_regular_expression(self, url, expected):
207 208 from rhodecode.lib.middleware.simplevcs import SimpleVCS
208 209 url = url.format(
209 210 pr_segment=self.pr_segment,
210 211 shadow_segment=self.shadow_segment)
211 212 match_obj = SimpleVCS.shadow_repo_re.match(url)
212 213 assert (match_obj is not None) == expected
213 214
214 215
215 216 @pytest.mark.backends('git', 'hg')
216 217 class TestShadowRepoExposure(object):
217 218
218 219 def test_pull_on_shadow_repo_propagates_to_wsgi_app(self, pylonsapp):
219 220 """
220 221 Check that a pull action to a shadow repo is propagated to the
221 222 underlying wsgi app.
222 223 """
223 224 controller = StubVCSController(pylonsapp, pylonsapp.config, None)
224 225 controller._check_ssl = mock.Mock()
225 226 controller.is_shadow_repo = True
226 227 controller._action = 'pull'
227 228 controller.stub_response_body = 'dummy body value'
228 229 environ_stub = {
229 230 'HTTP_HOST': 'test.example.com',
230 231 'REQUEST_METHOD': 'GET',
231 232 'wsgi.url_scheme': 'http',
232 233 }
233 234
234 235 response = controller(environ_stub, mock.Mock())
235 236 response_body = ''.join(response)
236 237
237 238 # Assert that we got the response from the wsgi app.
238 239 assert response_body == controller.stub_response_body
239 240
240 241 def test_push_on_shadow_repo_raises(self, pylonsapp):
241 242 """
242 243 Check that a push action to a shadow repo is aborted.
243 244 """
244 245 controller = StubVCSController(pylonsapp, pylonsapp.config, None)
245 246 controller._check_ssl = mock.Mock()
246 247 controller.is_shadow_repo = True
247 248 controller._action = 'push'
248 249 controller.stub_response_body = 'dummy body value'
249 250 environ_stub = {
250 251 'HTTP_HOST': 'test.example.com',
251 252 'REQUEST_METHOD': 'GET',
252 253 'wsgi.url_scheme': 'http',
253 254 }
254 255
255 256 response = controller(environ_stub, mock.Mock())
256 257 response_body = ''.join(response)
257 258
258 259 assert response_body != controller.stub_response_body
259 260 # Assert that a 406 error is returned.
260 261 assert '406 Not Acceptable' in response_body
261 262
262 263 def test_set_repo_names_no_shadow(self, pylonsapp):
263 264 """
264 265 Check that the set_repo_names method sets all names to the one returned
265 266 by the _get_repository_name method on a request to a non shadow repo.
266 267 """
267 268 environ_stub = {}
268 269 controller = StubVCSController(pylonsapp, pylonsapp.config, None)
269 270 controller._name = 'RepoGroup/MyRepo'
270 271 controller.set_repo_names(environ_stub)
271 272 assert not controller.is_shadow_repo
272 273 assert (controller.url_repo_name ==
273 274 controller.acl_repo_name ==
274 275 controller.vcs_repo_name ==
275 276 controller._get_repository_name(environ_stub))
276 277
277 278 def test_set_repo_names_with_shadow(self, pylonsapp, pr_util):
278 279 """
279 280 Check that the set_repo_names method sets correct names on a request
280 281 to a shadow repo.
281 282 """
282 283 from rhodecode.model.pull_request import PullRequestModel
283 284
284 285 pull_request = pr_util.create_pull_request()
285 286 shadow_url = '{target}/{pr_segment}/{pr_id}/{shadow_segment}'.format(
286 287 target=pull_request.target_repo.repo_name,
287 288 pr_id=pull_request.pull_request_id,
288 289 pr_segment=TestShadowRepoRegularExpression.pr_segment,
289 290 shadow_segment=TestShadowRepoRegularExpression.shadow_segment)
290 291 controller = StubVCSController(pylonsapp, pylonsapp.config, None)
291 292 controller._name = shadow_url
292 293 controller.set_repo_names({})
293 294
294 295 # Get file system path to shadow repo for assertions.
295 296 workspace_id = PullRequestModel()._workspace_id(pull_request)
296 297 target_vcs = pull_request.target_repo.scm_instance()
297 298 vcs_repo_name = target_vcs._get_shadow_repository_path(
298 299 workspace_id)
299 300
300 301 assert controller.vcs_repo_name == vcs_repo_name
301 302 assert controller.url_repo_name == shadow_url
302 303 assert controller.acl_repo_name == pull_request.target_repo.repo_name
303 304 assert controller.is_shadow_repo
304 305
305 306 def test_set_repo_names_with_shadow_but_missing_pr(
306 307 self, pylonsapp, pr_util):
307 308 """
308 309 Checks that the set_repo_names method enforces matching target repos
309 310 and pull request IDs.
310 311 """
311 312 pull_request = pr_util.create_pull_request()
312 313 shadow_url = '{target}/{pr_segment}/{pr_id}/{shadow_segment}'.format(
313 314 target=pull_request.target_repo.repo_name,
314 315 pr_id=999999999,
315 316 pr_segment=TestShadowRepoRegularExpression.pr_segment,
316 317 shadow_segment=TestShadowRepoRegularExpression.shadow_segment)
317 318 controller = StubVCSController(pylonsapp, pylonsapp.config, None)
318 319 controller._name = shadow_url
319 320 controller.set_repo_names({})
320 321
321 322 assert not controller.is_shadow_repo
322 323 assert (controller.url_repo_name ==
323 324 controller.acl_repo_name ==
324 325 controller.vcs_repo_name)
325 326
326 327
327 328 @pytest.mark.usefixtures('db')
328 329 @mock.patch.multiple(
329 330 'Pyro4.config', SERVERTYPE='multiplex', POLLTIMEOUT=0.01)
330 331 class TestGenerateVcsResponse:
331 332
332 333 def test_ensures_that_start_response_is_called_early_enough(self):
333 334 self.call_controller_with_response_body(iter(['a', 'b']))
334 335 assert self.start_response.called
335 336
336 337 def test_invalidates_cache_after_body_is_consumed(self):
337 338 result = self.call_controller_with_response_body(iter(['a', 'b']))
338 339 assert not self.was_cache_invalidated()
339 340 # Consume the result
340 341 list(result)
341 342 assert self.was_cache_invalidated()
342 343
343 344 @mock.patch('rhodecode.lib.middleware.simplevcs.HTTPLockedRC')
344 345 def test_handles_locking_exception(self, http_locked_rc):
345 346 result = self.call_controller_with_response_body(
346 347 self.raise_result_iter(vcs_kind='repo_locked'))
347 348 assert not http_locked_rc.called
348 349 # Consume the result
349 350 list(result)
350 351 assert http_locked_rc.called
351 352
352 353 @mock.patch('rhodecode.lib.middleware.simplevcs.HTTPRequirementError')
353 354 def test_handles_requirement_exception(self, http_requirement):
354 355 result = self.call_controller_with_response_body(
355 356 self.raise_result_iter(vcs_kind='requirement'))
356 357 assert not http_requirement.called
357 358 # Consume the result
358 359 list(result)
359 360 assert http_requirement.called
360 361
361 362 @mock.patch('rhodecode.lib.middleware.simplevcs.HTTPLockedRC')
362 363 def test_handles_locking_exception_in_app_call(self, http_locked_rc):
363 364 app_factory_patcher = mock.patch.object(
364 365 StubVCSController, '_create_wsgi_app')
365 366 with app_factory_patcher as app_factory:
366 367 app_factory().side_effect = self.vcs_exception()
367 368 result = self.call_controller_with_response_body(['a'])
368 369 list(result)
369 370 assert http_locked_rc.called
370 371
371 372 def test_raises_unknown_exceptions(self):
372 373 result = self.call_controller_with_response_body(
373 374 self.raise_result_iter(vcs_kind='unknown'))
374 375 with pytest.raises(Exception):
375 376 list(result)
376 377
377 378 def test_prepare_callback_daemon_is_called(self):
378 379 def side_effect(extras):
379 380 return DummyHooksCallbackDaemon(), extras
380 381
381 382 prepare_patcher = mock.patch.object(
382 383 StubVCSController, '_prepare_callback_daemon')
383 384 with prepare_patcher as prepare_mock:
384 385 prepare_mock.side_effect = side_effect
385 386 self.call_controller_with_response_body(iter(['a', 'b']))
386 387 assert prepare_mock.called
387 388 assert prepare_mock.call_count == 1
388 389
389 390 def call_controller_with_response_body(self, response_body):
390 391 settings = {
391 392 'base_path': 'fake_base_path',
392 393 'vcs.hooks.protocol': 'http',
393 394 'vcs.hooks.direct_calls': False,
394 395 }
395 396 controller = StubVCSController(None, settings, None)
396 397 controller._invalidate_cache = mock.Mock()
397 398 controller.stub_response_body = response_body
398 399 self.start_response = mock.Mock()
399 400 result = controller._generate_vcs_response(
400 401 environ={}, start_response=self.start_response,
401 402 repo_path='fake_repo_path',
402 403 extras={}, action='push')
403 404 self.controller = controller
404 405 return result
405 406
406 407 def raise_result_iter(self, vcs_kind='repo_locked'):
407 408 """
408 409 Simulates an exception due to a vcs raised exception if kind vcs_kind
409 410 """
410 411 raise self.vcs_exception(vcs_kind=vcs_kind)
411 412 yield "never_reached"
412 413
413 414 def vcs_exception(self, vcs_kind='repo_locked'):
414 415 locked_exception = Exception('TEST_MESSAGE')
415 416 locked_exception._vcs_kind = vcs_kind
416 417 return locked_exception
417 418
418 419 def was_cache_invalidated(self):
419 420 return self.controller._invalidate_cache.called
420 421
421 422
422 423 class TestInitializeGenerator:
423 424
424 425 def test_drains_first_element(self):
425 426 gen = self.factory(['__init__', 1, 2])
426 427 result = list(gen)
427 428 assert result == [1, 2]
428 429
429 430 @pytest.mark.parametrize('values', [
430 431 [],
431 432 [1, 2],
432 433 ])
433 434 def test_raises_value_error(self, values):
434 435 with pytest.raises(ValueError):
435 436 self.factory(values)
436 437
437 438 @simplevcs.initialize_generator
438 439 def factory(self, iterable):
439 440 for elem in iterable:
440 441 yield elem
441 442
442 443
443 444 class TestPrepareHooksDaemon(object):
444 445 def test_calls_imported_prepare_callback_daemon(self, app_settings):
445 446 expected_extras = {'extra1': 'value1'}
446 447 daemon = DummyHooksCallbackDaemon()
447 448
448 449 controller = StubVCSController(None, app_settings, None)
449 450 prepare_patcher = mock.patch.object(
450 451 simplevcs, 'prepare_callback_daemon',
451 452 return_value=(daemon, expected_extras))
452 453 with prepare_patcher as prepare_mock:
453 454 callback_daemon, extras = controller._prepare_callback_daemon(
454 455 expected_extras.copy())
455 456 prepare_mock.assert_called_once_with(
456 457 expected_extras,
457 458 protocol=app_settings['vcs.hooks.protocol'],
458 459 use_direct_calls=app_settings['vcs.hooks.direct_calls'])
459 460
460 461 assert callback_daemon == daemon
461 462 assert extras == extras
@@ -1,130 +1,130 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2016-2016 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 import mock
22 22 import Pyro4
23 23 import pytest
24 import webtest
25 24
25 from rhodecode.tests.utils import CustomTestApp
26 26 from rhodecode.lib.middleware.utils import scm_app_http, scm_app
27 27 from rhodecode.lib.vcs.conf import settings
28 28
29 29
30 30 def vcs_http_app(vcsserver_http_echo_app):
31 31 """
32 32 VcsHttpProxy wrapped in WebTest.
33 33 """
34 34 git_url = vcsserver_http_echo_app.http_url + 'stream/git/'
35 35 vcs_http_proxy = scm_app_http.VcsHttpProxy(
36 36 git_url, 'stub_path', 'stub_name', None)
37 app = webtest.TestApp(vcs_http_proxy)
37 app = CustomTestApp(vcs_http_proxy)
38 38 return app
39 39
40 40
41 41 @pytest.fixture(scope='module')
42 42 def vcsserver_http_echo_app(request, vcsserver_factory):
43 43 """
44 44 A running VCSServer with the EchoApp activated via HTTP.
45 45 """
46 46 vcsserver = vcsserver_factory(
47 47 request=request,
48 48 use_http=True,
49 49 overrides=[{'app:main': {'dev.use_echo_app': 'true'}}])
50 50 return vcsserver
51 51
52 52
53 53 @pytest.fixture(scope='session')
54 54 def data():
55 55 one_kb = "x" * 1024
56 56 return one_kb * 1024 * 10
57 57
58 58
59 59 def test_reuse_app_no_data(repeat, vcsserver_http_echo_app):
60 60 app = vcs_http_app(vcsserver_http_echo_app)
61 61 for x in xrange(repeat / 10):
62 62 response = app.post('/')
63 63 assert response.status_code == 200
64 64
65 65
66 66 def test_reuse_app_with_data(data, repeat, vcsserver_http_echo_app):
67 67 app = vcs_http_app(vcsserver_http_echo_app)
68 68 for x in xrange(repeat / 10):
69 69 response = app.post('/', params=data)
70 70 assert response.status_code == 200
71 71
72 72
73 73 def test_create_app_per_request_no_data(repeat, vcsserver_http_echo_app):
74 74 for x in xrange(repeat / 10):
75 75 app = vcs_http_app(vcsserver_http_echo_app)
76 76 response = app.post('/')
77 77 assert response.status_code == 200
78 78
79 79
80 80 def test_create_app_per_request_with_data(
81 81 data, repeat, vcsserver_http_echo_app):
82 82 for x in xrange(repeat / 10):
83 83 app = vcs_http_app(vcsserver_http_echo_app)
84 84 response = app.post('/', params=data)
85 85 assert response.status_code == 200
86 86
87 87
88 88 @pytest.fixture(scope='module')
89 89 def vcsserver_pyro_echo_app(request, vcsserver_factory):
90 90 """
91 91 A running VCSServer with the EchoApp activated via Pyro4.
92 92 """
93 93 vcsserver = vcsserver_factory(
94 94 request=request,
95 95 use_http=False,
96 96 overrides=[{'DEFAULT': {'dev.use_echo_app': 'true'}}])
97 97 return vcsserver
98 98
99 99
100 100 def vcs_pyro4_app(vcsserver_pyro_echo_app):
101 101 """
102 102 Pyro4 based Vcs proxy wrapped in WebTest
103 103 """
104 104 stub_config = {
105 105 'git_update_server_info': 'stub',
106 106 }
107 107 server_and_port = vcsserver_pyro_echo_app.server_and_port
108 108 GIT_REMOTE_WSGI = Pyro4.Proxy(
109 109 settings.pyro_remote(
110 110 settings.PYRO_GIT_REMOTE_WSGI, server_and_port))
111 111 with mock.patch('rhodecode.lib.middleware.utils.scm_app.GIT_REMOTE_WSGI',
112 112 GIT_REMOTE_WSGI):
113 113 pyro4_app = scm_app.create_git_wsgi_app(
114 114 'stub_path', 'stub_name', stub_config)
115 app = webtest.TestApp(pyro4_app)
115 app = CustomTestApp(pyro4_app)
116 116 return app
117 117
118 118
119 119 def test_pyro4_no_data(repeat, pylonsapp, vcsserver_pyro_echo_app):
120 120 for x in xrange(repeat / 10):
121 121 app = vcs_pyro4_app(vcsserver_pyro_echo_app)
122 122 response = app.post('/')
123 123 assert response.status_code == 200
124 124
125 125
126 126 def test_pyro4_with_data(repeat, pylonsapp, vcsserver_pyro_echo_app, data):
127 127 for x in xrange(repeat / 10):
128 128 app = vcs_pyro4_app(vcsserver_pyro_echo_app)
129 129 response = app.post('/', params=data)
130 130 assert response.status_code == 200
@@ -1,100 +1,99 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2010-2016 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 import webtest
22
21 from rhodecode.tests.utils import CustomTestApp
23 22 from rhodecode.lib.middleware.utils import wsgi_app_caller_client
24 23
25 24 # pylint: disable=protected-access,too-many-public-methods
26 25
27 26
28 27 BASE_ENVIRON = {
29 28 'REQUEST_METHOD': 'GET',
30 29 'SERVER_NAME': 'localhost',
31 30 'SERVER_PORT': '80',
32 31 'SCRIPT_NAME': '',
33 32 'PATH_INFO': '/',
34 33 'QUERY_STRING': '',
35 34 'foo.bool_var': True,
36 35 'foo.str_var': 'True',
37 36 'wsgi.foo': True,
38 37 # Some non string values. The validator expects to get an iterable as
39 38 # value.
40 39 (42,): '42',
41 40 (True,): 'False',
42 41 }
43 42
44 43
45 44 def assert_all_values_are_str(environ):
46 45 """Checks that all values of a dict are str."""
47 46 for key, value in environ.iteritems():
48 47 assert isinstance(value, str), (
49 48 "Value for key %s: has type %s but 'str' was expected. Value: %s" %
50 49 (key, type(value), repr(value)))
51 50
52 51
53 52 def assert_all_keys_are_str(environ):
54 53 """Checks that all keys of a dict are str."""
55 54 for key, value in environ.iteritems():
56 55 assert isinstance(value, str), (
57 56 "Key %s: has type %s but 'str' was expected. " %
58 57 (repr(key), type(key)))
59 58
60 59
61 60 def assert_no_prefix_in_keys(environ, prefix):
62 61 """Checks that no key of the dict starts with the prefix."""
63 62 for key in environ:
64 63 assert not key.startswith(prefix), 'Key %s should not be present' % key
65 64
66 65
67 66 def test_get_environ():
68 67 clean_environ = wsgi_app_caller_client._get_clean_environ(BASE_ENVIRON)
69 68
70 69 assert len(clean_environ) == 7
71 70 assert_no_prefix_in_keys(clean_environ, 'wsgi.')
72 71 assert_all_keys_are_str(clean_environ)
73 72 assert_all_values_are_str(clean_environ)
74 73
75 74
76 75 def test_remote_app_caller():
77 76
78 77 class RemoteAppCallerMock(object):
79 78
80 79 def handle(self, environ, input_data, arg1, arg2,
81 80 arg3=None, arg4=None, arg5=None):
82 81 assert ((arg1, arg2, arg3, arg4, arg5) ==
83 82 ('a1', 'a2', 'a3', 'a4', None))
84 83 # Note: RemoteAppCaller is expected to return a tuple like the
85 84 # following one
86 85 return (['content'], '200 OK', [('Content-Type', 'text/plain')])
87 86
88 87 wrapper_app = wsgi_app_caller_client.RemoteAppCaller(
89 88 RemoteAppCallerMock(), 'a1', 'a2', arg3='a3', arg4='a4')
90 89
91 test_app = webtest.TestApp(wrapper_app)
90 test_app = CustomTestApp(wrapper_app)
92 91
93 92 response = test_app.get('/path')
94 93
95 94 assert response.status == '200 OK'
96 95 assert sorted(response.headers.items()) == sorted([
97 96 ('Content-Type', 'text/plain'),
98 97 ('Content-Length', '7'),
99 98 ])
100 99 assert response.body == 'content'
@@ -1,1797 +1,1799 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2010-2016 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 import collections
22 22 import datetime
23 23 import hashlib
24 24 import os
25 25 import re
26 26 import pprint
27 27 import shutil
28 28 import socket
29 29 import subprocess32
30 30 import time
31 31 import uuid
32 32
33 33 import mock
34 34 import pyramid.testing
35 35 import pytest
36 36 import colander
37 37 import requests
38 from webtest.app import TestApp
39 38
40 39 import rhodecode
41 40 from rhodecode.lib.utils2 import AttributeDict
42 41 from rhodecode.model.changeset_status import ChangesetStatusModel
43 42 from rhodecode.model.comment import ChangesetCommentsModel
44 43 from rhodecode.model.db import (
45 44 PullRequest, Repository, RhodeCodeSetting, ChangesetStatus, RepoGroup,
46 45 UserGroup, RepoRhodeCodeUi, RepoRhodeCodeSetting, RhodeCodeUi)
47 46 from rhodecode.model.meta import Session
48 47 from rhodecode.model.pull_request import PullRequestModel
49 48 from rhodecode.model.repo import RepoModel
50 49 from rhodecode.model.repo_group import RepoGroupModel
51 50 from rhodecode.model.user import UserModel
52 51 from rhodecode.model.settings import VcsSettingsModel
53 52 from rhodecode.model.user_group import UserGroupModel
54 53 from rhodecode.model.integration import IntegrationModel
55 54 from rhodecode.integrations import integration_type_registry
56 55 from rhodecode.integrations.types.base import IntegrationTypeBase
57 56 from rhodecode.lib.utils import repo2db_mapper
58 57 from rhodecode.lib.vcs import create_vcsserver_proxy
59 58 from rhodecode.lib.vcs.backends import get_backend
60 59 from rhodecode.lib.vcs.nodes import FileNode
61 60 from rhodecode.tests import (
62 61 login_user_session, get_new_dir, utils, TESTS_TMP_PATH,
63 62 TEST_USER_ADMIN_LOGIN, TEST_USER_REGULAR_LOGIN, TEST_USER_REGULAR2_LOGIN,
64 63 TEST_USER_REGULAR_PASS)
64 from rhodecode.tests.utils import CustomTestApp
65 65 from rhodecode.tests.fixture import Fixture
66 66
67 67
68 68 def _split_comma(value):
69 69 return value.split(',')
70 70
71 71
72 72 def pytest_addoption(parser):
73 73 parser.addoption(
74 74 '--keep-tmp-path', action='store_true',
75 75 help="Keep the test temporary directories")
76 76 parser.addoption(
77 77 '--backends', action='store', type=_split_comma,
78 78 default=['git', 'hg', 'svn'],
79 79 help="Select which backends to test for backend specific tests.")
80 80 parser.addoption(
81 81 '--dbs', action='store', type=_split_comma,
82 82 default=['sqlite'],
83 83 help="Select which database to test for database specific tests. "
84 84 "Possible options are sqlite,postgres,mysql")
85 85 parser.addoption(
86 86 '--appenlight', '--ae', action='store_true',
87 87 help="Track statistics in appenlight.")
88 88 parser.addoption(
89 89 '--appenlight-api-key', '--ae-key',
90 90 help="API key for Appenlight.")
91 91 parser.addoption(
92 92 '--appenlight-url', '--ae-url',
93 93 default="https://ae.rhodecode.com",
94 94 help="Appenlight service URL, defaults to https://ae.rhodecode.com")
95 95 parser.addoption(
96 96 '--sqlite-connection-string', action='store',
97 97 default='', help="Connection string for the dbs tests with SQLite")
98 98 parser.addoption(
99 99 '--postgres-connection-string', action='store',
100 100 default='', help="Connection string for the dbs tests with Postgres")
101 101 parser.addoption(
102 102 '--mysql-connection-string', action='store',
103 103 default='', help="Connection string for the dbs tests with MySQL")
104 104 parser.addoption(
105 105 '--repeat', type=int, default=100,
106 106 help="Number of repetitions in performance tests.")
107 107
108 108
109 109 def pytest_configure(config):
110 110 # Appy the kombu patch early on, needed for test discovery on Python 2.7.11
111 111 from rhodecode.config import patches
112 112 patches.kombu_1_5_1_python_2_7_11()
113 113
114 114
115 115 def pytest_collection_modifyitems(session, config, items):
116 116 # nottest marked, compare nose, used for transition from nose to pytest
117 117 remaining = [
118 118 i for i in items if getattr(i.obj, '__test__', True)]
119 119 items[:] = remaining
120 120
121 121
122 122 def pytest_generate_tests(metafunc):
123 123 # Support test generation based on --backend parameter
124 124 if 'backend_alias' in metafunc.fixturenames:
125 125 backends = get_backends_from_metafunc(metafunc)
126 126 scope = None
127 127 if not backends:
128 128 pytest.skip("Not enabled for any of selected backends")
129 129 metafunc.parametrize('backend_alias', backends, scope=scope)
130 130 elif hasattr(metafunc.function, 'backends'):
131 131 backends = get_backends_from_metafunc(metafunc)
132 132 if not backends:
133 133 pytest.skip("Not enabled for any of selected backends")
134 134
135 135
136 136 def get_backends_from_metafunc(metafunc):
137 137 requested_backends = set(metafunc.config.getoption('--backends'))
138 138 if hasattr(metafunc.function, 'backends'):
139 139 # Supported backends by this test function, created from
140 140 # pytest.mark.backends
141 141 backends = metafunc.function.backends.args
142 142 elif hasattr(metafunc.cls, 'backend_alias'):
143 143 # Support class attribute "backend_alias", this is mainly
144 144 # for legacy reasons for tests not yet using pytest.mark.backends
145 145 backends = [metafunc.cls.backend_alias]
146 146 else:
147 147 backends = metafunc.config.getoption('--backends')
148 148 return requested_backends.intersection(backends)
149 149
150 150
151 151 @pytest.fixture(scope='session', autouse=True)
152 152 def activate_example_rcextensions(request):
153 153 """
154 154 Patch in an example rcextensions module which verifies passed in kwargs.
155 155 """
156 156 from rhodecode.tests.other import example_rcextensions
157 157
158 158 old_extensions = rhodecode.EXTENSIONS
159 159 rhodecode.EXTENSIONS = example_rcextensions
160 160
161 161 @request.addfinalizer
162 162 def cleanup():
163 163 rhodecode.EXTENSIONS = old_extensions
164 164
165 165
166 166 @pytest.fixture
167 167 def capture_rcextensions():
168 168 """
169 169 Returns the recorded calls to entry points in rcextensions.
170 170 """
171 171 calls = rhodecode.EXTENSIONS.calls
172 172 calls.clear()
173 173 # Note: At this moment, it is still the empty dict, but that will
174 174 # be filled during the test run and since it is a reference this
175 175 # is enough to make it work.
176 176 return calls
177 177
178 178
179 179 @pytest.fixture(scope='session')
180 180 def http_environ_session():
181 181 """
182 182 Allow to use "http_environ" in session scope.
183 183 """
184 184 return http_environ(
185 185 http_host_stub=http_host_stub())
186 186
187 187
188 188 @pytest.fixture
189 189 def http_host_stub():
190 190 """
191 191 Value of HTTP_HOST in the test run.
192 192 """
193 193 return 'test.example.com:80'
194 194
195 195
196 196 @pytest.fixture
197 197 def http_environ(http_host_stub):
198 198 """
199 199 HTTP extra environ keys.
200 200
201 201 User by the test application and as well for setting up the pylons
202 202 environment. In the case of the fixture "app" it should be possible
203 203 to override this for a specific test case.
204 204 """
205 205 return {
206 206 'SERVER_NAME': http_host_stub.split(':')[0],
207 207 'SERVER_PORT': http_host_stub.split(':')[1],
208 208 'HTTP_HOST': http_host_stub,
209 209 }
210 210
211 211
212 212 @pytest.fixture(scope='function')
213 213 def app(request, pylonsapp, http_environ):
214 app = TestApp(
214
215
216 app = CustomTestApp(
215 217 pylonsapp,
216 218 extra_environ=http_environ)
217 219 if request.cls:
218 220 request.cls.app = app
219 221 return app
220 222
221 223
222 224 @pytest.fixture(scope='session')
223 225 def app_settings(pylonsapp, pylons_config):
224 226 """
225 227 Settings dictionary used to create the app.
226 228
227 229 Parses the ini file and passes the result through the sanitize and apply
228 230 defaults mechanism in `rhodecode.config.middleware`.
229 231 """
230 232 from paste.deploy.loadwsgi import loadcontext, APP
231 233 from rhodecode.config.middleware import (
232 234 sanitize_settings_and_apply_defaults)
233 235 context = loadcontext(APP, 'config:' + pylons_config)
234 236 settings = sanitize_settings_and_apply_defaults(context.config())
235 237 return settings
236 238
237 239
238 240 @pytest.fixture(scope='session')
239 241 def db(app_settings):
240 242 """
241 243 Initializes the database connection.
242 244
243 245 It uses the same settings which are used to create the ``pylonsapp`` or
244 246 ``app`` fixtures.
245 247 """
246 248 from rhodecode.config.utils import initialize_database
247 249 initialize_database(app_settings)
248 250
249 251
250 252 LoginData = collections.namedtuple('LoginData', ('csrf_token', 'user'))
251 253
252 254
253 255 def _autologin_user(app, *args):
254 256 session = login_user_session(app, *args)
255 257 csrf_token = rhodecode.lib.auth.get_csrf_token(session)
256 258 return LoginData(csrf_token, session['rhodecode_user'])
257 259
258 260
259 261 @pytest.fixture
260 262 def autologin_user(app):
261 263 """
262 264 Utility fixture which makes sure that the admin user is logged in
263 265 """
264 266 return _autologin_user(app)
265 267
266 268
267 269 @pytest.fixture
268 270 def autologin_regular_user(app):
269 271 """
270 272 Utility fixture which makes sure that the regular user is logged in
271 273 """
272 274 return _autologin_user(
273 275 app, TEST_USER_REGULAR_LOGIN, TEST_USER_REGULAR_PASS)
274 276
275 277
276 278 @pytest.fixture(scope='function')
277 279 def csrf_token(request, autologin_user):
278 280 return autologin_user.csrf_token
279 281
280 282
281 283 @pytest.fixture(scope='function')
282 284 def xhr_header(request):
283 285 return {'HTTP_X_REQUESTED_WITH': 'XMLHttpRequest'}
284 286
285 287
286 288 @pytest.fixture
287 289 def real_crypto_backend(monkeypatch):
288 290 """
289 291 Switch the production crypto backend on for this test.
290 292
291 293 During the test run the crypto backend is replaced with a faster
292 294 implementation based on the MD5 algorithm.
293 295 """
294 296 monkeypatch.setattr(rhodecode, 'is_test', False)
295 297
296 298
297 299 @pytest.fixture(scope='class')
298 300 def index_location(request, pylonsapp):
299 301 index_location = pylonsapp.config['app_conf']['search.location']
300 302 if request.cls:
301 303 request.cls.index_location = index_location
302 304 return index_location
303 305
304 306
305 307 @pytest.fixture(scope='session', autouse=True)
306 308 def tests_tmp_path(request):
307 309 """
308 310 Create temporary directory to be used during the test session.
309 311 """
310 312 if not os.path.exists(TESTS_TMP_PATH):
311 313 os.makedirs(TESTS_TMP_PATH)
312 314
313 315 if not request.config.getoption('--keep-tmp-path'):
314 316 @request.addfinalizer
315 317 def remove_tmp_path():
316 318 shutil.rmtree(TESTS_TMP_PATH)
317 319
318 320 return TESTS_TMP_PATH
319 321
320 322
321 323 @pytest.fixture(scope='session', autouse=True)
322 324 def patch_pyro_request_scope_proxy_factory(request):
323 325 """
324 326 Patch the pyro proxy factory to always use the same dummy request object
325 327 when under test. This will return the same pyro proxy on every call.
326 328 """
327 329 dummy_request = pyramid.testing.DummyRequest()
328 330
329 331 def mocked_call(self, request=None):
330 332 return self.getProxy(request=dummy_request)
331 333
332 334 patcher = mock.patch(
333 335 'rhodecode.lib.vcs.client.RequestScopeProxyFactory.__call__',
334 336 new=mocked_call)
335 337 patcher.start()
336 338
337 339 @request.addfinalizer
338 340 def undo_patching():
339 341 patcher.stop()
340 342
341 343
342 344 @pytest.fixture
343 345 def test_repo_group(request):
344 346 """
345 347 Create a temporary repository group, and destroy it after
346 348 usage automatically
347 349 """
348 350 fixture = Fixture()
349 351 repogroupid = 'test_repo_group_%s' % int(time.time())
350 352 repo_group = fixture.create_repo_group(repogroupid)
351 353
352 354 def _cleanup():
353 355 fixture.destroy_repo_group(repogroupid)
354 356
355 357 request.addfinalizer(_cleanup)
356 358 return repo_group
357 359
358 360
359 361 @pytest.fixture
360 362 def test_user_group(request):
361 363 """
362 364 Create a temporary user group, and destroy it after
363 365 usage automatically
364 366 """
365 367 fixture = Fixture()
366 368 usergroupid = 'test_user_group_%s' % int(time.time())
367 369 user_group = fixture.create_user_group(usergroupid)
368 370
369 371 def _cleanup():
370 372 fixture.destroy_user_group(user_group)
371 373
372 374 request.addfinalizer(_cleanup)
373 375 return user_group
374 376
375 377
376 378 @pytest.fixture(scope='session')
377 379 def test_repo(request):
378 380 container = TestRepoContainer()
379 381 request.addfinalizer(container._cleanup)
380 382 return container
381 383
382 384
383 385 class TestRepoContainer(object):
384 386 """
385 387 Container for test repositories which are used read only.
386 388
387 389 Repositories will be created on demand and re-used during the lifetime
388 390 of this object.
389 391
390 392 Usage to get the svn test repository "minimal"::
391 393
392 394 test_repo = TestContainer()
393 395 repo = test_repo('minimal', 'svn')
394 396
395 397 """
396 398
397 399 dump_extractors = {
398 400 'git': utils.extract_git_repo_from_dump,
399 401 'hg': utils.extract_hg_repo_from_dump,
400 402 'svn': utils.extract_svn_repo_from_dump,
401 403 }
402 404
403 405 def __init__(self):
404 406 self._cleanup_repos = []
405 407 self._fixture = Fixture()
406 408 self._repos = {}
407 409
408 410 def __call__(self, dump_name, backend_alias):
409 411 key = (dump_name, backend_alias)
410 412 if key not in self._repos:
411 413 repo = self._create_repo(dump_name, backend_alias)
412 414 self._repos[key] = repo.repo_id
413 415 return Repository.get(self._repos[key])
414 416
415 417 def _create_repo(self, dump_name, backend_alias):
416 418 repo_name = '%s-%s' % (backend_alias, dump_name)
417 419 backend_class = get_backend(backend_alias)
418 420 dump_extractor = self.dump_extractors[backend_alias]
419 421 repo_path = dump_extractor(dump_name, repo_name)
420 422 vcs_repo = backend_class(repo_path)
421 423 repo2db_mapper({repo_name: vcs_repo})
422 424 repo = RepoModel().get_by_repo_name(repo_name)
423 425 self._cleanup_repos.append(repo_name)
424 426 return repo
425 427
426 428 def _cleanup(self):
427 429 for repo_name in reversed(self._cleanup_repos):
428 430 self._fixture.destroy_repo(repo_name)
429 431
430 432
431 433 @pytest.fixture
432 434 def backend(request, backend_alias, pylonsapp, test_repo):
433 435 """
434 436 Parametrized fixture which represents a single backend implementation.
435 437
436 438 It respects the option `--backends` to focus the test run on specific
437 439 backend implementations.
438 440
439 441 It also supports `pytest.mark.xfail_backends` to mark tests as failing
440 442 for specific backends. This is intended as a utility for incremental
441 443 development of a new backend implementation.
442 444 """
443 445 if backend_alias not in request.config.getoption('--backends'):
444 446 pytest.skip("Backend %s not selected." % (backend_alias, ))
445 447
446 448 utils.check_xfail_backends(request.node, backend_alias)
447 449 utils.check_skip_backends(request.node, backend_alias)
448 450
449 451 repo_name = 'vcs_test_%s' % (backend_alias, )
450 452 backend = Backend(
451 453 alias=backend_alias,
452 454 repo_name=repo_name,
453 455 test_name=request.node.name,
454 456 test_repo_container=test_repo)
455 457 request.addfinalizer(backend.cleanup)
456 458 return backend
457 459
458 460
459 461 @pytest.fixture
460 462 def backend_git(request, pylonsapp, test_repo):
461 463 return backend(request, 'git', pylonsapp, test_repo)
462 464
463 465
464 466 @pytest.fixture
465 467 def backend_hg(request, pylonsapp, test_repo):
466 468 return backend(request, 'hg', pylonsapp, test_repo)
467 469
468 470
469 471 @pytest.fixture
470 472 def backend_svn(request, pylonsapp, test_repo):
471 473 return backend(request, 'svn', pylonsapp, test_repo)
472 474
473 475
474 476 @pytest.fixture
475 477 def backend_random(backend_git):
476 478 """
477 479 Use this to express that your tests need "a backend.
478 480
479 481 A few of our tests need a backend, so that we can run the code. This
480 482 fixture is intended to be used for such cases. It will pick one of the
481 483 backends and run the tests.
482 484
483 485 The fixture `backend` would run the test multiple times for each
484 486 available backend which is a pure waste of time if the test is
485 487 independent of the backend type.
486 488 """
487 489 # TODO: johbo: Change this to pick a random backend
488 490 return backend_git
489 491
490 492
491 493 @pytest.fixture
492 494 def backend_stub(backend_git):
493 495 """
494 496 Use this to express that your tests need a backend stub
495 497
496 498 TODO: mikhail: Implement a real stub logic instead of returning
497 499 a git backend
498 500 """
499 501 return backend_git
500 502
501 503
502 504 @pytest.fixture
503 505 def repo_stub(backend_stub):
504 506 """
505 507 Use this to express that your tests need a repository stub
506 508 """
507 509 return backend_stub.create_repo()
508 510
509 511
510 512 class Backend(object):
511 513 """
512 514 Represents the test configuration for one supported backend
513 515
514 516 Provides easy access to different test repositories based on
515 517 `__getitem__`. Such repositories will only be created once per test
516 518 session.
517 519 """
518 520
519 521 invalid_repo_name = re.compile(r'[^0-9a-zA-Z]+')
520 522 _master_repo = None
521 523 _commit_ids = {}
522 524
523 525 def __init__(self, alias, repo_name, test_name, test_repo_container):
524 526 self.alias = alias
525 527 self.repo_name = repo_name
526 528 self._cleanup_repos = []
527 529 self._test_name = test_name
528 530 self._test_repo_container = test_repo_container
529 531 # TODO: johbo: Used as a delegate interim. Not yet sure if Backend or
530 532 # Fixture will survive in the end.
531 533 self._fixture = Fixture()
532 534
533 535 def __getitem__(self, key):
534 536 return self._test_repo_container(key, self.alias)
535 537
536 538 @property
537 539 def repo(self):
538 540 """
539 541 Returns the "current" repository. This is the vcs_test repo or the
540 542 last repo which has been created with `create_repo`.
541 543 """
542 544 from rhodecode.model.db import Repository
543 545 return Repository.get_by_repo_name(self.repo_name)
544 546
545 547 @property
546 548 def default_branch_name(self):
547 549 VcsRepository = get_backend(self.alias)
548 550 return VcsRepository.DEFAULT_BRANCH_NAME
549 551
550 552 @property
551 553 def default_head_id(self):
552 554 """
553 555 Returns the default head id of the underlying backend.
554 556
555 557 This will be the default branch name in case the backend does have a
556 558 default branch. In the other cases it will point to a valid head
557 559 which can serve as the base to create a new commit on top of it.
558 560 """
559 561 vcsrepo = self.repo.scm_instance()
560 562 head_id = (
561 563 vcsrepo.DEFAULT_BRANCH_NAME or
562 564 vcsrepo.commit_ids[-1])
563 565 return head_id
564 566
565 567 @property
566 568 def commit_ids(self):
567 569 """
568 570 Returns the list of commits for the last created repository
569 571 """
570 572 return self._commit_ids
571 573
572 574 def create_master_repo(self, commits):
573 575 """
574 576 Create a repository and remember it as a template.
575 577
576 578 This allows to easily create derived repositories to construct
577 579 more complex scenarios for diff, compare and pull requests.
578 580
579 581 Returns a commit map which maps from commit message to raw_id.
580 582 """
581 583 self._master_repo = self.create_repo(commits=commits)
582 584 return self._commit_ids
583 585
584 586 def create_repo(
585 587 self, commits=None, number_of_commits=0, heads=None,
586 588 name_suffix=u'', **kwargs):
587 589 """
588 590 Create a repository and record it for later cleanup.
589 591
590 592 :param commits: Optional. A sequence of dict instances.
591 593 Will add a commit per entry to the new repository.
592 594 :param number_of_commits: Optional. If set to a number, this number of
593 595 commits will be added to the new repository.
594 596 :param heads: Optional. Can be set to a sequence of of commit
595 597 names which shall be pulled in from the master repository.
596 598
597 599 """
598 600 self.repo_name = self._next_repo_name() + name_suffix
599 601 repo = self._fixture.create_repo(
600 602 self.repo_name, repo_type=self.alias, **kwargs)
601 603 self._cleanup_repos.append(repo.repo_name)
602 604
603 605 commits = commits or [
604 606 {'message': 'Commit %s of %s' % (x, self.repo_name)}
605 607 for x in xrange(number_of_commits)]
606 608 self._add_commits_to_repo(repo.scm_instance(), commits)
607 609 if heads:
608 610 self.pull_heads(repo, heads)
609 611
610 612 return repo
611 613
612 614 def pull_heads(self, repo, heads):
613 615 """
614 616 Make sure that repo contains all commits mentioned in `heads`
615 617 """
616 618 vcsmaster = self._master_repo.scm_instance()
617 619 vcsrepo = repo.scm_instance()
618 620 vcsrepo.config.clear_section('hooks')
619 621 commit_ids = [self._commit_ids[h] for h in heads]
620 622 vcsrepo.pull(vcsmaster.path, commit_ids=commit_ids)
621 623
622 624 def create_fork(self):
623 625 repo_to_fork = self.repo_name
624 626 self.repo_name = self._next_repo_name()
625 627 repo = self._fixture.create_fork(repo_to_fork, self.repo_name)
626 628 self._cleanup_repos.append(self.repo_name)
627 629 return repo
628 630
629 631 def new_repo_name(self, suffix=u''):
630 632 self.repo_name = self._next_repo_name() + suffix
631 633 self._cleanup_repos.append(self.repo_name)
632 634 return self.repo_name
633 635
634 636 def _next_repo_name(self):
635 637 return u"%s_%s" % (
636 638 self.invalid_repo_name.sub(u'_', self._test_name),
637 639 len(self._cleanup_repos))
638 640
639 641 def ensure_file(self, filename, content='Test content\n'):
640 642 assert self._cleanup_repos, "Avoid writing into vcs_test repos"
641 643 commits = [
642 644 {'added': [
643 645 FileNode(filename, content=content),
644 646 ]},
645 647 ]
646 648 self._add_commits_to_repo(self.repo.scm_instance(), commits)
647 649
648 650 def enable_downloads(self):
649 651 repo = self.repo
650 652 repo.enable_downloads = True
651 653 Session().add(repo)
652 654 Session().commit()
653 655
654 656 def cleanup(self):
655 657 for repo_name in reversed(self._cleanup_repos):
656 658 self._fixture.destroy_repo(repo_name)
657 659
658 660 def _add_commits_to_repo(self, repo, commits):
659 661 commit_ids = _add_commits_to_repo(repo, commits)
660 662 if not commit_ids:
661 663 return
662 664 self._commit_ids = commit_ids
663 665
664 666 # Creating refs for Git to allow fetching them from remote repository
665 667 if self.alias == 'git':
666 668 refs = {}
667 669 for message in self._commit_ids:
668 670 # TODO: mikhail: do more special chars replacements
669 671 ref_name = 'refs/test-refs/{}'.format(
670 672 message.replace(' ', ''))
671 673 refs[ref_name] = self._commit_ids[message]
672 674 self._create_refs(repo, refs)
673 675
674 676 def _create_refs(self, repo, refs):
675 677 for ref_name in refs:
676 678 repo.set_refs(ref_name, refs[ref_name])
677 679
678 680
679 681 @pytest.fixture
680 682 def vcsbackend(request, backend_alias, tests_tmp_path, pylonsapp, test_repo):
681 683 """
682 684 Parametrized fixture which represents a single vcs backend implementation.
683 685
684 686 See the fixture `backend` for more details. This one implements the same
685 687 concept, but on vcs level. So it does not provide model instances etc.
686 688
687 689 Parameters are generated dynamically, see :func:`pytest_generate_tests`
688 690 for how this works.
689 691 """
690 692 if backend_alias not in request.config.getoption('--backends'):
691 693 pytest.skip("Backend %s not selected." % (backend_alias, ))
692 694
693 695 utils.check_xfail_backends(request.node, backend_alias)
694 696 utils.check_skip_backends(request.node, backend_alias)
695 697
696 698 repo_name = 'vcs_test_%s' % (backend_alias, )
697 699 repo_path = os.path.join(tests_tmp_path, repo_name)
698 700 backend = VcsBackend(
699 701 alias=backend_alias,
700 702 repo_path=repo_path,
701 703 test_name=request.node.name,
702 704 test_repo_container=test_repo)
703 705 request.addfinalizer(backend.cleanup)
704 706 return backend
705 707
706 708
707 709 @pytest.fixture
708 710 def vcsbackend_git(request, tests_tmp_path, pylonsapp, test_repo):
709 711 return vcsbackend(request, 'git', tests_tmp_path, pylonsapp, test_repo)
710 712
711 713
712 714 @pytest.fixture
713 715 def vcsbackend_hg(request, tests_tmp_path, pylonsapp, test_repo):
714 716 return vcsbackend(request, 'hg', tests_tmp_path, pylonsapp, test_repo)
715 717
716 718
717 719 @pytest.fixture
718 720 def vcsbackend_svn(request, tests_tmp_path, pylonsapp, test_repo):
719 721 return vcsbackend(request, 'svn', tests_tmp_path, pylonsapp, test_repo)
720 722
721 723
722 724 @pytest.fixture
723 725 def vcsbackend_random(vcsbackend_git):
724 726 """
725 727 Use this to express that your tests need "a vcsbackend".
726 728
727 729 The fixture `vcsbackend` would run the test multiple times for each
728 730 available vcs backend which is a pure waste of time if the test is
729 731 independent of the vcs backend type.
730 732 """
731 733 # TODO: johbo: Change this to pick a random backend
732 734 return vcsbackend_git
733 735
734 736
735 737 @pytest.fixture
736 738 def vcsbackend_stub(vcsbackend_git):
737 739 """
738 740 Use this to express that your test just needs a stub of a vcsbackend.
739 741
740 742 Plan is to eventually implement an in-memory stub to speed tests up.
741 743 """
742 744 return vcsbackend_git
743 745
744 746
745 747 class VcsBackend(object):
746 748 """
747 749 Represents the test configuration for one supported vcs backend.
748 750 """
749 751
750 752 invalid_repo_name = re.compile(r'[^0-9a-zA-Z]+')
751 753
752 754 def __init__(self, alias, repo_path, test_name, test_repo_container):
753 755 self.alias = alias
754 756 self._repo_path = repo_path
755 757 self._cleanup_repos = []
756 758 self._test_name = test_name
757 759 self._test_repo_container = test_repo_container
758 760
759 761 def __getitem__(self, key):
760 762 return self._test_repo_container(key, self.alias).scm_instance()
761 763
762 764 @property
763 765 def repo(self):
764 766 """
765 767 Returns the "current" repository. This is the vcs_test repo of the last
766 768 repo which has been created.
767 769 """
768 770 Repository = get_backend(self.alias)
769 771 return Repository(self._repo_path)
770 772
771 773 @property
772 774 def backend(self):
773 775 """
774 776 Returns the backend implementation class.
775 777 """
776 778 return get_backend(self.alias)
777 779
778 780 def create_repo(self, commits=None, number_of_commits=0, _clone_repo=None):
779 781 repo_name = self._next_repo_name()
780 782 self._repo_path = get_new_dir(repo_name)
781 783 repo_class = get_backend(self.alias)
782 784 src_url = None
783 785 if _clone_repo:
784 786 src_url = _clone_repo.path
785 787 repo = repo_class(self._repo_path, create=True, src_url=src_url)
786 788 self._cleanup_repos.append(repo)
787 789
788 790 commits = commits or [
789 791 {'message': 'Commit %s of %s' % (x, repo_name)}
790 792 for x in xrange(number_of_commits)]
791 793 _add_commits_to_repo(repo, commits)
792 794 return repo
793 795
794 796 def clone_repo(self, repo):
795 797 return self.create_repo(_clone_repo=repo)
796 798
797 799 def cleanup(self):
798 800 for repo in self._cleanup_repos:
799 801 shutil.rmtree(repo.path)
800 802
801 803 def new_repo_path(self):
802 804 repo_name = self._next_repo_name()
803 805 self._repo_path = get_new_dir(repo_name)
804 806 return self._repo_path
805 807
806 808 def _next_repo_name(self):
807 809 return "%s_%s" % (
808 810 self.invalid_repo_name.sub('_', self._test_name),
809 811 len(self._cleanup_repos))
810 812
811 813 def add_file(self, repo, filename, content='Test content\n'):
812 814 imc = repo.in_memory_commit
813 815 imc.add(FileNode(filename, content=content))
814 816 imc.commit(
815 817 message=u'Automatic commit from vcsbackend fixture',
816 818 author=u'Automatic')
817 819
818 820 def ensure_file(self, filename, content='Test content\n'):
819 821 assert self._cleanup_repos, "Avoid writing into vcs_test repos"
820 822 self.add_file(self.repo, filename, content)
821 823
822 824
823 825 def _add_commits_to_repo(vcs_repo, commits):
824 826 commit_ids = {}
825 827 if not commits:
826 828 return commit_ids
827 829
828 830 imc = vcs_repo.in_memory_commit
829 831 commit = None
830 832
831 833 for idx, commit in enumerate(commits):
832 834 message = unicode(commit.get('message', 'Commit %s' % idx))
833 835
834 836 for node in commit.get('added', []):
835 837 imc.add(FileNode(node.path, content=node.content))
836 838 for node in commit.get('changed', []):
837 839 imc.change(FileNode(node.path, content=node.content))
838 840 for node in commit.get('removed', []):
839 841 imc.remove(FileNode(node.path))
840 842
841 843 parents = [
842 844 vcs_repo.get_commit(commit_id=commit_ids[p])
843 845 for p in commit.get('parents', [])]
844 846
845 847 operations = ('added', 'changed', 'removed')
846 848 if not any((commit.get(o) for o in operations)):
847 849 imc.add(FileNode('file_%s' % idx, content=message))
848 850
849 851 commit = imc.commit(
850 852 message=message,
851 853 author=unicode(commit.get('author', 'Automatic')),
852 854 date=commit.get('date'),
853 855 branch=commit.get('branch'),
854 856 parents=parents)
855 857
856 858 commit_ids[commit.message] = commit.raw_id
857 859
858 860 return commit_ids
859 861
860 862
861 863 @pytest.fixture
862 864 def reposerver(request):
863 865 """
864 866 Allows to serve a backend repository
865 867 """
866 868
867 869 repo_server = RepoServer()
868 870 request.addfinalizer(repo_server.cleanup)
869 871 return repo_server
870 872
871 873
872 874 class RepoServer(object):
873 875 """
874 876 Utility to serve a local repository for the duration of a test case.
875 877
876 878 Supports only Subversion so far.
877 879 """
878 880
879 881 url = None
880 882
881 883 def __init__(self):
882 884 self._cleanup_servers = []
883 885
884 886 def serve(self, vcsrepo):
885 887 if vcsrepo.alias != 'svn':
886 888 raise TypeError("Backend %s not supported" % vcsrepo.alias)
887 889
888 890 proc = subprocess32.Popen(
889 891 ['svnserve', '-d', '--foreground', '--listen-host', 'localhost',
890 892 '--root', vcsrepo.path])
891 893 self._cleanup_servers.append(proc)
892 894 self.url = 'svn://localhost'
893 895
894 896 def cleanup(self):
895 897 for proc in self._cleanup_servers:
896 898 proc.terminate()
897 899
898 900
899 901 @pytest.fixture
900 902 def pr_util(backend, request):
901 903 """
902 904 Utility for tests of models and for functional tests around pull requests.
903 905
904 906 It gives an instance of :class:`PRTestUtility` which provides various
905 907 utility methods around one pull request.
906 908
907 909 This fixture uses `backend` and inherits its parameterization.
908 910 """
909 911
910 912 util = PRTestUtility(backend)
911 913
912 914 @request.addfinalizer
913 915 def cleanup():
914 916 util.cleanup()
915 917
916 918 return util
917 919
918 920
919 921 class PRTestUtility(object):
920 922
921 923 pull_request = None
922 924 pull_request_id = None
923 925 mergeable_patcher = None
924 926 mergeable_mock = None
925 927 notification_patcher = None
926 928
927 929 def __init__(self, backend):
928 930 self.backend = backend
929 931
930 932 def create_pull_request(
931 933 self, commits=None, target_head=None, source_head=None,
932 934 revisions=None, approved=False, author=None, mergeable=False,
933 935 enable_notifications=True, name_suffix=u'', reviewers=None,
934 936 title=u"Test", description=u"Description"):
935 937 self.set_mergeable(mergeable)
936 938 if not enable_notifications:
937 939 # mock notification side effect
938 940 self.notification_patcher = mock.patch(
939 941 'rhodecode.model.notification.NotificationModel.create')
940 942 self.notification_patcher.start()
941 943
942 944 if not self.pull_request:
943 945 if not commits:
944 946 commits = [
945 947 {'message': 'c1'},
946 948 {'message': 'c2'},
947 949 {'message': 'c3'},
948 950 ]
949 951 target_head = 'c1'
950 952 source_head = 'c2'
951 953 revisions = ['c2']
952 954
953 955 self.commit_ids = self.backend.create_master_repo(commits)
954 956 self.target_repository = self.backend.create_repo(
955 957 heads=[target_head], name_suffix=name_suffix)
956 958 self.source_repository = self.backend.create_repo(
957 959 heads=[source_head], name_suffix=name_suffix)
958 960 self.author = author or UserModel().get_by_username(
959 961 TEST_USER_ADMIN_LOGIN)
960 962
961 963 model = PullRequestModel()
962 964 self.create_parameters = {
963 965 'created_by': self.author,
964 966 'source_repo': self.source_repository.repo_name,
965 967 'source_ref': self._default_branch_reference(source_head),
966 968 'target_repo': self.target_repository.repo_name,
967 969 'target_ref': self._default_branch_reference(target_head),
968 970 'revisions': [self.commit_ids[r] for r in revisions],
969 971 'reviewers': reviewers or self._get_reviewers(),
970 972 'title': title,
971 973 'description': description,
972 974 }
973 975 self.pull_request = model.create(**self.create_parameters)
974 976 assert model.get_versions(self.pull_request) == []
975 977
976 978 self.pull_request_id = self.pull_request.pull_request_id
977 979
978 980 if approved:
979 981 self.approve()
980 982
981 983 Session().add(self.pull_request)
982 984 Session().commit()
983 985
984 986 return self.pull_request
985 987
986 988 def approve(self):
987 989 self.create_status_votes(
988 990 ChangesetStatus.STATUS_APPROVED,
989 991 *self.pull_request.reviewers)
990 992
991 993 def close(self):
992 994 PullRequestModel().close_pull_request(self.pull_request, self.author)
993 995
994 996 def _default_branch_reference(self, commit_message):
995 997 reference = '%s:%s:%s' % (
996 998 'branch',
997 999 self.backend.default_branch_name,
998 1000 self.commit_ids[commit_message])
999 1001 return reference
1000 1002
1001 1003 def _get_reviewers(self):
1002 1004 model = UserModel()
1003 1005 return [
1004 1006 model.get_by_username(TEST_USER_REGULAR_LOGIN),
1005 1007 model.get_by_username(TEST_USER_REGULAR2_LOGIN),
1006 1008 ]
1007 1009
1008 1010 def update_source_repository(self, head=None):
1009 1011 heads = [head or 'c3']
1010 1012 self.backend.pull_heads(self.source_repository, heads=heads)
1011 1013
1012 1014 def add_one_commit(self, head=None):
1013 1015 self.update_source_repository(head=head)
1014 1016 old_commit_ids = set(self.pull_request.revisions)
1015 1017 PullRequestModel().update_commits(self.pull_request)
1016 1018 commit_ids = set(self.pull_request.revisions)
1017 1019 new_commit_ids = commit_ids - old_commit_ids
1018 1020 assert len(new_commit_ids) == 1
1019 1021 return new_commit_ids.pop()
1020 1022
1021 1023 def remove_one_commit(self):
1022 1024 assert len(self.pull_request.revisions) == 2
1023 1025 source_vcs = self.source_repository.scm_instance()
1024 1026 removed_commit_id = source_vcs.commit_ids[-1]
1025 1027
1026 1028 # TODO: johbo: Git and Mercurial have an inconsistent vcs api here,
1027 1029 # remove the if once that's sorted out.
1028 1030 if self.backend.alias == "git":
1029 1031 kwargs = {'branch_name': self.backend.default_branch_name}
1030 1032 else:
1031 1033 kwargs = {}
1032 1034 source_vcs.strip(removed_commit_id, **kwargs)
1033 1035
1034 1036 PullRequestModel().update_commits(self.pull_request)
1035 1037 assert len(self.pull_request.revisions) == 1
1036 1038 return removed_commit_id
1037 1039
1038 1040 def create_comment(self, linked_to=None):
1039 1041 comment = ChangesetCommentsModel().create(
1040 1042 text=u"Test comment",
1041 1043 repo=self.target_repository.repo_name,
1042 1044 user=self.author,
1043 1045 pull_request=self.pull_request)
1044 1046 assert comment.pull_request_version_id is None
1045 1047
1046 1048 if linked_to:
1047 1049 PullRequestModel()._link_comments_to_version(linked_to)
1048 1050
1049 1051 return comment
1050 1052
1051 1053 def create_inline_comment(
1052 1054 self, linked_to=None, line_no=u'n1', file_path='file_1'):
1053 1055 comment = ChangesetCommentsModel().create(
1054 1056 text=u"Test comment",
1055 1057 repo=self.target_repository.repo_name,
1056 1058 user=self.author,
1057 1059 line_no=line_no,
1058 1060 f_path=file_path,
1059 1061 pull_request=self.pull_request)
1060 1062 assert comment.pull_request_version_id is None
1061 1063
1062 1064 if linked_to:
1063 1065 PullRequestModel()._link_comments_to_version(linked_to)
1064 1066
1065 1067 return comment
1066 1068
1067 1069 def create_version_of_pull_request(self):
1068 1070 pull_request = self.create_pull_request()
1069 1071 version = PullRequestModel()._create_version_from_snapshot(
1070 1072 pull_request)
1071 1073 return version
1072 1074
1073 1075 def create_status_votes(self, status, *reviewers):
1074 1076 for reviewer in reviewers:
1075 1077 ChangesetStatusModel().set_status(
1076 1078 repo=self.pull_request.target_repo,
1077 1079 status=status,
1078 1080 user=reviewer.user_id,
1079 1081 pull_request=self.pull_request)
1080 1082
1081 1083 def set_mergeable(self, value):
1082 1084 if not self.mergeable_patcher:
1083 1085 self.mergeable_patcher = mock.patch.object(
1084 1086 VcsSettingsModel, 'get_general_settings')
1085 1087 self.mergeable_mock = self.mergeable_patcher.start()
1086 1088 self.mergeable_mock.return_value = {
1087 1089 'rhodecode_pr_merge_enabled': value}
1088 1090
1089 1091 def cleanup(self):
1090 1092 # In case the source repository is already cleaned up, the pull
1091 1093 # request will already be deleted.
1092 1094 pull_request = PullRequest().get(self.pull_request_id)
1093 1095 if pull_request:
1094 1096 PullRequestModel().delete(pull_request)
1095 1097 Session().commit()
1096 1098
1097 1099 if self.notification_patcher:
1098 1100 self.notification_patcher.stop()
1099 1101
1100 1102 if self.mergeable_patcher:
1101 1103 self.mergeable_patcher.stop()
1102 1104
1103 1105
1104 1106 @pytest.fixture
1105 1107 def user_admin(pylonsapp):
1106 1108 """
1107 1109 Provides the default admin test user as an instance of `db.User`.
1108 1110 """
1109 1111 user = UserModel().get_by_username(TEST_USER_ADMIN_LOGIN)
1110 1112 return user
1111 1113
1112 1114
1113 1115 @pytest.fixture
1114 1116 def user_regular(pylonsapp):
1115 1117 """
1116 1118 Provides the default regular test user as an instance of `db.User`.
1117 1119 """
1118 1120 user = UserModel().get_by_username(TEST_USER_REGULAR_LOGIN)
1119 1121 return user
1120 1122
1121 1123
1122 1124 @pytest.fixture
1123 1125 def user_util(request, pylonsapp):
1124 1126 """
1125 1127 Provides a wired instance of `UserUtility` with integrated cleanup.
1126 1128 """
1127 1129 utility = UserUtility(test_name=request.node.name)
1128 1130 request.addfinalizer(utility.cleanup)
1129 1131 return utility
1130 1132
1131 1133
1132 1134 # TODO: johbo: Split this up into utilities per domain or something similar
1133 1135 class UserUtility(object):
1134 1136
1135 1137 def __init__(self, test_name="test"):
1136 1138 self._test_name = self._sanitize_name(test_name)
1137 1139 self.fixture = Fixture()
1138 1140 self.repo_group_ids = []
1139 1141 self.user_ids = []
1140 1142 self.user_group_ids = []
1141 1143 self.user_repo_permission_ids = []
1142 1144 self.user_group_repo_permission_ids = []
1143 1145 self.user_repo_group_permission_ids = []
1144 1146 self.user_group_repo_group_permission_ids = []
1145 1147 self.user_user_group_permission_ids = []
1146 1148 self.user_group_user_group_permission_ids = []
1147 1149 self.user_permissions = []
1148 1150
1149 1151 def _sanitize_name(self, name):
1150 1152 for char in ['[', ']']:
1151 1153 name = name.replace(char, '_')
1152 1154 return name
1153 1155
1154 1156 def create_repo_group(
1155 1157 self, owner=TEST_USER_ADMIN_LOGIN, auto_cleanup=True):
1156 1158 group_name = "{prefix}_repogroup_{count}".format(
1157 1159 prefix=self._test_name,
1158 1160 count=len(self.repo_group_ids))
1159 1161 repo_group = self.fixture.create_repo_group(
1160 1162 group_name, cur_user=owner)
1161 1163 if auto_cleanup:
1162 1164 self.repo_group_ids.append(repo_group.group_id)
1163 1165 return repo_group
1164 1166
1165 1167 def create_user(self, auto_cleanup=True, **kwargs):
1166 1168 user_name = "{prefix}_user_{count}".format(
1167 1169 prefix=self._test_name,
1168 1170 count=len(self.user_ids))
1169 1171 user = self.fixture.create_user(user_name, **kwargs)
1170 1172 if auto_cleanup:
1171 1173 self.user_ids.append(user.user_id)
1172 1174 return user
1173 1175
1174 1176 def create_user_with_group(self):
1175 1177 user = self.create_user()
1176 1178 user_group = self.create_user_group(members=[user])
1177 1179 return user, user_group
1178 1180
1179 1181 def create_user_group(self, members=None, auto_cleanup=True, **kwargs):
1180 1182 group_name = "{prefix}_usergroup_{count}".format(
1181 1183 prefix=self._test_name,
1182 1184 count=len(self.user_group_ids))
1183 1185 user_group = self.fixture.create_user_group(group_name, **kwargs)
1184 1186 if auto_cleanup:
1185 1187 self.user_group_ids.append(user_group.users_group_id)
1186 1188 if members:
1187 1189 for user in members:
1188 1190 UserGroupModel().add_user_to_group(user_group, user)
1189 1191 return user_group
1190 1192
1191 1193 def grant_user_permission(self, user_name, permission_name):
1192 1194 self._inherit_default_user_permissions(user_name, False)
1193 1195 self.user_permissions.append((user_name, permission_name))
1194 1196
1195 1197 def grant_user_permission_to_repo_group(
1196 1198 self, repo_group, user, permission_name):
1197 1199 permission = RepoGroupModel().grant_user_permission(
1198 1200 repo_group, user, permission_name)
1199 1201 self.user_repo_group_permission_ids.append(
1200 1202 (repo_group.group_id, user.user_id))
1201 1203 return permission
1202 1204
1203 1205 def grant_user_group_permission_to_repo_group(
1204 1206 self, repo_group, user_group, permission_name):
1205 1207 permission = RepoGroupModel().grant_user_group_permission(
1206 1208 repo_group, user_group, permission_name)
1207 1209 self.user_group_repo_group_permission_ids.append(
1208 1210 (repo_group.group_id, user_group.users_group_id))
1209 1211 return permission
1210 1212
1211 1213 def grant_user_permission_to_repo(
1212 1214 self, repo, user, permission_name):
1213 1215 permission = RepoModel().grant_user_permission(
1214 1216 repo, user, permission_name)
1215 1217 self.user_repo_permission_ids.append(
1216 1218 (repo.repo_id, user.user_id))
1217 1219 return permission
1218 1220
1219 1221 def grant_user_group_permission_to_repo(
1220 1222 self, repo, user_group, permission_name):
1221 1223 permission = RepoModel().grant_user_group_permission(
1222 1224 repo, user_group, permission_name)
1223 1225 self.user_group_repo_permission_ids.append(
1224 1226 (repo.repo_id, user_group.users_group_id))
1225 1227 return permission
1226 1228
1227 1229 def grant_user_permission_to_user_group(
1228 1230 self, target_user_group, user, permission_name):
1229 1231 permission = UserGroupModel().grant_user_permission(
1230 1232 target_user_group, user, permission_name)
1231 1233 self.user_user_group_permission_ids.append(
1232 1234 (target_user_group.users_group_id, user.user_id))
1233 1235 return permission
1234 1236
1235 1237 def grant_user_group_permission_to_user_group(
1236 1238 self, target_user_group, user_group, permission_name):
1237 1239 permission = UserGroupModel().grant_user_group_permission(
1238 1240 target_user_group, user_group, permission_name)
1239 1241 self.user_group_user_group_permission_ids.append(
1240 1242 (target_user_group.users_group_id, user_group.users_group_id))
1241 1243 return permission
1242 1244
1243 1245 def revoke_user_permission(self, user_name, permission_name):
1244 1246 self._inherit_default_user_permissions(user_name, True)
1245 1247 UserModel().revoke_perm(user_name, permission_name)
1246 1248
1247 1249 def _inherit_default_user_permissions(self, user_name, value):
1248 1250 user = UserModel().get_by_username(user_name)
1249 1251 user.inherit_default_permissions = value
1250 1252 Session().add(user)
1251 1253 Session().commit()
1252 1254
1253 1255 def cleanup(self):
1254 1256 self._cleanup_permissions()
1255 1257 self._cleanup_repo_groups()
1256 1258 self._cleanup_user_groups()
1257 1259 self._cleanup_users()
1258 1260
1259 1261 def _cleanup_permissions(self):
1260 1262 if self.user_permissions:
1261 1263 for user_name, permission_name in self.user_permissions:
1262 1264 self.revoke_user_permission(user_name, permission_name)
1263 1265
1264 1266 for permission in self.user_repo_permission_ids:
1265 1267 RepoModel().revoke_user_permission(*permission)
1266 1268
1267 1269 for permission in self.user_group_repo_permission_ids:
1268 1270 RepoModel().revoke_user_group_permission(*permission)
1269 1271
1270 1272 for permission in self.user_repo_group_permission_ids:
1271 1273 RepoGroupModel().revoke_user_permission(*permission)
1272 1274
1273 1275 for permission in self.user_group_repo_group_permission_ids:
1274 1276 RepoGroupModel().revoke_user_group_permission(*permission)
1275 1277
1276 1278 for permission in self.user_user_group_permission_ids:
1277 1279 UserGroupModel().revoke_user_permission(*permission)
1278 1280
1279 1281 for permission in self.user_group_user_group_permission_ids:
1280 1282 UserGroupModel().revoke_user_group_permission(*permission)
1281 1283
1282 1284 def _cleanup_repo_groups(self):
1283 1285 def _repo_group_compare(first_group_id, second_group_id):
1284 1286 """
1285 1287 Gives higher priority to the groups with the most complex paths
1286 1288 """
1287 1289 first_group = RepoGroup.get(first_group_id)
1288 1290 second_group = RepoGroup.get(second_group_id)
1289 1291 first_group_parts = (
1290 1292 len(first_group.group_name.split('/')) if first_group else 0)
1291 1293 second_group_parts = (
1292 1294 len(second_group.group_name.split('/')) if second_group else 0)
1293 1295 return cmp(second_group_parts, first_group_parts)
1294 1296
1295 1297 sorted_repo_group_ids = sorted(
1296 1298 self.repo_group_ids, cmp=_repo_group_compare)
1297 1299 for repo_group_id in sorted_repo_group_ids:
1298 1300 self.fixture.destroy_repo_group(repo_group_id)
1299 1301
1300 1302 def _cleanup_user_groups(self):
1301 1303 def _user_group_compare(first_group_id, second_group_id):
1302 1304 """
1303 1305 Gives higher priority to the groups with the most complex paths
1304 1306 """
1305 1307 first_group = UserGroup.get(first_group_id)
1306 1308 second_group = UserGroup.get(second_group_id)
1307 1309 first_group_parts = (
1308 1310 len(first_group.users_group_name.split('/'))
1309 1311 if first_group else 0)
1310 1312 second_group_parts = (
1311 1313 len(second_group.users_group_name.split('/'))
1312 1314 if second_group else 0)
1313 1315 return cmp(second_group_parts, first_group_parts)
1314 1316
1315 1317 sorted_user_group_ids = sorted(
1316 1318 self.user_group_ids, cmp=_user_group_compare)
1317 1319 for user_group_id in sorted_user_group_ids:
1318 1320 self.fixture.destroy_user_group(user_group_id)
1319 1321
1320 1322 def _cleanup_users(self):
1321 1323 for user_id in self.user_ids:
1322 1324 self.fixture.destroy_user(user_id)
1323 1325
1324 1326
1325 1327 # TODO: Think about moving this into a pytest-pyro package and make it a
1326 1328 # pytest plugin
1327 1329 @pytest.hookimpl(tryfirst=True, hookwrapper=True)
1328 1330 def pytest_runtest_makereport(item, call):
1329 1331 """
1330 1332 Adding the remote traceback if the exception has this information.
1331 1333
1332 1334 Pyro4 attaches this information as the attribute `_pyroTraceback`
1333 1335 to the exception instance.
1334 1336 """
1335 1337 outcome = yield
1336 1338 report = outcome.get_result()
1337 1339 if call.excinfo:
1338 1340 _add_pyro_remote_traceback(report, call.excinfo.value)
1339 1341
1340 1342
1341 1343 def _add_pyro_remote_traceback(report, exc):
1342 1344 pyro_traceback = getattr(exc, '_pyroTraceback', None)
1343 1345
1344 1346 if pyro_traceback:
1345 1347 traceback = ''.join(pyro_traceback)
1346 1348 section = 'Pyro4 remote traceback ' + report.when
1347 1349 report.sections.append((section, traceback))
1348 1350
1349 1351
1350 1352 @pytest.fixture(scope='session')
1351 1353 def testrun():
1352 1354 return {
1353 1355 'uuid': uuid.uuid4(),
1354 1356 'start': datetime.datetime.utcnow().isoformat(),
1355 1357 'timestamp': int(time.time()),
1356 1358 }
1357 1359
1358 1360
1359 1361 @pytest.fixture(autouse=True)
1360 1362 def collect_appenlight_stats(request, testrun):
1361 1363 """
1362 1364 This fixture reports memory consumtion of single tests.
1363 1365
1364 1366 It gathers data based on `psutil` and sends them to Appenlight. The option
1365 1367 ``--ae`` has te be used to enable this fixture and the API key for your
1366 1368 application has to be provided in ``--ae-key``.
1367 1369 """
1368 1370 try:
1369 1371 # cygwin cannot have yet psutil support.
1370 1372 import psutil
1371 1373 except ImportError:
1372 1374 return
1373 1375
1374 1376 if not request.config.getoption('--appenlight'):
1375 1377 return
1376 1378 else:
1377 1379 # Only request the pylonsapp fixture if appenlight tracking is
1378 1380 # enabled. This will speed up a test run of unit tests by 2 to 3
1379 1381 # seconds if appenlight is not enabled.
1380 1382 pylonsapp = request.getfuncargvalue("pylonsapp")
1381 1383 url = '{}/api/logs'.format(request.config.getoption('--appenlight-url'))
1382 1384 client = AppenlightClient(
1383 1385 url=url,
1384 1386 api_key=request.config.getoption('--appenlight-api-key'),
1385 1387 namespace=request.node.nodeid,
1386 1388 request=str(testrun['uuid']),
1387 1389 testrun=testrun)
1388 1390
1389 1391 client.collect({
1390 1392 'message': "Starting",
1391 1393 })
1392 1394
1393 1395 server_and_port = pylonsapp.config['vcs.server']
1394 1396 server = create_vcsserver_proxy(server_and_port)
1395 1397 with server:
1396 1398 vcs_pid = server.get_pid()
1397 1399 server.run_gc()
1398 1400 vcs_process = psutil.Process(vcs_pid)
1399 1401 mem = vcs_process.memory_info()
1400 1402 client.tag_before('vcsserver.rss', mem.rss)
1401 1403 client.tag_before('vcsserver.vms', mem.vms)
1402 1404
1403 1405 test_process = psutil.Process()
1404 1406 mem = test_process.memory_info()
1405 1407 client.tag_before('test.rss', mem.rss)
1406 1408 client.tag_before('test.vms', mem.vms)
1407 1409
1408 1410 client.tag_before('time', time.time())
1409 1411
1410 1412 @request.addfinalizer
1411 1413 def send_stats():
1412 1414 client.tag_after('time', time.time())
1413 1415 with server:
1414 1416 gc_stats = server.run_gc()
1415 1417 for tag, value in gc_stats.items():
1416 1418 client.tag_after(tag, value)
1417 1419 mem = vcs_process.memory_info()
1418 1420 client.tag_after('vcsserver.rss', mem.rss)
1419 1421 client.tag_after('vcsserver.vms', mem.vms)
1420 1422
1421 1423 mem = test_process.memory_info()
1422 1424 client.tag_after('test.rss', mem.rss)
1423 1425 client.tag_after('test.vms', mem.vms)
1424 1426
1425 1427 client.collect({
1426 1428 'message': "Finished",
1427 1429 })
1428 1430 client.send_stats()
1429 1431
1430 1432 return client
1431 1433
1432 1434
1433 1435 class AppenlightClient():
1434 1436
1435 1437 url_template = '{url}?protocol_version=0.5'
1436 1438
1437 1439 def __init__(
1438 1440 self, url, api_key, add_server=True, add_timestamp=True,
1439 1441 namespace=None, request=None, testrun=None):
1440 1442 self.url = self.url_template.format(url=url)
1441 1443 self.api_key = api_key
1442 1444 self.add_server = add_server
1443 1445 self.add_timestamp = add_timestamp
1444 1446 self.namespace = namespace
1445 1447 self.request = request
1446 1448 self.server = socket.getfqdn(socket.gethostname())
1447 1449 self.tags_before = {}
1448 1450 self.tags_after = {}
1449 1451 self.stats = []
1450 1452 self.testrun = testrun or {}
1451 1453
1452 1454 def tag_before(self, tag, value):
1453 1455 self.tags_before[tag] = value
1454 1456
1455 1457 def tag_after(self, tag, value):
1456 1458 self.tags_after[tag] = value
1457 1459
1458 1460 def collect(self, data):
1459 1461 if self.add_server:
1460 1462 data.setdefault('server', self.server)
1461 1463 if self.add_timestamp:
1462 1464 data.setdefault('date', datetime.datetime.utcnow().isoformat())
1463 1465 if self.namespace:
1464 1466 data.setdefault('namespace', self.namespace)
1465 1467 if self.request:
1466 1468 data.setdefault('request', self.request)
1467 1469 self.stats.append(data)
1468 1470
1469 1471 def send_stats(self):
1470 1472 tags = [
1471 1473 ('testrun', self.request),
1472 1474 ('testrun.start', self.testrun['start']),
1473 1475 ('testrun.timestamp', self.testrun['timestamp']),
1474 1476 ('test', self.namespace),
1475 1477 ]
1476 1478 for key, value in self.tags_before.items():
1477 1479 tags.append((key + '.before', value))
1478 1480 try:
1479 1481 delta = self.tags_after[key] - value
1480 1482 tags.append((key + '.delta', delta))
1481 1483 except Exception:
1482 1484 pass
1483 1485 for key, value in self.tags_after.items():
1484 1486 tags.append((key + '.after', value))
1485 1487 self.collect({
1486 1488 'message': "Collected tags",
1487 1489 'tags': tags,
1488 1490 })
1489 1491
1490 1492 response = requests.post(
1491 1493 self.url,
1492 1494 headers={
1493 1495 'X-appenlight-api-key': self.api_key},
1494 1496 json=self.stats,
1495 1497 )
1496 1498
1497 1499 if not response.status_code == 200:
1498 1500 pprint.pprint(self.stats)
1499 1501 print response.headers
1500 1502 print response.text
1501 1503 raise Exception('Sending to appenlight failed')
1502 1504
1503 1505
1504 1506 @pytest.fixture
1505 1507 def gist_util(request, pylonsapp):
1506 1508 """
1507 1509 Provides a wired instance of `GistUtility` with integrated cleanup.
1508 1510 """
1509 1511 utility = GistUtility()
1510 1512 request.addfinalizer(utility.cleanup)
1511 1513 return utility
1512 1514
1513 1515
1514 1516 class GistUtility(object):
1515 1517 def __init__(self):
1516 1518 self.fixture = Fixture()
1517 1519 self.gist_ids = []
1518 1520
1519 1521 def create_gist(self, **kwargs):
1520 1522 gist = self.fixture.create_gist(**kwargs)
1521 1523 self.gist_ids.append(gist.gist_id)
1522 1524 return gist
1523 1525
1524 1526 def cleanup(self):
1525 1527 for id_ in self.gist_ids:
1526 1528 self.fixture.destroy_gists(str(id_))
1527 1529
1528 1530
1529 1531 @pytest.fixture
1530 1532 def enabled_backends(request):
1531 1533 backends = request.config.option.backends
1532 1534 return backends[:]
1533 1535
1534 1536
1535 1537 @pytest.fixture
1536 1538 def settings_util(request):
1537 1539 """
1538 1540 Provides a wired instance of `SettingsUtility` with integrated cleanup.
1539 1541 """
1540 1542 utility = SettingsUtility()
1541 1543 request.addfinalizer(utility.cleanup)
1542 1544 return utility
1543 1545
1544 1546
1545 1547 class SettingsUtility(object):
1546 1548 def __init__(self):
1547 1549 self.rhodecode_ui_ids = []
1548 1550 self.rhodecode_setting_ids = []
1549 1551 self.repo_rhodecode_ui_ids = []
1550 1552 self.repo_rhodecode_setting_ids = []
1551 1553
1552 1554 def create_repo_rhodecode_ui(
1553 1555 self, repo, section, value, key=None, active=True, cleanup=True):
1554 1556 key = key or hashlib.sha1(
1555 1557 '{}{}{}'.format(section, value, repo.repo_id)).hexdigest()
1556 1558
1557 1559 setting = RepoRhodeCodeUi()
1558 1560 setting.repository_id = repo.repo_id
1559 1561 setting.ui_section = section
1560 1562 setting.ui_value = value
1561 1563 setting.ui_key = key
1562 1564 setting.ui_active = active
1563 1565 Session().add(setting)
1564 1566 Session().commit()
1565 1567
1566 1568 if cleanup:
1567 1569 self.repo_rhodecode_ui_ids.append(setting.ui_id)
1568 1570 return setting
1569 1571
1570 1572 def create_rhodecode_ui(
1571 1573 self, section, value, key=None, active=True, cleanup=True):
1572 1574 key = key or hashlib.sha1('{}{}'.format(section, value)).hexdigest()
1573 1575
1574 1576 setting = RhodeCodeUi()
1575 1577 setting.ui_section = section
1576 1578 setting.ui_value = value
1577 1579 setting.ui_key = key
1578 1580 setting.ui_active = active
1579 1581 Session().add(setting)
1580 1582 Session().commit()
1581 1583
1582 1584 if cleanup:
1583 1585 self.rhodecode_ui_ids.append(setting.ui_id)
1584 1586 return setting
1585 1587
1586 1588 def create_repo_rhodecode_setting(
1587 1589 self, repo, name, value, type_, cleanup=True):
1588 1590 setting = RepoRhodeCodeSetting(
1589 1591 repo.repo_id, key=name, val=value, type=type_)
1590 1592 Session().add(setting)
1591 1593 Session().commit()
1592 1594
1593 1595 if cleanup:
1594 1596 self.repo_rhodecode_setting_ids.append(setting.app_settings_id)
1595 1597 return setting
1596 1598
1597 1599 def create_rhodecode_setting(self, name, value, type_, cleanup=True):
1598 1600 setting = RhodeCodeSetting(key=name, val=value, type=type_)
1599 1601 Session().add(setting)
1600 1602 Session().commit()
1601 1603
1602 1604 if cleanup:
1603 1605 self.rhodecode_setting_ids.append(setting.app_settings_id)
1604 1606
1605 1607 return setting
1606 1608
1607 1609 def cleanup(self):
1608 1610 for id_ in self.rhodecode_ui_ids:
1609 1611 setting = RhodeCodeUi.get(id_)
1610 1612 Session().delete(setting)
1611 1613
1612 1614 for id_ in self.rhodecode_setting_ids:
1613 1615 setting = RhodeCodeSetting.get(id_)
1614 1616 Session().delete(setting)
1615 1617
1616 1618 for id_ in self.repo_rhodecode_ui_ids:
1617 1619 setting = RepoRhodeCodeUi.get(id_)
1618 1620 Session().delete(setting)
1619 1621
1620 1622 for id_ in self.repo_rhodecode_setting_ids:
1621 1623 setting = RepoRhodeCodeSetting.get(id_)
1622 1624 Session().delete(setting)
1623 1625
1624 1626 Session().commit()
1625 1627
1626 1628
1627 1629 @pytest.fixture
1628 1630 def no_notifications(request):
1629 1631 notification_patcher = mock.patch(
1630 1632 'rhodecode.model.notification.NotificationModel.create')
1631 1633 notification_patcher.start()
1632 1634 request.addfinalizer(notification_patcher.stop)
1633 1635
1634 1636
1635 1637 @pytest.fixture
1636 1638 def silence_action_logger(request):
1637 1639 notification_patcher = mock.patch(
1638 1640 'rhodecode.lib.utils.action_logger')
1639 1641 notification_patcher.start()
1640 1642 request.addfinalizer(notification_patcher.stop)
1641 1643
1642 1644
1643 1645 @pytest.fixture(scope='session')
1644 1646 def repeat(request):
1645 1647 """
1646 1648 The number of repetitions is based on this fixture.
1647 1649
1648 1650 Slower calls may divide it by 10 or 100. It is chosen in a way so that the
1649 1651 tests are not too slow in our default test suite.
1650 1652 """
1651 1653 return request.config.getoption('--repeat')
1652 1654
1653 1655
1654 1656 @pytest.fixture
1655 1657 def rhodecode_fixtures():
1656 1658 return Fixture()
1657 1659
1658 1660
1659 1661 @pytest.fixture
1660 1662 def request_stub():
1661 1663 """
1662 1664 Stub request object.
1663 1665 """
1664 1666 request = pyramid.testing.DummyRequest()
1665 1667 request.scheme = 'https'
1666 1668 return request
1667 1669
1668 1670
1669 1671 @pytest.fixture
1670 1672 def config_stub(request, request_stub):
1671 1673 """
1672 1674 Set up pyramid.testing and return the Configurator.
1673 1675 """
1674 1676 config = pyramid.testing.setUp(request=request_stub)
1675 1677
1676 1678 @request.addfinalizer
1677 1679 def cleanup():
1678 1680 pyramid.testing.tearDown()
1679 1681
1680 1682 return config
1681 1683
1682 1684
1683 1685 @pytest.fixture
1684 1686 def StubIntegrationType():
1685 1687 class _StubIntegrationType(IntegrationTypeBase):
1686 1688 """ Test integration type class """
1687 1689
1688 1690 key = 'test'
1689 1691 display_name = 'Test integration type'
1690 1692 description = 'A test integration type for testing'
1691 1693 icon = 'test_icon_html_image'
1692 1694
1693 1695 def __init__(self, settings):
1694 1696 super(_StubIntegrationType, self).__init__(settings)
1695 1697 self.sent_events = [] # for testing
1696 1698
1697 1699 def send_event(self, event):
1698 1700 self.sent_events.append(event)
1699 1701
1700 1702 def settings_schema(self):
1701 1703 class SettingsSchema(colander.Schema):
1702 1704 test_string_field = colander.SchemaNode(
1703 1705 colander.String(),
1704 1706 missing=colander.required,
1705 1707 title='test string field',
1706 1708 )
1707 1709 test_int_field = colander.SchemaNode(
1708 1710 colander.Int(),
1709 1711 title='some integer setting',
1710 1712 )
1711 1713 return SettingsSchema()
1712 1714
1713 1715
1714 1716 integration_type_registry.register_integration_type(_StubIntegrationType)
1715 1717 return _StubIntegrationType
1716 1718
1717 1719 @pytest.fixture
1718 1720 def stub_integration_settings():
1719 1721 return {
1720 1722 'test_string_field': 'some data',
1721 1723 'test_int_field': 100,
1722 1724 }
1723 1725
1724 1726
1725 1727 @pytest.fixture
1726 1728 def repo_integration_stub(request, repo_stub, StubIntegrationType,
1727 1729 stub_integration_settings):
1728 1730 integration = IntegrationModel().create(
1729 1731 StubIntegrationType, settings=stub_integration_settings, enabled=True,
1730 1732 name='test repo integration',
1731 1733 repo=repo_stub, repo_group=None, child_repos_only=None)
1732 1734
1733 1735 @request.addfinalizer
1734 1736 def cleanup():
1735 1737 IntegrationModel().delete(integration)
1736 1738
1737 1739 return integration
1738 1740
1739 1741
1740 1742 @pytest.fixture
1741 1743 def repogroup_integration_stub(request, test_repo_group, StubIntegrationType,
1742 1744 stub_integration_settings):
1743 1745 integration = IntegrationModel().create(
1744 1746 StubIntegrationType, settings=stub_integration_settings, enabled=True,
1745 1747 name='test repogroup integration',
1746 1748 repo=None, repo_group=test_repo_group, child_repos_only=True)
1747 1749
1748 1750 @request.addfinalizer
1749 1751 def cleanup():
1750 1752 IntegrationModel().delete(integration)
1751 1753
1752 1754 return integration
1753 1755
1754 1756
1755 1757 @pytest.fixture
1756 1758 def repogroup_recursive_integration_stub(request, test_repo_group,
1757 1759 StubIntegrationType, stub_integration_settings):
1758 1760 integration = IntegrationModel().create(
1759 1761 StubIntegrationType, settings=stub_integration_settings, enabled=True,
1760 1762 name='test recursive repogroup integration',
1761 1763 repo=None, repo_group=test_repo_group, child_repos_only=False)
1762 1764
1763 1765 @request.addfinalizer
1764 1766 def cleanup():
1765 1767 IntegrationModel().delete(integration)
1766 1768
1767 1769 return integration
1768 1770
1769 1771
1770 1772 @pytest.fixture
1771 1773 def global_integration_stub(request, StubIntegrationType,
1772 1774 stub_integration_settings):
1773 1775 integration = IntegrationModel().create(
1774 1776 StubIntegrationType, settings=stub_integration_settings, enabled=True,
1775 1777 name='test global integration',
1776 1778 repo=None, repo_group=None, child_repos_only=None)
1777 1779
1778 1780 @request.addfinalizer
1779 1781 def cleanup():
1780 1782 IntegrationModel().delete(integration)
1781 1783
1782 1784 return integration
1783 1785
1784 1786
1785 1787 @pytest.fixture
1786 1788 def root_repos_integration_stub(request, StubIntegrationType,
1787 1789 stub_integration_settings):
1788 1790 integration = IntegrationModel().create(
1789 1791 StubIntegrationType, settings=stub_integration_settings, enabled=True,
1790 1792 name='test global integration',
1791 1793 repo=None, repo_group=None, child_repos_only=True)
1792 1794
1793 1795 @request.addfinalizer
1794 1796 def cleanup():
1795 1797 IntegrationModel().delete(integration)
1796 1798
1797 1799 return integration
@@ -1,304 +1,374 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2010-2016 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 import threading
22 22 import time
23 23 import logging
24 24 import os.path
25 25 import subprocess32
26 import tempfile
26 27 import urllib2
27 28 from urlparse import urlparse, parse_qsl
28 29 from urllib import unquote_plus
29 30
31 from webtest.app import (
32 Request, TestResponse, TestApp, print_stderr, string_types)
33
30 34 import pytest
31 35 import rc_testdata
32 36
33 from rhodecode.model.db import User
37 from rhodecode.model.db import User, Repository
34 38 from rhodecode.model.meta import Session
35 39 from rhodecode.model.scm import ScmModel
36 40 from rhodecode.lib.vcs.backends.svn.repository import SubversionRepository
37 41
38 42
39 43 log = logging.getLogger(__name__)
40 44
41 45
46 class CustomTestResponse(TestResponse):
47 def _save_output(self, out):
48 f = tempfile.NamedTemporaryFile(
49 delete=False, prefix='rc-test-', suffix='.html')
50 f.write(out)
51 return f.name
52
53 def mustcontain(self, *strings, **kw):
54 """
55 Assert that the response contains all of the strings passed
56 in as arguments.
57
58 Equivalent to::
59
60 assert string in res
61 """
62 if 'no' in kw:
63 no = kw['no']
64 del kw['no']
65 if isinstance(no, string_types):
66 no = [no]
67 else:
68 no = []
69 if kw:
70 raise TypeError(
71 "The only keyword argument allowed is 'no'")
72
73 f = self._save_output(str(self))
74
75 for s in strings:
76 if not s in self:
77 print_stderr("Actual response (no %r):" % s)
78 print_stderr(str(self))
79 raise IndexError(
80 "Body does not contain string %r, output saved as %s" % (
81 s, f))
82
83 for no_s in no:
84 if no_s in self:
85 print_stderr("Actual response (has %r)" % no_s)
86 print_stderr(str(self))
87 raise IndexError(
88 "Body contains bad string %r, output saved as %s" % (
89 no_s, f))
90
91 def assert_response(self):
92 return AssertResponse(self)
93
94
95 class TestRequest(Request):
96
97 # for py.test
98 disabled = True
99 ResponseClass = CustomTestResponse
100
101
102 class CustomTestApp(TestApp):
103 """
104 Custom app to make mustcontain more usefull
105 """
106 RequestClass = TestRequest
107
108
109
110
111
42 112 def set_anonymous_access(enabled):
43 113 """(Dis)allows anonymous access depending on parameter `enabled`"""
44 114 user = User.get_default_user()
45 115 user.active = enabled
46 116 Session().add(user)
47 117 Session().commit()
48 118 log.info('anonymous access is now: %s', enabled)
49 119 assert enabled == User.get_default_user().active, (
50 120 'Cannot set anonymous access')
51 121
52 122
53 123 def check_xfail_backends(node, backend_alias):
54 124 # Using "xfail_backends" here intentionally, since this marks work
55 125 # which is "to be done" soon.
56 126 skip_marker = node.get_marker('xfail_backends')
57 127 if skip_marker and backend_alias in skip_marker.args:
58 128 msg = "Support for backend %s to be developed." % (backend_alias, )
59 129 msg = skip_marker.kwargs.get('reason', msg)
60 130 pytest.xfail(msg)
61 131
62 132
63 133 def check_skip_backends(node, backend_alias):
64 134 # Using "skip_backends" here intentionally, since this marks work which is
65 135 # not supported.
66 136 skip_marker = node.get_marker('skip_backends')
67 137 if skip_marker and backend_alias in skip_marker.args:
68 138 msg = "Feature not supported for backend %s." % (backend_alias, )
69 139 msg = skip_marker.kwargs.get('reason', msg)
70 140 pytest.skip(msg)
71 141
72 142
73 143 def extract_git_repo_from_dump(dump_name, repo_name):
74 144 """Create git repo `repo_name` from dump `dump_name`."""
75 145 repos_path = ScmModel().repos_path
76 146 target_path = os.path.join(repos_path, repo_name)
77 147 rc_testdata.extract_git_dump(dump_name, target_path)
78 148 return target_path
79 149
80 150
81 151 def extract_hg_repo_from_dump(dump_name, repo_name):
82 152 """Create hg repo `repo_name` from dump `dump_name`."""
83 153 repos_path = ScmModel().repos_path
84 154 target_path = os.path.join(repos_path, repo_name)
85 155 rc_testdata.extract_hg_dump(dump_name, target_path)
86 156 return target_path
87 157
88 158
89 159 def extract_svn_repo_from_dump(dump_name, repo_name):
90 160 """Create a svn repo `repo_name` from dump `dump_name`."""
91 161 repos_path = ScmModel().repos_path
92 162 target_path = os.path.join(repos_path, repo_name)
93 163 SubversionRepository(target_path, create=True)
94 164 _load_svn_dump_into_repo(dump_name, target_path)
95 165 return target_path
96 166
97 167
98 168 def assert_message_in_log(log_records, message, levelno, module):
99 169 messages = [
100 170 r.message for r in log_records
101 171 if r.module == module and r.levelno == levelno
102 172 ]
103 173 assert message in messages
104 174
105 175
106 176 def _load_svn_dump_into_repo(dump_name, repo_path):
107 177 """
108 178 Utility to populate a svn repository with a named dump
109 179
110 180 Currently the dumps are in rc_testdata. They might later on be
111 181 integrated with the main repository once they stabilize more.
112 182 """
113 183 dump = rc_testdata.load_svn_dump(dump_name)
114 184 load_dump = subprocess32.Popen(
115 185 ['svnadmin', 'load', repo_path],
116 186 stdin=subprocess32.PIPE, stdout=subprocess32.PIPE,
117 187 stderr=subprocess32.PIPE)
118 188 out, err = load_dump.communicate(dump)
119 189 if load_dump.returncode != 0:
120 190 log.error("Output of load_dump command: %s", out)
121 191 log.error("Error output of load_dump command: %s", err)
122 192 raise Exception(
123 193 'Failed to load dump "%s" into repository at path "%s".'
124 194 % (dump_name, repo_path))
125 195
126 196
127 197 class AssertResponse(object):
128 198 """
129 199 Utility that helps to assert things about a given HTML response.
130 200 """
131 201
132 202 def __init__(self, response):
133 203 self.response = response
134 204
135 205 def get_imports(self):
136 206 from lxml.html import fromstring, tostring
137 207 from lxml.cssselect import CSSSelector
138 208 return fromstring, tostring, CSSSelector
139 209
140 210 def one_element_exists(self, css_selector):
141 211 self.get_element(css_selector)
142 212
143 213 def no_element_exists(self, css_selector):
144 214 assert not self._get_elements(css_selector)
145 215
146 216 def element_equals_to(self, css_selector, expected_content):
147 217 element = self.get_element(css_selector)
148 218 element_text = self._element_to_string(element)
149 219 assert expected_content in element_text
150 220
151 221 def element_contains(self, css_selector, expected_content):
152 222 element = self.get_element(css_selector)
153 223 assert expected_content in element.text_content()
154 224
155 225 def element_value_contains(self, css_selector, expected_content):
156 226 element = self.get_element(css_selector)
157 227 assert expected_content in element.value
158 228
159 229 def contains_one_link(self, link_text, href):
160 230 fromstring, tostring, CSSSelector = self.get_imports()
161 231 doc = fromstring(self.response.body)
162 232 sel = CSSSelector('a[href]')
163 233 elements = [
164 234 e for e in sel(doc) if e.text_content().strip() == link_text]
165 235 assert len(elements) == 1, "Did not find link or found multiple links"
166 236 self._ensure_url_equal(elements[0].attrib.get('href'), href)
167 237
168 238 def contains_one_anchor(self, anchor_id):
169 239 fromstring, tostring, CSSSelector = self.get_imports()
170 240 doc = fromstring(self.response.body)
171 241 sel = CSSSelector('#' + anchor_id)
172 242 elements = sel(doc)
173 243 assert len(elements) == 1
174 244
175 245 def _ensure_url_equal(self, found, expected):
176 246 assert _Url(found) == _Url(expected)
177 247
178 248 def get_element(self, css_selector):
179 249 elements = self._get_elements(css_selector)
180 250 assert len(elements) == 1
181 251 return elements[0]
182 252
183 253 def get_elements(self, css_selector):
184 254 return self._get_elements(css_selector)
185 255
186 256 def _get_elements(self, css_selector):
187 257 fromstring, tostring, CSSSelector = self.get_imports()
188 258 doc = fromstring(self.response.body)
189 259 sel = CSSSelector(css_selector)
190 260 elements = sel(doc)
191 261 return elements
192 262
193 263 def _element_to_string(self, element):
194 264 fromstring, tostring, CSSSelector = self.get_imports()
195 265 return tostring(element)
196 266
197 267
198 268 class _Url(object):
199 269 """
200 270 A url object that can be compared with other url orbjects
201 271 without regard to the vagaries of encoding, escaping, and ordering
202 272 of parameters in query strings.
203 273
204 274 Inspired by
205 275 http://stackoverflow.com/questions/5371992/comparing-two-urls-in-python
206 276 """
207 277
208 278 def __init__(self, url):
209 279 parts = urlparse(url)
210 280 _query = frozenset(parse_qsl(parts.query))
211 281 _path = unquote_plus(parts.path)
212 282 parts = parts._replace(query=_query, path=_path)
213 283 self.parts = parts
214 284
215 285 def __eq__(self, other):
216 286 return self.parts == other.parts
217 287
218 288 def __hash__(self):
219 289 return hash(self.parts)
220 290
221 291
222 292 def run_test_concurrently(times, raise_catched_exc=True):
223 293 """
224 294 Add this decorator to small pieces of code that you want to test
225 295 concurrently
226 296
227 297 ex:
228 298
229 299 @test_concurrently(25)
230 300 def my_test_function():
231 301 ...
232 302 """
233 303 def test_concurrently_decorator(test_func):
234 304 def wrapper(*args, **kwargs):
235 305 exceptions = []
236 306
237 307 def call_test_func():
238 308 try:
239 309 test_func(*args, **kwargs)
240 310 except Exception as e:
241 311 exceptions.append(e)
242 312 if raise_catched_exc:
243 313 raise
244 314 threads = []
245 315 for i in range(times):
246 316 threads.append(threading.Thread(target=call_test_func))
247 317 for t in threads:
248 318 t.start()
249 319 for t in threads:
250 320 t.join()
251 321 if exceptions:
252 322 raise Exception(
253 323 'test_concurrently intercepted %s exceptions: %s' % (
254 324 len(exceptions), exceptions))
255 325 return wrapper
256 326 return test_concurrently_decorator
257 327
258 328
259 329 def wait_for_url(url, timeout=10):
260 330 """
261 331 Wait until URL becomes reachable.
262 332
263 333 It polls the URL until the timeout is reached or it became reachable.
264 334 If will call to `py.test.fail` in case the URL is not reachable.
265 335 """
266 336 timeout = time.time() + timeout
267 337 last = 0
268 338 wait = 0.1
269 339
270 340 while timeout > last:
271 341 last = time.time()
272 342 if is_url_reachable(url):
273 343 break
274 344 elif (last + wait) > time.time():
275 345 # Go to sleep because not enough time has passed since last check.
276 346 time.sleep(wait)
277 347 else:
278 348 pytest.fail("Timeout while waiting for URL {}".format(url))
279 349
280 350
281 351 def is_url_reachable(url):
282 352 try:
283 353 urllib2.urlopen(url)
284 354 except urllib2.URLError:
285 355 return False
286 356 return True
287 357
288 358
289 359 def get_session_from_response(response):
290 360 """
291 361 This returns the session from a response object. Pylons has some magic
292 362 to make the session available as `response.session`. But pyramid
293 363 doesn't expose it.
294 364 """
295 365 # TODO: Try to look up the session key also.
296 366 return response.request.environ['beaker.session']
297 367
298 368
299 369 def repo_on_filesystem(repo_name):
300 370 from rhodecode.lib import vcs
301 371 from rhodecode.tests import TESTS_TMP_PATH
302 372 repo = vcs.get_vcs_instance(
303 373 os.path.join(TESTS_TMP_PATH, repo_name), create=False)
304 374 return repo is not None
General Comments 0
You need to be logged in to leave comments. Login now