From 32f4b6417ffa7329722a9ba2e11e58651d4e00e3 2019-03-17 13:01:31 From: Marcin Lulek Date: 2019-03-17 13:01:31 Subject: [PATCH] black: reformat source --- diff --git a/backend/setup.py b/backend/setup.py index 4ba8f67..ffcfc3a 100644 --- a/backend/setup.py +++ b/backend/setup.py @@ -4,12 +4,12 @@ import re from setuptools import setup, find_packages here = os.path.abspath(os.path.dirname(__file__)) -README = open(os.path.join(here, 'README.rst')).read() -CHANGES = open(os.path.join(here, 'CHANGELOG.rst')).read() +README = open(os.path.join(here, "README.rst")).read() +CHANGES = open(os.path.join(here, "CHANGELOG.rst")).read() -REQUIREMENTS = open(os.path.join(here, 'requirements.txt')).readlines() +REQUIREMENTS = open(os.path.join(here, "requirements.txt")).readlines() -compiled = re.compile('([^=><]*).*') +compiled = re.compile("([^=><]*).*") def parse_req(req): @@ -21,7 +21,8 @@ requires = [_f for _f in map(parse_req, REQUIREMENTS) if _f] def _get_meta_var(name, data, callback_handler=None): import re - matches = re.compile(r'(?:%s)\s*=\s*(.*)' % name).search(data) + + matches = re.compile(r"(?:%s)\s*=\s*(.*)" % name).search(data) if matches: if not callable(callback_handler): callback_handler = lambda v: v @@ -29,53 +30,60 @@ def _get_meta_var(name, data, callback_handler=None): return callback_handler(eval(matches.groups()[0])) -with open(os.path.join(here, 'src', 'appenlight', '__init__.py'), 'r') as _meta: +with open(os.path.join(here, "src", "appenlight", "__init__.py"), "r") as _meta: _metadata = _meta.read() -with open(os.path.join(here, 'VERSION'), 'r') as _meta_version: +with open(os.path.join(here, "VERSION"), "r") as _meta_version: __version__ = _meta_version.read().strip() -__license__ = _get_meta_var('__license__', _metadata) -__author__ = _get_meta_var('__author__', _metadata) -__url__ = _get_meta_var('__url__', _metadata) - -found_packages = find_packages('src') -found_packages.append('appenlight.migrations.versions') -setup(name='appenlight', - description='appenlight', - long_description=README + '\n\n' + CHANGES, - classifiers=[ - "Programming Language :: Python", - "Framework :: Pylons", - "Topic :: Internet :: WWW/HTTP", - "Topic :: Internet :: WWW/HTTP :: WSGI :: Application", - ], - version=__version__, - license=__license__, - author=__author__, - url=__url__, - keywords='web wsgi bfg pylons pyramid', - package_dir={'': 'src'}, - packages=found_packages, - include_package_data=True, - zip_safe=False, - test_suite='appenlight', - install_requires=requires, - extras_require={ - "dev": ["coverage", "pytest", "pyramid", "tox", "mock", "pytest-mock", "webtest"], - "lint": ["black"], - }, - entry_points={ - 'paste.app_factory': [ - 'main = appenlight:main' - ], - 'console_scripts': [ - 'appenlight-cleanup = appenlight.scripts.cleanup:main', - 'appenlight-initializedb = appenlight.scripts.initialize_db:main', - 'appenlight-migratedb = appenlight.scripts.migratedb:main', - 'appenlight-reindex-elasticsearch = appenlight.scripts.reindex_elasticsearch:main', - 'appenlight-static = appenlight.scripts.static:main', - 'appenlight-make-config = appenlight.scripts.make_config:main', - ] - } - ) +__license__ = _get_meta_var("__license__", _metadata) +__author__ = _get_meta_var("__author__", _metadata) +__url__ = _get_meta_var("__url__", _metadata) + +found_packages = find_packages("src") +found_packages.append("appenlight.migrations.versions") +setup( + name="appenlight", + description="appenlight", + long_description=README + "\n\n" + CHANGES, + classifiers=[ + "Programming 
Language :: Python", + "Framework :: Pylons", + "Topic :: Internet :: WWW/HTTP", + "Topic :: Internet :: WWW/HTTP :: WSGI :: Application", + ], + version=__version__, + license=__license__, + author=__author__, + url=__url__, + keywords="web wsgi bfg pylons pyramid", + package_dir={"": "src"}, + packages=found_packages, + include_package_data=True, + zip_safe=False, + test_suite="appenlight", + install_requires=requires, + extras_require={ + "dev": [ + "coverage", + "pytest", + "pyramid", + "tox", + "mock", + "pytest-mock", + "webtest", + ], + "lint": ["black"], + }, + entry_points={ + "paste.app_factory": ["main = appenlight:main"], + "console_scripts": [ + "appenlight-cleanup = appenlight.scripts.cleanup:main", + "appenlight-initializedb = appenlight.scripts.initialize_db:main", + "appenlight-migratedb = appenlight.scripts.migratedb:main", + "appenlight-reindex-elasticsearch = appenlight.scripts.reindex_elasticsearch:main", + "appenlight-static = appenlight.scripts.static:main", + "appenlight-make-config = appenlight.scripts.make_config:main", + ], + }, +) diff --git a/backend/src/appenlight/__init__.py b/backend/src/appenlight/__init__.py index 0e0700e..6fb1d63 100644 --- a/backend/src/appenlight/__init__.py +++ b/backend/src/appenlight/__init__.py @@ -38,15 +38,17 @@ from redlock import Redlock from sqlalchemy import engine_from_config from appenlight.celery import configure_celery -from appenlight.lib.configurator import (CythonCompatConfigurator, - register_appenlight_plugin) +from appenlight.lib.configurator import ( + CythonCompatConfigurator, + register_appenlight_plugin, +) from appenlight.lib import cache_regions from appenlight.lib.ext_json import json from appenlight.security import groupfinder, AuthTokenAuthenticationPolicy -__license__ = 'Apache 2.0' -__author__ = 'RhodeCode GmbH' -__url__ = 'http://rhodecode.com' +__license__ = "Apache 2.0" +__author__ = "RhodeCode GmbH" +__url__ = "http://rhodecode.com" __version__ = pkg_resources.get_distribution("appenlight").parsed_version json_renderer = JSON(serializer=json.dumps, indent=4) @@ -59,7 +61,7 @@ def datetime_adapter(obj, request): def all_permissions_adapter(obj, request): - return '__all_permissions__' + return "__all_permissions__" json_renderer.add_adapter(datetime.datetime, datetime_adapter) @@ -70,91 +72,109 @@ def main(global_config, **settings): """ This function returns a Pyramid WSGI application. 
""" auth_tkt_policy = AuthTktAuthenticationPolicy( - settings['authtkt.secret'], - hashalg='sha512', + settings["authtkt.secret"], + hashalg="sha512", callback=groupfinder, max_age=2592000, - secure=asbool(settings.get('authtkt.secure', 'false'))) - auth_token_policy = AuthTokenAuthenticationPolicy( - callback=groupfinder + secure=asbool(settings.get("authtkt.secure", "false")), ) + auth_token_policy = AuthTokenAuthenticationPolicy(callback=groupfinder) authorization_policy = ACLAuthorizationPolicy() authentication_policy = AuthenticationStackPolicy() - authentication_policy.add_policy('auth_tkt', auth_tkt_policy) - authentication_policy.add_policy('auth_token', auth_token_policy) + authentication_policy.add_policy("auth_tkt", auth_tkt_policy) + authentication_policy.add_policy("auth_token", auth_token_policy) # set crypto key - encryption.ENCRYPTION_SECRET = settings.get('encryption_secret') + encryption.ENCRYPTION_SECRET = settings.get("encryption_secret") # import this later so encyption key can be monkeypatched from appenlight.models import DBSession, register_datastores # registration - settings['appenlight.disable_registration'] = asbool( - settings.get('appenlight.disable_registration')) + settings["appenlight.disable_registration"] = asbool( + settings.get("appenlight.disable_registration") + ) # update config with cometd info - settings['cometd_servers'] = {'server': settings['cometd.server'], - 'secret': settings['cometd.secret']} + settings["cometd_servers"] = { + "server": settings["cometd.server"], + "secret": settings["cometd.secret"], + } # Create the Pyramid Configurator. - settings['_mail_url'] = settings['mailing.app_url'] + settings["_mail_url"] = settings["mailing.app_url"] config = CythonCompatConfigurator( settings=settings, authentication_policy=authentication_policy, authorization_policy=authorization_policy, - root_factory='appenlight.security.RootFactory', - default_permission='view') + root_factory="appenlight.security.RootFactory", + default_permission="view", + ) # custom registry variables # resource type information - config.registry.resource_types = ['resource', 'application'] + config.registry.resource_types = ["resource", "application"] # plugin information config.registry.appenlight_plugins = {} - config.set_default_csrf_options(require_csrf=True, header='X-XSRF-TOKEN') - config.add_view_deriver('appenlight.predicates.csrf_view', - name='csrf_view') + config.set_default_csrf_options(require_csrf=True, header="X-XSRF-TOKEN") + config.add_view_deriver("appenlight.predicates.csrf_view", name="csrf_view") # later, when config is available - dogpile_config = {'url': settings['redis.url'], - "redis_expiration_time": 86400, - "redis_distributed_lock": True} + dogpile_config = { + "url": settings["redis.url"], + "redis_expiration_time": 86400, + "redis_distributed_lock": True, + } cache_regions.regions = cache_regions.CacheRegions(dogpile_config) config.registry.cache_regions = cache_regions.regions - engine = engine_from_config(settings, 'sqlalchemy.', - json_serializer=json.dumps) + engine = engine_from_config(settings, "sqlalchemy.", json_serializer=json.dumps) DBSession.configure(bind=engine) # json rederer that serializes datetime - config.add_renderer('json', json_renderer) - config.add_request_method('appenlight.lib.request.es_conn', 'es_conn', property=True) - config.add_request_method('appenlight.lib.request.get_user', 'user', - reify=True, property=True) - config.add_request_method('appenlight.lib.request.get_csrf_token', - 'csrf_token', reify=True, 
property=True) - config.add_request_method('appenlight.lib.request.safe_json_body', - 'safe_json_body', reify=True, property=True) - config.add_request_method('appenlight.lib.request.unsafe_json_body', - 'unsafe_json_body', reify=True, property=True) - config.add_request_method('appenlight.lib.request.add_flash_to_headers', - 'add_flash_to_headers') - config.add_request_method('appenlight.lib.request.get_authomatic', - 'authomatic', reify=True) - - config.include('pyramid_redis_sessions') - config.include('pyramid_tm') - config.include('pyramid_jinja2') - config.include('pyramid_mailer') - config.include('appenlight_client.ext.pyramid_tween') - config.include('ziggurat_foundations.ext.pyramid.sign_in') - es_server_list = aslist(settings['elasticsearch.nodes']) - redis_url = settings['redis.url'] - log.warning('Elasticsearch server list: {}'.format(es_server_list)) - log.warning('Redis server: {}'.format(redis_url)) + config.add_renderer("json", json_renderer) + config.add_request_method( + "appenlight.lib.request.es_conn", "es_conn", property=True + ) + config.add_request_method( + "appenlight.lib.request.get_user", "user", reify=True, property=True + ) + config.add_request_method( + "appenlight.lib.request.get_csrf_token", "csrf_token", reify=True, property=True + ) + config.add_request_method( + "appenlight.lib.request.safe_json_body", + "safe_json_body", + reify=True, + property=True, + ) + config.add_request_method( + "appenlight.lib.request.unsafe_json_body", + "unsafe_json_body", + reify=True, + property=True, + ) + config.add_request_method( + "appenlight.lib.request.add_flash_to_headers", "add_flash_to_headers" + ) + config.add_request_method( + "appenlight.lib.request.get_authomatic", "authomatic", reify=True + ) + + config.include("pyramid_redis_sessions") + config.include("pyramid_tm") + config.include("pyramid_jinja2") + config.include("pyramid_mailer") + config.include("appenlight_client.ext.pyramid_tween") + config.include("ziggurat_foundations.ext.pyramid.sign_in") + es_server_list = aslist(settings["elasticsearch.nodes"]) + redis_url = settings["redis.url"] + log.warning("Elasticsearch server list: {}".format(es_server_list)) + log.warning("Redis server: {}".format(redis_url)) config.registry.es_conn = Elasticsearch(es_server_list) config.registry.redis_conn = redis.StrictRedis.from_url(redis_url) - config.registry.redis_lockmgr = Redlock([settings['redis.redlock.url'], ], - retry_count=0, retry_delay=0) + config.registry.redis_lockmgr = Redlock( + [settings["redis.redlock.url"]], retry_count=0, retry_delay=0 + ) # mailer bw compat config.registry.mailer = config.registry.getUtility(IMailer) @@ -163,47 +183,56 @@ def main(global_config, **settings): config.set_session_factory(session_factory) # Configure renderers and event subscribers - config.add_jinja2_extension('jinja2.ext.loopcontrols') - config.add_jinja2_search_path('appenlight:templates') + config.add_jinja2_extension("jinja2.ext.loopcontrols") + config.add_jinja2_search_path("appenlight:templates") # event subscribers - config.add_subscriber("appenlight.subscribers.application_created", - "pyramid.events.ApplicationCreated") - config.add_subscriber("appenlight.subscribers.add_renderer_globals", - "pyramid.events.BeforeRender") - config.add_subscriber('appenlight.subscribers.new_request', - 'pyramid.events.NewRequest') - config.add_view_predicate('context_type_class', - 'appenlight.predicates.contextTypeClass') - - register_datastores(es_conn=config.registry.es_conn, - redis_conn=config.registry.redis_conn, - 
redis_lockmgr=config.registry.redis_lockmgr) + config.add_subscriber( + "appenlight.subscribers.application_created", + "pyramid.events.ApplicationCreated", + ) + config.add_subscriber( + "appenlight.subscribers.add_renderer_globals", "pyramid.events.BeforeRender" + ) + config.add_subscriber( + "appenlight.subscribers.new_request", "pyramid.events.NewRequest" + ) + config.add_view_predicate( + "context_type_class", "appenlight.predicates.contextTypeClass" + ) + + register_datastores( + es_conn=config.registry.es_conn, + redis_conn=config.registry.redis_conn, + redis_lockmgr=config.registry.redis_lockmgr, + ) # base stuff and scan # need to ensure webassets exists otherwise config.override_asset() # throws exception - if not os.path.exists(settings['webassets.dir']): - os.mkdir(settings['webassets.dir']) - config.add_static_view(path='appenlight:webassets', - name='static', cache_max_age=3600) - config.override_asset(to_override='appenlight:webassets/', - override_with=settings['webassets.dir']) - - config.include('appenlight.views') - config.include('appenlight.views.admin') - config.scan(ignore=['appenlight.migrations', 'appenlight.scripts', - 'appenlight.tests']) - - config.add_directive('register_appenlight_plugin', - register_appenlight_plugin) - - for entry_point in iter_entry_points(group='appenlight.plugins'): + if not os.path.exists(settings["webassets.dir"]): + os.mkdir(settings["webassets.dir"]) + config.add_static_view( + path="appenlight:webassets", name="static", cache_max_age=3600 + ) + config.override_asset( + to_override="appenlight:webassets/", override_with=settings["webassets.dir"] + ) + + config.include("appenlight.views") + config.include("appenlight.views.admin") + config.scan( + ignore=["appenlight.migrations", "appenlight.scripts", "appenlight.tests"] + ) + + config.add_directive("register_appenlight_plugin", register_appenlight_plugin) + + for entry_point in iter_entry_points(group="appenlight.plugins"): plugin = entry_point.load() plugin.includeme(config) # include other appenlight plugins explictly if needed - includes = aslist(settings.get('appenlight.includes', [])) + includes = aslist(settings.get("appenlight.includes", [])) for inc in includes: config.include(inc) @@ -211,8 +240,8 @@ def main(global_config, **settings): def pre_commit(): jinja_env = config.get_jinja2_environment() - jinja_env.filters['tojson'] = json.dumps - jinja_env.filters['toJSONUnsafe'] = jinja2_filters.toJSONUnsafe + jinja_env.filters["tojson"] = json.dumps + jinja_env.filters["toJSONUnsafe"] = jinja2_filters.toJSONUnsafe config.action(None, pre_commit, order=PHASE3_CONFIG + 999) diff --git a/backend/src/appenlight/celery/__init__.py b/backend/src/appenlight/celery/__init__.py index 3f61daa..8957371 100644 --- a/backend/src/appenlight/celery/__init__.py +++ b/backend/src/appenlight/celery/__init__.py @@ -34,15 +34,23 @@ from appenlight_client.ext.celery import register_signals log = logging.getLogger(__name__) -register('date_json', json_dumps, json_loads, - content_type='application/x-date_json', - content_encoding='utf-8') +register( + "date_json", + json_dumps, + json_loads, + content_type="application/x-date_json", + content_encoding="utf-8", +) celery = Celery() -celery.user_options['preload'].add( - Option('--ini', dest='ini', default=None, - help='Specifies pyramid configuration file location.') +celery.user_options["preload"].add( + Option( + "--ini", + dest="ini", + default=None, + help="Specifies pyramid configuration file location.", + ) ) @@ -51,19 +59,21 @@ def 
on_preload_parsed(options, **kwargs): """ This actually configures celery from pyramid config file """ - celery.conf['INI_PYRAMID'] = options['ini'] + celery.conf["INI_PYRAMID"] = options["ini"] import appenlight_client.client as e_client - ini_location = options['ini'] + + ini_location = options["ini"] if not ini_location: - raise Exception('You need to pass pyramid ini location using ' - '--ini=filename.ini argument to the worker') + raise Exception( + "You need to pass pyramid ini location using " + "--ini=filename.ini argument to the worker" + ) env = bootstrap(ini_location[0]) - api_key = env['request'].registry.settings['appenlight.api_key'] - tr_config = env['request'].registry.settings.get( - 'appenlight.transport_config') - CONFIG = e_client.get_config({'appenlight.api_key': api_key}) + api_key = env["request"].registry.settings["appenlight.api_key"] + tr_config = env["request"].registry.settings.get("appenlight.transport_config") + CONFIG = e_client.get_config({"appenlight.api_key": api_key}) if tr_config: - CONFIG['appenlight.transport_config'] = tr_config + CONFIG["appenlight.transport_config"] = tr_config APPENLIGHT_CLIENT = e_client.Client(CONFIG) # log.addHandler(APPENLIGHT_CLIENT.log_handler) register_signals(APPENLIGHT_CLIENT) @@ -71,101 +81,101 @@ def on_preload_parsed(options, **kwargs): celery_config = { - 'CELERY_IMPORTS': ["appenlight.celery.tasks", ], - 'CELERYD_TASK_TIME_LIMIT': 60, - 'CELERYD_MAX_TASKS_PER_CHILD': 1000, - 'CELERY_IGNORE_RESULT': True, - 'CELERY_ACCEPT_CONTENT': ['date_json'], - 'CELERY_TASK_SERIALIZER': 'date_json', - 'CELERY_RESULT_SERIALIZER': 'date_json', - 'BROKER_URL': None, - 'CELERYD_CONCURRENCY': None, - 'CELERY_TIMEZONE': None, - 'CELERYBEAT_SCHEDULE': { - 'alerting_reports': { - 'task': 'appenlight.celery.tasks.alerting_reports', - 'schedule': timedelta(seconds=60) + "CELERY_IMPORTS": ["appenlight.celery.tasks"], + "CELERYD_TASK_TIME_LIMIT": 60, + "CELERYD_MAX_TASKS_PER_CHILD": 1000, + "CELERY_IGNORE_RESULT": True, + "CELERY_ACCEPT_CONTENT": ["date_json"], + "CELERY_TASK_SERIALIZER": "date_json", + "CELERY_RESULT_SERIALIZER": "date_json", + "BROKER_URL": None, + "CELERYD_CONCURRENCY": None, + "CELERY_TIMEZONE": None, + "CELERYBEAT_SCHEDULE": { + "alerting_reports": { + "task": "appenlight.celery.tasks.alerting_reports", + "schedule": timedelta(seconds=60), }, - 'close_alerts': { - 'task': 'appenlight.celery.tasks.close_alerts', - 'schedule': timedelta(seconds=60) - } - } + "close_alerts": { + "task": "appenlight.celery.tasks.close_alerts", + "schedule": timedelta(seconds=60), + }, + }, } celery.config_from_object(celery_config) def configure_celery(pyramid_registry): settings = pyramid_registry.settings - celery_config['BROKER_URL'] = settings['celery.broker_url'] - celery_config['CELERYD_CONCURRENCY'] = settings['celery.concurrency'] - celery_config['CELERY_TIMEZONE'] = settings['celery.timezone'] + celery_config["BROKER_URL"] = settings["celery.broker_url"] + celery_config["CELERYD_CONCURRENCY"] = settings["celery.concurrency"] + celery_config["CELERY_TIMEZONE"] = settings["celery.timezone"] - notifications_seconds = int(settings.get('tasks.notifications_reports.interval', 60)) + notifications_seconds = int( + settings.get("tasks.notifications_reports.interval", 60) + ) - celery_config['CELERYBEAT_SCHEDULE']['notifications'] = { - 'task': 'appenlight.celery.tasks.notifications_reports', - 'schedule': timedelta(seconds=notifications_seconds) + celery_config["CELERYBEAT_SCHEDULE"]["notifications"] = { + "task": 
"appenlight.celery.tasks.notifications_reports", + "schedule": timedelta(seconds=notifications_seconds), } - celery_config['CELERYBEAT_SCHEDULE']['daily_digest'] = { - 'task': 'appenlight.celery.tasks.daily_digest', - 'schedule': crontab(minute=1, hour='4,12,20') + celery_config["CELERYBEAT_SCHEDULE"]["daily_digest"] = { + "task": "appenlight.celery.tasks.daily_digest", + "schedule": crontab(minute=1, hour="4,12,20"), } - if asbool(settings.get('celery.always_eager')): - celery_config['CELERY_ALWAYS_EAGER'] = True - celery_config['CELERY_EAGER_PROPAGATES_EXCEPTIONS'] = True + if asbool(settings.get("celery.always_eager")): + celery_config["CELERY_ALWAYS_EAGER"] = True + celery_config["CELERY_EAGER_PROPAGATES_EXCEPTIONS"] = True for plugin in pyramid_registry.appenlight_plugins.values(): - if plugin.get('celery_tasks'): - celery_config['CELERY_IMPORTS'].extend(plugin['celery_tasks']) - if plugin.get('celery_beats'): - for name, config in plugin['celery_beats']: - celery_config['CELERYBEAT_SCHEDULE'][name] = config + if plugin.get("celery_tasks"): + celery_config["CELERY_IMPORTS"].extend(plugin["celery_tasks"]) + if plugin.get("celery_beats"): + for name, config in plugin["celery_beats"]: + celery_config["CELERYBEAT_SCHEDULE"][name] = config celery.config_from_object(celery_config) @task_prerun.connect def task_prerun_signal(task_id, task, args, kwargs, **kwaargs): - if hasattr(celery, 'pyramid'): + if hasattr(celery, "pyramid"): env = celery.pyramid - env = prepare(registry=env['request'].registry) - proper_base_url = env['request'].registry.settings['mailing.app_url'] - tmp_req = Request.blank('/', base_url=proper_base_url) + env = prepare(registry=env["request"].registry) + proper_base_url = env["request"].registry.settings["mailing.app_url"] + tmp_req = Request.blank("/", base_url=proper_base_url) # ensure tasks generate url for right domain from config - env['request'].environ['HTTP_HOST'] = tmp_req.environ['HTTP_HOST'] - env['request'].environ['SERVER_PORT'] = tmp_req.environ['SERVER_PORT'] - env['request'].environ['SERVER_NAME'] = tmp_req.environ['SERVER_NAME'] - env['request'].environ['wsgi.url_scheme'] = \ - tmp_req.environ['wsgi.url_scheme'] + env["request"].environ["HTTP_HOST"] = tmp_req.environ["HTTP_HOST"] + env["request"].environ["SERVER_PORT"] = tmp_req.environ["SERVER_PORT"] + env["request"].environ["SERVER_NAME"] = tmp_req.environ["SERVER_NAME"] + env["request"].environ["wsgi.url_scheme"] = tmp_req.environ["wsgi.url_scheme"] get_current_request().tm.begin() @task_success.connect def task_success_signal(result, **kwargs): get_current_request().tm.commit() - if hasattr(celery, 'pyramid'): + if hasattr(celery, "pyramid"): celery.pyramid["closer"]() @task_retry.connect def task_retry_signal(request, reason, einfo, **kwargs): get_current_request().tm.abort() - if hasattr(celery, 'pyramid'): + if hasattr(celery, "pyramid"): celery.pyramid["closer"]() @task_failure.connect -def task_failure_signal(task_id, exception, args, kwargs, traceback, einfo, - **kwaargs): +def task_failure_signal(task_id, exception, args, kwargs, traceback, einfo, **kwaargs): get_current_request().tm.abort() - if hasattr(celery, 'pyramid'): + if hasattr(celery, "pyramid"): celery.pyramid["closer"]() @task_revoked.connect def task_revoked_signal(request, terminated, signum, expired, **kwaargs): get_current_request().tm.abort() - if hasattr(celery, 'pyramid'): + if hasattr(celery, "pyramid"): celery.pyramid["closer"]() diff --git a/backend/src/appenlight/celery/encoders.py 
b/backend/src/appenlight/celery/encoders.py index b016f37..4f96912 100644 --- a/backend/src/appenlight/celery/encoders.py +++ b/backend/src/appenlight/celery/encoders.py @@ -17,38 +17,29 @@ import json from datetime import datetime, date, timedelta -DATE_FORMAT = '%Y-%m-%dT%H:%M:%S.%f' +DATE_FORMAT = "%Y-%m-%dT%H:%M:%S.%f" class DateEncoder(json.JSONEncoder): def default(self, obj): if isinstance(obj, datetime): - return { - '__type__': '__datetime__', - 'iso': obj.strftime(DATE_FORMAT) - } + return {"__type__": "__datetime__", "iso": obj.strftime(DATE_FORMAT)} elif isinstance(obj, date): - return { - '__type__': '__date__', - 'iso': obj.strftime(DATE_FORMAT) - } + return {"__type__": "__date__", "iso": obj.strftime(DATE_FORMAT)} elif isinstance(obj, timedelta): - return { - '__type__': '__timedelta__', - 'seconds': obj.total_seconds() - } + return {"__type__": "__timedelta__", "seconds": obj.total_seconds()} else: return json.JSONEncoder.default(self, obj) def date_decoder(dct): - if '__type__' in dct: - if dct['__type__'] == '__datetime__': - return datetime.strptime(dct['iso'], DATE_FORMAT) - elif dct['__type__'] == '__date__': - return datetime.strptime(dct['iso'], DATE_FORMAT).date() - elif dct['__type__'] == '__timedelta__': - return timedelta(seconds=dct['seconds']) + if "__type__" in dct: + if dct["__type__"] == "__datetime__": + return datetime.strptime(dct["iso"], DATE_FORMAT) + elif dct["__type__"] == "__date__": + return datetime.strptime(dct["iso"], DATE_FORMAT).date() + elif dct["__type__"] == "__timedelta__": + return timedelta(seconds=dct["seconds"]) return dct @@ -57,4 +48,4 @@ def json_dumps(obj): def json_loads(obj): - return json.loads(obj.decode('utf8'), object_hook=date_decoder) + return json.loads(obj.decode("utf8"), object_hook=date_decoder) diff --git a/backend/src/appenlight/celery/tasks.py b/backend/src/appenlight/celery/tasks.py index a804224..f768f5e 100644 --- a/backend/src/appenlight/celery/tasks.py +++ b/backend/src/appenlight/celery/tasks.py @@ -51,9 +51,11 @@ from appenlight.lib.enums import ReportType log = get_task_logger(__name__) -sample_boundries = list(range(100, 1000, 100)) + \ - list(range(1000, 10000, 1000)) + \ - list(range(10000, 100000, 5000)) +sample_boundries = ( + list(range(100, 1000, 100)) + + list(range(1000, 10000, 1000)) + + list(range(10000, 100000, 5000)) +) def pick_sample(total_occurences, report_type=None): @@ -70,9 +72,9 @@ def pick_sample(total_occurences, report_type=None): @celery.task(queue="default", default_retry_delay=1, max_retries=2) def test_exception_task(): - log.error('test celery log', extra={'location': 'celery'}) - log.warning('test celery log', extra={'location': 'celery'}) - raise Exception('Celery exception test') + log.error("test celery log", extra={"location": "celery"}) + log.warning("test celery log", extra={"location": "celery"}) + raise Exception("Celery exception test") @celery.task(queue="default", default_retry_delay=1, max_retries=2) @@ -81,9 +83,9 @@ def test_retry_exception_task(): import time time.sleep(1.3) - log.error('test retry celery log', extra={'location': 'celery'}) - log.warning('test retry celery log', extra={'location': 'celery'}) - raise Exception('Celery exception test') + log.error("test retry celery log", extra={"location": "celery"}) + log.warning("test retry celery log", extra={"location": "celery"}) + raise Exception("Celery exception test") except Exception as exc: if celery.conf["CELERY_EAGER_PROPAGATES_EXCEPTIONS"]: raise @@ -92,7 +94,7 @@ def test_retry_exception_task(): 
@celery.task(queue="reports", default_retry_delay=600, max_retries=144) def add_reports(resource_id, request_params, dataset, **kwargs): - proto_version = parse_proto(request_params.get('protocol_version', '')) + proto_version = parse_proto(request_params.get("protocol_version", "")) current_time = datetime.utcnow().replace(second=0, microsecond=0) try: # we will store solr docs here for single insert @@ -114,22 +116,26 @@ def add_reports(resource_id, request_params, dataset, **kwargs): report_group = ReportGroupService.by_hash_and_resource( report.resource_id, report.grouping_hash, - since_when=datetime.utcnow().date().replace(day=1) + since_when=datetime.utcnow().date().replace(day=1), ) - occurences = report_data.get('occurences', 1) + occurences = report_data.get("occurences", 1) if not report_group: # total reports will be +1 moment later - report_group = ReportGroup(grouping_hash=report.grouping_hash, - occurences=0, total_reports=0, - last_report=0, - priority=report.priority, - error=report.error, - first_timestamp=report.start_time) + report_group = ReportGroup( + grouping_hash=report.grouping_hash, + occurences=0, + total_reports=0, + last_report=0, + priority=report.priority, + error=report.error, + first_timestamp=report.start_time, + ) report_group._skip_ft_index = True report_group.report_type = report.report_type report.report_group_time = report_group.first_timestamp - add_sample = pick_sample(report_group.occurences, - report_type=report_group.report_type) + add_sample = pick_sample( + report_group.occurences, report_type=report_group.report_type + ) if add_sample: resource.report_groups.append(report_group) report_group.reports.append(report) @@ -144,28 +150,26 @@ def add_reports(resource_id, request_params, dataset, **kwargs): for s_call in slow_calls: if s_call.partition_id not in es_slow_calls_docs: es_slow_calls_docs[s_call.partition_id] = [] - es_slow_calls_docs[s_call.partition_id].append( - s_call.es_doc()) + es_slow_calls_docs[s_call.partition_id].append(s_call.es_doc()) # try generating new stat rows if needed else: # required for postprocessing to not fail later report.report_group = report_group - stat_row = ReportService.generate_stat_rows( - report, resource, report_group) + stat_row = ReportService.generate_stat_rows(report, resource, report_group) if stat_row.partition_id not in es_reports_stats_rows: es_reports_stats_rows[stat_row.partition_id] = [] - es_reports_stats_rows[stat_row.partition_id].append( - stat_row.es_doc()) + es_reports_stats_rows[stat_row.partition_id].append(stat_row.es_doc()) # see if we should mark 10th occurence of report last_occurences_10 = int(math.floor(report_group.occurences / 10)) - curr_occurences_10 = int(math.floor( - (report_group.occurences + report.occurences) / 10)) - last_occurences_100 = int( - math.floor(report_group.occurences / 100)) - curr_occurences_100 = int(math.floor( - (report_group.occurences + report.occurences) / 100)) + curr_occurences_10 = int( + math.floor((report_group.occurences + report.occurences) / 10) + ) + last_occurences_100 = int(math.floor(report_group.occurences / 100)) + curr_occurences_100 = int( + math.floor((report_group.occurences + report.occurences) / 100) + ) notify_occurences_10 = last_occurences_10 != curr_occurences_10 notify_occurences_100 = last_occurences_100 != curr_occurences_100 report_group.occurences = ReportGroup.occurences + occurences @@ -178,39 +182,47 @@ def add_reports(resource_id, request_params, dataset, **kwargs): if added_details: report_group.total_reports = 
ReportGroup.total_reports + 1 report_group.last_report = report.id - report_group.set_notification_info(notify_10=notify_occurences_10, - notify_100=notify_occurences_100) + report_group.set_notification_info( + notify_10=notify_occurences_10, notify_100=notify_occurences_100 + ) DBSession.flush() report_group.get_report().notify_channel(report_group) if report_group.partition_id not in es_report_group_docs: es_report_group_docs[report_group.partition_id] = [] es_report_group_docs[report_group.partition_id].append( - report_group.es_doc()) + report_group.es_doc() + ) - action = 'REPORT' - log_msg = '%s: %s %s, client: %s, proto: %s' % ( + action = "REPORT" + log_msg = "%s: %s %s, client: %s, proto: %s" % ( action, - report_data.get('http_status', 'unknown'), + report_data.get("http_status", "unknown"), str(resource), - report_data.get('client'), - proto_version) + report_data.get("client"), + proto_version, + ) log.info(log_msg) total_reports = len(dataset) redis_pipeline = Datastores.redis.pipeline(transaction=False) - key = REDIS_KEYS['counters']['reports_per_minute'].format(current_time) + key = REDIS_KEYS["counters"]["reports_per_minute"].format(current_time) redis_pipeline.incr(key, total_reports) redis_pipeline.expire(key, 3600 * 24) - key = REDIS_KEYS['counters']['events_per_minute_per_user'].format( - resource.owner_user_id, current_time) + key = REDIS_KEYS["counters"]["events_per_minute_per_user"].format( + resource.owner_user_id, current_time + ) redis_pipeline.incr(key, total_reports) redis_pipeline.expire(key, 3600) - key = REDIS_KEYS['counters']['reports_per_hour_per_app'].format( - resource_id, current_time.replace(minute=0)) + key = REDIS_KEYS["counters"]["reports_per_hour_per_app"].format( + resource_id, current_time.replace(minute=0) + ) redis_pipeline.incr(key, total_reports) redis_pipeline.expire(key, 3600 * 24 * 7) redis_pipeline.sadd( - REDIS_KEYS['apps_that_got_new_data_per_hour'].format( - current_time.replace(minute=0)), resource_id) + REDIS_KEYS["apps_that_got_new_data_per_hour"].format( + current_time.replace(minute=0) + ), + resource_id, + ) redis_pipeline.execute() add_reports_es(es_report_group_docs, es_report_docs) @@ -227,11 +239,11 @@ def add_reports(resource_id, request_params, dataset, **kwargs): @celery.task(queue="es", default_retry_delay=600, max_retries=144) def add_reports_es(report_group_docs, report_docs): for k, v in report_group_docs.items(): - to_update = {'_index': k, '_type': 'report_group'} + to_update = {"_index": k, "_type": "report_group"} [i.update(to_update) for i in v] elasticsearch.helpers.bulk(Datastores.es, v) for k, v in report_docs.items(): - to_update = {'_index': k, '_type': 'report'} + to_update = {"_index": k, "_type": "report"} [i.update(to_update) for i in v] elasticsearch.helpers.bulk(Datastores.es, v) @@ -239,7 +251,7 @@ def add_reports_es(report_group_docs, report_docs): @celery.task(queue="es", default_retry_delay=600, max_retries=144) def add_reports_slow_calls_es(es_docs): for k, v in es_docs.items(): - to_update = {'_index': k, '_type': 'log'} + to_update = {"_index": k, "_type": "log"} [i.update(to_update) for i in v] elasticsearch.helpers.bulk(Datastores.es, v) @@ -247,14 +259,14 @@ def add_reports_slow_calls_es(es_docs): @celery.task(queue="es", default_retry_delay=600, max_retries=144) def add_reports_stats_rows_es(es_docs): for k, v in es_docs.items(): - to_update = {'_index': k, '_type': 'log'} + to_update = {"_index": k, "_type": "log"} [i.update(to_update) for i in v] elasticsearch.helpers.bulk(Datastores.es, v) 
@celery.task(queue="logs", default_retry_delay=600, max_retries=144) def add_logs(resource_id, request_params, dataset, **kwargs): - proto_version = request_params.get('protocol_version') + proto_version = request_params.get("protocol_version") current_time = datetime.utcnow().replace(second=0, microsecond=0) try: @@ -264,16 +276,15 @@ def add_logs(resource_id, request_params, dataset, **kwargs): ns_pairs = [] for entry in dataset: # gather pk and ns so we can remove older versions of row later - if entry['primary_key'] is not None: - ns_pairs.append({"pk": entry['primary_key'], - "ns": entry['namespace']}) + if entry["primary_key"] is not None: + ns_pairs.append({"pk": entry["primary_key"], "ns": entry["namespace"]}) log_entry = Log() log_entry.set_data(entry, resource=resource) log_entry._skip_ft_index = True resource.logs.append(log_entry) DBSession.flush() # insert non pk rows first - if entry['primary_key'] is None: + if entry["primary_key"] is None: es_docs[log_entry.partition_id].append(log_entry.es_doc()) # 2nd pass to delete all log entries from db foe same pk/ns pair @@ -282,7 +293,8 @@ def add_logs(resource_id, request_params, dataset, **kwargs): es_docs = collections.defaultdict(list) es_docs_to_delete = collections.defaultdict(list) found_pkey_logs = LogService.query_by_primary_key_and_namespace( - list_of_pairs=ns_pairs) + list_of_pairs=ns_pairs + ) log_dict = {} for log_entry in found_pkey_logs: log_key = (log_entry.primary_key, log_entry.namespace) @@ -299,51 +311,58 @@ def add_logs(resource_id, request_params, dataset, **kwargs): ids_to_delete.append(e.log_id) es_docs_to_delete[e.partition_id].append(e.delete_hash) - es_docs_to_delete[log_entry.partition_id].append( - log_entry.delete_hash) + es_docs_to_delete[log_entry.partition_id].append(log_entry.delete_hash) es_docs[log_entry.partition_id].append(log_entry.es_doc()) if ids_to_delete: - query = DBSession.query(Log).filter( - Log.log_id.in_(ids_to_delete)) + query = DBSession.query(Log).filter(Log.log_id.in_(ids_to_delete)) query.delete(synchronize_session=False) if es_docs_to_delete: # batch this to avoid problems with default ES bulk limits for es_index in es_docs_to_delete.keys(): for batch in in_batches(es_docs_to_delete[es_index], 20): - query = {"query": {'terms': {'delete_hash': batch}}} + query = {"query": {"terms": {"delete_hash": batch}}} try: Datastores.es.transport.perform_request( - "DELETE", '/{}/{}/_query'.format(es_index, 'log'), body=query) + "DELETE", + "/{}/{}/_query".format(es_index, "log"), + body=query, + ) except elasticsearch.exceptions.NotFoundError as exc: - msg = 'skipping index {}'.format(es_index) + msg = "skipping index {}".format(es_index) log.info(msg) total_logs = len(dataset) - log_msg = 'LOG_NEW: %s, entries: %s, proto:%s' % ( + log_msg = "LOG_NEW: %s, entries: %s, proto:%s" % ( str(resource), total_logs, - proto_version) + proto_version, + ) log.info(log_msg) # mark_changed(session) redis_pipeline = Datastores.redis.pipeline(transaction=False) - key = REDIS_KEYS['counters']['logs_per_minute'].format(current_time) + key = REDIS_KEYS["counters"]["logs_per_minute"].format(current_time) redis_pipeline.incr(key, total_logs) redis_pipeline.expire(key, 3600 * 24) - key = REDIS_KEYS['counters']['events_per_minute_per_user'].format( - resource.owner_user_id, current_time) + key = REDIS_KEYS["counters"]["events_per_minute_per_user"].format( + resource.owner_user_id, current_time + ) redis_pipeline.incr(key, total_logs) redis_pipeline.expire(key, 3600) - key = 
REDIS_KEYS['counters']['logs_per_hour_per_app'].format( - resource_id, current_time.replace(minute=0)) + key = REDIS_KEYS["counters"]["logs_per_hour_per_app"].format( + resource_id, current_time.replace(minute=0) + ) redis_pipeline.incr(key, total_logs) redis_pipeline.expire(key, 3600 * 24 * 7) redis_pipeline.sadd( - REDIS_KEYS['apps_that_got_new_data_per_hour'].format( - current_time.replace(minute=0)), resource_id) + REDIS_KEYS["apps_that_got_new_data_per_hour"].format( + current_time.replace(minute=0) + ), + resource_id, + ) redis_pipeline.execute() add_logs_es(es_docs) return True @@ -357,7 +376,7 @@ def add_logs(resource_id, request_params, dataset, **kwargs): @celery.task(queue="es", default_retry_delay=600, max_retries=144) def add_logs_es(es_docs): for k, v in es_docs.items(): - to_update = {'_index': k, '_type': 'log'} + to_update = {"_index": k, "_type": "log"} [i.update(to_update) for i in v] elasticsearch.helpers.bulk(Datastores.es, v) @@ -371,45 +390,51 @@ def add_metrics(resource_id, request_params, dataset, proto_version): es_docs = [] rows = [] for metric in dataset: - tags = dict(metric['tags']) - server_n = tags.get('server_name', metric['server_name']).lower() - tags['server_name'] = server_n or 'unknown' + tags = dict(metric["tags"]) + server_n = tags.get("server_name", metric["server_name"]).lower() + tags["server_name"] = server_n or "unknown" new_metric = Metric( - timestamp=metric['timestamp'], + timestamp=metric["timestamp"], resource_id=resource.resource_id, - namespace=metric['namespace'], - tags=tags) + namespace=metric["namespace"], + tags=tags, + ) rows.append(new_metric) es_docs.append(new_metric.es_doc()) session = DBSession() session.bulk_save_objects(rows) session.flush() - action = 'METRICS' - metrics_msg = '%s: %s, metrics: %s, proto:%s' % ( + action = "METRICS" + metrics_msg = "%s: %s, metrics: %s, proto:%s" % ( action, str(resource), len(dataset), - proto_version + proto_version, ) log.info(metrics_msg) mark_changed(session) redis_pipeline = Datastores.redis.pipeline(transaction=False) - key = REDIS_KEYS['counters']['metrics_per_minute'].format(current_time) + key = REDIS_KEYS["counters"]["metrics_per_minute"].format(current_time) redis_pipeline.incr(key, len(rows)) redis_pipeline.expire(key, 3600 * 24) - key = REDIS_KEYS['counters']['events_per_minute_per_user'].format( - resource.owner_user_id, current_time) + key = REDIS_KEYS["counters"]["events_per_minute_per_user"].format( + resource.owner_user_id, current_time + ) redis_pipeline.incr(key, len(rows)) redis_pipeline.expire(key, 3600) - key = REDIS_KEYS['counters']['metrics_per_hour_per_app'].format( - resource_id, current_time.replace(minute=0)) + key = REDIS_KEYS["counters"]["metrics_per_hour_per_app"].format( + resource_id, current_time.replace(minute=0) + ) redis_pipeline.incr(key, len(rows)) redis_pipeline.expire(key, 3600 * 24 * 7) redis_pipeline.sadd( - REDIS_KEYS['apps_that_got_new_data_per_hour'].format( - current_time.replace(minute=0)), resource_id) + REDIS_KEYS["apps_that_got_new_data_per_hour"].format( + current_time.replace(minute=0) + ), + resource_id, + ) redis_pipeline.execute() add_metrics_es(es_docs) return True @@ -423,8 +448,8 @@ def add_metrics(resource_id, request_params, dataset, proto_version): @celery.task(queue="es", default_retry_delay=600, max_retries=144) def add_metrics_es(es_docs): for doc in es_docs: - partition = 'rcae_m_%s' % doc['timestamp'].strftime('%Y_%m_%d') - Datastores.es.index(partition, 'log', doc) + partition = "rcae_m_%s" % 
doc["timestamp"].strftime("%Y_%m_%d") + Datastores.es.index(partition, "log", doc) @celery.task(queue="default", default_retry_delay=5, max_retries=2) @@ -435,10 +460,12 @@ def check_user_report_notifications(resource_id): application = ApplicationService.by_id(resource_id) if not application: return - error_key = REDIS_KEYS['reports_to_notify_per_type_per_app'].format( - ReportType.error, resource_id) - slow_key = REDIS_KEYS['reports_to_notify_per_type_per_app'].format( - ReportType.slow, resource_id) + error_key = REDIS_KEYS["reports_to_notify_per_type_per_app"].format( + ReportType.error, resource_id + ) + slow_key = REDIS_KEYS["reports_to_notify_per_type_per_app"].format( + ReportType.slow, resource_id + ) error_group_ids = Datastores.redis.smembers(error_key) slow_group_ids = Datastores.redis.smembers(slow_key) Datastores.redis.delete(error_key) @@ -448,8 +475,7 @@ def check_user_report_notifications(resource_id): group_ids = err_gids + slow_gids occurence_dict = {} for g_id in group_ids: - key = REDIS_KEYS['counters']['report_group_occurences'].format( - g_id) + key = REDIS_KEYS["counters"]["report_group_occurences"].format(g_id) val = Datastores.redis.get(key) Datastores.redis.delete(key) if val: @@ -460,14 +486,23 @@ def check_user_report_notifications(resource_id): report_groups.options(sa.orm.joinedload(ReportGroup.last_report_ref)) ApplicationService.check_for_groups_alert( - application, 'alert', report_groups=report_groups, - occurence_dict=occurence_dict) - users = set([p.user for p in ResourceService.users_for_perm(application, 'view')]) + application, + "alert", + report_groups=report_groups, + occurence_dict=occurence_dict, + ) + users = set( + [p.user for p in ResourceService.users_for_perm(application, "view")] + ) report_groups = report_groups.all() for user in users: - UserService.report_notify(user, request, application, - report_groups=report_groups, - occurence_dict=occurence_dict) + UserService.report_notify( + user, + request, + application, + report_groups=report_groups, + occurence_dict=occurence_dict, + ) for group in report_groups: # marks report_groups as notified if not group.notified: @@ -485,12 +520,12 @@ def check_alerts(resource_id): application = ApplicationService.by_id(resource_id) if not application: return - error_key = REDIS_KEYS[ - 'reports_to_notify_per_type_per_app_alerting'].format( - ReportType.error, resource_id) - slow_key = REDIS_KEYS[ - 'reports_to_notify_per_type_per_app_alerting'].format( - ReportType.slow, resource_id) + error_key = REDIS_KEYS["reports_to_notify_per_type_per_app_alerting"].format( + ReportType.error, resource_id + ) + slow_key = REDIS_KEYS["reports_to_notify_per_type_per_app_alerting"].format( + ReportType.slow, resource_id + ) error_group_ids = Datastores.redis.smembers(error_key) slow_group_ids = Datastores.redis.smembers(slow_key) Datastores.redis.delete(error_key) @@ -500,9 +535,9 @@ def check_alerts(resource_id): group_ids = err_gids + slow_gids occurence_dict = {} for g_id in group_ids: - key = REDIS_KEYS['counters'][ - 'report_group_occurences_alerting'].format( - g_id) + key = REDIS_KEYS["counters"]["report_group_occurences_alerting"].format( + g_id + ) val = Datastores.redis.get(key) Datastores.redis.delete(key) if val: @@ -513,8 +548,12 @@ def check_alerts(resource_id): report_groups.options(sa.orm.joinedload(ReportGroup.last_report_ref)) ApplicationService.check_for_groups_alert( - application, 'alert', report_groups=report_groups, - occurence_dict=occurence_dict, since_when=since_when) + application, + 
"alert", + report_groups=report_groups, + occurence_dict=occurence_dict, + since_when=since_when, + ) except Exception as exc: print_traceback(log) raise @@ -522,21 +561,21 @@ def check_alerts(resource_id): @celery.task(queue="default", default_retry_delay=1, max_retries=2) def close_alerts(): - log.warning('Checking alerts') + log.warning("Checking alerts") since_when = datetime.utcnow() try: - event_types = [Event.types['error_report_alert'], - Event.types['slow_report_alert'], ] - statuses = [Event.statuses['active']] + event_types = [ + Event.types["error_report_alert"], + Event.types["slow_report_alert"], + ] + statuses = [Event.statuses["active"]] # get events older than 5 min events = EventService.by_type_and_status( - event_types, - statuses, - older_than=(since_when - timedelta(minutes=5))) + event_types, statuses, older_than=(since_when - timedelta(minutes=5)) + ) for event in events: # see if we can close them - event.validate_or_close( - since_when=(since_when - timedelta(minutes=1))) + event.validate_or_close(since_when=(since_when - timedelta(minutes=1))) except Exception as exc: print_traceback(log) raise @@ -545,12 +584,18 @@ def close_alerts(): @celery.task(queue="default", default_retry_delay=600, max_retries=144) def update_tag_counter(tag_name, tag_value, count): try: - query = DBSession.query(Tag).filter(Tag.name == tag_name).filter( - sa.cast(Tag.value, sa.types.TEXT) == sa.cast(json.dumps(tag_value), - sa.types.TEXT)) - query.update({'times_seen': Tag.times_seen + count, - 'last_timestamp': datetime.utcnow()}, - synchronize_session=False) + query = ( + DBSession.query(Tag) + .filter(Tag.name == tag_name) + .filter( + sa.cast(Tag.value, sa.types.TEXT) + == sa.cast(json.dumps(tag_value), sa.types.TEXT) + ) + ) + query.update( + {"times_seen": Tag.times_seen + count, "last_timestamp": datetime.utcnow()}, + synchronize_session=False, + ) session = DBSession() mark_changed(session) return True @@ -566,8 +611,8 @@ def update_tag_counters(): """ Sets task to update counters for application tags """ - tags = Datastores.redis.lrange(REDIS_KEYS['seen_tag_list'], 0, -1) - Datastores.redis.delete(REDIS_KEYS['seen_tag_list']) + tags = Datastores.redis.lrange(REDIS_KEYS["seen_tag_list"], 0, -1) + Datastores.redis.delete(REDIS_KEYS["seen_tag_list"]) c = collections.Counter(tags) for t_json, count in c.items(): tag_info = json.loads(t_json) @@ -580,28 +625,34 @@ def daily_digest(): Sends daily digest with top 50 error reports """ request = get_current_request() - apps = Datastores.redis.smembers(REDIS_KEYS['apps_that_had_reports']) - Datastores.redis.delete(REDIS_KEYS['apps_that_had_reports']) + apps = Datastores.redis.smembers(REDIS_KEYS["apps_that_had_reports"]) + Datastores.redis.delete(REDIS_KEYS["apps_that_had_reports"]) since_when = datetime.utcnow() - timedelta(hours=8) - log.warning('Generating daily digests') + log.warning("Generating daily digests") for resource_id in apps: - resource_id = resource_id.decode('utf8') + resource_id = resource_id.decode("utf8") end_date = datetime.utcnow().replace(microsecond=0, second=0) - filter_settings = {'resource': [resource_id], - 'tags': [{'name': 'type', - 'value': ['error'], 'op': None}], - 'type': 'error', 'start_date': since_when, - 'end_date': end_date} + filter_settings = { + "resource": [resource_id], + "tags": [{"name": "type", "value": ["error"], "op": None}], + "type": "error", + "start_date": since_when, + "end_date": end_date, + } reports = ReportGroupService.get_trending( - request, filter_settings=filter_settings, 
limit=50) + request, filter_settings=filter_settings, limit=50 + ) application = ApplicationService.by_id(resource_id) if application: - users = set([p.user for p in ResourceService.users_for_perm(application, 'view')]) + users = set( + [p.user for p in ResourceService.users_for_perm(application, "view")] + ) for user in users: - user.send_digest(request, application, reports=reports, - since_when=since_when) + user.send_digest( + request, application, reports=reports, since_when=since_when + ) @celery.task(queue="default") @@ -610,11 +661,12 @@ def notifications_reports(): Loop that checks redis for info and then issues new tasks to celery to issue notifications """ - apps = Datastores.redis.smembers(REDIS_KEYS['apps_that_had_reports']) - Datastores.redis.delete(REDIS_KEYS['apps_that_had_reports']) + apps = Datastores.redis.smembers(REDIS_KEYS["apps_that_had_reports"]) + Datastores.redis.delete(REDIS_KEYS["apps_that_had_reports"]) for app in apps: - log.warning('Notify for app: %s' % app) - check_user_report_notifications.delay(app.decode('utf8')) + log.warning("Notify for app: %s" % app) + check_user_report_notifications.delay(app.decode("utf8")) + @celery.task(queue="default") def alerting_reports(): @@ -624,34 +676,33 @@ def alerting_reports(): - which applications should have new alerts opened """ - apps = Datastores.redis.smembers(REDIS_KEYS['apps_that_had_reports_alerting']) - Datastores.redis.delete(REDIS_KEYS['apps_that_had_reports_alerting']) + apps = Datastores.redis.smembers(REDIS_KEYS["apps_that_had_reports_alerting"]) + Datastores.redis.delete(REDIS_KEYS["apps_that_had_reports_alerting"]) for app in apps: - log.warning('Notify for app: %s' % app) - check_alerts.delay(app.decode('utf8')) + log.warning("Notify for app: %s" % app) + check_alerts.delay(app.decode("utf8")) -@celery.task(queue="default", soft_time_limit=3600 * 4, - hard_time_limit=3600 * 4, max_retries=144) +@celery.task( + queue="default", soft_time_limit=3600 * 4, hard_time_limit=3600 * 4, max_retries=144 +) def logs_cleanup(resource_id, filter_settings): request = get_current_request() request.tm.begin() es_query = { "query": { - "filtered": { - "filter": { - "and": [{"term": {"resource_id": resource_id}}] - } - } + "filtered": {"filter": {"and": [{"term": {"resource_id": resource_id}}]}} } } query = DBSession.query(Log).filter(Log.resource_id == resource_id) - if filter_settings['namespace']: - query = query.filter(Log.namespace == filter_settings['namespace'][0]) - es_query['query']['filtered']['filter']['and'].append( - {"term": {"namespace": filter_settings['namespace'][0]}} + if filter_settings["namespace"]: + query = query.filter(Log.namespace == filter_settings["namespace"][0]) + es_query["query"]["filtered"]["filter"]["and"].append( + {"term": {"namespace": filter_settings["namespace"][0]}} ) query.delete(synchronize_session=False) request.tm.commit() - Datastores.es.transport.perform_request("DELETE", '/{}/{}/_query'.format('rcae_l_*', 'log'), body=es_query) + Datastores.es.transport.perform_request( + "DELETE", "/{}/{}/_query".format("rcae_l_*", "log"), body=es_query + ) diff --git a/backend/src/appenlight/fil.py b/backend/src/appenlight/fil.py index 08e5596..79d7115 100644 --- a/backend/src/appenlight/fil.py +++ b/backend/src/appenlight/fil.py @@ -14,6 +14,7 @@ # See the License for the specific language governing permissions and # limitations under the License. 
+ def filter_callable(structure, section=None): - structure['SOMEVAL'] = '***REMOVED***' + structure["SOMEVAL"] = "***REMOVED***" return structure diff --git a/backend/src/appenlight/forms.py b/backend/src/appenlight/forms.py index f294d04..b15bc6b 100644 --- a/backend/src/appenlight/forms.py +++ b/backend/src/appenlight/forms.py @@ -43,7 +43,7 @@ _ = str strip_filter = lambda x: x.strip() if x else None uppercase_filter = lambda x: x.upper() if x else None -FALSE_VALUES = ('false', '', False, None) +FALSE_VALUES = ("false", "", False, None) class CSRFException(Exception): @@ -51,11 +51,14 @@ class CSRFException(Exception): class ReactorForm(SecureForm): - def __init__(self, formdata=None, obj=None, prefix='', csrf_context=None, - **kwargs): - super(ReactorForm, self).__init__(formdata=formdata, obj=obj, - prefix=prefix, - csrf_context=csrf_context, **kwargs) + def __init__(self, formdata=None, obj=None, prefix="", csrf_context=None, **kwargs): + super(ReactorForm, self).__init__( + formdata=formdata, + obj=obj, + prefix=prefix, + csrf_context=csrf_context, + **kwargs + ) self._csrf_context = csrf_context def generate_csrf_token(self, csrf_context): @@ -63,14 +66,14 @@ class ReactorForm(SecureForm): def validate_csrf_token(self, field): request = self._csrf_context or pyramid.threadlocal.get_current_request() - is_from_auth_token = 'auth:auth_token' in request.effective_principals + is_from_auth_token = "auth:auth_token" in request.effective_principals if is_from_auth_token: return True if field.data != field.current_token: # try to save the day by using token from angular - if request.headers.get('X-XSRF-TOKEN') != field.current_token: - raise CSRFException('Invalid CSRF token') + if request.headers.get("X-XSRF-TOKEN") != field.current_token: + raise CSRFException("Invalid CSRF token") @property def errors_dict(self): @@ -105,45 +108,47 @@ class ReactorForm(SecureForm): class SignInForm(ReactorForm): came_from = wtforms.HiddenField() - sign_in_user_name = wtforms.StringField(_('User Name')) - sign_in_user_password = wtforms.PasswordField(_('Password')) + sign_in_user_name = wtforms.StringField(_("User Name")) + sign_in_user_password = wtforms.PasswordField(_("Password")) - ignore_labels = ['submit'] - css_classes = {'submit': 'btn btn-primary'} + ignore_labels = ["submit"] + css_classes = {"submit": "btn btn-primary"} - html_attrs = {'sign_in_user_name': {'placeholder': 'Your login'}, - 'sign_in_user_password': { - 'placeholder': 'Your password'}} + html_attrs = { + "sign_in_user_name": {"placeholder": "Your login"}, + "sign_in_user_password": {"placeholder": "Your password"}, + } from wtforms.widgets import html_params, HTMLString -def select_multi_checkbox(field, ul_class='set', **kwargs): +def select_multi_checkbox(field, ul_class="set", **kwargs): """Render a multi-checkbox widget""" - kwargs.setdefault('type', 'checkbox') - field_id = kwargs.pop('id', field.id) - html = ['") + return HTMLString("".join(html)) -def button_widget(field, button_cls='ButtonField btn btn-default', **kwargs): +def button_widget(field, button_cls="ButtonField btn btn-default", **kwargs): """Render a button widget""" - kwargs.setdefault('type', 'button') - field_id = kwargs.pop('id', field.id) - kwargs.setdefault('value', field.label.text) - html = ['' % (html_params(id=field_id, - class_=button_cls), - kwargs['value'],)] - return HTMLString(''.join(html)) + kwargs.setdefault("type", "button") + field_id = kwargs.pop("id", field.id) + kwargs.setdefault("value", field.label.text) + html = [ + "" + % 
(html_params(id=field_id, class_=button_cls), kwargs["value"]) + ] + return HTMLString("".join(html)) def clean_whitespace(value): @@ -157,33 +162,32 @@ def found_username_validator(form, field): # sets user to recover in email validator form.field_user = user if not user: - raise wtforms.ValidationError('This username does not exist') + raise wtforms.ValidationError("This username does not exist") def found_username_email_validator(form, field): user = UserService.by_email(field.data) if not user: - raise wtforms.ValidationError('Email is incorrect') + raise wtforms.ValidationError("Email is incorrect") def unique_username_validator(form, field): user = UserService.by_user_name(field.data) if user: - raise wtforms.ValidationError('This username already exists in system') + raise wtforms.ValidationError("This username already exists in system") def unique_groupname_validator(form, field): group = GroupService.by_group_name(field.data) - mod_group = getattr(form, '_modified_group', None) + mod_group = getattr(form, "_modified_group", None) if group and (not mod_group or mod_group.id != group.id): - raise wtforms.ValidationError( - 'This group name already exists in system') + raise wtforms.ValidationError("This group name already exists in system") def unique_email_validator(form, field): user = UserService.by_email(field.data) if user: - raise wtforms.ValidationError('This email already exists in system') + raise wtforms.ValidationError("This email already exists in system") def email_validator(form, field): @@ -196,145 +200,168 @@ def email_validator(form, field): def unique_alert_email_validator(form, field): q = DBSession.query(AlertChannel) - q = q.filter(AlertChannel.channel_name == 'email') + q = q.filter(AlertChannel.channel_name == "email") q = q.filter(AlertChannel.channel_value == field.data) email = q.first() if email: - raise wtforms.ValidationError( - 'This email already exists in alert system') + raise wtforms.ValidationError("This email already exists in alert system") def blocked_email_validator(form, field): blocked_emails = [ - 'goood-mail.org', - 'shoeonlineblog.com', - 'louboutinemart.com', - 'guccibagshere.com', - 'nikeshoesoutletforsale.com' + "goood-mail.org", + "shoeonlineblog.com", + "louboutinemart.com", + "guccibagshere.com", + "nikeshoesoutletforsale.com", ] - data = field.data or '' - domain = data.split('@')[-1] + data = field.data or "" + domain = data.split("@")[-1] if domain in blocked_emails: - raise wtforms.ValidationError('Don\'t spam') + raise wtforms.ValidationError("Don't spam") def old_password_validator(form, field): - if not UserService.check_password(field.user, field.data or ''): - raise wtforms.ValidationError('You need to enter correct password') + if not UserService.check_password(field.user, field.data or ""): + raise wtforms.ValidationError("You need to enter correct password") class UserRegisterForm(ReactorForm): user_name = wtforms.StringField( - _('User Name'), + _("User Name"), filters=[strip_filter], validators=[ wtforms.validators.Length(min=2, max=30), wtforms.validators.Regexp( - re.compile(r'^[\.\w-]+$', re.UNICODE), - message="Invalid characters used"), + re.compile(r"^[\.\w-]+$", re.UNICODE), message="Invalid characters used" + ), unique_username_validator, - wtforms.validators.DataRequired() - ]) + wtforms.validators.DataRequired(), + ], + ) - user_password = wtforms.PasswordField(_('User Password'), - filters=[strip_filter], - validators=[ - wtforms.validators.Length(min=4), - wtforms.validators.DataRequired() - ]) + 
+    user_password = wtforms.PasswordField(
+        _("User Password"),
+        filters=[strip_filter],
+        validators=[
+            wtforms.validators.Length(min=4),
+            wtforms.validators.DataRequired(),
+        ],
+    )
 
-    email = wtforms.StringField(_('Email Address'),
-                                filters=[strip_filter],
-                                validators=[email_validator,
-                                            unique_email_validator,
-                                            blocked_email_validator,
-                                            wtforms.validators.DataRequired()])
-    first_name = wtforms.HiddenField(_('First Name'))
-    last_name = wtforms.HiddenField(_('Last Name'))
+    email = wtforms.StringField(
+        _("Email Address"),
+        filters=[strip_filter],
+        validators=[
+            email_validator,
+            unique_email_validator,
+            blocked_email_validator,
+            wtforms.validators.DataRequired(),
+        ],
+    )
+    first_name = wtforms.HiddenField(_("First Name"))
+    last_name = wtforms.HiddenField(_("Last Name"))
 
-    ignore_labels = ['submit']
-    css_classes = {'submit': 'btn btn-primary'}
+    ignore_labels = ["submit"]
+    css_classes = {"submit": "btn btn-primary"}
 
-    html_attrs = {'user_name': {'placeholder': 'Your login'},
-                  'user_password': {'placeholder': 'Your password'},
-                  'email': {'placeholder': 'Your email'}}
+    html_attrs = {
+        "user_name": {"placeholder": "Your login"},
+        "user_password": {"placeholder": "Your password"},
+        "email": {"placeholder": "Your email"},
+    }
 
 
 class UserCreateForm(UserRegisterForm):
-    status = wtforms.BooleanField('User status',
-                                  false_values=FALSE_VALUES)
+    status = wtforms.BooleanField("User status", false_values=FALSE_VALUES)
 
 
 class UserUpdateForm(UserCreateForm):
     user_name = None
-    user_password = wtforms.PasswordField(_('User Password'),
-                                          filters=[strip_filter],
-                                          validators=[
-                                              wtforms.validators.Length(min=4),
-                                              wtforms.validators.Optional()
-                                          ])
-    email = wtforms.StringField(_('Email Address'),
-                                filters=[strip_filter],
-                                validators=[email_validator,
-                                            wtforms.validators.DataRequired()])
+    user_password = wtforms.PasswordField(
+        _("User Password"),
+        filters=[strip_filter],
+        validators=[wtforms.validators.Length(min=4), wtforms.validators.Optional()],
+    )
+    email = wtforms.StringField(
+        _("Email Address"),
+        filters=[strip_filter],
+        validators=[email_validator, wtforms.validators.DataRequired()],
+    )
 
 
 class LostPasswordForm(ReactorForm):
-    email = wtforms.StringField(_('Email Address'),
-                                filters=[strip_filter],
-                                validators=[email_validator,
-                                            found_username_email_validator,
-                                            wtforms.validators.DataRequired()])
+    email = wtforms.StringField(
+        _("Email Address"),
+        filters=[strip_filter],
+        validators=[
+            email_validator,
+            found_username_email_validator,
+            wtforms.validators.DataRequired(),
+        ],
+    )
 
-    submit = wtforms.SubmitField(_('Reset password'))
-    ignore_labels = ['submit']
-    css_classes = {'submit': 'btn btn-primary'}
+    submit = wtforms.SubmitField(_("Reset password"))
+    ignore_labels = ["submit"]
+    css_classes = {"submit": "btn btn-primary"}
 
 
 class ChangePasswordForm(ReactorForm):
     old_password = wtforms.PasswordField(
-        'Old Password',
+        "Old Password",
         filters=[strip_filter],
-        validators=[old_password_validator,
-                    wtforms.validators.DataRequired()])
+        validators=[old_password_validator, wtforms.validators.DataRequired()],
+    )
 
     new_password = wtforms.PasswordField(
-        'New Password',
+        "New Password",
         filters=[strip_filter],
-        validators=[wtforms.validators.Length(min=4),
-                    wtforms.validators.DataRequired()])
+        validators=[
+            wtforms.validators.Length(min=4),
+            wtforms.validators.DataRequired(),
+        ],
+    )
 
     new_password_confirm = wtforms.PasswordField(
-        'Confirm Password',
+        "Confirm Password",
         filters=[strip_filter],
-        validators=[wtforms.validators.EqualTo('new_password'),
-                    wtforms.validators.DataRequired()])
-    submit = wtforms.SubmitField('Change Password')
-    ignore_labels = ['submit']
-    css_classes = {'submit': 'btn btn-primary'}
+        validators=[
+            wtforms.validators.EqualTo("new_password"),
+            wtforms.validators.DataRequired(),
+        ],
+    )
+    submit = wtforms.SubmitField("Change Password")
+    ignore_labels = ["submit"]
+    css_classes = {"submit": "btn btn-primary"}
 
 
 class CheckPasswordForm(ReactorForm):
     password = wtforms.PasswordField(
-        'Password',
+        "Password",
         filters=[strip_filter],
-        validators=[old_password_validator,
-                    wtforms.validators.DataRequired()])
+        validators=[old_password_validator, wtforms.validators.DataRequired()],
+    )
 
 
 class NewPasswordForm(ReactorForm):
     new_password = wtforms.PasswordField(
-        'New Password',
+        "New Password",
        filters=[strip_filter],
-        validators=[wtforms.validators.Length(min=4),
-                    wtforms.validators.DataRequired()])
+        validators=[
+            wtforms.validators.Length(min=4),
+            wtforms.validators.DataRequired(),
+        ],
+    )
 
     new_password_confirm = wtforms.PasswordField(
-        'Confirm Password',
+        "Confirm Password",
         filters=[strip_filter],
-        validators=[wtforms.validators.EqualTo('new_password'),
-                    wtforms.validators.DataRequired()])
-    submit = wtforms.SubmitField('Set Password')
-    ignore_labels = ['submit']
-    css_classes = {'submit': 'btn btn-primary'}
+        validators=[
+            wtforms.validators.EqualTo("new_password"),
+            wtforms.validators.DataRequired(),
+        ],
+    )
+    submit = wtforms.SubmitField("Set Password")
+    ignore_labels = ["submit"]
+    css_classes = {"submit": "btn btn-primary"}
 
 
 class CORSTextAreaField(wtforms.StringField):
@@ -342,261 +369,290 @@ class CORSTextAreaField(wtforms.StringField):
     """
     This field represents an HTML ``