Ok, isso é um teste. Não use maiusculas.
Show More
@@ -1,208 +1,220 b'' | |||||
1 | # RhodeCode VCSServer provides access to different vcs backends via network. |
|
1 | # RhodeCode VCSServer provides access to different vcs backends via network. | |
2 | # Copyright (C) 2014-2023 RhodeCode GmbH |
|
2 | # Copyright (C) 2014-2023 RhodeCode GmbH | |
3 | # |
|
3 | # | |
4 | # This program is free software; you can redistribute it and/or modify |
|
4 | # This program is free software; you can redistribute it and/or modify | |
5 | # it under the terms of the GNU General Public License as published by |
|
5 | # it under the terms of the GNU General Public License as published by | |
6 | # the Free Software Foundation; either version 3 of the License, or |
|
6 | # the Free Software Foundation; either version 3 of the License, or | |
7 | # (at your option) any later version. |
|
7 | # (at your option) any later version. | |
8 | # |
|
8 | # | |
9 | # This program is distributed in the hope that it will be useful, |
|
9 | # This program is distributed in the hope that it will be useful, | |
10 | # but WITHOUT ANY WARRANTY; without even the implied warranty of |
|
10 | # but WITHOUT ANY WARRANTY; without even the implied warranty of | |
11 | # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
|
11 | # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | |
12 | # GNU General Public License for more details. |
|
12 | # GNU General Public License for more details. | |
13 | # |
|
13 | # | |
14 | # You should have received a copy of the GNU General Public License |
|
14 | # You should have received a copy of the GNU General Public License | |
15 | # along with this program; if not, write to the Free Software Foundation, |
|
15 | # along with this program; if not, write to the Free Software Foundation, | |
16 | # Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA |
|
16 | # Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA | |
17 |
|
17 | |||
18 | import re |
|
18 | import re | |
19 | import os |
|
19 | import os | |
20 | import sys |
|
20 | import sys | |
21 | import datetime |
|
21 | import datetime | |
22 | import logging |
|
22 | import logging | |
23 | import pkg_resources |
|
23 | import pkg_resources | |
24 |
|
24 | |||
25 | import vcsserver |
|
25 | import vcsserver | |
26 | from vcsserver.str_utils import safe_bytes |
|
26 | from vcsserver.str_utils import safe_bytes | |
27 |
|
27 | |||
28 | log = logging.getLogger(__name__) |
|
28 | log = logging.getLogger(__name__) | |
29 |
|
29 | |||
|
30 |
HOOKS_DIR_MODE = 0o755
Certo. Feito |
|||
|
31 | HOOKS_FILE_MODE = 0o755 | |||
|
32 | ||||
|
33 | ||||
|
34 | def set_permissions_if_needed(path_to_check, perms: oct): | |||
|
35 | # Get current permissions | |||
|
36 | current_permissions = os.stat(path_to_check).st_mode & 0o777 # Extract permission bits | |||
|
37 | ||||
|
38 | # Check if current permissions are lower than required | |||
|
39 | if current_permissions < int(perms): | |||
|
40 | # Change the permissions if they are lower than required | |||
|
41 | os.chmod(path_to_check, perms) | |||
|
42 | ||||
30 |
|
43 | |||
31 | def get_git_hooks_path(repo_path, bare): |
|
44 | def get_git_hooks_path(repo_path, bare): | |
32 | hooks_path = os.path.join(repo_path, 'hooks') |
|
45 | hooks_path = os.path.join(repo_path, 'hooks') | |
33 | if not bare: |
|
46 | if not bare: | |
34 | hooks_path = os.path.join(repo_path, '.git', 'hooks') |
|
47 | hooks_path = os.path.join(repo_path, '.git', 'hooks') | |
35 |
|
48 | |||
36 | return hooks_path |
|
49 | return hooks_path | |
37 |
|
50 | |||
38 |
|
51 | |||
39 | def install_git_hooks(repo_path, bare, executable=None, force_create=False): |
|
52 | def install_git_hooks(repo_path, bare, executable=None, force_create=False): | |
40 | """ |
|
53 | """ | |
41 | Creates a RhodeCode hook inside a git repository |
|
54 | Creates a RhodeCode hook inside a git repository | |
42 |
|
55 | |||
43 | :param repo_path: path to repository |
|
56 | :param repo_path: path to repository | |
44 | :param bare: defines if repository is considered a bare git repo |
|
57 | :param bare: defines if repository is considered a bare git repo | |
45 | :param executable: binary executable to put in the hooks |
|
58 | :param executable: binary executable to put in the hooks | |
46 | :param force_create: Creates even if the same name hook exists |
|
59 | :param force_create: Creates even if the same name hook exists | |
47 | """ |
|
60 | """ | |
48 | executable = executable or sys.executable |
|
61 | executable = executable or sys.executable | |
49 | hooks_path = get_git_hooks_path(repo_path, bare) |
|
62 | hooks_path = get_git_hooks_path(repo_path, bare) | |
50 |
|
63 | |||
51 | # we always call it to ensure dir exists and it has a proper mode |
|
64 | # we always call it to ensure dir exists and it has a proper mode | |
52 | if not os.path.exists(hooks_path): |
|
65 | if not os.path.exists(hooks_path): | |
53 | # If it doesn't exist, create a new directory with the specified mode |
|
66 | # If it doesn't exist, create a new directory with the specified mode | |
54 |
os.makedirs(hooks_path, mode= |
|
67 | os.makedirs(hooks_path, mode=HOOKS_DIR_MODE, exist_ok=True) | |
55 | else: |
|
|||
56 |
|
|
68 | # If it exists, change the directory's mode to the specified mode | |
57 | os.chmod(hooks_path, mode=0o777) |
|
69 | set_permissions_if_needed(hooks_path, perms=HOOKS_DIR_MODE) | |
58 |
|
70 | |||
59 | tmpl_post = pkg_resources.resource_string( |
|
71 | tmpl_post = pkg_resources.resource_string( | |
60 | 'vcsserver', '/'.join( |
|
72 | 'vcsserver', '/'.join( | |
61 | ('hook_utils', 'hook_templates', 'git_post_receive.py.tmpl'))) |
|
73 | ('hook_utils', 'hook_templates', 'git_post_receive.py.tmpl'))) | |
62 | tmpl_pre = pkg_resources.resource_string( |
|
74 | tmpl_pre = pkg_resources.resource_string( | |
63 | 'vcsserver', '/'.join( |
|
75 | 'vcsserver', '/'.join( | |
64 | ('hook_utils', 'hook_templates', 'git_pre_receive.py.tmpl'))) |
|
76 | ('hook_utils', 'hook_templates', 'git_pre_receive.py.tmpl'))) | |
65 |
|
77 | |||
66 | path = '' # not used for now |
|
78 | path = '' # not used for now | |
67 | timestamp = datetime.datetime.utcnow().isoformat() |
|
79 | timestamp = datetime.datetime.utcnow().isoformat() | |
68 |
|
80 | |||
69 | for h_type, template in [('pre', tmpl_pre), ('post', tmpl_post)]: |
|
81 | for h_type, template in [('pre', tmpl_pre), ('post', tmpl_post)]: | |
70 | log.debug('Installing git hook in repo %s', repo_path) |
|
82 | log.debug('Installing git hook in repo %s', repo_path) | |
71 | _hook_file = os.path.join(hooks_path, f'{h_type}-receive') |
|
83 | _hook_file = os.path.join(hooks_path, f'{h_type}-receive') | |
72 | _rhodecode_hook = check_rhodecode_hook(_hook_file) |
|
84 | _rhodecode_hook = check_rhodecode_hook(_hook_file) | |
73 |
|
85 | |||
74 | if _rhodecode_hook or force_create: |
|
86 | if _rhodecode_hook or force_create: | |
75 | log.debug('writing git %s hook file at %s !', h_type, _hook_file) |
|
87 | log.debug('writing git %s hook file at %s !', h_type, _hook_file) | |
76 | try: |
|
88 | try: | |
77 | with open(_hook_file, 'wb') as f: |
|
89 | with open(_hook_file, 'wb') as f: | |
78 | template = template.replace(b'_TMPL_', safe_bytes(vcsserver.__version__)) |
|
90 | template = template.replace(b'_TMPL_', safe_bytes(vcsserver.__version__)) | |
79 | template = template.replace(b'_DATE_', safe_bytes(timestamp)) |
|
91 | template = template.replace(b'_DATE_', safe_bytes(timestamp)) | |
80 | template = template.replace(b'_ENV_', safe_bytes(executable)) |
|
92 | template = template.replace(b'_ENV_', safe_bytes(executable)) | |
81 | template = template.replace(b'_PATH_', safe_bytes(path)) |
|
93 | template = template.replace(b'_PATH_', safe_bytes(path)) | |
82 | f.write(template) |
|
94 | f.write(template) | |
83 | os.chmod(_hook_file, 0o755) |
|
95 | set_permissions_if_needed(_hook_file, perms=HOOKS_FILE_MODE) | |
84 | except OSError: |
|
96 | except OSError: | |
85 | log.exception('error writing hook file %s', _hook_file) |
|
97 | log.exception('error writing hook file %s', _hook_file) | |
86 | else: |
|
98 | else: | |
87 | log.debug('skipping writing hook file') |
|
99 | log.debug('skipping writing hook file') | |
88 |
|
100 | |||
89 | return True |
|
101 | return True | |
90 |
|
102 | |||
91 |
|
103 | |||
92 | def get_svn_hooks_path(repo_path): |
|
104 | def get_svn_hooks_path(repo_path): | |
93 | hooks_path = os.path.join(repo_path, 'hooks') |
|
105 | hooks_path = os.path.join(repo_path, 'hooks') | |
94 |
|
106 | |||
95 | return hooks_path |
|
107 | return hooks_path | |
96 |
|
108 | |||
97 |
|
109 | |||
98 | def install_svn_hooks(repo_path, executable=None, force_create=False): |
|
110 | def install_svn_hooks(repo_path, executable=None, force_create=False): | |
99 | """ |
|
111 | """ | |
100 | Creates RhodeCode hooks inside a svn repository |
|
112 | Creates RhodeCode hooks inside a svn repository | |
101 |
|
113 | |||
102 | :param repo_path: path to repository |
|
114 | :param repo_path: path to repository | |
103 | :param executable: binary executable to put in the hooks |
|
115 | :param executable: binary executable to put in the hooks | |
104 | :param force_create: Create even if same name hook exists |
|
116 | :param force_create: Create even if same name hook exists | |
105 | """ |
|
117 | """ | |
106 | executable = executable or sys.executable |
|
118 | executable = executable or sys.executable | |
107 | hooks_path = get_svn_hooks_path(repo_path) |
|
119 | hooks_path = get_svn_hooks_path(repo_path) | |
108 | if not os.path.isdir(hooks_path): |
|
120 | if not os.path.isdir(hooks_path): | |
109 | os.makedirs(hooks_path, mode=0o777, exist_ok=True) |
|
121 | os.makedirs(hooks_path, mode=0o777, exist_ok=True) | |
110 |
|
122 | |||
111 | tmpl_post = pkg_resources.resource_string( |
|
123 | tmpl_post = pkg_resources.resource_string( | |
112 | 'vcsserver', '/'.join( |
|
124 | 'vcsserver', '/'.join( | |
113 | ('hook_utils', 'hook_templates', 'svn_post_commit_hook.py.tmpl'))) |
|
125 | ('hook_utils', 'hook_templates', 'svn_post_commit_hook.py.tmpl'))) | |
114 | tmpl_pre = pkg_resources.resource_string( |
|
126 | tmpl_pre = pkg_resources.resource_string( | |
115 | 'vcsserver', '/'.join( |
|
127 | 'vcsserver', '/'.join( | |
116 | ('hook_utils', 'hook_templates', 'svn_pre_commit_hook.py.tmpl'))) |
|
128 | ('hook_utils', 'hook_templates', 'svn_pre_commit_hook.py.tmpl'))) | |
117 |
|
129 | |||
118 | path = '' # not used for now |
|
130 | path = '' # not used for now | |
119 | timestamp = datetime.datetime.utcnow().isoformat() |
|
131 | timestamp = datetime.datetime.utcnow().isoformat() | |
120 |
|
132 | |||
121 | for h_type, template in [('pre', tmpl_pre), ('post', tmpl_post)]: |
|
133 | for h_type, template in [('pre', tmpl_pre), ('post', tmpl_post)]: | |
122 | log.debug('Installing svn hook in repo %s', repo_path) |
|
134 | log.debug('Installing svn hook in repo %s', repo_path) | |
123 | _hook_file = os.path.join(hooks_path, f'{h_type}-commit') |
|
135 | _hook_file = os.path.join(hooks_path, f'{h_type}-commit') | |
124 | _rhodecode_hook = check_rhodecode_hook(_hook_file) |
|
136 | _rhodecode_hook = check_rhodecode_hook(_hook_file) | |
125 |
|
137 | |||
126 | if _rhodecode_hook or force_create: |
|
138 | if _rhodecode_hook or force_create: | |
127 | log.debug('writing svn %s hook file at %s !', h_type, _hook_file) |
|
139 | log.debug('writing svn %s hook file at %s !', h_type, _hook_file) | |
128 |
|
140 | |||
129 | try: |
|
141 | try: | |
130 | with open(_hook_file, 'wb') as f: |
|
142 | with open(_hook_file, 'wb') as f: | |
131 | template = template.replace(b'_TMPL_', safe_bytes(vcsserver.__version__)) |
|
143 | template = template.replace(b'_TMPL_', safe_bytes(vcsserver.__version__)) | |
132 | template = template.replace(b'_DATE_', safe_bytes(timestamp)) |
|
144 | template = template.replace(b'_DATE_', safe_bytes(timestamp)) | |
133 | template = template.replace(b'_ENV_', safe_bytes(executable)) |
|
145 | template = template.replace(b'_ENV_', safe_bytes(executable)) | |
134 | template = template.replace(b'_PATH_', safe_bytes(path)) |
|
146 | template = template.replace(b'_PATH_', safe_bytes(path)) | |
135 |
|
147 | |||
136 | f.write(template) |
|
148 | f.write(template) | |
137 | os.chmod(_hook_file, 0o755) |
|
149 | os.chmod(_hook_file, 0o755) | |
138 | except OSError: |
|
150 | except OSError: | |
139 | log.exception('error writing hook file %s', _hook_file) |
|
151 | log.exception('error writing hook file %s', _hook_file) | |
140 | else: |
|
152 | else: | |
141 | log.debug('skipping writing hook file') |
|
153 | log.debug('skipping writing hook file') | |
142 |
|
154 | |||
143 | return True |
|
155 | return True | |
144 |
|
156 | |||
145 |
|
157 | |||
146 | def get_version_from_hook(hook_path): |
|
158 | def get_version_from_hook(hook_path): | |
147 | version = b'' |
|
159 | version = b'' | |
148 | hook_content = read_hook_content(hook_path) |
|
160 | hook_content = read_hook_content(hook_path) | |
149 | matches = re.search(rb'RC_HOOK_VER\s*=\s*(.*)', hook_content) |
|
161 | matches = re.search(rb'RC_HOOK_VER\s*=\s*(.*)', hook_content) | |
150 | if matches: |
|
162 | if matches: | |
151 | try: |
|
163 | try: | |
152 | version = matches.groups()[0] |
|
164 | version = matches.groups()[0] | |
153 | log.debug('got version %s from hooks.', version) |
|
165 | log.debug('got version %s from hooks.', version) | |
154 | except Exception: |
|
166 | except Exception: | |
155 | log.exception("Exception while reading the hook version.") |
|
167 | log.exception("Exception while reading the hook version.") | |
156 | return version.replace(b"'", b"") |
|
168 | return version.replace(b"'", b"") | |
157 |
|
169 | |||
158 |
|
170 | |||
159 | def check_rhodecode_hook(hook_path): |
|
171 | def check_rhodecode_hook(hook_path): | |
160 | """ |
|
172 | """ | |
161 | Check if the hook was created by RhodeCode |
|
173 | Check if the hook was created by RhodeCode | |
162 | """ |
|
174 | """ | |
163 | if not os.path.exists(hook_path): |
|
175 | if not os.path.exists(hook_path): | |
164 | return True |
|
176 | return True | |
165 |
|
177 | |||
166 | log.debug('hook exists, checking if it is from RhodeCode') |
|
178 | log.debug('hook exists, checking if it is from RhodeCode') | |
167 |
|
179 | |||
168 | version = get_version_from_hook(hook_path) |
|
180 | version = get_version_from_hook(hook_path) | |
169 | if version: |
|
181 | if version: | |
170 | return True |
|
182 | return True | |
171 |
|
183 | |||
172 | return False |
|
184 | return False | |
173 |
|
185 | |||
174 |
|
186 | |||
175 | def read_hook_content(hook_path) -> bytes: |
|
187 | def read_hook_content(hook_path) -> bytes: | |
176 | content = b'' |
|
188 | content = b'' | |
177 | if os.path.isfile(hook_path): |
|
189 | if os.path.isfile(hook_path): | |
178 | with open(hook_path, 'rb') as f: |
|
190 | with open(hook_path, 'rb') as f: | |
179 | content = f.read() |
|
191 | content = f.read() | |
180 | return content |
|
192 | return content | |
181 |
|
193 | |||
182 |
|
194 | |||
183 | def get_git_pre_hook_version(repo_path, bare): |
|
195 | def get_git_pre_hook_version(repo_path, bare): | |
184 | hooks_path = get_git_hooks_path(repo_path, bare) |
|
196 | hooks_path = get_git_hooks_path(repo_path, bare) | |
185 | _hook_file = os.path.join(hooks_path, 'pre-receive') |
|
197 | _hook_file = os.path.join(hooks_path, 'pre-receive') | |
186 | version = get_version_from_hook(_hook_file) |
|
198 | version = get_version_from_hook(_hook_file) | |
187 | return version |
|
199 | return version | |
188 |
|
200 | |||
189 |
|
201 | |||
190 | def get_git_post_hook_version(repo_path, bare): |
|
202 | def get_git_post_hook_version(repo_path, bare): | |
191 | hooks_path = get_git_hooks_path(repo_path, bare) |
|
203 | hooks_path = get_git_hooks_path(repo_path, bare) | |
192 | _hook_file = os.path.join(hooks_path, 'post-receive') |
|
204 | _hook_file = os.path.join(hooks_path, 'post-receive') | |
193 | version = get_version_from_hook(_hook_file) |
|
205 | version = get_version_from_hook(_hook_file) | |
194 | return version |
|
206 | return version | |
195 |
|
207 | |||
196 |
|
208 | |||
197 | def get_svn_pre_hook_version(repo_path): |
|
209 | def get_svn_pre_hook_version(repo_path): | |
198 | hooks_path = get_svn_hooks_path(repo_path) |
|
210 | hooks_path = get_svn_hooks_path(repo_path) | |
199 | _hook_file = os.path.join(hooks_path, 'pre-commit') |
|
211 | _hook_file = os.path.join(hooks_path, 'pre-commit') | |
200 | version = get_version_from_hook(_hook_file) |
|
212 | version = get_version_from_hook(_hook_file) | |
201 | return version |
|
213 | return version | |
202 |
|
214 | |||
203 |
|
215 | |||
204 | def get_svn_post_hook_version(repo_path): |
|
216 | def get_svn_post_hook_version(repo_path): | |
205 | hooks_path = get_svn_hooks_path(repo_path) |
|
217 | hooks_path = get_svn_hooks_path(repo_path) | |
206 | _hook_file = os.path.join(hooks_path, 'post-commit') |
|
218 | _hook_file = os.path.join(hooks_path, 'post-commit') | |
207 | version = get_version_from_hook(_hook_file) |
|
219 | version = get_version_from_hook(_hook_file) | |
208 | return version |
|
220 | return version |
@@ -1,206 +1,289 b'' | |||||
1 | # RhodeCode VCSServer provides access to different vcs backends via network. |
|
1 | # RhodeCode VCSServer provides access to different vcs backends via network. | |
2 | # Copyright (C) 2014-2023 RhodeCode GmbH |
|
2 | # Copyright (C) 2014-2023 RhodeCode GmbH | |
3 | # |
|
3 | # | |
4 | # This program is free software; you can redistribute it and/or modify |
|
4 | # This program is free software; you can redistribute it and/or modify | |
5 | # it under the terms of the GNU General Public License as published by |
|
5 | # it under the terms of the GNU General Public License as published by | |
6 | # the Free Software Foundation; either version 3 of the License, or |
|
6 | # the Free Software Foundation; either version 3 of the License, or | |
7 | # (at your option) any later version. |
|
7 | # (at your option) any later version. | |
8 | # |
|
8 | # | |
9 | # This program is distributed in the hope that it will be useful, |
|
9 | # This program is distributed in the hope that it will be useful, | |
10 | # but WITHOUT ANY WARRANTY; without even the implied warranty of |
|
10 | # but WITHOUT ANY WARRANTY; without even the implied warranty of | |
11 | # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
|
11 | # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | |
12 | # GNU General Public License for more details. |
|
12 | # GNU General Public License for more details. | |
13 | # |
|
13 | # | |
14 | # You should have received a copy of the GNU General Public License |
|
14 | # You should have received a copy of the GNU General Public License | |
15 | # along with this program; if not, write to the Free Software Foundation, |
|
15 | # along with this program; if not, write to the Free Software Foundation, | |
16 | # Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA |
|
16 | # Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA | |
17 |
|
17 | |||
18 | import os |
|
18 | import os | |
19 | import sys |
|
19 | import sys | |
20 | import stat |
|
20 | import stat | |
21 | import pytest |
|
21 | import pytest | |
22 | import vcsserver |
|
22 | import vcsserver | |
23 | import tempfile |
|
23 | import tempfile | |
24 | from vcsserver import hook_utils |
|
24 | from vcsserver import hook_utils | |
|
25 | from vcsserver.hook_utils import set_permissions_if_needed, HOOKS_DIR_MODE, HOOKS_FILE_MODE | |||
25 | from vcsserver.tests.fixture import no_newline_id_generator |
|
26 | from vcsserver.tests.fixture import no_newline_id_generator | |
26 |
from vcsserver.str_utils import safe_bytes |
|
27 | from vcsserver.str_utils import safe_bytes | |
27 | from vcsserver.utils import AttributeDict |
|
28 | from vcsserver.utils import AttributeDict | |
28 |
|
29 | |||
29 |
|
30 | |||
30 | class TestCheckRhodecodeHook: |
|
31 | class TestCheckRhodecodeHook: | |
31 |
|
32 | |||
32 | def test_returns_false_when_hook_file_is_wrong_found(self, tmpdir): |
|
33 | def test_returns_false_when_hook_file_is_wrong_found(self, tmpdir): | |
33 | hook = os.path.join(str(tmpdir), 'fake_hook_file.py') |
|
34 | hook = os.path.join(str(tmpdir), 'fake_hook_file.py') | |
34 | with open(hook, 'wb') as f: |
|
35 | with open(hook, 'wb') as f: | |
35 | f.write(b'dummy test') |
|
36 | f.write(b'dummy test') | |
36 | result = hook_utils.check_rhodecode_hook(hook) |
|
37 | result = hook_utils.check_rhodecode_hook(hook) | |
37 | assert result is False |
|
38 | assert result is False | |
38 |
|
39 | |||
39 | def test_returns_true_when_no_hook_file_found(self, tmpdir): |
|
40 | def test_returns_true_when_no_hook_file_found(self, tmpdir): | |
40 | hook = os.path.join(str(tmpdir), 'fake_hook_file_not_existing.py') |
|
41 | hook = os.path.join(str(tmpdir), 'fake_hook_file_not_existing.py') | |
41 | result = hook_utils.check_rhodecode_hook(hook) |
|
42 | result = hook_utils.check_rhodecode_hook(hook) | |
42 | assert result |
|
43 | assert result | |
43 |
|
44 | |||
44 | @pytest.mark.parametrize("file_content, expected_result", [ |
|
45 | @pytest.mark.parametrize("file_content, expected_result", [ | |
45 | ("RC_HOOK_VER = '3.3.3'\n", True), |
|
46 | ("RC_HOOK_VER = '3.3.3'\n", True), | |
46 | ("RC_HOOK = '3.3.3'\n", False), |
|
47 | ("RC_HOOK = '3.3.3'\n", False), | |
47 | ], ids=no_newline_id_generator) |
|
48 | ], ids=no_newline_id_generator) | |
48 | def test_signatures(self, file_content, expected_result, tmpdir): |
|
49 | def test_signatures(self, file_content, expected_result, tmpdir): | |
49 | hook = os.path.join(str(tmpdir), 'fake_hook_file_1.py') |
|
50 | hook = os.path.join(str(tmpdir), 'fake_hook_file_1.py') | |
50 | with open(hook, 'wb') as f: |
|
51 | with open(hook, 'wb') as f: | |
51 | f.write(safe_bytes(file_content)) |
|
52 | f.write(safe_bytes(file_content)) | |
52 |
|
53 | |||
53 | result = hook_utils.check_rhodecode_hook(hook) |
|
54 | result = hook_utils.check_rhodecode_hook(hook) | |
54 |
|
55 | |||
55 | assert result is expected_result |
|
56 | assert result is expected_result | |
56 |
|
57 | |||
57 |
|
58 | |||
58 | class BaseInstallHooks: |
|
59 | class BaseInstallHooks: | |
59 | HOOK_FILES = () |
|
60 | HOOK_FILES = () | |
60 |
|
61 | |||
|
62 | def _check_hook_file_dir_mode(self, file_path): | |||
|
63 | dir_path = os.path.dirname(file_path) | |||
|
64 | assert os.path.exists(dir_path), f'dir {file_path} missing' | |||
|
65 | stat_info = os.stat(dir_path) | |||
|
66 | ||||
|
67 | file_mode = stat.S_IMODE(stat_info.st_mode) | |||
|
68 | expected_mode = int(HOOKS_DIR_MODE) | |||
|
69 | assert expected_mode == file_mode, f'expected mode: {oct(expected_mode)} got: {oct(file_mode)} for {dir_path}' | |||
|
70 | ||||
61 | def _check_hook_file_mode(self, file_path): |
|
71 | def _check_hook_file_mode(self, file_path): | |
62 | assert os.path.exists(file_path), f'path {file_path} missing' |
|
72 | assert os.path.exists(file_path), f'path {file_path} missing' | |
63 | stat_info = os.stat(file_path) |
|
73 | stat_info = os.stat(file_path) | |
64 |
|
74 | |||
65 | file_mode = stat.S_IMODE(stat_info.st_mode) |
|
75 | file_mode = stat.S_IMODE(stat_info.st_mode) | |
66 |
expected_mode = int( |
|
76 | expected_mode = int(HOOKS_FILE_MODE) | |
67 | assert expected_mode == file_mode |
|
77 | assert expected_mode == file_mode, f'expected mode: {oct(expected_mode)} got: {oct(file_mode)} for {file_path}' | |
68 |
|
78 | |||
69 | def _check_hook_file_content(self, file_path, executable): |
|
79 | def _check_hook_file_content(self, file_path, executable): | |
70 | executable = executable or sys.executable |
|
80 | executable = executable or sys.executable | |
71 | with open(file_path, 'rt') as hook_file: |
|
81 | with open(file_path, 'rt') as hook_file: | |
72 | content = hook_file.read() |
|
82 | content = hook_file.read() | |
73 |
|
83 | |||
74 | expected_env = '#!{}'.format(executable) |
|
84 | expected_env = '#!{}'.format(executable) | |
75 | expected_rc_version = "\nRC_HOOK_VER = '{}'\n".format(vcsserver.__version__) |
|
85 | expected_rc_version = "\nRC_HOOK_VER = '{}'\n".format(vcsserver.__version__) | |
76 | assert content.strip().startswith(expected_env) |
|
86 | assert content.strip().startswith(expected_env) | |
77 | assert expected_rc_version in content |
|
87 | assert expected_rc_version in content | |
78 |
|
88 | |||
79 | def _create_fake_hook(self, file_path, content): |
|
89 | def _create_fake_hook(self, file_path, content): | |
80 | with open(file_path, 'w') as hook_file: |
|
90 | with open(file_path, 'w') as hook_file: | |
81 | hook_file.write(content) |
|
91 | hook_file.write(content) | |
82 |
|
92 | |||
83 | def create_dummy_repo(self, repo_type): |
|
93 | def create_dummy_repo(self, repo_type): | |
84 | tmpdir = tempfile.mkdtemp() |
|
94 | tmpdir = tempfile.mkdtemp() | |
85 | repo = AttributeDict() |
|
95 | repo = AttributeDict() | |
86 | if repo_type == 'git': |
|
96 | if repo_type == 'git': | |
87 | repo.path = os.path.join(tmpdir, 'test_git_hooks_installation_repo') |
|
97 | repo.path = os.path.join(tmpdir, 'test_git_hooks_installation_repo') | |
88 | os.makedirs(repo.path) |
|
98 | os.makedirs(repo.path) | |
89 | os.makedirs(os.path.join(repo.path, 'hooks')) |
|
99 | os.makedirs(os.path.join(repo.path, 'hooks')) | |
90 | repo.bare = True |
|
100 | repo.bare = True | |
91 |
|
101 | |||
92 | elif repo_type == 'svn': |
|
102 | elif repo_type == 'svn': | |
93 | repo.path = os.path.join(tmpdir, 'test_svn_hooks_installation_repo') |
|
103 | repo.path = os.path.join(tmpdir, 'test_svn_hooks_installation_repo') | |
94 | os.makedirs(repo.path) |
|
104 | os.makedirs(repo.path) | |
95 | os.makedirs(os.path.join(repo.path, 'hooks')) |
|
105 | os.makedirs(os.path.join(repo.path, 'hooks')) | |
96 |
|
106 | |||
97 | return repo |
|
107 | return repo | |
98 |
|
108 | |||
99 | def check_hooks(self, repo_path, repo_bare=True): |
|
109 | def check_hooks(self, repo_path, repo_bare=True): | |
100 | for file_name in self.HOOK_FILES: |
|
110 | for file_name in self.HOOK_FILES: | |
101 | if repo_bare: |
|
111 | if repo_bare: | |
102 | file_path = os.path.join(repo_path, 'hooks', file_name) |
|
112 | file_path = os.path.join(repo_path, 'hooks', file_name) | |
103 | else: |
|
113 | else: | |
104 | file_path = os.path.join(repo_path, '.git', 'hooks', file_name) |
|
114 | file_path = os.path.join(repo_path, '.git', 'hooks', file_name) | |
|
115 | ||||
|
116 | self._check_hook_file_dir_mode(file_path) | |||
105 | self._check_hook_file_mode(file_path) |
|
117 | self._check_hook_file_mode(file_path) | |
106 | self._check_hook_file_content(file_path, sys.executable) |
|
118 | self._check_hook_file_content(file_path, sys.executable) | |
107 |
|
119 | |||
108 |
|
120 | |||
109 | class TestInstallGitHooks(BaseInstallHooks): |
|
121 | class TestInstallGitHooks(BaseInstallHooks): | |
110 | HOOK_FILES = ('pre-receive', 'post-receive') |
|
122 | HOOK_FILES = ('pre-receive', 'post-receive') | |
111 |
|
123 | |||
112 | def test_hooks_are_installed(self): |
|
124 | def test_hooks_are_installed(self): | |
113 | repo = self.create_dummy_repo('git') |
|
125 | repo = self.create_dummy_repo('git') | |
114 | result = hook_utils.install_git_hooks(repo.path, repo.bare) |
|
126 | result = hook_utils.install_git_hooks(repo.path, repo.bare) | |
115 | assert result |
|
127 | assert result | |
116 | self.check_hooks(repo.path, repo.bare) |
|
128 | self.check_hooks(repo.path, repo.bare) | |
117 |
|
129 | |||
118 | def test_hooks_are_replaced(self): |
|
130 | def test_hooks_are_replaced(self): | |
119 | repo = self.create_dummy_repo('git') |
|
131 | repo = self.create_dummy_repo('git') | |
120 | hooks_path = os.path.join(repo.path, 'hooks') |
|
132 | hooks_path = os.path.join(repo.path, 'hooks') | |
121 | for file_path in [os.path.join(hooks_path, f) for f in self.HOOK_FILES]: |
|
133 | for file_path in [os.path.join(hooks_path, f) for f in self.HOOK_FILES]: | |
122 | self._create_fake_hook( |
|
134 | self._create_fake_hook( | |
123 | file_path, content="RC_HOOK_VER = 'abcde'\n") |
|
135 | file_path, content="RC_HOOK_VER = 'abcde'\n") | |
124 |
|
136 | |||
125 | result = hook_utils.install_git_hooks(repo.path, repo.bare) |
|
137 | result = hook_utils.install_git_hooks(repo.path, repo.bare) | |
126 | assert result |
|
138 | assert result | |
127 | self.check_hooks(repo.path, repo.bare) |
|
139 | self.check_hooks(repo.path, repo.bare) | |
128 |
|
140 | |||
129 | def test_non_rc_hooks_are_not_replaced(self): |
|
141 | def test_non_rc_hooks_are_not_replaced(self): | |
130 | repo = self.create_dummy_repo('git') |
|
142 | repo = self.create_dummy_repo('git') | |
131 | hooks_path = os.path.join(repo.path, 'hooks') |
|
143 | hooks_path = os.path.join(repo.path, 'hooks') | |
132 | non_rc_content = 'echo "non rc hook"\n' |
|
144 | non_rc_content = 'echo "non rc hook"\n' | |
133 | for file_path in [os.path.join(hooks_path, f) for f in self.HOOK_FILES]: |
|
145 | for file_path in [os.path.join(hooks_path, f) for f in self.HOOK_FILES]: | |
134 | self._create_fake_hook( |
|
146 | self._create_fake_hook( | |
135 | file_path, content=non_rc_content) |
|
147 | file_path, content=non_rc_content) | |
136 |
|
148 | |||
137 | result = hook_utils.install_git_hooks(repo.path, repo.bare) |
|
149 | result = hook_utils.install_git_hooks(repo.path, repo.bare) | |
138 | assert result |
|
150 | assert result | |
139 |
|
151 | |||
140 | for file_path in [os.path.join(hooks_path, f) for f in self.HOOK_FILES]: |
|
152 | for file_path in [os.path.join(hooks_path, f) for f in self.HOOK_FILES]: | |
141 | with open(file_path, 'rt') as hook_file: |
|
153 | with open(file_path, 'rt') as hook_file: | |
142 | content = hook_file.read() |
|
154 | content = hook_file.read() | |
143 | assert content == non_rc_content |
|
155 | assert content == non_rc_content | |
144 |
|
156 | |||
145 | def test_non_rc_hooks_are_replaced_with_force_flag(self): |
|
157 | def test_non_rc_hooks_are_replaced_with_force_flag(self): | |
146 | repo = self.create_dummy_repo('git') |
|
158 | repo = self.create_dummy_repo('git') | |
147 | hooks_path = os.path.join(repo.path, 'hooks') |
|
159 | hooks_path = os.path.join(repo.path, 'hooks') | |
148 | non_rc_content = 'echo "non rc hook"\n' |
|
160 | non_rc_content = 'echo "non rc hook"\n' | |
149 | for file_path in [os.path.join(hooks_path, f) for f in self.HOOK_FILES]: |
|
161 | for file_path in [os.path.join(hooks_path, f) for f in self.HOOK_FILES]: | |
150 | self._create_fake_hook( |
|
162 | self._create_fake_hook( | |
151 | file_path, content=non_rc_content) |
|
163 | file_path, content=non_rc_content) | |
152 |
|
164 | |||
153 | result = hook_utils.install_git_hooks( |
|
165 | result = hook_utils.install_git_hooks( | |
154 | repo.path, repo.bare, force_create=True) |
|
166 | repo.path, repo.bare, force_create=True) | |
155 | assert result |
|
167 | assert result | |
156 | self.check_hooks(repo.path, repo.bare) |
|
168 | self.check_hooks(repo.path, repo.bare) | |
157 |
|
169 | |||
158 |
|
170 | |||
159 | class TestInstallSvnHooks(BaseInstallHooks): |
|
171 | class TestInstallSvnHooks(BaseInstallHooks): | |
160 | HOOK_FILES = ('pre-commit', 'post-commit') |
|
172 | HOOK_FILES = ('pre-commit', 'post-commit') | |
161 |
|
173 | |||
162 | def test_hooks_are_installed(self): |
|
174 | def test_hooks_are_installed(self): | |
163 | repo = self.create_dummy_repo('svn') |
|
175 | repo = self.create_dummy_repo('svn') | |
164 | result = hook_utils.install_svn_hooks(repo.path) |
|
176 | result = hook_utils.install_svn_hooks(repo.path) | |
165 | assert result |
|
177 | assert result | |
166 | self.check_hooks(repo.path) |
|
178 | self.check_hooks(repo.path) | |
167 |
|
179 | |||
168 | def test_hooks_are_replaced(self): |
|
180 | def test_hooks_are_replaced(self): | |
169 | repo = self.create_dummy_repo('svn') |
|
181 | repo = self.create_dummy_repo('svn') | |
170 | hooks_path = os.path.join(repo.path, 'hooks') |
|
182 | hooks_path = os.path.join(repo.path, 'hooks') | |
171 | for file_path in [os.path.join(hooks_path, f) for f in self.HOOK_FILES]: |
|
183 | for file_path in [os.path.join(hooks_path, f) for f in self.HOOK_FILES]: | |
172 | self._create_fake_hook( |
|
184 | self._create_fake_hook( | |
173 | file_path, content="RC_HOOK_VER = 'abcde'\n") |
|
185 | file_path, content="RC_HOOK_VER = 'abcde'\n") | |
174 |
|
186 | |||
175 | result = hook_utils.install_svn_hooks(repo.path) |
|
187 | result = hook_utils.install_svn_hooks(repo.path) | |
176 | assert result |
|
188 | assert result | |
177 | self.check_hooks(repo.path) |
|
189 | self.check_hooks(repo.path) | |
178 |
|
190 | |||
179 | def test_non_rc_hooks_are_not_replaced(self): |
|
191 | def test_non_rc_hooks_are_not_replaced(self): | |
180 | repo = self.create_dummy_repo('svn') |
|
192 | repo = self.create_dummy_repo('svn') | |
181 | hooks_path = os.path.join(repo.path, 'hooks') |
|
193 | hooks_path = os.path.join(repo.path, 'hooks') | |
182 | non_rc_content = 'echo "non rc hook"\n' |
|
194 | non_rc_content = 'echo "non rc hook"\n' | |
183 | for file_path in [os.path.join(hooks_path, f) for f in self.HOOK_FILES]: |
|
195 | for file_path in [os.path.join(hooks_path, f) for f in self.HOOK_FILES]: | |
184 | self._create_fake_hook( |
|
196 | self._create_fake_hook( | |
185 | file_path, content=non_rc_content) |
|
197 | file_path, content=non_rc_content) | |
186 |
|
198 | |||
187 | result = hook_utils.install_svn_hooks(repo.path) |
|
199 | result = hook_utils.install_svn_hooks(repo.path) | |
188 | assert result |
|
200 | assert result | |
189 |
|
201 | |||
190 | for file_path in [os.path.join(hooks_path, f) for f in self.HOOK_FILES]: |
|
202 | for file_path in [os.path.join(hooks_path, f) for f in self.HOOK_FILES]: | |
191 | with open(file_path, 'rt') as hook_file: |
|
203 | with open(file_path, 'rt') as hook_file: | |
192 | content = hook_file.read() |
|
204 | content = hook_file.read() | |
193 | assert content == non_rc_content |
|
205 | assert content == non_rc_content | |
194 |
|
206 | |||
195 | def test_non_rc_hooks_are_replaced_with_force_flag(self): |
|
207 | def test_non_rc_hooks_are_replaced_with_force_flag(self): | |
196 | repo = self.create_dummy_repo('svn') |
|
208 | repo = self.create_dummy_repo('svn') | |
197 | hooks_path = os.path.join(repo.path, 'hooks') |
|
209 | hooks_path = os.path.join(repo.path, 'hooks') | |
198 | non_rc_content = 'echo "non rc hook"\n' |
|
210 | non_rc_content = 'echo "non rc hook"\n' | |
199 | for file_path in [os.path.join(hooks_path, f) for f in self.HOOK_FILES]: |
|
211 | for file_path in [os.path.join(hooks_path, f) for f in self.HOOK_FILES]: | |
200 | self._create_fake_hook( |
|
212 | self._create_fake_hook( | |
201 | file_path, content=non_rc_content) |
|
213 | file_path, content=non_rc_content) | |
202 |
|
214 | |||
203 | result = hook_utils.install_svn_hooks( |
|
215 | result = hook_utils.install_svn_hooks( | |
204 | repo.path, force_create=True) |
|
216 | repo.path, force_create=True) | |
205 | assert result |
|
217 | assert result | |
206 | self.check_hooks(repo.path, ) |
|
218 | self.check_hooks(repo.path, ) | |
|
219 | ||||
|
220 | ||||
|
221 | def create_test_file(filename): | |||
|
222 | """Utility function to create a test file.""" | |||
|
223 | with open(filename, 'w') as f: | |||
|
224 | f.write("Test file") | |||
|
225 | ||||
|
226 | ||||
|
227 | def remove_test_file(filename): | |||
|
228 | """Utility function to remove a test file.""" | |||
|
229 | if os.path.exists(filename): | |||
|
230 | os.remove(filename) | |||
|
231 | ||||
|
232 | ||||
|
233 | @pytest.fixture | |||
|
234 | def test_file(): | |||
|
235 | filename = 'test_file.txt' | |||
|
236 | create_test_file(filename) | |||
|
237 | yield filename | |||
|
238 | remove_test_file(filename) | |||
|
239 | ||||
|
240 | ||||
|
241 | def test_increase_permissions(test_file): | |||
|
242 | # Set initial lower permissions | |||
|
243 | initial_perms = 0o644 | |||
|
244 | os.chmod(test_file, initial_perms) | |||
|
245 | ||||
|
246 | # Set higher permissions | |||
|
247 | new_perms = 0o666 | |||
|
248 | set_permissions_if_needed(test_file, new_perms) | |||
|
249 | ||||
|
250 | # Check if permissions were updated | |||
|
251 | assert (os.stat(test_file).st_mode & 0o777) == new_perms | |||
|
252 | ||||
|
253 | ||||
|
254 | def test_no_permission_change_needed(test_file): | |||
|
255 | # Set initial permissions | |||
|
256 | initial_perms = 0o666 | |||
|
257 | os.chmod(test_file, initial_perms) | |||
|
258 | ||||
|
259 | # Attempt to set the same permissions | |||
|
260 | set_permissions_if_needed(test_file, initial_perms) | |||
|
261 | ||||
|
262 | # Check if permissions were unchanged | |||
|
263 | assert (os.stat(test_file).st_mode & 0o777) == initial_perms | |||
|
264 | ||||
|
265 | ||||
|
266 | def test_no_permission_reduction(test_file): | |||
|
267 | # Set initial higher permissions | |||
|
268 | initial_perms = 0o666 | |||
|
269 | os.chmod(test_file, initial_perms) | |||
|
270 | ||||
|
271 | # Attempt to set lower permissions | |||
|
272 | lower_perms = 0o644 | |||
|
273 | set_permissions_if_needed(test_file, lower_perms) | |||
|
274 | ||||
|
275 | # Check if permissions were not reduced | |||
|
276 | assert (os.stat(test_file).st_mode & 0o777) == initial_perms | |||
|
277 | ||||
|
278 | ||||
|
279 | def test_no_permission_reduction_when_on_777(test_file): | |||
|
280 | # Set initial higher permissions | |||
|
281 | initial_perms = 0o777 | |||
|
282 | os.chmod(test_file, initial_perms) | |||
|
283 | ||||
|
284 | # Attempt to set lower permissions | |||
|
285 | lower_perms = 0o755 | |||
|
286 | set_permissions_if_needed(test_file, lower_perms) | |||
|
287 | ||||
|
288 | # Check if permissions were not reduced | |||
|
289 | assert (os.stat(test_file).st_mode & 0o777) == initial_perms |
General Comments 0
You need to be logged in to leave comments.
Login now