##// END OF EJS Templates
pytest: Use hardcoded login URLs in tests....
johbo -
r40:bc180d4b default
parent child Browse files
Show More
@@ -1,254 +1,254 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2010-2016 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 import os
22 22 import time
23 23 import logging
24 24 import datetime
25 25 import hashlib
26 26 import tempfile
27 27 from os.path import join as jn
28 28
29 29 from tempfile import _RandomNameSequence
30 30
31 31 from paste.deploy import loadapp
32 32 from paste.script.appinstall import SetupCommand
33 33
34 34 import pylons
35 35 import pylons.test
36 36 from pylons import config, url
37 37 from pylons.i18n.translation import _get_translator
38 38 from pylons.util import ContextObj
39 39
40 40 from routes.util import URLGenerator
41 41 from webtest import TestApp
42 42 from nose.plugins.skip import SkipTest
43 43 import pytest
44 44
45 45 from rhodecode import is_windows
46 from rhodecode.config.routing import ADMIN_PREFIX
46 47 from rhodecode.model.meta import Session
47 48 from rhodecode.model.db import User
48 49 from rhodecode.lib import auth
49 50 from rhodecode.lib.helpers import flash, link_to
50 51 from rhodecode.lib.utils2 import safe_unicode, safe_str
51 52
52 53 # TODO: johbo: Solve time zone related issues and remove this tweak
53 54 os.environ['TZ'] = 'UTC'
54 55 if not is_windows:
55 56 time.tzset()
56 57
57 58 log = logging.getLogger(__name__)
58 59
59 60 __all__ = [
60 61 'get_new_dir', 'TestController', 'SkipTest',
61 62 'url', 'link_to', 'ldap_lib_installed', 'clear_all_caches',
62 63 'assert_session_flash', 'login_user',
63 64 'TESTS_TMP_PATH', 'HG_REPO', 'GIT_REPO', 'SVN_REPO',
64 65 'NEW_HG_REPO', 'NEW_GIT_REPO',
65 66 'HG_FORK', 'GIT_FORK', 'TEST_USER_ADMIN_LOGIN', 'TEST_USER_ADMIN_PASS',
66 67 'TEST_USER_REGULAR_LOGIN', 'TEST_USER_REGULAR_PASS',
67 68 'TEST_USER_REGULAR_EMAIL', 'TEST_USER_REGULAR2_LOGIN',
68 69 'TEST_USER_REGULAR2_PASS', 'TEST_USER_REGULAR2_EMAIL', 'TEST_HG_REPO',
69 70 'TEST_HG_REPO_CLONE', 'TEST_HG_REPO_PULL', 'TEST_GIT_REPO',
70 71 'TEST_GIT_REPO_CLONE', 'TEST_GIT_REPO_PULL', 'SCM_TESTS',
71 72 ]
72 73
73 74 # Invoke websetup with the current config file
74 75 # SetupCommand('setup-app').run([config_file])
75 76
76 77 # SOME GLOBALS FOR TESTS
77 78 TEST_DIR = tempfile.gettempdir()
78 79
79 80 TESTS_TMP_PATH = jn(TEST_DIR, 'rc_test_%s' % _RandomNameSequence().next())
80 81 TEST_USER_ADMIN_LOGIN = 'test_admin'
81 82 TEST_USER_ADMIN_PASS = 'test12'
82 83 TEST_USER_ADMIN_EMAIL = 'test_admin@mail.com'
83 84
84 85 TEST_USER_REGULAR_LOGIN = 'test_regular'
85 86 TEST_USER_REGULAR_PASS = 'test12'
86 87 TEST_USER_REGULAR_EMAIL = 'test_regular@mail.com'
87 88
88 89 TEST_USER_REGULAR2_LOGIN = 'test_regular2'
89 90 TEST_USER_REGULAR2_PASS = 'test12'
90 91 TEST_USER_REGULAR2_EMAIL = 'test_regular2@mail.com'
91 92
92 93 HG_REPO = 'vcs_test_hg'
93 94 GIT_REPO = 'vcs_test_git'
94 95 SVN_REPO = 'vcs_test_svn'
95 96
96 97 NEW_HG_REPO = 'vcs_test_hg_new'
97 98 NEW_GIT_REPO = 'vcs_test_git_new'
98 99
99 100 HG_FORK = 'vcs_test_hg_fork'
100 101 GIT_FORK = 'vcs_test_git_fork'
101 102
102 103 ## VCS
103 104 SCM_TESTS = ['hg', 'git']
104 105 uniq_suffix = str(int(time.mktime(datetime.datetime.now().timetuple())))
105 106
106 107 TEST_GIT_REPO = jn(TESTS_TMP_PATH, GIT_REPO)
107 108 TEST_GIT_REPO_CLONE = jn(TESTS_TMP_PATH, 'vcsgitclone%s' % uniq_suffix)
108 109 TEST_GIT_REPO_PULL = jn(TESTS_TMP_PATH, 'vcsgitpull%s' % uniq_suffix)
109 110
110 111 TEST_HG_REPO = jn(TESTS_TMP_PATH, HG_REPO)
111 112 TEST_HG_REPO_CLONE = jn(TESTS_TMP_PATH, 'vcshgclone%s' % uniq_suffix)
112 113 TEST_HG_REPO_PULL = jn(TESTS_TMP_PATH, 'vcshgpull%s' % uniq_suffix)
113 114
114 115 TEST_REPO_PREFIX = 'vcs-test'
115 116
116 117
117 118 # skip ldap tests if LDAP lib is not installed
118 119 ldap_lib_installed = False
119 120 try:
120 121 import ldap
121 122 ldap_lib_installed = True
122 123 except ImportError:
123 124 # means that python-ldap is not installed
124 125 pass
125 126
126 127
127 128 def clear_all_caches():
128 129 from beaker.cache import cache_managers
129 130 for _cache in cache_managers.values():
130 131 _cache.clear()
131 132
132 133
133 134 def get_new_dir(title):
134 135 """
135 136 Returns always new directory path.
136 137 """
137 138 from rhodecode.tests.vcs.utils import get_normalized_path
138 139 name_parts = [TEST_REPO_PREFIX]
139 140 if title:
140 141 name_parts.append(title)
141 142 hex_str = hashlib.sha1('%s %s' % (os.getpid(), time.time())).hexdigest()
142 143 name_parts.append(hex_str)
143 144 name = '-'.join(name_parts)
144 145 path = os.path.join(TEST_DIR, name)
145 146 return get_normalized_path(path)
146 147
147 148
148 149 @pytest.mark.usefixtures('app', 'index_location')
149 150 class TestController(object):
150 151
151 152 maxDiff = None
152 153
153 154 def log_user(self, username=TEST_USER_ADMIN_LOGIN,
154 155 password=TEST_USER_ADMIN_PASS):
155 156 self._logged_username = username
156 157 self._session = login_user_session(self.app, username, password)
157 158 self.csrf_token = auth.get_csrf_token(self._session)
158 159
159 160 return self._session['rhodecode_user']
160 161
161 162 def logout_user(self):
162 163 logout_user_session(self.app, auth.get_csrf_token(self._session))
163 164 self.csrf_token = None
164 165 self._logged_username = None
165 166 self._session = None
166 167
167 168 def _get_logged_user(self):
168 169 return User.get_by_username(self._logged_username)
169 170
170 171 # TODO: remove, use plain assert in tests
171 172 def assertEqual(self, a, b, msg=None):
172 173 if msg:
173 174 assert a == b, msg
174 175 else:
175 176 assert a == b
176 177
177 178
178 179 def login_user_session(
179 180 app, username=TEST_USER_ADMIN_LOGIN, password=TEST_USER_ADMIN_PASS):
180 response = app.post(url(controller='login', action='index'),
181 {'username': username,
182 'password': password})
183
181 from rhodecode.tests.functional.test_login import login_url
182 response = app.post(
183 login_url,
184 {'username': username, 'password': password})
184 185 if 'invalid user name' in response.body:
185 186 pytest.fail('could not login using %s %s' % (username, password))
186 187
187 188 assert response.status == '302 Found'
188 189 ses = response.session['rhodecode_user']
189 190 assert ses.get('username') == username
190 191 response = response.follow()
191 192 assert ses.get('is_authenticated')
192 193
193 194 return response.session
194 195
195 196
196 197 def logout_user_session(app, csrf_token):
197 app.post(
198 url(controller='login', action='logout'),
199 {'csrf_token': csrf_token}, status=302)
198 from rhodecode.tests.functional.test_login import logut_url
199 app.post(logut_url, {'csrf_token': csrf_token}, status=302)
200 200
201 201
202 202 def login_user(app, username=TEST_USER_ADMIN_LOGIN,
203 203 password=TEST_USER_ADMIN_PASS):
204 204 return login_user_session(app, username, password)['rhodecode_user']
205 205
206 206
207 207 def assert_session_flash(response=None, msg=None, category=None):
208 208 """
209 209 Assert on a flash message in the current session.
210 210
211 211 :param msg: Required. The expected message. Will be evaluated if a
212 212 :class:`LazyString` is passed in.
213 213 :param response: Optional. For functional testing, pass in the response
214 214 object. Otherwise don't pass in any value.
215 215 :param category: Optional. If passed, the message category will be
216 216 checked as well.
217 217 """
218 218 if msg is None:
219 219 raise ValueError("Parameter msg is required.")
220 220
221 221 messages = flash.pop_messages()
222 222 message = messages[0]
223 223
224 224 msg = _eval_if_lazy(msg)
225 225 message_text = _eval_if_lazy(message.message)
226 226
227 227 if msg not in message_text:
228 228 msg = u'msg `%s` not found in session flash: got `%s` instead' % (
229 229 msg, message_text)
230 230 pytest.fail(safe_str(msg))
231 231 if category:
232 232 assert category == message.category
233 233
234 234
235 235 def _eval_if_lazy(value):
236 236 return value.eval() if hasattr(value, 'eval') else value
237 237
238 238
239 239 def assert_not_in_session_flash(response, msg, category=None):
240 240 assert 'flash' in response.session, 'Response session has no flash key'
241 241 message_category, message_text = response.session['flash'][0]
242 242 if msg in message_text:
243 243 msg = u'msg `%s` found in session flash: got `%s` instead' % (
244 244 msg, message_text)
245 245 pytest.fail(safe_str(msg))
246 246 if category:
247 247 assert category == message_category
248 248
249 249
250 250 def assert_session_flash_is_empty(response):
251 251 if 'flash' in response.session:
252 252 msg = 'flash messages are present in session:%s' % \
253 253 response.session['flash'][0]
254 254 pytest.fail(safe_str(msg))
@@ -1,621 +1,622 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2010-2016 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 import mock
22 22 import pytest
23 23
24 24 import rhodecode
25 from rhodecode.config.routing import ADMIN_PREFIX
25 26 from rhodecode.lib.utils2 import md5
26 27 from rhodecode.model.db import RhodeCodeUi
27 28 from rhodecode.model.meta import Session
28 29 from rhodecode.model.settings import SettingsModel, IssueTrackerSettingsModel
29 30 from rhodecode.tests import url, assert_session_flash
30 31 from rhodecode.tests.utils import AssertResponse
31 32
32 33
33 34 UPDATE_DATA_QUALNAME = (
34 35 'rhodecode.controllers.admin.settings.SettingsController.get_update_data')
35 36
36 37
37 38 @pytest.mark.usefixtures('autologin_user', 'app')
38 39 class TestAdminSettingsController:
39 40
40 41 @pytest.mark.parametrize('urlname', [
41 42 'admin_settings_vcs',
42 43 'admin_settings_mapping',
43 44 'admin_settings_global',
44 45 'admin_settings_visual',
45 46 'admin_settings_email',
46 47 'admin_settings_hooks',
47 48 'admin_settings_search',
48 49 'admin_settings_system',
49 50 ])
50 51 def test_simple_get(self, urlname, app):
51 52 app.get(url(urlname))
52 53
53 54 def test_create_custom_hook(self, csrf_token):
54 55 response = self.app.post(
55 56 url('admin_settings_hooks'),
56 57 params={
57 58 'new_hook_ui_key': 'test_hooks_1',
58 59 'new_hook_ui_value': 'cd /tmp',
59 60 'csrf_token': csrf_token})
60 61
61 62 response = response.follow()
62 63 response.mustcontain('test_hooks_1')
63 64 response.mustcontain('cd /tmp')
64 65
65 66 def test_create_custom_hook_delete(self, csrf_token):
66 67 response = self.app.post(
67 68 url('admin_settings_hooks'),
68 69 params={
69 70 'new_hook_ui_key': 'test_hooks_2',
70 71 'new_hook_ui_value': 'cd /tmp2',
71 72 'csrf_token': csrf_token})
72 73
73 74 response = response.follow()
74 75 response.mustcontain('test_hooks_2')
75 76 response.mustcontain('cd /tmp2')
76 77
77 78 hook_id = SettingsModel().get_ui_by_key('test_hooks_2').ui_id
78 79
79 80 # delete
80 81 self.app.post(
81 82 url('admin_settings_hooks'),
82 83 params={'hook_id': hook_id, 'csrf_token': csrf_token})
83 84 response = self.app.get(url('admin_settings_hooks'))
84 85 response.mustcontain(no=['test_hooks_2'])
85 86 response.mustcontain(no=['cd /tmp2'])
86 87
87 88 def test_system_update_new_version(self):
88 89 update_data = {
89 90 'versions': [
90 91 {
91 92 'version': '100.3.1415926535',
92 93 'general': 'The latest version we are ever going to ship'
93 94 },
94 95 {
95 96 'version': '0.0.0',
96 97 'general': 'The first version we ever shipped'
97 98 }
98 99 ]
99 100 }
100 101 with mock.patch(UPDATE_DATA_QUALNAME, return_value=update_data):
101 102 response = self.app.get(url('admin_settings_system_update'))
102 103 response.mustcontain('A <b>new version</b> is available')
103 104
104 105 def test_system_update_nothing_new(self):
105 106 update_data = {
106 107 'versions': [
107 108 {
108 109 'version': '0.0.0',
109 110 'general': 'The first version we ever shipped'
110 111 }
111 112 ]
112 113 }
113 114 with mock.patch(UPDATE_DATA_QUALNAME, return_value=update_data):
114 115 response = self.app.get(url('admin_settings_system_update'))
115 116 response.mustcontain(
116 117 'You already have the <b>latest</b> stable version.')
117 118
118 119 def test_system_update_bad_response(self):
119 120 with mock.patch(UPDATE_DATA_QUALNAME, side_effect=ValueError('foo')):
120 121 response = self.app.get(url('admin_settings_system_update'))
121 122 response.mustcontain(
122 123 'Bad data sent from update server')
123 124
124 125
125 126 @pytest.mark.usefixtures('autologin_user', 'app')
126 127 class TestAdminSettingsGlobal:
127 128
128 129 def test_pre_post_code_code_active(self, csrf_token):
129 130 pre_code = 'rc-pre-code-187652122'
130 131 post_code = 'rc-postcode-98165231'
131 132
132 133 response = self.post_and_verify_settings({
133 134 'rhodecode_pre_code': pre_code,
134 135 'rhodecode_post_code': post_code,
135 136 'csrf_token': csrf_token,
136 137 })
137 138
138 139 response = response.follow()
139 140 response.mustcontain(pre_code, post_code)
140 141
141 142 def test_pre_post_code_code_inactive(self, csrf_token):
142 143 pre_code = 'rc-pre-code-187652122'
143 144 post_code = 'rc-postcode-98165231'
144 145 response = self.post_and_verify_settings({
145 146 'rhodecode_pre_code': '',
146 147 'rhodecode_post_code': '',
147 148 'csrf_token': csrf_token,
148 149 })
149 150
150 151 response = response.follow()
151 152 response.mustcontain(no=[pre_code, post_code])
152 153
153 154 def test_captcha_activate(self, csrf_token):
154 155 self.post_and_verify_settings({
155 156 'rhodecode_captcha_private_key': '1234567890',
156 157 'rhodecode_captcha_public_key': '1234567890',
157 158 'csrf_token': csrf_token,
158 159 })
159 160
160 response = self.app.get(url('register'))
161 response = self.app.get(ADMIN_PREFIX + '/register')
161 162 response.mustcontain('captcha')
162 163
163 164 def test_captcha_deactivate(self, csrf_token):
164 165 self.post_and_verify_settings({
165 166 'rhodecode_captcha_private_key': '',
166 167 'rhodecode_captcha_public_key': '1234567890',
167 168 'csrf_token': csrf_token,
168 169 })
169 170
170 response = self.app.get(url('register'))
171 response = self.app.get(ADMIN_PREFIX + '/register')
171 172 response.mustcontain(no=['captcha'])
172 173
173 174 def test_title_change(self, csrf_token):
174 175 old_title = 'RhodeCode'
175 176 new_title = old_title + '_changed'
176 177
177 178 for new_title in ['Changed', 'Ε»Γ³Ε‚wik', old_title]:
178 179 response = self.post_and_verify_settings({
179 180 'rhodecode_title': new_title,
180 181 'csrf_token': csrf_token,
181 182 })
182 183
183 184 response = response.follow()
184 185 response.mustcontain(
185 186 """<div class="branding">- %s</div>""" % new_title)
186 187
187 188 def post_and_verify_settings(self, settings):
188 189 old_title = 'RhodeCode'
189 190 old_realm = 'RhodeCode authentication'
190 191 params = {
191 192 'rhodecode_title': old_title,
192 193 'rhodecode_realm': old_realm,
193 194 'rhodecode_pre_code': '',
194 195 'rhodecode_post_code': '',
195 196 'rhodecode_captcha_private_key': '',
196 197 'rhodecode_captcha_public_key': '',
197 198 }
198 199 params.update(settings)
199 200 response = self.app.post(url('admin_settings_global'), params=params)
200 201
201 202 assert_session_flash(response, 'Updated application settings')
202 203 app_settings = SettingsModel().get_all_settings()
203 204 del settings['csrf_token']
204 205 for key, value in settings.iteritems():
205 206 assert app_settings[key] == value.decode('utf-8')
206 207
207 208 return response
208 209
209 210
210 211 @pytest.mark.usefixtures('autologin_user', 'app')
211 212 class TestAdminSettingsVcs:
212 213
213 214 def test_contains_svn_default_patterns(self, app):
214 215 response = app.get(url('admin_settings_vcs'))
215 216 expected_patterns = [
216 217 '/trunk',
217 218 '/branches/*',
218 219 '/tags/*',
219 220 ]
220 221 for pattern in expected_patterns:
221 222 response.mustcontain(pattern)
222 223
223 224 def test_add_new_svn_branch_and_tag_pattern(
224 225 self, app, backend_svn, form_defaults, disable_sql_cache,
225 226 csrf_token):
226 227 form_defaults.update({
227 228 'new_svn_branch': '/exp/branches/*',
228 229 'new_svn_tag': '/important_tags/*',
229 230 'csrf_token': csrf_token,
230 231 })
231 232
232 233 response = app.post(
233 234 url('admin_settings_vcs'), params=form_defaults, status=302)
234 235 response = response.follow()
235 236
236 237 # Expect to find the new values on the page
237 238 response.mustcontain('/exp/branches/*')
238 239 response.mustcontain('/important_tags/*')
239 240
240 241 # Expect that those patterns are used to match branches and tags now
241 242 repo = backend_svn['svn-simple-layout'].scm_instance()
242 243 assert 'exp/branches/exp-sphinx-docs' in repo.branches
243 244 assert 'important_tags/v0.5' in repo.tags
244 245
245 246 def test_add_same_svn_value_twice_shows_an_error_message(
246 247 self, app, form_defaults, csrf_token, settings_util):
247 248 settings_util.create_rhodecode_ui('vcs_svn_branch', '/test')
248 249 settings_util.create_rhodecode_ui('vcs_svn_tag', '/test')
249 250
250 251 response = app.post(
251 252 url('admin_settings_vcs'),
252 253 params={
253 254 'paths_root_path': form_defaults['paths_root_path'],
254 255 'new_svn_branch': '/test',
255 256 'new_svn_tag': '/test',
256 257 'csrf_token': csrf_token,
257 258 },
258 259 status=200)
259 260
260 261 response.mustcontain("Pattern already exists")
261 262 response.mustcontain("Some form inputs contain invalid data.")
262 263
263 264 @pytest.mark.parametrize('section', [
264 265 'vcs_svn_branch',
265 266 'vcs_svn_tag',
266 267 ])
267 268 def test_delete_svn_patterns(
268 269 self, section, app, csrf_token, settings_util):
269 270 setting = settings_util.create_rhodecode_ui(
270 271 section, '/test_delete', cleanup=False)
271 272
272 273 app.post(
273 274 url('admin_settings_vcs'),
274 275 params={
275 276 '_method': 'delete',
276 277 'delete_svn_pattern': setting.ui_id,
277 278 'csrf_token': csrf_token},
278 279 headers={'X-REQUESTED-WITH': 'XMLHttpRequest'})
279 280
280 281 @pytest.mark.parametrize('section', [
281 282 'vcs_svn_branch',
282 283 'vcs_svn_tag',
283 284 ])
284 285 def test_delete_svn_patterns_raises_400_when_no_xhr(
285 286 self, section, app, csrf_token, settings_util):
286 287 setting = settings_util.create_rhodecode_ui(section, '/test_delete')
287 288
288 289 app.post(
289 290 url('admin_settings_vcs'),
290 291 params={
291 292 '_method': 'delete',
292 293 'delete_svn_pattern': setting.ui_id,
293 294 'csrf_token': csrf_token},
294 295 status=400)
295 296
296 297 def test_extensions_hgsubversion(self, app, form_defaults, csrf_token):
297 298 form_defaults.update({
298 299 'csrf_token': csrf_token,
299 300 'extensions_hgsubversion': 'True',
300 301 })
301 302 response = app.post(
302 303 url('admin_settings_vcs'),
303 304 params=form_defaults,
304 305 status=302)
305 306
306 307 response = response.follow()
307 308 extensions_input = (
308 309 '<input id="extensions_hgsubversion" '
309 310 'name="extensions_hgsubversion" type="checkbox" '
310 311 'value="True" checked="checked" />')
311 312 response.mustcontain(extensions_input)
312 313
313 314 def test_has_a_section_for_pull_request_settings(self, app):
314 315 response = app.get(url('admin_settings_vcs'))
315 316 response.mustcontain('Pull Request Settings')
316 317
317 318 def test_has_an_input_for_invalidation_of_inline_comments(
318 319 self, app):
319 320 response = app.get(url('admin_settings_vcs'))
320 321 assert_response = AssertResponse(response)
321 322 assert_response.one_element_exists(
322 323 '[name=rhodecode_use_outdated_comments]')
323 324
324 325 @pytest.mark.parametrize('new_value', [True, False])
325 326 def test_allows_to_change_invalidation_of_inline_comments(
326 327 self, app, form_defaults, csrf_token, new_value):
327 328 setting_key = 'use_outdated_comments'
328 329 setting = SettingsModel().create_or_update_setting(
329 330 setting_key, not new_value, 'bool')
330 331 Session().add(setting)
331 332 Session().commit()
332 333
333 334 form_defaults.update({
334 335 'csrf_token': csrf_token,
335 336 'rhodecode_use_outdated_comments': str(new_value),
336 337 })
337 338 response = app.post(
338 339 url('admin_settings_vcs'),
339 340 params=form_defaults,
340 341 status=302)
341 342 response = response.follow()
342 343 setting = SettingsModel().get_setting_by_name(setting_key)
343 344 assert setting.app_settings_value is new_value
344 345
345 346 @pytest.fixture
346 347 def disable_sql_cache(self, request):
347 348 patcher = mock.patch(
348 349 'rhodecode.lib.caching_query.FromCache.process_query')
349 350 request.addfinalizer(patcher.stop)
350 351 patcher.start()
351 352
352 353 @pytest.fixture
353 354 def form_defaults(self):
354 355 from rhodecode.controllers.admin.settings import SettingsController
355 356 controller = SettingsController()
356 357 return controller._form_defaults()
357 358
358 359 # TODO: johbo: What we really want is to checkpoint before a test run and
359 360 # reset the session afterwards.
360 361 @pytest.fixture(scope='class', autouse=True)
361 362 def cleanup_settings(self, request, pylonsapp):
362 363 ui_id = RhodeCodeUi.ui_id
363 364 original_ids = list(
364 365 r.ui_id for r in RhodeCodeUi.query().values(ui_id))
365 366
366 367 @request.addfinalizer
367 368 def cleanup():
368 369 RhodeCodeUi.query().filter(
369 370 ui_id.notin_(original_ids)).delete(False)
370 371
371 372
372 373 @pytest.mark.usefixtures('autologin_user', 'app')
373 374 class TestLabsSettings(object):
374 375 def test_get_settings_page_disabled(self):
375 376 with mock.patch.dict(rhodecode.CONFIG,
376 377 {'labs_settings_active': 'false'}):
377 378 response = self.app.get(url('admin_settings_labs'), status=302)
378 379
379 380 assert response.location.endswith(url('admin_settings'))
380 381
381 382 def test_get_settings_page_enabled(self):
382 383 from rhodecode.controllers.admin import settings
383 384 lab_settings = [
384 385 settings.LabSetting(
385 386 key='rhodecode_bool',
386 387 type='bool',
387 388 group='bool group',
388 389 label='bool label',
389 390 help='bool help'
390 391 ),
391 392 settings.LabSetting(
392 393 key='rhodecode_text',
393 394 type='unicode',
394 395 group='text group',
395 396 label='text label',
396 397 help='text help'
397 398 ),
398 399 ]
399 400 with mock.patch.dict(rhodecode.CONFIG,
400 401 {'labs_settings_active': 'true'}):
401 402 with mock.patch.object(settings, '_LAB_SETTINGS', lab_settings):
402 403 response = self.app.get(url('admin_settings_labs'))
403 404
404 405 assert '<label>bool group:</label>' in response
405 406 assert '<label for="rhodecode_bool">bool label</label>' in response
406 407 assert '<p class="help-block">bool help</p>' in response
407 408 assert 'name="rhodecode_bool" type="checkbox"' in response
408 409
409 410 assert '<label>text group:</label>' in response
410 411 assert '<label for="rhodecode_text">text label</label>' in response
411 412 assert '<p class="help-block">text help</p>' in response
412 413 assert 'name="rhodecode_text" size="60" type="text"' in response
413 414
414 415 @pytest.mark.parametrize('setting_name', [
415 416 'proxy_subversion_http_requests',
416 417 'hg_use_rebase_for_merging',
417 418 ])
418 419 def test_update_boolean_settings(self, csrf_token, setting_name):
419 420 self.app.post(
420 421 url('admin_settings_labs'),
421 422 params={
422 423 'rhodecode_{}'.format(setting_name): 'true',
423 424 'csrf_token': csrf_token,
424 425 })
425 426 setting = SettingsModel().get_setting_by_name(setting_name)
426 427 assert setting.app_settings_value
427 428
428 429 self.app.post(
429 430 url('admin_settings_labs'),
430 431 params={
431 432 'rhodecode_{}'.format(setting_name): 'false',
432 433 'csrf_token': csrf_token,
433 434 })
434 435 setting = SettingsModel().get_setting_by_name(setting_name)
435 436 assert not setting.app_settings_value
436 437
437 438 @pytest.mark.parametrize('setting_name', [
438 439 'subversion_http_server_url',
439 440 ])
440 441 def test_update_string_settings(self, csrf_token, setting_name):
441 442 self.app.post(
442 443 url('admin_settings_labs'),
443 444 params={
444 445 'rhodecode_{}'.format(setting_name): 'Test 1',
445 446 'csrf_token': csrf_token,
446 447 })
447 448 setting = SettingsModel().get_setting_by_name(setting_name)
448 449 assert setting.app_settings_value == 'Test 1'
449 450
450 451 self.app.post(
451 452 url('admin_settings_labs'),
452 453 params={
453 454 'rhodecode_{}'.format(setting_name): ' Test 2 ',
454 455 'csrf_token': csrf_token,
455 456 })
456 457 setting = SettingsModel().get_setting_by_name(setting_name)
457 458 assert setting.app_settings_value == 'Test 2'
458 459
459 460
460 461 @pytest.mark.usefixtures('app')
461 462 class TestOpenSourceLicenses(object):
462 463 def test_records_are_displayed(self, autologin_user):
463 464 sample_licenses = {
464 465 "python2.7-pytest-2.7.1": {
465 466 "UNKNOWN": None
466 467 },
467 468 "python2.7-Markdown-2.6.2": {
468 469 "BSD-3-Clause": "http://spdx.org/licenses/BSD-3-Clause"
469 470 }
470 471 }
471 472 read_licenses_patch = mock.patch(
472 473 'rhodecode.controllers.admin.settings.read_opensource_licenses',
473 474 return_value=sample_licenses)
474 475 with read_licenses_patch:
475 476 response = self.app.get(
476 477 url('admin_settings_open_source'), status=200)
477 478
478 479 assert_response = AssertResponse(response)
479 480 assert_response.element_contains(
480 481 '.panel-heading', 'Licenses of Third Party Packages')
481 482 for name in sample_licenses:
482 483 response.mustcontain(name)
483 484 for license in sample_licenses[name]:
484 485 assert_response.element_contains('.panel-body', license)
485 486
486 487 def test_records_can_be_read(self, autologin_user):
487 488 response = self.app.get(url('admin_settings_open_source'), status=200)
488 489 assert_response = AssertResponse(response)
489 490 assert_response.element_contains(
490 491 '.panel-heading', 'Licenses of Third Party Packages')
491 492
492 493 def test_forbidden_when_normal_user(self, autologin_regular_user):
493 494 self.app.get(
494 495 url('admin_settings_open_source'), status=403)
495 496
496 497
497 498 @pytest.mark.usefixtures("app")
498 499 class TestAdminSettingsIssueTracker:
499 500 RC_PREFIX = 'rhodecode_'
500 501 SHORT_PATTERN_KEY = 'issuetracker_pat_'
501 502 PATTERN_KEY = RC_PREFIX + SHORT_PATTERN_KEY
502 503
503 504 def test_issuetracker_index(self, autologin_user):
504 505 response = self.app.get(url('admin_settings_issuetracker'))
505 506 assert response.status_code == 200
506 507
507 508 def test_add_issuetracker_pattern(
508 509 self, request, autologin_user, csrf_token):
509 510 pattern = 'issuetracker_pat'
510 511 another_pattern = pattern+'1'
511 512 post_url = url('admin_settings_issuetracker_save')
512 513 post_data = {
513 514 'new_pattern_pattern_0': pattern,
514 515 'new_pattern_url_0': 'url',
515 516 'new_pattern_prefix_0': 'prefix',
516 517 'new_pattern_description_0': 'description',
517 518 'new_pattern_pattern_1': another_pattern,
518 519 'new_pattern_url_1': 'url1',
519 520 'new_pattern_prefix_1': 'prefix1',
520 521 'new_pattern_description_1': 'description1',
521 522 'csrf_token': csrf_token
522 523 }
523 524 self.app.post(post_url, post_data, status=302)
524 525 settings = SettingsModel().get_all_settings()
525 526 self.uid = md5(pattern)
526 527 assert settings[self.PATTERN_KEY+self.uid] == pattern
527 528 self.another_uid = md5(another_pattern)
528 529 assert settings[self.PATTERN_KEY+self.another_uid] == another_pattern
529 530
530 531 @request.addfinalizer
531 532 def cleanup():
532 533 defaults = SettingsModel().get_all_settings()
533 534
534 535 entries = [name for name in defaults if (
535 536 (self.uid in name) or (self.another_uid) in name)]
536 537 start = len(self.RC_PREFIX)
537 538 for del_key in entries:
538 539 # TODO: anderson: get_by_name needs name without prefix
539 540 entry = SettingsModel().get_setting_by_name(del_key[start:])
540 541 Session().delete(entry)
541 542
542 543 Session().commit()
543 544
544 545 def test_edit_issuetracker_pattern(
545 546 self, autologin_user, backend, csrf_token, request):
546 547 old_pattern = 'issuetracker_pat'
547 548 old_uid = md5(old_pattern)
548 549 pattern = 'issuetracker_pat_new'
549 550 self.new_uid = md5(pattern)
550 551
551 552 SettingsModel().create_or_update_setting(
552 553 self.SHORT_PATTERN_KEY+old_uid, old_pattern, 'unicode')
553 554
554 555 post_url = url('admin_settings_issuetracker_save')
555 556 post_data = {
556 557 'new_pattern_pattern_0': pattern,
557 558 'new_pattern_url_0': 'url',
558 559 'new_pattern_prefix_0': 'prefix',
559 560 'new_pattern_description_0': 'description',
560 561 'uid': old_uid,
561 562 'csrf_token': csrf_token
562 563 }
563 564 self.app.post(post_url, post_data, status=302)
564 565 settings = SettingsModel().get_all_settings()
565 566 assert settings[self.PATTERN_KEY+self.new_uid] == pattern
566 567 assert self.PATTERN_KEY+old_uid not in settings
567 568
568 569 @request.addfinalizer
569 570 def cleanup():
570 571 IssueTrackerSettingsModel().delete_entries(self.new_uid)
571 572
572 573 def test_replace_issuetracker_pattern_description(
573 574 self, autologin_user, csrf_token, request, settings_util):
574 575 prefix = 'issuetracker'
575 576 pattern = 'issuetracker_pat'
576 577 self.uid = md5(pattern)
577 578 pattern_key = '_'.join([prefix, 'pat', self.uid])
578 579 rc_pattern_key = '_'.join(['rhodecode', pattern_key])
579 580 desc_key = '_'.join([prefix, 'desc', self.uid])
580 581 rc_desc_key = '_'.join(['rhodecode', desc_key])
581 582 new_description = 'new_description'
582 583
583 584 settings_util.create_rhodecode_setting(
584 585 pattern_key, pattern, 'unicode', cleanup=False)
585 586 settings_util.create_rhodecode_setting(
586 587 desc_key, 'old description', 'unicode', cleanup=False)
587 588
588 589 post_url = url('admin_settings_issuetracker_save')
589 590 post_data = {
590 591 'new_pattern_pattern_0': pattern,
591 592 'new_pattern_url_0': 'url',
592 593 'new_pattern_prefix_0': 'prefix',
593 594 'new_pattern_description_0': new_description,
594 595 'uid': self.uid,
595 596 'csrf_token': csrf_token
596 597 }
597 598 self.app.post(post_url, post_data, status=302)
598 599 settings = SettingsModel().get_all_settings()
599 600 assert settings[rc_pattern_key] == pattern
600 601 assert settings[rc_desc_key] == new_description
601 602
602 603 @request.addfinalizer
603 604 def cleanup():
604 605 IssueTrackerSettingsModel().delete_entries(self.uid)
605 606
606 607 def test_delete_issuetracker_pattern(
607 608 self, autologin_user, backend, csrf_token, settings_util):
608 609 pattern = 'issuetracker_pat'
609 610 uid = md5(pattern)
610 611 settings_util.create_rhodecode_setting(
611 612 self.SHORT_PATTERN_KEY+uid, pattern, 'unicode', cleanup=False)
612 613
613 614 post_url = url('admin_issuetracker_delete')
614 615 post_data = {
615 616 '_method': 'delete',
616 617 'uid': uid,
617 618 'csrf_token': csrf_token
618 619 }
619 620 self.app.post(post_url, post_data, status=302)
620 621 settings = SettingsModel().get_all_settings()
621 622 assert 'rhodecode_%s%s' % (self.SHORT_PATTERN_KEY, uid) not in settings
@@ -1,511 +1,519 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2010-2016 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 import urlparse
22 22
23 23 import mock
24 24 import pytest
25 25
26 from rhodecode.config.routing import ADMIN_PREFIX
26 27 from rhodecode.tests import (
27 28 assert_session_flash, url, HG_REPO, TEST_USER_ADMIN_LOGIN)
28 29 from rhodecode.tests.fixture import Fixture
30 from rhodecode.tests.utils import AssertResponse, get_session_from_response
29 31 from rhodecode.lib.auth import check_password, generate_auth_token
30 32 from rhodecode.lib import helpers as h
31 33 from rhodecode.model.auth_token import AuthTokenModel
32 34 from rhodecode.model import validators
33 35 from rhodecode.model.db import User, Notification
34 36 from rhodecode.model.meta import Session
35 37
36 38 fixture = Fixture()
37 39
40 # Hardcode URLs because we don't have a request object to use
41 # pyramids URL generation methods.
42 login_url = ADMIN_PREFIX + '/login'
43 logut_url = ADMIN_PREFIX + '/logout'
44 register_url = ADMIN_PREFIX + '/register'
45 pwd_reset_url = ADMIN_PREFIX + '/password_reset'
46 pwd_reset_confirm_url = ADMIN_PREFIX + '/password_reset_confirmation'
47
38 48
39 49 @pytest.mark.usefixtures('app')
40 50 class TestLoginController:
41 51 destroy_users = set()
42 52
43 53 @classmethod
44 54 def teardown_class(cls):
45 55 fixture.destroy_users(cls.destroy_users)
46 56
47 57 def teardown_method(self, method):
48 58 for n in Notification.query().all():
49 59 Session().delete(n)
50 60
51 61 Session().commit()
52 62 assert Notification.query().all() == []
53 63
54 64 def test_index(self):
55 response = self.app.get(url(controller='login', action='index'))
65 response = self.app.get(login_url)
56 66 assert response.status == '200 OK'
57 67 # Test response...
58 68
59 69 def test_login_admin_ok(self):
60 response = self.app.post(url(controller='login', action='index'),
70 response = self.app.post(login_url,
61 71 {'username': 'test_admin',
62 72 'password': 'test12'})
63 73 assert response.status == '302 Found'
64 username = response.session['rhodecode_user'].get('username')
74 session = get_session_from_response(response)
75 username = session['rhodecode_user'].get('username')
65 76 assert username == 'test_admin'
66 77 response = response.follow()
67 78 response.mustcontain('/%s' % HG_REPO)
68 79
69 80 def test_login_regular_ok(self):
70 response = self.app.post(url(controller='login', action='index'),
81 response = self.app.post(login_url,
71 82 {'username': 'test_regular',
72 83 'password': 'test12'})
73 84
74 85 assert response.status == '302 Found'
75 username = response.session['rhodecode_user'].get('username')
86 session = get_session_from_response(response)
87 username = session['rhodecode_user'].get('username')
76 88 assert username == 'test_regular'
77 89 response = response.follow()
78 90 response.mustcontain('/%s' % HG_REPO)
79 91
80 92 def test_login_ok_came_from(self):
81 93 test_came_from = '/_admin/users?branch=stable'
82 response = self.app.post(url(controller='login', action='index',
83 came_from=test_came_from),
84 {'username': 'test_admin',
85 'password': 'test12'})
94 _url = '{}?came_from={}'.format(login_url, test_came_from)
95 response = self.app.post(
96 _url, {'username': 'test_admin', 'password': 'test12'})
86 97 assert response.status == '302 Found'
87 98 assert 'branch=stable' in response.location
88 99 response = response.follow()
89 100
90 101 assert response.status == '200 OK'
91 102 response.mustcontain('Users administration')
92 103
93 104 def test_redirect_to_login_with_get_args(self):
94 105 with fixture.anon_access(False):
95 106 kwargs = {'branch': 'stable'}
96 107 response = self.app.get(
97 108 url('summary_home', repo_name=HG_REPO, **kwargs))
98 109 assert response.status == '302 Found'
99 110 response_query = urlparse.parse_qsl(response.location)
100 111 assert 'branch=stable' in response_query[0][1]
101 112
102 113 def test_login_form_with_get_args(self):
103 kwargs = {'branch': 'stable'}
104 response = self.app.get(
105 url(controller='login', action='index',
106 came_from='/_admin/users', **kwargs))
107 assert 'branch=stable' in response.form.action
114 _url = '{}?came_from=/_admin/users,branch=stable'.format(login_url)
115 response = self.app.get(_url)
116 assert 'branch%3Dstable' in response.form.action
108 117
109 118 @pytest.mark.parametrize("url_came_from", [
110 ('data:text/html,<script>window.alert("xss")</script>',),
111 ('mailto:test@rhodecode.org',),
112 ('file:///etc/passwd',),
113 ('ftp://some.ftp.server',),
114 ('http://other.domain',),
115 ('/\r\nX-Forwarded-Host: http://example.org',),
119 'data:text/html,<script>window.alert("xss")</script>',
120 'mailto:test@rhodecode.org',
121 'file:///etc/passwd',
122 'ftp://some.ftp.server',
123 'http://other.domain',
124 '/\r\nX-Forwarded-Host: http://example.org',
116 125 ])
117 126 def test_login_bad_came_froms(self, url_came_from):
118 response = self.app.post(url(controller='login', action='index',
119 came_from=url_came_from),
120 {'username': 'test_admin',
121 'password': 'test12'})
127 _url = '{}?came_from={}'.format(login_url, url_came_from)
128 response = self.app.post(
129 _url,
130 {'username': 'test_admin', 'password': 'test12'})
122 131 assert response.status == '302 Found'
123 assert response.tmpl_context.came_from == '/'
124
125 132 response = response.follow()
126 133 assert response.status == '200 OK'
134 assert response.request.path == '/'
127 135
128 136 def test_login_short_password(self):
129 response = self.app.post(url(controller='login', action='index'),
137 response = self.app.post(login_url,
130 138 {'username': 'test_admin',
131 139 'password': 'as'})
132 140 assert response.status == '200 OK'
133 141
134 142 response.mustcontain('Enter 3 characters or more')
135 143
136 144 def test_login_wrong_non_ascii_password(self, user_regular):
137 145 response = self.app.post(
138 url(controller='login', action='index'),
146 login_url,
139 147 {'username': user_regular.username,
140 148 'password': u'invalid-non-asci\xe4'.encode('utf8')})
141 149
142 150 response.mustcontain('invalid user name')
143 151 response.mustcontain('invalid password')
144 152
145 153 def test_login_with_non_ascii_password(self, user_util):
146 154 password = u'valid-non-ascii\xe4'
147 155 user = user_util.create_user(password=password)
148 156 response = self.app.post(
149 url(controller='login', action='index'),
157 login_url,
150 158 {'username': user.username,
151 159 'password': password.encode('utf-8')})
152 160 assert response.status_code == 302
153 161
154 162 def test_login_wrong_username_password(self):
155 response = self.app.post(url(controller='login', action='index'),
163 response = self.app.post(login_url,
156 164 {'username': 'error',
157 165 'password': 'test12'})
158 166
159 167 response.mustcontain('invalid user name')
160 168 response.mustcontain('invalid password')
161 169
162 170 def test_login_admin_ok_password_migration(self, real_crypto_backend):
163 171 from rhodecode.lib import auth
164 172
165 173 # create new user, with sha256 password
166 174 temp_user = 'test_admin_sha256'
167 175 user = fixture.create_user(temp_user)
168 176 user.password = auth._RhodeCodeCryptoSha256().hash_create(
169 177 b'test123')
170 178 Session().add(user)
171 179 Session().commit()
172 180 self.destroy_users.add(temp_user)
173 response = self.app.post(url(controller='login', action='index'),
181 response = self.app.post(login_url,
174 182 {'username': temp_user,
175 183 'password': 'test123'})
176 184
177 185 assert response.status == '302 Found'
178 username = response.session['rhodecode_user'].get('username')
186 session = get_session_from_response(response)
187 username = session['rhodecode_user'].get('username')
179 188 assert username == temp_user
180 189 response = response.follow()
181 190 response.mustcontain('/%s' % HG_REPO)
182 191
183 192 # new password should be bcrypted, after log-in and transfer
184 193 user = User.get_by_username(temp_user)
185 194 assert user.password.startswith('$')
186 195
187 196 # REGISTRATIONS
188 197 def test_register(self):
189 response = self.app.get(url(controller='login', action='register'))
198 response = self.app.get(register_url)
190 199 response.mustcontain('Create an Account')
191 200
192 201 def test_register_err_same_username(self):
193 202 uname = 'test_admin'
194 203 response = self.app.post(
195 url(controller='login', action='register'),
204 register_url,
196 205 {
197 206 'username': uname,
198 207 'password': 'test12',
199 208 'password_confirmation': 'test12',
200 209 'email': 'goodmail@domain.com',
201 210 'firstname': 'test',
202 211 'lastname': 'test'
203 212 }
204 213 )
205 214
215 assertr = AssertResponse(response)
206 216 msg = validators.ValidUsername()._messages['username_exists']
207 msg = h.html_escape(msg % {'username': uname})
208 response.mustcontain(msg)
217 msg = msg % {'username': uname}
218 assertr.element_contains('#username+.error-message', msg)
209 219
210 220 def test_register_err_same_email(self):
211 221 response = self.app.post(
212 url(controller='login', action='register'),
222 register_url,
213 223 {
214 224 'username': 'test_admin_0',
215 225 'password': 'test12',
216 226 'password_confirmation': 'test12',
217 227 'email': 'test_admin@mail.com',
218 228 'firstname': 'test',
219 229 'lastname': 'test'
220 230 }
221 231 )
222 232
233 assertr = AssertResponse(response)
223 234 msg = validators.UniqSystemEmail()()._messages['email_taken']
224 response.mustcontain(msg)
235 assertr.element_contains('#email+.error-message', msg)
225 236
226 237 def test_register_err_same_email_case_sensitive(self):
227 238 response = self.app.post(
228 url(controller='login', action='register'),
239 register_url,
229 240 {
230 241 'username': 'test_admin_1',
231 242 'password': 'test12',
232 243 'password_confirmation': 'test12',
233 244 'email': 'TesT_Admin@mail.COM',
234 245 'firstname': 'test',
235 246 'lastname': 'test'
236 247 }
237 248 )
249 assertr = AssertResponse(response)
238 250 msg = validators.UniqSystemEmail()()._messages['email_taken']
239 response.mustcontain(msg)
251 assertr.element_contains('#email+.error-message', msg)
240 252
241 253 def test_register_err_wrong_data(self):
242 254 response = self.app.post(
243 url(controller='login', action='register'),
255 register_url,
244 256 {
245 257 'username': 'xs',
246 258 'password': 'test',
247 259 'password_confirmation': 'test',
248 260 'email': 'goodmailm',
249 261 'firstname': 'test',
250 262 'lastname': 'test'
251 263 }
252 264 )
253 265 assert response.status == '200 OK'
254 266 response.mustcontain('An email address must contain a single @')
255 267 response.mustcontain('Enter a value 6 characters long or more')
256 268
257 269 def test_register_err_username(self):
258 270 response = self.app.post(
259 url(controller='login', action='register'),
271 register_url,
260 272 {
261 273 'username': 'error user',
262 274 'password': 'test12',
263 275 'password_confirmation': 'test12',
264 276 'email': 'goodmailm',
265 277 'firstname': 'test',
266 278 'lastname': 'test'
267 279 }
268 280 )
269 281
270 282 response.mustcontain('An email address must contain a single @')
271 283 response.mustcontain(
272 284 'Username may only contain '
273 285 'alphanumeric characters underscores, '
274 286 'periods or dashes and must begin with '
275 287 'alphanumeric character')
276 288
277 289 def test_register_err_case_sensitive(self):
278 290 usr = 'Test_Admin'
279 291 response = self.app.post(
280 url(controller='login', action='register'),
292 register_url,
281 293 {
282 294 'username': usr,
283 295 'password': 'test12',
284 296 'password_confirmation': 'test12',
285 297 'email': 'goodmailm',
286 298 'firstname': 'test',
287 299 'lastname': 'test'
288 300 }
289 301 )
290 302
291 response.mustcontain('An email address must contain a single @')
303 assertr = AssertResponse(response)
292 304 msg = validators.ValidUsername()._messages['username_exists']
293 msg = h.html_escape(msg % {'username': usr})
294 response.mustcontain(msg)
305 msg = msg % {'username': usr}
306 assertr.element_contains('#username+.error-message', msg)
295 307
296 308 def test_register_special_chars(self):
297 309 response = self.app.post(
298 url(controller='login', action='register'),
310 register_url,
299 311 {
300 312 'username': 'xxxaxn',
301 313 'password': 'Δ…Δ‡ΕΊΕΌΔ…Ε›Ε›Ε›Ε›',
302 314 'password_confirmation': 'Δ…Δ‡ΕΊΕΌΔ…Ε›Ε›Ε›Ε›',
303 315 'email': 'goodmailm@test.plx',
304 316 'firstname': 'test',
305 317 'lastname': 'test'
306 318 }
307 319 )
308 320
309 321 msg = validators.ValidPassword()._messages['invalid_password']
310 322 response.mustcontain(msg)
311 323
312 324 def test_register_password_mismatch(self):
313 325 response = self.app.post(
314 url(controller='login', action='register'),
326 register_url,
315 327 {
316 328 'username': 'xs',
317 329 'password': '123qwe',
318 330 'password_confirmation': 'qwe123',
319 331 'email': 'goodmailm@test.plxa',
320 332 'firstname': 'test',
321 333 'lastname': 'test'
322 334 }
323 335 )
324 336 msg = validators.ValidPasswordsMatch()._messages['password_mismatch']
325 337 response.mustcontain(msg)
326 338
327 339 def test_register_ok(self):
328 340 username = 'test_regular4'
329 341 password = 'qweqwe'
330 342 email = 'marcin@test.com'
331 343 name = 'testname'
332 344 lastname = 'testlastname'
333 345
334 346 response = self.app.post(
335 url(controller='login', action='register'),
347 register_url,
336 348 {
337 349 'username': username,
338 350 'password': password,
339 351 'password_confirmation': password,
340 352 'email': email,
341 353 'firstname': name,
342 354 'lastname': lastname,
343 355 'admin': True
344 356 }
345 357 ) # This should be overriden
346 358 assert response.status == '302 Found'
347 359 assert_session_flash(
348 360 response, 'You have successfully registered with RhodeCode')
349 361
350 362 ret = Session().query(User).filter(
351 363 User.username == 'test_regular4').one()
352 364 assert ret.username == username
353 365 assert check_password(password, ret.password)
354 366 assert ret.email == email
355 367 assert ret.name == name
356 368 assert ret.lastname == lastname
357 369 assert ret.api_key is not None
358 370 assert not ret.admin
359 371
360 372 def test_forgot_password_wrong_mail(self):
361 373 bad_email = 'marcin@wrongmail.org'
362 374 response = self.app.post(
363 url(controller='login', action='password_reset'),
375 pwd_reset_url,
364 376 {'email': bad_email, }
365 377 )
366 378
367 379 msg = validators.ValidSystemEmail()._messages['non_existing_email']
368 380 msg = h.html_escape(msg % {'email': bad_email})
369 381 response.mustcontain()
370 382
371 383 def test_forgot_password(self):
372 response = self.app.get(url(controller='login',
373 action='password_reset'))
384 response = self.app.get(pwd_reset_url)
374 385 assert response.status == '200 OK'
375 386
376 387 username = 'test_password_reset_1'
377 388 password = 'qweqwe'
378 389 email = 'marcin@python-works.com'
379 390 name = 'passwd'
380 391 lastname = 'reset'
381 392
382 393 new = User()
383 394 new.username = username
384 395 new.password = password
385 396 new.email = email
386 397 new.name = name
387 398 new.lastname = lastname
388 399 new.api_key = generate_auth_token(username)
389 400 Session().add(new)
390 401 Session().commit()
391 402
392 response = self.app.post(url(controller='login',
393 action='password_reset'),
403 response = self.app.post(pwd_reset_url,
394 404 {'email': email, })
395 405
396 406 assert_session_flash(
397 407 response, 'Your password reset link was sent')
398 408
399 409 response = response.follow()
400 410
401 411 # BAD KEY
402 412
403 413 key = "bad"
404 response = self.app.get(url(controller='login',
405 action='password_reset_confirmation',
406 key=key))
414 confirm_url = '{}?key={}'.format(pwd_reset_confirm_url, key)
415 response = self.app.get(confirm_url)
407 416 assert response.status == '302 Found'
408 assert response.location.endswith(url('reset_password'))
417 assert response.location.endswith(pwd_reset_url)
409 418
410 419 # GOOD KEY
411 420
412 421 key = User.get_by_username(username).api_key
413 response = self.app.get(url(controller='login',
414 action='password_reset_confirmation',
415 key=key))
422 confirm_url = '{}?key={}'.format(pwd_reset_confirm_url, key)
423 response = self.app.get(confirm_url)
416 424 assert response.status == '302 Found'
417 assert response.location.endswith(url('login_home'))
425 assert response.location.endswith(login_url)
418 426
419 427 assert_session_flash(
420 428 response,
421 429 'Your password reset was successful, '
422 430 'a new password has been sent to your email')
423 431
424 432 response = response.follow()
425 433
426 434 def _get_api_whitelist(self, values=None):
427 435 config = {'api_access_controllers_whitelist': values or []}
428 436 return config
429 437
430 438 @pytest.mark.parametrize("test_name, auth_token", [
431 439 ('none', None),
432 440 ('empty_string', ''),
433 441 ('fake_number', '123456'),
434 442 ('proper_auth_token', None)
435 443 ])
436 444 def test_access_not_whitelisted_page_via_auth_token(self, test_name,
437 445 auth_token):
438 446 whitelist = self._get_api_whitelist([])
439 447 with mock.patch.dict('rhodecode.CONFIG', whitelist):
440 448 assert [] == whitelist['api_access_controllers_whitelist']
441 449 if test_name == 'proper_auth_token':
442 450 # use builtin if api_key is None
443 451 auth_token = User.get_first_admin().api_key
444 452
445 453 with fixture.anon_access(False):
446 454 self.app.get(url(controller='changeset',
447 455 action='changeset_raw',
448 456 repo_name=HG_REPO, revision='tip',
449 457 api_key=auth_token),
450 458 status=302)
451 459
452 460 @pytest.mark.parametrize("test_name, auth_token, code", [
453 461 ('none', None, 302),
454 462 ('empty_string', '', 302),
455 463 ('fake_number', '123456', 302),
456 464 ('proper_auth_token', None, 200)
457 465 ])
458 466 def test_access_whitelisted_page_via_auth_token(self, test_name,
459 467 auth_token, code):
460 468 whitelist = self._get_api_whitelist(
461 469 ['ChangesetController:changeset_raw'])
462 470 with mock.patch.dict('rhodecode.CONFIG', whitelist):
463 471 assert ['ChangesetController:changeset_raw'] == \
464 472 whitelist['api_access_controllers_whitelist']
465 473 if test_name == 'proper_auth_token':
466 474 auth_token = User.get_first_admin().api_key
467 475
468 476 with fixture.anon_access(False):
469 477 self.app.get(url(controller='changeset',
470 478 action='changeset_raw',
471 479 repo_name=HG_REPO, revision='tip',
472 480 api_key=auth_token),
473 481 status=code)
474 482
475 483 def test_access_page_via_extra_auth_token(self):
476 484 whitelist = self._get_api_whitelist(
477 485 ['ChangesetController:changeset_raw'])
478 486 with mock.patch.dict('rhodecode.CONFIG', whitelist):
479 487 assert ['ChangesetController:changeset_raw'] == \
480 488 whitelist['api_access_controllers_whitelist']
481 489
482 490 new_auth_token = AuthTokenModel().create(
483 491 TEST_USER_ADMIN_LOGIN, 'test')
484 492 Session().commit()
485 493 with fixture.anon_access(False):
486 494 self.app.get(url(controller='changeset',
487 495 action='changeset_raw',
488 496 repo_name=HG_REPO, revision='tip',
489 497 api_key=new_auth_token.api_key),
490 498 status=200)
491 499
492 500 def test_access_page_via_expired_auth_token(self):
493 501 whitelist = self._get_api_whitelist(
494 502 ['ChangesetController:changeset_raw'])
495 503 with mock.patch.dict('rhodecode.CONFIG', whitelist):
496 504 assert ['ChangesetController:changeset_raw'] == \
497 505 whitelist['api_access_controllers_whitelist']
498 506
499 507 new_auth_token = AuthTokenModel().create(
500 508 TEST_USER_ADMIN_LOGIN, 'test')
501 509 Session().commit()
502 510 # patch the api key and make it expired
503 511 new_auth_token.expires = 0
504 512 Session().add(new_auth_token)
505 513 Session().commit()
506 514 with fixture.anon_access(False):
507 515 self.app.get(url(controller='changeset',
508 516 action='changeset_raw',
509 517 repo_name=HG_REPO, revision='tip',
510 518 api_key=new_auth_token.api_key),
511 519 status=302)
@@ -1,916 +1,917 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2010-2016 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 import mock
22 22 import pytest
23 23 from webob.exc import HTTPNotFound
24 24
25 25 import rhodecode
26 26 from rhodecode.lib.vcs.nodes import FileNode
27 27 from rhodecode.model.changeset_status import ChangesetStatusModel
28 28 from rhodecode.model.db import (
29 29 PullRequest, ChangesetStatus, UserLog, Notification)
30 30 from rhodecode.model.meta import Session
31 31 from rhodecode.model.pull_request import PullRequestModel
32 32 from rhodecode.model.user import UserModel
33 33 from rhodecode.tests import assert_session_flash, url, TEST_USER_ADMIN_LOGIN
34 34 from rhodecode.tests.utils import AssertResponse
35 35
36 36
37 37 @pytest.mark.usefixtures('app', 'autologin_user')
38 38 @pytest.mark.backends("git", "hg")
39 39 class TestPullrequestsController:
40 40
41 41 def test_index(self, backend):
42 42 self.app.get(url(
43 43 controller='pullrequests', action='index',
44 44 repo_name=backend.repo_name))
45 45
46 46 def test_option_menu_create_pull_request_exists(self, backend):
47 47 repo_name = backend.repo_name
48 48 response = self.app.get(url('summary_home', repo_name=repo_name))
49 49
50 50 create_pr_link = '<a href="%s">Create Pull Request</a>' % url(
51 51 'pullrequest', repo_name=repo_name)
52 52 response.mustcontain(create_pr_link)
53 53
54 54 def test_global_redirect_of_pr(self, backend, pr_util):
55 55 pull_request = pr_util.create_pull_request()
56 56
57 57 response = self.app.get(
58 58 url('pull_requests_global',
59 59 pull_request_id=pull_request.pull_request_id))
60 60
61 61 repo_name = pull_request.target_repo.repo_name
62 62 redirect_url = url('pullrequest_show', repo_name=repo_name,
63 63 pull_request_id=pull_request.pull_request_id)
64 64 assert response.status == '302 Found'
65 65 assert redirect_url in response.location
66 66
67 67 @pytest.mark.xfail_backends(
68 68 "git", reason="Pending bugfix/feature, issue #6")
69 69 def test_create_pr_form_with_raw_commit_id(self, backend):
70 70 repo = backend.repo
71 71
72 72 self.app.get(
73 73 url(controller='pullrequests', action='index',
74 74 repo_name=repo.repo_name,
75 75 commit=repo.get_commit().raw_id),
76 76 status=200)
77 77
78 78 @pytest.mark.parametrize('pr_merge_enabled', [True, False])
79 79 def test_show(self, pr_util, pr_merge_enabled):
80 80 pull_request = pr_util.create_pull_request(
81 81 mergeable=pr_merge_enabled, enable_notifications=False)
82 82
83 83 response = self.app.get(url(
84 84 controller='pullrequests', action='show',
85 85 repo_name=pull_request.target_repo.scm_instance().name,
86 86 pull_request_id=str(pull_request.pull_request_id)))
87 87
88 88 for commit_id in pull_request.revisions:
89 89 response.mustcontain(commit_id)
90 90
91 91 assert pull_request.target_ref_parts.type in response
92 92 assert pull_request.target_ref_parts.name in response
93 93 target_clone_url = pull_request.target_repo.clone_url()
94 94 assert target_clone_url in response
95 95
96 96 assert 'class="pull-request-merge"' in response
97 97 assert (
98 98 'Server-side pull request merging is disabled.'
99 99 in response) != pr_merge_enabled
100 100
101 101 def test_close_status_visibility(self, pr_util, csrf_token):
102 from rhodecode.tests.functional.test_login import login_url, logut_url
102 103 # Logout
103 104 response = self.app.post(
104 url(controller='login', action='logout'),
105 logut_url,
105 106 params={'csrf_token': csrf_token})
106 107 # Login as regular user
107 response = self.app.post(url(controller='login', action='index'),
108 response = self.app.post(login_url,
108 109 {'username': 'test_regular',
109 110 'password': 'test12'})
110 111
111 112 pull_request = pr_util.create_pull_request(author='test_regular')
112 113
113 114 response = self.app.get(url(
114 115 controller='pullrequests', action='show',
115 116 repo_name=pull_request.target_repo.scm_instance().name,
116 117 pull_request_id=str(pull_request.pull_request_id)))
117 118
118 119 assert 'Server-side pull request merging is disabled.' in response
119 120 assert 'value="forced_closed"' in response
120 121
121 122 def test_show_invalid_commit_id(self, pr_util):
122 123 # Simulating invalid revisions which will cause a lookup error
123 124 pull_request = pr_util.create_pull_request()
124 125 pull_request.revisions = ['invalid']
125 126 Session().add(pull_request)
126 127 Session().commit()
127 128
128 129 response = self.app.get(url(
129 130 controller='pullrequests', action='show',
130 131 repo_name=pull_request.target_repo.scm_instance().name,
131 132 pull_request_id=str(pull_request.pull_request_id)))
132 133
133 134 for commit_id in pull_request.revisions:
134 135 response.mustcontain(commit_id)
135 136
136 137 def test_show_invalid_source_reference(self, pr_util):
137 138 pull_request = pr_util.create_pull_request()
138 139 pull_request.source_ref = 'branch:b:invalid'
139 140 Session().add(pull_request)
140 141 Session().commit()
141 142
142 143 self.app.get(url(
143 144 controller='pullrequests', action='show',
144 145 repo_name=pull_request.target_repo.scm_instance().name,
145 146 pull_request_id=str(pull_request.pull_request_id)))
146 147
147 148 def test_edit_title_description(self, pr_util, csrf_token):
148 149 pull_request = pr_util.create_pull_request()
149 150 pull_request_id = pull_request.pull_request_id
150 151
151 152 response = self.app.post(
152 153 url(controller='pullrequests', action='update',
153 154 repo_name=pull_request.target_repo.repo_name,
154 155 pull_request_id=str(pull_request_id)),
155 156 params={
156 157 'edit_pull_request': 'true',
157 158 '_method': 'put',
158 159 'title': 'New title',
159 160 'description': 'New description',
160 161 'csrf_token': csrf_token})
161 162
162 163 assert_session_flash(
163 164 response, u'Pull request title & description updated.',
164 165 category='success')
165 166
166 167 pull_request = PullRequest.get(pull_request_id)
167 168 assert pull_request.title == 'New title'
168 169 assert pull_request.description == 'New description'
169 170
170 171 def test_edit_title_description_closed(self, pr_util, csrf_token):
171 172 pull_request = pr_util.create_pull_request()
172 173 pull_request_id = pull_request.pull_request_id
173 174 pr_util.close()
174 175
175 176 response = self.app.post(
176 177 url(controller='pullrequests', action='update',
177 178 repo_name=pull_request.target_repo.repo_name,
178 179 pull_request_id=str(pull_request_id)),
179 180 params={
180 181 'edit_pull_request': 'true',
181 182 '_method': 'put',
182 183 'title': 'New title',
183 184 'description': 'New description',
184 185 'csrf_token': csrf_token})
185 186
186 187 assert_session_flash(
187 188 response, u'Cannot update closed pull requests.',
188 189 category='error')
189 190
190 191 def test_update_invalid_source_reference(self, pr_util, csrf_token):
191 192 pull_request = pr_util.create_pull_request()
192 193 pull_request.source_ref = 'branch:invalid-branch:invalid-commit-id'
193 194 Session().add(pull_request)
194 195 Session().commit()
195 196
196 197 pull_request_id = pull_request.pull_request_id
197 198
198 199 response = self.app.post(
199 200 url(controller='pullrequests', action='update',
200 201 repo_name=pull_request.target_repo.repo_name,
201 202 pull_request_id=str(pull_request_id)),
202 203 params={'update_commits': 'true', '_method': 'put',
203 204 'csrf_token': csrf_token})
204 205
205 206 assert_session_flash(
206 207 response, u'Update failed due to missing commits.',
207 208 category='error')
208 209
209 210 def test_comment_and_close_pull_request(self, pr_util, csrf_token):
210 211 pull_request = pr_util.create_pull_request(approved=True)
211 212 pull_request_id = pull_request.pull_request_id
212 213 author = pull_request.user_id
213 214 repo = pull_request.target_repo.repo_id
214 215
215 216 self.app.post(
216 217 url(controller='pullrequests',
217 218 action='comment',
218 219 repo_name=pull_request.target_repo.scm_instance().name,
219 220 pull_request_id=str(pull_request_id)),
220 221 params={
221 222 'changeset_status':
222 223 ChangesetStatus.STATUS_APPROVED + '_closed',
223 224 'change_changeset_status': 'on',
224 225 'text': '',
225 226 'csrf_token': csrf_token},
226 227 status=302)
227 228
228 229 action = 'user_closed_pull_request:%d' % pull_request_id
229 230 journal = UserLog.query()\
230 231 .filter(UserLog.user_id == author)\
231 232 .filter(UserLog.repository_id == repo)\
232 233 .filter(UserLog.action == action)\
233 234 .all()
234 235 assert len(journal) == 1
235 236
236 237 def test_reject_and_close_pull_request(self, pr_util, csrf_token):
237 238 pull_request = pr_util.create_pull_request()
238 239 pull_request_id = pull_request.pull_request_id
239 240 response = self.app.post(
240 241 url(controller='pullrequests',
241 242 action='update',
242 243 repo_name=pull_request.target_repo.scm_instance().name,
243 244 pull_request_id=str(pull_request.pull_request_id)),
244 245 params={'close_pull_request': 'true', '_method': 'put',
245 246 'csrf_token': csrf_token})
246 247
247 248 pull_request = PullRequest.get(pull_request_id)
248 249
249 250 assert response.json is True
250 251 assert pull_request.is_closed()
251 252
252 253 # check only the latest status, not the review status
253 254 status = ChangesetStatusModel().get_status(
254 255 pull_request.source_repo, pull_request=pull_request)
255 256 assert status == ChangesetStatus.STATUS_REJECTED
256 257
257 258 def test_comment_force_close_pull_request(self, pr_util, csrf_token):
258 259 pull_request = pr_util.create_pull_request()
259 260 pull_request_id = pull_request.pull_request_id
260 261 reviewers_ids = [1, 2]
261 262 PullRequestModel().update_reviewers(pull_request_id, reviewers_ids)
262 263 author = pull_request.user_id
263 264 repo = pull_request.target_repo.repo_id
264 265 self.app.post(
265 266 url(controller='pullrequests',
266 267 action='comment',
267 268 repo_name=pull_request.target_repo.scm_instance().name,
268 269 pull_request_id=str(pull_request_id)),
269 270 params={
270 271 'changeset_status': 'forced_closed',
271 272 'csrf_token': csrf_token},
272 273 status=302)
273 274
274 275 pull_request = PullRequest.get(pull_request_id)
275 276
276 277 action = 'user_closed_pull_request:%d' % pull_request_id
277 278 journal = UserLog.query().filter(
278 279 UserLog.user_id == author,
279 280 UserLog.repository_id == repo,
280 281 UserLog.action == action).all()
281 282 assert len(journal) == 1
282 283
283 284 # check only the latest status, not the review status
284 285 status = ChangesetStatusModel().get_status(
285 286 pull_request.source_repo, pull_request=pull_request)
286 287 assert status == ChangesetStatus.STATUS_REJECTED
287 288
288 289 def test_create_pull_request(self, backend, csrf_token):
289 290 commits = [
290 291 {'message': 'ancestor'},
291 292 {'message': 'change'},
292 293 ]
293 294 commit_ids = backend.create_master_repo(commits)
294 295 target = backend.create_repo(heads=['ancestor'])
295 296 source = backend.create_repo(heads=['change'])
296 297
297 298 response = self.app.post(
298 299 url(
299 300 controller='pullrequests',
300 301 action='create',
301 302 repo_name=source.repo_name),
302 303 params={
303 304 'source_repo': source.repo_name,
304 305 'source_ref': 'branch:default:' + commit_ids['change'],
305 306 'target_repo': target.repo_name,
306 307 'target_ref': 'branch:default:' + commit_ids['ancestor'],
307 308 'pullrequest_desc': 'Description',
308 309 'pullrequest_title': 'Title',
309 310 'review_members': '1',
310 311 'revisions': commit_ids['change'],
311 312 'user': '',
312 313 'csrf_token': csrf_token,
313 314 },
314 315 status=302)
315 316
316 317 location = response.headers['Location']
317 318 pull_request_id = int(location.rsplit('/', 1)[1])
318 319 pull_request = PullRequest.get(pull_request_id)
319 320
320 321 # check that we have now both revisions
321 322 assert pull_request.revisions == [commit_ids['change']]
322 323 assert pull_request.source_ref == 'branch:default:' + commit_ids['change']
323 324 expected_target_ref = 'branch:default:' + commit_ids['ancestor']
324 325 assert pull_request.target_ref == expected_target_ref
325 326
326 327 def test_reviewer_notifications(self, backend, csrf_token):
327 328 # We have to use the app.post for this test so it will create the
328 329 # notifications properly with the new PR
329 330 commits = [
330 331 {'message': 'ancestor',
331 332 'added': [FileNode('file_A', content='content_of_ancestor')]},
332 333 {'message': 'change',
333 334 'added': [FileNode('file_a', content='content_of_change')]},
334 335 {'message': 'change-child'},
335 336 {'message': 'ancestor-child', 'parents': ['ancestor'],
336 337 'added': [
337 338 FileNode('file_B', content='content_of_ancestor_child')]},
338 339 {'message': 'ancestor-child-2'},
339 340 ]
340 341 commit_ids = backend.create_master_repo(commits)
341 342 target = backend.create_repo(heads=['ancestor-child'])
342 343 source = backend.create_repo(heads=['change'])
343 344
344 345 response = self.app.post(
345 346 url(
346 347 controller='pullrequests',
347 348 action='create',
348 349 repo_name=source.repo_name),
349 350 params={
350 351 'source_repo': source.repo_name,
351 352 'source_ref': 'branch:default:' + commit_ids['change'],
352 353 'target_repo': target.repo_name,
353 354 'target_ref': 'branch:default:' + commit_ids['ancestor-child'],
354 355 'pullrequest_desc': 'Description',
355 356 'pullrequest_title': 'Title',
356 357 'review_members': '2',
357 358 'revisions': commit_ids['change'],
358 359 'user': '',
359 360 'csrf_token': csrf_token,
360 361 },
361 362 status=302)
362 363
363 364 location = response.headers['Location']
364 365 pull_request_id = int(location.rsplit('/', 1)[1])
365 366 pull_request = PullRequest.get(pull_request_id)
366 367
367 368 # Check that a notification was made
368 369 notifications = Notification.query()\
369 370 .filter(Notification.created_by == pull_request.author.user_id,
370 371 Notification.type_ == Notification.TYPE_PULL_REQUEST,
371 372 Notification.subject.contains("wants you to review "
372 373 "pull request #%d"
373 374 % pull_request_id))
374 375 assert len(notifications.all()) == 1
375 376
376 377 # Change reviewers and check that a notification was made
377 378 PullRequestModel().update_reviewers(pull_request.pull_request_id, [1])
378 379 assert len(notifications.all()) == 2
379 380
380 381 def test_create_pull_request_stores_ancestor_commit_id(self, backend,
381 382 csrf_token):
382 383 commits = [
383 384 {'message': 'ancestor',
384 385 'added': [FileNode('file_A', content='content_of_ancestor')]},
385 386 {'message': 'change',
386 387 'added': [FileNode('file_a', content='content_of_change')]},
387 388 {'message': 'change-child'},
388 389 {'message': 'ancestor-child', 'parents': ['ancestor'],
389 390 'added': [
390 391 FileNode('file_B', content='content_of_ancestor_child')]},
391 392 {'message': 'ancestor-child-2'},
392 393 ]
393 394 commit_ids = backend.create_master_repo(commits)
394 395 target = backend.create_repo(heads=['ancestor-child'])
395 396 source = backend.create_repo(heads=['change'])
396 397
397 398 response = self.app.post(
398 399 url(
399 400 controller='pullrequests',
400 401 action='create',
401 402 repo_name=source.repo_name),
402 403 params={
403 404 'source_repo': source.repo_name,
404 405 'source_ref': 'branch:default:' + commit_ids['change'],
405 406 'target_repo': target.repo_name,
406 407 'target_ref': 'branch:default:' + commit_ids['ancestor-child'],
407 408 'pullrequest_desc': 'Description',
408 409 'pullrequest_title': 'Title',
409 410 'review_members': '1',
410 411 'revisions': commit_ids['change'],
411 412 'user': '',
412 413 'csrf_token': csrf_token,
413 414 },
414 415 status=302)
415 416
416 417 location = response.headers['Location']
417 418 pull_request_id = int(location.rsplit('/', 1)[1])
418 419 pull_request = PullRequest.get(pull_request_id)
419 420
420 421 # target_ref has to point to the ancestor's commit_id in order to
421 422 # show the correct diff
422 423 expected_target_ref = 'branch:default:' + commit_ids['ancestor']
423 424 assert pull_request.target_ref == expected_target_ref
424 425
425 426 # Check generated diff contents
426 427 response = response.follow()
427 428 assert 'content_of_ancestor' not in response.body
428 429 assert 'content_of_ancestor-child' not in response.body
429 430 assert 'content_of_change' in response.body
430 431
431 432 def test_merge_pull_request_enabled(self, pr_util, csrf_token):
432 433 # Clear any previous calls to rcextensions
433 434 rhodecode.EXTENSIONS.calls.clear()
434 435
435 436 pull_request = pr_util.create_pull_request(
436 437 approved=True, mergeable=True)
437 438 pull_request_id = pull_request.pull_request_id
438 439 repo_name = pull_request.target_repo.scm_instance().name,
439 440
440 441 response = self.app.post(
441 442 url(controller='pullrequests',
442 443 action='merge',
443 444 repo_name=str(repo_name[0]),
444 445 pull_request_id=str(pull_request_id)),
445 446 params={'csrf_token': csrf_token}).follow()
446 447
447 448 pull_request = PullRequest.get(pull_request_id)
448 449
449 450 assert response.status_int == 200
450 451 assert pull_request.is_closed()
451 452 assert_pull_request_status(
452 453 pull_request, ChangesetStatus.STATUS_APPROVED)
453 454
454 455 # Check the relevant log entries were added
455 456 user_logs = UserLog.query().order_by('-user_log_id').limit(4)
456 457 actions = [log.action for log in user_logs]
457 458 pr_commit_ids = PullRequestModel()._get_commit_ids(pull_request)
458 459 expected_actions = [
459 460 u'user_closed_pull_request:%d' % pull_request_id,
460 461 u'user_merged_pull_request:%d' % pull_request_id,
461 462 # The action below reflect that the post push actions were executed
462 463 u'user_commented_pull_request:%d' % pull_request_id,
463 464 u'push:%s' % ','.join(pr_commit_ids),
464 465 ]
465 466 assert actions == expected_actions
466 467
467 468 # Check post_push rcextension was really executed
468 469 push_calls = rhodecode.EXTENSIONS.calls['post_push']
469 470 assert len(push_calls) == 1
470 471 unused_last_call_args, last_call_kwargs = push_calls[0]
471 472 assert last_call_kwargs['action'] == 'push'
472 473 assert last_call_kwargs['pushed_revs'] == pr_commit_ids
473 474
474 475 def test_merge_pull_request_disabled(self, pr_util, csrf_token):
475 476 pull_request = pr_util.create_pull_request(mergeable=False)
476 477 pull_request_id = pull_request.pull_request_id
477 478 pull_request = PullRequest.get(pull_request_id)
478 479
479 480 response = self.app.post(
480 481 url(controller='pullrequests',
481 482 action='merge',
482 483 repo_name=pull_request.target_repo.scm_instance().name,
483 484 pull_request_id=str(pull_request.pull_request_id)),
484 485 params={'csrf_token': csrf_token}).follow()
485 486
486 487 assert response.status_int == 200
487 488 assert 'Server-side pull request merging is disabled.' in response.body
488 489
489 490 @pytest.mark.skip_backends('svn')
490 491 def test_merge_pull_request_not_approved(self, pr_util, csrf_token):
491 492 pull_request = pr_util.create_pull_request(mergeable=True)
492 493 pull_request_id = pull_request.pull_request_id
493 494 repo_name = pull_request.target_repo.scm_instance().name,
494 495
495 496 response = self.app.post(
496 497 url(controller='pullrequests',
497 498 action='merge',
498 499 repo_name=str(repo_name[0]),
499 500 pull_request_id=str(pull_request_id)),
500 501 params={'csrf_token': csrf_token}).follow()
501 502
502 503 pull_request = PullRequest.get(pull_request_id)
503 504
504 505 assert response.status_int == 200
505 506 assert ' Reviewer approval is pending.' in response.body
506 507
507 508 def test_update_source_revision(self, backend, csrf_token):
508 509 commits = [
509 510 {'message': 'ancestor'},
510 511 {'message': 'change'},
511 512 {'message': 'change-2'},
512 513 ]
513 514 commit_ids = backend.create_master_repo(commits)
514 515 target = backend.create_repo(heads=['ancestor'])
515 516 source = backend.create_repo(heads=['change'])
516 517
517 518 # create pr from a in source to A in target
518 519 pull_request = PullRequest()
519 520 pull_request.source_repo = source
520 521 # TODO: johbo: Make sure that we write the source ref this way!
521 522 pull_request.source_ref = 'branch:{branch}:{commit_id}'.format(
522 523 branch=backend.default_branch_name, commit_id=commit_ids['change'])
523 524 pull_request.target_repo = target
524 525
525 526 pull_request.target_ref = 'branch:{branch}:{commit_id}'.format(
526 527 branch=backend.default_branch_name,
527 528 commit_id=commit_ids['ancestor'])
528 529 pull_request.revisions = [commit_ids['change']]
529 530 pull_request.title = u"Test"
530 531 pull_request.description = u"Description"
531 532 pull_request.author = UserModel().get_by_username(
532 533 TEST_USER_ADMIN_LOGIN)
533 534 Session().add(pull_request)
534 535 Session().commit()
535 536 pull_request_id = pull_request.pull_request_id
536 537
537 538 # source has ancestor - change - change-2
538 539 backend.pull_heads(source, heads=['change-2'])
539 540
540 541 # update PR
541 542 self.app.post(
542 543 url(controller='pullrequests', action='update',
543 544 repo_name=target.repo_name,
544 545 pull_request_id=str(pull_request_id)),
545 546 params={'update_commits': 'true', '_method': 'put',
546 547 'csrf_token': csrf_token})
547 548
548 549 # check that we have now both revisions
549 550 pull_request = PullRequest.get(pull_request_id)
550 551 assert pull_request.revisions == [
551 552 commit_ids['change-2'], commit_ids['change']]
552 553
553 554 # TODO: johbo: this should be a test on its own
554 555 response = self.app.get(url(
555 556 controller='pullrequests', action='index',
556 557 repo_name=target.repo_name))
557 558 assert response.status_int == 200
558 559 assert 'Pull request updated to' in response.body
559 560 assert 'with 1 added, 0 removed commits.' in response.body
560 561
561 562 def test_update_target_revision(self, backend, csrf_token):
562 563 commits = [
563 564 {'message': 'ancestor'},
564 565 {'message': 'change'},
565 566 {'message': 'ancestor-new', 'parents': ['ancestor']},
566 567 {'message': 'change-rebased'},
567 568 ]
568 569 commit_ids = backend.create_master_repo(commits)
569 570 target = backend.create_repo(heads=['ancestor'])
570 571 source = backend.create_repo(heads=['change'])
571 572
572 573 # create pr from a in source to A in target
573 574 pull_request = PullRequest()
574 575 pull_request.source_repo = source
575 576 # TODO: johbo: Make sure that we write the source ref this way!
576 577 pull_request.source_ref = 'branch:{branch}:{commit_id}'.format(
577 578 branch=backend.default_branch_name, commit_id=commit_ids['change'])
578 579 pull_request.target_repo = target
579 580 # TODO: johbo: Target ref should be branch based, since tip can jump
580 581 # from branch to branch
581 582 pull_request.target_ref = 'branch:{branch}:{commit_id}'.format(
582 583 branch=backend.default_branch_name,
583 584 commit_id=commit_ids['ancestor'])
584 585 pull_request.revisions = [commit_ids['change']]
585 586 pull_request.title = u"Test"
586 587 pull_request.description = u"Description"
587 588 pull_request.author = UserModel().get_by_username(
588 589 TEST_USER_ADMIN_LOGIN)
589 590 Session().add(pull_request)
590 591 Session().commit()
591 592 pull_request_id = pull_request.pull_request_id
592 593
593 594 # target has ancestor - ancestor-new
594 595 # source has ancestor - ancestor-new - change-rebased
595 596 backend.pull_heads(target, heads=['ancestor-new'])
596 597 backend.pull_heads(source, heads=['change-rebased'])
597 598
598 599 # update PR
599 600 self.app.post(
600 601 url(controller='pullrequests', action='update',
601 602 repo_name=target.repo_name,
602 603 pull_request_id=str(pull_request_id)),
603 604 params={'update_commits': 'true', '_method': 'put',
604 605 'csrf_token': csrf_token},
605 606 status=200)
606 607
607 608 # check that we have now both revisions
608 609 pull_request = PullRequest.get(pull_request_id)
609 610 assert pull_request.revisions == [commit_ids['change-rebased']]
610 611 assert pull_request.target_ref == 'branch:{branch}:{commit_id}'.format(
611 612 branch=backend.default_branch_name,
612 613 commit_id=commit_ids['ancestor-new'])
613 614
614 615 # TODO: johbo: This should be a test on its own
615 616 response = self.app.get(url(
616 617 controller='pullrequests', action='index',
617 618 repo_name=target.repo_name))
618 619 assert response.status_int == 200
619 620 assert 'Pull request updated to' in response.body
620 621 assert 'with 1 added, 1 removed commits.' in response.body
621 622
622 623 def test_update_of_ancestor_reference(self, backend, csrf_token):
623 624 commits = [
624 625 {'message': 'ancestor'},
625 626 {'message': 'change'},
626 627 {'message': 'change-2'},
627 628 {'message': 'ancestor-new', 'parents': ['ancestor']},
628 629 {'message': 'change-rebased'},
629 630 ]
630 631 commit_ids = backend.create_master_repo(commits)
631 632 target = backend.create_repo(heads=['ancestor'])
632 633 source = backend.create_repo(heads=['change'])
633 634
634 635 # create pr from a in source to A in target
635 636 pull_request = PullRequest()
636 637 pull_request.source_repo = source
637 638 # TODO: johbo: Make sure that we write the source ref this way!
638 639 pull_request.source_ref = 'branch:{branch}:{commit_id}'.format(
639 640 branch=backend.default_branch_name,
640 641 commit_id=commit_ids['change'])
641 642 pull_request.target_repo = target
642 643 # TODO: johbo: Target ref should be branch based, since tip can jump
643 644 # from branch to branch
644 645 pull_request.target_ref = 'branch:{branch}:{commit_id}'.format(
645 646 branch=backend.default_branch_name,
646 647 commit_id=commit_ids['ancestor'])
647 648 pull_request.revisions = [commit_ids['change']]
648 649 pull_request.title = u"Test"
649 650 pull_request.description = u"Description"
650 651 pull_request.author = UserModel().get_by_username(
651 652 TEST_USER_ADMIN_LOGIN)
652 653 Session().add(pull_request)
653 654 Session().commit()
654 655 pull_request_id = pull_request.pull_request_id
655 656
656 657 # target has ancestor - ancestor-new
657 658 # source has ancestor - ancestor-new - change-rebased
658 659 backend.pull_heads(target, heads=['ancestor-new'])
659 660 backend.pull_heads(source, heads=['change-rebased'])
660 661
661 662 # update PR
662 663 self.app.post(
663 664 url(controller='pullrequests', action='update',
664 665 repo_name=target.repo_name,
665 666 pull_request_id=str(pull_request_id)),
666 667 params={'update_commits': 'true', '_method': 'put',
667 668 'csrf_token': csrf_token},
668 669 status=200)
669 670
670 671 # Expect the target reference to be updated correctly
671 672 pull_request = PullRequest.get(pull_request_id)
672 673 assert pull_request.revisions == [commit_ids['change-rebased']]
673 674 expected_target_ref = 'branch:{branch}:{commit_id}'.format(
674 675 branch=backend.default_branch_name,
675 676 commit_id=commit_ids['ancestor-new'])
676 677 assert pull_request.target_ref == expected_target_ref
677 678
678 679 def test_remove_pull_request_branch(self, backend_git, csrf_token):
679 680 branch_name = 'development'
680 681 commits = [
681 682 {'message': 'initial-commit'},
682 683 {'message': 'old-feature'},
683 684 {'message': 'new-feature', 'branch': branch_name},
684 685 ]
685 686 repo = backend_git.create_repo(commits)
686 687 commit_ids = backend_git.commit_ids
687 688
688 689 pull_request = PullRequest()
689 690 pull_request.source_repo = repo
690 691 pull_request.target_repo = repo
691 692 pull_request.source_ref = 'branch:{branch}:{commit_id}'.format(
692 693 branch=branch_name, commit_id=commit_ids['new-feature'])
693 694 pull_request.target_ref = 'branch:{branch}:{commit_id}'.format(
694 695 branch=backend_git.default_branch_name,
695 696 commit_id=commit_ids['old-feature'])
696 697 pull_request.revisions = [commit_ids['new-feature']]
697 698 pull_request.title = u"Test"
698 699 pull_request.description = u"Description"
699 700 pull_request.author = UserModel().get_by_username(
700 701 TEST_USER_ADMIN_LOGIN)
701 702 Session().add(pull_request)
702 703 Session().commit()
703 704
704 705 vcs = repo.scm_instance()
705 706 vcs.remove_ref('refs/heads/{}'.format(branch_name))
706 707
707 708 response = self.app.get(url(
708 709 controller='pullrequests', action='show',
709 710 repo_name=repo.repo_name,
710 711 pull_request_id=str(pull_request.pull_request_id)))
711 712
712 713 assert response.status_int == 200
713 714 assert_response = AssertResponse(response)
714 715 assert_response.element_contains(
715 716 '#changeset_compare_view_content .alert strong',
716 717 'Missing commits')
717 718 assert_response.element_contains(
718 719 '#changeset_compare_view_content .alert',
719 720 'This pull request cannot be displayed, because one or more'
720 721 ' commits no longer exist in the source repository.')
721 722
722 723 def test_strip_commits_from_pull_request(
723 724 self, backend, pr_util, csrf_token):
724 725 commits = [
725 726 {'message': 'initial-commit'},
726 727 {'message': 'old-feature'},
727 728 {'message': 'new-feature', 'parents': ['initial-commit']},
728 729 ]
729 730 pull_request = pr_util.create_pull_request(
730 731 commits, target_head='initial-commit', source_head='new-feature',
731 732 revisions=['new-feature'])
732 733
733 734 vcs = pr_util.source_repository.scm_instance()
734 735 if backend.alias == 'git':
735 736 vcs.strip(pr_util.commit_ids['new-feature'], branch_name='master')
736 737 else:
737 738 vcs.strip(pr_util.commit_ids['new-feature'])
738 739
739 740 response = self.app.get(url(
740 741 controller='pullrequests', action='show',
741 742 repo_name=pr_util.target_repository.repo_name,
742 743 pull_request_id=str(pull_request.pull_request_id)))
743 744
744 745 assert response.status_int == 200
745 746 assert_response = AssertResponse(response)
746 747 assert_response.element_contains(
747 748 '#changeset_compare_view_content .alert strong',
748 749 'Missing commits')
749 750 assert_response.element_contains(
750 751 '#changeset_compare_view_content .alert',
751 752 'This pull request cannot be displayed, because one or more'
752 753 ' commits no longer exist in the source repository.')
753 754 assert_response.element_contains(
754 755 '#update_commits',
755 756 'Update commits')
756 757
757 758 def test_strip_commits_and_update(
758 759 self, backend, pr_util, csrf_token):
759 760 commits = [
760 761 {'message': 'initial-commit'},
761 762 {'message': 'old-feature'},
762 763 {'message': 'new-feature', 'parents': ['old-feature']},
763 764 ]
764 765 pull_request = pr_util.create_pull_request(
765 766 commits, target_head='old-feature', source_head='new-feature',
766 767 revisions=['new-feature'], mergeable=True)
767 768
768 769 vcs = pr_util.source_repository.scm_instance()
769 770 if backend.alias == 'git':
770 771 vcs.strip(pr_util.commit_ids['new-feature'], branch_name='master')
771 772 else:
772 773 vcs.strip(pr_util.commit_ids['new-feature'])
773 774
774 775 response = self.app.post(
775 776 url(controller='pullrequests', action='update',
776 777 repo_name=pull_request.target_repo.repo_name,
777 778 pull_request_id=str(pull_request.pull_request_id)),
778 779 params={'update_commits': 'true', '_method': 'put',
779 780 'csrf_token': csrf_token})
780 781
781 782 assert response.status_int == 200
782 783 assert response.body == 'true'
783 784
784 785 # Make sure that after update, it won't raise 500 errors
785 786 response = self.app.get(url(
786 787 controller='pullrequests', action='show',
787 788 repo_name=pr_util.target_repository.repo_name,
788 789 pull_request_id=str(pull_request.pull_request_id)))
789 790
790 791 assert response.status_int == 200
791 792 assert_response = AssertResponse(response)
792 793 assert_response.element_contains(
793 794 '#changeset_compare_view_content .alert strong',
794 795 'Missing commits')
795 796
796 797 def test_branch_is_a_link(self, pr_util):
797 798 pull_request = pr_util.create_pull_request()
798 799 pull_request.source_ref = 'branch:origin:1234567890abcdef'
799 800 pull_request.target_ref = 'branch:target:abcdef1234567890'
800 801 Session().add(pull_request)
801 802 Session().commit()
802 803
803 804 response = self.app.get(url(
804 805 controller='pullrequests', action='show',
805 806 repo_name=pull_request.target_repo.scm_instance().name,
806 807 pull_request_id=str(pull_request.pull_request_id)))
807 808 assert response.status_int == 200
808 809 assert_response = AssertResponse(response)
809 810
810 811 origin = assert_response.get_element('.pr-origininfo .tag')
811 812 origin_children = origin.getchildren()
812 813 assert len(origin_children) == 1
813 814 target = assert_response.get_element('.pr-targetinfo .tag')
814 815 target_children = target.getchildren()
815 816 assert len(target_children) == 1
816 817
817 818 expected_origin_link = url(
818 819 'changelog_home',
819 820 repo_name=pull_request.source_repo.scm_instance().name,
820 821 branch='origin')
821 822 expected_target_link = url(
822 823 'changelog_home',
823 824 repo_name=pull_request.target_repo.scm_instance().name,
824 825 branch='target')
825 826 assert origin_children[0].attrib['href'] == expected_origin_link
826 827 assert origin_children[0].text == 'branch: origin'
827 828 assert target_children[0].attrib['href'] == expected_target_link
828 829 assert target_children[0].text == 'branch: target'
829 830
830 831 def test_bookmark_is_not_a_link(self, pr_util):
831 832 pull_request = pr_util.create_pull_request()
832 833 pull_request.source_ref = 'bookmark:origin:1234567890abcdef'
833 834 pull_request.target_ref = 'bookmark:target:abcdef1234567890'
834 835 Session().add(pull_request)
835 836 Session().commit()
836 837
837 838 response = self.app.get(url(
838 839 controller='pullrequests', action='show',
839 840 repo_name=pull_request.target_repo.scm_instance().name,
840 841 pull_request_id=str(pull_request.pull_request_id)))
841 842 assert response.status_int == 200
842 843 assert_response = AssertResponse(response)
843 844
844 845 origin = assert_response.get_element('.pr-origininfo .tag')
845 846 assert origin.text.strip() == 'bookmark: origin'
846 847 assert origin.getchildren() == []
847 848
848 849 target = assert_response.get_element('.pr-targetinfo .tag')
849 850 assert target.text.strip() == 'bookmark: target'
850 851 assert target.getchildren() == []
851 852
852 853 def test_tag_is_not_a_link(self, pr_util):
853 854 pull_request = pr_util.create_pull_request()
854 855 pull_request.source_ref = 'tag:origin:1234567890abcdef'
855 856 pull_request.target_ref = 'tag:target:abcdef1234567890'
856 857 Session().add(pull_request)
857 858 Session().commit()
858 859
859 860 response = self.app.get(url(
860 861 controller='pullrequests', action='show',
861 862 repo_name=pull_request.target_repo.scm_instance().name,
862 863 pull_request_id=str(pull_request.pull_request_id)))
863 864 assert response.status_int == 200
864 865 assert_response = AssertResponse(response)
865 866
866 867 origin = assert_response.get_element('.pr-origininfo .tag')
867 868 assert origin.text.strip() == 'tag: origin'
868 869 assert origin.getchildren() == []
869 870
870 871 target = assert_response.get_element('.pr-targetinfo .tag')
871 872 assert target.text.strip() == 'tag: target'
872 873 assert target.getchildren() == []
873 874
874 875 def test_description_is_escaped_on_index_page(self, backend, pr_util):
875 876 xss_description = "<script>alert('Hi!')</script>"
876 877 pull_request = pr_util.create_pull_request(description=xss_description)
877 878 response = self.app.get(url(
878 879 controller='pullrequests', action='show_all',
879 880 repo_name=pull_request.target_repo.repo_name))
880 881 response.mustcontain(
881 882 "&lt;script&gt;alert(&#39;Hi!&#39;)&lt;/script&gt;")
882 883
883 884
884 885 def assert_pull_request_status(pull_request, expected_status):
885 886 status = ChangesetStatusModel().calculated_review_status(
886 887 pull_request=pull_request)
887 888 assert status == expected_status
888 889
889 890
890 891 @pytest.mark.parametrize('action', ['show_all', 'index', 'create'])
891 892 @pytest.mark.usefixtures("autologin_user")
892 893 def test_redirects_to_repo_summary_for_svn_repositories(
893 894 backend_svn, app, action):
894 895 denied_actions = ['show_all', 'index', 'create']
895 896 for action in denied_actions:
896 897 response = app.get(url(
897 898 controller='pullrequests', action=action,
898 899 repo_name=backend_svn.repo_name))
899 900 assert response.status_int == 302
900 901
901 902 # Not allowed, redirect to the summary
902 903 redirected = response.follow()
903 904 summary_url = url('summary_home', repo_name=backend_svn.repo_name)
904 905
905 906 # URL adds leading slash and path doesn't have it
906 907 assert redirected.req.path == summary_url
907 908
908 909
909 910 def test_delete_comment_returns_404_if_comment_does_not_exist(pylonsapp):
910 911 # TODO: johbo: Global import not possible because models.forms blows up
911 912 from rhodecode.controllers.pullrequests import PullrequestsController
912 913 controller = PullrequestsController()
913 914 patcher = mock.patch(
914 915 'rhodecode.model.db.BaseModel.get', return_value=None)
915 916 with pytest.raises(HTTPNotFound), patcher:
916 917 controller._delete_comment(1)
General Comments 0
You need to be logged in to leave comments. Login now