##// END OF EJS Templates
logging: updated hooks deamon logs
dan -
r3934:2f9e92f8 default
parent child Browse files
Show More
@@ -1,332 +1,333 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2010-2019 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 import os
22 22 import time
23 23 import logging
24 24 import tempfile
25 25 import traceback
26 26 import threading
27 27
28 28 from BaseHTTPServer import BaseHTTPRequestHandler
29 29 from SocketServer import TCPServer
30 30
31 31 import rhodecode
32 32 from rhodecode.lib.exceptions import HTTPLockedRC, HTTPBranchProtected
33 33 from rhodecode.model import meta
34 34 from rhodecode.lib.base import bootstrap_request, bootstrap_config
35 35 from rhodecode.lib import hooks_base
36 36 from rhodecode.lib.utils2 import AttributeDict
37 37 from rhodecode.lib.ext_json import json
38 38 from rhodecode.lib import rc_cache
39 39
40 40 log = logging.getLogger(__name__)
41 41
42 42
43 43 class HooksHttpHandler(BaseHTTPRequestHandler):
44 44
45 45 def do_POST(self):
46 46 method, extras = self._read_request()
47 47 txn_id = getattr(self.server, 'txn_id', None)
48 48 if txn_id:
49 49 log.debug('Computing TXN_ID based on `%s`:`%s`',
50 50 extras['repository'], extras['txn_id'])
51 51 computed_txn_id = rc_cache.utils.compute_key_from_params(
52 52 extras['repository'], extras['txn_id'])
53 53 if txn_id != computed_txn_id:
54 54 raise Exception(
55 55 'TXN ID fail: expected {} got {} instead'.format(
56 56 txn_id, computed_txn_id))
57 57
58 58 try:
59 59 result = self._call_hook(method, extras)
60 60 except Exception as e:
61 61 exc_tb = traceback.format_exc()
62 62 result = {
63 63 'exception': e.__class__.__name__,
64 64 'exception_traceback': exc_tb,
65 65 'exception_args': e.args
66 66 }
67 67 self._write_response(result)
68 68
69 69 def _read_request(self):
70 70 length = int(self.headers['Content-Length'])
71 71 body = self.rfile.read(length).decode('utf-8')
72 72 data = json.loads(body)
73 73 return data['method'], data['extras']
74 74
75 75 def _write_response(self, result):
76 76 self.send_response(200)
77 77 self.send_header("Content-type", "text/json")
78 78 self.end_headers()
79 79 self.wfile.write(json.dumps(result))
80 80
81 81 def _call_hook(self, method, extras):
82 82 hooks = Hooks()
83 83 try:
84 84 result = getattr(hooks, method)(extras)
85 85 finally:
86 86 meta.Session.remove()
87 87 return result
88 88
89 89 def log_message(self, format, *args):
90 90 """
91 91 This is an overridden method of BaseHTTPRequestHandler which logs using
92 92 logging library instead of writing directly to stderr.
93 93 """
94 94
95 95 message = format % args
96 96
97 97 log.debug(
98 98 "%s - - [%s] %s", self.client_address[0],
99 99 self.log_date_time_string(), message)
100 100
101 101
102 102 class DummyHooksCallbackDaemon(object):
103 103 hooks_uri = ''
104 104
105 105 def __init__(self):
106 106 self.hooks_module = Hooks.__module__
107 107
108 108 def __enter__(self):
109 log.debug('Running dummy hooks callback daemon')
109 log.debug('Running `%s` callback daemon', self.__class__.__name__)
110 110 return self
111 111
112 112 def __exit__(self, exc_type, exc_val, exc_tb):
113 log.debug('Exiting dummy hooks callback daemon')
113 log.debug('Exiting `%s` callback daemon', self.__class__.__name__)
114 114
115 115
116 116 class ThreadedHookCallbackDaemon(object):
117 117
118 118 _callback_thread = None
119 119 _daemon = None
120 120 _done = False
121 121
122 122 def __init__(self, txn_id=None, host=None, port=None):
123 123 self._prepare(txn_id=txn_id, host=None, port=port)
124 124
125 125 def __enter__(self):
126 log.debug('Running `%s` callback daemon', self.__class__.__name__)
126 127 self._run()
127 128 return self
128 129
129 130 def __exit__(self, exc_type, exc_val, exc_tb):
130 log.debug('Callback daemon exiting now...')
131 log.debug('Exiting `%s` callback daemon', self.__class__.__name__)
131 132 self._stop()
132 133
133 134 def _prepare(self, txn_id=None, host=None, port=None):
134 135 raise NotImplementedError()
135 136
136 137 def _run(self):
137 138 raise NotImplementedError()
138 139
139 140 def _stop(self):
140 141 raise NotImplementedError()
141 142
142 143
143 144 class HttpHooksCallbackDaemon(ThreadedHookCallbackDaemon):
144 145 """
145 146 Context manager which will run a callback daemon in a background thread.
146 147 """
147 148
148 149 hooks_uri = None
149 150
150 151 # From Python docs: Polling reduces our responsiveness to a shutdown
151 152 # request and wastes cpu at all other times.
152 153 POLL_INTERVAL = 0.01
153 154
154 155 def _prepare(self, txn_id=None, host=None, port=None):
155 156 host = host or '127.0.0.1'
156 157 self._done = False
157 158 self._daemon = TCPServer((host, port or 0), HooksHttpHandler)
158 159 _, port = self._daemon.server_address
159 160 self.hooks_uri = '{}:{}'.format(host, port)
160 161 self.txn_id = txn_id
161 162 # inject transaction_id for later verification
162 163 self._daemon.txn_id = self.txn_id
163 164
164 165 log.debug(
165 166 "Preparing HTTP callback daemon at `%s` and registering hook object",
166 167 self.hooks_uri)
167 168
168 169 def _run(self):
169 170 log.debug("Running event loop of callback daemon in background thread")
170 171 callback_thread = threading.Thread(
171 172 target=self._daemon.serve_forever,
172 173 kwargs={'poll_interval': self.POLL_INTERVAL})
173 174 callback_thread.daemon = True
174 175 callback_thread.start()
175 176 self._callback_thread = callback_thread
176 177
177 178 def _stop(self):
178 179 log.debug("Waiting for background thread to finish.")
179 180 self._daemon.shutdown()
180 181 self._callback_thread.join()
181 182 self._daemon = None
182 183 self._callback_thread = None
183 184 if self.txn_id:
184 185 txn_id_file = get_txn_id_data_path(self.txn_id)
185 186 log.debug('Cleaning up TXN ID %s', txn_id_file)
186 187 if os.path.isfile(txn_id_file):
187 188 os.remove(txn_id_file)
188 189
189 190 log.debug("Background thread done.")
190 191
191 192
192 193 def get_txn_id_data_path(txn_id):
193 194 import rhodecode
194 195
195 196 root = rhodecode.CONFIG.get('cache_dir') or tempfile.gettempdir()
196 197 final_dir = os.path.join(root, 'svn_txn_id')
197 198
198 199 if not os.path.isdir(final_dir):
199 200 os.makedirs(final_dir)
200 201 return os.path.join(final_dir, 'rc_txn_id_{}'.format(txn_id))
201 202
202 203
203 204 def store_txn_id_data(txn_id, data_dict):
204 205 if not txn_id:
205 206 log.warning('Cannot store txn_id because it is empty')
206 207 return
207 208
208 209 path = get_txn_id_data_path(txn_id)
209 210 try:
210 211 with open(path, 'wb') as f:
211 212 f.write(json.dumps(data_dict))
212 213 except Exception:
213 214 log.exception('Failed to write txn_id metadata')
214 215
215 216
216 217 def get_txn_id_from_store(txn_id):
217 218 """
218 219 Reads txn_id from store and if present returns the data for callback manager
219 220 """
220 221 path = get_txn_id_data_path(txn_id)
221 222 try:
222 223 with open(path, 'rb') as f:
223 224 return json.loads(f.read())
224 225 except Exception:
225 226 return {}
226 227
227 228
228 229 def prepare_callback_daemon(extras, protocol, host, use_direct_calls, txn_id=None):
229 230 txn_details = get_txn_id_from_store(txn_id)
230 231 port = txn_details.get('port', 0)
231 232 if use_direct_calls:
232 233 callback_daemon = DummyHooksCallbackDaemon()
233 234 extras['hooks_module'] = callback_daemon.hooks_module
234 235 else:
235 236 if protocol == 'http':
236 237 callback_daemon = HttpHooksCallbackDaemon(
237 238 txn_id=txn_id, host=host, port=port)
238 239 else:
239 240 log.error('Unsupported callback daemon protocol "%s"', protocol)
240 241 raise Exception('Unsupported callback daemon protocol.')
241 242
242 243 extras['hooks_uri'] = callback_daemon.hooks_uri
243 244 extras['hooks_protocol'] = protocol
244 245 extras['time'] = time.time()
245 246
246 247 # register txn_id
247 248 extras['txn_id'] = txn_id
248 249 log.debug('Prepared a callback daemon: %s at url `%s`',
249 250 callback_daemon.__class__.__name__, callback_daemon.hooks_uri)
250 251 return callback_daemon, extras
251 252
252 253
253 254 class Hooks(object):
254 255 """
255 256 Exposes the hooks for remote call backs
256 257 """
257 258
258 259 def repo_size(self, extras):
259 260 log.debug("Called repo_size of %s object", self)
260 261 return self._call_hook(hooks_base.repo_size, extras)
261 262
262 263 def pre_pull(self, extras):
263 264 log.debug("Called pre_pull of %s object", self)
264 265 return self._call_hook(hooks_base.pre_pull, extras)
265 266
266 267 def post_pull(self, extras):
267 268 log.debug("Called post_pull of %s object", self)
268 269 return self._call_hook(hooks_base.post_pull, extras)
269 270
270 271 def pre_push(self, extras):
271 272 log.debug("Called pre_push of %s object", self)
272 273 return self._call_hook(hooks_base.pre_push, extras)
273 274
274 275 def post_push(self, extras):
275 276 log.debug("Called post_push of %s object", self)
276 277 return self._call_hook(hooks_base.post_push, extras)
277 278
278 279 def _call_hook(self, hook, extras):
279 280 extras = AttributeDict(extras)
280 281 server_url = extras['server_url']
281 282 request = bootstrap_request(application_url=server_url)
282 283
283 284 bootstrap_config(request) # inject routes and other interfaces
284 285
285 286 # inject the user for usage in hooks
286 287 request.user = AttributeDict({'username': extras.username,
287 288 'ip_addr': extras.ip,
288 289 'user_id': extras.user_id})
289 290
290 291 extras.request = request
291 292
292 293 try:
293 294 result = hook(extras)
294 295 if result is None:
295 296 raise Exception(
296 297 'Failed to obtain hook result from func: {}'.format(hook))
297 298 except HTTPBranchProtected as handled_error:
298 299 # Those special cases doesn't need error reporting. It's a case of
299 300 # locked repo or protected branch
300 301 result = AttributeDict({
301 302 'status': handled_error.code,
302 303 'output': handled_error.explanation
303 304 })
304 305 except (HTTPLockedRC, Exception) as error:
305 306 # locked needs different handling since we need to also
306 307 # handle PULL operations
307 308 exc_tb = ''
308 309 if not isinstance(error, HTTPLockedRC):
309 310 exc_tb = traceback.format_exc()
310 311 log.exception('Exception when handling hook %s', hook)
311 312 error_args = error.args
312 313 return {
313 314 'status': 128,
314 315 'output': '',
315 316 'exception': type(error).__name__,
316 317 'exception_traceback': exc_tb,
317 318 'exception_args': error_args,
318 319 }
319 320 finally:
320 321 meta.Session.remove()
321 322
322 323 log.debug('Got hook call response %s', result)
323 324 return {
324 325 'status': result.status,
325 326 'output': result.output,
326 327 }
327 328
328 329 def __enter__(self):
329 330 return self
330 331
331 332 def __exit__(self, exc_type, exc_val, exc_tb):
332 333 pass
General Comments 0
You need to be logged in to leave comments. Login now