##// END OF EJS Templates
hooks: reserver only high-ports to limit possibility of hitting port from rhodecode stack
super-admin -
r4860:d80ce1a5 default
parent child Browse files
Show More
@@ -1,350 +1,357 b''
1 # -*- coding: utf-8 -*-
1 # -*- coding: utf-8 -*-
2
2
3 # Copyright (C) 2010-2020 RhodeCode GmbH
3 # Copyright (C) 2010-2020 RhodeCode GmbH
4 #
4 #
5 # This program is free software: you can redistribute it and/or modify
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU Affero General Public License, version 3
6 # it under the terms of the GNU Affero General Public License, version 3
7 # (only), as published by the Free Software Foundation.
7 # (only), as published by the Free Software Foundation.
8 #
8 #
9 # This program is distributed in the hope that it will be useful,
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
12 # GNU General Public License for more details.
13 #
13 #
14 # You should have received a copy of the GNU Affero General Public License
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 #
16 #
17 # This program is dual-licensed. If you wish to learn more about the
17 # This program is dual-licensed. If you wish to learn more about the
18 # RhodeCode Enterprise Edition, including its added features, Support services,
18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20
20
21 import os
21 import os
22 import time
22 import time
23 import logging
23 import logging
24 import tempfile
24 import tempfile
25 import traceback
25 import traceback
26 import threading
26 import threading
27 import socket
27 import socket
28 import random
28
29
29 from BaseHTTPServer import BaseHTTPRequestHandler
30 from BaseHTTPServer import BaseHTTPRequestHandler
30 from SocketServer import TCPServer
31 from SocketServer import TCPServer
31
32
32 import rhodecode
33 import rhodecode
33 from rhodecode.lib.exceptions import HTTPLockedRC, HTTPBranchProtected
34 from rhodecode.lib.exceptions import HTTPLockedRC, HTTPBranchProtected
34 from rhodecode.model import meta
35 from rhodecode.model import meta
35 from rhodecode.lib.base import bootstrap_request, bootstrap_config
36 from rhodecode.lib.base import bootstrap_request, bootstrap_config
36 from rhodecode.lib import hooks_base
37 from rhodecode.lib import hooks_base
37 from rhodecode.lib.utils2 import AttributeDict
38 from rhodecode.lib.utils2 import AttributeDict
38 from rhodecode.lib.ext_json import json
39 from rhodecode.lib.ext_json import json
39 from rhodecode.lib import rc_cache
40 from rhodecode.lib import rc_cache
40
41
41 log = logging.getLogger(__name__)
42 log = logging.getLogger(__name__)
42
43
43
44
44 class HooksHttpHandler(BaseHTTPRequestHandler):
45 class HooksHttpHandler(BaseHTTPRequestHandler):
45
46
46 def do_POST(self):
47 def do_POST(self):
47 method, extras = self._read_request()
48 method, extras = self._read_request()
48 txn_id = getattr(self.server, 'txn_id', None)
49 txn_id = getattr(self.server, 'txn_id', None)
49 if txn_id:
50 if txn_id:
50 log.debug('Computing TXN_ID based on `%s`:`%s`',
51 log.debug('Computing TXN_ID based on `%s`:`%s`',
51 extras['repository'], extras['txn_id'])
52 extras['repository'], extras['txn_id'])
52 computed_txn_id = rc_cache.utils.compute_key_from_params(
53 computed_txn_id = rc_cache.utils.compute_key_from_params(
53 extras['repository'], extras['txn_id'])
54 extras['repository'], extras['txn_id'])
54 if txn_id != computed_txn_id:
55 if txn_id != computed_txn_id:
55 raise Exception(
56 raise Exception(
56 'TXN ID fail: expected {} got {} instead'.format(
57 'TXN ID fail: expected {} got {} instead'.format(
57 txn_id, computed_txn_id))
58 txn_id, computed_txn_id))
58
59
59 try:
60 try:
60 result = self._call_hook(method, extras)
61 result = self._call_hook(method, extras)
61 except Exception as e:
62 except Exception as e:
62 exc_tb = traceback.format_exc()
63 exc_tb = traceback.format_exc()
63 result = {
64 result = {
64 'exception': e.__class__.__name__,
65 'exception': e.__class__.__name__,
65 'exception_traceback': exc_tb,
66 'exception_traceback': exc_tb,
66 'exception_args': e.args
67 'exception_args': e.args
67 }
68 }
68 self._write_response(result)
69 self._write_response(result)
69
70
70 def _read_request(self):
71 def _read_request(self):
71 length = int(self.headers['Content-Length'])
72 length = int(self.headers['Content-Length'])
72 body = self.rfile.read(length).decode('utf-8')
73 body = self.rfile.read(length).decode('utf-8')
73 data = json.loads(body)
74 data = json.loads(body)
74 return data['method'], data['extras']
75 return data['method'], data['extras']
75
76
76 def _write_response(self, result):
77 def _write_response(self, result):
77 self.send_response(200)
78 self.send_response(200)
78 self.send_header("Content-type", "text/json")
79 self.send_header("Content-type", "text/json")
79 self.end_headers()
80 self.end_headers()
80 self.wfile.write(json.dumps(result))
81 self.wfile.write(json.dumps(result))
81
82
82 def _call_hook(self, method, extras):
83 def _call_hook(self, method, extras):
83 hooks = Hooks()
84 hooks = Hooks()
84 try:
85 try:
85 result = getattr(hooks, method)(extras)
86 result = getattr(hooks, method)(extras)
86 finally:
87 finally:
87 meta.Session.remove()
88 meta.Session.remove()
88 return result
89 return result
89
90
90 def log_message(self, format, *args):
91 def log_message(self, format, *args):
91 """
92 """
92 This is an overridden method of BaseHTTPRequestHandler which logs using
93 This is an overridden method of BaseHTTPRequestHandler which logs using
93 logging library instead of writing directly to stderr.
94 logging library instead of writing directly to stderr.
94 """
95 """
95
96
96 message = format % args
97 message = format % args
97
98
98 log.debug(
99 log.debug(
99 "%s - - [%s] %s", self.client_address[0],
100 "%s - - [%s] %s", self.client_address[0],
100 self.log_date_time_string(), message)
101 self.log_date_time_string(), message)
101
102
102
103
103 class DummyHooksCallbackDaemon(object):
104 class DummyHooksCallbackDaemon(object):
104 hooks_uri = ''
105 hooks_uri = ''
105
106
106 def __init__(self):
107 def __init__(self):
107 self.hooks_module = Hooks.__module__
108 self.hooks_module = Hooks.__module__
108
109
109 def __enter__(self):
110 def __enter__(self):
110 log.debug('Running `%s` callback daemon', self.__class__.__name__)
111 log.debug('Running `%s` callback daemon', self.__class__.__name__)
111 return self
112 return self
112
113
113 def __exit__(self, exc_type, exc_val, exc_tb):
114 def __exit__(self, exc_type, exc_val, exc_tb):
114 log.debug('Exiting `%s` callback daemon', self.__class__.__name__)
115 log.debug('Exiting `%s` callback daemon', self.__class__.__name__)
115
116
116
117
117 class ThreadedHookCallbackDaemon(object):
118 class ThreadedHookCallbackDaemon(object):
118
119
119 _callback_thread = None
120 _callback_thread = None
120 _daemon = None
121 _daemon = None
121 _done = False
122 _done = False
122
123
123 def __init__(self, txn_id=None, host=None, port=None):
124 def __init__(self, txn_id=None, host=None, port=None):
124 self._prepare(txn_id=txn_id, host=host, port=port)
125 self._prepare(txn_id=txn_id, host=host, port=port)
125
126
126 def __enter__(self):
127 def __enter__(self):
127 log.debug('Running `%s` callback daemon', self.__class__.__name__)
128 log.debug('Running `%s` callback daemon', self.__class__.__name__)
128 self._run()
129 self._run()
129 return self
130 return self
130
131
131 def __exit__(self, exc_type, exc_val, exc_tb):
132 def __exit__(self, exc_type, exc_val, exc_tb):
132 log.debug('Exiting `%s` callback daemon', self.__class__.__name__)
133 log.debug('Exiting `%s` callback daemon', self.__class__.__name__)
133 self._stop()
134 self._stop()
134
135
135 def _prepare(self, txn_id=None, host=None, port=None):
136 def _prepare(self, txn_id=None, host=None, port=None):
136 raise NotImplementedError()
137 raise NotImplementedError()
137
138
138 def _run(self):
139 def _run(self):
139 raise NotImplementedError()
140 raise NotImplementedError()
140
141
141 def _stop(self):
142 def _stop(self):
142 raise NotImplementedError()
143 raise NotImplementedError()
143
144
144
145
145 class HttpHooksCallbackDaemon(ThreadedHookCallbackDaemon):
146 class HttpHooksCallbackDaemon(ThreadedHookCallbackDaemon):
146 """
147 """
147 Context manager which will run a callback daemon in a background thread.
148 Context manager which will run a callback daemon in a background thread.
148 """
149 """
149
150
150 hooks_uri = None
151 hooks_uri = None
151
152
152 # From Python docs: Polling reduces our responsiveness to a shutdown
153 # From Python docs: Polling reduces our responsiveness to a shutdown
153 # request and wastes cpu at all other times.
154 # request and wastes cpu at all other times.
154 POLL_INTERVAL = 0.01
155 POLL_INTERVAL = 0.01
155
156
156 def get_hostname(self):
157 def get_hostname(self):
157 return socket.gethostname() or '127.0.0.1'
158 return socket.gethostname() or '127.0.0.1'
158
159
159 def get_available_port(self):
160 def get_available_port(self, min_port=20000, max_port=65535):
160 mysocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
161 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
161 mysocket.bind((self.get_hostname(), 0))
162 hostname = self.get_hostname()
162 port = mysocket.getsockname()[1]
163
163 mysocket.close()
164 for _ in range(min_port, max_port):
164 del mysocket
165 pick_port = random.randint(min_port, max_port)
165 return port
166 try:
167 sock.bind((hostname, pick_port))
168 sock.close()
169 del sock
170 return pick_port
171 except OSError:
172 pass
166
173
167 def _prepare(self, txn_id=None, host=None, port=None):
174 def _prepare(self, txn_id=None, host=None, port=None):
168 if not host or host == "*":
175 if not host or host == "*":
169 host = self.get_hostname()
176 host = self.get_hostname()
170 if not port:
177 if not port:
171 port = self.get_available_port()
178 port = self.get_available_port()
172
179
173 server_address = (host, port)
180 server_address = (host, port)
174 self.hooks_uri = '{}:{}'.format(host, port)
181 self.hooks_uri = '{}:{}'.format(host, port)
175 self.txn_id = txn_id
182 self.txn_id = txn_id
176 self._done = False
183 self._done = False
177
184
178 log.debug(
185 log.debug(
179 "Preparing HTTP callback daemon at `%s` and registering hook object: %s",
186 "Preparing HTTP callback daemon at `%s` and registering hook object: %s",
180 self.hooks_uri, HooksHttpHandler)
187 self.hooks_uri, HooksHttpHandler)
181
188
182 self._daemon = TCPServer(server_address, HooksHttpHandler)
189 self._daemon = TCPServer(server_address, HooksHttpHandler)
183 # inject transaction_id for later verification
190 # inject transaction_id for later verification
184 self._daemon.txn_id = self.txn_id
191 self._daemon.txn_id = self.txn_id
185
192
186 def _run(self):
193 def _run(self):
187 log.debug("Running event loop of callback daemon in background thread")
194 log.debug("Running event loop of callback daemon in background thread")
188 callback_thread = threading.Thread(
195 callback_thread = threading.Thread(
189 target=self._daemon.serve_forever,
196 target=self._daemon.serve_forever,
190 kwargs={'poll_interval': self.POLL_INTERVAL})
197 kwargs={'poll_interval': self.POLL_INTERVAL})
191 callback_thread.daemon = True
198 callback_thread.daemon = True
192 callback_thread.start()
199 callback_thread.start()
193 self._callback_thread = callback_thread
200 self._callback_thread = callback_thread
194
201
195 def _stop(self):
202 def _stop(self):
196 log.debug("Waiting for background thread to finish.")
203 log.debug("Waiting for background thread to finish.")
197 self._daemon.shutdown()
204 self._daemon.shutdown()
198 self._callback_thread.join()
205 self._callback_thread.join()
199 self._daemon = None
206 self._daemon = None
200 self._callback_thread = None
207 self._callback_thread = None
201 if self.txn_id:
208 if self.txn_id:
202 txn_id_file = get_txn_id_data_path(self.txn_id)
209 txn_id_file = get_txn_id_data_path(self.txn_id)
203 log.debug('Cleaning up TXN ID %s', txn_id_file)
210 log.debug('Cleaning up TXN ID %s', txn_id_file)
204 if os.path.isfile(txn_id_file):
211 if os.path.isfile(txn_id_file):
205 os.remove(txn_id_file)
212 os.remove(txn_id_file)
206
213
207 log.debug("Background thread done.")
214 log.debug("Background thread done.")
208
215
209
216
210 def get_txn_id_data_path(txn_id):
217 def get_txn_id_data_path(txn_id):
211 import rhodecode
218 import rhodecode
212
219
213 root = rhodecode.CONFIG.get('cache_dir') or tempfile.gettempdir()
220 root = rhodecode.CONFIG.get('cache_dir') or tempfile.gettempdir()
214 final_dir = os.path.join(root, 'svn_txn_id')
221 final_dir = os.path.join(root, 'svn_txn_id')
215
222
216 if not os.path.isdir(final_dir):
223 if not os.path.isdir(final_dir):
217 os.makedirs(final_dir)
224 os.makedirs(final_dir)
218 return os.path.join(final_dir, 'rc_txn_id_{}'.format(txn_id))
225 return os.path.join(final_dir, 'rc_txn_id_{}'.format(txn_id))
219
226
220
227
221 def store_txn_id_data(txn_id, data_dict):
228 def store_txn_id_data(txn_id, data_dict):
222 if not txn_id:
229 if not txn_id:
223 log.warning('Cannot store txn_id because it is empty')
230 log.warning('Cannot store txn_id because it is empty')
224 return
231 return
225
232
226 path = get_txn_id_data_path(txn_id)
233 path = get_txn_id_data_path(txn_id)
227 try:
234 try:
228 with open(path, 'wb') as f:
235 with open(path, 'wb') as f:
229 f.write(json.dumps(data_dict))
236 f.write(json.dumps(data_dict))
230 except Exception:
237 except Exception:
231 log.exception('Failed to write txn_id metadata')
238 log.exception('Failed to write txn_id metadata')
232
239
233
240
234 def get_txn_id_from_store(txn_id):
241 def get_txn_id_from_store(txn_id):
235 """
242 """
236 Reads txn_id from store and if present returns the data for callback manager
243 Reads txn_id from store and if present returns the data for callback manager
237 """
244 """
238 path = get_txn_id_data_path(txn_id)
245 path = get_txn_id_data_path(txn_id)
239 try:
246 try:
240 with open(path, 'rb') as f:
247 with open(path, 'rb') as f:
241 return json.loads(f.read())
248 return json.loads(f.read())
242 except Exception:
249 except Exception:
243 return {}
250 return {}
244
251
245
252
246 def prepare_callback_daemon(extras, protocol, host, use_direct_calls, txn_id=None):
253 def prepare_callback_daemon(extras, protocol, host, use_direct_calls, txn_id=None):
247 txn_details = get_txn_id_from_store(txn_id)
254 txn_details = get_txn_id_from_store(txn_id)
248 port = txn_details.get('port', 0)
255 port = txn_details.get('port', 0)
249 if use_direct_calls:
256 if use_direct_calls:
250 callback_daemon = DummyHooksCallbackDaemon()
257 callback_daemon = DummyHooksCallbackDaemon()
251 extras['hooks_module'] = callback_daemon.hooks_module
258 extras['hooks_module'] = callback_daemon.hooks_module
252 else:
259 else:
253 if protocol == 'http':
260 if protocol == 'http':
254 callback_daemon = HttpHooksCallbackDaemon(
261 callback_daemon = HttpHooksCallbackDaemon(
255 txn_id=txn_id, host=host, port=port)
262 txn_id=txn_id, host=host, port=port)
256 else:
263 else:
257 log.error('Unsupported callback daemon protocol "%s"', protocol)
264 log.error('Unsupported callback daemon protocol "%s"', protocol)
258 raise Exception('Unsupported callback daemon protocol.')
265 raise Exception('Unsupported callback daemon protocol.')
259
266
260 extras['hooks_uri'] = callback_daemon.hooks_uri
267 extras['hooks_uri'] = callback_daemon.hooks_uri
261 extras['hooks_protocol'] = protocol
268 extras['hooks_protocol'] = protocol
262 extras['time'] = time.time()
269 extras['time'] = time.time()
263
270
264 # register txn_id
271 # register txn_id
265 extras['txn_id'] = txn_id
272 extras['txn_id'] = txn_id
266 log.debug('Prepared a callback daemon: %s at url `%s`',
273 log.debug('Prepared a callback daemon: %s at url `%s`',
267 callback_daemon.__class__.__name__, callback_daemon.hooks_uri)
274 callback_daemon.__class__.__name__, callback_daemon.hooks_uri)
268 return callback_daemon, extras
275 return callback_daemon, extras
269
276
270
277
271 class Hooks(object):
278 class Hooks(object):
272 """
279 """
273 Exposes the hooks for remote call backs
280 Exposes the hooks for remote call backs
274 """
281 """
275
282
276 def repo_size(self, extras):
283 def repo_size(self, extras):
277 log.debug("Called repo_size of %s object", self)
284 log.debug("Called repo_size of %s object", self)
278 return self._call_hook(hooks_base.repo_size, extras)
285 return self._call_hook(hooks_base.repo_size, extras)
279
286
280 def pre_pull(self, extras):
287 def pre_pull(self, extras):
281 log.debug("Called pre_pull of %s object", self)
288 log.debug("Called pre_pull of %s object", self)
282 return self._call_hook(hooks_base.pre_pull, extras)
289 return self._call_hook(hooks_base.pre_pull, extras)
283
290
284 def post_pull(self, extras):
291 def post_pull(self, extras):
285 log.debug("Called post_pull of %s object", self)
292 log.debug("Called post_pull of %s object", self)
286 return self._call_hook(hooks_base.post_pull, extras)
293 return self._call_hook(hooks_base.post_pull, extras)
287
294
288 def pre_push(self, extras):
295 def pre_push(self, extras):
289 log.debug("Called pre_push of %s object", self)
296 log.debug("Called pre_push of %s object", self)
290 return self._call_hook(hooks_base.pre_push, extras)
297 return self._call_hook(hooks_base.pre_push, extras)
291
298
292 def post_push(self, extras):
299 def post_push(self, extras):
293 log.debug("Called post_push of %s object", self)
300 log.debug("Called post_push of %s object", self)
294 return self._call_hook(hooks_base.post_push, extras)
301 return self._call_hook(hooks_base.post_push, extras)
295
302
296 def _call_hook(self, hook, extras):
303 def _call_hook(self, hook, extras):
297 extras = AttributeDict(extras)
304 extras = AttributeDict(extras)
298 server_url = extras['server_url']
305 server_url = extras['server_url']
299 request = bootstrap_request(application_url=server_url)
306 request = bootstrap_request(application_url=server_url)
300
307
301 bootstrap_config(request) # inject routes and other interfaces
308 bootstrap_config(request) # inject routes and other interfaces
302
309
303 # inject the user for usage in hooks
310 # inject the user for usage in hooks
304 request.user = AttributeDict({'username': extras.username,
311 request.user = AttributeDict({'username': extras.username,
305 'ip_addr': extras.ip,
312 'ip_addr': extras.ip,
306 'user_id': extras.user_id})
313 'user_id': extras.user_id})
307
314
308 extras.request = request
315 extras.request = request
309
316
310 try:
317 try:
311 result = hook(extras)
318 result = hook(extras)
312 if result is None:
319 if result is None:
313 raise Exception(
320 raise Exception(
314 'Failed to obtain hook result from func: {}'.format(hook))
321 'Failed to obtain hook result from func: {}'.format(hook))
315 except HTTPBranchProtected as handled_error:
322 except HTTPBranchProtected as handled_error:
316 # Those special cases doesn't need error reporting. It's a case of
323 # Those special cases doesn't need error reporting. It's a case of
317 # locked repo or protected branch
324 # locked repo or protected branch
318 result = AttributeDict({
325 result = AttributeDict({
319 'status': handled_error.code,
326 'status': handled_error.code,
320 'output': handled_error.explanation
327 'output': handled_error.explanation
321 })
328 })
322 except (HTTPLockedRC, Exception) as error:
329 except (HTTPLockedRC, Exception) as error:
323 # locked needs different handling since we need to also
330 # locked needs different handling since we need to also
324 # handle PULL operations
331 # handle PULL operations
325 exc_tb = ''
332 exc_tb = ''
326 if not isinstance(error, HTTPLockedRC):
333 if not isinstance(error, HTTPLockedRC):
327 exc_tb = traceback.format_exc()
334 exc_tb = traceback.format_exc()
328 log.exception('Exception when handling hook %s', hook)
335 log.exception('Exception when handling hook %s', hook)
329 error_args = error.args
336 error_args = error.args
330 return {
337 return {
331 'status': 128,
338 'status': 128,
332 'output': '',
339 'output': '',
333 'exception': type(error).__name__,
340 'exception': type(error).__name__,
334 'exception_traceback': exc_tb,
341 'exception_traceback': exc_tb,
335 'exception_args': error_args,
342 'exception_args': error_args,
336 }
343 }
337 finally:
344 finally:
338 meta.Session.remove()
345 meta.Session.remove()
339
346
340 log.debug('Got hook call response %s', result)
347 log.debug('Got hook call response %s', result)
341 return {
348 return {
342 'status': result.status,
349 'status': result.status,
343 'output': result.output,
350 'output': result.output,
344 }
351 }
345
352
346 def __enter__(self):
353 def __enter__(self):
347 return self
354 return self
348
355
349 def __exit__(self, exc_type, exc_val, exc_tb):
356 def __exit__(self, exc_type, exc_val, exc_tb):
350 pass
357 pass
General Comments 0
You need to be logged in to leave comments. Login now