##// END OF EJS Templates
svn: use shared configurable storage for svn_txn_id interception logic.
marcink -
r3021:6b1bbc7a stable
parent child Browse files
Show More
@@ -1,324 +1,330 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2010-2018 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 import os
22 22 import time
23 23 import logging
24 24 import tempfile
25 25 import traceback
26 26 import threading
27 27
28 28 from BaseHTTPServer import BaseHTTPRequestHandler
29 29 from SocketServer import TCPServer
30 30
31 31 import rhodecode
32 32 from rhodecode.lib.exceptions import HTTPLockedRC, HTTPBranchProtected
33 33 from rhodecode.model import meta
34 34 from rhodecode.lib.base import bootstrap_request, bootstrap_config
35 35 from rhodecode.lib import hooks_base
36 36 from rhodecode.lib.utils2 import AttributeDict
37 37 from rhodecode.lib.ext_json import json
38 38 from rhodecode.lib import rc_cache
39 39
40 40 log = logging.getLogger(__name__)
41 41
42 42
class HooksHttpHandler(BaseHTTPRequestHandler):
    """
    HTTP request handler which forwards POSTed JSON payloads to `Hooks`.

    The request body is a JSON object holding a `method` (name of the hook
    to call) and `extras` (the data handed over to that hook). The result of
    the hook, or the details of the exception it raised, is written back as
    JSON.
    """

    def do_POST(self):
        """
        Read the request, verify the transaction id when the server carries
        one, run the requested hook and write the result as a response.
        """
        method, extras = self._read_request()
        expected_txn_id = getattr(self.server, 'txn_id', None)
        if expected_txn_id:
            # the server carries a txn_id, so the call has to come from the
            # very same repository/transaction pair
            log.debug('Computing TXN_ID based on `%s`:`%s`',
                      extras['repository'], extras['txn_id'])
            computed_txn_id = rc_cache.utils.compute_key_from_params(
                extras['repository'], extras['txn_id'])
            if expected_txn_id != computed_txn_id:
                raise Exception(
                    'TXN ID fail: expected {} got {} instead'.format(
                        expected_txn_id, computed_txn_id))

        try:
            result = self._call_hook(method, extras)
        except Exception as e:
            # report the failure to the caller instead of dropping the call
            result = {
                'exception': e.__class__.__name__,
                'exception_traceback': traceback.format_exc(),
                'exception_args': e.args
            }
        self._write_response(result)

    def _read_request(self):
        """
        Decode the JSON request body and return a `(method, extras)` tuple.
        """
        content_length = int(self.headers['Content-Length'])
        raw_body = self.rfile.read(content_length)
        payload = json.loads(raw_body.decode('utf-8'))
        return payload['method'], payload['extras']

    def _write_response(self, result):
        """
        Send `result` serialized as JSON with a 200 status code.
        """
        self.send_response(200)
        self.send_header("Content-type", "text/json")
        self.end_headers()
        serialized = json.dumps(result)
        self.wfile.write(serialized)

    def _call_hook(self, method, extras):
        """
        Call the hook named `method` on a fresh `Hooks` object and return its
        result; the database session is removed afterwards in any case.
        """
        hooks_handler = Hooks()
        try:
            return getattr(hooks_handler, method)(extras)
        finally:
            meta.Session.remove()

    def log_message(self, format, *args):
        """
        Overrides the method of BaseHTTPRequestHandler so that messages go
        through the logging library instead of straight to stderr.
        """
        log.debug(
            "%s - - [%s] %s", self.client_address[0],
            self.log_date_time_string(), format % args)
100 100
101 101
class DummyHooksCallbackDaemon(object):
    """
    Context manager which starts no server at all: it only remembers the
    module in which `Hooks` is defined and logs when it is entered/exited.
    """

    # nothing is listening, so there is no URI to advertise
    hooks_uri = ''

    def __init__(self):
        self.hooks_module = Hooks.__module__

    def __enter__(self):
        log.debug('Running dummy hooks callback daemon')
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # returns None, exceptions raised inside the `with` body propagate
        log.debug('Exiting dummy hooks callback daemon')
114 114
115 115
class ThreadedHookCallbackDaemon(object):
    """
    Base class of callback daemons used as context managers.

    `_prepare` is called on construction, `_run` when the context is entered
    and `_stop` when it is left. Subclasses have to implement all three.
    """

    _callback_thread = None
    _daemon = None
    _done = False

    def __init__(self, txn_id=None, host=None, port=None):
        # hand over the host given by the caller; passing a hard-coded None
        # here would silently ignore the `host` argument
        self._prepare(txn_id=txn_id, host=host, port=port)

    def __enter__(self):
        self._run()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        log.debug('Callback daemon exiting now...')
        self._stop()

    def _prepare(self, txn_id=None, host=None, port=None):
        """Set up the daemon; has to be implemented by subclasses."""
        raise NotImplementedError()

    def _run(self):
        """Start the daemon; has to be implemented by subclasses."""
        raise NotImplementedError()

    def _stop(self):
        """Stop the daemon; has to be implemented by subclasses."""
        raise NotImplementedError()
141 141
142 142
class HttpHooksCallbackDaemon(ThreadedHookCallbackDaemon):
    """
    Context manager which will run a callback daemon in a background thread.

    The daemon is a `TCPServer` answering with `HooksHttpHandler`; its
    address is exposed as `hooks_uri` in the form `host:port`.
    """

    hooks_uri = None

    # From Python docs: Polling reduces our responsiveness to a shutdown
    # request and wastes cpu at all other times.
    POLL_INTERVAL = 0.01

    def _prepare(self, txn_id=None, host=None, port=None):
        """
        Bind the server socket and remember the transaction id.

        :param txn_id: id which is attached to the server so that the
            request handler can verify incoming calls against it
        :param host: address to bind to, `127.0.0.1` when empty
        :param port: port to bind to, `0` is used when empty
        """
        host = host or '127.0.0.1'
        self._done = False
        self._daemon = TCPServer((host, port or 0), HooksHttpHandler)
        # read the port back from the server, it is the one really bound
        _, port = self._daemon.server_address
        self.hooks_uri = '{}:{}'.format(host, port)
        self.txn_id = txn_id
        # inject transaction_id for later verification
        self._daemon.txn_id = self.txn_id

        log.debug(
            "Preparing HTTP callback daemon at `%s` and registering hook object",
            self.hooks_uri)

    def _run(self):
        """Serve requests from a daemonized background thread."""
        log.debug("Running event loop of callback daemon in background thread")
        callback_thread = threading.Thread(
            target=self._daemon.serve_forever,
            kwargs={'poll_interval': self.POLL_INTERVAL})
        callback_thread.daemon = True
        callback_thread.start()
        self._callback_thread = callback_thread

    def _stop(self):
        """
        Stop the server, wait for the background thread, release the
        listening socket and remove the stored txn_id file, if there is one.
        """
        log.debug("Waiting for background thread to finish.")
        self._daemon.shutdown()
        self._callback_thread.join()
        # shutdown() only ends the serve_forever() loop, the listening socket
        # has to be closed explicitly, otherwise it stays open until the
        # server object gets garbage collected
        self._daemon.server_close()
        self._daemon = None
        self._callback_thread = None
        if self.txn_id:
            txn_id_file = get_txn_id_data_path(self.txn_id)
            log.debug('Cleaning up TXN ID %s', txn_id_file)
            if os.path.isfile(txn_id_file):
                os.remove(txn_id_file)

        log.debug("Background thread done.")
190 190
191 191
def get_txn_id_data_path(txn_id):
    """
    Return the path of the file which holds the data stored for `txn_id`.

    Files live in a `svn_txn_id` directory below the configured `cache_dir`;
    the system temporary directory is used when `cache_dir` is not set.
    The directory is created when it does not exist yet.

    :param txn_id: id used to build the file name
    :raises OSError: when the directory cannot be created
    """
    import errno
    import rhodecode

    root = rhodecode.CONFIG.get('cache_dir') or tempfile.gettempdir()
    final_dir = os.path.join(root, 'svn_txn_id')

    try:
        os.makedirs(final_dir)
    except OSError as e:
        # somebody else may create the directory between a check and the
        # makedirs call, so an already existing directory is not an error
        if e.errno != errno.EEXIST or not os.path.isdir(final_dir):
            raise
    return os.path.join(final_dir, 'rc_txn_id_{}'.format(txn_id))
195 201
196 202
def store_txn_id_data(txn_id, data_dict):
    """
    Write `data_dict` as JSON into the file which belongs to `txn_id`.

    An empty `txn_id` only produces a warning. Errors raised while writing
    are logged and not propagated.
    """
    if txn_id:
        path = get_txn_id_data_path(txn_id)
        try:
            with open(path, 'wb') as store_file:
                serialized = json.dumps(data_dict)
                store_file.write(serialized)
        except Exception:
            log.exception('Failed to write txn_id metadata')
    else:
        log.warning('Cannot store txn_id because it is empty')
208 214
209 215
def get_txn_id_from_store(txn_id):
    """
    Return the data stored for `txn_id`, to be used by the callback manager.

    An empty dict is returned when the file cannot be read or parsed.
    """
    store_path = get_txn_id_data_path(txn_id)
    try:
        with open(store_path, 'rb') as store_file:
            raw_data = store_file.read()
        return json.loads(raw_data)
    except Exception:
        # covers a missing file as well as broken content
        return {}
220 226
221 227
def prepare_callback_daemon(extras, protocol, host, use_direct_calls, txn_id=None):
    """
    Create the callback daemon and register its details inside `extras`.

    :param extras: mapping which gets updated in place with `hooks_module`
        (direct calls only), `hooks_uri`, `hooks_protocol`, `time` and
        `txn_id`
    :param protocol: protocol of the daemon, only `http` is supported
    :param host: host which is passed to the http daemon
    :param use_direct_calls: use `DummyHooksCallbackDaemon` instead of
        starting an http daemon
    :param txn_id: transaction id; a port stored for it is reused
    :returns: tuple of `(callback_daemon, extras)`
    :raises Exception: on unsupported `protocol`
    """
    stored_port = get_txn_id_from_store(txn_id).get('port', 0)

    if use_direct_calls:
        callback_daemon = DummyHooksCallbackDaemon()
        extras['hooks_module'] = callback_daemon.hooks_module
    elif protocol == 'http':
        callback_daemon = HttpHooksCallbackDaemon(
            txn_id=txn_id, host=host, port=stored_port)
    else:
        log.error('Unsupported callback daemon protocol "%s"', protocol)
        raise Exception('Unsupported callback daemon protocol.')

    extras['hooks_uri'] = callback_daemon.hooks_uri
    extras['hooks_protocol'] = protocol
    extras['time'] = time.time()

    # register txn_id
    extras['txn_id'] = txn_id

    log.debug('Prepared a callback daemon: %s at url `%s`',
              callback_daemon.__class__.__name__, callback_daemon.hooks_uri)
    return callback_daemon, extras
246 252
247 253
class Hooks(object):
    """
    Exposes the hooks for remote call backs

    Every public method logs the call and runs the matching function of
    `hooks_base` through `_call_hook`.
    """

    def repo_size(self, extras):
        log.debug("Called repo_size of %s object", self)
        return self._call_hook(hooks_base.repo_size, extras)

    def pre_pull(self, extras):
        log.debug("Called pre_pull of %s object", self)
        return self._call_hook(hooks_base.pre_pull, extras)

    def post_pull(self, extras):
        log.debug("Called post_pull of %s object", self)
        return self._call_hook(hooks_base.post_pull, extras)

    def pre_push(self, extras):
        log.debug("Called pre_push of %s object", self)
        return self._call_hook(hooks_base.pre_push, extras)

    def post_push(self, extras):
        log.debug("Called post_push of %s object", self)
        return self._call_hook(hooks_base.post_push, extras)

    def _call_hook(self, hook, extras):
        """
        Run `hook` with `extras` extended by a bootstrapped request.

        :returns: dict with `status` and `output`; when the hook fails the
            dict has status 128, empty output and additionally carries
            `exception`, `exception_traceback` and `exception_args`
        """
        extras = AttributeDict(extras)
        request = bootstrap_request(application_url=extras['server_url'])

        bootstrap_config(request)  # inject routes and other interfaces

        # inject the user for usage in hooks
        request.user = AttributeDict({'username': extras.username,
                                      'ip_addr': extras.ip,
                                      'user_id': extras.user_id})

        extras.request = request

        try:
            result = hook(extras)
        except HTTPBranchProtected as handled_error:
            # This special case doesn't need error reporting, the error
            # itself carries the status and the output to return
            result = AttributeDict({
                'status': handled_error.code,
                'output': handled_error.explanation
            })
        except (HTTPLockedRC, Exception) as error:
            # locked needs different handling since we need to also
            # handle PULL operations
            if isinstance(error, HTTPLockedRC):
                exc_tb = ''
            else:
                exc_tb = traceback.format_exc()
                log.exception('Exception when handling hook %s', hook)
            return {
                'status': 128,
                'output': '',
                'exception': type(error).__name__,
                'exception_traceback': exc_tb,
                'exception_args': error.args,
            }
        finally:
            meta.Session.remove()

        log.debug('Got hook call response %s', result)
        return {
            'status': result.status,
            'output': result.output,
        }

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        pass
@@ -1,204 +1,204 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2010-2018 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 import base64
22 22 import logging
23 23 import urllib
24 24 from urlparse import urljoin
25 25
26 26 import requests
27 27 from pyramid.httpexceptions import HTTPNotAcceptable
28 28
29 29 from rhodecode.lib import rc_cache
30 30 from rhodecode.lib.middleware import simplevcs
31 31 from rhodecode.lib.utils import is_valid_repo
32 32 from rhodecode.lib.utils2 import str2bool, safe_int
33 33 from rhodecode.lib.ext_json import json
34 34 from rhodecode.lib.hooks_daemon import store_txn_id_data
35 35
36 36
37 37 log = logging.getLogger(__name__)
38 38
39 39
class SimpleSvnApp(object):
    """
    WSGI application which forwards the call to the HTTP server configured
    as `subversion_http_server_url` and hands its response back.
    """

    # response headers which are not passed on to the client
    IGNORED_HEADERS = [
        'connection', 'keep-alive', 'content-encoding',
        'transfer-encoding', 'content-length']
    rc_extras = {}

    def __init__(self, config):
        self.config = config

    def __call__(self, environ, start_response):
        request_headers = self._get_request_headers(environ)

        payload = environ['wsgi.input']
        request_method = environ['REQUEST_METHOD']
        # johbo: Avoid that we end up with sending the request in chunked
        # transfer encoding (mainly on Gunicorn). If we know the content
        # length, then we should transfer the payload in one request.
        if request_method == 'MKCOL' or 'CONTENT_LENGTH' in environ:
            payload = payload.read()
            if payload.startswith('(create-txn-with-props'):
                # store on-the-fly our rc_extra using svn revision properties
                # those can be read later on in hooks executed so we have a
                # way to pass in the data into svn hooks
                encoded_extras = base64.urlsafe_b64encode(
                    json.dumps(self.rc_extras))
                # header defines data length, and serialized data
                extras_prop = ' rc-scm-extras {} {}'.format(
                    len(encoded_extras), encoded_extras)
                # drop the last two characters and close the list again
                # after the added property
                payload = payload[:-2] + extras_prop + '))'

        target_url = self._get_url(environ['PATH_INFO'])
        log.debug('Calling: %s method via `%s`', request_method, target_url)

        response = requests.request(
            request_method, target_url,
            data=payload, headers=request_headers)

        status_code = response.status_code
        if status_code in [200, 401]:
            log.debug('got response code: %s', status_code)
        elif status_code >= 500:
            log.error('Got SVN response:%s with text:\n`%s`',
                      response, response.text)
        else:
            log.debug('Got SVN response:%s with text:\n`%s`',
                      response, response.text)

        response_headers = self._get_response_headers(response.headers)

        svn_tx_id = response.headers.get('SVN-Txn-name')
        if svn_tx_id:
            # remember the port of the hooks daemon under a key computed
            # from the repository and the svn transaction name
            txn_id = rc_cache.utils.compute_key_from_params(
                self.config['repository'], svn_tx_id)
            port = safe_int(self.rc_extras['hooks_uri'].split(':')[-1])
            store_txn_id_data(txn_id, {'port': port})

        start_response(
            '{} {}'.format(response.status_code, response.reason),
            response_headers)
        return response.iter_content(chunk_size=1024)

    def _get_url(self, path):
        """
        Join `path` with the configured server url and quote the result.
        """
        server_url = self.config.get('subversion_http_server_url', '')
        return urllib.quote(urljoin(server_url, path), safe="/:=~+!$,;'")

    def _get_request_headers(self, environ):
        """
        Build the headers of the forwarded request out of the `HTTP_*` keys
        of `environ` plus `CONTENT_TYPE` and `CONTENT_LENGTH`.
        """
        headers = {}

        for key in environ:
            if key.startswith('HTTP_'):
                # HTTP_USER_AGENT -> User-Agent
                header_name = '-'.join(
                    part.capitalize() for part in key.split('_')[1:])
                headers[header_name] = environ[key]

        if 'CONTENT_TYPE' in environ:
            headers['Content-Type'] = environ['CONTENT_TYPE']

        if 'CONTENT_LENGTH' in environ:
            headers['Content-Length'] = environ['CONTENT_LENGTH']

        return headers

    def _get_response_headers(self, headers):
        """
        Return `headers` as list of `(name, value)` tuples without the ones
        listed in `IGNORED_HEADERS`.
        """
        response_headers = []
        for name in headers:
            if name.lower() in self.IGNORED_HEADERS:
                continue
            response_headers.append((name, headers[name]))

        return response_headers
132 132
133 133
class DisabledSimpleSvnApp(object):
    """
    WSGI application which answers every call with `HTTPNotAcceptable`.
    """

    def __init__(self, config):
        self.config = config

    def __call__(self, environ, start_response):
        reason = 'Cannot handle SVN call because: SVN HTTP Proxy is not enabled'
        log.warning(reason)
        not_acceptable = HTTPNotAcceptable(reason)
        return not_acceptable(environ, start_response)
142 142
143 143
class SimpleSvn(simplevcs.SimpleVCS):
    """
    `SimpleVCS` flavour for Subversion which creates `SimpleSvnApp` as wsgi
    application, or `DisabledSimpleSvnApp` when the proxy is not enabled.
    """

    SCM = 'svn'
    READ_ONLY_COMMANDS = ('OPTIONS', 'PROPFIND', 'GET', 'REPORT')
    DEFAULT_HTTP_SERVER = 'http://localhost:8090'

    def _get_repository_name(self, environ):
        """
        Gets repository name out of PATH_INFO header

        :param environ: environ where PATH_INFO is stored
        """
        repo_name = environ['PATH_INFO'].split('!')[0].strip('/')
        if is_valid_repo(repo_name, self.base_path, explicit_scm=self.SCM):
            return repo_name

        # SVN includes the whole path in its requests, including
        # subdirectories inside the repo. Therefore we have to search for
        # the repo root directory, starting with the shortest prefix.
        components = repo_name.split('/')
        for depth in range(1, len(components) + 1):
            candidate = '/'.join(components[:depth])
            if is_valid_repo(
                    candidate, self.base_path, explicit_scm=self.SCM):
                return candidate

        return repo_name

    def _get_action(self, environ):
        """Return `pull` for the read only commands, `push` otherwise."""
        if environ['REQUEST_METHOD'] in self.READ_ONLY_COMMANDS:
            return 'pull'
        return 'push'

    def _should_use_callback_daemon(self, extras, environ, action):
        # only MERGE command triggers hooks, so we don't want to start
        # hooks server too many times. POST however starts the svn transaction
        # so we also need to run the init of callback daemon of POST
        return environ['REQUEST_METHOD'] in ['MERGE', 'POST']

    def _create_wsgi_app(self, repo_path, repo_name, config):
        if not self._is_svn_enabled():
            # we don't have http proxy enabled return dummy request handler
            return DisabledSimpleSvnApp(config)
        return SimpleSvnApp(config)

    def _is_svn_enabled(self):
        """Read the `http_requests_enabled` flag of `vcs_svn_proxy`."""
        enabled = self.repo_vcs_config.get(
            'vcs_svn_proxy', 'http_requests_enabled')
        return str2bool(enabled)

    def _create_config(self, extras, repo_name):
        """
        Store the url of the svn http server inside `extras` and return it;
        `DEFAULT_HTTP_SERVER` is used when no url is configured.
        """
        configured_url = self.repo_vcs_config.get(
            'vcs_svn_proxy', 'http_server_url')

        extras['subversion_http_server_url'] = (
            configured_url or self.DEFAULT_HTTP_SERVER)
        return extras
General Comments 0
You need to be logged in to leave comments. Login now