##// END OF EJS Templates
fix(hooks): make compat for WSL filesystem better by explicity only changing permissions if they are lower at creation time.
super-admin -
r1186:652285bb default
parent child Browse files
Show More
@@ -1,208 +1,220 b''
1 # RhodeCode VCSServer provides access to different vcs backends via network.
1 # RhodeCode VCSServer provides access to different vcs backends via network.
2 # Copyright (C) 2014-2023 RhodeCode GmbH
2 # Copyright (C) 2014-2023 RhodeCode GmbH
3 #
3 #
4 # This program is free software; you can redistribute it and/or modify
4 # This program is free software; you can redistribute it and/or modify
5 # it under the terms of the GNU General Public License as published by
5 # it under the terms of the GNU General Public License as published by
6 # the Free Software Foundation; either version 3 of the License, or
6 # the Free Software Foundation; either version 3 of the License, or
7 # (at your option) any later version.
7 # (at your option) any later version.
8 #
8 #
9 # This program is distributed in the hope that it will be useful,
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
12 # GNU General Public License for more details.
13 #
13 #
14 # You should have received a copy of the GNU General Public License
14 # You should have received a copy of the GNU General Public License
15 # along with this program; if not, write to the Free Software Foundation,
15 # along with this program; if not, write to the Free Software Foundation,
16 # Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
16 # Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17
17
18 import re
18 import re
19 import os
19 import os
20 import sys
20 import sys
21 import datetime
21 import datetime
22 import logging
22 import logging
23 import pkg_resources
23 import pkg_resources
24
24
25 import vcsserver
25 import vcsserver
26 from vcsserver.str_utils import safe_bytes
26 from vcsserver.str_utils import safe_bytes
27
27
28 log = logging.getLogger(__name__)
28 log = logging.getLogger(__name__)
29
29
30 HOOKS_DIR_MODE = 0o755
note

Ok, isso é um teste. Não use maiusculas.

Certo.

Feito

31 HOOKS_FILE_MODE = 0o755
32
33
34 def set_permissions_if_needed(path_to_check, perms: oct):
35 # Get current permissions
36 current_permissions = os.stat(path_to_check).st_mode & 0o777 # Extract permission bits
37
38 # Check if current permissions are lower than required
39 if current_permissions < int(perms):
40 # Change the permissions if they are lower than required
41 os.chmod(path_to_check, perms)
42
30
43
31 def get_git_hooks_path(repo_path, bare):
44 def get_git_hooks_path(repo_path, bare):
32 hooks_path = os.path.join(repo_path, 'hooks')
45 hooks_path = os.path.join(repo_path, 'hooks')
33 if not bare:
46 if not bare:
34 hooks_path = os.path.join(repo_path, '.git', 'hooks')
47 hooks_path = os.path.join(repo_path, '.git', 'hooks')
35
48
36 return hooks_path
49 return hooks_path
37
50
38
51
39 def install_git_hooks(repo_path, bare, executable=None, force_create=False):
52 def install_git_hooks(repo_path, bare, executable=None, force_create=False):
40 """
53 """
41 Creates a RhodeCode hook inside a git repository
54 Creates a RhodeCode hook inside a git repository
42
55
43 :param repo_path: path to repository
56 :param repo_path: path to repository
44 :param bare: defines if repository is considered a bare git repo
57 :param bare: defines if repository is considered a bare git repo
45 :param executable: binary executable to put in the hooks
58 :param executable: binary executable to put in the hooks
46 :param force_create: Creates even if the same name hook exists
59 :param force_create: Creates even if the same name hook exists
47 """
60 """
48 executable = executable or sys.executable
61 executable = executable or sys.executable
49 hooks_path = get_git_hooks_path(repo_path, bare)
62 hooks_path = get_git_hooks_path(repo_path, bare)
50
63
51 # we always call it to ensure dir exists and it has a proper mode
64 # we always call it to ensure dir exists and it has a proper mode
52 if not os.path.exists(hooks_path):
65 if not os.path.exists(hooks_path):
53 # If it doesn't exist, create a new directory with the specified mode
66 # If it doesn't exist, create a new directory with the specified mode
54 os.makedirs(hooks_path, mode=0o777, exist_ok=True)
67 os.makedirs(hooks_path, mode=HOOKS_DIR_MODE, exist_ok=True)
55 else:
68 # If it exists, change the directory's mode to the specified mode
56 # If it exists, change the directory's mode to the specified mode
69 set_permissions_if_needed(hooks_path, perms=HOOKS_DIR_MODE)
57 os.chmod(hooks_path, mode=0o777)
58
70
59 tmpl_post = pkg_resources.resource_string(
71 tmpl_post = pkg_resources.resource_string(
60 'vcsserver', '/'.join(
72 'vcsserver', '/'.join(
61 ('hook_utils', 'hook_templates', 'git_post_receive.py.tmpl')))
73 ('hook_utils', 'hook_templates', 'git_post_receive.py.tmpl')))
62 tmpl_pre = pkg_resources.resource_string(
74 tmpl_pre = pkg_resources.resource_string(
63 'vcsserver', '/'.join(
75 'vcsserver', '/'.join(
64 ('hook_utils', 'hook_templates', 'git_pre_receive.py.tmpl')))
76 ('hook_utils', 'hook_templates', 'git_pre_receive.py.tmpl')))
65
77
66 path = '' # not used for now
78 path = '' # not used for now
67 timestamp = datetime.datetime.utcnow().isoformat()
79 timestamp = datetime.datetime.utcnow().isoformat()
68
80
69 for h_type, template in [('pre', tmpl_pre), ('post', tmpl_post)]:
81 for h_type, template in [('pre', tmpl_pre), ('post', tmpl_post)]:
70 log.debug('Installing git hook in repo %s', repo_path)
82 log.debug('Installing git hook in repo %s', repo_path)
71 _hook_file = os.path.join(hooks_path, f'{h_type}-receive')
83 _hook_file = os.path.join(hooks_path, f'{h_type}-receive')
72 _rhodecode_hook = check_rhodecode_hook(_hook_file)
84 _rhodecode_hook = check_rhodecode_hook(_hook_file)
73
85
74 if _rhodecode_hook or force_create:
86 if _rhodecode_hook or force_create:
75 log.debug('writing git %s hook file at %s !', h_type, _hook_file)
87 log.debug('writing git %s hook file at %s !', h_type, _hook_file)
76 try:
88 try:
77 with open(_hook_file, 'wb') as f:
89 with open(_hook_file, 'wb') as f:
78 template = template.replace(b'_TMPL_', safe_bytes(vcsserver.__version__))
90 template = template.replace(b'_TMPL_', safe_bytes(vcsserver.__version__))
79 template = template.replace(b'_DATE_', safe_bytes(timestamp))
91 template = template.replace(b'_DATE_', safe_bytes(timestamp))
80 template = template.replace(b'_ENV_', safe_bytes(executable))
92 template = template.replace(b'_ENV_', safe_bytes(executable))
81 template = template.replace(b'_PATH_', safe_bytes(path))
93 template = template.replace(b'_PATH_', safe_bytes(path))
82 f.write(template)
94 f.write(template)
83 os.chmod(_hook_file, 0o755)
95 set_permissions_if_needed(_hook_file, perms=HOOKS_FILE_MODE)
84 except OSError:
96 except OSError:
85 log.exception('error writing hook file %s', _hook_file)
97 log.exception('error writing hook file %s', _hook_file)
86 else:
98 else:
87 log.debug('skipping writing hook file')
99 log.debug('skipping writing hook file')
88
100
89 return True
101 return True
90
102
91
103
92 def get_svn_hooks_path(repo_path):
104 def get_svn_hooks_path(repo_path):
93 hooks_path = os.path.join(repo_path, 'hooks')
105 hooks_path = os.path.join(repo_path, 'hooks')
94
106
95 return hooks_path
107 return hooks_path
96
108
97
109
98 def install_svn_hooks(repo_path, executable=None, force_create=False):
110 def install_svn_hooks(repo_path, executable=None, force_create=False):
99 """
111 """
100 Creates RhodeCode hooks inside a svn repository
112 Creates RhodeCode hooks inside a svn repository
101
113
102 :param repo_path: path to repository
114 :param repo_path: path to repository
103 :param executable: binary executable to put in the hooks
115 :param executable: binary executable to put in the hooks
104 :param force_create: Create even if same name hook exists
116 :param force_create: Create even if same name hook exists
105 """
117 """
106 executable = executable or sys.executable
118 executable = executable or sys.executable
107 hooks_path = get_svn_hooks_path(repo_path)
119 hooks_path = get_svn_hooks_path(repo_path)
108 if not os.path.isdir(hooks_path):
120 if not os.path.isdir(hooks_path):
109 os.makedirs(hooks_path, mode=0o777, exist_ok=True)
121 os.makedirs(hooks_path, mode=0o777, exist_ok=True)
110
122
111 tmpl_post = pkg_resources.resource_string(
123 tmpl_post = pkg_resources.resource_string(
112 'vcsserver', '/'.join(
124 'vcsserver', '/'.join(
113 ('hook_utils', 'hook_templates', 'svn_post_commit_hook.py.tmpl')))
125 ('hook_utils', 'hook_templates', 'svn_post_commit_hook.py.tmpl')))
114 tmpl_pre = pkg_resources.resource_string(
126 tmpl_pre = pkg_resources.resource_string(
115 'vcsserver', '/'.join(
127 'vcsserver', '/'.join(
116 ('hook_utils', 'hook_templates', 'svn_pre_commit_hook.py.tmpl')))
128 ('hook_utils', 'hook_templates', 'svn_pre_commit_hook.py.tmpl')))
117
129
118 path = '' # not used for now
130 path = '' # not used for now
119 timestamp = datetime.datetime.utcnow().isoformat()
131 timestamp = datetime.datetime.utcnow().isoformat()
120
132
121 for h_type, template in [('pre', tmpl_pre), ('post', tmpl_post)]:
133 for h_type, template in [('pre', tmpl_pre), ('post', tmpl_post)]:
122 log.debug('Installing svn hook in repo %s', repo_path)
134 log.debug('Installing svn hook in repo %s', repo_path)
123 _hook_file = os.path.join(hooks_path, f'{h_type}-commit')
135 _hook_file = os.path.join(hooks_path, f'{h_type}-commit')
124 _rhodecode_hook = check_rhodecode_hook(_hook_file)
136 _rhodecode_hook = check_rhodecode_hook(_hook_file)
125
137
126 if _rhodecode_hook or force_create:
138 if _rhodecode_hook or force_create:
127 log.debug('writing svn %s hook file at %s !', h_type, _hook_file)
139 log.debug('writing svn %s hook file at %s !', h_type, _hook_file)
128
140
129 try:
141 try:
130 with open(_hook_file, 'wb') as f:
142 with open(_hook_file, 'wb') as f:
131 template = template.replace(b'_TMPL_', safe_bytes(vcsserver.__version__))
143 template = template.replace(b'_TMPL_', safe_bytes(vcsserver.__version__))
132 template = template.replace(b'_DATE_', safe_bytes(timestamp))
144 template = template.replace(b'_DATE_', safe_bytes(timestamp))
133 template = template.replace(b'_ENV_', safe_bytes(executable))
145 template = template.replace(b'_ENV_', safe_bytes(executable))
134 template = template.replace(b'_PATH_', safe_bytes(path))
146 template = template.replace(b'_PATH_', safe_bytes(path))
135
147
136 f.write(template)
148 f.write(template)
137 os.chmod(_hook_file, 0o755)
149 os.chmod(_hook_file, 0o755)
138 except OSError:
150 except OSError:
139 log.exception('error writing hook file %s', _hook_file)
151 log.exception('error writing hook file %s', _hook_file)
140 else:
152 else:
141 log.debug('skipping writing hook file')
153 log.debug('skipping writing hook file')
142
154
143 return True
155 return True
144
156
145
157
146 def get_version_from_hook(hook_path):
158 def get_version_from_hook(hook_path):
147 version = b''
159 version = b''
148 hook_content = read_hook_content(hook_path)
160 hook_content = read_hook_content(hook_path)
149 matches = re.search(rb'RC_HOOK_VER\s*=\s*(.*)', hook_content)
161 matches = re.search(rb'RC_HOOK_VER\s*=\s*(.*)', hook_content)
150 if matches:
162 if matches:
151 try:
163 try:
152 version = matches.groups()[0]
164 version = matches.groups()[0]
153 log.debug('got version %s from hooks.', version)
165 log.debug('got version %s from hooks.', version)
154 except Exception:
166 except Exception:
155 log.exception("Exception while reading the hook version.")
167 log.exception("Exception while reading the hook version.")
156 return version.replace(b"'", b"")
168 return version.replace(b"'", b"")
157
169
158
170
159 def check_rhodecode_hook(hook_path):
171 def check_rhodecode_hook(hook_path):
160 """
172 """
161 Check if the hook was created by RhodeCode
173 Check if the hook was created by RhodeCode
162 """
174 """
163 if not os.path.exists(hook_path):
175 if not os.path.exists(hook_path):
164 return True
176 return True
165
177
166 log.debug('hook exists, checking if it is from RhodeCode')
178 log.debug('hook exists, checking if it is from RhodeCode')
167
179
168 version = get_version_from_hook(hook_path)
180 version = get_version_from_hook(hook_path)
169 if version:
181 if version:
170 return True
182 return True
171
183
172 return False
184 return False
173
185
174
186
175 def read_hook_content(hook_path) -> bytes:
187 def read_hook_content(hook_path) -> bytes:
176 content = b''
188 content = b''
177 if os.path.isfile(hook_path):
189 if os.path.isfile(hook_path):
178 with open(hook_path, 'rb') as f:
190 with open(hook_path, 'rb') as f:
179 content = f.read()
191 content = f.read()
180 return content
192 return content
181
193
182
194
183 def get_git_pre_hook_version(repo_path, bare):
195 def get_git_pre_hook_version(repo_path, bare):
184 hooks_path = get_git_hooks_path(repo_path, bare)
196 hooks_path = get_git_hooks_path(repo_path, bare)
185 _hook_file = os.path.join(hooks_path, 'pre-receive')
197 _hook_file = os.path.join(hooks_path, 'pre-receive')
186 version = get_version_from_hook(_hook_file)
198 version = get_version_from_hook(_hook_file)
187 return version
199 return version
188
200
189
201
190 def get_git_post_hook_version(repo_path, bare):
202 def get_git_post_hook_version(repo_path, bare):
191 hooks_path = get_git_hooks_path(repo_path, bare)
203 hooks_path = get_git_hooks_path(repo_path, bare)
192 _hook_file = os.path.join(hooks_path, 'post-receive')
204 _hook_file = os.path.join(hooks_path, 'post-receive')
193 version = get_version_from_hook(_hook_file)
205 version = get_version_from_hook(_hook_file)
194 return version
206 return version
195
207
196
208
197 def get_svn_pre_hook_version(repo_path):
209 def get_svn_pre_hook_version(repo_path):
198 hooks_path = get_svn_hooks_path(repo_path)
210 hooks_path = get_svn_hooks_path(repo_path)
199 _hook_file = os.path.join(hooks_path, 'pre-commit')
211 _hook_file = os.path.join(hooks_path, 'pre-commit')
200 version = get_version_from_hook(_hook_file)
212 version = get_version_from_hook(_hook_file)
201 return version
213 return version
202
214
203
215
204 def get_svn_post_hook_version(repo_path):
216 def get_svn_post_hook_version(repo_path):
205 hooks_path = get_svn_hooks_path(repo_path)
217 hooks_path = get_svn_hooks_path(repo_path)
206 _hook_file = os.path.join(hooks_path, 'post-commit')
218 _hook_file = os.path.join(hooks_path, 'post-commit')
207 version = get_version_from_hook(_hook_file)
219 version = get_version_from_hook(_hook_file)
208 return version
220 return version
@@ -1,206 +1,289 b''
1 # RhodeCode VCSServer provides access to different vcs backends via network.
1 # RhodeCode VCSServer provides access to different vcs backends via network.
2 # Copyright (C) 2014-2023 RhodeCode GmbH
2 # Copyright (C) 2014-2023 RhodeCode GmbH
3 #
3 #
4 # This program is free software; you can redistribute it and/or modify
4 # This program is free software; you can redistribute it and/or modify
5 # it under the terms of the GNU General Public License as published by
5 # it under the terms of the GNU General Public License as published by
6 # the Free Software Foundation; either version 3 of the License, or
6 # the Free Software Foundation; either version 3 of the License, or
7 # (at your option) any later version.
7 # (at your option) any later version.
8 #
8 #
9 # This program is distributed in the hope that it will be useful,
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
12 # GNU General Public License for more details.
13 #
13 #
14 # You should have received a copy of the GNU General Public License
14 # You should have received a copy of the GNU General Public License
15 # along with this program; if not, write to the Free Software Foundation,
15 # along with this program; if not, write to the Free Software Foundation,
16 # Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
16 # Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17
17
18 import os
18 import os
19 import sys
19 import sys
20 import stat
20 import stat
21 import pytest
21 import pytest
22 import vcsserver
22 import vcsserver
23 import tempfile
23 import tempfile
24 from vcsserver import hook_utils
24 from vcsserver import hook_utils
25 from vcsserver.hook_utils import set_permissions_if_needed, HOOKS_DIR_MODE, HOOKS_FILE_MODE
25 from vcsserver.tests.fixture import no_newline_id_generator
26 from vcsserver.tests.fixture import no_newline_id_generator
26 from vcsserver.str_utils import safe_bytes, safe_str
27 from vcsserver.str_utils import safe_bytes
27 from vcsserver.utils import AttributeDict
28 from vcsserver.utils import AttributeDict
28
29
29
30
30 class TestCheckRhodecodeHook:
31 class TestCheckRhodecodeHook:
31
32
32 def test_returns_false_when_hook_file_is_wrong_found(self, tmpdir):
33 def test_returns_false_when_hook_file_is_wrong_found(self, tmpdir):
33 hook = os.path.join(str(tmpdir), 'fake_hook_file.py')
34 hook = os.path.join(str(tmpdir), 'fake_hook_file.py')
34 with open(hook, 'wb') as f:
35 with open(hook, 'wb') as f:
35 f.write(b'dummy test')
36 f.write(b'dummy test')
36 result = hook_utils.check_rhodecode_hook(hook)
37 result = hook_utils.check_rhodecode_hook(hook)
37 assert result is False
38 assert result is False
38
39
39 def test_returns_true_when_no_hook_file_found(self, tmpdir):
40 def test_returns_true_when_no_hook_file_found(self, tmpdir):
40 hook = os.path.join(str(tmpdir), 'fake_hook_file_not_existing.py')
41 hook = os.path.join(str(tmpdir), 'fake_hook_file_not_existing.py')
41 result = hook_utils.check_rhodecode_hook(hook)
42 result = hook_utils.check_rhodecode_hook(hook)
42 assert result
43 assert result
43
44
44 @pytest.mark.parametrize("file_content, expected_result", [
45 @pytest.mark.parametrize("file_content, expected_result", [
45 ("RC_HOOK_VER = '3.3.3'\n", True),
46 ("RC_HOOK_VER = '3.3.3'\n", True),
46 ("RC_HOOK = '3.3.3'\n", False),
47 ("RC_HOOK = '3.3.3'\n", False),
47 ], ids=no_newline_id_generator)
48 ], ids=no_newline_id_generator)
48 def test_signatures(self, file_content, expected_result, tmpdir):
49 def test_signatures(self, file_content, expected_result, tmpdir):
49 hook = os.path.join(str(tmpdir), 'fake_hook_file_1.py')
50 hook = os.path.join(str(tmpdir), 'fake_hook_file_1.py')
50 with open(hook, 'wb') as f:
51 with open(hook, 'wb') as f:
51 f.write(safe_bytes(file_content))
52 f.write(safe_bytes(file_content))
52
53
53 result = hook_utils.check_rhodecode_hook(hook)
54 result = hook_utils.check_rhodecode_hook(hook)
54
55
55 assert result is expected_result
56 assert result is expected_result
56
57
57
58
58 class BaseInstallHooks:
59 class BaseInstallHooks:
59 HOOK_FILES = ()
60 HOOK_FILES = ()
60
61
62 def _check_hook_file_dir_mode(self, file_path):
63 dir_path = os.path.dirname(file_path)
64 assert os.path.exists(dir_path), f'dir {file_path} missing'
65 stat_info = os.stat(dir_path)
66
67 file_mode = stat.S_IMODE(stat_info.st_mode)
68 expected_mode = int(HOOKS_DIR_MODE)
69 assert expected_mode == file_mode, f'expected mode: {oct(expected_mode)} got: {oct(file_mode)} for {dir_path}'
70
61 def _check_hook_file_mode(self, file_path):
71 def _check_hook_file_mode(self, file_path):
62 assert os.path.exists(file_path), f'path {file_path} missing'
72 assert os.path.exists(file_path), f'path {file_path} missing'
63 stat_info = os.stat(file_path)
73 stat_info = os.stat(file_path)
64
74
65 file_mode = stat.S_IMODE(stat_info.st_mode)
75 file_mode = stat.S_IMODE(stat_info.st_mode)
66 expected_mode = int('755', 8)
76 expected_mode = int(HOOKS_FILE_MODE)
67 assert expected_mode == file_mode
77 assert expected_mode == file_mode, f'expected mode: {oct(expected_mode)} got: {oct(file_mode)} for {file_path}'
68
78
69 def _check_hook_file_content(self, file_path, executable):
79 def _check_hook_file_content(self, file_path, executable):
70 executable = executable or sys.executable
80 executable = executable or sys.executable
71 with open(file_path, 'rt') as hook_file:
81 with open(file_path, 'rt') as hook_file:
72 content = hook_file.read()
82 content = hook_file.read()
73
83
74 expected_env = '#!{}'.format(executable)
84 expected_env = '#!{}'.format(executable)
75 expected_rc_version = "\nRC_HOOK_VER = '{}'\n".format(vcsserver.__version__)
85 expected_rc_version = "\nRC_HOOK_VER = '{}'\n".format(vcsserver.__version__)
76 assert content.strip().startswith(expected_env)
86 assert content.strip().startswith(expected_env)
77 assert expected_rc_version in content
87 assert expected_rc_version in content
78
88
79 def _create_fake_hook(self, file_path, content):
89 def _create_fake_hook(self, file_path, content):
80 with open(file_path, 'w') as hook_file:
90 with open(file_path, 'w') as hook_file:
81 hook_file.write(content)
91 hook_file.write(content)
82
92
83 def create_dummy_repo(self, repo_type):
93 def create_dummy_repo(self, repo_type):
84 tmpdir = tempfile.mkdtemp()
94 tmpdir = tempfile.mkdtemp()
85 repo = AttributeDict()
95 repo = AttributeDict()
86 if repo_type == 'git':
96 if repo_type == 'git':
87 repo.path = os.path.join(tmpdir, 'test_git_hooks_installation_repo')
97 repo.path = os.path.join(tmpdir, 'test_git_hooks_installation_repo')
88 os.makedirs(repo.path)
98 os.makedirs(repo.path)
89 os.makedirs(os.path.join(repo.path, 'hooks'))
99 os.makedirs(os.path.join(repo.path, 'hooks'))
90 repo.bare = True
100 repo.bare = True
91
101
92 elif repo_type == 'svn':
102 elif repo_type == 'svn':
93 repo.path = os.path.join(tmpdir, 'test_svn_hooks_installation_repo')
103 repo.path = os.path.join(tmpdir, 'test_svn_hooks_installation_repo')
94 os.makedirs(repo.path)
104 os.makedirs(repo.path)
95 os.makedirs(os.path.join(repo.path, 'hooks'))
105 os.makedirs(os.path.join(repo.path, 'hooks'))
96
106
97 return repo
107 return repo
98
108
99 def check_hooks(self, repo_path, repo_bare=True):
109 def check_hooks(self, repo_path, repo_bare=True):
100 for file_name in self.HOOK_FILES:
110 for file_name in self.HOOK_FILES:
101 if repo_bare:
111 if repo_bare:
102 file_path = os.path.join(repo_path, 'hooks', file_name)
112 file_path = os.path.join(repo_path, 'hooks', file_name)
103 else:
113 else:
104 file_path = os.path.join(repo_path, '.git', 'hooks', file_name)
114 file_path = os.path.join(repo_path, '.git', 'hooks', file_name)
115
116 self._check_hook_file_dir_mode(file_path)
105 self._check_hook_file_mode(file_path)
117 self._check_hook_file_mode(file_path)
106 self._check_hook_file_content(file_path, sys.executable)
118 self._check_hook_file_content(file_path, sys.executable)
107
119
108
120
109 class TestInstallGitHooks(BaseInstallHooks):
121 class TestInstallGitHooks(BaseInstallHooks):
110 HOOK_FILES = ('pre-receive', 'post-receive')
122 HOOK_FILES = ('pre-receive', 'post-receive')
111
123
112 def test_hooks_are_installed(self):
124 def test_hooks_are_installed(self):
113 repo = self.create_dummy_repo('git')
125 repo = self.create_dummy_repo('git')
114 result = hook_utils.install_git_hooks(repo.path, repo.bare)
126 result = hook_utils.install_git_hooks(repo.path, repo.bare)
115 assert result
127 assert result
116 self.check_hooks(repo.path, repo.bare)
128 self.check_hooks(repo.path, repo.bare)
117
129
118 def test_hooks_are_replaced(self):
130 def test_hooks_are_replaced(self):
119 repo = self.create_dummy_repo('git')
131 repo = self.create_dummy_repo('git')
120 hooks_path = os.path.join(repo.path, 'hooks')
132 hooks_path = os.path.join(repo.path, 'hooks')
121 for file_path in [os.path.join(hooks_path, f) for f in self.HOOK_FILES]:
133 for file_path in [os.path.join(hooks_path, f) for f in self.HOOK_FILES]:
122 self._create_fake_hook(
134 self._create_fake_hook(
123 file_path, content="RC_HOOK_VER = 'abcde'\n")
135 file_path, content="RC_HOOK_VER = 'abcde'\n")
124
136
125 result = hook_utils.install_git_hooks(repo.path, repo.bare)
137 result = hook_utils.install_git_hooks(repo.path, repo.bare)
126 assert result
138 assert result
127 self.check_hooks(repo.path, repo.bare)
139 self.check_hooks(repo.path, repo.bare)
128
140
129 def test_non_rc_hooks_are_not_replaced(self):
141 def test_non_rc_hooks_are_not_replaced(self):
130 repo = self.create_dummy_repo('git')
142 repo = self.create_dummy_repo('git')
131 hooks_path = os.path.join(repo.path, 'hooks')
143 hooks_path = os.path.join(repo.path, 'hooks')
132 non_rc_content = 'echo "non rc hook"\n'
144 non_rc_content = 'echo "non rc hook"\n'
133 for file_path in [os.path.join(hooks_path, f) for f in self.HOOK_FILES]:
145 for file_path in [os.path.join(hooks_path, f) for f in self.HOOK_FILES]:
134 self._create_fake_hook(
146 self._create_fake_hook(
135 file_path, content=non_rc_content)
147 file_path, content=non_rc_content)
136
148
137 result = hook_utils.install_git_hooks(repo.path, repo.bare)
149 result = hook_utils.install_git_hooks(repo.path, repo.bare)
138 assert result
150 assert result
139
151
140 for file_path in [os.path.join(hooks_path, f) for f in self.HOOK_FILES]:
152 for file_path in [os.path.join(hooks_path, f) for f in self.HOOK_FILES]:
141 with open(file_path, 'rt') as hook_file:
153 with open(file_path, 'rt') as hook_file:
142 content = hook_file.read()
154 content = hook_file.read()
143 assert content == non_rc_content
155 assert content == non_rc_content
144
156
145 def test_non_rc_hooks_are_replaced_with_force_flag(self):
157 def test_non_rc_hooks_are_replaced_with_force_flag(self):
146 repo = self.create_dummy_repo('git')
158 repo = self.create_dummy_repo('git')
147 hooks_path = os.path.join(repo.path, 'hooks')
159 hooks_path = os.path.join(repo.path, 'hooks')
148 non_rc_content = 'echo "non rc hook"\n'
160 non_rc_content = 'echo "non rc hook"\n'
149 for file_path in [os.path.join(hooks_path, f) for f in self.HOOK_FILES]:
161 for file_path in [os.path.join(hooks_path, f) for f in self.HOOK_FILES]:
150 self._create_fake_hook(
162 self._create_fake_hook(
151 file_path, content=non_rc_content)
163 file_path, content=non_rc_content)
152
164
153 result = hook_utils.install_git_hooks(
165 result = hook_utils.install_git_hooks(
154 repo.path, repo.bare, force_create=True)
166 repo.path, repo.bare, force_create=True)
155 assert result
167 assert result
156 self.check_hooks(repo.path, repo.bare)
168 self.check_hooks(repo.path, repo.bare)
157
169
158
170
159 class TestInstallSvnHooks(BaseInstallHooks):
171 class TestInstallSvnHooks(BaseInstallHooks):
160 HOOK_FILES = ('pre-commit', 'post-commit')
172 HOOK_FILES = ('pre-commit', 'post-commit')
161
173
162 def test_hooks_are_installed(self):
174 def test_hooks_are_installed(self):
163 repo = self.create_dummy_repo('svn')
175 repo = self.create_dummy_repo('svn')
164 result = hook_utils.install_svn_hooks(repo.path)
176 result = hook_utils.install_svn_hooks(repo.path)
165 assert result
177 assert result
166 self.check_hooks(repo.path)
178 self.check_hooks(repo.path)
167
179
168 def test_hooks_are_replaced(self):
180 def test_hooks_are_replaced(self):
169 repo = self.create_dummy_repo('svn')
181 repo = self.create_dummy_repo('svn')
170 hooks_path = os.path.join(repo.path, 'hooks')
182 hooks_path = os.path.join(repo.path, 'hooks')
171 for file_path in [os.path.join(hooks_path, f) for f in self.HOOK_FILES]:
183 for file_path in [os.path.join(hooks_path, f) for f in self.HOOK_FILES]:
172 self._create_fake_hook(
184 self._create_fake_hook(
173 file_path, content="RC_HOOK_VER = 'abcde'\n")
185 file_path, content="RC_HOOK_VER = 'abcde'\n")
174
186
175 result = hook_utils.install_svn_hooks(repo.path)
187 result = hook_utils.install_svn_hooks(repo.path)
176 assert result
188 assert result
177 self.check_hooks(repo.path)
189 self.check_hooks(repo.path)
178
190
179 def test_non_rc_hooks_are_not_replaced(self):
191 def test_non_rc_hooks_are_not_replaced(self):
180 repo = self.create_dummy_repo('svn')
192 repo = self.create_dummy_repo('svn')
181 hooks_path = os.path.join(repo.path, 'hooks')
193 hooks_path = os.path.join(repo.path, 'hooks')
182 non_rc_content = 'echo "non rc hook"\n'
194 non_rc_content = 'echo "non rc hook"\n'
183 for file_path in [os.path.join(hooks_path, f) for f in self.HOOK_FILES]:
195 for file_path in [os.path.join(hooks_path, f) for f in self.HOOK_FILES]:
184 self._create_fake_hook(
196 self._create_fake_hook(
185 file_path, content=non_rc_content)
197 file_path, content=non_rc_content)
186
198
187 result = hook_utils.install_svn_hooks(repo.path)
199 result = hook_utils.install_svn_hooks(repo.path)
188 assert result
200 assert result
189
201
190 for file_path in [os.path.join(hooks_path, f) for f in self.HOOK_FILES]:
202 for file_path in [os.path.join(hooks_path, f) for f in self.HOOK_FILES]:
191 with open(file_path, 'rt') as hook_file:
203 with open(file_path, 'rt') as hook_file:
192 content = hook_file.read()
204 content = hook_file.read()
193 assert content == non_rc_content
205 assert content == non_rc_content
194
206
195 def test_non_rc_hooks_are_replaced_with_force_flag(self):
207 def test_non_rc_hooks_are_replaced_with_force_flag(self):
196 repo = self.create_dummy_repo('svn')
208 repo = self.create_dummy_repo('svn')
197 hooks_path = os.path.join(repo.path, 'hooks')
209 hooks_path = os.path.join(repo.path, 'hooks')
198 non_rc_content = 'echo "non rc hook"\n'
210 non_rc_content = 'echo "non rc hook"\n'
199 for file_path in [os.path.join(hooks_path, f) for f in self.HOOK_FILES]:
211 for file_path in [os.path.join(hooks_path, f) for f in self.HOOK_FILES]:
200 self._create_fake_hook(
212 self._create_fake_hook(
201 file_path, content=non_rc_content)
213 file_path, content=non_rc_content)
202
214
203 result = hook_utils.install_svn_hooks(
215 result = hook_utils.install_svn_hooks(
204 repo.path, force_create=True)
216 repo.path, force_create=True)
205 assert result
217 assert result
206 self.check_hooks(repo.path, )
218 self.check_hooks(repo.path, )
219
220
221 def create_test_file(filename):
222 """Utility function to create a test file."""
223 with open(filename, 'w') as f:
224 f.write("Test file")
225
226
227 def remove_test_file(filename):
228 """Utility function to remove a test file."""
229 if os.path.exists(filename):
230 os.remove(filename)
231
232
233 @pytest.fixture
234 def test_file():
235 filename = 'test_file.txt'
236 create_test_file(filename)
237 yield filename
238 remove_test_file(filename)
239
240
241 def test_increase_permissions(test_file):
242 # Set initial lower permissions
243 initial_perms = 0o644
244 os.chmod(test_file, initial_perms)
245
246 # Set higher permissions
247 new_perms = 0o666
248 set_permissions_if_needed(test_file, new_perms)
249
250 # Check if permissions were updated
251 assert (os.stat(test_file).st_mode & 0o777) == new_perms
252
253
254 def test_no_permission_change_needed(test_file):
255 # Set initial permissions
256 initial_perms = 0o666
257 os.chmod(test_file, initial_perms)
258
259 # Attempt to set the same permissions
260 set_permissions_if_needed(test_file, initial_perms)
261
262 # Check if permissions were unchanged
263 assert (os.stat(test_file).st_mode & 0o777) == initial_perms
264
265
266 def test_no_permission_reduction(test_file):
267 # Set initial higher permissions
268 initial_perms = 0o666
269 os.chmod(test_file, initial_perms)
270
271 # Attempt to set lower permissions
272 lower_perms = 0o644
273 set_permissions_if_needed(test_file, lower_perms)
274
275 # Check if permissions were not reduced
276 assert (os.stat(test_file).st_mode & 0o777) == initial_perms
277
278
279 def test_no_permission_reduction_when_on_777(test_file):
280 # Set initial higher permissions
281 initial_perms = 0o777
282 os.chmod(test_file, initial_perms)
283
284 # Attempt to set lower permissions
285 lower_perms = 0o755
286 set_permissions_if_needed(test_file, lower_perms)
287
288 # Check if permissions were not reduced
289 assert (os.stat(test_file).st_mode & 0o777) == initial_perms
General Comments 0
You need to be logged in to leave comments. Login now