http_hooks_deamon.py
287 lines
| 9.6 KiB
| text/x-python
|
PythonLexer
r5325 | # Copyright (C) 2010-2023 RhodeCode GmbH | |||
# | ||||
# This program is free software: you can redistribute it and/or modify | ||||
# it under the terms of the GNU Affero General Public License, version 3 | ||||
# (only), as published by the Free Software Foundation. | ||||
# | ||||
# This program is distributed in the hope that it will be useful, | ||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of | ||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | ||||
# GNU General Public License for more details. | ||||
# | ||||
# You should have received a copy of the GNU Affero General Public License | ||||
# along with this program. If not, see <http://www.gnu.org/licenses/>. | ||||
# | ||||
# This program is dual-licensed. If you wish to learn more about the | ||||
# RhodeCode Enterprise Edition, including its added features, Support services, | ||||
# and proprietary license terms, please see https://rhodecode.com/licenses/ | ||||
import os | ||||
import logging | ||||
import traceback | ||||
import threading | ||||
import socket | ||||
import msgpack | ||||
import gevent | ||||
from http.server import BaseHTTPRequestHandler | ||||
from socketserver import TCPServer | ||||
from rhodecode.model import meta | ||||
from rhodecode.lib.ext_json import json | ||||
from rhodecode.lib import rc_cache | ||||
r5459 | from rhodecode.lib.svn_txn_utils import get_txn_id_data_key | |||
r5325 | from rhodecode.lib.hook_daemon.hook_module import Hooks | |||
log = logging.getLogger(__name__) | ||||
class HooksHttpHandler(BaseHTTPRequestHandler):
    """
    HTTP request handler that executes a hook method on behalf of a client.

    A POST body carries a serialized mapping with the keys ``method`` (name
    of the attribute to call on :class:`Hooks`) and ``extras`` (the single
    argument passed to it). The wire format is chosen by the
    ``rc-hooks-protocol`` request header and the response is sent back in the
    same format.
    """

    JSON_HOOKS_PROTO = 'json.v1'
    MSGPACK_HOOKS_PROTO = 'msgpack.v1'
    # starting with RhodeCode 5.0.0 MsgPack is the default, prior it used json
    DEFAULT_HOOKS_PROTO = MSGPACK_HOOKS_PROTO

    @classmethod
    def serialize_data(cls, data, proto=DEFAULT_HOOKS_PROTO):
        """
        Serialize `data` using msgpack when `proto` is the msgpack protocol,
        otherwise using JSON.
        """
        if proto == cls.MSGPACK_HOOKS_PROTO:
            return msgpack.packb(data)
        return json.dumps(data)

    @classmethod
    def deserialize_data(cls, data, proto=DEFAULT_HOOKS_PROTO):
        """
        Deserialize `data` using msgpack when `proto` is the msgpack
        protocol, otherwise using JSON.
        """
        if proto == cls.MSGPACK_HOOKS_PROTO:
            return msgpack.unpackb(data)
        return json.loads(data)

    def do_POST(self):
        """
        Handle a single hook call and always answer with a serialized result.

        When the owning server carries a ``txn_id`` attribute, the id computed
        from the request extras must match it, otherwise an ``Exception`` is
        raised before any hook is executed. Exceptions raised while running
        the hook are not propagated; they are reported to the client as a
        mapping with ``exception``, ``exception_traceback`` and
        ``exception_args`` keys.
        """
        hooks_proto, method, extras = self._read_request()
        log.debug('Handling HooksHttpHandler %s with %s proto', method, hooks_proto)

        txn_id = getattr(self.server, 'txn_id', None)
        if txn_id:
            log.debug('Computing TXN_ID based on `%s`:`%s`',
                      extras['repository'], extras['txn_id'])
            computed_txn_id = rc_cache.utils.compute_key_from_params(
                extras['repository'], extras['txn_id'])
            if txn_id != computed_txn_id:
                raise Exception(
                    f'TXN ID fail: expected {txn_id} got {computed_txn_id} instead')

        request = getattr(self.server, 'request', None)
        try:
            hooks = Hooks(request=request, log_prefix=f'HOOKS: {self.server.server_address} ')
            result = self._call_hook_method(hooks, method, extras)
        except Exception as e:
            # ship the failure to the caller instead of dropping the connection
            exc_tb = traceback.format_exc()
            result = {
                'exception': e.__class__.__name__,
                'exception_traceback': exc_tb,
                'exception_args': e.args
            }
        self._write_response(hooks_proto, result)

    def _read_request(self):
        """
        Read and decode the request body.

        :return: tuple of ``(hooks_proto, method, extras)``
        """
        length = int(self.headers['Content-Length'])
        # respect sent headers, fallback to OLD proto for compatibility
        hooks_proto = self.headers.get('rc-hooks-protocol') or self.JSON_HOOKS_PROTO

        body = self.rfile.read(length)
        # decode with the protocol the client announced; relying on the
        # default here would decode legacy JSON payloads as msgpack
        data = self.deserialize_data(body, proto=hooks_proto)

        return hooks_proto, data['method'], data['extras']

    def _write_response(self, hooks_proto, result):
        """
        Send `result` back with status 200, encoded in `hooks_proto`.
        """
        if hooks_proto == self.MSGPACK_HOOKS_PROTO:
            content_type = "application/msgpack"
        else:
            content_type = "text/json"

        # serialize before touching the socket, so a serialization error
        # cannot leave a half-written response behind
        data = self.serialize_data(result, proto=hooks_proto)
        if isinstance(data, str):
            # some JSON backends return text, the socket file needs bytes
            data = data.encode('utf-8')

        self.send_response(200)
        self.send_header("Content-type", content_type)
        self.end_headers()
        self.wfile.write(data)

    def _call_hook_method(self, hooks, method, extras):
        """
        Call ``hooks.<method>(extras)`` and return its result; the DB session
        is removed afterwards regardless of the outcome.
        """
        try:
            result = getattr(hooks, method)(extras)
        finally:
            meta.Session.remove()
        return result

    def log_message(self, format, *args):
        """
        This is an overridden method of BaseHTTPRequestHandler which logs using
        a logging library instead of writing directly to stderr.
        """

        message = format % args

        log.debug(
            "HOOKS: client=%s - - [%s] %s", self.client_address,
            self.log_date_time_string(), message)
class ThreadedHookCallbackDaemon:
    """
    Base context manager for hook callback daemons.

    Construction delegates to ``_prepare`` and then selects the start/stop
    pair to use based on the ``use_gevent`` flag. Entering the context starts
    the daemon, leaving it stops the daemon. Every ``_prepare`` / ``_run*`` /
    ``_stop*`` method here raises ``NotImplementedError`` and has to be
    provided by a subclass.
    """

    _callback_thread = None
    _daemon = None
    _done = False
    use_gevent = False

    def __init__(self, txn_id=None, host=None, port=None):
        self._prepare(txn_id=txn_id, host=host, port=port)

        # pick the matching start/stop implementation once, up front
        start, stop = (
            (self._run_gevent, self._stop_gevent)
            if self.use_gevent
            else (self._run, self._stop)
        )
        self._run_func = start
        self._stop_func = stop

    def __enter__(self):
        """Start the daemon and return this instance."""
        log.debug('Running `%s` callback daemon', self.__class__.__name__)
        self._run_func()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Stop the daemon; exceptions from the ``with`` body are not suppressed."""
        log.debug('Exiting `%s` callback daemon', self.__class__.__name__)
        self._stop_func()

    def _prepare(self, txn_id=None, host=None, port=None):
        """Set up the daemon; called from ``__init__``."""
        raise NotImplementedError()

    def _run(self):
        """Start the daemon when ``use_gevent`` is off."""
        raise NotImplementedError()

    def _stop(self):
        """Stop the daemon when ``use_gevent`` is off."""
        raise NotImplementedError()

    def _run_gevent(self):
        """Start the daemon when ``use_gevent`` is on."""
        raise NotImplementedError()

    def _stop_gevent(self):
        """Stop the daemon when ``use_gevent`` is on."""
        raise NotImplementedError()
class HttpHooksCallbackDaemon(ThreadedHookCallbackDaemon):
    """
    Context manager which will run a callback daemon in a background thread.

    A ``TCPServer`` serving :class:`HooksHttpHandler` is created on
    construction; entering the context starts its ``serve_forever`` loop in a
    background thread (or greenlet when ``use_gevent`` is set) and leaving the
    context shuts the loop down and closes the listening socket.
    """

    hooks_uri = None

    # From Python docs: Polling reduces our responsiveness to a shutdown
    # request and wastes cpu at all other times.
    POLL_INTERVAL = 0.01

    use_gevent = False

    # assigned by `_run_gevent`; the class-level default keeps `_stop_gevent`
    # from failing with AttributeError when the greenlet was never started
    _callback_greenlet = None

    def __repr__(self):
        return f'HttpHooksCallbackDaemon(hooks_uri={self.hooks_uri})'

    @property
    def _hook_prefix(self):
        """Prefix used for log messages emitted by this daemon."""
        return f'HOOKS: {self.hooks_uri} '

    def get_hostname(self):
        """Return the local hostname, or ``127.0.0.1`` when it is empty."""
        return socket.gethostname() or '127.0.0.1'

    def get_available_port(self, min_port=20000, max_port=65535):
        """Return a port obtained from `rhodecode.lib.utils2.get_available_port`."""
        from rhodecode.lib.utils2 import get_available_port as _get_port
        return _get_port(min_port, max_port)

    def _prepare(self, txn_id=None, host=None, port=None):
        """
        Create the (not yet serving) TCP server.

        :param txn_id: transaction id injected into the server so that the
            handler can verify incoming calls against it
        :param host: address to bind; empty or ``*`` means the local hostname
        :param port: port to bind; a falsy value picks an available one
        """
        from pyramid.threadlocal import get_current_request

        if not host or host == "*":
            host = self.get_hostname()
        if not port:
            port = self.get_available_port()

        server_address = (host, port)
        self.hooks_uri = f'{host}:{port}'
        self.txn_id = txn_id
        self._done = False

        log.debug(
            "%s Preparing HTTP callback daemon registering hook object: %s",
            self._hook_prefix, HooksHttpHandler)

        self._daemon = TCPServer(server_address, HooksHttpHandler)
        # inject transaction_id for later verification
        self._daemon.txn_id = self.txn_id

        # pass the WEB app request into daemon
        self._daemon.request = get_current_request()

    def _run(self):
        """Start ``serve_forever`` in a daemonized background thread."""
        log.debug("Running thread-based loop of callback daemon in background")
        callback_thread = threading.Thread(
            target=self._daemon.serve_forever,
            kwargs={'poll_interval': self.POLL_INTERVAL})
        callback_thread.daemon = True
        callback_thread.start()
        self._callback_thread = callback_thread

    def _run_gevent(self):
        """Start ``serve_forever`` in a background greenlet."""
        log.debug("Running gevent-based loop of callback daemon in background")
        # create a new greenlet for the daemon's serve_forever method
        callback_greenlet = gevent.spawn(
            self._daemon.serve_forever,
            poll_interval=self.POLL_INTERVAL)

        # store reference to greenlet
        self._callback_greenlet = callback_greenlet

        # switch to this greenlet
        gevent.sleep(0.01)

    def _stop(self):
        """Stop the serving thread, close the socket and clean up TXN data."""
        log.debug("Waiting for background thread to finish.")
        self._daemon.shutdown()
        self._callback_thread.join()
        # shutdown() only ends the serve loop; the listening socket has to be
        # released explicitly, otherwise it stays open until garbage collected
        self._daemon.server_close()
        self._daemon = None
        self._callback_thread = None
        self._cleanup_txn_id()
        log.debug("Background thread done.")

    def _stop_gevent(self):
        """Stop the serving greenlet, close the socket and clean up TXN data."""
        log.debug("Waiting for background greenlet to finish.")

        # if greenlet exists and is running
        if self._callback_greenlet and not self._callback_greenlet.dead:
            # shutdown daemon if it exists; only done while the loop is
            # running, as shutdown() waits for the serve loop to exit
            if self._daemon:
                self._daemon.shutdown()

            # kill the greenlet
            self._callback_greenlet.kill()

        if self._daemon:
            # release the listening socket, shutdown() does not do that
            self._daemon.server_close()

        self._daemon = None
        self._callback_greenlet = None
        self._cleanup_txn_id()
        log.debug("Background greenlet done.")

    def _cleanup_txn_id(self):
        """Remove the TXN ID data file, if this daemon was given a txn_id."""
        if not self.txn_id:
            return

        # TODO: figure out the repo_path...
        repo_path = ''
        txn_id_file = get_txn_id_data_key(repo_path, self.txn_id)
        log.debug('Cleaning up TXN ID %s', txn_id_file)
        if os.path.isfile(txn_id_file):
            try:
                os.remove(txn_id_file)
            except FileNotFoundError:
                # removed by someone else between the check and the removal,
                # which is the state we wanted anyway
                pass