##// END OF EJS Templates
celery: update how reqquest object is passed arround....
super-admin -
r4878:c5087cb0 default
parent child Browse files
Show More
@@ -325,6 +325,5 b' class EmailEventHandler(object):'
325
325
326 recipients = self.integration_settings['recipients']
326 recipients = self.integration_settings['recipients']
327 for email_address in recipients:
327 for email_address in recipients:
328 run_task(
328 run_task(tasks.send_email, email_address, subject,
329 tasks.send_email, email_address, subject,
329 email_body_plaintext, email_body_html)
330 email_body_plaintext, email_body_html)
@@ -38,11 +38,9 b' from celery import signals'
38 from celery import Task
38 from celery import Task
39 from celery import exceptions # pragma: no cover
39 from celery import exceptions # pragma: no cover
40 from kombu.serialization import register
40 from kombu.serialization import register
41 from pyramid.threadlocal import get_current_request
42
41
43 import rhodecode
42 import rhodecode
44
43
45 from rhodecode.lib.auth import AuthUser
46 from rhodecode.lib.celerylib.utils import parse_ini_vars, ping_db
44 from rhodecode.lib.celerylib.utils import parse_ini_vars, ping_db
47 from rhodecode.lib.ext_json import json
45 from rhodecode.lib.ext_json import json
48 from rhodecode.lib.pyramid_utils import bootstrap, setup_logging
46 from rhodecode.lib.pyramid_utils import bootstrap, setup_logging
@@ -212,6 +210,19 b' def task_revoked_signal('
212 closer()
210 closer()
213
211
214
212
213 class UNSET(object):
214 pass
215
216
217 def set_celery_conf(app=UNSET(), root=UNSET(), request=UNSET(), registry=UNSET(), closer=UNSET()):
218
219 if request is not UNSET:
220 celery_app.conf.update({'PYRAMID_REQUEST': request})
221
222 if registry is not UNSET:
223 celery_app.conf.update({'PYRAMID_REGISTRY': registry})
224
225
215 def setup_celery_app(app, root, request, registry, closer, celery_settings):
226 def setup_celery_app(app, root, request, registry, closer, celery_settings):
216 log.debug('Got custom celery conf: %s', celery_settings)
227 log.debug('Got custom celery conf: %s', celery_settings)
217 celery_config = base_celery_config
228 celery_config = base_celery_config
@@ -273,52 +284,38 b' class RequestContextTask(Task):'
273 def apply_async(self, args=None, kwargs=None, task_id=None, producer=None,
284 def apply_async(self, args=None, kwargs=None, task_id=None, producer=None,
274 link=None, link_error=None, shadow=None, **options):
285 link=None, link_error=None, shadow=None, **options):
275 """ queue the job to run (we are in web request context here) """
286 """ queue the job to run (we are in web request context here) """
287 from rhodecode.lib.base import get_ip_addr
276
288
277 req = self.app.conf['PYRAMID_REQUEST'] or get_current_request()
289 req = self.app.conf['PYRAMID_REQUEST']
290 if not req:
291 raise ValueError('celery_app.conf is having empty PYRAMID_REQUEST key')
278
292
279 log.debug('Running Task with class: %s. Request Class: %s',
293 log.debug('Running Task with class: %s. Request Class: %s',
280 self.__class__, req.__class__)
294 self.__class__, req.__class__)
281
295
282 user_id = None
296 user_id = 0
283 ip_addr = None
284
297
285 # web case
298 # web case
286 if hasattr(req, 'user'):
299 if hasattr(req, 'user'):
287 ip_addr = req.user.ip_addr
288 user_id = req.user.user_id
300 user_id = req.user.user_id
289
301
290 # api case
302 # api case
291 elif hasattr(req, 'rpc_user'):
303 elif hasattr(req, 'rpc_user'):
292 ip_addr = req.rpc_user.ip_addr
293 user_id = req.rpc_user.user_id
304 user_id = req.rpc_user.user_id
294 else:
295 if user_id and ip_addr:
296 log.debug('Using data from celery proxy user')
297
305
298 else:
306 # we hook into kwargs since it is the only way to pass our data to
299 raise Exception(
307 # the celery worker
300 'Unable to fetch required data from request: {}. \n'
308 environ = maybe_prepare_env(req)
301 'This task is required to be executed from context of '
309 options['headers'] = options.get('headers', {})
302 'request in a webapp. Task: {}'.format(
310 options['headers'].update({
303 repr(req),
311 'rhodecode_proxy_data': {
304 self.__class__
312 'environ': environ,
305 )
313 'auth_user': {
306 )
314 'ip_addr': get_ip_addr(req.environ),
307
315 'user_id': user_id
308 if req:
316 },
309 # we hook into kwargs since it is the only way to pass our data to
317 }
310 # the celery worker
318 })
311 environ = maybe_prepare_env(req)
312 options['headers'] = options.get('headers', {})
313 options['headers'].update({
314 'rhodecode_proxy_data': {
315 'environ': environ,
316 'auth_user': {
317 'ip_addr': ip_addr,
318 'user_id': user_id
319 },
320 }
321 })
322
319
323 return super(RequestContextTask, self).apply_async(
320 return super(RequestContextTask, self).apply_async(
324 args, kwargs, task_id, producer, link, link_error, shadow, **options)
321 args, kwargs, task_id, producer, link, link_error, shadow, **options)
@@ -57,8 +57,10 b' class HooksHttpHandler(BaseHTTPRequestHa'
57 'TXN ID fail: expected {} got {} instead'.format(
57 'TXN ID fail: expected {} got {} instead'.format(
58 txn_id, computed_txn_id))
58 txn_id, computed_txn_id))
59
59
60 request = getattr(self.server, 'request', None)
60 try:
61 try:
61 result = self._call_hook(method, extras)
62 hooks = Hooks(request=request, log_prefix='HOOKS: {} '.format(self.server.server_address))
63 result = self._call_hook_method(hooks, method, extras)
62 except Exception as e:
64 except Exception as e:
63 exc_tb = traceback.format_exc()
65 exc_tb = traceback.format_exc()
64 result = {
66 result = {
@@ -80,8 +82,7 b' class HooksHttpHandler(BaseHTTPRequestHa'
80 self.end_headers()
82 self.end_headers()
81 self.wfile.write(json.dumps(result))
83 self.wfile.write(json.dumps(result))
82
84
83 def _call_hook(self, method, extras):
85 def _call_hook_method(self, hooks, method, extras):
84 hooks = Hooks()
85 try:
86 try:
86 result = getattr(hooks, method)(extras)
87 result = getattr(hooks, method)(extras)
87 finally:
88 finally:
@@ -97,7 +98,7 b' class HooksHttpHandler(BaseHTTPRequestHa'
97 message = format % args
98 message = format % args
98
99
99 log.debug(
100 log.debug(
100 "%s - - [%s] %s", self.client_address[0],
101 "HOOKS: %s - - [%s] %s", self.client_address,
101 self.log_date_time_string(), message)
102 self.log_date_time_string(), message)
102
103
103
104
@@ -154,6 +155,10 b' class HttpHooksCallbackDaemon(ThreadedHo'
154 # request and wastes cpu at all other times.
155 # request and wastes cpu at all other times.
155 POLL_INTERVAL = 0.01
156 POLL_INTERVAL = 0.01
156
157
158 @property
159 def _hook_prefix(self):
160 return 'HOOKS: {} '.format(self.hooks_uri)
161
157 def get_hostname(self):
162 def get_hostname(self):
158 return socket.gethostname() or '127.0.0.1'
163 return socket.gethostname() or '127.0.0.1'
159
164
@@ -162,6 +167,8 b' class HttpHooksCallbackDaemon(ThreadedHo'
162 return _get_port(min_port, max_port)
167 return _get_port(min_port, max_port)
163
168
164 def _prepare(self, txn_id=None, host=None, port=None):
169 def _prepare(self, txn_id=None, host=None, port=None):
170 from pyramid.threadlocal import get_current_request
171
165 if not host or host == "*":
172 if not host or host == "*":
166 host = self.get_hostname()
173 host = self.get_hostname()
167 if not port:
174 if not port:
@@ -173,13 +180,16 b' class HttpHooksCallbackDaemon(ThreadedHo'
173 self._done = False
180 self._done = False
174
181
175 log.debug(
182 log.debug(
176 "Preparing HTTP callback daemon at `%s` and registering hook object: %s",
183 "%s Preparing HTTP callback daemon registering hook object: %s",
177 self.hooks_uri, HooksHttpHandler)
184 self._hook_prefix, HooksHttpHandler)
178
185
179 self._daemon = TCPServer(server_address, HooksHttpHandler)
186 self._daemon = TCPServer(server_address, HooksHttpHandler)
180 # inject transaction_id for later verification
187 # inject transaction_id for later verification
181 self._daemon.txn_id = self.txn_id
188 self._daemon.txn_id = self.txn_id
182
189
190 # pass the WEB app request into daemon
191 self._daemon.request = get_current_request()
192
183 def _run(self):
193 def _run(self):
184 log.debug("Running event loop of callback daemon in background thread")
194 log.debug("Running event loop of callback daemon in background thread")
185 callback_thread = threading.Thread(
195 callback_thread = threading.Thread(
@@ -269,40 +279,35 b' class Hooks(object):'
269 """
279 """
270 Exposes the hooks for remote call backs
280 Exposes the hooks for remote call backs
271 """
281 """
282 def __init__(self, request=None, log_prefix=''):
283 self.log_prefix = log_prefix
284 self.request = request
272
285
273 def repo_size(self, extras):
286 def repo_size(self, extras):
274 log.debug("Called repo_size of %s object", self)
287 log.debug("%sCalled repo_size of %s object", self.log_prefix, self)
275 return self._call_hook(hooks_base.repo_size, extras)
288 return self._call_hook(hooks_base.repo_size, extras)
276
289
277 def pre_pull(self, extras):
290 def pre_pull(self, extras):
278 log.debug("Called pre_pull of %s object", self)
291 log.debug("%sCalled pre_pull of %s object", self.log_prefix, self)
279 return self._call_hook(hooks_base.pre_pull, extras)
292 return self._call_hook(hooks_base.pre_pull, extras)
280
293
281 def post_pull(self, extras):
294 def post_pull(self, extras):
282 log.debug("Called post_pull of %s object", self)
295 log.debug("%sCalled post_pull of %s object", self.log_prefix, self)
283 return self._call_hook(hooks_base.post_pull, extras)
296 return self._call_hook(hooks_base.post_pull, extras)
284
297
285 def pre_push(self, extras):
298 def pre_push(self, extras):
286 log.debug("Called pre_push of %s object", self)
299 log.debug("%sCalled pre_push of %s object", self.log_prefix, self)
287 return self._call_hook(hooks_base.pre_push, extras)
300 return self._call_hook(hooks_base.pre_push, extras)
288
301
289 def post_push(self, extras):
302 def post_push(self, extras):
290 log.debug("Called post_push of %s object", self)
303 log.debug("%sCalled post_push of %s object", self.log_prefix, self)
291 return self._call_hook(hooks_base.post_push, extras)
304 return self._call_hook(hooks_base.post_push, extras)
292
305
293 def _call_hook(self, hook, extras):
306 def _call_hook(self, hook, extras):
294 extras = AttributeDict(extras)
307 extras = AttributeDict(extras)
295 server_url = extras['server_url']
308 server_url = extras['server_url']
296 request = bootstrap_request(application_url=server_url)
297
309
298 bootstrap_config(request) # inject routes and other interfaces
310 extras.request = self.request
299
300 # inject the user for usage in hooks
301 request.user = AttributeDict({'username': extras.username,
302 'ip_addr': extras.ip,
303 'user_id': extras.user_id})
304
305 extras.request = request
306
311
307 try:
312 try:
308 result = hook(extras)
313 result = hook(extras)
@@ -322,7 +327,7 b' class Hooks(object):'
322 exc_tb = ''
327 exc_tb = ''
323 if not isinstance(error, HTTPLockedRC):
328 if not isinstance(error, HTTPLockedRC):
324 exc_tb = traceback.format_exc()
329 exc_tb = traceback.format_exc()
325 log.exception('Exception when handling hook %s', hook)
330 log.exception('%sException when handling hook %s', self.log_prefix, hook)
326 error_args = error.args
331 error_args = error.args
327 return {
332 return {
328 'status': 128,
333 'status': 128,
@@ -334,7 +339,7 b' class Hooks(object):'
334 finally:
339 finally:
335 meta.Session.remove()
340 meta.Session.remove()
336
341
337 log.debug('Got hook call response %s', result)
342 log.debug('%sGot hook call response %s', self.log_prefix, result)
338 return {
343 return {
339 'status': result.status,
344 'status': result.status,
340 'output': result.output,
345 'output': result.output,
@@ -154,9 +154,8 b' class NotificationModel(BaseModel):'
154 extra_headers = {'thread_ids': email_kwargs.pop('thread_ids')}
154 extra_headers = {'thread_ids': email_kwargs.pop('thread_ids')}
155
155
156 log.debug('Creating notification email task for user:`%s`', recipient)
156 log.debug('Creating notification email task for user:`%s`', recipient)
157 task = run_task(
157 task = run_task(tasks.send_email, recipient.email, subject,
158 tasks.send_email, recipient.email, subject,
158 email_body_plaintext, email_body, extra_headers=extra_headers)
159 email_body_plaintext, email_body, extra_headers=extra_headers)
160 log.debug('Created email task: %s', task)
159 log.debug('Created email task: %s', task)
161
160
162 return notification
161 return notification
@@ -69,6 +69,12 b' def set_user_lang(event):'
69 event.request._LOCALE_ = user_lang
69 event.request._LOCALE_ = user_lang
70
70
71
71
72 def update_celery_conf(event):
73 from rhodecode.lib.celerylib.loader import set_celery_conf
74 log.debug('Setting celery config from new request')
75 set_celery_conf(request=event.request, registry=event.request.registry)
76
77
72 def add_request_user_context(event):
78 def add_request_user_context(event):
73 """
79 """
74 Adds auth user into request context
80 Adds auth user into request context
@@ -108,6 +108,8 b' def sanity_check_factory(handler, regist'
108 def includeme(config):
108 def includeme(config):
109 config.add_subscriber('rhodecode.subscribers.add_renderer_globals',
109 config.add_subscriber('rhodecode.subscribers.add_renderer_globals',
110 'pyramid.events.BeforeRender')
110 'pyramid.events.BeforeRender')
111 config.add_subscriber('rhodecode.subscribers.update_celery_conf',
112 'pyramid.events.NewRequest')
111 config.add_subscriber('rhodecode.subscribers.set_user_lang',
113 config.add_subscriber('rhodecode.subscribers.set_user_lang',
112 'pyramid.events.NewRequest')
114 'pyramid.events.NewRequest')
113 config.add_subscriber('rhodecode.subscribers.reset_log_bucket',
115 config.add_subscriber('rhodecode.subscribers.reset_log_bucket',
General Comments 0
You need to be logged in to leave comments. Login now