##// END OF EJS Templates
tests: fixed tests after removing hardcoded timezone in tests.
marcink -
r1351:8e93f34d default
parent child Browse files
Show More
@@ -1,1816 +1,1825 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2010-2017 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 import collections
22 22 import datetime
23 23 import hashlib
24 24 import os
25 25 import re
26 26 import pprint
27 27 import shutil
28 28 import socket
29 29 import subprocess32
30 30 import time
31 31 import uuid
32 import dateutil.tz
32 33
33 34 import mock
34 35 import pyramid.testing
35 36 import pytest
36 37 import colander
37 38 import requests
38 39
39 40 import rhodecode
40 41 from rhodecode.lib.utils2 import AttributeDict
41 42 from rhodecode.model.changeset_status import ChangesetStatusModel
42 43 from rhodecode.model.comment import CommentsModel
43 44 from rhodecode.model.db import (
44 45 PullRequest, Repository, RhodeCodeSetting, ChangesetStatus, RepoGroup,
45 46 UserGroup, RepoRhodeCodeUi, RepoRhodeCodeSetting, RhodeCodeUi)
46 47 from rhodecode.model.meta import Session
47 48 from rhodecode.model.pull_request import PullRequestModel
48 49 from rhodecode.model.repo import RepoModel
49 50 from rhodecode.model.repo_group import RepoGroupModel
50 51 from rhodecode.model.user import UserModel
51 52 from rhodecode.model.settings import VcsSettingsModel
52 53 from rhodecode.model.user_group import UserGroupModel
53 54 from rhodecode.model.integration import IntegrationModel
54 55 from rhodecode.integrations import integration_type_registry
55 56 from rhodecode.integrations.types.base import IntegrationTypeBase
56 57 from rhodecode.lib.utils import repo2db_mapper
57 58 from rhodecode.lib.vcs import create_vcsserver_proxy
58 59 from rhodecode.lib.vcs.backends import get_backend
59 60 from rhodecode.lib.vcs.nodes import FileNode
60 61 from rhodecode.tests import (
61 62 login_user_session, get_new_dir, utils, TESTS_TMP_PATH,
62 63 TEST_USER_ADMIN_LOGIN, TEST_USER_REGULAR_LOGIN, TEST_USER_REGULAR2_LOGIN,
63 64 TEST_USER_REGULAR_PASS)
64 65 from rhodecode.tests.utils import CustomTestApp
65 66 from rhodecode.tests.fixture import Fixture
66 67
67 68
68 69 def _split_comma(value):
69 70 return value.split(',')
70 71
71 72
72 73 def pytest_addoption(parser):
73 74 parser.addoption(
74 75 '--keep-tmp-path', action='store_true',
75 76 help="Keep the test temporary directories")
76 77 parser.addoption(
77 78 '--backends', action='store', type=_split_comma,
78 79 default=['git', 'hg', 'svn'],
79 80 help="Select which backends to test for backend specific tests.")
80 81 parser.addoption(
81 82 '--dbs', action='store', type=_split_comma,
82 83 default=['sqlite'],
83 84 help="Select which database to test for database specific tests. "
84 85 "Possible options are sqlite,postgres,mysql")
85 86 parser.addoption(
86 87 '--appenlight', '--ae', action='store_true',
87 88 help="Track statistics in appenlight.")
88 89 parser.addoption(
89 90 '--appenlight-api-key', '--ae-key',
90 91 help="API key for Appenlight.")
91 92 parser.addoption(
92 93 '--appenlight-url', '--ae-url',
93 94 default="https://ae.rhodecode.com",
94 95 help="Appenlight service URL, defaults to https://ae.rhodecode.com")
95 96 parser.addoption(
96 97 '--sqlite-connection-string', action='store',
97 98 default='', help="Connection string for the dbs tests with SQLite")
98 99 parser.addoption(
99 100 '--postgres-connection-string', action='store',
100 101 default='', help="Connection string for the dbs tests with Postgres")
101 102 parser.addoption(
102 103 '--mysql-connection-string', action='store',
103 104 default='', help="Connection string for the dbs tests with MySQL")
104 105 parser.addoption(
105 106 '--repeat', type=int, default=100,
106 107 help="Number of repetitions in performance tests.")
107 108
108 109
109 110 def pytest_configure(config):
110 111 # Appy the kombu patch early on, needed for test discovery on Python 2.7.11
111 112 from rhodecode.config import patches
112 113 patches.kombu_1_5_1_python_2_7_11()
113 114
114 115
115 116 def pytest_collection_modifyitems(session, config, items):
116 117 # nottest marked, compare nose, used for transition from nose to pytest
117 118 remaining = [
118 119 i for i in items if getattr(i.obj, '__test__', True)]
119 120 items[:] = remaining
120 121
121 122
122 123 def pytest_generate_tests(metafunc):
123 124 # Support test generation based on --backend parameter
124 125 if 'backend_alias' in metafunc.fixturenames:
125 126 backends = get_backends_from_metafunc(metafunc)
126 127 scope = None
127 128 if not backends:
128 129 pytest.skip("Not enabled for any of selected backends")
129 130 metafunc.parametrize('backend_alias', backends, scope=scope)
130 131 elif hasattr(metafunc.function, 'backends'):
131 132 backends = get_backends_from_metafunc(metafunc)
132 133 if not backends:
133 134 pytest.skip("Not enabled for any of selected backends")
134 135
135 136
136 137 def get_backends_from_metafunc(metafunc):
137 138 requested_backends = set(metafunc.config.getoption('--backends'))
138 139 if hasattr(metafunc.function, 'backends'):
139 140 # Supported backends by this test function, created from
140 141 # pytest.mark.backends
141 142 backends = metafunc.function.backends.args
142 143 elif hasattr(metafunc.cls, 'backend_alias'):
143 144 # Support class attribute "backend_alias", this is mainly
144 145 # for legacy reasons for tests not yet using pytest.mark.backends
145 146 backends = [metafunc.cls.backend_alias]
146 147 else:
147 148 backends = metafunc.config.getoption('--backends')
148 149 return requested_backends.intersection(backends)
149 150
150 151
151 152 @pytest.fixture(scope='session', autouse=True)
152 153 def activate_example_rcextensions(request):
153 154 """
154 155 Patch in an example rcextensions module which verifies passed in kwargs.
155 156 """
156 157 from rhodecode.tests.other import example_rcextensions
157 158
158 159 old_extensions = rhodecode.EXTENSIONS
159 160 rhodecode.EXTENSIONS = example_rcextensions
160 161
161 162 @request.addfinalizer
162 163 def cleanup():
163 164 rhodecode.EXTENSIONS = old_extensions
164 165
165 166
166 167 @pytest.fixture
167 168 def capture_rcextensions():
168 169 """
169 170 Returns the recorded calls to entry points in rcextensions.
170 171 """
171 172 calls = rhodecode.EXTENSIONS.calls
172 173 calls.clear()
173 174 # Note: At this moment, it is still the empty dict, but that will
174 175 # be filled during the test run and since it is a reference this
175 176 # is enough to make it work.
176 177 return calls
177 178
178 179
179 180 @pytest.fixture(scope='session')
180 181 def http_environ_session():
181 182 """
182 183 Allow to use "http_environ" in session scope.
183 184 """
184 185 return http_environ(
185 186 http_host_stub=http_host_stub())
186 187
187 188
188 189 @pytest.fixture
189 190 def http_host_stub():
190 191 """
191 192 Value of HTTP_HOST in the test run.
192 193 """
193 194 return 'test.example.com:80'
194 195
195 196
196 197 @pytest.fixture
197 198 def http_environ(http_host_stub):
198 199 """
199 200 HTTP extra environ keys.
200 201
201 202 User by the test application and as well for setting up the pylons
202 203 environment. In the case of the fixture "app" it should be possible
203 204 to override this for a specific test case.
204 205 """
205 206 return {
206 207 'SERVER_NAME': http_host_stub.split(':')[0],
207 208 'SERVER_PORT': http_host_stub.split(':')[1],
208 209 'HTTP_HOST': http_host_stub,
209 210 }
210 211
211 212
212 213 @pytest.fixture(scope='function')
213 214 def app(request, pylonsapp, http_environ):
214 215
215 216
216 217 app = CustomTestApp(
217 218 pylonsapp,
218 219 extra_environ=http_environ)
219 220 if request.cls:
220 221 request.cls.app = app
221 222 return app
222 223
223 224
224 225 @pytest.fixture(scope='session')
225 226 def app_settings(pylonsapp, pylons_config):
226 227 """
227 228 Settings dictionary used to create the app.
228 229
229 230 Parses the ini file and passes the result through the sanitize and apply
230 231 defaults mechanism in `rhodecode.config.middleware`.
231 232 """
232 233 from paste.deploy.loadwsgi import loadcontext, APP
233 234 from rhodecode.config.middleware import (
234 235 sanitize_settings_and_apply_defaults)
235 236 context = loadcontext(APP, 'config:' + pylons_config)
236 237 settings = sanitize_settings_and_apply_defaults(context.config())
237 238 return settings
238 239
239 240
240 241 @pytest.fixture(scope='session')
241 242 def db(app_settings):
242 243 """
243 244 Initializes the database connection.
244 245
245 246 It uses the same settings which are used to create the ``pylonsapp`` or
246 247 ``app`` fixtures.
247 248 """
248 249 from rhodecode.config.utils import initialize_database
249 250 initialize_database(app_settings)
250 251
251 252
252 253 LoginData = collections.namedtuple('LoginData', ('csrf_token', 'user'))
253 254
254 255
255 256 def _autologin_user(app, *args):
256 257 session = login_user_session(app, *args)
257 258 csrf_token = rhodecode.lib.auth.get_csrf_token(session)
258 259 return LoginData(csrf_token, session['rhodecode_user'])
259 260
260 261
261 262 @pytest.fixture
262 263 def autologin_user(app):
263 264 """
264 265 Utility fixture which makes sure that the admin user is logged in
265 266 """
266 267 return _autologin_user(app)
267 268
268 269
269 270 @pytest.fixture
270 271 def autologin_regular_user(app):
271 272 """
272 273 Utility fixture which makes sure that the regular user is logged in
273 274 """
274 275 return _autologin_user(
275 276 app, TEST_USER_REGULAR_LOGIN, TEST_USER_REGULAR_PASS)
276 277
277 278
278 279 @pytest.fixture(scope='function')
279 280 def csrf_token(request, autologin_user):
280 281 return autologin_user.csrf_token
281 282
282 283
283 284 @pytest.fixture(scope='function')
284 285 def xhr_header(request):
285 286 return {'HTTP_X_REQUESTED_WITH': 'XMLHttpRequest'}
286 287
287 288
288 289 @pytest.fixture
289 290 def real_crypto_backend(monkeypatch):
290 291 """
291 292 Switch the production crypto backend on for this test.
292 293
293 294 During the test run the crypto backend is replaced with a faster
294 295 implementation based on the MD5 algorithm.
295 296 """
296 297 monkeypatch.setattr(rhodecode, 'is_test', False)
297 298
298 299
299 300 @pytest.fixture(scope='class')
300 301 def index_location(request, pylonsapp):
301 302 index_location = pylonsapp.config['app_conf']['search.location']
302 303 if request.cls:
303 304 request.cls.index_location = index_location
304 305 return index_location
305 306
306 307
307 308 @pytest.fixture(scope='session', autouse=True)
308 309 def tests_tmp_path(request):
309 310 """
310 311 Create temporary directory to be used during the test session.
311 312 """
312 313 if not os.path.exists(TESTS_TMP_PATH):
313 314 os.makedirs(TESTS_TMP_PATH)
314 315
315 316 if not request.config.getoption('--keep-tmp-path'):
316 317 @request.addfinalizer
317 318 def remove_tmp_path():
318 319 shutil.rmtree(TESTS_TMP_PATH)
319 320
320 321 return TESTS_TMP_PATH
321 322
322 323
323 324 @pytest.fixture(scope='session', autouse=True)
324 325 def patch_pyro_request_scope_proxy_factory(request):
325 326 """
326 327 Patch the pyro proxy factory to always use the same dummy request object
327 328 when under test. This will return the same pyro proxy on every call.
328 329 """
329 330 dummy_request = pyramid.testing.DummyRequest()
330 331
331 332 def mocked_call(self, request=None):
332 333 return self.getProxy(request=dummy_request)
333 334
334 335 patcher = mock.patch(
335 336 'rhodecode.lib.vcs.client.RequestScopeProxyFactory.__call__',
336 337 new=mocked_call)
337 338 patcher.start()
338 339
339 340 @request.addfinalizer
340 341 def undo_patching():
341 342 patcher.stop()
342 343
343 344
344 345 @pytest.fixture
345 346 def test_repo_group(request):
346 347 """
347 348 Create a temporary repository group, and destroy it after
348 349 usage automatically
349 350 """
350 351 fixture = Fixture()
351 352 repogroupid = 'test_repo_group_%s' % int(time.time())
352 353 repo_group = fixture.create_repo_group(repogroupid)
353 354
354 355 def _cleanup():
355 356 fixture.destroy_repo_group(repogroupid)
356 357
357 358 request.addfinalizer(_cleanup)
358 359 return repo_group
359 360
360 361
361 362 @pytest.fixture
362 363 def test_user_group(request):
363 364 """
364 365 Create a temporary user group, and destroy it after
365 366 usage automatically
366 367 """
367 368 fixture = Fixture()
368 369 usergroupid = 'test_user_group_%s' % int(time.time())
369 370 user_group = fixture.create_user_group(usergroupid)
370 371
371 372 def _cleanup():
372 373 fixture.destroy_user_group(user_group)
373 374
374 375 request.addfinalizer(_cleanup)
375 376 return user_group
376 377
377 378
378 379 @pytest.fixture(scope='session')
379 380 def test_repo(request):
380 381 container = TestRepoContainer()
381 382 request.addfinalizer(container._cleanup)
382 383 return container
383 384
384 385
385 386 class TestRepoContainer(object):
386 387 """
387 388 Container for test repositories which are used read only.
388 389
389 390 Repositories will be created on demand and re-used during the lifetime
390 391 of this object.
391 392
392 393 Usage to get the svn test repository "minimal"::
393 394
394 395 test_repo = TestContainer()
395 396 repo = test_repo('minimal', 'svn')
396 397
397 398 """
398 399
399 400 dump_extractors = {
400 401 'git': utils.extract_git_repo_from_dump,
401 402 'hg': utils.extract_hg_repo_from_dump,
402 403 'svn': utils.extract_svn_repo_from_dump,
403 404 }
404 405
405 406 def __init__(self):
406 407 self._cleanup_repos = []
407 408 self._fixture = Fixture()
408 409 self._repos = {}
409 410
410 411 def __call__(self, dump_name, backend_alias):
411 412 key = (dump_name, backend_alias)
412 413 if key not in self._repos:
413 414 repo = self._create_repo(dump_name, backend_alias)
414 415 self._repos[key] = repo.repo_id
415 416 return Repository.get(self._repos[key])
416 417
417 418 def _create_repo(self, dump_name, backend_alias):
418 419 repo_name = '%s-%s' % (backend_alias, dump_name)
419 420 backend_class = get_backend(backend_alias)
420 421 dump_extractor = self.dump_extractors[backend_alias]
421 422 repo_path = dump_extractor(dump_name, repo_name)
422 423 vcs_repo = backend_class(repo_path)
423 424 repo2db_mapper({repo_name: vcs_repo})
424 425 repo = RepoModel().get_by_repo_name(repo_name)
425 426 self._cleanup_repos.append(repo_name)
426 427 return repo
427 428
428 429 def _cleanup(self):
429 430 for repo_name in reversed(self._cleanup_repos):
430 431 self._fixture.destroy_repo(repo_name)
431 432
432 433
433 434 @pytest.fixture
434 435 def backend(request, backend_alias, pylonsapp, test_repo):
435 436 """
436 437 Parametrized fixture which represents a single backend implementation.
437 438
438 439 It respects the option `--backends` to focus the test run on specific
439 440 backend implementations.
440 441
441 442 It also supports `pytest.mark.xfail_backends` to mark tests as failing
442 443 for specific backends. This is intended as a utility for incremental
443 444 development of a new backend implementation.
444 445 """
445 446 if backend_alias not in request.config.getoption('--backends'):
446 447 pytest.skip("Backend %s not selected." % (backend_alias, ))
447 448
448 449 utils.check_xfail_backends(request.node, backend_alias)
449 450 utils.check_skip_backends(request.node, backend_alias)
450 451
451 452 repo_name = 'vcs_test_%s' % (backend_alias, )
452 453 backend = Backend(
453 454 alias=backend_alias,
454 455 repo_name=repo_name,
455 456 test_name=request.node.name,
456 457 test_repo_container=test_repo)
457 458 request.addfinalizer(backend.cleanup)
458 459 return backend
459 460
460 461
461 462 @pytest.fixture
462 463 def backend_git(request, pylonsapp, test_repo):
463 464 return backend(request, 'git', pylonsapp, test_repo)
464 465
465 466
466 467 @pytest.fixture
467 468 def backend_hg(request, pylonsapp, test_repo):
468 469 return backend(request, 'hg', pylonsapp, test_repo)
469 470
470 471
471 472 @pytest.fixture
472 473 def backend_svn(request, pylonsapp, test_repo):
473 474 return backend(request, 'svn', pylonsapp, test_repo)
474 475
475 476
476 477 @pytest.fixture
477 478 def backend_random(backend_git):
478 479 """
479 480 Use this to express that your tests need "a backend.
480 481
481 482 A few of our tests need a backend, so that we can run the code. This
482 483 fixture is intended to be used for such cases. It will pick one of the
483 484 backends and run the tests.
484 485
485 486 The fixture `backend` would run the test multiple times for each
486 487 available backend which is a pure waste of time if the test is
487 488 independent of the backend type.
488 489 """
489 490 # TODO: johbo: Change this to pick a random backend
490 491 return backend_git
491 492
492 493
493 494 @pytest.fixture
494 495 def backend_stub(backend_git):
495 496 """
496 497 Use this to express that your tests need a backend stub
497 498
498 499 TODO: mikhail: Implement a real stub logic instead of returning
499 500 a git backend
500 501 """
501 502 return backend_git
502 503
503 504
504 505 @pytest.fixture
505 506 def repo_stub(backend_stub):
506 507 """
507 508 Use this to express that your tests need a repository stub
508 509 """
509 510 return backend_stub.create_repo()
510 511
511 512
512 513 class Backend(object):
513 514 """
514 515 Represents the test configuration for one supported backend
515 516
516 517 Provides easy access to different test repositories based on
517 518 `__getitem__`. Such repositories will only be created once per test
518 519 session.
519 520 """
520 521
521 522 invalid_repo_name = re.compile(r'[^0-9a-zA-Z]+')
522 523 _master_repo = None
523 524 _commit_ids = {}
524 525
525 526 def __init__(self, alias, repo_name, test_name, test_repo_container):
526 527 self.alias = alias
527 528 self.repo_name = repo_name
528 529 self._cleanup_repos = []
529 530 self._test_name = test_name
530 531 self._test_repo_container = test_repo_container
531 532 # TODO: johbo: Used as a delegate interim. Not yet sure if Backend or
532 533 # Fixture will survive in the end.
533 534 self._fixture = Fixture()
534 535
535 536 def __getitem__(self, key):
536 537 return self._test_repo_container(key, self.alias)
537 538
538 539 @property
539 540 def repo(self):
540 541 """
541 542 Returns the "current" repository. This is the vcs_test repo or the
542 543 last repo which has been created with `create_repo`.
543 544 """
544 545 from rhodecode.model.db import Repository
545 546 return Repository.get_by_repo_name(self.repo_name)
546 547
547 548 @property
548 549 def default_branch_name(self):
549 550 VcsRepository = get_backend(self.alias)
550 551 return VcsRepository.DEFAULT_BRANCH_NAME
551 552
552 553 @property
553 554 def default_head_id(self):
554 555 """
555 556 Returns the default head id of the underlying backend.
556 557
557 558 This will be the default branch name in case the backend does have a
558 559 default branch. In the other cases it will point to a valid head
559 560 which can serve as the base to create a new commit on top of it.
560 561 """
561 562 vcsrepo = self.repo.scm_instance()
562 563 head_id = (
563 564 vcsrepo.DEFAULT_BRANCH_NAME or
564 565 vcsrepo.commit_ids[-1])
565 566 return head_id
566 567
567 568 @property
568 569 def commit_ids(self):
569 570 """
570 571 Returns the list of commits for the last created repository
571 572 """
572 573 return self._commit_ids
573 574
574 575 def create_master_repo(self, commits):
575 576 """
576 577 Create a repository and remember it as a template.
577 578
578 579 This allows to easily create derived repositories to construct
579 580 more complex scenarios for diff, compare and pull requests.
580 581
581 582 Returns a commit map which maps from commit message to raw_id.
582 583 """
583 584 self._master_repo = self.create_repo(commits=commits)
584 585 return self._commit_ids
585 586
586 587 def create_repo(
587 588 self, commits=None, number_of_commits=0, heads=None,
588 589 name_suffix=u'', **kwargs):
589 590 """
590 591 Create a repository and record it for later cleanup.
591 592
592 593 :param commits: Optional. A sequence of dict instances.
593 594 Will add a commit per entry to the new repository.
594 595 :param number_of_commits: Optional. If set to a number, this number of
595 596 commits will be added to the new repository.
596 597 :param heads: Optional. Can be set to a sequence of of commit
597 598 names which shall be pulled in from the master repository.
598 599
599 600 """
600 601 self.repo_name = self._next_repo_name() + name_suffix
601 602 repo = self._fixture.create_repo(
602 603 self.repo_name, repo_type=self.alias, **kwargs)
603 604 self._cleanup_repos.append(repo.repo_name)
604 605
605 606 commits = commits or [
606 607 {'message': 'Commit %s of %s' % (x, self.repo_name)}
607 608 for x in xrange(number_of_commits)]
608 609 self._add_commits_to_repo(repo.scm_instance(), commits)
609 610 if heads:
610 611 self.pull_heads(repo, heads)
611 612
612 613 return repo
613 614
614 615 def pull_heads(self, repo, heads):
615 616 """
616 617 Make sure that repo contains all commits mentioned in `heads`
617 618 """
618 619 vcsmaster = self._master_repo.scm_instance()
619 620 vcsrepo = repo.scm_instance()
620 621 vcsrepo.config.clear_section('hooks')
621 622 commit_ids = [self._commit_ids[h] for h in heads]
622 623 vcsrepo.pull(vcsmaster.path, commit_ids=commit_ids)
623 624
624 625 def create_fork(self):
625 626 repo_to_fork = self.repo_name
626 627 self.repo_name = self._next_repo_name()
627 628 repo = self._fixture.create_fork(repo_to_fork, self.repo_name)
628 629 self._cleanup_repos.append(self.repo_name)
629 630 return repo
630 631
631 632 def new_repo_name(self, suffix=u''):
632 633 self.repo_name = self._next_repo_name() + suffix
633 634 self._cleanup_repos.append(self.repo_name)
634 635 return self.repo_name
635 636
636 637 def _next_repo_name(self):
637 638 return u"%s_%s" % (
638 639 self.invalid_repo_name.sub(u'_', self._test_name),
639 640 len(self._cleanup_repos))
640 641
641 642 def ensure_file(self, filename, content='Test content\n'):
642 643 assert self._cleanup_repos, "Avoid writing into vcs_test repos"
643 644 commits = [
644 645 {'added': [
645 646 FileNode(filename, content=content),
646 647 ]},
647 648 ]
648 649 self._add_commits_to_repo(self.repo.scm_instance(), commits)
649 650
650 651 def enable_downloads(self):
651 652 repo = self.repo
652 653 repo.enable_downloads = True
653 654 Session().add(repo)
654 655 Session().commit()
655 656
656 657 def cleanup(self):
657 658 for repo_name in reversed(self._cleanup_repos):
658 659 self._fixture.destroy_repo(repo_name)
659 660
660 661 def _add_commits_to_repo(self, repo, commits):
661 662 commit_ids = _add_commits_to_repo(repo, commits)
662 663 if not commit_ids:
663 664 return
664 665 self._commit_ids = commit_ids
665 666
666 667 # Creating refs for Git to allow fetching them from remote repository
667 668 if self.alias == 'git':
668 669 refs = {}
669 670 for message in self._commit_ids:
670 671 # TODO: mikhail: do more special chars replacements
671 672 ref_name = 'refs/test-refs/{}'.format(
672 673 message.replace(' ', ''))
673 674 refs[ref_name] = self._commit_ids[message]
674 675 self._create_refs(repo, refs)
675 676
676 677 def _create_refs(self, repo, refs):
677 678 for ref_name in refs:
678 679 repo.set_refs(ref_name, refs[ref_name])
679 680
680 681
681 682 @pytest.fixture
682 683 def vcsbackend(request, backend_alias, tests_tmp_path, pylonsapp, test_repo):
683 684 """
684 685 Parametrized fixture which represents a single vcs backend implementation.
685 686
686 687 See the fixture `backend` for more details. This one implements the same
687 688 concept, but on vcs level. So it does not provide model instances etc.
688 689
689 690 Parameters are generated dynamically, see :func:`pytest_generate_tests`
690 691 for how this works.
691 692 """
692 693 if backend_alias not in request.config.getoption('--backends'):
693 694 pytest.skip("Backend %s not selected." % (backend_alias, ))
694 695
695 696 utils.check_xfail_backends(request.node, backend_alias)
696 697 utils.check_skip_backends(request.node, backend_alias)
697 698
698 699 repo_name = 'vcs_test_%s' % (backend_alias, )
699 700 repo_path = os.path.join(tests_tmp_path, repo_name)
700 701 backend = VcsBackend(
701 702 alias=backend_alias,
702 703 repo_path=repo_path,
703 704 test_name=request.node.name,
704 705 test_repo_container=test_repo)
705 706 request.addfinalizer(backend.cleanup)
706 707 return backend
707 708
708 709
709 710 @pytest.fixture
710 711 def vcsbackend_git(request, tests_tmp_path, pylonsapp, test_repo):
711 712 return vcsbackend(request, 'git', tests_tmp_path, pylonsapp, test_repo)
712 713
713 714
714 715 @pytest.fixture
715 716 def vcsbackend_hg(request, tests_tmp_path, pylonsapp, test_repo):
716 717 return vcsbackend(request, 'hg', tests_tmp_path, pylonsapp, test_repo)
717 718
718 719
719 720 @pytest.fixture
720 721 def vcsbackend_svn(request, tests_tmp_path, pylonsapp, test_repo):
721 722 return vcsbackend(request, 'svn', tests_tmp_path, pylonsapp, test_repo)
722 723
723 724
724 725 @pytest.fixture
725 726 def vcsbackend_random(vcsbackend_git):
726 727 """
727 728 Use this to express that your tests need "a vcsbackend".
728 729
729 730 The fixture `vcsbackend` would run the test multiple times for each
730 731 available vcs backend which is a pure waste of time if the test is
731 732 independent of the vcs backend type.
732 733 """
733 734 # TODO: johbo: Change this to pick a random backend
734 735 return vcsbackend_git
735 736
736 737
737 738 @pytest.fixture
738 739 def vcsbackend_stub(vcsbackend_git):
739 740 """
740 741 Use this to express that your test just needs a stub of a vcsbackend.
741 742
742 743 Plan is to eventually implement an in-memory stub to speed tests up.
743 744 """
744 745 return vcsbackend_git
745 746
746 747
747 748 class VcsBackend(object):
748 749 """
749 750 Represents the test configuration for one supported vcs backend.
750 751 """
751 752
752 753 invalid_repo_name = re.compile(r'[^0-9a-zA-Z]+')
753 754
754 755 def __init__(self, alias, repo_path, test_name, test_repo_container):
755 756 self.alias = alias
756 757 self._repo_path = repo_path
757 758 self._cleanup_repos = []
758 759 self._test_name = test_name
759 760 self._test_repo_container = test_repo_container
760 761
761 762 def __getitem__(self, key):
762 763 return self._test_repo_container(key, self.alias).scm_instance()
763 764
764 765 @property
765 766 def repo(self):
766 767 """
767 768 Returns the "current" repository. This is the vcs_test repo of the last
768 769 repo which has been created.
769 770 """
770 771 Repository = get_backend(self.alias)
771 772 return Repository(self._repo_path)
772 773
773 774 @property
774 775 def backend(self):
775 776 """
776 777 Returns the backend implementation class.
777 778 """
778 779 return get_backend(self.alias)
779 780
780 781 def create_repo(self, commits=None, number_of_commits=0, _clone_repo=None):
781 782 repo_name = self._next_repo_name()
782 783 self._repo_path = get_new_dir(repo_name)
783 784 repo_class = get_backend(self.alias)
784 785 src_url = None
785 786 if _clone_repo:
786 787 src_url = _clone_repo.path
787 788 repo = repo_class(self._repo_path, create=True, src_url=src_url)
788 789 self._cleanup_repos.append(repo)
789 790
790 791 commits = commits or [
791 792 {'message': 'Commit %s of %s' % (x, repo_name)}
792 793 for x in xrange(number_of_commits)]
793 794 _add_commits_to_repo(repo, commits)
794 795 return repo
795 796
796 797 def clone_repo(self, repo):
797 798 return self.create_repo(_clone_repo=repo)
798 799
799 800 def cleanup(self):
800 801 for repo in self._cleanup_repos:
801 802 shutil.rmtree(repo.path)
802 803
803 804 def new_repo_path(self):
804 805 repo_name = self._next_repo_name()
805 806 self._repo_path = get_new_dir(repo_name)
806 807 return self._repo_path
807 808
808 809 def _next_repo_name(self):
809 810 return "%s_%s" % (
810 811 self.invalid_repo_name.sub('_', self._test_name),
811 812 len(self._cleanup_repos))
812 813
813 814 def add_file(self, repo, filename, content='Test content\n'):
814 815 imc = repo.in_memory_commit
815 816 imc.add(FileNode(filename, content=content))
816 817 imc.commit(
817 818 message=u'Automatic commit from vcsbackend fixture',
818 819 author=u'Automatic')
819 820
820 821 def ensure_file(self, filename, content='Test content\n'):
821 822 assert self._cleanup_repos, "Avoid writing into vcs_test repos"
822 823 self.add_file(self.repo, filename, content)
823 824
824 825
825 826 def _add_commits_to_repo(vcs_repo, commits):
826 827 commit_ids = {}
827 828 if not commits:
828 829 return commit_ids
829 830
830 831 imc = vcs_repo.in_memory_commit
831 832 commit = None
832 833
833 834 for idx, commit in enumerate(commits):
834 835 message = unicode(commit.get('message', 'Commit %s' % idx))
835 836
836 837 for node in commit.get('added', []):
837 838 imc.add(FileNode(node.path, content=node.content))
838 839 for node in commit.get('changed', []):
839 840 imc.change(FileNode(node.path, content=node.content))
840 841 for node in commit.get('removed', []):
841 842 imc.remove(FileNode(node.path))
842 843
843 844 parents = [
844 845 vcs_repo.get_commit(commit_id=commit_ids[p])
845 846 for p in commit.get('parents', [])]
846 847
847 848 operations = ('added', 'changed', 'removed')
848 849 if not any((commit.get(o) for o in operations)):
849 850 imc.add(FileNode('file_%s' % idx, content=message))
850 851
851 852 commit = imc.commit(
852 853 message=message,
853 854 author=unicode(commit.get('author', 'Automatic')),
854 855 date=commit.get('date'),
855 856 branch=commit.get('branch'),
856 857 parents=parents)
857 858
858 859 commit_ids[commit.message] = commit.raw_id
859 860
860 861 return commit_ids
861 862
862 863
863 864 @pytest.fixture
864 865 def reposerver(request):
865 866 """
866 867 Allows to serve a backend repository
867 868 """
868 869
869 870 repo_server = RepoServer()
870 871 request.addfinalizer(repo_server.cleanup)
871 872 return repo_server
872 873
873 874
874 875 class RepoServer(object):
875 876 """
876 877 Utility to serve a local repository for the duration of a test case.
877 878
878 879 Supports only Subversion so far.
879 880 """
880 881
881 882 url = None
882 883
883 884 def __init__(self):
884 885 self._cleanup_servers = []
885 886
886 887 def serve(self, vcsrepo):
887 888 if vcsrepo.alias != 'svn':
888 889 raise TypeError("Backend %s not supported" % vcsrepo.alias)
889 890
890 891 proc = subprocess32.Popen(
891 892 ['svnserve', '-d', '--foreground', '--listen-host', 'localhost',
892 893 '--root', vcsrepo.path])
893 894 self._cleanup_servers.append(proc)
894 895 self.url = 'svn://localhost'
895 896
896 897 def cleanup(self):
897 898 for proc in self._cleanup_servers:
898 899 proc.terminate()
899 900
900 901
901 902 @pytest.fixture
902 903 def pr_util(backend, request):
903 904 """
904 905 Utility for tests of models and for functional tests around pull requests.
905 906
906 907 It gives an instance of :class:`PRTestUtility` which provides various
907 908 utility methods around one pull request.
908 909
909 910 This fixture uses `backend` and inherits its parameterization.
910 911 """
911 912
912 913 util = PRTestUtility(backend)
913 914
914 915 @request.addfinalizer
915 916 def cleanup():
916 917 util.cleanup()
917 918
918 919 return util
919 920
920 921
921 922 class PRTestUtility(object):
922 923
923 924 pull_request = None
924 925 pull_request_id = None
925 926 mergeable_patcher = None
926 927 mergeable_mock = None
927 928 notification_patcher = None
928 929
929 930 def __init__(self, backend):
930 931 self.backend = backend
931 932
932 933 def create_pull_request(
933 934 self, commits=None, target_head=None, source_head=None,
934 935 revisions=None, approved=False, author=None, mergeable=False,
935 936 enable_notifications=True, name_suffix=u'', reviewers=None,
936 937 title=u"Test", description=u"Description"):
937 938 self.set_mergeable(mergeable)
938 939 if not enable_notifications:
939 940 # mock notification side effect
940 941 self.notification_patcher = mock.patch(
941 942 'rhodecode.model.notification.NotificationModel.create')
942 943 self.notification_patcher.start()
943 944
944 945 if not self.pull_request:
945 946 if not commits:
946 947 commits = [
947 948 {'message': 'c1'},
948 949 {'message': 'c2'},
949 950 {'message': 'c3'},
950 951 ]
951 952 target_head = 'c1'
952 953 source_head = 'c2'
953 954 revisions = ['c2']
954 955
955 956 self.commit_ids = self.backend.create_master_repo(commits)
956 957 self.target_repository = self.backend.create_repo(
957 958 heads=[target_head], name_suffix=name_suffix)
958 959 self.source_repository = self.backend.create_repo(
959 960 heads=[source_head], name_suffix=name_suffix)
960 961 self.author = author or UserModel().get_by_username(
961 962 TEST_USER_ADMIN_LOGIN)
962 963
963 964 model = PullRequestModel()
964 965 self.create_parameters = {
965 966 'created_by': self.author,
966 967 'source_repo': self.source_repository.repo_name,
967 968 'source_ref': self._default_branch_reference(source_head),
968 969 'target_repo': self.target_repository.repo_name,
969 970 'target_ref': self._default_branch_reference(target_head),
970 971 'revisions': [self.commit_ids[r] for r in revisions],
971 972 'reviewers': reviewers or self._get_reviewers(),
972 973 'title': title,
973 974 'description': description,
974 975 }
975 976 self.pull_request = model.create(**self.create_parameters)
976 977 assert model.get_versions(self.pull_request) == []
977 978
978 979 self.pull_request_id = self.pull_request.pull_request_id
979 980
980 981 if approved:
981 982 self.approve()
982 983
983 984 Session().add(self.pull_request)
984 985 Session().commit()
985 986
986 987 return self.pull_request
987 988
988 989 def approve(self):
989 990 self.create_status_votes(
990 991 ChangesetStatus.STATUS_APPROVED,
991 992 *self.pull_request.reviewers)
992 993
993 994 def close(self):
994 995 PullRequestModel().close_pull_request(self.pull_request, self.author)
995 996
996 997 def _default_branch_reference(self, commit_message):
997 998 reference = '%s:%s:%s' % (
998 999 'branch',
999 1000 self.backend.default_branch_name,
1000 1001 self.commit_ids[commit_message])
1001 1002 return reference
1002 1003
1003 1004 def _get_reviewers(self):
1004 1005 model = UserModel()
1005 1006 return [
1006 1007 model.get_by_username(TEST_USER_REGULAR_LOGIN),
1007 1008 model.get_by_username(TEST_USER_REGULAR2_LOGIN),
1008 1009 ]
1009 1010
1010 1011 def update_source_repository(self, head=None):
1011 1012 heads = [head or 'c3']
1012 1013 self.backend.pull_heads(self.source_repository, heads=heads)
1013 1014
1014 1015 def add_one_commit(self, head=None):
1015 1016 self.update_source_repository(head=head)
1016 1017 old_commit_ids = set(self.pull_request.revisions)
1017 1018 PullRequestModel().update_commits(self.pull_request)
1018 1019 commit_ids = set(self.pull_request.revisions)
1019 1020 new_commit_ids = commit_ids - old_commit_ids
1020 1021 assert len(new_commit_ids) == 1
1021 1022 return new_commit_ids.pop()
1022 1023
1023 1024 def remove_one_commit(self):
1024 1025 assert len(self.pull_request.revisions) == 2
1025 1026 source_vcs = self.source_repository.scm_instance()
1026 1027 removed_commit_id = source_vcs.commit_ids[-1]
1027 1028
1028 1029 # TODO: johbo: Git and Mercurial have an inconsistent vcs api here,
1029 1030 # remove the if once that's sorted out.
1030 1031 if self.backend.alias == "git":
1031 1032 kwargs = {'branch_name': self.backend.default_branch_name}
1032 1033 else:
1033 1034 kwargs = {}
1034 1035 source_vcs.strip(removed_commit_id, **kwargs)
1035 1036
1036 1037 PullRequestModel().update_commits(self.pull_request)
1037 1038 assert len(self.pull_request.revisions) == 1
1038 1039 return removed_commit_id
1039 1040
1040 1041 def create_comment(self, linked_to=None):
1041 1042 comment = CommentsModel().create(
1042 1043 text=u"Test comment",
1043 1044 repo=self.target_repository.repo_name,
1044 1045 user=self.author,
1045 1046 pull_request=self.pull_request)
1046 1047 assert comment.pull_request_version_id is None
1047 1048
1048 1049 if linked_to:
1049 1050 PullRequestModel()._link_comments_to_version(linked_to)
1050 1051
1051 1052 return comment
1052 1053
1053 1054 def create_inline_comment(
1054 1055 self, linked_to=None, line_no=u'n1', file_path='file_1'):
1055 1056 comment = CommentsModel().create(
1056 1057 text=u"Test comment",
1057 1058 repo=self.target_repository.repo_name,
1058 1059 user=self.author,
1059 1060 line_no=line_no,
1060 1061 f_path=file_path,
1061 1062 pull_request=self.pull_request)
1062 1063 assert comment.pull_request_version_id is None
1063 1064
1064 1065 if linked_to:
1065 1066 PullRequestModel()._link_comments_to_version(linked_to)
1066 1067
1067 1068 return comment
1068 1069
1069 1070 def create_version_of_pull_request(self):
1070 1071 pull_request = self.create_pull_request()
1071 1072 version = PullRequestModel()._create_version_from_snapshot(
1072 1073 pull_request)
1073 1074 return version
1074 1075
1075 1076 def create_status_votes(self, status, *reviewers):
1076 1077 for reviewer in reviewers:
1077 1078 ChangesetStatusModel().set_status(
1078 1079 repo=self.pull_request.target_repo,
1079 1080 status=status,
1080 1081 user=reviewer.user_id,
1081 1082 pull_request=self.pull_request)
1082 1083
1083 1084 def set_mergeable(self, value):
1084 1085 if not self.mergeable_patcher:
1085 1086 self.mergeable_patcher = mock.patch.object(
1086 1087 VcsSettingsModel, 'get_general_settings')
1087 1088 self.mergeable_mock = self.mergeable_patcher.start()
1088 1089 self.mergeable_mock.return_value = {
1089 1090 'rhodecode_pr_merge_enabled': value}
1090 1091
1091 1092 def cleanup(self):
1092 1093 # In case the source repository is already cleaned up, the pull
1093 1094 # request will already be deleted.
1094 1095 pull_request = PullRequest().get(self.pull_request_id)
1095 1096 if pull_request:
1096 1097 PullRequestModel().delete(pull_request)
1097 1098 Session().commit()
1098 1099
1099 1100 if self.notification_patcher:
1100 1101 self.notification_patcher.stop()
1101 1102
1102 1103 if self.mergeable_patcher:
1103 1104 self.mergeable_patcher.stop()
1104 1105
1105 1106
1106 1107 @pytest.fixture
1107 1108 def user_admin(pylonsapp):
1108 1109 """
1109 1110 Provides the default admin test user as an instance of `db.User`.
1110 1111 """
1111 1112 user = UserModel().get_by_username(TEST_USER_ADMIN_LOGIN)
1112 1113 return user
1113 1114
1114 1115
1115 1116 @pytest.fixture
1116 1117 def user_regular(pylonsapp):
1117 1118 """
1118 1119 Provides the default regular test user as an instance of `db.User`.
1119 1120 """
1120 1121 user = UserModel().get_by_username(TEST_USER_REGULAR_LOGIN)
1121 1122 return user
1122 1123
1123 1124
1124 1125 @pytest.fixture
1125 1126 def user_util(request, pylonsapp):
1126 1127 """
1127 1128 Provides a wired instance of `UserUtility` with integrated cleanup.
1128 1129 """
1129 1130 utility = UserUtility(test_name=request.node.name)
1130 1131 request.addfinalizer(utility.cleanup)
1131 1132 return utility
1132 1133
1133 1134
1134 1135 # TODO: johbo: Split this up into utilities per domain or something similar
1135 1136 class UserUtility(object):
1136 1137
1137 1138 def __init__(self, test_name="test"):
1138 1139 self._test_name = self._sanitize_name(test_name)
1139 1140 self.fixture = Fixture()
1140 1141 self.repo_group_ids = []
1141 1142 self.repos_ids = []
1142 1143 self.user_ids = []
1143 1144 self.user_group_ids = []
1144 1145 self.user_repo_permission_ids = []
1145 1146 self.user_group_repo_permission_ids = []
1146 1147 self.user_repo_group_permission_ids = []
1147 1148 self.user_group_repo_group_permission_ids = []
1148 1149 self.user_user_group_permission_ids = []
1149 1150 self.user_group_user_group_permission_ids = []
1150 1151 self.user_permissions = []
1151 1152
1152 1153 def _sanitize_name(self, name):
1153 1154 for char in ['[', ']']:
1154 1155 name = name.replace(char, '_')
1155 1156 return name
1156 1157
1157 1158 def create_repo_group(
1158 1159 self, owner=TEST_USER_ADMIN_LOGIN, auto_cleanup=True):
1159 1160 group_name = "{prefix}_repogroup_{count}".format(
1160 1161 prefix=self._test_name,
1161 1162 count=len(self.repo_group_ids))
1162 1163 repo_group = self.fixture.create_repo_group(
1163 1164 group_name, cur_user=owner)
1164 1165 if auto_cleanup:
1165 1166 self.repo_group_ids.append(repo_group.group_id)
1166 1167 return repo_group
1167 1168
1168 1169 def create_repo(self, owner=TEST_USER_ADMIN_LOGIN, parent=None, auto_cleanup=True):
1169 1170 repo_name = "{prefix}_repository_{count}".format(
1170 1171 prefix=self._test_name,
1171 1172 count=len(self.repos_ids))
1172 1173
1173 1174 repository = self.fixture.create_repo(
1174 1175 repo_name, cur_user=owner, repo_group=parent)
1175 1176 if auto_cleanup:
1176 1177 self.repos_ids.append(repository.repo_id)
1177 1178 return repository
1178 1179
1179 1180 def create_user(self, auto_cleanup=True, **kwargs):
1180 1181 user_name = "{prefix}_user_{count}".format(
1181 1182 prefix=self._test_name,
1182 1183 count=len(self.user_ids))
1183 1184 user = self.fixture.create_user(user_name, **kwargs)
1184 1185 if auto_cleanup:
1185 1186 self.user_ids.append(user.user_id)
1186 1187 return user
1187 1188
1188 1189 def create_user_with_group(self):
1189 1190 user = self.create_user()
1190 1191 user_group = self.create_user_group(members=[user])
1191 1192 return user, user_group
1192 1193
1193 1194 def create_user_group(self, members=None, auto_cleanup=True, **kwargs):
1194 1195 group_name = "{prefix}_usergroup_{count}".format(
1195 1196 prefix=self._test_name,
1196 1197 count=len(self.user_group_ids))
1197 1198 user_group = self.fixture.create_user_group(group_name, **kwargs)
1198 1199 if auto_cleanup:
1199 1200 self.user_group_ids.append(user_group.users_group_id)
1200 1201 if members:
1201 1202 for user in members:
1202 1203 UserGroupModel().add_user_to_group(user_group, user)
1203 1204 return user_group
1204 1205
1205 1206 def grant_user_permission(self, user_name, permission_name):
1206 1207 self._inherit_default_user_permissions(user_name, False)
1207 1208 self.user_permissions.append((user_name, permission_name))
1208 1209
1209 1210 def grant_user_permission_to_repo_group(
1210 1211 self, repo_group, user, permission_name):
1211 1212 permission = RepoGroupModel().grant_user_permission(
1212 1213 repo_group, user, permission_name)
1213 1214 self.user_repo_group_permission_ids.append(
1214 1215 (repo_group.group_id, user.user_id))
1215 1216 return permission
1216 1217
1217 1218 def grant_user_group_permission_to_repo_group(
1218 1219 self, repo_group, user_group, permission_name):
1219 1220 permission = RepoGroupModel().grant_user_group_permission(
1220 1221 repo_group, user_group, permission_name)
1221 1222 self.user_group_repo_group_permission_ids.append(
1222 1223 (repo_group.group_id, user_group.users_group_id))
1223 1224 return permission
1224 1225
1225 1226 def grant_user_permission_to_repo(
1226 1227 self, repo, user, permission_name):
1227 1228 permission = RepoModel().grant_user_permission(
1228 1229 repo, user, permission_name)
1229 1230 self.user_repo_permission_ids.append(
1230 1231 (repo.repo_id, user.user_id))
1231 1232 return permission
1232 1233
1233 1234 def grant_user_group_permission_to_repo(
1234 1235 self, repo, user_group, permission_name):
1235 1236 permission = RepoModel().grant_user_group_permission(
1236 1237 repo, user_group, permission_name)
1237 1238 self.user_group_repo_permission_ids.append(
1238 1239 (repo.repo_id, user_group.users_group_id))
1239 1240 return permission
1240 1241
1241 1242 def grant_user_permission_to_user_group(
1242 1243 self, target_user_group, user, permission_name):
1243 1244 permission = UserGroupModel().grant_user_permission(
1244 1245 target_user_group, user, permission_name)
1245 1246 self.user_user_group_permission_ids.append(
1246 1247 (target_user_group.users_group_id, user.user_id))
1247 1248 return permission
1248 1249
1249 1250 def grant_user_group_permission_to_user_group(
1250 1251 self, target_user_group, user_group, permission_name):
1251 1252 permission = UserGroupModel().grant_user_group_permission(
1252 1253 target_user_group, user_group, permission_name)
1253 1254 self.user_group_user_group_permission_ids.append(
1254 1255 (target_user_group.users_group_id, user_group.users_group_id))
1255 1256 return permission
1256 1257
1257 1258 def revoke_user_permission(self, user_name, permission_name):
1258 1259 self._inherit_default_user_permissions(user_name, True)
1259 1260 UserModel().revoke_perm(user_name, permission_name)
1260 1261
1261 1262 def _inherit_default_user_permissions(self, user_name, value):
1262 1263 user = UserModel().get_by_username(user_name)
1263 1264 user.inherit_default_permissions = value
1264 1265 Session().add(user)
1265 1266 Session().commit()
1266 1267
1267 1268 def cleanup(self):
1268 1269 self._cleanup_permissions()
1269 1270 self._cleanup_repos()
1270 1271 self._cleanup_repo_groups()
1271 1272 self._cleanup_user_groups()
1272 1273 self._cleanup_users()
1273 1274
1274 1275 def _cleanup_permissions(self):
1275 1276 if self.user_permissions:
1276 1277 for user_name, permission_name in self.user_permissions:
1277 1278 self.revoke_user_permission(user_name, permission_name)
1278 1279
1279 1280 for permission in self.user_repo_permission_ids:
1280 1281 RepoModel().revoke_user_permission(*permission)
1281 1282
1282 1283 for permission in self.user_group_repo_permission_ids:
1283 1284 RepoModel().revoke_user_group_permission(*permission)
1284 1285
1285 1286 for permission in self.user_repo_group_permission_ids:
1286 1287 RepoGroupModel().revoke_user_permission(*permission)
1287 1288
1288 1289 for permission in self.user_group_repo_group_permission_ids:
1289 1290 RepoGroupModel().revoke_user_group_permission(*permission)
1290 1291
1291 1292 for permission in self.user_user_group_permission_ids:
1292 1293 UserGroupModel().revoke_user_permission(*permission)
1293 1294
1294 1295 for permission in self.user_group_user_group_permission_ids:
1295 1296 UserGroupModel().revoke_user_group_permission(*permission)
1296 1297
1297 1298 def _cleanup_repo_groups(self):
1298 1299 def _repo_group_compare(first_group_id, second_group_id):
1299 1300 """
1300 1301 Gives higher priority to the groups with the most complex paths
1301 1302 """
1302 1303 first_group = RepoGroup.get(first_group_id)
1303 1304 second_group = RepoGroup.get(second_group_id)
1304 1305 first_group_parts = (
1305 1306 len(first_group.group_name.split('/')) if first_group else 0)
1306 1307 second_group_parts = (
1307 1308 len(second_group.group_name.split('/')) if second_group else 0)
1308 1309 return cmp(second_group_parts, first_group_parts)
1309 1310
1310 1311 sorted_repo_group_ids = sorted(
1311 1312 self.repo_group_ids, cmp=_repo_group_compare)
1312 1313 for repo_group_id in sorted_repo_group_ids:
1313 1314 self.fixture.destroy_repo_group(repo_group_id)
1314 1315
1315 1316 def _cleanup_repos(self):
1316 1317 sorted_repos_ids = sorted(self.repos_ids)
1317 1318 for repo_id in sorted_repos_ids:
1318 1319 self.fixture.destroy_repo(repo_id)
1319 1320
1320 1321 def _cleanup_user_groups(self):
1321 1322 def _user_group_compare(first_group_id, second_group_id):
1322 1323 """
1323 1324 Gives higher priority to the groups with the most complex paths
1324 1325 """
1325 1326 first_group = UserGroup.get(first_group_id)
1326 1327 second_group = UserGroup.get(second_group_id)
1327 1328 first_group_parts = (
1328 1329 len(first_group.users_group_name.split('/'))
1329 1330 if first_group else 0)
1330 1331 second_group_parts = (
1331 1332 len(second_group.users_group_name.split('/'))
1332 1333 if second_group else 0)
1333 1334 return cmp(second_group_parts, first_group_parts)
1334 1335
1335 1336 sorted_user_group_ids = sorted(
1336 1337 self.user_group_ids, cmp=_user_group_compare)
1337 1338 for user_group_id in sorted_user_group_ids:
1338 1339 self.fixture.destroy_user_group(user_group_id)
1339 1340
1340 1341 def _cleanup_users(self):
1341 1342 for user_id in self.user_ids:
1342 1343 self.fixture.destroy_user(user_id)
1343 1344
1344 1345
1345 1346 # TODO: Think about moving this into a pytest-pyro package and make it a
1346 1347 # pytest plugin
1347 1348 @pytest.hookimpl(tryfirst=True, hookwrapper=True)
1348 1349 def pytest_runtest_makereport(item, call):
1349 1350 """
1350 1351 Adding the remote traceback if the exception has this information.
1351 1352
1352 1353 Pyro4 attaches this information as the attribute `_vcs_server_traceback`
1353 1354 to the exception instance.
1354 1355 """
1355 1356 outcome = yield
1356 1357 report = outcome.get_result()
1357 1358 if call.excinfo:
1358 1359 _add_vcsserver_remote_traceback(report, call.excinfo.value)
1359 1360
1360 1361
1361 1362 def _add_vcsserver_remote_traceback(report, exc):
1362 1363 vcsserver_traceback = getattr(exc, '_vcs_server_traceback', None)
1363 1364
1364 1365 if vcsserver_traceback:
1365 1366 section = 'VCSServer remote traceback ' + report.when
1366 1367 report.sections.append((section, vcsserver_traceback))
1367 1368
1368 1369
1369 1370 @pytest.fixture(scope='session')
1370 1371 def testrun():
1371 1372 return {
1372 1373 'uuid': uuid.uuid4(),
1373 1374 'start': datetime.datetime.utcnow().isoformat(),
1374 1375 'timestamp': int(time.time()),
1375 1376 }
1376 1377
1377 1378
1378 1379 @pytest.fixture(autouse=True)
1379 1380 def collect_appenlight_stats(request, testrun):
1380 1381 """
1381 1382 This fixture reports memory consumtion of single tests.
1382 1383
1383 1384 It gathers data based on `psutil` and sends them to Appenlight. The option
1384 1385 ``--ae`` has te be used to enable this fixture and the API key for your
1385 1386 application has to be provided in ``--ae-key``.
1386 1387 """
1387 1388 try:
1388 1389 # cygwin cannot have yet psutil support.
1389 1390 import psutil
1390 1391 except ImportError:
1391 1392 return
1392 1393
1393 1394 if not request.config.getoption('--appenlight'):
1394 1395 return
1395 1396 else:
1396 1397 # Only request the pylonsapp fixture if appenlight tracking is
1397 1398 # enabled. This will speed up a test run of unit tests by 2 to 3
1398 1399 # seconds if appenlight is not enabled.
1399 1400 pylonsapp = request.getfuncargvalue("pylonsapp")
1400 1401 url = '{}/api/logs'.format(request.config.getoption('--appenlight-url'))
1401 1402 client = AppenlightClient(
1402 1403 url=url,
1403 1404 api_key=request.config.getoption('--appenlight-api-key'),
1404 1405 namespace=request.node.nodeid,
1405 1406 request=str(testrun['uuid']),
1406 1407 testrun=testrun)
1407 1408
1408 1409 client.collect({
1409 1410 'message': "Starting",
1410 1411 })
1411 1412
1412 1413 server_and_port = pylonsapp.config['vcs.server']
1413 1414 server = create_vcsserver_proxy(server_and_port)
1414 1415 with server:
1415 1416 vcs_pid = server.get_pid()
1416 1417 server.run_gc()
1417 1418 vcs_process = psutil.Process(vcs_pid)
1418 1419 mem = vcs_process.memory_info()
1419 1420 client.tag_before('vcsserver.rss', mem.rss)
1420 1421 client.tag_before('vcsserver.vms', mem.vms)
1421 1422
1422 1423 test_process = psutil.Process()
1423 1424 mem = test_process.memory_info()
1424 1425 client.tag_before('test.rss', mem.rss)
1425 1426 client.tag_before('test.vms', mem.vms)
1426 1427
1427 1428 client.tag_before('time', time.time())
1428 1429
1429 1430 @request.addfinalizer
1430 1431 def send_stats():
1431 1432 client.tag_after('time', time.time())
1432 1433 with server:
1433 1434 gc_stats = server.run_gc()
1434 1435 for tag, value in gc_stats.items():
1435 1436 client.tag_after(tag, value)
1436 1437 mem = vcs_process.memory_info()
1437 1438 client.tag_after('vcsserver.rss', mem.rss)
1438 1439 client.tag_after('vcsserver.vms', mem.vms)
1439 1440
1440 1441 mem = test_process.memory_info()
1441 1442 client.tag_after('test.rss', mem.rss)
1442 1443 client.tag_after('test.vms', mem.vms)
1443 1444
1444 1445 client.collect({
1445 1446 'message': "Finished",
1446 1447 })
1447 1448 client.send_stats()
1448 1449
1449 1450 return client
1450 1451
1451 1452
1452 1453 class AppenlightClient():
1453 1454
1454 1455 url_template = '{url}?protocol_version=0.5'
1455 1456
1456 1457 def __init__(
1457 1458 self, url, api_key, add_server=True, add_timestamp=True,
1458 1459 namespace=None, request=None, testrun=None):
1459 1460 self.url = self.url_template.format(url=url)
1460 1461 self.api_key = api_key
1461 1462 self.add_server = add_server
1462 1463 self.add_timestamp = add_timestamp
1463 1464 self.namespace = namespace
1464 1465 self.request = request
1465 1466 self.server = socket.getfqdn(socket.gethostname())
1466 1467 self.tags_before = {}
1467 1468 self.tags_after = {}
1468 1469 self.stats = []
1469 1470 self.testrun = testrun or {}
1470 1471
1471 1472 def tag_before(self, tag, value):
1472 1473 self.tags_before[tag] = value
1473 1474
1474 1475 def tag_after(self, tag, value):
1475 1476 self.tags_after[tag] = value
1476 1477
1477 1478 def collect(self, data):
1478 1479 if self.add_server:
1479 1480 data.setdefault('server', self.server)
1480 1481 if self.add_timestamp:
1481 1482 data.setdefault('date', datetime.datetime.utcnow().isoformat())
1482 1483 if self.namespace:
1483 1484 data.setdefault('namespace', self.namespace)
1484 1485 if self.request:
1485 1486 data.setdefault('request', self.request)
1486 1487 self.stats.append(data)
1487 1488
1488 1489 def send_stats(self):
1489 1490 tags = [
1490 1491 ('testrun', self.request),
1491 1492 ('testrun.start', self.testrun['start']),
1492 1493 ('testrun.timestamp', self.testrun['timestamp']),
1493 1494 ('test', self.namespace),
1494 1495 ]
1495 1496 for key, value in self.tags_before.items():
1496 1497 tags.append((key + '.before', value))
1497 1498 try:
1498 1499 delta = self.tags_after[key] - value
1499 1500 tags.append((key + '.delta', delta))
1500 1501 except Exception:
1501 1502 pass
1502 1503 for key, value in self.tags_after.items():
1503 1504 tags.append((key + '.after', value))
1504 1505 self.collect({
1505 1506 'message': "Collected tags",
1506 1507 'tags': tags,
1507 1508 })
1508 1509
1509 1510 response = requests.post(
1510 1511 self.url,
1511 1512 headers={
1512 1513 'X-appenlight-api-key': self.api_key},
1513 1514 json=self.stats,
1514 1515 )
1515 1516
1516 1517 if not response.status_code == 200:
1517 1518 pprint.pprint(self.stats)
1518 1519 print response.headers
1519 1520 print response.text
1520 1521 raise Exception('Sending to appenlight failed')
1521 1522
1522 1523
1523 1524 @pytest.fixture
1524 1525 def gist_util(request, pylonsapp):
1525 1526 """
1526 1527 Provides a wired instance of `GistUtility` with integrated cleanup.
1527 1528 """
1528 1529 utility = GistUtility()
1529 1530 request.addfinalizer(utility.cleanup)
1530 1531 return utility
1531 1532
1532 1533
1533 1534 class GistUtility(object):
1534 1535 def __init__(self):
1535 1536 self.fixture = Fixture()
1536 1537 self.gist_ids = []
1537 1538
1538 1539 def create_gist(self, **kwargs):
1539 1540 gist = self.fixture.create_gist(**kwargs)
1540 1541 self.gist_ids.append(gist.gist_id)
1541 1542 return gist
1542 1543
1543 1544 def cleanup(self):
1544 1545 for id_ in self.gist_ids:
1545 1546 self.fixture.destroy_gists(str(id_))
1546 1547
1547 1548
1548 1549 @pytest.fixture
1549 1550 def enabled_backends(request):
1550 1551 backends = request.config.option.backends
1551 1552 return backends[:]
1552 1553
1553 1554
1554 1555 @pytest.fixture
1555 1556 def settings_util(request):
1556 1557 """
1557 1558 Provides a wired instance of `SettingsUtility` with integrated cleanup.
1558 1559 """
1559 1560 utility = SettingsUtility()
1560 1561 request.addfinalizer(utility.cleanup)
1561 1562 return utility
1562 1563
1563 1564
1564 1565 class SettingsUtility(object):
1565 1566 def __init__(self):
1566 1567 self.rhodecode_ui_ids = []
1567 1568 self.rhodecode_setting_ids = []
1568 1569 self.repo_rhodecode_ui_ids = []
1569 1570 self.repo_rhodecode_setting_ids = []
1570 1571
1571 1572 def create_repo_rhodecode_ui(
1572 1573 self, repo, section, value, key=None, active=True, cleanup=True):
1573 1574 key = key or hashlib.sha1(
1574 1575 '{}{}{}'.format(section, value, repo.repo_id)).hexdigest()
1575 1576
1576 1577 setting = RepoRhodeCodeUi()
1577 1578 setting.repository_id = repo.repo_id
1578 1579 setting.ui_section = section
1579 1580 setting.ui_value = value
1580 1581 setting.ui_key = key
1581 1582 setting.ui_active = active
1582 1583 Session().add(setting)
1583 1584 Session().commit()
1584 1585
1585 1586 if cleanup:
1586 1587 self.repo_rhodecode_ui_ids.append(setting.ui_id)
1587 1588 return setting
1588 1589
1589 1590 def create_rhodecode_ui(
1590 1591 self, section, value, key=None, active=True, cleanup=True):
1591 1592 key = key or hashlib.sha1('{}{}'.format(section, value)).hexdigest()
1592 1593
1593 1594 setting = RhodeCodeUi()
1594 1595 setting.ui_section = section
1595 1596 setting.ui_value = value
1596 1597 setting.ui_key = key
1597 1598 setting.ui_active = active
1598 1599 Session().add(setting)
1599 1600 Session().commit()
1600 1601
1601 1602 if cleanup:
1602 1603 self.rhodecode_ui_ids.append(setting.ui_id)
1603 1604 return setting
1604 1605
1605 1606 def create_repo_rhodecode_setting(
1606 1607 self, repo, name, value, type_, cleanup=True):
1607 1608 setting = RepoRhodeCodeSetting(
1608 1609 repo.repo_id, key=name, val=value, type=type_)
1609 1610 Session().add(setting)
1610 1611 Session().commit()
1611 1612
1612 1613 if cleanup:
1613 1614 self.repo_rhodecode_setting_ids.append(setting.app_settings_id)
1614 1615 return setting
1615 1616
1616 1617 def create_rhodecode_setting(self, name, value, type_, cleanup=True):
1617 1618 setting = RhodeCodeSetting(key=name, val=value, type=type_)
1618 1619 Session().add(setting)
1619 1620 Session().commit()
1620 1621
1621 1622 if cleanup:
1622 1623 self.rhodecode_setting_ids.append(setting.app_settings_id)
1623 1624
1624 1625 return setting
1625 1626
1626 1627 def cleanup(self):
1627 1628 for id_ in self.rhodecode_ui_ids:
1628 1629 setting = RhodeCodeUi.get(id_)
1629 1630 Session().delete(setting)
1630 1631
1631 1632 for id_ in self.rhodecode_setting_ids:
1632 1633 setting = RhodeCodeSetting.get(id_)
1633 1634 Session().delete(setting)
1634 1635
1635 1636 for id_ in self.repo_rhodecode_ui_ids:
1636 1637 setting = RepoRhodeCodeUi.get(id_)
1637 1638 Session().delete(setting)
1638 1639
1639 1640 for id_ in self.repo_rhodecode_setting_ids:
1640 1641 setting = RepoRhodeCodeSetting.get(id_)
1641 1642 Session().delete(setting)
1642 1643
1643 1644 Session().commit()
1644 1645
1645 1646
1646 1647 @pytest.fixture
1647 1648 def no_notifications(request):
1648 1649 notification_patcher = mock.patch(
1649 1650 'rhodecode.model.notification.NotificationModel.create')
1650 1651 notification_patcher.start()
1651 1652 request.addfinalizer(notification_patcher.stop)
1652 1653
1653 1654
1654 1655 @pytest.fixture
1655 1656 def silence_action_logger(request):
1656 1657 notification_patcher = mock.patch(
1657 1658 'rhodecode.lib.utils.action_logger')
1658 1659 notification_patcher.start()
1659 1660 request.addfinalizer(notification_patcher.stop)
1660 1661
1661 1662
1662 1663 @pytest.fixture(scope='session')
1663 1664 def repeat(request):
1664 1665 """
1665 1666 The number of repetitions is based on this fixture.
1666 1667
1667 1668 Slower calls may divide it by 10 or 100. It is chosen in a way so that the
1668 1669 tests are not too slow in our default test suite.
1669 1670 """
1670 1671 return request.config.getoption('--repeat')
1671 1672
1672 1673
1673 1674 @pytest.fixture
1674 1675 def rhodecode_fixtures():
1675 1676 return Fixture()
1676 1677
1677 1678
1678 1679 @pytest.fixture
1679 1680 def request_stub():
1680 1681 """
1681 1682 Stub request object.
1682 1683 """
1683 1684 request = pyramid.testing.DummyRequest()
1684 1685 request.scheme = 'https'
1685 1686 return request
1686 1687
1687 1688
1688 1689 @pytest.fixture
1689 1690 def config_stub(request, request_stub):
1690 1691 """
1691 1692 Set up pyramid.testing and return the Configurator.
1692 1693 """
1693 1694 config = pyramid.testing.setUp(request=request_stub)
1694 1695
1695 1696 @request.addfinalizer
1696 1697 def cleanup():
1697 1698 pyramid.testing.tearDown()
1698 1699
1699 1700 return config
1700 1701
1701 1702
1702 1703 @pytest.fixture
1703 1704 def StubIntegrationType():
1704 1705 class _StubIntegrationType(IntegrationTypeBase):
1705 1706 """ Test integration type class """
1706 1707
1707 1708 key = 'test'
1708 1709 display_name = 'Test integration type'
1709 1710 description = 'A test integration type for testing'
1710 1711 icon = 'test_icon_html_image'
1711 1712
1712 1713 def __init__(self, settings):
1713 1714 super(_StubIntegrationType, self).__init__(settings)
1714 1715 self.sent_events = [] # for testing
1715 1716
1716 1717 def send_event(self, event):
1717 1718 self.sent_events.append(event)
1718 1719
1719 1720 def settings_schema(self):
1720 1721 class SettingsSchema(colander.Schema):
1721 1722 test_string_field = colander.SchemaNode(
1722 1723 colander.String(),
1723 1724 missing=colander.required,
1724 1725 title='test string field',
1725 1726 )
1726 1727 test_int_field = colander.SchemaNode(
1727 1728 colander.Int(),
1728 1729 title='some integer setting',
1729 1730 )
1730 1731 return SettingsSchema()
1731 1732
1732 1733
1733 1734 integration_type_registry.register_integration_type(_StubIntegrationType)
1734 1735 return _StubIntegrationType
1735 1736
1736 1737 @pytest.fixture
1737 1738 def stub_integration_settings():
1738 1739 return {
1739 1740 'test_string_field': 'some data',
1740 1741 'test_int_field': 100,
1741 1742 }
1742 1743
1743 1744
1744 1745 @pytest.fixture
1745 1746 def repo_integration_stub(request, repo_stub, StubIntegrationType,
1746 1747 stub_integration_settings):
1747 1748 integration = IntegrationModel().create(
1748 1749 StubIntegrationType, settings=stub_integration_settings, enabled=True,
1749 1750 name='test repo integration',
1750 1751 repo=repo_stub, repo_group=None, child_repos_only=None)
1751 1752
1752 1753 @request.addfinalizer
1753 1754 def cleanup():
1754 1755 IntegrationModel().delete(integration)
1755 1756
1756 1757 return integration
1757 1758
1758 1759
1759 1760 @pytest.fixture
1760 1761 def repogroup_integration_stub(request, test_repo_group, StubIntegrationType,
1761 1762 stub_integration_settings):
1762 1763 integration = IntegrationModel().create(
1763 1764 StubIntegrationType, settings=stub_integration_settings, enabled=True,
1764 1765 name='test repogroup integration',
1765 1766 repo=None, repo_group=test_repo_group, child_repos_only=True)
1766 1767
1767 1768 @request.addfinalizer
1768 1769 def cleanup():
1769 1770 IntegrationModel().delete(integration)
1770 1771
1771 1772 return integration
1772 1773
1773 1774
1774 1775 @pytest.fixture
1775 1776 def repogroup_recursive_integration_stub(request, test_repo_group,
1776 1777 StubIntegrationType, stub_integration_settings):
1777 1778 integration = IntegrationModel().create(
1778 1779 StubIntegrationType, settings=stub_integration_settings, enabled=True,
1779 1780 name='test recursive repogroup integration',
1780 1781 repo=None, repo_group=test_repo_group, child_repos_only=False)
1781 1782
1782 1783 @request.addfinalizer
1783 1784 def cleanup():
1784 1785 IntegrationModel().delete(integration)
1785 1786
1786 1787 return integration
1787 1788
1788 1789
1789 1790 @pytest.fixture
1790 1791 def global_integration_stub(request, StubIntegrationType,
1791 1792 stub_integration_settings):
1792 1793 integration = IntegrationModel().create(
1793 1794 StubIntegrationType, settings=stub_integration_settings, enabled=True,
1794 1795 name='test global integration',
1795 1796 repo=None, repo_group=None, child_repos_only=None)
1796 1797
1797 1798 @request.addfinalizer
1798 1799 def cleanup():
1799 1800 IntegrationModel().delete(integration)
1800 1801
1801 1802 return integration
1802 1803
1803 1804
1804 1805 @pytest.fixture
1805 1806 def root_repos_integration_stub(request, StubIntegrationType,
1806 1807 stub_integration_settings):
1807 1808 integration = IntegrationModel().create(
1808 1809 StubIntegrationType, settings=stub_integration_settings, enabled=True,
1809 1810 name='test global integration',
1810 1811 repo=None, repo_group=None, child_repos_only=True)
1811 1812
1812 1813 @request.addfinalizer
1813 1814 def cleanup():
1814 1815 IntegrationModel().delete(integration)
1815 1816
1816 1817 return integration
1818
1819
1820 @pytest.fixture
1821 def local_dt_to_utc():
1822 def _factory(dt):
1823 return dt.replace(tzinfo=dateutil.tz.tzlocal()).astimezone(
1824 dateutil.tz.tzutc()).replace(tzinfo=None)
1825 return _factory
@@ -1,173 +1,146 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2010-2017 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 import datetime
22 22
23 23 import pytest
24 24
25 25 from rhodecode.lib.vcs.nodes import FileNode
26 26 from rhodecode.tests.vcs.base import BackendTestMixin
27 27
28 28
29 29 class TestBranches(BackendTestMixin):
30 30
31 @classmethod
32 def _get_commits(cls):
33 commits = [
34 {
35 'message': 'Initial commit',
36 'author': 'Joe Doe <joe.doe@example.com>',
37 'date': datetime.datetime(2010, 1, 1, 20),
38 'added': [
39 FileNode('foobar', content='Foobar'),
40 FileNode('foobar2', content='Foobar II'),
41 FileNode('foo/bar/baz', content='baz here!'),
42 ],
43 },
44 {
45 'message': 'Changes...',
46 'author': 'Jane Doe <jane.doe@example.com>',
47 'date': datetime.datetime(2010, 1, 1, 21),
48 'added': [
49 FileNode('some/new.txt', content='news...'),
50 ],
51 'changed': [
52 FileNode('foobar', 'Foobar I'),
53 ],
54 'removed': [],
55 },
56 ]
57 return commits
58
59 31 def test_empty_repository_has_no_branches(self, vcsbackend):
60 32 empty_repo = vcsbackend.create_repo()
61 33 assert empty_repo.branches == {}
62 34
63 35 def test_branches_all(self, vcsbackend):
64 36 branch_count = {
65 37 'git': 1,
66 38 'hg': 1,
67 39 'svn': 0,
68 40 }
69 41 assert len(self.repo.branches_all) == branch_count[vcsbackend.alias]
70 42
71 43 def test_closed_branches(self):
72 44 assert len(self.repo.branches_closed) == 0
73 45
74 def test_simple(self):
46 def test_simple(self, local_dt_to_utc):
75 47 tip = self.repo.get_commit()
76 assert tip.date == datetime.datetime(2010, 1, 1, 21)
48 assert tip.message == 'Changes...'
49 assert tip.date == local_dt_to_utc(datetime.datetime(2010, 1, 1, 21))
77 50
78 51 @pytest.mark.backends("git", "hg")
79 52 def test_new_branch(self):
80 53 # This check must not be removed to ensure the 'branches' LazyProperty
81 54 # gets hit *before* the new 'foobar' branch got created:
82 55 assert 'foobar' not in self.repo.branches
83 56 self.imc.add(FileNode(
84 57 'docs/index.txt',
85 58 content='Documentation\n'))
86 59 foobar_tip = self.imc.commit(
87 60 message=u'New branch: foobar',
88 61 author=u'joe',
89 62 branch='foobar',
90 63 )
91 64 assert 'foobar' in self.repo.branches
92 65 assert foobar_tip.branch == 'foobar'
93 66
94 67 @pytest.mark.backends("git", "hg")
95 68 def test_new_head(self):
96 69 tip = self.repo.get_commit()
97 70 self.imc.add(FileNode(
98 71 'docs/index.txt',
99 72 content='Documentation\n'))
100 73 foobar_tip = self.imc.commit(
101 74 message=u'New branch: foobar',
102 75 author=u'joe',
103 76 branch='foobar',
104 77 parents=[tip],
105 78 )
106 79 self.imc.change(FileNode(
107 80 'docs/index.txt',
108 81 content='Documentation\nand more...\n'))
109 82 newtip = self.imc.commit(
110 83 message=u'At default branch',
111 84 author=u'joe',
112 85 branch=foobar_tip.branch,
113 86 parents=[foobar_tip],
114 87 )
115 88
116 89 newest_tip = self.imc.commit(
117 90 message=u'Merged with %s' % foobar_tip.raw_id,
118 91 author=u'joe',
119 92 branch=self.backend_class.DEFAULT_BRANCH_NAME,
120 93 parents=[newtip, foobar_tip],
121 94 )
122 95
123 96 assert newest_tip.branch == \
124 97 self.backend_class.DEFAULT_BRANCH_NAME
125 98
126 99 @pytest.mark.backends("git", "hg")
127 100 def test_branch_with_slash_in_name(self):
128 101 self.imc.add(FileNode('extrafile', content='Some data\n'))
129 102 self.imc.commit(
130 103 u'Branch with a slash!', author=u'joe',
131 104 branch='issue/123')
132 105 assert 'issue/123' in self.repo.branches
133 106
134 107 @pytest.mark.backends("git", "hg")
135 108 def test_branch_with_slash_in_name_and_similar_without(self):
136 109 self.imc.add(FileNode('extrafile', content='Some data\n'))
137 110 self.imc.commit(
138 111 u'Branch with a slash!', author=u'joe',
139 112 branch='issue/123')
140 113 self.imc.add(FileNode('extrafile II', content='Some data\n'))
141 114 self.imc.commit(
142 115 u'Branch without a slash...', author=u'joe',
143 116 branch='123')
144 117 assert 'issue/123' in self.repo.branches
145 118 assert '123' in self.repo.branches
146 119
147 120
148 class TestSvnBranches:
121 class TestSvnBranches(object):
149 122
150 123 def test_empty_repository_has_no_tags_and_branches(self, vcsbackend_svn):
151 124 empty_repo = vcsbackend_svn.create_repo()
152 125 assert empty_repo.branches == {}
153 126 assert empty_repo.tags == {}
154 127
155 128 def test_missing_structure_has_no_tags_and_branches(self, vcsbackend_svn):
156 129 repo = vcsbackend_svn.create_repo(number_of_commits=1)
157 130 assert repo.branches == {}
158 131 assert repo.tags == {}
159 132
160 133 def test_discovers_ordered_branches(self, vcsbackend_svn):
161 134 repo = vcsbackend_svn['svn-simple-layout']
162 135 expected_branches = [
163 136 'branches/add-docs',
164 137 'branches/argparse',
165 138 'trunk',
166 139 ]
167 140 assert repo.branches.keys() == expected_branches
168 141
169 142 def test_discovers_ordered_tags(self, vcsbackend_svn):
170 143 repo = vcsbackend_svn['svn-simple-layout']
171 144 expected_tags = [
172 145 'tags/v0.1', 'tags/v0.2', 'tags/v0.3', 'tags/v0.5']
173 146 assert repo.tags.keys() == expected_tags
@@ -1,577 +1,575 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2010-2017 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 import datetime
22 22 import time
23 23
24 24 import pytest
25 25
26 26 from rhodecode.lib.vcs.backends.base import (
27 27 CollectionGenerator, FILEMODE_DEFAULT, EmptyCommit)
28 28 from rhodecode.lib.vcs.exceptions import (
29 29 BranchDoesNotExistError, CommitDoesNotExistError,
30 30 RepositoryError, EmptyRepositoryError)
31 31 from rhodecode.lib.vcs.nodes import (
32 32 FileNode, AddedFileNodesGenerator,
33 33 ChangedFileNodesGenerator, RemovedFileNodesGenerator)
34 34 from rhodecode.tests import get_new_dir
35 35 from rhodecode.tests.vcs.base import BackendTestMixin
36 36
37 37
38 38 class TestBaseChangeset:
39 39
40 40 def test_is_deprecated(self):
41 41 from rhodecode.lib.vcs.backends.base import BaseChangeset
42 42 pytest.deprecated_call(BaseChangeset)
43 43
44 44
45 45 class TestEmptyCommit:
46 46
47 47 def test_branch_without_alias_returns_none(self):
48 48 commit = EmptyCommit()
49 49 assert commit.branch is None
50 50
51 51
52 52 class TestCommitsInNonEmptyRepo(BackendTestMixin):
53 53 recreate_repo_per_test = True
54 54
55 55 @classmethod
56 56 def _get_commits(cls):
57 57 start_date = datetime.datetime(2010, 1, 1, 20)
58 58 for x in xrange(5):
59 59 yield {
60 60 'message': 'Commit %d' % x,
61 61 'author': 'Joe Doe <joe.doe@example.com>',
62 62 'date': start_date + datetime.timedelta(hours=12 * x),
63 63 'added': [
64 64 FileNode('file_%d.txt' % x, content='Foobar %d' % x),
65 65 ],
66 66 }
67 67
68 68 def test_walk_returns_empty_list_in_case_of_file(self):
69 69 result = list(self.tip.walk('file_0.txt'))
70 70 assert result == []
71 71
72 72 @pytest.mark.backends("git", "hg")
73 73 def test_new_branch(self):
74 74 self.imc.add(FileNode('docs/index.txt',
75 75 content='Documentation\n'))
76 76 foobar_tip = self.imc.commit(
77 77 message=u'New branch: foobar',
78 78 author=u'joe',
79 79 branch='foobar',
80 80 )
81 81 assert 'foobar' in self.repo.branches
82 82 assert foobar_tip.branch == 'foobar'
83 83 # 'foobar' should be the only branch that contains the new commit
84 84 branch = self.repo.branches.values()
85 85 assert branch[0] != branch[1]
86 86
87 87 @pytest.mark.backends("git", "hg")
88 88 def test_new_head_in_default_branch(self):
89 89 tip = self.repo.get_commit()
90 90 self.imc.add(FileNode('docs/index.txt',
91 91 content='Documentation\n'))
92 92 foobar_tip = self.imc.commit(
93 93 message=u'New branch: foobar',
94 94 author=u'joe',
95 95 branch='foobar',
96 96 parents=[tip],
97 97 )
98 98 self.imc.change(FileNode('docs/index.txt',
99 99 content='Documentation\nand more...\n'))
100 100 newtip = self.imc.commit(
101 101 message=u'At default branch',
102 102 author=u'joe',
103 103 branch=foobar_tip.branch,
104 104 parents=[foobar_tip],
105 105 )
106 106
107 107 newest_tip = self.imc.commit(
108 108 message=u'Merged with %s' % foobar_tip.raw_id,
109 109 author=u'joe',
110 110 branch=self.backend_class.DEFAULT_BRANCH_NAME,
111 111 parents=[newtip, foobar_tip],
112 112 )
113 113
114 114 assert newest_tip.branch == self.backend_class.DEFAULT_BRANCH_NAME
115 115
116 116 @pytest.mark.backends("git", "hg")
117 117 def test_get_commits_respects_branch_name(self):
118 118 """
119 119 * e1930d0 (HEAD, master) Back in default branch
120 120 | * e1930d0 (docs) New Branch: docs2
121 121 | * dcc14fa New branch: docs
122 122 |/
123 123 * e63c41a Initial commit
124 124 ...
125 125 * 624d3db Commit 0
126 126
127 127 :return:
128 128 """
129 129 DEFAULT_BRANCH = self.repo.DEFAULT_BRANCH_NAME
130 130 TEST_BRANCH = 'docs'
131 131 org_tip = self.repo.get_commit()
132 132
133 133 self.imc.add(FileNode('readme.txt', content='Document\n'))
134 134 initial = self.imc.commit(
135 135 message=u'Initial commit',
136 136 author=u'joe',
137 137 parents=[org_tip],
138 138 branch=DEFAULT_BRANCH,)
139 139
140 140 self.imc.add(FileNode('newdoc.txt', content='foobar\n'))
141 141 docs_branch_commit1 = self.imc.commit(
142 142 message=u'New branch: docs',
143 143 author=u'joe',
144 144 parents=[initial],
145 145 branch=TEST_BRANCH,)
146 146
147 147 self.imc.add(FileNode('newdoc2.txt', content='foobar2\n'))
148 148 docs_branch_commit2 = self.imc.commit(
149 149 message=u'New branch: docs2',
150 150 author=u'joe',
151 151 parents=[docs_branch_commit1],
152 152 branch=TEST_BRANCH,)
153 153
154 154 self.imc.add(FileNode('newfile', content='hello world\n'))
155 155 self.imc.commit(
156 156 message=u'Back in default branch',
157 157 author=u'joe',
158 158 parents=[initial],
159 159 branch=DEFAULT_BRANCH,)
160 160
161 161 default_branch_commits = self.repo.get_commits(
162 162 branch_name=DEFAULT_BRANCH)
163 163 assert docs_branch_commit1 not in list(default_branch_commits)
164 164 assert docs_branch_commit2 not in list(default_branch_commits)
165 165
166 166 docs_branch_commits = self.repo.get_commits(
167 167 start_id=self.repo.commit_ids[0], end_id=self.repo.commit_ids[-1],
168 168 branch_name=TEST_BRANCH)
169 169 assert docs_branch_commit1 in list(docs_branch_commits)
170 170 assert docs_branch_commit2 in list(docs_branch_commits)
171 171
172 172 @pytest.mark.backends("svn")
173 173 def test_get_commits_respects_branch_name_svn(self, vcsbackend_svn):
174 174 repo = vcsbackend_svn['svn-simple-layout']
175 175 commits = repo.get_commits(branch_name='trunk')
176 176 commit_indexes = [c.idx for c in commits]
177 177 assert commit_indexes == [1, 2, 3, 7, 12, 15]
178 178
179 179 def test_get_commit_by_branch(self):
180 180 for branch, commit_id in self.repo.branches.iteritems():
181 181 assert commit_id == self.repo.get_commit(branch).raw_id
182 182
183 183 def test_get_commit_by_tag(self):
184 184 for tag, commit_id in self.repo.tags.iteritems():
185 185 assert commit_id == self.repo.get_commit(tag).raw_id
186 186
187 187 def test_get_commit_parents(self):
188 188 repo = self.repo
189 189 for test_idx in [1, 2, 3]:
190 190 commit = repo.get_commit(commit_idx=test_idx - 1)
191 191 assert [commit] == repo.get_commit(commit_idx=test_idx).parents
192 192
193 193 def test_get_commit_children(self):
194 194 repo = self.repo
195 195 for test_idx in [1, 2, 3]:
196 196 commit = repo.get_commit(commit_idx=test_idx + 1)
197 197 assert [commit] == repo.get_commit(commit_idx=test_idx).children
198 198
199 199
200 200 class TestCommits(BackendTestMixin):
201 201 recreate_repo_per_test = False
202 202
203 203 @classmethod
204 204 def _get_commits(cls):
205 205 start_date = datetime.datetime(2010, 1, 1, 20)
206 206 for x in xrange(5):
207 207 yield {
208 208 'message': u'Commit %d' % x,
209 209 'author': u'Joe Doe <joe.doe@example.com>',
210 210 'date': start_date + datetime.timedelta(hours=12 * x),
211 211 'added': [
212 212 FileNode('file_%d.txt' % x, content='Foobar %d' % x),
213 213 ],
214 214 }
215 215
216 216 def test_simple(self):
217 217 tip = self.repo.get_commit()
218 218 assert tip.date, datetime.datetime(2010, 1, 3 == 20)
219 219
220 220 def test_simple_serialized_commit(self):
221 221 tip = self.repo.get_commit()
222 222 # json.dumps(tip) uses .__json__() method
223 223 data = tip.__json__()
224 224 assert 'branch' in data
225 225 assert data['revision']
226 226
227 227 def test_retrieve_tip(self):
228 228 tip = self.repo.get_commit('tip')
229 229 assert tip == self.repo.get_commit()
230 230
231 231 def test_invalid(self):
232 232 with pytest.raises(CommitDoesNotExistError):
233 233 self.repo.get_commit(commit_idx=123456789)
234 234
235 235 def test_idx(self):
236 236 commit = self.repo[0]
237 237 assert commit.idx == 0
238 238
239 239 def test_negative_idx(self):
240 240 commit = self.repo.get_commit(commit_idx=-1)
241 241 assert commit.idx >= 0
242 242
243 243 def test_revision_is_deprecated(self):
244 244 def get_revision(commit):
245 245 return commit.revision
246 246
247 247 commit = self.repo[0]
248 248 pytest.deprecated_call(get_revision, commit)
249 249
250 250 def test_size(self):
251 251 tip = self.repo.get_commit()
252 252 size = 5 * len('Foobar N') # Size of 5 files
253 253 assert tip.size == size
254 254
255 255 def test_size_at_commit(self):
256 256 tip = self.repo.get_commit()
257 257 size = 5 * len('Foobar N') # Size of 5 files
258 258 assert self.repo.size_at_commit(tip.raw_id) == size
259 259
260 260 def test_size_at_first_commit(self):
261 261 commit = self.repo[0]
262 262 size = len('Foobar N') # Size of 1 file
263 263 assert self.repo.size_at_commit(commit.raw_id) == size
264 264
265 265 def test_author(self):
266 266 tip = self.repo.get_commit()
267 267 assert_text_equal(tip.author, u'Joe Doe <joe.doe@example.com>')
268 268
269 269 def test_author_name(self):
270 270 tip = self.repo.get_commit()
271 271 assert_text_equal(tip.author_name, u'Joe Doe')
272 272
273 273 def test_author_email(self):
274 274 tip = self.repo.get_commit()
275 275 assert_text_equal(tip.author_email, u'joe.doe@example.com')
276 276
277 277 def test_message(self):
278 278 tip = self.repo.get_commit()
279 279 assert_text_equal(tip.message, u'Commit 4')
280 280
281 281 def test_diff(self):
282 282 tip = self.repo.get_commit()
283 283 diff = tip.diff()
284 284 assert "+Foobar 4" in diff.raw
285 285
286 286 def test_prev(self):
287 287 tip = self.repo.get_commit()
288 288 prev_commit = tip.prev()
289 289 assert prev_commit.message == 'Commit 3'
290 290
291 291 def test_prev_raises_on_first_commit(self):
292 292 commit = self.repo.get_commit(commit_idx=0)
293 293 with pytest.raises(CommitDoesNotExistError):
294 294 commit.prev()
295 295
296 296 def test_prev_works_on_second_commit_issue_183(self):
297 297 commit = self.repo.get_commit(commit_idx=1)
298 298 prev_commit = commit.prev()
299 299 assert prev_commit.idx == 0
300 300
301 301 def test_next(self):
302 302 commit = self.repo.get_commit(commit_idx=2)
303 303 next_commit = commit.next()
304 304 assert next_commit.message == 'Commit 3'
305 305
306 306 def test_next_raises_on_tip(self):
307 307 commit = self.repo.get_commit()
308 308 with pytest.raises(CommitDoesNotExistError):
309 309 commit.next()
310 310
311 311 def test_get_file_commit(self):
312 312 commit = self.repo.get_commit()
313 313 commit.get_file_commit('file_4.txt')
314 314 assert commit.message == 'Commit 4'
315 315
316 316 def test_get_filenodes_generator(self):
317 317 tip = self.repo.get_commit()
318 318 filepaths = [node.path for node in tip.get_filenodes_generator()]
319 319 assert filepaths == ['file_%d.txt' % x for x in xrange(5)]
320 320
321 321 def test_get_file_annotate(self):
322 322 file_added_commit = self.repo.get_commit(commit_idx=3)
323 323 annotations = list(file_added_commit.get_file_annotate('file_3.txt'))
324
324 325 line_no, commit_id, commit_loader, line = annotations[0]
326
325 327 assert line_no == 1
326 328 assert commit_id == file_added_commit.raw_id
327 329 assert commit_loader() == file_added_commit
328
329 # git annotation is generated differently thus different results
330 if self.repo.alias == 'git':
331 assert line == '(Joe Doe 2010-01-03 08:00:00 +0000 1) Foobar 3'
332 else:
333 assert line == 'Foobar 3'
330 assert 'Foobar 3' in line
334 331
335 332 def test_get_file_annotate_does_not_exist(self):
336 333 file_added_commit = self.repo.get_commit(commit_idx=2)
337 334 # TODO: Should use a specific exception class here?
338 335 with pytest.raises(Exception):
339 336 list(file_added_commit.get_file_annotate('file_3.txt'))
340 337
341 338 def test_get_file_annotate_tip(self):
342 339 tip = self.repo.get_commit()
343 340 commit = self.repo.get_commit(commit_idx=3)
344 341 expected_values = list(commit.get_file_annotate('file_3.txt'))
345 342 annotations = list(tip.get_file_annotate('file_3.txt'))
346 343
347 344 # Note: Skip index 2 because the loader function is not the same
348 345 for idx in (0, 1, 3):
349 346 assert annotations[0][idx] == expected_values[0][idx]
350 347
351 348 def test_get_commits_is_ordered_by_date(self):
352 349 commits = self.repo.get_commits()
353 350 assert isinstance(commits, CollectionGenerator)
354 351 assert len(commits) == 0 or len(commits) != 0
355 352 commits = list(commits)
356 353 ordered_by_date = sorted(commits, key=lambda commit: commit.date)
357 354 assert commits == ordered_by_date
358 355
359 356 def test_get_commits_respects_start(self):
360 357 second_id = self.repo.commit_ids[1]
361 358 commits = self.repo.get_commits(start_id=second_id)
362 359 assert isinstance(commits, CollectionGenerator)
363 360 commits = list(commits)
364 361 assert len(commits) == 4
365 362
366 363 def test_get_commits_includes_start_commit(self):
367 364 second_id = self.repo.commit_ids[1]
368 365 commits = self.repo.get_commits(start_id=second_id)
369 366 assert isinstance(commits, CollectionGenerator)
370 367 commits = list(commits)
371 368 assert commits[0].raw_id == second_id
372 369
373 370 def test_get_commits_respects_end(self):
374 371 second_id = self.repo.commit_ids[1]
375 372 commits = self.repo.get_commits(end_id=second_id)
376 373 assert isinstance(commits, CollectionGenerator)
377 374 commits = list(commits)
378 375 assert commits[-1].raw_id == second_id
379 376 assert len(commits) == 2
380 377
381 378 def test_get_commits_respects_both_start_and_end(self):
382 379 second_id = self.repo.commit_ids[1]
383 380 third_id = self.repo.commit_ids[2]
384 381 commits = self.repo.get_commits(start_id=second_id, end_id=third_id)
385 382 assert isinstance(commits, CollectionGenerator)
386 383 commits = list(commits)
387 384 assert len(commits) == 2
388 385
389 386 def test_get_commits_on_empty_repo_raises_EmptyRepository_error(self):
390 387 repo_path = get_new_dir(str(time.time()))
391 388 repo = self.Backend(repo_path, create=True)
392 389
393 390 with pytest.raises(EmptyRepositoryError):
394 391 list(repo.get_commits(start_id='foobar'))
395 392
396 393 def test_get_commits_includes_end_commit(self):
397 394 second_id = self.repo.commit_ids[1]
398 395 commits = self.repo.get_commits(end_id=second_id)
399 396 assert isinstance(commits, CollectionGenerator)
400 397 assert len(commits) == 2
401 398 commits = list(commits)
402 399 assert commits[-1].raw_id == second_id
403 400
404 401 def test_get_commits_respects_start_date(self):
405 402 start_date = datetime.datetime(2010, 1, 2)
406 403 commits = self.repo.get_commits(start_date=start_date)
407 404 assert isinstance(commits, CollectionGenerator)
408 405 # Should be 4 commits after 2010-01-02 00:00:00
409 406 assert len(commits) == 4
410 407 for c in commits:
411 408 assert c.date >= start_date
412 409
413 410 def test_get_commits_respects_start_date_and_end_date(self):
414 411 start_date = datetime.datetime(2010, 1, 2)
415 412 end_date = datetime.datetime(2010, 1, 3)
416 413 commits = self.repo.get_commits(start_date=start_date,
417 414 end_date=end_date)
418 415 assert isinstance(commits, CollectionGenerator)
419 416 assert len(commits) == 2
420 417 for c in commits:
421 418 assert c.date >= start_date
422 419 assert c.date <= end_date
423 420
424 421 def test_get_commits_respects_end_date(self):
425 422 end_date = datetime.datetime(2010, 1, 2)
426 423 commits = self.repo.get_commits(end_date=end_date)
427 424 assert isinstance(commits, CollectionGenerator)
428 425 assert len(commits) == 1
429 426 for c in commits:
430 427 assert c.date <= end_date
431 428
432 429 def test_get_commits_respects_reverse(self):
433 430 commits = self.repo.get_commits() # no longer reverse support
434 431 assert isinstance(commits, CollectionGenerator)
435 432 assert len(commits) == 5
436 433 commit_ids = reversed([c.raw_id for c in commits])
437 434 assert list(commit_ids) == list(reversed(self.repo.commit_ids))
438 435
439 436 def test_get_commits_slice_generator(self):
440 437 commits = self.repo.get_commits(
441 438 branch_name=self.repo.DEFAULT_BRANCH_NAME)
442 439 assert isinstance(commits, CollectionGenerator)
443 440 commit_slice = list(commits[1:3])
444 441 assert len(commit_slice) == 2
445 442
446 443 def test_get_commits_raise_commitdoesnotexist_for_wrong_start(self):
447 444 with pytest.raises(CommitDoesNotExistError):
448 445 list(self.repo.get_commits(start_id='foobar'))
449 446
450 447 def test_get_commits_raise_commitdoesnotexist_for_wrong_end(self):
451 448 with pytest.raises(CommitDoesNotExistError):
452 449 list(self.repo.get_commits(end_id='foobar'))
453 450
454 451 def test_get_commits_raise_branchdoesnotexist_for_wrong_branch_name(self):
455 452 with pytest.raises(BranchDoesNotExistError):
456 453 list(self.repo.get_commits(branch_name='foobar'))
457 454
458 455 def test_get_commits_raise_repositoryerror_for_wrong_start_end(self):
459 456 start_id = self.repo.commit_ids[-1]
460 457 end_id = self.repo.commit_ids[0]
461 458 with pytest.raises(RepositoryError):
462 459 list(self.repo.get_commits(start_id=start_id, end_id=end_id))
463 460
464 461 def test_get_commits_raises_for_numerical_ids(self):
465 462 with pytest.raises(TypeError):
466 463 self.repo.get_commits(start_id=1, end_id=2)
467 464
468 465 def test_commit_equality(self):
469 466 commit1 = self.repo.get_commit(self.repo.commit_ids[0])
470 467 commit2 = self.repo.get_commit(self.repo.commit_ids[1])
471 468
472 469 assert commit1 == commit1
473 470 assert commit2 == commit2
474 471 assert commit1 != commit2
475 472 assert commit2 != commit1
476 473 assert commit1 != None
477 474 assert None != commit1
478 475 assert 1 != commit1
479 476 assert 'string' != commit1
480 477
481 478
482 479 @pytest.mark.parametrize("filename, expected", [
483 480 ("README.rst", False),
484 481 ("README", True),
485 482 ])
486 483 def test_commit_is_link(vcsbackend, filename, expected):
487 484 commit = vcsbackend.repo.get_commit()
488 485 link_status = commit.is_link(filename)
489 486 assert link_status is expected
490 487
491 488
492 489 class TestCommitsChanges(BackendTestMixin):
493 490 recreate_repo_per_test = False
494 491
495 492 @classmethod
496 493 def _get_commits(cls):
497 494 return [
498 495 {
499 496 'message': u'Initial',
500 497 'author': u'Joe Doe <joe.doe@example.com>',
501 498 'date': datetime.datetime(2010, 1, 1, 20),
502 499 'added': [
503 500 FileNode('foo/bar', content='foo'),
504 501 FileNode('foo/baΕ‚', content='foo'),
505 502 FileNode('foobar', content='foo'),
506 503 FileNode('qwe', content='foo'),
507 504 ],
508 505 },
509 506 {
510 507 'message': u'Massive changes',
511 508 'author': u'Joe Doe <joe.doe@example.com>',
512 509 'date': datetime.datetime(2010, 1, 1, 22),
513 510 'added': [FileNode('fallout', content='War never changes')],
514 511 'changed': [
515 512 FileNode('foo/bar', content='baz'),
516 513 FileNode('foobar', content='baz'),
517 514 ],
518 515 'removed': [FileNode('qwe')],
519 516 },
520 517 ]
521 518
522 def test_initial_commit(self):
519 def test_initial_commit(self, local_dt_to_utc):
523 520 commit = self.repo.get_commit(commit_idx=0)
524 521 assert set(commit.added) == set([
525 522 commit.get_node('foo/bar'),
526 523 commit.get_node('foo/baΕ‚'),
527 524 commit.get_node('foobar'),
528 525 commit.get_node('qwe'),
529 526 ])
530 527 assert set(commit.changed) == set()
531 528 assert set(commit.removed) == set()
532 529 assert set(commit.affected_files) == set(
533 530 ['foo/bar', 'foo/baΕ‚', 'foobar', 'qwe'])
534 assert commit.date == datetime.datetime(2010, 1, 1, 20, 0)
531 assert commit.date == local_dt_to_utc(
532 datetime.datetime(2010, 1, 1, 20, 0))
535 533
536 534 def test_head_added(self):
537 535 commit = self.repo.get_commit()
538 536 assert isinstance(commit.added, AddedFileNodesGenerator)
539 537 assert set(commit.added) == set([commit.get_node('fallout')])
540 538 assert isinstance(commit.changed, ChangedFileNodesGenerator)
541 539 assert set(commit.changed) == set([
542 540 commit.get_node('foo/bar'),
543 541 commit.get_node('foobar'),
544 542 ])
545 543 assert isinstance(commit.removed, RemovedFileNodesGenerator)
546 544 assert len(commit.removed) == 1
547 545 assert list(commit.removed)[0].path == 'qwe'
548 546
549 547 def test_get_filemode(self):
550 548 commit = self.repo.get_commit()
551 549 assert FILEMODE_DEFAULT == commit.get_file_mode('foo/bar')
552 550
553 551 def test_get_filemode_non_ascii(self):
554 552 commit = self.repo.get_commit()
555 553 assert FILEMODE_DEFAULT == commit.get_file_mode('foo/baΕ‚')
556 554 assert FILEMODE_DEFAULT == commit.get_file_mode(u'foo/baΕ‚')
557 555
558 556 def test_get_file_history(self):
559 557 commit = self.repo.get_commit()
560 558 history = commit.get_file_history('foo/bar')
561 559 assert len(history) == 2
562 560
563 561 def test_get_file_history_with_limit(self):
564 562 commit = self.repo.get_commit()
565 563 history = commit.get_file_history('foo/bar', limit=1)
566 564 assert len(history) == 1
567 565
568 566 def test_get_file_history_first_commit(self):
569 567 commit = self.repo[0]
570 568 history = commit.get_file_history('foo/bar')
571 569 assert len(history) == 1
572 570
573 571
574 572 def assert_text_equal(expected, given):
575 573 assert expected == given
576 574 assert isinstance(expected, unicode)
577 575 assert isinstance(given, unicode)
@@ -1,344 +1,344 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2010-2017 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 """
22 22 Tests so called "in memory commits" commit API of vcs.
23 23 """
24 24 import datetime
25 25
26 26 import pytest
27 27
28 28 from rhodecode.lib.utils2 import safe_unicode
29 29 from rhodecode.lib.vcs.exceptions import (
30 30 EmptyRepositoryError, NodeAlreadyAddedError, NodeAlreadyExistsError,
31 31 NodeAlreadyRemovedError, NodeAlreadyChangedError, NodeDoesNotExistError,
32 32 NodeNotChangedError)
33 33 from rhodecode.lib.vcs.nodes import DirNode, FileNode
34 34 from rhodecode.tests.vcs.base import BackendTestMixin
35 35
36 36
37 37 @pytest.fixture
38 38 def nodes():
39 39 nodes = [
40 40 FileNode('foobar', content='Foo & bar'),
41 41 FileNode('foobar2', content='Foo & bar, doubled!'),
42 42 FileNode('foo bar with spaces', content=''),
43 43 FileNode('foo/bar/baz', content='Inside'),
44 44 FileNode(
45 45 'foo/bar/file.bin',
46 46 content=(
47 47 '\xd0\xcf\x11\xe0\xa1\xb1\x1a\xe1\x00\x00\x00\x00\x00\x00'
48 48 '\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00;\x00\x03\x00\xfe'
49 49 '\xff\t\x00\x06\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
50 50 '\x01\x00\x00\x00\x1a\x00\x00\x00\x00\x00\x00\x00\x00\x10\x00'
51 51 '\x00\x18\x00\x00\x00\x01\x00\x00\x00\xfe\xff\xff\xff\x00\x00'
52 52 '\x00\x00\x00\x00\x00\x00\xff\xff\xff\xff\xff\xff\xff\xff\xff'
53 53 '\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff'
54 54 )
55 55 ),
56 56 ]
57 57 return nodes
58 58
59 59
60 60 class TestInMemoryCommit(BackendTestMixin):
61 61 """
62 62 This is a backend independent test case class which should be created
63 63 with ``type`` method.
64 64
65 65 It is required to set following attributes at subclass:
66 66
67 67 - ``backend_alias``: alias of used backend (see ``vcs.BACKENDS``)
68 68 """
69 69
70 70 @classmethod
71 71 def _get_commits(cls):
72 72 return []
73 73
74 74 def test_add(self, nodes):
75 75 for node in nodes:
76 76 self.imc.add(node)
77 77
78 78 self.commit()
79 79 self.assert_succesful_commit(nodes)
80 80
81 81 @pytest.mark.skip_backends(
82 82 'svn', reason="Svn does not support commits on branches.")
83 83 def test_add_on_branch(self, nodes):
84 84 for node in nodes:
85 85 self.imc.add(node)
86 86 self.commit(branch=u'stable')
87 87 self.assert_succesful_commit(nodes)
88 88
89 89 def test_add_in_bulk(self, nodes):
90 90 self.imc.add(*nodes)
91 91
92 92 self.commit()
93 93 self.assert_succesful_commit(nodes)
94 94
95 95 def test_add_non_ascii_files(self):
96 96 nodes = [
97 97 FileNode('ΕΌΓ³Ε‚wik/zwierzΔ…tko_utf8_str', content='Δ‡Δ‡Δ‡Δ‡'),
98 98 FileNode(u'ΕΌΓ³Ε‚wik/zwierzΔ…tko_unicode', content=u'Δ‡Δ‡Δ‡Δ‡'),
99 99 ]
100 100
101 101 for node in nodes:
102 102 self.imc.add(node)
103 103
104 104 self.commit()
105 105 self.assert_succesful_commit(nodes)
106 106
107 107 def commit(self, branch=None):
108 108 self.old_commit_count = len(self.repo.commit_ids)
109 109 self.commit_message = u'Test commit with unicode: ΕΌΓ³Ε‚wik'
110 110 self.commit_author = unicode(self.__class__)
111 111 self.commit = self.imc.commit(
112 112 message=self.commit_message, author=self.commit_author,
113 113 branch=branch)
114 114
115 115 def test_add_actually_adds_all_nodes_at_second_commit_too(self):
116 116 to_add = [
117 117 FileNode('foo/bar/image.png', content='\0'),
118 118 FileNode('foo/README.txt', content='readme!'),
119 119 ]
120 120 self.imc.add(*to_add)
121 121 commit = self.imc.commit(u'Initial', u'joe.doe@example.com')
122 122 assert isinstance(commit.get_node('foo'), DirNode)
123 123 assert isinstance(commit.get_node('foo/bar'), DirNode)
124 124 self.assert_nodes_in_commit(commit, to_add)
125 125
126 126 # commit some more files again
127 127 to_add = [
128 128 FileNode('foo/bar/foobaz/bar', content='foo'),
129 129 FileNode('foo/bar/another/bar', content='foo'),
130 130 FileNode('foo/baz.txt', content='foo'),
131 131 FileNode('foobar/foobaz/file', content='foo'),
132 132 FileNode('foobar/barbaz', content='foo'),
133 133 ]
134 134 self.imc.add(*to_add)
135 135 commit = self.imc.commit(u'Another', u'joe.doe@example.com')
136 136 self.assert_nodes_in_commit(commit, to_add)
137 137
138 138 def test_add_raise_already_added(self):
139 139 node = FileNode('foobar', content='baz')
140 140 self.imc.add(node)
141 141 with pytest.raises(NodeAlreadyAddedError):
142 142 self.imc.add(node)
143 143
144 144 def test_check_integrity_raise_already_exist(self):
145 145 node = FileNode('foobar', content='baz')
146 146 self.imc.add(node)
147 147 self.imc.commit(message=u'Added foobar', author=unicode(self))
148 148 self.imc.add(node)
149 149 with pytest.raises(NodeAlreadyExistsError):
150 150 self.imc.commit(message='new message', author=str(self))
151 151
152 152 def test_change(self):
153 153 self.imc.add(FileNode('foo/bar/baz', content='foo'))
154 154 self.imc.add(FileNode('foo/fbar', content='foobar'))
155 155 tip = self.imc.commit(u'Initial', u'joe.doe@example.com')
156 156
157 157 # Change node's content
158 158 node = FileNode('foo/bar/baz', content='My **changed** content')
159 159 self.imc.change(node)
160 160 self.imc.commit(u'Changed %s' % node.path, u'joe.doe@example.com')
161 161
162 162 newtip = self.repo.get_commit()
163 163 assert tip != newtip
164 164 assert tip.id != newtip.id
165 165 self.assert_nodes_in_commit(newtip, (node,))
166 166
167 167 def test_change_non_ascii(self):
168 168 to_add = [
169 169 FileNode('ΕΌΓ³Ε‚wik/zwierzΔ…tko', content='Δ‡Δ‡Δ‡Δ‡'),
170 170 FileNode(u'ΕΌΓ³Ε‚wik/zwierzΔ…tko_uni', content=u'Δ‡Δ‡Δ‡Δ‡'),
171 171 ]
172 172 for node in to_add:
173 173 self.imc.add(node)
174 174
175 175 tip = self.imc.commit(u'Initial', u'joe.doe@example.com')
176 176
177 177 # Change node's content
178 178 node = FileNode('ΕΌΓ³Ε‚wik/zwierzΔ…tko', content='My **changed** content')
179 179 self.imc.change(node)
180 180 self.imc.commit(u'Changed %s' % safe_unicode(node.path),
181 181 u'joe.doe@example.com')
182 182
183 183 node_uni = FileNode(
184 184 u'ΕΌΓ³Ε‚wik/zwierzΔ…tko_uni', content=u'My **changed** content')
185 185 self.imc.change(node_uni)
186 186 self.imc.commit(u'Changed %s' % safe_unicode(node_uni.path),
187 187 u'joe.doe@example.com')
188 188
189 189 newtip = self.repo.get_commit()
190 190 assert tip != newtip
191 191 assert tip.id != newtip.id
192 192
193 193 self.assert_nodes_in_commit(newtip, (node, node_uni))
194 194
195 195 def test_change_raise_empty_repository(self):
196 196 node = FileNode('foobar')
197 197 with pytest.raises(EmptyRepositoryError):
198 198 self.imc.change(node)
199 199
200 200 def test_check_integrity_change_raise_node_does_not_exist(self):
201 201 node = FileNode('foobar', content='baz')
202 202 self.imc.add(node)
203 203 self.imc.commit(message=u'Added foobar', author=unicode(self))
204 204 node = FileNode('not-foobar', content='')
205 205 self.imc.change(node)
206 206 with pytest.raises(NodeDoesNotExistError):
207 207 self.imc.commit(
208 208 message='Changed not existing node',
209 209 author=str(self))
210 210
211 211 def test_change_raise_node_already_changed(self):
212 212 node = FileNode('foobar', content='baz')
213 213 self.imc.add(node)
214 214 self.imc.commit(message=u'Added foobar', author=unicode(self))
215 215 node = FileNode('foobar', content='more baz')
216 216 self.imc.change(node)
217 217 with pytest.raises(NodeAlreadyChangedError):
218 218 self.imc.change(node)
219 219
220 220 def test_check_integrity_change_raise_node_not_changed(self, nodes):
221 221 self.test_add(nodes) # Performs first commit
222 222
223 223 node = FileNode(nodes[0].path, content=nodes[0].content)
224 224 self.imc.change(node)
225 225 with pytest.raises(NodeNotChangedError):
226 226 self.imc.commit(
227 227 message=u'Trying to mark node as changed without touching it',
228 228 author=unicode(self))
229 229
230 230 def test_change_raise_node_already_removed(self):
231 231 node = FileNode('foobar', content='baz')
232 232 self.imc.add(node)
233 233 self.imc.commit(message=u'Added foobar', author=unicode(self))
234 234 self.imc.remove(FileNode('foobar'))
235 235 with pytest.raises(NodeAlreadyRemovedError):
236 236 self.imc.change(node)
237 237
238 238 def test_remove(self, nodes):
239 239 self.test_add(nodes) # Performs first commit
240 240
241 241 tip = self.repo.get_commit()
242 242 node = nodes[0]
243 243 assert node.content == tip.get_node(node.path).content
244 244 self.imc.remove(node)
245 245 self.imc.commit(
246 246 message=u'Removed %s' % node.path, author=unicode(self))
247 247
248 248 newtip = self.repo.get_commit()
249 249 assert tip != newtip
250 250 assert tip.id != newtip.id
251 251 with pytest.raises(NodeDoesNotExistError):
252 252 newtip.get_node(node.path)
253 253
254 254 def test_remove_last_file_from_directory(self):
255 255 node = FileNode('omg/qwe/foo/bar', content='foobar')
256 256 self.imc.add(node)
257 257 self.imc.commit(u'added', u'joe doe')
258 258
259 259 self.imc.remove(node)
260 260 tip = self.imc.commit(u'removed', u'joe doe')
261 261 with pytest.raises(NodeDoesNotExistError):
262 262 tip.get_node('omg/qwe/foo/bar')
263 263
264 264 def test_remove_raise_node_does_not_exist(self, nodes):
265 265 self.imc.remove(nodes[0])
266 266 with pytest.raises(NodeDoesNotExistError):
267 267 self.imc.commit(
268 268 message='Trying to remove node at empty repository',
269 269 author=str(self))
270 270
271 271 def test_check_integrity_remove_raise_node_does_not_exist(self, nodes):
272 272 self.test_add(nodes) # Performs first commit
273 273
274 274 node = FileNode('no-such-file')
275 275 self.imc.remove(node)
276 276 with pytest.raises(NodeDoesNotExistError):
277 277 self.imc.commit(
278 278 message=u'Trying to remove not existing node',
279 279 author=unicode(self))
280 280
281 281 def test_remove_raise_node_already_removed(self, nodes):
282 282 self.test_add(nodes) # Performs first commit
283 283
284 284 node = FileNode(nodes[0].path)
285 285 self.imc.remove(node)
286 286 with pytest.raises(NodeAlreadyRemovedError):
287 287 self.imc.remove(node)
288 288
289 289 def test_remove_raise_node_already_changed(self, nodes):
290 290 self.test_add(nodes) # Performs first commit
291 291
292 292 node = FileNode(nodes[0].path, content='Bending time')
293 293 self.imc.change(node)
294 294 with pytest.raises(NodeAlreadyChangedError):
295 295 self.imc.remove(node)
296 296
297 297 def test_reset(self):
298 298 self.imc.add(FileNode('foo', content='bar'))
299 299 # self.imc.change(FileNode('baz', content='new'))
300 300 # self.imc.remove(FileNode('qwe'))
301 301 self.imc.reset()
302 302 assert not any((self.imc.added, self.imc.changed, self.imc.removed))
303 303
304 304 def test_multiple_commits(self):
305 305 N = 3 # number of commits to perform
306 306 last = None
307 307 for x in xrange(N):
308 308 fname = 'file%s' % str(x).rjust(5, '0')
309 309 content = 'foobar\n' * x
310 310 node = FileNode(fname, content=content)
311 311 self.imc.add(node)
312 312 commit = self.imc.commit(u"Commit no. %s" % (x + 1), author=u'vcs')
313 313 assert last != commit
314 314 last = commit
315 315
316 316 # Check commit number for same repo
317 317 assert len(self.repo.commit_ids) == N
318 318
319 319 # Check commit number for recreated repo
320 320 repo = self.Backend(self.repo_path)
321 321 assert len(repo.commit_ids) == N
322 322
323 def test_date_attr(self):
323 def test_date_attr(self, local_dt_to_utc):
324 324 node = FileNode('foobar.txt', content='Foobared!')
325 325 self.imc.add(node)
326 326 date = datetime.datetime(1985, 1, 30, 1, 45)
327 327 commit = self.imc.commit(
328 328 u"Committed at time when I was born ;-)",
329 329 author=u'lb', date=date)
330 330
331 assert commit.date == date
331 assert commit.date == local_dt_to_utc(date)
332 332
333 333 def assert_succesful_commit(self, added_nodes):
334 334 newtip = self.repo.get_commit()
335 335 assert self.commit == newtip
336 336 assert self.old_commit_count + 1 == len(self.repo.commit_ids)
337 337 assert newtip.message == self.commit_message
338 338 assert newtip.author == self.commit_author
339 339 assert not any((self.imc.added, self.imc.changed, self.imc.removed))
340 340 self.assert_nodes_in_commit(newtip, added_nodes)
341 341
342 342 def assert_nodes_in_commit(self, commit, nodes):
343 343 for node in nodes:
344 344 assert commit.get_node(node.path).content == node.content
@@ -1,532 +1,534 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2010-2017 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 import datetime
22 22 from urllib2 import URLError
23 23
24 24 import mock
25 25 import pytest
26 26
27 27 from rhodecode.lib.vcs import backends
28 28 from rhodecode.lib.vcs.backends.base import (
29 29 Config, BaseInMemoryCommit, Reference, MergeResponse, MergeFailureReason)
30 30 from rhodecode.lib.vcs.exceptions import VCSError, RepositoryError
31 31 from rhodecode.lib.vcs.nodes import FileNode
32 32 from rhodecode.tests.vcs.base import BackendTestMixin
33 33
34 34
35 35 class TestRepositoryBase(BackendTestMixin):
36 36 recreate_repo_per_test = False
37 37
38 38 def test_init_accepts_unicode_path(self, tmpdir):
39 39 path = unicode(tmpdir.join(u'unicode Γ€'))
40 40 self.Backend(path, create=True)
41 41
42 42 def test_init_accepts_str_path(self, tmpdir):
43 43 path = str(tmpdir.join('str Γ€'))
44 44 self.Backend(path, create=True)
45 45
46 46 def test_init_fails_if_path_does_not_exist(self, tmpdir):
47 47 path = unicode(tmpdir.join('i-do-not-exist'))
48 48 with pytest.raises(VCSError):
49 49 self.Backend(path)
50 50
51 51 def test_init_fails_if_path_is_not_a_valid_repository(self, tmpdir):
52 52 path = unicode(tmpdir.mkdir(u'unicode Γ€'))
53 53 with pytest.raises(VCSError):
54 54 self.Backend(path)
55 55
56 56 def test_has_commits_attribute(self):
57 57 self.repo.commit_ids
58 58
59 59 def test_name(self):
60 60 assert self.repo.name.startswith('vcs-test')
61 61
62 62 @pytest.mark.backends("hg", "git")
63 63 def test_has_default_branch_name(self):
64 64 assert self.repo.DEFAULT_BRANCH_NAME is not None
65 65
66 66 @pytest.mark.backends("svn")
67 67 def test_has_no_default_branch_name(self):
68 68 assert self.repo.DEFAULT_BRANCH_NAME is None
69 69
70 70 def test_has_empty_commit(self):
71 71 assert self.repo.EMPTY_COMMIT_ID is not None
72 72 assert self.repo.EMPTY_COMMIT is not None
73 73
74 74 def test_empty_changeset_is_deprecated(self):
75 75 def get_empty_changeset(repo):
76 76 return repo.EMPTY_CHANGESET
77 77 pytest.deprecated_call(get_empty_changeset, self.repo)
78 78
79 79 def test_bookmarks(self):
80 80 assert len(self.repo.bookmarks) == 0
81 81
82 82 # TODO: Cover two cases: Local repo path, remote URL
83 83 def test_check_url(self):
84 84 config = Config()
85 85 assert self.Backend.check_url(self.repo.path, config)
86 86
87 87 def test_check_url_invalid(self):
88 88 config = Config()
89 89 with pytest.raises(URLError):
90 90 self.Backend.check_url(self.repo.path + "invalid", config)
91 91
92 92 def test_get_contact(self):
93 self.repo.contact
93 assert self.repo.contact
94 94
95 95 def test_get_description(self):
96 self.repo.description
96 assert self.repo.description
97 97
98 98 def test_get_hook_location(self):
99 99 assert len(self.repo.get_hook_location()) != 0
100 100
101 def test_last_change(self):
102 assert self.repo.last_change >= datetime.datetime(2010, 1, 1, 21, 0)
101 def test_last_change(self, local_dt_to_utc):
102 assert self.repo.last_change >= local_dt_to_utc(
103 datetime.datetime(2010, 1, 1, 21, 0))
103 104
104 def test_last_change_in_empty_repository(self, vcsbackend):
105 def test_last_change_in_empty_repository(self, vcsbackend, local_dt_to_utc):
105 106 delta = datetime.timedelta(seconds=1)
106 start = datetime.datetime.now()
107
108 start = local_dt_to_utc(datetime.datetime.now())
107 109 empty_repo = vcsbackend.create_repo()
108 now = datetime.datetime.now()
110 now = local_dt_to_utc(datetime.datetime.now())
109 111 assert empty_repo.last_change >= start - delta
110 112 assert empty_repo.last_change <= now + delta
111 113
112 114 def test_repo_equality(self):
113 115 assert self.repo == self.repo
114 116
115 117 def test_repo_equality_broken_object(self):
116 118 import copy
117 119 _repo = copy.copy(self.repo)
118 120 delattr(_repo, 'path')
119 121 assert self.repo != _repo
120 122
121 123 def test_repo_equality_other_object(self):
122 124 class dummy(object):
123 125 path = self.repo.path
124 126 assert self.repo != dummy()
125 127
126 128 def test_get_commit_is_implemented(self):
127 129 self.repo.get_commit()
128 130
129 131 def test_get_commits_is_implemented(self):
130 132 commit_iter = iter(self.repo.get_commits())
131 133 commit = next(commit_iter)
132 134 assert commit.idx == 0
133 135
134 136 def test_supports_iteration(self):
135 137 repo_iter = iter(self.repo)
136 138 commit = next(repo_iter)
137 139 assert commit.idx == 0
138 140
139 141 def test_in_memory_commit(self):
140 142 imc = self.repo.in_memory_commit
141 143 assert isinstance(imc, BaseInMemoryCommit)
142 144
143 145 @pytest.mark.backends("hg")
144 146 def test__get_url_unicode(self):
145 147 url = u'/home/repos/malmΓΆ'
146 148 assert self.repo._get_url(url)
147 149
148 150
149 151 class TestDeprecatedRepositoryAPI(BackendTestMixin):
150 152 recreate_repo_per_test = False
151 153
152 154 def test_revisions_is_deprecated(self):
153 155 def get_revisions(repo):
154 156 return repo.revisions
155 157 pytest.deprecated_call(get_revisions, self.repo)
156 158
157 159 def test_get_changeset_is_deprecated(self):
158 160 pytest.deprecated_call(self.repo.get_changeset)
159 161
160 162 def test_get_changesets_is_deprecated(self):
161 163 pytest.deprecated_call(self.repo.get_changesets)
162 164
163 165 def test_in_memory_changeset_is_deprecated(self):
164 166 def get_imc(repo):
165 167 return repo.in_memory_changeset
166 168 pytest.deprecated_call(get_imc, self.repo)
167 169
168 170
169 171 # TODO: these tests are incomplete, must check the resulting compare result for
170 172 # correcteness
171 173 class TestRepositoryCompare:
172 174
173 175 @pytest.mark.parametrize('merge', [True, False])
174 176 def test_compare_commits_of_same_repository(self, vcsbackend, merge):
175 177 target_repo = vcsbackend.create_repo(number_of_commits=5)
176 178 target_repo.compare(
177 179 target_repo[1].raw_id, target_repo[3].raw_id, target_repo,
178 180 merge=merge)
179 181
180 182 @pytest.mark.xfail_backends('svn')
181 183 @pytest.mark.parametrize('merge', [True, False])
182 184 def test_compare_cloned_repositories(self, vcsbackend, merge):
183 185 target_repo = vcsbackend.create_repo(number_of_commits=5)
184 186 source_repo = vcsbackend.clone_repo(target_repo)
185 187 assert target_repo != source_repo
186 188
187 189 vcsbackend.add_file(source_repo, 'newfile', 'somecontent')
188 190 source_commit = source_repo.get_commit()
189 191
190 192 target_repo.compare(
191 193 target_repo[1].raw_id, source_repo[3].raw_id, source_repo,
192 194 merge=merge)
193 195
194 196 @pytest.mark.xfail_backends('svn')
195 197 @pytest.mark.parametrize('merge', [True, False])
196 198 def test_compare_unrelated_repositories(self, vcsbackend, merge):
197 199 orig = vcsbackend.create_repo(number_of_commits=5)
198 200 unrelated = vcsbackend.create_repo(number_of_commits=5)
199 201 assert orig != unrelated
200 202
201 203 orig.compare(
202 204 orig[1].raw_id, unrelated[3].raw_id, unrelated, merge=merge)
203 205
204 206
205 207 class TestRepositoryGetCommonAncestor:
206 208
207 209 def test_get_common_ancestor_from_same_repo_existing(self, vcsbackend):
208 210 target_repo = vcsbackend.create_repo(number_of_commits=5)
209 211
210 212 expected_ancestor = target_repo[2].raw_id
211 213
212 214 assert target_repo.get_common_ancestor(
213 215 commit_id1=target_repo[2].raw_id,
214 216 commit_id2=target_repo[4].raw_id,
215 217 repo2=target_repo
216 218 ) == expected_ancestor
217 219
218 220 assert target_repo.get_common_ancestor(
219 221 commit_id1=target_repo[4].raw_id,
220 222 commit_id2=target_repo[2].raw_id,
221 223 repo2=target_repo
222 224 ) == expected_ancestor
223 225
224 226 @pytest.mark.xfail_backends("svn")
225 227 def test_get_common_ancestor_from_cloned_repo_existing(self, vcsbackend):
226 228 target_repo = vcsbackend.create_repo(number_of_commits=5)
227 229 source_repo = vcsbackend.clone_repo(target_repo)
228 230 assert target_repo != source_repo
229 231
230 232 vcsbackend.add_file(source_repo, 'newfile', 'somecontent')
231 233 source_commit = source_repo.get_commit()
232 234
233 235 expected_ancestor = target_repo[4].raw_id
234 236
235 237 assert target_repo.get_common_ancestor(
236 238 commit_id1=target_repo[4].raw_id,
237 239 commit_id2=source_commit.raw_id,
238 240 repo2=source_repo
239 241 ) == expected_ancestor
240 242
241 243 assert target_repo.get_common_ancestor(
242 244 commit_id1=source_commit.raw_id,
243 245 commit_id2=target_repo[4].raw_id,
244 246 repo2=target_repo
245 247 ) == expected_ancestor
246 248
247 249 @pytest.mark.xfail_backends("svn")
248 250 def test_get_common_ancestor_from_unrelated_repo_missing(self, vcsbackend):
249 251 original = vcsbackend.create_repo(number_of_commits=5)
250 252 unrelated = vcsbackend.create_repo(number_of_commits=5)
251 253 assert original != unrelated
252 254
253 255 assert original.get_common_ancestor(
254 256 commit_id1=original[0].raw_id,
255 257 commit_id2=unrelated[0].raw_id,
256 258 repo2=unrelated
257 259 ) == None
258 260
259 261 assert original.get_common_ancestor(
260 262 commit_id1=original[-1].raw_id,
261 263 commit_id2=unrelated[-1].raw_id,
262 264 repo2=unrelated
263 265 ) == None
264 266
265 267
266 268 @pytest.mark.backends("git", "hg")
267 269 class TestRepositoryMerge:
268 270 def prepare_for_success(self, vcsbackend):
269 271 self.target_repo = vcsbackend.create_repo(number_of_commits=1)
270 272 self.source_repo = vcsbackend.clone_repo(self.target_repo)
271 273 vcsbackend.add_file(self.target_repo, 'README_MERGE1', 'Version 1')
272 274 vcsbackend.add_file(self.source_repo, 'README_MERGE2', 'Version 2')
273 275 imc = self.source_repo.in_memory_commit
274 276 imc.add(FileNode('file_x', content=self.source_repo.name))
275 277 imc.commit(
276 278 message=u'Automatic commit from repo merge test',
277 279 author=u'Automatic')
278 280 self.target_commit = self.target_repo.get_commit()
279 281 self.source_commit = self.source_repo.get_commit()
280 282 # This only works for Git and Mercurial
281 283 default_branch = self.target_repo.DEFAULT_BRANCH_NAME
282 284 self.target_ref = Reference(
283 285 'branch', default_branch, self.target_commit.raw_id)
284 286 self.source_ref = Reference(
285 287 'branch', default_branch, self.source_commit.raw_id)
286 288 self.workspace = 'test-merge'
287 289
288 290 def prepare_for_conflict(self, vcsbackend):
289 291 self.target_repo = vcsbackend.create_repo(number_of_commits=1)
290 292 self.source_repo = vcsbackend.clone_repo(self.target_repo)
291 293 vcsbackend.add_file(self.target_repo, 'README_MERGE', 'Version 1')
292 294 vcsbackend.add_file(self.source_repo, 'README_MERGE', 'Version 2')
293 295 self.target_commit = self.target_repo.get_commit()
294 296 self.source_commit = self.source_repo.get_commit()
295 297 # This only works for Git and Mercurial
296 298 default_branch = self.target_repo.DEFAULT_BRANCH_NAME
297 299 self.target_ref = Reference(
298 300 'branch', default_branch, self.target_commit.raw_id)
299 301 self.source_ref = Reference(
300 302 'branch', default_branch, self.source_commit.raw_id)
301 303 self.workspace = 'test-merge'
302 304
303 305 def test_merge_success(self, vcsbackend):
304 306 self.prepare_for_success(vcsbackend)
305 307
306 308 merge_response = self.target_repo.merge(
307 309 self.target_ref, self.source_repo, self.source_ref, self.workspace,
308 310 'test user', 'test@rhodecode.com', 'merge message 1',
309 311 dry_run=False)
310 312 expected_merge_response = MergeResponse(
311 313 True, True, merge_response.merge_ref,
312 314 MergeFailureReason.NONE)
313 315 assert merge_response == expected_merge_response
314 316
315 317 target_repo = backends.get_backend(vcsbackend.alias)(
316 318 self.target_repo.path)
317 319 target_commits = list(target_repo.get_commits())
318 320 commit_ids = [c.raw_id for c in target_commits[:-1]]
319 321 assert self.source_ref.commit_id in commit_ids
320 322 assert self.target_ref.commit_id in commit_ids
321 323
322 324 merge_commit = target_commits[-1]
323 325 assert merge_commit.raw_id == merge_response.merge_ref.commit_id
324 326 assert merge_commit.message.strip() == 'merge message 1'
325 327 assert merge_commit.author == 'test user <test@rhodecode.com>'
326 328
327 329 # We call it twice so to make sure we can handle updates
328 330 target_ref = Reference(
329 331 self.target_ref.type, self.target_ref.name,
330 332 merge_response.merge_ref.commit_id)
331 333
332 334 merge_response = target_repo.merge(
333 335 target_ref, self.source_repo, self.source_ref, self.workspace,
334 336 'test user', 'test@rhodecode.com', 'merge message 2',
335 337 dry_run=False)
336 338 expected_merge_response = MergeResponse(
337 339 True, True, merge_response.merge_ref,
338 340 MergeFailureReason.NONE)
339 341 assert merge_response == expected_merge_response
340 342
341 343 target_repo = backends.get_backend(
342 344 vcsbackend.alias)(self.target_repo.path)
343 345 merge_commit = target_repo.get_commit(
344 346 merge_response.merge_ref.commit_id)
345 347 assert merge_commit.message.strip() == 'merge message 1'
346 348 assert merge_commit.author == 'test user <test@rhodecode.com>'
347 349
348 350 def test_merge_success_dry_run(self, vcsbackend):
349 351 self.prepare_for_success(vcsbackend)
350 352
351 353 merge_response = self.target_repo.merge(
352 354 self.target_ref, self.source_repo, self.source_ref, self.workspace,
353 355 dry_run=True)
354 356
355 357 # We call it twice so to make sure we can handle updates
356 358 merge_response_update = self.target_repo.merge(
357 359 self.target_ref, self.source_repo, self.source_ref, self.workspace,
358 360 dry_run=True)
359 361
360 362 # Multiple merges may differ in their commit id. Therefore we set the
361 363 # commit id to `None` before comparing the merge responses.
362 364 merge_response = merge_response._replace(
363 365 merge_ref=merge_response.merge_ref._replace(commit_id=None))
364 366 merge_response_update = merge_response_update._replace(
365 367 merge_ref=merge_response_update.merge_ref._replace(commit_id=None))
366 368
367 369 assert merge_response == merge_response_update
368 370 assert merge_response.possible is True
369 371 assert merge_response.executed is False
370 372 assert merge_response.merge_ref
371 373 assert merge_response.failure_reason is MergeFailureReason.NONE
372 374
373 375 @pytest.mark.parametrize('dry_run', [True, False])
374 376 def test_merge_conflict(self, vcsbackend, dry_run):
375 377 self.prepare_for_conflict(vcsbackend)
376 378 expected_merge_response = MergeResponse(
377 379 False, False, None, MergeFailureReason.MERGE_FAILED)
378 380
379 381 merge_response = self.target_repo.merge(
380 382 self.target_ref, self.source_repo, self.source_ref, self.workspace,
381 383 'test_user', 'test@rhodecode.com', 'test message', dry_run=dry_run)
382 384 assert merge_response == expected_merge_response
383 385
384 386 # We call it twice so to make sure we can handle updates
385 387 merge_response = self.target_repo.merge(
386 388 self.target_ref, self.source_repo, self.source_ref, self.workspace,
387 389 'test_user', 'test@rhodecode.com', 'test message', dry_run=dry_run)
388 390 assert merge_response == expected_merge_response
389 391
390 392 def test_merge_target_is_not_head(self, vcsbackend):
391 393 self.prepare_for_success(vcsbackend)
392 394 expected_merge_response = MergeResponse(
393 395 False, False, None, MergeFailureReason.TARGET_IS_NOT_HEAD)
394 396
395 397 target_ref = Reference(
396 398 self.target_ref.type, self.target_ref.name, '0' * 40)
397 399
398 400 merge_response = self.target_repo.merge(
399 401 target_ref, self.source_repo, self.source_ref, self.workspace,
400 402 dry_run=True)
401 403
402 404 assert merge_response == expected_merge_response
403 405
404 406 def test_merge_missing_source_reference(self, vcsbackend):
405 407 self.prepare_for_success(vcsbackend)
406 408 expected_merge_response = MergeResponse(
407 409 False, False, None, MergeFailureReason.MISSING_SOURCE_REF)
408 410
409 411 source_ref = Reference(
410 412 self.source_ref.type, 'not_existing', self.source_ref.commit_id)
411 413
412 414 merge_response = self.target_repo.merge(
413 415 self.target_ref, self.source_repo, source_ref, self.workspace,
414 416 dry_run=True)
415 417
416 418 assert merge_response == expected_merge_response
417 419
418 420 def test_merge_raises_exception(self, vcsbackend):
419 421 self.prepare_for_success(vcsbackend)
420 422 expected_merge_response = MergeResponse(
421 423 False, False, None, MergeFailureReason.UNKNOWN)
422 424
423 425 with mock.patch.object(self.target_repo, '_merge_repo',
424 426 side_effect=RepositoryError()):
425 427 merge_response = self.target_repo.merge(
426 428 self.target_ref, self.source_repo, self.source_ref,
427 429 self.workspace, dry_run=True)
428 430
429 431 assert merge_response == expected_merge_response
430 432
431 433 def test_merge_invalid_user_name(self, vcsbackend):
432 434 repo = vcsbackend.create_repo(number_of_commits=1)
433 435 ref = Reference('branch', 'master', 'not_used')
434 436 with pytest.raises(ValueError):
435 437 repo.merge(ref, self, ref, 'workspace_id')
436 438
437 439 def test_merge_invalid_user_email(self, vcsbackend):
438 440 repo = vcsbackend.create_repo(number_of_commits=1)
439 441 ref = Reference('branch', 'master', 'not_used')
440 442 with pytest.raises(ValueError):
441 443 repo.merge(ref, self, ref, 'workspace_id', 'user name')
442 444
443 445 def test_merge_invalid_message(self, vcsbackend):
444 446 repo = vcsbackend.create_repo(number_of_commits=1)
445 447 ref = Reference('branch', 'master', 'not_used')
446 448 with pytest.raises(ValueError):
447 449 repo.merge(
448 450 ref, self, ref, 'workspace_id', 'user name', 'user@email.com')
449 451
450 452
451 453 class TestRepositoryStrip(BackendTestMixin):
452 454 recreate_repo_per_test = True
453 455
454 456 @classmethod
455 457 def _get_commits(cls):
456 458 commits = [
457 459 {
458 460 'message': 'Initial commit',
459 461 'author': 'Joe Doe <joe.doe@example.com>',
460 462 'date': datetime.datetime(2010, 1, 1, 20),
461 463 'branch': 'master',
462 464 'added': [
463 465 FileNode('foobar', content='foobar'),
464 466 FileNode('foobar2', content='foobar2'),
465 467 ],
466 468 },
467 469 ]
468 470 for x in xrange(10):
469 471 commit_data = {
470 472 'message': 'Changed foobar - commit%s' % x,
471 473 'author': 'Jane Doe <jane.doe@example.com>',
472 474 'date': datetime.datetime(2010, 1, 1, 21, x),
473 475 'branch': 'master',
474 476 'changed': [
475 477 FileNode('foobar', 'FOOBAR - %s' % x),
476 478 ],
477 479 }
478 480 commits.append(commit_data)
479 481 return commits
480 482
481 483 @pytest.mark.backends("git", "hg")
482 484 def test_strip_commit(self):
483 485 tip = self.repo.get_commit()
484 486 assert tip.idx == 10
485 487 self.repo.strip(tip.raw_id, self.repo.DEFAULT_BRANCH_NAME)
486 488
487 489 tip = self.repo.get_commit()
488 490 assert tip.idx == 9
489 491
490 492 @pytest.mark.backends("git", "hg")
491 493 def test_strip_multiple_commits(self):
492 494 tip = self.repo.get_commit()
493 495 assert tip.idx == 10
494 496
495 497 old = self.repo.get_commit(commit_idx=5)
496 498 self.repo.strip(old.raw_id, self.repo.DEFAULT_BRANCH_NAME)
497 499
498 500 tip = self.repo.get_commit()
499 501 assert tip.idx == 4
500 502
501 503
502 504 @pytest.mark.backends('hg', 'git')
503 505 class TestRepositoryPull:
504 506
505 507 def test_pull(self, vcsbackend):
506 508 source_repo = vcsbackend.repo
507 509 target_repo = vcsbackend.create_repo()
508 510 assert len(source_repo.commit_ids) > len(target_repo.commit_ids)
509 511
510 512 target_repo.pull(source_repo.path)
511 513 # Note: Get a fresh instance, avoids caching trouble
512 514 target_repo = vcsbackend.backend(target_repo.path)
513 515 assert len(source_repo.commit_ids) == len(target_repo.commit_ids)
514 516
515 517 def test_pull_wrong_path(self, vcsbackend):
516 518 target_repo = vcsbackend.create_repo()
517 519 with pytest.raises(RepositoryError):
518 520 target_repo.pull(target_repo.path + "wrong")
519 521
520 522 def test_pull_specific_commits(self, vcsbackend):
521 523 source_repo = vcsbackend.repo
522 524 target_repo = vcsbackend.create_repo()
523 525
524 526 second_commit = source_repo[1].raw_id
525 527 if vcsbackend.alias == 'git':
526 528 second_commit_ref = 'refs/test-refs/a'
527 529 source_repo.set_refs(second_commit_ref, second_commit)
528 530
529 531 target_repo.pull(source_repo.path, commit_ids=[second_commit])
530 532 target_repo = vcsbackend.backend(target_repo.path)
531 533 assert 2 == len(target_repo.commit_ids)
532 534 assert second_commit == target_repo.get_commit().raw_id
General Comments 0
You need to be logged in to leave comments. Login now