##// END OF EJS Templates
tests: Adapt tests to recent changes.
Martin Bornhold -
r592:36736ad1 default
parent child Browse files
Show More
1 NO CONTENT: new file 100644
@@ -1,115 +1,53 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2010-2016 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21
22 22 import mock
23 23 import pytest
24 24
25 25 from rhodecode.config import environment
26 26
27 27
28 class TestUseDirectHookCalls(object):
29 @pytest.mark.parametrize('config', [
30 {
31 'vcs.hooks.direct_calls': 'true',
32 'base_path': 'fake_base_path'
33 }
34 ])
35 def test_returns_true_when_conditions_are_met(self, config):
36 result = environment._use_direct_hook_calls(config)
37 assert result is True
38
39 @pytest.mark.parametrize('config', [
40 {
41 'vcs.hooks.direct_calls': 'false',
42 'base_path': 'fake_base_path'
43 },
44 {
45 'base_path': 'fake_base_path'
46 }
47 ])
48 def test_returns_false_when_conditions_are_not_met(self, config):
49 result = environment._use_direct_hook_calls(config)
50 assert result is False
51
52
53 class TestGetVcsHooksProtocol(object):
54 def test_returns_pyro4_by_default(self):
55 config = {}
56 result = environment._get_vcs_hooks_protocol(config)
57 assert result == 'pyro4'
58
59 @pytest.mark.parametrize('protocol', ['PYRO4', 'HTTP', 'Pyro4', 'Http'])
60 def test_returns_lower_case_value(self, protocol):
61 config = {
62 'vcs.hooks.protocol': protocol
63 }
64 result = environment._get_vcs_hooks_protocol(config)
65 assert result == protocol.lower()
66
67
68 class TestLoadEnvironment(object):
69 def test_calls_use_direct_hook_calls(self, _external_calls_patcher):
70 global_conf = {
71 'here': '',
72 'vcs.connection_timeout': '0',
73 'vcs.server.enable': 'false'
74 }
75 app_conf = {
76 'cache_dir': '/tmp/',
77 '__file__': '/tmp/abcde.ini'
78 }
79 direct_calls_patcher = mock.patch.object(
80 environment, '_use_direct_hook_calls', return_value=True)
81 protocol_patcher = mock.patch.object(
82 environment, '_get_vcs_hooks_protocol', return_value='http')
83 with direct_calls_patcher as direct_calls_mock, \
84 protocol_patcher as protocol_mock:
85 environment.load_environment(global_conf, app_conf)
86 direct_calls_mock.call_count == 1
87 protocol_mock.call_count == 1
88
89
90 28 @pytest.fixture
91 29 def _external_calls_patcher(request):
92 30 # TODO: mikhail: This is a temporary solution. Ideally load_environment
93 31 # should be split into multiple small testable functions.
94 32 utils_patcher = mock.patch.object(environment, 'utils')
95 33
96 34 rhodecode_patcher = mock.patch.object(environment, 'rhodecode')
97 35
98 36 db_config = mock.Mock()
99 37 db_config.items.return_value = {
100 38 'paths': [['/tmp/abc', '/tmp/def']]
101 39 }
102 40 db_config_patcher = mock.patch.object(
103 41 environment, 'make_db_config', return_value=db_config)
104 42
105 43 set_config_patcher = mock.patch.object(environment, 'set_rhodecode_config')
106 44
107 45 utils_patcher.start()
108 46 rhodecode_patcher.start()
109 47 db_config_patcher.start()
110 48 set_config_patcher.start()
111 49
112 50 request.addfinalizer(utils_patcher.stop)
113 51 request.addfinalizer(rhodecode_patcher.stop)
114 52 request.addfinalizer(db_config_patcher.stop)
115 53 request.addfinalizer(set_config_patcher.stop)
@@ -1,107 +1,110 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2010-2016 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 import pytest
22 22 import urlparse
23 23
24 24 from rhodecode.tests.lib.middleware import mock_scm_app
25 25 import rhodecode.lib.middleware.simplegit as simplegit
26 26
27 27
28 28 def get_environ(url):
29 29 """Construct a minimum WSGI environ based on the URL."""
30 30 parsed_url = urlparse.urlparse(url)
31 31 environ = {
32 32 'PATH_INFO': parsed_url.path,
33 33 'QUERY_STRING': parsed_url.query,
34 34 }
35 35
36 36 return environ
37 37
38 38
39 39 @pytest.mark.parametrize(
40 40 'url, expected_action',
41 41 [
42 42 ('/foo/bar/info/refs?service=git-upload-pack', 'pull'),
43 43 ('/foo/bar/info/refs?service=git-receive-pack', 'push'),
44 44 ('/foo/bar/git-upload-pack', 'pull'),
45 45 ('/foo/bar/git-receive-pack', 'push'),
46 46 # Edge case: missing data for info/refs
47 47 ('/foo/info/refs?service=', 'pull'),
48 48 ('/foo/info/refs', 'pull'),
49 49 # Edge case: git command comes with service argument
50 50 ('/foo/git-upload-pack?service=git-receive-pack', 'pull'),
51 51 ('/foo/git-receive-pack?service=git-upload-pack', 'push'),
52 52 # Edge case: repo name conflicts with git commands
53 53 ('/git-receive-pack/git-upload-pack', 'pull'),
54 54 ('/git-receive-pack/git-receive-pack', 'push'),
55 55 ('/git-upload-pack/git-upload-pack', 'pull'),
56 56 ('/git-upload-pack/git-receive-pack', 'push'),
57 57 ('/foo/git-receive-pack', 'push'),
58 58 # Edge case: not a smart protocol url
59 59 ('/foo/bar', 'pull'),
60 60 ])
61 def test_get_action(url, expected_action):
61 def test_get_action(url, expected_action, pylonsapp):
62 62 app = simplegit.SimpleGit(application=None,
63 config={'auth_ret_code': '', 'base_path': ''})
63 config={'auth_ret_code': '', 'base_path': ''},
64 registry=None)
64 65 assert expected_action == app._get_action(get_environ(url))
65 66
66 67
67 68 @pytest.mark.parametrize(
68 69 'url, expected_repo_name',
69 70 [
70 71 ('/foo/info/refs?service=git-upload-pack', 'foo'),
71 72 ('/foo/bar/info/refs?service=git-receive-pack', 'foo/bar'),
72 73 ('/foo/git-upload-pack', 'foo'),
73 74 ('/foo/git-receive-pack', 'foo'),
74 75 ('/foo/bar/git-upload-pack', 'foo/bar'),
75 76 ('/foo/bar/git-receive-pack', 'foo/bar'),
76 77 ])
77 def test_get_repository_name(url, expected_repo_name):
78 def test_get_repository_name(url, expected_repo_name, pylonsapp):
78 79 app = simplegit.SimpleGit(application=None,
79 config={'auth_ret_code': '', 'base_path': ''})
80 config={'auth_ret_code': '', 'base_path': ''},
81 registry=None)
80 82 assert expected_repo_name == app._get_repository_name(get_environ(url))
81 83
82 84
83 def test_get_config():
85 def test_get_config(pylonsapp):
84 86 app = simplegit.SimpleGit(application=None,
85 config={'auth_ret_code': '', 'base_path': ''})
87 config={'auth_ret_code': '', 'base_path': ''},
88 registry=None)
86 89 extras = {'foo': 'FOO', 'bar': 'BAR'}
87 90
88 91 # We copy the extras as the method below will change the contents.
89 92 config = app._create_config(dict(extras), repo_name='test-repo')
90 93 expected_config = dict(extras)
91 94 expected_config.update({
92 95 'git_update_server_info': False,
93 96 })
94 97
95 98 assert config == expected_config
96 99
97 100
98 def test_create_wsgi_app_uses_scm_app_from_simplevcs():
101 def test_create_wsgi_app_uses_scm_app_from_simplevcs(pylonsapp):
99 102 config = {
100 103 'auth_ret_code': '',
101 104 'base_path': '',
102 105 'vcs.scm_app_implementation':
103 106 'rhodecode.tests.lib.middleware.mock_scm_app',
104 107 }
105 app = simplegit.SimpleGit(application=None, config=config)
108 app = simplegit.SimpleGit(application=None, config=config, registry=None)
106 109 wsgi_app = app._create_wsgi_app('/tmp/test', 'test_repo', {})
107 110 assert wsgi_app is mock_scm_app.mock_git_wsgi
@@ -1,113 +1,116 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2010-2016 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 import urlparse
22 22
23 23 import mock
24 24 import pytest
25 25 import simplejson as json
26 26
27 27 from rhodecode.lib.vcs.backends.base import Config
28 28 from rhodecode.tests.lib.middleware import mock_scm_app
29 29 import rhodecode.lib.middleware.simplehg as simplehg
30 30
31 31
32 32 def get_environ(url):
33 33 """Construct a minimum WSGI environ based on the URL."""
34 34 parsed_url = urlparse.urlparse(url)
35 35 environ = {
36 36 'PATH_INFO': parsed_url.path,
37 37 'QUERY_STRING': parsed_url.query,
38 38 }
39 39
40 40 return environ
41 41
42 42
43 43 @pytest.mark.parametrize(
44 44 'url, expected_action',
45 45 [
46 46 ('/foo/bar?cmd=unbundle&key=tip', 'push'),
47 47 ('/foo/bar?cmd=pushkey&key=tip', 'push'),
48 48 ('/foo/bar?cmd=listkeys&key=tip', 'pull'),
49 49 ('/foo/bar?cmd=changegroup&key=tip', 'pull'),
50 50 # Edge case: unknown argument: assume pull
51 51 ('/foo/bar?cmd=unknown&key=tip', 'pull'),
52 52 ('/foo/bar?cmd=&key=tip', 'pull'),
53 53 # Edge case: not cmd argument
54 54 ('/foo/bar?key=tip', 'pull'),
55 55 ])
56 56 def test_get_action(url, expected_action):
57 57 app = simplehg.SimpleHg(application=None,
58 config={'auth_ret_code': '', 'base_path': ''})
58 config={'auth_ret_code': '', 'base_path': ''},
59 registry=None)
59 60 assert expected_action == app._get_action(get_environ(url))
60 61
61 62
62 63 @pytest.mark.parametrize(
63 64 'url, expected_repo_name',
64 65 [
65 66 ('/foo?cmd=unbundle&key=tip', 'foo'),
66 67 ('/foo/bar?cmd=pushkey&key=tip', 'foo/bar'),
67 68 ('/foo/bar/baz?cmd=listkeys&key=tip', 'foo/bar/baz'),
68 69 # Repos with trailing slashes.
69 70 ('/foo/?cmd=unbundle&key=tip', 'foo'),
70 71 ('/foo/bar/?cmd=pushkey&key=tip', 'foo/bar'),
71 72 ('/foo/bar/baz/?cmd=listkeys&key=tip', 'foo/bar/baz'),
72 73 ])
73 74 def test_get_repository_name(url, expected_repo_name):
74 75 app = simplehg.SimpleHg(application=None,
75 config={'auth_ret_code': '', 'base_path': ''})
76 config={'auth_ret_code': '', 'base_path': ''},
77 registry=None)
76 78 assert expected_repo_name == app._get_repository_name(get_environ(url))
77 79
78 80
79 81 def test_get_config():
80 82 app = simplehg.SimpleHg(application=None,
81 config={'auth_ret_code': '', 'base_path': ''})
83 config={'auth_ret_code': '', 'base_path': ''},
84 registry=None)
82 85 extras = {'foo': 'FOO', 'bar': 'BAR'}
83 86
84 87 mock_config = Config()
85 88 mock_config.set('a1', 'b1', 'c1')
86 89 mock_config.set('a2', 'b2', 'c2')
87 90 # We mock the call to make_db_config, otherwise we need to wait for the
88 91 # pylonsaspp
89 92 with mock.patch('rhodecode.lib.utils.make_db_config',
90 93 return_value=mock_config) as make_db_config_mock:
91 94 hg_config = app._create_config(extras, repo_name='test-repo')
92 95
93 96 make_db_config_mock.assert_called_once_with(repo='test-repo')
94 97 assert isinstance(hg_config, list)
95 98
96 99 # Remove the entries from the mock_config so to get only the extras
97 100 hg_config.remove(('a1', 'b1', 'c1'))
98 101 hg_config.remove(('a2', 'b2', 'c2'))
99 102
100 103 assert hg_config[0][:2] == ('rhodecode', 'RC_SCM_DATA')
101 104 assert json.loads(hg_config[0][-1]) == extras
102 105
103 106
104 107 def test_create_wsgi_app_uses_scm_app_from_simplevcs():
105 108 config = {
106 109 'auth_ret_code': '',
107 110 'base_path': '',
108 111 'vcs.scm_app_implementation':
109 112 'rhodecode.tests.lib.middleware.mock_scm_app',
110 113 }
111 app = simplehg.SimpleHg(application=None, config=config)
114 app = simplehg.SimpleHg(application=None, config=config, registry=None)
112 115 wsgi_app = app._create_wsgi_app('/tmp/test', 'test_repo', {})
113 116 assert wsgi_app is mock_scm_app.mock_hg_wsgi
@@ -1,185 +1,186 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2010-2016 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 from StringIO import StringIO
22 22
23 23 import pytest
24 24 from mock import patch, Mock
25 25
26 26 import rhodecode
27 27 from rhodecode.lib.middleware.simplesvn import SimpleSvn, SimpleSvnApp
28 28
29 29
30 30 class TestSimpleSvn(object):
31 31 @pytest.fixture(autouse=True)
32 32 def simple_svn(self, pylonsapp):
33 33 self.app = SimpleSvn(
34 34 application='None',
35 35 config={'auth_ret_code': '',
36 'base_path': rhodecode.CONFIG['base_path']})
36 'base_path': rhodecode.CONFIG['base_path']},
37 registry=None)
37 38
38 39 def test_get_config(self):
39 40 extras = {'foo': 'FOO', 'bar': 'BAR'}
40 41 config = self.app._create_config(extras, repo_name='test-repo')
41 42 assert config == extras
42 43
43 44 @pytest.mark.parametrize(
44 45 'method', ['OPTIONS', 'PROPFIND', 'GET', 'REPORT'])
45 46 def test_get_action_returns_pull(self, method):
46 47 environment = {'REQUEST_METHOD': method}
47 48 action = self.app._get_action(environment)
48 49 assert action == 'pull'
49 50
50 51 @pytest.mark.parametrize(
51 52 'method', [
52 53 'MKACTIVITY', 'PROPPATCH', 'PUT', 'CHECKOUT', 'MKCOL', 'MOVE',
53 54 'COPY', 'DELETE', 'LOCK', 'UNLOCK', 'MERGE'
54 55 ])
55 56 def test_get_action_returns_push(self, method):
56 57 environment = {'REQUEST_METHOD': method}
57 58 action = self.app._get_action(environment)
58 59 assert action == 'push'
59 60
60 61 @pytest.mark.parametrize(
61 62 'path, expected_name', [
62 63 ('/hello-svn', 'hello-svn'),
63 64 ('/hello-svn/', 'hello-svn'),
64 65 ('/group/hello-svn/', 'group/hello-svn'),
65 66 ('/group/hello-svn/!svn/vcc/default', 'group/hello-svn'),
66 67 ])
67 68 def test_get_repository_name(self, path, expected_name):
68 69 environment = {'PATH_INFO': path}
69 70 name = self.app._get_repository_name(environment)
70 71 assert name == expected_name
71 72
72 73 def test_get_repository_name_subfolder(self, backend_svn):
73 74 repo = backend_svn.repo
74 75 environment = {
75 76 'PATH_INFO': '/{}/path/with/subfolders'.format(repo.repo_name)}
76 77 name = self.app._get_repository_name(environment)
77 78 assert name == repo.repo_name
78 79
79 80 def test_create_wsgi_app(self):
80 81 with patch('rhodecode.lib.middleware.simplesvn.SimpleSvnApp') as (
81 82 wsgi_app_mock):
82 83 config = Mock()
83 84 wsgi_app = self.app._create_wsgi_app(
84 85 repo_path='', repo_name='', config=config)
85 86
86 87 wsgi_app_mock.assert_called_once_with(config)
87 88 assert wsgi_app == wsgi_app_mock()
88 89
89 90
90 91 class TestSimpleSvnApp(object):
91 92 data = '<xml></xml>'
92 93 path = '/group/my-repo'
93 94 wsgi_input = StringIO(data)
94 95 environment = {
95 96 'HTTP_DAV': (
96 97 'http://subversion.tigris.org/xmlns/dav/svn/depth,'
97 98 ' http://subversion.tigris.org/xmlns/dav/svn/mergeinfo'),
98 99 'HTTP_USER_AGENT': 'SVN/1.8.11 (x86_64-linux) serf/1.3.8',
99 100 'REQUEST_METHOD': 'OPTIONS',
100 101 'PATH_INFO': path,
101 102 'wsgi.input': wsgi_input,
102 103 'CONTENT_TYPE': 'text/xml',
103 104 'CONTENT_LENGTH': '130'
104 105 }
105 106
106 107 def setup_method(self, method):
107 108 self.host = 'http://localhost/'
108 109 self.app = SimpleSvnApp(
109 110 config={'subversion_http_server_url': self.host})
110 111
111 112 def test_get_request_headers_with_content_type(self):
112 113 expected_headers = {
113 114 'Dav': self.environment['HTTP_DAV'],
114 115 'User-Agent': self.environment['HTTP_USER_AGENT'],
115 116 'Content-Type': self.environment['CONTENT_TYPE'],
116 117 'Content-Length': self.environment['CONTENT_LENGTH']
117 118 }
118 119 headers = self.app._get_request_headers(self.environment)
119 120 assert headers == expected_headers
120 121
121 122 def test_get_request_headers_without_content_type(self):
122 123 environment = self.environment.copy()
123 124 environment.pop('CONTENT_TYPE')
124 125 expected_headers = {
125 126 'Dav': environment['HTTP_DAV'],
126 127 'Content-Length': self.environment['CONTENT_LENGTH'],
127 128 'User-Agent': environment['HTTP_USER_AGENT'],
128 129 }
129 130 request_headers = self.app._get_request_headers(environment)
130 131 assert request_headers == expected_headers
131 132
132 133 def test_get_response_headers(self):
133 134 headers = {
134 135 'Connection': 'keep-alive',
135 136 'Keep-Alive': 'timeout=5, max=100',
136 137 'Transfer-Encoding': 'chunked',
137 138 'Content-Encoding': 'gzip',
138 139 'MS-Author-Via': 'DAV',
139 140 'SVN-Supported-Posts': 'create-txn-with-props'
140 141 }
141 142 expected_headers = [
142 143 ('MS-Author-Via', 'DAV'),
143 144 ('SVN-Supported-Posts', 'create-txn-with-props')
144 145 ]
145 146 response_headers = self.app._get_response_headers(headers)
146 147 assert sorted(response_headers) == sorted(expected_headers)
147 148
148 149 def test_get_url(self):
149 150 url = self.app._get_url(self.path)
150 151 expected_url = '{}{}'.format(self.host.strip('/'), self.path)
151 152 assert url == expected_url
152 153
153 154 def test_call(self):
154 155 start_response = Mock()
155 156 response_mock = Mock()
156 157 response_mock.headers = {
157 158 'Content-Encoding': 'gzip',
158 159 'MS-Author-Via': 'DAV',
159 160 'SVN-Supported-Posts': 'create-txn-with-props'
160 161 }
161 162 response_mock.status_code = 200
162 163 response_mock.reason = 'OK'
163 164 with patch('rhodecode.lib.middleware.simplesvn.requests.request') as (
164 165 request_mock):
165 166 request_mock.return_value = response_mock
166 167 self.app(self.environment, start_response)
167 168
168 169 expected_url = '{}{}'.format(self.host.strip('/'), self.path)
169 170 expected_request_headers = {
170 171 'Dav': self.environment['HTTP_DAV'],
171 172 'User-Agent': self.environment['HTTP_USER_AGENT'],
172 173 'Content-Type': self.environment['CONTENT_TYPE'],
173 174 'Content-Length': self.environment['CONTENT_LENGTH']
174 175 }
175 176 expected_response_headers = [
176 177 ('SVN-Supported-Posts', 'create-txn-with-props'),
177 178 ('MS-Author-Via', 'DAV')
178 179 ]
179 180 request_mock.assert_called_once_with(
180 181 self.environment['REQUEST_METHOD'], expected_url,
181 182 data=self.data, headers=expected_request_headers)
182 183 response_mock.iter_content.assert_called_once_with(chunk_size=1024)
183 184 args, _ = start_response.call_args
184 185 assert args[0] == '200 OK'
185 186 assert sorted(args[1]) == sorted(expected_response_headers)
@@ -1,304 +1,305 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2010-2016 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 import base64
22 22
23 23 import mock
24 24 import pytest
25 25 import webtest.app
26 26
27 27 from rhodecode.lib.caching_query import FromCache
28 from rhodecode.lib.hooks_daemon import (
29 Pyro4HooksCallbackDaemon, DummyHooksCallbackDaemon,
30 HttpHooksCallbackDaemon)
28 from rhodecode.lib.hooks_daemon import DummyHooksCallbackDaemon
31 29 from rhodecode.lib.middleware import simplevcs
32 30 from rhodecode.lib.middleware.https_fixup import HttpsFixup
33 31 from rhodecode.lib.middleware.utils import scm_app
34 32 from rhodecode.model.db import User, _hash_key
35 33 from rhodecode.model.meta import Session
36 34 from rhodecode.tests import (
37 35 HG_REPO, TEST_USER_ADMIN_LOGIN, TEST_USER_ADMIN_PASS)
38 36 from rhodecode.tests.lib.middleware import mock_scm_app
39 37 from rhodecode.tests.utils import set_anonymous_access
40 38
41 39
42 40 class StubVCSController(simplevcs.SimpleVCS):
43 41
44 42 SCM = 'hg'
45 43 stub_response_body = tuple()
46 44
47 45 def _get_repository_name(self, environ):
48 46 return HG_REPO
49 47
50 48 def _get_action(self, environ):
51 49 return "pull"
52 50
53 51 def _create_wsgi_app(self, repo_path, repo_name, config):
54 52 def fake_app(environ, start_response):
55 53 start_response('200 OK', [])
56 54 return self.stub_response_body
57 55 return fake_app
58 56
59 57 def _create_config(self, extras, repo_name):
60 58 return None
61 59
62 60
63 61 @pytest.fixture
64 62 def vcscontroller(pylonsapp, config_stub):
65 63 config_stub.testing_securitypolicy()
66 64 config_stub.include('rhodecode.authentication')
67 65
68 66 set_anonymous_access(True)
69 controller = StubVCSController(pylonsapp, pylonsapp.config)
67 controller = StubVCSController(pylonsapp, pylonsapp.config, None)
70 68 app = HttpsFixup(controller, pylonsapp.config)
71 69 app = webtest.app.TestApp(app)
72 70
73 71 _remove_default_user_from_query_cache()
74 72
75 73 # Sanity checks that things are set up correctly
76 74 app.get('/' + HG_REPO, status=200)
77 75
78 76 app.controller = controller
79 77 return app
80 78
81 79
82 80 def _remove_default_user_from_query_cache():
83 81 user = User.get_default_user(cache=True)
84 82 query = Session().query(User).filter(User.username == user.username)
85 83 query = query.options(FromCache(
86 84 "sql_cache_short", "get_user_%s" % _hash_key(user.username)))
87 85 query.invalidate()
88 86 Session().expire(user)
89 87
90 88
91 89 @pytest.fixture
92 90 def disable_anonymous_user(request, pylonsapp):
93 91 set_anonymous_access(False)
94 92
95 93 @request.addfinalizer
96 94 def cleanup():
97 95 set_anonymous_access(True)
98 96
99 97
100 98 def test_handles_exceptions_during_permissions_checks(
101 99 vcscontroller, disable_anonymous_user):
102 100 user_and_pass = '%s:%s' % (TEST_USER_ADMIN_LOGIN, TEST_USER_ADMIN_PASS)
103 101 auth_password = base64.encodestring(user_and_pass).strip()
104 102 extra_environ = {
105 103 'AUTH_TYPE': 'Basic',
106 104 'HTTP_AUTHORIZATION': 'Basic %s' % auth_password,
107 105 'REMOTE_USER': TEST_USER_ADMIN_LOGIN,
108 106 }
109 107
110 108 # Verify that things are hooked up correctly
111 109 vcscontroller.get('/', status=200, extra_environ=extra_environ)
112 110
113 111 # Simulate trouble during permission checks
114 112 with mock.patch('rhodecode.model.db.User.get_by_username',
115 113 side_effect=Exception) as get_user:
116 114 # Verify that a correct 500 is returned and check that the expected
117 115 # code path was hit.
118 116 vcscontroller.get('/', status=500, extra_environ=extra_environ)
119 117 assert get_user.called
120 118
121 119
122 120 def test_returns_forbidden_if_no_anonymous_access(
123 121 vcscontroller, disable_anonymous_user):
124 122 vcscontroller.get('/', status=401)
125 123
126 124
127 125 class StubFailVCSController(simplevcs.SimpleVCS):
128 126 def _handle_request(self, environ, start_response):
129 127 raise Exception("BOOM")
130 128
131 129
132 130 @pytest.fixture(scope='module')
133 131 def fail_controller(pylonsapp):
134 controller = StubFailVCSController(pylonsapp, pylonsapp.config)
132 controller = StubFailVCSController(pylonsapp, pylonsapp.config, None)
135 133 controller = HttpsFixup(controller, pylonsapp.config)
136 134 controller = webtest.app.TestApp(controller)
137 135 return controller
138 136
139 137
140 138 def test_handles_exceptions_as_internal_server_error(fail_controller):
141 139 fail_controller.get('/', status=500)
142 140
143 141
144 142 def test_provides_traceback_for_appenlight(fail_controller):
145 143 response = fail_controller.get(
146 144 '/', status=500, extra_environ={'appenlight.client': 'fake'})
147 145 assert 'appenlight.__traceback' in response.request.environ
148 146
149 147
150 148 def test_provides_utils_scm_app_as_scm_app_by_default(pylonsapp):
151 controller = StubVCSController(pylonsapp, pylonsapp.config)
149 controller = StubVCSController(pylonsapp, pylonsapp.config, None)
152 150 assert controller.scm_app is scm_app
153 151
154 152
155 153 def test_allows_to_override_scm_app_via_config(pylonsapp):
156 154 config = pylonsapp.config.copy()
157 155 config['vcs.scm_app_implementation'] = (
158 156 'rhodecode.tests.lib.middleware.mock_scm_app')
159 controller = StubVCSController(pylonsapp, config)
157 controller = StubVCSController(pylonsapp, config, None)
160 158 assert controller.scm_app is mock_scm_app
161 159
162 160
163 161 @pytest.mark.parametrize('query_string, expected', [
164 162 ('cmd=stub_command', True),
165 163 ('cmd=listkeys', False),
166 164 ])
167 165 def test_should_check_locking(query_string, expected):
168 166 result = simplevcs._should_check_locking(query_string)
169 167 assert result == expected
170 168
171 169
172 170 @mock.patch.multiple(
173 171 'Pyro4.config', SERVERTYPE='multiplex', POLLTIMEOUT=0.01)
174 172 class TestGenerateVcsResponse:
175 173
176 174 def test_ensures_that_start_response_is_called_early_enough(self):
177 175 self.call_controller_with_response_body(iter(['a', 'b']))
178 176 assert self.start_response.called
179 177
180 178 def test_invalidates_cache_after_body_is_consumed(self):
181 179 result = self.call_controller_with_response_body(iter(['a', 'b']))
182 180 assert not self.was_cache_invalidated()
183 181 # Consume the result
184 182 list(result)
185 183 assert self.was_cache_invalidated()
186 184
187 185 @mock.patch('rhodecode.lib.middleware.simplevcs.HTTPLockedRC')
188 186 def test_handles_locking_exception(self, http_locked_rc):
189 187 result = self.call_controller_with_response_body(
190 188 self.raise_result_iter(vcs_kind='repo_locked'))
191 189 assert not http_locked_rc.called
192 190 # Consume the result
193 191 list(result)
194 192 assert http_locked_rc.called
195 193
196 194 @mock.patch('rhodecode.lib.middleware.simplevcs.HTTPRequirementError')
197 195 def test_handles_requirement_exception(self, http_requirement):
198 196 result = self.call_controller_with_response_body(
199 197 self.raise_result_iter(vcs_kind='requirement'))
200 198 assert not http_requirement.called
201 199 # Consume the result
202 200 list(result)
203 201 assert http_requirement.called
204 202
205 203 @mock.patch('rhodecode.lib.middleware.simplevcs.HTTPLockedRC')
206 204 def test_handles_locking_exception_in_app_call(self, http_locked_rc):
207 205 app_factory_patcher = mock.patch.object(
208 206 StubVCSController, '_create_wsgi_app')
209 207 with app_factory_patcher as app_factory:
210 208 app_factory().side_effect = self.vcs_exception()
211 209 result = self.call_controller_with_response_body(['a'])
212 210 list(result)
213 211 assert http_locked_rc.called
214 212
215 213 def test_raises_unknown_exceptions(self):
216 214 result = self.call_controller_with_response_body(
217 215 self.raise_result_iter(vcs_kind='unknown'))
218 216 with pytest.raises(Exception):
219 217 list(result)
220 218
221 219 def test_prepare_callback_daemon_is_called(self):
222 220 def side_effect(extras):
223 221 return DummyHooksCallbackDaemon(), extras
224 222
225 223 prepare_patcher = mock.patch.object(
226 224 StubVCSController, '_prepare_callback_daemon')
227 225 with prepare_patcher as prepare_mock:
228 226 prepare_mock.side_effect = side_effect
229 227 self.call_controller_with_response_body(iter(['a', 'b']))
230 228 assert prepare_mock.called
231 229 assert prepare_mock.call_count == 1
232 230
233 231 def call_controller_with_response_body(self, response_body):
234 controller = StubVCSController(None, {'base_path': 'fake_base_path'})
232 settings = {
233 'base_path': 'fake_base_path',
234 'vcs.hooks.protocol': 'http',
235 'vcs.hooks.direct_calls': False,
236 }
237 controller = StubVCSController(None, settings, None)
235 238 controller._invalidate_cache = mock.Mock()
236 239 controller.stub_response_body = response_body
237 240 self.start_response = mock.Mock()
238 241 result = controller._generate_vcs_response(
239 242 environ={}, start_response=self.start_response,
240 243 repo_path='fake_repo_path',
241 244 repo_name='fake_repo_name',
242 245 extras={}, action='push')
243 246 self.controller = controller
244 247 return result
245 248
246 249 def raise_result_iter(self, vcs_kind='repo_locked'):
247 250 """
248 251 Simulates an exception due to a vcs raised exception if kind vcs_kind
249 252 """
250 253 raise self.vcs_exception(vcs_kind=vcs_kind)
251 254 yield "never_reached"
252 255
253 256 def vcs_exception(self, vcs_kind='repo_locked'):
254 257 locked_exception = Exception('TEST_MESSAGE')
255 258 locked_exception._vcs_kind = vcs_kind
256 259 return locked_exception
257 260
258 261 def was_cache_invalidated(self):
259 262 return self.controller._invalidate_cache.called
260 263
261 264
262 265 class TestInitializeGenerator:
263 266
264 267 def test_drains_first_element(self):
265 268 gen = self.factory(['__init__', 1, 2])
266 269 result = list(gen)
267 270 assert result == [1, 2]
268 271
269 272 @pytest.mark.parametrize('values', [
270 273 [],
271 274 [1, 2],
272 275 ])
273 276 def test_raises_value_error(self, values):
274 277 with pytest.raises(ValueError):
275 278 self.factory(values)
276 279
277 280 @simplevcs.initialize_generator
278 281 def factory(self, iterable):
279 282 for elem in iterable:
280 283 yield elem
281 284
282 285
283 286 class TestPrepareHooksDaemon(object):
284 def test_calls_imported_prepare_callback_daemon(self):
285 config = {
286 'base_path': 'fake_base_path',
287 'vcs.hooks.direct_calls': False,
288 'vcs.hooks.protocol': 'http'
289 }
287 def test_calls_imported_prepare_callback_daemon(self, pylonsapp):
288 settings = pylonsapp.application.config
290 289 expected_extras = {'extra1': 'value1'}
291 290 daemon = DummyHooksCallbackDaemon()
292 291
293 controller = StubVCSController(None, config)
292 controller = StubVCSController(None, settings, None)
294 293 prepare_patcher = mock.patch.object(
295 294 simplevcs, 'prepare_callback_daemon',
296 295 return_value=(daemon, expected_extras))
297 296 with prepare_patcher as prepare_mock:
298 297 callback_daemon, extras = controller._prepare_callback_daemon(
299 298 expected_extras.copy())
300 299 prepare_mock.assert_called_once_with(
301 expected_extras, protocol='http', use_direct_calls=False)
300 expected_extras,
301 protocol=settings['vcs.hooks.protocol'],
302 use_direct_calls=settings['vcs.hooks.direct_calls'])
302 303
303 304 assert callback_daemon == daemon
304 305 assert extras == extras
@@ -1,134 +1,136 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2010-2016 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 from mock import patch, Mock
22 22
23 23 import rhodecode
24 24 from rhodecode.lib.middleware import vcs
25 25
26 26
27 27 def test_is_hg():
28 28 environ = {
29 29 'PATH_INFO': '/rhodecode-dev',
30 30 'QUERY_STRING': 'cmd=changegroup',
31 31 'HTTP_ACCEPT': 'application/mercurial'
32 32 }
33 33 assert vcs.is_hg(environ)
34 34
35 35
36 36 def test_is_hg_no_cmd():
37 37 environ = {
38 38 'PATH_INFO': '/rhodecode-dev',
39 39 'QUERY_STRING': '',
40 40 'HTTP_ACCEPT': 'application/mercurial'
41 41 }
42 42 assert not vcs.is_hg(environ)
43 43
44 44
45 45 def test_is_hg_empty_cmd():
46 46 environ = {
47 47 'PATH_INFO': '/rhodecode-dev',
48 48 'QUERY_STRING': 'cmd=',
49 49 'HTTP_ACCEPT': 'application/mercurial'
50 50 }
51 51 assert not vcs.is_hg(environ)
52 52
53 53
54 54 def test_is_svn_returns_true_if_subversion_is_in_a_dav_header():
55 55 environ = {
56 56 'PATH_INFO': '/rhodecode-dev',
57 57 'HTTP_DAV': 'http://subversion.tigris.org/xmlns/dav/svn/log-revprops'
58 58 }
59 59 assert vcs.is_svn(environ) is True
60 60
61 61
62 62 def test_is_svn_returns_false_if_subversion_is_not_in_a_dav_header():
63 63 environ = {
64 64 'PATH_INFO': '/rhodecode-dev',
65 65 'HTTP_DAV': 'http://stuff.tigris.org/xmlns/dav/svn/log-revprops'
66 66 }
67 67 assert vcs.is_svn(environ) is False
68 68
69 69
70 70 def test_is_svn_returns_false_if_no_dav_header():
71 71 environ = {
72 72 'PATH_INFO': '/rhodecode-dev',
73 73 }
74 74 assert vcs.is_svn(environ) is False
75 75
76 76
77 77 def test_is_svn_returns_true_if_magic_path_segment():
78 78 environ = {
79 79 'PATH_INFO': '/stub-repository/!svn/rev/4',
80 80 }
81 81 assert vcs.is_svn(environ)
82 82
83 83
84 84 def test_is_svn_allows_to_configure_the_magic_path(monkeypatch):
85 85 """
86 86 This is intended as a fallback in case someone has configured his
87 87 Subversion server with a different magic path segment.
88 88 """
89 89 monkeypatch.setitem(
90 90 rhodecode.CONFIG, 'rhodecode_subversion_magic_path', '/!my-magic')
91 91 environ = {
92 92 'PATH_INFO': '/stub-repository/!my-magic/rev/4',
93 93 }
94 94 assert vcs.is_svn(environ)
95 95
96 96
97 97 class TestVCSMiddleware(object):
98 98 def test_get_handler_app_retuns_svn_app_when_proxy_enabled(self):
99 99 environ = {
100 100 'PATH_INFO': 'rhodecode-dev',
101 101 'HTTP_DAV': 'http://subversion.tigris.org/xmlns/dav/svn/log'
102 102 }
103 103 app = Mock()
104 104 config = Mock()
105 registry = Mock()
105 106 middleware = vcs.VCSMiddleware(
106 app, config=config, appenlight_client=None)
107 app, config=config, appenlight_client=None, registry=registry)
107 108 snv_patch = patch('rhodecode.lib.middleware.vcs.SimpleSvn')
108 109 settings_patch = patch.dict(
109 110 rhodecode.CONFIG,
110 111 {'rhodecode_proxy_subversion_http_requests': True})
111 112 with snv_patch as svn_mock, settings_patch:
112 113 svn_mock.return_value = None
113 114 middleware._get_handler_app(environ)
114 115
115 svn_mock.assert_called_once_with(app, config)
116 svn_mock.assert_called_once_with(app, config, registry)
116 117
117 118 def test_get_handler_app_retuns_no_svn_app_when_proxy_disabled(self):
118 119 environ = {
119 120 'PATH_INFO': 'rhodecode-dev',
120 121 'HTTP_DAV': 'http://subversion.tigris.org/xmlns/dav/svn/log'
121 122 }
122 123 app = Mock()
123 124 config = Mock()
125 registry = Mock()
124 126 middleware = vcs.VCSMiddleware(
125 app, config=config, appenlight_client=None)
127 app, config=config, appenlight_client=None, registry=registry)
126 128 snv_patch = patch('rhodecode.lib.middleware.vcs.SimpleSvn')
127 129 settings_patch = patch.dict(
128 130 rhodecode.CONFIG,
129 131 {'rhodecode_proxy_subversion_http_requests': False})
130 132 with snv_patch as svn_mock, settings_patch:
131 133 app = middleware._get_handler_app(environ)
132 134
133 135 assert svn_mock.call_count == 0
134 136 assert app is None
General Comments 0
You need to be logged in to leave comments. Login now