##// END OF EJS Templates
metrics: expose more metrics via statsd...
super-admin -
r4801:80a69a8f default
parent child Browse files
Show More
@@ -1,564 +1,568 b''
1 # -*- coding: utf-8 -*-
1 # -*- coding: utf-8 -*-
2
2
3 # Copyright (C) 2011-2020 RhodeCode GmbH
3 # Copyright (C) 2011-2020 RhodeCode GmbH
4 #
4 #
5 # This program is free software: you can redistribute it and/or modify
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU Affero General Public License, version 3
6 # it under the terms of the GNU Affero General Public License, version 3
7 # (only), as published by the Free Software Foundation.
7 # (only), as published by the Free Software Foundation.
8 #
8 #
9 # This program is distributed in the hope that it will be useful,
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
12 # GNU General Public License for more details.
13 #
13 #
14 # You should have received a copy of the GNU Affero General Public License
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 #
16 #
17 # This program is dual-licensed. If you wish to learn more about the
17 # This program is dual-licensed. If you wish to learn more about the
18 # RhodeCode Enterprise Edition, including its added features, Support services,
18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20
20
21 import itertools
21 import itertools
22 import logging
22 import logging
23 import sys
23 import sys
24 import types
24 import types
25 import fnmatch
25 import fnmatch
26
26
27 import decorator
27 import decorator
28 import venusian
28 import venusian
29 from collections import OrderedDict
29 from collections import OrderedDict
30
30
31 from pyramid.exceptions import ConfigurationError
31 from pyramid.exceptions import ConfigurationError
32 from pyramid.renderers import render
32 from pyramid.renderers import render
33 from pyramid.response import Response
33 from pyramid.response import Response
34 from pyramid.httpexceptions import HTTPNotFound
34 from pyramid.httpexceptions import HTTPNotFound
35
35
36 from rhodecode.api.exc import (
36 from rhodecode.api.exc import (
37 JSONRPCBaseError, JSONRPCError, JSONRPCForbidden, JSONRPCValidationError)
37 JSONRPCBaseError, JSONRPCError, JSONRPCForbidden, JSONRPCValidationError)
38 from rhodecode.apps._base import TemplateArgs
38 from rhodecode.apps._base import TemplateArgs
39 from rhodecode.lib.auth import AuthUser
39 from rhodecode.lib.auth import AuthUser
40 from rhodecode.lib.base import get_ip_addr, attach_context_attributes
40 from rhodecode.lib.base import get_ip_addr, attach_context_attributes
41 from rhodecode.lib.exc_tracking import store_exception
41 from rhodecode.lib.exc_tracking import store_exception
42 from rhodecode.lib.ext_json import json
42 from rhodecode.lib.ext_json import json
43 from rhodecode.lib.utils2 import safe_str
43 from rhodecode.lib.utils2 import safe_str
44 from rhodecode.lib.plugins.utils import get_plugin_settings
44 from rhodecode.lib.plugins.utils import get_plugin_settings
45 from rhodecode.model.db import User, UserApiKeys
45 from rhodecode.model.db import User, UserApiKeys
46
46
47 log = logging.getLogger(__name__)
47 log = logging.getLogger(__name__)
48
48
49 DEFAULT_RENDERER = 'jsonrpc_renderer'
49 DEFAULT_RENDERER = 'jsonrpc_renderer'
50 DEFAULT_URL = '/_admin/apiv2'
50 DEFAULT_URL = '/_admin/apiv2'
51
51
52
52
53 def find_methods(jsonrpc_methods, pattern):
53 def find_methods(jsonrpc_methods, pattern):
54 matches = OrderedDict()
54 matches = OrderedDict()
55 if not isinstance(pattern, (list, tuple)):
55 if not isinstance(pattern, (list, tuple)):
56 pattern = [pattern]
56 pattern = [pattern]
57
57
58 for single_pattern in pattern:
58 for single_pattern in pattern:
59 for method_name, method in jsonrpc_methods.items():
59 for method_name, method in jsonrpc_methods.items():
60 if fnmatch.fnmatch(method_name, single_pattern):
60 if fnmatch.fnmatch(method_name, single_pattern):
61 matches[method_name] = method
61 matches[method_name] = method
62 return matches
62 return matches
63
63
64
64
65 class ExtJsonRenderer(object):
65 class ExtJsonRenderer(object):
66 """
66 """
67 Custom renderer that mkaes use of our ext_json lib
67 Custom renderer that mkaes use of our ext_json lib
68
68
69 """
69 """
70
70
71 def __init__(self, serializer=json.dumps, **kw):
71 def __init__(self, serializer=json.dumps, **kw):
72 """ Any keyword arguments will be passed to the ``serializer``
72 """ Any keyword arguments will be passed to the ``serializer``
73 function."""
73 function."""
74 self.serializer = serializer
74 self.serializer = serializer
75 self.kw = kw
75 self.kw = kw
76
76
77 def __call__(self, info):
77 def __call__(self, info):
78 """ Returns a plain JSON-encoded string with content-type
78 """ Returns a plain JSON-encoded string with content-type
79 ``application/json``. The content-type may be overridden by
79 ``application/json``. The content-type may be overridden by
80 setting ``request.response.content_type``."""
80 setting ``request.response.content_type``."""
81
81
82 def _render(value, system):
82 def _render(value, system):
83 request = system.get('request')
83 request = system.get('request')
84 if request is not None:
84 if request is not None:
85 response = request.response
85 response = request.response
86 ct = response.content_type
86 ct = response.content_type
87 if ct == response.default_content_type:
87 if ct == response.default_content_type:
88 response.content_type = 'application/json'
88 response.content_type = 'application/json'
89
89
90 return self.serializer(value, **self.kw)
90 return self.serializer(value, **self.kw)
91
91
92 return _render
92 return _render
93
93
94
94
95 def jsonrpc_response(request, result):
95 def jsonrpc_response(request, result):
96 rpc_id = getattr(request, 'rpc_id', None)
96 rpc_id = getattr(request, 'rpc_id', None)
97 response = request.response
97 response = request.response
98
98
99 # store content_type before render is called
99 # store content_type before render is called
100 ct = response.content_type
100 ct = response.content_type
101
101
102 ret_value = ''
102 ret_value = ''
103 if rpc_id:
103 if rpc_id:
104 ret_value = {
104 ret_value = {
105 'id': rpc_id,
105 'id': rpc_id,
106 'result': result,
106 'result': result,
107 'error': None,
107 'error': None,
108 }
108 }
109
109
110 # fetch deprecation warnings, and store it inside results
110 # fetch deprecation warnings, and store it inside results
111 deprecation = getattr(request, 'rpc_deprecation', None)
111 deprecation = getattr(request, 'rpc_deprecation', None)
112 if deprecation:
112 if deprecation:
113 ret_value['DEPRECATION_WARNING'] = deprecation
113 ret_value['DEPRECATION_WARNING'] = deprecation
114
114
115 raw_body = render(DEFAULT_RENDERER, ret_value, request=request)
115 raw_body = render(DEFAULT_RENDERER, ret_value, request=request)
116 response.body = safe_str(raw_body, response.charset)
116 response.body = safe_str(raw_body, response.charset)
117
117
118 if ct == response.default_content_type:
118 if ct == response.default_content_type:
119 response.content_type = 'application/json'
119 response.content_type = 'application/json'
120
120
121 return response
121 return response
122
122
123
123
124 def jsonrpc_error(request, message, retid=None, code=None, headers=None):
124 def jsonrpc_error(request, message, retid=None, code=None, headers=None):
125 """
125 """
126 Generate a Response object with a JSON-RPC error body
126 Generate a Response object with a JSON-RPC error body
127
127
128 :param code:
128 :param code:
129 :param retid:
129 :param retid:
130 :param message:
130 :param message:
131 """
131 """
132 err_dict = {'id': retid, 'result': None, 'error': message}
132 err_dict = {'id': retid, 'result': None, 'error': message}
133 body = render(DEFAULT_RENDERER, err_dict, request=request).encode('utf-8')
133 body = render(DEFAULT_RENDERER, err_dict, request=request).encode('utf-8')
134
134
135 return Response(
135 return Response(
136 body=body,
136 body=body,
137 status=code,
137 status=code,
138 content_type='application/json',
138 content_type='application/json',
139 headerlist=headers
139 headerlist=headers
140 )
140 )
141
141
142
142
143 def exception_view(exc, request):
143 def exception_view(exc, request):
144 rpc_id = getattr(request, 'rpc_id', None)
144 rpc_id = getattr(request, 'rpc_id', None)
145
145
146 if isinstance(exc, JSONRPCError):
146 if isinstance(exc, JSONRPCError):
147 fault_message = safe_str(exc.message)
147 fault_message = safe_str(exc.message)
148 log.debug('json-rpc error rpc_id:%s "%s"', rpc_id, fault_message)
148 log.debug('json-rpc error rpc_id:%s "%s"', rpc_id, fault_message)
149 elif isinstance(exc, JSONRPCValidationError):
149 elif isinstance(exc, JSONRPCValidationError):
150 colander_exc = exc.colander_exception
150 colander_exc = exc.colander_exception
151 # TODO(marcink): think maybe of nicer way to serialize errors ?
151 # TODO(marcink): think maybe of nicer way to serialize errors ?
152 fault_message = colander_exc.asdict()
152 fault_message = colander_exc.asdict()
153 log.debug('json-rpc colander error rpc_id:%s "%s"', rpc_id, fault_message)
153 log.debug('json-rpc colander error rpc_id:%s "%s"', rpc_id, fault_message)
154 elif isinstance(exc, JSONRPCForbidden):
154 elif isinstance(exc, JSONRPCForbidden):
155 fault_message = 'Access was denied to this resource.'
155 fault_message = 'Access was denied to this resource.'
156 log.warning('json-rpc forbidden call rpc_id:%s "%s"', rpc_id, fault_message)
156 log.warning('json-rpc forbidden call rpc_id:%s "%s"', rpc_id, fault_message)
157 elif isinstance(exc, HTTPNotFound):
157 elif isinstance(exc, HTTPNotFound):
158 method = request.rpc_method
158 method = request.rpc_method
159 log.debug('json-rpc method `%s` not found in list of '
159 log.debug('json-rpc method `%s` not found in list of '
160 'api calls: %s, rpc_id:%s',
160 'api calls: %s, rpc_id:%s',
161 method, request.registry.jsonrpc_methods.keys(), rpc_id)
161 method, request.registry.jsonrpc_methods.keys(), rpc_id)
162
162
163 similar = 'none'
163 similar = 'none'
164 try:
164 try:
165 similar_paterns = ['*{}*'.format(x) for x in method.split('_')]
165 similar_paterns = ['*{}*'.format(x) for x in method.split('_')]
166 similar_found = find_methods(
166 similar_found = find_methods(
167 request.registry.jsonrpc_methods, similar_paterns)
167 request.registry.jsonrpc_methods, similar_paterns)
168 similar = ', '.join(similar_found.keys()) or similar
168 similar = ', '.join(similar_found.keys()) or similar
169 except Exception:
169 except Exception:
170 # make the whole above block safe
170 # make the whole above block safe
171 pass
171 pass
172
172
173 fault_message = "No such method: {}. Similar methods: {}".format(
173 fault_message = "No such method: {}. Similar methods: {}".format(
174 method, similar)
174 method, similar)
175 else:
175 else:
176 fault_message = 'undefined error'
176 fault_message = 'undefined error'
177 exc_info = exc.exc_info()
177 exc_info = exc.exc_info()
178 store_exception(id(exc_info), exc_info, prefix='rhodecode-api')
178 store_exception(id(exc_info), exc_info, prefix='rhodecode-api')
179
179
180 statsd = request.registry.statsd
181 if statsd:
182 statsd.incr('rhodecode_exception', tags=["api"])
183
180 return jsonrpc_error(request, fault_message, rpc_id)
184 return jsonrpc_error(request, fault_message, rpc_id)
181
185
182
186
183 def request_view(request):
187 def request_view(request):
184 """
188 """
185 Main request handling method. It handles all logic to call a specific
189 Main request handling method. It handles all logic to call a specific
186 exposed method
190 exposed method
187 """
191 """
188 # cython compatible inspect
192 # cython compatible inspect
189 from rhodecode.config.patches import inspect_getargspec
193 from rhodecode.config.patches import inspect_getargspec
190 inspect = inspect_getargspec()
194 inspect = inspect_getargspec()
191
195
192 # check if we can find this session using api_key, get_by_auth_token
196 # check if we can find this session using api_key, get_by_auth_token
193 # search not expired tokens only
197 # search not expired tokens only
194 try:
198 try:
195 api_user = User.get_by_auth_token(request.rpc_api_key)
199 api_user = User.get_by_auth_token(request.rpc_api_key)
196
200
197 if api_user is None:
201 if api_user is None:
198 return jsonrpc_error(
202 return jsonrpc_error(
199 request, retid=request.rpc_id, message='Invalid API KEY')
203 request, retid=request.rpc_id, message='Invalid API KEY')
200
204
201 if not api_user.active:
205 if not api_user.active:
202 return jsonrpc_error(
206 return jsonrpc_error(
203 request, retid=request.rpc_id,
207 request, retid=request.rpc_id,
204 message='Request from this user not allowed')
208 message='Request from this user not allowed')
205
209
206 # check if we are allowed to use this IP
210 # check if we are allowed to use this IP
207 auth_u = AuthUser(
211 auth_u = AuthUser(
208 api_user.user_id, request.rpc_api_key, ip_addr=request.rpc_ip_addr)
212 api_user.user_id, request.rpc_api_key, ip_addr=request.rpc_ip_addr)
209 if not auth_u.ip_allowed:
213 if not auth_u.ip_allowed:
210 return jsonrpc_error(
214 return jsonrpc_error(
211 request, retid=request.rpc_id,
215 request, retid=request.rpc_id,
212 message='Request from IP:%s not allowed' % (
216 message='Request from IP:%s not allowed' % (
213 request.rpc_ip_addr,))
217 request.rpc_ip_addr,))
214 else:
218 else:
215 log.info('Access for IP:%s allowed', request.rpc_ip_addr)
219 log.info('Access for IP:%s allowed', request.rpc_ip_addr)
216
220
217 # register our auth-user
221 # register our auth-user
218 request.rpc_user = auth_u
222 request.rpc_user = auth_u
219 request.environ['rc_auth_user_id'] = auth_u.user_id
223 request.environ['rc_auth_user_id'] = auth_u.user_id
220
224
221 # now check if token is valid for API
225 # now check if token is valid for API
222 auth_token = request.rpc_api_key
226 auth_token = request.rpc_api_key
223 token_match = api_user.authenticate_by_token(
227 token_match = api_user.authenticate_by_token(
224 auth_token, roles=[UserApiKeys.ROLE_API])
228 auth_token, roles=[UserApiKeys.ROLE_API])
225 invalid_token = not token_match
229 invalid_token = not token_match
226
230
227 log.debug('Checking if API KEY is valid with proper role')
231 log.debug('Checking if API KEY is valid with proper role')
228 if invalid_token:
232 if invalid_token:
229 return jsonrpc_error(
233 return jsonrpc_error(
230 request, retid=request.rpc_id,
234 request, retid=request.rpc_id,
231 message='API KEY invalid or, has bad role for an API call')
235 message='API KEY invalid or, has bad role for an API call')
232
236
233 except Exception:
237 except Exception:
234 log.exception('Error on API AUTH')
238 log.exception('Error on API AUTH')
235 return jsonrpc_error(
239 return jsonrpc_error(
236 request, retid=request.rpc_id, message='Invalid API KEY')
240 request, retid=request.rpc_id, message='Invalid API KEY')
237
241
238 method = request.rpc_method
242 method = request.rpc_method
239 func = request.registry.jsonrpc_methods[method]
243 func = request.registry.jsonrpc_methods[method]
240
244
241 # now that we have a method, add request._req_params to
245 # now that we have a method, add request._req_params to
242 # self.kargs and dispatch control to WGIController
246 # self.kargs and dispatch control to WGIController
243 argspec = inspect.getargspec(func)
247 argspec = inspect.getargspec(func)
244 arglist = argspec[0]
248 arglist = argspec[0]
245 defaults = map(type, argspec[3] or [])
249 defaults = map(type, argspec[3] or [])
246 default_empty = types.NotImplementedType
250 default_empty = types.NotImplementedType
247
251
248 # kw arguments required by this method
252 # kw arguments required by this method
249 func_kwargs = dict(itertools.izip_longest(
253 func_kwargs = dict(itertools.izip_longest(
250 reversed(arglist), reversed(defaults), fillvalue=default_empty))
254 reversed(arglist), reversed(defaults), fillvalue=default_empty))
251
255
252 # This attribute will need to be first param of a method that uses
256 # This attribute will need to be first param of a method that uses
253 # api_key, which is translated to instance of user at that name
257 # api_key, which is translated to instance of user at that name
254 user_var = 'apiuser'
258 user_var = 'apiuser'
255 request_var = 'request'
259 request_var = 'request'
256
260
257 for arg in [user_var, request_var]:
261 for arg in [user_var, request_var]:
258 if arg not in arglist:
262 if arg not in arglist:
259 return jsonrpc_error(
263 return jsonrpc_error(
260 request,
264 request,
261 retid=request.rpc_id,
265 retid=request.rpc_id,
262 message='This method [%s] does not support '
266 message='This method [%s] does not support '
263 'required parameter `%s`' % (func.__name__, arg))
267 'required parameter `%s`' % (func.__name__, arg))
264
268
265 # get our arglist and check if we provided them as args
269 # get our arglist and check if we provided them as args
266 for arg, default in func_kwargs.items():
270 for arg, default in func_kwargs.items():
267 if arg in [user_var, request_var]:
271 if arg in [user_var, request_var]:
268 # user_var and request_var are pre-hardcoded parameters and we
272 # user_var and request_var are pre-hardcoded parameters and we
269 # don't need to do any translation
273 # don't need to do any translation
270 continue
274 continue
271
275
272 # skip the required param check if it's default value is
276 # skip the required param check if it's default value is
273 # NotImplementedType (default_empty)
277 # NotImplementedType (default_empty)
274 if default == default_empty and arg not in request.rpc_params:
278 if default == default_empty and arg not in request.rpc_params:
275 return jsonrpc_error(
279 return jsonrpc_error(
276 request,
280 request,
277 retid=request.rpc_id,
281 retid=request.rpc_id,
278 message=('Missing non optional `%s` arg in JSON DATA' % arg)
282 message=('Missing non optional `%s` arg in JSON DATA' % arg)
279 )
283 )
280
284
281 # sanitize extra passed arguments
285 # sanitize extra passed arguments
282 for k in request.rpc_params.keys()[:]:
286 for k in request.rpc_params.keys()[:]:
283 if k not in func_kwargs:
287 if k not in func_kwargs:
284 del request.rpc_params[k]
288 del request.rpc_params[k]
285
289
286 call_params = request.rpc_params
290 call_params = request.rpc_params
287 call_params.update({
291 call_params.update({
288 'request': request,
292 'request': request,
289 'apiuser': auth_u
293 'apiuser': auth_u
290 })
294 })
291
295
292 # register some common functions for usage
296 # register some common functions for usage
293 attach_context_attributes(TemplateArgs(), request, request.rpc_user.user_id)
297 attach_context_attributes(TemplateArgs(), request, request.rpc_user.user_id)
294
298
295 try:
299 try:
296 ret_value = func(**call_params)
300 ret_value = func(**call_params)
297 return jsonrpc_response(request, ret_value)
301 return jsonrpc_response(request, ret_value)
298 except JSONRPCBaseError:
302 except JSONRPCBaseError:
299 raise
303 raise
300 except Exception:
304 except Exception:
301 log.exception('Unhandled exception occurred on api call: %s', func)
305 log.exception('Unhandled exception occurred on api call: %s', func)
302 exc_info = sys.exc_info()
306 exc_info = sys.exc_info()
303 exc_id, exc_type_name = store_exception(
307 exc_id, exc_type_name = store_exception(
304 id(exc_info), exc_info, prefix='rhodecode-api')
308 id(exc_info), exc_info, prefix='rhodecode-api')
305 error_headers = [('RhodeCode-Exception-Id', str(exc_id)),
309 error_headers = [('RhodeCode-Exception-Id', str(exc_id)),
306 ('RhodeCode-Exception-Type', str(exc_type_name))]
310 ('RhodeCode-Exception-Type', str(exc_type_name))]
307 return jsonrpc_error(
311 return jsonrpc_error(
308 request, retid=request.rpc_id, message='Internal server error',
312 request, retid=request.rpc_id, message='Internal server error',
309 headers=error_headers)
313 headers=error_headers)
310
314
311
315
312 def setup_request(request):
316 def setup_request(request):
313 """
317 """
314 Parse a JSON-RPC request body. It's used inside the predicates method
318 Parse a JSON-RPC request body. It's used inside the predicates method
315 to validate and bootstrap requests for usage in rpc calls.
319 to validate and bootstrap requests for usage in rpc calls.
316
320
317 We need to raise JSONRPCError here if we want to return some errors back to
321 We need to raise JSONRPCError here if we want to return some errors back to
318 user.
322 user.
319 """
323 """
320
324
321 log.debug('Executing setup request: %r', request)
325 log.debug('Executing setup request: %r', request)
322 request.rpc_ip_addr = get_ip_addr(request.environ)
326 request.rpc_ip_addr = get_ip_addr(request.environ)
323 # TODO(marcink): deprecate GET at some point
327 # TODO(marcink): deprecate GET at some point
324 if request.method not in ['POST', 'GET']:
328 if request.method not in ['POST', 'GET']:
325 log.debug('unsupported request method "%s"', request.method)
329 log.debug('unsupported request method "%s"', request.method)
326 raise JSONRPCError(
330 raise JSONRPCError(
327 'unsupported request method "%s". Please use POST' % request.method)
331 'unsupported request method "%s". Please use POST' % request.method)
328
332
329 if 'CONTENT_LENGTH' not in request.environ:
333 if 'CONTENT_LENGTH' not in request.environ:
330 log.debug("No Content-Length")
334 log.debug("No Content-Length")
331 raise JSONRPCError("Empty body, No Content-Length in request")
335 raise JSONRPCError("Empty body, No Content-Length in request")
332
336
333 else:
337 else:
334 length = request.environ['CONTENT_LENGTH']
338 length = request.environ['CONTENT_LENGTH']
335 log.debug('Content-Length: %s', length)
339 log.debug('Content-Length: %s', length)
336
340
337 if length == 0:
341 if length == 0:
338 log.debug("Content-Length is 0")
342 log.debug("Content-Length is 0")
339 raise JSONRPCError("Content-Length is 0")
343 raise JSONRPCError("Content-Length is 0")
340
344
341 raw_body = request.body
345 raw_body = request.body
342 log.debug("Loading JSON body now")
346 log.debug("Loading JSON body now")
343 try:
347 try:
344 json_body = json.loads(raw_body)
348 json_body = json.loads(raw_body)
345 except ValueError as e:
349 except ValueError as e:
346 # catch JSON errors Here
350 # catch JSON errors Here
347 raise JSONRPCError("JSON parse error ERR:%s RAW:%r" % (e, raw_body))
351 raise JSONRPCError("JSON parse error ERR:%s RAW:%r" % (e, raw_body))
348
352
349 request.rpc_id = json_body.get('id')
353 request.rpc_id = json_body.get('id')
350 request.rpc_method = json_body.get('method')
354 request.rpc_method = json_body.get('method')
351
355
352 # check required base parameters
356 # check required base parameters
353 try:
357 try:
354 api_key = json_body.get('api_key')
358 api_key = json_body.get('api_key')
355 if not api_key:
359 if not api_key:
356 api_key = json_body.get('auth_token')
360 api_key = json_body.get('auth_token')
357
361
358 if not api_key:
362 if not api_key:
359 raise KeyError('api_key or auth_token')
363 raise KeyError('api_key or auth_token')
360
364
361 # TODO(marcink): support passing in token in request header
365 # TODO(marcink): support passing in token in request header
362
366
363 request.rpc_api_key = api_key
367 request.rpc_api_key = api_key
364 request.rpc_id = json_body['id']
368 request.rpc_id = json_body['id']
365 request.rpc_method = json_body['method']
369 request.rpc_method = json_body['method']
366 request.rpc_params = json_body['args'] \
370 request.rpc_params = json_body['args'] \
367 if isinstance(json_body['args'], dict) else {}
371 if isinstance(json_body['args'], dict) else {}
368
372
369 log.debug('method: %s, params: %.10240r', request.rpc_method, request.rpc_params)
373 log.debug('method: %s, params: %.10240r', request.rpc_method, request.rpc_params)
370 except KeyError as e:
374 except KeyError as e:
371 raise JSONRPCError('Incorrect JSON data. Missing %s' % e)
375 raise JSONRPCError('Incorrect JSON data. Missing %s' % e)
372
376
373 log.debug('setup complete, now handling method:%s rpcid:%s',
377 log.debug('setup complete, now handling method:%s rpcid:%s',
374 request.rpc_method, request.rpc_id, )
378 request.rpc_method, request.rpc_id, )
375
379
376
380
377 class RoutePredicate(object):
381 class RoutePredicate(object):
378 def __init__(self, val, config):
382 def __init__(self, val, config):
379 self.val = val
383 self.val = val
380
384
381 def text(self):
385 def text(self):
382 return 'jsonrpc route = %s' % self.val
386 return 'jsonrpc route = %s' % self.val
383
387
384 phash = text
388 phash = text
385
389
386 def __call__(self, info, request):
390 def __call__(self, info, request):
387 if self.val:
391 if self.val:
388 # potentially setup and bootstrap our call
392 # potentially setup and bootstrap our call
389 setup_request(request)
393 setup_request(request)
390
394
391 # Always return True so that even if it isn't a valid RPC it
395 # Always return True so that even if it isn't a valid RPC it
392 # will fall through to the underlaying handlers like notfound_view
396 # will fall through to the underlaying handlers like notfound_view
393 return True
397 return True
394
398
395
399
396 class NotFoundPredicate(object):
400 class NotFoundPredicate(object):
397 def __init__(self, val, config):
401 def __init__(self, val, config):
398 self.val = val
402 self.val = val
399 self.methods = config.registry.jsonrpc_methods
403 self.methods = config.registry.jsonrpc_methods
400
404
401 def text(self):
405 def text(self):
402 return 'jsonrpc method not found = {}.'.format(self.val)
406 return 'jsonrpc method not found = {}.'.format(self.val)
403
407
404 phash = text
408 phash = text
405
409
406 def __call__(self, info, request):
410 def __call__(self, info, request):
407 return hasattr(request, 'rpc_method')
411 return hasattr(request, 'rpc_method')
408
412
409
413
410 class MethodPredicate(object):
414 class MethodPredicate(object):
411 def __init__(self, val, config):
415 def __init__(self, val, config):
412 self.method = val
416 self.method = val
413
417
414 def text(self):
418 def text(self):
415 return 'jsonrpc method = %s' % self.method
419 return 'jsonrpc method = %s' % self.method
416
420
417 phash = text
421 phash = text
418
422
419 def __call__(self, context, request):
423 def __call__(self, context, request):
420 # we need to explicitly return False here, so pyramid doesn't try to
424 # we need to explicitly return False here, so pyramid doesn't try to
421 # execute our view directly. We need our main handler to execute things
425 # execute our view directly. We need our main handler to execute things
422 return getattr(request, 'rpc_method') == self.method
426 return getattr(request, 'rpc_method') == self.method
423
427
424
428
425 def add_jsonrpc_method(config, view, **kwargs):
429 def add_jsonrpc_method(config, view, **kwargs):
426 # pop the method name
430 # pop the method name
427 method = kwargs.pop('method', None)
431 method = kwargs.pop('method', None)
428
432
429 if method is None:
433 if method is None:
430 raise ConfigurationError(
434 raise ConfigurationError(
431 'Cannot register a JSON-RPC method without specifying the "method"')
435 'Cannot register a JSON-RPC method without specifying the "method"')
432
436
433 # we define custom predicate, to enable to detect conflicting methods,
437 # we define custom predicate, to enable to detect conflicting methods,
434 # those predicates are kind of "translation" from the decorator variables
438 # those predicates are kind of "translation" from the decorator variables
435 # to internal predicates names
439 # to internal predicates names
436
440
437 kwargs['jsonrpc_method'] = method
441 kwargs['jsonrpc_method'] = method
438
442
439 # register our view into global view store for validation
443 # register our view into global view store for validation
440 config.registry.jsonrpc_methods[method] = view
444 config.registry.jsonrpc_methods[method] = view
441
445
442 # we're using our main request_view handler, here, so each method
446 # we're using our main request_view handler, here, so each method
443 # has a unified handler for itself
447 # has a unified handler for itself
444 config.add_view(request_view, route_name='apiv2', **kwargs)
448 config.add_view(request_view, route_name='apiv2', **kwargs)
445
449
446
450
447 class jsonrpc_method(object):
451 class jsonrpc_method(object):
448 """
452 """
449 decorator that works similar to @add_view_config decorator,
453 decorator that works similar to @add_view_config decorator,
450 but tailored for our JSON RPC
454 but tailored for our JSON RPC
451 """
455 """
452
456
453 venusian = venusian # for testing injection
457 venusian = venusian # for testing injection
454
458
455 def __init__(self, method=None, **kwargs):
459 def __init__(self, method=None, **kwargs):
456 self.method = method
460 self.method = method
457 self.kwargs = kwargs
461 self.kwargs = kwargs
458
462
459 def __call__(self, wrapped):
463 def __call__(self, wrapped):
460 kwargs = self.kwargs.copy()
464 kwargs = self.kwargs.copy()
461 kwargs['method'] = self.method or wrapped.__name__
465 kwargs['method'] = self.method or wrapped.__name__
462 depth = kwargs.pop('_depth', 0)
466 depth = kwargs.pop('_depth', 0)
463
467
464 def callback(context, name, ob):
468 def callback(context, name, ob):
465 config = context.config.with_package(info.module)
469 config = context.config.with_package(info.module)
466 config.add_jsonrpc_method(view=ob, **kwargs)
470 config.add_jsonrpc_method(view=ob, **kwargs)
467
471
468 info = venusian.attach(wrapped, callback, category='pyramid',
472 info = venusian.attach(wrapped, callback, category='pyramid',
469 depth=depth + 1)
473 depth=depth + 1)
470 if info.scope == 'class':
474 if info.scope == 'class':
471 # ensure that attr is set if decorating a class method
475 # ensure that attr is set if decorating a class method
472 kwargs.setdefault('attr', wrapped.__name__)
476 kwargs.setdefault('attr', wrapped.__name__)
473
477
474 kwargs['_info'] = info.codeinfo # fbo action_method
478 kwargs['_info'] = info.codeinfo # fbo action_method
475 return wrapped
479 return wrapped
476
480
477
481
478 class jsonrpc_deprecated_method(object):
482 class jsonrpc_deprecated_method(object):
479 """
483 """
480 Marks method as deprecated, adds log.warning, and inject special key to
484 Marks method as deprecated, adds log.warning, and inject special key to
481 the request variable to mark method as deprecated.
485 the request variable to mark method as deprecated.
482 Also injects special docstring that extract_docs will catch to mark
486 Also injects special docstring that extract_docs will catch to mark
483 method as deprecated.
487 method as deprecated.
484
488
485 :param use_method: specify which method should be used instead of
489 :param use_method: specify which method should be used instead of
486 the decorated one
490 the decorated one
487
491
488 Use like::
492 Use like::
489
493
490 @jsonrpc_method()
494 @jsonrpc_method()
491 @jsonrpc_deprecated_method(use_method='new_func', deprecated_at_version='3.0.0')
495 @jsonrpc_deprecated_method(use_method='new_func', deprecated_at_version='3.0.0')
492 def old_func(request, apiuser, arg1, arg2):
496 def old_func(request, apiuser, arg1, arg2):
493 ...
497 ...
494 """
498 """
495
499
496 def __init__(self, use_method, deprecated_at_version):
500 def __init__(self, use_method, deprecated_at_version):
497 self.use_method = use_method
501 self.use_method = use_method
498 self.deprecated_at_version = deprecated_at_version
502 self.deprecated_at_version = deprecated_at_version
499 self.deprecated_msg = ''
503 self.deprecated_msg = ''
500
504
501 def __call__(self, func):
505 def __call__(self, func):
502 self.deprecated_msg = 'Please use method `{method}` instead.'.format(
506 self.deprecated_msg = 'Please use method `{method}` instead.'.format(
503 method=self.use_method)
507 method=self.use_method)
504
508
505 docstring = """\n
509 docstring = """\n
506 .. deprecated:: {version}
510 .. deprecated:: {version}
507
511
508 {deprecation_message}
512 {deprecation_message}
509
513
510 {original_docstring}
514 {original_docstring}
511 """
515 """
512 func.__doc__ = docstring.format(
516 func.__doc__ = docstring.format(
513 version=self.deprecated_at_version,
517 version=self.deprecated_at_version,
514 deprecation_message=self.deprecated_msg,
518 deprecation_message=self.deprecated_msg,
515 original_docstring=func.__doc__)
519 original_docstring=func.__doc__)
516 return decorator.decorator(self.__wrapper, func)
520 return decorator.decorator(self.__wrapper, func)
517
521
518 def __wrapper(self, func, *fargs, **fkwargs):
522 def __wrapper(self, func, *fargs, **fkwargs):
519 log.warning('DEPRECATED API CALL on function %s, please '
523 log.warning('DEPRECATED API CALL on function %s, please '
520 'use `%s` instead', func, self.use_method)
524 'use `%s` instead', func, self.use_method)
521 # alter function docstring to mark as deprecated, this is picked up
525 # alter function docstring to mark as deprecated, this is picked up
522 # via fabric file that generates API DOC.
526 # via fabric file that generates API DOC.
523 result = func(*fargs, **fkwargs)
527 result = func(*fargs, **fkwargs)
524
528
525 request = fargs[0]
529 request = fargs[0]
526 request.rpc_deprecation = 'DEPRECATED METHOD ' + self.deprecated_msg
530 request.rpc_deprecation = 'DEPRECATED METHOD ' + self.deprecated_msg
527 return result
531 return result
528
532
529
533
530 def add_api_methods(config):
534 def add_api_methods(config):
531 from rhodecode.api.views import (
535 from rhodecode.api.views import (
532 deprecated_api, gist_api, pull_request_api, repo_api, repo_group_api,
536 deprecated_api, gist_api, pull_request_api, repo_api, repo_group_api,
533 server_api, search_api, testing_api, user_api, user_group_api)
537 server_api, search_api, testing_api, user_api, user_group_api)
534
538
535 config.scan('rhodecode.api.views')
539 config.scan('rhodecode.api.views')
536
540
537
541
538 def includeme(config):
542 def includeme(config):
539 plugin_module = 'rhodecode.api'
543 plugin_module = 'rhodecode.api'
540 plugin_settings = get_plugin_settings(
544 plugin_settings = get_plugin_settings(
541 plugin_module, config.registry.settings)
545 plugin_module, config.registry.settings)
542
546
543 if not hasattr(config.registry, 'jsonrpc_methods'):
547 if not hasattr(config.registry, 'jsonrpc_methods'):
544 config.registry.jsonrpc_methods = OrderedDict()
548 config.registry.jsonrpc_methods = OrderedDict()
545
549
546 # match filter by given method only
550 # match filter by given method only
547 config.add_view_predicate('jsonrpc_method', MethodPredicate)
551 config.add_view_predicate('jsonrpc_method', MethodPredicate)
548 config.add_view_predicate('jsonrpc_method_not_found', NotFoundPredicate)
552 config.add_view_predicate('jsonrpc_method_not_found', NotFoundPredicate)
549
553
550 config.add_renderer(DEFAULT_RENDERER, ExtJsonRenderer(
554 config.add_renderer(DEFAULT_RENDERER, ExtJsonRenderer(
551 serializer=json.dumps, indent=4))
555 serializer=json.dumps, indent=4))
552 config.add_directive('add_jsonrpc_method', add_jsonrpc_method)
556 config.add_directive('add_jsonrpc_method', add_jsonrpc_method)
553
557
554 config.add_route_predicate(
558 config.add_route_predicate(
555 'jsonrpc_call', RoutePredicate)
559 'jsonrpc_call', RoutePredicate)
556
560
557 config.add_route(
561 config.add_route(
558 'apiv2', plugin_settings.get('url', DEFAULT_URL), jsonrpc_call=True)
562 'apiv2', plugin_settings.get('url', DEFAULT_URL), jsonrpc_call=True)
559
563
560 # register some exception handling view
564 # register some exception handling view
561 config.add_view(exception_view, context=JSONRPCBaseError)
565 config.add_view(exception_view, context=JSONRPCBaseError)
562 config.add_notfound_view(exception_view, jsonrpc_method_not_found=True)
566 config.add_notfound_view(exception_view, jsonrpc_method_not_found=True)
563
567
564 add_api_methods(config)
568 add_api_methods(config)
@@ -1,306 +1,310 b''
1 # -*- coding: utf-8 -*-
1 # -*- coding: utf-8 -*-
2
2
3 # Copyright (C) 2010-2020 RhodeCode GmbH
3 # Copyright (C) 2010-2020 RhodeCode GmbH
4 #
4 #
5 # This program is free software: you can redistribute it and/or modify
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU Affero General Public License, version 3
6 # it under the terms of the GNU Affero General Public License, version 3
7 # (only), as published by the Free Software Foundation.
7 # (only), as published by the Free Software Foundation.
8 #
8 #
9 # This program is distributed in the hope that it will be useful,
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
12 # GNU General Public License for more details.
13 #
13 #
14 # You should have received a copy of the GNU Affero General Public License
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 #
16 #
17 # This program is dual-licensed. If you wish to learn more about the
17 # This program is dual-licensed. If you wish to learn more about the
18 # RhodeCode Enterprise Edition, including its added features, Support services,
18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 """
20 """
21 Celery loader, run with::
21 Celery loader, run with::
22
22
23 celery worker \
23 celery worker \
24 --beat \
24 --beat \
25 --app rhodecode.lib.celerylib.loader \
25 --app rhodecode.lib.celerylib.loader \
26 --scheduler rhodecode.lib.celerylib.scheduler.RcScheduler \
26 --scheduler rhodecode.lib.celerylib.scheduler.RcScheduler \
27 --loglevel DEBUG --ini=._dev/dev.ini
27 --loglevel DEBUG --ini=._dev/dev.ini
28 """
28 """
29 import os
29 import os
30 import logging
30 import logging
31 import importlib
31 import importlib
32
32
33 from celery import Celery
33 from celery import Celery
34 from celery import signals
34 from celery import signals
35 from celery import Task
35 from celery import Task
36 from celery import exceptions # pragma: no cover
36 from celery import exceptions # pragma: no cover
37 from kombu.serialization import register
37 from kombu.serialization import register
38 from pyramid.threadlocal import get_current_request
38 from pyramid.threadlocal import get_current_request
39
39
40 import rhodecode
40 import rhodecode
41
41
42 from rhodecode.lib.auth import AuthUser
42 from rhodecode.lib.auth import AuthUser
43 from rhodecode.lib.celerylib.utils import get_ini_config, parse_ini_vars, ping_db
43 from rhodecode.lib.celerylib.utils import get_ini_config, parse_ini_vars, ping_db
44 from rhodecode.lib.ext_json import json
44 from rhodecode.lib.ext_json import json
45 from rhodecode.lib.pyramid_utils import bootstrap, setup_logging, prepare_request
45 from rhodecode.lib.pyramid_utils import bootstrap, setup_logging, prepare_request
46 from rhodecode.lib.utils2 import str2bool
46 from rhodecode.lib.utils2 import str2bool
47 from rhodecode.model import meta
47 from rhodecode.model import meta
48
48
49
49
50 register('json_ext', json.dumps, json.loads,
50 register('json_ext', json.dumps, json.loads,
51 content_type='application/x-json-ext',
51 content_type='application/x-json-ext',
52 content_encoding='utf-8')
52 content_encoding='utf-8')
53
53
54 log = logging.getLogger('celery.rhodecode.loader')
54 log = logging.getLogger('celery.rhodecode.loader')
55
55
56
56
57 def add_preload_arguments(parser):
57 def add_preload_arguments(parser):
58 parser.add_argument(
58 parser.add_argument(
59 '--ini', default=None,
59 '--ini', default=None,
60 help='Path to ini configuration file.'
60 help='Path to ini configuration file.'
61 )
61 )
62 parser.add_argument(
62 parser.add_argument(
63 '--ini-var', default=None,
63 '--ini-var', default=None,
64 help='Comma separated list of key=value to pass to ini.'
64 help='Comma separated list of key=value to pass to ini.'
65 )
65 )
66
66
67
67
68 def get_logger(obj):
68 def get_logger(obj):
69 custom_log = logging.getLogger(
69 custom_log = logging.getLogger(
70 'rhodecode.task.{}'.format(obj.__class__.__name__))
70 'rhodecode.task.{}'.format(obj.__class__.__name__))
71
71
72 if rhodecode.CELERY_ENABLED:
72 if rhodecode.CELERY_ENABLED:
73 try:
73 try:
74 custom_log = obj.get_logger()
74 custom_log = obj.get_logger()
75 except Exception:
75 except Exception:
76 pass
76 pass
77
77
78 return custom_log
78 return custom_log
79
79
80
80
81 imports = ['rhodecode.lib.celerylib.tasks']
81 imports = ['rhodecode.lib.celerylib.tasks']
82
82
83 try:
83 try:
84 # try if we have EE tasks available
84 # try if we have EE tasks available
85 importlib.import_module('rc_ee')
85 importlib.import_module('rc_ee')
86 imports.append('rc_ee.lib.celerylib.tasks')
86 imports.append('rc_ee.lib.celerylib.tasks')
87 except ImportError:
87 except ImportError:
88 pass
88 pass
89
89
90
90
91 base_celery_config = {
91 base_celery_config = {
92 'result_backend': 'rpc://',
92 'result_backend': 'rpc://',
93 'result_expires': 60 * 60 * 24,
93 'result_expires': 60 * 60 * 24,
94 'result_persistent': True,
94 'result_persistent': True,
95 'imports': imports,
95 'imports': imports,
96 'worker_max_tasks_per_child': 100,
96 'worker_max_tasks_per_child': 100,
97 'accept_content': ['json_ext'],
97 'accept_content': ['json_ext'],
98 'task_serializer': 'json_ext',
98 'task_serializer': 'json_ext',
99 'result_serializer': 'json_ext',
99 'result_serializer': 'json_ext',
100 'worker_hijack_root_logger': False,
100 'worker_hijack_root_logger': False,
101 'database_table_names': {
101 'database_table_names': {
102 'task': 'beat_taskmeta',
102 'task': 'beat_taskmeta',
103 'group': 'beat_groupmeta',
103 'group': 'beat_groupmeta',
104 }
104 }
105 }
105 }
106 # init main celery app
106 # init main celery app
107 celery_app = Celery()
107 celery_app = Celery()
108 celery_app.user_options['preload'].add(add_preload_arguments)
108 celery_app.user_options['preload'].add(add_preload_arguments)
109 ini_file_glob = None
109 ini_file_glob = None
110
110
111
111
112 @signals.setup_logging.connect
112 @signals.setup_logging.connect
113 def setup_logging_callback(**kwargs):
113 def setup_logging_callback(**kwargs):
114 setup_logging(ini_file_glob)
114 setup_logging(ini_file_glob)
115
115
116
116
117 @signals.user_preload_options.connect
117 @signals.user_preload_options.connect
118 def on_preload_parsed(options, **kwargs):
118 def on_preload_parsed(options, **kwargs):
119 ini_location = options['ini']
119 ini_location = options['ini']
120 ini_vars = options['ini_var']
120 ini_vars = options['ini_var']
121 celery_app.conf['INI_PYRAMID'] = options['ini']
121 celery_app.conf['INI_PYRAMID'] = options['ini']
122
122
123 if ini_location is None:
123 if ini_location is None:
124 print('You must provide the paste --ini argument')
124 print('You must provide the paste --ini argument')
125 exit(-1)
125 exit(-1)
126
126
127 options = None
127 options = None
128 if ini_vars is not None:
128 if ini_vars is not None:
129 options = parse_ini_vars(ini_vars)
129 options = parse_ini_vars(ini_vars)
130
130
131 global ini_file_glob
131 global ini_file_glob
132 ini_file_glob = ini_location
132 ini_file_glob = ini_location
133
133
134 log.debug('Bootstrapping RhodeCode application...')
134 log.debug('Bootstrapping RhodeCode application...')
135 env = bootstrap(ini_location, options=options)
135 env = bootstrap(ini_location, options=options)
136
136
137 setup_celery_app(
137 setup_celery_app(
138 app=env['app'], root=env['root'], request=env['request'],
138 app=env['app'], root=env['root'], request=env['request'],
139 registry=env['registry'], closer=env['closer'],
139 registry=env['registry'], closer=env['closer'],
140 ini_location=ini_location)
140 ini_location=ini_location)
141
141
142 # fix the global flag even if it's disabled via .ini file because this
142 # fix the global flag even if it's disabled via .ini file because this
143 # is a worker code that doesn't need this to be disabled.
143 # is a worker code that doesn't need this to be disabled.
144 rhodecode.CELERY_ENABLED = True
144 rhodecode.CELERY_ENABLED = True
145
145
146
146
147 @signals.task_prerun.connect
147 @signals.task_prerun.connect
148 def task_prerun_signal(task_id, task, args, **kwargs):
148 def task_prerun_signal(task_id, task, args, **kwargs):
149 ping_db()
149 ping_db()
150
150
151
151
152 @signals.task_success.connect
152 @signals.task_success.connect
153 def task_success_signal(result, **kwargs):
153 def task_success_signal(result, **kwargs):
154 meta.Session.commit()
154 meta.Session.commit()
155 closer = celery_app.conf['PYRAMID_CLOSER']
155 closer = celery_app.conf['PYRAMID_CLOSER']
156 if closer:
156 if closer:
157 closer()
157 closer()
158
158
159
159
160 @signals.task_retry.connect
160 @signals.task_retry.connect
161 def task_retry_signal(
161 def task_retry_signal(
162 request, reason, einfo, **kwargs):
162 request, reason, einfo, **kwargs):
163 meta.Session.remove()
163 meta.Session.remove()
164 closer = celery_app.conf['PYRAMID_CLOSER']
164 closer = celery_app.conf['PYRAMID_CLOSER']
165 if closer:
165 if closer:
166 closer()
166 closer()
167
167
168
168
169 @signals.task_failure.connect
169 @signals.task_failure.connect
170 def task_failure_signal(
170 def task_failure_signal(
171 task_id, exception, args, kwargs, traceback, einfo, **kargs):
171 task_id, exception, args, kwargs, traceback, einfo, **kargs):
172 from rhodecode.lib.exc_tracking import store_exception
172 from rhodecode.lib.exc_tracking import store_exception
173 from rhodecode.lib.statsd_client import StatsdClient
173
174
174 meta.Session.remove()
175 meta.Session.remove()
175
176
176 # simulate sys.exc_info()
177 # simulate sys.exc_info()
177 exc_info = (einfo.type, einfo.exception, einfo.tb)
178 exc_info = (einfo.type, einfo.exception, einfo.tb)
178 store_exception(id(exc_info), exc_info, prefix='rhodecode-celery')
179 store_exception(id(exc_info), exc_info, prefix='rhodecode-celery')
180 statsd = StatsdClient.statsd
181 if statsd:
182 statsd.incr('rhodecode_exception', tags=["celery"])
179
183
180 closer = celery_app.conf['PYRAMID_CLOSER']
184 closer = celery_app.conf['PYRAMID_CLOSER']
181 if closer:
185 if closer:
182 closer()
186 closer()
183
187
184
188
185 @signals.task_revoked.connect
189 @signals.task_revoked.connect
186 def task_revoked_signal(
190 def task_revoked_signal(
187 request, terminated, signum, expired, **kwargs):
191 request, terminated, signum, expired, **kwargs):
188 closer = celery_app.conf['PYRAMID_CLOSER']
192 closer = celery_app.conf['PYRAMID_CLOSER']
189 if closer:
193 if closer:
190 closer()
194 closer()
191
195
192
196
193 def setup_celery_app(app, root, request, registry, closer, ini_location):
197 def setup_celery_app(app, root, request, registry, closer, ini_location):
194 ini_dir = os.path.dirname(os.path.abspath(ini_location))
198 ini_dir = os.path.dirname(os.path.abspath(ini_location))
195 celery_config = base_celery_config
199 celery_config = base_celery_config
196 celery_config.update({
200 celery_config.update({
197 # store celerybeat scheduler db where the .ini file is
201 # store celerybeat scheduler db where the .ini file is
198 'beat_schedule_filename': os.path.join(ini_dir, 'celerybeat-schedule'),
202 'beat_schedule_filename': os.path.join(ini_dir, 'celerybeat-schedule'),
199 })
203 })
200 ini_settings = get_ini_config(ini_location)
204 ini_settings = get_ini_config(ini_location)
201 log.debug('Got custom celery conf: %s', ini_settings)
205 log.debug('Got custom celery conf: %s', ini_settings)
202
206
203 celery_config.update(ini_settings)
207 celery_config.update(ini_settings)
204 celery_app.config_from_object(celery_config)
208 celery_app.config_from_object(celery_config)
205
209
206 celery_app.conf.update({'PYRAMID_APP': app})
210 celery_app.conf.update({'PYRAMID_APP': app})
207 celery_app.conf.update({'PYRAMID_ROOT': root})
211 celery_app.conf.update({'PYRAMID_ROOT': root})
208 celery_app.conf.update({'PYRAMID_REQUEST': request})
212 celery_app.conf.update({'PYRAMID_REQUEST': request})
209 celery_app.conf.update({'PYRAMID_REGISTRY': registry})
213 celery_app.conf.update({'PYRAMID_REGISTRY': registry})
210 celery_app.conf.update({'PYRAMID_CLOSER': closer})
214 celery_app.conf.update({'PYRAMID_CLOSER': closer})
211
215
212
216
213 def configure_celery(config, ini_location):
217 def configure_celery(config, ini_location):
214 """
218 """
215 Helper that is called from our application creation logic. It gives
219 Helper that is called from our application creation logic. It gives
216 connection info into running webapp and allows execution of tasks from
220 connection info into running webapp and allows execution of tasks from
217 RhodeCode itself
221 RhodeCode itself
218 """
222 """
219 # store some globals into rhodecode
223 # store some globals into rhodecode
220 rhodecode.CELERY_ENABLED = str2bool(
224 rhodecode.CELERY_ENABLED = str2bool(
221 config.registry.settings.get('use_celery'))
225 config.registry.settings.get('use_celery'))
222 if rhodecode.CELERY_ENABLED:
226 if rhodecode.CELERY_ENABLED:
223 log.info('Configuring celery based on `%s` file', ini_location)
227 log.info('Configuring celery based on `%s` file', ini_location)
224 setup_celery_app(
228 setup_celery_app(
225 app=None, root=None, request=None, registry=config.registry,
229 app=None, root=None, request=None, registry=config.registry,
226 closer=None, ini_location=ini_location)
230 closer=None, ini_location=ini_location)
227
231
228
232
229 def maybe_prepare_env(req):
233 def maybe_prepare_env(req):
230 environ = {}
234 environ = {}
231 try:
235 try:
232 environ.update({
236 environ.update({
233 'PATH_INFO': req.environ['PATH_INFO'],
237 'PATH_INFO': req.environ['PATH_INFO'],
234 'SCRIPT_NAME': req.environ['SCRIPT_NAME'],
238 'SCRIPT_NAME': req.environ['SCRIPT_NAME'],
235 'HTTP_HOST':req.environ.get('HTTP_HOST', req.environ['SERVER_NAME']),
239 'HTTP_HOST':req.environ.get('HTTP_HOST', req.environ['SERVER_NAME']),
236 'SERVER_NAME': req.environ['SERVER_NAME'],
240 'SERVER_NAME': req.environ['SERVER_NAME'],
237 'SERVER_PORT': req.environ['SERVER_PORT'],
241 'SERVER_PORT': req.environ['SERVER_PORT'],
238 'wsgi.url_scheme': req.environ['wsgi.url_scheme'],
242 'wsgi.url_scheme': req.environ['wsgi.url_scheme'],
239 })
243 })
240 except Exception:
244 except Exception:
241 pass
245 pass
242
246
243 return environ
247 return environ
244
248
245
249
246 class RequestContextTask(Task):
250 class RequestContextTask(Task):
247 """
251 """
248 This is a celery task which will create a rhodecode app instance context
252 This is a celery task which will create a rhodecode app instance context
249 for the task, patch pyramid with the original request
253 for the task, patch pyramid with the original request
250 that created the task and also add the user to the context.
254 that created the task and also add the user to the context.
251 """
255 """
252
256
253 def apply_async(self, args=None, kwargs=None, task_id=None, producer=None,
257 def apply_async(self, args=None, kwargs=None, task_id=None, producer=None,
254 link=None, link_error=None, shadow=None, **options):
258 link=None, link_error=None, shadow=None, **options):
255 """ queue the job to run (we are in web request context here) """
259 """ queue the job to run (we are in web request context here) """
256
260
257 req = get_current_request()
261 req = get_current_request()
258
262
259 # web case
263 # web case
260 if hasattr(req, 'user'):
264 if hasattr(req, 'user'):
261 ip_addr = req.user.ip_addr
265 ip_addr = req.user.ip_addr
262 user_id = req.user.user_id
266 user_id = req.user.user_id
263
267
264 # api case
268 # api case
265 elif hasattr(req, 'rpc_user'):
269 elif hasattr(req, 'rpc_user'):
266 ip_addr = req.rpc_user.ip_addr
270 ip_addr = req.rpc_user.ip_addr
267 user_id = req.rpc_user.user_id
271 user_id = req.rpc_user.user_id
268 else:
272 else:
269 raise Exception(
273 raise Exception(
270 'Unable to fetch required data from request: {}. \n'
274 'Unable to fetch required data from request: {}. \n'
271 'This task is required to be executed from context of '
275 'This task is required to be executed from context of '
272 'request in a webapp'.format(repr(req)))
276 'request in a webapp'.format(repr(req)))
273
277
274 if req:
278 if req:
275 # we hook into kwargs since it is the only way to pass our data to
279 # we hook into kwargs since it is the only way to pass our data to
276 # the celery worker
280 # the celery worker
277 environ = maybe_prepare_env(req)
281 environ = maybe_prepare_env(req)
278 options['headers'] = options.get('headers', {})
282 options['headers'] = options.get('headers', {})
279 options['headers'].update({
283 options['headers'].update({
280 'rhodecode_proxy_data': {
284 'rhodecode_proxy_data': {
281 'environ': environ,
285 'environ': environ,
282 'auth_user': {
286 'auth_user': {
283 'ip_addr': ip_addr,
287 'ip_addr': ip_addr,
284 'user_id': user_id
288 'user_id': user_id
285 },
289 },
286 }
290 }
287 })
291 })
288
292
289 return super(RequestContextTask, self).apply_async(
293 return super(RequestContextTask, self).apply_async(
290 args, kwargs, task_id, producer, link, link_error, shadow, **options)
294 args, kwargs, task_id, producer, link, link_error, shadow, **options)
291
295
292 def __call__(self, *args, **kwargs):
296 def __call__(self, *args, **kwargs):
293 """ rebuild the context and then run task on celery worker """
297 """ rebuild the context and then run task on celery worker """
294
298
295 proxy_data = getattr(self.request, 'rhodecode_proxy_data', None)
299 proxy_data = getattr(self.request, 'rhodecode_proxy_data', None)
296 if not proxy_data:
300 if not proxy_data:
297 return super(RequestContextTask, self).__call__(*args, **kwargs)
301 return super(RequestContextTask, self).__call__(*args, **kwargs)
298
302
299 log.debug('using celery proxy data to run task: %r', proxy_data)
303 log.debug('using celery proxy data to run task: %r', proxy_data)
300 # re-inject and register threadlocals for proper routing support
304 # re-inject and register threadlocals for proper routing support
301 request = prepare_request(proxy_data['environ'])
305 request = prepare_request(proxy_data['environ'])
302 request.user = AuthUser(user_id=proxy_data['auth_user']['user_id'],
306 request.user = AuthUser(user_id=proxy_data['auth_user']['user_id'],
303 ip_addr=proxy_data['auth_user']['ip_addr'])
307 ip_addr=proxy_data['auth_user']['ip_addr'])
304
308
305 return super(RequestContextTask, self).__call__(*args, **kwargs)
309 return super(RequestContextTask, self).__call__(*args, **kwargs)
306
310
General Comments 0
You need to be logged in to leave comments. Login now