##// END OF EJS Templates
hooks: reserver only high-ports to limit possibility of hitting port from rhodecode stack
super-admin -
r4860:d80ce1a5 default
parent child Browse files
Show More
@@ -1,350 +1,357 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2010-2020 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 import os
22 22 import time
23 23 import logging
24 24 import tempfile
25 25 import traceback
26 26 import threading
27 27 import socket
28 import random
28 29
29 30 from BaseHTTPServer import BaseHTTPRequestHandler
30 31 from SocketServer import TCPServer
31 32
32 33 import rhodecode
33 34 from rhodecode.lib.exceptions import HTTPLockedRC, HTTPBranchProtected
34 35 from rhodecode.model import meta
35 36 from rhodecode.lib.base import bootstrap_request, bootstrap_config
36 37 from rhodecode.lib import hooks_base
37 38 from rhodecode.lib.utils2 import AttributeDict
38 39 from rhodecode.lib.ext_json import json
39 40 from rhodecode.lib import rc_cache
40 41
41 42 log = logging.getLogger(__name__)
42 43
43 44
44 45 class HooksHttpHandler(BaseHTTPRequestHandler):
45 46
46 47 def do_POST(self):
47 48 method, extras = self._read_request()
48 49 txn_id = getattr(self.server, 'txn_id', None)
49 50 if txn_id:
50 51 log.debug('Computing TXN_ID based on `%s`:`%s`',
51 52 extras['repository'], extras['txn_id'])
52 53 computed_txn_id = rc_cache.utils.compute_key_from_params(
53 54 extras['repository'], extras['txn_id'])
54 55 if txn_id != computed_txn_id:
55 56 raise Exception(
56 57 'TXN ID fail: expected {} got {} instead'.format(
57 58 txn_id, computed_txn_id))
58 59
59 60 try:
60 61 result = self._call_hook(method, extras)
61 62 except Exception as e:
62 63 exc_tb = traceback.format_exc()
63 64 result = {
64 65 'exception': e.__class__.__name__,
65 66 'exception_traceback': exc_tb,
66 67 'exception_args': e.args
67 68 }
68 69 self._write_response(result)
69 70
70 71 def _read_request(self):
71 72 length = int(self.headers['Content-Length'])
72 73 body = self.rfile.read(length).decode('utf-8')
73 74 data = json.loads(body)
74 75 return data['method'], data['extras']
75 76
76 77 def _write_response(self, result):
77 78 self.send_response(200)
78 79 self.send_header("Content-type", "text/json")
79 80 self.end_headers()
80 81 self.wfile.write(json.dumps(result))
81 82
82 83 def _call_hook(self, method, extras):
83 84 hooks = Hooks()
84 85 try:
85 86 result = getattr(hooks, method)(extras)
86 87 finally:
87 88 meta.Session.remove()
88 89 return result
89 90
90 91 def log_message(self, format, *args):
91 92 """
92 93 This is an overridden method of BaseHTTPRequestHandler which logs using
93 94 logging library instead of writing directly to stderr.
94 95 """
95 96
96 97 message = format % args
97 98
98 99 log.debug(
99 100 "%s - - [%s] %s", self.client_address[0],
100 101 self.log_date_time_string(), message)
101 102
102 103
103 104 class DummyHooksCallbackDaemon(object):
104 105 hooks_uri = ''
105 106
106 107 def __init__(self):
107 108 self.hooks_module = Hooks.__module__
108 109
109 110 def __enter__(self):
110 111 log.debug('Running `%s` callback daemon', self.__class__.__name__)
111 112 return self
112 113
113 114 def __exit__(self, exc_type, exc_val, exc_tb):
114 115 log.debug('Exiting `%s` callback daemon', self.__class__.__name__)
115 116
116 117
117 118 class ThreadedHookCallbackDaemon(object):
118 119
119 120 _callback_thread = None
120 121 _daemon = None
121 122 _done = False
122 123
123 124 def __init__(self, txn_id=None, host=None, port=None):
124 125 self._prepare(txn_id=txn_id, host=host, port=port)
125 126
126 127 def __enter__(self):
127 128 log.debug('Running `%s` callback daemon', self.__class__.__name__)
128 129 self._run()
129 130 return self
130 131
131 132 def __exit__(self, exc_type, exc_val, exc_tb):
132 133 log.debug('Exiting `%s` callback daemon', self.__class__.__name__)
133 134 self._stop()
134 135
135 136 def _prepare(self, txn_id=None, host=None, port=None):
136 137 raise NotImplementedError()
137 138
138 139 def _run(self):
139 140 raise NotImplementedError()
140 141
141 142 def _stop(self):
142 143 raise NotImplementedError()
143 144
144 145
145 146 class HttpHooksCallbackDaemon(ThreadedHookCallbackDaemon):
146 147 """
147 148 Context manager which will run a callback daemon in a background thread.
148 149 """
149 150
150 151 hooks_uri = None
151 152
152 153 # From Python docs: Polling reduces our responsiveness to a shutdown
153 154 # request and wastes cpu at all other times.
154 155 POLL_INTERVAL = 0.01
155 156
156 157 def get_hostname(self):
157 158 return socket.gethostname() or '127.0.0.1'
158 159
159 def get_available_port(self):
160 mysocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
161 mysocket.bind((self.get_hostname(), 0))
162 port = mysocket.getsockname()[1]
163 mysocket.close()
164 del mysocket
165 return port
160 def get_available_port(self, min_port=20000, max_port=65535):
161 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
162 hostname = self.get_hostname()
163
164 for _ in range(min_port, max_port):
165 pick_port = random.randint(min_port, max_port)
166 try:
167 sock.bind((hostname, pick_port))
168 sock.close()
169 del sock
170 return pick_port
171 except OSError:
172 pass
166 173
167 174 def _prepare(self, txn_id=None, host=None, port=None):
168 175 if not host or host == "*":
169 176 host = self.get_hostname()
170 177 if not port:
171 178 port = self.get_available_port()
172 179
173 180 server_address = (host, port)
174 181 self.hooks_uri = '{}:{}'.format(host, port)
175 182 self.txn_id = txn_id
176 183 self._done = False
177 184
178 185 log.debug(
179 186 "Preparing HTTP callback daemon at `%s` and registering hook object: %s",
180 187 self.hooks_uri, HooksHttpHandler)
181 188
182 189 self._daemon = TCPServer(server_address, HooksHttpHandler)
183 190 # inject transaction_id for later verification
184 191 self._daemon.txn_id = self.txn_id
185 192
186 193 def _run(self):
187 194 log.debug("Running event loop of callback daemon in background thread")
188 195 callback_thread = threading.Thread(
189 196 target=self._daemon.serve_forever,
190 197 kwargs={'poll_interval': self.POLL_INTERVAL})
191 198 callback_thread.daemon = True
192 199 callback_thread.start()
193 200 self._callback_thread = callback_thread
194 201
195 202 def _stop(self):
196 203 log.debug("Waiting for background thread to finish.")
197 204 self._daemon.shutdown()
198 205 self._callback_thread.join()
199 206 self._daemon = None
200 207 self._callback_thread = None
201 208 if self.txn_id:
202 209 txn_id_file = get_txn_id_data_path(self.txn_id)
203 210 log.debug('Cleaning up TXN ID %s', txn_id_file)
204 211 if os.path.isfile(txn_id_file):
205 212 os.remove(txn_id_file)
206 213
207 214 log.debug("Background thread done.")
208 215
209 216
210 217 def get_txn_id_data_path(txn_id):
211 218 import rhodecode
212 219
213 220 root = rhodecode.CONFIG.get('cache_dir') or tempfile.gettempdir()
214 221 final_dir = os.path.join(root, 'svn_txn_id')
215 222
216 223 if not os.path.isdir(final_dir):
217 224 os.makedirs(final_dir)
218 225 return os.path.join(final_dir, 'rc_txn_id_{}'.format(txn_id))
219 226
220 227
221 228 def store_txn_id_data(txn_id, data_dict):
222 229 if not txn_id:
223 230 log.warning('Cannot store txn_id because it is empty')
224 231 return
225 232
226 233 path = get_txn_id_data_path(txn_id)
227 234 try:
228 235 with open(path, 'wb') as f:
229 236 f.write(json.dumps(data_dict))
230 237 except Exception:
231 238 log.exception('Failed to write txn_id metadata')
232 239
233 240
234 241 def get_txn_id_from_store(txn_id):
235 242 """
236 243 Reads txn_id from store and if present returns the data for callback manager
237 244 """
238 245 path = get_txn_id_data_path(txn_id)
239 246 try:
240 247 with open(path, 'rb') as f:
241 248 return json.loads(f.read())
242 249 except Exception:
243 250 return {}
244 251
245 252
246 253 def prepare_callback_daemon(extras, protocol, host, use_direct_calls, txn_id=None):
247 254 txn_details = get_txn_id_from_store(txn_id)
248 255 port = txn_details.get('port', 0)
249 256 if use_direct_calls:
250 257 callback_daemon = DummyHooksCallbackDaemon()
251 258 extras['hooks_module'] = callback_daemon.hooks_module
252 259 else:
253 260 if protocol == 'http':
254 261 callback_daemon = HttpHooksCallbackDaemon(
255 262 txn_id=txn_id, host=host, port=port)
256 263 else:
257 264 log.error('Unsupported callback daemon protocol "%s"', protocol)
258 265 raise Exception('Unsupported callback daemon protocol.')
259 266
260 267 extras['hooks_uri'] = callback_daemon.hooks_uri
261 268 extras['hooks_protocol'] = protocol
262 269 extras['time'] = time.time()
263 270
264 271 # register txn_id
265 272 extras['txn_id'] = txn_id
266 273 log.debug('Prepared a callback daemon: %s at url `%s`',
267 274 callback_daemon.__class__.__name__, callback_daemon.hooks_uri)
268 275 return callback_daemon, extras
269 276
270 277
271 278 class Hooks(object):
272 279 """
273 280 Exposes the hooks for remote call backs
274 281 """
275 282
276 283 def repo_size(self, extras):
277 284 log.debug("Called repo_size of %s object", self)
278 285 return self._call_hook(hooks_base.repo_size, extras)
279 286
280 287 def pre_pull(self, extras):
281 288 log.debug("Called pre_pull of %s object", self)
282 289 return self._call_hook(hooks_base.pre_pull, extras)
283 290
284 291 def post_pull(self, extras):
285 292 log.debug("Called post_pull of %s object", self)
286 293 return self._call_hook(hooks_base.post_pull, extras)
287 294
288 295 def pre_push(self, extras):
289 296 log.debug("Called pre_push of %s object", self)
290 297 return self._call_hook(hooks_base.pre_push, extras)
291 298
292 299 def post_push(self, extras):
293 300 log.debug("Called post_push of %s object", self)
294 301 return self._call_hook(hooks_base.post_push, extras)
295 302
296 303 def _call_hook(self, hook, extras):
297 304 extras = AttributeDict(extras)
298 305 server_url = extras['server_url']
299 306 request = bootstrap_request(application_url=server_url)
300 307
301 308 bootstrap_config(request) # inject routes and other interfaces
302 309
303 310 # inject the user for usage in hooks
304 311 request.user = AttributeDict({'username': extras.username,
305 312 'ip_addr': extras.ip,
306 313 'user_id': extras.user_id})
307 314
308 315 extras.request = request
309 316
310 317 try:
311 318 result = hook(extras)
312 319 if result is None:
313 320 raise Exception(
314 321 'Failed to obtain hook result from func: {}'.format(hook))
315 322 except HTTPBranchProtected as handled_error:
316 323 # Those special cases doesn't need error reporting. It's a case of
317 324 # locked repo or protected branch
318 325 result = AttributeDict({
319 326 'status': handled_error.code,
320 327 'output': handled_error.explanation
321 328 })
322 329 except (HTTPLockedRC, Exception) as error:
323 330 # locked needs different handling since we need to also
324 331 # handle PULL operations
325 332 exc_tb = ''
326 333 if not isinstance(error, HTTPLockedRC):
327 334 exc_tb = traceback.format_exc()
328 335 log.exception('Exception when handling hook %s', hook)
329 336 error_args = error.args
330 337 return {
331 338 'status': 128,
332 339 'output': '',
333 340 'exception': type(error).__name__,
334 341 'exception_traceback': exc_tb,
335 342 'exception_args': error_args,
336 343 }
337 344 finally:
338 345 meta.Session.remove()
339 346
340 347 log.debug('Got hook call response %s', result)
341 348 return {
342 349 'status': result.status,
343 350 'output': result.output,
344 351 }
345 352
346 353 def __enter__(self):
347 354 return self
348 355
349 356 def __exit__(self, exc_type, exc_val, exc_tb):
350 357 pass
General Comments 0
You need to be logged in to leave comments. Login now