diff --git a/rhodecode/lib/hooks_base.py b/rhodecode/lib/hooks_base.py --- a/rhodecode/lib/hooks_base.py +++ b/rhodecode/lib/hooks_base.py @@ -94,11 +94,9 @@ def _get_scm_size(alias, root_path): def repo_size(extras): """Present size of repository after push.""" repo = Repository.get_by_repo_name(extras.repository) - vcs_part = safe_str('.%s' % repo.repo_type) - size_vcs, size_root, size_total = _get_scm_size(vcs_part, - repo.repo_full_path) - msg = ('Repository `%s` size summary %s:%s repo:%s total:%s\n' - % (repo.repo_name, vcs_part, size_vcs, size_root, size_total)) + vcs_part = f'.{repo.repo_type}' + size_vcs, size_root, size_total = _get_scm_size(vcs_part, repo.repo_full_path) + msg = (f'RhodeCode: `{repo.repo_name}` size summary {vcs_part}:{size_vcs} repo:{size_root} total:{size_total}\n') return HookResponse(0, msg) @@ -126,8 +124,8 @@ def pre_push(extras): hook_response = '' if not is_shadow_repo(extras): + if extras.commit_ids and extras.check_branch_perms: - auth_user = user.AuthUser() repo = Repository.get_by_repo_name(extras.repository) affected_branches = [] @@ -155,11 +153,10 @@ def pre_push(extras): elif branch_perm == 'branch.push' and is_forced is False: continue elif branch_perm == 'branch.push' and is_forced is True: - halt_message = 'Branch `{}` changes rejected by rule {}. ' \ - 'FORCE PUSH FORBIDDEN.'.format(branch_name, rule) + halt_message = f'Branch `{branch_name}` changes rejected by rule {rule}. ' \ + f'FORCE PUSH FORBIDDEN.' else: - halt_message = 'Branch `{}` changes rejected by rule {}.'.format( - branch_name, rule) + halt_message = f'Branch `{branch_name}` changes rejected by rule {rule}.' if halt_message: _http_ret = HTTPBranchProtected(halt_message) @@ -281,7 +278,7 @@ def post_push(extras): # make lock is a tri state False, True, None. We only release lock on False if extras.make_lock is False and not is_shadow_repo(extras): Repository.unlock(Repository.get_by_repo_name(extras.repository)) - msg = 'Released lock on repo `{}`\n'.format(safe_str(extras.repository)) + msg = f'Released lock on repo `{extras.repository}`\n' output += msg if extras.locked_by[0]: @@ -299,12 +296,12 @@ def post_push(extras): safe_str(extras.server_url), safe_str(extras.repository)) for branch_name in extras.new_refs['branches']: - output += 'RhodeCode: open pull request link: {}\n'.format( - tmpl.format(ref_type='branch', ref_name=safe_str(branch_name))) + pr_link = tmpl.format(ref_type='branch', ref_name=safe_str(branch_name)) + output += f'RhodeCode: open pull request link: {pr_link}\n' for book_name in extras.new_refs['bookmarks']: - output += 'RhodeCode: open pull request link: {}\n'.format( - tmpl.format(ref_type='bookmark', ref_name=safe_str(book_name))) + pr_link = tmpl.format(ref_type='bookmark', ref_name=safe_str(book_name)) + output += f'RhodeCode: open pull request link: {pr_link}\n' hook_response = '' if not is_shadow_repo(extras): @@ -319,9 +316,7 @@ def post_push(extras): def _locked_by_explanation(repo_name, user_name, reason): - message = ( - 'Repository `%s` locked by user `%s`. Reason:`%s`' - % (repo_name, user_name, reason)) + message = f'Repository `{repo_name}` locked by user `{user_name}`. Reason:`{reason}`' return message diff --git a/rhodecode/lib/hooks_daemon.py b/rhodecode/lib/hooks_daemon.py --- a/rhodecode/lib/hooks_daemon.py +++ b/rhodecode/lib/hooks_daemon.py @@ -25,6 +25,7 @@ import traceback import threading import socket import msgpack +import gevent from http.server import BaseHTTPRequestHandler from socketserver import TCPServer @@ -43,6 +44,23 @@ log = logging.getLogger(__name__) class HooksHttpHandler(BaseHTTPRequestHandler): + JSON_HOOKS_PROTO = 'json.v1' + MSGPACK_HOOKS_PROTO = 'msgpack.v1' + # starting with RhodeCode 5.0.0 MsgPack is the default, prior it used json + DEFAULT_HOOKS_PROTO = MSGPACK_HOOKS_PROTO + + @classmethod + def serialize_data(cls, data, proto=DEFAULT_HOOKS_PROTO): + if proto == cls.MSGPACK_HOOKS_PROTO: + return msgpack.packb(data) + return json.dumps(data) + + @classmethod + def deserialize_data(cls, data, proto=DEFAULT_HOOKS_PROTO): + if proto == cls.MSGPACK_HOOKS_PROTO: + return msgpack.unpackb(data) + return json.loads(data) + def do_POST(self): hooks_proto, method, extras = self._read_request() log.debug('Handling HooksHttpHandler %s with %s proto', method, hooks_proto) @@ -62,6 +80,7 @@ class HooksHttpHandler(BaseHTTPRequestHa try: hooks = Hooks(request=request, log_prefix='HOOKS: {} '.format(self.server.server_address)) result = self._call_hook_method(hooks, method, extras) + except Exception as e: exc_tb = traceback.format_exc() result = { @@ -73,26 +92,30 @@ class HooksHttpHandler(BaseHTTPRequestHa def _read_request(self): length = int(self.headers['Content-Length']) - hooks_proto = self.headers.get('rc-hooks-protocol') or 'json.v1' - if hooks_proto == 'msgpack.v1': + # respect sent headers, fallback to OLD proto for compatability + hooks_proto = self.headers.get('rc-hooks-protocol') or self.JSON_HOOKS_PROTO + if hooks_proto == self.MSGPACK_HOOKS_PROTO: # support for new vcsserver msgpack based protocol hooks - data = msgpack.unpackb(self.rfile.read(length), raw=False) + body = self.rfile.read(length) + data = self.deserialize_data(body) else: body = self.rfile.read(length) - data = json.loads(body) + data = self.deserialize_data(body) return hooks_proto, data['method'], data['extras'] def _write_response(self, hooks_proto, result): self.send_response(200) - if hooks_proto == 'msgpack.v1': + if hooks_proto == self.MSGPACK_HOOKS_PROTO: self.send_header("Content-type", "application/msgpack") self.end_headers() - self.wfile.write(msgpack.packb(result)) + data = self.serialize_data(result) + self.wfile.write(data) else: self.send_header("Content-type", "text/json") self.end_headers() - self.wfile.write(json.dumps(result)) + data = self.serialize_data(result) + self.wfile.write(data) def _call_hook_method(self, hooks, method, extras): try: @@ -110,7 +133,7 @@ class HooksHttpHandler(BaseHTTPRequestHa message = format % args log.debug( - "HOOKS: %s - - [%s] %s", self.client_address, + "HOOKS: client=%s - - [%s] %s", self.client_address, self.log_date_time_string(), message) @@ -133,18 +156,25 @@ class ThreadedHookCallbackDaemon(object) _callback_thread = None _daemon = None _done = False + use_gevent = False def __init__(self, txn_id=None, host=None, port=None): self._prepare(txn_id=txn_id, host=host, port=port) + if self.use_gevent: + self._run_func = self._run_gevent + self._stop_func = self._stop_gevent + else: + self._run_func = self._run + self._stop_func = self._stop def __enter__(self): log.debug('Running `%s` callback daemon', self.__class__.__name__) - self._run() + self._run_func() return self def __exit__(self, exc_type, exc_val, exc_tb): log.debug('Exiting `%s` callback daemon', self.__class__.__name__) - self._stop() + self._stop_func() def _prepare(self, txn_id=None, host=None, port=None): raise NotImplementedError() @@ -155,6 +185,12 @@ class ThreadedHookCallbackDaemon(object) def _stop(self): raise NotImplementedError() + def _run_gevent(self): + raise NotImplementedError() + + def _stop_gevent(self): + raise NotImplementedError() + class HttpHooksCallbackDaemon(ThreadedHookCallbackDaemon): """ @@ -167,6 +203,8 @@ class HttpHooksCallbackDaemon(ThreadedHo # request and wastes cpu at all other times. POLL_INTERVAL = 0.01 + use_gevent = False + @property def _hook_prefix(self): return 'HOOKS: {} '.format(self.hooks_uri) @@ -203,7 +241,7 @@ class HttpHooksCallbackDaemon(ThreadedHo self._daemon.request = get_current_request() def _run(self): - log.debug("Running event loop of callback daemon in background thread") + log.debug("Running thread-based loop of callback daemon in background") callback_thread = threading.Thread( target=self._daemon.serve_forever, kwargs={'poll_interval': self.POLL_INTERVAL}) @@ -211,6 +249,19 @@ class HttpHooksCallbackDaemon(ThreadedHo callback_thread.start() self._callback_thread = callback_thread + def _run_gevent(self): + log.debug("Running gevent-based loop of callback daemon in background") + # create a new greenlet for the daemon's serve_forever method + callback_greenlet = gevent.spawn( + self._daemon.serve_forever, + poll_interval=self.POLL_INTERVAL) + + # store reference to greenlet + self._callback_greenlet = callback_greenlet + + # switch to this greenlet + gevent.sleep(0.01) + def _stop(self): log.debug("Waiting for background thread to finish.") self._daemon.shutdown() @@ -225,6 +276,29 @@ class HttpHooksCallbackDaemon(ThreadedHo log.debug("Background thread done.") + def _stop_gevent(self): + log.debug("Waiting for background greenlet to finish.") + + # if greenlet exists and is running + if self._callback_greenlet and not self._callback_greenlet.dead: + # shutdown daemon if it exists + if self._daemon: + self._daemon.shutdown() + + # kill the greenlet + self._callback_greenlet.kill() + + self._daemon = None + self._callback_greenlet = None + + if self.txn_id: + txn_id_file = get_txn_id_data_path(self.txn_id) + log.debug('Cleaning up TXN ID %s', txn_id_file) + if os.path.isfile(txn_id_file): + os.remove(txn_id_file) + + log.debug("Background greenlet done.") + def get_txn_id_data_path(txn_id): import rhodecode