release: Release 5.2.0
r1292:50174967 merge v5.2.0 stable
@@ -0,0 +1,53 b''
1 # RhodeCode VCSServer provides access to different vcs backends via network.
2 # Copyright (C) 2014-2023 RhodeCode GmbH
3 #
4 # This program is free software; you can redistribute it and/or modify
5 # it under the terms of the GNU General Public License as published by
6 # the Free Software Foundation; either version 3 of the License, or
7 # (at your option) any later version.
8 #
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
13 #
14 # You should have received a copy of the GNU General Public License
15 # along with this program; if not, write to the Free Software Foundation,
16 # Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17
18 import hashlib
19 from vcsserver.lib.str_utils import safe_bytes, safe_str
20
21
22 def md5(s):
23 return hashlib.md5(s).hexdigest()
24
25
26 def md5_safe(s, return_type=''):
27
28 val = md5(safe_bytes(s))
29 if return_type == 'str':
30 val = safe_str(val)
31 return val
32
33
34 def sha1(s):
35 return hashlib.sha1(s).hexdigest()
36
37
38 def sha1_safe(s, return_type=''):
39 val = sha1(safe_bytes(s))
40 if return_type == 'str':
41 val = safe_str(val)
42 return val
43
44
45 def sha256(s):
46 return hashlib.sha256(s).hexdigest()
47
48
49 def sha256_safe(s, return_type=''):
50 val = sha256(safe_bytes(s))
51 if return_type == 'str':
52 val = safe_str(val)
53 return val
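The new hashing helpers above wrap hashlib so that callers can pass either str or bytes: the *_safe variants coerce input with safe_bytes() first, and hexdigest() already yields a str. A minimal usage sketch (the import path vcsserver.lib.hash_utils is an assumption, since the diff does not show the new file's name):

    # illustrative only; adjust the import to wherever this new module actually lives
    from vcsserver.lib.hash_utils import md5_safe, sha1_safe, sha256_safe

    cache_key = md5_safe('repo-group/repo-name')      # str input is coerced via safe_bytes()
    short_id = sha1_safe('refs/heads/master')[:12]    # hexdigest() output is already a str
    oid = sha256_safe(b'raw file content')            # bytes input works as well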
@@ -1,144 +1,130 b''
1 .DEFAULT_GOAL := help
2
3 # Pretty print values cf. https://misc.flogisoft.com/bash/tip_colors_and_formatting
4 RESET := \033[0m # Reset all formatting
5 GREEN := \033[0;32m # Resets before setting 16b colour (32 -- green)
6 YELLOW := \033[0;33m
7 ORANGE := \033[0;38;5;208m # Reset then set 256b colour (208 -- orange)
8 PEACH := \033[0;38;5;216m
9
10
11 ## ---------------------------------------------------------------------------------- ##
12 ## ------------------------- Help usage builder ------------------------------------- ##
13 ## ---------------------------------------------------------------------------------- ##
14 # use '# >>> Build commands' to create a section
15 # use '# target: target description' to create help for a target
16 .PHONY: help
17 help:
18 @echo "Usage:"
19 @cat $(MAKEFILE_LIST) | grep -E '^# >>>|^# [A-Za-z0-9_.-]+:' | sed -E 's/^# //' | awk ' \
20 BEGIN { \
21 green="\033[32m"; \
22 yellow="\033[33m"; \
23 reset="\033[0m"; \
24 section=""; \
25 } \
26 /^>>>/ { \
27 section=substr($$0, 5); \
28 printf "\n" green ">>> %s" reset "\n", section; \
29 next; \
30 } \
31 /^([A-Za-z0-9_.-]+):/ { \
32 target=$$1; \
33 gsub(/:$$/, "", target); \
34 description=substr($$0, index($$0, ":") + 2); \
35 if (description == "") { description="-"; } \
36 printf " - " yellow "%-35s" reset " %s\n", target, description; \
37 } \
38 '
39
1 40 # required for pushd to work..
2 41 SHELL = /bin/bash
3 42
4
5 # set by: PATH_TO_OUTDATED_PACKAGES=/some/path/outdated_packages.py
6 OUTDATED_PACKAGES = ${PATH_TO_OUTDATED_PACKAGES}
43 # >>> Tests commands
7 44
8 45 .PHONY: clean
9 ## Cleanup compiled and cache py files
46 # clean: Cleanup compiled and cache py files
10 47 clean:
11 48 make test-clean
12 49 find . -type f \( -iname '*.c' -o -iname '*.pyc' -o -iname '*.so' -o -iname '*.orig' \) -exec rm '{}' ';'
13 50 find . -type d -name "build" -prune -exec rm -rf '{}' ';'
14 51
15 52
16 53 .PHONY: test
17 ## run test-clean and tests
54 # test: run test-clean and tests
18 55 test:
19 56 make test-clean
20 make test-only
57 unset RC_SQLALCHEMY_DB1_URL && unset RC_DB_URL && make test-only
21 58
22 59
23 60 .PHONY: test-clean
24 ## run test-clean and tests
61 # test-clean: Cleanup test run artifacts (coverage, junit, pylint logs, caches)
25 62 test-clean:
26 63 rm -rf coverage.xml htmlcov junit.xml pylint.log result
27 64 find . -type d -name "__pycache__" -prune -exec rm -rf '{}' ';'
28 65 find . -type f \( -iname '.coverage.*' \) -exec rm '{}' ';'
29 66
30 67
31 68 .PHONY: test-only
32 ## Run tests only without cleanup
69 # test-only: Run tests only without cleanup
33 70 test-only:
34 71 PYTHONHASHSEED=random \
35 72 py.test -x -vv -r xw -p no:sugar \
36 73 --cov-report=term-missing --cov-report=html \
37 74 --cov=vcsserver vcsserver
38 75
39
40 .PHONY: ruff-check
41 ## run a ruff analysis
42 ruff-check:
43 ruff check --ignore F401 --ignore I001 --ignore E402 --ignore E501 --ignore F841 --exclude rhodecode/lib/dbmigrate --exclude .eggs --exclude .dev .
76 # >>> Dev commands
44 77
45 .PHONY: pip-packages
46 ## Show outdated packages
47 pip-packages:
48 python ${OUTDATED_PACKAGES}
49
50
51 .PHONY: build
52 ## Build sdist/egg
53 build:
54 python -m build
55 78
56 79
57 80 .PHONY: dev-sh
58 ## make dev-sh
81 # dev-sh: make dev-sh
59 82 dev-sh:
60 83 sudo echo "deb [trusted=yes] https://apt.fury.io/rsteube/ /" | sudo tee -a "/etc/apt/sources.list.d/fury.list"
61 84 sudo apt-get update
62 85 sudo apt-get install -y zsh carapace-bin
63 86 rm -rf /home/rhodecode/.oh-my-zsh
64 87 curl https://raw.githubusercontent.com/robbyrussell/oh-my-zsh/master/tools/install.sh | sh
65 88 @echo "source <(carapace _carapace)" > /home/rhodecode/.zsrc
66 89 @echo "${RC_DEV_CMD_HELP}"
67 90 @PROMPT='%(?.%F{green}√.%F{red}?%?)%f %B%F{240}%1~%f%b %# ' zsh
68 91
69 92
70 93 .PHONY: dev-cleanup
71 ## Cleanup: pip freeze | grep -v "^-e" | grep -v "@" | xargs pip uninstall -y
94 # dev-cleanup: Cleanup: pip freeze | grep -v "^-e" | grep -v "@" | xargs pip uninstall -y
72 95 dev-cleanup:
73 96 pip freeze | grep -v "^-e" | grep -v "@" | xargs pip uninstall -y
74 97 rm -rf /tmp/*
75 98
76 99
77 100 .PHONY: dev-env
78 ## make dev-env based on the requirements files and install develop of packages
101 # dev-env: make dev-env based on the requirements files and install packages in development (editable) mode
79 102 ## Cleanup: pip freeze | grep -v "^-e" | grep -v "@" | xargs pip uninstall -y
80 103 dev-env:
81 104 sudo -u root chown rhodecode:rhodecode /home/rhodecode/.cache/pip/
82 105 pip install build virtualenv
83 106 pip wheel --wheel-dir=/home/rhodecode/.cache/pip/wheels -r requirements.txt -r requirements_test.txt -r requirements_debug.txt
84 107 pip install --no-index --find-links=/home/rhodecode/.cache/pip/wheels -r requirements.txt -r requirements_test.txt -r requirements_debug.txt
85 108 pip install -e .
86 109
87 110
88 111 .PHONY: sh
89 ## shortcut for make dev-sh dev-env
112 # sh: shortcut for make dev-sh dev-env
90 113 sh:
91 114 make dev-env
92 115 make dev-sh
93 116
94 117
95 118 ## Allows changes of workers e.g. make dev-srv workers=2
96 119 workers?=1
97 120
98 121 .PHONY: dev-srv
99 ## run gunicorn web server with reloader, use workers=N to set multiworker mode
122 # dev-srv: run gunicorn web server with reloader; use workers=N to set multi-worker mode
100 123 dev-srv:
101 124 gunicorn --paste=.dev/dev.ini --bind=0.0.0.0:10010 --config=.dev/gunicorn_config.py --reload --workers=$(workers)
102 125
103
104 # Default command on calling make
105 .DEFAULT_GOAL := show-help
126 .PHONY: ruff-check
127 # ruff-check: run a ruff analysis
128 ruff-check:
129 ruff check --ignore F401 --ignore I001 --ignore E402 --ignore E501 --ignore F841 --exclude rhodecode/lib/dbmigrate --exclude .eggs --exclude .dev .
106 130
107 .PHONY: show-help
108 show-help:
109 @echo "$$(tput bold)Available rules:$$(tput sgr0)"
110 @echo
111 @sed -n -e "/^## / { \
112 h; \
113 s/.*//; \
114 :doc" \
115 -e "H; \
116 n; \
117 s/^## //; \
118 t doc" \
119 -e "s/:.*//; \
120 G; \
121 s/\\n## /---/; \
122 s/\\n/ /g; \
123 p; \
124 }" ${MAKEFILE_LIST} \
125 | LC_ALL='C' sort --ignore-case \
126 | awk -F '---' \
127 -v ncol=$$(tput cols) \
128 -v indent=19 \
129 -v col_on="$$(tput setaf 6)" \
130 -v col_off="$$(tput sgr0)" \
131 '{ \
132 printf "%s%*s%s ", col_on, -indent, $$1, col_off; \
133 n = split($$2, words, " "); \
134 line_length = ncol - indent; \
135 for (i = 1; i <= n; i++) { \
136 line_length -= length(words[i]) + 1; \
137 if (line_length <= 0) { \
138 line_length = ncol - indent - length(words[i]) - 1; \
139 printf "\n%*s ", -indent, " "; \
140 } \
141 printf "%s ", words[i]; \
142 } \
143 printf "\n"; \
144 }'
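The new 'help' target near the top of this Makefile builds its output purely from '# >>> Section' and '# target: description' comments, replacing the sed/awk 'show-help' recipe removed above. For readers less comfortable with awk, a rough Python sketch of the same parsing logic (colors omitted; the file name is just an example):

    import re

    def print_make_help(makefile_path: str = "Makefile") -> None:
        """Approximate Python equivalent of the awk-based 'help' target."""
        section_re = re.compile(r"^# >>> (.+)")
        target_re = re.compile(r"^# ([A-Za-z0-9_.-]+): ?(.*)")
        print("Usage:")
        with open(makefile_path) as f:
            for line in f:
                if m := section_re.match(line):
                    print(f"\n>>> {m.group(1).strip()}")
                elif m := target_re.match(line):
                    target, description = m.group(1), (m.group(2).strip() or "-")
                    print(f"  - {target:<35} {description}")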
@@ -1,520 +1,545 b''
1 1 """
2 2 Gunicorn config extension and hooks. This config file adds some extra settings and memory management.
3 3 Gunicorn configuration should be managed by .ini files entries of RhodeCode or VCSServer
4 4 """
5 5
6 6 import gc
7 7 import os
8 8 import sys
9 9 import math
10 10 import time
11 11 import threading
12 12 import traceback
13 13 import random
14 14 import socket
15 15 import dataclasses
16 import json
16 17 from gunicorn.glogging import Logger
17 18
18 19
19 20 def get_workers():
20 21 import multiprocessing
21 22 return multiprocessing.cpu_count() * 2 + 1
22 23
23 24
24 25 bind = "127.0.0.1:10010"
25 26
26 27
27 28 # Error logging output for gunicorn (-) is stdout
28 29 errorlog = '-'
29 30
30 31 # Access logging output for gunicorn (-) is stdout
31 32 accesslog = '-'
32 33
33 34
34 35 # SERVER MECHANICS
35 36 # None == system temp dir
36 37 # worker_tmp_dir is recommended to be set to some tmpfs
37 38 worker_tmp_dir = None
38 39 tmp_upload_dir = None
39 40
40 # use re-use port logic
41 #reuse_port = True
41 # Use port re-use logic to let the Linux kernel load-balance incoming requests across workers.
42 reuse_port = True
42 43
43 44 # Custom log format
44 45 #access_log_format = (
45 46 # '%(t)s %(p)s INFO [GNCRN] %(h)-15s rqt:%(L)s %(s)s %(b)-6s "%(m)s:%(U)s %(q)s" usr:%(u)s "%(f)s" "%(a)s"')
46 47
47 48 # loki format for easier parsing in grafana
48 access_log_format = (
49 loki_access_log_format = (
49 50 'time="%(t)s" pid=%(p)s level="INFO" type="[GNCRN]" ip="%(h)-15s" rqt="%(L)s" response_code="%(s)s" response_bytes="%(b)-6s" uri="%(m)s:%(U)s %(q)s" user=":%(u)s" user_agent="%(a)s"')
50 51
52 # JSON format
53 json_access_log_format = json.dumps({
54 'time': r'%(t)s',
55 'pid': r'%(p)s',
56 'level': 'INFO',
57 'ip': r'%(h)s',
58 'request_time': r'%(L)s',
59 'remote_address': r'%(h)s',
60 'user_name': r'%(u)s',
61 'status': r'%(s)s',
62 'method': r'%(m)s',
63 'url_path': r'%(U)s',
64 'query_string': r'%(q)s',
65 'protocol': r'%(H)s',
66 'response_length': r'%(B)s',
67 'referer': r'%(f)s',
68 'user_agent': r'%(a)s',
69
70 })
71
72 access_log_format = loki_access_log_format
73 if os.environ.get('RC_LOGGING_FORMATTER') == 'json':
74 access_log_format = json_access_log_format
75
51 76 # Self-adjust workers based on CPU count, to use the available CPUs without over-allocating resources
52 77 # workers = get_workers()
53 78
54 79 # Gunicorn access log level
55 80 loglevel = 'info'
56 81
57 82 # Process name visible in a process list
58 83 proc_name = "rhodecode_vcsserver"
59 84
60 85 # Type of worker class, one of `sync`, `gevent` or `gthread`
61 86 # currently `sync` is the only option allowed for vcsserver, while for rhodecode all 3 are allowed
62 87 # gevent:
63 88 # In this case, the maximum number of concurrent requests is (N workers * X worker_connections)
64 89 # e.g. workers =3 worker_connections=10 = 3*10, 30 concurrent requests can be handled
65 90 # gthread:
66 91 # In this case, the maximum number of concurrent requests is (N workers * X threads)
67 92 # e.g. workers = 3 threads=3 = 3*3, 9 concurrent requests can be handled
68 93 worker_class = 'sync'
69 94
70 95 # Sets the number of process workers. More workers means more concurrent connections
71 96 # RhodeCode can handle at the same time. Each additional worker also increases
72 97 # memory usage, as each has its own set of caches.
73 98 # The recommended value is (2 * NUMBER_OF_CPUS + 1), e.g. 2 CPUs = 5 workers, but no more
74 99 # than 8-10 unless for huge deployments, e.g. 700-1000 users.
75 100 # `instance_id = *` must be set in the [app:main] section below (which is the default)
76 101 # when using more than 1 worker.
77 102 workers = 2
78 103
79 104 # Threads numbers for worker class gthread
80 105 threads = 1
81 106
82 107 # The maximum number of simultaneous clients. Valid only for gevent
83 108 # In this case, the maximum number of concurrent requests is (N workers * X worker_connections)
84 109 # e.g workers =3 worker_connections=10 = 3*10, 30 concurrent requests can be handled
85 110 worker_connections = 10
86 111
87 112 # Max number of requests that worker will handle before being gracefully restarted.
88 113 # Prevents memory leaks, jitter adds variability so not all workers are restarted at once.
89 114 max_requests = 2000
90 115 max_requests_jitter = int(max_requests * 0.2) # 20% of max_requests
91 116
92 117 # The maximum number of pending connections.
93 118 # Exceeding this number results in the client getting an error when attempting to connect.
94 119 backlog = 64
95 120
96 121 # The amount of time a worker can spend handling a request before it
97 122 # gets killed and restarted. By default, set to 21600 (6hrs)
98 123 # Examples: 1800 (30min), 3600 (1hr), 7200 (2hr), 43200 (12h)
99 124 timeout = 21600
100 125
101 126 # The maximum size of HTTP request line in bytes.
102 127 # 0 for unlimited
103 128 limit_request_line = 0
104 129
105 130 # Limit the number of HTTP headers fields in a request.
106 131 # By default this value is 100 and can't be larger than 32768.
107 132 limit_request_fields = 32768
108 133
109 134 # Limit the allowed size of an HTTP request header field.
110 135 # Value is a positive number or 0.
111 136 # Setting it to 0 will allow unlimited header field sizes.
112 137 limit_request_field_size = 0
113 138
114 139 # Timeout for graceful workers restart.
115 140 # After receiving a restart signal, workers have this much time to finish
116 141 # serving requests. Workers still alive after the timeout (starting from the
117 142 # receipt of the restart signal) are force killed.
118 143 # Examples: 1800 (30min), 3600 (1hr), 7200 (2hr), 43200 (12h)
119 144 graceful_timeout = 21600
120 145
121 146 # The number of seconds to wait for requests on a Keep-Alive connection.
122 147 # Generally set in the 1-5 seconds range.
123 148 keepalive = 2
124 149
125 150 # Maximum memory usage that each worker can use before it will receive a
126 151 # graceful restart signal. 0 = memory monitoring is disabled
127 152 # Examples: 268435456 (256MB), 536870912 (512MB)
128 153 # 1073741824 (1GB), 2147483648 (2GB), 4294967296 (4GB)
129 154 # Dynamic formula 1024 * 1024 * 256 == 256MBs
130 155 memory_max_usage = 0
131 156
132 157 # How often in seconds to check for memory usage for each gunicorn worker
133 158 memory_usage_check_interval = 60
134 159
135 160 # Threshold below which we don't recycle a worker if garbage collection
136 161 # frees up enough resources. Before each restart we try to run GC on the worker;
137 162 # if enough memory is freed after that, the restart will not happen.
138 163 memory_usage_recovery_threshold = 0.8
139 164
140 165
141 166 @dataclasses.dataclass
142 167 class MemoryCheckConfig:
143 168 max_usage: int
144 169 check_interval: int
145 170 recovery_threshold: float
146 171
147 172
148 173 def _get_process_rss(pid=None):
149 174 try:
150 175 import psutil
151 176 if pid:
152 177 proc = psutil.Process(pid)
153 178 else:
154 179 proc = psutil.Process()
155 180 return proc.memory_info().rss
156 181 except Exception:
157 182 return None
158 183
159 184
160 185 def _get_config(ini_path):
161 186 import configparser
162 187
163 188 try:
164 189 config = configparser.RawConfigParser()
165 190 config.read(ini_path)
166 191 return config
167 192 except Exception:
168 193 return None
169 194
170 195
171 196 def get_memory_usage_params(config=None):
172 197 # memory spec defaults
173 198 _memory_max_usage = memory_max_usage
174 199 _memory_usage_check_interval = memory_usage_check_interval
175 200 _memory_usage_recovery_threshold = memory_usage_recovery_threshold
176 201
177 202 if config:
178 203 ini_path = os.path.abspath(config)
179 204 conf = _get_config(ini_path)
180 205
181 206 section = 'server:main'
182 207 if conf and conf.has_section(section):
183 208
184 209 if conf.has_option(section, 'memory_max_usage'):
185 210 _memory_max_usage = conf.getint(section, 'memory_max_usage')
186 211
187 212 if conf.has_option(section, 'memory_usage_check_interval'):
188 213 _memory_usage_check_interval = conf.getint(section, 'memory_usage_check_interval')
189 214
190 215 if conf.has_option(section, 'memory_usage_recovery_threshold'):
191 216 _memory_usage_recovery_threshold = conf.getfloat(section, 'memory_usage_recovery_threshold')
192 217
193 218 _memory_max_usage = int(os.environ.get('RC_GUNICORN_MEMORY_MAX_USAGE', '')
194 219 or _memory_max_usage)
195 220 _memory_usage_check_interval = int(os.environ.get('RC_GUNICORN_MEMORY_USAGE_CHECK_INTERVAL', '')
196 221 or _memory_usage_check_interval)
197 222 _memory_usage_recovery_threshold = float(os.environ.get('RC_GUNICORN_MEMORY_USAGE_RECOVERY_THRESHOLD', '')
198 223 or _memory_usage_recovery_threshold)
199 224
200 225 return MemoryCheckConfig(_memory_max_usage, _memory_usage_check_interval, _memory_usage_recovery_threshold)
201 226
202 227
203 228 def _time_with_offset(check_interval):
204 229 return time.time() - random.randint(0, check_interval/2.0)
205 230
206 231
207 232 def pre_fork(server, worker):
208 233 pass
209 234
210 235
211 236 def post_fork(server, worker):
212 237
213 238 memory_conf = get_memory_usage_params()
214 239 _memory_max_usage = memory_conf.max_usage
215 240 _memory_usage_check_interval = memory_conf.check_interval
216 241 _memory_usage_recovery_threshold = memory_conf.recovery_threshold
217 242
218 243 worker._memory_max_usage = int(os.environ.get('RC_GUNICORN_MEMORY_MAX_USAGE', '')
219 244 or _memory_max_usage)
220 245 worker._memory_usage_check_interval = int(os.environ.get('RC_GUNICORN_MEMORY_USAGE_CHECK_INTERVAL', '')
221 246 or _memory_usage_check_interval)
222 247 worker._memory_usage_recovery_threshold = float(os.environ.get('RC_GUNICORN_MEMORY_USAGE_RECOVERY_THRESHOLD', '')
223 248 or _memory_usage_recovery_threshold)
224 249
225 250 # register memory last check time, with some random offset so we don't recycle all
226 251 # at once
227 252 worker._last_memory_check_time = _time_with_offset(_memory_usage_check_interval)
228 253
229 254 if _memory_max_usage:
230 255 server.log.info("pid=[%-10s] WORKER spawned with max memory set at %s", worker.pid,
231 256 _format_data_size(_memory_max_usage))
232 257 else:
233 258 server.log.info("pid=[%-10s] WORKER spawned", worker.pid)
234 259
235 260
236 261 def pre_exec(server):
237 262 server.log.info("Forked child, re-executing.")
238 263
239 264
240 265 def on_starting(server):
241 266 server_lbl = '{} {}'.format(server.proc_name, server.address)
242 267 server.log.info("Server %s is starting.", server_lbl)
243 268 server.log.info('Config:')
244 269 server.log.info(f"\n{server.cfg}")
245 270 server.log.info(get_memory_usage_params())
246 271
247 272
248 273 def when_ready(server):
249 274 server.log.info("Server %s is ready. Spawning workers", server)
250 275
251 276
252 277 def on_reload(server):
253 278 pass
254 279
255 280
256 281 def _format_data_size(size, unit="B", precision=1, binary=True):
257 282 """Format a number using SI units (kilo, mega, etc.).
258 283
259 284 ``size``: The number as a float or int.
260 285
261 286 ``unit``: The unit name in plural form. Examples: "bytes", "B".
262 287
263 288 ``precision``: How many digits to the right of the decimal point. Default
264 289 is 1. 0 suppresses the decimal point.
265 290
266 291 ``binary``: If false, use base-10 decimal prefixes (kilo = K = 1000).
267 292 If true, use base-2 binary prefixes (kibi = Ki = 1024).
268 293
269 294 ``full_name``: If false (default), use the prefix abbreviation ("k" or
270 295 "Ki"). If true, use the full prefix ("kilo" or "kibi").
272 297
273 298 """
274 299
275 300 if not binary:
276 301 base = 1000
277 302 multiples = ('', 'k', 'M', 'G', 'T', 'P', 'E', 'Z', 'Y')
278 303 else:
279 304 base = 1024
280 305 multiples = ('', 'Ki', 'Mi', 'Gi', 'Ti', 'Pi', 'Ei', 'Zi', 'Yi')
281 306
282 307 sign = ""
283 308 if size > 0:
284 309 m = int(math.log(size, base))
285 310 elif size < 0:
286 311 sign = "-"
287 312 size = -size
288 313 m = int(math.log(size, base))
289 314 else:
290 315 m = 0
291 316 if m > 8:
292 317 m = 8
293 318
294 319 if m == 0:
295 320 precision = '%.0f'
296 321 else:
297 322 precision = '%%.%df' % precision
298 323
299 324 size = precision % (size / math.pow(base, m))
300 325
301 326 return '%s%s %s%s' % (sign, size.strip(), multiples[m], unit)
302 327
303 328
304 329 def _check_memory_usage(worker):
305 330 _memory_max_usage = worker._memory_max_usage
306 331 if not _memory_max_usage:
307 332 return
308 333
309 334 _memory_usage_check_interval = worker._memory_usage_check_interval
310 335 _memory_usage_recovery_threshold = _memory_max_usage * worker._memory_usage_recovery_threshold
311 336
312 337 elapsed = time.time() - worker._last_memory_check_time
313 338 if elapsed > _memory_usage_check_interval:
314 339 mem_usage = _get_process_rss()
315 340 if mem_usage and mem_usage > _memory_max_usage:
316 341 worker.log.info(
317 342 "memory usage %s > %s, forcing gc",
318 343 _format_data_size(mem_usage), _format_data_size(_memory_max_usage))
319 344 # Try to clean it up by forcing a full collection.
320 345 gc.collect()
321 346 mem_usage = _get_process_rss()
322 347 if mem_usage > _memory_usage_recovery_threshold:
323 348 # Didn't clean up enough, we'll have to terminate.
324 349 worker.log.warning(
325 350 "memory usage %s > %s after gc, quitting",
326 351 _format_data_size(mem_usage), _format_data_size(_memory_max_usage))
327 352 # This will cause worker to auto-restart itself
328 353 worker.alive = False
329 354 worker._last_memory_check_time = time.time()
330 355
331 356
332 357 def worker_int(worker):
333 358 worker.log.info("pid=[%-10s] worker received INT or QUIT signal", worker.pid)
334 359
335 360 # get traceback info, when a worker crashes
336 361 def get_thread_id(t_id):
337 362 id2name = dict([(th.ident, th.name) for th in threading.enumerate()])
338 363 return id2name.get(t_id, "unknown_thread_id")
339 364
340 365 code = []
341 366 for thread_id, stack in sys._current_frames().items(): # noqa
342 367 code.append(
343 368 "\n# Thread: %s(%d)" % (get_thread_id(thread_id), thread_id))
344 369 for fname, lineno, name, line in traceback.extract_stack(stack):
345 370 code.append('File: "%s", line %d, in %s' % (fname, lineno, name))
346 371 if line:
347 372 code.append(" %s" % (line.strip()))
348 373 worker.log.debug("\n".join(code))
349 374
350 375
351 376 def worker_abort(worker):
352 377 worker.log.info("pid=[%-10s] worker received SIGABRT signal", worker.pid)
353 378
354 379
355 380 def worker_exit(server, worker):
356 381 worker.log.info("pid=[%-10s] worker exit", worker.pid)
357 382
358 383
359 384 def child_exit(server, worker):
360 385 worker.log.info("pid=[%-10s] worker child exit", worker.pid)
361 386
362 387
363 388 def pre_request(worker, req):
364 389 worker.start_time = time.time()
365 390 worker.log.debug(
366 391 "GNCRN PRE WORKER [cnt:%s]: %s %s", worker.nr, req.method, req.path)
367 392
368 393
369 394 def post_request(worker, req, environ, resp):
370 395 total_time = time.time() - worker.start_time
371 396 # Gunicorn sometimes has problems with reading the status_code
372 397 status_code = getattr(resp, 'status_code', '')
373 398 worker.log.debug(
374 399 "GNCRN POST WORKER [cnt:%s]: %s %s resp: %s, Load Time: %.4fs",
375 400 worker.nr, req.method, req.path, status_code, total_time)
376 401 _check_memory_usage(worker)
377 402
378 403
379 404 def _filter_proxy(ip):
380 405 """
381 406 IP addresses passed in headers can be in a special format of multiple,
382 407 comma-separated IPs appended by the various proxies in the chain of
383 408 request processing, with the left-most entry being the original client.
384 409 We only care about that first IP, which came from the original client.
385 410
386 411 :param ip: ip string from headers
387 412 """
388 413 if ',' in ip:
389 414 _ips = ip.split(',')
390 415 _first_ip = _ips[0].strip()
391 416 return _first_ip
392 417 return ip
393 418
394 419
395 420 def _filter_port(ip):
396 421 """
397 422 Removes a port from the IP; there are 4 main cases to handle here.
398 423 - ipv4 eg. 127.0.0.1
399 424 - ipv6 eg. ::1
400 425 - ipv4+port eg. 127.0.0.1:8080
401 426 - ipv6+port eg. [::1]:8080
402 427
403 428 :param ip:
404 429 """
405 430 def is_ipv6(ip_addr):
406 431 if hasattr(socket, 'inet_pton'):
407 432 try:
408 433 socket.inet_pton(socket.AF_INET6, ip_addr)
409 434 except socket.error:
410 435 return False
411 436 else:
412 437 return False
413 438 return True
414 439
415 440 if ':' not in ip: # must be ipv4 pure ip
416 441 return ip
417 442
418 443 if '[' in ip and ']' in ip: # ipv6 with port
419 444 return ip.split(']')[0][1:].lower()
420 445
421 446 # must be ipv6 or ipv4 with port
422 447 if is_ipv6(ip):
423 448 return ip
424 449 else:
425 450 ip, _port = ip.split(':')[:2] # means ipv4+port
426 451 return ip
427 452
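    # Editor's example (illustrative, not part of the file): expected behaviour of _filter_port() above:
    #     _filter_port('127.0.0.1')      -> '127.0.0.1'   # plain ipv4
    #     _filter_port('127.0.0.1:8080') -> '127.0.0.1'   # ipv4 + port
    #     _filter_port('::1')            -> '::1'         # plain ipv6
    #     _filter_port('[::1]:8080')     -> '::1'         # ipv6 + port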
428 453
429 454 def get_ip_addr(environ):
430 455 proxy_key = 'HTTP_X_REAL_IP'
431 456 proxy_key2 = 'HTTP_X_FORWARDED_FOR'
432 457 def_key = 'REMOTE_ADDR'
433 458
434 459 def _filters(x):
435 460 return _filter_port(_filter_proxy(x))
436 461
437 462 ip = environ.get(proxy_key)
438 463 if ip:
439 464 return _filters(ip)
440 465
441 466 ip = environ.get(proxy_key2)
442 467 if ip:
443 468 return _filters(ip)
444 469
445 470 ip = environ.get(def_key, '0.0.0.0')
446 471 return _filters(ip)
447 472
448 473
449 474 class RhodeCodeLogger(Logger):
450 475 """
451 476 Custom Logger that allows some customization that gunicorn doesn't allow
452 477 """
453 478
454 479 datefmt = r"%Y-%m-%d %H:%M:%S"
455 480
456 481 def __init__(self, cfg):
457 482 Logger.__init__(self, cfg)
458 483
459 484 def now(self):
460 485 """ return date in RhodeCode Log format """
461 486 now = time.time()
462 487 msecs = int((now - int(now)) * 1000)
463 488 return time.strftime(self.datefmt, time.localtime(now)) + '.{0:03d}'.format(msecs)
464 489
465 490 def atoms(self, resp, req, environ, request_time):
466 491 """ Gets atoms for log formatting.
467 492 """
468 493 status = resp.status
469 494 if isinstance(status, str):
470 495 status = status.split(None, 1)[0]
471 496 atoms = {
472 497 'h': get_ip_addr(environ),
473 498 'l': '-',
474 499 'u': self._get_user(environ) or '-',
475 500 't': self.now(),
476 501 'r': "%s %s %s" % (environ['REQUEST_METHOD'],
477 502 environ['RAW_URI'],
478 503 environ["SERVER_PROTOCOL"]),
479 504 's': status,
480 505 'm': environ.get('REQUEST_METHOD'),
481 506 'U': environ.get('PATH_INFO'),
482 507 'q': environ.get('QUERY_STRING'),
483 508 'H': environ.get('SERVER_PROTOCOL'),
484 509 'b': getattr(resp, 'sent', None) is not None and str(resp.sent) or '-',
485 510 'B': getattr(resp, 'sent', None),
486 511 'f': environ.get('HTTP_REFERER', '-'),
487 512 'a': environ.get('HTTP_USER_AGENT', '-'),
488 513 'T': request_time.seconds,
489 514 'D': (request_time.seconds * 1000000) + request_time.microseconds,
490 515 'M': (request_time.seconds * 1000) + int(request_time.microseconds/1000),
491 516 'L': "%d.%06d" % (request_time.seconds, request_time.microseconds),
492 517 'p': "<%s>" % os.getpid()
493 518 }
494 519
495 520 # add request headers
496 521 if hasattr(req, 'headers'):
497 522 req_headers = req.headers
498 523 else:
499 524 req_headers = req
500 525
501 526 if hasattr(req_headers, "items"):
502 527 req_headers = req_headers.items()
503 528
504 529 atoms.update({"{%s}i" % k.lower(): v for k, v in req_headers})
505 530
506 531 resp_headers = resp.headers
507 532 if hasattr(resp_headers, "items"):
508 533 resp_headers = resp_headers.items()
509 534
510 535 # add response headers
511 536 atoms.update({"{%s}o" % k.lower(): v for k, v in resp_headers})
512 537
513 538 # add environ variables
514 539 environ_variables = environ.items()
515 540 atoms.update({"{%s}e" % k.lower(): v for k, v in environ_variables})
516 541
517 542 return atoms
518 543
519 544
520 545 logger_class = RhodeCodeLogger
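To make the memory-recycling settings above concrete, here is a small worked sketch of the decision implemented in _check_memory_usage(), using hypothetical values (1 GB limit, 0.8 recovery threshold); it is illustrative only and not part of the shipped config:

    MB = 1024 * 1024
    memory_max_usage = 1024 * MB                  # hypothetical 1 GB hard limit per worker
    memory_usage_recovery_threshold = 0.8         # keep the worker if gc gets RSS below 80%

    recovery_bytes = memory_max_usage * memory_usage_recovery_threshold   # ~819 MB

    def should_recycle(rss_before_gc: int, rss_after_gc: int) -> bool:
        """Mirror of _check_memory_usage(): force gc when over the hard limit,
        and recycle the worker only if gc did not bring RSS back under the
        recovery threshold."""
        if rss_before_gc <= memory_max_usage:
            return False                          # under the limit, nothing to do
        return rss_after_gc > recovery_bytes      # gc was not enough -> worker restarts

    assert should_recycle(1100 * MB, 700 * MB) is False   # gc freed enough, worker keeps running
    assert should_recycle(1100 * MB, 900 * MB) is True    # still above ~819 MB, worker is restarted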
@@ -1,102 +1,102 b''
1 1 # deps, generated via pipdeptree --exclude setuptools,wheel,pipdeptree,pip -f | tr '[:upper:]' '[:lower:]'
2 2
3 3 async-timeout==4.0.3
4 4 atomicwrites==1.4.1
5 5 celery==5.3.6
6 6 billiard==4.2.0
7 7 click==8.1.3
8 8 click-didyoumean==0.3.0
9 9 click==8.1.3
10 10 click-plugins==1.1.1
11 11 click==8.1.3
12 12 click-repl==0.2.0
13 13 click==8.1.3
14 prompt-toolkit==3.0.38
15 wcwidth==0.2.6
14 prompt_toolkit==3.0.47
15 wcwidth==0.2.13
16 16 six==1.16.0
17 17 kombu==5.3.5
18 18 amqp==5.2.0
19 19 vine==5.1.0
20 20 vine==5.1.0
21 21 python-dateutil==2.8.2
22 22 six==1.16.0
23 23 tzdata==2024.1
24 24 vine==5.1.0
25 25 contextlib2==21.6.0
26 26 dogpile.cache==1.3.3
27 27 decorator==5.1.1
28 28 stevedore==5.1.0
29 29 pbr==5.11.1
30 30 dulwich==0.21.6
31 31 urllib3==1.26.14
32 fsspec==2024.6.0
33 gunicorn==21.2.0
34 packaging==24.0
32 fsspec==2024.9.0
33 gunicorn==23.0.0
34 packaging==24.1
35 35 hg-evolve==11.1.3
36 36 importlib-metadata==6.0.0
37 37 zipp==3.15.0
38 38 mercurial==6.7.4
39 39 more-itertools==9.1.0
40 40 msgpack==1.0.8
41 orjson==3.10.3
41 orjson==3.10.7
42 42 psutil==5.9.8
43 43 py==1.11.0
44 44 pygit2==1.13.3
45 45 cffi==1.16.0
46 46 pycparser==2.21
47 pygments==2.15.1
47 pygments==2.18.0
48 48 pyparsing==3.1.1
49 49 pyramid==2.0.2
50 50 hupper==1.12
51 51 plaster==1.1.2
52 52 plaster-pastedeploy==1.0.1
53 53 pastedeploy==3.1.0
54 54 plaster==1.1.2
55 55 translationstring==1.4
56 56 venusian==3.0.0
57 57 webob==1.8.7
58 58 zope.deprecation==5.0.0
59 zope.interface==6.3.0
60 redis==5.0.4
59 zope.interface==6.4.post2
60 redis==5.1.0
61 61 async-timeout==4.0.3
62 62 repoze.lru==0.7
63 s3fs==2024.6.0
63 s3fs==2024.9.0
64 64 aiobotocore==2.13.0
65 65 aiohttp==3.9.5
66 66 aiosignal==1.3.1
67 67 frozenlist==1.4.1
68 68 attrs==22.2.0
69 69 frozenlist==1.4.1
70 70 multidict==6.0.5
71 71 yarl==1.9.4
72 72 idna==3.4
73 73 multidict==6.0.5
74 74 aioitertools==0.11.0
75 75 botocore==1.34.106
76 76 jmespath==1.0.1
77 77 python-dateutil==2.8.2
78 78 six==1.16.0
79 79 urllib3==1.26.14
80 80 wrapt==1.16.0
81 81 aiohttp==3.9.5
82 82 aiosignal==1.3.1
83 83 frozenlist==1.4.1
84 84 attrs==22.2.0
85 85 frozenlist==1.4.1
86 86 multidict==6.0.5
87 87 yarl==1.9.4
88 88 idna==3.4
89 89 multidict==6.0.5
90 fsspec==2024.6.0
90 fsspec==2024.9.0
91 91 scandir==1.10.0
92 92 setproctitle==1.3.3
93 93 subvertpy==0.11.0
94 94 waitress==3.0.0
95 wcwidth==0.2.6
95 wcwidth==0.2.13
96 96
97 97
98 98 ## test related requirements
99 99 #-r requirements_test.txt
100 100
101 101 ## uncomment to add the debug libraries
102 102 #-r requirements_debug.txt
@@ -1,48 +1,48 b''
1 1 # test related requirements
2 2 mock==5.1.0
3 3 pytest-cov==4.1.0
4 4 coverage==7.4.3
5 5 pytest==8.1.1
6 6 iniconfig==2.0.0
7 packaging==24.0
7 packaging==24.1
8 8 pluggy==1.4.0
9 9 pytest-env==1.1.3
10 10 pytest==8.1.1
11 11 iniconfig==2.0.0
12 packaging==24.0
12 packaging==24.1
13 13 pluggy==1.4.0
14 14 pytest-profiling==1.7.0
15 15 gprof2dot==2022.7.29
16 16 pytest==8.1.1
17 17 iniconfig==2.0.0
18 packaging==24.0
18 packaging==24.1
19 19 pluggy==1.4.0
20 20 six==1.16.0
21 21 pytest-rerunfailures==13.0
22 packaging==24.0
22 packaging==24.1
23 23 pytest==8.1.1
24 24 iniconfig==2.0.0
25 packaging==24.0
25 packaging==24.1
26 26 pluggy==1.4.0
27 27 pytest-runner==6.0.1
28 28 pytest-sugar==1.0.0
29 packaging==24.0
29 packaging==24.1
30 30 pytest==8.1.1
31 31 iniconfig==2.0.0
32 packaging==24.0
32 packaging==24.1
33 33 pluggy==1.4.0
34 34 termcolor==2.4.0
35 35 pytest-timeout==2.3.1
36 36 pytest==8.1.1
37 37 iniconfig==2.0.0
38 packaging==24.0
38 packaging==24.1
39 39 pluggy==1.4.0
40 40 webtest==3.0.0
41 41 beautifulsoup4==4.12.3
42 42 soupsieve==2.5
43 43 waitress==3.0.0
44 44 webob==1.8.7
45 45
46 46 # RhodeCode test-data
47 47 rc_testdata @ https://code.rhodecode.com/upstream/rc-testdata-dist/raw/77378e9097f700b4c1b9391b56199fe63566b5c9/rc_testdata-0.11.0.tar.gz#egg=rc_testdata
48 48 rc_testdata==0.11.0
@@ -1,125 +1,131 b''
1 1 # RhodeCode VCSServer provides access to different vcs backends via network.
2 2 # Copyright (C) 2014-2023 RhodeCode GmbH
3 3 #
4 4 # This program is free software; you can redistribute it and/or modify
5 5 # it under the terms of the GNU General Public License as published by
6 6 # the Free Software Foundation; either version 3 of the License, or
7 7 # (at your option) any later version.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU General Public License
15 15 # along with this program; if not, write to the Free Software Foundation,
16 16 # Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17 17
18 18 """
19 19 Special exception handling over the wire.
20 20
21 21 Since we cannot assume that our client is able to import our exception classes,
22 22 this module provides a "wrapping" mechanism to raise plain exceptions
23 23 which contain an extra attribute `_vcs_kind` to allow a client to distinguish
24 24 different error conditions.
25 25 """
26 26
27 27 from pyramid.httpexceptions import HTTPLocked, HTTPForbidden
28 28
29 29
30 30 def _make_exception(kind, org_exc, *args):
31 31 """
32 32 Prepares a base `Exception` instance to be sent over the wire.
33 33
34 34 To give our caller a hint what this is about, it will attach an attribute
35 35 `_vcs_kind` to the exception.
36 36 """
37 37 exc = Exception(*args)
38 38 exc._vcs_kind = kind
39 39 exc._org_exc = org_exc
40 40 exc._org_exc_tb = getattr(org_exc, '_org_exc_tb', '')
41 41 return exc
42 42
43 43
44 44 def AbortException(org_exc=None):
45 45 def _make_exception_wrapper(*args):
46 46 return _make_exception('abort', org_exc, *args)
47 47 return _make_exception_wrapper
48 48
49 49
50 50 def ArchiveException(org_exc=None):
51 51 def _make_exception_wrapper(*args):
52 52 return _make_exception('archive', org_exc, *args)
53 53 return _make_exception_wrapper
54 54
55 55
56 def ClientNotSupportedException(org_exc=None):
57 def _make_exception_wrapper(*args):
58 return _make_exception('client_not_supported', org_exc, *args)
59 return _make_exception_wrapper
60
61
56 62 def LookupException(org_exc=None):
57 63 def _make_exception_wrapper(*args):
58 64 return _make_exception('lookup', org_exc, *args)
59 65 return _make_exception_wrapper
60 66
61 67
62 68 def VcsException(org_exc=None):
63 69 def _make_exception_wrapper(*args):
64 70 return _make_exception('error', org_exc, *args)
65 71 return _make_exception_wrapper
66 72
67 73
68 74 def RepositoryLockedException(org_exc=None):
69 75 def _make_exception_wrapper(*args):
70 76 return _make_exception('repo_locked', org_exc, *args)
71 77 return _make_exception_wrapper
72 78
73 79
74 80 def RepositoryBranchProtectedException(org_exc=None):
75 81 def _make_exception_wrapper(*args):
76 82 return _make_exception('repo_branch_protected', org_exc, *args)
77 83 return _make_exception_wrapper
78 84
79 85
80 86 def RequirementException(org_exc=None):
81 87 def _make_exception_wrapper(*args):
82 88 return _make_exception('requirement', org_exc, *args)
83 89 return _make_exception_wrapper
84 90
85 91
86 92 def UnhandledException(org_exc=None):
87 93 def _make_exception_wrapper(*args):
88 94 return _make_exception('unhandled', org_exc, *args)
89 95 return _make_exception_wrapper
90 96
91 97
92 98 def URLError(org_exc=None):
93 99 def _make_exception_wrapper(*args):
94 100 return _make_exception('url_error', org_exc, *args)
95 101 return _make_exception_wrapper
96 102
97 103
98 104 def SubrepoMergeException(org_exc=None):
99 105 def _make_exception_wrapper(*args):
100 106 return _make_exception('subrepo_merge_error', org_exc, *args)
101 107 return _make_exception_wrapper
102 108
103 109
104 110 class HTTPRepoLocked(HTTPLocked):
105 111 """
106 112 Subclass of HTTPLocked response that allows setting the title and status
107 113 code via constructor arguments.
108 114 """
109 115 def __init__(self, title, status_code=None, **kwargs):
110 116 self.code = status_code or HTTPLocked.code
111 117 self.title = title
112 118 super().__init__(**kwargs)
113 119
114 120
115 121 class HTTPRepoBranchProtected(HTTPForbidden):
116 122 def __init__(self, *args, **kwargs):
117 123 super(HTTPForbidden, self).__init__(*args, **kwargs)
118 124
119 125
120 126 class RefNotFoundException(KeyError):
121 127 pass
122 128
123 129
124 130 class NoContentException(ValueError):
125 131 pass
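Since the factories above return plain Exception instances tagged with `_vcs_kind`, a caller never needs to import vcsserver's exception classes. A minimal sketch of both sides (the import path follows the `from vcsserver import exceptions` usage seen elsewhere in this release):

    from vcsserver import exceptions

    # server side: wrap an original error so it can travel over the wire
    try:
        raise KeyError('unknown-ref')
    except KeyError as org_exc:
        wrapped = exceptions.LookupException(org_exc)('ref `unknown-ref` not found')

    # caller side: only the marker attribute is needed to tell error kinds apart
    kind = getattr(wrapped, '_vcs_kind', None)
    if kind == 'lookup':
        print('missing ref:', wrapped.args[0])
    elif kind == 'client_not_supported':
        print('client needs to be upgraded')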
@@ -1,296 +1,302 b''
1 1 # RhodeCode VCSServer provides access to different vcs backends via network.
2 2 # Copyright (C) 2014-2023 RhodeCode GmbH
3 3 #
4 4 # This program is free software; you can redistribute it and/or modify
5 5 # it under the terms of the GNU General Public License as published by
6 6 # the Free Software Foundation; either version 3 of the License, or
7 7 # (at your option) any later version.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU General Public License
15 15 # along with this program; if not, write to the Free Software Foundation,
16 16 # Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17 17
18 18 import re
19 19 import logging
20 20
21 from gunicorn.http.errors import NoMoreData
21 22 from pyramid.config import Configurator
22 23 from pyramid.response import Response, FileIter
23 24 from pyramid.httpexceptions import (
24 25 HTTPBadRequest, HTTPNotImplemented, HTTPNotFound, HTTPForbidden,
25 26 HTTPUnprocessableEntity)
26 27
27 28 from vcsserver.lib.ext_json import json
28 29 from vcsserver.git_lfs.lib import OidHandler, LFSOidStore
29 30 from vcsserver.git_lfs.utils import safe_result, get_cython_compat_decorator
30 31 from vcsserver.lib.str_utils import safe_int
31 32
32 33 log = logging.getLogger(__name__)
33 34
34 35
35 36 GIT_LFS_CONTENT_TYPE = 'application/vnd.git-lfs' # +json ?
36 37 GIT_LFS_PROTO_PAT = re.compile(r'^/(.+)/(info/lfs/(.+))')
37 38
38 39
39 40 def write_response_error(http_exception, text=None):
40 41 content_type = GIT_LFS_CONTENT_TYPE + '+json'
41 42 _exception = http_exception(content_type=content_type)
42 43 _exception.content_type = content_type
43 44 if text:
44 45 _exception.body = json.dumps({'message': text})
45 46 log.debug('LFS: writing response of type %s to client with text:%s',
46 47 http_exception, text)
47 48 return _exception
48 49
49 50
50 51 class AuthHeaderRequired:
51 52 """
52 53 Decorator to check if request has proper auth-header
53 54 """
54 55
55 56 def __call__(self, func):
56 57 return get_cython_compat_decorator(self.__wrapper, func)
57 58
58 59 def __wrapper(self, func, *fargs, **fkwargs):
59 60 request = fargs[1]
60 61 auth = request.authorization
61 62 if not auth:
62 63 return write_response_error(HTTPForbidden)
63 64 return func(*fargs[1:], **fkwargs)
64 65
65 66
66 67 # views
67 68
68 69 def lfs_objects(request):
69 70 # indicate not supported, V1 API
70 71 log.warning('LFS: v1 api not supported, reporting it back to client')
71 72 return write_response_error(HTTPNotImplemented, 'LFS: v1 api not supported')
72 73
73 74
74 75 @AuthHeaderRequired()
75 76 def lfs_objects_batch(request):
76 77 """
77 78 The client sends the following information to the Batch endpoint to transfer some objects:
78 79
79 80 operation - Should be download or upload.
80 81 transfers - An optional Array of String identifiers for transfer
81 82 adapters that the client has configured. If omitted, the basic
82 83 transfer adapter MUST be assumed by the server.
83 84 objects - An Array of objects to download.
84 85 oid - String OID of the LFS object.
85 86 size - Integer byte size of the LFS object. Must be at least zero.
86 87 """
87 88 request.response.content_type = GIT_LFS_CONTENT_TYPE + '+json'
88 89 auth = request.authorization
89 90 repo = request.matchdict.get('repo')
90 91 data = request.json
91 92 operation = data.get('operation')
92 93 http_scheme = request.registry.git_lfs_http_scheme
93 94
94 95 if operation not in ('download', 'upload'):
95 96 log.debug('LFS: unsupported operation:%s', operation)
96 97 return write_response_error(
97 98 HTTPBadRequest, f'unsupported operation mode: `{operation}`')
98 99
99 100 if 'objects' not in data:
100 101 log.debug('LFS: missing objects data')
101 102 return write_response_error(
102 103 HTTPBadRequest, 'missing objects data')
103 104
104 105 log.debug('LFS: handling operation of type: %s', operation)
105 106
106 107 objects = []
107 108 for o in data['objects']:
108 109 try:
109 110 oid = o['oid']
110 111 obj_size = o['size']
111 112 except KeyError:
112 113 log.exception('LFS, failed to extract data')
113 114 return write_response_error(
114 115 HTTPBadRequest, 'unsupported data in objects')
115 116
116 117 obj_data = {'oid': oid}
117 118 if http_scheme == 'http':
118 119 # Note(marcink): when using http, we might have a custom port,
119 120 # so we skip setting the scheme; url dispatch then won't generate a port in the URL.
120 121 # We need this for development.
121 122 http_scheme = None
122 123
123 124 obj_href = request.route_url('lfs_objects_oid', repo=repo, oid=oid,
124 125 _scheme=http_scheme)
125 126 obj_verify_href = request.route_url('lfs_objects_verify', repo=repo,
126 127 _scheme=http_scheme)
127 128 store = LFSOidStore(
128 129 oid, repo, store_location=request.registry.git_lfs_store_path)
129 130 handler = OidHandler(
130 131 store, repo, auth, oid, obj_size, obj_data,
131 132 obj_href, obj_verify_href)
132 133
133 134 # this verifies also OIDs
134 135 actions, errors = handler.exec_operation(operation)
135 136 if errors:
136 137 log.warning('LFS: got following errors: %s', errors)
137 138 obj_data['errors'] = errors
138 139
139 140 if actions:
140 141 obj_data['actions'] = actions
141 142
142 143 obj_data['size'] = obj_size
143 144 obj_data['authenticated'] = True
144 145 objects.append(obj_data)
145 146
146 147 result = {'objects': objects, 'transfer': 'basic'}
147 148 log.debug('LFS Response %s', safe_result(result))
148 149
149 150 return result
150 151
151 152
152 153 def lfs_objects_oid_upload(request):
153 154 request.response.content_type = GIT_LFS_CONTENT_TYPE + '+json'
154 155 repo = request.matchdict.get('repo')
155 156 oid = request.matchdict.get('oid')
156 157 store = LFSOidStore(
157 158 oid, repo, store_location=request.registry.git_lfs_store_path)
158 159 engine = store.get_engine(mode='wb')
159 160 log.debug('LFS: starting chunked write of LFS oid: %s to storage', oid)
160 161
161 162 body = request.environ['wsgi.input']
162 163
163 164 with engine as f:
164 165 blksize = 64 * 1024 # 64kb
165 166 while True:
166 167 # read in chunks as stream comes in from Gunicorn
167 168 # this is a specific Gunicorn support function.
168 169 # might work differently on waitress
169 chunk = body.read(blksize)
170 try:
171 chunk = body.read(blksize)
172 except NoMoreData:
173 chunk = None
174
170 175 if not chunk:
171 176 break
177
172 178 f.write(chunk)
173 179
174 180 return {'upload': 'ok'}
175 181
176 182
177 183 def lfs_objects_oid_download(request):
178 184 repo = request.matchdict.get('repo')
179 185 oid = request.matchdict.get('oid')
180 186
181 187 store = LFSOidStore(
182 188 oid, repo, store_location=request.registry.git_lfs_store_path)
183 189 if not store.has_oid():
184 190 log.debug('LFS: oid %s does not exist in store', oid)
185 191 return write_response_error(
186 192 HTTPNotFound, f'requested file with oid `{oid}` not found in store')
187 193
188 194 # TODO(marcink): support range header ?
189 195 # Range: bytes=0-, `bytes=(\d+)\-.*`
190 196
191 197 f = open(store.oid_path, 'rb')
192 198 response = Response(
193 199 content_type='application/octet-stream', app_iter=FileIter(f))
194 200 response.headers.add('X-RC-LFS-Response-Oid', str(oid))
195 201 return response
196 202
197 203
198 204 def lfs_objects_verify(request):
199 205 request.response.content_type = GIT_LFS_CONTENT_TYPE + '+json'
200 206 repo = request.matchdict.get('repo')
201 207
202 208 data = request.json
203 209 oid = data.get('oid')
204 210 size = safe_int(data.get('size'))
205 211
206 212 if not (oid and size):
207 213 return write_response_error(
208 214 HTTPBadRequest, 'missing oid and size in request data')
209 215
210 216 store = LFSOidStore(
211 217 oid, repo, store_location=request.registry.git_lfs_store_path)
212 218 if not store.has_oid():
213 219 log.debug('LFS: oid %s does not exists in store', oid)
214 220 return write_response_error(
215 221 HTTPNotFound, f'oid `{oid}` does not exists in store')
216 222
217 223 store_size = store.size_oid()
218 224 if store_size != size:
219 225 msg = 'requested file size mismatch store size:{} requested:{}'.format(
220 226 store_size, size)
221 227 return write_response_error(
222 228 HTTPUnprocessableEntity, msg)
223 229
224 230 return {'message': {'size': 'ok', 'in_store': 'ok'}}
225 231
226 232
227 233 def lfs_objects_lock(request):
228 234 return write_response_error(
229 235 HTTPNotImplemented, 'GIT LFS locking api not supported')
230 236
231 237
232 238 def not_found(request):
233 239 return write_response_error(
234 240 HTTPNotFound, 'request path not found')
235 241
236 242
237 243 def lfs_disabled(request):
238 244 return write_response_error(
239 245 HTTPNotImplemented, 'GIT LFS disabled for this repo')
240 246
241 247
242 248 def git_lfs_app(config):
243 249
244 250 # v1 API deprecation endpoint
245 251 config.add_route('lfs_objects',
246 252 '/{repo:.*?[^/]}/info/lfs/objects')
247 253 config.add_view(lfs_objects, route_name='lfs_objects',
248 254 request_method='POST', renderer='json')
249 255
250 256 # locking API
251 257 config.add_route('lfs_objects_lock',
252 258 '/{repo:.*?[^/]}/info/lfs/locks')
253 259 config.add_view(lfs_objects_lock, route_name='lfs_objects_lock',
254 260 request_method=('POST', 'GET'), renderer='json')
255 261
256 262 config.add_route('lfs_objects_lock_verify',
257 263 '/{repo:.*?[^/]}/info/lfs/locks/verify')
258 264 config.add_view(lfs_objects_lock, route_name='lfs_objects_lock_verify',
259 265 request_method=('POST', 'GET'), renderer='json')
260 266
261 267 # batch API
262 268 config.add_route('lfs_objects_batch',
263 269 '/{repo:.*?[^/]}/info/lfs/objects/batch')
264 270 config.add_view(lfs_objects_batch, route_name='lfs_objects_batch',
265 271 request_method='POST', renderer='json')
266 272
267 273 # oid upload/download API
268 274 config.add_route('lfs_objects_oid',
269 275 '/{repo:.*?[^/]}/info/lfs/objects/{oid}')
270 276 config.add_view(lfs_objects_oid_upload, route_name='lfs_objects_oid',
271 277 request_method='PUT', renderer='json')
272 278 config.add_view(lfs_objects_oid_download, route_name='lfs_objects_oid',
273 279 request_method='GET', renderer='json')
274 280
275 281 # verification API
276 282 config.add_route('lfs_objects_verify',
277 283 '/{repo:.*?[^/]}/info/lfs/verify')
278 284 config.add_view(lfs_objects_verify, route_name='lfs_objects_verify',
279 285 request_method='POST', renderer='json')
280 286
281 287 # not found handler for API
282 288 config.add_notfound_view(not_found, renderer='json')
283 289
284 290
285 291 def create_app(git_lfs_enabled, git_lfs_store_path, git_lfs_http_scheme):
286 292 config = Configurator()
287 293 if git_lfs_enabled:
288 294 config.include(git_lfs_app)
289 295 config.registry.git_lfs_store_path = git_lfs_store_path
290 296 config.registry.git_lfs_http_scheme = git_lfs_http_scheme
291 297 else:
292 298 # not found handler for API, reporting disabled LFS support
293 299 config.add_notfound_view(lfs_disabled, renderer='json')
294 300
295 301 app = config.make_wsgi_app()
296 302 return app
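For reference, the request body the batch endpoint above expects (as described in the lfs_objects_batch() docstring) is sketched below. The TestApp wiring is illustrative only: it assumes the module is importable as vcsserver.git_lfs.app and uses WebTest, which is already pinned in requirements_test.txt; the store path, scheme and oid are example values.

    # Shape of a batch request POSTed to /<repo>/info/lfs/objects/batch
    batch_request = {
        "operation": "download",            # or "upload"
        "transfers": ["basic"],             # optional; "basic" is assumed if omitted
        "objects": [
            {"oid": "a" * 64, "size": 123},  # hypothetical oid/size
        ],
    }

    # The app factory can be exercised in tests; values below are examples only.
    from webtest import TestApp
    from vcsserver.git_lfs.app import create_app   # assumed import path for this module

    app = TestApp(create_app(git_lfs_enabled=True,
                             git_lfs_store_path='/tmp/lfs-store',
                             git_lfs_http_scheme='http'))
    # Requests must carry an Authorization header, otherwise HTTPForbidden is returned.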
@@ -1,822 +1,824 b''
1 1 # RhodeCode VCSServer provides access to different vcs backends via network.
2 2 # Copyright (C) 2014-2023 RhodeCode GmbH
3 3 #
4 4 # This program is free software; you can redistribute it and/or modify
5 5 # it under the terms of the GNU General Public License as published by
6 6 # the Free Software Foundation; either version 3 of the License, or
7 7 # (at your option) any later version.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU General Public License
15 15 # along with this program; if not, write to the Free Software Foundation,
16 16 # Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17 17
18 18 import io
19 19 import os
20 20 import sys
21 21 import logging
22 22 import collections
23 23 import base64
24 24 import msgpack
25 25 import dataclasses
26 26 import pygit2
27 27
28 28 import http.client
29 29 from celery import Celery
30 30
31 31 import mercurial.scmutil
32 32 import mercurial.node
33 33
34 34 from vcsserver import exceptions, subprocessio, settings
35 35 from vcsserver.lib.ext_json import json
36 36 from vcsserver.lib.str_utils import ascii_str, safe_str
37 37 from vcsserver.lib.svn_txn_utils import get_txn_id_from_store
38 38 from vcsserver.remote.git_remote import Repository
39 39
40 40 celery_app = Celery('__vcsserver__')
41 41 log = logging.getLogger(__name__)
42 42
43 43
44 44 class HooksHttpClient:
45 45 proto = 'msgpack.v1'
46 46 connection = None
47 47
48 48 def __init__(self, hooks_uri):
49 49 self.hooks_uri = hooks_uri
50 50
51 51 def __repr__(self):
52 52 return f'{self.__class__}(hook_uri={self.hooks_uri}, proto={self.proto})'
53 53
54 54 def __call__(self, method, extras):
55 55 connection = http.client.HTTPConnection(self.hooks_uri)
56 56 # binary msgpack body
57 57 headers, body = self._serialize(method, extras)
58 58 log.debug('Doing a new hooks call using HTTPConnection to %s', self.hooks_uri)
59 59
60 60 try:
61 61 try:
62 62 connection.request('POST', '/', body, headers)
63 63 except Exception as error:
64 64 log.error('Hooks calling Connection failed on %s, org error: %s', connection.__dict__, error)
65 65 raise
66 66
67 67 response = connection.getresponse()
68 68 try:
69 69 return msgpack.load(response)
70 70 except Exception:
71 71 response_data = response.read()
72 72 log.exception('Failed to decode hook response json data. '
73 73 'response_code:%s, raw_data:%s',
74 74 response.status, response_data)
75 75 raise
76 76 finally:
77 77 connection.close()
78 78
79 79 @classmethod
80 80 def _serialize(cls, hook_name, extras):
81 81 data = {
82 82 'method': hook_name,
83 83 'extras': extras
84 84 }
85 85 headers = {
86 86 "rc-hooks-protocol": cls.proto,
87 87 "Connection": "keep-alive"
88 88 }
89 89 return headers, msgpack.packb(data)
90 90
91 91
92 92 class HooksCeleryClient:
93 93 TASK_TIMEOUT = 60 # time in seconds
94 94
95 95 def __init__(self, queue, backend):
96 96 celery_app.config_from_object({
97 97 'broker_url': queue, 'result_backend': backend,
98 98 'broker_connection_retry_on_startup': True,
99 99 'task_serializer': 'json',
100 100 'accept_content': ['json', 'msgpack'],
101 101 'result_serializer': 'json',
102 102 'result_accept_content': ['json', 'msgpack']
103 103 })
104 104 self.celery_app = celery_app
105 105
106 106 def __call__(self, method, extras):
107 107 inquired_task = self.celery_app.signature(
108 108 f'rhodecode.lib.celerylib.tasks.{method}'
109 109 )
110 110 return inquired_task.delay(extras).get(timeout=self.TASK_TIMEOUT)
111 111
112 112
113 113 class HooksShadowRepoClient:
114 114
115 115 def __call__(self, hook_name, extras):
116 116 return {'output': '', 'status': 0}
117 117
118 118
119 119 class RemoteMessageWriter:
120 120 """Writer base class."""
121 121 def write(self, message):
122 122 raise NotImplementedError()
123 123
124 124
125 125 class HgMessageWriter(RemoteMessageWriter):
126 126 """Writer that knows how to send messages to mercurial clients."""
127 127
128 128 def __init__(self, ui):
129 129 self.ui = ui
130 130
131 131 def write(self, message: str):
132 132 # TODO: Check why the quiet flag is set by default.
133 133 old = self.ui.quiet
134 134 self.ui.quiet = False
135 135 self.ui.status(message.encode('utf-8'))
136 136 self.ui.quiet = old
137 137
138 138
139 139 class GitMessageWriter(RemoteMessageWriter):
140 140 """Writer that knows how to send messages to git clients."""
141 141
142 142 def __init__(self, stdout=None):
143 143 self.stdout = stdout or sys.stdout
144 144
145 145 def write(self, message: str):
146 146 self.stdout.write(message)
147 147
148 148
149 149 class SvnMessageWriter(RemoteMessageWriter):
150 150 """Writer that knows how to send messages to svn clients."""
151 151
152 152 def __init__(self, stderr=None):
153 153 # SVN needs data sent to stderr for back-to-client messaging
154 154 self.stderr = stderr or sys.stderr
155 155
156 156 def write(self, message):
157 157 self.stderr.write(message)
158 158
159 159
160 160 def _handle_exception(result):
161 161 exception_class = result.get('exception')
162 162 exception_traceback = result.get('exception_traceback')
163 163 log.debug('Handling hook-call exception: %s', exception_class)
164 164
165 165 if exception_traceback:
166 166 log.error('Got traceback from remote call:%s', exception_traceback)
167 167
168 168 if exception_class == 'HTTPLockedRC':
169 169 raise exceptions.RepositoryLockedException()(*result['exception_args'])
170 elif exception_class == 'ClientNotSupportedError':
171 raise exceptions.ClientNotSupportedException()(*result['exception_args'])
170 172 elif exception_class == 'HTTPBranchProtected':
171 173 raise exceptions.RepositoryBranchProtectedException()(*result['exception_args'])
172 174 elif exception_class == 'RepositoryError':
173 175 raise exceptions.VcsException()(*result['exception_args'])
174 176 elif exception_class:
175 177 raise Exception(
176 178 f"""Got remote exception "{exception_class}" with args "{result['exception_args']}" """
177 179 )
178 180
179 181
180 182 def _get_hooks_client(extras):
181 183 hooks_uri = extras.get('hooks_uri')
182 184 task_queue = extras.get('task_queue')
183 185 task_backend = extras.get('task_backend')
184 186 is_shadow_repo = extras.get('is_shadow_repo')
185 187
186 188 if hooks_uri:
187 189 return HooksHttpClient(hooks_uri)
188 190 elif task_queue and task_backend:
189 191 return HooksCeleryClient(task_queue, task_backend)
190 192 elif is_shadow_repo:
191 193 return HooksShadowRepoClient()
192 194 else:
193 195 raise Exception("Hooks client not found!")
194 196
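    # Editor's note (illustrative, not part of the file): which client
    # _get_hooks_client() above returns, depending on the extras it receives:
    #     {'hooks_uri': '127.0.0.1:10020'}                        -> HooksHttpClient   (hypothetical host:port)
    #     {'task_queue': <broker url>, 'task_backend': <backend>} -> HooksCeleryClient
    #     {'is_shadow_repo': True}                                -> HooksShadowRepoClient
    #     {}                                                      -> Exception("Hooks client not found!")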
195 197
196 198 def _call_hook(hook_name, extras, writer):
197 199 hooks_client = _get_hooks_client(extras)
198 200 log.debug('Hooks, using client:%s', hooks_client)
199 201 result = hooks_client(hook_name, extras)
200 202 log.debug('Hooks got result: %s', result)
201 203 _handle_exception(result)
202 204 writer.write(result['output'])
203 205
204 206 return result['status']
205 207
206 208
207 209 def _extras_from_ui(ui):
208 210 hook_data = ui.config(b'rhodecode', b'RC_SCM_DATA')
209 211 if not hook_data:
210 212 # maybe it's inside environ ?
211 213 env_hook_data = os.environ.get('RC_SCM_DATA')
212 214 if env_hook_data:
213 215 hook_data = env_hook_data
214 216
215 217 extras = {}
216 218 if hook_data:
217 219 extras = json.loads(hook_data)
218 220 return extras
219 221
220 222
221 223 def _rev_range_hash(repo, node, check_heads=False):
222 224 from vcsserver.hgcompat import get_ctx
223 225
224 226 commits = []
225 227 revs = []
226 228 start = get_ctx(repo, node).rev()
227 229 end = len(repo)
228 230 for rev in range(start, end):
229 231 revs.append(rev)
230 232 ctx = get_ctx(repo, rev)
231 233 commit_id = ascii_str(mercurial.node.hex(ctx.node()))
232 234 branch = safe_str(ctx.branch())
233 235 commits.append((commit_id, branch))
234 236
235 237 parent_heads = []
236 238 if check_heads:
237 239 parent_heads = _check_heads(repo, start, end, revs)
238 240 return commits, parent_heads
239 241
240 242
241 243 def _check_heads(repo, start, end, commits):
242 244 from vcsserver.hgcompat import get_ctx
243 245 changelog = repo.changelog
244 246 parents = set()
245 247
246 248 for new_rev in commits:
247 249 for p in changelog.parentrevs(new_rev):
248 250 if p == mercurial.node.nullrev:
249 251 continue
250 252 if p < start:
251 253 parents.add(p)
252 254
253 255 for p in parents:
254 256 branch = get_ctx(repo, p).branch()
255 257 # The heads descending from that parent, on the same branch
256 258 parent_heads = {p}
257 259 reachable = {p}
258 260 for x in range(p + 1, end):
259 261 if get_ctx(repo, x).branch() != branch:
260 262 continue
261 263 for pp in changelog.parentrevs(x):
262 264 if pp in reachable:
263 265 reachable.add(x)
264 266 parent_heads.discard(pp)
265 267 parent_heads.add(x)
266 268 # More than one head? Suggest merging
267 269 if len(parent_heads) > 1:
268 270 return list(parent_heads)
269 271
270 272 return []
271 273
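# Descriptive note on _check_heads (an interpretation of the code above, not
# original documentation): for every parent revision that lies below the pushed
# range, the loop walks forward over revisions on the same branch and keeps only
# the heads that no later same-branch revision descends from. If more than one
# head survives for any parent, the push created multiple heads on that branch
# and the caller (pre_push with detect_force_push) reports them so a merge can
# be suggested.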
272 274
273 275 def _get_git_env():
274 276 env = {}
275 277 for k, v in os.environ.items():
276 278 if k.startswith('GIT'):
277 279 env[k] = v
278 280
279 281 # serialized version
280 282 return [(k, v) for k, v in env.items()]
281 283
282 284
283 285 def _get_hg_env(old_rev, new_rev, txnid, repo_path):
284 286 env = {}
285 287 for k, v in os.environ.items():
286 288 if k.startswith('HG'):
287 289 env[k] = v
288 290
289 291 env['HG_NODE'] = old_rev
290 292 env['HG_NODE_LAST'] = new_rev
291 293 env['HG_TXNID'] = txnid
292 294 env['HG_PENDING'] = repo_path
293 295
294 296 return [(k, v) for k, v in env.items()]
295 297
296 298
297 299 def _get_ini_settings(ini_file):
298 300 from vcsserver.http_main import sanitize_settings_and_apply_defaults
299 301 from vcsserver.lib.config_utils import get_app_config_lightweight, configure_and_store_settings
300 302
301 303 global_config = {'__file__': ini_file}
302 304 ini_settings = get_app_config_lightweight(ini_file)
303 305 sanitize_settings_and_apply_defaults(global_config, ini_settings)
304 306 configure_and_store_settings(global_config, ini_settings)
305 307
306 308 return ini_settings
307 309
308 310
309 311 def _fix_hooks_executables(ini_path=''):
310 312 """
311 313     This is a trick to set proper settings.EXECUTABLE paths for certain execution patterns,
312 314     especially for subversion, where hooks strip the entire env and calling just the 'svn' command
313 315     will most likely fail because svn is not on PATH
314 316 """
315 317 # set defaults, in case we can't read from ini_file
316 318 core_binary_dir = settings.BINARY_DIR or '/usr/local/bin/rhodecode_bin/vcs_bin'
317 319 if ini_path:
318 320 ini_settings = _get_ini_settings(ini_path)
319 321 core_binary_dir = ini_settings['core.binary_dir']
320 322
321 323 settings.BINARY_DIR = core_binary_dir
322 324
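# Hedged example of the ini option read above; the section name and path are
# assumptions, only the 'core.binary_dir' key is taken from the code:
#
#   [app:main]
#   core.binary_dir = /usr/local/bin/rhodecode_bin/vcs_bin
#
# When no ini file is passed, the hard-coded default above is used instead.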
323 325
324 326 def repo_size(ui, repo, **kwargs):
325 327 extras = _extras_from_ui(ui)
326 328 return _call_hook('repo_size', extras, HgMessageWriter(ui))
327 329
328 330
329 331 def pre_pull(ui, repo, **kwargs):
330 332 extras = _extras_from_ui(ui)
331 333 return _call_hook('pre_pull', extras, HgMessageWriter(ui))
332 334
333 335
334 336 def pre_pull_ssh(ui, repo, **kwargs):
335 337 extras = _extras_from_ui(ui)
336 338 if extras and extras.get('SSH'):
337 339 return pre_pull(ui, repo, **kwargs)
338 340 return 0
339 341
340 342
341 343 def post_pull(ui, repo, **kwargs):
342 344 extras = _extras_from_ui(ui)
343 345 return _call_hook('post_pull', extras, HgMessageWriter(ui))
344 346
345 347
346 348 def post_pull_ssh(ui, repo, **kwargs):
347 349 extras = _extras_from_ui(ui)
348 350 if extras and extras.get('SSH'):
349 351 return post_pull(ui, repo, **kwargs)
350 352 return 0
351 353
352 354
353 355 def pre_push(ui, repo, node=None, **kwargs):
354 356 """
355 357 Mercurial pre_push hook
356 358 """
357 359 extras = _extras_from_ui(ui)
358 360 detect_force_push = extras.get('detect_force_push')
359 361
360 362 rev_data = []
361 363 hook_type: str = safe_str(kwargs.get('hooktype'))
362 364
363 365 if node and hook_type == 'pretxnchangegroup':
364 366 branches = collections.defaultdict(list)
365 367 commits, _heads = _rev_range_hash(repo, node, check_heads=detect_force_push)
366 368 for commit_id, branch in commits:
367 369 branches[branch].append(commit_id)
368 370
369 371 for branch, commits in branches.items():
370 372 old_rev = ascii_str(kwargs.get('node_last')) or commits[0]
371 373 rev_data.append({
372 374 'total_commits': len(commits),
373 375 'old_rev': old_rev,
374 376 'new_rev': commits[-1],
375 377 'ref': '',
376 378 'type': 'branch',
377 379 'name': branch,
378 380 })
379 381
380 382 for push_ref in rev_data:
381 383 push_ref['multiple_heads'] = _heads
382 384
383 385 repo_path = os.path.join(
384 386 extras.get('repo_store', ''), extras.get('repository', ''))
385 387 push_ref['hg_env'] = _get_hg_env(
386 388 old_rev=push_ref['old_rev'],
387 389 new_rev=push_ref['new_rev'], txnid=ascii_str(kwargs.get('txnid')),
388 390 repo_path=repo_path)
389 391
390 392 extras['hook_type'] = hook_type or 'pre_push'
391 393 extras['commit_ids'] = rev_data
392 394
393 395 return _call_hook('pre_push', extras, HgMessageWriter(ui))
394 396
395 397
396 398 def pre_push_ssh(ui, repo, node=None, **kwargs):
397 399 extras = _extras_from_ui(ui)
398 400 if extras.get('SSH'):
399 401 return pre_push(ui, repo, node, **kwargs)
400 402
401 403 return 0
402 404
403 405
404 406 def pre_push_ssh_auth(ui, repo, node=None, **kwargs):
405 407 """
406 408 Mercurial pre_push hook for SSH
407 409 """
408 410 extras = _extras_from_ui(ui)
409 411 if extras.get('SSH'):
410 412 permission = extras['SSH_PERMISSIONS']
411 413
412 414 if 'repository.write' == permission or 'repository.admin' == permission:
413 415 return 0
414 416
415 417 # non-zero ret code
416 418 return 1
417 419
418 420 return 0
419 421
420 422
421 423 def post_push(ui, repo, node, **kwargs):
422 424 """
423 425 Mercurial post_push hook
424 426 """
425 427 extras = _extras_from_ui(ui)
426 428
427 429 commit_ids = []
428 430 branches = []
429 431 bookmarks = []
430 432 tags = []
431 433 hook_type: str = safe_str(kwargs.get('hooktype'))
432 434
433 435 commits, _heads = _rev_range_hash(repo, node)
434 436 for commit_id, branch in commits:
435 437 commit_ids.append(commit_id)
436 438 if branch not in branches:
437 439 branches.append(branch)
438 440
439 441 if hasattr(ui, '_rc_pushkey_bookmarks'):
440 442 bookmarks = ui._rc_pushkey_bookmarks
441 443
442 444 extras['hook_type'] = hook_type or 'post_push'
443 445 extras['commit_ids'] = commit_ids
444 446
445 447 extras['new_refs'] = {
446 448 'branches': branches,
447 449 'bookmarks': bookmarks,
448 450 'tags': tags
449 451 }
450 452
451 453 return _call_hook('post_push', extras, HgMessageWriter(ui))
452 454
453 455
454 456 def post_push_ssh(ui, repo, node, **kwargs):
455 457 """
456 458 Mercurial post_push hook for SSH
457 459 """
458 460 if _extras_from_ui(ui).get('SSH'):
459 461 return post_push(ui, repo, node, **kwargs)
460 462 return 0
461 463
462 464
463 465 def key_push(ui, repo, **kwargs):
464 466 from vcsserver.hgcompat import get_ctx
465 467
466 468 if kwargs['new'] != b'0' and kwargs['namespace'] == b'bookmarks':
467 469 # store new bookmarks in our UI object propagated later to post_push
468 470 ui._rc_pushkey_bookmarks = get_ctx(repo, kwargs['key']).bookmarks()
469 471 return
470 472
471 473
472 474 # backward compat
473 475 log_pull_action = post_pull
474 476
475 477 # backward compat
476 478 log_push_action = post_push
477 479
478 480
479 481 def handle_git_pre_receive(unused_repo_path, unused_revs, unused_env):
480 482 """
481 483 Old hook name: keep here for backward compatibility.
482 484
483 485 This is only required when the installed git hooks are not upgraded.
484 486 """
485 487 pass
486 488
487 489
488 490 def handle_git_post_receive(unused_repo_path, unused_revs, unused_env):
489 491 """
490 492 Old hook name: keep here for backward compatibility.
491 493
492 494 This is only required when the installed git hooks are not upgraded.
493 495 """
494 496 pass
495 497
496 498
497 499 @dataclasses.dataclass
498 500 class HookResponse:
499 501 status: int
500 502 output: str
501 503
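# Small usage sketch for the dataclass above (illustrative only):
#
#   response = git_pre_pull(extras)
#   if response.status != 0:
#       sys.stderr.write(response.output)
#
# callers of the git_* pull helpers below would typically inspect both fields.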
502 504
503 505 def git_pre_pull(extras) -> HookResponse:
504 506 """
505 507 Pre pull hook.
506 508
507 509 :param extras: dictionary containing the keys defined in simplevcs
508 510 :type extras: dict
509 511
510 512     :return: HookResponse with the hook status code (0 for success) and the captured output.
511 513     :rtype: HookResponse
512 514 """
513 515
514 516 if 'pull' not in extras['hooks']:
515 517 return HookResponse(0, '')
516 518
517 519 stdout = io.StringIO()
518 520 try:
519 521 status_code = _call_hook('pre_pull', extras, GitMessageWriter(stdout))
520 522
521 523 except Exception as error:
522 524 log.exception('Failed to call pre_pull hook')
523 525 status_code = 128
524 526 stdout.write(f'ERROR: {error}\n')
525 527
526 528 return HookResponse(status_code, stdout.getvalue())
527 529
528 530
529 531 def git_post_pull(extras) -> HookResponse:
530 532 """
531 533 Post pull hook.
532 534
533 535 :param extras: dictionary containing the keys defined in simplevcs
534 536 :type extras: dict
535 537
536 538     :return: HookResponse with the hook status code (0 for success) and the captured output.
537 539     :rtype: HookResponse
538 540 """
539 541 if 'pull' not in extras['hooks']:
540 542 return HookResponse(0, '')
541 543
542 544 stdout = io.StringIO()
543 545 try:
544 546 status = _call_hook('post_pull', extras, GitMessageWriter(stdout))
545 547 except Exception as error:
546 548 status = 128
547 549 stdout.write(f'ERROR: {error}\n')
548 550
549 551 return HookResponse(status, stdout.getvalue())
550 552
551 553
552 554 def _parse_git_ref_lines(revision_lines):
553 555 rev_data = []
554 556 for revision_line in revision_lines or []:
555 557 old_rev, new_rev, ref = revision_line.strip().split(' ')
556 558 ref_data = ref.split('/', 2)
557 559 if ref_data[1] in ('tags', 'heads'):
558 560 rev_data.append({
559 561 # NOTE(marcink):
560 562 # we're unable to tell total_commits for git at this point
561 563                 # but we set the variable for consistency with the other VCS backends
562 564 'total_commits': -1,
563 565 'old_rev': old_rev,
564 566 'new_rev': new_rev,
565 567 'ref': ref,
566 568 'type': ref_data[1],
567 569 'name': ref_data[2],
568 570 })
569 571 return rev_data
570 572
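# Illustrative example of the ref-line parsing above (SHAs are hypothetical):
#
#   line = '0000000000000000000000000000000000000000 1b2c3d4e... refs/heads/main'
#   _parse_git_ref_lines([line])
#   # -> [{'total_commits': -1,
#   #      'old_rev': '0000000000000000000000000000000000000000',
#   #      'new_rev': '1b2c3d4e...',
#   #      'ref': 'refs/heads/main', 'type': 'heads', 'name': 'main'}]
#
# lines whose second ref segment is neither 'tags' nor 'heads' are skipped.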
571 573
572 574 def git_pre_receive(unused_repo_path, revision_lines, env) -> int:
573 575 """
574 576 Pre push hook.
575 577
576 578 :return: status code of the hook. 0 for success.
577 579 """
578 580 extras = json.loads(env['RC_SCM_DATA'])
579 581 rev_data = _parse_git_ref_lines(revision_lines)
580 582 if 'push' not in extras['hooks']:
581 583 return 0
582 584 _fix_hooks_executables(env.get('RC_INI_FILE'))
583 585
584 586 empty_commit_id = '0' * 40
585 587
586 588 detect_force_push = extras.get('detect_force_push')
587 589
588 590 for push_ref in rev_data:
589 591 # store our git-env which holds the temp store
590 592 push_ref['git_env'] = _get_git_env()
591 593 push_ref['pruned_sha'] = ''
592 594 if not detect_force_push:
593 595 # don't check for forced-push when we don't need to
594 596 continue
595 597
596 598 type_ = push_ref['type']
597 599 new_branch = push_ref['old_rev'] == empty_commit_id
598 600 delete_branch = push_ref['new_rev'] == empty_commit_id
599 601 if type_ == 'heads' and not (new_branch or delete_branch):
600 602 old_rev = push_ref['old_rev']
601 603 new_rev = push_ref['new_rev']
602 604 cmd = [settings.GIT_EXECUTABLE(), 'rev-list', old_rev, f'^{new_rev}']
603 605 stdout, stderr = subprocessio.run_command(
604 606 cmd, env=os.environ.copy())
605 607             # non-empty output means there are non-reachable objects, i.e. a forced push was used
606 608 if stdout:
607 609 push_ref['pruned_sha'] = stdout.splitlines()
608 610
609 611 extras['hook_type'] = 'pre_receive'
610 612 extras['commit_ids'] = rev_data
611 613
612 614 stdout = sys.stdout
613 615 status_code = _call_hook('pre_push', extras, GitMessageWriter(stdout))
614 616
615 617 return status_code
616 618
617 619
618 620 def git_post_receive(unused_repo_path, revision_lines, env) -> int:
619 621 """
620 622 Post push hook.
621 623
622 624 :return: status code of the hook. 0 for success.
623 625 """
624 626 extras = json.loads(env['RC_SCM_DATA'])
625 627 if 'push' not in extras['hooks']:
626 628 return 0
627 629
628 630 _fix_hooks_executables(env.get('RC_INI_FILE'))
629 631
630 632 rev_data = _parse_git_ref_lines(revision_lines)
631 633
632 634 git_revs = []
633 635
634 636 # N.B.(skreft): it is ok to just call git, as git before calling a
635 637     # subcommand sets the PATH environment variable so that it points to the
636 638 # correct version of the git executable.
637 639 empty_commit_id = '0' * 40
638 640 branches = []
639 641 tags = []
640 642 for push_ref in rev_data:
641 643 type_ = push_ref['type']
642 644
643 645 if type_ == 'heads':
644 646 # starting new branch case
645 647 if push_ref['old_rev'] == empty_commit_id:
646 648 push_ref_name = push_ref['name']
647 649
648 650 if push_ref_name not in branches:
649 651 branches.append(push_ref_name)
650 652
651 653 need_head_set = ''
652 654 with Repository(os.getcwd()) as repo:
653 655 try:
654 656 repo.head
655 657 except pygit2.GitError:
656 658 need_head_set = f'refs/heads/{push_ref_name}'
657 659
658 660 if need_head_set:
659 661 repo.set_head(need_head_set)
660 662 print(f"Setting default branch to {push_ref_name}")
661 663
662 664 cmd = [settings.GIT_EXECUTABLE(), 'for-each-ref', '--format=%(refname)', 'refs/heads/*']
663 665 stdout, stderr = subprocessio.run_command(
664 666 cmd, env=os.environ.copy())
665 667 heads = safe_str(stdout)
666 668 heads = heads.replace(push_ref['ref'], '')
667 669 heads = ' '.join(head for head
668 670 in heads.splitlines() if head) or '.'
669 671 cmd = [settings.GIT_EXECUTABLE(), 'log', '--reverse',
670 672 '--pretty=format:%H', '--', push_ref['new_rev'],
671 673 '--not', heads]
672 674 stdout, stderr = subprocessio.run_command(
673 675 cmd, env=os.environ.copy())
674 676 git_revs.extend(list(map(ascii_str, stdout.splitlines())))
675 677
676 678 # delete branch case
677 679 elif push_ref['new_rev'] == empty_commit_id:
678 680 git_revs.append(f'delete_branch=>{push_ref["name"]}')
679 681 else:
680 682 if push_ref['name'] not in branches:
681 683 branches.append(push_ref['name'])
682 684
683 685 cmd = [settings.GIT_EXECUTABLE(), 'log',
684 686 f'{push_ref["old_rev"]}..{push_ref["new_rev"]}',
685 687 '--reverse', '--pretty=format:%H']
686 688 stdout, stderr = subprocessio.run_command(
687 689 cmd, env=os.environ.copy())
688 690 # we get bytes from stdout, we need str to be consistent
689 691 log_revs = list(map(ascii_str, stdout.splitlines()))
690 692 git_revs.extend(log_revs)
691 693
692 694 # Pure pygit2 impl. but still 2-3x slower :/
693 695 # results = []
694 696 #
695 697 # with Repository(os.getcwd()) as repo:
696 698 # repo_new_rev = repo[push_ref['new_rev']]
697 699 # repo_old_rev = repo[push_ref['old_rev']]
698 700 # walker = repo.walk(repo_new_rev.id, pygit2.GIT_SORT_TOPOLOGICAL)
699 701 #
700 702 # for commit in walker:
701 703 # if commit.id == repo_old_rev.id:
702 704 # break
703 705 # results.append(commit.id.hex)
704 706 # # reverse the order, can't use GIT_SORT_REVERSE
705 707 # log_revs = results[::-1]
706 708
707 709 elif type_ == 'tags':
708 710 if push_ref['name'] not in tags:
709 711 tags.append(push_ref['name'])
710 712 git_revs.append(f'tag=>{push_ref["name"]}')
711 713
712 714 extras['hook_type'] = 'post_receive'
713 715 extras['commit_ids'] = git_revs
714 716 extras['new_refs'] = {
715 717 'branches': branches,
716 718 'bookmarks': [],
717 719 'tags': tags,
718 720 }
719 721
720 722 stdout = sys.stdout
721 723
722 724 if 'repo_size' in extras['hooks']:
723 725 try:
724 726 _call_hook('repo_size', extras, GitMessageWriter(stdout))
725 727 except Exception:
726 728 pass
727 729
728 730 status_code = _call_hook('post_push', extras, GitMessageWriter(stdout))
729 731 return status_code
730 732
731 733
732 734 def get_extras_from_txn_id(repo_path, txn_id):
733 735 extras = get_txn_id_from_store(repo_path, txn_id)
734 736 return extras
735 737
736 738
737 739 def svn_pre_commit(repo_path, commit_data, env):
738 740
739 741 path, txn_id = commit_data
740 742 branches = []
741 743 tags = []
742 744
743 745 if env.get('RC_SCM_DATA'):
744 746 extras = json.loads(env['RC_SCM_DATA'])
745 747 else:
746 748 ini_path = env.get('RC_INI_FILE')
747 749 if ini_path:
748 750 _get_ini_settings(ini_path)
749 751 # fallback method to read from TXN-ID stored data
750 752 extras = get_extras_from_txn_id(path, txn_id)
751 753
752 754 if not extras:
753 755         raise ValueError('SVN-PRE-COMMIT: Failed to extract context data from extras for hook execution')
754 756
755 757 if extras.get('rc_internal_commit'):
756 758 # special marker for internal commit, we don't call hooks client
757 759 return 0
758 760
759 761 extras['hook_type'] = 'pre_commit'
760 762 extras['commit_ids'] = [txn_id]
761 763 extras['txn_id'] = txn_id
762 764 extras['new_refs'] = {
763 765 'total_commits': 1,
764 766 'branches': branches,
765 767 'bookmarks': [],
766 768 'tags': tags,
767 769 }
768 770
769 771 return _call_hook('pre_push', extras, SvnMessageWriter())
770 772
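# Descriptive note for svn_pre_commit above and svn_post_commit below (an
# interpretation of the code, not original documentation): extras come from the
# RC_SCM_DATA environment variable when the hook is triggered through RhodeCode
# itself; otherwise they are recovered from the txn-id store via
# get_extras_from_txn_id(), optionally bootstrapping settings from RC_INI_FILE.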
771 773
772 774 def svn_post_commit(repo_path, commit_data, env):
773 775 """
774 776 commit_data is path, rev, txn_id
775 777 """
776 778
777 779 if len(commit_data) == 3:
778 780 path, commit_id, txn_id = commit_data
779 781 elif len(commit_data) == 2:
780 782 log.error('Failed to extract txn_id from commit_data using legacy method. '
781 783 'Some functionality might be limited')
782 784 path, commit_id = commit_data
783 785 txn_id = None
784 786 else:
785 787 return 0
786 788
787 789 branches = []
788 790 tags = []
789 791
790 792 if env.get('RC_SCM_DATA'):
791 793 extras = json.loads(env['RC_SCM_DATA'])
792 794 else:
793 795 ini_path = env.get('RC_INI_FILE')
794 796 if ini_path:
795 797 _get_ini_settings(ini_path)
796 798 # fallback method to read from TXN-ID stored data
797 799 extras = get_extras_from_txn_id(path, txn_id)
798 800
799 801 if not extras and txn_id:
800 802 raise ValueError('SVN-POST-COMMIT: Failed to extract context data in called extras for hook execution')
801 803
802 804 if extras.get('rc_internal_commit'):
803 805 # special marker for internal commit, we don't call hooks client
804 806 return 0
805 807
806 808 extras['hook_type'] = 'post_commit'
807 809 extras['commit_ids'] = [commit_id]
808 810 extras['txn_id'] = txn_id
809 811 extras['new_refs'] = {
810 812 'branches': branches,
811 813 'bookmarks': [],
812 814 'tags': tags,
813 815 'total_commits': 1,
814 816 }
815 817
816 818 if 'repo_size' in extras['hooks']:
817 819 try:
818 820 _call_hook('repo_size', extras, SvnMessageWriter())
819 821 except Exception:
820 822 pass
821 823
822 824 return _call_hook('post_push', extras, SvnMessageWriter())
@@ -1,173 +1,173 b''
1 1 # Copyright (C) 2015-2024 RhodeCode GmbH
2 2 #
3 3 # This program is free software: you can redistribute it and/or modify
4 4 # it under the terms of the GNU Affero General Public License, version 3
5 5 # (only), as published by the Free Software Foundation.
6 6 #
7 7 # This program is distributed in the hope that it will be useful,
8 8 # but WITHOUT ANY WARRANTY; without even the implied warranty of
9 9 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 10 # GNU General Public License for more details.
11 11 #
12 12 # You should have received a copy of the GNU Affero General Public License
13 13 # along with this program. If not, see <http://www.gnu.org/licenses/>.
14 14 #
15 15 # This program is dual-licensed. If you wish to learn more about the
16 16 # RhodeCode Enterprise Edition, including its added features, Support services,
17 17 # and proprietary license terms, please see https://rhodecode.com/licenses/
18 18
19 19 import codecs
20 20 import hashlib
21 21 import logging
22 22 import os
23 23 import typing
24 24
25 25 import fsspec
26 26
27 27 from .base import BaseCache, BaseShard
28 28 from ..utils import ShardFileReader, NOT_GIVEN
29 29 from ...type_utils import str2bool
30 30
31 31 log = logging.getLogger(__name__)
32 32
33 33
34 34 class S3Shard(BaseShard):
35 35
36 36 def __init__(self, index, bucket, bucket_folder, fs, **settings):
37 37 self._index: int = index
38 38 self._bucket_folder: str = bucket_folder
39 39 self.storage_type: str = 'bucket'
40 40 self._bucket_main: str = bucket
41 41
42 42 self.fs = fs
43 43
44 44 @property
45 45 def bucket(self) -> str:
46 46 """Cache bucket final path."""
47 47 return os.path.join(self._bucket_main, self._bucket_folder)
48 48
49 49 def _get_keyfile(self, archive_key) -> tuple[str, str]:
50 50 key_file: str = f'{archive_key}-{self.key_suffix}'
51 51 return key_file, os.path.join(self.bucket, key_file)
52 52
53 53 def _get_writer(self, path, mode):
54 54 return self.fs.open(path, 'wb')
55 55
56 56 def _write_file(self, full_path, iterator, mode):
57 57
58 58 # ensure folder in bucket exists
59 59 destination = self.bucket
60 60 if not self.fs.exists(destination):
61 self.fs.mkdir(destination, s3_additional_kwargs={})
61 self.fs.mkdir(destination)
62 62
63 63 writer = self._get_writer(full_path, mode)
64 64
65 65 digest = hashlib.sha256()
66 66 with writer:
67 67 size = 0
68 68 for chunk in iterator:
69 69 size += len(chunk)
70 70 digest.update(chunk)
71 71 writer.write(chunk)
72 72
73 73 sha256 = digest.hexdigest()
74 74 log.debug('written new archive cache under %s, sha256: %s', full_path, sha256)
75 75 return size, sha256
76 76
77 77 def store(self, key, value_reader, metadata: dict | None = None):
78 78 return self._store(key, value_reader, metadata, mode='wb')
79 79
80 80 def fetch(self, key, retry=NOT_GIVEN,
81 81 retry_attempts=NOT_GIVEN, retry_backoff=1,
82 82 presigned_url_expires: int = 0) -> tuple[ShardFileReader, dict]:
83 83 return self._fetch(key, retry, retry_attempts, retry_backoff, presigned_url_expires=presigned_url_expires)
84 84
85 85 def remove(self, key):
86 86 return self._remove(key)
87 87
88 88 def random_filename(self):
89 89 """Return filename and full-path tuple for file storage.
90 90
91 91 Filename will be a randomly generated 28 character hexadecimal string
92 92 with ".archive_cache" suffixed. Two levels of sub-directories will be used to
93 93 reduce the size of directories. On older filesystems, lookups in
94 94 directories with many files may be slow.
95 95 """
96 96
97 97 hex_name = codecs.encode(os.urandom(16), 'hex').decode('utf-8')
98 98
99 99 archive_name = hex_name[4:] + '.archive_cache'
100 100 filename = f"{hex_name[:2]}-{hex_name[2:4]}-{archive_name}"
101 101
102 102 full_path = os.path.join(self.bucket, filename)
103 103 return archive_name, full_path
104 104
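    # Illustrative shape of the values returned above (hex digits hypothetical):
    #
    #   archive_name = '8f3a...c1.archive_cache'                  # hex_name[4:]
    #   full_path    = '<bucket>/<shard-folder>/ab-cd-8f3a...c1.archive_cache'
    #
    # the first two hex pairs form the 'ab-cd-' prefix that stands in for the
    # two sub-directory levels mentioned in the docstring.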
105 105 def __repr__(self):
106 106 return f'{self.__class__.__name__}(index={self._index}, bucket={self.bucket})'
107 107
108 108
109 109 class ObjectStoreCache(BaseCache):
110 110 shard_name: str = 'shard-{:03d}'
111 111 shard_cls = S3Shard
112 112
113 113 def __init__(self, locking_url, **settings):
114 114 """
115 115 Initialize objectstore cache instance.
116 116
117 117 :param str locking_url: redis url for a lock
118 118 :param settings: settings dict
119 119
120 120 """
121 121 self._locking_url = locking_url
122 122 self._config = settings
123 123
124 124 objectstore_url = self.get_conf('archive_cache.objectstore.url')
125 125 self._storage_path = objectstore_url # common path for all from BaseCache
126 126
127 127 self._shard_count = int(self.get_conf('archive_cache.objectstore.bucket_shards', pop=True))
128 128 if self._shard_count < 1:
129 129 raise ValueError('cache_shards must be 1 or more')
130 130
131 131 self._bucket = settings.pop('archive_cache.objectstore.bucket')
132 132 if not self._bucket:
133 133 raise ValueError('archive_cache.objectstore.bucket needs to have a value')
134 134
135 135 self._eviction_policy = self.get_conf('archive_cache.objectstore.eviction_policy', pop=True)
136 136 self._cache_size_limit = self.gb_to_bytes(int(self.get_conf('archive_cache.objectstore.cache_size_gb')))
137 137
138 138 self.retry = str2bool(self.get_conf('archive_cache.objectstore.retry', pop=True))
139 139 self.retry_attempts = int(self.get_conf('archive_cache.objectstore.retry_attempts', pop=True))
140 140 self.retry_backoff = int(self.get_conf('archive_cache.objectstore.retry_backoff', pop=True))
141 141
142 142 endpoint_url = settings.pop('archive_cache.objectstore.url')
143 143 key = settings.pop('archive_cache.objectstore.key')
144 144 secret = settings.pop('archive_cache.objectstore.secret')
145 145 region = settings.pop('archive_cache.objectstore.region')
146 146
147 147 log.debug('Initializing %s archival cache instance', self)
148 148
149 149 fs = fsspec.filesystem(
150 150 's3', anon=False, endpoint_url=endpoint_url, key=key, secret=secret, client_kwargs={'region_name': region}
151 151 )
152 152
153 153 # init main bucket
154 154 if not fs.exists(self._bucket):
155 155 fs.mkdir(self._bucket)
156 156
157 157 self._shards = tuple(
158 158 self.shard_cls(
159 159 index=num,
160 160 bucket=self._bucket,
161 161 bucket_folder=self.shard_name.format(num),
162 162 fs=fs,
163 163 **settings,
164 164 )
165 165 for num in range(self._shard_count)
166 166 )
167 167 self._hash = self._shards[0].hash
168 168
169 169 def _get_size(self, shard, archive_path):
170 170 return shard.fs.info(archive_path)['size']
171 171
172 172 def set_presigned_url_expiry(self, val: int) -> None:
173 173 self.presigned_url_expires = val
@@ -1,303 +1,310 b''
1 1 # RhodeCode VCSServer provides access to different vcs backends via network.
2 2 # Copyright (C) 2014-2023 RhodeCode GmbH
3 3 #
4 4 # This program is free software; you can redistribute it and/or modify
5 5 # it under the terms of the GNU General Public License as published by
6 6 # the Free Software Foundation; either version 3 of the License, or
7 7 # (at your option) any later version.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU General Public License
15 15 # along with this program; if not, write to the Free Software Foundation,
16 16 # Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17 17
18 18 #import errno
19 19 import fcntl
20 20 import functools
21 21 import logging
22 22 import os
23 23 import pickle
24 24 #import time
25 25
26 26 #import gevent
27 27 import msgpack
28 28 import redis
29 29
30 30 flock_org = fcntl.flock
31 31 from typing import Union
32 32
33 33 from dogpile.cache.api import Deserializer, Serializer
34 34 from dogpile.cache.backends import file as file_backend
35 35 from dogpile.cache.backends import memory as memory_backend
36 36 from dogpile.cache.backends import redis as redis_backend
37 37 from dogpile.cache.backends.file import FileLock
38 38 from dogpile.cache.util import memoized_property
39 39
40 from vcsserver.lib.memory_lru_dict import LRUDict, LRUDictDebug
41 from vcsserver.lib.str_utils import safe_bytes, safe_str
42 from vcsserver.lib.type_utils import str2bool
40 from ...lib.memory_lru_dict import LRUDict, LRUDictDebug
41 from ...lib.str_utils import safe_bytes, safe_str
42 from ...lib.type_utils import str2bool
43 43
44 44 _default_max_size = 1024
45 45
46 46 log = logging.getLogger(__name__)
47 47
48 48
49 49 class LRUMemoryBackend(memory_backend.MemoryBackend):
50 50 key_prefix = 'lru_mem_backend'
51 51 pickle_values = False
52 52
53 53 def __init__(self, arguments):
54 54 self.max_size = arguments.pop('max_size', _default_max_size)
55 55
56 56 LRUDictClass = LRUDict
57 57 if arguments.pop('log_key_count', None):
58 58 LRUDictClass = LRUDictDebug
59 59
60 60 arguments['cache_dict'] = LRUDictClass(self.max_size)
61 61 super().__init__(arguments)
62 62
63 63 def __repr__(self):
64 64 return f'{self.__class__}(maxsize=`{self.max_size}`)'
65 65
66 66 def __str__(self):
67 67 return self.__repr__()
68 68
69 69 def delete(self, key):
70 70 try:
71 71 del self._cache[key]
72 72 except KeyError:
73 73 # we don't care if key isn't there at deletion
74 74 pass
75 75
76 76 def list_keys(self, prefix):
77 77 return list(self._cache.keys())
78 78
79 79 def delete_multi(self, keys):
80 80 for key in keys:
81 81 self.delete(key)
82 82
83 83 def delete_multi_by_prefix(self, prefix):
84 84 cache_keys = self.list_keys(prefix=prefix)
85 85 num_affected_keys = len(cache_keys)
86 86 if num_affected_keys:
87 87 self.delete_multi(cache_keys)
88 88 return num_affected_keys
89 89
90 90
91 91 class PickleSerializer:
92 92 serializer: None | Serializer = staticmethod( # type: ignore
93 93 functools.partial(pickle.dumps, protocol=pickle.HIGHEST_PROTOCOL)
94 94 )
95 95 deserializer: None | Deserializer = staticmethod( # type: ignore
96 96 functools.partial(pickle.loads)
97 97 )
98 98
99 99
100 100 class MsgPackSerializer:
101 101 serializer: None | Serializer = staticmethod( # type: ignore
102 102 msgpack.packb
103 103 )
104 104 deserializer: None | Deserializer = staticmethod( # type: ignore
105 105 functools.partial(msgpack.unpackb, use_list=False)
106 106 )
107 107
108 108
109 109 class CustomLockFactory(FileLock):
110 110
111 111 pass
112 112
113 113
114 114 class FileNamespaceBackend(PickleSerializer, file_backend.DBMBackend):
115 115 key_prefix = 'file_backend'
116 116
117 117 def __init__(self, arguments):
118 118 arguments['lock_factory'] = CustomLockFactory
119 119 db_file = arguments.get('filename')
120 120
121 121         log.debug('initializing cache-backend=%s db in %s', self.__class__.__name__, db_file)
122 122 db_file_dir = os.path.dirname(db_file)
123 123 if not os.path.isdir(db_file_dir):
124 124 os.makedirs(db_file_dir)
125 125
126 126 try:
127 127 super().__init__(arguments)
128 128 except Exception:
129 129 log.exception('Failed to initialize db at: %s', db_file)
130 130 raise
131 131
132 132 def __repr__(self):
133 133 return f'{self.__class__}(file=`{self.filename}`)'
134 134
135 135 def __str__(self):
136 136 return self.__repr__()
137 137
138 138 def _get_keys_pattern(self, prefix: bytes = b''):
139 139 return b'%b:%b' % (safe_bytes(self.key_prefix), safe_bytes(prefix))
140 140
141 141 def list_keys(self, prefix: bytes = b''):
142 142 prefix = self._get_keys_pattern(prefix)
143 143
144 144 def cond(dbm_key: bytes):
145 145 if not prefix:
146 146 return True
147 147
148 148 if dbm_key.startswith(prefix):
149 149 return True
150 150 return False
151 151
152 152 with self._dbm_file(True) as dbm:
153 153 try:
154 154 return list(filter(cond, dbm.keys()))
155 155 except Exception:
156 156 log.error('Failed to fetch DBM keys from DB: %s', self.get_store())
157 157 raise
158 158
159 159 def delete_multi_by_prefix(self, prefix):
160 160 cache_keys = self.list_keys(prefix=prefix)
161 161 num_affected_keys = len(cache_keys)
162 162 if num_affected_keys:
163 163 self.delete_multi(cache_keys)
164 164 return num_affected_keys
165 165
166 166 def get_store(self):
167 167 return self.filename
168 168
169 def cleanup_store(self):
170 for ext in ("db", "dat", "pag", "dir"):
171 final_filename = self.filename + os.extsep + ext
172 if os.path.exists(final_filename):
173 os.remove(final_filename)
174 log.warning('Removed dbm file %s', final_filename)
175
169 176
170 177 class BaseRedisBackend(redis_backend.RedisBackend):
171 178 key_prefix = ''
172 179
173 180 def __init__(self, arguments):
174 181 self.db_conn = arguments.get('host', '') or arguments.get('url', '') or 'redis-host'
175 182 super().__init__(arguments)
176 183
177 184 self._lock_timeout = self.lock_timeout
178 185 self._lock_auto_renewal = str2bool(arguments.pop("lock_auto_renewal", True))
179 186
180 187 if self._lock_auto_renewal and not self._lock_timeout:
181 188 # set default timeout for auto_renewal
182 189 self._lock_timeout = 30
183 190
184 191 def __repr__(self):
185 192 return f'{self.__class__}(conn=`{self.db_conn}`)'
186 193
187 194 def __str__(self):
188 195 return self.__repr__()
189 196
190 197 def _create_client(self):
191 198 args = {}
192 199
193 200 if self.url is not None:
194 201 args.update(url=self.url)
195 202
196 203 else:
197 204 args.update(
198 205 host=self.host, password=self.password,
199 206 port=self.port, db=self.db
200 207 )
201 208
202 209 connection_pool = redis.ConnectionPool(**args)
203 210 self.writer_client = redis.StrictRedis(
204 211 connection_pool=connection_pool
205 212 )
206 213 self.reader_client = self.writer_client
207 214
208 215 def _get_keys_pattern(self, prefix: bytes = b''):
209 216 return b'%b:%b*' % (safe_bytes(self.key_prefix), safe_bytes(prefix))
210 217
211 218 def list_keys(self, prefix: bytes = b''):
212 219 prefix = self._get_keys_pattern(prefix)
213 220 return self.reader_client.keys(prefix)
214 221
215 222 def delete_multi_by_prefix(self, prefix, use_lua=False):
216 223 if use_lua:
217 224             # highly efficient Lua script to delete ALL keys by prefix...
218 225 lua = """local keys = redis.call('keys', ARGV[1])
219 226 for i=1,#keys,5000 do
220 227 redis.call('del', unpack(keys, i, math.min(i+(5000-1), #keys)))
221 228 end
222 229 return #keys"""
223 230 num_affected_keys = self.writer_client.eval(
224 231 lua,
225 232 0,
226 233 f"{prefix}*")
227 234 else:
228 235 cache_keys = self.list_keys(prefix=prefix)
229 236 num_affected_keys = len(cache_keys)
230 237 if num_affected_keys:
231 238 self.delete_multi(cache_keys)
232 239 return num_affected_keys
233 240
234 241 def get_store(self):
235 242 return self.reader_client.connection_pool
236 243
237 244 def get_mutex(self, key):
238 245 if self.distributed_lock:
239 246 lock_key = f'_lock_{safe_str(key)}'
240 247 return get_mutex_lock(
241 248 self.writer_client, lock_key,
242 249 self._lock_timeout,
243 250 auto_renewal=self._lock_auto_renewal
244 251 )
245 252 else:
246 253 return None
247 254
248 255
249 256 class RedisPickleBackend(PickleSerializer, BaseRedisBackend):
250 257 key_prefix = 'redis_pickle_backend'
251 258 pass
252 259
253 260
254 261 class RedisMsgPackBackend(MsgPackSerializer, BaseRedisBackend):
255 262 key_prefix = 'redis_msgpack_backend'
256 263 pass
257 264
258 265
259 266 def get_mutex_lock(client, lock_key, lock_timeout, auto_renewal=False):
260 from vcsserver.lib._vendor import redis_lock
267 from ...lib._vendor import redis_lock
261 268
262 269 class _RedisLockWrapper:
263 270 """LockWrapper for redis_lock"""
264 271
265 272 @classmethod
266 273 def get_lock(cls):
267 274 return redis_lock.Lock(
268 275 redis_client=client,
269 276 name=lock_key,
270 277 expire=lock_timeout,
271 278 auto_renewal=auto_renewal,
272 279 strict=True,
273 280 )
274 281
275 282 def __repr__(self):
276 283 return f"{self.__class__.__name__}:{lock_key}"
277 284
278 285 def __str__(self):
279 286 return f"{self.__class__.__name__}:{lock_key}"
280 287
281 288 def __init__(self):
282 289 self.lock = self.get_lock()
283 290 self.lock_key = lock_key
284 291
285 292 def acquire(self, wait=True):
286 293 log.debug('Trying to acquire Redis lock for key %s', self.lock_key)
287 294 try:
288 295 acquired = self.lock.acquire(wait)
289 296 log.debug('Got lock for key %s, %s', self.lock_key, acquired)
290 297 return acquired
291 298 except redis_lock.AlreadyAcquired:
292 299 return False
293 300 except redis_lock.AlreadyStarted:
294 301 # refresh thread exists, but it also means we acquired the lock
295 302 return True
296 303
297 304 def release(self):
298 305 try:
299 306 self.lock.release()
300 307 except redis_lock.NotAcquired:
301 308 pass
302 309
303 310 return _RedisLockWrapper()
@@ -1,245 +1,245 b''
1 1 # RhodeCode VCSServer provides access to different vcs backends via network.
2 2 # Copyright (C) 2014-2023 RhodeCode GmbH
3 3 #
4 4 # This program is free software; you can redistribute it and/or modify
5 5 # it under the terms of the GNU General Public License as published by
6 6 # the Free Software Foundation; either version 3 of the License, or
7 7 # (at your option) any later version.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU General Public License
15 15 # along with this program; if not, write to the Free Software Foundation,
16 16 # Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17 17
18 18 import functools
19 19 import logging
20 20 import os
21 21 import threading
22 22 import time
23 23
24 24 import decorator
25 25 from dogpile.cache import CacheRegion
26 26
27 27
28 from vcsserver.utils import sha1
29 from vcsserver.lib.str_utils import safe_bytes
30 from vcsserver.lib.type_utils import str2bool # noqa :required by imports from .utils
28 from ...lib.hash_utils import sha1
29 from ...lib.str_utils import safe_bytes
30 from ...lib.type_utils import str2bool # noqa :required by imports from .utils
31 31
32 32 from . import region_meta
33 33
34 34 log = logging.getLogger(__name__)
35 35
36 36
37 37 class RhodeCodeCacheRegion(CacheRegion):
38 38
39 39 def __repr__(self):
40 40 return f'`{self.__class__.__name__}(name={self.name}, backend={self.backend.__class__})`'
41 41
42 42 def conditional_cache_on_arguments(
43 43 self, namespace=None,
44 44 expiration_time=None,
45 45 should_cache_fn=None,
46 46 to_str=str,
47 47 function_key_generator=None,
48 48 condition=True):
49 49 """
50 50         Custom conditional decorator that will not touch any dogpile internals if
51 51         the condition isn't met. This works a bit differently from should_cache_fn,
52 52         and it's faster in cases where we don't ever want to compute cached values
53 53 """
54 54 expiration_time_is_callable = callable(expiration_time)
55 55 if not namespace:
56 56 namespace = getattr(self, '_default_namespace', None)
57 57
58 58 if function_key_generator is None:
59 59 function_key_generator = self.function_key_generator
60 60
61 61 def get_or_create_for_user_func(func_key_generator, user_func, *arg, **kw):
62 62
63 63 if not condition:
64 64 log.debug('Calling un-cached method:%s', user_func.__name__)
65 65 start = time.time()
66 66 result = user_func(*arg, **kw)
67 67 total = time.time() - start
68 68 log.debug('un-cached method:%s took %.4fs', user_func.__name__, total)
69 69 return result
70 70
71 71 key = func_key_generator(*arg, **kw)
72 72
73 73 timeout = expiration_time() if expiration_time_is_callable \
74 74 else expiration_time
75 75
76 76 log.debug('Calling cached method:`%s`', user_func.__name__)
77 77 return self.get_or_create(key, user_func, timeout, should_cache_fn, (arg, kw))
78 78
79 79 def cache_decorator(user_func):
80 80 if to_str is str:
81 81 # backwards compatible
82 82 key_generator = function_key_generator(namespace, user_func)
83 83 else:
84 84 key_generator = function_key_generator(namespace, user_func, to_str=to_str)
85 85
86 86 def refresh(*arg, **kw):
87 87 """
88 88 Like invalidate, but regenerates the value instead
89 89 """
90 90 key = key_generator(*arg, **kw)
91 91 value = user_func(*arg, **kw)
92 92 self.set(key, value)
93 93 return value
94 94
95 95 def invalidate(*arg, **kw):
96 96 key = key_generator(*arg, **kw)
97 97 self.delete(key)
98 98
99 99 def set_(value, *arg, **kw):
100 100 key = key_generator(*arg, **kw)
101 101 self.set(key, value)
102 102
103 103 def get(*arg, **kw):
104 104 key = key_generator(*arg, **kw)
105 105 return self.get(key)
106 106
107 107 user_func.set = set_
108 108 user_func.invalidate = invalidate
109 109 user_func.get = get
110 110 user_func.refresh = refresh
111 111 user_func.key_generator = key_generator
112 112 user_func.original = user_func
113 113
114 114 # Use `decorate` to preserve the signature of :param:`user_func`.
115 115 return decorator.decorate(user_func, functools.partial(
116 116 get_or_create_for_user_func, key_generator))
117 117
118 118 return cache_decorator
119 119
120 120
121 121 def make_region(*arg, **kw):
122 122 return RhodeCodeCacheRegion(*arg, **kw)
123 123
124 124
125 125 def get_default_cache_settings(settings, prefixes=None):
126 126 prefixes = prefixes or []
127 127 cache_settings = {}
128 128 for key in settings.keys():
129 129 for prefix in prefixes:
130 130 if key.startswith(prefix):
131 131 name = key.split(prefix)[1].strip()
132 132 val = settings[key]
133 133 if isinstance(val, str):
134 134 val = val.strip()
135 135 cache_settings[name] = val
136 136 return cache_settings
137 137
138 138
139 139 def compute_key_from_params(*args):
140 140 """
141 141 Helper to compute key from given params to be used in cache manager
142 142 """
143 143 return sha1(safe_bytes("_".join(map(str, args))))
144 144
145 145
146 146 def custom_key_generator(backend, namespace, fn):
147 147 func_name = fn.__name__
148 148
149 149 def generate_key(*args):
150 150 backend_pref = getattr(backend, 'key_prefix', None) or 'backend_prefix'
151 151 namespace_pref = namespace or 'default_namespace'
152 152 arg_key = compute_key_from_params(*args)
153 153 final_key = f"{backend_pref}:{namespace_pref}:{func_name}_{arg_key}"
154 154
155 155 return final_key
156 156
157 157 return generate_key
158 158
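# Hedged example of the key layout produced by generate_key above; the backend,
# namespace and function names are hypothetical:
#
#   arg_key   = compute_key_from_params('repo1', 42)    # sha1(b'repo1_42')
#   final_key = f"file_backend:my_namespace:get_commit_{arg_key}"
#
# i.e. "<backend key_prefix>:<namespace>:<function-name>_<sha1-of-args>".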
159 159
160 160 def backend_key_generator(backend):
161 161 """
162 162 Special wrapper that also sends over the backend to the key generator
163 163 """
164 164 def wrapper(namespace, fn):
165 165 return custom_key_generator(backend, namespace, fn)
166 166 return wrapper
167 167
168 168
169 169 def get_or_create_region(region_name, region_namespace: str = None, use_async_runner=False):
170 170 from .backends import FileNamespaceBackend
171 171 from . import async_creation_runner
172 172
173 173 region_obj = region_meta.dogpile_cache_regions.get(region_name)
174 174 if not region_obj:
175 175 reg_keys = list(region_meta.dogpile_cache_regions.keys())
176 176         raise OSError(f'Region `{region_name}` not in configured regions: {reg_keys}.')
177 177
178 178 region_uid_name = f'{region_name}:{region_namespace}'
179 179
180 180 # Special case for ONLY the FileNamespaceBackend backend. We register one-file-per-region
181 181 if isinstance(region_obj.actual_backend, FileNamespaceBackend):
182 182 if not region_namespace:
183 183             raise ValueError(f'{FileNamespaceBackend} requires specifying the region_namespace param')
184 184
185 185 region_exist = region_meta.dogpile_cache_regions.get(region_namespace)
186 186 if region_exist:
187 187 log.debug('Using already configured region: %s', region_namespace)
188 188 return region_exist
189 189
190 190 expiration_time = region_obj.expiration_time
191 191
192 192 cache_dir = region_meta.dogpile_config_defaults['cache_dir']
193 193 namespace_cache_dir = cache_dir
194 194
195 195 # we default the namespace_cache_dir to our default cache dir.
196 196     # however, if this backend is configured with the filename= param, we prioritize that,
197 197     # so all caches within that particular region, even namespaced ones, end up in the same path
198 198 if region_obj.actual_backend.filename:
199 199 namespace_cache_dir = os.path.dirname(region_obj.actual_backend.filename)
200 200
201 201 if not os.path.isdir(namespace_cache_dir):
202 202 os.makedirs(namespace_cache_dir)
203 203 new_region = make_region(
204 204 name=region_uid_name,
205 205 function_key_generator=backend_key_generator(region_obj.actual_backend)
206 206 )
207 207
208 208 namespace_filename = os.path.join(
209 209 namespace_cache_dir, f"{region_name}_{region_namespace}.cache_db")
210 210 # special type that allows 1db per namespace
211 211 new_region.configure(
212 212 backend='dogpile.cache.rc.file_namespace',
213 213 expiration_time=expiration_time,
214 214 arguments={"filename": namespace_filename}
215 215 )
216 216
217 217 # create and save in region caches
218 218 log.debug('configuring new region: %s', region_uid_name)
219 219 region_obj = region_meta.dogpile_cache_regions[region_namespace] = new_region
220 220
221 221 region_obj._default_namespace = region_namespace
222 222 if use_async_runner:
223 223 region_obj.async_creation_runner = async_creation_runner
224 224 return region_obj
225 225
226 226
227 227 def clear_cache_namespace(cache_region: str | RhodeCodeCacheRegion, cache_namespace_uid: str, method: str) -> int:
228 228 from . import CLEAR_DELETE, CLEAR_INVALIDATE
229 229
230 230 if not isinstance(cache_region, RhodeCodeCacheRegion):
231 231 cache_region = get_or_create_region(cache_region, cache_namespace_uid)
232 232 log.debug('clearing cache region: %s [prefix:%s] with method=%s',
233 233 cache_region, cache_namespace_uid, method)
234 234
235 235 num_affected_keys = 0
236 236
237 237 if method == CLEAR_INVALIDATE:
238 238 # NOTE: The CacheRegion.invalidate() method’s default mode of
239 239 # operation is to set a timestamp local to this CacheRegion in this Python process only.
240 240 # It does not impact other Python processes or regions as the timestamp is only stored locally in memory.
241 241 cache_region.invalidate(hard=True)
242 242
243 243 if method == CLEAR_DELETE:
244 244 num_affected_keys = cache_region.backend.delete_multi_by_prefix(prefix=cache_namespace_uid)
245 245 return num_affected_keys
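# Hedged usage sketch for clear_cache_namespace; region and namespace names are
# hypothetical:
#
#   from . import CLEAR_DELETE
#   removed = clear_cache_namespace('repo_object', 'repo_object.repo1', CLEAR_DELETE)
#
# CLEAR_INVALIDATE only marks the region stale within the current process, while
# CLEAR_DELETE physically removes every key sharing the namespace prefix.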
@@ -1,123 +1,123 b''
1 1 # RhodeCode VCSServer provides access to different vcs backends via network.
2 2 # Copyright (C) 2014-2023 RhodeCode GmbH
3 3 #
4 4 # This program is free software; you can redistribute it and/or modify
5 5 # it under the terms of the GNU General Public License as published by
6 6 # the Free Software Foundation; either version 3 of the License, or
7 7 # (at your option) any later version.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU General Public License
15 15 # along with this program; if not, write to the Free Software Foundation,
16 16 # Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17 17 import base64
18 18 import logging
19 19 import time
20 20
21 21 import msgpack
22 22
23 23 import vcsserver
24 24 from vcsserver.lib.str_utils import safe_str
25 25
26 26 log = logging.getLogger(__name__)
27 27
28 28
29 29 def get_access_path(environ):
30 30 path = environ.get('PATH_INFO')
31 31 return path
32 32
33 33
34 34 def get_user_agent(environ):
35 35 return environ.get('HTTP_USER_AGENT')
36 36
37 37
38 38 def get_call_context(request) -> dict:
39 39 cc = {}
40 40 registry = request.registry
41 41 if hasattr(registry, 'vcs_call_context'):
42 42 cc.update({
43 43 'X-RC-Method': registry.vcs_call_context.get('method'),
44 44 'X-RC-Repo-Name': registry.vcs_call_context.get('repo_name')
45 45 })
46 46
47 47 return cc
48 48
49 49
50 50 def get_headers_call_context(environ, strict=True):
51 51 if 'HTTP_X_RC_VCS_STREAM_CALL_CONTEXT' in environ:
52 52 packed_cc = base64.b64decode(environ['HTTP_X_RC_VCS_STREAM_CALL_CONTEXT'])
53 53 return msgpack.unpackb(packed_cc)
54 54 elif strict:
55 55 raise ValueError('Expected header HTTP_X_RC_VCS_STREAM_CALL_CONTEXT not found')
56 56
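# Minimal sketch of how the call-context header is produced on the client side
# (an assumption consistent with the decoding above, not code from this file):
#
#   packed = base64.b64encode(msgpack.packb({'method': 'pull', 'repo_name': 'repo1'}))
#   environ['HTTP_X_RC_VCS_STREAM_CALL_CONTEXT'] = packed.decode()
#
# get_headers_call_context() simply reverses the base64 + msgpack encoding.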
57 57
58 58 class RequestWrapperTween:
59 59 def __init__(self, handler, registry):
60 60 self.handler = handler
61 61 self.registry = registry
62 62
63 63 # one-time configuration code goes here
64 64
65 65 def __call__(self, request):
66 66 start = time.time()
67 log.debug('Starting request time measurement')
67 log.debug('Starting request processing')
68 68 response = None
69 69
70 70 try:
71 71 response = self.handler(request)
72 72 finally:
73 73 ua = get_user_agent(request.environ)
74 74 call_context = get_call_context(request)
75 75 vcs_method = call_context.get('X-RC-Method', '_NO_VCS_METHOD')
76 76 repo_name = call_context.get('X-RC-Repo-Name', '')
77 77
78 78 count = request.request_count()
79 79 _ver_ = vcsserver.get_version()
80 80 _path = safe_str(get_access_path(request.environ))
81 81
82 82 ip = '127.0.0.1'
83 83 match_route = request.matched_route.name if request.matched_route else "NOT_FOUND"
84 84 resp_code = getattr(response, 'status_code', 'UNDEFINED')
85 85
86 86 _view_path = f"{repo_name}@{_path}/{vcs_method}"
87 87
88 88 total = time.time() - start
89 89
90 90 log.info(
91 'Req[%4s] IP: %s %s Request to %s time: %.4fs [%s], VCSServer %s',
91                 'Finished request processing: req[%4s] IP: %s %s Request to %s time: %.4fs [%s], VCSServer %s',
92 92 count, ip, request.environ.get('REQUEST_METHOD'),
93 93 _view_path, total, ua, _ver_,
94 94 extra={"time": total, "ver": _ver_, "code": resp_code,
95 95 "path": _path, "view_name": match_route, "user_agent": ua,
96 96 "vcs_method": vcs_method, "repo_name": repo_name}
97 97 )
98 98
99 99 statsd = request.registry.statsd
100 100 if statsd:
101 101 match_route = request.matched_route.name if request.matched_route else _path
102 102 elapsed_time_ms = round(1000.0 * total) # use ms only
103 103 statsd.timing(
104 104 "vcsserver_req_timing.histogram", elapsed_time_ms,
105 105 tags=[
106 106 f"view_name:{match_route}",
107 107 f"code:{resp_code}"
108 108 ],
109 109 use_decimals=False
110 110 )
111 111 statsd.incr(
112 112 "vcsserver_req_total", tags=[
113 113 f"view_name:{match_route}",
114 114 f"code:{resp_code}"
115 115 ])
116 116
117 117 return response
118 118
119 119
120 120 def includeme(config):
121 121 config.add_tween(
122 122 'vcsserver.tweens.request_wrapper.RequestWrapperTween',
123 123 )