Pull request !2405, created on Tue, 13 Feb 2024 04:51:24. Commits:
  • setup: change url to github
  • readme: provide better descriptions
  • ini: disable secure cookie by default
  • setup.py: include additional package data
  • README: mention getappenlight.com documentation
32 earlier commits are not shown here.

The requested changes are too large, so the diff below has been truncated.

@@ -1,43 +1,45 b''
1 1 language: python
2 2
3 3 dist: xenial
4 4
5 5 notifications:
6 6 on_success: change
7 7 on_failure: always
8 8
9 9 matrix:
10 10 include:
11 11 - python: 3.5
12 env: TOXENV=py35
12 env: TOXENV=py35 ES_VERSION=6.6.2 ES_DOWNLOAD_URL=https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-oss-${ES_VERSION}.tar.gz
13 13 - python: 3.6
14 env: TOXENV=py36
14 env: TOXENV=py36 ES_VERSION=6.6.2 ES_DOWNLOAD_URL=https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-oss-${ES_VERSION}.tar.gz
15 15 addons:
16 16 postgresql: "9.6"
17 17 - python: 3.6
18 env: TOXENV=py36 PGPORT=5432
18 env: TOXENV=py36 PGPORT=5432 ES_VERSION=6.6.2 ES_DOWNLOAD_URL=https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-oss-${ES_VERSION}.tar.gz
19 19 addons:
20 20 postgresql: "10"
21 21 apt:
22 22 packages:
23 23 - postgresql-10
24 24 - postgresql-client-10
25 25
26 26 install:
27 - wget ${ES_DOWNLOAD_URL}
28 - tar -xzf elasticsearch-oss-${ES_VERSION}.tar.gz
29 - ./elasticsearch-${ES_VERSION}/bin/elasticsearch &
27 30 - travis_retry pip install -U setuptools pip tox
28 31
29 32 script:
30 - travis_retry tox
33 - travis_retry tox -- -vv
31 34
32 35 services:
33 36 - postgresql
34 - elasticsearch
35 37 - redis
36 38
37 39 before_script:
38 40 - psql -c "create user test with encrypted password 'test';" -U postgres
39 41 - psql -c 'create database appenlight_test owner test;' -U postgres
40 42
41 43 after_success:
42 44 - pip install coveralls
43 45 - coveralls
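
Note on the CI change above: the build now downloads and starts the `elasticsearch-oss-6.6.2` tarball itself before running tox, presumably because the preinstalled Travis `elasticsearch` service does not provide the 6.x line this PR requires. A minimal, hedged sanity check of such a locally started node, assuming the `elasticsearch` Python client from requirements.txt and the default `localhost:9200` bind address:

    # hedged sketch, not part of this pull request
    from elasticsearch import Elasticsearch

    es = Elasticsearch(["http://localhost:9200"])  # default tarball bind address (assumption)
    version = es.info()["version"]["number"]
    assert version.startswith("6."), "expected an Elasticsearch 6.x node, got %s" % version
    print("Elasticsearch is up, version", version)
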
@@ -1,4 +1,9 b''
1 Visit:
1 # AppEnlight
2
3 Performance, exception, and uptime monitoring for the Web
2 4
5 ![AppEnlight image](https://raw.githubusercontent.com/AppEnlight/appenlight/gh-pages/static/appenlight.png)
6
7 Visit:
3 8
4 9 [Readme moved to backend directory](backend/README.md)
@@ -1,21 +1,28 b''
1 1 # Changelog
2 2
3 3 The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/).
4 4
5 5 <!--
6 6 PRs should document their user-visible changes (if any) in the
7 7 Unreleased section, uncommenting the header as necessary.
8 8 -->
9 9
10 10 <!-- ## Unreleased -->
11 11 <!-- ### Changed -->
12 12 <!-- ### Added -->
13 13 <!-- ### Removed -->
14 14 <!-- ### Fixed -->
15 15
16 16
17 ## [2.0.0rc1 - 2019-04-13]
18 ### Changed
19 * require Elasticsearch 6.x
20 * move data structure to single document per index
21 * got rid of bower and moved to npm in build process
22 * updated angular packages to new versions
23
17 24 ## [1.2.0 - 2019-03-17]
18 25 ### Changed
19 26 * Replaced elasticsearch client
20 27 * Bumped most deps to latest version
21 28 * Preparing to move from elasticsearch 2.x to modern versions
@@ -1,2 +1,2 b''
1 1 include *.txt *.ini *.cfg *.rst *.md VERSION
2 recursive-include appenlight *.ico *.png *.css *.gif *.jpg *.pt *.txt *.mak *.mako *.js *.html *.xml *.jinja2 *.rst *.otf *.ttf *.svg *.woff *.eot
2 recursive-include src *.ico *.png *.css *.gif *.jpg *.pt *.txt *.mak *.mako *.js *.html *.xml *.jinja2 *.rst *.otf *.ttf *.svg *.woff *.woff2 *.eot
@@ -1,105 +1,112 b''
1 1 AppEnlight
2 2 -----------
3 3
4 Performance, exception, and uptime monitoring for the Web
5
6 ![AppEnlight image](https://raw.githubusercontent.com/AppEnlight/appenlight/gh-pages/static/appenlight.png)
7
4 8 Automatic Installation
5 9 ======================
6 10
7 Use the ansible scripts in the `automation` repository to build complete instance of application
11 Use the ansible or vagrant scripts in the `automation` repository to build a complete instance of the application.
8 12 You can also use `packer` files in `automation/packer` to create whole VMs for KVM and VMware.
9 13
14 https://github.com/AppEnlight/automation
15
10 16 Manual Installation
11 17 ===================
12 18
13 19 To run the app you need to meet the following prerequisites:
14 20
15 - python 3.5+
16 - running elasticsearch (2.3+/2.4 tested)
17 - running postgresql (9.5+ required)
21 - python 3.5+ (currently 3.6 tested)
22 - running elasticsearch (6.6.2 tested)
23 - running postgresql (9.5+ required, tested 9.6 and 10.6)
18 24 - running redis
19 25
20 26 Install the app by performing
21 27
22 28 pip install -r requirements.txt
23 29
24 30 python setup.py develop
25 31
26 32 Install the appenlight uptime plugin (`ae_uptime_ce` package from `appenlight-uptime-ce` repository).
27 33
28 After installing the application you need to perform following steps:
34 For production usage you can do:
29 35
30 1. (optional) generate production.ini (or use a copy of development.ini)
36 pip install appenlight
37 pip install ae_uptime_ce
31 38
32 39
33 appenlight-make-config production.ini
40 After installing the application you need to perform the following steps:
41
42 1. (optional) generate production.ini (or use a copy of development.ini)
34 43
35 2. Setup database structure:
44 appenlight-make-config production.ini
36 45
46 2. Set up the database structure (replace FILENAME with the name you picked for `appenlight-make-config`):
37 47
38 appenlight-migratedb -c FILENAME.ini
48 appenlight-migratedb -c FILENAME.ini
39 49
40 50 3. To configure elasticsearch:
41 51
42
43 appenlight-reindex-elasticsearch -t all -c FILENAME.ini
52 appenlight-reindex-elasticsearch -t all -c FILENAME.ini
44 53
45 54 4. Create base database objects
46 55
47 56 (run this command with help flag to see how to create administrator user)
48 57
49
50 appenlight-initializedb -c FILENAME.ini
58 appenlight-initializedb -c FILENAME.ini
51 59
52 60 5. Generate static assets
53 61
54
55 appenlight-static -c FILENAME.ini
62 appenlight-static -c FILENAME.ini
56 63
57 64 Running application
58 65 ===================
59 66
60 67 To run the main app:
61 68
62 pserve development.ini
69 pserve FILENAME.ini
63 70
64 71 To run celery workers:
65 72
66 73 celery worker -A appenlight.celery -Q "reports,logs,metrics,default" --ini FILENAME.ini
67 74
68 75 To run celery beat:
69 76
70 77 celery beat -A appenlight.celery --ini FILENAME.ini
71 78
72 To run appenlight's uptime plugin:
79 To run appenlight's uptime plugin (an example uptime plugin config can be found at
80 https://github.com/AppEnlight/appenlight-uptime-ce):
73 81
74 appenlight-uptime-monitor -c FILENAME.ini
82 appenlight-uptime-monitor -c UPTIME_PLUGIN_CONFIG_FILENAME.ini
75 83
76 84 Real-time Notifications
77 85 =======================
78 86
79 87 You should also run the `channelstream` websocket server for real-time notifications:
80 88
81 channelstream -i filename.ini
82
89 channelstream -i CHANNELSTREAM_CONFIG_FILENAME.ini
90
91 Additional documentation
92 ========================
93
94 Visit https://getappenlight.com for additional server and client documentation.
95
83 96 Testing
84 97 =======
85 98
86 99 To run test suite:
87 100
88 101 py.test appenlight/tests/tests.py --cov appenlight (this looks for testing.ini in repo root)
89 102
90 103
91 104 Development
92 105 ===========
93 106
94 107 To develop appenlight frontend:
95 108
96 109 cd frontend
97 110 npm install
98 bower install
99 111 grunt watch
100 112
101
102 Tagging release
103 ===============
104
105 bumpversion --current-version 1.1.1 minor --verbose --tag --commit --dry-run
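
The updated README above targets a running Elasticsearch 6.6.2, PostgreSQL 9.5+ and Redis before any of the `appenlight-*` commands are invoked. As a purely illustrative pre-flight check (the hosts, ports and credentials below are placeholders, not values from this repository):

    # hedged sketch: adjust connection details to match your FILENAME.ini
    from elasticsearch import Elasticsearch
    import psycopg2
    import redis

    Elasticsearch(["http://127.0.0.1:9200"]).info()               # Elasticsearch reachable?
    psycopg2.connect(dbname="appenlight", user="test",
                     password="test", host="127.0.0.1").close()   # PostgreSQL reachable?
    redis.Redis(host="127.0.0.1", port=6379).ping()               # Redis reachable?
    print("all backing services answered")
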
@@ -1,47 +1,47 b''
1 1 repoze.sendmail==4.4.1
2 2 pyramid==1.10.2
3 3 pyramid_tm==2.2.1
4 4 pyramid_debugtoolbar
5 5 pyramid_authstack==1.0.1
6 6 SQLAlchemy==1.2.18
7 7 alembic==1.0.8
8 8 webhelpers2==2.0
9 9 transaction==2.4.0
10 10 zope.sqlalchemy==1.1
11 11 pyramid_mailer==0.15.1
12 12 redis==3.2.1
13 13 redlock-py==1.0.8
14 14 pyramid_jinja2==2.8
15 15 psycopg2-binary==2.7.7
16 16 wtforms==2.2.1
17 17 celery==4.2.1
18 18 formencode==1.3.1
19 19 psutil==5.6.1
20 20 ziggurat_foundations==0.8.3
21 21 bcrypt==3.1.6
22 22 appenlight_client
23 23 markdown==3.0.1
24 24 colander==1.7
25 25 defusedxml==0.5.0
26 26 dogpile.cache==0.7.1
27 27 pyramid_redis_sessions==1.0.1
28 28 simplejson==3.16.0
29 29 waitress==1.2.1
30 30 gunicorn==19.9.0
31 31 uwsgi==2.0.18
32 32 requests==2.21.0
33 33 requests_oauthlib==1.2.0
34 34 gevent==1.4.0
35 35 pygments==2.3.1
36 36 lxml==4.3.2
37 37 paginate==0.5.6
38 38 paginate-sqlalchemy==0.3.0
39 elasticsearch>=2.0.0,<3.0.0
39 elasticsearch>=6.0.0,<7.0.0
40 40 mock==1.0.1
41 41 itsdangerous==1.1.0
42 42 camplight==0.9.6
43 43 jira==1.0.7
44 44 python-dateutil==2.5.3
45 45 authomatic==0.1.0.post1
46 46 cryptography==2.6.1
47 47
@@ -1,89 +1,99 b''
1 1 import os
2 2 import re
3 3
4 4 from setuptools import setup, find_packages
5 5
6 6 here = os.path.abspath(os.path.dirname(__file__))
7 7 README = open(os.path.join(here, "README.md")).read()
8 8 CHANGES = open(os.path.join(here, "CHANGELOG.md")).read()
9 9
10 10 REQUIREMENTS = open(os.path.join(here, "requirements.txt")).readlines()
11 11
12 12 compiled = re.compile("([^=><]*).*")
13 13
14 14
15 15 def parse_req(req):
16 16 return compiled.search(req).group(1).strip()
17 17
18 18
19 requires = [_f for _f in map(parse_req, REQUIREMENTS) if _f]
19 if "APPENLIGHT_DEVELOP" in os.environ:
20 requires = [_f for _f in map(parse_req, REQUIREMENTS) if _f]
21 else:
22 requires = REQUIREMENTS
20 23
21 24
22 25 def _get_meta_var(name, data, callback_handler=None):
23 26 import re
24 27
25 28 matches = re.compile(r"(?:%s)\s*=\s*(.*)" % name).search(data)
26 29 if matches:
27 30 if not callable(callback_handler):
28 31 callback_handler = lambda v: v
29 32
30 33 return callback_handler(eval(matches.groups()[0]))
31 34
32 35
33 36 with open(os.path.join(here, "src", "appenlight", "__init__.py"), "r") as _meta:
34 37 _metadata = _meta.read()
35 38
36 with open(os.path.join(here, "VERSION"), "r") as _meta_version:
37 __version__ = _meta_version.read().strip()
38
39 39 __license__ = _get_meta_var("__license__", _metadata)
40 40 __author__ = _get_meta_var("__author__", _metadata)
41 41 __url__ = _get_meta_var("__url__", _metadata)
42 42
43 43 found_packages = find_packages("src")
44 found_packages.append("appenlight.migrations")
44 45 found_packages.append("appenlight.migrations.versions")
45 46 setup(
46 47 name="appenlight",
47 48 description="appenlight",
48 long_description=README + "\n\n" + CHANGES,
49 long_description=README,
49 50 classifiers=[
51 "Framework :: Pyramid",
52 "License :: OSI Approved :: Apache Software License",
50 53 "Programming Language :: Python",
51 "Framework :: Pylons",
54 "Programming Language :: Python :: 3 :: Only",
55 "Programming Language :: Python :: 3.6",
56 "Topic :: System :: Monitoring",
57 "Topic :: Software Development",
58 "Topic :: Software Development :: Bug Tracking",
59 "Topic :: Internet :: Log Analysis",
52 60 "Topic :: Internet :: WWW/HTTP",
53 61 "Topic :: Internet :: WWW/HTTP :: WSGI :: Application",
54 62 ],
55 version=__version__,
63 version="2.0.0rc1",
56 64 license=__license__,
57 65 author=__author__,
58 url=__url__,
59 keywords="web wsgi bfg pylons pyramid",
66 url="https://github.com/AppEnlight/appenlight",
67 keywords="web wsgi bfg pylons pyramid flask django monitoring apm instrumentation appenlight",
68 python_requires=">=3.5",
69 long_description_content_type="text/markdown",
60 70 package_dir={"": "src"},
61 71 packages=found_packages,
62 72 include_package_data=True,
63 73 zip_safe=False,
64 74 test_suite="appenlight",
65 75 install_requires=requires,
66 76 extras_require={
67 77 "dev": [
68 78 "coverage",
69 79 "pytest",
70 80 "pyramid",
71 81 "tox",
72 82 "mock",
73 83 "pytest-mock",
74 84 "webtest",
75 85 ],
76 86 "lint": ["black"],
77 87 },
78 88 entry_points={
79 89 "paste.app_factory": ["main = appenlight:main"],
80 90 "console_scripts": [
81 91 "appenlight-cleanup = appenlight.scripts.cleanup:main",
82 92 "appenlight-initializedb = appenlight.scripts.initialize_db:main",
83 93 "appenlight-migratedb = appenlight.scripts.migratedb:main",
84 94 "appenlight-reindex-elasticsearch = appenlight.scripts.reindex_elasticsearch:main",
85 95 "appenlight-static = appenlight.scripts.static:main",
86 96 "appenlight-make-config = appenlight.scripts.make_config:main",
87 97 ],
88 98 },
89 99 )
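
A note on the dependency handling introduced above: `install_requires` now depends on the `APPENLIGHT_DEVELOP` environment variable. When it is set, `parse_req` strips the version pins from requirements.txt so a development checkout can float; otherwise the pinned lines are used verbatim. A self-contained sketch of what `parse_req` produces, using the same regex as setup.py:

    import re

    compiled = re.compile("([^=><]*).*")

    def parse_req(req):
        # keep everything before the first '=', '>' or '<'
        return compiled.search(req).group(1).strip()

    print(parse_req("elasticsearch>=6.0.0,<7.0.0"))  # -> elasticsearch
    print(parse_req("pyramid==1.10.2"))              # -> pyramid
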
@@ -1,708 +1,705 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright 2010 - 2017 RhodeCode GmbH and the AppEnlight project authors
4 4 #
5 5 # Licensed under the Apache License, Version 2.0 (the "License");
6 6 # you may not use this file except in compliance with the License.
7 7 # You may obtain a copy of the License at
8 8 #
9 9 # http://www.apache.org/licenses/LICENSE-2.0
10 10 #
11 11 # Unless required by applicable law or agreed to in writing, software
12 12 # distributed under the License is distributed on an "AS IS" BASIS,
13 13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 14 # See the License for the specific language governing permissions and
15 15 # limitations under the License.
16 16
17 17 import bisect
18 18 import collections
19 19 import math
20 20 from datetime import datetime, timedelta
21 21
22 22 import sqlalchemy as sa
23 23 import elasticsearch.exceptions
24 24 import elasticsearch.helpers
25 25
26 26 from celery.utils.log import get_task_logger
27 27 from zope.sqlalchemy import mark_changed
28 28 from pyramid.threadlocal import get_current_request, get_current_registry
29 29 from ziggurat_foundations.models.services.resource import ResourceService
30 30
31 31 from appenlight.celery import celery
32 32 from appenlight.models.report_group import ReportGroup
33 33 from appenlight.models import DBSession, Datastores
34 34 from appenlight.models.report import Report
35 35 from appenlight.models.log import Log
36 36 from appenlight.models.metric import Metric
37 37 from appenlight.models.event import Event
38 38
39 39 from appenlight.models.services.application import ApplicationService
40 40 from appenlight.models.services.event import EventService
41 41 from appenlight.models.services.log import LogService
42 42 from appenlight.models.services.report import ReportService
43 43 from appenlight.models.services.report_group import ReportGroupService
44 44 from appenlight.models.services.user import UserService
45 45 from appenlight.models.tag import Tag
46 46 from appenlight.lib import print_traceback
47 47 from appenlight.lib.utils import parse_proto, in_batches
48 48 from appenlight.lib.ext_json import json
49 49 from appenlight.lib.redis_keys import REDIS_KEYS
50 50 from appenlight.lib.enums import ReportType
51 51
52 52 log = get_task_logger(__name__)
53 53
54 54 sample_boundries = (
55 55 list(range(100, 1000, 100))
56 56 + list(range(1000, 10000, 1000))
57 57 + list(range(10000, 100000, 5000))
58 58 )
59 59
60 60
61 61 def pick_sample(total_occurences, report_type=None):
62 62 every = 1.0
63 63 position = bisect.bisect_left(sample_boundries, total_occurences)
64 64 if position > 0:
65 65 if report_type == ReportType.not_found:
66 66 divide = 10.0
67 67 else:
68 68 divide = 100.0
69 69 every = sample_boundries[position - 1] / divide
70 70 return total_occurences % every == 0
71 71
72 72
73 73 @celery.task(queue="default", default_retry_delay=1, max_retries=2)
74 74 def test_exception_task():
75 75 log.error("test celery log", extra={"location": "celery"})
76 76 log.warning("test celery log", extra={"location": "celery"})
77 77 raise Exception("Celery exception test")
78 78
79 79
80 80 @celery.task(queue="default", default_retry_delay=1, max_retries=2)
81 81 def test_retry_exception_task():
82 82 try:
83 83 import time
84 84
85 85 time.sleep(1.3)
86 86 log.error("test retry celery log", extra={"location": "celery"})
87 87 log.warning("test retry celery log", extra={"location": "celery"})
88 88 raise Exception("Celery exception test")
89 89 except Exception as exc:
90 90 if celery.conf["CELERY_EAGER_PROPAGATES_EXCEPTIONS"]:
91 91 raise
92 92 test_retry_exception_task.retry(exc=exc)
93 93
94 94
95 95 @celery.task(queue="reports", default_retry_delay=600, max_retries=144)
96 96 def add_reports(resource_id, request_params, dataset, **kwargs):
97 97 proto_version = parse_proto(request_params.get("protocol_version", ""))
98 98 current_time = datetime.utcnow().replace(second=0, microsecond=0)
99 99 try:
100 100 # we will store solr docs here for single insert
101 101 es_report_docs = {}
102 102 es_report_group_docs = {}
103 103 resource = ApplicationService.by_id(resource_id)
104 104
105 105 tags = []
106 106 es_slow_calls_docs = {}
107 107 es_reports_stats_rows = {}
108 108 for report_data in dataset:
109 109 # build report details for later
110 110 added_details = 0
111 111 report = Report()
112 112 report.set_data(report_data, resource, proto_version)
113 113 report._skip_ft_index = True
114 114
115 115 # find latest group in this months partition
116 116 report_group = ReportGroupService.by_hash_and_resource(
117 117 report.resource_id,
118 118 report.grouping_hash,
119 119 since_when=datetime.utcnow().date().replace(day=1),
120 120 )
121 121 occurences = report_data.get("occurences", 1)
122 122 if not report_group:
123 123 # total reports will be +1 moment later
124 124 report_group = ReportGroup(
125 125 grouping_hash=report.grouping_hash,
126 126 occurences=0,
127 127 total_reports=0,
128 128 last_report=0,
129 129 priority=report.priority,
130 130 error=report.error,
131 131 first_timestamp=report.start_time,
132 132 )
133 133 report_group._skip_ft_index = True
134 134 report_group.report_type = report.report_type
135 135 report.report_group_time = report_group.first_timestamp
136 136 add_sample = pick_sample(
137 137 report_group.occurences, report_type=report_group.report_type
138 138 )
139 139 if add_sample:
140 140 resource.report_groups.append(report_group)
141 141 report_group.reports.append(report)
142 142 added_details += 1
143 143 DBSession.flush()
144 144 if report.partition_id not in es_report_docs:
145 145 es_report_docs[report.partition_id] = []
146 146 es_report_docs[report.partition_id].append(report.es_doc())
147 147 tags.extend(list(report.tags.items()))
148 148 slow_calls = report.add_slow_calls(report_data, report_group)
149 149 DBSession.flush()
150 150 for s_call in slow_calls:
151 151 if s_call.partition_id not in es_slow_calls_docs:
152 152 es_slow_calls_docs[s_call.partition_id] = []
153 153 es_slow_calls_docs[s_call.partition_id].append(s_call.es_doc())
154 154 # try generating new stat rows if needed
155 155 else:
156 156 # required for postprocessing to not fail later
157 157 report.report_group = report_group
158 158
159 159 stat_row = ReportService.generate_stat_rows(report, resource, report_group)
160 160 if stat_row.partition_id not in es_reports_stats_rows:
161 161 es_reports_stats_rows[stat_row.partition_id] = []
162 162 es_reports_stats_rows[stat_row.partition_id].append(stat_row.es_doc())
163 163
164 164 # see if we should mark 10th occurence of report
165 165 last_occurences_10 = int(math.floor(report_group.occurences / 10))
166 166 curr_occurences_10 = int(
167 167 math.floor((report_group.occurences + report.occurences) / 10)
168 168 )
169 169 last_occurences_100 = int(math.floor(report_group.occurences / 100))
170 170 curr_occurences_100 = int(
171 171 math.floor((report_group.occurences + report.occurences) / 100)
172 172 )
173 173 notify_occurences_10 = last_occurences_10 != curr_occurences_10
174 174 notify_occurences_100 = last_occurences_100 != curr_occurences_100
175 175 report_group.occurences = ReportGroup.occurences + occurences
176 176 report_group.last_timestamp = report.start_time
177 177 report_group.summed_duration = ReportGroup.summed_duration + report.duration
178 178 summed_duration = ReportGroup.summed_duration + report.duration
179 179 summed_occurences = ReportGroup.occurences + occurences
180 180 report_group.average_duration = summed_duration / summed_occurences
181 181 report_group.run_postprocessing(report)
182 182 if added_details:
183 183 report_group.total_reports = ReportGroup.total_reports + 1
184 184 report_group.last_report = report.id
185 185 report_group.set_notification_info(
186 186 notify_10=notify_occurences_10, notify_100=notify_occurences_100
187 187 )
188 188 DBSession.flush()
189 189 report_group.get_report().notify_channel(report_group)
190 190 if report_group.partition_id not in es_report_group_docs:
191 191 es_report_group_docs[report_group.partition_id] = []
192 192 es_report_group_docs[report_group.partition_id].append(
193 193 report_group.es_doc()
194 194 )
195 195
196 196 action = "REPORT"
197 197 log_msg = "%s: %s %s, client: %s, proto: %s" % (
198 198 action,
199 199 report_data.get("http_status", "unknown"),
200 200 str(resource),
201 201 report_data.get("client"),
202 202 proto_version,
203 203 )
204 204 log.info(log_msg)
205 205 total_reports = len(dataset)
206 206 redis_pipeline = Datastores.redis.pipeline(transaction=False)
207 207 key = REDIS_KEYS["counters"]["reports_per_minute"].format(current_time)
208 208 redis_pipeline.incr(key, total_reports)
209 209 redis_pipeline.expire(key, 3600 * 24)
210 210 key = REDIS_KEYS["counters"]["events_per_minute_per_user"].format(
211 211 resource.owner_user_id, current_time
212 212 )
213 213 redis_pipeline.incr(key, total_reports)
214 214 redis_pipeline.expire(key, 3600)
215 215 key = REDIS_KEYS["counters"]["reports_per_hour_per_app"].format(
216 216 resource_id, current_time.replace(minute=0)
217 217 )
218 218 redis_pipeline.incr(key, total_reports)
219 219 redis_pipeline.expire(key, 3600 * 24 * 7)
220 220 redis_pipeline.sadd(
221 221 REDIS_KEYS["apps_that_got_new_data_per_hour"].format(
222 222 current_time.replace(minute=0)
223 223 ),
224 224 resource_id,
225 225 )
226 226 redis_pipeline.execute()
227 227
228 228 add_reports_es(es_report_group_docs, es_report_docs)
229 229 add_reports_slow_calls_es(es_slow_calls_docs)
230 230 add_reports_stats_rows_es(es_reports_stats_rows)
231 231 return True
232 232 except Exception as exc:
233 233 print_traceback(log)
234 234 if celery.conf["CELERY_EAGER_PROPAGATES_EXCEPTIONS"]:
235 235 raise
236 236 add_reports.retry(exc=exc)
237 237
238 238
239 239 @celery.task(queue="es", default_retry_delay=600, max_retries=144)
240 240 def add_reports_es(report_group_docs, report_docs):
241 241 for k, v in report_group_docs.items():
242 to_update = {"_index": k, "_type": "report_group"}
242 to_update = {"_index": k, "_type": "report"}
243 243 [i.update(to_update) for i in v]
244 244 elasticsearch.helpers.bulk(Datastores.es, v)
245 245 for k, v in report_docs.items():
246 246 to_update = {"_index": k, "_type": "report"}
247 247 [i.update(to_update) for i in v]
248 248 elasticsearch.helpers.bulk(Datastores.es, v)
249 249
250 250
251 251 @celery.task(queue="es", default_retry_delay=600, max_retries=144)
252 252 def add_reports_slow_calls_es(es_docs):
253 253 for k, v in es_docs.items():
254 254 to_update = {"_index": k, "_type": "log"}
255 255 [i.update(to_update) for i in v]
256 256 elasticsearch.helpers.bulk(Datastores.es, v)
257 257
258 258
259 259 @celery.task(queue="es", default_retry_delay=600, max_retries=144)
260 260 def add_reports_stats_rows_es(es_docs):
261 261 for k, v in es_docs.items():
262 to_update = {"_index": k, "_type": "log"}
262 to_update = {"_index": k, "_type": "report"}
263 263 [i.update(to_update) for i in v]
264 264 elasticsearch.helpers.bulk(Datastores.es, v)
265 265
266 266
267 267 @celery.task(queue="logs", default_retry_delay=600, max_retries=144)
268 268 def add_logs(resource_id, request_params, dataset, **kwargs):
269 269 proto_version = request_params.get("protocol_version")
270 270 current_time = datetime.utcnow().replace(second=0, microsecond=0)
271 271
272 272 try:
273 273 es_docs = collections.defaultdict(list)
274 274 resource = ApplicationService.by_id_cached()(resource_id)
275 275 resource = DBSession.merge(resource, load=False)
276 276 ns_pairs = []
277 277 for entry in dataset:
278 278 # gather pk and ns so we can remove older versions of row later
279 279 if entry["primary_key"] is not None:
280 280 ns_pairs.append({"pk": entry["primary_key"], "ns": entry["namespace"]})
281 281 log_entry = Log()
282 282 log_entry.set_data(entry, resource=resource)
283 283 log_entry._skip_ft_index = True
284 284 resource.logs.append(log_entry)
285 285 DBSession.flush()
286 286 # insert non pk rows first
287 287 if entry["primary_key"] is None:
288 288 es_docs[log_entry.partition_id].append(log_entry.es_doc())
289 289
290 # 2nd pass to delete all log entries from db foe same pk/ns pair
290 # 2nd pass to delete all log entries from db for same pk/ns pair
291 291 if ns_pairs:
292 292 ids_to_delete = []
293 293 es_docs = collections.defaultdict(list)
294 294 es_docs_to_delete = collections.defaultdict(list)
295 295 found_pkey_logs = LogService.query_by_primary_key_and_namespace(
296 296 list_of_pairs=ns_pairs
297 297 )
298 298 log_dict = {}
299 299 for log_entry in found_pkey_logs:
300 300 log_key = (log_entry.primary_key, log_entry.namespace)
301 301 if log_key not in log_dict:
302 302 log_dict[log_key] = []
303 303 log_dict[log_key].append(log_entry)
304 304
305 305 for ns, entry_list in log_dict.items():
306 306 entry_list = sorted(entry_list, key=lambda x: x.timestamp)
307 307 # newest row needs to be indexed in es
308 308 log_entry = entry_list[-1]
309 309 # delete everything from pg and ES, leave the last row in pg
310 310 for e in entry_list[:-1]:
311 311 ids_to_delete.append(e.log_id)
312 312 es_docs_to_delete[e.partition_id].append(e.delete_hash)
313 313
314 314 es_docs_to_delete[log_entry.partition_id].append(log_entry.delete_hash)
315 315
316 316 es_docs[log_entry.partition_id].append(log_entry.es_doc())
317 317
318 318 if ids_to_delete:
319 319 query = DBSession.query(Log).filter(Log.log_id.in_(ids_to_delete))
320 320 query.delete(synchronize_session=False)
321 321 if es_docs_to_delete:
322 322 # batch this to avoid problems with default ES bulk limits
323 323 for es_index in es_docs_to_delete.keys():
324 324 for batch in in_batches(es_docs_to_delete[es_index], 20):
325 325 query = {"query": {"terms": {"delete_hash": batch}}}
326 326
327 327 try:
328 Datastores.es.transport.perform_request(
329 "DELETE",
330 "/{}/{}/_query".format(es_index, "log"),
328 Datastores.es.delete_by_query(
329 index=es_index,
330 doc_type="log",
331 331 body=query,
332 conflicts="proceed",
332 333 )
333 334 except elasticsearch.exceptions.NotFoundError as exc:
334 335 msg = "skipping index {}".format(es_index)
335 336 log.info(msg)
336 337
337 338 total_logs = len(dataset)
338 339
339 340 log_msg = "LOG_NEW: %s, entries: %s, proto:%s" % (
340 341 str(resource),
341 342 total_logs,
342 343 proto_version,
343 344 )
344 345 log.info(log_msg)
345 346 # mark_changed(session)
346 347 redis_pipeline = Datastores.redis.pipeline(transaction=False)
347 348 key = REDIS_KEYS["counters"]["logs_per_minute"].format(current_time)
348 349 redis_pipeline.incr(key, total_logs)
349 350 redis_pipeline.expire(key, 3600 * 24)
350 351 key = REDIS_KEYS["counters"]["events_per_minute_per_user"].format(
351 352 resource.owner_user_id, current_time
352 353 )
353 354 redis_pipeline.incr(key, total_logs)
354 355 redis_pipeline.expire(key, 3600)
355 356 key = REDIS_KEYS["counters"]["logs_per_hour_per_app"].format(
356 357 resource_id, current_time.replace(minute=0)
357 358 )
358 359 redis_pipeline.incr(key, total_logs)
359 360 redis_pipeline.expire(key, 3600 * 24 * 7)
360 361 redis_pipeline.sadd(
361 362 REDIS_KEYS["apps_that_got_new_data_per_hour"].format(
362 363 current_time.replace(minute=0)
363 364 ),
364 365 resource_id,
365 366 )
366 367 redis_pipeline.execute()
367 368 add_logs_es(es_docs)
368 369 return True
369 370 except Exception as exc:
370 371 print_traceback(log)
371 372 if celery.conf["CELERY_EAGER_PROPAGATES_EXCEPTIONS"]:
372 373 raise
373 374 add_logs.retry(exc=exc)
374 375
375 376
376 377 @celery.task(queue="es", default_retry_delay=600, max_retries=144)
377 378 def add_logs_es(es_docs):
378 379 for k, v in es_docs.items():
379 380 to_update = {"_index": k, "_type": "log"}
380 381 [i.update(to_update) for i in v]
381 382 elasticsearch.helpers.bulk(Datastores.es, v)
382 383
383 384
384 385 @celery.task(queue="metrics", default_retry_delay=600, max_retries=144)
385 386 def add_metrics(resource_id, request_params, dataset, proto_version):
386 387 current_time = datetime.utcnow().replace(second=0, microsecond=0)
387 388 try:
388 389 resource = ApplicationService.by_id_cached()(resource_id)
389 390 resource = DBSession.merge(resource, load=False)
390 391 es_docs = []
391 392 rows = []
392 393 for metric in dataset:
393 394 tags = dict(metric["tags"])
394 395 server_n = tags.get("server_name", metric["server_name"]).lower()
395 396 tags["server_name"] = server_n or "unknown"
396 397 new_metric = Metric(
397 398 timestamp=metric["timestamp"],
398 399 resource_id=resource.resource_id,
399 400 namespace=metric["namespace"],
400 401 tags=tags,
401 402 )
402 403 rows.append(new_metric)
403 404 es_docs.append(new_metric.es_doc())
404 405 session = DBSession()
405 406 session.bulk_save_objects(rows)
406 407 session.flush()
407 408
408 409 action = "METRICS"
409 410 metrics_msg = "%s: %s, metrics: %s, proto:%s" % (
410 411 action,
411 412 str(resource),
412 413 len(dataset),
413 414 proto_version,
414 415 )
415 416 log.info(metrics_msg)
416 417
417 418 mark_changed(session)
418 419 redis_pipeline = Datastores.redis.pipeline(transaction=False)
419 420 key = REDIS_KEYS["counters"]["metrics_per_minute"].format(current_time)
420 421 redis_pipeline.incr(key, len(rows))
421 422 redis_pipeline.expire(key, 3600 * 24)
422 423 key = REDIS_KEYS["counters"]["events_per_minute_per_user"].format(
423 424 resource.owner_user_id, current_time
424 425 )
425 426 redis_pipeline.incr(key, len(rows))
426 427 redis_pipeline.expire(key, 3600)
427 428 key = REDIS_KEYS["counters"]["metrics_per_hour_per_app"].format(
428 429 resource_id, current_time.replace(minute=0)
429 430 )
430 431 redis_pipeline.incr(key, len(rows))
431 432 redis_pipeline.expire(key, 3600 * 24 * 7)
432 433 redis_pipeline.sadd(
433 434 REDIS_KEYS["apps_that_got_new_data_per_hour"].format(
434 435 current_time.replace(minute=0)
435 436 ),
436 437 resource_id,
437 438 )
438 439 redis_pipeline.execute()
439 440 add_metrics_es(es_docs)
440 441 return True
441 442 except Exception as exc:
442 443 print_traceback(log)
443 444 if celery.conf["CELERY_EAGER_PROPAGATES_EXCEPTIONS"]:
444 445 raise
445 446 add_metrics.retry(exc=exc)
446 447
447 448
448 449 @celery.task(queue="es", default_retry_delay=600, max_retries=144)
449 450 def add_metrics_es(es_docs):
450 451 for doc in es_docs:
451 452 partition = "rcae_m_%s" % doc["timestamp"].strftime("%Y_%m_%d")
452 453 Datastores.es.index(partition, "log", doc)
453 454
454 455
455 456 @celery.task(queue="default", default_retry_delay=5, max_retries=2)
456 457 def check_user_report_notifications(resource_id):
457 458 since_when = datetime.utcnow()
458 459 try:
459 460 request = get_current_request()
460 461 application = ApplicationService.by_id(resource_id)
461 462 if not application:
462 463 return
463 464 error_key = REDIS_KEYS["reports_to_notify_per_type_per_app"].format(
464 465 ReportType.error, resource_id
465 466 )
466 467 slow_key = REDIS_KEYS["reports_to_notify_per_type_per_app"].format(
467 468 ReportType.slow, resource_id
468 469 )
469 470 error_group_ids = Datastores.redis.smembers(error_key)
470 471 slow_group_ids = Datastores.redis.smembers(slow_key)
471 472 Datastores.redis.delete(error_key)
472 473 Datastores.redis.delete(slow_key)
473 474 err_gids = [int(g_id) for g_id in error_group_ids]
474 475 slow_gids = [int(g_id) for g_id in list(slow_group_ids)]
475 476 group_ids = err_gids + slow_gids
476 477 occurence_dict = {}
477 478 for g_id in group_ids:
478 479 key = REDIS_KEYS["counters"]["report_group_occurences"].format(g_id)
479 480 val = Datastores.redis.get(key)
480 481 Datastores.redis.delete(key)
481 482 if val:
482 483 occurence_dict[g_id] = int(val)
483 484 else:
484 485 occurence_dict[g_id] = 1
485 486 report_groups = ReportGroupService.by_ids(group_ids)
486 487 report_groups.options(sa.orm.joinedload(ReportGroup.last_report_ref))
487 488
488 489 ApplicationService.check_for_groups_alert(
489 490 application,
490 491 "alert",
491 492 report_groups=report_groups,
492 493 occurence_dict=occurence_dict,
493 494 )
494 495 users = set(
495 496 [p.user for p in ResourceService.users_for_perm(application, "view")]
496 497 )
497 498 report_groups = report_groups.all()
498 499 for user in users:
499 500 UserService.report_notify(
500 501 user,
501 502 request,
502 503 application,
503 504 report_groups=report_groups,
504 505 occurence_dict=occurence_dict,
505 506 )
506 507 for group in report_groups:
507 508 # marks report_groups as notified
508 509 if not group.notified:
509 510 group.notified = True
510 511 except Exception as exc:
511 512 print_traceback(log)
512 513 raise
513 514
514 515
515 516 @celery.task(queue="default", default_retry_delay=5, max_retries=2)
516 517 def check_alerts(resource_id):
517 518 since_when = datetime.utcnow()
518 519 try:
519 520 request = get_current_request()
520 521 application = ApplicationService.by_id(resource_id)
521 522 if not application:
522 523 return
523 524 error_key = REDIS_KEYS["reports_to_notify_per_type_per_app_alerting"].format(
524 525 ReportType.error, resource_id
525 526 )
526 527 slow_key = REDIS_KEYS["reports_to_notify_per_type_per_app_alerting"].format(
527 528 ReportType.slow, resource_id
528 529 )
529 530 error_group_ids = Datastores.redis.smembers(error_key)
530 531 slow_group_ids = Datastores.redis.smembers(slow_key)
531 532 Datastores.redis.delete(error_key)
532 533 Datastores.redis.delete(slow_key)
533 534 err_gids = [int(g_id) for g_id in error_group_ids]
534 535 slow_gids = [int(g_id) for g_id in list(slow_group_ids)]
535 536 group_ids = err_gids + slow_gids
536 537 occurence_dict = {}
537 538 for g_id in group_ids:
538 539 key = REDIS_KEYS["counters"]["report_group_occurences_alerting"].format(
539 540 g_id
540 541 )
541 542 val = Datastores.redis.get(key)
542 543 Datastores.redis.delete(key)
543 544 if val:
544 545 occurence_dict[g_id] = int(val)
545 546 else:
546 547 occurence_dict[g_id] = 1
547 548 report_groups = ReportGroupService.by_ids(group_ids)
548 549 report_groups.options(sa.orm.joinedload(ReportGroup.last_report_ref))
549 550
550 551 ApplicationService.check_for_groups_alert(
551 552 application,
552 553 "alert",
553 554 report_groups=report_groups,
554 555 occurence_dict=occurence_dict,
555 556 since_when=since_when,
556 557 )
557 558 except Exception as exc:
558 559 print_traceback(log)
559 560 raise
560 561
561 562
562 563 @celery.task(queue="default", default_retry_delay=1, max_retries=2)
563 564 def close_alerts():
564 565 log.warning("Checking alerts")
565 566 since_when = datetime.utcnow()
566 567 try:
567 568 event_types = [
568 569 Event.types["error_report_alert"],
569 570 Event.types["slow_report_alert"],
570 571 ]
571 572 statuses = [Event.statuses["active"]]
572 573 # get events older than 5 min
573 574 events = EventService.by_type_and_status(
574 575 event_types, statuses, older_than=(since_when - timedelta(minutes=5))
575 576 )
576 577 for event in events:
577 578 # see if we can close them
578 579 event.validate_or_close(since_when=(since_when - timedelta(minutes=1)))
579 580 except Exception as exc:
580 581 print_traceback(log)
581 582 raise
582 583
583 584
584 585 @celery.task(queue="default", default_retry_delay=600, max_retries=144)
585 586 def update_tag_counter(tag_name, tag_value, count):
586 587 try:
587 588 query = (
588 589 DBSession.query(Tag)
589 590 .filter(Tag.name == tag_name)
590 591 .filter(
591 592 sa.cast(Tag.value, sa.types.TEXT)
592 593 == sa.cast(json.dumps(tag_value), sa.types.TEXT)
593 594 )
594 595 )
595 596 query.update(
596 597 {"times_seen": Tag.times_seen + count, "last_timestamp": datetime.utcnow()},
597 598 synchronize_session=False,
598 599 )
599 600 session = DBSession()
600 601 mark_changed(session)
601 602 return True
602 603 except Exception as exc:
603 604 print_traceback(log)
604 605 if celery.conf["CELERY_EAGER_PROPAGATES_EXCEPTIONS"]:
605 606 raise
606 607 update_tag_counter.retry(exc=exc)
607 608
608 609
609 610 @celery.task(queue="default")
610 611 def update_tag_counters():
611 612 """
612 613 Sets task to update counters for application tags
613 614 """
614 615 tags = Datastores.redis.lrange(REDIS_KEYS["seen_tag_list"], 0, -1)
615 616 Datastores.redis.delete(REDIS_KEYS["seen_tag_list"])
616 617 c = collections.Counter(tags)
617 618 for t_json, count in c.items():
618 619 tag_info = json.loads(t_json)
619 620 update_tag_counter.delay(tag_info[0], tag_info[1], count)
620 621
621 622
622 623 @celery.task(queue="default")
623 624 def daily_digest():
624 625 """
625 626 Sends daily digest with top 50 error reports
626 627 """
627 628 request = get_current_request()
628 629 apps = Datastores.redis.smembers(REDIS_KEYS["apps_that_had_reports"])
629 630 Datastores.redis.delete(REDIS_KEYS["apps_that_had_reports"])
630 631 since_when = datetime.utcnow() - timedelta(hours=8)
631 632 log.warning("Generating daily digests")
632 633 for resource_id in apps:
633 634 resource_id = resource_id.decode("utf8")
634 635 end_date = datetime.utcnow().replace(microsecond=0, second=0)
635 636 filter_settings = {
636 637 "resource": [resource_id],
637 638 "tags": [{"name": "type", "value": ["error"], "op": None}],
638 639 "type": "error",
639 640 "start_date": since_when,
640 641 "end_date": end_date,
641 642 }
642 643
643 644 reports = ReportGroupService.get_trending(
644 645 request, filter_settings=filter_settings, limit=50
645 646 )
646 647
647 648 application = ApplicationService.by_id(resource_id)
648 649 if application:
649 650 users = set(
650 651 [p.user for p in ResourceService.users_for_perm(application, "view")]
651 652 )
652 653 for user in users:
653 654 user.send_digest(
654 655 request, application, reports=reports, since_when=since_when
655 656 )
656 657
657 658
658 659 @celery.task(queue="default")
659 660 def notifications_reports():
660 661 """
661 662 Loop that checks redis for info and then issues new tasks to celery to
662 663 issue notifications
663 664 """
664 665 apps = Datastores.redis.smembers(REDIS_KEYS["apps_that_had_reports"])
665 666 Datastores.redis.delete(REDIS_KEYS["apps_that_had_reports"])
666 667 for app in apps:
667 668 log.warning("Notify for app: %s" % app)
668 669 check_user_report_notifications.delay(app.decode("utf8"))
669 670
670 671
671 672 @celery.task(queue="default")
672 673 def alerting_reports():
673 674 """
674 675 Loop that checks redis for info and then issues new tasks to celery to
675 676 perform the following:
676 677 - which applications should have new alerts opened
677 678 """
678 679
679 680 apps = Datastores.redis.smembers(REDIS_KEYS["apps_that_had_reports_alerting"])
680 681 Datastores.redis.delete(REDIS_KEYS["apps_that_had_reports_alerting"])
681 682 for app in apps:
682 683 log.warning("Notify for app: %s" % app)
683 684 check_alerts.delay(app.decode("utf8"))
684 685
685 686
686 687 @celery.task(
687 688 queue="default", soft_time_limit=3600 * 4, hard_time_limit=3600 * 4, max_retries=144
688 689 )
689 690 def logs_cleanup(resource_id, filter_settings):
690 691 request = get_current_request()
691 692 request.tm.begin()
692 es_query = {
693 "query": {
694 "bool": {"filter": [{"term": {"resource_id": resource_id}}]}
695 }
696 }
693 es_query = {"query": {"bool": {"filter": [{"term": {"resource_id": resource_id}}]}}}
697 694
698 695 query = DBSession.query(Log).filter(Log.resource_id == resource_id)
699 696 if filter_settings["namespace"]:
700 697 query = query.filter(Log.namespace == filter_settings["namespace"][0])
701 698 es_query["query"]["bool"]["filter"].append(
702 699 {"term": {"namespace": filter_settings["namespace"][0]}}
703 700 )
704 701 query.delete(synchronize_session=False)
705 702 request.tm.commit()
706 Datastores.es.transport.perform_request(
707 "DELETE", "/{}/{}/_query".format("rcae_l_*", "log"), body=es_query
703 Datastores.es.delete_by_query(
704 index="rcae_l_*", doc_type="log", body=es_query, conflicts="proceed"
708 705 )
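
The Elasticsearch calls in this file move from the raw `transport.perform_request("DELETE", ".../_query", ...)` endpoint (the 2.x-era delete-by-query route) to the client's `delete_by_query` method with `conflicts="proceed"`. A condensed, self-contained sketch of the new call shape, with the index pattern and query taken from the diff above (the client connection and resource id are placeholders):

    from elasticsearch import Elasticsearch

    es = Elasticsearch()  # connection details assumed, not from this PR
    es_query = {"query": {"bool": {"filter": [{"term": {"resource_id": 42}}]}}}

    es.delete_by_query(
        index="rcae_l_*",      # log indices, as in logs_cleanup above
        doc_type="log",        # single mapping type per index under Elasticsearch 6
        body=es_query,
        conflicts="proceed",   # continue on version conflicts instead of failing the task
    )
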
@@ -1,558 +1,560 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright 2010 - 2017 RhodeCode GmbH and the AppEnlight project authors
4 4 #
5 5 # Licensed under the Apache License, Version 2.0 (the "License");
6 6 # you may not use this file except in compliance with the License.
7 7 # You may obtain a copy of the License at
8 8 #
9 9 # http://www.apache.org/licenses/LICENSE-2.0
10 10 #
11 11 # Unless required by applicable law or agreed to in writing, software
12 12 # distributed under the License is distributed on an "AS IS" BASIS,
13 13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 14 # See the License for the specific language governing permissions and
15 15 # limitations under the License.
16 16
17 17 """
18 18 Utility functions.
19 19 """
20 20 import logging
21 21 import requests
22 22 import hashlib
23 23 import json
24 24 import copy
25 25 import uuid
26 26 import appenlight.lib.helpers as h
27 27 from collections import namedtuple
28 28 from datetime import timedelta, datetime, date
29 29 from dogpile.cache.api import NO_VALUE
30 30 from appenlight.models import Datastores
31 31 from appenlight.validators import LogSearchSchema, TagListSchema, accepted_search_params
32 32 from itsdangerous import TimestampSigner
33 33 from ziggurat_foundations.permissions import ALL_PERMISSIONS
34 34 from ziggurat_foundations.models.services.user import UserService
35 35 from dateutil.relativedelta import relativedelta
36 36 from dateutil.rrule import rrule, MONTHLY, DAILY
37 37
38 38 log = logging.getLogger(__name__)
39 39
40 40
41 41 Stat = namedtuple("Stat", "start_interval value")
42 42
43 43
44 44 def default_extractor(item):
45 45 """
46 46 :param item - item to extract date from
47 47 """
48 48 if hasattr(item, "start_interval"):
49 49 return item.start_interval
50 50 return item["start_interval"]
51 51
52 52
53 53 # fast gap generator
54 54 def gap_gen_default(start, step, itemiterator, end_time=None, iv_extractor=None):
55 55 """ generates a list of time/value items based on step and itemiterator
56 56 if there are entries missing from iterator time/None will be returned
57 57 instead
58 58 :param start - datetime - what time should we start generating our values
59 59 :param step - timedelta - stepsize
60 60 :param itemiterator - iterable - we will check this iterable for values
61 61 corresponding to generated steps
62 62 :param end_time - datetime - when last step is >= end_time stop iterating
63 63 :param iv_extractor - extracts current step from iterable items
64 64 """
65 65
66 66 if not iv_extractor:
67 67 iv_extractor = default_extractor
68 68
69 69 next_step = start
70 70 minutes = step.total_seconds() / 60.0
71 71 while next_step.minute % minutes != 0:
72 72 next_step = next_step.replace(minute=next_step.minute - 1)
73 73 for item in itemiterator:
74 74 item_start_interval = iv_extractor(item)
75 75 # do we have a match for current time step in our data?
76 76 # no gen a new tuple with 0 values
77 77 while next_step < item_start_interval:
78 78 yield Stat(next_step, None)
79 79 next_step = next_step + step
80 80 if next_step == item_start_interval:
81 81 yield Stat(item_start_interval, item)
82 82 next_step = next_step + step
83 83 if end_time:
84 84 while next_step < end_time:
85 85 yield Stat(next_step, None)
86 86 next_step = next_step + step
87 87
88 88
89 89 class DateTimeEncoder(json.JSONEncoder):
90 90 """ Simple datetime to ISO encoder for json serialization"""
91 91
92 92 def default(self, obj):
93 93 if isinstance(obj, date):
94 94 return obj.isoformat()
95 95 if isinstance(obj, datetime):
96 96 return obj.isoformat()
97 97 return json.JSONEncoder.default(self, obj)
98 98
99 99
100 100 def channelstream_request(
101 101 secret, endpoint, payload, throw_exceptions=False, servers=None
102 102 ):
103 103 responses = []
104 104 if not servers:
105 105 servers = []
106 106
107 107 signer = TimestampSigner(secret)
108 108 sig_for_server = signer.sign(endpoint)
109 109 for secret, server in [(s["secret"], s["server"]) for s in servers]:
110 110 response = {}
111 111 secret_headers = {
112 112 "x-channelstream-secret": sig_for_server,
113 113 "x-channelstream-endpoint": endpoint,
114 114 "Content-Type": "application/json",
115 115 }
116 116 url = "%s%s" % (server, endpoint)
117 117 try:
118 118 response = requests.post(
119 119 url,
120 120 data=json.dumps(payload, cls=DateTimeEncoder),
121 121 headers=secret_headers,
122 122 verify=False,
123 123 timeout=2,
124 124 ).json()
125 125 except requests.exceptions.RequestException as e:
126 126 if throw_exceptions:
127 127 raise
128 128 responses.append(response)
129 129 return responses
130 130
131 131
132 132 def add_cors_headers(response):
133 133 # allow CORS
134 134 response.headers.add("Access-Control-Allow-Origin", "*")
135 135 response.headers.add("XDomainRequestAllowed", "1")
136 136 response.headers.add("Access-Control-Allow-Methods", "GET, POST, OPTIONS")
137 137 # response.headers.add('Access-Control-Allow-Credentials', 'true')
138 138 response.headers.add(
139 139 "Access-Control-Allow-Headers",
140 140 "Content-Type, Depth, User-Agent, X-File-Size, X-Requested-With, If-Modified-Since, X-File-Name, Cache-Control, Pragma, Origin, Connection, Referer, Cookie",
141 141 )
142 142 response.headers.add("Access-Control-Max-Age", "86400")
143 143
144 144
145 145 from sqlalchemy.sql import compiler
146 146 from psycopg2.extensions import adapt as sqlescape
147 147
148 148
149 149 # or use the appropiate escape function from your db driver
150 150
151 151
152 152 def compile_query(query):
153 153 dialect = query.session.bind.dialect
154 154 statement = query.statement
155 155 comp = compiler.SQLCompiler(dialect, statement)
156 156 comp.compile()
157 157 enc = dialect.encoding
158 158 params = {}
159 159 for k, v in comp.params.items():
160 160 if isinstance(v, str):
161 161 v = v.encode(enc)
162 162 params[k] = sqlescape(v)
163 163 return (comp.string.encode(enc) % params).decode(enc)
164 164
165 165
166 166 def convert_es_type(input_data):
167 167 """
168 168 This might need to convert some text or other types to corresponding ES types
169 169 """
170 170 return str(input_data)
171 171
172 172
173 173 ProtoVersion = namedtuple("ProtoVersion", ["major", "minor", "patch"])
174 174
175 175
176 176 def parse_proto(input_data):
177 177 try:
178 178 parts = [int(x) for x in input_data.split(".")]
179 179 while len(parts) < 3:
180 180 parts.append(0)
181 181 return ProtoVersion(*parts)
182 182 except Exception as e:
183 183 log.info("Unknown protocol version: %s" % e)
184 184 return ProtoVersion(99, 99, 99)
185 185
186 186
187 187 def es_index_name_limiter(
188 188 start_date=None, end_date=None, months_in_past=6, ixtypes=None
189 189 ):
190 190 """
191 191 This function limits the search to 6 months by default so we don't have to
192 192 query 300 elasticsearch indices for 20 years of historical data for example
193 193 """
194 194
195 195 # should be cached later
196 196 def get_possible_names():
197 197 return list(Datastores.es.indices.get_alias("*"))
198 198
199 199 possible_names = get_possible_names()
200 200 es_index_types = []
201 201 if not ixtypes:
202 202 ixtypes = ["reports", "metrics", "logs"]
203 203 for t in ixtypes:
204 204 if t == "reports":
205 205 es_index_types.append("rcae_r_%s")
206 206 elif t == "logs":
207 207 es_index_types.append("rcae_l_%s")
208 208 elif t == "metrics":
209 209 es_index_types.append("rcae_m_%s")
210 210 elif t == "uptime":
211 es_index_types.append("rcae_u_%s")
211 es_index_types.append("rcae_uptime_ce_%s")
212 212 elif t == "slow_calls":
213 213 es_index_types.append("rcae_sc_%s")
214 214
215 215 if start_date:
216 216 start_date = copy.copy(start_date)
217 217 else:
218 218 if not end_date:
219 219 end_date = datetime.utcnow()
220 220 start_date = end_date + relativedelta(months=months_in_past * -1)
221 221
222 222 if not end_date:
223 223 end_date = start_date + relativedelta(months=months_in_past)
224 224
225 225 index_dates = list(
226 226 rrule(
227 227 MONTHLY,
228 228 dtstart=start_date.date().replace(day=1),
229 229 until=end_date.date(),
230 230 count=36,
231 231 )
232 232 )
233 233 index_names = []
234 234 for ix_type in es_index_types:
235 235 to_extend = [
236 236 ix_type % d.strftime("%Y_%m")
237 237 for d in index_dates
238 238 if ix_type % d.strftime("%Y_%m") in possible_names
239 239 ]
240 240 index_names.extend(to_extend)
241 241 for day in list(
242 242 rrule(DAILY, dtstart=start_date.date(), until=end_date.date(), count=366)
243 243 ):
244 244 ix_name = ix_type % day.strftime("%Y_%m_%d")
245 245 if ix_name in possible_names:
246 246 index_names.append(ix_name)
247 247 return index_names
248 248
249 249
250 250 def build_filter_settings_from_query_dict(
251 251 request, params=None, override_app_ids=None, resource_permissions=None
252 252 ):
253 253 """
254 254 Builds list of normalized search terms for ES from query params
255 255 ensuring application list is restricted to only applications user
256 256 has access to
257 257
258 258 :param params (dictionary)
259 259 :param override_app_ids - list of application id's to use instead of
260 260 applications user normally has access to
261 261 """
262 262 params = copy.deepcopy(params)
263 263 applications = []
264 264 if not resource_permissions:
265 265 resource_permissions = ["view"]
266 266
267 267 if request.user:
268 268 applications = UserService.resources_with_perms(
269 269 request.user, resource_permissions, resource_types=["application"]
270 270 )
271 271
272 272 # CRITICAL - this ensures our resultset is limited to only the ones
273 273 # user has view permissions
274 274 all_possible_app_ids = set([app.resource_id for app in applications])
275 275
276 276 # if override is preset we force permission for app to be present
277 277 # this allows users to see dashboards and applications they would
278 278 # normally not be able to
279 279
280 280 if override_app_ids:
281 281 all_possible_app_ids = set(override_app_ids)
282 282
283 283 schema = LogSearchSchema().bind(resources=all_possible_app_ids)
284 284 tag_schema = TagListSchema()
285 285 filter_settings = schema.deserialize(params)
286 286 tag_list = []
287 287 for k, v in list(filter_settings.items()):
288 288 if k in accepted_search_params:
289 289 continue
290 290 tag_list.append({"name": k, "value": v, "op": "eq"})
291 291 # remove the key from filter_settings
292 292 filter_settings.pop(k, None)
293 293 tags = tag_schema.deserialize(tag_list)
294 294 filter_settings["tags"] = tags
295 295 return filter_settings
296 296
297 297
298 298 def gen_uuid():
299 299 return str(uuid.uuid4())
300 300
301 301
302 302 def gen_uuid4_sha_hex():
303 303 return hashlib.sha1(uuid.uuid4().bytes).hexdigest()
304 304
305 305
306 306 def permission_tuple_to_dict(data):
307 307 out = {
308 308 "user_name": None,
309 309 "perm_name": data.perm_name,
310 310 "owner": data.owner,
311 311 "type": data.type,
312 312 "resource_name": None,
313 313 "resource_type": None,
314 314 "resource_id": None,
315 315 "group_name": None,
316 316 "group_id": None,
317 317 }
318 318 if data.user:
319 319 out["user_name"] = data.user.user_name
320 320 if data.perm_name == ALL_PERMISSIONS:
321 321 out["perm_name"] = "__all_permissions__"
322 322 if data.resource:
323 323 out["resource_name"] = data.resource.resource_name
324 324 out["resource_type"] = data.resource.resource_type
325 325 out["resource_id"] = data.resource.resource_id
326 326 if data.group:
327 327 out["group_name"] = data.group.group_name
328 328 out["group_id"] = data.group.id
329 329 return out
330 330
331 331
332 332 def get_cached_buckets(
333 333 request,
334 334 stats_since,
335 335 end_time,
336 336 fn,
337 337 cache_key,
338 338 gap_gen=None,
339 339 db_session=None,
340 340 step_interval=None,
341 341 iv_extractor=None,
342 342 rerange=False,
343 343 *args,
344 344 **kwargs
345 345 ):
346 346 """ Takes "fn" that should return some data and tries to load the data
347 347 dividing it into daily buckets - if the stats_since and end time give a
348 348 delta bigger than 24hours, then only "todays" data is computed on the fly
349 349
350 350 :param request: (request) request object
351 351 :param stats_since: (datetime) start date of buckets range
352 352 :param end_time: (datetime) end date of buckets range - utcnow() if None
353 353 :param fn: (callable) callable to use to populate buckets should have
354 354 following signature:
355 355 def get_data(request, since_when, until, *args, **kwargs):
356 356
357 357 :param cache_key: (string) cache key that will be used to build bucket
358 358 caches
359 359 :param gap_gen: (callable) gap generator - should return step intervals
360 360 to use with out `fn` callable
361 361 :param db_session: (Session) sqlalchemy session
362 362 :param step_interval: (timedelta) optional step interval if we want to
363 363 override the default determined from total start/end time delta
364 364 :param iv_extractor: (callable) used to get step intervals from data
365 365 returned by `fn` callable
366 366 :param rerange: (bool) handy if we want to change ranges from hours to
367 367 days when cached data is missing - will shorten execution time if `fn`
368 368 callable supports that and we are working with multiple rows - like metrics
369 369 :param args:
370 370 :param kwargs:
371 371
372 372 :return: iterable
373 373 """
374 374 if not end_time:
375 375 end_time = datetime.utcnow().replace(second=0, microsecond=0)
376 376 delta = end_time - stats_since
377 377 # if smaller than 3 days we want to group by 5min else by 1h,
378 378 # for 60 min group by min
379 379 if not gap_gen:
380 380 gap_gen = gap_gen_default
381 381 if not iv_extractor:
382 382 iv_extractor = default_extractor
383 383
384 384 # do not use custom interval if total time range with new iv would exceed
385 385 # end time
386 386 if not step_interval or stats_since + step_interval >= end_time:
387 387 if delta < h.time_deltas.get("12h")["delta"]:
388 388 step_interval = timedelta(seconds=60)
389 389 elif delta < h.time_deltas.get("3d")["delta"]:
390 390 step_interval = timedelta(seconds=60 * 5)
391 391 elif delta > h.time_deltas.get("2w")["delta"]:
392 392 step_interval = timedelta(days=1)
393 393 else:
394 394 step_interval = timedelta(minutes=60)
395 395
396 396 if step_interval >= timedelta(minutes=60):
397 397 log.info(
398 398 "cached_buckets:{}: adjusting start time "
399 399 "for hourly or daily intervals".format(cache_key)
400 400 )
401 401 stats_since = stats_since.replace(hour=0, minute=0)
402 402
403 403 ranges = [
404 404 i.start_interval
405 405 for i in list(gap_gen(stats_since, step_interval, [], end_time=end_time))
406 406 ]
407 407 buckets = {}
408 408 storage_key = "buckets:" + cache_key + "{}|{}"
409 409 # this means we basicly cache per hour in 3-14 day intervals but i think
410 410 # its fine at this point - will be faster than db access anyways
411 411
412 412 if len(ranges) >= 1:
413 413 last_ranges = [ranges[-1]]
414 414 else:
415 415 last_ranges = []
416 416 if step_interval >= timedelta(minutes=60):
417 417 for r in ranges:
418 418 k = storage_key.format(step_interval.total_seconds(), r)
419 419 value = request.registry.cache_regions.redis_day_30.get(k)
420 420 # last buckets are never loaded from cache
421 421 is_last_result = r >= end_time - timedelta(hours=6) or r in last_ranges
422 422 if value is not NO_VALUE and not is_last_result:
423 423 log.info(
424 424 "cached_buckets:{}: "
425 425 "loading range {} from cache".format(cache_key, r)
426 426 )
427 427 buckets[r] = value
428 428 else:
429 429 log.info(
430 430 "cached_buckets:{}: "
431 431 "loading range {} from storage".format(cache_key, r)
432 432 )
433 433 range_size = step_interval
434 434 if (
435 435 step_interval == timedelta(minutes=60)
436 436 and not is_last_result
437 437 and rerange
438 438 ):
439 439 range_size = timedelta(days=1)
440 440 r = r.replace(hour=0, minute=0)
441 441 log.info(
442 442 "cached_buckets:{}: "
443 443 "loading collapsed "
444 444 "range {} {}".format(cache_key, r, r + range_size)
445 445 )
446 446 bucket_data = fn(
447 447 request,
448 448 r,
449 449 r + range_size,
450 450 step_interval,
451 451 gap_gen,
452 452 bucket_count=len(ranges),
453 453 *args,
454 454 **kwargs
455 455 )
456 456 for b in bucket_data:
457 457 b_iv = iv_extractor(b)
458 458 buckets[b_iv] = b
459 459 k2 = storage_key.format(step_interval.total_seconds(), b_iv)
460 460 request.registry.cache_regions.redis_day_30.set(k2, b)
461 461 log.info("cached_buckets:{}: saving cache".format(cache_key))
462 462 else:
463 463 # bucket count is 1 for short time ranges <= 24h from now
464 464 bucket_data = fn(
465 465 request,
466 466 stats_since,
467 467 end_time,
468 468 step_interval,
469 469 gap_gen,
470 470 bucket_count=1,
471 471 *args,
472 472 **kwargs
473 473 )
474 474 for b in bucket_data:
475 475 buckets[iv_extractor(b)] = b
476 476 return buckets
477 477
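For orientation, the bucket sizing above boils down to a simple rule driven by the size of the requested range. A minimal standalone sketch of that rule, assuming `h.time_deltas` encodes the 12h/3d/2w thresholds used here:

from datetime import datetime, timedelta

def pick_step_interval(stats_since, end_time):
    # mirrors cached_buckets: minute buckets for short ranges, 5-minute
    # buckets up to 3 days, daily buckets past 2 weeks, hourly otherwise
    delta = end_time - stats_since
    if delta < timedelta(hours=12):
        return timedelta(seconds=60)
    elif delta < timedelta(days=3):
        return timedelta(minutes=5)
    elif delta > timedelta(weeks=2):
        return timedelta(days=1)
    return timedelta(minutes=60)

now = datetime.utcnow().replace(second=0, microsecond=0)
print(pick_step_interval(now - timedelta(hours=6), now))   # 0:01:00
print(pick_step_interval(now - timedelta(days=30), now))   # 1 day, 0:00:00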
478 478
479 479 def get_cached_split_data(
480 480 request, stats_since, end_time, fn, cache_key, db_session=None, *args, **kwargs
481 481 ):
482 482 """ Takes "fn" that should return some data and tries to load the data
483 483 dividing it into 2 buckets - cached "since_from" bucket and "today"
484 484 bucket - then the data can be reduced into single value
485 485
486 486 Data is cached if the stats_since and end time give a delta bigger
487 487 than 24hours - then only 24h is computed on the fly
488 488 """
489 489 if not end_time:
490 490 end_time = datetime.utcnow().replace(second=0, microsecond=0)
491 491 delta = end_time - stats_since
492 492
493 493 if delta >= timedelta(minutes=60):
494 494 log.info(
495 495 "cached_split_data:{}: adjusting start time "
496 496 "for hourly or daily intervals".format(cache_key)
497 497 )
498 498 stats_since = stats_since.replace(hour=0, minute=0)
499 499
500 500 storage_key = "buckets_split_data:" + cache_key + ":{}|{}"
501 501 old_end_time = end_time.replace(hour=0, minute=0)
502 502
503 503 final_storage_key = storage_key.format(delta.total_seconds(), old_end_time)
504 504 older_data = None
505 505
506 506 cdata = request.registry.cache_regions.redis_day_7.get(final_storage_key)
507 507
508 508 if cdata:
509 509 log.info("cached_split_data:{}: found old " "bucket data".format(cache_key))
510 510 older_data = cdata
511 511
512 512 if stats_since < end_time - h.time_deltas.get("24h")["delta"] and not cdata:
513 513 log.info(
514 514 "cached_split_data:{}: didn't find the "
515 515 "start bucket in cache so load older data".format(cache_key)
516 516 )
517 517 recent_stats_since = old_end_time
518 518 older_data = fn(
519 519 request,
520 520 stats_since,
521 521 recent_stats_since,
522 522 db_session=db_session,
523 523 *args,
524 524 **kwargs
525 525 )
526 526 request.registry.cache_regions.redis_day_7.set(final_storage_key, older_data)
527 527 elif stats_since < end_time - h.time_deltas.get("24h")["delta"]:
528 528 recent_stats_since = old_end_time
529 529 else:
530 530 recent_stats_since = stats_since
531 531
532 532 log.info(
533 533 "cached_split_data:{}: loading fresh "
534 534 "data bucksts from last 24h ".format(cache_key)
535 535 )
536 536 todays_data = fn(
537 537 request, recent_stats_since, end_time, db_session=db_session, *args, **kwargs
538 538 )
539 539 return older_data, todays_data
540 540
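The cached/fresh boundary used by get_cached_split_data can be summarized in isolation: everything before today's midnight comes from the cached bucket, the rest is recomputed. A small sketch of just that split (the helper name below is illustrative, not part of the codebase):

from datetime import datetime, timedelta

def split_time_range(stats_since, end_time):
    # returns (older_cached_range, fresh_range); the older range is None
    # when the whole query fits inside the last 24 hours
    old_end_time = end_time.replace(hour=0, minute=0)
    if stats_since < end_time - timedelta(hours=24):
        return (stats_since, old_end_time), (old_end_time, end_time)
    return None, (stats_since, end_time)

now = datetime.utcnow().replace(second=0, microsecond=0)
print(split_time_range(now - timedelta(days=3), now))
print(split_time_range(now - timedelta(hours=2), now))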
541 541
542 542 def in_batches(seq, size):
543 543 """
544 544 Splits a sequence into batches of the specified size
545 545 :param seq: (sequence) must support len() and slicing
546 546 :param size: (int) batch size
547 547 """
548 548 return (seq[pos : pos + size] for pos in range(0, len(seq), size))
549 549
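A quick usage example of in_batches (values are illustrative):

print(list(in_batches(list(range(7)), 3)))
# [[0, 1, 2], [3, 4, 5], [6]]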
550 550
551 551 def get_es_info(cache_regions, es_conn):
552 552 @cache_regions.memory_min_10.cache_on_arguments()
553 553 def get_es_info_cached():
554 554 returned_info = {"raw_info": es_conn.info()}
555 returned_info["version"] = returned_info["raw_info"]["version"]["number"].split('.')
555 returned_info["version"] = returned_info["raw_info"]["version"]["number"].split(
556 "."
557 )
556 558 return returned_info
557 559
558 560 return get_es_info_cached()
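For illustration, the version parsing above turns the Elasticsearch `version.number` string into a list of string components; the raw_info dict below only mimics the relevant part of the es_conn.info() response:

raw_info = {"version": {"number": "6.6.2"}}  # shape assumed from es_conn.info()
version = raw_info["version"]["number"].split(".")
print(version)               # ['6', '6', '2']
print(int(version[0]) >= 6)  # True - handy for 6.x feature gating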
@@ -1,132 +1,132 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright 2010 - 2017 RhodeCode GmbH and the AppEnlight project authors
4 4 #
5 5 # Licensed under the Apache License, Version 2.0 (the "License");
6 6 # you may not use this file except in compliance with the License.
7 7 # You may obtain a copy of the License at
8 8 #
9 9 # http://www.apache.org/licenses/LICENSE-2.0
10 10 #
11 11 # Unless required by applicable law or agreed to in writing, software
12 12 # distributed under the License is distributed on an "AS IS" BASIS,
13 13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 14 # See the License for the specific language governing permissions and
15 15 # limitations under the License.
16 16
17 17 import sqlalchemy as sa
18 18 import logging
19 19 import hashlib
20 20
21 21 from datetime import datetime
22 22 from appenlight.models import Base
23 23 from appenlight.lib.utils import convert_es_type
24 24 from appenlight.lib.enums import LogLevel
25 25 from sqlalchemy.dialects.postgresql import JSON
26 26 from ziggurat_foundations.models.base import BaseModel
27 27
28 28 log = logging.getLogger(__name__)
29 29
30 30
31 31 class Log(Base, BaseModel):
32 32 __tablename__ = "logs"
33 33 __table_args__ = {"implicit_returning": False}
34 34
35 35 log_id = sa.Column(sa.BigInteger(), nullable=False, primary_key=True)
36 36 resource_id = sa.Column(
37 37 sa.Integer(),
38 38 sa.ForeignKey(
39 39 "applications.resource_id", onupdate="CASCADE", ondelete="CASCADE"
40 40 ),
41 41 nullable=False,
42 42 index=True,
43 43 )
44 44 log_level = sa.Column(sa.Unicode, nullable=False, index=True, default="INFO")
45 45 message = sa.Column(sa.UnicodeText(), default="")
46 46 timestamp = sa.Column(
47 47 sa.DateTime(), default=datetime.utcnow, server_default=sa.func.now()
48 48 )
49 49 request_id = sa.Column(sa.Unicode())
50 50 namespace = sa.Column(sa.Unicode())
51 51 primary_key = sa.Column(sa.Unicode())
52 52
53 53 tags = sa.Column(JSON(), default={})
54 54 permanent = sa.Column(sa.Boolean(), nullable=False, default=False)
55 55
56 56 def __str__(self):
57 57 return self.__unicode__()
58 58
59 59 def __unicode__(self):
60 60 return "<Log id:%s, lv:%s, ns:%s >" % (
61 61 self.log_id,
62 62 self.log_level,
63 63 self.namespace,
64 64 )
65 65
66 66 def set_data(self, data, resource):
67 67 level = data.get("log_level").upper()
68 68 self.log_level = getattr(LogLevel, level, LogLevel.UNKNOWN)
69 69 self.message = data.get("message", "")
70 70 server_name = data.get("server", "").lower() or "unknown"
71 71 self.tags = {"server_name": server_name}
72 72 if data.get("tags"):
73 73 for tag_tuple in data["tags"]:
74 74 self.tags[tag_tuple[0]] = tag_tuple[1]
75 75 self.timestamp = data["date"]
76 76 r_id = data.get("request_id", "")
77 77 if not r_id:
78 78 r_id = ""
79 79 self.request_id = r_id.replace("-", "")
80 80 self.resource_id = resource.resource_id
81 81 self.namespace = data.get("namespace") or ""
82 82 self.permanent = data.get("permanent")
83 83 self.primary_key = data.get("primary_key")
84 84 if self.primary_key is not None:
85 85 self.tags["appenlight_primary_key"] = self.primary_key
86 86
87 87 def get_dict(self):
88 88 instance_dict = super(Log, self).get_dict()
89 89 instance_dict["log_level"] = LogLevel.key_from_value(self.log_level)
90 90 instance_dict["resource_name"] = self.application.resource_name
91 91 return instance_dict
92 92
93 93 @property
94 94 def delete_hash(self):
95 95 if not self.primary_key:
96 96 return None
97 97
98 98 to_hash = "{}_{}_{}".format(self.resource_id, self.primary_key, self.namespace)
99 99 return hashlib.sha1(to_hash.encode("utf8")).hexdigest()
100 100
101 101 def es_doc(self):
102 102 tags = {}
103 103 tag_list = []
104 104 for name, value in self.tags.items():
105 105 # replace dot in indexed tag name
106 106 name = name.replace(".", "_")
107 107 tag_list.append(name)
108 108 tags[name] = {
109 109 "values": convert_es_type(value),
110 110 "numeric_values": value
111 111 if (isinstance(value, (int, float)) and not isinstance(value, bool))
112 112 else None,
113 113 }
114 114 return {
115 "pg_id": str(self.log_id),
115 "log_id": str(self.log_id),
116 116 "delete_hash": self.delete_hash,
117 117 "resource_id": self.resource_id,
118 118 "request_id": self.request_id,
119 119 "log_level": LogLevel.key_from_value(self.log_level),
120 120 "timestamp": self.timestamp,
121 121 "message": self.message if self.message else "",
122 122 "namespace": self.namespace if self.namespace else "",
123 123 "tags": tags,
124 124 "tag_list": tag_list,
125 125 }
126 126
127 127 @property
128 128 def partition_id(self):
129 129 if self.permanent:
130 130 return "rcae_l_%s" % self.timestamp.strftime("%Y_%m")
131 131 else:
132 132 return "rcae_l_%s" % self.timestamp.strftime("%Y_%m_%d")
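For reference, the partition naming above resolves to a monthly index for permanent logs and a daily index otherwise, e.g.:

from datetime import datetime

ts = datetime(2019, 4, 13, 10, 30)
print("rcae_l_%s" % ts.strftime("%Y_%m"))     # rcae_l_2019_04 (permanent)
print("rcae_l_%s" % ts.strftime("%Y_%m_%d"))  # rcae_l_2019_04_13 (regular)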
@@ -1,68 +1,69 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright 2010 - 2017 RhodeCode GmbH and the AppEnlight project authors
4 4 #
5 5 # Licensed under the Apache License, Version 2.0 (the "License");
6 6 # you may not use this file except in compliance with the License.
7 7 # You may obtain a copy of the License at
8 8 #
9 9 # http://www.apache.org/licenses/LICENSE-2.0
10 10 #
11 11 # Unless required by applicable law or agreed to in writing, software
12 12 # distributed under the License is distributed on an "AS IS" BASIS,
13 13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 14 # See the License for the specific language governing permissions and
15 15 # limitations under the License.
16 16
17 17 from datetime import datetime
18 18
19 19 import sqlalchemy as sa
20 20 from sqlalchemy.dialects.postgresql import JSON
21 21
22 22 from ziggurat_foundations.models.base import BaseModel
23 23 from appenlight.lib.utils import convert_es_type
24 24 from appenlight.models import Base
25 25
26 26
27 27 class Metric(Base, BaseModel):
28 28 __tablename__ = "metrics"
29 29 __table_args__ = {"implicit_returning": False}
30 30
31 31 pkey = sa.Column(sa.BigInteger(), primary_key=True)
32 32 resource_id = sa.Column(
33 33 sa.Integer(),
34 34 sa.ForeignKey("applications.resource_id"),
35 35 nullable=False,
36 36 primary_key=True,
37 37 )
38 38 timestamp = sa.Column(
39 39 sa.DateTime(), default=datetime.utcnow, server_default=sa.func.now()
40 40 )
41 41 tags = sa.Column(JSON(), default={})
42 42 namespace = sa.Column(sa.Unicode(255))
43 43
44 44 @property
45 45 def partition_id(self):
46 46 return "rcae_m_%s" % self.timestamp.strftime("%Y_%m_%d")
47 47
48 48 def es_doc(self):
49 49 tags = {}
50 50 tag_list = []
51 51 for name, value in self.tags.items():
52 52 # replace dot in indexed tag name
53 53 name = name.replace(".", "_")
54 54 tag_list.append(name)
55 55 tags[name] = {
56 56 "values": convert_es_type(value),
57 57 "numeric_values": value
58 58 if (isinstance(value, (int, float)) and not isinstance(value, bool))
59 59 else None,
60 60 }
61 61
62 62 return {
63 "metric_id": self.pkey,
63 64 "resource_id": self.resource_id,
64 65 "timestamp": self.timestamp,
65 66 "namespace": self.namespace,
66 67 "tags": tags,
67 68 "tag_list": tag_list,
68 69 }
@@ -1,529 +1,534 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright 2010 - 2017 RhodeCode GmbH and the AppEnlight project authors
4 4 #
5 5 # Licensed under the Apache License, Version 2.0 (the "License");
6 6 # you may not use this file except in compliance with the License.
7 7 # You may obtain a copy of the License at
8 8 #
9 9 # http://www.apache.org/licenses/LICENSE-2.0
10 10 #
11 11 # Unless required by applicable law or agreed to in writing, software
12 12 # distributed under the License is distributed on an "AS IS" BASIS,
13 13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 14 # See the License for the specific language governing permissions and
15 15 # limitations under the License.
16 16
17 17 from datetime import datetime, timedelta
18 18 import math
19 19 import uuid
20 20 import hashlib
21 21 import copy
22 22 import urllib.parse
23 23 import logging
24 24 import sqlalchemy as sa
25 25
26 26 from appenlight.models import Base, Datastores
27 27 from appenlight.lib.utils.date_utils import convert_date
28 28 from appenlight.lib.utils import convert_es_type
29 29 from appenlight.models.slow_call import SlowCall
30 30 from appenlight.lib.utils import channelstream_request
31 31 from appenlight.lib.enums import ReportType, Language
32 32 from pyramid.threadlocal import get_current_registry, get_current_request
33 33 from sqlalchemy.dialects.postgresql import JSON
34 34 from ziggurat_foundations.models.base import BaseModel
35 35
36 36 log = logging.getLogger(__name__)
37 37
38 38 REPORT_TYPE_MATRIX = {
39 39 "http_status": {"type": "int", "ops": ("eq", "ne", "ge", "le")},
40 40 "group:priority": {"type": "int", "ops": ("eq", "ne", "ge", "le")},
41 41 "duration": {"type": "float", "ops": ("ge", "le")},
42 42 "url_domain": {
43 43 "type": "unicode",
44 44 "ops": ("eq", "ne", "startswith", "endswith", "contains"),
45 45 },
46 46 "url_path": {
47 47 "type": "unicode",
48 48 "ops": ("eq", "ne", "startswith", "endswith", "contains"),
49 49 },
50 50 "error": {
51 51 "type": "unicode",
52 52 "ops": ("eq", "ne", "startswith", "endswith", "contains"),
53 53 },
54 54 "tags:server_name": {
55 55 "type": "unicode",
56 56 "ops": ("eq", "ne", "startswith", "endswith", "contains"),
57 57 },
58 58 "traceback": {"type": "unicode", "ops": ("contains",)},
59 59 "group:occurences": {"type": "int", "ops": ("eq", "ne", "ge", "le")},
60 60 }
61 61
62 62
63 63 class Report(Base, BaseModel):
64 64 __tablename__ = "reports"
65 65 __table_args__ = {"implicit_returning": False}
66 66
67 67 id = sa.Column(sa.Integer, nullable=False, primary_key=True)
68 68 group_id = sa.Column(
69 69 sa.BigInteger,
70 70 sa.ForeignKey("reports_groups.id", ondelete="cascade", onupdate="cascade"),
71 71 )
72 72 resource_id = sa.Column(sa.Integer(), nullable=False, index=True)
73 73 report_type = sa.Column(sa.Integer(), nullable=False, index=True)
74 74 error = sa.Column(sa.UnicodeText(), index=True)
75 75 extra = sa.Column(JSON(), default={})
76 76 request = sa.Column(JSON(), nullable=False, default={})
77 77 ip = sa.Column(sa.String(39), index=True, default="")
78 78 username = sa.Column(sa.Unicode(255), default="")
79 79 user_agent = sa.Column(sa.Unicode(255), default="")
80 80 url = sa.Column(sa.UnicodeText(), index=True)
81 81 request_id = sa.Column(sa.Text())
82 82 request_stats = sa.Column(JSON(), nullable=False, default={})
83 83 traceback = sa.Column(JSON(), nullable=False, default=None)
84 84 traceback_hash = sa.Column(sa.Text())
85 85 start_time = sa.Column(
86 86 sa.DateTime(), default=datetime.utcnow, server_default=sa.func.now()
87 87 )
88 88 end_time = sa.Column(sa.DateTime())
89 89 duration = sa.Column(sa.Float, default=0)
90 90 http_status = sa.Column(sa.Integer, index=True)
91 91 url_domain = sa.Column(sa.Unicode(100), index=True)
92 92 url_path = sa.Column(sa.Unicode(255), index=True)
93 93 tags = sa.Column(JSON(), nullable=False, default={})
94 94 language = sa.Column(sa.Integer(), default=0)
95 95 # this is used to determine partition for the report
96 96 report_group_time = sa.Column(
97 97 sa.DateTime(), default=datetime.utcnow, server_default=sa.func.now()
98 98 )
99 99
100 100 logs = sa.orm.relationship(
101 101 "Log",
102 102 lazy="dynamic",
103 103 passive_deletes=True,
104 104 passive_updates=True,
105 105 primaryjoin="and_(Report.request_id==Log.request_id, "
106 106 "Log.request_id != None, Log.request_id != '')",
107 107 foreign_keys="[Log.request_id]",
108 108 )
109 109
110 110 slow_calls = sa.orm.relationship(
111 111 "SlowCall",
112 112 backref="detail",
113 113 cascade="all, delete-orphan",
114 114 passive_deletes=True,
115 115 passive_updates=True,
116 116 order_by="SlowCall.timestamp",
117 117 )
118 118
119 119 def set_data(self, data, resource, protocol_version=None):
120 120 self.http_status = data["http_status"]
121 121 self.priority = data["priority"]
122 122 self.error = data["error"]
123 123 report_language = data.get("language", "").lower()
124 124 self.language = getattr(Language, report_language, Language.unknown)
125 125 # we need a temp holder here to decide later
126 126 # if we want to commit the tags when the report is marked for creation
127 127 self.tags = {"server_name": data["server"], "view_name": data["view_name"]}
128 128 if data.get("tags"):
129 129 for tag_tuple in data["tags"]:
130 130 self.tags[tag_tuple[0]] = tag_tuple[1]
131 131 self.traceback = data["traceback"]
132 132 stripped_traceback = self.stripped_traceback()
133 133 tb_repr = repr(stripped_traceback).encode("utf8")
134 134 self.traceback_hash = hashlib.sha1(tb_repr).hexdigest()
135 135 url_info = urllib.parse.urlsplit(data.get("url", ""), allow_fragments=False)
136 136 self.url_domain = url_info.netloc[:128]
137 137 self.url_path = url_info.path[:2048]
138 138 self.occurences = data["occurences"]
139 139 if self.error:
140 140 self.report_type = ReportType.error
141 141 else:
142 142 self.report_type = ReportType.slow
143 143
144 144 # but if its status is 404, it's a 404-type report
145 145 if self.http_status in [404, "404"] or self.error == "404 Not Found":
146 146 self.report_type = ReportType.not_found
147 147 self.error = ""
148 148
149 149 self.generate_grouping_hash(
150 150 data.get("appenlight.group_string", data.get("group_string")),
151 151 resource.default_grouping,
152 152 protocol_version,
153 153 )
154 154
155 155 # details
156 156 if data["http_status"] in [404, "404"]:
157 157 data = {
158 158 "username": data["username"],
159 159 "ip": data["ip"],
160 160 "url": data["url"],
161 161 "user_agent": data["user_agent"],
162 162 }
163 163 if data.get("HTTP_REFERER") or data.get("http_referer"):
164 164 data["HTTP_REFERER"] = data.get("HTTP_REFERER", "") or data.get(
165 165 "http_referer", ""
166 166 )
167 167
168 168 self.resource_id = resource.resource_id
169 169 self.username = data["username"]
170 170 self.user_agent = data["user_agent"]
171 171 self.ip = data["ip"]
172 172 self.extra = {}
173 173 if data.get("extra"):
174 174 for extra_tuple in data["extra"]:
175 175 self.extra[extra_tuple[0]] = extra_tuple[1]
176 176
177 177 self.url = data["url"]
178 178 self.request_id = data.get("request_id", "").replace("-", "") or str(
179 179 uuid.uuid4()
180 180 )
181 181 request_data = data.get("request", {})
182 182
183 183 self.request = request_data
184 self.request_stats = data.get("request_stats", {})
184 self.request_stats = data.get("request_stats") or {}
185 185 traceback = data.get("traceback")
186 186 if not traceback:
187 187 traceback = data.get("frameinfo")
188 188 self.traceback = traceback
189 189 start_date = convert_date(data.get("start_time"))
190 190 if not self.start_time or self.start_time < start_date:
191 191 self.start_time = start_date
192 192
193 193 self.end_time = convert_date(data.get("end_time"), False)
194 194 self.duration = 0
195 195
196 196 if self.start_time and self.end_time:
197 197 d = self.end_time - self.start_time
198 198 self.duration = d.total_seconds()
199 199
200 200 # update tags with other vars
201 201 if self.username:
202 202 self.tags["user_name"] = self.username
203 203 self.tags["report_language"] = Language.key_from_value(self.language)
204 204
205 205 def add_slow_calls(self, data, report_group):
206 206 slow_calls = []
207 207 for call in data.get("slow_calls", []):
208 208 sc_inst = SlowCall()
209 209 sc_inst.set_data(
210 210 call, resource_id=self.resource_id, report_group=report_group
211 211 )
212 212 slow_calls.append(sc_inst)
213 213 self.slow_calls.extend(slow_calls)
214 214 return slow_calls
215 215
216 216 def get_dict(self, request, details=False, exclude_keys=None, include_keys=None):
217 217 from appenlight.models.services.report_group import ReportGroupService
218 218
219 219 instance_dict = super(Report, self).get_dict()
220 220 instance_dict["req_stats"] = self.req_stats()
221 221 instance_dict["group"] = {}
222 222 instance_dict["group"]["id"] = self.report_group.id
223 223 instance_dict["group"]["total_reports"] = self.report_group.total_reports
224 224 instance_dict["group"]["last_report"] = self.report_group.last_report
225 225 instance_dict["group"]["priority"] = self.report_group.priority
226 226 instance_dict["group"]["occurences"] = self.report_group.occurences
227 227 instance_dict["group"]["last_timestamp"] = self.report_group.last_timestamp
228 228 instance_dict["group"]["first_timestamp"] = self.report_group.first_timestamp
229 229 instance_dict["group"]["public"] = self.report_group.public
230 230 instance_dict["group"]["fixed"] = self.report_group.fixed
231 231 instance_dict["group"]["read"] = self.report_group.read
232 232 instance_dict["group"]["average_duration"] = self.report_group.average_duration
233 233
234 234 instance_dict["resource_name"] = self.report_group.application.resource_name
235 235 instance_dict["report_type"] = self.report_type
236 236
237 237 if instance_dict["http_status"] == 404 and not instance_dict["error"]:
238 238 instance_dict["error"] = "404 Not Found"
239 239
240 240 if details:
241 241 instance_dict[
242 242 "affected_users_count"
243 243 ] = ReportGroupService.affected_users_count(self.report_group)
244 244 instance_dict["top_affected_users"] = [
245 245 {"username": u.username, "count": u.count}
246 246 for u in ReportGroupService.top_affected_users(self.report_group)
247 247 ]
248 248 instance_dict["application"] = {"integrations": []}
249 249 for integration in self.report_group.application.integrations:
250 250 if integration.front_visible:
251 251 instance_dict["application"]["integrations"].append(
252 252 {
253 253 "name": integration.integration_name,
254 254 "action": integration.integration_action,
255 255 }
256 256 )
257 257 instance_dict["comments"] = [
258 258 c.get_dict() for c in self.report_group.comments
259 259 ]
260 260
261 261 instance_dict["group"]["next_report"] = None
262 262 instance_dict["group"]["previous_report"] = None
263 263 next_in_group = self.get_next_in_group(request)
264 264 previous_in_group = self.get_previous_in_group(request)
265 265 if next_in_group:
266 266 instance_dict["group"]["next_report"] = next_in_group
267 267 if previous_in_group:
268 268 instance_dict["group"]["previous_report"] = previous_in_group
269 269
270 270 # slow call ordering
271 271 def find_parent(row, data):
272 272 for r in reversed(data):
273 273 try:
274 274 if (
275 275 row["timestamp"] > r["timestamp"]
276 276 and row["end_time"] < r["end_time"]
277 277 ):
278 278 return r
279 279 except TypeError as e:
280 280 log.warning("reports_view.find_parent: %s" % e)
281 281 return None
282 282
283 283 new_calls = []
284 284 calls = [c.get_dict() for c in self.slow_calls]
285 285 while calls:
286 286 # start from end
287 287 for x in range(len(calls) - 1, -1, -1):
288 288 parent = find_parent(calls[x], calls)
289 289 if parent:
290 290 parent["children"].append(calls[x])
291 291 else:
292 292 # no parent at all? append to new calls anyway
293 293 new_calls.append(calls[x])
294 294 # print 'append', calls[x]
295 295 del calls[x]
296 296 break
297 297 instance_dict["slow_calls"] = new_calls
298 298
299 299 instance_dict["front_url"] = self.get_public_url(request)
300 300
301 301 exclude_keys_list = exclude_keys or []
302 302 include_keys_list = include_keys or []
303 303 for k in list(instance_dict.keys()):
304 304 if k == "group":
305 305 continue
306 306 if k in exclude_keys_list or (k not in include_keys_list and include_keys):
307 307 del instance_dict[k]
308 308 return instance_dict
309 309
310 310 def get_previous_in_group(self, request):
311 311 query = {
312 312 "size": 1,
313 313 "query": {
314 314 "bool": {
315 315 "filter": [
316 316 {"term": {"group_id": self.group_id}},
317 {"range": {"pg_id": {"lt": self.id}}},
317 {"range": {"report_id": {"lt": self.id}}},
318 318 ]
319 319 }
320 320 },
321 321 "sort": [{"_doc": {"order": "desc"}}],
322 322 }
323 323 result = request.es_conn.search(
324 324 body=query, index=self.partition_id, doc_type="report"
325 325 )
326 326 if result["hits"]["total"]:
327 return result["hits"]["hits"][0]["_source"]["pg_id"]
327 return result["hits"]["hits"][0]["_source"]["report_id"]
328 328
329 329 def get_next_in_group(self, request):
330 330 query = {
331 331 "size": 1,
332 332 "query": {
333 333 "bool": {
334 334 "filter": [
335 335 {"term": {"group_id": self.group_id}},
336 {"range": {"pg_id": {"gt": self.id}}},
336 {"range": {"report_id": {"gt": self.id}}},
337 337 ]
338 338 }
339 339 },
340 340 "sort": [{"_doc": {"order": "asc"}}],
341 341 }
342 342 result = request.es_conn.search(
343 343 body=query, index=self.partition_id, doc_type="report"
344 344 )
345 345 if result["hits"]["total"]:
346 return result["hits"]["hits"][0]["_source"]["pg_id"]
346 return result["hits"]["hits"][0]["_source"]["report_id"]
347 347
348 348 def get_public_url(self, request=None, report_group=None, _app_url=None):
349 349 """
350 350 Returns a URL that the user can use to visit a specific report
351 351 """
352 352 if not request:
353 353 request = get_current_request()
354 354 url = request.route_url("/", _app_url=_app_url)
355 355 if report_group:
356 356 return (url + "ui/report/%s/%s") % (report_group.id, self.id)
357 357 return (url + "ui/report/%s/%s") % (self.group_id, self.id)
358 358
359 359 def req_stats(self):
360 360 stats = self.request_stats.copy()
361 361 stats["percentages"] = {}
362 362 stats["percentages"]["main"] = 100.0
363 363 main = stats.get("main", 0.0)
364 364 if not main:
365 365 return None
366 366 for name, call_time in stats.items():
367 367 if "calls" not in name and "main" not in name and "percentages" not in name:
368 368 stats["main"] -= call_time
369 369 stats["percentages"][name] = math.floor((call_time / main * 100.0))
370 370 stats["percentages"]["main"] -= stats["percentages"][name]
371 371 if stats["percentages"]["main"] < 0.0:
372 372 stats["percentages"]["main"] = 0.0
373 373 stats["main"] = 0.0
374 374 return stats
375 375
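A worked example of the percentage math in req_stats above, reproduced standalone (the stats values are made up):

import math

request_stats = {"main": 1.0, "sql": 0.25, "sql_calls": 3, "tmpl": 0.2}
percentages = {"main": 100.0}
main = request_stats["main"]
for name, call_time in request_stats.items():
    # "*_calls" counters and "main" itself are excluded, as above
    if "calls" not in name and "main" not in name:
        percentages[name] = math.floor(call_time / main * 100.0)
        percentages["main"] -= percentages[name]
print(percentages)  # {'main': 55.0, 'sql': 25, 'tmpl': 20}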
376 376 def generate_grouping_hash(
377 377 self, hash_string=None, default_grouping=None, protocol_version=None
378 378 ):
379 379 """
380 380 Generates a SHA1 hash that will be used to group reports together
381 381 """
382 382 if not hash_string:
383 383 location = self.tags.get("view_name") or self.url_path
384 384 server_name = self.tags.get("server_name") or ""
385 385 if default_grouping == "url_traceback":
386 386 hash_string = "%s_%s_%s" % (self.traceback_hash, location, self.error)
387 387 if self.language == Language.javascript:
388 388 hash_string = "%s_%s" % (self.traceback_hash, self.error)
389 389
390 390 elif default_grouping == "traceback_server":
391 391 hash_string = "%s_%s" % (self.traceback_hash, server_name)
392 392 if self.language == Language.javascript:
393 393 hash_string = "%s_%s" % (self.traceback_hash, server_name)
394 394 else:
395 395 hash_string = "%s_%s" % (self.error, location)
396 396 month = datetime.utcnow().date().replace(day=1)
397 397 hash_string = "{}_{}".format(month, hash_string)
398 398 binary_string = hash_string.encode("utf8")
399 399 self.grouping_hash = hashlib.sha1(binary_string).hexdigest()
400 400 return self.grouping_hash
401 401
402 402 def stripped_traceback(self):
403 403 """
404 404 Traceback without local vars
405 405 """
406 406 stripped_traceback = copy.deepcopy(self.traceback)
407 407
408 408 if isinstance(stripped_traceback, list):
409 409 for row in stripped_traceback:
410 410 row.pop("vars", None)
411 411 return stripped_traceback
412 412
413 413 def notify_channel(self, report_group):
414 414 """
415 415 Sends notification to websocket channel
416 416 """
417 417 settings = get_current_registry().settings
418 418 log.info("notify channelstream")
419 419 if self.report_type != ReportType.error:
420 420 return
421 421 payload = {
422 422 "type": "message",
423 423 "user": "__system__",
424 424 "channel": "app_%s" % self.resource_id,
425 425 "message": {
426 426 "topic": "front_dashboard.new_topic",
427 427 "report": {
428 428 "group": {
429 429 "priority": report_group.priority,
430 430 "first_timestamp": report_group.first_timestamp,
431 431 "last_timestamp": report_group.last_timestamp,
432 432 "average_duration": report_group.average_duration,
433 433 "occurences": report_group.occurences,
434 434 },
435 435 "report_id": self.id,
436 436 "group_id": self.group_id,
437 437 "resource_id": self.resource_id,
438 438 "http_status": self.http_status,
439 439 "url_domain": self.url_domain,
440 440 "url_path": self.url_path,
441 441 "error": self.error or "",
442 442 "server": self.tags.get("server_name"),
443 443 "view_name": self.tags.get("view_name"),
444 444 "front_url": self.get_public_url(),
445 445 },
446 446 },
447 447 }
448 448 channelstream_request(
449 449 settings["cometd.secret"],
450 450 "/message",
451 451 [payload],
452 452 servers=[settings["cometd_servers"]],
453 453 )
454 454
455 455 def es_doc(self):
456 456 tags = {}
457 457 tag_list = []
458 458 for name, value in self.tags.items():
459 459 name = name.replace(".", "_")
460 460 tag_list.append(name)
461 461 tags[name] = {
462 462 "values": convert_es_type(value),
463 463 "numeric_values": value
464 464 if (isinstance(value, (int, float)) and not isinstance(value, bool))
465 465 else None,
466 466 }
467 467
468 468 if "user_name" not in self.tags and self.username:
469 469 tags["user_name"] = {"value": [self.username], "numeric_value": None}
470 470 return {
471 471 "_id": str(self.id),
472 "pg_id": str(self.id),
472 "report_id": str(self.id),
473 473 "resource_id": self.resource_id,
474 474 "http_status": self.http_status or "",
475 475 "start_time": self.start_time,
476 476 "end_time": self.end_time,
477 477 "url_domain": self.url_domain if self.url_domain else "",
478 478 "url_path": self.url_path if self.url_path else "",
479 479 "duration": self.duration,
480 480 "error": self.error if self.error else "",
481 481 "report_type": self.report_type,
482 482 "request_id": self.request_id,
483 483 "ip": self.ip,
484 484 "group_id": str(self.group_id),
485 "_parent": str(self.group_id),
485 "type": "report",
486 "join_field": {"name": "report", "parent": str(self.group_id)},
486 487 "tags": tags,
487 488 "tag_list": tag_list,
489 "_routing": str(self.group_id),
488 490 }
489 491
490 492 @property
491 493 def partition_id(self):
492 494 return "rcae_r_%s" % self.report_group_time.strftime("%Y_%m")
493 495
494 496 def partition_range(self):
495 497 start_date = self.report_group_time.date().replace(day=1)
496 498 end_date = start_date + timedelta(days=40)
497 499 end_date = end_date.replace(day=1)
498 500 return start_date, end_date
499 501
500 502
501 503 def after_insert(mapper, connection, target):
502 504 if not hasattr(target, "_skip_ft_index"):
503 505 data = target.es_doc()
504 506 data.pop("_id", None)
505 507 Datastores.es.index(
506 508 target.partition_id, "report", data, parent=target.group_id, id=target.id
507 509 )
508 510
509 511
510 512 def after_update(mapper, connection, target):
511 513 if not hasattr(target, "_skip_ft_index"):
512 514 data = target.es_doc()
513 515 data.pop("_id", None)
514 516 Datastores.es.index(
515 517 target.partition_id, "report", data, parent=target.group_id, id=target.id
516 518 )
517 519
518 520
519 521 def after_delete(mapper, connection, target):
520 522 if not hasattr(target, "_skip_ft_index"):
521 query = {"query": {"term": {"pg_id": target.id}}}
522 Datastores.es.transport.perform_request(
523 "DELETE", "/{}/{}/_query".format(target.partition_id, "report"), body=query
523 query = {"query": {"term": {"report_id": target.id}}}
524 Datastores.es.delete_by_query(
525 index=target.partition_id,
526 doc_type="report",
527 body=query,
528 conflicts="proceed",
524 529 )
525 530
526 531
527 532 sa.event.listen(Report, "after_insert", after_insert)
528 533 sa.event.listen(Report, "after_update", after_update)
529 534 sa.event.listen(Report, "after_delete", after_delete)
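The switch from the removed `_parent` meta-field to an explicit `join_field` (plus `_routing`) implies an Elasticsearch 6.x join-type mapping along these lines; this is a hedged sketch only, as the actual mapping setup lives outside this diff:

# assumed ES 6.x mapping shape for the report_group -> report relation;
# child documents must be indexed with routing set to the parent group id
report_join_mapping = {
    "mappings": {
        "report": {
            "properties": {
                "join_field": {
                    "type": "join",
                    "relations": {"report_group": "report"},
                }
            }
        }
    }
}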
@@ -1,287 +1,283 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright 2010 - 2017 RhodeCode GmbH and the AppEnlight project authors
4 4 #
5 5 # Licensed under the Apache License, Version 2.0 (the "License");
6 6 # you may not use this file except in compliance with the License.
7 7 # You may obtain a copy of the License at
8 8 #
9 9 # http://www.apache.org/licenses/LICENSE-2.0
10 10 #
11 11 # Unless required by applicable law or agreed to in writing, software
12 12 # distributed under the License is distributed on an "AS IS" BASIS,
13 13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 14 # See the License for the specific language governing permissions and
15 15 # limitations under the License.
16 16
17 17 import logging
18 18 import sqlalchemy as sa
19 19
20 20 from datetime import datetime, timedelta
21 21
22 22 from pyramid.threadlocal import get_current_request
23 23 from sqlalchemy.dialects.postgresql import JSON
24 24 from ziggurat_foundations.models.base import BaseModel
25 25
26 26 from appenlight.models import Base, get_db_session, Datastores
27 27 from appenlight.lib.enums import ReportType
28 28 from appenlight.lib.rule import Rule
29 29 from appenlight.lib.redis_keys import REDIS_KEYS
30 30 from appenlight.models.report import REPORT_TYPE_MATRIX
31 31
32 32 log = logging.getLogger(__name__)
33 33
34 34
35 35 class ReportGroup(Base, BaseModel):
36 36 __tablename__ = "reports_groups"
37 37 __table_args__ = {"implicit_returning": False}
38 38
39 39 id = sa.Column(sa.BigInteger(), nullable=False, primary_key=True)
40 40 resource_id = sa.Column(
41 41 sa.Integer(),
42 42 sa.ForeignKey(
43 43 "applications.resource_id", onupdate="CASCADE", ondelete="CASCADE"
44 44 ),
45 45 nullable=False,
46 46 index=True,
47 47 )
48 48 priority = sa.Column(
49 49 sa.Integer, nullable=False, index=True, default=5, server_default="5"
50 50 )
51 51 first_timestamp = sa.Column(
52 52 sa.DateTime(), default=datetime.utcnow, server_default=sa.func.now()
53 53 )
54 54 last_timestamp = sa.Column(
55 55 sa.DateTime(), default=datetime.utcnow, server_default=sa.func.now()
56 56 )
57 57 error = sa.Column(sa.UnicodeText(), index=True)
58 58 grouping_hash = sa.Column(sa.String(40), default="")
59 59 triggered_postprocesses_ids = sa.Column(JSON(), nullable=False, default=list)
60 60 report_type = sa.Column(sa.Integer, default=1)
61 61 total_reports = sa.Column(sa.Integer, default=1)
62 62 last_report = sa.Column(sa.Integer)
63 63 occurences = sa.Column(sa.Integer, default=1)
64 64 average_duration = sa.Column(sa.Float, default=0)
65 65 summed_duration = sa.Column(sa.Float, default=0)
66 66 read = sa.Column(sa.Boolean(), index=True, default=False)
67 67 fixed = sa.Column(sa.Boolean(), index=True, default=False)
68 68 notified = sa.Column(sa.Boolean(), index=True, default=False)
69 69 public = sa.Column(sa.Boolean(), index=True, default=False)
70 70
71 71 reports = sa.orm.relationship(
72 72 "Report",
73 73 lazy="dynamic",
74 74 backref="report_group",
75 75 cascade="all, delete-orphan",
76 76 passive_deletes=True,
77 77 passive_updates=True,
78 78 )
79 79
80 80 comments = sa.orm.relationship(
81 81 "ReportComment",
82 82 lazy="dynamic",
83 83 backref="report",
84 84 cascade="all, delete-orphan",
85 85 passive_deletes=True,
86 86 passive_updates=True,
87 87 order_by="ReportComment.comment_id",
88 88 )
89 89
90 90 assigned_users = sa.orm.relationship(
91 91 "User",
92 92 backref=sa.orm.backref(
93 93 "assigned_reports_relation",
94 94 lazy="dynamic",
95 95 order_by=sa.desc(sa.text("reports_groups.id")),
96 96 ),
97 97 passive_deletes=True,
98 98 passive_updates=True,
99 99 secondary="reports_assignments",
100 100 order_by="User.user_name",
101 101 )
102 102
103 103 stats = sa.orm.relationship(
104 104 "ReportStat",
105 105 lazy="dynamic",
106 106 backref="report",
107 107 passive_deletes=True,
108 108 passive_updates=True,
109 109 )
110 110
111 111 last_report_ref = sa.orm.relationship(
112 112 "Report",
113 113 uselist=False,
114 114 primaryjoin="ReportGroup.last_report " "== Report.id",
115 115 foreign_keys="Report.id",
116 116 cascade="all, delete-orphan",
117 117 passive_deletes=True,
118 118 passive_updates=True,
119 119 )
120 120
121 121 def __repr__(self):
122 122 return "<ReportGroup id:{}>".format(self.id)
123 123
124 124 def get_report(self, report_id=None, public=False):
125 125 """
126 126 Gets the report with the specified id, or the latest report if no id was given
127 127 """
128 128 from .report import Report
129 129
130 130 if not report_id:
131 131 return self.last_report_ref
132 132 else:
133 133 return self.reports.filter(Report.id == report_id).first()
134 134
135 135 def get_public_url(self, request, _app_url=None):
136 136 url = request.route_url("/", _app_url=_app_url)
137 137 return (url + "ui/report/%s") % self.id
138 138
139 139 def run_postprocessing(self, report):
140 140 """
141 141 Alters report group priority based on postprocessing configuration
142 142 """
143 143 request = get_current_request()
144 144 get_db_session(None, self).flush()
145 145 for action in self.application.postprocess_conf:
146 146 get_db_session(None, self).flush()
147 147 rule_obj = Rule(action.rule, REPORT_TYPE_MATRIX)
148 148 report_dict = report.get_dict(request)
149 149 # if it was not processed yet
150 150 if (
151 151 rule_obj.match(report_dict)
152 152 and action.pkey not in self.triggered_postprocesses_ids
153 153 ):
154 154 action.postprocess(self)
155 155 # this way sqla can track mutation of list
156 156 self.triggered_postprocesses_ids = self.triggered_postprocesses_ids + [
157 157 action.pkey
158 158 ]
159 159
160 160 get_db_session(None, self).flush()
161 161 # do not go out of bounds
162 162 if self.priority < 1:
163 163 self.priority = 1
164 164 if self.priority > 10:
165 165 self.priority = 10
166 166
167 167 def get_dict(self, request):
168 168 instance_dict = super(ReportGroup, self).get_dict()
169 169 instance_dict["server_name"] = self.get_report().tags.get("server_name")
170 170 instance_dict["view_name"] = self.get_report().tags.get("view_name")
171 171 instance_dict["resource_name"] = self.application.resource_name
172 172 instance_dict["report_type"] = self.get_report().report_type
173 173 instance_dict["url_path"] = self.get_report().url_path
174 174 instance_dict["front_url"] = self.get_report().get_public_url(request)
175 175 del instance_dict["triggered_postprocesses_ids"]
176 176 return instance_dict
177 177
178 178 def es_doc(self):
179 179 return {
180 180 "_id": str(self.id),
181 "pg_id": str(self.id),
181 "group_id": str(self.id),
182 182 "resource_id": self.resource_id,
183 183 "error": self.error,
184 184 "fixed": self.fixed,
185 185 "public": self.public,
186 186 "read": self.read,
187 187 "priority": self.priority,
188 188 "occurences": self.occurences,
189 189 "average_duration": self.average_duration,
190 190 "summed_duration": self.summed_duration,
191 191 "first_timestamp": self.first_timestamp,
192 192 "last_timestamp": self.last_timestamp,
193 "type": "report_group",
194 "join_field": {"name": "report_group"},
193 195 }
194 196
195 197 def set_notification_info(self, notify_10=False, notify_100=False):
196 198 """
197 199 Update redis notification maps for notification job
198 200 """
199 201 current_time = datetime.utcnow().replace(second=0, microsecond=0)
200 202 # global app counter
201 203 key = REDIS_KEYS["counters"]["reports_per_type"].format(
202 204 self.report_type, current_time
203 205 )
204 206 redis_pipeline = Datastores.redis.pipeline()
205 207 redis_pipeline.incr(key)
206 208 redis_pipeline.expire(key, 3600 * 24)
207 209 # detailed app notification for alerts and notifications
208 210 redis_pipeline.sadd(REDIS_KEYS["apps_that_had_reports"], self.resource_id)
209 211 redis_pipeline.sadd(
210 212 REDIS_KEYS["apps_that_had_reports_alerting"], self.resource_id
211 213 )
212 214 # only notify for exceptions here
213 215 if self.report_type == ReportType.error:
214 216 redis_pipeline.sadd(REDIS_KEYS["apps_that_had_reports"], self.resource_id)
215 217 redis_pipeline.sadd(
216 218 REDIS_KEYS["apps_that_had_error_reports_alerting"], self.resource_id
217 219 )
218 220 key = REDIS_KEYS["counters"]["report_group_occurences"].format(self.id)
219 221 redis_pipeline.incr(key)
220 222 redis_pipeline.expire(key, 3600 * 24)
221 223 key = REDIS_KEYS["counters"]["report_group_occurences_alerting"].format(self.id)
222 224 redis_pipeline.incr(key)
223 225 redis_pipeline.expire(key, 3600 * 24)
224 226
225 227 if notify_10:
226 228 key = REDIS_KEYS["counters"]["report_group_occurences_10th"].format(self.id)
227 229 redis_pipeline.setex(key, 3600 * 24, 1)
228 230 if notify_100:
229 231 key = REDIS_KEYS["counters"]["report_group_occurences_100th"].format(
230 232 self.id
231 233 )
232 234 redis_pipeline.setex(key, 3600 * 24, 1)
233 235
234 236 key = REDIS_KEYS["reports_to_notify_per_type_per_app"].format(
235 237 self.report_type, self.resource_id
236 238 )
237 239 redis_pipeline.sadd(key, self.id)
238 240 redis_pipeline.expire(key, 3600 * 24)
239 241 key = REDIS_KEYS["reports_to_notify_per_type_per_app_alerting"].format(
240 242 self.report_type, self.resource_id
241 243 )
242 244 redis_pipeline.sadd(key, self.id)
243 245 redis_pipeline.expire(key, 3600 * 24)
244 246 redis_pipeline.execute()
245 247
246 248 @property
247 249 def partition_id(self):
248 250 return "rcae_r_%s" % self.first_timestamp.strftime("%Y_%m")
249 251
250 252 def partition_range(self):
251 253 start_date = self.first_timestamp.date().replace(day=1)
252 254 end_date = start_date + timedelta(days=40)
253 255 end_date = end_date.replace(day=1)
254 256 return start_date, end_date
255 257
256 258
257 259 def after_insert(mapper, connection, target):
258 260 if not hasattr(target, "_skip_ft_index"):
259 261 data = target.es_doc()
260 262 data.pop("_id", None)
261 Datastores.es.index(target.partition_id, "report_group", data, id=target.id)
263 Datastores.es.index(target.partition_id, "report", data, id=target.id)
262 264
263 265
264 266 def after_update(mapper, connection, target):
265 267 if not hasattr(target, "_skip_ft_index"):
266 268 data = target.es_doc()
267 269 data.pop("_id", None)
268 Datastores.es.index(target.partition_id, "report_group", data, id=target.id)
270 Datastores.es.index(target.partition_id, "report", data, id=target.id)
269 271
270 272
271 273 def after_delete(mapper, connection, target):
272 274 query = {"query": {"term": {"group_id": target.id}}}
273 275 # delete by query
274 Datastores.es.transport.perform_request(
275 "DELETE", "/{}/{}/_query".format(target.partition_id, "report"), body=query
276 )
277 query = {"query": {"term": {"pg_id": target.id}}}
278 Datastores.es.transport.perform_request(
279 "DELETE",
280 "/{}/{}/_query".format(target.partition_id, "report_group"),
281 body=query,
276 Datastores.es.delete_by_query(
277 index=target.partition_id, doc_type="report", body=query, conflicts="proceed"
282 278 )
283 279
284 280
285 281 sa.event.listen(ReportGroup, "after_insert", after_insert)
286 282 sa.event.listen(ReportGroup, "after_update", after_update)
287 283 sa.event.listen(ReportGroup, "after_delete", after_delete)
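The after_delete listeners above now rely on the delete-by-query endpoint exposed by the 6.x Python client; a hedged standalone sketch of that call (host, index name and group id are placeholders):

from elasticsearch import Elasticsearch  # assumes the 6.x elasticsearch-py client

es = Elasticsearch(["http://localhost:9200"])
es.delete_by_query(
    index="rcae_r_2019_04",                        # monthly report partition
    doc_type="report",
    body={"query": {"term": {"group_id": 1234}}},
    conflicts="proceed",                           # skip version conflicts instead of aborting
)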
@@ -1,79 +1,81 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright 2010 - 2017 RhodeCode GmbH and the AppEnlight project authors
4 4 #
5 5 # Licensed under the Apache License, Version 2.0 (the "License");
6 6 # you may not use this file except in compliance with the License.
7 7 # You may obtain a copy of the License at
8 8 #
9 9 # http://www.apache.org/licenses/LICENSE-2.0
10 10 #
11 11 # Unless required by applicable law or agreed to in writing, software
12 12 # distributed under the License is distributed on an "AS IS" BASIS,
13 13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 14 # See the License for the specific language governing permissions and
15 15 # limitations under the License.
16 16
17 17 import sqlalchemy as sa
18 18
19 19 from appenlight.lib.enums import ReportType
20 20 from appenlight.models import Base
21 21 from ziggurat_foundations.models.base import BaseModel
22 22
23 23
24 24 class ReportStat(Base, BaseModel):
25 25 __tablename__ = "reports_stats"
26 26 __table_args__ = {"implicit_returning": False}
27 27
28 28 group_id = sa.Column(
29 29 sa.BigInteger(), sa.ForeignKey("reports_groups.id"), nullable=False
30 30 )
31 31 resource_id = sa.Column(
32 32 sa.Integer(), sa.ForeignKey("applications.resource_id"), nullable=False
33 33 )
34 34 start_interval = sa.Column(sa.DateTime(), nullable=False)
35 35 occurences = sa.Column(sa.Integer, nullable=True, default=0)
36 36 owner_user_id = sa.Column(sa.Integer(), sa.ForeignKey("users.id"), nullable=True)
37 37 type = sa.Column(sa.Integer, nullable=True, default=0)
38 38 duration = sa.Column(sa.Float, nullable=True, default=0)
39 39 id = sa.Column(sa.BigInteger, nullable=False, primary_key=True)
40 40 server_name = sa.Column(sa.Unicode(128), nullable=False, default="")
41 41 view_name = sa.Column(sa.Unicode(128), nullable=False, default="")
42 42
43 43 @property
44 44 def partition_id(self):
45 45 return "rcae_r_%s" % self.start_interval.strftime("%Y_%m")
46 46
47 47 def es_doc(self):
48 48 return {
49 49 "resource_id": self.resource_id,
50 50 "timestamp": self.start_interval,
51 "pg_id": str(self.id),
51 "report_stat_id": str(self.id),
52 52 "permanent": True,
53 53 "request_id": None,
54 54 "log_level": "ERROR",
55 55 "message": None,
56 56 "namespace": "appenlight.error",
57 "group_id": str(self.group_id),
57 58 "tags": {
58 59 "duration": {"values": self.duration, "numeric_values": self.duration},
59 60 "occurences": {
60 61 "values": self.occurences,
61 62 "numeric_values": self.occurences,
62 63 },
63 64 "group_id": {"values": self.group_id, "numeric_values": self.group_id},
64 65 "type": {
65 66 "values": ReportType.key_from_value(self.type),
66 67 "numeric_values": self.type,
67 68 },
68 69 "server_name": {"values": self.server_name, "numeric_values": None},
69 70 "view_name": {"values": self.view_name, "numeric_values": None},
70 71 },
71 72 "tag_list": [
72 73 "duration",
73 74 "occurences",
74 75 "group_id",
75 76 "type",
76 77 "server_name",
77 78 "view_name",
78 79 ],
80 "type": "report_stat",
79 81 }
@@ -1,222 +1,218 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright 2010 - 2017 RhodeCode GmbH and the AppEnlight project authors
4 4 #
5 5 # Licensed under the Apache License, Version 2.0 (the "License");
6 6 # you may not use this file except in compliance with the License.
7 7 # You may obtain a copy of the License at
8 8 #
9 9 # http://www.apache.org/licenses/LICENSE-2.0
10 10 #
11 11 # Unless required by applicable law or agreed to in writing, software
12 12 # distributed under the License is distributed on an "AS IS" BASIS,
13 13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 14 # See the License for the specific language governing permissions and
15 15 # limitations under the License.
16 16
17 17 import paginate
18 18 import logging
19 19 import sqlalchemy as sa
20 20
21 21 from appenlight.models.log import Log
22 22 from appenlight.models import get_db_session, Datastores
23 23 from appenlight.models.services.base import BaseService
24 24 from appenlight.lib.utils import es_index_name_limiter
25 25
26 26 log = logging.getLogger(__name__)
27 27
28 28
29 29 class LogService(BaseService):
30 30 @classmethod
31 31 def get_logs(cls, resource_ids=None, filter_settings=None, db_session=None):
32 32 # ensure we always have ids passed
33 33 if not resource_ids:
34 34 # raise Exception('No App ID passed')
35 35 return []
36 36 db_session = get_db_session(db_session)
37 37 q = db_session.query(Log)
38 38 q = q.filter(Log.resource_id.in_(resource_ids))
39 39 if filter_settings.get("start_date"):
40 40 q = q.filter(Log.timestamp >= filter_settings.get("start_date"))
41 41 if filter_settings.get("end_date"):
42 42 q = q.filter(Log.timestamp <= filter_settings.get("end_date"))
43 43 if filter_settings.get("log_level"):
44 44 q = q.filter(Log.log_level == filter_settings.get("log_level").upper())
45 45 if filter_settings.get("request_id"):
46 46 request_id = filter_settings.get("request_id", "")
47 47 q = q.filter(Log.request_id == request_id.replace("-", ""))
48 48 if filter_settings.get("namespace"):
49 49 q = q.filter(Log.namespace == filter_settings.get("namespace"))
50 50 q = q.order_by(sa.desc(Log.timestamp))
51 51 return q
52 52
53 53 @classmethod
54 54 def es_query_builder(cls, app_ids, filter_settings):
55 55 if not filter_settings:
56 56 filter_settings = {}
57 57
58 58 query = {
59 "query": {
60 "bool": {
61 "filter": [{"terms": {"resource_id": list(app_ids)}}]
62 }
63 }
59 "query": {"bool": {"filter": [{"terms": {"resource_id": list(app_ids)}}]}}
64 60 }
65 61
66 62 start_date = filter_settings.get("start_date")
67 63 end_date = filter_settings.get("end_date")
68 64 filter_part = query["query"]["bool"]["filter"]
69 65
70 66 for tag in filter_settings.get("tags", []):
71 67 tag_values = [v.lower() for v in tag["value"]]
72 68 key = "tags.%s.values" % tag["name"].replace(".", "_")
73 69 filter_part.append({"terms": {key: tag_values}})
74 70
75 71 date_range = {"range": {"timestamp": {}}}
76 72 if start_date:
77 73 date_range["range"]["timestamp"]["gte"] = start_date
78 74 if end_date:
79 75 date_range["range"]["timestamp"]["lte"] = end_date
80 76 if start_date or end_date:
81 77 filter_part.append(date_range)
82 78
83 79 levels = filter_settings.get("level")
84 80 if levels:
85 81 filter_part.append({"terms": {"log_level": levels}})
86 82 namespaces = filter_settings.get("namespace")
87 83 if namespaces:
88 84 filter_part.append({"terms": {"namespace": namespaces}})
89 85
90 86 request_ids = filter_settings.get("request_id")
91 87 if request_ids:
92 88 filter_part.append({"terms": {"request_id": request_ids}})
93 89
94 90 messages = filter_settings.get("message")
95 91 if messages:
96 92 query["query"]["bool"]["must"] = {
97 93 "match": {"message": {"query": " ".join(messages), "operator": "and"}}
98 94 }
99 95 return query
100 96
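As an illustration of the builder above (the import path is assumed; the resource id and filters are made up), a minimal call and the query shape it produces:

# assuming the appenlight package is importable in this environment
from appenlight.models.services.log import LogService

query = LogService.es_query_builder(
    app_ids=[1],
    filter_settings={"start_date": "2019-04-01T00:00:00", "level": ["ERROR"]},
)
# {
#     "query": {
#         "bool": {
#             "filter": [
#                 {"terms": {"resource_id": [1]}},
#                 {"range": {"timestamp": {"gte": "2019-04-01T00:00:00"}}},
#                 {"terms": {"log_level": ["ERROR"]}},
#             ]
#         }
#     }
# }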
101 97 @classmethod
102 98 def get_time_series_aggregate(cls, app_ids=None, filter_settings=None):
103 99 if not app_ids:
104 100 return {}
105 101 es_query = cls.es_query_builder(app_ids, filter_settings)
106 102 es_query["aggs"] = {
107 103 "events_over_time": {
108 104 "date_histogram": {
109 105 "field": "timestamp",
110 106 "interval": "1h",
111 107 "min_doc_count": 0,
112 108 "extended_bounds": {
113 109 "max": filter_settings.get("end_date"),
114 110 "min": filter_settings.get("start_date"),
115 111 },
116 112 }
117 113 }
118 114 }
119 115 log.debug(es_query)
120 116 index_names = es_index_name_limiter(
121 117 filter_settings.get("start_date"),
122 118 filter_settings.get("end_date"),
123 119 ixtypes=["logs"],
124 120 )
125 121 if index_names:
126 122 results = Datastores.es.search(
127 123 body=es_query, index=index_names, doc_type="log", size=0
128 124 )
129 125 else:
130 126 results = []
131 127 return results
132 128
133 129 @classmethod
134 130 def get_search_iterator(
135 cls,
136 app_ids=None,
137 page=1,
138 items_per_page=50,
139 order_by=None,
140 filter_settings=None,
141 limit=None,
131 cls,
132 app_ids=None,
133 page=1,
134 items_per_page=50,
135 order_by=None,
136 filter_settings=None,
137 limit=None,
142 138 ):
143 139 if not app_ids:
144 140 return {}, 0
145 141
146 142 es_query = cls.es_query_builder(app_ids, filter_settings)
147 143 sort_query = {"sort": [{"timestamp": {"order": "desc"}}]}
148 144 es_query.update(sort_query)
149 145 log.debug(es_query)
150 146 es_from = (page - 1) * items_per_page
151 147 index_names = es_index_name_limiter(
152 148 filter_settings.get("start_date"),
153 149 filter_settings.get("end_date"),
154 150 ixtypes=["logs"],
155 151 )
156 152 if not index_names:
157 153 return {}, 0
158 154
159 155 results = Datastores.es.search(
160 156 body=es_query,
161 157 index=index_names,
162 158 doc_type="log",
163 159 size=items_per_page,
164 160 from_=es_from,
165 161 )
166 162 if results["hits"]["total"] > 5000:
167 163 count = 5000
168 164 else:
169 165 count = results["hits"]["total"]
170 166 return results["hits"], count
171 167
172 168 @classmethod
173 169 def get_paginator_by_app_ids(
174 cls,
175 app_ids=None,
176 page=1,
177 item_count=None,
178 items_per_page=50,
179 order_by=None,
180 filter_settings=None,
181 exclude_columns=None,
182 db_session=None,
170 cls,
171 app_ids=None,
172 page=1,
173 item_count=None,
174 items_per_page=50,
175 order_by=None,
176 filter_settings=None,
177 exclude_columns=None,
178 db_session=None,
183 179 ):
184 180 if not filter_settings:
185 181 filter_settings = {}
186 182 results, item_count = cls.get_search_iterator(
187 183 app_ids, page, items_per_page, order_by, filter_settings
188 184 )
189 185 paginator = paginate.Page(
190 186 [], item_count=item_count, items_per_page=items_per_page, **filter_settings
191 187 )
192 188 ordered_ids = tuple(
193 item["_source"]["pg_id"] for item in results.get("hits", [])
189 item["_source"]["log_id"] for item in results.get("hits", [])
194 190 )
195 191
196 192 sorted_instance_list = []
197 193 if ordered_ids:
198 194 db_session = get_db_session(db_session)
199 195 query = db_session.query(Log)
200 196 query = query.filter(Log.log_id.in_(ordered_ids))
201 197 query = query.order_by(sa.desc("timestamp"))
202 198 sa_items = query.all()
203 199 # re-order db rows to match the ES result order
204 200 for i_id in ordered_ids:
205 201 for item in sa_items:
206 202 if str(item.log_id) == str(i_id):
207 203 sorted_instance_list.append(item)
208 204 paginator.sa_items = sorted_instance_list
209 205 return paginator
210 206
211 207 @classmethod
212 208 def query_by_primary_key_and_namespace(cls, list_of_pairs, db_session=None):
213 209 db_session = get_db_session(db_session)
214 210 list_of_conditions = []
215 211 query = db_session.query(Log)
216 212 for pair in list_of_pairs:
217 213 list_of_conditions.append(
218 214 sa.and_(Log.primary_key == pair["pk"], Log.namespace == pair["ns"])
219 215 )
220 216 query = query.filter(sa.or_(*list_of_conditions))
221 217 query = query.order_by(sa.asc(Log.timestamp), sa.asc(Log.log_id))
222 218 return query
@@ -1,519 +1,521 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright 2010 - 2017 RhodeCode GmbH and the AppEnlight project authors
4 4 #
5 5 # Licensed under the Apache License, Version 2.0 (the "License");
6 6 # you may not use this file except in compliance with the License.
7 7 # You may obtain a copy of the License at
8 8 #
9 9 # http://www.apache.org/licenses/LICENSE-2.0
10 10 #
11 11 # Unless required by applicable law or agreed to in writing, software
12 12 # distributed under the License is distributed on an "AS IS" BASIS,
13 13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 14 # See the License for the specific language governing permissions and
15 15 # limitations under the License.
16 16
17 17 import logging
18 18 import paginate
19 19 import sqlalchemy as sa
20 20 import appenlight.lib.helpers as h
21 21
22 22 from datetime import datetime
23 23
24 24 from appenlight.models import get_db_session, Datastores
25 25 from appenlight.models.report import Report
26 26 from appenlight.models.report_group import ReportGroup
27 27 from appenlight.models.report_comment import ReportComment
28 28 from appenlight.models.user import User
29 29 from appenlight.models.services.base import BaseService
30 30 from appenlight.lib.enums import ReportType
31 31 from appenlight.lib.utils import es_index_name_limiter
32 32
33 33 log = logging.getLogger(__name__)
34 34
35 35
36 36 class ReportGroupService(BaseService):
37 37 @classmethod
38 38 def get_trending(cls, request, filter_settings, limit=15, db_session=None):
39 39 """
40 40 Returns report groups trending for a specific time interval
41 41 """
42 42 db_session = get_db_session(db_session)
43 43
44 44 tags = []
45 45 if filter_settings.get("tags"):
46 46 for tag in filter_settings["tags"]:
47 47 tags.append(
48 48 {"terms": {"tags.{}.values".format(tag["name"]): tag["value"]}}
49 49 )
50 50
51 51 index_names = es_index_name_limiter(
52 52 start_date=filter_settings["start_date"],
53 53 end_date=filter_settings["end_date"],
54 54 ixtypes=["reports"],
55 55 )
56 56
57 57 if not index_names or not filter_settings["resource"]:
58 58 return []
59 59
60 60 es_query = {
61 61 "aggs": {
62 62 "parent_agg": {
63 63 "aggs": {
64 64 "groups": {
65 65 "aggs": {
66 66 "sub_agg": {
67 "value_count": {"field": "tags.group_id.values"}
67 "value_count": {
68 "field": "tags.group_id.values.keyword"
69 }
68 70 }
69 71 },
70 72 "filter": {"exists": {"field": "tags.group_id.values"}},
71 73 }
72 74 },
73 "terms": {"field": "tags.group_id.values", "size": limit},
75 "terms": {"field": "tags.group_id.values.keyword", "size": limit},
74 76 }
75 77 },
76 78 "query": {
77 79 "bool": {
78 80 "filter": [
79 {
80 "terms": {
81 "resource_id": [filter_settings["resource"][0]]
82 }
83 },
81 {"terms": {"resource_id": [filter_settings["resource"][0]]}},
84 82 {
85 83 "range": {
86 84 "timestamp": {
87 85 "gte": filter_settings["start_date"],
88 86 "lte": filter_settings["end_date"],
89 87 }
90 88 }
91 89 },
92 90 ]
93 91 }
94 92 },
95 93 }
96 94 if tags:
97 95 es_query["query"]["bool"]["filter"].extend(tags)
98 96
99 97 result = Datastores.es.search(
100 body=es_query, index=index_names, doc_type="log", size=0
98 body=es_query, index=index_names, doc_type="report", size=0
101 99 )
102 100 series = []
103 101 for bucket in result["aggregations"]["parent_agg"]["buckets"]:
104 102 series.append(
105 103 {"key": bucket["key"], "groups": bucket["groups"]["sub_agg"]["value"]}
106 104 )
107 105
108 106 report_groups_d = {}
109 107 for g in series:
110 108 report_groups_d[int(g["key"])] = g["groups"] or 0
111 109
112 110 query = db_session.query(ReportGroup)
113 111 query = query.filter(ReportGroup.id.in_(list(report_groups_d.keys())))
114 112 query = query.options(sa.orm.joinedload(ReportGroup.last_report_ref))
115 113 results = [(report_groups_d[group.id], group) for group in query]
116 114 return sorted(results, reverse=True, key=lambda x: x[0])
117 115
118 116 @classmethod
119 117 def get_search_iterator(
120 118 cls,
121 119 app_ids=None,
122 120 page=1,
123 121 items_per_page=50,
124 122 order_by=None,
125 123 filter_settings=None,
126 124 limit=None,
127 125 ):
128 126 if not app_ids:
129 127 return {}
130 128 if not filter_settings:
131 129 filter_settings = {}
132 130
133 131 query = {
134 132 "size": 0,
135 133 "query": {
136 134 "bool": {
137 135 "must": [],
138 136 "should": [],
139 "filter": [{"terms": {"resource_id": list(app_ids)}}]
137 "filter": [{"terms": {"resource_id": list(app_ids)}}],
140 138 }
141 139 },
142 140 "aggs": {
143 141 "top_groups": {
144 142 "terms": {
145 143 "size": 5000,
146 "field": "_parent",
144 "field": "join_field#report_group",
147 145 "order": {"newest": "desc"},
148 146 },
149 147 "aggs": {
150 148 "top_reports_hits": {
151 149 "top_hits": {"size": 1, "sort": {"start_time": "desc"}}
152 150 },
153 151 "newest": {"max": {"field": "start_time"}},
154 152 },
155 153 }
156 154 },
157 155 }
158 156
159 157 start_date = filter_settings.get("start_date")
160 158 end_date = filter_settings.get("end_date")
161 159 filter_part = query["query"]["bool"]["filter"]
162 160 date_range = {"range": {"start_time": {}}}
163 161 if start_date:
164 162 date_range["range"]["start_time"]["gte"] = start_date
165 163 if end_date:
166 164 date_range["range"]["start_time"]["lte"] = end_date
167 165 if start_date or end_date:
168 166 filter_part.append(date_range)
169 167
170 168 priorities = filter_settings.get("priority")
171 169
172 170 for tag in filter_settings.get("tags", []):
173 171 tag_values = [v.lower() for v in tag["value"]]
174 172 key = "tags.%s.values" % tag["name"].replace(".", "_")
175 173 filter_part.append({"terms": {key: tag_values}})
176 174
177 175 if priorities:
178 176 filter_part.append(
179 177 {
180 178 "has_parent": {
181 179 "parent_type": "report_group",
182 180 "query": {"terms": {"priority": priorities}},
183 181 }
184 182 }
185 183 )
186 184
187 185 min_occurences = filter_settings.get("min_occurences")
188 186 if min_occurences:
189 187 filter_part.append(
190 188 {
191 189 "has_parent": {
192 190 "parent_type": "report_group",
193 191 "query": {"range": {"occurences": {"gte": min_occurences[0]}}},
194 192 }
195 193 }
196 194 )
197 195
198 196 min_duration = filter_settings.get("min_duration")
199 197 max_duration = filter_settings.get("max_duration")
200 198
201 199 request_ids = filter_settings.get("request_id")
202 200 if request_ids:
203 201 filter_part.append({"terms": {"request_id": request_ids}})
204 202
205 203 duration_range = {"range": {"average_duration": {}}}
206 204 if min_duration:
207 205 duration_range["range"]["average_duration"]["gte"] = min_duration[0]
208 206 if max_duration:
209 207 duration_range["range"]["average_duration"]["lte"] = max_duration[0]
210 208 if min_duration or max_duration:
211 209 filter_part.append(
212 210 {"has_parent": {"parent_type": "report_group", "query": duration_range}}
213 211 )
214 212
215 213 http_status = filter_settings.get("http_status")
216 214 report_type = filter_settings.get("report_type", [ReportType.error])
217 215 # set error report type if http status is not found
218 216 # and we are dealing with slow reports
219 217 if not http_status or ReportType.slow in report_type:
220 218 filter_part.append({"terms": {"report_type": report_type}})
221 219 if http_status:
222 220 filter_part.append({"terms": {"http_status": http_status}})
223 221
224 222 messages = filter_settings.get("message")
225 223 if messages:
226 224 condition = {"match": {"message": " ".join(messages)}}
227 225 query["query"]["bool"]["must"].append(condition)
228 226 errors = filter_settings.get("error")
229 227 if errors:
230 228 condition = {"match": {"error": " ".join(errors)}}
231 229 query["query"]["bool"]["must"].append(condition)
232 230 url_domains = filter_settings.get("url_domain")
233 231 if url_domains:
234 232 condition = {"terms": {"url_domain": url_domains}}
235 233 query["query"]["bool"]["must"].append(condition)
236 234 url_paths = filter_settings.get("url_path")
237 235 if url_paths:
238 236 condition = {"terms": {"url_path": url_paths}}
239 237 query["query"]["bool"]["must"].append(condition)
240 238
241 239 if filter_settings.get("report_status"):
242 240 for status in filter_settings.get("report_status"):
243 241 if status == "never_reviewed":
244 242 filter_part.append(
245 243 {
246 244 "has_parent": {
247 245 "parent_type": "report_group",
248 246 "query": {"term": {"read": False}},
249 247 }
250 248 }
251 249 )
252 250 elif status == "reviewed":
253 251 filter_part.append(
254 252 {
255 253 "has_parent": {
256 254 "parent_type": "report_group",
257 255 "query": {"term": {"read": True}},
258 256 }
259 257 }
260 258 )
261 259 elif status == "public":
262 260 filter_part.append(
263 261 {
264 262 "has_parent": {
265 263 "parent_type": "report_group",
266 264 "query": {"term": {"public": True}},
267 265 }
268 266 }
269 267 )
270 268 elif status == "fixed":
271 269 filter_part.append(
272 270 {
273 271 "has_parent": {
274 272 "parent_type": "report_group",
275 273 "query": {"term": {"fixed": True}},
276 274 }
277 275 }
278 276 )
279 277
280 278 # logging.getLogger('pyelasticsearch').setLevel(logging.DEBUG)
281 279 index_names = es_index_name_limiter(
282 280 filter_settings.get("start_date"),
283 281 filter_settings.get("end_date"),
284 282 ixtypes=["reports"],
285 283 )
286 284 if index_names:
287 285 results = Datastores.es.search(
288 286 body=query,
289 287 index=index_names,
290 288 doc_type=["report", "report_group"],
291 289 size=0,
292 290 )
293 291 else:
294 292 return []
295 293 return results["aggregations"]
296 294
297 295 @classmethod
298 296 def get_paginator_by_app_ids(
299 297 cls,
300 298 app_ids=None,
301 299 page=1,
302 300 item_count=None,
303 301 items_per_page=50,
304 302 order_by=None,
305 303 filter_settings=None,
306 304 exclude_columns=None,
307 305 db_session=None,
308 306 ):
309 307 if not filter_settings:
310 308 filter_settings = {}
311 309 results = cls.get_search_iterator(
312 310 app_ids, page, items_per_page, order_by, filter_settings
313 311 )
314 312
315 313 ordered_ids = []
316 314 if results:
317 315 for item in results["top_groups"]["buckets"]:
318 pg_id = item["top_reports_hits"]["hits"]["hits"][0]["_source"]["pg_id"]
316 pg_id = item["top_reports_hits"]["hits"]["hits"][0]["_source"][
317 "report_id"
318 ]
319 319 ordered_ids.append(pg_id)
320 320 log.info(filter_settings)
321 321 paginator = paginate.Page(
322 322 ordered_ids, items_per_page=items_per_page, **filter_settings
323 323 )
324 324 sa_items = ()
325 325 if paginator.items:
326 326 db_session = get_db_session(db_session)
327 327 # latest report detail
328 328 query = db_session.query(Report)
329 329 query = query.options(sa.orm.joinedload(Report.report_group))
330 330 query = query.filter(Report.id.in_(paginator.items))
331 331 if filter_settings.get("order_col"):
332 332 order_col = filter_settings.get("order_col")
333 333 if filter_settings.get("order_dir") == "dsc":
334 334 sort_on = "desc"
335 335 else:
336 336 sort_on = "asc"
337 337 if order_col == "when":
338 338 order_col = "last_timestamp"
339 339 query = query.order_by(
340 340 getattr(sa, sort_on)(getattr(ReportGroup, order_col))
341 341 )
342 342 sa_items = query.all()
343 343 sorted_instance_list = []
344 344 for i_id in ordered_ids:
345 345 for report in sa_items:
346 346 if str(report.id) == i_id and report not in sorted_instance_list:
347 347 sorted_instance_list.append(report)
348 348 paginator.sa_items = sorted_instance_list
349 349 return paginator
350 350
351 351 @classmethod
352 352 def by_app_ids(cls, app_ids=None, order_by=True, db_session=None):
353 353 db_session = get_db_session(db_session)
354 354 q = db_session.query(ReportGroup)
355 355 if app_ids:
356 356 q = q.filter(ReportGroup.resource_id.in_(app_ids))
357 357 if order_by:
358 358 q = q.order_by(sa.desc(ReportGroup.id))
359 359 return q
360 360
361 361 @classmethod
362 362 def by_id(cls, group_id, app_ids=None, db_session=None):
363 363 db_session = get_db_session(db_session)
364 364 q = db_session.query(ReportGroup).filter(ReportGroup.id == int(group_id))
365 365 if app_ids:
366 366 q = q.filter(ReportGroup.resource_id.in_(app_ids))
367 367 return q.first()
368 368
369 369 @classmethod
370 370 def by_ids(cls, group_ids=None, db_session=None):
371 371 db_session = get_db_session(db_session)
372 372 query = db_session.query(ReportGroup)
373 373 query = query.filter(ReportGroup.id.in_(group_ids))
374 374 return query
375 375
376 376 @classmethod
377 377 def by_hash_and_resource(
378 378 cls, resource_id, grouping_hash, since_when=None, db_session=None
379 379 ):
380 380 db_session = get_db_session(db_session)
381 381 q = db_session.query(ReportGroup)
382 382 q = q.filter(ReportGroup.resource_id == resource_id)
383 383 q = q.filter(ReportGroup.grouping_hash == grouping_hash)
384 384 q = q.filter(ReportGroup.fixed == False)
385 385 if since_when:
386 386 q = q.filter(ReportGroup.first_timestamp >= since_when)
387 387 return q.first()
388 388
389 389 @classmethod
390 390 def users_commenting(cls, report_group, exclude_user_id=None, db_session=None):
391 391 db_session = get_db_session(None, report_group)
392 392 query = db_session.query(User).distinct()
393 393 query = query.filter(User.id == ReportComment.owner_id)
394 394 query = query.filter(ReportComment.group_id == report_group.id)
395 395 if exclude_user_id:
396 396 query = query.filter(ReportComment.owner_id != exclude_user_id)
397 397 return query
398 398
399 399 @classmethod
400 400 def affected_users_count(cls, report_group, db_session=None):
401 401 db_session = get_db_session(db_session)
402 402 query = db_session.query(sa.func.count(Report.username))
403 403 query = query.filter(Report.group_id == report_group.id)
404 404 query = query.filter(Report.username != "")
405 405 query = query.filter(Report.username != None)
406 406 query = query.group_by(Report.username)
407 407 return query.count()
408 408
409 409 @classmethod
410 410 def top_affected_users(cls, report_group, db_session=None):
411 411 db_session = get_db_session(db_session)
412 412 count_label = sa.func.count(Report.username).label("count")
413 413 query = db_session.query(Report.username, count_label)
414 414 query = query.filter(Report.group_id == report_group.id)
415 415 query = query.filter(Report.username != None)
416 416 query = query.filter(Report.username != "")
417 417 query = query.group_by(Report.username)
418 418 query = query.order_by(sa.desc(count_label))
419 419 query = query.limit(50)
420 420 return query
421 421
422 422 @classmethod
423 423 def get_report_stats(cls, request, filter_settings):
424 424 """
425 425 Gets report dashboard graphs
426 426 Returns information for BAR charts with occurences/interval information
427 427 detailed means version that returns time intervals - non detailed
428 428 returns total sum
429 429 """
430 430 delta = filter_settings["end_date"] - filter_settings["start_date"]
431 431 if delta < h.time_deltas.get("12h")["delta"]:
432 432 interval = "1m"
433 433 elif delta <= h.time_deltas.get("3d")["delta"]:
434 434 interval = "5m"
435 435 elif delta >= h.time_deltas.get("2w")["delta"]:
436 436 interval = "24h"
437 437 else:
438 438 interval = "1h"
439 439
440 440 group_id = filter_settings.get("group_id")
441 441
442 442 es_query = {
443 443 "aggs": {
444 444 "parent_agg": {
445 445 "aggs": {
446 446 "types": {
447 447 "aggs": {
448 "sub_agg": {"terms": {"field": "tags.type.values"}}
448 "sub_agg": {
449 "terms": {"field": "tags.type.values.keyword"}
450 }
449 451 },
450 452 "filter": {
451 "and": [{"exists": {"field": "tags.type.values"}}]
453 "bool": {
454 "filter": [
455 {"exists": {"field": "tags.type.values"}}
456 ]
457 }
452 458 },
453 459 }
454 460 },
455 461 "date_histogram": {
456 462 "extended_bounds": {
457 463 "max": filter_settings["end_date"],
458 464 "min": filter_settings["start_date"],
459 465 },
460 466 "field": "timestamp",
461 467 "interval": interval,
462 468 "min_doc_count": 0,
463 469 },
464 470 }
465 471 },
466 472 "query": {
467 473 "bool": {
468 474 "filter": [
469 {
470 "terms": {
471 "resource_id": [filter_settings["resource"][0]]
472 }
473 },
475 {"terms": {"resource_id": [filter_settings["resource"][0]]}},
474 476 {
475 477 "range": {
476 478 "timestamp": {
477 479 "gte": filter_settings["start_date"],
478 480 "lte": filter_settings["end_date"],
479 481 }
480 482 }
481 483 },
482 484 ]
483 485 }
484 486 },
485 487 }
486 488 if group_id:
487 489 parent_agg = es_query["aggs"]["parent_agg"]
488 filters = parent_agg["aggs"]["types"]["filter"]["and"]
490 filters = parent_agg["aggs"]["types"]["filter"]["bool"]["filter"]
489 491 filters.append({"terms": {"tags.group_id.values": [group_id]}})
490 492
491 493 index_names = es_index_name_limiter(
492 494 start_date=filter_settings["start_date"],
493 495 end_date=filter_settings["end_date"],
494 496 ixtypes=["reports"],
495 497 )
496 498
497 499 if not index_names:
498 500 return []
499 501
500 502 result = Datastores.es.search(
501 503 body=es_query, index=index_names, doc_type="log", size=0
502 504 )
503 505 series = []
504 506 for bucket in result["aggregations"]["parent_agg"]["buckets"]:
505 507 point = {
506 508 "x": datetime.utcfromtimestamp(int(bucket["key"]) / 1000),
507 509 "report": 0,
508 510 "not_found": 0,
509 511 "slow_report": 0,
510 512 }
511 513 for subbucket in bucket["types"]["sub_agg"]["buckets"]:
512 514 if subbucket["key"] == "slow":
513 515 point["slow_report"] = subbucket["doc_count"]
514 516 elif subbucket["key"] == "error":
515 517 point["report"] = subbucket["doc_count"]
516 518 elif subbucket["key"] == "not_found":
517 519 point["not_found"] = subbucket["doc_count"]
518 520 series.append(point)
519 521 return series
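
The hunks above rewrite every terms/value_count aggregation in this service against the ".keyword" sub-field exposed by the new tag mapping, and switch the search doc_type from "log" to "report". A minimal sketch of that aggregation shape, outside the PR itself, assuming a local node and an illustrative index name such as rcae_r_2019_04:

from elasticsearch import Elasticsearch

es = Elasticsearch()  # the application would go through Datastores.es instead

query = {
    "size": 0,
    "query": {"bool": {"filter": [{"terms": {"resource_id": [1]}}]}},
    "aggs": {
        "groups": {
            # aggregate on the keyword sub-field, not the analyzed text field
            "terms": {"field": "tags.group_id.values.keyword", "size": 15},
            "aggs": {
                "occurences": {
                    "value_count": {"field": "tags.group_id.values.keyword"}
                }
            },
        }
    },
}
result = es.search(index="rcae_r_2019_04", doc_type="report", body=query)
for bucket in result["aggregations"]["groups"]["buckets"]:
    print(bucket["key"], bucket["occurences"]["value"])
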
@@ -1,61 +1,65 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright 2010 - 2017 RhodeCode GmbH and the AppEnlight project authors
4 4 #
5 5 # Licensed under the Apache License, Version 2.0 (the "License");
6 6 # you may not use this file except in compliance with the License.
7 7 # You may obtain a copy of the License at
8 8 #
9 9 # http://www.apache.org/licenses/LICENSE-2.0
10 10 #
11 11 # Unless required by applicable law or agreed to in writing, software
12 12 # distributed under the License is distributed on an "AS IS" BASIS,
13 13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 14 # See the License for the specific language governing permissions and
15 15 # limitations under the License.
16 16
17 17 from appenlight.models import Datastores
18 18 from appenlight.models.services.base import BaseService
19 19 from appenlight.lib.enums import ReportType
20 20 from appenlight.lib.utils import es_index_name_limiter
21 21
22 22
23 23 class ReportStatService(BaseService):
24 24 @classmethod
25 25 def count_by_type(cls, report_type, resource_id, since_when):
26 26 report_type = ReportType.key_from_value(report_type)
27 27
28 28 index_names = es_index_name_limiter(start_date=since_when, ixtypes=["reports"])
29 29
30 30 es_query = {
31 31 "aggs": {
32 32 "reports": {
33 33 "aggs": {
34 "sub_agg": {"value_count": {"field": "tags.group_id.values"}}
34 "sub_agg": {
35 "value_count": {"field": "tags.group_id.values.keyword"}
36 }
35 37 },
36 38 "filter": {
37 "and": [
38 {"terms": {"resource_id": [resource_id]}},
39 {"exists": {"field": "tags.group_id.values"}},
40 ]
39 "bool": {
40 "filter": [
41 {"terms": {"resource_id": [resource_id]}},
42 {"exists": {"field": "tags.group_id.values"}},
43 ]
44 }
41 45 },
42 46 }
43 47 },
44 48 "query": {
45 49 "bool": {
46 50 "filter": [
47 51 {"terms": {"resource_id": [resource_id]}},
48 52 {"terms": {"tags.type.values": [report_type]}},
49 53 {"range": {"timestamp": {"gte": since_when}}},
50 54 ]
51 55 }
52 56 },
53 57 }
54 58
55 59 if index_names:
56 60 result = Datastores.es.search(
57 61 body=es_query, index=index_names, doc_type="log", size=0
58 62 )
59 63 return result["aggregations"]["reports"]["sub_agg"]["value"]
60 64 else:
61 65 return 0
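
The same pattern recurs across this changeset: the Elasticsearch 2.x {"and": [...]} compound filter, which ES 6 no longer accepts, is wrapped as {"bool": {"filter": [...]}}. A tiny, hedged helper illustrating the mechanical rewrite (the clause values here are made up):

def to_bool_filter(and_clauses):
    """Wrap a list of ES 2.x 'and' clauses in the ES 6 bool/filter form."""
    return {"bool": {"filter": list(and_clauses)}}

legacy_clauses = [
    {"terms": {"resource_id": [5]}},
    {"exists": {"field": "tags.group_id.values"}},
]
assert to_bool_filter(legacy_clauses) == {
    "bool": {
        "filter": [
            {"terms": {"resource_id": [5]}},
            {"exists": {"field": "tags.group_id.values"}},
        ]
    }
}
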
@@ -1,607 +1,623 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright 2010 - 2017 RhodeCode GmbH and the AppEnlight project authors
4 4 #
5 5 # Licensed under the Apache License, Version 2.0 (the "License");
6 6 # you may not use this file except in compliance with the License.
7 7 # You may obtain a copy of the License at
8 8 #
9 9 # http://www.apache.org/licenses/LICENSE-2.0
10 10 #
11 11 # Unless required by applicable law or agreed to in writing, software
12 12 # distributed under the License is distributed on an "AS IS" BASIS,
13 13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 14 # See the License for the specific language governing permissions and
15 15 # limitations under the License.
16 16
17 17 from datetime import datetime
18 18
19 19 import appenlight.lib.helpers as h
20 20 from appenlight.models import get_db_session, Datastores
21 21 from appenlight.models.services.base import BaseService
22 22 from appenlight.lib.enums import ReportType
23 23 from appenlight.lib.utils import es_index_name_limiter
24 24
25 25 try:
26 26 from ae_uptime_ce.models.services.uptime_metric import UptimeMetricService
27 27 except ImportError:
28 28 UptimeMetricService = None
29 29
30 30
31 31 def check_key(key, stats, uptime, total_seconds):
32 32 if key not in stats:
33 33 stats[key] = {
34 34 "name": key,
35 35 "requests": 0,
36 36 "errors": 0,
37 37 "tolerated_requests": 0,
38 38 "frustrating_requests": 0,
39 39 "satisfying_requests": 0,
40 40 "total_minutes": total_seconds / 60.0,
41 41 "uptime": uptime,
42 42 "apdex": 0,
43 43 "rpm": 0,
44 44 "response_time": 0,
45 45 "avg_response_time": 0,
46 46 }
47 47
48 48
49 49 class RequestMetricService(BaseService):
50 50 @classmethod
51 51 def get_metrics_stats(cls, request, filter_settings, db_session=None):
52 52 delta = filter_settings["end_date"] - filter_settings["start_date"]
53 53 if delta < h.time_deltas.get("12h")["delta"]:
54 54 interval = "1m"
55 55 elif delta <= h.time_deltas.get("3d")["delta"]:
56 56 interval = "5m"
57 57 elif delta >= h.time_deltas.get("2w")["delta"]:
58 58 interval = "24h"
59 59 else:
60 60 interval = "1h"
61 61
62 62 filter_settings["namespace"] = ["appenlight.request_metric"]
63 63
64 64 es_query = {
65 65 "aggs": {
66 66 "parent_agg": {
67 67 "aggs": {
68 68 "custom": {
69 69 "aggs": {
70 70 "sub_agg": {
71 71 "sum": {"field": "tags.custom.numeric_values"}
72 72 }
73 73 },
74 74 "filter": {
75 75 "exists": {"field": "tags.custom.numeric_values"}
76 76 },
77 77 },
78 78 "main": {
79 79 "aggs": {
80 80 "sub_agg": {
81 81 "sum": {"field": "tags.main.numeric_values"}
82 82 }
83 83 },
84 84 "filter": {"exists": {"field": "tags.main.numeric_values"}},
85 85 },
86 86 "nosql": {
87 87 "aggs": {
88 88 "sub_agg": {
89 89 "sum": {"field": "tags.nosql.numeric_values"}
90 90 }
91 91 },
92 92 "filter": {
93 93 "exists": {"field": "tags.nosql.numeric_values"}
94 94 },
95 95 },
96 96 "remote": {
97 97 "aggs": {
98 98 "sub_agg": {
99 99 "sum": {"field": "tags.remote.numeric_values"}
100 100 }
101 101 },
102 102 "filter": {
103 103 "exists": {"field": "tags.remote.numeric_values"}
104 104 },
105 105 },
106 106 "requests": {
107 107 "aggs": {
108 108 "sub_agg": {
109 109 "sum": {"field": "tags.requests.numeric_values"}
110 110 }
111 111 },
112 112 "filter": {
113 113 "exists": {"field": "tags.requests.numeric_values"}
114 114 },
115 115 },
116 116 "sql": {
117 117 "aggs": {
118 118 "sub_agg": {"sum": {"field": "tags.sql.numeric_values"}}
119 119 },
120 120 "filter": {"exists": {"field": "tags.sql.numeric_values"}},
121 121 },
122 122 "tmpl": {
123 123 "aggs": {
124 124 "sub_agg": {
125 125 "sum": {"field": "tags.tmpl.numeric_values"}
126 126 }
127 127 },
128 128 "filter": {"exists": {"field": "tags.tmpl.numeric_values"}},
129 129 },
130 130 },
131 131 "date_histogram": {
132 132 "extended_bounds": {
133 133 "max": filter_settings["end_date"],
134 134 "min": filter_settings["start_date"],
135 135 },
136 136 "field": "timestamp",
137 137 "interval": interval,
138 138 "min_doc_count": 0,
139 139 },
140 140 }
141 141 },
142 142 "query": {
143 143 "bool": {
144 144 "filter": [
145 {
146 "terms": {
147 "resource_id": [filter_settings["resource"][0]]
148 }
149 },
145 {"terms": {"resource_id": [filter_settings["resource"][0]]}},
150 146 {
151 147 "range": {
152 148 "timestamp": {
153 149 "gte": filter_settings["start_date"],
154 150 "lte": filter_settings["end_date"],
155 151 }
156 152 }
157 153 },
158 154 {"terms": {"namespace": ["appenlight.request_metric"]}},
159 155 ]
160 156 }
161 157 },
162 158 }
163 159
164 160 index_names = es_index_name_limiter(
165 161 start_date=filter_settings["start_date"],
166 162 end_date=filter_settings["end_date"],
167 163 ixtypes=["metrics"],
168 164 )
169 165 if not index_names:
170 166 return []
171 167
172 168 result = Datastores.es.search(
173 169 body=es_query, index=index_names, doc_type="log", size=0
174 170 )
175 171
176 172 plot_data = []
177 173 for item in result["aggregations"]["parent_agg"]["buckets"]:
178 174 x_time = datetime.utcfromtimestamp(int(item["key"]) / 1000)
179 175 point = {"x": x_time}
180 176 for key in ["custom", "main", "nosql", "remote", "requests", "sql", "tmpl"]:
181 177 value = item[key]["sub_agg"]["value"]
182 178 point[key] = round(value, 3) if value else 0
183 179 plot_data.append(point)
184 180
185 181 return plot_data
186 182
187 183 @classmethod
188 184 def get_requests_breakdown(cls, request, filter_settings, db_session=None):
189 185 db_session = get_db_session(db_session)
190 186
191 187 # fetch total time of all requests in this time range
192 188 index_names = es_index_name_limiter(
193 189 start_date=filter_settings["start_date"],
194 190 end_date=filter_settings["end_date"],
195 191 ixtypes=["metrics"],
196 192 )
197 193
198 194 if index_names and filter_settings["resource"]:
199 195 es_query = {
200 196 "aggs": {
201 197 "main": {
202 198 "aggs": {
203 199 "sub_agg": {"sum": {"field": "tags.main.numeric_values"}}
204 200 },
205 201 "filter": {"exists": {"field": "tags.main.numeric_values"}},
206 202 }
207 203 },
208 204 "query": {
209 205 "bool": {
210 206 "filter": [
211 207 {
212 208 "terms": {
213 209 "resource_id": [filter_settings["resource"][0]]
214 210 }
215 211 },
216 212 {
217 213 "range": {
218 214 "timestamp": {
219 215 "gte": filter_settings["start_date"],
220 216 "lte": filter_settings["end_date"],
221 217 }
222 218 }
223 219 },
224 220 {"terms": {"namespace": ["appenlight.request_metric"]}},
225 221 ]
226 222 }
227 223 },
228 224 }
229 225 result = Datastores.es.search(
230 226 body=es_query, index=index_names, doc_type="log", size=0
231 227 )
232 228 total_time_spent = result["aggregations"]["main"]["sub_agg"]["value"]
233 229 else:
234 230 total_time_spent = 0
235 231 script_text = "doc['tags.main.numeric_values'].value / {}".format(
236 232 total_time_spent
237 233 )
234 if total_time_spent == 0:
235 script_text = "0"
238 236
239 237 if index_names and filter_settings["resource"]:
240 238 es_query = {
241 239 "aggs": {
242 240 "parent_agg": {
243 241 "aggs": {
244 242 "main": {
245 243 "aggs": {
246 244 "sub_agg": {
247 245 "sum": {"field": "tags.main.numeric_values"}
248 246 }
249 247 },
250 248 "filter": {
251 249 "exists": {"field": "tags.main.numeric_values"}
252 250 },
253 251 },
254 252 "percentage": {
255 "aggs": {
256 "sub_agg": {
257 "sum": {
258 "lang": "expression",
259 "script": script_text,
260 }
261 }
262 },
253 "aggs": {"sub_agg": {"sum": {"script": script_text}}},
263 254 "filter": {
264 255 "exists": {"field": "tags.main.numeric_values"}
265 256 },
266 257 },
267 258 "requests": {
268 259 "aggs": {
269 260 "sub_agg": {
270 261 "sum": {"field": "tags.requests.numeric_values"}
271 262 }
272 263 },
273 264 "filter": {
274 265 "exists": {"field": "tags.requests.numeric_values"}
275 266 },
276 267 },
277 268 },
278 269 "terms": {
279 "field": "tags.view_name.values",
270 "field": "tags.view_name.values.keyword",
280 271 "order": {"percentage>sub_agg": "desc"},
281 272 "size": 15,
282 273 },
283 274 }
284 275 },
285 276 "query": {
286 277 "bool": {
287 278 "filter": [
288 279 {
289 280 "terms": {
290 281 "resource_id": [filter_settings["resource"][0]]
291 282 }
292 283 },
293 284 {
294 285 "range": {
295 286 "timestamp": {
296 287 "gte": filter_settings["start_date"],
297 288 "lte": filter_settings["end_date"],
298 289 }
299 290 }
300 291 },
301 292 ]
302 293 }
303 294 },
304 295 }
305 296 result = Datastores.es.search(
306 297 body=es_query, index=index_names, doc_type="log", size=0
307 298 )
308 299 series = result["aggregations"]["parent_agg"]["buckets"]
309 300 else:
310 301 series = []
311 302
312 303 and_part = [
313 304 {"term": {"resource_id": filter_settings["resource"][0]}},
314 305 {"terms": {"tags.view_name.values": [row["key"] for row in series]}},
315 306 {"term": {"report_type": str(ReportType.slow)}},
316 307 ]
317 308 query = {
318 309 "aggs": {
319 310 "top_reports": {
320 "terms": {"field": "tags.view_name.values", "size": len(series)},
311 "terms": {
312 "field": "tags.view_name.values.keyword",
313 "size": len(series),
314 },
321 315 "aggs": {
322 316 "top_calls_hits": {
323 317 "top_hits": {"sort": {"start_time": "desc"}, "size": 5}
324 318 }
325 319 },
326 320 }
327 321 },
328 322 "query": {"bool": {"filter": and_part}},
329 323 }
330 324 details = {}
331 325 index_names = es_index_name_limiter(ixtypes=["reports"])
332 326 if index_names and series:
333 327 result = Datastores.es.search(
334 328 body=query, doc_type="report", size=0, index=index_names
335 329 )
336 330 for bucket in result["aggregations"]["top_reports"]["buckets"]:
337 331 details[bucket["key"]] = []
338 332
339 333 for hit in bucket["top_calls_hits"]["hits"]["hits"]:
340 334 details[bucket["key"]].append(
341 335 {
342 "report_id": hit["_source"]["pg_id"],
336 "report_id": hit["_source"]["report_id"],
343 337 "group_id": hit["_source"]["group_id"],
344 338 }
345 339 )
346 340
347 341 results = []
348 342 for row in series:
349 343 result = {
350 344 "key": row["key"],
351 345 "main": row["main"]["sub_agg"]["value"],
352 346 "requests": row["requests"]["sub_agg"]["value"],
353 347 }
354 348 # es can return 'infinity'
355 349 try:
356 350 result["percentage"] = float(row["percentage"]["sub_agg"]["value"])
357 351 except ValueError:
358 352 result["percentage"] = 0
359 353
360 354 result["latest_details"] = details.get(row["key"]) or []
361 355 results.append(result)
362 356
363 357 return results
364 358
365 359 @classmethod
366 360 def get_apdex_stats(cls, request, filter_settings, threshold=1, db_session=None):
367 361 """
368 362 Returns information and calculates APDEX score per server for dashboard
369 363 server information (upper right stats boxes)
370 364 """
371 365 # Apdex t = (Satisfied Count + Tolerated Count / 2) / Total Samples
372 366 db_session = get_db_session(db_session)
373 367 index_names = es_index_name_limiter(
374 368 start_date=filter_settings["start_date"],
375 369 end_date=filter_settings["end_date"],
376 370 ixtypes=["metrics"],
377 371 )
378 372
379 373 requests_series = []
380 374
381 375 if index_names and filter_settings["resource"]:
382 376 es_query = {
383 377 "aggs": {
384 378 "parent_agg": {
385 379 "aggs": {
386 380 "frustrating": {
387 381 "aggs": {
388 382 "sub_agg": {
389 383 "sum": {"field": "tags.requests.numeric_values"}
390 384 }
391 385 },
392 386 "filter": {
393 "and": [
394 {
395 "range": {
396 "tags.main.numeric_values": {"gte": "4"}
397 }
398 },
399 {
400 "exists": {
401 "field": "tags.requests.numeric_values"
402 }
403 },
404 ]
387 "bool": {
388 "filter": [
389 {
390 "range": {
391 "tags.main.numeric_values": {
392 "gte": "4"
393 }
394 }
395 },
396 {
397 "exists": {
398 "field": "tags.requests.numeric_values"
399 }
400 },
401 ]
402 }
405 403 },
406 404 },
407 405 "main": {
408 406 "aggs": {
409 407 "sub_agg": {
410 408 "sum": {"field": "tags.main.numeric_values"}
411 409 }
412 410 },
413 411 "filter": {
414 412 "exists": {"field": "tags.main.numeric_values"}
415 413 },
416 414 },
417 415 "requests": {
418 416 "aggs": {
419 417 "sub_agg": {
420 418 "sum": {"field": "tags.requests.numeric_values"}
421 419 }
422 420 },
423 421 "filter": {
424 422 "exists": {"field": "tags.requests.numeric_values"}
425 423 },
426 424 },
427 425 "tolerated": {
428 426 "aggs": {
429 427 "sub_agg": {
430 428 "sum": {"field": "tags.requests.numeric_values"}
431 429 }
432 430 },
433 431 "filter": {
434 "and": [
435 {
436 "range": {
437 "tags.main.numeric_values": {"gte": "1"}
438 }
439 },
440 {
441 "range": {
442 "tags.main.numeric_values": {"lt": "4"}
443 }
444 },
445 {
446 "exists": {
447 "field": "tags.requests.numeric_values"
448 }
449 },
450 ]
432 "bool": {
433 "filter": [
434 {
435 "range": {
436 "tags.main.numeric_values": {
437 "gte": "1"
438 }
439 }
440 },
441 {
442 "range": {
443 "tags.main.numeric_values": {
444 "lt": "4"
445 }
446 }
447 },
448 {
449 "exists": {
450 "field": "tags.requests.numeric_values"
451 }
452 },
453 ]
454 }
451 455 },
452 456 },
453 457 },
454 "terms": {"field": "tags.server_name.values", "size": 999999},
458 "terms": {
459 "field": "tags.server_name.values.keyword",
460 "size": 999999,
461 },
455 462 }
456 463 },
457 464 "query": {
458 465 "bool": {
459 466 "filter": [
460 467 {
461 468 "terms": {
462 469 "resource_id": [filter_settings["resource"][0]]
463 470 }
464 471 },
465 472 {
466 473 "range": {
467 474 "timestamp": {
468 475 "gte": filter_settings["start_date"],
469 476 "lte": filter_settings["end_date"],
470 477 }
471 478 }
472 479 },
473 480 {"terms": {"namespace": ["appenlight.request_metric"]}},
474 481 ]
475 482 }
476 483 },
477 484 }
478 485
479 486 result = Datastores.es.search(
480 487 body=es_query, index=index_names, doc_type="log", size=0
481 488 )
482 489 for bucket in result["aggregations"]["parent_agg"]["buckets"]:
483 490 requests_series.append(
484 491 {
485 492 "frustrating": bucket["frustrating"]["sub_agg"]["value"],
486 493 "main": bucket["main"]["sub_agg"]["value"],
487 494 "requests": bucket["requests"]["sub_agg"]["value"],
488 495 "tolerated": bucket["tolerated"]["sub_agg"]["value"],
489 496 "key": bucket["key"],
490 497 }
491 498 )
492 499
493 500 since_when = filter_settings["start_date"]
494 501 until = filter_settings["end_date"]
495 502
496 503 # total errors
497 504
498 505 index_names = es_index_name_limiter(
499 506 start_date=filter_settings["start_date"],
500 507 end_date=filter_settings["end_date"],
501 508 ixtypes=["reports"],
502 509 )
503 510
504 511 report_series = []
505 512 if index_names and filter_settings["resource"]:
506 513 report_type = ReportType.key_from_value(ReportType.error)
507 514 es_query = {
508 515 "aggs": {
509 516 "parent_agg": {
510 517 "aggs": {
511 518 "errors": {
512 519 "aggs": {
513 520 "sub_agg": {
514 521 "sum": {
515 522 "field": "tags.occurences.numeric_values"
516 523 }
517 524 }
518 525 },
519 526 "filter": {
520 "and": [
521 {"terms": {"tags.type.values": [report_type]}},
522 {
523 "exists": {
524 "field": "tags.occurences.numeric_values"
525 }
526 },
527 ]
527 "bool": {
528 "filter": [
529 {
530 "terms": {
531 "tags.type.values": [report_type]
532 }
533 },
534 {
535 "exists": {
536 "field": "tags.occurences.numeric_values"
537 }
538 },
539 ]
540 }
528 541 },
529 542 }
530 543 },
531 "terms": {"field": "tags.server_name.values", "size": 999999},
544 "terms": {
545 "field": "tags.server_name.values.keyword",
546 "size": 999999,
547 },
532 548 }
533 549 },
534 550 "query": {
535 551 "bool": {
536 552 "filter": [
537 553 {
538 554 "terms": {
539 555 "resource_id": [filter_settings["resource"][0]]
540 556 }
541 557 },
542 558 {
543 559 "range": {
544 560 "timestamp": {
545 561 "gte": filter_settings["start_date"],
546 562 "lte": filter_settings["end_date"],
547 563 }
548 564 }
549 565 },
550 566 {"terms": {"namespace": ["appenlight.error"]}},
551 567 ]
552 568 }
553 569 },
554 570 }
555 571 result = Datastores.es.search(
556 572 body=es_query, index=index_names, doc_type="log", size=0
557 573 )
558 574 for bucket in result["aggregations"]["parent_agg"]["buckets"]:
559 575 report_series.append(
560 576 {
561 577 "key": bucket["key"],
562 578 "errors": bucket["errors"]["sub_agg"]["value"],
563 579 }
564 580 )
565 581
566 582 stats = {}
567 583 if UptimeMetricService is not None:
568 584 uptime = UptimeMetricService.get_uptime_by_app(
569 585 filter_settings["resource"][0], since_when=since_when, until=until
570 586 )
571 587 else:
572 588 uptime = 0
573 589
574 590 total_seconds = (until - since_when).total_seconds()
575 591
576 592 for stat in requests_series:
577 593 check_key(stat["key"], stats, uptime, total_seconds)
578 594 stats[stat["key"]]["requests"] = int(stat["requests"])
579 595 stats[stat["key"]]["response_time"] = stat["main"]
580 596 stats[stat["key"]]["tolerated_requests"] = stat["tolerated"]
581 597 stats[stat["key"]]["frustrating_requests"] = stat["frustrating"]
582 598 for server in report_series:
583 599 check_key(server["key"], stats, uptime, total_seconds)
584 600 stats[server["key"]]["errors"] = server["errors"]
585 601
586 602 server_stats = list(stats.values())
587 603 for stat in server_stats:
588 604 stat["satisfying_requests"] = (
589 605 stat["requests"]
590 606 - stat["errors"]
591 607 - stat["frustrating_requests"]
592 608 - stat["tolerated_requests"]
593 609 )
594 610 if stat["satisfying_requests"] < 0:
595 611 stat["satisfying_requests"] = 0
596 612
597 613 if stat["requests"]:
598 614 stat["avg_response_time"] = round(
599 615 stat["response_time"] / stat["requests"], 3
600 616 )
601 617 qual_requests = (
602 618 stat["satisfying_requests"] + stat["tolerated_requests"] / 2.0
603 619 )
604 620 stat["apdex"] = round((qual_requests / stat["requests"]) * 100, 2)
605 621 stat["rpm"] = round(stat["requests"] / stat["total_minutes"], 2)
606 622
607 623 return sorted(server_stats, key=lambda x: x["name"])
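
Two adjustments in this file are easy to miss: the scripted "percentage" aggregation drops the removed "lang": "expression" parameter (ES 6 defaults to Painless), and a new guard substitutes the constant script "0" when no main time was recorded, avoiding a division by zero. A hedged sketch of just that guard, assuming the same script shape as above:

def percentage_agg(total_time_spent):
    # share of total request time per view; constant 0 when nothing was measured
    script_text = "doc['tags.main.numeric_values'].value / {}".format(
        total_time_spent
    )
    if total_time_spent == 0:
        script_text = "0"
    return {
        "aggs": {"sub_agg": {"sum": {"script": script_text}}},
        "filter": {"exists": {"field": "tags.main.numeric_values"}},
    }
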
@@ -1,182 +1,181 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright 2010 - 2017 RhodeCode GmbH and the AppEnlight project authors
4 4 #
5 5 # Licensed under the Apache License, Version 2.0 (the "License");
6 6 # you may not use this file except in compliance with the License.
7 7 # You may obtain a copy of the License at
8 8 #
9 9 # http://www.apache.org/licenses/LICENSE-2.0
10 10 #
11 11 # Unless required by applicable law or agreed to in writing, software
12 12 # distributed under the License is distributed on an "AS IS" BASIS,
13 13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 14 # See the License for the specific language governing permissions and
15 15 # limitations under the License.
16 16
17 17 from appenlight.models import get_db_session, Datastores
18 18 from appenlight.models.report import Report
19 19 from appenlight.models.services.base import BaseService
20 20 from appenlight.lib.utils import es_index_name_limiter
21 21
22 22
23 23 class SlowCallService(BaseService):
24 24 @classmethod
25 25 def get_time_consuming_calls(cls, request, filter_settings, db_session=None):
26 26 db_session = get_db_session(db_session)
27 27 # get slow calls from older partitions too
28 28 index_names = es_index_name_limiter(
29 29 start_date=filter_settings["start_date"],
30 30 end_date=filter_settings["end_date"],
31 31 ixtypes=["slow_calls"],
32 32 )
33 33 if index_names and filter_settings["resource"]:
34 34 # get longest time taking hashes
35 35 es_query = {
36 36 "aggs": {
37 37 "parent_agg": {
38 38 "aggs": {
39 39 "duration": {
40 40 "aggs": {
41 41 "sub_agg": {
42 42 "sum": {"field": "tags.duration.numeric_values"}
43 43 }
44 44 },
45 45 "filter": {
46 46 "exists": {"field": "tags.duration.numeric_values"}
47 47 },
48 48 },
49 49 "total": {
50 50 "aggs": {
51 51 "sub_agg": {
52 52 "value_count": {
53 "field": "tags.statement_hash.values"
53 "field": "tags.statement_hash.values.keyword"
54 54 }
55 55 }
56 56 },
57 57 "filter": {
58 58 "exists": {"field": "tags.statement_hash.values"}
59 59 },
60 60 },
61 61 },
62 62 "terms": {
63 "field": "tags.statement_hash.values",
63 "field": "tags.statement_hash.values.keyword",
64 64 "order": {"duration>sub_agg": "desc"},
65 65 "size": 15,
66 66 },
67 67 }
68 68 },
69 69 "query": {
70 70 "bool": {
71 71 "filter": [
72 72 {
73 73 "terms": {
74 74 "resource_id": [filter_settings["resource"][0]]
75 75 }
76 76 },
77 77 {
78 78 "range": {
79 79 "timestamp": {
80 80 "gte": filter_settings["start_date"],
81 81 "lte": filter_settings["end_date"],
82 82 }
83 83 }
84 84 },
85 85 ]
86 86 }
87 87 },
88 88 }
89 89 result = Datastores.es.search(
90 90 body=es_query, index=index_names, doc_type="log", size=0
91 91 )
92 92 results = result["aggregations"]["parent_agg"]["buckets"]
93 93 else:
94 94 return []
95 95 hashes = [i["key"] for i in results]
96 96
97 97 # get queries associated with hashes
98 98 calls_query = {
99 99 "aggs": {
100 100 "top_calls": {
101 "terms": {"field": "tags.statement_hash.values", "size": 15},
101 "terms": {
102 "field": "tags.statement_hash.values.keyword",
103 "size": 15,
104 },
102 105 "aggs": {
103 106 "top_calls_hits": {
104 107 "top_hits": {"sort": {"timestamp": "desc"}, "size": 5}
105 108 }
106 109 },
107 110 }
108 111 },
109 112 "query": {
110 113 "bool": {
111 114 "filter": [
112 {
113 "terms": {
114 "resource_id": [filter_settings["resource"][0]]
115 }
116 },
115 {"terms": {"resource_id": [filter_settings["resource"][0]]}},
117 116 {"terms": {"tags.statement_hash.values": hashes}},
118 117 {
119 118 "range": {
120 119 "timestamp": {
121 120 "gte": filter_settings["start_date"],
122 121 "lte": filter_settings["end_date"],
123 122 }
124 123 }
125 124 },
126 125 ]
127 126 }
128 127 },
129 128 }
130 129 calls = Datastores.es.search(
131 130 body=calls_query, index=index_names, doc_type="log", size=0
132 131 )
133 132 call_results = {}
134 133 report_ids = []
135 134 for call in calls["aggregations"]["top_calls"]["buckets"]:
136 135 hits = call["top_calls_hits"]["hits"]["hits"]
137 136 call_results[call["key"]] = [i["_source"] for i in hits]
138 137 report_ids.extend(
139 138 [i["_source"]["tags"]["report_id"]["values"] for i in hits]
140 139 )
141 140 if report_ids:
142 141 r_query = db_session.query(Report.group_id, Report.id)
143 142 r_query = r_query.filter(Report.id.in_(report_ids))
144 143 r_query = r_query.filter(Report.start_time >= filter_settings["start_date"])
145 144 else:
146 145 r_query = []
147 146 reports_reversed = {}
148 147 for report in r_query:
149 148 reports_reversed[report.id] = report.group_id
150 149
151 150 final_results = []
152 151 for item in results:
153 152 if item["key"] not in call_results:
154 153 continue
155 154 call = call_results[item["key"]][0]
156 155 row = {
157 156 "occurences": item["total"]["sub_agg"]["value"],
158 157 "total_duration": round(item["duration"]["sub_agg"]["value"]),
159 158 "statement": call["message"],
160 159 "statement_type": call["tags"]["type"]["values"],
161 160 "statement_subtype": call["tags"]["subtype"]["values"],
162 161 "statement_hash": item["key"],
163 162 "latest_details": [],
164 163 }
165 164 if row["statement_type"] in ["tmpl", " remote"]:
166 165 params = (
167 166 call["tags"]["parameters"]["values"]
168 167 if "parameters" in call["tags"]
169 168 else ""
170 169 )
171 170 row["statement"] = "{} ({})".format(call["message"], params)
172 171 for call in call_results[item["key"]]:
173 172 report_id = call["tags"]["report_id"]["values"]
174 173 group_id = reports_reversed.get(report_id)
175 174 if group_id:
176 175 row["latest_details"].append(
177 176 {"group_id": group_id, "report_id": report_id}
178 177 )
179 178
180 179 final_results.append(row)
181 180
182 181 return final_results
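
Besides the ".keyword" switch, the statement-hash query above relies on ordering a terms aggregation by a metric nested inside a single-bucket child aggregation ("duration>sub_agg" is the aggregation path). A standalone sketch of that bucket-ordering pattern, with the field names taken from the query above:

top_slow_statements = {
    "terms": {
        "field": "tags.statement_hash.values.keyword",
        "order": {"duration>sub_agg": "desc"},  # path into the child aggregation
        "size": 15,
    },
    "aggs": {
        "duration": {
            "filter": {"exists": {"field": "tags.duration.numeric_values"}},
            "aggs": {
                "sub_agg": {"sum": {"field": "tags.duration.numeric_values"}}
            },
        }
    },
}
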
@@ -1,127 +1,127 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright 2010 - 2017 RhodeCode GmbH and the AppEnlight project authors
4 4 #
5 5 # Licensed under the Apache License, Version 2.0 (the "License");
6 6 # you may not use this file except in compliance with the License.
7 7 # You may obtain a copy of the License at
8 8 #
9 9 # http://www.apache.org/licenses/LICENSE-2.0
10 10 #
11 11 # Unless required by applicable law or agreed to in writing, software
12 12 # distributed under the License is distributed on an "AS IS" BASIS,
13 13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 14 # See the License for the specific language governing permissions and
15 15 # limitations under the License.
16 16
17 17 import sqlalchemy as sa
18 18 import hashlib
19 19
20 20 from datetime import datetime, timedelta
21 21 from appenlight.models import Base
22 22 from sqlalchemy.dialects.postgresql import JSON
23 23 from ziggurat_foundations.models.base import BaseModel
24 24
25 25
26 26 class SlowCall(Base, BaseModel):
27 27 __tablename__ = "slow_calls"
28 28 __table_args__ = {"implicit_returning": False}
29 29
30 30 resource_id = sa.Column(sa.Integer(), nullable=False, index=True)
31 31 id = sa.Column(sa.Integer, nullable=False, primary_key=True)
32 32 report_id = sa.Column(
33 33 sa.BigInteger,
34 34 sa.ForeignKey("reports.id", ondelete="cascade", onupdate="cascade"),
35 35 primary_key=True,
36 36 )
37 37 duration = sa.Column(sa.Float(), default=0)
38 38 statement = sa.Column(sa.UnicodeText(), default="")
39 39 statement_hash = sa.Column(sa.Unicode(60), default="")
40 40 parameters = sa.Column(JSON(), nullable=False, default=dict)
41 41 type = sa.Column(sa.Unicode(16), default="")
42 42 subtype = sa.Column(sa.Unicode(16), default=None)
43 43 location = sa.Column(sa.Unicode(255), default="")
44 44 timestamp = sa.Column(
45 45 sa.DateTime(), default=datetime.utcnow, server_default=sa.func.now()
46 46 )
47 47 report_group_time = sa.Column(
48 48 sa.DateTime(), default=datetime.utcnow, server_default=sa.func.now()
49 49 )
50 50
51 51 def set_data(
52 52 self, data, protocol_version=None, resource_id=None, report_group=None
53 53 ):
54 54 self.resource_id = resource_id
55 55 if data.get("start") and data.get("end"):
56 56 self.timestamp = data.get("start")
57 57 d = data.get("end") - data.get("start")
58 58 self.duration = d.total_seconds()
59 59 self.statement = data.get("statement", "")
60 60 self.type = data.get("type", "unknown")[:16]
61 61 self.parameters = data.get("parameters", {})
62 62 self.location = data.get("location", "")[:255]
63 63 self.report_group_time = report_group.first_timestamp
64 64 if "subtype" in data:
65 65 self.subtype = data.get("subtype", "unknown")[:16]
66 66 if self.type == "tmpl":
67 67 self.set_hash("{} {}".format(self.statement, self.parameters))
68 68 else:
69 69 self.set_hash()
70 70
71 71 def set_hash(self, custom_statement=None):
72 72 statement = custom_statement or self.statement
73 73 self.statement_hash = hashlib.sha1(statement.encode("utf8")).hexdigest()
74 74
75 75 @property
76 76 def end_time(self):
77 77 if self.duration and self.timestamp:
78 78 return self.timestamp + timedelta(seconds=self.duration)
79 79 return None
80 80
81 81 def get_dict(self):
82 82 instance_dict = super(SlowCall, self).get_dict()
83 83 instance_dict["children"] = []
84 84 instance_dict["end_time"] = self.end_time
85 85 return instance_dict
86 86
87 87 def es_doc(self):
88 88 doc = {
89 89 "resource_id": self.resource_id,
90 90 "timestamp": self.timestamp,
91 "pg_id": str(self.id),
91 "slow_call_id": str(self.id),
92 92 "permanent": False,
93 93 "request_id": None,
94 94 "log_level": "UNKNOWN",
95 95 "message": self.statement,
96 96 "namespace": "appenlight.slow_call",
97 97 "tags": {
98 98 "report_id": {
99 99 "values": self.report_id,
100 100 "numeric_values": self.report_id,
101 101 },
102 102 "duration": {"values": None, "numeric_values": self.duration},
103 103 "statement_hash": {
104 104 "values": self.statement_hash,
105 105 "numeric_values": None,
106 106 },
107 107 "type": {"values": self.type, "numeric_values": None},
108 108 "subtype": {"values": self.subtype, "numeric_values": None},
109 109 "location": {"values": self.location, "numeric_values": None},
110 110 "parameters": {"values": None, "numeric_values": None},
111 111 },
112 112 "tag_list": [
113 113 "report_id",
114 114 "duration",
115 115 "statement_hash",
116 116 "type",
117 117 "subtype",
118 118 "location",
119 119 ],
120 120 }
121 121 if isinstance(self.parameters, str):
122 122 doc["tags"]["parameters"]["values"] = self.parameters[:255]
123 123 return doc
124 124
125 125 @property
126 126 def partition_id(self):
127 127 return "rcae_sc_%s" % self.report_group_time.strftime("%Y_%m")
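
For context on the statement_hash field the slow-call queries aggregate on: set_hash() above reduces each statement (plus its parameters for templates) to a SHA1 digest, so identical calls land in the same bucket. A quick illustration with a made-up statement:

import hashlib

statement = "SELECT * FROM reports WHERE id = %s"
statement_hash = hashlib.sha1(statement.encode("utf8")).hexdigest()
print(statement_hash)  # identical statements always produce the same bucket key
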
@@ -1,437 +1,559 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright 2010 - 2017 RhodeCode GmbH and the AppEnlight project authors
4 4 #
5 5 # Licensed under the Apache License, Version 2.0 (the "License");
6 6 # you may not use this file except in compliance with the License.
7 7 # You may obtain a copy of the License at
8 8 #
9 9 # http://www.apache.org/licenses/LICENSE-2.0
10 10 #
11 11 # Unless required by applicable law or agreed to in writing, software
12 12 # distributed under the License is distributed on an "AS IS" BASIS,
13 13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 14 # See the License for the specific language governing permissions and
15 15 # limitations under the License.
16 16
17 17 import argparse
18 18 import datetime
19 19 import logging
20 import copy
20 21
21 22 import sqlalchemy as sa
22 23 import elasticsearch.exceptions
23 24 import elasticsearch.helpers
24 25
25 26 from collections import defaultdict
26 27 from pyramid.paster import setup_logging
27 28 from pyramid.paster import bootstrap
28 29 from appenlight.models import DBSession, Datastores, metadata
29 30 from appenlight.lib import get_callable
30 31 from appenlight.models.report_group import ReportGroup
31 32 from appenlight.models.report import Report
32 33 from appenlight.models.report_stat import ReportStat
33 34 from appenlight.models.log import Log
34 35 from appenlight.models.slow_call import SlowCall
35 36 from appenlight.models.metric import Metric
36 37
37
38 38 log = logging.getLogger(__name__)
39 39
40 40 tables = {
41 41 "slow_calls_p_": [],
42 42 "reports_stats_p_": [],
43 43 "reports_p_": [],
44 44 "reports_groups_p_": [],
45 45 "logs_p_": [],
46 46 "metrics_p_": [],
47 47 }
48 48
49 49
50 50 def detect_tables(table_prefix):
51 51 found_tables = []
52 52 db_tables_query = """
53 53 SELECT tablename FROM pg_tables WHERE tablename NOT LIKE 'pg_%' AND
54 54 tablename NOT LIKE 'sql_%' ORDER BY tablename ASC;"""
55 55
56 56 for table in DBSession.execute(db_tables_query).fetchall():
57 57 tablename = table.tablename
58 58 if tablename.startswith(table_prefix):
59 59 t = sa.Table(
60 60 tablename, metadata, autoload=True, autoload_with=DBSession.bind.engine
61 61 )
62 62 found_tables.append(t)
63 63 return found_tables
64 64
65 65
66 66 def main():
67 67 """
68 68 Recreates Elasticsearch indexes
69 69 Performs reindex of whole db to Elasticsearch
70 70
71 71 """
72 72
73 73 # need parser twice because we first need to load ini file
74 74 # bootstrap pyramid and then load plugins
75 75 pre_parser = argparse.ArgumentParser(
76 76 description="Reindex AppEnlight data", add_help=False
77 77 )
78 78 pre_parser.add_argument(
79 79 "-c", "--config", required=True, help="Configuration ini file of application"
80 80 )
81 81 pre_parser.add_argument("-h", "--help", help="Show help", nargs="?")
82 82 pre_parser.add_argument(
83 83 "-t", "--types", nargs="+", help="Which parts of database should get reindexed"
84 84 )
85 85 args = pre_parser.parse_args()
86 86
87 87 config_uri = args.config
88 88 setup_logging(config_uri)
89 89 log.setLevel(logging.INFO)
90 90 env = bootstrap(config_uri)
91 91 parser = argparse.ArgumentParser(description="Reindex AppEnlight data")
92 92 choices = {
93 93 "reports": "appenlight.scripts.reindex_elasticsearch:reindex_reports",
94 94 "logs": "appenlight.scripts.reindex_elasticsearch:reindex_logs",
95 95 "metrics": "appenlight.scripts.reindex_elasticsearch:reindex_metrics",
96 96 "slow_calls": "appenlight.scripts.reindex_elasticsearch:reindex_slow_calls",
97 97 "template": "appenlight.scripts.reindex_elasticsearch:update_template",
98 98 }
99 99 for k, v in env["registry"].appenlight_plugins.items():
100 100 if v.get("fulltext_indexer"):
101 101 choices[k] = v["fulltext_indexer"]
102 102 parser.add_argument(
103 103 "-t",
104 104 "--types",
105 105 nargs="*",
106 106 choices=["all"] + list(choices.keys()),
107 107 default=[],
108 108 help="Which parts of database should get reindexed",
109 109 )
110 110 parser.add_argument(
111 111 "-c", "--config", required=True, help="Configuration ini file of application"
112 112 )
113 113 args = parser.parse_args()
114 114
115 115 if "all" in args.types:
116 116 args.types = list(choices.keys())
117 117
118 118 print("Selected types to reindex: {}".format(args.types))
119 119
120 120 log.info("settings {}".format(args.types))
121 121
122 122 if "template" in args.types:
123 123 get_callable(choices["template"])()
124 124 args.types.remove("template")
125 125 for selected in args.types:
126 126 get_callable(choices[selected])()
127 127
128 128
129 129 def update_template():
130 130 try:
131 Datastores.es.indices.delete_template("rcae")
131 Datastores.es.indices.delete_template("rcae_reports")
132 except elasticsearch.exceptions.NotFoundError as e:
133 log.error(e)
134
135 try:
136 Datastores.es.indices.delete_template("rcae_logs")
137 except elasticsearch.exceptions.NotFoundError as e:
138 log.error(e)
139 try:
140 Datastores.es.indices.delete_template("rcae_slow_calls")
141 except elasticsearch.exceptions.NotFoundError as e:
142 log.error(e)
143 try:
144 Datastores.es.indices.delete_template("rcae_metrics")
132 145 except elasticsearch.exceptions.NotFoundError as e:
133 146 log.error(e)
134 147 log.info("updating elasticsearch template")
135 148 tag_templates = [
136 149 {
137 150 "values": {
138 151 "path_match": "tags.*",
139 152 "mapping": {
140 153 "type": "object",
141 154 "properties": {
142 "values": {"type": "string", "analyzer": "tag_value"},
155 "values": {
156 "type": "text",
157 "analyzer": "tag_value",
158 "fields": {
159 "keyword": {"type": "keyword", "ignore_above": 256}
160 },
161 },
143 162 "numeric_values": {"type": "float"},
144 163 },
145 164 },
146 165 }
147 166 }
148 167 ]
149 168
150 template_schema = {
151 "template": "rcae_*",
169 shared_analysis = {
170 "analyzer": {
171 "url_path": {
172 "type": "custom",
173 "char_filter": [],
174 "tokenizer": "path_hierarchy",
175 "filter": [],
176 },
177 "tag_value": {
178 "type": "custom",
179 "char_filter": [],
180 "tokenizer": "keyword",
181 "filter": ["lowercase"],
182 },
183 }
184 }
185
186 shared_log_mapping = {
187 "_all": {"enabled": False},
188 "dynamic_templates": tag_templates,
189 "properties": {
190 "pg_id": {"type": "keyword", "index": True},
191 "delete_hash": {"type": "keyword", "index": True},
192 "resource_id": {"type": "integer"},
193 "timestamp": {"type": "date"},
194 "permanent": {"type": "boolean"},
195 "request_id": {"type": "keyword", "index": True},
196 "log_level": {"type": "text", "analyzer": "simple"},
197 "message": {"type": "text", "analyzer": "simple"},
198 "namespace": {
199 "type": "text",
200 "fields": {"keyword": {"type": "keyword", "ignore_above": 256}},
201 },
202 "tags": {"type": "object"},
203 "tag_list": {
204 "type": "text",
205 "analyzer": "tag_value",
206 "fields": {"keyword": {"type": "keyword", "ignore_above": 256}},
207 },
208 },
209 }
210
211 report_schema = {
212 "template": "rcae_r_*",
152 213 "settings": {
153 214 "index": {
154 215 "refresh_interval": "5s",
155 216 "translog": {"sync_interval": "5s", "durability": "async"},
156 217 },
157 218 "number_of_shards": 5,
158 "analysis": {
159 "analyzer": {
160 "url_path": {
161 "type": "custom",
162 "char_filter": [],
163 "tokenizer": "path_hierarchy",
164 "filter": [],
165 },
166 "tag_value": {
167 "type": "custom",
168 "char_filter": [],
169 "tokenizer": "keyword",
170 "filter": ["lowercase"],
171 },
172 }
173 },
219 "analysis": shared_analysis,
174 220 },
175 221 "mappings": {
176 "report_group": {
222 "report": {
177 223 "_all": {"enabled": False},
178 224 "dynamic_templates": tag_templates,
179 225 "properties": {
180 "pg_id": {"type": "string", "index": "not_analyzed"},
226 "type": {"type": "keyword", "index": True},
227 # report group
228 "group_id": {"type": "keyword", "index": True},
181 229 "resource_id": {"type": "integer"},
182 230 "priority": {"type": "integer"},
183 "error": {"type": "string", "analyzer": "simple"},
231 "error": {"type": "text", "analyzer": "simple"},
184 232 "read": {"type": "boolean"},
185 233 "occurences": {"type": "integer"},
186 234 "fixed": {"type": "boolean"},
187 235 "first_timestamp": {"type": "date"},
188 236 "last_timestamp": {"type": "date"},
189 237 "average_duration": {"type": "float"},
190 238 "summed_duration": {"type": "float"},
191 239 "public": {"type": "boolean"},
192 },
193 },
194 "report": {
195 "_all": {"enabled": False},
196 "dynamic_templates": tag_templates,
197 "properties": {
198 "pg_id": {"type": "string", "index": "not_analyzed"},
199 "resource_id": {"type": "integer"},
200 "group_id": {"type": "string"},
240 # report
241 "report_id": {"type": "keyword", "index": True},
201 242 "http_status": {"type": "integer"},
202 "ip": {"type": "string", "index": "not_analyzed"},
203 "url_domain": {"type": "string", "analyzer": "simple"},
204 "url_path": {"type": "string", "analyzer": "url_path"},
205 "error": {"type": "string", "analyzer": "simple"},
243 "ip": {"type": "keyword", "index": True},
244 "url_domain": {"type": "text", "analyzer": "simple"},
245 "url_path": {"type": "text", "analyzer": "url_path"},
206 246 "report_type": {"type": "integer"},
207 247 "start_time": {"type": "date"},
208 "request_id": {"type": "string", "index": "not_analyzed"},
248 "request_id": {"type": "keyword", "index": True},
209 249 "end_time": {"type": "date"},
210 250 "duration": {"type": "float"},
211 251 "tags": {"type": "object"},
212 "tag_list": {"type": "string", "analyzer": "tag_value"},
252 "tag_list": {
253 "type": "text",
254 "analyzer": "tag_value",
255 "fields": {"keyword": {"type": "keyword", "ignore_above": 256}},
256 },
213 257 "extra": {"type": "object"},
214 },
215 "_parent": {"type": "report_group"},
216 },
217 "log": {
218 "_all": {"enabled": False},
219 "dynamic_templates": tag_templates,
220 "properties": {
221 "pg_id": {"type": "string", "index": "not_analyzed"},
222 "delete_hash": {"type": "string", "index": "not_analyzed"},
223 "resource_id": {"type": "integer"},
258 # report stats
259 "report_stat_id": {"type": "keyword", "index": True},
224 260 "timestamp": {"type": "date"},
225 261 "permanent": {"type": "boolean"},
226 "request_id": {"type": "string", "index": "not_analyzed"},
227 "log_level": {"type": "string", "analyzer": "simple"},
228 "message": {"type": "string", "analyzer": "simple"},
229 "namespace": {"type": "string", "index": "not_analyzed"},
230 "tags": {"type": "object"},
231 "tag_list": {"type": "string", "analyzer": "tag_value"},
262 "log_level": {"type": "text", "analyzer": "simple"},
263 "message": {"type": "text", "analyzer": "simple"},
264 "namespace": {
265 "type": "text",
266 "fields": {"keyword": {"type": "keyword", "ignore_above": 256}},
267 },
268 "join_field": {
269 "type": "join",
270 "relations": {"report_group": ["report", "report_stat"]},
271 },
232 272 },
273 }
274 },
275 }
276
277 Datastores.es.indices.put_template("rcae_reports", body=report_schema)
278
279 logs_mapping = copy.deepcopy(shared_log_mapping)
280 logs_mapping["properties"]["log_id"] = logs_mapping["properties"]["pg_id"]
281 del logs_mapping["properties"]["pg_id"]
282
283 log_template = {
284 "template": "rcae_l_*",
285 "settings": {
286 "index": {
287 "refresh_interval": "5s",
288 "translog": {"sync_interval": "5s", "durability": "async"},
233 289 },
290 "number_of_shards": 5,
291 "analysis": shared_analysis,
234 292 },
293 "mappings": {"log": logs_mapping},
235 294 }
236 295
237 Datastores.es.indices.put_template("rcae", body=template_schema)
296 Datastores.es.indices.put_template("rcae_logs", body=log_template)
297
298 slow_call_mapping = copy.deepcopy(shared_log_mapping)
299 slow_call_mapping["properties"]["slow_call_id"] = slow_call_mapping["properties"][
300 "pg_id"
301 ]
302 del slow_call_mapping["properties"]["pg_id"]
303
304 slow_call_template = {
305 "template": "rcae_sc_*",
306 "settings": {
307 "index": {
308 "refresh_interval": "5s",
309 "translog": {"sync_interval": "5s", "durability": "async"},
310 },
311 "number_of_shards": 5,
312 "analysis": shared_analysis,
313 },
314 "mappings": {"log": slow_call_mapping},
315 }
316
317 Datastores.es.indices.put_template("rcae_slow_calls", body=slow_call_template)
318
319 metric_mapping = copy.deepcopy(shared_log_mapping)
320 metric_mapping["properties"]["metric_id"] = metric_mapping["properties"]["pg_id"]
321 del metric_mapping["properties"]["pg_id"]
322
323 metrics_template = {
324 "template": "rcae_m_*",
325 "settings": {
326 "index": {
327 "refresh_interval": "5s",
328 "translog": {"sync_interval": "5s", "durability": "async"},
329 },
330 "number_of_shards": 5,
331 "analysis": shared_analysis,
332 },
333 "mappings": {"log": metric_mapping},
334 }
335
336 Datastores.es.indices.put_template("rcae_metrics", body=metrics_template)
337
338 uptime_metric_mapping = copy.deepcopy(shared_log_mapping)
339 uptime_metric_mapping["properties"]["uptime_id"] = uptime_metric_mapping[
340 "properties"
341 ]["pg_id"]
342 del uptime_metric_mapping["properties"]["pg_id"]
343
344 uptime_metrics_template = {
345 "template": "rcae_uptime_ce_*",
346 "settings": {
347 "index": {
348 "refresh_interval": "5s",
349 "translog": {"sync_interval": "5s", "durability": "async"},
350 },
351 "number_of_shards": 5,
352 "analysis": shared_analysis,
353 },
354 "mappings": {"log": shared_log_mapping},
355 }
356
357 Datastores.es.indices.put_template(
358 "rcae_uptime_metrics", body=uptime_metrics_template
359 )
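# The log, slow-call, metric and uptime mappings above all repeat the same
# "deepcopy shared_log_mapping, rename pg_id" steps; a hypothetical helper
# along these lines (a sketch, not part of this changeset) could express the
# pattern once:
#
#   def mapping_with_id_field(id_field):
#       mapping = copy.deepcopy(shared_log_mapping)
#       mapping["properties"][id_field] = mapping["properties"].pop("pg_id")
#       return mapping
#
#   logs_mapping = mapping_with_id_field("log_id")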
238 360
239 361
240 362 def reindex_reports():
241 363 reports_groups_tables = detect_tables("reports_groups_p_")
242 364 try:
243 Datastores.es.indices.delete("rcae_r*")
365 Datastores.es.indices.delete("rcae_r_*")
244 366 except elasticsearch.exceptions.NotFoundError as e:
245 367 log.error(e)
246 368
247 369 log.info("reindexing report groups")
248 370 i = 0
249 371 task_start = datetime.datetime.now()
250 372 for partition_table in reports_groups_tables:
251 373 conn = DBSession.connection().execution_options(stream_results=True)
252 374 result = conn.execute(partition_table.select())
253 375 while True:
254 376 chunk = result.fetchmany(2000)
255 377 if not chunk:
256 378 break
257 379 es_docs = defaultdict(list)
258 380 for row in chunk:
259 381 i += 1
260 382 item = ReportGroup(**dict(list(row.items())))
261 383 d_range = item.partition_id
262 384 es_docs[d_range].append(item.es_doc())
263 385 if es_docs:
264 386 name = partition_table.name
265 387 log.info("round {}, {}".format(i, name))
266 388 for k, v in es_docs.items():
267 to_update = {"_index": k, "_type": "report_group"}
389 to_update = {"_index": k, "_type": "report"}
268 390 [i.update(to_update) for i in v]
269 391 elasticsearch.helpers.bulk(Datastores.es, v)
270 392
271 393 log.info("total docs {} {}".format(i, datetime.datetime.now() - task_start))
272 394
273 395 i = 0
274 396 log.info("reindexing reports")
275 397 task_start = datetime.datetime.now()
276 398 reports_tables = detect_tables("reports_p_")
277 399 for partition_table in reports_tables:
278 400 conn = DBSession.connection().execution_options(stream_results=True)
279 401 result = conn.execute(partition_table.select())
280 402 while True:
281 403 chunk = result.fetchmany(2000)
282 404 if not chunk:
283 405 break
284 406 es_docs = defaultdict(list)
285 407 for row in chunk:
286 408 i += 1
287 409 item = Report(**dict(list(row.items())))
288 410 d_range = item.partition_id
289 411 es_docs[d_range].append(item.es_doc())
290 412 if es_docs:
291 413 name = partition_table.name
292 414 log.info("round {}, {}".format(i, name))
293 415 for k, v in es_docs.items():
294 416 to_update = {"_index": k, "_type": "report"}
295 417 [i.update(to_update) for i in v]
296 418 elasticsearch.helpers.bulk(Datastores.es, v)
297 419
298 420 log.info("total docs {} {}".format(i, datetime.datetime.now() - task_start))
299 421
300 422 log.info("reindexing reports stats")
301 423 i = 0
302 424 task_start = datetime.datetime.now()
303 425 reports_stats_tables = detect_tables("reports_stats_p_")
304 426 for partition_table in reports_stats_tables:
305 427 conn = DBSession.connection().execution_options(stream_results=True)
306 428 result = conn.execute(partition_table.select())
307 429 while True:
308 430 chunk = result.fetchmany(2000)
309 431 if not chunk:
310 432 break
311 433 es_docs = defaultdict(list)
312 434 for row in chunk:
313 435 rd = dict(list(row.items()))
314 436 # remove legacy columns
315 437 # TODO: remove the column later
316 438 rd.pop("size", None)
317 439 item = ReportStat(**rd)
318 440 i += 1
319 441 d_range = item.partition_id
320 442 es_docs[d_range].append(item.es_doc())
321 443 if es_docs:
322 444 name = partition_table.name
323 445 log.info("round {}, {}".format(i, name))
324 446 for k, v in es_docs.items():
325 to_update = {"_index": k, "_type": "log"}
447 to_update = {"_index": k, "_type": "report"}
326 448 [i.update(to_update) for i in v]
327 449 elasticsearch.helpers.bulk(Datastores.es, v)
328 450
329 451 log.info("total docs {} {}".format(i, datetime.datetime.now() - task_start))
330 452
331 453
332 454 def reindex_logs():
333 455 try:
334 Datastores.es.indices.delete("rcae_l*")
456 Datastores.es.indices.delete("rcae_l_*")
335 457 except elasticsearch.exceptions.NotFoundError as e:
336 458 log.error(e)
337 459
338 460 # logs
339 461 log.info("reindexing logs")
340 462 i = 0
341 463 task_start = datetime.datetime.now()
342 464 log_tables = detect_tables("logs_p_")
343 465 for partition_table in log_tables:
344 466 conn = DBSession.connection().execution_options(stream_results=True)
345 467 result = conn.execute(partition_table.select())
346 468 while True:
347 469 chunk = result.fetchmany(2000)
348 470 if not chunk:
349 471 break
350 472 es_docs = defaultdict(list)
351 473
352 474 for row in chunk:
353 475 i += 1
354 476 item = Log(**dict(list(row.items())))
355 477 d_range = item.partition_id
356 478 es_docs[d_range].append(item.es_doc())
357 479 if es_docs:
358 480 name = partition_table.name
359 481 log.info("round {}, {}".format(i, name))
360 482 for k, v in es_docs.items():
361 483 to_update = {"_index": k, "_type": "log"}
362 484 [i.update(to_update) for i in v]
363 485 elasticsearch.helpers.bulk(Datastores.es, v)
364 486
365 487 log.info("total docs {} {}".format(i, datetime.datetime.now() - task_start))
366 488
367 489
368 490 def reindex_metrics():
369 491 try:
370 Datastores.es.indices.delete("rcae_m*")
492 Datastores.es.indices.delete("rcae_m_*")
371 493 except elasticsearch.exceptions.NotFoundError as e:
372 494 log.error(e)
373 495
374 496 log.info("reindexing applications metrics")
375 497 i = 0
376 498 task_start = datetime.datetime.now()
377 499 metric_tables = detect_tables("metrics_p_")
378 500 for partition_table in metric_tables:
379 501 conn = DBSession.connection().execution_options(stream_results=True)
380 502 result = conn.execute(partition_table.select())
381 503 while True:
382 504 chunk = result.fetchmany(2000)
383 505 if not chunk:
384 506 break
385 507 es_docs = defaultdict(list)
386 508 for row in chunk:
387 509 i += 1
388 510 item = Metric(**dict(list(row.items())))
389 511 d_range = item.partition_id
390 512 es_docs[d_range].append(item.es_doc())
391 513 if es_docs:
392 514 name = partition_table.name
393 515 log.info("round {}, {}".format(i, name))
394 516 for k, v in es_docs.items():
395 517 to_update = {"_index": k, "_type": "log"}
396 518 [i.update(to_update) for i in v]
397 519 elasticsearch.helpers.bulk(Datastores.es, v)
398 520
399 521 log.info("total docs {} {}".format(i, datetime.datetime.now() - task_start))
400 522
401 523
402 524 def reindex_slow_calls():
403 525 try:
404 Datastores.es.indices.delete("rcae_sc*")
526 Datastores.es.indices.delete("rcae_sc_*")
405 527 except elasticsearch.exceptions.NotFoundError as e:
406 528 log.error(e)
407 529
408 530 log.info("reindexing slow calls")
409 531 i = 0
410 532 task_start = datetime.datetime.now()
411 533 slow_calls_tables = detect_tables("slow_calls_p_")
412 534 for partition_table in slow_calls_tables:
413 535 conn = DBSession.connection().execution_options(stream_results=True)
414 536 result = conn.execute(partition_table.select())
415 537 while True:
416 538 chunk = result.fetchmany(2000)
417 539 if not chunk:
418 540 break
419 541 es_docs = defaultdict(list)
420 542 for row in chunk:
421 543 i += 1
422 544 item = SlowCall(**dict(list(row.items())))
423 545 d_range = item.partition_id
424 546 es_docs[d_range].append(item.es_doc())
425 547 if es_docs:
426 548 name = partition_table.name
427 549 log.info("round {}, {}".format(i, name))
428 550 for k, v in es_docs.items():
429 551 to_update = {"_index": k, "_type": "log"}
430 552 [i.update(to_update) for i in v]
431 553 elasticsearch.helpers.bulk(Datastores.es, v)
432 554
433 555 log.info("total docs {} {}".format(i, datetime.datetime.now() - task_start))
434 556
435 557
436 558 if __name__ == "__main__":
437 559 main()
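Under the new report template the removed "_parent" meta field is replaced by "join_field", so parent/child lookups against the reindexed rcae_r_* indices go through has_parent/has_child queries. A minimal sketch using the client from the script above (the query itself is illustrative, not part of this changeset):

    # all child "report" documents that have a "report_group" parent
    Datastores.es.search(
        index="rcae_r_*",
        doc_type="report",
        body={
            "query": {
                "has_parent": {
                    "parent_type": "report_group",
                    "query": {"match_all": {}},
                }
            }
        },
    )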
General Comments 1
Under Review
author
Auto status change to "Under Review"

Merge is not currently possible because of the failed checks below.

  • User `default` not allowed to perform merge.
  • Pull request reviewer approval is pending.