##// END OF EJS Templates
pytest: Use hardcoded login URLs in tests....
johbo -
r40:bc180d4b default
parent child Browse files
Show More
@@ -1,254 +1,254 b''
1 # -*- coding: utf-8 -*-
1 # -*- coding: utf-8 -*-
2
2
3 # Copyright (C) 2010-2016 RhodeCode GmbH
3 # Copyright (C) 2010-2016 RhodeCode GmbH
4 #
4 #
5 # This program is free software: you can redistribute it and/or modify
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU Affero General Public License, version 3
6 # it under the terms of the GNU Affero General Public License, version 3
7 # (only), as published by the Free Software Foundation.
7 # (only), as published by the Free Software Foundation.
8 #
8 #
9 # This program is distributed in the hope that it will be useful,
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
12 # GNU General Public License for more details.
13 #
13 #
14 # You should have received a copy of the GNU Affero General Public License
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 #
16 #
17 # This program is dual-licensed. If you wish to learn more about the
17 # This program is dual-licensed. If you wish to learn more about the
18 # RhodeCode Enterprise Edition, including its added features, Support services,
18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20
20
21 import os
21 import os
22 import time
22 import time
23 import logging
23 import logging
24 import datetime
24 import datetime
25 import hashlib
25 import hashlib
26 import tempfile
26 import tempfile
27 from os.path import join as jn
27 from os.path import join as jn
28
28
29 from tempfile import _RandomNameSequence
29 from tempfile import _RandomNameSequence
30
30
31 from paste.deploy import loadapp
31 from paste.deploy import loadapp
32 from paste.script.appinstall import SetupCommand
32 from paste.script.appinstall import SetupCommand
33
33
34 import pylons
34 import pylons
35 import pylons.test
35 import pylons.test
36 from pylons import config, url
36 from pylons import config, url
37 from pylons.i18n.translation import _get_translator
37 from pylons.i18n.translation import _get_translator
38 from pylons.util import ContextObj
38 from pylons.util import ContextObj
39
39
40 from routes.util import URLGenerator
40 from routes.util import URLGenerator
41 from webtest import TestApp
41 from webtest import TestApp
42 from nose.plugins.skip import SkipTest
42 from nose.plugins.skip import SkipTest
43 import pytest
43 import pytest
44
44
45 from rhodecode import is_windows
45 from rhodecode import is_windows
46 from rhodecode.config.routing import ADMIN_PREFIX
46 from rhodecode.model.meta import Session
47 from rhodecode.model.meta import Session
47 from rhodecode.model.db import User
48 from rhodecode.model.db import User
48 from rhodecode.lib import auth
49 from rhodecode.lib import auth
49 from rhodecode.lib.helpers import flash, link_to
50 from rhodecode.lib.helpers import flash, link_to
50 from rhodecode.lib.utils2 import safe_unicode, safe_str
51 from rhodecode.lib.utils2 import safe_unicode, safe_str
51
52
52 # TODO: johbo: Solve time zone related issues and remove this tweak
53 # TODO: johbo: Solve time zone related issues and remove this tweak
53 os.environ['TZ'] = 'UTC'
54 os.environ['TZ'] = 'UTC'
54 if not is_windows:
55 if not is_windows:
55 time.tzset()
56 time.tzset()
56
57
57 log = logging.getLogger(__name__)
58 log = logging.getLogger(__name__)
58
59
59 __all__ = [
60 __all__ = [
60 'get_new_dir', 'TestController', 'SkipTest',
61 'get_new_dir', 'TestController', 'SkipTest',
61 'url', 'link_to', 'ldap_lib_installed', 'clear_all_caches',
62 'url', 'link_to', 'ldap_lib_installed', 'clear_all_caches',
62 'assert_session_flash', 'login_user',
63 'assert_session_flash', 'login_user',
63 'TESTS_TMP_PATH', 'HG_REPO', 'GIT_REPO', 'SVN_REPO',
64 'TESTS_TMP_PATH', 'HG_REPO', 'GIT_REPO', 'SVN_REPO',
64 'NEW_HG_REPO', 'NEW_GIT_REPO',
65 'NEW_HG_REPO', 'NEW_GIT_REPO',
65 'HG_FORK', 'GIT_FORK', 'TEST_USER_ADMIN_LOGIN', 'TEST_USER_ADMIN_PASS',
66 'HG_FORK', 'GIT_FORK', 'TEST_USER_ADMIN_LOGIN', 'TEST_USER_ADMIN_PASS',
66 'TEST_USER_REGULAR_LOGIN', 'TEST_USER_REGULAR_PASS',
67 'TEST_USER_REGULAR_LOGIN', 'TEST_USER_REGULAR_PASS',
67 'TEST_USER_REGULAR_EMAIL', 'TEST_USER_REGULAR2_LOGIN',
68 'TEST_USER_REGULAR_EMAIL', 'TEST_USER_REGULAR2_LOGIN',
68 'TEST_USER_REGULAR2_PASS', 'TEST_USER_REGULAR2_EMAIL', 'TEST_HG_REPO',
69 'TEST_USER_REGULAR2_PASS', 'TEST_USER_REGULAR2_EMAIL', 'TEST_HG_REPO',
69 'TEST_HG_REPO_CLONE', 'TEST_HG_REPO_PULL', 'TEST_GIT_REPO',
70 'TEST_HG_REPO_CLONE', 'TEST_HG_REPO_PULL', 'TEST_GIT_REPO',
70 'TEST_GIT_REPO_CLONE', 'TEST_GIT_REPO_PULL', 'SCM_TESTS',
71 'TEST_GIT_REPO_CLONE', 'TEST_GIT_REPO_PULL', 'SCM_TESTS',
71 ]
72 ]
72
73
73 # Invoke websetup with the current config file
74 # Invoke websetup with the current config file
74 # SetupCommand('setup-app').run([config_file])
75 # SetupCommand('setup-app').run([config_file])
75
76
76 # SOME GLOBALS FOR TESTS
77 # SOME GLOBALS FOR TESTS
77 TEST_DIR = tempfile.gettempdir()
78 TEST_DIR = tempfile.gettempdir()
78
79
79 TESTS_TMP_PATH = jn(TEST_DIR, 'rc_test_%s' % _RandomNameSequence().next())
80 TESTS_TMP_PATH = jn(TEST_DIR, 'rc_test_%s' % _RandomNameSequence().next())
80 TEST_USER_ADMIN_LOGIN = 'test_admin'
81 TEST_USER_ADMIN_LOGIN = 'test_admin'
81 TEST_USER_ADMIN_PASS = 'test12'
82 TEST_USER_ADMIN_PASS = 'test12'
82 TEST_USER_ADMIN_EMAIL = 'test_admin@mail.com'
83 TEST_USER_ADMIN_EMAIL = 'test_admin@mail.com'
83
84
84 TEST_USER_REGULAR_LOGIN = 'test_regular'
85 TEST_USER_REGULAR_LOGIN = 'test_regular'
85 TEST_USER_REGULAR_PASS = 'test12'
86 TEST_USER_REGULAR_PASS = 'test12'
86 TEST_USER_REGULAR_EMAIL = 'test_regular@mail.com'
87 TEST_USER_REGULAR_EMAIL = 'test_regular@mail.com'
87
88
88 TEST_USER_REGULAR2_LOGIN = 'test_regular2'
89 TEST_USER_REGULAR2_LOGIN = 'test_regular2'
89 TEST_USER_REGULAR2_PASS = 'test12'
90 TEST_USER_REGULAR2_PASS = 'test12'
90 TEST_USER_REGULAR2_EMAIL = 'test_regular2@mail.com'
91 TEST_USER_REGULAR2_EMAIL = 'test_regular2@mail.com'
91
92
92 HG_REPO = 'vcs_test_hg'
93 HG_REPO = 'vcs_test_hg'
93 GIT_REPO = 'vcs_test_git'
94 GIT_REPO = 'vcs_test_git'
94 SVN_REPO = 'vcs_test_svn'
95 SVN_REPO = 'vcs_test_svn'
95
96
96 NEW_HG_REPO = 'vcs_test_hg_new'
97 NEW_HG_REPO = 'vcs_test_hg_new'
97 NEW_GIT_REPO = 'vcs_test_git_new'
98 NEW_GIT_REPO = 'vcs_test_git_new'
98
99
99 HG_FORK = 'vcs_test_hg_fork'
100 HG_FORK = 'vcs_test_hg_fork'
100 GIT_FORK = 'vcs_test_git_fork'
101 GIT_FORK = 'vcs_test_git_fork'
101
102
102 ## VCS
103 ## VCS
103 SCM_TESTS = ['hg', 'git']
104 SCM_TESTS = ['hg', 'git']
104 uniq_suffix = str(int(time.mktime(datetime.datetime.now().timetuple())))
105 uniq_suffix = str(int(time.mktime(datetime.datetime.now().timetuple())))
105
106
106 TEST_GIT_REPO = jn(TESTS_TMP_PATH, GIT_REPO)
107 TEST_GIT_REPO = jn(TESTS_TMP_PATH, GIT_REPO)
107 TEST_GIT_REPO_CLONE = jn(TESTS_TMP_PATH, 'vcsgitclone%s' % uniq_suffix)
108 TEST_GIT_REPO_CLONE = jn(TESTS_TMP_PATH, 'vcsgitclone%s' % uniq_suffix)
108 TEST_GIT_REPO_PULL = jn(TESTS_TMP_PATH, 'vcsgitpull%s' % uniq_suffix)
109 TEST_GIT_REPO_PULL = jn(TESTS_TMP_PATH, 'vcsgitpull%s' % uniq_suffix)
109
110
110 TEST_HG_REPO = jn(TESTS_TMP_PATH, HG_REPO)
111 TEST_HG_REPO = jn(TESTS_TMP_PATH, HG_REPO)
111 TEST_HG_REPO_CLONE = jn(TESTS_TMP_PATH, 'vcshgclone%s' % uniq_suffix)
112 TEST_HG_REPO_CLONE = jn(TESTS_TMP_PATH, 'vcshgclone%s' % uniq_suffix)
112 TEST_HG_REPO_PULL = jn(TESTS_TMP_PATH, 'vcshgpull%s' % uniq_suffix)
113 TEST_HG_REPO_PULL = jn(TESTS_TMP_PATH, 'vcshgpull%s' % uniq_suffix)
113
114
114 TEST_REPO_PREFIX = 'vcs-test'
115 TEST_REPO_PREFIX = 'vcs-test'
115
116
116
117
117 # skip ldap tests if LDAP lib is not installed
118 # skip ldap tests if LDAP lib is not installed
118 ldap_lib_installed = False
119 ldap_lib_installed = False
119 try:
120 try:
120 import ldap
121 import ldap
121 ldap_lib_installed = True
122 ldap_lib_installed = True
122 except ImportError:
123 except ImportError:
123 # means that python-ldap is not installed
124 # means that python-ldap is not installed
124 pass
125 pass
125
126
126
127
127 def clear_all_caches():
128 def clear_all_caches():
128 from beaker.cache import cache_managers
129 from beaker.cache import cache_managers
129 for _cache in cache_managers.values():
130 for _cache in cache_managers.values():
130 _cache.clear()
131 _cache.clear()
131
132
132
133
133 def get_new_dir(title):
134 def get_new_dir(title):
134 """
135 """
135 Returns always new directory path.
136 Returns always new directory path.
136 """
137 """
137 from rhodecode.tests.vcs.utils import get_normalized_path
138 from rhodecode.tests.vcs.utils import get_normalized_path
138 name_parts = [TEST_REPO_PREFIX]
139 name_parts = [TEST_REPO_PREFIX]
139 if title:
140 if title:
140 name_parts.append(title)
141 name_parts.append(title)
141 hex_str = hashlib.sha1('%s %s' % (os.getpid(), time.time())).hexdigest()
142 hex_str = hashlib.sha1('%s %s' % (os.getpid(), time.time())).hexdigest()
142 name_parts.append(hex_str)
143 name_parts.append(hex_str)
143 name = '-'.join(name_parts)
144 name = '-'.join(name_parts)
144 path = os.path.join(TEST_DIR, name)
145 path = os.path.join(TEST_DIR, name)
145 return get_normalized_path(path)
146 return get_normalized_path(path)
146
147
147
148
148 @pytest.mark.usefixtures('app', 'index_location')
149 @pytest.mark.usefixtures('app', 'index_location')
149 class TestController(object):
150 class TestController(object):
150
151
151 maxDiff = None
152 maxDiff = None
152
153
153 def log_user(self, username=TEST_USER_ADMIN_LOGIN,
154 def log_user(self, username=TEST_USER_ADMIN_LOGIN,
154 password=TEST_USER_ADMIN_PASS):
155 password=TEST_USER_ADMIN_PASS):
155 self._logged_username = username
156 self._logged_username = username
156 self._session = login_user_session(self.app, username, password)
157 self._session = login_user_session(self.app, username, password)
157 self.csrf_token = auth.get_csrf_token(self._session)
158 self.csrf_token = auth.get_csrf_token(self._session)
158
159
159 return self._session['rhodecode_user']
160 return self._session['rhodecode_user']
160
161
161 def logout_user(self):
162 def logout_user(self):
162 logout_user_session(self.app, auth.get_csrf_token(self._session))
163 logout_user_session(self.app, auth.get_csrf_token(self._session))
163 self.csrf_token = None
164 self.csrf_token = None
164 self._logged_username = None
165 self._logged_username = None
165 self._session = None
166 self._session = None
166
167
167 def _get_logged_user(self):
168 def _get_logged_user(self):
168 return User.get_by_username(self._logged_username)
169 return User.get_by_username(self._logged_username)
169
170
170 # TODO: remove, use plain assert in tests
171 # TODO: remove, use plain assert in tests
171 def assertEqual(self, a, b, msg=None):
172 def assertEqual(self, a, b, msg=None):
172 if msg:
173 if msg:
173 assert a == b, msg
174 assert a == b, msg
174 else:
175 else:
175 assert a == b
176 assert a == b
176
177
177
178
178 def login_user_session(
179 def login_user_session(
179 app, username=TEST_USER_ADMIN_LOGIN, password=TEST_USER_ADMIN_PASS):
180 app, username=TEST_USER_ADMIN_LOGIN, password=TEST_USER_ADMIN_PASS):
180 response = app.post(url(controller='login', action='index'),
181 from rhodecode.tests.functional.test_login import login_url
181 {'username': username,
182 response = app.post(
182 'password': password})
183 login_url,
183
184 {'username': username, 'password': password})
184 if 'invalid user name' in response.body:
185 if 'invalid user name' in response.body:
185 pytest.fail('could not login using %s %s' % (username, password))
186 pytest.fail('could not login using %s %s' % (username, password))
186
187
187 assert response.status == '302 Found'
188 assert response.status == '302 Found'
188 ses = response.session['rhodecode_user']
189 ses = response.session['rhodecode_user']
189 assert ses.get('username') == username
190 assert ses.get('username') == username
190 response = response.follow()
191 response = response.follow()
191 assert ses.get('is_authenticated')
192 assert ses.get('is_authenticated')
192
193
193 return response.session
194 return response.session
194
195
195
196
196 def logout_user_session(app, csrf_token):
197 def logout_user_session(app, csrf_token):
197 app.post(
198 from rhodecode.tests.functional.test_login import logut_url
198 url(controller='login', action='logout'),
199 app.post(logut_url, {'csrf_token': csrf_token}, status=302)
199 {'csrf_token': csrf_token}, status=302)
200
200
201
201
202 def login_user(app, username=TEST_USER_ADMIN_LOGIN,
202 def login_user(app, username=TEST_USER_ADMIN_LOGIN,
203 password=TEST_USER_ADMIN_PASS):
203 password=TEST_USER_ADMIN_PASS):
204 return login_user_session(app, username, password)['rhodecode_user']
204 return login_user_session(app, username, password)['rhodecode_user']
205
205
206
206
207 def assert_session_flash(response=None, msg=None, category=None):
207 def assert_session_flash(response=None, msg=None, category=None):
208 """
208 """
209 Assert on a flash message in the current session.
209 Assert on a flash message in the current session.
210
210
211 :param msg: Required. The expected message. Will be evaluated if a
211 :param msg: Required. The expected message. Will be evaluated if a
212 :class:`LazyString` is passed in.
212 :class:`LazyString` is passed in.
213 :param response: Optional. For functional testing, pass in the response
213 :param response: Optional. For functional testing, pass in the response
214 object. Otherwise don't pass in any value.
214 object. Otherwise don't pass in any value.
215 :param category: Optional. If passed, the message category will be
215 :param category: Optional. If passed, the message category will be
216 checked as well.
216 checked as well.
217 """
217 """
218 if msg is None:
218 if msg is None:
219 raise ValueError("Parameter msg is required.")
219 raise ValueError("Parameter msg is required.")
220
220
221 messages = flash.pop_messages()
221 messages = flash.pop_messages()
222 message = messages[0]
222 message = messages[0]
223
223
224 msg = _eval_if_lazy(msg)
224 msg = _eval_if_lazy(msg)
225 message_text = _eval_if_lazy(message.message)
225 message_text = _eval_if_lazy(message.message)
226
226
227 if msg not in message_text:
227 if msg not in message_text:
228 msg = u'msg `%s` not found in session flash: got `%s` instead' % (
228 msg = u'msg `%s` not found in session flash: got `%s` instead' % (
229 msg, message_text)
229 msg, message_text)
230 pytest.fail(safe_str(msg))
230 pytest.fail(safe_str(msg))
231 if category:
231 if category:
232 assert category == message.category
232 assert category == message.category
233
233
234
234
235 def _eval_if_lazy(value):
235 def _eval_if_lazy(value):
236 return value.eval() if hasattr(value, 'eval') else value
236 return value.eval() if hasattr(value, 'eval') else value
237
237
238
238
239 def assert_not_in_session_flash(response, msg, category=None):
239 def assert_not_in_session_flash(response, msg, category=None):
240 assert 'flash' in response.session, 'Response session has no flash key'
240 assert 'flash' in response.session, 'Response session has no flash key'
241 message_category, message_text = response.session['flash'][0]
241 message_category, message_text = response.session['flash'][0]
242 if msg in message_text:
242 if msg in message_text:
243 msg = u'msg `%s` found in session flash: got `%s` instead' % (
243 msg = u'msg `%s` found in session flash: got `%s` instead' % (
244 msg, message_text)
244 msg, message_text)
245 pytest.fail(safe_str(msg))
245 pytest.fail(safe_str(msg))
246 if category:
246 if category:
247 assert category == message_category
247 assert category == message_category
248
248
249
249
250 def assert_session_flash_is_empty(response):
250 def assert_session_flash_is_empty(response):
251 if 'flash' in response.session:
251 if 'flash' in response.session:
252 msg = 'flash messages are present in session:%s' % \
252 msg = 'flash messages are present in session:%s' % \
253 response.session['flash'][0]
253 response.session['flash'][0]
254 pytest.fail(safe_str(msg))
254 pytest.fail(safe_str(msg))
@@ -1,621 +1,622 b''
1 # -*- coding: utf-8 -*-
1 # -*- coding: utf-8 -*-
2
2
3 # Copyright (C) 2010-2016 RhodeCode GmbH
3 # Copyright (C) 2010-2016 RhodeCode GmbH
4 #
4 #
5 # This program is free software: you can redistribute it and/or modify
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU Affero General Public License, version 3
6 # it under the terms of the GNU Affero General Public License, version 3
7 # (only), as published by the Free Software Foundation.
7 # (only), as published by the Free Software Foundation.
8 #
8 #
9 # This program is distributed in the hope that it will be useful,
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
12 # GNU General Public License for more details.
13 #
13 #
14 # You should have received a copy of the GNU Affero General Public License
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 #
16 #
17 # This program is dual-licensed. If you wish to learn more about the
17 # This program is dual-licensed. If you wish to learn more about the
18 # RhodeCode Enterprise Edition, including its added features, Support services,
18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20
20
21 import mock
21 import mock
22 import pytest
22 import pytest
23
23
24 import rhodecode
24 import rhodecode
25 from rhodecode.config.routing import ADMIN_PREFIX
25 from rhodecode.lib.utils2 import md5
26 from rhodecode.lib.utils2 import md5
26 from rhodecode.model.db import RhodeCodeUi
27 from rhodecode.model.db import RhodeCodeUi
27 from rhodecode.model.meta import Session
28 from rhodecode.model.meta import Session
28 from rhodecode.model.settings import SettingsModel, IssueTrackerSettingsModel
29 from rhodecode.model.settings import SettingsModel, IssueTrackerSettingsModel
29 from rhodecode.tests import url, assert_session_flash
30 from rhodecode.tests import url, assert_session_flash
30 from rhodecode.tests.utils import AssertResponse
31 from rhodecode.tests.utils import AssertResponse
31
32
32
33
33 UPDATE_DATA_QUALNAME = (
34 UPDATE_DATA_QUALNAME = (
34 'rhodecode.controllers.admin.settings.SettingsController.get_update_data')
35 'rhodecode.controllers.admin.settings.SettingsController.get_update_data')
35
36
36
37
37 @pytest.mark.usefixtures('autologin_user', 'app')
38 @pytest.mark.usefixtures('autologin_user', 'app')
38 class TestAdminSettingsController:
39 class TestAdminSettingsController:
39
40
40 @pytest.mark.parametrize('urlname', [
41 @pytest.mark.parametrize('urlname', [
41 'admin_settings_vcs',
42 'admin_settings_vcs',
42 'admin_settings_mapping',
43 'admin_settings_mapping',
43 'admin_settings_global',
44 'admin_settings_global',
44 'admin_settings_visual',
45 'admin_settings_visual',
45 'admin_settings_email',
46 'admin_settings_email',
46 'admin_settings_hooks',
47 'admin_settings_hooks',
47 'admin_settings_search',
48 'admin_settings_search',
48 'admin_settings_system',
49 'admin_settings_system',
49 ])
50 ])
50 def test_simple_get(self, urlname, app):
51 def test_simple_get(self, urlname, app):
51 app.get(url(urlname))
52 app.get(url(urlname))
52
53
53 def test_create_custom_hook(self, csrf_token):
54 def test_create_custom_hook(self, csrf_token):
54 response = self.app.post(
55 response = self.app.post(
55 url('admin_settings_hooks'),
56 url('admin_settings_hooks'),
56 params={
57 params={
57 'new_hook_ui_key': 'test_hooks_1',
58 'new_hook_ui_key': 'test_hooks_1',
58 'new_hook_ui_value': 'cd /tmp',
59 'new_hook_ui_value': 'cd /tmp',
59 'csrf_token': csrf_token})
60 'csrf_token': csrf_token})
60
61
61 response = response.follow()
62 response = response.follow()
62 response.mustcontain('test_hooks_1')
63 response.mustcontain('test_hooks_1')
63 response.mustcontain('cd /tmp')
64 response.mustcontain('cd /tmp')
64
65
65 def test_create_custom_hook_delete(self, csrf_token):
66 def test_create_custom_hook_delete(self, csrf_token):
66 response = self.app.post(
67 response = self.app.post(
67 url('admin_settings_hooks'),
68 url('admin_settings_hooks'),
68 params={
69 params={
69 'new_hook_ui_key': 'test_hooks_2',
70 'new_hook_ui_key': 'test_hooks_2',
70 'new_hook_ui_value': 'cd /tmp2',
71 'new_hook_ui_value': 'cd /tmp2',
71 'csrf_token': csrf_token})
72 'csrf_token': csrf_token})
72
73
73 response = response.follow()
74 response = response.follow()
74 response.mustcontain('test_hooks_2')
75 response.mustcontain('test_hooks_2')
75 response.mustcontain('cd /tmp2')
76 response.mustcontain('cd /tmp2')
76
77
77 hook_id = SettingsModel().get_ui_by_key('test_hooks_2').ui_id
78 hook_id = SettingsModel().get_ui_by_key('test_hooks_2').ui_id
78
79
79 # delete
80 # delete
80 self.app.post(
81 self.app.post(
81 url('admin_settings_hooks'),
82 url('admin_settings_hooks'),
82 params={'hook_id': hook_id, 'csrf_token': csrf_token})
83 params={'hook_id': hook_id, 'csrf_token': csrf_token})
83 response = self.app.get(url('admin_settings_hooks'))
84 response = self.app.get(url('admin_settings_hooks'))
84 response.mustcontain(no=['test_hooks_2'])
85 response.mustcontain(no=['test_hooks_2'])
85 response.mustcontain(no=['cd /tmp2'])
86 response.mustcontain(no=['cd /tmp2'])
86
87
87 def test_system_update_new_version(self):
88 def test_system_update_new_version(self):
88 update_data = {
89 update_data = {
89 'versions': [
90 'versions': [
90 {
91 {
91 'version': '100.3.1415926535',
92 'version': '100.3.1415926535',
92 'general': 'The latest version we are ever going to ship'
93 'general': 'The latest version we are ever going to ship'
93 },
94 },
94 {
95 {
95 'version': '0.0.0',
96 'version': '0.0.0',
96 'general': 'The first version we ever shipped'
97 'general': 'The first version we ever shipped'
97 }
98 }
98 ]
99 ]
99 }
100 }
100 with mock.patch(UPDATE_DATA_QUALNAME, return_value=update_data):
101 with mock.patch(UPDATE_DATA_QUALNAME, return_value=update_data):
101 response = self.app.get(url('admin_settings_system_update'))
102 response = self.app.get(url('admin_settings_system_update'))
102 response.mustcontain('A <b>new version</b> is available')
103 response.mustcontain('A <b>new version</b> is available')
103
104
104 def test_system_update_nothing_new(self):
105 def test_system_update_nothing_new(self):
105 update_data = {
106 update_data = {
106 'versions': [
107 'versions': [
107 {
108 {
108 'version': '0.0.0',
109 'version': '0.0.0',
109 'general': 'The first version we ever shipped'
110 'general': 'The first version we ever shipped'
110 }
111 }
111 ]
112 ]
112 }
113 }
113 with mock.patch(UPDATE_DATA_QUALNAME, return_value=update_data):
114 with mock.patch(UPDATE_DATA_QUALNAME, return_value=update_data):
114 response = self.app.get(url('admin_settings_system_update'))
115 response = self.app.get(url('admin_settings_system_update'))
115 response.mustcontain(
116 response.mustcontain(
116 'You already have the <b>latest</b> stable version.')
117 'You already have the <b>latest</b> stable version.')
117
118
118 def test_system_update_bad_response(self):
119 def test_system_update_bad_response(self):
119 with mock.patch(UPDATE_DATA_QUALNAME, side_effect=ValueError('foo')):
120 with mock.patch(UPDATE_DATA_QUALNAME, side_effect=ValueError('foo')):
120 response = self.app.get(url('admin_settings_system_update'))
121 response = self.app.get(url('admin_settings_system_update'))
121 response.mustcontain(
122 response.mustcontain(
122 'Bad data sent from update server')
123 'Bad data sent from update server')
123
124
124
125
125 @pytest.mark.usefixtures('autologin_user', 'app')
126 @pytest.mark.usefixtures('autologin_user', 'app')
126 class TestAdminSettingsGlobal:
127 class TestAdminSettingsGlobal:
127
128
128 def test_pre_post_code_code_active(self, csrf_token):
129 def test_pre_post_code_code_active(self, csrf_token):
129 pre_code = 'rc-pre-code-187652122'
130 pre_code = 'rc-pre-code-187652122'
130 post_code = 'rc-postcode-98165231'
131 post_code = 'rc-postcode-98165231'
131
132
132 response = self.post_and_verify_settings({
133 response = self.post_and_verify_settings({
133 'rhodecode_pre_code': pre_code,
134 'rhodecode_pre_code': pre_code,
134 'rhodecode_post_code': post_code,
135 'rhodecode_post_code': post_code,
135 'csrf_token': csrf_token,
136 'csrf_token': csrf_token,
136 })
137 })
137
138
138 response = response.follow()
139 response = response.follow()
139 response.mustcontain(pre_code, post_code)
140 response.mustcontain(pre_code, post_code)
140
141
141 def test_pre_post_code_code_inactive(self, csrf_token):
142 def test_pre_post_code_code_inactive(self, csrf_token):
142 pre_code = 'rc-pre-code-187652122'
143 pre_code = 'rc-pre-code-187652122'
143 post_code = 'rc-postcode-98165231'
144 post_code = 'rc-postcode-98165231'
144 response = self.post_and_verify_settings({
145 response = self.post_and_verify_settings({
145 'rhodecode_pre_code': '',
146 'rhodecode_pre_code': '',
146 'rhodecode_post_code': '',
147 'rhodecode_post_code': '',
147 'csrf_token': csrf_token,
148 'csrf_token': csrf_token,
148 })
149 })
149
150
150 response = response.follow()
151 response = response.follow()
151 response.mustcontain(no=[pre_code, post_code])
152 response.mustcontain(no=[pre_code, post_code])
152
153
153 def test_captcha_activate(self, csrf_token):
154 def test_captcha_activate(self, csrf_token):
154 self.post_and_verify_settings({
155 self.post_and_verify_settings({
155 'rhodecode_captcha_private_key': '1234567890',
156 'rhodecode_captcha_private_key': '1234567890',
156 'rhodecode_captcha_public_key': '1234567890',
157 'rhodecode_captcha_public_key': '1234567890',
157 'csrf_token': csrf_token,
158 'csrf_token': csrf_token,
158 })
159 })
159
160
160 response = self.app.get(url('register'))
161 response = self.app.get(ADMIN_PREFIX + '/register')
161 response.mustcontain('captcha')
162 response.mustcontain('captcha')
162
163
163 def test_captcha_deactivate(self, csrf_token):
164 def test_captcha_deactivate(self, csrf_token):
164 self.post_and_verify_settings({
165 self.post_and_verify_settings({
165 'rhodecode_captcha_private_key': '',
166 'rhodecode_captcha_private_key': '',
166 'rhodecode_captcha_public_key': '1234567890',
167 'rhodecode_captcha_public_key': '1234567890',
167 'csrf_token': csrf_token,
168 'csrf_token': csrf_token,
168 })
169 })
169
170
170 response = self.app.get(url('register'))
171 response = self.app.get(ADMIN_PREFIX + '/register')
171 response.mustcontain(no=['captcha'])
172 response.mustcontain(no=['captcha'])
172
173
173 def test_title_change(self, csrf_token):
174 def test_title_change(self, csrf_token):
174 old_title = 'RhodeCode'
175 old_title = 'RhodeCode'
175 new_title = old_title + '_changed'
176 new_title = old_title + '_changed'
176
177
177 for new_title in ['Changed', 'Ε»Γ³Ε‚wik', old_title]:
178 for new_title in ['Changed', 'Ε»Γ³Ε‚wik', old_title]:
178 response = self.post_and_verify_settings({
179 response = self.post_and_verify_settings({
179 'rhodecode_title': new_title,
180 'rhodecode_title': new_title,
180 'csrf_token': csrf_token,
181 'csrf_token': csrf_token,
181 })
182 })
182
183
183 response = response.follow()
184 response = response.follow()
184 response.mustcontain(
185 response.mustcontain(
185 """<div class="branding">- %s</div>""" % new_title)
186 """<div class="branding">- %s</div>""" % new_title)
186
187
187 def post_and_verify_settings(self, settings):
188 def post_and_verify_settings(self, settings):
188 old_title = 'RhodeCode'
189 old_title = 'RhodeCode'
189 old_realm = 'RhodeCode authentication'
190 old_realm = 'RhodeCode authentication'
190 params = {
191 params = {
191 'rhodecode_title': old_title,
192 'rhodecode_title': old_title,
192 'rhodecode_realm': old_realm,
193 'rhodecode_realm': old_realm,
193 'rhodecode_pre_code': '',
194 'rhodecode_pre_code': '',
194 'rhodecode_post_code': '',
195 'rhodecode_post_code': '',
195 'rhodecode_captcha_private_key': '',
196 'rhodecode_captcha_private_key': '',
196 'rhodecode_captcha_public_key': '',
197 'rhodecode_captcha_public_key': '',
197 }
198 }
198 params.update(settings)
199 params.update(settings)
199 response = self.app.post(url('admin_settings_global'), params=params)
200 response = self.app.post(url('admin_settings_global'), params=params)
200
201
201 assert_session_flash(response, 'Updated application settings')
202 assert_session_flash(response, 'Updated application settings')
202 app_settings = SettingsModel().get_all_settings()
203 app_settings = SettingsModel().get_all_settings()
203 del settings['csrf_token']
204 del settings['csrf_token']
204 for key, value in settings.iteritems():
205 for key, value in settings.iteritems():
205 assert app_settings[key] == value.decode('utf-8')
206 assert app_settings[key] == value.decode('utf-8')
206
207
207 return response
208 return response
208
209
209
210
210 @pytest.mark.usefixtures('autologin_user', 'app')
211 @pytest.mark.usefixtures('autologin_user', 'app')
211 class TestAdminSettingsVcs:
212 class TestAdminSettingsVcs:
212
213
213 def test_contains_svn_default_patterns(self, app):
214 def test_contains_svn_default_patterns(self, app):
214 response = app.get(url('admin_settings_vcs'))
215 response = app.get(url('admin_settings_vcs'))
215 expected_patterns = [
216 expected_patterns = [
216 '/trunk',
217 '/trunk',
217 '/branches/*',
218 '/branches/*',
218 '/tags/*',
219 '/tags/*',
219 ]
220 ]
220 for pattern in expected_patterns:
221 for pattern in expected_patterns:
221 response.mustcontain(pattern)
222 response.mustcontain(pattern)
222
223
223 def test_add_new_svn_branch_and_tag_pattern(
224 def test_add_new_svn_branch_and_tag_pattern(
224 self, app, backend_svn, form_defaults, disable_sql_cache,
225 self, app, backend_svn, form_defaults, disable_sql_cache,
225 csrf_token):
226 csrf_token):
226 form_defaults.update({
227 form_defaults.update({
227 'new_svn_branch': '/exp/branches/*',
228 'new_svn_branch': '/exp/branches/*',
228 'new_svn_tag': '/important_tags/*',
229 'new_svn_tag': '/important_tags/*',
229 'csrf_token': csrf_token,
230 'csrf_token': csrf_token,
230 })
231 })
231
232
232 response = app.post(
233 response = app.post(
233 url('admin_settings_vcs'), params=form_defaults, status=302)
234 url('admin_settings_vcs'), params=form_defaults, status=302)
234 response = response.follow()
235 response = response.follow()
235
236
236 # Expect to find the new values on the page
237 # Expect to find the new values on the page
237 response.mustcontain('/exp/branches/*')
238 response.mustcontain('/exp/branches/*')
238 response.mustcontain('/important_tags/*')
239 response.mustcontain('/important_tags/*')
239
240
240 # Expect that those patterns are used to match branches and tags now
241 # Expect that those patterns are used to match branches and tags now
241 repo = backend_svn['svn-simple-layout'].scm_instance()
242 repo = backend_svn['svn-simple-layout'].scm_instance()
242 assert 'exp/branches/exp-sphinx-docs' in repo.branches
243 assert 'exp/branches/exp-sphinx-docs' in repo.branches
243 assert 'important_tags/v0.5' in repo.tags
244 assert 'important_tags/v0.5' in repo.tags
244
245
245 def test_add_same_svn_value_twice_shows_an_error_message(
246 def test_add_same_svn_value_twice_shows_an_error_message(
246 self, app, form_defaults, csrf_token, settings_util):
247 self, app, form_defaults, csrf_token, settings_util):
247 settings_util.create_rhodecode_ui('vcs_svn_branch', '/test')
248 settings_util.create_rhodecode_ui('vcs_svn_branch', '/test')
248 settings_util.create_rhodecode_ui('vcs_svn_tag', '/test')
249 settings_util.create_rhodecode_ui('vcs_svn_tag', '/test')
249
250
250 response = app.post(
251 response = app.post(
251 url('admin_settings_vcs'),
252 url('admin_settings_vcs'),
252 params={
253 params={
253 'paths_root_path': form_defaults['paths_root_path'],
254 'paths_root_path': form_defaults['paths_root_path'],
254 'new_svn_branch': '/test',
255 'new_svn_branch': '/test',
255 'new_svn_tag': '/test',
256 'new_svn_tag': '/test',
256 'csrf_token': csrf_token,
257 'csrf_token': csrf_token,
257 },
258 },
258 status=200)
259 status=200)
259
260
260 response.mustcontain("Pattern already exists")
261 response.mustcontain("Pattern already exists")
261 response.mustcontain("Some form inputs contain invalid data.")
262 response.mustcontain("Some form inputs contain invalid data.")
262
263
263 @pytest.mark.parametrize('section', [
264 @pytest.mark.parametrize('section', [
264 'vcs_svn_branch',
265 'vcs_svn_branch',
265 'vcs_svn_tag',
266 'vcs_svn_tag',
266 ])
267 ])
267 def test_delete_svn_patterns(
268 def test_delete_svn_patterns(
268 self, section, app, csrf_token, settings_util):
269 self, section, app, csrf_token, settings_util):
269 setting = settings_util.create_rhodecode_ui(
270 setting = settings_util.create_rhodecode_ui(
270 section, '/test_delete', cleanup=False)
271 section, '/test_delete', cleanup=False)
271
272
272 app.post(
273 app.post(
273 url('admin_settings_vcs'),
274 url('admin_settings_vcs'),
274 params={
275 params={
275 '_method': 'delete',
276 '_method': 'delete',
276 'delete_svn_pattern': setting.ui_id,
277 'delete_svn_pattern': setting.ui_id,
277 'csrf_token': csrf_token},
278 'csrf_token': csrf_token},
278 headers={'X-REQUESTED-WITH': 'XMLHttpRequest'})
279 headers={'X-REQUESTED-WITH': 'XMLHttpRequest'})
279
280
280 @pytest.mark.parametrize('section', [
281 @pytest.mark.parametrize('section', [
281 'vcs_svn_branch',
282 'vcs_svn_branch',
282 'vcs_svn_tag',
283 'vcs_svn_tag',
283 ])
284 ])
284 def test_delete_svn_patterns_raises_400_when_no_xhr(
285 def test_delete_svn_patterns_raises_400_when_no_xhr(
285 self, section, app, csrf_token, settings_util):
286 self, section, app, csrf_token, settings_util):
286 setting = settings_util.create_rhodecode_ui(section, '/test_delete')
287 setting = settings_util.create_rhodecode_ui(section, '/test_delete')
287
288
288 app.post(
289 app.post(
289 url('admin_settings_vcs'),
290 url('admin_settings_vcs'),
290 params={
291 params={
291 '_method': 'delete',
292 '_method': 'delete',
292 'delete_svn_pattern': setting.ui_id,
293 'delete_svn_pattern': setting.ui_id,
293 'csrf_token': csrf_token},
294 'csrf_token': csrf_token},
294 status=400)
295 status=400)
295
296
296 def test_extensions_hgsubversion(self, app, form_defaults, csrf_token):
297 def test_extensions_hgsubversion(self, app, form_defaults, csrf_token):
297 form_defaults.update({
298 form_defaults.update({
298 'csrf_token': csrf_token,
299 'csrf_token': csrf_token,
299 'extensions_hgsubversion': 'True',
300 'extensions_hgsubversion': 'True',
300 })
301 })
301 response = app.post(
302 response = app.post(
302 url('admin_settings_vcs'),
303 url('admin_settings_vcs'),
303 params=form_defaults,
304 params=form_defaults,
304 status=302)
305 status=302)
305
306
306 response = response.follow()
307 response = response.follow()
307 extensions_input = (
308 extensions_input = (
308 '<input id="extensions_hgsubversion" '
309 '<input id="extensions_hgsubversion" '
309 'name="extensions_hgsubversion" type="checkbox" '
310 'name="extensions_hgsubversion" type="checkbox" '
310 'value="True" checked="checked" />')
311 'value="True" checked="checked" />')
311 response.mustcontain(extensions_input)
312 response.mustcontain(extensions_input)
312
313
313 def test_has_a_section_for_pull_request_settings(self, app):
314 def test_has_a_section_for_pull_request_settings(self, app):
314 response = app.get(url('admin_settings_vcs'))
315 response = app.get(url('admin_settings_vcs'))
315 response.mustcontain('Pull Request Settings')
316 response.mustcontain('Pull Request Settings')
316
317
317 def test_has_an_input_for_invalidation_of_inline_comments(
318 def test_has_an_input_for_invalidation_of_inline_comments(
318 self, app):
319 self, app):
319 response = app.get(url('admin_settings_vcs'))
320 response = app.get(url('admin_settings_vcs'))
320 assert_response = AssertResponse(response)
321 assert_response = AssertResponse(response)
321 assert_response.one_element_exists(
322 assert_response.one_element_exists(
322 '[name=rhodecode_use_outdated_comments]')
323 '[name=rhodecode_use_outdated_comments]')
323
324
324 @pytest.mark.parametrize('new_value', [True, False])
325 @pytest.mark.parametrize('new_value', [True, False])
325 def test_allows_to_change_invalidation_of_inline_comments(
326 def test_allows_to_change_invalidation_of_inline_comments(
326 self, app, form_defaults, csrf_token, new_value):
327 self, app, form_defaults, csrf_token, new_value):
327 setting_key = 'use_outdated_comments'
328 setting_key = 'use_outdated_comments'
328 setting = SettingsModel().create_or_update_setting(
329 setting = SettingsModel().create_or_update_setting(
329 setting_key, not new_value, 'bool')
330 setting_key, not new_value, 'bool')
330 Session().add(setting)
331 Session().add(setting)
331 Session().commit()
332 Session().commit()
332
333
333 form_defaults.update({
334 form_defaults.update({
334 'csrf_token': csrf_token,
335 'csrf_token': csrf_token,
335 'rhodecode_use_outdated_comments': str(new_value),
336 'rhodecode_use_outdated_comments': str(new_value),
336 })
337 })
337 response = app.post(
338 response = app.post(
338 url('admin_settings_vcs'),
339 url('admin_settings_vcs'),
339 params=form_defaults,
340 params=form_defaults,
340 status=302)
341 status=302)
341 response = response.follow()
342 response = response.follow()
342 setting = SettingsModel().get_setting_by_name(setting_key)
343 setting = SettingsModel().get_setting_by_name(setting_key)
343 assert setting.app_settings_value is new_value
344 assert setting.app_settings_value is new_value
344
345
345 @pytest.fixture
346 @pytest.fixture
346 def disable_sql_cache(self, request):
347 def disable_sql_cache(self, request):
347 patcher = mock.patch(
348 patcher = mock.patch(
348 'rhodecode.lib.caching_query.FromCache.process_query')
349 'rhodecode.lib.caching_query.FromCache.process_query')
349 request.addfinalizer(patcher.stop)
350 request.addfinalizer(patcher.stop)
350 patcher.start()
351 patcher.start()
351
352
352 @pytest.fixture
353 @pytest.fixture
353 def form_defaults(self):
354 def form_defaults(self):
354 from rhodecode.controllers.admin.settings import SettingsController
355 from rhodecode.controllers.admin.settings import SettingsController
355 controller = SettingsController()
356 controller = SettingsController()
356 return controller._form_defaults()
357 return controller._form_defaults()
357
358
358 # TODO: johbo: What we really want is to checkpoint before a test run and
359 # TODO: johbo: What we really want is to checkpoint before a test run and
359 # reset the session afterwards.
360 # reset the session afterwards.
360 @pytest.fixture(scope='class', autouse=True)
361 @pytest.fixture(scope='class', autouse=True)
361 def cleanup_settings(self, request, pylonsapp):
362 def cleanup_settings(self, request, pylonsapp):
362 ui_id = RhodeCodeUi.ui_id
363 ui_id = RhodeCodeUi.ui_id
363 original_ids = list(
364 original_ids = list(
364 r.ui_id for r in RhodeCodeUi.query().values(ui_id))
365 r.ui_id for r in RhodeCodeUi.query().values(ui_id))
365
366
366 @request.addfinalizer
367 @request.addfinalizer
367 def cleanup():
368 def cleanup():
368 RhodeCodeUi.query().filter(
369 RhodeCodeUi.query().filter(
369 ui_id.notin_(original_ids)).delete(False)
370 ui_id.notin_(original_ids)).delete(False)
370
371
371
372
372 @pytest.mark.usefixtures('autologin_user', 'app')
373 @pytest.mark.usefixtures('autologin_user', 'app')
373 class TestLabsSettings(object):
374 class TestLabsSettings(object):
374 def test_get_settings_page_disabled(self):
375 def test_get_settings_page_disabled(self):
375 with mock.patch.dict(rhodecode.CONFIG,
376 with mock.patch.dict(rhodecode.CONFIG,
376 {'labs_settings_active': 'false'}):
377 {'labs_settings_active': 'false'}):
377 response = self.app.get(url('admin_settings_labs'), status=302)
378 response = self.app.get(url('admin_settings_labs'), status=302)
378
379
379 assert response.location.endswith(url('admin_settings'))
380 assert response.location.endswith(url('admin_settings'))
380
381
381 def test_get_settings_page_enabled(self):
382 def test_get_settings_page_enabled(self):
382 from rhodecode.controllers.admin import settings
383 from rhodecode.controllers.admin import settings
383 lab_settings = [
384 lab_settings = [
384 settings.LabSetting(
385 settings.LabSetting(
385 key='rhodecode_bool',
386 key='rhodecode_bool',
386 type='bool',
387 type='bool',
387 group='bool group',
388 group='bool group',
388 label='bool label',
389 label='bool label',
389 help='bool help'
390 help='bool help'
390 ),
391 ),
391 settings.LabSetting(
392 settings.LabSetting(
392 key='rhodecode_text',
393 key='rhodecode_text',
393 type='unicode',
394 type='unicode',
394 group='text group',
395 group='text group',
395 label='text label',
396 label='text label',
396 help='text help'
397 help='text help'
397 ),
398 ),
398 ]
399 ]
399 with mock.patch.dict(rhodecode.CONFIG,
400 with mock.patch.dict(rhodecode.CONFIG,
400 {'labs_settings_active': 'true'}):
401 {'labs_settings_active': 'true'}):
401 with mock.patch.object(settings, '_LAB_SETTINGS', lab_settings):
402 with mock.patch.object(settings, '_LAB_SETTINGS', lab_settings):
402 response = self.app.get(url('admin_settings_labs'))
403 response = self.app.get(url('admin_settings_labs'))
403
404
404 assert '<label>bool group:</label>' in response
405 assert '<label>bool group:</label>' in response
405 assert '<label for="rhodecode_bool">bool label</label>' in response
406 assert '<label for="rhodecode_bool">bool label</label>' in response
406 assert '<p class="help-block">bool help</p>' in response
407 assert '<p class="help-block">bool help</p>' in response
407 assert 'name="rhodecode_bool" type="checkbox"' in response
408 assert 'name="rhodecode_bool" type="checkbox"' in response
408
409
409 assert '<label>text group:</label>' in response
410 assert '<label>text group:</label>' in response
410 assert '<label for="rhodecode_text">text label</label>' in response
411 assert '<label for="rhodecode_text">text label</label>' in response
411 assert '<p class="help-block">text help</p>' in response
412 assert '<p class="help-block">text help</p>' in response
412 assert 'name="rhodecode_text" size="60" type="text"' in response
413 assert 'name="rhodecode_text" size="60" type="text"' in response
413
414
414 @pytest.mark.parametrize('setting_name', [
415 @pytest.mark.parametrize('setting_name', [
415 'proxy_subversion_http_requests',
416 'proxy_subversion_http_requests',
416 'hg_use_rebase_for_merging',
417 'hg_use_rebase_for_merging',
417 ])
418 ])
418 def test_update_boolean_settings(self, csrf_token, setting_name):
419 def test_update_boolean_settings(self, csrf_token, setting_name):
419 self.app.post(
420 self.app.post(
420 url('admin_settings_labs'),
421 url('admin_settings_labs'),
421 params={
422 params={
422 'rhodecode_{}'.format(setting_name): 'true',
423 'rhodecode_{}'.format(setting_name): 'true',
423 'csrf_token': csrf_token,
424 'csrf_token': csrf_token,
424 })
425 })
425 setting = SettingsModel().get_setting_by_name(setting_name)
426 setting = SettingsModel().get_setting_by_name(setting_name)
426 assert setting.app_settings_value
427 assert setting.app_settings_value
427
428
428 self.app.post(
429 self.app.post(
429 url('admin_settings_labs'),
430 url('admin_settings_labs'),
430 params={
431 params={
431 'rhodecode_{}'.format(setting_name): 'false',
432 'rhodecode_{}'.format(setting_name): 'false',
432 'csrf_token': csrf_token,
433 'csrf_token': csrf_token,
433 })
434 })
434 setting = SettingsModel().get_setting_by_name(setting_name)
435 setting = SettingsModel().get_setting_by_name(setting_name)
435 assert not setting.app_settings_value
436 assert not setting.app_settings_value
436
437
437 @pytest.mark.parametrize('setting_name', [
438 @pytest.mark.parametrize('setting_name', [
438 'subversion_http_server_url',
439 'subversion_http_server_url',
439 ])
440 ])
440 def test_update_string_settings(self, csrf_token, setting_name):
441 def test_update_string_settings(self, csrf_token, setting_name):
441 self.app.post(
442 self.app.post(
442 url('admin_settings_labs'),
443 url('admin_settings_labs'),
443 params={
444 params={
444 'rhodecode_{}'.format(setting_name): 'Test 1',
445 'rhodecode_{}'.format(setting_name): 'Test 1',
445 'csrf_token': csrf_token,
446 'csrf_token': csrf_token,
446 })
447 })
447 setting = SettingsModel().get_setting_by_name(setting_name)
448 setting = SettingsModel().get_setting_by_name(setting_name)
448 assert setting.app_settings_value == 'Test 1'
449 assert setting.app_settings_value == 'Test 1'
449
450
450 self.app.post(
451 self.app.post(
451 url('admin_settings_labs'),
452 url('admin_settings_labs'),
452 params={
453 params={
453 'rhodecode_{}'.format(setting_name): ' Test 2 ',
454 'rhodecode_{}'.format(setting_name): ' Test 2 ',
454 'csrf_token': csrf_token,
455 'csrf_token': csrf_token,
455 })
456 })
456 setting = SettingsModel().get_setting_by_name(setting_name)
457 setting = SettingsModel().get_setting_by_name(setting_name)
457 assert setting.app_settings_value == 'Test 2'
458 assert setting.app_settings_value == 'Test 2'
458
459
459
460
460 @pytest.mark.usefixtures('app')
461 @pytest.mark.usefixtures('app')
461 class TestOpenSourceLicenses(object):
462 class TestOpenSourceLicenses(object):
462 def test_records_are_displayed(self, autologin_user):
463 def test_records_are_displayed(self, autologin_user):
463 sample_licenses = {
464 sample_licenses = {
464 "python2.7-pytest-2.7.1": {
465 "python2.7-pytest-2.7.1": {
465 "UNKNOWN": None
466 "UNKNOWN": None
466 },
467 },
467 "python2.7-Markdown-2.6.2": {
468 "python2.7-Markdown-2.6.2": {
468 "BSD-3-Clause": "http://spdx.org/licenses/BSD-3-Clause"
469 "BSD-3-Clause": "http://spdx.org/licenses/BSD-3-Clause"
469 }
470 }
470 }
471 }
471 read_licenses_patch = mock.patch(
472 read_licenses_patch = mock.patch(
472 'rhodecode.controllers.admin.settings.read_opensource_licenses',
473 'rhodecode.controllers.admin.settings.read_opensource_licenses',
473 return_value=sample_licenses)
474 return_value=sample_licenses)
474 with read_licenses_patch:
475 with read_licenses_patch:
475 response = self.app.get(
476 response = self.app.get(
476 url('admin_settings_open_source'), status=200)
477 url('admin_settings_open_source'), status=200)
477
478
478 assert_response = AssertResponse(response)
479 assert_response = AssertResponse(response)
479 assert_response.element_contains(
480 assert_response.element_contains(
480 '.panel-heading', 'Licenses of Third Party Packages')
481 '.panel-heading', 'Licenses of Third Party Packages')
481 for name in sample_licenses:
482 for name in sample_licenses:
482 response.mustcontain(name)
483 response.mustcontain(name)
483 for license in sample_licenses[name]:
484 for license in sample_licenses[name]:
484 assert_response.element_contains('.panel-body', license)
485 assert_response.element_contains('.panel-body', license)
485
486
486 def test_records_can_be_read(self, autologin_user):
487 def test_records_can_be_read(self, autologin_user):
487 response = self.app.get(url('admin_settings_open_source'), status=200)
488 response = self.app.get(url('admin_settings_open_source'), status=200)
488 assert_response = AssertResponse(response)
489 assert_response = AssertResponse(response)
489 assert_response.element_contains(
490 assert_response.element_contains(
490 '.panel-heading', 'Licenses of Third Party Packages')
491 '.panel-heading', 'Licenses of Third Party Packages')
491
492
492 def test_forbidden_when_normal_user(self, autologin_regular_user):
493 def test_forbidden_when_normal_user(self, autologin_regular_user):
493 self.app.get(
494 self.app.get(
494 url('admin_settings_open_source'), status=403)
495 url('admin_settings_open_source'), status=403)
495
496
496
497
497 @pytest.mark.usefixtures("app")
498 @pytest.mark.usefixtures("app")
498 class TestAdminSettingsIssueTracker:
499 class TestAdminSettingsIssueTracker:
499 RC_PREFIX = 'rhodecode_'
500 RC_PREFIX = 'rhodecode_'
500 SHORT_PATTERN_KEY = 'issuetracker_pat_'
501 SHORT_PATTERN_KEY = 'issuetracker_pat_'
501 PATTERN_KEY = RC_PREFIX + SHORT_PATTERN_KEY
502 PATTERN_KEY = RC_PREFIX + SHORT_PATTERN_KEY
502
503
503 def test_issuetracker_index(self, autologin_user):
504 def test_issuetracker_index(self, autologin_user):
504 response = self.app.get(url('admin_settings_issuetracker'))
505 response = self.app.get(url('admin_settings_issuetracker'))
505 assert response.status_code == 200
506 assert response.status_code == 200
506
507
507 def test_add_issuetracker_pattern(
508 def test_add_issuetracker_pattern(
508 self, request, autologin_user, csrf_token):
509 self, request, autologin_user, csrf_token):
509 pattern = 'issuetracker_pat'
510 pattern = 'issuetracker_pat'
510 another_pattern = pattern+'1'
511 another_pattern = pattern+'1'
511 post_url = url('admin_settings_issuetracker_save')
512 post_url = url('admin_settings_issuetracker_save')
512 post_data = {
513 post_data = {
513 'new_pattern_pattern_0': pattern,
514 'new_pattern_pattern_0': pattern,
514 'new_pattern_url_0': 'url',
515 'new_pattern_url_0': 'url',
515 'new_pattern_prefix_0': 'prefix',
516 'new_pattern_prefix_0': 'prefix',
516 'new_pattern_description_0': 'description',
517 'new_pattern_description_0': 'description',
517 'new_pattern_pattern_1': another_pattern,
518 'new_pattern_pattern_1': another_pattern,
518 'new_pattern_url_1': 'url1',
519 'new_pattern_url_1': 'url1',
519 'new_pattern_prefix_1': 'prefix1',
520 'new_pattern_prefix_1': 'prefix1',
520 'new_pattern_description_1': 'description1',
521 'new_pattern_description_1': 'description1',
521 'csrf_token': csrf_token
522 'csrf_token': csrf_token
522 }
523 }
523 self.app.post(post_url, post_data, status=302)
524 self.app.post(post_url, post_data, status=302)
524 settings = SettingsModel().get_all_settings()
525 settings = SettingsModel().get_all_settings()
525 self.uid = md5(pattern)
526 self.uid = md5(pattern)
526 assert settings[self.PATTERN_KEY+self.uid] == pattern
527 assert settings[self.PATTERN_KEY+self.uid] == pattern
527 self.another_uid = md5(another_pattern)
528 self.another_uid = md5(another_pattern)
528 assert settings[self.PATTERN_KEY+self.another_uid] == another_pattern
529 assert settings[self.PATTERN_KEY+self.another_uid] == another_pattern
529
530
530 @request.addfinalizer
531 @request.addfinalizer
531 def cleanup():
532 def cleanup():
532 defaults = SettingsModel().get_all_settings()
533 defaults = SettingsModel().get_all_settings()
533
534
534 entries = [name for name in defaults if (
535 entries = [name for name in defaults if (
535 (self.uid in name) or (self.another_uid) in name)]
536 (self.uid in name) or (self.another_uid) in name)]
536 start = len(self.RC_PREFIX)
537 start = len(self.RC_PREFIX)
537 for del_key in entries:
538 for del_key in entries:
538 # TODO: anderson: get_by_name needs name without prefix
539 # TODO: anderson: get_by_name needs name without prefix
539 entry = SettingsModel().get_setting_by_name(del_key[start:])
540 entry = SettingsModel().get_setting_by_name(del_key[start:])
540 Session().delete(entry)
541 Session().delete(entry)
541
542
542 Session().commit()
543 Session().commit()
543
544
544 def test_edit_issuetracker_pattern(
545 def test_edit_issuetracker_pattern(
545 self, autologin_user, backend, csrf_token, request):
546 self, autologin_user, backend, csrf_token, request):
546 old_pattern = 'issuetracker_pat'
547 old_pattern = 'issuetracker_pat'
547 old_uid = md5(old_pattern)
548 old_uid = md5(old_pattern)
548 pattern = 'issuetracker_pat_new'
549 pattern = 'issuetracker_pat_new'
549 self.new_uid = md5(pattern)
550 self.new_uid = md5(pattern)
550
551
551 SettingsModel().create_or_update_setting(
552 SettingsModel().create_or_update_setting(
552 self.SHORT_PATTERN_KEY+old_uid, old_pattern, 'unicode')
553 self.SHORT_PATTERN_KEY+old_uid, old_pattern, 'unicode')
553
554
554 post_url = url('admin_settings_issuetracker_save')
555 post_url = url('admin_settings_issuetracker_save')
555 post_data = {
556 post_data = {
556 'new_pattern_pattern_0': pattern,
557 'new_pattern_pattern_0': pattern,
557 'new_pattern_url_0': 'url',
558 'new_pattern_url_0': 'url',
558 'new_pattern_prefix_0': 'prefix',
559 'new_pattern_prefix_0': 'prefix',
559 'new_pattern_description_0': 'description',
560 'new_pattern_description_0': 'description',
560 'uid': old_uid,
561 'uid': old_uid,
561 'csrf_token': csrf_token
562 'csrf_token': csrf_token
562 }
563 }
563 self.app.post(post_url, post_data, status=302)
564 self.app.post(post_url, post_data, status=302)
564 settings = SettingsModel().get_all_settings()
565 settings = SettingsModel().get_all_settings()
565 assert settings[self.PATTERN_KEY+self.new_uid] == pattern
566 assert settings[self.PATTERN_KEY+self.new_uid] == pattern
566 assert self.PATTERN_KEY+old_uid not in settings
567 assert self.PATTERN_KEY+old_uid not in settings
567
568
568 @request.addfinalizer
569 @request.addfinalizer
569 def cleanup():
570 def cleanup():
570 IssueTrackerSettingsModel().delete_entries(self.new_uid)
571 IssueTrackerSettingsModel().delete_entries(self.new_uid)
571
572
572 def test_replace_issuetracker_pattern_description(
573 def test_replace_issuetracker_pattern_description(
573 self, autologin_user, csrf_token, request, settings_util):
574 self, autologin_user, csrf_token, request, settings_util):
574 prefix = 'issuetracker'
575 prefix = 'issuetracker'
575 pattern = 'issuetracker_pat'
576 pattern = 'issuetracker_pat'
576 self.uid = md5(pattern)
577 self.uid = md5(pattern)
577 pattern_key = '_'.join([prefix, 'pat', self.uid])
578 pattern_key = '_'.join([prefix, 'pat', self.uid])
578 rc_pattern_key = '_'.join(['rhodecode', pattern_key])
579 rc_pattern_key = '_'.join(['rhodecode', pattern_key])
579 desc_key = '_'.join([prefix, 'desc', self.uid])
580 desc_key = '_'.join([prefix, 'desc', self.uid])
580 rc_desc_key = '_'.join(['rhodecode', desc_key])
581 rc_desc_key = '_'.join(['rhodecode', desc_key])
581 new_description = 'new_description'
582 new_description = 'new_description'
582
583
583 settings_util.create_rhodecode_setting(
584 settings_util.create_rhodecode_setting(
584 pattern_key, pattern, 'unicode', cleanup=False)
585 pattern_key, pattern, 'unicode', cleanup=False)
585 settings_util.create_rhodecode_setting(
586 settings_util.create_rhodecode_setting(
586 desc_key, 'old description', 'unicode', cleanup=False)
587 desc_key, 'old description', 'unicode', cleanup=False)
587
588
588 post_url = url('admin_settings_issuetracker_save')
589 post_url = url('admin_settings_issuetracker_save')
589 post_data = {
590 post_data = {
590 'new_pattern_pattern_0': pattern,
591 'new_pattern_pattern_0': pattern,
591 'new_pattern_url_0': 'url',
592 'new_pattern_url_0': 'url',
592 'new_pattern_prefix_0': 'prefix',
593 'new_pattern_prefix_0': 'prefix',
593 'new_pattern_description_0': new_description,
594 'new_pattern_description_0': new_description,
594 'uid': self.uid,
595 'uid': self.uid,
595 'csrf_token': csrf_token
596 'csrf_token': csrf_token
596 }
597 }
597 self.app.post(post_url, post_data, status=302)
598 self.app.post(post_url, post_data, status=302)
598 settings = SettingsModel().get_all_settings()
599 settings = SettingsModel().get_all_settings()
599 assert settings[rc_pattern_key] == pattern
600 assert settings[rc_pattern_key] == pattern
600 assert settings[rc_desc_key] == new_description
601 assert settings[rc_desc_key] == new_description
601
602
602 @request.addfinalizer
603 @request.addfinalizer
603 def cleanup():
604 def cleanup():
604 IssueTrackerSettingsModel().delete_entries(self.uid)
605 IssueTrackerSettingsModel().delete_entries(self.uid)
605
606
606 def test_delete_issuetracker_pattern(
607 def test_delete_issuetracker_pattern(
607 self, autologin_user, backend, csrf_token, settings_util):
608 self, autologin_user, backend, csrf_token, settings_util):
608 pattern = 'issuetracker_pat'
609 pattern = 'issuetracker_pat'
609 uid = md5(pattern)
610 uid = md5(pattern)
610 settings_util.create_rhodecode_setting(
611 settings_util.create_rhodecode_setting(
611 self.SHORT_PATTERN_KEY+uid, pattern, 'unicode', cleanup=False)
612 self.SHORT_PATTERN_KEY+uid, pattern, 'unicode', cleanup=False)
612
613
613 post_url = url('admin_issuetracker_delete')
614 post_url = url('admin_issuetracker_delete')
614 post_data = {
615 post_data = {
615 '_method': 'delete',
616 '_method': 'delete',
616 'uid': uid,
617 'uid': uid,
617 'csrf_token': csrf_token
618 'csrf_token': csrf_token
618 }
619 }
619 self.app.post(post_url, post_data, status=302)
620 self.app.post(post_url, post_data, status=302)
620 settings = SettingsModel().get_all_settings()
621 settings = SettingsModel().get_all_settings()
621 assert 'rhodecode_%s%s' % (self.SHORT_PATTERN_KEY, uid) not in settings
622 assert 'rhodecode_%s%s' % (self.SHORT_PATTERN_KEY, uid) not in settings
@@ -1,511 +1,519 b''
1 # -*- coding: utf-8 -*-
1 # -*- coding: utf-8 -*-
2
2
3 # Copyright (C) 2010-2016 RhodeCode GmbH
3 # Copyright (C) 2010-2016 RhodeCode GmbH
4 #
4 #
5 # This program is free software: you can redistribute it and/or modify
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU Affero General Public License, version 3
6 # it under the terms of the GNU Affero General Public License, version 3
7 # (only), as published by the Free Software Foundation.
7 # (only), as published by the Free Software Foundation.
8 #
8 #
9 # This program is distributed in the hope that it will be useful,
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
12 # GNU General Public License for more details.
13 #
13 #
14 # You should have received a copy of the GNU Affero General Public License
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 #
16 #
17 # This program is dual-licensed. If you wish to learn more about the
17 # This program is dual-licensed. If you wish to learn more about the
18 # RhodeCode Enterprise Edition, including its added features, Support services,
18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20
20
21 import urlparse
21 import urlparse
22
22
23 import mock
23 import mock
24 import pytest
24 import pytest
25
25
26 from rhodecode.config.routing import ADMIN_PREFIX
26 from rhodecode.tests import (
27 from rhodecode.tests import (
27 assert_session_flash, url, HG_REPO, TEST_USER_ADMIN_LOGIN)
28 assert_session_flash, url, HG_REPO, TEST_USER_ADMIN_LOGIN)
28 from rhodecode.tests.fixture import Fixture
29 from rhodecode.tests.fixture import Fixture
30 from rhodecode.tests.utils import AssertResponse, get_session_from_response
29 from rhodecode.lib.auth import check_password, generate_auth_token
31 from rhodecode.lib.auth import check_password, generate_auth_token
30 from rhodecode.lib import helpers as h
32 from rhodecode.lib import helpers as h
31 from rhodecode.model.auth_token import AuthTokenModel
33 from rhodecode.model.auth_token import AuthTokenModel
32 from rhodecode.model import validators
34 from rhodecode.model import validators
33 from rhodecode.model.db import User, Notification
35 from rhodecode.model.db import User, Notification
34 from rhodecode.model.meta import Session
36 from rhodecode.model.meta import Session
35
37
36 fixture = Fixture()
38 fixture = Fixture()
37
39
40 # Hardcode URLs because we don't have a request object to use
41 # pyramids URL generation methods.
42 login_url = ADMIN_PREFIX + '/login'
43 logut_url = ADMIN_PREFIX + '/logout'
44 register_url = ADMIN_PREFIX + '/register'
45 pwd_reset_url = ADMIN_PREFIX + '/password_reset'
46 pwd_reset_confirm_url = ADMIN_PREFIX + '/password_reset_confirmation'
47
38
48
39 @pytest.mark.usefixtures('app')
49 @pytest.mark.usefixtures('app')
40 class TestLoginController:
50 class TestLoginController:
41 destroy_users = set()
51 destroy_users = set()
42
52
43 @classmethod
53 @classmethod
44 def teardown_class(cls):
54 def teardown_class(cls):
45 fixture.destroy_users(cls.destroy_users)
55 fixture.destroy_users(cls.destroy_users)
46
56
47 def teardown_method(self, method):
57 def teardown_method(self, method):
48 for n in Notification.query().all():
58 for n in Notification.query().all():
49 Session().delete(n)
59 Session().delete(n)
50
60
51 Session().commit()
61 Session().commit()
52 assert Notification.query().all() == []
62 assert Notification.query().all() == []
53
63
54 def test_index(self):
64 def test_index(self):
55 response = self.app.get(url(controller='login', action='index'))
65 response = self.app.get(login_url)
56 assert response.status == '200 OK'
66 assert response.status == '200 OK'
57 # Test response...
67 # Test response...
58
68
59 def test_login_admin_ok(self):
69 def test_login_admin_ok(self):
60 response = self.app.post(url(controller='login', action='index'),
70 response = self.app.post(login_url,
61 {'username': 'test_admin',
71 {'username': 'test_admin',
62 'password': 'test12'})
72 'password': 'test12'})
63 assert response.status == '302 Found'
73 assert response.status == '302 Found'
64 username = response.session['rhodecode_user'].get('username')
74 session = get_session_from_response(response)
75 username = session['rhodecode_user'].get('username')
65 assert username == 'test_admin'
76 assert username == 'test_admin'
66 response = response.follow()
77 response = response.follow()
67 response.mustcontain('/%s' % HG_REPO)
78 response.mustcontain('/%s' % HG_REPO)
68
79
69 def test_login_regular_ok(self):
80 def test_login_regular_ok(self):
70 response = self.app.post(url(controller='login', action='index'),
81 response = self.app.post(login_url,
71 {'username': 'test_regular',
82 {'username': 'test_regular',
72 'password': 'test12'})
83 'password': 'test12'})
73
84
74 assert response.status == '302 Found'
85 assert response.status == '302 Found'
75 username = response.session['rhodecode_user'].get('username')
86 session = get_session_from_response(response)
87 username = session['rhodecode_user'].get('username')
76 assert username == 'test_regular'
88 assert username == 'test_regular'
77 response = response.follow()
89 response = response.follow()
78 response.mustcontain('/%s' % HG_REPO)
90 response.mustcontain('/%s' % HG_REPO)
79
91
80 def test_login_ok_came_from(self):
92 def test_login_ok_came_from(self):
81 test_came_from = '/_admin/users?branch=stable'
93 test_came_from = '/_admin/users?branch=stable'
82 response = self.app.post(url(controller='login', action='index',
94 _url = '{}?came_from={}'.format(login_url, test_came_from)
83 came_from=test_came_from),
95 response = self.app.post(
84 {'username': 'test_admin',
96 _url, {'username': 'test_admin', 'password': 'test12'})
85 'password': 'test12'})
86 assert response.status == '302 Found'
97 assert response.status == '302 Found'
87 assert 'branch=stable' in response.location
98 assert 'branch=stable' in response.location
88 response = response.follow()
99 response = response.follow()
89
100
90 assert response.status == '200 OK'
101 assert response.status == '200 OK'
91 response.mustcontain('Users administration')
102 response.mustcontain('Users administration')
92
103
93 def test_redirect_to_login_with_get_args(self):
104 def test_redirect_to_login_with_get_args(self):
94 with fixture.anon_access(False):
105 with fixture.anon_access(False):
95 kwargs = {'branch': 'stable'}
106 kwargs = {'branch': 'stable'}
96 response = self.app.get(
107 response = self.app.get(
97 url('summary_home', repo_name=HG_REPO, **kwargs))
108 url('summary_home', repo_name=HG_REPO, **kwargs))
98 assert response.status == '302 Found'
109 assert response.status == '302 Found'
99 response_query = urlparse.parse_qsl(response.location)
110 response_query = urlparse.parse_qsl(response.location)
100 assert 'branch=stable' in response_query[0][1]
111 assert 'branch=stable' in response_query[0][1]
101
112
102 def test_login_form_with_get_args(self):
113 def test_login_form_with_get_args(self):
103 kwargs = {'branch': 'stable'}
114 _url = '{}?came_from=/_admin/users,branch=stable'.format(login_url)
104 response = self.app.get(
115 response = self.app.get(_url)
105 url(controller='login', action='index',
116 assert 'branch%3Dstable' in response.form.action
106 came_from='/_admin/users', **kwargs))
107 assert 'branch=stable' in response.form.action
108
117
109 @pytest.mark.parametrize("url_came_from", [
118 @pytest.mark.parametrize("url_came_from", [
110 ('data:text/html,<script>window.alert("xss")</script>',),
119 'data:text/html,<script>window.alert("xss")</script>',
111 ('mailto:test@rhodecode.org',),
120 'mailto:test@rhodecode.org',
112 ('file:///etc/passwd',),
121 'file:///etc/passwd',
113 ('ftp://some.ftp.server',),
122 'ftp://some.ftp.server',
114 ('http://other.domain',),
123 'http://other.domain',
115 ('/\r\nX-Forwarded-Host: http://example.org',),
124 '/\r\nX-Forwarded-Host: http://example.org',
116 ])
125 ])
117 def test_login_bad_came_froms(self, url_came_from):
126 def test_login_bad_came_froms(self, url_came_from):
118 response = self.app.post(url(controller='login', action='index',
127 _url = '{}?came_from={}'.format(login_url, url_came_from)
119 came_from=url_came_from),
128 response = self.app.post(
120 {'username': 'test_admin',
129 _url,
121 'password': 'test12'})
130 {'username': 'test_admin', 'password': 'test12'})
122 assert response.status == '302 Found'
131 assert response.status == '302 Found'
123 assert response.tmpl_context.came_from == '/'
124
125 response = response.follow()
132 response = response.follow()
126 assert response.status == '200 OK'
133 assert response.status == '200 OK'
134 assert response.request.path == '/'
127
135
128 def test_login_short_password(self):
136 def test_login_short_password(self):
129 response = self.app.post(url(controller='login', action='index'),
137 response = self.app.post(login_url,
130 {'username': 'test_admin',
138 {'username': 'test_admin',
131 'password': 'as'})
139 'password': 'as'})
132 assert response.status == '200 OK'
140 assert response.status == '200 OK'
133
141
134 response.mustcontain('Enter 3 characters or more')
142 response.mustcontain('Enter 3 characters or more')
135
143
136 def test_login_wrong_non_ascii_password(self, user_regular):
144 def test_login_wrong_non_ascii_password(self, user_regular):
137 response = self.app.post(
145 response = self.app.post(
138 url(controller='login', action='index'),
146 login_url,
139 {'username': user_regular.username,
147 {'username': user_regular.username,
140 'password': u'invalid-non-asci\xe4'.encode('utf8')})
148 'password': u'invalid-non-asci\xe4'.encode('utf8')})
141
149
142 response.mustcontain('invalid user name')
150 response.mustcontain('invalid user name')
143 response.mustcontain('invalid password')
151 response.mustcontain('invalid password')
144
152
145 def test_login_with_non_ascii_password(self, user_util):
153 def test_login_with_non_ascii_password(self, user_util):
146 password = u'valid-non-ascii\xe4'
154 password = u'valid-non-ascii\xe4'
147 user = user_util.create_user(password=password)
155 user = user_util.create_user(password=password)
148 response = self.app.post(
156 response = self.app.post(
149 url(controller='login', action='index'),
157 login_url,
150 {'username': user.username,
158 {'username': user.username,
151 'password': password.encode('utf-8')})
159 'password': password.encode('utf-8')})
152 assert response.status_code == 302
160 assert response.status_code == 302
153
161
154 def test_login_wrong_username_password(self):
162 def test_login_wrong_username_password(self):
155 response = self.app.post(url(controller='login', action='index'),
163 response = self.app.post(login_url,
156 {'username': 'error',
164 {'username': 'error',
157 'password': 'test12'})
165 'password': 'test12'})
158
166
159 response.mustcontain('invalid user name')
167 response.mustcontain('invalid user name')
160 response.mustcontain('invalid password')
168 response.mustcontain('invalid password')
161
169
162 def test_login_admin_ok_password_migration(self, real_crypto_backend):
170 def test_login_admin_ok_password_migration(self, real_crypto_backend):
163 from rhodecode.lib import auth
171 from rhodecode.lib import auth
164
172
165 # create new user, with sha256 password
173 # create new user, with sha256 password
166 temp_user = 'test_admin_sha256'
174 temp_user = 'test_admin_sha256'
167 user = fixture.create_user(temp_user)
175 user = fixture.create_user(temp_user)
168 user.password = auth._RhodeCodeCryptoSha256().hash_create(
176 user.password = auth._RhodeCodeCryptoSha256().hash_create(
169 b'test123')
177 b'test123')
170 Session().add(user)
178 Session().add(user)
171 Session().commit()
179 Session().commit()
172 self.destroy_users.add(temp_user)
180 self.destroy_users.add(temp_user)
173 response = self.app.post(url(controller='login', action='index'),
181 response = self.app.post(login_url,
174 {'username': temp_user,
182 {'username': temp_user,
175 'password': 'test123'})
183 'password': 'test123'})
176
184
177 assert response.status == '302 Found'
185 assert response.status == '302 Found'
178 username = response.session['rhodecode_user'].get('username')
186 session = get_session_from_response(response)
187 username = session['rhodecode_user'].get('username')
179 assert username == temp_user
188 assert username == temp_user
180 response = response.follow()
189 response = response.follow()
181 response.mustcontain('/%s' % HG_REPO)
190 response.mustcontain('/%s' % HG_REPO)
182
191
183 # new password should be bcrypted, after log-in and transfer
192 # new password should be bcrypted, after log-in and transfer
184 user = User.get_by_username(temp_user)
193 user = User.get_by_username(temp_user)
185 assert user.password.startswith('$')
194 assert user.password.startswith('$')
186
195
187 # REGISTRATIONS
196 # REGISTRATIONS
188 def test_register(self):
197 def test_register(self):
189 response = self.app.get(url(controller='login', action='register'))
198 response = self.app.get(register_url)
190 response.mustcontain('Create an Account')
199 response.mustcontain('Create an Account')
191
200
192 def test_register_err_same_username(self):
201 def test_register_err_same_username(self):
193 uname = 'test_admin'
202 uname = 'test_admin'
194 response = self.app.post(
203 response = self.app.post(
195 url(controller='login', action='register'),
204 register_url,
196 {
205 {
197 'username': uname,
206 'username': uname,
198 'password': 'test12',
207 'password': 'test12',
199 'password_confirmation': 'test12',
208 'password_confirmation': 'test12',
200 'email': 'goodmail@domain.com',
209 'email': 'goodmail@domain.com',
201 'firstname': 'test',
210 'firstname': 'test',
202 'lastname': 'test'
211 'lastname': 'test'
203 }
212 }
204 )
213 )
205
214
215 assertr = AssertResponse(response)
206 msg = validators.ValidUsername()._messages['username_exists']
216 msg = validators.ValidUsername()._messages['username_exists']
207 msg = h.html_escape(msg % {'username': uname})
217 msg = msg % {'username': uname}
208 response.mustcontain(msg)
218 assertr.element_contains('#username+.error-message', msg)
209
219
210 def test_register_err_same_email(self):
220 def test_register_err_same_email(self):
211 response = self.app.post(
221 response = self.app.post(
212 url(controller='login', action='register'),
222 register_url,
213 {
223 {
214 'username': 'test_admin_0',
224 'username': 'test_admin_0',
215 'password': 'test12',
225 'password': 'test12',
216 'password_confirmation': 'test12',
226 'password_confirmation': 'test12',
217 'email': 'test_admin@mail.com',
227 'email': 'test_admin@mail.com',
218 'firstname': 'test',
228 'firstname': 'test',
219 'lastname': 'test'
229 'lastname': 'test'
220 }
230 }
221 )
231 )
222
232
233 assertr = AssertResponse(response)
223 msg = validators.UniqSystemEmail()()._messages['email_taken']
234 msg = validators.UniqSystemEmail()()._messages['email_taken']
224 response.mustcontain(msg)
235 assertr.element_contains('#email+.error-message', msg)
225
236
226 def test_register_err_same_email_case_sensitive(self):
237 def test_register_err_same_email_case_sensitive(self):
227 response = self.app.post(
238 response = self.app.post(
228 url(controller='login', action='register'),
239 register_url,
229 {
240 {
230 'username': 'test_admin_1',
241 'username': 'test_admin_1',
231 'password': 'test12',
242 'password': 'test12',
232 'password_confirmation': 'test12',
243 'password_confirmation': 'test12',
233 'email': 'TesT_Admin@mail.COM',
244 'email': 'TesT_Admin@mail.COM',
234 'firstname': 'test',
245 'firstname': 'test',
235 'lastname': 'test'
246 'lastname': 'test'
236 }
247 }
237 )
248 )
249 assertr = AssertResponse(response)
238 msg = validators.UniqSystemEmail()()._messages['email_taken']
250 msg = validators.UniqSystemEmail()()._messages['email_taken']
239 response.mustcontain(msg)
251 assertr.element_contains('#email+.error-message', msg)
240
252
241 def test_register_err_wrong_data(self):
253 def test_register_err_wrong_data(self):
242 response = self.app.post(
254 response = self.app.post(
243 url(controller='login', action='register'),
255 register_url,
244 {
256 {
245 'username': 'xs',
257 'username': 'xs',
246 'password': 'test',
258 'password': 'test',
247 'password_confirmation': 'test',
259 'password_confirmation': 'test',
248 'email': 'goodmailm',
260 'email': 'goodmailm',
249 'firstname': 'test',
261 'firstname': 'test',
250 'lastname': 'test'
262 'lastname': 'test'
251 }
263 }
252 )
264 )
253 assert response.status == '200 OK'
265 assert response.status == '200 OK'
254 response.mustcontain('An email address must contain a single @')
266 response.mustcontain('An email address must contain a single @')
255 response.mustcontain('Enter a value 6 characters long or more')
267 response.mustcontain('Enter a value 6 characters long or more')
256
268
257 def test_register_err_username(self):
269 def test_register_err_username(self):
258 response = self.app.post(
270 response = self.app.post(
259 url(controller='login', action='register'),
271 register_url,
260 {
272 {
261 'username': 'error user',
273 'username': 'error user',
262 'password': 'test12',
274 'password': 'test12',
263 'password_confirmation': 'test12',
275 'password_confirmation': 'test12',
264 'email': 'goodmailm',
276 'email': 'goodmailm',
265 'firstname': 'test',
277 'firstname': 'test',
266 'lastname': 'test'
278 'lastname': 'test'
267 }
279 }
268 )
280 )
269
281
270 response.mustcontain('An email address must contain a single @')
282 response.mustcontain('An email address must contain a single @')
271 response.mustcontain(
283 response.mustcontain(
272 'Username may only contain '
284 'Username may only contain '
273 'alphanumeric characters underscores, '
285 'alphanumeric characters underscores, '
274 'periods or dashes and must begin with '
286 'periods or dashes and must begin with '
275 'alphanumeric character')
287 'alphanumeric character')
276
288
277 def test_register_err_case_sensitive(self):
289 def test_register_err_case_sensitive(self):
278 usr = 'Test_Admin'
290 usr = 'Test_Admin'
279 response = self.app.post(
291 response = self.app.post(
280 url(controller='login', action='register'),
292 register_url,
281 {
293 {
282 'username': usr,
294 'username': usr,
283 'password': 'test12',
295 'password': 'test12',
284 'password_confirmation': 'test12',
296 'password_confirmation': 'test12',
285 'email': 'goodmailm',
297 'email': 'goodmailm',
286 'firstname': 'test',
298 'firstname': 'test',
287 'lastname': 'test'
299 'lastname': 'test'
288 }
300 }
289 )
301 )
290
302
291 response.mustcontain('An email address must contain a single @')
303 assertr = AssertResponse(response)
292 msg = validators.ValidUsername()._messages['username_exists']
304 msg = validators.ValidUsername()._messages['username_exists']
293 msg = h.html_escape(msg % {'username': usr})
305 msg = msg % {'username': usr}
294 response.mustcontain(msg)
306 assertr.element_contains('#username+.error-message', msg)
295
307
296 def test_register_special_chars(self):
308 def test_register_special_chars(self):
297 response = self.app.post(
309 response = self.app.post(
298 url(controller='login', action='register'),
310 register_url,
299 {
311 {
300 'username': 'xxxaxn',
312 'username': 'xxxaxn',
301 'password': 'Δ…Δ‡ΕΊΕΌΔ…Ε›Ε›Ε›Ε›',
313 'password': 'Δ…Δ‡ΕΊΕΌΔ…Ε›Ε›Ε›Ε›',
302 'password_confirmation': 'Δ…Δ‡ΕΊΕΌΔ…Ε›Ε›Ε›Ε›',
314 'password_confirmation': 'Δ…Δ‡ΕΊΕΌΔ…Ε›Ε›Ε›Ε›',
303 'email': 'goodmailm@test.plx',
315 'email': 'goodmailm@test.plx',
304 'firstname': 'test',
316 'firstname': 'test',
305 'lastname': 'test'
317 'lastname': 'test'
306 }
318 }
307 )
319 )
308
320
309 msg = validators.ValidPassword()._messages['invalid_password']
321 msg = validators.ValidPassword()._messages['invalid_password']
310 response.mustcontain(msg)
322 response.mustcontain(msg)
311
323
312 def test_register_password_mismatch(self):
324 def test_register_password_mismatch(self):
313 response = self.app.post(
325 response = self.app.post(
314 url(controller='login', action='register'),
326 register_url,
315 {
327 {
316 'username': 'xs',
328 'username': 'xs',
317 'password': '123qwe',
329 'password': '123qwe',
318 'password_confirmation': 'qwe123',
330 'password_confirmation': 'qwe123',
319 'email': 'goodmailm@test.plxa',
331 'email': 'goodmailm@test.plxa',
320 'firstname': 'test',
332 'firstname': 'test',
321 'lastname': 'test'
333 'lastname': 'test'
322 }
334 }
323 )
335 )
324 msg = validators.ValidPasswordsMatch()._messages['password_mismatch']
336 msg = validators.ValidPasswordsMatch()._messages['password_mismatch']
325 response.mustcontain(msg)
337 response.mustcontain(msg)
326
338
327 def test_register_ok(self):
339 def test_register_ok(self):
328 username = 'test_regular4'
340 username = 'test_regular4'
329 password = 'qweqwe'
341 password = 'qweqwe'
330 email = 'marcin@test.com'
342 email = 'marcin@test.com'
331 name = 'testname'
343 name = 'testname'
332 lastname = 'testlastname'
344 lastname = 'testlastname'
333
345
334 response = self.app.post(
346 response = self.app.post(
335 url(controller='login', action='register'),
347 register_url,
336 {
348 {
337 'username': username,
349 'username': username,
338 'password': password,
350 'password': password,
339 'password_confirmation': password,
351 'password_confirmation': password,
340 'email': email,
352 'email': email,
341 'firstname': name,
353 'firstname': name,
342 'lastname': lastname,
354 'lastname': lastname,
343 'admin': True
355 'admin': True
344 }
356 }
345 ) # This should be overriden
357 ) # This should be overriden
346 assert response.status == '302 Found'
358 assert response.status == '302 Found'
347 assert_session_flash(
359 assert_session_flash(
348 response, 'You have successfully registered with RhodeCode')
360 response, 'You have successfully registered with RhodeCode')
349
361
350 ret = Session().query(User).filter(
362 ret = Session().query(User).filter(
351 User.username == 'test_regular4').one()
363 User.username == 'test_regular4').one()
352 assert ret.username == username
364 assert ret.username == username
353 assert check_password(password, ret.password)
365 assert check_password(password, ret.password)
354 assert ret.email == email
366 assert ret.email == email
355 assert ret.name == name
367 assert ret.name == name
356 assert ret.lastname == lastname
368 assert ret.lastname == lastname
357 assert ret.api_key is not None
369 assert ret.api_key is not None
358 assert not ret.admin
370 assert not ret.admin
359
371
360 def test_forgot_password_wrong_mail(self):
372 def test_forgot_password_wrong_mail(self):
361 bad_email = 'marcin@wrongmail.org'
373 bad_email = 'marcin@wrongmail.org'
362 response = self.app.post(
374 response = self.app.post(
363 url(controller='login', action='password_reset'),
375 pwd_reset_url,
364 {'email': bad_email, }
376 {'email': bad_email, }
365 )
377 )
366
378
367 msg = validators.ValidSystemEmail()._messages['non_existing_email']
379 msg = validators.ValidSystemEmail()._messages['non_existing_email']
368 msg = h.html_escape(msg % {'email': bad_email})
380 msg = h.html_escape(msg % {'email': bad_email})
369 response.mustcontain()
381 response.mustcontain()
370
382
371 def test_forgot_password(self):
383 def test_forgot_password(self):
372 response = self.app.get(url(controller='login',
384 response = self.app.get(pwd_reset_url)
373 action='password_reset'))
374 assert response.status == '200 OK'
385 assert response.status == '200 OK'
375
386
376 username = 'test_password_reset_1'
387 username = 'test_password_reset_1'
377 password = 'qweqwe'
388 password = 'qweqwe'
378 email = 'marcin@python-works.com'
389 email = 'marcin@python-works.com'
379 name = 'passwd'
390 name = 'passwd'
380 lastname = 'reset'
391 lastname = 'reset'
381
392
382 new = User()
393 new = User()
383 new.username = username
394 new.username = username
384 new.password = password
395 new.password = password
385 new.email = email
396 new.email = email
386 new.name = name
397 new.name = name
387 new.lastname = lastname
398 new.lastname = lastname
388 new.api_key = generate_auth_token(username)
399 new.api_key = generate_auth_token(username)
389 Session().add(new)
400 Session().add(new)
390 Session().commit()
401 Session().commit()
391
402
392 response = self.app.post(url(controller='login',
403 response = self.app.post(pwd_reset_url,
393 action='password_reset'),
394 {'email': email, })
404 {'email': email, })
395
405
396 assert_session_flash(
406 assert_session_flash(
397 response, 'Your password reset link was sent')
407 response, 'Your password reset link was sent')
398
408
399 response = response.follow()
409 response = response.follow()
400
410
401 # BAD KEY
411 # BAD KEY
402
412
403 key = "bad"
413 key = "bad"
404 response = self.app.get(url(controller='login',
414 confirm_url = '{}?key={}'.format(pwd_reset_confirm_url, key)
405 action='password_reset_confirmation',
415 response = self.app.get(confirm_url)
406 key=key))
407 assert response.status == '302 Found'
416 assert response.status == '302 Found'
408 assert response.location.endswith(url('reset_password'))
417 assert response.location.endswith(pwd_reset_url)
409
418
410 # GOOD KEY
419 # GOOD KEY
411
420
412 key = User.get_by_username(username).api_key
421 key = User.get_by_username(username).api_key
413 response = self.app.get(url(controller='login',
422 confirm_url = '{}?key={}'.format(pwd_reset_confirm_url, key)
414 action='password_reset_confirmation',
423 response = self.app.get(confirm_url)
415 key=key))
416 assert response.status == '302 Found'
424 assert response.status == '302 Found'
417 assert response.location.endswith(url('login_home'))
425 assert response.location.endswith(login_url)
418
426
419 assert_session_flash(
427 assert_session_flash(
420 response,
428 response,
421 'Your password reset was successful, '
429 'Your password reset was successful, '
422 'a new password has been sent to your email')
430 'a new password has been sent to your email')
423
431
424 response = response.follow()
432 response = response.follow()
425
433
426 def _get_api_whitelist(self, values=None):
434 def _get_api_whitelist(self, values=None):
427 config = {'api_access_controllers_whitelist': values or []}
435 config = {'api_access_controllers_whitelist': values or []}
428 return config
436 return config
429
437
430 @pytest.mark.parametrize("test_name, auth_token", [
438 @pytest.mark.parametrize("test_name, auth_token", [
431 ('none', None),
439 ('none', None),
432 ('empty_string', ''),
440 ('empty_string', ''),
433 ('fake_number', '123456'),
441 ('fake_number', '123456'),
434 ('proper_auth_token', None)
442 ('proper_auth_token', None)
435 ])
443 ])
436 def test_access_not_whitelisted_page_via_auth_token(self, test_name,
444 def test_access_not_whitelisted_page_via_auth_token(self, test_name,
437 auth_token):
445 auth_token):
438 whitelist = self._get_api_whitelist([])
446 whitelist = self._get_api_whitelist([])
439 with mock.patch.dict('rhodecode.CONFIG', whitelist):
447 with mock.patch.dict('rhodecode.CONFIG', whitelist):
440 assert [] == whitelist['api_access_controllers_whitelist']
448 assert [] == whitelist['api_access_controllers_whitelist']
441 if test_name == 'proper_auth_token':
449 if test_name == 'proper_auth_token':
442 # use builtin if api_key is None
450 # use builtin if api_key is None
443 auth_token = User.get_first_admin().api_key
451 auth_token = User.get_first_admin().api_key
444
452
445 with fixture.anon_access(False):
453 with fixture.anon_access(False):
446 self.app.get(url(controller='changeset',
454 self.app.get(url(controller='changeset',
447 action='changeset_raw',
455 action='changeset_raw',
448 repo_name=HG_REPO, revision='tip',
456 repo_name=HG_REPO, revision='tip',
449 api_key=auth_token),
457 api_key=auth_token),
450 status=302)
458 status=302)
451
459
452 @pytest.mark.parametrize("test_name, auth_token, code", [
460 @pytest.mark.parametrize("test_name, auth_token, code", [
453 ('none', None, 302),
461 ('none', None, 302),
454 ('empty_string', '', 302),
462 ('empty_string', '', 302),
455 ('fake_number', '123456', 302),
463 ('fake_number', '123456', 302),
456 ('proper_auth_token', None, 200)
464 ('proper_auth_token', None, 200)
457 ])
465 ])
458 def test_access_whitelisted_page_via_auth_token(self, test_name,
466 def test_access_whitelisted_page_via_auth_token(self, test_name,
459 auth_token, code):
467 auth_token, code):
460 whitelist = self._get_api_whitelist(
468 whitelist = self._get_api_whitelist(
461 ['ChangesetController:changeset_raw'])
469 ['ChangesetController:changeset_raw'])
462 with mock.patch.dict('rhodecode.CONFIG', whitelist):
470 with mock.patch.dict('rhodecode.CONFIG', whitelist):
463 assert ['ChangesetController:changeset_raw'] == \
471 assert ['ChangesetController:changeset_raw'] == \
464 whitelist['api_access_controllers_whitelist']
472 whitelist['api_access_controllers_whitelist']
465 if test_name == 'proper_auth_token':
473 if test_name == 'proper_auth_token':
466 auth_token = User.get_first_admin().api_key
474 auth_token = User.get_first_admin().api_key
467
475
468 with fixture.anon_access(False):
476 with fixture.anon_access(False):
469 self.app.get(url(controller='changeset',
477 self.app.get(url(controller='changeset',
470 action='changeset_raw',
478 action='changeset_raw',
471 repo_name=HG_REPO, revision='tip',
479 repo_name=HG_REPO, revision='tip',
472 api_key=auth_token),
480 api_key=auth_token),
473 status=code)
481 status=code)
474
482
475 def test_access_page_via_extra_auth_token(self):
483 def test_access_page_via_extra_auth_token(self):
476 whitelist = self._get_api_whitelist(
484 whitelist = self._get_api_whitelist(
477 ['ChangesetController:changeset_raw'])
485 ['ChangesetController:changeset_raw'])
478 with mock.patch.dict('rhodecode.CONFIG', whitelist):
486 with mock.patch.dict('rhodecode.CONFIG', whitelist):
479 assert ['ChangesetController:changeset_raw'] == \
487 assert ['ChangesetController:changeset_raw'] == \
480 whitelist['api_access_controllers_whitelist']
488 whitelist['api_access_controllers_whitelist']
481
489
482 new_auth_token = AuthTokenModel().create(
490 new_auth_token = AuthTokenModel().create(
483 TEST_USER_ADMIN_LOGIN, 'test')
491 TEST_USER_ADMIN_LOGIN, 'test')
484 Session().commit()
492 Session().commit()
485 with fixture.anon_access(False):
493 with fixture.anon_access(False):
486 self.app.get(url(controller='changeset',
494 self.app.get(url(controller='changeset',
487 action='changeset_raw',
495 action='changeset_raw',
488 repo_name=HG_REPO, revision='tip',
496 repo_name=HG_REPO, revision='tip',
489 api_key=new_auth_token.api_key),
497 api_key=new_auth_token.api_key),
490 status=200)
498 status=200)
491
499
492 def test_access_page_via_expired_auth_token(self):
500 def test_access_page_via_expired_auth_token(self):
493 whitelist = self._get_api_whitelist(
501 whitelist = self._get_api_whitelist(
494 ['ChangesetController:changeset_raw'])
502 ['ChangesetController:changeset_raw'])
495 with mock.patch.dict('rhodecode.CONFIG', whitelist):
503 with mock.patch.dict('rhodecode.CONFIG', whitelist):
496 assert ['ChangesetController:changeset_raw'] == \
504 assert ['ChangesetController:changeset_raw'] == \
497 whitelist['api_access_controllers_whitelist']
505 whitelist['api_access_controllers_whitelist']
498
506
499 new_auth_token = AuthTokenModel().create(
507 new_auth_token = AuthTokenModel().create(
500 TEST_USER_ADMIN_LOGIN, 'test')
508 TEST_USER_ADMIN_LOGIN, 'test')
501 Session().commit()
509 Session().commit()
502 # patch the api key and make it expired
510 # patch the api key and make it expired
503 new_auth_token.expires = 0
511 new_auth_token.expires = 0
504 Session().add(new_auth_token)
512 Session().add(new_auth_token)
505 Session().commit()
513 Session().commit()
506 with fixture.anon_access(False):
514 with fixture.anon_access(False):
507 self.app.get(url(controller='changeset',
515 self.app.get(url(controller='changeset',
508 action='changeset_raw',
516 action='changeset_raw',
509 repo_name=HG_REPO, revision='tip',
517 repo_name=HG_REPO, revision='tip',
510 api_key=new_auth_token.api_key),
518 api_key=new_auth_token.api_key),
511 status=302)
519 status=302)
@@ -1,916 +1,917 b''
1 # -*- coding: utf-8 -*-
1 # -*- coding: utf-8 -*-
2
2
3 # Copyright (C) 2010-2016 RhodeCode GmbH
3 # Copyright (C) 2010-2016 RhodeCode GmbH
4 #
4 #
5 # This program is free software: you can redistribute it and/or modify
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU Affero General Public License, version 3
6 # it under the terms of the GNU Affero General Public License, version 3
7 # (only), as published by the Free Software Foundation.
7 # (only), as published by the Free Software Foundation.
8 #
8 #
9 # This program is distributed in the hope that it will be useful,
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
12 # GNU General Public License for more details.
13 #
13 #
14 # You should have received a copy of the GNU Affero General Public License
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 #
16 #
17 # This program is dual-licensed. If you wish to learn more about the
17 # This program is dual-licensed. If you wish to learn more about the
18 # RhodeCode Enterprise Edition, including its added features, Support services,
18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20
20
21 import mock
21 import mock
22 import pytest
22 import pytest
23 from webob.exc import HTTPNotFound
23 from webob.exc import HTTPNotFound
24
24
25 import rhodecode
25 import rhodecode
26 from rhodecode.lib.vcs.nodes import FileNode
26 from rhodecode.lib.vcs.nodes import FileNode
27 from rhodecode.model.changeset_status import ChangesetStatusModel
27 from rhodecode.model.changeset_status import ChangesetStatusModel
28 from rhodecode.model.db import (
28 from rhodecode.model.db import (
29 PullRequest, ChangesetStatus, UserLog, Notification)
29 PullRequest, ChangesetStatus, UserLog, Notification)
30 from rhodecode.model.meta import Session
30 from rhodecode.model.meta import Session
31 from rhodecode.model.pull_request import PullRequestModel
31 from rhodecode.model.pull_request import PullRequestModel
32 from rhodecode.model.user import UserModel
32 from rhodecode.model.user import UserModel
33 from rhodecode.tests import assert_session_flash, url, TEST_USER_ADMIN_LOGIN
33 from rhodecode.tests import assert_session_flash, url, TEST_USER_ADMIN_LOGIN
34 from rhodecode.tests.utils import AssertResponse
34 from rhodecode.tests.utils import AssertResponse
35
35
36
36
37 @pytest.mark.usefixtures('app', 'autologin_user')
37 @pytest.mark.usefixtures('app', 'autologin_user')
38 @pytest.mark.backends("git", "hg")
38 @pytest.mark.backends("git", "hg")
39 class TestPullrequestsController:
39 class TestPullrequestsController:
40
40
41 def test_index(self, backend):
41 def test_index(self, backend):
42 self.app.get(url(
42 self.app.get(url(
43 controller='pullrequests', action='index',
43 controller='pullrequests', action='index',
44 repo_name=backend.repo_name))
44 repo_name=backend.repo_name))
45
45
46 def test_option_menu_create_pull_request_exists(self, backend):
46 def test_option_menu_create_pull_request_exists(self, backend):
47 repo_name = backend.repo_name
47 repo_name = backend.repo_name
48 response = self.app.get(url('summary_home', repo_name=repo_name))
48 response = self.app.get(url('summary_home', repo_name=repo_name))
49
49
50 create_pr_link = '<a href="%s">Create Pull Request</a>' % url(
50 create_pr_link = '<a href="%s">Create Pull Request</a>' % url(
51 'pullrequest', repo_name=repo_name)
51 'pullrequest', repo_name=repo_name)
52 response.mustcontain(create_pr_link)
52 response.mustcontain(create_pr_link)
53
53
54 def test_global_redirect_of_pr(self, backend, pr_util):
54 def test_global_redirect_of_pr(self, backend, pr_util):
55 pull_request = pr_util.create_pull_request()
55 pull_request = pr_util.create_pull_request()
56
56
57 response = self.app.get(
57 response = self.app.get(
58 url('pull_requests_global',
58 url('pull_requests_global',
59 pull_request_id=pull_request.pull_request_id))
59 pull_request_id=pull_request.pull_request_id))
60
60
61 repo_name = pull_request.target_repo.repo_name
61 repo_name = pull_request.target_repo.repo_name
62 redirect_url = url('pullrequest_show', repo_name=repo_name,
62 redirect_url = url('pullrequest_show', repo_name=repo_name,
63 pull_request_id=pull_request.pull_request_id)
63 pull_request_id=pull_request.pull_request_id)
64 assert response.status == '302 Found'
64 assert response.status == '302 Found'
65 assert redirect_url in response.location
65 assert redirect_url in response.location
66
66
67 @pytest.mark.xfail_backends(
67 @pytest.mark.xfail_backends(
68 "git", reason="Pending bugfix/feature, issue #6")
68 "git", reason="Pending bugfix/feature, issue #6")
69 def test_create_pr_form_with_raw_commit_id(self, backend):
69 def test_create_pr_form_with_raw_commit_id(self, backend):
70 repo = backend.repo
70 repo = backend.repo
71
71
72 self.app.get(
72 self.app.get(
73 url(controller='pullrequests', action='index',
73 url(controller='pullrequests', action='index',
74 repo_name=repo.repo_name,
74 repo_name=repo.repo_name,
75 commit=repo.get_commit().raw_id),
75 commit=repo.get_commit().raw_id),
76 status=200)
76 status=200)
77
77
78 @pytest.mark.parametrize('pr_merge_enabled', [True, False])
78 @pytest.mark.parametrize('pr_merge_enabled', [True, False])
79 def test_show(self, pr_util, pr_merge_enabled):
79 def test_show(self, pr_util, pr_merge_enabled):
80 pull_request = pr_util.create_pull_request(
80 pull_request = pr_util.create_pull_request(
81 mergeable=pr_merge_enabled, enable_notifications=False)
81 mergeable=pr_merge_enabled, enable_notifications=False)
82
82
83 response = self.app.get(url(
83 response = self.app.get(url(
84 controller='pullrequests', action='show',
84 controller='pullrequests', action='show',
85 repo_name=pull_request.target_repo.scm_instance().name,
85 repo_name=pull_request.target_repo.scm_instance().name,
86 pull_request_id=str(pull_request.pull_request_id)))
86 pull_request_id=str(pull_request.pull_request_id)))
87
87
88 for commit_id in pull_request.revisions:
88 for commit_id in pull_request.revisions:
89 response.mustcontain(commit_id)
89 response.mustcontain(commit_id)
90
90
91 assert pull_request.target_ref_parts.type in response
91 assert pull_request.target_ref_parts.type in response
92 assert pull_request.target_ref_parts.name in response
92 assert pull_request.target_ref_parts.name in response
93 target_clone_url = pull_request.target_repo.clone_url()
93 target_clone_url = pull_request.target_repo.clone_url()
94 assert target_clone_url in response
94 assert target_clone_url in response
95
95
96 assert 'class="pull-request-merge"' in response
96 assert 'class="pull-request-merge"' in response
97 assert (
97 assert (
98 'Server-side pull request merging is disabled.'
98 'Server-side pull request merging is disabled.'
99 in response) != pr_merge_enabled
99 in response) != pr_merge_enabled
100
100
101 def test_close_status_visibility(self, pr_util, csrf_token):
101 def test_close_status_visibility(self, pr_util, csrf_token):
102 from rhodecode.tests.functional.test_login import login_url, logut_url
102 # Logout
103 # Logout
103 response = self.app.post(
104 response = self.app.post(
104 url(controller='login', action='logout'),
105 logut_url,
105 params={'csrf_token': csrf_token})
106 params={'csrf_token': csrf_token})
106 # Login as regular user
107 # Login as regular user
107 response = self.app.post(url(controller='login', action='index'),
108 response = self.app.post(login_url,
108 {'username': 'test_regular',
109 {'username': 'test_regular',
109 'password': 'test12'})
110 'password': 'test12'})
110
111
111 pull_request = pr_util.create_pull_request(author='test_regular')
112 pull_request = pr_util.create_pull_request(author='test_regular')
112
113
113 response = self.app.get(url(
114 response = self.app.get(url(
114 controller='pullrequests', action='show',
115 controller='pullrequests', action='show',
115 repo_name=pull_request.target_repo.scm_instance().name,
116 repo_name=pull_request.target_repo.scm_instance().name,
116 pull_request_id=str(pull_request.pull_request_id)))
117 pull_request_id=str(pull_request.pull_request_id)))
117
118
118 assert 'Server-side pull request merging is disabled.' in response
119 assert 'Server-side pull request merging is disabled.' in response
119 assert 'value="forced_closed"' in response
120 assert 'value="forced_closed"' in response
120
121
121 def test_show_invalid_commit_id(self, pr_util):
122 def test_show_invalid_commit_id(self, pr_util):
122 # Simulating invalid revisions which will cause a lookup error
123 # Simulating invalid revisions which will cause a lookup error
123 pull_request = pr_util.create_pull_request()
124 pull_request = pr_util.create_pull_request()
124 pull_request.revisions = ['invalid']
125 pull_request.revisions = ['invalid']
125 Session().add(pull_request)
126 Session().add(pull_request)
126 Session().commit()
127 Session().commit()
127
128
128 response = self.app.get(url(
129 response = self.app.get(url(
129 controller='pullrequests', action='show',
130 controller='pullrequests', action='show',
130 repo_name=pull_request.target_repo.scm_instance().name,
131 repo_name=pull_request.target_repo.scm_instance().name,
131 pull_request_id=str(pull_request.pull_request_id)))
132 pull_request_id=str(pull_request.pull_request_id)))
132
133
133 for commit_id in pull_request.revisions:
134 for commit_id in pull_request.revisions:
134 response.mustcontain(commit_id)
135 response.mustcontain(commit_id)
135
136
136 def test_show_invalid_source_reference(self, pr_util):
137 def test_show_invalid_source_reference(self, pr_util):
137 pull_request = pr_util.create_pull_request()
138 pull_request = pr_util.create_pull_request()
138 pull_request.source_ref = 'branch:b:invalid'
139 pull_request.source_ref = 'branch:b:invalid'
139 Session().add(pull_request)
140 Session().add(pull_request)
140 Session().commit()
141 Session().commit()
141
142
142 self.app.get(url(
143 self.app.get(url(
143 controller='pullrequests', action='show',
144 controller='pullrequests', action='show',
144 repo_name=pull_request.target_repo.scm_instance().name,
145 repo_name=pull_request.target_repo.scm_instance().name,
145 pull_request_id=str(pull_request.pull_request_id)))
146 pull_request_id=str(pull_request.pull_request_id)))
146
147
147 def test_edit_title_description(self, pr_util, csrf_token):
148 def test_edit_title_description(self, pr_util, csrf_token):
148 pull_request = pr_util.create_pull_request()
149 pull_request = pr_util.create_pull_request()
149 pull_request_id = pull_request.pull_request_id
150 pull_request_id = pull_request.pull_request_id
150
151
151 response = self.app.post(
152 response = self.app.post(
152 url(controller='pullrequests', action='update',
153 url(controller='pullrequests', action='update',
153 repo_name=pull_request.target_repo.repo_name,
154 repo_name=pull_request.target_repo.repo_name,
154 pull_request_id=str(pull_request_id)),
155 pull_request_id=str(pull_request_id)),
155 params={
156 params={
156 'edit_pull_request': 'true',
157 'edit_pull_request': 'true',
157 '_method': 'put',
158 '_method': 'put',
158 'title': 'New title',
159 'title': 'New title',
159 'description': 'New description',
160 'description': 'New description',
160 'csrf_token': csrf_token})
161 'csrf_token': csrf_token})
161
162
162 assert_session_flash(
163 assert_session_flash(
163 response, u'Pull request title & description updated.',
164 response, u'Pull request title & description updated.',
164 category='success')
165 category='success')
165
166
166 pull_request = PullRequest.get(pull_request_id)
167 pull_request = PullRequest.get(pull_request_id)
167 assert pull_request.title == 'New title'
168 assert pull_request.title == 'New title'
168 assert pull_request.description == 'New description'
169 assert pull_request.description == 'New description'
169
170
170 def test_edit_title_description_closed(self, pr_util, csrf_token):
171 def test_edit_title_description_closed(self, pr_util, csrf_token):
171 pull_request = pr_util.create_pull_request()
172 pull_request = pr_util.create_pull_request()
172 pull_request_id = pull_request.pull_request_id
173 pull_request_id = pull_request.pull_request_id
173 pr_util.close()
174 pr_util.close()
174
175
175 response = self.app.post(
176 response = self.app.post(
176 url(controller='pullrequests', action='update',
177 url(controller='pullrequests', action='update',
177 repo_name=pull_request.target_repo.repo_name,
178 repo_name=pull_request.target_repo.repo_name,
178 pull_request_id=str(pull_request_id)),
179 pull_request_id=str(pull_request_id)),
179 params={
180 params={
180 'edit_pull_request': 'true',
181 'edit_pull_request': 'true',
181 '_method': 'put',
182 '_method': 'put',
182 'title': 'New title',
183 'title': 'New title',
183 'description': 'New description',
184 'description': 'New description',
184 'csrf_token': csrf_token})
185 'csrf_token': csrf_token})
185
186
186 assert_session_flash(
187 assert_session_flash(
187 response, u'Cannot update closed pull requests.',
188 response, u'Cannot update closed pull requests.',
188 category='error')
189 category='error')
189
190
190 def test_update_invalid_source_reference(self, pr_util, csrf_token):
191 def test_update_invalid_source_reference(self, pr_util, csrf_token):
191 pull_request = pr_util.create_pull_request()
192 pull_request = pr_util.create_pull_request()
192 pull_request.source_ref = 'branch:invalid-branch:invalid-commit-id'
193 pull_request.source_ref = 'branch:invalid-branch:invalid-commit-id'
193 Session().add(pull_request)
194 Session().add(pull_request)
194 Session().commit()
195 Session().commit()
195
196
196 pull_request_id = pull_request.pull_request_id
197 pull_request_id = pull_request.pull_request_id
197
198
198 response = self.app.post(
199 response = self.app.post(
199 url(controller='pullrequests', action='update',
200 url(controller='pullrequests', action='update',
200 repo_name=pull_request.target_repo.repo_name,
201 repo_name=pull_request.target_repo.repo_name,
201 pull_request_id=str(pull_request_id)),
202 pull_request_id=str(pull_request_id)),
202 params={'update_commits': 'true', '_method': 'put',
203 params={'update_commits': 'true', '_method': 'put',
203 'csrf_token': csrf_token})
204 'csrf_token': csrf_token})
204
205
205 assert_session_flash(
206 assert_session_flash(
206 response, u'Update failed due to missing commits.',
207 response, u'Update failed due to missing commits.',
207 category='error')
208 category='error')
208
209
209 def test_comment_and_close_pull_request(self, pr_util, csrf_token):
210 def test_comment_and_close_pull_request(self, pr_util, csrf_token):
210 pull_request = pr_util.create_pull_request(approved=True)
211 pull_request = pr_util.create_pull_request(approved=True)
211 pull_request_id = pull_request.pull_request_id
212 pull_request_id = pull_request.pull_request_id
212 author = pull_request.user_id
213 author = pull_request.user_id
213 repo = pull_request.target_repo.repo_id
214 repo = pull_request.target_repo.repo_id
214
215
215 self.app.post(
216 self.app.post(
216 url(controller='pullrequests',
217 url(controller='pullrequests',
217 action='comment',
218 action='comment',
218 repo_name=pull_request.target_repo.scm_instance().name,
219 repo_name=pull_request.target_repo.scm_instance().name,
219 pull_request_id=str(pull_request_id)),
220 pull_request_id=str(pull_request_id)),
220 params={
221 params={
221 'changeset_status':
222 'changeset_status':
222 ChangesetStatus.STATUS_APPROVED + '_closed',
223 ChangesetStatus.STATUS_APPROVED + '_closed',
223 'change_changeset_status': 'on',
224 'change_changeset_status': 'on',
224 'text': '',
225 'text': '',
225 'csrf_token': csrf_token},
226 'csrf_token': csrf_token},
226 status=302)
227 status=302)
227
228
228 action = 'user_closed_pull_request:%d' % pull_request_id
229 action = 'user_closed_pull_request:%d' % pull_request_id
229 journal = UserLog.query()\
230 journal = UserLog.query()\
230 .filter(UserLog.user_id == author)\
231 .filter(UserLog.user_id == author)\
231 .filter(UserLog.repository_id == repo)\
232 .filter(UserLog.repository_id == repo)\
232 .filter(UserLog.action == action)\
233 .filter(UserLog.action == action)\
233 .all()
234 .all()
234 assert len(journal) == 1
235 assert len(journal) == 1
235
236
236 def test_reject_and_close_pull_request(self, pr_util, csrf_token):
237 def test_reject_and_close_pull_request(self, pr_util, csrf_token):
237 pull_request = pr_util.create_pull_request()
238 pull_request = pr_util.create_pull_request()
238 pull_request_id = pull_request.pull_request_id
239 pull_request_id = pull_request.pull_request_id
239 response = self.app.post(
240 response = self.app.post(
240 url(controller='pullrequests',
241 url(controller='pullrequests',
241 action='update',
242 action='update',
242 repo_name=pull_request.target_repo.scm_instance().name,
243 repo_name=pull_request.target_repo.scm_instance().name,
243 pull_request_id=str(pull_request.pull_request_id)),
244 pull_request_id=str(pull_request.pull_request_id)),
244 params={'close_pull_request': 'true', '_method': 'put',
245 params={'close_pull_request': 'true', '_method': 'put',
245 'csrf_token': csrf_token})
246 'csrf_token': csrf_token})
246
247
247 pull_request = PullRequest.get(pull_request_id)
248 pull_request = PullRequest.get(pull_request_id)
248
249
249 assert response.json is True
250 assert response.json is True
250 assert pull_request.is_closed()
251 assert pull_request.is_closed()
251
252
252 # check only the latest status, not the review status
253 # check only the latest status, not the review status
253 status = ChangesetStatusModel().get_status(
254 status = ChangesetStatusModel().get_status(
254 pull_request.source_repo, pull_request=pull_request)
255 pull_request.source_repo, pull_request=pull_request)
255 assert status == ChangesetStatus.STATUS_REJECTED
256 assert status == ChangesetStatus.STATUS_REJECTED
256
257
257 def test_comment_force_close_pull_request(self, pr_util, csrf_token):
258 def test_comment_force_close_pull_request(self, pr_util, csrf_token):
258 pull_request = pr_util.create_pull_request()
259 pull_request = pr_util.create_pull_request()
259 pull_request_id = pull_request.pull_request_id
260 pull_request_id = pull_request.pull_request_id
260 reviewers_ids = [1, 2]
261 reviewers_ids = [1, 2]
261 PullRequestModel().update_reviewers(pull_request_id, reviewers_ids)
262 PullRequestModel().update_reviewers(pull_request_id, reviewers_ids)
262 author = pull_request.user_id
263 author = pull_request.user_id
263 repo = pull_request.target_repo.repo_id
264 repo = pull_request.target_repo.repo_id
264 self.app.post(
265 self.app.post(
265 url(controller='pullrequests',
266 url(controller='pullrequests',
266 action='comment',
267 action='comment',
267 repo_name=pull_request.target_repo.scm_instance().name,
268 repo_name=pull_request.target_repo.scm_instance().name,
268 pull_request_id=str(pull_request_id)),
269 pull_request_id=str(pull_request_id)),
269 params={
270 params={
270 'changeset_status': 'forced_closed',
271 'changeset_status': 'forced_closed',
271 'csrf_token': csrf_token},
272 'csrf_token': csrf_token},
272 status=302)
273 status=302)
273
274
274 pull_request = PullRequest.get(pull_request_id)
275 pull_request = PullRequest.get(pull_request_id)
275
276
276 action = 'user_closed_pull_request:%d' % pull_request_id
277 action = 'user_closed_pull_request:%d' % pull_request_id
277 journal = UserLog.query().filter(
278 journal = UserLog.query().filter(
278 UserLog.user_id == author,
279 UserLog.user_id == author,
279 UserLog.repository_id == repo,
280 UserLog.repository_id == repo,
280 UserLog.action == action).all()
281 UserLog.action == action).all()
281 assert len(journal) == 1
282 assert len(journal) == 1
282
283
283 # check only the latest status, not the review status
284 # check only the latest status, not the review status
284 status = ChangesetStatusModel().get_status(
285 status = ChangesetStatusModel().get_status(
285 pull_request.source_repo, pull_request=pull_request)
286 pull_request.source_repo, pull_request=pull_request)
286 assert status == ChangesetStatus.STATUS_REJECTED
287 assert status == ChangesetStatus.STATUS_REJECTED
287
288
288 def test_create_pull_request(self, backend, csrf_token):
289 def test_create_pull_request(self, backend, csrf_token):
289 commits = [
290 commits = [
290 {'message': 'ancestor'},
291 {'message': 'ancestor'},
291 {'message': 'change'},
292 {'message': 'change'},
292 ]
293 ]
293 commit_ids = backend.create_master_repo(commits)
294 commit_ids = backend.create_master_repo(commits)
294 target = backend.create_repo(heads=['ancestor'])
295 target = backend.create_repo(heads=['ancestor'])
295 source = backend.create_repo(heads=['change'])
296 source = backend.create_repo(heads=['change'])
296
297
297 response = self.app.post(
298 response = self.app.post(
298 url(
299 url(
299 controller='pullrequests',
300 controller='pullrequests',
300 action='create',
301 action='create',
301 repo_name=source.repo_name),
302 repo_name=source.repo_name),
302 params={
303 params={
303 'source_repo': source.repo_name,
304 'source_repo': source.repo_name,
304 'source_ref': 'branch:default:' + commit_ids['change'],
305 'source_ref': 'branch:default:' + commit_ids['change'],
305 'target_repo': target.repo_name,
306 'target_repo': target.repo_name,
306 'target_ref': 'branch:default:' + commit_ids['ancestor'],
307 'target_ref': 'branch:default:' + commit_ids['ancestor'],
307 'pullrequest_desc': 'Description',
308 'pullrequest_desc': 'Description',
308 'pullrequest_title': 'Title',
309 'pullrequest_title': 'Title',
309 'review_members': '1',
310 'review_members': '1',
310 'revisions': commit_ids['change'],
311 'revisions': commit_ids['change'],
311 'user': '',
312 'user': '',
312 'csrf_token': csrf_token,
313 'csrf_token': csrf_token,
313 },
314 },
314 status=302)
315 status=302)
315
316
316 location = response.headers['Location']
317 location = response.headers['Location']
317 pull_request_id = int(location.rsplit('/', 1)[1])
318 pull_request_id = int(location.rsplit('/', 1)[1])
318 pull_request = PullRequest.get(pull_request_id)
319 pull_request = PullRequest.get(pull_request_id)
319
320
320 # check that we have now both revisions
321 # check that we have now both revisions
321 assert pull_request.revisions == [commit_ids['change']]
322 assert pull_request.revisions == [commit_ids['change']]
322 assert pull_request.source_ref == 'branch:default:' + commit_ids['change']
323 assert pull_request.source_ref == 'branch:default:' + commit_ids['change']
323 expected_target_ref = 'branch:default:' + commit_ids['ancestor']
324 expected_target_ref = 'branch:default:' + commit_ids['ancestor']
324 assert pull_request.target_ref == expected_target_ref
325 assert pull_request.target_ref == expected_target_ref
325
326
326 def test_reviewer_notifications(self, backend, csrf_token):
327 def test_reviewer_notifications(self, backend, csrf_token):
327 # We have to use the app.post for this test so it will create the
328 # We have to use the app.post for this test so it will create the
328 # notifications properly with the new PR
329 # notifications properly with the new PR
329 commits = [
330 commits = [
330 {'message': 'ancestor',
331 {'message': 'ancestor',
331 'added': [FileNode('file_A', content='content_of_ancestor')]},
332 'added': [FileNode('file_A', content='content_of_ancestor')]},
332 {'message': 'change',
333 {'message': 'change',
333 'added': [FileNode('file_a', content='content_of_change')]},
334 'added': [FileNode('file_a', content='content_of_change')]},
334 {'message': 'change-child'},
335 {'message': 'change-child'},
335 {'message': 'ancestor-child', 'parents': ['ancestor'],
336 {'message': 'ancestor-child', 'parents': ['ancestor'],
336 'added': [
337 'added': [
337 FileNode('file_B', content='content_of_ancestor_child')]},
338 FileNode('file_B', content='content_of_ancestor_child')]},
338 {'message': 'ancestor-child-2'},
339 {'message': 'ancestor-child-2'},
339 ]
340 ]
340 commit_ids = backend.create_master_repo(commits)
341 commit_ids = backend.create_master_repo(commits)
341 target = backend.create_repo(heads=['ancestor-child'])
342 target = backend.create_repo(heads=['ancestor-child'])
342 source = backend.create_repo(heads=['change'])
343 source = backend.create_repo(heads=['change'])
343
344
344 response = self.app.post(
345 response = self.app.post(
345 url(
346 url(
346 controller='pullrequests',
347 controller='pullrequests',
347 action='create',
348 action='create',
348 repo_name=source.repo_name),
349 repo_name=source.repo_name),
349 params={
350 params={
350 'source_repo': source.repo_name,
351 'source_repo': source.repo_name,
351 'source_ref': 'branch:default:' + commit_ids['change'],
352 'source_ref': 'branch:default:' + commit_ids['change'],
352 'target_repo': target.repo_name,
353 'target_repo': target.repo_name,
353 'target_ref': 'branch:default:' + commit_ids['ancestor-child'],
354 'target_ref': 'branch:default:' + commit_ids['ancestor-child'],
354 'pullrequest_desc': 'Description',
355 'pullrequest_desc': 'Description',
355 'pullrequest_title': 'Title',
356 'pullrequest_title': 'Title',
356 'review_members': '2',
357 'review_members': '2',
357 'revisions': commit_ids['change'],
358 'revisions': commit_ids['change'],
358 'user': '',
359 'user': '',
359 'csrf_token': csrf_token,
360 'csrf_token': csrf_token,
360 },
361 },
361 status=302)
362 status=302)
362
363
363 location = response.headers['Location']
364 location = response.headers['Location']
364 pull_request_id = int(location.rsplit('/', 1)[1])
365 pull_request_id = int(location.rsplit('/', 1)[1])
365 pull_request = PullRequest.get(pull_request_id)
366 pull_request = PullRequest.get(pull_request_id)
366
367
367 # Check that a notification was made
368 # Check that a notification was made
368 notifications = Notification.query()\
369 notifications = Notification.query()\
369 .filter(Notification.created_by == pull_request.author.user_id,
370 .filter(Notification.created_by == pull_request.author.user_id,
370 Notification.type_ == Notification.TYPE_PULL_REQUEST,
371 Notification.type_ == Notification.TYPE_PULL_REQUEST,
371 Notification.subject.contains("wants you to review "
372 Notification.subject.contains("wants you to review "
372 "pull request #%d"
373 "pull request #%d"
373 % pull_request_id))
374 % pull_request_id))
374 assert len(notifications.all()) == 1
375 assert len(notifications.all()) == 1
375
376
376 # Change reviewers and check that a notification was made
377 # Change reviewers and check that a notification was made
377 PullRequestModel().update_reviewers(pull_request.pull_request_id, [1])
378 PullRequestModel().update_reviewers(pull_request.pull_request_id, [1])
378 assert len(notifications.all()) == 2
379 assert len(notifications.all()) == 2
379
380
380 def test_create_pull_request_stores_ancestor_commit_id(self, backend,
381 def test_create_pull_request_stores_ancestor_commit_id(self, backend,
381 csrf_token):
382 csrf_token):
382 commits = [
383 commits = [
383 {'message': 'ancestor',
384 {'message': 'ancestor',
384 'added': [FileNode('file_A', content='content_of_ancestor')]},
385 'added': [FileNode('file_A', content='content_of_ancestor')]},
385 {'message': 'change',
386 {'message': 'change',
386 'added': [FileNode('file_a', content='content_of_change')]},
387 'added': [FileNode('file_a', content='content_of_change')]},
387 {'message': 'change-child'},
388 {'message': 'change-child'},
388 {'message': 'ancestor-child', 'parents': ['ancestor'],
389 {'message': 'ancestor-child', 'parents': ['ancestor'],
389 'added': [
390 'added': [
390 FileNode('file_B', content='content_of_ancestor_child')]},
391 FileNode('file_B', content='content_of_ancestor_child')]},
391 {'message': 'ancestor-child-2'},
392 {'message': 'ancestor-child-2'},
392 ]
393 ]
393 commit_ids = backend.create_master_repo(commits)
394 commit_ids = backend.create_master_repo(commits)
394 target = backend.create_repo(heads=['ancestor-child'])
395 target = backend.create_repo(heads=['ancestor-child'])
395 source = backend.create_repo(heads=['change'])
396 source = backend.create_repo(heads=['change'])
396
397
397 response = self.app.post(
398 response = self.app.post(
398 url(
399 url(
399 controller='pullrequests',
400 controller='pullrequests',
400 action='create',
401 action='create',
401 repo_name=source.repo_name),
402 repo_name=source.repo_name),
402 params={
403 params={
403 'source_repo': source.repo_name,
404 'source_repo': source.repo_name,
404 'source_ref': 'branch:default:' + commit_ids['change'],
405 'source_ref': 'branch:default:' + commit_ids['change'],
405 'target_repo': target.repo_name,
406 'target_repo': target.repo_name,
406 'target_ref': 'branch:default:' + commit_ids['ancestor-child'],
407 'target_ref': 'branch:default:' + commit_ids['ancestor-child'],
407 'pullrequest_desc': 'Description',
408 'pullrequest_desc': 'Description',
408 'pullrequest_title': 'Title',
409 'pullrequest_title': 'Title',
409 'review_members': '1',
410 'review_members': '1',
410 'revisions': commit_ids['change'],
411 'revisions': commit_ids['change'],
411 'user': '',
412 'user': '',
412 'csrf_token': csrf_token,
413 'csrf_token': csrf_token,
413 },
414 },
414 status=302)
415 status=302)
415
416
416 location = response.headers['Location']
417 location = response.headers['Location']
417 pull_request_id = int(location.rsplit('/', 1)[1])
418 pull_request_id = int(location.rsplit('/', 1)[1])
418 pull_request = PullRequest.get(pull_request_id)
419 pull_request = PullRequest.get(pull_request_id)
419
420
420 # target_ref has to point to the ancestor's commit_id in order to
421 # target_ref has to point to the ancestor's commit_id in order to
421 # show the correct diff
422 # show the correct diff
422 expected_target_ref = 'branch:default:' + commit_ids['ancestor']
423 expected_target_ref = 'branch:default:' + commit_ids['ancestor']
423 assert pull_request.target_ref == expected_target_ref
424 assert pull_request.target_ref == expected_target_ref
424
425
425 # Check generated diff contents
426 # Check generated diff contents
426 response = response.follow()
427 response = response.follow()
427 assert 'content_of_ancestor' not in response.body
428 assert 'content_of_ancestor' not in response.body
428 assert 'content_of_ancestor-child' not in response.body
429 assert 'content_of_ancestor-child' not in response.body
429 assert 'content_of_change' in response.body
430 assert 'content_of_change' in response.body
430
431
431 def test_merge_pull_request_enabled(self, pr_util, csrf_token):
432 def test_merge_pull_request_enabled(self, pr_util, csrf_token):
432 # Clear any previous calls to rcextensions
433 # Clear any previous calls to rcextensions
433 rhodecode.EXTENSIONS.calls.clear()
434 rhodecode.EXTENSIONS.calls.clear()
434
435
435 pull_request = pr_util.create_pull_request(
436 pull_request = pr_util.create_pull_request(
436 approved=True, mergeable=True)
437 approved=True, mergeable=True)
437 pull_request_id = pull_request.pull_request_id
438 pull_request_id = pull_request.pull_request_id
438 repo_name = pull_request.target_repo.scm_instance().name,
439 repo_name = pull_request.target_repo.scm_instance().name,
439
440
440 response = self.app.post(
441 response = self.app.post(
441 url(controller='pullrequests',
442 url(controller='pullrequests',
442 action='merge',
443 action='merge',
443 repo_name=str(repo_name[0]),
444 repo_name=str(repo_name[0]),
444 pull_request_id=str(pull_request_id)),
445 pull_request_id=str(pull_request_id)),
445 params={'csrf_token': csrf_token}).follow()
446 params={'csrf_token': csrf_token}).follow()
446
447
447 pull_request = PullRequest.get(pull_request_id)
448 pull_request = PullRequest.get(pull_request_id)
448
449
449 assert response.status_int == 200
450 assert response.status_int == 200
450 assert pull_request.is_closed()
451 assert pull_request.is_closed()
451 assert_pull_request_status(
452 assert_pull_request_status(
452 pull_request, ChangesetStatus.STATUS_APPROVED)
453 pull_request, ChangesetStatus.STATUS_APPROVED)
453
454
454 # Check the relevant log entries were added
455 # Check the relevant log entries were added
455 user_logs = UserLog.query().order_by('-user_log_id').limit(4)
456 user_logs = UserLog.query().order_by('-user_log_id').limit(4)
456 actions = [log.action for log in user_logs]
457 actions = [log.action for log in user_logs]
457 pr_commit_ids = PullRequestModel()._get_commit_ids(pull_request)
458 pr_commit_ids = PullRequestModel()._get_commit_ids(pull_request)
458 expected_actions = [
459 expected_actions = [
459 u'user_closed_pull_request:%d' % pull_request_id,
460 u'user_closed_pull_request:%d' % pull_request_id,
460 u'user_merged_pull_request:%d' % pull_request_id,
461 u'user_merged_pull_request:%d' % pull_request_id,
461 # The action below reflect that the post push actions were executed
462 # The action below reflect that the post push actions were executed
462 u'user_commented_pull_request:%d' % pull_request_id,
463 u'user_commented_pull_request:%d' % pull_request_id,
463 u'push:%s' % ','.join(pr_commit_ids),
464 u'push:%s' % ','.join(pr_commit_ids),
464 ]
465 ]
465 assert actions == expected_actions
466 assert actions == expected_actions
466
467
467 # Check post_push rcextension was really executed
468 # Check post_push rcextension was really executed
468 push_calls = rhodecode.EXTENSIONS.calls['post_push']
469 push_calls = rhodecode.EXTENSIONS.calls['post_push']
469 assert len(push_calls) == 1
470 assert len(push_calls) == 1
470 unused_last_call_args, last_call_kwargs = push_calls[0]
471 unused_last_call_args, last_call_kwargs = push_calls[0]
471 assert last_call_kwargs['action'] == 'push'
472 assert last_call_kwargs['action'] == 'push'
472 assert last_call_kwargs['pushed_revs'] == pr_commit_ids
473 assert last_call_kwargs['pushed_revs'] == pr_commit_ids
473
474
474 def test_merge_pull_request_disabled(self, pr_util, csrf_token):
475 def test_merge_pull_request_disabled(self, pr_util, csrf_token):
475 pull_request = pr_util.create_pull_request(mergeable=False)
476 pull_request = pr_util.create_pull_request(mergeable=False)
476 pull_request_id = pull_request.pull_request_id
477 pull_request_id = pull_request.pull_request_id
477 pull_request = PullRequest.get(pull_request_id)
478 pull_request = PullRequest.get(pull_request_id)
478
479
479 response = self.app.post(
480 response = self.app.post(
480 url(controller='pullrequests',
481 url(controller='pullrequests',
481 action='merge',
482 action='merge',
482 repo_name=pull_request.target_repo.scm_instance().name,
483 repo_name=pull_request.target_repo.scm_instance().name,
483 pull_request_id=str(pull_request.pull_request_id)),
484 pull_request_id=str(pull_request.pull_request_id)),
484 params={'csrf_token': csrf_token}).follow()
485 params={'csrf_token': csrf_token}).follow()
485
486
486 assert response.status_int == 200
487 assert response.status_int == 200
487 assert 'Server-side pull request merging is disabled.' in response.body
488 assert 'Server-side pull request merging is disabled.' in response.body
488
489
489 @pytest.mark.skip_backends('svn')
490 @pytest.mark.skip_backends('svn')
490 def test_merge_pull_request_not_approved(self, pr_util, csrf_token):
491 def test_merge_pull_request_not_approved(self, pr_util, csrf_token):
491 pull_request = pr_util.create_pull_request(mergeable=True)
492 pull_request = pr_util.create_pull_request(mergeable=True)
492 pull_request_id = pull_request.pull_request_id
493 pull_request_id = pull_request.pull_request_id
493 repo_name = pull_request.target_repo.scm_instance().name,
494 repo_name = pull_request.target_repo.scm_instance().name,
494
495
495 response = self.app.post(
496 response = self.app.post(
496 url(controller='pullrequests',
497 url(controller='pullrequests',
497 action='merge',
498 action='merge',
498 repo_name=str(repo_name[0]),
499 repo_name=str(repo_name[0]),
499 pull_request_id=str(pull_request_id)),
500 pull_request_id=str(pull_request_id)),
500 params={'csrf_token': csrf_token}).follow()
501 params={'csrf_token': csrf_token}).follow()
501
502
502 pull_request = PullRequest.get(pull_request_id)
503 pull_request = PullRequest.get(pull_request_id)
503
504
504 assert response.status_int == 200
505 assert response.status_int == 200
505 assert ' Reviewer approval is pending.' in response.body
506 assert ' Reviewer approval is pending.' in response.body
506
507
507 def test_update_source_revision(self, backend, csrf_token):
508 def test_update_source_revision(self, backend, csrf_token):
508 commits = [
509 commits = [
509 {'message': 'ancestor'},
510 {'message': 'ancestor'},
510 {'message': 'change'},
511 {'message': 'change'},
511 {'message': 'change-2'},
512 {'message': 'change-2'},
512 ]
513 ]
513 commit_ids = backend.create_master_repo(commits)
514 commit_ids = backend.create_master_repo(commits)
514 target = backend.create_repo(heads=['ancestor'])
515 target = backend.create_repo(heads=['ancestor'])
515 source = backend.create_repo(heads=['change'])
516 source = backend.create_repo(heads=['change'])
516
517
517 # create pr from a in source to A in target
518 # create pr from a in source to A in target
518 pull_request = PullRequest()
519 pull_request = PullRequest()
519 pull_request.source_repo = source
520 pull_request.source_repo = source
520 # TODO: johbo: Make sure that we write the source ref this way!
521 # TODO: johbo: Make sure that we write the source ref this way!
521 pull_request.source_ref = 'branch:{branch}:{commit_id}'.format(
522 pull_request.source_ref = 'branch:{branch}:{commit_id}'.format(
522 branch=backend.default_branch_name, commit_id=commit_ids['change'])
523 branch=backend.default_branch_name, commit_id=commit_ids['change'])
523 pull_request.target_repo = target
524 pull_request.target_repo = target
524
525
525 pull_request.target_ref = 'branch:{branch}:{commit_id}'.format(
526 pull_request.target_ref = 'branch:{branch}:{commit_id}'.format(
526 branch=backend.default_branch_name,
527 branch=backend.default_branch_name,
527 commit_id=commit_ids['ancestor'])
528 commit_id=commit_ids['ancestor'])
528 pull_request.revisions = [commit_ids['change']]
529 pull_request.revisions = [commit_ids['change']]
529 pull_request.title = u"Test"
530 pull_request.title = u"Test"
530 pull_request.description = u"Description"
531 pull_request.description = u"Description"
531 pull_request.author = UserModel().get_by_username(
532 pull_request.author = UserModel().get_by_username(
532 TEST_USER_ADMIN_LOGIN)
533 TEST_USER_ADMIN_LOGIN)
533 Session().add(pull_request)
534 Session().add(pull_request)
534 Session().commit()
535 Session().commit()
535 pull_request_id = pull_request.pull_request_id
536 pull_request_id = pull_request.pull_request_id
536
537
537 # source has ancestor - change - change-2
538 # source has ancestor - change - change-2
538 backend.pull_heads(source, heads=['change-2'])
539 backend.pull_heads(source, heads=['change-2'])
539
540
540 # update PR
541 # update PR
541 self.app.post(
542 self.app.post(
542 url(controller='pullrequests', action='update',
543 url(controller='pullrequests', action='update',
543 repo_name=target.repo_name,
544 repo_name=target.repo_name,
544 pull_request_id=str(pull_request_id)),
545 pull_request_id=str(pull_request_id)),
545 params={'update_commits': 'true', '_method': 'put',
546 params={'update_commits': 'true', '_method': 'put',
546 'csrf_token': csrf_token})
547 'csrf_token': csrf_token})
547
548
548 # check that we have now both revisions
549 # check that we have now both revisions
549 pull_request = PullRequest.get(pull_request_id)
550 pull_request = PullRequest.get(pull_request_id)
550 assert pull_request.revisions == [
551 assert pull_request.revisions == [
551 commit_ids['change-2'], commit_ids['change']]
552 commit_ids['change-2'], commit_ids['change']]
552
553
553 # TODO: johbo: this should be a test on its own
554 # TODO: johbo: this should be a test on its own
554 response = self.app.get(url(
555 response = self.app.get(url(
555 controller='pullrequests', action='index',
556 controller='pullrequests', action='index',
556 repo_name=target.repo_name))
557 repo_name=target.repo_name))
557 assert response.status_int == 200
558 assert response.status_int == 200
558 assert 'Pull request updated to' in response.body
559 assert 'Pull request updated to' in response.body
559 assert 'with 1 added, 0 removed commits.' in response.body
560 assert 'with 1 added, 0 removed commits.' in response.body
560
561
561 def test_update_target_revision(self, backend, csrf_token):
562 def test_update_target_revision(self, backend, csrf_token):
562 commits = [
563 commits = [
563 {'message': 'ancestor'},
564 {'message': 'ancestor'},
564 {'message': 'change'},
565 {'message': 'change'},
565 {'message': 'ancestor-new', 'parents': ['ancestor']},
566 {'message': 'ancestor-new', 'parents': ['ancestor']},
566 {'message': 'change-rebased'},
567 {'message': 'change-rebased'},
567 ]
568 ]
568 commit_ids = backend.create_master_repo(commits)
569 commit_ids = backend.create_master_repo(commits)
569 target = backend.create_repo(heads=['ancestor'])
570 target = backend.create_repo(heads=['ancestor'])
570 source = backend.create_repo(heads=['change'])
571 source = backend.create_repo(heads=['change'])
571
572
572 # create pr from a in source to A in target
573 # create pr from a in source to A in target
573 pull_request = PullRequest()
574 pull_request = PullRequest()
574 pull_request.source_repo = source
575 pull_request.source_repo = source
575 # TODO: johbo: Make sure that we write the source ref this way!
576 # TODO: johbo: Make sure that we write the source ref this way!
576 pull_request.source_ref = 'branch:{branch}:{commit_id}'.format(
577 pull_request.source_ref = 'branch:{branch}:{commit_id}'.format(
577 branch=backend.default_branch_name, commit_id=commit_ids['change'])
578 branch=backend.default_branch_name, commit_id=commit_ids['change'])
578 pull_request.target_repo = target
579 pull_request.target_repo = target
579 # TODO: johbo: Target ref should be branch based, since tip can jump
580 # TODO: johbo: Target ref should be branch based, since tip can jump
580 # from branch to branch
581 # from branch to branch
581 pull_request.target_ref = 'branch:{branch}:{commit_id}'.format(
582 pull_request.target_ref = 'branch:{branch}:{commit_id}'.format(
582 branch=backend.default_branch_name,
583 branch=backend.default_branch_name,
583 commit_id=commit_ids['ancestor'])
584 commit_id=commit_ids['ancestor'])
584 pull_request.revisions = [commit_ids['change']]
585 pull_request.revisions = [commit_ids['change']]
585 pull_request.title = u"Test"
586 pull_request.title = u"Test"
586 pull_request.description = u"Description"
587 pull_request.description = u"Description"
587 pull_request.author = UserModel().get_by_username(
588 pull_request.author = UserModel().get_by_username(
588 TEST_USER_ADMIN_LOGIN)
589 TEST_USER_ADMIN_LOGIN)
589 Session().add(pull_request)
590 Session().add(pull_request)
590 Session().commit()
591 Session().commit()
591 pull_request_id = pull_request.pull_request_id
592 pull_request_id = pull_request.pull_request_id
592
593
593 # target has ancestor - ancestor-new
594 # target has ancestor - ancestor-new
594 # source has ancestor - ancestor-new - change-rebased
595 # source has ancestor - ancestor-new - change-rebased
595 backend.pull_heads(target, heads=['ancestor-new'])
596 backend.pull_heads(target, heads=['ancestor-new'])
596 backend.pull_heads(source, heads=['change-rebased'])
597 backend.pull_heads(source, heads=['change-rebased'])
597
598
598 # update PR
599 # update PR
599 self.app.post(
600 self.app.post(
600 url(controller='pullrequests', action='update',
601 url(controller='pullrequests', action='update',
601 repo_name=target.repo_name,
602 repo_name=target.repo_name,
602 pull_request_id=str(pull_request_id)),
603 pull_request_id=str(pull_request_id)),
603 params={'update_commits': 'true', '_method': 'put',
604 params={'update_commits': 'true', '_method': 'put',
604 'csrf_token': csrf_token},
605 'csrf_token': csrf_token},
605 status=200)
606 status=200)
606
607
607 # check that we have now both revisions
608 # check that we have now both revisions
608 pull_request = PullRequest.get(pull_request_id)
609 pull_request = PullRequest.get(pull_request_id)
609 assert pull_request.revisions == [commit_ids['change-rebased']]
610 assert pull_request.revisions == [commit_ids['change-rebased']]
610 assert pull_request.target_ref == 'branch:{branch}:{commit_id}'.format(
611 assert pull_request.target_ref == 'branch:{branch}:{commit_id}'.format(
611 branch=backend.default_branch_name,
612 branch=backend.default_branch_name,
612 commit_id=commit_ids['ancestor-new'])
613 commit_id=commit_ids['ancestor-new'])
613
614
614 # TODO: johbo: This should be a test on its own
615 # TODO: johbo: This should be a test on its own
615 response = self.app.get(url(
616 response = self.app.get(url(
616 controller='pullrequests', action='index',
617 controller='pullrequests', action='index',
617 repo_name=target.repo_name))
618 repo_name=target.repo_name))
618 assert response.status_int == 200
619 assert response.status_int == 200
619 assert 'Pull request updated to' in response.body
620 assert 'Pull request updated to' in response.body
620 assert 'with 1 added, 1 removed commits.' in response.body
621 assert 'with 1 added, 1 removed commits.' in response.body
621
622
622 def test_update_of_ancestor_reference(self, backend, csrf_token):
623 def test_update_of_ancestor_reference(self, backend, csrf_token):
623 commits = [
624 commits = [
624 {'message': 'ancestor'},
625 {'message': 'ancestor'},
625 {'message': 'change'},
626 {'message': 'change'},
626 {'message': 'change-2'},
627 {'message': 'change-2'},
627 {'message': 'ancestor-new', 'parents': ['ancestor']},
628 {'message': 'ancestor-new', 'parents': ['ancestor']},
628 {'message': 'change-rebased'},
629 {'message': 'change-rebased'},
629 ]
630 ]
630 commit_ids = backend.create_master_repo(commits)
631 commit_ids = backend.create_master_repo(commits)
631 target = backend.create_repo(heads=['ancestor'])
632 target = backend.create_repo(heads=['ancestor'])
632 source = backend.create_repo(heads=['change'])
633 source = backend.create_repo(heads=['change'])
633
634
634 # create pr from a in source to A in target
635 # create pr from a in source to A in target
635 pull_request = PullRequest()
636 pull_request = PullRequest()
636 pull_request.source_repo = source
637 pull_request.source_repo = source
637 # TODO: johbo: Make sure that we write the source ref this way!
638 # TODO: johbo: Make sure that we write the source ref this way!
638 pull_request.source_ref = 'branch:{branch}:{commit_id}'.format(
639 pull_request.source_ref = 'branch:{branch}:{commit_id}'.format(
639 branch=backend.default_branch_name,
640 branch=backend.default_branch_name,
640 commit_id=commit_ids['change'])
641 commit_id=commit_ids['change'])
641 pull_request.target_repo = target
642 pull_request.target_repo = target
642 # TODO: johbo: Target ref should be branch based, since tip can jump
643 # TODO: johbo: Target ref should be branch based, since tip can jump
643 # from branch to branch
644 # from branch to branch
644 pull_request.target_ref = 'branch:{branch}:{commit_id}'.format(
645 pull_request.target_ref = 'branch:{branch}:{commit_id}'.format(
645 branch=backend.default_branch_name,
646 branch=backend.default_branch_name,
646 commit_id=commit_ids['ancestor'])
647 commit_id=commit_ids['ancestor'])
647 pull_request.revisions = [commit_ids['change']]
648 pull_request.revisions = [commit_ids['change']]
648 pull_request.title = u"Test"
649 pull_request.title = u"Test"
649 pull_request.description = u"Description"
650 pull_request.description = u"Description"
650 pull_request.author = UserModel().get_by_username(
651 pull_request.author = UserModel().get_by_username(
651 TEST_USER_ADMIN_LOGIN)
652 TEST_USER_ADMIN_LOGIN)
652 Session().add(pull_request)
653 Session().add(pull_request)
653 Session().commit()
654 Session().commit()
654 pull_request_id = pull_request.pull_request_id
655 pull_request_id = pull_request.pull_request_id
655
656
656 # target has ancestor - ancestor-new
657 # target has ancestor - ancestor-new
657 # source has ancestor - ancestor-new - change-rebased
658 # source has ancestor - ancestor-new - change-rebased
658 backend.pull_heads(target, heads=['ancestor-new'])
659 backend.pull_heads(target, heads=['ancestor-new'])
659 backend.pull_heads(source, heads=['change-rebased'])
660 backend.pull_heads(source, heads=['change-rebased'])
660
661
661 # update PR
662 # update PR
662 self.app.post(
663 self.app.post(
663 url(controller='pullrequests', action='update',
664 url(controller='pullrequests', action='update',
664 repo_name=target.repo_name,
665 repo_name=target.repo_name,
665 pull_request_id=str(pull_request_id)),
666 pull_request_id=str(pull_request_id)),
666 params={'update_commits': 'true', '_method': 'put',
667 params={'update_commits': 'true', '_method': 'put',
667 'csrf_token': csrf_token},
668 'csrf_token': csrf_token},
668 status=200)
669 status=200)
669
670
670 # Expect the target reference to be updated correctly
671 # Expect the target reference to be updated correctly
671 pull_request = PullRequest.get(pull_request_id)
672 pull_request = PullRequest.get(pull_request_id)
672 assert pull_request.revisions == [commit_ids['change-rebased']]
673 assert pull_request.revisions == [commit_ids['change-rebased']]
673 expected_target_ref = 'branch:{branch}:{commit_id}'.format(
674 expected_target_ref = 'branch:{branch}:{commit_id}'.format(
674 branch=backend.default_branch_name,
675 branch=backend.default_branch_name,
675 commit_id=commit_ids['ancestor-new'])
676 commit_id=commit_ids['ancestor-new'])
676 assert pull_request.target_ref == expected_target_ref
677 assert pull_request.target_ref == expected_target_ref
677
678
678 def test_remove_pull_request_branch(self, backend_git, csrf_token):
679 def test_remove_pull_request_branch(self, backend_git, csrf_token):
679 branch_name = 'development'
680 branch_name = 'development'
680 commits = [
681 commits = [
681 {'message': 'initial-commit'},
682 {'message': 'initial-commit'},
682 {'message': 'old-feature'},
683 {'message': 'old-feature'},
683 {'message': 'new-feature', 'branch': branch_name},
684 {'message': 'new-feature', 'branch': branch_name},
684 ]
685 ]
685 repo = backend_git.create_repo(commits)
686 repo = backend_git.create_repo(commits)
686 commit_ids = backend_git.commit_ids
687 commit_ids = backend_git.commit_ids
687
688
688 pull_request = PullRequest()
689 pull_request = PullRequest()
689 pull_request.source_repo = repo
690 pull_request.source_repo = repo
690 pull_request.target_repo = repo
691 pull_request.target_repo = repo
691 pull_request.source_ref = 'branch:{branch}:{commit_id}'.format(
692 pull_request.source_ref = 'branch:{branch}:{commit_id}'.format(
692 branch=branch_name, commit_id=commit_ids['new-feature'])
693 branch=branch_name, commit_id=commit_ids['new-feature'])
693 pull_request.target_ref = 'branch:{branch}:{commit_id}'.format(
694 pull_request.target_ref = 'branch:{branch}:{commit_id}'.format(
694 branch=backend_git.default_branch_name,
695 branch=backend_git.default_branch_name,
695 commit_id=commit_ids['old-feature'])
696 commit_id=commit_ids['old-feature'])
696 pull_request.revisions = [commit_ids['new-feature']]
697 pull_request.revisions = [commit_ids['new-feature']]
697 pull_request.title = u"Test"
698 pull_request.title = u"Test"
698 pull_request.description = u"Description"
699 pull_request.description = u"Description"
699 pull_request.author = UserModel().get_by_username(
700 pull_request.author = UserModel().get_by_username(
700 TEST_USER_ADMIN_LOGIN)
701 TEST_USER_ADMIN_LOGIN)
701 Session().add(pull_request)
702 Session().add(pull_request)
702 Session().commit()
703 Session().commit()
703
704
704 vcs = repo.scm_instance()
705 vcs = repo.scm_instance()
705 vcs.remove_ref('refs/heads/{}'.format(branch_name))
706 vcs.remove_ref('refs/heads/{}'.format(branch_name))
706
707
707 response = self.app.get(url(
708 response = self.app.get(url(
708 controller='pullrequests', action='show',
709 controller='pullrequests', action='show',
709 repo_name=repo.repo_name,
710 repo_name=repo.repo_name,
710 pull_request_id=str(pull_request.pull_request_id)))
711 pull_request_id=str(pull_request.pull_request_id)))
711
712
712 assert response.status_int == 200
713 assert response.status_int == 200
713 assert_response = AssertResponse(response)
714 assert_response = AssertResponse(response)
714 assert_response.element_contains(
715 assert_response.element_contains(
715 '#changeset_compare_view_content .alert strong',
716 '#changeset_compare_view_content .alert strong',
716 'Missing commits')
717 'Missing commits')
717 assert_response.element_contains(
718 assert_response.element_contains(
718 '#changeset_compare_view_content .alert',
719 '#changeset_compare_view_content .alert',
719 'This pull request cannot be displayed, because one or more'
720 'This pull request cannot be displayed, because one or more'
720 ' commits no longer exist in the source repository.')
721 ' commits no longer exist in the source repository.')
721
722
722 def test_strip_commits_from_pull_request(
723 def test_strip_commits_from_pull_request(
723 self, backend, pr_util, csrf_token):
724 self, backend, pr_util, csrf_token):
724 commits = [
725 commits = [
725 {'message': 'initial-commit'},
726 {'message': 'initial-commit'},
726 {'message': 'old-feature'},
727 {'message': 'old-feature'},
727 {'message': 'new-feature', 'parents': ['initial-commit']},
728 {'message': 'new-feature', 'parents': ['initial-commit']},
728 ]
729 ]
729 pull_request = pr_util.create_pull_request(
730 pull_request = pr_util.create_pull_request(
730 commits, target_head='initial-commit', source_head='new-feature',
731 commits, target_head='initial-commit', source_head='new-feature',
731 revisions=['new-feature'])
732 revisions=['new-feature'])
732
733
733 vcs = pr_util.source_repository.scm_instance()
734 vcs = pr_util.source_repository.scm_instance()
734 if backend.alias == 'git':
735 if backend.alias == 'git':
735 vcs.strip(pr_util.commit_ids['new-feature'], branch_name='master')
736 vcs.strip(pr_util.commit_ids['new-feature'], branch_name='master')
736 else:
737 else:
737 vcs.strip(pr_util.commit_ids['new-feature'])
738 vcs.strip(pr_util.commit_ids['new-feature'])
738
739
739 response = self.app.get(url(
740 response = self.app.get(url(
740 controller='pullrequests', action='show',
741 controller='pullrequests', action='show',
741 repo_name=pr_util.target_repository.repo_name,
742 repo_name=pr_util.target_repository.repo_name,
742 pull_request_id=str(pull_request.pull_request_id)))
743 pull_request_id=str(pull_request.pull_request_id)))
743
744
744 assert response.status_int == 200
745 assert response.status_int == 200
745 assert_response = AssertResponse(response)
746 assert_response = AssertResponse(response)
746 assert_response.element_contains(
747 assert_response.element_contains(
747 '#changeset_compare_view_content .alert strong',
748 '#changeset_compare_view_content .alert strong',
748 'Missing commits')
749 'Missing commits')
749 assert_response.element_contains(
750 assert_response.element_contains(
750 '#changeset_compare_view_content .alert',
751 '#changeset_compare_view_content .alert',
751 'This pull request cannot be displayed, because one or more'
752 'This pull request cannot be displayed, because one or more'
752 ' commits no longer exist in the source repository.')
753 ' commits no longer exist in the source repository.')
753 assert_response.element_contains(
754 assert_response.element_contains(
754 '#update_commits',
755 '#update_commits',
755 'Update commits')
756 'Update commits')
756
757
757 def test_strip_commits_and_update(
758 def test_strip_commits_and_update(
758 self, backend, pr_util, csrf_token):
759 self, backend, pr_util, csrf_token):
759 commits = [
760 commits = [
760 {'message': 'initial-commit'},
761 {'message': 'initial-commit'},
761 {'message': 'old-feature'},
762 {'message': 'old-feature'},
762 {'message': 'new-feature', 'parents': ['old-feature']},
763 {'message': 'new-feature', 'parents': ['old-feature']},
763 ]
764 ]
764 pull_request = pr_util.create_pull_request(
765 pull_request = pr_util.create_pull_request(
765 commits, target_head='old-feature', source_head='new-feature',
766 commits, target_head='old-feature', source_head='new-feature',
766 revisions=['new-feature'], mergeable=True)
767 revisions=['new-feature'], mergeable=True)
767
768
768 vcs = pr_util.source_repository.scm_instance()
769 vcs = pr_util.source_repository.scm_instance()
769 if backend.alias == 'git':
770 if backend.alias == 'git':
770 vcs.strip(pr_util.commit_ids['new-feature'], branch_name='master')
771 vcs.strip(pr_util.commit_ids['new-feature'], branch_name='master')
771 else:
772 else:
772 vcs.strip(pr_util.commit_ids['new-feature'])
773 vcs.strip(pr_util.commit_ids['new-feature'])
773
774
774 response = self.app.post(
775 response = self.app.post(
775 url(controller='pullrequests', action='update',
776 url(controller='pullrequests', action='update',
776 repo_name=pull_request.target_repo.repo_name,
777 repo_name=pull_request.target_repo.repo_name,
777 pull_request_id=str(pull_request.pull_request_id)),
778 pull_request_id=str(pull_request.pull_request_id)),
778 params={'update_commits': 'true', '_method': 'put',
779 params={'update_commits': 'true', '_method': 'put',
779 'csrf_token': csrf_token})
780 'csrf_token': csrf_token})
780
781
781 assert response.status_int == 200
782 assert response.status_int == 200
782 assert response.body == 'true'
783 assert response.body == 'true'
783
784
784 # Make sure that after update, it won't raise 500 errors
785 # Make sure that after update, it won't raise 500 errors
785 response = self.app.get(url(
786 response = self.app.get(url(
786 controller='pullrequests', action='show',
787 controller='pullrequests', action='show',
787 repo_name=pr_util.target_repository.repo_name,
788 repo_name=pr_util.target_repository.repo_name,
788 pull_request_id=str(pull_request.pull_request_id)))
789 pull_request_id=str(pull_request.pull_request_id)))
789
790
790 assert response.status_int == 200
791 assert response.status_int == 200
791 assert_response = AssertResponse(response)
792 assert_response = AssertResponse(response)
792 assert_response.element_contains(
793 assert_response.element_contains(
793 '#changeset_compare_view_content .alert strong',
794 '#changeset_compare_view_content .alert strong',
794 'Missing commits')
795 'Missing commits')
795
796
796 def test_branch_is_a_link(self, pr_util):
797 def test_branch_is_a_link(self, pr_util):
797 pull_request = pr_util.create_pull_request()
798 pull_request = pr_util.create_pull_request()
798 pull_request.source_ref = 'branch:origin:1234567890abcdef'
799 pull_request.source_ref = 'branch:origin:1234567890abcdef'
799 pull_request.target_ref = 'branch:target:abcdef1234567890'
800 pull_request.target_ref = 'branch:target:abcdef1234567890'
800 Session().add(pull_request)
801 Session().add(pull_request)
801 Session().commit()
802 Session().commit()
802
803
803 response = self.app.get(url(
804 response = self.app.get(url(
804 controller='pullrequests', action='show',
805 controller='pullrequests', action='show',
805 repo_name=pull_request.target_repo.scm_instance().name,
806 repo_name=pull_request.target_repo.scm_instance().name,
806 pull_request_id=str(pull_request.pull_request_id)))
807 pull_request_id=str(pull_request.pull_request_id)))
807 assert response.status_int == 200
808 assert response.status_int == 200
808 assert_response = AssertResponse(response)
809 assert_response = AssertResponse(response)
809
810
810 origin = assert_response.get_element('.pr-origininfo .tag')
811 origin = assert_response.get_element('.pr-origininfo .tag')
811 origin_children = origin.getchildren()
812 origin_children = origin.getchildren()
812 assert len(origin_children) == 1
813 assert len(origin_children) == 1
813 target = assert_response.get_element('.pr-targetinfo .tag')
814 target = assert_response.get_element('.pr-targetinfo .tag')
814 target_children = target.getchildren()
815 target_children = target.getchildren()
815 assert len(target_children) == 1
816 assert len(target_children) == 1
816
817
817 expected_origin_link = url(
818 expected_origin_link = url(
818 'changelog_home',
819 'changelog_home',
819 repo_name=pull_request.source_repo.scm_instance().name,
820 repo_name=pull_request.source_repo.scm_instance().name,
820 branch='origin')
821 branch='origin')
821 expected_target_link = url(
822 expected_target_link = url(
822 'changelog_home',
823 'changelog_home',
823 repo_name=pull_request.target_repo.scm_instance().name,
824 repo_name=pull_request.target_repo.scm_instance().name,
824 branch='target')
825 branch='target')
825 assert origin_children[0].attrib['href'] == expected_origin_link
826 assert origin_children[0].attrib['href'] == expected_origin_link
826 assert origin_children[0].text == 'branch: origin'
827 assert origin_children[0].text == 'branch: origin'
827 assert target_children[0].attrib['href'] == expected_target_link
828 assert target_children[0].attrib['href'] == expected_target_link
828 assert target_children[0].text == 'branch: target'
829 assert target_children[0].text == 'branch: target'
829
830
830 def test_bookmark_is_not_a_link(self, pr_util):
831 def test_bookmark_is_not_a_link(self, pr_util):
831 pull_request = pr_util.create_pull_request()
832 pull_request = pr_util.create_pull_request()
832 pull_request.source_ref = 'bookmark:origin:1234567890abcdef'
833 pull_request.source_ref = 'bookmark:origin:1234567890abcdef'
833 pull_request.target_ref = 'bookmark:target:abcdef1234567890'
834 pull_request.target_ref = 'bookmark:target:abcdef1234567890'
834 Session().add(pull_request)
835 Session().add(pull_request)
835 Session().commit()
836 Session().commit()
836
837
837 response = self.app.get(url(
838 response = self.app.get(url(
838 controller='pullrequests', action='show',
839 controller='pullrequests', action='show',
839 repo_name=pull_request.target_repo.scm_instance().name,
840 repo_name=pull_request.target_repo.scm_instance().name,
840 pull_request_id=str(pull_request.pull_request_id)))
841 pull_request_id=str(pull_request.pull_request_id)))
841 assert response.status_int == 200
842 assert response.status_int == 200
842 assert_response = AssertResponse(response)
843 assert_response = AssertResponse(response)
843
844
844 origin = assert_response.get_element('.pr-origininfo .tag')
845 origin = assert_response.get_element('.pr-origininfo .tag')
845 assert origin.text.strip() == 'bookmark: origin'
846 assert origin.text.strip() == 'bookmark: origin'
846 assert origin.getchildren() == []
847 assert origin.getchildren() == []
847
848
848 target = assert_response.get_element('.pr-targetinfo .tag')
849 target = assert_response.get_element('.pr-targetinfo .tag')
849 assert target.text.strip() == 'bookmark: target'
850 assert target.text.strip() == 'bookmark: target'
850 assert target.getchildren() == []
851 assert target.getchildren() == []
851
852
852 def test_tag_is_not_a_link(self, pr_util):
853 def test_tag_is_not_a_link(self, pr_util):
853 pull_request = pr_util.create_pull_request()
854 pull_request = pr_util.create_pull_request()
854 pull_request.source_ref = 'tag:origin:1234567890abcdef'
855 pull_request.source_ref = 'tag:origin:1234567890abcdef'
855 pull_request.target_ref = 'tag:target:abcdef1234567890'
856 pull_request.target_ref = 'tag:target:abcdef1234567890'
856 Session().add(pull_request)
857 Session().add(pull_request)
857 Session().commit()
858 Session().commit()
858
859
859 response = self.app.get(url(
860 response = self.app.get(url(
860 controller='pullrequests', action='show',
861 controller='pullrequests', action='show',
861 repo_name=pull_request.target_repo.scm_instance().name,
862 repo_name=pull_request.target_repo.scm_instance().name,
862 pull_request_id=str(pull_request.pull_request_id)))
863 pull_request_id=str(pull_request.pull_request_id)))
863 assert response.status_int == 200
864 assert response.status_int == 200
864 assert_response = AssertResponse(response)
865 assert_response = AssertResponse(response)
865
866
866 origin = assert_response.get_element('.pr-origininfo .tag')
867 origin = assert_response.get_element('.pr-origininfo .tag')
867 assert origin.text.strip() == 'tag: origin'
868 assert origin.text.strip() == 'tag: origin'
868 assert origin.getchildren() == []
869 assert origin.getchildren() == []
869
870
870 target = assert_response.get_element('.pr-targetinfo .tag')
871 target = assert_response.get_element('.pr-targetinfo .tag')
871 assert target.text.strip() == 'tag: target'
872 assert target.text.strip() == 'tag: target'
872 assert target.getchildren() == []
873 assert target.getchildren() == []
873
874
874 def test_description_is_escaped_on_index_page(self, backend, pr_util):
875 def test_description_is_escaped_on_index_page(self, backend, pr_util):
875 xss_description = "<script>alert('Hi!')</script>"
876 xss_description = "<script>alert('Hi!')</script>"
876 pull_request = pr_util.create_pull_request(description=xss_description)
877 pull_request = pr_util.create_pull_request(description=xss_description)
877 response = self.app.get(url(
878 response = self.app.get(url(
878 controller='pullrequests', action='show_all',
879 controller='pullrequests', action='show_all',
879 repo_name=pull_request.target_repo.repo_name))
880 repo_name=pull_request.target_repo.repo_name))
880 response.mustcontain(
881 response.mustcontain(
881 "&lt;script&gt;alert(&#39;Hi!&#39;)&lt;/script&gt;")
882 "&lt;script&gt;alert(&#39;Hi!&#39;)&lt;/script&gt;")
882
883
883
884
884 def assert_pull_request_status(pull_request, expected_status):
885 def assert_pull_request_status(pull_request, expected_status):
885 status = ChangesetStatusModel().calculated_review_status(
886 status = ChangesetStatusModel().calculated_review_status(
886 pull_request=pull_request)
887 pull_request=pull_request)
887 assert status == expected_status
888 assert status == expected_status
888
889
889
890
890 @pytest.mark.parametrize('action', ['show_all', 'index', 'create'])
891 @pytest.mark.parametrize('action', ['show_all', 'index', 'create'])
891 @pytest.mark.usefixtures("autologin_user")
892 @pytest.mark.usefixtures("autologin_user")
892 def test_redirects_to_repo_summary_for_svn_repositories(
893 def test_redirects_to_repo_summary_for_svn_repositories(
893 backend_svn, app, action):
894 backend_svn, app, action):
894 denied_actions = ['show_all', 'index', 'create']
895 denied_actions = ['show_all', 'index', 'create']
895 for action in denied_actions:
896 for action in denied_actions:
896 response = app.get(url(
897 response = app.get(url(
897 controller='pullrequests', action=action,
898 controller='pullrequests', action=action,
898 repo_name=backend_svn.repo_name))
899 repo_name=backend_svn.repo_name))
899 assert response.status_int == 302
900 assert response.status_int == 302
900
901
901 # Not allowed, redirect to the summary
902 # Not allowed, redirect to the summary
902 redirected = response.follow()
903 redirected = response.follow()
903 summary_url = url('summary_home', repo_name=backend_svn.repo_name)
904 summary_url = url('summary_home', repo_name=backend_svn.repo_name)
904
905
905 # URL adds leading slash and path doesn't have it
906 # URL adds leading slash and path doesn't have it
906 assert redirected.req.path == summary_url
907 assert redirected.req.path == summary_url
907
908
908
909
909 def test_delete_comment_returns_404_if_comment_does_not_exist(pylonsapp):
910 def test_delete_comment_returns_404_if_comment_does_not_exist(pylonsapp):
910 # TODO: johbo: Global import not possible because models.forms blows up
911 # TODO: johbo: Global import not possible because models.forms blows up
911 from rhodecode.controllers.pullrequests import PullrequestsController
912 from rhodecode.controllers.pullrequests import PullrequestsController
912 controller = PullrequestsController()
913 controller = PullrequestsController()
913 patcher = mock.patch(
914 patcher = mock.patch(
914 'rhodecode.model.db.BaseModel.get', return_value=None)
915 'rhodecode.model.db.BaseModel.get', return_value=None)
915 with pytest.raises(HTTPNotFound), patcher:
916 with pytest.raises(HTTPNotFound), patcher:
916 controller._delete_comment(1)
917 controller._delete_comment(1)
General Comments 0
You need to be logged in to leave comments. Login now