##// END OF EJS Templates
chore(release): merged default into stable branch
chore(release): merged default into stable branch

File last commit:

r5459:7f730862 default
r5558:77c2b736 merge v5.3.0 stable
Show More
http_hooks_deamon.py
287 lines | 9.6 KiB | text/x-python | PythonLexer
feat(ssh-wrapper-speedup): major rewrite of code to address imports problem with ssh-wrapper-v2...
r5325 # Copyright (C) 2010-2023 RhodeCode GmbH
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License, version 3
# (only), as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# This program is dual-licensed. If you wish to learn more about the
# RhodeCode Enterprise Edition, including its added features, Support services,
# and proprietary license terms, please see https://rhodecode.com/licenses/
import os
import logging
import traceback
import threading
import socket
import msgpack
import gevent
from http.server import BaseHTTPRequestHandler
from socketserver import TCPServer
from rhodecode.model import meta
from rhodecode.lib.ext_json import json
from rhodecode.lib import rc_cache
fix(svn): svn events fixes and change the way how we handle the events
r5459 from rhodecode.lib.svn_txn_utils import get_txn_id_data_key
feat(ssh-wrapper-speedup): major rewrite of code to address imports problem with ssh-wrapper-v2...
r5325 from rhodecode.lib.hook_daemon.hook_module import Hooks
log = logging.getLogger(__name__)
class HooksHttpHandler(BaseHTTPRequestHandler):
    """
    HTTP handler that executes a hook method for every POST request.

    The request body is a serialized mapping with two keys: ``method`` (name
    of the attribute looked up on the ``Hooks`` object) and ``extras`` (the
    single argument passed to that method). The wire format is chosen by the
    ``rc-hooks-protocol`` request header; when the header is missing the
    handler falls back to the JSON protocol. The response is always sent
    with status 200 and is encoded with the same protocol as the request.
    """

    JSON_HOOKS_PROTO = 'json.v1'
    MSGPACK_HOOKS_PROTO = 'msgpack.v1'
    # starting with RhodeCode 5.0.0 MsgPack is the default, prior it used json
    DEFAULT_HOOKS_PROTO = MSGPACK_HOOKS_PROTO

    @classmethod
    def serialize_data(cls, data, proto=DEFAULT_HOOKS_PROTO):
        """
        Serialize `data` with msgpack when `proto` is the msgpack protocol,
        otherwise with json.
        """
        if proto == cls.MSGPACK_HOOKS_PROTO:
            return msgpack.packb(data)
        return json.dumps(data)

    @classmethod
    def deserialize_data(cls, data, proto=DEFAULT_HOOKS_PROTO):
        """
        Deserialize `data` with msgpack when `proto` is the msgpack protocol,
        otherwise with json.
        """
        if proto == cls.MSGPACK_HOOKS_PROTO:
            return msgpack.unpackb(data)
        return json.loads(data)

    def do_POST(self):
        """
        Read the hook call from the request, run it and write the result back.

        When the server object carries a truthy ``txn_id`` attribute, the key
        computed from ``extras['repository']`` and ``extras['txn_id']`` must
        match it, otherwise an ``Exception`` is raised before any hook runs.
        Exceptions raised while creating ``Hooks`` or calling the hook method
        are not propagated; they are reported to the client as a mapping with
        ``exception``, ``exception_traceback`` and ``exception_args`` keys.
        """
        hooks_proto, method, extras = self._read_request()
        log.debug('Handling HooksHttpHandler %s with %s proto', method, hooks_proto)

        txn_id = getattr(self.server, 'txn_id', None)
        if txn_id:
            log.debug('Computing TXN_ID based on `%s`:`%s`',
                      extras['repository'], extras['txn_id'])
            computed_txn_id = rc_cache.utils.compute_key_from_params(
                extras['repository'], extras['txn_id'])
            if txn_id != computed_txn_id:
                raise Exception(
                    'TXN ID fail: expected {} got {} instead'.format(
                        txn_id, computed_txn_id))

        request = getattr(self.server, 'request', None)
        try:
            hooks = Hooks(request=request, log_prefix=f'HOOKS: {self.server.server_address} ')
            result = self._call_hook_method(hooks, method, extras)
        except Exception as e:
            # top-level boundary: the failure is shipped to the caller
            # instead of being raised inside the daemon
            exc_tb = traceback.format_exc()
            result = {
                'exception': e.__class__.__name__,
                'exception_traceback': exc_tb,
                'exception_args': e.args
            }
        self._write_response(hooks_proto, result)

    def _read_request(self):
        """
        Read and decode the request body.

        :returns: tuple of ``(hooks_proto, method, extras)``
        """
        length = int(self.headers['Content-Length'])
        # respect sent headers, fallback to OLD proto for compatibility
        hooks_proto = self.headers.get('rc-hooks-protocol') or self.JSON_HOOKS_PROTO

        body = self.rfile.read(length)
        # decode using the protocol the client declared, not the class default;
        # otherwise a json body would be handed to msgpack
        data = self.deserialize_data(body, proto=hooks_proto)
        return hooks_proto, data['method'], data['extras']

    def _write_response(self, hooks_proto, result):
        """
        Send `result` with status 200, encoded with `hooks_proto`.

        Any protocol other than msgpack is answered as ``text/json``.
        """
        if hooks_proto == self.MSGPACK_HOOKS_PROTO:
            content_type = "application/msgpack"
        else:
            content_type = "text/json"

        # encode with the negotiated protocol so the body matches Content-type
        payload = self.serialize_data(result, proto=hooks_proto)
        if isinstance(payload, str):
            # wfile is a binary stream; a json serializer may hand back text
            payload = payload.encode('utf-8')

        self.send_response(200)
        self.send_header("Content-type", content_type)
        self.end_headers()
        self.wfile.write(payload)

    def _call_hook_method(self, hooks, method, extras):
        """
        Call ``hooks.<method>(extras)`` and return its result.

        ``meta.Session.remove()`` is called afterwards regardless of whether
        the hook raised.
        """
        try:
            result = getattr(hooks, method)(extras)
        finally:
            meta.Session.remove()
        return result

    def log_message(self, format, *args):
        """
        This is an overridden method of BaseHTTPRequestHandler which logs using
        a logging library instead of writing directly to stderr.
        """
        message = format % args
        log.debug(
            "HOOKS: client=%s - - [%s] %s", self.client_address,
            self.log_date_time_string(), message)
class ThreadedHookCallbackDaemon:
    """
    Base context manager for callback daemons.

    The constructor calls `_prepare` and then selects the run/stop pair
    based on the `use_gevent` class flag. Entering the context invokes the
    selected run function, leaving it invokes the selected stop function.
    Subclasses must implement `_prepare` and the run/stop pair they use;
    every hook here raises ``NotImplementedError``.
    """

    _callback_thread = None
    _daemon = None
    _done = False
    use_gevent = False

    def __init__(self, txn_id=None, host=None, port=None):
        self._prepare(txn_id=txn_id, host=host, port=port)
        # pick the matching start/stop implementation once, up front
        runner, stopper = (
            (self._run_gevent, self._stop_gevent)
            if self.use_gevent
            else (self._run, self._stop)
        )
        self._run_func = runner
        self._stop_func = stopper

    def __enter__(self):
        log.debug('Running `%s` callback daemon', type(self).__name__)
        self._run_func()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        log.debug('Exiting `%s` callback daemon', type(self).__name__)
        self._stop_func()

    def _prepare(self, txn_id=None, host=None, port=None):
        """Set up the daemon; must be provided by the subclass."""
        raise NotImplementedError()

    def _run(self):
        """Start the daemon (non-gevent variant); must be provided by the subclass."""
        raise NotImplementedError()

    def _stop(self):
        """Stop the daemon (non-gevent variant); must be provided by the subclass."""
        raise NotImplementedError()

    def _run_gevent(self):
        """Start the daemon (gevent variant); must be provided by the subclass."""
        raise NotImplementedError()

    def _stop_gevent(self):
        """Stop the daemon (gevent variant); must be provided by the subclass."""
        raise NotImplementedError()
class HttpHooksCallbackDaemon(ThreadedHookCallbackDaemon):
    """
    Context manager which will run a callback daemon in a background thread.

    A ``TCPServer`` serving ``HooksHttpHandler`` is created in `_prepare` and
    driven by ``serve_forever`` either in a daemon thread (`_run`) or in a
    gevent greenlet (`_run_gevent`). On exit the serve loop is shut down, the
    listening socket is closed and the TXN ID file, if any, is removed.
    """

    hooks_uri = None

    # From Python docs: Polling reduces our responsiveness to a shutdown
    # request and wastes cpu at all other times.
    POLL_INTERVAL = 0.01

    use_gevent = False

    # assigned in `_run_gevent`; declared here so `_stop_gevent` does not fail
    # with AttributeError when no greenlet was ever spawned
    _callback_greenlet = None

    def __repr__(self):
        return f'HttpHooksCallbackDaemon(hooks_uri={self.hooks_uri})'

    @property
    def _hook_prefix(self):
        """Prefix used for log messages emitted by this daemon."""
        return f'HOOKS: {self.hooks_uri} '

    def get_hostname(self):
        """Return the local hostname, or ``127.0.0.1`` when it is empty."""
        return socket.gethostname() or '127.0.0.1'

    def get_available_port(self, min_port=20000, max_port=65535):
        """Delegate to ``rhodecode.lib.utils2.get_available_port`` for the given range."""
        from rhodecode.lib.utils2 import get_available_port as _get_port
        return _get_port(min_port, max_port)

    def _prepare(self, txn_id=None, host=None, port=None):
        """
        Create the TCP server and attach the data the handler reads from it.

        :param txn_id: transaction id exposed to the handler as ``server.txn_id``
        :param host: address to bind; empty or ``"*"`` means use `get_hostname`
        :param port: port to bind; a falsy value means use `get_available_port`
        """
        from pyramid.threadlocal import get_current_request

        if not host or host == "*":
            host = self.get_hostname()
        if not port:
            port = self.get_available_port()

        server_address = (host, port)
        self.hooks_uri = f'{host}:{port}'
        self.txn_id = txn_id
        self._done = False

        log.debug(
            "%s Preparing HTTP callback daemon registering hook object: %s",
            self._hook_prefix, HooksHttpHandler)

        self._daemon = TCPServer(server_address, HooksHttpHandler)
        # inject transaction_id for later verification
        self._daemon.txn_id = self.txn_id

        # pass the WEB app request into daemon
        self._daemon.request = get_current_request()

    def _run(self):
        """Start ``serve_forever`` in a daemon thread."""
        log.debug("Running thread-based loop of callback daemon in background")
        callback_thread = threading.Thread(
            target=self._daemon.serve_forever,
            kwargs={'poll_interval': self.POLL_INTERVAL})
        callback_thread.daemon = True
        callback_thread.start()

        self._callback_thread = callback_thread

    def _run_gevent(self):
        """Start ``serve_forever`` in a gevent greenlet."""
        log.debug("Running gevent-based loop of callback daemon in background")
        # create a new greenlet for the daemon's serve_forever method
        callback_greenlet = gevent.spawn(
            self._daemon.serve_forever,
            poll_interval=self.POLL_INTERVAL)

        # store reference to greenlet
        self._callback_greenlet = callback_greenlet

        # switch to this greenlet
        gevent.sleep(0.01)

    def _close_daemon(self):
        """Close the server's listening socket and drop the reference to it."""
        daemon, self._daemon = self._daemon, None
        if daemon is not None:
            # shutdown() only ends the serve loop; server_close() is what
            # actually releases the socket bound in `_prepare`
            daemon.server_close()

    def _cleanup_txn_id(self):
        """Remove the file associated with ``self.txn_id``, when one exists."""
        if not self.txn_id:
            return

        # TODO: figure out the repo_path...
        repo_path = ''
        txn_id_file = get_txn_id_data_key(repo_path, self.txn_id)
        log.debug('Cleaning up TXN ID %s', txn_id_file)
        if os.path.isfile(txn_id_file):
            os.remove(txn_id_file)

    def _stop(self):
        """Stop the serve loop, wait for the thread, then release resources."""
        log.debug("Waiting for background thread to finish.")
        self._daemon.shutdown()
        self._callback_thread.join()
        self._close_daemon()
        self._callback_thread = None
        self._cleanup_txn_id()
        log.debug("Background thread done.")

    def _stop_gevent(self):
        """Stop the serve loop, kill the greenlet, then release resources."""
        log.debug("Waiting for background greenlet to finish.")

        greenlet = self._callback_greenlet
        # if greenlet exists and is running; the truthiness test is kept
        # on purpose so shutdown() is never called for a loop that is not active
        if greenlet and not greenlet.dead:
            # shutdown daemon if it exists
            if self._daemon:
                self._daemon.shutdown()

            # kill the greenlet
            greenlet.kill()

        self._close_daemon()
        self._callback_greenlet = None
        self._cleanup_txn_id()
        log.debug("Background greenlet done.")