##// END OF EJS Templates
hooks: generate bind port before init of TcpServer for better logging.
super-admin -
r4855:2a635a31 default
parent child Browse files
Show More
@@ -1,334 +1,349 b''
1 # -*- coding: utf-8 -*-
1 # -*- coding: utf-8 -*-
2
2
3 # Copyright (C) 2010-2020 RhodeCode GmbH
3 # Copyright (C) 2010-2020 RhodeCode GmbH
4 #
4 #
5 # This program is free software: you can redistribute it and/or modify
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU Affero General Public License, version 3
6 # it under the terms of the GNU Affero General Public License, version 3
7 # (only), as published by the Free Software Foundation.
7 # (only), as published by the Free Software Foundation.
8 #
8 #
9 # This program is distributed in the hope that it will be useful,
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
12 # GNU General Public License for more details.
13 #
13 #
14 # You should have received a copy of the GNU Affero General Public License
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 #
16 #
17 # This program is dual-licensed. If you wish to learn more about the
17 # This program is dual-licensed. If you wish to learn more about the
18 # RhodeCode Enterprise Edition, including its added features, Support services,
18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20
20
21 import os
21 import os
22 import time
22 import time
23 import logging
23 import logging
24 import tempfile
24 import tempfile
25 import traceback
25 import traceback
26 import threading
26 import threading
27 import socket
27
28
28 from BaseHTTPServer import BaseHTTPRequestHandler
29 from BaseHTTPServer import BaseHTTPRequestHandler
29 from SocketServer import TCPServer
30 from SocketServer import TCPServer
30
31
31 import rhodecode
32 import rhodecode
32 from rhodecode.lib.exceptions import HTTPLockedRC, HTTPBranchProtected
33 from rhodecode.lib.exceptions import HTTPLockedRC, HTTPBranchProtected
33 from rhodecode.model import meta
34 from rhodecode.model import meta
34 from rhodecode.lib.base import bootstrap_request, bootstrap_config
35 from rhodecode.lib.base import bootstrap_request, bootstrap_config
35 from rhodecode.lib import hooks_base
36 from rhodecode.lib import hooks_base
36 from rhodecode.lib.utils2 import AttributeDict
37 from rhodecode.lib.utils2 import AttributeDict
37 from rhodecode.lib.ext_json import json
38 from rhodecode.lib.ext_json import json
38 from rhodecode.lib import rc_cache
39 from rhodecode.lib import rc_cache
39
40
40 log = logging.getLogger(__name__)
41 log = logging.getLogger(__name__)
41
42
42
43
43 class HooksHttpHandler(BaseHTTPRequestHandler):
44 class HooksHttpHandler(BaseHTTPRequestHandler):
44
45
45 def do_POST(self):
46 def do_POST(self):
46 method, extras = self._read_request()
47 method, extras = self._read_request()
47 txn_id = getattr(self.server, 'txn_id', None)
48 txn_id = getattr(self.server, 'txn_id', None)
48 if txn_id:
49 if txn_id:
49 log.debug('Computing TXN_ID based on `%s`:`%s`',
50 log.debug('Computing TXN_ID based on `%s`:`%s`',
50 extras['repository'], extras['txn_id'])
51 extras['repository'], extras['txn_id'])
51 computed_txn_id = rc_cache.utils.compute_key_from_params(
52 computed_txn_id = rc_cache.utils.compute_key_from_params(
52 extras['repository'], extras['txn_id'])
53 extras['repository'], extras['txn_id'])
53 if txn_id != computed_txn_id:
54 if txn_id != computed_txn_id:
54 raise Exception(
55 raise Exception(
55 'TXN ID fail: expected {} got {} instead'.format(
56 'TXN ID fail: expected {} got {} instead'.format(
56 txn_id, computed_txn_id))
57 txn_id, computed_txn_id))
57
58
58 try:
59 try:
59 result = self._call_hook(method, extras)
60 result = self._call_hook(method, extras)
60 except Exception as e:
61 except Exception as e:
61 exc_tb = traceback.format_exc()
62 exc_tb = traceback.format_exc()
62 result = {
63 result = {
63 'exception': e.__class__.__name__,
64 'exception': e.__class__.__name__,
64 'exception_traceback': exc_tb,
65 'exception_traceback': exc_tb,
65 'exception_args': e.args
66 'exception_args': e.args
66 }
67 }
67 self._write_response(result)
68 self._write_response(result)
68
69
69 def _read_request(self):
70 def _read_request(self):
70 length = int(self.headers['Content-Length'])
71 length = int(self.headers['Content-Length'])
71 body = self.rfile.read(length).decode('utf-8')
72 body = self.rfile.read(length).decode('utf-8')
72 data = json.loads(body)
73 data = json.loads(body)
73 return data['method'], data['extras']
74 return data['method'], data['extras']
74
75
75 def _write_response(self, result):
76 def _write_response(self, result):
76 self.send_response(200)
77 self.send_response(200)
77 self.send_header("Content-type", "text/json")
78 self.send_header("Content-type", "text/json")
78 self.end_headers()
79 self.end_headers()
79 self.wfile.write(json.dumps(result))
80 self.wfile.write(json.dumps(result))
80
81
81 def _call_hook(self, method, extras):
82 def _call_hook(self, method, extras):
82 hooks = Hooks()
83 hooks = Hooks()
83 try:
84 try:
84 result = getattr(hooks, method)(extras)
85 result = getattr(hooks, method)(extras)
85 finally:
86 finally:
86 meta.Session.remove()
87 meta.Session.remove()
87 return result
88 return result
88
89
89 def log_message(self, format, *args):
90 def log_message(self, format, *args):
90 """
91 """
91 This is an overridden method of BaseHTTPRequestHandler which logs using
92 This is an overridden method of BaseHTTPRequestHandler which logs using
92 logging library instead of writing directly to stderr.
93 logging library instead of writing directly to stderr.
93 """
94 """
94
95
95 message = format % args
96 message = format % args
96
97
97 log.debug(
98 log.debug(
98 "%s - - [%s] %s", self.client_address[0],
99 "%s - - [%s] %s", self.client_address[0],
99 self.log_date_time_string(), message)
100 self.log_date_time_string(), message)
100
101
101
102
102 class DummyHooksCallbackDaemon(object):
103 class DummyHooksCallbackDaemon(object):
103 hooks_uri = ''
104 hooks_uri = ''
104
105
105 def __init__(self):
106 def __init__(self):
106 self.hooks_module = Hooks.__module__
107 self.hooks_module = Hooks.__module__
107
108
108 def __enter__(self):
109 def __enter__(self):
109 log.debug('Running `%s` callback daemon', self.__class__.__name__)
110 log.debug('Running `%s` callback daemon', self.__class__.__name__)
110 return self
111 return self
111
112
112 def __exit__(self, exc_type, exc_val, exc_tb):
113 def __exit__(self, exc_type, exc_val, exc_tb):
113 log.debug('Exiting `%s` callback daemon', self.__class__.__name__)
114 log.debug('Exiting `%s` callback daemon', self.__class__.__name__)
114
115
115
116
116 class ThreadedHookCallbackDaemon(object):
117 class ThreadedHookCallbackDaemon(object):
117
118
118 _callback_thread = None
119 _callback_thread = None
119 _daemon = None
120 _daemon = None
120 _done = False
121 _done = False
121
122
122 def __init__(self, txn_id=None, host=None, port=None):
123 def __init__(self, txn_id=None, host=None, port=None):
123 self._prepare(txn_id=txn_id, host=host, port=port)
124 self._prepare(txn_id=txn_id, host=host, port=port)
124
125
125 def __enter__(self):
126 def __enter__(self):
126 log.debug('Running `%s` callback daemon', self.__class__.__name__)
127 log.debug('Running `%s` callback daemon', self.__class__.__name__)
127 self._run()
128 self._run()
128 return self
129 return self
129
130
130 def __exit__(self, exc_type, exc_val, exc_tb):
131 def __exit__(self, exc_type, exc_val, exc_tb):
131 log.debug('Exiting `%s` callback daemon', self.__class__.__name__)
132 log.debug('Exiting `%s` callback daemon', self.__class__.__name__)
132 self._stop()
133 self._stop()
133
134
134 def _prepare(self, txn_id=None, host=None, port=None):
135 def _prepare(self, txn_id=None, host=None, port=None):
135 raise NotImplementedError()
136 raise NotImplementedError()
136
137
137 def _run(self):
138 def _run(self):
138 raise NotImplementedError()
139 raise NotImplementedError()
139
140
140 def _stop(self):
141 def _stop(self):
141 raise NotImplementedError()
142 raise NotImplementedError()
142
143
143
144
144 class HttpHooksCallbackDaemon(ThreadedHookCallbackDaemon):
145 class HttpHooksCallbackDaemon(ThreadedHookCallbackDaemon):
145 """
146 """
146 Context manager which will run a callback daemon in a background thread.
147 Context manager which will run a callback daemon in a background thread.
147 """
148 """
148
149
149 hooks_uri = None
150 hooks_uri = None
150
151
151 # From Python docs: Polling reduces our responsiveness to a shutdown
152 # From Python docs: Polling reduces our responsiveness to a shutdown
152 # request and wastes cpu at all other times.
153 # request and wastes cpu at all other times.
153 POLL_INTERVAL = 0.01
154 POLL_INTERVAL = 0.01
154
155
156 def get_available_port():
157 family = socket.AF_INET
158 socktype = socket.SOCK_STREAM
159 host = '127.0.0.1'
160
161 mysocket = socket.socket(family, socktype)
162 mysocket.bind((host, 0))
163 port = mysocket.getsockname()[1]
164 mysocket.close()
165 del mysocket
166 return port
167
155 def _prepare(self, txn_id=None, host=None, port=None):
168 def _prepare(self, txn_id=None, host=None, port=None):
156
169
157 host = host or '127.0.0.1'
170 host = host or '127.0.0.1'
158 self._done = False
171 port = port or self.get_available_port()
172 server_address = (host, port)
159 self.hooks_uri = '{}:{}'.format(host, port)
173 self.hooks_uri = '{}:{}'.format(host, port)
160 self.txn_id = txn_id
174 self.txn_id = txn_id
175 self._done = False
176
177 log.debug(
178 "Preparing HTTP callback daemon at `%s` and registering hook object: %s",
179 self.hooks_uri, HooksHttpHandler)
180
181 self._daemon = TCPServer(server_address, HooksHttpHandler)
161 # inject transaction_id for later verification
182 # inject transaction_id for later verification
162 self._daemon.txn_id = self.txn_id
183 self._daemon.txn_id = self.txn_id
163 log.debug(
164 "Preparing HTTP callback daemon at `%s` and registering hook object",
165 self.hooks_uri)
166
167 self._daemon = TCPServer((host, port or 0), HooksHttpHandler)
168 _, port = self._daemon.server_address
169
184
170 def _run(self):
185 def _run(self):
171 log.debug("Running event loop of callback daemon in background thread")
186 log.debug("Running event loop of callback daemon in background thread")
172 callback_thread = threading.Thread(
187 callback_thread = threading.Thread(
173 target=self._daemon.serve_forever,
188 target=self._daemon.serve_forever,
174 kwargs={'poll_interval': self.POLL_INTERVAL})
189 kwargs={'poll_interval': self.POLL_INTERVAL})
175 callback_thread.daemon = True
190 callback_thread.daemon = True
176 callback_thread.start()
191 callback_thread.start()
177 self._callback_thread = callback_thread
192 self._callback_thread = callback_thread
178
193
179 def _stop(self):
194 def _stop(self):
180 log.debug("Waiting for background thread to finish.")
195 log.debug("Waiting for background thread to finish.")
181 self._daemon.shutdown()
196 self._daemon.shutdown()
182 self._callback_thread.join()
197 self._callback_thread.join()
183 self._daemon = None
198 self._daemon = None
184 self._callback_thread = None
199 self._callback_thread = None
185 if self.txn_id:
200 if self.txn_id:
186 txn_id_file = get_txn_id_data_path(self.txn_id)
201 txn_id_file = get_txn_id_data_path(self.txn_id)
187 log.debug('Cleaning up TXN ID %s', txn_id_file)
202 log.debug('Cleaning up TXN ID %s', txn_id_file)
188 if os.path.isfile(txn_id_file):
203 if os.path.isfile(txn_id_file):
189 os.remove(txn_id_file)
204 os.remove(txn_id_file)
190
205
191 log.debug("Background thread done.")
206 log.debug("Background thread done.")
192
207
193
208
194 def get_txn_id_data_path(txn_id):
209 def get_txn_id_data_path(txn_id):
195 import rhodecode
210 import rhodecode
196
211
197 root = rhodecode.CONFIG.get('cache_dir') or tempfile.gettempdir()
212 root = rhodecode.CONFIG.get('cache_dir') or tempfile.gettempdir()
198 final_dir = os.path.join(root, 'svn_txn_id')
213 final_dir = os.path.join(root, 'svn_txn_id')
199
214
200 if not os.path.isdir(final_dir):
215 if not os.path.isdir(final_dir):
201 os.makedirs(final_dir)
216 os.makedirs(final_dir)
202 return os.path.join(final_dir, 'rc_txn_id_{}'.format(txn_id))
217 return os.path.join(final_dir, 'rc_txn_id_{}'.format(txn_id))
203
218
204
219
205 def store_txn_id_data(txn_id, data_dict):
220 def store_txn_id_data(txn_id, data_dict):
206 if not txn_id:
221 if not txn_id:
207 log.warning('Cannot store txn_id because it is empty')
222 log.warning('Cannot store txn_id because it is empty')
208 return
223 return
209
224
210 path = get_txn_id_data_path(txn_id)
225 path = get_txn_id_data_path(txn_id)
211 try:
226 try:
212 with open(path, 'wb') as f:
227 with open(path, 'wb') as f:
213 f.write(json.dumps(data_dict))
228 f.write(json.dumps(data_dict))
214 except Exception:
229 except Exception:
215 log.exception('Failed to write txn_id metadata')
230 log.exception('Failed to write txn_id metadata')
216
231
217
232
218 def get_txn_id_from_store(txn_id):
233 def get_txn_id_from_store(txn_id):
219 """
234 """
220 Reads txn_id from store and if present returns the data for callback manager
235 Reads txn_id from store and if present returns the data for callback manager
221 """
236 """
222 path = get_txn_id_data_path(txn_id)
237 path = get_txn_id_data_path(txn_id)
223 try:
238 try:
224 with open(path, 'rb') as f:
239 with open(path, 'rb') as f:
225 return json.loads(f.read())
240 return json.loads(f.read())
226 except Exception:
241 except Exception:
227 return {}
242 return {}
228
243
229
244
230 def prepare_callback_daemon(extras, protocol, host, use_direct_calls, txn_id=None):
245 def prepare_callback_daemon(extras, protocol, host, use_direct_calls, txn_id=None):
231 txn_details = get_txn_id_from_store(txn_id)
246 txn_details = get_txn_id_from_store(txn_id)
232 port = txn_details.get('port', 0)
247 port = txn_details.get('port', 0)
233 if use_direct_calls:
248 if use_direct_calls:
234 callback_daemon = DummyHooksCallbackDaemon()
249 callback_daemon = DummyHooksCallbackDaemon()
235 extras['hooks_module'] = callback_daemon.hooks_module
250 extras['hooks_module'] = callback_daemon.hooks_module
236 else:
251 else:
237 if protocol == 'http':
252 if protocol == 'http':
238 callback_daemon = HttpHooksCallbackDaemon(
253 callback_daemon = HttpHooksCallbackDaemon(
239 txn_id=txn_id, host=host, port=port)
254 txn_id=txn_id, host=host, port=port)
240 else:
255 else:
241 log.error('Unsupported callback daemon protocol "%s"', protocol)
256 log.error('Unsupported callback daemon protocol "%s"', protocol)
242 raise Exception('Unsupported callback daemon protocol.')
257 raise Exception('Unsupported callback daemon protocol.')
243
258
244 extras['hooks_uri'] = callback_daemon.hooks_uri
259 extras['hooks_uri'] = callback_daemon.hooks_uri
245 extras['hooks_protocol'] = protocol
260 extras['hooks_protocol'] = protocol
246 extras['time'] = time.time()
261 extras['time'] = time.time()
247
262
248 # register txn_id
263 # register txn_id
249 extras['txn_id'] = txn_id
264 extras['txn_id'] = txn_id
250 log.debug('Prepared a callback daemon: %s at url `%s`',
265 log.debug('Prepared a callback daemon: %s at url `%s`',
251 callback_daemon.__class__.__name__, callback_daemon.hooks_uri)
266 callback_daemon.__class__.__name__, callback_daemon.hooks_uri)
252 return callback_daemon, extras
267 return callback_daemon, extras
253
268
254
269
255 class Hooks(object):
270 class Hooks(object):
256 """
271 """
257 Exposes the hooks for remote call backs
272 Exposes the hooks for remote call backs
258 """
273 """
259
274
260 def repo_size(self, extras):
275 def repo_size(self, extras):
261 log.debug("Called repo_size of %s object", self)
276 log.debug("Called repo_size of %s object", self)
262 return self._call_hook(hooks_base.repo_size, extras)
277 return self._call_hook(hooks_base.repo_size, extras)
263
278
264 def pre_pull(self, extras):
279 def pre_pull(self, extras):
265 log.debug("Called pre_pull of %s object", self)
280 log.debug("Called pre_pull of %s object", self)
266 return self._call_hook(hooks_base.pre_pull, extras)
281 return self._call_hook(hooks_base.pre_pull, extras)
267
282
268 def post_pull(self, extras):
283 def post_pull(self, extras):
269 log.debug("Called post_pull of %s object", self)
284 log.debug("Called post_pull of %s object", self)
270 return self._call_hook(hooks_base.post_pull, extras)
285 return self._call_hook(hooks_base.post_pull, extras)
271
286
272 def pre_push(self, extras):
287 def pre_push(self, extras):
273 log.debug("Called pre_push of %s object", self)
288 log.debug("Called pre_push of %s object", self)
274 return self._call_hook(hooks_base.pre_push, extras)
289 return self._call_hook(hooks_base.pre_push, extras)
275
290
276 def post_push(self, extras):
291 def post_push(self, extras):
277 log.debug("Called post_push of %s object", self)
292 log.debug("Called post_push of %s object", self)
278 return self._call_hook(hooks_base.post_push, extras)
293 return self._call_hook(hooks_base.post_push, extras)
279
294
280 def _call_hook(self, hook, extras):
295 def _call_hook(self, hook, extras):
281 extras = AttributeDict(extras)
296 extras = AttributeDict(extras)
282 server_url = extras['server_url']
297 server_url = extras['server_url']
283 request = bootstrap_request(application_url=server_url)
298 request = bootstrap_request(application_url=server_url)
284
299
285 bootstrap_config(request) # inject routes and other interfaces
300 bootstrap_config(request) # inject routes and other interfaces
286
301
287 # inject the user for usage in hooks
302 # inject the user for usage in hooks
288 request.user = AttributeDict({'username': extras.username,
303 request.user = AttributeDict({'username': extras.username,
289 'ip_addr': extras.ip,
304 'ip_addr': extras.ip,
290 'user_id': extras.user_id})
305 'user_id': extras.user_id})
291
306
292 extras.request = request
307 extras.request = request
293
308
294 try:
309 try:
295 result = hook(extras)
310 result = hook(extras)
296 if result is None:
311 if result is None:
297 raise Exception(
312 raise Exception(
298 'Failed to obtain hook result from func: {}'.format(hook))
313 'Failed to obtain hook result from func: {}'.format(hook))
299 except HTTPBranchProtected as handled_error:
314 except HTTPBranchProtected as handled_error:
300 # Those special cases doesn't need error reporting. It's a case of
315 # Those special cases doesn't need error reporting. It's a case of
301 # locked repo or protected branch
316 # locked repo or protected branch
302 result = AttributeDict({
317 result = AttributeDict({
303 'status': handled_error.code,
318 'status': handled_error.code,
304 'output': handled_error.explanation
319 'output': handled_error.explanation
305 })
320 })
306 except (HTTPLockedRC, Exception) as error:
321 except (HTTPLockedRC, Exception) as error:
307 # locked needs different handling since we need to also
322 # locked needs different handling since we need to also
308 # handle PULL operations
323 # handle PULL operations
309 exc_tb = ''
324 exc_tb = ''
310 if not isinstance(error, HTTPLockedRC):
325 if not isinstance(error, HTTPLockedRC):
311 exc_tb = traceback.format_exc()
326 exc_tb = traceback.format_exc()
312 log.exception('Exception when handling hook %s', hook)
327 log.exception('Exception when handling hook %s', hook)
313 error_args = error.args
328 error_args = error.args
314 return {
329 return {
315 'status': 128,
330 'status': 128,
316 'output': '',
331 'output': '',
317 'exception': type(error).__name__,
332 'exception': type(error).__name__,
318 'exception_traceback': exc_tb,
333 'exception_traceback': exc_tb,
319 'exception_args': error_args,
334 'exception_args': error_args,
320 }
335 }
321 finally:
336 finally:
322 meta.Session.remove()
337 meta.Session.remove()
323
338
324 log.debug('Got hook call response %s', result)
339 log.debug('Got hook call response %s', result)
325 return {
340 return {
326 'status': result.status,
341 'status': result.status,
327 'output': result.output,
342 'output': result.output,
328 }
343 }
329
344
330 def __enter__(self):
345 def __enter__(self):
331 return self
346 return self
332
347
333 def __exit__(self, exc_type, exc_val, exc_tb):
348 def __exit__(self, exc_type, exc_val, exc_tb):
334 pass
349 pass
General Comments 0
You need to be logged in to leave comments. Login now