##// END OF EJS Templates
hooks: simplified code
super-admin -
r4856:d9965ed6 default
parent child Browse files
Show More
@@ -1,349 +1,345 b''
1 # -*- coding: utf-8 -*-
1 # -*- coding: utf-8 -*-
2
2
3 # Copyright (C) 2010-2020 RhodeCode GmbH
3 # Copyright (C) 2010-2020 RhodeCode GmbH
4 #
4 #
5 # This program is free software: you can redistribute it and/or modify
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU Affero General Public License, version 3
6 # it under the terms of the GNU Affero General Public License, version 3
7 # (only), as published by the Free Software Foundation.
7 # (only), as published by the Free Software Foundation.
8 #
8 #
9 # This program is distributed in the hope that it will be useful,
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
12 # GNU General Public License for more details.
13 #
13 #
14 # You should have received a copy of the GNU Affero General Public License
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 #
16 #
17 # This program is dual-licensed. If you wish to learn more about the
17 # This program is dual-licensed. If you wish to learn more about the
18 # RhodeCode Enterprise Edition, including its added features, Support services,
18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20
20
21 import os
21 import os
22 import time
22 import time
23 import logging
23 import logging
24 import tempfile
24 import tempfile
25 import traceback
25 import traceback
26 import threading
26 import threading
27 import socket
27 import socket
28
28
29 from BaseHTTPServer import BaseHTTPRequestHandler
29 from BaseHTTPServer import BaseHTTPRequestHandler
30 from SocketServer import TCPServer
30 from SocketServer import TCPServer
31
31
32 import rhodecode
32 import rhodecode
33 from rhodecode.lib.exceptions import HTTPLockedRC, HTTPBranchProtected
33 from rhodecode.lib.exceptions import HTTPLockedRC, HTTPBranchProtected
34 from rhodecode.model import meta
34 from rhodecode.model import meta
35 from rhodecode.lib.base import bootstrap_request, bootstrap_config
35 from rhodecode.lib.base import bootstrap_request, bootstrap_config
36 from rhodecode.lib import hooks_base
36 from rhodecode.lib import hooks_base
37 from rhodecode.lib.utils2 import AttributeDict
37 from rhodecode.lib.utils2 import AttributeDict
38 from rhodecode.lib.ext_json import json
38 from rhodecode.lib.ext_json import json
39 from rhodecode.lib import rc_cache
39 from rhodecode.lib import rc_cache
40
40
41 log = logging.getLogger(__name__)
41 log = logging.getLogger(__name__)
42
42
43
43
44 class HooksHttpHandler(BaseHTTPRequestHandler):
44 class HooksHttpHandler(BaseHTTPRequestHandler):
45
45
46 def do_POST(self):
46 def do_POST(self):
47 method, extras = self._read_request()
47 method, extras = self._read_request()
48 txn_id = getattr(self.server, 'txn_id', None)
48 txn_id = getattr(self.server, 'txn_id', None)
49 if txn_id:
49 if txn_id:
50 log.debug('Computing TXN_ID based on `%s`:`%s`',
50 log.debug('Computing TXN_ID based on `%s`:`%s`',
51 extras['repository'], extras['txn_id'])
51 extras['repository'], extras['txn_id'])
52 computed_txn_id = rc_cache.utils.compute_key_from_params(
52 computed_txn_id = rc_cache.utils.compute_key_from_params(
53 extras['repository'], extras['txn_id'])
53 extras['repository'], extras['txn_id'])
54 if txn_id != computed_txn_id:
54 if txn_id != computed_txn_id:
55 raise Exception(
55 raise Exception(
56 'TXN ID fail: expected {} got {} instead'.format(
56 'TXN ID fail: expected {} got {} instead'.format(
57 txn_id, computed_txn_id))
57 txn_id, computed_txn_id))
58
58
59 try:
59 try:
60 result = self._call_hook(method, extras)
60 result = self._call_hook(method, extras)
61 except Exception as e:
61 except Exception as e:
62 exc_tb = traceback.format_exc()
62 exc_tb = traceback.format_exc()
63 result = {
63 result = {
64 'exception': e.__class__.__name__,
64 'exception': e.__class__.__name__,
65 'exception_traceback': exc_tb,
65 'exception_traceback': exc_tb,
66 'exception_args': e.args
66 'exception_args': e.args
67 }
67 }
68 self._write_response(result)
68 self._write_response(result)
69
69
70 def _read_request(self):
70 def _read_request(self):
71 length = int(self.headers['Content-Length'])
71 length = int(self.headers['Content-Length'])
72 body = self.rfile.read(length).decode('utf-8')
72 body = self.rfile.read(length).decode('utf-8')
73 data = json.loads(body)
73 data = json.loads(body)
74 return data['method'], data['extras']
74 return data['method'], data['extras']
75
75
76 def _write_response(self, result):
76 def _write_response(self, result):
77 self.send_response(200)
77 self.send_response(200)
78 self.send_header("Content-type", "text/json")
78 self.send_header("Content-type", "text/json")
79 self.end_headers()
79 self.end_headers()
80 self.wfile.write(json.dumps(result))
80 self.wfile.write(json.dumps(result))
81
81
82 def _call_hook(self, method, extras):
82 def _call_hook(self, method, extras):
83 hooks = Hooks()
83 hooks = Hooks()
84 try:
84 try:
85 result = getattr(hooks, method)(extras)
85 result = getattr(hooks, method)(extras)
86 finally:
86 finally:
87 meta.Session.remove()
87 meta.Session.remove()
88 return result
88 return result
89
89
90 def log_message(self, format, *args):
90 def log_message(self, format, *args):
91 """
91 """
92 This is an overridden method of BaseHTTPRequestHandler which logs using
92 This is an overridden method of BaseHTTPRequestHandler which logs using
93 logging library instead of writing directly to stderr.
93 logging library instead of writing directly to stderr.
94 """
94 """
95
95
96 message = format % args
96 message = format % args
97
97
98 log.debug(
98 log.debug(
99 "%s - - [%s] %s", self.client_address[0],
99 "%s - - [%s] %s", self.client_address[0],
100 self.log_date_time_string(), message)
100 self.log_date_time_string(), message)
101
101
102
102
103 class DummyHooksCallbackDaemon(object):
103 class DummyHooksCallbackDaemon(object):
104 hooks_uri = ''
104 hooks_uri = ''
105
105
106 def __init__(self):
106 def __init__(self):
107 self.hooks_module = Hooks.__module__
107 self.hooks_module = Hooks.__module__
108
108
109 def __enter__(self):
109 def __enter__(self):
110 log.debug('Running `%s` callback daemon', self.__class__.__name__)
110 log.debug('Running `%s` callback daemon', self.__class__.__name__)
111 return self
111 return self
112
112
113 def __exit__(self, exc_type, exc_val, exc_tb):
113 def __exit__(self, exc_type, exc_val, exc_tb):
114 log.debug('Exiting `%s` callback daemon', self.__class__.__name__)
114 log.debug('Exiting `%s` callback daemon', self.__class__.__name__)
115
115
116
116
117 class ThreadedHookCallbackDaemon(object):
117 class ThreadedHookCallbackDaemon(object):
118
118
119 _callback_thread = None
119 _callback_thread = None
120 _daemon = None
120 _daemon = None
121 _done = False
121 _done = False
122
122
123 def __init__(self, txn_id=None, host=None, port=None):
123 def __init__(self, txn_id=None, host=None, port=None):
124 self._prepare(txn_id=txn_id, host=host, port=port)
124 self._prepare(txn_id=txn_id, host=host, port=port)
125
125
126 def __enter__(self):
126 def __enter__(self):
127 log.debug('Running `%s` callback daemon', self.__class__.__name__)
127 log.debug('Running `%s` callback daemon', self.__class__.__name__)
128 self._run()
128 self._run()
129 return self
129 return self
130
130
131 def __exit__(self, exc_type, exc_val, exc_tb):
131 def __exit__(self, exc_type, exc_val, exc_tb):
132 log.debug('Exiting `%s` callback daemon', self.__class__.__name__)
132 log.debug('Exiting `%s` callback daemon', self.__class__.__name__)
133 self._stop()
133 self._stop()
134
134
135 def _prepare(self, txn_id=None, host=None, port=None):
135 def _prepare(self, txn_id=None, host=None, port=None):
136 raise NotImplementedError()
136 raise NotImplementedError()
137
137
138 def _run(self):
138 def _run(self):
139 raise NotImplementedError()
139 raise NotImplementedError()
140
140
141 def _stop(self):
141 def _stop(self):
142 raise NotImplementedError()
142 raise NotImplementedError()
143
143
144
144
145 class HttpHooksCallbackDaemon(ThreadedHookCallbackDaemon):
145 class HttpHooksCallbackDaemon(ThreadedHookCallbackDaemon):
146 """
146 """
147 Context manager which will run a callback daemon in a background thread.
147 Context manager which will run a callback daemon in a background thread.
148 """
148 """
149
149
150 hooks_uri = None
150 hooks_uri = None
151
151
152 # From Python docs: Polling reduces our responsiveness to a shutdown
152 # From Python docs: Polling reduces our responsiveness to a shutdown
153 # request and wastes cpu at all other times.
153 # request and wastes cpu at all other times.
154 POLL_INTERVAL = 0.01
154 POLL_INTERVAL = 0.01
155
155
156 def get_available_port():
156 def get_available_port(self):
157 family = socket.AF_INET
157 mysocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
158 socktype = socket.SOCK_STREAM
158 mysocket.bind(('127.0.0.1', 0))
159 host = '127.0.0.1'
160
161 mysocket = socket.socket(family, socktype)
162 mysocket.bind((host, 0))
163 port = mysocket.getsockname()[1]
159 port = mysocket.getsockname()[1]
164 mysocket.close()
160 mysocket.close()
165 del mysocket
161 del mysocket
166 return port
162 return port
167
163
168 def _prepare(self, txn_id=None, host=None, port=None):
164 def _prepare(self, txn_id=None, host=None, port=None):
169
165
170 host = host or '127.0.0.1'
166 host = host or '127.0.0.1'
171 port = port or self.get_available_port()
167 port = port or self.get_available_port()
172 server_address = (host, port)
168 server_address = (host, port)
173 self.hooks_uri = '{}:{}'.format(host, port)
169 self.hooks_uri = '{}:{}'.format(host, port)
174 self.txn_id = txn_id
170 self.txn_id = txn_id
175 self._done = False
171 self._done = False
176
172
177 log.debug(
173 log.debug(
178 "Preparing HTTP callback daemon at `%s` and registering hook object: %s",
174 "Preparing HTTP callback daemon at `%s` and registering hook object: %s",
179 self.hooks_uri, HooksHttpHandler)
175 self.hooks_uri, HooksHttpHandler)
180
176
181 self._daemon = TCPServer(server_address, HooksHttpHandler)
177 self._daemon = TCPServer(server_address, HooksHttpHandler)
182 # inject transaction_id for later verification
178 # inject transaction_id for later verification
183 self._daemon.txn_id = self.txn_id
179 self._daemon.txn_id = self.txn_id
184
180
185 def _run(self):
181 def _run(self):
186 log.debug("Running event loop of callback daemon in background thread")
182 log.debug("Running event loop of callback daemon in background thread")
187 callback_thread = threading.Thread(
183 callback_thread = threading.Thread(
188 target=self._daemon.serve_forever,
184 target=self._daemon.serve_forever,
189 kwargs={'poll_interval': self.POLL_INTERVAL})
185 kwargs={'poll_interval': self.POLL_INTERVAL})
190 callback_thread.daemon = True
186 callback_thread.daemon = True
191 callback_thread.start()
187 callback_thread.start()
192 self._callback_thread = callback_thread
188 self._callback_thread = callback_thread
193
189
194 def _stop(self):
190 def _stop(self):
195 log.debug("Waiting for background thread to finish.")
191 log.debug("Waiting for background thread to finish.")
196 self._daemon.shutdown()
192 self._daemon.shutdown()
197 self._callback_thread.join()
193 self._callback_thread.join()
198 self._daemon = None
194 self._daemon = None
199 self._callback_thread = None
195 self._callback_thread = None
200 if self.txn_id:
196 if self.txn_id:
201 txn_id_file = get_txn_id_data_path(self.txn_id)
197 txn_id_file = get_txn_id_data_path(self.txn_id)
202 log.debug('Cleaning up TXN ID %s', txn_id_file)
198 log.debug('Cleaning up TXN ID %s', txn_id_file)
203 if os.path.isfile(txn_id_file):
199 if os.path.isfile(txn_id_file):
204 os.remove(txn_id_file)
200 os.remove(txn_id_file)
205
201
206 log.debug("Background thread done.")
202 log.debug("Background thread done.")
207
203
208
204
209 def get_txn_id_data_path(txn_id):
205 def get_txn_id_data_path(txn_id):
210 import rhodecode
206 import rhodecode
211
207
212 root = rhodecode.CONFIG.get('cache_dir') or tempfile.gettempdir()
208 root = rhodecode.CONFIG.get('cache_dir') or tempfile.gettempdir()
213 final_dir = os.path.join(root, 'svn_txn_id')
209 final_dir = os.path.join(root, 'svn_txn_id')
214
210
215 if not os.path.isdir(final_dir):
211 if not os.path.isdir(final_dir):
216 os.makedirs(final_dir)
212 os.makedirs(final_dir)
217 return os.path.join(final_dir, 'rc_txn_id_{}'.format(txn_id))
213 return os.path.join(final_dir, 'rc_txn_id_{}'.format(txn_id))
218
214
219
215
220 def store_txn_id_data(txn_id, data_dict):
216 def store_txn_id_data(txn_id, data_dict):
221 if not txn_id:
217 if not txn_id:
222 log.warning('Cannot store txn_id because it is empty')
218 log.warning('Cannot store txn_id because it is empty')
223 return
219 return
224
220
225 path = get_txn_id_data_path(txn_id)
221 path = get_txn_id_data_path(txn_id)
226 try:
222 try:
227 with open(path, 'wb') as f:
223 with open(path, 'wb') as f:
228 f.write(json.dumps(data_dict))
224 f.write(json.dumps(data_dict))
229 except Exception:
225 except Exception:
230 log.exception('Failed to write txn_id metadata')
226 log.exception('Failed to write txn_id metadata')
231
227
232
228
233 def get_txn_id_from_store(txn_id):
229 def get_txn_id_from_store(txn_id):
234 """
230 """
235 Reads txn_id from store and if present returns the data for callback manager
231 Reads txn_id from store and if present returns the data for callback manager
236 """
232 """
237 path = get_txn_id_data_path(txn_id)
233 path = get_txn_id_data_path(txn_id)
238 try:
234 try:
239 with open(path, 'rb') as f:
235 with open(path, 'rb') as f:
240 return json.loads(f.read())
236 return json.loads(f.read())
241 except Exception:
237 except Exception:
242 return {}
238 return {}
243
239
244
240
245 def prepare_callback_daemon(extras, protocol, host, use_direct_calls, txn_id=None):
241 def prepare_callback_daemon(extras, protocol, host, use_direct_calls, txn_id=None):
246 txn_details = get_txn_id_from_store(txn_id)
242 txn_details = get_txn_id_from_store(txn_id)
247 port = txn_details.get('port', 0)
243 port = txn_details.get('port', 0)
248 if use_direct_calls:
244 if use_direct_calls:
249 callback_daemon = DummyHooksCallbackDaemon()
245 callback_daemon = DummyHooksCallbackDaemon()
250 extras['hooks_module'] = callback_daemon.hooks_module
246 extras['hooks_module'] = callback_daemon.hooks_module
251 else:
247 else:
252 if protocol == 'http':
248 if protocol == 'http':
253 callback_daemon = HttpHooksCallbackDaemon(
249 callback_daemon = HttpHooksCallbackDaemon(
254 txn_id=txn_id, host=host, port=port)
250 txn_id=txn_id, host=host, port=port)
255 else:
251 else:
256 log.error('Unsupported callback daemon protocol "%s"', protocol)
252 log.error('Unsupported callback daemon protocol "%s"', protocol)
257 raise Exception('Unsupported callback daemon protocol.')
253 raise Exception('Unsupported callback daemon protocol.')
258
254
259 extras['hooks_uri'] = callback_daemon.hooks_uri
255 extras['hooks_uri'] = callback_daemon.hooks_uri
260 extras['hooks_protocol'] = protocol
256 extras['hooks_protocol'] = protocol
261 extras['time'] = time.time()
257 extras['time'] = time.time()
262
258
263 # register txn_id
259 # register txn_id
264 extras['txn_id'] = txn_id
260 extras['txn_id'] = txn_id
265 log.debug('Prepared a callback daemon: %s at url `%s`',
261 log.debug('Prepared a callback daemon: %s at url `%s`',
266 callback_daemon.__class__.__name__, callback_daemon.hooks_uri)
262 callback_daemon.__class__.__name__, callback_daemon.hooks_uri)
267 return callback_daemon, extras
263 return callback_daemon, extras
268
264
269
265
270 class Hooks(object):
266 class Hooks(object):
271 """
267 """
272 Exposes the hooks for remote call backs
268 Exposes the hooks for remote call backs
273 """
269 """
274
270
275 def repo_size(self, extras):
271 def repo_size(self, extras):
276 log.debug("Called repo_size of %s object", self)
272 log.debug("Called repo_size of %s object", self)
277 return self._call_hook(hooks_base.repo_size, extras)
273 return self._call_hook(hooks_base.repo_size, extras)
278
274
279 def pre_pull(self, extras):
275 def pre_pull(self, extras):
280 log.debug("Called pre_pull of %s object", self)
276 log.debug("Called pre_pull of %s object", self)
281 return self._call_hook(hooks_base.pre_pull, extras)
277 return self._call_hook(hooks_base.pre_pull, extras)
282
278
283 def post_pull(self, extras):
279 def post_pull(self, extras):
284 log.debug("Called post_pull of %s object", self)
280 log.debug("Called post_pull of %s object", self)
285 return self._call_hook(hooks_base.post_pull, extras)
281 return self._call_hook(hooks_base.post_pull, extras)
286
282
287 def pre_push(self, extras):
283 def pre_push(self, extras):
288 log.debug("Called pre_push of %s object", self)
284 log.debug("Called pre_push of %s object", self)
289 return self._call_hook(hooks_base.pre_push, extras)
285 return self._call_hook(hooks_base.pre_push, extras)
290
286
291 def post_push(self, extras):
287 def post_push(self, extras):
292 log.debug("Called post_push of %s object", self)
288 log.debug("Called post_push of %s object", self)
293 return self._call_hook(hooks_base.post_push, extras)
289 return self._call_hook(hooks_base.post_push, extras)
294
290
295 def _call_hook(self, hook, extras):
291 def _call_hook(self, hook, extras):
296 extras = AttributeDict(extras)
292 extras = AttributeDict(extras)
297 server_url = extras['server_url']
293 server_url = extras['server_url']
298 request = bootstrap_request(application_url=server_url)
294 request = bootstrap_request(application_url=server_url)
299
295
300 bootstrap_config(request) # inject routes and other interfaces
296 bootstrap_config(request) # inject routes and other interfaces
301
297
302 # inject the user for usage in hooks
298 # inject the user for usage in hooks
303 request.user = AttributeDict({'username': extras.username,
299 request.user = AttributeDict({'username': extras.username,
304 'ip_addr': extras.ip,
300 'ip_addr': extras.ip,
305 'user_id': extras.user_id})
301 'user_id': extras.user_id})
306
302
307 extras.request = request
303 extras.request = request
308
304
309 try:
305 try:
310 result = hook(extras)
306 result = hook(extras)
311 if result is None:
307 if result is None:
312 raise Exception(
308 raise Exception(
313 'Failed to obtain hook result from func: {}'.format(hook))
309 'Failed to obtain hook result from func: {}'.format(hook))
314 except HTTPBranchProtected as handled_error:
310 except HTTPBranchProtected as handled_error:
315 # Those special cases doesn't need error reporting. It's a case of
311 # Those special cases doesn't need error reporting. It's a case of
316 # locked repo or protected branch
312 # locked repo or protected branch
317 result = AttributeDict({
313 result = AttributeDict({
318 'status': handled_error.code,
314 'status': handled_error.code,
319 'output': handled_error.explanation
315 'output': handled_error.explanation
320 })
316 })
321 except (HTTPLockedRC, Exception) as error:
317 except (HTTPLockedRC, Exception) as error:
322 # locked needs different handling since we need to also
318 # locked needs different handling since we need to also
323 # handle PULL operations
319 # handle PULL operations
324 exc_tb = ''
320 exc_tb = ''
325 if not isinstance(error, HTTPLockedRC):
321 if not isinstance(error, HTTPLockedRC):
326 exc_tb = traceback.format_exc()
322 exc_tb = traceback.format_exc()
327 log.exception('Exception when handling hook %s', hook)
323 log.exception('Exception when handling hook %s', hook)
328 error_args = error.args
324 error_args = error.args
329 return {
325 return {
330 'status': 128,
326 'status': 128,
331 'output': '',
327 'output': '',
332 'exception': type(error).__name__,
328 'exception': type(error).__name__,
333 'exception_traceback': exc_tb,
329 'exception_traceback': exc_tb,
334 'exception_args': error_args,
330 'exception_args': error_args,
335 }
331 }
336 finally:
332 finally:
337 meta.Session.remove()
333 meta.Session.remove()
338
334
339 log.debug('Got hook call response %s', result)
335 log.debug('Got hook call response %s', result)
340 return {
336 return {
341 'status': result.status,
337 'status': result.status,
342 'output': result.output,
338 'output': result.output,
343 }
339 }
344
340
345 def __enter__(self):
341 def __enter__(self):
346 return self
342 return self
347
343
348 def __exit__(self, exc_type, exc_val, exc_tb):
344 def __exit__(self, exc_type, exc_val, exc_tb):
349 pass
345 pass
General Comments 0
You need to be logged in to leave comments. Login now