tests: moved tests of admin user auth tokens into pyramid apps.
marcink
r1519:897366ac default
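The change replaces pylons-style tests, which built URLs with url() and tunneled PUT/DELETE through a '_method' form field, with pyramid-app tests that post to explicit routes built by a local route_path() helper. A condensed before/after sketch, distilled from the diff below (illustrative only, not part of the changeset):

    # old pylons controller test: one route, HTTP verb tunneled via '_method'
    response = self.app.post(
        url('edit_user_auth_tokens', user_id=user_id),
        {'_method': 'put', 'description': desc, 'lifetime': lifetime,
         'csrf_token': self.csrf_token})

    # new pyramid app test: dedicated route per action, no '_method' field
    response = self.app.post(
        route_path('edit_user_auth_tokens_add', user_id=user_id),
        {'description': desc, 'lifetime': lifetime,
         'csrf_token': self.csrf_token})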
new file 100644
@@ -0,0 +1,114 b''
1 # -*- coding: utf-8 -*-
2
3 # Copyright (C) 2010-2017 RhodeCode GmbH
4 #
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU Affero General Public License, version 3
7 # (only), as published by the Free Software Foundation.
8 #
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
13 #
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 #
17 # This program is dual-licensed. If you wish to learn more about the
18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20
21 import pytest
22
23 from rhodecode.model.db import User, UserApiKeys
24
25 from rhodecode.apps._base import ADMIN_PREFIX
26 from rhodecode.tests import (
27 TestController, TEST_USER_REGULAR_LOGIN, assert_session_flash)
28 from rhodecode.tests.fixture import Fixture
29 from rhodecode.tests.utils import AssertResponse
30
31 fixture = Fixture()
32
33
34
35 def route_path(name, **kwargs):
36 return {
37 'users':
38 ADMIN_PREFIX + '/users',
39 'users_data':
40 ADMIN_PREFIX + '/users_data',
41 'edit_user_auth_tokens':
42 ADMIN_PREFIX + '/users/{user_id}/edit/auth_tokens',
43 'edit_user_auth_tokens_add':
44 ADMIN_PREFIX + '/users/{user_id}/edit/auth_tokens/new',
45 'edit_user_auth_tokens_delete':
46 ADMIN_PREFIX + '/users/{user_id}/edit/auth_tokens/delete',
47 }[name].format(**kwargs)
48
49
50 class TestAdminUsersView(TestController):
51
52 def test_auth_tokens_default_user(self):
53 self.log_user()
54 user = User.get_default_user()
55 response = self.app.get(
56 route_path('edit_user_auth_tokens', user_id=user.user_id),
57 status=302)
58
59 def test_auth_tokens(self):
60 self.log_user()
61
62 user = User.get_by_username(TEST_USER_REGULAR_LOGIN)
63 response = self.app.get(
64 route_path('edit_user_auth_tokens', user_id=user.user_id))
65 for token in user.auth_tokens:
66 response.mustcontain(token)
67 response.mustcontain('never')
68
69 @pytest.mark.parametrize("desc, lifetime", [
70 ('forever', -1),
71 ('5mins', 60*5),
72 ('30days', 60*60*24*30),
73 ])
74 def test_add_auth_token(self, desc, lifetime, user_util):
75 self.log_user()
76 user = user_util.create_user()
77 user_id = user.user_id
78
79 response = self.app.post(
80 route_path('edit_user_auth_tokens_add', user_id=user_id),
81 {'description': desc, 'lifetime': lifetime,
82 'csrf_token': self.csrf_token})
83 assert_session_flash(response, 'Auth token successfully created')
84
85 response = response.follow()
86 user = User.get(user_id)
87 for auth_token in user.auth_tokens:
88 response.mustcontain(auth_token)
89
90 def test_delete_auth_token(self, user_util):
91 self.log_user()
92 user = user_util.create_user()
93 user_id = user.user_id
94 keys = user.extra_auth_tokens
95 assert 2 == len(keys)
96
97 response = self.app.post(
98 route_path('edit_user_auth_tokens_add', user_id=user_id),
99 {'description': 'desc', 'lifetime': -1,
100 'csrf_token': self.csrf_token})
101 assert_session_flash(response, 'Auth token successfully created')
102 response.follow()
103
104 # now delete our key
105 keys = UserApiKeys.query().filter(UserApiKeys.user_id == user_id).all()
106 assert 3 == len(keys)
107
108 response = self.app.post(
109 route_path('edit_user_auth_tokens_delete', user_id=user_id),
110 {'del_auth_token': keys[0].api_key, 'csrf_token': self.csrf_token})
111
112 assert_session_flash(response, 'Auth token successfully deleted')
113 keys = UserApiKeys.query().filter(UserApiKeys.user_id == user_id).all()
114 assert 2 == len(keys)
@@ -1,620 +1,561 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2010-2017 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 import pytest
22 22 from sqlalchemy.orm.exc import NoResultFound
23 23
24 24 from rhodecode.lib.auth import check_password
25 25 from rhodecode.lib import helpers as h
26 26 from rhodecode.model import validators
27 27 from rhodecode.model.db import User, UserIpMap, UserApiKeys
28 28 from rhodecode.model.meta import Session
29 29 from rhodecode.model.user import UserModel
30 30 from rhodecode.tests import (
31 31 TestController, url, link_to, TEST_USER_ADMIN_LOGIN,
32 32 TEST_USER_REGULAR_LOGIN, assert_session_flash)
33 33 from rhodecode.tests.fixture import Fixture
34 34 from rhodecode.tests.utils import AssertResponse
35 35
36 36 fixture = Fixture()
37 37
38 38
39 39 class TestAdminUsersController(TestController):
40 40 test_user_1 = 'testme'
41 41 destroy_users = set()
42 42
43 43 @classmethod
44 44 def teardown_method(cls, method):
45 45 fixture.destroy_users(cls.destroy_users)
46 46
47 def test_index(self):
48 self.log_user()
49 self.app.get(url('users'))
50
51 47 def test_create(self):
52 48 self.log_user()
53 49 username = 'newtestuser'
54 50 password = 'test12'
55 51 password_confirmation = password
56 52 name = 'name'
57 53 lastname = 'lastname'
58 54 email = 'mail@mail.com'
59 55
60 56 response = self.app.get(url('new_user'))
61 57
62 58 response = self.app.post(url('users'), params={
63 59 'username': username,
64 60 'password': password,
65 61 'password_confirmation': password_confirmation,
66 62 'firstname': name,
67 63 'active': True,
68 64 'lastname': lastname,
69 65 'extern_name': 'rhodecode',
70 66 'extern_type': 'rhodecode',
71 67 'email': email,
72 68 'csrf_token': self.csrf_token,
73 69 })
74 70 user_link = link_to(
75 71 username,
76 72 url('edit_user', user_id=User.get_by_username(username).user_id))
77 73 assert_session_flash(response, 'Created user %s' % (user_link,))
78 74 self.destroy_users.add(username)
79 75
80 76 new_user = User.query().filter(User.username == username).one()
81 77
82 78 assert new_user.username == username
83 79 assert check_password(password, new_user.password)
84 80 assert new_user.name == name
85 81 assert new_user.lastname == lastname
86 82 assert new_user.email == email
87 83
88 84 response.follow()
89 85 response = response.follow()
90 86 response.mustcontain(username)
91 87
92 88 def test_create_err(self):
93 89 self.log_user()
94 90 username = 'new_user'
95 91 password = ''
96 92 name = 'name'
97 93 lastname = 'lastname'
98 94 email = 'errmail.com'
99 95
100 96 response = self.app.get(url('new_user'))
101 97
102 98 response = self.app.post(url('users'), params={
103 99 'username': username,
104 100 'password': password,
105 101 'name': name,
106 102 'active': False,
107 103 'lastname': lastname,
108 104 'email': email,
109 105 'csrf_token': self.csrf_token,
110 106 })
111 107
112 108 msg = validators.ValidUsername(
113 109 False, {})._messages['system_invalid_username']
114 110 msg = h.html_escape(msg % {'username': 'new_user'})
115 111 response.mustcontain('<span class="error-message">%s</span>' % msg)
116 112 response.mustcontain(
117 113 '<span class="error-message">Please enter a value</span>')
118 114 response.mustcontain(
119 115 '<span class="error-message">An email address must contain a'
120 116 ' single @</span>')
121 117
122 118 def get_user():
123 119 Session().query(User).filter(User.username == username).one()
124 120
125 121 with pytest.raises(NoResultFound):
126 122 get_user()
127 123
128 124 def test_new(self):
129 125 self.log_user()
130 126 self.app.get(url('new_user'))
131 127
132 128 @pytest.mark.parametrize("name, attrs", [
133 129 ('firstname', {'firstname': 'new_username'}),
134 130 ('lastname', {'lastname': 'new_username'}),
135 131 ('admin', {'admin': True}),
136 132 ('admin', {'admin': False}),
137 133 ('extern_type', {'extern_type': 'ldap'}),
138 134 ('extern_type', {'extern_type': None}),
139 135 ('extern_name', {'extern_name': 'test'}),
140 136 ('extern_name', {'extern_name': None}),
141 137 ('active', {'active': False}),
142 138 ('active', {'active': True}),
143 139 ('email', {'email': 'some@email.com'}),
144 140 ('language', {'language': 'de'}),
145 141 ('language', {'language': 'en'}),
146 142 # ('new_password', {'new_password': 'foobar123',
147 143 # 'password_confirmation': 'foobar123'})
148 144 ])
149 145 def test_update(self, name, attrs):
150 146 self.log_user()
151 147 usr = fixture.create_user(self.test_user_1, password='qweqwe',
152 148 email='testme@rhodecode.org',
153 149 extern_type='rhodecode',
154 150 extern_name=self.test_user_1,
155 151 skip_if_exists=True)
156 152 Session().commit()
157 153 self.destroy_users.add(self.test_user_1)
158 154 params = usr.get_api_data()
159 155 cur_lang = params['language'] or 'en'
160 156 params.update({
161 157 'password_confirmation': '',
162 158 'new_password': '',
163 159 'language': cur_lang,
164 160 '_method': 'put',
165 161 'csrf_token': self.csrf_token,
166 162 })
167 163 params.update({'new_password': ''})
168 164 params.update(attrs)
169 165 if name == 'email':
170 166 params['emails'] = [attrs['email']]
171 167 elif name == 'extern_type':
172 168 # cannot update this via form, expected value is original one
173 169 params['extern_type'] = "rhodecode"
174 170 elif name == 'extern_name':
175 171 # cannot update this via form, expected value is original one
176 172 params['extern_name'] = self.test_user_1
177 173 # special case: since this user is not
178 174 # logged in yet, his data is not filled,
179 175 # so we use creation data
180 176
181 177 response = self.app.post(url('user', user_id=usr.user_id), params)
182 178 assert response.status_int == 302
183 179 assert_session_flash(response, 'User updated successfully')
184 180
185 181 updated_user = User.get_by_username(self.test_user_1)
186 182 updated_params = updated_user.get_api_data()
187 183 updated_params.update({'password_confirmation': ''})
188 184 updated_params.update({'new_password': ''})
189 185
190 186 del params['_method']
191 187 del params['csrf_token']
192 188 assert params == updated_params
193 189
194 190 def test_update_and_migrate_password(
195 191 self, autologin_user, real_crypto_backend):
196 192 from rhodecode.lib import auth
197 193
198 194 # create new user, with sha256 password
199 195 temp_user = 'test_admin_sha256'
200 196 user = fixture.create_user(temp_user)
201 197 user.password = auth._RhodeCodeCryptoSha256().hash_create(
202 198 b'test123')
203 199 Session().add(user)
204 200 Session().commit()
205 201 self.destroy_users.add('test_admin_sha256')
206 202
207 203 params = user.get_api_data()
208 204
209 205 params.update({
210 206 'password_confirmation': 'qweqwe123',
211 207 'new_password': 'qweqwe123',
212 208 'language': 'en',
213 209 '_method': 'put',
214 210 'csrf_token': autologin_user.csrf_token,
215 211 })
216 212
217 213 response = self.app.post(url('user', user_id=user.user_id), params)
218 214 assert response.status_int == 302
219 215 assert_session_flash(response, 'User updated successfully')
220 216
221 217 # new password should be bcrypted, after log-in and transfer
222 218 user = User.get_by_username(temp_user)
223 219 assert user.password.startswith('$')
224 220
225 221 updated_user = User.get_by_username(temp_user)
226 222 updated_params = updated_user.get_api_data()
227 223 updated_params.update({'password_confirmation': 'qweqwe123'})
228 224 updated_params.update({'new_password': 'qweqwe123'})
229 225
230 226 del params['_method']
231 227 del params['csrf_token']
232 228 assert params == updated_params
233 229
234 230 def test_delete(self):
235 231 self.log_user()
236 232 username = 'newtestuserdeleteme'
237 233
238 234 fixture.create_user(name=username)
239 235
240 236 new_user = Session().query(User)\
241 237 .filter(User.username == username).one()
242 238 response = self.app.post(url('user', user_id=new_user.user_id),
243 239 params={'_method': 'delete',
244 240 'csrf_token': self.csrf_token})
245 241
246 242 assert_session_flash(response, 'Successfully deleted user')
247 243
248 244 def test_delete_owner_of_repository(self):
249 245 self.log_user()
250 246 username = 'newtestuserdeleteme_repo_owner'
251 247 obj_name = 'test_repo'
252 248 usr = fixture.create_user(name=username)
253 249 self.destroy_users.add(username)
254 250 fixture.create_repo(obj_name, cur_user=usr.username)
255 251
256 252 new_user = Session().query(User)\
257 253 .filter(User.username == username).one()
258 254 response = self.app.post(url('user', user_id=new_user.user_id),
259 255 params={'_method': 'delete',
260 256 'csrf_token': self.csrf_token})
261 257
262 258 msg = 'user "%s" still owns 1 repositories and cannot be removed. ' \
263 259 'Switch owners or remove those repositories:%s' % (username,
264 260 obj_name)
265 261 assert_session_flash(response, msg)
266 262 fixture.destroy_repo(obj_name)
267 263
268 264 def test_delete_owner_of_repository_detaching(self):
269 265 self.log_user()
270 266 username = 'newtestuserdeleteme_repo_owner_detach'
271 267 obj_name = 'test_repo'
272 268 usr = fixture.create_user(name=username)
273 269 self.destroy_users.add(username)
274 270 fixture.create_repo(obj_name, cur_user=usr.username)
275 271
276 272 new_user = Session().query(User)\
277 273 .filter(User.username == username).one()
278 274 response = self.app.post(url('user', user_id=new_user.user_id),
279 275 params={'_method': 'delete',
280 276 'user_repos': 'detach',
281 277 'csrf_token': self.csrf_token})
282 278
283 279 msg = 'Detached 1 repositories'
284 280 assert_session_flash(response, msg)
285 281 fixture.destroy_repo(obj_name)
286 282
287 283 def test_delete_owner_of_repository_deleting(self):
288 284 self.log_user()
289 285 username = 'newtestuserdeleteme_repo_owner_delete'
290 286 obj_name = 'test_repo'
291 287 usr = fixture.create_user(name=username)
292 288 self.destroy_users.add(username)
293 289 fixture.create_repo(obj_name, cur_user=usr.username)
294 290
295 291 new_user = Session().query(User)\
296 292 .filter(User.username == username).one()
297 293 response = self.app.post(url('user', user_id=new_user.user_id),
298 294 params={'_method': 'delete',
299 295 'user_repos': 'delete',
300 296 'csrf_token': self.csrf_token})
301 297
302 298 msg = 'Deleted 1 repositories'
303 299 assert_session_flash(response, msg)
304 300
305 301 def test_delete_owner_of_repository_group(self):
306 302 self.log_user()
307 303 username = 'newtestuserdeleteme_repo_group_owner'
308 304 obj_name = 'test_group'
309 305 usr = fixture.create_user(name=username)
310 306 self.destroy_users.add(username)
311 307 fixture.create_repo_group(obj_name, cur_user=usr.username)
312 308
313 309 new_user = Session().query(User)\
314 310 .filter(User.username == username).one()
315 311 response = self.app.post(url('user', user_id=new_user.user_id),
316 312 params={'_method': 'delete',
317 313 'csrf_token': self.csrf_token})
318 314
319 315 msg = 'user "%s" still owns 1 repository groups and cannot be removed. ' \
320 316 'Switch owners or remove those repository groups:%s' % (username,
321 317 obj_name)
322 318 assert_session_flash(response, msg)
323 319 fixture.destroy_repo_group(obj_name)
324 320
325 321 def test_delete_owner_of_repository_group_detaching(self):
326 322 self.log_user()
327 323 username = 'newtestuserdeleteme_repo_group_owner_detach'
328 324 obj_name = 'test_group'
329 325 usr = fixture.create_user(name=username)
330 326 self.destroy_users.add(username)
331 327 fixture.create_repo_group(obj_name, cur_user=usr.username)
332 328
333 329 new_user = Session().query(User)\
334 330 .filter(User.username == username).one()
335 331 response = self.app.post(url('user', user_id=new_user.user_id),
336 332 params={'_method': 'delete',
337 333 'user_repo_groups': 'delete',
338 334 'csrf_token': self.csrf_token})
339 335
340 336 msg = 'Deleted 1 repository groups'
341 337 assert_session_flash(response, msg)
342 338
343 339 def test_delete_owner_of_repository_group_deleting(self):
344 340 self.log_user()
345 341 username = 'newtestuserdeleteme_repo_group_owner_delete'
346 342 obj_name = 'test_group'
347 343 usr = fixture.create_user(name=username)
348 344 self.destroy_users.add(username)
349 345 fixture.create_repo_group(obj_name, cur_user=usr.username)
350 346
351 347 new_user = Session().query(User)\
352 348 .filter(User.username == username).one()
353 349 response = self.app.post(url('user', user_id=new_user.user_id),
354 350 params={'_method': 'delete',
355 351 'user_repo_groups': 'detach',
356 352 'csrf_token': self.csrf_token})
357 353
358 354 msg = 'Detached 1 repository groups'
359 355 assert_session_flash(response, msg)
360 356 fixture.destroy_repo_group(obj_name)
361 357
362 358 def test_delete_owner_of_user_group(self):
363 359 self.log_user()
364 360 username = 'newtestuserdeleteme_user_group_owner'
365 361 obj_name = 'test_user_group'
366 362 usr = fixture.create_user(name=username)
367 363 self.destroy_users.add(username)
368 364 fixture.create_user_group(obj_name, cur_user=usr.username)
369 365
370 366 new_user = Session().query(User)\
371 367 .filter(User.username == username).one()
372 368 response = self.app.post(url('user', user_id=new_user.user_id),
373 369 params={'_method': 'delete',
374 370 'csrf_token': self.csrf_token})
375 371
376 372 msg = 'user "%s" still owns 1 user groups and cannot be removed. ' \
377 373 'Switch owners or remove those user groups:%s' % (username,
378 374 obj_name)
379 375 assert_session_flash(response, msg)
380 376 fixture.destroy_user_group(obj_name)
381 377
382 378 def test_delete_owner_of_user_group_detaching(self):
383 379 self.log_user()
384 380 username = 'newtestuserdeleteme_user_group_owner_detaching'
385 381 obj_name = 'test_user_group'
386 382 usr = fixture.create_user(name=username)
387 383 self.destroy_users.add(username)
388 384 fixture.create_user_group(obj_name, cur_user=usr.username)
389 385
390 386 new_user = Session().query(User)\
391 387 .filter(User.username == username).one()
392 388 try:
393 389 response = self.app.post(url('user', user_id=new_user.user_id),
394 390 params={'_method': 'delete',
395 391 'user_user_groups': 'detach',
396 392 'csrf_token': self.csrf_token})
397 393
398 394 msg = 'Detached 1 user groups'
399 395 assert_session_flash(response, msg)
400 396 finally:
401 397 fixture.destroy_user_group(obj_name)
402 398
403 399 def test_delete_owner_of_user_group_deleting(self):
404 400 self.log_user()
405 401 username = 'newtestuserdeleteme_user_group_owner_deleting'
406 402 obj_name = 'test_user_group'
407 403 usr = fixture.create_user(name=username)
408 404 self.destroy_users.add(username)
409 405 fixture.create_user_group(obj_name, cur_user=usr.username)
410 406
411 407 new_user = Session().query(User)\
412 408 .filter(User.username == username).one()
413 409 response = self.app.post(url('user', user_id=new_user.user_id),
414 410 params={'_method': 'delete',
415 411 'user_user_groups': 'delete',
416 412 'csrf_token': self.csrf_token})
417 413
418 414 msg = 'Deleted 1 user groups'
419 415 assert_session_flash(response, msg)
420 416
421 417 def test_edit(self):
422 418 self.log_user()
423 419 user = User.get_by_username(TEST_USER_ADMIN_LOGIN)
424 420 self.app.get(url('edit_user', user_id=user.user_id))
425 421
426 422 @pytest.mark.parametrize(
427 423 'repo_create, repo_create_write, user_group_create, repo_group_create,'
428 424 'fork_create, inherit_default_permissions, expect_error,'
429 425 'expect_form_error', [
430 426 ('hg.create.none', 'hg.create.write_on_repogroup.false',
431 427 'hg.usergroup.create.false', 'hg.repogroup.create.false',
432 428 'hg.fork.none', 'hg.inherit_default_perms.false', False, False),
433 429 ('hg.create.repository', 'hg.create.write_on_repogroup.false',
434 430 'hg.usergroup.create.false', 'hg.repogroup.create.false',
435 431 'hg.fork.none', 'hg.inherit_default_perms.false', False, False),
436 432 ('hg.create.repository', 'hg.create.write_on_repogroup.true',
437 433 'hg.usergroup.create.true', 'hg.repogroup.create.true',
438 434 'hg.fork.repository', 'hg.inherit_default_perms.false', False,
439 435 False),
440 436 ('hg.create.XXX', 'hg.create.write_on_repogroup.true',
441 437 'hg.usergroup.create.true', 'hg.repogroup.create.true',
442 438 'hg.fork.repository', 'hg.inherit_default_perms.false', False,
443 439 True),
444 440 ('', '', '', '', '', '', True, False),
445 441 ])
446 442 def test_global_perms_on_user(
447 443 self, repo_create, repo_create_write, user_group_create,
448 444 repo_group_create, fork_create, expect_error, expect_form_error,
449 445 inherit_default_permissions):
450 446 self.log_user()
451 447 user = fixture.create_user('dummy')
452 448 uid = user.user_id
453 449
454 450 # ENABLE REPO CREATE ON A GROUP
455 451 perm_params = {
456 452 'inherit_default_permissions': False,
457 453 'default_repo_create': repo_create,
458 454 'default_repo_create_on_write': repo_create_write,
459 455 'default_user_group_create': user_group_create,
460 456 'default_repo_group_create': repo_group_create,
461 457 'default_fork_create': fork_create,
462 458 'default_inherit_default_permissions': inherit_default_permissions,
463 459 '_method': 'put',
464 460 'csrf_token': self.csrf_token,
465 461 }
466 462 response = self.app.post(
467 463 url('edit_user_global_perms', user_id=uid),
468 464 params=perm_params)
469 465
470 466 if expect_form_error:
471 467 assert response.status_int == 200
472 468 response.mustcontain('Value must be one of')
473 469 else:
474 470 if expect_error:
475 471 msg = 'An error occurred during permissions saving'
476 472 else:
477 473 msg = 'User global permissions updated successfully'
478 474 ug = User.get(uid)
479 475 del perm_params['_method']
480 476 del perm_params['inherit_default_permissions']
481 477 del perm_params['csrf_token']
482 478 assert perm_params == ug.get_default_perms()
483 479 assert_session_flash(response, msg)
484 480 fixture.destroy_user(uid)
485 481
486 482 def test_global_permissions_initial_values(self, user_util):
487 483 self.log_user()
488 484 user = user_util.create_user()
489 485 uid = user.user_id
490 486 response = self.app.get(url('edit_user_global_perms', user_id=uid))
491 487 default_user = User.get_default_user()
492 488 default_permissions = default_user.get_default_perms()
493 489 assert_response = AssertResponse(response)
494 490 expected_permissions = (
495 491 'default_repo_create', 'default_repo_create_on_write',
496 492 'default_fork_create', 'default_repo_group_create',
497 493 'default_user_group_create', 'default_inherit_default_permissions')
498 494 for permission in expected_permissions:
499 495 css_selector = '[name={}][checked=checked]'.format(permission)
500 496 element = assert_response.get_element(css_selector)
501 497 assert element.value == default_permissions[permission]
502 498
503 499 def test_ips(self):
504 500 self.log_user()
505 501 user = User.get_by_username(TEST_USER_REGULAR_LOGIN)
506 502 response = self.app.get(url('edit_user_ips', user_id=user.user_id))
507 503 response.mustcontain('All IP addresses are allowed')
508 504
509 505 @pytest.mark.parametrize("test_name, ip, ip_range, failure", [
510 506 ('127/24', '127.0.0.1/24', '127.0.0.0 - 127.0.0.255', False),
511 507 ('10/32', '10.0.0.10/32', '10.0.0.10 - 10.0.0.10', False),
512 508 ('0/16', '0.0.0.0/16', '0.0.0.0 - 0.0.255.255', False),
513 509 ('0/8', '0.0.0.0/8', '0.0.0.0 - 0.255.255.255', False),
514 510 ('127_bad_mask', '127.0.0.1/99', '127.0.0.1 - 127.0.0.1', True),
515 511 ('127_bad_ip', 'foobar', 'foobar', True),
516 512 ])
517 513 def test_add_ip(self, test_name, ip, ip_range, failure):
518 514 self.log_user()
519 515 user = User.get_by_username(TEST_USER_REGULAR_LOGIN)
520 516 user_id = user.user_id
521 517
522 518 response = self.app.post(url('edit_user_ips', user_id=user_id),
523 519 params={'new_ip': ip, '_method': 'put',
524 520 'csrf_token': self.csrf_token})
525 521
526 522 if failure:
527 523 assert_session_flash(
528 524 response, 'Please enter a valid IPv4 or IpV6 address')
529 525 response = self.app.get(url('edit_user_ips', user_id=user_id))
530 526 response.mustcontain(no=[ip])
531 527 response.mustcontain(no=[ip_range])
532 528
533 529 else:
534 530 response = self.app.get(url('edit_user_ips', user_id=user_id))
535 531 response.mustcontain(ip)
536 532 response.mustcontain(ip_range)
537 533
538 534 # cleanup
539 535 for del_ip in UserIpMap.query().filter(
540 536 UserIpMap.user_id == user_id).all():
541 537 Session().delete(del_ip)
542 538 Session().commit()
543 539
544 540 def test_delete_ip(self):
545 541 self.log_user()
546 542 user = User.get_by_username(TEST_USER_REGULAR_LOGIN)
547 543 user_id = user.user_id
548 544 ip = '127.0.0.1/32'
549 545 ip_range = '127.0.0.1 - 127.0.0.1'
550 546 new_ip = UserModel().add_extra_ip(user_id, ip)
551 547 Session().commit()
552 548 new_ip_id = new_ip.ip_id
553 549
554 550 response = self.app.get(url('edit_user_ips', user_id=user_id))
555 551 response.mustcontain(ip)
556 552 response.mustcontain(ip_range)
557 553
558 554 self.app.post(url('edit_user_ips', user_id=user_id),
559 555 params={'_method': 'delete', 'del_ip_id': new_ip_id,
560 556 'csrf_token': self.csrf_token})
561 557
562 558 response = self.app.get(url('edit_user_ips', user_id=user_id))
563 559 response.mustcontain('All IP addresses are allowed')
564 560 response.mustcontain(no=[ip])
565 561 response.mustcontain(no=[ip_range])
566
567 def test_auth_tokens(self):
568 self.log_user()
569
570 user = User.get_by_username(TEST_USER_REGULAR_LOGIN)
571 response = self.app.get(
572 url('edit_user_auth_tokens', user_id=user.user_id))
573 for token in user.auth_tokens:
574 response.mustcontain(token)
575 response.mustcontain('never')
576
577 @pytest.mark.parametrize("desc, lifetime", [
578 ('forever', -1),
579 ('5mins', 60*5),
580 ('30days', 60*60*24*30),
581 ])
582 def test_add_auth_token(self, desc, lifetime, user_util):
583 self.log_user()
584 user = user_util.create_user()
585 user_id = user.user_id
586
587 response = self.app.post(
588 url('edit_user_auth_tokens', user_id=user_id),
589 {'_method': 'put', 'description': desc, 'lifetime': lifetime,
590 'csrf_token': self.csrf_token})
591 assert_session_flash(response, 'Auth token successfully created')
592
593 response = response.follow()
594 user = User.get(user_id)
595 for auth_token in user.auth_tokens:
596 response.mustcontain(auth_token)
597
598 def test_remove_auth_token(self, user_util):
599 self.log_user()
600 user = user_util.create_user()
601 user_id = user.user_id
602
603 response = self.app.post(
604 url('edit_user_auth_tokens', user_id=user_id),
605 {'_method': 'put', 'description': 'desc', 'lifetime': -1,
606 'csrf_token': self.csrf_token})
607 assert_session_flash(response, 'Auth token successfully created')
608 response = response.follow()
609
610 # now delete our key
611 keys = UserApiKeys.query().filter(UserApiKeys.user_id == user_id).all()
612 assert 3 == len(keys)
613
614 response = self.app.post(
615 url('edit_user_auth_tokens', user_id=user_id),
616 {'_method': 'delete', 'del_auth_token': keys[0].api_key,
617 'csrf_token': self.csrf_token})
618 assert_session_flash(response, 'Auth token successfully deleted')
619 keys = UserApiKeys.query().filter(UserApiKeys.user_id == user_id).all()
620 assert 2 == len(keys)
@@ -1,1808 +1,1806 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2010-2017 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 import collections
22 22 import datetime
23 23 import hashlib
24 24 import os
25 25 import re
26 26 import pprint
27 27 import shutil
28 28 import socket
29 29 import subprocess32
30 30 import time
31 31 import uuid
32 32 import dateutil.tz
33 33
34 34 import mock
35 35 import pyramid.testing
36 36 import pytest
37 37 import colander
38 38 import requests
39 39
40 40 import rhodecode
41 41 from rhodecode.lib.utils2 import AttributeDict
42 42 from rhodecode.model.changeset_status import ChangesetStatusModel
43 43 from rhodecode.model.comment import CommentsModel
44 44 from rhodecode.model.db import (
45 45 PullRequest, Repository, RhodeCodeSetting, ChangesetStatus, RepoGroup,
46 46 UserGroup, RepoRhodeCodeUi, RepoRhodeCodeSetting, RhodeCodeUi)
47 47 from rhodecode.model.meta import Session
48 48 from rhodecode.model.pull_request import PullRequestModel
49 49 from rhodecode.model.repo import RepoModel
50 50 from rhodecode.model.repo_group import RepoGroupModel
51 51 from rhodecode.model.user import UserModel
52 52 from rhodecode.model.settings import VcsSettingsModel
53 53 from rhodecode.model.user_group import UserGroupModel
54 54 from rhodecode.model.integration import IntegrationModel
55 55 from rhodecode.integrations import integration_type_registry
56 56 from rhodecode.integrations.types.base import IntegrationTypeBase
57 57 from rhodecode.lib.utils import repo2db_mapper
58 58 from rhodecode.lib.vcs import create_vcsserver_proxy
59 59 from rhodecode.lib.vcs.backends import get_backend
60 60 from rhodecode.lib.vcs.nodes import FileNode
61 61 from rhodecode.tests import (
62 62 login_user_session, get_new_dir, utils, TESTS_TMP_PATH,
63 63 TEST_USER_ADMIN_LOGIN, TEST_USER_REGULAR_LOGIN, TEST_USER_REGULAR2_LOGIN,
64 64 TEST_USER_REGULAR_PASS)
65 65 from rhodecode.tests.utils import CustomTestApp
66 66 from rhodecode.tests.fixture import Fixture
67 67
68 68
69 69 def _split_comma(value):
70 70 return value.split(',')
71 71
72 72
73 73 def pytest_addoption(parser):
74 74 parser.addoption(
75 75 '--keep-tmp-path', action='store_true',
76 76 help="Keep the test temporary directories")
77 77 parser.addoption(
78 78 '--backends', action='store', type=_split_comma,
79 79 default=['git', 'hg', 'svn'],
80 80 help="Select which backends to test for backend specific tests.")
81 81 parser.addoption(
82 82 '--dbs', action='store', type=_split_comma,
83 83 default=['sqlite'],
84 84 help="Select which database to test for database specific tests. "
85 85 "Possible options are sqlite,postgres,mysql")
86 86 parser.addoption(
87 87 '--appenlight', '--ae', action='store_true',
88 88 help="Track statistics in appenlight.")
89 89 parser.addoption(
90 90 '--appenlight-api-key', '--ae-key',
91 91 help="API key for Appenlight.")
92 92 parser.addoption(
93 93 '--appenlight-url', '--ae-url',
94 94 default="https://ae.rhodecode.com",
95 95 help="Appenlight service URL, defaults to https://ae.rhodecode.com")
96 96 parser.addoption(
97 97 '--sqlite-connection-string', action='store',
98 98 default='', help="Connection string for the dbs tests with SQLite")
99 99 parser.addoption(
100 100 '--postgres-connection-string', action='store',
101 101 default='', help="Connection string for the dbs tests with Postgres")
102 102 parser.addoption(
103 103 '--mysql-connection-string', action='store',
104 104 default='', help="Connection string for the dbs tests with MySQL")
105 105 parser.addoption(
106 106 '--repeat', type=int, default=100,
107 107 help="Number of repetitions in performance tests.")
108 108
109 109
110 110 def pytest_configure(config):
111 111 # Apply the kombu patch early on, needed for test discovery on Python 2.7.11
112 112 from rhodecode.config import patches
113 113 patches.kombu_1_5_1_python_2_7_11()
114 114
115 115
116 116 def pytest_collection_modifyitems(session, config, items):
117 117 # drop items marked as not a test (nose's nottest), kept for the nose-to-pytest transition
118 118 remaining = [
119 119 i for i in items if getattr(i.obj, '__test__', True)]
120 120 items[:] = remaining
121 121
122 122
123 123 def pytest_generate_tests(metafunc):
124 124 # Support test generation based on --backend parameter
125 125 if 'backend_alias' in metafunc.fixturenames:
126 126 backends = get_backends_from_metafunc(metafunc)
127 127 scope = None
128 128 if not backends:
129 129 pytest.skip("Not enabled for any of selected backends")
130 130 metafunc.parametrize('backend_alias', backends, scope=scope)
131 131 elif hasattr(metafunc.function, 'backends'):
132 132 backends = get_backends_from_metafunc(metafunc)
133 133 if not backends:
134 134 pytest.skip("Not enabled for any of selected backends")
135 135
136 136
137 137 def get_backends_from_metafunc(metafunc):
138 138 requested_backends = set(metafunc.config.getoption('--backends'))
139 139 if hasattr(metafunc.function, 'backends'):
140 140 # Supported backends by this test function, created from
141 141 # pytest.mark.backends
142 142 backends = metafunc.function.backends.args
143 143 elif hasattr(metafunc.cls, 'backend_alias'):
144 144 # Support class attribute "backend_alias", this is mainly
145 145 # for legacy reasons for tests not yet using pytest.mark.backends
146 146 backends = [metafunc.cls.backend_alias]
147 147 else:
148 148 backends = metafunc.config.getoption('--backends')
149 149 return requested_backends.intersection(backends)
150 150
151 151
152 152 @pytest.fixture(scope='session', autouse=True)
153 153 def activate_example_rcextensions(request):
154 154 """
155 155 Patch in an example rcextensions module which verifies passed in kwargs.
156 156 """
157 157 from rhodecode.tests.other import example_rcextensions
158 158
159 159 old_extensions = rhodecode.EXTENSIONS
160 160 rhodecode.EXTENSIONS = example_rcextensions
161 161
162 162 @request.addfinalizer
163 163 def cleanup():
164 164 rhodecode.EXTENSIONS = old_extensions
165 165
166 166
167 167 @pytest.fixture
168 168 def capture_rcextensions():
169 169 """
170 170 Returns the recorded calls to entry points in rcextensions.
171 171 """
172 172 calls = rhodecode.EXTENSIONS.calls
173 173 calls.clear()
174 174 # Note: At this moment, it is still the empty dict, but that will
175 175 # be filled during the test run and since it is a reference this
176 176 # is enough to make it work.
177 177 return calls
178 178
179 179
180 180 @pytest.fixture(scope='session')
181 181 def http_environ_session():
182 182 """
183 183 Allow to use "http_environ" in session scope.
184 184 """
185 185 return http_environ(
186 186 http_host_stub=http_host_stub())
187 187
188 188
189 189 @pytest.fixture
190 190 def http_host_stub():
191 191 """
192 192 Value of HTTP_HOST in the test run.
193 193 """
194 194 return 'test.example.com:80'
195 195
196 196
197 197 @pytest.fixture
198 198 def http_environ(http_host_stub):
199 199 """
200 200 HTTP extra environ keys.
201 201
202 202 Used by the test application as well as for setting up the pylons
203 203 environment. In the case of the fixture "app" it should be possible
204 204 to override this for a specific test case.
205 205 """
206 206 return {
207 207 'SERVER_NAME': http_host_stub.split(':')[0],
208 208 'SERVER_PORT': http_host_stub.split(':')[1],
209 209 'HTTP_HOST': http_host_stub,
210 210 }
211 211
212 212
213 213 @pytest.fixture(scope='function')
214 214 def app(request, pylonsapp, http_environ):
215
216
217 215 app = CustomTestApp(
218 216 pylonsapp,
219 217 extra_environ=http_environ)
220 218 if request.cls:
221 219 request.cls.app = app
222 220 return app
223 221
224 222
225 223 @pytest.fixture(scope='session')
226 224 def app_settings(pylonsapp, pylons_config):
227 225 """
228 226 Settings dictionary used to create the app.
229 227
230 228 Parses the ini file and passes the result through the sanitize and apply
231 229 defaults mechanism in `rhodecode.config.middleware`.
232 230 """
233 231 from paste.deploy.loadwsgi import loadcontext, APP
234 232 from rhodecode.config.middleware import (
235 233 sanitize_settings_and_apply_defaults)
236 234 context = loadcontext(APP, 'config:' + pylons_config)
237 235 settings = sanitize_settings_and_apply_defaults(context.config())
238 236 return settings
239 237
240 238
241 239 @pytest.fixture(scope='session')
242 240 def db(app_settings):
243 241 """
244 242 Initializes the database connection.
245 243
246 244 It uses the same settings which are used to create the ``pylonsapp`` or
247 245 ``app`` fixtures.
248 246 """
249 247 from rhodecode.config.utils import initialize_database
250 248 initialize_database(app_settings)
251 249
252 250
253 251 LoginData = collections.namedtuple('LoginData', ('csrf_token', 'user'))
254 252
255 253
256 254 def _autologin_user(app, *args):
257 255 session = login_user_session(app, *args)
258 256 csrf_token = rhodecode.lib.auth.get_csrf_token(session)
259 257 return LoginData(csrf_token, session['rhodecode_user'])
260 258
261 259
262 260 @pytest.fixture
263 261 def autologin_user(app):
264 262 """
265 263 Utility fixture which makes sure that the admin user is logged in
266 264 """
267 265 return _autologin_user(app)
268 266
269 267
270 268 @pytest.fixture
271 269 def autologin_regular_user(app):
272 270 """
273 271 Utility fixture which makes sure that the regular user is logged in
274 272 """
275 273 return _autologin_user(
276 274 app, TEST_USER_REGULAR_LOGIN, TEST_USER_REGULAR_PASS)
277 275
278 276
279 277 @pytest.fixture(scope='function')
280 278 def csrf_token(request, autologin_user):
281 279 return autologin_user.csrf_token
282 280
283 281
284 282 @pytest.fixture(scope='function')
285 283 def xhr_header(request):
286 284 return {'HTTP_X_REQUESTED_WITH': 'XMLHttpRequest'}
287 285
288 286
289 287 @pytest.fixture
290 288 def real_crypto_backend(monkeypatch):
291 289 """
292 290 Switch the production crypto backend on for this test.
293 291
294 292 During the test run the crypto backend is replaced with a faster
295 293 implementation based on the MD5 algorithm.
296 294 """
297 295 monkeypatch.setattr(rhodecode, 'is_test', False)
298 296
299 297
300 298 @pytest.fixture(scope='class')
301 299 def index_location(request, pylonsapp):
302 300 index_location = pylonsapp.config['app_conf']['search.location']
303 301 if request.cls:
304 302 request.cls.index_location = index_location
305 303 return index_location
306 304
307 305
308 306 @pytest.fixture(scope='session', autouse=True)
309 307 def tests_tmp_path(request):
310 308 """
311 309 Create temporary directory to be used during the test session.
312 310 """
313 311 if not os.path.exists(TESTS_TMP_PATH):
314 312 os.makedirs(TESTS_TMP_PATH)
315 313
316 314 if not request.config.getoption('--keep-tmp-path'):
317 315 @request.addfinalizer
318 316 def remove_tmp_path():
319 317 shutil.rmtree(TESTS_TMP_PATH)
320 318
321 319 return TESTS_TMP_PATH
322 320
323 321
324 322 @pytest.fixture
325 323 def test_repo_group(request):
326 324 """
327 325 Create a temporary repository group, and destroy it after
328 326 usage automatically
329 327 """
330 328 fixture = Fixture()
331 329 repogroupid = 'test_repo_group_%s' % int(time.time())
332 330 repo_group = fixture.create_repo_group(repogroupid)
333 331
334 332 def _cleanup():
335 333 fixture.destroy_repo_group(repogroupid)
336 334
337 335 request.addfinalizer(_cleanup)
338 336 return repo_group
339 337
340 338
341 339 @pytest.fixture
342 340 def test_user_group(request):
343 341 """
344 342 Create a temporary user group, and destroy it after
345 343 usage automatically
346 344 """
347 345 fixture = Fixture()
348 346 usergroupid = 'test_user_group_%s' % int(time.time())
349 347 user_group = fixture.create_user_group(usergroupid)
350 348
351 349 def _cleanup():
352 350 fixture.destroy_user_group(user_group)
353 351
354 352 request.addfinalizer(_cleanup)
355 353 return user_group
356 354
357 355
358 356 @pytest.fixture(scope='session')
359 357 def test_repo(request):
360 358 container = TestRepoContainer()
361 359 request.addfinalizer(container._cleanup)
362 360 return container
363 361
364 362
365 363 class TestRepoContainer(object):
366 364 """
367 365 Container for test repositories which are used read only.
368 366
369 367 Repositories will be created on demand and re-used during the lifetime
370 368 of this object.
371 369
372 370 Usage to get the svn test repository "minimal"::
373 371
374 372 test_repo = TestRepoContainer()
375 373 repo = test_repo('minimal', 'svn')
376 374
377 375 """
378 376
379 377 dump_extractors = {
380 378 'git': utils.extract_git_repo_from_dump,
381 379 'hg': utils.extract_hg_repo_from_dump,
382 380 'svn': utils.extract_svn_repo_from_dump,
383 381 }
384 382
385 383 def __init__(self):
386 384 self._cleanup_repos = []
387 385 self._fixture = Fixture()
388 386 self._repos = {}
389 387
390 388 def __call__(self, dump_name, backend_alias):
391 389 key = (dump_name, backend_alias)
392 390 if key not in self._repos:
393 391 repo = self._create_repo(dump_name, backend_alias)
394 392 self._repos[key] = repo.repo_id
395 393 return Repository.get(self._repos[key])
396 394
397 395 def _create_repo(self, dump_name, backend_alias):
398 396 repo_name = '%s-%s' % (backend_alias, dump_name)
399 397 backend_class = get_backend(backend_alias)
400 398 dump_extractor = self.dump_extractors[backend_alias]
401 399 repo_path = dump_extractor(dump_name, repo_name)
402 400 vcs_repo = backend_class(repo_path)
403 401 repo2db_mapper({repo_name: vcs_repo})
404 402 repo = RepoModel().get_by_repo_name(repo_name)
405 403 self._cleanup_repos.append(repo_name)
406 404 return repo
407 405
408 406 def _cleanup(self):
409 407 for repo_name in reversed(self._cleanup_repos):
410 408 self._fixture.destroy_repo(repo_name)
411 409
412 410
413 411 @pytest.fixture
414 412 def backend(request, backend_alias, pylonsapp, test_repo):
415 413 """
416 414 Parametrized fixture which represents a single backend implementation.
417 415
418 416 It respects the option `--backends` to focus the test run on specific
419 417 backend implementations.
420 418
421 419 It also supports `pytest.mark.xfail_backends` to mark tests as failing
422 420 for specific backends. This is intended as a utility for incremental
423 421 development of a new backend implementation.
424 422 """
425 423 if backend_alias not in request.config.getoption('--backends'):
426 424 pytest.skip("Backend %s not selected." % (backend_alias, ))
427 425
428 426 utils.check_xfail_backends(request.node, backend_alias)
429 427 utils.check_skip_backends(request.node, backend_alias)
430 428
431 429 repo_name = 'vcs_test_%s' % (backend_alias, )
432 430 backend = Backend(
433 431 alias=backend_alias,
434 432 repo_name=repo_name,
435 433 test_name=request.node.name,
436 434 test_repo_container=test_repo)
437 435 request.addfinalizer(backend.cleanup)
438 436 return backend
439 437
440 438
441 439 @pytest.fixture
442 440 def backend_git(request, pylonsapp, test_repo):
443 441 return backend(request, 'git', pylonsapp, test_repo)
444 442
445 443
446 444 @pytest.fixture
447 445 def backend_hg(request, pylonsapp, test_repo):
448 446 return backend(request, 'hg', pylonsapp, test_repo)
449 447
450 448
451 449 @pytest.fixture
452 450 def backend_svn(request, pylonsapp, test_repo):
453 451 return backend(request, 'svn', pylonsapp, test_repo)
454 452
455 453
456 454 @pytest.fixture
457 455 def backend_random(backend_git):
458 456 """
459 457 Use this to express that your tests need "a backend".
460 458
461 459 A few of our tests need a backend, so that we can run the code. This
462 460 fixture is intended to be used for such cases. It will pick one of the
463 461 backends and run the tests.
464 462
465 463 The fixture `backend` would run the test multiple times for each
466 464 available backend which is a pure waste of time if the test is
467 465 independent of the backend type.
468 466 """
469 467 # TODO: johbo: Change this to pick a random backend
470 468 return backend_git
471 469
472 470
473 471 @pytest.fixture
474 472 def backend_stub(backend_git):
475 473 """
476 474 Use this to express that your tests need a backend stub
477 475
478 476 TODO: mikhail: Implement a real stub logic instead of returning
479 477 a git backend
480 478 """
481 479 return backend_git
482 480
483 481
484 482 @pytest.fixture
485 483 def repo_stub(backend_stub):
486 484 """
487 485 Use this to express that your tests need a repository stub
488 486 """
489 487 return backend_stub.create_repo()
490 488
491 489
492 490 class Backend(object):
493 491 """
494 492 Represents the test configuration for one supported backend
495 493
496 494 Provides easy access to different test repositories based on
497 495 `__getitem__`. Such repositories will only be created once per test
498 496 session.
499 497 """
500 498
501 499 invalid_repo_name = re.compile(r'[^0-9a-zA-Z]+')
502 500 _master_repo = None
503 501 _commit_ids = {}
504 502
505 503 def __init__(self, alias, repo_name, test_name, test_repo_container):
506 504 self.alias = alias
507 505 self.repo_name = repo_name
508 506 self._cleanup_repos = []
509 507 self._test_name = test_name
510 508 self._test_repo_container = test_repo_container
511 509 # TODO: johbo: Used as a delegate interim. Not yet sure if Backend or
512 510 # Fixture will survive in the end.
513 511 self._fixture = Fixture()
514 512
515 513 def __getitem__(self, key):
516 514 return self._test_repo_container(key, self.alias)
517 515
518 516 @property
519 517 def repo(self):
520 518 """
521 519 Returns the "current" repository. This is the vcs_test repo or the
522 520 last repo which has been created with `create_repo`.
523 521 """
524 522 from rhodecode.model.db import Repository
525 523 return Repository.get_by_repo_name(self.repo_name)
526 524
527 525 @property
528 526 def default_branch_name(self):
529 527 VcsRepository = get_backend(self.alias)
530 528 return VcsRepository.DEFAULT_BRANCH_NAME
531 529
532 530 @property
533 531 def default_head_id(self):
534 532 """
535 533 Returns the default head id of the underlying backend.
536 534
537 535 This will be the default branch name in case the backend does have a
538 536 default branch. In the other cases it will point to a valid head
539 537 which can serve as the base to create a new commit on top of it.
540 538 """
541 539 vcsrepo = self.repo.scm_instance()
542 540 head_id = (
543 541 vcsrepo.DEFAULT_BRANCH_NAME or
544 542 vcsrepo.commit_ids[-1])
545 543 return head_id
546 544
547 545 @property
548 546 def commit_ids(self):
549 547 """
550 548 Returns the list of commits for the last created repository
551 549 """
552 550 return self._commit_ids
553 551
554 552 def create_master_repo(self, commits):
555 553 """
556 554 Create a repository and remember it as a template.
557 555
558 556 This allows to easily create derived repositories to construct
559 557 more complex scenarios for diff, compare and pull requests.
560 558
561 559 Returns a commit map which maps from commit message to raw_id.
562 560 """
563 561 self._master_repo = self.create_repo(commits=commits)
564 562 return self._commit_ids
565 563
566 564 def create_repo(
567 565 self, commits=None, number_of_commits=0, heads=None,
568 566 name_suffix=u'', **kwargs):
569 567 """
570 568 Create a repository and record it for later cleanup.
571 569
572 570 :param commits: Optional. A sequence of dict instances.
573 571 Will add a commit per entry to the new repository.
574 572 :param number_of_commits: Optional. If set to a number, this number of
575 573 commits will be added to the new repository.
576 574 :param heads: Optional. Can be set to a sequence of commit
577 575 names which shall be pulled in from the master repository.
578 576
579 577 """
580 578 self.repo_name = self._next_repo_name() + name_suffix
581 579 repo = self._fixture.create_repo(
582 580 self.repo_name, repo_type=self.alias, **kwargs)
583 581 self._cleanup_repos.append(repo.repo_name)
584 582
585 583 commits = commits or [
586 584 {'message': 'Commit %s of %s' % (x, self.repo_name)}
587 585 for x in xrange(number_of_commits)]
588 586 self._add_commits_to_repo(repo.scm_instance(), commits)
589 587 if heads:
590 588 self.pull_heads(repo, heads)
591 589
592 590 return repo
593 591
594 592 def pull_heads(self, repo, heads):
595 593 """
596 594 Make sure that repo contains all commits mentioned in `heads`
597 595 """
598 596 vcsmaster = self._master_repo.scm_instance()
599 597 vcsrepo = repo.scm_instance()
600 598 vcsrepo.config.clear_section('hooks')
601 599 commit_ids = [self._commit_ids[h] for h in heads]
602 600 vcsrepo.pull(vcsmaster.path, commit_ids=commit_ids)
603 601
604 602 def create_fork(self):
605 603 repo_to_fork = self.repo_name
606 604 self.repo_name = self._next_repo_name()
607 605 repo = self._fixture.create_fork(repo_to_fork, self.repo_name)
608 606 self._cleanup_repos.append(self.repo_name)
609 607 return repo
610 608
611 609 def new_repo_name(self, suffix=u''):
612 610 self.repo_name = self._next_repo_name() + suffix
613 611 self._cleanup_repos.append(self.repo_name)
614 612 return self.repo_name
615 613
616 614 def _next_repo_name(self):
617 615 return u"%s_%s" % (
618 616 self.invalid_repo_name.sub(u'_', self._test_name),
619 617 len(self._cleanup_repos))
620 618
621 619 def ensure_file(self, filename, content='Test content\n'):
622 620 assert self._cleanup_repos, "Avoid writing into vcs_test repos"
623 621 commits = [
624 622 {'added': [
625 623 FileNode(filename, content=content),
626 624 ]},
627 625 ]
628 626 self._add_commits_to_repo(self.repo.scm_instance(), commits)
629 627
630 628 def enable_downloads(self):
631 629 repo = self.repo
632 630 repo.enable_downloads = True
633 631 Session().add(repo)
634 632 Session().commit()
635 633
636 634 def cleanup(self):
637 635 for repo_name in reversed(self._cleanup_repos):
638 636 self._fixture.destroy_repo(repo_name)
639 637
640 638 def _add_commits_to_repo(self, repo, commits):
641 639 commit_ids = _add_commits_to_repo(repo, commits)
642 640 if not commit_ids:
643 641 return
644 642 self._commit_ids = commit_ids
645 643
646 644 # Creating refs for Git to allow fetching them from remote repository
647 645 if self.alias == 'git':
648 646 refs = {}
649 647 for message in self._commit_ids:
650 648 # TODO: mikhail: do more special chars replacements
651 649 ref_name = 'refs/test-refs/{}'.format(
652 650 message.replace(' ', ''))
653 651 refs[ref_name] = self._commit_ids[message]
654 652 self._create_refs(repo, refs)
655 653
656 654 def _create_refs(self, repo, refs):
657 655 for ref_name in refs:
658 656 repo.set_refs(ref_name, refs[ref_name])
659 657
660 658
661 659 @pytest.fixture
662 660 def vcsbackend(request, backend_alias, tests_tmp_path, pylonsapp, test_repo):
663 661 """
664 662 Parametrized fixture which represents a single vcs backend implementation.
665 663
666 664 See the fixture `backend` for more details. This one implements the same
667 665 concept, but on vcs level. So it does not provide model instances etc.
668 666
669 667 Parameters are generated dynamically, see :func:`pytest_generate_tests`
670 668 for how this works.
671 669 """
672 670 if backend_alias not in request.config.getoption('--backends'):
673 671 pytest.skip("Backend %s not selected." % (backend_alias, ))
674 672
675 673 utils.check_xfail_backends(request.node, backend_alias)
676 674 utils.check_skip_backends(request.node, backend_alias)
677 675
678 676 repo_name = 'vcs_test_%s' % (backend_alias, )
679 677 repo_path = os.path.join(tests_tmp_path, repo_name)
680 678 backend = VcsBackend(
681 679 alias=backend_alias,
682 680 repo_path=repo_path,
683 681 test_name=request.node.name,
684 682 test_repo_container=test_repo)
685 683 request.addfinalizer(backend.cleanup)
686 684 return backend
687 685
688 686
689 687 @pytest.fixture
690 688 def vcsbackend_git(request, tests_tmp_path, pylonsapp, test_repo):
691 689 return vcsbackend(request, 'git', tests_tmp_path, pylonsapp, test_repo)
692 690
693 691
694 692 @pytest.fixture
695 693 def vcsbackend_hg(request, tests_tmp_path, pylonsapp, test_repo):
696 694 return vcsbackend(request, 'hg', tests_tmp_path, pylonsapp, test_repo)
697 695
698 696
699 697 @pytest.fixture
700 698 def vcsbackend_svn(request, tests_tmp_path, pylonsapp, test_repo):
701 699 return vcsbackend(request, 'svn', tests_tmp_path, pylonsapp, test_repo)
702 700
703 701
704 702 @pytest.fixture
705 703 def vcsbackend_random(vcsbackend_git):
706 704 """
707 705 Use this to express that your tests need "a vcsbackend".
708 706
709 707 The fixture `vcsbackend` would run the test multiple times for each
710 708 available vcs backend which is a pure waste of time if the test is
711 709 independent of the vcs backend type.
712 710 """
713 711 # TODO: johbo: Change this to pick a random backend
714 712 return vcsbackend_git
715 713
716 714
717 715 @pytest.fixture
718 716 def vcsbackend_stub(vcsbackend_git):
719 717 """
720 718 Use this to express that your test just needs a stub of a vcsbackend.
721 719
722 720 Plan is to eventually implement an in-memory stub to speed tests up.
723 721 """
724 722 return vcsbackend_git
725 723
726 724
727 725 class VcsBackend(object):
728 726 """
729 727 Represents the test configuration for one supported vcs backend.
730 728 """
731 729
732 730 invalid_repo_name = re.compile(r'[^0-9a-zA-Z]+')
733 731
734 732 def __init__(self, alias, repo_path, test_name, test_repo_container):
735 733 self.alias = alias
736 734 self._repo_path = repo_path
737 735 self._cleanup_repos = []
738 736 self._test_name = test_name
739 737 self._test_repo_container = test_repo_container
740 738
741 739 def __getitem__(self, key):
742 740 return self._test_repo_container(key, self.alias).scm_instance()
743 741
744 742 @property
745 743 def repo(self):
746 744 """
747 745 Returns the "current" repository. This is the vcs_test repo or the last
748 746 repo which has been created.
749 747 """
750 748 Repository = get_backend(self.alias)
751 749 return Repository(self._repo_path)
752 750
753 751 @property
754 752 def backend(self):
755 753 """
756 754 Returns the backend implementation class.
757 755 """
758 756 return get_backend(self.alias)
759 757
760 758 def create_repo(self, commits=None, number_of_commits=0, _clone_repo=None):
761 759 repo_name = self._next_repo_name()
762 760 self._repo_path = get_new_dir(repo_name)
763 761 repo_class = get_backend(self.alias)
764 762 src_url = None
765 763 if _clone_repo:
766 764 src_url = _clone_repo.path
767 765 repo = repo_class(self._repo_path, create=True, src_url=src_url)
768 766 self._cleanup_repos.append(repo)
769 767
770 768 commits = commits or [
771 769 {'message': 'Commit %s of %s' % (x, repo_name)}
772 770 for x in xrange(number_of_commits)]
773 771 _add_commits_to_repo(repo, commits)
774 772 return repo
775 773
776 774 def clone_repo(self, repo):
777 775 return self.create_repo(_clone_repo=repo)
778 776
779 777 def cleanup(self):
780 778 for repo in self._cleanup_repos:
781 779 shutil.rmtree(repo.path)
782 780
783 781 def new_repo_path(self):
784 782 repo_name = self._next_repo_name()
785 783 self._repo_path = get_new_dir(repo_name)
786 784 return self._repo_path
787 785
788 786 def _next_repo_name(self):
789 787 return "%s_%s" % (
790 788 self.invalid_repo_name.sub('_', self._test_name),
791 789 len(self._cleanup_repos))
792 790
793 791 def add_file(self, repo, filename, content='Test content\n'):
794 792 imc = repo.in_memory_commit
795 793 imc.add(FileNode(filename, content=content))
796 794 imc.commit(
797 795 message=u'Automatic commit from vcsbackend fixture',
798 796 author=u'Automatic')
799 797
800 798 def ensure_file(self, filename, content='Test content\n'):
801 799 assert self._cleanup_repos, "Avoid writing into vcs_test repos"
802 800 self.add_file(self.repo, filename, content)
803 801
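# Editorial sketch (not part of the original module): driving the VcsBackend
# helper above with an explicit commit specification. File names, messages and
# the author string are examples only; cleanup of the created repository is
# handled by the vcsbackend fixture finalizer.
def _example_vcsbackend_usage(vcsbackend_git):
    repo = vcsbackend_git.create_repo(commits=[
        {'message': 'Initial commit'},
        {'message': 'Second commit', 'author': u'Jane Doe <jane@example.com>'},
    ])
    # the "current" repo property now points at the repository just created
    assert vcsbackend_git.repo.path == repo.path
    # add one more file on top via the in-memory commit helper
    vcsbackend_git.add_file(repo, 'README.rst', content='Example content\n')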
804 802
805 803 def _add_commits_to_repo(vcs_repo, commits):
806 804 commit_ids = {}
807 805 if not commits:
808 806 return commit_ids
809 807
810 808 imc = vcs_repo.in_memory_commit
811 809 commit = None
812 810
813 811 for idx, commit in enumerate(commits):
814 812 message = unicode(commit.get('message', 'Commit %s' % idx))
815 813
816 814 for node in commit.get('added', []):
817 815 imc.add(FileNode(node.path, content=node.content))
818 816 for node in commit.get('changed', []):
819 817 imc.change(FileNode(node.path, content=node.content))
820 818 for node in commit.get('removed', []):
821 819 imc.remove(FileNode(node.path))
822 820
823 821 parents = [
824 822 vcs_repo.get_commit(commit_id=commit_ids[p])
825 823 for p in commit.get('parents', [])]
826 824
827 825 operations = ('added', 'changed', 'removed')
828 826 if not any((commit.get(o) for o in operations)):
829 827 imc.add(FileNode('file_%s' % idx, content=message))
830 828
831 829 commit = imc.commit(
832 830 message=message,
833 831 author=unicode(commit.get('author', 'Automatic')),
834 832 date=commit.get('date'),
835 833 branch=commit.get('branch'),
836 834 parents=parents)
837 835
838 836 commit_ids[commit.message] = commit.raw_id
839 837
840 838 return commit_ids
841 839
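# Editorial note (not part of the original module): the commit specification
# consumed above is a list of dicts. Everything besides 'message' is optional;
# 'added'/'changed'/'removed' expect nodes with `path`/`content` (FileNode),
# 'parents' refers to the messages of earlier commits, and 'author', 'date'
# and 'branch' are passed through to the commit call. Paths and messages
# below are examples only.
def _example_commit_spec():
    return [
        {'message': 'Add docs',
         'added': [FileNode('docs/index.rst', content='docs\n')]},
        {'message': 'Update docs',
         'changed': [FileNode('docs/index.rst', content='docs, take two\n')],
         'parents': ['Add docs']},
        {'message': 'Drop docs',
         'removed': [FileNode('docs/index.rst')],
         'parents': ['Update docs']},
    ]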
842 840
843 841 @pytest.fixture
844 842 def reposerver(request):
845 843 """
846 844 Allows serving a backend repository.
847 845 """
848 846
849 847 repo_server = RepoServer()
850 848 request.addfinalizer(repo_server.cleanup)
851 849 return repo_server
852 850
853 851
854 852 class RepoServer(object):
855 853 """
856 854 Utility to serve a local repository for the duration of a test case.
857 855
858 856 Supports only Subversion so far.
859 857 """
860 858
861 859 url = None
862 860
863 861 def __init__(self):
864 862 self._cleanup_servers = []
865 863
866 864 def serve(self, vcsrepo):
867 865 if vcsrepo.alias != 'svn':
868 866 raise TypeError("Backend %s not supported" % vcsrepo.alias)
869 867
870 868 proc = subprocess32.Popen(
871 869 ['svnserve', '-d', '--foreground', '--listen-host', 'localhost',
872 870 '--root', vcsrepo.path])
873 871 self._cleanup_servers.append(proc)
874 872 self.url = 'svn://localhost'
875 873
876 874 def cleanup(self):
877 875 for proc in self._cleanup_servers:
878 876 proc.terminate()
879 877
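# Editorial sketch (not part of the original module): serving a freshly
# created Subversion repository during a test. The spawned svnserve process
# is terminated again by the reposerver fixture finalizer.
def _example_serve_svn_repo(reposerver, vcsbackend_svn):
    repo = vcsbackend_svn.create_repo(number_of_commits=1)
    reposerver.serve(repo)
    # the repository is now reachable under reposerver.url
    assert reposerver.url == 'svn://localhost'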
880 878
881 879 @pytest.fixture
882 880 def pr_util(backend, request):
883 881 """
884 882 Utility for tests of models and for functional tests around pull requests.
885 883
886 884 It gives an instance of :class:`PRTestUtility` which provides various
887 885 utility methods around one pull request.
888 886
889 887 This fixture uses `backend` and inherits its parameterization.
890 888 """
891 889
892 890 util = PRTestUtility(backend)
893 891
894 892 @request.addfinalizer
895 893 def cleanup():
896 894 util.cleanup()
897 895
898 896 return util
899 897
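# Editorial sketch (not part of the original module): a minimal test built on
# `pr_util`; because the fixture uses `backend`, such a test runs once per
# enabled vcs backend. The function name is hypothetical.
def _example_pull_request_lifecycle(pr_util):
    pull_request = pr_util.create_pull_request(approved=True, mergeable=True)
    assert pull_request.pull_request_id == pr_util.pull_request_id
    # leave a general comment, then close the pull request again
    pr_util.create_comment()
    pr_util.close()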
900 898
901 899 class PRTestUtility(object):
902 900
903 901 pull_request = None
904 902 pull_request_id = None
905 903 mergeable_patcher = None
906 904 mergeable_mock = None
907 905 notification_patcher = None
908 906
909 907 def __init__(self, backend):
910 908 self.backend = backend
911 909
912 910 def create_pull_request(
913 911 self, commits=None, target_head=None, source_head=None,
914 912 revisions=None, approved=False, author=None, mergeable=False,
915 913 enable_notifications=True, name_suffix=u'', reviewers=None,
916 914 title=u"Test", description=u"Description"):
917 915 self.set_mergeable(mergeable)
918 916 if not enable_notifications:
919 917 # mock notification side effect
920 918 self.notification_patcher = mock.patch(
921 919 'rhodecode.model.notification.NotificationModel.create')
922 920 self.notification_patcher.start()
923 921
924 922 if not self.pull_request:
925 923 if not commits:
926 924 commits = [
927 925 {'message': 'c1'},
928 926 {'message': 'c2'},
929 927 {'message': 'c3'},
930 928 ]
931 929 target_head = 'c1'
932 930 source_head = 'c2'
933 931 revisions = ['c2']
934 932
935 933 self.commit_ids = self.backend.create_master_repo(commits)
936 934 self.target_repository = self.backend.create_repo(
937 935 heads=[target_head], name_suffix=name_suffix)
938 936 self.source_repository = self.backend.create_repo(
939 937 heads=[source_head], name_suffix=name_suffix)
940 938 self.author = author or UserModel().get_by_username(
941 939 TEST_USER_ADMIN_LOGIN)
942 940
943 941 model = PullRequestModel()
944 942 self.create_parameters = {
945 943 'created_by': self.author,
946 944 'source_repo': self.source_repository.repo_name,
947 945 'source_ref': self._default_branch_reference(source_head),
948 946 'target_repo': self.target_repository.repo_name,
949 947 'target_ref': self._default_branch_reference(target_head),
950 948 'revisions': [self.commit_ids[r] for r in revisions],
951 949 'reviewers': reviewers or self._get_reviewers(),
952 950 'title': title,
953 951 'description': description,
954 952 }
955 953 self.pull_request = model.create(**self.create_parameters)
956 954 assert model.get_versions(self.pull_request) == []
957 955
958 956 self.pull_request_id = self.pull_request.pull_request_id
959 957
960 958 if approved:
961 959 self.approve()
962 960
963 961 Session().add(self.pull_request)
964 962 Session().commit()
965 963
966 964 return self.pull_request
967 965
968 966 def approve(self):
969 967 self.create_status_votes(
970 968 ChangesetStatus.STATUS_APPROVED,
971 969 *self.pull_request.reviewers)
972 970
973 971 def close(self):
974 972 PullRequestModel().close_pull_request(self.pull_request, self.author)
975 973
976 974 def _default_branch_reference(self, commit_message):
977 975 reference = '%s:%s:%s' % (
978 976 'branch',
979 977 self.backend.default_branch_name,
980 978 self.commit_ids[commit_message])
981 979 return reference
982 980
983 981 def _get_reviewers(self):
984 982 model = UserModel()
985 983 return [
986 984 model.get_by_username(TEST_USER_REGULAR_LOGIN),
987 985 model.get_by_username(TEST_USER_REGULAR2_LOGIN),
988 986 ]
989 987
990 988 def update_source_repository(self, head=None):
991 989 heads = [head or 'c3']
992 990 self.backend.pull_heads(self.source_repository, heads=heads)
993 991
994 992 def add_one_commit(self, head=None):
995 993 self.update_source_repository(head=head)
996 994 old_commit_ids = set(self.pull_request.revisions)
997 995 PullRequestModel().update_commits(self.pull_request)
998 996 commit_ids = set(self.pull_request.revisions)
999 997 new_commit_ids = commit_ids - old_commit_ids
1000 998 assert len(new_commit_ids) == 1
1001 999 return new_commit_ids.pop()
1002 1000
1003 1001 def remove_one_commit(self):
1004 1002 assert len(self.pull_request.revisions) == 2
1005 1003 source_vcs = self.source_repository.scm_instance()
1006 1004 removed_commit_id = source_vcs.commit_ids[-1]
1007 1005
1008 1006 # TODO: johbo: Git and Mercurial have an inconsistent vcs api here,
1009 1007 # remove the if once that's sorted out.
1010 1008 if self.backend.alias == "git":
1011 1009 kwargs = {'branch_name': self.backend.default_branch_name}
1012 1010 else:
1013 1011 kwargs = {}
1014 1012 source_vcs.strip(removed_commit_id, **kwargs)
1015 1013
1016 1014 PullRequestModel().update_commits(self.pull_request)
1017 1015 assert len(self.pull_request.revisions) == 1
1018 1016 return removed_commit_id
1019 1017
1020 1018 def create_comment(self, linked_to=None):
1021 1019 comment = CommentsModel().create(
1022 1020 text=u"Test comment",
1023 1021 repo=self.target_repository.repo_name,
1024 1022 user=self.author,
1025 1023 pull_request=self.pull_request)
1026 1024 assert comment.pull_request_version_id is None
1027 1025
1028 1026 if linked_to:
1029 1027 PullRequestModel()._link_comments_to_version(linked_to)
1030 1028
1031 1029 return comment
1032 1030
1033 1031 def create_inline_comment(
1034 1032 self, linked_to=None, line_no=u'n1', file_path='file_1'):
1035 1033 comment = CommentsModel().create(
1036 1034 text=u"Test comment",
1037 1035 repo=self.target_repository.repo_name,
1038 1036 user=self.author,
1039 1037 line_no=line_no,
1040 1038 f_path=file_path,
1041 1039 pull_request=self.pull_request)
1042 1040 assert comment.pull_request_version_id is None
1043 1041
1044 1042 if linked_to:
1045 1043 PullRequestModel()._link_comments_to_version(linked_to)
1046 1044
1047 1045 return comment
1048 1046
1049 1047 def create_version_of_pull_request(self):
1050 1048 pull_request = self.create_pull_request()
1051 1049 version = PullRequestModel()._create_version_from_snapshot(
1052 1050 pull_request)
1053 1051 return version
1054 1052
1055 1053 def create_status_votes(self, status, *reviewers):
1056 1054 for reviewer in reviewers:
1057 1055 ChangesetStatusModel().set_status(
1058 1056 repo=self.pull_request.target_repo,
1059 1057 status=status,
1060 1058 user=reviewer.user_id,
1061 1059 pull_request=self.pull_request)
1062 1060
1063 1061 def set_mergeable(self, value):
1064 1062 if not self.mergeable_patcher:
1065 1063 self.mergeable_patcher = mock.patch.object(
1066 1064 VcsSettingsModel, 'get_general_settings')
1067 1065 self.mergeable_mock = self.mergeable_patcher.start()
1068 1066 self.mergeable_mock.return_value = {
1069 1067 'rhodecode_pr_merge_enabled': value}
1070 1068
1071 1069 def cleanup(self):
1072 1070 # In case the source repository is already cleaned up, the pull
1073 1071 # request will already be deleted.
1074 1072 pull_request = PullRequest().get(self.pull_request_id)
1075 1073 if pull_request:
1076 1074 PullRequestModel().delete(pull_request)
1077 1075 Session().commit()
1078 1076
1079 1077 if self.notification_patcher:
1080 1078 self.notification_patcher.stop()
1081 1079
1082 1080 if self.mergeable_patcher:
1083 1081 self.mergeable_patcher.stop()
1084 1082
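# Editorial sketch (not part of the original module): exercising the update
# helpers of PRTestUtility. 'c3' matches the default master repo layout that
# create_pull_request sets up when no commits are passed in.
def _example_pull_request_update(pr_util):
    pr_util.create_pull_request()
    # pull 'c3' into the source repository and fold it into the pull request
    new_commit_id = pr_util.add_one_commit(head='c3')
    assert new_commit_id in pr_util.pull_request.revisions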
1085 1083
1086 1084 @pytest.fixture
1087 1085 def user_admin(pylonsapp):
1088 1086 """
1089 1087 Provides the default admin test user as an instance of `db.User`.
1090 1088 """
1091 1089 user = UserModel().get_by_username(TEST_USER_ADMIN_LOGIN)
1092 1090 return user
1093 1091
1094 1092
1095 1093 @pytest.fixture
1096 1094 def user_regular(pylonsapp):
1097 1095 """
1098 1096 Provides the default regular test user as an instance of `db.User`.
1099 1097 """
1100 1098 user = UserModel().get_by_username(TEST_USER_REGULAR_LOGIN)
1101 1099 return user
1102 1100
1103 1101
1104 1102 @pytest.fixture
1105 1103 def user_util(request, pylonsapp):
1106 1104 """
1107 1105 Provides a wired instance of `UserUtility` with integrated cleanup.
1108 1106 """
1109 1107 utility = UserUtility(test_name=request.node.name)
1110 1108 request.addfinalizer(utility.cleanup)
1111 1109 return utility
1112 1110
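# Editorial sketch (not part of the original module): typical use of
# `user_util` inside a test; everything created here is destroyed again by
# the fixture finalizer. The permission string is an example only.
def _example_user_util_usage(user_util):
    user = user_util.create_user()
    repo = user_util.create_repo()
    user_util.grant_user_permission_to_repo(repo, user, 'repository.write')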
1113 1111
1114 1112 # TODO: johbo: Split this up into utilities per domain or something similar
1115 1113 class UserUtility(object):
1116 1114
1117 1115 def __init__(self, test_name="test"):
1118 1116 self._test_name = self._sanitize_name(test_name)
1119 1117 self.fixture = Fixture()
1120 1118 self.repo_group_ids = []
1121 1119 self.repos_ids = []
1122 1120 self.user_ids = []
1123 1121 self.user_group_ids = []
1124 1122 self.user_repo_permission_ids = []
1125 1123 self.user_group_repo_permission_ids = []
1126 1124 self.user_repo_group_permission_ids = []
1127 1125 self.user_group_repo_group_permission_ids = []
1128 1126 self.user_user_group_permission_ids = []
1129 1127 self.user_group_user_group_permission_ids = []
1130 1128 self.user_permissions = []
1131 1129
1132 1130 def _sanitize_name(self, name):
1133 1131 for char in ['[', ']']:
1134 1132 name = name.replace(char, '_')
1135 1133 return name
1136 1134
1137 1135 def create_repo_group(
1138 1136 self, owner=TEST_USER_ADMIN_LOGIN, auto_cleanup=True):
1139 1137 group_name = "{prefix}_repogroup_{count}".format(
1140 1138 prefix=self._test_name,
1141 1139 count=len(self.repo_group_ids))
1142 1140 repo_group = self.fixture.create_repo_group(
1143 1141 group_name, cur_user=owner)
1144 1142 if auto_cleanup:
1145 1143 self.repo_group_ids.append(repo_group.group_id)
1146 1144 return repo_group
1147 1145
1148 1146 def create_repo(self, owner=TEST_USER_ADMIN_LOGIN, parent=None, auto_cleanup=True):
1149 1147 repo_name = "{prefix}_repository_{count}".format(
1150 1148 prefix=self._test_name,
1151 1149 count=len(self.repos_ids))
1152 1150
1153 1151 repository = self.fixture.create_repo(
1154 1152 repo_name, cur_user=owner, repo_group=parent)
1155 1153 if auto_cleanup:
1156 1154 self.repos_ids.append(repository.repo_id)
1157 1155 return repository
1158 1156
1159 1157 def create_user(self, auto_cleanup=True, **kwargs):
1160 1158 user_name = "{prefix}_user_{count}".format(
1161 1159 prefix=self._test_name,
1162 1160 count=len(self.user_ids))
1163 1161 user = self.fixture.create_user(user_name, **kwargs)
1164 1162 if auto_cleanup:
1165 1163 self.user_ids.append(user.user_id)
1166 1164 return user
1167 1165
1168 1166 def create_user_with_group(self):
1169 1167 user = self.create_user()
1170 1168 user_group = self.create_user_group(members=[user])
1171 1169 return user, user_group
1172 1170
1173 1171 def create_user_group(self, owner=TEST_USER_ADMIN_LOGIN, members=None,
1174 1172 auto_cleanup=True, **kwargs):
1175 1173 group_name = "{prefix}_usergroup_{count}".format(
1176 1174 prefix=self._test_name,
1177 1175 count=len(self.user_group_ids))
1178 1176 user_group = self.fixture.create_user_group(
1179 1177 group_name, cur_user=owner, **kwargs)
1180 1178
1181 1179 if auto_cleanup:
1182 1180 self.user_group_ids.append(user_group.users_group_id)
1183 1181 if members:
1184 1182 for user in members:
1185 1183 UserGroupModel().add_user_to_group(user_group, user)
1186 1184 return user_group
1187 1185
1188 1186 def grant_user_permission(self, user_name, permission_name):
1189 1187 self._inherit_default_user_permissions(user_name, False)
1190 1188 self.user_permissions.append((user_name, permission_name))
1191 1189
1192 1190 def grant_user_permission_to_repo_group(
1193 1191 self, repo_group, user, permission_name):
1194 1192 permission = RepoGroupModel().grant_user_permission(
1195 1193 repo_group, user, permission_name)
1196 1194 self.user_repo_group_permission_ids.append(
1197 1195 (repo_group.group_id, user.user_id))
1198 1196 return permission
1199 1197
1200 1198 def grant_user_group_permission_to_repo_group(
1201 1199 self, repo_group, user_group, permission_name):
1202 1200 permission = RepoGroupModel().grant_user_group_permission(
1203 1201 repo_group, user_group, permission_name)
1204 1202 self.user_group_repo_group_permission_ids.append(
1205 1203 (repo_group.group_id, user_group.users_group_id))
1206 1204 return permission
1207 1205
1208 1206 def grant_user_permission_to_repo(
1209 1207 self, repo, user, permission_name):
1210 1208 permission = RepoModel().grant_user_permission(
1211 1209 repo, user, permission_name)
1212 1210 self.user_repo_permission_ids.append(
1213 1211 (repo.repo_id, user.user_id))
1214 1212 return permission
1215 1213
1216 1214 def grant_user_group_permission_to_repo(
1217 1215 self, repo, user_group, permission_name):
1218 1216 permission = RepoModel().grant_user_group_permission(
1219 1217 repo, user_group, permission_name)
1220 1218 self.user_group_repo_permission_ids.append(
1221 1219 (repo.repo_id, user_group.users_group_id))
1222 1220 return permission
1223 1221
1224 1222 def grant_user_permission_to_user_group(
1225 1223 self, target_user_group, user, permission_name):
1226 1224 permission = UserGroupModel().grant_user_permission(
1227 1225 target_user_group, user, permission_name)
1228 1226 self.user_user_group_permission_ids.append(
1229 1227 (target_user_group.users_group_id, user.user_id))
1230 1228 return permission
1231 1229
1232 1230 def grant_user_group_permission_to_user_group(
1233 1231 self, target_user_group, user_group, permission_name):
1234 1232 permission = UserGroupModel().grant_user_group_permission(
1235 1233 target_user_group, user_group, permission_name)
1236 1234 self.user_group_user_group_permission_ids.append(
1237 1235 (target_user_group.users_group_id, user_group.users_group_id))
1238 1236 return permission
1239 1237
1240 1238 def revoke_user_permission(self, user_name, permission_name):
1241 1239 self._inherit_default_user_permissions(user_name, True)
1242 1240 UserModel().revoke_perm(user_name, permission_name)
1243 1241
1244 1242 def _inherit_default_user_permissions(self, user_name, value):
1245 1243 user = UserModel().get_by_username(user_name)
1246 1244 user.inherit_default_permissions = value
1247 1245 Session().add(user)
1248 1246 Session().commit()
1249 1247
1250 1248 def cleanup(self):
1251 1249 self._cleanup_permissions()
1252 1250 self._cleanup_repos()
1253 1251 self._cleanup_repo_groups()
1254 1252 self._cleanup_user_groups()
1255 1253 self._cleanup_users()
1256 1254
1257 1255 def _cleanup_permissions(self):
1258 1256 if self.user_permissions:
1259 1257 for user_name, permission_name in self.user_permissions:
1260 1258 self.revoke_user_permission(user_name, permission_name)
1261 1259
1262 1260 for permission in self.user_repo_permission_ids:
1263 1261 RepoModel().revoke_user_permission(*permission)
1264 1262
1265 1263 for permission in self.user_group_repo_permission_ids:
1266 1264 RepoModel().revoke_user_group_permission(*permission)
1267 1265
1268 1266 for permission in self.user_repo_group_permission_ids:
1269 1267 RepoGroupModel().revoke_user_permission(*permission)
1270 1268
1271 1269 for permission in self.user_group_repo_group_permission_ids:
1272 1270 RepoGroupModel().revoke_user_group_permission(*permission)
1273 1271
1274 1272 for permission in self.user_user_group_permission_ids:
1275 1273 UserGroupModel().revoke_user_permission(*permission)
1276 1274
1277 1275 for permission in self.user_group_user_group_permission_ids:
1278 1276 UserGroupModel().revoke_user_group_permission(*permission)
1279 1277
1280 1278 def _cleanup_repo_groups(self):
1281 1279 def _repo_group_compare(first_group_id, second_group_id):
1282 1280 """
1283 1281 Gives higher priority to the groups with the most complex paths
1284 1282 """
1285 1283 first_group = RepoGroup.get(first_group_id)
1286 1284 second_group = RepoGroup.get(second_group_id)
1287 1285 first_group_parts = (
1288 1286 len(first_group.group_name.split('/')) if first_group else 0)
1289 1287 second_group_parts = (
1290 1288 len(second_group.group_name.split('/')) if second_group else 0)
1291 1289 return cmp(second_group_parts, first_group_parts)
1292 1290
1293 1291 sorted_repo_group_ids = sorted(
1294 1292 self.repo_group_ids, cmp=_repo_group_compare)
1295 1293 for repo_group_id in sorted_repo_group_ids:
1296 1294 self.fixture.destroy_repo_group(repo_group_id)
1297 1295
1298 1296 def _cleanup_repos(self):
1299 1297 sorted_repos_ids = sorted(self.repos_ids)
1300 1298 for repo_id in sorted_repos_ids:
1301 1299 self.fixture.destroy_repo(repo_id)
1302 1300
1303 1301 def _cleanup_user_groups(self):
1304 1302 def _user_group_compare(first_group_id, second_group_id):
1305 1303 """
1306 1304 Gives higher priority to the groups with the most complex paths
1307 1305 """
1308 1306 first_group = UserGroup.get(first_group_id)
1309 1307 second_group = UserGroup.get(second_group_id)
1310 1308 first_group_parts = (
1311 1309 len(first_group.users_group_name.split('/'))
1312 1310 if first_group else 0)
1313 1311 second_group_parts = (
1314 1312 len(second_group.users_group_name.split('/'))
1315 1313 if second_group else 0)
1316 1314 return cmp(second_group_parts, first_group_parts)
1317 1315
1318 1316 sorted_user_group_ids = sorted(
1319 1317 self.user_group_ids, cmp=_user_group_compare)
1320 1318 for user_group_id in sorted_user_group_ids:
1321 1319 self.fixture.destroy_user_group(user_group_id)
1322 1320
1323 1321 def _cleanup_users(self):
1324 1322 for user_id in self.user_ids:
1325 1323 self.fixture.destroy_user(user_id)
1326 1324
1327 1325
1328 1326 # TODO: Think about moving this into a pytest-pyro package and make it a
1329 1327 # pytest plugin
1330 1328 @pytest.hookimpl(tryfirst=True, hookwrapper=True)
1331 1329 def pytest_runtest_makereport(item, call):
1332 1330 """
1333 1331 Adds the remote traceback to the report if the exception provides it.
1334 1332
1335 1333 VCSServer attaches this information as the attribute `_vcs_server_traceback`
1336 1334 to the exception instance.
1337 1335 """
1338 1336 outcome = yield
1339 1337 report = outcome.get_result()
1340 1338 if call.excinfo:
1341 1339 _add_vcsserver_remote_traceback(report, call.excinfo.value)
1342 1340
1343 1341
1344 1342 def _add_vcsserver_remote_traceback(report, exc):
1345 1343 vcsserver_traceback = getattr(exc, '_vcs_server_traceback', None)
1346 1344
1347 1345 if vcsserver_traceback:
1348 1346 section = 'VCSServer remote traceback ' + report.when
1349 1347 report.sections.append((section, vcsserver_traceback))
1350 1348
1351 1349
1352 1350 @pytest.fixture(scope='session')
1353 1351 def testrun():
1354 1352 return {
1355 1353 'uuid': uuid.uuid4(),
1356 1354 'start': datetime.datetime.utcnow().isoformat(),
1357 1355 'timestamp': int(time.time()),
1358 1356 }
1359 1357
1360 1358
1361 1359 @pytest.fixture(autouse=True)
1362 1360 def collect_appenlight_stats(request, testrun):
1363 1361 """
1364 1362 This fixture reports the memory consumption of single tests.
1365 1363
1366 1364 It gathers data based on `psutil` and sends them to Appenlight. The option
1367 1365 ``--ae`` has to be used to enable this fixture and the API key for your
1368 1366 application has to be provided in ``--ae-key``.
1369 1367 """
1370 1368 try:
1371 1369 # cygwin does not have psutil support yet.
1372 1370 import psutil
1373 1371 except ImportError:
1374 1372 return
1375 1373
1376 1374 if not request.config.getoption('--appenlight'):
1377 1375 return
1378 1376 else:
1379 1377 # Only request the pylonsapp fixture if appenlight tracking is
1380 1378 # enabled. This will speed up a test run of unit tests by 2 to 3
1381 1379 # seconds if appenlight is not enabled.
1382 1380 pylonsapp = request.getfuncargvalue("pylonsapp")
1383 1381 url = '{}/api/logs'.format(request.config.getoption('--appenlight-url'))
1384 1382 client = AppenlightClient(
1385 1383 url=url,
1386 1384 api_key=request.config.getoption('--appenlight-api-key'),
1387 1385 namespace=request.node.nodeid,
1388 1386 request=str(testrun['uuid']),
1389 1387 testrun=testrun)
1390 1388
1391 1389 client.collect({
1392 1390 'message': "Starting",
1393 1391 })
1394 1392
1395 1393 server_and_port = pylonsapp.config['vcs.server']
1396 1394 protocol = pylonsapp.config['vcs.server.protocol']
1397 1395 server = create_vcsserver_proxy(server_and_port, protocol)
1398 1396 with server:
1399 1397 vcs_pid = server.get_pid()
1400 1398 server.run_gc()
1401 1399 vcs_process = psutil.Process(vcs_pid)
1402 1400 mem = vcs_process.memory_info()
1403 1401 client.tag_before('vcsserver.rss', mem.rss)
1404 1402 client.tag_before('vcsserver.vms', mem.vms)
1405 1403
1406 1404 test_process = psutil.Process()
1407 1405 mem = test_process.memory_info()
1408 1406 client.tag_before('test.rss', mem.rss)
1409 1407 client.tag_before('test.vms', mem.vms)
1410 1408
1411 1409 client.tag_before('time', time.time())
1412 1410
1413 1411 @request.addfinalizer
1414 1412 def send_stats():
1415 1413 client.tag_after('time', time.time())
1416 1414 with server:
1417 1415 gc_stats = server.run_gc()
1418 1416 for tag, value in gc_stats.items():
1419 1417 client.tag_after(tag, value)
1420 1418 mem = vcs_process.memory_info()
1421 1419 client.tag_after('vcsserver.rss', mem.rss)
1422 1420 client.tag_after('vcsserver.vms', mem.vms)
1423 1421
1424 1422 mem = test_process.memory_info()
1425 1423 client.tag_after('test.rss', mem.rss)
1426 1424 client.tag_after('test.vms', mem.vms)
1427 1425
1428 1426 client.collect({
1429 1427 'message': "Finished",
1430 1428 })
1431 1429 client.send_stats()
1432 1430
1433 1431 return client
1434 1432
1435 1433
1436 1434 class AppenlightClient(object):
1437 1435
1438 1436 url_template = '{url}?protocol_version=0.5'
1439 1437
1440 1438 def __init__(
1441 1439 self, url, api_key, add_server=True, add_timestamp=True,
1442 1440 namespace=None, request=None, testrun=None):
1443 1441 self.url = self.url_template.format(url=url)
1444 1442 self.api_key = api_key
1445 1443 self.add_server = add_server
1446 1444 self.add_timestamp = add_timestamp
1447 1445 self.namespace = namespace
1448 1446 self.request = request
1449 1447 self.server = socket.getfqdn(socket.gethostname())
1450 1448 self.tags_before = {}
1451 1449 self.tags_after = {}
1452 1450 self.stats = []
1453 1451 self.testrun = testrun or {}
1454 1452
1455 1453 def tag_before(self, tag, value):
1456 1454 self.tags_before[tag] = value
1457 1455
1458 1456 def tag_after(self, tag, value):
1459 1457 self.tags_after[tag] = value
1460 1458
1461 1459 def collect(self, data):
1462 1460 if self.add_server:
1463 1461 data.setdefault('server', self.server)
1464 1462 if self.add_timestamp:
1465 1463 data.setdefault('date', datetime.datetime.utcnow().isoformat())
1466 1464 if self.namespace:
1467 1465 data.setdefault('namespace', self.namespace)
1468 1466 if self.request:
1469 1467 data.setdefault('request', self.request)
1470 1468 self.stats.append(data)
1471 1469
1472 1470 def send_stats(self):
1473 1471 tags = [
1474 1472 ('testrun', self.request),
1475 1473 ('testrun.start', self.testrun['start']),
1476 1474 ('testrun.timestamp', self.testrun['timestamp']),
1477 1475 ('test', self.namespace),
1478 1476 ]
1479 1477 for key, value in self.tags_before.items():
1480 1478 tags.append((key + '.before', value))
1481 1479 try:
1482 1480 delta = self.tags_after[key] - value
1483 1481 tags.append((key + '.delta', delta))
1484 1482 except Exception:
1485 1483 pass
1486 1484 for key, value in self.tags_after.items():
1487 1485 tags.append((key + '.after', value))
1488 1486 self.collect({
1489 1487 'message': "Collected tags",
1490 1488 'tags': tags,
1491 1489 })
1492 1490
1493 1491 response = requests.post(
1494 1492 self.url,
1495 1493 headers={
1496 1494 'X-appenlight-api-key': self.api_key},
1497 1495 json=self.stats,
1498 1496 )
1499 1497
1500 1498 if response.status_code != 200:
1501 1499 pprint.pprint(self.stats)
1502 1500 print response.headers
1503 1501 print response.text
1504 1502 raise Exception('Sending to appenlight failed')
1505 1503
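# Editorial sketch (not part of the original module): how AppenlightClient is
# driven by the fixture above. URL, API key, namespace and the testrun dict
# are placeholders; send_stats() would POST the collected payload and needs a
# reachable Appenlight endpoint.
def _example_appenlight_client_usage():
    client = AppenlightClient(
        url='https://appenlight.example.com/api/logs',
        api_key='PLACEHOLDER-API-KEY',
        namespace='tests/test_example.py::test_something',
        request='00000000-0000-0000-0000-000000000000',
        testrun={'uuid': 'placeholder', 'start': '2017-01-01T00:00:00',
                 'timestamp': 0})
    client.tag_before('time', time.time())
    # ... run the code under measurement here ...
    client.tag_after('time', time.time())
    client.collect({'message': 'Finished'})
    return client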
1506 1504
1507 1505 @pytest.fixture
1508 1506 def gist_util(request, pylonsapp):
1509 1507 """
1510 1508 Provides a wired instance of `GistUtility` with integrated cleanup.
1511 1509 """
1512 1510 utility = GistUtility()
1513 1511 request.addfinalizer(utility.cleanup)
1514 1512 return utility
1515 1513
1516 1514
1517 1515 class GistUtility(object):
1518 1516 def __init__(self):
1519 1517 self.fixture = Fixture()
1520 1518 self.gist_ids = []
1521 1519
1522 1520 def create_gist(self, **kwargs):
1523 1521 gist = self.fixture.create_gist(**kwargs)
1524 1522 self.gist_ids.append(gist.gist_id)
1525 1523 return gist
1526 1524
1527 1525 def cleanup(self):
1528 1526 for id_ in self.gist_ids:
1529 1527 self.fixture.destroy_gists(str(id_))
1530 1528
1531 1529
1532 1530 @pytest.fixture
1533 1531 def enabled_backends(request):
1534 1532 backends = request.config.option.backends
1535 1533 return backends[:]
1536 1534
1537 1535
1538 1536 @pytest.fixture
1539 1537 def settings_util(request):
1540 1538 """
1541 1539 Provides a wired instance of `SettingsUtility` with integrated cleanup.
1542 1540 """
1543 1541 utility = SettingsUtility()
1544 1542 request.addfinalizer(utility.cleanup)
1545 1543 return utility
1546 1544
1547 1545
1548 1546 class SettingsUtility(object):
1549 1547 def __init__(self):
1550 1548 self.rhodecode_ui_ids = []
1551 1549 self.rhodecode_setting_ids = []
1552 1550 self.repo_rhodecode_ui_ids = []
1553 1551 self.repo_rhodecode_setting_ids = []
1554 1552
1555 1553 def create_repo_rhodecode_ui(
1556 1554 self, repo, section, value, key=None, active=True, cleanup=True):
1557 1555 key = key or hashlib.sha1(
1558 1556 '{}{}{}'.format(section, value, repo.repo_id)).hexdigest()
1559 1557
1560 1558 setting = RepoRhodeCodeUi()
1561 1559 setting.repository_id = repo.repo_id
1562 1560 setting.ui_section = section
1563 1561 setting.ui_value = value
1564 1562 setting.ui_key = key
1565 1563 setting.ui_active = active
1566 1564 Session().add(setting)
1567 1565 Session().commit()
1568 1566
1569 1567 if cleanup:
1570 1568 self.repo_rhodecode_ui_ids.append(setting.ui_id)
1571 1569 return setting
1572 1570
1573 1571 def create_rhodecode_ui(
1574 1572 self, section, value, key=None, active=True, cleanup=True):
1575 1573 key = key or hashlib.sha1('{}{}'.format(section, value)).hexdigest()
1576 1574
1577 1575 setting = RhodeCodeUi()
1578 1576 setting.ui_section = section
1579 1577 setting.ui_value = value
1580 1578 setting.ui_key = key
1581 1579 setting.ui_active = active
1582 1580 Session().add(setting)
1583 1581 Session().commit()
1584 1582
1585 1583 if cleanup:
1586 1584 self.rhodecode_ui_ids.append(setting.ui_id)
1587 1585 return setting
1588 1586
1589 1587 def create_repo_rhodecode_setting(
1590 1588 self, repo, name, value, type_, cleanup=True):
1591 1589 setting = RepoRhodeCodeSetting(
1592 1590 repo.repo_id, key=name, val=value, type=type_)
1593 1591 Session().add(setting)
1594 1592 Session().commit()
1595 1593
1596 1594 if cleanup:
1597 1595 self.repo_rhodecode_setting_ids.append(setting.app_settings_id)
1598 1596 return setting
1599 1597
1600 1598 def create_rhodecode_setting(self, name, value, type_, cleanup=True):
1601 1599 setting = RhodeCodeSetting(key=name, val=value, type=type_)
1602 1600 Session().add(setting)
1603 1601 Session().commit()
1604 1602
1605 1603 if cleanup:
1606 1604 self.rhodecode_setting_ids.append(setting.app_settings_id)
1607 1605
1608 1606 return setting
1609 1607
1610 1608 def cleanup(self):
1611 1609 for id_ in self.rhodecode_ui_ids:
1612 1610 setting = RhodeCodeUi.get(id_)
1613 1611 Session().delete(setting)
1614 1612
1615 1613 for id_ in self.rhodecode_setting_ids:
1616 1614 setting = RhodeCodeSetting.get(id_)
1617 1615 Session().delete(setting)
1618 1616
1619 1617 for id_ in self.repo_rhodecode_ui_ids:
1620 1618 setting = RepoRhodeCodeUi.get(id_)
1621 1619 Session().delete(setting)
1622 1620
1623 1621 for id_ in self.repo_rhodecode_setting_ids:
1624 1622 setting = RepoRhodeCodeSetting.get(id_)
1625 1623 Session().delete(setting)
1626 1624
1627 1625 Session().commit()
1628 1626
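# Editorial sketch (not part of the original module): creating a temporary ui
# entry and a temporary setting through `settings_util`; both rows are deleted
# again by the fixture finalizer. Section, key and value are examples only.
def _example_settings_util_usage(settings_util):
    settings_util.create_rhodecode_ui(
        'hooks', 'python:example.hooks.post_push', key='example_hook',
        active=False)
    settings_util.create_rhodecode_setting('example_flag', 'True', 'bool')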
1629 1627
1630 1628 @pytest.fixture
1631 1629 def no_notifications(request):
1632 1630 notification_patcher = mock.patch(
1633 1631 'rhodecode.model.notification.NotificationModel.create')
1634 1632 notification_patcher.start()
1635 1633 request.addfinalizer(notification_patcher.stop)
1636 1634
1637 1635
1638 1636 @pytest.fixture
1639 1637 def silence_action_logger(request):
1640 1638 notification_patcher = mock.patch(
1641 1639 'rhodecode.lib.utils.action_logger')
1642 1640 notification_patcher.start()
1643 1641 request.addfinalizer(notification_patcher.stop)
1644 1642
1645 1643
1646 1644 @pytest.fixture(scope='session')
1647 1645 def repeat(request):
1648 1646 """
1649 1647 The number of repetitions is based on this fixture.
1650 1648
1651 1649 Slower calls may divide it by 10 or 100. The value is chosen so that the
1652 1650 tests are not too slow in our default test suite.
1653 1651 """
1654 1652 return request.config.getoption('--repeat')
1655 1653
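# Editorial sketch (not part of the original module): a performance-flavoured
# test scaling its loop count with the session-wide `repeat` fixture.
def _example_repeated_operation(repeat):
    for _ in xrange(repeat / 10):
        pass  # invoke the operation under test here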
1656 1654
1657 1655 @pytest.fixture
1658 1656 def rhodecode_fixtures():
1659 1657 return Fixture()
1660 1658
1661 1659
1662 1660 @pytest.fixture
1663 1661 def request_stub():
1664 1662 """
1665 1663 Stub request object.
1666 1664 """
1667 1665 request = pyramid.testing.DummyRequest()
1668 1666 request.scheme = 'https'
1669 1667 return request
1670 1668
1671 1669
1672 1670 @pytest.fixture
1673 1671 def config_stub(request, request_stub):
1674 1672 """
1675 1673 Set up pyramid.testing and return the Configurator.
1676 1674 """
1677 1675 config = pyramid.testing.setUp(request=request_stub)
1678 1676
1679 1677 @request.addfinalizer
1680 1678 def cleanup():
1681 1679 pyramid.testing.tearDown()
1682 1680
1683 1681 return config
1684 1682
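# Editorial sketch (not part of the original module): `config_stub` hands a
# test a throw-away pyramid Configurator wired to `request_stub`; the route
# name below is hypothetical.
def _example_config_stub_usage(config_stub, request_stub):
    config_stub.add_route('example_route', '/example')
    assert request_stub.scheme == 'https'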
1685 1683
1686 1684 @pytest.fixture
1687 1685 def StubIntegrationType():
1688 1686 class _StubIntegrationType(IntegrationTypeBase):
1689 1687 """ Test integration type class """
1690 1688
1691 1689 key = 'test'
1692 1690 display_name = 'Test integration type'
1693 1691 description = 'A test integration type for testing'
1694 1692 icon = 'test_icon_html_image'
1695 1693
1696 1694 def __init__(self, settings):
1697 1695 super(_StubIntegrationType, self).__init__(settings)
1698 1696 self.sent_events = [] # for testing
1699 1697
1700 1698 def send_event(self, event):
1701 1699 self.sent_events.append(event)
1702 1700
1703 1701 def settings_schema(self):
1704 1702 class SettingsSchema(colander.Schema):
1705 1703 test_string_field = colander.SchemaNode(
1706 1704 colander.String(),
1707 1705 missing=colander.required,
1708 1706 title='test string field',
1709 1707 )
1710 1708 test_int_field = colander.SchemaNode(
1711 1709 colander.Int(),
1712 1710 title='some integer setting',
1713 1711 )
1714 1712 return SettingsSchema()
1715 1713
1716 1714
1717 1715 integration_type_registry.register_integration_type(_StubIntegrationType)
1718 1716 return _StubIntegrationType
1719 1717
1720 1718 @pytest.fixture
1721 1719 def stub_integration_settings():
1722 1720 return {
1723 1721 'test_string_field': 'some data',
1724 1722 'test_int_field': 100,
1725 1723 }
1726 1724
1727 1725
1728 1726 @pytest.fixture
1729 1727 def repo_integration_stub(request, repo_stub, StubIntegrationType,
1730 1728 stub_integration_settings):
1731 1729 integration = IntegrationModel().create(
1732 1730 StubIntegrationType, settings=stub_integration_settings, enabled=True,
1733 1731 name='test repo integration',
1734 1732 repo=repo_stub, repo_group=None, child_repos_only=None)
1735 1733
1736 1734 @request.addfinalizer
1737 1735 def cleanup():
1738 1736 IntegrationModel().delete(integration)
1739 1737
1740 1738 return integration
1741 1739
1742 1740
1743 1741 @pytest.fixture
1744 1742 def repogroup_integration_stub(request, test_repo_group, StubIntegrationType,
1745 1743 stub_integration_settings):
1746 1744 integration = IntegrationModel().create(
1747 1745 StubIntegrationType, settings=stub_integration_settings, enabled=True,
1748 1746 name='test repogroup integration',
1749 1747 repo=None, repo_group=test_repo_group, child_repos_only=True)
1750 1748
1751 1749 @request.addfinalizer
1752 1750 def cleanup():
1753 1751 IntegrationModel().delete(integration)
1754 1752
1755 1753 return integration
1756 1754
1757 1755
1758 1756 @pytest.fixture
1759 1757 def repogroup_recursive_integration_stub(request, test_repo_group,
1760 1758 StubIntegrationType, stub_integration_settings):
1761 1759 integration = IntegrationModel().create(
1762 1760 StubIntegrationType, settings=stub_integration_settings, enabled=True,
1763 1761 name='test recursive repogroup integration',
1764 1762 repo=None, repo_group=test_repo_group, child_repos_only=False)
1765 1763
1766 1764 @request.addfinalizer
1767 1765 def cleanup():
1768 1766 IntegrationModel().delete(integration)
1769 1767
1770 1768 return integration
1771 1769
1772 1770
1773 1771 @pytest.fixture
1774 1772 def global_integration_stub(request, StubIntegrationType,
1775 1773 stub_integration_settings):
1776 1774 integration = IntegrationModel().create(
1777 1775 StubIntegrationType, settings=stub_integration_settings, enabled=True,
1778 1776 name='test global integration',
1779 1777 repo=None, repo_group=None, child_repos_only=None)
1780 1778
1781 1779 @request.addfinalizer
1782 1780 def cleanup():
1783 1781 IntegrationModel().delete(integration)
1784 1782
1785 1783 return integration
1786 1784
1787 1785
1788 1786 @pytest.fixture
1789 1787 def root_repos_integration_stub(request, StubIntegrationType,
1790 1788 stub_integration_settings):
1791 1789 integration = IntegrationModel().create(
1792 1790 StubIntegrationType, settings=stub_integration_settings, enabled=True,
1793 1791 name='test root repos integration',
1794 1792 repo=None, repo_group=None, child_repos_only=True)
1795 1793
1796 1794 @request.addfinalizer
1797 1795 def cleanup():
1798 1796 IntegrationModel().delete(integration)
1799 1797
1800 1798 return integration
1801 1799
1802 1800
1803 1801 @pytest.fixture
1804 1802 def local_dt_to_utc():
1805 1803 def _factory(dt):
1806 1804 return dt.replace(tzinfo=dateutil.tz.tzlocal()).astimezone(
1807 1805 dateutil.tz.tzutc()).replace(tzinfo=None)
1808 1806 return _factory
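# Editorial sketch (not part of the original module): converting a naive local
# datetime into a naive UTC datetime with the factory returned above.
def _example_local_dt_to_utc_usage(local_dt_to_utc):
    local_noon = datetime.datetime(2017, 1, 1, 12, 0, 0)
    utc_noon = local_dt_to_utc(local_noon)
    assert utc_noon.tzinfo is None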