##// END OF EJS Templates
hooks: support v2 hooks protocol using binary msgpack
super-admin -
r4902:cae6657c default
parent child Browse files
Show More
@@ -1,352 +1,365 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2010-2020 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 import os
22 22 import time
23 23 import logging
24 24 import tempfile
25 25 import traceback
26 26 import threading
27 27 import socket
28 import random
28 import msgpack
29 29
30 30 from BaseHTTPServer import BaseHTTPRequestHandler
31 31 from SocketServer import TCPServer
32 32
33 33 import rhodecode
34 34 from rhodecode.lib.exceptions import HTTPLockedRC, HTTPBranchProtected
35 35 from rhodecode.model import meta
36 36 from rhodecode.lib.base import bootstrap_request, bootstrap_config
37 37 from rhodecode.lib import hooks_base
38 38 from rhodecode.lib.utils2 import AttributeDict
39 39 from rhodecode.lib.ext_json import json
40 40 from rhodecode.lib import rc_cache
41 41
42 42 log = logging.getLogger(__name__)
43 43
44 44
45 45 class HooksHttpHandler(BaseHTTPRequestHandler):
46 46
47 47 def do_POST(self):
48 method, extras = self._read_request()
48 hooks_proto, method, extras = self._read_request()
49 log.debug('Handling HooksHttpHandler %s with %s proto', method, hooks_proto)
50
49 51 txn_id = getattr(self.server, 'txn_id', None)
50 52 if txn_id:
51 53 log.debug('Computing TXN_ID based on `%s`:`%s`',
52 54 extras['repository'], extras['txn_id'])
53 55 computed_txn_id = rc_cache.utils.compute_key_from_params(
54 56 extras['repository'], extras['txn_id'])
55 57 if txn_id != computed_txn_id:
56 58 raise Exception(
57 59 'TXN ID fail: expected {} got {} instead'.format(
58 60 txn_id, computed_txn_id))
59 61
60 62 request = getattr(self.server, 'request', None)
61 63 try:
62 64 hooks = Hooks(request=request, log_prefix='HOOKS: {} '.format(self.server.server_address))
63 65 result = self._call_hook_method(hooks, method, extras)
64 66 except Exception as e:
65 67 exc_tb = traceback.format_exc()
66 68 result = {
67 69 'exception': e.__class__.__name__,
68 70 'exception_traceback': exc_tb,
69 71 'exception_args': e.args
70 72 }
71 self._write_response(result)
73 self._write_response(hooks_proto, result)
72 74
73 75 def _read_request(self):
74 76 length = int(self.headers['Content-Length'])
77 hooks_proto = self.headers.get('rc-hooks-protocol') or 'json.v1'
78 if hooks_proto == 'msgpack.v1':
79 # support for new vcsserver msgpack based protocol hooks
80 data = msgpack.unpackb(self.rfile.read(length), raw=False)
81 else:
75 82 body = self.rfile.read(length).decode('utf-8')
76 83 data = json.loads(body)
77 return data['method'], data['extras']
84
85 return hooks_proto, data['method'], data['extras']
78 86
79 def _write_response(self, result):
87 def _write_response(self, hooks_proto, result):
80 88 self.send_response(200)
89 if hooks_proto == 'msgpack.v1':
90 self.send_header("Content-type", "application/msgpack")
91 self.end_headers()
92 self.wfile.write(msgpack.packb(result))
93 else:
81 94 self.send_header("Content-type", "text/json")
82 95 self.end_headers()
83 96 self.wfile.write(json.dumps(result))
84 97
85 98 def _call_hook_method(self, hooks, method, extras):
86 99 try:
87 100 result = getattr(hooks, method)(extras)
88 101 finally:
89 102 meta.Session.remove()
90 103 return result
91 104
92 105 def log_message(self, format, *args):
93 106 """
94 107 This is an overridden method of BaseHTTPRequestHandler which logs using
95 108 logging library instead of writing directly to stderr.
96 109 """
97 110
98 111 message = format % args
99 112
100 113 log.debug(
101 114 "HOOKS: %s - - [%s] %s", self.client_address,
102 115 self.log_date_time_string(), message)
103 116
104 117
105 118 class DummyHooksCallbackDaemon(object):
106 119 hooks_uri = ''
107 120
108 121 def __init__(self):
109 122 self.hooks_module = Hooks.__module__
110 123
111 124 def __enter__(self):
112 125 log.debug('Running `%s` callback daemon', self.__class__.__name__)
113 126 return self
114 127
115 128 def __exit__(self, exc_type, exc_val, exc_tb):
116 129 log.debug('Exiting `%s` callback daemon', self.__class__.__name__)
117 130
118 131
119 132 class ThreadedHookCallbackDaemon(object):
120 133
121 134 _callback_thread = None
122 135 _daemon = None
123 136 _done = False
124 137
125 138 def __init__(self, txn_id=None, host=None, port=None):
126 139 self._prepare(txn_id=txn_id, host=host, port=port)
127 140
128 141 def __enter__(self):
129 142 log.debug('Running `%s` callback daemon', self.__class__.__name__)
130 143 self._run()
131 144 return self
132 145
133 146 def __exit__(self, exc_type, exc_val, exc_tb):
134 147 log.debug('Exiting `%s` callback daemon', self.__class__.__name__)
135 148 self._stop()
136 149
137 150 def _prepare(self, txn_id=None, host=None, port=None):
138 151 raise NotImplementedError()
139 152
140 153 def _run(self):
141 154 raise NotImplementedError()
142 155
143 156 def _stop(self):
144 157 raise NotImplementedError()
145 158
146 159
147 160 class HttpHooksCallbackDaemon(ThreadedHookCallbackDaemon):
148 161 """
149 162 Context manager which will run a callback daemon in a background thread.
150 163 """
151 164
152 165 hooks_uri = None
153 166
154 167 # From Python docs: Polling reduces our responsiveness to a shutdown
155 168 # request and wastes cpu at all other times.
156 169 POLL_INTERVAL = 0.01
157 170
158 171 @property
159 172 def _hook_prefix(self):
160 173 return 'HOOKS: {} '.format(self.hooks_uri)
161 174
162 175 def get_hostname(self):
163 176 return socket.gethostname() or '127.0.0.1'
164 177
165 178 def get_available_port(self, min_port=20000, max_port=65535):
166 179 from rhodecode.lib.utils2 import get_available_port as _get_port
167 180 return _get_port(min_port, max_port)
168 181
169 182 def _prepare(self, txn_id=None, host=None, port=None):
170 183 from pyramid.threadlocal import get_current_request
171 184
172 185 if not host or host == "*":
173 186 host = self.get_hostname()
174 187 if not port:
175 188 port = self.get_available_port()
176 189
177 190 server_address = (host, port)
178 191 self.hooks_uri = '{}:{}'.format(host, port)
179 192 self.txn_id = txn_id
180 193 self._done = False
181 194
182 195 log.debug(
183 196 "%s Preparing HTTP callback daemon registering hook object: %s",
184 197 self._hook_prefix, HooksHttpHandler)
185 198
186 199 self._daemon = TCPServer(server_address, HooksHttpHandler)
187 200 # inject transaction_id for later verification
188 201 self._daemon.txn_id = self.txn_id
189 202
190 203 # pass the WEB app request into daemon
191 204 self._daemon.request = get_current_request()
192 205
193 206 def _run(self):
194 207 log.debug("Running event loop of callback daemon in background thread")
195 208 callback_thread = threading.Thread(
196 209 target=self._daemon.serve_forever,
197 210 kwargs={'poll_interval': self.POLL_INTERVAL})
198 211 callback_thread.daemon = True
199 212 callback_thread.start()
200 213 self._callback_thread = callback_thread
201 214
202 215 def _stop(self):
203 216 log.debug("Waiting for background thread to finish.")
204 217 self._daemon.shutdown()
205 218 self._callback_thread.join()
206 219 self._daemon = None
207 220 self._callback_thread = None
208 221 if self.txn_id:
209 222 txn_id_file = get_txn_id_data_path(self.txn_id)
210 223 log.debug('Cleaning up TXN ID %s', txn_id_file)
211 224 if os.path.isfile(txn_id_file):
212 225 os.remove(txn_id_file)
213 226
214 227 log.debug("Background thread done.")
215 228
216 229
217 230 def get_txn_id_data_path(txn_id):
218 231 import rhodecode
219 232
220 233 root = rhodecode.CONFIG.get('cache_dir') or tempfile.gettempdir()
221 234 final_dir = os.path.join(root, 'svn_txn_id')
222 235
223 236 if not os.path.isdir(final_dir):
224 237 os.makedirs(final_dir)
225 238 return os.path.join(final_dir, 'rc_txn_id_{}'.format(txn_id))
226 239
227 240
228 241 def store_txn_id_data(txn_id, data_dict):
229 242 if not txn_id:
230 243 log.warning('Cannot store txn_id because it is empty')
231 244 return
232 245
233 246 path = get_txn_id_data_path(txn_id)
234 247 try:
235 248 with open(path, 'wb') as f:
236 249 f.write(json.dumps(data_dict))
237 250 except Exception:
238 251 log.exception('Failed to write txn_id metadata')
239 252
240 253
241 254 def get_txn_id_from_store(txn_id):
242 255 """
243 256 Reads txn_id from store and if present returns the data for callback manager
244 257 """
245 258 path = get_txn_id_data_path(txn_id)
246 259 try:
247 260 with open(path, 'rb') as f:
248 261 return json.loads(f.read())
249 262 except Exception:
250 263 return {}
251 264
252 265
253 266 def prepare_callback_daemon(extras, protocol, host, use_direct_calls, txn_id=None):
254 267 txn_details = get_txn_id_from_store(txn_id)
255 268 port = txn_details.get('port', 0)
256 269 if use_direct_calls:
257 270 callback_daemon = DummyHooksCallbackDaemon()
258 271 extras['hooks_module'] = callback_daemon.hooks_module
259 272 else:
260 273 if protocol == 'http':
261 274 callback_daemon = HttpHooksCallbackDaemon(
262 275 txn_id=txn_id, host=host, port=port)
263 276 else:
264 277 log.error('Unsupported callback daemon protocol "%s"', protocol)
265 278 raise Exception('Unsupported callback daemon protocol.')
266 279
267 280 extras['hooks_uri'] = callback_daemon.hooks_uri
268 281 extras['hooks_protocol'] = protocol
269 282 extras['time'] = time.time()
270 283
271 284 # register txn_id
272 285 extras['txn_id'] = txn_id
273 286 log.debug('Prepared a callback daemon: %s at url `%s`',
274 287 callback_daemon.__class__.__name__, callback_daemon.hooks_uri)
275 288 return callback_daemon, extras
276 289
277 290
278 291 class Hooks(object):
279 292 """
280 293 Exposes the hooks for remote call backs
281 294 """
282 295 def __init__(self, request=None, log_prefix=''):
283 296 self.log_prefix = log_prefix
284 297 self.request = request
285 298
286 299 def repo_size(self, extras):
287 300 log.debug("%sCalled repo_size of %s object", self.log_prefix, self)
288 301 return self._call_hook(hooks_base.repo_size, extras)
289 302
290 303 def pre_pull(self, extras):
291 304 log.debug("%sCalled pre_pull of %s object", self.log_prefix, self)
292 305 return self._call_hook(hooks_base.pre_pull, extras)
293 306
294 307 def post_pull(self, extras):
295 308 log.debug("%sCalled post_pull of %s object", self.log_prefix, self)
296 309 return self._call_hook(hooks_base.post_pull, extras)
297 310
298 311 def pre_push(self, extras):
299 312 log.debug("%sCalled pre_push of %s object", self.log_prefix, self)
300 313 return self._call_hook(hooks_base.pre_push, extras)
301 314
302 315 def post_push(self, extras):
303 316 log.debug("%sCalled post_push of %s object", self.log_prefix, self)
304 317 return self._call_hook(hooks_base.post_push, extras)
305 318
306 319 def _call_hook(self, hook, extras):
307 320 extras = AttributeDict(extras)
308 321 server_url = extras['server_url']
309 322
310 323 extras.request = self.request
311 324
312 325 try:
313 326 result = hook(extras)
314 327 if result is None:
315 328 raise Exception(
316 329 'Failed to obtain hook result from func: {}'.format(hook))
317 330 except HTTPBranchProtected as handled_error:
318 331 # Those special cases doesn't need error reporting. It's a case of
319 332 # locked repo or protected branch
320 333 result = AttributeDict({
321 334 'status': handled_error.code,
322 335 'output': handled_error.explanation
323 336 })
324 337 except (HTTPLockedRC, Exception) as error:
325 338 # locked needs different handling since we need to also
326 339 # handle PULL operations
327 340 exc_tb = ''
328 341 if not isinstance(error, HTTPLockedRC):
329 342 exc_tb = traceback.format_exc()
330 343 log.exception('%sException when handling hook %s', self.log_prefix, hook)
331 344 error_args = error.args
332 345 return {
333 346 'status': 128,
334 347 'output': '',
335 348 'exception': type(error).__name__,
336 349 'exception_traceback': exc_tb,
337 350 'exception_args': error_args,
338 351 }
339 352 finally:
340 353 meta.Session.remove()
341 354
342 355 log.debug('%sGot hook call response %s', self.log_prefix, result)
343 356 return {
344 357 'status': result.status,
345 358 'output': result.output,
346 359 }
347 360
348 361 def __enter__(self):
349 362 return self
350 363
351 364 def __exit__(self, exc_type, exc_val, exc_tb):
352 365 pass
General Comments 0
You need to be logged in to leave comments. Login now