utils: added get_es_info function
ergo -
@@ -1,548 +1,558 @@
# -*- coding: utf-8 -*-

# Copyright 2010 - 2017 RhodeCode GmbH and the AppEnlight project authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
Utility functions.
"""
import logging
import requests
import hashlib
import json
import copy
import uuid
import appenlight.lib.helpers as h
from collections import namedtuple
from datetime import timedelta, datetime, date
from dogpile.cache.api import NO_VALUE
from appenlight.models import Datastores
from appenlight.validators import LogSearchSchema, TagListSchema, accepted_search_params
from itsdangerous import TimestampSigner
from ziggurat_foundations.permissions import ALL_PERMISSIONS
from ziggurat_foundations.models.services.user import UserService
from dateutil.relativedelta import relativedelta
from dateutil.rrule import rrule, MONTHLY, DAILY

log = logging.getLogger(__name__)


Stat = namedtuple("Stat", "start_interval value")

def default_extractor(item):
    """
    :param item - item to extract date from
    """
    if hasattr(item, "start_interval"):
        return item.start_interval
    return item["start_interval"]


# fast gap generator
def gap_gen_default(start, step, itemiterator, end_time=None, iv_extractor=None):
    """ Generates a list of time/value items based on step and itemiterator.
    If entries are missing from the iterator, time/None pairs are returned
    instead.
    :param start - datetime - time at which to start generating values
    :param step - timedelta - step size
    :param itemiterator - iterable - we will check this iterable for values
    corresponding to generated steps
    :param end_time - datetime - when last step is >= end_time stop iterating
    :param iv_extractor - extracts current step from iterable items
    """

    if not iv_extractor:
        iv_extractor = default_extractor

    next_step = start
    minutes = step.total_seconds() / 60.0
    while next_step.minute % minutes != 0:
        next_step = next_step.replace(minute=next_step.minute - 1)
    for item in itemiterator:
        item_start_interval = iv_extractor(item)
        # do we have a match for the current time step in our data?
        # if not, generate a new tuple with a None value
        while next_step < item_start_interval:
            yield Stat(next_step, None)
            next_step = next_step + step
        if next_step == item_start_interval:
            yield Stat(item_start_interval, item)
            next_step = next_step + step
    if end_time:
        while next_step < end_time:
            yield Stat(next_step, None)
            next_step = next_step + step

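# Illustrative sketch (not part of the original module): gap_gen_default fills
# missing steps with Stat(time, None). The rows below are hypothetical dicts
# that only carry the "start_interval" key the default extractor looks for.
#
#   start = datetime(2017, 1, 1, 10, 0)
#   rows = [{"start_interval": datetime(2017, 1, 1, 10, 0)},
#           {"start_interval": datetime(2017, 1, 1, 10, 20)}]
#   series = list(gap_gen_default(start, timedelta(minutes=10), rows,
#                                 end_time=datetime(2017, 1, 1, 10, 40)))
#   # -> Stat(10:00, row), Stat(10:10, None), Stat(10:20, row), Stat(10:30, None)
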
class DateTimeEncoder(json.JSONEncoder):
    """ Simple datetime to ISO encoder for json serialization"""

    def default(self, obj):
        if isinstance(obj, date):
            return obj.isoformat()
        if isinstance(obj, datetime):
            return obj.isoformat()
        return json.JSONEncoder.default(self, obj)

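# Illustrative usage (not part of the original module): DateTimeEncoder lets
# json.dumps serialize date/datetime values as ISO 8601 strings, e.g.
#
#   json.dumps({"ts": datetime(2017, 1, 1, 12, 30)}, cls=DateTimeEncoder)
#   # -> '{"ts": "2017-01-01T12:30:00"}'
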

def channelstream_request(
    secret, endpoint, payload, throw_exceptions=False, servers=None
):
    responses = []
    if not servers:
        servers = []

    signer = TimestampSigner(secret)
    sig_for_server = signer.sign(endpoint)
    for secret, server in [(s["secret"], s["server"]) for s in servers]:
        response = {}
        secret_headers = {
            "x-channelstream-secret": sig_for_server,
            "x-channelstream-endpoint": endpoint,
            "Content-Type": "application/json",
        }
        url = "%s%s" % (server, endpoint)
        try:
            response = requests.post(
                url,
                data=json.dumps(payload, cls=DateTimeEncoder),
                headers=secret_headers,
                verify=False,
                timeout=2,
            ).json()
        except requests.exceptions.RequestException as e:
            if throw_exceptions:
                raise
        responses.append(response)
    return responses


def add_cors_headers(response):
    # allow CORS
    response.headers.add("Access-Control-Allow-Origin", "*")
    response.headers.add("XDomainRequestAllowed", "1")
    response.headers.add("Access-Control-Allow-Methods", "GET, POST, OPTIONS")
    # response.headers.add('Access-Control-Allow-Credentials', 'true')
    response.headers.add(
        "Access-Control-Allow-Headers",
        "Content-Type, Depth, User-Agent, X-File-Size, X-Requested-With, If-Modified-Since, X-File-Name, Cache-Control, Pragma, Origin, Connection, Referer, Cookie",
    )
    response.headers.add("Access-Control-Max-Age", "86400")


from sqlalchemy.sql import compiler
from psycopg2.extensions import adapt as sqlescape


# or use the appropriate escape function from your db driver


def compile_query(query):
    dialect = query.session.bind.dialect
    statement = query.statement
    comp = compiler.SQLCompiler(dialect, statement)
    comp.compile()
    enc = dialect.encoding
    params = {}
    for k, v in comp.params.items():
        if isinstance(v, str):
            v = v.encode(enc)
        params[k] = sqlescape(v)
    return (comp.string.encode(enc) % params).decode(enc)


def convert_es_type(input_data):
    """
    This might need to convert some text or other types to corresponding ES types
    """
    return str(input_data)


ProtoVersion = namedtuple("ProtoVersion", ["major", "minor", "patch"])


def parse_proto(input_data):
    try:
        parts = [int(x) for x in input_data.split(".")]
        while len(parts) < 3:
            parts.append(0)
        return ProtoVersion(*parts)
    except Exception as e:
        log.info("Unknown protocol version: %s" % e)
        return ProtoVersion(99, 99, 99)


def es_index_name_limiter(
    start_date=None, end_date=None, months_in_past=6, ixtypes=None
):
    """
    This function limits the search to 6 months by default so we don't have to
    query, for example, 300 Elasticsearch indices for 20 years of historical data
    """

    # should be cached later
    def get_possible_names():
        return list(Datastores.es.indices.get_alias("*"))

    possible_names = get_possible_names()
    es_index_types = []
    if not ixtypes:
        ixtypes = ["reports", "metrics", "logs"]
    for t in ixtypes:
        if t == "reports":
            es_index_types.append("rcae_r_%s")
        elif t == "logs":
            es_index_types.append("rcae_l_%s")
        elif t == "metrics":
            es_index_types.append("rcae_m_%s")
        elif t == "uptime":
            es_index_types.append("rcae_u_%s")
        elif t == "slow_calls":
            es_index_types.append("rcae_sc_%s")

    if start_date:
        start_date = copy.copy(start_date)
    else:
        if not end_date:
            end_date = datetime.utcnow()
        start_date = end_date + relativedelta(months=months_in_past * -1)

    if not end_date:
        end_date = start_date + relativedelta(months=months_in_past)

    index_dates = list(
        rrule(
            MONTHLY,
            dtstart=start_date.date().replace(day=1),
            until=end_date.date(),
            count=36,
        )
    )
    index_names = []
    for ix_type in es_index_types:
        to_extend = [
            ix_type % d.strftime("%Y_%m")
            for d in index_dates
            if ix_type % d.strftime("%Y_%m") in possible_names
        ]
        index_names.extend(to_extend)
        for day in list(
            rrule(DAILY, dtstart=start_date.date(), until=end_date.date(), count=366)
        ):
            ix_name = ix_type % day.strftime("%Y_%m_%d")
            if ix_name in possible_names:
                index_names.append(ix_name)
    return index_names

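# Illustrative sketch (not part of the original module): the limiter expands
# the per-type patterns into monthly and daily index names and keeps only the
# ones that actually exist in the cluster, e.g. for reports in March 2017 it
# would consider names such as "rcae_r_2017_03" and "rcae_r_2017_03_15".
#
#   index_names = es_index_name_limiter(
#       start_date=datetime(2017, 3, 1),
#       end_date=datetime(2017, 3, 31),
#       ixtypes=["reports"],
#   )
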

def build_filter_settings_from_query_dict(
    request, params=None, override_app_ids=None, resource_permissions=None
):
    """
    Builds a list of normalized search terms for ES from query params,
    ensuring the application list is restricted to only applications the user
    has access to

    :param params (dictionary)
    :param override_app_ids - list of application ids to use instead of
    applications user normally has access to
    """
    params = copy.deepcopy(params)
    applications = []
    if not resource_permissions:
        resource_permissions = ["view"]

    if request.user:
        applications = UserService.resources_with_perms(
            request.user, resource_permissions, resource_types=["application"]
        )

    # CRITICAL - this ensures our resultset is limited to only the ones
    # user has view permissions
    all_possible_app_ids = set([app.resource_id for app in applications])

    # if override is present we force permission for app to be present
    # this allows users to see dashboards and applications they would
    # normally not be able to

    if override_app_ids:
        all_possible_app_ids = set(override_app_ids)

    schema = LogSearchSchema().bind(resources=all_possible_app_ids)
    tag_schema = TagListSchema()
    filter_settings = schema.deserialize(params)
    tag_list = []
    for k, v in list(filter_settings.items()):
        if k in accepted_search_params:
            continue
        tag_list.append({"name": k, "value": v, "op": "eq"})
        # remove the key from filter_settings
        filter_settings.pop(k, None)
    tags = tag_schema.deserialize(tag_list)
    filter_settings["tags"] = tags
    return filter_settings


def gen_uuid():
    return str(uuid.uuid4())


def gen_uuid4_sha_hex():
    return hashlib.sha1(uuid.uuid4().bytes).hexdigest()


def permission_tuple_to_dict(data):
    out = {
        "user_name": None,
        "perm_name": data.perm_name,
        "owner": data.owner,
        "type": data.type,
        "resource_name": None,
        "resource_type": None,
        "resource_id": None,
        "group_name": None,
        "group_id": None,
    }
    if data.user:
        out["user_name"] = data.user.user_name
    if data.perm_name == ALL_PERMISSIONS:
        out["perm_name"] = "__all_permissions__"
    if data.resource:
        out["resource_name"] = data.resource.resource_name
        out["resource_type"] = data.resource.resource_type
        out["resource_id"] = data.resource.resource_id
    if data.group:
        out["group_name"] = data.group.group_name
        out["group_id"] = data.group.id
    return out


def get_cached_buckets(
    request,
    stats_since,
    end_time,
    fn,
    cache_key,
    gap_gen=None,
    db_session=None,
    step_interval=None,
    iv_extractor=None,
    rerange=False,
    *args,
    **kwargs
):
    """ Takes "fn" that should return some data and tries to load the data
    dividing it into daily buckets - if stats_since and end_time give a
    delta bigger than 24 hours, then only "today's" data is computed on the fly

    :param request: (request) request object
    :param stats_since: (datetime) start date of buckets range
    :param end_time: (datetime) end date of buckets range - utcnow() if None
    :param fn: (callable) callable to use to populate buckets, should have the
    following signature:
        def get_data(request, since_when, until, *args, **kwargs):

    :param cache_key: (string) cache key that will be used to build bucket
    caches
    :param gap_gen: (callable) gap generator - should return step intervals
    to use with our `fn` callable
    :param db_session: (Session) sqlalchemy session
    :param step_interval: (timedelta) optional step interval if we want to
    override the default determined from total start/end time delta
    :param iv_extractor: (callable) used to get step intervals from data
    returned by `fn` callable
    :param rerange: (bool) handy if we want to change ranges from hours to
    days when cached data is missing - will shorten execution time if `fn`
    callable supports that and we are working with multiple rows - like metrics
    :param args:
    :param kwargs:

    :return: iterable
    """
    if not end_time:
        end_time = datetime.utcnow().replace(second=0, microsecond=0)
    delta = end_time - stats_since
    # if smaller than 3 days we want to group by 5min else by 1h,
    # for 60 min group by min
    if not gap_gen:
        gap_gen = gap_gen_default
    if not iv_extractor:
        iv_extractor = default_extractor

    # do not use custom interval if total time range with new iv would exceed
    # end time
    if not step_interval or stats_since + step_interval >= end_time:
        if delta < h.time_deltas.get("12h")["delta"]:
            step_interval = timedelta(seconds=60)
        elif delta < h.time_deltas.get("3d")["delta"]:
            step_interval = timedelta(seconds=60 * 5)
        elif delta > h.time_deltas.get("2w")["delta"]:
            step_interval = timedelta(days=1)
        else:
            step_interval = timedelta(minutes=60)

    if step_interval >= timedelta(minutes=60):
        log.info(
            "cached_buckets:{}: adjusting start time "
            "for hourly or daily intervals".format(cache_key)
        )
        stats_since = stats_since.replace(hour=0, minute=0)

    ranges = [
        i.start_interval
        for i in list(gap_gen(stats_since, step_interval, [], end_time=end_time))
    ]
    buckets = {}
    storage_key = "buckets:" + cache_key + "{}|{}"
    # this means we basically cache per hour in 3-14 day intervals but I think
    # it's fine at this point - will be faster than db access anyway

    if len(ranges) >= 1:
        last_ranges = [ranges[-1]]
    else:
        last_ranges = []
    if step_interval >= timedelta(minutes=60):
        for r in ranges:
            k = storage_key.format(step_interval.total_seconds(), r)
            value = request.registry.cache_regions.redis_day_30.get(k)
            # last buckets are never loaded from cache
            is_last_result = r >= end_time - timedelta(hours=6) or r in last_ranges
            if value is not NO_VALUE and not is_last_result:
                log.info(
                    "cached_buckets:{}: "
                    "loading range {} from cache".format(cache_key, r)
                )
                buckets[r] = value
            else:
                log.info(
                    "cached_buckets:{}: "
                    "loading range {} from storage".format(cache_key, r)
                )
                range_size = step_interval
                if (
                    step_interval == timedelta(minutes=60)
                    and not is_last_result
                    and rerange
                ):
                    range_size = timedelta(days=1)
                    r = r.replace(hour=0, minute=0)
                    log.info(
                        "cached_buckets:{}: "
                        "loading collapsed "
                        "range {} {}".format(cache_key, r, r + range_size)
                    )
                bucket_data = fn(
                    request,
                    r,
                    r + range_size,
                    step_interval,
                    gap_gen,
                    bucket_count=len(ranges),
                    *args,
                    **kwargs
                )
                for b in bucket_data:
                    b_iv = iv_extractor(b)
                    buckets[b_iv] = b
                    k2 = storage_key.format(step_interval.total_seconds(), b_iv)
                    request.registry.cache_regions.redis_day_30.set(k2, b)
                log.info("cached_buckets:{}: saving cache".format(cache_key))
    else:
        # bucket count is 1 for short time ranges <= 24h from now
        bucket_data = fn(
            request,
            stats_since,
            end_time,
            step_interval,
            gap_gen,
            bucket_count=1,
            *args,
            **kwargs
        )
        for b in bucket_data:
            buckets[iv_extractor(b)] = b
    return buckets

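# Illustrative sketch (not part of the original module): a hypothetical "fn"
# callable for get_cached_buckets only needs to accept the extra positional
# arguments the helper passes through, e.g.
#
#   def get_data(request, since_when, until, step_interval, gap_gen,
#                bucket_count=None, **kwargs):
#       # query the datastore for rows between since_when and until; each row
#       # should expose a "start_interval" datetime for the default extractor
#       return []
#
#   buckets = get_cached_buckets(request, stats_since, end_time,
#                                get_data, cache_key="reports_per_app")
#   # "reports_per_app" is a hypothetical cache key, not one used by AppEnlight
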

def get_cached_split_data(
    request, stats_since, end_time, fn, cache_key, db_session=None, *args, **kwargs
):
    """ Takes "fn" that should return some data and tries to load the data
    dividing it into 2 buckets - cached "since_from" bucket and "today"
    bucket - then the data can be reduced into a single value

    Data is cached if stats_since and end_time give a delta bigger
    than 24 hours - then only 24h is computed on the fly
    """
    if not end_time:
        end_time = datetime.utcnow().replace(second=0, microsecond=0)
    delta = end_time - stats_since

    if delta >= timedelta(minutes=60):
        log.info(
            "cached_split_data:{}: adjusting start time "
            "for hourly or daily intervals".format(cache_key)
        )
        stats_since = stats_since.replace(hour=0, minute=0)

    storage_key = "buckets_split_data:" + cache_key + ":{}|{}"
    old_end_time = end_time.replace(hour=0, minute=0)

    final_storage_key = storage_key.format(delta.total_seconds(), old_end_time)
    older_data = None

    cdata = request.registry.cache_regions.redis_day_7.get(final_storage_key)

    if cdata:
        log.info("cached_split_data:{}: found old bucket data".format(cache_key))
        older_data = cdata

    if stats_since < end_time - h.time_deltas.get("24h")["delta"] and not cdata:
        log.info(
            "cached_split_data:{}: didn't find the "
            "start bucket in cache so load older data".format(cache_key)
        )
        recent_stats_since = old_end_time
        older_data = fn(
            request,
            stats_since,
            recent_stats_since,
            db_session=db_session,
            *args,
            **kwargs
        )
        request.registry.cache_regions.redis_day_7.set(final_storage_key, older_data)
    elif stats_since < end_time - h.time_deltas.get("24h")["delta"]:
        recent_stats_since = old_end_time
    else:
        recent_stats_since = stats_since

    log.info(
        "cached_split_data:{}: loading fresh "
        "data buckets from last 24h".format(cache_key)
    )
    todays_data = fn(
        request, recent_stats_since, end_time, db_session=db_session, *args, **kwargs
    )
    return older_data, todays_data


def in_batches(seq, size):
    """
    Splits an iterable into batches of a specified size
    :param seq (iterable)
    :param size integer
    """
    return (seq[pos : pos + size] for pos in range(0, len(seq), size))


def get_es_info(cache_regions, es_conn):
    @cache_regions.memory_min_10.cache_on_arguments()
    def get_es_info_cached():
        returned_info = {"raw_info": es_conn.info()}
        returned_info["version"] = returned_info["raw_info"]["version"]["number"].split('.')
        return returned_info

    return get_es_info_cached()
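
# Illustrative usage of the new helper (not part of the original module):
# callers pass the configured cache regions and an Elasticsearch client; the
# info() response is presumably memoized by the memory_min_10 region, and
# "version" holds the version number split into its components, e.g.
# "5.6.2" -> ['5', '6', '2']. The exact call site is an assumption here.
#
#   es_info = get_es_info(request.registry.cache_regions, Datastores.es)
#   major_version = int(es_info["version"][0])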