##// END OF EJS Templates
auth: make owner of user group give proper admin permissions to the user group....
marcink -
r1443:6321ed72 default
parent child Browse files
Show More
@@ -0,0 +1,128 b''
1 # -*- coding: utf-8 -*-
2
3 # Copyright (C) 2016-2017 RhodeCode GmbH
4 #
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU Affero General Public License, version 3
7 # (only), as published by the Free Software Foundation.
8 #
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
13 #
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 #
17 # This program is dual-licensed. If you wish to learn more about the
18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20
21 import pytest
22
23 from rhodecode.tests import (
24 TestController, url, assert_session_flash, link_to)
25 from rhodecode.model.db import User, UserGroup
26 from rhodecode.model.meta import Session
27 from rhodecode.tests.fixture import Fixture
28
29
30 fixture = Fixture()
31
32
33 class TestAdminUsersGroupsController(TestController):
34
35 def test_regular_user_cannot_see_admin_interfaces(self, user_util):
36 user = user_util.create_user(password='qweqwe')
37 self.log_user(user.username, 'qweqwe')
38
39 # check if in home view, such user doesn't see the "admin" menus
40 response = self.app.get(url('home'))
41
42 assert_response = response.assert_response()
43
44 assert_response.no_element_exists('li.local-admin-repos')
45 assert_response.no_element_exists('li.local-admin-repo-groups')
46 assert_response.no_element_exists('li.local-admin-user-groups')
47
48 response = self.app.get(url('repos'), status=200)
49 response.mustcontain('data: []')
50
51 response = self.app.get(url('repo_groups'), status=200)
52 response.mustcontain('data: []')
53
54 response = self.app.get(url('users_groups'), status=200)
55 response.mustcontain('data: []')
56
57 def test_regular_user_can_see_admin_interfaces_if_owner(self, user_util):
58 user = user_util.create_user(password='qweqwe')
59 username = user.username
60
61 repo = user_util.create_repo(owner=username)
62 repo_name = repo.repo_name
63
64 repo_group = user_util.create_repo_group(owner=username)
65 repo_group_name = repo_group.group_name
66
67 user_group = user_util.create_user_group(owner=username)
68 user_group_name = user_group.users_group_name
69
70 self.log_user(username, 'qweqwe')
71 # check if in home view, such user doesn't see the "admin" menus
72 response = self.app.get(url('home'))
73
74 assert_response = response.assert_response()
75
76 assert_response.one_element_exists('li.local-admin-repos')
77 assert_response.one_element_exists('li.local-admin-repo-groups')
78 assert_response.one_element_exists('li.local-admin-user-groups')
79
80 # admin interfaces have visible elements
81 response = self.app.get(url('repos'), status=200)
82 response.mustcontain('"name_raw": "{}"'.format(repo_name))
83
84 response = self.app.get(url('repo_groups'), status=200)
85 response.mustcontain('"name_raw": "{}"'.format(repo_group_name))
86
87 response = self.app.get(url('users_groups'), status=200)
88 response.mustcontain('"group_name_raw": "{}"'.format(user_group_name))
89
90 def test_regular_user_can_see_admin_interfaces_if_admin_perm(self, user_util):
91 user = user_util.create_user(password='qweqwe')
92 username = user.username
93
94 repo = user_util.create_repo()
95 repo_name = repo.repo_name
96
97 repo_group = user_util.create_repo_group()
98 repo_group_name = repo_group.group_name
99
100 user_group = user_util.create_user_group()
101 user_group_name = user_group.users_group_name
102
103 user_util.grant_user_permission_to_repo(
104 repo, user, 'repository.admin')
105 user_util.grant_user_permission_to_repo_group(
106 repo_group, user, 'group.admin')
107 user_util.grant_user_permission_to_user_group(
108 user_group, user, 'usergroup.admin')
109
110 self.log_user(username, 'qweqwe')
111 # check if in home view, such user doesn't see the "admin" menus
112 response = self.app.get(url('home'))
113
114 assert_response = response.assert_response()
115
116 assert_response.one_element_exists('li.local-admin-repos')
117 assert_response.one_element_exists('li.local-admin-repo-groups')
118 assert_response.one_element_exists('li.local-admin-user-groups')
119
120 # admin interfaces have visible elements
121 response = self.app.get(url('repos'), status=200)
122 response.mustcontain('"name_raw": "{}"'.format(repo_name))
123
124 response = self.app.get(url('repo_groups'), status=200)
125 response.mustcontain('"name_raw": "{}"'.format(repo_group_name))
126
127 response = self.app.get(url('users_groups'), status=200)
128 response.mustcontain('"group_name_raw": "{}"'.format(user_group_name))
@@ -1,1909 +1,1929 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2010-2017 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 """
22 22 authentication and permission libraries
23 23 """
24 24
25 25 import inspect
26 26 import collections
27 27 import fnmatch
28 28 import hashlib
29 29 import itertools
30 30 import logging
31 31 import os
32 32 import random
33 33 import time
34 34 import traceback
35 35 from functools import wraps
36 36
37 37 import ipaddress
38 38 from pyramid.httpexceptions import HTTPForbidden
39 39 from pylons import url, request
40 40 from pylons.controllers.util import abort, redirect
41 41 from pylons.i18n.translation import _
42 42 from sqlalchemy import or_
43 43 from sqlalchemy.orm.exc import ObjectDeletedError
44 44 from sqlalchemy.orm import joinedload
45 45 from zope.cachedescriptors.property import Lazy as LazyProperty
46 46
47 47 import rhodecode
48 48 from rhodecode.model import meta
49 49 from rhodecode.model.meta import Session
50 50 from rhodecode.model.user import UserModel
51 51 from rhodecode.model.db import (
52 52 User, Repository, Permission, UserToPerm, UserGroupToPerm, UserGroupMember,
53 53 UserIpMap, UserApiKeys, RepoGroup)
54 54 from rhodecode.lib import caches
55 55 from rhodecode.lib.utils2 import safe_unicode, aslist, safe_str, md5
56 56 from rhodecode.lib.utils import (
57 57 get_repo_slug, get_repo_group_slug, get_user_group_slug)
58 58 from rhodecode.lib.caching_query import FromCache
59 59
60 60
61 61 if rhodecode.is_unix:
62 62 import bcrypt
63 63
64 64 log = logging.getLogger(__name__)
65 65
66 66 csrf_token_key = "csrf_token"
67 67
68 68
69 69 class PasswordGenerator(object):
70 70 """
71 71 This is a simple class for generating password from different sets of
72 72 characters
73 73 usage::
74 74
75 75 passwd_gen = PasswordGenerator()
76 76 #print 8-letter password containing only big and small letters
77 77 of alphabet
78 78 passwd_gen.gen_password(8, passwd_gen.ALPHABETS_BIG_SMALL)
79 79 """
80 80 ALPHABETS_NUM = r'''1234567890'''
81 81 ALPHABETS_SMALL = r'''qwertyuiopasdfghjklzxcvbnm'''
82 82 ALPHABETS_BIG = r'''QWERTYUIOPASDFGHJKLZXCVBNM'''
83 83 ALPHABETS_SPECIAL = r'''`-=[]\;',./~!@#$%^&*()_+{}|:"<>?'''
84 84 ALPHABETS_FULL = ALPHABETS_BIG + ALPHABETS_SMALL \
85 85 + ALPHABETS_NUM + ALPHABETS_SPECIAL
86 86 ALPHABETS_ALPHANUM = ALPHABETS_BIG + ALPHABETS_SMALL + ALPHABETS_NUM
87 87 ALPHABETS_BIG_SMALL = ALPHABETS_BIG + ALPHABETS_SMALL
88 88 ALPHABETS_ALPHANUM_BIG = ALPHABETS_BIG + ALPHABETS_NUM
89 89 ALPHABETS_ALPHANUM_SMALL = ALPHABETS_SMALL + ALPHABETS_NUM
90 90
91 91 def __init__(self, passwd=''):
92 92 self.passwd = passwd
93 93
94 94 def gen_password(self, length, type_=None):
95 95 if type_ is None:
96 96 type_ = self.ALPHABETS_FULL
97 97 self.passwd = ''.join([random.choice(type_) for _ in xrange(length)])
98 98 return self.passwd
99 99
100 100
101 101 class _RhodeCodeCryptoBase(object):
102 102 ENC_PREF = None
103 103
104 104 def hash_create(self, str_):
105 105 """
106 106 hash the string using
107 107
108 108 :param str_: password to hash
109 109 """
110 110 raise NotImplementedError
111 111
112 112 def hash_check_with_upgrade(self, password, hashed):
113 113 """
114 114 Returns tuple in which first element is boolean that states that
115 115 given password matches it's hashed version, and the second is new hash
116 116 of the password, in case this password should be migrated to new
117 117 cipher.
118 118 """
119 119 checked_hash = self.hash_check(password, hashed)
120 120 return checked_hash, None
121 121
122 122 def hash_check(self, password, hashed):
123 123 """
124 124 Checks matching password with it's hashed value.
125 125
126 126 :param password: password
127 127 :param hashed: password in hashed form
128 128 """
129 129 raise NotImplementedError
130 130
131 131 def _assert_bytes(self, value):
132 132 """
133 133 Passing in an `unicode` object can lead to hard to detect issues
134 134 if passwords contain non-ascii characters. Doing a type check
135 135 during runtime, so that such mistakes are detected early on.
136 136 """
137 137 if not isinstance(value, str):
138 138 raise TypeError(
139 139 "Bytestring required as input, got %r." % (value, ))
140 140
141 141
142 142 class _RhodeCodeCryptoBCrypt(_RhodeCodeCryptoBase):
143 143 ENC_PREF = '$2a$10'
144 144
145 145 def hash_create(self, str_):
146 146 self._assert_bytes(str_)
147 147 return bcrypt.hashpw(str_, bcrypt.gensalt(10))
148 148
149 149 def hash_check_with_upgrade(self, password, hashed):
150 150 """
151 151 Returns tuple in which first element is boolean that states that
152 152 given password matches it's hashed version, and the second is new hash
153 153 of the password, in case this password should be migrated to new
154 154 cipher.
155 155
156 156 This implements special upgrade logic which works like that:
157 157 - check if the given password == bcrypted hash, if yes then we
158 158 properly used password and it was already in bcrypt. Proceed
159 159 without any changes
160 160 - if bcrypt hash check is not working try with sha256. If hash compare
161 161 is ok, it means we using correct but old hashed password. indicate
162 162 hash change and proceed
163 163 """
164 164
165 165 new_hash = None
166 166
167 167 # regular pw check
168 168 password_match_bcrypt = self.hash_check(password, hashed)
169 169
170 170 # now we want to know if the password was maybe from sha256
171 171 # basically calling _RhodeCodeCryptoSha256().hash_check()
172 172 if not password_match_bcrypt:
173 173 if _RhodeCodeCryptoSha256().hash_check(password, hashed):
174 174 new_hash = self.hash_create(password) # make new bcrypt hash
175 175 password_match_bcrypt = True
176 176
177 177 return password_match_bcrypt, new_hash
178 178
179 179 def hash_check(self, password, hashed):
180 180 """
181 181 Checks matching password with it's hashed value.
182 182
183 183 :param password: password
184 184 :param hashed: password in hashed form
185 185 """
186 186 self._assert_bytes(password)
187 187 try:
188 188 return bcrypt.hashpw(password, hashed) == hashed
189 189 except ValueError as e:
190 190 # we're having a invalid salt here probably, we should not crash
191 191 # just return with False as it would be a wrong password.
192 192 log.debug('Failed to check password hash using bcrypt %s',
193 193 safe_str(e))
194 194
195 195 return False
196 196
197 197
198 198 class _RhodeCodeCryptoSha256(_RhodeCodeCryptoBase):
199 199 ENC_PREF = '_'
200 200
201 201 def hash_create(self, str_):
202 202 self._assert_bytes(str_)
203 203 return hashlib.sha256(str_).hexdigest()
204 204
205 205 def hash_check(self, password, hashed):
206 206 """
207 207 Checks matching password with it's hashed value.
208 208
209 209 :param password: password
210 210 :param hashed: password in hashed form
211 211 """
212 212 self._assert_bytes(password)
213 213 return hashlib.sha256(password).hexdigest() == hashed
214 214
215 215
216 216 class _RhodeCodeCryptoMd5(_RhodeCodeCryptoBase):
217 217 ENC_PREF = '_'
218 218
219 219 def hash_create(self, str_):
220 220 self._assert_bytes(str_)
221 221 return hashlib.md5(str_).hexdigest()
222 222
223 223 def hash_check(self, password, hashed):
224 224 """
225 225 Checks matching password with it's hashed value.
226 226
227 227 :param password: password
228 228 :param hashed: password in hashed form
229 229 """
230 230 self._assert_bytes(password)
231 231 return hashlib.md5(password).hexdigest() == hashed
232 232
233 233
234 234 def crypto_backend():
235 235 """
236 236 Return the matching crypto backend.
237 237
238 238 Selection is based on if we run tests or not, we pick md5 backend to run
239 239 tests faster since BCRYPT is expensive to calculate
240 240 """
241 241 if rhodecode.is_test:
242 242 RhodeCodeCrypto = _RhodeCodeCryptoMd5()
243 243 else:
244 244 RhodeCodeCrypto = _RhodeCodeCryptoBCrypt()
245 245
246 246 return RhodeCodeCrypto
247 247
248 248
249 249 def get_crypt_password(password):
250 250 """
251 251 Create the hash of `password` with the active crypto backend.
252 252
253 253 :param password: The cleartext password.
254 254 :type password: unicode
255 255 """
256 256 password = safe_str(password)
257 257 return crypto_backend().hash_create(password)
258 258
259 259
260 260 def check_password(password, hashed):
261 261 """
262 262 Check if the value in `password` matches the hash in `hashed`.
263 263
264 264 :param password: The cleartext password.
265 265 :type password: unicode
266 266
267 267 :param hashed: The expected hashed version of the password.
268 268 :type hashed: The hash has to be passed in in text representation.
269 269 """
270 270 password = safe_str(password)
271 271 return crypto_backend().hash_check(password, hashed)
272 272
273 273
274 274 def generate_auth_token(data, salt=None):
275 275 """
276 276 Generates API KEY from given string
277 277 """
278 278
279 279 if salt is None:
280 280 salt = os.urandom(16)
281 281 return hashlib.sha1(safe_str(data) + salt).hexdigest()
282 282
283 283
284 284 class CookieStoreWrapper(object):
285 285
286 286 def __init__(self, cookie_store):
287 287 self.cookie_store = cookie_store
288 288
289 289 def __repr__(self):
290 290 return 'CookieStore<%s>' % (self.cookie_store)
291 291
292 292 def get(self, key, other=None):
293 293 if isinstance(self.cookie_store, dict):
294 294 return self.cookie_store.get(key, other)
295 295 elif isinstance(self.cookie_store, AuthUser):
296 296 return self.cookie_store.__dict__.get(key, other)
297 297
298 298
299 299 def _cached_perms_data(user_id, scope, user_is_admin,
300 300 user_inherit_default_permissions, explicit, algo):
301 301
302 302 permissions = PermissionCalculator(
303 303 user_id, scope, user_is_admin, user_inherit_default_permissions,
304 304 explicit, algo)
305 305 return permissions.calculate()
306 306
307 307 class PermOrigin:
308 308 ADMIN = 'superadmin'
309 309
310 310 REPO_USER = 'user:%s'
311 311 REPO_USERGROUP = 'usergroup:%s'
312 312 REPO_OWNER = 'repo.owner'
313 313 REPO_DEFAULT = 'repo.default'
314 314 REPO_PRIVATE = 'repo.private'
315 315
316 316 REPOGROUP_USER = 'user:%s'
317 317 REPOGROUP_USERGROUP = 'usergroup:%s'
318 318 REPOGROUP_OWNER = 'group.owner'
319 319 REPOGROUP_DEFAULT = 'group.default'
320 320
321 321 USERGROUP_USER = 'user:%s'
322 322 USERGROUP_USERGROUP = 'usergroup:%s'
323 323 USERGROUP_OWNER = 'usergroup.owner'
324 324 USERGROUP_DEFAULT = 'usergroup.default'
325 325
326 326
327 327 class PermOriginDict(dict):
328 328 """
329 329 A special dict used for tracking permissions along with their origins.
330 330
331 331 `__setitem__` has been overridden to expect a tuple(perm, origin)
332 332 `__getitem__` will return only the perm
333 333 `.perm_origin_stack` will return the stack of (perm, origin) set per key
334 334
335 335 >>> perms = PermOriginDict()
336 336 >>> perms['resource'] = 'read', 'default'
337 337 >>> perms['resource']
338 338 'read'
339 339 >>> perms['resource'] = 'write', 'admin'
340 340 >>> perms['resource']
341 341 'write'
342 342 >>> perms.perm_origin_stack
343 343 {'resource': [('read', 'default'), ('write', 'admin')]}
344 344 """
345 345
346 346
347 347 def __init__(self, *args, **kw):
348 348 dict.__init__(self, *args, **kw)
349 349 self.perm_origin_stack = {}
350 350
351 351 def __setitem__(self, key, (perm, origin)):
352 352 self.perm_origin_stack.setdefault(key, []).append((perm, origin))
353 353 dict.__setitem__(self, key, perm)
354 354
355 355
356 356 class PermissionCalculator(object):
357 357
358 358 def __init__(
359 359 self, user_id, scope, user_is_admin,
360 360 user_inherit_default_permissions, explicit, algo):
361 361 self.user_id = user_id
362 362 self.user_is_admin = user_is_admin
363 363 self.inherit_default_permissions = user_inherit_default_permissions
364 364 self.explicit = explicit
365 365 self.algo = algo
366 366
367 367 scope = scope or {}
368 368 self.scope_repo_id = scope.get('repo_id')
369 369 self.scope_repo_group_id = scope.get('repo_group_id')
370 370 self.scope_user_group_id = scope.get('user_group_id')
371 371
372 372 self.default_user_id = User.get_default_user(cache=True).user_id
373 373
374 374 self.permissions_repositories = PermOriginDict()
375 375 self.permissions_repository_groups = PermOriginDict()
376 376 self.permissions_user_groups = PermOriginDict()
377 377 self.permissions_global = set()
378 378
379 379 self.default_repo_perms = Permission.get_default_repo_perms(
380 380 self.default_user_id, self.scope_repo_id)
381 381 self.default_repo_groups_perms = Permission.get_default_group_perms(
382 382 self.default_user_id, self.scope_repo_group_id)
383 383 self.default_user_group_perms = \
384 384 Permission.get_default_user_group_perms(
385 385 self.default_user_id, self.scope_user_group_id)
386 386
387 387 def calculate(self):
388 388 if self.user_is_admin:
389 389 return self._admin_permissions()
390 390
391 391 self._calculate_global_default_permissions()
392 392 self._calculate_global_permissions()
393 393 self._calculate_default_permissions()
394 394 self._calculate_repository_permissions()
395 395 self._calculate_repository_group_permissions()
396 396 self._calculate_user_group_permissions()
397 397 return self._permission_structure()
398 398
399 399 def _admin_permissions(self):
400 400 """
401 401 admin user have all default rights for repositories
402 402 and groups set to admin
403 403 """
404 404 self.permissions_global.add('hg.admin')
405 405 self.permissions_global.add('hg.create.write_on_repogroup.true')
406 406
407 407 # repositories
408 408 for perm in self.default_repo_perms:
409 409 r_k = perm.UserRepoToPerm.repository.repo_name
410 410 p = 'repository.admin'
411 411 self.permissions_repositories[r_k] = p, PermOrigin.ADMIN
412 412
413 413 # repository groups
414 414 for perm in self.default_repo_groups_perms:
415 415 rg_k = perm.UserRepoGroupToPerm.group.group_name
416 416 p = 'group.admin'
417 417 self.permissions_repository_groups[rg_k] = p, PermOrigin.ADMIN
418 418
419 419 # user groups
420 420 for perm in self.default_user_group_perms:
421 421 u_k = perm.UserUserGroupToPerm.user_group.users_group_name
422 422 p = 'usergroup.admin'
423 423 self.permissions_user_groups[u_k] = p, PermOrigin.ADMIN
424 424
425 425 return self._permission_structure()
426 426
427 427 def _calculate_global_default_permissions(self):
428 428 """
429 429 global permissions taken from the default user
430 430 """
431 431 default_global_perms = UserToPerm.query()\
432 432 .filter(UserToPerm.user_id == self.default_user_id)\
433 433 .options(joinedload(UserToPerm.permission))
434 434
435 435 for perm in default_global_perms:
436 436 self.permissions_global.add(perm.permission.permission_name)
437 437
438 438 def _calculate_global_permissions(self):
439 439 """
440 440 Set global system permissions with user permissions or permissions
441 441 taken from the user groups of the current user.
442 442
443 443 The permissions include repo creating, repo group creating, forking
444 444 etc.
445 445 """
446 446
447 447 # now we read the defined permissions and overwrite what we have set
448 448 # before those can be configured from groups or users explicitly.
449 449
450 450 # TODO: johbo: This seems to be out of sync, find out the reason
451 451 # for the comment below and update it.
452 452
453 453 # In case we want to extend this list we should be always in sync with
454 454 # User.DEFAULT_USER_PERMISSIONS definitions
455 455 _configurable = frozenset([
456 456 'hg.fork.none', 'hg.fork.repository',
457 457 'hg.create.none', 'hg.create.repository',
458 458 'hg.usergroup.create.false', 'hg.usergroup.create.true',
459 459 'hg.repogroup.create.false', 'hg.repogroup.create.true',
460 460 'hg.create.write_on_repogroup.false',
461 461 'hg.create.write_on_repogroup.true',
462 462 'hg.inherit_default_perms.false', 'hg.inherit_default_perms.true'
463 463 ])
464 464
465 465 # USER GROUPS comes first user group global permissions
466 466 user_perms_from_users_groups = Session().query(UserGroupToPerm)\
467 467 .options(joinedload(UserGroupToPerm.permission))\
468 468 .join((UserGroupMember, UserGroupToPerm.users_group_id ==
469 469 UserGroupMember.users_group_id))\
470 470 .filter(UserGroupMember.user_id == self.user_id)\
471 471 .order_by(UserGroupToPerm.users_group_id)\
472 472 .all()
473 473
474 474 # need to group here by groups since user can be in more than
475 475 # one group, so we get all groups
476 476 _explicit_grouped_perms = [
477 477 [x, list(y)] for x, y in
478 478 itertools.groupby(user_perms_from_users_groups,
479 479 lambda _x: _x.users_group)]
480 480
481 481 for gr, perms in _explicit_grouped_perms:
482 482 # since user can be in multiple groups iterate over them and
483 483 # select the lowest permissions first (more explicit)
484 484 # TODO: marcink: do this^^
485 485
486 486 # group doesn't inherit default permissions so we actually set them
487 487 if not gr.inherit_default_permissions:
488 488 # NEED TO IGNORE all previously set configurable permissions
489 489 # and replace them with explicitly set from this user
490 490 # group permissions
491 491 self.permissions_global = self.permissions_global.difference(
492 492 _configurable)
493 493 for perm in perms:
494 494 self.permissions_global.add(perm.permission.permission_name)
495 495
496 496 # user explicit global permissions
497 497 user_perms = Session().query(UserToPerm)\
498 498 .options(joinedload(UserToPerm.permission))\
499 499 .filter(UserToPerm.user_id == self.user_id).all()
500 500
501 501 if not self.inherit_default_permissions:
502 502 # NEED TO IGNORE all configurable permissions and
503 503 # replace them with explicitly set from this user permissions
504 504 self.permissions_global = self.permissions_global.difference(
505 505 _configurable)
506 506 for perm in user_perms:
507 507 self.permissions_global.add(perm.permission.permission_name)
508 508
509 509 def _calculate_default_permissions(self):
510 510 """
511 511 Set default user permissions for repositories, repository groups
512 512 taken from the default user.
513 513
514 514 Calculate inheritance of object permissions based on what we have now
515 515 in GLOBAL permissions. We check if .false is in GLOBAL since this is
516 516 explicitly set. Inherit is the opposite of .false being there.
517 517
518 518 .. note::
519 519
520 520 the syntax is little bit odd but what we need to check here is
521 521 the opposite of .false permission being in the list so even for
522 522 inconsistent state when both .true/.false is there
523 523 .false is more important
524 524
525 525 """
526 526 user_inherit_object_permissions = not ('hg.inherit_default_perms.false'
527 527 in self.permissions_global)
528 528
529 529 # defaults for repositories, taken from `default` user permissions
530 530 # on given repo
531 531 for perm in self.default_repo_perms:
532 532 r_k = perm.UserRepoToPerm.repository.repo_name
533 533 o = PermOrigin.REPO_DEFAULT
534 534 if perm.Repository.private and not (
535 535 perm.Repository.user_id == self.user_id):
536 536 # disable defaults for private repos,
537 537 p = 'repository.none'
538 538 o = PermOrigin.REPO_PRIVATE
539 539 elif perm.Repository.user_id == self.user_id:
540 540 # set admin if owner
541 541 p = 'repository.admin'
542 542 o = PermOrigin.REPO_OWNER
543 543 else:
544 544 p = perm.Permission.permission_name
545 545 # if we decide this user isn't inheriting permissions from
546 546 # default user we set him to .none so only explicit
547 547 # permissions work
548 548 if not user_inherit_object_permissions:
549 549 p = 'repository.none'
550 550 self.permissions_repositories[r_k] = p, o
551 551
552 552 # defaults for repository groups taken from `default` user permission
553 553 # on given group
554 554 for perm in self.default_repo_groups_perms:
555 555 rg_k = perm.UserRepoGroupToPerm.group.group_name
556 556 o = PermOrigin.REPOGROUP_DEFAULT
557 557 if perm.RepoGroup.user_id == self.user_id:
558 558 # set admin if owner
559 559 p = 'group.admin'
560 560 o = PermOrigin.REPOGROUP_OWNER
561 561 else:
562 562 p = perm.Permission.permission_name
563 563
564 564 # if we decide this user isn't inheriting permissions from default
565 565 # user we set him to .none so only explicit permissions work
566 566 if not user_inherit_object_permissions:
567 567 p = 'group.none'
568 568 self.permissions_repository_groups[rg_k] = p, o
569 569
570 570 # defaults for user groups taken from `default` user permission
571 571 # on given user group
572 572 for perm in self.default_user_group_perms:
573 573 u_k = perm.UserUserGroupToPerm.user_group.users_group_name
574 o = PermOrigin.USERGROUP_DEFAULT
575 if perm.UserGroup.user_id == self.user_id:
576 # set admin if owner
577 p = 'usergroup.admin'
578 o = PermOrigin.USERGROUP_OWNER
579 else:
574 580 p = perm.Permission.permission_name
575 o = PermOrigin.USERGROUP_DEFAULT
581
576 582 # if we decide this user isn't inheriting permissions from default
577 583 # user we set him to .none so only explicit permissions work
578 584 if not user_inherit_object_permissions:
579 585 p = 'usergroup.none'
580 586 self.permissions_user_groups[u_k] = p, o
581 587
582 588 def _calculate_repository_permissions(self):
583 589 """
584 590 Repository permissions for the current user.
585 591
586 592 Check if the user is part of user groups for this repository and
587 593 fill in the permission from it. `_choose_permission` decides of which
588 594 permission should be selected based on selected method.
589 595 """
590 596
591 597 # user group for repositories permissions
592 598 user_repo_perms_from_user_group = Permission\
593 599 .get_default_repo_perms_from_user_group(
594 600 self.user_id, self.scope_repo_id)
595 601
596 602 multiple_counter = collections.defaultdict(int)
597 603 for perm in user_repo_perms_from_user_group:
598 604 r_k = perm.UserGroupRepoToPerm.repository.repo_name
599 605 ug_k = perm.UserGroupRepoToPerm.users_group.users_group_name
600 606 multiple_counter[r_k] += 1
601 607 p = perm.Permission.permission_name
602 608 o = PermOrigin.REPO_USERGROUP % ug_k
603 609
604 610 if perm.Repository.user_id == self.user_id:
605 611 # set admin if owner
606 612 p = 'repository.admin'
607 613 o = PermOrigin.REPO_OWNER
608 614 else:
609 615 if multiple_counter[r_k] > 1:
610 616 cur_perm = self.permissions_repositories[r_k]
611 617 p = self._choose_permission(p, cur_perm)
612 618 self.permissions_repositories[r_k] = p, o
613 619
614 620 # user explicit permissions for repositories, overrides any specified
615 621 # by the group permission
616 622 user_repo_perms = Permission.get_default_repo_perms(
617 623 self.user_id, self.scope_repo_id)
618 624 for perm in user_repo_perms:
619 625 r_k = perm.UserRepoToPerm.repository.repo_name
620 626 o = PermOrigin.REPO_USER % perm.UserRepoToPerm.user.username
621 627 # set admin if owner
622 628 if perm.Repository.user_id == self.user_id:
623 629 p = 'repository.admin'
624 630 o = PermOrigin.REPO_OWNER
625 631 else:
626 632 p = perm.Permission.permission_name
627 633 if not self.explicit:
628 634 cur_perm = self.permissions_repositories.get(
629 635 r_k, 'repository.none')
630 636 p = self._choose_permission(p, cur_perm)
631 637 self.permissions_repositories[r_k] = p, o
632 638
633 639 def _calculate_repository_group_permissions(self):
634 640 """
635 641 Repository group permissions for the current user.
636 642
637 643 Check if the user is part of user groups for repository groups and
638 644 fill in the permissions from it. `_choose_permmission` decides of which
639 645 permission should be selected based on selected method.
640 646 """
641 647 # user group for repo groups permissions
642 648 user_repo_group_perms_from_user_group = Permission\
643 649 .get_default_group_perms_from_user_group(
644 650 self.user_id, self.scope_repo_group_id)
645 651
646 652 multiple_counter = collections.defaultdict(int)
647 653 for perm in user_repo_group_perms_from_user_group:
648 654 g_k = perm.UserGroupRepoGroupToPerm.group.group_name
649 655 ug_k = perm.UserGroupRepoGroupToPerm.users_group.users_group_name
650 656 o = PermOrigin.REPOGROUP_USERGROUP % ug_k
651 657 multiple_counter[g_k] += 1
652 658 p = perm.Permission.permission_name
653 659 if perm.RepoGroup.user_id == self.user_id:
654 # set admin if owner
660 # set admin if owner, even for member of other user group
655 661 p = 'group.admin'
656 662 o = PermOrigin.REPOGROUP_OWNER
657 663 else:
658 664 if multiple_counter[g_k] > 1:
659 665 cur_perm = self.permissions_repository_groups[g_k]
660 666 p = self._choose_permission(p, cur_perm)
661 667 self.permissions_repository_groups[g_k] = p, o
662 668
663 669 # user explicit permissions for repository groups
664 670 user_repo_groups_perms = Permission.get_default_group_perms(
665 671 self.user_id, self.scope_repo_group_id)
666 672 for perm in user_repo_groups_perms:
667 673 rg_k = perm.UserRepoGroupToPerm.group.group_name
668 674 u_k = perm.UserRepoGroupToPerm.user.username
669 675 o = PermOrigin.REPOGROUP_USER % u_k
670 676
671 677 if perm.RepoGroup.user_id == self.user_id:
672 678 # set admin if owner
673 679 p = 'group.admin'
674 680 o = PermOrigin.REPOGROUP_OWNER
675 681 else:
676 682 p = perm.Permission.permission_name
677 683 if not self.explicit:
678 684 cur_perm = self.permissions_repository_groups.get(
679 685 rg_k, 'group.none')
680 686 p = self._choose_permission(p, cur_perm)
681 687 self.permissions_repository_groups[rg_k] = p, o
682 688
683 689 def _calculate_user_group_permissions(self):
684 690 """
685 691 User group permissions for the current user.
686 692 """
687 693 # user group for user group permissions
688 694 user_group_from_user_group = Permission\
689 695 .get_default_user_group_perms_from_user_group(
690 self.user_id, self.scope_repo_group_id)
696 self.user_id, self.scope_user_group_id)
691 697
692 698 multiple_counter = collections.defaultdict(int)
693 699 for perm in user_group_from_user_group:
694 700 g_k = perm.UserGroupUserGroupToPerm\
695 701 .target_user_group.users_group_name
696 702 u_k = perm.UserGroupUserGroupToPerm\
697 703 .user_group.users_group_name
698 704 o = PermOrigin.USERGROUP_USERGROUP % u_k
699 705 multiple_counter[g_k] += 1
700 706 p = perm.Permission.permission_name
707
708 if perm.UserGroup.user_id == self.user_id:
709 # set admin if owner, even for member of other user group
710 p = 'usergroup.admin'
711 o = PermOrigin.USERGROUP_OWNER
712 else:
701 713 if multiple_counter[g_k] > 1:
702 714 cur_perm = self.permissions_user_groups[g_k]
703 715 p = self._choose_permission(p, cur_perm)
704 716 self.permissions_user_groups[g_k] = p, o
705 717
706 718 # user explicit permission for user groups
707 719 user_user_groups_perms = Permission.get_default_user_group_perms(
708 720 self.user_id, self.scope_user_group_id)
709 721 for perm in user_user_groups_perms:
710 722 ug_k = perm.UserUserGroupToPerm.user_group.users_group_name
711 723 u_k = perm.UserUserGroupToPerm.user.username
724 o = PermOrigin.USERGROUP_USER % u_k
725
726 if perm.UserGroup.user_id == self.user_id:
727 # set admin if owner
728 p = 'usergroup.admin'
729 o = PermOrigin.USERGROUP_OWNER
730 else:
712 731 p = perm.Permission.permission_name
713 o = PermOrigin.USERGROUP_USER % u_k
714 732 if not self.explicit:
715 733 cur_perm = self.permissions_user_groups.get(
716 734 ug_k, 'usergroup.none')
717 735 p = self._choose_permission(p, cur_perm)
718 736 self.permissions_user_groups[ug_k] = p, o
719 737
720 738 def _choose_permission(self, new_perm, cur_perm):
721 739 new_perm_val = Permission.PERM_WEIGHTS[new_perm]
722 740 cur_perm_val = Permission.PERM_WEIGHTS[cur_perm]
723 741 if self.algo == 'higherwin':
724 742 if new_perm_val > cur_perm_val:
725 743 return new_perm
726 744 return cur_perm
727 745 elif self.algo == 'lowerwin':
728 746 if new_perm_val < cur_perm_val:
729 747 return new_perm
730 748 return cur_perm
731 749
732 750 def _permission_structure(self):
733 751 return {
734 752 'global': self.permissions_global,
735 753 'repositories': self.permissions_repositories,
736 754 'repositories_groups': self.permissions_repository_groups,
737 755 'user_groups': self.permissions_user_groups,
738 756 }
739 757
740 758
741 759 def allowed_auth_token_access(controller_name, whitelist=None, auth_token=None):
742 760 """
743 761 Check if given controller_name is in whitelist of auth token access
744 762 """
745 763 if not whitelist:
746 764 from rhodecode import CONFIG
747 765 whitelist = aslist(
748 766 CONFIG.get('api_access_controllers_whitelist'), sep=',')
749 767 log.debug(
750 768 'Allowed controllers for AUTH TOKEN access: %s' % (whitelist,))
751 769
752 770 auth_token_access_valid = False
753 771 for entry in whitelist:
754 772 if fnmatch.fnmatch(controller_name, entry):
755 773 auth_token_access_valid = True
756 774 break
757 775
758 776 if auth_token_access_valid:
759 777 log.debug('controller:%s matches entry in whitelist'
760 778 % (controller_name,))
761 779 else:
762 780 msg = ('controller: %s does *NOT* match any entry in whitelist'
763 781 % (controller_name,))
764 782 if auth_token:
765 783 # if we use auth token key and don't have access it's a warning
766 784 log.warning(msg)
767 785 else:
768 786 log.debug(msg)
769 787
770 788 return auth_token_access_valid
771 789
772 790
773 791 class AuthUser(object):
774 792 """
775 793 A simple object that handles all attributes of user in RhodeCode
776 794
777 795 It does lookup based on API key,given user, or user present in session
778 796 Then it fills all required information for such user. It also checks if
779 797 anonymous access is enabled and if so, it returns default user as logged in
780 798 """
781 799 GLOBAL_PERMS = [x[0] for x in Permission.PERMS]
782 800
783 801 def __init__(self, user_id=None, api_key=None, username=None, ip_addr=None):
784 802
785 803 self.user_id = user_id
786 804 self._api_key = api_key
787 805
788 806 self.api_key = None
789 807 self.feed_token = ''
790 808 self.username = username
791 809 self.ip_addr = ip_addr
792 810 self.name = ''
793 811 self.lastname = ''
794 812 self.email = ''
795 813 self.is_authenticated = False
796 814 self.admin = False
797 815 self.inherit_default_permissions = False
798 816 self.password = ''
799 817
800 818 self.anonymous_user = None # propagated on propagate_data
801 819 self.propagate_data()
802 820 self._instance = None
803 821 self._permissions_scoped_cache = {} # used to bind scoped calculation
804 822
805 823 @LazyProperty
806 824 def permissions(self):
807 825 return self.get_perms(user=self, cache=False)
808 826
809 827 def permissions_with_scope(self, scope):
810 828 """
811 829 Call the get_perms function with scoped data. The scope in that function
812 830 narrows the SQL calls to the given ID of objects resulting in fetching
813 831 Just particular permission we want to obtain. If scope is an empty dict
814 832 then it basically narrows the scope to GLOBAL permissions only.
815 833
816 834 :param scope: dict
817 835 """
818 836 if 'repo_name' in scope:
819 837 obj = Repository.get_by_repo_name(scope['repo_name'])
820 838 if obj:
821 839 scope['repo_id'] = obj.repo_id
822 840 _scope = {
823 841 'repo_id': -1,
824 842 'user_group_id': -1,
825 843 'repo_group_id': -1,
826 844 }
827 845 _scope.update(scope)
828 846 cache_key = "_".join(map(safe_str, reduce(lambda a, b: a+b,
829 847 _scope.items())))
830 848 if cache_key not in self._permissions_scoped_cache:
831 849 # store in cache to mimic how the @LazyProperty works,
832 850 # the difference here is that we use the unique key calculated
833 851 # from params and values
834 852 res = self.get_perms(user=self, cache=False, scope=_scope)
835 853 self._permissions_scoped_cache[cache_key] = res
836 854 return self._permissions_scoped_cache[cache_key]
837 855
838 856 def get_instance(self):
839 857 return User.get(self.user_id)
840 858
841 859 def update_lastactivity(self):
842 860 if self.user_id:
843 861 User.get(self.user_id).update_lastactivity()
844 862
845 863 def propagate_data(self):
846 864 """
847 865 Fills in user data and propagates values to this instance. Maps fetched
848 866 user attributes to this class instance attributes
849 867 """
850 868 log.debug('starting data propagation for new potential AuthUser')
851 869 user_model = UserModel()
852 870 anon_user = self.anonymous_user = User.get_default_user(cache=True)
853 871 is_user_loaded = False
854 872
855 873 # lookup by userid
856 874 if self.user_id is not None and self.user_id != anon_user.user_id:
857 875 log.debug('Trying Auth User lookup by USER ID: `%s`' % self.user_id)
858 876 is_user_loaded = user_model.fill_data(self, user_id=self.user_id)
859 877
860 878 # try go get user by api key
861 879 elif self._api_key and self._api_key != anon_user.api_key:
862 880 log.debug('Trying Auth User lookup by API KEY: `%s`' % self._api_key)
863 881 is_user_loaded = user_model.fill_data(self, api_key=self._api_key)
864 882
865 883 # lookup by username
866 884 elif self.username:
867 885 log.debug('Trying Auth User lookup by USER NAME: `%s`' % self.username)
868 886 is_user_loaded = user_model.fill_data(self, username=self.username)
869 887 else:
870 888 log.debug('No data in %s that could been used to log in' % self)
871 889
872 890 if not is_user_loaded:
873 891 log.debug('Failed to load user. Fallback to default user')
874 892 # if we cannot authenticate user try anonymous
875 893 if anon_user.active:
876 894 user_model.fill_data(self, user_id=anon_user.user_id)
877 895 # then we set this user is logged in
878 896 self.is_authenticated = True
879 897 else:
880 898 # in case of disabled anonymous user we reset some of the
881 899 # parameters so such user is "corrupted", skipping the fill_data
882 900 for attr in ['user_id', 'username', 'admin', 'active']:
883 901 setattr(self, attr, None)
884 902 self.is_authenticated = False
885 903
886 904 if not self.username:
887 905 self.username = 'None'
888 906
889 907 log.debug('Auth User is now %s' % self)
890 908
891 909 def get_perms(self, user, scope=None, explicit=True, algo='higherwin',
892 910 cache=False):
893 911 """
894 912 Fills user permission attribute with permissions taken from database
895 913 works for permissions given for repositories, and for permissions that
896 914 are granted to groups
897 915
898 916 :param user: instance of User object from database
899 917 :param explicit: In case there are permissions both for user and a group
900 918 that user is part of, explicit flag will defiine if user will
901 919 explicitly override permissions from group, if it's False it will
902 920 make decision based on the algo
903 921 :param algo: algorithm to decide what permission should be choose if
904 922 it's multiple defined, eg user in two different groups. It also
905 923 decides if explicit flag is turned off how to specify the permission
906 924 for case when user is in a group + have defined separate permission
907 925 """
908 926 user_id = user.user_id
909 927 user_is_admin = user.is_admin
910 928
911 929 # inheritance of global permissions like create repo/fork repo etc
912 930 user_inherit_default_permissions = user.inherit_default_permissions
913 931
914 932 log.debug('Computing PERMISSION tree for scope %s' % (scope, ))
915 933 compute = caches.conditional_cache(
916 934 'short_term', 'cache_desc',
917 935 condition=cache, func=_cached_perms_data)
918 936 result = compute(user_id, scope, user_is_admin,
919 937 user_inherit_default_permissions, explicit, algo)
920 938
921 939 result_repr = []
922 940 for k in result:
923 941 result_repr.append((k, len(result[k])))
924 942
925 943 log.debug('PERMISSION tree computed %s' % (result_repr,))
926 944 return result
927 945
928 946 @property
929 947 def is_default(self):
930 948 return self.username == User.DEFAULT_USER
931 949
932 950 @property
933 951 def is_admin(self):
934 952 return self.admin
935 953
936 954 @property
937 955 def is_user_object(self):
938 956 return self.user_id is not None
939 957
940 958 @property
941 959 def repositories_admin(self):
942 960 """
943 961 Returns list of repositories you're an admin of
944 962 """
945 return [x[0] for x in self.permissions['repositories'].iteritems()
963 return [
964 x[0] for x in self.permissions['repositories'].iteritems()
946 965 if x[1] == 'repository.admin']
947 966
948 967 @property
949 968 def repository_groups_admin(self):
950 969 """
951 970 Returns list of repository groups you're an admin of
952 971 """
953 return [x[0]
954 for x in self.permissions['repositories_groups'].iteritems()
972 return [
973 x[0] for x in self.permissions['repositories_groups'].iteritems()
955 974 if x[1] == 'group.admin']
956 975
957 976 @property
958 977 def user_groups_admin(self):
959 978 """
960 979 Returns list of user groups you're an admin of
961 980 """
962 return [x[0] for x in self.permissions['user_groups'].iteritems()
981 return [
982 x[0] for x in self.permissions['user_groups'].iteritems()
963 983 if x[1] == 'usergroup.admin']
964 984
965 985 @property
966 986 def ip_allowed(self):
967 987 """
968 988 Checks if ip_addr used in constructor is allowed from defined list of
969 989 allowed ip_addresses for user
970 990
971 991 :returns: boolean, True if ip is in allowed ip range
972 992 """
973 993 # check IP
974 994 inherit = self.inherit_default_permissions
975 995 return AuthUser.check_ip_allowed(self.user_id, self.ip_addr,
976 996 inherit_from_default=inherit)
977 997 @property
978 998 def personal_repo_group(self):
979 999 return RepoGroup.get_user_personal_repo_group(self.user_id)
980 1000
981 1001 @classmethod
982 1002 def check_ip_allowed(cls, user_id, ip_addr, inherit_from_default):
983 1003 allowed_ips = AuthUser.get_allowed_ips(
984 1004 user_id, cache=True, inherit_from_default=inherit_from_default)
985 1005 if check_ip_access(source_ip=ip_addr, allowed_ips=allowed_ips):
986 1006 log.debug('IP:%s is in range of %s' % (ip_addr, allowed_ips))
987 1007 return True
988 1008 else:
989 1009 log.info('Access for IP:%s forbidden, '
990 1010 'not in %s' % (ip_addr, allowed_ips))
991 1011 return False
992 1012
993 1013 def __repr__(self):
994 1014 return "<AuthUser('id:%s[%s] ip:%s auth:%s')>"\
995 1015 % (self.user_id, self.username, self.ip_addr, self.is_authenticated)
996 1016
997 1017 def set_authenticated(self, authenticated=True):
998 1018 if self.user_id != self.anonymous_user.user_id:
999 1019 self.is_authenticated = authenticated
1000 1020
1001 1021 def get_cookie_store(self):
1002 1022 return {
1003 1023 'username': self.username,
1004 1024 'password': md5(self.password),
1005 1025 'user_id': self.user_id,
1006 1026 'is_authenticated': self.is_authenticated
1007 1027 }
1008 1028
1009 1029 @classmethod
1010 1030 def from_cookie_store(cls, cookie_store):
1011 1031 """
1012 1032 Creates AuthUser from a cookie store
1013 1033
1014 1034 :param cls:
1015 1035 :param cookie_store:
1016 1036 """
1017 1037 user_id = cookie_store.get('user_id')
1018 1038 username = cookie_store.get('username')
1019 1039 api_key = cookie_store.get('api_key')
1020 1040 return AuthUser(user_id, api_key, username)
1021 1041
1022 1042 @classmethod
1023 1043 def get_allowed_ips(cls, user_id, cache=False, inherit_from_default=False):
1024 1044 _set = set()
1025 1045
1026 1046 if inherit_from_default:
1027 1047 default_ips = UserIpMap.query().filter(
1028 1048 UserIpMap.user == User.get_default_user(cache=True))
1029 1049 if cache:
1030 1050 default_ips = default_ips.options(FromCache("sql_cache_short",
1031 1051 "get_user_ips_default"))
1032 1052
1033 1053 # populate from default user
1034 1054 for ip in default_ips:
1035 1055 try:
1036 1056 _set.add(ip.ip_addr)
1037 1057 except ObjectDeletedError:
1038 1058 # since we use heavy caching sometimes it happens that
1039 1059 # we get deleted objects here, we just skip them
1040 1060 pass
1041 1061
1042 1062 user_ips = UserIpMap.query().filter(UserIpMap.user_id == user_id)
1043 1063 if cache:
1044 1064 user_ips = user_ips.options(FromCache("sql_cache_short",
1045 1065 "get_user_ips_%s" % user_id))
1046 1066
1047 1067 for ip in user_ips:
1048 1068 try:
1049 1069 _set.add(ip.ip_addr)
1050 1070 except ObjectDeletedError:
1051 1071 # since we use heavy caching sometimes it happens that we get
1052 1072 # deleted objects here, we just skip them
1053 1073 pass
1054 1074 return _set or set(['0.0.0.0/0', '::/0'])
1055 1075
1056 1076
1057 1077 def set_available_permissions(config):
1058 1078 """
1059 1079 This function will propagate pylons globals with all available defined
1060 1080 permission given in db. We don't want to check each time from db for new
1061 1081 permissions since adding a new permission also requires application restart
1062 1082 ie. to decorate new views with the newly created permission
1063 1083
1064 1084 :param config: current pylons config instance
1065 1085
1066 1086 """
1067 1087 log.info('getting information about all available permissions')
1068 1088 try:
1069 1089 sa = meta.Session
1070 1090 all_perms = sa.query(Permission).all()
1071 1091 config['available_permissions'] = [x.permission_name for x in all_perms]
1072 1092 except Exception:
1073 1093 log.error(traceback.format_exc())
1074 1094 finally:
1075 1095 meta.Session.remove()
1076 1096
1077 1097
1078 1098 def get_csrf_token(session=None, force_new=False, save_if_missing=True):
1079 1099 """
1080 1100 Return the current authentication token, creating one if one doesn't
1081 1101 already exist and the save_if_missing flag is present.
1082 1102
1083 1103 :param session: pass in the pylons session, else we use the global ones
1084 1104 :param force_new: force to re-generate the token and store it in session
1085 1105 :param save_if_missing: save the newly generated token if it's missing in
1086 1106 session
1087 1107 """
1088 1108 if not session:
1089 1109 from pylons import session
1090 1110
1091 1111 if (csrf_token_key not in session and save_if_missing) or force_new:
1092 1112 token = hashlib.sha1(str(random.getrandbits(128))).hexdigest()
1093 1113 session[csrf_token_key] = token
1094 1114 if hasattr(session, 'save'):
1095 1115 session.save()
1096 1116 return session.get(csrf_token_key)
1097 1117
1098 1118
1099 1119 # CHECK DECORATORS
1100 1120 class CSRFRequired(object):
1101 1121 """
1102 1122 Decorator for authenticating a form
1103 1123
1104 1124 This decorator uses an authorization token stored in the client's
1105 1125 session for prevention of certain Cross-site request forgery (CSRF)
1106 1126 attacks (See
1107 1127 http://en.wikipedia.org/wiki/Cross-site_request_forgery for more
1108 1128 information).
1109 1129
1110 1130 For use with the ``webhelpers.secure_form`` helper functions.
1111 1131
1112 1132 """
1113 1133 def __init__(self, token=csrf_token_key, header='X-CSRF-Token',
1114 1134 except_methods=None):
1115 1135 self.token = token
1116 1136 self.header = header
1117 1137 self.except_methods = except_methods or []
1118 1138
1119 1139 def __call__(self, func):
1120 1140 return get_cython_compat_decorator(self.__wrapper, func)
1121 1141
1122 1142 def _get_csrf(self, _request):
1123 1143 return _request.POST.get(self.token, _request.headers.get(self.header))
1124 1144
1125 1145 def check_csrf(self, _request, cur_token):
1126 1146 supplied_token = self._get_csrf(_request)
1127 1147 return supplied_token and supplied_token == cur_token
1128 1148
1129 1149 def __wrapper(self, func, *fargs, **fkwargs):
1130 1150 if request.method in self.except_methods:
1131 1151 return func(*fargs, **fkwargs)
1132 1152
1133 1153 cur_token = get_csrf_token(save_if_missing=False)
1134 1154 if self.check_csrf(request, cur_token):
1135 1155 if request.POST.get(self.token):
1136 1156 del request.POST[self.token]
1137 1157 return func(*fargs, **fkwargs)
1138 1158 else:
1139 1159 reason = 'token-missing'
1140 1160 supplied_token = self._get_csrf(request)
1141 1161 if supplied_token and cur_token != supplied_token:
1142 1162 reason = 'token-mismatch [%s:%s]' % (cur_token or ''[:6],
1143 1163 supplied_token or ''[:6])
1144 1164
1145 1165 csrf_message = \
1146 1166 ("Cross-site request forgery detected, request denied. See "
1147 1167 "http://en.wikipedia.org/wiki/Cross-site_request_forgery for "
1148 1168 "more information.")
1149 1169 log.warn('Cross-site request forgery detected, request %r DENIED: %s '
1150 1170 'REMOTE_ADDR:%s, HEADERS:%s' % (
1151 1171 request, reason, request.remote_addr, request.headers))
1152 1172
1153 1173 raise HTTPForbidden(explanation=csrf_message)
1154 1174
1155 1175
1156 1176 class LoginRequired(object):
1157 1177 """
1158 1178 Must be logged in to execute this function else
1159 1179 redirect to login page
1160 1180
1161 1181 :param api_access: if enabled this checks only for valid auth token
1162 1182 and grants access based on valid token
1163 1183 """
1164 1184 def __init__(self, auth_token_access=None):
1165 1185 self.auth_token_access = auth_token_access
1166 1186
1167 1187 def __call__(self, func):
1168 1188 return get_cython_compat_decorator(self.__wrapper, func)
1169 1189
1170 1190 def __wrapper(self, func, *fargs, **fkwargs):
1171 1191 from rhodecode.lib import helpers as h
1172 1192 cls = fargs[0]
1173 1193 user = cls._rhodecode_user
1174 1194 loc = "%s:%s" % (cls.__class__.__name__, func.__name__)
1175 1195 log.debug('Starting login restriction checks for user: %s' % (user,))
1176 1196 # check if our IP is allowed
1177 1197 ip_access_valid = True
1178 1198 if not user.ip_allowed:
1179 1199 h.flash(h.literal(_('IP %s not allowed' % (user.ip_addr,))),
1180 1200 category='warning')
1181 1201 ip_access_valid = False
1182 1202
1183 1203 # check if we used an APIKEY and it's a valid one
1184 1204 # defined white-list of controllers which API access will be enabled
1185 1205 _auth_token = request.GET.get(
1186 1206 'auth_token', '') or request.GET.get('api_key', '')
1187 1207 auth_token_access_valid = allowed_auth_token_access(
1188 1208 loc, auth_token=_auth_token)
1189 1209
1190 1210 # explicit controller is enabled or API is in our whitelist
1191 1211 if self.auth_token_access or auth_token_access_valid:
1192 1212 log.debug('Checking AUTH TOKEN access for %s' % (cls,))
1193 1213 db_user = user.get_instance()
1194 1214
1195 1215 if db_user:
1196 1216 if self.auth_token_access:
1197 1217 roles = self.auth_token_access
1198 1218 else:
1199 1219 roles = [UserApiKeys.ROLE_HTTP]
1200 1220 token_match = db_user.authenticate_by_token(
1201 1221 _auth_token, roles=roles, include_builtin_token=True)
1202 1222 else:
1203 1223 log.debug('Unable to fetch db instance for auth user: %s', user)
1204 1224 token_match = False
1205 1225
1206 1226 if _auth_token and token_match:
1207 1227 auth_token_access_valid = True
1208 1228 log.debug('AUTH TOKEN ****%s is VALID' % (_auth_token[-4:],))
1209 1229 else:
1210 1230 auth_token_access_valid = False
1211 1231 if not _auth_token:
1212 1232 log.debug("AUTH TOKEN *NOT* present in request")
1213 1233 else:
1214 1234 log.warning(
1215 1235 "AUTH TOKEN ****%s *NOT* valid" % _auth_token[-4:])
1216 1236
1217 1237 log.debug('Checking if %s is authenticated @ %s' % (user.username, loc))
1218 1238 reason = 'RHODECODE_AUTH' if user.is_authenticated \
1219 1239 else 'AUTH_TOKEN_AUTH'
1220 1240
1221 1241 if ip_access_valid and (
1222 1242 user.is_authenticated or auth_token_access_valid):
1223 1243 log.info(
1224 1244 'user %s authenticating with:%s IS authenticated on func %s'
1225 1245 % (user, reason, loc))
1226 1246
1227 1247 # update user data to check last activity
1228 1248 user.update_lastactivity()
1229 1249 Session().commit()
1230 1250 return func(*fargs, **fkwargs)
1231 1251 else:
1232 1252 log.warning(
1233 1253 'user %s authenticating with:%s NOT authenticated on '
1234 1254 'func: %s: IP_ACCESS:%s AUTH_TOKEN_ACCESS:%s'
1235 1255 % (user, reason, loc, ip_access_valid,
1236 1256 auth_token_access_valid))
1237 1257 # we preserve the get PARAM
1238 1258 came_from = request.path_qs
1239 1259
1240 1260 log.debug('redirecting to login page with %s' % (came_from,))
1241 1261 return redirect(
1242 1262 h.route_path('login', _query={'came_from': came_from}))
1243 1263
1244 1264
1245 1265 class NotAnonymous(object):
1246 1266 """
1247 1267 Must be logged in to execute this function else
1248 1268 redirect to login page"""
1249 1269
1250 1270 def __call__(self, func):
1251 1271 return get_cython_compat_decorator(self.__wrapper, func)
1252 1272
1253 1273 def __wrapper(self, func, *fargs, **fkwargs):
1254 1274 cls = fargs[0]
1255 1275 self.user = cls._rhodecode_user
1256 1276
1257 1277 log.debug('Checking if user is not anonymous @%s' % cls)
1258 1278
1259 1279 anonymous = self.user.username == User.DEFAULT_USER
1260 1280
1261 1281 if anonymous:
1262 1282 came_from = request.path_qs
1263 1283
1264 1284 import rhodecode.lib.helpers as h
1265 1285 h.flash(_('You need to be a registered user to '
1266 1286 'perform this action'),
1267 1287 category='warning')
1268 1288 return redirect(
1269 1289 h.route_path('login', _query={'came_from': came_from}))
1270 1290 else:
1271 1291 return func(*fargs, **fkwargs)
1272 1292
1273 1293
1274 1294 class XHRRequired(object):
1275 1295 def __call__(self, func):
1276 1296 return get_cython_compat_decorator(self.__wrapper, func)
1277 1297
1278 1298 def __wrapper(self, func, *fargs, **fkwargs):
1279 1299 log.debug('Checking if request is XMLHttpRequest (XHR)')
1280 1300 xhr_message = 'This is not a valid XMLHttpRequest (XHR) request'
1281 1301 if not request.is_xhr:
1282 1302 abort(400, detail=xhr_message)
1283 1303
1284 1304 return func(*fargs, **fkwargs)
1285 1305
1286 1306
1287 1307 class HasAcceptedRepoType(object):
1288 1308 """
1289 1309 Check if requested repo is within given repo type aliases
1290 1310
1291 1311 TODO: anderson: not sure where to put this decorator
1292 1312 """
1293 1313
1294 1314 def __init__(self, *repo_type_list):
1295 1315 self.repo_type_list = set(repo_type_list)
1296 1316
1297 1317 def __call__(self, func):
1298 1318 return get_cython_compat_decorator(self.__wrapper, func)
1299 1319
1300 1320 def __wrapper(self, func, *fargs, **fkwargs):
1301 1321 cls = fargs[0]
1302 1322 rhodecode_repo = cls.rhodecode_repo
1303 1323
1304 1324 log.debug('%s checking repo type for %s in %s',
1305 1325 self.__class__.__name__,
1306 1326 rhodecode_repo.alias, self.repo_type_list)
1307 1327
1308 1328 if rhodecode_repo.alias in self.repo_type_list:
1309 1329 return func(*fargs, **fkwargs)
1310 1330 else:
1311 1331 import rhodecode.lib.helpers as h
1312 1332 h.flash(h.literal(
1313 1333 _('Action not supported for %s.' % rhodecode_repo.alias)),
1314 1334 category='warning')
1315 1335 return redirect(
1316 1336 url('summary_home', repo_name=cls.rhodecode_db_repo.repo_name))
1317 1337
1318 1338
1319 1339 class PermsDecorator(object):
1320 1340 """
1321 1341 Base class for controller decorators, we extract the current user from
1322 1342 the class itself, which has it stored in base controllers
1323 1343 """
1324 1344
1325 1345 def __init__(self, *required_perms):
1326 1346 self.required_perms = set(required_perms)
1327 1347
1328 1348 def __call__(self, func):
1329 1349 return get_cython_compat_decorator(self.__wrapper, func)
1330 1350
1331 1351 def __wrapper(self, func, *fargs, **fkwargs):
1332 1352 cls = fargs[0]
1333 1353 _user = cls._rhodecode_user
1334 1354
1335 1355 log.debug('checking %s permissions %s for %s %s',
1336 1356 self.__class__.__name__, self.required_perms, cls, _user)
1337 1357
1338 1358 if self.check_permissions(_user):
1339 1359 log.debug('Permission granted for %s %s', cls, _user)
1340 1360 return func(*fargs, **fkwargs)
1341 1361
1342 1362 else:
1343 1363 log.debug('Permission denied for %s %s', cls, _user)
1344 1364 anonymous = _user.username == User.DEFAULT_USER
1345 1365
1346 1366 if anonymous:
1347 1367 came_from = request.path_qs
1348 1368
1349 1369 import rhodecode.lib.helpers as h
1350 1370 h.flash(_('You need to be signed in to view this page'),
1351 1371 category='warning')
1352 1372 return redirect(
1353 1373 h.route_path('login', _query={'came_from': came_from}))
1354 1374
1355 1375 else:
1356 1376 # redirect with forbidden ret code
1357 1377 return abort(403)
1358 1378
1359 1379 def check_permissions(self, user):
1360 1380 """Dummy function for overriding"""
1361 1381 raise NotImplementedError(
1362 1382 'You have to write this function in child class')
1363 1383
1364 1384
1365 1385 class HasPermissionAllDecorator(PermsDecorator):
1366 1386 """
1367 1387 Checks for access permission for all given predicates. All of them
1368 1388 have to be meet in order to fulfill the request
1369 1389 """
1370 1390
1371 1391 def check_permissions(self, user):
1372 1392 perms = user.permissions_with_scope({})
1373 1393 if self.required_perms.issubset(perms['global']):
1374 1394 return True
1375 1395 return False
1376 1396
1377 1397
1378 1398 class HasPermissionAnyDecorator(PermsDecorator):
1379 1399 """
1380 1400 Checks for access permission for any of given predicates. In order to
1381 1401 fulfill the request any of predicates must be meet
1382 1402 """
1383 1403
1384 1404 def check_permissions(self, user):
1385 1405 perms = user.permissions_with_scope({})
1386 1406 if self.required_perms.intersection(perms['global']):
1387 1407 return True
1388 1408 return False
1389 1409
1390 1410
1391 1411 class HasRepoPermissionAllDecorator(PermsDecorator):
1392 1412 """
1393 1413 Checks for access permission for all given predicates for specific
1394 1414 repository. All of them have to be meet in order to fulfill the request
1395 1415 """
1396 1416
1397 1417 def check_permissions(self, user):
1398 1418 perms = user.permissions
1399 1419 repo_name = get_repo_slug(request)
1400 1420 try:
1401 1421 user_perms = set([perms['repositories'][repo_name]])
1402 1422 except KeyError:
1403 1423 return False
1404 1424 if self.required_perms.issubset(user_perms):
1405 1425 return True
1406 1426 return False
1407 1427
1408 1428
1409 1429 class HasRepoPermissionAnyDecorator(PermsDecorator):
1410 1430 """
1411 1431 Checks for access permission for any of given predicates for specific
1412 1432 repository. In order to fulfill the request any of predicates must be meet
1413 1433 """
1414 1434
1415 1435 def check_permissions(self, user):
1416 1436 perms = user.permissions
1417 1437 repo_name = get_repo_slug(request)
1418 1438 try:
1419 1439 user_perms = set([perms['repositories'][repo_name]])
1420 1440 except KeyError:
1421 1441 return False
1422 1442
1423 1443 if self.required_perms.intersection(user_perms):
1424 1444 return True
1425 1445 return False
1426 1446
1427 1447
1428 1448 class HasRepoGroupPermissionAllDecorator(PermsDecorator):
1429 1449 """
1430 1450 Checks for access permission for all given predicates for specific
1431 1451 repository group. All of them have to be meet in order to
1432 1452 fulfill the request
1433 1453 """
1434 1454
1435 1455 def check_permissions(self, user):
1436 1456 perms = user.permissions
1437 1457 group_name = get_repo_group_slug(request)
1438 1458 try:
1439 1459 user_perms = set([perms['repositories_groups'][group_name]])
1440 1460 except KeyError:
1441 1461 return False
1442 1462
1443 1463 if self.required_perms.issubset(user_perms):
1444 1464 return True
1445 1465 return False
1446 1466
1447 1467
1448 1468 class HasRepoGroupPermissionAnyDecorator(PermsDecorator):
1449 1469 """
1450 1470 Checks for access permission for any of given predicates for specific
1451 1471 repository group. In order to fulfill the request any
1452 1472 of predicates must be met
1453 1473 """
1454 1474
1455 1475 def check_permissions(self, user):
1456 1476 perms = user.permissions
1457 1477 group_name = get_repo_group_slug(request)
1458 1478 try:
1459 1479 user_perms = set([perms['repositories_groups'][group_name]])
1460 1480 except KeyError:
1461 1481 return False
1462 1482
1463 1483 if self.required_perms.intersection(user_perms):
1464 1484 return True
1465 1485 return False
1466 1486
1467 1487
1468 1488 class HasUserGroupPermissionAllDecorator(PermsDecorator):
1469 1489 """
1470 1490 Checks for access permission for all given predicates for specific
1471 1491 user group. All of them have to be meet in order to fulfill the request
1472 1492 """
1473 1493
1474 1494 def check_permissions(self, user):
1475 1495 perms = user.permissions
1476 1496 group_name = get_user_group_slug(request)
1477 1497 try:
1478 1498 user_perms = set([perms['user_groups'][group_name]])
1479 1499 except KeyError:
1480 1500 return False
1481 1501
1482 1502 if self.required_perms.issubset(user_perms):
1483 1503 return True
1484 1504 return False
1485 1505
1486 1506
1487 1507 class HasUserGroupPermissionAnyDecorator(PermsDecorator):
1488 1508 """
1489 1509 Checks for access permission for any of given predicates for specific
1490 1510 user group. In order to fulfill the request any of predicates must be meet
1491 1511 """
1492 1512
1493 1513 def check_permissions(self, user):
1494 1514 perms = user.permissions
1495 1515 group_name = get_user_group_slug(request)
1496 1516 try:
1497 1517 user_perms = set([perms['user_groups'][group_name]])
1498 1518 except KeyError:
1499 1519 return False
1500 1520
1501 1521 if self.required_perms.intersection(user_perms):
1502 1522 return True
1503 1523 return False
1504 1524
1505 1525
1506 1526 # CHECK FUNCTIONS
1507 1527 class PermsFunction(object):
1508 1528 """Base function for other check functions"""
1509 1529
1510 1530 def __init__(self, *perms):
1511 1531 self.required_perms = set(perms)
1512 1532 self.repo_name = None
1513 1533 self.repo_group_name = None
1514 1534 self.user_group_name = None
1515 1535
1516 1536 def __bool__(self):
1517 1537 frame = inspect.currentframe()
1518 1538 stack_trace = traceback.format_stack(frame)
1519 1539 log.error('Checking bool value on a class instance of perm '
1520 1540 'function is not allowed: %s' % ''.join(stack_trace))
1521 1541 # rather than throwing errors, here we always return False so if by
1522 1542 # accident someone checks truth for just an instance it will always end
1523 1543 # up in returning False
1524 1544 return False
1525 1545 __nonzero__ = __bool__
1526 1546
1527 1547 def __call__(self, check_location='', user=None):
1528 1548 if not user:
1529 1549 log.debug('Using user attribute from global request')
1530 1550 # TODO: remove this someday,put as user as attribute here
1531 1551 user = request.user
1532 1552
1533 1553 # init auth user if not already given
1534 1554 if not isinstance(user, AuthUser):
1535 1555 log.debug('Wrapping user %s into AuthUser', user)
1536 1556 user = AuthUser(user.user_id)
1537 1557
1538 1558 cls_name = self.__class__.__name__
1539 1559 check_scope = self._get_check_scope(cls_name)
1540 1560 check_location = check_location or 'unspecified location'
1541 1561
1542 1562 log.debug('checking cls:%s %s usr:%s %s @ %s', cls_name,
1543 1563 self.required_perms, user, check_scope, check_location)
1544 1564 if not user:
1545 1565 log.warning('Empty user given for permission check')
1546 1566 return False
1547 1567
1548 1568 if self.check_permissions(user):
1549 1569 log.debug('Permission to repo:`%s` GRANTED for user:`%s` @ %s',
1550 1570 check_scope, user, check_location)
1551 1571 return True
1552 1572
1553 1573 else:
1554 1574 log.debug('Permission to repo:`%s` DENIED for user:`%s` @ %s',
1555 1575 check_scope, user, check_location)
1556 1576 return False
1557 1577
1558 1578 def _get_check_scope(self, cls_name):
1559 1579 return {
1560 1580 'HasPermissionAll': 'GLOBAL',
1561 1581 'HasPermissionAny': 'GLOBAL',
1562 1582 'HasRepoPermissionAll': 'repo:%s' % self.repo_name,
1563 1583 'HasRepoPermissionAny': 'repo:%s' % self.repo_name,
1564 1584 'HasRepoGroupPermissionAll': 'repo_group:%s' % self.repo_group_name,
1565 1585 'HasRepoGroupPermissionAny': 'repo_group:%s' % self.repo_group_name,
1566 1586 'HasUserGroupPermissionAll': 'user_group:%s' % self.user_group_name,
1567 1587 'HasUserGroupPermissionAny': 'user_group:%s' % self.user_group_name,
1568 1588 }.get(cls_name, '?:%s' % cls_name)
1569 1589
1570 1590 def check_permissions(self, user):
1571 1591 """Dummy function for overriding"""
1572 1592 raise Exception('You have to write this function in child class')
1573 1593
1574 1594
1575 1595 class HasPermissionAll(PermsFunction):
1576 1596 def check_permissions(self, user):
1577 1597 perms = user.permissions_with_scope({})
1578 1598 if self.required_perms.issubset(perms.get('global')):
1579 1599 return True
1580 1600 return False
1581 1601
1582 1602
1583 1603 class HasPermissionAny(PermsFunction):
1584 1604 def check_permissions(self, user):
1585 1605 perms = user.permissions_with_scope({})
1586 1606 if self.required_perms.intersection(perms.get('global')):
1587 1607 return True
1588 1608 return False
1589 1609
1590 1610
1591 1611 class HasRepoPermissionAll(PermsFunction):
1592 1612 def __call__(self, repo_name=None, check_location='', user=None):
1593 1613 self.repo_name = repo_name
1594 1614 return super(HasRepoPermissionAll, self).__call__(check_location, user)
1595 1615
1596 1616 def check_permissions(self, user):
1597 1617 if not self.repo_name:
1598 1618 self.repo_name = get_repo_slug(request)
1599 1619
1600 1620 perms = user.permissions
1601 1621 try:
1602 1622 user_perms = set([perms['repositories'][self.repo_name]])
1603 1623 except KeyError:
1604 1624 return False
1605 1625 if self.required_perms.issubset(user_perms):
1606 1626 return True
1607 1627 return False
1608 1628
1609 1629
1610 1630 class HasRepoPermissionAny(PermsFunction):
1611 1631 def __call__(self, repo_name=None, check_location='', user=None):
1612 1632 self.repo_name = repo_name
1613 1633 return super(HasRepoPermissionAny, self).__call__(check_location, user)
1614 1634
1615 1635 def check_permissions(self, user):
1616 1636 if not self.repo_name:
1617 1637 self.repo_name = get_repo_slug(request)
1618 1638
1619 1639 perms = user.permissions
1620 1640 try:
1621 1641 user_perms = set([perms['repositories'][self.repo_name]])
1622 1642 except KeyError:
1623 1643 return False
1624 1644 if self.required_perms.intersection(user_perms):
1625 1645 return True
1626 1646 return False
1627 1647
1628 1648
1629 1649 class HasRepoGroupPermissionAny(PermsFunction):
1630 1650 def __call__(self, group_name=None, check_location='', user=None):
1631 1651 self.repo_group_name = group_name
1632 1652 return super(HasRepoGroupPermissionAny, self).__call__(
1633 1653 check_location, user)
1634 1654
1635 1655 def check_permissions(self, user):
1636 1656 perms = user.permissions
1637 1657 try:
1638 1658 user_perms = set(
1639 1659 [perms['repositories_groups'][self.repo_group_name]])
1640 1660 except KeyError:
1641 1661 return False
1642 1662 if self.required_perms.intersection(user_perms):
1643 1663 return True
1644 1664 return False
1645 1665
1646 1666
1647 1667 class HasRepoGroupPermissionAll(PermsFunction):
1648 1668 def __call__(self, group_name=None, check_location='', user=None):
1649 1669 self.repo_group_name = group_name
1650 1670 return super(HasRepoGroupPermissionAll, self).__call__(
1651 1671 check_location, user)
1652 1672
1653 1673 def check_permissions(self, user):
1654 1674 perms = user.permissions
1655 1675 try:
1656 1676 user_perms = set(
1657 1677 [perms['repositories_groups'][self.repo_group_name]])
1658 1678 except KeyError:
1659 1679 return False
1660 1680 if self.required_perms.issubset(user_perms):
1661 1681 return True
1662 1682 return False
1663 1683
1664 1684
1665 1685 class HasUserGroupPermissionAny(PermsFunction):
1666 1686 def __call__(self, user_group_name=None, check_location='', user=None):
1667 1687 self.user_group_name = user_group_name
1668 1688 return super(HasUserGroupPermissionAny, self).__call__(
1669 1689 check_location, user)
1670 1690
1671 1691 def check_permissions(self, user):
1672 1692 perms = user.permissions
1673 1693 try:
1674 1694 user_perms = set([perms['user_groups'][self.user_group_name]])
1675 1695 except KeyError:
1676 1696 return False
1677 1697 if self.required_perms.intersection(user_perms):
1678 1698 return True
1679 1699 return False
1680 1700
1681 1701
1682 1702 class HasUserGroupPermissionAll(PermsFunction):
1683 1703 def __call__(self, user_group_name=None, check_location='', user=None):
1684 1704 self.user_group_name = user_group_name
1685 1705 return super(HasUserGroupPermissionAll, self).__call__(
1686 1706 check_location, user)
1687 1707
1688 1708 def check_permissions(self, user):
1689 1709 perms = user.permissions
1690 1710 try:
1691 1711 user_perms = set([perms['user_groups'][self.user_group_name]])
1692 1712 except KeyError:
1693 1713 return False
1694 1714 if self.required_perms.issubset(user_perms):
1695 1715 return True
1696 1716 return False
1697 1717
1698 1718
1699 1719 # SPECIAL VERSION TO HANDLE MIDDLEWARE AUTH
1700 1720 class HasPermissionAnyMiddleware(object):
1701 1721 def __init__(self, *perms):
1702 1722 self.required_perms = set(perms)
1703 1723
1704 1724 def __call__(self, user, repo_name):
1705 1725 # repo_name MUST be unicode, since we handle keys in permission
1706 1726 # dict by unicode
1707 1727 repo_name = safe_unicode(repo_name)
1708 1728 user = AuthUser(user.user_id)
1709 1729 log.debug(
1710 1730 'Checking VCS protocol permissions %s for user:%s repo:`%s`',
1711 1731 self.required_perms, user, repo_name)
1712 1732
1713 1733 if self.check_permissions(user, repo_name):
1714 1734 log.debug('Permission to repo:`%s` GRANTED for user:%s @ %s',
1715 1735 repo_name, user, 'PermissionMiddleware')
1716 1736 return True
1717 1737
1718 1738 else:
1719 1739 log.debug('Permission to repo:`%s` DENIED for user:%s @ %s',
1720 1740 repo_name, user, 'PermissionMiddleware')
1721 1741 return False
1722 1742
1723 1743 def check_permissions(self, user, repo_name):
1724 1744 perms = user.permissions_with_scope({'repo_name': repo_name})
1725 1745
1726 1746 try:
1727 1747 user_perms = set([perms['repositories'][repo_name]])
1728 1748 except Exception:
1729 1749 log.exception('Error while accessing user permissions')
1730 1750 return False
1731 1751
1732 1752 if self.required_perms.intersection(user_perms):
1733 1753 return True
1734 1754 return False
1735 1755
1736 1756
1737 1757 # SPECIAL VERSION TO HANDLE API AUTH
1738 1758 class _BaseApiPerm(object):
1739 1759 def __init__(self, *perms):
1740 1760 self.required_perms = set(perms)
1741 1761
1742 1762 def __call__(self, check_location=None, user=None, repo_name=None,
1743 1763 group_name=None, user_group_name=None):
1744 1764 cls_name = self.__class__.__name__
1745 1765 check_scope = 'global:%s' % (self.required_perms,)
1746 1766 if repo_name:
1747 1767 check_scope += ', repo_name:%s' % (repo_name,)
1748 1768
1749 1769 if group_name:
1750 1770 check_scope += ', repo_group_name:%s' % (group_name,)
1751 1771
1752 1772 if user_group_name:
1753 1773 check_scope += ', user_group_name:%s' % (user_group_name,)
1754 1774
1755 1775 log.debug(
1756 1776 'checking cls:%s %s %s @ %s'
1757 1777 % (cls_name, self.required_perms, check_scope, check_location))
1758 1778 if not user:
1759 1779 log.debug('Empty User passed into arguments')
1760 1780 return False
1761 1781
1762 1782 # process user
1763 1783 if not isinstance(user, AuthUser):
1764 1784 user = AuthUser(user.user_id)
1765 1785 if not check_location:
1766 1786 check_location = 'unspecified'
1767 1787 if self.check_permissions(user.permissions, repo_name, group_name,
1768 1788 user_group_name):
1769 1789 log.debug('Permission to repo:`%s` GRANTED for user:`%s` @ %s',
1770 1790 check_scope, user, check_location)
1771 1791 return True
1772 1792
1773 1793 else:
1774 1794 log.debug('Permission to repo:`%s` DENIED for user:`%s` @ %s',
1775 1795 check_scope, user, check_location)
1776 1796 return False
1777 1797
1778 1798 def check_permissions(self, perm_defs, repo_name=None, group_name=None,
1779 1799 user_group_name=None):
1780 1800 """
1781 1801 implement in child class should return True if permissions are ok,
1782 1802 False otherwise
1783 1803
1784 1804 :param perm_defs: dict with permission definitions
1785 1805 :param repo_name: repo name
1786 1806 """
1787 1807 raise NotImplementedError()
1788 1808
1789 1809
1790 1810 class HasPermissionAllApi(_BaseApiPerm):
1791 1811 def check_permissions(self, perm_defs, repo_name=None, group_name=None,
1792 1812 user_group_name=None):
1793 1813 if self.required_perms.issubset(perm_defs.get('global')):
1794 1814 return True
1795 1815 return False
1796 1816
1797 1817
1798 1818 class HasPermissionAnyApi(_BaseApiPerm):
1799 1819 def check_permissions(self, perm_defs, repo_name=None, group_name=None,
1800 1820 user_group_name=None):
1801 1821 if self.required_perms.intersection(perm_defs.get('global')):
1802 1822 return True
1803 1823 return False
1804 1824
1805 1825
1806 1826 class HasRepoPermissionAllApi(_BaseApiPerm):
1807 1827 def check_permissions(self, perm_defs, repo_name=None, group_name=None,
1808 1828 user_group_name=None):
1809 1829 try:
1810 1830 _user_perms = set([perm_defs['repositories'][repo_name]])
1811 1831 except KeyError:
1812 1832 log.warning(traceback.format_exc())
1813 1833 return False
1814 1834 if self.required_perms.issubset(_user_perms):
1815 1835 return True
1816 1836 return False
1817 1837
1818 1838
1819 1839 class HasRepoPermissionAnyApi(_BaseApiPerm):
1820 1840 def check_permissions(self, perm_defs, repo_name=None, group_name=None,
1821 1841 user_group_name=None):
1822 1842 try:
1823 1843 _user_perms = set([perm_defs['repositories'][repo_name]])
1824 1844 except KeyError:
1825 1845 log.warning(traceback.format_exc())
1826 1846 return False
1827 1847 if self.required_perms.intersection(_user_perms):
1828 1848 return True
1829 1849 return False
1830 1850
1831 1851
1832 1852 class HasRepoGroupPermissionAnyApi(_BaseApiPerm):
1833 1853 def check_permissions(self, perm_defs, repo_name=None, group_name=None,
1834 1854 user_group_name=None):
1835 1855 try:
1836 1856 _user_perms = set([perm_defs['repositories_groups'][group_name]])
1837 1857 except KeyError:
1838 1858 log.warning(traceback.format_exc())
1839 1859 return False
1840 1860 if self.required_perms.intersection(_user_perms):
1841 1861 return True
1842 1862 return False
1843 1863
1844 1864
1845 1865 class HasRepoGroupPermissionAllApi(_BaseApiPerm):
1846 1866 def check_permissions(self, perm_defs, repo_name=None, group_name=None,
1847 1867 user_group_name=None):
1848 1868 try:
1849 1869 _user_perms = set([perm_defs['repositories_groups'][group_name]])
1850 1870 except KeyError:
1851 1871 log.warning(traceback.format_exc())
1852 1872 return False
1853 1873 if self.required_perms.issubset(_user_perms):
1854 1874 return True
1855 1875 return False
1856 1876
1857 1877
1858 1878 class HasUserGroupPermissionAnyApi(_BaseApiPerm):
1859 1879 def check_permissions(self, perm_defs, repo_name=None, group_name=None,
1860 1880 user_group_name=None):
1861 1881 try:
1862 1882 _user_perms = set([perm_defs['user_groups'][user_group_name]])
1863 1883 except KeyError:
1864 1884 log.warning(traceback.format_exc())
1865 1885 return False
1866 1886 if self.required_perms.intersection(_user_perms):
1867 1887 return True
1868 1888 return False
1869 1889
1870 1890
1871 1891 def check_ip_access(source_ip, allowed_ips=None):
1872 1892 """
1873 1893 Checks if source_ip is a subnet of any of allowed_ips.
1874 1894
1875 1895 :param source_ip:
1876 1896 :param allowed_ips: list of allowed ips together with mask
1877 1897 """
1878 1898 log.debug('checking if ip:%s is subnet of %s' % (source_ip, allowed_ips))
1879 1899 source_ip_address = ipaddress.ip_address(source_ip)
1880 1900 if isinstance(allowed_ips, (tuple, list, set)):
1881 1901 for ip in allowed_ips:
1882 1902 try:
1883 1903 network_address = ipaddress.ip_network(ip, strict=False)
1884 1904 if source_ip_address in network_address:
1885 1905 log.debug('IP %s is network %s' %
1886 1906 (source_ip_address, network_address))
1887 1907 return True
1888 1908 # for any case we cannot determine the IP, don't crash just
1889 1909 # skip it and log as error, we want to say forbidden still when
1890 1910 # sending bad IP
1891 1911 except Exception:
1892 1912 log.error(traceback.format_exc())
1893 1913 continue
1894 1914 return False
1895 1915
1896 1916
1897 1917 def get_cython_compat_decorator(wrapper, func):
1898 1918 """
1899 1919 Creates a cython compatible decorator. The previously used
1900 1920 decorator.decorator() function seems to be incompatible with cython.
1901 1921
1902 1922 :param wrapper: __wrapper method of the decorator class
1903 1923 :param func: decorated function
1904 1924 """
1905 1925 @wraps(func)
1906 1926 def local_wrapper(*args, **kwds):
1907 1927 return wrapper(func, *args, **kwds)
1908 1928 local_wrapper.__wrapped__ = func
1909 1929 return local_wrapper
@@ -1,598 +1,598 b''
1 1 ## -*- coding: utf-8 -*-
2 2 <%inherit file="root.mako"/>
3 3
4 4 <div class="outerwrapper">
5 5 <!-- HEADER -->
6 6 <div class="header">
7 7 <div id="header-inner" class="wrapper">
8 8 <div id="logo">
9 9 <div class="logo-wrapper">
10 10 <a href="${h.url('home')}"><img src="${h.asset('images/rhodecode-logo-white-216x60.png')}" alt="RhodeCode"/></a>
11 11 </div>
12 12 %if c.rhodecode_name:
13 13 <div class="branding">- ${h.branding(c.rhodecode_name)}</div>
14 14 %endif
15 15 </div>
16 16 <!-- MENU BAR NAV -->
17 17 ${self.menu_bar_nav()}
18 18 <!-- END MENU BAR NAV -->
19 19 </div>
20 20 </div>
21 21 ${self.menu_bar_subnav()}
22 22 <!-- END HEADER -->
23 23
24 24 <!-- CONTENT -->
25 25 <div id="content" class="wrapper">
26 26 <div class="main">
27 27 ${next.main()}
28 28 </div>
29 29 </div>
30 30 <!-- END CONTENT -->
31 31
32 32 </div>
33 33 <!-- FOOTER -->
34 34 <div id="footer">
35 35 <div id="footer-inner" class="title wrapper">
36 36 <div>
37 37 <p class="footer-link-right">
38 38 % if c.visual.show_version:
39 39 RhodeCode Enterprise ${c.rhodecode_version} ${c.rhodecode_edition}
40 40 % endif
41 41 &copy; 2010-${h.datetime.today().year}, <a href="${h.url('rhodecode_official')}" target="_blank">RhodeCode GmbH</a>. All rights reserved.
42 42 % if c.visual.rhodecode_support_url:
43 43 <a href="${c.visual.rhodecode_support_url}" target="_blank">${_('Support')}</a>
44 44 % endif
45 45 </p>
46 46 <% sid = 'block' if request.GET.get('showrcid') else 'none' %>
47 47 <p class="server-instance" style="display:${sid}">
48 48 ## display hidden instance ID if specially defined
49 49 % if c.rhodecode_instanceid:
50 50 ${_('RhodeCode instance id: %s') % c.rhodecode_instanceid}
51 51 % endif
52 52 </p>
53 53 </div>
54 54 </div>
55 55 </div>
56 56
57 57 <!-- END FOOTER -->
58 58
59 59 ### MAKO DEFS ###
60 60
61 61 <%def name="menu_bar_subnav()">
62 62 </%def>
63 63
64 64 <%def name="breadcrumbs(class_='breadcrumbs')">
65 65 <div class="${class_}">
66 66 ${self.breadcrumbs_links()}
67 67 </div>
68 68 </%def>
69 69
70 70 <%def name="admin_menu()">
71 71 <ul class="admin_menu submenu">
72 72 <li><a href="${h.url('admin_home')}">${_('Admin journal')}</a></li>
73 73 <li><a href="${h.url('repos')}">${_('Repositories')}</a></li>
74 74 <li><a href="${h.url('repo_groups')}">${_('Repository groups')}</a></li>
75 75 <li><a href="${h.url('users')}">${_('Users')}</a></li>
76 76 <li><a href="${h.url('users_groups')}">${_('User groups')}</a></li>
77 77 <li><a href="${h.url('admin_permissions_application')}">${_('Permissions')}</a></li>
78 78 <li><a href="${h.route_path('auth_home', traverse='')}">${_('Authentication')}</a></li>
79 79 <li><a href="${h.route_path('global_integrations_home')}">${_('Integrations')}</a></li>
80 80 <li><a href="${h.url('admin_defaults_repositories')}">${_('Defaults')}</a></li>
81 81 <li class="last"><a href="${h.url('admin_settings')}">${_('Settings')}</a></li>
82 82 </ul>
83 83 </%def>
84 84
85 85
86 86 <%def name="dt_info_panel(elements)">
87 87 <dl class="dl-horizontal">
88 88 %for dt, dd, title, show_items in elements:
89 89 <dt>${dt}:</dt>
90 90 <dd title="${title}">
91 91 %if callable(dd):
92 92 ## allow lazy evaluation of elements
93 93 ${dd()}
94 94 %else:
95 95 ${dd}
96 96 %endif
97 97 %if show_items:
98 98 <span class="btn-collapse" data-toggle="item-${h.md5_safe(dt)[:6]}-details">${_('Show More')} </span>
99 99 %endif
100 100 </dd>
101 101
102 102 %if show_items:
103 103 <div class="collapsable-content" data-toggle="item-${h.md5_safe(dt)[:6]}-details" style="display: none">
104 104 %for item in show_items:
105 105 <dt></dt>
106 106 <dd>${item}</dd>
107 107 %endfor
108 108 </div>
109 109 %endif
110 110
111 111 %endfor
112 112 </dl>
113 113 </%def>
114 114
115 115
116 116 <%def name="gravatar(email, size=16)">
117 117 <%
118 118 if (size > 16):
119 119 gravatar_class = 'gravatar gravatar-large'
120 120 else:
121 121 gravatar_class = 'gravatar'
122 122 %>
123 123 <%doc>
124 124 TODO: johbo: For now we serve double size images to make it smooth
125 125 for retina. This is how it worked until now. Should be replaced
126 126 with a better solution at some point.
127 127 </%doc>
128 128 <img class="${gravatar_class}" src="${h.gravatar_url(email, size * 2)}" height="${size}" width="${size}">
129 129 </%def>
130 130
131 131
132 132 <%def name="gravatar_with_user(contact, size=16, show_disabled=False)">
133 133 <% email = h.email_or_none(contact) %>
134 134 <div class="rc-user tooltip" title="${h.author_string(email)}">
135 135 ${self.gravatar(email, size)}
136 136 <span class="${'user user-disabled' if show_disabled else 'user'}"> ${h.link_to_user(contact)}</span>
137 137 </div>
138 138 </%def>
139 139
140 140
141 141 ## admin menu used for people that have some admin resources
142 142 <%def name="admin_menu_simple(repositories=None, repository_groups=None, user_groups=None)">
143 143 <ul class="submenu">
144 144 %if repositories:
145 <li><a href="${h.url('repos')}">${_('Repositories')}</a></li>
145 <li class="local-admin-repos"><a href="${h.url('repos')}">${_('Repositories')}</a></li>
146 146 %endif
147 147 %if repository_groups:
148 <li><a href="${h.url('repo_groups')}">${_('Repository groups')}</a></li>
148 <li class="local-admin-repo-groups"><a href="${h.url('repo_groups')}">${_('Repository groups')}</a></li>
149 149 %endif
150 150 %if user_groups:
151 <li><a href="${h.url('users_groups')}">${_('User groups')}</a></li>
151 <li class="local-admin-user-groups"><a href="${h.url('users_groups')}">${_('User groups')}</a></li>
152 152 %endif
153 153 </ul>
154 154 </%def>
155 155
156 156 <%def name="repo_page_title(repo_instance)">
157 157 <div class="title-content">
158 158 <div class="title-main">
159 159 ## SVN/HG/GIT icons
160 160 %if h.is_hg(repo_instance):
161 161 <i class="icon-hg"></i>
162 162 %endif
163 163 %if h.is_git(repo_instance):
164 164 <i class="icon-git"></i>
165 165 %endif
166 166 %if h.is_svn(repo_instance):
167 167 <i class="icon-svn"></i>
168 168 %endif
169 169
170 170 ## public/private
171 171 %if repo_instance.private:
172 172 <i class="icon-repo-private"></i>
173 173 %else:
174 174 <i class="icon-repo-public"></i>
175 175 %endif
176 176
177 177 ## repo name with group name
178 178 ${h.breadcrumb_repo_link(c.rhodecode_db_repo)}
179 179
180 180 </div>
181 181
182 182 ## FORKED
183 183 %if repo_instance.fork:
184 184 <p>
185 185 <i class="icon-code-fork"></i> ${_('Fork of')}
186 186 <a href="${h.url('summary_home',repo_name=repo_instance.fork.repo_name)}">${repo_instance.fork.repo_name}</a>
187 187 </p>
188 188 %endif
189 189
190 190 ## IMPORTED FROM REMOTE
191 191 %if repo_instance.clone_uri:
192 192 <p>
193 193 <i class="icon-code-fork"></i> ${_('Clone from')}
194 194 <a href="${h.url(h.safe_str(h.hide_credentials(repo_instance.clone_uri)))}">${h.hide_credentials(repo_instance.clone_uri)}</a>
195 195 </p>
196 196 %endif
197 197
198 198 ## LOCKING STATUS
199 199 %if repo_instance.locked[0]:
200 200 <p class="locking_locked">
201 201 <i class="icon-repo-lock"></i>
202 202 ${_('Repository locked by %(user)s') % {'user': h.person_by_id(repo_instance.locked[0])}}
203 203 </p>
204 204 %elif repo_instance.enable_locking:
205 205 <p class="locking_unlocked">
206 206 <i class="icon-repo-unlock"></i>
207 207 ${_('Repository not locked. Pull repository to lock it.')}
208 208 </p>
209 209 %endif
210 210
211 211 </div>
212 212 </%def>
213 213
214 214 <%def name="repo_menu(active=None)">
215 215 <%
216 216 def is_active(selected):
217 217 if selected == active:
218 218 return "active"
219 219 %>
220 220
221 221 <!--- CONTEXT BAR -->
222 222 <div id="context-bar">
223 223 <div class="wrapper">
224 224 <ul id="context-pages" class="horizontal-list navigation">
225 225 <li class="${is_active('summary')}"><a class="menulink" href="${h.url('summary_home', repo_name=c.repo_name)}"><div class="menulabel">${_('Summary')}</div></a></li>
226 226 <li class="${is_active('changelog')}"><a class="menulink" href="${h.url('changelog_home', repo_name=c.repo_name)}"><div class="menulabel">${_('Changelog')}</div></a></li>
227 227 <li class="${is_active('files')}"><a class="menulink" href="${h.url('files_home', repo_name=c.repo_name, revision=c.rhodecode_db_repo.landing_rev[1])}"><div class="menulabel">${_('Files')}</div></a></li>
228 228 <li class="${is_active('compare')}">
229 229 <a class="menulink" href="${h.url('compare_home',repo_name=c.repo_name)}"><div class="menulabel">${_('Compare')}</div></a>
230 230 </li>
231 231 ## TODO: anderson: ideally it would have a function on the scm_instance "enable_pullrequest() and enable_fork()"
232 232 %if c.rhodecode_db_repo.repo_type in ['git','hg']:
233 233 <li class="${is_active('showpullrequest')}">
234 234 <a class="menulink" href="${h.url('pullrequest_show_all',repo_name=c.repo_name)}" title="${_('Show Pull Requests for %s') % c.repo_name}">
235 235 %if c.repository_pull_requests:
236 236 <span class="pr_notifications">${c.repository_pull_requests}</span>
237 237 %endif
238 238 <div class="menulabel">${_('Pull Requests')}</div>
239 239 </a>
240 240 </li>
241 241 %endif
242 242 <li class="${is_active('options')}">
243 243 <a class="menulink" href="#" class="dropdown"><div class="menulabel">${_('Options')} <div class="show_more"></div></div></a>
244 244 <ul class="submenu">
245 245 %if h.HasRepoPermissionAll('repository.admin')(c.repo_name):
246 246 <li><a href="${h.url('edit_repo',repo_name=c.repo_name)}">${_('Settings')}</a></li>
247 247 %endif
248 248 %if c.rhodecode_db_repo.fork:
249 249 <li><a href="${h.url('compare_url',repo_name=c.rhodecode_db_repo.fork.repo_name,source_ref_type=c.rhodecode_db_repo.landing_rev[0],source_ref=c.rhodecode_db_repo.landing_rev[1], target_repo=c.repo_name,target_ref_type='branch' if request.GET.get('branch') else c.rhodecode_db_repo.landing_rev[0],target_ref=request.GET.get('branch') or c.rhodecode_db_repo.landing_rev[1], merge=1)}">
250 250 ${_('Compare fork')}</a></li>
251 251 %endif
252 252
253 253 <li><a href="${h.url('search_repo_home',repo_name=c.repo_name)}">${_('Search')}</a></li>
254 254
255 255 %if h.HasRepoPermissionAny('repository.write','repository.admin')(c.repo_name) and c.rhodecode_db_repo.enable_locking:
256 256 %if c.rhodecode_db_repo.locked[0]:
257 257 <li><a class="locking_del" href="${h.url('toggle_locking',repo_name=c.repo_name)}">${_('Unlock')}</a></li>
258 258 %else:
259 259 <li><a class="locking_add" href="${h.url('toggle_locking',repo_name=c.repo_name)}">${_('Lock')}</a></li>
260 260 %endif
261 261 %endif
262 262 %if c.rhodecode_user.username != h.DEFAULT_USER:
263 263 %if c.rhodecode_db_repo.repo_type in ['git','hg']:
264 264 <li><a href="${h.url('repo_fork_home',repo_name=c.repo_name)}">${_('Fork')}</a></li>
265 265 <li><a href="${h.url('pullrequest_home',repo_name=c.repo_name)}">${_('Create Pull Request')}</a></li>
266 266 %endif
267 267 %endif
268 268 </ul>
269 269 </li>
270 270 </ul>
271 271 </div>
272 272 <div class="clear"></div>
273 273 </div>
274 274 <!--- END CONTEXT BAR -->
275 275
276 276 </%def>
277 277
278 278 <%def name="usermenu(active=False)">
279 279 ## USER MENU
280 280 <li id="quick_login_li" class="${'active' if active else ''}">
281 281 <a id="quick_login_link" class="menulink childs">
282 282 ${gravatar(c.rhodecode_user.email, 20)}
283 283 <span class="user">
284 284 %if c.rhodecode_user.username != h.DEFAULT_USER:
285 285 <span class="menu_link_user">${c.rhodecode_user.username}</span><div class="show_more"></div>
286 286 %else:
287 287 <span>${_('Sign in')}</span>
288 288 %endif
289 289 </span>
290 290 </a>
291 291
292 292 <div class="user-menu submenu">
293 293 <div id="quick_login">
294 294 %if c.rhodecode_user.username == h.DEFAULT_USER:
295 295 <h4>${_('Sign in to your account')}</h4>
296 296 ${h.form(h.route_path('login', _query={'came_from': h.url.current()}), needs_csrf_token=False)}
297 297 <div class="form form-vertical">
298 298 <div class="fields">
299 299 <div class="field">
300 300 <div class="label">
301 301 <label for="username">${_('Username')}:</label>
302 302 </div>
303 303 <div class="input">
304 304 ${h.text('username',class_='focus',tabindex=1)}
305 305 </div>
306 306
307 307 </div>
308 308 <div class="field">
309 309 <div class="label">
310 310 <label for="password">${_('Password')}:</label>
311 311 %if h.HasPermissionAny('hg.password_reset.enabled')():
312 312 <span class="forgot_password">${h.link_to(_('(Forgot password?)'),h.route_path('reset_password'), class_='pwd_reset')}</span>
313 313 %endif
314 314 </div>
315 315 <div class="input">
316 316 ${h.password('password',class_='focus',tabindex=2)}
317 317 </div>
318 318 </div>
319 319 <div class="buttons">
320 320 <div class="register">
321 321 %if h.HasPermissionAny('hg.admin', 'hg.register.auto_activate', 'hg.register.manual_activate')():
322 322 ${h.link_to(_("Don't have an account ?"),h.route_path('register'))}
323 323 %endif
324 324 </div>
325 325 <div class="submit">
326 326 ${h.submit('sign_in',_('Sign In'),class_="btn btn-small",tabindex=3)}
327 327 </div>
328 328 </div>
329 329 </div>
330 330 </div>
331 331 ${h.end_form()}
332 332 %else:
333 333 <div class="">
334 334 <div class="big_gravatar">${gravatar(c.rhodecode_user.email, 48)}</div>
335 335 <div class="full_name">${c.rhodecode_user.full_name_or_username}</div>
336 336 <div class="email">${c.rhodecode_user.email}</div>
337 337 </div>
338 338 <div class="">
339 339 <ol class="links">
340 340 <li>${h.link_to(_(u'My account'),h.url('my_account'))}</li>
341 341 % if c.rhodecode_user.personal_repo_group:
342 342 <li>${h.link_to(_(u'My personal group'), h.url('repo_group_home', group_name=c.rhodecode_user.personal_repo_group.group_name))}</li>
343 343 % endif
344 344 <li class="logout">
345 345 ${h.secure_form(h.route_path('logout'))}
346 346 ${h.submit('log_out', _(u'Sign Out'),class_="btn btn-primary")}
347 347 ${h.end_form()}
348 348 </li>
349 349 </ol>
350 350 </div>
351 351 %endif
352 352 </div>
353 353 </div>
354 354 %if c.rhodecode_user.username != h.DEFAULT_USER:
355 355 <div class="pill_container">
356 356 % if c.unread_notifications == 0:
357 357 <a class="menu_link_notifications empty" href="${h.url('notifications')}">${c.unread_notifications}</a>
358 358 % else:
359 359 <a class="menu_link_notifications" href="${h.url('notifications')}">${c.unread_notifications}</a>
360 360 % endif
361 361 </div>
362 362 % endif
363 363 </li>
364 364 </%def>
365 365
366 366 <%def name="menu_items(active=None)">
367 367 <%
368 368 def is_active(selected):
369 369 if selected == active:
370 370 return "active"
371 371 return ""
372 372 %>
373 373 <ul id="quick" class="main_nav navigation horizontal-list">
374 374 <!-- repo switcher -->
375 375 <li class="${is_active('repositories')} repo_switcher_li has_select2">
376 376 <input id="repo_switcher" name="repo_switcher" type="hidden">
377 377 </li>
378 378
379 379 ## ROOT MENU
380 380 %if c.rhodecode_user.username != h.DEFAULT_USER:
381 381 <li class="${is_active('journal')}">
382 382 <a class="menulink" title="${_('Show activity journal')}" href="${h.url('journal')}">
383 383 <div class="menulabel">${_('Journal')}</div>
384 384 </a>
385 385 </li>
386 386 %else:
387 387 <li class="${is_active('journal')}">
388 388 <a class="menulink" title="${_('Show Public activity journal')}" href="${h.url('public_journal')}">
389 389 <div class="menulabel">${_('Public journal')}</div>
390 390 </a>
391 391 </li>
392 392 %endif
393 393 <li class="${is_active('gists')}">
394 394 <a class="menulink childs" title="${_('Show Gists')}" href="${h.url('gists')}">
395 395 <div class="menulabel">${_('Gists')}</div>
396 396 </a>
397 397 </li>
398 398 <li class="${is_active('search')}">
399 399 <a class="menulink" title="${_('Search in repositories you have access to')}" href="${h.url('search')}">
400 400 <div class="menulabel">${_('Search')}</div>
401 401 </a>
402 402 </li>
403 403 % if h.HasPermissionAll('hg.admin')('access admin main page'):
404 404 <li class="${is_active('admin')}">
405 405 <a class="menulink childs" title="${_('Admin settings')}" href="#" onclick="return false;">
406 406 <div class="menulabel">${_('Admin')} <div class="show_more"></div></div>
407 407 </a>
408 408 ${admin_menu()}
409 409 </li>
410 410 % elif c.rhodecode_user.repositories_admin or c.rhodecode_user.repository_groups_admin or c.rhodecode_user.user_groups_admin:
411 411 <li class="${is_active('admin')}">
412 412 <a class="menulink childs" title="${_('Delegated Admin settings')}">
413 413 <div class="menulabel">${_('Admin')} <div class="show_more"></div></div>
414 414 </a>
415 415 ${admin_menu_simple(c.rhodecode_user.repositories_admin,
416 416 c.rhodecode_user.repository_groups_admin,
417 417 c.rhodecode_user.user_groups_admin or h.HasPermissionAny('hg.usergroup.create.true')())}
418 418 </li>
419 419 % endif
420 420 % if c.debug_style:
421 421 <li class="${is_active('debug_style')}">
422 422 <a class="menulink" title="${_('Style')}" href="${h.url('debug_style_home')}">
423 423 <div class="menulabel">${_('Style')}</div>
424 424 </a>
425 425 </li>
426 426 % endif
427 427 ## render extra user menu
428 428 ${usermenu(active=(active=='my_account'))}
429 429 </ul>
430 430
431 431 <script type="text/javascript">
432 432 var visual_show_public_icon = "${c.visual.show_public_icon}" == "True";
433 433
434 434 /*format the look of items in the list*/
435 435 var format = function(state, escapeMarkup){
436 436 if (!state.id){
437 437 return state.text; // optgroup
438 438 }
439 439 var obj_dict = state.obj;
440 440 var tmpl = '';
441 441
442 442 if(obj_dict && state.type == 'repo'){
443 443 if(obj_dict['repo_type'] === 'hg'){
444 444 tmpl += '<i class="icon-hg"></i> ';
445 445 }
446 446 else if(obj_dict['repo_type'] === 'git'){
447 447 tmpl += '<i class="icon-git"></i> ';
448 448 }
449 449 else if(obj_dict['repo_type'] === 'svn'){
450 450 tmpl += '<i class="icon-svn"></i> ';
451 451 }
452 452 if(obj_dict['private']){
453 453 tmpl += '<i class="icon-lock" ></i> ';
454 454 }
455 455 else if(visual_show_public_icon){
456 456 tmpl += '<i class="icon-unlock-alt"></i> ';
457 457 }
458 458 }
459 459 if(obj_dict && state.type == 'commit') {
460 460 tmpl += '<i class="icon-tag"></i>';
461 461 }
462 462 if(obj_dict && state.type == 'group'){
463 463 tmpl += '<i class="icon-folder-close"></i> ';
464 464 }
465 465 tmpl += escapeMarkup(state.text);
466 466 return tmpl;
467 467 };
468 468
469 469 var formatResult = function(result, container, query, escapeMarkup) {
470 470 return format(result, escapeMarkup);
471 471 };
472 472
473 473 var formatSelection = function(data, container, escapeMarkup) {
474 474 return format(data, escapeMarkup);
475 475 };
476 476
477 477 $("#repo_switcher").select2({
478 478 cachedDataSource: {},
479 479 minimumInputLength: 2,
480 480 placeholder: '<div class="menulabel">${_('Go to')} <div class="show_more"></div></div>',
481 481 dropdownAutoWidth: true,
482 482 formatResult: formatResult,
483 483 formatSelection: formatSelection,
484 484 containerCssClass: "repo-switcher",
485 485 dropdownCssClass: "repo-switcher-dropdown",
486 486 escapeMarkup: function(m){
487 487 // don't escape our custom placeholder
488 488 if(m.substr(0,23) == '<div class="menulabel">'){
489 489 return m;
490 490 }
491 491
492 492 return Select2.util.escapeMarkup(m);
493 493 },
494 494 query: $.debounce(250, function(query){
495 495 self = this;
496 496 var cacheKey = query.term;
497 497 var cachedData = self.cachedDataSource[cacheKey];
498 498
499 499 if (cachedData) {
500 500 query.callback({results: cachedData.results});
501 501 } else {
502 502 $.ajax({
503 503 url: "${h.url('goto_switcher_data')}",
504 504 data: {'query': query.term},
505 505 dataType: 'json',
506 506 type: 'GET',
507 507 success: function(data) {
508 508 self.cachedDataSource[cacheKey] = data;
509 509 query.callback({results: data.results});
510 510 },
511 511 error: function(data, textStatus, errorThrown) {
512 512 alert("Error while fetching entries.\nError code {0} ({1}).".format(data.status, data.statusText));
513 513 }
514 514 })
515 515 }
516 516 })
517 517 });
518 518
519 519 $("#repo_switcher").on('select2-selecting', function(e){
520 520 e.preventDefault();
521 521 window.location = e.choice.url;
522 522 });
523 523
524 524 </script>
525 525 <script src="${h.asset('js/rhodecode/base/keyboard-bindings.js', ver=c.rhodecode_version_hash)}"></script>
526 526 </%def>
527 527
528 528 <div class="modal" id="help_kb" tabindex="-1" role="dialog" aria-labelledby="myModalLabel" aria-hidden="true">
529 529 <div class="modal-dialog">
530 530 <div class="modal-content">
531 531 <div class="modal-header">
532 532 <button type="button" class="close" data-dismiss="modal" aria-hidden="true">&times;</button>
533 533 <h4 class="modal-title" id="myModalLabel">${_('Keyboard shortcuts')}</h4>
534 534 </div>
535 535 <div class="modal-body">
536 536 <div class="block-left">
537 537 <table class="keyboard-mappings">
538 538 <tbody>
539 539 <tr>
540 540 <th></th>
541 541 <th>${_('Site-wide shortcuts')}</th>
542 542 </tr>
543 543 <%
544 544 elems = [
545 545 ('/', 'Open quick search box'),
546 546 ('g h', 'Goto home page'),
547 547 ('g g', 'Goto my private gists page'),
548 548 ('g G', 'Goto my public gists page'),
549 549 ('n r', 'New repository page'),
550 550 ('n g', 'New gist page'),
551 551 ]
552 552 %>
553 553 %for key, desc in elems:
554 554 <tr>
555 555 <td class="keys">
556 556 <span class="key tag">${key}</span>
557 557 </td>
558 558 <td>${desc}</td>
559 559 </tr>
560 560 %endfor
561 561 </tbody>
562 562 </table>
563 563 </div>
564 564 <div class="block-left">
565 565 <table class="keyboard-mappings">
566 566 <tbody>
567 567 <tr>
568 568 <th></th>
569 569 <th>${_('Repositories')}</th>
570 570 </tr>
571 571 <%
572 572 elems = [
573 573 ('g s', 'Goto summary page'),
574 574 ('g c', 'Goto changelog page'),
575 575 ('g f', 'Goto files page'),
576 576 ('g F', 'Goto files page with file search activated'),
577 577 ('g p', 'Goto pull requests page'),
578 578 ('g o', 'Goto repository settings'),
579 579 ('g O', 'Goto repository permissions settings'),
580 580 ]
581 581 %>
582 582 %for key, desc in elems:
583 583 <tr>
584 584 <td class="keys">
585 585 <span class="key tag">${key}</span>
586 586 </td>
587 587 <td>${desc}</td>
588 588 </tr>
589 589 %endfor
590 590 </tbody>
591 591 </table>
592 592 </div>
593 593 </div>
594 594 <div class="modal-footer">
595 595 </div>
596 596 </div><!-- /.modal-content -->
597 597 </div><!-- /.modal-dialog -->
598 598 </div><!-- /.modal -->
@@ -1,221 +1,220 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2010-2017 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 import os
22 22
23 23 from rhodecode.lib import helpers as h
24 24 from rhodecode.model.meta import Session
25 25 from rhodecode.model.repo_group import RepoGroupModel
26 26 from rhodecode.tests import (
27 27 url, TestController, assert_session_flash, GIT_REPO, HG_REPO,
28 28 TESTS_TMP_PATH, TEST_USER_REGULAR_LOGIN, TEST_USER_REGULAR_PASS)
29 29 from rhodecode.tests.fixture import Fixture
30 30
31 31 fixture = Fixture()
32 32
33 33
34 34 def test_update(app, csrf_token, autologin_user, user_util):
35 35 repo_group = user_util.create_repo_group()
36 36 description = 'description for newly created repo group'
37 37 Session().commit()
38 38 response = app.post(
39 39 url('update_repo_group', group_name=repo_group.group_name),
40 40 fixture._get_group_create_params(
41 41 group_name=repo_group.group_name,
42 42 group_description=description,
43 43 csrf_token=csrf_token,
44 44 _method='PUT')
45 45 )
46 46 # TODO: anderson: johbo: we believe that this update should return
47 47 # a redirect instead of rendering the template.
48 48 assert response.status_code == 200
49 49
50 50
51 51 def test_edit(app, user_util, autologin_user):
52 52 repo_group = user_util.create_repo_group()
53 53 Session().commit()
54 54 response = app.get(
55 55 url('edit_repo_group', group_name=repo_group.group_name))
56 56 assert response.status_code == 200
57 57
58 58
59 59 def test_edit_repo_group_perms(app, user_util, autologin_user):
60 60 repo_group = user_util.create_repo_group()
61 61 Session().commit()
62 62 response = app.get(
63 63 url('edit_repo_group_perms', group_name=repo_group.group_name))
64 64 assert response.status_code == 200
65 65
66 66
67 67 def test_update_fails_when_parent_pointing_to_self(
68 68 app, csrf_token, user_util, autologin_user):
69 69 group = user_util.create_repo_group()
70 70 response = app.post(
71 71 url('update_repo_group', group_name=group.group_name),
72 72 fixture._get_group_create_params(
73 73 group_parent_id=group.group_id,
74 74 csrf_token=csrf_token,
75 75 _method='PUT')
76 76 )
77 77 response.mustcontain(
78 78 '<select class="medium error" id="group_parent_id"'
79 79 ' name="group_parent_id">')
80 80 response.mustcontain('<span class="error-message">Value must be one of:')
81 81
82 82
83 83 class _BaseTest(TestController):
84 84
85 85 REPO_GROUP = None
86 86 NEW_REPO_GROUP = None
87 87 REPO = None
88 88 REPO_TYPE = None
89 89
90 90 def test_index(self):
91 91 self.log_user()
92 92 response = self.app.get(url('repo_groups'))
93 93 response.mustcontain('data: []')
94 94
95 95 def test_index_after_creating_group(self):
96 96 self.log_user()
97 97 fixture.create_repo_group('test_repo_group')
98 98 response = self.app.get(url('repo_groups'))
99 99 response.mustcontain('"name_raw": "test_repo_group"')
100 100 fixture.destroy_repo_group('test_repo_group')
101 101
102 102 def test_new(self):
103 103 self.log_user()
104 104 self.app.get(url('new_repo_group'))
105 105
106 106 def test_new_by_regular_user_no_permission(self):
107 107 self.log_user(TEST_USER_REGULAR_LOGIN, TEST_USER_REGULAR_PASS)
108 108 self.app.get(url('new_repo_group'), status=403)
109 109
110 110 def test_create(self):
111 111 self.log_user()
112 112 repo_group_name = self.NEW_REPO_GROUP
113 113 repo_group_name_unicode = repo_group_name.decode('utf8')
114 114 description = 'description for newly created repo group'
115 115
116 116 response = self.app.post(
117 117 url('repo_groups'),
118 118 fixture._get_group_create_params(
119 119 group_name=repo_group_name,
120 120 group_description=description,
121 121 csrf_token=self.csrf_token))
122 122
123 123 # run the check page that triggers the flash message
124 124 # response = self.app.get(url('repo_check_home', repo_name=repo_name))
125 125 # assert response.json == {u'result': True}
126 126 assert_session_flash(
127 127 response,
128 128 u'Created repository group <a href="%s">%s</a>' % (
129 129 h.url('repo_group_home', group_name=repo_group_name),
130 130 repo_group_name_unicode))
131 131
132 132 # # test if the repo group was created in the database
133 133 new_repo_group = RepoGroupModel()._get_repo_group(
134 134 repo_group_name_unicode)
135 135 assert new_repo_group is not None
136 136
137 137 assert new_repo_group.group_name == repo_group_name_unicode
138 138 assert new_repo_group.group_description == description
139 139
140 #
141 # # test if the repository is visible in the list ?
140 # test if the repository is visible in the list ?
142 141 response = self.app.get(
143 142 url('repo_group_home', group_name=repo_group_name))
144 143 response.mustcontain(repo_group_name)
145 144
146 145 # test if the repository group was created on filesystem
147 146 is_on_filesystem = os.path.isdir(
148 147 os.path.join(TESTS_TMP_PATH, repo_group_name))
149 148 if not is_on_filesystem:
150 149 self.fail('no repo group %s in filesystem' % repo_group_name)
151 150
152 151 RepoGroupModel().delete(repo_group_name_unicode)
153 152 Session().commit()
154 153
155 154 def test_create_subgroup(self, user_util):
156 155 self.log_user()
157 156 repo_group_name = self.NEW_REPO_GROUP
158 157 parent_group = user_util.create_repo_group()
159 158 parent_group_name = parent_group.group_name
160 159
161 160 expected_group_name = '{}/{}'.format(
162 161 parent_group_name, repo_group_name)
163 162 expected_group_name_unicode = expected_group_name.decode('utf8')
164 163
165 164 try:
166 165 response = self.app.post(
167 166 url('repo_groups'),
168 167 fixture._get_group_create_params(
169 168 group_name=repo_group_name,
170 169 group_parent_id=parent_group.group_id,
171 170 group_description='Test desciption',
172 171 csrf_token=self.csrf_token))
173 172
174 173 assert_session_flash(
175 174 response,
176 175 u'Created repository group <a href="%s">%s</a>' % (
177 176 h.url('repo_group_home', group_name=expected_group_name),
178 177 expected_group_name_unicode))
179 178 finally:
180 179 RepoGroupModel().delete(expected_group_name_unicode)
181 180 Session().commit()
182 181
183 182 def test_user_with_creation_permissions_cannot_create_subgroups(
184 183 self, user_util):
185 184
186 185 user_util.grant_user_permission(
187 186 TEST_USER_REGULAR_LOGIN, 'hg.repogroup.create.true')
188 187 parent_group = user_util.create_repo_group()
189 188 parent_group_id = parent_group.group_id
190 189 self.log_user(TEST_USER_REGULAR_LOGIN, TEST_USER_REGULAR_PASS)
191 190 self.app.get(
192 191 url('new_repo_group', parent_group=parent_group_id,),
193 192 status=403)
194 193
195 194
196 195 class TestRepoGroupsControllerGIT(_BaseTest):
197 196 REPO_GROUP = None
198 197 NEW_REPO_GROUP = 'git_repo'
199 198 REPO = GIT_REPO
200 199 REPO_TYPE = 'git'
201 200
202 201
203 202 class TestRepoGroupsControllerHG(_BaseTest):
204 203 REPO_GROUP = None
205 204 NEW_REPO_GROUP = 'hg_repo'
206 205 REPO = HG_REPO
207 206 REPO_TYPE = 'hg'
208 207
209 208
210 209 class TestRepoGroupsControllerNumericalHG(_BaseTest):
211 210 REPO_GROUP = None
212 211 NEW_REPO_GROUP = '12345'
213 212 REPO = HG_REPO
214 213 REPO_TYPE = 'hg'
215 214
216 215
217 216 class TestRepoGroupsControllerNonAsciiHG(_BaseTest):
218 217 REPO_GROUP = None
219 218 NEW_REPO_GROUP = 'hg_repo_Δ…Δ‡'
220 219 REPO = HG_REPO
221 220 REPO_TYPE = 'hg'
@@ -1,695 +1,717 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2010-2017 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 import pytest
22 22
23 23 from rhodecode.lib.auth import AuthUser
24 24 from rhodecode.model.db import (
25 25 RepoGroup, User, UserGroupRepoGroupToPerm, Permission, UserToPerm,
26 26 UserGroupToPerm)
27 27 from rhodecode.model.meta import Session
28 28 from rhodecode.model.permission import PermissionModel
29 29 from rhodecode.model.repo import RepoModel
30 30 from rhodecode.model.repo_group import RepoGroupModel
31 31 from rhodecode.model.user import UserModel
32 32 from rhodecode.model.user_group import UserGroupModel
33 33 from rhodecode.tests.fixture import Fixture
34 34
35 35
36 36 fixture = Fixture()
37 37
38 38
39 39 @pytest.fixture
40 40 def repo_name(backend_hg):
41 41 return backend_hg.repo_name
42 42
43 43
44 44 class TestPermissions(object):
45 45
46 46 @pytest.fixture(scope='class', autouse=True)
47 47 def default_permissions(self, request, pylonsapp):
48 48 # recreate default user to get a clean start
49 49 PermissionModel().create_default_user_permissions(
50 50 user=User.DEFAULT_USER, force=True)
51 51 Session().commit()
52 52
53 53 @pytest.fixture(autouse=True)
54 54 def prepare_users(self, request):
55 55 # TODO: User creation is a duplicate of test_nofitications, check
56 56 # if that can be unified
57 57 self.u1 = UserModel().create_or_update(
58 58 username=u'u1', password=u'qweqwe',
59 59 email=u'u1@rhodecode.org', firstname=u'u1', lastname=u'u1'
60 60 )
61 61 self.u2 = UserModel().create_or_update(
62 62 username=u'u2', password=u'qweqwe',
63 63 email=u'u2@rhodecode.org', firstname=u'u2', lastname=u'u2'
64 64 )
65 65 self.u3 = UserModel().create_or_update(
66 66 username=u'u3', password=u'qweqwe',
67 67 email=u'u3@rhodecode.org', firstname=u'u3', lastname=u'u3'
68 68 )
69 69 self.anon = User.get_default_user()
70 70 self.a1 = UserModel().create_or_update(
71 71 username=u'a1', password=u'qweqwe',
72 72 email=u'a1@rhodecode.org', firstname=u'a1', lastname=u'a1',
73 73 admin=True
74 74 )
75 75 Session().commit()
76 76
77 77 request.addfinalizer(self.cleanup)
78 78
79 79 def cleanup(self):
80 80 if hasattr(self, 'test_repo'):
81 81 RepoModel().delete(repo=self.test_repo)
82 82
83 83 if hasattr(self, 'g1'):
84 84 RepoGroupModel().delete(self.g1.group_id)
85 85 if hasattr(self, 'g2'):
86 86 RepoGroupModel().delete(self.g2.group_id)
87 87
88 88 UserModel().delete(self.u1)
89 89 UserModel().delete(self.u2)
90 90 UserModel().delete(self.u3)
91 91 UserModel().delete(self.a1)
92 92
93 93 if hasattr(self, 'ug1'):
94 94 UserGroupModel().delete(self.ug1, force=True)
95 95
96 96 Session().commit()
97 97
98 98 def test_default_perms_set(self, repo_name):
99 99 assert repo_perms(self.u1)[repo_name] == 'repository.read'
100 100 new_perm = 'repository.write'
101 101 RepoModel().grant_user_permission(repo=repo_name, user=self.u1,
102 102 perm=new_perm)
103 103 Session().commit()
104 104 assert repo_perms(self.u1)[repo_name] == new_perm
105 105
106 106 def test_default_admin_perms_set(self, repo_name):
107 107 assert repo_perms(self.a1)[repo_name] == 'repository.admin'
108 108 RepoModel().grant_user_permission(repo=repo_name, user=self.a1,
109 109 perm='repository.write')
110 110 Session().commit()
111 111 # cannot really downgrade admins permissions !? they still gets set as
112 112 # admin !
113 113 assert repo_perms(self.a1)[repo_name] == 'repository.admin'
114 114
115 115 def test_default_group_perms(self, repo_name):
116 116 self.g1 = fixture.create_repo_group('test1', skip_if_exists=True)
117 117 self.g2 = fixture.create_repo_group('test2', skip_if_exists=True)
118 118
119 119 assert repo_perms(self.u1)[repo_name] == 'repository.read'
120 120 assert group_perms(self.u1) == {
121 121 'test1': 'group.read', 'test2': 'group.read'}
122 122 assert global_perms(self.u1) == set(
123 123 Permission.DEFAULT_USER_PERMISSIONS)
124 124
125 125 def test_default_admin_group_perms(self, repo_name):
126 126 self.g1 = fixture.create_repo_group('test1', skip_if_exists=True)
127 127 self.g2 = fixture.create_repo_group('test2', skip_if_exists=True)
128 128
129 129 assert repo_perms(self.a1)[repo_name] == 'repository.admin'
130 130 assert group_perms(self.a1) == {
131 131 'test1': 'group.admin', 'test2': 'group.admin'}
132 132
133 def test_default_owner_group_perms(self):
134 # "u1" shall be owner without any special permission assigned
135 self.g1 = fixture.create_repo_group('test1')
136 assert group_perms(self.u1) == {'test1': 'group.read'}
133 def test_default_owner_repo_perms(self, backend, user_util, test_repo):
134 user = user_util.create_user()
135 repo = test_repo('minimal', backend.alias)
136 org_owner = repo.user
137 assert repo_perms(user)[repo.repo_name] == 'repository.read'
138
139 repo.user = user
140 assert repo_perms(user)[repo.repo_name] == 'repository.admin'
141 repo.user = org_owner
142
143 def test_default_owner_repo_group_perms(self, user_util, test_repo_group):
144 user = user_util.create_user()
145 org_owner = test_repo_group.user
137 146
138 # Make him owner, but do not add any special permissions
139 self.g1.user = self.u1
140 assert group_perms(self.u1) == {'test1': 'group.admin'}
147 assert group_perms(user)[test_repo_group.group_name] == 'group.read'
148
149 test_repo_group.user = user
150 assert group_perms(user)[test_repo_group.group_name] == 'group.admin'
151 test_repo_group.user = org_owner
152
153 def test_default_owner_user_group_perms(self, user_util, test_user_group):
154 user = user_util.create_user()
155 org_owner = test_user_group.user
156
157 assert user_group_perms(user)[test_user_group.users_group_name] == 'usergroup.read'
158
159 test_user_group.user = user
160 assert user_group_perms(user)[test_user_group.users_group_name] == 'usergroup.admin'
161
162 test_user_group.user = org_owner
141 163
142 164 def test_propagated_permission_from_users_group_by_explicit_perms_exist(
143 165 self, repo_name):
144 166 # make group
145 167 self.ug1 = fixture.create_user_group('G1')
146 168 UserGroupModel().add_user_to_group(self.ug1, self.u1)
147 169
148 170 # set permission to lower
149 171 new_perm = 'repository.none'
150 172 RepoModel().grant_user_permission(
151 173 repo=repo_name, user=self.u1, perm=new_perm)
152 174 Session().commit()
153 175 assert repo_perms(self.u1)[repo_name] == new_perm
154 176
155 177 # grant perm for group this should not override permission from user
156 178 # since it has explicitly set
157 179 new_perm_gr = 'repository.write'
158 180 RepoModel().grant_user_group_permission(
159 181 repo=repo_name, group_name=self.ug1, perm=new_perm_gr)
160 182
161 183 assert repo_perms(self.u1)[repo_name] == new_perm
162 184 assert group_perms(self.u1) == {}
163 185
164 186 def test_propagated_permission_from_users_group(self, repo_name):
165 187 # make group
166 188 self.ug1 = fixture.create_user_group('G1')
167 189 UserGroupModel().add_user_to_group(self.ug1, self.u3)
168 190
169 191 # grant perm for group
170 192 # this should override default permission from user
171 193 new_perm_gr = 'repository.write'
172 194 RepoModel().grant_user_group_permission(
173 195 repo=repo_name, group_name=self.ug1, perm=new_perm_gr)
174 196
175 197 assert repo_perms(self.u3)[repo_name] == new_perm_gr
176 198 assert group_perms(self.u3) == {}
177 199
178 200 def test_propagated_permission_from_users_group_lower_weight(
179 201 self, repo_name):
180 202 # make group with user
181 203 self.ug1 = fixture.create_user_group('G1')
182 204 UserGroupModel().add_user_to_group(self.ug1, self.u1)
183 205
184 206 # set permission to lower
185 207 new_perm_h = 'repository.write'
186 208 RepoModel().grant_user_permission(
187 209 repo=repo_name, user=self.u1, perm=new_perm_h)
188 210 Session().commit()
189 211
190 212 assert repo_perms(self.u1)[repo_name] == new_perm_h
191 213
192 214 # grant perm for group this should NOT override permission from user
193 215 # since it's lower than granted
194 216 new_perm_l = 'repository.read'
195 217 RepoModel().grant_user_group_permission(
196 218 repo=repo_name, group_name=self.ug1, perm=new_perm_l)
197 219
198 220 assert repo_perms(self.u1)[repo_name] == new_perm_h
199 221 assert group_perms(self.u1) == {}
200 222
201 223 def test_repo_in_group_permissions(self):
202 224 self.g1 = fixture.create_repo_group('group1', skip_if_exists=True)
203 225 self.g2 = fixture.create_repo_group('group2', skip_if_exists=True)
204 226 # both perms should be read !
205 227 assert group_perms(self.u1) == \
206 228 {u'group1': u'group.read', u'group2': u'group.read'}
207 229
208 230 assert group_perms(self.anon) == \
209 231 {u'group1': u'group.read', u'group2': u'group.read'}
210 232
211 233 # Change perms to none for both groups
212 234 RepoGroupModel().grant_user_permission(
213 235 repo_group=self.g1, user=self.anon, perm='group.none')
214 236 RepoGroupModel().grant_user_permission(
215 237 repo_group=self.g2, user=self.anon, perm='group.none')
216 238
217 239 assert group_perms(self.u1) == \
218 240 {u'group1': u'group.none', u'group2': u'group.none'}
219 241 assert group_perms(self.anon) == \
220 242 {u'group1': u'group.none', u'group2': u'group.none'}
221 243
222 244 # add repo to group
223 245 name = RepoGroup.url_sep().join([self.g1.group_name, 'test_perm'])
224 246 self.test_repo = fixture.create_repo(name=name,
225 247 repo_type='hg',
226 248 repo_group=self.g1,
227 249 cur_user=self.u1,)
228 250
229 251 assert group_perms(self.u1) == \
230 252 {u'group1': u'group.none', u'group2': u'group.none'}
231 253 assert group_perms(self.anon) == \
232 254 {u'group1': u'group.none', u'group2': u'group.none'}
233 255
234 256 # grant permission for u2 !
235 257 RepoGroupModel().grant_user_permission(
236 258 repo_group=self.g1, user=self.u2, perm='group.read')
237 259 RepoGroupModel().grant_user_permission(
238 260 repo_group=self.g2, user=self.u2, perm='group.read')
239 261 Session().commit()
240 262 assert self.u1 != self.u2
241 263
242 264 # u1 and anon should have not change perms while u2 should !
243 265 assert group_perms(self.u1) == \
244 266 {u'group1': u'group.none', u'group2': u'group.none'}
245 267 assert group_perms(self.u2) == \
246 268 {u'group1': u'group.read', u'group2': u'group.read'}
247 269 assert group_perms(self.anon) == \
248 270 {u'group1': u'group.none', u'group2': u'group.none'}
249 271
250 272 def test_repo_group_user_as_user_group_member(self):
251 273 # create Group1
252 274 self.g1 = fixture.create_repo_group('group1', skip_if_exists=True)
253 275 assert group_perms(self.anon) == {u'group1': u'group.read'}
254 276
255 277 # set default permission to none
256 278 RepoGroupModel().grant_user_permission(
257 279 repo_group=self.g1, user=self.anon, perm='group.none')
258 280 # make group
259 281 self.ug1 = fixture.create_user_group('G1')
260 282 # add user to group
261 283 UserGroupModel().add_user_to_group(self.ug1, self.u1)
262 284 Session().commit()
263 285
264 286 # check if user is in the group
265 287 ug1 = UserGroupModel().get(self.ug1.users_group_id)
266 288 members = [x.user_id for x in ug1.members]
267 289 assert members == [self.u1.user_id]
268 290 # add some user to that group
269 291
270 292 # check his permissions
271 293 assert group_perms(self.anon) == {u'group1': u'group.none'}
272 294 assert group_perms(self.u1) == {u'group1': u'group.none'}
273 295
274 296 # grant ug1 read permissions for
275 297 RepoGroupModel().grant_user_group_permission(
276 298 repo_group=self.g1, group_name=self.ug1, perm='group.read')
277 299 Session().commit()
278 300
279 301 # check if the
280 302 obj = Session().query(UserGroupRepoGroupToPerm)\
281 303 .filter(UserGroupRepoGroupToPerm.group == self.g1)\
282 304 .filter(UserGroupRepoGroupToPerm.users_group == self.ug1)\
283 305 .scalar()
284 306 assert obj.permission.permission_name == 'group.read'
285 307
286 308 assert group_perms(self.anon) == {u'group1': u'group.none'}
287 309 assert group_perms(self.u1) == {u'group1': u'group.read'}
288 310
289 311 def test_inherited_permissions_from_default_on_user_enabled(self):
290 312 # enable fork and create on default user
291 313 _form_result = {
292 314 'default_repo_create': 'hg.create.repository',
293 315 'default_fork_create': 'hg.fork.repository'
294 316 }
295 317 PermissionModel().set_new_user_perms(
296 318 User.get_default_user(), _form_result)
297 319 Session().commit()
298 320
299 321 # make sure inherit flag is turned on
300 322 self.u1.inherit_default_permissions = True
301 323 Session().commit()
302 324
303 325 # this user will have inherited permissions from default user
304 326 assert global_perms(self.u1) == default_perms()
305 327
306 328 def test_inherited_permissions_from_default_on_user_disabled(self):
307 329 # disable fork and create on default user
308 330 _form_result = {
309 331 'default_repo_create': 'hg.create.none',
310 332 'default_fork_create': 'hg.fork.none'
311 333 }
312 334 PermissionModel().set_new_user_perms(
313 335 User.get_default_user(), _form_result)
314 336 Session().commit()
315 337
316 338 # make sure inherit flag is turned on
317 339 self.u1.inherit_default_permissions = True
318 340 Session().commit()
319 341
320 342 # this user will have inherited permissions from default user
321 343 expected_perms = default_perms(
322 344 added=['hg.create.none', 'hg.fork.none'],
323 345 removed=['hg.create.repository', 'hg.fork.repository'])
324 346 assert global_perms(self.u1) == expected_perms
325 347
326 348 def test_non_inherited_permissions_from_default_on_user_enabled(self):
327 349 user_model = UserModel()
328 350 # enable fork and create on default user
329 351 usr = User.DEFAULT_USER
330 352 user_model.revoke_perm(usr, 'hg.create.none')
331 353 user_model.grant_perm(usr, 'hg.create.repository')
332 354 user_model.revoke_perm(usr, 'hg.fork.none')
333 355 user_model.grant_perm(usr, 'hg.fork.repository')
334 356
335 357 # disable global perms on specific user
336 358 user_model.revoke_perm(self.u1, 'hg.create.repository')
337 359 user_model.grant_perm(self.u1, 'hg.create.none')
338 360 user_model.revoke_perm(self.u1, 'hg.fork.repository')
339 361 user_model.grant_perm(self.u1, 'hg.fork.none')
340 362
341 363 # make sure inherit flag is turned off
342 364 self.u1.inherit_default_permissions = False
343 365 Session().commit()
344 366
345 367 # this user will have non inherited permissions from he's
346 368 # explicitly set permissions
347 369 assert global_perms(self.u1) == set([
348 370 'hg.create.none',
349 371 'hg.fork.none',
350 372 'hg.register.manual_activate',
351 373 'hg.password_reset.enabled',
352 374 'hg.extern_activate.auto',
353 375 'repository.read',
354 376 'group.read',
355 377 'usergroup.read',
356 378 ])
357 379
358 380 def test_non_inherited_permissions_from_default_on_user_disabled(self):
359 381 user_model = UserModel()
360 382 # disable fork and create on default user
361 383 usr = User.DEFAULT_USER
362 384 user_model.revoke_perm(usr, 'hg.create.repository')
363 385 user_model.grant_perm(usr, 'hg.create.none')
364 386 user_model.revoke_perm(usr, 'hg.fork.repository')
365 387 user_model.grant_perm(usr, 'hg.fork.none')
366 388
367 389 # enable global perms on specific user
368 390 user_model.revoke_perm(self.u1, 'hg.create.none')
369 391 user_model.grant_perm(self.u1, 'hg.create.repository')
370 392 user_model.revoke_perm(self.u1, 'hg.fork.none')
371 393 user_model.grant_perm(self.u1, 'hg.fork.repository')
372 394
373 395 # make sure inherit flag is turned off
374 396 self.u1.inherit_default_permissions = False
375 397 Session().commit()
376 398
377 399 # this user will have non inherited permissions from he's
378 400 # explicitly set permissions
379 401 assert global_perms(self.u1) == set([
380 402 'hg.create.repository',
381 403 'hg.fork.repository',
382 404 'hg.register.manual_activate',
383 405 'hg.password_reset.enabled',
384 406 'hg.extern_activate.auto',
385 407 'repository.read',
386 408 'group.read',
387 409 'usergroup.read',
388 410 ])
389 411
390 412 @pytest.mark.parametrize('perm, expected_perm', [
391 413 ('hg.inherit_default_perms.false', 'repository.none', ),
392 414 ('hg.inherit_default_perms.true', 'repository.read', ),
393 415 ])
394 416 def test_inherited_permissions_on_objects(self, perm, expected_perm):
395 417 _form_result = {
396 418 'default_inherit_default_permissions': perm,
397 419 }
398 420 PermissionModel().set_new_user_perms(
399 421 User.get_default_user(), _form_result)
400 422 Session().commit()
401 423
402 424 # make sure inherit flag is turned on
403 425 self.u1.inherit_default_permissions = True
404 426 Session().commit()
405 427
406 428 # this user will have inherited permissions from default user
407 429 assert global_perms(self.u1) == set([
408 430 'hg.create.none',
409 431 'hg.fork.none',
410 432 'hg.register.manual_activate',
411 433 'hg.password_reset.enabled',
412 434 'hg.extern_activate.auto',
413 435 'repository.read',
414 436 'group.read',
415 437 'usergroup.read',
416 438 'hg.create.write_on_repogroup.true',
417 439 'hg.usergroup.create.false',
418 440 'hg.repogroup.create.false',
419 441 perm,
420 442 ])
421 443
422 444 assert set(repo_perms(self.u1).values()) == set([expected_perm])
423 445
424 446 def test_repo_owner_permissions_not_overwritten_by_group(self):
425 447 # create repo as USER,
426 448 self.test_repo = fixture.create_repo(name='myownrepo',
427 449 repo_type='hg',
428 450 cur_user=self.u1)
429 451
430 452 # he has permissions of admin as owner
431 453 assert repo_perms(self.u1)['myownrepo'] == 'repository.admin'
432 454
433 455 # set his permission as user group, he should still be admin
434 456 self.ug1 = fixture.create_user_group('G1')
435 457 UserGroupModel().add_user_to_group(self.ug1, self.u1)
436 458 RepoModel().grant_user_group_permission(
437 459 self.test_repo,
438 460 group_name=self.ug1,
439 461 perm='repository.none')
440 462 Session().commit()
441 463
442 464 assert repo_perms(self.u1)['myownrepo'] == 'repository.admin'
443 465
444 466 def test_repo_owner_permissions_not_overwritten_by_others(self):
445 467 # create repo as USER,
446 468 self.test_repo = fixture.create_repo(name='myownrepo',
447 469 repo_type='hg',
448 470 cur_user=self.u1)
449 471
450 472 # he has permissions of admin as owner
451 473 assert repo_perms(self.u1)['myownrepo'] == 'repository.admin'
452 474
453 475 # set his permission as user, he should still be admin
454 476 RepoModel().grant_user_permission(
455 477 self.test_repo, user=self.u1, perm='repository.none')
456 478 Session().commit()
457 479
458 480 assert repo_perms(self.u1)['myownrepo'] == 'repository.admin'
459 481
460 482 def test_repo_group_owner_permissions_not_overwritten_by_group(self):
461 483 # "u1" shall be owner without any special permission assigned
462 484 self.g1 = fixture.create_repo_group('test1')
463 485
464 486 # Make user group and grant a permission to user group
465 487 self.ug1 = fixture.create_user_group('G1')
466 488 UserGroupModel().add_user_to_group(self.ug1, self.u1)
467 489 RepoGroupModel().grant_user_group_permission(
468 490 repo_group=self.g1, group_name=self.ug1, perm='group.write')
469 491
470 492 # Verify that user does not get any special permission if he is not
471 493 # owner
472 494 assert group_perms(self.u1) == {'test1': 'group.write'}
473 495
474 496 # Make him owner of the repo group
475 497 self.g1.user = self.u1
476 498 assert group_perms(self.u1) == {'test1': 'group.admin'}
477 499
478 500 def test_repo_group_owner_permissions_not_overwritten_by_others(self):
479 501 # "u1" shall be owner without any special permission assigned
480 502 self.g1 = fixture.create_repo_group('test1')
481 503 RepoGroupModel().grant_user_permission(
482 504 repo_group=self.g1, user=self.u1, perm='group.write')
483 505
484 506 # Verify that user does not get any special permission if he is not
485 507 # owner
486 508 assert group_perms(self.u1) == {'test1': 'group.write'}
487 509
488 510 # Make him owner of the repo group
489 511 self.g1.user = self.u1
490 512 assert group_perms(self.u1) == {u'test1': 'group.admin'}
491 513
492 514 def _test_def_user_perm_equal(
493 515 self, user, change_factor=0, compare_keys=None):
494 516 perms = UserToPerm.query().filter(UserToPerm.user == user).all()
495 517 assert len(perms) == \
496 518 len(Permission.DEFAULT_USER_PERMISSIONS) + change_factor
497 519 if compare_keys:
498 520 assert set(
499 521 x.permissions.permission_name for x in perms) == compare_keys
500 522
501 523 def _test_def_user_group_perm_equal(
502 524 self, user_group, change_factor=0, compare_keys=None):
503 525 perms = UserGroupToPerm.query().filter(
504 526 UserGroupToPerm.users_group == user_group).all()
505 527 assert len(perms) == \
506 528 len(Permission.DEFAULT_USER_PERMISSIONS) + change_factor
507 529 if compare_keys:
508 530 assert set(
509 531 x.permissions.permission_name for x in perms) == compare_keys
510 532
511 533 def test_set_default_permissions(self):
512 534 PermissionModel().create_default_user_permissions(user=self.u1)
513 535 self._test_def_user_perm_equal(user=self.u1)
514 536
515 537 def test_set_default_permissions_after_one_is_missing(self):
516 538 PermissionModel().create_default_user_permissions(user=self.u1)
517 539 self._test_def_user_perm_equal(user=self.u1)
518 540 # now we delete one, it should be re-created after another call
519 541 perms = UserToPerm.query().filter(UserToPerm.user == self.u1).all()
520 542 Session().delete(perms[0])
521 543 Session().commit()
522 544
523 545 self._test_def_user_perm_equal(user=self.u1, change_factor=-1)
524 546
525 547 # create missing one !
526 548 PermissionModel().create_default_user_permissions(user=self.u1)
527 549 self._test_def_user_perm_equal(user=self.u1)
528 550
529 551 @pytest.mark.parametrize("perm, modify_to", [
530 552 ('repository.read', 'repository.none'),
531 553 ('group.read', 'group.none'),
532 554 ('usergroup.read', 'usergroup.none'),
533 555 ('hg.create.repository', 'hg.create.none'),
534 556 ('hg.fork.repository', 'hg.fork.none'),
535 557 ('hg.register.manual_activate', 'hg.register.auto_activate',)
536 558 ])
537 559 def test_set_default_permissions_after_modification(self, perm, modify_to):
538 560 PermissionModel().create_default_user_permissions(user=self.u1)
539 561 self._test_def_user_perm_equal(user=self.u1)
540 562
541 563 old = Permission.get_by_key(perm)
542 564 new = Permission.get_by_key(modify_to)
543 565 assert old is not None
544 566 assert new is not None
545 567
546 568 # now modify permissions
547 569 p = UserToPerm.query().filter(
548 570 UserToPerm.user == self.u1).filter(
549 571 UserToPerm.permission == old).one()
550 572 p.permission = new
551 573 Session().add(p)
552 574 Session().commit()
553 575
554 576 PermissionModel().create_default_user_permissions(user=self.u1)
555 577 self._test_def_user_perm_equal(user=self.u1)
556 578
557 579 def test_clear_user_perms(self):
558 580 PermissionModel().create_default_user_permissions(user=self.u1)
559 581 self._test_def_user_perm_equal(user=self.u1)
560 582
561 583 # now clear permissions
562 584 cleared = PermissionModel()._clear_user_perms(self.u1.user_id)
563 585 self._test_def_user_perm_equal(user=self.u1,
564 586 change_factor=len(cleared)*-1)
565 587
566 588 def test_clear_user_group_perms(self):
567 589 self.ug1 = fixture.create_user_group('G1')
568 590 PermissionModel().create_default_user_group_permissions(
569 591 user_group=self.ug1)
570 592 self._test_def_user_group_perm_equal(user_group=self.ug1)
571 593
572 594 # now clear permissions
573 595 cleared = PermissionModel()._clear_user_group_perms(
574 596 self.ug1.users_group_id)
575 597 self._test_def_user_group_perm_equal(user_group=self.ug1,
576 598 change_factor=len(cleared)*-1)
577 599
578 600 @pytest.mark.parametrize("form_result", [
579 601 {},
580 602 {'default_repo_create': 'hg.create.repository'},
581 603 {'default_repo_create': 'hg.create.repository',
582 604 'default_repo_perm': 'repository.read'},
583 605 {'default_repo_create': 'hg.create.none',
584 606 'default_repo_perm': 'repository.write',
585 607 'default_fork_create': 'hg.fork.none'},
586 608 ])
587 609 def test_set_new_user_permissions(self, form_result):
588 610 _form_result = {}
589 611 _form_result.update(form_result)
590 612 PermissionModel().set_new_user_perms(self.u1, _form_result)
591 613 Session().commit()
592 614 change_factor = -1 * (len(Permission.DEFAULT_USER_PERMISSIONS)
593 615 - len(form_result.keys()))
594 616 self._test_def_user_perm_equal(
595 617 self.u1, change_factor=change_factor)
596 618
597 619 @pytest.mark.parametrize("form_result", [
598 620 {},
599 621 {'default_repo_create': 'hg.create.repository'},
600 622 {'default_repo_create': 'hg.create.repository',
601 623 'default_repo_perm': 'repository.read'},
602 624 {'default_repo_create': 'hg.create.none',
603 625 'default_repo_perm': 'repository.write',
604 626 'default_fork_create': 'hg.fork.none'},
605 627 ])
606 628 def test_set_new_user_group_permissions(self, form_result):
607 629 _form_result = {}
608 630 _form_result.update(form_result)
609 631 self.ug1 = fixture.create_user_group('G1')
610 632 PermissionModel().set_new_user_group_perms(self.ug1, _form_result)
611 633 Session().commit()
612 634 change_factor = -1 * (len(Permission.DEFAULT_USER_PERMISSIONS)
613 635 - len(form_result.keys()))
614 636 self._test_def_user_group_perm_equal(
615 637 self.ug1, change_factor=change_factor)
616 638
617 639 @pytest.mark.parametrize("group_active, expected_perm", [
618 640 (True, 'repository.admin'),
619 641 (False, 'repository.read'),
620 642 ])
621 643 def test_get_default_repo_perms_from_user_group_with_active_group(
622 644 self, backend, user_util, group_active, expected_perm):
623 645 repo = backend.create_repo()
624 646 user = user_util.create_user()
625 647 user_group = user_util.create_user_group(
626 648 members=[user], users_group_active=group_active)
627 649
628 650 user_util.grant_user_group_permission_to_repo(
629 651 repo, user_group, 'repository.admin')
630 652 permissions = repo_perms(user)
631 653 repo_permission = permissions.get(repo.repo_name)
632 654 assert repo_permission == expected_perm
633 655
634 656 @pytest.mark.parametrize("group_active, expected_perm", [
635 657 (True, 'group.admin'),
636 658 (False, 'group.read')
637 659 ])
638 660 def test_get_default_group_perms_from_user_group_with_active_group(
639 661 self, user_util, group_active, expected_perm):
640 662 user = user_util.create_user()
641 663 repo_group = user_util.create_repo_group()
642 664 user_group = user_util.create_user_group(
643 665 members=[user], users_group_active=group_active)
644 666
645 667 user_util.grant_user_group_permission_to_repo_group(
646 668 repo_group, user_group, 'group.admin')
647 669 permissions = group_perms(user)
648 670 group_permission = permissions.get(repo_group.name)
649 671 assert group_permission == expected_perm
650 672
651 673 @pytest.mark.parametrize("group_active, expected_perm", [
652 674 (True, 'usergroup.admin'),
653 675 (False, 'usergroup.read')
654 676 ])
655 677 def test_get_default_user_group_perms_from_user_group_with_active_group(
656 678 self, user_util, group_active, expected_perm):
657 679 user = user_util.create_user()
658 680 user_group = user_util.create_user_group(
659 681 members=[user], users_group_active=group_active)
660 682 target_user_group = user_util.create_user_group()
661 683
662 684 user_util.grant_user_group_permission_to_user_group(
663 685 target_user_group, user_group, 'usergroup.admin')
664 686 permissions = user_group_perms(user)
665 687 group_permission = permissions.get(target_user_group.users_group_name)
666 688 assert group_permission == expected_perm
667 689
668 690
669 691 def repo_perms(user):
670 692 auth_user = AuthUser(user_id=user.user_id)
671 693 return auth_user.permissions['repositories']
672 694
673 695
674 696 def group_perms(user):
675 697 auth_user = AuthUser(user_id=user.user_id)
676 698 return auth_user.permissions['repositories_groups']
677 699
678 700
679 701 def user_group_perms(user):
680 702 auth_user = AuthUser(user_id=user.user_id)
681 703 return auth_user.permissions['user_groups']
682 704
683 705
684 706 def global_perms(user):
685 707 auth_user = AuthUser(user_id=user.user_id)
686 708 return auth_user.permissions['global']
687 709
688 710
689 711 def default_perms(added=None, removed=None):
690 712 expected_perms = set(Permission.DEFAULT_USER_PERMISSIONS)
691 713 if removed:
692 714 expected_perms.difference_update(removed)
693 715 if added:
694 716 expected_perms.update(added)
695 717 return expected_perms
General Comments 0
You need to be logged in to leave comments. Login now