##// END OF EJS Templates
hooks: support v2 hooks protocol using binary msgpack
super-admin -
r4902:cae6657c default
parent child Browse files
Show More
@@ -1,352 +1,365 b''
1 # -*- coding: utf-8 -*-
1 # -*- coding: utf-8 -*-
2
2
3 # Copyright (C) 2010-2020 RhodeCode GmbH
3 # Copyright (C) 2010-2020 RhodeCode GmbH
4 #
4 #
5 # This program is free software: you can redistribute it and/or modify
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU Affero General Public License, version 3
6 # it under the terms of the GNU Affero General Public License, version 3
7 # (only), as published by the Free Software Foundation.
7 # (only), as published by the Free Software Foundation.
8 #
8 #
9 # This program is distributed in the hope that it will be useful,
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
12 # GNU General Public License for more details.
13 #
13 #
14 # You should have received a copy of the GNU Affero General Public License
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 #
16 #
17 # This program is dual-licensed. If you wish to learn more about the
17 # This program is dual-licensed. If you wish to learn more about the
18 # RhodeCode Enterprise Edition, including its added features, Support services,
18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20
20
21 import os
21 import os
22 import time
22 import time
23 import logging
23 import logging
24 import tempfile
24 import tempfile
25 import traceback
25 import traceback
26 import threading
26 import threading
27 import socket
27 import socket
28 import random
28 import msgpack
29
29
30 from BaseHTTPServer import BaseHTTPRequestHandler
30 from BaseHTTPServer import BaseHTTPRequestHandler
31 from SocketServer import TCPServer
31 from SocketServer import TCPServer
32
32
33 import rhodecode
33 import rhodecode
34 from rhodecode.lib.exceptions import HTTPLockedRC, HTTPBranchProtected
34 from rhodecode.lib.exceptions import HTTPLockedRC, HTTPBranchProtected
35 from rhodecode.model import meta
35 from rhodecode.model import meta
36 from rhodecode.lib.base import bootstrap_request, bootstrap_config
36 from rhodecode.lib.base import bootstrap_request, bootstrap_config
37 from rhodecode.lib import hooks_base
37 from rhodecode.lib import hooks_base
38 from rhodecode.lib.utils2 import AttributeDict
38 from rhodecode.lib.utils2 import AttributeDict
39 from rhodecode.lib.ext_json import json
39 from rhodecode.lib.ext_json import json
40 from rhodecode.lib import rc_cache
40 from rhodecode.lib import rc_cache
41
41
42 log = logging.getLogger(__name__)
42 log = logging.getLogger(__name__)
43
43
44
44
45 class HooksHttpHandler(BaseHTTPRequestHandler):
45 class HooksHttpHandler(BaseHTTPRequestHandler):
46
46
47 def do_POST(self):
47 def do_POST(self):
48 method, extras = self._read_request()
48 hooks_proto, method, extras = self._read_request()
49 log.debug('Handling HooksHttpHandler %s with %s proto', method, hooks_proto)
50
49 txn_id = getattr(self.server, 'txn_id', None)
51 txn_id = getattr(self.server, 'txn_id', None)
50 if txn_id:
52 if txn_id:
51 log.debug('Computing TXN_ID based on `%s`:`%s`',
53 log.debug('Computing TXN_ID based on `%s`:`%s`',
52 extras['repository'], extras['txn_id'])
54 extras['repository'], extras['txn_id'])
53 computed_txn_id = rc_cache.utils.compute_key_from_params(
55 computed_txn_id = rc_cache.utils.compute_key_from_params(
54 extras['repository'], extras['txn_id'])
56 extras['repository'], extras['txn_id'])
55 if txn_id != computed_txn_id:
57 if txn_id != computed_txn_id:
56 raise Exception(
58 raise Exception(
57 'TXN ID fail: expected {} got {} instead'.format(
59 'TXN ID fail: expected {} got {} instead'.format(
58 txn_id, computed_txn_id))
60 txn_id, computed_txn_id))
59
61
60 request = getattr(self.server, 'request', None)
62 request = getattr(self.server, 'request', None)
61 try:
63 try:
62 hooks = Hooks(request=request, log_prefix='HOOKS: {} '.format(self.server.server_address))
64 hooks = Hooks(request=request, log_prefix='HOOKS: {} '.format(self.server.server_address))
63 result = self._call_hook_method(hooks, method, extras)
65 result = self._call_hook_method(hooks, method, extras)
64 except Exception as e:
66 except Exception as e:
65 exc_tb = traceback.format_exc()
67 exc_tb = traceback.format_exc()
66 result = {
68 result = {
67 'exception': e.__class__.__name__,
69 'exception': e.__class__.__name__,
68 'exception_traceback': exc_tb,
70 'exception_traceback': exc_tb,
69 'exception_args': e.args
71 'exception_args': e.args
70 }
72 }
71 self._write_response(result)
73 self._write_response(hooks_proto, result)
72
74
73 def _read_request(self):
75 def _read_request(self):
74 length = int(self.headers['Content-Length'])
76 length = int(self.headers['Content-Length'])
75 body = self.rfile.read(length).decode('utf-8')
77 hooks_proto = self.headers.get('rc-hooks-protocol') or 'json.v1'
76 data = json.loads(body)
78 if hooks_proto == 'msgpack.v1':
77 return data['method'], data['extras']
79 # support for new vcsserver msgpack based protocol hooks
80 data = msgpack.unpackb(self.rfile.read(length), raw=False)
81 else:
82 body = self.rfile.read(length).decode('utf-8')
83 data = json.loads(body)
84
85 return hooks_proto, data['method'], data['extras']
78
86
79 def _write_response(self, result):
87 def _write_response(self, hooks_proto, result):
80 self.send_response(200)
88 self.send_response(200)
81 self.send_header("Content-type", "text/json")
89 if hooks_proto == 'msgpack.v1':
82 self.end_headers()
90 self.send_header("Content-type", "application/msgpack")
83 self.wfile.write(json.dumps(result))
91 self.end_headers()
92 self.wfile.write(msgpack.packb(result))
93 else:
94 self.send_header("Content-type", "text/json")
95 self.end_headers()
96 self.wfile.write(json.dumps(result))
84
97
85 def _call_hook_method(self, hooks, method, extras):
98 def _call_hook_method(self, hooks, method, extras):
86 try:
99 try:
87 result = getattr(hooks, method)(extras)
100 result = getattr(hooks, method)(extras)
88 finally:
101 finally:
89 meta.Session.remove()
102 meta.Session.remove()
90 return result
103 return result
91
104
92 def log_message(self, format, *args):
105 def log_message(self, format, *args):
93 """
106 """
94 This is an overridden method of BaseHTTPRequestHandler which logs using
107 This is an overridden method of BaseHTTPRequestHandler which logs using
95 logging library instead of writing directly to stderr.
108 logging library instead of writing directly to stderr.
96 """
109 """
97
110
98 message = format % args
111 message = format % args
99
112
100 log.debug(
113 log.debug(
101 "HOOKS: %s - - [%s] %s", self.client_address,
114 "HOOKS: %s - - [%s] %s", self.client_address,
102 self.log_date_time_string(), message)
115 self.log_date_time_string(), message)
103
116
104
117
105 class DummyHooksCallbackDaemon(object):
118 class DummyHooksCallbackDaemon(object):
106 hooks_uri = ''
119 hooks_uri = ''
107
120
108 def __init__(self):
121 def __init__(self):
109 self.hooks_module = Hooks.__module__
122 self.hooks_module = Hooks.__module__
110
123
111 def __enter__(self):
124 def __enter__(self):
112 log.debug('Running `%s` callback daemon', self.__class__.__name__)
125 log.debug('Running `%s` callback daemon', self.__class__.__name__)
113 return self
126 return self
114
127
115 def __exit__(self, exc_type, exc_val, exc_tb):
128 def __exit__(self, exc_type, exc_val, exc_tb):
116 log.debug('Exiting `%s` callback daemon', self.__class__.__name__)
129 log.debug('Exiting `%s` callback daemon', self.__class__.__name__)
117
130
118
131
119 class ThreadedHookCallbackDaemon(object):
132 class ThreadedHookCallbackDaemon(object):
120
133
121 _callback_thread = None
134 _callback_thread = None
122 _daemon = None
135 _daemon = None
123 _done = False
136 _done = False
124
137
125 def __init__(self, txn_id=None, host=None, port=None):
138 def __init__(self, txn_id=None, host=None, port=None):
126 self._prepare(txn_id=txn_id, host=host, port=port)
139 self._prepare(txn_id=txn_id, host=host, port=port)
127
140
128 def __enter__(self):
141 def __enter__(self):
129 log.debug('Running `%s` callback daemon', self.__class__.__name__)
142 log.debug('Running `%s` callback daemon', self.__class__.__name__)
130 self._run()
143 self._run()
131 return self
144 return self
132
145
133 def __exit__(self, exc_type, exc_val, exc_tb):
146 def __exit__(self, exc_type, exc_val, exc_tb):
134 log.debug('Exiting `%s` callback daemon', self.__class__.__name__)
147 log.debug('Exiting `%s` callback daemon', self.__class__.__name__)
135 self._stop()
148 self._stop()
136
149
137 def _prepare(self, txn_id=None, host=None, port=None):
150 def _prepare(self, txn_id=None, host=None, port=None):
138 raise NotImplementedError()
151 raise NotImplementedError()
139
152
140 def _run(self):
153 def _run(self):
141 raise NotImplementedError()
154 raise NotImplementedError()
142
155
143 def _stop(self):
156 def _stop(self):
144 raise NotImplementedError()
157 raise NotImplementedError()
145
158
146
159
147 class HttpHooksCallbackDaemon(ThreadedHookCallbackDaemon):
160 class HttpHooksCallbackDaemon(ThreadedHookCallbackDaemon):
148 """
161 """
149 Context manager which will run a callback daemon in a background thread.
162 Context manager which will run a callback daemon in a background thread.
150 """
163 """
151
164
152 hooks_uri = None
165 hooks_uri = None
153
166
154 # From Python docs: Polling reduces our responsiveness to a shutdown
167 # From Python docs: Polling reduces our responsiveness to a shutdown
155 # request and wastes cpu at all other times.
168 # request and wastes cpu at all other times.
156 POLL_INTERVAL = 0.01
169 POLL_INTERVAL = 0.01
157
170
158 @property
171 @property
159 def _hook_prefix(self):
172 def _hook_prefix(self):
160 return 'HOOKS: {} '.format(self.hooks_uri)
173 return 'HOOKS: {} '.format(self.hooks_uri)
161
174
162 def get_hostname(self):
175 def get_hostname(self):
163 return socket.gethostname() or '127.0.0.1'
176 return socket.gethostname() or '127.0.0.1'
164
177
165 def get_available_port(self, min_port=20000, max_port=65535):
178 def get_available_port(self, min_port=20000, max_port=65535):
166 from rhodecode.lib.utils2 import get_available_port as _get_port
179 from rhodecode.lib.utils2 import get_available_port as _get_port
167 return _get_port(min_port, max_port)
180 return _get_port(min_port, max_port)
168
181
169 def _prepare(self, txn_id=None, host=None, port=None):
182 def _prepare(self, txn_id=None, host=None, port=None):
170 from pyramid.threadlocal import get_current_request
183 from pyramid.threadlocal import get_current_request
171
184
172 if not host or host == "*":
185 if not host or host == "*":
173 host = self.get_hostname()
186 host = self.get_hostname()
174 if not port:
187 if not port:
175 port = self.get_available_port()
188 port = self.get_available_port()
176
189
177 server_address = (host, port)
190 server_address = (host, port)
178 self.hooks_uri = '{}:{}'.format(host, port)
191 self.hooks_uri = '{}:{}'.format(host, port)
179 self.txn_id = txn_id
192 self.txn_id = txn_id
180 self._done = False
193 self._done = False
181
194
182 log.debug(
195 log.debug(
183 "%s Preparing HTTP callback daemon registering hook object: %s",
196 "%s Preparing HTTP callback daemon registering hook object: %s",
184 self._hook_prefix, HooksHttpHandler)
197 self._hook_prefix, HooksHttpHandler)
185
198
186 self._daemon = TCPServer(server_address, HooksHttpHandler)
199 self._daemon = TCPServer(server_address, HooksHttpHandler)
187 # inject transaction_id for later verification
200 # inject transaction_id for later verification
188 self._daemon.txn_id = self.txn_id
201 self._daemon.txn_id = self.txn_id
189
202
190 # pass the WEB app request into daemon
203 # pass the WEB app request into daemon
191 self._daemon.request = get_current_request()
204 self._daemon.request = get_current_request()
192
205
193 def _run(self):
206 def _run(self):
194 log.debug("Running event loop of callback daemon in background thread")
207 log.debug("Running event loop of callback daemon in background thread")
195 callback_thread = threading.Thread(
208 callback_thread = threading.Thread(
196 target=self._daemon.serve_forever,
209 target=self._daemon.serve_forever,
197 kwargs={'poll_interval': self.POLL_INTERVAL})
210 kwargs={'poll_interval': self.POLL_INTERVAL})
198 callback_thread.daemon = True
211 callback_thread.daemon = True
199 callback_thread.start()
212 callback_thread.start()
200 self._callback_thread = callback_thread
213 self._callback_thread = callback_thread
201
214
202 def _stop(self):
215 def _stop(self):
203 log.debug("Waiting for background thread to finish.")
216 log.debug("Waiting for background thread to finish.")
204 self._daemon.shutdown()
217 self._daemon.shutdown()
205 self._callback_thread.join()
218 self._callback_thread.join()
206 self._daemon = None
219 self._daemon = None
207 self._callback_thread = None
220 self._callback_thread = None
208 if self.txn_id:
221 if self.txn_id:
209 txn_id_file = get_txn_id_data_path(self.txn_id)
222 txn_id_file = get_txn_id_data_path(self.txn_id)
210 log.debug('Cleaning up TXN ID %s', txn_id_file)
223 log.debug('Cleaning up TXN ID %s', txn_id_file)
211 if os.path.isfile(txn_id_file):
224 if os.path.isfile(txn_id_file):
212 os.remove(txn_id_file)
225 os.remove(txn_id_file)
213
226
214 log.debug("Background thread done.")
227 log.debug("Background thread done.")
215
228
216
229
217 def get_txn_id_data_path(txn_id):
230 def get_txn_id_data_path(txn_id):
218 import rhodecode
231 import rhodecode
219
232
220 root = rhodecode.CONFIG.get('cache_dir') or tempfile.gettempdir()
233 root = rhodecode.CONFIG.get('cache_dir') or tempfile.gettempdir()
221 final_dir = os.path.join(root, 'svn_txn_id')
234 final_dir = os.path.join(root, 'svn_txn_id')
222
235
223 if not os.path.isdir(final_dir):
236 if not os.path.isdir(final_dir):
224 os.makedirs(final_dir)
237 os.makedirs(final_dir)
225 return os.path.join(final_dir, 'rc_txn_id_{}'.format(txn_id))
238 return os.path.join(final_dir, 'rc_txn_id_{}'.format(txn_id))
226
239
227
240
228 def store_txn_id_data(txn_id, data_dict):
241 def store_txn_id_data(txn_id, data_dict):
229 if not txn_id:
242 if not txn_id:
230 log.warning('Cannot store txn_id because it is empty')
243 log.warning('Cannot store txn_id because it is empty')
231 return
244 return
232
245
233 path = get_txn_id_data_path(txn_id)
246 path = get_txn_id_data_path(txn_id)
234 try:
247 try:
235 with open(path, 'wb') as f:
248 with open(path, 'wb') as f:
236 f.write(json.dumps(data_dict))
249 f.write(json.dumps(data_dict))
237 except Exception:
250 except Exception:
238 log.exception('Failed to write txn_id metadata')
251 log.exception('Failed to write txn_id metadata')
239
252
240
253
241 def get_txn_id_from_store(txn_id):
254 def get_txn_id_from_store(txn_id):
242 """
255 """
243 Reads txn_id from store and if present returns the data for callback manager
256 Reads txn_id from store and if present returns the data for callback manager
244 """
257 """
245 path = get_txn_id_data_path(txn_id)
258 path = get_txn_id_data_path(txn_id)
246 try:
259 try:
247 with open(path, 'rb') as f:
260 with open(path, 'rb') as f:
248 return json.loads(f.read())
261 return json.loads(f.read())
249 except Exception:
262 except Exception:
250 return {}
263 return {}
251
264
252
265
253 def prepare_callback_daemon(extras, protocol, host, use_direct_calls, txn_id=None):
266 def prepare_callback_daemon(extras, protocol, host, use_direct_calls, txn_id=None):
254 txn_details = get_txn_id_from_store(txn_id)
267 txn_details = get_txn_id_from_store(txn_id)
255 port = txn_details.get('port', 0)
268 port = txn_details.get('port', 0)
256 if use_direct_calls:
269 if use_direct_calls:
257 callback_daemon = DummyHooksCallbackDaemon()
270 callback_daemon = DummyHooksCallbackDaemon()
258 extras['hooks_module'] = callback_daemon.hooks_module
271 extras['hooks_module'] = callback_daemon.hooks_module
259 else:
272 else:
260 if protocol == 'http':
273 if protocol == 'http':
261 callback_daemon = HttpHooksCallbackDaemon(
274 callback_daemon = HttpHooksCallbackDaemon(
262 txn_id=txn_id, host=host, port=port)
275 txn_id=txn_id, host=host, port=port)
263 else:
276 else:
264 log.error('Unsupported callback daemon protocol "%s"', protocol)
277 log.error('Unsupported callback daemon protocol "%s"', protocol)
265 raise Exception('Unsupported callback daemon protocol.')
278 raise Exception('Unsupported callback daemon protocol.')
266
279
267 extras['hooks_uri'] = callback_daemon.hooks_uri
280 extras['hooks_uri'] = callback_daemon.hooks_uri
268 extras['hooks_protocol'] = protocol
281 extras['hooks_protocol'] = protocol
269 extras['time'] = time.time()
282 extras['time'] = time.time()
270
283
271 # register txn_id
284 # register txn_id
272 extras['txn_id'] = txn_id
285 extras['txn_id'] = txn_id
273 log.debug('Prepared a callback daemon: %s at url `%s`',
286 log.debug('Prepared a callback daemon: %s at url `%s`',
274 callback_daemon.__class__.__name__, callback_daemon.hooks_uri)
287 callback_daemon.__class__.__name__, callback_daemon.hooks_uri)
275 return callback_daemon, extras
288 return callback_daemon, extras
276
289
277
290
278 class Hooks(object):
291 class Hooks(object):
279 """
292 """
280 Exposes the hooks for remote call backs
293 Exposes the hooks for remote call backs
281 """
294 """
282 def __init__(self, request=None, log_prefix=''):
295 def __init__(self, request=None, log_prefix=''):
283 self.log_prefix = log_prefix
296 self.log_prefix = log_prefix
284 self.request = request
297 self.request = request
285
298
286 def repo_size(self, extras):
299 def repo_size(self, extras):
287 log.debug("%sCalled repo_size of %s object", self.log_prefix, self)
300 log.debug("%sCalled repo_size of %s object", self.log_prefix, self)
288 return self._call_hook(hooks_base.repo_size, extras)
301 return self._call_hook(hooks_base.repo_size, extras)
289
302
290 def pre_pull(self, extras):
303 def pre_pull(self, extras):
291 log.debug("%sCalled pre_pull of %s object", self.log_prefix, self)
304 log.debug("%sCalled pre_pull of %s object", self.log_prefix, self)
292 return self._call_hook(hooks_base.pre_pull, extras)
305 return self._call_hook(hooks_base.pre_pull, extras)
293
306
294 def post_pull(self, extras):
307 def post_pull(self, extras):
295 log.debug("%sCalled post_pull of %s object", self.log_prefix, self)
308 log.debug("%sCalled post_pull of %s object", self.log_prefix, self)
296 return self._call_hook(hooks_base.post_pull, extras)
309 return self._call_hook(hooks_base.post_pull, extras)
297
310
298 def pre_push(self, extras):
311 def pre_push(self, extras):
299 log.debug("%sCalled pre_push of %s object", self.log_prefix, self)
312 log.debug("%sCalled pre_push of %s object", self.log_prefix, self)
300 return self._call_hook(hooks_base.pre_push, extras)
313 return self._call_hook(hooks_base.pre_push, extras)
301
314
302 def post_push(self, extras):
315 def post_push(self, extras):
303 log.debug("%sCalled post_push of %s object", self.log_prefix, self)
316 log.debug("%sCalled post_push of %s object", self.log_prefix, self)
304 return self._call_hook(hooks_base.post_push, extras)
317 return self._call_hook(hooks_base.post_push, extras)
305
318
306 def _call_hook(self, hook, extras):
319 def _call_hook(self, hook, extras):
307 extras = AttributeDict(extras)
320 extras = AttributeDict(extras)
308 server_url = extras['server_url']
321 server_url = extras['server_url']
309
322
310 extras.request = self.request
323 extras.request = self.request
311
324
312 try:
325 try:
313 result = hook(extras)
326 result = hook(extras)
314 if result is None:
327 if result is None:
315 raise Exception(
328 raise Exception(
316 'Failed to obtain hook result from func: {}'.format(hook))
329 'Failed to obtain hook result from func: {}'.format(hook))
317 except HTTPBranchProtected as handled_error:
330 except HTTPBranchProtected as handled_error:
318 # Those special cases doesn't need error reporting. It's a case of
331 # Those special cases doesn't need error reporting. It's a case of
319 # locked repo or protected branch
332 # locked repo or protected branch
320 result = AttributeDict({
333 result = AttributeDict({
321 'status': handled_error.code,
334 'status': handled_error.code,
322 'output': handled_error.explanation
335 'output': handled_error.explanation
323 })
336 })
324 except (HTTPLockedRC, Exception) as error:
337 except (HTTPLockedRC, Exception) as error:
325 # locked needs different handling since we need to also
338 # locked needs different handling since we need to also
326 # handle PULL operations
339 # handle PULL operations
327 exc_tb = ''
340 exc_tb = ''
328 if not isinstance(error, HTTPLockedRC):
341 if not isinstance(error, HTTPLockedRC):
329 exc_tb = traceback.format_exc()
342 exc_tb = traceback.format_exc()
330 log.exception('%sException when handling hook %s', self.log_prefix, hook)
343 log.exception('%sException when handling hook %s', self.log_prefix, hook)
331 error_args = error.args
344 error_args = error.args
332 return {
345 return {
333 'status': 128,
346 'status': 128,
334 'output': '',
347 'output': '',
335 'exception': type(error).__name__,
348 'exception': type(error).__name__,
336 'exception_traceback': exc_tb,
349 'exception_traceback': exc_tb,
337 'exception_args': error_args,
350 'exception_args': error_args,
338 }
351 }
339 finally:
352 finally:
340 meta.Session.remove()
353 meta.Session.remove()
341
354
342 log.debug('%sGot hook call response %s', self.log_prefix, result)
355 log.debug('%sGot hook call response %s', self.log_prefix, result)
343 return {
356 return {
344 'status': result.status,
357 'status': result.status,
345 'output': result.output,
358 'output': result.output,
346 }
359 }
347
360
348 def __enter__(self):
361 def __enter__(self):
349 return self
362 return self
350
363
351 def __exit__(self, exc_type, exc_val, exc_tb):
364 def __exit__(self, exc_type, exc_val, exc_tb):
352 pass
365 pass
General Comments 0
You need to be logged in to leave comments. Login now