##// END OF EJS Templates
git-lfs: streaming support for file upload....
marcink -
r1566:63143d9d default
parent child Browse files
Show More
@@ -1,147 +1,162 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2014-2017 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 """
22 22 Implementation of the scm_app interface using raw HTTP communication.
23 23 """
24 24
25 25 import base64
26 26 import logging
27 27 import urlparse
28 28 import wsgiref.util
29 29
30 30 import msgpack
31 31 import requests
32 32 import webob.request
33 33
34 34 import rhodecode
35 35
36 36
37 37 log = logging.getLogger(__name__)
38 38
39 39
def create_git_wsgi_app(repo_path, repo_name, config):
    """Build a WSGI proxy application forwarding git traffic to the VCSServer."""
    git_stream_url = _vcs_streaming_url() + 'git/'
    return VcsHttpProxy(git_stream_url, repo_path, repo_name, config)
43 43
44 44
def create_hg_wsgi_app(repo_path, repo_name, config):
    """Build a WSGI proxy application forwarding mercurial traffic to the VCSServer."""
    hg_stream_url = _vcs_streaming_url() + 'hg/'
    return VcsHttpProxy(hg_stream_url, repo_path, repo_name, config)
48 48
49 49
def _vcs_streaming_url():
    """Base URL of the VCSServer streaming endpoint, taken from the app config."""
    return 'http://{}/stream/'.format(rhodecode.CONFIG['vcs.server'])
53 53
54 54
55 55 # TODO: johbo: Avoid the global.
56 56 session = requests.Session()
57 57 # Requests speedup, avoid reading .netrc and similar
58 58 session.trust_env = False
59 59
60 60 # prevent urllib3 spawning our logs.
61 61 logging.getLogger("requests.packages.urllib3.connectionpool").setLevel(
62 62 logging.WARNING)
63 63
64 64
class VcsHttpProxy(object):
    """
    A WSGI application which proxies vcs requests.

    The goal is to shuffle the data around without touching it. The only
    exception is the extra data from the config object which we send to the
    server as well.
    """

    def __init__(self, url, repo_path, repo_name, config):
        """
        :param str url: The URL of the VCSServer to call.
        :param str repo_path: Filesystem path of the repository, forwarded
            in the ``X-RC-Repo-Path`` header.
        :param str repo_name: Name of the repository, used to build the
            target URL and forwarded in the ``X-RC-Repo-Name`` header.
        :param config: Extra configuration, msgpack-serialized and sent
            base64-encoded in the ``X-RC-Repo-Config`` header.
        """
        self._url = url
        self._repo_name = repo_name
        self._repo_path = repo_path
        self._config = config
        log.debug(
            "Creating VcsHttpProxy for repo %s, url %s",
            repo_name, url)

    def __call__(self, environ, start_response):
        """
        WSGI entry point: forward the incoming request to the VCSServer and
        relay its response back, streaming request/response bodies when
        possible (see `_maybe_stream_request` / `_maybe_stream_response`).
        """
        config = msgpack.packb(self._config)
        request = webob.request.Request(environ)
        request_headers = request.headers
        request_headers.update({
            # TODO: johbo: Remove this, rely on URL path only
            'X-RC-Repo-Name': self._repo_name,
            'X-RC-Repo-Path': self._repo_path,
            'X-RC-Path-Info': environ['PATH_INFO'],
            # TODO: johbo: Avoid encoding and put this into payload?
            'X-RC-Repo-Config': base64.b64encode(config),
            'X-RC-Locked-Status-Code': rhodecode.CONFIG.get('lock_ret_code')
        })

        method = environ['REQUEST_METHOD']

        # Preserve the query string
        url = self._url
        url = urlparse.urljoin(url, self._repo_name)
        if environ.get('QUERY_STRING'):
            url += '?' + environ['QUERY_STRING']

        # Uses the module level `session` so connections are reused; the
        # request body is streamed when the client sent a chunked upload.
        response = session.request(
            method, url,
            data=_maybe_stream_request(environ),
            headers=request_headers,
            stream=True)

        # Preserve the headers of the response, except hop_by_hop ones
        response_headers = [
            (h, v) for h, v in response.headers.items()
            if not wsgiref.util.is_hop_by_hop(h)
        ]

        # Build status argument for start_response callable.
        status = '{status_code} {reason_phrase}'.format(
            status_code=response.status_code,
            reason_phrase=response.reason)

        start_response(status, response_headers)
        return _maybe_stream_response(response)
127 127
128 128
def _maybe_stream_request(environ):
    """
    Pick the request body to send upstream.

    Returns the raw ``wsgi.input`` file object when the request should be
    streamed (see :func:`_is_request_chunked`), otherwise the fully read
    body.
    """
    path = environ['PATH_INFO']
    stream = _is_request_chunked(environ)
    log.debug('handling request `%s` with stream support: %s', path, stream)

    body = environ['wsgi.input']
    return body if stream else body.read()
134 138
135 139
140 def _is_request_chunked(environ):
141 stream = environ.get('HTTP_TRANSFER_ENCODING', '') == 'chunked'
142 if not stream:
143 # git lfs should stream for PUT requests which are upload
144 stream = ('git-lfs' in environ.get('HTTP_USER_AGENT', '')
145 and environ['REQUEST_METHOD'] == 'PUT')
146 return stream
147
148
def _maybe_stream_response(response):
    """
    Return a WSGI response iterable; chunk-by-chunk when the upstream
    response used chunked transfer-encoding, otherwise as one body.
    """
    stream = _is_chunked(response)
    log.debug('returning response with stream: %s', stream)
    if not stream:
        return [response.content]
    return response.raw.read_chunked()
144 159
145 160
146 161 def _is_chunked(response):
147 162 return response.headers.get('Transfer-Encoding', '') == 'chunked'
@@ -1,1806 +1,1808 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2010-2017 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 import collections
22 22 import datetime
23 23 import hashlib
24 24 import os
25 25 import re
26 26 import pprint
27 27 import shutil
28 28 import socket
29 29 import subprocess32
30 30 import time
31 31 import uuid
32 32 import dateutil.tz
33 33
34 34 import mock
35 35 import pyramid.testing
36 36 import pytest
37 37 import colander
38 38 import requests
39 39
40 40 import rhodecode
41 41 from rhodecode.lib.utils2 import AttributeDict
42 42 from rhodecode.model.changeset_status import ChangesetStatusModel
43 43 from rhodecode.model.comment import CommentsModel
44 44 from rhodecode.model.db import (
45 45 PullRequest, Repository, RhodeCodeSetting, ChangesetStatus, RepoGroup,
46 46 UserGroup, RepoRhodeCodeUi, RepoRhodeCodeSetting, RhodeCodeUi)
47 47 from rhodecode.model.meta import Session
48 48 from rhodecode.model.pull_request import PullRequestModel
49 49 from rhodecode.model.repo import RepoModel
50 50 from rhodecode.model.repo_group import RepoGroupModel
51 51 from rhodecode.model.user import UserModel
52 52 from rhodecode.model.settings import VcsSettingsModel
53 53 from rhodecode.model.user_group import UserGroupModel
54 54 from rhodecode.model.integration import IntegrationModel
55 55 from rhodecode.integrations import integration_type_registry
56 56 from rhodecode.integrations.types.base import IntegrationTypeBase
57 57 from rhodecode.lib.utils import repo2db_mapper
58 58 from rhodecode.lib.vcs import create_vcsserver_proxy
59 59 from rhodecode.lib.vcs.backends import get_backend
60 60 from rhodecode.lib.vcs.nodes import FileNode
61 61 from rhodecode.tests import (
62 62 login_user_session, get_new_dir, utils, TESTS_TMP_PATH,
63 63 TEST_USER_ADMIN_LOGIN, TEST_USER_REGULAR_LOGIN, TEST_USER_REGULAR2_LOGIN,
64 64 TEST_USER_REGULAR_PASS)
65 65 from rhodecode.tests.utils import CustomTestApp
66 66 from rhodecode.tests.fixture import Fixture
67 67
68 68
69 69 def _split_comma(value):
70 70 return value.split(',')
71 71
72 72
def pytest_addoption(parser):
    """Register the RhodeCode specific command line options with pytest."""
    parser.addoption(
        '--keep-tmp-path', action='store_true',
        help="Keep the test temporary directories")
    parser.addoption(
        '--backends', action='store', type=_split_comma,
        default=['git', 'hg', 'svn'],
        help="Select which backends to test for backend specific tests.")
    parser.addoption(
        '--dbs', action='store', type=_split_comma,
        default=['sqlite'],
        help="Select which database to test for database specific tests. "
             "Possible options are sqlite,postgres,mysql")
    parser.addoption(
        '--appenlight', '--ae', action='store_true',
        help="Track statistics in appenlight.")
    parser.addoption(
        '--appenlight-api-key', '--ae-key',
        help="API key for Appenlight.")
    parser.addoption(
        '--appenlight-url', '--ae-url',
        default="https://ae.rhodecode.com",
        help="Appenlight service URL, defaults to https://ae.rhodecode.com")
    parser.addoption(
        '--sqlite-connection-string', action='store',
        default='', help="Connection string for the dbs tests with SQLite")
    parser.addoption(
        '--postgres-connection-string', action='store',
        default='', help="Connection string for the dbs tests with Postgres")
    parser.addoption(
        '--mysql-connection-string', action='store',
        default='', help="Connection string for the dbs tests with MySQL")
    parser.addoption(
        '--repeat', type=int, default=100,
        help="Number of repetitions in performance tests.")
108 108
109 109
def pytest_configure(config):
    """Early pytest configuration hook."""
    # Apply the kombu patch early on, needed for test discovery on Python 2.7.11
    from rhodecode.config import patches
    patches.kombu_1_5_1_python_2_7_11()
114 114
115 115
def pytest_collection_modifyitems(session, config, items):
    """
    Drop collected items whose underlying object sets ``__test__ = False``
    (nose compatibility, used during the transition from nose to pytest).
    """
    items[:] = [item for item in items
                if getattr(item.obj, '__test__', True)]
121 121
122 122
def pytest_generate_tests(metafunc):
    """Parametrize tests over the backends selected via ``--backends``."""
    # Support test generation based on --backend parameter
    if 'backend_alias' in metafunc.fixturenames:
        backends = get_backends_from_metafunc(metafunc)
        scope = None
        if not backends:
            pytest.skip("Not enabled for any of selected backends")
        metafunc.parametrize('backend_alias', backends, scope=scope)
    elif hasattr(metafunc.function, 'backends'):
        # Marked with pytest.mark.backends but not parametrized: still skip
        # when none of the supported backends were requested.
        backends = get_backends_from_metafunc(metafunc)
        if not backends:
            pytest.skip("Not enabled for any of selected backends")
136 136
def get_backends_from_metafunc(metafunc):
    """
    Return the set of backend aliases a test should run against: the
    intersection of the backends requested via ``--backends`` with the
    backends the test declares support for.
    """
    requested_backends = set(metafunc.config.getoption('--backends'))
    if hasattr(metafunc.function, 'backends'):
        # Supported backends by this test function, created from
        # pytest.mark.backends
        supported = metafunc.function.backends.args
    elif hasattr(metafunc.cls, 'backend_alias'):
        # Support class attribute "backend_alias", this is mainly
        # for legacy reasons for tests not yet using pytest.mark.backends
        supported = [metafunc.cls.backend_alias]
    else:
        # No declaration: every requested backend is supported.
        supported = metafunc.config.getoption('--backends')
    return requested_backends & set(supported)
150 150
151 151
@pytest.fixture(scope='session', autouse=True)
def activate_example_rcextensions(request):
    """
    Patch in an example rcextensions module which verifies passed in kwargs.
    """
    from rhodecode.tests.other import example_rcextensions

    old_extensions = rhodecode.EXTENSIONS
    rhodecode.EXTENSIONS = example_rcextensions

    # Restore the original extensions module once the session is over.
    @request.addfinalizer
    def cleanup():
        rhodecode.EXTENSIONS = old_extensions
165 165
166 166
@pytest.fixture
def capture_rcextensions():
    """
    Returns the recorded calls to entry points in rcextensions.
    """
    calls = rhodecode.EXTENSIONS.calls
    calls.clear()
    # Note: At this moment, it is still the empty dict, but that will
    # be filled during the test run and since it is a reference this
    # is enough to make it work.
    return calls
178 178
179 179
@pytest.fixture(scope='session')
def http_environ_session():
    """
    Allow to use "http_environ" in session scope.
    """
    # Calls the fixture functions directly: a session scoped fixture
    # cannot depend on the function scoped http_environ/http_host_stub.
    return http_environ(
        http_host_stub=http_host_stub())
187 187
188 188
@pytest.fixture
def http_host_stub():
    """
    Value of HTTP_HOST in the test run.
    """
    # Fixed host:port used consistently across the test WSGI environment.
    return 'test.example.com:80'
195 195
196 196
@pytest.fixture
def http_environ(http_host_stub):
    """
    HTTP extra environ keys.

    Used by the test application and as well for setting up the pylons
    environment. In the case of the fixture "app" it should be possible
    to override this for a specific test case.
    """
    host_parts = http_host_stub.split(':')
    return {
        'SERVER_NAME': host_parts[0],
        'SERVER_PORT': host_parts[1],
        'HTTP_HOST': http_host_stub,
        'HTTP_USER_AGENT': 'rc-test-agent',
        'REQUEST_METHOD': 'GET'
    }
211 213
212 214
@pytest.fixture(scope='function')
def app(request, pylonsapp, http_environ):
    """Wrap the pylons app into a ``CustomTestApp`` for functional tests."""
    app = CustomTestApp(
        pylonsapp,
        extra_environ=http_environ)
    if request.cls:
        # Make the app available as ``self.app`` in class based tests.
        request.cls.app = app
    return app
221 223
222 224
@pytest.fixture(scope='session')
def app_settings(pylonsapp, pylons_config):
    """
    Settings dictionary used to create the app.

    Parses the ini file and passes the result through the sanitize and apply
    defaults mechanism in `rhodecode.config.middleware`.
    """
    from paste.deploy.loadwsgi import loadcontext, APP
    from rhodecode.config.middleware import (
        sanitize_settings_and_apply_defaults)
    context = loadcontext(APP, 'config:' + pylons_config)
    settings = sanitize_settings_and_apply_defaults(context.config())
    return settings
237 239
238 240
@pytest.fixture(scope='session')
def db(app_settings):
    """
    Initializes the database connection.

    It uses the same settings which are used to create the ``pylonsapp`` or
    ``app`` fixtures.
    """
    from rhodecode.config.utils import initialize_database
    initialize_database(app_settings)
249 251
250 252
# Value object pairing the csrf token with the logged-in user of a session.
LoginData = collections.namedtuple('LoginData', ('csrf_token', 'user'))
252 254
253 255
def _autologin_user(app, *args):
    """Log in via the app and return the csrf token together with the user."""
    session = login_user_session(app, *args)
    csrf_token = rhodecode.lib.auth.get_csrf_token(session)
    return LoginData(csrf_token, session['rhodecode_user'])
258 260
259 261
@pytest.fixture
def autologin_user(app):
    """
    Utility fixture which makes sure that the admin user is logged in
    """
    return _autologin_user(app)
266 268
267 269
@pytest.fixture
def autologin_regular_user(app):
    """
    Utility fixture which makes sure that the regular user is logged in
    """
    return _autologin_user(
        app, TEST_USER_REGULAR_LOGIN, TEST_USER_REGULAR_PASS)
275 277
276 278
@pytest.fixture(scope='function')
def csrf_token(request, autologin_user):
    """Csrf token of the auto logged in admin user."""
    return autologin_user.csrf_token
280 282
281 283
@pytest.fixture(scope='function')
def xhr_header(request):
    """Header dict which marks a request as an XMLHttpRequest (ajax) call."""
    return {'HTTP_X_REQUESTED_WITH': 'XMLHttpRequest'}
285 287
286 288
@pytest.fixture
def real_crypto_backend(monkeypatch):
    """
    Switch the production crypto backend on for this test.

    During the test run the crypto backend is replaced with a faster
    implementation based on the MD5 algorithm.
    """
    monkeypatch.setattr(rhodecode, 'is_test', False)
296 298
297 299
@pytest.fixture(scope='class')
def index_location(request, pylonsapp):
    """Path of the full text search index, also exposed on class based tests."""
    index_location = pylonsapp.config['app_conf']['search.location']
    if request.cls:
        request.cls.index_location = index_location
    return index_location
304 306
305 307
@pytest.fixture(scope='session', autouse=True)
def tests_tmp_path(request):
    """
    Create temporary directory to be used during the test session.
    """
    if not os.path.exists(TESTS_TMP_PATH):
        os.makedirs(TESTS_TMP_PATH)

    # --keep-tmp-path leaves the directory behind for post-mortem inspection.
    if not request.config.getoption('--keep-tmp-path'):
        @request.addfinalizer
        def remove_tmp_path():
            shutil.rmtree(TESTS_TMP_PATH)

    return TESTS_TMP_PATH
320 322
321 323
@pytest.fixture
def test_repo_group(request):
    """
    Create a temporary repository group, and destroy it after
    usage automatically
    """
    fixture = Fixture()
    # Timestamp keeps the group name unique across test runs.
    repogroupid = 'test_repo_group_%s' % int(time.time())
    repo_group = fixture.create_repo_group(repogroupid)

    def _cleanup():
        fixture.destroy_repo_group(repogroupid)

    request.addfinalizer(_cleanup)
    return repo_group
337 339
338 340
@pytest.fixture
def test_user_group(request):
    """
    Create a temporary user group, and destroy it after
    usage automatically
    """
    fixture = Fixture()
    # Timestamp keeps the group name unique across test runs.
    usergroupid = 'test_user_group_%s' % int(time.time())
    user_group = fixture.create_user_group(usergroupid)

    def _cleanup():
        fixture.destroy_user_group(user_group)

    request.addfinalizer(_cleanup)
    return user_group
354 356
355 357
@pytest.fixture(scope='session')
def test_repo(request):
    """Session wide container of the read-only test repositories."""
    container = TestRepoContainer()
    request.addfinalizer(container._cleanup)
    return container
361 363
362 364
class TestRepoContainer(object):
    """
    Container for test repositories which are used read only.

    Repositories will be created on demand and re-used during the lifetime
    of this object.

    Usage to get the svn test repository "minimal"::

        test_repo = TestContainer()
        repo = test_repo('minimal', 'svn')

    """

    # Maps backend alias to the function extracting its dump file.
    dump_extractors = {
        'git': utils.extract_git_repo_from_dump,
        'hg': utils.extract_hg_repo_from_dump,
        'svn': utils.extract_svn_repo_from_dump,
    }

    def __init__(self):
        self._cleanup_repos = []
        self._fixture = Fixture()
        # Cache: (dump_name, backend_alias) -> repo_id
        self._repos = {}

    def __call__(self, dump_name, backend_alias):
        # Create on first access, then serve the cached repository.
        key = (dump_name, backend_alias)
        if key not in self._repos:
            repo = self._create_repo(dump_name, backend_alias)
            self._repos[key] = repo.repo_id
        return Repository.get(self._repos[key])

    def _create_repo(self, dump_name, backend_alias):
        # Extract the dump to disk and register it in the database.
        repo_name = '%s-%s' % (backend_alias, dump_name)
        backend_class = get_backend(backend_alias)
        dump_extractor = self.dump_extractors[backend_alias]
        repo_path = dump_extractor(dump_name, repo_name)
        vcs_repo = backend_class(repo_path)
        repo2db_mapper({repo_name: vcs_repo})
        repo = RepoModel().get_by_repo_name(repo_name)
        self._cleanup_repos.append(repo_name)
        return repo

    def _cleanup(self):
        # Destroy in reverse creation order.
        for repo_name in reversed(self._cleanup_repos):
            self._fixture.destroy_repo(repo_name)
409 411
410 412
@pytest.fixture
def backend(request, backend_alias, pylonsapp, test_repo):
    """
    Parametrized fixture which represents a single backend implementation.

    It respects the option `--backends` to focus the test run on specific
    backend implementations.

    It also supports `pytest.mark.xfail_backends` to mark tests as failing
    for specific backends. This is intended as a utility for incremental
    development of a new backend implementation.
    """
    if backend_alias not in request.config.getoption('--backends'):
        pytest.skip("Backend %s not selected." % (backend_alias, ))

    utils.check_xfail_backends(request.node, backend_alias)
    utils.check_skip_backends(request.node, backend_alias)

    repo_name = 'vcs_test_%s' % (backend_alias, )
    backend = Backend(
        alias=backend_alias,
        repo_name=repo_name,
        test_name=request.node.name,
        test_repo_container=test_repo)
    request.addfinalizer(backend.cleanup)
    return backend
437 439
438 440
@pytest.fixture
def backend_git(request, pylonsapp, test_repo):
    """Git flavoured variant of the `backend` fixture."""
    return backend(request, 'git', pylonsapp, test_repo)
442 444
443 445
@pytest.fixture
def backend_hg(request, pylonsapp, test_repo):
    """Mercurial flavoured variant of the `backend` fixture."""
    return backend(request, 'hg', pylonsapp, test_repo)
447 449
448 450
@pytest.fixture
def backend_svn(request, pylonsapp, test_repo):
    """Subversion flavoured variant of the `backend` fixture."""
    return backend(request, 'svn', pylonsapp, test_repo)
452 454
453 455
@pytest.fixture
def backend_random(backend_git):
    """
    Use this to express that your tests need "a backend.

    A few of our tests need a backend, so that we can run the code. This
    fixture is intended to be used for such cases. It will pick one of the
    backends and run the tests.

    The fixture `backend` would run the test multiple times for each
    available backend which is a pure waste of time if the test is
    independent of the backend type.
    """
    # TODO: johbo: Change this to pick a random backend
    return backend_git
469 471
470 472
@pytest.fixture
def backend_stub(backend_git):
    """
    Use this to express that your tests need a backend stub

    TODO: mikhail: Implement a real stub logic instead of returning
    a git backend
    """
    return backend_git
480 482
481 483
@pytest.fixture
def repo_stub(backend_stub):
    """
    Use this to express that your tests need a repository stub
    """
    return backend_stub.create_repo()
488 490
489 491
class Backend(object):
    """
    Represents the test configuration for one supported backend

    Provides easy access to different test repositories based on
    `__getitem__`. Such repositories will only be created once per test
    session.
    """

    invalid_repo_name = re.compile(r'[^0-9a-zA-Z]+')
    _master_repo = None
    # NOTE(review): class level mutable default; shared across instances
    # until `_add_commits_to_repo` rebinds it on the instance.
    _commit_ids = {}

    def __init__(self, alias, repo_name, test_name, test_repo_container):
        self.alias = alias
        self.repo_name = repo_name
        self._cleanup_repos = []
        self._test_name = test_name
        self._test_repo_container = test_repo_container
        # TODO: johbo: Used as a delegate interim. Not yet sure if Backend or
        # Fixture will survive in the end.
        self._fixture = Fixture()

    def __getitem__(self, key):
        # Read-only dump repositories come from the shared session container.
        return self._test_repo_container(key, self.alias)

    @property
    def repo(self):
        """
        Returns the "current" repository. This is the vcs_test repo or the
        last repo which has been created with `create_repo`.
        """
        from rhodecode.model.db import Repository
        return Repository.get_by_repo_name(self.repo_name)

    @property
    def default_branch_name(self):
        # Default branch name of the vcs implementation for this alias.
        VcsRepository = get_backend(self.alias)
        return VcsRepository.DEFAULT_BRANCH_NAME

    @property
    def default_head_id(self):
        """
        Returns the default head id of the underlying backend.

        This will be the default branch name in case the backend does have a
        default branch. In the other cases it will point to a valid head
        which can serve as the base to create a new commit on top of it.
        """
        vcsrepo = self.repo.scm_instance()
        head_id = (
            vcsrepo.DEFAULT_BRANCH_NAME or
            vcsrepo.commit_ids[-1])
        return head_id

    @property
    def commit_ids(self):
        """
        Returns the list of commits for the last created repository
        """
        return self._commit_ids

    def create_master_repo(self, commits):
        """
        Create a repository and remember it as a template.

        This allows to easily create derived repositories to construct
        more complex scenarios for diff, compare and pull requests.

        Returns a commit map which maps from commit message to raw_id.
        """
        self._master_repo = self.create_repo(commits=commits)
        return self._commit_ids

    def create_repo(
            self, commits=None, number_of_commits=0, heads=None,
            name_suffix=u'', **kwargs):
        """
        Create a repository and record it for later cleanup.

        :param commits: Optional. A sequence of dict instances.
           Will add a commit per entry to the new repository.
        :param number_of_commits: Optional. If set to a number, this number of
           commits will be added to the new repository.
        :param heads: Optional. Can be set to a sequence of of commit
           names which shall be pulled in from the master repository.

        """
        self.repo_name = self._next_repo_name() + name_suffix
        repo = self._fixture.create_repo(
            self.repo_name, repo_type=self.alias, **kwargs)
        self._cleanup_repos.append(repo.repo_name)

        commits = commits or [
            {'message': 'Commit %s of %s' % (x, self.repo_name)}
            for x in xrange(number_of_commits)]
        self._add_commits_to_repo(repo.scm_instance(), commits)
        if heads:
            self.pull_heads(repo, heads)

        return repo

    def pull_heads(self, repo, heads):
        """
        Make sure that repo contains all commits mentioned in `heads`
        """
        vcsmaster = self._master_repo.scm_instance()
        vcsrepo = repo.scm_instance()
        # Disable hooks while pulling to avoid side effects in tests.
        vcsrepo.config.clear_section('hooks')
        commit_ids = [self._commit_ids[h] for h in heads]
        vcsrepo.pull(vcsmaster.path, commit_ids=commit_ids)

    def create_fork(self):
        # Fork the "current" repo and make the fork the current one.
        repo_to_fork = self.repo_name
        self.repo_name = self._next_repo_name()
        repo = self._fixture.create_fork(repo_to_fork, self.repo_name)
        self._cleanup_repos.append(self.repo_name)
        return repo

    def new_repo_name(self, suffix=u''):
        # Reserve a fresh unique repo name and register it for cleanup.
        self.repo_name = self._next_repo_name() + suffix
        self._cleanup_repos.append(self.repo_name)
        return self.repo_name

    def _next_repo_name(self):
        # Unique name derived from the test name plus the cleanup counter.
        return u"%s_%s" % (
            self.invalid_repo_name.sub(u'_', self._test_name),
            len(self._cleanup_repos))

    def ensure_file(self, filename, content='Test content\n'):
        """Commit `filename` with `content` into the current repository."""
        assert self._cleanup_repos, "Avoid writing into vcs_test repos"
        commits = [
            {'added': [
                FileNode(filename, content=content),
            ]},
        ]
        self._add_commits_to_repo(self.repo.scm_instance(), commits)

    def enable_downloads(self):
        # Persist the downloads flag on the current repository.
        repo = self.repo
        repo.enable_downloads = True
        Session().add(repo)
        Session().commit()

    def cleanup(self):
        # Destroy created repositories in reverse creation order.
        for repo_name in reversed(self._cleanup_repos):
            self._fixture.destroy_repo(repo_name)

    def _add_commits_to_repo(self, repo, commits):
        commit_ids = _add_commits_to_repo(repo, commits)
        if not commit_ids:
            return
        self._commit_ids = commit_ids

        # Creating refs for Git to allow fetching them from remote repository
        if self.alias == 'git':
            refs = {}
            for message in self._commit_ids:
                # TODO: mikhail: do more special chars replacements
                ref_name = 'refs/test-refs/{}'.format(
                    message.replace(' ', ''))
                refs[ref_name] = self._commit_ids[message]
            self._create_refs(repo, refs)

    def _create_refs(self, repo, refs):
        # Write each ref name -> commit id pair into the repository.
        for ref_name in refs:
            repo.set_refs(ref_name, refs[ref_name])
657 659
658 660
@pytest.fixture
def vcsbackend(request, backend_alias, tests_tmp_path, pylonsapp, test_repo):
    """
    Parametrized fixture which represents a single vcs backend implementation.

    See the fixture `backend` for more details. This one implements the same
    concept, but on vcs level. So it does not provide model instances etc.

    Parameters are generated dynamically, see :func:`pytest_generate_tests`
    for how this works.
    """
    if backend_alias not in request.config.getoption('--backends'):
        pytest.skip("Backend %s not selected." % (backend_alias, ))

    utils.check_xfail_backends(request.node, backend_alias)
    utils.check_skip_backends(request.node, backend_alias)

    repo_name = 'vcs_test_%s' % (backend_alias, )
    repo_path = os.path.join(tests_tmp_path, repo_name)
    backend = VcsBackend(
        alias=backend_alias,
        repo_path=repo_path,
        test_name=request.node.name,
        test_repo_container=test_repo)
    request.addfinalizer(backend.cleanup)
    return backend
685 687
686 688
@pytest.fixture
def vcsbackend_git(request, tests_tmp_path, pylonsapp, test_repo):
    """Git flavoured variant of the `vcsbackend` fixture."""
    return vcsbackend(request, 'git', tests_tmp_path, pylonsapp, test_repo)
690 692
691 693
@pytest.fixture
def vcsbackend_hg(request, tests_tmp_path, pylonsapp, test_repo):
    """Mercurial flavoured variant of the `vcsbackend` fixture."""
    return vcsbackend(request, 'hg', tests_tmp_path, pylonsapp, test_repo)
695 697
696 698
@pytest.fixture
def vcsbackend_svn(request, tests_tmp_path, pylonsapp, test_repo):
    """Subversion flavoured variant of the `vcsbackend` fixture."""
    return vcsbackend(request, 'svn', tests_tmp_path, pylonsapp, test_repo)
700 702
701 703
@pytest.fixture
def vcsbackend_random(vcsbackend_git):
    """
    Use this to express that your tests need "a vcsbackend".

    The fixture `vcsbackend` would run the test multiple times for each
    available vcs backend which is a pure waste of time if the test is
    independent of the vcs backend type.
    """
    # TODO: johbo: Change this to pick a random backend
    return vcsbackend_git
713 715
714 716
@pytest.fixture
def vcsbackend_stub(vcsbackend_git):
    """
    Use this to express that your test just needs a stub of a vcsbackend.

    Plan is to eventually implement an in-memory stub to speed tests up.
    """
    return vcsbackend_git
723 725
724 726
class VcsBackend(object):
    """
    Represents the test configuration for one supported vcs backend.
    """

    invalid_repo_name = re.compile(r'[^0-9a-zA-Z]+')

    def __init__(self, alias, repo_path, test_name, test_repo_container):
        self.alias = alias
        self._repo_path = repo_path
        self._cleanup_repos = []
        self._test_name = test_name
        self._test_repo_container = test_repo_container

    def __getitem__(self, key):
        # Hand out the plain vcs repository object of a test dump.
        return self._test_repo_container(key, self.alias).scm_instance()

    @property
    def repo(self):
        """
        Returns the "current" repository. This is the vcs_test repo of the last
        repo which has been created.
        """
        Repository = get_backend(self.alias)
        return Repository(self._repo_path)

    @property
    def backend(self):
        """
        Returns the backend implementation class.
        """
        return get_backend(self.alias)

    def create_repo(self, commits=None, number_of_commits=0, _clone_repo=None):
        """Create a new repository (optionally cloned) and register cleanup."""
        repo_name = self._next_repo_name()
        self._repo_path = get_new_dir(repo_name)
        repo_class = get_backend(self.alias)
        src_url = None
        if _clone_repo:
            src_url = _clone_repo.path
        repo = repo_class(self._repo_path, create=True, src_url=src_url)
        self._cleanup_repos.append(repo)

        commits = commits or [
            {'message': 'Commit %s of %s' % (x, repo_name)}
            for x in xrange(number_of_commits)]
        _add_commits_to_repo(repo, commits)
        return repo

    def clone_repo(self, repo):
        # Convenience wrapper around `create_repo` for cloning.
        return self.create_repo(_clone_repo=repo)

    def cleanup(self):
        # Remove the working directories of all created repositories.
        for repo in self._cleanup_repos:
            shutil.rmtree(repo.path)

    def new_repo_path(self):
        # Reserve a fresh directory and make it the "current" repo path.
        repo_name = self._next_repo_name()
        self._repo_path = get_new_dir(repo_name)
        return self._repo_path

    def _next_repo_name(self):
        # Unique name derived from the test name plus the cleanup counter.
        return "%s_%s" % (
            self.invalid_repo_name.sub('_', self._test_name),
            len(self._cleanup_repos))
790 792
791 793 def add_file(self, repo, filename, content='Test content\n'):
792 794 imc = repo.in_memory_commit
793 795 imc.add(FileNode(filename, content=content))
794 796 imc.commit(
795 797 message=u'Automatic commit from vcsbackend fixture',
796 798 author=u'Automatic')
797 799
798 800 def ensure_file(self, filename, content='Test content\n'):
799 801 assert self._cleanup_repos, "Avoid writing into vcs_test repos"
800 802 self.add_file(self.repo, filename, content)
801 803
802 804
def _add_commits_to_repo(vcs_repo, commits):
    """
    Create the given `commits` in `vcs_repo` via its in-memory commit API.

    Each element of `commits` is a dict which may contain 'message',
    'author', 'date', 'branch', 'parents' (messages of earlier commits) and
    node lists 'added', 'changed' and 'removed'.

    :return: dict mapping commit message to raw commit id.
    """
    commit_ids = {}
    if not commits:
        return commit_ids

    imc = vcs_repo.in_memory_commit
    commit = None

    for idx, commit in enumerate(commits):
        message = unicode(commit.get('message', 'Commit %s' % idx))

        for node in commit.get('added', []):
            imc.add(FileNode(node.path, content=node.content))
        for node in commit.get('changed', []):
            imc.change(FileNode(node.path, content=node.content))
        for node in commit.get('removed', []):
            imc.remove(FileNode(node.path))

        # Parents are referenced by the message of previously created
        # commits in this same call.
        parents = [
            vcs_repo.get_commit(commit_id=commit_ids[p])
            for p in commit.get('parents', [])]

        # Make sure the commit is never empty: add a synthetic file when no
        # node operations were requested.
        operations = ('added', 'changed', 'removed')
        if not any((commit.get(o) for o in operations)):
            imc.add(FileNode('file_%s' % idx, content=message))

        commit = imc.commit(
            message=message,
            author=unicode(commit.get('author', 'Automatic')),
            date=commit.get('date'),
            branch=commit.get('branch'),
            parents=parents)

        commit_ids[commit.message] = commit.raw_id

    return commit_ids
839 841
840 842
@pytest.fixture
def reposerver(request):
    """
    Allows to serve a backend repository

    Returns a `RepoServer`; started servers are terminated on test teardown.
    """
    repo_server = RepoServer()
    request.addfinalizer(repo_server.cleanup)
    return repo_server
850 852
851 853
class RepoServer(object):
    """
    Utility to serve a local repository for the duration of a test case.

    Supports only Subversion so far.
    """

    # URL under which the served repository is reachable; set by `serve`.
    url = None

    def __init__(self):
        self._cleanup_servers = []

    def serve(self, vcsrepo):
        """Start an svnserve daemon for `vcsrepo`; terminated in `cleanup`."""
        if vcsrepo.alias != 'svn':
            raise TypeError("Backend %s not supported" % vcsrepo.alias)

        proc = subprocess32.Popen(
            ['svnserve', '-d', '--foreground', '--listen-host', 'localhost',
             '--root', vcsrepo.path])
        self._cleanup_servers.append(proc)
        self.url = 'svn://localhost'

    def cleanup(self):
        # NOTE(review): processes are terminated but never waited on, which
        # may leave zombies until the test process exits — confirm acceptable.
        for proc in self._cleanup_servers:
            proc.terminate()
877 879
878 880
@pytest.fixture
def pr_util(backend, request):
    """
    Utility for tests of models and for functional tests around pull requests.

    Returns a :class:`PRTestUtility` instance offering various helpers
    around a single pull request; its state is cleaned up on teardown.

    This fixture uses `backend` and inherits its parameterization.
    """
    utility = PRTestUtility(backend)
    request.addfinalizer(utility.cleanup)
    return utility
897 899
898 900
class PRTestUtility(object):
    """
    Helper around a single pull request for model and functional tests.

    The pull request and its repositories are created lazily by
    `create_pull_request` and removed again in `cleanup`.
    """

    # Defaults on the class; set on the instance once a PR exists.
    pull_request = None
    pull_request_id = None
    mergeable_patcher = None
    mergeable_mock = None
    notification_patcher = None

    def __init__(self, backend):
        self.backend = backend

    def create_pull_request(
            self, commits=None, target_head=None, source_head=None,
            revisions=None, approved=False, author=None, mergeable=False,
            enable_notifications=True, name_suffix=u'', reviewers=None,
            title=u"Test", description=u"Description"):
        """
        Create the pull request under test and return it.

        Only the first call creates anything; later calls return the
        already created pull request.
        """
        self.set_mergeable(mergeable)
        if not enable_notifications:
            # mock notification side effect
            self.notification_patcher = mock.patch(
                'rhodecode.model.notification.NotificationModel.create')
            self.notification_patcher.start()

        if not self.pull_request:
            if not commits:
                # Default setup: three commits, 'c2' is under review.
                commits = [
                    {'message': 'c1'},
                    {'message': 'c2'},
                    {'message': 'c3'},
                ]
                target_head = 'c1'
                source_head = 'c2'
                revisions = ['c2']

            self.commit_ids = self.backend.create_master_repo(commits)
            self.target_repository = self.backend.create_repo(
                heads=[target_head], name_suffix=name_suffix)
            self.source_repository = self.backend.create_repo(
                heads=[source_head], name_suffix=name_suffix)
            self.author = author or UserModel().get_by_username(
                TEST_USER_ADMIN_LOGIN)

            model = PullRequestModel()
            self.create_parameters = {
                'created_by': self.author,
                'source_repo': self.source_repository.repo_name,
                'source_ref': self._default_branch_reference(source_head),
                'target_repo': self.target_repository.repo_name,
                'target_ref': self._default_branch_reference(target_head),
                'revisions': [self.commit_ids[r] for r in revisions],
                'reviewers': reviewers or self._get_reviewers(),
                'title': title,
                'description': description,
            }
            self.pull_request = model.create(**self.create_parameters)
            # A freshly created pull request must not have versions yet.
            assert model.get_versions(self.pull_request) == []

            self.pull_request_id = self.pull_request.pull_request_id

            if approved:
                self.approve()

            Session().add(self.pull_request)
            Session().commit()

        return self.pull_request

    def approve(self):
        """Set an approved status vote for every reviewer."""
        self.create_status_votes(
            ChangesetStatus.STATUS_APPROVED,
            *self.pull_request.reviewers)

    def close(self):
        """Close the pull request as its author."""
        PullRequestModel().close_pull_request(self.pull_request, self.author)

    def _default_branch_reference(self, commit_message):
        # Build a "branch:<name>:<commit_id>" reference for the commit
        # created with the given message.
        reference = '%s:%s:%s' % (
            'branch',
            self.backend.default_branch_name,
            self.commit_ids[commit_message])
        return reference

    def _get_reviewers(self):
        # Default reviewers are the two regular test users.
        model = UserModel()
        return [
            model.get_by_username(TEST_USER_REGULAR_LOGIN),
            model.get_by_username(TEST_USER_REGULAR2_LOGIN),
        ]

    def update_source_repository(self, head=None):
        """Pull `head` (default 'c3') into the source repository."""
        heads = [head or 'c3']
        self.backend.pull_heads(self.source_repository, heads=heads)

    def add_one_commit(self, head=None):
        """Add one commit to the source repo, update the PR, return its id."""
        self.update_source_repository(head=head)
        old_commit_ids = set(self.pull_request.revisions)
        PullRequestModel().update_commits(self.pull_request)
        commit_ids = set(self.pull_request.revisions)
        new_commit_ids = commit_ids - old_commit_ids
        assert len(new_commit_ids) == 1
        return new_commit_ids.pop()

    def remove_one_commit(self):
        """Strip the tip commit from the source repo and update the PR."""
        assert len(self.pull_request.revisions) == 2
        source_vcs = self.source_repository.scm_instance()
        removed_commit_id = source_vcs.commit_ids[-1]

        # TODO: johbo: Git and Mercurial have an inconsistent vcs api here,
        # remove the if once that's sorted out.
        if self.backend.alias == "git":
            kwargs = {'branch_name': self.backend.default_branch_name}
        else:
            kwargs = {}
        source_vcs.strip(removed_commit_id, **kwargs)

        PullRequestModel().update_commits(self.pull_request)
        assert len(self.pull_request.revisions) == 1
        return removed_commit_id

    def create_comment(self, linked_to=None):
        """Create a general comment, optionally linked to a PR version."""
        comment = CommentsModel().create(
            text=u"Test comment",
            repo=self.target_repository.repo_name,
            user=self.author,
            pull_request=self.pull_request)
        assert comment.pull_request_version_id is None

        if linked_to:
            PullRequestModel()._link_comments_to_version(linked_to)

        return comment

    def create_inline_comment(
            self, linked_to=None, line_no=u'n1', file_path='file_1'):
        """Create an inline comment, optionally linked to a PR version."""
        comment = CommentsModel().create(
            text=u"Test comment",
            repo=self.target_repository.repo_name,
            user=self.author,
            line_no=line_no,
            f_path=file_path,
            pull_request=self.pull_request)
        assert comment.pull_request_version_id is None

        if linked_to:
            PullRequestModel()._link_comments_to_version(linked_to)

        return comment

    def create_version_of_pull_request(self):
        """Snapshot the current pull request into a new version."""
        pull_request = self.create_pull_request()
        version = PullRequestModel()._create_version_from_snapshot(
            pull_request)
        return version

    def create_status_votes(self, status, *reviewers):
        """Set `status` on the pull request for each given reviewer."""
        for reviewer in reviewers:
            ChangesetStatusModel().set_status(
                repo=self.pull_request.target_repo,
                status=status,
                user=reviewer.user_id,
                pull_request=self.pull_request)

    def set_mergeable(self, value):
        """Patch the merge setting so that mergeability reports `value`."""
        if not self.mergeable_patcher:
            self.mergeable_patcher = mock.patch.object(
                VcsSettingsModel, 'get_general_settings')
            self.mergeable_mock = self.mergeable_patcher.start()
        self.mergeable_mock.return_value = {
            'rhodecode_pr_merge_enabled': value}

    def cleanup(self):
        # In case the source repository is already cleaned up, the pull
        # request will already be deleted.
        pull_request = PullRequest().get(self.pull_request_id)
        if pull_request:
            PullRequestModel().delete(pull_request)
            Session().commit()

        if self.notification_patcher:
            self.notification_patcher.stop()

        if self.mergeable_patcher:
            self.mergeable_patcher.stop()
1083 1085
@pytest.fixture
def user_admin(pylonsapp):
    """The default admin test user, as a `db.User` instance."""
    return UserModel().get_by_username(TEST_USER_ADMIN_LOGIN)
1091 1093
1092 1094
@pytest.fixture
def user_regular(pylonsapp):
    """The default regular test user, as a `db.User` instance."""
    return UserModel().get_by_username(TEST_USER_REGULAR_LOGIN)
1100 1102
1101 1103
@pytest.fixture
def user_util(request, pylonsapp):
    """
    Provides a wired instance of `UserUtility` with integrated cleanup.
    """
    utility = UserUtility(test_name=request.node.name)
    request.addfinalizer(utility.cleanup)
    return utility
1110 1112
1111 1113
# TODO: johbo: Split this up into utilities per domain or something similar
class UserUtility(object):
    """
    Creates users, user groups, repositories, repository groups and
    permissions for a test and removes them again via `cleanup`.
    """

    def __init__(self, test_name="test"):
        self._test_name = self._sanitize_name(test_name)
        self.fixture = Fixture()
        # Ids of objects created through this utility, used by cleanup.
        self.repo_group_ids = []
        self.repos_ids = []
        self.user_ids = []
        self.user_group_ids = []
        # (object_id, subject_id) pairs of granted permissions.
        self.user_repo_permission_ids = []
        self.user_group_repo_permission_ids = []
        self.user_repo_group_permission_ids = []
        self.user_group_repo_group_permission_ids = []
        self.user_user_group_permission_ids = []
        self.user_group_user_group_permission_ids = []
        self.user_permissions = []

    def _sanitize_name(self, name):
        # Strip characters which are not safe in generated object names.
        for char in ['[', ']']:
            name = name.replace(char, '_')
        return name

    def create_repo_group(
            self, owner=TEST_USER_ADMIN_LOGIN, auto_cleanup=True):
        """Create a repository group, registered for cleanup by default."""
        group_name = "{prefix}_repogroup_{count}".format(
            prefix=self._test_name,
            count=len(self.repo_group_ids))
        repo_group = self.fixture.create_repo_group(
            group_name, cur_user=owner)
        if auto_cleanup:
            self.repo_group_ids.append(repo_group.group_id)
        return repo_group

    def create_repo(self, owner=TEST_USER_ADMIN_LOGIN, parent=None, auto_cleanup=True):
        """Create a repository, optionally inside repo group `parent`."""
        repo_name = "{prefix}_repository_{count}".format(
            prefix=self._test_name,
            count=len(self.repos_ids))

        repository = self.fixture.create_repo(
            repo_name, cur_user=owner, repo_group=parent)
        if auto_cleanup:
            self.repos_ids.append(repository.repo_id)
        return repository

    def create_user(self, auto_cleanup=True, **kwargs):
        """Create a user, registered for cleanup by default."""
        user_name = "{prefix}_user_{count}".format(
            prefix=self._test_name,
            count=len(self.user_ids))
        user = self.fixture.create_user(user_name, **kwargs)
        if auto_cleanup:
            self.user_ids.append(user.user_id)
        return user

    def create_user_with_group(self):
        """Create a user together with a user group containing it."""
        user = self.create_user()
        user_group = self.create_user_group(members=[user])
        return user, user_group

    def create_user_group(self, owner=TEST_USER_ADMIN_LOGIN, members=None,
                          auto_cleanup=True, **kwargs):
        """Create a user group and add the given `members` to it."""
        group_name = "{prefix}_usergroup_{count}".format(
            prefix=self._test_name,
            count=len(self.user_group_ids))
        user_group = self.fixture.create_user_group(
            group_name, cur_user=owner, **kwargs)

        if auto_cleanup:
            self.user_group_ids.append(user_group.users_group_id)
        if members:
            for user in members:
                UserGroupModel().add_user_to_group(user_group, user)
        return user_group

    def grant_user_permission(self, user_name, permission_name):
        """Record a global permission for `user_name`; revoked in cleanup."""
        # NOTE(review): this disables permission inheritance and records the
        # pair for cleanup, but no explicit grant call is visible here —
        # presumably the grant happens elsewhere; verify against callers.
        self._inherit_default_user_permissions(user_name, False)
        self.user_permissions.append((user_name, permission_name))

    def grant_user_permission_to_repo_group(
            self, repo_group, user, permission_name):
        """Grant `user` a permission on `repo_group`; revoked in cleanup."""
        permission = RepoGroupModel().grant_user_permission(
            repo_group, user, permission_name)
        self.user_repo_group_permission_ids.append(
            (repo_group.group_id, user.user_id))
        return permission

    def grant_user_group_permission_to_repo_group(
            self, repo_group, user_group, permission_name):
        """Grant `user_group` a permission on `repo_group`."""
        permission = RepoGroupModel().grant_user_group_permission(
            repo_group, user_group, permission_name)
        self.user_group_repo_group_permission_ids.append(
            (repo_group.group_id, user_group.users_group_id))
        return permission

    def grant_user_permission_to_repo(
            self, repo, user, permission_name):
        """Grant `user` a permission on `repo`; revoked in cleanup."""
        permission = RepoModel().grant_user_permission(
            repo, user, permission_name)
        self.user_repo_permission_ids.append(
            (repo.repo_id, user.user_id))
        return permission

    def grant_user_group_permission_to_repo(
            self, repo, user_group, permission_name):
        """Grant `user_group` a permission on `repo`."""
        permission = RepoModel().grant_user_group_permission(
            repo, user_group, permission_name)
        self.user_group_repo_permission_ids.append(
            (repo.repo_id, user_group.users_group_id))
        return permission

    def grant_user_permission_to_user_group(
            self, target_user_group, user, permission_name):
        """Grant `user` a permission on `target_user_group`."""
        permission = UserGroupModel().grant_user_permission(
            target_user_group, user, permission_name)
        self.user_user_group_permission_ids.append(
            (target_user_group.users_group_id, user.user_id))
        return permission

    def grant_user_group_permission_to_user_group(
            self, target_user_group, user_group, permission_name):
        """Grant `user_group` a permission on `target_user_group`."""
        permission = UserGroupModel().grant_user_group_permission(
            target_user_group, user_group, permission_name)
        self.user_group_user_group_permission_ids.append(
            (target_user_group.users_group_id, user_group.users_group_id))
        return permission

    def revoke_user_permission(self, user_name, permission_name):
        """Revoke a global permission and restore permission inheritance."""
        self._inherit_default_user_permissions(user_name, True)
        UserModel().revoke_perm(user_name, permission_name)

    def _inherit_default_user_permissions(self, user_name, value):
        # Toggle whether the user inherits the default user's permissions.
        user = UserModel().get_by_username(user_name)
        user.inherit_default_permissions = value
        Session().add(user)
        Session().commit()

    def cleanup(self):
        """Remove everything created through this utility, in safe order."""
        self._cleanup_permissions()
        self._cleanup_repos()
        self._cleanup_repo_groups()
        self._cleanup_user_groups()
        self._cleanup_users()

    def _cleanup_permissions(self):
        if self.user_permissions:
            for user_name, permission_name in self.user_permissions:
                self.revoke_user_permission(user_name, permission_name)

        for permission in self.user_repo_permission_ids:
            RepoModel().revoke_user_permission(*permission)

        for permission in self.user_group_repo_permission_ids:
            RepoModel().revoke_user_group_permission(*permission)

        for permission in self.user_repo_group_permission_ids:
            RepoGroupModel().revoke_user_permission(*permission)

        for permission in self.user_group_repo_group_permission_ids:
            RepoGroupModel().revoke_user_group_permission(*permission)

        for permission in self.user_user_group_permission_ids:
            UserGroupModel().revoke_user_permission(*permission)

        for permission in self.user_group_user_group_permission_ids:
            UserGroupModel().revoke_user_group_permission(*permission)

    def _cleanup_repo_groups(self):
        def _repo_group_compare(first_group_id, second_group_id):
            """
            Gives higher priority to the groups with the most complex paths
            """
            first_group = RepoGroup.get(first_group_id)
            second_group = RepoGroup.get(second_group_id)
            first_group_parts = (
                len(first_group.group_name.split('/')) if first_group else 0)
            second_group_parts = (
                len(second_group.group_name.split('/')) if second_group else 0)
            return cmp(second_group_parts, first_group_parts)

        # Destroy nested groups before their parents.
        sorted_repo_group_ids = sorted(
            self.repo_group_ids, cmp=_repo_group_compare)
        for repo_group_id in sorted_repo_group_ids:
            self.fixture.destroy_repo_group(repo_group_id)

    def _cleanup_repos(self):
        sorted_repos_ids = sorted(self.repos_ids)
        for repo_id in sorted_repos_ids:
            self.fixture.destroy_repo(repo_id)

    def _cleanup_user_groups(self):
        def _user_group_compare(first_group_id, second_group_id):
            """
            Gives higher priority to the groups with the most complex paths
            """
            first_group = UserGroup.get(first_group_id)
            second_group = UserGroup.get(second_group_id)
            first_group_parts = (
                len(first_group.users_group_name.split('/'))
                if first_group else 0)
            second_group_parts = (
                len(second_group.users_group_name.split('/'))
                if second_group else 0)
            return cmp(second_group_parts, first_group_parts)

        # Destroy nested groups before their parents.
        sorted_user_group_ids = sorted(
            self.user_group_ids, cmp=_user_group_compare)
        for user_group_id in sorted_user_group_ids:
            self.fixture.destroy_user_group(user_group_id)

    def _cleanup_users(self):
        for user_id in self.user_ids:
            self.fixture.destroy_user(user_id)
1324 1326
1325 1327
# TODO: Think about moving this into a pytest-pyro package and make it a
# pytest plugin
@pytest.hookimpl(tryfirst=True, hookwrapper=True)
def pytest_runtest_makereport(item, call):
    """
    Adding the remote traceback if the exception has this information.

    VCSServer attaches this information as the attribute `_vcs_server_traceback`
    to the exception instance.
    """
    # hookwrapper: let the regular report be produced first, then enrich it.
    outcome = yield
    report = outcome.get_result()
    if call.excinfo:
        _add_vcsserver_remote_traceback(report, call.excinfo.value)
1340 1342
1341 1343
1342 1344 def _add_vcsserver_remote_traceback(report, exc):
1343 1345 vcsserver_traceback = getattr(exc, '_vcs_server_traceback', None)
1344 1346
1345 1347 if vcsserver_traceback:
1346 1348 section = 'VCSServer remote traceback ' + report.when
1347 1349 report.sections.append((section, vcsserver_traceback))
1348 1350
1349 1351
@pytest.fixture(scope='session')
def testrun():
    """Session-wide metadata identifying this test run."""
    run_info = {
        'uuid': uuid.uuid4(),
        'start': datetime.datetime.utcnow().isoformat(),
        'timestamp': int(time.time()),
    }
    return run_info
1357 1359
1358 1360
@pytest.fixture(autouse=True)
def collect_appenlight_stats(request, testrun):
    """
    This fixture reports memory consumption of single tests.

    It gathers data based on `psutil` and sends them to Appenlight. The option
    ``--ae`` has to be used to enable this fixture and the API key for your
    application has to be provided in ``--ae-key``.
    """
    try:
        # cygwin cannot have yet psutil support.
        import psutil
    except ImportError:
        return

    if not request.config.getoption('--appenlight'):
        return
    else:
        # Only request the pylonsapp fixture if appenlight tracking is
        # enabled. This will speed up a test run of unit tests by 2 to 3
        # seconds if appenlight is not enabled.
        pylonsapp = request.getfuncargvalue("pylonsapp")
    url = '{}/api/logs'.format(request.config.getoption('--appenlight-url'))
    client = AppenlightClient(
        url=url,
        api_key=request.config.getoption('--appenlight-api-key'),
        namespace=request.node.nodeid,
        request=str(testrun['uuid']),
        testrun=testrun)

    client.collect({
        'message': "Starting",
    })

    # Sample vcsserver memory before the test runs.
    server_and_port = pylonsapp.config['vcs.server']
    protocol = pylonsapp.config['vcs.server.protocol']
    server = create_vcsserver_proxy(server_and_port, protocol)
    with server:
        vcs_pid = server.get_pid()
        server.run_gc()
        vcs_process = psutil.Process(vcs_pid)
    mem = vcs_process.memory_info()
    client.tag_before('vcsserver.rss', mem.rss)
    client.tag_before('vcsserver.vms', mem.vms)

    # Sample the test process memory before the test runs.
    test_process = psutil.Process()
    mem = test_process.memory_info()
    client.tag_before('test.rss', mem.rss)
    client.tag_before('test.vms', mem.vms)

    client.tag_before('time', time.time())

    @request.addfinalizer
    def send_stats():
        # After the test: sample again, so the client can compute deltas,
        # then push everything to Appenlight.
        client.tag_after('time', time.time())
        with server:
            gc_stats = server.run_gc()
            for tag, value in gc_stats.items():
                client.tag_after(tag, value)
            mem = vcs_process.memory_info()
            client.tag_after('vcsserver.rss', mem.rss)
            client.tag_after('vcsserver.vms', mem.vms)

        mem = test_process.memory_info()
        client.tag_after('test.rss', mem.rss)
        client.tag_after('test.vms', mem.vms)

        client.collect({
            'message': "Finished",
        })
        client.send_stats()

    return client
1432 1434
1433 1435
class AppenlightClient(object):
    """
    Collects tags and log entries during a test run and pushes them to an
    Appenlight server via its HTTP logs API.

    Fixed issues: inherit from `object` (new-style class, consistent with the
    other classes in this module), use function-style single-argument
    `print()` calls (identical output on Python 2, valid on Python 3) and a
    plain `!=` status check.
    """

    url_template = '{url}?protocol_version=0.5'

    def __init__(
            self, url, api_key, add_server=True, add_timestamp=True,
            namespace=None, request=None, testrun=None):
        """
        :param url: base URL of the logs endpoint; the protocol version
            query parameter is appended automatically.
        :param api_key: Appenlight API key, sent as a request header.
        :param add_server: add the local FQDN to every collected entry.
        :param add_timestamp: add the current UTC time to every entry.
        :param namespace: default namespace for collected entries.
        :param request: identifier correlating all entries of one test run.
        :param testrun: dict with 'start' and 'timestamp' of the test run.
        """
        self.url = self.url_template.format(url=url)
        self.api_key = api_key
        self.add_server = add_server
        self.add_timestamp = add_timestamp
        self.namespace = namespace
        self.request = request
        self.server = socket.getfqdn(socket.gethostname())
        self.tags_before = {}
        self.tags_after = {}
        self.stats = []
        self.testrun = testrun or {}

    def tag_before(self, tag, value):
        """Record a tag value sampled before the test ran."""
        self.tags_before[tag] = value

    def tag_after(self, tag, value):
        """Record a tag value sampled after the test ran."""
        self.tags_after[tag] = value

    def collect(self, data):
        """Queue a log entry, filling in the configured default fields."""
        if self.add_server:
            data.setdefault('server', self.server)
        if self.add_timestamp:
            data.setdefault('date', datetime.datetime.utcnow().isoformat())
        if self.namespace:
            data.setdefault('namespace', self.namespace)
        if self.request:
            data.setdefault('request', self.request)
        self.stats.append(data)

    def send_stats(self):
        """
        Push all queued entries to the server.

        Before/after tag pairs are folded into `.before`/`.after`/`.delta`
        tags. Raises an exception when the server does not answer with 200.
        """
        tags = [
            ('testrun', self.request),
            ('testrun.start', self.testrun['start']),
            ('testrun.timestamp', self.testrun['timestamp']),
            ('test', self.namespace),
        ]
        for key, value in self.tags_before.items():
            tags.append((key + '.before', value))
            try:
                # Delta is best-effort: values may not support subtraction.
                delta = self.tags_after[key] - value
                tags.append((key + '.delta', delta))
            except Exception:
                pass
        for key, value in self.tags_after.items():
            tags.append((key + '.after', value))
        self.collect({
            'message': "Collected tags",
            'tags': tags,
        })

        response = requests.post(
            self.url,
            headers={
                'X-appenlight-api-key': self.api_key},
            json=self.stats,
        )

        if response.status_code != 200:
            pprint.pprint(self.stats)
            print(response.headers)
            print(response.text)
            raise Exception('Sending to appenlight failed')
1503 1505
1504 1506
@pytest.fixture
def gist_util(request, pylonsapp):
    """
    Provides a wired instance of `GistUtility` with integrated cleanup.
    """
    utility = GistUtility()
    request.addfinalizer(utility.cleanup)
    return utility
1513 1515
1514 1516
class GistUtility(object):
    """Creates gists through the `Fixture` API and destroys them on cleanup."""

    def __init__(self):
        self.fixture = Fixture()
        self.gist_ids = []

    def create_gist(self, **kwargs):
        """Create a gist and remember its id for later cleanup."""
        new_gist = self.fixture.create_gist(**kwargs)
        self.gist_ids.append(new_gist.gist_id)
        return new_gist

    def cleanup(self):
        """Destroy every gist created through this utility."""
        for gist_id in self.gist_ids:
            self.fixture.destroy_gists(str(gist_id))
1528 1530
1529 1531
@pytest.fixture
def enabled_backends(request):
    """A fresh copy of the backend aliases enabled for this test run."""
    configured_backends = request.config.option.backends
    return configured_backends[:]
1534 1536
1535 1537
@pytest.fixture
def settings_util(request):
    """
    Provides a wired instance of `SettingsUtility` with integrated cleanup.
    """
    utility = SettingsUtility()
    request.addfinalizer(utility.cleanup)
    return utility
1544 1546
1545 1547
class SettingsUtility(object):
    """
    Creates RhodeCode ui and setting rows (global and per repository) and
    deletes them again in `cleanup`.
    """

    def __init__(self):
        # Ids of created rows, per table, used by cleanup.
        self.rhodecode_ui_ids = []
        self.rhodecode_setting_ids = []
        self.repo_rhodecode_ui_ids = []
        self.repo_rhodecode_setting_ids = []

    def create_repo_rhodecode_ui(
            self, repo, section, value, key=None, active=True, cleanup=True):
        """Create a per-repository ui row; `key` defaults to a sha1 digest."""
        key = key or hashlib.sha1(
            '{}{}{}'.format(section, value, repo.repo_id)).hexdigest()

        setting = RepoRhodeCodeUi()
        setting.repository_id = repo.repo_id
        setting.ui_section = section
        setting.ui_value = value
        setting.ui_key = key
        setting.ui_active = active
        Session().add(setting)
        Session().commit()

        if cleanup:
            self.repo_rhodecode_ui_ids.append(setting.ui_id)
        return setting

    def create_rhodecode_ui(
            self, section, value, key=None, active=True, cleanup=True):
        """Create a global ui row; `key` defaults to a sha1 digest."""
        key = key or hashlib.sha1('{}{}'.format(section, value)).hexdigest()

        setting = RhodeCodeUi()
        setting.ui_section = section
        setting.ui_value = value
        setting.ui_key = key
        setting.ui_active = active
        Session().add(setting)
        Session().commit()

        if cleanup:
            self.rhodecode_ui_ids.append(setting.ui_id)
        return setting

    def create_repo_rhodecode_setting(
            self, repo, name, value, type_, cleanup=True):
        """Create a per-repository setting row."""
        setting = RepoRhodeCodeSetting(
            repo.repo_id, key=name, val=value, type=type_)
        Session().add(setting)
        Session().commit()

        if cleanup:
            self.repo_rhodecode_setting_ids.append(setting.app_settings_id)
        return setting

    def create_rhodecode_setting(self, name, value, type_, cleanup=True):
        """Create a global setting row."""
        setting = RhodeCodeSetting(key=name, val=value, type=type_)
        Session().add(setting)
        Session().commit()

        if cleanup:
            self.rhodecode_setting_ids.append(setting.app_settings_id)

        return setting

    def cleanup(self):
        """Delete every row created through this utility."""
        for id_ in self.rhodecode_ui_ids:
            setting = RhodeCodeUi.get(id_)
            Session().delete(setting)

        for id_ in self.rhodecode_setting_ids:
            setting = RhodeCodeSetting.get(id_)
            Session().delete(setting)

        for id_ in self.repo_rhodecode_ui_ids:
            setting = RepoRhodeCodeUi.get(id_)
            Session().delete(setting)

        for id_ in self.repo_rhodecode_setting_ids:
            setting = RepoRhodeCodeSetting.get(id_)
            Session().delete(setting)

        Session().commit()
1626 1628
1627 1629
@pytest.fixture
def no_notifications(request):
    """Disable NotificationModel.create for the duration of a test."""
    patcher = mock.patch(
        'rhodecode.model.notification.NotificationModel.create')
    patcher.start()
    request.addfinalizer(patcher.stop)
1634 1636
1635 1637
@pytest.fixture
def silence_action_logger(request):
    """Disable the action logger for the duration of a test."""
    patcher = mock.patch(
        'rhodecode.lib.utils.action_logger')
    patcher.start()
    request.addfinalizer(patcher.stop)
1642 1644
1643 1645
@pytest.fixture(scope='session')
def repeat(request):
    """
    Number of repetitions, taken from the ``--repeat`` command line option.

    Slower tests may divide this by 10 or 100. The value is chosen so the
    default test suite does not become too slow.
    """
    return request.config.getoption('--repeat')
1653 1655
1654 1656
@pytest.fixture
def rhodecode_fixtures():
    """Provide a fresh :class:`Fixture` helper instance."""
    fixture_util = Fixture()
    return fixture_util
1658 1660
1659 1661
@pytest.fixture
def request_stub():
    """
    Stub request object (pyramid DummyRequest with the https scheme).
    """
    stub = pyramid.testing.DummyRequest()
    stub.scheme = 'https'
    return stub
1668 1670
1669 1671
@pytest.fixture
def config_stub(request, request_stub):
    """
    Set up pyramid.testing and return the Configurator.

    Tears pyramid.testing down again once the test finishes.
    """
    config = pyramid.testing.setUp(request=request_stub)
    request.addfinalizer(pyramid.testing.tearDown)

    return config
1682 1684
1683 1685
@pytest.fixture
def StubIntegrationType():
    """
    Define, register and return a minimal integration type for tests.

    The returned class records every event passed to ``send_event`` in
    ``sent_events`` so tests can inspect what was delivered.
    """
    class _StubIntegrationType(IntegrationTypeBase):
        """ Test integration type class """

        key = 'test'
        display_name = 'Test integration type'
        description = 'A test integration type for testing'
        icon = 'test_icon_html_image'

        def __init__(self, settings):
            super(_StubIntegrationType, self).__init__(settings)
            self.sent_events = []  # for testing

        def send_event(self, event):
            # Record instead of delivering, so tests can assert on events.
            self.sent_events.append(event)

        def settings_schema(self):
            # Matches the keys provided by the stub_integration_settings
            # fixture: one required string, one integer.
            class SettingsSchema(colander.Schema):
                test_string_field = colander.SchemaNode(
                    colander.String(),
                    missing=colander.required,
                    title='test string field',
                )
                test_int_field = colander.SchemaNode(
                    colander.Int(),
                    title='some integer setting',
                )
            return SettingsSchema()


    integration_type_registry.register_integration_type(_StubIntegrationType)
    return _StubIntegrationType
1717 1719
@pytest.fixture
def stub_integration_settings():
    """Settings payload matching the stub integration type's schema."""
    settings = {
        'test_string_field': 'some data',
        'test_int_field': 100,
    }
    return settings
1724 1726
1725 1727
@pytest.fixture
def repo_integration_stub(request, repo_stub, StubIntegrationType,
                          stub_integration_settings):
    """Repository-scoped stub integration, deleted again after the test."""
    integration = IntegrationModel().create(
        StubIntegrationType, settings=stub_integration_settings, enabled=True,
        name='test repo integration',
        repo=repo_stub, repo_group=None, child_repos_only=None)

    request.addfinalizer(lambda: IntegrationModel().delete(integration))

    return integration
1739 1741
1740 1742
@pytest.fixture
def repogroup_integration_stub(request, test_repo_group, StubIntegrationType,
                               stub_integration_settings):
    """
    Repo-group-scoped stub integration (direct children only), deleted
    again after the test.
    """
    integration = IntegrationModel().create(
        StubIntegrationType, settings=stub_integration_settings, enabled=True,
        name='test repogroup integration',
        repo=None, repo_group=test_repo_group, child_repos_only=True)

    request.addfinalizer(lambda: IntegrationModel().delete(integration))

    return integration
1754 1756
1755 1757
@pytest.fixture
def repogroup_recursive_integration_stub(request, test_repo_group,
                                         StubIntegrationType,
                                         stub_integration_settings):
    """
    Repo-group-scoped stub integration applying recursively to all child
    repos, deleted again after the test.
    """
    integration = IntegrationModel().create(
        StubIntegrationType, settings=stub_integration_settings, enabled=True,
        name='test recursive repogroup integration',
        repo=None, repo_group=test_repo_group, child_repos_only=False)

    request.addfinalizer(lambda: IntegrationModel().delete(integration))

    return integration
1769 1771
1770 1772
@pytest.fixture
def global_integration_stub(request, StubIntegrationType,
                            stub_integration_settings):
    """Globally-scoped stub integration, deleted again after the test."""
    integration = IntegrationModel().create(
        StubIntegrationType, settings=stub_integration_settings, enabled=True,
        name='test global integration',
        repo=None, repo_group=None, child_repos_only=None)

    request.addfinalizer(lambda: IntegrationModel().delete(integration))

    return integration
1784 1786
1785 1787
@pytest.fixture
def root_repos_integration_stub(request, StubIntegrationType,
                                stub_integration_settings):
    """
    Global stub integration limited to root repositories
    (``child_repos_only=True``), deleted again after the test.
    """
    integration = IntegrationModel().create(
        StubIntegrationType, settings=stub_integration_settings, enabled=True,
        # Fixed copy-paste defect: this previously reused the name
        # 'test global integration' from global_integration_stub, making
        # the two fixtures indistinguishable by name.
        name='test root repos integration',
        repo=None, repo_group=None, child_repos_only=True)

    @request.addfinalizer
    def cleanup():
        IntegrationModel().delete(integration)

    return integration
1799 1801
1800 1802
@pytest.fixture
def local_dt_to_utc():
    """
    Provide a converter turning a naive local-time datetime into a naive
    UTC datetime.
    """
    def _convert(dt):
        localized = dt.replace(tzinfo=dateutil.tz.tzlocal())
        return localized.astimezone(dateutil.tz.tzutc()).replace(tzinfo=None)
    return _convert
General Comments 0
You need to be logged in to leave comments. Login now