##// END OF EJS Templates
hooks-daemon: fixed problem with lost hooks value from .ini file.
milka -
r4619:5dcbb0fe default
parent child Browse files
Show More
@@ -1,333 +1,333 b''
1 # -*- coding: utf-8 -*-
1 # -*- coding: utf-8 -*-
2
2
3 # Copyright (C) 2010-2020 RhodeCode GmbH
3 # Copyright (C) 2010-2020 RhodeCode GmbH
4 #
4 #
5 # This program is free software: you can redistribute it and/or modify
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU Affero General Public License, version 3
6 # it under the terms of the GNU Affero General Public License, version 3
7 # (only), as published by the Free Software Foundation.
7 # (only), as published by the Free Software Foundation.
8 #
8 #
9 # This program is distributed in the hope that it will be useful,
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
12 # GNU General Public License for more details.
13 #
13 #
14 # You should have received a copy of the GNU Affero General Public License
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 #
16 #
17 # This program is dual-licensed. If you wish to learn more about the
17 # This program is dual-licensed. If you wish to learn more about the
18 # RhodeCode Enterprise Edition, including its added features, Support services,
18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20
20
21 import os
21 import os
22 import time
22 import time
23 import logging
23 import logging
24 import tempfile
24 import tempfile
25 import traceback
25 import traceback
26 import threading
26 import threading
27
27
28 from BaseHTTPServer import BaseHTTPRequestHandler
28 from BaseHTTPServer import BaseHTTPRequestHandler
29 from SocketServer import TCPServer
29 from SocketServer import TCPServer
30
30
31 import rhodecode
31 import rhodecode
32 from rhodecode.lib.exceptions import HTTPLockedRC, HTTPBranchProtected
32 from rhodecode.lib.exceptions import HTTPLockedRC, HTTPBranchProtected
33 from rhodecode.model import meta
33 from rhodecode.model import meta
34 from rhodecode.lib.base import bootstrap_request, bootstrap_config
34 from rhodecode.lib.base import bootstrap_request, bootstrap_config
35 from rhodecode.lib import hooks_base
35 from rhodecode.lib import hooks_base
36 from rhodecode.lib.utils2 import AttributeDict
36 from rhodecode.lib.utils2 import AttributeDict
37 from rhodecode.lib.ext_json import json
37 from rhodecode.lib.ext_json import json
38 from rhodecode.lib import rc_cache
38 from rhodecode.lib import rc_cache
39
39
40 log = logging.getLogger(__name__)
40 log = logging.getLogger(__name__)
41
41
42
42
43 class HooksHttpHandler(BaseHTTPRequestHandler):
43 class HooksHttpHandler(BaseHTTPRequestHandler):
44
44
45 def do_POST(self):
45 def do_POST(self):
46 method, extras = self._read_request()
46 method, extras = self._read_request()
47 txn_id = getattr(self.server, 'txn_id', None)
47 txn_id = getattr(self.server, 'txn_id', None)
48 if txn_id:
48 if txn_id:
49 log.debug('Computing TXN_ID based on `%s`:`%s`',
49 log.debug('Computing TXN_ID based on `%s`:`%s`',
50 extras['repository'], extras['txn_id'])
50 extras['repository'], extras['txn_id'])
51 computed_txn_id = rc_cache.utils.compute_key_from_params(
51 computed_txn_id = rc_cache.utils.compute_key_from_params(
52 extras['repository'], extras['txn_id'])
52 extras['repository'], extras['txn_id'])
53 if txn_id != computed_txn_id:
53 if txn_id != computed_txn_id:
54 raise Exception(
54 raise Exception(
55 'TXN ID fail: expected {} got {} instead'.format(
55 'TXN ID fail: expected {} got {} instead'.format(
56 txn_id, computed_txn_id))
56 txn_id, computed_txn_id))
57
57
58 try:
58 try:
59 result = self._call_hook(method, extras)
59 result = self._call_hook(method, extras)
60 except Exception as e:
60 except Exception as e:
61 exc_tb = traceback.format_exc()
61 exc_tb = traceback.format_exc()
62 result = {
62 result = {
63 'exception': e.__class__.__name__,
63 'exception': e.__class__.__name__,
64 'exception_traceback': exc_tb,
64 'exception_traceback': exc_tb,
65 'exception_args': e.args
65 'exception_args': e.args
66 }
66 }
67 self._write_response(result)
67 self._write_response(result)
68
68
69 def _read_request(self):
69 def _read_request(self):
70 length = int(self.headers['Content-Length'])
70 length = int(self.headers['Content-Length'])
71 body = self.rfile.read(length).decode('utf-8')
71 body = self.rfile.read(length).decode('utf-8')
72 data = json.loads(body)
72 data = json.loads(body)
73 return data['method'], data['extras']
73 return data['method'], data['extras']
74
74
75 def _write_response(self, result):
75 def _write_response(self, result):
76 self.send_response(200)
76 self.send_response(200)
77 self.send_header("Content-type", "text/json")
77 self.send_header("Content-type", "text/json")
78 self.end_headers()
78 self.end_headers()
79 self.wfile.write(json.dumps(result))
79 self.wfile.write(json.dumps(result))
80
80
81 def _call_hook(self, method, extras):
81 def _call_hook(self, method, extras):
82 hooks = Hooks()
82 hooks = Hooks()
83 try:
83 try:
84 result = getattr(hooks, method)(extras)
84 result = getattr(hooks, method)(extras)
85 finally:
85 finally:
86 meta.Session.remove()
86 meta.Session.remove()
87 return result
87 return result
88
88
89 def log_message(self, format, *args):
89 def log_message(self, format, *args):
90 """
90 """
91 This is an overridden method of BaseHTTPRequestHandler which logs using
91 This is an overridden method of BaseHTTPRequestHandler which logs using
92 logging library instead of writing directly to stderr.
92 logging library instead of writing directly to stderr.
93 """
93 """
94
94
95 message = format % args
95 message = format % args
96
96
97 log.debug(
97 log.debug(
98 "%s - - [%s] %s", self.client_address[0],
98 "%s - - [%s] %s", self.client_address[0],
99 self.log_date_time_string(), message)
99 self.log_date_time_string(), message)
100
100
101
101
102 class DummyHooksCallbackDaemon(object):
102 class DummyHooksCallbackDaemon(object):
103 hooks_uri = ''
103 hooks_uri = ''
104
104
105 def __init__(self):
105 def __init__(self):
106 self.hooks_module = Hooks.__module__
106 self.hooks_module = Hooks.__module__
107
107
108 def __enter__(self):
108 def __enter__(self):
109 log.debug('Running `%s` callback daemon', self.__class__.__name__)
109 log.debug('Running `%s` callback daemon', self.__class__.__name__)
110 return self
110 return self
111
111
112 def __exit__(self, exc_type, exc_val, exc_tb):
112 def __exit__(self, exc_type, exc_val, exc_tb):
113 log.debug('Exiting `%s` callback daemon', self.__class__.__name__)
113 log.debug('Exiting `%s` callback daemon', self.__class__.__name__)
114
114
115
115
116 class ThreadedHookCallbackDaemon(object):
116 class ThreadedHookCallbackDaemon(object):
117
117
118 _callback_thread = None
118 _callback_thread = None
119 _daemon = None
119 _daemon = None
120 _done = False
120 _done = False
121
121
122 def __init__(self, txn_id=None, host=None, port=None):
122 def __init__(self, txn_id=None, host=None, port=None):
123 self._prepare(txn_id=txn_id, host=None, port=port)
123 self._prepare(txn_id=txn_id, host=host, port=port)
124
124
125 def __enter__(self):
125 def __enter__(self):
126 log.debug('Running `%s` callback daemon', self.__class__.__name__)
126 log.debug('Running `%s` callback daemon', self.__class__.__name__)
127 self._run()
127 self._run()
128 return self
128 return self
129
129
130 def __exit__(self, exc_type, exc_val, exc_tb):
130 def __exit__(self, exc_type, exc_val, exc_tb):
131 log.debug('Exiting `%s` callback daemon', self.__class__.__name__)
131 log.debug('Exiting `%s` callback daemon', self.__class__.__name__)
132 self._stop()
132 self._stop()
133
133
134 def _prepare(self, txn_id=None, host=None, port=None):
134 def _prepare(self, txn_id=None, host=None, port=None):
135 raise NotImplementedError()
135 raise NotImplementedError()
136
136
137 def _run(self):
137 def _run(self):
138 raise NotImplementedError()
138 raise NotImplementedError()
139
139
140 def _stop(self):
140 def _stop(self):
141 raise NotImplementedError()
141 raise NotImplementedError()
142
142
143
143
144 class HttpHooksCallbackDaemon(ThreadedHookCallbackDaemon):
144 class HttpHooksCallbackDaemon(ThreadedHookCallbackDaemon):
145 """
145 """
146 Context manager which will run a callback daemon in a background thread.
146 Context manager which will run a callback daemon in a background thread.
147 """
147 """
148
148
149 hooks_uri = None
149 hooks_uri = None
150
150
151 # From Python docs: Polling reduces our responsiveness to a shutdown
151 # From Python docs: Polling reduces our responsiveness to a shutdown
152 # request and wastes cpu at all other times.
152 # request and wastes cpu at all other times.
153 POLL_INTERVAL = 0.01
153 POLL_INTERVAL = 0.01
154
154
155 def _prepare(self, txn_id=None, host=None, port=None):
155 def _prepare(self, txn_id=None, host=None, port=None):
156 host = host or '127.0.0.1'
156 host = host or '127.0.0.1'
157 self._done = False
157 self._done = False
158 self._daemon = TCPServer((host, port or 0), HooksHttpHandler)
158 self._daemon = TCPServer((host, port or 0), HooksHttpHandler)
159 _, port = self._daemon.server_address
159 _, port = self._daemon.server_address
160 self.hooks_uri = '{}:{}'.format(host, port)
160 self.hooks_uri = '{}:{}'.format(host, port)
161 self.txn_id = txn_id
161 self.txn_id = txn_id
162 # inject transaction_id for later verification
162 # inject transaction_id for later verification
163 self._daemon.txn_id = self.txn_id
163 self._daemon.txn_id = self.txn_id
164
164
165 log.debug(
165 log.debug(
166 "Preparing HTTP callback daemon at `%s` and registering hook object",
166 "Preparing HTTP callback daemon at `%s` and registering hook object",
167 self.hooks_uri)
167 self.hooks_uri)
168
168
169 def _run(self):
169 def _run(self):
170 log.debug("Running event loop of callback daemon in background thread")
170 log.debug("Running event loop of callback daemon in background thread")
171 callback_thread = threading.Thread(
171 callback_thread = threading.Thread(
172 target=self._daemon.serve_forever,
172 target=self._daemon.serve_forever,
173 kwargs={'poll_interval': self.POLL_INTERVAL})
173 kwargs={'poll_interval': self.POLL_INTERVAL})
174 callback_thread.daemon = True
174 callback_thread.daemon = True
175 callback_thread.start()
175 callback_thread.start()
176 self._callback_thread = callback_thread
176 self._callback_thread = callback_thread
177
177
178 def _stop(self):
178 def _stop(self):
179 log.debug("Waiting for background thread to finish.")
179 log.debug("Waiting for background thread to finish.")
180 self._daemon.shutdown()
180 self._daemon.shutdown()
181 self._callback_thread.join()
181 self._callback_thread.join()
182 self._daemon = None
182 self._daemon = None
183 self._callback_thread = None
183 self._callback_thread = None
184 if self.txn_id:
184 if self.txn_id:
185 txn_id_file = get_txn_id_data_path(self.txn_id)
185 txn_id_file = get_txn_id_data_path(self.txn_id)
186 log.debug('Cleaning up TXN ID %s', txn_id_file)
186 log.debug('Cleaning up TXN ID %s', txn_id_file)
187 if os.path.isfile(txn_id_file):
187 if os.path.isfile(txn_id_file):
188 os.remove(txn_id_file)
188 os.remove(txn_id_file)
189
189
190 log.debug("Background thread done.")
190 log.debug("Background thread done.")
191
191
192
192
193 def get_txn_id_data_path(txn_id):
193 def get_txn_id_data_path(txn_id):
194 import rhodecode
194 import rhodecode
195
195
196 root = rhodecode.CONFIG.get('cache_dir') or tempfile.gettempdir()
196 root = rhodecode.CONFIG.get('cache_dir') or tempfile.gettempdir()
197 final_dir = os.path.join(root, 'svn_txn_id')
197 final_dir = os.path.join(root, 'svn_txn_id')
198
198
199 if not os.path.isdir(final_dir):
199 if not os.path.isdir(final_dir):
200 os.makedirs(final_dir)
200 os.makedirs(final_dir)
201 return os.path.join(final_dir, 'rc_txn_id_{}'.format(txn_id))
201 return os.path.join(final_dir, 'rc_txn_id_{}'.format(txn_id))
202
202
203
203
204 def store_txn_id_data(txn_id, data_dict):
204 def store_txn_id_data(txn_id, data_dict):
205 if not txn_id:
205 if not txn_id:
206 log.warning('Cannot store txn_id because it is empty')
206 log.warning('Cannot store txn_id because it is empty')
207 return
207 return
208
208
209 path = get_txn_id_data_path(txn_id)
209 path = get_txn_id_data_path(txn_id)
210 try:
210 try:
211 with open(path, 'wb') as f:
211 with open(path, 'wb') as f:
212 f.write(json.dumps(data_dict))
212 f.write(json.dumps(data_dict))
213 except Exception:
213 except Exception:
214 log.exception('Failed to write txn_id metadata')
214 log.exception('Failed to write txn_id metadata')
215
215
216
216
217 def get_txn_id_from_store(txn_id):
217 def get_txn_id_from_store(txn_id):
218 """
218 """
219 Reads txn_id from store and if present returns the data for callback manager
219 Reads txn_id from store and if present returns the data for callback manager
220 """
220 """
221 path = get_txn_id_data_path(txn_id)
221 path = get_txn_id_data_path(txn_id)
222 try:
222 try:
223 with open(path, 'rb') as f:
223 with open(path, 'rb') as f:
224 return json.loads(f.read())
224 return json.loads(f.read())
225 except Exception:
225 except Exception:
226 return {}
226 return {}
227
227
228
228
229 def prepare_callback_daemon(extras, protocol, host, use_direct_calls, txn_id=None):
229 def prepare_callback_daemon(extras, protocol, host, use_direct_calls, txn_id=None):
230 txn_details = get_txn_id_from_store(txn_id)
230 txn_details = get_txn_id_from_store(txn_id)
231 port = txn_details.get('port', 0)
231 port = txn_details.get('port', 0)
232 if use_direct_calls:
232 if use_direct_calls:
233 callback_daemon = DummyHooksCallbackDaemon()
233 callback_daemon = DummyHooksCallbackDaemon()
234 extras['hooks_module'] = callback_daemon.hooks_module
234 extras['hooks_module'] = callback_daemon.hooks_module
235 else:
235 else:
236 if protocol == 'http':
236 if protocol == 'http':
237 callback_daemon = HttpHooksCallbackDaemon(
237 callback_daemon = HttpHooksCallbackDaemon(
238 txn_id=txn_id, host=host, port=port)
238 txn_id=txn_id, host=host, port=port)
239 else:
239 else:
240 log.error('Unsupported callback daemon protocol "%s"', protocol)
240 log.error('Unsupported callback daemon protocol "%s"', protocol)
241 raise Exception('Unsupported callback daemon protocol.')
241 raise Exception('Unsupported callback daemon protocol.')
242
242
243 extras['hooks_uri'] = callback_daemon.hooks_uri
243 extras['hooks_uri'] = callback_daemon.hooks_uri
244 extras['hooks_protocol'] = protocol
244 extras['hooks_protocol'] = protocol
245 extras['time'] = time.time()
245 extras['time'] = time.time()
246
246
247 # register txn_id
247 # register txn_id
248 extras['txn_id'] = txn_id
248 extras['txn_id'] = txn_id
249 log.debug('Prepared a callback daemon: %s at url `%s`',
249 log.debug('Prepared a callback daemon: %s at url `%s`',
250 callback_daemon.__class__.__name__, callback_daemon.hooks_uri)
250 callback_daemon.__class__.__name__, callback_daemon.hooks_uri)
251 return callback_daemon, extras
251 return callback_daemon, extras
252
252
253
253
254 class Hooks(object):
254 class Hooks(object):
255 """
255 """
256 Exposes the hooks for remote call backs
256 Exposes the hooks for remote call backs
257 """
257 """
258
258
259 def repo_size(self, extras):
259 def repo_size(self, extras):
260 log.debug("Called repo_size of %s object", self)
260 log.debug("Called repo_size of %s object", self)
261 return self._call_hook(hooks_base.repo_size, extras)
261 return self._call_hook(hooks_base.repo_size, extras)
262
262
263 def pre_pull(self, extras):
263 def pre_pull(self, extras):
264 log.debug("Called pre_pull of %s object", self)
264 log.debug("Called pre_pull of %s object", self)
265 return self._call_hook(hooks_base.pre_pull, extras)
265 return self._call_hook(hooks_base.pre_pull, extras)
266
266
267 def post_pull(self, extras):
267 def post_pull(self, extras):
268 log.debug("Called post_pull of %s object", self)
268 log.debug("Called post_pull of %s object", self)
269 return self._call_hook(hooks_base.post_pull, extras)
269 return self._call_hook(hooks_base.post_pull, extras)
270
270
271 def pre_push(self, extras):
271 def pre_push(self, extras):
272 log.debug("Called pre_push of %s object", self)
272 log.debug("Called pre_push of %s object", self)
273 return self._call_hook(hooks_base.pre_push, extras)
273 return self._call_hook(hooks_base.pre_push, extras)
274
274
275 def post_push(self, extras):
275 def post_push(self, extras):
276 log.debug("Called post_push of %s object", self)
276 log.debug("Called post_push of %s object", self)
277 return self._call_hook(hooks_base.post_push, extras)
277 return self._call_hook(hooks_base.post_push, extras)
278
278
279 def _call_hook(self, hook, extras):
279 def _call_hook(self, hook, extras):
280 extras = AttributeDict(extras)
280 extras = AttributeDict(extras)
281 server_url = extras['server_url']
281 server_url = extras['server_url']
282 request = bootstrap_request(application_url=server_url)
282 request = bootstrap_request(application_url=server_url)
283
283
284 bootstrap_config(request) # inject routes and other interfaces
284 bootstrap_config(request) # inject routes and other interfaces
285
285
286 # inject the user for usage in hooks
286 # inject the user for usage in hooks
287 request.user = AttributeDict({'username': extras.username,
287 request.user = AttributeDict({'username': extras.username,
288 'ip_addr': extras.ip,
288 'ip_addr': extras.ip,
289 'user_id': extras.user_id})
289 'user_id': extras.user_id})
290
290
291 extras.request = request
291 extras.request = request
292
292
293 try:
293 try:
294 result = hook(extras)
294 result = hook(extras)
295 if result is None:
295 if result is None:
296 raise Exception(
296 raise Exception(
297 'Failed to obtain hook result from func: {}'.format(hook))
297 'Failed to obtain hook result from func: {}'.format(hook))
298 except HTTPBranchProtected as handled_error:
298 except HTTPBranchProtected as handled_error:
299 # Those special cases doesn't need error reporting. It's a case of
299 # Those special cases doesn't need error reporting. It's a case of
300 # locked repo or protected branch
300 # locked repo or protected branch
301 result = AttributeDict({
301 result = AttributeDict({
302 'status': handled_error.code,
302 'status': handled_error.code,
303 'output': handled_error.explanation
303 'output': handled_error.explanation
304 })
304 })
305 except (HTTPLockedRC, Exception) as error:
305 except (HTTPLockedRC, Exception) as error:
306 # locked needs different handling since we need to also
306 # locked needs different handling since we need to also
307 # handle PULL operations
307 # handle PULL operations
308 exc_tb = ''
308 exc_tb = ''
309 if not isinstance(error, HTTPLockedRC):
309 if not isinstance(error, HTTPLockedRC):
310 exc_tb = traceback.format_exc()
310 exc_tb = traceback.format_exc()
311 log.exception('Exception when handling hook %s', hook)
311 log.exception('Exception when handling hook %s', hook)
312 error_args = error.args
312 error_args = error.args
313 return {
313 return {
314 'status': 128,
314 'status': 128,
315 'output': '',
315 'output': '',
316 'exception': type(error).__name__,
316 'exception': type(error).__name__,
317 'exception_traceback': exc_tb,
317 'exception_traceback': exc_tb,
318 'exception_args': error_args,
318 'exception_args': error_args,
319 }
319 }
320 finally:
320 finally:
321 meta.Session.remove()
321 meta.Session.remove()
322
322
323 log.debug('Got hook call response %s', result)
323 log.debug('Got hook call response %s', result)
324 return {
324 return {
325 'status': result.status,
325 'status': result.status,
326 'output': result.output,
326 'output': result.output,
327 }
327 }
328
328
329 def __enter__(self):
329 def __enter__(self):
330 return self
330 return self
331
331
332 def __exit__(self, exc_type, exc_val, exc_tb):
332 def __exit__(self, exc_type, exc_val, exc_tb):
333 pass
333 pass
General Comments 0
You need to be logged in to leave comments. Login now