##// END OF EJS Templates
auth: add scope and login restrictions to rhodecode plugin, and scope restriction to token plugin....
marcink -
r3392:5cc5c872 default
parent child Browse files
Show More
@@ -1,569 +1,579 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2010-2019 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 import urlparse
22 22
23 23 import mock
24 24 import pytest
25 25
26 26 from rhodecode.tests import (
27 27 assert_session_flash, HG_REPO, TEST_USER_ADMIN_LOGIN,
28 28 no_newline_id_generator)
29 29 from rhodecode.tests.fixture import Fixture
30 30 from rhodecode.lib.auth import check_password
31 31 from rhodecode.lib import helpers as h
32 32 from rhodecode.model.auth_token import AuthTokenModel
33 33 from rhodecode.model.db import User, Notification, UserApiKeys
34 34 from rhodecode.model.meta import Session
35 35
36 36 fixture = Fixture()
37 37
38 38 whitelist_view = ['RepoCommitsView:repo_commit_raw']
39 39
40 40
41 41 def route_path(name, params=None, **kwargs):
42 42 import urllib
43 43 from rhodecode.apps._base import ADMIN_PREFIX
44 44
45 45 base_url = {
46 46 'login': ADMIN_PREFIX + '/login',
47 47 'logout': ADMIN_PREFIX + '/logout',
48 48 'register': ADMIN_PREFIX + '/register',
49 49 'reset_password':
50 50 ADMIN_PREFIX + '/password_reset',
51 51 'reset_password_confirmation':
52 52 ADMIN_PREFIX + '/password_reset_confirmation',
53 53
54 54 'admin_permissions_application':
55 55 ADMIN_PREFIX + '/permissions/application',
56 56 'admin_permissions_application_update':
57 57 ADMIN_PREFIX + '/permissions/application/update',
58 58
59 59 'repo_commit_raw': '/{repo_name}/raw-changeset/{commit_id}'
60 60
61 61 }[name].format(**kwargs)
62 62
63 63 if params:
64 64 base_url = '{}?{}'.format(base_url, urllib.urlencode(params))
65 65 return base_url
66 66
67 67
68 68 @pytest.mark.usefixtures('app')
69 69 class TestLoginController(object):
70 70 destroy_users = set()
71 71
72 72 @classmethod
73 73 def teardown_class(cls):
74 74 fixture.destroy_users(cls.destroy_users)
75 75
76 76 def teardown_method(self, method):
77 77 for n in Notification.query().all():
78 78 Session().delete(n)
79 79
80 80 Session().commit()
81 81 assert Notification.query().all() == []
82 82
83 83 def test_index(self):
84 84 response = self.app.get(route_path('login'))
85 85 assert response.status == '200 OK'
86 86 # Test response...
87 87
88 88 def test_login_admin_ok(self):
89 89 response = self.app.post(route_path('login'),
90 90 {'username': 'test_admin',
91 91 'password': 'test12'}, status=302)
92 92 response = response.follow()
93 93 session = response.get_session_from_response()
94 94 username = session['rhodecode_user'].get('username')
95 95 assert username == 'test_admin'
96 96 response.mustcontain('/%s' % HG_REPO)
97 97
98 98 def test_login_regular_ok(self):
99 99 response = self.app.post(route_path('login'),
100 100 {'username': 'test_regular',
101 101 'password': 'test12'}, status=302)
102 102
103 103 response = response.follow()
104 104 session = response.get_session_from_response()
105 105 username = session['rhodecode_user'].get('username')
106 106 assert username == 'test_regular'
107 107
108 108 response.mustcontain('/%s' % HG_REPO)
109 109
110 110 def test_login_regular_forbidden_when_super_admin_restriction(self):
111 111 from rhodecode.authentication.plugins.auth_rhodecode import RhodeCodeAuthPlugin
112 with fixture.login_restriction(RhodeCodeAuthPlugin.LOGIN_RESTRICTION_SUPER_ADMIN):
112 with fixture.auth_restriction(RhodeCodeAuthPlugin.AUTH_RESTRICTION_SUPER_ADMIN):
113 response = self.app.post(route_path('login'),
114 {'username': 'test_regular',
115 'password': 'test12'})
116
117 response.mustcontain('invalid user name')
118 response.mustcontain('invalid password')
119
120 def test_login_regular_forbidden_when_scope_restriction(self):
121 from rhodecode.authentication.plugins.auth_rhodecode import RhodeCodeAuthPlugin
122 with fixture.scope_restriction(RhodeCodeAuthPlugin.AUTH_RESTRICTION_SCOPE_VCS):
113 123 response = self.app.post(route_path('login'),
114 124 {'username': 'test_regular',
115 125 'password': 'test12'})
116 126
117 127 response.mustcontain('invalid user name')
118 128 response.mustcontain('invalid password')
119 129
120 130 def test_login_ok_came_from(self):
121 131 test_came_from = '/_admin/users?branch=stable'
122 132 _url = '{}?came_from={}'.format(route_path('login'), test_came_from)
123 133 response = self.app.post(
124 134 _url, {'username': 'test_admin', 'password': 'test12'}, status=302)
125 135
126 136 assert 'branch=stable' in response.location
127 137 response = response.follow()
128 138
129 139 assert response.status == '200 OK'
130 140 response.mustcontain('Users administration')
131 141
132 142 def test_redirect_to_login_with_get_args(self):
133 143 with fixture.anon_access(False):
134 144 kwargs = {'branch': 'stable'}
135 145 response = self.app.get(
136 146 h.route_path('repo_summary', repo_name=HG_REPO, _query=kwargs),
137 147 status=302)
138 148
139 149 response_query = urlparse.parse_qsl(response.location)
140 150 assert 'branch=stable' in response_query[0][1]
141 151
142 152 def test_login_form_with_get_args(self):
143 153 _url = '{}?came_from=/_admin/users,branch=stable'.format(route_path('login'))
144 154 response = self.app.get(_url)
145 155 assert 'branch%3Dstable' in response.form.action
146 156
147 157 @pytest.mark.parametrize("url_came_from", [
148 158 'data:text/html,<script>window.alert("xss")</script>',
149 159 'mailto:test@rhodecode.org',
150 160 'file:///etc/passwd',
151 161 'ftp://some.ftp.server',
152 162 'http://other.domain',
153 163 '/\r\nX-Forwarded-Host: http://example.org',
154 164 ], ids=no_newline_id_generator)
155 165 def test_login_bad_came_froms(self, url_came_from):
156 166 _url = '{}?came_from={}'.format(route_path('login'), url_came_from)
157 167 response = self.app.post(
158 168 _url,
159 169 {'username': 'test_admin', 'password': 'test12'})
160 170 assert response.status == '302 Found'
161 171 response = response.follow()
162 172 assert response.status == '200 OK'
163 173 assert response.request.path == '/'
164 174
165 175 def test_login_short_password(self):
166 176 response = self.app.post(route_path('login'),
167 177 {'username': 'test_admin',
168 178 'password': 'as'})
169 179 assert response.status == '200 OK'
170 180
171 181 response.mustcontain('Enter 3 characters or more')
172 182
173 183 def test_login_wrong_non_ascii_password(self, user_regular):
174 184 response = self.app.post(
175 185 route_path('login'),
176 186 {'username': user_regular.username,
177 187 'password': u'invalid-non-asci\xe4'.encode('utf8')})
178 188
179 189 response.mustcontain('invalid user name')
180 190 response.mustcontain('invalid password')
181 191
182 192 def test_login_with_non_ascii_password(self, user_util):
183 193 password = u'valid-non-ascii\xe4'
184 194 user = user_util.create_user(password=password)
185 195 response = self.app.post(
186 196 route_path('login'),
187 197 {'username': user.username,
188 198 'password': password.encode('utf-8')})
189 199 assert response.status_code == 302
190 200
191 201 def test_login_wrong_username_password(self):
192 202 response = self.app.post(route_path('login'),
193 203 {'username': 'error',
194 204 'password': 'test12'})
195 205
196 206 response.mustcontain('invalid user name')
197 207 response.mustcontain('invalid password')
198 208
199 209 def test_login_admin_ok_password_migration(self, real_crypto_backend):
200 210 from rhodecode.lib import auth
201 211
202 212 # create new user, with sha256 password
203 213 temp_user = 'test_admin_sha256'
204 214 user = fixture.create_user(temp_user)
205 215 user.password = auth._RhodeCodeCryptoSha256().hash_create(
206 216 b'test123')
207 217 Session().add(user)
208 218 Session().commit()
209 219 self.destroy_users.add(temp_user)
210 220 response = self.app.post(route_path('login'),
211 221 {'username': temp_user,
212 222 'password': 'test123'}, status=302)
213 223
214 224 response = response.follow()
215 225 session = response.get_session_from_response()
216 226 username = session['rhodecode_user'].get('username')
217 227 assert username == temp_user
218 228 response.mustcontain('/%s' % HG_REPO)
219 229
220 230 # new password should be bcrypted, after log-in and transfer
221 231 user = User.get_by_username(temp_user)
222 232 assert user.password.startswith('$')
223 233
224 234 # REGISTRATIONS
225 235 def test_register(self):
226 236 response = self.app.get(route_path('register'))
227 237 response.mustcontain('Create an Account')
228 238
229 239 def test_register_err_same_username(self):
230 240 uname = 'test_admin'
231 241 response = self.app.post(
232 242 route_path('register'),
233 243 {
234 244 'username': uname,
235 245 'password': 'test12',
236 246 'password_confirmation': 'test12',
237 247 'email': 'goodmail@domain.com',
238 248 'firstname': 'test',
239 249 'lastname': 'test'
240 250 }
241 251 )
242 252
243 253 assertr = response.assert_response()
244 254 msg = 'Username "%(username)s" already exists'
245 255 msg = msg % {'username': uname}
246 256 assertr.element_contains('#username+.error-message', msg)
247 257
248 258 def test_register_err_same_email(self):
249 259 response = self.app.post(
250 260 route_path('register'),
251 261 {
252 262 'username': 'test_admin_0',
253 263 'password': 'test12',
254 264 'password_confirmation': 'test12',
255 265 'email': 'test_admin@mail.com',
256 266 'firstname': 'test',
257 267 'lastname': 'test'
258 268 }
259 269 )
260 270
261 271 assertr = response.assert_response()
262 272 msg = u'This e-mail address is already taken'
263 273 assertr.element_contains('#email+.error-message', msg)
264 274
265 275 def test_register_err_same_email_case_sensitive(self):
266 276 response = self.app.post(
267 277 route_path('register'),
268 278 {
269 279 'username': 'test_admin_1',
270 280 'password': 'test12',
271 281 'password_confirmation': 'test12',
272 282 'email': 'TesT_Admin@mail.COM',
273 283 'firstname': 'test',
274 284 'lastname': 'test'
275 285 }
276 286 )
277 287 assertr = response.assert_response()
278 288 msg = u'This e-mail address is already taken'
279 289 assertr.element_contains('#email+.error-message', msg)
280 290
281 291 def test_register_err_wrong_data(self):
282 292 response = self.app.post(
283 293 route_path('register'),
284 294 {
285 295 'username': 'xs',
286 296 'password': 'test',
287 297 'password_confirmation': 'test',
288 298 'email': 'goodmailm',
289 299 'firstname': 'test',
290 300 'lastname': 'test'
291 301 }
292 302 )
293 303 assert response.status == '200 OK'
294 304 response.mustcontain('An email address must contain a single @')
295 305 response.mustcontain('Enter a value 6 characters long or more')
296 306
297 307 def test_register_err_username(self):
298 308 response = self.app.post(
299 309 route_path('register'),
300 310 {
301 311 'username': 'error user',
302 312 'password': 'test12',
303 313 'password_confirmation': 'test12',
304 314 'email': 'goodmailm',
305 315 'firstname': 'test',
306 316 'lastname': 'test'
307 317 }
308 318 )
309 319
310 320 response.mustcontain('An email address must contain a single @')
311 321 response.mustcontain(
312 322 'Username may only contain '
313 323 'alphanumeric characters underscores, '
314 324 'periods or dashes and must begin with '
315 325 'alphanumeric character')
316 326
317 327 def test_register_err_case_sensitive(self):
318 328 usr = 'Test_Admin'
319 329 response = self.app.post(
320 330 route_path('register'),
321 331 {
322 332 'username': usr,
323 333 'password': 'test12',
324 334 'password_confirmation': 'test12',
325 335 'email': 'goodmailm',
326 336 'firstname': 'test',
327 337 'lastname': 'test'
328 338 }
329 339 )
330 340
331 341 assertr = response.assert_response()
332 342 msg = u'Username "%(username)s" already exists'
333 343 msg = msg % {'username': usr}
334 344 assertr.element_contains('#username+.error-message', msg)
335 345
336 346 def test_register_special_chars(self):
337 347 response = self.app.post(
338 348 route_path('register'),
339 349 {
340 350 'username': 'xxxaxn',
341 351 'password': 'Δ…Δ‡ΕΊΕΌΔ…Ε›Ε›Ε›Ε›',
342 352 'password_confirmation': 'Δ…Δ‡ΕΊΕΌΔ…Ε›Ε›Ε›Ε›',
343 353 'email': 'goodmailm@test.plx',
344 354 'firstname': 'test',
345 355 'lastname': 'test'
346 356 }
347 357 )
348 358
349 359 msg = u'Invalid characters (non-ascii) in password'
350 360 response.mustcontain(msg)
351 361
352 362 def test_register_password_mismatch(self):
353 363 response = self.app.post(
354 364 route_path('register'),
355 365 {
356 366 'username': 'xs',
357 367 'password': '123qwe',
358 368 'password_confirmation': 'qwe123',
359 369 'email': 'goodmailm@test.plxa',
360 370 'firstname': 'test',
361 371 'lastname': 'test'
362 372 }
363 373 )
364 374 msg = u'Passwords do not match'
365 375 response.mustcontain(msg)
366 376
367 377 def test_register_ok(self):
368 378 username = 'test_regular4'
369 379 password = 'qweqwe'
370 380 email = 'marcin@test.com'
371 381 name = 'testname'
372 382 lastname = 'testlastname'
373 383
374 384 # this initializes a session
375 385 response = self.app.get(route_path('register'))
376 386 response.mustcontain('Create an Account')
377 387
378 388
379 389 response = self.app.post(
380 390 route_path('register'),
381 391 {
382 392 'username': username,
383 393 'password': password,
384 394 'password_confirmation': password,
385 395 'email': email,
386 396 'firstname': name,
387 397 'lastname': lastname,
388 398 'admin': True
389 399 },
390 400 status=302
391 401 ) # This should be overridden
392 402
393 403 assert_session_flash(
394 404 response, 'You have successfully registered with RhodeCode')
395 405
396 406 ret = Session().query(User).filter(
397 407 User.username == 'test_regular4').one()
398 408 assert ret.username == username
399 409 assert check_password(password, ret.password)
400 410 assert ret.email == email
401 411 assert ret.name == name
402 412 assert ret.lastname == lastname
403 413 assert ret.auth_tokens is not None
404 414 assert not ret.admin
405 415
406 416 def test_forgot_password_wrong_mail(self):
407 417 bad_email = 'marcin@wrongmail.org'
408 418 # this initializes a session
409 419 self.app.get(route_path('reset_password'))
410 420
411 421 response = self.app.post(
412 422 route_path('reset_password'), {'email': bad_email, }
413 423 )
414 424 assert_session_flash(response,
415 425 'If such email exists, a password reset link was sent to it.')
416 426
417 427 def test_forgot_password(self, user_util):
418 428 # this initializes a session
419 429 self.app.get(route_path('reset_password'))
420 430
421 431 user = user_util.create_user()
422 432 user_id = user.user_id
423 433 email = user.email
424 434
425 435 response = self.app.post(route_path('reset_password'), {'email': email, })
426 436
427 437 assert_session_flash(response,
428 438 'If such email exists, a password reset link was sent to it.')
429 439
430 440 # BAD KEY
431 441 confirm_url = '{}?key={}'.format(route_path('reset_password_confirmation'), 'badkey')
432 442 response = self.app.get(confirm_url, status=302)
433 443 assert response.location.endswith(route_path('reset_password'))
434 444 assert_session_flash(response, 'Given reset token is invalid')
435 445
436 446 response.follow() # cleanup flash
437 447
438 448 # GOOD KEY
439 449 key = UserApiKeys.query()\
440 450 .filter(UserApiKeys.user_id == user_id)\
441 451 .filter(UserApiKeys.role == UserApiKeys.ROLE_PASSWORD_RESET)\
442 452 .first()
443 453
444 454 assert key
445 455
446 456 confirm_url = '{}?key={}'.format(route_path('reset_password_confirmation'), key.api_key)
447 457 response = self.app.get(confirm_url)
448 458 assert response.status == '302 Found'
449 459 assert response.location.endswith(route_path('login'))
450 460
451 461 assert_session_flash(
452 462 response,
453 463 'Your password reset was successful, '
454 464 'a new password has been sent to your email')
455 465
456 466 response.follow()
457 467
458 468 def _get_api_whitelist(self, values=None):
459 469 config = {'api_access_controllers_whitelist': values or []}
460 470 return config
461 471
462 472 @pytest.mark.parametrize("test_name, auth_token", [
463 473 ('none', None),
464 474 ('empty_string', ''),
465 475 ('fake_number', '123456'),
466 476 ('proper_auth_token', None)
467 477 ])
468 478 def test_access_not_whitelisted_page_via_auth_token(
469 479 self, test_name, auth_token, user_admin):
470 480
471 481 whitelist = self._get_api_whitelist([])
472 482 with mock.patch.dict('rhodecode.CONFIG', whitelist):
473 483 assert [] == whitelist['api_access_controllers_whitelist']
474 484 if test_name == 'proper_auth_token':
475 485 # use builtin if api_key is None
476 486 auth_token = user_admin.api_key
477 487
478 488 with fixture.anon_access(False):
479 489 self.app.get(
480 490 route_path('repo_commit_raw',
481 491 repo_name=HG_REPO, commit_id='tip',
482 492 params=dict(api_key=auth_token)),
483 493 status=302)
484 494
485 495 @pytest.mark.parametrize("test_name, auth_token, code", [
486 496 ('none', None, 302),
487 497 ('empty_string', '', 302),
488 498 ('fake_number', '123456', 302),
489 499 ('proper_auth_token', None, 200)
490 500 ])
491 501 def test_access_whitelisted_page_via_auth_token(
492 502 self, test_name, auth_token, code, user_admin):
493 503
494 504 whitelist = self._get_api_whitelist(whitelist_view)
495 505
496 506 with mock.patch.dict('rhodecode.CONFIG', whitelist):
497 507 assert whitelist_view == whitelist['api_access_controllers_whitelist']
498 508
499 509 if test_name == 'proper_auth_token':
500 510 auth_token = user_admin.api_key
501 511 assert auth_token
502 512
503 513 with fixture.anon_access(False):
504 514 self.app.get(
505 515 route_path('repo_commit_raw',
506 516 repo_name=HG_REPO, commit_id='tip',
507 517 params=dict(api_key=auth_token)),
508 518 status=code)
509 519
510 520 @pytest.mark.parametrize("test_name, auth_token, code", [
511 521 ('proper_auth_token', None, 200),
512 522 ('wrong_auth_token', '123456', 302),
513 523 ])
514 524 def test_access_whitelisted_page_via_auth_token_bound_to_token(
515 525 self, test_name, auth_token, code, user_admin):
516 526
517 527 expected_token = auth_token
518 528 if test_name == 'proper_auth_token':
519 529 auth_token = user_admin.api_key
520 530 expected_token = auth_token
521 531 assert auth_token
522 532
523 533 whitelist = self._get_api_whitelist([
524 534 'RepoCommitsView:repo_commit_raw@{}'.format(expected_token)])
525 535
526 536 with mock.patch.dict('rhodecode.CONFIG', whitelist):
527 537
528 538 with fixture.anon_access(False):
529 539 self.app.get(
530 540 route_path('repo_commit_raw',
531 541 repo_name=HG_REPO, commit_id='tip',
532 542 params=dict(api_key=auth_token)),
533 543 status=code)
534 544
535 545 def test_access_page_via_extra_auth_token(self):
536 546 whitelist = self._get_api_whitelist(whitelist_view)
537 547 with mock.patch.dict('rhodecode.CONFIG', whitelist):
538 548 assert whitelist_view == \
539 549 whitelist['api_access_controllers_whitelist']
540 550
541 551 new_auth_token = AuthTokenModel().create(
542 552 TEST_USER_ADMIN_LOGIN, 'test')
543 553 Session().commit()
544 554 with fixture.anon_access(False):
545 555 self.app.get(
546 556 route_path('repo_commit_raw',
547 557 repo_name=HG_REPO, commit_id='tip',
548 558 params=dict(api_key=new_auth_token.api_key)),
549 559 status=200)
550 560
551 561 def test_access_page_via_expired_auth_token(self):
552 562 whitelist = self._get_api_whitelist(whitelist_view)
553 563 with mock.patch.dict('rhodecode.CONFIG', whitelist):
554 564 assert whitelist_view == \
555 565 whitelist['api_access_controllers_whitelist']
556 566
557 567 new_auth_token = AuthTokenModel().create(
558 568 TEST_USER_ADMIN_LOGIN, 'test')
559 569 Session().commit()
560 570 # patch the api key and make it expired
561 571 new_auth_token.expires = 0
562 572 Session().add(new_auth_token)
563 573 Session().commit()
564 574 with fixture.anon_access(False):
565 575 self.app.get(
566 576 route_path('repo_commit_raw',
567 577 repo_name=HG_REPO, commit_id='tip',
568 578 params=dict(api_key=new_auth_token.api_key)),
569 579 status=302)
@@ -1,186 +1,220 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2012-2019 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 """
22 22 RhodeCode authentication plugin for built in internal auth
23 23 """
24 24
25 25 import logging
26 26
27 27 import colander
28 28
29 from rhodecode.authentication.schema import AuthnPluginSettingsSchemaBase
30 29 from rhodecode.translation import _
31
32 from rhodecode.authentication.base import RhodeCodeAuthPluginBase, hybrid_property
33 from rhodecode.authentication.routes import AuthnPluginResourceBase
34 30 from rhodecode.lib.utils2 import safe_str
35 31 from rhodecode.model.db import User
32 from rhodecode.authentication.schema import AuthnPluginSettingsSchemaBase
33 from rhodecode.authentication.base import (
34 RhodeCodeAuthPluginBase, hybrid_property, HTTP_TYPE, VCS_TYPE)
35 from rhodecode.authentication.routes import AuthnPluginResourceBase
36 36
37 37 log = logging.getLogger(__name__)
38 38
39 39
40 40 def plugin_factory(plugin_id, *args, **kwargs):
41 41 plugin = RhodeCodeAuthPlugin(plugin_id)
42 42 return plugin
43 43
44 44
45 45 class RhodecodeAuthnResource(AuthnPluginResourceBase):
46 46 pass
47 47
48 48
49 49 class RhodeCodeAuthPlugin(RhodeCodeAuthPluginBase):
50 50 uid = 'rhodecode'
51 LOGIN_RESTRICTION_NONE = 'none'
52 LOGIN_RESTRICTION_SUPER_ADMIN = 'super_admin'
51 AUTH_RESTRICTION_NONE = 'user_all'
52 AUTH_RESTRICTION_SUPER_ADMIN = 'user_super_admin'
53 AUTH_RESTRICTION_SCOPE_ALL = 'scope_all'
54 AUTH_RESTRICTION_SCOPE_HTTP = 'scope_http'
55 AUTH_RESTRICTION_SCOPE_VCS = 'scope_vcs'
53 56
54 57 def includeme(self, config):
55 58 config.add_authn_plugin(self)
56 59 config.add_authn_resource(self.get_id(), RhodecodeAuthnResource(self))
57 60 config.add_view(
58 61 'rhodecode.authentication.views.AuthnPluginViewBase',
59 62 attr='settings_get',
60 63 renderer='rhodecode:templates/admin/auth/plugin_settings.mako',
61 64 request_method='GET',
62 65 route_name='auth_home',
63 66 context=RhodecodeAuthnResource)
64 67 config.add_view(
65 68 'rhodecode.authentication.views.AuthnPluginViewBase',
66 69 attr='settings_post',
67 70 renderer='rhodecode:templates/admin/auth/plugin_settings.mako',
68 71 request_method='POST',
69 72 route_name='auth_home',
70 73 context=RhodecodeAuthnResource)
71 74
72 75 def get_settings_schema(self):
73 76 return RhodeCodeSettingsSchema()
74 77
75 78 def get_display_name(self):
76 79 return _('RhodeCode Internal')
77 80
78 81 @classmethod
79 82 def docs(cls):
80 83 return "https://docs.rhodecode.com/RhodeCode-Enterprise/auth/auth.html"
81 84
82 85 @hybrid_property
83 86 def name(self):
84 87 return u"rhodecode"
85 88
86 89 def user_activation_state(self):
87 90 def_user_perms = User.get_default_user().AuthUser().permissions['global']
88 91 return 'hg.register.auto_activate' in def_user_perms
89 92
90 93 def allows_authentication_from(
91 94 self, user, allows_non_existing_user=True,
92 95 allowed_auth_plugins=None, allowed_auth_sources=None):
93 96 """
94 97 Custom method for this auth that doesn't accept non existing users.
95 98 We know that user exists in our database.
96 99 """
97 100 allows_non_existing_user = False
98 101 return super(RhodeCodeAuthPlugin, self).allows_authentication_from(
99 102 user, allows_non_existing_user=allows_non_existing_user)
100 103
101 104 def auth(self, userobj, username, password, settings, **kwargs):
102 105 if not userobj:
103 106 log.debug('userobj was:%s skipping', userobj)
104 107 return None
105 108
106 109 if userobj.extern_type != self.name:
107 log.warning(
108 "userobj:%s extern_type mismatch got:`%s` expected:`%s`",
110 log.warning("userobj:%s extern_type mismatch got:`%s` expected:`%s`",
109 111 userobj, userobj.extern_type, self.name)
110 112 return None
111 113
112 login_restriction = settings.get('login_restriction', '')
113 if login_restriction == self.LOGIN_RESTRICTION_SUPER_ADMIN and userobj.admin is False:
114 log.info(
115 "userobj:%s is not super-admin and login restriction is set to %s",
116 userobj, login_restriction)
114 # check scope of auth
115 scope_restriction = settings.get('scope_restriction', '')
116
117 if scope_restriction == self.AUTH_RESTRICTION_SCOPE_HTTP \
118 and self.auth_type != HTTP_TYPE:
119 log.warning("userobj:%s tried scope type %s and scope restriction is set to %s",
120 userobj, self.auth_type, scope_restriction)
121 return None
122
123 if scope_restriction == self.AUTH_RESTRICTION_SCOPE_VCS \
124 and self.auth_type != VCS_TYPE:
125 log.warning("userobj:%s tried scope type %s and scope restriction is set to %s",
126 userobj, self.auth_type, scope_restriction)
127 return None
128
129 # check super-admin restriction
130 auth_restriction = settings.get('auth_restriction', '')
131
132 if auth_restriction == self.AUTH_RESTRICTION_SUPER_ADMIN \
133 and userobj.admin is False:
134 log.warning("userobj:%s is not super-admin and auth restriction is set to %s",
135 userobj, auth_restriction)
117 136 return None
118 137
119 138 user_attrs = {
120 139 "username": userobj.username,
121 140 "firstname": userobj.firstname,
122 141 "lastname": userobj.lastname,
123 142 "groups": [],
124 143 'user_group_sync': False,
125 144 "email": userobj.email,
126 145 "admin": userobj.admin,
127 146 "active": userobj.active,
128 147 "active_from_extern": userobj.active,
129 148 "extern_name": userobj.user_id,
130 149 "extern_type": userobj.extern_type,
131 150 }
132 151
133 152 log.debug("User attributes:%s", user_attrs)
134 153 if userobj.active:
135 154 from rhodecode.lib import auth
136 155 crypto_backend = auth.crypto_backend()
137 156 password_encoded = safe_str(password)
138 157 password_match, new_hash = crypto_backend.hash_check_with_upgrade(
139 158 password_encoded, userobj.password or '')
140 159
141 160 if password_match and new_hash:
142 161 log.debug('user %s properly authenticated, but '
143 162 'requires hash change to bcrypt', userobj)
144 163 # if password match, and we use OLD deprecated hash,
145 164 # we should migrate this user hash password to the new hash
146 165 # we store the new returned by hash_check_with_upgrade function
147 166 user_attrs['_hash_migrate'] = new_hash
148 167
149 168 if userobj.username == User.DEFAULT_USER and userobj.active:
150 169 log.info('user `%s` authenticated correctly as anonymous user',
151 170 userobj.username)
152 171 return user_attrs
153 172
154 173 elif userobj.username == username and password_match:
155 174 log.info('user `%s` authenticated correctly', userobj.username)
156 175 return user_attrs
157 log.warn("user `%s` used a wrong password when "
176 log.warning("user `%s` used a wrong password when "
158 177 "authenticating on this plugin", userobj.username)
159 178 return None
160 179 else:
161 log.warning(
162 'user `%s` failed to authenticate via %s, reason: account not '
180 log.warning('user `%s` failed to authenticate via %s, reason: account not '
163 181 'active.', username, self.name)
164 182 return None
165 183
166 184
167 185 class RhodeCodeSettingsSchema(AuthnPluginSettingsSchemaBase):
168 login_restriction_choices = [
169 (RhodeCodeAuthPlugin.LOGIN_RESTRICTION_NONE, 'All users'),
170 (RhodeCodeAuthPlugin.LOGIN_RESTRICTION_SUPER_ADMIN, 'Super admins only')
186
187 auth_restriction_choices = [
188 (RhodeCodeAuthPlugin.AUTH_RESTRICTION_NONE, 'All users'),
189 (RhodeCodeAuthPlugin.AUTH_RESTRICTION_SUPER_ADMIN, 'Super admins only'),
190 ]
191
192 auth_scope_choices = [
193 (RhodeCodeAuthPlugin.AUTH_RESTRICTION_SCOPE_ALL, 'HTTP and VCS'),
194 (RhodeCodeAuthPlugin.AUTH_RESTRICTION_SCOPE_HTTP, 'HTTP only'),
171 195 ]
172 196
173 login_restriction = colander.SchemaNode(
197 auth_restriction = colander.SchemaNode(
174 198 colander.String(),
175 default=login_restriction_choices[0],
176 description=_('Choose login restrition for users.'),
177 title=_('Login restriction'),
178 validator=colander.OneOf([x[0] for x in login_restriction_choices]),
199 default=auth_restriction_choices[0],
200 description=_('Allowed user types for authentication using this plugin.'),
201 title=_('User restriction'),
202 validator=colander.OneOf([x[0] for x in auth_restriction_choices]),
179 203 widget='select_with_labels',
180 choices=login_restriction_choices
204 choices=auth_restriction_choices
205 )
206 scope_restriction = colander.SchemaNode(
207 colander.String(),
208 default=auth_scope_choices[0],
209 description=_('Allowed protocols for authentication using this plugin. '
210 'VCS means GIT/HG/SVN. HTTP is web based login.'),
211 title=_('Scope restriction'),
212 validator=colander.OneOf([x[0] for x in auth_scope_choices]),
213 widget='select_with_labels',
214 choices=auth_scope_choices
181 215 )
182 216
183 217
184 218 def includeme(config):
185 219 plugin_id = 'egg:rhodecode-enterprise-ce#{}'.format(RhodeCodeAuthPlugin.uid)
186 220 plugin_factory(plugin_id).includeme(config)
@@ -1,157 +1,177 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2016-2019 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 """
22 22 RhodeCode authentication token plugin for built in internal auth
23 23 """
24 24
25 25 import logging
26 import colander
26 27
28 from rhodecode.authentication.schema import AuthnPluginSettingsSchemaBase
27 29 from rhodecode.translation import _
28 30 from rhodecode.authentication.base import (
29 31 RhodeCodeAuthPluginBase, VCS_TYPE, hybrid_property)
30 32 from rhodecode.authentication.routes import AuthnPluginResourceBase
31 33 from rhodecode.model.db import User, UserApiKeys, Repository
32 34
33 35
34 36 log = logging.getLogger(__name__)
35 37
36 38
37 39 def plugin_factory(plugin_id, *args, **kwargs):
38 40 plugin = RhodeCodeAuthPlugin(plugin_id)
39 41 return plugin
40 42
41 43
42 44 class RhodecodeAuthnResource(AuthnPluginResourceBase):
43 45 pass
44 46
45 47
46 48 class RhodeCodeAuthPlugin(RhodeCodeAuthPluginBase):
47 49 """
48 50 Enables usage of authentication tokens for vcs operations.
49 51 """
50 52 uid = 'token'
53 AUTH_RESTRICTION_SCOPE_VCS = 'scope_vcs'
51 54
52 55 def includeme(self, config):
53 56 config.add_authn_plugin(self)
54 57 config.add_authn_resource(self.get_id(), RhodecodeAuthnResource(self))
55 58 config.add_view(
56 59 'rhodecode.authentication.views.AuthnPluginViewBase',
57 60 attr='settings_get',
58 61 renderer='rhodecode:templates/admin/auth/plugin_settings.mako',
59 62 request_method='GET',
60 63 route_name='auth_home',
61 64 context=RhodecodeAuthnResource)
62 65 config.add_view(
63 66 'rhodecode.authentication.views.AuthnPluginViewBase',
64 67 attr='settings_post',
65 68 renderer='rhodecode:templates/admin/auth/plugin_settings.mako',
66 69 request_method='POST',
67 70 route_name='auth_home',
68 71 context=RhodecodeAuthnResource)
69 72
73 def get_settings_schema(self):
74 return RhodeCodeSettingsSchema()
75
70 76 def get_display_name(self):
71 77 return _('Rhodecode Token')
72 78
73 79 @classmethod
74 80 def docs(cls):
75 81 return "https://docs.rhodecode.com/RhodeCode-Enterprise/auth/auth-token.html"
76 82
77 83 @hybrid_property
78 84 def name(self):
79 85 return u"authtoken"
80 86
81 87 def user_activation_state(self):
82 88 def_user_perms = User.get_default_user().AuthUser().permissions['global']
83 89 return 'hg.register.auto_activate' in def_user_perms
84 90
85 91 def allows_authentication_from(
86 92 self, user, allows_non_existing_user=True,
87 93 allowed_auth_plugins=None, allowed_auth_sources=None):
88 94 """
89 95 Custom method for this auth that doesn't accept empty users. And also
90 96 allows users from all other active plugins to use it and also
91 97 authenticate against it. But only via vcs mode
92 98 """
93 99 from rhodecode.authentication.base import get_authn_registry
94 100 authn_registry = get_authn_registry()
95 101
96 102 active_plugins = set(
97 103 [x.name for x in authn_registry.get_plugins_for_authentication()])
98 104 active_plugins.discard(self.name)
99 105
100 106 allowed_auth_plugins = [self.name] + list(active_plugins)
101 107 # only for vcs operations
102 108 allowed_auth_sources = [VCS_TYPE]
103 109
104 110 return super(RhodeCodeAuthPlugin, self).allows_authentication_from(
105 111 user, allows_non_existing_user=False,
106 112 allowed_auth_plugins=allowed_auth_plugins,
107 113 allowed_auth_sources=allowed_auth_sources)
108 114
109 115 def auth(self, userobj, username, password, settings, **kwargs):
110 116 if not userobj:
111 117 log.debug('userobj was:%s skipping', userobj)
112 118 return None
113 119
114 120 user_attrs = {
115 121 "username": userobj.username,
116 122 "firstname": userobj.firstname,
117 123 "lastname": userobj.lastname,
118 124 "groups": [],
119 125 'user_group_sync': False,
120 126 "email": userobj.email,
121 127 "admin": userobj.admin,
122 128 "active": userobj.active,
123 129 "active_from_extern": userobj.active,
124 130 "extern_name": userobj.user_id,
125 131 "extern_type": userobj.extern_type,
126 132 }
127 133
128 134 log.debug('Authenticating user with args %s', user_attrs)
129 135 if userobj.active:
130 136 # calling context repo for token scopes
131 137 scope_repo_id = None
132 138 if self.acl_repo_name:
133 139 repo = Repository.get_by_repo_name(self.acl_repo_name)
134 140 scope_repo_id = repo.repo_id if repo else None
135 141
136 142 token_match = userobj.authenticate_by_token(
137 143 password, roles=[UserApiKeys.ROLE_VCS],
138 144 scope_repo_id=scope_repo_id)
139 145
140 146 if userobj.username == username and token_match:
141 147 log.info(
142 148 'user `%s` successfully authenticated via %s',
143 149 user_attrs['username'], self.name)
144 150 return user_attrs
145 log.warn(
146 'user `%s` failed to authenticate via %s, reason: bad or '
151 log.warning('user `%s` failed to authenticate via %s, reason: bad or '
147 152 'inactive token.', username, self.name)
148 153 else:
149 log.warning(
150 'user `%s` failed to authenticate via %s, reason: account not '
154 log.warning('user `%s` failed to authenticate via %s, reason: account not '
151 155 'active.', username, self.name)
152 156 return None
153 157
154 158
155 159 def includeme(config):
156 160 plugin_id = 'egg:rhodecode-enterprise-ce#{}'.format(RhodeCodeAuthPlugin.uid)
157 161 plugin_factory(plugin_id).includeme(config)
162
163
164 class RhodeCodeSettingsSchema(AuthnPluginSettingsSchemaBase):
165 auth_scope_choices = [
166 (RhodeCodeAuthPlugin.AUTH_RESTRICTION_SCOPE_VCS, 'VCS only'),
167 ]
168
169 scope_restriction = colander.SchemaNode(
170 colander.String(),
171 default=auth_scope_choices[0],
172 description=_('Choose operation scope restriction when authenticating.'),
173 title=_('Scope restriction'),
174 validator=colander.OneOf([x[0] for x in auth_scope_choices]),
175 widget='select_with_labels',
176 choices=auth_scope_choices
177 )
@@ -1,383 +1,415 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2010-2019 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 """
22 22 Helpers for fixture generation
23 23 """
24 24
25 25 import os
26 26 import time
27 27 import tempfile
28 28 import shutil
29 29
30 30 import configobj
31 31
32 32 from rhodecode.tests import *
33 33 from rhodecode.model.db import Repository, User, RepoGroup, UserGroup, Gist, UserEmailMap
34 34 from rhodecode.model.meta import Session
35 35 from rhodecode.model.repo import RepoModel
36 36 from rhodecode.model.user import UserModel
37 37 from rhodecode.model.repo_group import RepoGroupModel
38 38 from rhodecode.model.user_group import UserGroupModel
39 39 from rhodecode.model.gist import GistModel
40 40 from rhodecode.model.auth_token import AuthTokenModel
41 41 from rhodecode.authentication.plugins.auth_rhodecode import \
42 42 RhodeCodeAuthPlugin
43 43
44 44 dn = os.path.dirname
45 45 FIXTURES = os.path.join(dn(dn(os.path.abspath(__file__))), 'tests', 'fixtures')
46 46
47 47
48 48 def error_function(*args, **kwargs):
49 49 raise Exception('Total Crash !')
50 50
51 51
52 52 class TestINI(object):
53 53 """
54 54 Allows to create a new test.ini file as a copy of existing one with edited
55 55 data. Example usage::
56 56
57 57 with TestINI('test.ini', [{'section':{'key':val'}]) as new_test_ini_path:
58 58 print('paster server %s' % new_test_ini)
59 59 """
60 60
61 61 def __init__(self, ini_file_path, ini_params, new_file_prefix='DEFAULT',
62 62 destroy=True, dir=None):
63 63 self.ini_file_path = ini_file_path
64 64 self.ini_params = ini_params
65 65 self.new_path = None
66 66 self.new_path_prefix = new_file_prefix
67 67 self._destroy = destroy
68 68 self._dir = dir
69 69
70 70 def __enter__(self):
71 71 return self.create()
72 72
73 73 def __exit__(self, exc_type, exc_val, exc_tb):
74 74 self.destroy()
75 75
76 76 def create(self):
77 77 config = configobj.ConfigObj(
78 78 self.ini_file_path, file_error=True, write_empty_values=True)
79 79
80 80 for data in self.ini_params:
81 81 section, ini_params = data.items()[0]
82 82 for key, val in ini_params.items():
83 83 config[section][key] = val
84 84 with tempfile.NamedTemporaryFile(
85 85 prefix=self.new_path_prefix, suffix='.ini', dir=self._dir,
86 86 delete=False) as new_ini_file:
87 87 config.write(new_ini_file)
88 88 self.new_path = new_ini_file.name
89 89
90 90 return self.new_path
91 91
92 92 def destroy(self):
93 93 if self._destroy:
94 94 os.remove(self.new_path)
95 95
96 96
97 97 class Fixture(object):
98 98
99 99 def anon_access(self, status):
100 100 """
101 101 Context process for disabling anonymous access. use like:
102 102 fixture = Fixture()
103 103 with fixture.anon_access(False):
104 104 #tests
105 105
106 106 after this block anon access will be set to `not status`
107 107 """
108 108
109 109 class context(object):
110 110 def __enter__(self):
111 111 anon = User.get_default_user()
112 112 anon.active = status
113 113 Session().add(anon)
114 114 Session().commit()
115 115 time.sleep(1.5) # must sleep for cache (1s to expire)
116 116
117 117 def __exit__(self, exc_type, exc_val, exc_tb):
118 118 anon = User.get_default_user()
119 119 anon.active = not status
120 120 Session().add(anon)
121 121 Session().commit()
122 122
123 123 return context()
124 124
125 def login_restriction(self, login_restriction):
125 def auth_restriction(self, auth_restriction):
126 126 """
127 Context process for changing the builtin rhodecode plugin login restrictions.
127 Context process for changing the builtin rhodecode plugin auth restrictions.
128 128 Use like:
129 129 fixture = Fixture()
130 with fixture.login_restriction('super_admin'):
130 with fixture.auth_restriction('super_admin'):
131 131 #tests
132 132
133 after this block login restriction will be taken off
133 after this block auth restriction will be taken off
134 134 """
135 135
136 136 class context(object):
137 137 def _get_pluing(self):
138 138 plugin_id = 'egg:rhodecode-enterprise-ce#{}'.format(
139 139 RhodeCodeAuthPlugin.uid)
140 140 plugin = RhodeCodeAuthPlugin(plugin_id)
141 141 return plugin
142 142
143 143 def __enter__(self):
144 144 plugin = self._get_pluing()
145 145 plugin.create_or_update_setting(
146 'login_restriction', login_restriction)
146 'auth_restriction', auth_restriction)
147 147 Session().commit()
148 148
149 149 def __exit__(self, exc_type, exc_val, exc_tb):
150 150 plugin = self._get_pluing()
151 151 plugin.create_or_update_setting(
152 'login_restriction', RhodeCodeAuthPlugin.LOGIN_RESTRICTION_NONE)
152 'auth_restriction', RhodeCodeAuthPlugin.AUTH_RESTRICTION_NONE)
153 Session().commit()
154
155 return context()
156
157 def scope_restriction(self, scope_restriction):
158 """
159 Context process for changing the builtin rhodecode plugin scope restrictions.
160 Use like:
161 fixture = Fixture()
162 with fixture.scope_restriction('scope_http'):
163 #tests
164
165 after this block scope restriction will be taken off
166 """
167
168 class context(object):
169 def _get_pluing(self):
170 plugin_id = 'egg:rhodecode-enterprise-ce#{}'.format(
171 RhodeCodeAuthPlugin.uid)
172 plugin = RhodeCodeAuthPlugin(plugin_id)
173 return plugin
174
175 def __enter__(self):
176 plugin = self._get_pluing()
177 plugin.create_or_update_setting(
178 'scope_restriction', scope_restriction)
179 Session().commit()
180
181 def __exit__(self, exc_type, exc_val, exc_tb):
182 plugin = self._get_pluing()
183 plugin.create_or_update_setting(
184 'scope_restriction', RhodeCodeAuthPlugin.AUTH_RESTRICTION_SCOPE_ALL)
153 185 Session().commit()
154 186
155 187 return context()
156 188
157 189 def _get_repo_create_params(self, **custom):
158 190 defs = {
159 191 'repo_name': None,
160 192 'repo_type': 'hg',
161 193 'clone_uri': '',
162 194 'push_uri': '',
163 195 'repo_group': '-1',
164 196 'repo_description': 'DESC',
165 197 'repo_private': False,
166 198 'repo_landing_rev': 'rev:tip',
167 199 'repo_copy_permissions': False,
168 200 'repo_state': Repository.STATE_CREATED,
169 201 }
170 202 defs.update(custom)
171 203 if 'repo_name_full' not in custom:
172 204 defs.update({'repo_name_full': defs['repo_name']})
173 205
174 206 # fix the repo name if passed as repo_name_full
175 207 if defs['repo_name']:
176 208 defs['repo_name'] = defs['repo_name'].split('/')[-1]
177 209
178 210 return defs
179 211
180 212 def _get_group_create_params(self, **custom):
181 213 defs = {
182 214 'group_name': None,
183 215 'group_description': 'DESC',
184 216 'perm_updates': [],
185 217 'perm_additions': [],
186 218 'perm_deletions': [],
187 219 'group_parent_id': -1,
188 220 'enable_locking': False,
189 221 'recursive': False,
190 222 }
191 223 defs.update(custom)
192 224
193 225 return defs
194 226
195 227 def _get_user_create_params(self, name, **custom):
196 228 defs = {
197 229 'username': name,
198 230 'password': 'qweqwe',
199 231 'email': '%s+test@rhodecode.org' % name,
200 232 'firstname': 'TestUser',
201 233 'lastname': 'Test',
202 234 'active': True,
203 235 'admin': False,
204 236 'extern_type': 'rhodecode',
205 237 'extern_name': None,
206 238 }
207 239 defs.update(custom)
208 240
209 241 return defs
210 242
211 243 def _get_user_group_create_params(self, name, **custom):
212 244 defs = {
213 245 'users_group_name': name,
214 246 'user_group_description': 'DESC',
215 247 'users_group_active': True,
216 248 'user_group_data': {},
217 249 }
218 250 defs.update(custom)
219 251
220 252 return defs
221 253
222 254 def create_repo(self, name, **kwargs):
223 255 repo_group = kwargs.get('repo_group')
224 256 if isinstance(repo_group, RepoGroup):
225 257 kwargs['repo_group'] = repo_group.group_id
226 258 name = name.split(Repository.NAME_SEP)[-1]
227 259 name = Repository.NAME_SEP.join((repo_group.group_name, name))
228 260
229 261 if 'skip_if_exists' in kwargs:
230 262 del kwargs['skip_if_exists']
231 263 r = Repository.get_by_repo_name(name)
232 264 if r:
233 265 return r
234 266
235 267 form_data = self._get_repo_create_params(repo_name=name, **kwargs)
236 268 cur_user = kwargs.get('cur_user', TEST_USER_ADMIN_LOGIN)
237 269 RepoModel().create(form_data, cur_user)
238 270 Session().commit()
239 271 repo = Repository.get_by_repo_name(name)
240 272 assert repo
241 273 return repo
242 274
243 275 def create_fork(self, repo_to_fork, fork_name, **kwargs):
244 276 repo_to_fork = Repository.get_by_repo_name(repo_to_fork)
245 277
246 278 form_data = self._get_repo_create_params(repo_name=fork_name,
247 279 fork_parent_id=repo_to_fork.repo_id,
248 280 repo_type=repo_to_fork.repo_type,
249 281 **kwargs)
250 282 #TODO: fix it !!
251 283 form_data['description'] = form_data['repo_description']
252 284 form_data['private'] = form_data['repo_private']
253 285 form_data['landing_rev'] = form_data['repo_landing_rev']
254 286
255 287 owner = kwargs.get('cur_user', TEST_USER_ADMIN_LOGIN)
256 288 RepoModel().create_fork(form_data, cur_user=owner)
257 289 Session().commit()
258 290 r = Repository.get_by_repo_name(fork_name)
259 291 assert r
260 292 return r
261 293
262 294 def destroy_repo(self, repo_name, **kwargs):
263 295 RepoModel().delete(repo_name, pull_requests='delete', **kwargs)
264 296 Session().commit()
265 297
266 298 def destroy_repo_on_filesystem(self, repo_name):
267 299 rm_path = os.path.join(RepoModel().repos_path, repo_name)
268 300 if os.path.isdir(rm_path):
269 301 shutil.rmtree(rm_path)
270 302
271 303 def create_repo_group(self, name, **kwargs):
272 304 if 'skip_if_exists' in kwargs:
273 305 del kwargs['skip_if_exists']
274 306 gr = RepoGroup.get_by_group_name(group_name=name)
275 307 if gr:
276 308 return gr
277 309 form_data = self._get_group_create_params(group_name=name, **kwargs)
278 310 owner = kwargs.get('cur_user', TEST_USER_ADMIN_LOGIN)
279 311 gr = RepoGroupModel().create(
280 312 group_name=form_data['group_name'],
281 313 group_description=form_data['group_name'],
282 314 owner=owner)
283 315 Session().commit()
284 316 gr = RepoGroup.get_by_group_name(gr.group_name)
285 317 return gr
286 318
287 319 def destroy_repo_group(self, repogroupid):
288 320 RepoGroupModel().delete(repogroupid)
289 321 Session().commit()
290 322
291 323 def create_user(self, name, **kwargs):
292 324 if 'skip_if_exists' in kwargs:
293 325 del kwargs['skip_if_exists']
294 326 user = User.get_by_username(name)
295 327 if user:
296 328 return user
297 329 form_data = self._get_user_create_params(name, **kwargs)
298 330 user = UserModel().create(form_data)
299 331
300 332 # create token for user
301 333 AuthTokenModel().create(
302 334 user=user, description=u'TEST_USER_TOKEN')
303 335
304 336 Session().commit()
305 337 user = User.get_by_username(user.username)
306 338 return user
307 339
308 340 def destroy_user(self, userid):
309 341 UserModel().delete(userid)
310 342 Session().commit()
311 343
312 344 def create_additional_user_email(self, user, email):
313 345 uem = UserEmailMap()
314 346 uem.user = user
315 347 uem.email = email
316 348 Session().add(uem)
317 349 return uem
318 350
319 351 def destroy_users(self, userid_iter):
320 352 for user_id in userid_iter:
321 353 if User.get_by_username(user_id):
322 354 UserModel().delete(user_id)
323 355 Session().commit()
324 356
325 357 def create_user_group(self, name, **kwargs):
326 358 if 'skip_if_exists' in kwargs:
327 359 del kwargs['skip_if_exists']
328 360 gr = UserGroup.get_by_group_name(group_name=name)
329 361 if gr:
330 362 return gr
331 363 # map active flag to the real attribute. For API consistency of fixtures
332 364 if 'active' in kwargs:
333 365 kwargs['users_group_active'] = kwargs['active']
334 366 del kwargs['active']
335 367 form_data = self._get_user_group_create_params(name, **kwargs)
336 368 owner = kwargs.get('cur_user', TEST_USER_ADMIN_LOGIN)
337 369 user_group = UserGroupModel().create(
338 370 name=form_data['users_group_name'],
339 371 description=form_data['user_group_description'],
340 372 owner=owner, active=form_data['users_group_active'],
341 373 group_data=form_data['user_group_data'])
342 374 Session().commit()
343 375 user_group = UserGroup.get_by_group_name(user_group.users_group_name)
344 376 return user_group
345 377
346 378 def destroy_user_group(self, usergroupid):
347 379 UserGroupModel().delete(user_group=usergroupid, force=True)
348 380 Session().commit()
349 381
350 382 def create_gist(self, **kwargs):
351 383 form_data = {
352 384 'description': 'new-gist',
353 385 'owner': TEST_USER_ADMIN_LOGIN,
354 386 'gist_type': GistModel.cls.GIST_PUBLIC,
355 387 'lifetime': -1,
356 388 'acl_level': Gist.ACL_LEVEL_PUBLIC,
357 389 'gist_mapping': {'filename1.txt': {'content': 'hello world'},}
358 390 }
359 391 form_data.update(kwargs)
360 392 gist = GistModel().create(
361 393 description=form_data['description'], owner=form_data['owner'],
362 394 gist_mapping=form_data['gist_mapping'], gist_type=form_data['gist_type'],
363 395 lifetime=form_data['lifetime'], gist_acl_level=form_data['acl_level']
364 396 )
365 397 Session().commit()
366 398 return gist
367 399
368 400 def destroy_gists(self, gistid=None):
369 401 for g in GistModel.cls.get_all():
370 402 if gistid:
371 403 if gistid == g.gist_access_id:
372 404 GistModel().delete(g)
373 405 else:
374 406 GistModel().delete(g)
375 407 Session().commit()
376 408
377 409 def load_resource(self, resource_name, strip=False):
378 410 with open(os.path.join(FIXTURES, resource_name)) as f:
379 411 source = f.read()
380 412 if strip:
381 413 source = source.strip()
382 414
383 415 return source
General Comments 0
You need to be logged in to leave comments. Login now