Show More
|
1 | NO CONTENT: new file 100644 |
@@ -0,0 +1,316 | |||
|
1 | import os | |
|
2 | import unittest | |
|
3 | from rhodecode.tests import * | |
|
4 | ||
|
5 | from rhodecode.model.repos_group import ReposGroupModel | |
|
6 | from rhodecode.model.repo import RepoModel | |
|
7 | from rhodecode.model.db import RepoGroup, User, UsersGroupRepoGroupToPerm | |
|
8 | from rhodecode.model.user import UserModel | |
|
9 | ||
|
10 | from rhodecode.model.meta import Session | |
|
11 | from rhodecode.model.users_group import UsersGroupModel | |
|
12 | from rhodecode.lib.auth import AuthUser | |
|
13 | ||
|
14 | ||
|
15 | def _make_group(path, desc='desc', parent_id=None, | |
|
16 | skip_if_exists=False): | |
|
17 | ||
|
18 | gr = RepoGroup.get_by_group_name(path) | |
|
19 | if gr and skip_if_exists: | |
|
20 | return gr | |
|
21 | ||
|
22 | gr = ReposGroupModel().create(path, desc, parent_id) | |
|
23 | return gr | |
|
24 | ||
|
25 | ||
|
26 | class TestPermissions(unittest.TestCase): | |
|
27 | def __init__(self, methodName='runTest'): | |
|
28 | super(TestPermissions, self).__init__(methodName=methodName) | |
|
29 | ||
|
30 | def setUp(self): | |
|
31 | self.u1 = UserModel().create_or_update( | |
|
32 | username=u'u1', password=u'qweqwe', | |
|
33 | email=u'u1@rhodecode.org', firstname=u'u1', lastname=u'u1' | |
|
34 | ) | |
|
35 | self.u2 = UserModel().create_or_update( | |
|
36 | username=u'u2', password=u'qweqwe', | |
|
37 | email=u'u2@rhodecode.org', firstname=u'u2', lastname=u'u2' | |
|
38 | ) | |
|
39 | self.anon = User.get_by_username('default') | |
|
40 | self.a1 = UserModel().create_or_update( | |
|
41 | username=u'a1', password=u'qweqwe', | |
|
42 | email=u'a1@rhodecode.org', firstname=u'a1', lastname=u'a1', admin=True | |
|
43 | ) | |
|
44 | Session().commit() | |
|
45 | ||
|
46 | def tearDown(self): | |
|
47 | if hasattr(self, 'test_repo'): | |
|
48 | RepoModel().delete(repo=self.test_repo) | |
|
49 | UserModel().delete(self.u1) | |
|
50 | UserModel().delete(self.u2) | |
|
51 | UserModel().delete(self.a1) | |
|
52 | if hasattr(self, 'g1'): | |
|
53 | ReposGroupModel().delete(self.g1.group_id) | |
|
54 | if hasattr(self, 'g2'): | |
|
55 | ReposGroupModel().delete(self.g2.group_id) | |
|
56 | ||
|
57 | if hasattr(self, 'ug1'): | |
|
58 | UsersGroupModel().delete(self.ug1, force=True) | |
|
59 | ||
|
60 | Session().commit() | |
|
61 | ||
|
62 | def test_default_perms_set(self): | |
|
63 | u1_auth = AuthUser(user_id=self.u1.user_id) | |
|
64 | perms = { | |
|
65 | 'repositories_groups': {}, | |
|
66 | 'global': set([u'hg.create.repository', u'repository.read', | |
|
67 | u'hg.register.manual_activate']), | |
|
68 | 'repositories': {u'vcs_test_hg': u'repository.read'} | |
|
69 | } | |
|
70 | self.assertEqual(u1_auth.permissions['repositories'][HG_REPO], | |
|
71 | perms['repositories'][HG_REPO]) | |
|
72 | new_perm = 'repository.write' | |
|
73 | RepoModel().grant_user_permission(repo=HG_REPO, user=self.u1, | |
|
74 | perm=new_perm) | |
|
75 | Session().commit() | |
|
76 | ||
|
77 | u1_auth = AuthUser(user_id=self.u1.user_id) | |
|
78 | self.assertEqual(u1_auth.permissions['repositories'][HG_REPO], | |
|
79 | new_perm) | |
|
80 | ||
|
81 | def test_default_admin_perms_set(self): | |
|
82 | a1_auth = AuthUser(user_id=self.a1.user_id) | |
|
83 | perms = { | |
|
84 | 'repositories_groups': {}, | |
|
85 | 'global': set([u'hg.admin']), | |
|
86 | 'repositories': {u'vcs_test_hg': u'repository.admin'} | |
|
87 | } | |
|
88 | self.assertEqual(a1_auth.permissions['repositories'][HG_REPO], | |
|
89 | perms['repositories'][HG_REPO]) | |
|
90 | new_perm = 'repository.write' | |
|
91 | RepoModel().grant_user_permission(repo=HG_REPO, user=self.a1, | |
|
92 | perm=new_perm) | |
|
93 | Session().commit() | |
|
94 | # cannot really downgrade admins permissions !? they still get's set as | |
|
95 | # admin ! | |
|
96 | u1_auth = AuthUser(user_id=self.a1.user_id) | |
|
97 | self.assertEqual(u1_auth.permissions['repositories'][HG_REPO], | |
|
98 | perms['repositories'][HG_REPO]) | |
|
99 | ||
|
100 | def test_default_group_perms(self): | |
|
101 | self.g1 = _make_group('test1', skip_if_exists=True) | |
|
102 | self.g2 = _make_group('test2', skip_if_exists=True) | |
|
103 | u1_auth = AuthUser(user_id=self.u1.user_id) | |
|
104 | perms = { | |
|
105 | 'repositories_groups': {u'test1': 'group.read', u'test2': 'group.read'}, | |
|
106 | 'global': set([u'hg.create.repository', u'repository.read', u'hg.register.manual_activate']), | |
|
107 | 'repositories': {u'vcs_test_hg': u'repository.read'} | |
|
108 | } | |
|
109 | self.assertEqual(u1_auth.permissions['repositories'][HG_REPO], | |
|
110 | perms['repositories'][HG_REPO]) | |
|
111 | self.assertEqual(u1_auth.permissions['repositories_groups'], | |
|
112 | perms['repositories_groups']) | |
|
113 | ||
|
114 | def test_default_admin_group_perms(self): | |
|
115 | self.g1 = _make_group('test1', skip_if_exists=True) | |
|
116 | self.g2 = _make_group('test2', skip_if_exists=True) | |
|
117 | a1_auth = AuthUser(user_id=self.a1.user_id) | |
|
118 | perms = { | |
|
119 | 'repositories_groups': {u'test1': 'group.admin', u'test2': 'group.admin'}, | |
|
120 | 'global': set(['hg.admin']), | |
|
121 | 'repositories': {u'vcs_test_hg': 'repository.admin'} | |
|
122 | } | |
|
123 | ||
|
124 | self.assertEqual(a1_auth.permissions['repositories'][HG_REPO], | |
|
125 | perms['repositories'][HG_REPO]) | |
|
126 | self.assertEqual(a1_auth.permissions['repositories_groups'], | |
|
127 | perms['repositories_groups']) | |
|
128 | ||
|
129 | def test_propagated_permission_from_users_group(self): | |
|
130 | # make group | |
|
131 | self.ug1 = UsersGroupModel().create('G1') | |
|
132 | # add user to group | |
|
133 | UsersGroupModel().add_user_to_group(self.ug1, self.u1) | |
|
134 | ||
|
135 | # set permission to lower | |
|
136 | new_perm = 'repository.none' | |
|
137 | RepoModel().grant_user_permission(repo=HG_REPO, user=self.u1, perm=new_perm) | |
|
138 | Session().commit() | |
|
139 | u1_auth = AuthUser(user_id=self.u1.user_id) | |
|
140 | self.assertEqual(u1_auth.permissions['repositories'][HG_REPO], | |
|
141 | new_perm) | |
|
142 | ||
|
143 | # grant perm for group this should override permission from user | |
|
144 | new_perm = 'repository.write' | |
|
145 | RepoModel().grant_users_group_permission(repo=HG_REPO, | |
|
146 | group_name=self.ug1, | |
|
147 | perm=new_perm) | |
|
148 | # check perms | |
|
149 | u1_auth = AuthUser(user_id=self.u1.user_id) | |
|
150 | perms = { | |
|
151 | 'repositories_groups': {}, | |
|
152 | 'global': set([u'hg.create.repository', u'repository.read', | |
|
153 | u'hg.register.manual_activate']), | |
|
154 | 'repositories': {u'vcs_test_hg': u'repository.read'} | |
|
155 | } | |
|
156 | self.assertEqual(u1_auth.permissions['repositories'][HG_REPO], | |
|
157 | new_perm) | |
|
158 | self.assertEqual(u1_auth.permissions['repositories_groups'], | |
|
159 | perms['repositories_groups']) | |
|
160 | ||
|
161 | def test_propagated_permission_from_users_group_lower_weight(self): | |
|
162 | # make group | |
|
163 | self.ug1 = UsersGroupModel().create('G1') | |
|
164 | # add user to group | |
|
165 | UsersGroupModel().add_user_to_group(self.ug1, self.u1) | |
|
166 | ||
|
167 | # set permission to lower | |
|
168 | new_perm_h = 'repository.write' | |
|
169 | RepoModel().grant_user_permission(repo=HG_REPO, user=self.u1, | |
|
170 | perm=new_perm_h) | |
|
171 | Session().commit() | |
|
172 | u1_auth = AuthUser(user_id=self.u1.user_id) | |
|
173 | self.assertEqual(u1_auth.permissions['repositories'][HG_REPO], | |
|
174 | new_perm_h) | |
|
175 | ||
|
176 | # grant perm for group this should NOT override permission from user | |
|
177 | # since it's lower than granted | |
|
178 | new_perm_l = 'repository.read' | |
|
179 | RepoModel().grant_users_group_permission(repo=HG_REPO, | |
|
180 | group_name=self.ug1, | |
|
181 | perm=new_perm_l) | |
|
182 | # check perms | |
|
183 | u1_auth = AuthUser(user_id=self.u1.user_id) | |
|
184 | perms = { | |
|
185 | 'repositories_groups': {}, | |
|
186 | 'global': set([u'hg.create.repository', u'repository.read', | |
|
187 | u'hg.register.manual_activate']), | |
|
188 | 'repositories': {u'vcs_test_hg': u'repository.write'} | |
|
189 | } | |
|
190 | self.assertEqual(u1_auth.permissions['repositories'][HG_REPO], | |
|
191 | new_perm_h) | |
|
192 | self.assertEqual(u1_auth.permissions['repositories_groups'], | |
|
193 | perms['repositories_groups']) | |
|
194 | ||
|
195 | def test_repo_in_group_permissions(self): | |
|
196 | self.g1 = _make_group('group1', skip_if_exists=True) | |
|
197 | self.g2 = _make_group('group2', skip_if_exists=True) | |
|
198 | Session().commit() | |
|
199 | # both perms should be read ! | |
|
200 | u1_auth = AuthUser(user_id=self.u1.user_id) | |
|
201 | self.assertEqual(u1_auth.permissions['repositories_groups'], | |
|
202 | {u'group1': u'group.read', u'group2': u'group.read'}) | |
|
203 | ||
|
204 | a1_auth = AuthUser(user_id=self.anon.user_id) | |
|
205 | self.assertEqual(a1_auth.permissions['repositories_groups'], | |
|
206 | {u'group1': u'group.read', u'group2': u'group.read'}) | |
|
207 | ||
|
208 | #Change perms to none for both groups | |
|
209 | ReposGroupModel().grant_user_permission(repos_group=self.g1, | |
|
210 | user=self.anon, | |
|
211 | perm='group.none') | |
|
212 | ReposGroupModel().grant_user_permission(repos_group=self.g2, | |
|
213 | user=self.anon, | |
|
214 | perm='group.none') | |
|
215 | ||
|
216 | u1_auth = AuthUser(user_id=self.u1.user_id) | |
|
217 | self.assertEqual(u1_auth.permissions['repositories_groups'], | |
|
218 | {u'group1': u'group.none', u'group2': u'group.none'}) | |
|
219 | ||
|
220 | a1_auth = AuthUser(user_id=self.anon.user_id) | |
|
221 | self.assertEqual(a1_auth.permissions['repositories_groups'], | |
|
222 | {u'group1': u'group.none', u'group2': u'group.none'}) | |
|
223 | ||
|
224 | # add repo to group | |
|
225 | name = RepoGroup.url_sep().join([self.g1.group_name, 'test_perm']) | |
|
226 | self.test_repo = RepoModel().create_repo( | |
|
227 | repo_name=name, | |
|
228 | repo_type='hg', | |
|
229 | description='', | |
|
230 | owner=self.u1, | |
|
231 | ) | |
|
232 | Session().commit() | |
|
233 | ||
|
234 | u1_auth = AuthUser(user_id=self.u1.user_id) | |
|
235 | self.assertEqual(u1_auth.permissions['repositories_groups'], | |
|
236 | {u'group1': u'group.none', u'group2': u'group.none'}) | |
|
237 | ||
|
238 | a1_auth = AuthUser(user_id=self.anon.user_id) | |
|
239 | self.assertEqual(a1_auth.permissions['repositories_groups'], | |
|
240 | {u'group1': u'group.none', u'group2': u'group.none'}) | |
|
241 | ||
|
242 | #grant permission for u2 ! | |
|
243 | ReposGroupModel().grant_user_permission(repos_group=self.g1, | |
|
244 | user=self.u2, | |
|
245 | perm='group.read') | |
|
246 | ReposGroupModel().grant_user_permission(repos_group=self.g2, | |
|
247 | user=self.u2, | |
|
248 | perm='group.read') | |
|
249 | Session().commit() | |
|
250 | self.assertNotEqual(self.u1, self.u2) | |
|
251 | #u1 and anon should have not change perms while u2 should ! | |
|
252 | u1_auth = AuthUser(user_id=self.u1.user_id) | |
|
253 | self.assertEqual(u1_auth.permissions['repositories_groups'], | |
|
254 | {u'group1': u'group.none', u'group2': u'group.none'}) | |
|
255 | ||
|
256 | u2_auth = AuthUser(user_id=self.u2.user_id) | |
|
257 | self.assertEqual(u2_auth.permissions['repositories_groups'], | |
|
258 | {u'group1': u'group.read', u'group2': u'group.read'}) | |
|
259 | ||
|
260 | a1_auth = AuthUser(user_id=self.anon.user_id) | |
|
261 | self.assertEqual(a1_auth.permissions['repositories_groups'], | |
|
262 | {u'group1': u'group.none', u'group2': u'group.none'}) | |
|
263 | ||
|
264 | def test_repo_group_user_as_user_group_member(self): | |
|
265 | # create Group1 | |
|
266 | self.g1 = _make_group('group1', skip_if_exists=True) | |
|
267 | Session().commit() | |
|
268 | a1_auth = AuthUser(user_id=self.anon.user_id) | |
|
269 | ||
|
270 | self.assertEqual(a1_auth.permissions['repositories_groups'], | |
|
271 | {u'group1': u'group.read'}) | |
|
272 | ||
|
273 | # set default permission to none | |
|
274 | ReposGroupModel().grant_user_permission(repos_group=self.g1, | |
|
275 | user=self.anon, | |
|
276 | perm='group.none') | |
|
277 | # make group | |
|
278 | self.ug1 = UsersGroupModel().create('G1') | |
|
279 | # add user to group | |
|
280 | UsersGroupModel().add_user_to_group(self.ug1, self.u1) | |
|
281 | Session().commit() | |
|
282 | ||
|
283 | # check if user is in the group | |
|
284 | membrs = [x.user_id for x in UsersGroupModel().get(self.ug1.users_group_id).members] | |
|
285 | self.assertEqual(membrs, [self.u1.user_id]) | |
|
286 | # add some user to that group | |
|
287 | ||
|
288 | # check his permissions | |
|
289 | a1_auth = AuthUser(user_id=self.anon.user_id) | |
|
290 | self.assertEqual(a1_auth.permissions['repositories_groups'], | |
|
291 | {u'group1': u'group.none'}) | |
|
292 | ||
|
293 | u1_auth = AuthUser(user_id=self.u1.user_id) | |
|
294 | self.assertEqual(u1_auth.permissions['repositories_groups'], | |
|
295 | {u'group1': u'group.none'}) | |
|
296 | ||
|
297 | # grant ug1 read permissions for | |
|
298 | ReposGroupModel().grant_users_group_permission(repos_group=self.g1, | |
|
299 | group_name=self.ug1, | |
|
300 | perm='group.read') | |
|
301 | Session().commit() | |
|
302 | # check if the | |
|
303 | obj = Session().query(UsersGroupRepoGroupToPerm)\ | |
|
304 | .filter(UsersGroupRepoGroupToPerm.group == self.g1)\ | |
|
305 | .filter(UsersGroupRepoGroupToPerm.users_group == self.ug1)\ | |
|
306 | .scalar() | |
|
307 | self.assertEqual(obj.permission.permission_name, 'group.read') | |
|
308 | ||
|
309 | a1_auth = AuthUser(user_id=self.anon.user_id) | |
|
310 | ||
|
311 | self.assertEqual(a1_auth.permissions['repositories_groups'], | |
|
312 | {u'group1': u'group.none'}) | |
|
313 | ||
|
314 | u1_auth = AuthUser(user_id=self.u1.user_id) | |
|
315 | self.assertEqual(u1_auth.permissions['repositories_groups'], | |
|
316 | {u'group1': u'group.read'}) |
@@ -0,0 +1,170 | |||
|
1 | import os | |
|
2 | import unittest | |
|
3 | from rhodecode.tests import * | |
|
4 | ||
|
5 | from rhodecode.model.repos_group import ReposGroupModel | |
|
6 | from rhodecode.model.repo import RepoModel | |
|
7 | from rhodecode.model.db import RepoGroup, User | |
|
8 | from rhodecode.model.meta import Session | |
|
9 | from sqlalchemy.exc import IntegrityError | |
|
10 | ||
|
11 | ||
|
12 | def _make_group(path, desc='desc', parent_id=None, | |
|
13 | skip_if_exists=False): | |
|
14 | ||
|
15 | gr = RepoGroup.get_by_group_name(path) | |
|
16 | if gr and skip_if_exists: | |
|
17 | return gr | |
|
18 | ||
|
19 | gr = ReposGroupModel().create(path, desc, parent_id) | |
|
20 | return gr | |
|
21 | ||
|
22 | ||
|
23 | class TestReposGroups(unittest.TestCase): | |
|
24 | ||
|
25 | def setUp(self): | |
|
26 | self.g1 = _make_group('test1', skip_if_exists=True) | |
|
27 | Session().commit() | |
|
28 | self.g2 = _make_group('test2', skip_if_exists=True) | |
|
29 | Session().commit() | |
|
30 | self.g3 = _make_group('test3', skip_if_exists=True) | |
|
31 | Session().commit() | |
|
32 | ||
|
33 | def tearDown(self): | |
|
34 | print 'out' | |
|
35 | ||
|
36 | def __check_path(self, *path): | |
|
37 | """ | |
|
38 | Checks the path for existance ! | |
|
39 | """ | |
|
40 | path = [TESTS_TMP_PATH] + list(path) | |
|
41 | path = os.path.join(*path) | |
|
42 | return os.path.isdir(path) | |
|
43 | ||
|
44 | def _check_folders(self): | |
|
45 | print os.listdir(TESTS_TMP_PATH) | |
|
46 | ||
|
47 | def __delete_group(self, id_): | |
|
48 | ReposGroupModel().delete(id_) | |
|
49 | ||
|
50 | def __update_group(self, id_, path, desc='desc', parent_id=None): | |
|
51 | form_data = dict( | |
|
52 | group_name=path, | |
|
53 | group_description=desc, | |
|
54 | group_parent_id=parent_id, | |
|
55 | perms_updates=[], | |
|
56 | perms_new=[] | |
|
57 | ) | |
|
58 | gr = ReposGroupModel().update(id_, form_data) | |
|
59 | return gr | |
|
60 | ||
|
61 | def test_create_group(self): | |
|
62 | g = _make_group('newGroup') | |
|
63 | self.assertEqual(g.full_path, 'newGroup') | |
|
64 | ||
|
65 | self.assertTrue(self.__check_path('newGroup')) | |
|
66 | ||
|
67 | def test_create_same_name_group(self): | |
|
68 | self.assertRaises(IntegrityError, lambda: _make_group('newGroup')) | |
|
69 | Session().rollback() | |
|
70 | ||
|
71 | def test_same_subgroup(self): | |
|
72 | sg1 = _make_group('sub1', parent_id=self.g1.group_id) | |
|
73 | self.assertEqual(sg1.parent_group, self.g1) | |
|
74 | self.assertEqual(sg1.full_path, 'test1/sub1') | |
|
75 | self.assertTrue(self.__check_path('test1', 'sub1')) | |
|
76 | ||
|
77 | ssg1 = _make_group('subsub1', parent_id=sg1.group_id) | |
|
78 | self.assertEqual(ssg1.parent_group, sg1) | |
|
79 | self.assertEqual(ssg1.full_path, 'test1/sub1/subsub1') | |
|
80 | self.assertTrue(self.__check_path('test1', 'sub1', 'subsub1')) | |
|
81 | ||
|
82 | def test_remove_group(self): | |
|
83 | sg1 = _make_group('deleteme') | |
|
84 | self.__delete_group(sg1.group_id) | |
|
85 | ||
|
86 | self.assertEqual(RepoGroup.get(sg1.group_id), None) | |
|
87 | self.assertFalse(self.__check_path('deteteme')) | |
|
88 | ||
|
89 | sg1 = _make_group('deleteme', parent_id=self.g1.group_id) | |
|
90 | self.__delete_group(sg1.group_id) | |
|
91 | ||
|
92 | self.assertEqual(RepoGroup.get(sg1.group_id), None) | |
|
93 | self.assertFalse(self.__check_path('test1', 'deteteme')) | |
|
94 | ||
|
95 | def test_rename_single_group(self): | |
|
96 | sg1 = _make_group('initial') | |
|
97 | ||
|
98 | new_sg1 = self.__update_group(sg1.group_id, 'after') | |
|
99 | self.assertTrue(self.__check_path('after')) | |
|
100 | self.assertEqual(RepoGroup.get_by_group_name('initial'), None) | |
|
101 | ||
|
102 | def test_update_group_parent(self): | |
|
103 | ||
|
104 | sg1 = _make_group('initial', parent_id=self.g1.group_id) | |
|
105 | ||
|
106 | new_sg1 = self.__update_group(sg1.group_id, 'after', parent_id=self.g1.group_id) | |
|
107 | self.assertTrue(self.__check_path('test1', 'after')) | |
|
108 | self.assertEqual(RepoGroup.get_by_group_name('test1/initial'), None) | |
|
109 | ||
|
110 | new_sg1 = self.__update_group(sg1.group_id, 'after', parent_id=self.g3.group_id) | |
|
111 | self.assertTrue(self.__check_path('test3', 'after')) | |
|
112 | self.assertEqual(RepoGroup.get_by_group_name('test3/initial'), None) | |
|
113 | ||
|
114 | new_sg1 = self.__update_group(sg1.group_id, 'hello') | |
|
115 | self.assertTrue(self.__check_path('hello')) | |
|
116 | ||
|
117 | self.assertEqual(RepoGroup.get_by_group_name('hello'), new_sg1) | |
|
118 | ||
|
119 | def test_subgrouping_with_repo(self): | |
|
120 | ||
|
121 | g1 = _make_group('g1') | |
|
122 | g2 = _make_group('g2') | |
|
123 | ||
|
124 | # create new repo | |
|
125 | form_data = dict(repo_name='john', | |
|
126 | repo_name_full='john', | |
|
127 | fork_name=None, | |
|
128 | description=None, | |
|
129 | repo_group=None, | |
|
130 | private=False, | |
|
131 | repo_type='hg', | |
|
132 | clone_uri=None, | |
|
133 | landing_rev='tip') | |
|
134 | cur_user = User.get_by_username(TEST_USER_ADMIN_LOGIN) | |
|
135 | r = RepoModel().create(form_data, cur_user) | |
|
136 | ||
|
137 | self.assertEqual(r.repo_name, 'john') | |
|
138 | ||
|
139 | # put repo into group | |
|
140 | form_data = form_data | |
|
141 | form_data['repo_group'] = g1.group_id | |
|
142 | form_data['perms_new'] = [] | |
|
143 | form_data['perms_updates'] = [] | |
|
144 | RepoModel().update(r.repo_name, form_data) | |
|
145 | self.assertEqual(r.repo_name, 'g1/john') | |
|
146 | ||
|
147 | self.__update_group(g1.group_id, 'g1', parent_id=g2.group_id) | |
|
148 | self.assertTrue(self.__check_path('g2', 'g1')) | |
|
149 | ||
|
150 | # test repo | |
|
151 | self.assertEqual(r.repo_name, RepoGroup.url_sep().join(['g2', 'g1', | |
|
152 | r.just_name])) | |
|
153 | ||
|
154 | def test_move_to_root(self): | |
|
155 | g1 = _make_group('t11') | |
|
156 | Session().commit() | |
|
157 | g2 = _make_group('t22', parent_id=g1.group_id) | |
|
158 | Session().commit() | |
|
159 | ||
|
160 | self.assertEqual(g2.full_path, 't11/t22') | |
|
161 | self.assertTrue(self.__check_path('t11', 't22')) | |
|
162 | ||
|
163 | g2 = self.__update_group(g2.group_id, 'g22', parent_id=None) | |
|
164 | Session().commit() | |
|
165 | ||
|
166 | self.assertEqual(g2.group_name, 'g22') | |
|
167 | # we moved out group from t1 to '' so it's full path should be 'g2' | |
|
168 | self.assertEqual(g2.full_path, 'g22') | |
|
169 | self.assertFalse(self.__check_path('t11', 't22')) | |
|
170 | self.assertTrue(self.__check_path('g22')) No newline at end of file |
@@ -0,0 +1,124 | |||
|
1 | import unittest | |
|
2 | from rhodecode.tests import * | |
|
3 | ||
|
4 | from rhodecode.model.db import User, UsersGroup, UsersGroupMember, UserEmailMap,\ | |
|
5 | Permission | |
|
6 | from rhodecode.model.user import UserModel | |
|
7 | ||
|
8 | from rhodecode.model.meta import Session | |
|
9 | from rhodecode.model.users_group import UsersGroupModel | |
|
10 | ||
|
11 | ||
|
12 | class TestUser(unittest.TestCase): | |
|
13 | def __init__(self, methodName='runTest'): | |
|
14 | Session.remove() | |
|
15 | super(TestUser, self).__init__(methodName=methodName) | |
|
16 | ||
|
17 | def test_create_and_remove(self): | |
|
18 | usr = UserModel().create_or_update(username=u'test_user', | |
|
19 | password=u'qweqwe', | |
|
20 | email=u'u232@rhodecode.org', | |
|
21 | firstname=u'u1', lastname=u'u1') | |
|
22 | Session().commit() | |
|
23 | self.assertEqual(User.get_by_username(u'test_user'), usr) | |
|
24 | ||
|
25 | # make users group | |
|
26 | users_group = UsersGroupModel().create('some_example_group') | |
|
27 | Session().commit() | |
|
28 | ||
|
29 | UsersGroupModel().add_user_to_group(users_group, usr) | |
|
30 | Session().commit() | |
|
31 | ||
|
32 | self.assertEqual(UsersGroup.get(users_group.users_group_id), users_group) | |
|
33 | self.assertEqual(UsersGroupMember.query().count(), 1) | |
|
34 | UserModel().delete(usr.user_id) | |
|
35 | Session().commit() | |
|
36 | ||
|
37 | self.assertEqual(UsersGroupMember.query().all(), []) | |
|
38 | ||
|
39 | def test_additonal_email_as_main(self): | |
|
40 | usr = UserModel().create_or_update(username=u'test_user', | |
|
41 | password=u'qweqwe', | |
|
42 | email=u'main_email@rhodecode.org', | |
|
43 | firstname=u'u1', lastname=u'u1') | |
|
44 | Session().commit() | |
|
45 | ||
|
46 | def do(): | |
|
47 | m = UserEmailMap() | |
|
48 | m.email = u'main_email@rhodecode.org' | |
|
49 | m.user = usr | |
|
50 | Session().add(m) | |
|
51 | Session().commit() | |
|
52 | self.assertRaises(AttributeError, do) | |
|
53 | ||
|
54 | UserModel().delete(usr.user_id) | |
|
55 | Session().commit() | |
|
56 | ||
|
57 | def test_extra_email_map(self): | |
|
58 | usr = UserModel().create_or_update(username=u'test_user', | |
|
59 | password=u'qweqwe', | |
|
60 | email=u'main_email@rhodecode.org', | |
|
61 | firstname=u'u1', lastname=u'u1') | |
|
62 | Session().commit() | |
|
63 | ||
|
64 | m = UserEmailMap() | |
|
65 | m.email = u'main_email2@rhodecode.org' | |
|
66 | m.user = usr | |
|
67 | Session().add(m) | |
|
68 | Session().commit() | |
|
69 | ||
|
70 | u = User.get_by_email(email='main_email@rhodecode.org') | |
|
71 | self.assertEqual(usr.user_id, u.user_id) | |
|
72 | self.assertEqual(usr.username, u.username) | |
|
73 | ||
|
74 | u = User.get_by_email(email='main_email2@rhodecode.org') | |
|
75 | self.assertEqual(usr.user_id, u.user_id) | |
|
76 | self.assertEqual(usr.username, u.username) | |
|
77 | u = User.get_by_email(email='main_email3@rhodecode.org') | |
|
78 | self.assertEqual(None, u) | |
|
79 | ||
|
80 | UserModel().delete(usr.user_id) | |
|
81 | Session().commit() | |
|
82 | ||
|
83 | ||
|
84 | class TestUsers(unittest.TestCase): | |
|
85 | ||
|
86 | def __init__(self, methodName='runTest'): | |
|
87 | super(TestUsers, self).__init__(methodName=methodName) | |
|
88 | ||
|
89 | def setUp(self): | |
|
90 | self.u1 = UserModel().create_or_update(username=u'u1', | |
|
91 | password=u'qweqwe', | |
|
92 | email=u'u1@rhodecode.org', | |
|
93 | firstname=u'u1', lastname=u'u1') | |
|
94 | ||
|
95 | def tearDown(self): | |
|
96 | perm = Permission.query().all() | |
|
97 | for p in perm: | |
|
98 | UserModel().revoke_perm(self.u1, p) | |
|
99 | ||
|
100 | UserModel().delete(self.u1) | |
|
101 | Session().commit() | |
|
102 | ||
|
103 | def test_add_perm(self): | |
|
104 | perm = Permission.query().all()[0] | |
|
105 | UserModel().grant_perm(self.u1, perm) | |
|
106 | Session().commit() | |
|
107 | self.assertEqual(UserModel().has_perm(self.u1, perm), True) | |
|
108 | ||
|
109 | def test_has_perm(self): | |
|
110 | perm = Permission.query().all() | |
|
111 | for p in perm: | |
|
112 | has_p = UserModel().has_perm(self.u1, p) | |
|
113 | self.assertEqual(False, has_p) | |
|
114 | ||
|
115 | def test_revoke_perm(self): | |
|
116 | perm = Permission.query().all()[0] | |
|
117 | UserModel().grant_perm(self.u1, perm) | |
|
118 | Session().commit() | |
|
119 | self.assertEqual(UserModel().has_perm(self.u1, perm), True) | |
|
120 | ||
|
121 | #revoke | |
|
122 | UserModel().revoke_perm(self.u1, perm) | |
|
123 | Session().commit() | |
|
124 | self.assertEqual(UserModel().has_perm(self.u1, perm), False) |
This diff has been collapsed as it changes many lines, (604 lines changed) Show them Hide them | |||
@@ -1,761 +1,189 | |||
|
1 | 1 | import os |
|
2 | 2 | import unittest |
|
3 | 3 | from rhodecode.tests import * |
|
4 | 4 | |
|
5 | from rhodecode.model.repos_group import ReposGroupModel | |
|
6 | from rhodecode.model.repo import RepoModel | |
|
7 | from rhodecode.model.db import RepoGroup, User, Notification, UserNotification, \ | |
|
8 | UsersGroup, UsersGroupMember, Permission, UsersGroupRepoGroupToPerm,\ | |
|
9 | Repository, UserEmailMap | |
|
10 | from sqlalchemy.exc import IntegrityError, DatabaseError | |
|
5 | from rhodecode.model.db import User, Notification, UserNotification | |
|
11 | 6 | from rhodecode.model.user import UserModel |
|
12 | 7 | |
|
13 | 8 | from rhodecode.model.meta import Session |
|
14 | 9 | from rhodecode.model.notification import NotificationModel |
|
15 | from rhodecode.model.users_group import UsersGroupModel | |
|
16 | from rhodecode.lib.auth import AuthUser | |
|
17 | ||
|
18 | ||
|
19 | def _make_group(path, desc='desc', parent_id=None, | |
|
20 | skip_if_exists=False): | |
|
21 | ||
|
22 | gr = RepoGroup.get_by_group_name(path) | |
|
23 | if gr and skip_if_exists: | |
|
24 | return gr | |
|
25 | ||
|
26 | gr = ReposGroupModel().create(path, desc, parent_id) | |
|
27 | return gr | |
|
28 | ||
|
29 | ||
|
30 | class TestReposGroups(unittest.TestCase): | |
|
31 | ||
|
32 | def setUp(self): | |
|
33 | self.g1 = _make_group('test1', skip_if_exists=True) | |
|
34 | Session.commit() | |
|
35 | self.g2 = _make_group('test2', skip_if_exists=True) | |
|
36 | Session.commit() | |
|
37 | self.g3 = _make_group('test3', skip_if_exists=True) | |
|
38 | Session.commit() | |
|
39 | ||
|
40 | def tearDown(self): | |
|
41 | print 'out' | |
|
42 | ||
|
43 | def __check_path(self, *path): | |
|
44 | """ | |
|
45 | Checks the path for existance ! | |
|
46 | """ | |
|
47 | path = [TESTS_TMP_PATH] + list(path) | |
|
48 | path = os.path.join(*path) | |
|
49 | return os.path.isdir(path) | |
|
50 | ||
|
51 | def _check_folders(self): | |
|
52 | print os.listdir(TESTS_TMP_PATH) | |
|
53 | ||
|
54 | def __delete_group(self, id_): | |
|
55 | ReposGroupModel().delete(id_) | |
|
56 | ||
|
57 | def __update_group(self, id_, path, desc='desc', parent_id=None): | |
|
58 | form_data = dict( | |
|
59 | group_name=path, | |
|
60 | group_description=desc, | |
|
61 | group_parent_id=parent_id, | |
|
62 | perms_updates=[], | |
|
63 | perms_new=[] | |
|
64 | ) | |
|
65 | gr = ReposGroupModel().update(id_, form_data) | |
|
66 | return gr | |
|
67 | ||
|
68 | def test_create_group(self): | |
|
69 | g = _make_group('newGroup') | |
|
70 | self.assertEqual(g.full_path, 'newGroup') | |
|
71 | ||
|
72 | self.assertTrue(self.__check_path('newGroup')) | |
|
73 | ||
|
74 | def test_create_same_name_group(self): | |
|
75 | self.assertRaises(IntegrityError, lambda:_make_group('newGroup')) | |
|
76 | Session.rollback() | |
|
77 | ||
|
78 | def test_same_subgroup(self): | |
|
79 | sg1 = _make_group('sub1', parent_id=self.g1.group_id) | |
|
80 | self.assertEqual(sg1.parent_group, self.g1) | |
|
81 | self.assertEqual(sg1.full_path, 'test1/sub1') | |
|
82 | self.assertTrue(self.__check_path('test1', 'sub1')) | |
|
83 | ||
|
84 | ssg1 = _make_group('subsub1', parent_id=sg1.group_id) | |
|
85 | self.assertEqual(ssg1.parent_group, sg1) | |
|
86 | self.assertEqual(ssg1.full_path, 'test1/sub1/subsub1') | |
|
87 | self.assertTrue(self.__check_path('test1', 'sub1', 'subsub1')) | |
|
88 | ||
|
89 | def test_remove_group(self): | |
|
90 | sg1 = _make_group('deleteme') | |
|
91 | self.__delete_group(sg1.group_id) | |
|
92 | ||
|
93 | self.assertEqual(RepoGroup.get(sg1.group_id), None) | |
|
94 | self.assertFalse(self.__check_path('deteteme')) | |
|
95 | ||
|
96 | sg1 = _make_group('deleteme', parent_id=self.g1.group_id) | |
|
97 | self.__delete_group(sg1.group_id) | |
|
98 | ||
|
99 | self.assertEqual(RepoGroup.get(sg1.group_id), None) | |
|
100 | self.assertFalse(self.__check_path('test1', 'deteteme')) | |
|
101 | ||
|
102 | def test_rename_single_group(self): | |
|
103 | sg1 = _make_group('initial') | |
|
104 | ||
|
105 | new_sg1 = self.__update_group(sg1.group_id, 'after') | |
|
106 | self.assertTrue(self.__check_path('after')) | |
|
107 | self.assertEqual(RepoGroup.get_by_group_name('initial'), None) | |
|
108 | ||
|
109 | def test_update_group_parent(self): | |
|
110 | ||
|
111 | sg1 = _make_group('initial', parent_id=self.g1.group_id) | |
|
112 | ||
|
113 | new_sg1 = self.__update_group(sg1.group_id, 'after', parent_id=self.g1.group_id) | |
|
114 | self.assertTrue(self.__check_path('test1', 'after')) | |
|
115 | self.assertEqual(RepoGroup.get_by_group_name('test1/initial'), None) | |
|
116 | ||
|
117 | new_sg1 = self.__update_group(sg1.group_id, 'after', parent_id=self.g3.group_id) | |
|
118 | self.assertTrue(self.__check_path('test3', 'after')) | |
|
119 | self.assertEqual(RepoGroup.get_by_group_name('test3/initial'), None) | |
|
120 | ||
|
121 | new_sg1 = self.__update_group(sg1.group_id, 'hello') | |
|
122 | self.assertTrue(self.__check_path('hello')) | |
|
123 | ||
|
124 | self.assertEqual(RepoGroup.get_by_group_name('hello'), new_sg1) | |
|
125 | ||
|
126 | def test_subgrouping_with_repo(self): | |
|
127 | ||
|
128 | g1 = _make_group('g1') | |
|
129 | g2 = _make_group('g2') | |
|
130 | ||
|
131 | # create new repo | |
|
132 | form_data = dict(repo_name='john', | |
|
133 | repo_name_full='john', | |
|
134 | fork_name=None, | |
|
135 | description=None, | |
|
136 | repo_group=None, | |
|
137 | private=False, | |
|
138 | repo_type='hg', | |
|
139 | clone_uri=None, | |
|
140 | landing_rev='tip') | |
|
141 | cur_user = User.get_by_username(TEST_USER_ADMIN_LOGIN) | |
|
142 | r = RepoModel().create(form_data, cur_user) | |
|
143 | ||
|
144 | self.assertEqual(r.repo_name, 'john') | |
|
145 | ||
|
146 | # put repo into group | |
|
147 | form_data = form_data | |
|
148 | form_data['repo_group'] = g1.group_id | |
|
149 | form_data['perms_new'] = [] | |
|
150 | form_data['perms_updates'] = [] | |
|
151 | RepoModel().update(r.repo_name, form_data) | |
|
152 | self.assertEqual(r.repo_name, 'g1/john') | |
|
153 | ||
|
154 | self.__update_group(g1.group_id, 'g1', parent_id=g2.group_id) | |
|
155 | self.assertTrue(self.__check_path('g2', 'g1')) | |
|
156 | ||
|
157 | # test repo | |
|
158 | self.assertEqual(r.repo_name, RepoGroup.url_sep().join(['g2', 'g1', r.just_name])) | |
|
159 | ||
|
160 | def test_move_to_root(self): | |
|
161 | g1 = _make_group('t11') | |
|
162 | Session.commit() | |
|
163 | g2 = _make_group('t22', parent_id=g1.group_id) | |
|
164 | Session.commit() | |
|
165 | ||
|
166 | self.assertEqual(g2.full_path, 't11/t22') | |
|
167 | self.assertTrue(self.__check_path('t11', 't22')) | |
|
168 | ||
|
169 | g2 = self.__update_group(g2.group_id, 'g22', parent_id=None) | |
|
170 | Session.commit() | |
|
171 | ||
|
172 | self.assertEqual(g2.group_name, 'g22') | |
|
173 | # we moved out group from t1 to '' so it's full path should be 'g2' | |
|
174 | self.assertEqual(g2.full_path, 'g22') | |
|
175 | self.assertFalse(self.__check_path('t11', 't22')) | |
|
176 | self.assertTrue(self.__check_path('g22')) | |
|
177 | ||
|
178 | ||
|
179 | class TestUser(unittest.TestCase): | |
|
180 | def __init__(self, methodName='runTest'): | |
|
181 | Session.remove() | |
|
182 | super(TestUser, self).__init__(methodName=methodName) | |
|
183 | ||
|
184 | def test_create_and_remove(self): | |
|
185 | usr = UserModel().create_or_update(username=u'test_user', | |
|
186 | password=u'qweqwe', | |
|
187 | email=u'u232@rhodecode.org', | |
|
188 | firstname=u'u1', lastname=u'u1') | |
|
189 | Session.commit() | |
|
190 | self.assertEqual(User.get_by_username(u'test_user'), usr) | |
|
191 | ||
|
192 | # make users group | |
|
193 | users_group = UsersGroupModel().create('some_example_group') | |
|
194 | Session.commit() | |
|
195 | ||
|
196 | UsersGroupModel().add_user_to_group(users_group, usr) | |
|
197 | Session.commit() | |
|
198 | ||
|
199 | self.assertEqual(UsersGroup.get(users_group.users_group_id), users_group) | |
|
200 | self.assertEqual(UsersGroupMember.query().count(), 1) | |
|
201 | UserModel().delete(usr.user_id) | |
|
202 | Session.commit() | |
|
203 | ||
|
204 | self.assertEqual(UsersGroupMember.query().all(), []) | |
|
205 | ||
|
206 | def test_additonal_email_as_main(self): | |
|
207 | usr = UserModel().create_or_update(username=u'test_user', | |
|
208 | password=u'qweqwe', | |
|
209 | email=u'main_email@rhodecode.org', | |
|
210 | firstname=u'u1', lastname=u'u1') | |
|
211 | Session.commit() | |
|
212 | ||
|
213 | def do(): | |
|
214 | m = UserEmailMap() | |
|
215 | m.email = u'main_email@rhodecode.org' | |
|
216 | m.user = usr | |
|
217 | Session.add(m) | |
|
218 | Session.commit() | |
|
219 | self.assertRaises(AttributeError, do) | |
|
220 | ||
|
221 | UserModel().delete(usr.user_id) | |
|
222 | Session.commit() | |
|
223 | ||
|
224 | def test_extra_email_map(self): | |
|
225 | usr = UserModel().create_or_update(username=u'test_user', | |
|
226 | password=u'qweqwe', | |
|
227 | email=u'main_email@rhodecode.org', | |
|
228 | firstname=u'u1', lastname=u'u1') | |
|
229 | Session.commit() | |
|
230 | ||
|
231 | m = UserEmailMap() | |
|
232 | m.email = u'main_email2@rhodecode.org' | |
|
233 | m.user = usr | |
|
234 | Session.add(m) | |
|
235 | Session.commit() | |
|
236 | ||
|
237 | u = User.get_by_email(email='main_email@rhodecode.org') | |
|
238 | self.assertEqual(usr.user_id, u.user_id) | |
|
239 | self.assertEqual(usr.username, u.username) | |
|
240 | ||
|
241 | u = User.get_by_email(email='main_email2@rhodecode.org') | |
|
242 | self.assertEqual(usr.user_id, u.user_id) | |
|
243 | self.assertEqual(usr.username, u.username) | |
|
244 | u = User.get_by_email(email='main_email3@rhodecode.org') | |
|
245 | self.assertEqual(None, u) | |
|
246 | ||
|
247 | UserModel().delete(usr.user_id) | |
|
248 | Session.commit() | |
|
249 | 10 | |
|
250 | 11 | |
|
251 | 12 | class TestNotifications(unittest.TestCase): |
|
252 | 13 | |
|
253 | 14 | def __init__(self, methodName='runTest'): |
|
254 | 15 | Session.remove() |
|
255 | 16 | self.u1 = UserModel().create_or_update(username=u'u1', |
|
256 | 17 | password=u'qweqwe', |
|
257 | 18 | email=u'u1@rhodecode.org', |
|
258 | 19 | firstname=u'u1', lastname=u'u1') |
|
259 | Session.commit() | |
|
20 | Session().commit() | |
|
260 | 21 | self.u1 = self.u1.user_id |
|
261 | 22 | |
|
262 | 23 | self.u2 = UserModel().create_or_update(username=u'u2', |
|
263 | 24 | password=u'qweqwe', |
|
264 | 25 | email=u'u2@rhodecode.org', |
|
265 | 26 | firstname=u'u2', lastname=u'u3') |
|
266 | Session.commit() | |
|
27 | Session().commit() | |
|
267 | 28 | self.u2 = self.u2.user_id |
|
268 | 29 | |
|
269 | 30 | self.u3 = UserModel().create_or_update(username=u'u3', |
|
270 | 31 | password=u'qweqwe', |
|
271 | 32 | email=u'u3@rhodecode.org', |
|
272 | 33 | firstname=u'u3', lastname=u'u3') |
|
273 | Session.commit() | |
|
34 | Session().commit() | |
|
274 | 35 | self.u3 = self.u3.user_id |
|
275 | 36 | |
|
276 | 37 | super(TestNotifications, self).__init__(methodName=methodName) |
|
277 | 38 | |
|
278 | 39 | def _clean_notifications(self): |
|
279 | 40 | for n in Notification.query().all(): |
|
280 | Session.delete(n) | |
|
41 | Session().delete(n) | |
|
281 | 42 | |
|
282 | Session.commit() | |
|
43 | Session().commit() | |
|
283 | 44 | self.assertEqual(Notification.query().all(), []) |
|
284 | 45 | |
|
285 | 46 | def tearDown(self): |
|
286 | 47 | self._clean_notifications() |
|
287 | 48 | |
|
288 | 49 | def test_create_notification(self): |
|
289 | 50 | self.assertEqual([], Notification.query().all()) |
|
290 | 51 | self.assertEqual([], UserNotification.query().all()) |
|
291 | 52 | |
|
292 | 53 | usrs = [self.u1, self.u2] |
|
293 | 54 | notification = NotificationModel().create(created_by=self.u1, |
|
294 | 55 | subject=u'subj', body=u'hi there', |
|
295 | 56 | recipients=usrs) |
|
296 | Session.commit() | |
|
57 | Session().commit() | |
|
297 | 58 | u1 = User.get(self.u1) |
|
298 | 59 | u2 = User.get(self.u2) |
|
299 | 60 | u3 = User.get(self.u3) |
|
300 | 61 | notifications = Notification.query().all() |
|
301 | 62 | self.assertEqual(len(notifications), 1) |
|
302 | 63 | |
|
303 | 64 | unotification = UserNotification.query()\ |
|
304 | 65 | .filter(UserNotification.notification == notification).all() |
|
305 | 66 | |
|
306 | 67 | self.assertEqual(notifications[0].recipients, [u1, u2]) |
|
307 | 68 | self.assertEqual(notification.notification_id, |
|
308 | 69 | notifications[0].notification_id) |
|
309 | 70 | self.assertEqual(len(unotification), len(usrs)) |
|
310 | 71 | self.assertEqual([x.user.user_id for x in unotification], usrs) |
|
311 | 72 | |
|
312 | 73 | def test_user_notifications(self): |
|
313 | 74 | self.assertEqual([], Notification.query().all()) |
|
314 | 75 | self.assertEqual([], UserNotification.query().all()) |
|
315 | 76 | |
|
316 | 77 | notification1 = NotificationModel().create(created_by=self.u1, |
|
317 | 78 | subject=u'subj', body=u'hi there1', |
|
318 | 79 | recipients=[self.u3]) |
|
319 | Session.commit() | |
|
80 | Session().commit() | |
|
320 | 81 | notification2 = NotificationModel().create(created_by=self.u1, |
|
321 | 82 | subject=u'subj', body=u'hi there2', |
|
322 | 83 | recipients=[self.u3]) |
|
323 | Session.commit() | |
|
324 | u3 = Session.query(User).get(self.u3) | |
|
84 | Session().commit() | |
|
85 | u3 = Session().query(User).get(self.u3) | |
|
325 | 86 | |
|
326 | 87 | self.assertEqual(sorted([x.notification for x in u3.notifications]), |
|
327 | 88 | sorted([notification2, notification1])) |
|
328 | 89 | |
|
329 | 90 | def test_delete_notifications(self): |
|
330 | 91 | self.assertEqual([], Notification.query().all()) |
|
331 | 92 | self.assertEqual([], UserNotification.query().all()) |
|
332 | 93 | |
|
333 | 94 | notification = NotificationModel().create(created_by=self.u1, |
|
334 | 95 | subject=u'title', body=u'hi there3', |
|
335 | 96 | recipients=[self.u3, self.u1, self.u2]) |
|
336 | Session.commit() | |
|
97 | Session().commit() | |
|
337 | 98 | notifications = Notification.query().all() |
|
338 | 99 | self.assertTrue(notification in notifications) |
|
339 | 100 | |
|
340 | 101 | Notification.delete(notification.notification_id) |
|
341 | Session.commit() | |
|
102 | Session().commit() | |
|
342 | 103 | |
|
343 | 104 | notifications = Notification.query().all() |
|
344 | 105 | self.assertFalse(notification in notifications) |
|
345 | 106 | |
|
346 | 107 | un = UserNotification.query().filter(UserNotification.notification |
|
347 | 108 | == notification).all() |
|
348 | 109 | self.assertEqual(un, []) |
|
349 | 110 | |
|
350 | 111 | def test_delete_association(self): |
|
351 | 112 | |
|
352 | 113 | self.assertEqual([], Notification.query().all()) |
|
353 | 114 | self.assertEqual([], UserNotification.query().all()) |
|
354 | 115 | |
|
355 | 116 | notification = NotificationModel().create(created_by=self.u1, |
|
356 | 117 | subject=u'title', body=u'hi there3', |
|
357 | 118 | recipients=[self.u3, self.u1, self.u2]) |
|
358 | Session.commit() | |
|
119 | Session().commit() | |
|
359 | 120 | |
|
360 | 121 | unotification = UserNotification.query()\ |
|
361 | 122 | .filter(UserNotification.notification == |
|
362 | 123 | notification)\ |
|
363 | 124 | .filter(UserNotification.user_id == self.u3)\ |
|
364 | 125 | .scalar() |
|
365 | 126 | |
|
366 | 127 | self.assertEqual(unotification.user_id, self.u3) |
|
367 | 128 | |
|
368 | 129 | NotificationModel().delete(self.u3, |
|
369 | 130 | notification.notification_id) |
|
370 | Session.commit() | |
|
131 | Session().commit() | |
|
371 | 132 | |
|
372 | 133 | u3notification = UserNotification.query()\ |
|
373 | 134 | .filter(UserNotification.notification == |
|
374 | 135 | notification)\ |
|
375 | 136 | .filter(UserNotification.user_id == self.u3)\ |
|
376 | 137 | .scalar() |
|
377 | 138 | |
|
378 | 139 | self.assertEqual(u3notification, None) |
|
379 | 140 | |
|
380 | 141 | # notification object is still there |
|
381 | 142 | self.assertEqual(Notification.query().all(), [notification]) |
|
382 | 143 | |
|
383 | 144 | #u1 and u2 still have assignments |
|
384 | 145 | u1notification = UserNotification.query()\ |
|
385 | 146 | .filter(UserNotification.notification == |
|
386 | 147 | notification)\ |
|
387 | 148 | .filter(UserNotification.user_id == self.u1)\ |
|
388 | 149 | .scalar() |
|
389 | 150 | self.assertNotEqual(u1notification, None) |
|
390 | 151 | u2notification = UserNotification.query()\ |
|
391 | 152 | .filter(UserNotification.notification == |
|
392 | 153 | notification)\ |
|
393 | 154 | .filter(UserNotification.user_id == self.u2)\ |
|
394 | 155 | .scalar() |
|
395 | 156 | self.assertNotEqual(u2notification, None) |
|
396 | 157 | |
|
397 | 158 | def test_notification_counter(self): |
|
398 | 159 | self._clean_notifications() |
|
399 | 160 | self.assertEqual([], Notification.query().all()) |
|
400 | 161 | self.assertEqual([], UserNotification.query().all()) |
|
401 | 162 | |
|
402 | 163 | NotificationModel().create(created_by=self.u1, |
|
403 | 164 | subject=u'title', body=u'hi there_delete', |
|
404 | 165 | recipients=[self.u3, self.u1]) |
|
405 | Session.commit() | |
|
166 | Session().commit() | |
|
406 | 167 | |
|
407 | 168 | self.assertEqual(NotificationModel() |
|
408 | 169 | .get_unread_cnt_for_user(self.u1), 1) |
|
409 | 170 | self.assertEqual(NotificationModel() |
|
410 | 171 | .get_unread_cnt_for_user(self.u2), 0) |
|
411 | 172 | self.assertEqual(NotificationModel() |
|
412 | 173 | .get_unread_cnt_for_user(self.u3), 1) |
|
413 | 174 | |
|
414 | 175 | notification = NotificationModel().create(created_by=self.u1, |
|
415 | 176 | subject=u'title', body=u'hi there3', |
|
416 | 177 | recipients=[self.u3, self.u1, self.u2]) |
|
417 | Session.commit() | |
|
178 | Session().commit() | |
|
418 | 179 | |
|
419 | 180 | self.assertEqual(NotificationModel() |
|
420 | 181 | .get_unread_cnt_for_user(self.u1), 2) |
|
421 | 182 | self.assertEqual(NotificationModel() |
|
422 | 183 | .get_unread_cnt_for_user(self.u2), 1) |
|
423 | 184 | self.assertEqual(NotificationModel() |
|
424 | 185 | .get_unread_cnt_for_user(self.u3), 2) |
|
425 | 186 | |
|
426 | 187 | |
|
427 | class TestUsers(unittest.TestCase): | |
|
428 | ||
|
429 | def __init__(self, methodName='runTest'): | |
|
430 | super(TestUsers, self).__init__(methodName=methodName) | |
|
431 | ||
|
432 | def setUp(self): | |
|
433 | self.u1 = UserModel().create_or_update(username=u'u1', | |
|
434 | password=u'qweqwe', | |
|
435 | email=u'u1@rhodecode.org', | |
|
436 | firstname=u'u1', lastname=u'u1') | |
|
437 | ||
|
438 | def tearDown(self): | |
|
439 | perm = Permission.query().all() | |
|
440 | for p in perm: | |
|
441 | UserModel().revoke_perm(self.u1, p) | |
|
442 | ||
|
443 | UserModel().delete(self.u1) | |
|
444 | Session.commit() | |
|
445 | ||
|
446 | def test_add_perm(self): | |
|
447 | perm = Permission.query().all()[0] | |
|
448 | UserModel().grant_perm(self.u1, perm) | |
|
449 | Session.commit() | |
|
450 | self.assertEqual(UserModel().has_perm(self.u1, perm), True) | |
|
451 | ||
|
452 | def test_has_perm(self): | |
|
453 | perm = Permission.query().all() | |
|
454 | for p in perm: | |
|
455 | has_p = UserModel().has_perm(self.u1, p) | |
|
456 | self.assertEqual(False, has_p) | |
|
457 | ||
|
458 | def test_revoke_perm(self): | |
|
459 | perm = Permission.query().all()[0] | |
|
460 | UserModel().grant_perm(self.u1, perm) | |
|
461 | Session.commit() | |
|
462 | self.assertEqual(UserModel().has_perm(self.u1, perm), True) | |
|
463 | ||
|
464 | #revoke | |
|
465 | UserModel().revoke_perm(self.u1, perm) | |
|
466 | Session.commit() | |
|
467 | self.assertEqual(UserModel().has_perm(self.u1, perm), False) | |
|
468 | 188 | |
|
469 | 189 | |
|
470 | class TestPermissions(unittest.TestCase): | |
|
471 | def __init__(self, methodName='runTest'): | |
|
472 | super(TestPermissions, self).__init__(methodName=methodName) | |
|
473 | ||
|
474 | def setUp(self): | |
|
475 | self.u1 = UserModel().create_or_update( | |
|
476 | username=u'u1', password=u'qweqwe', | |
|
477 | email=u'u1@rhodecode.org', firstname=u'u1', lastname=u'u1' | |
|
478 | ) | |
|
479 | self.u2 = UserModel().create_or_update( | |
|
480 | username=u'u2', password=u'qweqwe', | |
|
481 | email=u'u2@rhodecode.org', firstname=u'u2', lastname=u'u2' | |
|
482 | ) | |
|
483 | self.anon = User.get_by_username('default') | |
|
484 | self.a1 = UserModel().create_or_update( | |
|
485 | username=u'a1', password=u'qweqwe', | |
|
486 | email=u'a1@rhodecode.org', firstname=u'a1', lastname=u'a1', admin=True | |
|
487 | ) | |
|
488 | Session.commit() | |
|
489 | ||
|
490 | def tearDown(self): | |
|
491 | if hasattr(self, 'test_repo'): | |
|
492 | RepoModel().delete(repo=self.test_repo) | |
|
493 | UserModel().delete(self.u1) | |
|
494 | UserModel().delete(self.u2) | |
|
495 | UserModel().delete(self.a1) | |
|
496 | if hasattr(self, 'g1'): | |
|
497 | ReposGroupModel().delete(self.g1.group_id) | |
|
498 | if hasattr(self, 'g2'): | |
|
499 | ReposGroupModel().delete(self.g2.group_id) | |
|
500 | ||
|
501 | if hasattr(self, 'ug1'): | |
|
502 | UsersGroupModel().delete(self.ug1, force=True) | |
|
503 | ||
|
504 | Session.commit() | |
|
505 | ||
|
506 | def test_default_perms_set(self): | |
|
507 | u1_auth = AuthUser(user_id=self.u1.user_id) | |
|
508 | perms = { | |
|
509 | 'repositories_groups': {}, | |
|
510 | 'global': set([u'hg.create.repository', u'repository.read', | |
|
511 | u'hg.register.manual_activate']), | |
|
512 | 'repositories': {u'vcs_test_hg': u'repository.read'} | |
|
513 | } | |
|
514 | self.assertEqual(u1_auth.permissions['repositories'][HG_REPO], | |
|
515 | perms['repositories'][HG_REPO]) | |
|
516 | new_perm = 'repository.write' | |
|
517 | RepoModel().grant_user_permission(repo=HG_REPO, user=self.u1, perm=new_perm) | |
|
518 | Session.commit() | |
|
519 | ||
|
520 | u1_auth = AuthUser(user_id=self.u1.user_id) | |
|
521 | self.assertEqual(u1_auth.permissions['repositories'][HG_REPO], new_perm) | |
|
522 | ||
|
523 | def test_default_admin_perms_set(self): | |
|
524 | a1_auth = AuthUser(user_id=self.a1.user_id) | |
|
525 | perms = { | |
|
526 | 'repositories_groups': {}, | |
|
527 | 'global': set([u'hg.admin']), | |
|
528 | 'repositories': {u'vcs_test_hg': u'repository.admin'} | |
|
529 | } | |
|
530 | self.assertEqual(a1_auth.permissions['repositories'][HG_REPO], | |
|
531 | perms['repositories'][HG_REPO]) | |
|
532 | new_perm = 'repository.write' | |
|
533 | RepoModel().grant_user_permission(repo=HG_REPO, user=self.a1, perm=new_perm) | |
|
534 | Session.commit() | |
|
535 | # cannot really downgrade admins permissions !? they still get's set as | |
|
536 | # admin ! | |
|
537 | u1_auth = AuthUser(user_id=self.a1.user_id) | |
|
538 | self.assertEqual(u1_auth.permissions['repositories'][HG_REPO], | |
|
539 | perms['repositories'][HG_REPO]) | |
|
540 | ||
|
541 | def test_default_group_perms(self): | |
|
542 | self.g1 = _make_group('test1', skip_if_exists=True) | |
|
543 | self.g2 = _make_group('test2', skip_if_exists=True) | |
|
544 | u1_auth = AuthUser(user_id=self.u1.user_id) | |
|
545 | perms = { | |
|
546 | 'repositories_groups': {u'test1': 'group.read', u'test2': 'group.read'}, | |
|
547 | 'global': set([u'hg.create.repository', u'repository.read', u'hg.register.manual_activate']), | |
|
548 | 'repositories': {u'vcs_test_hg': u'repository.read'} | |
|
549 | } | |
|
550 | self.assertEqual(u1_auth.permissions['repositories'][HG_REPO], | |
|
551 | perms['repositories'][HG_REPO]) | |
|
552 | self.assertEqual(u1_auth.permissions['repositories_groups'], | |
|
553 | perms['repositories_groups']) | |
|
554 | ||
|
555 | def test_default_admin_group_perms(self): | |
|
556 | self.g1 = _make_group('test1', skip_if_exists=True) | |
|
557 | self.g2 = _make_group('test2', skip_if_exists=True) | |
|
558 | a1_auth = AuthUser(user_id=self.a1.user_id) | |
|
559 | perms = { | |
|
560 | 'repositories_groups': {u'test1': 'group.admin', u'test2': 'group.admin'}, | |
|
561 | 'global': set(['hg.admin']), | |
|
562 | 'repositories': {u'vcs_test_hg': 'repository.admin'} | |
|
563 | } | |
|
564 | ||
|
565 | self.assertEqual(a1_auth.permissions['repositories'][HG_REPO], | |
|
566 | perms['repositories'][HG_REPO]) | |
|
567 | self.assertEqual(a1_auth.permissions['repositories_groups'], | |
|
568 | perms['repositories_groups']) | |
|
569 | ||
|
570 | def test_propagated_permission_from_users_group(self): | |
|
571 | # make group | |
|
572 | self.ug1 = UsersGroupModel().create('G1') | |
|
573 | # add user to group | |
|
574 | UsersGroupModel().add_user_to_group(self.ug1, self.u1) | |
|
575 | ||
|
576 | # set permission to lower | |
|
577 | new_perm = 'repository.none' | |
|
578 | RepoModel().grant_user_permission(repo=HG_REPO, user=self.u1, perm=new_perm) | |
|
579 | Session.commit() | |
|
580 | u1_auth = AuthUser(user_id=self.u1.user_id) | |
|
581 | self.assertEqual(u1_auth.permissions['repositories'][HG_REPO], | |
|
582 | new_perm) | |
|
583 | ||
|
584 | # grant perm for group this should override permission from user | |
|
585 | new_perm = 'repository.write' | |
|
586 | RepoModel().grant_users_group_permission(repo=HG_REPO, | |
|
587 | group_name=self.ug1, | |
|
588 | perm=new_perm) | |
|
589 | # check perms | |
|
590 | u1_auth = AuthUser(user_id=self.u1.user_id) | |
|
591 | perms = { | |
|
592 | 'repositories_groups': {}, | |
|
593 | 'global': set([u'hg.create.repository', u'repository.read', | |
|
594 | u'hg.register.manual_activate']), | |
|
595 | 'repositories': {u'vcs_test_hg': u'repository.read'} | |
|
596 | } | |
|
597 | self.assertEqual(u1_auth.permissions['repositories'][HG_REPO], | |
|
598 | new_perm) | |
|
599 | self.assertEqual(u1_auth.permissions['repositories_groups'], | |
|
600 | perms['repositories_groups']) | |
|
601 | ||
|
602 | def test_propagated_permission_from_users_group_lower_weight(self): | |
|
603 | # make group | |
|
604 | self.ug1 = UsersGroupModel().create('G1') | |
|
605 | # add user to group | |
|
606 | UsersGroupModel().add_user_to_group(self.ug1, self.u1) | |
|
607 | ||
|
608 | # set permission to lower | |
|
609 | new_perm_h = 'repository.write' | |
|
610 | RepoModel().grant_user_permission(repo=HG_REPO, user=self.u1, | |
|
611 | perm=new_perm_h) | |
|
612 | Session.commit() | |
|
613 | u1_auth = AuthUser(user_id=self.u1.user_id) | |
|
614 | self.assertEqual(u1_auth.permissions['repositories'][HG_REPO], | |
|
615 | new_perm_h) | |
|
616 | ||
|
617 | # grant perm for group this should NOT override permission from user | |
|
618 | # since it's lower than granted | |
|
619 | new_perm_l = 'repository.read' | |
|
620 | RepoModel().grant_users_group_permission(repo=HG_REPO, | |
|
621 | group_name=self.ug1, | |
|
622 | perm=new_perm_l) | |
|
623 | # check perms | |
|
624 | u1_auth = AuthUser(user_id=self.u1.user_id) | |
|
625 | perms = { | |
|
626 | 'repositories_groups': {}, | |
|
627 | 'global': set([u'hg.create.repository', u'repository.read', | |
|
628 | u'hg.register.manual_activate']), | |
|
629 | 'repositories': {u'vcs_test_hg': u'repository.write'} | |
|
630 | } | |
|
631 | self.assertEqual(u1_auth.permissions['repositories'][HG_REPO], | |
|
632 | new_perm_h) | |
|
633 | self.assertEqual(u1_auth.permissions['repositories_groups'], | |
|
634 | perms['repositories_groups']) | |
|
635 | ||
|
636 | def test_repo_in_group_permissions(self): | |
|
637 | self.g1 = _make_group('group1', skip_if_exists=True) | |
|
638 | self.g2 = _make_group('group2', skip_if_exists=True) | |
|
639 | Session.commit() | |
|
640 | # both perms should be read ! | |
|
641 | u1_auth = AuthUser(user_id=self.u1.user_id) | |
|
642 | self.assertEqual(u1_auth.permissions['repositories_groups'], | |
|
643 | {u'group1': u'group.read', u'group2': u'group.read'}) | |
|
644 | ||
|
645 | a1_auth = AuthUser(user_id=self.anon.user_id) | |
|
646 | self.assertEqual(a1_auth.permissions['repositories_groups'], | |
|
647 | {u'group1': u'group.read', u'group2': u'group.read'}) | |
|
648 | ||
|
649 | #Change perms to none for both groups | |
|
650 | ReposGroupModel().grant_user_permission(repos_group=self.g1, | |
|
651 | user=self.anon, | |
|
652 | perm='group.none') | |
|
653 | ReposGroupModel().grant_user_permission(repos_group=self.g2, | |
|
654 | user=self.anon, | |
|
655 | perm='group.none') | |
|
656 | ||
|
657 | u1_auth = AuthUser(user_id=self.u1.user_id) | |
|
658 | self.assertEqual(u1_auth.permissions['repositories_groups'], | |
|
659 | {u'group1': u'group.none', u'group2': u'group.none'}) | |
|
660 | ||
|
661 | a1_auth = AuthUser(user_id=self.anon.user_id) | |
|
662 | self.assertEqual(a1_auth.permissions['repositories_groups'], | |
|
663 | {u'group1': u'group.none', u'group2': u'group.none'}) | |
|
664 | ||
|
665 | # add repo to group | |
|
666 | form_data = { | |
|
667 | 'repo_name': HG_REPO, | |
|
668 | 'repo_name_full': RepoGroup.url_sep().join([self.g1.group_name,HG_REPO]), | |
|
669 | 'repo_type': 'hg', | |
|
670 | 'clone_uri': '', | |
|
671 | 'repo_group': self.g1.group_id, | |
|
672 | 'description': 'desc', | |
|
673 | 'private': False, | |
|
674 | 'landing_rev': 'tip' | |
|
675 | } | |
|
676 | self.test_repo = RepoModel().create(form_data, cur_user=self.u1) | |
|
677 | Session.commit() | |
|
678 | ||
|
679 | u1_auth = AuthUser(user_id=self.u1.user_id) | |
|
680 | self.assertEqual(u1_auth.permissions['repositories_groups'], | |
|
681 | {u'group1': u'group.none', u'group2': u'group.none'}) | |
|
682 | ||
|
683 | a1_auth = AuthUser(user_id=self.anon.user_id) | |
|
684 | self.assertEqual(a1_auth.permissions['repositories_groups'], | |
|
685 | {u'group1': u'group.none', u'group2': u'group.none'}) | |
|
686 | ||
|
687 | #grant permission for u2 ! | |
|
688 | ReposGroupModel().grant_user_permission(repos_group=self.g1, | |
|
689 | user=self.u2, | |
|
690 | perm='group.read') | |
|
691 | ReposGroupModel().grant_user_permission(repos_group=self.g2, | |
|
692 | user=self.u2, | |
|
693 | perm='group.read') | |
|
694 | Session.commit() | |
|
695 | self.assertNotEqual(self.u1, self.u2) | |
|
696 | #u1 and anon should have not change perms while u2 should ! | |
|
697 | u1_auth = AuthUser(user_id=self.u1.user_id) | |
|
698 | self.assertEqual(u1_auth.permissions['repositories_groups'], | |
|
699 | {u'group1': u'group.none', u'group2': u'group.none'}) | |
|
700 | ||
|
701 | u2_auth = AuthUser(user_id=self.u2.user_id) | |
|
702 | self.assertEqual(u2_auth.permissions['repositories_groups'], | |
|
703 | {u'group1': u'group.read', u'group2': u'group.read'}) | |
|
704 | ||
|
705 | a1_auth = AuthUser(user_id=self.anon.user_id) | |
|
706 | self.assertEqual(a1_auth.permissions['repositories_groups'], | |
|
707 | {u'group1': u'group.none', u'group2': u'group.none'}) | |
|
708 | ||
|
709 | def test_repo_group_user_as_user_group_member(self): | |
|
710 | # create Group1 | |
|
711 | self.g1 = _make_group('group1', skip_if_exists=True) | |
|
712 | Session.commit() | |
|
713 | a1_auth = AuthUser(user_id=self.anon.user_id) | |
|
714 | ||
|
715 | self.assertEqual(a1_auth.permissions['repositories_groups'], | |
|
716 | {u'group1': u'group.read'}) | |
|
717 | ||
|
718 | # set default permission to none | |
|
719 | ReposGroupModel().grant_user_permission(repos_group=self.g1, | |
|
720 | user=self.anon, | |
|
721 | perm='group.none') | |
|
722 | # make group | |
|
723 | self.ug1 = UsersGroupModel().create('G1') | |
|
724 | # add user to group | |
|
725 | UsersGroupModel().add_user_to_group(self.ug1, self.u1) | |
|
726 | Session.commit() | |
|
727 | ||
|
728 | # check if user is in the group | |
|
729 | membrs = [x.user_id for x in UsersGroupModel().get(self.ug1.users_group_id).members] | |
|
730 | self.assertEqual(membrs, [self.u1.user_id]) | |
|
731 | # add some user to that group | |
|
732 | ||
|
733 | # check his permissions | |
|
734 | a1_auth = AuthUser(user_id=self.anon.user_id) | |
|
735 | self.assertEqual(a1_auth.permissions['repositories_groups'], | |
|
736 | {u'group1': u'group.none'}) | |
|
737 | ||
|
738 | u1_auth = AuthUser(user_id=self.u1.user_id) | |
|
739 | self.assertEqual(u1_auth.permissions['repositories_groups'], | |
|
740 | {u'group1': u'group.none'}) | |
|
741 | ||
|
742 | # grant ug1 read permissions for | |
|
743 | ReposGroupModel().grant_users_group_permission(repos_group=self.g1, | |
|
744 | group_name=self.ug1, | |
|
745 | perm='group.read') | |
|
746 | Session.commit() | |
|
747 | # check if the | |
|
748 | obj = Session.query(UsersGroupRepoGroupToPerm)\ | |
|
749 | .filter(UsersGroupRepoGroupToPerm.group == self.g1)\ | |
|
750 | .filter(UsersGroupRepoGroupToPerm.users_group == self.ug1)\ | |
|
751 | .scalar() | |
|
752 | self.assertEqual(obj.permission.permission_name, 'group.read') | |
|
753 | ||
|
754 | a1_auth = AuthUser(user_id=self.anon.user_id) | |
|
755 | ||
|
756 | self.assertEqual(a1_auth.permissions['repositories_groups'], | |
|
757 | {u'group1': u'group.none'}) | |
|
758 | ||
|
759 | u1_auth = AuthUser(user_id=self.u1.user_id) | |
|
760 | self.assertEqual(u1_auth.permissions['repositories_groups'], | |
|
761 | {u'group1': u'group.read'}) |
|
1 | NO CONTENT: file renamed from rhodecode/tests/mem_watch to rhodecode/tests/scripts/mem_watch |
@@ -1,211 +1,213 | |||
|
1 | 1 | # -*- coding: utf-8 -*- |
|
2 | 2 | """ |
|
3 | 3 | rhodecode.tests.test_hg_operations |
|
4 | 4 | ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ |
|
5 | 5 | |
|
6 | 6 | Test suite for making push/pull operations |
|
7 | 7 | |
|
8 | 8 | :created_on: Dec 30, 2010 |
|
9 | 9 | :author: marcink |
|
10 | 10 | :copyright: (C) 2010-2012 Marcin Kuzminski <marcin@python-works.com> |
|
11 | 11 | :license: GPLv3, see COPYING for more details. |
|
12 | 12 | """ |
|
13 | 13 | # This program is free software: you can redistribute it and/or modify |
|
14 | 14 | # it under the terms of the GNU General Public License as published by |
|
15 | 15 | # the Free Software Foundation, either version 3 of the License, or |
|
16 | 16 | # (at your option) any later version. |
|
17 | 17 | # |
|
18 | 18 | # This program is distributed in the hope that it will be useful, |
|
19 | 19 | # but WITHOUT ANY WARRANTY; without even the implied warranty of |
|
20 | 20 | # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
|
21 | 21 | # GNU General Public License for more details. |
|
22 | 22 | # |
|
23 | 23 | # You should have received a copy of the GNU General Public License |
|
24 | 24 | # along with this program. If not, see <http://www.gnu.org/licenses/>. |
|
25 | 25 | |
|
26 | 26 | import os |
|
27 | 27 | import sys |
|
28 | 28 | import shutil |
|
29 | 29 | import logging |
|
30 | 30 | from os.path import join as jn |
|
31 | 31 | from os.path import dirname as dn |
|
32 | 32 | |
|
33 | 33 | from tempfile import _RandomNameSequence |
|
34 | 34 | from subprocess import Popen, PIPE |
|
35 | 35 | |
|
36 | 36 | from paste.deploy import appconfig |
|
37 | 37 | from pylons import config |
|
38 | 38 | from sqlalchemy import engine_from_config |
|
39 | 39 | |
|
40 | 40 | from rhodecode.lib.utils import add_cache |
|
41 | 41 | from rhodecode.model import init_model |
|
42 | 42 | from rhodecode.model import meta |
|
43 | 43 | from rhodecode.model.db import User, Repository |
|
44 | 44 | from rhodecode.lib.auth import get_crypt_password |
|
45 | 45 | |
|
46 | 46 | from rhodecode.tests import TESTS_TMP_PATH, NEW_HG_REPO, HG_REPO |
|
47 | 47 | from rhodecode.config.environment import load_environment |
|
48 | 48 | |
|
49 | 49 | rel_path = dn(dn(dn(os.path.abspath(__file__)))) |
|
50 | 50 | conf = appconfig('config:development.ini', relative_to=rel_path) |
|
51 | 51 | load_environment(conf.global_conf, conf.local_conf) |
|
52 | 52 | |
|
53 | 53 | add_cache(conf) |
|
54 | 54 | |
|
55 | 55 | USER = 'test_admin' |
|
56 | 56 | PASS = 'test12' |
|
57 | 57 | HOST = 'hg.local' |
|
58 | 58 | METHOD = 'pull' |
|
59 | 59 | DEBUG = True |
|
60 | 60 | log = logging.getLogger(__name__) |
|
61 | 61 | |
|
62 | 62 | |
|
63 | 63 | class Command(object): |
|
64 | 64 | |
|
65 | 65 | def __init__(self, cwd): |
|
66 | 66 | self.cwd = cwd |
|
67 | 67 | |
|
68 | 68 | def execute(self, cmd, *args): |
|
69 | 69 | """Runs command on the system with given ``args``. |
|
70 | 70 | """ |
|
71 | 71 | |
|
72 | 72 | command = cmd + ' ' + ' '.join(args) |
|
73 | 73 | log.debug('Executing %s' % command) |
|
74 | 74 | if DEBUG: |
|
75 | 75 | print command |
|
76 | 76 | p = Popen(command, shell=True, stdout=PIPE, stderr=PIPE, cwd=self.cwd) |
|
77 | 77 | stdout, stderr = p.communicate() |
|
78 | 78 | if DEBUG: |
|
79 | 79 | print stdout, stderr |
|
80 | 80 | return stdout, stderr |
|
81 | 81 | |
|
82 | ||
|
82 | 83 | def get_session(): |
|
83 | 84 | engine = engine_from_config(conf, 'sqlalchemy.db1.') |
|
84 | 85 | init_model(engine) |
|
85 | 86 | sa = meta.Session |
|
86 | 87 | return sa |
|
87 | 88 | |
|
88 | 89 | |
|
89 | 90 | def create_test_user(force=True): |
|
90 | 91 | print 'creating test user' |
|
91 | 92 | sa = get_session() |
|
92 | 93 | |
|
93 | 94 | user = sa.query(User).filter(User.username == USER).scalar() |
|
94 | 95 | |
|
95 | 96 | if force and user is not None: |
|
96 | 97 | print 'removing current user' |
|
97 | 98 | for repo in sa.query(Repository).filter(Repository.user == user).all(): |
|
98 | 99 | sa.delete(repo) |
|
99 | 100 | sa.delete(user) |
|
100 | 101 | sa.commit() |
|
101 | 102 | |
|
102 | 103 | if user is None or force: |
|
103 | 104 | print 'creating new one' |
|
104 | 105 | new_usr = User() |
|
105 | 106 | new_usr.username = USER |
|
106 | 107 | new_usr.password = get_crypt_password(PASS) |
|
107 | 108 | new_usr.email = 'mail@mail.com' |
|
108 | 109 | new_usr.name = 'test' |
|
109 | 110 | new_usr.lastname = 'lasttestname' |
|
110 | 111 | new_usr.active = True |
|
111 | 112 | new_usr.admin = True |
|
112 | 113 | sa.add(new_usr) |
|
113 | 114 | sa.commit() |
|
114 | 115 | |
|
115 | 116 | print 'done' |
|
116 | 117 | |
|
117 | 118 | |
|
118 | 119 | def create_test_repo(force=True): |
|
119 | 120 | print 'creating test repo' |
|
120 | 121 | from rhodecode.model.repo import RepoModel |
|
121 | 122 | sa = get_session() |
|
122 | 123 | |
|
123 | 124 | user = sa.query(User).filter(User.username == USER).scalar() |
|
124 | 125 | if user is None: |
|
125 | 126 | raise Exception('user not found') |
|
126 | 127 | |
|
127 | ||
|
128 | 128 | repo = sa.query(Repository).filter(Repository.repo_name == HG_REPO).scalar() |
|
129 | 129 | |
|
130 | 130 | if repo is None: |
|
131 | 131 | print 'repo not found creating' |
|
132 | 132 | |
|
133 | 133 | form_data = {'repo_name':HG_REPO, |
|
134 | 134 | 'repo_type':'hg', |
|
135 | 135 | 'private':False, |
|
136 | 136 | 'clone_uri':'' } |
|
137 | 137 | rm = RepoModel(sa) |
|
138 | 138 | rm.base_path = '/home/hg' |
|
139 | 139 | rm.create(form_data, user) |
|
140 | 140 | |
|
141 | 141 | print 'done' |
|
142 | 142 | |
|
143 | ||
|
143 | 144 | def set_anonymous_access(enable=True): |
|
144 | 145 | sa = get_session() |
|
145 | 146 | user = sa.query(User).filter(User.username == 'default').one() |
|
146 | 147 | user.active = enable |
|
147 | 148 | sa.add(user) |
|
148 | 149 | sa.commit() |
|
149 | 150 | |
|
151 | ||
|
150 | 152 | def get_anonymous_access(): |
|
151 | 153 | sa = get_session() |
|
152 | 154 | return sa.query(User).filter(User.username == 'default').one().active |
|
153 | 155 | |
|
154 | 156 | |
|
155 | 157 | #============================================================================== |
|
156 | 158 | # TESTS |
|
157 | 159 | #============================================================================== |
|
158 | 160 | def test_clone_with_credentials(no_errors=False, repo=HG_REPO, method=METHOD, |
|
159 | 161 | seq=None): |
|
160 | 162 | cwd = path = jn(TESTS_TMP_PATH, repo) |
|
161 | 163 | |
|
162 | 164 | if seq == None: |
|
163 | 165 | seq = _RandomNameSequence().next() |
|
164 | 166 | |
|
165 | 167 | try: |
|
166 | 168 | shutil.rmtree(path, ignore_errors=True) |
|
167 | 169 | os.makedirs(path) |
|
168 | 170 | #print 'made dirs %s' % jn(path) |
|
169 | 171 | except OSError: |
|
170 | 172 | raise |
|
171 | 173 | |
|
172 | 174 | clone_url = 'http://%(user)s:%(pass)s@%(host)s/%(cloned_repo)s' % \ |
|
173 | 175 | {'user':USER, |
|
174 | 176 | 'pass':PASS, |
|
175 | 177 | 'host':HOST, |
|
176 | 178 | 'cloned_repo':repo, } |
|
177 | 179 | |
|
178 | 180 | dest = path + seq |
|
179 | 181 | if method == 'pull': |
|
180 | 182 | stdout, stderr = Command(cwd).execute('hg', method, '--cwd', dest, clone_url) |
|
181 | 183 | else: |
|
182 | 184 | stdout, stderr = Command(cwd).execute('hg', method, clone_url, dest) |
|
183 | 185 | |
|
184 | 186 | if no_errors is False: |
|
185 | 187 | assert """adding file changes""" in stdout, 'no messages about cloning' |
|
186 | 188 | assert """abort""" not in stderr , 'got error from clone' |
|
187 | 189 | |
|
188 | 190 | if __name__ == '__main__': |
|
189 | 191 | try: |
|
190 | 192 | create_test_user(force=False) |
|
191 | 193 | seq = None |
|
192 | 194 | import time |
|
193 | 195 | |
|
194 | 196 | try: |
|
195 | 197 | METHOD = sys.argv[3] |
|
196 | 198 | except: |
|
197 | 199 | pass |
|
198 | 200 | |
|
199 | 201 | if METHOD == 'pull': |
|
200 | 202 | seq = _RandomNameSequence().next() |
|
201 | 203 | test_clone_with_credentials(repo=sys.argv[1], method='clone', |
|
202 | 204 | seq=seq) |
|
203 | 205 | s = time.time() |
|
204 | 206 | for i in range(1, int(sys.argv[2]) + 1): |
|
205 | 207 | print 'take', i |
|
206 | 208 | test_clone_with_credentials(repo=sys.argv[1], method=METHOD, |
|
207 | 209 | seq=seq) |
|
208 | 210 | print 'time taken %.3f' % (time.time() - s) |
|
209 | 211 | except Exception, e: |
|
210 | 212 | raise |
|
211 | 213 | sys.exit('stop on %s' % e) |
|
1 | NO CONTENT: file renamed from rhodecode/tests/rhodecode_crawler.py to rhodecode/tests/scripts/test_crawler.py |
@@ -1,401 +1,410 | |||
|
1 | 1 | # -*- coding: utf-8 -*- |
|
2 | 2 | """ |
|
3 | 3 | rhodecode.tests.test_hg_operations |
|
4 | 4 | ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ |
|
5 | 5 | |
|
6 | 6 | Test suite for making push/pull operations |
|
7 | 7 | |
|
8 | 8 | :created_on: Dec 30, 2010 |
|
9 | 9 | :author: marcink |
|
10 | 10 | :copyright: (C) 2010-2012 Marcin Kuzminski <marcin@python-works.com> |
|
11 | 11 | :license: GPLv3, see COPYING for more details. |
|
12 | 12 | """ |
|
13 | 13 | # This program is free software: you can redistribute it and/or modify |
|
14 | 14 | # it under the terms of the GNU General Public License as published by |
|
15 | 15 | # the Free Software Foundation, either version 3 of the License, or |
|
16 | 16 | # (at your option) any later version. |
|
17 | 17 | # |
|
18 | 18 | # This program is distributed in the hope that it will be useful, |
|
19 | 19 | # but WITHOUT ANY WARRANTY; without even the implied warranty of |
|
20 | 20 | # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
|
21 | 21 | # GNU General Public License for more details. |
|
22 | 22 | # |
|
23 | 23 | # You should have received a copy of the GNU General Public License |
|
24 | 24 | # along with this program. If not, see <http://www.gnu.org/licenses/>. |
|
25 | 25 | |
|
26 | 26 | import os |
|
27 | 27 | import time |
|
28 | 28 | import sys |
|
29 | 29 | import shutil |
|
30 | 30 | import logging |
|
31 | 31 | |
|
32 | 32 | from os.path import join as jn |
|
33 | 33 | from os.path import dirname as dn |
|
34 | 34 | |
|
35 | 35 | from tempfile import _RandomNameSequence |
|
36 | 36 | from subprocess import Popen, PIPE |
|
37 | 37 | |
|
38 | 38 | from paste.deploy import appconfig |
|
39 | 39 | from pylons import config |
|
40 | 40 | from sqlalchemy import engine_from_config |
|
41 | 41 | |
|
42 | 42 | from rhodecode.lib.utils import add_cache |
|
43 | 43 | from rhodecode.model import init_model |
|
44 | 44 | from rhodecode.model import meta |
|
45 | 45 | from rhodecode.model.db import User, Repository, UserLog |
|
46 | 46 | from rhodecode.lib.auth import get_crypt_password |
|
47 | 47 | |
|
48 | 48 | from rhodecode.tests import TESTS_TMP_PATH, NEW_HG_REPO, HG_REPO |
|
49 | 49 | from rhodecode.config.environment import load_environment |
|
50 | 50 | |
|
51 | 51 | rel_path = dn(dn(dn(os.path.abspath(__file__)))) |
|
52 | 52 | |
|
53 | 53 | conf = appconfig('config:%s' % sys.argv[1], relative_to=rel_path) |
|
54 | 54 | load_environment(conf.global_conf, conf.local_conf) |
|
55 | 55 | |
|
56 | 56 | add_cache(conf) |
|
57 | 57 | |
|
58 | 58 | USER = 'test_admin' |
|
59 | 59 | PASS = 'test12' |
|
60 | 60 | HOST = '127.0.0.1:5000' |
|
61 | 61 | DEBUG = False |
|
62 | 62 | print 'DEBUG:', DEBUG |
|
63 | 63 | log = logging.getLogger(__name__) |
|
64 | 64 | |
|
65 | 65 | engine = engine_from_config(conf, 'sqlalchemy.db1.') |
|
66 | 66 | init_model(engine) |
|
67 | sa = meta.Session | |
|
67 | sa = meta.Session() | |
|
68 | ||
|
68 | 69 | |
|
69 | 70 | class Command(object): |
|
70 | 71 | |
|
71 | 72 | def __init__(self, cwd): |
|
72 | 73 | self.cwd = cwd |
|
73 | 74 | |
|
74 | 75 | def execute(self, cmd, *args): |
|
75 | 76 | """Runs command on the system with given ``args``. |
|
76 | 77 | """ |
|
77 | 78 | |
|
78 | 79 | command = cmd + ' ' + ' '.join(args) |
|
79 | 80 | log.debug('Executing %s' % command) |
|
80 | 81 | if DEBUG: |
|
81 | 82 | print command |
|
82 | 83 | p = Popen(command, shell=True, stdout=PIPE, stderr=PIPE, cwd=self.cwd) |
|
83 | 84 | stdout, stderr = p.communicate() |
|
84 | 85 | if DEBUG: |
|
85 | 86 | print stdout, stderr |
|
86 | 87 | return stdout, stderr |
|
87 | 88 | |
|
88 | 89 | |
|
89 | 90 | def test_wrapp(func): |
|
90 | 91 | |
|
91 | 92 | def __wrapp(*args, **kwargs): |
|
92 | 93 | print '>>>%s' % func.__name__ |
|
93 | 94 | try: |
|
94 | 95 | res = func(*args, **kwargs) |
|
95 | 96 | except Exception, e: |
|
96 | 97 | print ('###############\n-' |
|
97 | 98 | '--%s failed %s--\n' |
|
98 | 99 | '###############\n' % (func.__name__, e)) |
|
99 | 100 | sys.exit() |
|
100 | 101 | print '++OK++' |
|
101 | 102 | return res |
|
102 | 103 | return __wrapp |
|
103 | 104 | |
|
104 | 105 | |
|
105 | 106 | def create_test_user(force=True): |
|
106 | 107 | print '\tcreating test user' |
|
107 | 108 | |
|
108 | 109 | user = User.get_by_username(USER) |
|
109 | 110 | |
|
110 | 111 | if force and user is not None: |
|
111 | 112 | print '\tremoving current user' |
|
112 | 113 | for repo in Repository.query().filter(Repository.user == user).all(): |
|
113 | 114 | sa.delete(repo) |
|
114 | 115 | sa.delete(user) |
|
115 | 116 | sa.commit() |
|
116 | 117 | |
|
117 | 118 | if user is None or force: |
|
118 | 119 | print '\tcreating new one' |
|
119 | 120 | new_usr = User() |
|
120 | 121 | new_usr.username = USER |
|
121 | 122 | new_usr.password = get_crypt_password(PASS) |
|
122 | 123 | new_usr.email = 'mail@mail.com' |
|
123 | 124 | new_usr.name = 'test' |
|
124 | 125 | new_usr.lastname = 'lasttestname' |
|
125 | 126 | new_usr.active = True |
|
126 | 127 | new_usr.admin = True |
|
127 | 128 | sa.add(new_usr) |
|
128 | 129 | sa.commit() |
|
129 | 130 | |
|
130 | 131 | print '\tdone' |
|
131 | 132 | |
|
132 | 133 | |
|
133 | 134 | def create_test_repo(force=True): |
|
134 | 135 | from rhodecode.model.repo import RepoModel |
|
135 | 136 | |
|
136 | 137 | user = User.get_by_username(USER) |
|
137 | 138 | if user is None: |
|
138 | 139 | raise Exception('user not found') |
|
139 | 140 | |
|
140 | ||
|
141 | 141 | repo = sa.query(Repository).filter(Repository.repo_name == HG_REPO).scalar() |
|
142 | 142 | |
|
143 | 143 | if repo is None: |
|
144 | 144 | print '\trepo not found creating' |
|
145 | 145 | |
|
146 | 146 | form_data = {'repo_name':HG_REPO, |
|
147 | 147 | 'repo_type':'hg', |
|
148 | 148 | 'private':False, |
|
149 | 149 | 'clone_uri':'' } |
|
150 | 150 | rm = RepoModel(sa) |
|
151 | 151 | rm.base_path = '/home/hg' |
|
152 | 152 | rm.create(form_data, user) |
|
153 | 153 | |
|
154 | 154 | |
|
155 | 155 | def set_anonymous_access(enable=True): |
|
156 | 156 | user = User.get_by_username('default') |
|
157 | 157 | user.active = enable |
|
158 | 158 | sa.add(user) |
|
159 | 159 | sa.commit() |
|
160 | 160 | print '\tanonymous access is now:', enable |
|
161 | 161 | if enable != User.get_by_username('default').active: |
|
162 | 162 | raise Exception('Cannot set anonymous access') |
|
163 | 163 | |
|
164 | ||
|
164 | 165 | def get_anonymous_access(): |
|
165 | 166 | user = User.get_by_username('default') |
|
166 | 167 | return user.active |
|
167 | 168 | |
|
168 | 169 | |
|
169 | 170 | #============================================================================== |
|
170 | 171 | # TESTS |
|
171 | 172 | #============================================================================== |
|
172 | 173 | @test_wrapp |
|
173 | 174 | def test_clone_with_credentials(no_errors=False): |
|
174 | 175 | cwd = path = jn(TESTS_TMP_PATH, HG_REPO) |
|
175 | 176 | |
|
176 | 177 | try: |
|
177 | 178 | shutil.rmtree(path, ignore_errors=True) |
|
178 | 179 | os.makedirs(path) |
|
179 | 180 | #print 'made dirs %s' % jn(path) |
|
180 | 181 | except OSError: |
|
181 | 182 | raise |
|
182 | 183 | |
|
183 | 184 | print '\tchecking if anonymous access is enabled' |
|
184 | 185 | anonymous_access = get_anonymous_access() |
|
185 | 186 | if anonymous_access: |
|
186 | 187 | print '\tenabled, disabling it ' |
|
187 | 188 | set_anonymous_access(enable=False) |
|
188 | 189 | |
|
189 | 190 | clone_url = 'http://%(user)s:%(pass)s@%(host)s/%(cloned_repo)s %(dest)s' % \ |
|
190 | 191 | {'user':USER, |
|
191 | 192 | 'pass':PASS, |
|
192 | 193 | 'host':HOST, |
|
193 | 194 | 'cloned_repo':HG_REPO, |
|
194 | 195 | 'dest':path} |
|
195 | 196 | |
|
196 | 197 | stdout, stderr = Command(cwd).execute('hg clone', clone_url) |
|
197 | 198 | |
|
198 | 199 | if no_errors is False: |
|
199 | 200 | assert """adding file changes""" in stdout, 'no messages about cloning' |
|
200 | 201 | assert """abort""" not in stderr , 'got error from clone' |
|
201 | 202 | |
|
202 | 203 | |
|
203 | 204 | @test_wrapp |
|
204 | 205 | def test_clone_anonymous(): |
|
205 | 206 | cwd = path = jn(TESTS_TMP_PATH, HG_REPO) |
|
206 | 207 | |
|
207 | 208 | try: |
|
208 | 209 | shutil.rmtree(path, ignore_errors=True) |
|
209 | 210 | os.makedirs(path) |
|
210 | 211 | #print 'made dirs %s' % jn(path) |
|
211 | 212 | except OSError: |
|
212 | 213 | raise |
|
213 | 214 | |
|
214 | 215 | |
|
215 | 216 | print '\tchecking if anonymous access is enabled' |
|
216 | 217 | anonymous_access = get_anonymous_access() |
|
217 | 218 | if not anonymous_access: |
|
218 | 219 | print '\tnot enabled, enabling it ' |
|
219 | 220 | set_anonymous_access(enable=True) |
|
220 | 221 | |
|
221 | 222 | clone_url = 'http://%(host)s/%(cloned_repo)s %(dest)s' % \ |
|
222 | 223 | {'user':USER, |
|
223 | 224 | 'pass':PASS, |
|
224 | 225 | 'host':HOST, |
|
225 | 226 | 'cloned_repo':HG_REPO, |
|
226 | 227 | 'dest':path} |
|
227 | 228 | |
|
228 | 229 | stdout, stderr = Command(cwd).execute('hg clone', clone_url) |
|
229 | 230 | |
|
230 | 231 | assert """adding file changes""" in stdout, 'no messages about cloning' |
|
231 | 232 | assert """abort""" not in stderr , 'got error from clone' |
|
232 | 233 | |
|
233 | 234 | #disable if it was enabled |
|
234 | 235 | if not anonymous_access: |
|
235 | 236 | print '\tdisabling anonymous access' |
|
236 | 237 | set_anonymous_access(enable=False) |
|
237 | 238 | |
|
239 | ||
|
238 | 240 | @test_wrapp |
|
239 | 241 | def test_clone_wrong_credentials(): |
|
240 | 242 | cwd = path = jn(TESTS_TMP_PATH, HG_REPO) |
|
241 | 243 | |
|
242 | 244 | try: |
|
243 | 245 | shutil.rmtree(path, ignore_errors=True) |
|
244 | 246 | os.makedirs(path) |
|
245 | 247 | #print 'made dirs %s' % jn(path) |
|
246 | 248 | except OSError: |
|
247 | 249 | raise |
|
248 | 250 | |
|
249 | 251 | print '\tchecking if anonymous access is enabled' |
|
250 | 252 | anonymous_access = get_anonymous_access() |
|
251 | 253 | if anonymous_access: |
|
252 | 254 | print '\tenabled, disabling it ' |
|
253 | 255 | set_anonymous_access(enable=False) |
|
254 | 256 | |
|
255 | 257 | clone_url = 'http://%(user)s:%(pass)s@%(host)s/%(cloned_repo)s %(dest)s' % \ |
|
256 | 258 | {'user':USER + 'error', |
|
257 | 259 | 'pass':PASS, |
|
258 | 260 | 'host':HOST, |
|
259 | 261 | 'cloned_repo':HG_REPO, |
|
260 | 262 | 'dest':path} |
|
261 | 263 | |
|
262 | 264 | stdout, stderr = Command(cwd).execute('hg clone', clone_url) |
|
263 | 265 | |
|
264 | 266 | if not """abort: authorization failed""" in stderr: |
|
265 | 267 | raise Exception('Failure') |
|
266 | 268 | |
|
269 | ||
|
267 | 270 | @test_wrapp |
|
268 | 271 | def test_pull(): |
|
269 | 272 | pass |
|
270 | 273 | |
|
274 | ||
|
271 | 275 | @test_wrapp |
|
272 | 276 | def test_push_modify_file(f_name='setup.py'): |
|
273 | 277 | cwd = path = jn(TESTS_TMP_PATH, HG_REPO) |
|
274 | 278 | modified_file = jn(TESTS_TMP_PATH, HG_REPO, f_name) |
|
275 | 279 | for i in xrange(5): |
|
276 | 280 | cmd = """echo 'added_line%s' >> %s""" % (i, modified_file) |
|
277 | 281 | Command(cwd).execute(cmd) |
|
278 | 282 | |
|
279 | 283 | cmd = """hg ci -m 'changed file %s' %s """ % (i, modified_file) |
|
280 | 284 | Command(cwd).execute(cmd) |
|
281 | 285 | |
|
282 | 286 | Command(cwd).execute('hg push %s' % jn(TESTS_TMP_PATH, HG_REPO)) |
|
283 | 287 | |
|
288 | ||
|
284 | 289 | @test_wrapp |
|
285 | 290 | def test_push_new_file(commits=15, with_clone=True): |
|
286 | 291 | |
|
287 | 292 | if with_clone: |
|
288 | 293 | test_clone_with_credentials(no_errors=True) |
|
289 | 294 | |
|
290 | 295 | cwd = path = jn(TESTS_TMP_PATH, HG_REPO) |
|
291 | 296 | added_file = jn(path, '%ssetupążźć.py' % _RandomNameSequence().next()) |
|
292 | 297 | |
|
293 | 298 | Command(cwd).execute('touch %s' % added_file) |
|
294 | 299 | |
|
295 | 300 | Command(cwd).execute('hg add %s' % added_file) |
|
296 | 301 | |
|
297 | 302 | for i in xrange(commits): |
|
298 | 303 | cmd = """echo 'added_line%s' >> %s""" % (i, added_file) |
|
299 | 304 | Command(cwd).execute(cmd) |
|
300 | 305 | |
|
301 | 306 | cmd = """hg ci -m 'commited new %s' -u '%s' %s """ % (i, |
|
302 | 307 | 'Marcin Kuźminski <marcin@python-blog.com>', |
|
303 | 308 | added_file) |
|
304 | 309 | Command(cwd).execute(cmd) |
|
305 | 310 | |
|
306 | 311 | push_url = 'http://%(user)s:%(pass)s@%(host)s/%(cloned_repo)s' % \ |
|
307 | 312 | {'user':USER, |
|
308 | 313 | 'pass':PASS, |
|
309 | 314 | 'host':HOST, |
|
310 | 315 | 'cloned_repo':HG_REPO, |
|
311 | 316 | 'dest':jn(TESTS_TMP_PATH, HG_REPO)} |
|
312 | 317 | |
|
313 | 318 | Command(cwd).execute('hg push --verbose --debug %s' % push_url) |
|
314 | 319 | |
|
320 | ||
|
315 | 321 | @test_wrapp |
|
316 | 322 | def test_push_wrong_credentials(): |
|
317 | 323 | cwd = path = jn(TESTS_TMP_PATH, HG_REPO) |
|
318 | 324 | clone_url = 'http://%(user)s:%(pass)s@%(host)s/%(cloned_repo)s' % \ |
|
319 | 325 | {'user':USER + 'xxx', |
|
320 | 326 | 'pass':PASS, |
|
321 | 327 | 'host':HOST, |
|
322 | 328 | 'cloned_repo':HG_REPO, |
|
323 | 329 | 'dest':jn(TESTS_TMP_PATH, HG_REPO)} |
|
324 | 330 | |
|
325 | 331 | modified_file = jn(TESTS_TMP_PATH, HG_REPO, 'setup.py') |
|
326 | 332 | for i in xrange(5): |
|
327 | 333 | cmd = """echo 'added_line%s' >> %s""" % (i, modified_file) |
|
328 | 334 | Command(cwd).execute(cmd) |
|
329 | 335 | |
|
330 | 336 | cmd = """hg ci -m 'commited %s' %s """ % (i, modified_file) |
|
331 | 337 | Command(cwd).execute(cmd) |
|
332 | 338 | |
|
333 | 339 | Command(cwd).execute('hg push %s' % clone_url) |
|
334 | 340 | |
|
341 | ||
|
335 | 342 | @test_wrapp |
|
336 | 343 | def test_push_wrong_path(): |
|
337 | 344 | cwd = path = jn(TESTS_TMP_PATH, HG_REPO) |
|
338 | 345 | added_file = jn(path, 'somefile.py') |
|
339 | 346 | |
|
340 | 347 | try: |
|
341 | 348 | shutil.rmtree(path, ignore_errors=True) |
|
342 | 349 | os.makedirs(path) |
|
343 | 350 | print '\tmade dirs %s' % jn(path) |
|
344 | 351 | except OSError: |
|
345 | 352 | raise |
|
346 | 353 | |
|
347 | 354 | Command(cwd).execute("""echo '' > %s""" % added_file) |
|
348 | 355 | Command(cwd).execute("""hg init %s""" % path) |
|
349 | 356 | Command(cwd).execute("""hg add %s""" % added_file) |
|
350 | 357 | |
|
351 | 358 | for i in xrange(2): |
|
352 | 359 | cmd = """echo 'added_line%s' >> %s""" % (i, added_file) |
|
353 | 360 | Command(cwd).execute(cmd) |
|
354 | 361 | |
|
355 | 362 | cmd = """hg ci -m 'commited new %s' %s """ % (i, added_file) |
|
356 | 363 | Command(cwd).execute(cmd) |
|
357 | 364 | |
|
358 | 365 | clone_url = 'http://%(user)s:%(pass)s@%(host)s/%(cloned_repo)s' % \ |
|
359 | 366 | {'user':USER, |
|
360 | 367 | 'pass':PASS, |
|
361 | 368 | 'host':HOST, |
|
362 | 369 | 'cloned_repo':HG_REPO + '_error', |
|
363 | 370 | 'dest':jn(TESTS_TMP_PATH, HG_REPO)} |
|
364 | 371 | |
|
365 | 372 | stdout, stderr = Command(cwd).execute('hg push %s' % clone_url) |
|
366 | 373 | if not """abort: HTTP Error 403: Forbidden""" in stderr: |
|
367 | 374 | raise Exception('Failure') |
|
368 | 375 | |
|
376 | ||
|
369 | 377 | @test_wrapp |
|
370 | 378 | def get_logs(): |
|
371 | 379 | return UserLog.query().all() |
|
372 | 380 | |
|
381 | ||
|
373 | 382 | @test_wrapp |
|
374 | 383 | def test_logs(initial): |
|
375 | 384 | logs = UserLog.query().all() |
|
376 | 385 | operations = 4 |
|
377 | 386 | if len(initial) + operations != len(logs): |
|
378 | 387 | raise Exception("missing number of logs initial:%s vs current:%s" % \ |
|
379 | 388 | (len(initial), len(logs))) |
|
380 | 389 | |
|
381 | 390 | |
|
382 | 391 | if __name__ == '__main__': |
|
383 | 392 | create_test_user(force=False) |
|
384 | 393 | create_test_repo() |
|
385 | 394 | |
|
386 | 395 | initial_logs = get_logs() |
|
387 | 396 | print 'initial activity logs: %s' % len(initial_logs) |
|
388 | 397 | s = time.time() |
|
389 | 398 | #test_push_modify_file() |
|
390 | 399 | test_clone_with_credentials() |
|
391 | 400 | test_clone_wrong_credentials() |
|
392 | 401 | |
|
393 | 402 | test_push_new_file(commits=2, with_clone=True) |
|
394 | 403 | |
|
395 | 404 | test_clone_anonymous() |
|
396 | 405 | test_push_wrong_path() |
|
397 | 406 | |
|
398 | 407 | test_push_wrong_credentials() |
|
399 | 408 | |
|
400 | 409 | test_logs(initial_logs) |
|
401 | 410 | print 'finished ok in %.3f' % (time.time() - s) |
@@ -1,56 +1,56 | |||
|
1 | 1 | """ |
|
2 | 2 | Unit tests for vcs_ library. |
|
3 | 3 | |
|
4 | 4 | In order to run tests we need to prepare our environment first. Tests would be |
|
5 | 5 | run for each engine listed at ``conf.SCM_TESTS`` - keys are aliases from |
|
6 | 6 | ``vcs.backends.BACKENDS``. |
|
7 | 7 | |
|
8 | 8 | For each SCM we run tests for, we need some repository. We would use |
|
9 | 9 | repositories location from system environment variables or test suite defaults |
|
10 | 10 | - see ``conf`` module for more detail. We simply try to check if repository at |
|
11 | 11 | certain location exists, if not we would try to fetch them. At ``test_vcs`` or |
|
12 | 12 | ``test_common`` we run unit tests common for each repository type and for |
|
13 | 13 | example specific mercurial tests are located at ``test_hg`` module. |
|
14 | 14 | |
|
15 | 15 | Oh, and tests are run with ``unittest.collector`` wrapped by ``collector`` |
|
16 | 16 | function at ``tests/__init__.py``. |
|
17 | 17 | |
|
18 | 18 | .. _vcs: http://bitbucket.org/marcinkuzminski/vcs |
|
19 | 19 | .. _unittest: http://pypi.python.org/pypi/unittest |
|
20 | 20 | |
|
21 | 21 | """ |
|
22 | 22 | import os |
|
23 | 23 | from rhodecode.lib import vcs |
|
24 | 24 | from rhodecode.lib.vcs.utils.compat import unittest |
|
25 | from utils import VCSTestError, SCMFetcher | |
|
25 | 26 | from rhodecode.tests import * |
|
26 | from utils import VCSTestError, SCMFetcher | |
|
27 | 27 | |
|
28 | 28 | |
|
29 | 29 | def setup_package(): |
|
30 | 30 | """ |
|
31 | 31 | Prepares whole package for tests which mainly means it would try to fetch |
|
32 | 32 | test repositories or use already existing ones. |
|
33 | 33 | """ |
|
34 | 34 | fetchers = { |
|
35 | 35 | 'hg': { |
|
36 | 36 | 'alias': 'hg', |
|
37 | 37 | 'test_repo_path': TEST_HG_REPO, |
|
38 | 38 | 'remote_repo': HG_REMOTE_REPO, |
|
39 | 39 | 'clone_cmd': 'hg clone', |
|
40 | 40 | }, |
|
41 | 41 | 'git': { |
|
42 | 42 | 'alias': 'git', |
|
43 | 43 | 'test_repo_path': TEST_GIT_REPO, |
|
44 | 44 | 'remote_repo': GIT_REMOTE_REPO, |
|
45 | 45 | 'clone_cmd': 'git clone --bare', |
|
46 | 46 | }, |
|
47 | 47 | } |
|
48 | 48 | try: |
|
49 | 49 | for scm, fetcher_info in fetchers.items(): |
|
50 | 50 | fetcher = SCMFetcher(**fetcher_info) |
|
51 | 51 | fetcher.setup() |
|
52 | 52 | except VCSTestError, err: |
|
53 | 53 | raise RuntimeError(str(err)) |
|
54 | 54 | |
|
55 | 55 | #start_dir = os.path.abspath(os.path.dirname(__file__)) |
|
56 | 56 | #unittest.defaultTestLoader.discover(start_dir) |
General Comments 0
You need to be logged in to leave comments.
Login now