##// END OF EJS Templates
elasticsearch: move to single doctype indices
ergo -
Show More
@@ -239,7 +239,7 b' def add_reports(resource_id, request_params, dataset, **kwargs):'
239 239 @celery.task(queue="es", default_retry_delay=600, max_retries=144)
240 240 def add_reports_es(report_group_docs, report_docs):
241 241 for k, v in report_group_docs.items():
242 to_update = {"_index": k, "_type": "report_group"}
242 to_update = {"_index": k, "_type": "report"}
243 243 [i.update(to_update) for i in v]
244 244 elasticsearch.helpers.bulk(Datastores.es, v)
245 245 for k, v in report_docs.items():
@@ -259,7 +259,7 b' def add_reports_slow_calls_es(es_docs):'
259 259 @celery.task(queue="es", default_retry_delay=600, max_retries=144)
260 260 def add_reports_stats_rows_es(es_docs):
261 261 for k, v in es_docs.items():
262 to_update = {"_index": k, "_type": "log"}
262 to_update = {"_index": k, "_type": "report"}
263 263 [i.update(to_update) for i in v]
264 264 elasticsearch.helpers.bulk(Datastores.es, v)
265 265
@@ -287,7 +287,7 b' def add_logs(resource_id, request_params, dataset, **kwargs):'
287 287 if entry["primary_key"] is None:
288 288 es_docs[log_entry.partition_id].append(log_entry.es_doc())
289 289
290 # 2nd pass to delete all log entries from db foe same pk/ns pair
290 # 2nd pass to delete all log entries from db for same pk/ns pair
291 291 if ns_pairs:
292 292 ids_to_delete = []
293 293 es_docs = collections.defaultdict(list)
@@ -112,7 +112,7 b' class Log(Base, BaseModel):'
112 112 else None,
113 113 }
114 114 return {
115 "pg_id": str(self.log_id),
115 "log_id": str(self.log_id),
116 116 "delete_hash": self.delete_hash,
117 117 "resource_id": self.resource_id,
118 118 "request_id": self.request_id,
@@ -60,6 +60,7 b' class Metric(Base, BaseModel):'
60 60 }
61 61
62 62 return {
63 "metric_id": self.pkey,
63 64 "resource_id": self.resource_id,
64 65 "timestamp": self.timestamp,
65 66 "namespace": self.namespace,
@@ -314,7 +314,7 b' class Report(Base, BaseModel):'
314 314 "bool": {
315 315 "filter": [
316 316 {"term": {"group_id": self.group_id}},
317 {"range": {"pg_id": {"lt": self.id}}},
317 {"range": {"report_id": {"lt": self.id}}},
318 318 ]
319 319 }
320 320 },
@@ -324,7 +324,7 b' class Report(Base, BaseModel):'
324 324 body=query, index=self.partition_id, doc_type="report"
325 325 )
326 326 if result["hits"]["total"]:
327 return result["hits"]["hits"][0]["_source"]["pg_id"]
327 return result["hits"]["hits"][0]["_source"]["report_id"]
328 328
329 329 def get_next_in_group(self, request):
330 330 query = {
@@ -333,7 +333,7 b' class Report(Base, BaseModel):'
333 333 "bool": {
334 334 "filter": [
335 335 {"term": {"group_id": self.group_id}},
336 {"range": {"pg_id": {"gt": self.id}}},
336 {"range": {"report_id": {"gt": self.id}}},
337 337 ]
338 338 }
339 339 },
@@ -343,7 +343,7 b' class Report(Base, BaseModel):'
343 343 body=query, index=self.partition_id, doc_type="report"
344 344 )
345 345 if result["hits"]["total"]:
346 return result["hits"]["hits"][0]["_source"]["pg_id"]
346 return result["hits"]["hits"][0]["_source"]["report_id"]
347 347
348 348 def get_public_url(self, request=None, report_group=None, _app_url=None):
349 349 """
@@ -469,7 +469,7 b' class Report(Base, BaseModel):'
469 469 tags["user_name"] = {"value": [self.username], "numeric_value": None}
470 470 return {
471 471 "_id": str(self.id),
472 "pg_id": str(self.id),
472 "report_id": str(self.id),
473 473 "resource_id": self.resource_id,
474 474 "http_status": self.http_status or "",
475 475 "start_time": self.start_time,
@@ -482,9 +482,14 b' class Report(Base, BaseModel):'
482 482 "request_id": self.request_id,
483 483 "ip": self.ip,
484 484 "group_id": str(self.group_id),
485 "_parent": str(self.group_id),
485 "type": "report",
486 "join_field": {
487 "name": "report",
488 "parent": str(self.group_id)
489 },
486 490 "tags": tags,
487 491 "tag_list": tag_list,
492 "_routing": str(self.group_id)
488 493 }
489 494
490 495 @property
@@ -518,7 +523,7 b' def after_update(mapper, connection, target):'
518 523
519 524 def after_delete(mapper, connection, target):
520 525 if not hasattr(target, "_skip_ft_index"):
521 query = {"query": {"term": {"pg_id": target.id}}}
526 query = {"query": {"term": {"report_id": target.id}}}
522 527 Datastores.es.delete_by_query(
523 528 index=target.partition_id, doc_type="report", body=query, conflicts="proceed"
524 529 )
@@ -178,7 +178,7 b' class ReportGroup(Base, BaseModel):'
178 178 def es_doc(self):
179 179 return {
180 180 "_id": str(self.id),
181 "pg_id": str(self.id),
181 "group_id": str(self.id),
182 182 "resource_id": self.resource_id,
183 183 "error": self.error,
184 184 "fixed": self.fixed,
@@ -190,6 +190,10 b' class ReportGroup(Base, BaseModel):'
190 190 "summed_duration": self.summed_duration,
191 191 "first_timestamp": self.first_timestamp,
192 192 "last_timestamp": self.last_timestamp,
193 "type": "report_group",
194 "join_field": {
195 "name": "report_group"
196 },
193 197 }
194 198
195 199 def set_notification_info(self, notify_10=False, notify_100=False):
@@ -258,14 +262,14 b' def after_insert(mapper, connection, target):'
258 262 if not hasattr(target, "_skip_ft_index"):
259 263 data = target.es_doc()
260 264 data.pop("_id", None)
261 Datastores.es.index(target.partition_id, "report_group", data, id=target.id)
265 Datastores.es.index(target.partition_id, "report", data, id=target.id)
262 266
263 267
264 268 def after_update(mapper, connection, target):
265 269 if not hasattr(target, "_skip_ft_index"):
266 270 data = target.es_doc()
267 271 data.pop("_id", None)
268 Datastores.es.index(target.partition_id, "report_group", data, id=target.id)
272 Datastores.es.index(target.partition_id, "report", data, id=target.id)
269 273
270 274
271 275 def after_delete(mapper, connection, target):
@@ -274,10 +278,6 b' def after_delete(mapper, connection, target):'
274 278 Datastores.es.delete_by_query(
275 279 index=target.partition_id, doc_type="report", body=query, conflicts="proceed"
276 280 )
277 query = {"query": {"term": {"pg_id": target.id}}}
278 Datastores.es.delete_by_query(
279 index=target.partition_id, doc_type="report_group", body=query, conflicts="proceed"
280 )
281 281
282 282
283 283 sa.event.listen(ReportGroup, "after_insert", after_insert)
@@ -48,12 +48,13 b' class ReportStat(Base, BaseModel):'
48 48 return {
49 49 "resource_id": self.resource_id,
50 50 "timestamp": self.start_interval,
51 "pg_id": str(self.id),
51 "report_stat_id": str(self.id),
52 52 "permanent": True,
53 53 "request_id": None,
54 54 "log_level": "ERROR",
55 55 "message": None,
56 56 "namespace": "appenlight.error",
57 "group_id": str(self.group_id),
57 58 "tags": {
58 59 "duration": {"values": self.duration, "numeric_values": self.duration},
59 60 "occurences": {
@@ -76,4 +77,5 b' class ReportStat(Base, BaseModel):'
76 77 "server_name",
77 78 "view_name",
78 79 ],
80 "type": "report_stat",
79 81 }
@@ -190,7 +190,7 b' class LogService(BaseService):'
190 190 [], item_count=item_count, items_per_page=items_per_page, **filter_settings
191 191 )
192 192 ordered_ids = tuple(
193 item["_source"]["pg_id"] for item in results.get("hits", [])
193 item["_source"]["log_id"] for item in results.get("hits", [])
194 194 )
195 195
196 196 sorted_instance_list = []
@@ -97,7 +97,7 b' class ReportGroupService(BaseService):'
97 97 es_query["query"]["bool"]["filter"].extend(tags)
98 98
99 99 result = Datastores.es.search(
100 body=es_query, index=index_names, doc_type="log", size=0
100 body=es_query, index=index_names, doc_type="report", size=0
101 101 )
102 102 series = []
103 103 for bucket in result["aggregations"]["parent_agg"]["buckets"]:
@@ -143,7 +143,7 b' class ReportGroupService(BaseService):'
143 143 "top_groups": {
144 144 "terms": {
145 145 "size": 5000,
146 "field": "_parent#report_group",
146 "field": "join_field#report_group",
147 147 "order": {"newest": "desc"},
148 148 },
149 149 "aggs": {
@@ -315,7 +315,7 b' class ReportGroupService(BaseService):'
315 315 ordered_ids = []
316 316 if results:
317 317 for item in results["top_groups"]["buckets"]:
318 pg_id = item["top_reports_hits"]["hits"]["hits"][0]["_source"]["pg_id"]
318 pg_id = item["top_reports_hits"]["hits"]["hits"][0]["_source"]["report_id"]
319 319 ordered_ids.append(pg_id)
320 320 log.info(filter_settings)
321 321 paginator = paginate.Page(
@@ -340,7 +340,7 b' class RequestMetricService(BaseService):'
340 340 for hit in bucket["top_calls_hits"]["hits"]["hits"]:
341 341 details[bucket["key"]].append(
342 342 {
343 "report_id": hit["_source"]["pg_id"],
343 "report_id": hit["_source"]["request_metric_id"],
344 344 "group_id": hit["_source"]["group_id"],
345 345 }
346 346 )
@@ -88,7 +88,7 b' class SlowCall(Base, BaseModel):'
88 88 doc = {
89 89 "resource_id": self.resource_id,
90 90 "timestamp": self.timestamp,
91 "pg_id": str(self.id),
91 "slow_call_id": str(self.id),
92 92 "permanent": False,
93 93 "request_id": None,
94 94 "log_level": "UNKNOWN",
@@ -17,6 +17,7 b''
17 17 import argparse
18 18 import datetime
19 19 import logging
20 import copy
20 21
21 22 import sqlalchemy as sa
22 23 import elasticsearch.exceptions
@@ -34,7 +35,6 b' from appenlight.models.log import Log'
34 35 from appenlight.models.slow_call import SlowCall
35 36 from appenlight.models.metric import Metric
36 37
37
38 38 log = logging.getLogger(__name__)
39 39
40 40 tables = {
@@ -128,7 +128,20 b' def main():'
128 128
129 129 def update_template():
130 130 try:
131 Datastores.es.indices.delete_template("rcae")
131 Datastores.es.indices.delete_template("rcae_reports")
132 except elasticsearch.exceptions.NotFoundError as e:
133 log.error(e)
134
135 try:
136 Datastores.es.indices.delete_template("rcae_logs")
137 except elasticsearch.exceptions.NotFoundError as e:
138 log.error(e)
139 try:
140 Datastores.es.indices.delete_template("rcae_slow_calls")
141 except elasticsearch.exceptions.NotFoundError as e:
142 log.error(e)
143 try:
144 Datastores.es.indices.delete_template("rcae_metrics")
132 145 except elasticsearch.exceptions.NotFoundError as e:
133 146 log.error(e)
134 147 log.info("updating elasticsearch template")
@@ -153,15 +166,7 b' def update_template():'
153 166 }
154 167 ]
155 168
156 template_schema = {
157 "template": "rcae_*",
158 "settings": {
159 "index": {
160 "refresh_interval": "5s",
161 "translog": {"sync_interval": "5s", "durability": "async"},
162 },
163 "number_of_shards": 5,
164 "analysis": {
169 shared_analysis = {
165 170 "analyzer": {
166 171 "url_path": {
167 172 "type": "custom",
@@ -176,14 +181,54 b' def update_template():'
176 181 "filter": ["lowercase"],
177 182 },
178 183 }
184 }
185
186 shared_log_mapping = {
187 "_all": {"enabled": False},
188 "dynamic_templates": tag_templates,
189 "properties": {
190 "pg_id": {"type": "keyword", "index": True},
191 "delete_hash": {"type": "keyword", "index": True},
192 "resource_id": {"type": "integer"},
193 "timestamp": {"type": "date"},
194 "permanent": {"type": "boolean"},
195 "request_id": {"type": "keyword", "index": True},
196 "log_level": {"type": "text", "analyzer": "simple"},
197 "message": {"type": "text", "analyzer": "simple"},
198 "namespace": {
199 "type": "text",
200 "fields": {"keyword": {"type": "keyword", "ignore_above": 256}},
179 201 },
202 "tags": {"type": "object"},
203 "tag_list": {"type": "text", "analyzer": "tag_value",
204 "fields": {
205 "keyword": {
206 "type": "keyword",
207 "ignore_above": 256
208 }
209 }},
210 },
211 }
212
213 report_schema = {
214 "template": "rcae_r_*",
215 "settings": {
216 "index": {
217 "refresh_interval": "5s",
218 "translog": {"sync_interval": "5s", "durability": "async"},
219 "mapping": {"single_type": True}
220 },
221 "number_of_shards": 5,
222 "analysis": shared_analysis,
180 223 },
181 224 "mappings": {
182 "report_group": {
225 "report": {
183 226 "_all": {"enabled": False},
184 227 "dynamic_templates": tag_templates,
185 228 "properties": {
186 "pg_id": {"type": "keyword", "index": True},
229 "type": {"type": "keyword", "index": True},
230 # report group
231 "group_id": {"type": "keyword", "index": True},
187 232 "resource_id": {"type": "integer"},
188 233 "priority": {"type": "integer"},
189 234 "error": {"type": "text", "analyzer": "simple"},
@@ -195,20 +240,13 b' def update_template():'
195 240 "average_duration": {"type": "float"},
196 241 "summed_duration": {"type": "float"},
197 242 "public": {"type": "boolean"},
198 },
199 },
200 "report": {
201 "_all": {"enabled": False},
202 "dynamic_templates": tag_templates,
203 "properties": {
204 "pg_id": {"type": "keyword", "index": True},
205 "resource_id": {"type": "integer"},
206 "group_id": {"type": "keyword"},
243 # report
244
245 "report_id": {"type": "keyword", "index": True},
207 246 "http_status": {"type": "integer"},
208 247 "ip": {"type": "keyword", "index": True},
209 248 "url_domain": {"type": "text", "analyzer": "simple"},
210 249 "url_path": {"type": "text", "analyzer": "url_path"},
211 "error": {"type": "text", "analyzer": "simple"},
212 250 "report_type": {"type": "integer"},
213 251 "start_time": {"type": "date"},
214 252 "request_id": {"type": "keyword", "index": True},
@@ -223,45 +261,126 b' def update_template():'
223 261 }
224 262 }},
225 263 "extra": {"type": "object"},
226 },
227 "_parent": {"type": "report_group"},
228 },
229 "log": {
230 "_all": {"enabled": False},
231 "dynamic_templates": tag_templates,
232 "properties": {
233 "pg_id": {"type": "keyword", "index": True},
234 "delete_hash": {"type": "keyword", "index": True},
235 "resource_id": {"type": "integer"},
264
265 # report stats
266
267 "report_stat_id": {"type": "keyword", "index": True},
236 268 "timestamp": {"type": "date"},
237 269 "permanent": {"type": "boolean"},
238 "request_id": {"type": "keyword", "index": True},
239 270 "log_level": {"type": "text", "analyzer": "simple"},
240 271 "message": {"type": "text", "analyzer": "simple"},
241 272 "namespace": {
242 273 "type": "text",
243 274 "fields": {"keyword": {"type": "keyword", "ignore_above": 256}},
244 275 },
245 "tags": {"type": "object"},
246 "tag_list": {"type": "text", "analyzer": "tag_value",
247 "fields": {
248 "keyword": {
249 "type": "keyword",
250 "ignore_above": 256
276
277 "join_field": {
278 "type": "join",
279 "relations": {
280 "report_group": ["report", "report_stat"]
251 281 }
252 }},
282 }
283
253 284 },
285 }
286 }
287 }
288
289 Datastores.es.indices.put_template("rcae_reports", body=report_schema)
290
291 logs_mapping = copy.deepcopy(shared_log_mapping)
292 logs_mapping["properties"]["log_id"] = logs_mapping["properties"]["pg_id"]
293 del logs_mapping["properties"]["pg_id"]
294
295 log_template = {
296 "template": "rcae_l_*",
297 "settings": {
298 "index": {
299 "refresh_interval": "5s",
300 "translog": {"sync_interval": "5s", "durability": "async"},
301 "mapping": {"single_type": True}
302 },
303 "number_of_shards": 5,
304 "analysis": shared_analysis,
254 305 },
306 "mappings": {
307 "log": logs_mapping,
255 308 },
256 309 }
257 310
258 Datastores.es.indices.put_template("rcae", body=template_schema)
311 Datastores.es.indices.put_template("rcae_logs", body=log_template)
312
313 slow_call_mapping = copy.deepcopy(shared_log_mapping)
314 slow_call_mapping["properties"]["slow_call_id"] = slow_call_mapping["properties"]["pg_id"]
315 del slow_call_mapping["properties"]["pg_id"]
316
317 slow_call_template = {
318 "template": "rcae_sc_*",
319 "settings": {
320 "index": {
321 "refresh_interval": "5s",
322 "translog": {"sync_interval": "5s", "durability": "async"},
323 "mapping": {"single_type": True}
324 },
325 "number_of_shards": 5,
326 "analysis": shared_analysis,
327 },
328 "mappings": {
329 "log": slow_call_mapping,
330 },
331 }
332
333 Datastores.es.indices.put_template("rcae_slow_calls", body=slow_call_template)
334
335 metric_mapping = copy.deepcopy(shared_log_mapping)
336 metric_mapping["properties"]["metric_id"] = metric_mapping["properties"]["pg_id"]
337 del metric_mapping["properties"]["pg_id"]
338
339 metrics_template = {
340 "template": "rcae_m_*",
341 "settings": {
342 "index": {
343 "refresh_interval": "5s",
344 "translog": {"sync_interval": "5s", "durability": "async"},
345 "mapping": {"single_type": True}
346 },
347 "number_of_shards": 5,
348 "analysis": shared_analysis,
349 },
350 "mappings": {
351 "log": metric_mapping,
352 },
353 }
354
355 Datastores.es.indices.put_template("rcae_metrics", body=metrics_template)
356
357 uptime_metric_mapping = copy.deepcopy(shared_log_mapping)
358 uptime_metric_mapping["properties"]["uptime_id"] = uptime_metric_mapping["properties"]["pg_id"]
359 del uptime_metric_mapping["properties"]["pg_id"]
360
361 uptime_metrics_template = {
362 "template": "rcae_uptime_ce_*",
363 "settings": {
364 "index": {
365 "refresh_interval": "5s",
366 "translog": {"sync_interval": "5s", "durability": "async"},
367 "mapping": {"single_type": True}
368 },
369 "number_of_shards": 5,
370 "analysis": shared_analysis,
371 },
372 "mappings": {
373 "log": shared_log_mapping,
374 },
375 }
376
377 Datastores.es.indices.put_template("rcae_uptime_metrics", body=uptime_metrics_template)
259 378
260 379
261 380 def reindex_reports():
262 381 reports_groups_tables = detect_tables("reports_groups_p_")
263 382 try:
264 Datastores.es.indices.delete("rcae_r*")
383 Datastores.es.indices.delete("`rcae_r_*")
265 384 except elasticsearch.exceptions.NotFoundError as e:
266 385 log.error(e)
267 386
@@ -285,7 +404,7 b' def reindex_reports():'
285 404 name = partition_table.name
286 405 log.info("round {}, {}".format(i, name))
287 406 for k, v in es_docs.items():
288 to_update = {"_index": k, "_type": "report_group"}
407 to_update = {"_index": k, "_type": "report"}
289 408 [i.update(to_update) for i in v]
290 409 elasticsearch.helpers.bulk(Datastores.es, v)
291 410
@@ -343,7 +462,7 b' def reindex_reports():'
343 462 name = partition_table.name
344 463 log.info("round {}, {}".format(i, name))
345 464 for k, v in es_docs.items():
346 to_update = {"_index": k, "_type": "log"}
465 to_update = {"_index": k, "_type": "report"}
347 466 [i.update(to_update) for i in v]
348 467 elasticsearch.helpers.bulk(Datastores.es, v)
349 468
@@ -352,7 +471,7 b' def reindex_reports():'
352 471
353 472 def reindex_logs():
354 473 try:
355 Datastores.es.indices.delete("rcae_l*")
474 Datastores.es.indices.delete("rcae_l_*")
356 475 except elasticsearch.exceptions.NotFoundError as e:
357 476 log.error(e)
358 477
@@ -388,7 +507,7 b' def reindex_logs():'
388 507
389 508 def reindex_metrics():
390 509 try:
391 Datastores.es.indices.delete("rcae_m*")
510 Datastores.es.indices.delete("rcae_m_*")
392 511 except elasticsearch.exceptions.NotFoundError as e:
393 512 log.error(e)
394 513
@@ -422,7 +541,7 b' def reindex_metrics():'
422 541
423 542 def reindex_slow_calls():
424 543 try:
425 Datastores.es.indices.delete("rcae_sc*")
544 Datastores.es.indices.delete("rcae_sc_*")
426 545 except elasticsearch.exceptions.NotFoundError as e:
427 546 log.error(e)
428 547
General Comments 4
Under Review
author

Auto status change to "Under Review"

Under Review
author

Auto status change to "Under Review"

You need to be logged in to leave comments. Login now