##// END OF EJS Templates
security: use 404 instead of 403 code on permission decorator to prevent resource discovery attacks.
ergo -
r1817:7df55c97 default
parent child Browse files
Show More
@@ -1,2023 +1,2023 b''
1 # -*- coding: utf-8 -*-
1 # -*- coding: utf-8 -*-
2
2
3 # Copyright (C) 2010-2017 RhodeCode GmbH
3 # Copyright (C) 2010-2017 RhodeCode GmbH
4 #
4 #
5 # This program is free software: you can redistribute it and/or modify
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU Affero General Public License, version 3
6 # it under the terms of the GNU Affero General Public License, version 3
7 # (only), as published by the Free Software Foundation.
7 # (only), as published by the Free Software Foundation.
8 #
8 #
9 # This program is distributed in the hope that it will be useful,
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
12 # GNU General Public License for more details.
13 #
13 #
14 # You should have received a copy of the GNU Affero General Public License
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 #
16 #
17 # This program is dual-licensed. If you wish to learn more about the
17 # This program is dual-licensed. If you wish to learn more about the
18 # RhodeCode Enterprise Edition, including its added features, Support services,
18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20
20
21 """
21 """
22 authentication and permission libraries
22 authentication and permission libraries
23 """
23 """
24
24
25 import os
25 import os
26 import inspect
26 import inspect
27 import collections
27 import collections
28 import fnmatch
28 import fnmatch
29 import hashlib
29 import hashlib
30 import itertools
30 import itertools
31 import logging
31 import logging
32 import random
32 import random
33 import traceback
33 import traceback
34 from functools import wraps
34 from functools import wraps
35
35
36 import ipaddress
36 import ipaddress
37 from pyramid.httpexceptions import HTTPForbidden, HTTPFound
37 from pyramid.httpexceptions import HTTPForbidden, HTTPFound, HTTPNotFound
38 from pylons.i18n.translation import _
38 from pylons.i18n.translation import _
39 # NOTE(marcink): this has to be removed only after pyramid migration,
39 # NOTE(marcink): this has to be removed only after pyramid migration,
40 # replace with _ = request.translate
40 # replace with _ = request.translate
41 from sqlalchemy.orm.exc import ObjectDeletedError
41 from sqlalchemy.orm.exc import ObjectDeletedError
42 from sqlalchemy.orm import joinedload
42 from sqlalchemy.orm import joinedload
43 from zope.cachedescriptors.property import Lazy as LazyProperty
43 from zope.cachedescriptors.property import Lazy as LazyProperty
44
44
45 import rhodecode
45 import rhodecode
46 from rhodecode.model import meta
46 from rhodecode.model import meta
47 from rhodecode.model.meta import Session
47 from rhodecode.model.meta import Session
48 from rhodecode.model.user import UserModel
48 from rhodecode.model.user import UserModel
49 from rhodecode.model.db import (
49 from rhodecode.model.db import (
50 User, Repository, Permission, UserToPerm, UserGroupToPerm, UserGroupMember,
50 User, Repository, Permission, UserToPerm, UserGroupToPerm, UserGroupMember,
51 UserIpMap, UserApiKeys, RepoGroup)
51 UserIpMap, UserApiKeys, RepoGroup)
52 from rhodecode.lib import caches
52 from rhodecode.lib import caches
53 from rhodecode.lib.utils2 import safe_unicode, aslist, safe_str, md5
53 from rhodecode.lib.utils2 import safe_unicode, aslist, safe_str, md5
54 from rhodecode.lib.utils import (
54 from rhodecode.lib.utils import (
55 get_repo_slug, get_repo_group_slug, get_user_group_slug)
55 get_repo_slug, get_repo_group_slug, get_user_group_slug)
56 from rhodecode.lib.caching_query import FromCache
56 from rhodecode.lib.caching_query import FromCache
57
57
58
58
59 if rhodecode.is_unix:
59 if rhodecode.is_unix:
60 import bcrypt
60 import bcrypt
61
61
62 log = logging.getLogger(__name__)
62 log = logging.getLogger(__name__)
63
63
64 csrf_token_key = "csrf_token"
64 csrf_token_key = "csrf_token"
65
65
66
66
67 class PasswordGenerator(object):
67 class PasswordGenerator(object):
68 """
68 """
69 This is a simple class for generating password from different sets of
69 This is a simple class for generating password from different sets of
70 characters
70 characters
71 usage::
71 usage::
72
72
73 passwd_gen = PasswordGenerator()
73 passwd_gen = PasswordGenerator()
74 #print 8-letter password containing only big and small letters
74 #print 8-letter password containing only big and small letters
75 of alphabet
75 of alphabet
76 passwd_gen.gen_password(8, passwd_gen.ALPHABETS_BIG_SMALL)
76 passwd_gen.gen_password(8, passwd_gen.ALPHABETS_BIG_SMALL)
77 """
77 """
78 ALPHABETS_NUM = r'''1234567890'''
78 ALPHABETS_NUM = r'''1234567890'''
79 ALPHABETS_SMALL = r'''qwertyuiopasdfghjklzxcvbnm'''
79 ALPHABETS_SMALL = r'''qwertyuiopasdfghjklzxcvbnm'''
80 ALPHABETS_BIG = r'''QWERTYUIOPASDFGHJKLZXCVBNM'''
80 ALPHABETS_BIG = r'''QWERTYUIOPASDFGHJKLZXCVBNM'''
81 ALPHABETS_SPECIAL = r'''`-=[]\;',./~!@#$%^&*()_+{}|:"<>?'''
81 ALPHABETS_SPECIAL = r'''`-=[]\;',./~!@#$%^&*()_+{}|:"<>?'''
82 ALPHABETS_FULL = ALPHABETS_BIG + ALPHABETS_SMALL \
82 ALPHABETS_FULL = ALPHABETS_BIG + ALPHABETS_SMALL \
83 + ALPHABETS_NUM + ALPHABETS_SPECIAL
83 + ALPHABETS_NUM + ALPHABETS_SPECIAL
84 ALPHABETS_ALPHANUM = ALPHABETS_BIG + ALPHABETS_SMALL + ALPHABETS_NUM
84 ALPHABETS_ALPHANUM = ALPHABETS_BIG + ALPHABETS_SMALL + ALPHABETS_NUM
85 ALPHABETS_BIG_SMALL = ALPHABETS_BIG + ALPHABETS_SMALL
85 ALPHABETS_BIG_SMALL = ALPHABETS_BIG + ALPHABETS_SMALL
86 ALPHABETS_ALPHANUM_BIG = ALPHABETS_BIG + ALPHABETS_NUM
86 ALPHABETS_ALPHANUM_BIG = ALPHABETS_BIG + ALPHABETS_NUM
87 ALPHABETS_ALPHANUM_SMALL = ALPHABETS_SMALL + ALPHABETS_NUM
87 ALPHABETS_ALPHANUM_SMALL = ALPHABETS_SMALL + ALPHABETS_NUM
88
88
89 def __init__(self, passwd=''):
89 def __init__(self, passwd=''):
90 self.passwd = passwd
90 self.passwd = passwd
91
91
92 def gen_password(self, length, type_=None):
92 def gen_password(self, length, type_=None):
93 if type_ is None:
93 if type_ is None:
94 type_ = self.ALPHABETS_FULL
94 type_ = self.ALPHABETS_FULL
95 self.passwd = ''.join([random.choice(type_) for _ in xrange(length)])
95 self.passwd = ''.join([random.choice(type_) for _ in xrange(length)])
96 return self.passwd
96 return self.passwd
97
97
98
98
99 class _RhodeCodeCryptoBase(object):
99 class _RhodeCodeCryptoBase(object):
100 ENC_PREF = None
100 ENC_PREF = None
101
101
102 def hash_create(self, str_):
102 def hash_create(self, str_):
103 """
103 """
104 hash the string using
104 hash the string using
105
105
106 :param str_: password to hash
106 :param str_: password to hash
107 """
107 """
108 raise NotImplementedError
108 raise NotImplementedError
109
109
110 def hash_check_with_upgrade(self, password, hashed):
110 def hash_check_with_upgrade(self, password, hashed):
111 """
111 """
112 Returns tuple in which first element is boolean that states that
112 Returns tuple in which first element is boolean that states that
113 given password matches it's hashed version, and the second is new hash
113 given password matches it's hashed version, and the second is new hash
114 of the password, in case this password should be migrated to new
114 of the password, in case this password should be migrated to new
115 cipher.
115 cipher.
116 """
116 """
117 checked_hash = self.hash_check(password, hashed)
117 checked_hash = self.hash_check(password, hashed)
118 return checked_hash, None
118 return checked_hash, None
119
119
120 def hash_check(self, password, hashed):
120 def hash_check(self, password, hashed):
121 """
121 """
122 Checks matching password with it's hashed value.
122 Checks matching password with it's hashed value.
123
123
124 :param password: password
124 :param password: password
125 :param hashed: password in hashed form
125 :param hashed: password in hashed form
126 """
126 """
127 raise NotImplementedError
127 raise NotImplementedError
128
128
129 def _assert_bytes(self, value):
129 def _assert_bytes(self, value):
130 """
130 """
131 Passing in an `unicode` object can lead to hard to detect issues
131 Passing in an `unicode` object can lead to hard to detect issues
132 if passwords contain non-ascii characters. Doing a type check
132 if passwords contain non-ascii characters. Doing a type check
133 during runtime, so that such mistakes are detected early on.
133 during runtime, so that such mistakes are detected early on.
134 """
134 """
135 if not isinstance(value, str):
135 if not isinstance(value, str):
136 raise TypeError(
136 raise TypeError(
137 "Bytestring required as input, got %r." % (value, ))
137 "Bytestring required as input, got %r." % (value, ))
138
138
139
139
140 class _RhodeCodeCryptoBCrypt(_RhodeCodeCryptoBase):
140 class _RhodeCodeCryptoBCrypt(_RhodeCodeCryptoBase):
141 ENC_PREF = ('$2a$10', '$2b$10')
141 ENC_PREF = ('$2a$10', '$2b$10')
142
142
143 def hash_create(self, str_):
143 def hash_create(self, str_):
144 self._assert_bytes(str_)
144 self._assert_bytes(str_)
145 return bcrypt.hashpw(str_, bcrypt.gensalt(10))
145 return bcrypt.hashpw(str_, bcrypt.gensalt(10))
146
146
147 def hash_check_with_upgrade(self, password, hashed):
147 def hash_check_with_upgrade(self, password, hashed):
148 """
148 """
149 Returns tuple in which first element is boolean that states that
149 Returns tuple in which first element is boolean that states that
150 given password matches it's hashed version, and the second is new hash
150 given password matches it's hashed version, and the second is new hash
151 of the password, in case this password should be migrated to new
151 of the password, in case this password should be migrated to new
152 cipher.
152 cipher.
153
153
154 This implements special upgrade logic which works like that:
154 This implements special upgrade logic which works like that:
155 - check if the given password == bcrypted hash, if yes then we
155 - check if the given password == bcrypted hash, if yes then we
156 properly used password and it was already in bcrypt. Proceed
156 properly used password and it was already in bcrypt. Proceed
157 without any changes
157 without any changes
158 - if bcrypt hash check is not working try with sha256. If hash compare
158 - if bcrypt hash check is not working try with sha256. If hash compare
159 is ok, it means we using correct but old hashed password. indicate
159 is ok, it means we using correct but old hashed password. indicate
160 hash change and proceed
160 hash change and proceed
161 """
161 """
162
162
163 new_hash = None
163 new_hash = None
164
164
165 # regular pw check
165 # regular pw check
166 password_match_bcrypt = self.hash_check(password, hashed)
166 password_match_bcrypt = self.hash_check(password, hashed)
167
167
168 # now we want to know if the password was maybe from sha256
168 # now we want to know if the password was maybe from sha256
169 # basically calling _RhodeCodeCryptoSha256().hash_check()
169 # basically calling _RhodeCodeCryptoSha256().hash_check()
170 if not password_match_bcrypt:
170 if not password_match_bcrypt:
171 if _RhodeCodeCryptoSha256().hash_check(password, hashed):
171 if _RhodeCodeCryptoSha256().hash_check(password, hashed):
172 new_hash = self.hash_create(password) # make new bcrypt hash
172 new_hash = self.hash_create(password) # make new bcrypt hash
173 password_match_bcrypt = True
173 password_match_bcrypt = True
174
174
175 return password_match_bcrypt, new_hash
175 return password_match_bcrypt, new_hash
176
176
177 def hash_check(self, password, hashed):
177 def hash_check(self, password, hashed):
178 """
178 """
179 Checks matching password with it's hashed value.
179 Checks matching password with it's hashed value.
180
180
181 :param password: password
181 :param password: password
182 :param hashed: password in hashed form
182 :param hashed: password in hashed form
183 """
183 """
184 self._assert_bytes(password)
184 self._assert_bytes(password)
185 try:
185 try:
186 return bcrypt.hashpw(password, hashed) == hashed
186 return bcrypt.hashpw(password, hashed) == hashed
187 except ValueError as e:
187 except ValueError as e:
188 # we're having a invalid salt here probably, we should not crash
188 # we're having a invalid salt here probably, we should not crash
189 # just return with False as it would be a wrong password.
189 # just return with False as it would be a wrong password.
190 log.debug('Failed to check password hash using bcrypt %s',
190 log.debug('Failed to check password hash using bcrypt %s',
191 safe_str(e))
191 safe_str(e))
192
192
193 return False
193 return False
194
194
195
195
196 class _RhodeCodeCryptoSha256(_RhodeCodeCryptoBase):
196 class _RhodeCodeCryptoSha256(_RhodeCodeCryptoBase):
197 ENC_PREF = '_'
197 ENC_PREF = '_'
198
198
199 def hash_create(self, str_):
199 def hash_create(self, str_):
200 self._assert_bytes(str_)
200 self._assert_bytes(str_)
201 return hashlib.sha256(str_).hexdigest()
201 return hashlib.sha256(str_).hexdigest()
202
202
203 def hash_check(self, password, hashed):
203 def hash_check(self, password, hashed):
204 """
204 """
205 Checks matching password with it's hashed value.
205 Checks matching password with it's hashed value.
206
206
207 :param password: password
207 :param password: password
208 :param hashed: password in hashed form
208 :param hashed: password in hashed form
209 """
209 """
210 self._assert_bytes(password)
210 self._assert_bytes(password)
211 return hashlib.sha256(password).hexdigest() == hashed
211 return hashlib.sha256(password).hexdigest() == hashed
212
212
213
213
214 class _RhodeCodeCryptoMd5(_RhodeCodeCryptoBase):
214 class _RhodeCodeCryptoMd5(_RhodeCodeCryptoBase):
215 ENC_PREF = '_'
215 ENC_PREF = '_'
216
216
217 def hash_create(self, str_):
217 def hash_create(self, str_):
218 self._assert_bytes(str_)
218 self._assert_bytes(str_)
219 return hashlib.md5(str_).hexdigest()
219 return hashlib.md5(str_).hexdigest()
220
220
221 def hash_check(self, password, hashed):
221 def hash_check(self, password, hashed):
222 """
222 """
223 Checks matching password with it's hashed value.
223 Checks matching password with it's hashed value.
224
224
225 :param password: password
225 :param password: password
226 :param hashed: password in hashed form
226 :param hashed: password in hashed form
227 """
227 """
228 self._assert_bytes(password)
228 self._assert_bytes(password)
229 return hashlib.md5(password).hexdigest() == hashed
229 return hashlib.md5(password).hexdigest() == hashed
230
230
231
231
232 def crypto_backend():
232 def crypto_backend():
233 """
233 """
234 Return the matching crypto backend.
234 Return the matching crypto backend.
235
235
236 Selection is based on if we run tests or not, we pick md5 backend to run
236 Selection is based on if we run tests or not, we pick md5 backend to run
237 tests faster since BCRYPT is expensive to calculate
237 tests faster since BCRYPT is expensive to calculate
238 """
238 """
239 if rhodecode.is_test:
239 if rhodecode.is_test:
240 RhodeCodeCrypto = _RhodeCodeCryptoMd5()
240 RhodeCodeCrypto = _RhodeCodeCryptoMd5()
241 else:
241 else:
242 RhodeCodeCrypto = _RhodeCodeCryptoBCrypt()
242 RhodeCodeCrypto = _RhodeCodeCryptoBCrypt()
243
243
244 return RhodeCodeCrypto
244 return RhodeCodeCrypto
245
245
246
246
247 def get_crypt_password(password):
247 def get_crypt_password(password):
248 """
248 """
249 Create the hash of `password` with the active crypto backend.
249 Create the hash of `password` with the active crypto backend.
250
250
251 :param password: The cleartext password.
251 :param password: The cleartext password.
252 :type password: unicode
252 :type password: unicode
253 """
253 """
254 password = safe_str(password)
254 password = safe_str(password)
255 return crypto_backend().hash_create(password)
255 return crypto_backend().hash_create(password)
256
256
257
257
258 def check_password(password, hashed):
258 def check_password(password, hashed):
259 """
259 """
260 Check if the value in `password` matches the hash in `hashed`.
260 Check if the value in `password` matches the hash in `hashed`.
261
261
262 :param password: The cleartext password.
262 :param password: The cleartext password.
263 :type password: unicode
263 :type password: unicode
264
264
265 :param hashed: The expected hashed version of the password.
265 :param hashed: The expected hashed version of the password.
266 :type hashed: The hash has to be passed in in text representation.
266 :type hashed: The hash has to be passed in in text representation.
267 """
267 """
268 password = safe_str(password)
268 password = safe_str(password)
269 return crypto_backend().hash_check(password, hashed)
269 return crypto_backend().hash_check(password, hashed)
270
270
271
271
272 def generate_auth_token(data, salt=None):
272 def generate_auth_token(data, salt=None):
273 """
273 """
274 Generates API KEY from given string
274 Generates API KEY from given string
275 """
275 """
276
276
277 if salt is None:
277 if salt is None:
278 salt = os.urandom(16)
278 salt = os.urandom(16)
279 return hashlib.sha1(safe_str(data) + salt).hexdigest()
279 return hashlib.sha1(safe_str(data) + salt).hexdigest()
280
280
281
281
282 class CookieStoreWrapper(object):
282 class CookieStoreWrapper(object):
283
283
284 def __init__(self, cookie_store):
284 def __init__(self, cookie_store):
285 self.cookie_store = cookie_store
285 self.cookie_store = cookie_store
286
286
287 def __repr__(self):
287 def __repr__(self):
288 return 'CookieStore<%s>' % (self.cookie_store)
288 return 'CookieStore<%s>' % (self.cookie_store)
289
289
290 def get(self, key, other=None):
290 def get(self, key, other=None):
291 if isinstance(self.cookie_store, dict):
291 if isinstance(self.cookie_store, dict):
292 return self.cookie_store.get(key, other)
292 return self.cookie_store.get(key, other)
293 elif isinstance(self.cookie_store, AuthUser):
293 elif isinstance(self.cookie_store, AuthUser):
294 return self.cookie_store.__dict__.get(key, other)
294 return self.cookie_store.__dict__.get(key, other)
295
295
296
296
297 def _cached_perms_data(user_id, scope, user_is_admin,
297 def _cached_perms_data(user_id, scope, user_is_admin,
298 user_inherit_default_permissions, explicit, algo):
298 user_inherit_default_permissions, explicit, algo):
299
299
300 permissions = PermissionCalculator(
300 permissions = PermissionCalculator(
301 user_id, scope, user_is_admin, user_inherit_default_permissions,
301 user_id, scope, user_is_admin, user_inherit_default_permissions,
302 explicit, algo)
302 explicit, algo)
303 return permissions.calculate()
303 return permissions.calculate()
304
304
305
305
306 class PermOrigin(object):
306 class PermOrigin(object):
307 ADMIN = 'superadmin'
307 ADMIN = 'superadmin'
308
308
309 REPO_USER = 'user:%s'
309 REPO_USER = 'user:%s'
310 REPO_USERGROUP = 'usergroup:%s'
310 REPO_USERGROUP = 'usergroup:%s'
311 REPO_OWNER = 'repo.owner'
311 REPO_OWNER = 'repo.owner'
312 REPO_DEFAULT = 'repo.default'
312 REPO_DEFAULT = 'repo.default'
313 REPO_PRIVATE = 'repo.private'
313 REPO_PRIVATE = 'repo.private'
314
314
315 REPOGROUP_USER = 'user:%s'
315 REPOGROUP_USER = 'user:%s'
316 REPOGROUP_USERGROUP = 'usergroup:%s'
316 REPOGROUP_USERGROUP = 'usergroup:%s'
317 REPOGROUP_OWNER = 'group.owner'
317 REPOGROUP_OWNER = 'group.owner'
318 REPOGROUP_DEFAULT = 'group.default'
318 REPOGROUP_DEFAULT = 'group.default'
319
319
320 USERGROUP_USER = 'user:%s'
320 USERGROUP_USER = 'user:%s'
321 USERGROUP_USERGROUP = 'usergroup:%s'
321 USERGROUP_USERGROUP = 'usergroup:%s'
322 USERGROUP_OWNER = 'usergroup.owner'
322 USERGROUP_OWNER = 'usergroup.owner'
323 USERGROUP_DEFAULT = 'usergroup.default'
323 USERGROUP_DEFAULT = 'usergroup.default'
324
324
325
325
326 class PermOriginDict(dict):
326 class PermOriginDict(dict):
327 """
327 """
328 A special dict used for tracking permissions along with their origins.
328 A special dict used for tracking permissions along with their origins.
329
329
330 `__setitem__` has been overridden to expect a tuple(perm, origin)
330 `__setitem__` has been overridden to expect a tuple(perm, origin)
331 `__getitem__` will return only the perm
331 `__getitem__` will return only the perm
332 `.perm_origin_stack` will return the stack of (perm, origin) set per key
332 `.perm_origin_stack` will return the stack of (perm, origin) set per key
333
333
334 >>> perms = PermOriginDict()
334 >>> perms = PermOriginDict()
335 >>> perms['resource'] = 'read', 'default'
335 >>> perms['resource'] = 'read', 'default'
336 >>> perms['resource']
336 >>> perms['resource']
337 'read'
337 'read'
338 >>> perms['resource'] = 'write', 'admin'
338 >>> perms['resource'] = 'write', 'admin'
339 >>> perms['resource']
339 >>> perms['resource']
340 'write'
340 'write'
341 >>> perms.perm_origin_stack
341 >>> perms.perm_origin_stack
342 {'resource': [('read', 'default'), ('write', 'admin')]}
342 {'resource': [('read', 'default'), ('write', 'admin')]}
343 """
343 """
344
344
345 def __init__(self, *args, **kw):
345 def __init__(self, *args, **kw):
346 dict.__init__(self, *args, **kw)
346 dict.__init__(self, *args, **kw)
347 self.perm_origin_stack = {}
347 self.perm_origin_stack = {}
348
348
349 def __setitem__(self, key, (perm, origin)):
349 def __setitem__(self, key, (perm, origin)):
350 self.perm_origin_stack.setdefault(key, []).append((perm, origin))
350 self.perm_origin_stack.setdefault(key, []).append((perm, origin))
351 dict.__setitem__(self, key, perm)
351 dict.__setitem__(self, key, perm)
352
352
353
353
354 class PermissionCalculator(object):
354 class PermissionCalculator(object):
355
355
356 def __init__(
356 def __init__(
357 self, user_id, scope, user_is_admin,
357 self, user_id, scope, user_is_admin,
358 user_inherit_default_permissions, explicit, algo):
358 user_inherit_default_permissions, explicit, algo):
359 self.user_id = user_id
359 self.user_id = user_id
360 self.user_is_admin = user_is_admin
360 self.user_is_admin = user_is_admin
361 self.inherit_default_permissions = user_inherit_default_permissions
361 self.inherit_default_permissions = user_inherit_default_permissions
362 self.explicit = explicit
362 self.explicit = explicit
363 self.algo = algo
363 self.algo = algo
364
364
365 scope = scope or {}
365 scope = scope or {}
366 self.scope_repo_id = scope.get('repo_id')
366 self.scope_repo_id = scope.get('repo_id')
367 self.scope_repo_group_id = scope.get('repo_group_id')
367 self.scope_repo_group_id = scope.get('repo_group_id')
368 self.scope_user_group_id = scope.get('user_group_id')
368 self.scope_user_group_id = scope.get('user_group_id')
369
369
370 self.default_user_id = User.get_default_user(cache=True).user_id
370 self.default_user_id = User.get_default_user(cache=True).user_id
371
371
372 self.permissions_repositories = PermOriginDict()
372 self.permissions_repositories = PermOriginDict()
373 self.permissions_repository_groups = PermOriginDict()
373 self.permissions_repository_groups = PermOriginDict()
374 self.permissions_user_groups = PermOriginDict()
374 self.permissions_user_groups = PermOriginDict()
375 self.permissions_global = set()
375 self.permissions_global = set()
376
376
377 self.default_repo_perms = Permission.get_default_repo_perms(
377 self.default_repo_perms = Permission.get_default_repo_perms(
378 self.default_user_id, self.scope_repo_id)
378 self.default_user_id, self.scope_repo_id)
379 self.default_repo_groups_perms = Permission.get_default_group_perms(
379 self.default_repo_groups_perms = Permission.get_default_group_perms(
380 self.default_user_id, self.scope_repo_group_id)
380 self.default_user_id, self.scope_repo_group_id)
381 self.default_user_group_perms = \
381 self.default_user_group_perms = \
382 Permission.get_default_user_group_perms(
382 Permission.get_default_user_group_perms(
383 self.default_user_id, self.scope_user_group_id)
383 self.default_user_id, self.scope_user_group_id)
384
384
385 def calculate(self):
385 def calculate(self):
386 if self.user_is_admin:
386 if self.user_is_admin:
387 return self._admin_permissions()
387 return self._admin_permissions()
388
388
389 self._calculate_global_default_permissions()
389 self._calculate_global_default_permissions()
390 self._calculate_global_permissions()
390 self._calculate_global_permissions()
391 self._calculate_default_permissions()
391 self._calculate_default_permissions()
392 self._calculate_repository_permissions()
392 self._calculate_repository_permissions()
393 self._calculate_repository_group_permissions()
393 self._calculate_repository_group_permissions()
394 self._calculate_user_group_permissions()
394 self._calculate_user_group_permissions()
395 return self._permission_structure()
395 return self._permission_structure()
396
396
397 def _admin_permissions(self):
397 def _admin_permissions(self):
398 """
398 """
399 admin user have all default rights for repositories
399 admin user have all default rights for repositories
400 and groups set to admin
400 and groups set to admin
401 """
401 """
402 self.permissions_global.add('hg.admin')
402 self.permissions_global.add('hg.admin')
403 self.permissions_global.add('hg.create.write_on_repogroup.true')
403 self.permissions_global.add('hg.create.write_on_repogroup.true')
404
404
405 # repositories
405 # repositories
406 for perm in self.default_repo_perms:
406 for perm in self.default_repo_perms:
407 r_k = perm.UserRepoToPerm.repository.repo_name
407 r_k = perm.UserRepoToPerm.repository.repo_name
408 p = 'repository.admin'
408 p = 'repository.admin'
409 self.permissions_repositories[r_k] = p, PermOrigin.ADMIN
409 self.permissions_repositories[r_k] = p, PermOrigin.ADMIN
410
410
411 # repository groups
411 # repository groups
412 for perm in self.default_repo_groups_perms:
412 for perm in self.default_repo_groups_perms:
413 rg_k = perm.UserRepoGroupToPerm.group.group_name
413 rg_k = perm.UserRepoGroupToPerm.group.group_name
414 p = 'group.admin'
414 p = 'group.admin'
415 self.permissions_repository_groups[rg_k] = p, PermOrigin.ADMIN
415 self.permissions_repository_groups[rg_k] = p, PermOrigin.ADMIN
416
416
417 # user groups
417 # user groups
418 for perm in self.default_user_group_perms:
418 for perm in self.default_user_group_perms:
419 u_k = perm.UserUserGroupToPerm.user_group.users_group_name
419 u_k = perm.UserUserGroupToPerm.user_group.users_group_name
420 p = 'usergroup.admin'
420 p = 'usergroup.admin'
421 self.permissions_user_groups[u_k] = p, PermOrigin.ADMIN
421 self.permissions_user_groups[u_k] = p, PermOrigin.ADMIN
422
422
423 return self._permission_structure()
423 return self._permission_structure()
424
424
425 def _calculate_global_default_permissions(self):
425 def _calculate_global_default_permissions(self):
426 """
426 """
427 global permissions taken from the default user
427 global permissions taken from the default user
428 """
428 """
429 default_global_perms = UserToPerm.query()\
429 default_global_perms = UserToPerm.query()\
430 .filter(UserToPerm.user_id == self.default_user_id)\
430 .filter(UserToPerm.user_id == self.default_user_id)\
431 .options(joinedload(UserToPerm.permission))
431 .options(joinedload(UserToPerm.permission))
432
432
433 for perm in default_global_perms:
433 for perm in default_global_perms:
434 self.permissions_global.add(perm.permission.permission_name)
434 self.permissions_global.add(perm.permission.permission_name)
435
435
436 def _calculate_global_permissions(self):
436 def _calculate_global_permissions(self):
437 """
437 """
438 Set global system permissions with user permissions or permissions
438 Set global system permissions with user permissions or permissions
439 taken from the user groups of the current user.
439 taken from the user groups of the current user.
440
440
441 The permissions include repo creating, repo group creating, forking
441 The permissions include repo creating, repo group creating, forking
442 etc.
442 etc.
443 """
443 """
444
444
445 # now we read the defined permissions and overwrite what we have set
445 # now we read the defined permissions and overwrite what we have set
446 # before those can be configured from groups or users explicitly.
446 # before those can be configured from groups or users explicitly.
447
447
448 # TODO: johbo: This seems to be out of sync, find out the reason
448 # TODO: johbo: This seems to be out of sync, find out the reason
449 # for the comment below and update it.
449 # for the comment below and update it.
450
450
451 # In case we want to extend this list we should be always in sync with
451 # In case we want to extend this list we should be always in sync with
452 # User.DEFAULT_USER_PERMISSIONS definitions
452 # User.DEFAULT_USER_PERMISSIONS definitions
453 _configurable = frozenset([
453 _configurable = frozenset([
454 'hg.fork.none', 'hg.fork.repository',
454 'hg.fork.none', 'hg.fork.repository',
455 'hg.create.none', 'hg.create.repository',
455 'hg.create.none', 'hg.create.repository',
456 'hg.usergroup.create.false', 'hg.usergroup.create.true',
456 'hg.usergroup.create.false', 'hg.usergroup.create.true',
457 'hg.repogroup.create.false', 'hg.repogroup.create.true',
457 'hg.repogroup.create.false', 'hg.repogroup.create.true',
458 'hg.create.write_on_repogroup.false',
458 'hg.create.write_on_repogroup.false',
459 'hg.create.write_on_repogroup.true',
459 'hg.create.write_on_repogroup.true',
460 'hg.inherit_default_perms.false', 'hg.inherit_default_perms.true'
460 'hg.inherit_default_perms.false', 'hg.inherit_default_perms.true'
461 ])
461 ])
462
462
463 # USER GROUPS comes first user group global permissions
463 # USER GROUPS comes first user group global permissions
464 user_perms_from_users_groups = Session().query(UserGroupToPerm)\
464 user_perms_from_users_groups = Session().query(UserGroupToPerm)\
465 .options(joinedload(UserGroupToPerm.permission))\
465 .options(joinedload(UserGroupToPerm.permission))\
466 .join((UserGroupMember, UserGroupToPerm.users_group_id ==
466 .join((UserGroupMember, UserGroupToPerm.users_group_id ==
467 UserGroupMember.users_group_id))\
467 UserGroupMember.users_group_id))\
468 .filter(UserGroupMember.user_id == self.user_id)\
468 .filter(UserGroupMember.user_id == self.user_id)\
469 .order_by(UserGroupToPerm.users_group_id)\
469 .order_by(UserGroupToPerm.users_group_id)\
470 .all()
470 .all()
471
471
472 # need to group here by groups since user can be in more than
472 # need to group here by groups since user can be in more than
473 # one group, so we get all groups
473 # one group, so we get all groups
474 _explicit_grouped_perms = [
474 _explicit_grouped_perms = [
475 [x, list(y)] for x, y in
475 [x, list(y)] for x, y in
476 itertools.groupby(user_perms_from_users_groups,
476 itertools.groupby(user_perms_from_users_groups,
477 lambda _x: _x.users_group)]
477 lambda _x: _x.users_group)]
478
478
479 for gr, perms in _explicit_grouped_perms:
479 for gr, perms in _explicit_grouped_perms:
480 # since user can be in multiple groups iterate over them and
480 # since user can be in multiple groups iterate over them and
481 # select the lowest permissions first (more explicit)
481 # select the lowest permissions first (more explicit)
482 # TODO: marcink: do this^^
482 # TODO: marcink: do this^^
483
483
484 # group doesn't inherit default permissions so we actually set them
484 # group doesn't inherit default permissions so we actually set them
485 if not gr.inherit_default_permissions:
485 if not gr.inherit_default_permissions:
486 # NEED TO IGNORE all previously set configurable permissions
486 # NEED TO IGNORE all previously set configurable permissions
487 # and replace them with explicitly set from this user
487 # and replace them with explicitly set from this user
488 # group permissions
488 # group permissions
489 self.permissions_global = self.permissions_global.difference(
489 self.permissions_global = self.permissions_global.difference(
490 _configurable)
490 _configurable)
491 for perm in perms:
491 for perm in perms:
492 self.permissions_global.add(perm.permission.permission_name)
492 self.permissions_global.add(perm.permission.permission_name)
493
493
494 # user explicit global permissions
494 # user explicit global permissions
495 user_perms = Session().query(UserToPerm)\
495 user_perms = Session().query(UserToPerm)\
496 .options(joinedload(UserToPerm.permission))\
496 .options(joinedload(UserToPerm.permission))\
497 .filter(UserToPerm.user_id == self.user_id).all()
497 .filter(UserToPerm.user_id == self.user_id).all()
498
498
499 if not self.inherit_default_permissions:
499 if not self.inherit_default_permissions:
500 # NEED TO IGNORE all configurable permissions and
500 # NEED TO IGNORE all configurable permissions and
501 # replace them with explicitly set from this user permissions
501 # replace them with explicitly set from this user permissions
502 self.permissions_global = self.permissions_global.difference(
502 self.permissions_global = self.permissions_global.difference(
503 _configurable)
503 _configurable)
504 for perm in user_perms:
504 for perm in user_perms:
505 self.permissions_global.add(perm.permission.permission_name)
505 self.permissions_global.add(perm.permission.permission_name)
506
506
507 def _calculate_default_permissions(self):
507 def _calculate_default_permissions(self):
508 """
508 """
509 Set default user permissions for repositories, repository groups
509 Set default user permissions for repositories, repository groups
510 taken from the default user.
510 taken from the default user.
511
511
512 Calculate inheritance of object permissions based on what we have now
512 Calculate inheritance of object permissions based on what we have now
513 in GLOBAL permissions. We check if .false is in GLOBAL since this is
513 in GLOBAL permissions. We check if .false is in GLOBAL since this is
514 explicitly set. Inherit is the opposite of .false being there.
514 explicitly set. Inherit is the opposite of .false being there.
515
515
516 .. note::
516 .. note::
517
517
518 the syntax is little bit odd but what we need to check here is
518 the syntax is little bit odd but what we need to check here is
519 the opposite of .false permission being in the list so even for
519 the opposite of .false permission being in the list so even for
520 inconsistent state when both .true/.false is there
520 inconsistent state when both .true/.false is there
521 .false is more important
521 .false is more important
522
522
523 """
523 """
524 user_inherit_object_permissions = not ('hg.inherit_default_perms.false'
524 user_inherit_object_permissions = not ('hg.inherit_default_perms.false'
525 in self.permissions_global)
525 in self.permissions_global)
526
526
527 # defaults for repositories, taken from `default` user permissions
527 # defaults for repositories, taken from `default` user permissions
528 # on given repo
528 # on given repo
529 for perm in self.default_repo_perms:
529 for perm in self.default_repo_perms:
530 r_k = perm.UserRepoToPerm.repository.repo_name
530 r_k = perm.UserRepoToPerm.repository.repo_name
531 o = PermOrigin.REPO_DEFAULT
531 o = PermOrigin.REPO_DEFAULT
532 if perm.Repository.private and not (
532 if perm.Repository.private and not (
533 perm.Repository.user_id == self.user_id):
533 perm.Repository.user_id == self.user_id):
534 # disable defaults for private repos,
534 # disable defaults for private repos,
535 p = 'repository.none'
535 p = 'repository.none'
536 o = PermOrigin.REPO_PRIVATE
536 o = PermOrigin.REPO_PRIVATE
537 elif perm.Repository.user_id == self.user_id:
537 elif perm.Repository.user_id == self.user_id:
538 # set admin if owner
538 # set admin if owner
539 p = 'repository.admin'
539 p = 'repository.admin'
540 o = PermOrigin.REPO_OWNER
540 o = PermOrigin.REPO_OWNER
541 else:
541 else:
542 p = perm.Permission.permission_name
542 p = perm.Permission.permission_name
543 # if we decide this user isn't inheriting permissions from
543 # if we decide this user isn't inheriting permissions from
544 # default user we set him to .none so only explicit
544 # default user we set him to .none so only explicit
545 # permissions work
545 # permissions work
546 if not user_inherit_object_permissions:
546 if not user_inherit_object_permissions:
547 p = 'repository.none'
547 p = 'repository.none'
548 self.permissions_repositories[r_k] = p, o
548 self.permissions_repositories[r_k] = p, o
549
549
550 # defaults for repository groups taken from `default` user permission
550 # defaults for repository groups taken from `default` user permission
551 # on given group
551 # on given group
552 for perm in self.default_repo_groups_perms:
552 for perm in self.default_repo_groups_perms:
553 rg_k = perm.UserRepoGroupToPerm.group.group_name
553 rg_k = perm.UserRepoGroupToPerm.group.group_name
554 o = PermOrigin.REPOGROUP_DEFAULT
554 o = PermOrigin.REPOGROUP_DEFAULT
555 if perm.RepoGroup.user_id == self.user_id:
555 if perm.RepoGroup.user_id == self.user_id:
556 # set admin if owner
556 # set admin if owner
557 p = 'group.admin'
557 p = 'group.admin'
558 o = PermOrigin.REPOGROUP_OWNER
558 o = PermOrigin.REPOGROUP_OWNER
559 else:
559 else:
560 p = perm.Permission.permission_name
560 p = perm.Permission.permission_name
561
561
562 # if we decide this user isn't inheriting permissions from default
562 # if we decide this user isn't inheriting permissions from default
563 # user we set him to .none so only explicit permissions work
563 # user we set him to .none so only explicit permissions work
564 if not user_inherit_object_permissions:
564 if not user_inherit_object_permissions:
565 p = 'group.none'
565 p = 'group.none'
566 self.permissions_repository_groups[rg_k] = p, o
566 self.permissions_repository_groups[rg_k] = p, o
567
567
568 # defaults for user groups taken from `default` user permission
568 # defaults for user groups taken from `default` user permission
569 # on given user group
569 # on given user group
570 for perm in self.default_user_group_perms:
570 for perm in self.default_user_group_perms:
571 u_k = perm.UserUserGroupToPerm.user_group.users_group_name
571 u_k = perm.UserUserGroupToPerm.user_group.users_group_name
572 o = PermOrigin.USERGROUP_DEFAULT
572 o = PermOrigin.USERGROUP_DEFAULT
573 if perm.UserGroup.user_id == self.user_id:
573 if perm.UserGroup.user_id == self.user_id:
574 # set admin if owner
574 # set admin if owner
575 p = 'usergroup.admin'
575 p = 'usergroup.admin'
576 o = PermOrigin.USERGROUP_OWNER
576 o = PermOrigin.USERGROUP_OWNER
577 else:
577 else:
578 p = perm.Permission.permission_name
578 p = perm.Permission.permission_name
579
579
580 # if we decide this user isn't inheriting permissions from default
580 # if we decide this user isn't inheriting permissions from default
581 # user we set him to .none so only explicit permissions work
581 # user we set him to .none so only explicit permissions work
582 if not user_inherit_object_permissions:
582 if not user_inherit_object_permissions:
583 p = 'usergroup.none'
583 p = 'usergroup.none'
584 self.permissions_user_groups[u_k] = p, o
584 self.permissions_user_groups[u_k] = p, o
585
585
586 def _calculate_repository_permissions(self):
586 def _calculate_repository_permissions(self):
587 """
587 """
588 Repository permissions for the current user.
588 Repository permissions for the current user.
589
589
590 Check if the user is part of user groups for this repository and
590 Check if the user is part of user groups for this repository and
591 fill in the permission from it. `_choose_permission` decides of which
591 fill in the permission from it. `_choose_permission` decides of which
592 permission should be selected based on selected method.
592 permission should be selected based on selected method.
593 """
593 """
594
594
595 # user group for repositories permissions
595 # user group for repositories permissions
596 user_repo_perms_from_user_group = Permission\
596 user_repo_perms_from_user_group = Permission\
597 .get_default_repo_perms_from_user_group(
597 .get_default_repo_perms_from_user_group(
598 self.user_id, self.scope_repo_id)
598 self.user_id, self.scope_repo_id)
599
599
600 multiple_counter = collections.defaultdict(int)
600 multiple_counter = collections.defaultdict(int)
601 for perm in user_repo_perms_from_user_group:
601 for perm in user_repo_perms_from_user_group:
602 r_k = perm.UserGroupRepoToPerm.repository.repo_name
602 r_k = perm.UserGroupRepoToPerm.repository.repo_name
603 ug_k = perm.UserGroupRepoToPerm.users_group.users_group_name
603 ug_k = perm.UserGroupRepoToPerm.users_group.users_group_name
604 multiple_counter[r_k] += 1
604 multiple_counter[r_k] += 1
605 p = perm.Permission.permission_name
605 p = perm.Permission.permission_name
606 o = PermOrigin.REPO_USERGROUP % ug_k
606 o = PermOrigin.REPO_USERGROUP % ug_k
607
607
608 if perm.Repository.user_id == self.user_id:
608 if perm.Repository.user_id == self.user_id:
609 # set admin if owner
609 # set admin if owner
610 p = 'repository.admin'
610 p = 'repository.admin'
611 o = PermOrigin.REPO_OWNER
611 o = PermOrigin.REPO_OWNER
612 else:
612 else:
613 if multiple_counter[r_k] > 1:
613 if multiple_counter[r_k] > 1:
614 cur_perm = self.permissions_repositories[r_k]
614 cur_perm = self.permissions_repositories[r_k]
615 p = self._choose_permission(p, cur_perm)
615 p = self._choose_permission(p, cur_perm)
616 self.permissions_repositories[r_k] = p, o
616 self.permissions_repositories[r_k] = p, o
617
617
618 # user explicit permissions for repositories, overrides any specified
618 # user explicit permissions for repositories, overrides any specified
619 # by the group permission
619 # by the group permission
620 user_repo_perms = Permission.get_default_repo_perms(
620 user_repo_perms = Permission.get_default_repo_perms(
621 self.user_id, self.scope_repo_id)
621 self.user_id, self.scope_repo_id)
622 for perm in user_repo_perms:
622 for perm in user_repo_perms:
623 r_k = perm.UserRepoToPerm.repository.repo_name
623 r_k = perm.UserRepoToPerm.repository.repo_name
624 o = PermOrigin.REPO_USER % perm.UserRepoToPerm.user.username
624 o = PermOrigin.REPO_USER % perm.UserRepoToPerm.user.username
625 # set admin if owner
625 # set admin if owner
626 if perm.Repository.user_id == self.user_id:
626 if perm.Repository.user_id == self.user_id:
627 p = 'repository.admin'
627 p = 'repository.admin'
628 o = PermOrigin.REPO_OWNER
628 o = PermOrigin.REPO_OWNER
629 else:
629 else:
630 p = perm.Permission.permission_name
630 p = perm.Permission.permission_name
631 if not self.explicit:
631 if not self.explicit:
632 cur_perm = self.permissions_repositories.get(
632 cur_perm = self.permissions_repositories.get(
633 r_k, 'repository.none')
633 r_k, 'repository.none')
634 p = self._choose_permission(p, cur_perm)
634 p = self._choose_permission(p, cur_perm)
635 self.permissions_repositories[r_k] = p, o
635 self.permissions_repositories[r_k] = p, o
636
636
637 def _calculate_repository_group_permissions(self):
637 def _calculate_repository_group_permissions(self):
638 """
638 """
639 Repository group permissions for the current user.
639 Repository group permissions for the current user.
640
640
641 Check if the user is part of user groups for repository groups and
641 Check if the user is part of user groups for repository groups and
642 fill in the permissions from it. `_choose_permmission` decides of which
642 fill in the permissions from it. `_choose_permmission` decides of which
643 permission should be selected based on selected method.
643 permission should be selected based on selected method.
644 """
644 """
645 # user group for repo groups permissions
645 # user group for repo groups permissions
646 user_repo_group_perms_from_user_group = Permission\
646 user_repo_group_perms_from_user_group = Permission\
647 .get_default_group_perms_from_user_group(
647 .get_default_group_perms_from_user_group(
648 self.user_id, self.scope_repo_group_id)
648 self.user_id, self.scope_repo_group_id)
649
649
650 multiple_counter = collections.defaultdict(int)
650 multiple_counter = collections.defaultdict(int)
651 for perm in user_repo_group_perms_from_user_group:
651 for perm in user_repo_group_perms_from_user_group:
652 g_k = perm.UserGroupRepoGroupToPerm.group.group_name
652 g_k = perm.UserGroupRepoGroupToPerm.group.group_name
653 ug_k = perm.UserGroupRepoGroupToPerm.users_group.users_group_name
653 ug_k = perm.UserGroupRepoGroupToPerm.users_group.users_group_name
654 o = PermOrigin.REPOGROUP_USERGROUP % ug_k
654 o = PermOrigin.REPOGROUP_USERGROUP % ug_k
655 multiple_counter[g_k] += 1
655 multiple_counter[g_k] += 1
656 p = perm.Permission.permission_name
656 p = perm.Permission.permission_name
657 if perm.RepoGroup.user_id == self.user_id:
657 if perm.RepoGroup.user_id == self.user_id:
658 # set admin if owner, even for member of other user group
658 # set admin if owner, even for member of other user group
659 p = 'group.admin'
659 p = 'group.admin'
660 o = PermOrigin.REPOGROUP_OWNER
660 o = PermOrigin.REPOGROUP_OWNER
661 else:
661 else:
662 if multiple_counter[g_k] > 1:
662 if multiple_counter[g_k] > 1:
663 cur_perm = self.permissions_repository_groups[g_k]
663 cur_perm = self.permissions_repository_groups[g_k]
664 p = self._choose_permission(p, cur_perm)
664 p = self._choose_permission(p, cur_perm)
665 self.permissions_repository_groups[g_k] = p, o
665 self.permissions_repository_groups[g_k] = p, o
666
666
667 # user explicit permissions for repository groups
667 # user explicit permissions for repository groups
668 user_repo_groups_perms = Permission.get_default_group_perms(
668 user_repo_groups_perms = Permission.get_default_group_perms(
669 self.user_id, self.scope_repo_group_id)
669 self.user_id, self.scope_repo_group_id)
670 for perm in user_repo_groups_perms:
670 for perm in user_repo_groups_perms:
671 rg_k = perm.UserRepoGroupToPerm.group.group_name
671 rg_k = perm.UserRepoGroupToPerm.group.group_name
672 u_k = perm.UserRepoGroupToPerm.user.username
672 u_k = perm.UserRepoGroupToPerm.user.username
673 o = PermOrigin.REPOGROUP_USER % u_k
673 o = PermOrigin.REPOGROUP_USER % u_k
674
674
675 if perm.RepoGroup.user_id == self.user_id:
675 if perm.RepoGroup.user_id == self.user_id:
676 # set admin if owner
676 # set admin if owner
677 p = 'group.admin'
677 p = 'group.admin'
678 o = PermOrigin.REPOGROUP_OWNER
678 o = PermOrigin.REPOGROUP_OWNER
679 else:
679 else:
680 p = perm.Permission.permission_name
680 p = perm.Permission.permission_name
681 if not self.explicit:
681 if not self.explicit:
682 cur_perm = self.permissions_repository_groups.get(
682 cur_perm = self.permissions_repository_groups.get(
683 rg_k, 'group.none')
683 rg_k, 'group.none')
684 p = self._choose_permission(p, cur_perm)
684 p = self._choose_permission(p, cur_perm)
685 self.permissions_repository_groups[rg_k] = p, o
685 self.permissions_repository_groups[rg_k] = p, o
686
686
687 def _calculate_user_group_permissions(self):
687 def _calculate_user_group_permissions(self):
688 """
688 """
689 User group permissions for the current user.
689 User group permissions for the current user.
690 """
690 """
691 # user group for user group permissions
691 # user group for user group permissions
692 user_group_from_user_group = Permission\
692 user_group_from_user_group = Permission\
693 .get_default_user_group_perms_from_user_group(
693 .get_default_user_group_perms_from_user_group(
694 self.user_id, self.scope_user_group_id)
694 self.user_id, self.scope_user_group_id)
695
695
696 multiple_counter = collections.defaultdict(int)
696 multiple_counter = collections.defaultdict(int)
697 for perm in user_group_from_user_group:
697 for perm in user_group_from_user_group:
698 g_k = perm.UserGroupUserGroupToPerm\
698 g_k = perm.UserGroupUserGroupToPerm\
699 .target_user_group.users_group_name
699 .target_user_group.users_group_name
700 u_k = perm.UserGroupUserGroupToPerm\
700 u_k = perm.UserGroupUserGroupToPerm\
701 .user_group.users_group_name
701 .user_group.users_group_name
702 o = PermOrigin.USERGROUP_USERGROUP % u_k
702 o = PermOrigin.USERGROUP_USERGROUP % u_k
703 multiple_counter[g_k] += 1
703 multiple_counter[g_k] += 1
704 p = perm.Permission.permission_name
704 p = perm.Permission.permission_name
705
705
706 if perm.UserGroup.user_id == self.user_id:
706 if perm.UserGroup.user_id == self.user_id:
707 # set admin if owner, even for member of other user group
707 # set admin if owner, even for member of other user group
708 p = 'usergroup.admin'
708 p = 'usergroup.admin'
709 o = PermOrigin.USERGROUP_OWNER
709 o = PermOrigin.USERGROUP_OWNER
710 else:
710 else:
711 if multiple_counter[g_k] > 1:
711 if multiple_counter[g_k] > 1:
712 cur_perm = self.permissions_user_groups[g_k]
712 cur_perm = self.permissions_user_groups[g_k]
713 p = self._choose_permission(p, cur_perm)
713 p = self._choose_permission(p, cur_perm)
714 self.permissions_user_groups[g_k] = p, o
714 self.permissions_user_groups[g_k] = p, o
715
715
716 # user explicit permission for user groups
716 # user explicit permission for user groups
717 user_user_groups_perms = Permission.get_default_user_group_perms(
717 user_user_groups_perms = Permission.get_default_user_group_perms(
718 self.user_id, self.scope_user_group_id)
718 self.user_id, self.scope_user_group_id)
719 for perm in user_user_groups_perms:
719 for perm in user_user_groups_perms:
720 ug_k = perm.UserUserGroupToPerm.user_group.users_group_name
720 ug_k = perm.UserUserGroupToPerm.user_group.users_group_name
721 u_k = perm.UserUserGroupToPerm.user.username
721 u_k = perm.UserUserGroupToPerm.user.username
722 o = PermOrigin.USERGROUP_USER % u_k
722 o = PermOrigin.USERGROUP_USER % u_k
723
723
724 if perm.UserGroup.user_id == self.user_id:
724 if perm.UserGroup.user_id == self.user_id:
725 # set admin if owner
725 # set admin if owner
726 p = 'usergroup.admin'
726 p = 'usergroup.admin'
727 o = PermOrigin.USERGROUP_OWNER
727 o = PermOrigin.USERGROUP_OWNER
728 else:
728 else:
729 p = perm.Permission.permission_name
729 p = perm.Permission.permission_name
730 if not self.explicit:
730 if not self.explicit:
731 cur_perm = self.permissions_user_groups.get(
731 cur_perm = self.permissions_user_groups.get(
732 ug_k, 'usergroup.none')
732 ug_k, 'usergroup.none')
733 p = self._choose_permission(p, cur_perm)
733 p = self._choose_permission(p, cur_perm)
734 self.permissions_user_groups[ug_k] = p, o
734 self.permissions_user_groups[ug_k] = p, o
735
735
736 def _choose_permission(self, new_perm, cur_perm):
736 def _choose_permission(self, new_perm, cur_perm):
737 new_perm_val = Permission.PERM_WEIGHTS[new_perm]
737 new_perm_val = Permission.PERM_WEIGHTS[new_perm]
738 cur_perm_val = Permission.PERM_WEIGHTS[cur_perm]
738 cur_perm_val = Permission.PERM_WEIGHTS[cur_perm]
739 if self.algo == 'higherwin':
739 if self.algo == 'higherwin':
740 if new_perm_val > cur_perm_val:
740 if new_perm_val > cur_perm_val:
741 return new_perm
741 return new_perm
742 return cur_perm
742 return cur_perm
743 elif self.algo == 'lowerwin':
743 elif self.algo == 'lowerwin':
744 if new_perm_val < cur_perm_val:
744 if new_perm_val < cur_perm_val:
745 return new_perm
745 return new_perm
746 return cur_perm
746 return cur_perm
747
747
748 def _permission_structure(self):
748 def _permission_structure(self):
749 return {
749 return {
750 'global': self.permissions_global,
750 'global': self.permissions_global,
751 'repositories': self.permissions_repositories,
751 'repositories': self.permissions_repositories,
752 'repositories_groups': self.permissions_repository_groups,
752 'repositories_groups': self.permissions_repository_groups,
753 'user_groups': self.permissions_user_groups,
753 'user_groups': self.permissions_user_groups,
754 }
754 }
755
755
756
756
757 def allowed_auth_token_access(controller_name, whitelist=None, auth_token=None):
757 def allowed_auth_token_access(controller_name, whitelist=None, auth_token=None):
758 """
758 """
759 Check if given controller_name is in whitelist of auth token access
759 Check if given controller_name is in whitelist of auth token access
760 """
760 """
761 if not whitelist:
761 if not whitelist:
762 from rhodecode import CONFIG
762 from rhodecode import CONFIG
763 whitelist = aslist(
763 whitelist = aslist(
764 CONFIG.get('api_access_controllers_whitelist'), sep=',')
764 CONFIG.get('api_access_controllers_whitelist'), sep=',')
765 log.debug(
765 log.debug(
766 'Allowed controllers for AUTH TOKEN access: %s' % (whitelist,))
766 'Allowed controllers for AUTH TOKEN access: %s' % (whitelist,))
767
767
768 auth_token_access_valid = False
768 auth_token_access_valid = False
769 for entry in whitelist:
769 for entry in whitelist:
770 if fnmatch.fnmatch(controller_name, entry):
770 if fnmatch.fnmatch(controller_name, entry):
771 auth_token_access_valid = True
771 auth_token_access_valid = True
772 break
772 break
773
773
774 if auth_token_access_valid:
774 if auth_token_access_valid:
775 log.debug('controller:%s matches entry in whitelist'
775 log.debug('controller:%s matches entry in whitelist'
776 % (controller_name,))
776 % (controller_name,))
777 else:
777 else:
778 msg = ('controller: %s does *NOT* match any entry in whitelist'
778 msg = ('controller: %s does *NOT* match any entry in whitelist'
779 % (controller_name,))
779 % (controller_name,))
780 if auth_token:
780 if auth_token:
781 # if we use auth token key and don't have access it's a warning
781 # if we use auth token key and don't have access it's a warning
782 log.warning(msg)
782 log.warning(msg)
783 else:
783 else:
784 log.debug(msg)
784 log.debug(msg)
785
785
786 return auth_token_access_valid
786 return auth_token_access_valid
787
787
788
788
789 class AuthUser(object):
789 class AuthUser(object):
790 """
790 """
791 A simple object that handles all attributes of user in RhodeCode
791 A simple object that handles all attributes of user in RhodeCode
792
792
793 It does lookup based on API key,given user, or user present in session
793 It does lookup based on API key,given user, or user present in session
794 Then it fills all required information for such user. It also checks if
794 Then it fills all required information for such user. It also checks if
795 anonymous access is enabled and if so, it returns default user as logged in
795 anonymous access is enabled and if so, it returns default user as logged in
796 """
796 """
797 GLOBAL_PERMS = [x[0] for x in Permission.PERMS]
797 GLOBAL_PERMS = [x[0] for x in Permission.PERMS]
798
798
799 def __init__(self, user_id=None, api_key=None, username=None, ip_addr=None):
799 def __init__(self, user_id=None, api_key=None, username=None, ip_addr=None):
800
800
801 self.user_id = user_id
801 self.user_id = user_id
802 self._api_key = api_key
802 self._api_key = api_key
803
803
804 self.api_key = None
804 self.api_key = None
805 self.feed_token = ''
805 self.feed_token = ''
806 self.username = username
806 self.username = username
807 self.ip_addr = ip_addr
807 self.ip_addr = ip_addr
808 self.name = ''
808 self.name = ''
809 self.lastname = ''
809 self.lastname = ''
810 self.first_name = ''
810 self.first_name = ''
811 self.last_name = ''
811 self.last_name = ''
812 self.email = ''
812 self.email = ''
813 self.is_authenticated = False
813 self.is_authenticated = False
814 self.admin = False
814 self.admin = False
815 self.inherit_default_permissions = False
815 self.inherit_default_permissions = False
816 self.password = ''
816 self.password = ''
817
817
818 self.anonymous_user = None # propagated on propagate_data
818 self.anonymous_user = None # propagated on propagate_data
819 self.propagate_data()
819 self.propagate_data()
820 self._instance = None
820 self._instance = None
821 self._permissions_scoped_cache = {} # used to bind scoped calculation
821 self._permissions_scoped_cache = {} # used to bind scoped calculation
822
822
823 @LazyProperty
823 @LazyProperty
824 def permissions(self):
824 def permissions(self):
825 return self.get_perms(user=self, cache=False)
825 return self.get_perms(user=self, cache=False)
826
826
827 def permissions_with_scope(self, scope):
827 def permissions_with_scope(self, scope):
828 """
828 """
829 Call the get_perms function with scoped data. The scope in that function
829 Call the get_perms function with scoped data. The scope in that function
830 narrows the SQL calls to the given ID of objects resulting in fetching
830 narrows the SQL calls to the given ID of objects resulting in fetching
831 Just particular permission we want to obtain. If scope is an empty dict
831 Just particular permission we want to obtain. If scope is an empty dict
832 then it basically narrows the scope to GLOBAL permissions only.
832 then it basically narrows the scope to GLOBAL permissions only.
833
833
834 :param scope: dict
834 :param scope: dict
835 """
835 """
836 if 'repo_name' in scope:
836 if 'repo_name' in scope:
837 obj = Repository.get_by_repo_name(scope['repo_name'])
837 obj = Repository.get_by_repo_name(scope['repo_name'])
838 if obj:
838 if obj:
839 scope['repo_id'] = obj.repo_id
839 scope['repo_id'] = obj.repo_id
840 _scope = {
840 _scope = {
841 'repo_id': -1,
841 'repo_id': -1,
842 'user_group_id': -1,
842 'user_group_id': -1,
843 'repo_group_id': -1,
843 'repo_group_id': -1,
844 }
844 }
845 _scope.update(scope)
845 _scope.update(scope)
846 cache_key = "_".join(map(safe_str, reduce(lambda a, b: a+b,
846 cache_key = "_".join(map(safe_str, reduce(lambda a, b: a+b,
847 _scope.items())))
847 _scope.items())))
848 if cache_key not in self._permissions_scoped_cache:
848 if cache_key not in self._permissions_scoped_cache:
849 # store in cache to mimic how the @LazyProperty works,
849 # store in cache to mimic how the @LazyProperty works,
850 # the difference here is that we use the unique key calculated
850 # the difference here is that we use the unique key calculated
851 # from params and values
851 # from params and values
852 res = self.get_perms(user=self, cache=False, scope=_scope)
852 res = self.get_perms(user=self, cache=False, scope=_scope)
853 self._permissions_scoped_cache[cache_key] = res
853 self._permissions_scoped_cache[cache_key] = res
854 return self._permissions_scoped_cache[cache_key]
854 return self._permissions_scoped_cache[cache_key]
855
855
856 def get_instance(self):
856 def get_instance(self):
857 return User.get(self.user_id)
857 return User.get(self.user_id)
858
858
859 def update_lastactivity(self):
859 def update_lastactivity(self):
860 if self.user_id:
860 if self.user_id:
861 User.get(self.user_id).update_lastactivity()
861 User.get(self.user_id).update_lastactivity()
862
862
863 def propagate_data(self):
863 def propagate_data(self):
864 """
864 """
865 Fills in user data and propagates values to this instance. Maps fetched
865 Fills in user data and propagates values to this instance. Maps fetched
866 user attributes to this class instance attributes
866 user attributes to this class instance attributes
867 """
867 """
868 log.debug('starting data propagation for new potential AuthUser')
868 log.debug('starting data propagation for new potential AuthUser')
869 user_model = UserModel()
869 user_model = UserModel()
870 anon_user = self.anonymous_user = User.get_default_user(cache=True)
870 anon_user = self.anonymous_user = User.get_default_user(cache=True)
871 is_user_loaded = False
871 is_user_loaded = False
872
872
873 # lookup by userid
873 # lookup by userid
874 if self.user_id is not None and self.user_id != anon_user.user_id:
874 if self.user_id is not None and self.user_id != anon_user.user_id:
875 log.debug('Trying Auth User lookup by USER ID: `%s`' % self.user_id)
875 log.debug('Trying Auth User lookup by USER ID: `%s`' % self.user_id)
876 is_user_loaded = user_model.fill_data(self, user_id=self.user_id)
876 is_user_loaded = user_model.fill_data(self, user_id=self.user_id)
877
877
878 # try go get user by api key
878 # try go get user by api key
879 elif self._api_key and self._api_key != anon_user.api_key:
879 elif self._api_key and self._api_key != anon_user.api_key:
880 log.debug('Trying Auth User lookup by API KEY: `%s`' % self._api_key)
880 log.debug('Trying Auth User lookup by API KEY: `%s`' % self._api_key)
881 is_user_loaded = user_model.fill_data(self, api_key=self._api_key)
881 is_user_loaded = user_model.fill_data(self, api_key=self._api_key)
882
882
883 # lookup by username
883 # lookup by username
884 elif self.username:
884 elif self.username:
885 log.debug('Trying Auth User lookup by USER NAME: `%s`' % self.username)
885 log.debug('Trying Auth User lookup by USER NAME: `%s`' % self.username)
886 is_user_loaded = user_model.fill_data(self, username=self.username)
886 is_user_loaded = user_model.fill_data(self, username=self.username)
887 else:
887 else:
888 log.debug('No data in %s that could been used to log in' % self)
888 log.debug('No data in %s that could been used to log in' % self)
889
889
890 if not is_user_loaded:
890 if not is_user_loaded:
891 log.debug('Failed to load user. Fallback to default user')
891 log.debug('Failed to load user. Fallback to default user')
892 # if we cannot authenticate user try anonymous
892 # if we cannot authenticate user try anonymous
893 if anon_user.active:
893 if anon_user.active:
894 user_model.fill_data(self, user_id=anon_user.user_id)
894 user_model.fill_data(self, user_id=anon_user.user_id)
895 # then we set this user is logged in
895 # then we set this user is logged in
896 self.is_authenticated = True
896 self.is_authenticated = True
897 else:
897 else:
898 # in case of disabled anonymous user we reset some of the
898 # in case of disabled anonymous user we reset some of the
899 # parameters so such user is "corrupted", skipping the fill_data
899 # parameters so such user is "corrupted", skipping the fill_data
900 for attr in ['user_id', 'username', 'admin', 'active']:
900 for attr in ['user_id', 'username', 'admin', 'active']:
901 setattr(self, attr, None)
901 setattr(self, attr, None)
902 self.is_authenticated = False
902 self.is_authenticated = False
903
903
904 if not self.username:
904 if not self.username:
905 self.username = 'None'
905 self.username = 'None'
906
906
907 log.debug('Auth User is now %s' % self)
907 log.debug('Auth User is now %s' % self)
908
908
909 def get_perms(self, user, scope=None, explicit=True, algo='higherwin',
909 def get_perms(self, user, scope=None, explicit=True, algo='higherwin',
910 cache=False):
910 cache=False):
911 """
911 """
912 Fills user permission attribute with permissions taken from database
912 Fills user permission attribute with permissions taken from database
913 works for permissions given for repositories, and for permissions that
913 works for permissions given for repositories, and for permissions that
914 are granted to groups
914 are granted to groups
915
915
916 :param user: instance of User object from database
916 :param user: instance of User object from database
917 :param explicit: In case there are permissions both for user and a group
917 :param explicit: In case there are permissions both for user and a group
918 that user is part of, explicit flag will defiine if user will
918 that user is part of, explicit flag will defiine if user will
919 explicitly override permissions from group, if it's False it will
919 explicitly override permissions from group, if it's False it will
920 make decision based on the algo
920 make decision based on the algo
921 :param algo: algorithm to decide what permission should be choose if
921 :param algo: algorithm to decide what permission should be choose if
922 it's multiple defined, eg user in two different groups. It also
922 it's multiple defined, eg user in two different groups. It also
923 decides if explicit flag is turned off how to specify the permission
923 decides if explicit flag is turned off how to specify the permission
924 for case when user is in a group + have defined separate permission
924 for case when user is in a group + have defined separate permission
925 """
925 """
926 user_id = user.user_id
926 user_id = user.user_id
927 user_is_admin = user.is_admin
927 user_is_admin = user.is_admin
928
928
929 # inheritance of global permissions like create repo/fork repo etc
929 # inheritance of global permissions like create repo/fork repo etc
930 user_inherit_default_permissions = user.inherit_default_permissions
930 user_inherit_default_permissions = user.inherit_default_permissions
931
931
932 log.debug('Computing PERMISSION tree for scope %s' % (scope, ))
932 log.debug('Computing PERMISSION tree for scope %s' % (scope, ))
933 compute = caches.conditional_cache(
933 compute = caches.conditional_cache(
934 'short_term', 'cache_desc',
934 'short_term', 'cache_desc',
935 condition=cache, func=_cached_perms_data)
935 condition=cache, func=_cached_perms_data)
936 result = compute(user_id, scope, user_is_admin,
936 result = compute(user_id, scope, user_is_admin,
937 user_inherit_default_permissions, explicit, algo)
937 user_inherit_default_permissions, explicit, algo)
938
938
939 result_repr = []
939 result_repr = []
940 for k in result:
940 for k in result:
941 result_repr.append((k, len(result[k])))
941 result_repr.append((k, len(result[k])))
942
942
943 log.debug('PERMISSION tree computed %s' % (result_repr,))
943 log.debug('PERMISSION tree computed %s' % (result_repr,))
944 return result
944 return result
945
945
946 @property
946 @property
947 def is_default(self):
947 def is_default(self):
948 return self.username == User.DEFAULT_USER
948 return self.username == User.DEFAULT_USER
949
949
950 @property
950 @property
951 def is_admin(self):
951 def is_admin(self):
952 return self.admin
952 return self.admin
953
953
954 @property
954 @property
955 def is_user_object(self):
955 def is_user_object(self):
956 return self.user_id is not None
956 return self.user_id is not None
957
957
958 @property
958 @property
959 def repositories_admin(self):
959 def repositories_admin(self):
960 """
960 """
961 Returns list of repositories you're an admin of
961 Returns list of repositories you're an admin of
962 """
962 """
963 return [
963 return [
964 x[0] for x in self.permissions['repositories'].iteritems()
964 x[0] for x in self.permissions['repositories'].iteritems()
965 if x[1] == 'repository.admin']
965 if x[1] == 'repository.admin']
966
966
967 @property
967 @property
968 def repository_groups_admin(self):
968 def repository_groups_admin(self):
969 """
969 """
970 Returns list of repository groups you're an admin of
970 Returns list of repository groups you're an admin of
971 """
971 """
972 return [
972 return [
973 x[0] for x in self.permissions['repositories_groups'].iteritems()
973 x[0] for x in self.permissions['repositories_groups'].iteritems()
974 if x[1] == 'group.admin']
974 if x[1] == 'group.admin']
975
975
976 @property
976 @property
977 def user_groups_admin(self):
977 def user_groups_admin(self):
978 """
978 """
979 Returns list of user groups you're an admin of
979 Returns list of user groups you're an admin of
980 """
980 """
981 return [
981 return [
982 x[0] for x in self.permissions['user_groups'].iteritems()
982 x[0] for x in self.permissions['user_groups'].iteritems()
983 if x[1] == 'usergroup.admin']
983 if x[1] == 'usergroup.admin']
984
984
985 @property
985 @property
986 def ip_allowed(self):
986 def ip_allowed(self):
987 """
987 """
988 Checks if ip_addr used in constructor is allowed from defined list of
988 Checks if ip_addr used in constructor is allowed from defined list of
989 allowed ip_addresses for user
989 allowed ip_addresses for user
990
990
991 :returns: boolean, True if ip is in allowed ip range
991 :returns: boolean, True if ip is in allowed ip range
992 """
992 """
993 # check IP
993 # check IP
994 inherit = self.inherit_default_permissions
994 inherit = self.inherit_default_permissions
995 return AuthUser.check_ip_allowed(self.user_id, self.ip_addr,
995 return AuthUser.check_ip_allowed(self.user_id, self.ip_addr,
996 inherit_from_default=inherit)
996 inherit_from_default=inherit)
997 @property
997 @property
998 def personal_repo_group(self):
998 def personal_repo_group(self):
999 return RepoGroup.get_user_personal_repo_group(self.user_id)
999 return RepoGroup.get_user_personal_repo_group(self.user_id)
1000
1000
1001 @classmethod
1001 @classmethod
1002 def check_ip_allowed(cls, user_id, ip_addr, inherit_from_default):
1002 def check_ip_allowed(cls, user_id, ip_addr, inherit_from_default):
1003 allowed_ips = AuthUser.get_allowed_ips(
1003 allowed_ips = AuthUser.get_allowed_ips(
1004 user_id, cache=True, inherit_from_default=inherit_from_default)
1004 user_id, cache=True, inherit_from_default=inherit_from_default)
1005 if check_ip_access(source_ip=ip_addr, allowed_ips=allowed_ips):
1005 if check_ip_access(source_ip=ip_addr, allowed_ips=allowed_ips):
1006 log.debug('IP:%s is in range of %s' % (ip_addr, allowed_ips))
1006 log.debug('IP:%s is in range of %s' % (ip_addr, allowed_ips))
1007 return True
1007 return True
1008 else:
1008 else:
1009 log.info('Access for IP:%s forbidden, '
1009 log.info('Access for IP:%s forbidden, '
1010 'not in %s' % (ip_addr, allowed_ips))
1010 'not in %s' % (ip_addr, allowed_ips))
1011 return False
1011 return False
1012
1012
1013 def __repr__(self):
1013 def __repr__(self):
1014 return "<AuthUser('id:%s[%s] ip:%s auth:%s')>"\
1014 return "<AuthUser('id:%s[%s] ip:%s auth:%s')>"\
1015 % (self.user_id, self.username, self.ip_addr, self.is_authenticated)
1015 % (self.user_id, self.username, self.ip_addr, self.is_authenticated)
1016
1016
1017 def set_authenticated(self, authenticated=True):
1017 def set_authenticated(self, authenticated=True):
1018 if self.user_id != self.anonymous_user.user_id:
1018 if self.user_id != self.anonymous_user.user_id:
1019 self.is_authenticated = authenticated
1019 self.is_authenticated = authenticated
1020
1020
1021 def get_cookie_store(self):
1021 def get_cookie_store(self):
1022 return {
1022 return {
1023 'username': self.username,
1023 'username': self.username,
1024 'password': md5(self.password),
1024 'password': md5(self.password),
1025 'user_id': self.user_id,
1025 'user_id': self.user_id,
1026 'is_authenticated': self.is_authenticated
1026 'is_authenticated': self.is_authenticated
1027 }
1027 }
1028
1028
1029 @classmethod
1029 @classmethod
1030 def from_cookie_store(cls, cookie_store):
1030 def from_cookie_store(cls, cookie_store):
1031 """
1031 """
1032 Creates AuthUser from a cookie store
1032 Creates AuthUser from a cookie store
1033
1033
1034 :param cls:
1034 :param cls:
1035 :param cookie_store:
1035 :param cookie_store:
1036 """
1036 """
1037 user_id = cookie_store.get('user_id')
1037 user_id = cookie_store.get('user_id')
1038 username = cookie_store.get('username')
1038 username = cookie_store.get('username')
1039 api_key = cookie_store.get('api_key')
1039 api_key = cookie_store.get('api_key')
1040 return AuthUser(user_id, api_key, username)
1040 return AuthUser(user_id, api_key, username)
1041
1041
1042 @classmethod
1042 @classmethod
1043 def get_allowed_ips(cls, user_id, cache=False, inherit_from_default=False):
1043 def get_allowed_ips(cls, user_id, cache=False, inherit_from_default=False):
1044 _set = set()
1044 _set = set()
1045
1045
1046 if inherit_from_default:
1046 if inherit_from_default:
1047 default_ips = UserIpMap.query().filter(
1047 default_ips = UserIpMap.query().filter(
1048 UserIpMap.user == User.get_default_user(cache=True))
1048 UserIpMap.user == User.get_default_user(cache=True))
1049 if cache:
1049 if cache:
1050 default_ips = default_ips.options(
1050 default_ips = default_ips.options(
1051 FromCache("sql_cache_short", "get_user_ips_default"))
1051 FromCache("sql_cache_short", "get_user_ips_default"))
1052
1052
1053 # populate from default user
1053 # populate from default user
1054 for ip in default_ips:
1054 for ip in default_ips:
1055 try:
1055 try:
1056 _set.add(ip.ip_addr)
1056 _set.add(ip.ip_addr)
1057 except ObjectDeletedError:
1057 except ObjectDeletedError:
1058 # since we use heavy caching sometimes it happens that
1058 # since we use heavy caching sometimes it happens that
1059 # we get deleted objects here, we just skip them
1059 # we get deleted objects here, we just skip them
1060 pass
1060 pass
1061
1061
1062 user_ips = UserIpMap.query().filter(UserIpMap.user_id == user_id)
1062 user_ips = UserIpMap.query().filter(UserIpMap.user_id == user_id)
1063 if cache:
1063 if cache:
1064 user_ips = user_ips.options(
1064 user_ips = user_ips.options(
1065 FromCache("sql_cache_short", "get_user_ips_%s" % user_id))
1065 FromCache("sql_cache_short", "get_user_ips_%s" % user_id))
1066
1066
1067 for ip in user_ips:
1067 for ip in user_ips:
1068 try:
1068 try:
1069 _set.add(ip.ip_addr)
1069 _set.add(ip.ip_addr)
1070 except ObjectDeletedError:
1070 except ObjectDeletedError:
1071 # since we use heavy caching sometimes it happens that we get
1071 # since we use heavy caching sometimes it happens that we get
1072 # deleted objects here, we just skip them
1072 # deleted objects here, we just skip them
1073 pass
1073 pass
1074 return _set or set(['0.0.0.0/0', '::/0'])
1074 return _set or set(['0.0.0.0/0', '::/0'])
1075
1075
1076
1076
1077 def set_available_permissions(config):
1077 def set_available_permissions(config):
1078 """
1078 """
1079 This function will propagate pylons globals with all available defined
1079 This function will propagate pylons globals with all available defined
1080 permission given in db. We don't want to check each time from db for new
1080 permission given in db. We don't want to check each time from db for new
1081 permissions since adding a new permission also requires application restart
1081 permissions since adding a new permission also requires application restart
1082 ie. to decorate new views with the newly created permission
1082 ie. to decorate new views with the newly created permission
1083
1083
1084 :param config: current pylons config instance
1084 :param config: current pylons config instance
1085
1085
1086 """
1086 """
1087 log.info('getting information about all available permissions')
1087 log.info('getting information about all available permissions')
1088 try:
1088 try:
1089 sa = meta.Session
1089 sa = meta.Session
1090 all_perms = sa.query(Permission).all()
1090 all_perms = sa.query(Permission).all()
1091 config['available_permissions'] = [x.permission_name for x in all_perms]
1091 config['available_permissions'] = [x.permission_name for x in all_perms]
1092 except Exception:
1092 except Exception:
1093 log.error(traceback.format_exc())
1093 log.error(traceback.format_exc())
1094 finally:
1094 finally:
1095 meta.Session.remove()
1095 meta.Session.remove()
1096
1096
1097
1097
1098 def get_csrf_token(session=None, force_new=False, save_if_missing=True):
1098 def get_csrf_token(session=None, force_new=False, save_if_missing=True):
1099 """
1099 """
1100 Return the current authentication token, creating one if one doesn't
1100 Return the current authentication token, creating one if one doesn't
1101 already exist and the save_if_missing flag is present.
1101 already exist and the save_if_missing flag is present.
1102
1102
1103 :param session: pass in the pylons session, else we use the global ones
1103 :param session: pass in the pylons session, else we use the global ones
1104 :param force_new: force to re-generate the token and store it in session
1104 :param force_new: force to re-generate the token and store it in session
1105 :param save_if_missing: save the newly generated token if it's missing in
1105 :param save_if_missing: save the newly generated token if it's missing in
1106 session
1106 session
1107 """
1107 """
1108 if not session:
1108 if not session:
1109 from pylons import session
1109 from pylons import session
1110
1110
1111 if (csrf_token_key not in session and save_if_missing) or force_new:
1111 if (csrf_token_key not in session and save_if_missing) or force_new:
1112 token = hashlib.sha1(str(random.getrandbits(128))).hexdigest()
1112 token = hashlib.sha1(str(random.getrandbits(128))).hexdigest()
1113 session[csrf_token_key] = token
1113 session[csrf_token_key] = token
1114 if hasattr(session, 'save'):
1114 if hasattr(session, 'save'):
1115 session.save()
1115 session.save()
1116 return session.get(csrf_token_key)
1116 return session.get(csrf_token_key)
1117
1117
1118
1118
1119 def get_request(perm_class):
1119 def get_request(perm_class):
1120 from pyramid.threadlocal import get_current_request
1120 from pyramid.threadlocal import get_current_request
1121 pyramid_request = get_current_request()
1121 pyramid_request = get_current_request()
1122 if not pyramid_request:
1122 if not pyramid_request:
1123 # return global request of pylons in case pyramid isn't available
1123 # return global request of pylons in case pyramid isn't available
1124 # NOTE(marcink): this should be removed after migration to pyramid
1124 # NOTE(marcink): this should be removed after migration to pyramid
1125 from pylons import request
1125 from pylons import request
1126 return request
1126 return request
1127 return pyramid_request
1127 return pyramid_request
1128
1128
1129
1129
1130 # CHECK DECORATORS
1130 # CHECK DECORATORS
1131 class CSRFRequired(object):
1131 class CSRFRequired(object):
1132 """
1132 """
1133 Decorator for authenticating a form
1133 Decorator for authenticating a form
1134
1134
1135 This decorator uses an authorization token stored in the client's
1135 This decorator uses an authorization token stored in the client's
1136 session for prevention of certain Cross-site request forgery (CSRF)
1136 session for prevention of certain Cross-site request forgery (CSRF)
1137 attacks (See
1137 attacks (See
1138 http://en.wikipedia.org/wiki/Cross-site_request_forgery for more
1138 http://en.wikipedia.org/wiki/Cross-site_request_forgery for more
1139 information).
1139 information).
1140
1140
1141 For use with the ``webhelpers.secure_form`` helper functions.
1141 For use with the ``webhelpers.secure_form`` helper functions.
1142
1142
1143 """
1143 """
1144 def __init__(self, token=csrf_token_key, header='X-CSRF-Token',
1144 def __init__(self, token=csrf_token_key, header='X-CSRF-Token',
1145 except_methods=None):
1145 except_methods=None):
1146 self.token = token
1146 self.token = token
1147 self.header = header
1147 self.header = header
1148 self.except_methods = except_methods or []
1148 self.except_methods = except_methods or []
1149
1149
1150 def __call__(self, func):
1150 def __call__(self, func):
1151 return get_cython_compat_decorator(self.__wrapper, func)
1151 return get_cython_compat_decorator(self.__wrapper, func)
1152
1152
1153 def _get_csrf(self, _request):
1153 def _get_csrf(self, _request):
1154 return _request.POST.get(self.token, _request.headers.get(self.header))
1154 return _request.POST.get(self.token, _request.headers.get(self.header))
1155
1155
1156 def check_csrf(self, _request, cur_token):
1156 def check_csrf(self, _request, cur_token):
1157 supplied_token = self._get_csrf(_request)
1157 supplied_token = self._get_csrf(_request)
1158 return supplied_token and supplied_token == cur_token
1158 return supplied_token and supplied_token == cur_token
1159
1159
1160 def _get_request(self):
1160 def _get_request(self):
1161 return get_request(self)
1161 return get_request(self)
1162
1162
1163 def __wrapper(self, func, *fargs, **fkwargs):
1163 def __wrapper(self, func, *fargs, **fkwargs):
1164 request = self._get_request()
1164 request = self._get_request()
1165
1165
1166 if request.method in self.except_methods:
1166 if request.method in self.except_methods:
1167 return func(*fargs, **fkwargs)
1167 return func(*fargs, **fkwargs)
1168
1168
1169 cur_token = get_csrf_token(save_if_missing=False)
1169 cur_token = get_csrf_token(save_if_missing=False)
1170 if self.check_csrf(request, cur_token):
1170 if self.check_csrf(request, cur_token):
1171 if request.POST.get(self.token):
1171 if request.POST.get(self.token):
1172 del request.POST[self.token]
1172 del request.POST[self.token]
1173 return func(*fargs, **fkwargs)
1173 return func(*fargs, **fkwargs)
1174 else:
1174 else:
1175 reason = 'token-missing'
1175 reason = 'token-missing'
1176 supplied_token = self._get_csrf(request)
1176 supplied_token = self._get_csrf(request)
1177 if supplied_token and cur_token != supplied_token:
1177 if supplied_token and cur_token != supplied_token:
1178 reason = 'token-mismatch [%s:%s]' % (
1178 reason = 'token-mismatch [%s:%s]' % (
1179 cur_token or ''[:6], supplied_token or ''[:6])
1179 cur_token or ''[:6], supplied_token or ''[:6])
1180
1180
1181 csrf_message = \
1181 csrf_message = \
1182 ("Cross-site request forgery detected, request denied. See "
1182 ("Cross-site request forgery detected, request denied. See "
1183 "http://en.wikipedia.org/wiki/Cross-site_request_forgery for "
1183 "http://en.wikipedia.org/wiki/Cross-site_request_forgery for "
1184 "more information.")
1184 "more information.")
1185 log.warn('Cross-site request forgery detected, request %r DENIED: %s '
1185 log.warn('Cross-site request forgery detected, request %r DENIED: %s '
1186 'REMOTE_ADDR:%s, HEADERS:%s' % (
1186 'REMOTE_ADDR:%s, HEADERS:%s' % (
1187 request, reason, request.remote_addr, request.headers))
1187 request, reason, request.remote_addr, request.headers))
1188
1188
1189 raise HTTPForbidden(explanation=csrf_message)
1189 raise HTTPForbidden(explanation=csrf_message)
1190
1190
1191
1191
1192 class LoginRequired(object):
1192 class LoginRequired(object):
1193 """
1193 """
1194 Must be logged in to execute this function else
1194 Must be logged in to execute this function else
1195 redirect to login page
1195 redirect to login page
1196
1196
1197 :param api_access: if enabled this checks only for valid auth token
1197 :param api_access: if enabled this checks only for valid auth token
1198 and grants access based on valid token
1198 and grants access based on valid token
1199 """
1199 """
1200 def __init__(self, auth_token_access=None):
1200 def __init__(self, auth_token_access=None):
1201 self.auth_token_access = auth_token_access
1201 self.auth_token_access = auth_token_access
1202
1202
1203 def __call__(self, func):
1203 def __call__(self, func):
1204 return get_cython_compat_decorator(self.__wrapper, func)
1204 return get_cython_compat_decorator(self.__wrapper, func)
1205
1205
1206 def _get_request(self):
1206 def _get_request(self):
1207 return get_request(self)
1207 return get_request(self)
1208
1208
1209 def __wrapper(self, func, *fargs, **fkwargs):
1209 def __wrapper(self, func, *fargs, **fkwargs):
1210 from rhodecode.lib import helpers as h
1210 from rhodecode.lib import helpers as h
1211 cls = fargs[0]
1211 cls = fargs[0]
1212 user = cls._rhodecode_user
1212 user = cls._rhodecode_user
1213 request = self._get_request()
1213 request = self._get_request()
1214
1214
1215 loc = "%s:%s" % (cls.__class__.__name__, func.__name__)
1215 loc = "%s:%s" % (cls.__class__.__name__, func.__name__)
1216 log.debug('Starting login restriction checks for user: %s' % (user,))
1216 log.debug('Starting login restriction checks for user: %s' % (user,))
1217 # check if our IP is allowed
1217 # check if our IP is allowed
1218 ip_access_valid = True
1218 ip_access_valid = True
1219 if not user.ip_allowed:
1219 if not user.ip_allowed:
1220 h.flash(h.literal(_('IP %s not allowed' % (user.ip_addr,))),
1220 h.flash(h.literal(_('IP %s not allowed' % (user.ip_addr,))),
1221 category='warning')
1221 category='warning')
1222 ip_access_valid = False
1222 ip_access_valid = False
1223
1223
1224 # check if we used an APIKEY and it's a valid one
1224 # check if we used an APIKEY and it's a valid one
1225 # defined white-list of controllers which API access will be enabled
1225 # defined white-list of controllers which API access will be enabled
1226 _auth_token = request.GET.get(
1226 _auth_token = request.GET.get(
1227 'auth_token', '') or request.GET.get('api_key', '')
1227 'auth_token', '') or request.GET.get('api_key', '')
1228 auth_token_access_valid = allowed_auth_token_access(
1228 auth_token_access_valid = allowed_auth_token_access(
1229 loc, auth_token=_auth_token)
1229 loc, auth_token=_auth_token)
1230
1230
1231 # explicit controller is enabled or API is in our whitelist
1231 # explicit controller is enabled or API is in our whitelist
1232 if self.auth_token_access or auth_token_access_valid:
1232 if self.auth_token_access or auth_token_access_valid:
1233 log.debug('Checking AUTH TOKEN access for %s' % (cls,))
1233 log.debug('Checking AUTH TOKEN access for %s' % (cls,))
1234 db_user = user.get_instance()
1234 db_user = user.get_instance()
1235
1235
1236 if db_user:
1236 if db_user:
1237 if self.auth_token_access:
1237 if self.auth_token_access:
1238 roles = self.auth_token_access
1238 roles = self.auth_token_access
1239 else:
1239 else:
1240 roles = [UserApiKeys.ROLE_HTTP]
1240 roles = [UserApiKeys.ROLE_HTTP]
1241 token_match = db_user.authenticate_by_token(
1241 token_match = db_user.authenticate_by_token(
1242 _auth_token, roles=roles)
1242 _auth_token, roles=roles)
1243 else:
1243 else:
1244 log.debug('Unable to fetch db instance for auth user: %s', user)
1244 log.debug('Unable to fetch db instance for auth user: %s', user)
1245 token_match = False
1245 token_match = False
1246
1246
1247 if _auth_token and token_match:
1247 if _auth_token and token_match:
1248 auth_token_access_valid = True
1248 auth_token_access_valid = True
1249 log.debug('AUTH TOKEN ****%s is VALID' % (_auth_token[-4:],))
1249 log.debug('AUTH TOKEN ****%s is VALID' % (_auth_token[-4:],))
1250 else:
1250 else:
1251 auth_token_access_valid = False
1251 auth_token_access_valid = False
1252 if not _auth_token:
1252 if not _auth_token:
1253 log.debug("AUTH TOKEN *NOT* present in request")
1253 log.debug("AUTH TOKEN *NOT* present in request")
1254 else:
1254 else:
1255 log.warning(
1255 log.warning(
1256 "AUTH TOKEN ****%s *NOT* valid" % _auth_token[-4:])
1256 "AUTH TOKEN ****%s *NOT* valid" % _auth_token[-4:])
1257
1257
1258 log.debug('Checking if %s is authenticated @ %s' % (user.username, loc))
1258 log.debug('Checking if %s is authenticated @ %s' % (user.username, loc))
1259 reason = 'RHODECODE_AUTH' if user.is_authenticated \
1259 reason = 'RHODECODE_AUTH' if user.is_authenticated \
1260 else 'AUTH_TOKEN_AUTH'
1260 else 'AUTH_TOKEN_AUTH'
1261
1261
1262 if ip_access_valid and (
1262 if ip_access_valid and (
1263 user.is_authenticated or auth_token_access_valid):
1263 user.is_authenticated or auth_token_access_valid):
1264 log.info(
1264 log.info(
1265 'user %s authenticating with:%s IS authenticated on func %s'
1265 'user %s authenticating with:%s IS authenticated on func %s'
1266 % (user, reason, loc))
1266 % (user, reason, loc))
1267
1267
1268 # update user data to check last activity
1268 # update user data to check last activity
1269 user.update_lastactivity()
1269 user.update_lastactivity()
1270 Session().commit()
1270 Session().commit()
1271 return func(*fargs, **fkwargs)
1271 return func(*fargs, **fkwargs)
1272 else:
1272 else:
1273 log.warning(
1273 log.warning(
1274 'user %s authenticating with:%s NOT authenticated on '
1274 'user %s authenticating with:%s NOT authenticated on '
1275 'func: %s: IP_ACCESS:%s AUTH_TOKEN_ACCESS:%s'
1275 'func: %s: IP_ACCESS:%s AUTH_TOKEN_ACCESS:%s'
1276 % (user, reason, loc, ip_access_valid,
1276 % (user, reason, loc, ip_access_valid,
1277 auth_token_access_valid))
1277 auth_token_access_valid))
1278 # we preserve the get PARAM
1278 # we preserve the get PARAM
1279 came_from = request.path_qs
1279 came_from = request.path_qs
1280 log.debug('redirecting to login page with %s' % (came_from,))
1280 log.debug('redirecting to login page with %s' % (came_from,))
1281 raise HTTPFound(
1281 raise HTTPFound(
1282 h.route_path('login', _query={'came_from': came_from}))
1282 h.route_path('login', _query={'came_from': came_from}))
1283
1283
1284
1284
1285 class NotAnonymous(object):
1285 class NotAnonymous(object):
1286 """
1286 """
1287 Must be logged in to execute this function else
1287 Must be logged in to execute this function else
1288 redirect to login page
1288 redirect to login page
1289 """
1289 """
1290
1290
1291 def __call__(self, func):
1291 def __call__(self, func):
1292 return get_cython_compat_decorator(self.__wrapper, func)
1292 return get_cython_compat_decorator(self.__wrapper, func)
1293
1293
1294 def _get_request(self):
1294 def _get_request(self):
1295 return get_request(self)
1295 return get_request(self)
1296
1296
1297 def __wrapper(self, func, *fargs, **fkwargs):
1297 def __wrapper(self, func, *fargs, **fkwargs):
1298 import rhodecode.lib.helpers as h
1298 import rhodecode.lib.helpers as h
1299 cls = fargs[0]
1299 cls = fargs[0]
1300 self.user = cls._rhodecode_user
1300 self.user = cls._rhodecode_user
1301 request = self._get_request()
1301 request = self._get_request()
1302
1302
1303 log.debug('Checking if user is not anonymous @%s' % cls)
1303 log.debug('Checking if user is not anonymous @%s' % cls)
1304
1304
1305 anonymous = self.user.username == User.DEFAULT_USER
1305 anonymous = self.user.username == User.DEFAULT_USER
1306
1306
1307 if anonymous:
1307 if anonymous:
1308 came_from = request.path_qs
1308 came_from = request.path_qs
1309 h.flash(_('You need to be a registered user to '
1309 h.flash(_('You need to be a registered user to '
1310 'perform this action'),
1310 'perform this action'),
1311 category='warning')
1311 category='warning')
1312 raise HTTPFound(
1312 raise HTTPFound(
1313 h.route_path('login', _query={'came_from': came_from}))
1313 h.route_path('login', _query={'came_from': came_from}))
1314 else:
1314 else:
1315 return func(*fargs, **fkwargs)
1315 return func(*fargs, **fkwargs)
1316
1316
1317
1317
1318 class XHRRequired(object):
1318 class XHRRequired(object):
1319 # TODO(marcink): remove this in favor of the predicates in pyramid routes
1319 # TODO(marcink): remove this in favor of the predicates in pyramid routes
1320
1320
1321 def __call__(self, func):
1321 def __call__(self, func):
1322 return get_cython_compat_decorator(self.__wrapper, func)
1322 return get_cython_compat_decorator(self.__wrapper, func)
1323
1323
1324 def _get_request(self):
1324 def _get_request(self):
1325 return get_request(self)
1325 return get_request(self)
1326
1326
1327 def __wrapper(self, func, *fargs, **fkwargs):
1327 def __wrapper(self, func, *fargs, **fkwargs):
1328 from pylons.controllers.util import abort
1328 from pylons.controllers.util import abort
1329 request = self._get_request()
1329 request = self._get_request()
1330
1330
1331 log.debug('Checking if request is XMLHttpRequest (XHR)')
1331 log.debug('Checking if request is XMLHttpRequest (XHR)')
1332 xhr_message = 'This is not a valid XMLHttpRequest (XHR) request'
1332 xhr_message = 'This is not a valid XMLHttpRequest (XHR) request'
1333
1333
1334 if not request.is_xhr:
1334 if not request.is_xhr:
1335 abort(400, detail=xhr_message)
1335 abort(400, detail=xhr_message)
1336
1336
1337 return func(*fargs, **fkwargs)
1337 return func(*fargs, **fkwargs)
1338
1338
1339
1339
1340 class HasAcceptedRepoType(object):
1340 class HasAcceptedRepoType(object):
1341 """
1341 """
1342 Check if requested repo is within given repo type aliases
1342 Check if requested repo is within given repo type aliases
1343 """
1343 """
1344
1344
1345 # TODO(marcink): remove this in favor of the predicates in pyramid routes
1345 # TODO(marcink): remove this in favor of the predicates in pyramid routes
1346
1346
1347 def __init__(self, *repo_type_list):
1347 def __init__(self, *repo_type_list):
1348 self.repo_type_list = set(repo_type_list)
1348 self.repo_type_list = set(repo_type_list)
1349
1349
1350 def __call__(self, func):
1350 def __call__(self, func):
1351 return get_cython_compat_decorator(self.__wrapper, func)
1351 return get_cython_compat_decorator(self.__wrapper, func)
1352
1352
1353 def __wrapper(self, func, *fargs, **fkwargs):
1353 def __wrapper(self, func, *fargs, **fkwargs):
1354 import rhodecode.lib.helpers as h
1354 import rhodecode.lib.helpers as h
1355 cls = fargs[0]
1355 cls = fargs[0]
1356 rhodecode_repo = cls.rhodecode_repo
1356 rhodecode_repo = cls.rhodecode_repo
1357
1357
1358 log.debug('%s checking repo type for %s in %s',
1358 log.debug('%s checking repo type for %s in %s',
1359 self.__class__.__name__,
1359 self.__class__.__name__,
1360 rhodecode_repo.alias, self.repo_type_list)
1360 rhodecode_repo.alias, self.repo_type_list)
1361
1361
1362 if rhodecode_repo.alias in self.repo_type_list:
1362 if rhodecode_repo.alias in self.repo_type_list:
1363 return func(*fargs, **fkwargs)
1363 return func(*fargs, **fkwargs)
1364 else:
1364 else:
1365 h.flash(h.literal(
1365 h.flash(h.literal(
1366 _('Action not supported for %s.' % rhodecode_repo.alias)),
1366 _('Action not supported for %s.' % rhodecode_repo.alias)),
1367 category='warning')
1367 category='warning')
1368 raise HTTPFound(
1368 raise HTTPFound(
1369 h.route_path('repo_summary',
1369 h.route_path('repo_summary',
1370 repo_name=cls.rhodecode_db_repo.repo_name))
1370 repo_name=cls.rhodecode_db_repo.repo_name))
1371
1371
1372
1372
1373 class PermsDecorator(object):
1373 class PermsDecorator(object):
1374 """
1374 """
1375 Base class for controller decorators, we extract the current user from
1375 Base class for controller decorators, we extract the current user from
1376 the class itself, which has it stored in base controllers
1376 the class itself, which has it stored in base controllers
1377 """
1377 """
1378
1378
1379 def __init__(self, *required_perms):
1379 def __init__(self, *required_perms):
1380 self.required_perms = set(required_perms)
1380 self.required_perms = set(required_perms)
1381
1381
1382 def __call__(self, func):
1382 def __call__(self, func):
1383 return get_cython_compat_decorator(self.__wrapper, func)
1383 return get_cython_compat_decorator(self.__wrapper, func)
1384
1384
1385 def _get_request(self):
1385 def _get_request(self):
1386 return get_request(self)
1386 return get_request(self)
1387
1387
1388 def _get_came_from(self):
1388 def _get_came_from(self):
1389 _request = self._get_request()
1389 _request = self._get_request()
1390
1390
1391 # both pylons/pyramid has this attribute
1391 # both pylons/pyramid has this attribute
1392 return _request.path_qs
1392 return _request.path_qs
1393
1393
1394 def __wrapper(self, func, *fargs, **fkwargs):
1394 def __wrapper(self, func, *fargs, **fkwargs):
1395 import rhodecode.lib.helpers as h
1395 import rhodecode.lib.helpers as h
1396 cls = fargs[0]
1396 cls = fargs[0]
1397 _user = cls._rhodecode_user
1397 _user = cls._rhodecode_user
1398
1398
1399 log.debug('checking %s permissions %s for %s %s',
1399 log.debug('checking %s permissions %s for %s %s',
1400 self.__class__.__name__, self.required_perms, cls, _user)
1400 self.__class__.__name__, self.required_perms, cls, _user)
1401
1401
1402 if self.check_permissions(_user):
1402 if self.check_permissions(_user):
1403 log.debug('Permission granted for %s %s', cls, _user)
1403 log.debug('Permission granted for %s %s', cls, _user)
1404 return func(*fargs, **fkwargs)
1404 return func(*fargs, **fkwargs)
1405
1405
1406 else:
1406 else:
1407 log.debug('Permission denied for %s %s', cls, _user)
1407 log.debug('Permission denied for %s %s', cls, _user)
1408 anonymous = _user.username == User.DEFAULT_USER
1408 anonymous = _user.username == User.DEFAULT_USER
1409
1409
1410 if anonymous:
1410 if anonymous:
1411 came_from = self._get_came_from()
1411 came_from = self._get_came_from()
1412 h.flash(_('You need to be signed in to view this page'),
1412 h.flash(_('You need to be signed in to view this page'),
1413 category='warning')
1413 category='warning')
1414 raise HTTPFound(
1414 raise HTTPFound(
1415 h.route_path('login', _query={'came_from': came_from}))
1415 h.route_path('login', _query={'came_from': came_from}))
1416
1416
1417 else:
1417 else:
1418 # redirect with forbidden ret code
1418 # redirect with 404 to prevent resource discovery
1419 raise HTTPForbidden()
1419 raise HTTPNotFound()
1420
1420
1421 def check_permissions(self, user):
1421 def check_permissions(self, user):
1422 """Dummy function for overriding"""
1422 """Dummy function for overriding"""
1423 raise NotImplementedError(
1423 raise NotImplementedError(
1424 'You have to write this function in child class')
1424 'You have to write this function in child class')
1425
1425
1426
1426
1427 class HasPermissionAllDecorator(PermsDecorator):
1427 class HasPermissionAllDecorator(PermsDecorator):
1428 """
1428 """
1429 Checks for access permission for all given predicates. All of them
1429 Checks for access permission for all given predicates. All of them
1430 have to be meet in order to fulfill the request
1430 have to be meet in order to fulfill the request
1431 """
1431 """
1432
1432
1433 def check_permissions(self, user):
1433 def check_permissions(self, user):
1434 perms = user.permissions_with_scope({})
1434 perms = user.permissions_with_scope({})
1435 if self.required_perms.issubset(perms['global']):
1435 if self.required_perms.issubset(perms['global']):
1436 return True
1436 return True
1437 return False
1437 return False
1438
1438
1439
1439
1440 class HasPermissionAnyDecorator(PermsDecorator):
1440 class HasPermissionAnyDecorator(PermsDecorator):
1441 """
1441 """
1442 Checks for access permission for any of given predicates. In order to
1442 Checks for access permission for any of given predicates. In order to
1443 fulfill the request any of predicates must be meet
1443 fulfill the request any of predicates must be meet
1444 """
1444 """
1445
1445
1446 def check_permissions(self, user):
1446 def check_permissions(self, user):
1447 perms = user.permissions_with_scope({})
1447 perms = user.permissions_with_scope({})
1448 if self.required_perms.intersection(perms['global']):
1448 if self.required_perms.intersection(perms['global']):
1449 return True
1449 return True
1450 return False
1450 return False
1451
1451
1452
1452
1453 class HasRepoPermissionAllDecorator(PermsDecorator):
1453 class HasRepoPermissionAllDecorator(PermsDecorator):
1454 """
1454 """
1455 Checks for access permission for all given predicates for specific
1455 Checks for access permission for all given predicates for specific
1456 repository. All of them have to be meet in order to fulfill the request
1456 repository. All of them have to be meet in order to fulfill the request
1457 """
1457 """
1458 def _get_repo_name(self):
1458 def _get_repo_name(self):
1459 _request = self._get_request()
1459 _request = self._get_request()
1460 return get_repo_slug(_request)
1460 return get_repo_slug(_request)
1461
1461
1462 def check_permissions(self, user):
1462 def check_permissions(self, user):
1463 perms = user.permissions
1463 perms = user.permissions
1464 repo_name = self._get_repo_name()
1464 repo_name = self._get_repo_name()
1465
1465
1466 try:
1466 try:
1467 user_perms = set([perms['repositories'][repo_name]])
1467 user_perms = set([perms['repositories'][repo_name]])
1468 except KeyError:
1468 except KeyError:
1469 log.debug('cannot locate repo with name: `%s` in permissions defs',
1469 log.debug('cannot locate repo with name: `%s` in permissions defs',
1470 repo_name)
1470 repo_name)
1471 return False
1471 return False
1472
1472
1473 log.debug('checking `%s` permissions for repo `%s`',
1473 log.debug('checking `%s` permissions for repo `%s`',
1474 user_perms, repo_name)
1474 user_perms, repo_name)
1475 if self.required_perms.issubset(user_perms):
1475 if self.required_perms.issubset(user_perms):
1476 return True
1476 return True
1477 return False
1477 return False
1478
1478
1479
1479
1480 class HasRepoPermissionAnyDecorator(PermsDecorator):
1480 class HasRepoPermissionAnyDecorator(PermsDecorator):
1481 """
1481 """
1482 Checks for access permission for any of given predicates for specific
1482 Checks for access permission for any of given predicates for specific
1483 repository. In order to fulfill the request any of predicates must be meet
1483 repository. In order to fulfill the request any of predicates must be meet
1484 """
1484 """
1485 def _get_repo_name(self):
1485 def _get_repo_name(self):
1486 _request = self._get_request()
1486 _request = self._get_request()
1487 return get_repo_slug(_request)
1487 return get_repo_slug(_request)
1488
1488
1489 def check_permissions(self, user):
1489 def check_permissions(self, user):
1490 perms = user.permissions
1490 perms = user.permissions
1491 repo_name = self._get_repo_name()
1491 repo_name = self._get_repo_name()
1492
1492
1493 try:
1493 try:
1494 user_perms = set([perms['repositories'][repo_name]])
1494 user_perms = set([perms['repositories'][repo_name]])
1495 except KeyError:
1495 except KeyError:
1496 log.debug('cannot locate repo with name: `%s` in permissions defs',
1496 log.debug('cannot locate repo with name: `%s` in permissions defs',
1497 repo_name)
1497 repo_name)
1498 return False
1498 return False
1499
1499
1500 log.debug('checking `%s` permissions for repo `%s`',
1500 log.debug('checking `%s` permissions for repo `%s`',
1501 user_perms, repo_name)
1501 user_perms, repo_name)
1502 if self.required_perms.intersection(user_perms):
1502 if self.required_perms.intersection(user_perms):
1503 return True
1503 return True
1504 return False
1504 return False
1505
1505
1506
1506
1507 class HasRepoGroupPermissionAllDecorator(PermsDecorator):
1507 class HasRepoGroupPermissionAllDecorator(PermsDecorator):
1508 """
1508 """
1509 Checks for access permission for all given predicates for specific
1509 Checks for access permission for all given predicates for specific
1510 repository group. All of them have to be meet in order to
1510 repository group. All of them have to be meet in order to
1511 fulfill the request
1511 fulfill the request
1512 """
1512 """
1513 def _get_repo_group_name(self):
1513 def _get_repo_group_name(self):
1514 _request = self._get_request()
1514 _request = self._get_request()
1515 return get_repo_group_slug(_request)
1515 return get_repo_group_slug(_request)
1516
1516
1517 def check_permissions(self, user):
1517 def check_permissions(self, user):
1518 perms = user.permissions
1518 perms = user.permissions
1519 group_name = self._get_repo_group_name()
1519 group_name = self._get_repo_group_name()
1520 try:
1520 try:
1521 user_perms = set([perms['repositories_groups'][group_name]])
1521 user_perms = set([perms['repositories_groups'][group_name]])
1522 except KeyError:
1522 except KeyError:
1523 log.debug('cannot locate repo group with name: `%s` in permissions defs',
1523 log.debug('cannot locate repo group with name: `%s` in permissions defs',
1524 group_name)
1524 group_name)
1525 return False
1525 return False
1526
1526
1527 log.debug('checking `%s` permissions for repo group `%s`',
1527 log.debug('checking `%s` permissions for repo group `%s`',
1528 user_perms, group_name)
1528 user_perms, group_name)
1529 if self.required_perms.issubset(user_perms):
1529 if self.required_perms.issubset(user_perms):
1530 return True
1530 return True
1531 return False
1531 return False
1532
1532
1533
1533
1534 class HasRepoGroupPermissionAnyDecorator(PermsDecorator):
1534 class HasRepoGroupPermissionAnyDecorator(PermsDecorator):
1535 """
1535 """
1536 Checks for access permission for any of given predicates for specific
1536 Checks for access permission for any of given predicates for specific
1537 repository group. In order to fulfill the request any
1537 repository group. In order to fulfill the request any
1538 of predicates must be met
1538 of predicates must be met
1539 """
1539 """
1540 def _get_repo_group_name(self):
1540 def _get_repo_group_name(self):
1541 _request = self._get_request()
1541 _request = self._get_request()
1542 return get_repo_group_slug(_request)
1542 return get_repo_group_slug(_request)
1543
1543
1544 def check_permissions(self, user):
1544 def check_permissions(self, user):
1545 perms = user.permissions
1545 perms = user.permissions
1546 group_name = self._get_repo_group_name()
1546 group_name = self._get_repo_group_name()
1547
1547
1548 try:
1548 try:
1549 user_perms = set([perms['repositories_groups'][group_name]])
1549 user_perms = set([perms['repositories_groups'][group_name]])
1550 except KeyError:
1550 except KeyError:
1551 log.debug('cannot locate repo group with name: `%s` in permissions defs',
1551 log.debug('cannot locate repo group with name: `%s` in permissions defs',
1552 group_name)
1552 group_name)
1553 return False
1553 return False
1554
1554
1555 log.debug('checking `%s` permissions for repo group `%s`',
1555 log.debug('checking `%s` permissions for repo group `%s`',
1556 user_perms, group_name)
1556 user_perms, group_name)
1557 if self.required_perms.intersection(user_perms):
1557 if self.required_perms.intersection(user_perms):
1558 return True
1558 return True
1559 return False
1559 return False
1560
1560
1561
1561
1562 class HasUserGroupPermissionAllDecorator(PermsDecorator):
1562 class HasUserGroupPermissionAllDecorator(PermsDecorator):
1563 """
1563 """
1564 Checks for access permission for all given predicates for specific
1564 Checks for access permission for all given predicates for specific
1565 user group. All of them have to be meet in order to fulfill the request
1565 user group. All of them have to be meet in order to fulfill the request
1566 """
1566 """
1567 def _get_user_group_name(self):
1567 def _get_user_group_name(self):
1568 _request = self._get_request()
1568 _request = self._get_request()
1569 return get_user_group_slug(_request)
1569 return get_user_group_slug(_request)
1570
1570
1571 def check_permissions(self, user):
1571 def check_permissions(self, user):
1572 perms = user.permissions
1572 perms = user.permissions
1573 group_name = self._get_user_group_name()
1573 group_name = self._get_user_group_name()
1574 try:
1574 try:
1575 user_perms = set([perms['user_groups'][group_name]])
1575 user_perms = set([perms['user_groups'][group_name]])
1576 except KeyError:
1576 except KeyError:
1577 return False
1577 return False
1578
1578
1579 if self.required_perms.issubset(user_perms):
1579 if self.required_perms.issubset(user_perms):
1580 return True
1580 return True
1581 return False
1581 return False
1582
1582
1583
1583
1584 class HasUserGroupPermissionAnyDecorator(PermsDecorator):
1584 class HasUserGroupPermissionAnyDecorator(PermsDecorator):
1585 """
1585 """
1586 Checks for access permission for any of given predicates for specific
1586 Checks for access permission for any of given predicates for specific
1587 user group. In order to fulfill the request any of predicates must be meet
1587 user group. In order to fulfill the request any of predicates must be meet
1588 """
1588 """
1589 def _get_user_group_name(self):
1589 def _get_user_group_name(self):
1590 _request = self._get_request()
1590 _request = self._get_request()
1591 return get_user_group_slug(_request)
1591 return get_user_group_slug(_request)
1592
1592
1593 def check_permissions(self, user):
1593 def check_permissions(self, user):
1594 perms = user.permissions
1594 perms = user.permissions
1595 group_name = self._get_user_group_name()
1595 group_name = self._get_user_group_name()
1596 try:
1596 try:
1597 user_perms = set([perms['user_groups'][group_name]])
1597 user_perms = set([perms['user_groups'][group_name]])
1598 except KeyError:
1598 except KeyError:
1599 return False
1599 return False
1600
1600
1601 if self.required_perms.intersection(user_perms):
1601 if self.required_perms.intersection(user_perms):
1602 return True
1602 return True
1603 return False
1603 return False
1604
1604
1605
1605
1606 # CHECK FUNCTIONS
1606 # CHECK FUNCTIONS
1607 class PermsFunction(object):
1607 class PermsFunction(object):
1608 """Base function for other check functions"""
1608 """Base function for other check functions"""
1609
1609
1610 def __init__(self, *perms):
1610 def __init__(self, *perms):
1611 self.required_perms = set(perms)
1611 self.required_perms = set(perms)
1612 self.repo_name = None
1612 self.repo_name = None
1613 self.repo_group_name = None
1613 self.repo_group_name = None
1614 self.user_group_name = None
1614 self.user_group_name = None
1615
1615
1616 def __bool__(self):
1616 def __bool__(self):
1617 frame = inspect.currentframe()
1617 frame = inspect.currentframe()
1618 stack_trace = traceback.format_stack(frame)
1618 stack_trace = traceback.format_stack(frame)
1619 log.error('Checking bool value on a class instance of perm '
1619 log.error('Checking bool value on a class instance of perm '
1620 'function is not allowed: %s' % ''.join(stack_trace))
1620 'function is not allowed: %s' % ''.join(stack_trace))
1621 # rather than throwing errors, here we always return False so if by
1621 # rather than throwing errors, here we always return False so if by
1622 # accident someone checks truth for just an instance it will always end
1622 # accident someone checks truth for just an instance it will always end
1623 # up in returning False
1623 # up in returning False
1624 return False
1624 return False
1625 __nonzero__ = __bool__
1625 __nonzero__ = __bool__
1626
1626
1627 def __call__(self, check_location='', user=None):
1627 def __call__(self, check_location='', user=None):
1628 if not user:
1628 if not user:
1629 log.debug('Using user attribute from global request')
1629 log.debug('Using user attribute from global request')
1630 # TODO: remove this someday,put as user as attribute here
1630 # TODO: remove this someday,put as user as attribute here
1631 request = self._get_request()
1631 request = self._get_request()
1632 user = request.user
1632 user = request.user
1633
1633
1634 # init auth user if not already given
1634 # init auth user if not already given
1635 if not isinstance(user, AuthUser):
1635 if not isinstance(user, AuthUser):
1636 log.debug('Wrapping user %s into AuthUser', user)
1636 log.debug('Wrapping user %s into AuthUser', user)
1637 user = AuthUser(user.user_id)
1637 user = AuthUser(user.user_id)
1638
1638
1639 cls_name = self.__class__.__name__
1639 cls_name = self.__class__.__name__
1640 check_scope = self._get_check_scope(cls_name)
1640 check_scope = self._get_check_scope(cls_name)
1641 check_location = check_location or 'unspecified location'
1641 check_location = check_location or 'unspecified location'
1642
1642
1643 log.debug('checking cls:%s %s usr:%s %s @ %s', cls_name,
1643 log.debug('checking cls:%s %s usr:%s %s @ %s', cls_name,
1644 self.required_perms, user, check_scope, check_location)
1644 self.required_perms, user, check_scope, check_location)
1645 if not user:
1645 if not user:
1646 log.warning('Empty user given for permission check')
1646 log.warning('Empty user given for permission check')
1647 return False
1647 return False
1648
1648
1649 if self.check_permissions(user):
1649 if self.check_permissions(user):
1650 log.debug('Permission to repo:`%s` GRANTED for user:`%s` @ %s',
1650 log.debug('Permission to repo:`%s` GRANTED for user:`%s` @ %s',
1651 check_scope, user, check_location)
1651 check_scope, user, check_location)
1652 return True
1652 return True
1653
1653
1654 else:
1654 else:
1655 log.debug('Permission to repo:`%s` DENIED for user:`%s` @ %s',
1655 log.debug('Permission to repo:`%s` DENIED for user:`%s` @ %s',
1656 check_scope, user, check_location)
1656 check_scope, user, check_location)
1657 return False
1657 return False
1658
1658
1659 def _get_request(self):
1659 def _get_request(self):
1660 return get_request(self)
1660 return get_request(self)
1661
1661
1662 def _get_check_scope(self, cls_name):
1662 def _get_check_scope(self, cls_name):
1663 return {
1663 return {
1664 'HasPermissionAll': 'GLOBAL',
1664 'HasPermissionAll': 'GLOBAL',
1665 'HasPermissionAny': 'GLOBAL',
1665 'HasPermissionAny': 'GLOBAL',
1666 'HasRepoPermissionAll': 'repo:%s' % self.repo_name,
1666 'HasRepoPermissionAll': 'repo:%s' % self.repo_name,
1667 'HasRepoPermissionAny': 'repo:%s' % self.repo_name,
1667 'HasRepoPermissionAny': 'repo:%s' % self.repo_name,
1668 'HasRepoGroupPermissionAll': 'repo_group:%s' % self.repo_group_name,
1668 'HasRepoGroupPermissionAll': 'repo_group:%s' % self.repo_group_name,
1669 'HasRepoGroupPermissionAny': 'repo_group:%s' % self.repo_group_name,
1669 'HasRepoGroupPermissionAny': 'repo_group:%s' % self.repo_group_name,
1670 'HasUserGroupPermissionAll': 'user_group:%s' % self.user_group_name,
1670 'HasUserGroupPermissionAll': 'user_group:%s' % self.user_group_name,
1671 'HasUserGroupPermissionAny': 'user_group:%s' % self.user_group_name,
1671 'HasUserGroupPermissionAny': 'user_group:%s' % self.user_group_name,
1672 }.get(cls_name, '?:%s' % cls_name)
1672 }.get(cls_name, '?:%s' % cls_name)
1673
1673
1674 def check_permissions(self, user):
1674 def check_permissions(self, user):
1675 """Dummy function for overriding"""
1675 """Dummy function for overriding"""
1676 raise Exception('You have to write this function in child class')
1676 raise Exception('You have to write this function in child class')
1677
1677
1678
1678
1679 class HasPermissionAll(PermsFunction):
1679 class HasPermissionAll(PermsFunction):
1680 def check_permissions(self, user):
1680 def check_permissions(self, user):
1681 perms = user.permissions_with_scope({})
1681 perms = user.permissions_with_scope({})
1682 if self.required_perms.issubset(perms.get('global')):
1682 if self.required_perms.issubset(perms.get('global')):
1683 return True
1683 return True
1684 return False
1684 return False
1685
1685
1686
1686
1687 class HasPermissionAny(PermsFunction):
1687 class HasPermissionAny(PermsFunction):
1688 def check_permissions(self, user):
1688 def check_permissions(self, user):
1689 perms = user.permissions_with_scope({})
1689 perms = user.permissions_with_scope({})
1690 if self.required_perms.intersection(perms.get('global')):
1690 if self.required_perms.intersection(perms.get('global')):
1691 return True
1691 return True
1692 return False
1692 return False
1693
1693
1694
1694
1695 class HasRepoPermissionAll(PermsFunction):
1695 class HasRepoPermissionAll(PermsFunction):
1696 def __call__(self, repo_name=None, check_location='', user=None):
1696 def __call__(self, repo_name=None, check_location='', user=None):
1697 self.repo_name = repo_name
1697 self.repo_name = repo_name
1698 return super(HasRepoPermissionAll, self).__call__(check_location, user)
1698 return super(HasRepoPermissionAll, self).__call__(check_location, user)
1699
1699
1700 def _get_repo_name(self):
1700 def _get_repo_name(self):
1701 if not self.repo_name:
1701 if not self.repo_name:
1702 _request = self._get_request()
1702 _request = self._get_request()
1703 self.repo_name = get_repo_slug(_request)
1703 self.repo_name = get_repo_slug(_request)
1704 return self.repo_name
1704 return self.repo_name
1705
1705
1706 def check_permissions(self, user):
1706 def check_permissions(self, user):
1707 self.repo_name = self._get_repo_name()
1707 self.repo_name = self._get_repo_name()
1708 perms = user.permissions
1708 perms = user.permissions
1709 try:
1709 try:
1710 user_perms = set([perms['repositories'][self.repo_name]])
1710 user_perms = set([perms['repositories'][self.repo_name]])
1711 except KeyError:
1711 except KeyError:
1712 return False
1712 return False
1713 if self.required_perms.issubset(user_perms):
1713 if self.required_perms.issubset(user_perms):
1714 return True
1714 return True
1715 return False
1715 return False
1716
1716
1717
1717
1718 class HasRepoPermissionAny(PermsFunction):
1718 class HasRepoPermissionAny(PermsFunction):
1719 def __call__(self, repo_name=None, check_location='', user=None):
1719 def __call__(self, repo_name=None, check_location='', user=None):
1720 self.repo_name = repo_name
1720 self.repo_name = repo_name
1721 return super(HasRepoPermissionAny, self).__call__(check_location, user)
1721 return super(HasRepoPermissionAny, self).__call__(check_location, user)
1722
1722
1723 def _get_repo_name(self):
1723 def _get_repo_name(self):
1724 if not self.repo_name:
1724 if not self.repo_name:
1725 _request = self._get_request()
1725 _request = self._get_request()
1726 self.repo_name = get_repo_slug(_request)
1726 self.repo_name = get_repo_slug(_request)
1727 return self.repo_name
1727 return self.repo_name
1728
1728
1729 def check_permissions(self, user):
1729 def check_permissions(self, user):
1730 self.repo_name = self._get_repo_name()
1730 self.repo_name = self._get_repo_name()
1731 perms = user.permissions
1731 perms = user.permissions
1732 try:
1732 try:
1733 user_perms = set([perms['repositories'][self.repo_name]])
1733 user_perms = set([perms['repositories'][self.repo_name]])
1734 except KeyError:
1734 except KeyError:
1735 return False
1735 return False
1736 if self.required_perms.intersection(user_perms):
1736 if self.required_perms.intersection(user_perms):
1737 return True
1737 return True
1738 return False
1738 return False
1739
1739
1740
1740
1741 class HasRepoGroupPermissionAny(PermsFunction):
1741 class HasRepoGroupPermissionAny(PermsFunction):
1742 def __call__(self, group_name=None, check_location='', user=None):
1742 def __call__(self, group_name=None, check_location='', user=None):
1743 self.repo_group_name = group_name
1743 self.repo_group_name = group_name
1744 return super(HasRepoGroupPermissionAny, self).__call__(
1744 return super(HasRepoGroupPermissionAny, self).__call__(
1745 check_location, user)
1745 check_location, user)
1746
1746
1747 def check_permissions(self, user):
1747 def check_permissions(self, user):
1748 perms = user.permissions
1748 perms = user.permissions
1749 try:
1749 try:
1750 user_perms = set(
1750 user_perms = set(
1751 [perms['repositories_groups'][self.repo_group_name]])
1751 [perms['repositories_groups'][self.repo_group_name]])
1752 except KeyError:
1752 except KeyError:
1753 return False
1753 return False
1754 if self.required_perms.intersection(user_perms):
1754 if self.required_perms.intersection(user_perms):
1755 return True
1755 return True
1756 return False
1756 return False
1757
1757
1758
1758
1759 class HasRepoGroupPermissionAll(PermsFunction):
1759 class HasRepoGroupPermissionAll(PermsFunction):
1760 def __call__(self, group_name=None, check_location='', user=None):
1760 def __call__(self, group_name=None, check_location='', user=None):
1761 self.repo_group_name = group_name
1761 self.repo_group_name = group_name
1762 return super(HasRepoGroupPermissionAll, self).__call__(
1762 return super(HasRepoGroupPermissionAll, self).__call__(
1763 check_location, user)
1763 check_location, user)
1764
1764
1765 def check_permissions(self, user):
1765 def check_permissions(self, user):
1766 perms = user.permissions
1766 perms = user.permissions
1767 try:
1767 try:
1768 user_perms = set(
1768 user_perms = set(
1769 [perms['repositories_groups'][self.repo_group_name]])
1769 [perms['repositories_groups'][self.repo_group_name]])
1770 except KeyError:
1770 except KeyError:
1771 return False
1771 return False
1772 if self.required_perms.issubset(user_perms):
1772 if self.required_perms.issubset(user_perms):
1773 return True
1773 return True
1774 return False
1774 return False
1775
1775
1776
1776
1777 class HasUserGroupPermissionAny(PermsFunction):
1777 class HasUserGroupPermissionAny(PermsFunction):
1778 def __call__(self, user_group_name=None, check_location='', user=None):
1778 def __call__(self, user_group_name=None, check_location='', user=None):
1779 self.user_group_name = user_group_name
1779 self.user_group_name = user_group_name
1780 return super(HasUserGroupPermissionAny, self).__call__(
1780 return super(HasUserGroupPermissionAny, self).__call__(
1781 check_location, user)
1781 check_location, user)
1782
1782
1783 def check_permissions(self, user):
1783 def check_permissions(self, user):
1784 perms = user.permissions
1784 perms = user.permissions
1785 try:
1785 try:
1786 user_perms = set([perms['user_groups'][self.user_group_name]])
1786 user_perms = set([perms['user_groups'][self.user_group_name]])
1787 except KeyError:
1787 except KeyError:
1788 return False
1788 return False
1789 if self.required_perms.intersection(user_perms):
1789 if self.required_perms.intersection(user_perms):
1790 return True
1790 return True
1791 return False
1791 return False
1792
1792
1793
1793
1794 class HasUserGroupPermissionAll(PermsFunction):
1794 class HasUserGroupPermissionAll(PermsFunction):
1795 def __call__(self, user_group_name=None, check_location='', user=None):
1795 def __call__(self, user_group_name=None, check_location='', user=None):
1796 self.user_group_name = user_group_name
1796 self.user_group_name = user_group_name
1797 return super(HasUserGroupPermissionAll, self).__call__(
1797 return super(HasUserGroupPermissionAll, self).__call__(
1798 check_location, user)
1798 check_location, user)
1799
1799
1800 def check_permissions(self, user):
1800 def check_permissions(self, user):
1801 perms = user.permissions
1801 perms = user.permissions
1802 try:
1802 try:
1803 user_perms = set([perms['user_groups'][self.user_group_name]])
1803 user_perms = set([perms['user_groups'][self.user_group_name]])
1804 except KeyError:
1804 except KeyError:
1805 return False
1805 return False
1806 if self.required_perms.issubset(user_perms):
1806 if self.required_perms.issubset(user_perms):
1807 return True
1807 return True
1808 return False
1808 return False
1809
1809
1810
1810
1811 # SPECIAL VERSION TO HANDLE MIDDLEWARE AUTH
1811 # SPECIAL VERSION TO HANDLE MIDDLEWARE AUTH
1812 class HasPermissionAnyMiddleware(object):
1812 class HasPermissionAnyMiddleware(object):
1813 def __init__(self, *perms):
1813 def __init__(self, *perms):
1814 self.required_perms = set(perms)
1814 self.required_perms = set(perms)
1815
1815
1816 def __call__(self, user, repo_name):
1816 def __call__(self, user, repo_name):
1817 # repo_name MUST be unicode, since we handle keys in permission
1817 # repo_name MUST be unicode, since we handle keys in permission
1818 # dict by unicode
1818 # dict by unicode
1819 repo_name = safe_unicode(repo_name)
1819 repo_name = safe_unicode(repo_name)
1820 user = AuthUser(user.user_id)
1820 user = AuthUser(user.user_id)
1821 log.debug(
1821 log.debug(
1822 'Checking VCS protocol permissions %s for user:%s repo:`%s`',
1822 'Checking VCS protocol permissions %s for user:%s repo:`%s`',
1823 self.required_perms, user, repo_name)
1823 self.required_perms, user, repo_name)
1824
1824
1825 if self.check_permissions(user, repo_name):
1825 if self.check_permissions(user, repo_name):
1826 log.debug('Permission to repo:`%s` GRANTED for user:%s @ %s',
1826 log.debug('Permission to repo:`%s` GRANTED for user:%s @ %s',
1827 repo_name, user, 'PermissionMiddleware')
1827 repo_name, user, 'PermissionMiddleware')
1828 return True
1828 return True
1829
1829
1830 else:
1830 else:
1831 log.debug('Permission to repo:`%s` DENIED for user:%s @ %s',
1831 log.debug('Permission to repo:`%s` DENIED for user:%s @ %s',
1832 repo_name, user, 'PermissionMiddleware')
1832 repo_name, user, 'PermissionMiddleware')
1833 return False
1833 return False
1834
1834
1835 def check_permissions(self, user, repo_name):
1835 def check_permissions(self, user, repo_name):
1836 perms = user.permissions_with_scope({'repo_name': repo_name})
1836 perms = user.permissions_with_scope({'repo_name': repo_name})
1837
1837
1838 try:
1838 try:
1839 user_perms = set([perms['repositories'][repo_name]])
1839 user_perms = set([perms['repositories'][repo_name]])
1840 except Exception:
1840 except Exception:
1841 log.exception('Error while accessing user permissions')
1841 log.exception('Error while accessing user permissions')
1842 return False
1842 return False
1843
1843
1844 if self.required_perms.intersection(user_perms):
1844 if self.required_perms.intersection(user_perms):
1845 return True
1845 return True
1846 return False
1846 return False
1847
1847
1848
1848
1849 # SPECIAL VERSION TO HANDLE API AUTH
1849 # SPECIAL VERSION TO HANDLE API AUTH
1850 class _BaseApiPerm(object):
1850 class _BaseApiPerm(object):
1851 def __init__(self, *perms):
1851 def __init__(self, *perms):
1852 self.required_perms = set(perms)
1852 self.required_perms = set(perms)
1853
1853
1854 def __call__(self, check_location=None, user=None, repo_name=None,
1854 def __call__(self, check_location=None, user=None, repo_name=None,
1855 group_name=None, user_group_name=None):
1855 group_name=None, user_group_name=None):
1856 cls_name = self.__class__.__name__
1856 cls_name = self.__class__.__name__
1857 check_scope = 'global:%s' % (self.required_perms,)
1857 check_scope = 'global:%s' % (self.required_perms,)
1858 if repo_name:
1858 if repo_name:
1859 check_scope += ', repo_name:%s' % (repo_name,)
1859 check_scope += ', repo_name:%s' % (repo_name,)
1860
1860
1861 if group_name:
1861 if group_name:
1862 check_scope += ', repo_group_name:%s' % (group_name,)
1862 check_scope += ', repo_group_name:%s' % (group_name,)
1863
1863
1864 if user_group_name:
1864 if user_group_name:
1865 check_scope += ', user_group_name:%s' % (user_group_name,)
1865 check_scope += ', user_group_name:%s' % (user_group_name,)
1866
1866
1867 log.debug(
1867 log.debug(
1868 'checking cls:%s %s %s @ %s'
1868 'checking cls:%s %s %s @ %s'
1869 % (cls_name, self.required_perms, check_scope, check_location))
1869 % (cls_name, self.required_perms, check_scope, check_location))
1870 if not user:
1870 if not user:
1871 log.debug('Empty User passed into arguments')
1871 log.debug('Empty User passed into arguments')
1872 return False
1872 return False
1873
1873
1874 # process user
1874 # process user
1875 if not isinstance(user, AuthUser):
1875 if not isinstance(user, AuthUser):
1876 user = AuthUser(user.user_id)
1876 user = AuthUser(user.user_id)
1877 if not check_location:
1877 if not check_location:
1878 check_location = 'unspecified'
1878 check_location = 'unspecified'
1879 if self.check_permissions(user.permissions, repo_name, group_name,
1879 if self.check_permissions(user.permissions, repo_name, group_name,
1880 user_group_name):
1880 user_group_name):
1881 log.debug('Permission to repo:`%s` GRANTED for user:`%s` @ %s',
1881 log.debug('Permission to repo:`%s` GRANTED for user:`%s` @ %s',
1882 check_scope, user, check_location)
1882 check_scope, user, check_location)
1883 return True
1883 return True
1884
1884
1885 else:
1885 else:
1886 log.debug('Permission to repo:`%s` DENIED for user:`%s` @ %s',
1886 log.debug('Permission to repo:`%s` DENIED for user:`%s` @ %s',
1887 check_scope, user, check_location)
1887 check_scope, user, check_location)
1888 return False
1888 return False
1889
1889
1890 def check_permissions(self, perm_defs, repo_name=None, group_name=None,
1890 def check_permissions(self, perm_defs, repo_name=None, group_name=None,
1891 user_group_name=None):
1891 user_group_name=None):
1892 """
1892 """
1893 implement in child class should return True if permissions are ok,
1893 implement in child class should return True if permissions are ok,
1894 False otherwise
1894 False otherwise
1895
1895
1896 :param perm_defs: dict with permission definitions
1896 :param perm_defs: dict with permission definitions
1897 :param repo_name: repo name
1897 :param repo_name: repo name
1898 """
1898 """
1899 raise NotImplementedError()
1899 raise NotImplementedError()
1900
1900
1901
1901
1902 class HasPermissionAllApi(_BaseApiPerm):
1902 class HasPermissionAllApi(_BaseApiPerm):
1903 def check_permissions(self, perm_defs, repo_name=None, group_name=None,
1903 def check_permissions(self, perm_defs, repo_name=None, group_name=None,
1904 user_group_name=None):
1904 user_group_name=None):
1905 if self.required_perms.issubset(perm_defs.get('global')):
1905 if self.required_perms.issubset(perm_defs.get('global')):
1906 return True
1906 return True
1907 return False
1907 return False
1908
1908
1909
1909
1910 class HasPermissionAnyApi(_BaseApiPerm):
1910 class HasPermissionAnyApi(_BaseApiPerm):
1911 def check_permissions(self, perm_defs, repo_name=None, group_name=None,
1911 def check_permissions(self, perm_defs, repo_name=None, group_name=None,
1912 user_group_name=None):
1912 user_group_name=None):
1913 if self.required_perms.intersection(perm_defs.get('global')):
1913 if self.required_perms.intersection(perm_defs.get('global')):
1914 return True
1914 return True
1915 return False
1915 return False
1916
1916
1917
1917
1918 class HasRepoPermissionAllApi(_BaseApiPerm):
1918 class HasRepoPermissionAllApi(_BaseApiPerm):
1919 def check_permissions(self, perm_defs, repo_name=None, group_name=None,
1919 def check_permissions(self, perm_defs, repo_name=None, group_name=None,
1920 user_group_name=None):
1920 user_group_name=None):
1921 try:
1921 try:
1922 _user_perms = set([perm_defs['repositories'][repo_name]])
1922 _user_perms = set([perm_defs['repositories'][repo_name]])
1923 except KeyError:
1923 except KeyError:
1924 log.warning(traceback.format_exc())
1924 log.warning(traceback.format_exc())
1925 return False
1925 return False
1926 if self.required_perms.issubset(_user_perms):
1926 if self.required_perms.issubset(_user_perms):
1927 return True
1927 return True
1928 return False
1928 return False
1929
1929
1930
1930
1931 class HasRepoPermissionAnyApi(_BaseApiPerm):
1931 class HasRepoPermissionAnyApi(_BaseApiPerm):
1932 def check_permissions(self, perm_defs, repo_name=None, group_name=None,
1932 def check_permissions(self, perm_defs, repo_name=None, group_name=None,
1933 user_group_name=None):
1933 user_group_name=None):
1934 try:
1934 try:
1935 _user_perms = set([perm_defs['repositories'][repo_name]])
1935 _user_perms = set([perm_defs['repositories'][repo_name]])
1936 except KeyError:
1936 except KeyError:
1937 log.warning(traceback.format_exc())
1937 log.warning(traceback.format_exc())
1938 return False
1938 return False
1939 if self.required_perms.intersection(_user_perms):
1939 if self.required_perms.intersection(_user_perms):
1940 return True
1940 return True
1941 return False
1941 return False
1942
1942
1943
1943
1944 class HasRepoGroupPermissionAnyApi(_BaseApiPerm):
1944 class HasRepoGroupPermissionAnyApi(_BaseApiPerm):
1945 def check_permissions(self, perm_defs, repo_name=None, group_name=None,
1945 def check_permissions(self, perm_defs, repo_name=None, group_name=None,
1946 user_group_name=None):
1946 user_group_name=None):
1947 try:
1947 try:
1948 _user_perms = set([perm_defs['repositories_groups'][group_name]])
1948 _user_perms = set([perm_defs['repositories_groups'][group_name]])
1949 except KeyError:
1949 except KeyError:
1950 log.warning(traceback.format_exc())
1950 log.warning(traceback.format_exc())
1951 return False
1951 return False
1952 if self.required_perms.intersection(_user_perms):
1952 if self.required_perms.intersection(_user_perms):
1953 return True
1953 return True
1954 return False
1954 return False
1955
1955
1956
1956
1957 class HasRepoGroupPermissionAllApi(_BaseApiPerm):
1957 class HasRepoGroupPermissionAllApi(_BaseApiPerm):
1958 def check_permissions(self, perm_defs, repo_name=None, group_name=None,
1958 def check_permissions(self, perm_defs, repo_name=None, group_name=None,
1959 user_group_name=None):
1959 user_group_name=None):
1960 try:
1960 try:
1961 _user_perms = set([perm_defs['repositories_groups'][group_name]])
1961 _user_perms = set([perm_defs['repositories_groups'][group_name]])
1962 except KeyError:
1962 except KeyError:
1963 log.warning(traceback.format_exc())
1963 log.warning(traceback.format_exc())
1964 return False
1964 return False
1965 if self.required_perms.issubset(_user_perms):
1965 if self.required_perms.issubset(_user_perms):
1966 return True
1966 return True
1967 return False
1967 return False
1968
1968
1969
1969
1970 class HasUserGroupPermissionAnyApi(_BaseApiPerm):
1970 class HasUserGroupPermissionAnyApi(_BaseApiPerm):
1971 def check_permissions(self, perm_defs, repo_name=None, group_name=None,
1971 def check_permissions(self, perm_defs, repo_name=None, group_name=None,
1972 user_group_name=None):
1972 user_group_name=None):
1973 try:
1973 try:
1974 _user_perms = set([perm_defs['user_groups'][user_group_name]])
1974 _user_perms = set([perm_defs['user_groups'][user_group_name]])
1975 except KeyError:
1975 except KeyError:
1976 log.warning(traceback.format_exc())
1976 log.warning(traceback.format_exc())
1977 return False
1977 return False
1978 if self.required_perms.intersection(_user_perms):
1978 if self.required_perms.intersection(_user_perms):
1979 return True
1979 return True
1980 return False
1980 return False
1981
1981
1982
1982
1983 def check_ip_access(source_ip, allowed_ips=None):
1983 def check_ip_access(source_ip, allowed_ips=None):
1984 """
1984 """
1985 Checks if source_ip is a subnet of any of allowed_ips.
1985 Checks if source_ip is a subnet of any of allowed_ips.
1986
1986
1987 :param source_ip:
1987 :param source_ip:
1988 :param allowed_ips: list of allowed ips together with mask
1988 :param allowed_ips: list of allowed ips together with mask
1989 """
1989 """
1990 log.debug('checking if ip:%s is subnet of %s' % (source_ip, allowed_ips))
1990 log.debug('checking if ip:%s is subnet of %s' % (source_ip, allowed_ips))
1991 source_ip_address = ipaddress.ip_address(source_ip)
1991 source_ip_address = ipaddress.ip_address(source_ip)
1992 if isinstance(allowed_ips, (tuple, list, set)):
1992 if isinstance(allowed_ips, (tuple, list, set)):
1993 for ip in allowed_ips:
1993 for ip in allowed_ips:
1994 try:
1994 try:
1995 network_address = ipaddress.ip_network(ip, strict=False)
1995 network_address = ipaddress.ip_network(ip, strict=False)
1996 if source_ip_address in network_address:
1996 if source_ip_address in network_address:
1997 log.debug('IP %s is network %s' %
1997 log.debug('IP %s is network %s' %
1998 (source_ip_address, network_address))
1998 (source_ip_address, network_address))
1999 return True
1999 return True
2000 # for any case we cannot determine the IP, don't crash just
2000 # for any case we cannot determine the IP, don't crash just
2001 # skip it and log as error, we want to say forbidden still when
2001 # skip it and log as error, we want to say forbidden still when
2002 # sending bad IP
2002 # sending bad IP
2003 except Exception:
2003 except Exception:
2004 log.error(traceback.format_exc())
2004 log.error(traceback.format_exc())
2005 continue
2005 continue
2006 return False
2006 return False
2007
2007
2008
2008
2009 def get_cython_compat_decorator(wrapper, func):
2009 def get_cython_compat_decorator(wrapper, func):
2010 """
2010 """
2011 Creates a cython compatible decorator. The previously used
2011 Creates a cython compatible decorator. The previously used
2012 decorator.decorator() function seems to be incompatible with cython.
2012 decorator.decorator() function seems to be incompatible with cython.
2013
2013
2014 :param wrapper: __wrapper method of the decorator class
2014 :param wrapper: __wrapper method of the decorator class
2015 :param func: decorated function
2015 :param func: decorated function
2016 """
2016 """
2017 @wraps(func)
2017 @wraps(func)
2018 def local_wrapper(*args, **kwds):
2018 def local_wrapper(*args, **kwds):
2019 return wrapper(func, *args, **kwds)
2019 return wrapper(func, *args, **kwds)
2020 local_wrapper.__wrapped__ = func
2020 local_wrapper.__wrapped__ = func
2021 return local_wrapper
2021 return local_wrapper
2022
2022
2023
2023
@@ -1,682 +1,682 b''
1 # -*- coding: utf-8 -*-
1 # -*- coding: utf-8 -*-
2
2
3 # Copyright (C) 2010-2017 RhodeCode GmbH
3 # Copyright (C) 2010-2017 RhodeCode GmbH
4 #
4 #
5 # This program is free software: you can redistribute it and/or modify
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU Affero General Public License, version 3
6 # it under the terms of the GNU Affero General Public License, version 3
7 # (only), as published by the Free Software Foundation.
7 # (only), as published by the Free Software Foundation.
8 #
8 #
9 # This program is distributed in the hope that it will be useful,
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
12 # GNU General Public License for more details.
13 #
13 #
14 # You should have received a copy of the GNU Affero General Public License
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 #
16 #
17 # This program is dual-licensed. If you wish to learn more about the
17 # This program is dual-licensed. If you wish to learn more about the
18 # RhodeCode Enterprise Edition, including its added features, Support services,
18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20
20
21 import mock
21 import mock
22 import pytest
22 import pytest
23
23
24 import rhodecode
24 import rhodecode
25 from rhodecode.config.routing import ADMIN_PREFIX
25 from rhodecode.config.routing import ADMIN_PREFIX
26 from rhodecode.lib.utils2 import md5
26 from rhodecode.lib.utils2 import md5
27 from rhodecode.model.db import RhodeCodeUi
27 from rhodecode.model.db import RhodeCodeUi
28 from rhodecode.model.meta import Session
28 from rhodecode.model.meta import Session
29 from rhodecode.model.settings import SettingsModel, IssueTrackerSettingsModel
29 from rhodecode.model.settings import SettingsModel, IssueTrackerSettingsModel
30 from rhodecode.tests import url, assert_session_flash
30 from rhodecode.tests import url, assert_session_flash
31 from rhodecode.tests.utils import AssertResponse
31 from rhodecode.tests.utils import AssertResponse
32
32
33
33
34 UPDATE_DATA_QUALNAME = (
34 UPDATE_DATA_QUALNAME = (
35 'rhodecode.apps.admin.views.system_info.AdminSystemInfoSettingsView.get_update_data')
35 'rhodecode.apps.admin.views.system_info.AdminSystemInfoSettingsView.get_update_data')
36
36
37
37
38 @pytest.mark.usefixtures('autologin_user', 'app')
38 @pytest.mark.usefixtures('autologin_user', 'app')
39 class TestAdminSettingsController(object):
39 class TestAdminSettingsController(object):
40
40
41 @pytest.mark.parametrize('urlname', [
41 @pytest.mark.parametrize('urlname', [
42 'admin_settings_vcs',
42 'admin_settings_vcs',
43 'admin_settings_mapping',
43 'admin_settings_mapping',
44 'admin_settings_global',
44 'admin_settings_global',
45 'admin_settings_visual',
45 'admin_settings_visual',
46 'admin_settings_email',
46 'admin_settings_email',
47 'admin_settings_hooks',
47 'admin_settings_hooks',
48 'admin_settings_search',
48 'admin_settings_search',
49 ])
49 ])
50 def test_simple_get(self, urlname, app):
50 def test_simple_get(self, urlname, app):
51 app.get(url(urlname))
51 app.get(url(urlname))
52
52
53 def test_create_custom_hook(self, csrf_token):
53 def test_create_custom_hook(self, csrf_token):
54 response = self.app.post(
54 response = self.app.post(
55 url('admin_settings_hooks'),
55 url('admin_settings_hooks'),
56 params={
56 params={
57 'new_hook_ui_key': 'test_hooks_1',
57 'new_hook_ui_key': 'test_hooks_1',
58 'new_hook_ui_value': 'cd /tmp',
58 'new_hook_ui_value': 'cd /tmp',
59 'csrf_token': csrf_token})
59 'csrf_token': csrf_token})
60
60
61 response = response.follow()
61 response = response.follow()
62 response.mustcontain('test_hooks_1')
62 response.mustcontain('test_hooks_1')
63 response.mustcontain('cd /tmp')
63 response.mustcontain('cd /tmp')
64
64
65 def test_create_custom_hook_delete(self, csrf_token):
65 def test_create_custom_hook_delete(self, csrf_token):
66 response = self.app.post(
66 response = self.app.post(
67 url('admin_settings_hooks'),
67 url('admin_settings_hooks'),
68 params={
68 params={
69 'new_hook_ui_key': 'test_hooks_2',
69 'new_hook_ui_key': 'test_hooks_2',
70 'new_hook_ui_value': 'cd /tmp2',
70 'new_hook_ui_value': 'cd /tmp2',
71 'csrf_token': csrf_token})
71 'csrf_token': csrf_token})
72
72
73 response = response.follow()
73 response = response.follow()
74 response.mustcontain('test_hooks_2')
74 response.mustcontain('test_hooks_2')
75 response.mustcontain('cd /tmp2')
75 response.mustcontain('cd /tmp2')
76
76
77 hook_id = SettingsModel().get_ui_by_key('test_hooks_2').ui_id
77 hook_id = SettingsModel().get_ui_by_key('test_hooks_2').ui_id
78
78
79 # delete
79 # delete
80 self.app.post(
80 self.app.post(
81 url('admin_settings_hooks'),
81 url('admin_settings_hooks'),
82 params={'hook_id': hook_id, 'csrf_token': csrf_token})
82 params={'hook_id': hook_id, 'csrf_token': csrf_token})
83 response = self.app.get(url('admin_settings_hooks'))
83 response = self.app.get(url('admin_settings_hooks'))
84 response.mustcontain(no=['test_hooks_2'])
84 response.mustcontain(no=['test_hooks_2'])
85 response.mustcontain(no=['cd /tmp2'])
85 response.mustcontain(no=['cd /tmp2'])
86
86
87
87
88 @pytest.mark.usefixtures('autologin_user', 'app')
88 @pytest.mark.usefixtures('autologin_user', 'app')
89 class TestAdminSettingsGlobal(object):
89 class TestAdminSettingsGlobal(object):
90
90
91 def test_pre_post_code_code_active(self, csrf_token):
91 def test_pre_post_code_code_active(self, csrf_token):
92 pre_code = 'rc-pre-code-187652122'
92 pre_code = 'rc-pre-code-187652122'
93 post_code = 'rc-postcode-98165231'
93 post_code = 'rc-postcode-98165231'
94
94
95 response = self.post_and_verify_settings({
95 response = self.post_and_verify_settings({
96 'rhodecode_pre_code': pre_code,
96 'rhodecode_pre_code': pre_code,
97 'rhodecode_post_code': post_code,
97 'rhodecode_post_code': post_code,
98 'csrf_token': csrf_token,
98 'csrf_token': csrf_token,
99 })
99 })
100
100
101 response = response.follow()
101 response = response.follow()
102 response.mustcontain(pre_code, post_code)
102 response.mustcontain(pre_code, post_code)
103
103
104 def test_pre_post_code_code_inactive(self, csrf_token):
104 def test_pre_post_code_code_inactive(self, csrf_token):
105 pre_code = 'rc-pre-code-187652122'
105 pre_code = 'rc-pre-code-187652122'
106 post_code = 'rc-postcode-98165231'
106 post_code = 'rc-postcode-98165231'
107 response = self.post_and_verify_settings({
107 response = self.post_and_verify_settings({
108 'rhodecode_pre_code': '',
108 'rhodecode_pre_code': '',
109 'rhodecode_post_code': '',
109 'rhodecode_post_code': '',
110 'csrf_token': csrf_token,
110 'csrf_token': csrf_token,
111 })
111 })
112
112
113 response = response.follow()
113 response = response.follow()
114 response.mustcontain(no=[pre_code, post_code])
114 response.mustcontain(no=[pre_code, post_code])
115
115
116 def test_captcha_activate(self, csrf_token):
116 def test_captcha_activate(self, csrf_token):
117 self.post_and_verify_settings({
117 self.post_and_verify_settings({
118 'rhodecode_captcha_private_key': '1234567890',
118 'rhodecode_captcha_private_key': '1234567890',
119 'rhodecode_captcha_public_key': '1234567890',
119 'rhodecode_captcha_public_key': '1234567890',
120 'csrf_token': csrf_token,
120 'csrf_token': csrf_token,
121 })
121 })
122
122
123 response = self.app.get(ADMIN_PREFIX + '/register')
123 response = self.app.get(ADMIN_PREFIX + '/register')
124 response.mustcontain('captcha')
124 response.mustcontain('captcha')
125
125
126 def test_captcha_deactivate(self, csrf_token):
126 def test_captcha_deactivate(self, csrf_token):
127 self.post_and_verify_settings({
127 self.post_and_verify_settings({
128 'rhodecode_captcha_private_key': '',
128 'rhodecode_captcha_private_key': '',
129 'rhodecode_captcha_public_key': '1234567890',
129 'rhodecode_captcha_public_key': '1234567890',
130 'csrf_token': csrf_token,
130 'csrf_token': csrf_token,
131 })
131 })
132
132
133 response = self.app.get(ADMIN_PREFIX + '/register')
133 response = self.app.get(ADMIN_PREFIX + '/register')
134 response.mustcontain(no=['captcha'])
134 response.mustcontain(no=['captcha'])
135
135
136 def test_title_change(self, csrf_token):
136 def test_title_change(self, csrf_token):
137 old_title = 'RhodeCode'
137 old_title = 'RhodeCode'
138 new_title = old_title + '_changed'
138 new_title = old_title + '_changed'
139
139
140 for new_title in ['Changed', 'Ε»Γ³Ε‚wik', old_title]:
140 for new_title in ['Changed', 'Ε»Γ³Ε‚wik', old_title]:
141 response = self.post_and_verify_settings({
141 response = self.post_and_verify_settings({
142 'rhodecode_title': new_title,
142 'rhodecode_title': new_title,
143 'csrf_token': csrf_token,
143 'csrf_token': csrf_token,
144 })
144 })
145
145
146 response = response.follow()
146 response = response.follow()
147 response.mustcontain(
147 response.mustcontain(
148 """<div class="branding">- %s</div>""" % new_title)
148 """<div class="branding">- %s</div>""" % new_title)
149
149
150 def post_and_verify_settings(self, settings):
150 def post_and_verify_settings(self, settings):
151 old_title = 'RhodeCode'
151 old_title = 'RhodeCode'
152 old_realm = 'RhodeCode authentication'
152 old_realm = 'RhodeCode authentication'
153 params = {
153 params = {
154 'rhodecode_title': old_title,
154 'rhodecode_title': old_title,
155 'rhodecode_realm': old_realm,
155 'rhodecode_realm': old_realm,
156 'rhodecode_pre_code': '',
156 'rhodecode_pre_code': '',
157 'rhodecode_post_code': '',
157 'rhodecode_post_code': '',
158 'rhodecode_captcha_private_key': '',
158 'rhodecode_captcha_private_key': '',
159 'rhodecode_captcha_public_key': '',
159 'rhodecode_captcha_public_key': '',
160 'rhodecode_create_personal_repo_group': False,
160 'rhodecode_create_personal_repo_group': False,
161 'rhodecode_personal_repo_group_pattern': '${username}',
161 'rhodecode_personal_repo_group_pattern': '${username}',
162 }
162 }
163 params.update(settings)
163 params.update(settings)
164 response = self.app.post(url('admin_settings_global'), params=params)
164 response = self.app.post(url('admin_settings_global'), params=params)
165
165
166 assert_session_flash(response, 'Updated application settings')
166 assert_session_flash(response, 'Updated application settings')
167 app_settings = SettingsModel().get_all_settings()
167 app_settings = SettingsModel().get_all_settings()
168 del settings['csrf_token']
168 del settings['csrf_token']
169 for key, value in settings.iteritems():
169 for key, value in settings.iteritems():
170 assert app_settings[key] == value.decode('utf-8')
170 assert app_settings[key] == value.decode('utf-8')
171
171
172 return response
172 return response
173
173
174
174
175 @pytest.mark.usefixtures('autologin_user', 'app')
175 @pytest.mark.usefixtures('autologin_user', 'app')
176 class TestAdminSettingsVcs(object):
176 class TestAdminSettingsVcs(object):
177
177
178 def test_contains_svn_default_patterns(self, app):
178 def test_contains_svn_default_patterns(self, app):
179 response = app.get(url('admin_settings_vcs'))
179 response = app.get(url('admin_settings_vcs'))
180 expected_patterns = [
180 expected_patterns = [
181 '/trunk',
181 '/trunk',
182 '/branches/*',
182 '/branches/*',
183 '/tags/*',
183 '/tags/*',
184 ]
184 ]
185 for pattern in expected_patterns:
185 for pattern in expected_patterns:
186 response.mustcontain(pattern)
186 response.mustcontain(pattern)
187
187
188 def test_add_new_svn_branch_and_tag_pattern(
188 def test_add_new_svn_branch_and_tag_pattern(
189 self, app, backend_svn, form_defaults, disable_sql_cache,
189 self, app, backend_svn, form_defaults, disable_sql_cache,
190 csrf_token):
190 csrf_token):
191 form_defaults.update({
191 form_defaults.update({
192 'new_svn_branch': '/exp/branches/*',
192 'new_svn_branch': '/exp/branches/*',
193 'new_svn_tag': '/important_tags/*',
193 'new_svn_tag': '/important_tags/*',
194 'csrf_token': csrf_token,
194 'csrf_token': csrf_token,
195 })
195 })
196
196
197 response = app.post(
197 response = app.post(
198 url('admin_settings_vcs'), params=form_defaults, status=302)
198 url('admin_settings_vcs'), params=form_defaults, status=302)
199 response = response.follow()
199 response = response.follow()
200
200
201 # Expect to find the new values on the page
201 # Expect to find the new values on the page
202 response.mustcontain('/exp/branches/*')
202 response.mustcontain('/exp/branches/*')
203 response.mustcontain('/important_tags/*')
203 response.mustcontain('/important_tags/*')
204
204
205 # Expect that those patterns are used to match branches and tags now
205 # Expect that those patterns are used to match branches and tags now
206 repo = backend_svn['svn-simple-layout'].scm_instance()
206 repo = backend_svn['svn-simple-layout'].scm_instance()
207 assert 'exp/branches/exp-sphinx-docs' in repo.branches
207 assert 'exp/branches/exp-sphinx-docs' in repo.branches
208 assert 'important_tags/v0.5' in repo.tags
208 assert 'important_tags/v0.5' in repo.tags
209
209
210 def test_add_same_svn_value_twice_shows_an_error_message(
210 def test_add_same_svn_value_twice_shows_an_error_message(
211 self, app, form_defaults, csrf_token, settings_util):
211 self, app, form_defaults, csrf_token, settings_util):
212 settings_util.create_rhodecode_ui('vcs_svn_branch', '/test')
212 settings_util.create_rhodecode_ui('vcs_svn_branch', '/test')
213 settings_util.create_rhodecode_ui('vcs_svn_tag', '/test')
213 settings_util.create_rhodecode_ui('vcs_svn_tag', '/test')
214
214
215 response = app.post(
215 response = app.post(
216 url('admin_settings_vcs'),
216 url('admin_settings_vcs'),
217 params={
217 params={
218 'paths_root_path': form_defaults['paths_root_path'],
218 'paths_root_path': form_defaults['paths_root_path'],
219 'new_svn_branch': '/test',
219 'new_svn_branch': '/test',
220 'new_svn_tag': '/test',
220 'new_svn_tag': '/test',
221 'csrf_token': csrf_token,
221 'csrf_token': csrf_token,
222 },
222 },
223 status=200)
223 status=200)
224
224
225 response.mustcontain("Pattern already exists")
225 response.mustcontain("Pattern already exists")
226 response.mustcontain("Some form inputs contain invalid data.")
226 response.mustcontain("Some form inputs contain invalid data.")
227
227
228 @pytest.mark.parametrize('section', [
228 @pytest.mark.parametrize('section', [
229 'vcs_svn_branch',
229 'vcs_svn_branch',
230 'vcs_svn_tag',
230 'vcs_svn_tag',
231 ])
231 ])
232 def test_delete_svn_patterns(
232 def test_delete_svn_patterns(
233 self, section, app, csrf_token, settings_util):
233 self, section, app, csrf_token, settings_util):
234 setting = settings_util.create_rhodecode_ui(
234 setting = settings_util.create_rhodecode_ui(
235 section, '/test_delete', cleanup=False)
235 section, '/test_delete', cleanup=False)
236
236
237 app.post(
237 app.post(
238 url('admin_settings_vcs'),
238 url('admin_settings_vcs'),
239 params={
239 params={
240 '_method': 'delete',
240 '_method': 'delete',
241 'delete_svn_pattern': setting.ui_id,
241 'delete_svn_pattern': setting.ui_id,
242 'csrf_token': csrf_token},
242 'csrf_token': csrf_token},
243 headers={'X-REQUESTED-WITH': 'XMLHttpRequest'})
243 headers={'X-REQUESTED-WITH': 'XMLHttpRequest'})
244
244
245 @pytest.mark.parametrize('section', [
245 @pytest.mark.parametrize('section', [
246 'vcs_svn_branch',
246 'vcs_svn_branch',
247 'vcs_svn_tag',
247 'vcs_svn_tag',
248 ])
248 ])
249 def test_delete_svn_patterns_raises_400_when_no_xhr(
249 def test_delete_svn_patterns_raises_400_when_no_xhr(
250 self, section, app, csrf_token, settings_util):
250 self, section, app, csrf_token, settings_util):
251 setting = settings_util.create_rhodecode_ui(section, '/test_delete')
251 setting = settings_util.create_rhodecode_ui(section, '/test_delete')
252
252
253 app.post(
253 app.post(
254 url('admin_settings_vcs'),
254 url('admin_settings_vcs'),
255 params={
255 params={
256 '_method': 'delete',
256 '_method': 'delete',
257 'delete_svn_pattern': setting.ui_id,
257 'delete_svn_pattern': setting.ui_id,
258 'csrf_token': csrf_token},
258 'csrf_token': csrf_token},
259 status=400)
259 status=400)
260
260
261 def test_extensions_hgsubversion(self, app, form_defaults, csrf_token):
261 def test_extensions_hgsubversion(self, app, form_defaults, csrf_token):
262 form_defaults.update({
262 form_defaults.update({
263 'csrf_token': csrf_token,
263 'csrf_token': csrf_token,
264 'extensions_hgsubversion': 'True',
264 'extensions_hgsubversion': 'True',
265 })
265 })
266 response = app.post(
266 response = app.post(
267 url('admin_settings_vcs'),
267 url('admin_settings_vcs'),
268 params=form_defaults,
268 params=form_defaults,
269 status=302)
269 status=302)
270
270
271 response = response.follow()
271 response = response.follow()
272 extensions_input = (
272 extensions_input = (
273 '<input id="extensions_hgsubversion" '
273 '<input id="extensions_hgsubversion" '
274 'name="extensions_hgsubversion" type="checkbox" '
274 'name="extensions_hgsubversion" type="checkbox" '
275 'value="True" checked="checked" />')
275 'value="True" checked="checked" />')
276 response.mustcontain(extensions_input)
276 response.mustcontain(extensions_input)
277
277
278 def test_extensions_hgevolve(self, app, form_defaults, csrf_token):
278 def test_extensions_hgevolve(self, app, form_defaults, csrf_token):
279 form_defaults.update({
279 form_defaults.update({
280 'csrf_token': csrf_token,
280 'csrf_token': csrf_token,
281 'extensions_evolve': 'True',
281 'extensions_evolve': 'True',
282 })
282 })
283 response = app.post(
283 response = app.post(
284 url('admin_settings_vcs'),
284 url('admin_settings_vcs'),
285 params=form_defaults,
285 params=form_defaults,
286 status=302)
286 status=302)
287
287
288 response = response.follow()
288 response = response.follow()
289 extensions_input = (
289 extensions_input = (
290 '<input id="extensions_evolve" '
290 '<input id="extensions_evolve" '
291 'name="extensions_evolve" type="checkbox" '
291 'name="extensions_evolve" type="checkbox" '
292 'value="True" checked="checked" />')
292 'value="True" checked="checked" />')
293 response.mustcontain(extensions_input)
293 response.mustcontain(extensions_input)
294
294
295 def test_has_a_section_for_pull_request_settings(self, app):
295 def test_has_a_section_for_pull_request_settings(self, app):
296 response = app.get(url('admin_settings_vcs'))
296 response = app.get(url('admin_settings_vcs'))
297 response.mustcontain('Pull Request Settings')
297 response.mustcontain('Pull Request Settings')
298
298
299 def test_has_an_input_for_invalidation_of_inline_comments(
299 def test_has_an_input_for_invalidation_of_inline_comments(
300 self, app):
300 self, app):
301 response = app.get(url('admin_settings_vcs'))
301 response = app.get(url('admin_settings_vcs'))
302 assert_response = AssertResponse(response)
302 assert_response = AssertResponse(response)
303 assert_response.one_element_exists(
303 assert_response.one_element_exists(
304 '[name=rhodecode_use_outdated_comments]')
304 '[name=rhodecode_use_outdated_comments]')
305
305
306 @pytest.mark.parametrize('new_value', [True, False])
306 @pytest.mark.parametrize('new_value', [True, False])
307 def test_allows_to_change_invalidation_of_inline_comments(
307 def test_allows_to_change_invalidation_of_inline_comments(
308 self, app, form_defaults, csrf_token, new_value):
308 self, app, form_defaults, csrf_token, new_value):
309 setting_key = 'use_outdated_comments'
309 setting_key = 'use_outdated_comments'
310 setting = SettingsModel().create_or_update_setting(
310 setting = SettingsModel().create_or_update_setting(
311 setting_key, not new_value, 'bool')
311 setting_key, not new_value, 'bool')
312 Session().add(setting)
312 Session().add(setting)
313 Session().commit()
313 Session().commit()
314
314
315 form_defaults.update({
315 form_defaults.update({
316 'csrf_token': csrf_token,
316 'csrf_token': csrf_token,
317 'rhodecode_use_outdated_comments': str(new_value),
317 'rhodecode_use_outdated_comments': str(new_value),
318 })
318 })
319 response = app.post(
319 response = app.post(
320 url('admin_settings_vcs'),
320 url('admin_settings_vcs'),
321 params=form_defaults,
321 params=form_defaults,
322 status=302)
322 status=302)
323 response = response.follow()
323 response = response.follow()
324 setting = SettingsModel().get_setting_by_name(setting_key)
324 setting = SettingsModel().get_setting_by_name(setting_key)
325 assert setting.app_settings_value is new_value
325 assert setting.app_settings_value is new_value
326
326
327 def test_has_a_section_for_labs_settings_if_enabled(self, app):
327 def test_has_a_section_for_labs_settings_if_enabled(self, app):
328 with mock.patch.dict(
328 with mock.patch.dict(
329 rhodecode.CONFIG, {'labs_settings_active': 'true'}):
329 rhodecode.CONFIG, {'labs_settings_active': 'true'}):
330 response = self.app.get(url('admin_settings_vcs'))
330 response = self.app.get(url('admin_settings_vcs'))
331 response.mustcontain('Labs Settings')
331 response.mustcontain('Labs Settings')
332
332
333 def test_has_not_a_section_for_labs_settings_if_disables(self, app):
333 def test_has_not_a_section_for_labs_settings_if_disables(self, app):
334 with mock.patch.dict(
334 with mock.patch.dict(
335 rhodecode.CONFIG, {'labs_settings_active': 'false'}):
335 rhodecode.CONFIG, {'labs_settings_active': 'false'}):
336 response = self.app.get(url('admin_settings_vcs'))
336 response = self.app.get(url('admin_settings_vcs'))
337 response.mustcontain(no='Labs Settings')
337 response.mustcontain(no='Labs Settings')
338
338
339 @pytest.mark.parametrize('new_value', [True, False])
339 @pytest.mark.parametrize('new_value', [True, False])
340 def test_allows_to_change_hg_rebase_merge_strategy(
340 def test_allows_to_change_hg_rebase_merge_strategy(
341 self, app, form_defaults, csrf_token, new_value):
341 self, app, form_defaults, csrf_token, new_value):
342 setting_key = 'hg_use_rebase_for_merging'
342 setting_key = 'hg_use_rebase_for_merging'
343
343
344 form_defaults.update({
344 form_defaults.update({
345 'csrf_token': csrf_token,
345 'csrf_token': csrf_token,
346 'rhodecode_' + setting_key: str(new_value),
346 'rhodecode_' + setting_key: str(new_value),
347 })
347 })
348
348
349 with mock.patch.dict(
349 with mock.patch.dict(
350 rhodecode.CONFIG, {'labs_settings_active': 'true'}):
350 rhodecode.CONFIG, {'labs_settings_active': 'true'}):
351 app.post(
351 app.post(
352 url('admin_settings_vcs'),
352 url('admin_settings_vcs'),
353 params=form_defaults,
353 params=form_defaults,
354 status=302)
354 status=302)
355
355
356 setting = SettingsModel().get_setting_by_name(setting_key)
356 setting = SettingsModel().get_setting_by_name(setting_key)
357 assert setting.app_settings_value is new_value
357 assert setting.app_settings_value is new_value
358
358
359 @pytest.fixture
359 @pytest.fixture
360 def disable_sql_cache(self, request):
360 def disable_sql_cache(self, request):
361 patcher = mock.patch(
361 patcher = mock.patch(
362 'rhodecode.lib.caching_query.FromCache.process_query')
362 'rhodecode.lib.caching_query.FromCache.process_query')
363 request.addfinalizer(patcher.stop)
363 request.addfinalizer(patcher.stop)
364 patcher.start()
364 patcher.start()
365
365
366 @pytest.fixture
366 @pytest.fixture
367 def form_defaults(self):
367 def form_defaults(self):
368 from rhodecode.controllers.admin.settings import SettingsController
368 from rhodecode.controllers.admin.settings import SettingsController
369 controller = SettingsController()
369 controller = SettingsController()
370 return controller._form_defaults()
370 return controller._form_defaults()
371
371
372 # TODO: johbo: What we really want is to checkpoint before a test run and
372 # TODO: johbo: What we really want is to checkpoint before a test run and
373 # reset the session afterwards.
373 # reset the session afterwards.
374 @pytest.fixture(scope='class', autouse=True)
374 @pytest.fixture(scope='class', autouse=True)
375 def cleanup_settings(self, request, pylonsapp):
375 def cleanup_settings(self, request, pylonsapp):
376 ui_id = RhodeCodeUi.ui_id
376 ui_id = RhodeCodeUi.ui_id
377 original_ids = list(
377 original_ids = list(
378 r.ui_id for r in RhodeCodeUi.query().values(ui_id))
378 r.ui_id for r in RhodeCodeUi.query().values(ui_id))
379
379
380 @request.addfinalizer
380 @request.addfinalizer
381 def cleanup():
381 def cleanup():
382 RhodeCodeUi.query().filter(
382 RhodeCodeUi.query().filter(
383 ui_id.notin_(original_ids)).delete(False)
383 ui_id.notin_(original_ids)).delete(False)
384
384
385
385
386 @pytest.mark.usefixtures('autologin_user', 'app')
386 @pytest.mark.usefixtures('autologin_user', 'app')
387 class TestLabsSettings(object):
387 class TestLabsSettings(object):
388 def test_get_settings_page_disabled(self):
388 def test_get_settings_page_disabled(self):
389 with mock.patch.dict(rhodecode.CONFIG,
389 with mock.patch.dict(rhodecode.CONFIG,
390 {'labs_settings_active': 'false'}):
390 {'labs_settings_active': 'false'}):
391 response = self.app.get(url('admin_settings_labs'), status=302)
391 response = self.app.get(url('admin_settings_labs'), status=302)
392
392
393 assert response.location.endswith(url('admin_settings'))
393 assert response.location.endswith(url('admin_settings'))
394
394
395 def test_get_settings_page_enabled(self):
395 def test_get_settings_page_enabled(self):
396 from rhodecode.controllers.admin import settings
396 from rhodecode.controllers.admin import settings
397 lab_settings = [
397 lab_settings = [
398 settings.LabSetting(
398 settings.LabSetting(
399 key='rhodecode_bool',
399 key='rhodecode_bool',
400 type='bool',
400 type='bool',
401 group='bool group',
401 group='bool group',
402 label='bool label',
402 label='bool label',
403 help='bool help'
403 help='bool help'
404 ),
404 ),
405 settings.LabSetting(
405 settings.LabSetting(
406 key='rhodecode_text',
406 key='rhodecode_text',
407 type='unicode',
407 type='unicode',
408 group='text group',
408 group='text group',
409 label='text label',
409 label='text label',
410 help='text help'
410 help='text help'
411 ),
411 ),
412 ]
412 ]
413 with mock.patch.dict(rhodecode.CONFIG,
413 with mock.patch.dict(rhodecode.CONFIG,
414 {'labs_settings_active': 'true'}):
414 {'labs_settings_active': 'true'}):
415 with mock.patch.object(settings, '_LAB_SETTINGS', lab_settings):
415 with mock.patch.object(settings, '_LAB_SETTINGS', lab_settings):
416 response = self.app.get(url('admin_settings_labs'))
416 response = self.app.get(url('admin_settings_labs'))
417
417
418 assert '<label>bool group:</label>' in response
418 assert '<label>bool group:</label>' in response
419 assert '<label for="rhodecode_bool">bool label</label>' in response
419 assert '<label for="rhodecode_bool">bool label</label>' in response
420 assert '<p class="help-block">bool help</p>' in response
420 assert '<p class="help-block">bool help</p>' in response
421 assert 'name="rhodecode_bool" type="checkbox"' in response
421 assert 'name="rhodecode_bool" type="checkbox"' in response
422
422
423 assert '<label>text group:</label>' in response
423 assert '<label>text group:</label>' in response
424 assert '<label for="rhodecode_text">text label</label>' in response
424 assert '<label for="rhodecode_text">text label</label>' in response
425 assert '<p class="help-block">text help</p>' in response
425 assert '<p class="help-block">text help</p>' in response
426 assert 'name="rhodecode_text" size="60" type="text"' in response
426 assert 'name="rhodecode_text" size="60" type="text"' in response
427
427
428
428
429 @pytest.mark.usefixtures('app')
429 @pytest.mark.usefixtures('app')
430 class TestOpenSourceLicenses(object):
430 class TestOpenSourceLicenses(object):
431
431
432 def _get_url(self):
432 def _get_url(self):
433 return ADMIN_PREFIX + '/settings/open_source'
433 return ADMIN_PREFIX + '/settings/open_source'
434
434
435 def test_records_are_displayed(self, autologin_user):
435 def test_records_are_displayed(self, autologin_user):
436 sample_licenses = {
436 sample_licenses = {
437 "python2.7-pytest-2.7.1": {
437 "python2.7-pytest-2.7.1": {
438 "UNKNOWN": None
438 "UNKNOWN": None
439 },
439 },
440 "python2.7-Markdown-2.6.2": {
440 "python2.7-Markdown-2.6.2": {
441 "BSD-3-Clause": "http://spdx.org/licenses/BSD-3-Clause"
441 "BSD-3-Clause": "http://spdx.org/licenses/BSD-3-Clause"
442 }
442 }
443 }
443 }
444 read_licenses_patch = mock.patch(
444 read_licenses_patch = mock.patch(
445 'rhodecode.apps.admin.views.open_source_licenses.read_opensource_licenses',
445 'rhodecode.apps.admin.views.open_source_licenses.read_opensource_licenses',
446 return_value=sample_licenses)
446 return_value=sample_licenses)
447 with read_licenses_patch:
447 with read_licenses_patch:
448 response = self.app.get(self._get_url(), status=200)
448 response = self.app.get(self._get_url(), status=200)
449
449
450 assert_response = AssertResponse(response)
450 assert_response = AssertResponse(response)
451 assert_response.element_contains(
451 assert_response.element_contains(
452 '.panel-heading', 'Licenses of Third Party Packages')
452 '.panel-heading', 'Licenses of Third Party Packages')
453 for name in sample_licenses:
453 for name in sample_licenses:
454 response.mustcontain(name)
454 response.mustcontain(name)
455 for license in sample_licenses[name]:
455 for license in sample_licenses[name]:
456 assert_response.element_contains('.panel-body', license)
456 assert_response.element_contains('.panel-body', license)
457
457
458 def test_records_can_be_read(self, autologin_user):
458 def test_records_can_be_read(self, autologin_user):
459 response = self.app.get(self._get_url(), status=200)
459 response = self.app.get(self._get_url(), status=200)
460 assert_response = AssertResponse(response)
460 assert_response = AssertResponse(response)
461 assert_response.element_contains(
461 assert_response.element_contains(
462 '.panel-heading', 'Licenses of Third Party Packages')
462 '.panel-heading', 'Licenses of Third Party Packages')
463
463
464 def test_forbidden_when_normal_user(self, autologin_regular_user):
464 def test_forbidden_when_normal_user(self, autologin_regular_user):
465 self.app.get(self._get_url(), status=403)
465 self.app.get(self._get_url(), status=404)
466
466
467
467
468 @pytest.mark.usefixtures('app')
468 @pytest.mark.usefixtures('app')
469 class TestUserSessions(object):
469 class TestUserSessions(object):
470
470
471 def _get_url(self, name='admin_settings_sessions'):
471 def _get_url(self, name='admin_settings_sessions'):
472 return {
472 return {
473 'admin_settings_sessions': ADMIN_PREFIX + '/settings/sessions',
473 'admin_settings_sessions': ADMIN_PREFIX + '/settings/sessions',
474 'admin_settings_sessions_cleanup': ADMIN_PREFIX + '/settings/sessions/cleanup'
474 'admin_settings_sessions_cleanup': ADMIN_PREFIX + '/settings/sessions/cleanup'
475 }[name]
475 }[name]
476
476
477 def test_forbidden_when_normal_user(self, autologin_regular_user):
477 def test_forbidden_when_normal_user(self, autologin_regular_user):
478 self.app.get(self._get_url(), status=403)
478 self.app.get(self._get_url(), status=404)
479
479
480 def test_show_sessions_page(self, autologin_user):
480 def test_show_sessions_page(self, autologin_user):
481 response = self.app.get(self._get_url(), status=200)
481 response = self.app.get(self._get_url(), status=200)
482 response.mustcontain('file')
482 response.mustcontain('file')
483
483
484 def test_cleanup_old_sessions(self, autologin_user, csrf_token):
484 def test_cleanup_old_sessions(self, autologin_user, csrf_token):
485
485
486 post_data = {
486 post_data = {
487 'csrf_token': csrf_token,
487 'csrf_token': csrf_token,
488 'expire_days': '60'
488 'expire_days': '60'
489 }
489 }
490 response = self.app.post(
490 response = self.app.post(
491 self._get_url('admin_settings_sessions_cleanup'), params=post_data,
491 self._get_url('admin_settings_sessions_cleanup'), params=post_data,
492 status=302)
492 status=302)
493 assert_session_flash(response, 'Cleaned up old sessions')
493 assert_session_flash(response, 'Cleaned up old sessions')
494
494
495
495
496 @pytest.mark.usefixtures('app')
496 @pytest.mark.usefixtures('app')
497 class TestAdminSystemInfo(object):
497 class TestAdminSystemInfo(object):
498 def _get_url(self, name='admin_settings_system'):
498 def _get_url(self, name='admin_settings_system'):
499 return {
499 return {
500 'admin_settings_system': ADMIN_PREFIX + '/settings/system',
500 'admin_settings_system': ADMIN_PREFIX + '/settings/system',
501 'admin_settings_system_update': ADMIN_PREFIX + '/settings/system/updates',
501 'admin_settings_system_update': ADMIN_PREFIX + '/settings/system/updates',
502 }[name]
502 }[name]
503
503
504 def test_forbidden_when_normal_user(self, autologin_regular_user):
504 def test_forbidden_when_normal_user(self, autologin_regular_user):
505 self.app.get(self._get_url(), status=403)
505 self.app.get(self._get_url(), status=404)
506
506
507 def test_system_info_page(self, autologin_user):
507 def test_system_info_page(self, autologin_user):
508 response = self.app.get(self._get_url())
508 response = self.app.get(self._get_url())
509 response.mustcontain('RhodeCode Community Edition, version {}'.format(
509 response.mustcontain('RhodeCode Community Edition, version {}'.format(
510 rhodecode.__version__))
510 rhodecode.__version__))
511
511
512 def test_system_update_new_version(self, autologin_user):
512 def test_system_update_new_version(self, autologin_user):
513 update_data = {
513 update_data = {
514 'versions': [
514 'versions': [
515 {
515 {
516 'version': '100.3.1415926535',
516 'version': '100.3.1415926535',
517 'general': 'The latest version we are ever going to ship'
517 'general': 'The latest version we are ever going to ship'
518 },
518 },
519 {
519 {
520 'version': '0.0.0',
520 'version': '0.0.0',
521 'general': 'The first version we ever shipped'
521 'general': 'The first version we ever shipped'
522 }
522 }
523 ]
523 ]
524 }
524 }
525 with mock.patch(UPDATE_DATA_QUALNAME, return_value=update_data):
525 with mock.patch(UPDATE_DATA_QUALNAME, return_value=update_data):
526 response = self.app.get(self._get_url('admin_settings_system_update'))
526 response = self.app.get(self._get_url('admin_settings_system_update'))
527 response.mustcontain('A <b>new version</b> is available')
527 response.mustcontain('A <b>new version</b> is available')
528
528
529 def test_system_update_nothing_new(self, autologin_user):
529 def test_system_update_nothing_new(self, autologin_user):
530 update_data = {
530 update_data = {
531 'versions': [
531 'versions': [
532 {
532 {
533 'version': '0.0.0',
533 'version': '0.0.0',
534 'general': 'The first version we ever shipped'
534 'general': 'The first version we ever shipped'
535 }
535 }
536 ]
536 ]
537 }
537 }
538 with mock.patch(UPDATE_DATA_QUALNAME, return_value=update_data):
538 with mock.patch(UPDATE_DATA_QUALNAME, return_value=update_data):
539 response = self.app.get(self._get_url('admin_settings_system_update'))
539 response = self.app.get(self._get_url('admin_settings_system_update'))
540 response.mustcontain(
540 response.mustcontain(
541 'You already have the <b>latest</b> stable version.')
541 'You already have the <b>latest</b> stable version.')
542
542
543 def test_system_update_bad_response(self, autologin_user):
543 def test_system_update_bad_response(self, autologin_user):
544 with mock.patch(UPDATE_DATA_QUALNAME, side_effect=ValueError('foo')):
544 with mock.patch(UPDATE_DATA_QUALNAME, side_effect=ValueError('foo')):
545 response = self.app.get(self._get_url('admin_settings_system_update'))
545 response = self.app.get(self._get_url('admin_settings_system_update'))
546 response.mustcontain(
546 response.mustcontain(
547 'Bad data sent from update server')
547 'Bad data sent from update server')
548
548
549
549
550 @pytest.mark.usefixtures("app")
550 @pytest.mark.usefixtures("app")
551 class TestAdminSettingsIssueTracker(object):
551 class TestAdminSettingsIssueTracker(object):
552 RC_PREFIX = 'rhodecode_'
552 RC_PREFIX = 'rhodecode_'
553 SHORT_PATTERN_KEY = 'issuetracker_pat_'
553 SHORT_PATTERN_KEY = 'issuetracker_pat_'
554 PATTERN_KEY = RC_PREFIX + SHORT_PATTERN_KEY
554 PATTERN_KEY = RC_PREFIX + SHORT_PATTERN_KEY
555
555
556 def test_issuetracker_index(self, autologin_user):
556 def test_issuetracker_index(self, autologin_user):
557 response = self.app.get(url('admin_settings_issuetracker'))
557 response = self.app.get(url('admin_settings_issuetracker'))
558 assert response.status_code == 200
558 assert response.status_code == 200
559
559
560 def test_add_empty_issuetracker_pattern(
560 def test_add_empty_issuetracker_pattern(
561 self, request, autologin_user, csrf_token):
561 self, request, autologin_user, csrf_token):
562 post_url = url('admin_settings_issuetracker_save')
562 post_url = url('admin_settings_issuetracker_save')
563 post_data = {
563 post_data = {
564 'csrf_token': csrf_token
564 'csrf_token': csrf_token
565 }
565 }
566 self.app.post(post_url, post_data, status=302)
566 self.app.post(post_url, post_data, status=302)
567
567
568 def test_add_issuetracker_pattern(
568 def test_add_issuetracker_pattern(
569 self, request, autologin_user, csrf_token):
569 self, request, autologin_user, csrf_token):
570 pattern = 'issuetracker_pat'
570 pattern = 'issuetracker_pat'
571 another_pattern = pattern+'1'
571 another_pattern = pattern+'1'
572 post_url = url('admin_settings_issuetracker_save')
572 post_url = url('admin_settings_issuetracker_save')
573 post_data = {
573 post_data = {
574 'new_pattern_pattern_0': pattern,
574 'new_pattern_pattern_0': pattern,
575 'new_pattern_url_0': 'url',
575 'new_pattern_url_0': 'url',
576 'new_pattern_prefix_0': 'prefix',
576 'new_pattern_prefix_0': 'prefix',
577 'new_pattern_description_0': 'description',
577 'new_pattern_description_0': 'description',
578 'new_pattern_pattern_1': another_pattern,
578 'new_pattern_pattern_1': another_pattern,
579 'new_pattern_url_1': 'url1',
579 'new_pattern_url_1': 'url1',
580 'new_pattern_prefix_1': 'prefix1',
580 'new_pattern_prefix_1': 'prefix1',
581 'new_pattern_description_1': 'description1',
581 'new_pattern_description_1': 'description1',
582 'csrf_token': csrf_token
582 'csrf_token': csrf_token
583 }
583 }
584 self.app.post(post_url, post_data, status=302)
584 self.app.post(post_url, post_data, status=302)
585 settings = SettingsModel().get_all_settings()
585 settings = SettingsModel().get_all_settings()
586 self.uid = md5(pattern)
586 self.uid = md5(pattern)
587 assert settings[self.PATTERN_KEY+self.uid] == pattern
587 assert settings[self.PATTERN_KEY+self.uid] == pattern
588 self.another_uid = md5(another_pattern)
588 self.another_uid = md5(another_pattern)
589 assert settings[self.PATTERN_KEY+self.another_uid] == another_pattern
589 assert settings[self.PATTERN_KEY+self.another_uid] == another_pattern
590
590
591 @request.addfinalizer
591 @request.addfinalizer
592 def cleanup():
592 def cleanup():
593 defaults = SettingsModel().get_all_settings()
593 defaults = SettingsModel().get_all_settings()
594
594
595 entries = [name for name in defaults if (
595 entries = [name for name in defaults if (
596 (self.uid in name) or (self.another_uid) in name)]
596 (self.uid in name) or (self.another_uid) in name)]
597 start = len(self.RC_PREFIX)
597 start = len(self.RC_PREFIX)
598 for del_key in entries:
598 for del_key in entries:
599 # TODO: anderson: get_by_name needs name without prefix
599 # TODO: anderson: get_by_name needs name without prefix
600 entry = SettingsModel().get_setting_by_name(del_key[start:])
600 entry = SettingsModel().get_setting_by_name(del_key[start:])
601 Session().delete(entry)
601 Session().delete(entry)
602
602
603 Session().commit()
603 Session().commit()
604
604
605 def test_edit_issuetracker_pattern(
605 def test_edit_issuetracker_pattern(
606 self, autologin_user, backend, csrf_token, request):
606 self, autologin_user, backend, csrf_token, request):
607 old_pattern = 'issuetracker_pat'
607 old_pattern = 'issuetracker_pat'
608 old_uid = md5(old_pattern)
608 old_uid = md5(old_pattern)
609 pattern = 'issuetracker_pat_new'
609 pattern = 'issuetracker_pat_new'
610 self.new_uid = md5(pattern)
610 self.new_uid = md5(pattern)
611
611
612 SettingsModel().create_or_update_setting(
612 SettingsModel().create_or_update_setting(
613 self.SHORT_PATTERN_KEY+old_uid, old_pattern, 'unicode')
613 self.SHORT_PATTERN_KEY+old_uid, old_pattern, 'unicode')
614
614
615 post_url = url('admin_settings_issuetracker_save')
615 post_url = url('admin_settings_issuetracker_save')
616 post_data = {
616 post_data = {
617 'new_pattern_pattern_0': pattern,
617 'new_pattern_pattern_0': pattern,
618 'new_pattern_url_0': 'url',
618 'new_pattern_url_0': 'url',
619 'new_pattern_prefix_0': 'prefix',
619 'new_pattern_prefix_0': 'prefix',
620 'new_pattern_description_0': 'description',
620 'new_pattern_description_0': 'description',
621 'uid': old_uid,
621 'uid': old_uid,
622 'csrf_token': csrf_token
622 'csrf_token': csrf_token
623 }
623 }
624 self.app.post(post_url, post_data, status=302)
624 self.app.post(post_url, post_data, status=302)
625 settings = SettingsModel().get_all_settings()
625 settings = SettingsModel().get_all_settings()
626 assert settings[self.PATTERN_KEY+self.new_uid] == pattern
626 assert settings[self.PATTERN_KEY+self.new_uid] == pattern
627 assert self.PATTERN_KEY+old_uid not in settings
627 assert self.PATTERN_KEY+old_uid not in settings
628
628
629 @request.addfinalizer
629 @request.addfinalizer
630 def cleanup():
630 def cleanup():
631 IssueTrackerSettingsModel().delete_entries(self.new_uid)
631 IssueTrackerSettingsModel().delete_entries(self.new_uid)
632
632
633 def test_replace_issuetracker_pattern_description(
633 def test_replace_issuetracker_pattern_description(
634 self, autologin_user, csrf_token, request, settings_util):
634 self, autologin_user, csrf_token, request, settings_util):
635 prefix = 'issuetracker'
635 prefix = 'issuetracker'
636 pattern = 'issuetracker_pat'
636 pattern = 'issuetracker_pat'
637 self.uid = md5(pattern)
637 self.uid = md5(pattern)
638 pattern_key = '_'.join([prefix, 'pat', self.uid])
638 pattern_key = '_'.join([prefix, 'pat', self.uid])
639 rc_pattern_key = '_'.join(['rhodecode', pattern_key])
639 rc_pattern_key = '_'.join(['rhodecode', pattern_key])
640 desc_key = '_'.join([prefix, 'desc', self.uid])
640 desc_key = '_'.join([prefix, 'desc', self.uid])
641 rc_desc_key = '_'.join(['rhodecode', desc_key])
641 rc_desc_key = '_'.join(['rhodecode', desc_key])
642 new_description = 'new_description'
642 new_description = 'new_description'
643
643
644 settings_util.create_rhodecode_setting(
644 settings_util.create_rhodecode_setting(
645 pattern_key, pattern, 'unicode', cleanup=False)
645 pattern_key, pattern, 'unicode', cleanup=False)
646 settings_util.create_rhodecode_setting(
646 settings_util.create_rhodecode_setting(
647 desc_key, 'old description', 'unicode', cleanup=False)
647 desc_key, 'old description', 'unicode', cleanup=False)
648
648
649 post_url = url('admin_settings_issuetracker_save')
649 post_url = url('admin_settings_issuetracker_save')
650 post_data = {
650 post_data = {
651 'new_pattern_pattern_0': pattern,
651 'new_pattern_pattern_0': pattern,
652 'new_pattern_url_0': 'url',
652 'new_pattern_url_0': 'url',
653 'new_pattern_prefix_0': 'prefix',
653 'new_pattern_prefix_0': 'prefix',
654 'new_pattern_description_0': new_description,
654 'new_pattern_description_0': new_description,
655 'uid': self.uid,
655 'uid': self.uid,
656 'csrf_token': csrf_token
656 'csrf_token': csrf_token
657 }
657 }
658 self.app.post(post_url, post_data, status=302)
658 self.app.post(post_url, post_data, status=302)
659 settings = SettingsModel().get_all_settings()
659 settings = SettingsModel().get_all_settings()
660 assert settings[rc_pattern_key] == pattern
660 assert settings[rc_pattern_key] == pattern
661 assert settings[rc_desc_key] == new_description
661 assert settings[rc_desc_key] == new_description
662
662
663 @request.addfinalizer
663 @request.addfinalizer
664 def cleanup():
664 def cleanup():
665 IssueTrackerSettingsModel().delete_entries(self.uid)
665 IssueTrackerSettingsModel().delete_entries(self.uid)
666
666
667 def test_delete_issuetracker_pattern(
667 def test_delete_issuetracker_pattern(
668 self, autologin_user, backend, csrf_token, settings_util):
668 self, autologin_user, backend, csrf_token, settings_util):
669 pattern = 'issuetracker_pat'
669 pattern = 'issuetracker_pat'
670 uid = md5(pattern)
670 uid = md5(pattern)
671 settings_util.create_rhodecode_setting(
671 settings_util.create_rhodecode_setting(
672 self.SHORT_PATTERN_KEY+uid, pattern, 'unicode', cleanup=False)
672 self.SHORT_PATTERN_KEY+uid, pattern, 'unicode', cleanup=False)
673
673
674 post_url = url('admin_issuetracker_delete')
674 post_url = url('admin_issuetracker_delete')
675 post_data = {
675 post_data = {
676 '_method': 'delete',
676 '_method': 'delete',
677 'uid': uid,
677 'uid': uid,
678 'csrf_token': csrf_token
678 'csrf_token': csrf_token
679 }
679 }
680 self.app.post(post_url, post_data, status=302)
680 self.app.post(post_url, post_data, status=302)
681 settings = SettingsModel().get_all_settings()
681 settings = SettingsModel().get_all_settings()
682 assert 'rhodecode_%s%s' % (self.SHORT_PATTERN_KEY, uid) not in settings
682 assert 'rhodecode_%s%s' % (self.SHORT_PATTERN_KEY, uid) not in settings
@@ -1,273 +1,273 b''
1 # -*- coding: utf-8 -*-
1 # -*- coding: utf-8 -*-
2
2
3 # Copyright (C) 2010-2017 RhodeCode GmbH
3 # Copyright (C) 2010-2017 RhodeCode GmbH
4 #
4 #
5 # This program is free software: you can redistribute it and/or modify
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU Affero General Public License, version 3
6 # it under the terms of the GNU Affero General Public License, version 3
7 # (only), as published by the Free Software Foundation.
7 # (only), as published by the Free Software Foundation.
8 #
8 #
9 # This program is distributed in the hope that it will be useful,
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
12 # GNU General Public License for more details.
13 #
13 #
14 # You should have received a copy of the GNU Affero General Public License
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 #
16 #
17 # This program is dual-licensed. If you wish to learn more about the
17 # This program is dual-licensed. If you wish to learn more about the
18 # RhodeCode Enterprise Edition, including its added features, Support services,
18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20
20
21 import pytest
21 import pytest
22
22
23 from rhodecode.tests import *
23 from rhodecode.tests import *
24 from rhodecode.tests.fixture import Fixture
24 from rhodecode.tests.fixture import Fixture
25 from rhodecode.lib import helpers as h
25 from rhodecode.lib import helpers as h
26
26
27 from rhodecode.model.db import Repository
27 from rhodecode.model.db import Repository
28 from rhodecode.model.repo import RepoModel
28 from rhodecode.model.repo import RepoModel
29 from rhodecode.model.user import UserModel
29 from rhodecode.model.user import UserModel
30 from rhodecode.model.meta import Session
30 from rhodecode.model.meta import Session
31
31
32 fixture = Fixture()
32 fixture = Fixture()
33
33
34
34
35 class _BaseTest(TestController):
35 class _BaseTest(TestController):
36
36
37 REPO = None
37 REPO = None
38 REPO_TYPE = None
38 REPO_TYPE = None
39 NEW_REPO = None
39 NEW_REPO = None
40 REPO_FORK = None
40 REPO_FORK = None
41
41
42 @pytest.fixture(autouse=True)
42 @pytest.fixture(autouse=True)
43 def prepare(self, request, pylonsapp):
43 def prepare(self, request, pylonsapp):
44 self.username = u'forkuser'
44 self.username = u'forkuser'
45 self.password = u'qweqwe'
45 self.password = u'qweqwe'
46 self.u1 = fixture.create_user(self.username, password=self.password,
46 self.u1 = fixture.create_user(self.username, password=self.password,
47 email=u'fork_king@rhodecode.org')
47 email=u'fork_king@rhodecode.org')
48 Session().commit()
48 Session().commit()
49 self.u1id = self.u1.user_id
49 self.u1id = self.u1.user_id
50 request.addfinalizer(self.cleanup)
50 request.addfinalizer(self.cleanup)
51
51
52 def cleanup(self):
52 def cleanup(self):
53 u1 = UserModel().get(self.u1id)
53 u1 = UserModel().get(self.u1id)
54 Session().delete(u1)
54 Session().delete(u1)
55 Session().commit()
55 Session().commit()
56
56
57 def test_index(self):
57 def test_index(self):
58 self.log_user()
58 self.log_user()
59 repo_name = self.REPO
59 repo_name = self.REPO
60 response = self.app.get(url(controller='forks', action='forks',
60 response = self.app.get(url(controller='forks', action='forks',
61 repo_name=repo_name))
61 repo_name=repo_name))
62
62
63 response.mustcontain("""There are no forks yet""")
63 response.mustcontain("""There are no forks yet""")
64
64
65 def test_no_permissions_to_fork(self):
65 def test_no_permissions_to_fork(self):
66 usr = self.log_user(TEST_USER_REGULAR_LOGIN,
66 usr = self.log_user(TEST_USER_REGULAR_LOGIN,
67 TEST_USER_REGULAR_PASS)['user_id']
67 TEST_USER_REGULAR_PASS)['user_id']
68 user_model = UserModel()
68 user_model = UserModel()
69 user_model.revoke_perm(usr, 'hg.fork.repository')
69 user_model.revoke_perm(usr, 'hg.fork.repository')
70 user_model.grant_perm(usr, 'hg.fork.none')
70 user_model.grant_perm(usr, 'hg.fork.none')
71 u = UserModel().get(usr)
71 u = UserModel().get(usr)
72 u.inherit_default_permissions = False
72 u.inherit_default_permissions = False
73 Session().commit()
73 Session().commit()
74 # try create a fork
74 # try create a fork
75 repo_name = self.REPO
75 repo_name = self.REPO
76 self.app.post(
76 self.app.post(
77 url(controller='forks', action='fork_create', repo_name=repo_name),
77 url(controller='forks', action='fork_create', repo_name=repo_name),
78 {'csrf_token': self.csrf_token}, status=403)
78 {'csrf_token': self.csrf_token}, status=404)
79
79
80 def test_index_with_fork(self):
80 def test_index_with_fork(self):
81 self.log_user()
81 self.log_user()
82
82
83 # create a fork
83 # create a fork
84 fork_name = self.REPO_FORK
84 fork_name = self.REPO_FORK
85 description = 'fork of vcs test'
85 description = 'fork of vcs test'
86 repo_name = self.REPO
86 repo_name = self.REPO
87 source_repo = Repository.get_by_repo_name(repo_name)
87 source_repo = Repository.get_by_repo_name(repo_name)
88 creation_args = {
88 creation_args = {
89 'repo_name': fork_name,
89 'repo_name': fork_name,
90 'repo_group': '',
90 'repo_group': '',
91 'fork_parent_id': source_repo.repo_id,
91 'fork_parent_id': source_repo.repo_id,
92 'repo_type': self.REPO_TYPE,
92 'repo_type': self.REPO_TYPE,
93 'description': description,
93 'description': description,
94 'private': 'False',
94 'private': 'False',
95 'landing_rev': 'rev:tip',
95 'landing_rev': 'rev:tip',
96 'csrf_token': self.csrf_token,
96 'csrf_token': self.csrf_token,
97 }
97 }
98
98
99 self.app.post(url(controller='forks', action='fork_create',
99 self.app.post(url(controller='forks', action='fork_create',
100 repo_name=repo_name), creation_args)
100 repo_name=repo_name), creation_args)
101
101
102 response = self.app.get(url(controller='forks', action='forks',
102 response = self.app.get(url(controller='forks', action='forks',
103 repo_name=repo_name))
103 repo_name=repo_name))
104
104
105 response.mustcontain(
105 response.mustcontain(
106 """<a href="/%s">%s</a>""" % (fork_name, fork_name)
106 """<a href="/%s">%s</a>""" % (fork_name, fork_name)
107 )
107 )
108
108
109 # remove this fork
109 # remove this fork
110 fixture.destroy_repo(fork_name)
110 fixture.destroy_repo(fork_name)
111
111
112 def test_fork_create_into_group(self):
112 def test_fork_create_into_group(self):
113 self.log_user()
113 self.log_user()
114 group = fixture.create_repo_group('vc')
114 group = fixture.create_repo_group('vc')
115 group_id = group.group_id
115 group_id = group.group_id
116 fork_name = self.REPO_FORK
116 fork_name = self.REPO_FORK
117 fork_name_full = 'vc/%s' % fork_name
117 fork_name_full = 'vc/%s' % fork_name
118 description = 'fork of vcs test'
118 description = 'fork of vcs test'
119 repo_name = self.REPO
119 repo_name = self.REPO
120 source_repo = Repository.get_by_repo_name(repo_name)
120 source_repo = Repository.get_by_repo_name(repo_name)
121 creation_args = {
121 creation_args = {
122 'repo_name': fork_name,
122 'repo_name': fork_name,
123 'repo_group': group_id,
123 'repo_group': group_id,
124 'fork_parent_id': source_repo.repo_id,
124 'fork_parent_id': source_repo.repo_id,
125 'repo_type': self.REPO_TYPE,
125 'repo_type': self.REPO_TYPE,
126 'description': description,
126 'description': description,
127 'private': 'False',
127 'private': 'False',
128 'landing_rev': 'rev:tip',
128 'landing_rev': 'rev:tip',
129 'csrf_token': self.csrf_token,
129 'csrf_token': self.csrf_token,
130 }
130 }
131 self.app.post(url(controller='forks', action='fork_create',
131 self.app.post(url(controller='forks', action='fork_create',
132 repo_name=repo_name), creation_args)
132 repo_name=repo_name), creation_args)
133 repo = Repository.get_by_repo_name(fork_name_full)
133 repo = Repository.get_by_repo_name(fork_name_full)
134 assert repo.fork.repo_name == self.REPO
134 assert repo.fork.repo_name == self.REPO
135
135
136 # run the check page that triggers the flash message
136 # run the check page that triggers the flash message
137 response = self.app.get(url('repo_check_home', repo_name=fork_name_full))
137 response = self.app.get(url('repo_check_home', repo_name=fork_name_full))
138 # test if we have a message that fork is ok
138 # test if we have a message that fork is ok
139 assert_session_flash(response,
139 assert_session_flash(response,
140 'Forked repository %s as <a href="/%s">%s</a>'
140 'Forked repository %s as <a href="/%s">%s</a>'
141 % (repo_name, fork_name_full, fork_name_full))
141 % (repo_name, fork_name_full, fork_name_full))
142
142
143 # test if the fork was created in the database
143 # test if the fork was created in the database
144 fork_repo = Session().query(Repository)\
144 fork_repo = Session().query(Repository)\
145 .filter(Repository.repo_name == fork_name_full).one()
145 .filter(Repository.repo_name == fork_name_full).one()
146
146
147 assert fork_repo.repo_name == fork_name_full
147 assert fork_repo.repo_name == fork_name_full
148 assert fork_repo.fork.repo_name == repo_name
148 assert fork_repo.fork.repo_name == repo_name
149
149
150 # test if the repository is visible in the list ?
150 # test if the repository is visible in the list ?
151 response = self.app.get(h.route_path('repo_summary', repo_name=fork_name_full))
151 response = self.app.get(h.route_path('repo_summary', repo_name=fork_name_full))
152 response.mustcontain(fork_name_full)
152 response.mustcontain(fork_name_full)
153 response.mustcontain(self.REPO_TYPE)
153 response.mustcontain(self.REPO_TYPE)
154
154
155 response.mustcontain('Fork of')
155 response.mustcontain('Fork of')
156 response.mustcontain('<a href="/%s">%s</a>' % (repo_name, repo_name))
156 response.mustcontain('<a href="/%s">%s</a>' % (repo_name, repo_name))
157
157
158 fixture.destroy_repo(fork_name_full)
158 fixture.destroy_repo(fork_name_full)
159 fixture.destroy_repo_group(group_id)
159 fixture.destroy_repo_group(group_id)
160
160
161 def test_z_fork_create(self):
161 def test_z_fork_create(self):
162 self.log_user()
162 self.log_user()
163 fork_name = self.REPO_FORK
163 fork_name = self.REPO_FORK
164 description = 'fork of vcs test'
164 description = 'fork of vcs test'
165 repo_name = self.REPO
165 repo_name = self.REPO
166 source_repo = Repository.get_by_repo_name(repo_name)
166 source_repo = Repository.get_by_repo_name(repo_name)
167 creation_args = {
167 creation_args = {
168 'repo_name': fork_name,
168 'repo_name': fork_name,
169 'repo_group': '',
169 'repo_group': '',
170 'fork_parent_id': source_repo.repo_id,
170 'fork_parent_id': source_repo.repo_id,
171 'repo_type': self.REPO_TYPE,
171 'repo_type': self.REPO_TYPE,
172 'description': description,
172 'description': description,
173 'private': 'False',
173 'private': 'False',
174 'landing_rev': 'rev:tip',
174 'landing_rev': 'rev:tip',
175 'csrf_token': self.csrf_token,
175 'csrf_token': self.csrf_token,
176 }
176 }
177 self.app.post(url(controller='forks', action='fork_create',
177 self.app.post(url(controller='forks', action='fork_create',
178 repo_name=repo_name), creation_args)
178 repo_name=repo_name), creation_args)
179 repo = Repository.get_by_repo_name(self.REPO_FORK)
179 repo = Repository.get_by_repo_name(self.REPO_FORK)
180 assert repo.fork.repo_name == self.REPO
180 assert repo.fork.repo_name == self.REPO
181
181
182 # run the check page that triggers the flash message
182 # run the check page that triggers the flash message
183 response = self.app.get(url('repo_check_home', repo_name=fork_name))
183 response = self.app.get(url('repo_check_home', repo_name=fork_name))
184 # test if we have a message that fork is ok
184 # test if we have a message that fork is ok
185 assert_session_flash(response,
185 assert_session_flash(response,
186 'Forked repository %s as <a href="/%s">%s</a>'
186 'Forked repository %s as <a href="/%s">%s</a>'
187 % (repo_name, fork_name, fork_name))
187 % (repo_name, fork_name, fork_name))
188
188
189 # test if the fork was created in the database
189 # test if the fork was created in the database
190 fork_repo = Session().query(Repository)\
190 fork_repo = Session().query(Repository)\
191 .filter(Repository.repo_name == fork_name).one()
191 .filter(Repository.repo_name == fork_name).one()
192
192
193 assert fork_repo.repo_name == fork_name
193 assert fork_repo.repo_name == fork_name
194 assert fork_repo.fork.repo_name == repo_name
194 assert fork_repo.fork.repo_name == repo_name
195
195
196 # test if the repository is visible in the list ?
196 # test if the repository is visible in the list ?
197 response = self.app.get(h.route_path('repo_summary', repo_name=fork_name))
197 response = self.app.get(h.route_path('repo_summary', repo_name=fork_name))
198 response.mustcontain(fork_name)
198 response.mustcontain(fork_name)
199 response.mustcontain(self.REPO_TYPE)
199 response.mustcontain(self.REPO_TYPE)
200 response.mustcontain('Fork of')
200 response.mustcontain('Fork of')
201 response.mustcontain('<a href="/%s">%s</a>' % (repo_name, repo_name))
201 response.mustcontain('<a href="/%s">%s</a>' % (repo_name, repo_name))
202
202
203 def test_zz_fork_permission_page(self):
203 def test_zz_fork_permission_page(self):
204 usr = self.log_user(self.username, self.password)['user_id']
204 usr = self.log_user(self.username, self.password)['user_id']
205 repo_name = self.REPO
205 repo_name = self.REPO
206
206
207 forks = Repository.query()\
207 forks = Repository.query()\
208 .filter(Repository.repo_type == self.REPO_TYPE)\
208 .filter(Repository.repo_type == self.REPO_TYPE)\
209 .filter(Repository.fork_id != None).all()
209 .filter(Repository.fork_id != None).all()
210 assert 1 == len(forks)
210 assert 1 == len(forks)
211
211
212 # set read permissions for this
212 # set read permissions for this
213 RepoModel().grant_user_permission(repo=forks[0],
213 RepoModel().grant_user_permission(repo=forks[0],
214 user=usr,
214 user=usr,
215 perm='repository.read')
215 perm='repository.read')
216 Session().commit()
216 Session().commit()
217
217
218 response = self.app.get(url(controller='forks', action='forks',
218 response = self.app.get(url(controller='forks', action='forks',
219 repo_name=repo_name))
219 repo_name=repo_name))
220
220
221 response.mustcontain('fork of vcs test')
221 response.mustcontain('fork of vcs test')
222
222
223 def test_zzz_fork_permission_page(self):
223 def test_zzz_fork_permission_page(self):
224 usr = self.log_user(self.username, self.password)['user_id']
224 usr = self.log_user(self.username, self.password)['user_id']
225 repo_name = self.REPO
225 repo_name = self.REPO
226
226
227 forks = Repository.query()\
227 forks = Repository.query()\
228 .filter(Repository.repo_type == self.REPO_TYPE)\
228 .filter(Repository.repo_type == self.REPO_TYPE)\
229 .filter(Repository.fork_id != None).all()
229 .filter(Repository.fork_id != None).all()
230 assert 1 == len(forks)
230 assert 1 == len(forks)
231
231
232 # set none
232 # set none
233 RepoModel().grant_user_permission(repo=forks[0],
233 RepoModel().grant_user_permission(repo=forks[0],
234 user=usr, perm='repository.none')
234 user=usr, perm='repository.none')
235 Session().commit()
235 Session().commit()
236 # fork shouldn't be there
236 # fork shouldn't be there
237 response = self.app.get(url(controller='forks', action='forks',
237 response = self.app.get(url(controller='forks', action='forks',
238 repo_name=repo_name))
238 repo_name=repo_name))
239 response.mustcontain('There are no forks yet')
239 response.mustcontain('There are no forks yet')
240
240
241
241
242 class TestGIT(_BaseTest):
242 class TestGIT(_BaseTest):
243 REPO = GIT_REPO
243 REPO = GIT_REPO
244 NEW_REPO = NEW_GIT_REPO
244 NEW_REPO = NEW_GIT_REPO
245 REPO_TYPE = 'git'
245 REPO_TYPE = 'git'
246 REPO_FORK = GIT_FORK
246 REPO_FORK = GIT_FORK
247
247
248
248
249 class TestHG(_BaseTest):
249 class TestHG(_BaseTest):
250 REPO = HG_REPO
250 REPO = HG_REPO
251 NEW_REPO = NEW_HG_REPO
251 NEW_REPO = NEW_HG_REPO
252 REPO_TYPE = 'hg'
252 REPO_TYPE = 'hg'
253 REPO_FORK = HG_FORK
253 REPO_FORK = HG_FORK
254
254
255
255
256 @pytest.mark.usefixtures('app', 'autologin_user')
256 @pytest.mark.usefixtures('app', 'autologin_user')
257 @pytest.mark.skip_backends('git','hg')
257 @pytest.mark.skip_backends('git','hg')
258 class TestSVNFork(object):
258 class TestSVNFork(object):
259
259
260 def test_fork_redirects(self, backend):
260 def test_fork_redirects(self, backend):
261 denied_actions = ['fork','fork_create']
261 denied_actions = ['fork','fork_create']
262 for action in denied_actions:
262 for action in denied_actions:
263 response = self.app.get(url(
263 response = self.app.get(url(
264 controller='forks', action=action,
264 controller='forks', action=action,
265 repo_name=backend.repo_name))
265 repo_name=backend.repo_name))
266 assert response.status_int == 302
266 assert response.status_int == 302
267
267
268 # Not allowed, redirect to the summary
268 # Not allowed, redirect to the summary
269 redirected = response.follow()
269 redirected = response.follow()
270 summary_url = h.route_path('repo_summary', repo_name=backend.repo_name)
270 summary_url = h.route_path('repo_summary', repo_name=backend.repo_name)
271
271
272 # URL adds leading slash and path doesn't have it
272 # URL adds leading slash and path doesn't have it
273 assert redirected.request.path == summary_url
273 assert redirected.request.path == summary_url
@@ -1,264 +1,264 b''
1 # -*- coding: utf-8 -*-
1 # -*- coding: utf-8 -*-
2
2
3 # Copyright (C) 2010-2017 RhodeCode GmbH
3 # Copyright (C) 2010-2017 RhodeCode GmbH
4 #
4 #
5 # This program is free software: you can redistribute it and/or modify
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU Affero General Public License, version 3
6 # it under the terms of the GNU Affero General Public License, version 3
7 # (only), as published by the Free Software Foundation.
7 # (only), as published by the Free Software Foundation.
8 #
8 #
9 # This program is distributed in the hope that it will be useful,
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
12 # GNU General Public License for more details.
13 #
13 #
14 # You should have received a copy of the GNU Affero General Public License
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 #
16 #
17 # This program is dual-licensed. If you wish to learn more about the
17 # This program is dual-licensed. If you wish to learn more about the
18 # RhodeCode Enterprise Edition, including its added features, Support services,
18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20
20
21 import mock
21 import mock
22 import pytest
22 import pytest
23 from webob.exc import HTTPNotFound
23 from webob.exc import HTTPNotFound
24
24
25 import rhodecode
25 import rhodecode
26 from rhodecode.model.db import Integration
26 from rhodecode.model.db import Integration
27 from rhodecode.model.meta import Session
27 from rhodecode.model.meta import Session
28 from rhodecode.tests import assert_session_flash, url, TEST_USER_ADMIN_LOGIN
28 from rhodecode.tests import assert_session_flash, url, TEST_USER_ADMIN_LOGIN
29 from rhodecode.tests.utils import AssertResponse
29 from rhodecode.tests.utils import AssertResponse
30 from rhodecode.integrations import integration_type_registry
30 from rhodecode.integrations import integration_type_registry
31 from rhodecode.config.routing import ADMIN_PREFIX
31 from rhodecode.config.routing import ADMIN_PREFIX
32
32
33
33
34 @pytest.mark.usefixtures('app', 'autologin_user')
34 @pytest.mark.usefixtures('app', 'autologin_user')
35 class TestIntegrationsView(object):
35 class TestIntegrationsView(object):
36 pass
36 pass
37
37
38
38
39 class TestGlobalIntegrationsView(TestIntegrationsView):
39 class TestGlobalIntegrationsView(TestIntegrationsView):
40 def test_index_no_integrations(self, app):
40 def test_index_no_integrations(self, app):
41 url = ADMIN_PREFIX + '/integrations'
41 url = ADMIN_PREFIX + '/integrations'
42 response = app.get(url)
42 response = app.get(url)
43
43
44 assert response.status_code == 200
44 assert response.status_code == 200
45 assert 'exist yet' in response.body
45 assert 'exist yet' in response.body
46
46
47 def test_index_with_integrations(self, app, global_integration_stub):
47 def test_index_with_integrations(self, app, global_integration_stub):
48 url = ADMIN_PREFIX + '/integrations'
48 url = ADMIN_PREFIX + '/integrations'
49 response = app.get(url)
49 response = app.get(url)
50
50
51 assert response.status_code == 200
51 assert response.status_code == 200
52 assert 'exist yet' not in response.body
52 assert 'exist yet' not in response.body
53 assert global_integration_stub.name in response.body
53 assert global_integration_stub.name in response.body
54
54
55 def test_new_integration_page(self, app):
55 def test_new_integration_page(self, app):
56 url = ADMIN_PREFIX + '/integrations/new'
56 url = ADMIN_PREFIX + '/integrations/new'
57
57
58 response = app.get(url)
58 response = app.get(url)
59
59
60 assert response.status_code == 200
60 assert response.status_code == 200
61
61
62 for integration_key in integration_type_registry:
62 for integration_key in integration_type_registry:
63 nurl = (ADMIN_PREFIX + '/integrations/{integration}/new').format(
63 nurl = (ADMIN_PREFIX + '/integrations/{integration}/new').format(
64 integration=integration_key)
64 integration=integration_key)
65 assert nurl in response.body
65 assert nurl in response.body
66
66
67 @pytest.mark.parametrize(
67 @pytest.mark.parametrize(
68 'IntegrationType', integration_type_registry.values())
68 'IntegrationType', integration_type_registry.values())
69 def test_get_create_integration_page(self, app, IntegrationType):
69 def test_get_create_integration_page(self, app, IntegrationType):
70 url = ADMIN_PREFIX + '/integrations/{integration_key}/new'.format(
70 url = ADMIN_PREFIX + '/integrations/{integration_key}/new'.format(
71 integration_key=IntegrationType.key)
71 integration_key=IntegrationType.key)
72
72
73 response = app.get(url)
73 response = app.get(url)
74
74
75 assert response.status_code == 200
75 assert response.status_code == 200
76 assert IntegrationType.display_name in response.body
76 assert IntegrationType.display_name in response.body
77
77
78 def test_post_integration_page(self, app, StubIntegrationType, csrf_token,
78 def test_post_integration_page(self, app, StubIntegrationType, csrf_token,
79 test_repo_group, backend_random):
79 test_repo_group, backend_random):
80 url = ADMIN_PREFIX + '/integrations/{integration_key}/new'.format(
80 url = ADMIN_PREFIX + '/integrations/{integration_key}/new'.format(
81 integration_key=StubIntegrationType.key)
81 integration_key=StubIntegrationType.key)
82
82
83 _post_integration_test_helper(app, url, csrf_token, admin_view=True,
83 _post_integration_test_helper(app, url, csrf_token, admin_view=True,
84 repo=backend_random.repo, repo_group=test_repo_group)
84 repo=backend_random.repo, repo_group=test_repo_group)
85
85
86
86
87 class TestRepoGroupIntegrationsView(TestIntegrationsView):
87 class TestRepoGroupIntegrationsView(TestIntegrationsView):
88 def test_index_no_integrations(self, app, test_repo_group):
88 def test_index_no_integrations(self, app, test_repo_group):
89 url = '/{repo_group_name}/settings/integrations'.format(
89 url = '/{repo_group_name}/settings/integrations'.format(
90 repo_group_name=test_repo_group.group_name)
90 repo_group_name=test_repo_group.group_name)
91 response = app.get(url)
91 response = app.get(url)
92
92
93 assert response.status_code == 200
93 assert response.status_code == 200
94 assert 'exist yet' in response.body
94 assert 'exist yet' in response.body
95
95
96 def test_index_with_integrations(self, app, test_repo_group,
96 def test_index_with_integrations(self, app, test_repo_group,
97 repogroup_integration_stub):
97 repogroup_integration_stub):
98 url = '/{repo_group_name}/settings/integrations'.format(
98 url = '/{repo_group_name}/settings/integrations'.format(
99 repo_group_name=test_repo_group.group_name)
99 repo_group_name=test_repo_group.group_name)
100
100
101 stub_name = repogroup_integration_stub.name
101 stub_name = repogroup_integration_stub.name
102 response = app.get(url)
102 response = app.get(url)
103
103
104 assert response.status_code == 200
104 assert response.status_code == 200
105 assert 'exist yet' not in response.body
105 assert 'exist yet' not in response.body
106 assert stub_name in response.body
106 assert stub_name in response.body
107
107
108 def test_new_integration_page(self, app, test_repo_group):
108 def test_new_integration_page(self, app, test_repo_group):
109 repo_group_name = test_repo_group.group_name
109 repo_group_name = test_repo_group.group_name
110 url = '/{repo_group_name}/settings/integrations/new'.format(
110 url = '/{repo_group_name}/settings/integrations/new'.format(
111 repo_group_name=test_repo_group.group_name)
111 repo_group_name=test_repo_group.group_name)
112
112
113 response = app.get(url)
113 response = app.get(url)
114
114
115 assert response.status_code == 200
115 assert response.status_code == 200
116
116
117 for integration_key in integration_type_registry:
117 for integration_key in integration_type_registry:
118 nurl = ('/{repo_group_name}/settings/integrations'
118 nurl = ('/{repo_group_name}/settings/integrations'
119 '/{integration}/new').format(
119 '/{integration}/new').format(
120 repo_group_name=repo_group_name,
120 repo_group_name=repo_group_name,
121 integration=integration_key)
121 integration=integration_key)
122
122
123 assert nurl in response.body
123 assert nurl in response.body
124
124
125 @pytest.mark.parametrize(
125 @pytest.mark.parametrize(
126 'IntegrationType', integration_type_registry.values())
126 'IntegrationType', integration_type_registry.values())
127 def test_get_create_integration_page(self, app, test_repo_group,
127 def test_get_create_integration_page(self, app, test_repo_group,
128 IntegrationType):
128 IntegrationType):
129 repo_group_name = test_repo_group.group_name
129 repo_group_name = test_repo_group.group_name
130 url = ('/{repo_group_name}/settings/integrations/{integration_key}/new'
130 url = ('/{repo_group_name}/settings/integrations/{integration_key}/new'
131 ).format(repo_group_name=repo_group_name,
131 ).format(repo_group_name=repo_group_name,
132 integration_key=IntegrationType.key)
132 integration_key=IntegrationType.key)
133
133
134 response = app.get(url)
134 response = app.get(url)
135
135
136 assert response.status_code == 200
136 assert response.status_code == 200
137 assert IntegrationType.display_name in response.body
137 assert IntegrationType.display_name in response.body
138
138
139 def test_post_integration_page(self, app, test_repo_group, backend_random,
139 def test_post_integration_page(self, app, test_repo_group, backend_random,
140 StubIntegrationType, csrf_token):
140 StubIntegrationType, csrf_token):
141 repo_group_name = test_repo_group.group_name
141 repo_group_name = test_repo_group.group_name
142 url = ('/{repo_group_name}/settings/integrations/{integration_key}/new'
142 url = ('/{repo_group_name}/settings/integrations/{integration_key}/new'
143 ).format(repo_group_name=repo_group_name,
143 ).format(repo_group_name=repo_group_name,
144 integration_key=StubIntegrationType.key)
144 integration_key=StubIntegrationType.key)
145
145
146 _post_integration_test_helper(app, url, csrf_token, admin_view=False,
146 _post_integration_test_helper(app, url, csrf_token, admin_view=False,
147 repo=backend_random.repo, repo_group=test_repo_group)
147 repo=backend_random.repo, repo_group=test_repo_group)
148
148
149
149
150 class TestRepoIntegrationsView(TestIntegrationsView):
150 class TestRepoIntegrationsView(TestIntegrationsView):
151 def test_index_no_integrations(self, app, backend_random):
151 def test_index_no_integrations(self, app, backend_random):
152 url = '/{repo_name}/settings/integrations'.format(
152 url = '/{repo_name}/settings/integrations'.format(
153 repo_name=backend_random.repo.repo_name)
153 repo_name=backend_random.repo.repo_name)
154 response = app.get(url)
154 response = app.get(url)
155
155
156 assert response.status_code == 200
156 assert response.status_code == 200
157 assert 'exist yet' in response.body
157 assert 'exist yet' in response.body
158
158
159 def test_index_with_integrations(self, app, repo_integration_stub):
159 def test_index_with_integrations(self, app, repo_integration_stub):
160 url = '/{repo_name}/settings/integrations'.format(
160 url = '/{repo_name}/settings/integrations'.format(
161 repo_name=repo_integration_stub.repo.repo_name)
161 repo_name=repo_integration_stub.repo.repo_name)
162 stub_name = repo_integration_stub.name
162 stub_name = repo_integration_stub.name
163
163
164 response = app.get(url)
164 response = app.get(url)
165
165
166 assert response.status_code == 200
166 assert response.status_code == 200
167 assert stub_name in response.body
167 assert stub_name in response.body
168 assert 'exist yet' not in response.body
168 assert 'exist yet' not in response.body
169
169
170 def test_new_integration_page(self, app, backend_random):
170 def test_new_integration_page(self, app, backend_random):
171 repo_name = backend_random.repo.repo_name
171 repo_name = backend_random.repo.repo_name
172 url = '/{repo_name}/settings/integrations/new'.format(
172 url = '/{repo_name}/settings/integrations/new'.format(
173 repo_name=repo_name)
173 repo_name=repo_name)
174
174
175 response = app.get(url)
175 response = app.get(url)
176
176
177 assert response.status_code == 200
177 assert response.status_code == 200
178
178
179 for integration_key in integration_type_registry:
179 for integration_key in integration_type_registry:
180 nurl = ('/{repo_name}/settings/integrations'
180 nurl = ('/{repo_name}/settings/integrations'
181 '/{integration}/new').format(
181 '/{integration}/new').format(
182 repo_name=repo_name,
182 repo_name=repo_name,
183 integration=integration_key)
183 integration=integration_key)
184
184
185 assert nurl in response.body
185 assert nurl in response.body
186
186
187 @pytest.mark.parametrize(
187 @pytest.mark.parametrize(
188 'IntegrationType', integration_type_registry.values())
188 'IntegrationType', integration_type_registry.values())
189 def test_get_create_integration_page(self, app, backend_random,
189 def test_get_create_integration_page(self, app, backend_random,
190 IntegrationType):
190 IntegrationType):
191 repo_name = backend_random.repo.repo_name
191 repo_name = backend_random.repo.repo_name
192 url = '/{repo_name}/settings/integrations/{integration_key}/new'.format(
192 url = '/{repo_name}/settings/integrations/{integration_key}/new'.format(
193 repo_name=repo_name, integration_key=IntegrationType.key)
193 repo_name=repo_name, integration_key=IntegrationType.key)
194
194
195 response = app.get(url)
195 response = app.get(url)
196
196
197 assert response.status_code == 200
197 assert response.status_code == 200
198 assert IntegrationType.display_name in response.body
198 assert IntegrationType.display_name in response.body
199
199
200 def test_post_integration_page(self, app, backend_random, test_repo_group,
200 def test_post_integration_page(self, app, backend_random, test_repo_group,
201 StubIntegrationType, csrf_token):
201 StubIntegrationType, csrf_token):
202 repo_name = backend_random.repo.repo_name
202 repo_name = backend_random.repo.repo_name
203 url = '/{repo_name}/settings/integrations/{integration_key}/new'.format(
203 url = '/{repo_name}/settings/integrations/{integration_key}/new'.format(
204 repo_name=repo_name, integration_key=StubIntegrationType.key)
204 repo_name=repo_name, integration_key=StubIntegrationType.key)
205
205
206 _post_integration_test_helper(app, url, csrf_token, admin_view=False,
206 _post_integration_test_helper(app, url, csrf_token, admin_view=False,
207 repo=backend_random.repo, repo_group=test_repo_group)
207 repo=backend_random.repo, repo_group=test_repo_group)
208
208
209
209
210 def _post_integration_test_helper(app, url, csrf_token, repo, repo_group,
210 def _post_integration_test_helper(app, url, csrf_token, repo, repo_group,
211 admin_view):
211 admin_view):
212 """
212 """
213 Posts form data to create integration at the url given then deletes it and
213 Posts form data to create integration at the url given then deletes it and
214 checks if the redirect url is correct.
214 checks if the redirect url is correct.
215 """
215 """
216
216
217 app.post(url, params={}, status=403) # missing csrf check
217 app.post(url, params={}, status=403) # missing csrf check
218 response = app.post(url, params={'csrf_token': csrf_token})
218 response = app.post(url, params={'csrf_token': csrf_token})
219 assert response.status_code == 200
219 assert response.status_code == 200
220 assert 'Errors exist' in response.body
220 assert 'Errors exist' in response.body
221
221
222 scopes_destinations = [
222 scopes_destinations = [
223 ('global',
223 ('global',
224 ADMIN_PREFIX + '/integrations'),
224 ADMIN_PREFIX + '/integrations'),
225 ('root-repos',
225 ('root-repos',
226 ADMIN_PREFIX + '/integrations'),
226 ADMIN_PREFIX + '/integrations'),
227 ('repo:%s' % repo.repo_name,
227 ('repo:%s' % repo.repo_name,
228 '/%s/settings/integrations' % repo.repo_name),
228 '/%s/settings/integrations' % repo.repo_name),
229 ('repogroup:%s' % repo_group.group_name,
229 ('repogroup:%s' % repo_group.group_name,
230 '/%s/settings/integrations' % repo_group.group_name),
230 '/%s/settings/integrations' % repo_group.group_name),
231 ('repogroup-recursive:%s' % repo_group.group_name,
231 ('repogroup-recursive:%s' % repo_group.group_name,
232 '/%s/settings/integrations' % repo_group.group_name),
232 '/%s/settings/integrations' % repo_group.group_name),
233 ]
233 ]
234
234
235 for scope, destination in scopes_destinations:
235 for scope, destination in scopes_destinations:
236 if admin_view:
236 if admin_view:
237 destination = ADMIN_PREFIX + '/integrations'
237 destination = ADMIN_PREFIX + '/integrations'
238
238
239 form_data = [
239 form_data = [
240 ('csrf_token', csrf_token),
240 ('csrf_token', csrf_token),
241 ('__start__', 'options:mapping'),
241 ('__start__', 'options:mapping'),
242 ('name', 'test integration'),
242 ('name', 'test integration'),
243 ('scope', scope),
243 ('scope', scope),
244 ('enabled', 'true'),
244 ('enabled', 'true'),
245 ('__end__', 'options:mapping'),
245 ('__end__', 'options:mapping'),
246 ('__start__', 'settings:mapping'),
246 ('__start__', 'settings:mapping'),
247 ('test_int_field', '34'),
247 ('test_int_field', '34'),
248 ('test_string_field', ''), # empty value on purpose as it's required
248 ('test_string_field', ''), # empty value on purpose as it's required
249 ('__end__', 'settings:mapping'),
249 ('__end__', 'settings:mapping'),
250 ]
250 ]
251 errors_response = app.post(url, form_data)
251 errors_response = app.post(url, form_data)
252 assert 'Errors exist' in errors_response.body
252 assert 'Errors exist' in errors_response.body
253
253
254 form_data[-2] = ('test_string_field', 'data!')
254 form_data[-2] = ('test_string_field', 'data!')
255 assert Session().query(Integration).count() == 0
255 assert Session().query(Integration).count() == 0
256 created_response = app.post(url, form_data)
256 created_response = app.post(url, form_data)
257 assert Session().query(Integration).count() == 1
257 assert Session().query(Integration).count() == 1
258
258
259 delete_response = app.post(
259 delete_response = app.post(
260 created_response.location,
260 created_response.location,
261 params={'csrf_token': csrf_token, 'delete': 'delete'})
261 params={'csrf_token': csrf_token, 'delete': 'delete'})
262
262
263 assert Session().query(Integration).count() == 0
263 assert Session().query(Integration).count() == 0
264 assert delete_response.location.endswith(destination)
264 assert delete_response.location.endswith(destination)
General Comments 0
You need to be logged in to leave comments. Login now