##// END OF EJS Templates
hooks: simplified code
super-admin -
r4856:d9965ed6 default
parent child Browse files
Show More
@@ -1,349 +1,345 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2010-2020 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 20
21 21 import os
22 22 import time
23 23 import logging
24 24 import tempfile
25 25 import traceback
26 26 import threading
27 27 import socket
28 28
29 29 from BaseHTTPServer import BaseHTTPRequestHandler
30 30 from SocketServer import TCPServer
31 31
32 32 import rhodecode
33 33 from rhodecode.lib.exceptions import HTTPLockedRC, HTTPBranchProtected
34 34 from rhodecode.model import meta
35 35 from rhodecode.lib.base import bootstrap_request, bootstrap_config
36 36 from rhodecode.lib import hooks_base
37 37 from rhodecode.lib.utils2 import AttributeDict
38 38 from rhodecode.lib.ext_json import json
39 39 from rhodecode.lib import rc_cache
40 40
41 41 log = logging.getLogger(__name__)
42 42
43 43
44 44 class HooksHttpHandler(BaseHTTPRequestHandler):
45 45
46 46 def do_POST(self):
47 47 method, extras = self._read_request()
48 48 txn_id = getattr(self.server, 'txn_id', None)
49 49 if txn_id:
50 50 log.debug('Computing TXN_ID based on `%s`:`%s`',
51 51 extras['repository'], extras['txn_id'])
52 52 computed_txn_id = rc_cache.utils.compute_key_from_params(
53 53 extras['repository'], extras['txn_id'])
54 54 if txn_id != computed_txn_id:
55 55 raise Exception(
56 56 'TXN ID fail: expected {} got {} instead'.format(
57 57 txn_id, computed_txn_id))
58 58
59 59 try:
60 60 result = self._call_hook(method, extras)
61 61 except Exception as e:
62 62 exc_tb = traceback.format_exc()
63 63 result = {
64 64 'exception': e.__class__.__name__,
65 65 'exception_traceback': exc_tb,
66 66 'exception_args': e.args
67 67 }
68 68 self._write_response(result)
69 69
70 70 def _read_request(self):
71 71 length = int(self.headers['Content-Length'])
72 72 body = self.rfile.read(length).decode('utf-8')
73 73 data = json.loads(body)
74 74 return data['method'], data['extras']
75 75
76 76 def _write_response(self, result):
77 77 self.send_response(200)
78 78 self.send_header("Content-type", "text/json")
79 79 self.end_headers()
80 80 self.wfile.write(json.dumps(result))
81 81
82 82 def _call_hook(self, method, extras):
83 83 hooks = Hooks()
84 84 try:
85 85 result = getattr(hooks, method)(extras)
86 86 finally:
87 87 meta.Session.remove()
88 88 return result
89 89
90 90 def log_message(self, format, *args):
91 91 """
92 92 This is an overridden method of BaseHTTPRequestHandler which logs using
93 93 logging library instead of writing directly to stderr.
94 94 """
95 95
96 96 message = format % args
97 97
98 98 log.debug(
99 99 "%s - - [%s] %s", self.client_address[0],
100 100 self.log_date_time_string(), message)
101 101
102 102
103 103 class DummyHooksCallbackDaemon(object):
104 104 hooks_uri = ''
105 105
106 106 def __init__(self):
107 107 self.hooks_module = Hooks.__module__
108 108
109 109 def __enter__(self):
110 110 log.debug('Running `%s` callback daemon', self.__class__.__name__)
111 111 return self
112 112
113 113 def __exit__(self, exc_type, exc_val, exc_tb):
114 114 log.debug('Exiting `%s` callback daemon', self.__class__.__name__)
115 115
116 116
117 117 class ThreadedHookCallbackDaemon(object):
118 118
119 119 _callback_thread = None
120 120 _daemon = None
121 121 _done = False
122 122
123 123 def __init__(self, txn_id=None, host=None, port=None):
124 124 self._prepare(txn_id=txn_id, host=host, port=port)
125 125
126 126 def __enter__(self):
127 127 log.debug('Running `%s` callback daemon', self.__class__.__name__)
128 128 self._run()
129 129 return self
130 130
131 131 def __exit__(self, exc_type, exc_val, exc_tb):
132 132 log.debug('Exiting `%s` callback daemon', self.__class__.__name__)
133 133 self._stop()
134 134
135 135 def _prepare(self, txn_id=None, host=None, port=None):
136 136 raise NotImplementedError()
137 137
138 138 def _run(self):
139 139 raise NotImplementedError()
140 140
141 141 def _stop(self):
142 142 raise NotImplementedError()
143 143
144 144
145 145 class HttpHooksCallbackDaemon(ThreadedHookCallbackDaemon):
146 146 """
147 147 Context manager which will run a callback daemon in a background thread.
148 148 """
149 149
150 150 hooks_uri = None
151 151
152 152 # From Python docs: Polling reduces our responsiveness to a shutdown
153 153 # request and wastes cpu at all other times.
154 154 POLL_INTERVAL = 0.01
155 155
156 def get_available_port():
157 family = socket.AF_INET
158 socktype = socket.SOCK_STREAM
159 host = '127.0.0.1'
160
161 mysocket = socket.socket(family, socktype)
162 mysocket.bind((host, 0))
156 def get_available_port(self):
157 mysocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
158 mysocket.bind(('127.0.0.1', 0))
163 159 port = mysocket.getsockname()[1]
164 160 mysocket.close()
165 161 del mysocket
166 162 return port
167 163
168 164 def _prepare(self, txn_id=None, host=None, port=None):
169 165
170 166 host = host or '127.0.0.1'
171 167 port = port or self.get_available_port()
172 168 server_address = (host, port)
173 169 self.hooks_uri = '{}:{}'.format(host, port)
174 170 self.txn_id = txn_id
175 171 self._done = False
176 172
177 173 log.debug(
178 174 "Preparing HTTP callback daemon at `%s` and registering hook object: %s",
179 175 self.hooks_uri, HooksHttpHandler)
180 176
181 177 self._daemon = TCPServer(server_address, HooksHttpHandler)
182 178 # inject transaction_id for later verification
183 179 self._daemon.txn_id = self.txn_id
184 180
185 181 def _run(self):
186 182 log.debug("Running event loop of callback daemon in background thread")
187 183 callback_thread = threading.Thread(
188 184 target=self._daemon.serve_forever,
189 185 kwargs={'poll_interval': self.POLL_INTERVAL})
190 186 callback_thread.daemon = True
191 187 callback_thread.start()
192 188 self._callback_thread = callback_thread
193 189
194 190 def _stop(self):
195 191 log.debug("Waiting for background thread to finish.")
196 192 self._daemon.shutdown()
197 193 self._callback_thread.join()
198 194 self._daemon = None
199 195 self._callback_thread = None
200 196 if self.txn_id:
201 197 txn_id_file = get_txn_id_data_path(self.txn_id)
202 198 log.debug('Cleaning up TXN ID %s', txn_id_file)
203 199 if os.path.isfile(txn_id_file):
204 200 os.remove(txn_id_file)
205 201
206 202 log.debug("Background thread done.")
207 203
208 204
209 205 def get_txn_id_data_path(txn_id):
210 206 import rhodecode
211 207
212 208 root = rhodecode.CONFIG.get('cache_dir') or tempfile.gettempdir()
213 209 final_dir = os.path.join(root, 'svn_txn_id')
214 210
215 211 if not os.path.isdir(final_dir):
216 212 os.makedirs(final_dir)
217 213 return os.path.join(final_dir, 'rc_txn_id_{}'.format(txn_id))
218 214
219 215
220 216 def store_txn_id_data(txn_id, data_dict):
221 217 if not txn_id:
222 218 log.warning('Cannot store txn_id because it is empty')
223 219 return
224 220
225 221 path = get_txn_id_data_path(txn_id)
226 222 try:
227 223 with open(path, 'wb') as f:
228 224 f.write(json.dumps(data_dict))
229 225 except Exception:
230 226 log.exception('Failed to write txn_id metadata')
231 227
232 228
233 229 def get_txn_id_from_store(txn_id):
234 230 """
235 231 Reads txn_id from store and if present returns the data for callback manager
236 232 """
237 233 path = get_txn_id_data_path(txn_id)
238 234 try:
239 235 with open(path, 'rb') as f:
240 236 return json.loads(f.read())
241 237 except Exception:
242 238 return {}
243 239
244 240
245 241 def prepare_callback_daemon(extras, protocol, host, use_direct_calls, txn_id=None):
246 242 txn_details = get_txn_id_from_store(txn_id)
247 243 port = txn_details.get('port', 0)
248 244 if use_direct_calls:
249 245 callback_daemon = DummyHooksCallbackDaemon()
250 246 extras['hooks_module'] = callback_daemon.hooks_module
251 247 else:
252 248 if protocol == 'http':
253 249 callback_daemon = HttpHooksCallbackDaemon(
254 250 txn_id=txn_id, host=host, port=port)
255 251 else:
256 252 log.error('Unsupported callback daemon protocol "%s"', protocol)
257 253 raise Exception('Unsupported callback daemon protocol.')
258 254
259 255 extras['hooks_uri'] = callback_daemon.hooks_uri
260 256 extras['hooks_protocol'] = protocol
261 257 extras['time'] = time.time()
262 258
263 259 # register txn_id
264 260 extras['txn_id'] = txn_id
265 261 log.debug('Prepared a callback daemon: %s at url `%s`',
266 262 callback_daemon.__class__.__name__, callback_daemon.hooks_uri)
267 263 return callback_daemon, extras
268 264
269 265
270 266 class Hooks(object):
271 267 """
272 268 Exposes the hooks for remote call backs
273 269 """
274 270
275 271 def repo_size(self, extras):
276 272 log.debug("Called repo_size of %s object", self)
277 273 return self._call_hook(hooks_base.repo_size, extras)
278 274
279 275 def pre_pull(self, extras):
280 276 log.debug("Called pre_pull of %s object", self)
281 277 return self._call_hook(hooks_base.pre_pull, extras)
282 278
283 279 def post_pull(self, extras):
284 280 log.debug("Called post_pull of %s object", self)
285 281 return self._call_hook(hooks_base.post_pull, extras)
286 282
287 283 def pre_push(self, extras):
288 284 log.debug("Called pre_push of %s object", self)
289 285 return self._call_hook(hooks_base.pre_push, extras)
290 286
291 287 def post_push(self, extras):
292 288 log.debug("Called post_push of %s object", self)
293 289 return self._call_hook(hooks_base.post_push, extras)
294 290
295 291 def _call_hook(self, hook, extras):
296 292 extras = AttributeDict(extras)
297 293 server_url = extras['server_url']
298 294 request = bootstrap_request(application_url=server_url)
299 295
300 296 bootstrap_config(request) # inject routes and other interfaces
301 297
302 298 # inject the user for usage in hooks
303 299 request.user = AttributeDict({'username': extras.username,
304 300 'ip_addr': extras.ip,
305 301 'user_id': extras.user_id})
306 302
307 303 extras.request = request
308 304
309 305 try:
310 306 result = hook(extras)
311 307 if result is None:
312 308 raise Exception(
313 309 'Failed to obtain hook result from func: {}'.format(hook))
314 310 except HTTPBranchProtected as handled_error:
315 311 # Those special cases doesn't need error reporting. It's a case of
316 312 # locked repo or protected branch
317 313 result = AttributeDict({
318 314 'status': handled_error.code,
319 315 'output': handled_error.explanation
320 316 })
321 317 except (HTTPLockedRC, Exception) as error:
322 318 # locked needs different handling since we need to also
323 319 # handle PULL operations
324 320 exc_tb = ''
325 321 if not isinstance(error, HTTPLockedRC):
326 322 exc_tb = traceback.format_exc()
327 323 log.exception('Exception when handling hook %s', hook)
328 324 error_args = error.args
329 325 return {
330 326 'status': 128,
331 327 'output': '',
332 328 'exception': type(error).__name__,
333 329 'exception_traceback': exc_tb,
334 330 'exception_args': error_args,
335 331 }
336 332 finally:
337 333 meta.Session.remove()
338 334
339 335 log.debug('Got hook call response %s', result)
340 336 return {
341 337 'status': result.status,
342 338 'output': result.output,
343 339 }
344 340
345 341 def __enter__(self):
346 342 return self
347 343
348 344 def __exit__(self, exc_type, exc_val, exc_tb):
349 345 pass
General Comments 0
You need to be logged in to leave comments. Login now