##// END OF EJS Templates
hooks: generate bind port before init of TcpServer for better logging.
super-admin -
r4855:2a635a31 default
parent child Browse files
Show More
@@ -1,334 +1,349 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2010-2020 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 import os
22 22 import time
23 23 import logging
24 24 import tempfile
25 25 import traceback
26 26 import threading
27 import socket
27 28
28 29 from BaseHTTPServer import BaseHTTPRequestHandler
29 30 from SocketServer import TCPServer
30 31
31 32 import rhodecode
32 33 from rhodecode.lib.exceptions import HTTPLockedRC, HTTPBranchProtected
33 34 from rhodecode.model import meta
34 35 from rhodecode.lib.base import bootstrap_request, bootstrap_config
35 36 from rhodecode.lib import hooks_base
36 37 from rhodecode.lib.utils2 import AttributeDict
37 38 from rhodecode.lib.ext_json import json
38 39 from rhodecode.lib import rc_cache
39 40
40 41 log = logging.getLogger(__name__)
41 42
42 43
43 44 class HooksHttpHandler(BaseHTTPRequestHandler):
44 45
45 46 def do_POST(self):
46 47 method, extras = self._read_request()
47 48 txn_id = getattr(self.server, 'txn_id', None)
48 49 if txn_id:
49 50 log.debug('Computing TXN_ID based on `%s`:`%s`',
50 51 extras['repository'], extras['txn_id'])
51 52 computed_txn_id = rc_cache.utils.compute_key_from_params(
52 53 extras['repository'], extras['txn_id'])
53 54 if txn_id != computed_txn_id:
54 55 raise Exception(
55 56 'TXN ID fail: expected {} got {} instead'.format(
56 57 txn_id, computed_txn_id))
57 58
58 59 try:
59 60 result = self._call_hook(method, extras)
60 61 except Exception as e:
61 62 exc_tb = traceback.format_exc()
62 63 result = {
63 64 'exception': e.__class__.__name__,
64 65 'exception_traceback': exc_tb,
65 66 'exception_args': e.args
66 67 }
67 68 self._write_response(result)
68 69
69 70 def _read_request(self):
70 71 length = int(self.headers['Content-Length'])
71 72 body = self.rfile.read(length).decode('utf-8')
72 73 data = json.loads(body)
73 74 return data['method'], data['extras']
74 75
75 76 def _write_response(self, result):
76 77 self.send_response(200)
77 78 self.send_header("Content-type", "text/json")
78 79 self.end_headers()
79 80 self.wfile.write(json.dumps(result))
80 81
81 82 def _call_hook(self, method, extras):
82 83 hooks = Hooks()
83 84 try:
84 85 result = getattr(hooks, method)(extras)
85 86 finally:
86 87 meta.Session.remove()
87 88 return result
88 89
89 90 def log_message(self, format, *args):
90 91 """
91 92 This is an overridden method of BaseHTTPRequestHandler which logs using
92 93 logging library instead of writing directly to stderr.
93 94 """
94 95
95 96 message = format % args
96 97
97 98 log.debug(
98 99 "%s - - [%s] %s", self.client_address[0],
99 100 self.log_date_time_string(), message)
100 101
101 102
102 103 class DummyHooksCallbackDaemon(object):
103 104 hooks_uri = ''
104 105
105 106 def __init__(self):
106 107 self.hooks_module = Hooks.__module__
107 108
108 109 def __enter__(self):
109 110 log.debug('Running `%s` callback daemon', self.__class__.__name__)
110 111 return self
111 112
112 113 def __exit__(self, exc_type, exc_val, exc_tb):
113 114 log.debug('Exiting `%s` callback daemon', self.__class__.__name__)
114 115
115 116
116 117 class ThreadedHookCallbackDaemon(object):
117 118
118 119 _callback_thread = None
119 120 _daemon = None
120 121 _done = False
121 122
122 123 def __init__(self, txn_id=None, host=None, port=None):
123 124 self._prepare(txn_id=txn_id, host=host, port=port)
124 125
125 126 def __enter__(self):
126 127 log.debug('Running `%s` callback daemon', self.__class__.__name__)
127 128 self._run()
128 129 return self
129 130
130 131 def __exit__(self, exc_type, exc_val, exc_tb):
131 132 log.debug('Exiting `%s` callback daemon', self.__class__.__name__)
132 133 self._stop()
133 134
134 135 def _prepare(self, txn_id=None, host=None, port=None):
135 136 raise NotImplementedError()
136 137
137 138 def _run(self):
138 139 raise NotImplementedError()
139 140
140 141 def _stop(self):
141 142 raise NotImplementedError()
142 143
143 144
144 145 class HttpHooksCallbackDaemon(ThreadedHookCallbackDaemon):
145 146 """
146 147 Context manager which will run a callback daemon in a background thread.
147 148 """
148 149
149 150 hooks_uri = None
150 151
151 152 # From Python docs: Polling reduces our responsiveness to a shutdown
152 153 # request and wastes cpu at all other times.
153 154 POLL_INTERVAL = 0.01
154 155
156 def get_available_port():
157 family = socket.AF_INET
158 socktype = socket.SOCK_STREAM
159 host = '127.0.0.1'
160
161 mysocket = socket.socket(family, socktype)
162 mysocket.bind((host, 0))
163 port = mysocket.getsockname()[1]
164 mysocket.close()
165 del mysocket
166 return port
167
155 168 def _prepare(self, txn_id=None, host=None, port=None):
156 169
157 170 host = host or '127.0.0.1'
158 self._done = False
171 port = port or self.get_available_port()
172 server_address = (host, port)
159 173 self.hooks_uri = '{}:{}'.format(host, port)
160 174 self.txn_id = txn_id
175 self._done = False
176
177 log.debug(
178 "Preparing HTTP callback daemon at `%s` and registering hook object: %s",
179 self.hooks_uri, HooksHttpHandler)
180
181 self._daemon = TCPServer(server_address, HooksHttpHandler)
161 182 # inject transaction_id for later verification
162 183 self._daemon.txn_id = self.txn_id
163 log.debug(
164 "Preparing HTTP callback daemon at `%s` and registering hook object",
165 self.hooks_uri)
166
167 self._daemon = TCPServer((host, port or 0), HooksHttpHandler)
168 _, port = self._daemon.server_address
169 184
170 185 def _run(self):
171 186 log.debug("Running event loop of callback daemon in background thread")
172 187 callback_thread = threading.Thread(
173 188 target=self._daemon.serve_forever,
174 189 kwargs={'poll_interval': self.POLL_INTERVAL})
175 190 callback_thread.daemon = True
176 191 callback_thread.start()
177 192 self._callback_thread = callback_thread
178 193
179 194 def _stop(self):
180 195 log.debug("Waiting for background thread to finish.")
181 196 self._daemon.shutdown()
182 197 self._callback_thread.join()
183 198 self._daemon = None
184 199 self._callback_thread = None
185 200 if self.txn_id:
186 201 txn_id_file = get_txn_id_data_path(self.txn_id)
187 202 log.debug('Cleaning up TXN ID %s', txn_id_file)
188 203 if os.path.isfile(txn_id_file):
189 204 os.remove(txn_id_file)
190 205
191 206 log.debug("Background thread done.")
192 207
193 208
194 209 def get_txn_id_data_path(txn_id):
195 210 import rhodecode
196 211
197 212 root = rhodecode.CONFIG.get('cache_dir') or tempfile.gettempdir()
198 213 final_dir = os.path.join(root, 'svn_txn_id')
199 214
200 215 if not os.path.isdir(final_dir):
201 216 os.makedirs(final_dir)
202 217 return os.path.join(final_dir, 'rc_txn_id_{}'.format(txn_id))
203 218
204 219
205 220 def store_txn_id_data(txn_id, data_dict):
206 221 if not txn_id:
207 222 log.warning('Cannot store txn_id because it is empty')
208 223 return
209 224
210 225 path = get_txn_id_data_path(txn_id)
211 226 try:
212 227 with open(path, 'wb') as f:
213 228 f.write(json.dumps(data_dict))
214 229 except Exception:
215 230 log.exception('Failed to write txn_id metadata')
216 231
217 232
218 233 def get_txn_id_from_store(txn_id):
219 234 """
220 235 Reads txn_id from store and if present returns the data for callback manager
221 236 """
222 237 path = get_txn_id_data_path(txn_id)
223 238 try:
224 239 with open(path, 'rb') as f:
225 240 return json.loads(f.read())
226 241 except Exception:
227 242 return {}
228 243
229 244
230 245 def prepare_callback_daemon(extras, protocol, host, use_direct_calls, txn_id=None):
231 246 txn_details = get_txn_id_from_store(txn_id)
232 247 port = txn_details.get('port', 0)
233 248 if use_direct_calls:
234 249 callback_daemon = DummyHooksCallbackDaemon()
235 250 extras['hooks_module'] = callback_daemon.hooks_module
236 251 else:
237 252 if protocol == 'http':
238 253 callback_daemon = HttpHooksCallbackDaemon(
239 254 txn_id=txn_id, host=host, port=port)
240 255 else:
241 256 log.error('Unsupported callback daemon protocol "%s"', protocol)
242 257 raise Exception('Unsupported callback daemon protocol.')
243 258
244 259 extras['hooks_uri'] = callback_daemon.hooks_uri
245 260 extras['hooks_protocol'] = protocol
246 261 extras['time'] = time.time()
247 262
248 263 # register txn_id
249 264 extras['txn_id'] = txn_id
250 265 log.debug('Prepared a callback daemon: %s at url `%s`',
251 266 callback_daemon.__class__.__name__, callback_daemon.hooks_uri)
252 267 return callback_daemon, extras
253 268
254 269
255 270 class Hooks(object):
256 271 """
257 272 Exposes the hooks for remote call backs
258 273 """
259 274
260 275 def repo_size(self, extras):
261 276 log.debug("Called repo_size of %s object", self)
262 277 return self._call_hook(hooks_base.repo_size, extras)
263 278
264 279 def pre_pull(self, extras):
265 280 log.debug("Called pre_pull of %s object", self)
266 281 return self._call_hook(hooks_base.pre_pull, extras)
267 282
268 283 def post_pull(self, extras):
269 284 log.debug("Called post_pull of %s object", self)
270 285 return self._call_hook(hooks_base.post_pull, extras)
271 286
272 287 def pre_push(self, extras):
273 288 log.debug("Called pre_push of %s object", self)
274 289 return self._call_hook(hooks_base.pre_push, extras)
275 290
276 291 def post_push(self, extras):
277 292 log.debug("Called post_push of %s object", self)
278 293 return self._call_hook(hooks_base.post_push, extras)
279 294
280 295 def _call_hook(self, hook, extras):
281 296 extras = AttributeDict(extras)
282 297 server_url = extras['server_url']
283 298 request = bootstrap_request(application_url=server_url)
284 299
285 300 bootstrap_config(request) # inject routes and other interfaces
286 301
287 302 # inject the user for usage in hooks
288 303 request.user = AttributeDict({'username': extras.username,
289 304 'ip_addr': extras.ip,
290 305 'user_id': extras.user_id})
291 306
292 307 extras.request = request
293 308
294 309 try:
295 310 result = hook(extras)
296 311 if result is None:
297 312 raise Exception(
298 313 'Failed to obtain hook result from func: {}'.format(hook))
299 314 except HTTPBranchProtected as handled_error:
300 315 # Those special cases doesn't need error reporting. It's a case of
301 316 # locked repo or protected branch
302 317 result = AttributeDict({
303 318 'status': handled_error.code,
304 319 'output': handled_error.explanation
305 320 })
306 321 except (HTTPLockedRC, Exception) as error:
307 322 # locked needs different handling since we need to also
308 323 # handle PULL operations
309 324 exc_tb = ''
310 325 if not isinstance(error, HTTPLockedRC):
311 326 exc_tb = traceback.format_exc()
312 327 log.exception('Exception when handling hook %s', hook)
313 328 error_args = error.args
314 329 return {
315 330 'status': 128,
316 331 'output': '',
317 332 'exception': type(error).__name__,
318 333 'exception_traceback': exc_tb,
319 334 'exception_args': error_args,
320 335 }
321 336 finally:
322 337 meta.Session.remove()
323 338
324 339 log.debug('Got hook call response %s', result)
325 340 return {
326 341 'status': result.status,
327 342 'output': result.output,
328 343 }
329 344
330 345 def __enter__(self):
331 346 return self
332 347
333 348 def __exit__(self, exc_type, exc_val, exc_tb):
334 349 pass
General Comments 0
You need to be logged in to leave comments. Login now