Show More
@@ -325,6 +325,5 b' class EmailEventHandler(object):' | |||||
325 |
|
325 | |||
326 | recipients = self.integration_settings['recipients'] |
|
326 | recipients = self.integration_settings['recipients'] | |
327 | for email_address in recipients: |
|
327 | for email_address in recipients: | |
328 | run_task( |
|
328 | run_task(tasks.send_email, email_address, subject, | |
329 | tasks.send_email, email_address, subject, |
|
329 | email_body_plaintext, email_body_html) | |
330 | email_body_plaintext, email_body_html) |
|
@@ -38,11 +38,9 b' from celery import signals' | |||||
38 | from celery import Task |
|
38 | from celery import Task | |
39 | from celery import exceptions # pragma: no cover |
|
39 | from celery import exceptions # pragma: no cover | |
40 | from kombu.serialization import register |
|
40 | from kombu.serialization import register | |
41 | from pyramid.threadlocal import get_current_request |
|
|||
42 |
|
41 | |||
43 | import rhodecode |
|
42 | import rhodecode | |
44 |
|
43 | |||
45 | from rhodecode.lib.auth import AuthUser |
|
|||
46 | from rhodecode.lib.celerylib.utils import parse_ini_vars, ping_db |
|
44 | from rhodecode.lib.celerylib.utils import parse_ini_vars, ping_db | |
47 | from rhodecode.lib.ext_json import json |
|
45 | from rhodecode.lib.ext_json import json | |
48 | from rhodecode.lib.pyramid_utils import bootstrap, setup_logging |
|
46 | from rhodecode.lib.pyramid_utils import bootstrap, setup_logging | |
@@ -212,6 +210,19 b' def task_revoked_signal(' | |||||
212 | closer() |
|
210 | closer() | |
213 |
|
211 | |||
214 |
|
212 | |||
|
213 | class UNSET(object): | |||
|
214 | pass | |||
|
215 | ||||
|
216 | ||||
|
217 | def set_celery_conf(app=UNSET(), root=UNSET(), request=UNSET(), registry=UNSET(), closer=UNSET()): | |||
|
218 | ||||
|
219 | if request is not UNSET: | |||
|
220 | celery_app.conf.update({'PYRAMID_REQUEST': request}) | |||
|
221 | ||||
|
222 | if registry is not UNSET: | |||
|
223 | celery_app.conf.update({'PYRAMID_REGISTRY': registry}) | |||
|
224 | ||||
|
225 | ||||
215 | def setup_celery_app(app, root, request, registry, closer, celery_settings): |
|
226 | def setup_celery_app(app, root, request, registry, closer, celery_settings): | |
216 | log.debug('Got custom celery conf: %s', celery_settings) |
|
227 | log.debug('Got custom celery conf: %s', celery_settings) | |
217 | celery_config = base_celery_config |
|
228 | celery_config = base_celery_config | |
@@ -273,52 +284,38 b' class RequestContextTask(Task):' | |||||
273 | def apply_async(self, args=None, kwargs=None, task_id=None, producer=None, |
|
284 | def apply_async(self, args=None, kwargs=None, task_id=None, producer=None, | |
274 | link=None, link_error=None, shadow=None, **options): |
|
285 | link=None, link_error=None, shadow=None, **options): | |
275 | """ queue the job to run (we are in web request context here) """ |
|
286 | """ queue the job to run (we are in web request context here) """ | |
|
287 | from rhodecode.lib.base import get_ip_addr | |||
276 |
|
288 | |||
277 |
req = self.app.conf['PYRAMID_REQUEST'] |
|
289 | req = self.app.conf['PYRAMID_REQUEST'] | |
|
290 | if not req: | |||
|
291 | raise ValueError('celery_app.conf is having empty PYRAMID_REQUEST key') | |||
278 |
|
292 | |||
279 | log.debug('Running Task with class: %s. Request Class: %s', |
|
293 | log.debug('Running Task with class: %s. Request Class: %s', | |
280 | self.__class__, req.__class__) |
|
294 | self.__class__, req.__class__) | |
281 |
|
295 | |||
282 |
user_id = |
|
296 | user_id = 0 | |
283 | ip_addr = None |
|
|||
284 |
|
297 | |||
285 | # web case |
|
298 | # web case | |
286 | if hasattr(req, 'user'): |
|
299 | if hasattr(req, 'user'): | |
287 | ip_addr = req.user.ip_addr |
|
|||
288 | user_id = req.user.user_id |
|
300 | user_id = req.user.user_id | |
289 |
|
301 | |||
290 | # api case |
|
302 | # api case | |
291 | elif hasattr(req, 'rpc_user'): |
|
303 | elif hasattr(req, 'rpc_user'): | |
292 | ip_addr = req.rpc_user.ip_addr |
|
|||
293 | user_id = req.rpc_user.user_id |
|
304 | user_id = req.rpc_user.user_id | |
294 | else: |
|
|||
295 | if user_id and ip_addr: |
|
|||
296 | log.debug('Using data from celery proxy user') |
|
|||
297 |
|
305 | |||
298 | else: |
|
306 | # we hook into kwargs since it is the only way to pass our data to | |
299 | raise Exception( |
|
307 | # the celery worker | |
300 | 'Unable to fetch required data from request: {}. \n' |
|
308 | environ = maybe_prepare_env(req) | |
301 | 'This task is required to be executed from context of ' |
|
309 | options['headers'] = options.get('headers', {}) | |
302 | 'request in a webapp. Task: {}'.format( |
|
310 | options['headers'].update({ | |
303 | repr(req), |
|
311 | 'rhodecode_proxy_data': { | |
304 | self.__class__ |
|
312 | 'environ': environ, | |
305 |
|
|
313 | 'auth_user': { | |
306 | ) |
|
314 | 'ip_addr': get_ip_addr(req.environ), | |
307 |
|
315 | 'user_id': user_id | ||
308 | if req: |
|
316 | }, | |
309 | # we hook into kwargs since it is the only way to pass our data to |
|
317 | } | |
310 | # the celery worker |
|
318 | }) | |
311 | environ = maybe_prepare_env(req) |
|
|||
312 | options['headers'] = options.get('headers', {}) |
|
|||
313 | options['headers'].update({ |
|
|||
314 | 'rhodecode_proxy_data': { |
|
|||
315 | 'environ': environ, |
|
|||
316 | 'auth_user': { |
|
|||
317 | 'ip_addr': ip_addr, |
|
|||
318 | 'user_id': user_id |
|
|||
319 | }, |
|
|||
320 | } |
|
|||
321 | }) |
|
|||
322 |
|
319 | |||
323 | return super(RequestContextTask, self).apply_async( |
|
320 | return super(RequestContextTask, self).apply_async( | |
324 | args, kwargs, task_id, producer, link, link_error, shadow, **options) |
|
321 | args, kwargs, task_id, producer, link, link_error, shadow, **options) |
@@ -57,8 +57,10 b' class HooksHttpHandler(BaseHTTPRequestHa' | |||||
57 | 'TXN ID fail: expected {} got {} instead'.format( |
|
57 | 'TXN ID fail: expected {} got {} instead'.format( | |
58 | txn_id, computed_txn_id)) |
|
58 | txn_id, computed_txn_id)) | |
59 |
|
59 | |||
|
60 | request = getattr(self.server, 'request', None) | |||
60 | try: |
|
61 | try: | |
61 | result = self._call_hook(method, extras) |
|
62 | hooks = Hooks(request=request, log_prefix='HOOKS: {} '.format(self.server.server_address)) | |
|
63 | result = self._call_hook_method(hooks, method, extras) | |||
62 | except Exception as e: |
|
64 | except Exception as e: | |
63 | exc_tb = traceback.format_exc() |
|
65 | exc_tb = traceback.format_exc() | |
64 | result = { |
|
66 | result = { | |
@@ -80,8 +82,7 b' class HooksHttpHandler(BaseHTTPRequestHa' | |||||
80 | self.end_headers() |
|
82 | self.end_headers() | |
81 | self.wfile.write(json.dumps(result)) |
|
83 | self.wfile.write(json.dumps(result)) | |
82 |
|
84 | |||
83 | def _call_hook(self, method, extras): |
|
85 | def _call_hook_method(self, hooks, method, extras): | |
84 | hooks = Hooks() |
|
|||
85 | try: |
|
86 | try: | |
86 | result = getattr(hooks, method)(extras) |
|
87 | result = getattr(hooks, method)(extras) | |
87 | finally: |
|
88 | finally: | |
@@ -97,7 +98,7 b' class HooksHttpHandler(BaseHTTPRequestHa' | |||||
97 | message = format % args |
|
98 | message = format % args | |
98 |
|
99 | |||
99 | log.debug( |
|
100 | log.debug( | |
100 |
"%s - - [%s] %s", self.client_address |
|
101 | "HOOKS: %s - - [%s] %s", self.client_address, | |
101 | self.log_date_time_string(), message) |
|
102 | self.log_date_time_string(), message) | |
102 |
|
103 | |||
103 |
|
104 | |||
@@ -154,6 +155,10 b' class HttpHooksCallbackDaemon(ThreadedHo' | |||||
154 | # request and wastes cpu at all other times. |
|
155 | # request and wastes cpu at all other times. | |
155 | POLL_INTERVAL = 0.01 |
|
156 | POLL_INTERVAL = 0.01 | |
156 |
|
157 | |||
|
158 | @property | |||
|
159 | def _hook_prefix(self): | |||
|
160 | return 'HOOKS: {} '.format(self.hooks_uri) | |||
|
161 | ||||
157 | def get_hostname(self): |
|
162 | def get_hostname(self): | |
158 | return socket.gethostname() or '127.0.0.1' |
|
163 | return socket.gethostname() or '127.0.0.1' | |
159 |
|
164 | |||
@@ -162,6 +167,8 b' class HttpHooksCallbackDaemon(ThreadedHo' | |||||
162 | return _get_port(min_port, max_port) |
|
167 | return _get_port(min_port, max_port) | |
163 |
|
168 | |||
164 | def _prepare(self, txn_id=None, host=None, port=None): |
|
169 | def _prepare(self, txn_id=None, host=None, port=None): | |
|
170 | from pyramid.threadlocal import get_current_request | |||
|
171 | ||||
165 | if not host or host == "*": |
|
172 | if not host or host == "*": | |
166 | host = self.get_hostname() |
|
173 | host = self.get_hostname() | |
167 | if not port: |
|
174 | if not port: | |
@@ -173,13 +180,16 b' class HttpHooksCallbackDaemon(ThreadedHo' | |||||
173 | self._done = False |
|
180 | self._done = False | |
174 |
|
181 | |||
175 | log.debug( |
|
182 | log.debug( | |
176 |
" |
|
183 | "%s Preparing HTTP callback daemon registering hook object: %s", | |
177 |
self. |
|
184 | self._hook_prefix, HooksHttpHandler) | |
178 |
|
185 | |||
179 | self._daemon = TCPServer(server_address, HooksHttpHandler) |
|
186 | self._daemon = TCPServer(server_address, HooksHttpHandler) | |
180 | # inject transaction_id for later verification |
|
187 | # inject transaction_id for later verification | |
181 | self._daemon.txn_id = self.txn_id |
|
188 | self._daemon.txn_id = self.txn_id | |
182 |
|
189 | |||
|
190 | # pass the WEB app request into daemon | |||
|
191 | self._daemon.request = get_current_request() | |||
|
192 | ||||
183 | def _run(self): |
|
193 | def _run(self): | |
184 | log.debug("Running event loop of callback daemon in background thread") |
|
194 | log.debug("Running event loop of callback daemon in background thread") | |
185 | callback_thread = threading.Thread( |
|
195 | callback_thread = threading.Thread( | |
@@ -269,40 +279,35 b' class Hooks(object):' | |||||
269 | """ |
|
279 | """ | |
270 | Exposes the hooks for remote call backs |
|
280 | Exposes the hooks for remote call backs | |
271 | """ |
|
281 | """ | |
|
282 | def __init__(self, request=None, log_prefix=''): | |||
|
283 | self.log_prefix = log_prefix | |||
|
284 | self.request = request | |||
272 |
|
285 | |||
273 | def repo_size(self, extras): |
|
286 | def repo_size(self, extras): | |
274 | log.debug("Called repo_size of %s object", self) |
|
287 | log.debug("%sCalled repo_size of %s object", self.log_prefix, self) | |
275 | return self._call_hook(hooks_base.repo_size, extras) |
|
288 | return self._call_hook(hooks_base.repo_size, extras) | |
276 |
|
289 | |||
277 | def pre_pull(self, extras): |
|
290 | def pre_pull(self, extras): | |
278 | log.debug("Called pre_pull of %s object", self) |
|
291 | log.debug("%sCalled pre_pull of %s object", self.log_prefix, self) | |
279 | return self._call_hook(hooks_base.pre_pull, extras) |
|
292 | return self._call_hook(hooks_base.pre_pull, extras) | |
280 |
|
293 | |||
281 | def post_pull(self, extras): |
|
294 | def post_pull(self, extras): | |
282 | log.debug("Called post_pull of %s object", self) |
|
295 | log.debug("%sCalled post_pull of %s object", self.log_prefix, self) | |
283 | return self._call_hook(hooks_base.post_pull, extras) |
|
296 | return self._call_hook(hooks_base.post_pull, extras) | |
284 |
|
297 | |||
285 | def pre_push(self, extras): |
|
298 | def pre_push(self, extras): | |
286 | log.debug("Called pre_push of %s object", self) |
|
299 | log.debug("%sCalled pre_push of %s object", self.log_prefix, self) | |
287 | return self._call_hook(hooks_base.pre_push, extras) |
|
300 | return self._call_hook(hooks_base.pre_push, extras) | |
288 |
|
301 | |||
289 | def post_push(self, extras): |
|
302 | def post_push(self, extras): | |
290 | log.debug("Called post_push of %s object", self) |
|
303 | log.debug("%sCalled post_push of %s object", self.log_prefix, self) | |
291 | return self._call_hook(hooks_base.post_push, extras) |
|
304 | return self._call_hook(hooks_base.post_push, extras) | |
292 |
|
305 | |||
293 | def _call_hook(self, hook, extras): |
|
306 | def _call_hook(self, hook, extras): | |
294 | extras = AttributeDict(extras) |
|
307 | extras = AttributeDict(extras) | |
295 | server_url = extras['server_url'] |
|
308 | server_url = extras['server_url'] | |
296 | request = bootstrap_request(application_url=server_url) |
|
|||
297 |
|
309 | |||
298 | bootstrap_config(request) # inject routes and other interfaces |
|
310 | extras.request = self.request | |
299 |
|
||||
300 | # inject the user for usage in hooks |
|
|||
301 | request.user = AttributeDict({'username': extras.username, |
|
|||
302 | 'ip_addr': extras.ip, |
|
|||
303 | 'user_id': extras.user_id}) |
|
|||
304 |
|
||||
305 | extras.request = request |
|
|||
306 |
|
311 | |||
307 | try: |
|
312 | try: | |
308 | result = hook(extras) |
|
313 | result = hook(extras) | |
@@ -322,7 +327,7 b' class Hooks(object):' | |||||
322 | exc_tb = '' |
|
327 | exc_tb = '' | |
323 | if not isinstance(error, HTTPLockedRC): |
|
328 | if not isinstance(error, HTTPLockedRC): | |
324 | exc_tb = traceback.format_exc() |
|
329 | exc_tb = traceback.format_exc() | |
325 | log.exception('Exception when handling hook %s', hook) |
|
330 | log.exception('%sException when handling hook %s', self.log_prefix, hook) | |
326 | error_args = error.args |
|
331 | error_args = error.args | |
327 | return { |
|
332 | return { | |
328 | 'status': 128, |
|
333 | 'status': 128, | |
@@ -334,7 +339,7 b' class Hooks(object):' | |||||
334 | finally: |
|
339 | finally: | |
335 | meta.Session.remove() |
|
340 | meta.Session.remove() | |
336 |
|
341 | |||
337 | log.debug('Got hook call response %s', result) |
|
342 | log.debug('%sGot hook call response %s', self.log_prefix, result) | |
338 | return { |
|
343 | return { | |
339 | 'status': result.status, |
|
344 | 'status': result.status, | |
340 | 'output': result.output, |
|
345 | 'output': result.output, |
@@ -154,9 +154,8 b' class NotificationModel(BaseModel):' | |||||
154 | extra_headers = {'thread_ids': email_kwargs.pop('thread_ids')} |
|
154 | extra_headers = {'thread_ids': email_kwargs.pop('thread_ids')} | |
155 |
|
155 | |||
156 | log.debug('Creating notification email task for user:`%s`', recipient) |
|
156 | log.debug('Creating notification email task for user:`%s`', recipient) | |
157 | task = run_task( |
|
157 | task = run_task(tasks.send_email, recipient.email, subject, | |
158 | tasks.send_email, recipient.email, subject, |
|
158 | email_body_plaintext, email_body, extra_headers=extra_headers) | |
159 | email_body_plaintext, email_body, extra_headers=extra_headers) |
|
|||
160 | log.debug('Created email task: %s', task) |
|
159 | log.debug('Created email task: %s', task) | |
161 |
|
160 | |||
162 | return notification |
|
161 | return notification |
@@ -69,6 +69,12 b' def set_user_lang(event):' | |||||
69 | event.request._LOCALE_ = user_lang |
|
69 | event.request._LOCALE_ = user_lang | |
70 |
|
70 | |||
71 |
|
71 | |||
|
72 | def update_celery_conf(event): | |||
|
73 | from rhodecode.lib.celerylib.loader import set_celery_conf | |||
|
74 | log.debug('Setting celery config from new request') | |||
|
75 | set_celery_conf(request=event.request, registry=event.request.registry) | |||
|
76 | ||||
|
77 | ||||
72 | def add_request_user_context(event): |
|
78 | def add_request_user_context(event): | |
73 | """ |
|
79 | """ | |
74 | Adds auth user into request context |
|
80 | Adds auth user into request context |
@@ -108,6 +108,8 b' def sanity_check_factory(handler, regist' | |||||
108 | def includeme(config): |
|
108 | def includeme(config): | |
109 | config.add_subscriber('rhodecode.subscribers.add_renderer_globals', |
|
109 | config.add_subscriber('rhodecode.subscribers.add_renderer_globals', | |
110 | 'pyramid.events.BeforeRender') |
|
110 | 'pyramid.events.BeforeRender') | |
|
111 | config.add_subscriber('rhodecode.subscribers.update_celery_conf', | |||
|
112 | 'pyramid.events.NewRequest') | |||
111 | config.add_subscriber('rhodecode.subscribers.set_user_lang', |
|
113 | config.add_subscriber('rhodecode.subscribers.set_user_lang', | |
112 | 'pyramid.events.NewRequest') |
|
114 | 'pyramid.events.NewRequest') | |
113 | config.add_subscriber('rhodecode.subscribers.reset_log_bucket', |
|
115 | config.add_subscriber('rhodecode.subscribers.reset_log_bucket', |
General Comments 0
You need to be logged in to leave comments.
Login now