elasticsearch: bump to ES 6.x
Author: ergo
.travis.yml
@@ -1,43 +1,46 @@
1 1 language: python
2 2
3 3 dist: xenial
4 4
5 5 notifications:
6 6 on_success: change
7 7 on_failure: always
8 8
9 9 matrix:
10 10 include:
11 11 - python: 3.5
12 env: TOXENV=py35
12 env: TOXENV=py35 ES_VERSION=6.6.2 ES_DOWNLOAD_URL=https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-oss-${ES_VERSION}.tar.gz
13 13 - python: 3.6
14 env: TOXENV=py36
14 env: TOXENV=py36 ES_VERSION=6.6.2 ES_DOWNLOAD_URL=https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-oss-${ES_VERSION}.tar.gz
15 15 addons:
16 16 postgresql: "9.6"
17 17 - python: 3.6
18 env: TOXENV=py36 PGPORT=5432
18 env: TOXENV=py36 PGPORT=5432 ES_VERSION=6.6.2 ES_DOWNLOAD_URL=https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-oss-${ES_VERSION}.tar.gz
19 19 addons:
20 20 postgresql: "10"
21 21 apt:
22 22 packages:
23 23 - postgresql-10
24 24 - postgresql-client-10
25 25
26 26 install:
27 - wget ${ES_DOWNLOAD_URL}
28 - tar -xzf elasticsearch-${ES_VERSION}.tar.gz
29 - ./elasticsearch-${ES_VERSION}/bin/elasticsearch &
27 30 - travis_retry pip install -U setuptools pip tox
28 31
29 32 script:
30 33 - travis_retry tox
31 34
32 35 services:
33 36 - postgresql
34 37 - elasticsearch
35 38 - redis
36 39
37 40 before_script:
38 41 - psql -c "create user test with encrypted password 'test';" -U postgres
39 42 - psql -c 'create database appenlight_test owner test;' -U postgres
40 43
41 44 after_success:
42 45 - pip install coveralls
43 46 - coveralls
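
Because the new install: steps download Elasticsearch 6.6.2 and start it in the background, the node may still be booting when tox runs. A minimal readiness check is sketched below; it is not part of this change, and the script name (wait_for_es.py) and the default http://localhost:9200 endpoint are assumptions.

# wait_for_es.py — hypothetical helper: poll the local Elasticsearch node until it
# responds or a timeout expires, so the test run does not race the background startup.
import time
import urllib.error
import urllib.request


def wait_for_elasticsearch(url="http://localhost:9200", timeout=60):
    deadline = time.time() + timeout
    while time.time() < deadline:
        try:
            with urllib.request.urlopen(url) as resp:
                if resp.status == 200:
                    return
        except (urllib.error.URLError, ConnectionResetError):
            pass
        time.sleep(1)
    raise RuntimeError("Elasticsearch did not become ready within {}s".format(timeout))


if __name__ == "__main__":
    wait_for_elasticsearch()

Such a check could be invoked at the start of the script: phase, before travis_retry tox.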
appenlight/scripts/reindex_elasticsearch.py
@@ -1,577 +1,572 @@
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright 2010 - 2017 RhodeCode GmbH and the AppEnlight project authors
4 4 #
5 5 # Licensed under the Apache License, Version 2.0 (the "License");
6 6 # you may not use this file except in compliance with the License.
7 7 # You may obtain a copy of the License at
8 8 #
9 9 # http://www.apache.org/licenses/LICENSE-2.0
10 10 #
11 11 # Unless required by applicable law or agreed to in writing, software
12 12 # distributed under the License is distributed on an "AS IS" BASIS,
13 13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 14 # See the License for the specific language governing permissions and
15 15 # limitations under the License.
16 16
17 17 import argparse
18 18 import datetime
19 19 import logging
20 20 import copy
21 21
22 22 import sqlalchemy as sa
23 23 import elasticsearch.exceptions
24 24 import elasticsearch.helpers
25 25
26 26 from collections import defaultdict
27 27 from pyramid.paster import setup_logging
28 28 from pyramid.paster import bootstrap
29 29 from appenlight.models import DBSession, Datastores, metadata
30 30 from appenlight.lib import get_callable
31 31 from appenlight.models.report_group import ReportGroup
32 32 from appenlight.models.report import Report
33 33 from appenlight.models.report_stat import ReportStat
34 34 from appenlight.models.log import Log
35 35 from appenlight.models.slow_call import SlowCall
36 36 from appenlight.models.metric import Metric
37 37
38 38 log = logging.getLogger(__name__)
39 39
40 40 tables = {
41 41 "slow_calls_p_": [],
42 42 "reports_stats_p_": [],
43 43 "reports_p_": [],
44 44 "reports_groups_p_": [],
45 45 "logs_p_": [],
46 46 "metrics_p_": [],
47 47 }
48 48
49 49
50 50 def detect_tables(table_prefix):
51 51 found_tables = []
52 52 db_tables_query = """
53 53 SELECT tablename FROM pg_tables WHERE tablename NOT LIKE 'pg_%' AND
54 54 tablename NOT LIKE 'sql_%' ORDER BY tablename ASC;"""
55 55
56 56 for table in DBSession.execute(db_tables_query).fetchall():
57 57 tablename = table.tablename
58 58 if tablename.startswith(table_prefix):
59 59 t = sa.Table(
60 60 tablename, metadata, autoload=True, autoload_with=DBSession.bind.engine
61 61 )
62 62 found_tables.append(t)
63 63 return found_tables
64 64
65 65
66 66 def main():
67 67 """
68 68 Recreates Elasticsearch indexes
69 69 Performs reindex of whole db to Elasticsearch
70 70
71 71 """
72 72
73 73 # need parser twice because we first need to load ini file
74 74 # bootstrap pyramid and then load plugins
75 75 pre_parser = argparse.ArgumentParser(
76 76 description="Reindex AppEnlight data", add_help=False
77 77 )
78 78 pre_parser.add_argument(
79 79 "-c", "--config", required=True, help="Configuration ini file of application"
80 80 )
81 81 pre_parser.add_argument("-h", "--help", help="Show help", nargs="?")
82 82 pre_parser.add_argument(
83 83 "-t", "--types", nargs="+", help="Which parts of database should get reindexed"
84 84 )
85 85 args = pre_parser.parse_args()
86 86
87 87 config_uri = args.config
88 88 setup_logging(config_uri)
89 89 log.setLevel(logging.INFO)
90 90 env = bootstrap(config_uri)
91 91 parser = argparse.ArgumentParser(description="Reindex AppEnlight data")
92 92 choices = {
93 93 "reports": "appenlight.scripts.reindex_elasticsearch:reindex_reports",
94 94 "logs": "appenlight.scripts.reindex_elasticsearch:reindex_logs",
95 95 "metrics": "appenlight.scripts.reindex_elasticsearch:reindex_metrics",
96 96 "slow_calls": "appenlight.scripts.reindex_elasticsearch:reindex_slow_calls",
97 97 "template": "appenlight.scripts.reindex_elasticsearch:update_template",
98 98 }
99 99 for k, v in env["registry"].appenlight_plugins.items():
100 100 if v.get("fulltext_indexer"):
101 101 choices[k] = v["fulltext_indexer"]
102 102 parser.add_argument(
103 103 "-t",
104 104 "--types",
105 105 nargs="*",
106 106 choices=["all"] + list(choices.keys()),
107 107 default=[],
108 108 help="Which parts of database should get reindexed",
109 109 )
110 110 parser.add_argument(
111 111 "-c", "--config", required=True, help="Configuration ini file of application"
112 112 )
113 113 args = parser.parse_args()
114 114
115 115 if "all" in args.types:
116 116 args.types = list(choices.keys())
117 117
118 118 print("Selected types to reindex: {}".format(args.types))
119 119
120 120 log.info("settings {}".format(args.types))
121 121
122 122 if "template" in args.types:
123 123 get_callable(choices["template"])()
124 124 args.types.remove("template")
125 125 for selected in args.types:
126 126 get_callable(choices[selected])()
127 127
128 128
129 129 def update_template():
130 130 try:
131 131 Datastores.es.indices.delete_template("rcae_reports")
132 132 except elasticsearch.exceptions.NotFoundError as e:
133 133 log.error(e)
134 134
135 135 try:
136 136 Datastores.es.indices.delete_template("rcae_logs")
137 137 except elasticsearch.exceptions.NotFoundError as e:
138 138 log.error(e)
139 139 try:
140 140 Datastores.es.indices.delete_template("rcae_slow_calls")
141 141 except elasticsearch.exceptions.NotFoundError as e:
142 142 log.error(e)
143 143 try:
144 144 Datastores.es.indices.delete_template("rcae_metrics")
145 145 except elasticsearch.exceptions.NotFoundError as e:
146 146 log.error(e)
147 147 log.info("updating elasticsearch template")
148 148 tag_templates = [
149 149 {
150 150 "values": {
151 151 "path_match": "tags.*",
152 152 "mapping": {
153 153 "type": "object",
154 154 "properties": {
155 155 "values": {"type": "text", "analyzer": "tag_value",
156 156 "fields": {
157 157 "keyword": {
158 158 "type": "keyword",
159 159 "ignore_above": 256
160 160 }
161 161 }},
162 162 "numeric_values": {"type": "float"},
163 163 },
164 164 },
165 165 }
166 166 }
167 167 ]
168 168
169 169 shared_analysis = {
170 170 "analyzer": {
171 171 "url_path": {
172 172 "type": "custom",
173 173 "char_filter": [],
174 174 "tokenizer": "path_hierarchy",
175 175 "filter": [],
176 176 },
177 177 "tag_value": {
178 178 "type": "custom",
179 179 "char_filter": [],
180 180 "tokenizer": "keyword",
181 181 "filter": ["lowercase"],
182 182 },
183 183 }
184 184 }
185 185
186 186 shared_log_mapping = {
187 187 "_all": {"enabled": False},
188 188 "dynamic_templates": tag_templates,
189 189 "properties": {
190 190 "pg_id": {"type": "keyword", "index": True},
191 191 "delete_hash": {"type": "keyword", "index": True},
192 192 "resource_id": {"type": "integer"},
193 193 "timestamp": {"type": "date"},
194 194 "permanent": {"type": "boolean"},
195 195 "request_id": {"type": "keyword", "index": True},
196 196 "log_level": {"type": "text", "analyzer": "simple"},
197 197 "message": {"type": "text", "analyzer": "simple"},
198 198 "namespace": {
199 199 "type": "text",
200 200 "fields": {"keyword": {"type": "keyword", "ignore_above": 256}},
201 201 },
202 202 "tags": {"type": "object"},
203 203 "tag_list": {"type": "text", "analyzer": "tag_value",
204 204 "fields": {
205 205 "keyword": {
206 206 "type": "keyword",
207 207 "ignore_above": 256
208 208 }
209 209 }},
210 210 },
211 211 }
212 212
213 213 report_schema = {
214 214 "template": "rcae_r_*",
215 215 "settings": {
216 216 "index": {
217 217 "refresh_interval": "5s",
218 "translog": {"sync_interval": "5s", "durability": "async"},
219 "mapping": {"single_type": True}
218 "translog": {"sync_interval": "5s", "durability": "async"}
220 219 },
221 220 "number_of_shards": 5,
222 221 "analysis": shared_analysis,
223 222 },
224 223 "mappings": {
225 224 "report": {
226 225 "_all": {"enabled": False},
227 226 "dynamic_templates": tag_templates,
228 227 "properties": {
229 228 "type": {"type": "keyword", "index": True},
230 229 # report group
231 230 "group_id": {"type": "keyword", "index": True},
232 231 "resource_id": {"type": "integer"},
233 232 "priority": {"type": "integer"},
234 233 "error": {"type": "text", "analyzer": "simple"},
235 234 "read": {"type": "boolean"},
236 235 "occurences": {"type": "integer"},
237 236 "fixed": {"type": "boolean"},
238 237 "first_timestamp": {"type": "date"},
239 238 "last_timestamp": {"type": "date"},
240 239 "average_duration": {"type": "float"},
241 240 "summed_duration": {"type": "float"},
242 241 "public": {"type": "boolean"},
243 242 # report
244 243
245 244 "report_id": {"type": "keyword", "index": True},
246 245 "http_status": {"type": "integer"},
247 246 "ip": {"type": "keyword", "index": True},
248 247 "url_domain": {"type": "text", "analyzer": "simple"},
249 248 "url_path": {"type": "text", "analyzer": "url_path"},
250 249 "report_type": {"type": "integer"},
251 250 "start_time": {"type": "date"},
252 251 "request_id": {"type": "keyword", "index": True},
253 252 "end_time": {"type": "date"},
254 253 "duration": {"type": "float"},
255 254 "tags": {"type": "object"},
256 255 "tag_list": {"type": "text", "analyzer": "tag_value",
257 256 "fields": {
258 257 "keyword": {
259 258 "type": "keyword",
260 259 "ignore_above": 256
261 260 }
262 261 }},
263 262 "extra": {"type": "object"},
264 263
265 264 # report stats
266 265
267 266 "report_stat_id": {"type": "keyword", "index": True},
268 267 "timestamp": {"type": "date"},
269 268 "permanent": {"type": "boolean"},
270 269 "log_level": {"type": "text", "analyzer": "simple"},
271 270 "message": {"type": "text", "analyzer": "simple"},
272 271 "namespace": {
273 272 "type": "text",
274 273 "fields": {"keyword": {"type": "keyword", "ignore_above": 256}},
275 274 },
276 275
277 276 "join_field": {
278 277 "type": "join",
279 278 "relations": {
280 279 "report_group": ["report", "report_stat"]
281 280 }
282 281 }
283 282
284 283 },
285 284 }
286 285 }
287 286 }
288 287
289 288 Datastores.es.indices.put_template("rcae_reports", body=report_schema)
290 289
291 290 logs_mapping = copy.deepcopy(shared_log_mapping)
292 291 logs_mapping["properties"]["log_id"] = logs_mapping["properties"]["pg_id"]
293 292 del logs_mapping["properties"]["pg_id"]
294 293
295 294 log_template = {
296 295 "template": "rcae_l_*",
297 296 "settings": {
298 297 "index": {
299 298 "refresh_interval": "5s",
300 299 "translog": {"sync_interval": "5s", "durability": "async"},
301 "mapping": {"single_type": True}
302 300 },
303 301 "number_of_shards": 5,
304 302 "analysis": shared_analysis,
305 303 },
306 304 "mappings": {
307 305 "log": logs_mapping,
308 306 },
309 307 }
310 308
311 309 Datastores.es.indices.put_template("rcae_logs", body=log_template)
312 310
313 311 slow_call_mapping = copy.deepcopy(shared_log_mapping)
314 312 slow_call_mapping["properties"]["slow_call_id"] = slow_call_mapping["properties"]["pg_id"]
315 313 del slow_call_mapping["properties"]["pg_id"]
316 314
317 315 slow_call_template = {
318 316 "template": "rcae_sc_*",
319 317 "settings": {
320 318 "index": {
321 319 "refresh_interval": "5s",
322 320 "translog": {"sync_interval": "5s", "durability": "async"},
323 "mapping": {"single_type": True}
324 321 },
325 322 "number_of_shards": 5,
326 323 "analysis": shared_analysis,
327 324 },
328 325 "mappings": {
329 326 "log": slow_call_mapping,
330 327 },
331 328 }
332 329
333 330 Datastores.es.indices.put_template("rcae_slow_calls", body=slow_call_template)
334 331
335 332 metric_mapping = copy.deepcopy(shared_log_mapping)
336 333 metric_mapping["properties"]["metric_id"] = metric_mapping["properties"]["pg_id"]
337 334 del metric_mapping["properties"]["pg_id"]
338 335
339 336 metrics_template = {
340 337 "template": "rcae_m_*",
341 338 "settings": {
342 339 "index": {
343 340 "refresh_interval": "5s",
344 341 "translog": {"sync_interval": "5s", "durability": "async"},
345 "mapping": {"single_type": True}
346 342 },
347 343 "number_of_shards": 5,
348 344 "analysis": shared_analysis,
349 345 },
350 346 "mappings": {
351 347 "log": metric_mapping,
352 348 },
353 349 }
354 350
355 351 Datastores.es.indices.put_template("rcae_metrics", body=metrics_template)
356 352
357 353 uptime_metric_mapping = copy.deepcopy(shared_log_mapping)
358 354 uptime_metric_mapping["properties"]["uptime_id"] = uptime_metric_mapping["properties"]["pg_id"]
359 355 del uptime_metric_mapping["properties"]["pg_id"]
360 356
361 357 uptime_metrics_template = {
362 358 "template": "rcae_uptime_ce_*",
363 359 "settings": {
364 360 "index": {
365 361 "refresh_interval": "5s",
366 362 "translog": {"sync_interval": "5s", "durability": "async"},
367 "mapping": {"single_type": True}
368 363 },
369 364 "number_of_shards": 5,
370 365 "analysis": shared_analysis,
371 366 },
372 367 "mappings": {
373 368 "log": shared_log_mapping,
374 369 },
375 370 }
376 371
377 372 Datastores.es.indices.put_template("rcae_uptime_metrics", body=uptime_metrics_template)
378 373
379 374
380 375 def reindex_reports():
381 376 reports_groups_tables = detect_tables("reports_groups_p_")
382 377 try:
383 378 Datastores.es.indices.delete("rcae_r_*")
384 379 except elasticsearch.exceptions.NotFoundError as e:
385 380 log.error(e)
386 381
387 382 log.info("reindexing report groups")
388 383 i = 0
389 384 task_start = datetime.datetime.now()
390 385 for partition_table in reports_groups_tables:
391 386 conn = DBSession.connection().execution_options(stream_results=True)
392 387 result = conn.execute(partition_table.select())
393 388 while True:
394 389 chunk = result.fetchmany(2000)
395 390 if not chunk:
396 391 break
397 392 es_docs = defaultdict(list)
398 393 for row in chunk:
399 394 i += 1
400 395 item = ReportGroup(**dict(list(row.items())))
401 396 d_range = item.partition_id
402 397 es_docs[d_range].append(item.es_doc())
403 398 if es_docs:
404 399 name = partition_table.name
405 400 log.info("round {}, {}".format(i, name))
406 401 for k, v in es_docs.items():
407 402 to_update = {"_index": k, "_type": "report"}
408 403 [i.update(to_update) for i in v]
409 404 elasticsearch.helpers.bulk(Datastores.es, v)
410 405
411 406 log.info("total docs {} {}".format(i, datetime.datetime.now() - task_start))
412 407
413 408 i = 0
414 409 log.info("reindexing reports")
415 410 task_start = datetime.datetime.now()
416 411 reports_tables = detect_tables("reports_p_")
417 412 for partition_table in reports_tables:
418 413 conn = DBSession.connection().execution_options(stream_results=True)
419 414 result = conn.execute(partition_table.select())
420 415 while True:
421 416 chunk = result.fetchmany(2000)
422 417 if not chunk:
423 418 break
424 419 es_docs = defaultdict(list)
425 420 for row in chunk:
426 421 i += 1
427 422 item = Report(**dict(list(row.items())))
428 423 d_range = item.partition_id
429 424 es_docs[d_range].append(item.es_doc())
430 425 if es_docs:
431 426 name = partition_table.name
432 427 log.info("round {}, {}".format(i, name))
433 428 for k, v in es_docs.items():
434 429 to_update = {"_index": k, "_type": "report"}
435 430 [i.update(to_update) for i in v]
436 431 elasticsearch.helpers.bulk(Datastores.es, v)
437 432
438 433 log.info("total docs {} {}".format(i, datetime.datetime.now() - task_start))
439 434
440 435 log.info("reindexing reports stats")
441 436 i = 0
442 437 task_start = datetime.datetime.now()
443 438 reports_stats_tables = detect_tables("reports_stats_p_")
444 439 for partition_table in reports_stats_tables:
445 440 conn = DBSession.connection().execution_options(stream_results=True)
446 441 result = conn.execute(partition_table.select())
447 442 while True:
448 443 chunk = result.fetchmany(2000)
449 444 if not chunk:
450 445 break
451 446 es_docs = defaultdict(list)
452 447 for row in chunk:
453 448 rd = dict(list(row.items()))
454 449 # remove legacy columns
455 450 # TODO: remove the column later
456 451 rd.pop("size", None)
457 452 item = ReportStat(**rd)
458 453 i += 1
459 454 d_range = item.partition_id
460 455 es_docs[d_range].append(item.es_doc())
461 456 if es_docs:
462 457 name = partition_table.name
463 458 log.info("round {}, {}".format(i, name))
464 459 for k, v in es_docs.items():
465 460 to_update = {"_index": k, "_type": "report"}
466 461 [i.update(to_update) for i in v]
467 462 elasticsearch.helpers.bulk(Datastores.es, v)
468 463
469 464 log.info("total docs {} {}".format(i, datetime.datetime.now() - task_start))
470 465
471 466
472 467 def reindex_logs():
473 468 try:
474 469 Datastores.es.indices.delete("rcae_l_*")
475 470 except elasticsearch.exceptions.NotFoundError as e:
476 471 log.error(e)
477 472
478 473 # logs
479 474 log.info("reindexing logs")
480 475 i = 0
481 476 task_start = datetime.datetime.now()
482 477 log_tables = detect_tables("logs_p_")
483 478 for partition_table in log_tables:
484 479 conn = DBSession.connection().execution_options(stream_results=True)
485 480 result = conn.execute(partition_table.select())
486 481 while True:
487 482 chunk = result.fetchmany(2000)
488 483 if not chunk:
489 484 break
490 485 es_docs = defaultdict(list)
491 486
492 487 for row in chunk:
493 488 i += 1
494 489 item = Log(**dict(list(row.items())))
495 490 d_range = item.partition_id
496 491 es_docs[d_range].append(item.es_doc())
497 492 if es_docs:
498 493 name = partition_table.name
499 494 log.info("round {}, {}".format(i, name))
500 495 for k, v in es_docs.items():
501 496 to_update = {"_index": k, "_type": "log"}
502 497 [i.update(to_update) for i in v]
503 498 elasticsearch.helpers.bulk(Datastores.es, v)
504 499
505 500 log.info("total docs {} {}".format(i, datetime.datetime.now() - task_start))
506 501
507 502
508 503 def reindex_metrics():
509 504 try:
510 505 Datastores.es.indices.delete("rcae_m_*")
511 506 except elasticsearch.exceptions.NotFoundError as e:
512 507 log.error(e)
513 508
514 509 log.info("reindexing applications metrics")
515 510 i = 0
516 511 task_start = datetime.datetime.now()
517 512 metric_tables = detect_tables("metrics_p_")
518 513 for partition_table in metric_tables:
519 514 conn = DBSession.connection().execution_options(stream_results=True)
520 515 result = conn.execute(partition_table.select())
521 516 while True:
522 517 chunk = result.fetchmany(2000)
523 518 if not chunk:
524 519 break
525 520 es_docs = defaultdict(list)
526 521 for row in chunk:
527 522 i += 1
528 523 item = Metric(**dict(list(row.items())))
529 524 d_range = item.partition_id
530 525 es_docs[d_range].append(item.es_doc())
531 526 if es_docs:
532 527 name = partition_table.name
533 528 log.info("round {}, {}".format(i, name))
534 529 for k, v in es_docs.items():
535 530 to_update = {"_index": k, "_type": "log"}
536 531 [i.update(to_update) for i in v]
537 532 elasticsearch.helpers.bulk(Datastores.es, v)
538 533
539 534 log.info("total docs {} {}".format(i, datetime.datetime.now() - task_start))
540 535
541 536
542 537 def reindex_slow_calls():
543 538 try:
544 539 Datastores.es.indices.delete("rcae_sc_*")
545 540 except elasticsearch.exceptions.NotFoundError as e:
546 541 log.error(e)
547 542
548 543 log.info("reindexing slow calls")
549 544 i = 0
550 545 task_start = datetime.datetime.now()
551 546 slow_calls_tables = detect_tables("slow_calls_p_")
552 547 for partition_table in slow_calls_tables:
553 548 conn = DBSession.connection().execution_options(stream_results=True)
554 549 result = conn.execute(partition_table.select())
555 550 while True:
556 551 chunk = result.fetchmany(2000)
557 552 if not chunk:
558 553 break
559 554 es_docs = defaultdict(list)
560 555 for row in chunk:
561 556 i += 1
562 557 item = SlowCall(**dict(list(row.items())))
563 558 d_range = item.partition_id
564 559 es_docs[d_range].append(item.es_doc())
565 560 if es_docs:
566 561 name = partition_table.name
567 562 log.info("round {}, {}".format(i, name))
568 563 for k, v in es_docs.items():
569 564 to_update = {"_index": k, "_type": "log"}
570 565 [i.update(to_update) for i in v]
571 566 elasticsearch.helpers.bulk(Datastores.es, v)
572 567
573 568 log.info("total docs {} {}".format(i, datetime.datetime.now() - task_start))
574 569
575 570
576 571 if __name__ == "__main__":
577 572 main()
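
The script resolves each selected type to a dotted-name callable (module:function) via get_callable and runs it, handling "template" first when selected so the index templates exist before documents are bulk-indexed. A usage sketch follows; the module path comes from the choices mapping above, while the ini filename is only an assumption:

    python -m appenlight.scripts.reindex_elasticsearch -c appenlight.ini -t template
    python -m appenlight.scripts.reindex_elasticsearch -c appenlight.ini -t all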
General Comments

Auto status change to "Under Review"