##// END OF EJS Templates
hooks: fix logging info on callback daemon
super-admin -
r4854:a2853d3d default
parent child Browse files
Show More
@@ -1,333 +1,334 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2010-2020 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 import os
22 22 import time
23 23 import logging
24 24 import tempfile
25 25 import traceback
26 26 import threading
27 27
28 28 from BaseHTTPServer import BaseHTTPRequestHandler
29 29 from SocketServer import TCPServer
30 30
31 31 import rhodecode
32 32 from rhodecode.lib.exceptions import HTTPLockedRC, HTTPBranchProtected
33 33 from rhodecode.model import meta
34 34 from rhodecode.lib.base import bootstrap_request, bootstrap_config
35 35 from rhodecode.lib import hooks_base
36 36 from rhodecode.lib.utils2 import AttributeDict
37 37 from rhodecode.lib.ext_json import json
38 38 from rhodecode.lib import rc_cache
39 39
40 40 log = logging.getLogger(__name__)
41 41
42 42
43 43 class HooksHttpHandler(BaseHTTPRequestHandler):
44 44
45 45 def do_POST(self):
46 46 method, extras = self._read_request()
47 47 txn_id = getattr(self.server, 'txn_id', None)
48 48 if txn_id:
49 49 log.debug('Computing TXN_ID based on `%s`:`%s`',
50 50 extras['repository'], extras['txn_id'])
51 51 computed_txn_id = rc_cache.utils.compute_key_from_params(
52 52 extras['repository'], extras['txn_id'])
53 53 if txn_id != computed_txn_id:
54 54 raise Exception(
55 55 'TXN ID fail: expected {} got {} instead'.format(
56 56 txn_id, computed_txn_id))
57 57
58 58 try:
59 59 result = self._call_hook(method, extras)
60 60 except Exception as e:
61 61 exc_tb = traceback.format_exc()
62 62 result = {
63 63 'exception': e.__class__.__name__,
64 64 'exception_traceback': exc_tb,
65 65 'exception_args': e.args
66 66 }
67 67 self._write_response(result)
68 68
69 69 def _read_request(self):
70 70 length = int(self.headers['Content-Length'])
71 71 body = self.rfile.read(length).decode('utf-8')
72 72 data = json.loads(body)
73 73 return data['method'], data['extras']
74 74
75 75 def _write_response(self, result):
76 76 self.send_response(200)
77 77 self.send_header("Content-type", "text/json")
78 78 self.end_headers()
79 79 self.wfile.write(json.dumps(result))
80 80
81 81 def _call_hook(self, method, extras):
82 82 hooks = Hooks()
83 83 try:
84 84 result = getattr(hooks, method)(extras)
85 85 finally:
86 86 meta.Session.remove()
87 87 return result
88 88
89 89 def log_message(self, format, *args):
90 90 """
91 91 This is an overridden method of BaseHTTPRequestHandler which logs using
92 92 logging library instead of writing directly to stderr.
93 93 """
94 94
95 95 message = format % args
96 96
97 97 log.debug(
98 98 "%s - - [%s] %s", self.client_address[0],
99 99 self.log_date_time_string(), message)
100 100
101 101
102 102 class DummyHooksCallbackDaemon(object):
103 103 hooks_uri = ''
104 104
105 105 def __init__(self):
106 106 self.hooks_module = Hooks.__module__
107 107
108 108 def __enter__(self):
109 109 log.debug('Running `%s` callback daemon', self.__class__.__name__)
110 110 return self
111 111
112 112 def __exit__(self, exc_type, exc_val, exc_tb):
113 113 log.debug('Exiting `%s` callback daemon', self.__class__.__name__)
114 114
115 115
116 116 class ThreadedHookCallbackDaemon(object):
117 117
118 118 _callback_thread = None
119 119 _daemon = None
120 120 _done = False
121 121
122 122 def __init__(self, txn_id=None, host=None, port=None):
123 123 self._prepare(txn_id=txn_id, host=host, port=port)
124 124
125 125 def __enter__(self):
126 126 log.debug('Running `%s` callback daemon', self.__class__.__name__)
127 127 self._run()
128 128 return self
129 129
130 130 def __exit__(self, exc_type, exc_val, exc_tb):
131 131 log.debug('Exiting `%s` callback daemon', self.__class__.__name__)
132 132 self._stop()
133 133
134 134 def _prepare(self, txn_id=None, host=None, port=None):
135 135 raise NotImplementedError()
136 136
137 137 def _run(self):
138 138 raise NotImplementedError()
139 139
140 140 def _stop(self):
141 141 raise NotImplementedError()
142 142
143 143
144 144 class HttpHooksCallbackDaemon(ThreadedHookCallbackDaemon):
145 145 """
146 146 Context manager which will run a callback daemon in a background thread.
147 147 """
148 148
149 149 hooks_uri = None
150 150
151 151 # From Python docs: Polling reduces our responsiveness to a shutdown
152 152 # request and wastes cpu at all other times.
153 153 POLL_INTERVAL = 0.01
154 154
155 155 def _prepare(self, txn_id=None, host=None, port=None):
156
156 157 host = host or '127.0.0.1'
157 158 self._done = False
158 self._daemon = TCPServer((host, port or 0), HooksHttpHandler)
159 _, port = self._daemon.server_address
160 159 self.hooks_uri = '{}:{}'.format(host, port)
161 160 self.txn_id = txn_id
162 161 # inject transaction_id for later verification
163 162 self._daemon.txn_id = self.txn_id
164
165 163 log.debug(
166 164 "Preparing HTTP callback daemon at `%s` and registering hook object",
167 165 self.hooks_uri)
168 166
167 self._daemon = TCPServer((host, port or 0), HooksHttpHandler)
168 _, port = self._daemon.server_address
169
169 170 def _run(self):
170 171 log.debug("Running event loop of callback daemon in background thread")
171 172 callback_thread = threading.Thread(
172 173 target=self._daemon.serve_forever,
173 174 kwargs={'poll_interval': self.POLL_INTERVAL})
174 175 callback_thread.daemon = True
175 176 callback_thread.start()
176 177 self._callback_thread = callback_thread
177 178
178 179 def _stop(self):
179 180 log.debug("Waiting for background thread to finish.")
180 181 self._daemon.shutdown()
181 182 self._callback_thread.join()
182 183 self._daemon = None
183 184 self._callback_thread = None
184 185 if self.txn_id:
185 186 txn_id_file = get_txn_id_data_path(self.txn_id)
186 187 log.debug('Cleaning up TXN ID %s', txn_id_file)
187 188 if os.path.isfile(txn_id_file):
188 189 os.remove(txn_id_file)
189 190
190 191 log.debug("Background thread done.")
191 192
192 193
193 194 def get_txn_id_data_path(txn_id):
194 195 import rhodecode
195 196
196 197 root = rhodecode.CONFIG.get('cache_dir') or tempfile.gettempdir()
197 198 final_dir = os.path.join(root, 'svn_txn_id')
198 199
199 200 if not os.path.isdir(final_dir):
200 201 os.makedirs(final_dir)
201 202 return os.path.join(final_dir, 'rc_txn_id_{}'.format(txn_id))
202 203
203 204
204 205 def store_txn_id_data(txn_id, data_dict):
205 206 if not txn_id:
206 207 log.warning('Cannot store txn_id because it is empty')
207 208 return
208 209
209 210 path = get_txn_id_data_path(txn_id)
210 211 try:
211 212 with open(path, 'wb') as f:
212 213 f.write(json.dumps(data_dict))
213 214 except Exception:
214 215 log.exception('Failed to write txn_id metadata')
215 216
216 217
217 218 def get_txn_id_from_store(txn_id):
218 219 """
219 220 Reads txn_id from store and if present returns the data for callback manager
220 221 """
221 222 path = get_txn_id_data_path(txn_id)
222 223 try:
223 224 with open(path, 'rb') as f:
224 225 return json.loads(f.read())
225 226 except Exception:
226 227 return {}
227 228
228 229
229 230 def prepare_callback_daemon(extras, protocol, host, use_direct_calls, txn_id=None):
230 231 txn_details = get_txn_id_from_store(txn_id)
231 232 port = txn_details.get('port', 0)
232 233 if use_direct_calls:
233 234 callback_daemon = DummyHooksCallbackDaemon()
234 235 extras['hooks_module'] = callback_daemon.hooks_module
235 236 else:
236 237 if protocol == 'http':
237 238 callback_daemon = HttpHooksCallbackDaemon(
238 239 txn_id=txn_id, host=host, port=port)
239 240 else:
240 241 log.error('Unsupported callback daemon protocol "%s"', protocol)
241 242 raise Exception('Unsupported callback daemon protocol.')
242 243
243 244 extras['hooks_uri'] = callback_daemon.hooks_uri
244 245 extras['hooks_protocol'] = protocol
245 246 extras['time'] = time.time()
246 247
247 248 # register txn_id
248 249 extras['txn_id'] = txn_id
249 250 log.debug('Prepared a callback daemon: %s at url `%s`',
250 251 callback_daemon.__class__.__name__, callback_daemon.hooks_uri)
251 252 return callback_daemon, extras
252 253
253 254
254 255 class Hooks(object):
255 256 """
256 257 Exposes the hooks for remote call backs
257 258 """
258 259
259 260 def repo_size(self, extras):
260 261 log.debug("Called repo_size of %s object", self)
261 262 return self._call_hook(hooks_base.repo_size, extras)
262 263
263 264 def pre_pull(self, extras):
264 265 log.debug("Called pre_pull of %s object", self)
265 266 return self._call_hook(hooks_base.pre_pull, extras)
266 267
267 268 def post_pull(self, extras):
268 269 log.debug("Called post_pull of %s object", self)
269 270 return self._call_hook(hooks_base.post_pull, extras)
270 271
271 272 def pre_push(self, extras):
272 273 log.debug("Called pre_push of %s object", self)
273 274 return self._call_hook(hooks_base.pre_push, extras)
274 275
275 276 def post_push(self, extras):
276 277 log.debug("Called post_push of %s object", self)
277 278 return self._call_hook(hooks_base.post_push, extras)
278 279
279 280 def _call_hook(self, hook, extras):
280 281 extras = AttributeDict(extras)
281 282 server_url = extras['server_url']
282 283 request = bootstrap_request(application_url=server_url)
283 284
284 285 bootstrap_config(request) # inject routes and other interfaces
285 286
286 287 # inject the user for usage in hooks
287 288 request.user = AttributeDict({'username': extras.username,
288 289 'ip_addr': extras.ip,
289 290 'user_id': extras.user_id})
290 291
291 292 extras.request = request
292 293
293 294 try:
294 295 result = hook(extras)
295 296 if result is None:
296 297 raise Exception(
297 298 'Failed to obtain hook result from func: {}'.format(hook))
298 299 except HTTPBranchProtected as handled_error:
299 300 # Those special cases doesn't need error reporting. It's a case of
300 301 # locked repo or protected branch
301 302 result = AttributeDict({
302 303 'status': handled_error.code,
303 304 'output': handled_error.explanation
304 305 })
305 306 except (HTTPLockedRC, Exception) as error:
306 307 # locked needs different handling since we need to also
307 308 # handle PULL operations
308 309 exc_tb = ''
309 310 if not isinstance(error, HTTPLockedRC):
310 311 exc_tb = traceback.format_exc()
311 312 log.exception('Exception when handling hook %s', hook)
312 313 error_args = error.args
313 314 return {
314 315 'status': 128,
315 316 'output': '',
316 317 'exception': type(error).__name__,
317 318 'exception_traceback': exc_tb,
318 319 'exception_args': error_args,
319 320 }
320 321 finally:
321 322 meta.Session.remove()
322 323
323 324 log.debug('Got hook call response %s', result)
324 325 return {
325 326 'status': result.status,
326 327 'output': result.output,
327 328 }
328 329
329 330 def __enter__(self):
330 331 return self
331 332
332 333 def __exit__(self, exc_type, exc_val, exc_tb):
333 334 pass
General Comments 0
You need to be logged in to leave comments. Login now