##// END OF EJS Templates
Bump version: 5.0.2 → 5.0.3
Bump version: 5.0.2 → 5.0.3

File last commit:

r5298:25044729 default
r5443:6d066abe stable
Show More
hooks_daemon.py
451 lines | 14.9 KiB | text/x-python | PythonLexer
copyrights: updated for 2023
r5088 # Copyright (C) 2010-2023 RhodeCode GmbH
project: added all source files and assets
r1 #
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License, version 3
# (only), as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# This program is dual-licensed. If you wish to learn more about the
# RhodeCode Enterprise Edition, including its added features, Support services,
# and proprietary license terms, please see https://rhodecode.com/licenses/
svn: enable hooks and integration framework execution....
r2677 import os
import time
project: added all source files and assets
r1 import logging
svn: enable hooks and integration framework execution....
r2677 import tempfile
exception-handling: propagate hooks tracebacks to vcsserver for easier debugging.
r1458 import traceback
project: added all source files and assets
r1 import threading
hooks: generate bind port before init of TcpServer for better logging.
r4855 import socket
hooks: support v2 hooks protocol using binary msgpack
r4902 import msgpack
hooks: new hook support for python3
r5081 import gevent
svn: enable hooks and integration framework execution....
r2677
py3: 2to3 fixes
r4931 from http.server import BaseHTTPRequestHandler
from socketserver import TCPServer
project: added all source files and assets
r1
dan
integrations: add integration support...
r411 import rhodecode
branch-permissions: handle vcs operations and branch permissions....
r2979 from rhodecode.lib.exceptions import HTTPLockedRC, HTTPBranchProtected
dan
db: move Session.remove to outer wsgi layer and also add it...
r669 from rhodecode.model import meta
project: added all source files and assets
r1 from rhodecode.lib import hooks_base
pyramid: ported pyramid routing for events
r2016 from rhodecode.lib.utils2 import AttributeDict
feat(celery-hooks): added all needed changes to support new celery backend, removed DummyHooksCallbackDaemon, updated tests. Fixes: RCCE-55
r5298 from rhodecode.lib.pyramid_utils import get_config
svn: enable hooks and integration framework execution....
r2677 from rhodecode.lib.ext_json import json
caches: fix import of compute_key_from_params function.
r2970 from rhodecode.lib import rc_cache
project: added all source files and assets
r1
log = logging.getLogger(__name__)
class HooksHttpHandler(BaseHTTPRequestHandler):
    """
    HTTP request handler which executes a hook named in the POSTed payload
    and writes the hook result (or a description of the raised exception)
    back to the caller.

    The wire format is selected per request via the ``rc-hooks-protocol``
    header; requests without that header are treated as ``json.v1``.
    """

    JSON_HOOKS_PROTO = 'json.v1'
    MSGPACK_HOOKS_PROTO = 'msgpack.v1'
    # starting with RhodeCode 5.0.0 MsgPack is the default, prior it used json
    DEFAULT_HOOKS_PROTO = MSGPACK_HOOKS_PROTO

    @classmethod
    def serialize_data(cls, data, proto=DEFAULT_HOOKS_PROTO):
        """
        Encode ``data`` using msgpack when ``proto`` is the msgpack protocol,
        otherwise using json.
        """
        if proto == cls.MSGPACK_HOOKS_PROTO:
            return msgpack.packb(data)
        return json.dumps(data)

    @classmethod
    def deserialize_data(cls, data, proto=DEFAULT_HOOKS_PROTO):
        """
        Decode ``data`` using msgpack when ``proto`` is the msgpack protocol,
        otherwise using json.
        """
        if proto == cls.MSGPACK_HOOKS_PROTO:
            return msgpack.unpackb(data)
        return json.loads(data)

    def do_POST(self):
        """
        Read the hook call from the request, run it and send the response.

        When the server object carries a ``txn_id`` it is compared with a key
        computed from the ``repository`` and ``txn_id`` values of the request;
        a mismatch raises ``Exception`` before the hook is executed, so no
        response payload is written in that case.
        """
        hooks_proto, method, extras = self._read_request()
        log.debug('Handling HooksHttpHandler %s with %s proto', method, hooks_proto)

        txn_id = getattr(self.server, 'txn_id', None)
        if txn_id:
            log.debug('Computing TXN_ID based on `%s`:`%s`',
                      extras['repository'], extras['txn_id'])
            computed_txn_id = rc_cache.utils.compute_key_from_params(
                extras['repository'], extras['txn_id'])
            if txn_id != computed_txn_id:
                raise Exception(
                    'TXN ID fail: expected {} got {} instead'.format(
                        txn_id, computed_txn_id))

        request = getattr(self.server, 'request', None)
        try:
            hooks = Hooks(request=request, log_prefix='HOOKS: {} '.format(self.server.server_address))
            result = self._call_hook_method(hooks, method, extras)
        except Exception as e:
            # report the failure to the caller instead of dropping the
            # connection; the traceback is included to ease remote debugging
            exc_tb = traceback.format_exc()
            result = {
                'exception': e.__class__.__name__,
                'exception_traceback': exc_tb,
                'exception_args': e.args
            }
        self._write_response(hooks_proto, result)

    def _read_request(self):
        """
        Read and decode the request body.

        :returns: tuple of ``(hooks_proto, method, extras)`` where ``method``
            and ``extras`` come from the decoded payload.
        """
        length = int(self.headers['Content-Length'])
        # respect sent headers, fallback to OLD proto for compatibility
        hooks_proto = self.headers.get('rc-hooks-protocol') or self.JSON_HOOKS_PROTO

        body = self.rfile.read(length)
        # decode with the protocol the client announced; relying on the
        # default here would decode json payloads as msgpack
        data = self.deserialize_data(body, proto=hooks_proto)

        return hooks_proto, data['method'], data['extras']

    def _write_response(self, hooks_proto, result):
        """
        Send a ``200`` response carrying ``result`` encoded with ``hooks_proto``.
        """
        if hooks_proto == self.MSGPACK_HOOKS_PROTO:
            content_type = "application/msgpack"
        else:
            content_type = "text/json"

        self.send_response(200)
        self.send_header("Content-type", content_type)
        self.end_headers()
        # encode with the same protocol that was used to read the request
        data = self.serialize_data(result, proto=hooks_proto)
        self.wfile.write(data)

    def _call_hook_method(self, hooks, method, extras):
        """
        Call ``method`` on ``hooks`` with ``extras`` and return its result.
        The DB session is removed afterwards, whether or not the call failed.
        """
        try:
            result = getattr(hooks, method)(extras)
        finally:
            meta.Session.remove()
        return result

    def log_message(self, format, *args):
        """
        This is an overridden method of BaseHTTPRequestHandler which logs using
        logging library instead of writing directly to stderr.
        """

        message = format % args

        log.debug(
            "HOOKS: client=%s - - [%s] %s", self.client_address,
            self.log_date_time_string(), message)
feat(celery-hooks): added all needed changes to support new celery backend, removed DummyHooksCallbackDaemon, updated tests. Fixes: RCCE-55
class BaseHooksCallbackDaemon:
    """
    Minimal callback daemon context manager.

    Entering and leaving the context only emits debug log lines; no
    background worker is started by this class.
    """

    def __init__(self):
        # name of the module that defines the `Hooks` class
        self.hooks_module = Hooks.__module__

    def __enter__(self):
        daemon_name = self.__class__.__name__
        log.debug('Running `%s` callback daemon', daemon_name)
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        daemon_name = self.__class__.__name__
        log.debug('Exiting `%s` callback daemon', daemon_name)
project: added all source files and assets
r1
feat(celery-hooks): added all needed changes to support new celery backend, removed DummyHooksCallbackDaemon, updated tests. Fixes: RCCE-55
class CeleryHooksCallbackDaemon(BaseHooksCallbackDaemon):
    """
    Context manager for achieving compatibility with the celery backend.

    :param config: configuration object; it is queried with
        ``config.get('app:main', <option>)`` for the celery broker url and
        result backend.
        NOTE(review): looks like a configparser-style object - confirm
        against `get_config`.
    """

    def __init__(self, config):
        # initialise the base class so that `hooks_module` is available on
        # celery daemons as well
        super().__init__()
        self.task_queue = config.get('app:main', 'celery.broker_url')
        self.task_backend = config.get('app:main', 'celery.result_backend')
project: added all source files and assets
class ThreadedHookCallbackDaemon:
    """
    Abstract context manager for a callback daemon running in the background.

    Subclasses implement ``_prepare`` plus the ``_run``/``_stop`` pair, or the
    ``_run_gevent``/``_stop_gevent`` pair when ``use_gevent`` is true.
    """

    _callback_thread = None
    _daemon = None
    _done = False
    use_gevent = False

    def __init__(self, txn_id=None, host=None, port=None):
        self._prepare(txn_id=txn_id, host=host, port=port)
        # choose the start/stop implementation once, at construction time
        starter, stopper = (
            (self._run_gevent, self._stop_gevent)
            if self.use_gevent
            else (self._run, self._stop)
        )
        self._run_func = starter
        self._stop_func = stopper

    def __enter__(self):
        daemon_name = self.__class__.__name__
        log.debug('Running `%s` callback daemon', daemon_name)
        self._run_func()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        daemon_name = self.__class__.__name__
        log.debug('Exiting `%s` callback daemon', daemon_name)
        self._stop_func()

    def _prepare(self, txn_id=None, host=None, port=None):
        """Set up the daemon; must be provided by subclasses."""
        raise NotImplementedError()

    def _run(self):
        """Start the daemon; must be provided by subclasses."""
        raise NotImplementedError()

    def _stop(self):
        """Stop the daemon; must be provided by subclasses."""
        raise NotImplementedError()

    def _run_gevent(self):
        """Start the daemon in gevent mode; must be provided by subclasses."""
        raise NotImplementedError()

    def _stop_gevent(self):
        """Stop the daemon in gevent mode; must be provided by subclasses."""
        raise NotImplementedError()
project: added all source files and assets
r1
class HttpHooksCallbackDaemon(ThreadedHookCallbackDaemon):
    """
    Context manager which will run a callback daemon in a background thread.

    The daemon is a ``TCPServer`` serving ``HooksHttpHandler`` requests; its
    ``host:port`` address is exposed as ``hooks_uri``.
    """

    hooks_uri = None

    # From Python docs: Polling reduces our responsiveness to a shutdown
    # request and wastes cpu at all other times.
    POLL_INTERVAL = 0.01

    use_gevent = False

    # greenlet running `serve_forever`; only set by `_run_gevent`
    _callback_greenlet = None

    @property
    def _hook_prefix(self):
        """Prefix used for log lines emitted by this daemon."""
        return 'HOOKS: {} '.format(self.hooks_uri)

    def get_hostname(self):
        """Return the local hostname, or ``127.0.0.1`` when it is empty."""
        return socket.gethostname() or '127.0.0.1'

    def get_available_port(self, min_port=20000, max_port=65535):
        """Return a port picked by ``rhodecode.lib.utils2.get_available_port``."""
        from rhodecode.lib.utils2 import get_available_port as _get_port
        return _get_port(min_port, max_port)

    def _prepare(self, txn_id=None, host=None, port=None):
        """
        Create the ``TCPServer`` and attach the values the request handler
        reads from it (``txn_id`` and the current web request).

        :param txn_id: transaction id later verified by the handler.
        :param host: address to bind; empty or ``*`` selects `get_hostname`.
        :param port: port to bind; a falsy value selects `get_available_port`.
        """
        from pyramid.threadlocal import get_current_request

        if not host or host == "*":
            host = self.get_hostname()
        if not port:
            port = self.get_available_port()

        server_address = (host, port)
        self.hooks_uri = '{}:{}'.format(host, port)
        self.txn_id = txn_id
        self._done = False

        log.debug(
            "%s Preparing HTTP callback daemon registering hook object: %s",
            self._hook_prefix, HooksHttpHandler)

        self._daemon = TCPServer(server_address, HooksHttpHandler)
        # inject transaction_id for later verification
        self._daemon.txn_id = self.txn_id

        # pass the WEB app request into daemon
        self._daemon.request = get_current_request()

    def _run(self):
        """Serve requests from a daemon thread."""
        log.debug("Running thread-based loop of callback daemon in background")
        callback_thread = threading.Thread(
            target=self._daemon.serve_forever,
            kwargs={'poll_interval': self.POLL_INTERVAL})
        callback_thread.daemon = True
        callback_thread.start()
        self._callback_thread = callback_thread

    def _run_gevent(self):
        """Serve requests from a gevent greenlet."""
        log.debug("Running gevent-based loop of callback daemon in background")
        # create a new greenlet for the daemon's serve_forever method
        callback_greenlet = gevent.spawn(
            self._daemon.serve_forever,
            poll_interval=self.POLL_INTERVAL)

        # store reference to greenlet
        self._callback_greenlet = callback_greenlet

        # switch to this greenlet
        gevent.sleep(0.01)

    def _release_daemon(self):
        """Close the listening socket of the daemon and drop the reference."""
        if self._daemon is not None:
            # shutdown() only stops the serve_forever loop, the socket has to
            # be closed explicitly
            self._daemon.server_close()
        self._daemon = None

    def _cleanup_txn_id_file(self):
        """Remove the file stored for ``self.txn_id``, if there is one."""
        if not self.txn_id:
            return
        txn_id_file = get_txn_id_data_path(self.txn_id)
        log.debug('Cleaning up TXN ID %s', txn_id_file)
        if os.path.isfile(txn_id_file):
            os.remove(txn_id_file)

    def _stop(self):
        """Stop the serving thread, close the server and clean up txn data."""
        log.debug("Waiting for background thread to finish.")
        self._daemon.shutdown()
        self._callback_thread.join()
        self._release_daemon()
        self._callback_thread = None
        self._cleanup_txn_id_file()
        log.debug("Background thread done.")

    def _stop_gevent(self):
        """Stop the serving greenlet, close the server and clean up txn data."""
        log.debug("Waiting for background greenlet to finish.")

        # if greenlet exists and is running
        if self._callback_greenlet and not self._callback_greenlet.dead:
            # shutdown daemon if it exists
            if self._daemon:
                self._daemon.shutdown()

            # kill the greenlet
            self._callback_greenlet.kill()

        self._release_daemon()
        self._callback_greenlet = None
        self._cleanup_txn_id_file()
        log.debug("Background greenlet done.")
project: added all source files and assets
r1
svn: enable hooks and integration framework execution....
def get_txn_id_data_path(txn_id):
    """
    Return the path of the file used to store data for ``txn_id``.

    The file lives in an ``svn_txn_id`` directory below the configured
    ``cache_dir`` (or the system temp dir when that is not set); the
    directory is created when missing.
    """
    import rhodecode

    root = rhodecode.CONFIG.get('cache_dir') or tempfile.gettempdir()
    final_dir = os.path.join(root, 'svn_txn_id')

    # exist_ok avoids failing when another process creates the directory
    # between a separate existence check and the makedirs call
    os.makedirs(final_dir, exist_ok=True)
    return os.path.join(final_dir, 'rc_txn_id_{}'.format(txn_id))
svn: enable hooks and integration framework execution....
r2677
def store_txn_id_data(txn_id, data_dict):
    """
    Persist ``data_dict`` as json in the file belonging to ``txn_id``.

    An empty ``txn_id`` only logs a warning. Errors while writing are logged
    and not propagated.
    """
    if txn_id:
        target = get_txn_id_data_path(txn_id)
        try:
            with open(target, 'wb') as handle:
                handle.write(json.dumps(data_dict))
        except Exception:
            log.exception('Failed to write txn_id metadata')
    else:
        log.warning('Cannot store txn_id because it is empty')
project: added all source files and assets
r1
svn: enable hooks and integration framework execution....
r2677
def get_txn_id_from_store(txn_id):
    """
    Load the data stored for ``txn_id`` for use by the callback manager.

    Returns an empty dict when the file cannot be read or decoded.
    """
    stored = {}
    store_file = get_txn_id_data_path(txn_id)
    try:
        with open(store_file, 'rb') as handle:
            stored = json.loads(handle.read())
    except Exception:
        # best effort: any failure leaves the empty default in place
        pass
    return stored
feat(celery-hooks): added all needed changes to support new celery backend, removed DummyHooksCallbackDaemon, updated tests. Fixes: RCCE-55
def prepare_callback_daemon(extras, protocol, host, txn_id=None):
    """
    Build the callback daemon for ``protocol`` and fill ``extras`` with the
    values describing it.

    :param extras: dict updated in place; for ``celery`` its ``config`` key
        is passed to `get_config`.
    :param protocol: one of ``http``, ``celery`` or ``local``.
    :param host: host handed to the http daemon.
    :param txn_id: optional transaction id; a stored ``port`` for it is
        reused when available.
    :returns: tuple ``(callback_daemon, extras)``
    :raises Exception: when ``protocol`` is not supported.
    """
    stored_details = get_txn_id_from_store(txn_id)
    port = stored_details.get('port', 0)

    if protocol == 'http':
        callback_daemon = HttpHooksCallbackDaemon(
            txn_id=txn_id, host=host, port=port)
    elif protocol == 'celery':
        callback_daemon = CeleryHooksCallbackDaemon(get_config(extras['config']))
    elif protocol == 'local':
        callback_daemon = BaseHooksCallbackDaemon()
    else:
        log.error('Unsupported callback daemon protocol "%s"', protocol)
        raise Exception('Unsupported callback daemon protocol.')

    # attributes a given daemon type does not provide are reported as ''
    for key in ('hooks_uri', 'task_queue', 'task_backend'):
        extras[key] = getattr(callback_daemon, key, '')
    extras['hooks_protocol'] = protocol
    extras['time'] = time.time()

    # register txn_id
    extras['txn_id'] = txn_id
    log.debug('Prepared a callback daemon: %s',
              callback_daemon.__class__.__name__)
    return callback_daemon, extras
class Hooks:
    """
    Entry points for hooks that are invoked through remote call backs.

    Every public hook method takes the ``extras`` mapping of the call and
    returns a plain dict describing the outcome.
    """

    def __init__(self, request=None, log_prefix=''):
        self.log_prefix = log_prefix
        self.request = request

    def repo_size(self, extras):
        """Run ``hooks_base.repo_size``."""
        log.debug("%sCalled repo_size of %s object", self.log_prefix, self)
        return self._call_hook(hooks_base.repo_size, extras)

    def pre_pull(self, extras):
        """Run ``hooks_base.pre_pull``."""
        log.debug("%sCalled pre_pull of %s object", self.log_prefix, self)
        return self._call_hook(hooks_base.pre_pull, extras)

    def post_pull(self, extras):
        """Run ``hooks_base.post_pull``."""
        log.debug("%sCalled post_pull of %s object", self.log_prefix, self)
        return self._call_hook(hooks_base.post_pull, extras)

    def pre_push(self, extras):
        """Run ``hooks_base.pre_push``."""
        log.debug("%sCalled pre_push of %s object", self.log_prefix, self)
        return self._call_hook(hooks_base.pre_push, extras)

    def post_push(self, extras):
        """Run ``hooks_base.post_push``."""
        log.debug("%sCalled post_push of %s object", self.log_prefix, self)
        return self._call_hook(hooks_base.post_push, extras)

    def _call_hook(self, hook, extras):
        """
        Call ``hook`` with ``extras`` and translate the outcome into a dict.

        On success, or when the hook raises ``HTTPBranchProtected``, the dict
        holds ``status`` and ``output``. Any other exception yields status
        ``128`` together with the exception name, args and traceback (the
        traceback is left empty for ``HTTPLockedRC``). The DB session is
        removed in every case.
        """
        extras = AttributeDict(extras)
        # looked up before the hook runs; the value itself is unused below
        server_url = extras['server_url']

        extras.request = self.request

        try:
            result = hook(extras)
            if result is None:
                raise Exception(
                    'Failed to obtain hook result from func: {}'.format(hook))
        except HTTPBranchProtected as branch_error:
            # Those special cases doesn't need error reporting. It's a case of
            # locked repo or protected branch
            result = AttributeDict({
                'status': branch_error.code,
                'output': branch_error.explanation
            })
        except (HTTPLockedRC, Exception) as error:
            # locked needs different handling since we need to also
            # handle PULL operations
            is_locked = isinstance(error, HTTPLockedRC)
            exc_tb = ''
            if not is_locked:
                exc_tb = traceback.format_exc()
                log.exception('%sException when handling hook %s', self.log_prefix, hook)
            failure = {
                'status': 128,
                'output': '',
                'exception': type(error).__name__,
                'exception_traceback': exc_tb,
                'exception_args': error.args,
            }
            return failure
        finally:
            meta.Session.remove()

        log.debug('%sGot hook call response %s', self.log_prefix, result)
        response = {
            'status': result.status,
            'output': result.output,
        }
        return response

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        pass