subscribers.py
399 lines
| 12.4 KiB
| text/x-python
|
PythonLexer
/ rhodecode / subscribers.py
r5088 | # Copyright (C) 2010-2023 RhodeCode GmbH | |||
r1 | # | |||
# This program is free software: you can redistribute it and/or modify | ||||
# it under the terms of the GNU Affero General Public License, version 3 | ||||
# (only), as published by the Free Software Foundation. | ||||
# | ||||
# This program is distributed in the hope that it will be useful, | ||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of | ||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | ||||
# GNU General Public License for more details. | ||||
# | ||||
# You should have received a copy of the GNU Affero General Public License | ||||
# along with this program. If not, see <http://www.gnu.org/licenses/>. | ||||
# | ||||
# This program is dual-licensed. If you wish to learn more about the | ||||
# RhodeCode Enterprise Edition, including its added features, Support services, | ||||
# and proprietary license terms, please see https://rhodecode.com/licenses/ | ||||
r1538 | import io | |||
r4582 | import shlex | |||
r4473 | import math | |||
r1538 | import re | |||
r3132 | import os | |||
r1392 | import datetime | |||
Martin Bornhold
|
r1019 | import logging | ||
r4934 | import queue | |||
r4926 | import subprocess | |||
r51 | ||||
r2488 | ||||
from dateutil.parser import parse | ||||
r1538 | from pyramid.interfaces import IRoutesMapper | |||
from pyramid.settings import asbool | ||||
from pyramid.path import AssetResolver | ||||
Martin Bornhold
|
r1019 | from threading import Thread | ||
r1 | ||||
r1538 | from rhodecode.config.jsroutes import generate_jsroutes_content | |||
r2351 | from rhodecode.lib.base import get_auth_user | |||
r5062 | from rhodecode.lib.celerylib.loader import set_celery_conf | |||
r2351 | ||||
r1309 | import rhodecode | |||
Martin Bornhold
|
r1019 | log = logging.getLogger(__name__) | ||
def add_renderer_globals(event):
    """
    Inject translation callables and the helpers module into the renderer
    globals carried by ``event``.

    Sets the keys ``'_'``, ``'_ungettext'`` and ``'h'`` on ``event``.
    """
    from rhodecode.lib import helpers
    from pyramid.threadlocal import get_current_request

    # TODO: When executed in pyramid view context the request is not available
    # in the event. Find a better solution to get the request.
    request = event['request']
    if not request:
        request = get_current_request()

    # Pyramid translation is exposed to templates as '_'
    event['_'] = request.translate
    # NOTE(review): the attribute really is spelled `plularize` here;
    # presumably it matches a request method registered elsewhere - confirm.
    event['_ungettext'] = request.plularize
    event['h'] = helpers
r1 | ||||
def set_user_lang(event):
    """
    Copy the ``language`` entry of the current user's ``user_data`` onto the
    request as ``_LOCALE_``.

    Does nothing when the request has no (truthy) ``user`` attribute or the
    user has no language stored.
    """
    user = getattr(event.request, 'user', None)
    if not user:
        return

    lang = user.get_instance().user_data.get('language')
    if not lang:
        return

    log.debug('lang: setting current user:%s language to: %s', user, lang)
    event.request._LOCALE_ = lang
def update_celery_conf(event):
    """
    Hand the request of ``event`` and that request's registry to
    ``set_celery_conf``.
    """
    log.debug('Setting celery config from new request')
    request = event.request
    set_celery_conf(request=request, registry=request.registry)
def add_request_user_context(event):
    """
    Adds auth user into request context

    Requests carrying a ``vcs_call`` or ``rpc_method`` attribute are left
    untouched. For every other request the auth user and auth token are
    stored on the request, and the user, its id (as string) and the request
    id are stored in ``request.environ``.
    """
    request = event.request
    # access req_id as soon as possible, i.e. even for skipped requests
    req_id = request.req_id

    # vcs calls and api calls are skipped
    for skip_marker in ('vcs_call', 'rpc_method'):
        if hasattr(request, skip_marker):
            return

    auth_user, auth_token = get_auth_user(request)
    request.user = auth_user
    request.user_auth_token = auth_token

    environ = request.environ
    environ['rc_auth_user'] = auth_user
    environ['rc_auth_user_id'] = str(auth_user.user_id)
    environ['rc_req_id'] = req_id
r1903 | ||||
r2849 | ||||
def reset_log_bucket(event):
    """
    reset the log bucket on new request

    Delegates to ``req_id_records_init()`` of the request attached to
    ``event``.
    """
    event.request.req_id_records_init()
Martin Bornhold
|
def scan_repositories_if_enabled(event):
    """
    This is subscribed to the `pyramid.events.ApplicationCreated` event. It
    does a repository scan if enabled in the settings.

    Both the ``vcs.server.enable`` and ``startup.import_repos`` settings are
    always read; the scan only runs when both are truthy.
    """
    settings = event.app.registry.settings
    vcs_enabled = settings['vcs.server.enable']
    import_repos = settings['startup.import_repos']
    if not (vcs_enabled and import_repos):
        return

    from rhodecode.model.scm import ScmModel
    from rhodecode.lib.utils import repo2db_mapper

    scm_model = ScmModel()
    found_repos = scm_model.repo_scan(scm_model.repos_path)
    repo2db_mapper(found_repos, remove_obsolete=False)
Martin Bornhold
|
r1019 | |||
def write_metadata_if_needed(event):
    """
    Writes upgrade metadata

    Stores a ``.rcmetadata.json`` file in the directory of the file named by
    ``rhodecode.CONFIG['__file__']``. Nothing is written when the
    ``metadata.skip`` setting is truthy, or when the existing file carries a
    ``created_on`` stamp that is less than 24 hours old. Errors raised while
    collecting or writing the data are logged at debug level and otherwise
    ignored (best effort).

    :param event: event exposing ``app.registry.settings``
    """
    import rhodecode
    from rhodecode.lib import system_info
    from rhodecode.lib import ext_json

    fname = '.rcmetadata.json'
    ini_loc = os.path.dirname(rhodecode.CONFIG.get('__file__'))
    metadata_destination = os.path.join(ini_loc, fname)
    # get_update_age() reports minutes, so the threshold is in minutes as well
    max_age_in_min = 60 * 24

    def get_update_age():
        """
        Return the age in minutes of the stored ``created_on`` stamp, or 0
        when the stored data has no such key.
        """
        now = datetime.datetime.utcnow()

        with open(metadata_destination, 'rb') as f:
            data = ext_json.json.loads(f.read())
            if 'created_on' in data:
                update_date = parse(data['created_on'])
                diff = now - update_date
                return diff.total_seconds() / 60.0

        return 0

    def write():
        """
        Collect the system information and write it to the metadata file.
        """
        configuration = system_info.SysInfo(
            system_info.rhodecode_config)()['value']
        license_token = configuration['config']['license_token']

        setup = dict(
            workers=configuration['config']['server:main'].get(
                'workers', '?'),
            worker_type=configuration['config']['server:main'].get(
                'worker_class', 'sync'),
        )
        dbinfo = system_info.SysInfo(system_info.database_info)()['value']
        # the database url is not stored in the metadata file
        del dbinfo['url']

        metadata = dict(
            desc='upgrade metadata info',
            license_token=license_token,
            created_on=datetime.datetime.utcnow().isoformat(),
            usage=system_info.SysInfo(system_info.usage_info)()['value'],
            platform=system_info.SysInfo(system_info.platform_type)()['value'],
            database=dbinfo,
            cpu=system_info.SysInfo(system_info.cpu)()['value'],
            memory=system_info.SysInfo(system_info.memory)()['value'],
            setup=setup
        )

        with open(metadata_destination, 'wb') as f:
            f.write(ext_json.json.dumps(metadata))

    settings = event.app.registry.settings
    if settings.get('metadata.skip'):
        return

    # only write this every 24h, workers restart caused unwanted delays
    try:
        age_in_min = get_update_age()
    except Exception:
        # missing, unreadable or malformed file: treat as "never written"
        age_in_min = 0

    # Skip only while the existing file is still fresh. An age of 0 means
    # "unknown", and a negative age (stamp in the future) is not trusted.
    if 0 < age_in_min < max_age_in_min:
        log.debug(
            'Metadata file created %s minutes ago, skipping (threshold: %s minutes)...',
            age_in_min, max_age_in_min)
        return

    try:
        write()
    except Exception:
        # best effort: never fail on metadata, but leave a trace
        log.debug('Failed to write metadata file at: %s',
                  metadata_destination, exc_info=True)
def write_usage_data(event):
    """
    Write a usage data file into the ``.rcusage`` directory located next to
    the file named by ``rhodecode.CONFIG['__file__']``.

    Does nothing unless the ``metadata.write_usage`` setting is truthy. The
    file name embeds the current UTC date and a bucket number derived from
    the hour of day (six hours per bucket). An existing file for the current
    bucket whose ``created_on`` stamp is less than six hours old is left
    alone. Errors raised while collecting or writing the data are logged at
    debug level and otherwise ignored (best effort).

    :param event: event exposing ``app.registry.settings``
    """
    import rhodecode
    from rhodecode.lib import system_info
    from rhodecode.lib import ext_json

    settings = event.app.registry.settings
    instance_tag = settings.get('metadata.write_usage_tag')
    if not settings.get('metadata.write_usage'):
        return

    def get_update_age(dest_file):
        """
        Return the age in whole minutes (rounded up) of the ``created_on``
        stamp stored in ``dest_file``, or 0 when there is no such key.
        """
        now = datetime.datetime.utcnow()

        with open(dest_file, 'rb') as f:
            data = ext_json.json.loads(f.read())
            if 'created_on' in data:
                update_date = parse(data['created_on'])
                diff = now - update_date
                return math.ceil(diff.total_seconds() / 60.0)

        return 0

    utc_date = datetime.datetime.utcnow()
    hour_quarter = int(math.ceil((utc_date.hour + utc_date.minute/60.0) / 6.))
    fname = '.rc_usage_{date.year}{date.month:02d}{date.day:02d}_{hour}.json'.format(
        date=utc_date, hour=hour_quarter)
    ini_loc = os.path.dirname(rhodecode.CONFIG.get('__file__'))

    usage_dir = os.path.join(ini_loc, '.rcusage')
    # exist_ok avoids failing when another process creates the directory
    # between a separate existence check and the creation
    os.makedirs(usage_dir, exist_ok=True)
    usage_metadata_destination = os.path.join(usage_dir, fname)

    try:
        age_in_min = get_update_age(usage_metadata_destination)
    except Exception:
        # missing, unreadable or malformed file: treat as "never written"
        age_in_min = 0

    # write every 6th hour
    if age_in_min and age_in_min < 60 * 6:
        log.debug('Usage file created %s minutes ago, skipping (threshold: %s minutes)...',
                  age_in_min, 60 * 6)
        return

    def write(dest_file):
        """
        Collect the usage information and write it to ``dest_file``.
        """
        configuration = system_info.SysInfo(system_info.rhodecode_config)()['value']
        license_token = configuration['config']['license_token']

        metadata = dict(
            desc='Usage data',
            instance_tag=instance_tag,
            license_token=license_token,
            created_on=datetime.datetime.utcnow().isoformat(),
            usage=system_info.SysInfo(system_info.usage_info)()['value'],
        )

        with open(dest_file, 'wb') as f:
            f.write(ext_json.formatted_json(metadata))

    try:
        log.debug('Writing usage file at: %s', usage_metadata_destination)
        write(usage_metadata_destination)
    except Exception:
        # best effort: never fail on usage data, but leave a trace
        log.debug('Failed to write usage file at: %s',
                  usage_metadata_destination, exc_info=True)
def write_js_routes_if_enabled(event):
    """
    Generate ``js/rhodecode/routes.js`` below the ``rhodecode:public`` asset
    directory when the ``generate_js_files`` setting is enabled.

    Routes whose name starts with ``__`` are left out. A failure while
    writing the file is logged and not re-raised.
    """
    registry = event.app.registry
    mapper = registry.queryUtility(IRoutesMapper)
    # matches `{name}` / `{name:regex}` as group 1 and `:(name)` as group 2
    arg_regex = re.compile(r'\{(.*?)\}|:\((.*)\)')

    def _as_placeholder(match):
        # rewrite one route argument into a `%(name)s` placeholder
        braced = match.group(1)
        arg_name = braced.split(':')[0] if braced else match.group(2)
        return "%%(%s)s" % arg_name

    def _extract_route_information(route):
        """
        Convert a route into tuple(name, path, args), eg:
            ('show_user', '/profile/%(username)s', ['username'])
        """
        pattern = route.pattern

        js_path = arg_regex.sub(_as_placeholder, pattern)
        if not js_path.startswith('/'):
            js_path = f'/{js_path}'

        arg_names = []
        for braced, parenthesised in arg_regex.findall(pattern):
            if braced != '':
                arg_names.append(braced.split(':')[0])
            else:
                arg_names.append(parenthesised)

        return route.name, js_path, arg_names

    def _iter_public_routes():
        # pyramid routes
        for route in mapper.get_routes():
            if route.name.startswith('__'):
                continue
            yield _extract_route_information(route)

    if not asbool(registry.settings.get('generate_js_files', 'false')):
        return

    static_path = AssetResolver().resolve('rhodecode:public').abspath()
    jsroutes_file_content = generate_jsroutes_content(_iter_public_routes())
    jsroutes_file_path = os.path.join(
        static_path, 'js', 'rhodecode', 'routes.js')

    try:
        with open(jsroutes_file_path, 'w', encoding='utf-8') as routes_file:
            routes_file.write(jsroutes_file_content)
        log.debug('generated JS files in %s', jsroutes_file_path)
    except Exception:
        log.exception('Failed to write routes.js into %s', jsroutes_file_path)
r1538 | ||||
Martin Bornhold
|
class Subscriber:
    """
    Base class for subscribers to the pyramid event system.

    Calling an instance forwards the event to :meth:`run`, which subclasses
    have to provide. The return value of :meth:`run` is discarded.
    """

    def run(self, event):
        raise NotImplementedError('Subclass has to implement this.')

    def __call__(self, event):
        self.run(event)
class AsyncSubscriber(Subscriber):
    """
    Subscriber that handles the execution of events in a separate task to not
    block the execution of the code which triggers the event. It puts the
    received events into a queue from which the worker thread takes them in
    order.
    """

    def __init__(self):
        # flag checked by the worker loop before it waits for the next event
        self._stop = False
        self._eventq = queue.Queue()
        self._worker = self.create_worker()
        self._worker.start()

    def __call__(self, event):
        """
        Queue ``event`` for the worker thread; returns without running it.
        """
        self._eventq.put(event)

    def create_worker(self):
        """
        Build (but do not start) the daemon thread that runs :meth:`do_work`.
        """
        worker = Thread(target=self.do_work)
        worker.daemon = True
        return worker

    def stop_worker(self):
        """
        Ask the worker loop to finish and wait for the thread to exit.

        Events still queued when the flag is noticed are not processed.
        """
        # The flag has to be raised here; leaving it False kept the loop
        # running and made the join() below block forever.
        self._stop = True
        # wake the worker up in case it is blocked waiting for an event
        self._eventq.put(None)
        self._worker.join()

    def do_work(self):
        """
        Worker loop: take events from the queue and pass them to ``run``.
        """
        while not self._stop:
            event = self._eventq.get()
            if event is None:
                # wake-up marker put by stop_worker(), nothing to run
                continue
            try:
                self.run(event)
            except Exception:
                # a failing event must not end the worker thread, otherwise
                # all later events would stay in the queue unprocessed
                log.exception('Exception while handling event %s.', event)
class AsyncSubprocessSubscriber(AsyncSubscriber):
    """
    Subscriber that uses the subprocess module to execute a command if an
    event is received. Events are handled asynchronously::

        subscriber = AsyncSubprocessSubscriber('ls -la', timeout=10)
        subscriber(dummyEvent)  # running __call__(event)

    """

    def __init__(self, cmd, timeout=None):
        """
        :param cmd: command to execute; anything that is not a list or tuple
            is split with ``shlex.split``
        :param timeout: passed as ``timeout`` to ``subprocess.check_output``
        """
        argv = cmd if isinstance(cmd, (list, tuple)) else shlex.split(cmd)
        super().__init__()
        self._cmd = argv
        self._timeout = timeout

    @staticmethod
    def _log_captured_output(error):
        # report whatever the failed command printed, if anything
        if error.output:
            log.error('Command output: %s', error.output)

    def run(self, event):
        """
        Execute the configured command and log its outcome. ``event`` itself
        is not passed to the command. No exception is re-raised from the
        command execution.
        """
        argv = self._cmd
        log.debug('Executing command %s.', argv)

        try:
            # stderr is merged into the captured output
            captured = subprocess.check_output(
                argv, timeout=self._timeout, stderr=subprocess.STDOUT)
            log.debug('Command finished %s', argv)
            if captured:
                log.debug('Command output: %s', captured)
        except subprocess.TimeoutExpired as timeout_error:
            log.exception('Timeout while executing command.')
            self._log_captured_output(timeout_error)
        except subprocess.CalledProcessError as process_error:
            log.exception('Error while executing command.')
            self._log_captured_output(process_error)
        except Exception:
            log.exception(
                'Exception while executing command %s.', argv)