##// END OF EJS Templates
tests: fixed tests for random dict order could sometimes break the tests
marcink -
r2112:1550916c default
parent child Browse files
Show More
@@ -1,195 +1,196 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2016-2017 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 import json
22 22
23 23 import pytest
24 24 from mock import Mock, patch, call
25 25
26 26 from rhodecode.apps.ssh_support.lib.ssh_wrapper import GitServer
27 27
28 28
29 29 @pytest.fixture
30 30 def git_server():
31 31 return GitServerCreator()
32 32
33 33
34 34 class GitServerCreator(object):
35 35 root = '/tmp/repo/path/'
36 36 git_path = '/usr/local/bin/'
37 37 config_data = {
38 38 'app:main': {
39 39 'ssh.executable.git': git_path
40 40 }
41 41 }
42 42 repo_name = 'test_git'
43 43 repo_mode = 'receive-pack'
44 44 user = 'vcs'
45 45
46 46 def __init__(self):
47 47 def config_get(part, key):
48 48 return self.config_data.get(part, {}).get(key)
49 49 self.config_mock = Mock()
50 50 self.config_mock.get = Mock(side_effect=config_get)
51 51
52 52 def create(self, **kwargs):
53 53 parameters = {
54 54 'store': {'path': self.root},
55 55 'ini_path': '',
56 56 'user': self.user,
57 57 'repo_name': self.repo_name,
58 58 'repo_mode': self.repo_mode,
59 59 'user_permissions': {
60 60 self.repo_name: 'repo_admin'
61 61 },
62 62 'config': self.config_mock,
63 63 }
64 64 parameters.update(kwargs)
65 65 server = GitServer(**parameters)
66 66 return server
67 67
68 68
69 69 class TestGitServer(object):
70 70 def test_command(self, git_server):
71 71 server = git_server.create()
72 72 server.read_only = False
73 73 expected_command = (
74 74 'cd {root}; {git_path}-{repo_mode}'
75 75 ' \'{root}{repo_name}\''.format(
76 76 root=git_server.root, git_path=git_server.git_path,
77 77 repo_mode=git_server.repo_mode, repo_name=git_server.repo_name)
78 78 )
79 79 assert expected_command == server.command
80 80
81 81 def test_run_returns_exit_code_2_when_no_permissions(self, git_server, caplog):
82 82 server = git_server.create()
83 83 with patch.object(server, '_check_permissions') as permissions_mock:
84 84 with patch.object(server, '_update_environment'):
85 85 permissions_mock.return_value = 2
86 86 exit_code = server.run()
87 87
88 88 assert exit_code == (2, False)
89 89
90 90 def test_run_returns_executes_command(self, git_server, caplog):
91 91 server = git_server.create()
92 92 with patch.object(server, '_check_permissions') as permissions_mock:
93 93 with patch('os.system') as system_mock:
94 94 with patch.object(server, '_update_environment') as (
95 95 update_mock):
96 96 permissions_mock.return_value = 0
97 97 system_mock.return_value = 0
98 98 exit_code = server.run()
99 99
100 100 system_mock.assert_called_once_with(server.command)
101 101 update_mock.assert_called_once_with()
102 102
103 103 assert exit_code == (0, True)
104 104
105 105 @pytest.mark.parametrize(
106 106 'repo_mode, action', [
107 107 ['receive-pack', 'push'],
108 108 ['upload-pack', 'pull']
109 109 ])
110 110 def test_update_environment(self, git_server, repo_mode, action):
111 111 server = git_server.create(repo_mode=repo_mode)
112 112 with patch('os.environ', {'SSH_CLIENT': '10.10.10.10 b'}):
113 113 with patch('os.putenv') as putenv_mock:
114 114 server._update_environment()
115
115 116 expected_data = {
116 117 "username": git_server.user,
117 118 "scm": "git",
118 119 "repository": git_server.repo_name,
119 120 "make_lock": None,
120 121 "action": [action],
121 122 "ip": "10.10.10.10",
122 123 "locked_by": [None, None],
123 124 "config": ""
124 125 }
125 putenv_mock.assert_called_once_with(
126 'RC_SCM_DATA', json.dumps(expected_data))
126 args, kwargs = putenv_mock.call_args
127 assert json.loads(args[1]) == expected_data
127 128
128 129
129 130 class TestGitServerCheckPermissions(object):
130 131 def test_returns_2_when_no_permissions_found(self, git_server, caplog):
131 132 user_permissions = {}
132 133 server = git_server.create(user_permissions=user_permissions)
133 134 result = server._check_permissions()
134 135 assert result == 2
135 136
136 137 log_msg = 'permission for vcs on test_git are: None'
137 138 assert log_msg in [t[2] for t in caplog.record_tuples]
138 139
139 140 def test_returns_2_when_no_permissions(self, git_server, caplog):
140 141 user_permissions = {git_server.repo_name: 'repository.none'}
141 142 server = git_server.create(user_permissions=user_permissions)
142 143 result = server._check_permissions()
143 144 assert result == 2
144 145
145 146 log_msg = 'repo not found or no permissions'
146 147 assert log_msg in [t[2] for t in caplog.record_tuples]
147 148
148 149 @pytest.mark.parametrize(
149 150 'permission', ['repository.admin', 'repository.write'])
150 151 def test_access_allowed_when_user_has_write_permissions(
151 152 self, git_server, permission, caplog):
152 153 user_permissions = {git_server.repo_name: permission}
153 154 server = git_server.create(user_permissions=user_permissions)
154 155 result = server._check_permissions()
155 156 assert result is None
156 157
157 158 log_msg = 'Write Permissions for User "%s" granted to repo "%s"!' % (
158 159 git_server.user, git_server.repo_name)
159 160 assert log_msg in [t[2] for t in caplog.record_tuples]
160 161
161 162 def test_write_access_is_not_allowed_when_user_has_read_permission(
162 163 self, git_server, caplog):
163 164 user_permissions = {git_server.repo_name: 'repository.read'}
164 165 server = git_server.create(
165 166 user_permissions=user_permissions, repo_mode='receive-pack')
166 167 result = server._check_permissions()
167 168 assert result == -3
168 169
169 170 log_msg = 'Only Read Only access for User "%s" granted to repo "%s"! Failing!' % (
170 171 git_server.user, git_server.repo_name)
171 172 assert log_msg in [t[2] for t in caplog.record_tuples]
172 173
173 174 def test_read_access_allowed_when_user_has_read_permission(
174 175 self, git_server, caplog):
175 176 user_permissions = {git_server.repo_name: 'repository.read'}
176 177 server = git_server.create(
177 178 user_permissions=user_permissions, repo_mode='upload-pack')
178 179 result = server._check_permissions()
179 180 assert result is None
180 181
181 182 log_msg = 'Only Read Only access for User "%s" granted to repo "%s"!' % (
182 183 git_server.user, git_server.repo_name)
183 184 assert log_msg in [t[2] for t in caplog.record_tuples]
184 185
185 186 def test_returns_error_when_permission_not_recognised(
186 187 self, git_server, caplog):
187 188 user_permissions = {git_server.repo_name: 'repository.whatever'}
188 189 server = git_server.create(
189 190 user_permissions=user_permissions, repo_mode='upload-pack')
190 191 result = server._check_permissions()
191 192 assert result == -2
192 193
193 194 log_msg = 'Cannot properly fetch user permission. ' \
194 195 'Return value is: repository.whatever'
195 196 assert log_msg in [t[2] for t in caplog.record_tuples] No newline at end of file
@@ -1,128 +1,128 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2010-2017 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 import urlparse
22 22
23 23 import mock
24 24 import pytest
25 25 import simplejson as json
26 26
27 27 from rhodecode.lib.vcs.backends.base import Config
28 28 from rhodecode.tests.lib.middleware import mock_scm_app
29 29 import rhodecode.lib.middleware.simplehg as simplehg
30 30
31 31
32 32 def get_environ(url):
33 33 """Construct a minimum WSGI environ based on the URL."""
34 34 parsed_url = urlparse.urlparse(url)
35 35 environ = {
36 36 'PATH_INFO': parsed_url.path,
37 37 'QUERY_STRING': parsed_url.query,
38 38 }
39 39
40 40 return environ
41 41
42 42
43 43 @pytest.mark.parametrize(
44 44 'url, expected_action',
45 45 [
46 46 ('/foo/bar?cmd=unbundle&key=tip', 'push'),
47 47 ('/foo/bar?cmd=pushkey&key=tip', 'push'),
48 48 ('/foo/bar?cmd=listkeys&key=tip', 'pull'),
49 49 ('/foo/bar?cmd=changegroup&key=tip', 'pull'),
50 50 # Edge case: unknown argument: assume pull
51 51 ('/foo/bar?cmd=unknown&key=tip', 'pull'),
52 52 ('/foo/bar?cmd=&key=tip', 'pull'),
53 53 # Edge case: not cmd argument
54 54 ('/foo/bar?key=tip', 'pull'),
55 55 ])
56 56 def test_get_action(url, expected_action):
57 57 app = simplehg.SimpleHg(application=None,
58 58 config={'auth_ret_code': '', 'base_path': ''},
59 59 registry=None)
60 60 assert expected_action == app._get_action(get_environ(url))
61 61
62 62
63 63 @pytest.mark.parametrize(
64 64 'url, expected_repo_name',
65 65 [
66 66 ('/foo?cmd=unbundle&key=tip', 'foo'),
67 67 ('/foo/bar?cmd=pushkey&key=tip', 'foo/bar'),
68 68 ('/foo/bar/baz?cmd=listkeys&key=tip', 'foo/bar/baz'),
69 69 # Repos with trailing slashes.
70 70 ('/foo/?cmd=unbundle&key=tip', 'foo'),
71 71 ('/foo/bar/?cmd=pushkey&key=tip', 'foo/bar'),
72 72 ('/foo/bar/baz/?cmd=listkeys&key=tip', 'foo/bar/baz'),
73 73 ])
74 74 def test_get_repository_name(url, expected_repo_name):
75 75 app = simplehg.SimpleHg(application=None,
76 76 config={'auth_ret_code': '', 'base_path': ''},
77 77 registry=None)
78 78 assert expected_repo_name == app._get_repository_name(get_environ(url))
79 79
80 80
81 81 def test_get_config(pylonsapp, user_util):
82 82 repo = user_util.create_repo(repo_type='git')
83 83 app = simplehg.SimpleHg(application=None,
84 84 config={'auth_ret_code': '', 'base_path': ''},
85 85 registry=None)
86 extras = {'foo': 'FOO', 'bar': 'BAR'}
86 extras = [('foo', 'FOO', 'bar', 'BAR')]
87 87
88 88 hg_config = app._create_config(extras, repo_name=repo.repo_name)
89 89
90 90 config = simplehg.utils.make_db_config(repo=repo.repo_name)
91 91 config.set('rhodecode', 'RC_SCM_DATA', json.dumps(extras))
92 92 hg_config_org = config
93 93
94 94 expected_config = [
95 95 ('vcs_svn_tag', 'ff89f8c714d135d865f44b90e5413b88de19a55f', '/tags/*'),
96 96 ('web', 'push_ssl', 'False'),
97 97 ('web', 'allow_push', '*'),
98 98 ('web', 'allow_archive', 'gz zip bz2'),
99 99 ('web', 'baseurl', '/'),
100 100 ('vcs_git_lfs', 'store_location', hg_config_org.get('vcs_git_lfs', 'store_location')),
101 101 ('vcs_svn_branch', '9aac1a38c3b8a0cdc4ae0f960a5f83332bc4fa5e', '/branches/*'),
102 102 ('vcs_svn_branch', 'c7e6a611c87da06529fd0dd733308481d67c71a8', '/trunk'),
103 103 ('largefiles', 'usercache', hg_config_org.get('largefiles', 'usercache')),
104 104 ('hooks', 'preoutgoing.pre_pull', 'python:vcsserver.hooks.pre_pull'),
105 105 ('hooks', 'prechangegroup.pre_push', 'python:vcsserver.hooks.pre_push'),
106 106 ('hooks', 'outgoing.pull_logger', 'python:vcsserver.hooks.log_pull_action'),
107 107 ('hooks', 'pretxnchangegroup.pre_push', 'python:vcsserver.hooks.pre_push'),
108 108 ('hooks', 'changegroup.push_logger', 'python:vcsserver.hooks.log_push_action'),
109 109 ('hooks', 'changegroup.repo_size', 'python:vcsserver.hooks.repo_size'),
110 110 ('phases', 'publish', 'True'),
111 111 ('extensions', 'largefiles', ''),
112 112 ('paths', '/', hg_config_org.get('paths', '/')),
113 ('rhodecode', 'RC_SCM_DATA', '{"foo": "FOO", "bar": "BAR"}')
113 ('rhodecode', 'RC_SCM_DATA', '[["foo", "FOO", "bar", "BAR"]]')
114 114 ]
115 115 for entry in expected_config:
116 116 assert entry in hg_config
117 117
118 118
119 119 def test_create_wsgi_app_uses_scm_app_from_simplevcs():
120 120 config = {
121 121 'auth_ret_code': '',
122 122 'base_path': '',
123 123 'vcs.scm_app_implementation':
124 124 'rhodecode.tests.lib.middleware.mock_scm_app',
125 125 }
126 126 app = simplehg.SimpleHg(application=None, config=config, registry=None)
127 127 wsgi_app = app._create_wsgi_app('/tmp/test', 'test_repo', {})
128 128 assert wsgi_app is mock_scm_app.mock_hg_wsgi
General Comments 0
You need to be logged in to leave comments. Login now