subscribers.py
315 lines
| 10.0 KiB
| text/x-python
|
PythonLexer
/ rhodecode / subscribers.py
r1 | # -*- coding: utf-8 -*- | |||
r1271 | # Copyright (C) 2010-2017 RhodeCode GmbH | |||
r1 | # | |||
# This program is free software: you can redistribute it and/or modify | ||||
# it under the terms of the GNU Affero General Public License, version 3 | ||||
# (only), as published by the Free Software Foundation. | ||||
# | ||||
# This program is distributed in the hope that it will be useful, | ||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of | ||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | ||||
# GNU General Public License for more details. | ||||
# | ||||
# You should have received a copy of the GNU Affero General Public License | ||||
# along with this program. If not, see <http://www.gnu.org/licenses/>. | ||||
# | ||||
# This program is dual-licensed. If you wish to learn more about the | ||||
# RhodeCode Enterprise Edition, including its added features, Support services, | ||||
# and proprietary license terms, please see https://rhodecode.com/licenses/ | ||||
r1538 | import io | |||
import re | ||||
r1392 | import datetime | |||
Martin Bornhold
|
r1019 | import logging | ||
r1 | import pylons | |||
Martin Bornhold
|
r1019 | import Queue | ||
import subprocess32 | ||||
r1392 | import os | |||
r51 | ||||
from pyramid.i18n import get_localizer | ||||
r21 | from pyramid.threadlocal import get_current_request | |||
r1538 | from pyramid.interfaces import IRoutesMapper | |||
from pyramid.settings import asbool | ||||
from pyramid.path import AssetResolver | ||||
Martin Bornhold
|
r1019 | from threading import Thread | ||
r1 | ||||
r51 | from rhodecode.translation import _ as tsf | |||
r1538 | from rhodecode.config.jsroutes import generate_jsroutes_content | |||
r1 | ||||
r1309 | import rhodecode | |||
from pylons.i18n.translation import _get_translator | ||||
from pylons.util import ContextObj | ||||
from routes.util import URLGenerator | ||||
from rhodecode.lib.base import attach_context_attributes, get_auth_user | ||||
Martin Bornhold
|
# Module-level logger used by the subscribers defined in this file.
log = logging.getLogger(__name__)
def add_renderer_globals(event):
    """
    Fill the renderer globals carried by ``event``.

    Adds the legacy pylons objects under the keys ``h``, ``c`` and ``url`` and
    the translation callables of the request under ``_`` and ``_ungettext``.
    """
    # Pylons objects are exposed only until the migration to pyramid is
    # finished.
    pylons_conf = pylons.config._current_obj()
    event['h'] = pylons_conf.get('pylons.h')
    event['c'] = pylons.tmpl_context
    event['url'] = pylons.url

    # TODO: When executed in pyramid view context the request is not available
    # in the event. Find a better solution to get the request.
    req = event['request']
    if not req:
        req = get_current_request()

    # Pyramid translation is exposed as '_'. The attribute read for
    # '_ungettext' is spelled 'plularize', which is the name add_localizer
    # assigns on the request.
    event['_'] = req.translate
    event['_ungettext'] = req.plularize
r1 | ||||
def add_localizer(event):
    """
    Attach a localizer and its translation helpers to the event's request.

    Sets ``request.localizer``, ``request.translate`` and
    ``request.plularize`` (attribute name kept as spelled in this module).
    """
    req = event.request
    loc = get_localizer(req)

    def auto_translate(*args, **kwargs):
        # Build the translation string from the arguments first, then let the
        # localizer translate it.
        translation_string = tsf(*args, **kwargs)
        return loc.translate(translation_string)

    req.localizer = loc
    req.translate = auto_translate
    req.plularize = loc.pluralize
Martin Bornhold
|
r580 | |||
def set_user_lang(event):
    """
    Apply the language stored for the current user to the request.

    Does nothing when the request has no (truthy) ``user`` attribute or when
    the user's ``user_data`` holds no ``language`` value. Otherwise the value
    is assigned to ``request._LOCALE_``.
    """
    user = getattr(event.request, 'user', None)
    if not user:
        return

    lang = user.get_instance().user_data.get('language')
    if not lang:
        return

    log.debug(
        'lang: setting current user:%s language to: %s', user, lang)
    event.request._LOCALE_ = lang
def add_pylons_context(event):
    """
    Push the pylons globals for the request carried by ``event``.

    Requests that have a ``vcs_call`` attribute are skipped completely. For
    requests that have an ``rpc_method`` attribute only the pylons globals and
    the session entry in the WSGI environ are set up; the auth user and the
    template context are not.
    """
    request = event.request

    config = rhodecode.CONFIG
    environ = request.environ
    session = request.session

    if hasattr(request, 'vcs_call'):
        # skip vcs calls
        return

    # Setup pylons globals.
    # NOTE(review): the translator is created after the config has been
    # pushed; keep this order in case _get_translator reads the pushed
    # config - confirm before reordering.
    pylons.config._push_object(config)
    pylons.request._push_object(request)
    pylons.session._push_object(session)
    pylons.translator._push_object(_get_translator(config.get('lang')))

    pylons.url._push_object(URLGenerator(config['routes.map'], environ))
    # Store the session in the environ under the configured key, falling back
    # to 'beaker.session'.
    session_key = (
        config['pylons.environ_config'].get('session', 'beaker.session'))
    environ[session_key] = session

    if hasattr(request, 'rpc_method'):
        # skip api calls
        return

    # Get the rhodecode auth user object and make it available.
    auth_user = get_auth_user(environ)
    request.user = auth_user
    environ['rc_auth_user'] = auth_user

    # Setup the pylons context object ('c')
    context = ContextObj()
    context.rhodecode_user = auth_user
    attach_context_attributes(context, request, request.user.user_id)
    pylons.tmpl_context._push_object(context)
Martin Bornhold
|
def scan_repositories_if_enabled(event):
    """
    Scan the repositories if the settings ask for it.

    This is subscribed to the `pyramid.events.ApplicationCreated` event. The
    scan only runs when both the ``vcs.server.enable`` and the
    ``startup.import_repos`` settings are truthy.
    """
    app_settings = event.app.registry.settings
    vcs_enabled = app_settings['vcs.server.enable']
    import_repos = app_settings['startup.import_repos']
    if not (vcs_enabled and import_repos):
        return

    # Imported only when a scan is actually performed.
    from rhodecode.model.scm import ScmModel
    from rhodecode.lib.utils import repo2db_mapper, get_rhodecode_base_path

    scm = ScmModel()
    found_repos = scm.repo_scan(get_rhodecode_base_path())
    repo2db_mapper(found_repos, remove_obsolete=False)
Martin Bornhold
|
r1019 | |||
def write_metadata_if_needed(event):
    """
    Writes upgrade metadata

    Collects the license token plus usage, platform, database, cpu and memory
    information and dumps it as JSON into a ``.rcmetadata.json`` file located
    in the directory of the file named by ``rhodecode.CONFIG['__file__']``.

    Nothing is collected or written when the ``metadata.skip`` setting is
    truthy. Writing is best effort: any failure is logged and never
    propagated to the caller.
    """
    settings = event.app.registry.settings
    if settings.get('metadata.skip'):
        # Checked first so that a skipped run does no imports or other work.
        return

    import rhodecode
    from rhodecode.lib import system_info
    from rhodecode.lib import ext_json

    def sys_info(helper):
        # Every SysInfo result is a mapping; only its 'value' entry is used.
        return system_info.SysInfo(helper)()['value']

    def write():
        fname = '.rcmetadata.json'
        ini_loc = os.path.dirname(rhodecode.CONFIG.get('__file__'))
        metadata_destination = os.path.join(ini_loc, fname)

        configuration = sys_info(system_info.rhodecode_config)
        license_token = configuration['config']['license_token']

        dbinfo = sys_info(system_info.database_info)
        # The database url is never written out (presumably because it can
        # contain credentials - confirm).
        del dbinfo['url']

        metadata = dict(
            desc='upgrade metadata info',
            license_token=license_token,
            created_on=datetime.datetime.utcnow().isoformat(),
            usage=sys_info(system_info.usage_info),
            platform=sys_info(system_info.platform_type),
            database=dbinfo,
            cpu=sys_info(system_info.cpu),
            memory=sys_info(system_info.memory),
        )

        with open(metadata_destination, 'wb') as f:
            f.write(ext_json.json.dumps(metadata))

    try:
        write()
    except Exception:
        # Still best effort, but leave a trace instead of failing silently.
        log.warning('Failed to write upgrade metadata', exc_info=True)
def write_js_routes_if_enabled(event):
    """
    Regenerate the ``routes.js`` file if the settings ask for it.

    When the ``generate_js_files`` setting is true, the pylons routes
    (``rhodecode.CONFIG['routes.map'].jsroutes()``) and the pyramid routes
    (all routes of the registered routes mapper whose name does not start
    with ``__``) are passed to ``generate_jsroutes_content`` and the result is
    written to ``js/rhodecode/routes.js`` below the ``rhodecode:public`` asset
    directory. Otherwise nothing is done.
    """
    registry = event.app.registry
    if not asbool(registry.settings.get('generate_js_files', 'false')):
        # Generation is disabled: skip the mapper lookup and regex setup.
        return

    mapper = registry.queryUtility(IRoutesMapper)
    # Matches the two placeholder notations ``{name}`` / ``{name:regex}``
    # (group 1) and ``:(name)`` (group 2).
    _argument_prog = re.compile(r'\{(.*?)\}|:\((.*)\)')

    def _placeholder_name(curly, paren):
        """
        Return the argument name of a single matched placeholder, given the
        content of both regex groups. A ``:regex`` suffix is dropped.
        """
        return curly.split(':')[0] if curly else paren

    def _extract_route_information(route):
        """
        Convert a route into tuple(name, path, args), eg:
            ('show_user', '/profile/%(username)s', ['username'])
        """
        pattern = route.pattern

        def replace(matchobj):
            return "%%(%s)s" % _placeholder_name(
                matchobj.group(1), matchobj.group(2))

        routepath = _argument_prog.sub(replace, pattern)
        if not routepath.startswith('/'):
            routepath = '/' + routepath

        args = [
            _placeholder_name(*groups)
            for groups in _argument_prog.findall(pattern)]
        return route.name, routepath, args

    def get_routes():
        # pylons routes
        for route in rhodecode.CONFIG['routes.map'].jsroutes():
            yield route

        # pyramid routes
        for route in mapper.get_routes():
            if not route.name.startswith('__'):
                yield _extract_route_information(route)

    static_path = AssetResolver().resolve('rhodecode:public').abspath()
    jsroutes_file_content = generate_jsroutes_content(get_routes())
    jsroutes_file_path = os.path.join(
        static_path, 'js', 'rhodecode', 'routes.js')

    with io.open(jsroutes_file_path, 'w', encoding='utf-8') as f:
        f.write(jsroutes_file_content)
Martin Bornhold
|
class Subscriber(object):
    """
    Common base for subscribers to the pyramid event system.

    Instances are callable: calling one with an event hands that event to
    `run`, which every concrete subscriber has to provide.
    """

    def run(self, event):
        """Handle `event`. Not implemented in the base class."""
        raise NotImplementedError('Subclass has to implement this.')

    def __call__(self, event):
        """Dispatch the received `event` to `run`."""
        self.run(event)
class AsyncSubscriber(Subscriber):
    """
    Subscriber that handles the execution of events in a separate task to not
    block the execution of the code which triggers the event. It puts the
    received events into a queue from which the worker thread takes them in
    order.

    The worker is a daemon thread that is started on construction. Use
    `stop_worker` to end it.
    """

    def __init__(self):
        self._stop = False
        self._eventq = Queue.Queue()
        self._worker = self.create_worker()
        self._worker.start()

    def __call__(self, event):
        """Queue `event` for the worker instead of running it directly."""
        self._eventq.put(event)

    def create_worker(self):
        """Return the (not yet started) daemon thread running `do_work`."""
        worker = Thread(target=self.do_work)
        worker.daemon = True
        return worker

    def stop_worker(self):
        """
        Ask the worker to finish and wait until it has ended.

        Events that are still queued when the worker notices the stop request
        may not be processed.
        """
        # Raise the flag first so the loop in `do_work` ends, then queue a
        # `None` sentinel to wake a worker that is blocked on an empty queue.
        self._stop = True
        self._eventq.put(None)
        self._worker.join()

    def do_work(self):
        """Worker loop: run queued events until a stop is requested."""
        while not self._stop:
            event = self._eventq.get()
            # `None` is only the wake-up sentinel, never a real event.
            if event is not None:
                self.run(event)
class AsyncSubprocessSubscriber(AsyncSubscriber):
    """
    Subscriber that uses the subprocess32 module to execute a command if an
    event is received. Events are handled asynchronously.

    :param cmd: command handed to `subprocess32.check_output` unchanged.
    :param timeout: timeout handed to `subprocess32.check_output`; the
        default `None` is passed through as is.
    """

    def __init__(self, cmd, timeout=None):
        super(AsyncSubprocessSubscriber, self).__init__()
        self._cmd = cmd
        self._timeout = timeout

    def run(self, event):
        """
        Execute the configured command and log its outcome.

        The event only triggers the execution; it is not passed to the
        command. Errors are logged and never raised.
        """
        cmd = self._cmd
        timeout = self._timeout
        log.debug('Executing command %s.', cmd)

        try:
            # stderr is merged into the captured output.
            output = subprocess32.check_output(
                cmd, timeout=timeout, stderr=subprocess32.STDOUT)
            log.debug('Command finished %s', cmd)
            if output:
                log.debug('Command output: %s', output)
        except subprocess32.TimeoutExpired as e:
            log.exception('Timeout while executing command.')
            if e.output:
                log.error('Command output: %s', e.output)
        except subprocess32.CalledProcessError as e:
            log.exception('Error while executing command.')
            if e.output:
                log.error('Command output: %s', e.output)
        except Exception:
            # Catch-all so that an unexpected error does not escape into the
            # worker loop; SystemExit/KeyboardInterrupt are not swallowed.
            log.exception(
                'Exception while executing command %s.', cmd)