##// END OF EJS Templates
auth: always consider the repo group owner an admin when computing it's permissions...
Mads Kiilerich -
r8770:e27ff6a9 stable
parent child Browse files
Show More
@@ -1,723 +1,726 b''
1 1 # -*- coding: utf-8 -*-
2 2 # This program is free software: you can redistribute it and/or modify
3 3 # it under the terms of the GNU General Public License as published by
4 4 # the Free Software Foundation, either version 3 of the License, or
5 5 # (at your option) any later version.
6 6 #
7 7 # This program is distributed in the hope that it will be useful,
8 8 # but WITHOUT ANY WARRANTY; without even the implied warranty of
9 9 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 10 # GNU General Public License for more details.
11 11 #
12 12 # You should have received a copy of the GNU General Public License
13 13 # along with this program. If not, see <http://www.gnu.org/licenses/>.
14 14 """
15 15 kallithea.lib.auth
16 16 ~~~~~~~~~~~~~~~~~~
17 17
18 18 authentication and permission libraries
19 19
20 20 This file was forked by the Kallithea project in July 2014.
21 21 Original author and date, and relevant copyright and licensing information is below:
22 22 :created_on: Apr 4, 2010
23 23 :author: marcink
24 24 :copyright: (c) 2013 RhodeCode GmbH, and others.
25 25 :license: GPLv3, see LICENSE.md for more details.
26 26 """
27 27 import itertools
28 28 import logging
29 29
30 30 import ipaddr
31 31 from decorator import decorator
32 32 from sqlalchemy.orm import joinedload
33 33 from sqlalchemy.orm.exc import ObjectDeletedError
34 34 from tg import request
35 35 from tg.i18n import ugettext as _
36 36 from webob.exc import HTTPForbidden, HTTPFound
37 37
38 38 import kallithea
39 39 from kallithea.lib import webutils
40 40 from kallithea.lib.utils import get_repo_group_slug, get_repo_slug, get_user_group_slug
41 41 from kallithea.lib.vcs.utils.lazy import LazyProperty
42 42 from kallithea.lib.webutils import url
43 43 from kallithea.model import db, meta
44 44 from kallithea.model.user import UserModel
45 45
46 46
47 47 log = logging.getLogger(__name__)
48 48
49 49
50 50 PERM_WEIGHTS = db.Permission.PERM_WEIGHTS
51 51
52 52 def bump_permission(permissions, key, new_perm):
53 53 """Add a new permission for key to permissions.
54 54 Assuming the permissions are comparable, set the new permission if it
55 55 has higher weight, else drop it and keep the old permission.
56 56 """
57 57 cur_perm = permissions[key]
58 58 new_perm_val = PERM_WEIGHTS[new_perm]
59 59 cur_perm_val = PERM_WEIGHTS[cur_perm]
60 60 if new_perm_val > cur_perm_val:
61 61 permissions[key] = new_perm
62 62
63 63 class AuthUser(object):
64 64 """
65 65 Represents a Kallithea user, including various authentication and
66 66 authorization information. Typically used to store the current user,
67 67 but is also used as a generic user information data structure in
68 68 parts of the code, e.g. user management.
69 69
70 70 Constructed from a database `User` object, a user ID or cookie dict,
71 71 it looks up the user (if needed) and copies all attributes to itself,
72 72 adding various non-persistent data. If lookup fails but anonymous
73 73 access to Kallithea is enabled, the default user is loaded instead.
74 74
75 75 `AuthUser` does not by itself authenticate users. It's up to other parts of
76 76 the code to check e.g. if a supplied password is correct, and if so, trust
77 77 the AuthUser object as an authenticated user.
78 78
79 79 However, `AuthUser` does refuse to load a user that is not `active`.
80 80
81 81 Note that Kallithea distinguishes between the default user (an actual
82 82 user in the database with username "default") and "no user" (no actual
83 83 User object, AuthUser filled with blank values and username "None").
84 84
85 85 If the default user is active, that will always be used instead of
86 86 "no user". On the other hand, if the default user is disabled (and
87 87 there is no login information), we instead get "no user"; this should
88 88 only happen on the login page (as all other requests are redirected).
89 89
90 90 `is_default_user` specifically checks if the AuthUser is the user named
91 91 "default". Use `is_anonymous` to check for both "default" and "no user".
92 92 """
93 93
94 94 @classmethod
95 95 def make(cls, dbuser=None, is_external_auth=False, ip_addr=None):
96 96 """Create an AuthUser to be authenticated ... or return None if user for some reason can't be authenticated.
97 97 Checks that a non-None dbuser is provided, is active, and that the IP address is ok.
98 98 """
99 99 assert ip_addr is not None
100 100 if dbuser is None:
101 101 log.info('No db user for authentication')
102 102 return None
103 103 if not dbuser.active:
104 104 log.info('Db user %s not active', dbuser.username)
105 105 return None
106 106 allowed_ips = AuthUser.get_allowed_ips(dbuser.user_id)
107 107 if not check_ip_access(source_ip=ip_addr, allowed_ips=allowed_ips):
108 108 log.info('Access for %s from %s forbidden - not in %s', dbuser.username, ip_addr, allowed_ips)
109 109 return None
110 110 return cls(dbuser=dbuser, is_external_auth=is_external_auth)
111 111
112 112 def __init__(self, user_id=None, dbuser=None, is_external_auth=False):
113 113 self.is_external_auth = is_external_auth # container auth - don't show logout option
114 114
115 115 # These attributes will be overridden below if the requested user is
116 116 # found or anonymous access (using the default user) is enabled.
117 117 self.user_id = None
118 118 self.username = None
119 119 self.api_key = None
120 120 self.name = ''
121 121 self.lastname = ''
122 122 self.email = ''
123 123 self.admin = False
124 124
125 125 # Look up database user, if necessary.
126 126 if user_id is not None:
127 127 assert dbuser is None
128 128 log.debug('Auth User lookup by USER ID %s', user_id)
129 129 dbuser = UserModel().get(user_id)
130 130 assert dbuser is not None
131 131 else:
132 132 assert dbuser is not None
133 133 log.debug('Auth User lookup by database user %s', dbuser)
134 134
135 135 log.debug('filling %s data', dbuser)
136 136 self.is_anonymous = dbuser.is_default_user
137 137 if dbuser.is_default_user and not dbuser.active:
138 138 self.username = 'None'
139 139 self.is_default_user = False
140 140 else:
141 141 # copy non-confidential database fields from a `db.User` to this `AuthUser`.
142 142 for k, v in dbuser.get_dict().items():
143 143 assert k not in ['api_keys', 'permissions']
144 144 setattr(self, k, v)
145 145 self.is_default_user = dbuser.is_default_user
146 146 log.debug('Auth User is now %s', self)
147 147
148 148 @LazyProperty
149 149 def global_permissions(self):
150 150 log.debug('Getting global permissions for %s', self)
151 151
152 152 if self.is_admin:
153 153 return set(['hg.admin'])
154 154
155 155 global_permissions = set()
156 156
157 157 # default global permissions from the default user
158 158 default_global_perms = db.UserToPerm.query() \
159 159 .filter(db.UserToPerm.user_id == kallithea.DEFAULT_USER_ID) \
160 160 .options(joinedload(db.UserToPerm.permission))
161 161 for perm in default_global_perms:
162 162 global_permissions.add(perm.permission.permission_name)
163 163
164 164 # user group global permissions
165 165 user_perms_from_users_groups = meta.Session().query(db.UserGroupToPerm) \
166 166 .options(joinedload(db.UserGroupToPerm.permission)) \
167 167 .join((db.UserGroupMember, db.UserGroupToPerm.users_group_id ==
168 168 db.UserGroupMember.users_group_id)) \
169 169 .filter(db.UserGroupMember.user_id == self.user_id) \
170 170 .join((db.UserGroup, db.UserGroupMember.users_group_id ==
171 171 db.UserGroup.users_group_id)) \
172 172 .filter(db.UserGroup.users_group_active == True) \
173 173 .order_by(db.UserGroupToPerm.users_group_id) \
174 174 .all()
175 175 # need to group here by groups since user can be in more than
176 176 # one group
177 177 _grouped = [[x, list(y)] for x, y in
178 178 itertools.groupby(user_perms_from_users_groups,
179 179 lambda x:x.users_group)]
180 180 for gr, perms in _grouped:
181 181 for perm in perms:
182 182 global_permissions.add(perm.permission.permission_name)
183 183
184 184 # user specific global permissions
185 185 user_perms = meta.Session().query(db.UserToPerm) \
186 186 .options(joinedload(db.UserToPerm.permission)) \
187 187 .filter(db.UserToPerm.user_id == self.user_id).all()
188 188 for perm in user_perms:
189 189 global_permissions.add(perm.permission.permission_name)
190 190
191 191 # for each kind of global permissions, only keep the one with heighest weight
192 192 kind_max_perm = {}
193 193 for perm in sorted(global_permissions, key=lambda n: PERM_WEIGHTS.get(n, -1)):
194 194 kind = perm.rsplit('.', 1)[0]
195 195 kind_max_perm[kind] = perm
196 196 return set(kind_max_perm.values())
197 197
198 198 @LazyProperty
199 199 def repository_permissions(self):
200 200 log.debug('Getting repository permissions for %s', self)
201 201 repository_permissions = {}
202 202 default_repo_perms = db.Permission.get_default_perms(kallithea.DEFAULT_USER_ID)
203 203
204 204 if self.is_admin:
205 205 for perm in default_repo_perms:
206 206 r_k = perm.repository.repo_name
207 207 p = 'repository.admin'
208 208 repository_permissions[r_k] = p
209 209
210 210 else:
211 211 # defaults for repositories from default user
212 212 for perm in default_repo_perms:
213 213 r_k = perm.repository.repo_name
214 214 if perm.repository.owner_id == self.user_id:
215 215 p = 'repository.admin'
216 216 elif perm.repository.private:
217 217 p = 'repository.none'
218 218 else:
219 219 p = perm.permission.permission_name
220 220 repository_permissions[r_k] = p
221 221
222 222 # user group repository permissions
223 223 user_repo_perms_from_users_groups = \
224 224 meta.Session().query(db.UserGroupRepoToPerm) \
225 225 .join((db.UserGroup, db.UserGroupRepoToPerm.users_group_id ==
226 226 db.UserGroup.users_group_id)) \
227 227 .filter(db.UserGroup.users_group_active == True) \
228 228 .join((db.UserGroupMember, db.UserGroupRepoToPerm.users_group_id ==
229 229 db.UserGroupMember.users_group_id)) \
230 230 .filter(db.UserGroupMember.user_id == self.user_id) \
231 231 .options(joinedload(db.UserGroupRepoToPerm.repository)) \
232 232 .options(joinedload(db.UserGroupRepoToPerm.permission)) \
233 233 .all()
234 234 for perm in user_repo_perms_from_users_groups:
235 235 bump_permission(repository_permissions,
236 236 perm.repository.repo_name,
237 237 perm.permission.permission_name)
238 238
239 239 # user permissions for repositories
240 240 user_repo_perms = db.Permission.get_default_perms(self.user_id)
241 241 for perm in user_repo_perms:
242 242 bump_permission(repository_permissions,
243 243 perm.repository.repo_name,
244 244 perm.permission.permission_name)
245 245
246 246 return repository_permissions
247 247
248 248 @LazyProperty
249 249 def repository_group_permissions(self):
250 250 log.debug('Getting repository group permissions for %s', self)
251 251 repository_group_permissions = {}
252 252 default_repo_groups_perms = db.Permission.get_default_group_perms(kallithea.DEFAULT_USER_ID)
253 253
254 254 if self.is_admin:
255 255 for perm in default_repo_groups_perms:
256 256 rg_k = perm.group.group_name
257 257 p = 'group.admin'
258 258 repository_group_permissions[rg_k] = p
259 259
260 260 else:
261 261 # defaults for repository groups taken from default user permission
262 262 # on given group
263 263 for perm in default_repo_groups_perms:
264 264 rg_k = perm.group.group_name
265 p = perm.permission.permission_name
265 if perm.group.owner_id == self.user_id:
266 p = 'group.admin'
267 else:
268 p = perm.permission.permission_name
266 269 repository_group_permissions[rg_k] = p
267 270
268 271 # user group for repo groups permissions
269 272 user_repo_group_perms_from_users_groups = \
270 273 meta.Session().query(db.UserGroupRepoGroupToPerm) \
271 274 .join((db.UserGroup, db.UserGroupRepoGroupToPerm.users_group_id ==
272 275 db.UserGroup.users_group_id)) \
273 276 .filter(db.UserGroup.users_group_active == True) \
274 277 .join((db.UserGroupMember, db.UserGroupRepoGroupToPerm.users_group_id
275 278 == db.UserGroupMember.users_group_id)) \
276 279 .filter(db.UserGroupMember.user_id == self.user_id) \
277 280 .options(joinedload(db.UserGroupRepoGroupToPerm.permission)) \
278 281 .all()
279 282 for perm in user_repo_group_perms_from_users_groups:
280 283 bump_permission(repository_group_permissions,
281 284 perm.group.group_name,
282 285 perm.permission.permission_name)
283 286
284 287 # user explicit permissions for repository groups
285 288 user_repo_groups_perms = db.Permission.get_default_group_perms(self.user_id)
286 289 for perm in user_repo_groups_perms:
287 290 bump_permission(repository_group_permissions,
288 291 perm.group.group_name,
289 292 perm.permission.permission_name)
290 293
291 294 return repository_group_permissions
292 295
293 296 @LazyProperty
294 297 def user_group_permissions(self):
295 298 log.debug('Getting user group permissions for %s', self)
296 299 user_group_permissions = {}
297 300 default_user_group_perms = db.Permission.get_default_user_group_perms(kallithea.DEFAULT_USER_ID)
298 301
299 302 if self.is_admin:
300 303 for perm in default_user_group_perms:
301 304 u_k = perm.user_group.users_group_name
302 305 p = 'usergroup.admin'
303 306 user_group_permissions[u_k] = p
304 307
305 308 else:
306 309 # defaults for user groups taken from default user permission
307 310 # on given user group
308 311 for perm in default_user_group_perms:
309 312 u_k = perm.user_group.users_group_name
310 313 p = perm.permission.permission_name
311 314 user_group_permissions[u_k] = p
312 315
313 316 # user group for user group permissions
314 317 user_group_user_groups_perms = \
315 318 meta.Session().query(db.UserGroupUserGroupToPerm) \
316 319 .join((db.UserGroup, db.UserGroupUserGroupToPerm.target_user_group_id
317 320 == db.UserGroup.users_group_id)) \
318 321 .join((db.UserGroupMember, db.UserGroupUserGroupToPerm.user_group_id
319 322 == db.UserGroupMember.users_group_id)) \
320 323 .filter(db.UserGroupMember.user_id == self.user_id) \
321 324 .join((db.UserGroup, db.UserGroupMember.users_group_id ==
322 325 db.UserGroup.users_group_id), aliased=True, from_joinpoint=True) \
323 326 .filter(db.UserGroup.users_group_active == True) \
324 327 .options(joinedload(db.UserGroupUserGroupToPerm.permission)) \
325 328 .all()
326 329 for perm in user_group_user_groups_perms:
327 330 bump_permission(user_group_permissions,
328 331 perm.target_user_group.users_group_name,
329 332 perm.permission.permission_name)
330 333
331 334 # user explicit permission for user groups
332 335 user_user_groups_perms = db.Permission.get_default_user_group_perms(self.user_id)
333 336 for perm in user_user_groups_perms:
334 337 bump_permission(user_group_permissions,
335 338 perm.user_group.users_group_name,
336 339 perm.permission.permission_name)
337 340
338 341 return user_group_permissions
339 342
340 343 @LazyProperty
341 344 def permissions(self):
342 345 """dict with all 4 kind of permissions - mainly for backwards compatibility"""
343 346 return {
344 347 'global': self.global_permissions,
345 348 'repositories': self.repository_permissions,
346 349 'repositories_groups': self.repository_group_permissions,
347 350 'user_groups': self.user_group_permissions,
348 351 }
349 352
350 353 def has_repository_permission_level(self, repo_name, level, purpose=None):
351 354 required_perms = {
352 355 'read': ['repository.read', 'repository.write', 'repository.admin'],
353 356 'write': ['repository.write', 'repository.admin'],
354 357 'admin': ['repository.admin'],
355 358 }[level]
356 359 actual_perm = self.repository_permissions.get(repo_name)
357 360 ok = actual_perm in required_perms
358 361 log.debug('Checking if user %r can %r repo %r (%s): %s (has %r)',
359 362 self.username, level, repo_name, purpose, ok, actual_perm)
360 363 return ok
361 364
362 365 def has_repository_group_permission_level(self, repo_group_name, level, purpose=None):
363 366 required_perms = {
364 367 'read': ['group.read', 'group.write', 'group.admin'],
365 368 'write': ['group.write', 'group.admin'],
366 369 'admin': ['group.admin'],
367 370 }[level]
368 371 actual_perm = self.repository_group_permissions.get(repo_group_name)
369 372 ok = actual_perm in required_perms
370 373 log.debug('Checking if user %r can %r repo group %r (%s): %s (has %r)',
371 374 self.username, level, repo_group_name, purpose, ok, actual_perm)
372 375 return ok
373 376
374 377 def has_user_group_permission_level(self, user_group_name, level, purpose=None):
375 378 required_perms = {
376 379 'read': ['usergroup.read', 'usergroup.write', 'usergroup.admin'],
377 380 'write': ['usergroup.write', 'usergroup.admin'],
378 381 'admin': ['usergroup.admin'],
379 382 }[level]
380 383 actual_perm = self.user_group_permissions.get(user_group_name)
381 384 ok = actual_perm in required_perms
382 385 log.debug('Checking if user %r can %r user group %r (%s): %s (has %r)',
383 386 self.username, level, user_group_name, purpose, ok, actual_perm)
384 387 return ok
385 388
386 389 @property
387 390 def api_keys(self):
388 391 return self._get_api_keys()
389 392
390 393 def _get_api_keys(self):
391 394 api_keys = [self.api_key]
392 395 for api_key in db.UserApiKeys.query() \
393 396 .filter_by(user_id=self.user_id, is_expired=False):
394 397 api_keys.append(api_key.api_key)
395 398
396 399 return api_keys
397 400
398 401 @property
399 402 def is_admin(self):
400 403 return self.admin
401 404
402 405 @property
403 406 def repositories_admin(self):
404 407 """
405 408 Returns list of repositories you're an admin of
406 409 """
407 410 return [x[0] for x in self.repository_permissions.items()
408 411 if x[1] == 'repository.admin']
409 412
410 413 @property
411 414 def repository_groups_admin(self):
412 415 """
413 416 Returns list of repository groups you're an admin of
414 417 """
415 418 return [x[0] for x in self.repository_group_permissions.items()
416 419 if x[1] == 'group.admin']
417 420
418 421 @property
419 422 def user_groups_admin(self):
420 423 """
421 424 Returns list of user groups you're an admin of
422 425 """
423 426 return [x[0] for x in self.user_group_permissions.items()
424 427 if x[1] == 'usergroup.admin']
425 428
426 429 def __repr__(self):
427 430 return "<%s %s: %r>" % (self.__class__.__name__, self.user_id, self.username)
428 431
429 432 def to_cookie(self):
430 433 """ Serializes this login session to a cookie `dict`. """
431 434 return {
432 435 'user_id': self.user_id,
433 436 'is_external_auth': self.is_external_auth,
434 437 }
435 438
436 439 @staticmethod
437 440 def from_cookie(cookie, ip_addr):
438 441 """
439 442 Deserializes an `AuthUser` from a cookie `dict` ... or return None.
440 443 """
441 444 return AuthUser.make(
442 445 dbuser=UserModel().get(cookie.get('user_id')),
443 446 is_external_auth=cookie.get('is_external_auth', False),
444 447 ip_addr=ip_addr,
445 448 )
446 449
447 450 @classmethod
448 451 def get_allowed_ips(cls, user_id):
449 452 _set = set()
450 453
451 454 default_ips = db.UserIpMap.query().filter(db.UserIpMap.user_id == kallithea.DEFAULT_USER_ID)
452 455 for ip in default_ips:
453 456 try:
454 457 _set.add(ip.ip_addr)
455 458 except ObjectDeletedError:
456 459 # since we use heavy caching sometimes it happens that we get
457 460 # deleted objects here, we just skip them
458 461 pass
459 462
460 463 user_ips = db.UserIpMap.query().filter(db.UserIpMap.user_id == user_id)
461 464 for ip in user_ips:
462 465 try:
463 466 _set.add(ip.ip_addr)
464 467 except ObjectDeletedError:
465 468 # since we use heavy caching sometimes it happens that we get
466 469 # deleted objects here, we just skip them
467 470 pass
468 471 return _set or set(['0.0.0.0/0', '::/0'])
469 472
470 473 def get_all_user_repos(self):
471 474 """
472 475 Gets all repositories that user have at least read access
473 476 """
474 477 repos = [repo_name
475 478 for repo_name, perm in self.repository_permissions.items()
476 479 if perm in ['repository.read', 'repository.write', 'repository.admin']
477 480 ]
478 481 return db.Repository.query().filter(db.Repository.repo_name.in_(repos))
479 482
480 483
481 484 #==============================================================================
482 485 # CHECK DECORATORS
483 486 #==============================================================================
484 487
485 488 def _redirect_to_login(message=None):
486 489 """Return an exception that must be raised. It will redirect to the login
487 490 page which will redirect back to the current URL after authentication.
488 491 The optional message will be shown in a flash message."""
489 492 if message:
490 493 webutils.flash(message, category='warning')
491 494 p = request.path_qs
492 495 log.debug('Redirecting to login page, origin: %s', p)
493 496 return HTTPFound(location=url('login_home', came_from=p))
494 497
495 498
496 499 # Use as decorator
497 500 class LoginRequired(object):
498 501 """Client must be logged in as a valid User, or we'll redirect to the login
499 502 page.
500 503
501 504 If the "default" user is enabled and allow_default_user is true, that is
502 505 considered valid too.
503 506
504 507 Also checks that IP address is allowed.
505 508 """
506 509
507 510 def __init__(self, allow_default_user=False):
508 511 self.allow_default_user = allow_default_user
509 512
510 513 def __call__(self, func):
511 514 return decorator(self.__wrapper, func)
512 515
513 516 def __wrapper(self, func, *fargs, **fkwargs):
514 517 controller = fargs[0]
515 518 user = request.authuser
516 519 loc = "%s:%s" % (controller.__class__.__name__, func.__name__)
517 520 log.debug('Checking access for user %s @ %s', user, loc)
518 521
519 522 # regular user authentication
520 523 if user.is_default_user:
521 524 if self.allow_default_user:
522 525 log.info('default user @ %s', loc)
523 526 return func(*fargs, **fkwargs)
524 527 log.info('default user is redirected to login @ %s', loc)
525 528 elif user.is_anonymous: # default user is disabled and no proper authentication
526 529 log.info('anonymous user is redirected to login @ %s', loc)
527 530 else: # regular authentication
528 531 log.info('user %s authenticated with regular auth @ %s', user, loc)
529 532 return func(*fargs, **fkwargs)
530 533 raise _redirect_to_login()
531 534
532 535
533 536 # Use as decorator
534 537 class NotAnonymous(object):
535 538 """Ensures that client is not logged in as the "default" user, and
536 539 redirects to the login page otherwise. Must be used together with
537 540 LoginRequired."""
538 541
539 542 def __call__(self, func):
540 543 return decorator(self.__wrapper, func)
541 544
542 545 def __wrapper(self, func, *fargs, **fkwargs):
543 546 cls = fargs[0]
544 547 user = request.authuser
545 548
546 549 log.debug('Checking that user %s is not anonymous @%s', user.username, cls)
547 550
548 551 if user.is_default_user:
549 552 raise _redirect_to_login(_('You need to be a registered user to '
550 553 'perform this action'))
551 554 else:
552 555 return func(*fargs, **fkwargs)
553 556
554 557
555 558 class _PermsDecorator(object):
556 559 """Base class for controller decorators with multiple permissions"""
557 560
558 561 def __init__(self, *required_perms):
559 562 self.required_perms = required_perms # usually very short - a list is thus fine
560 563
561 564 def __call__(self, func):
562 565 return decorator(self.__wrapper, func)
563 566
564 567 def __wrapper(self, func, *fargs, **fkwargs):
565 568 cls = fargs[0]
566 569 user = request.authuser
567 570 log.debug('checking %s permissions %s for %s %s',
568 571 self.__class__.__name__, self.required_perms, cls, user)
569 572
570 573 if self.check_permissions(user):
571 574 log.debug('Permission granted for %s %s', cls, user)
572 575 return func(*fargs, **fkwargs)
573 576
574 577 else:
575 578 log.info('Permission denied for %s %s', cls, user)
576 579 if user.is_default_user:
577 580 raise _redirect_to_login(_('You need to be signed in to view this page'))
578 581 else:
579 582 raise HTTPForbidden()
580 583
581 584 def check_permissions(self, user):
582 585 raise NotImplementedError()
583 586
584 587
585 588 class HasPermissionAnyDecorator(_PermsDecorator):
586 589 """
587 590 Checks the user has any of the given global permissions.
588 591 """
589 592
590 593 def check_permissions(self, user):
591 594 return any(p in user.global_permissions for p in self.required_perms)
592 595
593 596
594 597 class _PermDecorator(_PermsDecorator):
595 598 """Base class for controller decorators with a single permission"""
596 599
597 600 def __init__(self, required_perm):
598 601 _PermsDecorator.__init__(self, [required_perm])
599 602 self.required_perm = required_perm
600 603
601 604
602 605 class HasRepoPermissionLevelDecorator(_PermDecorator):
603 606 """
604 607 Checks the user has at least the specified permission level for the requested repository.
605 608 """
606 609
607 610 def check_permissions(self, user):
608 611 repo_name = get_repo_slug(request)
609 612 return user.has_repository_permission_level(repo_name, self.required_perm)
610 613
611 614
612 615 class HasRepoGroupPermissionLevelDecorator(_PermDecorator):
613 616 """
614 617 Checks the user has any of given permissions for the requested repository group.
615 618 """
616 619
617 620 def check_permissions(self, user):
618 621 repo_group_name = get_repo_group_slug(request)
619 622 return user.has_repository_group_permission_level(repo_group_name, self.required_perm)
620 623
621 624
622 625 class HasUserGroupPermissionLevelDecorator(_PermDecorator):
623 626 """
624 627 Checks for access permission for any of given predicates for specific
625 628 user group. In order to fulfill the request any of predicates must be meet
626 629 """
627 630
628 631 def check_permissions(self, user):
629 632 user_group_name = get_user_group_slug(request)
630 633 return user.has_user_group_permission_level(user_group_name, self.required_perm)
631 634
632 635
633 636 #==============================================================================
634 637 # CHECK FUNCTIONS
635 638 #==============================================================================
636 639
637 640 class _PermsFunction(object):
638 641 """Base function for other check functions with multiple permissions"""
639 642
640 643 def __init__(self, *required_perms):
641 644 self.required_perms = required_perms # usually very short - a list is thus fine
642 645
643 646 def __bool__(self):
644 647 """ Defend against accidentally forgetting to call the object
645 648 and instead evaluating it directly in a boolean context,
646 649 which could have security implications.
647 650 """
648 651 raise AssertionError(self.__class__.__name__ + ' is not a bool and must be called!')
649 652
650 653 def __call__(self, *a, **b):
651 654 raise NotImplementedError()
652 655
653 656
654 657 class HasPermissionAny(_PermsFunction):
655 658
656 659 def __call__(self, purpose=None):
657 660 ok = any(p in request.authuser.global_permissions for p in self.required_perms)
658 661
659 662 log.debug('Check %s for global %s (%s): %s',
660 663 request.authuser.username, self.required_perms, purpose, ok)
661 664 return ok
662 665
663 666
664 667 class _PermFunction(_PermsFunction):
665 668 """Base function for other check functions with a single permission"""
666 669
667 670 def __init__(self, required_perm):
668 671 _PermsFunction.__init__(self, [required_perm])
669 672 self.required_perm = required_perm
670 673
671 674
672 675 class HasRepoPermissionLevel(_PermFunction):
673 676
674 677 def __call__(self, repo_name, purpose=None):
675 678 return request.authuser.has_repository_permission_level(repo_name, self.required_perm, purpose)
676 679
677 680
678 681 class HasRepoGroupPermissionLevel(_PermFunction):
679 682
680 683 def __call__(self, group_name, purpose=None):
681 684 return request.authuser.has_repository_group_permission_level(group_name, self.required_perm, purpose)
682 685
683 686
684 687 class HasUserGroupPermissionLevel(_PermFunction):
685 688
686 689 def __call__(self, user_group_name, purpose=None):
687 690 return request.authuser.has_user_group_permission_level(user_group_name, self.required_perm, purpose)
688 691
689 692
690 693 #==============================================================================
691 694 # SPECIAL VERSION TO HANDLE MIDDLEWARE AUTH
692 695 #==============================================================================
693 696
694 697 class HasPermissionAnyMiddleware(object):
695 698 def __init__(self, *perms):
696 699 self.required_perms = set(perms)
697 700
698 701 def __call__(self, authuser, repo_name, purpose=None):
699 702 try:
700 703 ok = authuser.repository_permissions[repo_name] in self.required_perms
701 704 except KeyError:
702 705 ok = False
703 706
704 707 log.debug('Middleware check %s for %s for repo %s (%s): %s', authuser.username, self.required_perms, repo_name, purpose, ok)
705 708 return ok
706 709
707 710
708 711 def check_ip_access(source_ip, allowed_ips=None):
709 712 """
710 713 Checks if source_ip is a subnet of any of allowed_ips.
711 714
712 715 :param source_ip:
713 716 :param allowed_ips: list of allowed ips together with mask
714 717 """
715 718 source_ip = source_ip.split('%', 1)[0]
716 719 log.debug('checking if ip:%s is subnet of %s', source_ip, allowed_ips)
717 720 if isinstance(allowed_ips, (tuple, list, set)):
718 721 for ip in allowed_ips:
719 722 if ipaddr.IPAddress(source_ip) in ipaddr.IPNetwork(ip):
720 723 log.debug('IP %s is network %s',
721 724 ipaddr.IPAddress(source_ip), ipaddr.IPNetwork(ip))
722 725 return True
723 726 return False
General Comments 0
You need to be logged in to leave comments. Login now