##// END OF EJS Templates
celery: update how reqquest object is passed arround....
super-admin -
r4878:c5087cb0 default
parent child Browse files
Show More
@@ -325,6 +325,5 b' class EmailEventHandler(object):'
325 325
326 326 recipients = self.integration_settings['recipients']
327 327 for email_address in recipients:
328 run_task(
329 tasks.send_email, email_address, subject,
328 run_task(tasks.send_email, email_address, subject,
330 329 email_body_plaintext, email_body_html)
@@ -38,11 +38,9 b' from celery import signals'
38 38 from celery import Task
39 39 from celery import exceptions # pragma: no cover
40 40 from kombu.serialization import register
41 from pyramid.threadlocal import get_current_request
42 41
43 42 import rhodecode
44 43
45 from rhodecode.lib.auth import AuthUser
46 44 from rhodecode.lib.celerylib.utils import parse_ini_vars, ping_db
47 45 from rhodecode.lib.ext_json import json
48 46 from rhodecode.lib.pyramid_utils import bootstrap, setup_logging
@@ -212,6 +210,19 b' def task_revoked_signal('
212 210 closer()
213 211
214 212
213 class UNSET(object):
214 pass
215
216
217 def set_celery_conf(app=UNSET(), root=UNSET(), request=UNSET(), registry=UNSET(), closer=UNSET()):
218
219 if request is not UNSET:
220 celery_app.conf.update({'PYRAMID_REQUEST': request})
221
222 if registry is not UNSET:
223 celery_app.conf.update({'PYRAMID_REGISTRY': registry})
224
225
215 226 def setup_celery_app(app, root, request, registry, closer, celery_settings):
216 227 log.debug('Got custom celery conf: %s', celery_settings)
217 228 celery_config = base_celery_config
@@ -273,39 +284,25 b' class RequestContextTask(Task):'
273 284 def apply_async(self, args=None, kwargs=None, task_id=None, producer=None,
274 285 link=None, link_error=None, shadow=None, **options):
275 286 """ queue the job to run (we are in web request context here) """
287 from rhodecode.lib.base import get_ip_addr
276 288
277 req = self.app.conf['PYRAMID_REQUEST'] or get_current_request()
289 req = self.app.conf['PYRAMID_REQUEST']
290 if not req:
291 raise ValueError('celery_app.conf is having empty PYRAMID_REQUEST key')
278 292
279 293 log.debug('Running Task with class: %s. Request Class: %s',
280 294 self.__class__, req.__class__)
281 295
282 user_id = None
283 ip_addr = None
296 user_id = 0
284 297
285 298 # web case
286 299 if hasattr(req, 'user'):
287 ip_addr = req.user.ip_addr
288 300 user_id = req.user.user_id
289 301
290 302 # api case
291 303 elif hasattr(req, 'rpc_user'):
292 ip_addr = req.rpc_user.ip_addr
293 304 user_id = req.rpc_user.user_id
294 else:
295 if user_id and ip_addr:
296 log.debug('Using data from celery proxy user')
297 305
298 else:
299 raise Exception(
300 'Unable to fetch required data from request: {}. \n'
301 'This task is required to be executed from context of '
302 'request in a webapp. Task: {}'.format(
303 repr(req),
304 self.__class__
305 )
306 )
307
308 if req:
309 306 # we hook into kwargs since it is the only way to pass our data to
310 307 # the celery worker
311 308 environ = maybe_prepare_env(req)
@@ -314,7 +311,7 b' class RequestContextTask(Task):'
314 311 'rhodecode_proxy_data': {
315 312 'environ': environ,
316 313 'auth_user': {
317 'ip_addr': ip_addr,
314 'ip_addr': get_ip_addr(req.environ),
318 315 'user_id': user_id
319 316 },
320 317 }
@@ -57,8 +57,10 b' class HooksHttpHandler(BaseHTTPRequestHa'
57 57 'TXN ID fail: expected {} got {} instead'.format(
58 58 txn_id, computed_txn_id))
59 59
60 request = getattr(self.server, 'request', None)
60 61 try:
61 result = self._call_hook(method, extras)
62 hooks = Hooks(request=request, log_prefix='HOOKS: {} '.format(self.server.server_address))
63 result = self._call_hook_method(hooks, method, extras)
62 64 except Exception as e:
63 65 exc_tb = traceback.format_exc()
64 66 result = {
@@ -80,8 +82,7 b' class HooksHttpHandler(BaseHTTPRequestHa'
80 82 self.end_headers()
81 83 self.wfile.write(json.dumps(result))
82 84
83 def _call_hook(self, method, extras):
84 hooks = Hooks()
85 def _call_hook_method(self, hooks, method, extras):
85 86 try:
86 87 result = getattr(hooks, method)(extras)
87 88 finally:
@@ -97,7 +98,7 b' class HooksHttpHandler(BaseHTTPRequestHa'
97 98 message = format % args
98 99
99 100 log.debug(
100 "%s - - [%s] %s", self.client_address[0],
101 "HOOKS: %s - - [%s] %s", self.client_address,
101 102 self.log_date_time_string(), message)
102 103
103 104
@@ -154,6 +155,10 b' class HttpHooksCallbackDaemon(ThreadedHo'
154 155 # request and wastes cpu at all other times.
155 156 POLL_INTERVAL = 0.01
156 157
158 @property
159 def _hook_prefix(self):
160 return 'HOOKS: {} '.format(self.hooks_uri)
161
157 162 def get_hostname(self):
158 163 return socket.gethostname() or '127.0.0.1'
159 164
@@ -162,6 +167,8 b' class HttpHooksCallbackDaemon(ThreadedHo'
162 167 return _get_port(min_port, max_port)
163 168
164 169 def _prepare(self, txn_id=None, host=None, port=None):
170 from pyramid.threadlocal import get_current_request
171
165 172 if not host or host == "*":
166 173 host = self.get_hostname()
167 174 if not port:
@@ -173,13 +180,16 b' class HttpHooksCallbackDaemon(ThreadedHo'
173 180 self._done = False
174 181
175 182 log.debug(
176 "Preparing HTTP callback daemon at `%s` and registering hook object: %s",
177 self.hooks_uri, HooksHttpHandler)
183 "%s Preparing HTTP callback daemon registering hook object: %s",
184 self._hook_prefix, HooksHttpHandler)
178 185
179 186 self._daemon = TCPServer(server_address, HooksHttpHandler)
180 187 # inject transaction_id for later verification
181 188 self._daemon.txn_id = self.txn_id
182 189
190 # pass the WEB app request into daemon
191 self._daemon.request = get_current_request()
192
183 193 def _run(self):
184 194 log.debug("Running event loop of callback daemon in background thread")
185 195 callback_thread = threading.Thread(
@@ -269,40 +279,35 b' class Hooks(object):'
269 279 """
270 280 Exposes the hooks for remote call backs
271 281 """
282 def __init__(self, request=None, log_prefix=''):
283 self.log_prefix = log_prefix
284 self.request = request
272 285
273 286 def repo_size(self, extras):
274 log.debug("Called repo_size of %s object", self)
287 log.debug("%sCalled repo_size of %s object", self.log_prefix, self)
275 288 return self._call_hook(hooks_base.repo_size, extras)
276 289
277 290 def pre_pull(self, extras):
278 log.debug("Called pre_pull of %s object", self)
291 log.debug("%sCalled pre_pull of %s object", self.log_prefix, self)
279 292 return self._call_hook(hooks_base.pre_pull, extras)
280 293
281 294 def post_pull(self, extras):
282 log.debug("Called post_pull of %s object", self)
295 log.debug("%sCalled post_pull of %s object", self.log_prefix, self)
283 296 return self._call_hook(hooks_base.post_pull, extras)
284 297
285 298 def pre_push(self, extras):
286 log.debug("Called pre_push of %s object", self)
299 log.debug("%sCalled pre_push of %s object", self.log_prefix, self)
287 300 return self._call_hook(hooks_base.pre_push, extras)
288 301
289 302 def post_push(self, extras):
290 log.debug("Called post_push of %s object", self)
303 log.debug("%sCalled post_push of %s object", self.log_prefix, self)
291 304 return self._call_hook(hooks_base.post_push, extras)
292 305
293 306 def _call_hook(self, hook, extras):
294 307 extras = AttributeDict(extras)
295 308 server_url = extras['server_url']
296 request = bootstrap_request(application_url=server_url)
297 309
298 bootstrap_config(request) # inject routes and other interfaces
299
300 # inject the user for usage in hooks
301 request.user = AttributeDict({'username': extras.username,
302 'ip_addr': extras.ip,
303 'user_id': extras.user_id})
304
305 extras.request = request
310 extras.request = self.request
306 311
307 312 try:
308 313 result = hook(extras)
@@ -322,7 +327,7 b' class Hooks(object):'
322 327 exc_tb = ''
323 328 if not isinstance(error, HTTPLockedRC):
324 329 exc_tb = traceback.format_exc()
325 log.exception('Exception when handling hook %s', hook)
330 log.exception('%sException when handling hook %s', self.log_prefix, hook)
326 331 error_args = error.args
327 332 return {
328 333 'status': 128,
@@ -334,7 +339,7 b' class Hooks(object):'
334 339 finally:
335 340 meta.Session.remove()
336 341
337 log.debug('Got hook call response %s', result)
342 log.debug('%sGot hook call response %s', self.log_prefix, result)
338 343 return {
339 344 'status': result.status,
340 345 'output': result.output,
@@ -154,8 +154,7 b' class NotificationModel(BaseModel):'
154 154 extra_headers = {'thread_ids': email_kwargs.pop('thread_ids')}
155 155
156 156 log.debug('Creating notification email task for user:`%s`', recipient)
157 task = run_task(
158 tasks.send_email, recipient.email, subject,
157 task = run_task(tasks.send_email, recipient.email, subject,
159 158 email_body_plaintext, email_body, extra_headers=extra_headers)
160 159 log.debug('Created email task: %s', task)
161 160
@@ -69,6 +69,12 b' def set_user_lang(event):'
69 69 event.request._LOCALE_ = user_lang
70 70
71 71
72 def update_celery_conf(event):
73 from rhodecode.lib.celerylib.loader import set_celery_conf
74 log.debug('Setting celery config from new request')
75 set_celery_conf(request=event.request, registry=event.request.registry)
76
77
72 78 def add_request_user_context(event):
73 79 """
74 80 Adds auth user into request context
@@ -108,6 +108,8 b' def sanity_check_factory(handler, regist'
108 108 def includeme(config):
109 109 config.add_subscriber('rhodecode.subscribers.add_renderer_globals',
110 110 'pyramid.events.BeforeRender')
111 config.add_subscriber('rhodecode.subscribers.update_celery_conf',
112 'pyramid.events.NewRequest')
111 113 config.add_subscriber('rhodecode.subscribers.set_user_lang',
112 114 'pyramid.events.NewRequest')
113 115 config.add_subscriber('rhodecode.subscribers.reset_log_bucket',
General Comments 0
You need to be logged in to leave comments. Login now