Show More
@@ -325,6 +325,5 b' class EmailEventHandler(object):' | |||
|
325 | 325 | |
|
326 | 326 | recipients = self.integration_settings['recipients'] |
|
327 | 327 | for email_address in recipients: |
|
328 | run_task( | |
|
329 | tasks.send_email, email_address, subject, | |
|
328 | run_task(tasks.send_email, email_address, subject, | |
|
330 | 329 | email_body_plaintext, email_body_html) |
@@ -38,11 +38,9 b' from celery import signals' | |||
|
38 | 38 | from celery import Task |
|
39 | 39 | from celery import exceptions # pragma: no cover |
|
40 | 40 | from kombu.serialization import register |
|
41 | from pyramid.threadlocal import get_current_request | |
|
42 | 41 | |
|
43 | 42 | import rhodecode |
|
44 | 43 | |
|
45 | from rhodecode.lib.auth import AuthUser | |
|
46 | 44 | from rhodecode.lib.celerylib.utils import parse_ini_vars, ping_db |
|
47 | 45 | from rhodecode.lib.ext_json import json |
|
48 | 46 | from rhodecode.lib.pyramid_utils import bootstrap, setup_logging |
@@ -212,6 +210,19 b' def task_revoked_signal(' | |||
|
212 | 210 | closer() |
|
213 | 211 | |
|
214 | 212 | |
|
213 | class UNSET(object): | |
|
214 | pass | |
|
215 | ||
|
216 | ||
|
217 | def set_celery_conf(app=UNSET(), root=UNSET(), request=UNSET(), registry=UNSET(), closer=UNSET()): | |
|
218 | ||
|
219 | if request is not UNSET: | |
|
220 | celery_app.conf.update({'PYRAMID_REQUEST': request}) | |
|
221 | ||
|
222 | if registry is not UNSET: | |
|
223 | celery_app.conf.update({'PYRAMID_REGISTRY': registry}) | |
|
224 | ||
|
225 | ||
|
215 | 226 | def setup_celery_app(app, root, request, registry, closer, celery_settings): |
|
216 | 227 | log.debug('Got custom celery conf: %s', celery_settings) |
|
217 | 228 | celery_config = base_celery_config |
@@ -273,39 +284,25 b' class RequestContextTask(Task):' | |||
|
273 | 284 | def apply_async(self, args=None, kwargs=None, task_id=None, producer=None, |
|
274 | 285 | link=None, link_error=None, shadow=None, **options): |
|
275 | 286 | """ queue the job to run (we are in web request context here) """ |
|
287 | from rhodecode.lib.base import get_ip_addr | |
|
276 | 288 | |
|
277 |
req = self.app.conf['PYRAMID_REQUEST'] |
|
|
289 | req = self.app.conf['PYRAMID_REQUEST'] | |
|
290 | if not req: | |
|
291 | raise ValueError('celery_app.conf is having empty PYRAMID_REQUEST key') | |
|
278 | 292 | |
|
279 | 293 | log.debug('Running Task with class: %s. Request Class: %s', |
|
280 | 294 | self.__class__, req.__class__) |
|
281 | 295 | |
|
282 |
user_id = |
|
|
283 | ip_addr = None | |
|
296 | user_id = 0 | |
|
284 | 297 | |
|
285 | 298 | # web case |
|
286 | 299 | if hasattr(req, 'user'): |
|
287 | ip_addr = req.user.ip_addr | |
|
288 | 300 | user_id = req.user.user_id |
|
289 | 301 | |
|
290 | 302 | # api case |
|
291 | 303 | elif hasattr(req, 'rpc_user'): |
|
292 | ip_addr = req.rpc_user.ip_addr | |
|
293 | 304 | user_id = req.rpc_user.user_id |
|
294 | else: | |
|
295 | if user_id and ip_addr: | |
|
296 | log.debug('Using data from celery proxy user') | |
|
297 | 305 | |
|
298 | else: | |
|
299 | raise Exception( | |
|
300 | 'Unable to fetch required data from request: {}. \n' | |
|
301 | 'This task is required to be executed from context of ' | |
|
302 | 'request in a webapp. Task: {}'.format( | |
|
303 | repr(req), | |
|
304 | self.__class__ | |
|
305 | ) | |
|
306 | ) | |
|
307 | ||
|
308 | if req: | |
|
309 | 306 |
|
|
310 | 307 |
|
|
311 | 308 |
|
@@ -314,7 +311,7 b' class RequestContextTask(Task):' | |||
|
314 | 311 |
|
|
315 | 312 |
|
|
316 | 313 |
|
|
317 |
|
|
|
314 | 'ip_addr': get_ip_addr(req.environ), | |
|
318 | 315 |
|
|
319 | 316 |
|
|
320 | 317 |
|
@@ -57,8 +57,10 b' class HooksHttpHandler(BaseHTTPRequestHa' | |||
|
57 | 57 | 'TXN ID fail: expected {} got {} instead'.format( |
|
58 | 58 | txn_id, computed_txn_id)) |
|
59 | 59 | |
|
60 | request = getattr(self.server, 'request', None) | |
|
60 | 61 | try: |
|
61 | result = self._call_hook(method, extras) | |
|
62 | hooks = Hooks(request=request, log_prefix='HOOKS: {} '.format(self.server.server_address)) | |
|
63 | result = self._call_hook_method(hooks, method, extras) | |
|
62 | 64 | except Exception as e: |
|
63 | 65 | exc_tb = traceback.format_exc() |
|
64 | 66 | result = { |
@@ -80,8 +82,7 b' class HooksHttpHandler(BaseHTTPRequestHa' | |||
|
80 | 82 | self.end_headers() |
|
81 | 83 | self.wfile.write(json.dumps(result)) |
|
82 | 84 | |
|
83 | def _call_hook(self, method, extras): | |
|
84 | hooks = Hooks() | |
|
85 | def _call_hook_method(self, hooks, method, extras): | |
|
85 | 86 | try: |
|
86 | 87 | result = getattr(hooks, method)(extras) |
|
87 | 88 | finally: |
@@ -97,7 +98,7 b' class HooksHttpHandler(BaseHTTPRequestHa' | |||
|
97 | 98 | message = format % args |
|
98 | 99 | |
|
99 | 100 | log.debug( |
|
100 |
"%s - - [%s] %s", self.client_address |
|
|
101 | "HOOKS: %s - - [%s] %s", self.client_address, | |
|
101 | 102 | self.log_date_time_string(), message) |
|
102 | 103 | |
|
103 | 104 | |
@@ -154,6 +155,10 b' class HttpHooksCallbackDaemon(ThreadedHo' | |||
|
154 | 155 | # request and wastes cpu at all other times. |
|
155 | 156 | POLL_INTERVAL = 0.01 |
|
156 | 157 | |
|
158 | @property | |
|
159 | def _hook_prefix(self): | |
|
160 | return 'HOOKS: {} '.format(self.hooks_uri) | |
|
161 | ||
|
157 | 162 | def get_hostname(self): |
|
158 | 163 | return socket.gethostname() or '127.0.0.1' |
|
159 | 164 | |
@@ -162,6 +167,8 b' class HttpHooksCallbackDaemon(ThreadedHo' | |||
|
162 | 167 | return _get_port(min_port, max_port) |
|
163 | 168 | |
|
164 | 169 | def _prepare(self, txn_id=None, host=None, port=None): |
|
170 | from pyramid.threadlocal import get_current_request | |
|
171 | ||
|
165 | 172 | if not host or host == "*": |
|
166 | 173 | host = self.get_hostname() |
|
167 | 174 | if not port: |
@@ -173,13 +180,16 b' class HttpHooksCallbackDaemon(ThreadedHo' | |||
|
173 | 180 | self._done = False |
|
174 | 181 | |
|
175 | 182 | log.debug( |
|
176 |
" |
|
|
177 |
self. |
|
|
183 | "%s Preparing HTTP callback daemon registering hook object: %s", | |
|
184 | self._hook_prefix, HooksHttpHandler) | |
|
178 | 185 | |
|
179 | 186 | self._daemon = TCPServer(server_address, HooksHttpHandler) |
|
180 | 187 | # inject transaction_id for later verification |
|
181 | 188 | self._daemon.txn_id = self.txn_id |
|
182 | 189 | |
|
190 | # pass the WEB app request into daemon | |
|
191 | self._daemon.request = get_current_request() | |
|
192 | ||
|
183 | 193 | def _run(self): |
|
184 | 194 | log.debug("Running event loop of callback daemon in background thread") |
|
185 | 195 | callback_thread = threading.Thread( |
@@ -269,40 +279,35 b' class Hooks(object):' | |||
|
269 | 279 | """ |
|
270 | 280 | Exposes the hooks for remote call backs |
|
271 | 281 | """ |
|
282 | def __init__(self, request=None, log_prefix=''): | |
|
283 | self.log_prefix = log_prefix | |
|
284 | self.request = request | |
|
272 | 285 | |
|
273 | 286 | def repo_size(self, extras): |
|
274 | log.debug("Called repo_size of %s object", self) | |
|
287 | log.debug("%sCalled repo_size of %s object", self.log_prefix, self) | |
|
275 | 288 | return self._call_hook(hooks_base.repo_size, extras) |
|
276 | 289 | |
|
277 | 290 | def pre_pull(self, extras): |
|
278 | log.debug("Called pre_pull of %s object", self) | |
|
291 | log.debug("%sCalled pre_pull of %s object", self.log_prefix, self) | |
|
279 | 292 | return self._call_hook(hooks_base.pre_pull, extras) |
|
280 | 293 | |
|
281 | 294 | def post_pull(self, extras): |
|
282 | log.debug("Called post_pull of %s object", self) | |
|
295 | log.debug("%sCalled post_pull of %s object", self.log_prefix, self) | |
|
283 | 296 | return self._call_hook(hooks_base.post_pull, extras) |
|
284 | 297 | |
|
285 | 298 | def pre_push(self, extras): |
|
286 | log.debug("Called pre_push of %s object", self) | |
|
299 | log.debug("%sCalled pre_push of %s object", self.log_prefix, self) | |
|
287 | 300 | return self._call_hook(hooks_base.pre_push, extras) |
|
288 | 301 | |
|
289 | 302 | def post_push(self, extras): |
|
290 | log.debug("Called post_push of %s object", self) | |
|
303 | log.debug("%sCalled post_push of %s object", self.log_prefix, self) | |
|
291 | 304 | return self._call_hook(hooks_base.post_push, extras) |
|
292 | 305 | |
|
293 | 306 | def _call_hook(self, hook, extras): |
|
294 | 307 | extras = AttributeDict(extras) |
|
295 | 308 | server_url = extras['server_url'] |
|
296 | request = bootstrap_request(application_url=server_url) | |
|
297 | 309 | |
|
298 | bootstrap_config(request) # inject routes and other interfaces | |
|
299 | ||
|
300 | # inject the user for usage in hooks | |
|
301 | request.user = AttributeDict({'username': extras.username, | |
|
302 | 'ip_addr': extras.ip, | |
|
303 | 'user_id': extras.user_id}) | |
|
304 | ||
|
305 | extras.request = request | |
|
310 | extras.request = self.request | |
|
306 | 311 | |
|
307 | 312 | try: |
|
308 | 313 | result = hook(extras) |
@@ -322,7 +327,7 b' class Hooks(object):' | |||
|
322 | 327 | exc_tb = '' |
|
323 | 328 | if not isinstance(error, HTTPLockedRC): |
|
324 | 329 | exc_tb = traceback.format_exc() |
|
325 | log.exception('Exception when handling hook %s', hook) | |
|
330 | log.exception('%sException when handling hook %s', self.log_prefix, hook) | |
|
326 | 331 | error_args = error.args |
|
327 | 332 | return { |
|
328 | 333 | 'status': 128, |
@@ -334,7 +339,7 b' class Hooks(object):' | |||
|
334 | 339 | finally: |
|
335 | 340 | meta.Session.remove() |
|
336 | 341 | |
|
337 | log.debug('Got hook call response %s', result) | |
|
342 | log.debug('%sGot hook call response %s', self.log_prefix, result) | |
|
338 | 343 | return { |
|
339 | 344 | 'status': result.status, |
|
340 | 345 | 'output': result.output, |
@@ -154,8 +154,7 b' class NotificationModel(BaseModel):' | |||
|
154 | 154 | extra_headers = {'thread_ids': email_kwargs.pop('thread_ids')} |
|
155 | 155 | |
|
156 | 156 | log.debug('Creating notification email task for user:`%s`', recipient) |
|
157 | task = run_task( | |
|
158 | tasks.send_email, recipient.email, subject, | |
|
157 | task = run_task(tasks.send_email, recipient.email, subject, | |
|
159 | 158 | email_body_plaintext, email_body, extra_headers=extra_headers) |
|
160 | 159 | log.debug('Created email task: %s', task) |
|
161 | 160 |
@@ -69,6 +69,12 b' def set_user_lang(event):' | |||
|
69 | 69 | event.request._LOCALE_ = user_lang |
|
70 | 70 | |
|
71 | 71 | |
|
72 | def update_celery_conf(event): | |
|
73 | from rhodecode.lib.celerylib.loader import set_celery_conf | |
|
74 | log.debug('Setting celery config from new request') | |
|
75 | set_celery_conf(request=event.request, registry=event.request.registry) | |
|
76 | ||
|
77 | ||
|
72 | 78 | def add_request_user_context(event): |
|
73 | 79 | """ |
|
74 | 80 | Adds auth user into request context |
@@ -108,6 +108,8 b' def sanity_check_factory(handler, regist' | |||
|
108 | 108 | def includeme(config): |
|
109 | 109 | config.add_subscriber('rhodecode.subscribers.add_renderer_globals', |
|
110 | 110 | 'pyramid.events.BeforeRender') |
|
111 | config.add_subscriber('rhodecode.subscribers.update_celery_conf', | |
|
112 | 'pyramid.events.NewRequest') | |
|
111 | 113 | config.add_subscriber('rhodecode.subscribers.set_user_lang', |
|
112 | 114 | 'pyramid.events.NewRequest') |
|
113 | 115 | config.add_subscriber('rhodecode.subscribers.reset_log_bucket', |
General Comments 0
You need to be logged in to leave comments.
Login now