hooks_daemon.py
350 lines
| 11.2 KiB
| text/x-python
|
PythonLexer
r1 | # -*- coding: utf-8 -*- | |||
r4306 | # Copyright (C) 2010-2020 RhodeCode GmbH | |||
r1 | # | |||
# This program is free software: you can redistribute it and/or modify | ||||
# it under the terms of the GNU Affero General Public License, version 3 | ||||
# (only), as published by the Free Software Foundation. | ||||
# | ||||
# This program is distributed in the hope that it will be useful, | ||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of | ||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | ||||
# GNU General Public License for more details. | ||||
# | ||||
# You should have received a copy of the GNU Affero General Public License | ||||
# along with this program. If not, see <http://www.gnu.org/licenses/>. | ||||
# | ||||
# This program is dual-licensed. If you wish to learn more about the | ||||
# RhodeCode Enterprise Edition, including its added features, Support services, | ||||
# and proprietary license terms, please see https://rhodecode.com/licenses/ | ||||
r2677 | import os | |||
import time | ||||
r1 | import logging | |||
r2677 | import tempfile | |||
r1458 | import traceback | |||
r1 | import threading | |||
r4855 | import socket | |||
r2677 | ||||
r1 | from BaseHTTPServer import BaseHTTPRequestHandler | |||
from SocketServer import TCPServer | ||||
r411 | import rhodecode | |||
r2979 | from rhodecode.lib.exceptions import HTTPLockedRC, HTTPBranchProtected | |||
r669 | from rhodecode.model import meta | |||
r2335 | from rhodecode.lib.base import bootstrap_request, bootstrap_config | |||
r1 | from rhodecode.lib import hooks_base | |||
r2016 | from rhodecode.lib.utils2 import AttributeDict | |||
r2677 | from rhodecode.lib.ext_json import json | |||
r2970 | from rhodecode.lib import rc_cache | |||
r1 | ||||
log = logging.getLogger(__name__) | ||||
class HooksHttpHandler(BaseHTTPRequestHandler): | ||||
r2677 | ||||
r1 | def do_POST(self): | |||
method, extras = self._read_request() | ||||
r2677 | txn_id = getattr(self.server, 'txn_id', None) | |||
if txn_id: | ||||
log.debug('Computing TXN_ID based on `%s`:`%s`', | ||||
extras['repository'], extras['txn_id']) | ||||
r2970 | computed_txn_id = rc_cache.utils.compute_key_from_params( | |||
r2677 | extras['repository'], extras['txn_id']) | |||
if txn_id != computed_txn_id: | ||||
raise Exception( | ||||
'TXN ID fail: expected {} got {} instead'.format( | ||||
txn_id, computed_txn_id)) | ||||
r1 | try: | |||
result = self._call_hook(method, extras) | ||||
except Exception as e: | ||||
r1458 | exc_tb = traceback.format_exc() | |||
r1 | result = { | |||
'exception': e.__class__.__name__, | ||||
r1458 | 'exception_traceback': exc_tb, | |||
r1 | 'exception_args': e.args | |||
} | ||||
self._write_response(result) | ||||
def _read_request(self): | ||||
length = int(self.headers['Content-Length']) | ||||
body = self.rfile.read(length).decode('utf-8') | ||||
data = json.loads(body) | ||||
return data['method'], data['extras'] | ||||
def _write_response(self, result): | ||||
self.send_response(200) | ||||
self.send_header("Content-type", "text/json") | ||||
self.end_headers() | ||||
self.wfile.write(json.dumps(result)) | ||||
def _call_hook(self, method, extras): | ||||
hooks = Hooks() | ||||
r669 | try: | |||
result = getattr(hooks, method)(extras) | ||||
finally: | ||||
meta.Session.remove() | ||||
r1 | return result | |||
def log_message(self, format, *args): | ||||
""" | ||||
r1409 | This is an overridden method of BaseHTTPRequestHandler which logs using | |||
r1 | logging library instead of writing directly to stderr. | |||
""" | ||||
message = format % args | ||||
log.debug( | ||||
"%s - - [%s] %s", self.client_address[0], | ||||
self.log_date_time_string(), message) | ||||
class DummyHooksCallbackDaemon(object): | ||||
r2677 | hooks_uri = '' | |||
r1 | def __init__(self): | |||
self.hooks_module = Hooks.__module__ | ||||
def __enter__(self): | ||||
r3934 | log.debug('Running `%s` callback daemon', self.__class__.__name__) | |||
r1 | return self | |||
def __exit__(self, exc_type, exc_val, exc_tb): | ||||
r3934 | log.debug('Exiting `%s` callback daemon', self.__class__.__name__) | |||
r1 | ||||
class ThreadedHookCallbackDaemon(object): | ||||
_callback_thread = None | ||||
_daemon = None | ||||
_done = False | ||||
r2833 | def __init__(self, txn_id=None, host=None, port=None): | |||
r4619 | self._prepare(txn_id=txn_id, host=host, port=port) | |||
r1 | ||||
def __enter__(self): | ||||
r3934 | log.debug('Running `%s` callback daemon', self.__class__.__name__) | |||
r1 | self._run() | |||
return self | ||||
def __exit__(self, exc_type, exc_val, exc_tb): | ||||
r3934 | log.debug('Exiting `%s` callback daemon', self.__class__.__name__) | |||
r1 | self._stop() | |||
r2833 | def _prepare(self, txn_id=None, host=None, port=None): | |||
r1 | raise NotImplementedError() | |||
def _run(self): | ||||
raise NotImplementedError() | ||||
def _stop(self): | ||||
raise NotImplementedError() | ||||
class HttpHooksCallbackDaemon(ThreadedHookCallbackDaemon): | ||||
""" | ||||
Context manager which will run a callback daemon in a background thread. | ||||
""" | ||||
hooks_uri = None | ||||
# From Python docs: Polling reduces our responsiveness to a shutdown | ||||
# request and wastes cpu at all other times. | ||||
r2264 | POLL_INTERVAL = 0.01 | |||
r1 | ||||
r4859 | def get_hostname(self): | |||
return socket.gethostname() or '127.0.0.1' | ||||
r4856 | def get_available_port(self): | |||
mysocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) | ||||
r4859 | mysocket.bind((self.get_hostname(), 0)) | |||
r4855 | port = mysocket.getsockname()[1] | |||
mysocket.close() | ||||
del mysocket | ||||
return port | ||||
r2833 | def _prepare(self, txn_id=None, host=None, port=None): | |||
r4859 | if not host or host == "*": | |||
host = self.get_hostname() | ||||
if not port: | ||||
port = self.get_available_port() | ||||
r4854 | ||||
r4855 | server_address = (host, port) | |||
r2833 | self.hooks_uri = '{}:{}'.format(host, port) | |||
r2677 | self.txn_id = txn_id | |||
r4855 | self._done = False | |||
log.debug( | ||||
"Preparing HTTP callback daemon at `%s` and registering hook object: %s", | ||||
self.hooks_uri, HooksHttpHandler) | ||||
self._daemon = TCPServer(server_address, HooksHttpHandler) | ||||
r2677 | # inject transaction_id for later verification | |||
self._daemon.txn_id = self.txn_id | ||||
r4854 | ||||
r1 | def _run(self): | |||
log.debug("Running event loop of callback daemon in background thread") | ||||
callback_thread = threading.Thread( | ||||
target=self._daemon.serve_forever, | ||||
kwargs={'poll_interval': self.POLL_INTERVAL}) | ||||
callback_thread.daemon = True | ||||
callback_thread.start() | ||||
self._callback_thread = callback_thread | ||||
def _stop(self): | ||||
log.debug("Waiting for background thread to finish.") | ||||
self._daemon.shutdown() | ||||
self._callback_thread.join() | ||||
self._daemon = None | ||||
self._callback_thread = None | ||||
r2677 | if self.txn_id: | |||
txn_id_file = get_txn_id_data_path(self.txn_id) | ||||
log.debug('Cleaning up TXN ID %s', txn_id_file) | ||||
if os.path.isfile(txn_id_file): | ||||
os.remove(txn_id_file) | ||||
r2263 | log.debug("Background thread done.") | |||
r1 | ||||
r2677 | def get_txn_id_data_path(txn_id): | |||
r3021 | import rhodecode | |||
root = rhodecode.CONFIG.get('cache_dir') or tempfile.gettempdir() | ||||
final_dir = os.path.join(root, 'svn_txn_id') | ||||
if not os.path.isdir(final_dir): | ||||
os.makedirs(final_dir) | ||||
return os.path.join(final_dir, 'rc_txn_id_{}'.format(txn_id)) | ||||
r2677 | ||||
def store_txn_id_data(txn_id, data_dict): | ||||
if not txn_id: | ||||
log.warning('Cannot store txn_id because it is empty') | ||||
return | ||||
path = get_txn_id_data_path(txn_id) | ||||
try: | ||||
with open(path, 'wb') as f: | ||||
f.write(json.dumps(data_dict)) | ||||
except Exception: | ||||
log.exception('Failed to write txn_id metadata') | ||||
r1 | ||||
r2677 | ||||
def get_txn_id_from_store(txn_id): | ||||
""" | ||||
Reads txn_id from store and if present returns the data for callback manager | ||||
""" | ||||
path = get_txn_id_data_path(txn_id) | ||||
try: | ||||
with open(path, 'rb') as f: | ||||
return json.loads(f.read()) | ||||
except Exception: | ||||
return {} | ||||
r2833 | def prepare_callback_daemon(extras, protocol, host, use_direct_calls, txn_id=None): | |||
r2677 | txn_details = get_txn_id_from_store(txn_id) | |||
port = txn_details.get('port', 0) | ||||
r1 | if use_direct_calls: | |||
callback_daemon = DummyHooksCallbackDaemon() | ||||
extras['hooks_module'] = callback_daemon.hooks_module | ||||
else: | ||||
r1409 | if protocol == 'http': | |||
r2833 | callback_daemon = HttpHooksCallbackDaemon( | |||
txn_id=txn_id, host=host, port=port) | ||||
Martin Bornhold
|
r589 | else: | ||
log.error('Unsupported callback daemon protocol "%s"', protocol) | ||||
raise Exception('Unsupported callback daemon protocol.') | ||||
r2677 | extras['hooks_uri'] = callback_daemon.hooks_uri | |||
extras['hooks_protocol'] = protocol | ||||
extras['time'] = time.time() | ||||
r1 | ||||
r2677 | # register txn_id | |||
extras['txn_id'] = txn_id | ||||
log.debug('Prepared a callback daemon: %s at url `%s`', | ||||
callback_daemon.__class__.__name__, callback_daemon.hooks_uri) | ||||
r1 | return callback_daemon, extras | |||
class Hooks(object): | ||||
""" | ||||
Exposes the hooks for remote call backs | ||||
""" | ||||
def repo_size(self, extras): | ||||
r2016 | log.debug("Called repo_size of %s object", self) | |||
r1 | return self._call_hook(hooks_base.repo_size, extras) | |||
def pre_pull(self, extras): | ||||
r2016 | log.debug("Called pre_pull of %s object", self) | |||
r1 | return self._call_hook(hooks_base.pre_pull, extras) | |||
def post_pull(self, extras): | ||||
r2016 | log.debug("Called post_pull of %s object", self) | |||
r1 | return self._call_hook(hooks_base.post_pull, extras) | |||
def pre_push(self, extras): | ||||
r2016 | log.debug("Called pre_push of %s object", self) | |||
r1 | return self._call_hook(hooks_base.pre_push, extras) | |||
def post_push(self, extras): | ||||
r2016 | log.debug("Called post_push of %s object", self) | |||
r1 | return self._call_hook(hooks_base.post_push, extras) | |||
def _call_hook(self, hook, extras): | ||||
extras = AttributeDict(extras) | ||||
r2187 | server_url = extras['server_url'] | |||
r2335 | request = bootstrap_request(application_url=server_url) | |||
r1960 | ||||
r2335 | bootstrap_config(request) # inject routes and other interfaces | |||
r2418 | ||||
# inject the user for usage in hooks | ||||
request.user = AttributeDict({'username': extras.username, | ||||
'ip_addr': extras.ip, | ||||
'user_id': extras.user_id}) | ||||
r2335 | extras.request = request | |||
r1 | ||||
try: | ||||
result = hook(extras) | ||||
r3093 | if result is None: | |||
raise Exception( | ||||
'Failed to obtain hook result from func: {}'.format(hook)) | ||||
r2979 | except HTTPBranchProtected as handled_error: | |||
# Those special cases doesn't need error reporting. It's a case of | ||||
# locked repo or protected branch | ||||
result = AttributeDict({ | ||||
'status': handled_error.code, | ||||
'output': handled_error.explanation | ||||
}) | ||||
except (HTTPLockedRC, Exception) as error: | ||||
# locked needs different handling since we need to also | ||||
# handle PULL operations | ||||
exc_tb = '' | ||||
if not isinstance(error, HTTPLockedRC): | ||||
exc_tb = traceback.format_exc() | ||||
log.exception('Exception when handling hook %s', hook) | ||||
r1 | error_args = error.args | |||
return { | ||||
'status': 128, | ||||
'output': '', | ||||
'exception': type(error).__name__, | ||||
r1458 | 'exception_traceback': exc_tb, | |||
r1 | 'exception_args': error_args, | |||
} | ||||
r411 | finally: | |||
r670 | meta.Session.remove() | |||
r411 | ||||
r2187 | log.debug('Got hook call response %s', result) | |||
r1 | return { | |||
'status': result.status, | ||||
'output': result.output, | ||||
} | ||||
def __enter__(self): | ||||
return self | ||||
def __exit__(self, exc_type, exc_val, exc_tb): | ||||
pass | ||||