|
|
# Copyright (C) 2010-2023 RhodeCode GmbH
|
|
|
#
|
|
|
# This program is free software: you can redistribute it and/or modify
|
|
|
# it under the terms of the GNU Affero General Public License, version 3
|
|
|
# (only), as published by the Free Software Foundation.
|
|
|
#
|
|
|
# This program is distributed in the hope that it will be useful,
|
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
|
# GNU General Public License for more details.
|
|
|
#
|
|
|
# You should have received a copy of the GNU Affero General Public License
|
|
|
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
|
#
|
|
|
# This program is dual-licensed. If you wish to learn more about the
|
|
|
# RhodeCode Enterprise Edition, including its added features, Support services,
|
|
|
# and proprietary license terms, please see https://rhodecode.com/licenses/
|
|
|
|
|
|
import os
|
|
|
import time
|
|
|
import logging
|
|
|
import tempfile
|
|
|
import traceback
|
|
|
import threading
|
|
|
import socket
|
|
|
import msgpack
|
|
|
import gevent
|
|
|
|
|
|
from http.server import BaseHTTPRequestHandler
|
|
|
from socketserver import TCPServer
|
|
|
|
|
|
import rhodecode
|
|
|
from rhodecode.lib.exceptions import HTTPLockedRC, HTTPBranchProtected
|
|
|
from rhodecode.model import meta
|
|
|
from rhodecode.lib import hooks_base
|
|
|
from rhodecode.lib.utils2 import AttributeDict
|
|
|
from rhodecode.lib.ext_json import json
|
|
|
from rhodecode.lib import rc_cache
|
|
|
|
|
|
# Module-level logger used by the hook handler, callback daemons and helpers in this file.
log = logging.getLogger(__name__)
|
|
|
|
|
|
|
|
|
class HooksHttpHandler(BaseHTTPRequestHandler):
    """
    HTTP handler which receives a serialized hook call, runs it through
    `Hooks` and writes the serialized result back.

    The wire format is negotiated with the `rc-hooks-protocol` request header;
    when the header is missing the JSON protocol is assumed.
    """

    JSON_HOOKS_PROTO = 'json.v1'
    MSGPACK_HOOKS_PROTO = 'msgpack.v1'
    # starting with RhodeCode 5.0.0 MsgPack is the default, prior it used json
    DEFAULT_HOOKS_PROTO = MSGPACK_HOOKS_PROTO

    @classmethod
    def serialize_data(cls, data, proto=DEFAULT_HOOKS_PROTO):
        """
        Serialize `data` using msgpack when `proto` is the msgpack protocol,
        otherwise using JSON.
        """
        if proto == cls.MSGPACK_HOOKS_PROTO:
            return msgpack.packb(data)
        return json.dumps(data)

    @classmethod
    def deserialize_data(cls, data, proto=DEFAULT_HOOKS_PROTO):
        """
        Deserialize `data` using msgpack when `proto` is the msgpack protocol,
        otherwise using JSON.
        """
        if proto == cls.MSGPACK_HOOKS_PROTO:
            return msgpack.unpackb(data)
        return json.loads(data)

    def do_POST(self):
        """
        Handle a single hook call.

        When the server carries a `txn_id`, it is compared with the key
        computed from the request extras; a mismatch raises `Exception`
        before any response is written. Exceptions raised while running the
        hook are not propagated, they are reported inside the response.
        """
        hooks_proto, method, extras = self._read_request()
        log.debug('Handling HooksHttpHandler %s with %s proto', method, hooks_proto)

        txn_id = getattr(self.server, 'txn_id', None)
        if txn_id:
            log.debug('Computing TXN_ID based on `%s`:`%s`',
                      extras['repository'], extras['txn_id'])
            computed_txn_id = rc_cache.utils.compute_key_from_params(
                extras['repository'], extras['txn_id'])
            if txn_id != computed_txn_id:
                raise Exception(
                    'TXN ID fail: expected {} got {} instead'.format(
                        txn_id, computed_txn_id))

        request = getattr(self.server, 'request', None)
        try:
            hooks = Hooks(request=request, log_prefix='HOOKS: {} '.format(self.server.server_address))
            result = self._call_hook_method(hooks, method, extras)

        except Exception as e:
            exc_tb = traceback.format_exc()
            result = {
                'exception': e.__class__.__name__,
                'exception_traceback': exc_tb,
                'exception_args': e.args
            }
        self._write_response(hooks_proto, result)

    def _read_request(self):
        """
        Read and decode the request body.

        :returns: tuple of (hooks protocol, hook method name, extras)
        """
        length = int(self.headers['Content-Length'])
        # respect sent headers, fallback to OLD proto for compatibility
        hooks_proto = self.headers.get('rc-hooks-protocol') or self.JSON_HOOKS_PROTO

        body = self.rfile.read(length)
        # decode with the negotiated protocol; relying on the default would
        # treat JSON payloads as msgpack
        data = self.deserialize_data(body, proto=hooks_proto)

        return hooks_proto, data['method'], data['extras']

    def _write_response(self, hooks_proto, result):
        """
        Send a 200 response carrying `result` encoded with `hooks_proto`.
        """
        if hooks_proto == self.MSGPACK_HOOKS_PROTO:
            content_type = "application/msgpack"
        else:
            content_type = "text/json"

        self.send_response(200)
        self.send_header("Content-type", content_type)
        self.end_headers()
        # encode with the same protocol that is announced in Content-type
        data = self.serialize_data(result, proto=hooks_proto)
        self.wfile.write(data)

    def _call_hook_method(self, hooks, method, extras):
        """
        Call `method` on the `hooks` object with `extras` and return its
        result; the DB session is always removed afterwards.
        """
        try:
            result = getattr(hooks, method)(extras)
        finally:
            meta.Session.remove()
        return result

    def log_message(self, format, *args):
        """
        This is an overridden method of BaseHTTPRequestHandler which logs using
        logging library instead of writing directly to stderr.
        """

        message = format % args

        log.debug(
            "HOOKS: client=%s - - [%s] %s", self.client_address,
            self.log_date_time_string(), message)
|
|
|
|
|
|
|
|
|
class DummyHooksCallbackDaemon(object):
    """
    Callback "daemon" that starts nothing: it only records the module in
    which `Hooks` lives and logs when the context is entered and left.
    """

    hooks_uri = ''

    def __init__(self):
        self.hooks_module = Hooks.__module__

    def __enter__(self):
        daemon_name = type(self).__name__
        log.debug('Running `%s` callback daemon', daemon_name)
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        daemon_name = type(self).__name__
        log.debug('Exiting `%s` callback daemon', daemon_name)
|
|
|
|
|
|
|
|
|
class ThreadedHookCallbackDaemon(object):
    """
    Base context manager for callback daemons.

    Subclasses implement `_prepare` plus the run/stop pair matching the
    selected mode: `_run`/`_stop`, or `_run_gevent`/`_stop_gevent` when
    `use_gevent` is true.
    """

    _callback_thread = None
    _daemon = None
    _done = False
    use_gevent = False

    def __init__(self, txn_id=None, host=None, port=None):
        self._prepare(txn_id=txn_id, host=host, port=port)
        # pick the start/stop pair once; only the chosen pair is looked up
        runner, stopper = (
            (self._run_gevent, self._stop_gevent) if self.use_gevent
            else (self._run, self._stop)
        )
        self._run_func = runner
        self._stop_func = stopper

    def __enter__(self):
        daemon_name = self.__class__.__name__
        log.debug('Running `%s` callback daemon', daemon_name)
        self._run_func()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        daemon_name = self.__class__.__name__
        log.debug('Exiting `%s` callback daemon', daemon_name)
        self._stop_func()

    # -- hooks to be provided by subclasses --

    def _prepare(self, txn_id=None, host=None, port=None):
        raise NotImplementedError()

    def _run(self):
        raise NotImplementedError()

    def _stop(self):
        raise NotImplementedError()

    def _run_gevent(self):
        raise NotImplementedError()

    def _stop_gevent(self):
        raise NotImplementedError()
|
|
|
|
|
|
|
|
|
class HttpHooksCallbackDaemon(ThreadedHookCallbackDaemon):
    """
    Context manager which will run a callback daemon in a background thread.

    A `TCPServer` using `HooksHttpHandler` is served either from a daemon
    thread or, when `use_gevent` is set, from a greenlet. On stop the server
    is shut down, its listening socket is closed and the stored TXN ID file
    (if any) is removed.
    """

    hooks_uri = None

    # From Python docs: Polling reduces our responsiveness to a shutdown
    # request and wastes cpu at all other times.
    POLL_INTERVAL = 0.01

    use_gevent = False

    # assigned by `_run_gevent`; the default keeps `_stop_gevent` safe to call
    # even when no greenlet was ever spawned
    _callback_greenlet = None

    @property
    def _hook_prefix(self):
        """Prefix used for log messages of this daemon."""
        return 'HOOKS: {} '.format(self.hooks_uri)

    def get_hostname(self):
        """Return the local hostname, or `127.0.0.1` when it is empty."""
        return socket.gethostname() or '127.0.0.1'

    def get_available_port(self, min_port=20000, max_port=65535):
        """Return a port obtained from `rhodecode.lib.utils2.get_available_port`."""
        from rhodecode.lib.utils2 import get_available_port as _get_port
        return _get_port(min_port, max_port)

    def _prepare(self, txn_id=None, host=None, port=None):
        """
        Create the TCP server and attach the TXN ID and the current web
        request to it so the handler can read them.

        :param txn_id: transaction id stored on the server for verification
        :param host: bind host; empty or `*` falls back to `get_hostname()`
        :param port: bind port; a falsy value falls back to `get_available_port()`
        """
        from pyramid.threadlocal import get_current_request

        if not host or host == "*":
            host = self.get_hostname()
        if not port:
            port = self.get_available_port()

        server_address = (host, port)
        self.hooks_uri = '{}:{}'.format(host, port)
        self.txn_id = txn_id
        self._done = False

        log.debug(
            "%s Preparing HTTP callback daemon registering hook object: %s",
            self._hook_prefix, HooksHttpHandler)

        self._daemon = TCPServer(server_address, HooksHttpHandler)
        # inject transaction_id for later verification
        self._daemon.txn_id = self.txn_id

        # pass the WEB app request into daemon
        self._daemon.request = get_current_request()

    def _run(self):
        """Serve requests from a background daemon thread."""
        log.debug("Running thread-based loop of callback daemon in background")
        callback_thread = threading.Thread(
            target=self._daemon.serve_forever,
            kwargs={'poll_interval': self.POLL_INTERVAL})
        callback_thread.daemon = True
        callback_thread.start()
        self._callback_thread = callback_thread

    def _run_gevent(self):
        """Serve requests from a greenlet."""
        log.debug("Running gevent-based loop of callback daemon in background")
        # create a new greenlet for the daemon's serve_forever method
        callback_greenlet = gevent.spawn(
            self._daemon.serve_forever,
            poll_interval=self.POLL_INTERVAL)

        # store reference to greenlet
        self._callback_greenlet = callback_greenlet

        # switch to this greenlet
        gevent.sleep(0.01)

    def _cleanup_txn_id(self):
        """Remove the stored TXN ID data file of this daemon, if there is one."""
        if not self.txn_id:
            return
        txn_id_file = get_txn_id_data_path(self.txn_id)
        log.debug('Cleaning up TXN ID %s', txn_id_file)
        if os.path.isfile(txn_id_file):
            os.remove(txn_id_file)

    def _stop(self):
        """Stop the thread-based server and release its resources."""
        log.debug("Waiting for background thread to finish.")
        daemon = self._daemon
        daemon.shutdown()
        self._callback_thread.join()
        # shutdown() only ends serve_forever(); the listening socket has to be
        # closed explicitly, otherwise it lingers until garbage collection
        daemon.server_close()
        self._daemon = None
        self._callback_thread = None

        self._cleanup_txn_id()

        log.debug("Background thread done.")

    def _stop_gevent(self):
        """Stop the greenlet-based server and release its resources."""
        log.debug("Waiting for background greenlet to finish.")

        # if greenlet exists and is running
        if self._callback_greenlet and not self._callback_greenlet.dead:
            # shutdown daemon if it exists
            if self._daemon:
                self._daemon.shutdown()

            # kill the greenlet
            self._callback_greenlet.kill()

        # close the listening socket explicitly; shutdown() does not do it
        if self._daemon:
            self._daemon.server_close()

        self._daemon = None
        self._callback_greenlet = None

        self._cleanup_txn_id()

        log.debug("Background greenlet done.")
|
|
|
|
|
|
|
|
|
def get_txn_id_data_path(txn_id):
    """
    Return the path of the file holding data for the given `txn_id`.

    The file lives in the `svn_txn_id` directory under the configured
    `cache_dir`, or under the system temp dir when `cache_dir` is not set.
    The directory is created when missing.
    """
    import rhodecode

    root = rhodecode.CONFIG.get('cache_dir') or tempfile.gettempdir()
    final_dir = os.path.join(root, 'svn_txn_id')

    # exist_ok avoids a FileExistsError when two callers create the
    # directory at the same time
    os.makedirs(final_dir, exist_ok=True)
    return os.path.join(final_dir, 'rc_txn_id_{}'.format(txn_id))
|
|
|
|
|
|
|
|
|
def store_txn_id_data(txn_id, data_dict):
    """
    Write `data_dict`, JSON-serialized, into the data file of `txn_id`.

    An empty `txn_id` only logs a warning. Failures while writing are logged
    and not raised.
    """
    if txn_id:
        target = get_txn_id_data_path(txn_id)
        try:
            with open(target, 'wb') as handle:
                payload = json.dumps(data_dict)
                handle.write(payload)
        except Exception:
            log.exception('Failed to write txn_id metadata')
    else:
        log.warning('Cannot store txn_id because it is empty')
|
|
|
|
|
|
|
|
|
def get_txn_id_from_store(txn_id):
    """
    Load the data stored for `txn_id`, to be used by the callback manager.

    Returns an empty dict when the file cannot be read or decoded.
    """
    source = get_txn_id_data_path(txn_id)
    try:
        with open(source, 'rb') as handle:
            raw = handle.read()
            stored = json.loads(raw)
    except Exception:
        stored = {}
    return stored
|
|
|
|
|
|
|
|
|
def prepare_callback_daemon(extras, protocol, host, use_direct_calls, txn_id=None):
    """
    Build the callback daemon and fill `extras` with its connection details.

    With `use_direct_calls` a `DummyHooksCallbackDaemon` is used; otherwise
    only the `http` protocol is supported and any other value raises
    `Exception`. The port is taken from the data stored for `txn_id`,
    defaulting to 0.

    :returns: tuple of (callback daemon, extras)
    """
    stored_details = get_txn_id_from_store(txn_id)
    port = stored_details.get('port', 0)

    if use_direct_calls:
        callback_daemon = DummyHooksCallbackDaemon()
        extras['hooks_module'] = callback_daemon.hooks_module
    elif protocol == 'http':
        callback_daemon = HttpHooksCallbackDaemon(
            txn_id=txn_id, host=host, port=port)
    else:
        log.error('Unsupported callback daemon protocol "%s"', protocol)
        raise Exception('Unsupported callback daemon protocol.')

    daemon_uri = callback_daemon.hooks_uri
    extras['hooks_uri'] = daemon_uri
    extras['hooks_protocol'] = protocol
    extras['time'] = time.time()

    # register txn_id
    extras['txn_id'] = txn_id
    log.debug('Prepared a callback daemon: %s at url `%s`',
              callback_daemon.__class__.__name__, callback_daemon.hooks_uri)
    return callback_daemon, extras
|
|
|
|
|
|
|
|
|
class Hooks(object):
    """
    Exposes the hooks for remote call backs
    """

    def __init__(self, request=None, log_prefix=''):
        self.log_prefix = log_prefix
        self.request = request

    def repo_size(self, extras):
        log.debug("%sCalled repo_size of %s object", self.log_prefix, self)
        hook_func = hooks_base.repo_size
        return self._call_hook(hook_func, extras)

    def pre_pull(self, extras):
        log.debug("%sCalled pre_pull of %s object", self.log_prefix, self)
        hook_func = hooks_base.pre_pull
        return self._call_hook(hook_func, extras)

    def post_pull(self, extras):
        log.debug("%sCalled post_pull of %s object", self.log_prefix, self)
        hook_func = hooks_base.post_pull
        return self._call_hook(hook_func, extras)

    def pre_push(self, extras):
        log.debug("%sCalled pre_push of %s object", self.log_prefix, self)
        hook_func = hooks_base.pre_push
        return self._call_hook(hook_func, extras)

    def post_push(self, extras):
        log.debug("%sCalled post_push of %s object", self.log_prefix, self)
        hook_func = hooks_base.post_push
        return self._call_hook(hook_func, extras)

    def _call_hook(self, hook, extras):
        """
        Run `hook` with `extras` and return a dict with `status` and `output`.

        `HTTPBranchProtected` is turned into a regular status/output result.
        Any other exception yields status 128 together with the exception
        name, args and traceback (the traceback is left empty for
        `HTTPLockedRC`). The DB session is removed in every case.
        """
        extras = AttributeDict(extras)
        # the value is not used, but a missing `server_url` must still fail here
        extras['server_url']

        extras.request = self.request

        try:
            result = hook(extras)
            if result is None:
                raise Exception(
                    'Failed to obtain hook result from func: {}'.format(hook))
        except HTTPBranchProtected as handled_error:
            # This special case does not need error reporting, it is an
            # expected outcome (locked repo or protected branch)
            result = AttributeDict({
                'status': handled_error.code,
                'output': handled_error.explanation
            })
        except (HTTPLockedRC, Exception) as error:
            # locked needs different handling since we need to also
            # handle PULL operations
            if isinstance(error, HTTPLockedRC):
                trace = ''
            else:
                trace = traceback.format_exc()
                log.exception('%sException when handling hook %s', self.log_prefix, hook)

            return {
                'status': 128,
                'output': '',
                'exception': type(error).__name__,
                'exception_traceback': trace,
                'exception_args': error.args,
            }
        finally:
            meta.Session.remove()

        log.debug('%sGot hook call response %s', self.log_prefix, result)
        response = {
            'status': result.status,
            'output': result.output,
        }
        return response

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        pass
|
|
|
|