##// END OF EJS Templates
auth-tokens: fixed tests
marcink -
r1482:9278d852 default
parent child Browse files
Show More

The requested changes are too big and content was truncated. Show full diff

@@ -1,42 +1,52 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2010-2017 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 import pytest
22 22
23 23 from rhodecode.model.meta import Session
24 24 from rhodecode.model.user import UserModel
25 from rhodecode.model.auth_token import AuthTokenModel
25 26 from rhodecode.tests import TEST_USER_ADMIN_LOGIN
26 27
27 28
28 29 @pytest.fixture(scope="class")
29 30 def testuser_api(request, pylonsapp):
30 31 cls = request.cls
32
33 # ADMIN USER
31 34 cls.usr = UserModel().get_by_username(TEST_USER_ADMIN_LOGIN)
32 35 cls.apikey = cls.usr.api_key
36
37 # REGULAR USER
33 38 cls.test_user = UserModel().create_or_update(
34 39 username='test-api',
35 40 password='test',
36 41 email='test@api.rhodecode.org',
37 42 firstname='first',
38 43 lastname='last'
39 44 )
45 # create TOKEN for user, if he doesn't have one
46 if not cls.test_user.api_key:
47 AuthTokenModel().create(
48 user=cls.test_user, description='TEST_USER_TOKEN')
49
40 50 Session().commit()
41 51 cls.TEST_USER_LOGIN = cls.test_user.username
42 52 cls.apikey_regular = cls.test_user.api_key
@@ -1,111 +1,112 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2010-2017 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 import pytest
22 22
23 23 from rhodecode.model.db import UserLog
24 24 from rhodecode.model.pull_request import PullRequestModel
25 25 from rhodecode.tests import TEST_USER_ADMIN_LOGIN
26 26 from rhodecode.api.tests.utils import (
27 27 build_data, api_call, assert_error, assert_ok)
28 28
29 29
30 30 @pytest.mark.usefixtures("testuser_api", "app")
31 31 class TestClosePullRequest(object):
32
32 33 @pytest.mark.backends("git", "hg")
33 34 def test_api_close_pull_request(self, pr_util):
34 35 pull_request = pr_util.create_pull_request()
35 36 pull_request_id = pull_request.pull_request_id
36 37 author = pull_request.user_id
37 38 repo = pull_request.target_repo.repo_id
38 39 id_, params = build_data(
39 40 self.apikey, 'close_pull_request',
40 41 repoid=pull_request.target_repo.repo_name,
41 42 pullrequestid=pull_request.pull_request_id)
42 43 response = api_call(self.app, params)
43 44 expected = {
44 45 'pull_request_id': pull_request_id,
45 46 'closed': True,
46 47 }
47 48 assert_ok(id_, expected, response.body)
48 49 action = 'user_closed_pull_request:%d' % pull_request_id
49 50 journal = UserLog.query()\
50 51 .filter(UserLog.user_id == author)\
51 52 .filter(UserLog.repository_id == repo)\
52 53 .filter(UserLog.action == action)\
53 54 .all()
54 55 assert len(journal) == 1
55 56
56 57 @pytest.mark.backends("git", "hg")
57 58 def test_api_close_pull_request_already_closed_error(self, pr_util):
58 59 pull_request = pr_util.create_pull_request()
59 60 pull_request_id = pull_request.pull_request_id
60 61 pull_request_repo = pull_request.target_repo.repo_name
61 62 PullRequestModel().close_pull_request(
62 63 pull_request, pull_request.author)
63 64 id_, params = build_data(
64 65 self.apikey, 'close_pull_request',
65 66 repoid=pull_request_repo, pullrequestid=pull_request_id)
66 67 response = api_call(self.app, params)
67 68
68 69 expected = 'pull request `%s` is already closed' % pull_request_id
69 70 assert_error(id_, expected, given=response.body)
70 71
71 72 @pytest.mark.backends("git", "hg")
72 73 def test_api_close_pull_request_repo_error(self):
73 74 id_, params = build_data(
74 75 self.apikey, 'close_pull_request',
75 76 repoid=666, pullrequestid=1)
76 77 response = api_call(self.app, params)
77 78
78 79 expected = 'repository `666` does not exist'
79 80 assert_error(id_, expected, given=response.body)
80 81
81 82 @pytest.mark.backends("git", "hg")
82 83 def test_api_close_pull_request_non_admin_with_userid_error(self,
83 84 pr_util):
84 85 pull_request = pr_util.create_pull_request()
85 86 id_, params = build_data(
86 87 self.apikey_regular, 'close_pull_request',
87 88 repoid=pull_request.target_repo.repo_name,
88 89 pullrequestid=pull_request.pull_request_id,
89 90 userid=TEST_USER_ADMIN_LOGIN)
90 91 response = api_call(self.app, params)
91 92
92 93 expected = 'userid is not the same as your user'
93 94 assert_error(id_, expected, given=response.body)
94 95
95 96 @pytest.mark.backends("git", "hg")
96 97 def test_api_close_pull_request_no_perms_to_close(
97 98 self, user_util, pr_util):
98 99 user = user_util.create_user()
99 100 pull_request = pr_util.create_pull_request()
100 101
101 102 id_, params = build_data(
102 103 user.api_key, 'close_pull_request',
103 104 repoid=pull_request.target_repo.repo_name,
104 105 pullrequestid=pull_request.pull_request_id,)
105 106 response = api_call(self.app, params)
106 107
107 108 expected = ('pull request `%s` close failed, '
108 109 'no permission to close.') % pull_request.pull_request_id
109 110
110 111 response_json = response.json['error']
111 112 assert response_json == expected
@@ -1,73 +1,72 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2010-2017 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 import mock
22 22 import pytest
23 23
24 24 from rhodecode.model.repo import RepoModel
25 25 from rhodecode.api.tests.utils import (
26 26 build_data, api_call, assert_error, assert_ok, crash)
27 27
28 28
29 29 @pytest.mark.usefixtures("testuser_api", "app")
30 30 class TestApiDeleteRepo(object):
31 31 def test_api_delete_repo(self, backend):
32 32 repo = backend.create_repo()
33 33
34 34 id_, params = build_data(
35 35 self.apikey, 'delete_repo', repoid=repo.repo_name, )
36 36 response = api_call(self.app, params)
37 37
38 38 expected = {
39 39 'msg': 'Deleted repository `%s`' % (repo.repo_name,),
40 40 'success': True
41 41 }
42 42 assert_ok(id_, expected, given=response.body)
43 43
44 44 def test_api_delete_repo_by_non_admin(self, backend, user_regular):
45 45 repo = backend.create_repo(cur_user=user_regular.username)
46 46 id_, params = build_data(
47 47 user_regular.api_key, 'delete_repo', repoid=repo.repo_name, )
48 48 response = api_call(self.app, params)
49 49
50 50 expected = {
51 51 'msg': 'Deleted repository `%s`' % (repo.repo_name,),
52 52 'success': True
53 53 }
54 54 assert_ok(id_, expected, given=response.body)
55 55
56 def test_api_delete_repo_by_non_admin_no_permission(
57 self, backend, user_regular):
56 def test_api_delete_repo_by_non_admin_no_permission(self, backend):
58 57 repo = backend.create_repo()
59 58 id_, params = build_data(
60 user_regular.api_key, 'delete_repo', repoid=repo.repo_name, )
59 self.apikey_regular, 'delete_repo', repoid=repo.repo_name, )
61 60 response = api_call(self.app, params)
62 61 expected = 'repository `%s` does not exist' % (repo.repo_name)
63 62 assert_error(id_, expected, given=response.body)
64 63
65 64 def test_api_delete_repo_exception_occurred(self, backend):
66 65 repo = backend.create_repo()
67 66 id_, params = build_data(
68 67 self.apikey, 'delete_repo', repoid=repo.repo_name, )
69 68 with mock.patch.object(RepoModel, 'delete', crash):
70 69 response = api_call(self.app, params)
71 70 expected = 'failed to delete repository `%s`' % (
72 71 repo.repo_name,)
73 72 assert_error(id_, expected, given=response.body)
@@ -1,472 +1,471 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2011-2017 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 import logging
22 22
23 23 from rhodecode.api import jsonrpc_method, JSONRPCError, JSONRPCForbidden
24 24 from rhodecode.api.utils import (
25 25 Optional, OAttr, has_superadmin_permission, get_user_or_error, store_update)
26 26 from rhodecode.lib.auth import AuthUser, PasswordGenerator
27 27 from rhodecode.lib.exceptions import DefaultUserException
28 28 from rhodecode.lib.utils2 import safe_int, str2bool
29 29 from rhodecode.model.db import Session, User, Repository
30 30 from rhodecode.model.user import UserModel
31 31
32 32
33 33 log = logging.getLogger(__name__)
34 34
35 35
36 36 @jsonrpc_method()
37 37 def get_user(request, apiuser, userid=Optional(OAttr('apiuser'))):
38 38 """
39 39 Returns the information associated with a username or userid.
40 40
41 41 * If the ``userid`` is not set, this command returns the information
42 42 for the ``userid`` calling the method.
43 43
44 44 .. note::
45 45
46 46 Normal users may only run this command against their ``userid``. For
47 47 full privileges you must run this command using an |authtoken| with
48 48 admin rights.
49 49
50 50 :param apiuser: This is filled automatically from the |authtoken|.
51 51 :type apiuser: AuthUser
52 52 :param userid: Sets the userid for which data will be returned.
53 53 :type userid: Optional(str or int)
54 54
55 55 Example output:
56 56
57 57 .. code-block:: bash
58 58
59 59 {
60 60 "error": null,
61 61 "id": <id>,
62 62 "result": {
63 63 "active": true,
64 64 "admin": false,
65 "api_key": "api-key",
66 65 "api_keys": [ list of keys ],
67 66 "email": "user@example.com",
68 67 "emails": [
69 68 "user@example.com"
70 69 ],
71 70 "extern_name": "rhodecode",
72 71 "extern_type": "rhodecode",
73 72 "firstname": "username",
74 73 "ip_addresses": [],
75 74 "language": null,
76 75 "last_login": "Timestamp",
77 76 "lastname": "surnae",
78 77 "permissions": {
79 78 "global": [
80 79 "hg.inherit_default_perms.true",
81 80 "usergroup.read",
82 81 "hg.repogroup.create.false",
83 82 "hg.create.none",
84 83 "hg.password_reset.enabled",
85 84 "hg.extern_activate.manual",
86 85 "hg.create.write_on_repogroup.false",
87 86 "hg.usergroup.create.false",
88 87 "group.none",
89 88 "repository.none",
90 89 "hg.register.none",
91 90 "hg.fork.repository"
92 91 ],
93 92 "repositories": { "username/example": "repository.write"},
94 93 "repositories_groups": { "user-group/repo": "group.none" },
95 94 "user_groups": { "user_group_name": "usergroup.read" }
96 95 },
97 96 "user_id": 32,
98 97 "username": "username"
99 98 }
100 99 }
101 100 """
102 101
103 102 if not has_superadmin_permission(apiuser):
104 103 # make sure normal user does not pass someone else userid,
105 104 # he is not allowed to do that
106 105 if not isinstance(userid, Optional) and userid != apiuser.user_id:
107 106 raise JSONRPCError('userid is not the same as your user')
108 107
109 108 userid = Optional.extract(userid, evaluate_locals=locals())
110 109 userid = getattr(userid, 'user_id', userid)
111 110
112 111 user = get_user_or_error(userid)
113 112 data = user.get_api_data(include_secrets=True)
114 113 data['permissions'] = AuthUser(user_id=user.user_id).permissions
115 114 return data
116 115
117 116
118 117 @jsonrpc_method()
119 118 def get_users(request, apiuser):
120 119 """
121 120 Lists all users in the |RCE| user database.
122 121
123 122 This command can only be run using an |authtoken| with admin rights to
124 123 the specified repository.
125 124
126 125 This command takes the following options:
127 126
128 127 :param apiuser: This is filled automatically from the |authtoken|.
129 128 :type apiuser: AuthUser
130 129
131 130 Example output:
132 131
133 132 .. code-block:: bash
134 133
135 134 id : <id_given_in_input>
136 135 result: [<user_object>, ...]
137 136 error: null
138 137 """
139 138
140 139 if not has_superadmin_permission(apiuser):
141 140 raise JSONRPCForbidden()
142 141
143 142 result = []
144 143 users_list = User.query().order_by(User.username) \
145 144 .filter(User.username != User.DEFAULT_USER) \
146 145 .all()
147 146 for user in users_list:
148 147 result.append(user.get_api_data(include_secrets=True))
149 148 return result
150 149
151 150
152 151 @jsonrpc_method()
153 152 def create_user(request, apiuser, username, email, password=Optional(''),
154 153 firstname=Optional(''), lastname=Optional(''),
155 154 active=Optional(True), admin=Optional(False),
156 155 extern_name=Optional('rhodecode'),
157 156 extern_type=Optional('rhodecode'),
158 157 force_password_change=Optional(False),
159 158 create_personal_repo_group=Optional(None)):
160 159 """
161 160 Creates a new user and returns the new user object.
162 161
163 162 This command can only be run using an |authtoken| with admin rights to
164 163 the specified repository.
165 164
166 165 This command takes the following options:
167 166
168 167 :param apiuser: This is filled automatically from the |authtoken|.
169 168 :type apiuser: AuthUser
170 169 :param username: Set the new username.
171 170 :type username: str or int
172 171 :param email: Set the user email address.
173 172 :type email: str
174 173 :param password: Set the new user password.
175 174 :type password: Optional(str)
176 175 :param firstname: Set the new user firstname.
177 176 :type firstname: Optional(str)
178 177 :param lastname: Set the new user surname.
179 178 :type lastname: Optional(str)
180 179 :param active: Set the user as active.
181 180 :type active: Optional(``True`` | ``False``)
182 181 :param admin: Give the new user admin rights.
183 182 :type admin: Optional(``True`` | ``False``)
184 183 :param extern_name: Set the authentication plugin name.
185 184 Using LDAP this is filled with LDAP UID.
186 185 :type extern_name: Optional(str)
187 186 :param extern_type: Set the new user authentication plugin.
188 187 :type extern_type: Optional(str)
189 188 :param force_password_change: Force the new user to change password
190 189 on next login.
191 190 :type force_password_change: Optional(``True`` | ``False``)
192 191 :param create_personal_repo_group: Create personal repo group for this user
193 192 :type create_personal_repo_group: Optional(``True`` | ``False``)
194 193 Example output:
195 194
196 195 .. code-block:: bash
197 196
198 197 id : <id_given_in_input>
199 198 result: {
200 199 "msg" : "created new user `<username>`",
201 200 "user": <user_obj>
202 201 }
203 202 error: null
204 203
205 204 Example error output:
206 205
207 206 .. code-block:: bash
208 207
209 208 id : <id_given_in_input>
210 209 result : null
211 210 error : {
212 211 "user `<username>` already exist"
213 212 or
214 213 "email `<email>` already exist"
215 214 or
216 215 "failed to create user `<username>`"
217 216 }
218 217
219 218 """
220 219 if not has_superadmin_permission(apiuser):
221 220 raise JSONRPCForbidden()
222 221
223 222 if UserModel().get_by_username(username):
224 223 raise JSONRPCError("user `%s` already exist" % (username,))
225 224
226 225 if UserModel().get_by_email(email, case_insensitive=True):
227 226 raise JSONRPCError("email `%s` already exist" % (email,))
228 227
229 228 # generate random password if we actually given the
230 229 # extern_name and it's not rhodecode
231 230 if (not isinstance(extern_name, Optional) and
232 231 Optional.extract(extern_name) != 'rhodecode'):
233 232 # generate temporary password if user is external
234 233 password = PasswordGenerator().gen_password(length=16)
235 234 create_repo_group = Optional.extract(create_personal_repo_group)
236 235 if isinstance(create_repo_group, basestring):
237 236 create_repo_group = str2bool(create_repo_group)
238 237
239 238 try:
240 239 user = UserModel().create_or_update(
241 240 username=Optional.extract(username),
242 241 password=Optional.extract(password),
243 242 email=Optional.extract(email),
244 243 firstname=Optional.extract(firstname),
245 244 lastname=Optional.extract(lastname),
246 245 active=Optional.extract(active),
247 246 admin=Optional.extract(admin),
248 247 extern_type=Optional.extract(extern_type),
249 248 extern_name=Optional.extract(extern_name),
250 249 force_password_change=Optional.extract(force_password_change),
251 250 create_repo_group=create_repo_group
252 251 )
253 252 Session().commit()
254 253 return {
255 254 'msg': 'created new user `%s`' % username,
256 255 'user': user.get_api_data(include_secrets=True)
257 256 }
258 257 except Exception:
259 258 log.exception('Error occurred during creation of user')
260 259 raise JSONRPCError('failed to create user `%s`' % (username,))
261 260
262 261
263 262 @jsonrpc_method()
264 263 def update_user(request, apiuser, userid, username=Optional(None),
265 264 email=Optional(None), password=Optional(None),
266 265 firstname=Optional(None), lastname=Optional(None),
267 266 active=Optional(None), admin=Optional(None),
268 267 extern_type=Optional(None), extern_name=Optional(None), ):
269 268 """
270 269 Updates the details for the specified user, if that user exists.
271 270
272 271 This command can only be run using an |authtoken| with admin rights to
273 272 the specified repository.
274 273
275 274 This command takes the following options:
276 275
277 276 :param apiuser: This is filled automatically from |authtoken|.
278 277 :type apiuser: AuthUser
279 278 :param userid: Set the ``userid`` to update.
280 279 :type userid: str or int
281 280 :param username: Set the new username.
282 281 :type username: str or int
283 282 :param email: Set the new email.
284 283 :type email: str
285 284 :param password: Set the new password.
286 285 :type password: Optional(str)
287 286 :param firstname: Set the new first name.
288 287 :type firstname: Optional(str)
289 288 :param lastname: Set the new surname.
290 289 :type lastname: Optional(str)
291 290 :param active: Set the new user as active.
292 291 :type active: Optional(``True`` | ``False``)
293 292 :param admin: Give the user admin rights.
294 293 :type admin: Optional(``True`` | ``False``)
295 294 :param extern_name: Set the authentication plugin user name.
296 295 Using LDAP this is filled with LDAP UID.
297 296 :type extern_name: Optional(str)
298 297 :param extern_type: Set the authentication plugin type.
299 298 :type extern_type: Optional(str)
300 299
301 300
302 301 Example output:
303 302
304 303 .. code-block:: bash
305 304
306 305 id : <id_given_in_input>
307 306 result: {
308 307 "msg" : "updated user ID:<userid> <username>",
309 308 "user": <user_object>,
310 309 }
311 310 error: null
312 311
313 312 Example error output:
314 313
315 314 .. code-block:: bash
316 315
317 316 id : <id_given_in_input>
318 317 result : null
319 318 error : {
320 319 "failed to update user `<username>`"
321 320 }
322 321
323 322 """
324 323 if not has_superadmin_permission(apiuser):
325 324 raise JSONRPCForbidden()
326 325
327 326 user = get_user_or_error(userid)
328 327
329 328 # only non optional arguments will be stored in updates
330 329 updates = {}
331 330
332 331 try:
333 332
334 333 store_update(updates, username, 'username')
335 334 store_update(updates, password, 'password')
336 335 store_update(updates, email, 'email')
337 336 store_update(updates, firstname, 'name')
338 337 store_update(updates, lastname, 'lastname')
339 338 store_update(updates, active, 'active')
340 339 store_update(updates, admin, 'admin')
341 340 store_update(updates, extern_name, 'extern_name')
342 341 store_update(updates, extern_type, 'extern_type')
343 342
344 343 user = UserModel().update_user(user, **updates)
345 344 Session().commit()
346 345 return {
347 346 'msg': 'updated user ID:%s %s' % (user.user_id, user.username),
348 347 'user': user.get_api_data(include_secrets=True)
349 348 }
350 349 except DefaultUserException:
351 350 log.exception("Default user edit exception")
352 351 raise JSONRPCError('editing default user is forbidden')
353 352 except Exception:
354 353 log.exception("Error occurred during update of user")
355 354 raise JSONRPCError('failed to update user `%s`' % (userid,))
356 355
357 356
358 357 @jsonrpc_method()
359 358 def delete_user(request, apiuser, userid):
360 359 """
361 360 Deletes the specified user from the |RCE| user database.
362 361
363 362 This command can only be run using an |authtoken| with admin rights to
364 363 the specified repository.
365 364
366 365 .. important::
367 366
368 367 Ensure all open pull requests and open code review
369 368 requests to this user are close.
370 369
371 370 Also ensure all repositories, or repository groups owned by this
372 371 user are reassigned before deletion.
373 372
374 373 This command takes the following options:
375 374
376 375 :param apiuser: This is filled automatically from the |authtoken|.
377 376 :type apiuser: AuthUser
378 377 :param userid: Set the user to delete.
379 378 :type userid: str or int
380 379
381 380 Example output:
382 381
383 382 .. code-block:: bash
384 383
385 384 id : <id_given_in_input>
386 385 result: {
387 386 "msg" : "deleted user ID:<userid> <username>",
388 387 "user": null
389 388 }
390 389 error: null
391 390
392 391 Example error output:
393 392
394 393 .. code-block:: bash
395 394
396 395 id : <id_given_in_input>
397 396 result : null
398 397 error : {
399 398 "failed to delete user ID:<userid> <username>"
400 399 }
401 400
402 401 """
403 402 if not has_superadmin_permission(apiuser):
404 403 raise JSONRPCForbidden()
405 404
406 405 user = get_user_or_error(userid)
407 406
408 407 try:
409 408 UserModel().delete(userid)
410 409 Session().commit()
411 410 return {
412 411 'msg': 'deleted user ID:%s %s' % (user.user_id, user.username),
413 412 'user': None
414 413 }
415 414 except Exception:
416 415 log.exception("Error occurred during deleting of user")
417 416 raise JSONRPCError(
418 417 'failed to delete user ID:%s %s' % (user.user_id, user.username))
419 418
420 419
421 420 @jsonrpc_method()
422 421 def get_user_locks(request, apiuser, userid=Optional(OAttr('apiuser'))):
423 422 """
424 423 Displays all repositories locked by the specified user.
425 424
426 425 * If this command is run by a non-admin user, it returns
427 426 a list of |repos| locked by that user.
428 427
429 428 This command takes the following options:
430 429
431 430 :param apiuser: This is filled automatically from the |authtoken|.
432 431 :type apiuser: AuthUser
433 432 :param userid: Sets the userid whose list of locked |repos| will be
434 433 displayed.
435 434 :type userid: Optional(str or int)
436 435
437 436 Example output:
438 437
439 438 .. code-block:: bash
440 439
441 440 id : <id_given_in_input>
442 441 result : {
443 442 [repo_object, repo_object,...]
444 443 }
445 444 error : null
446 445 """
447 446
448 447 include_secrets = False
449 448 if not has_superadmin_permission(apiuser):
450 449 # make sure normal user does not pass someone else userid,
451 450 # he is not allowed to do that
452 451 if not isinstance(userid, Optional) and userid != apiuser.user_id:
453 452 raise JSONRPCError('userid is not the same as your user')
454 453 else:
455 454 include_secrets = True
456 455
457 456 userid = Optional.extract(userid, evaluate_locals=locals())
458 457 userid = getattr(userid, 'user_id', userid)
459 458 user = get_user_or_error(userid)
460 459
461 460 ret = []
462 461
463 462 # show all locks
464 463 for r in Repository.getAll():
465 464 _user_id, _time, _reason = r.locked
466 465 if _user_id and _time:
467 466 _api_data = r.get_api_data(include_secrets=include_secrets)
468 467 # if we use user filter just show the locks for this user
469 468 if safe_int(_user_id) == user.user_id:
470 469 ret.append(_api_data)
471 470
472 471 return ret
@@ -1,598 +1,600 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2010-2017 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 """
22 22 Database creation, and setup module for RhodeCode Enterprise. Used for creation
23 23 of database as well as for migration operations
24 24 """
25 25
26 26 import os
27 27 import sys
28 28 import time
29 29 import uuid
30 30 import logging
31 31 import getpass
32 32 from os.path import dirname as dn, join as jn
33 33
34 34 from sqlalchemy.engine import create_engine
35 35
36 36 from rhodecode import __dbversion__
37 37 from rhodecode.model import init_model
38 38 from rhodecode.model.user import UserModel
39 39 from rhodecode.model.db import (
40 40 User, Permission, RhodeCodeUi, RhodeCodeSetting, UserToPerm,
41 41 DbMigrateVersion, RepoGroup, UserRepoGroupToPerm, CacheKey, Repository)
42 42 from rhodecode.model.meta import Session, Base
43 43 from rhodecode.model.permission import PermissionModel
44 44 from rhodecode.model.repo import RepoModel
45 45 from rhodecode.model.repo_group import RepoGroupModel
46 46 from rhodecode.model.settings import SettingsModel
47 47
48 48
49 49 log = logging.getLogger(__name__)
50 50
51 51
52 52 def notify(msg):
53 53 """
54 54 Notification for migrations messages
55 55 """
56 56 ml = len(msg) + (4 * 2)
57 57 print('\n%s\n*** %s ***\n%s' % ('*' * ml, msg, '*' * ml)).upper()
58 58
59 59
60 60 class DbManage(object):
61 61
62 62 def __init__(self, log_sql, dbconf, root, tests=False,
63 63 SESSION=None, cli_args={}):
64 64 self.dbname = dbconf.split('/')[-1]
65 65 self.tests = tests
66 66 self.root = root
67 67 self.dburi = dbconf
68 68 self.log_sql = log_sql
69 69 self.db_exists = False
70 70 self.cli_args = cli_args
71 71 self.init_db(SESSION=SESSION)
72 72 self.ask_ok = self.get_ask_ok_func(self.cli_args.get('force_ask'))
73 73
74 74 def get_ask_ok_func(self, param):
75 75 if param not in [None]:
76 76 # return a function lambda that has a default set to param
77 77 return lambda *args, **kwargs: param
78 78 else:
79 79 from rhodecode.lib.utils import ask_ok
80 80 return ask_ok
81 81
82 82 def init_db(self, SESSION=None):
83 83 if SESSION:
84 84 self.sa = SESSION
85 85 else:
86 86 # init new sessions
87 87 engine = create_engine(self.dburi, echo=self.log_sql)
88 88 init_model(engine)
89 89 self.sa = Session()
90 90
91 91 def create_tables(self, override=False):
92 92 """
93 93 Create a auth database
94 94 """
95 95
96 96 log.info("Existing database with the same name is going to be destroyed.")
97 97 log.info("Setup command will run DROP ALL command on that database.")
98 98 if self.tests:
99 99 destroy = True
100 100 else:
101 101 destroy = self.ask_ok('Are you sure that you want to destroy the old database? [y/n]')
102 102 if not destroy:
103 103 log.info('Nothing done.')
104 104 sys.exit(0)
105 105 if destroy:
106 106 Base.metadata.drop_all()
107 107
108 108 checkfirst = not override
109 109 Base.metadata.create_all(checkfirst=checkfirst)
110 110 log.info('Created tables for %s' % self.dbname)
111 111
112 112 def set_db_version(self):
113 113 ver = DbMigrateVersion()
114 114 ver.version = __dbversion__
115 115 ver.repository_id = 'rhodecode_db_migrations'
116 116 ver.repository_path = 'versions'
117 117 self.sa.add(ver)
118 118 log.info('db version set to: %s' % __dbversion__)
119 119
120 120 def run_pre_migration_tasks(self):
121 121 """
122 122 Run various tasks before actually doing migrations
123 123 """
124 124 # delete cache keys on each upgrade
125 125 total = CacheKey.query().count()
126 126 log.info("Deleting (%s) cache keys now...", total)
127 127 CacheKey.delete_all_cache()
128 128
129 129 def upgrade(self):
130 130 """
131 131 Upgrades given database schema to given revision following
132 132 all needed steps, to perform the upgrade
133 133
134 134 """
135 135
136 136 from rhodecode.lib.dbmigrate.migrate.versioning import api
137 137 from rhodecode.lib.dbmigrate.migrate.exceptions import \
138 138 DatabaseNotControlledError
139 139
140 140 if 'sqlite' in self.dburi:
141 141 print (
142 142 '********************** WARNING **********************\n'
143 143 'Make sure your version of sqlite is at least 3.7.X. \n'
144 144 'Earlier versions are known to fail on some migrations\n'
145 145 '*****************************************************\n')
146 146
147 147 upgrade = self.ask_ok(
148 148 'You are about to perform a database upgrade. Make '
149 149 'sure you have backed up your database. '
150 150 'Continue ? [y/n]')
151 151 if not upgrade:
152 152 log.info('No upgrade performed')
153 153 sys.exit(0)
154 154
155 155 repository_path = jn(dn(dn(dn(os.path.realpath(__file__)))),
156 156 'rhodecode/lib/dbmigrate')
157 157 db_uri = self.dburi
158 158
159 159 try:
160 160 curr_version = api.db_version(db_uri, repository_path)
161 161 msg = ('Found current database under version '
162 162 'control with version %s' % curr_version)
163 163
164 164 except (RuntimeError, DatabaseNotControlledError):
165 165 curr_version = 1
166 166 msg = ('Current database is not under version control. Setting '
167 167 'as version %s' % curr_version)
168 168 api.version_control(db_uri, repository_path, curr_version)
169 169
170 170 notify(msg)
171 171
172 172 self.run_pre_migration_tasks()
173 173
174 174 if curr_version == __dbversion__:
175 175 log.info('This database is already at the newest version')
176 176 sys.exit(0)
177 177
178 178 upgrade_steps = range(curr_version + 1, __dbversion__ + 1)
179 179 notify('attempting to upgrade database from '
180 180 'version %s to version %s' % (curr_version, __dbversion__))
181 181
182 182 # CALL THE PROPER ORDER OF STEPS TO PERFORM FULL UPGRADE
183 183 _step = None
184 184 for step in upgrade_steps:
185 185 notify('performing upgrade step %s' % step)
186 186 time.sleep(0.5)
187 187
188 188 api.upgrade(db_uri, repository_path, step)
189 189 self.sa.rollback()
190 190 notify('schema upgrade for step %s completed' % (step,))
191 191
192 192 _step = step
193 193
194 194 notify('upgrade to version %s successful' % _step)
195 195
196 196 def fix_repo_paths(self):
197 197 """
198 198 Fixes an old RhodeCode version path into new one without a '*'
199 199 """
200 200
201 201 paths = self.sa.query(RhodeCodeUi)\
202 202 .filter(RhodeCodeUi.ui_key == '/')\
203 203 .scalar()
204 204
205 205 paths.ui_value = paths.ui_value.replace('*', '')
206 206
207 207 try:
208 208 self.sa.add(paths)
209 209 self.sa.commit()
210 210 except Exception:
211 211 self.sa.rollback()
212 212 raise
213 213
214 214 def fix_default_user(self):
215 215 """
216 216 Fixes an old default user with some 'nicer' default values,
217 217 used mostly for anonymous access
218 218 """
219 219 def_user = self.sa.query(User)\
220 220 .filter(User.username == User.DEFAULT_USER)\
221 221 .one()
222 222
223 223 def_user.name = 'Anonymous'
224 224 def_user.lastname = 'User'
225 225 def_user.email = User.DEFAULT_USER_EMAIL
226 226
227 227 try:
228 228 self.sa.add(def_user)
229 229 self.sa.commit()
230 230 except Exception:
231 231 self.sa.rollback()
232 232 raise
233 233
234 234 def fix_settings(self):
235 235 """
236 236 Fixes rhodecode settings and adds ga_code key for google analytics
237 237 """
238 238
239 239 hgsettings3 = RhodeCodeSetting('ga_code', '')
240 240
241 241 try:
242 242 self.sa.add(hgsettings3)
243 243 self.sa.commit()
244 244 except Exception:
245 245 self.sa.rollback()
246 246 raise
247 247
248 248 def create_admin_and_prompt(self):
249 249
250 250 # defaults
251 251 defaults = self.cli_args
252 252 username = defaults.get('username')
253 253 password = defaults.get('password')
254 254 email = defaults.get('email')
255 255
256 256 if username is None:
257 257 username = raw_input('Specify admin username:')
258 258 if password is None:
259 259 password = self._get_admin_password()
260 260 if not password:
261 261 # second try
262 262 password = self._get_admin_password()
263 263 if not password:
264 264 sys.exit()
265 265 if email is None:
266 266 email = raw_input('Specify admin email:')
267 267 api_key = self.cli_args.get('api_key')
268 268 self.create_user(username, password, email, True,
269 269 strict_creation_check=False,
270 270 api_key=api_key)
271 271
272 272 def _get_admin_password(self):
273 273 password = getpass.getpass('Specify admin password '
274 274 '(min 6 chars):')
275 275 confirm = getpass.getpass('Confirm password:')
276 276
277 277 if password != confirm:
278 278 log.error('passwords mismatch')
279 279 return False
280 280 if len(password) < 6:
281 281 log.error('password is too short - use at least 6 characters')
282 282 return False
283 283
284 284 return password
285 285
286 286 def create_test_admin_and_users(self):
287 287 log.info('creating admin and regular test users')
288 288 from rhodecode.tests import TEST_USER_ADMIN_LOGIN, \
289 289 TEST_USER_ADMIN_PASS, TEST_USER_ADMIN_EMAIL, \
290 290 TEST_USER_REGULAR_LOGIN, TEST_USER_REGULAR_PASS, \
291 291 TEST_USER_REGULAR_EMAIL, TEST_USER_REGULAR2_LOGIN, \
292 292 TEST_USER_REGULAR2_PASS, TEST_USER_REGULAR2_EMAIL
293 293
294 294 self.create_user(TEST_USER_ADMIN_LOGIN, TEST_USER_ADMIN_PASS,
295 TEST_USER_ADMIN_EMAIL, True)
295 TEST_USER_ADMIN_EMAIL, True, api_key=True)
296 296
297 297 self.create_user(TEST_USER_REGULAR_LOGIN, TEST_USER_REGULAR_PASS,
298 TEST_USER_REGULAR_EMAIL, False)
298 TEST_USER_REGULAR_EMAIL, False, api_key=True)
299 299
300 300 self.create_user(TEST_USER_REGULAR2_LOGIN, TEST_USER_REGULAR2_PASS,
301 TEST_USER_REGULAR2_EMAIL, False)
301 TEST_USER_REGULAR2_EMAIL, False, api_key=True)
302 302
303 303 def create_ui_settings(self, repo_store_path):
304 304 """
305 305 Creates ui settings, fills out hooks
306 306 and disables dotencode
307 307 """
308 308 settings_model = SettingsModel(sa=self.sa)
309 309
310 310 # Build HOOKS
311 311 hooks = [
312 312 (RhodeCodeUi.HOOK_REPO_SIZE, 'python:vcsserver.hooks.repo_size'),
313 313
314 314 # HG
315 315 (RhodeCodeUi.HOOK_PRE_PULL, 'python:vcsserver.hooks.pre_pull'),
316 316 (RhodeCodeUi.HOOK_PULL, 'python:vcsserver.hooks.log_pull_action'),
317 317 (RhodeCodeUi.HOOK_PRE_PUSH, 'python:vcsserver.hooks.pre_push'),
318 318 (RhodeCodeUi.HOOK_PRETX_PUSH, 'python:vcsserver.hooks.pre_push'),
319 319 (RhodeCodeUi.HOOK_PUSH, 'python:vcsserver.hooks.log_push_action'),
320 320
321 321 ]
322 322
323 323 for key, value in hooks:
324 324 hook_obj = settings_model.get_ui_by_key(key)
325 325 hooks2 = hook_obj if hook_obj else RhodeCodeUi()
326 326 hooks2.ui_section = 'hooks'
327 327 hooks2.ui_key = key
328 328 hooks2.ui_value = value
329 329 self.sa.add(hooks2)
330 330
331 331 # enable largefiles
332 332 largefiles = RhodeCodeUi()
333 333 largefiles.ui_section = 'extensions'
334 334 largefiles.ui_key = 'largefiles'
335 335 largefiles.ui_value = ''
336 336 self.sa.add(largefiles)
337 337
338 338 # set default largefiles cache dir, defaults to
339 339 # /repo location/.cache/largefiles
340 340 largefiles = RhodeCodeUi()
341 341 largefiles.ui_section = 'largefiles'
342 342 largefiles.ui_key = 'usercache'
343 343 largefiles.ui_value = os.path.join(repo_store_path, '.cache',
344 344 'largefiles')
345 345 self.sa.add(largefiles)
346 346
347 347 # enable hgsubversion disabled by default
348 348 hgsubversion = RhodeCodeUi()
349 349 hgsubversion.ui_section = 'extensions'
350 350 hgsubversion.ui_key = 'hgsubversion'
351 351 hgsubversion.ui_value = ''
352 352 hgsubversion.ui_active = False
353 353 self.sa.add(hgsubversion)
354 354
355 355 # enable hggit disabled by default
356 356 hggit = RhodeCodeUi()
357 357 hggit.ui_section = 'extensions'
358 358 hggit.ui_key = 'hggit'
359 359 hggit.ui_value = ''
360 360 hggit.ui_active = False
361 361 self.sa.add(hggit)
362 362
363 363 # set svn branch defaults
364 364 branches = ["/branches/*", "/trunk"]
365 365 tags = ["/tags/*"]
366 366
367 367 for branch in branches:
368 368 settings_model.create_ui_section_value(
369 369 RhodeCodeUi.SVN_BRANCH_ID, branch)
370 370
371 371 for tag in tags:
372 372 settings_model.create_ui_section_value(RhodeCodeUi.SVN_TAG_ID, tag)
373 373
374 374 def create_auth_plugin_options(self, skip_existing=False):
375 375 """
376 376 Create default auth plugin settings, and make it active
377 377
378 378 :param skip_existing:
379 379 """
380 380
381 381 for k, v, t in [('auth_plugins', 'egg:rhodecode-enterprise-ce#rhodecode', 'list'),
382 382 ('auth_rhodecode_enabled', 'True', 'bool')]:
383 383 if (skip_existing and
384 384 SettingsModel().get_setting_by_name(k) is not None):
385 385 log.debug('Skipping option %s' % k)
386 386 continue
387 387 setting = RhodeCodeSetting(k, v, t)
388 388 self.sa.add(setting)
389 389
390 390 def create_default_options(self, skip_existing=False):
391 391 """Creates default settings"""
392 392
393 393 for k, v, t in [
394 394 ('default_repo_enable_locking', False, 'bool'),
395 395 ('default_repo_enable_downloads', False, 'bool'),
396 396 ('default_repo_enable_statistics', False, 'bool'),
397 397 ('default_repo_private', False, 'bool'),
398 398 ('default_repo_type', 'hg', 'unicode')]:
399 399
400 400 if (skip_existing and
401 401 SettingsModel().get_setting_by_name(k) is not None):
402 402 log.debug('Skipping option %s' % k)
403 403 continue
404 404 setting = RhodeCodeSetting(k, v, t)
405 405 self.sa.add(setting)
406 406
407 407 def fixup_groups(self):
408 408 def_usr = User.get_default_user()
409 409 for g in RepoGroup.query().all():
410 410 g.group_name = g.get_new_name(g.name)
411 411 self.sa.add(g)
412 412 # get default perm
413 413 default = UserRepoGroupToPerm.query()\
414 414 .filter(UserRepoGroupToPerm.group == g)\
415 415 .filter(UserRepoGroupToPerm.user == def_usr)\
416 416 .scalar()
417 417
418 418 if default is None:
419 419 log.debug('missing default permission for group %s adding' % g)
420 420 perm_obj = RepoGroupModel()._create_default_perms(g)
421 421 self.sa.add(perm_obj)
422 422
423 423 def reset_permissions(self, username):
424 424 """
425 425 Resets permissions to default state, useful when old systems had
426 426 bad permissions, we must clean them up
427 427
428 428 :param username:
429 429 """
430 430 default_user = User.get_by_username(username)
431 431 if not default_user:
432 432 return
433 433
434 434 u2p = UserToPerm.query()\
435 435 .filter(UserToPerm.user == default_user).all()
436 436 fixed = False
437 437 if len(u2p) != len(Permission.DEFAULT_USER_PERMISSIONS):
438 438 for p in u2p:
439 439 Session().delete(p)
440 440 fixed = True
441 441 self.populate_default_permissions()
442 442 return fixed
443 443
444 444 def update_repo_info(self):
445 445 RepoModel.update_repoinfo()
446 446
447 447 def config_prompt(self, test_repo_path='', retries=3):
448 448 defaults = self.cli_args
449 449 _path = defaults.get('repos_location')
450 450 if retries == 3:
451 451 log.info('Setting up repositories config')
452 452
453 453 if _path is not None:
454 454 path = _path
455 455 elif not self.tests and not test_repo_path:
456 456 path = raw_input(
457 457 'Enter a valid absolute path to store repositories. '
458 458 'All repositories in that path will be added automatically:'
459 459 )
460 460 else:
461 461 path = test_repo_path
462 462 path_ok = True
463 463
464 464 # check proper dir
465 465 if not os.path.isdir(path):
466 466 path_ok = False
467 467 log.error('Given path %s is not a valid directory' % (path,))
468 468
469 469 elif not os.path.isabs(path):
470 470 path_ok = False
471 471 log.error('Given path %s is not an absolute path' % (path,))
472 472
473 473 # check if path is at least readable.
474 474 if not os.access(path, os.R_OK):
475 475 path_ok = False
476 476 log.error('Given path %s is not readable' % (path,))
477 477
478 478 # check write access, warn user about non writeable paths
479 479 elif not os.access(path, os.W_OK) and path_ok:
480 480 log.warning('No write permission to given path %s' % (path,))
481 481
482 482 q = ('Given path %s is not writeable, do you want to '
483 483 'continue with read only mode ? [y/n]' % (path,))
484 484 if not self.ask_ok(q):
485 485 log.error('Canceled by user')
486 486 sys.exit(-1)
487 487
488 488 if retries == 0:
489 489 sys.exit('max retries reached')
490 490 if not path_ok:
491 491 retries -= 1
492 492 return self.config_prompt(test_repo_path, retries)
493 493
494 494 real_path = os.path.normpath(os.path.realpath(path))
495 495
496 496 if real_path != os.path.normpath(path):
497 497 q = ('Path looks like a symlink, RhodeCode Enterprise will store '
498 498 'given path as %s ? [y/n]') % (real_path,)
499 499 if not self.ask_ok(q):
500 500 log.error('Canceled by user')
501 501 sys.exit(-1)
502 502
503 503 return real_path
504 504
505 505 def create_settings(self, path):
506 506
507 507 self.create_ui_settings(path)
508 508
509 509 ui_config = [
510 510 ('web', 'push_ssl', 'false'),
511 511 ('web', 'allow_archive', 'gz zip bz2'),
512 512 ('web', 'allow_push', '*'),
513 513 ('web', 'baseurl', '/'),
514 514 ('paths', '/', path),
515 515 ('phases', 'publish', 'true')
516 516 ]
517 517 for section, key, value in ui_config:
518 518 ui_conf = RhodeCodeUi()
519 519 setattr(ui_conf, 'ui_section', section)
520 520 setattr(ui_conf, 'ui_key', key)
521 521 setattr(ui_conf, 'ui_value', value)
522 522 self.sa.add(ui_conf)
523 523
524 524 # rhodecode app settings
525 525 settings = [
526 526 ('realm', 'RhodeCode', 'unicode'),
527 527 ('title', '', 'unicode'),
528 528 ('pre_code', '', 'unicode'),
529 529 ('post_code', '', 'unicode'),
530 530 ('show_public_icon', True, 'bool'),
531 531 ('show_private_icon', True, 'bool'),
532 532 ('stylify_metatags', False, 'bool'),
533 533 ('dashboard_items', 100, 'int'),
534 534 ('admin_grid_items', 25, 'int'),
535 535 ('show_version', True, 'bool'),
536 536 ('use_gravatar', False, 'bool'),
537 537 ('gravatar_url', User.DEFAULT_GRAVATAR_URL, 'unicode'),
538 538 ('clone_uri_tmpl', Repository.DEFAULT_CLONE_URI, 'unicode'),
539 539 ('support_url', '', 'unicode'),
540 540 ('update_url', RhodeCodeSetting.DEFAULT_UPDATE_URL, 'unicode'),
541 541 ('show_revision_number', True, 'bool'),
542 542 ('show_sha_length', 12, 'int'),
543 543 ]
544 544
545 545 for key, val, type_ in settings:
546 546 sett = RhodeCodeSetting(key, val, type_)
547 547 self.sa.add(sett)
548 548
549 549 self.create_auth_plugin_options()
550 550 self.create_default_options()
551 551
552 552 log.info('created ui config')
553 553
554 554 def create_user(self, username, password, email='', admin=False,
555 555 strict_creation_check=True, api_key=None):
556 556 log.info('creating user %s' % username)
557 557 user = UserModel().create_or_update(
558 558 username, password, email, firstname='RhodeCode', lastname='Admin',
559 559 active=True, admin=admin, extern_type="rhodecode",
560 560 strict_creation_check=strict_creation_check)
561 561
562 562 if api_key:
563 563 log.info('setting a provided api key for the user %s', username)
564 user.api_key = api_key
564 from rhodecode.model.auth_token import AuthTokenModel
565 AuthTokenModel().create(
566 user=user, description='BUILTIN TOKEN')
565 567
566 568 def create_default_user(self):
567 569 log.info('creating default user')
568 570 # create default user for handling default permissions.
569 571 user = UserModel().create_or_update(username=User.DEFAULT_USER,
570 572 password=str(uuid.uuid1())[:20],
571 573 email=User.DEFAULT_USER_EMAIL,
572 574 firstname='Anonymous',
573 575 lastname='User',
574 576 strict_creation_check=False)
575 577 # based on configuration options activate/deactive this user which
576 578 # controlls anonymous access
577 579 if self.cli_args.get('public_access') is False:
578 580 log.info('Public access disabled')
579 581 user.active = False
580 582 Session().add(user)
581 583 Session().commit()
582 584
583 585 def create_permissions(self):
584 586 """
585 587 Creates all permissions defined in the system
586 588 """
587 589 # module.(access|create|change|delete)_[name]
588 590 # module.(none|read|write|admin)
589 591 log.info('creating permissions')
590 592 PermissionModel(self.sa).create_permissions()
591 593
592 594 def populate_default_permissions(self):
593 595 """
594 596 Populate default permissions. It will create only the default
595 597 permissions that are missing, and not alter already defined ones
596 598 """
597 599 log.info('creating default user permissions')
598 600 PermissionModel(self.sa).create_default_user_permissions(user=User.DEFAULT_USER)
@@ -1,396 +1,401 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2013-2017 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21
22 22 """
23 23 Set of hooks run by RhodeCode Enterprise
24 24 """
25 25
26 26 import os
27 27 import collections
28 28 import logging
29 29
30 30 import rhodecode
31 31 from rhodecode import events
32 32 from rhodecode.lib import helpers as h
33 33 from rhodecode.lib.utils import action_logger
34 34 from rhodecode.lib.utils2 import safe_str
35 35 from rhodecode.lib.exceptions import HTTPLockedRC, UserCreationError
36 36 from rhodecode.model.db import Repository, User
37 37
38 38 log = logging.getLogger(__name__)
39 39
40 40
41 41 HookResponse = collections.namedtuple('HookResponse', ('status', 'output'))
42 42
43 43
44 44 def is_shadow_repo(extras):
45 45 """
46 46 Returns ``True`` if this is an action executed against a shadow repository.
47 47 """
48 48 return extras['is_shadow_repo']
49 49
50 50
51 51 def _get_scm_size(alias, root_path):
52 52
53 53 if not alias.startswith('.'):
54 54 alias += '.'
55 55
56 56 size_scm, size_root = 0, 0
57 57 for path, unused_dirs, files in os.walk(safe_str(root_path)):
58 58 if path.find(alias) != -1:
59 59 for f in files:
60 60 try:
61 61 size_scm += os.path.getsize(os.path.join(path, f))
62 62 except OSError:
63 63 pass
64 64 else:
65 65 for f in files:
66 66 try:
67 67 size_root += os.path.getsize(os.path.join(path, f))
68 68 except OSError:
69 69 pass
70 70
71 71 size_scm_f = h.format_byte_size_binary(size_scm)
72 72 size_root_f = h.format_byte_size_binary(size_root)
73 73 size_total_f = h.format_byte_size_binary(size_root + size_scm)
74 74
75 75 return size_scm_f, size_root_f, size_total_f
76 76
77 77
78 78 # actual hooks called by Mercurial internally, and GIT by our Python Hooks
79 79 def repo_size(extras):
80 80 """Present size of repository after push."""
81 81 repo = Repository.get_by_repo_name(extras.repository)
82 82 vcs_part = safe_str(u'.%s' % repo.repo_type)
83 83 size_vcs, size_root, size_total = _get_scm_size(vcs_part,
84 84 repo.repo_full_path)
85 85 msg = ('Repository `%s` size summary %s:%s repo:%s total:%s\n'
86 86 % (repo.repo_name, vcs_part, size_vcs, size_root, size_total))
87 87 return HookResponse(0, msg)
88 88
89 89
90 90 def pre_push(extras):
91 91 """
92 92 Hook executed before pushing code.
93 93
94 94 It bans pushing when the repository is locked.
95 95 """
96 96
97 97 usr = User.get_by_username(extras.username)
98 98 output = ''
99 99 if extras.locked_by[0] and usr.user_id != int(extras.locked_by[0]):
100 100 locked_by = User.get(extras.locked_by[0]).username
101 101 reason = extras.locked_by[2]
102 102 # this exception is interpreted in git/hg middlewares and based
103 103 # on that proper return code is server to client
104 104 _http_ret = HTTPLockedRC(
105 105 _locked_by_explanation(extras.repository, locked_by, reason))
106 106 if str(_http_ret.code).startswith('2'):
107 107 # 2xx Codes don't raise exceptions
108 108 output = _http_ret.title
109 109 else:
110 110 raise _http_ret
111 111
112 112 # Propagate to external components. This is done after checking the
113 113 # lock, for consistent behavior.
114 114 if not is_shadow_repo(extras):
115 115 pre_push_extension(repo_store_path=Repository.base_path(), **extras)
116 116 events.trigger(events.RepoPrePushEvent(
117 117 repo_name=extras.repository, extras=extras))
118 118
119 119 return HookResponse(0, output)
120 120
121 121
122 122 def pre_pull(extras):
123 123 """
124 124 Hook executed before pulling the code.
125 125
126 126 It bans pulling when the repository is locked.
127 127 """
128 128
129 129 output = ''
130 130 if extras.locked_by[0]:
131 131 locked_by = User.get(extras.locked_by[0]).username
132 132 reason = extras.locked_by[2]
133 133 # this exception is interpreted in git/hg middlewares and based
134 134 # on that proper return code is server to client
135 135 _http_ret = HTTPLockedRC(
136 136 _locked_by_explanation(extras.repository, locked_by, reason))
137 137 if str(_http_ret.code).startswith('2'):
138 138 # 2xx Codes don't raise exceptions
139 139 output = _http_ret.title
140 140 else:
141 141 raise _http_ret
142 142
143 143 # Propagate to external components. This is done after checking the
144 144 # lock, for consistent behavior.
145 145 if not is_shadow_repo(extras):
146 146 pre_pull_extension(**extras)
147 147 events.trigger(events.RepoPrePullEvent(
148 148 repo_name=extras.repository, extras=extras))
149 149
150 150 return HookResponse(0, output)
151 151
152 152
153 153 def post_pull(extras):
154 154 """Hook executed after client pulls the code."""
155 155 user = User.get_by_username(extras.username)
156 156 action = 'pull'
157 157 action_logger(user, action, extras.repository, extras.ip, commit=True)
158 158
159 159 # Propagate to external components.
160 160 if not is_shadow_repo(extras):
161 161 post_pull_extension(**extras)
162 162 events.trigger(events.RepoPullEvent(
163 163 repo_name=extras.repository, extras=extras))
164 164
165 165 output = ''
166 166 # make lock is a tri state False, True, None. We only make lock on True
167 167 if extras.make_lock is True and not is_shadow_repo(extras):
168 168 Repository.lock(Repository.get_by_repo_name(extras.repository),
169 169 user.user_id,
170 170 lock_reason=Repository.LOCK_PULL)
171 171 msg = 'Made lock on repo `%s`' % (extras.repository,)
172 172 output += msg
173 173
174 174 if extras.locked_by[0]:
175 175 locked_by = User.get(extras.locked_by[0]).username
176 176 reason = extras.locked_by[2]
177 177 _http_ret = HTTPLockedRC(
178 178 _locked_by_explanation(extras.repository, locked_by, reason))
179 179 if str(_http_ret.code).startswith('2'):
180 180 # 2xx Codes don't raise exceptions
181 181 output += _http_ret.title
182 182
183 183 return HookResponse(0, output)
184 184
185 185
186 186 def post_push(extras):
187 187 """Hook executed after user pushes to the repository."""
188 188 action_tmpl = extras.action + ':%s'
189 189 commit_ids = extras.commit_ids[:29000]
190 190
191 191 action = action_tmpl % ','.join(commit_ids)
192 192 action_logger(
193 193 extras.username, action, extras.repository, extras.ip, commit=True)
194 194
195 195 # Propagate to external components.
196 196 if not is_shadow_repo(extras):
197 197 post_push_extension(
198 198 repo_store_path=Repository.base_path(),
199 199 pushed_revs=commit_ids,
200 200 **extras)
201 201 events.trigger(events.RepoPushEvent(
202 202 repo_name=extras.repository,
203 203 pushed_commit_ids=commit_ids,
204 204 extras=extras))
205 205
206 206 output = ''
207 207 # make lock is a tri state False, True, None. We only release lock on False
208 208 if extras.make_lock is False and not is_shadow_repo(extras):
209 209 Repository.unlock(Repository.get_by_repo_name(extras.repository))
210 210 msg = 'Released lock on repo `%s`\n' % extras.repository
211 211 output += msg
212 212
213 213 if extras.locked_by[0]:
214 214 locked_by = User.get(extras.locked_by[0]).username
215 215 reason = extras.locked_by[2]
216 216 _http_ret = HTTPLockedRC(
217 217 _locked_by_explanation(extras.repository, locked_by, reason))
218 218 # TODO: johbo: if not?
219 219 if str(_http_ret.code).startswith('2'):
220 220 # 2xx Codes don't raise exceptions
221 221 output += _http_ret.title
222 222
223 223 output += 'RhodeCode: push completed\n'
224 224
225 225 return HookResponse(0, output)
226 226
227 227
228 228 def _locked_by_explanation(repo_name, user_name, reason):
229 229 message = (
230 230 'Repository `%s` locked by user `%s`. Reason:`%s`'
231 231 % (repo_name, user_name, reason))
232 232 return message
233 233
234 234
235 235 def check_allowed_create_user(user_dict, created_by, **kwargs):
236 236 # pre create hooks
237 237 if pre_create_user.is_active():
238 238 allowed, reason = pre_create_user(created_by=created_by, **user_dict)
239 239 if not allowed:
240 240 raise UserCreationError(reason)
241 241
242 242
243 243 class ExtensionCallback(object):
244 244 """
245 245 Forwards a given call to rcextensions, sanitizes keyword arguments.
246 246
247 247 Does check if there is an extension active for that hook. If it is
248 248 there, it will forward all `kwargs_keys` keyword arguments to the
249 249 extension callback.
250 250 """
251 251
252 252 def __init__(self, hook_name, kwargs_keys):
253 253 self._hook_name = hook_name
254 254 self._kwargs_keys = set(kwargs_keys)
255 255
256 256 def __call__(self, *args, **kwargs):
257 257 log.debug('Calling extension callback for %s', self._hook_name)
258 258
259 259 kwargs_to_pass = dict((key, kwargs[key]) for key in self._kwargs_keys)
260 # backward compat for removed api_key for old hooks. THis was it works
261 # with older rcextensions that require api_key present
262 if self._hook_name in ['CREATE_USER_HOOK', 'DELETE_USER_HOOK']:
263 kwargs_to_pass['api_key'] = '_DEPRECATED_'
264
260 265 callback = self._get_callback()
261 266 if callback:
262 267 return callback(**kwargs_to_pass)
263 268 else:
264 269 log.debug('extensions callback not found skipping...')
265 270
266 271 def is_active(self):
267 272 return hasattr(rhodecode.EXTENSIONS, self._hook_name)
268 273
269 274 def _get_callback(self):
270 275 return getattr(rhodecode.EXTENSIONS, self._hook_name, None)
271 276
272 277
273 278 pre_pull_extension = ExtensionCallback(
274 279 hook_name='PRE_PULL_HOOK',
275 280 kwargs_keys=(
276 281 'server_url', 'config', 'scm', 'username', 'ip', 'action',
277 282 'repository'))
278 283
279 284
280 285 post_pull_extension = ExtensionCallback(
281 286 hook_name='PULL_HOOK',
282 287 kwargs_keys=(
283 288 'server_url', 'config', 'scm', 'username', 'ip', 'action',
284 289 'repository'))
285 290
286 291
287 292 pre_push_extension = ExtensionCallback(
288 293 hook_name='PRE_PUSH_HOOK',
289 294 kwargs_keys=(
290 295 'server_url', 'config', 'scm', 'username', 'ip', 'action',
291 296 'repository', 'repo_store_path', 'commit_ids'))
292 297
293 298
294 299 post_push_extension = ExtensionCallback(
295 300 hook_name='PUSH_HOOK',
296 301 kwargs_keys=(
297 302 'server_url', 'config', 'scm', 'username', 'ip', 'action',
298 303 'repository', 'repo_store_path', 'pushed_revs'))
299 304
300 305
301 306 pre_create_user = ExtensionCallback(
302 307 hook_name='PRE_CREATE_USER_HOOK',
303 308 kwargs_keys=(
304 309 'username', 'password', 'email', 'firstname', 'lastname', 'active',
305 310 'admin', 'created_by'))
306 311
307 312
308 313 log_create_pull_request = ExtensionCallback(
309 314 hook_name='CREATE_PULL_REQUEST',
310 315 kwargs_keys=(
311 316 'server_url', 'config', 'scm', 'username', 'ip', 'action',
312 317 'repository', 'pull_request_id', 'url', 'title', 'description',
313 318 'status', 'created_on', 'updated_on', 'commit_ids', 'review_status',
314 319 'mergeable', 'source', 'target', 'author', 'reviewers'))
315 320
316 321
317 322 log_merge_pull_request = ExtensionCallback(
318 323 hook_name='MERGE_PULL_REQUEST',
319 324 kwargs_keys=(
320 325 'server_url', 'config', 'scm', 'username', 'ip', 'action',
321 326 'repository', 'pull_request_id', 'url', 'title', 'description',
322 327 'status', 'created_on', 'updated_on', 'commit_ids', 'review_status',
323 328 'mergeable', 'source', 'target', 'author', 'reviewers'))
324 329
325 330
326 331 log_close_pull_request = ExtensionCallback(
327 332 hook_name='CLOSE_PULL_REQUEST',
328 333 kwargs_keys=(
329 334 'server_url', 'config', 'scm', 'username', 'ip', 'action',
330 335 'repository', 'pull_request_id', 'url', 'title', 'description',
331 336 'status', 'created_on', 'updated_on', 'commit_ids', 'review_status',
332 337 'mergeable', 'source', 'target', 'author', 'reviewers'))
333 338
334 339
335 340 log_review_pull_request = ExtensionCallback(
336 341 hook_name='REVIEW_PULL_REQUEST',
337 342 kwargs_keys=(
338 343 'server_url', 'config', 'scm', 'username', 'ip', 'action',
339 344 'repository', 'pull_request_id', 'url', 'title', 'description',
340 345 'status', 'created_on', 'updated_on', 'commit_ids', 'review_status',
341 346 'mergeable', 'source', 'target', 'author', 'reviewers'))
342 347
343 348
344 349 log_update_pull_request = ExtensionCallback(
345 350 hook_name='UPDATE_PULL_REQUEST',
346 351 kwargs_keys=(
347 352 'server_url', 'config', 'scm', 'username', 'ip', 'action',
348 353 'repository', 'pull_request_id', 'url', 'title', 'description',
349 354 'status', 'created_on', 'updated_on', 'commit_ids', 'review_status',
350 355 'mergeable', 'source', 'target', 'author', 'reviewers'))
351 356
352 357
353 358 log_create_user = ExtensionCallback(
354 359 hook_name='CREATE_USER_HOOK',
355 360 kwargs_keys=(
356 361 'username', 'full_name_or_username', 'full_contact', 'user_id',
357 362 'name', 'firstname', 'short_contact', 'admin', 'lastname',
358 363 'ip_addresses', 'extern_type', 'extern_name',
359 'email', 'api_key', 'api_keys', 'last_login',
364 'email', 'api_keys', 'last_login',
360 365 'full_name', 'active', 'password', 'emails',
361 366 'inherit_default_permissions', 'created_by', 'created_on'))
362 367
363 368
364 369 log_delete_user = ExtensionCallback(
365 370 hook_name='DELETE_USER_HOOK',
366 371 kwargs_keys=(
367 372 'username', 'full_name_or_username', 'full_contact', 'user_id',
368 373 'name', 'firstname', 'short_contact', 'admin', 'lastname',
369 374 'ip_addresses',
370 'email', 'api_key', 'last_login',
375 'email', 'last_login',
371 376 'full_name', 'active', 'password', 'emails',
372 377 'inherit_default_permissions', 'deleted_by'))
373 378
374 379
375 380 log_create_repository = ExtensionCallback(
376 381 hook_name='CREATE_REPO_HOOK',
377 382 kwargs_keys=(
378 383 'repo_name', 'repo_type', 'description', 'private', 'created_on',
379 384 'enable_downloads', 'repo_id', 'user_id', 'enable_statistics',
380 385 'clone_uri', 'fork_id', 'group_id', 'created_by'))
381 386
382 387
383 388 log_delete_repository = ExtensionCallback(
384 389 hook_name='DELETE_REPO_HOOK',
385 390 kwargs_keys=(
386 391 'repo_name', 'repo_type', 'description', 'private', 'created_on',
387 392 'enable_downloads', 'repo_id', 'user_id', 'enable_statistics',
388 393 'clone_uri', 'fork_id', 'group_id', 'deleted_by', 'deleted_on'))
389 394
390 395
391 396 log_create_repository_group = ExtensionCallback(
392 397 hook_name='CREATE_REPO_GROUP_HOOK',
393 398 kwargs_keys=(
394 399 'group_name', 'group_parent_id', 'group_description',
395 400 'group_id', 'user_id', 'created_by', 'created_on',
396 401 'enable_locking'))
1 NO CONTENT: modified file
The requested commit or file is too big and content was truncated. Show full diff
@@ -1,331 +1,337 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2010-2017 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 """
22 22 Helpers for fixture generation
23 23 """
24 24
25 25 import os
26 26 import time
27 27 import tempfile
28 28 import shutil
29 29
30 30 import configobj
31 31
32 32 from rhodecode.tests import *
33 33 from rhodecode.model.db import Repository, User, RepoGroup, UserGroup, Gist
34 34 from rhodecode.model.meta import Session
35 35 from rhodecode.model.repo import RepoModel
36 36 from rhodecode.model.user import UserModel
37 37 from rhodecode.model.repo_group import RepoGroupModel
38 38 from rhodecode.model.user_group import UserGroupModel
39 39 from rhodecode.model.gist import GistModel
40 from rhodecode.model.auth_token import AuthTokenModel
40 41
41 42 dn = os.path.dirname
42 43 FIXTURES = os.path.join(dn(dn(os.path.abspath(__file__))), 'tests', 'fixtures')
43 44
44 45
45 46 def error_function(*args, **kwargs):
46 47 raise Exception('Total Crash !')
47 48
48 49
49 50 class TestINI(object):
50 51 """
51 52 Allows to create a new test.ini file as a copy of existing one with edited
52 53 data. Example usage::
53 54
54 55 with TestINI('test.ini', [{'section':{'key':val'}]) as new_test_ini_path:
55 56 print 'paster server %s' % new_test_ini
56 57 """
57 58
58 59 def __init__(self, ini_file_path, ini_params, new_file_prefix='DEFAULT',
59 60 destroy=True, dir=None):
60 61 self.ini_file_path = ini_file_path
61 62 self.ini_params = ini_params
62 63 self.new_path = None
63 64 self.new_path_prefix = new_file_prefix
64 65 self._destroy = destroy
65 66 self._dir = dir
66 67
67 68 def __enter__(self):
68 69 return self.create()
69 70
70 71 def __exit__(self, exc_type, exc_val, exc_tb):
71 72 self.destroy()
72 73
73 74 def create(self):
74 75 config = configobj.ConfigObj(
75 76 self.ini_file_path, file_error=True, write_empty_values=True)
76 77
77 78 for data in self.ini_params:
78 79 section, ini_params = data.items()[0]
79 80 for key, val in ini_params.items():
80 81 config[section][key] = val
81 82 with tempfile.NamedTemporaryFile(
82 83 prefix=self.new_path_prefix, suffix='.ini', dir=self._dir,
83 84 delete=False) as new_ini_file:
84 85 config.write(new_ini_file)
85 86 self.new_path = new_ini_file.name
86 87
87 88 return self.new_path
88 89
89 90 def destroy(self):
90 91 if self._destroy:
91 92 os.remove(self.new_path)
92 93
93 94
94 95 class Fixture(object):
95 96
96 97 def anon_access(self, status):
97 98 """
98 99 Context process for disabling anonymous access. use like:
99 100 fixture = Fixture()
100 101 with fixture.anon_access(False):
101 102 #tests
102 103
103 104 after this block anon access will be set to `not status`
104 105 """
105 106
106 107 class context(object):
107 108 def __enter__(self):
108 109 anon = User.get_default_user()
109 110 anon.active = status
110 111 Session().add(anon)
111 112 Session().commit()
112 113 time.sleep(1.5) # must sleep for cache (1s to expire)
113 114
114 115 def __exit__(self, exc_type, exc_val, exc_tb):
115 116 anon = User.get_default_user()
116 117 anon.active = not status
117 118 Session().add(anon)
118 119 Session().commit()
119 120
120 121 return context()
121 122
122 123 def _get_repo_create_params(self, **custom):
123 124 defs = {
124 125 'repo_name': None,
125 126 'repo_type': 'hg',
126 127 'clone_uri': '',
127 128 'repo_group': '-1',
128 129 'repo_description': 'DESC',
129 130 'repo_private': False,
130 131 'repo_landing_rev': 'rev:tip',
131 132 'repo_copy_permissions': False,
132 133 'repo_state': Repository.STATE_CREATED,
133 134 }
134 135 defs.update(custom)
135 136 if 'repo_name_full' not in custom:
136 137 defs.update({'repo_name_full': defs['repo_name']})
137 138
138 139 # fix the repo name if passed as repo_name_full
139 140 if defs['repo_name']:
140 141 defs['repo_name'] = defs['repo_name'].split('/')[-1]
141 142
142 143 return defs
143 144
144 145 def _get_group_create_params(self, **custom):
145 146 defs = {
146 147 'group_name': None,
147 148 'group_description': 'DESC',
148 149 'perm_updates': [],
149 150 'perm_additions': [],
150 151 'perm_deletions': [],
151 152 'group_parent_id': -1,
152 153 'enable_locking': False,
153 154 'recursive': False,
154 155 }
155 156 defs.update(custom)
156 157
157 158 return defs
158 159
159 160 def _get_user_create_params(self, name, **custom):
160 161 defs = {
161 162 'username': name,
162 163 'password': 'qweqwe',
163 164 'email': '%s+test@rhodecode.org' % name,
164 165 'firstname': 'TestUser',
165 166 'lastname': 'Test',
166 167 'active': True,
167 168 'admin': False,
168 169 'extern_type': 'rhodecode',
169 170 'extern_name': None,
170 171 }
171 172 defs.update(custom)
172 173
173 174 return defs
174 175
175 176 def _get_user_group_create_params(self, name, **custom):
176 177 defs = {
177 178 'users_group_name': name,
178 179 'user_group_description': 'DESC',
179 180 'users_group_active': True,
180 181 'user_group_data': {},
181 182 }
182 183 defs.update(custom)
183 184
184 185 return defs
185 186
186 187 def create_repo(self, name, **kwargs):
187 188 repo_group = kwargs.get('repo_group')
188 189 if isinstance(repo_group, RepoGroup):
189 190 kwargs['repo_group'] = repo_group.group_id
190 191 name = name.split(Repository.NAME_SEP)[-1]
191 192 name = Repository.NAME_SEP.join((repo_group.group_name, name))
192 193
193 194 if 'skip_if_exists' in kwargs:
194 195 del kwargs['skip_if_exists']
195 196 r = Repository.get_by_repo_name(name)
196 197 if r:
197 198 return r
198 199
199 200 form_data = self._get_repo_create_params(repo_name=name, **kwargs)
200 201 cur_user = kwargs.get('cur_user', TEST_USER_ADMIN_LOGIN)
201 202 RepoModel().create(form_data, cur_user)
202 203 Session().commit()
203 204 repo = Repository.get_by_repo_name(name)
204 205 assert repo
205 206 return repo
206 207
207 208 def create_fork(self, repo_to_fork, fork_name, **kwargs):
208 209 repo_to_fork = Repository.get_by_repo_name(repo_to_fork)
209 210
210 211 form_data = self._get_repo_create_params(repo_name=fork_name,
211 212 fork_parent_id=repo_to_fork.repo_id,
212 213 repo_type=repo_to_fork.repo_type,
213 214 **kwargs)
214 215 #TODO: fix it !!
215 216 form_data['description'] = form_data['repo_description']
216 217 form_data['private'] = form_data['repo_private']
217 218 form_data['landing_rev'] = form_data['repo_landing_rev']
218 219
219 220 owner = kwargs.get('cur_user', TEST_USER_ADMIN_LOGIN)
220 221 RepoModel().create_fork(form_data, cur_user=owner)
221 222 Session().commit()
222 223 r = Repository.get_by_repo_name(fork_name)
223 224 assert r
224 225 return r
225 226
226 227 def destroy_repo(self, repo_name, **kwargs):
227 228 RepoModel().delete(repo_name, **kwargs)
228 229 Session().commit()
229 230
230 231 def destroy_repo_on_filesystem(self, repo_name):
231 232 rm_path = os.path.join(RepoModel().repos_path, repo_name)
232 233 if os.path.isdir(rm_path):
233 234 shutil.rmtree(rm_path)
234 235
235 236 def create_repo_group(self, name, **kwargs):
236 237 if 'skip_if_exists' in kwargs:
237 238 del kwargs['skip_if_exists']
238 239 gr = RepoGroup.get_by_group_name(group_name=name)
239 240 if gr:
240 241 return gr
241 242 form_data = self._get_group_create_params(group_name=name, **kwargs)
242 243 owner = kwargs.get('cur_user', TEST_USER_ADMIN_LOGIN)
243 244 gr = RepoGroupModel().create(
244 245 group_name=form_data['group_name'],
245 246 group_description=form_data['group_name'],
246 247 owner=owner)
247 248 Session().commit()
248 249 gr = RepoGroup.get_by_group_name(gr.group_name)
249 250 return gr
250 251
251 252 def destroy_repo_group(self, repogroupid):
252 253 RepoGroupModel().delete(repogroupid)
253 254 Session().commit()
254 255
255 256 def create_user(self, name, **kwargs):
256 257 if 'skip_if_exists' in kwargs:
257 258 del kwargs['skip_if_exists']
258 259 user = User.get_by_username(name)
259 260 if user:
260 261 return user
261 262 form_data = self._get_user_create_params(name, **kwargs)
262 263 user = UserModel().create(form_data)
264
265 # create token for user
266 AuthTokenModel().create(
267 user=user, description='TEST_USER_TOKEN')
268
263 269 Session().commit()
264 270 user = User.get_by_username(user.username)
265 271 return user
266 272
267 273 def destroy_user(self, userid):
268 274 UserModel().delete(userid)
269 275 Session().commit()
270 276
271 277 def destroy_users(self, userid_iter):
272 278 for user_id in userid_iter:
273 279 if User.get_by_username(user_id):
274 280 UserModel().delete(user_id)
275 281 Session().commit()
276 282
277 283 def create_user_group(self, name, **kwargs):
278 284 if 'skip_if_exists' in kwargs:
279 285 del kwargs['skip_if_exists']
280 286 gr = UserGroup.get_by_group_name(group_name=name)
281 287 if gr:
282 288 return gr
283 289 form_data = self._get_user_group_create_params(name, **kwargs)
284 290 owner = kwargs.get('cur_user', TEST_USER_ADMIN_LOGIN)
285 291 user_group = UserGroupModel().create(
286 292 name=form_data['users_group_name'],
287 293 description=form_data['user_group_description'],
288 294 owner=owner, active=form_data['users_group_active'],
289 295 group_data=form_data['user_group_data'])
290 296 Session().commit()
291 297 user_group = UserGroup.get_by_group_name(user_group.users_group_name)
292 298 return user_group
293 299
294 300 def destroy_user_group(self, usergroupid):
295 301 UserGroupModel().delete(user_group=usergroupid, force=True)
296 302 Session().commit()
297 303
298 304 def create_gist(self, **kwargs):
299 305 form_data = {
300 306 'description': 'new-gist',
301 307 'owner': TEST_USER_ADMIN_LOGIN,
302 308 'gist_type': GistModel.cls.GIST_PUBLIC,
303 309 'lifetime': -1,
304 310 'acl_level': Gist.ACL_LEVEL_PUBLIC,
305 311 'gist_mapping': {'filename1.txt': {'content': 'hello world'},}
306 312 }
307 313 form_data.update(kwargs)
308 314 gist = GistModel().create(
309 315 description=form_data['description'], owner=form_data['owner'],
310 316 gist_mapping=form_data['gist_mapping'], gist_type=form_data['gist_type'],
311 317 lifetime=form_data['lifetime'], gist_acl_level=form_data['acl_level']
312 318 )
313 319 Session().commit()
314 320 return gist
315 321
316 322 def destroy_gists(self, gistid=None):
317 323 for g in GistModel.cls.get_all():
318 324 if gistid:
319 325 if gistid == g.gist_access_id:
320 326 GistModel().delete(g)
321 327 else:
322 328 GistModel().delete(g)
323 329 Session().commit()
324 330
325 331 def load_resource(self, resource_name, strip=False):
326 332 with open(os.path.join(FIXTURES, resource_name)) as f:
327 333 source = f.read()
328 334 if strip:
329 335 source = source.strip()
330 336
331 337 return source
@@ -1,384 +1,383 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2010-2017 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 import pytest
22 22
23 23 from rhodecode.lib import helpers as h
24 24 from rhodecode.lib.auth import check_password
25 25 from rhodecode.model.db import User, UserFollowing, Repository, UserApiKeys
26 26 from rhodecode.model.meta import Session
27 27 from rhodecode.tests import (
28 28 TestController, url, TEST_USER_ADMIN_LOGIN, TEST_USER_REGULAR_EMAIL,
29 29 assert_session_flash)
30 30 from rhodecode.tests.fixture import Fixture
31 31 from rhodecode.tests.utils import AssertResponse
32 32
33 33 fixture = Fixture()
34 34
35 35
36 36 class TestMyAccountController(TestController):
37 37 test_user_1 = 'testme'
38 38 test_user_1_password = '0jd83nHNS/d23n'
39 39 destroy_users = set()
40 40
41 41 @classmethod
42 42 def teardown_class(cls):
43 43 fixture.destroy_users(cls.destroy_users)
44 44
45 45 def test_my_account(self):
46 46 self.log_user()
47 47 response = self.app.get(url('my_account'))
48 48
49 49 response.mustcontain('test_admin')
50 50 response.mustcontain('href="/_admin/my_account/edit"')
51 51
52 52 def test_logout_form_contains_csrf(self, autologin_user, csrf_token):
53 53 response = self.app.get(url('my_account'))
54 54 assert_response = AssertResponse(response)
55 55 element = assert_response.get_element('.logout #csrf_token')
56 56 assert element.value == csrf_token
57 57
58 58 def test_my_account_edit(self):
59 59 self.log_user()
60 60 response = self.app.get(url('my_account_edit'))
61 61
62 62 response.mustcontain('value="test_admin')
63 63
64 64 def test_my_account_my_repos(self):
65 65 self.log_user()
66 66 response = self.app.get(url('my_account_repos'))
67 67 repos = Repository.query().filter(
68 68 Repository.user == User.get_by_username(
69 69 TEST_USER_ADMIN_LOGIN)).all()
70 70 for repo in repos:
71 71 response.mustcontain('"name_raw": "%s"' % repo.repo_name)
72 72
73 73 def test_my_account_my_watched(self):
74 74 self.log_user()
75 75 response = self.app.get(url('my_account_watched'))
76 76
77 77 repos = UserFollowing.query().filter(
78 78 UserFollowing.user == User.get_by_username(
79 79 TEST_USER_ADMIN_LOGIN)).all()
80 80 for repo in repos:
81 81 response.mustcontain(
82 82 '"name_raw": "%s"' % repo.follows_repository.repo_name)
83 83
84 84 @pytest.mark.backends("git", "hg")
85 85 def test_my_account_my_pullrequests(self, pr_util):
86 86 self.log_user()
87 87 response = self.app.get(url('my_account_pullrequests'))
88 88 response.mustcontain('There are currently no open pull '
89 89 'requests requiring your participation.')
90 90
91 91 pr = pr_util.create_pull_request(title='TestMyAccountPR')
92 92 response = self.app.get(url('my_account_pullrequests'))
93 93 response.mustcontain('"name_raw": %s' % pr.pull_request_id)
94 94 response.mustcontain('TestMyAccountPR')
95 95
96 96 def test_my_account_my_emails(self):
97 97 self.log_user()
98 98 response = self.app.get(url('my_account_emails'))
99 99 response.mustcontain('No additional emails specified')
100 100
101 101 def test_my_account_my_emails_add_existing_email(self):
102 102 self.log_user()
103 103 response = self.app.get(url('my_account_emails'))
104 104 response.mustcontain('No additional emails specified')
105 105 response = self.app.post(url('my_account_emails'),
106 106 {'new_email': TEST_USER_REGULAR_EMAIL,
107 107 'csrf_token': self.csrf_token})
108 108 assert_session_flash(response, 'This e-mail address is already taken')
109 109
110 110 def test_my_account_my_emails_add_mising_email_in_form(self):
111 111 self.log_user()
112 112 response = self.app.get(url('my_account_emails'))
113 113 response.mustcontain('No additional emails specified')
114 114 response = self.app.post(url('my_account_emails'),
115 115 {'csrf_token': self.csrf_token})
116 116 assert_session_flash(response, 'Please enter an email address')
117 117
118 118 def test_my_account_my_emails_add_remove(self):
119 119 self.log_user()
120 120 response = self.app.get(url('my_account_emails'))
121 121 response.mustcontain('No additional emails specified')
122 122
123 123 response = self.app.post(url('my_account_emails'),
124 124 {'new_email': 'foo@barz.com',
125 125 'csrf_token': self.csrf_token})
126 126
127 127 response = self.app.get(url('my_account_emails'))
128 128
129 129 from rhodecode.model.db import UserEmailMap
130 130 email_id = UserEmailMap.query().filter(
131 131 UserEmailMap.user == User.get_by_username(
132 132 TEST_USER_ADMIN_LOGIN)).filter(
133 133 UserEmailMap.email == 'foo@barz.com').one().email_id
134 134
135 135 response.mustcontain('foo@barz.com')
136 136 response.mustcontain('<input id="del_email_id" name="del_email_id" '
137 137 'type="hidden" value="%s" />' % email_id)
138 138
139 139 response = self.app.post(
140 140 url('my_account_emails'), {
141 141 'del_email_id': email_id, '_method': 'delete',
142 142 'csrf_token': self.csrf_token})
143 143 assert_session_flash(response, 'Removed email address from user account')
144 144 response = self.app.get(url('my_account_emails'))
145 145 response.mustcontain('No additional emails specified')
146 146
147 147 @pytest.mark.parametrize(
148 148 "name, attrs", [
149 149 ('firstname', {'firstname': 'new_username'}),
150 150 ('lastname', {'lastname': 'new_username'}),
151 151 ('admin', {'admin': True}),
152 152 ('admin', {'admin': False}),
153 153 ('extern_type', {'extern_type': 'ldap'}),
154 154 ('extern_type', {'extern_type': None}),
155 155 # ('extern_name', {'extern_name': 'test'}),
156 156 # ('extern_name', {'extern_name': None}),
157 157 ('active', {'active': False}),
158 158 ('active', {'active': True}),
159 159 ('email', {'email': 'some@email.com'}),
160 160 ])
161 161 def test_my_account_update(self, name, attrs):
162 162 usr = fixture.create_user(self.test_user_1,
163 163 password=self.test_user_1_password,
164 164 email='testme@rhodecode.org',
165 165 extern_type='rhodecode',
166 166 extern_name=self.test_user_1,
167 167 skip_if_exists=True)
168 168 self.destroy_users.add(self.test_user_1)
169 169
170 170 params = usr.get_api_data() # current user data
171 171 user_id = usr.user_id
172 172 self.log_user(
173 173 username=self.test_user_1, password=self.test_user_1_password)
174 174
175 175 params.update({'password_confirmation': ''})
176 176 params.update({'new_password': ''})
177 177 params.update({'extern_type': 'rhodecode'})
178 178 params.update({'extern_name': self.test_user_1})
179 179 params.update({'csrf_token': self.csrf_token})
180 180
181 181 params.update(attrs)
182 182 # my account page cannot set language param yet, only for admins
183 183 del params['language']
184 184 response = self.app.post(url('my_account'), params)
185 185
186 186 assert_session_flash(
187 187 response, 'Your account was updated successfully')
188 188
189 189 del params['csrf_token']
190 190
191 191 updated_user = User.get_by_username(self.test_user_1)
192 192 updated_params = updated_user.get_api_data()
193 193 updated_params.update({'password_confirmation': ''})
194 194 updated_params.update({'new_password': ''})
195 195
196 196 params['last_login'] = updated_params['last_login']
197 197 # my account page cannot set language param yet, only for admins
198 198 # but we get this info from API anyway
199 199 params['language'] = updated_params['language']
200 200
201 201 if name == 'email':
202 202 params['emails'] = [attrs['email']]
203 203 if name == 'extern_type':
204 204 # cannot update this via form, expected value is original one
205 205 params['extern_type'] = "rhodecode"
206 206 if name == 'extern_name':
207 207 # cannot update this via form, expected value is original one
208 208 params['extern_name'] = str(user_id)
209 209 if name == 'active':
210 210 # my account cannot deactivate account
211 211 params['active'] = True
212 212 if name == 'admin':
213 213 # my account cannot make you an admin !
214 214 params['admin'] = False
215 215
216 216 assert params == updated_params
217 217
218 218 def test_my_account_update_err_email_exists(self):
219 219 self.log_user()
220 220
221 221 new_email = 'test_regular@mail.com' # already exisitn email
222 222 response = self.app.post(url('my_account'),
223 223 params={
224 224 'username': 'test_admin',
225 225 'new_password': 'test12',
226 226 'password_confirmation': 'test122',
227 227 'firstname': 'NewName',
228 228 'lastname': 'NewLastname',
229 229 'email': new_email,
230 230 'csrf_token': self.csrf_token,
231 231 })
232 232
233 233 response.mustcontain('This e-mail address is already taken')
234 234
235 235 def test_my_account_update_err(self):
236 236 self.log_user('test_regular2', 'test12')
237 237
238 238 new_email = 'newmail.pl'
239 239 response = self.app.post(url('my_account'),
240 240 params={
241 241 'username': 'test_admin',
242 242 'new_password': 'test12',
243 243 'password_confirmation': 'test122',
244 244 'firstname': 'NewName',
245 245 'lastname': 'NewLastname',
246 246 'email': new_email,
247 247 'csrf_token': self.csrf_token,
248 248 })
249 249
250 250 response.mustcontain('An email address must contain a single @')
251 251 from rhodecode.model import validators
252 252 msg = validators.ValidUsername(
253 253 edit=False, old_data={})._messages['username_exists']
254 254 msg = h.html_escape(msg % {'username': 'test_admin'})
255 255 response.mustcontain(u"%s" % msg)
256 256
257 257 def test_my_account_auth_tokens(self):
258 258 usr = self.log_user('test_regular2', 'test12')
259 259 user = User.get(usr['user_id'])
260 260 response = self.app.get(url('my_account_auth_tokens'))
261 response.mustcontain(user.api_key)
262 response.mustcontain('expires: never')
261 for token in user.auth_tokens:
262 response.mustcontain(token)
263 response.mustcontain('never')
263 264
264 265 @pytest.mark.parametrize("desc, lifetime", [
265 266 ('forever', -1),
266 267 ('5mins', 60*5),
267 268 ('30days', 60*60*24*30),
268 269 ])
269 def test_my_account_add_auth_tokens(self, desc, lifetime):
270 usr = self.log_user('test_regular2', 'test12')
271 user = User.get(usr['user_id'])
270 def test_my_account_add_auth_tokens(self, desc, lifetime, user_util):
271 user = user_util.create_user(password='qweqwe')
272 user_id = user.user_id
273 self.log_user(user.username, 'qweqwe')
274
272 275 response = self.app.post(url('my_account_auth_tokens'),
273 276 {'description': desc, 'lifetime': lifetime,
274 277 'csrf_token': self.csrf_token})
275 278 assert_session_flash(response, 'Auth token successfully created')
276 try:
277 response = response.follow()
278 user = User.get(usr['user_id'])
279 for auth_token in user.auth_tokens:
280 response.mustcontain(auth_token)
281 finally:
282 for auth_token in UserApiKeys.query().all():
283 Session().delete(auth_token)
284 Session().commit()
279
280 response = response.follow()
281 user = User.get(user_id)
282 for auth_token in user.auth_tokens:
283 response.mustcontain(auth_token)
285 284
286 285 def test_my_account_remove_auth_token(self, user_util):
287 user = user_util.create_user(password=self.test_user_1_password)
286 user = user_util.create_user(password='qweqwe')
288 287 user_id = user.user_id
289 self.log_user(user.username, self.test_user_1_password)
288 self.log_user(user.username, 'qweqwe')
290 289
291 290 user = User.get(user_id)
292 291 keys = user.extra_auth_tokens
293 assert 1 == len(keys)
292 assert 2 == len(keys)
294 293
295 294 response = self.app.post(url('my_account_auth_tokens'),
296 295 {'description': 'desc', 'lifetime': -1,
297 296 'csrf_token': self.csrf_token})
298 297 assert_session_flash(response, 'Auth token successfully created')
299 298 response.follow()
300 299
301 300 user = User.get(user_id)
302 301 keys = user.extra_auth_tokens
303 assert 2 == len(keys)
302 assert 3 == len(keys)
304 303
305 304 response = self.app.post(
306 305 url('my_account_auth_tokens'),
307 306 {'_method': 'delete', 'del_auth_token': keys[0].api_key,
308 307 'csrf_token': self.csrf_token})
309 308 assert_session_flash(response, 'Auth token successfully deleted')
310 309
311 310 user = User.get(user_id)
312 311 keys = user.extra_auth_tokens
313 assert 1 == len(keys)
312 assert 2 == len(keys)
314 313
315 314 def test_valid_change_password(self, user_util):
316 315 new_password = 'my_new_valid_password'
317 316 user = user_util.create_user(password=self.test_user_1_password)
318 317 session = self.log_user(user.username, self.test_user_1_password)
319 318 form_data = [
320 319 ('current_password', self.test_user_1_password),
321 320 ('__start__', 'new_password:mapping'),
322 321 ('new_password', new_password),
323 322 ('new_password-confirm', new_password),
324 323 ('__end__', 'new_password:mapping'),
325 324 ('csrf_token', self.csrf_token),
326 325 ]
327 326 response = self.app.post(url('my_account_password'), form_data).follow()
328 327 assert 'Successfully updated password' in response
329 328
330 # check_password depends on user being in session
329 # check_password depends on user being in session
331 330 Session().add(user)
332 331 try:
333 332 assert check_password(new_password, user.password)
334 333 finally:
335 334 Session().expunge(user)
336 335
337 336 @pytest.mark.parametrize('current_pw,new_pw,confirm_pw', [
338 337 ('', 'abcdef123', 'abcdef123'),
339 338 ('wrong_pw', 'abcdef123', 'abcdef123'),
340 339 (test_user_1_password, test_user_1_password, test_user_1_password),
341 340 (test_user_1_password, '', ''),
342 341 (test_user_1_password, 'abcdef123', ''),
343 342 (test_user_1_password, '', 'abcdef123'),
344 343 (test_user_1_password, 'not_the', 'same_pw'),
345 344 (test_user_1_password, 'short', 'short'),
346 345 ])
347 346 def test_invalid_change_password(self, current_pw, new_pw, confirm_pw,
348 347 user_util):
349 348 user = user_util.create_user(password=self.test_user_1_password)
350 349 session = self.log_user(user.username, self.test_user_1_password)
351 350 old_password_hash = session['password']
352 351 form_data = [
353 352 ('current_password', current_pw),
354 353 ('__start__', 'new_password:mapping'),
355 354 ('new_password', new_pw),
356 355 ('new_password-confirm', confirm_pw),
357 356 ('__end__', 'new_password:mapping'),
358 357 ('csrf_token', self.csrf_token),
359 358 ]
360 359 response = self.app.post(url('my_account_password'), form_data)
361 360 assert 'Error occurred' in response
362 361
363 362 def test_password_is_updated_in_session_on_password_change(self, user_util):
364 363 old_password = 'abcdef123'
365 364 new_password = 'abcdef124'
366 365
367 366 user = user_util.create_user(password=old_password)
368 367 session = self.log_user(user.username, old_password)
369 368 old_password_hash = session['password']
370 369
371 370 form_data = [
372 371 ('current_password', old_password),
373 372 ('__start__', 'new_password:mapping'),
374 373 ('new_password', new_password),
375 374 ('new_password-confirm', new_password),
376 375 ('__end__', 'new_password:mapping'),
377 376 ('csrf_token', self.csrf_token),
378 377 ]
379 378 self.app.post(url('my_account_password'), form_data)
380 379
381 380 response = self.app.get(url('home'))
382 381 new_password_hash = response.session['rhodecode_user']['password']
383 382
384 383 assert old_password_hash != new_password_hash
@@ -1,627 +1,623 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2010-2017 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 import pytest
22 22 from sqlalchemy.orm.exc import NoResultFound
23 23
24 24 from rhodecode.lib.auth import check_password
25 25 from rhodecode.lib import helpers as h
26 26 from rhodecode.model import validators
27 27 from rhodecode.model.db import User, UserIpMap, UserApiKeys
28 28 from rhodecode.model.meta import Session
29 29 from rhodecode.model.user import UserModel
30 30 from rhodecode.tests import (
31 31 TestController, url, link_to, TEST_USER_ADMIN_LOGIN,
32 32 TEST_USER_REGULAR_LOGIN, assert_session_flash)
33 33 from rhodecode.tests.fixture import Fixture
34 34 from rhodecode.tests.utils import AssertResponse
35 35
36 36 fixture = Fixture()
37 37
38 38
39 39 class TestAdminUsersController(TestController):
40 40 test_user_1 = 'testme'
41 41 destroy_users = set()
42 42
43 43 @classmethod
44 44 def teardown_method(cls, method):
45 45 fixture.destroy_users(cls.destroy_users)
46 46
47 47 def test_index(self):
48 48 self.log_user()
49 49 self.app.get(url('users'))
50 50
51 51 def test_create(self):
52 52 self.log_user()
53 53 username = 'newtestuser'
54 54 password = 'test12'
55 55 password_confirmation = password
56 56 name = 'name'
57 57 lastname = 'lastname'
58 58 email = 'mail@mail.com'
59 59
60 60 response = self.app.get(url('new_user'))
61 61
62 62 response = self.app.post(url('users'), params={
63 63 'username': username,
64 64 'password': password,
65 65 'password_confirmation': password_confirmation,
66 66 'firstname': name,
67 67 'active': True,
68 68 'lastname': lastname,
69 69 'extern_name': 'rhodecode',
70 70 'extern_type': 'rhodecode',
71 71 'email': email,
72 72 'csrf_token': self.csrf_token,
73 73 })
74 74 user_link = link_to(
75 75 username,
76 76 url('edit_user', user_id=User.get_by_username(username).user_id))
77 77 assert_session_flash(response, 'Created user %s' % (user_link,))
78 78 self.destroy_users.add(username)
79 79
80 80 new_user = User.query().filter(User.username == username).one()
81 81
82 82 assert new_user.username == username
83 83 assert check_password(password, new_user.password)
84 84 assert new_user.name == name
85 85 assert new_user.lastname == lastname
86 86 assert new_user.email == email
87 87
88 88 response.follow()
89 89 response = response.follow()
90 90 response.mustcontain(username)
91 91
92 92 def test_create_err(self):
93 93 self.log_user()
94 94 username = 'new_user'
95 95 password = ''
96 96 name = 'name'
97 97 lastname = 'lastname'
98 98 email = 'errmail.com'
99 99
100 100 response = self.app.get(url('new_user'))
101 101
102 102 response = self.app.post(url('users'), params={
103 103 'username': username,
104 104 'password': password,
105 105 'name': name,
106 106 'active': False,
107 107 'lastname': lastname,
108 108 'email': email,
109 109 'csrf_token': self.csrf_token,
110 110 })
111 111
112 112 msg = validators.ValidUsername(
113 113 False, {})._messages['system_invalid_username']
114 114 msg = h.html_escape(msg % {'username': 'new_user'})
115 115 response.mustcontain('<span class="error-message">%s</span>' % msg)
116 116 response.mustcontain(
117 117 '<span class="error-message">Please enter a value</span>')
118 118 response.mustcontain(
119 119 '<span class="error-message">An email address must contain a'
120 120 ' single @</span>')
121 121
122 122 def get_user():
123 123 Session().query(User).filter(User.username == username).one()
124 124
125 125 with pytest.raises(NoResultFound):
126 126 get_user()
127 127
128 128 def test_new(self):
129 129 self.log_user()
130 130 self.app.get(url('new_user'))
131 131
132 132 @pytest.mark.parametrize("name, attrs", [
133 133 ('firstname', {'firstname': 'new_username'}),
134 134 ('lastname', {'lastname': 'new_username'}),
135 135 ('admin', {'admin': True}),
136 136 ('admin', {'admin': False}),
137 137 ('extern_type', {'extern_type': 'ldap'}),
138 138 ('extern_type', {'extern_type': None}),
139 139 ('extern_name', {'extern_name': 'test'}),
140 140 ('extern_name', {'extern_name': None}),
141 141 ('active', {'active': False}),
142 142 ('active', {'active': True}),
143 143 ('email', {'email': 'some@email.com'}),
144 144 ('language', {'language': 'de'}),
145 145 ('language', {'language': 'en'}),
146 146 # ('new_password', {'new_password': 'foobar123',
147 147 # 'password_confirmation': 'foobar123'})
148 148 ])
149 149 def test_update(self, name, attrs):
150 150 self.log_user()
151 151 usr = fixture.create_user(self.test_user_1, password='qweqwe',
152 152 email='testme@rhodecode.org',
153 153 extern_type='rhodecode',
154 154 extern_name=self.test_user_1,
155 155 skip_if_exists=True)
156 156 Session().commit()
157 157 self.destroy_users.add(self.test_user_1)
158 158 params = usr.get_api_data()
159 159 cur_lang = params['language'] or 'en'
160 160 params.update({
161 161 'password_confirmation': '',
162 162 'new_password': '',
163 163 'language': cur_lang,
164 164 '_method': 'put',
165 165 'csrf_token': self.csrf_token,
166 166 })
167 167 params.update({'new_password': ''})
168 168 params.update(attrs)
169 169 if name == 'email':
170 170 params['emails'] = [attrs['email']]
171 171 elif name == 'extern_type':
172 172 # cannot update this via form, expected value is original one
173 173 params['extern_type'] = "rhodecode"
174 174 elif name == 'extern_name':
175 175 # cannot update this via form, expected value is original one
176 176 params['extern_name'] = self.test_user_1
177 177 # special case since this user is not
178 178 # logged in yet his data is not filled
179 179 # so we use creation data
180 180
181 181 response = self.app.post(url('user', user_id=usr.user_id), params)
182 182 assert response.status_int == 302
183 183 assert_session_flash(response, 'User updated successfully')
184 184
185 185 updated_user = User.get_by_username(self.test_user_1)
186 186 updated_params = updated_user.get_api_data()
187 187 updated_params.update({'password_confirmation': ''})
188 188 updated_params.update({'new_password': ''})
189 189
190 190 del params['_method']
191 191 del params['csrf_token']
192 192 assert params == updated_params
193 193
194 194 def test_update_and_migrate_password(
195 195 self, autologin_user, real_crypto_backend):
196 196 from rhodecode.lib import auth
197 197
198 198 # create new user, with sha256 password
199 199 temp_user = 'test_admin_sha256'
200 200 user = fixture.create_user(temp_user)
201 201 user.password = auth._RhodeCodeCryptoSha256().hash_create(
202 202 b'test123')
203 203 Session().add(user)
204 204 Session().commit()
205 205 self.destroy_users.add('test_admin_sha256')
206 206
207 207 params = user.get_api_data()
208 208
209 209 params.update({
210 210 'password_confirmation': 'qweqwe123',
211 211 'new_password': 'qweqwe123',
212 212 'language': 'en',
213 213 '_method': 'put',
214 214 'csrf_token': autologin_user.csrf_token,
215 215 })
216 216
217 217 response = self.app.post(url('user', user_id=user.user_id), params)
218 218 assert response.status_int == 302
219 219 assert_session_flash(response, 'User updated successfully')
220 220
221 221 # new password should be bcrypted, after log-in and transfer
222 222 user = User.get_by_username(temp_user)
223 223 assert user.password.startswith('$')
224 224
225 225 updated_user = User.get_by_username(temp_user)
226 226 updated_params = updated_user.get_api_data()
227 227 updated_params.update({'password_confirmation': 'qweqwe123'})
228 228 updated_params.update({'new_password': 'qweqwe123'})
229 229
230 230 del params['_method']
231 231 del params['csrf_token']
232 232 assert params == updated_params
233 233
234 234 def test_delete(self):
235 235 self.log_user()
236 236 username = 'newtestuserdeleteme'
237 237
238 238 fixture.create_user(name=username)
239 239
240 240 new_user = Session().query(User)\
241 241 .filter(User.username == username).one()
242 242 response = self.app.post(url('user', user_id=new_user.user_id),
243 243 params={'_method': 'delete',
244 244 'csrf_token': self.csrf_token})
245 245
246 246 assert_session_flash(response, 'Successfully deleted user')
247 247
248 248 def test_delete_owner_of_repository(self):
249 249 self.log_user()
250 250 username = 'newtestuserdeleteme_repo_owner'
251 251 obj_name = 'test_repo'
252 252 usr = fixture.create_user(name=username)
253 253 self.destroy_users.add(username)
254 254 fixture.create_repo(obj_name, cur_user=usr.username)
255 255
256 256 new_user = Session().query(User)\
257 257 .filter(User.username == username).one()
258 258 response = self.app.post(url('user', user_id=new_user.user_id),
259 259 params={'_method': 'delete',
260 260 'csrf_token': self.csrf_token})
261 261
262 262 msg = 'user "%s" still owns 1 repositories and cannot be removed. ' \
263 263 'Switch owners or remove those repositories:%s' % (username,
264 264 obj_name)
265 265 assert_session_flash(response, msg)
266 266 fixture.destroy_repo(obj_name)
267 267
268 268 def test_delete_owner_of_repository_detaching(self):
269 269 self.log_user()
270 270 username = 'newtestuserdeleteme_repo_owner_detach'
271 271 obj_name = 'test_repo'
272 272 usr = fixture.create_user(name=username)
273 273 self.destroy_users.add(username)
274 274 fixture.create_repo(obj_name, cur_user=usr.username)
275 275
276 276 new_user = Session().query(User)\
277 277 .filter(User.username == username).one()
278 278 response = self.app.post(url('user', user_id=new_user.user_id),
279 279 params={'_method': 'delete',
280 280 'user_repos': 'detach',
281 281 'csrf_token': self.csrf_token})
282 282
283 283 msg = 'Detached 1 repositories'
284 284 assert_session_flash(response, msg)
285 285 fixture.destroy_repo(obj_name)
286 286
287 287 def test_delete_owner_of_repository_deleting(self):
288 288 self.log_user()
289 289 username = 'newtestuserdeleteme_repo_owner_delete'
290 290 obj_name = 'test_repo'
291 291 usr = fixture.create_user(name=username)
292 292 self.destroy_users.add(username)
293 293 fixture.create_repo(obj_name, cur_user=usr.username)
294 294
295 295 new_user = Session().query(User)\
296 296 .filter(User.username == username).one()
297 297 response = self.app.post(url('user', user_id=new_user.user_id),
298 298 params={'_method': 'delete',
299 299 'user_repos': 'delete',
300 300 'csrf_token': self.csrf_token})
301 301
302 302 msg = 'Deleted 1 repositories'
303 303 assert_session_flash(response, msg)
304 304
305 305 def test_delete_owner_of_repository_group(self):
306 306 self.log_user()
307 307 username = 'newtestuserdeleteme_repo_group_owner'
308 308 obj_name = 'test_group'
309 309 usr = fixture.create_user(name=username)
310 310 self.destroy_users.add(username)
311 311 fixture.create_repo_group(obj_name, cur_user=usr.username)
312 312
313 313 new_user = Session().query(User)\
314 314 .filter(User.username == username).one()
315 315 response = self.app.post(url('user', user_id=new_user.user_id),
316 316 params={'_method': 'delete',
317 317 'csrf_token': self.csrf_token})
318 318
319 319 msg = 'user "%s" still owns 1 repository groups and cannot be removed. ' \
320 320 'Switch owners or remove those repository groups:%s' % (username,
321 321 obj_name)
322 322 assert_session_flash(response, msg)
323 323 fixture.destroy_repo_group(obj_name)
324 324
325 325 def test_delete_owner_of_repository_group_detaching(self):
326 326 self.log_user()
327 327 username = 'newtestuserdeleteme_repo_group_owner_detach'
328 328 obj_name = 'test_group'
329 329 usr = fixture.create_user(name=username)
330 330 self.destroy_users.add(username)
331 331 fixture.create_repo_group(obj_name, cur_user=usr.username)
332 332
333 333 new_user = Session().query(User)\
334 334 .filter(User.username == username).one()
335 335 response = self.app.post(url('user', user_id=new_user.user_id),
336 336 params={'_method': 'delete',
337 337 'user_repo_groups': 'delete',
338 338 'csrf_token': self.csrf_token})
339 339
340 340 msg = 'Deleted 1 repository groups'
341 341 assert_session_flash(response, msg)
342 342
343 343 def test_delete_owner_of_repository_group_deleting(self):
344 344 self.log_user()
345 345 username = 'newtestuserdeleteme_repo_group_owner_delete'
346 346 obj_name = 'test_group'
347 347 usr = fixture.create_user(name=username)
348 348 self.destroy_users.add(username)
349 349 fixture.create_repo_group(obj_name, cur_user=usr.username)
350 350
351 351 new_user = Session().query(User)\
352 352 .filter(User.username == username).one()
353 353 response = self.app.post(url('user', user_id=new_user.user_id),
354 354 params={'_method': 'delete',
355 355 'user_repo_groups': 'detach',
356 356 'csrf_token': self.csrf_token})
357 357
358 358 msg = 'Detached 1 repository groups'
359 359 assert_session_flash(response, msg)
360 360 fixture.destroy_repo_group(obj_name)
361 361
362 362 def test_delete_owner_of_user_group(self):
363 363 self.log_user()
364 364 username = 'newtestuserdeleteme_user_group_owner'
365 365 obj_name = 'test_user_group'
366 366 usr = fixture.create_user(name=username)
367 367 self.destroy_users.add(username)
368 368 fixture.create_user_group(obj_name, cur_user=usr.username)
369 369
370 370 new_user = Session().query(User)\
371 371 .filter(User.username == username).one()
372 372 response = self.app.post(url('user', user_id=new_user.user_id),
373 373 params={'_method': 'delete',
374 374 'csrf_token': self.csrf_token})
375 375
376 376 msg = 'user "%s" still owns 1 user groups and cannot be removed. ' \
377 377 'Switch owners or remove those user groups:%s' % (username,
378 378 obj_name)
379 379 assert_session_flash(response, msg)
380 380 fixture.destroy_user_group(obj_name)
381 381
382 382 def test_delete_owner_of_user_group_detaching(self):
383 383 self.log_user()
384 384 username = 'newtestuserdeleteme_user_group_owner_detaching'
385 385 obj_name = 'test_user_group'
386 386 usr = fixture.create_user(name=username)
387 387 self.destroy_users.add(username)
388 388 fixture.create_user_group(obj_name, cur_user=usr.username)
389 389
390 390 new_user = Session().query(User)\
391 391 .filter(User.username == username).one()
392 392 try:
393 393 response = self.app.post(url('user', user_id=new_user.user_id),
394 394 params={'_method': 'delete',
395 395 'user_user_groups': 'detach',
396 396 'csrf_token': self.csrf_token})
397 397
398 398 msg = 'Detached 1 user groups'
399 399 assert_session_flash(response, msg)
400 400 finally:
401 401 fixture.destroy_user_group(obj_name)
402 402
403 403 def test_delete_owner_of_user_group_deleting(self):
404 404 self.log_user()
405 405 username = 'newtestuserdeleteme_user_group_owner_deleting'
406 406 obj_name = 'test_user_group'
407 407 usr = fixture.create_user(name=username)
408 408 self.destroy_users.add(username)
409 409 fixture.create_user_group(obj_name, cur_user=usr.username)
410 410
411 411 new_user = Session().query(User)\
412 412 .filter(User.username == username).one()
413 413 response = self.app.post(url('user', user_id=new_user.user_id),
414 414 params={'_method': 'delete',
415 415 'user_user_groups': 'delete',
416 416 'csrf_token': self.csrf_token})
417 417
418 418 msg = 'Deleted 1 user groups'
419 419 assert_session_flash(response, msg)
420 420
421 421 def test_show(self):
422 422 self.app.get(url('user', user_id=1))
423 423
424 424 def test_edit(self):
425 425 self.log_user()
426 426 user = User.get_by_username(TEST_USER_ADMIN_LOGIN)
427 427 self.app.get(url('edit_user', user_id=user.user_id))
428 428
429 429 @pytest.mark.parametrize(
430 430 'repo_create, repo_create_write, user_group_create, repo_group_create,'
431 431 'fork_create, inherit_default_permissions, expect_error,'
432 432 'expect_form_error', [
433 433 ('hg.create.none', 'hg.create.write_on_repogroup.false',
434 434 'hg.usergroup.create.false', 'hg.repogroup.create.false',
435 435 'hg.fork.none', 'hg.inherit_default_perms.false', False, False),
436 436 ('hg.create.repository', 'hg.create.write_on_repogroup.false',
437 437 'hg.usergroup.create.false', 'hg.repogroup.create.false',
438 438 'hg.fork.none', 'hg.inherit_default_perms.false', False, False),
439 439 ('hg.create.repository', 'hg.create.write_on_repogroup.true',
440 440 'hg.usergroup.create.true', 'hg.repogroup.create.true',
441 441 'hg.fork.repository', 'hg.inherit_default_perms.false', False,
442 442 False),
443 443 ('hg.create.XXX', 'hg.create.write_on_repogroup.true',
444 444 'hg.usergroup.create.true', 'hg.repogroup.create.true',
445 445 'hg.fork.repository', 'hg.inherit_default_perms.false', False,
446 446 True),
447 447 ('', '', '', '', '', '', True, False),
448 448 ])
449 449 def test_global_perms_on_user(
450 450 self, repo_create, repo_create_write, user_group_create,
451 451 repo_group_create, fork_create, expect_error, expect_form_error,
452 452 inherit_default_permissions):
453 453 self.log_user()
454 454 user = fixture.create_user('dummy')
455 455 uid = user.user_id
456 456
457 457 # ENABLE REPO CREATE ON A GROUP
458 458 perm_params = {
459 459 'inherit_default_permissions': False,
460 460 'default_repo_create': repo_create,
461 461 'default_repo_create_on_write': repo_create_write,
462 462 'default_user_group_create': user_group_create,
463 463 'default_repo_group_create': repo_group_create,
464 464 'default_fork_create': fork_create,
465 465 'default_inherit_default_permissions': inherit_default_permissions,
466 466 '_method': 'put',
467 467 'csrf_token': self.csrf_token,
468 468 }
469 469 response = self.app.post(
470 470 url('edit_user_global_perms', user_id=uid),
471 471 params=perm_params)
472 472
473 473 if expect_form_error:
474 474 assert response.status_int == 200
475 475 response.mustcontain('Value must be one of')
476 476 else:
477 477 if expect_error:
478 478 msg = 'An error occurred during permissions saving'
479 479 else:
480 480 msg = 'User global permissions updated successfully'
481 481 ug = User.get(uid)
482 482 del perm_params['_method']
483 483 del perm_params['inherit_default_permissions']
484 484 del perm_params['csrf_token']
485 485 assert perm_params == ug.get_default_perms()
486 486 assert_session_flash(response, msg)
487 487 fixture.destroy_user(uid)
488 488
489 489 def test_global_permissions_initial_values(self, user_util):
490 490 self.log_user()
491 491 user = user_util.create_user()
492 492 uid = user.user_id
493 493 response = self.app.get(url('edit_user_global_perms', user_id=uid))
494 494 default_user = User.get_default_user()
495 495 default_permissions = default_user.get_default_perms()
496 496 assert_response = AssertResponse(response)
497 497 expected_permissions = (
498 498 'default_repo_create', 'default_repo_create_on_write',
499 499 'default_fork_create', 'default_repo_group_create',
500 500 'default_user_group_create', 'default_inherit_default_permissions')
501 501 for permission in expected_permissions:
502 502 css_selector = '[name={}][checked=checked]'.format(permission)
503 503 element = assert_response.get_element(css_selector)
504 504 assert element.value == default_permissions[permission]
505 505
506 506 def test_ips(self):
507 507 self.log_user()
508 508 user = User.get_by_username(TEST_USER_REGULAR_LOGIN)
509 509 response = self.app.get(url('edit_user_ips', user_id=user.user_id))
510 510 response.mustcontain('All IP addresses are allowed')
511 511
512 512 @pytest.mark.parametrize("test_name, ip, ip_range, failure", [
513 513 ('127/24', '127.0.0.1/24', '127.0.0.0 - 127.0.0.255', False),
514 514 ('10/32', '10.0.0.10/32', '10.0.0.10 - 10.0.0.10', False),
515 515 ('0/16', '0.0.0.0/16', '0.0.0.0 - 0.0.255.255', False),
516 516 ('0/8', '0.0.0.0/8', '0.0.0.0 - 0.255.255.255', False),
517 517 ('127_bad_mask', '127.0.0.1/99', '127.0.0.1 - 127.0.0.1', True),
518 518 ('127_bad_ip', 'foobar', 'foobar', True),
519 519 ])
520 520 def test_add_ip(self, test_name, ip, ip_range, failure):
521 521 self.log_user()
522 522 user = User.get_by_username(TEST_USER_REGULAR_LOGIN)
523 523 user_id = user.user_id
524 524
525 525 response = self.app.post(url('edit_user_ips', user_id=user_id),
526 526 params={'new_ip': ip, '_method': 'put',
527 527 'csrf_token': self.csrf_token})
528 528
529 529 if failure:
530 530 assert_session_flash(
531 531 response, 'Please enter a valid IPv4 or IpV6 address')
532 532 response = self.app.get(url('edit_user_ips', user_id=user_id))
533 533 response.mustcontain(no=[ip])
534 534 response.mustcontain(no=[ip_range])
535 535
536 536 else:
537 537 response = self.app.get(url('edit_user_ips', user_id=user_id))
538 538 response.mustcontain(ip)
539 539 response.mustcontain(ip_range)
540 540
541 541 # cleanup
542 542 for del_ip in UserIpMap.query().filter(
543 543 UserIpMap.user_id == user_id).all():
544 544 Session().delete(del_ip)
545 545 Session().commit()
546 546
547 547 def test_delete_ip(self):
548 548 self.log_user()
549 549 user = User.get_by_username(TEST_USER_REGULAR_LOGIN)
550 550 user_id = user.user_id
551 551 ip = '127.0.0.1/32'
552 552 ip_range = '127.0.0.1 - 127.0.0.1'
553 553 new_ip = UserModel().add_extra_ip(user_id, ip)
554 554 Session().commit()
555 555 new_ip_id = new_ip.ip_id
556 556
557 557 response = self.app.get(url('edit_user_ips', user_id=user_id))
558 558 response.mustcontain(ip)
559 559 response.mustcontain(ip_range)
560 560
561 561 self.app.post(url('edit_user_ips', user_id=user_id),
562 562 params={'_method': 'delete', 'del_ip_id': new_ip_id,
563 563 'csrf_token': self.csrf_token})
564 564
565 565 response = self.app.get(url('edit_user_ips', user_id=user_id))
566 566 response.mustcontain('All IP addresses are allowed')
567 567 response.mustcontain(no=[ip])
568 568 response.mustcontain(no=[ip_range])
569 569
570 570 def test_auth_tokens(self):
571 571 self.log_user()
572 572
573 573 user = User.get_by_username(TEST_USER_REGULAR_LOGIN)
574 574 response = self.app.get(
575 575 url('edit_user_auth_tokens', user_id=user.user_id))
576 response.mustcontain(user.api_key)
577 response.mustcontain('expires: never')
576 for token in user.auth_tokens:
577 response.mustcontain(token)
578 response.mustcontain('never')
578 579
579 580 @pytest.mark.parametrize("desc, lifetime", [
580 581 ('forever', -1),
581 582 ('5mins', 60*5),
582 583 ('30days', 60*60*24*30),
583 584 ])
584 def test_add_auth_token(self, desc, lifetime):
585 def test_add_auth_token(self, desc, lifetime, user_util):
585 586 self.log_user()
586 user = User.get_by_username(TEST_USER_REGULAR_LOGIN)
587 user = user_util.create_user()
587 588 user_id = user.user_id
588 589
589 590 response = self.app.post(
590 591 url('edit_user_auth_tokens', user_id=user_id),
591 592 {'_method': 'put', 'description': desc, 'lifetime': lifetime,
592 593 'csrf_token': self.csrf_token})
593 594 assert_session_flash(response, 'Auth token successfully created')
594 try:
595 response = response.follow()
596 user = User.get(user_id)
597 for auth_token in user.auth_tokens:
598 response.mustcontain(auth_token)
599 finally:
600 for api_key in UserApiKeys.query().filter(
601 UserApiKeys.user_id == user_id).all():
602 Session().delete(api_key)
603 Session().commit()
604 595
605 def test_remove_auth_token(self):
596 response = response.follow()
597 user = User.get(user_id)
598 for auth_token in user.auth_tokens:
599 response.mustcontain(auth_token)
600
601 def test_remove_auth_token(self, user_util):
606 602 self.log_user()
607 user = User.get_by_username(TEST_USER_REGULAR_LOGIN)
603 user = user_util.create_user()
608 604 user_id = user.user_id
609 605
610 606 response = self.app.post(
611 607 url('edit_user_auth_tokens', user_id=user_id),
612 608 {'_method': 'put', 'description': 'desc', 'lifetime': -1,
613 609 'csrf_token': self.csrf_token})
614 610 assert_session_flash(response, 'Auth token successfully created')
615 611 response = response.follow()
616 612
617 613 # now delete our key
618 614 keys = UserApiKeys.query().filter(UserApiKeys.user_id == user_id).all()
619 assert 1 == len(keys)
615 assert 3 == len(keys)
620 616
621 617 response = self.app.post(
622 618 url('edit_user_auth_tokens', user_id=user_id),
623 619 {'_method': 'delete', 'del_auth_token': keys[0].api_key,
624 620 'csrf_token': self.csrf_token})
625 621 assert_session_flash(response, 'Auth token successfully deleted')
626 622 keys = UserApiKeys.query().filter(UserApiKeys.user_id == user_id).all()
627 assert 0 == len(keys)
623 assert 2 == len(keys)
@@ -1,510 +1,511 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2010-2017 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 import urlparse
22 22
23 23 import mock
24 24 import pytest
25 25
26 26 from rhodecode.config.routing import ADMIN_PREFIX
27 27 from rhodecode.tests import (
28 28 assert_session_flash, url, HG_REPO, TEST_USER_ADMIN_LOGIN)
29 29 from rhodecode.tests.fixture import Fixture
30 30 from rhodecode.tests.utils import AssertResponse, get_session_from_response
31 31 from rhodecode.lib.auth import check_password
32 32 from rhodecode.model.auth_token import AuthTokenModel
33 33 from rhodecode.model import validators
34 34 from rhodecode.model.db import User, Notification, UserApiKeys
35 35 from rhodecode.model.meta import Session
36 36
37 37 fixture = Fixture()
38 38
39 39 # Hardcode URLs because we don't have a request object to use
40 40 # pyramids URL generation methods.
41 41 index_url = '/'
42 42 login_url = ADMIN_PREFIX + '/login'
43 43 logut_url = ADMIN_PREFIX + '/logout'
44 44 register_url = ADMIN_PREFIX + '/register'
45 45 pwd_reset_url = ADMIN_PREFIX + '/password_reset'
46 46 pwd_reset_confirm_url = ADMIN_PREFIX + '/password_reset_confirmation'
47 47
48 48
49 49 @pytest.mark.usefixtures('app')
50 50 class TestLoginController(object):
51 51 destroy_users = set()
52 52
53 53 @classmethod
54 54 def teardown_class(cls):
55 55 fixture.destroy_users(cls.destroy_users)
56 56
57 57 def teardown_method(self, method):
58 58 for n in Notification.query().all():
59 59 Session().delete(n)
60 60
61 61 Session().commit()
62 62 assert Notification.query().all() == []
63 63
64 64 def test_index(self):
65 65 response = self.app.get(login_url)
66 66 assert response.status == '200 OK'
67 67 # Test response...
68 68
69 69 def test_login_admin_ok(self):
70 70 response = self.app.post(login_url,
71 71 {'username': 'test_admin',
72 72 'password': 'test12'})
73 73 assert response.status == '302 Found'
74 74 session = get_session_from_response(response)
75 75 username = session['rhodecode_user'].get('username')
76 76 assert username == 'test_admin'
77 77 response = response.follow()
78 78 response.mustcontain('/%s' % HG_REPO)
79 79
80 80 def test_login_regular_ok(self):
81 81 response = self.app.post(login_url,
82 82 {'username': 'test_regular',
83 83 'password': 'test12'})
84 84
85 85 assert response.status == '302 Found'
86 86 session = get_session_from_response(response)
87 87 username = session['rhodecode_user'].get('username')
88 88 assert username == 'test_regular'
89 89 response = response.follow()
90 90 response.mustcontain('/%s' % HG_REPO)
91 91
92 92 def test_login_ok_came_from(self):
93 93 test_came_from = '/_admin/users?branch=stable'
94 94 _url = '{}?came_from={}'.format(login_url, test_came_from)
95 95 response = self.app.post(
96 96 _url, {'username': 'test_admin', 'password': 'test12'})
97 97 assert response.status == '302 Found'
98 98 assert 'branch=stable' in response.location
99 99 response = response.follow()
100 100
101 101 assert response.status == '200 OK'
102 102 response.mustcontain('Users administration')
103 103
104 104 def test_redirect_to_login_with_get_args(self):
105 105 with fixture.anon_access(False):
106 106 kwargs = {'branch': 'stable'}
107 107 response = self.app.get(
108 108 url('summary_home', repo_name=HG_REPO, **kwargs))
109 109 assert response.status == '302 Found'
110 110 response_query = urlparse.parse_qsl(response.location)
111 111 assert 'branch=stable' in response_query[0][1]
112 112
113 113 def test_login_form_with_get_args(self):
114 114 _url = '{}?came_from=/_admin/users,branch=stable'.format(login_url)
115 115 response = self.app.get(_url)
116 116 assert 'branch%3Dstable' in response.form.action
117 117
118 118 @pytest.mark.parametrize("url_came_from", [
119 119 'data:text/html,<script>window.alert("xss")</script>',
120 120 'mailto:test@rhodecode.org',
121 121 'file:///etc/passwd',
122 122 'ftp://some.ftp.server',
123 123 'http://other.domain',
124 124 '/\r\nX-Forwarded-Host: http://example.org',
125 125 ])
126 126 def test_login_bad_came_froms(self, url_came_from):
127 127 _url = '{}?came_from={}'.format(login_url, url_came_from)
128 128 response = self.app.post(
129 129 _url,
130 130 {'username': 'test_admin', 'password': 'test12'})
131 131 assert response.status == '302 Found'
132 132 response = response.follow()
133 133 assert response.status == '200 OK'
134 134 assert response.request.path == '/'
135 135
136 136 def test_login_short_password(self):
137 137 response = self.app.post(login_url,
138 138 {'username': 'test_admin',
139 139 'password': 'as'})
140 140 assert response.status == '200 OK'
141 141
142 142 response.mustcontain('Enter 3 characters or more')
143 143
144 144 def test_login_wrong_non_ascii_password(self, user_regular):
145 145 response = self.app.post(
146 146 login_url,
147 147 {'username': user_regular.username,
148 148 'password': u'invalid-non-asci\xe4'.encode('utf8')})
149 149
150 150 response.mustcontain('invalid user name')
151 151 response.mustcontain('invalid password')
152 152
153 153 def test_login_with_non_ascii_password(self, user_util):
154 154 password = u'valid-non-ascii\xe4'
155 155 user = user_util.create_user(password=password)
156 156 response = self.app.post(
157 157 login_url,
158 158 {'username': user.username,
159 159 'password': password.encode('utf-8')})
160 160 assert response.status_code == 302
161 161
162 162 def test_login_wrong_username_password(self):
163 163 response = self.app.post(login_url,
164 164 {'username': 'error',
165 165 'password': 'test12'})
166 166
167 167 response.mustcontain('invalid user name')
168 168 response.mustcontain('invalid password')
169 169
170 170 def test_login_admin_ok_password_migration(self, real_crypto_backend):
171 171 from rhodecode.lib import auth
172 172
173 173 # create new user, with sha256 password
174 174 temp_user = 'test_admin_sha256'
175 175 user = fixture.create_user(temp_user)
176 176 user.password = auth._RhodeCodeCryptoSha256().hash_create(
177 177 b'test123')
178 178 Session().add(user)
179 179 Session().commit()
180 180 self.destroy_users.add(temp_user)
181 181 response = self.app.post(login_url,
182 182 {'username': temp_user,
183 183 'password': 'test123'})
184 184
185 185 assert response.status == '302 Found'
186 186 session = get_session_from_response(response)
187 187 username = session['rhodecode_user'].get('username')
188 188 assert username == temp_user
189 189 response = response.follow()
190 190 response.mustcontain('/%s' % HG_REPO)
191 191
192 192 # new password should be bcrypted, after log-in and transfer
193 193 user = User.get_by_username(temp_user)
194 194 assert user.password.startswith('$')
195 195
196 196 # REGISTRATIONS
197 197 def test_register(self):
198 198 response = self.app.get(register_url)
199 199 response.mustcontain('Create an Account')
200 200
201 201 def test_register_err_same_username(self):
202 202 uname = 'test_admin'
203 203 response = self.app.post(
204 204 register_url,
205 205 {
206 206 'username': uname,
207 207 'password': 'test12',
208 208 'password_confirmation': 'test12',
209 209 'email': 'goodmail@domain.com',
210 210 'firstname': 'test',
211 211 'lastname': 'test'
212 212 }
213 213 )
214 214
215 215 assertr = AssertResponse(response)
216 216 msg = validators.ValidUsername()._messages['username_exists']
217 217 msg = msg % {'username': uname}
218 218 assertr.element_contains('#username+.error-message', msg)
219 219
220 220 def test_register_err_same_email(self):
221 221 response = self.app.post(
222 222 register_url,
223 223 {
224 224 'username': 'test_admin_0',
225 225 'password': 'test12',
226 226 'password_confirmation': 'test12',
227 227 'email': 'test_admin@mail.com',
228 228 'firstname': 'test',
229 229 'lastname': 'test'
230 230 }
231 231 )
232 232
233 233 assertr = AssertResponse(response)
234 234 msg = validators.UniqSystemEmail()()._messages['email_taken']
235 235 assertr.element_contains('#email+.error-message', msg)
236 236
237 237 def test_register_err_same_email_case_sensitive(self):
238 238 response = self.app.post(
239 239 register_url,
240 240 {
241 241 'username': 'test_admin_1',
242 242 'password': 'test12',
243 243 'password_confirmation': 'test12',
244 244 'email': 'TesT_Admin@mail.COM',
245 245 'firstname': 'test',
246 246 'lastname': 'test'
247 247 }
248 248 )
249 249 assertr = AssertResponse(response)
250 250 msg = validators.UniqSystemEmail()()._messages['email_taken']
251 251 assertr.element_contains('#email+.error-message', msg)
252 252
253 253 def test_register_err_wrong_data(self):
254 254 response = self.app.post(
255 255 register_url,
256 256 {
257 257 'username': 'xs',
258 258 'password': 'test',
259 259 'password_confirmation': 'test',
260 260 'email': 'goodmailm',
261 261 'firstname': 'test',
262 262 'lastname': 'test'
263 263 }
264 264 )
265 265 assert response.status == '200 OK'
266 266 response.mustcontain('An email address must contain a single @')
267 267 response.mustcontain('Enter a value 6 characters long or more')
268 268
269 269 def test_register_err_username(self):
270 270 response = self.app.post(
271 271 register_url,
272 272 {
273 273 'username': 'error user',
274 274 'password': 'test12',
275 275 'password_confirmation': 'test12',
276 276 'email': 'goodmailm',
277 277 'firstname': 'test',
278 278 'lastname': 'test'
279 279 }
280 280 )
281 281
282 282 response.mustcontain('An email address must contain a single @')
283 283 response.mustcontain(
284 284 'Username may only contain '
285 285 'alphanumeric characters underscores, '
286 286 'periods or dashes and must begin with '
287 287 'alphanumeric character')
288 288
289 289 def test_register_err_case_sensitive(self):
290 290 usr = 'Test_Admin'
291 291 response = self.app.post(
292 292 register_url,
293 293 {
294 294 'username': usr,
295 295 'password': 'test12',
296 296 'password_confirmation': 'test12',
297 297 'email': 'goodmailm',
298 298 'firstname': 'test',
299 299 'lastname': 'test'
300 300 }
301 301 )
302 302
303 303 assertr = AssertResponse(response)
304 304 msg = validators.ValidUsername()._messages['username_exists']
305 305 msg = msg % {'username': usr}
306 306 assertr.element_contains('#username+.error-message', msg)
307 307
308 308 def test_register_special_chars(self):
309 309 response = self.app.post(
310 310 register_url,
311 311 {
312 312 'username': 'xxxaxn',
313 313 'password': 'Δ…Δ‡ΕΊΕΌΔ…Ε›Ε›Ε›Ε›',
314 314 'password_confirmation': 'Δ…Δ‡ΕΊΕΌΔ…Ε›Ε›Ε›Ε›',
315 315 'email': 'goodmailm@test.plx',
316 316 'firstname': 'test',
317 317 'lastname': 'test'
318 318 }
319 319 )
320 320
321 321 msg = validators.ValidPassword()._messages['invalid_password']
322 322 response.mustcontain(msg)
323 323
324 324 def test_register_password_mismatch(self):
325 325 response = self.app.post(
326 326 register_url,
327 327 {
328 328 'username': 'xs',
329 329 'password': '123qwe',
330 330 'password_confirmation': 'qwe123',
331 331 'email': 'goodmailm@test.plxa',
332 332 'firstname': 'test',
333 333 'lastname': 'test'
334 334 }
335 335 )
336 336 msg = validators.ValidPasswordsMatch()._messages['password_mismatch']
337 337 response.mustcontain(msg)
338 338
339 339 def test_register_ok(self):
340 340 username = 'test_regular4'
341 341 password = 'qweqwe'
342 342 email = 'marcin@test.com'
343 343 name = 'testname'
344 344 lastname = 'testlastname'
345 345
346 346 response = self.app.post(
347 347 register_url,
348 348 {
349 349 'username': username,
350 350 'password': password,
351 351 'password_confirmation': password,
352 352 'email': email,
353 353 'firstname': name,
354 354 'lastname': lastname,
355 355 'admin': True
356 356 }
357 357 ) # This should be overriden
358 358 assert response.status == '302 Found'
359 359 assert_session_flash(
360 360 response, 'You have successfully registered with RhodeCode')
361 361
362 362 ret = Session().query(User).filter(
363 363 User.username == 'test_regular4').one()
364 364 assert ret.username == username
365 365 assert check_password(password, ret.password)
366 366 assert ret.email == email
367 367 assert ret.name == name
368 368 assert ret.lastname == lastname
369 assert ret.api_key is not None
369 assert ret.auth_tokens is not None
370 370 assert not ret.admin
371 371
372 372 def test_forgot_password_wrong_mail(self):
373 373 bad_email = 'marcin@wrongmail.org'
374 374 response = self.app.post(
375 375 pwd_reset_url, {'email': bad_email, }
376 376 )
377 377 assert_session_flash(response,
378 378 'If such email exists, a password reset link was sent to it.')
379 379
380 380 def test_forgot_password(self, user_util):
381 381 response = self.app.get(pwd_reset_url)
382 382 assert response.status == '200 OK'
383 383
384 384 user = user_util.create_user()
385 385 user_id = user.user_id
386 386 email = user.email
387 387
388 388 response = self.app.post(pwd_reset_url, {'email': email, })
389 389
390 390 assert_session_flash(response,
391 391 'If such email exists, a password reset link was sent to it.')
392 392
393 393 # BAD KEY
394 394 confirm_url = '{}?key={}'.format(pwd_reset_confirm_url, 'badkey')
395 395 response = self.app.get(confirm_url)
396 396 assert response.status == '302 Found'
397 397 assert response.location.endswith(pwd_reset_url)
398 398 assert_session_flash(response, 'Given reset token is invalid')
399 399
400 400 response.follow() # cleanup flash
401 401
402 402 # GOOD KEY
403 403 key = UserApiKeys.query()\
404 404 .filter(UserApiKeys.user_id == user_id)\
405 405 .filter(UserApiKeys.role == UserApiKeys.ROLE_PASSWORD_RESET)\
406 406 .first()
407 407
408 408 assert key
409 409
410 410 confirm_url = '{}?key={}'.format(pwd_reset_confirm_url, key.api_key)
411 411 response = self.app.get(confirm_url)
412 412 assert response.status == '302 Found'
413 413 assert response.location.endswith(login_url)
414 414
415 415 assert_session_flash(
416 416 response,
417 417 'Your password reset was successful, '
418 418 'a new password has been sent to your email')
419 419
420 420 response.follow()
421 421
422 422 def _get_api_whitelist(self, values=None):
423 423 config = {'api_access_controllers_whitelist': values or []}
424 424 return config
425 425
426 426 @pytest.mark.parametrize("test_name, auth_token", [
427 427 ('none', None),
428 428 ('empty_string', ''),
429 429 ('fake_number', '123456'),
430 430 ('proper_auth_token', None)
431 431 ])
432 432 def test_access_not_whitelisted_page_via_auth_token(
433 433 self, test_name, auth_token, user_admin):
434 434
435 435 whitelist = self._get_api_whitelist([])
436 436 with mock.patch.dict('rhodecode.CONFIG', whitelist):
437 437 assert [] == whitelist['api_access_controllers_whitelist']
438 438 if test_name == 'proper_auth_token':
439 439 # use builtin if api_key is None
440 440 auth_token = user_admin.api_key
441 441
442 442 with fixture.anon_access(False):
443 443 self.app.get(url(controller='changeset',
444 444 action='changeset_raw',
445 445 repo_name=HG_REPO, revision='tip',
446 446 api_key=auth_token),
447 447 status=302)
448 448
449 449 @pytest.mark.parametrize("test_name, auth_token, code", [
450 450 ('none', None, 302),
451 451 ('empty_string', '', 302),
452 452 ('fake_number', '123456', 302),
453 453 ('proper_auth_token', None, 200)
454 454 ])
455 455 def test_access_whitelisted_page_via_auth_token(
456 456 self, test_name, auth_token, code, user_admin):
457 457
458 458 whitelist_entry = ['ChangesetController:changeset_raw']
459 459 whitelist = self._get_api_whitelist(whitelist_entry)
460 460
461 461 with mock.patch.dict('rhodecode.CONFIG', whitelist):
462 462 assert whitelist_entry == whitelist['api_access_controllers_whitelist']
463 463
464 464 if test_name == 'proper_auth_token':
465 465 auth_token = user_admin.api_key
466 assert auth_token
466 467
467 468 with fixture.anon_access(False):
468 469 self.app.get(url(controller='changeset',
469 470 action='changeset_raw',
470 471 repo_name=HG_REPO, revision='tip',
471 472 api_key=auth_token),
472 473 status=code)
473 474
474 475 def test_access_page_via_extra_auth_token(self):
475 476 whitelist = self._get_api_whitelist(
476 477 ['ChangesetController:changeset_raw'])
477 478 with mock.patch.dict('rhodecode.CONFIG', whitelist):
478 479 assert ['ChangesetController:changeset_raw'] == \
479 480 whitelist['api_access_controllers_whitelist']
480 481
481 482 new_auth_token = AuthTokenModel().create(
482 483 TEST_USER_ADMIN_LOGIN, 'test')
483 484 Session().commit()
484 485 with fixture.anon_access(False):
485 486 self.app.get(url(controller='changeset',
486 487 action='changeset_raw',
487 488 repo_name=HG_REPO, revision='tip',
488 489 api_key=new_auth_token.api_key),
489 490 status=200)
490 491
491 492 def test_access_page_via_expired_auth_token(self):
492 493 whitelist = self._get_api_whitelist(
493 494 ['ChangesetController:changeset_raw'])
494 495 with mock.patch.dict('rhodecode.CONFIG', whitelist):
495 496 assert ['ChangesetController:changeset_raw'] == \
496 497 whitelist['api_access_controllers_whitelist']
497 498
498 499 new_auth_token = AuthTokenModel().create(
499 500 TEST_USER_ADMIN_LOGIN, 'test')
500 501 Session().commit()
501 502 # patch the api key and make it expired
502 503 new_auth_token.expires = 0
503 504 Session().add(new_auth_token)
504 505 Session().commit()
505 506 with fixture.anon_access(False):
506 507 self.app.get(url(controller='changeset',
507 508 action='changeset_raw',
508 509 repo_name=HG_REPO, revision='tip',
509 510 api_key=new_auth_token.api_key),
510 511 status=302)
@@ -1,86 +1,92 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2010-2017 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 import mock
22 22 import pytest
23 23
24 24 from rhodecode.lib.db_manage import DbManage
25 25 from rhodecode.model import db
26 26
27 27
28 28 @pytest.fixture
29 29 def db_manage(pylonsapp):
30 30 db_manage = DbManage(
31 31 log_sql=True, dbconf='fake', root='fake', tests=False,
32 32 cli_args={}, SESSION=db.Session())
33 33 return db_manage
34 34
35 35
36 36 @pytest.fixture(autouse=True)
37 37 def session_rollback(pylonsapp, request):
38 38 """
39 39 Rollback the database session after the test run.
40 40
41 41 Intended usage is for tests wich mess with the database but don't
42 42 commit. In this case a rollback after the test run will leave the database
43 43 in a clean state.
44 44
45 45 This is still a workaround until we find a way to isolate the tests better
46 46 from each other.
47 47 """
48 48 @request.addfinalizer
49 49 def cleanup():
50 50 db.Session().rollback()
51 51
52 52
53 53 def test_create_admin_and_prompt_uses_getpass(db_manage):
54 54 db_manage.cli_args = {
55 55 'username': 'test',
56 56 'email': 'test@example.com'}
57 57 with mock.patch('getpass.getpass', return_value='password') as getpass:
58 58 db_manage.create_admin_and_prompt()
59 59 assert getpass.called
60 60
61 61
62 62 def test_create_admin_and_prompt_sets_the_api_key(db_manage):
63 63 db_manage.cli_args = {
64 64 'username': 'test',
65 65 'password': 'testpassword',
66 66 'email': 'test@example.com',
67 67 'api_key': 'testkey'}
68 68 with mock.patch.object(db_manage, 'create_user') as create_user:
69 69 db_manage.create_admin_and_prompt()
70 70
71 71 assert create_user.call_args[1]['api_key'] == 'testkey'
72 72
73 73
74 def test_create_user_sets_the_api_key(db_manage):
74 @pytest.mark.parametrize('add_keys', [True, False])
75 def test_create_user_sets_the_api_key(db_manage, add_keys):
76 username = 'test_add_keys_{}'.format(add_keys)
75 77 db_manage.create_user(
76 'test', 'testpassword', 'test@example.com',
77 api_key='testkey')
78 username, 'testpassword', 'test@example.com',
79 api_key=add_keys)
78 80
79 user = db.User.get_by_username('test')
80 assert user.api_key == 'testkey'
81 user = db.User.get_by_username(username)
82 if add_keys:
83 assert 2 == len(user.auth_tokens)
84 else:
85 # only feed token
86 assert 1 == len(user.auth_tokens)
81 87
82 88
83 89 def test_create_user_without_api_key(db_manage):
84 90 db_manage.create_user('test', 'testpassword', 'test@example.com')
85 91 user = db.User.get_by_username('test')
86 assert user.api_key
92 assert user.api_key is None
@@ -1,245 +1,242 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2010-2017 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 import pytest
22 22 from sqlalchemy.sql.expression import true
23 23
24 24 from rhodecode.model.db import User, UserGroup, UserGroupMember, UserEmailMap,\
25 25 Permission, UserIpMap
26 26 from rhodecode.model.meta import Session
27 27 from rhodecode.model.user import UserModel
28 28 from rhodecode.model.user_group import UserGroupModel
29 29 from rhodecode.model.repo import RepoModel
30 30 from rhodecode.model.repo_group import RepoGroupModel
31 31 from rhodecode.tests.fixture import Fixture
32 32
33 33 fixture = Fixture()
34 34
35 35
36 36 @pytest.fixture
37 37 def test_user(request, pylonsapp):
38 38 usr = UserModel().create_or_update(
39 39 username=u'test_user',
40 40 password=u'qweqwe',
41 41 email=u'main_email@rhodecode.org',
42 42 firstname=u'u1', lastname=u'u1')
43 43 Session().commit()
44 44 assert User.get_by_username(u'test_user') == usr
45 45
46 46 @request.addfinalizer
47 47 def cleanup():
48 48 if UserModel().get_user(usr.user_id) is None:
49 49 return
50 50
51 51 perm = Permission.query().all()
52 52 for p in perm:
53 53 UserModel().revoke_perm(usr, p)
54 54
55 55 UserModel().delete(usr.user_id)
56 56 Session().commit()
57 57
58 58 return usr
59 59
60 60
61 61 def test_create_and_remove(test_user):
62 62 usr = test_user
63 63
64 64 # make user group
65 65 user_group = fixture.create_user_group('some_example_group')
66 66 Session().commit()
67 67
68 68 UserGroupModel().add_user_to_group(user_group, usr)
69 69 Session().commit()
70 70
71 71 assert UserGroup.get(user_group.users_group_id) == user_group
72 72 assert UserGroupMember.query().count() == 1
73 73 UserModel().delete(usr.user_id)
74 74 Session().commit()
75 75
76 76 assert UserGroupMember.query().all() == []
77 77
78 78
79 79 def test_additonal_email_as_main(test_user):
80 80 with pytest.raises(AttributeError):
81 81 m = UserEmailMap()
82 82 m.email = test_user.email
83 83 m.user = test_user
84 84 Session().add(m)
85 85 Session().commit()
86 86
87 87
88 88 def test_extra_email_map(test_user):
89 89
90 90 m = UserEmailMap()
91 91 m.email = u'main_email2@rhodecode.org'
92 92 m.user = test_user
93 93 Session().add(m)
94 94 Session().commit()
95 95
96 96 u = User.get_by_email(email='main_email@rhodecode.org')
97 97 assert test_user.user_id == u.user_id
98 98 assert test_user.username == u.username
99 99
100 100 u = User.get_by_email(email='main_email2@rhodecode.org')
101 101 assert test_user.user_id == u.user_id
102 102 assert test_user.username == u.username
103 103 u = User.get_by_email(email='main_email3@rhodecode.org')
104 104 assert u is None
105 105
106 106
107 107 def test_get_api_data_replaces_secret_data_by_default(test_user):
108 108 api_data = test_user.get_api_data()
109 109 api_key_length = 40
110 110 expected_replacement = '*' * api_key_length
111 111
112 assert api_data['api_key'] == expected_replacement
113 112 for key in api_data['api_keys']:
114 113 assert key == expected_replacement
115 114
116 115
117 116 def test_get_api_data_includes_secret_data_if_activated(test_user):
118 117 api_data = test_user.get_api_data(include_secrets=True)
119
120 assert api_data['api_key'] == test_user.api_key
121 118 assert api_data['api_keys'] == test_user.auth_tokens
122 119
123 120
124 121 def test_add_perm(test_user):
125 122 perm = Permission.query().all()[0]
126 123 UserModel().grant_perm(test_user, perm)
127 124 Session().commit()
128 125 assert UserModel().has_perm(test_user, perm)
129 126
130 127
131 128 def test_has_perm(test_user):
132 129 perm = Permission.query().all()
133 130 for p in perm:
134 131 assert not UserModel().has_perm(test_user, p)
135 132
136 133
137 134 def test_revoke_perm(test_user):
138 135 perm = Permission.query().all()[0]
139 136 UserModel().grant_perm(test_user, perm)
140 137 Session().commit()
141 138 assert UserModel().has_perm(test_user, perm)
142 139
143 140 # revoke
144 141 UserModel().revoke_perm(test_user, perm)
145 142 Session().commit()
146 143 assert not UserModel().has_perm(test_user, perm)
147 144
148 145
149 146 @pytest.mark.parametrize("ip_range, expected, expect_errors", [
150 147 ('', [], False),
151 148 ('127.0.0.1', ['127.0.0.1'], False),
152 149 ('127.0.0.1,127.0.0.2', ['127.0.0.1', '127.0.0.2'], False),
153 150 ('127.0.0.1 , 127.0.0.2', ['127.0.0.1', '127.0.0.2'], False),
154 151 (
155 152 '127.0.0.1,172.172.172.0,127.0.0.2',
156 153 ['127.0.0.1', '172.172.172.0', '127.0.0.2'], False),
157 154 (
158 155 '127.0.0.1-127.0.0.5',
159 156 ['127.0.0.1', '127.0.0.2', '127.0.0.3', '127.0.0.4', '127.0.0.5'],
160 157 False),
161 158 (
162 159 '127.0.0.1 - 127.0.0.5',
163 160 ['127.0.0.1', '127.0.0.2', '127.0.0.3', '127.0.0.4', '127.0.0.5'],
164 161 False
165 162 ),
166 163 ('-', [], True),
167 164 ('127.0.0.1-32', [], True),
168 165 (
169 166 '127.0.0.1,127.0.0.1,127.0.0.1,127.0.0.1-127.0.0.2,127.0.0.2',
170 167 ['127.0.0.1', '127.0.0.2'], False),
171 168 (
172 169 '127.0.0.1-127.0.0.2,127.0.0.4-127.0.0.6,',
173 170 ['127.0.0.1', '127.0.0.2', '127.0.0.4', '127.0.0.5', '127.0.0.6'],
174 171 False
175 172 ),
176 173 (
177 174 '127.0.0.1-127.0.0.2,127.0.0.1-127.0.0.6,',
178 175 ['127.0.0.1', '127.0.0.2', '127.0.0.3', '127.0.0.4', '127.0.0.5',
179 176 '127.0.0.6'],
180 177 False
181 178 ),
182 179 ])
183 180 def test_ip_range_generator(ip_range, expected, expect_errors):
184 181 func = UserModel().parse_ip_range
185 182 if expect_errors:
186 183 pytest.raises(Exception, func, ip_range)
187 184 else:
188 185 parsed_list = func(ip_range)
189 186 assert parsed_list == expected
190 187
191 188
192 189 def test_user_delete_cascades_ip_whitelist(test_user):
193 190 sample_ip = '1.1.1.1'
194 191 uid_map = UserIpMap(user_id=test_user.user_id, ip_addr=sample_ip)
195 192 Session().add(uid_map)
196 193 Session().delete(test_user)
197 194 try:
198 195 Session().flush()
199 196 finally:
200 197 Session().rollback()
201 198
202 199
203 200 def test_account_for_deactivation_generation(test_user):
204 201 accounts = UserModel().get_accounts_in_creation_order(
205 202 current_user=test_user)
206 203 # current user should be #1 in the list
207 204 assert accounts[0] == test_user.user_id
208 205 active_users = User.query().filter(User.active == true()).count()
209 206 assert active_users == len(accounts)
210 207
211 208
212 209 def test_user_delete_cascades_permissions_on_repo(backend, test_user):
213 210 test_repo = backend.create_repo()
214 211 RepoModel().grant_user_permission(
215 212 test_repo, test_user, 'repository.write')
216 213 Session().commit()
217 214
218 215 assert test_user.repo_to_perm
219 216
220 217 UserModel().delete(test_user)
221 218 Session().commit()
222 219
223 220
224 221 def test_user_delete_cascades_permissions_on_repo_group(
225 222 test_repo_group, test_user):
226 223 RepoGroupModel().grant_user_permission(
227 224 test_repo_group, test_user, 'group.write')
228 225 Session().commit()
229 226
230 227 assert test_user.repo_group_to_perm
231 228
232 229 Session().delete(test_user)
233 230 Session().commit()
234 231
235 232
236 233 def test_user_delete_cascades_permissions_on_user_group(
237 234 test_user_group, test_user):
238 235 UserGroupModel().grant_user_permission(
239 236 test_user_group, test_user, 'usergroup.write')
240 237 Session().commit()
241 238
242 239 assert test_user.user_group_to_perm
243 240
244 241 Session().delete(test_user)
245 242 Session().commit()
General Comments 0
You need to be logged in to leave comments. Login now