##// END OF EJS Templates
hooks: fix logging info on callback daemon
super-admin -
r4854:a2853d3d default
parent child Browse files
Show More
@@ -1,333 +1,334 b''
1 # -*- coding: utf-8 -*-
1 # -*- coding: utf-8 -*-
2
2
3 # Copyright (C) 2010-2020 RhodeCode GmbH
3 # Copyright (C) 2010-2020 RhodeCode GmbH
4 #
4 #
5 # This program is free software: you can redistribute it and/or modify
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU Affero General Public License, version 3
6 # it under the terms of the GNU Affero General Public License, version 3
7 # (only), as published by the Free Software Foundation.
7 # (only), as published by the Free Software Foundation.
8 #
8 #
9 # This program is distributed in the hope that it will be useful,
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
12 # GNU General Public License for more details.
13 #
13 #
14 # You should have received a copy of the GNU Affero General Public License
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 #
16 #
17 # This program is dual-licensed. If you wish to learn more about the
17 # This program is dual-licensed. If you wish to learn more about the
18 # RhodeCode Enterprise Edition, including its added features, Support services,
18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20
20
21 import os
21 import os
22 import time
22 import time
23 import logging
23 import logging
24 import tempfile
24 import tempfile
25 import traceback
25 import traceback
26 import threading
26 import threading
27
27
28 from BaseHTTPServer import BaseHTTPRequestHandler
28 from BaseHTTPServer import BaseHTTPRequestHandler
29 from SocketServer import TCPServer
29 from SocketServer import TCPServer
30
30
31 import rhodecode
31 import rhodecode
32 from rhodecode.lib.exceptions import HTTPLockedRC, HTTPBranchProtected
32 from rhodecode.lib.exceptions import HTTPLockedRC, HTTPBranchProtected
33 from rhodecode.model import meta
33 from rhodecode.model import meta
34 from rhodecode.lib.base import bootstrap_request, bootstrap_config
34 from rhodecode.lib.base import bootstrap_request, bootstrap_config
35 from rhodecode.lib import hooks_base
35 from rhodecode.lib import hooks_base
36 from rhodecode.lib.utils2 import AttributeDict
36 from rhodecode.lib.utils2 import AttributeDict
37 from rhodecode.lib.ext_json import json
37 from rhodecode.lib.ext_json import json
38 from rhodecode.lib import rc_cache
38 from rhodecode.lib import rc_cache
39
39
40 log = logging.getLogger(__name__)
40 log = logging.getLogger(__name__)
41
41
42
42
43 class HooksHttpHandler(BaseHTTPRequestHandler):
43 class HooksHttpHandler(BaseHTTPRequestHandler):
44
44
45 def do_POST(self):
45 def do_POST(self):
46 method, extras = self._read_request()
46 method, extras = self._read_request()
47 txn_id = getattr(self.server, 'txn_id', None)
47 txn_id = getattr(self.server, 'txn_id', None)
48 if txn_id:
48 if txn_id:
49 log.debug('Computing TXN_ID based on `%s`:`%s`',
49 log.debug('Computing TXN_ID based on `%s`:`%s`',
50 extras['repository'], extras['txn_id'])
50 extras['repository'], extras['txn_id'])
51 computed_txn_id = rc_cache.utils.compute_key_from_params(
51 computed_txn_id = rc_cache.utils.compute_key_from_params(
52 extras['repository'], extras['txn_id'])
52 extras['repository'], extras['txn_id'])
53 if txn_id != computed_txn_id:
53 if txn_id != computed_txn_id:
54 raise Exception(
54 raise Exception(
55 'TXN ID fail: expected {} got {} instead'.format(
55 'TXN ID fail: expected {} got {} instead'.format(
56 txn_id, computed_txn_id))
56 txn_id, computed_txn_id))
57
57
58 try:
58 try:
59 result = self._call_hook(method, extras)
59 result = self._call_hook(method, extras)
60 except Exception as e:
60 except Exception as e:
61 exc_tb = traceback.format_exc()
61 exc_tb = traceback.format_exc()
62 result = {
62 result = {
63 'exception': e.__class__.__name__,
63 'exception': e.__class__.__name__,
64 'exception_traceback': exc_tb,
64 'exception_traceback': exc_tb,
65 'exception_args': e.args
65 'exception_args': e.args
66 }
66 }
67 self._write_response(result)
67 self._write_response(result)
68
68
69 def _read_request(self):
69 def _read_request(self):
70 length = int(self.headers['Content-Length'])
70 length = int(self.headers['Content-Length'])
71 body = self.rfile.read(length).decode('utf-8')
71 body = self.rfile.read(length).decode('utf-8')
72 data = json.loads(body)
72 data = json.loads(body)
73 return data['method'], data['extras']
73 return data['method'], data['extras']
74
74
75 def _write_response(self, result):
75 def _write_response(self, result):
76 self.send_response(200)
76 self.send_response(200)
77 self.send_header("Content-type", "text/json")
77 self.send_header("Content-type", "text/json")
78 self.end_headers()
78 self.end_headers()
79 self.wfile.write(json.dumps(result))
79 self.wfile.write(json.dumps(result))
80
80
81 def _call_hook(self, method, extras):
81 def _call_hook(self, method, extras):
82 hooks = Hooks()
82 hooks = Hooks()
83 try:
83 try:
84 result = getattr(hooks, method)(extras)
84 result = getattr(hooks, method)(extras)
85 finally:
85 finally:
86 meta.Session.remove()
86 meta.Session.remove()
87 return result
87 return result
88
88
89 def log_message(self, format, *args):
89 def log_message(self, format, *args):
90 """
90 """
91 This is an overridden method of BaseHTTPRequestHandler which logs using
91 This is an overridden method of BaseHTTPRequestHandler which logs using
92 logging library instead of writing directly to stderr.
92 logging library instead of writing directly to stderr.
93 """
93 """
94
94
95 message = format % args
95 message = format % args
96
96
97 log.debug(
97 log.debug(
98 "%s - - [%s] %s", self.client_address[0],
98 "%s - - [%s] %s", self.client_address[0],
99 self.log_date_time_string(), message)
99 self.log_date_time_string(), message)
100
100
101
101
102 class DummyHooksCallbackDaemon(object):
102 class DummyHooksCallbackDaemon(object):
103 hooks_uri = ''
103 hooks_uri = ''
104
104
105 def __init__(self):
105 def __init__(self):
106 self.hooks_module = Hooks.__module__
106 self.hooks_module = Hooks.__module__
107
107
108 def __enter__(self):
108 def __enter__(self):
109 log.debug('Running `%s` callback daemon', self.__class__.__name__)
109 log.debug('Running `%s` callback daemon', self.__class__.__name__)
110 return self
110 return self
111
111
112 def __exit__(self, exc_type, exc_val, exc_tb):
112 def __exit__(self, exc_type, exc_val, exc_tb):
113 log.debug('Exiting `%s` callback daemon', self.__class__.__name__)
113 log.debug('Exiting `%s` callback daemon', self.__class__.__name__)
114
114
115
115
116 class ThreadedHookCallbackDaemon(object):
116 class ThreadedHookCallbackDaemon(object):
117
117
118 _callback_thread = None
118 _callback_thread = None
119 _daemon = None
119 _daemon = None
120 _done = False
120 _done = False
121
121
122 def __init__(self, txn_id=None, host=None, port=None):
122 def __init__(self, txn_id=None, host=None, port=None):
123 self._prepare(txn_id=txn_id, host=host, port=port)
123 self._prepare(txn_id=txn_id, host=host, port=port)
124
124
125 def __enter__(self):
125 def __enter__(self):
126 log.debug('Running `%s` callback daemon', self.__class__.__name__)
126 log.debug('Running `%s` callback daemon', self.__class__.__name__)
127 self._run()
127 self._run()
128 return self
128 return self
129
129
130 def __exit__(self, exc_type, exc_val, exc_tb):
130 def __exit__(self, exc_type, exc_val, exc_tb):
131 log.debug('Exiting `%s` callback daemon', self.__class__.__name__)
131 log.debug('Exiting `%s` callback daemon', self.__class__.__name__)
132 self._stop()
132 self._stop()
133
133
134 def _prepare(self, txn_id=None, host=None, port=None):
134 def _prepare(self, txn_id=None, host=None, port=None):
135 raise NotImplementedError()
135 raise NotImplementedError()
136
136
137 def _run(self):
137 def _run(self):
138 raise NotImplementedError()
138 raise NotImplementedError()
139
139
140 def _stop(self):
140 def _stop(self):
141 raise NotImplementedError()
141 raise NotImplementedError()
142
142
143
143
144 class HttpHooksCallbackDaemon(ThreadedHookCallbackDaemon):
144 class HttpHooksCallbackDaemon(ThreadedHookCallbackDaemon):
145 """
145 """
146 Context manager which will run a callback daemon in a background thread.
146 Context manager which will run a callback daemon in a background thread.
147 """
147 """
148
148
149 hooks_uri = None
149 hooks_uri = None
150
150
151 # From Python docs: Polling reduces our responsiveness to a shutdown
151 # From Python docs: Polling reduces our responsiveness to a shutdown
152 # request and wastes cpu at all other times.
152 # request and wastes cpu at all other times.
153 POLL_INTERVAL = 0.01
153 POLL_INTERVAL = 0.01
154
154
155 def _prepare(self, txn_id=None, host=None, port=None):
155 def _prepare(self, txn_id=None, host=None, port=None):
156
156 host = host or '127.0.0.1'
157 host = host or '127.0.0.1'
157 self._done = False
158 self._done = False
158 self._daemon = TCPServer((host, port or 0), HooksHttpHandler)
159 _, port = self._daemon.server_address
160 self.hooks_uri = '{}:{}'.format(host, port)
159 self.hooks_uri = '{}:{}'.format(host, port)
161 self.txn_id = txn_id
160 self.txn_id = txn_id
162 # inject transaction_id for later verification
161 # inject transaction_id for later verification
163 self._daemon.txn_id = self.txn_id
162 self._daemon.txn_id = self.txn_id
164
165 log.debug(
163 log.debug(
166 "Preparing HTTP callback daemon at `%s` and registering hook object",
164 "Preparing HTTP callback daemon at `%s` and registering hook object",
167 self.hooks_uri)
165 self.hooks_uri)
168
166
167 self._daemon = TCPServer((host, port or 0), HooksHttpHandler)
168 _, port = self._daemon.server_address
169
169 def _run(self):
170 def _run(self):
170 log.debug("Running event loop of callback daemon in background thread")
171 log.debug("Running event loop of callback daemon in background thread")
171 callback_thread = threading.Thread(
172 callback_thread = threading.Thread(
172 target=self._daemon.serve_forever,
173 target=self._daemon.serve_forever,
173 kwargs={'poll_interval': self.POLL_INTERVAL})
174 kwargs={'poll_interval': self.POLL_INTERVAL})
174 callback_thread.daemon = True
175 callback_thread.daemon = True
175 callback_thread.start()
176 callback_thread.start()
176 self._callback_thread = callback_thread
177 self._callback_thread = callback_thread
177
178
178 def _stop(self):
179 def _stop(self):
179 log.debug("Waiting for background thread to finish.")
180 log.debug("Waiting for background thread to finish.")
180 self._daemon.shutdown()
181 self._daemon.shutdown()
181 self._callback_thread.join()
182 self._callback_thread.join()
182 self._daemon = None
183 self._daemon = None
183 self._callback_thread = None
184 self._callback_thread = None
184 if self.txn_id:
185 if self.txn_id:
185 txn_id_file = get_txn_id_data_path(self.txn_id)
186 txn_id_file = get_txn_id_data_path(self.txn_id)
186 log.debug('Cleaning up TXN ID %s', txn_id_file)
187 log.debug('Cleaning up TXN ID %s', txn_id_file)
187 if os.path.isfile(txn_id_file):
188 if os.path.isfile(txn_id_file):
188 os.remove(txn_id_file)
189 os.remove(txn_id_file)
189
190
190 log.debug("Background thread done.")
191 log.debug("Background thread done.")
191
192
192
193
193 def get_txn_id_data_path(txn_id):
194 def get_txn_id_data_path(txn_id):
194 import rhodecode
195 import rhodecode
195
196
196 root = rhodecode.CONFIG.get('cache_dir') or tempfile.gettempdir()
197 root = rhodecode.CONFIG.get('cache_dir') or tempfile.gettempdir()
197 final_dir = os.path.join(root, 'svn_txn_id')
198 final_dir = os.path.join(root, 'svn_txn_id')
198
199
199 if not os.path.isdir(final_dir):
200 if not os.path.isdir(final_dir):
200 os.makedirs(final_dir)
201 os.makedirs(final_dir)
201 return os.path.join(final_dir, 'rc_txn_id_{}'.format(txn_id))
202 return os.path.join(final_dir, 'rc_txn_id_{}'.format(txn_id))
202
203
203
204
204 def store_txn_id_data(txn_id, data_dict):
205 def store_txn_id_data(txn_id, data_dict):
205 if not txn_id:
206 if not txn_id:
206 log.warning('Cannot store txn_id because it is empty')
207 log.warning('Cannot store txn_id because it is empty')
207 return
208 return
208
209
209 path = get_txn_id_data_path(txn_id)
210 path = get_txn_id_data_path(txn_id)
210 try:
211 try:
211 with open(path, 'wb') as f:
212 with open(path, 'wb') as f:
212 f.write(json.dumps(data_dict))
213 f.write(json.dumps(data_dict))
213 except Exception:
214 except Exception:
214 log.exception('Failed to write txn_id metadata')
215 log.exception('Failed to write txn_id metadata')
215
216
216
217
217 def get_txn_id_from_store(txn_id):
218 def get_txn_id_from_store(txn_id):
218 """
219 """
219 Reads txn_id from store and if present returns the data for callback manager
220 Reads txn_id from store and if present returns the data for callback manager
220 """
221 """
221 path = get_txn_id_data_path(txn_id)
222 path = get_txn_id_data_path(txn_id)
222 try:
223 try:
223 with open(path, 'rb') as f:
224 with open(path, 'rb') as f:
224 return json.loads(f.read())
225 return json.loads(f.read())
225 except Exception:
226 except Exception:
226 return {}
227 return {}
227
228
228
229
229 def prepare_callback_daemon(extras, protocol, host, use_direct_calls, txn_id=None):
230 def prepare_callback_daemon(extras, protocol, host, use_direct_calls, txn_id=None):
230 txn_details = get_txn_id_from_store(txn_id)
231 txn_details = get_txn_id_from_store(txn_id)
231 port = txn_details.get('port', 0)
232 port = txn_details.get('port', 0)
232 if use_direct_calls:
233 if use_direct_calls:
233 callback_daemon = DummyHooksCallbackDaemon()
234 callback_daemon = DummyHooksCallbackDaemon()
234 extras['hooks_module'] = callback_daemon.hooks_module
235 extras['hooks_module'] = callback_daemon.hooks_module
235 else:
236 else:
236 if protocol == 'http':
237 if protocol == 'http':
237 callback_daemon = HttpHooksCallbackDaemon(
238 callback_daemon = HttpHooksCallbackDaemon(
238 txn_id=txn_id, host=host, port=port)
239 txn_id=txn_id, host=host, port=port)
239 else:
240 else:
240 log.error('Unsupported callback daemon protocol "%s"', protocol)
241 log.error('Unsupported callback daemon protocol "%s"', protocol)
241 raise Exception('Unsupported callback daemon protocol.')
242 raise Exception('Unsupported callback daemon protocol.')
242
243
243 extras['hooks_uri'] = callback_daemon.hooks_uri
244 extras['hooks_uri'] = callback_daemon.hooks_uri
244 extras['hooks_protocol'] = protocol
245 extras['hooks_protocol'] = protocol
245 extras['time'] = time.time()
246 extras['time'] = time.time()
246
247
247 # register txn_id
248 # register txn_id
248 extras['txn_id'] = txn_id
249 extras['txn_id'] = txn_id
249 log.debug('Prepared a callback daemon: %s at url `%s`',
250 log.debug('Prepared a callback daemon: %s at url `%s`',
250 callback_daemon.__class__.__name__, callback_daemon.hooks_uri)
251 callback_daemon.__class__.__name__, callback_daemon.hooks_uri)
251 return callback_daemon, extras
252 return callback_daemon, extras
252
253
253
254
254 class Hooks(object):
255 class Hooks(object):
255 """
256 """
256 Exposes the hooks for remote call backs
257 Exposes the hooks for remote call backs
257 """
258 """
258
259
259 def repo_size(self, extras):
260 def repo_size(self, extras):
260 log.debug("Called repo_size of %s object", self)
261 log.debug("Called repo_size of %s object", self)
261 return self._call_hook(hooks_base.repo_size, extras)
262 return self._call_hook(hooks_base.repo_size, extras)
262
263
263 def pre_pull(self, extras):
264 def pre_pull(self, extras):
264 log.debug("Called pre_pull of %s object", self)
265 log.debug("Called pre_pull of %s object", self)
265 return self._call_hook(hooks_base.pre_pull, extras)
266 return self._call_hook(hooks_base.pre_pull, extras)
266
267
267 def post_pull(self, extras):
268 def post_pull(self, extras):
268 log.debug("Called post_pull of %s object", self)
269 log.debug("Called post_pull of %s object", self)
269 return self._call_hook(hooks_base.post_pull, extras)
270 return self._call_hook(hooks_base.post_pull, extras)
270
271
271 def pre_push(self, extras):
272 def pre_push(self, extras):
272 log.debug("Called pre_push of %s object", self)
273 log.debug("Called pre_push of %s object", self)
273 return self._call_hook(hooks_base.pre_push, extras)
274 return self._call_hook(hooks_base.pre_push, extras)
274
275
275 def post_push(self, extras):
276 def post_push(self, extras):
276 log.debug("Called post_push of %s object", self)
277 log.debug("Called post_push of %s object", self)
277 return self._call_hook(hooks_base.post_push, extras)
278 return self._call_hook(hooks_base.post_push, extras)
278
279
279 def _call_hook(self, hook, extras):
280 def _call_hook(self, hook, extras):
280 extras = AttributeDict(extras)
281 extras = AttributeDict(extras)
281 server_url = extras['server_url']
282 server_url = extras['server_url']
282 request = bootstrap_request(application_url=server_url)
283 request = bootstrap_request(application_url=server_url)
283
284
284 bootstrap_config(request) # inject routes and other interfaces
285 bootstrap_config(request) # inject routes and other interfaces
285
286
286 # inject the user for usage in hooks
287 # inject the user for usage in hooks
287 request.user = AttributeDict({'username': extras.username,
288 request.user = AttributeDict({'username': extras.username,
288 'ip_addr': extras.ip,
289 'ip_addr': extras.ip,
289 'user_id': extras.user_id})
290 'user_id': extras.user_id})
290
291
291 extras.request = request
292 extras.request = request
292
293
293 try:
294 try:
294 result = hook(extras)
295 result = hook(extras)
295 if result is None:
296 if result is None:
296 raise Exception(
297 raise Exception(
297 'Failed to obtain hook result from func: {}'.format(hook))
298 'Failed to obtain hook result from func: {}'.format(hook))
298 except HTTPBranchProtected as handled_error:
299 except HTTPBranchProtected as handled_error:
299 # Those special cases doesn't need error reporting. It's a case of
300 # Those special cases doesn't need error reporting. It's a case of
300 # locked repo or protected branch
301 # locked repo or protected branch
301 result = AttributeDict({
302 result = AttributeDict({
302 'status': handled_error.code,
303 'status': handled_error.code,
303 'output': handled_error.explanation
304 'output': handled_error.explanation
304 })
305 })
305 except (HTTPLockedRC, Exception) as error:
306 except (HTTPLockedRC, Exception) as error:
306 # locked needs different handling since we need to also
307 # locked needs different handling since we need to also
307 # handle PULL operations
308 # handle PULL operations
308 exc_tb = ''
309 exc_tb = ''
309 if not isinstance(error, HTTPLockedRC):
310 if not isinstance(error, HTTPLockedRC):
310 exc_tb = traceback.format_exc()
311 exc_tb = traceback.format_exc()
311 log.exception('Exception when handling hook %s', hook)
312 log.exception('Exception when handling hook %s', hook)
312 error_args = error.args
313 error_args = error.args
313 return {
314 return {
314 'status': 128,
315 'status': 128,
315 'output': '',
316 'output': '',
316 'exception': type(error).__name__,
317 'exception': type(error).__name__,
317 'exception_traceback': exc_tb,
318 'exception_traceback': exc_tb,
318 'exception_args': error_args,
319 'exception_args': error_args,
319 }
320 }
320 finally:
321 finally:
321 meta.Session.remove()
322 meta.Session.remove()
322
323
323 log.debug('Got hook call response %s', result)
324 log.debug('Got hook call response %s', result)
324 return {
325 return {
325 'status': result.status,
326 'status': result.status,
326 'output': result.output,
327 'output': result.output,
327 }
328 }
328
329
329 def __enter__(self):
330 def __enter__(self):
330 return self
331 return self
331
332
332 def __exit__(self, exc_type, exc_val, exc_tb):
333 def __exit__(self, exc_type, exc_val, exc_tb):
333 pass
334 pass
General Comments 0
You need to be logged in to leave comments. Login now