api: params should be plain dictionary
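The change below converts Pyramid's request.params (a WebOb MultiDict) into a plain dict before it is handed to the Celery tasks. As a rough illustration of why that matters (this sketch is not part of the commit and assumes WebOb's MultiDict, which backs request.params):

from webob.multidict import MultiDict

# request.params behaves like a MultiDict: it may hold repeated keys and is
# not a plain dict, so it does not serialize cleanly when queued for a task.
params = MultiDict([('protocol_version', '0.5'), ('tag', 'a'), ('tag', 'b')])

plain = dict(params.copy())   # plain dictionary; repeated keys collapse to a single value
print(type(plain))            # <class 'dict'>
print(plain)                  # safe to JSON-encode or pickle for the task queue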
@@ -1,438 +1,438 b''
1 1 # -*- coding: utf-8 -*-
2 2
3 3 # Copyright (C) 2010-2016 RhodeCode GmbH
4 4 #
5 5 # This program is free software: you can redistribute it and/or modify
6 6 # it under the terms of the GNU Affero General Public License, version 3
7 7 # (only), as published by the Free Software Foundation.
8 8 #
9 9 # This program is distributed in the hope that it will be useful,
10 10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 12 # GNU General Public License for more details.
13 13 #
14 14 # You should have received a copy of the GNU Affero General Public License
15 15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 16 #
17 17 # This program is dual-licensed. If you wish to learn more about the
18 18 # AppEnlight Enterprise Edition, including its added features, Support
19 19 # services, and proprietary license terms, please see
20 20 # https://rhodecode.com/licenses/
21 21
22 22 import base64
23 23 import io
24 24 import datetime
25 25 import json
26 26 import logging
27 27 import urllib.request, urllib.parse, urllib.error
28 28 import zlib
29 29
30 30 from gzip import GzipFile
31 31 from pyramid.view import view_config
32 32 from pyramid.httpexceptions import HTTPBadRequest
33 33
34 34 import appenlight.celery.tasks as tasks
35 35 from appenlight.lib.api import rate_limiting, check_cors
36 36 from appenlight.lib.enums import ParsedSentryEventType
37 37 from appenlight.lib.utils import parse_proto
38 38 from appenlight.lib.utils.airbrake import parse_airbrake_xml
39 39 from appenlight.lib.utils.date_utils import convert_date
40 40 from appenlight.lib.utils.sentry import parse_sentry_event
41 41 from appenlight.lib.request import JSONException
42 42 from appenlight.validators import (LogListSchema,
43 43 MetricsListSchema,
44 44 GeneralMetricsListSchema,
45 45 GeneralMetricsPermanentListSchema,
46 46 GeneralMetricSchema,
47 47 GeneralMetricPermanentSchema,
48 48 LogListPermanentSchema,
49 49 ReportListSchema_0_5,
50 50 LogSchema,
51 51 LogSchemaPermanent,
52 52 ReportSchema_0_5)
53 53
54 54 log = logging.getLogger(__name__)
55 55
56 56
57 57 @view_config(route_name='api_logs', renderer='string', permission='create',
58 58 require_csrf=False)
59 59 @view_config(route_name='api_log', renderer='string', permission='create',
60 60 require_csrf=False)
61 61 def logs_create(request):
62 62 """
63 63 Endpoint for log aggregation
64 64 """
65 65 application = request.context.resource
66 66 if request.method.upper() == 'OPTIONS':
67 67 return check_cors(request, application)
68 68 else:
69 69 check_cors(request, application, should_return=False)
70 70
71 71 params = dict(request.params.copy())
72 72 proto_version = parse_proto(params.get('protocol_version', ''))
73 73 payload = request.unsafe_json_body
74 74 sequence_accepted = request.matched_route.name == 'api_logs'
75 75
76 76 if sequence_accepted:
77 77 if application.allow_permanent_storage:
78 78 schema = LogListPermanentSchema().bind(
79 79 utcnow=datetime.datetime.utcnow())
80 80 else:
81 81 schema = LogListSchema().bind(
82 82 utcnow=datetime.datetime.utcnow())
83 83 else:
84 84 if application.allow_permanent_storage:
85 85 schema = LogSchemaPermanent().bind(
86 86 utcnow=datetime.datetime.utcnow())
87 87 else:
88 88 schema = LogSchema().bind(
89 89 utcnow=datetime.datetime.utcnow())
90 90
91 91 deserialized_logs = schema.deserialize(payload)
92 92 if sequence_accepted is False:
93 93 deserialized_logs = [deserialized_logs]
94 94
95 95 rate_limiting(request, application, 'per_application_logs_rate_limit',
96 96 len(deserialized_logs))
97 97
98 98 # pprint.pprint(deserialized_logs)
99 99
100 100 # we need to split those out so we can process the pkey ones one by one
101 101 non_pkey_logs = [log_dict for log_dict in deserialized_logs
102 102 if not log_dict['primary_key']]
103 103 pkey_dict = {}
104 104 # try to process the logs as best as we can and group together to reduce
105 105 # the amount of queries
106 106 for log_dict in deserialized_logs:
107 107 if log_dict['primary_key']:
108 108 key = (log_dict['primary_key'], log_dict['namespace'],)
109 109 if key not in pkey_dict:
110 110 pkey_dict[key] = []
111 111 pkey_dict[key].append(log_dict)
112 112
113 113 if non_pkey_logs:
114 114 log.debug('%s non-pkey logs received: %s' % (application,
115 115 len(non_pkey_logs)))
116 116 tasks.add_logs.delay(application.resource_id, params, non_pkey_logs)
117 117 if pkey_dict:
118 118 logs_to_insert = []
119 119 for primary_key_tuple, payload in pkey_dict.items():
120 120 sorted_logs = sorted(payload, key=lambda x: x['date'])
121 121 logs_to_insert.append(sorted_logs[-1])
122 122 log.debug('%s pkey logs received: %s' % (application,
123 123 len(logs_to_insert)))
124 124 tasks.add_logs.delay(application.resource_id, params, logs_to_insert)
125 125
126 126 log.info('LOG call %s %s client:%s' % (
127 127 application, proto_version, request.headers.get('user_agent')))
128 128 return 'OK: Logs accepted'
129 129
130 130
131 131 @view_config(route_name='api_request_stats', renderer='string',
132 132 permission='create', require_csrf=False)
133 133 @view_config(route_name='api_metrics', renderer='string',
134 134 permission='create', require_csrf=False)
135 135 def request_metrics_create(request):
136 136 """
137 137 Endpoint for performance metrics, aggregates view performance stats
138 138 and converts them to general metric rows
139 139 """
140 140 application = request.context.resource
141 141 if request.method.upper() == 'OPTIONS':
142 142 return check_cors(request, application)
143 143 else:
144 144 check_cors(request, application, should_return=False)
145 145
146 146 params = dict(request.params.copy())
147 147 proto_version = parse_proto(params.get('protocol_version', ''))
148 148
149 149 payload = request.unsafe_json_body
150 150 schema = MetricsListSchema()
151 151 dataset = schema.deserialize(payload)
152 152
153 153 rate_limiting(request, application, 'per_application_metrics_rate_limit',
154 154 len(dataset))
155 155
156 156 # looping report data
157 157 metrics = {}
158 158 for metric in dataset:
159 159 server_name = metric.get('server', '').lower() or 'unknown'
160 160 start_interval = convert_date(metric['timestamp'])
161 161 start_interval = start_interval.replace(second=0, microsecond=0)
162 162
163 163 for view_name, view_metrics in metric['metrics']:
164 164 key = '%s%s%s' % (metric['server'], start_interval, view_name)
165 165 if key not in metrics:
166 166 metrics[key] = {"requests": 0, "main": 0, "sql": 0,
167 167 "nosql": 0, "remote": 0, "tmpl": 0,
168 168 "custom": 0, 'sql_calls': 0,
169 169 'nosql_calls': 0,
170 170 'remote_calls': 0, 'tmpl_calls': 0,
171 171 'custom_calls': 0,
172 172 "start_interval": start_interval,
173 173 "server_name": server_name,
174 174 "view_name": view_name
175 175 }
176 176 metrics[key]["requests"] += int(view_metrics['requests'])
177 177 metrics[key]["main"] += round(view_metrics['main'], 5)
178 178 metrics[key]["sql"] += round(view_metrics['sql'], 5)
179 179 metrics[key]["nosql"] += round(view_metrics['nosql'], 5)
180 180 metrics[key]["remote"] += round(view_metrics['remote'], 5)
181 181 metrics[key]["tmpl"] += round(view_metrics['tmpl'], 5)
182 182 metrics[key]["custom"] += round(view_metrics.get('custom', 0.0),
183 183 5)
184 184 metrics[key]["sql_calls"] += int(
185 185 view_metrics.get('sql_calls', 0))
186 186 metrics[key]["nosql_calls"] += int(
187 187 view_metrics.get('nosql_calls', 0))
188 188 metrics[key]["remote_calls"] += int(
189 189 view_metrics.get('remote_calls', 0))
190 190 metrics[key]["tmpl_calls"] += int(
191 191 view_metrics.get('tmpl_calls', 0))
192 192 metrics[key]["custom_calls"] += int(
193 193 view_metrics.get('custom_calls', 0))
194 194
195 195 if not metrics[key]["requests"]:
196 196 # fix this here because validator can't
197 197 metrics[key]["requests"] = 1
198 198 # metrics dict is being built to minimize
199 199 # the amount of queries used
200 200 # in case we get multiple rows from same minute
201 201
202 202 normalized_metrics = []
203 203 for metric in metrics.values():
204 204 new_metric = {
205 205 'namespace': 'appenlight.request_metric',
206 206 'timestamp': metric.pop('start_interval'),
207 207 'server_name': metric['server_name'],
208 208 'tags': list(metric.items())
209 209 }
210 210 normalized_metrics.append(new_metric)
211 211
212 212 tasks.add_metrics.delay(application.resource_id, params,
213 213 normalized_metrics, proto_version)
214 214
215 215 log.info('REQUEST METRICS call {} {} client:{}'.format(
216 216 application.resource_name, proto_version,
217 217 request.headers.get('user_agent')))
218 218 return 'OK: request metrics accepted'
219 219
220 220
221 221 @view_config(route_name='api_general_metrics', renderer='string',
222 222 permission='create', require_csrf=False)
223 223 @view_config(route_name='api_general_metric', renderer='string',
224 224 permission='create', require_csrf=False)
225 225 def general_metrics_create(request):
226 226 """
227 227 Endpoint for general metrics aggregation
228 228 """
229 229 application = request.context.resource
230 230 if request.method.upper() == 'OPTIONS':
231 231 return check_cors(request, application)
232 232 else:
233 233 check_cors(request, application, should_return=False)
234 234
235 235 params = dict(request.params.copy())
236 236 proto_version = parse_proto(params.get('protocol_version', ''))
237 237 payload = request.unsafe_json_body
238 238 sequence_accepted = request.matched_route.name == 'api_general_metrics'
239 239 if sequence_accepted:
240 240 if application.allow_permanent_storage:
241 241 schema = GeneralMetricsPermanentListSchema().bind(
242 242 utcnow=datetime.datetime.utcnow())
243 243 else:
244 244 schema = GeneralMetricsListSchema().bind(
245 245 utcnow=datetime.datetime.utcnow())
246 246 else:
247 247 if application.allow_permanent_storage:
248 248 schema = GeneralMetricPermanentSchema().bind(
249 249 utcnow=datetime.datetime.utcnow())
250 250 else:
251 251 schema = GeneralMetricSchema().bind(
252 252 utcnow=datetime.datetime.utcnow())
253 253
254 254 deserialized_metrics = schema.deserialize(payload)
255 255 if sequence_accepted is False:
256 256 deserialized_metrics = [deserialized_metrics]
257 257
258 258 rate_limiting(request, application, 'per_application_metrics_rate_limit',
259 259 len(deserialized_metrics))
260 260
261 261 tasks.add_metrics.delay(application.resource_id, params,
262 262 deserialized_metrics, proto_version)
263 263
264 264 log.info('METRICS call {} {} client:{}'.format(
265 265 application.resource_name, proto_version,
266 266 request.headers.get('user_agent')))
267 267 return 'OK: Metrics accepted'
268 268
269 269
270 270 @view_config(route_name='api_reports', renderer='string', permission='create',
271 271 require_csrf=False)
272 272 @view_config(route_name='api_slow_reports', renderer='string',
273 273 permission='create', require_csrf=False)
274 274 @view_config(route_name='api_report', renderer='string', permission='create',
275 275 require_csrf=False)
276 276 def reports_create(request):
277 277 """
278 278 Endpoint for exception and slowness reports
279 279 """
280 280 # route_url('reports')
281 281 application = request.context.resource
282 282 if request.method.upper() == 'OPTIONS':
283 283 return check_cors(request, application)
284 284 else:
285 285 check_cors(request, application, should_return=False)
286 286 params = dict(request.params.copy())
287 287 proto_version = parse_proto(params.get('protocol_version', ''))
288 288 payload = request.unsafe_json_body
289 289 sequence_accepted = request.matched_route.name == 'api_reports'
290 290
291 291 if sequence_accepted:
292 292 schema = ReportListSchema_0_5().bind(
293 293 utcnow=datetime.datetime.utcnow())
294 294 else:
295 295 schema = ReportSchema_0_5().bind(
296 296 utcnow=datetime.datetime.utcnow())
297 297
298 298 deserialized_reports = schema.deserialize(payload)
299 299 if sequence_accepted is False:
300 300 deserialized_reports = [deserialized_reports]
301 301 if deserialized_reports:
302 302 rate_limiting(request, application,
303 303 'per_application_reports_rate_limit',
304 304 len(deserialized_reports))
305 305
306 306 # pprint.pprint(deserialized_reports)
307 307 tasks.add_reports.delay(application.resource_id, params,
308 308 deserialized_reports)
309 309 log.info('REPORT call %s, %s client:%s' % (
310 310 application,
311 311 proto_version,
312 312 request.headers.get('user_agent'))
313 313 )
314 314 return 'OK: Reports accepted'
315 315
316 316
317 317 @view_config(route_name='api_airbrake', renderer='string', permission='create',
318 318 require_csrf=False)
319 319 def airbrake_xml_compat(request):
320 320 """
321 321 Airbrake compatible endpoint for XML reports
322 322 """
323 323 application = request.context.resource
324 324 if request.method.upper() == 'OPTIONS':
325 325 return check_cors(request, application)
326 326 else:
327 327 check_cors(request, application, should_return=False)
328 328
329 params = request.params.copy()
329 params = dict(request.params.copy())
330 330
331 331 error_dict = parse_airbrake_xml(request)
332 332 schema = ReportListSchema_0_5().bind(utcnow=datetime.datetime.utcnow())
333 333 deserialized_reports = schema.deserialize([error_dict])
334 334 rate_limiting(request, application, 'per_application_reports_rate_limit',
335 335 len(deserialized_reports))
336 336
337 337 tasks.add_reports.delay(application.resource_id, params,
338 338 deserialized_reports)
339 339 log.info('%s AIRBRAKE call for application %s, api_ver:%s client:%s' % (
340 340 500, application.resource_name,
341 341 request.params.get('protocol_version', 'unknown'),
342 342 request.headers.get('user_agent'))
343 343 )
344 344 return '<notice><id>no-id</id><url>%s</url></notice>' % \
345 345 request.registry.settings['mailing.app_url']
346 346
347 347
348 348 def decompress_gzip(data):
349 349 try:
350 350 fp = io.BytesIO(data)
351 351 with GzipFile(fileobj=fp) as f:
352 352 return f.read()
353 353 except Exception as exc:
354 354 log.error(exc)
355 355 # surface the failure as a client error instead of a bare 500
356 356 raise HTTPBadRequest()
357 357
358 358
359 359 def decompress_zlib(data):
360 360 try:
361 361 return zlib.decompress(data)
362 362 except Exception as exc:
363 363 log.error(exc)
364 364 # surface the failure as a client error instead of a bare 500
365 365 raise HTTPBadRequest()
366 366
367 367
368 368 def decode_b64(data):
369 369 try:
370 370 return base64.b64decode(data)
371 371 except Exception as exc:
372 372 log.error(exc)
373 373 # surface the failure as a client error instead of a bare 500
374 374 raise HTTPBadRequest()
375 375
376 376
377 377 @view_config(route_name='api_sentry', renderer='string', permission='create',
378 378 require_csrf=False)
379 379 @view_config(route_name='api_sentry_slash', renderer='string',
380 380 permission='create', require_csrf=False)
381 381 def sentry_compat(request):
382 382 """
383 383 Sentry compatible endpoint
384 384 """
385 385 application = request.context.resource
386 386 if request.method.upper() == 'OPTIONS':
387 387 return check_cors(request, application)
388 388 else:
389 389 check_cors(request, application, should_return=False)
390 390
391 391 # handle various report encodings
392 392 content_encoding = request.headers.get('Content-Encoding')
393 393 content_type = request.headers.get('Content-Type')
394 394 if content_encoding == 'gzip':
395 395 body = decompress_gzip(request.body)
396 396 elif content_encoding == 'deflate':
397 397 body = decompress_zlib(request.body)
398 398 else:
399 399 body = request.body
400 400 # attempt to fix string before decoding for stupid clients
401 401 if content_type == 'application/x-www-form-urlencoded':
402 402 body = urllib.parse.unquote(body.decode('utf8'))
403 403 check_char = '{' if isinstance(body, str) else b'{'
404 404 if not body.startswith(check_char):
405 405 try:
406 406 body = decode_b64(body)
407 407 body = decompress_zlib(body)
408 408 except Exception as exc:
409 409 log.info(exc)
410 410
411 411 try:
412 412 json_body = json.loads(body.decode('utf8'))
413 413 except ValueError:
414 414 raise JSONException("Incorrect JSON")
415 415
416 416 event, event_type = parse_sentry_event(json_body)
417 417
418 418 if event_type == ParsedSentryEventType.LOG:
419 419 if application.allow_permanent_storage:
420 420 schema = LogSchemaPermanent().bind(
421 421 utcnow=datetime.datetime.utcnow())
422 422 else:
423 423 schema = LogSchema().bind(
424 424 utcnow=datetime.datetime.utcnow())
425 425 deserialized_logs = schema.deserialize(event)
426 426 non_pkey_logs = [deserialized_logs]
427 427 log.debug('%s non-pkey logs received: %s' % (application,
428 428 len(non_pkey_logs)))
429 429 tasks.add_logs.delay(application.resource_id, {}, non_pkey_logs)
430 430 if event_type == ParsedSentryEventType.ERROR_REPORT:
431 431 schema = ReportSchema_0_5().bind(utcnow=datetime.datetime.utcnow())
432 432 deserialized_reports = [schema.deserialize(event)]
433 433 rate_limiting(request, application,
434 434 'per_application_reports_rate_limit',
435 435 len(deserialized_reports))
436 436 tasks.add_reports.delay(application.resource_id, {},
437 437 deserialized_reports)
438 438 return 'OK: Events accepted'
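For context, a client call against the log endpoint defined above could look roughly like the sketch below. The host, API-key header, and log fields are illustrative placeholders, not values taken from this changeset; authentication and routing are configured elsewhere in AppEnlight.

import requests

# Hypothetical deployment URL and key - adjust to the real AppEnlight instance.
url = 'https://appenlight.example.com/api/logs?protocol_version=0.5'
headers = {'X-appenlight-api-key': 'SOME_API_KEY'}

# A list payload is what the sequence-accepting route expects
# (validated by LogListSchema in the view above).
payload = [{
    'log_level': 'WARNING',
    'message': 'disk usage above 90%',
    'namespace': 'myapp.monitor',
    'server': 'web01',
    'date': '2016-05-01T11:25:00.000',
}]

response = requests.post(url, json=payload, headers=headers)
print(response.status_code, response.text)   # 'OK: Logs accepted' on success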