##// END OF EJS Templates
fix(hooks): make compat for WSL filesystem better by explicity only changing permissions if they are lower at creation time.
super-admin -
r1186:652285bb default
parent child Browse files
Show More
@@ -1,208 +1,220 b''
1 1 # RhodeCode VCSServer provides access to different vcs backends via network.
2 2 # Copyright (C) 2014-2023 RhodeCode GmbH
3 3 #
4 4 # This program is free software; you can redistribute it and/or modify
5 5 # it under the terms of the GNU General Public License as published by
6 6 # the Free Software Foundation; either version 3 of the License, or
7 7 # (at your option) any later version.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU General Public License
15 15 # along with this program; if not, write to the Free Software Foundation,
16 16 # Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17 17
18 18 import re
19 19 import os
20 20 import sys
21 21 import datetime
22 22 import logging
23 23 import pkg_resources
24 24
25 25 import vcsserver
26 26 from vcsserver.str_utils import safe_bytes
27 27
28 28 log = logging.getLogger(__name__)
29 29
30 HOOKS_DIR_MODE = 0o755
note

Ok, isso é um teste. Não use maiusculas.

Certo.

Feito

31 HOOKS_FILE_MODE = 0o755
32
33
34 def set_permissions_if_needed(path_to_check, perms: oct):
35 # Get current permissions
36 current_permissions = os.stat(path_to_check).st_mode & 0o777 # Extract permission bits
37
38 # Check if current permissions are lower than required
39 if current_permissions < int(perms):
40 # Change the permissions if they are lower than required
41 os.chmod(path_to_check, perms)
42
30 43
31 44 def get_git_hooks_path(repo_path, bare):
32 45 hooks_path = os.path.join(repo_path, 'hooks')
33 46 if not bare:
34 47 hooks_path = os.path.join(repo_path, '.git', 'hooks')
35 48
36 49 return hooks_path
37 50
38 51
39 52 def install_git_hooks(repo_path, bare, executable=None, force_create=False):
40 53 """
41 54 Creates a RhodeCode hook inside a git repository
42 55
43 56 :param repo_path: path to repository
44 57 :param bare: defines if repository is considered a bare git repo
45 58 :param executable: binary executable to put in the hooks
46 59 :param force_create: Creates even if the same name hook exists
47 60 """
48 61 executable = executable or sys.executable
49 62 hooks_path = get_git_hooks_path(repo_path, bare)
50 63
51 64 # we always call it to ensure dir exists and it has a proper mode
52 65 if not os.path.exists(hooks_path):
53 66 # If it doesn't exist, create a new directory with the specified mode
54 os.makedirs(hooks_path, mode=0o777, exist_ok=True)
55 else:
67 os.makedirs(hooks_path, mode=HOOKS_DIR_MODE, exist_ok=True)
56 68 # If it exists, change the directory's mode to the specified mode
57 os.chmod(hooks_path, mode=0o777)
69 set_permissions_if_needed(hooks_path, perms=HOOKS_DIR_MODE)
58 70
59 71 tmpl_post = pkg_resources.resource_string(
60 72 'vcsserver', '/'.join(
61 73 ('hook_utils', 'hook_templates', 'git_post_receive.py.tmpl')))
62 74 tmpl_pre = pkg_resources.resource_string(
63 75 'vcsserver', '/'.join(
64 76 ('hook_utils', 'hook_templates', 'git_pre_receive.py.tmpl')))
65 77
66 78 path = '' # not used for now
67 79 timestamp = datetime.datetime.utcnow().isoformat()
68 80
69 81 for h_type, template in [('pre', tmpl_pre), ('post', tmpl_post)]:
70 82 log.debug('Installing git hook in repo %s', repo_path)
71 83 _hook_file = os.path.join(hooks_path, f'{h_type}-receive')
72 84 _rhodecode_hook = check_rhodecode_hook(_hook_file)
73 85
74 86 if _rhodecode_hook or force_create:
75 87 log.debug('writing git %s hook file at %s !', h_type, _hook_file)
76 88 try:
77 89 with open(_hook_file, 'wb') as f:
78 90 template = template.replace(b'_TMPL_', safe_bytes(vcsserver.__version__))
79 91 template = template.replace(b'_DATE_', safe_bytes(timestamp))
80 92 template = template.replace(b'_ENV_', safe_bytes(executable))
81 93 template = template.replace(b'_PATH_', safe_bytes(path))
82 94 f.write(template)
83 os.chmod(_hook_file, 0o755)
95 set_permissions_if_needed(_hook_file, perms=HOOKS_FILE_MODE)
84 96 except OSError:
85 97 log.exception('error writing hook file %s', _hook_file)
86 98 else:
87 99 log.debug('skipping writing hook file')
88 100
89 101 return True
90 102
91 103
92 104 def get_svn_hooks_path(repo_path):
93 105 hooks_path = os.path.join(repo_path, 'hooks')
94 106
95 107 return hooks_path
96 108
97 109
98 110 def install_svn_hooks(repo_path, executable=None, force_create=False):
99 111 """
100 112 Creates RhodeCode hooks inside a svn repository
101 113
102 114 :param repo_path: path to repository
103 115 :param executable: binary executable to put in the hooks
104 116 :param force_create: Create even if same name hook exists
105 117 """
106 118 executable = executable or sys.executable
107 119 hooks_path = get_svn_hooks_path(repo_path)
108 120 if not os.path.isdir(hooks_path):
109 121 os.makedirs(hooks_path, mode=0o777, exist_ok=True)
110 122
111 123 tmpl_post = pkg_resources.resource_string(
112 124 'vcsserver', '/'.join(
113 125 ('hook_utils', 'hook_templates', 'svn_post_commit_hook.py.tmpl')))
114 126 tmpl_pre = pkg_resources.resource_string(
115 127 'vcsserver', '/'.join(
116 128 ('hook_utils', 'hook_templates', 'svn_pre_commit_hook.py.tmpl')))
117 129
118 130 path = '' # not used for now
119 131 timestamp = datetime.datetime.utcnow().isoformat()
120 132
121 133 for h_type, template in [('pre', tmpl_pre), ('post', tmpl_post)]:
122 134 log.debug('Installing svn hook in repo %s', repo_path)
123 135 _hook_file = os.path.join(hooks_path, f'{h_type}-commit')
124 136 _rhodecode_hook = check_rhodecode_hook(_hook_file)
125 137
126 138 if _rhodecode_hook or force_create:
127 139 log.debug('writing svn %s hook file at %s !', h_type, _hook_file)
128 140
129 141 try:
130 142 with open(_hook_file, 'wb') as f:
131 143 template = template.replace(b'_TMPL_', safe_bytes(vcsserver.__version__))
132 144 template = template.replace(b'_DATE_', safe_bytes(timestamp))
133 145 template = template.replace(b'_ENV_', safe_bytes(executable))
134 146 template = template.replace(b'_PATH_', safe_bytes(path))
135 147
136 148 f.write(template)
137 149 os.chmod(_hook_file, 0o755)
138 150 except OSError:
139 151 log.exception('error writing hook file %s', _hook_file)
140 152 else:
141 153 log.debug('skipping writing hook file')
142 154
143 155 return True
144 156
145 157
146 158 def get_version_from_hook(hook_path):
147 159 version = b''
148 160 hook_content = read_hook_content(hook_path)
149 161 matches = re.search(rb'RC_HOOK_VER\s*=\s*(.*)', hook_content)
150 162 if matches:
151 163 try:
152 164 version = matches.groups()[0]
153 165 log.debug('got version %s from hooks.', version)
154 166 except Exception:
155 167 log.exception("Exception while reading the hook version.")
156 168 return version.replace(b"'", b"")
157 169
158 170
159 171 def check_rhodecode_hook(hook_path):
160 172 """
161 173 Check if the hook was created by RhodeCode
162 174 """
163 175 if not os.path.exists(hook_path):
164 176 return True
165 177
166 178 log.debug('hook exists, checking if it is from RhodeCode')
167 179
168 180 version = get_version_from_hook(hook_path)
169 181 if version:
170 182 return True
171 183
172 184 return False
173 185
174 186
175 187 def read_hook_content(hook_path) -> bytes:
176 188 content = b''
177 189 if os.path.isfile(hook_path):
178 190 with open(hook_path, 'rb') as f:
179 191 content = f.read()
180 192 return content
181 193
182 194
183 195 def get_git_pre_hook_version(repo_path, bare):
184 196 hooks_path = get_git_hooks_path(repo_path, bare)
185 197 _hook_file = os.path.join(hooks_path, 'pre-receive')
186 198 version = get_version_from_hook(_hook_file)
187 199 return version
188 200
189 201
190 202 def get_git_post_hook_version(repo_path, bare):
191 203 hooks_path = get_git_hooks_path(repo_path, bare)
192 204 _hook_file = os.path.join(hooks_path, 'post-receive')
193 205 version = get_version_from_hook(_hook_file)
194 206 return version
195 207
196 208
197 209 def get_svn_pre_hook_version(repo_path):
198 210 hooks_path = get_svn_hooks_path(repo_path)
199 211 _hook_file = os.path.join(hooks_path, 'pre-commit')
200 212 version = get_version_from_hook(_hook_file)
201 213 return version
202 214
203 215
204 216 def get_svn_post_hook_version(repo_path):
205 217 hooks_path = get_svn_hooks_path(repo_path)
206 218 _hook_file = os.path.join(hooks_path, 'post-commit')
207 219 version = get_version_from_hook(_hook_file)
208 220 return version
@@ -1,206 +1,289 b''
1 1 # RhodeCode VCSServer provides access to different vcs backends via network.
2 2 # Copyright (C) 2014-2023 RhodeCode GmbH
3 3 #
4 4 # This program is free software; you can redistribute it and/or modify
5 5 # it under the terms of the GNU General Public License as published by
6 6 # the Free Software Foundation; either version 3 of the License, or
7 7 # (at your option) any later version.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU General Public License
15 15 # along with this program; if not, write to the Free Software Foundation,
16 16 # Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17 17
18 18 import os
19 19 import sys
20 20 import stat
21 21 import pytest
22 22 import vcsserver
23 23 import tempfile
24 24 from vcsserver import hook_utils
25 from vcsserver.hook_utils import set_permissions_if_needed, HOOKS_DIR_MODE, HOOKS_FILE_MODE
25 26 from vcsserver.tests.fixture import no_newline_id_generator
26 from vcsserver.str_utils import safe_bytes, safe_str
27 from vcsserver.str_utils import safe_bytes
27 28 from vcsserver.utils import AttributeDict
28 29
29 30
30 31 class TestCheckRhodecodeHook:
31 32
32 33 def test_returns_false_when_hook_file_is_wrong_found(self, tmpdir):
33 34 hook = os.path.join(str(tmpdir), 'fake_hook_file.py')
34 35 with open(hook, 'wb') as f:
35 36 f.write(b'dummy test')
36 37 result = hook_utils.check_rhodecode_hook(hook)
37 38 assert result is False
38 39
39 40 def test_returns_true_when_no_hook_file_found(self, tmpdir):
40 41 hook = os.path.join(str(tmpdir), 'fake_hook_file_not_existing.py')
41 42 result = hook_utils.check_rhodecode_hook(hook)
42 43 assert result
43 44
44 45 @pytest.mark.parametrize("file_content, expected_result", [
45 46 ("RC_HOOK_VER = '3.3.3'\n", True),
46 47 ("RC_HOOK = '3.3.3'\n", False),
47 48 ], ids=no_newline_id_generator)
48 49 def test_signatures(self, file_content, expected_result, tmpdir):
49 50 hook = os.path.join(str(tmpdir), 'fake_hook_file_1.py')
50 51 with open(hook, 'wb') as f:
51 52 f.write(safe_bytes(file_content))
52 53
53 54 result = hook_utils.check_rhodecode_hook(hook)
54 55
55 56 assert result is expected_result
56 57
57 58
58 59 class BaseInstallHooks:
59 60 HOOK_FILES = ()
60 61
62 def _check_hook_file_dir_mode(self, file_path):
63 dir_path = os.path.dirname(file_path)
64 assert os.path.exists(dir_path), f'dir {file_path} missing'
65 stat_info = os.stat(dir_path)
66
67 file_mode = stat.S_IMODE(stat_info.st_mode)
68 expected_mode = int(HOOKS_DIR_MODE)
69 assert expected_mode == file_mode, f'expected mode: {oct(expected_mode)} got: {oct(file_mode)} for {dir_path}'
70
61 71 def _check_hook_file_mode(self, file_path):
62 72 assert os.path.exists(file_path), f'path {file_path} missing'
63 73 stat_info = os.stat(file_path)
64 74
65 75 file_mode = stat.S_IMODE(stat_info.st_mode)
66 expected_mode = int('755', 8)
67 assert expected_mode == file_mode
76 expected_mode = int(HOOKS_FILE_MODE)
77 assert expected_mode == file_mode, f'expected mode: {oct(expected_mode)} got: {oct(file_mode)} for {file_path}'
68 78
69 79 def _check_hook_file_content(self, file_path, executable):
70 80 executable = executable or sys.executable
71 81 with open(file_path, 'rt') as hook_file:
72 82 content = hook_file.read()
73 83
74 84 expected_env = '#!{}'.format(executable)
75 85 expected_rc_version = "\nRC_HOOK_VER = '{}'\n".format(vcsserver.__version__)
76 86 assert content.strip().startswith(expected_env)
77 87 assert expected_rc_version in content
78 88
79 89 def _create_fake_hook(self, file_path, content):
80 90 with open(file_path, 'w') as hook_file:
81 91 hook_file.write(content)
82 92
83 93 def create_dummy_repo(self, repo_type):
84 94 tmpdir = tempfile.mkdtemp()
85 95 repo = AttributeDict()
86 96 if repo_type == 'git':
87 97 repo.path = os.path.join(tmpdir, 'test_git_hooks_installation_repo')
88 98 os.makedirs(repo.path)
89 99 os.makedirs(os.path.join(repo.path, 'hooks'))
90 100 repo.bare = True
91 101
92 102 elif repo_type == 'svn':
93 103 repo.path = os.path.join(tmpdir, 'test_svn_hooks_installation_repo')
94 104 os.makedirs(repo.path)
95 105 os.makedirs(os.path.join(repo.path, 'hooks'))
96 106
97 107 return repo
98 108
99 109 def check_hooks(self, repo_path, repo_bare=True):
100 110 for file_name in self.HOOK_FILES:
101 111 if repo_bare:
102 112 file_path = os.path.join(repo_path, 'hooks', file_name)
103 113 else:
104 114 file_path = os.path.join(repo_path, '.git', 'hooks', file_name)
115
116 self._check_hook_file_dir_mode(file_path)
105 117 self._check_hook_file_mode(file_path)
106 118 self._check_hook_file_content(file_path, sys.executable)
107 119
108 120
109 121 class TestInstallGitHooks(BaseInstallHooks):
110 122 HOOK_FILES = ('pre-receive', 'post-receive')
111 123
112 124 def test_hooks_are_installed(self):
113 125 repo = self.create_dummy_repo('git')
114 126 result = hook_utils.install_git_hooks(repo.path, repo.bare)
115 127 assert result
116 128 self.check_hooks(repo.path, repo.bare)
117 129
118 130 def test_hooks_are_replaced(self):
119 131 repo = self.create_dummy_repo('git')
120 132 hooks_path = os.path.join(repo.path, 'hooks')
121 133 for file_path in [os.path.join(hooks_path, f) for f in self.HOOK_FILES]:
122 134 self._create_fake_hook(
123 135 file_path, content="RC_HOOK_VER = 'abcde'\n")
124 136
125 137 result = hook_utils.install_git_hooks(repo.path, repo.bare)
126 138 assert result
127 139 self.check_hooks(repo.path, repo.bare)
128 140
129 141 def test_non_rc_hooks_are_not_replaced(self):
130 142 repo = self.create_dummy_repo('git')
131 143 hooks_path = os.path.join(repo.path, 'hooks')
132 144 non_rc_content = 'echo "non rc hook"\n'
133 145 for file_path in [os.path.join(hooks_path, f) for f in self.HOOK_FILES]:
134 146 self._create_fake_hook(
135 147 file_path, content=non_rc_content)
136 148
137 149 result = hook_utils.install_git_hooks(repo.path, repo.bare)
138 150 assert result
139 151
140 152 for file_path in [os.path.join(hooks_path, f) for f in self.HOOK_FILES]:
141 153 with open(file_path, 'rt') as hook_file:
142 154 content = hook_file.read()
143 155 assert content == non_rc_content
144 156
145 157 def test_non_rc_hooks_are_replaced_with_force_flag(self):
146 158 repo = self.create_dummy_repo('git')
147 159 hooks_path = os.path.join(repo.path, 'hooks')
148 160 non_rc_content = 'echo "non rc hook"\n'
149 161 for file_path in [os.path.join(hooks_path, f) for f in self.HOOK_FILES]:
150 162 self._create_fake_hook(
151 163 file_path, content=non_rc_content)
152 164
153 165 result = hook_utils.install_git_hooks(
154 166 repo.path, repo.bare, force_create=True)
155 167 assert result
156 168 self.check_hooks(repo.path, repo.bare)
157 169
158 170
159 171 class TestInstallSvnHooks(BaseInstallHooks):
160 172 HOOK_FILES = ('pre-commit', 'post-commit')
161 173
162 174 def test_hooks_are_installed(self):
163 175 repo = self.create_dummy_repo('svn')
164 176 result = hook_utils.install_svn_hooks(repo.path)
165 177 assert result
166 178 self.check_hooks(repo.path)
167 179
168 180 def test_hooks_are_replaced(self):
169 181 repo = self.create_dummy_repo('svn')
170 182 hooks_path = os.path.join(repo.path, 'hooks')
171 183 for file_path in [os.path.join(hooks_path, f) for f in self.HOOK_FILES]:
172 184 self._create_fake_hook(
173 185 file_path, content="RC_HOOK_VER = 'abcde'\n")
174 186
175 187 result = hook_utils.install_svn_hooks(repo.path)
176 188 assert result
177 189 self.check_hooks(repo.path)
178 190
179 191 def test_non_rc_hooks_are_not_replaced(self):
180 192 repo = self.create_dummy_repo('svn')
181 193 hooks_path = os.path.join(repo.path, 'hooks')
182 194 non_rc_content = 'echo "non rc hook"\n'
183 195 for file_path in [os.path.join(hooks_path, f) for f in self.HOOK_FILES]:
184 196 self._create_fake_hook(
185 197 file_path, content=non_rc_content)
186 198
187 199 result = hook_utils.install_svn_hooks(repo.path)
188 200 assert result
189 201
190 202 for file_path in [os.path.join(hooks_path, f) for f in self.HOOK_FILES]:
191 203 with open(file_path, 'rt') as hook_file:
192 204 content = hook_file.read()
193 205 assert content == non_rc_content
194 206
195 207 def test_non_rc_hooks_are_replaced_with_force_flag(self):
196 208 repo = self.create_dummy_repo('svn')
197 209 hooks_path = os.path.join(repo.path, 'hooks')
198 210 non_rc_content = 'echo "non rc hook"\n'
199 211 for file_path in [os.path.join(hooks_path, f) for f in self.HOOK_FILES]:
200 212 self._create_fake_hook(
201 213 file_path, content=non_rc_content)
202 214
203 215 result = hook_utils.install_svn_hooks(
204 216 repo.path, force_create=True)
205 217 assert result
206 218 self.check_hooks(repo.path, )
219
220
221 def create_test_file(filename):
222 """Utility function to create a test file."""
223 with open(filename, 'w') as f:
224 f.write("Test file")
225
226
227 def remove_test_file(filename):
228 """Utility function to remove a test file."""
229 if os.path.exists(filename):
230 os.remove(filename)
231
232
233 @pytest.fixture
234 def test_file():
235 filename = 'test_file.txt'
236 create_test_file(filename)
237 yield filename
238 remove_test_file(filename)
239
240
241 def test_increase_permissions(test_file):
242 # Set initial lower permissions
243 initial_perms = 0o644
244 os.chmod(test_file, initial_perms)
245
246 # Set higher permissions
247 new_perms = 0o666
248 set_permissions_if_needed(test_file, new_perms)
249
250 # Check if permissions were updated
251 assert (os.stat(test_file).st_mode & 0o777) == new_perms
252
253
254 def test_no_permission_change_needed(test_file):
255 # Set initial permissions
256 initial_perms = 0o666
257 os.chmod(test_file, initial_perms)
258
259 # Attempt to set the same permissions
260 set_permissions_if_needed(test_file, initial_perms)
261
262 # Check if permissions were unchanged
263 assert (os.stat(test_file).st_mode & 0o777) == initial_perms
264
265
266 def test_no_permission_reduction(test_file):
267 # Set initial higher permissions
268 initial_perms = 0o666
269 os.chmod(test_file, initial_perms)
270
271 # Attempt to set lower permissions
272 lower_perms = 0o644
273 set_permissions_if_needed(test_file, lower_perms)
274
275 # Check if permissions were not reduced
276 assert (os.stat(test_file).st_mode & 0o777) == initial_perms
277
278
279 def test_no_permission_reduction_when_on_777(test_file):
280 # Set initial higher permissions
281 initial_perms = 0o777
282 os.chmod(test_file, initial_perms)
283
284 # Attempt to set lower permissions
285 lower_perms = 0o755
286 set_permissions_if_needed(test_file, lower_perms)
287
288 # Check if permissions were not reduced
289 assert (os.stat(test_file).st_mode & 0o777) == initial_perms
General Comments 0
You need to be logged in to leave comments. Login now