# Copyright (C) 2010-2023 RhodeCode GmbH
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License, version 3
# (only), as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
#
# This program is dual-licensed. If you wish to learn more about the
# RhodeCode Enterprise Edition, including its added features, Support services,
# and proprietary license terms, please see https://rhodecode.com/licenses/

import os
import logging
import traceback
import threading
import socket
import msgpack
import gevent

from http.server import BaseHTTPRequestHandler
from socketserver import TCPServer

from rhodecode.model import meta
from rhodecode.lib.ext_json import json
from rhodecode.lib import rc_cache
from rhodecode.lib.hook_daemon.base import get_txn_id_data_path
from rhodecode.lib.hook_daemon.hook_module import Hooks

log = logging.getLogger(__name__)


class HooksHttpHandler(BaseHTTPRequestHandler):
    """
    HTTP request handler that executes hook calls.

    Each POST body is a serialized mapping with ``method`` and ``extras``
    keys. ``method`` is looked up on a :class:`Hooks` instance and called
    with ``extras``. The result is serialized back with the same protocol
    the request announced. If the hook call raises, a description of the
    exception is returned instead.
    """

    JSON_HOOKS_PROTO = 'json.v1'
    MSGPACK_HOOKS_PROTO = 'msgpack.v1'
    # starting with RhodeCode 5.0.0 MsgPack is the default, prior it used json
    DEFAULT_HOOKS_PROTO = MSGPACK_HOOKS_PROTO

    @classmethod
    def serialize_data(cls, data, proto=DEFAULT_HOOKS_PROTO):
        """
        Serialize ``data`` for the wire.

        :param data: object to serialize.
        :param proto: ``MSGPACK_HOOKS_PROTO`` selects msgpack; any other
            value selects JSON.
        """
        if proto == cls.MSGPACK_HOOKS_PROTO:
            return msgpack.packb(data)
        return json.dumps(data)

    @classmethod
    def deserialize_data(cls, data, proto=DEFAULT_HOOKS_PROTO):
        """
        Inverse of :meth:`serialize_data`.

        :param data: raw request body.
        :param proto: ``MSGPACK_HOOKS_PROTO`` selects msgpack; any other
            value selects JSON.
        """
        if proto == cls.MSGPACK_HOOKS_PROTO:
            return msgpack.unpackb(data)
        return json.loads(data)

    def do_POST(self):
        """
        Handle a single hook call.

        When the server carries a ``txn_id``, it is compared against a key
        computed from ``extras['repository']`` and ``extras['txn_id']``. On
        mismatch an ``Exception`` is raised out of this method, so no
        response body is written. Otherwise the hook is executed and its
        result (or exception description) is written back.
        """
        hooks_proto, method, extras = self._read_request()
        log.debug('Handling HooksHttpHandler %s with %s proto', method, hooks_proto)

        txn_id = getattr(self.server, 'txn_id', None)
        if txn_id:
            log.debug('Computing TXN_ID based on `%s`:`%s`',
                      extras['repository'], extras['txn_id'])
            computed_txn_id = rc_cache.utils.compute_key_from_params(
                extras['repository'], extras['txn_id'])
            if txn_id != computed_txn_id:
                raise Exception(
                    'TXN ID fail: expected {} got {} instead'.format(
                        txn_id, computed_txn_id))

        request = getattr(self.server, 'request', None)
        try:
            hooks = Hooks(request=request, log_prefix='HOOKS: {} '.format(self.server.server_address))
            result = self._call_hook_method(hooks, method, extras)
        except Exception as e:
            # report the failure to the caller instead of dropping the connection
            exc_tb = traceback.format_exc()
            result = {
                'exception': e.__class__.__name__,
                'exception_traceback': exc_tb,
                'exception_args': e.args
            }
        self._write_response(hooks_proto, result)

    def _read_request(self):
        """
        Read and decode the request body.

        :returns: tuple ``(hooks_proto, method, extras)``.
        """
        length = int(self.headers['Content-Length'])
        # respect sent headers, fallback to OLD proto for compatibility
        hooks_proto = self.headers.get('rc-hooks-protocol') or self.JSON_HOOKS_PROTO
        body = self.rfile.read(length)
        # Decode with the protocol the client announced. Previously both the
        # msgpack and the json branch decoded with the default (msgpack)
        # protocol, which broke clients using the json fallback.
        data = self.deserialize_data(body, proto=hooks_proto)
        return hooks_proto, data['method'], data['extras']

    def _write_response(self, hooks_proto, result):
        """
        Serialize ``result`` with ``hooks_proto`` and send it as a 200 reply.

        The payload is serialized before anything is sent, so a
        serialization failure does not leave a half-written response.
        """
        if hooks_proto == self.MSGPACK_HOOKS_PROTO:
            content_type = "application/msgpack"
        else:
            content_type = "text/json"

        data = self.serialize_data(result, proto=hooks_proto)
        if isinstance(data, str):
            # the JSON serializer may return text; the socket needs bytes
            data = data.encode('utf-8')

        self.send_response(200)
        self.send_header("Content-type", content_type)
        self.send_header("Content-Length", str(len(data)))
        self.end_headers()
        self.wfile.write(data)

    def _call_hook_method(self, hooks, method, extras):
        """
        Call ``hooks.<method>(extras)`` and return its result.

        ``meta.Session.remove()`` runs afterwards, whether or not the call
        raised.
        """
        try:
            result = getattr(hooks, method)(extras)
        finally:
            meta.Session.remove()
        return result

    def log_message(self, format, *args):
        """
        This is an overridden method of BaseHTTPRequestHandler which logs using
        a logging library instead of writing directly to stderr.
        """

        message = format % args

        log.debug(
            "HOOKS: client=%s - - [%s] %s", self.client_address,
            self.log_date_time_string(), message)


class ThreadedHookCallbackDaemon(object):
    """
    Base context manager for a hook callback daemon.

    Entering the context starts the daemon and exiting stops it. The
    ``use_gevent`` class flag chooses between the ``_run``/``_stop`` and the
    ``_run_gevent``/``_stop_gevent`` implementations, which subclasses
    provide together with ``_prepare``.
    """

    _callback_thread = None
    _daemon = None
    _done = False
    use_gevent = False

    def __init__(self, txn_id=None, host=None, port=None):
        self._prepare(txn_id=txn_id, host=host, port=port)
        if self.use_gevent:
            self._run_func = self._run_gevent
            self._stop_func = self._stop_gevent
        else:
            self._run_func = self._run
            self._stop_func = self._stop

    def __enter__(self):
        log.debug('Running `%s` callback daemon', self.__class__.__name__)
        self._run_func()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        log.debug('Exiting `%s` callback daemon', self.__class__.__name__)
        self._stop_func()

    def _prepare(self, txn_id=None, host=None, port=None):
        raise NotImplementedError()

    def _run(self):
        raise NotImplementedError()

    def _stop(self):
        raise NotImplementedError()

    def _run_gevent(self):
        raise NotImplementedError()

    def _stop_gevent(self):
        raise NotImplementedError()


class HttpHooksCallbackDaemon(ThreadedHookCallbackDaemon):
    """
    Context manager which will run a callback daemon in a background thread.
    """

    hooks_uri = None
    # set by _run_gevent; defaulted so _stop_gevent never hits AttributeError
    _callback_greenlet = None

    # From Python docs: Polling reduces our responsiveness to a shutdown
    # request and wastes cpu at all other times.
    POLL_INTERVAL = 0.01

    use_gevent = False

    @property
    def _hook_prefix(self):
        return 'HOOKS: {} '.format(self.hooks_uri)

    def get_hostname(self):
        """Return this machine's hostname, or ``'127.0.0.1'`` if it is empty."""
        return socket.gethostname() or '127.0.0.1'

    def get_available_port(self, min_port=20000, max_port=65535):
        from rhodecode.lib.utils2 import get_available_port as _get_port
        return _get_port(min_port, max_port)

    def _prepare(self, txn_id=None, host=None, port=None):
        """
        Create (and bind) the ``TCPServer`` that will serve hook calls.

        A missing or ``"*"`` host resolves to :meth:`get_hostname`; a missing
        port is picked by :meth:`get_available_port`.
        """
        from pyramid.threadlocal import get_current_request

        if not host or host == "*":
            host = self.get_hostname()
        if not port:
            port = self.get_available_port()

        server_address = (host, port)
        self.hooks_uri = '{}:{}'.format(host, port)
        self.txn_id = txn_id
        self._done = False

        log.debug(
            "%s Preparing HTTP callback daemon registering hook object: %s",
            self._hook_prefix, HooksHttpHandler)

        self._daemon = TCPServer(server_address, HooksHttpHandler)
        # inject transaction_id for later verification
        self._daemon.txn_id = self.txn_id

        # pass the WEB app request into daemon
        self._daemon.request = get_current_request()

    def _run(self):
        """Run ``serve_forever`` in a daemon thread."""
        log.debug("Running thread-based loop of callback daemon in background")
        callback_thread = threading.Thread(
            target=self._daemon.serve_forever,
            kwargs={'poll_interval': self.POLL_INTERVAL})
        callback_thread.daemon = True
        callback_thread.start()
        self._callback_thread = callback_thread

    def _run_gevent(self):
        """Run ``serve_forever`` in a greenlet."""
        log.debug("Running gevent-based loop of callback daemon in background")
        # create a new greenlet for the daemon's serve_forever method
        callback_greenlet = gevent.spawn(
            self._daemon.serve_forever,
            poll_interval=self.POLL_INTERVAL)

        # store reference to greenlet
        self._callback_greenlet = callback_greenlet

        # switch to this greenlet
        gevent.sleep(0.01)

    def _cleanup_txn_id(self):
        """Remove the TXN ID data file for ``self.txn_id``, if it exists."""
        if self.txn_id:
            txn_id_file = get_txn_id_data_path(self.txn_id)
            log.debug('Cleaning up TXN ID %s', txn_id_file)
            if os.path.isfile(txn_id_file):
                os.remove(txn_id_file)

    def _stop(self):
        """Stop the serving thread, close the socket and clean up TXN data."""
        log.debug("Waiting for background thread to finish.")
        self._daemon.shutdown()
        self._callback_thread.join()
        # shutdown() only stops serve_forever(); release the listening socket too
        self._daemon.server_close()
        self._daemon = None
        self._callback_thread = None
        self._cleanup_txn_id()
        log.debug("Background thread done.")

    def _stop_gevent(self):
        """Stop the serving greenlet, close the socket and clean up TXN data."""
        log.debug("Waiting for background greenlet to finish.")

        greenlet = self._callback_greenlet
        # if greenlet exists and is running
        if greenlet and not greenlet.dead:
            # shutdown daemon if it exists
            if self._daemon:
                self._daemon.shutdown()
            # kill the greenlet
            greenlet.kill()

        # shutdown() only stops serve_forever(); release the listening socket too
        if self._daemon:
            self._daemon.server_close()

        self._daemon = None
        self._callback_greenlet = None
        self._cleanup_txn_id()
        log.debug("Background greenlet done.")