##// END OF EJS Templates
utils: added get_es_info function
ergo -
Show More
@@ -1,548 +1,558 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright 2010 - 2017 RhodeCode GmbH and the AppEnlight project authors
4 4 #
5 5 # Licensed under the Apache License, Version 2.0 (the "License");
6 6 # you may not use this file except in compliance with the License.
7 7 # You may obtain a copy of the License at
8 8 #
9 9 # http://www.apache.org/licenses/LICENSE-2.0
10 10 #
11 11 # Unless required by applicable law or agreed to in writing, software
12 12 # distributed under the License is distributed on an "AS IS" BASIS,
13 13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 14 # See the License for the specific language governing permissions and
15 15 # limitations under the License.
16 16
17 17 """
18 18 Utility functions.
19 19 """
20 20 import logging
21 21 import requests
22 22 import hashlib
23 23 import json
24 24 import copy
25 25 import uuid
26 26 import appenlight.lib.helpers as h
27 27 from collections import namedtuple
28 28 from datetime import timedelta, datetime, date
29 29 from dogpile.cache.api import NO_VALUE
30 30 from appenlight.models import Datastores
31 31 from appenlight.validators import LogSearchSchema, TagListSchema, accepted_search_params
32 32 from itsdangerous import TimestampSigner
33 33 from ziggurat_foundations.permissions import ALL_PERMISSIONS
34 34 from ziggurat_foundations.models.services.user import UserService
35 35 from dateutil.relativedelta import relativedelta
36 36 from dateutil.rrule import rrule, MONTHLY, DAILY
37 37
38 38 log = logging.getLogger(__name__)
39 39
40 40
41 41 Stat = namedtuple("Stat", "start_interval value")
42 42
43 43
44 44 def default_extractor(item):
45 45 """
46 46 :param item - item to extract date from
47 47 """
48 48 if hasattr(item, "start_interval"):
49 49 return item.start_interval
50 50 return item["start_interval"]
51 51
52 52
53 53 # fast gap generator
54 54 def gap_gen_default(start, step, itemiterator, end_time=None, iv_extractor=None):
55 55 """ generates a list of time/value items based on step and itemiterator
56 56 if there are entries missing from iterator time/None will be returned
57 57 instead
58 58 :param start - datetime - what time should we start generating our values
59 59 :param step - timedelta - stepsize
60 60 :param itemiterator - iterable - we will check this iterable for values
61 61 corresponding to generated steps
62 62 :param end_time - datetime - when last step is >= end_time stop iterating
63 63 :param iv_extractor - extracts current step from iterable items
64 64 """
65 65
66 66 if not iv_extractor:
67 67 iv_extractor = default_extractor
68 68
69 69 next_step = start
70 70 minutes = step.total_seconds() / 60.0
71 71 while next_step.minute % minutes != 0:
72 72 next_step = next_step.replace(minute=next_step.minute - 1)
73 73 for item in itemiterator:
74 74 item_start_interval = iv_extractor(item)
75 75 # do we have a match for current time step in our data?
76 76 # no gen a new tuple with 0 values
77 77 while next_step < item_start_interval:
78 78 yield Stat(next_step, None)
79 79 next_step = next_step + step
80 80 if next_step == item_start_interval:
81 81 yield Stat(item_start_interval, item)
82 82 next_step = next_step + step
83 83 if end_time:
84 84 while next_step < end_time:
85 85 yield Stat(next_step, None)
86 86 next_step = next_step + step
87 87
88 88
89 89 class DateTimeEncoder(json.JSONEncoder):
90 90 """ Simple datetime to ISO encoder for json serialization"""
91 91
92 92 def default(self, obj):
93 93 if isinstance(obj, date):
94 94 return obj.isoformat()
95 95 if isinstance(obj, datetime):
96 96 return obj.isoformat()
97 97 return json.JSONEncoder.default(self, obj)
98 98
99 99
100 100 def channelstream_request(
101 101 secret, endpoint, payload, throw_exceptions=False, servers=None
102 102 ):
103 103 responses = []
104 104 if not servers:
105 105 servers = []
106 106
107 107 signer = TimestampSigner(secret)
108 108 sig_for_server = signer.sign(endpoint)
109 109 for secret, server in [(s["secret"], s["server"]) for s in servers]:
110 110 response = {}
111 111 secret_headers = {
112 112 "x-channelstream-secret": sig_for_server,
113 113 "x-channelstream-endpoint": endpoint,
114 114 "Content-Type": "application/json",
115 115 }
116 116 url = "%s%s" % (server, endpoint)
117 117 try:
118 118 response = requests.post(
119 119 url,
120 120 data=json.dumps(payload, cls=DateTimeEncoder),
121 121 headers=secret_headers,
122 122 verify=False,
123 123 timeout=2,
124 124 ).json()
125 125 except requests.exceptions.RequestException as e:
126 126 if throw_exceptions:
127 127 raise
128 128 responses.append(response)
129 129 return responses
130 130
131 131
132 132 def add_cors_headers(response):
133 133 # allow CORS
134 134 response.headers.add("Access-Control-Allow-Origin", "*")
135 135 response.headers.add("XDomainRequestAllowed", "1")
136 136 response.headers.add("Access-Control-Allow-Methods", "GET, POST, OPTIONS")
137 137 # response.headers.add('Access-Control-Allow-Credentials', 'true')
138 138 response.headers.add(
139 139 "Access-Control-Allow-Headers",
140 140 "Content-Type, Depth, User-Agent, X-File-Size, X-Requested-With, If-Modified-Since, X-File-Name, Cache-Control, Pragma, Origin, Connection, Referer, Cookie",
141 141 )
142 142 response.headers.add("Access-Control-Max-Age", "86400")
143 143
144 144
145 145 from sqlalchemy.sql import compiler
146 146 from psycopg2.extensions import adapt as sqlescape
147 147
148 148
149 149 # or use the appropiate escape function from your db driver
150 150
151 151
152 152 def compile_query(query):
153 153 dialect = query.session.bind.dialect
154 154 statement = query.statement
155 155 comp = compiler.SQLCompiler(dialect, statement)
156 156 comp.compile()
157 157 enc = dialect.encoding
158 158 params = {}
159 159 for k, v in comp.params.items():
160 160 if isinstance(v, str):
161 161 v = v.encode(enc)
162 162 params[k] = sqlescape(v)
163 163 return (comp.string.encode(enc) % params).decode(enc)
164 164
165 165
166 166 def convert_es_type(input_data):
167 167 """
168 168 This might need to convert some text or other types to corresponding ES types
169 169 """
170 170 return str(input_data)
171 171
172 172
173 173 ProtoVersion = namedtuple("ProtoVersion", ["major", "minor", "patch"])
174 174
175 175
176 176 def parse_proto(input_data):
177 177 try:
178 178 parts = [int(x) for x in input_data.split(".")]
179 179 while len(parts) < 3:
180 180 parts.append(0)
181 181 return ProtoVersion(*parts)
182 182 except Exception as e:
183 183 log.info("Unknown protocol version: %s" % e)
184 184 return ProtoVersion(99, 99, 99)
185 185
186 186
187 187 def es_index_name_limiter(
188 188 start_date=None, end_date=None, months_in_past=6, ixtypes=None
189 189 ):
190 190 """
191 191 This function limits the search to 6 months by default so we don't have to
192 192 query 300 elasticsearch indices for 20 years of historical data for example
193 193 """
194 194
195 195 # should be cached later
196 196 def get_possible_names():
197 197 return list(Datastores.es.indices.get_alias("*"))
198 198
199 199 possible_names = get_possible_names()
200 200 es_index_types = []
201 201 if not ixtypes:
202 202 ixtypes = ["reports", "metrics", "logs"]
203 203 for t in ixtypes:
204 204 if t == "reports":
205 205 es_index_types.append("rcae_r_%s")
206 206 elif t == "logs":
207 207 es_index_types.append("rcae_l_%s")
208 208 elif t == "metrics":
209 209 es_index_types.append("rcae_m_%s")
210 210 elif t == "uptime":
211 211 es_index_types.append("rcae_u_%s")
212 212 elif t == "slow_calls":
213 213 es_index_types.append("rcae_sc_%s")
214 214
215 215 if start_date:
216 216 start_date = copy.copy(start_date)
217 217 else:
218 218 if not end_date:
219 219 end_date = datetime.utcnow()
220 220 start_date = end_date + relativedelta(months=months_in_past * -1)
221 221
222 222 if not end_date:
223 223 end_date = start_date + relativedelta(months=months_in_past)
224 224
225 225 index_dates = list(
226 226 rrule(
227 227 MONTHLY,
228 228 dtstart=start_date.date().replace(day=1),
229 229 until=end_date.date(),
230 230 count=36,
231 231 )
232 232 )
233 233 index_names = []
234 234 for ix_type in es_index_types:
235 235 to_extend = [
236 236 ix_type % d.strftime("%Y_%m")
237 237 for d in index_dates
238 238 if ix_type % d.strftime("%Y_%m") in possible_names
239 239 ]
240 240 index_names.extend(to_extend)
241 241 for day in list(
242 242 rrule(DAILY, dtstart=start_date.date(), until=end_date.date(), count=366)
243 243 ):
244 244 ix_name = ix_type % day.strftime("%Y_%m_%d")
245 245 if ix_name in possible_names:
246 246 index_names.append(ix_name)
247 247 return index_names
248 248
249 249
250 250 def build_filter_settings_from_query_dict(
251 251 request, params=None, override_app_ids=None, resource_permissions=None
252 252 ):
253 253 """
254 254 Builds list of normalized search terms for ES from query params
255 255 ensuring application list is restricted to only applications user
256 256 has access to
257 257
258 258 :param params (dictionary)
259 259 :param override_app_ids - list of application id's to use instead of
260 260 applications user normally has access to
261 261 """
262 262 params = copy.deepcopy(params)
263 263 applications = []
264 264 if not resource_permissions:
265 265 resource_permissions = ["view"]
266 266
267 267 if request.user:
268 268 applications = UserService.resources_with_perms(
269 269 request.user, resource_permissions, resource_types=["application"]
270 270 )
271 271
272 272 # CRITICAL - this ensures our resultset is limited to only the ones
273 273 # user has view permissions
274 274 all_possible_app_ids = set([app.resource_id for app in applications])
275 275
276 276 # if override is preset we force permission for app to be present
277 277 # this allows users to see dashboards and applications they would
278 278 # normally not be able to
279 279
280 280 if override_app_ids:
281 281 all_possible_app_ids = set(override_app_ids)
282 282
283 283 schema = LogSearchSchema().bind(resources=all_possible_app_ids)
284 284 tag_schema = TagListSchema()
285 285 filter_settings = schema.deserialize(params)
286 286 tag_list = []
287 287 for k, v in list(filter_settings.items()):
288 288 if k in accepted_search_params:
289 289 continue
290 290 tag_list.append({"name": k, "value": v, "op": "eq"})
291 291 # remove the key from filter_settings
292 292 filter_settings.pop(k, None)
293 293 tags = tag_schema.deserialize(tag_list)
294 294 filter_settings["tags"] = tags
295 295 return filter_settings
296 296
297 297
298 298 def gen_uuid():
299 299 return str(uuid.uuid4())
300 300
301 301
302 302 def gen_uuid4_sha_hex():
303 303 return hashlib.sha1(uuid.uuid4().bytes).hexdigest()
304 304
305 305
306 306 def permission_tuple_to_dict(data):
307 307 out = {
308 308 "user_name": None,
309 309 "perm_name": data.perm_name,
310 310 "owner": data.owner,
311 311 "type": data.type,
312 312 "resource_name": None,
313 313 "resource_type": None,
314 314 "resource_id": None,
315 315 "group_name": None,
316 316 "group_id": None,
317 317 }
318 318 if data.user:
319 319 out["user_name"] = data.user.user_name
320 320 if data.perm_name == ALL_PERMISSIONS:
321 321 out["perm_name"] = "__all_permissions__"
322 322 if data.resource:
323 323 out["resource_name"] = data.resource.resource_name
324 324 out["resource_type"] = data.resource.resource_type
325 325 out["resource_id"] = data.resource.resource_id
326 326 if data.group:
327 327 out["group_name"] = data.group.group_name
328 328 out["group_id"] = data.group.id
329 329 return out
330 330
331 331
332 332 def get_cached_buckets(
333 333 request,
334 334 stats_since,
335 335 end_time,
336 336 fn,
337 337 cache_key,
338 338 gap_gen=None,
339 339 db_session=None,
340 340 step_interval=None,
341 341 iv_extractor=None,
342 342 rerange=False,
343 343 *args,
344 344 **kwargs
345 345 ):
346 346 """ Takes "fn" that should return some data and tries to load the data
347 347 dividing it into daily buckets - if the stats_since and end time give a
348 348 delta bigger than 24hours, then only "todays" data is computed on the fly
349 349
350 350 :param request: (request) request object
351 351 :param stats_since: (datetime) start date of buckets range
352 352 :param end_time: (datetime) end date of buckets range - utcnow() if None
353 353 :param fn: (callable) callable to use to populate buckets should have
354 354 following signature:
355 355 def get_data(request, since_when, until, *args, **kwargs):
356 356
357 357 :param cache_key: (string) cache key that will be used to build bucket
358 358 caches
359 359 :param gap_gen: (callable) gap generator - should return step intervals
360 360 to use with out `fn` callable
361 361 :param db_session: (Session) sqlalchemy session
362 362 :param step_interval: (timedelta) optional step interval if we want to
363 363 override the default determined from total start/end time delta
364 364 :param iv_extractor: (callable) used to get step intervals from data
365 365 returned by `fn` callable
366 366 :param rerange: (bool) handy if we want to change ranges from hours to
367 367 days when cached data is missing - will shorten execution time if `fn`
368 368 callable supports that and we are working with multiple rows - like metrics
369 369 :param args:
370 370 :param kwargs:
371 371
372 372 :return: iterable
373 373 """
374 374 if not end_time:
375 375 end_time = datetime.utcnow().replace(second=0, microsecond=0)
376 376 delta = end_time - stats_since
377 377 # if smaller than 3 days we want to group by 5min else by 1h,
378 378 # for 60 min group by min
379 379 if not gap_gen:
380 380 gap_gen = gap_gen_default
381 381 if not iv_extractor:
382 382 iv_extractor = default_extractor
383 383
384 384 # do not use custom interval if total time range with new iv would exceed
385 385 # end time
386 386 if not step_interval or stats_since + step_interval >= end_time:
387 387 if delta < h.time_deltas.get("12h")["delta"]:
388 388 step_interval = timedelta(seconds=60)
389 389 elif delta < h.time_deltas.get("3d")["delta"]:
390 390 step_interval = timedelta(seconds=60 * 5)
391 391 elif delta > h.time_deltas.get("2w")["delta"]:
392 392 step_interval = timedelta(days=1)
393 393 else:
394 394 step_interval = timedelta(minutes=60)
395 395
396 396 if step_interval >= timedelta(minutes=60):
397 397 log.info(
398 398 "cached_buckets:{}: adjusting start time "
399 399 "for hourly or daily intervals".format(cache_key)
400 400 )
401 401 stats_since = stats_since.replace(hour=0, minute=0)
402 402
403 403 ranges = [
404 404 i.start_interval
405 405 for i in list(gap_gen(stats_since, step_interval, [], end_time=end_time))
406 406 ]
407 407 buckets = {}
408 408 storage_key = "buckets:" + cache_key + "{}|{}"
409 409 # this means we basicly cache per hour in 3-14 day intervals but i think
410 410 # its fine at this point - will be faster than db access anyways
411 411
412 412 if len(ranges) >= 1:
413 413 last_ranges = [ranges[-1]]
414 414 else:
415 415 last_ranges = []
416 416 if step_interval >= timedelta(minutes=60):
417 417 for r in ranges:
418 418 k = storage_key.format(step_interval.total_seconds(), r)
419 419 value = request.registry.cache_regions.redis_day_30.get(k)
420 420 # last buckets are never loaded from cache
421 421 is_last_result = r >= end_time - timedelta(hours=6) or r in last_ranges
422 422 if value is not NO_VALUE and not is_last_result:
423 423 log.info(
424 424 "cached_buckets:{}: "
425 425 "loading range {} from cache".format(cache_key, r)
426 426 )
427 427 buckets[r] = value
428 428 else:
429 429 log.info(
430 430 "cached_buckets:{}: "
431 431 "loading range {} from storage".format(cache_key, r)
432 432 )
433 433 range_size = step_interval
434 434 if (
435 435 step_interval == timedelta(minutes=60)
436 436 and not is_last_result
437 437 and rerange
438 438 ):
439 439 range_size = timedelta(days=1)
440 440 r = r.replace(hour=0, minute=0)
441 441 log.info(
442 442 "cached_buckets:{}: "
443 443 "loading collapsed "
444 444 "range {} {}".format(cache_key, r, r + range_size)
445 445 )
446 446 bucket_data = fn(
447 447 request,
448 448 r,
449 449 r + range_size,
450 450 step_interval,
451 451 gap_gen,
452 452 bucket_count=len(ranges),
453 453 *args,
454 454 **kwargs
455 455 )
456 456 for b in bucket_data:
457 457 b_iv = iv_extractor(b)
458 458 buckets[b_iv] = b
459 459 k2 = storage_key.format(step_interval.total_seconds(), b_iv)
460 460 request.registry.cache_regions.redis_day_30.set(k2, b)
461 461 log.info("cached_buckets:{}: saving cache".format(cache_key))
462 462 else:
463 463 # bucket count is 1 for short time ranges <= 24h from now
464 464 bucket_data = fn(
465 465 request,
466 466 stats_since,
467 467 end_time,
468 468 step_interval,
469 469 gap_gen,
470 470 bucket_count=1,
471 471 *args,
472 472 **kwargs
473 473 )
474 474 for b in bucket_data:
475 475 buckets[iv_extractor(b)] = b
476 476 return buckets
477 477
478 478
479 479 def get_cached_split_data(
480 480 request, stats_since, end_time, fn, cache_key, db_session=None, *args, **kwargs
481 481 ):
482 482 """ Takes "fn" that should return some data and tries to load the data
483 483 dividing it into 2 buckets - cached "since_from" bucket and "today"
484 484 bucket - then the data can be reduced into single value
485 485
486 486 Data is cached if the stats_since and end time give a delta bigger
487 487 than 24hours - then only 24h is computed on the fly
488 488 """
489 489 if not end_time:
490 490 end_time = datetime.utcnow().replace(second=0, microsecond=0)
491 491 delta = end_time - stats_since
492 492
493 493 if delta >= timedelta(minutes=60):
494 494 log.info(
495 495 "cached_split_data:{}: adjusting start time "
496 496 "for hourly or daily intervals".format(cache_key)
497 497 )
498 498 stats_since = stats_since.replace(hour=0, minute=0)
499 499
500 500 storage_key = "buckets_split_data:" + cache_key + ":{}|{}"
501 501 old_end_time = end_time.replace(hour=0, minute=0)
502 502
503 503 final_storage_key = storage_key.format(delta.total_seconds(), old_end_time)
504 504 older_data = None
505 505
506 506 cdata = request.registry.cache_regions.redis_day_7.get(final_storage_key)
507 507
508 508 if cdata:
509 509 log.info("cached_split_data:{}: found old " "bucket data".format(cache_key))
510 510 older_data = cdata
511 511
512 512 if stats_since < end_time - h.time_deltas.get("24h")["delta"] and not cdata:
513 513 log.info(
514 514 "cached_split_data:{}: didn't find the "
515 515 "start bucket in cache so load older data".format(cache_key)
516 516 )
517 517 recent_stats_since = old_end_time
518 518 older_data = fn(
519 519 request,
520 520 stats_since,
521 521 recent_stats_since,
522 522 db_session=db_session,
523 523 *args,
524 524 **kwargs
525 525 )
526 526 request.registry.cache_regions.redis_day_7.set(final_storage_key, older_data)
527 527 elif stats_since < end_time - h.time_deltas.get("24h")["delta"]:
528 528 recent_stats_since = old_end_time
529 529 else:
530 530 recent_stats_since = stats_since
531 531
532 532 log.info(
533 533 "cached_split_data:{}: loading fresh "
534 534 "data bucksts from last 24h ".format(cache_key)
535 535 )
536 536 todays_data = fn(
537 537 request, recent_stats_since, end_time, db_session=db_session, *args, **kwargs
538 538 )
539 539 return older_data, todays_data
540 540
541 541
542 542 def in_batches(seq, size):
543 543 """
544 544 Splits am iterable into batches of specified size
545 545 :param seq (iterable)
546 546 :param size integer
547 547 """
548 548 return (seq[pos : pos + size] for pos in range(0, len(seq), size))
549
550
551 def get_es_info(cache_regions, es_conn):
552 @cache_regions.memory_min_10.cache_on_arguments()
553 def get_es_info_cached():
554 returned_info = {"raw_info": es_conn.info()}
555 returned_info["version"] = returned_info["raw_info"]["version"]["number"].split('.')
556 return returned_info
557
558 return get_es_info_cached()
General Comments 0
You need to be logged in to leave comments. Login now