##// END OF EJS Templates
hooks-daemon: fixed problem with lost hooks value from .ini file.
milka -
r4619:5dcbb0fe default
parent child Browse files
Show More
@@ -1,333 +1,333 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2010-2020 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 import os
22 22 import time
23 23 import logging
24 24 import tempfile
25 25 import traceback
26 26 import threading
27 27
28 28 from BaseHTTPServer import BaseHTTPRequestHandler
29 29 from SocketServer import TCPServer
30 30
31 31 import rhodecode
32 32 from rhodecode.lib.exceptions import HTTPLockedRC, HTTPBranchProtected
33 33 from rhodecode.model import meta
34 34 from rhodecode.lib.base import bootstrap_request, bootstrap_config
35 35 from rhodecode.lib import hooks_base
36 36 from rhodecode.lib.utils2 import AttributeDict
37 37 from rhodecode.lib.ext_json import json
38 38 from rhodecode.lib import rc_cache
39 39
40 40 log = logging.getLogger(__name__)
41 41
42 42
43 43 class HooksHttpHandler(BaseHTTPRequestHandler):
44 44
45 45 def do_POST(self):
46 46 method, extras = self._read_request()
47 47 txn_id = getattr(self.server, 'txn_id', None)
48 48 if txn_id:
49 49 log.debug('Computing TXN_ID based on `%s`:`%s`',
50 50 extras['repository'], extras['txn_id'])
51 51 computed_txn_id = rc_cache.utils.compute_key_from_params(
52 52 extras['repository'], extras['txn_id'])
53 53 if txn_id != computed_txn_id:
54 54 raise Exception(
55 55 'TXN ID fail: expected {} got {} instead'.format(
56 56 txn_id, computed_txn_id))
57 57
58 58 try:
59 59 result = self._call_hook(method, extras)
60 60 except Exception as e:
61 61 exc_tb = traceback.format_exc()
62 62 result = {
63 63 'exception': e.__class__.__name__,
64 64 'exception_traceback': exc_tb,
65 65 'exception_args': e.args
66 66 }
67 67 self._write_response(result)
68 68
69 69 def _read_request(self):
70 70 length = int(self.headers['Content-Length'])
71 71 body = self.rfile.read(length).decode('utf-8')
72 72 data = json.loads(body)
73 73 return data['method'], data['extras']
74 74
75 75 def _write_response(self, result):
76 76 self.send_response(200)
77 77 self.send_header("Content-type", "text/json")
78 78 self.end_headers()
79 79 self.wfile.write(json.dumps(result))
80 80
81 81 def _call_hook(self, method, extras):
82 82 hooks = Hooks()
83 83 try:
84 84 result = getattr(hooks, method)(extras)
85 85 finally:
86 86 meta.Session.remove()
87 87 return result
88 88
89 89 def log_message(self, format, *args):
90 90 """
91 91 This is an overridden method of BaseHTTPRequestHandler which logs using
92 92 logging library instead of writing directly to stderr.
93 93 """
94 94
95 95 message = format % args
96 96
97 97 log.debug(
98 98 "%s - - [%s] %s", self.client_address[0],
99 99 self.log_date_time_string(), message)
100 100
101 101
102 102 class DummyHooksCallbackDaemon(object):
103 103 hooks_uri = ''
104 104
105 105 def __init__(self):
106 106 self.hooks_module = Hooks.__module__
107 107
108 108 def __enter__(self):
109 109 log.debug('Running `%s` callback daemon', self.__class__.__name__)
110 110 return self
111 111
112 112 def __exit__(self, exc_type, exc_val, exc_tb):
113 113 log.debug('Exiting `%s` callback daemon', self.__class__.__name__)
114 114
115 115
116 116 class ThreadedHookCallbackDaemon(object):
117 117
118 118 _callback_thread = None
119 119 _daemon = None
120 120 _done = False
121 121
122 122 def __init__(self, txn_id=None, host=None, port=None):
123 self._prepare(txn_id=txn_id, host=None, port=port)
123 self._prepare(txn_id=txn_id, host=host, port=port)
124 124
125 125 def __enter__(self):
126 126 log.debug('Running `%s` callback daemon', self.__class__.__name__)
127 127 self._run()
128 128 return self
129 129
130 130 def __exit__(self, exc_type, exc_val, exc_tb):
131 131 log.debug('Exiting `%s` callback daemon', self.__class__.__name__)
132 132 self._stop()
133 133
134 134 def _prepare(self, txn_id=None, host=None, port=None):
135 135 raise NotImplementedError()
136 136
137 137 def _run(self):
138 138 raise NotImplementedError()
139 139
140 140 def _stop(self):
141 141 raise NotImplementedError()
142 142
143 143
144 144 class HttpHooksCallbackDaemon(ThreadedHookCallbackDaemon):
145 145 """
146 146 Context manager which will run a callback daemon in a background thread.
147 147 """
148 148
149 149 hooks_uri = None
150 150
151 151 # From Python docs: Polling reduces our responsiveness to a shutdown
152 152 # request and wastes cpu at all other times.
153 153 POLL_INTERVAL = 0.01
154 154
155 155 def _prepare(self, txn_id=None, host=None, port=None):
156 156 host = host or '127.0.0.1'
157 157 self._done = False
158 158 self._daemon = TCPServer((host, port or 0), HooksHttpHandler)
159 159 _, port = self._daemon.server_address
160 160 self.hooks_uri = '{}:{}'.format(host, port)
161 161 self.txn_id = txn_id
162 162 # inject transaction_id for later verification
163 163 self._daemon.txn_id = self.txn_id
164 164
165 165 log.debug(
166 166 "Preparing HTTP callback daemon at `%s` and registering hook object",
167 167 self.hooks_uri)
168 168
169 169 def _run(self):
170 170 log.debug("Running event loop of callback daemon in background thread")
171 171 callback_thread = threading.Thread(
172 172 target=self._daemon.serve_forever,
173 173 kwargs={'poll_interval': self.POLL_INTERVAL})
174 174 callback_thread.daemon = True
175 175 callback_thread.start()
176 176 self._callback_thread = callback_thread
177 177
178 178 def _stop(self):
179 179 log.debug("Waiting for background thread to finish.")
180 180 self._daemon.shutdown()
181 181 self._callback_thread.join()
182 182 self._daemon = None
183 183 self._callback_thread = None
184 184 if self.txn_id:
185 185 txn_id_file = get_txn_id_data_path(self.txn_id)
186 186 log.debug('Cleaning up TXN ID %s', txn_id_file)
187 187 if os.path.isfile(txn_id_file):
188 188 os.remove(txn_id_file)
189 189
190 190 log.debug("Background thread done.")
191 191
192 192
193 193 def get_txn_id_data_path(txn_id):
194 194 import rhodecode
195 195
196 196 root = rhodecode.CONFIG.get('cache_dir') or tempfile.gettempdir()
197 197 final_dir = os.path.join(root, 'svn_txn_id')
198 198
199 199 if not os.path.isdir(final_dir):
200 200 os.makedirs(final_dir)
201 201 return os.path.join(final_dir, 'rc_txn_id_{}'.format(txn_id))
202 202
203 203
204 204 def store_txn_id_data(txn_id, data_dict):
205 205 if not txn_id:
206 206 log.warning('Cannot store txn_id because it is empty')
207 207 return
208 208
209 209 path = get_txn_id_data_path(txn_id)
210 210 try:
211 211 with open(path, 'wb') as f:
212 212 f.write(json.dumps(data_dict))
213 213 except Exception:
214 214 log.exception('Failed to write txn_id metadata')
215 215
216 216
217 217 def get_txn_id_from_store(txn_id):
218 218 """
219 219 Reads txn_id from store and if present returns the data for callback manager
220 220 """
221 221 path = get_txn_id_data_path(txn_id)
222 222 try:
223 223 with open(path, 'rb') as f:
224 224 return json.loads(f.read())
225 225 except Exception:
226 226 return {}
227 227
228 228
229 229 def prepare_callback_daemon(extras, protocol, host, use_direct_calls, txn_id=None):
230 230 txn_details = get_txn_id_from_store(txn_id)
231 231 port = txn_details.get('port', 0)
232 232 if use_direct_calls:
233 233 callback_daemon = DummyHooksCallbackDaemon()
234 234 extras['hooks_module'] = callback_daemon.hooks_module
235 235 else:
236 236 if protocol == 'http':
237 237 callback_daemon = HttpHooksCallbackDaemon(
238 238 txn_id=txn_id, host=host, port=port)
239 239 else:
240 240 log.error('Unsupported callback daemon protocol "%s"', protocol)
241 241 raise Exception('Unsupported callback daemon protocol.')
242 242
243 243 extras['hooks_uri'] = callback_daemon.hooks_uri
244 244 extras['hooks_protocol'] = protocol
245 245 extras['time'] = time.time()
246 246
247 247 # register txn_id
248 248 extras['txn_id'] = txn_id
249 249 log.debug('Prepared a callback daemon: %s at url `%s`',
250 250 callback_daemon.__class__.__name__, callback_daemon.hooks_uri)
251 251 return callback_daemon, extras
252 252
253 253
254 254 class Hooks(object):
255 255 """
256 256 Exposes the hooks for remote call backs
257 257 """
258 258
259 259 def repo_size(self, extras):
260 260 log.debug("Called repo_size of %s object", self)
261 261 return self._call_hook(hooks_base.repo_size, extras)
262 262
263 263 def pre_pull(self, extras):
264 264 log.debug("Called pre_pull of %s object", self)
265 265 return self._call_hook(hooks_base.pre_pull, extras)
266 266
267 267 def post_pull(self, extras):
268 268 log.debug("Called post_pull of %s object", self)
269 269 return self._call_hook(hooks_base.post_pull, extras)
270 270
271 271 def pre_push(self, extras):
272 272 log.debug("Called pre_push of %s object", self)
273 273 return self._call_hook(hooks_base.pre_push, extras)
274 274
275 275 def post_push(self, extras):
276 276 log.debug("Called post_push of %s object", self)
277 277 return self._call_hook(hooks_base.post_push, extras)
278 278
279 279 def _call_hook(self, hook, extras):
280 280 extras = AttributeDict(extras)
281 281 server_url = extras['server_url']
282 282 request = bootstrap_request(application_url=server_url)
283 283
284 284 bootstrap_config(request) # inject routes and other interfaces
285 285
286 286 # inject the user for usage in hooks
287 287 request.user = AttributeDict({'username': extras.username,
288 288 'ip_addr': extras.ip,
289 289 'user_id': extras.user_id})
290 290
291 291 extras.request = request
292 292
293 293 try:
294 294 result = hook(extras)
295 295 if result is None:
296 296 raise Exception(
297 297 'Failed to obtain hook result from func: {}'.format(hook))
298 298 except HTTPBranchProtected as handled_error:
299 299 # Those special cases doesn't need error reporting. It's a case of
300 300 # locked repo or protected branch
301 301 result = AttributeDict({
302 302 'status': handled_error.code,
303 303 'output': handled_error.explanation
304 304 })
305 305 except (HTTPLockedRC, Exception) as error:
306 306 # locked needs different handling since we need to also
307 307 # handle PULL operations
308 308 exc_tb = ''
309 309 if not isinstance(error, HTTPLockedRC):
310 310 exc_tb = traceback.format_exc()
311 311 log.exception('Exception when handling hook %s', hook)
312 312 error_args = error.args
313 313 return {
314 314 'status': 128,
315 315 'output': '',
316 316 'exception': type(error).__name__,
317 317 'exception_traceback': exc_tb,
318 318 'exception_args': error_args,
319 319 }
320 320 finally:
321 321 meta.Session.remove()
322 322
323 323 log.debug('Got hook call response %s', result)
324 324 return {
325 325 'status': result.status,
326 326 'output': result.output,
327 327 }
328 328
329 329 def __enter__(self):
330 330 return self
331 331
332 332 def __exit__(self, exc_type, exc_val, exc_tb):
333 333 pass
General Comments 0
You need to be logged in to leave comments. Login now