##// END OF EJS Templates
pull-requests: increase stability of concurrent pull requests creation by flushing prematurly the statuses of commits....
pull-requests: increase stability of concurrent pull requests creation by flushing prematurly the statuses of commits. This is required to increase the versions on each concurrent call. Otherwise we could get into an integrity errors of commitsha+version+repo

File last commit:

r3363:f08e98b1 default
r3368:a4f559a8 default
Show More
hooks_daemon.py
332 lines | 10.6 KiB | text/x-python | PythonLexer
project: added all source files and assets
r1 # -*- coding: utf-8 -*-
docs: updated copyrights to 2019
r3363 # Copyright (C) 2010-2019 RhodeCode GmbH
project: added all source files and assets
r1 #
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License, version 3
# (only), as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# This program is dual-licensed. If you wish to learn more about the
# RhodeCode Enterprise Edition, including its added features, Support services,
# and proprietary license terms, please see https://rhodecode.com/licenses/
svn: enable hooks and integration framework execution....
r2677 import os
import time
project: added all source files and assets
r1 import logging
svn: enable hooks and integration framework execution....
r2677 import tempfile
exception-handling: propagate hooks tracebacks to vcsserver for easier debugging.
r1458 import traceback
project: added all source files and assets
r1 import threading
svn: enable hooks and integration framework execution....
r2677
project: added all source files and assets
r1 from BaseHTTPServer import BaseHTTPRequestHandler
from SocketServer import TCPServer
dan
integrations: add integration support...
r411 import rhodecode
branch-permissions: handle vcs operations and branch permissions....
r2979 from rhodecode.lib.exceptions import HTTPLockedRC, HTTPBranchProtected
dan
db: move Session.remove to outer wsgi layer and also add it...
r669 from rhodecode.model import meta
events: fixed routing problems with new bootstrap_request logic.
r2335 from rhodecode.lib.base import bootstrap_request, bootstrap_config
project: added all source files and assets
r1 from rhodecode.lib import hooks_base
pyramid: ported pyramid routing for events
r2016 from rhodecode.lib.utils2 import AttributeDict
svn: enable hooks and integration framework execution....
r2677 from rhodecode.lib.ext_json import json
caches: fix import of compute_key_from_params function.
r2970 from rhodecode.lib import rc_cache
project: added all source files and assets
r1
log = logging.getLogger(__name__)
class HooksHttpHandler(BaseHTTPRequestHandler):
svn: enable hooks and integration framework execution....
r2677
project: added all source files and assets
r1 def do_POST(self):
method, extras = self._read_request()
svn: enable hooks and integration framework execution....
r2677 txn_id = getattr(self.server, 'txn_id', None)
if txn_id:
log.debug('Computing TXN_ID based on `%s`:`%s`',
extras['repository'], extras['txn_id'])
caches: fix import of compute_key_from_params function.
r2970 computed_txn_id = rc_cache.utils.compute_key_from_params(
svn: enable hooks and integration framework execution....
r2677 extras['repository'], extras['txn_id'])
if txn_id != computed_txn_id:
raise Exception(
'TXN ID fail: expected {} got {} instead'.format(
txn_id, computed_txn_id))
project: added all source files and assets
r1 try:
result = self._call_hook(method, extras)
except Exception as e:
exception-handling: propagate hooks tracebacks to vcsserver for easier debugging.
r1458 exc_tb = traceback.format_exc()
project: added all source files and assets
r1 result = {
'exception': e.__class__.__name__,
exception-handling: propagate hooks tracebacks to vcsserver for easier debugging.
r1458 'exception_traceback': exc_tb,
project: added all source files and assets
r1 'exception_args': e.args
}
self._write_response(result)
def _read_request(self):
length = int(self.headers['Content-Length'])
body = self.rfile.read(length).decode('utf-8')
data = json.loads(body)
return data['method'], data['extras']
def _write_response(self, result):
self.send_response(200)
self.send_header("Content-type", "text/json")
self.end_headers()
self.wfile.write(json.dumps(result))
def _call_hook(self, method, extras):
hooks = Hooks()
dan
db: move Session.remove to outer wsgi layer and also add it...
r669 try:
result = getattr(hooks, method)(extras)
finally:
meta.Session.remove()
project: added all source files and assets
r1 return result
def log_message(self, format, *args):
"""
core: removed pyro4 from Enterprise code. Fixes #5198
r1409 This is an overridden method of BaseHTTPRequestHandler which logs using
project: added all source files and assets
r1 logging library instead of writing directly to stderr.
"""
message = format % args
log.debug(
"%s - - [%s] %s", self.client_address[0],
self.log_date_time_string(), message)
class DummyHooksCallbackDaemon(object):
svn: enable hooks and integration framework execution....
r2677 hooks_uri = ''
project: added all source files and assets
r1 def __init__(self):
self.hooks_module = Hooks.__module__
def __enter__(self):
log.debug('Running dummy hooks callback daemon')
return self
def __exit__(self, exc_type, exc_val, exc_tb):
log.debug('Exiting dummy hooks callback daemon')
class ThreadedHookCallbackDaemon(object):
_callback_thread = None
_daemon = None
_done = False
hooks: made the callback host configurable....
r2833 def __init__(self, txn_id=None, host=None, port=None):
self._prepare(txn_id=txn_id, host=None, port=port)
project: added all source files and assets
r1
def __enter__(self):
self._run()
return self
def __exit__(self, exc_type, exc_val, exc_tb):
hooks: added debug logs.
r2135 log.debug('Callback daemon exiting now...')
project: added all source files and assets
r1 self._stop()
hooks: made the callback host configurable....
r2833 def _prepare(self, txn_id=None, host=None, port=None):
project: added all source files and assets
r1 raise NotImplementedError()
def _run(self):
raise NotImplementedError()
def _stop(self):
raise NotImplementedError()
class HttpHooksCallbackDaemon(ThreadedHookCallbackDaemon):
"""
Context manager which will run a callback daemon in a background thread.
"""
hooks_uri = None
# From Python docs: Polling reduces our responsiveness to a shutdown
# request and wastes cpu at all other times.
hooks: decrease pool interval to 10ms. For SVN operations and lots of requests...
r2264 POLL_INTERVAL = 0.01
project: added all source files and assets
r1
hooks: made the callback host configurable....
r2833 def _prepare(self, txn_id=None, host=None, port=None):
host = host or '127.0.0.1'
project: added all source files and assets
r1 self._done = False
hooks: made the callback host configurable....
r2833 self._daemon = TCPServer((host, port or 0), HooksHttpHandler)
project: added all source files and assets
r1 _, port = self._daemon.server_address
hooks: made the callback host configurable....
r2833 self.hooks_uri = '{}:{}'.format(host, port)
svn: enable hooks and integration framework execution....
r2677 self.txn_id = txn_id
# inject transaction_id for later verification
self._daemon.txn_id = self.txn_id
project: added all source files and assets
r1
svn: enable hooks and integration framework execution....
r2677 log.debug(
"Preparing HTTP callback daemon at `%s` and registering hook object",
self.hooks_uri)
project: added all source files and assets
r1
def _run(self):
log.debug("Running event loop of callback daemon in background thread")
callback_thread = threading.Thread(
target=self._daemon.serve_forever,
kwargs={'poll_interval': self.POLL_INTERVAL})
callback_thread.daemon = True
callback_thread.start()
self._callback_thread = callback_thread
def _stop(self):
log.debug("Waiting for background thread to finish.")
self._daemon.shutdown()
self._callback_thread.join()
self._daemon = None
self._callback_thread = None
svn: enable hooks and integration framework execution....
r2677 if self.txn_id:
txn_id_file = get_txn_id_data_path(self.txn_id)
log.debug('Cleaning up TXN ID %s', txn_id_file)
if os.path.isfile(txn_id_file):
os.remove(txn_id_file)
logging: don't stop logging on "waiting for background thread to finish" This is misleading as we're waiting for something and didn't finalize.
r2263 log.debug("Background thread done.")
project: added all source files and assets
r1
svn: enable hooks and integration framework execution....
r2677 def get_txn_id_data_path(txn_id):
svn: use shared configurable storage for svn_txn_id interception logic.
r3021 import rhodecode
root = rhodecode.CONFIG.get('cache_dir') or tempfile.gettempdir()
final_dir = os.path.join(root, 'svn_txn_id')
if not os.path.isdir(final_dir):
os.makedirs(final_dir)
return os.path.join(final_dir, 'rc_txn_id_{}'.format(txn_id))
svn: enable hooks and integration framework execution....
r2677
def store_txn_id_data(txn_id, data_dict):
if not txn_id:
log.warning('Cannot store txn_id because it is empty')
return
path = get_txn_id_data_path(txn_id)
try:
with open(path, 'wb') as f:
f.write(json.dumps(data_dict))
except Exception:
log.exception('Failed to write txn_id metadata')
project: added all source files and assets
r1
svn: enable hooks and integration framework execution....
r2677
def get_txn_id_from_store(txn_id):
"""
Reads txn_id from store and if present returns the data for callback manager
"""
path = get_txn_id_data_path(txn_id)
try:
with open(path, 'rb') as f:
return json.loads(f.read())
except Exception:
return {}
hooks: made the callback host configurable....
r2833 def prepare_callback_daemon(extras, protocol, host, use_direct_calls, txn_id=None):
svn: enable hooks and integration framework execution....
r2677 txn_details = get_txn_id_from_store(txn_id)
port = txn_details.get('port', 0)
project: added all source files and assets
r1 if use_direct_calls:
callback_daemon = DummyHooksCallbackDaemon()
extras['hooks_module'] = callback_daemon.hooks_module
else:
core: removed pyro4 from Enterprise code. Fixes #5198
r1409 if protocol == 'http':
hooks: made the callback host configurable....
r2833 callback_daemon = HttpHooksCallbackDaemon(
txn_id=txn_id, host=host, port=port)
Martin Bornhold
vcs: Raise an error in case of unsupported protocol.
r589 else:
log.error('Unsupported callback daemon protocol "%s"', protocol)
raise Exception('Unsupported callback daemon protocol.')
svn: enable hooks and integration framework execution....
r2677 extras['hooks_uri'] = callback_daemon.hooks_uri
extras['hooks_protocol'] = protocol
extras['time'] = time.time()
project: added all source files and assets
r1
svn: enable hooks and integration framework execution....
r2677 # register txn_id
extras['txn_id'] = txn_id
log.debug('Prepared a callback daemon: %s at url `%s`',
callback_daemon.__class__.__name__, callback_daemon.hooks_uri)
project: added all source files and assets
r1 return callback_daemon, extras
class Hooks(object):
"""
Exposes the hooks for remote call backs
"""
def repo_size(self, extras):
pyramid: ported pyramid routing for events
r2016 log.debug("Called repo_size of %s object", self)
project: added all source files and assets
r1 return self._call_hook(hooks_base.repo_size, extras)
def pre_pull(self, extras):
pyramid: ported pyramid routing for events
r2016 log.debug("Called pre_pull of %s object", self)
project: added all source files and assets
r1 return self._call_hook(hooks_base.pre_pull, extras)
def post_pull(self, extras):
pyramid: ported pyramid routing for events
r2016 log.debug("Called post_pull of %s object", self)
project: added all source files and assets
r1 return self._call_hook(hooks_base.post_pull, extras)
def pre_push(self, extras):
pyramid: ported pyramid routing for events
r2016 log.debug("Called pre_push of %s object", self)
project: added all source files and assets
r1 return self._call_hook(hooks_base.pre_push, extras)
def post_push(self, extras):
pyramid: ported pyramid routing for events
r2016 log.debug("Called post_push of %s object", self)
project: added all source files and assets
r1 return self._call_hook(hooks_base.post_push, extras)
def _call_hook(self, hook, extras):
extras = AttributeDict(extras)
ssh-support: enabled full handling of all backends via SSH....
r2187 server_url = extras['server_url']
events: fixed routing problems with new bootstrap_request logic.
r2335 request = bootstrap_request(application_url=server_url)
events: make sure we propagate our dummy request with proper application_url....
r1960
events: fixed routing problems with new bootstrap_request logic.
r2335 bootstrap_config(request) # inject routes and other interfaces
hooks: inject request.user for proper url generation in async methods....
r2418
# inject the user for usage in hooks
request.user = AttributeDict({'username': extras.username,
'ip_addr': extras.ip,
'user_id': extras.user_id})
events: fixed routing problems with new bootstrap_request logic.
r2335 extras.request = request
project: added all source files and assets
r1
try:
result = hook(extras)
scm_app: add more debug info when unhandled errors happen on vcsserver.
r3093 if result is None:
raise Exception(
'Failed to obtain hook result from func: {}'.format(hook))
branch-permissions: handle vcs operations and branch permissions....
r2979 except HTTPBranchProtected as handled_error:
# Those special cases doesn't need error reporting. It's a case of
# locked repo or protected branch
result = AttributeDict({
'status': handled_error.code,
'output': handled_error.explanation
})
except (HTTPLockedRC, Exception) as error:
# locked needs different handling since we need to also
# handle PULL operations
exc_tb = ''
if not isinstance(error, HTTPLockedRC):
exc_tb = traceback.format_exc()
log.exception('Exception when handling hook %s', hook)
project: added all source files and assets
r1 error_args = error.args
return {
'status': 128,
'output': '',
'exception': type(error).__name__,
exception-handling: propagate hooks tracebacks to vcsserver for easier debugging.
r1458 'exception_traceback': exc_tb,
project: added all source files and assets
r1 'exception_args': error_args,
}
dan
integrations: add integration support...
r411 finally:
dan
pyro: remove db Session when callback finishes to avoid leaving...
r670 meta.Session.remove()
dan
integrations: add integration support...
r411
ssh-support: enabled full handling of all backends via SSH....
r2187 log.debug('Got hook call response %s', result)
project: added all source files and assets
r1 return {
'status': result.status,
'output': result.output,
}
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
pass