##// END OF EJS Templates
created basic TestClass for tests that does...
marcink -
r3829:5067d6e8 beta
parent child Browse files
Show More
@@ -1,193 +1,202 b''
1 1 """Pylons application test package
2 2
3 3 This package assumes the Pylons environment is already loaded, such as
4 4 when this script is imported from the `nosetests --with-pylons=test.ini`
5 5 command.
6 6
7 7 This module initializes the application via ``websetup`` (`paster
8 8 setup-app`) and provides the base testing objects.
9 9
10 10 nosetests -x - fail on first error
11 11 nosetests rhodecode.tests.functional.test_admin_settings:TestSettingsController.test_my_account
12 12 nosetests --pdb --pdb-failures
13 13 nosetests --with-coverage --cover-package=rhodecode.model.validators rhodecode.tests.test_validators
14 14
15 15 optional FLAGS:
16 16 RC_WHOOSH_TEST_DISABLE=1 - skip whoosh index building and tests
17 17 RC_NO_TMP_PATH=1 - disable new temp path for tests, used mostly for test_vcs_operations
18 18
19 19 """
20 20 import os
21 21 import time
22 22 import logging
23 23 import datetime
24 24 import hashlib
25 25 import tempfile
26 26 from os.path import join as jn
27 27
28 28 from unittest import TestCase
29 29 from tempfile import _RandomNameSequence
30 30
31 31 from paste.deploy import loadapp
32 32 from paste.script.appinstall import SetupCommand
33 33
34 34 import pylons
35 35 import pylons.test
36 36 from pylons import config, url
37 37 from pylons.i18n.translation import _get_translator
38 from pylons.util import ContextObj
38 39
39 40 from routes.util import URLGenerator
40 41 from webtest import TestApp
41 42 from nose.plugins.skip import SkipTest
42 43
43 44 from rhodecode import is_windows
44 45 from rhodecode.model.meta import Session
45 46 from rhodecode.model.db import User
46 47 from rhodecode.tests.nose_parametrized import parameterized
47 48 from rhodecode.lib.utils2 import safe_unicode, safe_str
48 49
49 50
50 51 os.environ['TZ'] = 'UTC'
51 52 if not is_windows:
52 53 time.tzset()
53 54
54 55 log = logging.getLogger(__name__)
55 56
56 57 __all__ = [
57 58 'parameterized', 'environ', 'url', 'get_new_dir', 'TestController',
58 'SkipTest', 'ldap_lib_installed',
59 'SkipTest', 'ldap_lib_installed', 'BaseTestCase', 'init_stack',
59 60 'TESTS_TMP_PATH', 'HG_REPO', 'GIT_REPO', 'NEW_HG_REPO', 'NEW_GIT_REPO',
60 61 'HG_FORK', 'GIT_FORK', 'TEST_USER_ADMIN_LOGIN', 'TEST_USER_ADMIN_PASS',
61 62 'TEST_USER_REGULAR_LOGIN', 'TEST_USER_REGULAR_PASS',
62 63 'TEST_USER_REGULAR_EMAIL', 'TEST_USER_REGULAR2_LOGIN',
63 64 'TEST_USER_REGULAR2_PASS', 'TEST_USER_REGULAR2_EMAIL', 'TEST_HG_REPO',
64 65 'TEST_HG_REPO_CLONE', 'TEST_HG_REPO_PULL', 'TEST_GIT_REPO',
65 66 'TEST_GIT_REPO_CLONE', 'TEST_GIT_REPO_PULL', 'HG_REMOTE_REPO',
66 67 'GIT_REMOTE_REPO', 'SCM_TESTS',
67 68 ]
68 69
69 70 # Invoke websetup with the current config file
70 71 # SetupCommand('setup-app').run([config_file])
71 72
72 73 environ = {}
73 74
74 75 #SOME GLOBALS FOR TESTS
75 76
76 77 TESTS_TMP_PATH = jn('/', 'tmp', 'rc_test_%s' % _RandomNameSequence().next())
77 78 TEST_USER_ADMIN_LOGIN = 'test_admin'
78 79 TEST_USER_ADMIN_PASS = 'test12'
79 80 TEST_USER_ADMIN_EMAIL = 'test_admin@mail.com'
80 81
81 82 TEST_USER_REGULAR_LOGIN = 'test_regular'
82 83 TEST_USER_REGULAR_PASS = 'test12'
83 84 TEST_USER_REGULAR_EMAIL = 'test_regular@mail.com'
84 85
85 86 TEST_USER_REGULAR2_LOGIN = 'test_regular2'
86 87 TEST_USER_REGULAR2_PASS = 'test12'
87 88 TEST_USER_REGULAR2_EMAIL = 'test_regular2@mail.com'
88 89
89 90 HG_REPO = 'vcs_test_hg'
90 91 GIT_REPO = 'vcs_test_git'
91 92
92 93 NEW_HG_REPO = 'vcs_test_hg_new'
93 94 NEW_GIT_REPO = 'vcs_test_git_new'
94 95
95 96 HG_FORK = 'vcs_test_hg_fork'
96 97 GIT_FORK = 'vcs_test_git_fork'
97 98
98 99 ## VCS
99 100 SCM_TESTS = ['hg', 'git']
100 101 uniq_suffix = str(int(time.mktime(datetime.datetime.now().timetuple())))
101 102
102 103 GIT_REMOTE_REPO = 'git://github.com/codeinn/vcs.git'
103 104
104 105 TEST_GIT_REPO = jn(TESTS_TMP_PATH, GIT_REPO)
105 106 TEST_GIT_REPO_CLONE = jn(TESTS_TMP_PATH, 'vcsgitclone%s' % uniq_suffix)
106 107 TEST_GIT_REPO_PULL = jn(TESTS_TMP_PATH, 'vcsgitpull%s' % uniq_suffix)
107 108
108 109
109 110 HG_REMOTE_REPO = 'http://bitbucket.org/marcinkuzminski/vcs'
110 111
111 112 TEST_HG_REPO = jn(TESTS_TMP_PATH, HG_REPO)
112 113 TEST_HG_REPO_CLONE = jn(TESTS_TMP_PATH, 'vcshgclone%s' % uniq_suffix)
113 114 TEST_HG_REPO_PULL = jn(TESTS_TMP_PATH, 'vcshgpull%s' % uniq_suffix)
114 115
115 116 TEST_DIR = tempfile.gettempdir()
116 117 TEST_REPO_PREFIX = 'vcs-test'
117 118
118 119 # cached repos if any !
119 120 # comment out to get some other repos from bb or github
120 121 GIT_REMOTE_REPO = jn(TESTS_TMP_PATH, GIT_REPO)
121 122 HG_REMOTE_REPO = jn(TESTS_TMP_PATH, HG_REPO)
122 123
123 124 #skip ldap tests if LDAP lib is not installed
124 125 ldap_lib_installed = False
125 126 try:
126 127 import ldap
127 128 ldap_lib_installed = True
128 129 except ImportError:
129 130 # means that python-ldap is not installed
130 131 pass
131 132
132 133
133 134 def get_new_dir(title):
134 135 """
135 136 Returns always new directory path.
136 137 """
137 138 from rhodecode.tests.vcs.utils import get_normalized_path
138 139 name = TEST_REPO_PREFIX
139 140 if title:
140 141 name = '-'.join((name, title))
141 142 hex = hashlib.sha1(str(time.time())).hexdigest()
142 143 name = '-'.join((name, hex))
143 144 path = os.path.join(TEST_DIR, name)
144 145 return get_normalized_path(path)
145 146
146 147
147 class TestController(TestCase):
148 def init_stack(config=None):
149 if not config:
150 config = pylons.test.pylonsapp.config
151 url._push_object(URLGenerator(config['routes.map'], environ))
152 pylons.app_globals._push_object(config['pylons.app_globals'])
153 pylons.config._push_object(config)
154 pylons.tmpl_context._push_object(ContextObj())
155 # Initialize a translator for tests that utilize i18n
156 translator = _get_translator(pylons.config.get('lang'))
157 pylons.translator._push_object(translator)
158
159
160 class BaseTestCase(TestCase):
161 def __init__(self, *args, **kwargs):
162 self.wsgiapp = pylons.test.pylonsapp
163 init_stack(self.wsgiapp.config)
164 TestCase.__init__(self, *args, **kwargs)
165
166
167 class TestController(BaseTestCase):
148 168
149 169 def __init__(self, *args, **kwargs):
150 wsgiapp = pylons.test.pylonsapp
151 config = wsgiapp.config
152
153 self.app = TestApp(wsgiapp)
154 url._push_object(URLGenerator(config['routes.map'], environ))
155 pylons.app_globals._push_object(config['pylons.app_globals'])
156 pylons.config._push_object(config)
157
158 # Initialize a translator for tests that utilize i18n
159 translator = _get_translator(pylons.config.get('lang'))
160 pylons.translator._push_object(translator)
161
170 BaseTestCase.__init__(self, *args, **kwargs)
171 self.app = TestApp(self.wsgiapp)
162 172 self.index_location = config['app_conf']['index_dir']
163 TestCase.__init__(self, *args, **kwargs)
164 173
165 174 def log_user(self, username=TEST_USER_ADMIN_LOGIN,
166 175 password=TEST_USER_ADMIN_PASS):
167 176 self._logged_username = username
168 177 response = self.app.post(url(controller='login', action='index'),
169 178 {'username': username,
170 179 'password': password})
171 180
172 181 if 'invalid user name' in response.body:
173 182 self.fail('could not login using %s %s' % (username, password))
174 183
175 184 self.assertEqual(response.status, '302 Found')
176 185 ses = response.session['rhodecode_user']
177 186 self.assertEqual(ses.get('username'), username)
178 187 response = response.follow()
179 188 self.assertEqual(ses.get('is_authenticated'), True)
180 189
181 190 return response.session['rhodecode_user']
182 191
183 192 def _get_logged_user(self):
184 193 return User.get_by_username(self._logged_username)
185 194
186 195 def checkSessionFlash(self, response, msg):
187 196 self.assertTrue('flash' in response.session,
188 197 msg='Response session:%r have no flash'
189 198 % response.session)
190 199 if not msg in response.session['flash'][0][1]:
191 200 msg = u'msg `%s` not found in session flash: got `%s` instead' % (
192 201 msg, response.session['flash'][0][1])
193 202 self.fail(safe_str(msg))
@@ -1,104 +1,101 b''
1 import os
2 import unittest
3 import functools
4 1 from rhodecode.tests import *
5 2 from rhodecode.tests.fixture import Fixture
6 3
7 4 from rhodecode.model.repos_group import ReposGroupModel
8 5 from rhodecode.model.repo import RepoModel
9 6 from rhodecode.model.db import RepoGroup, Repository, User
10 7 from rhodecode.model.user import UserModel
11 8
12 9 from rhodecode.lib.auth import AuthUser
13 10 from rhodecode.model.meta import Session
14 11
15 12
16 13 fixture = Fixture()
17 14
18 15
19 16 def _destroy_project_tree(test_u1_id):
20 17 Session.remove()
21 18 repos_group = RepoGroup.get_by_group_name(group_name='g0')
22 19 for el in reversed(repos_group.recursive_groups_and_repos()):
23 20 if isinstance(el, Repository):
24 21 RepoModel().delete(el)
25 22 elif isinstance(el, RepoGroup):
26 23 ReposGroupModel().delete(el, force_delete=True)
27 24
28 25 u = User.get(test_u1_id)
29 26 Session().delete(u)
30 27 Session().commit()
31 28
32 29
33 30 def _create_project_tree():
34 31 """
35 32 Creates a tree of groups and repositories to test permissions
36 33
37 34 structure
38 35 [g0] - group `g0` with 3 subgroups
39 36 |
40 37 |__[g0_1] group g0_1 with 2 groups 0 repos
41 38 | |
42 39 | |__[g0_1_1] group g0_1_1 with 1 group 2 repos
43 40 | | |__<g0/g0_1/g0_1_1/g0_1_1_r1>
44 41 | | |__<g0/g0_1/g0_1_1/g0_1_1_r2>
45 42 | |__<g0/g0_1/g0_1_r1>
46 43 |
47 44 |__[g0_2] 2 repos
48 45 | |
49 46 | |__<g0/g0_2/g0_2_r1>
50 47 | |__<g0/g0_2/g0_2_r2>
51 48 |
52 49 |__[g0_3] 1 repo
53 50 |
54 51 |_<g0/g0_3/g0_3_r1>
55 52 |_<g0/g0_3/g0_3_r2_private>
56 53
57 54 """
58 55 test_u1 = UserModel().create_or_update(
59 56 username=u'test_u1', password=u'qweqwe',
60 57 email=u'test_u1@rhodecode.org', firstname=u'test_u1', lastname=u'test_u1'
61 58 )
62 59 g0 = fixture.create_group('g0')
63 60 g0_1 = fixture.create_group('g0_1', group_parent_id=g0)
64 61 g0_1_1 = fixture.create_group('g0_1_1', group_parent_id=g0_1)
65 62 g0_1_1_r1 = fixture.create_repo('g0/g0_1/g0_1_1/g0_1_1_r1', repos_group=g0_1_1)
66 63 g0_1_1_r2 = fixture.create_repo('g0/g0_1/g0_1_1/g0_1_1_r2', repos_group=g0_1_1)
67 64 g0_1_r1 = fixture.create_repo('g0/g0_1/g0_1_r1', repos_group=g0_1)
68 65 g0_2 = fixture.create_group('g0_2', group_parent_id=g0)
69 66 g0_2_r1 = fixture.create_repo('g0/g0_2/g0_2_r1', repos_group=g0_2)
70 67 g0_2_r2 = fixture.create_repo('g0/g0_2/g0_2_r2', repos_group=g0_2)
71 68 g0_3 = fixture.create_group('g0_3', group_parent_id=g0)
72 69 g0_3_r1 = fixture.create_repo('g0/g0_3/g0_3_r1', repos_group=g0_3)
73 70 g0_3_r2_private = fixture.create_repo('g0/g0_3/g0_3_r1_private',
74 71 repos_group=g0_3, repo_private=True)
75 72 return test_u1
76 73
77 74
78 75 def expected_count(group_name, objects=False):
79 76 repos_group = RepoGroup.get_by_group_name(group_name=group_name)
80 77 objs = repos_group.recursive_groups_and_repos()
81 78 if objects:
82 79 return objs
83 80 return len(objs)
84 81
85 82
86 83 def _check_expected_count(items, repo_items, expected):
87 84 should_be = len(items + repo_items)
88 85 there_are = len(expected)
89 86 assert should_be == there_are, ('%s != %s' % ((items + repo_items), expected))
90 87
91 88
92 89 def check_tree_perms(obj_name, repo_perm, prefix, expected_perm):
93 90 assert repo_perm == expected_perm, ('obj:`%s` got perm:`%s` should:`%s`'
94 91 % (obj_name, repo_perm, expected_perm))
95 92
96 93
97 94 def _get_perms(filter_='', recursive=True, key=None, test_u1_id=None):
98 95 test_u1 = AuthUser(user_id=test_u1_id)
99 96 for k, v in test_u1.permissions[key].items():
100 97 if recursive and k.startswith(filter_):
101 98 yield k, v
102 99 elif not recursive:
103 100 if k == filter_:
104 101 yield k, v
@@ -1,251 +1,250 b''
1 1 from __future__ import with_statement
2 2 import os
3 import unittest
4 3 from rhodecode.tests import *
5 4 from rhodecode.lib.diffs import DiffProcessor, NEW_FILENODE, DEL_FILENODE, \
6 5 MOD_FILENODE, RENAMED_FILENODE, CHMOD_FILENODE, BIN_FILENODE
7 6
8 7 dn = os.path.dirname
9 8 FIXTURES = os.path.join(dn(dn(os.path.abspath(__file__))), 'fixtures')
10 9
11 10 DIFF_FIXTURES = {
12 11 'hg_diff_add_single_binary_file.diff': [
13 12 ('US Warszawa.jpg', 'A',
14 13 {'added': 0,
15 14 'deleted': 0,
16 15 'binary': True,
17 16 'ops': {NEW_FILENODE: 'new file 100755',
18 17 BIN_FILENODE: 'binary diff not shown'}}),
19 18 ],
20 19 'hg_diff_mod_single_binary_file.diff': [
21 20 ('US Warszawa.jpg', 'M',
22 21 {'added': 0,
23 22 'deleted': 0,
24 23 'binary': True,
25 24 'ops': {MOD_FILENODE: 'modified file',
26 25 BIN_FILENODE: 'binary diff not shown'}}),
27 26 ],
28 27
29 28 'hg_diff_mod_single_file_and_rename_and_chmod.diff': [
30 29 ('README', 'M',
31 30 {'added': 3,
32 31 'deleted': 0,
33 32 'binary': False,
34 33 'ops': {MOD_FILENODE: 'modified file',
35 34 RENAMED_FILENODE: 'file renamed from README.rst to README',
36 35 CHMOD_FILENODE: 'modified file chmod 100755 => 100644'}}),
37 36 ],
38 37 'hg_diff_rename_and_chmod_file.diff': [
39 38 ('README', 'M',
40 39 {'added': 3,
41 40 'deleted': 0,
42 41 'binary': False,
43 42 'ops': {MOD_FILENODE: 'modified file',
44 43 BIN_FILENODE: 'binary diff not shown'}}),
45 44 ],
46 45 'hg_diff_del_single_binary_file.diff': [
47 46 ('US Warszawa.jpg', 'D',
48 47 {'added': 0,
49 48 'deleted': 0,
50 49 'binary': True,
51 50 'ops': {DEL_FILENODE: 'deleted file',
52 51 BIN_FILENODE: 'binary diff not shown'}}),
53 52 ],
54 53 'hg_diff_chmod_and_mod_single_binary_file.diff': [
55 54 ('gravatar.png', 'M',
56 55 {'added': 0,
57 56 'deleted': 0,
58 57 'binary': True,
59 58 'ops': {CHMOD_FILENODE: 'modified file chmod 100644 => 100755',
60 59 BIN_FILENODE: 'binary diff not shown'}}),
61 60 ],
62 61 'hg_diff_chmod.diff': [
63 62 ('file', 'M',
64 63 {'added': 0,
65 64 'deleted': 0,
66 65 'binary': True,
67 66 'ops': {CHMOD_FILENODE: 'modified file chmod 100755 => 100644'}}),
68 67 ],
69 68 'hg_diff_rename_file.diff': [
70 69 ('file_renamed', 'M',
71 70 {'added': 0,
72 71 'deleted': 0,
73 72 'binary': True,
74 73 'ops': {RENAMED_FILENODE: 'file renamed from file to file_renamed'}}),
75 74 ],
76 75 'hg_diff_rename_and_chmod_file.diff': [
77 76 ('README', 'M',
78 77 {'added': 0,
79 78 'deleted': 0,
80 79 'binary': True,
81 80 'ops': {CHMOD_FILENODE: 'modified file chmod 100644 => 100755',
82 81 RENAMED_FILENODE: 'file renamed from README.rst to README'}}),
83 82 ],
84 83 'hg_diff_binary_and_normal.diff': [
85 84 ('img/baseline-10px.png', 'A',
86 85 {'added': 0,
87 86 'deleted': 0,
88 87 'binary': True,
89 88 'ops': {NEW_FILENODE: 'new file 100644',
90 89 BIN_FILENODE: 'binary diff not shown'}}),
91 90 ('js/jquery/hashgrid.js', 'A',
92 91 {'added': 340,
93 92 'deleted': 0,
94 93 'binary': False,
95 94 'ops': {NEW_FILENODE: 'new file 100755'}}),
96 95 ('index.html', 'M',
97 96 {'added': 3,
98 97 'deleted': 2,
99 98 'binary': False,
100 99 'ops': {MOD_FILENODE: 'modified file'}}),
101 100 ('less/docs.less', 'M',
102 101 {'added': 34,
103 102 'deleted': 0,
104 103 'binary': False,
105 104 'ops': {MOD_FILENODE: 'modified file'}}),
106 105 ('less/scaffolding.less', 'M',
107 106 {'added': 1,
108 107 'deleted': 3,
109 108 'binary': False,
110 109 'ops': {MOD_FILENODE: 'modified file'}}),
111 110 ('readme.markdown', 'M',
112 111 {'added': 1,
113 112 'deleted': 10,
114 113 'binary': False,
115 114 'ops': {MOD_FILENODE: 'modified file'}}),
116 115 ('img/baseline-20px.png', 'D',
117 116 {'added': 0,
118 117 'deleted': 0,
119 118 'binary': True,
120 119 'ops': {DEL_FILENODE: 'deleted file',
121 120 BIN_FILENODE: 'binary diff not shown'}}),
122 121 ('js/global.js', 'D',
123 122 {'added': 0,
124 123 'deleted': 75,
125 124 'binary': False,
126 125 'ops': {DEL_FILENODE: 'deleted file'}})
127 126 ],
128 127 'git_diff_chmod.diff': [
129 128 ('work-horus.xls', 'M',
130 129 {'added': 0,
131 130 'deleted': 0,
132 131 'binary': True,
133 132 'ops': {CHMOD_FILENODE: 'modified file chmod 100644 => 100755'}})
134 133 ],
135 134 'git_diff_rename_file.diff': [
136 135 ('file.xls', 'M',
137 136 {'added': 0,
138 137 'deleted': 0,
139 138 'binary': True,
140 139 'ops': {RENAMED_FILENODE: 'file renamed from work-horus.xls to file.xls'}})
141 140 ],
142 141 'git_diff_mod_single_binary_file.diff': [
143 142 ('US Warszawa.jpg', 'M',
144 143 {'added': 0,
145 144 'deleted': 0,
146 145 'binary': True,
147 146 'ops': {MOD_FILENODE: 'modified file',
148 147 BIN_FILENODE: 'binary diff not shown'}})
149 148 ],
150 149 'git_diff_binary_and_normal.diff': [
151 150 ('img/baseline-10px.png', 'A',
152 151 {'added': 0,
153 152 'deleted': 0,
154 153 'binary': True,
155 154 'ops': {NEW_FILENODE: 'new file 100644',
156 155 BIN_FILENODE: 'binary diff not shown'}}),
157 156 ('js/jquery/hashgrid.js', 'A',
158 157 {'added': 340,
159 158 'deleted': 0,
160 159 'binary': False,
161 160 'ops': {NEW_FILENODE: 'new file 100755'}}),
162 161 ('index.html', 'M',
163 162 {'added': 3,
164 163 'deleted': 2,
165 164 'binary': False,
166 165 'ops': {MOD_FILENODE: 'modified file'}}),
167 166 ('less/docs.less', 'M',
168 167 {'added': 34,
169 168 'deleted': 0,
170 169 'binary': False,
171 170 'ops': {MOD_FILENODE: 'modified file'}}),
172 171 ('less/scaffolding.less', 'M',
173 172 {'added': 1,
174 173 'deleted': 3,
175 174 'binary': False,
176 175 'ops': {MOD_FILENODE: 'modified file'}}),
177 176 ('readme.markdown', 'M',
178 177 {'added': 1,
179 178 'deleted': 10,
180 179 'binary': False,
181 180 'ops': {MOD_FILENODE: 'modified file'}}),
182 181 ('img/baseline-20px.png', 'D',
183 182 {'added': 0,
184 183 'deleted': 0,
185 184 'binary': True,
186 185 'ops': {DEL_FILENODE: 'deleted file',
187 186 BIN_FILENODE: 'binary diff not shown'}}),
188 187 ('js/global.js', 'D',
189 188 {'added': 0,
190 189 'deleted': 75,
191 190 'binary': False,
192 191 'ops': {DEL_FILENODE: 'deleted file'}}),
193 192 ],
194 193 'diff_with_diff_data.diff': [
195 194 ('vcs/backends/base.py', 'M',
196 195 {'added': 18,
197 196 'deleted': 2,
198 197 'binary': False,
199 198 'ops': {MOD_FILENODE: 'modified file'}}),
200 199 ('vcs/backends/git/repository.py', 'M',
201 200 {'added': 46,
202 201 'deleted': 15,
203 202 'binary': False,
204 203 'ops': {MOD_FILENODE: 'modified file'}}),
205 204 ('vcs/backends/hg.py', 'M',
206 205 {'added': 22,
207 206 'deleted': 3,
208 207 'binary': False,
209 208 'ops': {MOD_FILENODE: 'modified file'}}),
210 209 ('vcs/tests/test_git.py', 'M',
211 210 {'added': 5,
212 211 'deleted': 5,
213 212 'binary': False,
214 213 'ops': {MOD_FILENODE: 'modified file'}}),
215 214 ('vcs/tests/test_repository.py', 'M',
216 215 {'added': 174,
217 216 'deleted': 2,
218 217 'binary': False,
219 218 'ops': {MOD_FILENODE: 'modified file'}}),
220 219 ],
221 220 # 'large_diff.diff': [
222 221 # ('.hgignore', 'A', {'deleted': 0, 'binary': False, 'added': 3, 'ops': {1: 'new file 100644'}}),
223 222 # ('MANIFEST.in', 'A', {'deleted': 0, 'binary': False, 'added': 3, 'ops': {1: 'new file 100644'}}),
224 223 # ('README.txt', 'A', {'deleted': 0, 'binary': False, 'added': 19, 'ops': {1: 'new file 100644'}}),
225 224 # ('development.ini', 'A', {'deleted': 0, 'binary': False, 'added': 116, 'ops': {1: 'new file 100644'}}),
226 225 # ('docs/index.txt', 'A', {'deleted': 0, 'binary': False, 'added': 19, 'ops': {1: 'new file 100644'}}),
227 226 # ('ez_setup.py', 'A', {'deleted': 0, 'binary': False, 'added': 276, 'ops': {1: 'new file 100644'}}),
228 227 # ('hgapp.py', 'A', {'deleted': 0, 'binary': False, 'added': 26, 'ops': {1: 'new file 100644'}}),
229 228 # ('hgwebdir.config', 'A', {'deleted': 0, 'binary': False, 'added': 21, 'ops': {1: 'new file 100644'}}),
230 229 # ('pylons_app.egg-info/PKG-INFO', 'A', {'deleted': 0, 'binary': False, 'added': 10, 'ops': {1: 'new file 100644'}}),
231 230 # ('pylons_app.egg-info/SOURCES.txt', 'A', {'deleted': 0, 'binary': False, 'added': 33, 'ops': {1: 'new file 100644'}}),
232 231 # ('pylons_app.egg-info/dependency_links.txt', 'A', {'deleted': 0, 'binary': False, 'added': 1, 'ops': {1: 'new file 100644'}}),
233 232 # #TODO:
234 233 # ],
235 234
236 235 }
237 236
238 237
239 class DiffLibTest(unittest.TestCase):
238 class DiffLibTest(BaseTestCase):
240 239
241 240 @parameterized.expand([(x,) for x in DIFF_FIXTURES])
242 241 def test_diff(self, diff_fixture):
243 242
244 243 with open(os.path.join(FIXTURES, diff_fixture)) as f:
245 244 diff = f.read()
246 245
247 246 diff_proc = DiffProcessor(diff)
248 247 diff_proc_d = diff_proc.prepare()
249 248 data = [(x['filename'], x['operation'], x['stats']) for x in diff_proc_d]
250 249 expected_data = DIFF_FIXTURES[diff_fixture]
251 250 self.assertListEqual(expected_data, data)
@@ -1,187 +1,185 b''
1 import os
2 import unittest
3 1 from rhodecode.tests import *
4 2
5 3 from rhodecode.model.db import User, Notification, UserNotification
6 4 from rhodecode.model.user import UserModel
7 5
8 6 from rhodecode.model.meta import Session
9 7 from rhodecode.model.notification import NotificationModel
10 8
11 9
12 class TestNotifications(unittest.TestCase):
10 class TestNotifications(BaseTestCase):
13 11
14 12 def __init__(self, methodName='runTest'):
15 13 Session.remove()
16 14 self.u1 = UserModel().create_or_update(username=u'u1',
17 15 password=u'qweqwe',
18 16 email=u'u1@rhodecode.org',
19 17 firstname=u'u1', lastname=u'u1')
20 18 Session().commit()
21 19 self.u1 = self.u1.user_id
22 20
23 21 self.u2 = UserModel().create_or_update(username=u'u2',
24 22 password=u'qweqwe',
25 23 email=u'u2@rhodecode.org',
26 24 firstname=u'u2', lastname=u'u3')
27 25 Session().commit()
28 26 self.u2 = self.u2.user_id
29 27
30 28 self.u3 = UserModel().create_or_update(username=u'u3',
31 29 password=u'qweqwe',
32 30 email=u'u3@rhodecode.org',
33 31 firstname=u'u3', lastname=u'u3')
34 32 Session().commit()
35 33 self.u3 = self.u3.user_id
36 34
37 35 super(TestNotifications, self).__init__(methodName=methodName)
38 36
39 37 def _clean_notifications(self):
40 38 for n in Notification.query().all():
41 39 Session().delete(n)
42 40
43 41 Session().commit()
44 42 self.assertEqual(Notification.query().all(), [])
45 43
46 44 def tearDown(self):
47 45 self._clean_notifications()
48 46
49 47 def test_create_notification(self):
50 48 self.assertEqual([], Notification.query().all())
51 49 self.assertEqual([], UserNotification.query().all())
52 50
53 51 usrs = [self.u1, self.u2]
54 52 notification = NotificationModel().create(created_by=self.u1,
55 53 subject=u'subj', body=u'hi there',
56 54 recipients=usrs)
57 55 Session().commit()
58 56 u1 = User.get(self.u1)
59 57 u2 = User.get(self.u2)
60 58 u3 = User.get(self.u3)
61 59 notifications = Notification.query().all()
62 60 self.assertEqual(len(notifications), 1)
63 61
64 62 self.assertEqual(notifications[0].recipients, [u1, u2])
65 63 self.assertEqual(notification.notification_id,
66 64 notifications[0].notification_id)
67 65
68 66 unotification = UserNotification.query()\
69 67 .filter(UserNotification.notification == notification).all()
70 68
71 69 self.assertEqual(len(unotification), len(usrs))
72 70 self.assertEqual(set([x.user.user_id for x in unotification]),
73 71 set(usrs))
74 72
75 73 def test_user_notifications(self):
76 74 self.assertEqual([], Notification.query().all())
77 75 self.assertEqual([], UserNotification.query().all())
78 76
79 77 notification1 = NotificationModel().create(created_by=self.u1,
80 78 subject=u'subj', body=u'hi there1',
81 79 recipients=[self.u3])
82 80 Session().commit()
83 81 notification2 = NotificationModel().create(created_by=self.u1,
84 82 subject=u'subj', body=u'hi there2',
85 83 recipients=[self.u3])
86 84 Session().commit()
87 85 u3 = Session().query(User).get(self.u3)
88 86
89 87 self.assertEqual(sorted([x.notification for x in u3.notifications]),
90 88 sorted([notification2, notification1]))
91 89
92 90 def test_delete_notifications(self):
93 91 self.assertEqual([], Notification.query().all())
94 92 self.assertEqual([], UserNotification.query().all())
95 93
96 94 notification = NotificationModel().create(created_by=self.u1,
97 95 subject=u'title', body=u'hi there3',
98 96 recipients=[self.u3, self.u1, self.u2])
99 97 Session().commit()
100 98 notifications = Notification.query().all()
101 99 self.assertTrue(notification in notifications)
102 100
103 101 Notification.delete(notification.notification_id)
104 102 Session().commit()
105 103
106 104 notifications = Notification.query().all()
107 105 self.assertFalse(notification in notifications)
108 106
109 107 un = UserNotification.query().filter(UserNotification.notification
110 108 == notification).all()
111 109 self.assertEqual(un, [])
112 110
113 111 def test_delete_association(self):
114 112
115 113 self.assertEqual([], Notification.query().all())
116 114 self.assertEqual([], UserNotification.query().all())
117 115
118 116 notification = NotificationModel().create(created_by=self.u1,
119 117 subject=u'title', body=u'hi there3',
120 118 recipients=[self.u3, self.u1, self.u2])
121 119 Session().commit()
122 120
123 121 unotification = UserNotification.query()\
124 122 .filter(UserNotification.notification ==
125 123 notification)\
126 124 .filter(UserNotification.user_id == self.u3)\
127 125 .scalar()
128 126
129 127 self.assertEqual(unotification.user_id, self.u3)
130 128
131 129 NotificationModel().delete(self.u3,
132 130 notification.notification_id)
133 131 Session().commit()
134 132
135 133 u3notification = UserNotification.query()\
136 134 .filter(UserNotification.notification ==
137 135 notification)\
138 136 .filter(UserNotification.user_id == self.u3)\
139 137 .scalar()
140 138
141 139 self.assertEqual(u3notification, None)
142 140
143 141 # notification object is still there
144 142 self.assertEqual(Notification.query().all(), [notification])
145 143
146 144 #u1 and u2 still have assignments
147 145 u1notification = UserNotification.query()\
148 146 .filter(UserNotification.notification ==
149 147 notification)\
150 148 .filter(UserNotification.user_id == self.u1)\
151 149 .scalar()
152 150 self.assertNotEqual(u1notification, None)
153 151 u2notification = UserNotification.query()\
154 152 .filter(UserNotification.notification ==
155 153 notification)\
156 154 .filter(UserNotification.user_id == self.u2)\
157 155 .scalar()
158 156 self.assertNotEqual(u2notification, None)
159 157
160 158 def test_notification_counter(self):
161 159 self._clean_notifications()
162 160 self.assertEqual([], Notification.query().all())
163 161 self.assertEqual([], UserNotification.query().all())
164 162
165 163 NotificationModel().create(created_by=self.u1,
166 164 subject=u'title', body=u'hi there_delete',
167 165 recipients=[self.u3, self.u1])
168 166 Session().commit()
169 167
170 168 self.assertEqual(NotificationModel()
171 169 .get_unread_cnt_for_user(self.u1), 1)
172 170 self.assertEqual(NotificationModel()
173 171 .get_unread_cnt_for_user(self.u2), 0)
174 172 self.assertEqual(NotificationModel()
175 173 .get_unread_cnt_for_user(self.u3), 1)
176 174
177 175 notification = NotificationModel().create(created_by=self.u1,
178 176 subject=u'title', body=u'hi there3',
179 177 recipients=[self.u3, self.u1, self.u2])
180 178 Session().commit()
181 179
182 180 self.assertEqual(NotificationModel()
183 181 .get_unread_cnt_for_user(self.u1), 2)
184 182 self.assertEqual(NotificationModel()
185 183 .get_unread_cnt_for_user(self.u2), 1)
186 184 self.assertEqual(NotificationModel()
187 185 .get_unread_cnt_for_user(self.u3), 2)
@@ -1,529 +1,527 b''
1 import os
2 import unittest
3 1 from rhodecode.tests import *
4 2 from rhodecode.tests.fixture import Fixture
5 3 from rhodecode.model.repos_group import ReposGroupModel
6 4 from rhodecode.model.repo import RepoModel
7 5 from rhodecode.model.db import RepoGroup, User, UserGroupRepoGroupToPerm,\
8 6 Permission, UserToPerm
9 7 from rhodecode.model.user import UserModel
10 8
11 9 from rhodecode.model.meta import Session
12 10 from rhodecode.model.users_group import UserGroupModel
13 11 from rhodecode.lib.auth import AuthUser
14 12 from rhodecode.model.permission import PermissionModel
15 13
16 14
17 15 fixture = Fixture()
18 16
19 17
20 class TestPermissions(unittest.TestCase):
18 class TestPermissions(BaseTestCase):
21 19 def __init__(self, methodName='runTest'):
22 20 super(TestPermissions, self).__init__(methodName=methodName)
23 21
24 22 def setUp(self):
25 23 self.u1 = UserModel().create_or_update(
26 24 username=u'u1', password=u'qweqwe',
27 25 email=u'u1@rhodecode.org', firstname=u'u1', lastname=u'u1'
28 26 )
29 27 self.u2 = UserModel().create_or_update(
30 28 username=u'u2', password=u'qweqwe',
31 29 email=u'u2@rhodecode.org', firstname=u'u2', lastname=u'u2'
32 30 )
33 31 self.u3 = UserModel().create_or_update(
34 32 username=u'u3', password=u'qweqwe',
35 33 email=u'u3@rhodecode.org', firstname=u'u3', lastname=u'u3'
36 34 )
37 35 self.anon = User.get_default_user()
38 36 self.a1 = UserModel().create_or_update(
39 37 username=u'a1', password=u'qweqwe',
40 38 email=u'a1@rhodecode.org', firstname=u'a1', lastname=u'a1', admin=True
41 39 )
42 40 Session().commit()
43 41
44 42 def tearDown(self):
45 43 if hasattr(self, 'test_repo'):
46 44 RepoModel().delete(repo=self.test_repo)
47 45
48 46 UserModel().delete(self.u1)
49 47 UserModel().delete(self.u2)
50 48 UserModel().delete(self.u3)
51 49 UserModel().delete(self.a1)
52 50 if hasattr(self, 'g1'):
53 51 ReposGroupModel().delete(self.g1.group_id)
54 52 if hasattr(self, 'g2'):
55 53 ReposGroupModel().delete(self.g2.group_id)
56 54
57 55 if hasattr(self, 'ug1'):
58 56 UserGroupModel().delete(self.ug1, force=True)
59 57
60 58 Session().commit()
61 59
62 60 def test_default_perms_set(self):
63 61 u1_auth = AuthUser(user_id=self.u1.user_id)
64 62 perms = {
65 63 'repositories_groups': {},
66 64 'global': set([u'hg.create.repository', u'repository.read',
67 65 u'hg.register.manual_activate']),
68 66 'repositories': {u'vcs_test_hg': u'repository.read'}
69 67 }
70 68 self.assertEqual(u1_auth.permissions['repositories'][HG_REPO],
71 69 perms['repositories'][HG_REPO])
72 70 new_perm = 'repository.write'
73 71 RepoModel().grant_user_permission(repo=HG_REPO, user=self.u1,
74 72 perm=new_perm)
75 73 Session().commit()
76 74
77 75 u1_auth = AuthUser(user_id=self.u1.user_id)
78 76 self.assertEqual(u1_auth.permissions['repositories'][HG_REPO],
79 77 new_perm)
80 78
81 79 def test_default_admin_perms_set(self):
82 80 a1_auth = AuthUser(user_id=self.a1.user_id)
83 81 perms = {
84 82 'repositories_groups': {},
85 83 'global': set([u'hg.admin']),
86 84 'repositories': {u'vcs_test_hg': u'repository.admin'}
87 85 }
88 86 self.assertEqual(a1_auth.permissions['repositories'][HG_REPO],
89 87 perms['repositories'][HG_REPO])
90 88 new_perm = 'repository.write'
91 89 RepoModel().grant_user_permission(repo=HG_REPO, user=self.a1,
92 90 perm=new_perm)
93 91 Session().commit()
94 92 # cannot really downgrade admins permissions !? they still get's set as
95 93 # admin !
96 94 u1_auth = AuthUser(user_id=self.a1.user_id)
97 95 self.assertEqual(u1_auth.permissions['repositories'][HG_REPO],
98 96 perms['repositories'][HG_REPO])
99 97
100 98 def test_default_group_perms(self):
101 99 self.g1 = fixture.create_group('test1', skip_if_exists=True)
102 100 self.g2 = fixture.create_group('test2', skip_if_exists=True)
103 101 u1_auth = AuthUser(user_id=self.u1.user_id)
104 102 perms = {
105 103 'repositories_groups': {u'test1': 'group.read', u'test2': 'group.read'},
106 104 'global': set(Permission.DEFAULT_USER_PERMISSIONS),
107 105 'repositories': {u'vcs_test_hg': u'repository.read'}
108 106 }
109 107 self.assertEqual(u1_auth.permissions['repositories'][HG_REPO],
110 108 perms['repositories'][HG_REPO])
111 109 self.assertEqual(u1_auth.permissions['repositories_groups'],
112 110 perms['repositories_groups'])
113 111 self.assertEqual(u1_auth.permissions['global'],
114 112 perms['global'])
115 113
116 114 def test_default_admin_group_perms(self):
117 115 self.g1 = fixture.create_group('test1', skip_if_exists=True)
118 116 self.g2 = fixture.create_group('test2', skip_if_exists=True)
119 117 a1_auth = AuthUser(user_id=self.a1.user_id)
120 118 perms = {
121 119 'repositories_groups': {u'test1': 'group.admin', u'test2': 'group.admin'},
122 120 'global': set(['hg.admin']),
123 121 'repositories': {u'vcs_test_hg': 'repository.admin'}
124 122 }
125 123
126 124 self.assertEqual(a1_auth.permissions['repositories'][HG_REPO],
127 125 perms['repositories'][HG_REPO])
128 126 self.assertEqual(a1_auth.permissions['repositories_groups'],
129 127 perms['repositories_groups'])
130 128
131 129 def test_propagated_permission_from_users_group_by_explicit_perms_exist(self):
132 130 # make group
133 131 self.ug1 = fixture.create_user_group('G1')
134 132 UserGroupModel().add_user_to_group(self.ug1, self.u1)
135 133
136 134 # set permission to lower
137 135 new_perm = 'repository.none'
138 136 RepoModel().grant_user_permission(repo=HG_REPO, user=self.u1, perm=new_perm)
139 137 Session().commit()
140 138 u1_auth = AuthUser(user_id=self.u1.user_id)
141 139 self.assertEqual(u1_auth.permissions['repositories'][HG_REPO],
142 140 new_perm)
143 141
144 142 # grant perm for group this should not override permission from user
145 143 # since it has explicitly set
146 144 new_perm_gr = 'repository.write'
147 145 RepoModel().grant_users_group_permission(repo=HG_REPO,
148 146 group_name=self.ug1,
149 147 perm=new_perm_gr)
150 148 # check perms
151 149 u1_auth = AuthUser(user_id=self.u1.user_id)
152 150 perms = {
153 151 'repositories_groups': {},
154 152 'global': set([u'hg.create.repository', u'repository.read',
155 153 u'hg.register.manual_activate']),
156 154 'repositories': {u'vcs_test_hg': u'repository.read'}
157 155 }
158 156 self.assertEqual(u1_auth.permissions['repositories'][HG_REPO],
159 157 new_perm)
160 158 self.assertEqual(u1_auth.permissions['repositories_groups'],
161 159 perms['repositories_groups'])
162 160
163 161 def test_propagated_permission_from_users_group(self):
164 162 # make group
165 163 self.ug1 = fixture.create_user_group('G1')
166 164 UserGroupModel().add_user_to_group(self.ug1, self.u3)
167 165
168 166 # grant perm for group this should override default permission from user
169 167 new_perm_gr = 'repository.write'
170 168 RepoModel().grant_users_group_permission(repo=HG_REPO,
171 169 group_name=self.ug1,
172 170 perm=new_perm_gr)
173 171 # check perms
174 172 u3_auth = AuthUser(user_id=self.u3.user_id)
175 173 perms = {
176 174 'repositories_groups': {},
177 175 'global': set([u'hg.create.repository', u'repository.read',
178 176 u'hg.register.manual_activate']),
179 177 'repositories': {u'vcs_test_hg': u'repository.read'}
180 178 }
181 179 self.assertEqual(u3_auth.permissions['repositories'][HG_REPO],
182 180 new_perm_gr)
183 181 self.assertEqual(u3_auth.permissions['repositories_groups'],
184 182 perms['repositories_groups'])
185 183
186 184 def test_propagated_permission_from_users_group_lower_weight(self):
187 185 # make group
188 186 self.ug1 = fixture.create_user_group('G1')
189 187 # add user to group
190 188 UserGroupModel().add_user_to_group(self.ug1, self.u1)
191 189
192 190 # set permission to lower
193 191 new_perm_h = 'repository.write'
194 192 RepoModel().grant_user_permission(repo=HG_REPO, user=self.u1,
195 193 perm=new_perm_h)
196 194 Session().commit()
197 195 u1_auth = AuthUser(user_id=self.u1.user_id)
198 196 self.assertEqual(u1_auth.permissions['repositories'][HG_REPO],
199 197 new_perm_h)
200 198
201 199 # grant perm for group this should NOT override permission from user
202 200 # since it's lower than granted
203 201 new_perm_l = 'repository.read'
204 202 RepoModel().grant_users_group_permission(repo=HG_REPO,
205 203 group_name=self.ug1,
206 204 perm=new_perm_l)
207 205 # check perms
208 206 u1_auth = AuthUser(user_id=self.u1.user_id)
209 207 perms = {
210 208 'repositories_groups': {},
211 209 'global': set([u'hg.create.repository', u'repository.read',
212 210 u'hg.register.manual_activate']),
213 211 'repositories': {u'vcs_test_hg': u'repository.write'}
214 212 }
215 213 self.assertEqual(u1_auth.permissions['repositories'][HG_REPO],
216 214 new_perm_h)
217 215 self.assertEqual(u1_auth.permissions['repositories_groups'],
218 216 perms['repositories_groups'])
219 217
220 218 def test_repo_in_group_permissions(self):
221 219 self.g1 = fixture.create_group('group1', skip_if_exists=True)
222 220 self.g2 = fixture.create_group('group2', skip_if_exists=True)
223 221 # both perms should be read !
224 222 u1_auth = AuthUser(user_id=self.u1.user_id)
225 223 self.assertEqual(u1_auth.permissions['repositories_groups'],
226 224 {u'group1': u'group.read', u'group2': u'group.read'})
227 225
228 226 a1_auth = AuthUser(user_id=self.anon.user_id)
229 227 self.assertEqual(a1_auth.permissions['repositories_groups'],
230 228 {u'group1': u'group.read', u'group2': u'group.read'})
231 229
232 230 #Change perms to none for both groups
233 231 ReposGroupModel().grant_user_permission(repos_group=self.g1,
234 232 user=self.anon,
235 233 perm='group.none')
236 234 ReposGroupModel().grant_user_permission(repos_group=self.g2,
237 235 user=self.anon,
238 236 perm='group.none')
239 237
240 238 u1_auth = AuthUser(user_id=self.u1.user_id)
241 239 self.assertEqual(u1_auth.permissions['repositories_groups'],
242 240 {u'group1': u'group.none', u'group2': u'group.none'})
243 241
244 242 a1_auth = AuthUser(user_id=self.anon.user_id)
245 243 self.assertEqual(a1_auth.permissions['repositories_groups'],
246 244 {u'group1': u'group.none', u'group2': u'group.none'})
247 245
248 246 # add repo to group
249 247 name = RepoGroup.url_sep().join([self.g1.group_name, 'test_perm'])
250 248 self.test_repo = fixture.create_repo(name=name,
251 249 repo_type='hg',
252 250 repos_group=self.g1,
253 251 cur_user=self.u1,)
254 252
255 253 u1_auth = AuthUser(user_id=self.u1.user_id)
256 254 self.assertEqual(u1_auth.permissions['repositories_groups'],
257 255 {u'group1': u'group.none', u'group2': u'group.none'})
258 256
259 257 a1_auth = AuthUser(user_id=self.anon.user_id)
260 258 self.assertEqual(a1_auth.permissions['repositories_groups'],
261 259 {u'group1': u'group.none', u'group2': u'group.none'})
262 260
263 261 #grant permission for u2 !
264 262 ReposGroupModel().grant_user_permission(repos_group=self.g1,
265 263 user=self.u2,
266 264 perm='group.read')
267 265 ReposGroupModel().grant_user_permission(repos_group=self.g2,
268 266 user=self.u2,
269 267 perm='group.read')
270 268 Session().commit()
271 269 self.assertNotEqual(self.u1, self.u2)
272 270 #u1 and anon should have not change perms while u2 should !
273 271 u1_auth = AuthUser(user_id=self.u1.user_id)
274 272 self.assertEqual(u1_auth.permissions['repositories_groups'],
275 273 {u'group1': u'group.none', u'group2': u'group.none'})
276 274
277 275 u2_auth = AuthUser(user_id=self.u2.user_id)
278 276 self.assertEqual(u2_auth.permissions['repositories_groups'],
279 277 {u'group1': u'group.read', u'group2': u'group.read'})
280 278
281 279 a1_auth = AuthUser(user_id=self.anon.user_id)
282 280 self.assertEqual(a1_auth.permissions['repositories_groups'],
283 281 {u'group1': u'group.none', u'group2': u'group.none'})
284 282
285 283 def test_repo_group_user_as_user_group_member(self):
286 284 # create Group1
287 285 self.g1 = fixture.create_group('group1', skip_if_exists=True)
288 286 a1_auth = AuthUser(user_id=self.anon.user_id)
289 287
290 288 self.assertEqual(a1_auth.permissions['repositories_groups'],
291 289 {u'group1': u'group.read'})
292 290
293 291 # set default permission to none
294 292 ReposGroupModel().grant_user_permission(repos_group=self.g1,
295 293 user=self.anon,
296 294 perm='group.none')
297 295 # make group
298 296 self.ug1 = fixture.create_user_group('G1')
299 297 # add user to group
300 298 UserGroupModel().add_user_to_group(self.ug1, self.u1)
301 299 Session().commit()
302 300
303 301 # check if user is in the group
304 302 membrs = [x.user_id for x in UserGroupModel().get(self.ug1.users_group_id).members]
305 303 self.assertEqual(membrs, [self.u1.user_id])
306 304 # add some user to that group
307 305
308 306 # check his permissions
309 307 a1_auth = AuthUser(user_id=self.anon.user_id)
310 308 self.assertEqual(a1_auth.permissions['repositories_groups'],
311 309 {u'group1': u'group.none'})
312 310
313 311 u1_auth = AuthUser(user_id=self.u1.user_id)
314 312 self.assertEqual(u1_auth.permissions['repositories_groups'],
315 313 {u'group1': u'group.none'})
316 314
317 315 # grant ug1 read permissions for
318 316 ReposGroupModel().grant_users_group_permission(repos_group=self.g1,
319 317 group_name=self.ug1,
320 318 perm='group.read')
321 319 Session().commit()
322 320 # check if the
323 321 obj = Session().query(UserGroupRepoGroupToPerm)\
324 322 .filter(UserGroupRepoGroupToPerm.group == self.g1)\
325 323 .filter(UserGroupRepoGroupToPerm.users_group == self.ug1)\
326 324 .scalar()
327 325 self.assertEqual(obj.permission.permission_name, 'group.read')
328 326
329 327 a1_auth = AuthUser(user_id=self.anon.user_id)
330 328
331 329 self.assertEqual(a1_auth.permissions['repositories_groups'],
332 330 {u'group1': u'group.none'})
333 331
334 332 u1_auth = AuthUser(user_id=self.u1.user_id)
335 333 self.assertEqual(u1_auth.permissions['repositories_groups'],
336 334 {u'group1': u'group.read'})
337 335
338 336 def test_inherited_permissions_from_default_on_user_enabled(self):
339 337 user_model = UserModel()
340 338 # enable fork and create on default user
341 339 usr = 'default'
342 340 user_model.revoke_perm(usr, 'hg.create.none')
343 341 user_model.grant_perm(usr, 'hg.create.repository')
344 342 user_model.revoke_perm(usr, 'hg.fork.none')
345 343 user_model.grant_perm(usr, 'hg.fork.repository')
346 344 # make sure inherit flag is turned on
347 345 self.u1.inherit_default_permissions = True
348 346 Session().commit()
349 347 u1_auth = AuthUser(user_id=self.u1.user_id)
350 348 # this user will have inherited permissions from default user
351 349 self.assertEqual(u1_auth.permissions['global'],
352 350 set(['hg.create.repository', 'hg.fork.repository',
353 351 'hg.register.manual_activate',
354 352 'hg.extern_activate.auto',
355 353 'repository.read', 'group.read',
356 354 'usergroup.read']))
357 355
358 356 def test_inherited_permissions_from_default_on_user_disabled(self):
359 357 user_model = UserModel()
360 358 # disable fork and create on default user
361 359 usr = 'default'
362 360 user_model.revoke_perm(usr, 'hg.create.repository')
363 361 user_model.grant_perm(usr, 'hg.create.none')
364 362 user_model.revoke_perm(usr, 'hg.fork.repository')
365 363 user_model.grant_perm(usr, 'hg.fork.none')
366 364 # make sure inherit flag is turned on
367 365 self.u1.inherit_default_permissions = True
368 366 Session().commit()
369 367 u1_auth = AuthUser(user_id=self.u1.user_id)
370 368 # this user will have inherited permissions from default user
371 369 self.assertEqual(u1_auth.permissions['global'],
372 370 set(['hg.create.none', 'hg.fork.none',
373 371 'hg.register.manual_activate',
374 372 'hg.extern_activate.auto',
375 373 'repository.read', 'group.read',
376 374 'usergroup.read']))
377 375
378 376 def test_non_inherited_permissions_from_default_on_user_enabled(self):
379 377 user_model = UserModel()
380 378 # enable fork and create on default user
381 379 usr = 'default'
382 380 user_model.revoke_perm(usr, 'hg.create.none')
383 381 user_model.grant_perm(usr, 'hg.create.repository')
384 382 user_model.revoke_perm(usr, 'hg.fork.none')
385 383 user_model.grant_perm(usr, 'hg.fork.repository')
386 384
387 385 #disable global perms on specific user
388 386 user_model.revoke_perm(self.u1, 'hg.create.repository')
389 387 user_model.grant_perm(self.u1, 'hg.create.none')
390 388 user_model.revoke_perm(self.u1, 'hg.fork.repository')
391 389 user_model.grant_perm(self.u1, 'hg.fork.none')
392 390
393 391 # make sure inherit flag is turned off
394 392 self.u1.inherit_default_permissions = False
395 393 Session().commit()
396 394 u1_auth = AuthUser(user_id=self.u1.user_id)
397 395 # this user will have non inherited permissions from he's
398 396 # explicitly set permissions
399 397 self.assertEqual(u1_auth.permissions['global'],
400 398 set(['hg.create.none', 'hg.fork.none',
401 399 'hg.register.manual_activate',
402 400 'hg.extern_activate.auto',
403 401 'repository.read', 'group.read',
404 402 'usergroup.read']))
405 403
406 404 def test_non_inherited_permissions_from_default_on_user_disabled(self):
407 405 user_model = UserModel()
408 406 # disable fork and create on default user
409 407 usr = 'default'
410 408 user_model.revoke_perm(usr, 'hg.create.repository')
411 409 user_model.grant_perm(usr, 'hg.create.none')
412 410 user_model.revoke_perm(usr, 'hg.fork.repository')
413 411 user_model.grant_perm(usr, 'hg.fork.none')
414 412
415 413 #enable global perms on specific user
416 414 user_model.revoke_perm(self.u1, 'hg.create.none')
417 415 user_model.grant_perm(self.u1, 'hg.create.repository')
418 416 user_model.revoke_perm(self.u1, 'hg.fork.none')
419 417 user_model.grant_perm(self.u1, 'hg.fork.repository')
420 418
421 419 # make sure inherit flag is turned off
422 420 self.u1.inherit_default_permissions = False
423 421 Session().commit()
424 422 u1_auth = AuthUser(user_id=self.u1.user_id)
425 423 # this user will have non inherited permissions from he's
426 424 # explicitly set permissions
427 425 self.assertEqual(u1_auth.permissions['global'],
428 426 set(['hg.create.repository', 'hg.fork.repository',
429 427 'hg.register.manual_activate',
430 428 'hg.extern_activate.auto',
431 429 'repository.read', 'group.read',
432 430 'usergroup.read']))
433 431
434 432 def test_owner_permissions_doesnot_get_overwritten_by_group(self):
435 433 #create repo as USER,
436 434 self.test_repo = fixture.create_repo(name='myownrepo',
437 435 repo_type='hg',
438 436 cur_user=self.u1)
439 437
440 438 #he has permissions of admin as owner
441 439 u1_auth = AuthUser(user_id=self.u1.user_id)
442 440 self.assertEqual(u1_auth.permissions['repositories']['myownrepo'],
443 441 'repository.admin')
444 442 #set his permission as user group, he should still be admin
445 443 self.ug1 = fixture.create_user_group('G1')
446 444 UserGroupModel().add_user_to_group(self.ug1, self.u1)
447 445 RepoModel().grant_users_group_permission(self.test_repo,
448 446 group_name=self.ug1,
449 447 perm='repository.none')
450 448
451 449 Session().commit()
452 450 u1_auth = AuthUser(user_id=self.u1.user_id)
453 451 self.assertEqual(u1_auth.permissions['repositories']['myownrepo'],
454 452 'repository.admin')
455 453
456 454 def test_owner_permissions_doesnot_get_overwritten_by_others(self):
457 455 #create repo as USER,
458 456 self.test_repo = fixture.create_repo(name='myownrepo',
459 457 repo_type='hg',
460 458 cur_user=self.u1)
461 459
462 460 #he has permissions of admin as owner
463 461 u1_auth = AuthUser(user_id=self.u1.user_id)
464 462 self.assertEqual(u1_auth.permissions['repositories']['myownrepo'],
465 463 'repository.admin')
466 464 #set his permission as user, he should still be admin
467 465 RepoModel().grant_user_permission(self.test_repo, user=self.u1,
468 466 perm='repository.none')
469 467 Session().commit()
470 468 u1_auth = AuthUser(user_id=self.u1.user_id)
471 469 self.assertEqual(u1_auth.permissions['repositories']['myownrepo'],
472 470 'repository.admin')
473 471
474 472 def _test_def_perm_equal(self, user, change_factor=0):
475 473 perms = UserToPerm.query()\
476 474 .filter(UserToPerm.user == user)\
477 475 .all()
478 476 self.assertEqual(len(perms),
479 477 len(Permission.DEFAULT_USER_PERMISSIONS,)+change_factor,
480 478 msg=perms)
481 479
482 480 def test_set_default_permissions(self):
483 481 PermissionModel().create_default_permissions(user=self.u1)
484 482 self._test_def_perm_equal(user=self.u1)
485 483
486 484 def test_set_default_permissions_after_one_is_missing(self):
487 485 PermissionModel().create_default_permissions(user=self.u1)
488 486 self._test_def_perm_equal(user=self.u1)
489 487 #now we delete one, it should be re-created after another call
490 488 perms = UserToPerm.query()\
491 489 .filter(UserToPerm.user == self.u1)\
492 490 .all()
493 491 Session().delete(perms[0])
494 492 Session().commit()
495 493
496 494 self._test_def_perm_equal(user=self.u1, change_factor=-1)
497 495
498 496 #create missing one !
499 497 PermissionModel().create_default_permissions(user=self.u1)
500 498 self._test_def_perm_equal(user=self.u1)
501 499
502 500 @parameterized.expand([
503 501 ('repository.read', 'repository.none'),
504 502 ('group.read', 'group.none'),
505 503 ('usergroup.read', 'usergroup.none'),
506 504 ('hg.create.repository', 'hg.create.none'),
507 505 ('hg.fork.repository', 'hg.fork.none'),
508 506 ('hg.register.manual_activate', 'hg.register.auto_activate',)
509 507 ])
510 508 def test_set_default_permissions_after_modification(self, perm, modify_to):
511 509 PermissionModel().create_default_permissions(user=self.u1)
512 510 self._test_def_perm_equal(user=self.u1)
513 511
514 512 old = Permission.get_by_key(perm)
515 513 new = Permission.get_by_key(modify_to)
516 514 self.assertNotEqual(old, None)
517 515 self.assertNotEqual(new, None)
518 516
519 517 #now modify permissions
520 518 p = UserToPerm.query()\
521 519 .filter(UserToPerm.user == self.u1)\
522 520 .filter(UserToPerm.permission == old)\
523 521 .one()
524 522 p.permission = new
525 523 Session().add(p)
526 524 Session().commit()
527 525
528 526 PermissionModel().create_default_permissions(user=self.u1)
529 527 self._test_def_perm_equal(user=self.u1)
@@ -1,79 +1,77 b''
1 import os
2 import unittest
3 1 from rhodecode.tests import *
4 2
5 3 from rhodecode.model.meta import Session
6 4 from rhodecode.tests.fixture import Fixture
7 5 from rhodecode.model.repo import RepoModel
8 6 from rhodecode.model.db import Repository
9 7 from rhodecode.lib.exceptions import AttachedForksError
10 8
11 9 fixture = Fixture()
12 10
13 11
14 class TestRepos(unittest.TestCase):
12 class TestRepos(BaseTestCase):
15 13
16 14 def setUp(self):
17 15 pass
18 16
19 17 def tearDown(self):
20 18 Session.remove()
21 19
22 20 def test_remove_repo(self):
23 21 repo = fixture.create_repo(name='test-repo-1')
24 22 Session().commit()
25 23
26 24 RepoModel().delete(repo=repo)
27 25 Session().commit()
28 26
29 27 self.assertEqual(None, Repository.get_by_repo_name(repo_name='test-repo-1'))
30 28
31 29 def test_remove_repo_repo_raises_exc_when_attached_forks(self):
32 30 repo = fixture.create_repo(name='test-repo-1')
33 31 Session().commit()
34 32
35 33 fixture.create_fork(repo.repo_name, 'test-repo-fork-1')
36 34 Session().commit()
37 35
38 36 self.assertRaises(AttachedForksError, lambda: RepoModel().delete(repo=repo))
39 37
40 38 def test_remove_repo_delete_forks(self):
41 39 repo = fixture.create_repo(name='test-repo-1')
42 40 Session().commit()
43 41
44 42 fork = fixture.create_fork(repo.repo_name, 'test-repo-fork-1')
45 43 Session().commit()
46 44
47 45 #fork of fork
48 46 fixture.create_fork(fork.repo_name, 'test-repo-fork-fork-1')
49 47 Session().commit()
50 48
51 49 RepoModel().delete(repo=repo, forks='delete')
52 50 Session().commit()
53 51
54 52 self.assertEqual(None, Repository.get_by_repo_name(repo_name='test-repo-1'))
55 53 self.assertEqual(None, Repository.get_by_repo_name(repo_name='test-repo-fork-1'))
56 54 self.assertEqual(None, Repository.get_by_repo_name(repo_name='test-repo-fork-fork-1'))
57 55
58 56 def test_remove_repo_detach_forks(self):
59 57 repo = fixture.create_repo(name='test-repo-1')
60 58 Session().commit()
61 59
62 60 fork = fixture.create_fork(repo.repo_name, 'test-repo-fork-1')
63 61 Session().commit()
64 62
65 63 #fork of fork
66 64 fixture.create_fork(fork.repo_name, 'test-repo-fork-fork-1')
67 65 Session().commit()
68 66
69 67 RepoModel().delete(repo=repo, forks='detach')
70 68 Session().commit()
71 69
72 70 try:
73 71 self.assertEqual(None, Repository.get_by_repo_name(repo_name='test-repo-1'))
74 72 self.assertNotEqual(None, Repository.get_by_repo_name(repo_name='test-repo-fork-1'))
75 73 self.assertNotEqual(None, Repository.get_by_repo_name(repo_name='test-repo-fork-fork-1'))
76 74 finally:
77 75 RepoModel().delete(repo='test-repo-fork-fork-1')
78 76 RepoModel().delete(repo='test-repo-fork-1')
79 77 Session().commit()
@@ -1,200 +1,199 b''
1 1 import os
2 import unittest
3 2 from sqlalchemy.exc import IntegrityError
4 3
5 4 from rhodecode.tests import *
6 5 from rhodecode.tests.fixture import Fixture
7 6
8 7 from rhodecode.model.repos_group import ReposGroupModel
9 8 from rhodecode.model.repo import RepoModel
10 9 from rhodecode.model.db import RepoGroup
11 10 from rhodecode.model.meta import Session
12 11
13 12
14 13 fixture = Fixture()
15 14
16 15
17 16 def _update_group(id_, group_name, desc='desc', parent_id=None):
18 17 form_data = fixture._get_group_create_params(group_name=group_name,
19 18 group_desc=desc,
20 19 group_parent_id=parent_id)
21 20 gr = ReposGroupModel().update(id_, form_data)
22 21 return gr
23 22
24 23
25 24 def _update_repo(name, **kwargs):
26 25 form_data = fixture._get_repo_create_params(**kwargs)
27 26 if not 'repo_name' in kwargs:
28 27 form_data['repo_name'] = name
29 28 if not 'perms_new' in kwargs:
30 29 form_data['perms_new'] = []
31 30 if not 'perms_updates' in kwargs:
32 31 form_data['perms_updates'] = []
33 32 r = RepoModel().update(name, **form_data)
34 33 return r
35 34
36 35
37 class TestReposGroups(unittest.TestCase):
36 class TestReposGroups(BaseTestCase):
38 37
39 38 def setUp(self):
40 39 self.g1 = fixture.create_group('test1', skip_if_exists=True)
41 40 self.g2 = fixture.create_group('test2', skip_if_exists=True)
42 41 self.g3 = fixture.create_group('test3', skip_if_exists=True)
43 42
44 43 def tearDown(self):
45 44 Session.remove()
46 45
47 46 def __check_path(self, *path):
48 47 """
49 48 Checks the path for existance !
50 49 """
51 50 path = [TESTS_TMP_PATH] + list(path)
52 51 path = os.path.join(*path)
53 52 return os.path.isdir(path)
54 53
55 54 def _check_folders(self):
56 55 print os.listdir(TESTS_TMP_PATH)
57 56
58 57 def __delete_group(self, id_):
59 58 ReposGroupModel().delete(id_)
60 59
61 60 def test_create_group(self):
62 61 g = fixture.create_group('newGroup')
63 62 Session().commit()
64 63 self.assertEqual(g.full_path, 'newGroup')
65 64
66 65 self.assertTrue(self.__check_path('newGroup'))
67 66
68 67 def test_create_same_name_group(self):
69 68 self.assertRaises(IntegrityError, lambda: fixture.create_group('newGroup'))
70 69 Session().rollback()
71 70
72 71 def test_same_subgroup(self):
73 72 sg1 = fixture.create_group('sub1', group_parent_id=self.g1.group_id)
74 73 self.assertEqual(sg1.parent_group, self.g1)
75 74 self.assertEqual(sg1.full_path, 'test1/sub1')
76 75 self.assertTrue(self.__check_path('test1', 'sub1'))
77 76
78 77 ssg1 = fixture.create_group('subsub1', group_parent_id=sg1.group_id)
79 78 self.assertEqual(ssg1.parent_group, sg1)
80 79 self.assertEqual(ssg1.full_path, 'test1/sub1/subsub1')
81 80 self.assertTrue(self.__check_path('test1', 'sub1', 'subsub1'))
82 81
83 82 def test_remove_group(self):
84 83 sg1 = fixture.create_group('deleteme')
85 84 self.__delete_group(sg1.group_id)
86 85
87 86 self.assertEqual(RepoGroup.get(sg1.group_id), None)
88 87 self.assertFalse(self.__check_path('deteteme'))
89 88
90 89 sg1 = fixture.create_group('deleteme', group_parent_id=self.g1.group_id)
91 90 self.__delete_group(sg1.group_id)
92 91
93 92 self.assertEqual(RepoGroup.get(sg1.group_id), None)
94 93 self.assertFalse(self.__check_path('test1', 'deteteme'))
95 94
96 95 def test_rename_single_group(self):
97 96 sg1 = fixture.create_group('initial')
98 97
99 98 new_sg1 = _update_group(sg1.group_id, 'after')
100 99 self.assertTrue(self.__check_path('after'))
101 100 self.assertEqual(RepoGroup.get_by_group_name('initial'), None)
102 101
103 102 def test_update_group_parent(self):
104 103
105 104 sg1 = fixture.create_group('initial', group_parent_id=self.g1.group_id)
106 105
107 106 new_sg1 = _update_group(sg1.group_id, 'after', parent_id=self.g1.group_id)
108 107 self.assertTrue(self.__check_path('test1', 'after'))
109 108 self.assertEqual(RepoGroup.get_by_group_name('test1/initial'), None)
110 109
111 110 new_sg1 = _update_group(sg1.group_id, 'after', parent_id=self.g3.group_id)
112 111 self.assertTrue(self.__check_path('test3', 'after'))
113 112 self.assertEqual(RepoGroup.get_by_group_name('test3/initial'), None)
114 113
115 114 new_sg1 = _update_group(sg1.group_id, 'hello')
116 115 self.assertTrue(self.__check_path('hello'))
117 116
118 117 self.assertEqual(RepoGroup.get_by_group_name('hello'), new_sg1)
119 118
120 119 def test_subgrouping_with_repo(self):
121 120
122 121 g1 = fixture.create_group('g1')
123 122 g2 = fixture.create_group('g2')
124 123 # create new repo
125 124 r = fixture.create_repo('john')
126 125
127 126 self.assertEqual(r.repo_name, 'john')
128 127 # put repo into group
129 128 r = _update_repo('john', repo_group=g1.group_id)
130 129 Session().commit()
131 130 self.assertEqual(r.repo_name, 'g1/john')
132 131
133 132 _update_group(g1.group_id, 'g1', parent_id=g2.group_id)
134 133 self.assertTrue(self.__check_path('g2', 'g1'))
135 134
136 135 # test repo
137 136 self.assertEqual(r.repo_name, RepoGroup.url_sep().join(['g2', 'g1',
138 137 r.just_name]))
139 138
140 139 def test_move_to_root(self):
141 140 g1 = fixture.create_group('t11')
142 141 g2 = fixture.create_group('t22', group_parent_id=g1.group_id)
143 142
144 143 self.assertEqual(g2.full_path, 't11/t22')
145 144 self.assertTrue(self.__check_path('t11', 't22'))
146 145
147 146 g2 = _update_group(g2.group_id, 'g22', parent_id=None)
148 147 Session().commit()
149 148
150 149 self.assertEqual(g2.group_name, 'g22')
151 150 # we moved out group from t1 to '' so it's full path should be 'g2'
152 151 self.assertEqual(g2.full_path, 'g22')
153 152 self.assertFalse(self.__check_path('t11', 't22'))
154 153 self.assertTrue(self.__check_path('g22'))
155 154
156 155 def test_rename_top_level_group_in_nested_setup(self):
157 156 g1 = fixture.create_group('L1')
158 157 g2 = fixture.create_group('L2', group_parent_id=g1.group_id)
159 158 g3 = fixture.create_group('L3', group_parent_id=g2.group_id)
160 159
161 160 r = fixture.create_repo('L1/L2/L3/L3_REPO', repo_group=g3.group_id)
162 161
163 162 ##rename L1 all groups should be now changed
164 163 _update_group(g1.group_id, 'L1_NEW')
165 164 Session().commit()
166 165 self.assertEqual(g1.full_path, 'L1_NEW')
167 166 self.assertEqual(g2.full_path, 'L1_NEW/L2')
168 167 self.assertEqual(g3.full_path, 'L1_NEW/L2/L3')
169 168 self.assertEqual(r.repo_name, 'L1_NEW/L2/L3/L3_REPO')
170 169
171 170 def test_change_parent_of_top_level_group_in_nested_setup(self):
172 171 g1 = fixture.create_group('R1')
173 172 g2 = fixture.create_group('R2', group_parent_id=g1.group_id)
174 173 g3 = fixture.create_group('R3', group_parent_id=g2.group_id)
175 174 g4 = fixture.create_group('R1_NEW')
176 175
177 176 r = fixture.create_repo('R1/R2/R3/R3_REPO', repo_group=g3.group_id)
178 177 ##rename L1 all groups should be now changed
179 178 _update_group(g1.group_id, 'R1', parent_id=g4.group_id)
180 179 Session().commit()
181 180 self.assertEqual(g1.full_path, 'R1_NEW/R1')
182 181 self.assertEqual(g2.full_path, 'R1_NEW/R1/R2')
183 182 self.assertEqual(g3.full_path, 'R1_NEW/R1/R2/R3')
184 183 self.assertEqual(r.repo_name, 'R1_NEW/R1/R2/R3/R3_REPO')
185 184
186 185 def test_change_parent_of_top_level_group_in_nested_setup_with_rename(self):
187 186 g1 = fixture.create_group('X1')
188 187 g2 = fixture.create_group('X2', group_parent_id=g1.group_id)
189 188 g3 = fixture.create_group('X3', group_parent_id=g2.group_id)
190 189 g4 = fixture.create_group('X1_NEW')
191 190
192 191 r = fixture.create_repo('X1/X2/X3/X3_REPO', repo_group=g3.group_id)
193 192
194 193 ##rename L1 all groups should be now changed
195 194 _update_group(g1.group_id, 'X1_PRIM', parent_id=g4.group_id)
196 195 Session().commit()
197 196 self.assertEqual(g1.full_path, 'X1_NEW/X1_PRIM')
198 197 self.assertEqual(g2.full_path, 'X1_NEW/X1_PRIM/X2')
199 198 self.assertEqual(g3.full_path, 'X1_NEW/X1_PRIM/X2/X3')
200 199 self.assertEqual(r.repo_name, 'X1_NEW/X1_PRIM/X2/X3/X3_REPO')
@@ -1,165 +1,162 b''
1 import os
2 import unittest
3 1 import functools
4 2 from rhodecode.tests import *
5 3
6 4 from rhodecode.model.repos_group import ReposGroupModel
7 5 from rhodecode.model.db import RepoGroup, Repository, User
8 6
9 7 from rhodecode.model.meta import Session
10 8 from nose.tools import with_setup
11 9 from rhodecode.tests.models.common import _create_project_tree, check_tree_perms, \
12 10 _get_perms, _check_expected_count, expected_count, _destroy_project_tree
13 from rhodecode.model.repo import RepoModel
14 11
15 12
16 13 test_u1_id = None
17 14 _get_repo_perms = None
18 15 _get_group_perms = None
19 16
20 17
21 18 def permissions_setup_func(group_name='g0', perm='group.read', recursive=True):
22 19 """
23 20 Resets all permissions to perm attribute
24 21 """
25 22 repos_group = RepoGroup.get_by_group_name(group_name=group_name)
26 23 if not repos_group:
27 24 raise Exception('Cannot get group %s' % group_name)
28 25 perms_updates = [[test_u1_id, perm, 'user']]
29 26 ReposGroupModel()._update_permissions(repos_group,
30 27 perms_updates=perms_updates,
31 28 recursive=recursive,
32 29 check_perms=False)
33 30 Session().commit()
34 31
35 32
36 33 def setup_module():
37 34 global test_u1_id, _get_repo_perms, _get_group_perms
38 35 test_u1 = _create_project_tree()
39 36 Session().commit()
40 37 test_u1_id = test_u1.user_id
41 38 _get_repo_perms = functools.partial(_get_perms, key='repositories',
42 39 test_u1_id=test_u1_id)
43 40 _get_group_perms = functools.partial(_get_perms, key='repositories_groups',
44 41 test_u1_id=test_u1_id)
45 42
46 43
47 44 def teardown_module():
48 45 _destroy_project_tree(test_u1_id)
49 46
50 47
51 48 @with_setup(permissions_setup_func)
52 49 def test_user_permissions_on_group_without_recursive_mode():
53 50 # set permission to g0 non-recursive mode
54 51 recursive = False
55 52 group = 'g0'
56 53 permissions_setup_func(group, 'group.write', recursive=recursive)
57 54
58 55 items = [x for x in _get_repo_perms(group, recursive)]
59 56 expected = 0
60 57 assert len(items) == expected, ' %s != %s' % (len(items), expected)
61 58 for name, perm in items:
62 59 yield check_tree_perms, name, perm, group, 'repository.read'
63 60
64 61 items = [x for x in _get_group_perms(group, recursive)]
65 62 expected = 1
66 63 assert len(items) == expected, ' %s != %s' % (len(items), expected)
67 64 for name, perm in items:
68 65 yield check_tree_perms, name, perm, group, 'group.write'
69 66
70 67
71 68 @with_setup(permissions_setup_func)
72 69 def test_user_permissions_on_group_without_recursive_mode_subgroup():
73 70 # set permission to g0 non-recursive mode
74 71 recursive = False
75 72 group = 'g0/g0_1'
76 73 permissions_setup_func(group, 'group.write', recursive=recursive)
77 74
78 75 items = [x for x in _get_repo_perms(group, recursive)]
79 76 expected = 0
80 77 assert len(items) == expected, ' %s != %s' % (len(items), expected)
81 78 for name, perm in items:
82 79 yield check_tree_perms, name, perm, group, 'repository.read'
83 80
84 81 items = [x for x in _get_group_perms(group, recursive)]
85 82 expected = 1
86 83 assert len(items) == expected, ' %s != %s' % (len(items), expected)
87 84 for name, perm in items:
88 85 yield check_tree_perms, name, perm, group, 'group.write'
89 86
90 87
91 88 @with_setup(permissions_setup_func)
92 89 def test_user_permissions_on_group_with_recursive_mode():
93 90
94 91 # set permission to g0 recursive mode, all children including
95 92 # other repos and groups should have this permission now set !
96 93 recursive = True
97 94 group = 'g0'
98 95 permissions_setup_func(group, 'group.write', recursive=recursive)
99 96
100 97 repo_items = [x for x in _get_repo_perms(group, recursive)]
101 98 items = [x for x in _get_group_perms(group, recursive)]
102 99 _check_expected_count(items, repo_items, expected_count(group, True))
103 100
104 101 for name, perm in repo_items:
105 102 if name == 'g0/g0_3/g0_3_r1_private':
106 103 yield check_tree_perms, name, perm, group, 'repository.none'
107 104 else:
108 105 yield check_tree_perms, name, perm, group, 'repository.write'
109 106
110 107 for name, perm in items:
111 108 yield check_tree_perms, name, perm, group, 'group.write'
112 109
113 110
114 111 @with_setup(permissions_setup_func)
115 112 def test_user_permissions_on_group_with_recursive_mode_inner_group():
116 113 ## set permission to g0_3 group to none
117 114 recursive = True
118 115 group = 'g0/g0_3'
119 116 permissions_setup_func(group, 'group.none', recursive=recursive)
120 117
121 118 repo_items = [x for x in _get_repo_perms(group, recursive)]
122 119 items = [x for x in _get_group_perms(group, recursive)]
123 120 _check_expected_count(items, repo_items, expected_count(group, True))
124 121
125 122 for name, perm in repo_items:
126 123 yield check_tree_perms, name, perm, group, 'repository.none'
127 124
128 125 for name, perm in items:
129 126 yield check_tree_perms, name, perm, group, 'group.none'
130 127
131 128
132 129 @with_setup(permissions_setup_func)
133 130 def test_user_permissions_on_group_with_recursive_mode_deepest():
134 131 ## set permission to g0_3 group to none
135 132 recursive = True
136 133 group = 'g0/g0_1/g0_1_1'
137 134 permissions_setup_func(group, 'group.write', recursive=recursive)
138 135
139 136 repo_items = [x for x in _get_repo_perms(group, recursive)]
140 137 items = [x for x in _get_group_perms(group, recursive)]
141 138 _check_expected_count(items, repo_items, expected_count(group, True))
142 139
143 140 for name, perm in repo_items:
144 141 yield check_tree_perms, name, perm, group, 'repository.write'
145 142
146 143 for name, perm in items:
147 144 yield check_tree_perms, name, perm, group, 'group.write'
148 145
149 146
150 147 @with_setup(permissions_setup_func)
151 148 def test_user_permissions_on_group_with_recursive_mode_only_with_repos():
152 149 ## set permission to g0_3 group to none
153 150 recursive = True
154 151 group = 'g0/g0_2'
155 152 permissions_setup_func(group, 'group.admin', recursive=recursive)
156 153
157 154 repo_items = [x for x in _get_repo_perms(group, recursive)]
158 155 items = [x for x in _get_group_perms(group, recursive)]
159 156 _check_expected_count(items, repo_items, expected_count(group, True))
160 157
161 158 for name, perm in repo_items:
162 159 yield check_tree_perms, name, perm, group, 'repository.admin'
163 160
164 161 for name, perm in items:
165 162 yield check_tree_perms, name, perm, group, 'group.admin'
@@ -1,131 +1,130 b''
1 import unittest
2 1 from rhodecode.tests import *
3 2
4 3 from rhodecode.model.db import User, UserGroup, UserGroupMember, UserEmailMap,\
5 4 Permission
6 5 from rhodecode.model.user import UserModel
7 6
8 7 from rhodecode.model.meta import Session
9 8 from rhodecode.model.users_group import UserGroupModel
10 9 from rhodecode.tests.fixture import Fixture
11 10
12 11 fixture = Fixture()
13 12
14 13
15 class TestUser(unittest.TestCase):
14 class TestUser(BaseTestCase):
16 15 def __init__(self, methodName='runTest'):
17 16 Session.remove()
18 17 super(TestUser, self).__init__(methodName=methodName)
19 18
20 19 def tearDown(self):
21 20 Session.remove()
22 21
23 22 def test_create_and_remove(self):
24 23 usr = UserModel().create_or_update(username=u'test_user',
25 24 password=u'qweqwe',
26 25 email=u'u232@rhodecode.org',
27 26 firstname=u'u1', lastname=u'u1')
28 27 Session().commit()
29 28 self.assertEqual(User.get_by_username(u'test_user'), usr)
30 29
31 30 # make user group
32 31 users_group = fixture.create_user_group('some_example_group')
33 32 Session().commit()
34 33
35 34 UserGroupModel().add_user_to_group(users_group, usr)
36 35 Session().commit()
37 36
38 37 self.assertEqual(UserGroup.get(users_group.users_group_id), users_group)
39 38 self.assertEqual(UserGroupMember.query().count(), 1)
40 39 UserModel().delete(usr.user_id)
41 40 Session().commit()
42 41
43 42 self.assertEqual(UserGroupMember.query().all(), [])
44 43
45 44 def test_additonal_email_as_main(self):
46 45 usr = UserModel().create_or_update(username=u'test_user',
47 46 password=u'qweqwe',
48 47 email=u'main_email@rhodecode.org',
49 48 firstname=u'u1', lastname=u'u1')
50 49 Session().commit()
51 50
52 51 def do():
53 52 m = UserEmailMap()
54 53 m.email = u'main_email@rhodecode.org'
55 54 m.user = usr
56 55 Session().add(m)
57 56 Session().commit()
58 57 self.assertRaises(AttributeError, do)
59 58
60 59 UserModel().delete(usr.user_id)
61 60 Session().commit()
62 61
63 62 def test_extra_email_map(self):
64 63 usr = UserModel().create_or_update(username=u'test_user',
65 64 password=u'qweqwe',
66 65 email=u'main_email@rhodecode.org',
67 66 firstname=u'u1', lastname=u'u1')
68 67 Session().commit()
69 68
70 69 m = UserEmailMap()
71 70 m.email = u'main_email2@rhodecode.org'
72 71 m.user = usr
73 72 Session().add(m)
74 73 Session().commit()
75 74
76 75 u = User.get_by_email(email='main_email@rhodecode.org')
77 76 self.assertEqual(usr.user_id, u.user_id)
78 77 self.assertEqual(usr.username, u.username)
79 78
80 79 u = User.get_by_email(email='main_email2@rhodecode.org')
81 80 self.assertEqual(usr.user_id, u.user_id)
82 81 self.assertEqual(usr.username, u.username)
83 82 u = User.get_by_email(email='main_email3@rhodecode.org')
84 83 self.assertEqual(None, u)
85 84
86 85 UserModel().delete(usr.user_id)
87 86 Session().commit()
88 87
89 88
90 class TestUsers(unittest.TestCase):
89 class TestUsers(BaseTestCase):
91 90
92 91 def __init__(self, methodName='runTest'):
93 92 super(TestUsers, self).__init__(methodName=methodName)
94 93
95 94 def setUp(self):
96 95 self.u1 = UserModel().create_or_update(username=u'u1',
97 96 password=u'qweqwe',
98 97 email=u'u1@rhodecode.org',
99 98 firstname=u'u1', lastname=u'u1')
100 99
101 100 def tearDown(self):
102 101 perm = Permission.query().all()
103 102 for p in perm:
104 103 UserModel().revoke_perm(self.u1, p)
105 104
106 105 UserModel().delete(self.u1)
107 106 Session().commit()
108 107 Session.remove()
109 108
110 109 def test_add_perm(self):
111 110 perm = Permission.query().all()[0]
112 111 UserModel().grant_perm(self.u1, perm)
113 112 Session().commit()
114 113 self.assertEqual(UserModel().has_perm(self.u1, perm), True)
115 114
116 115 def test_has_perm(self):
117 116 perm = Permission.query().all()
118 117 for p in perm:
119 118 has_p = UserModel().has_perm(self.u1, p)
120 119 self.assertEqual(False, has_p)
121 120
122 121 def test_revoke_perm(self):
123 122 perm = Permission.query().all()[0]
124 123 UserModel().grant_perm(self.u1, perm)
125 124 Session().commit()
126 125 self.assertEqual(UserModel().has_perm(self.u1, perm), True)
127 126
128 127 #revoke
129 128 UserModel().revoke_perm(self.u1, perm)
130 129 Session().commit()
131 130 self.assertEqual(UserModel().has_perm(self.u1, perm), False)
@@ -1,172 +1,170 b''
1 import os
2 import unittest
3 1 import functools
4 2 from rhodecode.tests import *
5 3
6 4 from rhodecode.model.repos_group import ReposGroupModel
7 5 from rhodecode.model.db import RepoGroup
8 6
9 7 from rhodecode.model.meta import Session
10 8 from nose.tools import with_setup
11 9 from rhodecode.tests.models.common import _create_project_tree, check_tree_perms, \
12 10 _get_perms, _check_expected_count, expected_count, _destroy_project_tree
13 11 from rhodecode.model.users_group import UserGroupModel
14 12 from rhodecode.tests.fixture import Fixture
15 13
16 14 fixture = Fixture()
17 15
18 16 test_u2_id = None
19 17 test_u2_gr_id = None
20 18 _get_repo_perms = None
21 19 _get_group_perms = None
22 20
23 21
24 22 def permissions_setup_func(group_name='g0', perm='group.read', recursive=True):
25 23 """
26 24 Resets all permissions to perm attribute
27 25 """
28 26 repos_group = RepoGroup.get_by_group_name(group_name=group_name)
29 27 if not repos_group:
30 28 raise Exception('Cannot get group %s' % group_name)
31 29 perms_updates = [[test_u2_gr_id, perm, 'users_group']]
32 30 ReposGroupModel()._update_permissions(repos_group,
33 31 perms_updates=perms_updates,
34 32 recursive=recursive,
35 33 check_perms=False)
36 34 Session().commit()
37 35
38 36
39 37 def setup_module():
40 38 global test_u2_id, test_u2_gr_id, _get_repo_perms, _get_group_perms
41 39 test_u2 = _create_project_tree()
42 40 Session().commit()
43 41 test_u2_id = test_u2.user_id
44 42
45 43 gr1 = fixture.create_user_group('perms_group_1')
46 44 Session().commit()
47 45 test_u2_gr_id = gr1.users_group_id
48 46 UserGroupModel().add_user_to_group(gr1, user=test_u2_id)
49 47 Session().commit()
50 48
51 49 _get_repo_perms = functools.partial(_get_perms, key='repositories',
52 50 test_u1_id=test_u2_id)
53 51 _get_group_perms = functools.partial(_get_perms, key='repositories_groups',
54 52 test_u1_id=test_u2_id)
55 53
56 54
57 55 def teardown_module():
58 56 _destroy_project_tree(test_u2_id)
59 57
60 58
61 59 @with_setup(permissions_setup_func)
62 60 def test_user_permissions_on_group_without_recursive_mode():
63 61 # set permission to g0 non-recursive mode
64 62 recursive = False
65 63 group = 'g0'
66 64 permissions_setup_func(group, 'group.write', recursive=recursive)
67 65
68 66 items = [x for x in _get_repo_perms(group, recursive)]
69 67 expected = 0
70 68 assert len(items) == expected, ' %s != %s' % (len(items), expected)
71 69 for name, perm in items:
72 70 yield check_tree_perms, name, perm, group, 'repository.read'
73 71
74 72 items = [x for x in _get_group_perms(group, recursive)]
75 73 expected = 1
76 74 assert len(items) == expected, ' %s != %s' % (len(items), expected)
77 75 for name, perm in items:
78 76 yield check_tree_perms, name, perm, group, 'group.write'
79 77
80 78
81 79 @with_setup(permissions_setup_func)
82 80 def test_user_permissions_on_group_without_recursive_mode_subgroup():
83 81 # set permission to g0 non-recursive mode
84 82 recursive = False
85 83 group = 'g0/g0_1'
86 84 permissions_setup_func(group, 'group.write', recursive=recursive)
87 85
88 86 items = [x for x in _get_repo_perms(group, recursive)]
89 87 expected = 0
90 88 assert len(items) == expected, ' %s != %s' % (len(items), expected)
91 89 for name, perm in items:
92 90 yield check_tree_perms, name, perm, group, 'repository.read'
93 91
94 92 items = [x for x in _get_group_perms(group, recursive)]
95 93 expected = 1
96 94 assert len(items) == expected, ' %s != %s' % (len(items), expected)
97 95 for name, perm in items:
98 96 yield check_tree_perms, name, perm, group, 'group.write'
99 97
100 98
101 99 @with_setup(permissions_setup_func)
102 100 def test_user_permissions_on_group_with_recursive_mode():
103 101
104 102 # set permission to g0 recursive mode, all children including
105 103 # other repos and groups should have this permission now set !
106 104 recursive = True
107 105 group = 'g0'
108 106 permissions_setup_func(group, 'group.write', recursive=recursive)
109 107
110 108 repo_items = [x for x in _get_repo_perms(group, recursive)]
111 109 items = [x for x in _get_group_perms(group, recursive)]
112 110 _check_expected_count(items, repo_items, expected_count(group, True))
113 111
114 112 for name, perm in repo_items:
115 113 yield check_tree_perms, name, perm, group, 'repository.write'
116 114
117 115 for name, perm in items:
118 116 yield check_tree_perms, name, perm, group, 'group.write'
119 117
120 118
121 119 @with_setup(permissions_setup_func)
122 120 def test_user_permissions_on_group_with_recursive_mode_inner_group():
123 121 ## set permission to g0_3 group to none
124 122 recursive = True
125 123 group = 'g0/g0_3'
126 124 permissions_setup_func(group, 'group.none', recursive=recursive)
127 125
128 126 repo_items = [x for x in _get_repo_perms(group, recursive)]
129 127 items = [x for x in _get_group_perms(group, recursive)]
130 128 _check_expected_count(items, repo_items, expected_count(group, True))
131 129
132 130 for name, perm in repo_items:
133 131 yield check_tree_perms, name, perm, group, 'repository.none'
134 132
135 133 for name, perm in items:
136 134 yield check_tree_perms, name, perm, group, 'group.none'
137 135
138 136
139 137 @with_setup(permissions_setup_func)
140 138 def test_user_permissions_on_group_with_recursive_mode_deepest():
141 139 ## set permission to g0_3 group to none
142 140 recursive = True
143 141 group = 'g0/g0_1/g0_1_1'
144 142 permissions_setup_func(group, 'group.write', recursive=recursive)
145 143
146 144 repo_items = [x for x in _get_repo_perms(group, recursive)]
147 145 items = [x for x in _get_group_perms(group, recursive)]
148 146 _check_expected_count(items, repo_items, expected_count(group, True))
149 147
150 148 for name, perm in repo_items:
151 149 yield check_tree_perms, name, perm, group, 'repository.write'
152 150
153 151 for name, perm in items:
154 152 yield check_tree_perms, name, perm, group, 'group.write'
155 153
156 154
157 155 @with_setup(permissions_setup_func)
158 156 def test_user_permissions_on_group_with_recursive_mode_only_with_repos():
159 157 ## set permission to g0_3 group to none
160 158 recursive = True
161 159 group = 'g0/g0_2'
162 160 permissions_setup_func(group, 'group.admin', recursive=recursive)
163 161
164 162 repo_items = [x for x in _get_repo_perms(group, recursive)]
165 163 items = [x for x in _get_group_perms(group, recursive)]
166 164 _check_expected_count(items, repo_items, expected_count(group, True))
167 165
168 166 for name, perm in repo_items:
169 167 yield check_tree_perms, name, perm, group, 'repository.admin'
170 168
171 169 for name, perm in items:
172 170 yield check_tree_perms, name, perm, group, 'group.admin'
@@ -1,295 +1,294 b''
1 1 # -*- coding: utf-8 -*-
2 2 """
3 3 rhodecode.tests.test_libs
4 4 ~~~~~~~~~~~~~~~~~~~~~~~~~
5 5
6 6
7 7 Package for testing various lib/helper functions in rhodecode
8 8
9 9 :created_on: Jun 9, 2011
10 10 :copyright: (C) 2011-2012 Marcin Kuzminski <marcin@python-works.com>
11 11 :license: GPLv3, see COPYING for more details.
12 12 """
13 13 # This program is free software: you can redistribute it and/or modify
14 14 # it under the terms of the GNU General Public License as published by
15 15 # the Free Software Foundation, either version 3 of the License, or
16 16 # (at your option) any later version.
17 17 #
18 18 # This program is distributed in the hope that it will be useful,
19 19 # but WITHOUT ANY WARRANTY; without even the implied warranty of
20 20 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
21 21 # GNU General Public License for more details.
22 22 #
23 23 # You should have received a copy of the GNU General Public License
24 24 # along with this program. If not, see <http://www.gnu.org/licenses/>.
25 25 from __future__ import with_statement
26 import unittest
27 26 import datetime
28 27 import hashlib
29 28 import mock
30 29 from rhodecode.tests import *
31 30
32 31 proto = 'http'
33 32 TEST_URLS = [
34 33 ('%s://127.0.0.1' % proto, ['%s://' % proto, '127.0.0.1'],
35 34 '%s://127.0.0.1' % proto),
36 35 ('%s://marcink@127.0.0.1' % proto, ['%s://' % proto, '127.0.0.1'],
37 36 '%s://127.0.0.1' % proto),
38 37 ('%s://marcink:pass@127.0.0.1' % proto, ['%s://' % proto, '127.0.0.1'],
39 38 '%s://127.0.0.1' % proto),
40 39 ('%s://127.0.0.1:8080' % proto, ['%s://' % proto, '127.0.0.1', '8080'],
41 40 '%s://127.0.0.1:8080' % proto),
42 41 ('%s://domain.org' % proto, ['%s://' % proto, 'domain.org'],
43 42 '%s://domain.org' % proto),
44 43 ('%s://user:pass@domain.org:8080' % proto, ['%s://' % proto, 'domain.org',
45 44 '8080'],
46 45 '%s://domain.org:8080' % proto),
47 46 ]
48 47
49 48 proto = 'https'
50 49 TEST_URLS += [
51 50 ('%s://127.0.0.1' % proto, ['%s://' % proto, '127.0.0.1'],
52 51 '%s://127.0.0.1' % proto),
53 52 ('%s://marcink@127.0.0.1' % proto, ['%s://' % proto, '127.0.0.1'],
54 53 '%s://127.0.0.1' % proto),
55 54 ('%s://marcink:pass@127.0.0.1' % proto, ['%s://' % proto, '127.0.0.1'],
56 55 '%s://127.0.0.1' % proto),
57 56 ('%s://127.0.0.1:8080' % proto, ['%s://' % proto, '127.0.0.1', '8080'],
58 57 '%s://127.0.0.1:8080' % proto),
59 58 ('%s://domain.org' % proto, ['%s://' % proto, 'domain.org'],
60 59 '%s://domain.org' % proto),
61 60 ('%s://user:pass@domain.org:8080' % proto, ['%s://' % proto, 'domain.org',
62 61 '8080'],
63 62 '%s://domain.org:8080' % proto),
64 63 ]
65 64
66 65
67 class TestLibs(unittest.TestCase):
66 class TestLibs(BaseTestCase):
68 67
69 68 @parameterized.expand(TEST_URLS)
70 69 def test_uri_filter(self, test_url, expected, expected_creds):
71 70 from rhodecode.lib.utils2 import uri_filter
72 71 self.assertEqual(uri_filter(test_url), expected)
73 72
74 73 @parameterized.expand(TEST_URLS)
75 74 def test_credentials_filter(self, test_url, expected, expected_creds):
76 75 from rhodecode.lib.utils2 import credentials_filter
77 76 self.assertEqual(credentials_filter(test_url), expected_creds)
78 77
79 78 @parameterized.expand([('t', True),
80 79 ('true', True),
81 80 ('y', True),
82 81 ('yes', True),
83 82 ('on', True),
84 83 ('1', True),
85 84 ('Y', True),
86 85 ('yeS', True),
87 86 ('Y', True),
88 87 ('TRUE', True),
89 88 ('T', True),
90 89 ('False', False),
91 90 ('F', False),
92 91 ('FALSE', False),
93 92 ('0', False),
94 93 ('-1', False),
95 94 ('', False)
96 95 ])
97 96 def test_str2bool(self, str_bool, expected):
98 97 from rhodecode.lib.utils2 import str2bool
99 98 self.assertEqual(str2bool(str_bool), expected)
100 99
101 100 def test_mention_extractor(self):
102 101 from rhodecode.lib.utils2 import extract_mentioned_users
103 102 sample = (
104 103 "@first hi there @marcink here's my email marcin@email.com "
105 104 "@lukaszb check @one_more22 it pls @ ttwelve @D[] @one@two@three "
106 105 "@MARCIN @maRCiN @2one_more22 @john please see this http://org.pl "
107 106 "@marian.user just do it @marco-polo and next extract @marco_polo "
108 107 "user.dot hej ! not-needed maril@domain.org"
109 108 )
110 109
111 110 s = sorted([
112 111 'first', 'marcink', 'lukaszb', 'one_more22', 'MARCIN', 'maRCiN', 'john',
113 112 'marian.user', 'marco-polo', 'marco_polo'
114 113 ], key=lambda k: k.lower())
115 114 self.assertEqual(s, extract_mentioned_users(sample))
116 115
117 116 @parameterized.expand([
118 117 (dict(), u'just now'),
119 118 (dict(seconds= -1), u'1 second ago'),
120 119 (dict(seconds= -60 * 2), u'2 minutes ago'),
121 120 (dict(hours= -1), u'1 hour ago'),
122 121 (dict(hours= -24), u'1 day ago'),
123 122 (dict(hours= -24 * 5), u'5 days ago'),
124 123 (dict(months= -1), u'1 month ago'),
125 124 (dict(months= -1, days= -2), u'1 month and 2 days ago'),
126 125 (dict(years= -1, months= -1), u'1 year and 1 month ago'),
127 126 ])
128 127 def test_age(self, age_args, expected):
129 128 from rhodecode.lib.utils2 import age
130 129 from dateutil import relativedelta
131 130 n = datetime.datetime(year=2012, month=5, day=17)
132 131 delt = lambda *args, **kwargs: relativedelta.relativedelta(*args, **kwargs)
133 132 self.assertEqual(age(n + delt(**age_args), now=n), expected)
134 133
135 134 @parameterized.expand([
136 135
137 136 (dict(), u'just now'),
138 137 (dict(seconds=1), u'in 1 second'),
139 138 (dict(seconds=60 * 2), u'in 2 minutes'),
140 139 (dict(hours=1), u'in 1 hour'),
141 140 (dict(hours=24), u'in 1 day'),
142 141 (dict(hours=24 * 5), u'in 5 days'),
143 142 (dict(months=1), u'in 1 month'),
144 143 (dict(months=1, days=1), u'in 1 month and 1 day'),
145 144 (dict(years=1, months=1), u'in 1 year and 1 month')
146 145 ])
147 146 def test_age_in_future(self, age_args, expected):
148 147 from rhodecode.lib.utils2 import age
149 148 from dateutil import relativedelta
150 149 n = datetime.datetime(year=2012, month=5, day=17)
151 150 delt = lambda *args, **kwargs: relativedelta.relativedelta(*args, **kwargs)
152 151 self.assertEqual(age(n + delt(**age_args), now=n), expected)
153 152
154 153 def test_tag_exctrator(self):
155 154 sample = (
156 155 "hello pta[tag] gog [[]] [[] sda ero[or]d [me =>>< sa]"
157 156 "[requires] [stale] [see<>=>] [see => http://url.com]"
158 157 "[requires => url] [lang => python] [just a tag]"
159 158 "[,d] [ => ULR ] [obsolete] [desc]]"
160 159 )
161 160 from rhodecode.lib.helpers import desc_stylize
162 161 res = desc_stylize(sample)
163 162 self.assertTrue('<div class="metatag" tag="tag">tag</div>' in res)
164 163 self.assertTrue('<div class="metatag" tag="obsolete">obsolete</div>' in res)
165 164 self.assertTrue('<div class="metatag" tag="stale">stale</div>' in res)
166 165 self.assertTrue('<div class="metatag" tag="lang">python</div>' in res)
167 166 self.assertTrue('<div class="metatag" tag="requires">requires =&gt; <a href="/url">url</a></div>' in res)
168 167 self.assertTrue('<div class="metatag" tag="tag">tag</div>' in res)
169 168
170 169 def test_alternative_gravatar(self):
171 170 from rhodecode.lib.helpers import gravatar_url
172 171 _md5 = lambda s: hashlib.md5(s).hexdigest()
173 172
174 173 def fake_conf(**kwargs):
175 174 from pylons import config
176 175 config['app_conf'] = {}
177 176 config['app_conf']['use_gravatar'] = True
178 177 config['app_conf'].update(kwargs)
179 178 return config
180 179
181 180 class fake_url():
182 181 @classmethod
183 182 def current(cls, *args, **kwargs):
184 183 return 'https://server.com'
185 184
186 185 with mock.patch('pylons.url', fake_url):
187 186 fake = fake_conf(alternative_gravatar_url='http://test.com/{email}')
188 187 with mock.patch('pylons.config', fake):
189 188 from pylons import url
190 189 assert url.current() == 'https://server.com'
191 190 grav = gravatar_url(email_address='test@foo.com', size=24)
192 191 assert grav == 'http://test.com/test@foo.com'
193 192
194 193 fake = fake_conf(alternative_gravatar_url='http://test.com/{email}')
195 194 with mock.patch('pylons.config', fake):
196 195 grav = gravatar_url(email_address='test@foo.com', size=24)
197 196 assert grav == 'http://test.com/test@foo.com'
198 197
199 198 fake = fake_conf(alternative_gravatar_url='http://test.com/{md5email}')
200 199 with mock.patch('pylons.config', fake):
201 200 em = 'test@foo.com'
202 201 grav = gravatar_url(email_address=em, size=24)
203 202 assert grav == 'http://test.com/%s' % (_md5(em))
204 203
205 204 fake = fake_conf(alternative_gravatar_url='http://test.com/{md5email}/{size}')
206 205 with mock.patch('pylons.config', fake):
207 206 em = 'test@foo.com'
208 207 grav = gravatar_url(email_address=em, size=24)
209 208 assert grav == 'http://test.com/%s/%s' % (_md5(em), 24)
210 209
211 210 fake = fake_conf(alternative_gravatar_url='{scheme}://{netloc}/{md5email}/{size}')
212 211 with mock.patch('pylons.config', fake):
213 212 em = 'test@foo.com'
214 213 grav = gravatar_url(email_address=em, size=24)
215 214 assert grav == 'https://server.com/%s/%s' % (_md5(em), 24)
216 215
217 216 def _quick_url(self, text, tmpl="""<a class="revision-link" href="%s">%s</a>""", url_=None):
218 217 """
219 218 Changes `some text url[foo]` => `some text <a href="/">foo</a>
220 219
221 220 :param text:
222 221 """
223 222 import re
224 223 # quickly change expected url[] into a link
225 224 URL_PAT = re.compile(r'(?:url\[)(.+?)(?:\])')
226 225
227 226 def url_func(match_obj):
228 227 _url = match_obj.groups()[0]
229 228 return tmpl % (url_ or '/some-url', _url)
230 229 return URL_PAT.sub(url_func, text)
231 230
232 231 @parameterized.expand([
233 232 ("",
234 233 ""),
235 234 ("git-svn-id: https://svn.apache.org/repos/asf/libcloud/trunk@1441655 13f79535-47bb-0310-9956-ffa450edef68",
236 235 "git-svn-id: https://svn.apache.org/repos/asf/libcloud/trunk@1441655 13f79535-47bb-0310-9956-ffa450edef68"),
237 236 ("from rev 000000000000",
238 237 "from rev url[000000000000]"),
239 238 ("from rev 000000000000123123 also rev 000000000000",
240 239 "from rev url[000000000000123123] also rev url[000000000000]"),
241 240 ("this should-000 00",
242 241 "this should-000 00"),
243 242 ("longtextffffffffff rev 123123123123",
244 243 "longtextffffffffff rev url[123123123123]"),
245 244 ("rev ffffffffffffffffffffffffffffffffffffffffffffffffff",
246 245 "rev ffffffffffffffffffffffffffffffffffffffffffffffffff"),
247 246 ("ffffffffffff some text traalaa",
248 247 "url[ffffffffffff] some text traalaa"),
249 248 ("""Multi line
250 249 123123123123
251 250 some text 123123123123
252 251 sometimes !
253 252 """,
254 253 """Multi line
255 254 url[123123123123]
256 255 some text url[123123123123]
257 256 sometimes !
258 257 """)
259 258 ])
260 259 def test_urlify_changesets(self, sample, expected):
261 260 def fake_url(self, *args, **kwargs):
262 261 return '/some-url'
263 262
264 263 expected = self._quick_url(expected)
265 264
266 265 with mock.patch('pylons.url', fake_url):
267 266 from rhodecode.lib.helpers import urlify_changesets
268 267 self.assertEqual(urlify_changesets(sample, 'repo_name'), expected)
269 268
270 269 @parameterized.expand([
271 270 ("",
272 271 "",
273 272 ""),
274 273 ("https://svn.apache.org/repos",
275 274 "url[https://svn.apache.org/repos]",
276 275 "https://svn.apache.org/repos"),
277 276 ("http://svn.apache.org/repos",
278 277 "url[http://svn.apache.org/repos]",
279 278 "http://svn.apache.org/repos"),
280 279 ("from rev a also rev http://google.com",
281 280 "from rev a also rev url[http://google.com]",
282 281 "http://google.com"),
283 282 ("""Multi line
284 283 https://foo.bar.com
285 284 some text lalala""",
286 285 """Multi line
287 286 url[https://foo.bar.com]
288 287 some text lalala""",
289 288 "https://foo.bar.com")
290 289 ])
291 290 def test_urlify_test(self, sample, expected, url_):
292 291 from rhodecode.lib.helpers import urlify_text
293 292 expected = self._quick_url(expected,
294 293 tmpl="""<a href="%s">%s</a>""", url_=url_)
295 294 self.assertEqual(urlify_text(sample), expected)
@@ -1,254 +1,253 b''
1 1 # -*- coding: utf-8 -*-
2 import unittest
3 2 import formencode
4 3
5 4 from rhodecode.tests import *
6 5
7 6 from rhodecode.model import validators as v
8 7 from rhodecode.model.users_group import UserGroupModel
9 8
10 9 from rhodecode.model.meta import Session
11 10 from rhodecode.model.repos_group import ReposGroupModel
12 11 from rhodecode.model.db import ChangesetStatus, Repository
13 12 from rhodecode.model.changeset_status import ChangesetStatusModel
14 13 from rhodecode.tests.fixture import Fixture
15 14
16 15 fixture = Fixture()
17 16
18 17
19 class TestReposGroups(unittest.TestCase):
18 class TestReposGroups(BaseTestCase):
20 19
21 20 def setUp(self):
22 21 pass
23 22
24 23 def tearDown(self):
25 24 Session.remove()
26 25
27 26 def test_Message_extractor(self):
28 27 validator = v.ValidUsername()
29 28 self.assertRaises(formencode.Invalid, validator.to_python, 'default')
30 29
31 30 class StateObj(object):
32 31 pass
33 32
34 33 self.assertRaises(formencode.Invalid,
35 34 validator.to_python, 'default', StateObj)
36 35
37 36 def test_ValidUsername(self):
38 37 validator = v.ValidUsername()
39 38
40 39 self.assertRaises(formencode.Invalid, validator.to_python, 'default')
41 40 self.assertRaises(formencode.Invalid, validator.to_python, 'new_user')
42 41 self.assertRaises(formencode.Invalid, validator.to_python, '.,')
43 42 self.assertRaises(formencode.Invalid, validator.to_python,
44 43 TEST_USER_ADMIN_LOGIN)
45 44 self.assertEqual('test', validator.to_python('test'))
46 45
47 46 validator = v.ValidUsername(edit=True, old_data={'user_id': 1})
48 47
49 48 def test_ValidRepoUser(self):
50 49 validator = v.ValidRepoUser()
51 50 self.assertRaises(formencode.Invalid, validator.to_python, 'nouser')
52 51 self.assertEqual(TEST_USER_ADMIN_LOGIN,
53 52 validator.to_python(TEST_USER_ADMIN_LOGIN))
54 53
55 54 def test_ValidUserGroup(self):
56 55 validator = v.ValidUserGroup()
57 56 self.assertRaises(formencode.Invalid, validator.to_python, 'default')
58 57 self.assertRaises(formencode.Invalid, validator.to_python, '.,')
59 58
60 59 gr = fixture.create_user_group('test')
61 60 gr2 = fixture.create_user_group('tes2')
62 61 Session().commit()
63 62 self.assertRaises(formencode.Invalid, validator.to_python, 'test')
64 63 assert gr.users_group_id != None
65 64 validator = v.ValidUserGroup(edit=True,
66 65 old_data={'users_group_id':
67 66 gr2.users_group_id})
68 67
69 68 self.assertRaises(formencode.Invalid, validator.to_python, 'test')
70 69 self.assertRaises(formencode.Invalid, validator.to_python, 'TesT')
71 70 self.assertRaises(formencode.Invalid, validator.to_python, 'TEST')
72 71 UserGroupModel().delete(gr)
73 72 UserGroupModel().delete(gr2)
74 73 Session().commit()
75 74
76 75 def test_ValidReposGroup(self):
77 76 validator = v.ValidReposGroup()
78 77 model = ReposGroupModel()
79 78 self.assertRaises(formencode.Invalid, validator.to_python,
80 79 {'group_name': HG_REPO, })
81 80 gr = model.create(group_name='test_gr', group_description='desc',
82 81 parent=None,
83 82 just_db=True,
84 83 owner=TEST_USER_ADMIN_LOGIN)
85 84 self.assertRaises(formencode.Invalid,
86 85 validator.to_python, {'group_name': gr.group_name, })
87 86
88 87 validator = v.ValidReposGroup(edit=True,
89 88 old_data={'group_id': gr.group_id})
90 89 self.assertRaises(formencode.Invalid,
91 90 validator.to_python, {
92 91 'group_name': gr.group_name + 'n',
93 92 'group_parent_id': gr.group_id
94 93 })
95 94 model.delete(gr)
96 95
97 96 def test_ValidPassword(self):
98 97 validator = v.ValidPassword()
99 98 self.assertEqual('lol', validator.to_python('lol'))
100 99 self.assertEqual(None, validator.to_python(None))
101 100 self.assertRaises(formencode.Invalid, validator.to_python, 'Δ…Δ‡ΕΌΕΊ')
102 101
103 102 def test_ValidPasswordsMatch(self):
104 103 validator = v.ValidPasswordsMatch()
105 104 self.assertRaises(formencode.Invalid,
106 105 validator.to_python, {'password': 'pass',
107 106 'password_confirmation': 'pass2'})
108 107
109 108 self.assertRaises(formencode.Invalid,
110 109 validator.to_python, {'new_password': 'pass',
111 110 'password_confirmation': 'pass2'})
112 111
113 112 self.assertEqual({'new_password': 'pass',
114 113 'password_confirmation': 'pass'},
115 114 validator.to_python({'new_password': 'pass',
116 115 'password_confirmation': 'pass'}))
117 116
118 117 self.assertEqual({'password': 'pass',
119 118 'password_confirmation': 'pass'},
120 119 validator.to_python({'password': 'pass',
121 120 'password_confirmation': 'pass'}))
122 121
123 122 def test_ValidAuth(self):
124 123 validator = v.ValidAuth()
125 124 valid_creds = {
126 125 'username': TEST_USER_REGULAR2_LOGIN,
127 126 'password': TEST_USER_REGULAR2_PASS,
128 127 }
129 128 invalid_creds = {
130 129 'username': 'err',
131 130 'password': 'err',
132 131 }
133 132 self.assertEqual(valid_creds, validator.to_python(valid_creds))
134 133 self.assertRaises(formencode.Invalid,
135 134 validator.to_python, invalid_creds)
136 135
137 136 def test_ValidAuthToken(self):
138 137 validator = v.ValidAuthToken()
139 138 # this is untestable without a threadlocal
140 139 # self.assertRaises(formencode.Invalid,
141 140 # validator.to_python, 'BadToken')
142 141 validator
143 142
144 143 def test_ValidRepoName(self):
145 144 validator = v.ValidRepoName()
146 145
147 146 self.assertRaises(formencode.Invalid,
148 147 validator.to_python, {'repo_name': ''})
149 148
150 149 self.assertRaises(formencode.Invalid,
151 150 validator.to_python, {'repo_name': HG_REPO})
152 151
153 152 gr = ReposGroupModel().create(group_name='group_test',
154 153 group_description='desc',
155 154 parent=None,
156 155 owner=TEST_USER_ADMIN_LOGIN)
157 156 self.assertRaises(formencode.Invalid,
158 157 validator.to_python, {'repo_name': gr.group_name})
159 158
160 159 #TODO: write an error case for that ie. create a repo withinh a group
161 160 # self.assertRaises(formencode.Invalid,
162 161 # validator.to_python, {'repo_name': 'some',
163 162 # 'repo_group': gr.group_id})
164 163
165 164 def test_ValidForkName(self):
166 165 # this uses ValidRepoName validator
167 166 assert True
168 167
169 168 @parameterized.expand([
170 169 ('test', 'test'), ('lolz!', 'lolz'), (' aavv', 'aavv'),
171 170 ('ala ma kota', 'ala-ma-kota'), ('@nooo', 'nooo'),
172 171 ('$!haha lolz !', 'haha-lolz'), ('$$$$$', ''), ('{}OK!', 'OK'),
173 172 ('/]re po', 're-po')])
174 173 def test_SlugifyName(self, name, expected):
175 174 validator = v.SlugifyName()
176 175 self.assertEqual(expected, validator.to_python(name))
177 176
178 177 def test_ValidCloneUri(self):
179 178 #TODO: write this one
180 179 pass
181 180
182 181 def test_ValidForkType(self):
183 182 validator = v.ValidForkType(old_data={'repo_type': 'hg'})
184 183 self.assertEqual('hg', validator.to_python('hg'))
185 184 self.assertRaises(formencode.Invalid, validator.to_python, 'git')
186 185
187 186 def test_ValidPerms(self):
188 187 #TODO: write this one
189 188 pass
190 189
191 190 def test_ValidSettings(self):
192 191 validator = v.ValidSettings()
193 192 self.assertEqual({'pass': 'pass'},
194 193 validator.to_python(value={'user': 'test',
195 194 'pass': 'pass'}))
196 195
197 196 self.assertEqual({'user2': 'test', 'pass': 'pass'},
198 197 validator.to_python(value={'user2': 'test',
199 198 'pass': 'pass'}))
200 199
201 200 def test_ValidPath(self):
202 201 validator = v.ValidPath()
203 202 self.assertEqual(TESTS_TMP_PATH,
204 203 validator.to_python(TESTS_TMP_PATH))
205 204 self.assertRaises(formencode.Invalid, validator.to_python,
206 205 '/no_such_dir')
207 206
208 207 def test_UniqSystemEmail(self):
209 208 validator = v.UniqSystemEmail(old_data={})
210 209
211 210 self.assertEqual('mail@python.org',
212 211 validator.to_python('MaiL@Python.org'))
213 212
214 213 email = TEST_USER_REGULAR2_EMAIL
215 214 self.assertRaises(formencode.Invalid, validator.to_python, email)
216 215
217 216 def test_ValidSystemEmail(self):
218 217 validator = v.ValidSystemEmail()
219 218 email = TEST_USER_REGULAR2_EMAIL
220 219
221 220 self.assertEqual(email, validator.to_python(email))
222 221 self.assertRaises(formencode.Invalid, validator.to_python, 'err')
223 222
224 223 def test_LdapLibValidator(self):
225 224 if ldap_lib_installed:
226 225 validator = v.LdapLibValidator()
227 226 self.assertEqual("DN", validator.to_python('DN'))
228 227 else:
229 228 validator = v.LdapLibValidator()
230 229 self.assertRaises(v.LdapImportError, validator.to_python, 'err')
231 230
232 231 def test_AttrLoginValidator(self):
233 232 validator = v.AttrLoginValidator()
234 233 self.assertEqual('DN_attr', validator.to_python('DN_attr'))
235 234
236 235 def test_NotReviewedRevisions(self):
237 236 repo_id = Repository.get_by_repo_name(HG_REPO).repo_id
238 237 validator = v.NotReviewedRevisions(repo_id)
239 238 rev = '0' * 40
240 239 # add status for a rev, that should throw an error because it is already
241 240 # reviewed
242 241 new_status = ChangesetStatus()
243 242 new_status.author = ChangesetStatusModel()._get_user(TEST_USER_ADMIN_LOGIN)
244 243 new_status.repo = ChangesetStatusModel()._get_repo(HG_REPO)
245 244 new_status.status = ChangesetStatus.STATUS_APPROVED
246 245 new_status.comment = None
247 246 new_status.revision = rev
248 247 Session().add(new_status)
249 248 Session().commit()
250 249 try:
251 250 self.assertRaises(formencode.Invalid, validator.to_python, [rev])
252 251 finally:
253 252 Session().delete(new_status)
254 253 Session().commit()
@@ -1,507 +1,507 b''
1 1 # -*- coding: utf-8 -*-
2 2 """
3 3 rhodecode.tests.test_scm_operations
4 4 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
5 5
6 6 Test suite for making push/pull operations.
7 7 Run using after doing paster serve test.ini::
8 8 RC_WHOOSH_TEST_DISABLE=1 RC_NO_TMP_PATH=1 nosetests rhodecode/tests/scripts/test_vcs_operations.py
9 9
10 10 You must have git > 1.8.1 for tests to work fine
11 11
12 12 :created_on: Dec 30, 2010
13 13 :author: marcink
14 14 :copyright: (C) 2010-2012 Marcin Kuzminski <marcin@python-works.com>
15 15 :license: GPLv3, see COPYING for more details.
16 16 """
17 17 # This program is free software: you can redistribute it and/or modify
18 18 # it under the terms of the GNU General Public License as published by
19 19 # the Free Software Foundation, either version 3 of the License, or
20 20 # (at your option) any later version.
21 21 #
22 22 # This program is distributed in the hope that it will be useful,
23 23 # but WITHOUT ANY WARRANTY; without even the implied warranty of
24 24 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
25 25 # GNU General Public License for more details.
26 26 #
27 27 # You should have received a copy of the GNU General Public License
28 28 # along with this program. If not, see <http://www.gnu.org/licenses/>.
29 29
30 30 import os
31 31 import tempfile
32 32 import unittest
33 33 import time
34 34 from os.path import join as jn
35 35 from os.path import dirname as dn
36 36
37 37 from tempfile import _RandomNameSequence
38 38 from subprocess import Popen, PIPE
39 39
40 40 from rhodecode.tests import *
41 41 from rhodecode.model.db import User, Repository, UserLog, UserIpMap,\
42 42 CacheInvalidation
43 43 from rhodecode.model.meta import Session
44 44 from rhodecode.model.repo import RepoModel
45 45 from rhodecode.model.user import UserModel
46 46
47 47 DEBUG = True
48 48 HOST = '127.0.0.1:5000' # test host
49 49
50 50
51 51 class Command(object):
52 52
53 53 def __init__(self, cwd):
54 54 self.cwd = cwd
55 55
56 56 def execute(self, cmd, *args):
57 57 """
58 58 Runs command on the system with given ``args``.
59 59 """
60 60
61 61 command = cmd + ' ' + ' '.join(args)
62 62 if DEBUG:
63 63 print '*** CMD %s ***' % command
64 64 p = Popen(command, shell=True, stdout=PIPE, stderr=PIPE, cwd=self.cwd)
65 65 stdout, stderr = p.communicate()
66 66 if DEBUG:
67 67 print stdout, stderr
68 68 return stdout, stderr
69 69
70 70
71 71 def _get_tmp_dir():
72 72 return tempfile.mkdtemp(prefix='rc_integration_test')
73 73
74 74
75 75 def _construct_url(repo, dest=None, **kwargs):
76 76 if dest is None:
77 77 #make temp clone
78 78 dest = _get_tmp_dir()
79 79 params = {
80 80 'user': TEST_USER_ADMIN_LOGIN,
81 81 'passwd': TEST_USER_ADMIN_PASS,
82 82 'host': HOST,
83 83 'cloned_repo': repo,
84 84 'dest': dest
85 85 }
86 86 params.update(**kwargs)
87 87 if params['user'] and params['passwd']:
88 88 _url = 'http://%(user)s:%(passwd)s@%(host)s/%(cloned_repo)s %(dest)s' % params
89 89 else:
90 90 _url = 'http://(host)s/%(cloned_repo)s %(dest)s' % params
91 91 return _url
92 92
93 93
94 94 def _add_files_and_push(vcs, DEST, **kwargs):
95 95 """
96 96 Generate some files, add it to DEST repo and push back
97 97 vcs is git or hg and defines what VCS we want to make those files for
98 98
99 99 :param vcs:
100 100 :param DEST:
101 101 """
102 102 # commit some stuff into this repo
103 103 cwd = path = jn(DEST)
104 104 #added_file = jn(path, '%ssetupΔ…ΕΌΕΊΔ‡.py' % _RandomNameSequence().next())
105 105 added_file = jn(path, '%ssetup.py' % _RandomNameSequence().next())
106 106 Command(cwd).execute('touch %s' % added_file)
107 107 Command(cwd).execute('%s add %s' % (vcs, added_file))
108 108
109 109 for i in xrange(kwargs.get('files_no', 3)):
110 110 cmd = """echo 'added_line%s' >> %s""" % (i, added_file)
111 111 Command(cwd).execute(cmd)
112 112 author_str = 'Marcin KuΕΊminski <me@email.com>'
113 113 if vcs == 'hg':
114 114 cmd = """hg commit -m 'commited new %s' -u '%s' %s """ % (
115 115 i, author_str, added_file
116 116 )
117 117 elif vcs == 'git':
118 118 cmd = """git commit -m 'commited new %s' --author '%s' %s """ % (
119 119 i, author_str, added_file
120 120 )
121 121 Command(cwd).execute(cmd)
122 122 # PUSH it back
123 123 if vcs == 'hg':
124 124 _REPO = HG_REPO
125 125 elif vcs == 'git':
126 126 _REPO = GIT_REPO
127 127
128 128 kwargs['dest'] = ''
129 129 clone_url = _construct_url(_REPO, **kwargs)
130 130 if 'clone_url' in kwargs:
131 131 clone_url = kwargs['clone_url']
132 132 if vcs == 'hg':
133 133 stdout, stderr = Command(cwd).execute('hg push --verbose', clone_url)
134 134 elif vcs == 'git':
135 135 stdout, stderr = Command(cwd).execute('git push --verbose', clone_url + " master")
136 136
137 137 return stdout, stderr
138 138
139 139
140 140 def set_anonymous_access(enable=True):
141 141 user = User.get_by_username(User.DEFAULT_USER)
142 142 user.active = enable
143 143 Session().add(user)
144 144 Session().commit()
145 145 print '\tanonymous access is now:', enable
146 146 if enable != User.get_by_username(User.DEFAULT_USER).active:
147 147 raise Exception('Cannot set anonymous access')
148 148
149 149
150 150 #==============================================================================
151 151 # TESTS
152 152 #==============================================================================
153 153
154 class TestVCSOperations(unittest.TestCase):
154 class TestVCSOperations(BaseTestCase):
155 155
156 156 @classmethod
157 157 def setup_class(cls):
158 158 #DISABLE ANONYMOUS ACCESS
159 159 set_anonymous_access(False)
160 160
161 161 def setUp(self):
162 162 r = Repository.get_by_repo_name(GIT_REPO)
163 163 Repository.unlock(r)
164 164 r.enable_locking = False
165 165 Session().add(r)
166 166 Session().commit()
167 167
168 168 r = Repository.get_by_repo_name(HG_REPO)
169 169 Repository.unlock(r)
170 170 r.enable_locking = False
171 171 Session().add(r)
172 172 Session().commit()
173 173
174 174 def test_clone_hg_repo_by_admin(self):
175 175 clone_url = _construct_url(HG_REPO)
176 176 stdout, stderr = Command('/tmp').execute('hg clone', clone_url)
177 177
178 178 assert 'requesting all changes' in stdout
179 179 assert 'adding changesets' in stdout
180 180 assert 'adding manifests' in stdout
181 181 assert 'adding file changes' in stdout
182 182
183 183 assert stderr == ''
184 184
185 185 def test_clone_git_repo_by_admin(self):
186 186 clone_url = _construct_url(GIT_REPO)
187 187 stdout, stderr = Command('/tmp').execute('git clone', clone_url)
188 188
189 189 assert 'Cloning into' in stdout
190 190 assert stderr == ''
191 191
192 192 def test_clone_wrong_credentials_hg(self):
193 193 clone_url = _construct_url(HG_REPO, passwd='bad!')
194 194 stdout, stderr = Command('/tmp').execute('hg clone', clone_url)
195 195 assert 'abort: authorization failed' in stderr
196 196
197 197 def test_clone_wrong_credentials_git(self):
198 198 clone_url = _construct_url(GIT_REPO, passwd='bad!')
199 199 stdout, stderr = Command('/tmp').execute('git clone', clone_url)
200 200 assert 'fatal: Authentication failed' in stderr
201 201
202 202 def test_clone_git_dir_as_hg(self):
203 203 clone_url = _construct_url(GIT_REPO)
204 204 stdout, stderr = Command('/tmp').execute('hg clone', clone_url)
205 205 assert 'HTTP Error 404: Not Found' in stderr
206 206
207 207 def test_clone_hg_repo_as_git(self):
208 208 clone_url = _construct_url(HG_REPO)
209 209 stdout, stderr = Command('/tmp').execute('git clone', clone_url)
210 210 assert 'not found:' in stderr
211 211
212 212 def test_clone_non_existing_path_hg(self):
213 213 clone_url = _construct_url('trololo')
214 214 stdout, stderr = Command('/tmp').execute('hg clone', clone_url)
215 215 assert 'HTTP Error 404: Not Found' in stderr
216 216
217 217 def test_clone_non_existing_path_git(self):
218 218 clone_url = _construct_url('trololo')
219 219 stdout, stderr = Command('/tmp').execute('git clone', clone_url)
220 220 assert 'not found:' in stderr
221 221
222 222 def test_push_new_file_hg(self):
223 223 DEST = _get_tmp_dir()
224 224 clone_url = _construct_url(HG_REPO, dest=DEST)
225 225 stdout, stderr = Command('/tmp').execute('hg clone', clone_url)
226 226
227 227 stdout, stderr = _add_files_and_push('hg', DEST)
228 228
229 229 assert 'pushing to' in stdout
230 230 assert 'Repository size' in stdout
231 231 assert 'Last revision is now' in stdout
232 232
233 233 def test_push_new_file_git(self):
234 234 DEST = _get_tmp_dir()
235 235 clone_url = _construct_url(GIT_REPO, dest=DEST)
236 236 stdout, stderr = Command('/tmp').execute('git clone', clone_url)
237 237
238 238 # commit some stuff into this repo
239 239 stdout, stderr = _add_files_and_push('git', DEST)
240 240
241 241 #WTF git stderr ?!
242 242 assert 'master -> master' in stderr
243 243
244 244 def test_push_invalidates_cache_hg(self):
245 245 key = CacheInvalidation.query().filter(CacheInvalidation.cache_key
246 246 ==HG_REPO).one()
247 247 key.cache_active = True
248 248 Session().add(key)
249 249 Session().commit()
250 250
251 251 DEST = _get_tmp_dir()
252 252 clone_url = _construct_url(HG_REPO, dest=DEST)
253 253 stdout, stderr = Command('/tmp').execute('hg clone', clone_url)
254 254
255 255 stdout, stderr = _add_files_and_push('hg', DEST, files_no=1)
256 256 key = CacheInvalidation.query().filter(CacheInvalidation.cache_key
257 257 ==HG_REPO).one()
258 258 self.assertEqual(key.cache_active, False)
259 259
260 260 def test_push_invalidates_cache_git(self):
261 261 key = CacheInvalidation.query().filter(CacheInvalidation.cache_key
262 262 ==GIT_REPO).one()
263 263 key.cache_active = True
264 264 Session().add(key)
265 265 Session().commit()
266 266
267 267 DEST = _get_tmp_dir()
268 268 clone_url = _construct_url(GIT_REPO, dest=DEST)
269 269 stdout, stderr = Command('/tmp').execute('git clone', clone_url)
270 270
271 271 # commit some stuff into this repo
272 272 stdout, stderr = _add_files_and_push('git', DEST, files_no=1)
273 273
274 274 key = CacheInvalidation.query().filter(CacheInvalidation.cache_key
275 275 ==GIT_REPO).one()
276 276 self.assertEqual(key.cache_active, False)
277 277
278 278 def test_push_wrong_credentials_hg(self):
279 279 DEST = _get_tmp_dir()
280 280 clone_url = _construct_url(HG_REPO, dest=DEST)
281 281 stdout, stderr = Command('/tmp').execute('hg clone', clone_url)
282 282
283 283 stdout, stderr = _add_files_and_push('hg', DEST, user='bad',
284 284 passwd='name')
285 285
286 286 assert 'abort: authorization failed' in stderr
287 287
288 288 def test_push_wrong_credentials_git(self):
289 289 DEST = _get_tmp_dir()
290 290 clone_url = _construct_url(GIT_REPO, dest=DEST)
291 291 stdout, stderr = Command('/tmp').execute('git clone', clone_url)
292 292
293 293 stdout, stderr = _add_files_and_push('git', DEST, user='bad',
294 294 passwd='name')
295 295
296 296 assert 'fatal: Authentication failed' in stderr
297 297
298 298 def test_push_back_to_wrong_url_hg(self):
299 299 DEST = _get_tmp_dir()
300 300 clone_url = _construct_url(HG_REPO, dest=DEST)
301 301 stdout, stderr = Command('/tmp').execute('hg clone', clone_url)
302 302
303 303 stdout, stderr = _add_files_and_push('hg', DEST,
304 304 clone_url='http://127.0.0.1:5000/tmp',)
305 305
306 306 assert 'HTTP Error 404: Not Found' in stderr
307 307
308 308 def test_push_back_to_wrong_url_git(self):
309 309 DEST = _get_tmp_dir()
310 310 clone_url = _construct_url(GIT_REPO, dest=DEST)
311 311 stdout, stderr = Command('/tmp').execute('git clone', clone_url)
312 312
313 313 stdout, stderr = _add_files_and_push('git', DEST,
314 314 clone_url='http://127.0.0.1:5000/tmp',)
315 315
316 316 assert 'not found:' in stderr
317 317
318 318 def test_clone_and_create_lock_hg(self):
319 319 # enable locking
320 320 r = Repository.get_by_repo_name(HG_REPO)
321 321 r.enable_locking = True
322 322 Session().add(r)
323 323 Session().commit()
324 324 # clone
325 325 clone_url = _construct_url(HG_REPO)
326 326 stdout, stderr = Command('/tmp').execute('hg clone', clone_url)
327 327
328 328 #check if lock was made
329 329 r = Repository.get_by_repo_name(HG_REPO)
330 330 assert r.locked[0] == User.get_by_username(TEST_USER_ADMIN_LOGIN).user_id
331 331
332 332 def test_clone_and_create_lock_git(self):
333 333 # enable locking
334 334 r = Repository.get_by_repo_name(GIT_REPO)
335 335 r.enable_locking = True
336 336 Session().add(r)
337 337 Session().commit()
338 338 # clone
339 339 clone_url = _construct_url(GIT_REPO)
340 340 stdout, stderr = Command('/tmp').execute('git clone', clone_url)
341 341
342 342 #check if lock was made
343 343 r = Repository.get_by_repo_name(GIT_REPO)
344 344 assert r.locked[0] == User.get_by_username(TEST_USER_ADMIN_LOGIN).user_id
345 345
346 346 def test_clone_after_repo_was_locked_hg(self):
347 347 #lock repo
348 348 r = Repository.get_by_repo_name(HG_REPO)
349 349 Repository.lock(r, User.get_by_username(TEST_USER_ADMIN_LOGIN).user_id)
350 350 #pull fails since repo is locked
351 351 clone_url = _construct_url(HG_REPO)
352 352 stdout, stderr = Command('/tmp').execute('hg clone', clone_url)
353 353 msg = ("""abort: HTTP Error 423: Repository `%s` locked by user `%s`"""
354 354 % (HG_REPO, TEST_USER_ADMIN_LOGIN))
355 355 assert msg in stderr
356 356
357 357 def test_clone_after_repo_was_locked_git(self):
358 358 #lock repo
359 359 r = Repository.get_by_repo_name(GIT_REPO)
360 360 Repository.lock(r, User.get_by_username(TEST_USER_ADMIN_LOGIN).user_id)
361 361 #pull fails since repo is locked
362 362 clone_url = _construct_url(GIT_REPO)
363 363 stdout, stderr = Command('/tmp').execute('git clone', clone_url)
364 364 msg = ("""The requested URL returned error: 423""")
365 365 assert msg in stderr
366 366
367 367 def test_push_on_locked_repo_by_other_user_hg(self):
368 368 #clone some temp
369 369 DEST = _get_tmp_dir()
370 370 clone_url = _construct_url(HG_REPO, dest=DEST)
371 371 stdout, stderr = Command('/tmp').execute('hg clone', clone_url)
372 372
373 373 #lock repo
374 374 r = Repository.get_by_repo_name(HG_REPO)
375 375 # let this user actually push !
376 376 RepoModel().grant_user_permission(repo=r, user=TEST_USER_REGULAR_LOGIN,
377 377 perm='repository.write')
378 378 Session().commit()
379 379 Repository.lock(r, User.get_by_username(TEST_USER_ADMIN_LOGIN).user_id)
380 380
381 381 #push fails repo is locked by other user !
382 382 stdout, stderr = _add_files_and_push('hg', DEST,
383 383 user=TEST_USER_REGULAR_LOGIN,
384 384 passwd=TEST_USER_REGULAR_PASS)
385 385 msg = ("""abort: HTTP Error 423: Repository `%s` locked by user `%s`"""
386 386 % (HG_REPO, TEST_USER_ADMIN_LOGIN))
387 387 assert msg in stderr
388 388
389 389 #TODO: fix me ! somehow during tests hooks don't get called on GIT
390 390 # def test_push_on_locked_repo_by_other_user_git(self):
391 391 # #clone some temp
392 392 # DEST = _get_tmp_dir()
393 393 # clone_url = _construct_url(GIT_REPO, dest=DEST)
394 394 # stdout, stderr = Command('/tmp').execute('git clone', clone_url)
395 395 #
396 396 # #lock repo
397 397 # r = Repository.get_by_repo_name(GIT_REPO)
398 398 # # let this user actually push !
399 399 # RepoModel().grant_user_permission(repo=r, user=TEST_USER_REGULAR_LOGIN,
400 400 # perm='repository.write')
401 401 # Session().commit()
402 402 # Repository.lock(r, User.get_by_username(TEST_USER_ADMIN_LOGIN).user_id)
403 403 #
404 404 # #push fails repo is locked by other user !
405 405 # stdout, stderr = _add_files_and_push('git', DEST,
406 406 # user=TEST_USER_REGULAR_LOGIN,
407 407 # passwd=TEST_USER_REGULAR_PASS)
408 408 # msg = ("""abort: HTTP Error 423: Repository `%s` locked by user `%s`"""
409 409 # % (GIT_REPO, TEST_USER_ADMIN_LOGIN))
410 410 # #TODO: fix this somehow later on GIT, GIT is stupid and even if we throw
411 411 # # back 423 to it, it makes ANOTHER request and we fail there with 405 :/
412 412 # msg = "405 Method Not Allowed"
413 413 # assert msg in stderr
414 414
415 415 def test_push_unlocks_repository_hg(self):
416 416 # enable locking
417 417 r = Repository.get_by_repo_name(HG_REPO)
418 418 r.enable_locking = True
419 419 Session().add(r)
420 420 Session().commit()
421 421 #clone some temp
422 422 DEST = _get_tmp_dir()
423 423 clone_url = _construct_url(HG_REPO, dest=DEST)
424 424 stdout, stderr = Command('/tmp').execute('hg clone', clone_url)
425 425
426 426 #check for lock repo after clone
427 427 r = Repository.get_by_repo_name(HG_REPO)
428 428 assert r.locked[0] == User.get_by_username(TEST_USER_ADMIN_LOGIN).user_id
429 429
430 430 #push is ok and repo is now unlocked
431 431 stdout, stderr = _add_files_and_push('hg', DEST)
432 432 assert ('remote: Released lock on repo `%s`' % HG_REPO) in stdout
433 433 #we need to cleanup the Session Here !
434 434 Session.remove()
435 435 r = Repository.get_by_repo_name(HG_REPO)
436 436 assert r.locked == [None, None]
437 437
438 438 #TODO: fix me ! somehow during tests hooks don't get called on GIT
439 439 # def test_push_unlocks_repository_git(self):
440 440 # # enable locking
441 441 # r = Repository.get_by_repo_name(GIT_REPO)
442 442 # r.enable_locking = True
443 443 # Session().add(r)
444 444 # Session().commit()
445 445 # #clone some temp
446 446 # DEST = _get_tmp_dir()
447 447 # clone_url = _construct_url(GIT_REPO, dest=DEST)
448 448 # stdout, stderr = Command('/tmp').execute('git clone', clone_url)
449 449 #
450 450 # #check for lock repo after clone
451 451 # r = Repository.get_by_repo_name(GIT_REPO)
452 452 # assert r.locked[0] == User.get_by_username(TEST_USER_ADMIN_LOGIN).user_id
453 453 #
454 454 # #push is ok and repo is now unlocked
455 455 # stdout, stderr = _add_files_and_push('git', DEST)
456 456 # #assert ('remote: Released lock on repo `%s`' % GIT_REPO) in stdout
457 457 # #we need to cleanup the Session Here !
458 458 # Session.remove()
459 459 # r = Repository.get_by_repo_name(GIT_REPO)
460 460 # assert r.locked == [None, None]
461 461
462 462 def test_ip_restriction_hg(self):
463 463 user_model = UserModel()
464 464 try:
465 465 user_model.add_extra_ip(TEST_USER_ADMIN_LOGIN, '10.10.10.10/32')
466 466 Session().commit()
467 467 clone_url = _construct_url(HG_REPO)
468 468 stdout, stderr = Command('/tmp').execute('hg clone', clone_url)
469 469 assert 'abort: HTTP Error 403: Forbidden' in stderr
470 470 finally:
471 471 #release IP restrictions
472 472 for ip in UserIpMap.getAll():
473 473 UserIpMap.delete(ip.ip_id)
474 474 Session().commit()
475 475
476 476 time.sleep(2)
477 477 clone_url = _construct_url(HG_REPO)
478 478 stdout, stderr = Command('/tmp').execute('hg clone', clone_url)
479 479
480 480 assert 'requesting all changes' in stdout
481 481 assert 'adding changesets' in stdout
482 482 assert 'adding manifests' in stdout
483 483 assert 'adding file changes' in stdout
484 484
485 485 assert stderr == ''
486 486
487 487 def test_ip_restriction_git(self):
488 488 user_model = UserModel()
489 489 try:
490 490 user_model.add_extra_ip(TEST_USER_ADMIN_LOGIN, '10.10.10.10/32')
491 491 Session().commit()
492 492 clone_url = _construct_url(GIT_REPO)
493 493 stdout, stderr = Command('/tmp').execute('git clone', clone_url)
494 494 msg = ("""The requested URL returned error: 403""")
495 495 assert msg in stderr
496 496 finally:
497 497 #release IP restrictions
498 498 for ip in UserIpMap.getAll():
499 499 UserIpMap.delete(ip.ip_id)
500 500 Session().commit()
501 501
502 502 time.sleep(2)
503 503 clone_url = _construct_url(GIT_REPO)
504 504 stdout, stderr = Command('/tmp').execute('git clone', clone_url)
505 505
506 506 assert 'Cloning into' in stdout
507 507 assert stderr == ''
General Comments 0
You need to be logged in to leave comments. Login now