##// END OF EJS Templates
exc-tracker: store API based exceptions and fix prefixes to no use _
marcink -
r3335:3f5d13e1 default
parent child Browse files
Show More
@@ -1,542 +1,548 b''
1 # -*- coding: utf-8 -*-
1 # -*- coding: utf-8 -*-
2
2
3 # Copyright (C) 2011-2018 RhodeCode GmbH
3 # Copyright (C) 2011-2018 RhodeCode GmbH
4 #
4 #
5 # This program is free software: you can redistribute it and/or modify
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU Affero General Public License, version 3
6 # it under the terms of the GNU Affero General Public License, version 3
7 # (only), as published by the Free Software Foundation.
7 # (only), as published by the Free Software Foundation.
8 #
8 #
9 # This program is distributed in the hope that it will be useful,
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
12 # GNU General Public License for more details.
13 #
13 #
14 # You should have received a copy of the GNU Affero General Public License
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 #
16 #
17 # This program is dual-licensed. If you wish to learn more about the
17 # This program is dual-licensed. If you wish to learn more about the
18 # RhodeCode Enterprise Edition, including its added features, Support services,
18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20
20
21 import inspect
21 import inspect
22 import itertools
22 import itertools
23 import logging
23 import logging
24 import sys
24 import types
25 import types
25 import fnmatch
26 import fnmatch
26
27
27 import decorator
28 import decorator
28 import venusian
29 import venusian
29 from collections import OrderedDict
30 from collections import OrderedDict
30
31
31 from pyramid.exceptions import ConfigurationError
32 from pyramid.exceptions import ConfigurationError
32 from pyramid.renderers import render
33 from pyramid.renderers import render
33 from pyramid.response import Response
34 from pyramid.response import Response
34 from pyramid.httpexceptions import HTTPNotFound
35 from pyramid.httpexceptions import HTTPNotFound
35
36
36 from rhodecode.api.exc import (
37 from rhodecode.api.exc import (
37 JSONRPCBaseError, JSONRPCError, JSONRPCForbidden, JSONRPCValidationError)
38 JSONRPCBaseError, JSONRPCError, JSONRPCForbidden, JSONRPCValidationError)
38 from rhodecode.apps._base import TemplateArgs
39 from rhodecode.apps._base import TemplateArgs
39 from rhodecode.lib.auth import AuthUser
40 from rhodecode.lib.auth import AuthUser
40 from rhodecode.lib.base import get_ip_addr, attach_context_attributes
41 from rhodecode.lib.base import get_ip_addr, attach_context_attributes
42 from rhodecode.lib.exc_tracking import store_exception
41 from rhodecode.lib.ext_json import json
43 from rhodecode.lib.ext_json import json
42 from rhodecode.lib.utils2 import safe_str
44 from rhodecode.lib.utils2 import safe_str
43 from rhodecode.lib.plugins.utils import get_plugin_settings
45 from rhodecode.lib.plugins.utils import get_plugin_settings
44 from rhodecode.model.db import User, UserApiKeys
46 from rhodecode.model.db import User, UserApiKeys
45
47
46 log = logging.getLogger(__name__)
48 log = logging.getLogger(__name__)
47
49
48 DEFAULT_RENDERER = 'jsonrpc_renderer'
50 DEFAULT_RENDERER = 'jsonrpc_renderer'
49 DEFAULT_URL = '/_admin/apiv2'
51 DEFAULT_URL = '/_admin/apiv2'
50
52
51
53
52 def find_methods(jsonrpc_methods, pattern):
54 def find_methods(jsonrpc_methods, pattern):
53 matches = OrderedDict()
55 matches = OrderedDict()
54 if not isinstance(pattern, (list, tuple)):
56 if not isinstance(pattern, (list, tuple)):
55 pattern = [pattern]
57 pattern = [pattern]
56
58
57 for single_pattern in pattern:
59 for single_pattern in pattern:
58 for method_name, method in jsonrpc_methods.items():
60 for method_name, method in jsonrpc_methods.items():
59 if fnmatch.fnmatch(method_name, single_pattern):
61 if fnmatch.fnmatch(method_name, single_pattern):
60 matches[method_name] = method
62 matches[method_name] = method
61 return matches
63 return matches
62
64
63
65
64 class ExtJsonRenderer(object):
66 class ExtJsonRenderer(object):
65 """
67 """
66 Custom renderer that mkaes use of our ext_json lib
68 Custom renderer that mkaes use of our ext_json lib
67
69
68 """
70 """
69
71
70 def __init__(self, serializer=json.dumps, **kw):
72 def __init__(self, serializer=json.dumps, **kw):
71 """ Any keyword arguments will be passed to the ``serializer``
73 """ Any keyword arguments will be passed to the ``serializer``
72 function."""
74 function."""
73 self.serializer = serializer
75 self.serializer = serializer
74 self.kw = kw
76 self.kw = kw
75
77
76 def __call__(self, info):
78 def __call__(self, info):
77 """ Returns a plain JSON-encoded string with content-type
79 """ Returns a plain JSON-encoded string with content-type
78 ``application/json``. The content-type may be overridden by
80 ``application/json``. The content-type may be overridden by
79 setting ``request.response.content_type``."""
81 setting ``request.response.content_type``."""
80
82
81 def _render(value, system):
83 def _render(value, system):
82 request = system.get('request')
84 request = system.get('request')
83 if request is not None:
85 if request is not None:
84 response = request.response
86 response = request.response
85 ct = response.content_type
87 ct = response.content_type
86 if ct == response.default_content_type:
88 if ct == response.default_content_type:
87 response.content_type = 'application/json'
89 response.content_type = 'application/json'
88
90
89 return self.serializer(value, **self.kw)
91 return self.serializer(value, **self.kw)
90
92
91 return _render
93 return _render
92
94
93
95
94 def jsonrpc_response(request, result):
96 def jsonrpc_response(request, result):
95 rpc_id = getattr(request, 'rpc_id', None)
97 rpc_id = getattr(request, 'rpc_id', None)
96 response = request.response
98 response = request.response
97
99
98 # store content_type before render is called
100 # store content_type before render is called
99 ct = response.content_type
101 ct = response.content_type
100
102
101 ret_value = ''
103 ret_value = ''
102 if rpc_id:
104 if rpc_id:
103 ret_value = {
105 ret_value = {
104 'id': rpc_id,
106 'id': rpc_id,
105 'result': result,
107 'result': result,
106 'error': None,
108 'error': None,
107 }
109 }
108
110
109 # fetch deprecation warnings, and store it inside results
111 # fetch deprecation warnings, and store it inside results
110 deprecation = getattr(request, 'rpc_deprecation', None)
112 deprecation = getattr(request, 'rpc_deprecation', None)
111 if deprecation:
113 if deprecation:
112 ret_value['DEPRECATION_WARNING'] = deprecation
114 ret_value['DEPRECATION_WARNING'] = deprecation
113
115
114 raw_body = render(DEFAULT_RENDERER, ret_value, request=request)
116 raw_body = render(DEFAULT_RENDERER, ret_value, request=request)
115 response.body = safe_str(raw_body, response.charset)
117 response.body = safe_str(raw_body, response.charset)
116
118
117 if ct == response.default_content_type:
119 if ct == response.default_content_type:
118 response.content_type = 'application/json'
120 response.content_type = 'application/json'
119
121
120 return response
122 return response
121
123
122
124
123 def jsonrpc_error(request, message, retid=None, code=None):
125 def jsonrpc_error(request, message, retid=None, code=None):
124 """
126 """
125 Generate a Response object with a JSON-RPC error body
127 Generate a Response object with a JSON-RPC error body
126
128
127 :param code:
129 :param code:
128 :param retid:
130 :param retid:
129 :param message:
131 :param message:
130 """
132 """
131 err_dict = {'id': retid, 'result': None, 'error': message}
133 err_dict = {'id': retid, 'result': None, 'error': message}
132 body = render(DEFAULT_RENDERER, err_dict, request=request).encode('utf-8')
134 body = render(DEFAULT_RENDERER, err_dict, request=request).encode('utf-8')
133 return Response(
135 return Response(
134 body=body,
136 body=body,
135 status=code,
137 status=code,
136 content_type='application/json'
138 content_type='application/json'
137 )
139 )
138
140
139
141
140 def exception_view(exc, request):
142 def exception_view(exc, request):
141 rpc_id = getattr(request, 'rpc_id', None)
143 rpc_id = getattr(request, 'rpc_id', None)
142
144
143 fault_message = 'undefined error'
144 if isinstance(exc, JSONRPCError):
145 if isinstance(exc, JSONRPCError):
145 fault_message = safe_str(exc.message)
146 fault_message = safe_str(exc.message)
146 log.debug('json-rpc error rpc_id:%s "%s"', rpc_id, fault_message)
147 log.debug('json-rpc error rpc_id:%s "%s"', rpc_id, fault_message)
147 elif isinstance(exc, JSONRPCValidationError):
148 elif isinstance(exc, JSONRPCValidationError):
148 colander_exc = exc.colander_exception
149 colander_exc = exc.colander_exception
149 # TODO(marcink): think maybe of nicer way to serialize errors ?
150 # TODO(marcink): think maybe of nicer way to serialize errors ?
150 fault_message = colander_exc.asdict()
151 fault_message = colander_exc.asdict()
151 log.debug('json-rpc colander error rpc_id:%s "%s"', rpc_id, fault_message)
152 log.debug('json-rpc colander error rpc_id:%s "%s"', rpc_id, fault_message)
152 elif isinstance(exc, JSONRPCForbidden):
153 elif isinstance(exc, JSONRPCForbidden):
153 fault_message = 'Access was denied to this resource.'
154 fault_message = 'Access was denied to this resource.'
154 log.warning('json-rpc forbidden call rpc_id:%s "%s"', rpc_id, fault_message)
155 log.warning('json-rpc forbidden call rpc_id:%s "%s"', rpc_id, fault_message)
155 elif isinstance(exc, HTTPNotFound):
156 elif isinstance(exc, HTTPNotFound):
156 method = request.rpc_method
157 method = request.rpc_method
157 log.debug('json-rpc method `%s` not found in list of '
158 log.debug('json-rpc method `%s` not found in list of '
158 'api calls: %s, rpc_id:%s',
159 'api calls: %s, rpc_id:%s',
159 method, request.registry.jsonrpc_methods.keys(), rpc_id)
160 method, request.registry.jsonrpc_methods.keys(), rpc_id)
160
161
161 similar = 'none'
162 similar = 'none'
162 try:
163 try:
163 similar_paterns = ['*{}*'.format(x) for x in method.split('_')]
164 similar_paterns = ['*{}*'.format(x) for x in method.split('_')]
164 similar_found = find_methods(
165 similar_found = find_methods(
165 request.registry.jsonrpc_methods, similar_paterns)
166 request.registry.jsonrpc_methods, similar_paterns)
166 similar = ', '.join(similar_found.keys()) or similar
167 similar = ', '.join(similar_found.keys()) or similar
167 except Exception:
168 except Exception:
168 # make the whole above block safe
169 # make the whole above block safe
169 pass
170 pass
170
171
171 fault_message = "No such method: {}. Similar methods: {}".format(
172 fault_message = "No such method: {}. Similar methods: {}".format(
172 method, similar)
173 method, similar)
174 else:
175 fault_message = 'undefined error'
176 exc_info = exc.exc_info()
177 store_exception(id(exc_info), exc_info, prefix='rhodecode-api')
173
178
174 return jsonrpc_error(request, fault_message, rpc_id)
179 return jsonrpc_error(request, fault_message, rpc_id)
175
180
176
181
177 def request_view(request):
182 def request_view(request):
178 """
183 """
179 Main request handling method. It handles all logic to call a specific
184 Main request handling method. It handles all logic to call a specific
180 exposed method
185 exposed method
181 """
186 """
182
187
183 # check if we can find this session using api_key, get_by_auth_token
188 # check if we can find this session using api_key, get_by_auth_token
184 # search not expired tokens only
189 # search not expired tokens only
185
190
186 try:
191 try:
187 api_user = User.get_by_auth_token(request.rpc_api_key)
192 api_user = User.get_by_auth_token(request.rpc_api_key)
188
193
189 if api_user is None:
194 if api_user is None:
190 return jsonrpc_error(
195 return jsonrpc_error(
191 request, retid=request.rpc_id, message='Invalid API KEY')
196 request, retid=request.rpc_id, message='Invalid API KEY')
192
197
193 if not api_user.active:
198 if not api_user.active:
194 return jsonrpc_error(
199 return jsonrpc_error(
195 request, retid=request.rpc_id,
200 request, retid=request.rpc_id,
196 message='Request from this user not allowed')
201 message='Request from this user not allowed')
197
202
198 # check if we are allowed to use this IP
203 # check if we are allowed to use this IP
199 auth_u = AuthUser(
204 auth_u = AuthUser(
200 api_user.user_id, request.rpc_api_key, ip_addr=request.rpc_ip_addr)
205 api_user.user_id, request.rpc_api_key, ip_addr=request.rpc_ip_addr)
201 if not auth_u.ip_allowed:
206 if not auth_u.ip_allowed:
202 return jsonrpc_error(
207 return jsonrpc_error(
203 request, retid=request.rpc_id,
208 request, retid=request.rpc_id,
204 message='Request from IP:%s not allowed' % (
209 message='Request from IP:%s not allowed' % (
205 request.rpc_ip_addr,))
210 request.rpc_ip_addr,))
206 else:
211 else:
207 log.info('Access for IP:%s allowed', request.rpc_ip_addr)
212 log.info('Access for IP:%s allowed', request.rpc_ip_addr)
208
213
209 # register our auth-user
214 # register our auth-user
210 request.rpc_user = auth_u
215 request.rpc_user = auth_u
211 request.environ['rc_auth_user_id'] = auth_u.user_id
216 request.environ['rc_auth_user_id'] = auth_u.user_id
212
217
213 # now check if token is valid for API
218 # now check if token is valid for API
214 auth_token = request.rpc_api_key
219 auth_token = request.rpc_api_key
215 token_match = api_user.authenticate_by_token(
220 token_match = api_user.authenticate_by_token(
216 auth_token, roles=[UserApiKeys.ROLE_API])
221 auth_token, roles=[UserApiKeys.ROLE_API])
217 invalid_token = not token_match
222 invalid_token = not token_match
218
223
219 log.debug('Checking if API KEY is valid with proper role')
224 log.debug('Checking if API KEY is valid with proper role')
220 if invalid_token:
225 if invalid_token:
221 return jsonrpc_error(
226 return jsonrpc_error(
222 request, retid=request.rpc_id,
227 request, retid=request.rpc_id,
223 message='API KEY invalid or, has bad role for an API call')
228 message='API KEY invalid or, has bad role for an API call')
224
229
225 except Exception:
230 except Exception:
226 log.exception('Error on API AUTH')
231 log.exception('Error on API AUTH')
227 return jsonrpc_error(
232 return jsonrpc_error(
228 request, retid=request.rpc_id, message='Invalid API KEY')
233 request, retid=request.rpc_id, message='Invalid API KEY')
229
234
230 method = request.rpc_method
235 method = request.rpc_method
231 func = request.registry.jsonrpc_methods[method]
236 func = request.registry.jsonrpc_methods[method]
232
237
233 # now that we have a method, add request._req_params to
238 # now that we have a method, add request._req_params to
234 # self.kargs and dispatch control to WGIController
239 # self.kargs and dispatch control to WGIController
235 argspec = inspect.getargspec(func)
240 argspec = inspect.getargspec(func)
236 arglist = argspec[0]
241 arglist = argspec[0]
237 defaults = map(type, argspec[3] or [])
242 defaults = map(type, argspec[3] or [])
238 default_empty = types.NotImplementedType
243 default_empty = types.NotImplementedType
239
244
240 # kw arguments required by this method
245 # kw arguments required by this method
241 func_kwargs = dict(itertools.izip_longest(
246 func_kwargs = dict(itertools.izip_longest(
242 reversed(arglist), reversed(defaults), fillvalue=default_empty))
247 reversed(arglist), reversed(defaults), fillvalue=default_empty))
243
248
244 # This attribute will need to be first param of a method that uses
249 # This attribute will need to be first param of a method that uses
245 # api_key, which is translated to instance of user at that name
250 # api_key, which is translated to instance of user at that name
246 user_var = 'apiuser'
251 user_var = 'apiuser'
247 request_var = 'request'
252 request_var = 'request'
248
253
249 for arg in [user_var, request_var]:
254 for arg in [user_var, request_var]:
250 if arg not in arglist:
255 if arg not in arglist:
251 return jsonrpc_error(
256 return jsonrpc_error(
252 request,
257 request,
253 retid=request.rpc_id,
258 retid=request.rpc_id,
254 message='This method [%s] does not support '
259 message='This method [%s] does not support '
255 'required parameter `%s`' % (func.__name__, arg))
260 'required parameter `%s`' % (func.__name__, arg))
256
261
257 # get our arglist and check if we provided them as args
262 # get our arglist and check if we provided them as args
258 for arg, default in func_kwargs.items():
263 for arg, default in func_kwargs.items():
259 if arg in [user_var, request_var]:
264 if arg in [user_var, request_var]:
260 # user_var and request_var are pre-hardcoded parameters and we
265 # user_var and request_var are pre-hardcoded parameters and we
261 # don't need to do any translation
266 # don't need to do any translation
262 continue
267 continue
263
268
264 # skip the required param check if it's default value is
269 # skip the required param check if it's default value is
265 # NotImplementedType (default_empty)
270 # NotImplementedType (default_empty)
266 if default == default_empty and arg not in request.rpc_params:
271 if default == default_empty and arg not in request.rpc_params:
267 return jsonrpc_error(
272 return jsonrpc_error(
268 request,
273 request,
269 retid=request.rpc_id,
274 retid=request.rpc_id,
270 message=('Missing non optional `%s` arg in JSON DATA' % arg)
275 message=('Missing non optional `%s` arg in JSON DATA' % arg)
271 )
276 )
272
277
273 # sanitize extra passed arguments
278 # sanitize extra passed arguments
274 for k in request.rpc_params.keys()[:]:
279 for k in request.rpc_params.keys()[:]:
275 if k not in func_kwargs:
280 if k not in func_kwargs:
276 del request.rpc_params[k]
281 del request.rpc_params[k]
277
282
278 call_params = request.rpc_params
283 call_params = request.rpc_params
279 call_params.update({
284 call_params.update({
280 'request': request,
285 'request': request,
281 'apiuser': auth_u
286 'apiuser': auth_u
282 })
287 })
283
288
284 # register some common functions for usage
289 # register some common functions for usage
285 attach_context_attributes(
290 attach_context_attributes(
286 TemplateArgs(), request, request.rpc_user.user_id)
291 TemplateArgs(), request, request.rpc_user.user_id)
287
292
288 try:
293 try:
289 ret_value = func(**call_params)
294 ret_value = func(**call_params)
290 return jsonrpc_response(request, ret_value)
295 return jsonrpc_response(request, ret_value)
291 except JSONRPCBaseError:
296 except JSONRPCBaseError:
292 raise
297 raise
293 except Exception:
298 except Exception:
294 log.exception('Unhandled exception occurred on api call: %s', func)
299 log.exception('Unhandled exception occurred on api call: %s', func)
295 return jsonrpc_error(request, retid=request.rpc_id,
300 exc_info = sys.exc_info()
296 message='Internal server error')
301 store_exception(id(exc_info), exc_info, prefix='rhodecode-api')
302 return jsonrpc_error(
303 request, retid=request.rpc_id, message='Internal server error')
297
304
298
305
299 def setup_request(request):
306 def setup_request(request):
300 """
307 """
301 Parse a JSON-RPC request body. It's used inside the predicates method
308 Parse a JSON-RPC request body. It's used inside the predicates method
302 to validate and bootstrap requests for usage in rpc calls.
309 to validate and bootstrap requests for usage in rpc calls.
303
310
304 We need to raise JSONRPCError here if we want to return some errors back to
311 We need to raise JSONRPCError here if we want to return some errors back to
305 user.
312 user.
306 """
313 """
307
314
308 log.debug('Executing setup request: %r', request)
315 log.debug('Executing setup request: %r', request)
309 request.rpc_ip_addr = get_ip_addr(request.environ)
316 request.rpc_ip_addr = get_ip_addr(request.environ)
310 # TODO(marcink): deprecate GET at some point
317 # TODO(marcink): deprecate GET at some point
311 if request.method not in ['POST', 'GET']:
318 if request.method not in ['POST', 'GET']:
312 log.debug('unsupported request method "%s"', request.method)
319 log.debug('unsupported request method "%s"', request.method)
313 raise JSONRPCError(
320 raise JSONRPCError(
314 'unsupported request method "%s". Please use POST' % request.method)
321 'unsupported request method "%s". Please use POST' % request.method)
315
322
316 if 'CONTENT_LENGTH' not in request.environ:
323 if 'CONTENT_LENGTH' not in request.environ:
317 log.debug("No Content-Length")
324 log.debug("No Content-Length")
318 raise JSONRPCError("Empty body, No Content-Length in request")
325 raise JSONRPCError("Empty body, No Content-Length in request")
319
326
320 else:
327 else:
321 length = request.environ['CONTENT_LENGTH']
328 length = request.environ['CONTENT_LENGTH']
322 log.debug('Content-Length: %s', length)
329 log.debug('Content-Length: %s', length)
323
330
324 if length == 0:
331 if length == 0:
325 log.debug("Content-Length is 0")
332 log.debug("Content-Length is 0")
326 raise JSONRPCError("Content-Length is 0")
333 raise JSONRPCError("Content-Length is 0")
327
334
328 raw_body = request.body
335 raw_body = request.body
329 try:
336 try:
330 json_body = json.loads(raw_body)
337 json_body = json.loads(raw_body)
331 except ValueError as e:
338 except ValueError as e:
332 # catch JSON errors Here
339 # catch JSON errors Here
333 raise JSONRPCError("JSON parse error ERR:%s RAW:%r" % (e, raw_body))
340 raise JSONRPCError("JSON parse error ERR:%s RAW:%r" % (e, raw_body))
334
341
335 request.rpc_id = json_body.get('id')
342 request.rpc_id = json_body.get('id')
336 request.rpc_method = json_body.get('method')
343 request.rpc_method = json_body.get('method')
337
344
338 # check required base parameters
345 # check required base parameters
339 try:
346 try:
340 api_key = json_body.get('api_key')
347 api_key = json_body.get('api_key')
341 if not api_key:
348 if not api_key:
342 api_key = json_body.get('auth_token')
349 api_key = json_body.get('auth_token')
343
350
344 if not api_key:
351 if not api_key:
345 raise KeyError('api_key or auth_token')
352 raise KeyError('api_key or auth_token')
346
353
347 # TODO(marcink): support passing in token in request header
354 # TODO(marcink): support passing in token in request header
348
355
349 request.rpc_api_key = api_key
356 request.rpc_api_key = api_key
350 request.rpc_id = json_body['id']
357 request.rpc_id = json_body['id']
351 request.rpc_method = json_body['method']
358 request.rpc_method = json_body['method']
352 request.rpc_params = json_body['args'] \
359 request.rpc_params = json_body['args'] \
353 if isinstance(json_body['args'], dict) else {}
360 if isinstance(json_body['args'], dict) else {}
354
361
355 log.debug('method: %s, params: %s', request.rpc_method, request.rpc_params)
362 log.debug('method: %s, params: %s', request.rpc_method, request.rpc_params)
356 except KeyError as e:
363 except KeyError as e:
357 raise JSONRPCError('Incorrect JSON data. Missing %s' % e)
364 raise JSONRPCError('Incorrect JSON data. Missing %s' % e)
358
365
359 log.debug('setup complete, now handling method:%s rpcid:%s',
366 log.debug('setup complete, now handling method:%s rpcid:%s',
360 request.rpc_method, request.rpc_id, )
367 request.rpc_method, request.rpc_id, )
361
368
362
369
363 class RoutePredicate(object):
370 class RoutePredicate(object):
364 def __init__(self, val, config):
371 def __init__(self, val, config):
365 self.val = val
372 self.val = val
366
373
367 def text(self):
374 def text(self):
368 return 'jsonrpc route = %s' % self.val
375 return 'jsonrpc route = %s' % self.val
369
376
370 phash = text
377 phash = text
371
378
372 def __call__(self, info, request):
379 def __call__(self, info, request):
373 if self.val:
380 if self.val:
374 # potentially setup and bootstrap our call
381 # potentially setup and bootstrap our call
375 setup_request(request)
382 setup_request(request)
376
383
377 # Always return True so that even if it isn't a valid RPC it
384 # Always return True so that even if it isn't a valid RPC it
378 # will fall through to the underlaying handlers like notfound_view
385 # will fall through to the underlaying handlers like notfound_view
379 return True
386 return True
380
387
381
388
382 class NotFoundPredicate(object):
389 class NotFoundPredicate(object):
383 def __init__(self, val, config):
390 def __init__(self, val, config):
384 self.val = val
391 self.val = val
385 self.methods = config.registry.jsonrpc_methods
392 self.methods = config.registry.jsonrpc_methods
386
393
387 def text(self):
394 def text(self):
388 return 'jsonrpc method not found = {}.'.format(self.val)
395 return 'jsonrpc method not found = {}.'.format(self.val)
389
396
390 phash = text
397 phash = text
391
398
392 def __call__(self, info, request):
399 def __call__(self, info, request):
393 return hasattr(request, 'rpc_method')
400 return hasattr(request, 'rpc_method')
394
401
395
402
396 class MethodPredicate(object):
403 class MethodPredicate(object):
397 def __init__(self, val, config):
404 def __init__(self, val, config):
398 self.method = val
405 self.method = val
399
406
400 def text(self):
407 def text(self):
401 return 'jsonrpc method = %s' % self.method
408 return 'jsonrpc method = %s' % self.method
402
409
403 phash = text
410 phash = text
404
411
405 def __call__(self, context, request):
412 def __call__(self, context, request):
406 # we need to explicitly return False here, so pyramid doesn't try to
413 # we need to explicitly return False here, so pyramid doesn't try to
407 # execute our view directly. We need our main handler to execute things
414 # execute our view directly. We need our main handler to execute things
408 return getattr(request, 'rpc_method') == self.method
415 return getattr(request, 'rpc_method') == self.method
409
416
410
417
411 def add_jsonrpc_method(config, view, **kwargs):
418 def add_jsonrpc_method(config, view, **kwargs):
412 # pop the method name
419 # pop the method name
413 method = kwargs.pop('method', None)
420 method = kwargs.pop('method', None)
414
421
415 if method is None:
422 if method is None:
416 raise ConfigurationError(
423 raise ConfigurationError(
417 'Cannot register a JSON-RPC method without specifying the '
424 'Cannot register a JSON-RPC method without specifying the "method"')
418 '"method"')
419
425
420 # we define custom predicate, to enable to detect conflicting methods,
426 # we define custom predicate, to enable to detect conflicting methods,
421 # those predicates are kind of "translation" from the decorator variables
427 # those predicates are kind of "translation" from the decorator variables
422 # to internal predicates names
428 # to internal predicates names
423
429
424 kwargs['jsonrpc_method'] = method
430 kwargs['jsonrpc_method'] = method
425
431
426 # register our view into global view store for validation
432 # register our view into global view store for validation
427 config.registry.jsonrpc_methods[method] = view
433 config.registry.jsonrpc_methods[method] = view
428
434
429 # we're using our main request_view handler, here, so each method
435 # we're using our main request_view handler, here, so each method
430 # has a unified handler for itself
436 # has a unified handler for itself
431 config.add_view(request_view, route_name='apiv2', **kwargs)
437 config.add_view(request_view, route_name='apiv2', **kwargs)
432
438
433
439
434 class jsonrpc_method(object):
440 class jsonrpc_method(object):
435 """
441 """
436 decorator that works similar to @add_view_config decorator,
442 decorator that works similar to @add_view_config decorator,
437 but tailored for our JSON RPC
443 but tailored for our JSON RPC
438 """
444 """
439
445
440 venusian = venusian # for testing injection
446 venusian = venusian # for testing injection
441
447
442 def __init__(self, method=None, **kwargs):
448 def __init__(self, method=None, **kwargs):
443 self.method = method
449 self.method = method
444 self.kwargs = kwargs
450 self.kwargs = kwargs
445
451
446 def __call__(self, wrapped):
452 def __call__(self, wrapped):
447 kwargs = self.kwargs.copy()
453 kwargs = self.kwargs.copy()
448 kwargs['method'] = self.method or wrapped.__name__
454 kwargs['method'] = self.method or wrapped.__name__
449 depth = kwargs.pop('_depth', 0)
455 depth = kwargs.pop('_depth', 0)
450
456
451 def callback(context, name, ob):
457 def callback(context, name, ob):
452 config = context.config.with_package(info.module)
458 config = context.config.with_package(info.module)
453 config.add_jsonrpc_method(view=ob, **kwargs)
459 config.add_jsonrpc_method(view=ob, **kwargs)
454
460
455 info = venusian.attach(wrapped, callback, category='pyramid',
461 info = venusian.attach(wrapped, callback, category='pyramid',
456 depth=depth + 1)
462 depth=depth + 1)
457 if info.scope == 'class':
463 if info.scope == 'class':
458 # ensure that attr is set if decorating a class method
464 # ensure that attr is set if decorating a class method
459 kwargs.setdefault('attr', wrapped.__name__)
465 kwargs.setdefault('attr', wrapped.__name__)
460
466
461 kwargs['_info'] = info.codeinfo # fbo action_method
467 kwargs['_info'] = info.codeinfo # fbo action_method
462 return wrapped
468 return wrapped
463
469
464
470
465 class jsonrpc_deprecated_method(object):
471 class jsonrpc_deprecated_method(object):
466 """
472 """
467 Marks method as deprecated, adds log.warning, and inject special key to
473 Marks method as deprecated, adds log.warning, and inject special key to
468 the request variable to mark method as deprecated.
474 the request variable to mark method as deprecated.
469 Also injects special docstring that extract_docs will catch to mark
475 Also injects special docstring that extract_docs will catch to mark
470 method as deprecated.
476 method as deprecated.
471
477
472 :param use_method: specify which method should be used instead of
478 :param use_method: specify which method should be used instead of
473 the decorated one
479 the decorated one
474
480
475 Use like::
481 Use like::
476
482
477 @jsonrpc_method()
483 @jsonrpc_method()
478 @jsonrpc_deprecated_method(use_method='new_func', deprecated_at_version='3.0.0')
484 @jsonrpc_deprecated_method(use_method='new_func', deprecated_at_version='3.0.0')
479 def old_func(request, apiuser, arg1, arg2):
485 def old_func(request, apiuser, arg1, arg2):
480 ...
486 ...
481 """
487 """
482
488
483 def __init__(self, use_method, deprecated_at_version):
489 def __init__(self, use_method, deprecated_at_version):
484 self.use_method = use_method
490 self.use_method = use_method
485 self.deprecated_at_version = deprecated_at_version
491 self.deprecated_at_version = deprecated_at_version
486 self.deprecated_msg = ''
492 self.deprecated_msg = ''
487
493
488 def __call__(self, func):
494 def __call__(self, func):
489 self.deprecated_msg = 'Please use method `{method}` instead.'.format(
495 self.deprecated_msg = 'Please use method `{method}` instead.'.format(
490 method=self.use_method)
496 method=self.use_method)
491
497
492 docstring = """\n
498 docstring = """\n
493 .. deprecated:: {version}
499 .. deprecated:: {version}
494
500
495 {deprecation_message}
501 {deprecation_message}
496
502
497 {original_docstring}
503 {original_docstring}
498 """
504 """
499 func.__doc__ = docstring.format(
505 func.__doc__ = docstring.format(
500 version=self.deprecated_at_version,
506 version=self.deprecated_at_version,
501 deprecation_message=self.deprecated_msg,
507 deprecation_message=self.deprecated_msg,
502 original_docstring=func.__doc__)
508 original_docstring=func.__doc__)
503 return decorator.decorator(self.__wrapper, func)
509 return decorator.decorator(self.__wrapper, func)
504
510
505 def __wrapper(self, func, *fargs, **fkwargs):
511 def __wrapper(self, func, *fargs, **fkwargs):
506 log.warning('DEPRECATED API CALL on function %s, please '
512 log.warning('DEPRECATED API CALL on function %s, please '
507 'use `%s` instead', func, self.use_method)
513 'use `%s` instead', func, self.use_method)
508 # alter function docstring to mark as deprecated, this is picked up
514 # alter function docstring to mark as deprecated, this is picked up
509 # via fabric file that generates API DOC.
515 # via fabric file that generates API DOC.
510 result = func(*fargs, **fkwargs)
516 result = func(*fargs, **fkwargs)
511
517
512 request = fargs[0]
518 request = fargs[0]
513 request.rpc_deprecation = 'DEPRECATED METHOD ' + self.deprecated_msg
519 request.rpc_deprecation = 'DEPRECATED METHOD ' + self.deprecated_msg
514 return result
520 return result
515
521
516
522
517 def includeme(config):
523 def includeme(config):
518 plugin_module = 'rhodecode.api'
524 plugin_module = 'rhodecode.api'
519 plugin_settings = get_plugin_settings(
525 plugin_settings = get_plugin_settings(
520 plugin_module, config.registry.settings)
526 plugin_module, config.registry.settings)
521
527
522 if not hasattr(config.registry, 'jsonrpc_methods'):
528 if not hasattr(config.registry, 'jsonrpc_methods'):
523 config.registry.jsonrpc_methods = OrderedDict()
529 config.registry.jsonrpc_methods = OrderedDict()
524
530
525 # match filter by given method only
531 # match filter by given method only
526 config.add_view_predicate('jsonrpc_method', MethodPredicate)
532 config.add_view_predicate('jsonrpc_method', MethodPredicate)
533 config.add_view_predicate('jsonrpc_method_not_found', NotFoundPredicate)
527
534
528 config.add_renderer(DEFAULT_RENDERER, ExtJsonRenderer(
535 config.add_renderer(DEFAULT_RENDERER, ExtJsonRenderer(
529 serializer=json.dumps, indent=4))
536 serializer=json.dumps, indent=4))
530 config.add_directive('add_jsonrpc_method', add_jsonrpc_method)
537 config.add_directive('add_jsonrpc_method', add_jsonrpc_method)
531
538
532 config.add_route_predicate(
539 config.add_route_predicate(
533 'jsonrpc_call', RoutePredicate)
540 'jsonrpc_call', RoutePredicate)
534
541
535 config.add_route(
542 config.add_route(
536 'apiv2', plugin_settings.get('url', DEFAULT_URL), jsonrpc_call=True)
543 'apiv2', plugin_settings.get('url', DEFAULT_URL), jsonrpc_call=True)
537
544
538 config.scan(plugin_module, ignore='rhodecode.api.tests')
545 config.scan(plugin_module, ignore='rhodecode.api.tests')
539 # register some exception handling view
546 # register some exception handling view
540 config.add_view(exception_view, context=JSONRPCBaseError)
547 config.add_view(exception_view, context=JSONRPCBaseError)
541 config.add_view_predicate('jsonrpc_method_not_found', NotFoundPredicate)
542 config.add_notfound_view(exception_view, jsonrpc_method_not_found=True)
548 config.add_notfound_view(exception_view, jsonrpc_method_not_found=True)
@@ -1,302 +1,302 b''
1 # -*- coding: utf-8 -*-
1 # -*- coding: utf-8 -*-
2
2
3 # Copyright (C) 2010-2018 RhodeCode GmbH
3 # Copyright (C) 2010-2018 RhodeCode GmbH
4 #
4 #
5 # This program is free software: you can redistribute it and/or modify
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU Affero General Public License, version 3
6 # it under the terms of the GNU Affero General Public License, version 3
7 # (only), as published by the Free Software Foundation.
7 # (only), as published by the Free Software Foundation.
8 #
8 #
9 # This program is distributed in the hope that it will be useful,
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
12 # GNU General Public License for more details.
13 #
13 #
14 # You should have received a copy of the GNU Affero General Public License
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
16 #
16 #
17 # This program is dual-licensed. If you wish to learn more about the
17 # This program is dual-licensed. If you wish to learn more about the
18 # RhodeCode Enterprise Edition, including its added features, Support services,
18 # RhodeCode Enterprise Edition, including its added features, Support services,
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
19 # and proprietary license terms, please see https://rhodecode.com/licenses/
20 """
20 """
21 Celery loader, run with::
21 Celery loader, run with::
22
22
23 celery worker \
23 celery worker \
24 --beat \
24 --beat \
25 --app rhodecode.lib.celerylib.loader \
25 --app rhodecode.lib.celerylib.loader \
26 --scheduler rhodecode.lib.celerylib.scheduler.RcScheduler \
26 --scheduler rhodecode.lib.celerylib.scheduler.RcScheduler \
27 --loglevel DEBUG --ini=._dev/dev.ini
27 --loglevel DEBUG --ini=._dev/dev.ini
28 """
28 """
29 import os
29 import os
30 import logging
30 import logging
31 import importlib
31 import importlib
32
32
33 from celery import Celery
33 from celery import Celery
34 from celery import signals
34 from celery import signals
35 from celery import Task
35 from celery import Task
36 from celery import exceptions # pragma: no cover
36 from celery import exceptions # pragma: no cover
37 from kombu.serialization import register
37 from kombu.serialization import register
38 from pyramid.threadlocal import get_current_request
38 from pyramid.threadlocal import get_current_request
39
39
40 import rhodecode
40 import rhodecode
41
41
42 from rhodecode.lib.auth import AuthUser
42 from rhodecode.lib.auth import AuthUser
43 from rhodecode.lib.celerylib.utils import get_ini_config, parse_ini_vars
43 from rhodecode.lib.celerylib.utils import get_ini_config, parse_ini_vars
44 from rhodecode.lib.ext_json import json
44 from rhodecode.lib.ext_json import json
45 from rhodecode.lib.pyramid_utils import bootstrap, setup_logging, prepare_request
45 from rhodecode.lib.pyramid_utils import bootstrap, setup_logging, prepare_request
46 from rhodecode.lib.utils2 import str2bool
46 from rhodecode.lib.utils2 import str2bool
47 from rhodecode.model import meta
47 from rhodecode.model import meta
48
48
49
49
50 register('json_ext', json.dumps, json.loads,
50 register('json_ext', json.dumps, json.loads,
51 content_type='application/x-json-ext',
51 content_type='application/x-json-ext',
52 content_encoding='utf-8')
52 content_encoding='utf-8')
53
53
54 log = logging.getLogger('celery.rhodecode.loader')
54 log = logging.getLogger('celery.rhodecode.loader')
55
55
56
56
57 def add_preload_arguments(parser):
57 def add_preload_arguments(parser):
58 parser.add_argument(
58 parser.add_argument(
59 '--ini', default=None,
59 '--ini', default=None,
60 help='Path to ini configuration file.'
60 help='Path to ini configuration file.'
61 )
61 )
62 parser.add_argument(
62 parser.add_argument(
63 '--ini-var', default=None,
63 '--ini-var', default=None,
64 help='Comma separated list of key=value to pass to ini.'
64 help='Comma separated list of key=value to pass to ini.'
65 )
65 )
66
66
67
67
68 def get_logger(obj):
68 def get_logger(obj):
69 custom_log = logging.getLogger(
69 custom_log = logging.getLogger(
70 'rhodecode.task.{}'.format(obj.__class__.__name__))
70 'rhodecode.task.{}'.format(obj.__class__.__name__))
71
71
72 if rhodecode.CELERY_ENABLED:
72 if rhodecode.CELERY_ENABLED:
73 try:
73 try:
74 custom_log = obj.get_logger()
74 custom_log = obj.get_logger()
75 except Exception:
75 except Exception:
76 pass
76 pass
77
77
78 return custom_log
78 return custom_log
79
79
80
80
81 imports = ['rhodecode.lib.celerylib.tasks']
81 imports = ['rhodecode.lib.celerylib.tasks']
82
82
83 try:
83 try:
84 # try if we have EE tasks available
84 # try if we have EE tasks available
85 importlib.import_module('rc_ee')
85 importlib.import_module('rc_ee')
86 imports.append('rc_ee.lib.celerylib.tasks')
86 imports.append('rc_ee.lib.celerylib.tasks')
87 except ImportError:
87 except ImportError:
88 pass
88 pass
89
89
90
90
91 base_celery_config = {
91 base_celery_config = {
92 'result_backend': 'rpc://',
92 'result_backend': 'rpc://',
93 'result_expires': 60 * 60 * 24,
93 'result_expires': 60 * 60 * 24,
94 'result_persistent': True,
94 'result_persistent': True,
95 'imports': imports,
95 'imports': imports,
96 'worker_max_tasks_per_child': 100,
96 'worker_max_tasks_per_child': 100,
97 'accept_content': ['json_ext'],
97 'accept_content': ['json_ext'],
98 'task_serializer': 'json_ext',
98 'task_serializer': 'json_ext',
99 'result_serializer': 'json_ext',
99 'result_serializer': 'json_ext',
100 'worker_hijack_root_logger': False,
100 'worker_hijack_root_logger': False,
101 'database_table_names': {
101 'database_table_names': {
102 'task': 'beat_taskmeta',
102 'task': 'beat_taskmeta',
103 'group': 'beat_groupmeta',
103 'group': 'beat_groupmeta',
104 }
104 }
105 }
105 }
106 # init main celery app
106 # init main celery app
107 celery_app = Celery()
107 celery_app = Celery()
108 celery_app.user_options['preload'].add(add_preload_arguments)
108 celery_app.user_options['preload'].add(add_preload_arguments)
109 ini_file_glob = None
109 ini_file_glob = None
110
110
111
111
112 @signals.setup_logging.connect
112 @signals.setup_logging.connect
113 def setup_logging_callback(**kwargs):
113 def setup_logging_callback(**kwargs):
114 setup_logging(ini_file_glob)
114 setup_logging(ini_file_glob)
115
115
116
116
117 @signals.user_preload_options.connect
117 @signals.user_preload_options.connect
118 def on_preload_parsed(options, **kwargs):
118 def on_preload_parsed(options, **kwargs):
119 ini_location = options['ini']
119 ini_location = options['ini']
120 ini_vars = options['ini_var']
120 ini_vars = options['ini_var']
121 celery_app.conf['INI_PYRAMID'] = options['ini']
121 celery_app.conf['INI_PYRAMID'] = options['ini']
122
122
123 if ini_location is None:
123 if ini_location is None:
124 print('You must provide the paste --ini argument')
124 print('You must provide the paste --ini argument')
125 exit(-1)
125 exit(-1)
126
126
127 options = None
127 options = None
128 if ini_vars is not None:
128 if ini_vars is not None:
129 options = parse_ini_vars(ini_vars)
129 options = parse_ini_vars(ini_vars)
130
130
131 global ini_file_glob
131 global ini_file_glob
132 ini_file_glob = ini_location
132 ini_file_glob = ini_location
133
133
134 log.debug('Bootstrapping RhodeCode application...')
134 log.debug('Bootstrapping RhodeCode application...')
135 env = bootstrap(ini_location, options=options)
135 env = bootstrap(ini_location, options=options)
136
136
137 setup_celery_app(
137 setup_celery_app(
138 app=env['app'], root=env['root'], request=env['request'],
138 app=env['app'], root=env['root'], request=env['request'],
139 registry=env['registry'], closer=env['closer'],
139 registry=env['registry'], closer=env['closer'],
140 ini_location=ini_location)
140 ini_location=ini_location)
141
141
142 # fix the global flag even if it's disabled via .ini file because this
142 # fix the global flag even if it's disabled via .ini file because this
143 # is a worker code that doesn't need this to be disabled.
143 # is a worker code that doesn't need this to be disabled.
144 rhodecode.CELERY_ENABLED = True
144 rhodecode.CELERY_ENABLED = True
145
145
146
146
147 @signals.task_success.connect
147 @signals.task_success.connect
148 def task_success_signal(result, **kwargs):
148 def task_success_signal(result, **kwargs):
149 meta.Session.commit()
149 meta.Session.commit()
150 closer = celery_app.conf['PYRAMID_CLOSER']
150 closer = celery_app.conf['PYRAMID_CLOSER']
151 if closer:
151 if closer:
152 closer()
152 closer()
153
153
154
154
155 @signals.task_retry.connect
155 @signals.task_retry.connect
156 def task_retry_signal(
156 def task_retry_signal(
157 request, reason, einfo, **kwargs):
157 request, reason, einfo, **kwargs):
158 meta.Session.remove()
158 meta.Session.remove()
159 closer = celery_app.conf['PYRAMID_CLOSER']
159 closer = celery_app.conf['PYRAMID_CLOSER']
160 if closer:
160 if closer:
161 closer()
161 closer()
162
162
163
163
164 @signals.task_failure.connect
164 @signals.task_failure.connect
165 def task_failure_signal(
165 def task_failure_signal(
166 task_id, exception, args, kwargs, traceback, einfo, **kargs):
166 task_id, exception, args, kwargs, traceback, einfo, **kargs):
167 from rhodecode.lib.exc_tracking import store_exception
167 from rhodecode.lib.exc_tracking import store_exception
168
168
169 meta.Session.remove()
169 meta.Session.remove()
170
170
171 # simulate sys.exc_info()
171 # simulate sys.exc_info()
172 exc_info = (einfo.type, einfo.exception, einfo.tb)
172 exc_info = (einfo.type, einfo.exception, einfo.tb)
173 store_exception(id(exc_info), exc_info, prefix='celery_rhodecode')
173 store_exception(id(exc_info), exc_info, prefix='rhodecode-celery')
174
174
175 closer = celery_app.conf['PYRAMID_CLOSER']
175 closer = celery_app.conf['PYRAMID_CLOSER']
176 if closer:
176 if closer:
177 closer()
177 closer()
178
178
179
179
180 @signals.task_revoked.connect
180 @signals.task_revoked.connect
181 def task_revoked_signal(
181 def task_revoked_signal(
182 request, terminated, signum, expired, **kwargs):
182 request, terminated, signum, expired, **kwargs):
183 closer = celery_app.conf['PYRAMID_CLOSER']
183 closer = celery_app.conf['PYRAMID_CLOSER']
184 if closer:
184 if closer:
185 closer()
185 closer()
186
186
187
187
188 def setup_celery_app(app, root, request, registry, closer, ini_location):
188 def setup_celery_app(app, root, request, registry, closer, ini_location):
189 ini_dir = os.path.dirname(os.path.abspath(ini_location))
189 ini_dir = os.path.dirname(os.path.abspath(ini_location))
190 celery_config = base_celery_config
190 celery_config = base_celery_config
191 celery_config.update({
191 celery_config.update({
192 # store celerybeat scheduler db where the .ini file is
192 # store celerybeat scheduler db where the .ini file is
193 'beat_schedule_filename': os.path.join(ini_dir, 'celerybeat-schedule'),
193 'beat_schedule_filename': os.path.join(ini_dir, 'celerybeat-schedule'),
194 })
194 })
195 ini_settings = get_ini_config(ini_location)
195 ini_settings = get_ini_config(ini_location)
196 log.debug('Got custom celery conf: %s', ini_settings)
196 log.debug('Got custom celery conf: %s', ini_settings)
197
197
198 celery_config.update(ini_settings)
198 celery_config.update(ini_settings)
199 celery_app.config_from_object(celery_config)
199 celery_app.config_from_object(celery_config)
200
200
201 celery_app.conf.update({'PYRAMID_APP': app})
201 celery_app.conf.update({'PYRAMID_APP': app})
202 celery_app.conf.update({'PYRAMID_ROOT': root})
202 celery_app.conf.update({'PYRAMID_ROOT': root})
203 celery_app.conf.update({'PYRAMID_REQUEST': request})
203 celery_app.conf.update({'PYRAMID_REQUEST': request})
204 celery_app.conf.update({'PYRAMID_REGISTRY': registry})
204 celery_app.conf.update({'PYRAMID_REGISTRY': registry})
205 celery_app.conf.update({'PYRAMID_CLOSER': closer})
205 celery_app.conf.update({'PYRAMID_CLOSER': closer})
206
206
207
207
208 def configure_celery(config, ini_location):
208 def configure_celery(config, ini_location):
209 """
209 """
210 Helper that is called from our application creation logic. It gives
210 Helper that is called from our application creation logic. It gives
211 connection info into running webapp and allows execution of tasks from
211 connection info into running webapp and allows execution of tasks from
212 RhodeCode itself
212 RhodeCode itself
213 """
213 """
214 # store some globals into rhodecode
214 # store some globals into rhodecode
215 rhodecode.CELERY_ENABLED = str2bool(
215 rhodecode.CELERY_ENABLED = str2bool(
216 config.registry.settings.get('use_celery'))
216 config.registry.settings.get('use_celery'))
217 if rhodecode.CELERY_ENABLED:
217 if rhodecode.CELERY_ENABLED:
218 log.info('Configuring celery based on `%s` file', ini_location)
218 log.info('Configuring celery based on `%s` file', ini_location)
219 setup_celery_app(
219 setup_celery_app(
220 app=None, root=None, request=None, registry=config.registry,
220 app=None, root=None, request=None, registry=config.registry,
221 closer=None, ini_location=ini_location)
221 closer=None, ini_location=ini_location)
222
222
223
223
224 def maybe_prepare_env(req):
224 def maybe_prepare_env(req):
225 environ = {}
225 environ = {}
226 try:
226 try:
227 environ.update({
227 environ.update({
228 'PATH_INFO': req.environ['PATH_INFO'],
228 'PATH_INFO': req.environ['PATH_INFO'],
229 'SCRIPT_NAME': req.environ['SCRIPT_NAME'],
229 'SCRIPT_NAME': req.environ['SCRIPT_NAME'],
230 'HTTP_HOST':
230 'HTTP_HOST':
231 req.environ.get('HTTP_HOST', req.environ['SERVER_NAME']),
231 req.environ.get('HTTP_HOST', req.environ['SERVER_NAME']),
232 'SERVER_NAME': req.environ['SERVER_NAME'],
232 'SERVER_NAME': req.environ['SERVER_NAME'],
233 'SERVER_PORT': req.environ['SERVER_PORT'],
233 'SERVER_PORT': req.environ['SERVER_PORT'],
234 'wsgi.url_scheme': req.environ['wsgi.url_scheme'],
234 'wsgi.url_scheme': req.environ['wsgi.url_scheme'],
235 })
235 })
236 except Exception:
236 except Exception:
237 pass
237 pass
238
238
239 return environ
239 return environ
240
240
241
241
242 class RequestContextTask(Task):
242 class RequestContextTask(Task):
243 """
243 """
244 This is a celery task which will create a rhodecode app instance context
244 This is a celery task which will create a rhodecode app instance context
245 for the task, patch pyramid with the original request
245 for the task, patch pyramid with the original request
246 that created the task and also add the user to the context.
246 that created the task and also add the user to the context.
247 """
247 """
248
248
249 def apply_async(self, args=None, kwargs=None, task_id=None, producer=None,
249 def apply_async(self, args=None, kwargs=None, task_id=None, producer=None,
250 link=None, link_error=None, shadow=None, **options):
250 link=None, link_error=None, shadow=None, **options):
251 """ queue the job to run (we are in web request context here) """
251 """ queue the job to run (we are in web request context here) """
252
252
253 req = get_current_request()
253 req = get_current_request()
254
254
255 # web case
255 # web case
256 if hasattr(req, 'user'):
256 if hasattr(req, 'user'):
257 ip_addr = req.user.ip_addr
257 ip_addr = req.user.ip_addr
258 user_id = req.user.user_id
258 user_id = req.user.user_id
259
259
260 # api case
260 # api case
261 elif hasattr(req, 'rpc_user'):
261 elif hasattr(req, 'rpc_user'):
262 ip_addr = req.rpc_user.ip_addr
262 ip_addr = req.rpc_user.ip_addr
263 user_id = req.rpc_user.user_id
263 user_id = req.rpc_user.user_id
264 else:
264 else:
265 raise Exception(
265 raise Exception(
266 'Unable to fetch required data from request: {}. \n'
266 'Unable to fetch required data from request: {}. \n'
267 'This task is required to be executed from context of '
267 'This task is required to be executed from context of '
268 'request in a webapp'.format(repr(req)))
268 'request in a webapp'.format(repr(req)))
269
269
270 if req:
270 if req:
271 # we hook into kwargs since it is the only way to pass our data to
271 # we hook into kwargs since it is the only way to pass our data to
272 # the celery worker
272 # the celery worker
273 environ = maybe_prepare_env(req)
273 environ = maybe_prepare_env(req)
274 options['headers'] = options.get('headers', {})
274 options['headers'] = options.get('headers', {})
275 options['headers'].update({
275 options['headers'].update({
276 'rhodecode_proxy_data': {
276 'rhodecode_proxy_data': {
277 'environ': environ,
277 'environ': environ,
278 'auth_user': {
278 'auth_user': {
279 'ip_addr': ip_addr,
279 'ip_addr': ip_addr,
280 'user_id': user_id
280 'user_id': user_id
281 },
281 },
282 }
282 }
283 })
283 })
284
284
285 return super(RequestContextTask, self).apply_async(
285 return super(RequestContextTask, self).apply_async(
286 args, kwargs, task_id, producer, link, link_error, shadow, **options)
286 args, kwargs, task_id, producer, link, link_error, shadow, **options)
287
287
288 def __call__(self, *args, **kwargs):
288 def __call__(self, *args, **kwargs):
289 """ rebuild the context and then run task on celery worker """
289 """ rebuild the context and then run task on celery worker """
290
290
291 proxy_data = getattr(self.request, 'rhodecode_proxy_data', None)
291 proxy_data = getattr(self.request, 'rhodecode_proxy_data', None)
292 if not proxy_data:
292 if not proxy_data:
293 return super(RequestContextTask, self).__call__(*args, **kwargs)
293 return super(RequestContextTask, self).__call__(*args, **kwargs)
294
294
295 log.debug('using celery proxy data to run task: %r', proxy_data)
295 log.debug('using celery proxy data to run task: %r', proxy_data)
296 # re-inject and register threadlocals for proper routing support
296 # re-inject and register threadlocals for proper routing support
297 request = prepare_request(proxy_data['environ'])
297 request = prepare_request(proxy_data['environ'])
298 request.user = AuthUser(user_id=proxy_data['auth_user']['user_id'],
298 request.user = AuthUser(user_id=proxy_data['auth_user']['user_id'],
299 ip_addr=proxy_data['auth_user']['ip_addr'])
299 ip_addr=proxy_data['auth_user']['ip_addr'])
300
300
301 return super(RequestContextTask, self).__call__(*args, **kwargs)
301 return super(RequestContextTask, self).__call__(*args, **kwargs)
302
302
General Comments 0
You need to be logged in to leave comments. Login now