tests: fixed some deprecated pytest calls and warnings.
dan
r3098:97c1a8b7 default
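
The recurring change across the file diffs below is a standard workaround for two pytest deprecations: fixture functions are no longer called directly (each affected fixture is split into a plain plain_* helper plus a thin fixture wrapper that delegates to it), and marker arguments are read via metafunc.definition.get_closest_marker(...) instead of attribute access on the test function. A minimal sketch of both patterns, reusing the dummy_user fixture and the dbs marker names touched by this changeset (the sketch is illustrative only and not part of the commit):

    import pytest

    # Deprecated pattern: module-level code called the fixture function directly,
    # e.g. `user = dummy_user()`. Keep the logic in a plain helper and let the
    # fixture become a thin wrapper around it.
    def plain_dummy_user():
        # illustrative stand-in for the real AttributeDict-based helper
        return {'username': 'test_user'}

    @pytest.fixture
    def dummy_user():
        return plain_dummy_user()

    # Deprecated pattern: `metafunc.function.dbs.args`.
    # Replacement: ask the collected test definition for the closest marker.
    def pytest_generate_tests(metafunc):
        if 'db_backend_name' in metafunc.fixturenames:
            marker = metafunc.definition.get_closest_marker('dbs')
            backends = marker.args if marker else metafunc.config.getoption('--dbs')
            metafunc.parametrize('db_backend_name', backends)

The same helper/wrapper split is what lets the ssh-support test modules below import plain_dummy_env and plain_dummy_user at module level without triggering the direct-fixture-call deprecation warning.
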
@@ -1,203 +1,203 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2010-2018 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 import mock
22 22 import pytest
23 23
24 24 from rhodecode.model.repo import RepoModel
25 25 from rhodecode.tests import TEST_USER_ADMIN_LOGIN, TEST_USER_REGULAR_LOGIN
26 26 from rhodecode.api.tests.utils import (
27 27 build_data, api_call, assert_error, assert_ok, crash, jsonify)
28 28 from rhodecode.tests.fixture import Fixture
29 from rhodecode.tests.plugin import http_host_stub, http_host_only_stub
29 from rhodecode.tests.plugin import plain_http_host_only_stub
30 30
31 31 fixture = Fixture()
32 32
33 33 UPDATE_REPO_NAME = 'api_update_me'
34 34
35 35
36 36 class SAME_AS_UPDATES(object):
37 37 """ Constant used for tests below """
38 38
39 39
40 40 @pytest.mark.usefixtures("testuser_api", "app")
41 41 class TestApiUpdateRepo(object):
42 42
43 43 @pytest.mark.parametrize("updates, expected", [
44 44 ({'owner': TEST_USER_REGULAR_LOGIN},
45 45 SAME_AS_UPDATES),
46 46
47 47 ({'description': 'new description'},
48 48 SAME_AS_UPDATES),
49 49
50 50 ({'clone_uri': 'http://foo.com/repo'},
51 51 SAME_AS_UPDATES),
52 52
53 53 ({'clone_uri': None},
54 54 {'clone_uri': ''}),
55 55
56 56 ({'clone_uri': ''},
57 57 {'clone_uri': ''}),
58 58
59 59 ({'clone_uri': 'http://example.com/repo_pull'},
60 60 {'clone_uri': 'http://example.com/repo_pull'}),
61 61
62 62 ({'push_uri': ''},
63 63 {'push_uri': ''}),
64 64
65 65 ({'push_uri': 'http://example.com/repo_push'},
66 66 {'push_uri': 'http://example.com/repo_push'}),
67 67
68 68 ({'landing_rev': 'rev:tip'},
69 69 {'landing_rev': ['rev', 'tip']}),
70 70
71 71 ({'enable_statistics': True},
72 72 SAME_AS_UPDATES),
73 73
74 74 ({'enable_locking': True},
75 75 SAME_AS_UPDATES),
76 76
77 77 ({'enable_downloads': True},
78 78 SAME_AS_UPDATES),
79 79
80 80 ({'repo_name': 'new_repo_name'},
81 81 {
82 82 'repo_name': 'new_repo_name',
83 'url': 'http://{}/new_repo_name'.format(http_host_only_stub())
83 'url': 'http://{}/new_repo_name'.format(plain_http_host_only_stub())
84 84 }),
85 85
86 86 ({'repo_name': 'test_group_for_update/{}'.format(UPDATE_REPO_NAME),
87 87 '_group': 'test_group_for_update'},
88 88 {
89 89 'repo_name': 'test_group_for_update/{}'.format(UPDATE_REPO_NAME),
90 90 'url': 'http://{}/test_group_for_update/{}'.format(
91 http_host_only_stub(), UPDATE_REPO_NAME)
91 plain_http_host_only_stub(), UPDATE_REPO_NAME)
92 92 }),
93 93 ])
94 94 def test_api_update_repo(self, updates, expected, backend):
95 95 repo_name = UPDATE_REPO_NAME
96 96 repo = fixture.create_repo(repo_name, repo_type=backend.alias)
97 97 if updates.get('_group'):
98 98 fixture.create_repo_group(updates['_group'])
99 99
100 100 expected_api_data = repo.get_api_data(include_secrets=True)
101 101 if expected is SAME_AS_UPDATES:
102 102 expected_api_data.update(updates)
103 103 else:
104 104 expected_api_data.update(expected)
105 105
106 106 id_, params = build_data(
107 107 self.apikey, 'update_repo', repoid=repo_name, **updates)
108 108
109 109 with mock.patch('rhodecode.model.validation_schema.validators.url_validator'):
110 110 response = api_call(self.app, params)
111 111
112 112 if updates.get('repo_name'):
113 113 repo_name = updates['repo_name']
114 114
115 115 try:
116 116 expected = {
117 117 'msg': 'updated repo ID:%s %s' % (repo.repo_id, repo_name),
118 118 'repository': jsonify(expected_api_data)
119 119 }
120 120 assert_ok(id_, expected, given=response.body)
121 121 finally:
122 122 fixture.destroy_repo(repo_name)
123 123 if updates.get('_group'):
124 124 fixture.destroy_repo_group(updates['_group'])
125 125
126 126 def test_api_update_repo_fork_of_field(self, backend):
127 127 master_repo = backend.create_repo()
128 128 repo = backend.create_repo()
129 129 updates = {
130 130 'fork_of': master_repo.repo_name,
131 131 'fork_of_id': master_repo.repo_id
132 132 }
133 133 expected_api_data = repo.get_api_data(include_secrets=True)
134 134 expected_api_data.update(updates)
135 135
136 136 id_, params = build_data(
137 137 self.apikey, 'update_repo', repoid=repo.repo_name, **updates)
138 138 response = api_call(self.app, params)
139 139 expected = {
140 140 'msg': 'updated repo ID:%s %s' % (repo.repo_id, repo.repo_name),
141 141 'repository': jsonify(expected_api_data)
142 142 }
143 143 assert_ok(id_, expected, given=response.body)
144 144 result = response.json['result']['repository']
145 145 assert result['fork_of'] == master_repo.repo_name
146 146 assert result['fork_of_id'] == master_repo.repo_id
147 147
148 148 def test_api_update_repo_fork_of_not_found(self, backend):
149 149 master_repo_name = 'fake-parent-repo'
150 150 repo = backend.create_repo()
151 151 updates = {
152 152 'fork_of': master_repo_name
153 153 }
154 154 id_, params = build_data(
155 155 self.apikey, 'update_repo', repoid=repo.repo_name, **updates)
156 156 response = api_call(self.app, params)
157 157 expected = {
158 158 'repo_fork_of': 'Fork with id `{}` does not exists'.format(
159 159 master_repo_name)}
160 160 assert_error(id_, expected, given=response.body)
161 161
162 162 def test_api_update_repo_with_repo_group_not_existing(self):
163 163 repo_name = 'admin_owned'
164 164 fake_repo_group = 'test_group_for_update'
165 165 fixture.create_repo(repo_name)
166 166 updates = {'repo_name': '{}/{}'.format(fake_repo_group, repo_name)}
167 167 id_, params = build_data(
168 168 self.apikey, 'update_repo', repoid=repo_name, **updates)
169 169 response = api_call(self.app, params)
170 170 try:
171 171 expected = {
172 172 'repo_group': 'Repository group `{}` does not exist'.format(fake_repo_group)
173 173 }
174 174 assert_error(id_, expected, given=response.body)
175 175 finally:
176 176 fixture.destroy_repo(repo_name)
177 177
178 178 def test_api_update_repo_regular_user_not_allowed(self):
179 179 repo_name = 'admin_owned'
180 180 fixture.create_repo(repo_name)
181 181 updates = {'active': False}
182 182 id_, params = build_data(
183 183 self.apikey_regular, 'update_repo', repoid=repo_name, **updates)
184 184 response = api_call(self.app, params)
185 185 try:
186 186 expected = 'repository `%s` does not exist' % (repo_name,)
187 187 assert_error(id_, expected, given=response.body)
188 188 finally:
189 189 fixture.destroy_repo(repo_name)
190 190
191 191 @mock.patch.object(RepoModel, 'update', crash)
192 192 def test_api_update_repo_exception_occurred(self, backend):
193 193 repo_name = UPDATE_REPO_NAME
194 194 fixture.create_repo(repo_name, repo_type=backend.alias)
195 195 id_, params = build_data(
196 196 self.apikey, 'update_repo', repoid=repo_name,
197 197 owner=TEST_USER_ADMIN_LOGIN,)
198 198 response = api_call(self.app, params)
199 199 try:
200 200 expected = 'failed to update repo `%s`' % (repo_name,)
201 201 assert_error(id_, expected, given=response.body)
202 202 finally:
203 203 fixture.destroy_repo(repo_name)
@@ -1,62 +1,70 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2016-2018 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 import os
22 22 import pytest
23 23 from pyramid.compat import configparser
24 24
25 25 from rhodecode.apps.ssh_support.lib.ssh_wrapper import SshWrapper
26 26 from rhodecode.lib.utils2 import AttributeDict
27 27
28 28
29 29 @pytest.fixture
30 30 def dummy_conf_file(tmpdir):
31 31 conf = configparser.ConfigParser()
32 32 conf.add_section('app:main')
33 33 conf.set('app:main', 'ssh.executable.hg', '/usr/bin/hg')
34 34 conf.set('app:main', 'ssh.executable.git', '/usr/bin/git')
35 35 conf.set('app:main', 'ssh.executable.svn', '/usr/bin/svnserve')
36 36
37 37 f_path = os.path.join(str(tmpdir), 'ssh_wrapper_test.ini')
38 38 with open(f_path, 'wb') as f:
39 39 conf.write(f)
40 40
41 41 return os.path.join(f_path)
42 42
43 43
44 @pytest.fixture
45 def dummy_env():
44 def plain_dummy_env():
46 45 return {
47 46 'request':
48 47 AttributeDict(host_url='http://localhost', script_name='/')
49 48 }
50 49
51 50
52 51 @pytest.fixture
52 def dummy_env():
53 return plain_dummy_env()
54
55
56 def plain_dummy_user():
57 return AttributeDict(username='test_user')
58
59
60 @pytest.fixture
53 61 def dummy_user():
54 return AttributeDict(username='test_user')
62 return plain_dummy_user()
55 63
56 64
57 65 @pytest.fixture
58 66 def ssh_wrapper(app, dummy_conf_file, dummy_env):
59 67 conn_info = '127.0.0.1 22 10.0.0.1 443'
60 68 return SshWrapper(
61 69 'random command', conn_info, 'auto', 'admin', '1', key_id='1',
62 70 shell=False, ini_path=dummy_conf_file, env=dummy_env)
@@ -1,152 +1,152 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2016-2018 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 import json
22 22 import mock
23 23 import pytest
24 24
25 25 from rhodecode.apps.ssh_support.lib.backends.git import GitServer
26 from rhodecode.apps.ssh_support.tests.conftest import dummy_env, dummy_user
26 from rhodecode.apps.ssh_support.tests.conftest import plain_dummy_env, plain_dummy_user
27 27
28 28
29 29 class GitServerCreator(object):
30 30 root = '/tmp/repo/path/'
31 31 git_path = '/usr/local/bin/git'
32 32 config_data = {
33 33 'app:main': {
34 34 'ssh.executable.git': git_path,
35 35 'vcs.hooks.protocol': 'http',
36 36 }
37 37 }
38 38 repo_name = 'test_git'
39 39 repo_mode = 'receive-pack'
40 user = dummy_user()
40 user = plain_dummy_user()
41 41
42 42 def __init__(self):
43 43 def config_get(part, key):
44 44 return self.config_data.get(part, {}).get(key)
45 45 self.config_mock = mock.Mock()
46 46 self.config_mock.get = mock.Mock(side_effect=config_get)
47 47
48 48 def create(self, **kwargs):
49 49 parameters = {
50 50 'store': self.root,
51 51 'ini_path': '',
52 52 'user': self.user,
53 53 'repo_name': self.repo_name,
54 54 'repo_mode': self.repo_mode,
55 55 'user_permissions': {
56 56 self.repo_name: 'repository.admin'
57 57 },
58 58 'config': self.config_mock,
59 'env': dummy_env()
59 'env': plain_dummy_env()
60 60 }
61 61 parameters.update(kwargs)
62 62 server = GitServer(**parameters)
63 63 return server
64 64
65 65
66 66 @pytest.fixture
67 67 def git_server(app):
68 68 return GitServerCreator()
69 69
70 70
71 71 class TestGitServer(object):
72 72
73 73 def test_command(self, git_server):
74 74 server = git_server.create()
75 75 expected_command = (
76 76 'cd {root}; {git_path} {repo_mode} \'{root}{repo_name}\''.format(
77 77 root=git_server.root, git_path=git_server.git_path,
78 78 repo_mode=git_server.repo_mode, repo_name=git_server.repo_name)
79 79 )
80 80 assert expected_command == server.tunnel.command()
81 81
82 82 @pytest.mark.parametrize('permissions, action, code', [
83 83 ({}, 'pull', -2),
84 84 ({'test_git': 'repository.read'}, 'pull', 0),
85 85 ({'test_git': 'repository.read'}, 'push', -2),
86 86 ({'test_git': 'repository.write'}, 'push', 0),
87 87 ({'test_git': 'repository.admin'}, 'push', 0),
88 88
89 89 ])
90 90 def test_permission_checks(self, git_server, permissions, action, code):
91 91 server = git_server.create(user_permissions=permissions)
92 92 result = server._check_permissions(action)
93 93 assert result is code
94 94
95 95 @pytest.mark.parametrize('permissions, value', [
96 96 ({}, False),
97 97 ({'test_git': 'repository.read'}, False),
98 98 ({'test_git': 'repository.write'}, True),
99 99 ({'test_git': 'repository.admin'}, True),
100 100
101 101 ])
102 102 def test_has_write_permissions(self, git_server, permissions, value):
103 103 server = git_server.create(user_permissions=permissions)
104 104 result = server.has_write_perm()
105 105 assert result is value
106 106
107 107 def test_run_returns_executes_command(self, git_server):
108 108 server = git_server.create()
109 109 from rhodecode.apps.ssh_support.lib.backends.git import GitTunnelWrapper
110 110 with mock.patch.object(GitTunnelWrapper, 'create_hooks_env') as _patch:
111 111 _patch.return_value = 0
112 112 with mock.patch.object(GitTunnelWrapper, 'command', return_value='date'):
113 113 exit_code = server.run()
114 114
115 115 assert exit_code == (0, False)
116 116
117 117 @pytest.mark.parametrize(
118 118 'repo_mode, action', [
119 119 ['receive-pack', 'push'],
120 120 ['upload-pack', 'pull']
121 121 ])
122 122 def test_update_environment(self, git_server, repo_mode, action):
123 123 server = git_server.create(repo_mode=repo_mode)
124 124 store = server.store
125 125
126 126 with mock.patch('os.environ', {'SSH_CLIENT': '10.10.10.10 b'}):
127 127 with mock.patch('os.putenv') as putenv_mock:
128 128 server.update_environment(action)
129 129
130 130 expected_data = {
131 131 'username': git_server.user.username,
132 132 'user_id': git_server.user.user_id,
133 133 'scm': 'git',
134 134 'repository': git_server.repo_name,
135 135 'make_lock': None,
136 136 'action': action,
137 137 'ip': '10.10.10.10',
138 138 'locked_by': [None, None],
139 139 'config': '',
140 140 'repo_store': store,
141 141 'server_url': None,
142 142 'hooks': ['push', 'pull'],
143 143 'is_shadow_repo': False,
144 144 'hooks_module': 'rhodecode.lib.hooks_daemon',
145 145 'check_branch_perms': False,
146 146 'detect_force_push': False,
147 147 'user_agent': u'ssh-user-agent',
148 148 'SSH': True,
149 149 'SSH_PERMISSIONS': 'repository.admin',
150 150 }
151 151 args, kwargs = putenv_mock.call_args
152 152 assert json.loads(args[1]) == expected_data
@@ -1,116 +1,116 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2016-2018 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 import mock
22 22 import pytest
23 23
24 24 from rhodecode.apps.ssh_support.lib.backends.hg import MercurialServer
25 from rhodecode.apps.ssh_support.tests.conftest import dummy_env, dummy_user
25 from rhodecode.apps.ssh_support.tests.conftest import plain_dummy_env, plain_dummy_user
26 26
27 27
28 28 class MercurialServerCreator(object):
29 29 root = '/tmp/repo/path/'
30 30 hg_path = '/usr/local/bin/hg'
31 31
32 32 config_data = {
33 33 'app:main': {
34 34 'ssh.executable.hg': hg_path,
35 35 'vcs.hooks.protocol': 'http',
36 36 }
37 37 }
38 38 repo_name = 'test_hg'
39 user = dummy_user()
39 user = plain_dummy_user()
40 40
41 41 def __init__(self):
42 42 def config_get(part, key):
43 43 return self.config_data.get(part, {}).get(key)
44 44 self.config_mock = mock.Mock()
45 45 self.config_mock.get = mock.Mock(side_effect=config_get)
46 46
47 47 def create(self, **kwargs):
48 48 parameters = {
49 49 'store': self.root,
50 50 'ini_path': '',
51 51 'user': self.user,
52 52 'repo_name': self.repo_name,
53 53 'user_permissions': {
54 54 'test_hg': 'repository.admin'
55 55 },
56 56 'config': self.config_mock,
57 'env': dummy_env()
57 'env': plain_dummy_env()
58 58 }
59 59 parameters.update(kwargs)
60 60 server = MercurialServer(**parameters)
61 61 return server
62 62
63 63
64 64 @pytest.fixture
65 65 def hg_server(app):
66 66 return MercurialServerCreator()
67 67
68 68
69 69 class TestMercurialServer(object):
70 70
71 71 def test_command(self, hg_server):
72 72 server = hg_server.create()
73 73 expected_command = (
74 74 'cd {root}; {hg_path} -R {root}{repo_name} serve --stdio'.format(
75 75 root=hg_server.root, hg_path=hg_server.hg_path,
76 76 repo_name=hg_server.repo_name)
77 77 )
78 78 assert expected_command == server.tunnel.command()
79 79
80 80 @pytest.mark.parametrize('permissions, action, code', [
81 81 ({}, 'pull', -2),
82 82 ({'test_hg': 'repository.read'}, 'pull', 0),
83 83 ({'test_hg': 'repository.read'}, 'push', -2),
84 84 ({'test_hg': 'repository.write'}, 'push', 0),
85 85 ({'test_hg': 'repository.admin'}, 'push', 0),
86 86
87 87 ])
88 88 def test_permission_checks(self, hg_server, permissions, action, code):
89 89 server = hg_server.create(user_permissions=permissions)
90 90 result = server._check_permissions(action)
91 91 assert result is code
92 92
93 93 @pytest.mark.parametrize('permissions, value', [
94 94 ({}, False),
95 95 ({'test_hg': 'repository.read'}, False),
96 96 ({'test_hg': 'repository.write'}, True),
97 97 ({'test_hg': 'repository.admin'}, True),
98 98
99 99 ])
100 100 def test_has_write_permissions(self, hg_server, permissions, value):
101 101 server = hg_server.create(user_permissions=permissions)
102 102 result = server.has_write_perm()
103 103 assert result is value
104 104
105 105 def test_run_returns_executes_command(self, hg_server):
106 106 server = hg_server.create()
107 107 from rhodecode.apps.ssh_support.lib.backends.hg import MercurialTunnelWrapper
108 108 with mock.patch.object(MercurialTunnelWrapper, 'create_hooks_env') as _patch:
109 109 _patch.return_value = 0
110 110 with mock.patch.object(MercurialTunnelWrapper, 'command', return_value='date'):
111 111 exit_code = server.run()
112 112
113 113 assert exit_code == (0, False)
114 114
115 115
116 116
@@ -1,124 +1,124 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2016-2018 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 import mock
22 22 import pytest
23 23
24 24 from rhodecode.apps.ssh_support.lib.backends.svn import SubversionServer
25 from rhodecode.apps.ssh_support.tests.conftest import dummy_env, dummy_user
25 from rhodecode.apps.ssh_support.tests.conftest import plain_dummy_env, plain_dummy_user
26 26
27 27
28 28 class SubversionServerCreator(object):
29 29 root = '/tmp/repo/path/'
30 30 svn_path = '/usr/local/bin/svnserve'
31 31 config_data = {
32 32 'app:main': {
33 33 'ssh.executable.svn': svn_path,
34 34 'vcs.hooks.protocol': 'http',
35 35 }
36 36 }
37 37 repo_name = 'test-svn'
38 user = dummy_user()
38 user = plain_dummy_user()
39 39
40 40 def __init__(self):
41 41 def config_get(part, key):
42 42 return self.config_data.get(part, {}).get(key)
43 43 self.config_mock = mock.Mock()
44 44 self.config_mock.get = mock.Mock(side_effect=config_get)
45 45
46 46 def create(self, **kwargs):
47 47 parameters = {
48 48 'store': self.root,
49 49 'repo_name': self.repo_name,
50 50 'ini_path': '',
51 51 'user': self.user,
52 52 'user_permissions': {
53 53 self.repo_name: 'repository.admin'
54 54 },
55 55 'config': self.config_mock,
56 'env': dummy_env()
56 'env': plain_dummy_env()
57 57 }
58 58
59 59 parameters.update(kwargs)
60 60 server = SubversionServer(**parameters)
61 61 return server
62 62
63 63
64 64 @pytest.fixture
65 65 def svn_server(app):
66 66 return SubversionServerCreator()
67 67
68 68
69 69 class TestSubversionServer(object):
70 70 def test_command(self, svn_server):
71 71 server = svn_server.create()
72 72 expected_command = [
73 73 svn_server.svn_path, '-t', '--config-file',
74 74 server.tunnel.svn_conf_path, '-r', svn_server.root
75 75 ]
76 76
77 77 assert expected_command == server.tunnel.command()
78 78
79 79 @pytest.mark.parametrize('permissions, action, code', [
80 80 ({}, 'pull', -2),
81 81 ({'test-svn': 'repository.read'}, 'pull', 0),
82 82 ({'test-svn': 'repository.read'}, 'push', -2),
83 83 ({'test-svn': 'repository.write'}, 'push', 0),
84 84 ({'test-svn': 'repository.admin'}, 'push', 0),
85 85
86 86 ])
87 87 def test_permission_checks(self, svn_server, permissions, action, code):
88 88 server = svn_server.create(user_permissions=permissions)
89 89 result = server._check_permissions(action)
90 90 assert result is code
91 91
92 92 def test_run_returns_executes_command(self, svn_server):
93 93 server = svn_server.create()
94 94 from rhodecode.apps.ssh_support.lib.backends.svn import SubversionTunnelWrapper
95 95 with mock.patch.object(
96 96 SubversionTunnelWrapper, 'get_first_client_response',
97 97 return_value={'url': 'http://server/test-svn'}):
98 98 with mock.patch.object(
99 99 SubversionTunnelWrapper, 'patch_first_client_response',
100 100 return_value=0):
101 101 with mock.patch.object(
102 102 SubversionTunnelWrapper, 'sync',
103 103 return_value=0):
104 104 with mock.patch.object(
105 105 SubversionTunnelWrapper, 'command',
106 106 return_value=['date']):
107 107
108 108 exit_code = server.run()
109 109 # SVN has this differently configured, and we get in our mock env
110 110 # None as return code
111 111 assert exit_code == (None, False)
112 112
113 113 def test_run_returns_executes_command_that_cannot_extract_repo_name(self, svn_server):
114 114 server = svn_server.create()
115 115 from rhodecode.apps.ssh_support.lib.backends.svn import SubversionTunnelWrapper
116 116 with mock.patch.object(
117 117 SubversionTunnelWrapper, 'command',
118 118 return_value=['date']):
119 119 with mock.patch.object(
120 120 SubversionTunnelWrapper, 'get_first_client_response',
121 121 return_value=None):
122 122 exit_code = server.run()
123 123
124 124 assert exit_code == (1, False)
@@ -1,245 +1,245 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2010-2018 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 import os
22 22 import time
23 23 import logging
24 24 import datetime
25 25 import hashlib
26 26 import tempfile
27 27 from os.path import join as jn
28 28
29 29 from tempfile import _RandomNameSequence
30 30
31 31 import pytest
32 32
33 33 from rhodecode.model.db import User
34 34 from rhodecode.lib import auth
35 35 from rhodecode.lib import helpers as h
36 36 from rhodecode.lib.helpers import flash, link_to
37 37 from rhodecode.lib.utils2 import safe_str
38 38
39 39
40 40 log = logging.getLogger(__name__)
41 41
42 42 __all__ = [
43 43 'get_new_dir', 'TestController',
44 44 'link_to', 'clear_cache_regions',
45 45 'assert_session_flash', 'login_user', 'no_newline_id_generator',
46 46 'TESTS_TMP_PATH', 'HG_REPO', 'GIT_REPO', 'SVN_REPO',
47 47 'NEW_HG_REPO', 'NEW_GIT_REPO',
48 48 'HG_FORK', 'GIT_FORK', 'TEST_USER_ADMIN_LOGIN', 'TEST_USER_ADMIN_PASS',
49 49 'TEST_USER_REGULAR_LOGIN', 'TEST_USER_REGULAR_PASS',
50 50 'TEST_USER_REGULAR_EMAIL', 'TEST_USER_REGULAR2_LOGIN',
51 51 'TEST_USER_REGULAR2_PASS', 'TEST_USER_REGULAR2_EMAIL', 'TEST_HG_REPO',
52 52 'TEST_HG_REPO_CLONE', 'TEST_HG_REPO_PULL', 'TEST_GIT_REPO',
53 53 'TEST_GIT_REPO_CLONE', 'TEST_GIT_REPO_PULL', 'SCM_TESTS',
54 54 ]
55 55
56 56
57 57 # SOME GLOBALS FOR TESTS
58 58 TEST_DIR = tempfile.gettempdir()
59 59
60 60 TESTS_TMP_PATH = jn(TEST_DIR, 'rc_test_%s' % _RandomNameSequence().next())
61 61 TEST_USER_ADMIN_LOGIN = 'test_admin'
62 62 TEST_USER_ADMIN_PASS = 'test12'
63 63 TEST_USER_ADMIN_EMAIL = 'test_admin@mail.com'
64 64
65 65 TEST_USER_REGULAR_LOGIN = 'test_regular'
66 66 TEST_USER_REGULAR_PASS = 'test12'
67 67 TEST_USER_REGULAR_EMAIL = 'test_regular@mail.com'
68 68
69 69 TEST_USER_REGULAR2_LOGIN = 'test_regular2'
70 70 TEST_USER_REGULAR2_PASS = 'test12'
71 71 TEST_USER_REGULAR2_EMAIL = 'test_regular2@mail.com'
72 72
73 73 HG_REPO = 'vcs_test_hg'
74 74 GIT_REPO = 'vcs_test_git'
75 75 SVN_REPO = 'vcs_test_svn'
76 76
77 77 NEW_HG_REPO = 'vcs_test_hg_new'
78 78 NEW_GIT_REPO = 'vcs_test_git_new'
79 79
80 80 HG_FORK = 'vcs_test_hg_fork'
81 81 GIT_FORK = 'vcs_test_git_fork'
82 82
83 83 ## VCS
84 84 SCM_TESTS = ['hg', 'git']
85 85 uniq_suffix = str(int(time.mktime(datetime.datetime.now().timetuple())))
86 86
87 87 TEST_GIT_REPO = jn(TESTS_TMP_PATH, GIT_REPO)
88 88 TEST_GIT_REPO_CLONE = jn(TESTS_TMP_PATH, 'vcsgitclone%s' % uniq_suffix)
89 89 TEST_GIT_REPO_PULL = jn(TESTS_TMP_PATH, 'vcsgitpull%s' % uniq_suffix)
90 90
91 91 TEST_HG_REPO = jn(TESTS_TMP_PATH, HG_REPO)
92 92 TEST_HG_REPO_CLONE = jn(TESTS_TMP_PATH, 'vcshgclone%s' % uniq_suffix)
93 93 TEST_HG_REPO_PULL = jn(TESTS_TMP_PATH, 'vcshgpull%s' % uniq_suffix)
94 94
95 95 TEST_REPO_PREFIX = 'vcs-test'
96 96
97 97
98 98 def clear_cache_regions(regions=None):
99 99 # dogpile
100 100 from rhodecode.lib.rc_cache import region_meta
101 101 for region_name, region in region_meta.dogpile_cache_regions.items():
102 102 if not regions or region_name in regions:
103 103 region.invalidate()
104 104
105 105
106 106 def get_new_dir(title):
107 107 """
108 108 Always returns a new directory path.
109 109 """
110 110 from rhodecode.tests.vcs.utils import get_normalized_path
111 111 name_parts = [TEST_REPO_PREFIX]
112 112 if title:
113 113 name_parts.append(title)
114 114 hex_str = hashlib.sha1('%s %s' % (os.getpid(), time.time())).hexdigest()
115 115 name_parts.append(hex_str)
116 116 name = '-'.join(name_parts)
117 117 path = os.path.join(TEST_DIR, name)
118 118 return get_normalized_path(path)
119 119
120 120
121 121 def repo_id_generator(name):
122 122 numeric_hash = 0
123 123 for char in name:
124 124 numeric_hash += (ord(char))
125 125 return numeric_hash
126 126
127 127
128 128 @pytest.mark.usefixtures('app', 'index_location')
129 129 class TestController(object):
130 130
131 131 maxDiff = None
132 132
133 133 def log_user(self, username=TEST_USER_ADMIN_LOGIN,
134 134 password=TEST_USER_ADMIN_PASS):
135 135 self._logged_username = username
136 136 self._session = login_user_session(self.app, username, password)
137 137 self.csrf_token = auth.get_csrf_token(self._session)
138 138
139 139 return self._session['rhodecode_user']
140 140
141 141 def logout_user(self):
142 142 logout_user_session(self.app, auth.get_csrf_token(self._session))
143 143 self.csrf_token = None
144 144 self._logged_username = None
145 145 self._session = None
146 146
147 147 def _get_logged_user(self):
148 148 return User.get_by_username(self._logged_username)
149 149
150 150
151 151 def login_user_session(
152 152 app, username=TEST_USER_ADMIN_LOGIN, password=TEST_USER_ADMIN_PASS):
153 153
154 154 response = app.post(
155 155 h.route_path('login'),
156 156 {'username': username, 'password': password})
157 157 if 'invalid user name' in response.body:
158 158 pytest.fail('could not login using %s %s' % (username, password))
159 159
160 160 assert response.status == '302 Found'
161 161 response = response.follow()
162 162 assert response.status == '200 OK'
163 163
164 164 session = response.get_session_from_response()
165 165 assert 'rhodecode_user' in session
166 166 rc_user = session['rhodecode_user']
167 167 assert rc_user.get('username') == username
168 168 assert rc_user.get('is_authenticated')
169 169
170 170 return session
171 171
172 172
173 173 def logout_user_session(app, csrf_token):
174 174 app.post(h.route_path('logout'), {'csrf_token': csrf_token}, status=302)
175 175
176 176
177 177 def login_user(app, username=TEST_USER_ADMIN_LOGIN,
178 178 password=TEST_USER_ADMIN_PASS):
179 179 return login_user_session(app, username, password)['rhodecode_user']
180 180
181 181
182 182 def assert_session_flash(response, msg=None, category=None, no_=None):
183 183 """
184 184 Assert on a flash message in the current session.
185 185
186 186 :param response: Response from the given call; it will contain flash
187 187 messages or bound session with them.
188 188 :param msg: The expected message. Will be evaluated if a
189 189 :class:`LazyString` is passed in.
190 190 :param category: Optional. If passed, the message category will be
191 191 checked as well.
192 192 :param no_: Optional. If passed, the message will be checked to NOT
193 193 be in the flash session
194 194 """
195 195 if msg is None and no_ is None:
196 196 raise ValueError("Parameter msg or no_ is required.")
197 197
198 198 if msg and no_:
199 199 raise ValueError("Please specify either msg or no_, but not both")
200 200
201 201 session = response.get_session_from_response()
202 202 messages = flash.pop_messages(session=session)
203 203 msg = _eval_if_lazy(msg)
204 204
205 205 if no_:
206 206 error_msg = 'unable to detect no_ message `%s` in empty flash list' % no_
207 207 else:
208 208 error_msg = 'unable to find message `%s` in empty flash list' % msg
209 209 assert messages, error_msg
210 210 message = messages[0]
211 211
212 212 message_text = _eval_if_lazy(message.message) or ''
213 213
214 214 if no_:
215 215 if no_ in message_text:
216 216 msg = u'msg `%s` found in session flash.' % (no_,)
217 217 pytest.fail(safe_str(msg))
218 218 else:
219 219 if msg not in message_text:
220 220 fail_msg = u'msg `%s` not found in session ' \
221 221 u'flash: got `%s` (type:%s) instead' % (
222 222 msg, message_text, type(message_text))
223 223
224 224 pytest.fail(safe_str(fail_msg))
225 225 if category:
226 226 assert category == message.category
227 227
228 228
229 229 def _eval_if_lazy(value):
230 230 return value.eval() if hasattr(value, 'eval') else value
231 231
232 232
233 233 def no_newline_id_generator(test_name):
234 234 """
235 235 Generates a test name without spaces or newline characters. Used for
236 236 nicer output of test progress
237 237 """
238 238 org_name = test_name
239 test_name = str(test_name)\
239 test_name = safe_str(test_name)\
240 240 .replace('\n', '_N') \
241 241 .replace('\r', '_N') \
242 242 .replace('\t', '_T') \
243 243 .replace(' ', '_S')
244 244
245 245 return test_name or 'test-with-empty-name'
@@ -1,292 +1,292 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2010-2018 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 from subprocess32 import Popen, PIPE
22 22 import os
23 23 import shutil
24 24 import sys
25 25 import tempfile
26 26
27 27 import pytest
28 28 from sqlalchemy.engine import url
29 29
30 30 from rhodecode.tests.fixture import TestINI
31 31
32 32
33 33 def _get_dbs_from_metafunc(metafunc):
34 34 if hasattr(metafunc.function, 'dbs'):
35 35 # Supported backends by this test function, created from
36 36 # pytest.mark.dbs
37 backends = metafunc.function.dbs.args
37 backends = metafunc.definition.get_closest_marker('dbs').args
38 38 else:
39 39 backends = metafunc.config.getoption('--dbs')
40 40 return backends
41 41
42 42
43 43 def pytest_generate_tests(metafunc):
44 44 # Support test generation based on --dbs parameter
45 45 if 'db_backend' in metafunc.fixturenames:
46 46 requested_backends = set(metafunc.config.getoption('--dbs'))
47 47 backends = _get_dbs_from_metafunc(metafunc)
48 48 backends = requested_backends.intersection(backends)
49 49 # TODO: johbo: Disabling a backend did not work out with
50 50 # parametrization, find better way to achieve this.
51 51 if not backends:
52 52 metafunc.function._skip = True
53 53 metafunc.parametrize('db_backend_name', backends)
54 54
55 55
56 56 def pytest_collection_modifyitems(session, config, items):
57 57 remaining = [
58 58 i for i in items if not getattr(i.obj, '_skip', False)]
59 59 items[:] = remaining
60 60
61 61
62 62 @pytest.fixture
63 63 def db_backend(
64 64 request, db_backend_name, ini_config, tmpdir_factory):
65 65 basetemp = tmpdir_factory.getbasetemp().strpath
66 66 klass = _get_backend(db_backend_name)
67 67
68 68 option_name = '--{}-connection-string'.format(db_backend_name)
69 69 connection_string = request.config.getoption(option_name) or None
70 70
71 71 return klass(
72 72 config_file=ini_config, basetemp=basetemp,
73 73 connection_string=connection_string)
74 74
75 75
76 76 def _get_backend(backend_type):
77 77 return {
78 78 'sqlite': SQLiteDBBackend,
79 79 'postgres': PostgresDBBackend,
80 80 'mysql': MySQLDBBackend,
81 81 '': EmptyDBBackend
82 82 }[backend_type]
83 83
84 84
85 85 class DBBackend(object):
86 86 _store = os.path.dirname(os.path.abspath(__file__))
87 87 _type = None
88 88 _base_ini_config = [{'app:main': {'vcs.start_server': 'false',
89 89 'startup.import_repos': 'false',
90 90 'is_test': 'False'}}]
91 91 _db_url = [{'app:main': {'sqlalchemy.db1.url': ''}}]
92 92 _base_db_name = 'rhodecode_test_db_backend'
93 93
94 94 def __init__(
95 95 self, config_file, db_name=None, basetemp=None,
96 96 connection_string=None):
97 97
98 98 from rhodecode.lib.vcs.backends.hg import largefiles_store
99 99 from rhodecode.lib.vcs.backends.git import lfs_store
100 100
101 101 self.fixture_store = os.path.join(self._store, self._type)
102 102 self.db_name = db_name or self._base_db_name
103 103 self._base_ini_file = config_file
104 104 self.stderr = ''
105 105 self.stdout = ''
106 106 self._basetemp = basetemp or tempfile.gettempdir()
107 107 self._repos_location = os.path.join(self._basetemp, 'rc_test_repos')
108 108 self._repos_hg_largefiles_store = largefiles_store(self._basetemp)
109 109 self._repos_git_lfs_store = lfs_store(self._basetemp)
110 110 self.connection_string = connection_string
111 111
112 112 @property
113 113 def connection_string(self):
114 114 return self._connection_string
115 115
116 116 @connection_string.setter
117 117 def connection_string(self, new_connection_string):
118 118 if not new_connection_string:
119 119 new_connection_string = self.get_default_connection_string()
120 120 else:
121 121 new_connection_string = new_connection_string.format(
122 122 db_name=self.db_name)
123 123 url_parts = url.make_url(new_connection_string)
124 124 self._connection_string = new_connection_string
125 125 self.user = url_parts.username
126 126 self.password = url_parts.password
127 127 self.host = url_parts.host
128 128
129 129 def get_default_connection_string(self):
130 130 raise NotImplementedError('default connection_string is required.')
131 131
132 132 def execute(self, cmd, env=None, *args):
133 133 """
134 134 Runs command on the system with given ``args``.
135 135 """
136 136
137 137 command = cmd + ' ' + ' '.join(args)
138 138 sys.stdout.write(command)
139 139
140 140 # Tell Python to use UTF-8 encoding for stdout
141 141 _env = os.environ.copy()
142 142 _env['PYTHONIOENCODING'] = 'UTF-8'
143 143 if env:
144 144 _env.update(env)
145 145 self.p = Popen(command, shell=True, stdout=PIPE, stderr=PIPE, env=_env)
146 146 self.stdout, self.stderr = self.p.communicate()
147 147 sys.stdout.write('COMMAND:'+command+'\n')
148 148 sys.stdout.write(self.stdout)
149 149 return self.stdout, self.stderr
150 150
151 151 def assert_returncode_success(self):
152 152 if not self.p.returncode == 0:
153 153 print(self.stderr)
154 154 raise AssertionError('non 0 retcode:{}'.format(self.p.returncode))
155 155
156 156 def assert_correct_output(self, stdout, version):
157 157 assert 'UPGRADE FOR STEP {} COMPLETED'.format(version) in stdout
158 158
159 159 def setup_rhodecode_db(self, ini_params=None, env=None):
160 160 if not ini_params:
161 161 ini_params = self._base_ini_config
162 162
163 163 ini_params.extend(self._db_url)
164 164 with TestINI(self._base_ini_file, ini_params,
165 165 self._type, destroy=True) as _ini_file:
166 166
167 167 if not os.path.isdir(self._repos_location):
168 168 os.makedirs(self._repos_location)
169 169 if not os.path.isdir(self._repos_hg_largefiles_store):
170 170 os.makedirs(self._repos_hg_largefiles_store)
171 171 if not os.path.isdir(self._repos_git_lfs_store):
172 172 os.makedirs(self._repos_git_lfs_store)
173 173
174 174 return self.execute(
175 175 "rc-setup-app {0} --user=marcink "
176 176 "--email=marcin@rhodeocode.com --password={1} "
177 177 "--repos={2} --force-yes".format(
178 178 _ini_file, 'qweqwe', self._repos_location), env=env)
179 179
180 180 def upgrade_database(self, ini_params=None):
181 181 if not ini_params:
182 182 ini_params = self._base_ini_config
183 183 ini_params.extend(self._db_url)
184 184
185 185 test_ini = TestINI(
186 186 self._base_ini_file, ini_params, self._type, destroy=True)
187 187 with test_ini as ini_file:
188 188 if not os.path.isdir(self._repos_location):
189 189 os.makedirs(self._repos_location)
190 190
191 191 return self.execute(
192 192 "rc-upgrade-db {0} --force-yes".format(ini_file))
193 193
194 194 def setup_db(self):
195 195 raise NotImplementedError
196 196
197 197 def teardown_db(self):
198 198 raise NotImplementedError
199 199
200 200 def import_dump(self, dumpname):
201 201 raise NotImplementedError
202 202
203 203
204 204 class EmptyDBBackend(DBBackend):
205 205 _type = ''
206 206
207 207 def setup_db(self):
208 208 pass
209 209
210 210 def teardown_db(self):
211 211 pass
212 212
213 213 def import_dump(self, dumpname):
214 214 pass
215 215
216 216 def assert_returncode_success(self):
217 217 assert True
218 218
219 219
220 220 class SQLiteDBBackend(DBBackend):
221 221 _type = 'sqlite'
222 222
223 223 def get_default_connection_string(self):
224 224 return 'sqlite:///{}/{}.sqlite'.format(self._basetemp, self.db_name)
225 225
226 226 def setup_db(self):
227 227 # dump schema for tests
228 228 # cp -v $TEST_DB_NAME
229 229 self._db_url = [{'app:main': {
230 230 'sqlalchemy.db1.url': self.connection_string}}]
231 231
232 232 def import_dump(self, dumpname):
233 233 dump = os.path.join(self.fixture_store, dumpname)
234 234 target = os.path.join(self._basetemp, '{0.db_name}.sqlite'.format(self))
235 235 return self.execute('cp -v {} {}'.format(dump, target))
236 236
237 237 def teardown_db(self):
238 238 return self.execute("rm -rf {}.sqlite".format(
239 239 os.path.join(self._basetemp, self.db_name)))
240 240
241 241
242 242 class MySQLDBBackend(DBBackend):
243 243 _type = 'mysql'
244 244
245 245 def get_default_connection_string(self):
246 246 return 'mysql://root:qweqwe@127.0.0.1/{}'.format(self.db_name)
247 247
248 248 def setup_db(self):
249 249 # dump schema for tests
250 250 # mysqldump -uroot -pqweqwe $TEST_DB_NAME
251 251 self._db_url = [{'app:main': {
252 252 'sqlalchemy.db1.url': self.connection_string}}]
253 253 return self.execute("mysql -v -u{} -p{} -e 'create database '{}';'".format(
254 254 self.user, self.password, self.db_name))
255 255
256 256 def import_dump(self, dumpname):
257 257 dump = os.path.join(self.fixture_store, dumpname)
258 258 return self.execute("mysql -u{} -p{} {} < {}".format(
259 259 self.user, self.password, self.db_name, dump))
260 260
261 261 def teardown_db(self):
262 262 return self.execute("mysql -v -u{} -p{} -e 'drop database '{}';'".format(
263 263 self.user, self.password, self.db_name))
264 264
265 265
266 266 class PostgresDBBackend(DBBackend):
267 267 _type = 'postgres'
268 268
269 269 def get_default_connection_string(self):
270 270 return 'postgresql://postgres:qweqwe@localhost/{}'.format(self.db_name)
271 271
272 272 def setup_db(self):
273 273 # dump schema for tests
274 274 # pg_dump -U postgres -h localhost $TEST_DB_NAME
275 275 self._db_url = [{'app:main': {
276 276 'sqlalchemy.db1.url':
277 277 self.connection_string}}]
278 278 return self.execute("PGPASSWORD={} psql -U {} -h localhost "
279 279 "-c 'create database '{}';'".format(
280 280 self.password, self.user, self.db_name))
281 281
282 282 def teardown_db(self):
283 283 return self.execute("PGPASSWORD={} psql -U {} -h localhost "
284 284 "-c 'drop database if exists '{}';'".format(
285 285 self.password, self.user, self.db_name))
286 286
287 287 def import_dump(self, dumpname):
288 288 dump = os.path.join(self.fixture_store, dumpname)
289 289 return self.execute(
290 290 "PGPASSWORD={} psql -U {} -h localhost -d {} -1 "
291 291 "-f {}".format(
292 292 self.password, self.user, self.db_name, dump))
@@ -1,1867 +1,1886 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2010-2018 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 import collections
22 22 import datetime
23 23 import hashlib
24 24 import os
25 25 import re
26 26 import pprint
27 27 import shutil
28 28 import socket
29 29 import subprocess32
30 30 import time
31 31 import uuid
32 32 import dateutil.tz
33 33 import functools
34 34
35 35 import mock
36 36 import pyramid.testing
37 37 import pytest
38 38 import colander
39 39 import requests
40 40 import pyramid.paster
41 41
42 42 import rhodecode
43 43 from rhodecode.lib.utils2 import AttributeDict
44 44 from rhodecode.model.changeset_status import ChangesetStatusModel
45 45 from rhodecode.model.comment import CommentsModel
46 46 from rhodecode.model.db import (
47 47 PullRequest, Repository, RhodeCodeSetting, ChangesetStatus, RepoGroup,
48 48 UserGroup, RepoRhodeCodeUi, RepoRhodeCodeSetting, RhodeCodeUi)
49 49 from rhodecode.model.meta import Session
50 50 from rhodecode.model.pull_request import PullRequestModel
51 51 from rhodecode.model.repo import RepoModel
52 52 from rhodecode.model.repo_group import RepoGroupModel
53 53 from rhodecode.model.user import UserModel
54 54 from rhodecode.model.settings import VcsSettingsModel
55 55 from rhodecode.model.user_group import UserGroupModel
56 56 from rhodecode.model.integration import IntegrationModel
57 57 from rhodecode.integrations import integration_type_registry
58 58 from rhodecode.integrations.types.base import IntegrationTypeBase
59 59 from rhodecode.lib.utils import repo2db_mapper
60 60 from rhodecode.lib.vcs import create_vcsserver_proxy
61 61 from rhodecode.lib.vcs.backends import get_backend
62 62 from rhodecode.lib.vcs.nodes import FileNode
63 63 from rhodecode.tests import (
64 64 login_user_session, get_new_dir, utils, TESTS_TMP_PATH,
65 65 TEST_USER_ADMIN_LOGIN, TEST_USER_REGULAR_LOGIN, TEST_USER_REGULAR2_LOGIN,
66 66 TEST_USER_REGULAR_PASS)
67 67 from rhodecode.tests.utils import CustomTestApp, set_anonymous_access
68 68 from rhodecode.tests.fixture import Fixture
69 69 from rhodecode.config import utils as config_utils
70 70
71 71 def _split_comma(value):
72 72 return value.split(',')
73 73
74 74
75 75 def pytest_addoption(parser):
76 76 parser.addoption(
77 77 '--keep-tmp-path', action='store_true',
78 78 help="Keep the test temporary directories")
79 79 parser.addoption(
80 80 '--backends', action='store', type=_split_comma,
81 81 default=['git', 'hg', 'svn'],
82 82 help="Select which backends to test for backend specific tests.")
83 83 parser.addoption(
84 84 '--dbs', action='store', type=_split_comma,
85 85 default=['sqlite'],
86 86 help="Select which database to test for database specific tests. "
87 87 "Possible options are sqlite,postgres,mysql")
88 88 parser.addoption(
89 89 '--appenlight', '--ae', action='store_true',
90 90 help="Track statistics in appenlight.")
91 91 parser.addoption(
92 92 '--appenlight-api-key', '--ae-key',
93 93 help="API key for Appenlight.")
94 94 parser.addoption(
95 95 '--appenlight-url', '--ae-url',
96 96 default="https://ae.rhodecode.com",
97 97 help="Appenlight service URL, defaults to https://ae.rhodecode.com")
98 98 parser.addoption(
99 99 '--sqlite-connection-string', action='store',
100 100 default='', help="Connection string for the dbs tests with SQLite")
101 101 parser.addoption(
102 102 '--postgres-connection-string', action='store',
103 103 default='', help="Connection string for the dbs tests with Postgres")
104 104 parser.addoption(
105 105 '--mysql-connection-string', action='store',
106 106 default='', help="Connection string for the dbs tests with MySQL")
107 107 parser.addoption(
108 108 '--repeat', type=int, default=100,
109 109 help="Number of repetitions in performance tests.")
110 110
111 111
112 112 def pytest_configure(config):
113 113 from rhodecode.config import patches
114 114
115 115
116 116 def pytest_collection_modifyitems(session, config, items):
117 117 # nottest marked, compare nose, used for transition from nose to pytest
118 118 remaining = [
119 119 i for i in items if getattr(i.obj, '__test__', True)]
120 120 items[:] = remaining
121 121
122 122
123 123 def pytest_generate_tests(metafunc):
124 124 # Support test generation based on --backend parameter
125 125 if 'backend_alias' in metafunc.fixturenames:
126 126 backends = get_backends_from_metafunc(metafunc)
127 127 scope = None
128 128 if not backends:
129 129 pytest.skip("Not enabled for any of selected backends")
130 130 metafunc.parametrize('backend_alias', backends, scope=scope)
131 131 elif hasattr(metafunc.function, 'backends'):
132 132 backends = get_backends_from_metafunc(metafunc)
133 133 if not backends:
134 134 pytest.skip("Not enabled for any of selected backends")
135 135
136 136
137 137 def get_backends_from_metafunc(metafunc):
138 138 requested_backends = set(metafunc.config.getoption('--backends'))
139 139 if hasattr(metafunc.function, 'backends'):
140 140 # Supported backends by this test function, created from
141 141 # pytest.mark.backends
142 backends = metafunc.function.backends.args
142 backends = metafunc.definition.get_closest_marker('backends').args
143 143 elif hasattr(metafunc.cls, 'backend_alias'):
144 144 # Support class attribute "backend_alias", this is mainly
145 145 # for legacy reasons for tests not yet using pytest.mark.backends
146 146 backends = [metafunc.cls.backend_alias]
147 147 else:
148 148 backends = metafunc.config.getoption('--backends')
149 149 return requested_backends.intersection(backends)
150 150
151 151
152 152 @pytest.fixture(scope='session', autouse=True)
153 153 def activate_example_rcextensions(request):
154 154 """
155 155 Patch in an example rcextensions module which verifies passed in kwargs.
156 156 """
157 157 from rhodecode.tests.other import example_rcextensions
158 158
159 159 old_extensions = rhodecode.EXTENSIONS
160 160 rhodecode.EXTENSIONS = example_rcextensions
161 161
162 162 @request.addfinalizer
163 163 def cleanup():
164 164 rhodecode.EXTENSIONS = old_extensions
165 165
166 166
167 167 @pytest.fixture
168 168 def capture_rcextensions():
169 169 """
170 170 Returns the recorded calls to entry points in rcextensions.
171 171 """
172 172 calls = rhodecode.EXTENSIONS.calls
173 173 calls.clear()
174 174 # Note: At this moment, it is still the empty dict, but that will
175 175 # be filled during the test run and since it is a reference this
176 176 # is enough to make it work.
177 177 return calls
178 178
179 179
180 180 @pytest.fixture(scope='session')
181 181 def http_environ_session():
182 182 """
183 183 Allow to use "http_environ" in session scope.
184 184 """
185 return http_environ(
186 http_host_stub=http_host_stub())
185 return plain_http_environ()
186
187
188 def plain_http_host_stub():
189 """
190 Value of HTTP_HOST in the test run.
191 """
192 return 'example.com:80'
187 193
188 194
189 195 @pytest.fixture
190 196 def http_host_stub():
191 197 """
192 198 Value of HTTP_HOST in the test run.
193 199 """
194 return 'example.com:80'
200 return plain_http_host_stub()
201
202
203 def plain_http_host_only_stub():
204 """
205 Value of HTTP_HOST in the test run.
206 """
207 return plain_http_host_stub().split(':')[0]
195 208
196 209
197 210 @pytest.fixture
198 211 def http_host_only_stub():
199 212 """
200 213 Value of HTTP_HOST in the test run.
201 214 """
202 return http_host_stub().split(':')[0]
215 return plain_http_host_only_stub()
203 216
204 217
205 @pytest.fixture
206 def http_environ(http_host_stub):
218 def plain_http_environ():
207 219 """
208 220 HTTP extra environ keys.
209 221
210 222 Used by the test application and for setting up the pylons
211 223 environment. In the case of the fixture "app" it should be possible
212 224 to override this for a specific test case.
213 225 """
214 226 return {
215 'SERVER_NAME': http_host_only_stub(),
216 'SERVER_PORT': http_host_stub.split(':')[1],
217 'HTTP_HOST': http_host_stub,
227 'SERVER_NAME': plain_http_host_only_stub(),
228 'SERVER_PORT': plain_http_host_stub().split(':')[1],
229 'HTTP_HOST': plain_http_host_stub(),
218 230 'HTTP_USER_AGENT': 'rc-test-agent',
219 231 'REQUEST_METHOD': 'GET'
220 232 }
221 233
222 234
235 @pytest.fixture
236 def http_environ():
237 """
238 HTTP extra environ keys.
239
240 Used by the test application and for setting up the pylons
241 environment. In the case of the fixture "app" it should be possible
242 to override this for a specific test case.
243 """
244 return plain_http_environ()
245
246
223 247 @pytest.fixture(scope='session')
224 248 def baseapp(ini_config, vcsserver, http_environ_session):
225 249 from rhodecode.lib.pyramid_utils import get_app_config
226 250 from rhodecode.config.middleware import make_pyramid_app
227 251
228 252 print("Using the RhodeCode configuration:{}".format(ini_config))
229 253 pyramid.paster.setup_logging(ini_config)
230 254
231 255 settings = get_app_config(ini_config)
232 256 app = make_pyramid_app({'__file__': ini_config}, **settings)
233 257
234 258 return app
235 259
236 260
237 261 @pytest.fixture(scope='function')
238 262 def app(request, config_stub, baseapp, http_environ):
239 263 app = CustomTestApp(
240 264 baseapp,
241 265 extra_environ=http_environ)
242 266 if request.cls:
243 267 request.cls.app = app
244 268 return app
245 269
246 270
247 271 @pytest.fixture(scope='session')
248 272 def app_settings(baseapp, ini_config):
249 273 """
250 274 Settings dictionary used to create the app.
251 275
252 276 Parses the ini file and passes the result through the sanitize and apply
253 277 defaults mechanism in `rhodecode.config.middleware`.
254 278 """
255 279 return baseapp.config.get_settings()
256 280
257 281
258 282 @pytest.fixture(scope='session')
259 283 def db_connection(ini_settings):
260 284 # Initialize the database connection.
261 285 config_utils.initialize_database(ini_settings)
262 286
263 287
264 288 LoginData = collections.namedtuple('LoginData', ('csrf_token', 'user'))
265 289
266 290
267 291 def _autologin_user(app, *args):
268 292 session = login_user_session(app, *args)
269 293 csrf_token = rhodecode.lib.auth.get_csrf_token(session)
270 294 return LoginData(csrf_token, session['rhodecode_user'])
271 295
272 296
273 297 @pytest.fixture
274 298 def autologin_user(app):
275 299 """
276 300 Utility fixture which makes sure that the admin user is logged in
277 301 """
278 302 return _autologin_user(app)
279 303
280 304
281 305 @pytest.fixture
282 306 def autologin_regular_user(app):
283 307 """
284 308 Utility fixture which makes sure that the regular user is logged in
285 309 """
286 310 return _autologin_user(
287 311 app, TEST_USER_REGULAR_LOGIN, TEST_USER_REGULAR_PASS)
288 312
289 313
290 314 @pytest.fixture(scope='function')
291 315 def csrf_token(request, autologin_user):
292 316 return autologin_user.csrf_token
293 317
294 318
295 319 @pytest.fixture(scope='function')
296 320 def xhr_header(request):
297 321 return {'HTTP_X_REQUESTED_WITH': 'XMLHttpRequest'}
298 322
299 323
300 324 @pytest.fixture
301 325 def real_crypto_backend(monkeypatch):
302 326 """
303 327 Switch the production crypto backend on for this test.
304 328
305 329 During the test run the crypto backend is replaced with a faster
306 330 implementation based on the MD5 algorithm.
307 331 """
308 332 monkeypatch.setattr(rhodecode, 'is_test', False)
309 333
310 334
311 335 @pytest.fixture(scope='class')
312 336 def index_location(request, baseapp):
313 337 index_location = baseapp.config.get_settings()['search.location']
314 338 if request.cls:
315 339 request.cls.index_location = index_location
316 340 return index_location
317 341
318 342
319 343 @pytest.fixture(scope='session', autouse=True)
320 344 def tests_tmp_path(request):
321 345 """
322 346 Create temporary directory to be used during the test session.
323 347 """
324 348 if not os.path.exists(TESTS_TMP_PATH):
325 349 os.makedirs(TESTS_TMP_PATH)
326 350
327 351 if not request.config.getoption('--keep-tmp-path'):
328 352 @request.addfinalizer
329 353 def remove_tmp_path():
330 354 shutil.rmtree(TESTS_TMP_PATH)
331 355
332 356 return TESTS_TMP_PATH
333 357
334 358
335 359 @pytest.fixture
336 360 def test_repo_group(request):
337 361 """
338 362 Create a temporary repository group, and automatically destroy it
339 363 after use
340 364 """
341 365 fixture = Fixture()
342 366 repogroupid = 'test_repo_group_%s' % str(time.time()).replace('.', '')
343 367 repo_group = fixture.create_repo_group(repogroupid)
344 368
345 369 def _cleanup():
346 370 fixture.destroy_repo_group(repogroupid)
347 371
348 372 request.addfinalizer(_cleanup)
349 373 return repo_group
350 374
351 375
352 376 @pytest.fixture
353 377 def test_user_group(request):
354 378 """
355 379 Create a temporary user group, and automatically destroy it
356 380 after use
357 381 """
358 382 fixture = Fixture()
359 383 usergroupid = 'test_user_group_%s' % str(time.time()).replace('.', '')
360 384 user_group = fixture.create_user_group(usergroupid)
361 385
362 386 def _cleanup():
363 387 fixture.destroy_user_group(user_group)
364 388
365 389 request.addfinalizer(_cleanup)
366 390 return user_group
367 391
368 392
369 393 @pytest.fixture(scope='session')
370 394 def test_repo(request):
371 395 container = TestRepoContainer()
372 396 request.addfinalizer(container._cleanup)
373 397 return container
374 398
375 399
376 400 class TestRepoContainer(object):
377 401 """
378 402 Container for test repositories which are used read only.
379 403
380 404 Repositories will be created on demand and re-used during the lifetime
381 405 of this object.
382 406
383 407 Usage to get the svn test repository "minimal"::
384 408
385 409 test_repo = TestRepoContainer()
386 410 repo = test_repo('minimal', 'svn')
387 411
388 412 """
389 413
390 414 dump_extractors = {
391 415 'git': utils.extract_git_repo_from_dump,
392 416 'hg': utils.extract_hg_repo_from_dump,
393 417 'svn': utils.extract_svn_repo_from_dump,
394 418 }
395 419
396 420 def __init__(self):
397 421 self._cleanup_repos = []
398 422 self._fixture = Fixture()
399 423 self._repos = {}
400 424
401 425 def __call__(self, dump_name, backend_alias, config=None):
402 426 key = (dump_name, backend_alias)
403 427 if key not in self._repos:
404 428 repo = self._create_repo(dump_name, backend_alias, config)
405 429 self._repos[key] = repo.repo_id
406 430 return Repository.get(self._repos[key])
407 431
408 432 def _create_repo(self, dump_name, backend_alias, config):
409 433 repo_name = '%s-%s' % (backend_alias, dump_name)
410 434 backend_class = get_backend(backend_alias)
411 435 dump_extractor = self.dump_extractors[backend_alias]
412 436 repo_path = dump_extractor(dump_name, repo_name)
413 437
414 438 vcs_repo = backend_class(repo_path, config=config)
415 439 repo2db_mapper({repo_name: vcs_repo})
416 440
417 441 repo = RepoModel().get_by_repo_name(repo_name)
418 442 self._cleanup_repos.append(repo_name)
419 443 return repo
420 444
421 445 def _cleanup(self):
422 446 for repo_name in reversed(self._cleanup_repos):
423 447 self._fixture.destroy_repo(repo_name)
424 448
425 449
426 @pytest.fixture
427 def backend(request, backend_alias, baseapp, test_repo):
428 """
429 Parametrized fixture which represents a single backend implementation.
430
431 It respects the option `--backends` to focus the test run on specific
432 backend implementations.
433
434 It also supports `pytest.mark.xfail_backends` to mark tests as failing
435 for specific backends. This is intended as a utility for incremental
436 development of a new backend implementation.
437 """
450 def backend_base(request, backend_alias, baseapp, test_repo):
438 451 if backend_alias not in request.config.getoption('--backends'):
439 452 pytest.skip("Backend %s not selected." % (backend_alias, ))
440 453
441 454 utils.check_xfail_backends(request.node, backend_alias)
442 455 utils.check_skip_backends(request.node, backend_alias)
443 456
444 457 repo_name = 'vcs_test_%s' % (backend_alias, )
445 458 backend = Backend(
446 459 alias=backend_alias,
447 460 repo_name=repo_name,
448 461 test_name=request.node.name,
449 462 test_repo_container=test_repo)
450 463 request.addfinalizer(backend.cleanup)
451 464 return backend
452 465
453 466
454 467 @pytest.fixture
468 def backend(request, backend_alias, baseapp, test_repo):
469 """
470 Parametrized fixture which represents a single backend implementation.
471
472 It respects the option `--backends` to focus the test run on specific
473 backend implementations.
474
475 It also supports `pytest.mark.xfail_backends` to mark tests as failing
476 for specific backends. This is intended as a utility for incremental
477 development of a new backend implementation.
478 """
479 return backend_base(request, backend_alias, baseapp, test_repo)
480
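# Minimal usage sketch (hypothetical test) for the parametrized `backend`
# fixture; it only relies on the `Backend` helper methods defined further
# down in this module.
def _example_backend_fixture_usage(backend):
    # creates a throw-away repository which `backend.cleanup()` removes again
    backend.create_repo(number_of_commits=2)
    assert len(backend.commit_ids) == 2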
481
482 @pytest.fixture
455 483 def backend_git(request, baseapp, test_repo):
456 return backend(request, 'git', baseapp, test_repo)
484 return backend_base(request, 'git', baseapp, test_repo)
457 485
458 486
459 487 @pytest.fixture
460 488 def backend_hg(request, baseapp, test_repo):
461 return backend(request, 'hg', baseapp, test_repo)
489 return backend_base(request, 'hg', baseapp, test_repo)
462 490
463 491
464 492 @pytest.fixture
465 493 def backend_svn(request, baseapp, test_repo):
466 return backend(request, 'svn', baseapp, test_repo)
494 return backend_base(request, 'svn', baseapp, test_repo)
467 495
468 496
469 497 @pytest.fixture
470 498 def backend_random(backend_git):
471 499 """
472 500 Use this to express that your tests need "a backend".
473 501
474 502 A few of our tests need a backend, so that we can run the code. This
475 503 fixture is intended to be used for such cases. It will pick one of the
476 504 backends and run the tests.
477 505
478 506 The fixture `backend` would run the test multiple times for each
479 507 available backend which is a pure waste of time if the test is
480 508 independent of the backend type.
481 509 """
482 510 # TODO: johbo: Change this to pick a random backend
483 511 return backend_git
484 512
485 513
486 514 @pytest.fixture
487 515 def backend_stub(backend_git):
488 516 """
489 517 Use this to express that your tests need a backend stub
490 518
491 519 TODO: mikhail: Implement a real stub logic instead of returning
492 520 a git backend
493 521 """
494 522 return backend_git
495 523
496 524
497 525 @pytest.fixture
498 526 def repo_stub(backend_stub):
499 527 """
500 528 Use this to express that your tests need a repository stub
501 529 """
502 530 return backend_stub.create_repo()
503 531
504 532
505 533 class Backend(object):
506 534 """
507 535 Represents the test configuration for one supported backend
508 536
509 537 Provides easy access to different test repositories based on
510 538 `__getitem__`. Such repositories will only be created once per test
511 539 session.
512 540 """
513 541
514 542 invalid_repo_name = re.compile(r'[^0-9a-zA-Z]+')
515 543 _master_repo = None
516 544 _commit_ids = {}
517 545
518 546 def __init__(self, alias, repo_name, test_name, test_repo_container):
519 547 self.alias = alias
520 548 self.repo_name = repo_name
521 549 self._cleanup_repos = []
522 550 self._test_name = test_name
523 551 self._test_repo_container = test_repo_container
524 552 # TODO: johbo: Used as a delegate interim. Not yet sure if Backend or
525 553 # Fixture will survive in the end.
526 554 self._fixture = Fixture()
527 555
528 556 def __getitem__(self, key):
529 557 return self._test_repo_container(key, self.alias)
530 558
531 559 def create_test_repo(self, key, config=None):
532 560 return self._test_repo_container(key, self.alias, config)
533 561
534 562 @property
535 563 def repo(self):
536 564 """
537 565 Returns the "current" repository. This is the vcs_test repo or the
538 566 last repo which has been created with `create_repo`.
539 567 """
540 568 from rhodecode.model.db import Repository
541 569 return Repository.get_by_repo_name(self.repo_name)
542 570
543 571 @property
544 572 def default_branch_name(self):
545 573 VcsRepository = get_backend(self.alias)
546 574 return VcsRepository.DEFAULT_BRANCH_NAME
547 575
548 576 @property
549 577 def default_head_id(self):
550 578 """
551 579 Returns the default head id of the underlying backend.
552 580
553 581 This will be the default branch name in case the backend does have a
554 582 default branch. In the other cases it will point to a valid head
555 583 which can serve as the base to create a new commit on top of it.
556 584 """
557 585 vcsrepo = self.repo.scm_instance()
558 586 head_id = (
559 587 vcsrepo.DEFAULT_BRANCH_NAME or
560 588 vcsrepo.commit_ids[-1])
561 589 return head_id
562 590
563 591 @property
564 592 def commit_ids(self):
565 593 """
566 594 Returns the list of commits for the last created repository
567 595 """
568 596 return self._commit_ids
569 597
570 598 def create_master_repo(self, commits):
571 599 """
572 600 Create a repository and remember it as a template.
573 601
574 602 This makes it easy to create derived repositories to construct
575 603 more complex scenarios for diff, compare and pull requests.
576 604
577 605 Returns a commit map which maps from commit message to raw_id.
578 606 """
579 607 self._master_repo = self.create_repo(commits=commits)
580 608 return self._commit_ids
581 609
582 610 def create_repo(
583 611 self, commits=None, number_of_commits=0, heads=None,
584 612 name_suffix=u'', bare=False, **kwargs):
585 613 """
586 614 Create a repository and record it for later cleanup.
587 615
588 616 :param commits: Optional. A sequence of dict instances.
589 617 Will add a commit per entry to the new repository.
590 618 :param number_of_commits: Optional. If set to a number, this number of
591 619 commits will be added to the new repository.
592 620 :param heads: Optional. Can be set to a sequence of commit
593 621 names which shall be pulled in from the master repository.
594 622 :param name_suffix: adds special suffix to generated repo name
595 623 :param bare: set a repo as bare (no checkout)
596 624 """
597 625 self.repo_name = self._next_repo_name() + name_suffix
598 626 repo = self._fixture.create_repo(
599 627 self.repo_name, repo_type=self.alias, bare=bare, **kwargs)
600 628 self._cleanup_repos.append(repo.repo_name)
601 629
602 630 commits = commits or [
603 631 {'message': 'Commit %s of %s' % (x, self.repo_name)}
604 632 for x in range(number_of_commits)]
605 633 self._add_commits_to_repo(repo.scm_instance(), commits)
606 634 if heads:
607 635 self.pull_heads(repo, heads)
608 636
609 637 return repo
610 638
611 639 def pull_heads(self, repo, heads):
612 640 """
613 641 Make sure that repo contains all commits mentioned in `heads`
614 642 """
615 643 vcsmaster = self._master_repo.scm_instance()
616 644 vcsrepo = repo.scm_instance()
617 645 vcsrepo.config.clear_section('hooks')
618 646 commit_ids = [self._commit_ids[h] for h in heads]
619 647 vcsrepo.pull(vcsmaster.path, commit_ids=commit_ids)
620 648
621 649 def create_fork(self):
622 650 repo_to_fork = self.repo_name
623 651 self.repo_name = self._next_repo_name()
624 652 repo = self._fixture.create_fork(repo_to_fork, self.repo_name)
625 653 self._cleanup_repos.append(self.repo_name)
626 654 return repo
627 655
628 656 def new_repo_name(self, suffix=u''):
629 657 self.repo_name = self._next_repo_name() + suffix
630 658 self._cleanup_repos.append(self.repo_name)
631 659 return self.repo_name
632 660
633 661 def _next_repo_name(self):
634 662 return u"%s_%s" % (
635 663 self.invalid_repo_name.sub(u'_', self._test_name),
636 664 len(self._cleanup_repos))
637 665
638 666 def ensure_file(self, filename, content='Test content\n'):
639 667 assert self._cleanup_repos, "Avoid writing into vcs_test repos"
640 668 commits = [
641 669 {'added': [
642 670 FileNode(filename, content=content),
643 671 ]},
644 672 ]
645 673 self._add_commits_to_repo(self.repo.scm_instance(), commits)
646 674
647 675 def enable_downloads(self):
648 676 repo = self.repo
649 677 repo.enable_downloads = True
650 678 Session().add(repo)
651 679 Session().commit()
652 680
653 681 def cleanup(self):
654 682 for repo_name in reversed(self._cleanup_repos):
655 683 self._fixture.destroy_repo(repo_name)
656 684
657 685 def _add_commits_to_repo(self, repo, commits):
658 686 commit_ids = _add_commits_to_repo(repo, commits)
659 687 if not commit_ids:
660 688 return
661 689 self._commit_ids = commit_ids
662 690
663 691 # Creating refs for Git to allow fetching them from remote repository
664 692 if self.alias == 'git':
665 693 refs = {}
666 694 for message in self._commit_ids:
667 695 # TODO: mikhail: do more special chars replacements
668 696 ref_name = 'refs/test-refs/{}'.format(
669 697 message.replace(' ', ''))
670 698 refs[ref_name] = self._commit_ids[message]
671 699 self._create_refs(repo, refs)
672 700
673 701 def _create_refs(self, repo, refs):
674 702 for ref_name in refs:
675 703 repo.set_refs(ref_name, refs[ref_name])
676 704
677 705
678 @pytest.fixture
679 def vcsbackend(request, backend_alias, tests_tmp_path, baseapp, test_repo):
680 """
681 Parametrized fixture which represents a single vcs backend implementation.
682
683 See the fixture `backend` for more details. This one implements the same
684 concept, but on vcs level. So it does not provide model instances etc.
685
686 Parameters are generated dynamically, see :func:`pytest_generate_tests`
687 for how this works.
688 """
706 def vcsbackend_base(request, backend_alias, tests_tmp_path, baseapp, test_repo):
689 707 if backend_alias not in request.config.getoption('--backends'):
690 708 pytest.skip("Backend %s not selected." % (backend_alias, ))
691 709
692 710 utils.check_xfail_backends(request.node, backend_alias)
693 711 utils.check_skip_backends(request.node, backend_alias)
694 712
695 713 repo_name = 'vcs_test_%s' % (backend_alias, )
696 714 repo_path = os.path.join(tests_tmp_path, repo_name)
697 715 backend = VcsBackend(
698 716 alias=backend_alias,
699 717 repo_path=repo_path,
700 718 test_name=request.node.name,
701 719 test_repo_container=test_repo)
702 720 request.addfinalizer(backend.cleanup)
703 721 return backend
704 722
705 723
706 724 @pytest.fixture
725 def vcsbackend(request, backend_alias, tests_tmp_path, baseapp, test_repo):
726 """
727 Parametrized fixture which represents a single vcs backend implementation.
728
729 See the fixture `backend` for more details. This one implements the same
730 concept, but on vcs level. So it does not provide model instances etc.
731
732 Parameters are generated dynamically, see :func:`pytest_generate_tests`
733 for how this works.
734 """
735 return vcsbackend_base(request, backend_alias, tests_tmp_path, baseapp, test_repo)
736
737
738 @pytest.fixture
707 739 def vcsbackend_git(request, tests_tmp_path, baseapp, test_repo):
708 return vcsbackend(request, 'git', tests_tmp_path, baseapp, test_repo)
740 return vcsbackend_base(request, 'git', tests_tmp_path, baseapp, test_repo)
709 741
710 742
711 743 @pytest.fixture
712 744 def vcsbackend_hg(request, tests_tmp_path, baseapp, test_repo):
713 return vcsbackend(request, 'hg', tests_tmp_path, baseapp, test_repo)
745 return vcsbackend_base(request, 'hg', tests_tmp_path, baseapp, test_repo)
714 746
715 747
716 748 @pytest.fixture
717 749 def vcsbackend_svn(request, tests_tmp_path, baseapp, test_repo):
718 return vcsbackend(request, 'svn', tests_tmp_path, baseapp, test_repo)
719
720
721 @pytest.fixture
722 def vcsbackend_random(vcsbackend_git):
723 """
724 Use this to express that your tests need "a vcsbackend".
725
726 The fixture `vcsbackend` would run the test multiple times for each
727 available vcs backend which is a pure waste of time if the test is
728 independent of the vcs backend type.
729 """
730 # TODO: johbo: Change this to pick a random backend
731 return vcsbackend_git
750 return vcsbackend_base(request, 'svn', tests_tmp_path, baseapp, test_repo)
732 751
733 752
734 753 @pytest.fixture
735 754 def vcsbackend_stub(vcsbackend_git):
736 755 """
737 756 Use this to express that your test just needs a stub of a vcsbackend.
738 757
739 758 Plan is to eventually implement an in-memory stub to speed tests up.
740 759 """
741 760 return vcsbackend_git
742 761
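# Minimal usage sketch (hypothetical test) for the `vcsbackend` fixture, which
# works on the plain vcs layer and returns backend repository objects directly.
def _example_vcsbackend_fixture_usage(vcsbackend):
    vcs_repo = vcsbackend.create_repo(number_of_commits=1)
    assert len(vcs_repo.commit_ids) == 1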
743 762
744 763 class VcsBackend(object):
745 764 """
746 765 Represents the test configuration for one supported vcs backend.
747 766 """
748 767
749 768 invalid_repo_name = re.compile(r'[^0-9a-zA-Z]+')
750 769
751 770 def __init__(self, alias, repo_path, test_name, test_repo_container):
752 771 self.alias = alias
753 772 self._repo_path = repo_path
754 773 self._cleanup_repos = []
755 774 self._test_name = test_name
756 775 self._test_repo_container = test_repo_container
757 776
758 777 def __getitem__(self, key):
759 778 return self._test_repo_container(key, self.alias).scm_instance()
760 779
761 780 @property
762 781 def repo(self):
763 782 """
764 783 Returns the "current" repository. This is the vcs_test repo or the last
765 784 repo which has been created.
766 785 """
767 786 Repository = get_backend(self.alias)
768 787 return Repository(self._repo_path)
769 788
770 789 @property
771 790 def backend(self):
772 791 """
773 792 Returns the backend implementation class.
774 793 """
775 794 return get_backend(self.alias)
776 795
777 796 def create_repo(self, commits=None, number_of_commits=0, _clone_repo=None,
778 797 bare=False):
779 798 repo_name = self._next_repo_name()
780 799 self._repo_path = get_new_dir(repo_name)
781 800 repo_class = get_backend(self.alias)
782 801 src_url = None
783 802 if _clone_repo:
784 803 src_url = _clone_repo.path
785 804 repo = repo_class(self._repo_path, create=True, src_url=src_url, bare=bare)
786 805 self._cleanup_repos.append(repo)
787 806
788 807 commits = commits or [
789 808 {'message': 'Commit %s of %s' % (x, repo_name)}
790 809 for x in xrange(number_of_commits)]
791 810 _add_commits_to_repo(repo, commits)
792 811 return repo
793 812
794 813 def clone_repo(self, repo):
795 814 return self.create_repo(_clone_repo=repo)
796 815
797 816 def cleanup(self):
798 817 for repo in self._cleanup_repos:
799 818 shutil.rmtree(repo.path)
800 819
801 820 def new_repo_path(self):
802 821 repo_name = self._next_repo_name()
803 822 self._repo_path = get_new_dir(repo_name)
804 823 return self._repo_path
805 824
806 825 def _next_repo_name(self):
807 826 return "%s_%s" % (
808 827 self.invalid_repo_name.sub('_', self._test_name),
809 828 len(self._cleanup_repos))
810 829
811 830 def add_file(self, repo, filename, content='Test content\n'):
812 831 imc = repo.in_memory_commit
813 832 imc.add(FileNode(filename, content=content))
814 833 imc.commit(
815 834 message=u'Automatic commit from vcsbackend fixture',
816 835 author=u'Automatic')
817 836
818 837 def ensure_file(self, filename, content='Test content\n'):
819 838 assert self._cleanup_repos, "Avoid writing into vcs_test repos"
820 839 self.add_file(self.repo, filename, content)
821 840
822 841
823 842 def _add_commits_to_repo(vcs_repo, commits):
824 843 commit_ids = {}
825 844 if not commits:
826 845 return commit_ids
827 846
828 847 imc = vcs_repo.in_memory_commit
829 848 commit = None
830 849
831 850 for idx, commit in enumerate(commits):
832 851 message = unicode(commit.get('message', 'Commit %s' % idx))
833 852
834 853 for node in commit.get('added', []):
835 854 imc.add(FileNode(node.path, content=node.content))
836 855 for node in commit.get('changed', []):
837 856 imc.change(FileNode(node.path, content=node.content))
838 857 for node in commit.get('removed', []):
839 858 imc.remove(FileNode(node.path))
840 859
841 860 parents = [
842 861 vcs_repo.get_commit(commit_id=commit_ids[p])
843 862 for p in commit.get('parents', [])]
844 863
845 864 operations = ('added', 'changed', 'removed')
846 865 if not any((commit.get(o) for o in operations)):
847 866 imc.add(FileNode('file_%s' % idx, content=message))
848 867
849 868 commit = imc.commit(
850 869 message=message,
851 870 author=unicode(commit.get('author', 'Automatic')),
852 871 date=commit.get('date'),
853 872 branch=commit.get('branch'),
854 873 parents=parents)
855 874
856 875 commit_ids[commit.message] = commit.raw_id
857 876
858 877 return commit_ids
859 878
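# Sketch of the `commits` structure accepted by `_add_commits_to_repo` (and by
# the `create_repo`/`create_master_repo` helpers). All keys are optional; a
# commit without file operations gets an auto-generated `file_<idx>` node.
# The constant name is purely illustrative.
_EXAMPLE_COMMITS_SPEC = [
    {'message': 'init'},
    {'message': 'add readme',
     'added': [FileNode('README.rst', content='docs\n')],
     'parents': ['init']},
]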
860 879
861 880 @pytest.fixture
862 881 def reposerver(request):
863 882 """
864 883 Allows serving a backend repository
865 884 """
866 885
867 886 repo_server = RepoServer()
868 887 request.addfinalizer(repo_server.cleanup)
869 888 return repo_server
870 889
871 890
872 891 class RepoServer(object):
873 892 """
874 893 Utility to serve a local repository for the duration of a test case.
875 894
876 895 Supports only Subversion so far.
877 896 """
878 897
879 898 url = None
880 899
881 900 def __init__(self):
882 901 self._cleanup_servers = []
883 902
884 903 def serve(self, vcsrepo):
885 904 if vcsrepo.alias != 'svn':
886 905 raise TypeError("Backend %s not supported" % vcsrepo.alias)
887 906
888 907 proc = subprocess32.Popen(
889 908 ['svnserve', '-d', '--foreground', '--listen-host', 'localhost',
890 909 '--root', vcsrepo.path])
891 910 self._cleanup_servers.append(proc)
892 911 self.url = 'svn://localhost'
893 912
894 913 def cleanup(self):
895 914 for proc in self._cleanup_servers:
896 915 proc.terminate()
897 916
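# Minimal usage sketch (hypothetical test): `reposerver` only supports
# Subversion so far, so it pairs with `vcsbackend_svn`.
def _example_reposerver_usage(reposerver, vcsbackend_svn):
    reposerver.serve(vcsbackend_svn.repo)
    assert reposerver.url == 'svn://localhost'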
898 917
899 918 @pytest.fixture
900 919 def pr_util(backend, request, config_stub):
901 920 """
902 921 Utility for tests of models and for functional tests around pull requests.
903 922
904 923 It gives an instance of :class:`PRTestUtility` which provides various
905 924 utility methods around one pull request.
906 925
907 926 This fixture uses `backend` and inherits its parameterization.
908 927 """
909 928
910 929 util = PRTestUtility(backend)
911 930 request.addfinalizer(util.cleanup)
912 931
913 932 return util
914 933
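# Minimal usage sketch (hypothetical test) for `pr_util`; one call sets up the
# source and target repositories plus the pull request between them.
def _example_pr_util_usage(pr_util):
    pull_request = pr_util.create_pull_request(mergeable=True, approved=True)
    assert pull_request.pull_request_id == pr_util.pull_request_id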
915 934
916 935 class PRTestUtility(object):
917 936
918 937 pull_request = None
919 938 pull_request_id = None
920 939 mergeable_patcher = None
921 940 mergeable_mock = None
922 941 notification_patcher = None
923 942
924 943 def __init__(self, backend):
925 944 self.backend = backend
926 945
927 946 def create_pull_request(
928 947 self, commits=None, target_head=None, source_head=None,
929 948 revisions=None, approved=False, author=None, mergeable=False,
930 949 enable_notifications=True, name_suffix=u'', reviewers=None,
931 950 title=u"Test", description=u"Description"):
932 951 self.set_mergeable(mergeable)
933 952 if not enable_notifications:
934 953 # mock notification side effect
935 954 self.notification_patcher = mock.patch(
936 955 'rhodecode.model.notification.NotificationModel.create')
937 956 self.notification_patcher.start()
938 957
939 958 if not self.pull_request:
940 959 if not commits:
941 960 commits = [
942 961 {'message': 'c1'},
943 962 {'message': 'c2'},
944 963 {'message': 'c3'},
945 964 ]
946 965 target_head = 'c1'
947 966 source_head = 'c2'
948 967 revisions = ['c2']
949 968
950 969 self.commit_ids = self.backend.create_master_repo(commits)
951 970 self.target_repository = self.backend.create_repo(
952 971 heads=[target_head], name_suffix=name_suffix)
953 972 self.source_repository = self.backend.create_repo(
954 973 heads=[source_head], name_suffix=name_suffix)
955 974 self.author = author or UserModel().get_by_username(
956 975 TEST_USER_ADMIN_LOGIN)
957 976
958 977 model = PullRequestModel()
959 978 self.create_parameters = {
960 979 'created_by': self.author,
961 980 'source_repo': self.source_repository.repo_name,
962 981 'source_ref': self._default_branch_reference(source_head),
963 982 'target_repo': self.target_repository.repo_name,
964 983 'target_ref': self._default_branch_reference(target_head),
965 984 'revisions': [self.commit_ids[r] for r in revisions],
966 985 'reviewers': reviewers or self._get_reviewers(),
967 986 'title': title,
968 987 'description': description,
969 988 }
970 989 self.pull_request = model.create(**self.create_parameters)
971 990 assert model.get_versions(self.pull_request) == []
972 991
973 992 self.pull_request_id = self.pull_request.pull_request_id
974 993
975 994 if approved:
976 995 self.approve()
977 996
978 997 Session().add(self.pull_request)
979 998 Session().commit()
980 999
981 1000 return self.pull_request
982 1001
983 1002 def approve(self):
984 1003 self.create_status_votes(
985 1004 ChangesetStatus.STATUS_APPROVED,
986 1005 *self.pull_request.reviewers)
987 1006
988 1007 def close(self):
989 1008 PullRequestModel().close_pull_request(self.pull_request, self.author)
990 1009
991 1010 def _default_branch_reference(self, commit_message):
992 1011 reference = '%s:%s:%s' % (
993 1012 'branch',
994 1013 self.backend.default_branch_name,
995 1014 self.commit_ids[commit_message])
996 1015 return reference
997 1016
998 1017 def _get_reviewers(self):
999 1018 return [
1000 1019 (TEST_USER_REGULAR_LOGIN, ['default1'], False, []),
1001 1020 (TEST_USER_REGULAR2_LOGIN, ['default2'], False, []),
1002 1021 ]
1003 1022
1004 1023 def update_source_repository(self, head=None):
1005 1024 heads = [head or 'c3']
1006 1025 self.backend.pull_heads(self.source_repository, heads=heads)
1007 1026
1008 1027 def add_one_commit(self, head=None):
1009 1028 self.update_source_repository(head=head)
1010 1029 old_commit_ids = set(self.pull_request.revisions)
1011 1030 PullRequestModel().update_commits(self.pull_request)
1012 1031 commit_ids = set(self.pull_request.revisions)
1013 1032 new_commit_ids = commit_ids - old_commit_ids
1014 1033 assert len(new_commit_ids) == 1
1015 1034 return new_commit_ids.pop()
1016 1035
1017 1036 def remove_one_commit(self):
1018 1037 assert len(self.pull_request.revisions) == 2
1019 1038 source_vcs = self.source_repository.scm_instance()
1020 1039 removed_commit_id = source_vcs.commit_ids[-1]
1021 1040
1022 1041 # TODO: johbo: Git and Mercurial have an inconsistent vcs api here,
1023 1042 # remove the if once that's sorted out.
1024 1043 if self.backend.alias == "git":
1025 1044 kwargs = {'branch_name': self.backend.default_branch_name}
1026 1045 else:
1027 1046 kwargs = {}
1028 1047 source_vcs.strip(removed_commit_id, **kwargs)
1029 1048
1030 1049 PullRequestModel().update_commits(self.pull_request)
1031 1050 assert len(self.pull_request.revisions) == 1
1032 1051 return removed_commit_id
1033 1052
1034 1053 def create_comment(self, linked_to=None):
1035 1054 comment = CommentsModel().create(
1036 1055 text=u"Test comment",
1037 1056 repo=self.target_repository.repo_name,
1038 1057 user=self.author,
1039 1058 pull_request=self.pull_request)
1040 1059 assert comment.pull_request_version_id is None
1041 1060
1042 1061 if linked_to:
1043 1062 PullRequestModel()._link_comments_to_version(linked_to)
1044 1063
1045 1064 return comment
1046 1065
1047 1066 def create_inline_comment(
1048 1067 self, linked_to=None, line_no=u'n1', file_path='file_1'):
1049 1068 comment = CommentsModel().create(
1050 1069 text=u"Test comment",
1051 1070 repo=self.target_repository.repo_name,
1052 1071 user=self.author,
1053 1072 line_no=line_no,
1054 1073 f_path=file_path,
1055 1074 pull_request=self.pull_request)
1056 1075 assert comment.pull_request_version_id is None
1057 1076
1058 1077 if linked_to:
1059 1078 PullRequestModel()._link_comments_to_version(linked_to)
1060 1079
1061 1080 return comment
1062 1081
1063 1082 def create_version_of_pull_request(self):
1064 1083 pull_request = self.create_pull_request()
1065 1084 version = PullRequestModel()._create_version_from_snapshot(
1066 1085 pull_request)
1067 1086 return version
1068 1087
1069 1088 def create_status_votes(self, status, *reviewers):
1070 1089 for reviewer in reviewers:
1071 1090 ChangesetStatusModel().set_status(
1072 1091 repo=self.pull_request.target_repo,
1073 1092 status=status,
1074 1093 user=reviewer.user_id,
1075 1094 pull_request=self.pull_request)
1076 1095
1077 1096 def set_mergeable(self, value):
1078 1097 if not self.mergeable_patcher:
1079 1098 self.mergeable_patcher = mock.patch.object(
1080 1099 VcsSettingsModel, 'get_general_settings')
1081 1100 self.mergeable_mock = self.mergeable_patcher.start()
1082 1101 self.mergeable_mock.return_value = {
1083 1102 'rhodecode_pr_merge_enabled': value}
1084 1103
1085 1104 def cleanup(self):
1086 1105 # In case the source repository is already cleaned up, the pull
1087 1106 # request will already be deleted.
1088 1107 pull_request = PullRequest().get(self.pull_request_id)
1089 1108 if pull_request:
1090 1109 PullRequestModel().delete(pull_request, pull_request.author)
1091 1110 Session().commit()
1092 1111
1093 1112 if self.notification_patcher:
1094 1113 self.notification_patcher.stop()
1095 1114
1096 1115 if self.mergeable_patcher:
1097 1116 self.mergeable_patcher.stop()
1098 1117
1099 1118
1100 1119 @pytest.fixture
1101 1120 def user_admin(baseapp):
1102 1121 """
1103 1122 Provides the default admin test user as an instance of `db.User`.
1104 1123 """
1105 1124 user = UserModel().get_by_username(TEST_USER_ADMIN_LOGIN)
1106 1125 return user
1107 1126
1108 1127
1109 1128 @pytest.fixture
1110 1129 def user_regular(baseapp):
1111 1130 """
1112 1131 Provides the default regular test user as an instance of `db.User`.
1113 1132 """
1114 1133 user = UserModel().get_by_username(TEST_USER_REGULAR_LOGIN)
1115 1134 return user
1116 1135
1117 1136
1118 1137 @pytest.fixture
1119 1138 def user_util(request, db_connection):
1120 1139 """
1121 1140 Provides a wired instance of `UserUtility` with integrated cleanup.
1122 1141 """
1123 1142 utility = UserUtility(test_name=request.node.name)
1124 1143 request.addfinalizer(utility.cleanup)
1125 1144 return utility
1126 1145
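# Minimal usage sketch (hypothetical test) for `user_util`; everything created
# here is destroyed again by its finalizer. The permission name is assumed to
# be one of the standard `repository.*` permissions.
def _example_user_util_usage(user_util):
    user = user_util.create_user()
    repo = user_util.create_repo(repo_type='hg')
    user_util.grant_user_permission_to_repo(repo, user, 'repository.read')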
1127 1146
1128 1147 # TODO: johbo: Split this up into utilities per domain or something similar
1129 1148 class UserUtility(object):
1130 1149
1131 1150 def __init__(self, test_name="test"):
1132 1151 self._test_name = self._sanitize_name(test_name)
1133 1152 self.fixture = Fixture()
1134 1153 self.repo_group_ids = []
1135 1154 self.repos_ids = []
1136 1155 self.user_ids = []
1137 1156 self.user_group_ids = []
1138 1157 self.user_repo_permission_ids = []
1139 1158 self.user_group_repo_permission_ids = []
1140 1159 self.user_repo_group_permission_ids = []
1141 1160 self.user_group_repo_group_permission_ids = []
1142 1161 self.user_user_group_permission_ids = []
1143 1162 self.user_group_user_group_permission_ids = []
1144 1163 self.user_permissions = []
1145 1164
1146 1165 def _sanitize_name(self, name):
1147 1166 for char in ['[', ']']:
1148 1167 name = name.replace(char, '_')
1149 1168 return name
1150 1169
1151 1170 def create_repo_group(
1152 1171 self, owner=TEST_USER_ADMIN_LOGIN, auto_cleanup=True):
1153 1172 group_name = "{prefix}_repogroup_{count}".format(
1154 1173 prefix=self._test_name,
1155 1174 count=len(self.repo_group_ids))
1156 1175 repo_group = self.fixture.create_repo_group(
1157 1176 group_name, cur_user=owner)
1158 1177 if auto_cleanup:
1159 1178 self.repo_group_ids.append(repo_group.group_id)
1160 1179 return repo_group
1161 1180
1162 1181 def create_repo(self, owner=TEST_USER_ADMIN_LOGIN, parent=None,
1163 1182 auto_cleanup=True, repo_type='hg', bare=False):
1164 1183 repo_name = "{prefix}_repository_{count}".format(
1165 1184 prefix=self._test_name,
1166 1185 count=len(self.repos_ids))
1167 1186
1168 1187 repository = self.fixture.create_repo(
1169 1188 repo_name, cur_user=owner, repo_group=parent, repo_type=repo_type, bare=bare)
1170 1189 if auto_cleanup:
1171 1190 self.repos_ids.append(repository.repo_id)
1172 1191 return repository
1173 1192
1174 1193 def create_user(self, auto_cleanup=True, **kwargs):
1175 1194 user_name = "{prefix}_user_{count}".format(
1176 1195 prefix=self._test_name,
1177 1196 count=len(self.user_ids))
1178 1197 user = self.fixture.create_user(user_name, **kwargs)
1179 1198 if auto_cleanup:
1180 1199 self.user_ids.append(user.user_id)
1181 1200 return user
1182 1201
1183 1202 def create_additional_user_email(self, user, email):
1184 1203 uem = self.fixture.create_additional_user_email(user=user, email=email)
1185 1204 return uem
1186 1205
1187 1206 def create_user_with_group(self):
1188 1207 user = self.create_user()
1189 1208 user_group = self.create_user_group(members=[user])
1190 1209 return user, user_group
1191 1210
1192 1211 def create_user_group(self, owner=TEST_USER_ADMIN_LOGIN, members=None,
1193 1212 auto_cleanup=True, **kwargs):
1194 1213 group_name = "{prefix}_usergroup_{count}".format(
1195 1214 prefix=self._test_name,
1196 1215 count=len(self.user_group_ids))
1197 1216 user_group = self.fixture.create_user_group(
1198 1217 group_name, cur_user=owner, **kwargs)
1199 1218
1200 1219 if auto_cleanup:
1201 1220 self.user_group_ids.append(user_group.users_group_id)
1202 1221 if members:
1203 1222 for user in members:
1204 1223 UserGroupModel().add_user_to_group(user_group, user)
1205 1224 return user_group
1206 1225
1207 1226 def grant_user_permission(self, user_name, permission_name):
1208 1227 self._inherit_default_user_permissions(user_name, False)
1209 1228 self.user_permissions.append((user_name, permission_name))
1210 1229
1211 1230 def grant_user_permission_to_repo_group(
1212 1231 self, repo_group, user, permission_name):
1213 1232 permission = RepoGroupModel().grant_user_permission(
1214 1233 repo_group, user, permission_name)
1215 1234 self.user_repo_group_permission_ids.append(
1216 1235 (repo_group.group_id, user.user_id))
1217 1236 return permission
1218 1237
1219 1238 def grant_user_group_permission_to_repo_group(
1220 1239 self, repo_group, user_group, permission_name):
1221 1240 permission = RepoGroupModel().grant_user_group_permission(
1222 1241 repo_group, user_group, permission_name)
1223 1242 self.user_group_repo_group_permission_ids.append(
1224 1243 (repo_group.group_id, user_group.users_group_id))
1225 1244 return permission
1226 1245
1227 1246 def grant_user_permission_to_repo(
1228 1247 self, repo, user, permission_name):
1229 1248 permission = RepoModel().grant_user_permission(
1230 1249 repo, user, permission_name)
1231 1250 self.user_repo_permission_ids.append(
1232 1251 (repo.repo_id, user.user_id))
1233 1252 return permission
1234 1253
1235 1254 def grant_user_group_permission_to_repo(
1236 1255 self, repo, user_group, permission_name):
1237 1256 permission = RepoModel().grant_user_group_permission(
1238 1257 repo, user_group, permission_name)
1239 1258 self.user_group_repo_permission_ids.append(
1240 1259 (repo.repo_id, user_group.users_group_id))
1241 1260 return permission
1242 1261
1243 1262 def grant_user_permission_to_user_group(
1244 1263 self, target_user_group, user, permission_name):
1245 1264 permission = UserGroupModel().grant_user_permission(
1246 1265 target_user_group, user, permission_name)
1247 1266 self.user_user_group_permission_ids.append(
1248 1267 (target_user_group.users_group_id, user.user_id))
1249 1268 return permission
1250 1269
1251 1270 def grant_user_group_permission_to_user_group(
1252 1271 self, target_user_group, user_group, permission_name):
1253 1272 permission = UserGroupModel().grant_user_group_permission(
1254 1273 target_user_group, user_group, permission_name)
1255 1274 self.user_group_user_group_permission_ids.append(
1256 1275 (target_user_group.users_group_id, user_group.users_group_id))
1257 1276 return permission
1258 1277
1259 1278 def revoke_user_permission(self, user_name, permission_name):
1260 1279 self._inherit_default_user_permissions(user_name, True)
1261 1280 UserModel().revoke_perm(user_name, permission_name)
1262 1281
1263 1282 def _inherit_default_user_permissions(self, user_name, value):
1264 1283 user = UserModel().get_by_username(user_name)
1265 1284 user.inherit_default_permissions = value
1266 1285 Session().add(user)
1267 1286 Session().commit()
1268 1287
1269 1288 def cleanup(self):
1270 1289 self._cleanup_permissions()
1271 1290 self._cleanup_repos()
1272 1291 self._cleanup_repo_groups()
1273 1292 self._cleanup_user_groups()
1274 1293 self._cleanup_users()
1275 1294
1276 1295 def _cleanup_permissions(self):
1277 1296 if self.user_permissions:
1278 1297 for user_name, permission_name in self.user_permissions:
1279 1298 self.revoke_user_permission(user_name, permission_name)
1280 1299
1281 1300 for permission in self.user_repo_permission_ids:
1282 1301 RepoModel().revoke_user_permission(*permission)
1283 1302
1284 1303 for permission in self.user_group_repo_permission_ids:
1285 1304 RepoModel().revoke_user_group_permission(*permission)
1286 1305
1287 1306 for permission in self.user_repo_group_permission_ids:
1288 1307 RepoGroupModel().revoke_user_permission(*permission)
1289 1308
1290 1309 for permission in self.user_group_repo_group_permission_ids:
1291 1310 RepoGroupModel().revoke_user_group_permission(*permission)
1292 1311
1293 1312 for permission in self.user_user_group_permission_ids:
1294 1313 UserGroupModel().revoke_user_permission(*permission)
1295 1314
1296 1315 for permission in self.user_group_user_group_permission_ids:
1297 1316 UserGroupModel().revoke_user_group_permission(*permission)
1298 1317
1299 1318 def _cleanup_repo_groups(self):
1300 1319 def _repo_group_compare(first_group_id, second_group_id):
1301 1320 """
1302 1321 Gives higher priority to the groups with the most complex paths
1303 1322 """
1304 1323 first_group = RepoGroup.get(first_group_id)
1305 1324 second_group = RepoGroup.get(second_group_id)
1306 1325 first_group_parts = (
1307 1326 len(first_group.group_name.split('/')) if first_group else 0)
1308 1327 second_group_parts = (
1309 1328 len(second_group.group_name.split('/')) if second_group else 0)
1310 1329 return cmp(second_group_parts, first_group_parts)
1311 1330
1312 1331 sorted_repo_group_ids = sorted(
1313 1332 self.repo_group_ids, cmp=_repo_group_compare)
1314 1333 for repo_group_id in sorted_repo_group_ids:
1315 1334 self.fixture.destroy_repo_group(repo_group_id)
1316 1335
1317 1336 def _cleanup_repos(self):
1318 1337 sorted_repos_ids = sorted(self.repos_ids)
1319 1338 for repo_id in sorted_repos_ids:
1320 1339 self.fixture.destroy_repo(repo_id)
1321 1340
1322 1341 def _cleanup_user_groups(self):
1323 1342 def _user_group_compare(first_group_id, second_group_id):
1324 1343 """
1325 1344 Gives higher priority to the groups with the most complex paths
1326 1345 """
1327 1346 first_group = UserGroup.get(first_group_id)
1328 1347 second_group = UserGroup.get(second_group_id)
1329 1348 first_group_parts = (
1330 1349 len(first_group.users_group_name.split('/'))
1331 1350 if first_group else 0)
1332 1351 second_group_parts = (
1333 1352 len(second_group.users_group_name.split('/'))
1334 1353 if second_group else 0)
1335 1354 return cmp(second_group_parts, first_group_parts)
1336 1355
1337 1356 sorted_user_group_ids = sorted(
1338 1357 self.user_group_ids, cmp=_user_group_compare)
1339 1358 for user_group_id in sorted_user_group_ids:
1340 1359 self.fixture.destroy_user_group(user_group_id)
1341 1360
1342 1361 def _cleanup_users(self):
1343 1362 for user_id in self.user_ids:
1344 1363 self.fixture.destroy_user(user_id)
1345 1364
1346 1365
1347 1366 # TODO: Think about moving this into a pytest-pyro package and make it a
1348 1367 # pytest plugin
1349 1368 @pytest.hookimpl(tryfirst=True, hookwrapper=True)
1350 1369 def pytest_runtest_makereport(item, call):
1351 1370 """
1352 1371 Adds the remote traceback if the exception has this information.
1353 1372
1354 1373 VCSServer attaches this information as the attribute `_vcs_server_traceback`
1355 1374 to the exception instance.
1356 1375 """
1357 1376 outcome = yield
1358 1377 report = outcome.get_result()
1359 1378 if call.excinfo:
1360 1379 _add_vcsserver_remote_traceback(report, call.excinfo.value)
1361 1380
1362 1381
1363 1382 def _add_vcsserver_remote_traceback(report, exc):
1364 1383 vcsserver_traceback = getattr(exc, '_vcs_server_traceback', None)
1365 1384
1366 1385 if vcsserver_traceback:
1367 1386 section = 'VCSServer remote traceback ' + report.when
1368 1387 report.sections.append((section, vcsserver_traceback))
1369 1388
1370 1389
1371 1390 @pytest.fixture(scope='session')
1372 1391 def testrun():
1373 1392 return {
1374 1393 'uuid': uuid.uuid4(),
1375 1394 'start': datetime.datetime.utcnow().isoformat(),
1376 1395 'timestamp': int(time.time()),
1377 1396 }
1378 1397
1379 1398
1380 1399 @pytest.fixture(autouse=True)
1381 1400 def collect_appenlight_stats(request, testrun):
1382 1401 """
1383 1402 This fixture reports memory consumption of single tests.
1384 1403
1385 1404 It gathers data based on `psutil` and sends them to Appenlight. The option
1386 1405 ``--ae`` has to be used to enable this fixture and the API key for your
1387 1406 application has to be provided in ``--ae-key``.
1388 1407 """
1389 1408 try:
1390 1409 # cygwin does not yet have psutil support.
1391 1410 import psutil
1392 1411 except ImportError:
1393 1412 return
1394 1413
1395 1414 if not request.config.getoption('--appenlight'):
1396 1415 return
1397 1416 else:
1398 1417 # Only request the baseapp fixture if appenlight tracking is
1399 1418 # enabled. This will speed up a test run of unit tests by 2 to 3
1400 1419 # seconds if appenlight is not enabled.
1401 1420 baseapp = request.getfuncargvalue("baseapp")
1402 1421 url = '{}/api/logs'.format(request.config.getoption('--appenlight-url'))
1403 1422 client = AppenlightClient(
1404 1423 url=url,
1405 1424 api_key=request.config.getoption('--appenlight-api-key'),
1406 1425 namespace=request.node.nodeid,
1407 1426 request=str(testrun['uuid']),
1408 1427 testrun=testrun)
1409 1428
1410 1429 client.collect({
1411 1430 'message': "Starting",
1412 1431 })
1413 1432
1414 1433 server_and_port = baseapp.config.get_settings()['vcs.server']
1415 1434 protocol = baseapp.config.get_settings()['vcs.server.protocol']
1416 1435 server = create_vcsserver_proxy(server_and_port, protocol)
1417 1436 with server:
1418 1437 vcs_pid = server.get_pid()
1419 1438 server.run_gc()
1420 1439 vcs_process = psutil.Process(vcs_pid)
1421 1440 mem = vcs_process.memory_info()
1422 1441 client.tag_before('vcsserver.rss', mem.rss)
1423 1442 client.tag_before('vcsserver.vms', mem.vms)
1424 1443
1425 1444 test_process = psutil.Process()
1426 1445 mem = test_process.memory_info()
1427 1446 client.tag_before('test.rss', mem.rss)
1428 1447 client.tag_before('test.vms', mem.vms)
1429 1448
1430 1449 client.tag_before('time', time.time())
1431 1450
1432 1451 @request.addfinalizer
1433 1452 def send_stats():
1434 1453 client.tag_after('time', time.time())
1435 1454 with server:
1436 1455 gc_stats = server.run_gc()
1437 1456 for tag, value in gc_stats.items():
1438 1457 client.tag_after(tag, value)
1439 1458 mem = vcs_process.memory_info()
1440 1459 client.tag_after('vcsserver.rss', mem.rss)
1441 1460 client.tag_after('vcsserver.vms', mem.vms)
1442 1461
1443 1462 mem = test_process.memory_info()
1444 1463 client.tag_after('test.rss', mem.rss)
1445 1464 client.tag_after('test.vms', mem.vms)
1446 1465
1447 1466 client.collect({
1448 1467 'message': "Finished",
1449 1468 })
1450 1469 client.send_stats()
1451 1470
1452 1471 return client
1453 1472
1454 1473
1455 1474 class AppenlightClient():
1456 1475
1457 1476 url_template = '{url}?protocol_version=0.5'
1458 1477
1459 1478 def __init__(
1460 1479 self, url, api_key, add_server=True, add_timestamp=True,
1461 1480 namespace=None, request=None, testrun=None):
1462 1481 self.url = self.url_template.format(url=url)
1463 1482 self.api_key = api_key
1464 1483 self.add_server = add_server
1465 1484 self.add_timestamp = add_timestamp
1466 1485 self.namespace = namespace
1467 1486 self.request = request
1468 1487 self.server = socket.getfqdn(socket.gethostname())
1469 1488 self.tags_before = {}
1470 1489 self.tags_after = {}
1471 1490 self.stats = []
1472 1491 self.testrun = testrun or {}
1473 1492
1474 1493 def tag_before(self, tag, value):
1475 1494 self.tags_before[tag] = value
1476 1495
1477 1496 def tag_after(self, tag, value):
1478 1497 self.tags_after[tag] = value
1479 1498
1480 1499 def collect(self, data):
1481 1500 if self.add_server:
1482 1501 data.setdefault('server', self.server)
1483 1502 if self.add_timestamp:
1484 1503 data.setdefault('date', datetime.datetime.utcnow().isoformat())
1485 1504 if self.namespace:
1486 1505 data.setdefault('namespace', self.namespace)
1487 1506 if self.request:
1488 1507 data.setdefault('request', self.request)
1489 1508 self.stats.append(data)
1490 1509
1491 1510 def send_stats(self):
1492 1511 tags = [
1493 1512 ('testrun', self.request),
1494 1513 ('testrun.start', self.testrun['start']),
1495 1514 ('testrun.timestamp', self.testrun['timestamp']),
1496 1515 ('test', self.namespace),
1497 1516 ]
1498 1517 for key, value in self.tags_before.items():
1499 1518 tags.append((key + '.before', value))
1500 1519 try:
1501 1520 delta = self.tags_after[key] - value
1502 1521 tags.append((key + '.delta', delta))
1503 1522 except Exception:
1504 1523 pass
1505 1524 for key, value in self.tags_after.items():
1506 1525 tags.append((key + '.after', value))
1507 1526 self.collect({
1508 1527 'message': "Collected tags",
1509 1528 'tags': tags,
1510 1529 })
1511 1530
1512 1531 response = requests.post(
1513 1532 self.url,
1514 1533 headers={
1515 1534 'X-appenlight-api-key': self.api_key},
1516 1535 json=self.stats,
1517 1536 )
1518 1537
1519 1538 if not response.status_code == 200:
1520 1539 pprint.pprint(self.stats)
1521 1540 print(response.headers)
1522 1541 print(response.text)
1523 1542 raise Exception('Sending to appenlight failed')
1524 1543
1525 1544
1526 1545 @pytest.fixture
1527 1546 def gist_util(request, db_connection):
1528 1547 """
1529 1548 Provides a wired instance of `GistUtility` with integrated cleanup.
1530 1549 """
1531 1550 utility = GistUtility()
1532 1551 request.addfinalizer(utility.cleanup)
1533 1552 return utility
1534 1553
1535 1554
1536 1555 class GistUtility(object):
1537 1556 def __init__(self):
1538 1557 self.fixture = Fixture()
1539 1558 self.gist_ids = []
1540 1559
1541 1560 def create_gist(self, **kwargs):
1542 1561 gist = self.fixture.create_gist(**kwargs)
1543 1562 self.gist_ids.append(gist.gist_id)
1544 1563 return gist
1545 1564
1546 1565 def cleanup(self):
1547 1566 for id_ in self.gist_ids:
1548 1567 self.fixture.destroy_gists(str(id_))
1549 1568
1550 1569
1551 1570 @pytest.fixture
1552 1571 def enabled_backends(request):
1553 1572 backends = request.config.option.backends
1554 1573 return backends[:]
1555 1574
1556 1575
1557 1576 @pytest.fixture
1558 1577 def settings_util(request, db_connection):
1559 1578 """
1560 1579 Provides a wired instance of `SettingsUtility` with integrated cleanup.
1561 1580 """
1562 1581 utility = SettingsUtility()
1563 1582 request.addfinalizer(utility.cleanup)
1564 1583 return utility
1565 1584
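# Minimal usage sketch (hypothetical test) for `settings_util`; the setting key
# and type below are illustrative values only.
def _example_settings_util_usage(settings_util):
    setting = settings_util.create_rhodecode_setting(
        'example_flag', True, 'bool')
    assert setting.app_settings_id is not None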
1566 1585
1567 1586 class SettingsUtility(object):
1568 1587 def __init__(self):
1569 1588 self.rhodecode_ui_ids = []
1570 1589 self.rhodecode_setting_ids = []
1571 1590 self.repo_rhodecode_ui_ids = []
1572 1591 self.repo_rhodecode_setting_ids = []
1573 1592
1574 1593 def create_repo_rhodecode_ui(
1575 1594 self, repo, section, value, key=None, active=True, cleanup=True):
1576 1595 key = key or hashlib.sha1(
1577 1596 '{}{}{}'.format(section, value, repo.repo_id)).hexdigest()
1578 1597
1579 1598 setting = RepoRhodeCodeUi()
1580 1599 setting.repository_id = repo.repo_id
1581 1600 setting.ui_section = section
1582 1601 setting.ui_value = value
1583 1602 setting.ui_key = key
1584 1603 setting.ui_active = active
1585 1604 Session().add(setting)
1586 1605 Session().commit()
1587 1606
1588 1607 if cleanup:
1589 1608 self.repo_rhodecode_ui_ids.append(setting.ui_id)
1590 1609 return setting
1591 1610
1592 1611 def create_rhodecode_ui(
1593 1612 self, section, value, key=None, active=True, cleanup=True):
1594 1613 key = key or hashlib.sha1('{}{}'.format(section, value)).hexdigest()
1595 1614
1596 1615 setting = RhodeCodeUi()
1597 1616 setting.ui_section = section
1598 1617 setting.ui_value = value
1599 1618 setting.ui_key = key
1600 1619 setting.ui_active = active
1601 1620 Session().add(setting)
1602 1621 Session().commit()
1603 1622
1604 1623 if cleanup:
1605 1624 self.rhodecode_ui_ids.append(setting.ui_id)
1606 1625 return setting
1607 1626
1608 1627 def create_repo_rhodecode_setting(
1609 1628 self, repo, name, value, type_, cleanup=True):
1610 1629 setting = RepoRhodeCodeSetting(
1611 1630 repo.repo_id, key=name, val=value, type=type_)
1612 1631 Session().add(setting)
1613 1632 Session().commit()
1614 1633
1615 1634 if cleanup:
1616 1635 self.repo_rhodecode_setting_ids.append(setting.app_settings_id)
1617 1636 return setting
1618 1637
1619 1638 def create_rhodecode_setting(self, name, value, type_, cleanup=True):
1620 1639 setting = RhodeCodeSetting(key=name, val=value, type=type_)
1621 1640 Session().add(setting)
1622 1641 Session().commit()
1623 1642
1624 1643 if cleanup:
1625 1644 self.rhodecode_setting_ids.append(setting.app_settings_id)
1626 1645
1627 1646 return setting
1628 1647
1629 1648 def cleanup(self):
1630 1649 for id_ in self.rhodecode_ui_ids:
1631 1650 setting = RhodeCodeUi.get(id_)
1632 1651 Session().delete(setting)
1633 1652
1634 1653 for id_ in self.rhodecode_setting_ids:
1635 1654 setting = RhodeCodeSetting.get(id_)
1636 1655 Session().delete(setting)
1637 1656
1638 1657 for id_ in self.repo_rhodecode_ui_ids:
1639 1658 setting = RepoRhodeCodeUi.get(id_)
1640 1659 Session().delete(setting)
1641 1660
1642 1661 for id_ in self.repo_rhodecode_setting_ids:
1643 1662 setting = RepoRhodeCodeSetting.get(id_)
1644 1663 Session().delete(setting)
1645 1664
1646 1665 Session().commit()
1647 1666
1648 1667
1649 1668 @pytest.fixture
1650 1669 def no_notifications(request):
1651 1670 notification_patcher = mock.patch(
1652 1671 'rhodecode.model.notification.NotificationModel.create')
1653 1672 notification_patcher.start()
1654 1673 request.addfinalizer(notification_patcher.stop)
1655 1674
1656 1675
1657 1676 @pytest.fixture(scope='session')
1658 1677 def repeat(request):
1659 1678 """
1660 1679 The number of repetitions is based on this fixture.
1661 1680
1662 1681 Slower calls may divide it by 10 or 100. It is chosen so that the
1663 1682 tests are not too slow in our default test suite.
1664 1683 """
1665 1684 return request.config.getoption('--repeat')
1666 1685
1667 1686
1668 1687 @pytest.fixture
1669 1688 def rhodecode_fixtures():
1670 1689 return Fixture()
1671 1690
1672 1691
1673 1692 @pytest.fixture
1674 1693 def context_stub():
1675 1694 """
1676 1695 Stub context object.
1677 1696 """
1678 1697 context = pyramid.testing.DummyResource()
1679 1698 return context
1680 1699
1681 1700
1682 1701 @pytest.fixture
1683 1702 def request_stub():
1684 1703 """
1685 1704 Stub request object.
1686 1705 """
1687 1706 from rhodecode.lib.base import bootstrap_request
1688 1707 request = bootstrap_request(scheme='https')
1689 1708 return request
1690 1709
1691 1710
1692 1711 @pytest.fixture
1693 1712 def config_stub(request, request_stub):
1694 1713 """
1695 1714 Set up pyramid.testing and return the Configurator.
1696 1715 """
1697 1716 from rhodecode.lib.base import bootstrap_config
1698 1717 config = bootstrap_config(request=request_stub)
1699 1718
1700 1719 @request.addfinalizer
1701 1720 def cleanup():
1702 1721 pyramid.testing.tearDown()
1703 1722
1704 1723 return config
1705 1724
1706 1725
1707 1726 @pytest.fixture
1708 1727 def StubIntegrationType():
1709 1728 class _StubIntegrationType(IntegrationTypeBase):
1710 1729 """ Test integration type class """
1711 1730
1712 1731 key = 'test'
1713 1732 display_name = 'Test integration type'
1714 1733 description = 'A test integration type for testing'
1715 1734
1716 1735 @classmethod
1717 1736 def icon(cls):
1718 1737 return 'test_icon_html_image'
1719 1738
1720 1739 def __init__(self, settings):
1721 1740 super(_StubIntegrationType, self).__init__(settings)
1722 1741 self.sent_events = [] # for testing
1723 1742
1724 1743 def send_event(self, event):
1725 1744 self.sent_events.append(event)
1726 1745
1727 1746 def settings_schema(self):
1728 1747 class SettingsSchema(colander.Schema):
1729 1748 test_string_field = colander.SchemaNode(
1730 1749 colander.String(),
1731 1750 missing=colander.required,
1732 1751 title='test string field',
1733 1752 )
1734 1753 test_int_field = colander.SchemaNode(
1735 1754 colander.Int(),
1736 1755 title='some integer setting',
1737 1756 )
1738 1757 return SettingsSchema()
1739 1758
1740 1759
1741 1760 integration_type_registry.register_integration_type(_StubIntegrationType)
1742 1761 return _StubIntegrationType
1743 1762
1744 1763 @pytest.fixture
1745 1764 def stub_integration_settings():
1746 1765 return {
1747 1766 'test_string_field': 'some data',
1748 1767 'test_int_field': 100,
1749 1768 }
1750 1769
1751 1770
1752 1771 @pytest.fixture
1753 1772 def repo_integration_stub(request, repo_stub, StubIntegrationType,
1754 1773 stub_integration_settings):
1755 1774 integration = IntegrationModel().create(
1756 1775 StubIntegrationType, settings=stub_integration_settings, enabled=True,
1757 1776 name='test repo integration',
1758 1777 repo=repo_stub, repo_group=None, child_repos_only=None)
1759 1778
1760 1779 @request.addfinalizer
1761 1780 def cleanup():
1762 1781 IntegrationModel().delete(integration)
1763 1782
1764 1783 return integration
1765 1784
1766 1785
1767 1786 @pytest.fixture
1768 1787 def repogroup_integration_stub(request, test_repo_group, StubIntegrationType,
1769 1788 stub_integration_settings):
1770 1789 integration = IntegrationModel().create(
1771 1790 StubIntegrationType, settings=stub_integration_settings, enabled=True,
1772 1791 name='test repogroup integration',
1773 1792 repo=None, repo_group=test_repo_group, child_repos_only=True)
1774 1793
1775 1794 @request.addfinalizer
1776 1795 def cleanup():
1777 1796 IntegrationModel().delete(integration)
1778 1797
1779 1798 return integration
1780 1799
1781 1800
1782 1801 @pytest.fixture
1783 1802 def repogroup_recursive_integration_stub(request, test_repo_group,
1784 1803 StubIntegrationType, stub_integration_settings):
1785 1804 integration = IntegrationModel().create(
1786 1805 StubIntegrationType, settings=stub_integration_settings, enabled=True,
1787 1806 name='test recursive repogroup integration',
1788 1807 repo=None, repo_group=test_repo_group, child_repos_only=False)
1789 1808
1790 1809 @request.addfinalizer
1791 1810 def cleanup():
1792 1811 IntegrationModel().delete(integration)
1793 1812
1794 1813 return integration
1795 1814
1796 1815
1797 1816 @pytest.fixture
1798 1817 def global_integration_stub(request, StubIntegrationType,
1799 1818 stub_integration_settings):
1800 1819 integration = IntegrationModel().create(
1801 1820 StubIntegrationType, settings=stub_integration_settings, enabled=True,
1802 1821 name='test global integration',
1803 1822 repo=None, repo_group=None, child_repos_only=None)
1804 1823
1805 1824 @request.addfinalizer
1806 1825 def cleanup():
1807 1826 IntegrationModel().delete(integration)
1808 1827
1809 1828 return integration
1810 1829
1811 1830
1812 1831 @pytest.fixture
1813 1832 def root_repos_integration_stub(request, StubIntegrationType,
1814 1833 stub_integration_settings):
1815 1834 integration = IntegrationModel().create(
1816 1835 StubIntegrationType, settings=stub_integration_settings, enabled=True,
1817 1836 name='test global integration',
1818 1837 repo=None, repo_group=None, child_repos_only=True)
1819 1838
1820 1839 @request.addfinalizer
1821 1840 def cleanup():
1822 1841 IntegrationModel().delete(integration)
1823 1842
1824 1843 return integration
1825 1844
1826 1845
1827 1846 @pytest.fixture
1828 1847 def local_dt_to_utc():
1829 1848 def _factory(dt):
1830 1849 return dt.replace(tzinfo=dateutil.tz.tzlocal()).astimezone(
1831 1850 dateutil.tz.tzutc()).replace(tzinfo=None)
1832 1851 return _factory
1833 1852
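# Minimal usage sketch (hypothetical test): the factory turns a naive local
# datetime into a naive UTC datetime.
def _example_local_dt_to_utc_usage(local_dt_to_utc):
    utc_now = local_dt_to_utc(datetime.datetime.now())
    assert utc_now.tzinfo is None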
1834 1853
1835 1854 @pytest.fixture
1836 1855 def disable_anonymous_user(request, baseapp):
1837 1856 set_anonymous_access(False)
1838 1857
1839 1858 @request.addfinalizer
1840 1859 def cleanup():
1841 1860 set_anonymous_access(True)
1842 1861
1843 1862
1844 1863 @pytest.fixture(scope='module')
1845 1864 def rc_fixture(request):
1846 1865 return Fixture()
1847 1866
1848 1867
1849 1868 @pytest.fixture
1850 1869 def repo_groups(request):
1851 1870 fixture = Fixture()
1852 1871
1853 1872 session = Session()
1854 1873 zombie_group = fixture.create_repo_group('zombie')
1855 1874 parent_group = fixture.create_repo_group('parent')
1856 1875 child_group = fixture.create_repo_group('parent/child')
1857 1876 groups_in_db = session.query(RepoGroup).all()
1858 1877 assert len(groups_in_db) == 3
1859 1878 assert child_group.group_parent_id == parent_group.group_id
1860 1879
1861 1880 @request.addfinalizer
1862 1881 def cleanup():
1863 1882 fixture.destroy_repo_group(zombie_group)
1864 1883 fixture.destroy_repo_group(child_group)
1865 1884 fixture.destroy_repo_group(parent_group)
1866 1885
1867 1886 return zombie_group, parent_group, child_group
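A minimal sketch, assuming the fixture API above, of how a test might consume `repo_groups`; the assertion that top-level groups have no parent is illustrative, not copied from the real suite.

def test_repo_groups_fixture_builds_nested_groups(repo_groups):
    # The fixture returns the three RepoGroup objects it created.
    zombie_group, parent_group, child_group = repo_groups

    # 'parent/child' is nested under 'parent'; the other two are top-level.
    assert child_group.group_parent_id == parent_group.group_id
    assert parent_group.group_parent_id is None
    assert zombie_group.group_parent_id is None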
@@ -1,458 +1,458 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2010-2018 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 import threading
22 22 import time
23 23 import logging
24 24 import os.path
25 25 import subprocess32
26 26 import tempfile
27 27 import urllib2
28 28 from lxml.html import fromstring, tostring
29 29 from lxml.cssselect import CSSSelector
30 30 from urlparse import urlparse, parse_qsl
31 31 from urllib import unquote_plus
32 32 import webob
33 33
34 34 from webtest.app import TestResponse, TestApp, string_types
35 35 from webtest.compat import print_stderr
36 36
37 37 import pytest
38 38 import rc_testdata
39 39
40 40 from rhodecode.model.db import User, Repository
41 41 from rhodecode.model.meta import Session
42 42 from rhodecode.model.scm import ScmModel
43 43 from rhodecode.lib.vcs.backends.svn.repository import SubversionRepository
44 44 from rhodecode.lib.vcs.backends.base import EmptyCommit
45 45 from rhodecode.tests import login_user_session
46 46
47 47 log = logging.getLogger(__name__)
48 48
49 49
50 50 class CustomTestResponse(TestResponse):
51 51 def _save_output(self, out):
52 52 f = tempfile.NamedTemporaryFile(
53 53 delete=False, prefix='rc-test-', suffix='.html')
54 54 f.write(out)
55 55 return f.name
56 56
57 57 def mustcontain(self, *strings, **kw):
58 58 """
59 59 Assert that the response contains all of the strings passed
60 60 in as arguments.
61 61
62 62 Equivalent to::
63 63
64 64 assert string in res
65 65 """
66 66 if 'no' in kw:
67 67 no = kw['no']
68 68 del kw['no']
69 69 if isinstance(no, string_types):
70 70 no = [no]
71 71 else:
72 72 no = []
73 73 if kw:
74 74 raise TypeError(
75 75 "The only keyword argument allowed is 'no' got %s" % kw)
76 76
77 77 f = self._save_output(str(self))
78 78
79 79 for s in strings:
 80 80 if s not in self:
81 81 print_stderr("Actual response (no %r):" % s)
82 82 print_stderr(str(self))
83 83 raise IndexError(
84 84 "Body does not contain string %r, output saved as %s" % (
85 85 s, f))
86 86
87 87 for no_s in no:
88 88 if no_s in self:
89 89 print_stderr("Actual response (has %r)" % no_s)
90 90 print_stderr(str(self))
91 91 raise IndexError(
92 92 "Body contains bad string %r, output saved as %s" % (
93 93 no_s, f))
94 94
95 95 def assert_response(self):
96 96 return AssertResponse(self)
97 97
98 98 def get_session_from_response(self):
99 99 """
100 100 This returns the session from a response object.
101 101 """
102 102
103 103 from pyramid_beaker import session_factory_from_settings
104 104 session = session_factory_from_settings(
105 105 self.test_app.app.config.get_settings())
106 106 return session(self.request)
107 107
108 108
109 109 class TestRequest(webob.BaseRequest):
110 110
111 111 # for py.test
112 112 disabled = True
113 113 ResponseClass = CustomTestResponse
114 114
115 115 def add_response_callback(self, callback):
116 116 pass
117 117
118 118
119 119 class CustomTestApp(TestApp):
120 120 """
 121 121 Custom app to make mustcontain more useful, and to extract special methods
122 122 """
123 123 RequestClass = TestRequest
124 124 rc_login_data = {}
125 125 rc_current_session = None
126 126
127 127 def login(self, username=None, password=None):
128 128 from rhodecode.lib import auth
129 129
130 130 if username and password:
131 131 session = login_user_session(self, username, password)
132 132 else:
133 133 session = login_user_session(self)
134 134
135 135 self.rc_login_data['csrf_token'] = auth.get_csrf_token(session)
136 136 self.rc_current_session = session
137 137 return session['rhodecode_user']
138 138
139 139 @property
140 140 def csrf_token(self):
141 141 return self.rc_login_data['csrf_token']
142 142
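As a hedged usage sketch (the route and credentials below are placeholders, not the application's real ones), this is how CustomTestApp's login() and csrf_token are typically combined in a functional test:

def test_authenticated_post_includes_csrf(app):
    # `app` is assumed to be a CustomTestApp instance provided by a fixture.
    app.login('test_admin', 'test_password')  # placeholder credentials

    # State-changing requests must carry the CSRF token captured at login.
    app.post(
        '/hypothetical/settings/update',
        params={'csrf_token': app.csrf_token, 'description': 'new text'},
        status=302)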
143 143
144 144 def set_anonymous_access(enabled):
145 145 """(Dis)allows anonymous access depending on parameter `enabled`"""
146 146 user = User.get_default_user()
147 147 user.active = enabled
148 148 Session().add(user)
149 149 Session().commit()
150 150 time.sleep(1.5) # must sleep for cache (1s to expire)
151 151 log.info('anonymous access is now: %s', enabled)
152 152 assert enabled == User.get_default_user().active, (
153 153 'Cannot set anonymous access')
154 154
155 155
156 156 def check_xfail_backends(node, backend_alias):
157 157 # Using "xfail_backends" here intentionally, since this marks work
158 158 # which is "to be done" soon.
159 skip_marker = node.get_marker('xfail_backends')
159 skip_marker = node.get_closest_marker('xfail_backends')
160 160 if skip_marker and backend_alias in skip_marker.args:
161 161 msg = "Support for backend %s to be developed." % (backend_alias, )
162 162 msg = skip_marker.kwargs.get('reason', msg)
163 163 pytest.xfail(msg)
164 164
165 165
166 166 def check_skip_backends(node, backend_alias):
167 167 # Using "skip_backends" here intentionally, since this marks work which is
168 168 # not supported.
169 skip_marker = node.get_marker('skip_backends')
169 skip_marker = node.get_closest_marker('skip_backends')
170 170 if skip_marker and backend_alias in skip_marker.args:
171 171 msg = "Feature not supported for backend %s." % (backend_alias, )
172 172 msg = skip_marker.kwargs.get('reason', msg)
173 173 pytest.skip(msg)
174 174
175 175
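For context, a hedged illustration of the markers that check_xfail_backends and check_skip_backends read via get_closest_marker; it relies on the pytest import already present in this module, and the backend alias and reason are invented:

@pytest.mark.skip_backends('svn', reason='feature not implemented for svn')
def test_some_backend_feature(backend):
    # check_skip_backends() finds the marker above through
    # node.get_closest_marker('skip_backends') and calls pytest.skip()
    # whenever the active backend alias is 'svn'.
    assert backend.repo is not None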
176 176 def extract_git_repo_from_dump(dump_name, repo_name):
177 177 """Create git repo `repo_name` from dump `dump_name`."""
178 178 repos_path = ScmModel().repos_path
179 179 target_path = os.path.join(repos_path, repo_name)
180 180 rc_testdata.extract_git_dump(dump_name, target_path)
181 181 return target_path
182 182
183 183
184 184 def extract_hg_repo_from_dump(dump_name, repo_name):
185 185 """Create hg repo `repo_name` from dump `dump_name`."""
186 186 repos_path = ScmModel().repos_path
187 187 target_path = os.path.join(repos_path, repo_name)
188 188 rc_testdata.extract_hg_dump(dump_name, target_path)
189 189 return target_path
190 190
191 191
192 192 def extract_svn_repo_from_dump(dump_name, repo_name):
193 193 """Create a svn repo `repo_name` from dump `dump_name`."""
194 194 repos_path = ScmModel().repos_path
195 195 target_path = os.path.join(repos_path, repo_name)
196 196 SubversionRepository(target_path, create=True)
197 197 _load_svn_dump_into_repo(dump_name, target_path)
198 198 return target_path
199 199
200 200
201 201 def assert_message_in_log(log_records, message, levelno, module):
202 202 messages = [
203 203 r.message for r in log_records
204 204 if r.module == module and r.levelno == levelno
205 205 ]
206 206 assert message in messages
207 207
208 208
209 209 def _load_svn_dump_into_repo(dump_name, repo_path):
210 210 """
211 211 Utility to populate a svn repository with a named dump
212 212
213 213 Currently the dumps are in rc_testdata. They might later on be
214 214 integrated with the main repository once they stabilize more.
215 215 """
216 216 dump = rc_testdata.load_svn_dump(dump_name)
217 217 load_dump = subprocess32.Popen(
218 218 ['svnadmin', 'load', repo_path],
219 219 stdin=subprocess32.PIPE, stdout=subprocess32.PIPE,
220 220 stderr=subprocess32.PIPE)
221 221 out, err = load_dump.communicate(dump)
222 222 if load_dump.returncode != 0:
223 223 log.error("Output of load_dump command: %s", out)
224 224 log.error("Error output of load_dump command: %s", err)
225 225 raise Exception(
226 226 'Failed to load dump "%s" into repository at path "%s".'
227 227 % (dump_name, repo_path))
228 228
229 229
230 230 class AssertResponse(object):
231 231 """
232 232 Utility that helps to assert things about a given HTML response.
233 233 """
234 234
235 235 def __init__(self, response):
236 236 self.response = response
237 237
238 238 def get_imports(self):
239 239 return fromstring, tostring, CSSSelector
240 240
241 241 def one_element_exists(self, css_selector):
242 242 self.get_element(css_selector)
243 243
244 244 def no_element_exists(self, css_selector):
245 245 assert not self._get_elements(css_selector)
246 246
247 247 def element_equals_to(self, css_selector, expected_content):
248 248 element = self.get_element(css_selector)
249 249 element_text = self._element_to_string(element)
250 250 assert expected_content in element_text
251 251
252 252 def element_contains(self, css_selector, expected_content):
253 253 element = self.get_element(css_selector)
254 254 assert expected_content in element.text_content()
255 255
256 256 def element_value_contains(self, css_selector, expected_content):
257 257 element = self.get_element(css_selector)
258 258 assert expected_content in element.value
259 259
260 260 def contains_one_link(self, link_text, href):
261 261 fromstring, tostring, CSSSelector = self.get_imports()
262 262 doc = fromstring(self.response.body)
263 263 sel = CSSSelector('a[href]')
264 264 elements = [
265 265 e for e in sel(doc) if e.text_content().strip() == link_text]
266 266 assert len(elements) == 1, "Did not find link or found multiple links"
267 267 self._ensure_url_equal(elements[0].attrib.get('href'), href)
268 268
269 269 def contains_one_anchor(self, anchor_id):
270 270 fromstring, tostring, CSSSelector = self.get_imports()
271 271 doc = fromstring(self.response.body)
272 272 sel = CSSSelector('#' + anchor_id)
273 273 elements = sel(doc)
274 274 assert len(elements) == 1, 'cannot find 1 element {}'.format(anchor_id)
275 275
276 276 def _ensure_url_equal(self, found, expected):
277 277 assert _Url(found) == _Url(expected)
278 278
279 279 def get_element(self, css_selector):
280 280 elements = self._get_elements(css_selector)
281 281 assert len(elements) == 1, 'cannot find 1 element {}'.format(css_selector)
282 282 return elements[0]
283 283
284 284 def get_elements(self, css_selector):
285 285 return self._get_elements(css_selector)
286 286
287 287 def _get_elements(self, css_selector):
288 288 fromstring, tostring, CSSSelector = self.get_imports()
289 289 doc = fromstring(self.response.body)
290 290 sel = CSSSelector(css_selector)
291 291 elements = sel(doc)
292 292 return elements
293 293
294 294 def _element_to_string(self, element):
295 295 fromstring, tostring, CSSSelector = self.get_imports()
296 296 return tostring(element)
297 297
298 298
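A small sketch of driving AssertResponse from a test through CustomTestResponse.assert_response(); the URL and CSS selectors are illustrative assumptions:

def test_page_renders_repo_title(app, backend):
    response = app.get('/{}'.format(backend.repo_name))  # hypothetical summary URL

    assert_response = response.assert_response()
    # element_contains() requires exactly one match for the selector.
    assert_response.element_contains('.title', backend.repo_name)
    assert_response.no_element_exists('.alert-error')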
299 299 class _Url(object):
300 300 """
 301 301 A url object that can be compared with other url objects
302 302 without regard to the vagaries of encoding, escaping, and ordering
303 303 of parameters in query strings.
304 304
305 305 Inspired by
306 306 http://stackoverflow.com/questions/5371992/comparing-two-urls-in-python
307 307 """
308 308
309 309 def __init__(self, url):
310 310 parts = urlparse(url)
311 311 _query = frozenset(parse_qsl(parts.query))
312 312 _path = unquote_plus(parts.path)
313 313 parts = parts._replace(query=_query, path=_path)
314 314 self.parts = parts
315 315
316 316 def __eq__(self, other):
317 317 return self.parts == other.parts
318 318
319 319 def __hash__(self):
320 320 return hash(self.parts)
321 321
322 322
323 323 def run_test_concurrently(times, raise_catched_exc=True):
324 324 """
325 325 Add this decorator to small pieces of code that you want to test
326 326 concurrently
327 327
328 328 ex:
329 329
 330 330 @run_test_concurrently(25)
331 331 def my_test_function():
332 332 ...
333 333 """
334 334 def test_concurrently_decorator(test_func):
335 335 def wrapper(*args, **kwargs):
336 336 exceptions = []
337 337
338 338 def call_test_func():
339 339 try:
340 340 test_func(*args, **kwargs)
341 341 except Exception as e:
342 342 exceptions.append(e)
343 343 if raise_catched_exc:
344 344 raise
345 345 threads = []
346 346 for i in range(times):
347 347 threads.append(threading.Thread(target=call_test_func))
348 348 for t in threads:
349 349 t.start()
350 350 for t in threads:
351 351 t.join()
352 352 if exceptions:
353 353 raise Exception(
354 354 'test_concurrently intercepted %s exceptions: %s' % (
355 355 len(exceptions), exceptions))
356 356 return wrapper
357 357 return test_concurrently_decorator
358 358
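A hedged usage sketch of the decorator above; the cached lookup is a hypothetical helper standing in for the code under test:

@run_test_concurrently(25)
def test_lookup_is_thread_safe():
    # The body runs in 25 threads at once; exceptions raised in any thread are
    # collected and re-raised by the decorator after all threads have joined.
    value = expensive_cached_lookup('some-key')  # hypothetical helper
    assert value is not None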
359 359
360 360 def wait_for_url(url, timeout=10):
361 361 """
362 362 Wait until URL becomes reachable.
363 363
 364 364 It polls the URL until the timeout is reached or the URL becomes reachable.
 365 365 It will call `pytest.fail` in case the URL is not reachable.
366 366 """
367 367 timeout = time.time() + timeout
368 368 last = 0
369 369 wait = 0.1
370 370
371 371 while timeout > last:
372 372 last = time.time()
373 373 if is_url_reachable(url):
374 374 break
375 375 elif (last + wait) > time.time():
376 376 # Go to sleep because not enough time has passed since last check.
377 377 time.sleep(wait)
378 378 else:
379 379 pytest.fail("Timeout while waiting for URL {}".format(url))
380 380
381 381
382 382 def is_url_reachable(url):
383 383 try:
384 384 urllib2.urlopen(url)
385 385 except urllib2.URLError:
386 386 return False
387 387 return True
388 388
389 389
390 390 def repo_on_filesystem(repo_name):
391 391 from rhodecode.lib import vcs
392 392 from rhodecode.tests import TESTS_TMP_PATH
393 393 repo = vcs.get_vcs_instance(
394 394 os.path.join(TESTS_TMP_PATH, repo_name), create=False)
395 395 return repo is not None
396 396
397 397
398 398 def commit_change(
399 399 repo, filename, content, message, vcs_type, parent=None, newfile=False):
400 400 from rhodecode.tests import TEST_USER_ADMIN_LOGIN
401 401
402 402 repo = Repository.get_by_repo_name(repo)
403 403 _commit = parent
404 404 if not parent:
405 405 _commit = EmptyCommit(alias=vcs_type)
406 406
407 407 if newfile:
408 408 nodes = {
409 409 filename: {
410 410 'content': content
411 411 }
412 412 }
413 413 commit = ScmModel().create_nodes(
414 414 user=TEST_USER_ADMIN_LOGIN, repo=repo,
415 415 message=message,
416 416 nodes=nodes,
417 417 parent_commit=_commit,
418 418 author=TEST_USER_ADMIN_LOGIN,
419 419 )
420 420 else:
421 421 commit = ScmModel().commit_change(
422 422 repo=repo.scm_instance(), repo_name=repo.repo_name,
423 423 commit=parent, user=TEST_USER_ADMIN_LOGIN,
424 424 author=TEST_USER_ADMIN_LOGIN,
425 425 message=message,
426 426 content=content,
427 427 f_path=filename
428 428 )
429 429 return commit
430 430
431 431
432 432 def permission_update_data_generator(csrf_token, default=None, grant=None, revoke=None):
433 433 if not default:
434 434 raise ValueError('Permission for default user must be given')
435 435 form_data = [(
436 436 'csrf_token', csrf_token
437 437 )]
438 438 # add default
439 439 form_data.extend([
440 440 ('u_perm_1', default)
441 441 ])
442 442
443 443 if grant:
444 444 for cnt, (obj_id, perm, obj_name, obj_type) in enumerate(grant, 1):
445 445 form_data.extend([
446 446 ('perm_new_member_perm_new{}'.format(cnt), perm),
447 447 ('perm_new_member_id_new{}'.format(cnt), obj_id),
448 448 ('perm_new_member_name_new{}'.format(cnt), obj_name),
449 449 ('perm_new_member_type_new{}'.format(cnt), obj_type),
450 450
451 451 ])
452 452 if revoke:
453 453 for obj_id, obj_type in revoke:
454 454 form_data.extend([
455 455 ('perm_del_member_id_{}'.format(obj_id), obj_id),
456 456 ('perm_del_member_type_{}'.format(obj_id), obj_type),
457 457 ])
458 458 return form_data
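To make the generated form layout concrete, a sketch of calling permission_update_data_generator; the ids, names, and permission strings are invented for illustration:

form_data = permission_update_data_generator(
    csrf_token='dead-beef-token',                     # normally app.csrf_token
    default='repository.read',                        # permission for the default user
    grant=[(4, 'repository.write', 'john', 'user')],  # (obj_id, perm, obj_name, obj_type)
    revoke=[(5, 'user')],                             # (obj_id, obj_type)
)
# Result: a list of (field_name, value) tuples ready to POST, starting with
# ('csrf_token', ...), ('u_perm_1', 'repository.read'),
# ('perm_new_member_perm_new1', 'repository.write'), ('perm_new_member_id_new1', 4), ...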