##// END OF EJS Templates
security: use 404 instead of 403 code on permission decorator to prevent resource discovery attacks.
ergo -
r1817:7df55c97 default
parent child Browse files
Show More
@@ -1,2023 +1,2023 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2010-2017 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 """
22 22 authentication and permission libraries
23 23 """
24 24
25 25 import os
26 26 import inspect
27 27 import collections
28 28 import fnmatch
29 29 import hashlib
30 30 import itertools
31 31 import logging
32 32 import random
33 33 import traceback
34 34 from functools import wraps
35 35
36 36 import ipaddress
37 from pyramid.httpexceptions import HTTPForbidden, HTTPFound
37 from pyramid.httpexceptions import HTTPForbidden, HTTPFound, HTTPNotFound
38 38 from pylons.i18n.translation import _
39 39 # NOTE(marcink): this has to be removed only after pyramid migration,
40 40 # replace with _ = request.translate
41 41 from sqlalchemy.orm.exc import ObjectDeletedError
42 42 from sqlalchemy.orm import joinedload
43 43 from zope.cachedescriptors.property import Lazy as LazyProperty
44 44
45 45 import rhodecode
46 46 from rhodecode.model import meta
47 47 from rhodecode.model.meta import Session
48 48 from rhodecode.model.user import UserModel
49 49 from rhodecode.model.db import (
50 50 User, Repository, Permission, UserToPerm, UserGroupToPerm, UserGroupMember,
51 51 UserIpMap, UserApiKeys, RepoGroup)
52 52 from rhodecode.lib import caches
53 53 from rhodecode.lib.utils2 import safe_unicode, aslist, safe_str, md5
54 54 from rhodecode.lib.utils import (
55 55 get_repo_slug, get_repo_group_slug, get_user_group_slug)
56 56 from rhodecode.lib.caching_query import FromCache
57 57
58 58
59 59 if rhodecode.is_unix:
60 60 import bcrypt
61 61
62 62 log = logging.getLogger(__name__)
63 63
64 64 csrf_token_key = "csrf_token"
65 65
66 66
67 67 class PasswordGenerator(object):
68 68 """
69 69 This is a simple class for generating password from different sets of
70 70 characters
71 71 usage::
72 72
73 73 passwd_gen = PasswordGenerator()
74 74 #print 8-letter password containing only big and small letters
75 75 of alphabet
76 76 passwd_gen.gen_password(8, passwd_gen.ALPHABETS_BIG_SMALL)
77 77 """
78 78 ALPHABETS_NUM = r'''1234567890'''
79 79 ALPHABETS_SMALL = r'''qwertyuiopasdfghjklzxcvbnm'''
80 80 ALPHABETS_BIG = r'''QWERTYUIOPASDFGHJKLZXCVBNM'''
81 81 ALPHABETS_SPECIAL = r'''`-=[]\;',./~!@#$%^&*()_+{}|:"<>?'''
82 82 ALPHABETS_FULL = ALPHABETS_BIG + ALPHABETS_SMALL \
83 83 + ALPHABETS_NUM + ALPHABETS_SPECIAL
84 84 ALPHABETS_ALPHANUM = ALPHABETS_BIG + ALPHABETS_SMALL + ALPHABETS_NUM
85 85 ALPHABETS_BIG_SMALL = ALPHABETS_BIG + ALPHABETS_SMALL
86 86 ALPHABETS_ALPHANUM_BIG = ALPHABETS_BIG + ALPHABETS_NUM
87 87 ALPHABETS_ALPHANUM_SMALL = ALPHABETS_SMALL + ALPHABETS_NUM
88 88
89 89 def __init__(self, passwd=''):
90 90 self.passwd = passwd
91 91
92 92 def gen_password(self, length, type_=None):
93 93 if type_ is None:
94 94 type_ = self.ALPHABETS_FULL
95 95 self.passwd = ''.join([random.choice(type_) for _ in xrange(length)])
96 96 return self.passwd
97 97
98 98
99 99 class _RhodeCodeCryptoBase(object):
100 100 ENC_PREF = None
101 101
102 102 def hash_create(self, str_):
103 103 """
104 104 hash the string using
105 105
106 106 :param str_: password to hash
107 107 """
108 108 raise NotImplementedError
109 109
110 110 def hash_check_with_upgrade(self, password, hashed):
111 111 """
112 112 Returns tuple in which first element is boolean that states that
113 113 given password matches it's hashed version, and the second is new hash
114 114 of the password, in case this password should be migrated to new
115 115 cipher.
116 116 """
117 117 checked_hash = self.hash_check(password, hashed)
118 118 return checked_hash, None
119 119
120 120 def hash_check(self, password, hashed):
121 121 """
122 122 Checks matching password with it's hashed value.
123 123
124 124 :param password: password
125 125 :param hashed: password in hashed form
126 126 """
127 127 raise NotImplementedError
128 128
129 129 def _assert_bytes(self, value):
130 130 """
131 131 Passing in an `unicode` object can lead to hard to detect issues
132 132 if passwords contain non-ascii characters. Doing a type check
133 133 during runtime, so that such mistakes are detected early on.
134 134 """
135 135 if not isinstance(value, str):
136 136 raise TypeError(
137 137 "Bytestring required as input, got %r." % (value, ))
138 138
139 139
140 140 class _RhodeCodeCryptoBCrypt(_RhodeCodeCryptoBase):
141 141 ENC_PREF = ('$2a$10', '$2b$10')
142 142
143 143 def hash_create(self, str_):
144 144 self._assert_bytes(str_)
145 145 return bcrypt.hashpw(str_, bcrypt.gensalt(10))
146 146
147 147 def hash_check_with_upgrade(self, password, hashed):
148 148 """
149 149 Returns tuple in which first element is boolean that states that
150 150 given password matches it's hashed version, and the second is new hash
151 151 of the password, in case this password should be migrated to new
152 152 cipher.
153 153
154 154 This implements special upgrade logic which works like that:
155 155 - check if the given password == bcrypted hash, if yes then we
156 156 properly used password and it was already in bcrypt. Proceed
157 157 without any changes
158 158 - if bcrypt hash check is not working try with sha256. If hash compare
159 159 is ok, it means we using correct but old hashed password. indicate
160 160 hash change and proceed
161 161 """
162 162
163 163 new_hash = None
164 164
165 165 # regular pw check
166 166 password_match_bcrypt = self.hash_check(password, hashed)
167 167
168 168 # now we want to know if the password was maybe from sha256
169 169 # basically calling _RhodeCodeCryptoSha256().hash_check()
170 170 if not password_match_bcrypt:
171 171 if _RhodeCodeCryptoSha256().hash_check(password, hashed):
172 172 new_hash = self.hash_create(password) # make new bcrypt hash
173 173 password_match_bcrypt = True
174 174
175 175 return password_match_bcrypt, new_hash
176 176
177 177 def hash_check(self, password, hashed):
178 178 """
179 179 Checks matching password with it's hashed value.
180 180
181 181 :param password: password
182 182 :param hashed: password in hashed form
183 183 """
184 184 self._assert_bytes(password)
185 185 try:
186 186 return bcrypt.hashpw(password, hashed) == hashed
187 187 except ValueError as e:
188 188 # we're having a invalid salt here probably, we should not crash
189 189 # just return with False as it would be a wrong password.
190 190 log.debug('Failed to check password hash using bcrypt %s',
191 191 safe_str(e))
192 192
193 193 return False
194 194
195 195
196 196 class _RhodeCodeCryptoSha256(_RhodeCodeCryptoBase):
197 197 ENC_PREF = '_'
198 198
199 199 def hash_create(self, str_):
200 200 self._assert_bytes(str_)
201 201 return hashlib.sha256(str_).hexdigest()
202 202
203 203 def hash_check(self, password, hashed):
204 204 """
205 205 Checks matching password with it's hashed value.
206 206
207 207 :param password: password
208 208 :param hashed: password in hashed form
209 209 """
210 210 self._assert_bytes(password)
211 211 return hashlib.sha256(password).hexdigest() == hashed
212 212
213 213
214 214 class _RhodeCodeCryptoMd5(_RhodeCodeCryptoBase):
215 215 ENC_PREF = '_'
216 216
217 217 def hash_create(self, str_):
218 218 self._assert_bytes(str_)
219 219 return hashlib.md5(str_).hexdigest()
220 220
221 221 def hash_check(self, password, hashed):
222 222 """
223 223 Checks matching password with it's hashed value.
224 224
225 225 :param password: password
226 226 :param hashed: password in hashed form
227 227 """
228 228 self._assert_bytes(password)
229 229 return hashlib.md5(password).hexdigest() == hashed
230 230
231 231
232 232 def crypto_backend():
233 233 """
234 234 Return the matching crypto backend.
235 235
236 236 Selection is based on if we run tests or not, we pick md5 backend to run
237 237 tests faster since BCRYPT is expensive to calculate
238 238 """
239 239 if rhodecode.is_test:
240 240 RhodeCodeCrypto = _RhodeCodeCryptoMd5()
241 241 else:
242 242 RhodeCodeCrypto = _RhodeCodeCryptoBCrypt()
243 243
244 244 return RhodeCodeCrypto
245 245
246 246
247 247 def get_crypt_password(password):
248 248 """
249 249 Create the hash of `password` with the active crypto backend.
250 250
251 251 :param password: The cleartext password.
252 252 :type password: unicode
253 253 """
254 254 password = safe_str(password)
255 255 return crypto_backend().hash_create(password)
256 256
257 257
258 258 def check_password(password, hashed):
259 259 """
260 260 Check if the value in `password` matches the hash in `hashed`.
261 261
262 262 :param password: The cleartext password.
263 263 :type password: unicode
264 264
265 265 :param hashed: The expected hashed version of the password.
266 266 :type hashed: The hash has to be passed in in text representation.
267 267 """
268 268 password = safe_str(password)
269 269 return crypto_backend().hash_check(password, hashed)
270 270
271 271
272 272 def generate_auth_token(data, salt=None):
273 273 """
274 274 Generates API KEY from given string
275 275 """
276 276
277 277 if salt is None:
278 278 salt = os.urandom(16)
279 279 return hashlib.sha1(safe_str(data) + salt).hexdigest()
280 280
281 281
282 282 class CookieStoreWrapper(object):
283 283
284 284 def __init__(self, cookie_store):
285 285 self.cookie_store = cookie_store
286 286
287 287 def __repr__(self):
288 288 return 'CookieStore<%s>' % (self.cookie_store)
289 289
290 290 def get(self, key, other=None):
291 291 if isinstance(self.cookie_store, dict):
292 292 return self.cookie_store.get(key, other)
293 293 elif isinstance(self.cookie_store, AuthUser):
294 294 return self.cookie_store.__dict__.get(key, other)
295 295
296 296
297 297 def _cached_perms_data(user_id, scope, user_is_admin,
298 298 user_inherit_default_permissions, explicit, algo):
299 299
300 300 permissions = PermissionCalculator(
301 301 user_id, scope, user_is_admin, user_inherit_default_permissions,
302 302 explicit, algo)
303 303 return permissions.calculate()
304 304
305 305
306 306 class PermOrigin(object):
307 307 ADMIN = 'superadmin'
308 308
309 309 REPO_USER = 'user:%s'
310 310 REPO_USERGROUP = 'usergroup:%s'
311 311 REPO_OWNER = 'repo.owner'
312 312 REPO_DEFAULT = 'repo.default'
313 313 REPO_PRIVATE = 'repo.private'
314 314
315 315 REPOGROUP_USER = 'user:%s'
316 316 REPOGROUP_USERGROUP = 'usergroup:%s'
317 317 REPOGROUP_OWNER = 'group.owner'
318 318 REPOGROUP_DEFAULT = 'group.default'
319 319
320 320 USERGROUP_USER = 'user:%s'
321 321 USERGROUP_USERGROUP = 'usergroup:%s'
322 322 USERGROUP_OWNER = 'usergroup.owner'
323 323 USERGROUP_DEFAULT = 'usergroup.default'
324 324
325 325
326 326 class PermOriginDict(dict):
327 327 """
328 328 A special dict used for tracking permissions along with their origins.
329 329
330 330 `__setitem__` has been overridden to expect a tuple(perm, origin)
331 331 `__getitem__` will return only the perm
332 332 `.perm_origin_stack` will return the stack of (perm, origin) set per key
333 333
334 334 >>> perms = PermOriginDict()
335 335 >>> perms['resource'] = 'read', 'default'
336 336 >>> perms['resource']
337 337 'read'
338 338 >>> perms['resource'] = 'write', 'admin'
339 339 >>> perms['resource']
340 340 'write'
341 341 >>> perms.perm_origin_stack
342 342 {'resource': [('read', 'default'), ('write', 'admin')]}
343 343 """
344 344
345 345 def __init__(self, *args, **kw):
346 346 dict.__init__(self, *args, **kw)
347 347 self.perm_origin_stack = {}
348 348
349 349 def __setitem__(self, key, (perm, origin)):
350 350 self.perm_origin_stack.setdefault(key, []).append((perm, origin))
351 351 dict.__setitem__(self, key, perm)
352 352
353 353
354 354 class PermissionCalculator(object):
355 355
356 356 def __init__(
357 357 self, user_id, scope, user_is_admin,
358 358 user_inherit_default_permissions, explicit, algo):
359 359 self.user_id = user_id
360 360 self.user_is_admin = user_is_admin
361 361 self.inherit_default_permissions = user_inherit_default_permissions
362 362 self.explicit = explicit
363 363 self.algo = algo
364 364
365 365 scope = scope or {}
366 366 self.scope_repo_id = scope.get('repo_id')
367 367 self.scope_repo_group_id = scope.get('repo_group_id')
368 368 self.scope_user_group_id = scope.get('user_group_id')
369 369
370 370 self.default_user_id = User.get_default_user(cache=True).user_id
371 371
372 372 self.permissions_repositories = PermOriginDict()
373 373 self.permissions_repository_groups = PermOriginDict()
374 374 self.permissions_user_groups = PermOriginDict()
375 375 self.permissions_global = set()
376 376
377 377 self.default_repo_perms = Permission.get_default_repo_perms(
378 378 self.default_user_id, self.scope_repo_id)
379 379 self.default_repo_groups_perms = Permission.get_default_group_perms(
380 380 self.default_user_id, self.scope_repo_group_id)
381 381 self.default_user_group_perms = \
382 382 Permission.get_default_user_group_perms(
383 383 self.default_user_id, self.scope_user_group_id)
384 384
385 385 def calculate(self):
386 386 if self.user_is_admin:
387 387 return self._admin_permissions()
388 388
389 389 self._calculate_global_default_permissions()
390 390 self._calculate_global_permissions()
391 391 self._calculate_default_permissions()
392 392 self._calculate_repository_permissions()
393 393 self._calculate_repository_group_permissions()
394 394 self._calculate_user_group_permissions()
395 395 return self._permission_structure()
396 396
397 397 def _admin_permissions(self):
398 398 """
399 399 admin user have all default rights for repositories
400 400 and groups set to admin
401 401 """
402 402 self.permissions_global.add('hg.admin')
403 403 self.permissions_global.add('hg.create.write_on_repogroup.true')
404 404
405 405 # repositories
406 406 for perm in self.default_repo_perms:
407 407 r_k = perm.UserRepoToPerm.repository.repo_name
408 408 p = 'repository.admin'
409 409 self.permissions_repositories[r_k] = p, PermOrigin.ADMIN
410 410
411 411 # repository groups
412 412 for perm in self.default_repo_groups_perms:
413 413 rg_k = perm.UserRepoGroupToPerm.group.group_name
414 414 p = 'group.admin'
415 415 self.permissions_repository_groups[rg_k] = p, PermOrigin.ADMIN
416 416
417 417 # user groups
418 418 for perm in self.default_user_group_perms:
419 419 u_k = perm.UserUserGroupToPerm.user_group.users_group_name
420 420 p = 'usergroup.admin'
421 421 self.permissions_user_groups[u_k] = p, PermOrigin.ADMIN
422 422
423 423 return self._permission_structure()
424 424
425 425 def _calculate_global_default_permissions(self):
426 426 """
427 427 global permissions taken from the default user
428 428 """
429 429 default_global_perms = UserToPerm.query()\
430 430 .filter(UserToPerm.user_id == self.default_user_id)\
431 431 .options(joinedload(UserToPerm.permission))
432 432
433 433 for perm in default_global_perms:
434 434 self.permissions_global.add(perm.permission.permission_name)
435 435
436 436 def _calculate_global_permissions(self):
437 437 """
438 438 Set global system permissions with user permissions or permissions
439 439 taken from the user groups of the current user.
440 440
441 441 The permissions include repo creating, repo group creating, forking
442 442 etc.
443 443 """
444 444
445 445 # now we read the defined permissions and overwrite what we have set
446 446 # before those can be configured from groups or users explicitly.
447 447
448 448 # TODO: johbo: This seems to be out of sync, find out the reason
449 449 # for the comment below and update it.
450 450
451 451 # In case we want to extend this list we should be always in sync with
452 452 # User.DEFAULT_USER_PERMISSIONS definitions
453 453 _configurable = frozenset([
454 454 'hg.fork.none', 'hg.fork.repository',
455 455 'hg.create.none', 'hg.create.repository',
456 456 'hg.usergroup.create.false', 'hg.usergroup.create.true',
457 457 'hg.repogroup.create.false', 'hg.repogroup.create.true',
458 458 'hg.create.write_on_repogroup.false',
459 459 'hg.create.write_on_repogroup.true',
460 460 'hg.inherit_default_perms.false', 'hg.inherit_default_perms.true'
461 461 ])
462 462
463 463 # USER GROUPS comes first user group global permissions
464 464 user_perms_from_users_groups = Session().query(UserGroupToPerm)\
465 465 .options(joinedload(UserGroupToPerm.permission))\
466 466 .join((UserGroupMember, UserGroupToPerm.users_group_id ==
467 467 UserGroupMember.users_group_id))\
468 468 .filter(UserGroupMember.user_id == self.user_id)\
469 469 .order_by(UserGroupToPerm.users_group_id)\
470 470 .all()
471 471
472 472 # need to group here by groups since user can be in more than
473 473 # one group, so we get all groups
474 474 _explicit_grouped_perms = [
475 475 [x, list(y)] for x, y in
476 476 itertools.groupby(user_perms_from_users_groups,
477 477 lambda _x: _x.users_group)]
478 478
479 479 for gr, perms in _explicit_grouped_perms:
480 480 # since user can be in multiple groups iterate over them and
481 481 # select the lowest permissions first (more explicit)
482 482 # TODO: marcink: do this^^
483 483
484 484 # group doesn't inherit default permissions so we actually set them
485 485 if not gr.inherit_default_permissions:
486 486 # NEED TO IGNORE all previously set configurable permissions
487 487 # and replace them with explicitly set from this user
488 488 # group permissions
489 489 self.permissions_global = self.permissions_global.difference(
490 490 _configurable)
491 491 for perm in perms:
492 492 self.permissions_global.add(perm.permission.permission_name)
493 493
494 494 # user explicit global permissions
495 495 user_perms = Session().query(UserToPerm)\
496 496 .options(joinedload(UserToPerm.permission))\
497 497 .filter(UserToPerm.user_id == self.user_id).all()
498 498
499 499 if not self.inherit_default_permissions:
500 500 # NEED TO IGNORE all configurable permissions and
501 501 # replace them with explicitly set from this user permissions
502 502 self.permissions_global = self.permissions_global.difference(
503 503 _configurable)
504 504 for perm in user_perms:
505 505 self.permissions_global.add(perm.permission.permission_name)
506 506
507 507 def _calculate_default_permissions(self):
508 508 """
509 509 Set default user permissions for repositories, repository groups
510 510 taken from the default user.
511 511
512 512 Calculate inheritance of object permissions based on what we have now
513 513 in GLOBAL permissions. We check if .false is in GLOBAL since this is
514 514 explicitly set. Inherit is the opposite of .false being there.
515 515
516 516 .. note::
517 517
518 518 the syntax is little bit odd but what we need to check here is
519 519 the opposite of .false permission being in the list so even for
520 520 inconsistent state when both .true/.false is there
521 521 .false is more important
522 522
523 523 """
524 524 user_inherit_object_permissions = not ('hg.inherit_default_perms.false'
525 525 in self.permissions_global)
526 526
527 527 # defaults for repositories, taken from `default` user permissions
528 528 # on given repo
529 529 for perm in self.default_repo_perms:
530 530 r_k = perm.UserRepoToPerm.repository.repo_name
531 531 o = PermOrigin.REPO_DEFAULT
532 532 if perm.Repository.private and not (
533 533 perm.Repository.user_id == self.user_id):
534 534 # disable defaults for private repos,
535 535 p = 'repository.none'
536 536 o = PermOrigin.REPO_PRIVATE
537 537 elif perm.Repository.user_id == self.user_id:
538 538 # set admin if owner
539 539 p = 'repository.admin'
540 540 o = PermOrigin.REPO_OWNER
541 541 else:
542 542 p = perm.Permission.permission_name
543 543 # if we decide this user isn't inheriting permissions from
544 544 # default user we set him to .none so only explicit
545 545 # permissions work
546 546 if not user_inherit_object_permissions:
547 547 p = 'repository.none'
548 548 self.permissions_repositories[r_k] = p, o
549 549
550 550 # defaults for repository groups taken from `default` user permission
551 551 # on given group
552 552 for perm in self.default_repo_groups_perms:
553 553 rg_k = perm.UserRepoGroupToPerm.group.group_name
554 554 o = PermOrigin.REPOGROUP_DEFAULT
555 555 if perm.RepoGroup.user_id == self.user_id:
556 556 # set admin if owner
557 557 p = 'group.admin'
558 558 o = PermOrigin.REPOGROUP_OWNER
559 559 else:
560 560 p = perm.Permission.permission_name
561 561
562 562 # if we decide this user isn't inheriting permissions from default
563 563 # user we set him to .none so only explicit permissions work
564 564 if not user_inherit_object_permissions:
565 565 p = 'group.none'
566 566 self.permissions_repository_groups[rg_k] = p, o
567 567
568 568 # defaults for user groups taken from `default` user permission
569 569 # on given user group
570 570 for perm in self.default_user_group_perms:
571 571 u_k = perm.UserUserGroupToPerm.user_group.users_group_name
572 572 o = PermOrigin.USERGROUP_DEFAULT
573 573 if perm.UserGroup.user_id == self.user_id:
574 574 # set admin if owner
575 575 p = 'usergroup.admin'
576 576 o = PermOrigin.USERGROUP_OWNER
577 577 else:
578 578 p = perm.Permission.permission_name
579 579
580 580 # if we decide this user isn't inheriting permissions from default
581 581 # user we set him to .none so only explicit permissions work
582 582 if not user_inherit_object_permissions:
583 583 p = 'usergroup.none'
584 584 self.permissions_user_groups[u_k] = p, o
585 585
586 586 def _calculate_repository_permissions(self):
587 587 """
588 588 Repository permissions for the current user.
589 589
590 590 Check if the user is part of user groups for this repository and
591 591 fill in the permission from it. `_choose_permission` decides of which
592 592 permission should be selected based on selected method.
593 593 """
594 594
595 595 # user group for repositories permissions
596 596 user_repo_perms_from_user_group = Permission\
597 597 .get_default_repo_perms_from_user_group(
598 598 self.user_id, self.scope_repo_id)
599 599
600 600 multiple_counter = collections.defaultdict(int)
601 601 for perm in user_repo_perms_from_user_group:
602 602 r_k = perm.UserGroupRepoToPerm.repository.repo_name
603 603 ug_k = perm.UserGroupRepoToPerm.users_group.users_group_name
604 604 multiple_counter[r_k] += 1
605 605 p = perm.Permission.permission_name
606 606 o = PermOrigin.REPO_USERGROUP % ug_k
607 607
608 608 if perm.Repository.user_id == self.user_id:
609 609 # set admin if owner
610 610 p = 'repository.admin'
611 611 o = PermOrigin.REPO_OWNER
612 612 else:
613 613 if multiple_counter[r_k] > 1:
614 614 cur_perm = self.permissions_repositories[r_k]
615 615 p = self._choose_permission(p, cur_perm)
616 616 self.permissions_repositories[r_k] = p, o
617 617
618 618 # user explicit permissions for repositories, overrides any specified
619 619 # by the group permission
620 620 user_repo_perms = Permission.get_default_repo_perms(
621 621 self.user_id, self.scope_repo_id)
622 622 for perm in user_repo_perms:
623 623 r_k = perm.UserRepoToPerm.repository.repo_name
624 624 o = PermOrigin.REPO_USER % perm.UserRepoToPerm.user.username
625 625 # set admin if owner
626 626 if perm.Repository.user_id == self.user_id:
627 627 p = 'repository.admin'
628 628 o = PermOrigin.REPO_OWNER
629 629 else:
630 630 p = perm.Permission.permission_name
631 631 if not self.explicit:
632 632 cur_perm = self.permissions_repositories.get(
633 633 r_k, 'repository.none')
634 634 p = self._choose_permission(p, cur_perm)
635 635 self.permissions_repositories[r_k] = p, o
636 636
637 637 def _calculate_repository_group_permissions(self):
638 638 """
639 639 Repository group permissions for the current user.
640 640
641 641 Check if the user is part of user groups for repository groups and
642 642 fill in the permissions from it. `_choose_permmission` decides of which
643 643 permission should be selected based on selected method.
644 644 """
645 645 # user group for repo groups permissions
646 646 user_repo_group_perms_from_user_group = Permission\
647 647 .get_default_group_perms_from_user_group(
648 648 self.user_id, self.scope_repo_group_id)
649 649
650 650 multiple_counter = collections.defaultdict(int)
651 651 for perm in user_repo_group_perms_from_user_group:
652 652 g_k = perm.UserGroupRepoGroupToPerm.group.group_name
653 653 ug_k = perm.UserGroupRepoGroupToPerm.users_group.users_group_name
654 654 o = PermOrigin.REPOGROUP_USERGROUP % ug_k
655 655 multiple_counter[g_k] += 1
656 656 p = perm.Permission.permission_name
657 657 if perm.RepoGroup.user_id == self.user_id:
658 658 # set admin if owner, even for member of other user group
659 659 p = 'group.admin'
660 660 o = PermOrigin.REPOGROUP_OWNER
661 661 else:
662 662 if multiple_counter[g_k] > 1:
663 663 cur_perm = self.permissions_repository_groups[g_k]
664 664 p = self._choose_permission(p, cur_perm)
665 665 self.permissions_repository_groups[g_k] = p, o
666 666
667 667 # user explicit permissions for repository groups
668 668 user_repo_groups_perms = Permission.get_default_group_perms(
669 669 self.user_id, self.scope_repo_group_id)
670 670 for perm in user_repo_groups_perms:
671 671 rg_k = perm.UserRepoGroupToPerm.group.group_name
672 672 u_k = perm.UserRepoGroupToPerm.user.username
673 673 o = PermOrigin.REPOGROUP_USER % u_k
674 674
675 675 if perm.RepoGroup.user_id == self.user_id:
676 676 # set admin if owner
677 677 p = 'group.admin'
678 678 o = PermOrigin.REPOGROUP_OWNER
679 679 else:
680 680 p = perm.Permission.permission_name
681 681 if not self.explicit:
682 682 cur_perm = self.permissions_repository_groups.get(
683 683 rg_k, 'group.none')
684 684 p = self._choose_permission(p, cur_perm)
685 685 self.permissions_repository_groups[rg_k] = p, o
686 686
687 687 def _calculate_user_group_permissions(self):
688 688 """
689 689 User group permissions for the current user.
690 690 """
691 691 # user group for user group permissions
692 692 user_group_from_user_group = Permission\
693 693 .get_default_user_group_perms_from_user_group(
694 694 self.user_id, self.scope_user_group_id)
695 695
696 696 multiple_counter = collections.defaultdict(int)
697 697 for perm in user_group_from_user_group:
698 698 g_k = perm.UserGroupUserGroupToPerm\
699 699 .target_user_group.users_group_name
700 700 u_k = perm.UserGroupUserGroupToPerm\
701 701 .user_group.users_group_name
702 702 o = PermOrigin.USERGROUP_USERGROUP % u_k
703 703 multiple_counter[g_k] += 1
704 704 p = perm.Permission.permission_name
705 705
706 706 if perm.UserGroup.user_id == self.user_id:
707 707 # set admin if owner, even for member of other user group
708 708 p = 'usergroup.admin'
709 709 o = PermOrigin.USERGROUP_OWNER
710 710 else:
711 711 if multiple_counter[g_k] > 1:
712 712 cur_perm = self.permissions_user_groups[g_k]
713 713 p = self._choose_permission(p, cur_perm)
714 714 self.permissions_user_groups[g_k] = p, o
715 715
716 716 # user explicit permission for user groups
717 717 user_user_groups_perms = Permission.get_default_user_group_perms(
718 718 self.user_id, self.scope_user_group_id)
719 719 for perm in user_user_groups_perms:
720 720 ug_k = perm.UserUserGroupToPerm.user_group.users_group_name
721 721 u_k = perm.UserUserGroupToPerm.user.username
722 722 o = PermOrigin.USERGROUP_USER % u_k
723 723
724 724 if perm.UserGroup.user_id == self.user_id:
725 725 # set admin if owner
726 726 p = 'usergroup.admin'
727 727 o = PermOrigin.USERGROUP_OWNER
728 728 else:
729 729 p = perm.Permission.permission_name
730 730 if not self.explicit:
731 731 cur_perm = self.permissions_user_groups.get(
732 732 ug_k, 'usergroup.none')
733 733 p = self._choose_permission(p, cur_perm)
734 734 self.permissions_user_groups[ug_k] = p, o
735 735
736 736 def _choose_permission(self, new_perm, cur_perm):
737 737 new_perm_val = Permission.PERM_WEIGHTS[new_perm]
738 738 cur_perm_val = Permission.PERM_WEIGHTS[cur_perm]
739 739 if self.algo == 'higherwin':
740 740 if new_perm_val > cur_perm_val:
741 741 return new_perm
742 742 return cur_perm
743 743 elif self.algo == 'lowerwin':
744 744 if new_perm_val < cur_perm_val:
745 745 return new_perm
746 746 return cur_perm
747 747
748 748 def _permission_structure(self):
749 749 return {
750 750 'global': self.permissions_global,
751 751 'repositories': self.permissions_repositories,
752 752 'repositories_groups': self.permissions_repository_groups,
753 753 'user_groups': self.permissions_user_groups,
754 754 }
755 755
756 756
757 757 def allowed_auth_token_access(controller_name, whitelist=None, auth_token=None):
758 758 """
759 759 Check if given controller_name is in whitelist of auth token access
760 760 """
761 761 if not whitelist:
762 762 from rhodecode import CONFIG
763 763 whitelist = aslist(
764 764 CONFIG.get('api_access_controllers_whitelist'), sep=',')
765 765 log.debug(
766 766 'Allowed controllers for AUTH TOKEN access: %s' % (whitelist,))
767 767
768 768 auth_token_access_valid = False
769 769 for entry in whitelist:
770 770 if fnmatch.fnmatch(controller_name, entry):
771 771 auth_token_access_valid = True
772 772 break
773 773
774 774 if auth_token_access_valid:
775 775 log.debug('controller:%s matches entry in whitelist'
776 776 % (controller_name,))
777 777 else:
778 778 msg = ('controller: %s does *NOT* match any entry in whitelist'
779 779 % (controller_name,))
780 780 if auth_token:
781 781 # if we use auth token key and don't have access it's a warning
782 782 log.warning(msg)
783 783 else:
784 784 log.debug(msg)
785 785
786 786 return auth_token_access_valid
787 787
788 788
789 789 class AuthUser(object):
790 790 """
791 791 A simple object that handles all attributes of user in RhodeCode
792 792
793 793 It does lookup based on API key,given user, or user present in session
794 794 Then it fills all required information for such user. It also checks if
795 795 anonymous access is enabled and if so, it returns default user as logged in
796 796 """
797 797 GLOBAL_PERMS = [x[0] for x in Permission.PERMS]
798 798
799 799 def __init__(self, user_id=None, api_key=None, username=None, ip_addr=None):
800 800
801 801 self.user_id = user_id
802 802 self._api_key = api_key
803 803
804 804 self.api_key = None
805 805 self.feed_token = ''
806 806 self.username = username
807 807 self.ip_addr = ip_addr
808 808 self.name = ''
809 809 self.lastname = ''
810 810 self.first_name = ''
811 811 self.last_name = ''
812 812 self.email = ''
813 813 self.is_authenticated = False
814 814 self.admin = False
815 815 self.inherit_default_permissions = False
816 816 self.password = ''
817 817
818 818 self.anonymous_user = None # propagated on propagate_data
819 819 self.propagate_data()
820 820 self._instance = None
821 821 self._permissions_scoped_cache = {} # used to bind scoped calculation
822 822
823 823 @LazyProperty
824 824 def permissions(self):
825 825 return self.get_perms(user=self, cache=False)
826 826
827 827 def permissions_with_scope(self, scope):
828 828 """
829 829 Call the get_perms function with scoped data. The scope in that function
830 830 narrows the SQL calls to the given ID of objects resulting in fetching
831 831 Just particular permission we want to obtain. If scope is an empty dict
832 832 then it basically narrows the scope to GLOBAL permissions only.
833 833
834 834 :param scope: dict
835 835 """
836 836 if 'repo_name' in scope:
837 837 obj = Repository.get_by_repo_name(scope['repo_name'])
838 838 if obj:
839 839 scope['repo_id'] = obj.repo_id
840 840 _scope = {
841 841 'repo_id': -1,
842 842 'user_group_id': -1,
843 843 'repo_group_id': -1,
844 844 }
845 845 _scope.update(scope)
846 846 cache_key = "_".join(map(safe_str, reduce(lambda a, b: a+b,
847 847 _scope.items())))
848 848 if cache_key not in self._permissions_scoped_cache:
849 849 # store in cache to mimic how the @LazyProperty works,
850 850 # the difference here is that we use the unique key calculated
851 851 # from params and values
852 852 res = self.get_perms(user=self, cache=False, scope=_scope)
853 853 self._permissions_scoped_cache[cache_key] = res
854 854 return self._permissions_scoped_cache[cache_key]
855 855
856 856 def get_instance(self):
857 857 return User.get(self.user_id)
858 858
859 859 def update_lastactivity(self):
860 860 if self.user_id:
861 861 User.get(self.user_id).update_lastactivity()
862 862
863 863 def propagate_data(self):
864 864 """
865 865 Fills in user data and propagates values to this instance. Maps fetched
866 866 user attributes to this class instance attributes
867 867 """
868 868 log.debug('starting data propagation for new potential AuthUser')
869 869 user_model = UserModel()
870 870 anon_user = self.anonymous_user = User.get_default_user(cache=True)
871 871 is_user_loaded = False
872 872
873 873 # lookup by userid
874 874 if self.user_id is not None and self.user_id != anon_user.user_id:
875 875 log.debug('Trying Auth User lookup by USER ID: `%s`' % self.user_id)
876 876 is_user_loaded = user_model.fill_data(self, user_id=self.user_id)
877 877
878 878 # try go get user by api key
879 879 elif self._api_key and self._api_key != anon_user.api_key:
880 880 log.debug('Trying Auth User lookup by API KEY: `%s`' % self._api_key)
881 881 is_user_loaded = user_model.fill_data(self, api_key=self._api_key)
882 882
883 883 # lookup by username
884 884 elif self.username:
885 885 log.debug('Trying Auth User lookup by USER NAME: `%s`' % self.username)
886 886 is_user_loaded = user_model.fill_data(self, username=self.username)
887 887 else:
888 888 log.debug('No data in %s that could been used to log in' % self)
889 889
890 890 if not is_user_loaded:
891 891 log.debug('Failed to load user. Fallback to default user')
892 892 # if we cannot authenticate user try anonymous
893 893 if anon_user.active:
894 894 user_model.fill_data(self, user_id=anon_user.user_id)
895 895 # then we set this user is logged in
896 896 self.is_authenticated = True
897 897 else:
898 898 # in case of disabled anonymous user we reset some of the
899 899 # parameters so such user is "corrupted", skipping the fill_data
900 900 for attr in ['user_id', 'username', 'admin', 'active']:
901 901 setattr(self, attr, None)
902 902 self.is_authenticated = False
903 903
904 904 if not self.username:
905 905 self.username = 'None'
906 906
907 907 log.debug('Auth User is now %s' % self)
908 908
909 909 def get_perms(self, user, scope=None, explicit=True, algo='higherwin',
910 910 cache=False):
911 911 """
912 912 Fills user permission attribute with permissions taken from database
913 913 works for permissions given for repositories, and for permissions that
914 914 are granted to groups
915 915
916 916 :param user: instance of User object from database
917 917 :param explicit: In case there are permissions both for user and a group
918 918 that user is part of, explicit flag will defiine if user will
919 919 explicitly override permissions from group, if it's False it will
920 920 make decision based on the algo
921 921 :param algo: algorithm to decide what permission should be choose if
922 922 it's multiple defined, eg user in two different groups. It also
923 923 decides if explicit flag is turned off how to specify the permission
924 924 for case when user is in a group + have defined separate permission
925 925 """
926 926 user_id = user.user_id
927 927 user_is_admin = user.is_admin
928 928
929 929 # inheritance of global permissions like create repo/fork repo etc
930 930 user_inherit_default_permissions = user.inherit_default_permissions
931 931
932 932 log.debug('Computing PERMISSION tree for scope %s' % (scope, ))
933 933 compute = caches.conditional_cache(
934 934 'short_term', 'cache_desc',
935 935 condition=cache, func=_cached_perms_data)
936 936 result = compute(user_id, scope, user_is_admin,
937 937 user_inherit_default_permissions, explicit, algo)
938 938
939 939 result_repr = []
940 940 for k in result:
941 941 result_repr.append((k, len(result[k])))
942 942
943 943 log.debug('PERMISSION tree computed %s' % (result_repr,))
944 944 return result
945 945
946 946 @property
947 947 def is_default(self):
948 948 return self.username == User.DEFAULT_USER
949 949
950 950 @property
951 951 def is_admin(self):
952 952 return self.admin
953 953
954 954 @property
955 955 def is_user_object(self):
956 956 return self.user_id is not None
957 957
958 958 @property
959 959 def repositories_admin(self):
960 960 """
961 961 Returns list of repositories you're an admin of
962 962 """
963 963 return [
964 964 x[0] for x in self.permissions['repositories'].iteritems()
965 965 if x[1] == 'repository.admin']
966 966
967 967 @property
968 968 def repository_groups_admin(self):
969 969 """
970 970 Returns list of repository groups you're an admin of
971 971 """
972 972 return [
973 973 x[0] for x in self.permissions['repositories_groups'].iteritems()
974 974 if x[1] == 'group.admin']
975 975
976 976 @property
977 977 def user_groups_admin(self):
978 978 """
979 979 Returns list of user groups you're an admin of
980 980 """
981 981 return [
982 982 x[0] for x in self.permissions['user_groups'].iteritems()
983 983 if x[1] == 'usergroup.admin']
984 984
985 985 @property
986 986 def ip_allowed(self):
987 987 """
988 988 Checks if ip_addr used in constructor is allowed from defined list of
989 989 allowed ip_addresses for user
990 990
991 991 :returns: boolean, True if ip is in allowed ip range
992 992 """
993 993 # check IP
994 994 inherit = self.inherit_default_permissions
995 995 return AuthUser.check_ip_allowed(self.user_id, self.ip_addr,
996 996 inherit_from_default=inherit)
997 997 @property
998 998 def personal_repo_group(self):
999 999 return RepoGroup.get_user_personal_repo_group(self.user_id)
1000 1000
1001 1001 @classmethod
1002 1002 def check_ip_allowed(cls, user_id, ip_addr, inherit_from_default):
1003 1003 allowed_ips = AuthUser.get_allowed_ips(
1004 1004 user_id, cache=True, inherit_from_default=inherit_from_default)
1005 1005 if check_ip_access(source_ip=ip_addr, allowed_ips=allowed_ips):
1006 1006 log.debug('IP:%s is in range of %s' % (ip_addr, allowed_ips))
1007 1007 return True
1008 1008 else:
1009 1009 log.info('Access for IP:%s forbidden, '
1010 1010 'not in %s' % (ip_addr, allowed_ips))
1011 1011 return False
1012 1012
1013 1013 def __repr__(self):
1014 1014 return "<AuthUser('id:%s[%s] ip:%s auth:%s')>"\
1015 1015 % (self.user_id, self.username, self.ip_addr, self.is_authenticated)
1016 1016
1017 1017 def set_authenticated(self, authenticated=True):
1018 1018 if self.user_id != self.anonymous_user.user_id:
1019 1019 self.is_authenticated = authenticated
1020 1020
1021 1021 def get_cookie_store(self):
1022 1022 return {
1023 1023 'username': self.username,
1024 1024 'password': md5(self.password),
1025 1025 'user_id': self.user_id,
1026 1026 'is_authenticated': self.is_authenticated
1027 1027 }
1028 1028
1029 1029 @classmethod
1030 1030 def from_cookie_store(cls, cookie_store):
1031 1031 """
1032 1032 Creates AuthUser from a cookie store
1033 1033
1034 1034 :param cls:
1035 1035 :param cookie_store:
1036 1036 """
1037 1037 user_id = cookie_store.get('user_id')
1038 1038 username = cookie_store.get('username')
1039 1039 api_key = cookie_store.get('api_key')
1040 1040 return AuthUser(user_id, api_key, username)
1041 1041
1042 1042 @classmethod
1043 1043 def get_allowed_ips(cls, user_id, cache=False, inherit_from_default=False):
1044 1044 _set = set()
1045 1045
1046 1046 if inherit_from_default:
1047 1047 default_ips = UserIpMap.query().filter(
1048 1048 UserIpMap.user == User.get_default_user(cache=True))
1049 1049 if cache:
1050 1050 default_ips = default_ips.options(
1051 1051 FromCache("sql_cache_short", "get_user_ips_default"))
1052 1052
1053 1053 # populate from default user
1054 1054 for ip in default_ips:
1055 1055 try:
1056 1056 _set.add(ip.ip_addr)
1057 1057 except ObjectDeletedError:
1058 1058 # since we use heavy caching sometimes it happens that
1059 1059 # we get deleted objects here, we just skip them
1060 1060 pass
1061 1061
1062 1062 user_ips = UserIpMap.query().filter(UserIpMap.user_id == user_id)
1063 1063 if cache:
1064 1064 user_ips = user_ips.options(
1065 1065 FromCache("sql_cache_short", "get_user_ips_%s" % user_id))
1066 1066
1067 1067 for ip in user_ips:
1068 1068 try:
1069 1069 _set.add(ip.ip_addr)
1070 1070 except ObjectDeletedError:
1071 1071 # since we use heavy caching sometimes it happens that we get
1072 1072 # deleted objects here, we just skip them
1073 1073 pass
1074 1074 return _set or set(['0.0.0.0/0', '::/0'])
1075 1075
1076 1076
1077 1077 def set_available_permissions(config):
1078 1078 """
1079 1079 This function will propagate pylons globals with all available defined
1080 1080 permission given in db. We don't want to check each time from db for new
1081 1081 permissions since adding a new permission also requires application restart
1082 1082 ie. to decorate new views with the newly created permission
1083 1083
1084 1084 :param config: current pylons config instance
1085 1085
1086 1086 """
1087 1087 log.info('getting information about all available permissions')
1088 1088 try:
1089 1089 sa = meta.Session
1090 1090 all_perms = sa.query(Permission).all()
1091 1091 config['available_permissions'] = [x.permission_name for x in all_perms]
1092 1092 except Exception:
1093 1093 log.error(traceback.format_exc())
1094 1094 finally:
1095 1095 meta.Session.remove()
1096 1096
1097 1097
1098 1098 def get_csrf_token(session=None, force_new=False, save_if_missing=True):
1099 1099 """
1100 1100 Return the current authentication token, creating one if one doesn't
1101 1101 already exist and the save_if_missing flag is present.
1102 1102
1103 1103 :param session: pass in the pylons session, else we use the global ones
1104 1104 :param force_new: force to re-generate the token and store it in session
1105 1105 :param save_if_missing: save the newly generated token if it's missing in
1106 1106 session
1107 1107 """
1108 1108 if not session:
1109 1109 from pylons import session
1110 1110
1111 1111 if (csrf_token_key not in session and save_if_missing) or force_new:
1112 1112 token = hashlib.sha1(str(random.getrandbits(128))).hexdigest()
1113 1113 session[csrf_token_key] = token
1114 1114 if hasattr(session, 'save'):
1115 1115 session.save()
1116 1116 return session.get(csrf_token_key)
1117 1117
1118 1118
1119 1119 def get_request(perm_class):
1120 1120 from pyramid.threadlocal import get_current_request
1121 1121 pyramid_request = get_current_request()
1122 1122 if not pyramid_request:
1123 1123 # return global request of pylons in case pyramid isn't available
1124 1124 # NOTE(marcink): this should be removed after migration to pyramid
1125 1125 from pylons import request
1126 1126 return request
1127 1127 return pyramid_request
1128 1128
1129 1129
1130 1130 # CHECK DECORATORS
1131 1131 class CSRFRequired(object):
1132 1132 """
1133 1133 Decorator for authenticating a form
1134 1134
1135 1135 This decorator uses an authorization token stored in the client's
1136 1136 session for prevention of certain Cross-site request forgery (CSRF)
1137 1137 attacks (See
1138 1138 http://en.wikipedia.org/wiki/Cross-site_request_forgery for more
1139 1139 information).
1140 1140
1141 1141 For use with the ``webhelpers.secure_form`` helper functions.
1142 1142
1143 1143 """
1144 1144 def __init__(self, token=csrf_token_key, header='X-CSRF-Token',
1145 1145 except_methods=None):
1146 1146 self.token = token
1147 1147 self.header = header
1148 1148 self.except_methods = except_methods or []
1149 1149
1150 1150 def __call__(self, func):
1151 1151 return get_cython_compat_decorator(self.__wrapper, func)
1152 1152
1153 1153 def _get_csrf(self, _request):
1154 1154 return _request.POST.get(self.token, _request.headers.get(self.header))
1155 1155
1156 1156 def check_csrf(self, _request, cur_token):
1157 1157 supplied_token = self._get_csrf(_request)
1158 1158 return supplied_token and supplied_token == cur_token
1159 1159
1160 1160 def _get_request(self):
1161 1161 return get_request(self)
1162 1162
1163 1163 def __wrapper(self, func, *fargs, **fkwargs):
1164 1164 request = self._get_request()
1165 1165
1166 1166 if request.method in self.except_methods:
1167 1167 return func(*fargs, **fkwargs)
1168 1168
1169 1169 cur_token = get_csrf_token(save_if_missing=False)
1170 1170 if self.check_csrf(request, cur_token):
1171 1171 if request.POST.get(self.token):
1172 1172 del request.POST[self.token]
1173 1173 return func(*fargs, **fkwargs)
1174 1174 else:
1175 1175 reason = 'token-missing'
1176 1176 supplied_token = self._get_csrf(request)
1177 1177 if supplied_token and cur_token != supplied_token:
1178 1178 reason = 'token-mismatch [%s:%s]' % (
1179 1179 cur_token or ''[:6], supplied_token or ''[:6])
1180 1180
1181 1181 csrf_message = \
1182 1182 ("Cross-site request forgery detected, request denied. See "
1183 1183 "http://en.wikipedia.org/wiki/Cross-site_request_forgery for "
1184 1184 "more information.")
1185 1185 log.warn('Cross-site request forgery detected, request %r DENIED: %s '
1186 1186 'REMOTE_ADDR:%s, HEADERS:%s' % (
1187 1187 request, reason, request.remote_addr, request.headers))
1188 1188
1189 1189 raise HTTPForbidden(explanation=csrf_message)
1190 1190
1191 1191
1192 1192 class LoginRequired(object):
1193 1193 """
1194 1194 Must be logged in to execute this function else
1195 1195 redirect to login page
1196 1196
1197 1197 :param api_access: if enabled this checks only for valid auth token
1198 1198 and grants access based on valid token
1199 1199 """
1200 1200 def __init__(self, auth_token_access=None):
1201 1201 self.auth_token_access = auth_token_access
1202 1202
1203 1203 def __call__(self, func):
1204 1204 return get_cython_compat_decorator(self.__wrapper, func)
1205 1205
1206 1206 def _get_request(self):
1207 1207 return get_request(self)
1208 1208
1209 1209 def __wrapper(self, func, *fargs, **fkwargs):
1210 1210 from rhodecode.lib import helpers as h
1211 1211 cls = fargs[0]
1212 1212 user = cls._rhodecode_user
1213 1213 request = self._get_request()
1214 1214
1215 1215 loc = "%s:%s" % (cls.__class__.__name__, func.__name__)
1216 1216 log.debug('Starting login restriction checks for user: %s' % (user,))
1217 1217 # check if our IP is allowed
1218 1218 ip_access_valid = True
1219 1219 if not user.ip_allowed:
1220 1220 h.flash(h.literal(_('IP %s not allowed' % (user.ip_addr,))),
1221 1221 category='warning')
1222 1222 ip_access_valid = False
1223 1223
1224 1224 # check if we used an APIKEY and it's a valid one
1225 1225 # defined white-list of controllers which API access will be enabled
1226 1226 _auth_token = request.GET.get(
1227 1227 'auth_token', '') or request.GET.get('api_key', '')
1228 1228 auth_token_access_valid = allowed_auth_token_access(
1229 1229 loc, auth_token=_auth_token)
1230 1230
1231 1231 # explicit controller is enabled or API is in our whitelist
1232 1232 if self.auth_token_access or auth_token_access_valid:
1233 1233 log.debug('Checking AUTH TOKEN access for %s' % (cls,))
1234 1234 db_user = user.get_instance()
1235 1235
1236 1236 if db_user:
1237 1237 if self.auth_token_access:
1238 1238 roles = self.auth_token_access
1239 1239 else:
1240 1240 roles = [UserApiKeys.ROLE_HTTP]
1241 1241 token_match = db_user.authenticate_by_token(
1242 1242 _auth_token, roles=roles)
1243 1243 else:
1244 1244 log.debug('Unable to fetch db instance for auth user: %s', user)
1245 1245 token_match = False
1246 1246
1247 1247 if _auth_token and token_match:
1248 1248 auth_token_access_valid = True
1249 1249 log.debug('AUTH TOKEN ****%s is VALID' % (_auth_token[-4:],))
1250 1250 else:
1251 1251 auth_token_access_valid = False
1252 1252 if not _auth_token:
1253 1253 log.debug("AUTH TOKEN *NOT* present in request")
1254 1254 else:
1255 1255 log.warning(
1256 1256 "AUTH TOKEN ****%s *NOT* valid" % _auth_token[-4:])
1257 1257
1258 1258 log.debug('Checking if %s is authenticated @ %s' % (user.username, loc))
1259 1259 reason = 'RHODECODE_AUTH' if user.is_authenticated \
1260 1260 else 'AUTH_TOKEN_AUTH'
1261 1261
1262 1262 if ip_access_valid and (
1263 1263 user.is_authenticated or auth_token_access_valid):
1264 1264 log.info(
1265 1265 'user %s authenticating with:%s IS authenticated on func %s'
1266 1266 % (user, reason, loc))
1267 1267
1268 1268 # update user data to check last activity
1269 1269 user.update_lastactivity()
1270 1270 Session().commit()
1271 1271 return func(*fargs, **fkwargs)
1272 1272 else:
1273 1273 log.warning(
1274 1274 'user %s authenticating with:%s NOT authenticated on '
1275 1275 'func: %s: IP_ACCESS:%s AUTH_TOKEN_ACCESS:%s'
1276 1276 % (user, reason, loc, ip_access_valid,
1277 1277 auth_token_access_valid))
1278 1278 # we preserve the get PARAM
1279 1279 came_from = request.path_qs
1280 1280 log.debug('redirecting to login page with %s' % (came_from,))
1281 1281 raise HTTPFound(
1282 1282 h.route_path('login', _query={'came_from': came_from}))
1283 1283
1284 1284
1285 1285 class NotAnonymous(object):
1286 1286 """
1287 1287 Must be logged in to execute this function else
1288 1288 redirect to login page
1289 1289 """
1290 1290
1291 1291 def __call__(self, func):
1292 1292 return get_cython_compat_decorator(self.__wrapper, func)
1293 1293
1294 1294 def _get_request(self):
1295 1295 return get_request(self)
1296 1296
1297 1297 def __wrapper(self, func, *fargs, **fkwargs):
1298 1298 import rhodecode.lib.helpers as h
1299 1299 cls = fargs[0]
1300 1300 self.user = cls._rhodecode_user
1301 1301 request = self._get_request()
1302 1302
1303 1303 log.debug('Checking if user is not anonymous @%s' % cls)
1304 1304
1305 1305 anonymous = self.user.username == User.DEFAULT_USER
1306 1306
1307 1307 if anonymous:
1308 1308 came_from = request.path_qs
1309 1309 h.flash(_('You need to be a registered user to '
1310 1310 'perform this action'),
1311 1311 category='warning')
1312 1312 raise HTTPFound(
1313 1313 h.route_path('login', _query={'came_from': came_from}))
1314 1314 else:
1315 1315 return func(*fargs, **fkwargs)
1316 1316
1317 1317
1318 1318 class XHRRequired(object):
1319 1319 # TODO(marcink): remove this in favor of the predicates in pyramid routes
1320 1320
1321 1321 def __call__(self, func):
1322 1322 return get_cython_compat_decorator(self.__wrapper, func)
1323 1323
1324 1324 def _get_request(self):
1325 1325 return get_request(self)
1326 1326
1327 1327 def __wrapper(self, func, *fargs, **fkwargs):
1328 1328 from pylons.controllers.util import abort
1329 1329 request = self._get_request()
1330 1330
1331 1331 log.debug('Checking if request is XMLHttpRequest (XHR)')
1332 1332 xhr_message = 'This is not a valid XMLHttpRequest (XHR) request'
1333 1333
1334 1334 if not request.is_xhr:
1335 1335 abort(400, detail=xhr_message)
1336 1336
1337 1337 return func(*fargs, **fkwargs)
1338 1338
1339 1339
1340 1340 class HasAcceptedRepoType(object):
1341 1341 """
1342 1342 Check if requested repo is within given repo type aliases
1343 1343 """
1344 1344
1345 1345 # TODO(marcink): remove this in favor of the predicates in pyramid routes
1346 1346
1347 1347 def __init__(self, *repo_type_list):
1348 1348 self.repo_type_list = set(repo_type_list)
1349 1349
1350 1350 def __call__(self, func):
1351 1351 return get_cython_compat_decorator(self.__wrapper, func)
1352 1352
1353 1353 def __wrapper(self, func, *fargs, **fkwargs):
1354 1354 import rhodecode.lib.helpers as h
1355 1355 cls = fargs[0]
1356 1356 rhodecode_repo = cls.rhodecode_repo
1357 1357
1358 1358 log.debug('%s checking repo type for %s in %s',
1359 1359 self.__class__.__name__,
1360 1360 rhodecode_repo.alias, self.repo_type_list)
1361 1361
1362 1362 if rhodecode_repo.alias in self.repo_type_list:
1363 1363 return func(*fargs, **fkwargs)
1364 1364 else:
1365 1365 h.flash(h.literal(
1366 1366 _('Action not supported for %s.' % rhodecode_repo.alias)),
1367 1367 category='warning')
1368 1368 raise HTTPFound(
1369 1369 h.route_path('repo_summary',
1370 1370 repo_name=cls.rhodecode_db_repo.repo_name))
1371 1371
1372 1372
1373 1373 class PermsDecorator(object):
1374 1374 """
1375 1375 Base class for controller decorators, we extract the current user from
1376 1376 the class itself, which has it stored in base controllers
1377 1377 """
1378 1378
1379 1379 def __init__(self, *required_perms):
1380 1380 self.required_perms = set(required_perms)
1381 1381
1382 1382 def __call__(self, func):
1383 1383 return get_cython_compat_decorator(self.__wrapper, func)
1384 1384
1385 1385 def _get_request(self):
1386 1386 return get_request(self)
1387 1387
1388 1388 def _get_came_from(self):
1389 1389 _request = self._get_request()
1390 1390
1391 1391 # both pylons/pyramid has this attribute
1392 1392 return _request.path_qs
1393 1393
1394 1394 def __wrapper(self, func, *fargs, **fkwargs):
1395 1395 import rhodecode.lib.helpers as h
1396 1396 cls = fargs[0]
1397 1397 _user = cls._rhodecode_user
1398 1398
1399 1399 log.debug('checking %s permissions %s for %s %s',
1400 1400 self.__class__.__name__, self.required_perms, cls, _user)
1401 1401
1402 1402 if self.check_permissions(_user):
1403 1403 log.debug('Permission granted for %s %s', cls, _user)
1404 1404 return func(*fargs, **fkwargs)
1405 1405
1406 1406 else:
1407 1407 log.debug('Permission denied for %s %s', cls, _user)
1408 1408 anonymous = _user.username == User.DEFAULT_USER
1409 1409
1410 1410 if anonymous:
1411 1411 came_from = self._get_came_from()
1412 1412 h.flash(_('You need to be signed in to view this page'),
1413 1413 category='warning')
1414 1414 raise HTTPFound(
1415 1415 h.route_path('login', _query={'came_from': came_from}))
1416 1416
1417 1417 else:
1418 # redirect with forbidden ret code
1419 raise HTTPForbidden()
1418 # redirect with 404 to prevent resource discovery
1419 raise HTTPNotFound()
1420 1420
1421 1421 def check_permissions(self, user):
1422 1422 """Dummy function for overriding"""
1423 1423 raise NotImplementedError(
1424 1424 'You have to write this function in child class')
1425 1425
1426 1426
1427 1427 class HasPermissionAllDecorator(PermsDecorator):
1428 1428 """
1429 1429 Checks for access permission for all given predicates. All of them
1430 1430 have to be meet in order to fulfill the request
1431 1431 """
1432 1432
1433 1433 def check_permissions(self, user):
1434 1434 perms = user.permissions_with_scope({})
1435 1435 if self.required_perms.issubset(perms['global']):
1436 1436 return True
1437 1437 return False
1438 1438
1439 1439
1440 1440 class HasPermissionAnyDecorator(PermsDecorator):
1441 1441 """
1442 1442 Checks for access permission for any of given predicates. In order to
1443 1443 fulfill the request any of predicates must be meet
1444 1444 """
1445 1445
1446 1446 def check_permissions(self, user):
1447 1447 perms = user.permissions_with_scope({})
1448 1448 if self.required_perms.intersection(perms['global']):
1449 1449 return True
1450 1450 return False
1451 1451
1452 1452
1453 1453 class HasRepoPermissionAllDecorator(PermsDecorator):
1454 1454 """
1455 1455 Checks for access permission for all given predicates for specific
1456 1456 repository. All of them have to be meet in order to fulfill the request
1457 1457 """
1458 1458 def _get_repo_name(self):
1459 1459 _request = self._get_request()
1460 1460 return get_repo_slug(_request)
1461 1461
1462 1462 def check_permissions(self, user):
1463 1463 perms = user.permissions
1464 1464 repo_name = self._get_repo_name()
1465 1465
1466 1466 try:
1467 1467 user_perms = set([perms['repositories'][repo_name]])
1468 1468 except KeyError:
1469 1469 log.debug('cannot locate repo with name: `%s` in permissions defs',
1470 1470 repo_name)
1471 1471 return False
1472 1472
1473 1473 log.debug('checking `%s` permissions for repo `%s`',
1474 1474 user_perms, repo_name)
1475 1475 if self.required_perms.issubset(user_perms):
1476 1476 return True
1477 1477 return False
1478 1478
1479 1479
1480 1480 class HasRepoPermissionAnyDecorator(PermsDecorator):
1481 1481 """
1482 1482 Checks for access permission for any of given predicates for specific
1483 1483 repository. In order to fulfill the request any of predicates must be meet
1484 1484 """
1485 1485 def _get_repo_name(self):
1486 1486 _request = self._get_request()
1487 1487 return get_repo_slug(_request)
1488 1488
1489 1489 def check_permissions(self, user):
1490 1490 perms = user.permissions
1491 1491 repo_name = self._get_repo_name()
1492 1492
1493 1493 try:
1494 1494 user_perms = set([perms['repositories'][repo_name]])
1495 1495 except KeyError:
1496 1496 log.debug('cannot locate repo with name: `%s` in permissions defs',
1497 1497 repo_name)
1498 1498 return False
1499 1499
1500 1500 log.debug('checking `%s` permissions for repo `%s`',
1501 1501 user_perms, repo_name)
1502 1502 if self.required_perms.intersection(user_perms):
1503 1503 return True
1504 1504 return False
1505 1505
1506 1506
1507 1507 class HasRepoGroupPermissionAllDecorator(PermsDecorator):
1508 1508 """
1509 1509 Checks for access permission for all given predicates for specific
1510 1510 repository group. All of them have to be meet in order to
1511 1511 fulfill the request
1512 1512 """
1513 1513 def _get_repo_group_name(self):
1514 1514 _request = self._get_request()
1515 1515 return get_repo_group_slug(_request)
1516 1516
1517 1517 def check_permissions(self, user):
1518 1518 perms = user.permissions
1519 1519 group_name = self._get_repo_group_name()
1520 1520 try:
1521 1521 user_perms = set([perms['repositories_groups'][group_name]])
1522 1522 except KeyError:
1523 1523 log.debug('cannot locate repo group with name: `%s` in permissions defs',
1524 1524 group_name)
1525 1525 return False
1526 1526
1527 1527 log.debug('checking `%s` permissions for repo group `%s`',
1528 1528 user_perms, group_name)
1529 1529 if self.required_perms.issubset(user_perms):
1530 1530 return True
1531 1531 return False
1532 1532
1533 1533
1534 1534 class HasRepoGroupPermissionAnyDecorator(PermsDecorator):
1535 1535 """
1536 1536 Checks for access permission for any of given predicates for specific
1537 1537 repository group. In order to fulfill the request any
1538 1538 of predicates must be met
1539 1539 """
1540 1540 def _get_repo_group_name(self):
1541 1541 _request = self._get_request()
1542 1542 return get_repo_group_slug(_request)
1543 1543
1544 1544 def check_permissions(self, user):
1545 1545 perms = user.permissions
1546 1546 group_name = self._get_repo_group_name()
1547 1547
1548 1548 try:
1549 1549 user_perms = set([perms['repositories_groups'][group_name]])
1550 1550 except KeyError:
1551 1551 log.debug('cannot locate repo group with name: `%s` in permissions defs',
1552 1552 group_name)
1553 1553 return False
1554 1554
1555 1555 log.debug('checking `%s` permissions for repo group `%s`',
1556 1556 user_perms, group_name)
1557 1557 if self.required_perms.intersection(user_perms):
1558 1558 return True
1559 1559 return False
1560 1560
1561 1561
1562 1562 class HasUserGroupPermissionAllDecorator(PermsDecorator):
1563 1563 """
1564 1564 Checks for access permission for all given predicates for specific
1565 1565 user group. All of them have to be meet in order to fulfill the request
1566 1566 """
1567 1567 def _get_user_group_name(self):
1568 1568 _request = self._get_request()
1569 1569 return get_user_group_slug(_request)
1570 1570
1571 1571 def check_permissions(self, user):
1572 1572 perms = user.permissions
1573 1573 group_name = self._get_user_group_name()
1574 1574 try:
1575 1575 user_perms = set([perms['user_groups'][group_name]])
1576 1576 except KeyError:
1577 1577 return False
1578 1578
1579 1579 if self.required_perms.issubset(user_perms):
1580 1580 return True
1581 1581 return False
1582 1582
1583 1583
1584 1584 class HasUserGroupPermissionAnyDecorator(PermsDecorator):
1585 1585 """
1586 1586 Checks for access permission for any of given predicates for specific
1587 1587 user group. In order to fulfill the request any of predicates must be meet
1588 1588 """
1589 1589 def _get_user_group_name(self):
1590 1590 _request = self._get_request()
1591 1591 return get_user_group_slug(_request)
1592 1592
1593 1593 def check_permissions(self, user):
1594 1594 perms = user.permissions
1595 1595 group_name = self._get_user_group_name()
1596 1596 try:
1597 1597 user_perms = set([perms['user_groups'][group_name]])
1598 1598 except KeyError:
1599 1599 return False
1600 1600
1601 1601 if self.required_perms.intersection(user_perms):
1602 1602 return True
1603 1603 return False
1604 1604
1605 1605
1606 1606 # CHECK FUNCTIONS
1607 1607 class PermsFunction(object):
1608 1608 """Base function for other check functions"""
1609 1609
1610 1610 def __init__(self, *perms):
1611 1611 self.required_perms = set(perms)
1612 1612 self.repo_name = None
1613 1613 self.repo_group_name = None
1614 1614 self.user_group_name = None
1615 1615
1616 1616 def __bool__(self):
1617 1617 frame = inspect.currentframe()
1618 1618 stack_trace = traceback.format_stack(frame)
1619 1619 log.error('Checking bool value on a class instance of perm '
1620 1620 'function is not allowed: %s' % ''.join(stack_trace))
1621 1621 # rather than throwing errors, here we always return False so if by
1622 1622 # accident someone checks truth for just an instance it will always end
1623 1623 # up in returning False
1624 1624 return False
1625 1625 __nonzero__ = __bool__
1626 1626
1627 1627 def __call__(self, check_location='', user=None):
1628 1628 if not user:
1629 1629 log.debug('Using user attribute from global request')
1630 1630 # TODO: remove this someday,put as user as attribute here
1631 1631 request = self._get_request()
1632 1632 user = request.user
1633 1633
1634 1634 # init auth user if not already given
1635 1635 if not isinstance(user, AuthUser):
1636 1636 log.debug('Wrapping user %s into AuthUser', user)
1637 1637 user = AuthUser(user.user_id)
1638 1638
1639 1639 cls_name = self.__class__.__name__
1640 1640 check_scope = self._get_check_scope(cls_name)
1641 1641 check_location = check_location or 'unspecified location'
1642 1642
1643 1643 log.debug('checking cls:%s %s usr:%s %s @ %s', cls_name,
1644 1644 self.required_perms, user, check_scope, check_location)
1645 1645 if not user:
1646 1646 log.warning('Empty user given for permission check')
1647 1647 return False
1648 1648
1649 1649 if self.check_permissions(user):
1650 1650 log.debug('Permission to repo:`%s` GRANTED for user:`%s` @ %s',
1651 1651 check_scope, user, check_location)
1652 1652 return True
1653 1653
1654 1654 else:
1655 1655 log.debug('Permission to repo:`%s` DENIED for user:`%s` @ %s',
1656 1656 check_scope, user, check_location)
1657 1657 return False
1658 1658
1659 1659 def _get_request(self):
1660 1660 return get_request(self)
1661 1661
1662 1662 def _get_check_scope(self, cls_name):
1663 1663 return {
1664 1664 'HasPermissionAll': 'GLOBAL',
1665 1665 'HasPermissionAny': 'GLOBAL',
1666 1666 'HasRepoPermissionAll': 'repo:%s' % self.repo_name,
1667 1667 'HasRepoPermissionAny': 'repo:%s' % self.repo_name,
1668 1668 'HasRepoGroupPermissionAll': 'repo_group:%s' % self.repo_group_name,
1669 1669 'HasRepoGroupPermissionAny': 'repo_group:%s' % self.repo_group_name,
1670 1670 'HasUserGroupPermissionAll': 'user_group:%s' % self.user_group_name,
1671 1671 'HasUserGroupPermissionAny': 'user_group:%s' % self.user_group_name,
1672 1672 }.get(cls_name, '?:%s' % cls_name)
1673 1673
1674 1674 def check_permissions(self, user):
1675 1675 """Dummy function for overriding"""
1676 1676 raise Exception('You have to write this function in child class')
1677 1677
1678 1678
1679 1679 class HasPermissionAll(PermsFunction):
1680 1680 def check_permissions(self, user):
1681 1681 perms = user.permissions_with_scope({})
1682 1682 if self.required_perms.issubset(perms.get('global')):
1683 1683 return True
1684 1684 return False
1685 1685
1686 1686
1687 1687 class HasPermissionAny(PermsFunction):
1688 1688 def check_permissions(self, user):
1689 1689 perms = user.permissions_with_scope({})
1690 1690 if self.required_perms.intersection(perms.get('global')):
1691 1691 return True
1692 1692 return False
1693 1693
1694 1694
1695 1695 class HasRepoPermissionAll(PermsFunction):
1696 1696 def __call__(self, repo_name=None, check_location='', user=None):
1697 1697 self.repo_name = repo_name
1698 1698 return super(HasRepoPermissionAll, self).__call__(check_location, user)
1699 1699
1700 1700 def _get_repo_name(self):
1701 1701 if not self.repo_name:
1702 1702 _request = self._get_request()
1703 1703 self.repo_name = get_repo_slug(_request)
1704 1704 return self.repo_name
1705 1705
1706 1706 def check_permissions(self, user):
1707 1707 self.repo_name = self._get_repo_name()
1708 1708 perms = user.permissions
1709 1709 try:
1710 1710 user_perms = set([perms['repositories'][self.repo_name]])
1711 1711 except KeyError:
1712 1712 return False
1713 1713 if self.required_perms.issubset(user_perms):
1714 1714 return True
1715 1715 return False
1716 1716
1717 1717
1718 1718 class HasRepoPermissionAny(PermsFunction):
1719 1719 def __call__(self, repo_name=None, check_location='', user=None):
1720 1720 self.repo_name = repo_name
1721 1721 return super(HasRepoPermissionAny, self).__call__(check_location, user)
1722 1722
1723 1723 def _get_repo_name(self):
1724 1724 if not self.repo_name:
1725 1725 _request = self._get_request()
1726 1726 self.repo_name = get_repo_slug(_request)
1727 1727 return self.repo_name
1728 1728
1729 1729 def check_permissions(self, user):
1730 1730 self.repo_name = self._get_repo_name()
1731 1731 perms = user.permissions
1732 1732 try:
1733 1733 user_perms = set([perms['repositories'][self.repo_name]])
1734 1734 except KeyError:
1735 1735 return False
1736 1736 if self.required_perms.intersection(user_perms):
1737 1737 return True
1738 1738 return False
1739 1739
1740 1740
1741 1741 class HasRepoGroupPermissionAny(PermsFunction):
1742 1742 def __call__(self, group_name=None, check_location='', user=None):
1743 1743 self.repo_group_name = group_name
1744 1744 return super(HasRepoGroupPermissionAny, self).__call__(
1745 1745 check_location, user)
1746 1746
1747 1747 def check_permissions(self, user):
1748 1748 perms = user.permissions
1749 1749 try:
1750 1750 user_perms = set(
1751 1751 [perms['repositories_groups'][self.repo_group_name]])
1752 1752 except KeyError:
1753 1753 return False
1754 1754 if self.required_perms.intersection(user_perms):
1755 1755 return True
1756 1756 return False
1757 1757
1758 1758
1759 1759 class HasRepoGroupPermissionAll(PermsFunction):
1760 1760 def __call__(self, group_name=None, check_location='', user=None):
1761 1761 self.repo_group_name = group_name
1762 1762 return super(HasRepoGroupPermissionAll, self).__call__(
1763 1763 check_location, user)
1764 1764
1765 1765 def check_permissions(self, user):
1766 1766 perms = user.permissions
1767 1767 try:
1768 1768 user_perms = set(
1769 1769 [perms['repositories_groups'][self.repo_group_name]])
1770 1770 except KeyError:
1771 1771 return False
1772 1772 if self.required_perms.issubset(user_perms):
1773 1773 return True
1774 1774 return False
1775 1775
1776 1776
1777 1777 class HasUserGroupPermissionAny(PermsFunction):
1778 1778 def __call__(self, user_group_name=None, check_location='', user=None):
1779 1779 self.user_group_name = user_group_name
1780 1780 return super(HasUserGroupPermissionAny, self).__call__(
1781 1781 check_location, user)
1782 1782
1783 1783 def check_permissions(self, user):
1784 1784 perms = user.permissions
1785 1785 try:
1786 1786 user_perms = set([perms['user_groups'][self.user_group_name]])
1787 1787 except KeyError:
1788 1788 return False
1789 1789 if self.required_perms.intersection(user_perms):
1790 1790 return True
1791 1791 return False
1792 1792
1793 1793
1794 1794 class HasUserGroupPermissionAll(PermsFunction):
1795 1795 def __call__(self, user_group_name=None, check_location='', user=None):
1796 1796 self.user_group_name = user_group_name
1797 1797 return super(HasUserGroupPermissionAll, self).__call__(
1798 1798 check_location, user)
1799 1799
1800 1800 def check_permissions(self, user):
1801 1801 perms = user.permissions
1802 1802 try:
1803 1803 user_perms = set([perms['user_groups'][self.user_group_name]])
1804 1804 except KeyError:
1805 1805 return False
1806 1806 if self.required_perms.issubset(user_perms):
1807 1807 return True
1808 1808 return False
1809 1809
1810 1810
1811 1811 # SPECIAL VERSION TO HANDLE MIDDLEWARE AUTH
1812 1812 class HasPermissionAnyMiddleware(object):
1813 1813 def __init__(self, *perms):
1814 1814 self.required_perms = set(perms)
1815 1815
1816 1816 def __call__(self, user, repo_name):
1817 1817 # repo_name MUST be unicode, since we handle keys in permission
1818 1818 # dict by unicode
1819 1819 repo_name = safe_unicode(repo_name)
1820 1820 user = AuthUser(user.user_id)
1821 1821 log.debug(
1822 1822 'Checking VCS protocol permissions %s for user:%s repo:`%s`',
1823 1823 self.required_perms, user, repo_name)
1824 1824
1825 1825 if self.check_permissions(user, repo_name):
1826 1826 log.debug('Permission to repo:`%s` GRANTED for user:%s @ %s',
1827 1827 repo_name, user, 'PermissionMiddleware')
1828 1828 return True
1829 1829
1830 1830 else:
1831 1831 log.debug('Permission to repo:`%s` DENIED for user:%s @ %s',
1832 1832 repo_name, user, 'PermissionMiddleware')
1833 1833 return False
1834 1834
1835 1835 def check_permissions(self, user, repo_name):
1836 1836 perms = user.permissions_with_scope({'repo_name': repo_name})
1837 1837
1838 1838 try:
1839 1839 user_perms = set([perms['repositories'][repo_name]])
1840 1840 except Exception:
1841 1841 log.exception('Error while accessing user permissions')
1842 1842 return False
1843 1843
1844 1844 if self.required_perms.intersection(user_perms):
1845 1845 return True
1846 1846 return False
1847 1847
1848 1848
1849 1849 # SPECIAL VERSION TO HANDLE API AUTH
1850 1850 class _BaseApiPerm(object):
1851 1851 def __init__(self, *perms):
1852 1852 self.required_perms = set(perms)
1853 1853
1854 1854 def __call__(self, check_location=None, user=None, repo_name=None,
1855 1855 group_name=None, user_group_name=None):
1856 1856 cls_name = self.__class__.__name__
1857 1857 check_scope = 'global:%s' % (self.required_perms,)
1858 1858 if repo_name:
1859 1859 check_scope += ', repo_name:%s' % (repo_name,)
1860 1860
1861 1861 if group_name:
1862 1862 check_scope += ', repo_group_name:%s' % (group_name,)
1863 1863
1864 1864 if user_group_name:
1865 1865 check_scope += ', user_group_name:%s' % (user_group_name,)
1866 1866
1867 1867 log.debug(
1868 1868 'checking cls:%s %s %s @ %s'
1869 1869 % (cls_name, self.required_perms, check_scope, check_location))
1870 1870 if not user:
1871 1871 log.debug('Empty User passed into arguments')
1872 1872 return False
1873 1873
1874 1874 # process user
1875 1875 if not isinstance(user, AuthUser):
1876 1876 user = AuthUser(user.user_id)
1877 1877 if not check_location:
1878 1878 check_location = 'unspecified'
1879 1879 if self.check_permissions(user.permissions, repo_name, group_name,
1880 1880 user_group_name):
1881 1881 log.debug('Permission to repo:`%s` GRANTED for user:`%s` @ %s',
1882 1882 check_scope, user, check_location)
1883 1883 return True
1884 1884
1885 1885 else:
1886 1886 log.debug('Permission to repo:`%s` DENIED for user:`%s` @ %s',
1887 1887 check_scope, user, check_location)
1888 1888 return False
1889 1889
1890 1890 def check_permissions(self, perm_defs, repo_name=None, group_name=None,
1891 1891 user_group_name=None):
1892 1892 """
1893 1893 implement in child class should return True if permissions are ok,
1894 1894 False otherwise
1895 1895
1896 1896 :param perm_defs: dict with permission definitions
1897 1897 :param repo_name: repo name
1898 1898 """
1899 1899 raise NotImplementedError()
1900 1900
1901 1901
1902 1902 class HasPermissionAllApi(_BaseApiPerm):
1903 1903 def check_permissions(self, perm_defs, repo_name=None, group_name=None,
1904 1904 user_group_name=None):
1905 1905 if self.required_perms.issubset(perm_defs.get('global')):
1906 1906 return True
1907 1907 return False
1908 1908
1909 1909
1910 1910 class HasPermissionAnyApi(_BaseApiPerm):
1911 1911 def check_permissions(self, perm_defs, repo_name=None, group_name=None,
1912 1912 user_group_name=None):
1913 1913 if self.required_perms.intersection(perm_defs.get('global')):
1914 1914 return True
1915 1915 return False
1916 1916
1917 1917
1918 1918 class HasRepoPermissionAllApi(_BaseApiPerm):
1919 1919 def check_permissions(self, perm_defs, repo_name=None, group_name=None,
1920 1920 user_group_name=None):
1921 1921 try:
1922 1922 _user_perms = set([perm_defs['repositories'][repo_name]])
1923 1923 except KeyError:
1924 1924 log.warning(traceback.format_exc())
1925 1925 return False
1926 1926 if self.required_perms.issubset(_user_perms):
1927 1927 return True
1928 1928 return False
1929 1929
1930 1930
1931 1931 class HasRepoPermissionAnyApi(_BaseApiPerm):
1932 1932 def check_permissions(self, perm_defs, repo_name=None, group_name=None,
1933 1933 user_group_name=None):
1934 1934 try:
1935 1935 _user_perms = set([perm_defs['repositories'][repo_name]])
1936 1936 except KeyError:
1937 1937 log.warning(traceback.format_exc())
1938 1938 return False
1939 1939 if self.required_perms.intersection(_user_perms):
1940 1940 return True
1941 1941 return False
1942 1942
1943 1943
1944 1944 class HasRepoGroupPermissionAnyApi(_BaseApiPerm):
1945 1945 def check_permissions(self, perm_defs, repo_name=None, group_name=None,
1946 1946 user_group_name=None):
1947 1947 try:
1948 1948 _user_perms = set([perm_defs['repositories_groups'][group_name]])
1949 1949 except KeyError:
1950 1950 log.warning(traceback.format_exc())
1951 1951 return False
1952 1952 if self.required_perms.intersection(_user_perms):
1953 1953 return True
1954 1954 return False
1955 1955
1956 1956
1957 1957 class HasRepoGroupPermissionAllApi(_BaseApiPerm):
1958 1958 def check_permissions(self, perm_defs, repo_name=None, group_name=None,
1959 1959 user_group_name=None):
1960 1960 try:
1961 1961 _user_perms = set([perm_defs['repositories_groups'][group_name]])
1962 1962 except KeyError:
1963 1963 log.warning(traceback.format_exc())
1964 1964 return False
1965 1965 if self.required_perms.issubset(_user_perms):
1966 1966 return True
1967 1967 return False
1968 1968
1969 1969
1970 1970 class HasUserGroupPermissionAnyApi(_BaseApiPerm):
1971 1971 def check_permissions(self, perm_defs, repo_name=None, group_name=None,
1972 1972 user_group_name=None):
1973 1973 try:
1974 1974 _user_perms = set([perm_defs['user_groups'][user_group_name]])
1975 1975 except KeyError:
1976 1976 log.warning(traceback.format_exc())
1977 1977 return False
1978 1978 if self.required_perms.intersection(_user_perms):
1979 1979 return True
1980 1980 return False
1981 1981
1982 1982
1983 1983 def check_ip_access(source_ip, allowed_ips=None):
1984 1984 """
1985 1985 Checks if source_ip is a subnet of any of allowed_ips.
1986 1986
1987 1987 :param source_ip:
1988 1988 :param allowed_ips: list of allowed ips together with mask
1989 1989 """
1990 1990 log.debug('checking if ip:%s is subnet of %s' % (source_ip, allowed_ips))
1991 1991 source_ip_address = ipaddress.ip_address(source_ip)
1992 1992 if isinstance(allowed_ips, (tuple, list, set)):
1993 1993 for ip in allowed_ips:
1994 1994 try:
1995 1995 network_address = ipaddress.ip_network(ip, strict=False)
1996 1996 if source_ip_address in network_address:
1997 1997 log.debug('IP %s is network %s' %
1998 1998 (source_ip_address, network_address))
1999 1999 return True
2000 2000 # for any case we cannot determine the IP, don't crash just
2001 2001 # skip it and log as error, we want to say forbidden still when
2002 2002 # sending bad IP
2003 2003 except Exception:
2004 2004 log.error(traceback.format_exc())
2005 2005 continue
2006 2006 return False
2007 2007
2008 2008
2009 2009 def get_cython_compat_decorator(wrapper, func):
2010 2010 """
2011 2011 Creates a cython compatible decorator. The previously used
2012 2012 decorator.decorator() function seems to be incompatible with cython.
2013 2013
2014 2014 :param wrapper: __wrapper method of the decorator class
2015 2015 :param func: decorated function
2016 2016 """
2017 2017 @wraps(func)
2018 2018 def local_wrapper(*args, **kwds):
2019 2019 return wrapper(func, *args, **kwds)
2020 2020 local_wrapper.__wrapped__ = func
2021 2021 return local_wrapper
2022 2022
2023 2023
@@ -1,682 +1,682 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2010-2017 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 import mock
22 22 import pytest
23 23
24 24 import rhodecode
25 25 from rhodecode.config.routing import ADMIN_PREFIX
26 26 from rhodecode.lib.utils2 import md5
27 27 from rhodecode.model.db import RhodeCodeUi
28 28 from rhodecode.model.meta import Session
29 29 from rhodecode.model.settings import SettingsModel, IssueTrackerSettingsModel
30 30 from rhodecode.tests import url, assert_session_flash
31 31 from rhodecode.tests.utils import AssertResponse
32 32
33 33
34 34 UPDATE_DATA_QUALNAME = (
35 35 'rhodecode.apps.admin.views.system_info.AdminSystemInfoSettingsView.get_update_data')
36 36
37 37
38 38 @pytest.mark.usefixtures('autologin_user', 'app')
39 39 class TestAdminSettingsController(object):
40 40
41 41 @pytest.mark.parametrize('urlname', [
42 42 'admin_settings_vcs',
43 43 'admin_settings_mapping',
44 44 'admin_settings_global',
45 45 'admin_settings_visual',
46 46 'admin_settings_email',
47 47 'admin_settings_hooks',
48 48 'admin_settings_search',
49 49 ])
50 50 def test_simple_get(self, urlname, app):
51 51 app.get(url(urlname))
52 52
53 53 def test_create_custom_hook(self, csrf_token):
54 54 response = self.app.post(
55 55 url('admin_settings_hooks'),
56 56 params={
57 57 'new_hook_ui_key': 'test_hooks_1',
58 58 'new_hook_ui_value': 'cd /tmp',
59 59 'csrf_token': csrf_token})
60 60
61 61 response = response.follow()
62 62 response.mustcontain('test_hooks_1')
63 63 response.mustcontain('cd /tmp')
64 64
65 65 def test_create_custom_hook_delete(self, csrf_token):
66 66 response = self.app.post(
67 67 url('admin_settings_hooks'),
68 68 params={
69 69 'new_hook_ui_key': 'test_hooks_2',
70 70 'new_hook_ui_value': 'cd /tmp2',
71 71 'csrf_token': csrf_token})
72 72
73 73 response = response.follow()
74 74 response.mustcontain('test_hooks_2')
75 75 response.mustcontain('cd /tmp2')
76 76
77 77 hook_id = SettingsModel().get_ui_by_key('test_hooks_2').ui_id
78 78
79 79 # delete
80 80 self.app.post(
81 81 url('admin_settings_hooks'),
82 82 params={'hook_id': hook_id, 'csrf_token': csrf_token})
83 83 response = self.app.get(url('admin_settings_hooks'))
84 84 response.mustcontain(no=['test_hooks_2'])
85 85 response.mustcontain(no=['cd /tmp2'])
86 86
87 87
88 88 @pytest.mark.usefixtures('autologin_user', 'app')
89 89 class TestAdminSettingsGlobal(object):
90 90
91 91 def test_pre_post_code_code_active(self, csrf_token):
92 92 pre_code = 'rc-pre-code-187652122'
93 93 post_code = 'rc-postcode-98165231'
94 94
95 95 response = self.post_and_verify_settings({
96 96 'rhodecode_pre_code': pre_code,
97 97 'rhodecode_post_code': post_code,
98 98 'csrf_token': csrf_token,
99 99 })
100 100
101 101 response = response.follow()
102 102 response.mustcontain(pre_code, post_code)
103 103
104 104 def test_pre_post_code_code_inactive(self, csrf_token):
105 105 pre_code = 'rc-pre-code-187652122'
106 106 post_code = 'rc-postcode-98165231'
107 107 response = self.post_and_verify_settings({
108 108 'rhodecode_pre_code': '',
109 109 'rhodecode_post_code': '',
110 110 'csrf_token': csrf_token,
111 111 })
112 112
113 113 response = response.follow()
114 114 response.mustcontain(no=[pre_code, post_code])
115 115
116 116 def test_captcha_activate(self, csrf_token):
117 117 self.post_and_verify_settings({
118 118 'rhodecode_captcha_private_key': '1234567890',
119 119 'rhodecode_captcha_public_key': '1234567890',
120 120 'csrf_token': csrf_token,
121 121 })
122 122
123 123 response = self.app.get(ADMIN_PREFIX + '/register')
124 124 response.mustcontain('captcha')
125 125
126 126 def test_captcha_deactivate(self, csrf_token):
127 127 self.post_and_verify_settings({
128 128 'rhodecode_captcha_private_key': '',
129 129 'rhodecode_captcha_public_key': '1234567890',
130 130 'csrf_token': csrf_token,
131 131 })
132 132
133 133 response = self.app.get(ADMIN_PREFIX + '/register')
134 134 response.mustcontain(no=['captcha'])
135 135
136 136 def test_title_change(self, csrf_token):
137 137 old_title = 'RhodeCode'
138 138 new_title = old_title + '_changed'
139 139
140 140 for new_title in ['Changed', 'Ε»Γ³Ε‚wik', old_title]:
141 141 response = self.post_and_verify_settings({
142 142 'rhodecode_title': new_title,
143 143 'csrf_token': csrf_token,
144 144 })
145 145
146 146 response = response.follow()
147 147 response.mustcontain(
148 148 """<div class="branding">- %s</div>""" % new_title)
149 149
150 150 def post_and_verify_settings(self, settings):
151 151 old_title = 'RhodeCode'
152 152 old_realm = 'RhodeCode authentication'
153 153 params = {
154 154 'rhodecode_title': old_title,
155 155 'rhodecode_realm': old_realm,
156 156 'rhodecode_pre_code': '',
157 157 'rhodecode_post_code': '',
158 158 'rhodecode_captcha_private_key': '',
159 159 'rhodecode_captcha_public_key': '',
160 160 'rhodecode_create_personal_repo_group': False,
161 161 'rhodecode_personal_repo_group_pattern': '${username}',
162 162 }
163 163 params.update(settings)
164 164 response = self.app.post(url('admin_settings_global'), params=params)
165 165
166 166 assert_session_flash(response, 'Updated application settings')
167 167 app_settings = SettingsModel().get_all_settings()
168 168 del settings['csrf_token']
169 169 for key, value in settings.iteritems():
170 170 assert app_settings[key] == value.decode('utf-8')
171 171
172 172 return response
173 173
174 174
175 175 @pytest.mark.usefixtures('autologin_user', 'app')
176 176 class TestAdminSettingsVcs(object):
177 177
178 178 def test_contains_svn_default_patterns(self, app):
179 179 response = app.get(url('admin_settings_vcs'))
180 180 expected_patterns = [
181 181 '/trunk',
182 182 '/branches/*',
183 183 '/tags/*',
184 184 ]
185 185 for pattern in expected_patterns:
186 186 response.mustcontain(pattern)
187 187
188 188 def test_add_new_svn_branch_and_tag_pattern(
189 189 self, app, backend_svn, form_defaults, disable_sql_cache,
190 190 csrf_token):
191 191 form_defaults.update({
192 192 'new_svn_branch': '/exp/branches/*',
193 193 'new_svn_tag': '/important_tags/*',
194 194 'csrf_token': csrf_token,
195 195 })
196 196
197 197 response = app.post(
198 198 url('admin_settings_vcs'), params=form_defaults, status=302)
199 199 response = response.follow()
200 200
201 201 # Expect to find the new values on the page
202 202 response.mustcontain('/exp/branches/*')
203 203 response.mustcontain('/important_tags/*')
204 204
205 205 # Expect that those patterns are used to match branches and tags now
206 206 repo = backend_svn['svn-simple-layout'].scm_instance()
207 207 assert 'exp/branches/exp-sphinx-docs' in repo.branches
208 208 assert 'important_tags/v0.5' in repo.tags
209 209
210 210 def test_add_same_svn_value_twice_shows_an_error_message(
211 211 self, app, form_defaults, csrf_token, settings_util):
212 212 settings_util.create_rhodecode_ui('vcs_svn_branch', '/test')
213 213 settings_util.create_rhodecode_ui('vcs_svn_tag', '/test')
214 214
215 215 response = app.post(
216 216 url('admin_settings_vcs'),
217 217 params={
218 218 'paths_root_path': form_defaults['paths_root_path'],
219 219 'new_svn_branch': '/test',
220 220 'new_svn_tag': '/test',
221 221 'csrf_token': csrf_token,
222 222 },
223 223 status=200)
224 224
225 225 response.mustcontain("Pattern already exists")
226 226 response.mustcontain("Some form inputs contain invalid data.")
227 227
228 228 @pytest.mark.parametrize('section', [
229 229 'vcs_svn_branch',
230 230 'vcs_svn_tag',
231 231 ])
232 232 def test_delete_svn_patterns(
233 233 self, section, app, csrf_token, settings_util):
234 234 setting = settings_util.create_rhodecode_ui(
235 235 section, '/test_delete', cleanup=False)
236 236
237 237 app.post(
238 238 url('admin_settings_vcs'),
239 239 params={
240 240 '_method': 'delete',
241 241 'delete_svn_pattern': setting.ui_id,
242 242 'csrf_token': csrf_token},
243 243 headers={'X-REQUESTED-WITH': 'XMLHttpRequest'})
244 244
245 245 @pytest.mark.parametrize('section', [
246 246 'vcs_svn_branch',
247 247 'vcs_svn_tag',
248 248 ])
249 249 def test_delete_svn_patterns_raises_400_when_no_xhr(
250 250 self, section, app, csrf_token, settings_util):
251 251 setting = settings_util.create_rhodecode_ui(section, '/test_delete')
252 252
253 253 app.post(
254 254 url('admin_settings_vcs'),
255 255 params={
256 256 '_method': 'delete',
257 257 'delete_svn_pattern': setting.ui_id,
258 258 'csrf_token': csrf_token},
259 259 status=400)
260 260
261 261 def test_extensions_hgsubversion(self, app, form_defaults, csrf_token):
262 262 form_defaults.update({
263 263 'csrf_token': csrf_token,
264 264 'extensions_hgsubversion': 'True',
265 265 })
266 266 response = app.post(
267 267 url('admin_settings_vcs'),
268 268 params=form_defaults,
269 269 status=302)
270 270
271 271 response = response.follow()
272 272 extensions_input = (
273 273 '<input id="extensions_hgsubversion" '
274 274 'name="extensions_hgsubversion" type="checkbox" '
275 275 'value="True" checked="checked" />')
276 276 response.mustcontain(extensions_input)
277 277
278 278 def test_extensions_hgevolve(self, app, form_defaults, csrf_token):
279 279 form_defaults.update({
280 280 'csrf_token': csrf_token,
281 281 'extensions_evolve': 'True',
282 282 })
283 283 response = app.post(
284 284 url('admin_settings_vcs'),
285 285 params=form_defaults,
286 286 status=302)
287 287
288 288 response = response.follow()
289 289 extensions_input = (
290 290 '<input id="extensions_evolve" '
291 291 'name="extensions_evolve" type="checkbox" '
292 292 'value="True" checked="checked" />')
293 293 response.mustcontain(extensions_input)
294 294
295 295 def test_has_a_section_for_pull_request_settings(self, app):
296 296 response = app.get(url('admin_settings_vcs'))
297 297 response.mustcontain('Pull Request Settings')
298 298
299 299 def test_has_an_input_for_invalidation_of_inline_comments(
300 300 self, app):
301 301 response = app.get(url('admin_settings_vcs'))
302 302 assert_response = AssertResponse(response)
303 303 assert_response.one_element_exists(
304 304 '[name=rhodecode_use_outdated_comments]')
305 305
306 306 @pytest.mark.parametrize('new_value', [True, False])
307 307 def test_allows_to_change_invalidation_of_inline_comments(
308 308 self, app, form_defaults, csrf_token, new_value):
309 309 setting_key = 'use_outdated_comments'
310 310 setting = SettingsModel().create_or_update_setting(
311 311 setting_key, not new_value, 'bool')
312 312 Session().add(setting)
313 313 Session().commit()
314 314
315 315 form_defaults.update({
316 316 'csrf_token': csrf_token,
317 317 'rhodecode_use_outdated_comments': str(new_value),
318 318 })
319 319 response = app.post(
320 320 url('admin_settings_vcs'),
321 321 params=form_defaults,
322 322 status=302)
323 323 response = response.follow()
324 324 setting = SettingsModel().get_setting_by_name(setting_key)
325 325 assert setting.app_settings_value is new_value
326 326
327 327 def test_has_a_section_for_labs_settings_if_enabled(self, app):
328 328 with mock.patch.dict(
329 329 rhodecode.CONFIG, {'labs_settings_active': 'true'}):
330 330 response = self.app.get(url('admin_settings_vcs'))
331 331 response.mustcontain('Labs Settings')
332 332
333 333 def test_has_not_a_section_for_labs_settings_if_disables(self, app):
334 334 with mock.patch.dict(
335 335 rhodecode.CONFIG, {'labs_settings_active': 'false'}):
336 336 response = self.app.get(url('admin_settings_vcs'))
337 337 response.mustcontain(no='Labs Settings')
338 338
339 339 @pytest.mark.parametrize('new_value', [True, False])
340 340 def test_allows_to_change_hg_rebase_merge_strategy(
341 341 self, app, form_defaults, csrf_token, new_value):
342 342 setting_key = 'hg_use_rebase_for_merging'
343 343
344 344 form_defaults.update({
345 345 'csrf_token': csrf_token,
346 346 'rhodecode_' + setting_key: str(new_value),
347 347 })
348 348
349 349 with mock.patch.dict(
350 350 rhodecode.CONFIG, {'labs_settings_active': 'true'}):
351 351 app.post(
352 352 url('admin_settings_vcs'),
353 353 params=form_defaults,
354 354 status=302)
355 355
356 356 setting = SettingsModel().get_setting_by_name(setting_key)
357 357 assert setting.app_settings_value is new_value
358 358
359 359 @pytest.fixture
360 360 def disable_sql_cache(self, request):
361 361 patcher = mock.patch(
362 362 'rhodecode.lib.caching_query.FromCache.process_query')
363 363 request.addfinalizer(patcher.stop)
364 364 patcher.start()
365 365
366 366 @pytest.fixture
367 367 def form_defaults(self):
368 368 from rhodecode.controllers.admin.settings import SettingsController
369 369 controller = SettingsController()
370 370 return controller._form_defaults()
371 371
372 372 # TODO: johbo: What we really want is to checkpoint before a test run and
373 373 # reset the session afterwards.
374 374 @pytest.fixture(scope='class', autouse=True)
375 375 def cleanup_settings(self, request, pylonsapp):
376 376 ui_id = RhodeCodeUi.ui_id
377 377 original_ids = list(
378 378 r.ui_id for r in RhodeCodeUi.query().values(ui_id))
379 379
380 380 @request.addfinalizer
381 381 def cleanup():
382 382 RhodeCodeUi.query().filter(
383 383 ui_id.notin_(original_ids)).delete(False)
384 384
385 385
386 386 @pytest.mark.usefixtures('autologin_user', 'app')
387 387 class TestLabsSettings(object):
388 388 def test_get_settings_page_disabled(self):
389 389 with mock.patch.dict(rhodecode.CONFIG,
390 390 {'labs_settings_active': 'false'}):
391 391 response = self.app.get(url('admin_settings_labs'), status=302)
392 392
393 393 assert response.location.endswith(url('admin_settings'))
394 394
395 395 def test_get_settings_page_enabled(self):
396 396 from rhodecode.controllers.admin import settings
397 397 lab_settings = [
398 398 settings.LabSetting(
399 399 key='rhodecode_bool',
400 400 type='bool',
401 401 group='bool group',
402 402 label='bool label',
403 403 help='bool help'
404 404 ),
405 405 settings.LabSetting(
406 406 key='rhodecode_text',
407 407 type='unicode',
408 408 group='text group',
409 409 label='text label',
410 410 help='text help'
411 411 ),
412 412 ]
413 413 with mock.patch.dict(rhodecode.CONFIG,
414 414 {'labs_settings_active': 'true'}):
415 415 with mock.patch.object(settings, '_LAB_SETTINGS', lab_settings):
416 416 response = self.app.get(url('admin_settings_labs'))
417 417
418 418 assert '<label>bool group:</label>' in response
419 419 assert '<label for="rhodecode_bool">bool label</label>' in response
420 420 assert '<p class="help-block">bool help</p>' in response
421 421 assert 'name="rhodecode_bool" type="checkbox"' in response
422 422
423 423 assert '<label>text group:</label>' in response
424 424 assert '<label for="rhodecode_text">text label</label>' in response
425 425 assert '<p class="help-block">text help</p>' in response
426 426 assert 'name="rhodecode_text" size="60" type="text"' in response
427 427
428 428
429 429 @pytest.mark.usefixtures('app')
430 430 class TestOpenSourceLicenses(object):
431 431
432 432 def _get_url(self):
433 433 return ADMIN_PREFIX + '/settings/open_source'
434 434
435 435 def test_records_are_displayed(self, autologin_user):
436 436 sample_licenses = {
437 437 "python2.7-pytest-2.7.1": {
438 438 "UNKNOWN": None
439 439 },
440 440 "python2.7-Markdown-2.6.2": {
441 441 "BSD-3-Clause": "http://spdx.org/licenses/BSD-3-Clause"
442 442 }
443 443 }
444 444 read_licenses_patch = mock.patch(
445 445 'rhodecode.apps.admin.views.open_source_licenses.read_opensource_licenses',
446 446 return_value=sample_licenses)
447 447 with read_licenses_patch:
448 448 response = self.app.get(self._get_url(), status=200)
449 449
450 450 assert_response = AssertResponse(response)
451 451 assert_response.element_contains(
452 452 '.panel-heading', 'Licenses of Third Party Packages')
453 453 for name in sample_licenses:
454 454 response.mustcontain(name)
455 455 for license in sample_licenses[name]:
456 456 assert_response.element_contains('.panel-body', license)
457 457
458 458 def test_records_can_be_read(self, autologin_user):
459 459 response = self.app.get(self._get_url(), status=200)
460 460 assert_response = AssertResponse(response)
461 461 assert_response.element_contains(
462 462 '.panel-heading', 'Licenses of Third Party Packages')
463 463
464 464 def test_forbidden_when_normal_user(self, autologin_regular_user):
465 self.app.get(self._get_url(), status=403)
465 self.app.get(self._get_url(), status=404)
466 466
467 467
468 468 @pytest.mark.usefixtures('app')
469 469 class TestUserSessions(object):
470 470
471 471 def _get_url(self, name='admin_settings_sessions'):
472 472 return {
473 473 'admin_settings_sessions': ADMIN_PREFIX + '/settings/sessions',
474 474 'admin_settings_sessions_cleanup': ADMIN_PREFIX + '/settings/sessions/cleanup'
475 475 }[name]
476 476
477 477 def test_forbidden_when_normal_user(self, autologin_regular_user):
478 self.app.get(self._get_url(), status=403)
478 self.app.get(self._get_url(), status=404)
479 479
480 480 def test_show_sessions_page(self, autologin_user):
481 481 response = self.app.get(self._get_url(), status=200)
482 482 response.mustcontain('file')
483 483
484 484 def test_cleanup_old_sessions(self, autologin_user, csrf_token):
485 485
486 486 post_data = {
487 487 'csrf_token': csrf_token,
488 488 'expire_days': '60'
489 489 }
490 490 response = self.app.post(
491 491 self._get_url('admin_settings_sessions_cleanup'), params=post_data,
492 492 status=302)
493 493 assert_session_flash(response, 'Cleaned up old sessions')
494 494
495 495
496 496 @pytest.mark.usefixtures('app')
497 497 class TestAdminSystemInfo(object):
498 498 def _get_url(self, name='admin_settings_system'):
499 499 return {
500 500 'admin_settings_system': ADMIN_PREFIX + '/settings/system',
501 501 'admin_settings_system_update': ADMIN_PREFIX + '/settings/system/updates',
502 502 }[name]
503 503
504 504 def test_forbidden_when_normal_user(self, autologin_regular_user):
505 self.app.get(self._get_url(), status=403)
505 self.app.get(self._get_url(), status=404)
506 506
507 507 def test_system_info_page(self, autologin_user):
508 508 response = self.app.get(self._get_url())
509 509 response.mustcontain('RhodeCode Community Edition, version {}'.format(
510 510 rhodecode.__version__))
511 511
512 512 def test_system_update_new_version(self, autologin_user):
513 513 update_data = {
514 514 'versions': [
515 515 {
516 516 'version': '100.3.1415926535',
517 517 'general': 'The latest version we are ever going to ship'
518 518 },
519 519 {
520 520 'version': '0.0.0',
521 521 'general': 'The first version we ever shipped'
522 522 }
523 523 ]
524 524 }
525 525 with mock.patch(UPDATE_DATA_QUALNAME, return_value=update_data):
526 526 response = self.app.get(self._get_url('admin_settings_system_update'))
527 527 response.mustcontain('A <b>new version</b> is available')
528 528
529 529 def test_system_update_nothing_new(self, autologin_user):
530 530 update_data = {
531 531 'versions': [
532 532 {
533 533 'version': '0.0.0',
534 534 'general': 'The first version we ever shipped'
535 535 }
536 536 ]
537 537 }
538 538 with mock.patch(UPDATE_DATA_QUALNAME, return_value=update_data):
539 539 response = self.app.get(self._get_url('admin_settings_system_update'))
540 540 response.mustcontain(
541 541 'You already have the <b>latest</b> stable version.')
542 542
543 543 def test_system_update_bad_response(self, autologin_user):
544 544 with mock.patch(UPDATE_DATA_QUALNAME, side_effect=ValueError('foo')):
545 545 response = self.app.get(self._get_url('admin_settings_system_update'))
546 546 response.mustcontain(
547 547 'Bad data sent from update server')
548 548
549 549
550 550 @pytest.mark.usefixtures("app")
551 551 class TestAdminSettingsIssueTracker(object):
552 552 RC_PREFIX = 'rhodecode_'
553 553 SHORT_PATTERN_KEY = 'issuetracker_pat_'
554 554 PATTERN_KEY = RC_PREFIX + SHORT_PATTERN_KEY
555 555
556 556 def test_issuetracker_index(self, autologin_user):
557 557 response = self.app.get(url('admin_settings_issuetracker'))
558 558 assert response.status_code == 200
559 559
560 560 def test_add_empty_issuetracker_pattern(
561 561 self, request, autologin_user, csrf_token):
562 562 post_url = url('admin_settings_issuetracker_save')
563 563 post_data = {
564 564 'csrf_token': csrf_token
565 565 }
566 566 self.app.post(post_url, post_data, status=302)
567 567
568 568 def test_add_issuetracker_pattern(
569 569 self, request, autologin_user, csrf_token):
570 570 pattern = 'issuetracker_pat'
571 571 another_pattern = pattern+'1'
572 572 post_url = url('admin_settings_issuetracker_save')
573 573 post_data = {
574 574 'new_pattern_pattern_0': pattern,
575 575 'new_pattern_url_0': 'url',
576 576 'new_pattern_prefix_0': 'prefix',
577 577 'new_pattern_description_0': 'description',
578 578 'new_pattern_pattern_1': another_pattern,
579 579 'new_pattern_url_1': 'url1',
580 580 'new_pattern_prefix_1': 'prefix1',
581 581 'new_pattern_description_1': 'description1',
582 582 'csrf_token': csrf_token
583 583 }
584 584 self.app.post(post_url, post_data, status=302)
585 585 settings = SettingsModel().get_all_settings()
586 586 self.uid = md5(pattern)
587 587 assert settings[self.PATTERN_KEY+self.uid] == pattern
588 588 self.another_uid = md5(another_pattern)
589 589 assert settings[self.PATTERN_KEY+self.another_uid] == another_pattern
590 590
591 591 @request.addfinalizer
592 592 def cleanup():
593 593 defaults = SettingsModel().get_all_settings()
594 594
595 595 entries = [name for name in defaults if (
596 596 (self.uid in name) or (self.another_uid) in name)]
597 597 start = len(self.RC_PREFIX)
598 598 for del_key in entries:
599 599 # TODO: anderson: get_by_name needs name without prefix
600 600 entry = SettingsModel().get_setting_by_name(del_key[start:])
601 601 Session().delete(entry)
602 602
603 603 Session().commit()
604 604
605 605 def test_edit_issuetracker_pattern(
606 606 self, autologin_user, backend, csrf_token, request):
607 607 old_pattern = 'issuetracker_pat'
608 608 old_uid = md5(old_pattern)
609 609 pattern = 'issuetracker_pat_new'
610 610 self.new_uid = md5(pattern)
611 611
612 612 SettingsModel().create_or_update_setting(
613 613 self.SHORT_PATTERN_KEY+old_uid, old_pattern, 'unicode')
614 614
615 615 post_url = url('admin_settings_issuetracker_save')
616 616 post_data = {
617 617 'new_pattern_pattern_0': pattern,
618 618 'new_pattern_url_0': 'url',
619 619 'new_pattern_prefix_0': 'prefix',
620 620 'new_pattern_description_0': 'description',
621 621 'uid': old_uid,
622 622 'csrf_token': csrf_token
623 623 }
624 624 self.app.post(post_url, post_data, status=302)
625 625 settings = SettingsModel().get_all_settings()
626 626 assert settings[self.PATTERN_KEY+self.new_uid] == pattern
627 627 assert self.PATTERN_KEY+old_uid not in settings
628 628
629 629 @request.addfinalizer
630 630 def cleanup():
631 631 IssueTrackerSettingsModel().delete_entries(self.new_uid)
632 632
633 633 def test_replace_issuetracker_pattern_description(
634 634 self, autologin_user, csrf_token, request, settings_util):
635 635 prefix = 'issuetracker'
636 636 pattern = 'issuetracker_pat'
637 637 self.uid = md5(pattern)
638 638 pattern_key = '_'.join([prefix, 'pat', self.uid])
639 639 rc_pattern_key = '_'.join(['rhodecode', pattern_key])
640 640 desc_key = '_'.join([prefix, 'desc', self.uid])
641 641 rc_desc_key = '_'.join(['rhodecode', desc_key])
642 642 new_description = 'new_description'
643 643
644 644 settings_util.create_rhodecode_setting(
645 645 pattern_key, pattern, 'unicode', cleanup=False)
646 646 settings_util.create_rhodecode_setting(
647 647 desc_key, 'old description', 'unicode', cleanup=False)
648 648
649 649 post_url = url('admin_settings_issuetracker_save')
650 650 post_data = {
651 651 'new_pattern_pattern_0': pattern,
652 652 'new_pattern_url_0': 'url',
653 653 'new_pattern_prefix_0': 'prefix',
654 654 'new_pattern_description_0': new_description,
655 655 'uid': self.uid,
656 656 'csrf_token': csrf_token
657 657 }
658 658 self.app.post(post_url, post_data, status=302)
659 659 settings = SettingsModel().get_all_settings()
660 660 assert settings[rc_pattern_key] == pattern
661 661 assert settings[rc_desc_key] == new_description
662 662
663 663 @request.addfinalizer
664 664 def cleanup():
665 665 IssueTrackerSettingsModel().delete_entries(self.uid)
666 666
667 667 def test_delete_issuetracker_pattern(
668 668 self, autologin_user, backend, csrf_token, settings_util):
669 669 pattern = 'issuetracker_pat'
670 670 uid = md5(pattern)
671 671 settings_util.create_rhodecode_setting(
672 672 self.SHORT_PATTERN_KEY+uid, pattern, 'unicode', cleanup=False)
673 673
674 674 post_url = url('admin_issuetracker_delete')
675 675 post_data = {
676 676 '_method': 'delete',
677 677 'uid': uid,
678 678 'csrf_token': csrf_token
679 679 }
680 680 self.app.post(post_url, post_data, status=302)
681 681 settings = SettingsModel().get_all_settings()
682 682 assert 'rhodecode_%s%s' % (self.SHORT_PATTERN_KEY, uid) not in settings
@@ -1,273 +1,273 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2010-2017 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 import pytest
22 22
23 23 from rhodecode.tests import *
24 24 from rhodecode.tests.fixture import Fixture
25 25 from rhodecode.lib import helpers as h
26 26
27 27 from rhodecode.model.db import Repository
28 28 from rhodecode.model.repo import RepoModel
29 29 from rhodecode.model.user import UserModel
30 30 from rhodecode.model.meta import Session
31 31
32 32 fixture = Fixture()
33 33
34 34
35 35 class _BaseTest(TestController):
36 36
37 37 REPO = None
38 38 REPO_TYPE = None
39 39 NEW_REPO = None
40 40 REPO_FORK = None
41 41
42 42 @pytest.fixture(autouse=True)
43 43 def prepare(self, request, pylonsapp):
44 44 self.username = u'forkuser'
45 45 self.password = u'qweqwe'
46 46 self.u1 = fixture.create_user(self.username, password=self.password,
47 47 email=u'fork_king@rhodecode.org')
48 48 Session().commit()
49 49 self.u1id = self.u1.user_id
50 50 request.addfinalizer(self.cleanup)
51 51
52 52 def cleanup(self):
53 53 u1 = UserModel().get(self.u1id)
54 54 Session().delete(u1)
55 55 Session().commit()
56 56
57 57 def test_index(self):
58 58 self.log_user()
59 59 repo_name = self.REPO
60 60 response = self.app.get(url(controller='forks', action='forks',
61 61 repo_name=repo_name))
62 62
63 63 response.mustcontain("""There are no forks yet""")
64 64
65 65 def test_no_permissions_to_fork(self):
66 66 usr = self.log_user(TEST_USER_REGULAR_LOGIN,
67 67 TEST_USER_REGULAR_PASS)['user_id']
68 68 user_model = UserModel()
69 69 user_model.revoke_perm(usr, 'hg.fork.repository')
70 70 user_model.grant_perm(usr, 'hg.fork.none')
71 71 u = UserModel().get(usr)
72 72 u.inherit_default_permissions = False
73 73 Session().commit()
74 74 # try create a fork
75 75 repo_name = self.REPO
76 76 self.app.post(
77 77 url(controller='forks', action='fork_create', repo_name=repo_name),
78 {'csrf_token': self.csrf_token}, status=403)
78 {'csrf_token': self.csrf_token}, status=404)
79 79
80 80 def test_index_with_fork(self):
81 81 self.log_user()
82 82
83 83 # create a fork
84 84 fork_name = self.REPO_FORK
85 85 description = 'fork of vcs test'
86 86 repo_name = self.REPO
87 87 source_repo = Repository.get_by_repo_name(repo_name)
88 88 creation_args = {
89 89 'repo_name': fork_name,
90 90 'repo_group': '',
91 91 'fork_parent_id': source_repo.repo_id,
92 92 'repo_type': self.REPO_TYPE,
93 93 'description': description,
94 94 'private': 'False',
95 95 'landing_rev': 'rev:tip',
96 96 'csrf_token': self.csrf_token,
97 97 }
98 98
99 99 self.app.post(url(controller='forks', action='fork_create',
100 100 repo_name=repo_name), creation_args)
101 101
102 102 response = self.app.get(url(controller='forks', action='forks',
103 103 repo_name=repo_name))
104 104
105 105 response.mustcontain(
106 106 """<a href="/%s">%s</a>""" % (fork_name, fork_name)
107 107 )
108 108
109 109 # remove this fork
110 110 fixture.destroy_repo(fork_name)
111 111
112 112 def test_fork_create_into_group(self):
113 113 self.log_user()
114 114 group = fixture.create_repo_group('vc')
115 115 group_id = group.group_id
116 116 fork_name = self.REPO_FORK
117 117 fork_name_full = 'vc/%s' % fork_name
118 118 description = 'fork of vcs test'
119 119 repo_name = self.REPO
120 120 source_repo = Repository.get_by_repo_name(repo_name)
121 121 creation_args = {
122 122 'repo_name': fork_name,
123 123 'repo_group': group_id,
124 124 'fork_parent_id': source_repo.repo_id,
125 125 'repo_type': self.REPO_TYPE,
126 126 'description': description,
127 127 'private': 'False',
128 128 'landing_rev': 'rev:tip',
129 129 'csrf_token': self.csrf_token,
130 130 }
131 131 self.app.post(url(controller='forks', action='fork_create',
132 132 repo_name=repo_name), creation_args)
133 133 repo = Repository.get_by_repo_name(fork_name_full)
134 134 assert repo.fork.repo_name == self.REPO
135 135
136 136 # run the check page that triggers the flash message
137 137 response = self.app.get(url('repo_check_home', repo_name=fork_name_full))
138 138 # test if we have a message that fork is ok
139 139 assert_session_flash(response,
140 140 'Forked repository %s as <a href="/%s">%s</a>'
141 141 % (repo_name, fork_name_full, fork_name_full))
142 142
143 143 # test if the fork was created in the database
144 144 fork_repo = Session().query(Repository)\
145 145 .filter(Repository.repo_name == fork_name_full).one()
146 146
147 147 assert fork_repo.repo_name == fork_name_full
148 148 assert fork_repo.fork.repo_name == repo_name
149 149
150 150 # test if the repository is visible in the list ?
151 151 response = self.app.get(h.route_path('repo_summary', repo_name=fork_name_full))
152 152 response.mustcontain(fork_name_full)
153 153 response.mustcontain(self.REPO_TYPE)
154 154
155 155 response.mustcontain('Fork of')
156 156 response.mustcontain('<a href="/%s">%s</a>' % (repo_name, repo_name))
157 157
158 158 fixture.destroy_repo(fork_name_full)
159 159 fixture.destroy_repo_group(group_id)
160 160
161 161 def test_z_fork_create(self):
162 162 self.log_user()
163 163 fork_name = self.REPO_FORK
164 164 description = 'fork of vcs test'
165 165 repo_name = self.REPO
166 166 source_repo = Repository.get_by_repo_name(repo_name)
167 167 creation_args = {
168 168 'repo_name': fork_name,
169 169 'repo_group': '',
170 170 'fork_parent_id': source_repo.repo_id,
171 171 'repo_type': self.REPO_TYPE,
172 172 'description': description,
173 173 'private': 'False',
174 174 'landing_rev': 'rev:tip',
175 175 'csrf_token': self.csrf_token,
176 176 }
177 177 self.app.post(url(controller='forks', action='fork_create',
178 178 repo_name=repo_name), creation_args)
179 179 repo = Repository.get_by_repo_name(self.REPO_FORK)
180 180 assert repo.fork.repo_name == self.REPO
181 181
182 182 # run the check page that triggers the flash message
183 183 response = self.app.get(url('repo_check_home', repo_name=fork_name))
184 184 # test if we have a message that fork is ok
185 185 assert_session_flash(response,
186 186 'Forked repository %s as <a href="/%s">%s</a>'
187 187 % (repo_name, fork_name, fork_name))
188 188
189 189 # test if the fork was created in the database
190 190 fork_repo = Session().query(Repository)\
191 191 .filter(Repository.repo_name == fork_name).one()
192 192
193 193 assert fork_repo.repo_name == fork_name
194 194 assert fork_repo.fork.repo_name == repo_name
195 195
196 196 # test if the repository is visible in the list ?
197 197 response = self.app.get(h.route_path('repo_summary', repo_name=fork_name))
198 198 response.mustcontain(fork_name)
199 199 response.mustcontain(self.REPO_TYPE)
200 200 response.mustcontain('Fork of')
201 201 response.mustcontain('<a href="/%s">%s</a>' % (repo_name, repo_name))
202 202
203 203 def test_zz_fork_permission_page(self):
204 204 usr = self.log_user(self.username, self.password)['user_id']
205 205 repo_name = self.REPO
206 206
207 207 forks = Repository.query()\
208 208 .filter(Repository.repo_type == self.REPO_TYPE)\
209 209 .filter(Repository.fork_id != None).all()
210 210 assert 1 == len(forks)
211 211
212 212 # set read permissions for this
213 213 RepoModel().grant_user_permission(repo=forks[0],
214 214 user=usr,
215 215 perm='repository.read')
216 216 Session().commit()
217 217
218 218 response = self.app.get(url(controller='forks', action='forks',
219 219 repo_name=repo_name))
220 220
221 221 response.mustcontain('fork of vcs test')
222 222
223 223 def test_zzz_fork_permission_page(self):
224 224 usr = self.log_user(self.username, self.password)['user_id']
225 225 repo_name = self.REPO
226 226
227 227 forks = Repository.query()\
228 228 .filter(Repository.repo_type == self.REPO_TYPE)\
229 229 .filter(Repository.fork_id != None).all()
230 230 assert 1 == len(forks)
231 231
232 232 # set none
233 233 RepoModel().grant_user_permission(repo=forks[0],
234 234 user=usr, perm='repository.none')
235 235 Session().commit()
236 236 # fork shouldn't be there
237 237 response = self.app.get(url(controller='forks', action='forks',
238 238 repo_name=repo_name))
239 239 response.mustcontain('There are no forks yet')
240 240
241 241
242 242 class TestGIT(_BaseTest):
243 243 REPO = GIT_REPO
244 244 NEW_REPO = NEW_GIT_REPO
245 245 REPO_TYPE = 'git'
246 246 REPO_FORK = GIT_FORK
247 247
248 248
249 249 class TestHG(_BaseTest):
250 250 REPO = HG_REPO
251 251 NEW_REPO = NEW_HG_REPO
252 252 REPO_TYPE = 'hg'
253 253 REPO_FORK = HG_FORK
254 254
255 255
256 256 @pytest.mark.usefixtures('app', 'autologin_user')
257 257 @pytest.mark.skip_backends('git','hg')
258 258 class TestSVNFork(object):
259 259
260 260 def test_fork_redirects(self, backend):
261 261 denied_actions = ['fork','fork_create']
262 262 for action in denied_actions:
263 263 response = self.app.get(url(
264 264 controller='forks', action=action,
265 265 repo_name=backend.repo_name))
266 266 assert response.status_int == 302
267 267
268 268 # Not allowed, redirect to the summary
269 269 redirected = response.follow()
270 270 summary_url = h.route_path('repo_summary', repo_name=backend.repo_name)
271 271
272 272 # URL adds leading slash and path doesn't have it
273 273 assert redirected.request.path == summary_url
@@ -1,264 +1,264 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2010-2017 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 import mock
22 22 import pytest
23 23 from webob.exc import HTTPNotFound
24 24
25 25 import rhodecode
26 26 from rhodecode.model.db import Integration
27 27 from rhodecode.model.meta import Session
28 28 from rhodecode.tests import assert_session_flash, url, TEST_USER_ADMIN_LOGIN
29 29 from rhodecode.tests.utils import AssertResponse
30 30 from rhodecode.integrations import integration_type_registry
31 31 from rhodecode.config.routing import ADMIN_PREFIX
32 32
33 33
34 34 @pytest.mark.usefixtures('app', 'autologin_user')
35 35 class TestIntegrationsView(object):
36 36 pass
37 37
38 38
39 39 class TestGlobalIntegrationsView(TestIntegrationsView):
40 40 def test_index_no_integrations(self, app):
41 41 url = ADMIN_PREFIX + '/integrations'
42 42 response = app.get(url)
43 43
44 44 assert response.status_code == 200
45 45 assert 'exist yet' in response.body
46 46
47 47 def test_index_with_integrations(self, app, global_integration_stub):
48 48 url = ADMIN_PREFIX + '/integrations'
49 49 response = app.get(url)
50 50
51 51 assert response.status_code == 200
52 52 assert 'exist yet' not in response.body
53 53 assert global_integration_stub.name in response.body
54 54
55 55 def test_new_integration_page(self, app):
56 56 url = ADMIN_PREFIX + '/integrations/new'
57 57
58 58 response = app.get(url)
59 59
60 60 assert response.status_code == 200
61 61
62 62 for integration_key in integration_type_registry:
63 63 nurl = (ADMIN_PREFIX + '/integrations/{integration}/new').format(
64 64 integration=integration_key)
65 65 assert nurl in response.body
66 66
67 67 @pytest.mark.parametrize(
68 68 'IntegrationType', integration_type_registry.values())
69 69 def test_get_create_integration_page(self, app, IntegrationType):
70 70 url = ADMIN_PREFIX + '/integrations/{integration_key}/new'.format(
71 71 integration_key=IntegrationType.key)
72 72
73 73 response = app.get(url)
74 74
75 75 assert response.status_code == 200
76 76 assert IntegrationType.display_name in response.body
77 77
78 78 def test_post_integration_page(self, app, StubIntegrationType, csrf_token,
79 79 test_repo_group, backend_random):
80 80 url = ADMIN_PREFIX + '/integrations/{integration_key}/new'.format(
81 81 integration_key=StubIntegrationType.key)
82 82
83 83 _post_integration_test_helper(app, url, csrf_token, admin_view=True,
84 84 repo=backend_random.repo, repo_group=test_repo_group)
85 85
86 86
87 87 class TestRepoGroupIntegrationsView(TestIntegrationsView):
88 88 def test_index_no_integrations(self, app, test_repo_group):
89 89 url = '/{repo_group_name}/settings/integrations'.format(
90 90 repo_group_name=test_repo_group.group_name)
91 91 response = app.get(url)
92 92
93 93 assert response.status_code == 200
94 94 assert 'exist yet' in response.body
95 95
96 96 def test_index_with_integrations(self, app, test_repo_group,
97 97 repogroup_integration_stub):
98 98 url = '/{repo_group_name}/settings/integrations'.format(
99 99 repo_group_name=test_repo_group.group_name)
100 100
101 101 stub_name = repogroup_integration_stub.name
102 102 response = app.get(url)
103 103
104 104 assert response.status_code == 200
105 105 assert 'exist yet' not in response.body
106 106 assert stub_name in response.body
107 107
108 108 def test_new_integration_page(self, app, test_repo_group):
109 109 repo_group_name = test_repo_group.group_name
110 110 url = '/{repo_group_name}/settings/integrations/new'.format(
111 111 repo_group_name=test_repo_group.group_name)
112 112
113 113 response = app.get(url)
114 114
115 115 assert response.status_code == 200
116 116
117 117 for integration_key in integration_type_registry:
118 118 nurl = ('/{repo_group_name}/settings/integrations'
119 119 '/{integration}/new').format(
120 120 repo_group_name=repo_group_name,
121 121 integration=integration_key)
122 122
123 123 assert nurl in response.body
124 124
125 125 @pytest.mark.parametrize(
126 126 'IntegrationType', integration_type_registry.values())
127 127 def test_get_create_integration_page(self, app, test_repo_group,
128 128 IntegrationType):
129 129 repo_group_name = test_repo_group.group_name
130 130 url = ('/{repo_group_name}/settings/integrations/{integration_key}/new'
131 131 ).format(repo_group_name=repo_group_name,
132 132 integration_key=IntegrationType.key)
133 133
134 134 response = app.get(url)
135 135
136 136 assert response.status_code == 200
137 137 assert IntegrationType.display_name in response.body
138 138
139 139 def test_post_integration_page(self, app, test_repo_group, backend_random,
140 140 StubIntegrationType, csrf_token):
141 141 repo_group_name = test_repo_group.group_name
142 142 url = ('/{repo_group_name}/settings/integrations/{integration_key}/new'
143 143 ).format(repo_group_name=repo_group_name,
144 144 integration_key=StubIntegrationType.key)
145 145
146 146 _post_integration_test_helper(app, url, csrf_token, admin_view=False,
147 147 repo=backend_random.repo, repo_group=test_repo_group)
148 148
149 149
150 150 class TestRepoIntegrationsView(TestIntegrationsView):
151 151 def test_index_no_integrations(self, app, backend_random):
152 152 url = '/{repo_name}/settings/integrations'.format(
153 153 repo_name=backend_random.repo.repo_name)
154 154 response = app.get(url)
155 155
156 156 assert response.status_code == 200
157 157 assert 'exist yet' in response.body
158 158
159 159 def test_index_with_integrations(self, app, repo_integration_stub):
160 160 url = '/{repo_name}/settings/integrations'.format(
161 161 repo_name=repo_integration_stub.repo.repo_name)
162 162 stub_name = repo_integration_stub.name
163 163
164 164 response = app.get(url)
165 165
166 166 assert response.status_code == 200
167 167 assert stub_name in response.body
168 168 assert 'exist yet' not in response.body
169 169
170 170 def test_new_integration_page(self, app, backend_random):
171 171 repo_name = backend_random.repo.repo_name
172 172 url = '/{repo_name}/settings/integrations/new'.format(
173 173 repo_name=repo_name)
174 174
175 175 response = app.get(url)
176 176
177 177 assert response.status_code == 200
178 178
179 179 for integration_key in integration_type_registry:
180 180 nurl = ('/{repo_name}/settings/integrations'
181 181 '/{integration}/new').format(
182 182 repo_name=repo_name,
183 183 integration=integration_key)
184 184
185 185 assert nurl in response.body
186 186
187 187 @pytest.mark.parametrize(
188 188 'IntegrationType', integration_type_registry.values())
189 189 def test_get_create_integration_page(self, app, backend_random,
190 190 IntegrationType):
191 191 repo_name = backend_random.repo.repo_name
192 192 url = '/{repo_name}/settings/integrations/{integration_key}/new'.format(
193 193 repo_name=repo_name, integration_key=IntegrationType.key)
194 194
195 195 response = app.get(url)
196 196
197 197 assert response.status_code == 200
198 198 assert IntegrationType.display_name in response.body
199 199
200 200 def test_post_integration_page(self, app, backend_random, test_repo_group,
201 201 StubIntegrationType, csrf_token):
202 202 repo_name = backend_random.repo.repo_name
203 203 url = '/{repo_name}/settings/integrations/{integration_key}/new'.format(
204 204 repo_name=repo_name, integration_key=StubIntegrationType.key)
205 205
206 206 _post_integration_test_helper(app, url, csrf_token, admin_view=False,
207 207 repo=backend_random.repo, repo_group=test_repo_group)
208 208
209 209
210 210 def _post_integration_test_helper(app, url, csrf_token, repo, repo_group,
211 211 admin_view):
212 212 """
213 213 Posts form data to create integration at the url given then deletes it and
214 214 checks if the redirect url is correct.
215 215 """
216 216
217 app.post(url, params={}, status=403) # missing csrf check
217 app.post(url, params={}, status=403) # missing csrf check
218 218 response = app.post(url, params={'csrf_token': csrf_token})
219 219 assert response.status_code == 200
220 220 assert 'Errors exist' in response.body
221 221
222 222 scopes_destinations = [
223 223 ('global',
224 224 ADMIN_PREFIX + '/integrations'),
225 225 ('root-repos',
226 226 ADMIN_PREFIX + '/integrations'),
227 227 ('repo:%s' % repo.repo_name,
228 228 '/%s/settings/integrations' % repo.repo_name),
229 229 ('repogroup:%s' % repo_group.group_name,
230 230 '/%s/settings/integrations' % repo_group.group_name),
231 231 ('repogroup-recursive:%s' % repo_group.group_name,
232 232 '/%s/settings/integrations' % repo_group.group_name),
233 233 ]
234 234
235 235 for scope, destination in scopes_destinations:
236 236 if admin_view:
237 237 destination = ADMIN_PREFIX + '/integrations'
238 238
239 239 form_data = [
240 240 ('csrf_token', csrf_token),
241 241 ('__start__', 'options:mapping'),
242 242 ('name', 'test integration'),
243 243 ('scope', scope),
244 244 ('enabled', 'true'),
245 245 ('__end__', 'options:mapping'),
246 246 ('__start__', 'settings:mapping'),
247 247 ('test_int_field', '34'),
248 248 ('test_string_field', ''), # empty value on purpose as it's required
249 249 ('__end__', 'settings:mapping'),
250 250 ]
251 251 errors_response = app.post(url, form_data)
252 252 assert 'Errors exist' in errors_response.body
253 253
254 254 form_data[-2] = ('test_string_field', 'data!')
255 255 assert Session().query(Integration).count() == 0
256 256 created_response = app.post(url, form_data)
257 257 assert Session().query(Integration).count() == 1
258 258
259 259 delete_response = app.post(
260 260 created_response.location,
261 261 params={'csrf_token': csrf_token, 'delete': 'delete'})
262 262
263 263 assert Session().query(Integration).count() == 0
264 264 assert delete_response.location.endswith(destination)
General Comments 0
You need to be logged in to leave comments. Login now